/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

Merge the 2.3 branch changes up to 2.4. The final changes are
too invasive to do in a stable series (IMO).

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
20
 
from doctest import ELLIPSIS
 
20
import doctest
21
21
import os
22
22
import signal
23
23
import sys
 
24
import threading
24
25
import time
25
26
import unittest
26
27
import warnings
27
28
 
28
 
from testtools import MultiTestResult
 
29
from testtools import (
 
30
    ExtendedToOriginalDecorator,
 
31
    MultiTestResult,
 
32
    )
 
33
from testtools.content import Content
29
34
from testtools.content_type import ContentType
30
35
from testtools.matchers import (
31
36
    DocTestMatches,
37
42
from bzrlib import (
38
43
    branchbuilder,
39
44
    bzrdir,
40
 
    debug,
41
45
    errors,
42
46
    lockdir,
43
47
    memorytree,
44
48
    osutils,
45
 
    progress,
46
49
    remote,
47
50
    repository,
48
51
    symbol_versioning,
52
55
    )
53
56
from bzrlib.repofmt import (
54
57
    groupcompress_repo,
55
 
    pack_repo,
56
 
    weaverepo,
57
58
    )
58
59
from bzrlib.symbol_versioning import (
59
60
    deprecated_function,
64
65
    features,
65
66
    test_lsprof,
66
67
    test_server,
67
 
    test_sftp_transport,
68
68
    TestUtil,
69
69
    )
70
 
from bzrlib.trace import note
 
70
from bzrlib.trace import note, mutter
71
71
from bzrlib.transport import memory
72
 
from bzrlib.version import _get_bzr_source_tree
73
72
 
74
73
 
75
74
def _test_ids(test_suite):
77
76
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
78
77
 
79
78
 
80
 
class SelftestTests(tests.TestCase):
81
 
 
82
 
    def test_import_tests(self):
83
 
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
84
 
        self.assertEqual(mod.SelftestTests, SelftestTests)
85
 
 
86
 
    def test_import_test_failure(self):
87
 
        self.assertRaises(ImportError,
88
 
                          TestUtil._load_module_by_name,
89
 
                          'bzrlib.no-name-yet')
90
 
 
91
 
 
92
79
class MetaTestLog(tests.TestCase):
93
80
 
94
81
    def test_logging(self):
100
87
            "text", "plain", {"charset": "utf8"})))
101
88
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
102
89
        self.assertThat(self.get_log(),
103
 
            DocTestMatches(u"...a test message\n", ELLIPSIS))
 
90
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
104
91
 
105
92
 
106
93
class TestUnicodeFilename(tests.TestCase):
122
109
        self.failUnlessExists(filename)
123
110
 
124
111
 
 
112
class TestClassesAvailable(tests.TestCase):
 
113
    """As a convenience we expose Test* classes from bzrlib.tests"""
 
114
 
 
115
    def test_test_case(self):
 
116
        from bzrlib.tests import TestCase
 
117
 
 
118
    def test_test_loader(self):
 
119
        from bzrlib.tests import TestLoader
 
120
 
 
121
    def test_test_suite(self):
 
122
        from bzrlib.tests import TestSuite
 
123
 
 
124
 
125
125
class TestTransportScenarios(tests.TestCase):
126
126
    """A group of tests that test the transport implementation adaption core.
127
127
 
208
208
    def test_scenarios(self):
209
209
        # check that constructor parameters are passed through to the adapted
210
210
        # test.
211
 
        from bzrlib.tests.per_bzrdir import make_scenarios
 
211
        from bzrlib.tests.per_controldir import make_scenarios
212
212
        vfs_factory = "v"
213
213
        server1 = "a"
214
214
        server2 = "b"
312
312
        from bzrlib.tests.per_interrepository import make_scenarios
313
313
        server1 = "a"
314
314
        server2 = "b"
315
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
315
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
316
316
        scenarios = make_scenarios(server1, server2, formats)
317
317
        self.assertEqual([
318
318
            ('C0,str,str',
319
319
             {'repository_format': 'C1',
320
320
              'repository_format_to': 'C2',
321
321
              'transport_readonly_server': 'b',
322
 
              'transport_server': 'a'}),
 
322
              'transport_server': 'a',
 
323
              'extra_setup': 'C3'}),
323
324
            ('D0,str,str',
324
325
             {'repository_format': 'D1',
325
326
              'repository_format_to': 'D2',
326
327
              'transport_readonly_server': 'b',
327
 
              'transport_server': 'a'})],
 
328
              'transport_server': 'a',
 
329
              'extra_setup': 'D3'})],
328
330
            scenarios)
329
331
 
330
332
 
336
338
        from bzrlib.tests.per_workingtree import make_scenarios
337
339
        server1 = "a"
338
340
        server2 = "b"
339
 
        formats = [workingtree.WorkingTreeFormat2(),
 
341
        formats = [workingtree.WorkingTreeFormat4(),
340
342
                   workingtree.WorkingTreeFormat3(),]
341
343
        scenarios = make_scenarios(server1, server2, formats)
342
344
        self.assertEqual([
343
 
            ('WorkingTreeFormat2',
 
345
            ('WorkingTreeFormat4',
344
346
             {'bzrdir_format': formats[0]._matchingbzrdir,
345
347
              'transport_readonly_server': 'b',
346
348
              'transport_server': 'a',
373
375
            )
374
376
        server1 = "a"
375
377
        server2 = "b"
376
 
        formats = [workingtree.WorkingTreeFormat2(),
 
378
        formats = [workingtree.WorkingTreeFormat4(),
377
379
                   workingtree.WorkingTreeFormat3(),]
378
380
        scenarios = make_scenarios(server1, server2, formats)
379
381
        self.assertEqual(7, len(scenarios))
380
 
        default_wt_format = workingtree.WorkingTreeFormat4._default_format
 
382
        default_wt_format = workingtree.format_registry.get_default()
381
383
        wt4_format = workingtree.WorkingTreeFormat4()
382
384
        wt5_format = workingtree.WorkingTreeFormat5()
383
385
        expected_scenarios = [
384
 
            ('WorkingTreeFormat2',
 
386
            ('WorkingTreeFormat4',
385
387
             {'bzrdir_format': formats[0]._matchingbzrdir,
386
388
              'transport_readonly_server': 'b',
387
389
              'transport_server': 'a',
447
449
        # ones to add.
448
450
        from bzrlib.tests.per_tree import (
449
451
            return_parameter,
450
 
            revision_tree_from_workingtree
451
452
            )
452
453
        from bzrlib.tests.per_intertree import (
453
454
            make_scenarios,
454
455
            )
455
 
        from bzrlib.workingtree import WorkingTreeFormat2, WorkingTreeFormat3
 
456
        from bzrlib.workingtree import WorkingTreeFormat3, WorkingTreeFormat4
456
457
        input_test = TestInterTreeScenarios(
457
458
            "test_scenarios")
458
459
        server1 = "a"
459
460
        server2 = "b"
460
 
        format1 = WorkingTreeFormat2()
 
461
        format1 = WorkingTreeFormat4()
461
462
        format2 = WorkingTreeFormat3()
462
463
        formats = [("1", str, format1, format2, "converter1"),
463
464
            ("2", int, format2, format1, "converter2")]
554
555
    def test_make_branch_and_memory_tree_with_format(self):
555
556
        """make_branch_and_memory_tree should accept a format option."""
556
557
        format = bzrdir.BzrDirMetaFormat1()
557
 
        format.repository_format = weaverepo.RepositoryFormat7()
 
558
        format.repository_format = repository.format_registry.get_default()
558
559
        tree = self.make_branch_and_memory_tree('dir', format=format)
559
560
        # Guard against regression into MemoryTransport leaking
560
561
        # files to disk instead of keeping them in memory.
574
575
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
575
576
        # that the format objects are used.
576
577
        format = bzrdir.BzrDirMetaFormat1()
577
 
        repo_format = weaverepo.RepositoryFormat7()
 
578
        repo_format = repository.format_registry.get_default()
578
579
        format.repository_format = repo_format
579
580
        builder = self.make_branch_builder('dir', format=format)
580
581
        the_branch = builder.get_branch()
609
610
                l.attempt_lock()
610
611
        test = TestDanglingLock('test_function')
611
612
        result = test.run()
 
613
        total_failures = result.errors + result.failures
612
614
        if self._lock_check_thorough:
613
 
            self.assertEqual(1, len(result.errors))
 
615
            self.assertEqual(1, len(total_failures))
614
616
        else:
615
617
            # When _lock_check_thorough is disabled, then we don't trigger a
616
618
            # failure
617
 
            self.assertEqual(0, len(result.errors))
 
619
            self.assertEqual(0, len(total_failures))
618
620
 
619
621
 
620
622
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
623
    """Tests for the convenience functions TestCaseWithTransport introduces."""
622
624
 
623
625
    def test_get_readonly_url_none(self):
624
 
        from bzrlib.transport import get_transport
625
626
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
626
627
        self.vfs_transport_factory = memory.MemoryServer
627
628
        self.transport_readonly_server = None
629
630
        # for the server
630
631
        url = self.get_readonly_url()
631
632
        url2 = self.get_readonly_url('foo/bar')
632
 
        t = get_transport(url)
633
 
        t2 = get_transport(url2)
 
633
        t = transport.get_transport(url)
 
634
        t2 = transport.get_transport(url2)
634
635
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
636
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
636
637
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
637
638
 
638
639
    def test_get_readonly_url_http(self):
639
640
        from bzrlib.tests.http_server import HttpServer
640
 
        from bzrlib.transport import get_transport
641
641
        from bzrlib.transport.http import HttpTransportBase
642
642
        self.transport_server = test_server.LocalURLServer
643
643
        self.transport_readonly_server = HttpServer
645
645
        url = self.get_readonly_url()
646
646
        url2 = self.get_readonly_url('foo/bar')
647
647
        # the transport returned may be any HttpTransportBase subclass
648
 
        t = get_transport(url)
649
 
        t2 = get_transport(url2)
 
648
        t = transport.get_transport(url)
 
649
        t2 = transport.get_transport(url2)
650
650
        self.failUnless(isinstance(t, HttpTransportBase))
651
651
        self.failUnless(isinstance(t2, HttpTransportBase))
652
652
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
690
690
class TestChrootedTest(tests.ChrootedTestCase):
691
691
 
692
692
    def test_root_is_root(self):
693
 
        from bzrlib.transport import get_transport
694
 
        t = get_transport(self.get_readonly_url())
 
693
        t = transport.get_transport(self.get_readonly_url())
695
694
        url = t.base
696
695
        self.assertEqual(url, t.clone('..').base)
697
696
 
755
754
        # want to assume that that's *all* that would happen.
756
755
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
757
756
 
758
 
    def test_assigned_benchmark_file_stores_date(self):
759
 
        self._patch_get_bzr_source_tree()
760
 
        output = StringIO()
761
 
        result = bzrlib.tests.TextTestResult(self._log_file,
762
 
                                        descriptions=0,
763
 
                                        verbosity=1,
764
 
                                        bench_history=output
765
 
                                        )
766
 
        output_string = output.getvalue()
767
 
        # if you are wondering about the regexp please read the comment in
768
 
        # test_bench_history (bzrlib.tests.test_selftest.TestRunner)
769
 
        # XXX: what comment?  -- Andrew Bennetts
770
 
        self.assertContainsRe(output_string, "--date [0-9.]+")
771
 
 
772
 
    def test_benchhistory_records_test_times(self):
773
 
        self._patch_get_bzr_source_tree()
774
 
        result_stream = StringIO()
775
 
        result = bzrlib.tests.TextTestResult(
776
 
            self._log_file,
777
 
            descriptions=0,
778
 
            verbosity=1,
779
 
            bench_history=result_stream
780
 
            )
781
 
 
782
 
        # we want profile a call and check that its test duration is recorded
783
 
        # make a new test instance that when run will generate a benchmark
784
 
        example_test_case = TestTestResult("_time_hello_world_encoding")
785
 
        # execute the test, which should succeed and record times
786
 
        example_test_case.run(result)
787
 
        lines = result_stream.getvalue().splitlines()
788
 
        self.assertEqual(2, len(lines))
789
 
        self.assertContainsRe(lines[1],
790
 
            " *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
791
 
            "._time_hello_world_encoding")
792
 
 
793
757
    def _time_hello_world_encoding(self):
794
758
        """Profile two sleep calls
795
759
 
803
767
        self.requireFeature(test_lsprof.LSProfFeature)
804
768
        result_stream = StringIO()
805
769
        result = bzrlib.tests.VerboseTestResult(
806
 
            unittest._WritelnDecorator(result_stream),
 
770
            result_stream,
807
771
            descriptions=0,
808
772
            verbosity=2,
809
773
            )
835
799
        self.assertContainsRe(output,
836
800
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
837
801
 
 
802
    def test_uses_time_from_testtools(self):
 
803
        """Test case timings in verbose results should use testtools times"""
 
804
        import datetime
 
805
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
 
806
            def startTest(self, test):
 
807
                self.time(datetime.datetime.utcfromtimestamp(1.145))
 
808
                super(TimeAddedVerboseTestResult, self).startTest(test)
 
809
            def addSuccess(self, test):
 
810
                self.time(datetime.datetime.utcfromtimestamp(51.147))
 
811
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
 
812
            def report_tests_starting(self): pass
 
813
        sio = StringIO()
 
814
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
 
815
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
 
816
 
838
817
    def test_known_failure(self):
839
818
        """A KnownFailure being raised should trigger several result actions."""
840
819
        class InstrumentedTestResult(tests.ExtendedTestResult):
841
820
            def stopTestRun(self): pass
842
 
            def startTests(self): pass
843
 
            def report_test_start(self, test): pass
 
821
            def report_tests_starting(self): pass
844
822
            def report_known_failure(self, test, err=None, details=None):
845
823
                self._call = test, 'known failure'
846
824
        result = InstrumentedTestResult(None, None, None, None)
864
842
        # verbose test output formatting
865
843
        result_stream = StringIO()
866
844
        result = bzrlib.tests.VerboseTestResult(
867
 
            unittest._WritelnDecorator(result_stream),
 
845
            result_stream,
868
846
            descriptions=0,
869
847
            verbosity=2,
870
848
            )
880
858
        output = result_stream.getvalue()[prefix:]
881
859
        lines = output.splitlines()
882
860
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
 
861
        if sys.version_info > (2, 7):
 
862
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
 
863
                self.assertNotEqual, lines[1], '    ')
883
864
        self.assertEqual(lines[1], '    foo')
884
865
        self.assertEqual(2, len(lines))
885
866
 
893
874
        """Test the behaviour of invoking addNotSupported."""
894
875
        class InstrumentedTestResult(tests.ExtendedTestResult):
895
876
            def stopTestRun(self): pass
896
 
            def startTests(self): pass
897
 
            def report_test_start(self, test): pass
 
877
            def report_tests_starting(self): pass
898
878
            def report_unsupported(self, test, feature):
899
879
                self._call = test, feature
900
880
        result = InstrumentedTestResult(None, None, None, None)
919
899
        # verbose test output formatting
920
900
        result_stream = StringIO()
921
901
        result = bzrlib.tests.VerboseTestResult(
922
 
            unittest._WritelnDecorator(result_stream),
 
902
            result_stream,
923
903
            descriptions=0,
924
904
            verbosity=2,
925
905
            )
939
919
        """An UnavailableFeature being raised should invoke addNotSupported."""
940
920
        class InstrumentedTestResult(tests.ExtendedTestResult):
941
921
            def stopTestRun(self): pass
942
 
            def startTests(self): pass
943
 
            def report_test_start(self, test): pass
 
922
            def report_tests_starting(self): pass
944
923
            def addNotSupported(self, test, feature):
945
924
                self._call = test, feature
946
925
        result = InstrumentedTestResult(None, None, None, None)
988
967
        class InstrumentedTestResult(tests.ExtendedTestResult):
989
968
            calls = 0
990
969
            def startTests(self): self.calls += 1
991
 
            def report_test_start(self, test): pass
992
970
        result = InstrumentedTestResult(None, None, None, None)
993
971
        def test_function():
994
972
            pass
996
974
        test.run(result)
997
975
        self.assertEquals(1, result.calls)
998
976
 
 
977
    def test_startTests_only_once(self):
 
978
        """With multiple tests startTests should still only be called once"""
 
979
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
980
            calls = 0
 
981
            def startTests(self): self.calls += 1
 
982
        result = InstrumentedTestResult(None, None, None, None)
 
983
        suite = unittest.TestSuite([
 
984
            unittest.FunctionTestCase(lambda: None),
 
985
            unittest.FunctionTestCase(lambda: None)])
 
986
        suite.run(result)
 
987
        self.assertEquals(1, result.calls)
 
988
        self.assertEquals(2, result.count)
 
989
 
999
990
 
1000
991
class TestUnicodeFilenameFeature(tests.TestCase):
1001
992
 
1022
1013
        because of our use of global state.
1023
1014
        """
1024
1015
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1025
 
        old_leak = tests.TestCase._first_thread_leaker_id
1026
1016
        try:
1027
1017
            tests.TestCaseInTempDir.TEST_ROOT = None
1028
 
            tests.TestCase._first_thread_leaker_id = None
1029
1018
            return testrunner.run(test)
1030
1019
        finally:
1031
1020
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1032
 
            tests.TestCase._first_thread_leaker_id = old_leak
1033
1021
 
1034
1022
    def test_known_failure_failed_run(self):
1035
1023
        # run a test that generates a known failure which should be printed in
1081
1069
    def test_result_decorator(self):
1082
1070
        # decorate results
1083
1071
        calls = []
1084
 
        class LoggingDecorator(tests.ForwardingResult):
 
1072
        class LoggingDecorator(ExtendedToOriginalDecorator):
1085
1073
            def startTest(self, test):
1086
 
                tests.ForwardingResult.startTest(self, test)
 
1074
                ExtendedToOriginalDecorator.startTest(self, test)
1087
1075
                calls.append('start')
1088
1076
        test = unittest.FunctionTestCase(lambda:None)
1089
1077
        stream = StringIO()
1199
1187
            return None
1200
1188
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree',  new_get)
1201
1189
 
1202
 
    def test_bench_history(self):
1203
 
        # tests that the running the benchmark passes bench_history into
1204
 
        # the test result object. We can tell that happens if
1205
 
        # _get_bzr_source_tree is called.
1206
 
        self._patch_get_bzr_source_tree()
1207
 
        test = TestRunner('dummy_test')
1208
 
        output = StringIO()
1209
 
        runner = tests.TextTestRunner(stream=self._log_file,
1210
 
                                      bench_history=output)
1211
 
        result = self.run_test_runner(runner, test)
1212
 
        output_string = output.getvalue()
1213
 
        self.assertContainsRe(output_string, "--date [0-9.]+")
1214
 
        self.assertLength(1, self._get_source_tree_calls)
 
1190
    def test_verbose_test_count(self):
 
1191
        """A verbose test run reports the right test count at the start"""
 
1192
        suite = TestUtil.TestSuite([
 
1193
            unittest.FunctionTestCase(lambda:None),
 
1194
            unittest.FunctionTestCase(lambda:None)])
 
1195
        self.assertEqual(suite.countTestCases(), 2)
 
1196
        stream = StringIO()
 
1197
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1198
        # Need to use the CountingDecorator as that's what sets num_tests
 
1199
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1200
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
1215
1201
 
1216
1202
    def test_startTestRun(self):
1217
1203
        """run should call result.startTestRun()"""
1218
1204
        calls = []
1219
 
        class LoggingDecorator(tests.ForwardingResult):
 
1205
        class LoggingDecorator(ExtendedToOriginalDecorator):
1220
1206
            def startTestRun(self):
1221
 
                tests.ForwardingResult.startTestRun(self)
 
1207
                ExtendedToOriginalDecorator.startTestRun(self)
1222
1208
                calls.append('startTestRun')
1223
1209
        test = unittest.FunctionTestCase(lambda:None)
1224
1210
        stream = StringIO()
1230
1216
    def test_stopTestRun(self):
1231
1217
        """run should call result.stopTestRun()"""
1232
1218
        calls = []
1233
 
        class LoggingDecorator(tests.ForwardingResult):
 
1219
        class LoggingDecorator(ExtendedToOriginalDecorator):
1234
1220
            def stopTestRun(self):
1235
 
                tests.ForwardingResult.stopTestRun(self)
 
1221
                ExtendedToOriginalDecorator.stopTestRun(self)
1236
1222
                calls.append('stopTestRun')
1237
1223
        test = unittest.FunctionTestCase(lambda:None)
1238
1224
        stream = StringIO()
1241
1227
        result = self.run_test_runner(runner, test)
1242
1228
        self.assertLength(1, calls)
1243
1229
 
 
1230
    def test_unicode_test_output_on_ascii_stream(self):
 
1231
        """Showing results should always succeed even on an ascii console"""
 
1232
        class FailureWithUnicode(tests.TestCase):
 
1233
            def test_log_unicode(self):
 
1234
                self.log(u"\u2606")
 
1235
                self.fail("Now print that log!")
 
1236
        out = StringIO()
 
1237
        self.overrideAttr(osutils, "get_terminal_encoding",
 
1238
            lambda trace=False: "ascii")
 
1239
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
 
1240
            FailureWithUnicode("test_log_unicode"))
 
1241
        self.assertContainsRe(out.getvalue(),
 
1242
            "Text attachment: log\n"
 
1243
            "-+\n"
 
1244
            "\d+\.\d+  \\\\u2606\n"
 
1245
            "-+\n")
 
1246
 
1244
1247
 
1245
1248
class SampleTestCase(tests.TestCase):
1246
1249
 
1421
1424
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1422
1425
        output_stream = StringIO()
1423
1426
        result = bzrlib.tests.VerboseTestResult(
1424
 
            unittest._WritelnDecorator(output_stream),
 
1427
            output_stream,
1425
1428
            descriptions=0,
1426
1429
            verbosity=2)
1427
1430
        sample_test.run(result)
1434
1437
        # Note this test won't fail with hooks that the core library doesn't
1435
1438
        # use - but it triggers with a plugin that adds hooks, so it's still a
1436
1439
        # useful warning in that case.
1437
 
        self.assertEqual(bzrlib.branch.BranchHooks(),
1438
 
            bzrlib.branch.Branch.hooks)
1439
 
        self.assertEqual(bzrlib.smart.server.SmartServerHooks(),
 
1440
        self.assertEqual(bzrlib.branch.BranchHooks(), bzrlib.branch.Branch.hooks)
 
1441
        self.assertEqual(
 
1442
            bzrlib.smart.server.SmartServerHooks(),
1440
1443
            bzrlib.smart.server.SmartTCPServer.hooks)
1441
 
        self.assertEqual(bzrlib.commands.CommandHooks(),
1442
 
            bzrlib.commands.Command.hooks)
 
1444
        self.assertEqual(
 
1445
            bzrlib.commands.CommandHooks(), bzrlib.commands.Command.hooks)
1443
1446
 
1444
1447
    def test__gather_lsprof_in_benchmarks(self):
1445
1448
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
1655
1658
        self.assertEqual('original', obj.test_attr)
1656
1659
 
1657
1660
 
 
1661
class _MissingFeature(tests.Feature):
 
1662
    def _probe(self):
 
1663
        return False
 
1664
missing_feature = _MissingFeature()
 
1665
 
 
1666
 
 
1667
def _get_test(name):
 
1668
    """Get an instance of a specific example test.
 
1669
 
 
1670
    We protect this in a function so that they don't auto-run in the test
 
1671
    suite.
 
1672
    """
 
1673
 
 
1674
    class ExampleTests(tests.TestCase):
 
1675
 
 
1676
        def test_fail(self):
 
1677
            mutter('this was a failing test')
 
1678
            self.fail('this test will fail')
 
1679
 
 
1680
        def test_error(self):
 
1681
            mutter('this test errored')
 
1682
            raise RuntimeError('gotcha')
 
1683
 
 
1684
        def test_missing_feature(self):
 
1685
            mutter('missing the feature')
 
1686
            self.requireFeature(missing_feature)
 
1687
 
 
1688
        def test_skip(self):
 
1689
            mutter('this test will be skipped')
 
1690
            raise tests.TestSkipped('reason')
 
1691
 
 
1692
        def test_success(self):
 
1693
            mutter('this test succeeds')
 
1694
 
 
1695
        def test_xfail(self):
 
1696
            mutter('test with expected failure')
 
1697
            self.knownFailure('this_fails')
 
1698
 
 
1699
        def test_unexpected_success(self):
 
1700
            mutter('test with unexpected success')
 
1701
            self.expectFailure('should_fail', lambda: None)
 
1702
 
 
1703
    return ExampleTests(name)
 
1704
 
 
1705
 
 
1706
class TestTestCaseLogDetails(tests.TestCase):
 
1707
 
 
1708
    def _run_test(self, test_name):
 
1709
        test = _get_test(test_name)
 
1710
        result = testtools.TestResult()
 
1711
        test.run(result)
 
1712
        return result
 
1713
 
 
1714
    def test_fail_has_log(self):
 
1715
        result = self._run_test('test_fail')
 
1716
        self.assertEqual(1, len(result.failures))
 
1717
        result_content = result.failures[0][1]
 
1718
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1719
        self.assertContainsRe(result_content, 'this was a failing test')
 
1720
 
 
1721
    def test_error_has_log(self):
 
1722
        result = self._run_test('test_error')
 
1723
        self.assertEqual(1, len(result.errors))
 
1724
        result_content = result.errors[0][1]
 
1725
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1726
        self.assertContainsRe(result_content, 'this test errored')
 
1727
 
 
1728
    def test_skip_has_no_log(self):
 
1729
        result = self._run_test('test_skip')
 
1730
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
1731
        skips = result.skip_reasons['reason']
 
1732
        self.assertEqual(1, len(skips))
 
1733
        test = skips[0]
 
1734
        self.assertFalse('log' in test.getDetails())
 
1735
 
 
1736
    def test_missing_feature_has_no_log(self):
 
1737
        # testtools doesn't know about addNotSupported, so it just gets
 
1738
        # considered as a skip
 
1739
        result = self._run_test('test_missing_feature')
 
1740
        self.assertEqual([missing_feature], result.skip_reasons.keys())
 
1741
        skips = result.skip_reasons[missing_feature]
 
1742
        self.assertEqual(1, len(skips))
 
1743
        test = skips[0]
 
1744
        self.assertFalse('log' in test.getDetails())
 
1745
 
 
1746
    def test_xfail_has_no_log(self):
 
1747
        result = self._run_test('test_xfail')
 
1748
        self.assertEqual(1, len(result.expectedFailures))
 
1749
        result_content = result.expectedFailures[0][1]
 
1750
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1751
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
1752
 
 
1753
    def test_unexpected_success_has_log(self):
 
1754
        result = self._run_test('test_unexpected_success')
 
1755
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
1756
        # Inconsistency, unexpectedSuccesses is a list of tests,
 
1757
        # expectedFailures is a list of reasons?
 
1758
        test = result.unexpectedSuccesses[0]
 
1759
        details = test.getDetails()
 
1760
        self.assertTrue('log' in details)
 
1761
 
 
1762
 
 
1763
class TestTestCloning(tests.TestCase):
 
1764
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
 
1765
 
 
1766
    def test_cloned_testcase_does_not_share_details(self):
 
1767
        """A TestCase cloned with clone_test does not share mutable attributes
 
1768
        such as details or cleanups.
 
1769
        """
 
1770
        class Test(tests.TestCase):
 
1771
            def test_foo(self):
 
1772
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
 
1773
        orig_test = Test('test_foo')
 
1774
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
 
1775
        orig_test.run(unittest.TestResult())
 
1776
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
 
1777
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
 
1778
 
 
1779
    def test_double_apply_scenario_preserves_first_scenario(self):
 
1780
        """Applying two levels of scenarios to a test preserves the attributes
 
1781
        added by both scenarios.
 
1782
        """
 
1783
        class Test(tests.TestCase):
 
1784
            def test_foo(self):
 
1785
                pass
 
1786
        test = Test('test_foo')
 
1787
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
 
1788
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
 
1789
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
 
1790
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
 
1791
        all_tests = list(tests.iter_suite_tests(suite))
 
1792
        self.assertLength(4, all_tests)
 
1793
        all_xys = sorted((t.x, t.y) for t in all_tests)
 
1794
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1795
 
 
1796
 
1658
1797
# NB: Don't delete this; it's not actually from 0.11!
1659
1798
@deprecated_function(deprecated_in((0, 11, 0)))
1660
1799
def sample_deprecated_function():
1787
1926
    def test_make_branch_and_tree_with_format(self):
1788
1927
        # we should be able to supply a format to make_branch_and_tree
1789
1928
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
1790
 
        self.make_branch_and_tree('b', format=bzrlib.bzrdir.BzrDirFormat6())
1791
1929
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
1792
1930
                              bzrlib.bzrdir.BzrDirMetaFormat1)
1793
 
        self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('b')._format,
1794
 
                              bzrlib.bzrdir.BzrDirFormat6)
1795
1931
 
1796
1932
    def test_make_branch_and_memory_tree(self):
1797
1933
        # we should be able to get a new branch and a mutable tree from
1814
1950
                tree.branch.repository.bzrdir.root_transport)
1815
1951
 
1816
1952
 
1817
 
class SelfTestHelper:
 
1953
class SelfTestHelper(object):
1818
1954
 
1819
1955
    def run_selftest(self, **kwargs):
1820
1956
        """Run selftest returning its output."""
1880
2016
            def __call__(test, result):
1881
2017
                test.run(result)
1882
2018
            def run(test, result):
1883
 
                self.assertIsInstance(result, tests.ForwardingResult)
 
2019
                self.assertIsInstance(result, ExtendedToOriginalDecorator)
1884
2020
                calls.append("called")
1885
2021
            def countTestCases(self):
1886
2022
                return 1
1971
2107
            load_list='missing file name', list_only=True)
1972
2108
 
1973
2109
 
 
2110
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
    """Check when the 'log' detail shows up in subunit output.

    Each test runs a single test through selftest with the subunit runner and
    then inspects both the raw stream and the result rebuilt from it.
    """

    _test_needs_features = [features.subunit]

    def run_subunit_stream(self, test_name):
        """Run the test named test_name with the subunit runner.

        :param test_name: Name passed to _get_test to obtain the test to run.
        :return: A (content, result) tuple where content is the raw output of
            the run and result is a testtools.TestResult fed by replaying
            that output through subunit's ProtocolTestCase.
        """
        from subunit import ProtocolTestCase

        def make_suite():
            return TestUtil.TestSuite([_get_test(test_name)])

        output = self.run_selftest(
            runner_class=tests.SubUnitBzrRunner,
            test_suite_factory=make_suite)
        replayer = ProtocolTestCase(output)
        replayed = testtools.TestResult()
        replayer.run(replayed)
        return output.getvalue(), replayed

    def test_fail_has_log(self):
        """A failing test has its log in the stream."""
        text, outcome = self.run_subunit_stream('test_fail')
        self.assertEqual(1, len(outcome.failures))
        self.assertContainsRe(text, '(?m)^log$')
        self.assertContainsRe(text, 'this test will fail')

    def test_error_has_log(self):
        """An erroring test has its log in the stream."""
        text, outcome = self.run_subunit_stream('test_error')
        self.assertContainsRe(text, '(?m)^log$')
        self.assertContainsRe(text, 'this test errored')

    def test_skip_has_no_log(self):
        """A skipped test has no log in the stream."""
        text, outcome = self.run_subunit_stream('test_skip')
        self.assertNotContainsRe(text, '(?m)^log$')
        self.assertNotContainsRe(text, 'this test will be skipped')
        reasons = outcome.skip_reasons
        self.assertEqual(['reason'], reasons.keys())
        skipped = reasons['reason']
        self.assertEqual(1, len(skipped))
        test = skipped[0]
        # RemotedTestCase doesn't preserve the "details"
        ## self.assertFalse('log' in test.getDetails())

    def test_missing_feature_has_no_log(self):
        """A test missing a feature has no log in the stream."""
        text, outcome = self.run_subunit_stream('test_missing_feature')
        self.assertNotContainsRe(text, '(?m)^log$')
        self.assertNotContainsRe(text, 'missing the feature')
        reasons = outcome.skip_reasons
        self.assertEqual(['_MissingFeature\n'], reasons.keys())
        skipped = reasons['_MissingFeature\n']
        self.assertEqual(1, len(skipped))
        test = skipped[0]
        # RemotedTestCase doesn't preserve the "details"
        ## self.assertFalse('log' in test.getDetails())

    def test_xfail_has_no_log(self):
        """An expected failure has no log in the stream nor in the result."""
        text, outcome = self.run_subunit_stream('test_xfail')
        self.assertNotContainsRe(text, '(?m)^log$')
        self.assertNotContainsRe(text, 'test with expected failure')
        xfails = outcome.expectedFailures
        self.assertEqual(1, len(xfails))
        # Each entry is indexable; item 1 is the text checked below.
        xfail_text = xfails[0][1]
        self.assertNotContainsRe(xfail_text, 'Text attachment: log')
        self.assertNotContainsRe(xfail_text, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        """An unexpected success has its log in the stream."""
        text, outcome = self.run_subunit_stream('test_unexpected_success')
        self.assertContainsRe(text, '(?m)^log$')
        self.assertContainsRe(text, 'test with unexpected success')
        reason = ('subunit treats "unexpectedSuccess"'
                  ' as a plain success')
        self.expectFailure(
            reason, self.assertEqual, 1, len(outcome.unexpectedSuccesses))
        self.assertEqual(1, len(outcome.unexpectedSuccesses))
        test = outcome.unexpectedSuccesses[0]
        # RemotedTestCase doesn't preserve the "details"
        ## self.assertTrue('log' in test.getDetails())

    def test_success_has_no_log(self):
        """A passing test has no log in the stream."""
        text, outcome = self.run_subunit_stream('test_success')
        self.assertEqual(1, outcome.testsRun)
        self.assertNotContainsRe(text, '(?m)^log$')
        self.assertNotContainsRe(text, 'this test succeeds')
 
2185
 
 
2186
 
1974
2187
class TestRunBzr(tests.TestCase):
1975
2188
 
1976
2189
    out = ''
2339
2552
            os.chdir = orig_chdir
2340
2553
        self.assertEqual(['foo', 'current'], chdirs)
2341
2554
 
 
2555
    def test_get_bzr_path_with_cwd_bzrlib(self):
 
2556
        self.get_source_path = lambda: ""
 
2557
        self.overrideAttr(os.path, "isfile", lambda path: True)
 
2558
        self.assertEqual(self.get_bzr_path(), "bzr")
 
2559
 
2342
2560
 
2343
2561
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2344
2562
    """Tests that really need to do things with an external bzr."""
2934
3152
        tpr.register('bar', 'bBB.aAA.rRR')
2935
3153
        self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
2936
3154
        self.assertThat(self.get_log(),
2937
 
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", ELLIPSIS))
 
3155
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
 
3156
                           doctest.ELLIPSIS))
2938
3157
 
2939
3158
    def test_get_unknown_prefix(self):
2940
3159
        tpr = self._get_registry()
2960
3179
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2961
3180
 
2962
3181
 
 
3182
class TestThreadLeakDetection(tests.TestCase):
    """Ensure when tests leak threads we detect and report it"""

    class LeakRecordingResult(tests.ExtendedTestResult):
        """Result that remembers every thread leak reported to it."""

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            # (test, leaked threads) pairs, in the order they were reported.
            self.leaks = []

        def _report_thread_leak(self, test, leaks, alive):
            self.leaks.append((test, leaks))

    def test_testcase_without_addCleanups(self):
        """Check old TestCase instances don't break with leak detection"""
        class Test(unittest.TestCase):
            def runTest(self):
                pass
        recorder = self.LeakRecordingResult()
        plain_case = Test()
        recorder.startTestRun()
        plain_case.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 0)
        self.assertEqual(recorder.leaks, [])

    def test_thread_leak(self):
        """Ensure a thread that outlives the running of a test is reported

        The inner test case starts a thread that blocks on an event. Because
        the thread is still alive once the inner case has run it should be
        detected as a leak. The event is set during cleanup so the thread can
        then be joined and is not leaked for real.
        """
        release = threading.Event()
        leaker = threading.Thread(name="Leaker", target=release.wait)

        class Test(tests.TestCase):
            def test_leak(self):
                leaker.start()
        recorder = self.LeakRecordingResult()
        inner = Test("test_leak")
        # Cleanups run last-in first-out: the event is set before the join.
        self.addCleanup(leaker.join)
        self.addCleanup(release.set)
        recorder.startTestRun()
        inner.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 1)
        self.assertEqual(recorder._first_thread_leaker_id, inner.id())
        expected_leaks = [(inner, set([leaker]))]
        self.assertEqual(recorder.leaks, expected_leaks)
        self.assertContainsString(
            recorder.stream.getvalue(), "leaking threads")

    def test_multiple_leaks(self):
        """Check multiple leaks are blamed on the test cases at fault

        Same idea as test_thread_leak, but with three inner test methods: one
        leaking a single thread, one leaking nothing and one leaking two
        threads.
        """
        release = threading.Event()
        thread_a = threading.Thread(name="LeakerA", target=release.wait)
        thread_b = threading.Thread(name="LeakerB", target=release.wait)
        thread_c = threading.Thread(name="LeakerC", target=release.wait)

        class Test(tests.TestCase):
            def test_first_leak(self):
                thread_b.start()

            def test_second_no_leak(self):
                pass

            def test_third_leak(self):
                thread_c.start()
                thread_a.start()
        recorder = self.LeakRecordingResult()
        first_test = Test("test_first_leak")
        third_test = Test("test_third_leak")
        # Cleanups run last-in first-out: the event is set before any join.
        for leaker in (thread_a, thread_b, thread_c):
            self.addCleanup(leaker.join)
        self.addCleanup(release.set)
        recorder.startTestRun()
        suite = unittest.TestSuite(
            [first_test, Test("test_second_no_leak"), third_test])
        suite.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 2)
        self.assertEqual(recorder._first_thread_leaker_id, first_test.id())
        expected_leaks = [
            (first_test, set([thread_b])),
            (third_test, set([thread_a, thread_c])),
            ]
        self.assertEqual(recorder.leaks, expected_leaks)
        self.assertContainsString(
            recorder.stream.getvalue(), "leaking threads")
 
3266
 
 
3267
 
 
3268
class TestPostMortemDebugging(tests.TestCase):
    """Check post mortem debugging works when tests fail or error"""

    class TracebackRecordingResult(tests.ExtendedTestResult):
        """Result recording where a post mortem would have started."""

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            # Code object of the innermost traceback frame, once recorded.
            self.postcode = None

        def _post_mortem(self, tb=None):
            """Record the code object at the end of the current traceback"""
            if not tb:
                tb = sys.exc_info()[2]
            if tb is None:
                return
            # Walk down to the last entry of the traceback chain.
            while tb.tb_next is not None:
                tb = tb.tb_next
            self.postcode = tb.tb_frame.f_code

        def report_error(self, test, err):
            pass

        def report_failure(self, test, err):
            pass

    def _postcode_after_running(self, test):
        """Run test against a recording result and return its postcode."""
        recorder = self.TracebackRecordingResult()
        test.run(recorder)
        return recorder.postcode

    def test_location_unittest_error(self):
        """Needs right post mortem traceback with erroring unittest case"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise RuntimeError
        postcode = self._postcode_after_running(Test())
        self.assertEqual(postcode, Test.runTest.func_code)

    def test_location_unittest_failure(self):
        """Needs right post mortem traceback with failing unittest case"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise self.failureException
        postcode = self._postcode_after_running(Test())
        self.assertEqual(postcode, Test.runTest.func_code)

    def test_location_bt_error(self):
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
        class Test(tests.TestCase):
            def test_error(self):
                raise RuntimeError
        postcode = self._postcode_after_running(Test("test_error"))
        self.assertEqual(postcode, Test.test_error.func_code)

    def test_location_bt_failure(self):
        """Needs right post mortem traceback with failing bzrlib.tests case"""
        class Test(tests.TestCase):
            def test_failure(self):
                raise self.failureException
        postcode = self._postcode_after_running(Test("test_failure"))
        self.assertEqual(postcode, Test.test_failure.func_code)

    def test_env_var_triggers_post_mortem(self):
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
        import pdb
        plain_result = tests.ExtendedTestResult(StringIO(), 0, 1)
        seen = []
        # Replace the real debugger entry point with a recorder.
        self.overrideAttr(pdb, "post_mortem", seen.append)
        self.overrideEnv('BZR_TEST_PDB', None)
        plain_result._post_mortem(1)
        self.overrideEnv('BZR_TEST_PDB', 'on')
        plain_result._post_mortem(2)
        # Only the call made while the variable was set reached pdb.
        self.assertEqual([2], seen)
 
3336
 
 
3337
 
2963
3338
class TestRunSuite(tests.TestCase):
2964
3339
 
2965
3340
    def test_runner_class(self):
2976
3351
                                                self.verbosity)
2977
3352
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2978
3353
        self.assertLength(1, calls)
 
3354
 
 
3355
 
 
3356
class TestEnvironHandling(tests.TestCase):
    """Tests for overrideEnv and the restoring of os.environ."""

    def test_overrideEnv_None_called_twice_doesnt_leak(self):
        """Deleting a variable twice in an inner test restores the outer
        value once that inner test has run.
        """
        self.failIf('MYVAR' in os.environ)
        self.overrideEnv('MYVAR', '42')

        # We use an embedded test to make sure we fix the _captureVar bug
        class Test(tests.TestCase):
            def test_me(self):
                # The first call saves the 42 value
                self.overrideEnv('MYVAR', None)
                self.assertEquals(None, os.environ.get('MYVAR'))
                # Make sure we can call it twice
                self.overrideEnv('MYVAR', None)
                self.assertEquals(None, os.environ.get('MYVAR'))
        report = StringIO()
        inner_result = tests.TextTestResult(report, 0, 1)
        Test('test_me').run(inner_result)
        if not inner_result.wasStrictlySuccessful():
            # Surface whatever the inner run reported.
            self.fail(report.getvalue())
        # We get our value back
        self.assertEquals('42', os.environ.get('MYVAR'))
 
3377
 
 
3378
 
 
3379
class TestIsolatedEnv(tests.TestCase):
    """Test isolating tests from os.environ.

    These tests themselves run isolated from os.environ, so some care is
    needed when designing them to avoid bootstrap side-effects. They start
    from an already clean os.environ, which allows making valid assertions
    about which variables are present or not and building the tests around
    those assertions.
    """

    class ScratchMonkey(tests.TestCase):
        """Throwaway test case handed to the environ helpers."""

        def test_me(self):
            pass

    def test_basics(self):
        """BZR_HOME and LINES have the expected isolated definitions."""
        isolated = tests.isolated_environ
        # Make sure we know the definition of BZR_HOME: not part of os.environ
        # for tests.TestCase.
        self.assertTrue('BZR_HOME' in isolated)
        self.assertEquals(None, isolated['BZR_HOME'])
        # Being part of isolated_environ, BZR_HOME should not appear here
        self.assertFalse('BZR_HOME' in os.environ)
        # Make sure we know the definition of LINES: part of os.environ for
        # tests.TestCase
        self.assertTrue('LINES' in isolated)
        self.assertEquals('25', isolated['LINES'])
        self.assertEquals('25', os.environ['LINES'])

    def test_injecting_unknown_variable(self):
        """A variable absent from os.environ is set, then removed again."""
        # BZR_HOME is known to be absent from os.environ
        monkey = self.ScratchMonkey('test_me')
        tests.override_os_environ(monkey, {'BZR_HOME': 'foo'})
        self.assertEquals('foo', os.environ['BZR_HOME'])
        tests.restore_os_environ(monkey)
        self.assertFalse('BZR_HOME' in os.environ)

    def test_injecting_known_variable(self):
        """A variable present in os.environ is changed, then put back."""
        monkey = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(monkey, {'LINES': '42'})
        self.assertEquals('42', os.environ['LINES'])
        tests.restore_os_environ(monkey)
        self.assertEquals('25', os.environ['LINES'])

    def test_deleting_variable(self):
        """A variable overridden with None is removed, then put back."""
        monkey = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(monkey, {'LINES': None})
        self.assertTrue('LINES' not in os.environ)
        tests.restore_os_environ(monkey)
        self.assertEquals('25', os.environ['LINES'])
 
3430
 
 
3431
 
 
3432
class TestDocTestSuiteIsolation(tests.TestCase):
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.

    Since tests.TestCase already provides an isolation from os.environ, we
    use the clean environment as a base for testing. To precisely capture the
    isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
    compare against.

    We want to make sure `tests.DocTestSuite` respects
    `tests.isolated_environ`, not `os.environ`, so each test overrides it to
    suit its needs.
    """

    def get_doctest_suite_for_string(self, klass, string):
        """Build a suite with klass whose only doctest is parsed from string.

        :param klass: Suite factory called with a test_finder keyword.
        :param string: Doctest source handed to doctest.DocTestParser.
        :return: Whatever klass returns.
        """
        class Finder(doctest.DocTestFinder):

            def find(*args, **kwargs):
                # Ignore what we are asked to search: always give back the
                # single doctest built from the string.
                return [doctest.DocTestParser().get_doctest(
                    string, {}, 'foo', 'foo.py', 0)]

        return klass(test_finder=Finder())

    def run_doctest_suite_for_string(self, klass, string):
        """Run the suite built from string.

        :return: A (result, output) tuple: the tests.TextTestResult used for
            the run and the StringIO it was created with.
        """
        output = StringIO()
        result = tests.TextTestResult(output, 0, 1)
        self.get_doctest_suite_for_string(klass, string).run(result)
        return result, output

    def assertDocTestStringSucceds(self, klass, string):
        """Fail, showing the run output, unless the doctest run succeeds."""
        result, output = self.run_doctest_suite_for_string(klass, string)
        if result.wasStrictlySuccessful():
            return
        self.fail(output.getvalue())

    def assertDocTestStringFails(self, klass, string):
        """Fail, showing the run output, if the doctest run succeeds."""
        result, output = self.run_doctest_suite_for_string(klass, string)
        if not result.wasStrictlySuccessful():
            return
        self.fail(output.getvalue())

    def test_injected_variable(self):
        """A value injected via isolated_environ is seen by the doctest."""
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
        doctest_source = """
            >>> import os
            >>> os.environ['LINES']
            '42'
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, doctest_source)
        # tests.DocTestSuite sees '42'
        self.assertDocTestStringSucceds(
            tests.IsolatedDocTestSuite, doctest_source)

    def test_deleted_variable(self):
        """A variable deleted via isolated_environ is absent in the doctest."""
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
        doctest_source = """
            >>> import os
            >>> os.environ.get('LINES')
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, doctest_source)
        # tests.DocTestSuite sees None
        self.assertDocTestStringSucceds(
            tests.IsolatedDocTestSuite, doctest_source)