/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Martin
  • Date: 2009-11-07 08:02:13 UTC
  • mfrom: (4789 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4809.
  • Revision ID: gzlist@googlemail.com-20091107080213-jad185091b3l69ih
Merge bzr.dev 4789 to resolve conflict from the disabling of plink auto-detection, and relocate NEWS

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
from cStringIO import StringIO
20
20
import os
 
21
import signal
21
22
import sys
22
23
import time
23
24
import unittest
37
38
    repository,
38
39
    symbol_versioning,
39
40
    tests,
 
41
    transport,
40
42
    workingtree,
41
43
    )
42
44
from bzrlib.repofmt import (
 
45
    groupcompress_repo,
43
46
    pack_repo,
44
47
    weaverepo,
45
48
    )
49
52
    deprecated_method,
50
53
    )
51
54
from bzrlib.tests import (
 
55
    SubUnitFeature,
52
56
    test_lsprof,
53
57
    test_sftp_transport,
54
58
    TestUtil,
124
128
        self.assertEqual(sample_permutation,
125
129
                         get_transport_test_permutations(MockModule()))
126
130
 
127
 
    def test_scenarios_invlude_all_modules(self):
 
131
    def test_scenarios_include_all_modules(self):
128
132
        # this checks that the scenario generator returns as many permutations
129
133
        # as there are in all the registered transport modules - we assume if
130
134
        # this matches it's probably doing the right thing especially in
215
219
        from bzrlib.tests.per_repository import formats_to_scenarios
216
220
        formats = [("(c)", remote.RemoteRepositoryFormat()),
217
221
                   ("(d)", repository.format_registry.get(
218
 
                        'Bazaar pack repository format 1 (needs bzr 0.92)\n'))]
 
222
                    'Bazaar repository format 2a (needs bzr 1.16 or later)\n'))]
219
223
        no_vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
220
224
            None)
221
225
        vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
222
226
            vfs_transport_factory="vfs")
223
227
        # no_vfs generate scenarios without vfs_transport_factory
224
 
        self.assertEqual([
 
228
        expected = [
225
229
            ('RemoteRepositoryFormat(c)',
226
230
             {'bzrdir_format': remote.RemoteBzrDirFormat(),
227
231
              'repository_format': remote.RemoteRepositoryFormat(),
228
232
              'transport_readonly_server': 'readonly',
229
233
              'transport_server': 'server'}),
230
 
            ('RepositoryFormatKnitPack1(d)',
 
234
            ('RepositoryFormat2a(d)',
231
235
             {'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
232
 
              'repository_format': pack_repo.RepositoryFormatKnitPack1(),
 
236
              'repository_format': groupcompress_repo.RepositoryFormat2a(),
233
237
              'transport_readonly_server': 'readonly',
234
 
              'transport_server': 'server'})],
235
 
            no_vfs_scenarios)
 
238
              'transport_server': 'server'})]
 
239
        self.assertEqual(expected, no_vfs_scenarios)
236
240
        self.assertEqual([
237
241
            ('RemoteRepositoryFormat(c)',
238
242
             {'bzrdir_format': remote.RemoteBzrDirFormat(),
240
244
              'transport_readonly_server': 'readonly',
241
245
              'transport_server': 'server',
242
246
              'vfs_transport_factory': 'vfs'}),
243
 
            ('RepositoryFormatKnitPack1(d)',
 
247
            ('RepositoryFormat2a(d)',
244
248
             {'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
245
 
              'repository_format': pack_repo.RepositoryFormatKnitPack1(),
 
249
              'repository_format': groupcompress_repo.RepositoryFormat2a(),
246
250
              'transport_readonly_server': 'readonly',
247
251
              'transport_server': 'server',
248
252
              'vfs_transport_factory': 'vfs'})],
293
297
        from bzrlib.tests.per_interrepository import make_scenarios
294
298
        server1 = "a"
295
299
        server2 = "b"
296
 
        formats = [(str, "C1", "C2"), (int, "D1", "D2")]
 
300
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
297
301
        scenarios = make_scenarios(server1, server2, formats)
298
302
        self.assertEqual([
299
 
            ('str,str,str',
300
 
             {'interrepo_class': str,
301
 
              'repository_format': 'C1',
 
303
            ('C0,str,str',
 
304
             {'repository_format': 'C1',
302
305
              'repository_format_to': 'C2',
303
306
              'transport_readonly_server': 'b',
304
307
              'transport_server': 'a'}),
305
 
            ('int,str,str',
306
 
             {'interrepo_class': int,
307
 
              'repository_format': 'D1',
 
308
            ('D0,str,str',
 
309
             {'repository_format': 'D1',
308
310
              'repository_format_to': 'D2',
309
311
              'transport_readonly_server': 'b',
310
312
              'transport_server': 'a'})],
575
577
                         self.get_transport().get_bytes(
576
578
                            'dir/.bzr/repository/format'))
577
579
 
578
 
    def test_safety_net(self):
579
 
        """No test should modify the safety .bzr directory.
580
 
 
581
 
        We just test that the _check_safety_net private method raises
582
 
        AssertionError, it's easier than building a test suite with the same
583
 
        test.
584
 
        """
585
 
        # Oops, a commit in the current directory (i.e. without local .bzr
586
 
        # directory) will crawl up the hierarchy to find a .bzr directory.
587
 
        self.run_bzr(['commit', '-mfoo', '--unchanged'])
588
 
        # But we have a safety net in place.
589
 
        self.assertRaises(AssertionError, self._check_safety_net)
590
 
 
591
580
    def test_dangling_locks_cause_failures(self):
592
581
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
593
582
            def test_function(self):
597
586
                l.attempt_lock()
598
587
        test = TestDanglingLock('test_function')
599
588
        result = test.run()
600
 
        self.assertEqual(1, len(result.errors))
 
589
        if self._lock_check_thorough:
 
590
            self.assertEqual(1, len(result.errors))
 
591
        else:
 
592
            # When _lock_check_thorough is disabled, then we don't trigger a
 
593
            # failure
 
594
            self.assertEqual(0, len(result.errors))
601
595
 
602
596
 
603
597
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
681
675
        self.assertEqual(url, t.clone('..').base)
682
676
 
683
677
 
 
678
class TestProfileResult(tests.TestCase):
 
679
 
 
680
    def test_profiles_tests(self):
 
681
        self.requireFeature(test_lsprof.LSProfFeature)
 
682
        terminal = unittest.TestResult()
 
683
        result = tests.ProfileResult(terminal)
 
684
        class Sample(tests.TestCase):
 
685
            def a(self):
 
686
                self.sample_function()
 
687
            def sample_function(self):
 
688
                pass
 
689
        test = Sample("a")
 
690
        test.attrs_to_keep = test.attrs_to_keep + ('_benchcalls',)
 
691
        test.run(result)
 
692
        self.assertLength(1, test._benchcalls)
 
693
        # We must be able to unpack it as the test reporting code wants
 
694
        (_, _, _), stats = test._benchcalls[0]
 
695
        self.assertTrue(callable(stats.pprint))
 
696
 
 
697
 
684
698
class TestTestResult(tests.TestCase):
685
699
 
686
700
    def check_timing(self, test_case, expected_re):
713
727
        self.check_timing(ShortDelayTestCase('test_short_delay'),
714
728
                          r"^ +[0-9]+ms$")
715
729
 
 
730
    def _patch_get_bzr_source_tree(self):
 
731
        # Reading from the actual source tree breaks isolation, but we don't
 
732
        # want to assume that that's *all* that would happen.
 
733
        def _get_bzr_source_tree():
 
734
            return None
 
735
        orig_get_bzr_source_tree = bzrlib.version._get_bzr_source_tree
 
736
        bzrlib.version._get_bzr_source_tree = _get_bzr_source_tree
 
737
        def restore():
 
738
            bzrlib.version._get_bzr_source_tree = orig_get_bzr_source_tree
 
739
        self.addCleanup(restore)
 
740
 
716
741
    def test_assigned_benchmark_file_stores_date(self):
 
742
        self._patch_get_bzr_source_tree()
717
743
        output = StringIO()
718
744
        result = bzrlib.tests.TextTestResult(self._log_file,
719
745
                                        descriptions=0,
727
753
        self.assertContainsRe(output_string, "--date [0-9.]+")
728
754
 
729
755
    def test_benchhistory_records_test_times(self):
 
756
        self._patch_get_bzr_source_tree()
730
757
        result_stream = StringIO()
731
758
        result = bzrlib.tests.TextTestResult(
732
759
            self._log_file,
794
821
    def test_known_failure(self):
795
822
        """A KnownFailure being raised should trigger several result actions."""
796
823
        class InstrumentedTestResult(tests.ExtendedTestResult):
797
 
            def done(self): pass
 
824
            def stopTestRun(self): pass
798
825
            def startTests(self): pass
799
826
            def report_test_start(self, test): pass
800
827
            def report_known_failure(self, test, err):
801
828
                self._call = test, err
802
829
        result = InstrumentedTestResult(None, None, None, None)
803
 
        def test_function():
804
 
            raise tests.KnownFailure('failed!')
805
 
        test = unittest.FunctionTestCase(test_function)
 
830
        class Test(tests.TestCase):
 
831
            def test_function(self):
 
832
                raise tests.KnownFailure('failed!')
 
833
        test = Test("test_function")
806
834
        test.run(result)
807
835
        # it should invoke 'report_known_failure'.
808
836
        self.assertEqual(2, len(result._call))
848
876
    def test_add_not_supported(self):
849
877
        """Test the behaviour of invoking addNotSupported."""
850
878
        class InstrumentedTestResult(tests.ExtendedTestResult):
851
 
            def done(self): pass
 
879
            def stopTestRun(self): pass
852
880
            def startTests(self): pass
853
881
            def report_test_start(self, test): pass
854
882
            def report_unsupported(self, test, feature):
892
920
    def test_unavailable_exception(self):
893
921
        """An UnavailableFeature being raised should invoke addNotSupported."""
894
922
        class InstrumentedTestResult(tests.ExtendedTestResult):
895
 
            def done(self): pass
 
923
            def stopTestRun(self): pass
896
924
            def startTests(self): pass
897
925
            def report_test_start(self, test): pass
898
926
            def addNotSupported(self, test, feature):
899
927
                self._call = test, feature
900
928
        result = InstrumentedTestResult(None, None, None, None)
901
929
        feature = tests.Feature()
902
 
        def test_function():
903
 
            raise tests.UnavailableFeature(feature)
904
 
        test = unittest.FunctionTestCase(test_function)
 
930
        class Test(tests.TestCase):
 
931
            def test_function(self):
 
932
                raise tests.UnavailableFeature(feature)
 
933
        test = Test("test_function")
905
934
        test.run(result)
906
935
        # it should invoke 'addNotSupported'.
907
936
        self.assertEqual(2, len(result._call))
924
953
                                             verbosity=1)
925
954
        test = self.get_passing_test()
926
955
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
927
 
        result._addKnownFailure(test, err)
 
956
        result.addExpectedFailure(test, err)
928
957
        self.assertFalse(result.wasStrictlySuccessful())
929
958
        self.assertEqual(None, result._extractBenchmarkTime(test))
930
959
 
975
1004
        because of our use of global state.
976
1005
        """
977
1006
        old_root = tests.TestCaseInTempDir.TEST_ROOT
 
1007
        old_leak = tests.TestCase._first_thread_leaker_id
978
1008
        try:
979
1009
            tests.TestCaseInTempDir.TEST_ROOT = None
 
1010
            tests.TestCase._first_thread_leaker_id = None
980
1011
            return testrunner.run(test)
981
1012
        finally:
982
1013
            tests.TestCaseInTempDir.TEST_ROOT = old_root
 
1014
            tests.TestCase._first_thread_leaker_id = old_leak
983
1015
 
984
1016
    def test_known_failure_failed_run(self):
985
1017
        # run a test that generates a known failure which should be printed in
986
1018
        # the final output when real failures occur.
987
 
        def known_failure_test():
988
 
            raise tests.KnownFailure('failed')
 
1019
        class Test(tests.TestCase):
 
1020
            def known_failure_test(self):
 
1021
                raise tests.KnownFailure('failed')
989
1022
        test = unittest.TestSuite()
990
 
        test.addTest(unittest.FunctionTestCase(known_failure_test))
 
1023
        test.addTest(Test("known_failure_test"))
991
1024
        def failing_test():
992
1025
            raise AssertionError('foo')
993
1026
        test.addTest(unittest.FunctionTestCase(failing_test))
995
1028
        runner = tests.TextTestRunner(stream=stream)
996
1029
        result = self.run_test_runner(runner, test)
997
1030
        lines = stream.getvalue().splitlines()
998
 
        self.assertEqual([
999
 
            '',
1000
 
            '======================================================================',
1001
 
            'FAIL: unittest.FunctionTestCase (failing_test)',
1002
 
            '----------------------------------------------------------------------',
1003
 
            'Traceback (most recent call last):',
1004
 
            '    raise AssertionError(\'foo\')',
1005
 
            'AssertionError: foo',
1006
 
            '',
1007
 
            '----------------------------------------------------------------------',
1008
 
            '',
1009
 
            'FAILED (failures=1, known_failure_count=1)'],
1010
 
            lines[3:8] + lines[9:13] + lines[14:])
 
1031
        self.assertContainsRe(stream.getvalue(),
 
1032
            '(?sm)^testing.*$'
 
1033
            '.*'
 
1034
            '^======================================================================\n'
 
1035
            '^FAIL: unittest.FunctionTestCase \\(failing_test\\)\n'
 
1036
            '^----------------------------------------------------------------------\n'
 
1037
            'Traceback \\(most recent call last\\):\n'
 
1038
            '  .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
 
1039
            '    raise AssertionError\\(\'foo\'\\)\n'
 
1040
            '.*'
 
1041
            '^----------------------------------------------------------------------\n'
 
1042
            '.*'
 
1043
            'FAILED \\(failures=1, known_failure_count=1\\)'
 
1044
            )
1011
1045
 
1012
1046
    def test_known_failure_ok_run(self):
1013
 
        # run a test that generates a known failure which should be printed in the final output.
1014
 
        def known_failure_test():
1015
 
            raise tests.KnownFailure('failed')
1016
 
        test = unittest.FunctionTestCase(known_failure_test)
 
1047
        # run a test that generates a known failure which should be printed in
 
1048
        # the final output.
 
1049
        class Test(tests.TestCase):
 
1050
            def known_failure_test(self):
 
1051
                raise tests.KnownFailure('failed')
 
1052
        test = Test("known_failure_test")
1017
1053
        stream = StringIO()
1018
1054
        runner = tests.TextTestRunner(stream=stream)
1019
1055
        result = self.run_test_runner(runner, test)
1024
1060
            '\n'
1025
1061
            'OK \\(known_failures=1\\)\n')
1026
1062
 
 
1063
    def test_result_decorator(self):
 
1064
        # decorate results
 
1065
        calls = []
 
1066
        class LoggingDecorator(tests.ForwardingResult):
 
1067
            def startTest(self, test):
 
1068
                tests.ForwardingResult.startTest(self, test)
 
1069
                calls.append('start')
 
1070
        test = unittest.FunctionTestCase(lambda:None)
 
1071
        stream = StringIO()
 
1072
        runner = tests.TextTestRunner(stream=stream,
 
1073
            result_decorators=[LoggingDecorator])
 
1074
        result = self.run_test_runner(runner, test)
 
1075
        self.assertLength(1, calls)
 
1076
 
1027
1077
    def test_skipped_test(self):
1028
1078
        # run a test that is skipped, and check the suite as a whole still
1029
1079
        # succeeds.
1082
1132
 
1083
1133
    def test_not_applicable(self):
1084
1134
        # run a test that is skipped because it's not applicable
1085
 
        def not_applicable_test():
1086
 
            raise tests.TestNotApplicable('this test never runs')
 
1135
        class Test(tests.TestCase):
 
1136
            def not_applicable_test(self):
 
1137
                raise tests.TestNotApplicable('this test never runs')
1087
1138
        out = StringIO()
1088
1139
        runner = tests.TextTestRunner(stream=out, verbosity=2)
1089
 
        test = unittest.FunctionTestCase(not_applicable_test)
 
1140
        test = Test("not_applicable_test")
1090
1141
        result = self.run_test_runner(runner, test)
1091
1142
        self._log_file.write(out.getvalue())
1092
1143
        self.assertTrue(result.wasSuccessful())
1096
1147
        self.assertContainsRe(out.getvalue(),
1097
1148
                r'(?m)^    this test never runs')
1098
1149
 
1099
 
    def test_not_applicable_demo(self):
1100
 
        # just so you can see it in the test output
1101
 
        raise tests.TestNotApplicable('this test is just a demonstation')
1102
 
 
1103
1150
    def test_unsupported_features_listed(self):
1104
1151
        """When unsupported features are encountered they are detailed."""
1105
1152
        class Feature1(tests.Feature):
1125
1172
            ],
1126
1173
            lines[-3:])
1127
1174
 
 
1175
    def _patch_get_bzr_source_tree(self):
 
1176
        # Reading from the actual source tree breaks isolation, but we don't
 
1177
        # want to assume that that's *all* that would happen.
 
1178
        self._get_source_tree_calls = []
 
1179
        def _get_bzr_source_tree():
 
1180
            self._get_source_tree_calls.append("called")
 
1181
            return None
 
1182
        orig_get_bzr_source_tree = bzrlib.version._get_bzr_source_tree
 
1183
        bzrlib.version._get_bzr_source_tree = _get_bzr_source_tree
 
1184
        def restore():
 
1185
            bzrlib.version._get_bzr_source_tree = orig_get_bzr_source_tree
 
1186
        self.addCleanup(restore)
 
1187
 
1128
1188
    def test_bench_history(self):
1129
 
        # tests that the running the benchmark produces a history file
1130
 
        # containing a timestamp and the revision id of the bzrlib source which
1131
 
        # was tested.
1132
 
        workingtree = _get_bzr_source_tree()
 
1189
        # tests that running the benchmark passes bench_history into
 
1190
        # the test result object. We can tell that happens if
 
1191
        # _get_bzr_source_tree is called.
 
1192
        self._patch_get_bzr_source_tree()
1133
1193
        test = TestRunner('dummy_test')
1134
1194
        output = StringIO()
1135
1195
        runner = tests.TextTestRunner(stream=self._log_file,
1137
1197
        result = self.run_test_runner(runner, test)
1138
1198
        output_string = output.getvalue()
1139
1199
        self.assertContainsRe(output_string, "--date [0-9.]+")
1140
 
        if workingtree is not None:
1141
 
            revision_id = workingtree.get_parent_ids()[0]
1142
 
            self.assertEndsWith(output_string.rstrip(), revision_id)
 
1200
        self.assertLength(1, self._get_source_tree_calls)
1143
1201
 
1144
1202
    def assertLogDeleted(self, test):
1145
1203
        log = test._get_log()
1254
1312
        self.assertContainsRe(log, 'this will be kept')
1255
1313
        self.assertEqual(log, test._log_contents)
1256
1314
 
 
1315
    def test_startTestRun(self):
 
1316
        """run should call result.startTestRun()"""
 
1317
        calls = []
 
1318
        class LoggingDecorator(tests.ForwardingResult):
 
1319
            def startTestRun(self):
 
1320
                tests.ForwardingResult.startTestRun(self)
 
1321
                calls.append('startTestRun')
 
1322
        test = unittest.FunctionTestCase(lambda:None)
 
1323
        stream = StringIO()
 
1324
        runner = tests.TextTestRunner(stream=stream,
 
1325
            result_decorators=[LoggingDecorator])
 
1326
        result = self.run_test_runner(runner, test)
 
1327
        self.assertLength(1, calls)
 
1328
 
 
1329
    def test_stopTestRun(self):
 
1330
        """run should call result.stopTestRun()"""
 
1331
        calls = []
 
1332
        class LoggingDecorator(tests.ForwardingResult):
 
1333
            def stopTestRun(self):
 
1334
                tests.ForwardingResult.stopTestRun(self)
 
1335
                calls.append('stopTestRun')
 
1336
        test = unittest.FunctionTestCase(lambda:None)
 
1337
        stream = StringIO()
 
1338
        runner = tests.TextTestRunner(stream=stream,
 
1339
            result_decorators=[LoggingDecorator])
 
1340
        result = self.run_test_runner(runner, test)
 
1341
        self.assertLength(1, calls)
 
1342
 
1257
1343
 
1258
1344
class SampleTestCase(tests.TestCase):
1259
1345
 
1263
1349
class _TestException(Exception):
1264
1350
    pass
1265
1351
 
 
1352
 
1266
1353
class TestTestCase(tests.TestCase):
1267
1354
    """Tests that test the core bzrlib TestCase."""
1268
1355
 
1317
1404
        # we could set something and run a test that will check
1318
1405
        # it gets sanitised, but this is probably sufficient for now:
1319
1406
        # if someone runs the test with -Dsomething it will error.
1320
 
        self.assertEqual(set(), bzrlib.debug.debug_flags)
 
1407
        flags = set()
 
1408
        if self._lock_check_thorough:
 
1409
            flags.add('strict_locks')
 
1410
        self.assertEqual(flags, bzrlib.debug.debug_flags)
1321
1411
 
1322
1412
    def change_selftest_debug_flags(self, new_flags):
1323
1413
        orig_selftest_flags = tests.selftest_debug_flags
1338
1428
                self.flags = set(bzrlib.debug.debug_flags)
1339
1429
        test = TestThatRecordsFlags('test_foo')
1340
1430
        test.run(self.make_test_result())
1341
 
        self.assertEqual(set(['a-flag']), self.flags)
 
1431
        flags = set(['a-flag'])
 
1432
        if 'disable_lock_checks' not in tests.selftest_debug_flags:
 
1433
            flags.add('strict_locks')
 
1434
        self.assertEqual(flags, self.flags)
 
1435
 
 
1436
    def test_disable_lock_checks(self):
 
1437
        """The -Edisable_lock_checks flag disables thorough checks."""
 
1438
        class TestThatRecordsFlags(tests.TestCase):
 
1439
            def test_foo(nested_self):
 
1440
                self.flags = set(bzrlib.debug.debug_flags)
 
1441
                self.test_lock_check_thorough = nested_self._lock_check_thorough
 
1442
        self.change_selftest_debug_flags(set())
 
1443
        test = TestThatRecordsFlags('test_foo')
 
1444
        test.run(self.make_test_result())
 
1445
        # By default we do strict lock checking and thorough lock/unlock
 
1446
        # tracking.
 
1447
        self.assertTrue(self.test_lock_check_thorough)
 
1448
        self.assertEqual(set(['strict_locks']), self.flags)
 
1449
        # Now set the disable_lock_checks flag, and show that this changed.
 
1450
        self.change_selftest_debug_flags(set(['disable_lock_checks']))
 
1451
        test = TestThatRecordsFlags('test_foo')
 
1452
        test.run(self.make_test_result())
 
1453
        self.assertFalse(self.test_lock_check_thorough)
 
1454
        self.assertEqual(set(), self.flags)
 
1455
 
 
1456
    def test_this_fails_strict_lock_check(self):
 
1457
        class TestThatRecordsFlags(tests.TestCase):
 
1458
            def test_foo(nested_self):
 
1459
                self.flags1 = set(bzrlib.debug.debug_flags)
 
1460
                self.thisFailsStrictLockCheck()
 
1461
                self.flags2 = set(bzrlib.debug.debug_flags)
 
1462
        # Make sure lock checking is active
 
1463
        self.change_selftest_debug_flags(set())
 
1464
        test = TestThatRecordsFlags('test_foo')
 
1465
        test.run(self.make_test_result())
 
1466
        self.assertEqual(set(['strict_locks']), self.flags1)
 
1467
        self.assertEqual(set(), self.flags2)
1342
1468
 
1343
1469
    def test_debug_flags_restored(self):
1344
1470
        """The bzrlib debug flags should be restored to their original state
1385
1511
        outer_test = TestTestCase("outer_child")
1386
1512
        result = self.make_test_result()
1387
1513
        outer_test.run(result)
 
1514
        self.addCleanup(osutils.delete_any, outer_test._log_file_name)
1388
1515
        self.assertEqual(original_trace, bzrlib.trace._trace_file)
1389
1516
 
1390
1517
    def method_that_times_a_bit_twice(self):
1433
1560
        self.assertEqual((time.sleep, (0.003,), {}), self._benchcalls[1][0])
1434
1561
        self.assertIsInstance(self._benchcalls[0][1], bzrlib.lsprof.Stats)
1435
1562
        self.assertIsInstance(self._benchcalls[1][1], bzrlib.lsprof.Stats)
 
1563
        del self._benchcalls[:]
1436
1564
 
1437
1565
    def test_knownFailure(self):
1438
1566
        """Self.knownFailure() should raise a KnownFailure exception."""
1439
1567
        self.assertRaises(tests.KnownFailure, self.knownFailure, "A Failure")
1440
1568
 
 
1569
    def test_open_bzrdir_safe_roots(self):
 
1570
        # even a memory transport should fail to open when its url isn't 
 
1571
        # permitted.
 
1572
        # Manually set one up (TestCase doesn't and shouldn't provide magic
 
1573
        # machinery)
 
1574
        transport_server = MemoryServer()
 
1575
        transport_server.setUp()
 
1576
        self.addCleanup(transport_server.tearDown)
 
1577
        t = transport.get_transport(transport_server.get_url())
 
1578
        bzrdir.BzrDir.create(t.base)
 
1579
        self.assertRaises(errors.BzrError,
 
1580
            bzrdir.BzrDir.open_from_transport, t)
 
1581
        # But if we declare this as safe, we can open the bzrdir.
 
1582
        self.permit_url(t.base)
 
1583
        self._bzr_selftest_roots.append(t.base)
 
1584
        bzrdir.BzrDir.open_from_transport(t)
 
1585
 
1441
1586
    def test_requireFeature_available(self):
1442
1587
        """self.requireFeature(available) is a no-op."""
1443
1588
        class Available(tests.Feature):
1510
1655
            ],
1511
1656
            result.calls)
1512
1657
 
 
1658
    def test_start_server_registers_url(self):
 
1659
        transport_server = MemoryServer()
 
1660
        # A little strict, but unlikely to be changed soon.
 
1661
        self.assertEqual([], self._bzr_selftest_roots)
 
1662
        self.start_server(transport_server)
 
1663
        self.assertSubset([transport_server.get_url()],
 
1664
            self._bzr_selftest_roots)
 
1665
 
1513
1666
    def test_assert_list_raises_on_generator(self):
1514
1667
        def generator_which_will_raise():
1515
1668
            # This will not raise until after the first yield
1613
1766
        self.assertEndsWith('foo', 'oo')
1614
1767
        self.assertRaises(AssertionError, self.assertEndsWith, 'o', 'oo')
1615
1768
 
 
1769
    def test_assertEqualDiff(self):
 
1770
        e = self.assertRaises(AssertionError,
 
1771
                              self.assertEqualDiff, '', '\n')
 
1772
        self.assertEquals(str(e),
 
1773
                          # Don't blink ! The '+' applies to the second string
 
1774
                          'first string is missing a final newline.\n+ \n')
 
1775
        e = self.assertRaises(AssertionError,
 
1776
                              self.assertEqualDiff, '\n', '')
 
1777
        self.assertEquals(str(e),
 
1778
                          # Don't blink ! The '-' applies to the second string
 
1779
                          'second string is missing a final newline.\n- \n')
 
1780
 
 
1781
 
 
1782
class TestDeprecations(tests.TestCase):
 
1783
 
1616
1784
    def test_applyDeprecated_not_deprecated(self):
1617
1785
        sample_object = ApplyDeprecatedHelper()
1618
1786
        # calling an undeprecated callable raises an assertion
1695
1863
        tree = self.make_branch_and_memory_tree('a')
1696
1864
        self.assertIsInstance(tree, bzrlib.memorytree.MemoryTree)
1697
1865
 
1698
 
 
1699
 
class TestSFTPMakeBranchAndTree(test_sftp_transport.TestCaseWithSFTPServer):
1700
 
 
1701
 
    def test_make_tree_for_sftp_branch(self):
1702
 
        """Transports backed by local directories create local trees."""
1703
 
 
 
1866
    def test_make_tree_for_local_vfs_backed_transport(self):
 
1867
        # make_branch_and_tree has to use local branch and repositories
 
1868
        # when the vfs transport and local disk are colocated, even if
 
1869
        # a different transport is in use for url generation.
 
1870
        from bzrlib.transport.fakevfat import FakeVFATServer
 
1871
        self.transport_server = FakeVFATServer
 
1872
        self.assertFalse(self.get_url('t1').startswith('file://'))
1704
1873
        tree = self.make_branch_and_tree('t1')
1705
1874
        base = tree.bzrdir.root_transport.base
1706
 
        self.failIf(base.startswith('sftp'),
1707
 
                'base %r is on sftp but should be local' % base)
 
1875
        self.assertStartsWith(base, 'file://')
1708
1876
        self.assertEquals(tree.bzrdir.root_transport,
1709
1877
                tree.branch.bzrdir.root_transport)
1710
1878
        self.assertEquals(tree.bzrdir.root_transport,
1711
1879
                tree.branch.repository.bzrdir.root_transport)
1712
1880
 
1713
1881
 
1714
 
class TestSelftest(tests.TestCase):
 
1882
class SelfTestHelper:
 
1883
 
 
1884
    def run_selftest(self, **kwargs):
 
1885
        """Run selftest returning its output."""
 
1886
        output = StringIO()
 
1887
        old_transport = bzrlib.tests.default_transport
 
1888
        old_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
 
1889
        tests.TestCaseWithMemoryTransport.TEST_ROOT = None
 
1890
        try:
 
1891
            self.assertEqual(True, tests.selftest(stream=output, **kwargs))
 
1892
        finally:
 
1893
            bzrlib.tests.default_transport = old_transport
 
1894
            tests.TestCaseWithMemoryTransport.TEST_ROOT = old_root
 
1895
        output.seek(0)
 
1896
        return output
 
1897
 
 
1898
 
 
1899
class TestSelftest(tests.TestCase, SelfTestHelper):
1715
1900
    """Tests of bzrlib.tests.selftest."""
1716
1901
 
1717
1902
    def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(self):
1725
1910
            test_suite_factory=factory)
1726
1911
        self.assertEqual([True], factory_called)
1727
1912
 
 
1913
    def factory(self):
 
1914
        """A test suite factory."""
 
1915
        class Test(tests.TestCase):
 
1916
            def a(self):
 
1917
                pass
 
1918
            def b(self):
 
1919
                pass
 
1920
            def c(self):
 
1921
                pass
 
1922
        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
 
1923
 
 
1924
    def test_list_only(self):
 
1925
        output = self.run_selftest(test_suite_factory=self.factory,
 
1926
            list_only=True)
 
1927
        self.assertEqual(3, len(output.readlines()))
 
1928
 
 
1929
    def test_list_only_filtered(self):
 
1930
        output = self.run_selftest(test_suite_factory=self.factory,
 
1931
            list_only=True, pattern="Test.b")
 
1932
        self.assertEndsWith(output.getvalue(), "Test.b\n")
 
1933
        self.assertLength(1, output.readlines())
 
1934
 
 
1935
    def test_list_only_excludes(self):
 
1936
        output = self.run_selftest(test_suite_factory=self.factory,
 
1937
            list_only=True, exclude_pattern="Test.b")
 
1938
        self.assertNotContainsRe("Test.b", output.getvalue())
 
1939
        self.assertLength(2, output.readlines())
 
1940
 
 
1941
    def test_lsprof_tests(self):
 
1942
        self.requireFeature(test_lsprof.LSProfFeature)
 
1943
        calls = []
 
1944
        class Test(object):
 
1945
            def __call__(test, result):
 
1946
                test.run(result)
 
1947
            def run(test, result):
 
1948
                self.assertIsInstance(result, tests.ForwardingResult)
 
1949
                calls.append("called")
 
1950
            def countTestCases(self):
 
1951
                return 1
 
1952
        self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
 
1953
        self.assertLength(1, calls)
 
1954
 
 
1955
    def test_random(self):
 
1956
        # test randomising by listing a number of tests.
 
1957
        output_123 = self.run_selftest(test_suite_factory=self.factory,
 
1958
            list_only=True, random_seed="123")
 
1959
        output_234 = self.run_selftest(test_suite_factory=self.factory,
 
1960
            list_only=True, random_seed="234")
 
1961
        self.assertNotEqual(output_123, output_234)
 
1962
        # "Randominzing test order..\n\n
 
1963
        self.assertLength(5, output_123.readlines())
 
1964
        self.assertLength(5, output_234.readlines())
 
1965
 
 
1966
    def test_random_reuse_is_same_order(self):
 
1967
        # test randomising by listing a number of tests.
 
1968
        expected = self.run_selftest(test_suite_factory=self.factory,
 
1969
            list_only=True, random_seed="123")
 
1970
        repeated = self.run_selftest(test_suite_factory=self.factory,
 
1971
            list_only=True, random_seed="123")
 
1972
        self.assertEqual(expected.getvalue(), repeated.getvalue())
 
1973
 
 
1974
    def test_runner_class(self):
 
1975
        self.requireFeature(SubUnitFeature)
 
1976
        from subunit import ProtocolTestCase
 
1977
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
 
1978
            test_suite_factory=self.factory)
 
1979
        test = ProtocolTestCase(stream)
 
1980
        result = unittest.TestResult()
 
1981
        test.run(result)
 
1982
        self.assertEqual(3, result.testsRun)
 
1983
 
 
1984
    def test_starting_with_single_argument(self):
 
1985
        output = self.run_selftest(test_suite_factory=self.factory,
 
1986
            starting_with=['bzrlib.tests.test_selftest.Test.a'],
 
1987
            list_only=True)
 
1988
        self.assertEqual('bzrlib.tests.test_selftest.Test.a\n',
 
1989
            output.getvalue())
 
1990
 
 
1991
    def test_starting_with_multiple_argument(self):
 
1992
        output = self.run_selftest(test_suite_factory=self.factory,
 
1993
            starting_with=['bzrlib.tests.test_selftest.Test.a',
 
1994
                'bzrlib.tests.test_selftest.Test.b'],
 
1995
            list_only=True)
 
1996
        self.assertEqual('bzrlib.tests.test_selftest.Test.a\n'
 
1997
            'bzrlib.tests.test_selftest.Test.b\n',
 
1998
            output.getvalue())
 
1999
 
 
2000
    def check_transport_set(self, transport_server):
 
2001
        captured_transport = []
 
2002
        def seen_transport(a_transport):
 
2003
            captured_transport.append(a_transport)
 
2004
        class Capture(tests.TestCase):
 
2005
            def a(self):
 
2006
                seen_transport(bzrlib.tests.default_transport)
 
2007
        def factory():
 
2008
            return TestUtil.TestSuite([Capture("a")])
 
2009
        self.run_selftest(transport=transport_server, test_suite_factory=factory)
 
2010
        self.assertEqual(transport_server, captured_transport[0])
 
2011
 
 
2012
    def test_transport_sftp(self):
 
2013
        try:
 
2014
            import bzrlib.transport.sftp
 
2015
        except errors.ParamikoNotPresent:
 
2016
            raise tests.TestSkipped("Paramiko not present")
 
2017
        self.check_transport_set(bzrlib.transport.sftp.SFTPAbsoluteServer)
 
2018
 
 
2019
    def test_transport_memory(self):
 
2020
        self.check_transport_set(bzrlib.transport.memory.MemoryServer)
 
2021
 
 
2022
 
 
2023
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
    """Tests of selftest's load_list parameter."""
    # Does IO: reads test.list

    def test_load_list(self):
        # Provide a list with one test - this test.
        test_id_line = '%s\n' % self.id()
        self.build_tree_contents([('test.list', test_id_line)])
        # And generate a list of the tests in the suite.
        stream = self.run_selftest(load_list='test.list', list_only=True)
        self.assertEqual(test_id_line, stream.getvalue())

    def test_load_unknown(self):
        # No list file is created here: naming a file that does not exist
        # must raise NoSuchFile.
        self.assertRaises(errors.NoSuchFile, self.run_selftest,
            load_list='missing file name', list_only=True)
 
2039
 
 
2040
 
 
2041
class TestRunBzr(tests.TestCase):
    """Tests of how run_bzr and run_bzr_error call _run_bzr_core."""

    # Canned output returned by the _run_bzr_core override below; individual
    # tests replace these on the instance.
    out = ''
    err = ''

    def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
                         working_dir=None):
        """Record how run_bzr called us instead of running bzr.

        Every argument is stored on self so that a test can inspect what
        run_bzr passed through.  Actually invoking bzr is tested elsewhere.

        :return: (retcode, self.out, self.err)
        """
        self.argv = list(argv)
        self.retcode, self.encoding = retcode, encoding
        self.stdin, self.working_dir = stdin, working_dir
        return (self.retcode, self.out, self.err)

    def test_run_bzr_error(self):
        # run_bzr_error passes argv and retcode on and returns the canned
        # output unchanged.
        self.out = "It sure does!\n"
        out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
        self.assertEqual(['rocks'], self.argv)
        self.assertEqual(34, self.retcode)
        self.assertEqual('It sure does!\n', out)
        self.assertEqual(out, self.out)
        self.assertEqual('', err)
        self.assertEqual(err, self.err)

    def test_run_bzr_error_regexes(self):
        # The expected regex is matched against the canned error text.
        self.out = ''
        self.err = "bzr: ERROR: foobarbaz is not versioned"
        expected_regexes = ["bzr: ERROR: foobarbaz is not versioned"]
        out, err = self.run_bzr_error(expected_regexes,
            ['file-id', 'foobarbaz'])

    def test_encoding(self):
        """run_bzr hands its encoding argument on to _run_bzr_core."""
        self.run_bzr('foo bar')
        self.assertEqual(None, self.encoding)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr('foo bar', encoding='baz')
        self.assertEqual('baz', self.encoding)
        self.assertEqual(['foo', 'bar'], self.argv)

    def test_retcode(self):
        """run_bzr hands its retcode argument on to _run_bzr_core."""
        # Default is retcode == 0
        self.run_bzr('foo bar')
        self.assertEqual(0, self.retcode)
        self.assertEqual(['foo', 'bar'], self.argv)

        # Explicit values, None included, are passed on unchanged; the
        # command may be given as a string or as a list.
        explicit_cases = [('foo bar', 1), ('foo bar', None),
                          (['foo', 'bar'], 3)]
        for command, wanted in explicit_cases:
            self.run_bzr(command, retcode=wanted)
            self.assertEqual(wanted, self.retcode)
            self.assertEqual(['foo', 'bar'], self.argv)

    def test_stdin(self):
        # The stdin keyword to run_bzr must reach _run_bzr_core as-is.
        # _run_bzr_core is overridden in this class and records what it is
        # given, so calling run_bzr is enough to observe the value.
        for text in ('gam', 'zippy'):
            self.run_bzr('foo bar', stdin=text)
            self.assertEqual(text, self.stdin)
            self.assertEqual(['foo', 'bar'], self.argv)

    def test_working_dir(self):
        """run_bzr hands its working_dir argument on to _run_bzr_core."""
        self.run_bzr('foo bar')
        self.assertEqual(None, self.working_dir)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr('foo bar', working_dir='baz')
        self.assertEqual('baz', self.working_dir)
        self.assertEqual(['foo', 'bar'], self.argv)

    def test_reject_extra_keyword_arguments(self):
        # Keywords run_bzr does not know about are an error.
        self.assertRaises(TypeError, self.run_bzr, "foo bar",
                          error_regex=['error message'])
 
2135
 
 
2136
 
 
2137
class TestRunBzrCaptured(tests.TestCaseWithTransport):
    """Tests of what run_bzr sets up before calling apply_redirected."""
    # Does IO when testing the working_dir parameter.

    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
                         a_callable=None, *args, **kwargs):
        """Record the state run_bzr set up instead of running a_callable.

        Stores the stdin passed in, the current ui factory and its stdin
        attribute (None when it has none), and the current working
        directory; then writes 'foo\\n' to stdout and 'bar\\n' to stderr.

        :return: 0
        """
        ui_factory = bzrlib.ui.ui_factory
        self.stdin = stdin
        self.factory_stdin = getattr(ui_factory, "stdin", None)
        self.factory = ui_factory
        self.working_dir = osutils.getcwd()
        for stream, text in ((stdout, 'foo\n'), (stderr, 'bar\n')):
            stream.write(text)
        return 0

    def test_stdin(self):
        # The stdin string given to run_bzr must reach apply_redirected as a
        # readable file object, and the same object must be the ui factory's
        # stdin.  apply_redirected is overridden in this class to record
        # both.
        for text in ('gam', 'zippy'):
            self.run_bzr(['foo', 'bar'], stdin=text)
            self.assertEqual(text, self.stdin.read())
            self.assertTrue(self.stdin is self.factory_stdin)

    def test_ui_factory(self):
        # each invocation of self.run_bzr should get its
        # own UI factory, which is an instance of TestUIFactory,
        # with stdin, stdout and stderr attached to the stdin,
        # stdout and stderr of the invoked run_bzr
        factory_before = bzrlib.ui.ui_factory
        self.run_bzr(['foo'])
        used_factory = self.factory
        self.assertFalse(factory_before is used_factory)
        self.assertNotEqual(sys.stdout, used_factory.stdout)
        self.assertNotEqual(sys.stderr, used_factory.stderr)
        self.assertEqual('foo\n', used_factory.stdout.getvalue())
        self.assertEqual('bar\n', used_factory.stderr.getvalue())
        self.assertIsInstance(used_factory, tests.TestUIFactory)

    def test_working_dir(self):
        # The working_dir keyword selects where the command runs.
        self.build_tree(['one/', 'two/'])
        cwd = osutils.getcwd()

        # Default is to work in the current directory
        self.run_bzr(['foo', 'bar'])
        self.assertEqual(cwd, self.working_dir)

        self.run_bzr(['foo', 'bar'], working_dir=None)
        self.assertEqual(cwd, self.working_dir)

        # The function should be run in the alternative directory
        # but afterwards the current working dir shouldn't be changed
        for subdir in ('one', 'two'):
            self.run_bzr(['foo', 'bar'], working_dir=subdir)
            self.assertNotEqual(cwd, self.working_dir)
            self.assertEndsWith(self.working_dir, subdir)
            self.assertEqual(cwd, osutils.getcwd())
 
2198
 
 
2199
 
 
2200
class StubProcess(object):
    """A stand-in for a process object, for testing run_bzr_subprocess.

    It holds canned output, error text and a return code.
    """

    def __init__(self, out="", err="", retcode=0):
        """Store the canned results.

        :param out: text to report as standard output.
        :param err: text to report as standard error.
        :param retcode: value exposed as the returncode attribute.
        """
        self.returncode = retcode
        self.out = out
        self.err = err

    def communicate(self):
        """Return the canned (out, err) pair."""
        return (self.out, self.err)
 
2210
 
 
2211
 
 
2212
class TestWithFakedStartBzrSubprocess(tests.TestCaseWithTransport):
    """Base class for tests of how a bzr subprocess would be started.

    start_bzr_subprocess is replaced by a stub that records each request in
    self.subprocess_calls and returns self.next_subprocess.
    """

    def setUp(self):
        tests.TestCaseWithTransport.setUp(self)
        # One dict per start_bzr_subprocess call, oldest first.
        self.subprocess_calls = []

    def start_bzr_subprocess(self, process_args, env_changes=None,
                             skip_if_plan_to_signal=False,
                             working_dir=None,
                             allow_plugins=False):
        """Record the request and return self.next_subprocess.

        Each argument is stored under its own name in a dict appended to
        self.subprocess_calls.
        """
        recorded = dict(
            process_args=process_args,
            env_changes=env_changes,
            skip_if_plan_to_signal=skip_if_plan_to_signal,
            working_dir=working_dir,
            allow_plugins=allow_plugins,
            )
        self.subprocess_calls.append(recorded)
        return self.next_subprocess
 
2229
 
 
2230
 
 
2231
class TestRunBzrSubprocess(TestWithFakedStartBzrSubprocess):
    """Tests of run_bzr_subprocess against the faked start_bzr_subprocess."""

    def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
        """Call run_bzr_subprocess with a stub process and check the request.

        process is queued as the object the inherited start_bzr_subprocess
        stub returns.  Once run_bzr_subprocess has returned or raised, every
        key in expected_args is compared with the most recently recorded
        call.

        :param expected_args: dict mapping recorded-call keys to the values
            they must have.
        :param process: the stub to return from start_bzr_subprocess.
        :return: whatever run_bzr_subprocess returned.
        """
        self.next_subprocess = process
        try:
            return self.run_bzr_subprocess(*args, **kwargs)
        finally:
            # The same checks apply whether the call returned or raised.
            self.next_subprocess = None
            for key, expected in expected_args.iteritems():
                self.assertEqual(expected, self.subprocess_calls[-1][key])

    def test_run_bzr_subprocess(self):
        """run_bzr_subprocess passes arguments on and checks the retcode."""
        # The command may be given as a string or as a list.
        for command in ('--version', ['--version']):
            self.assertRunBzrSubprocess({'process_args':['--version']},
                StubProcess(), command)
        # retcode=None disables retcode checking
        result = self.assertRunBzrSubprocess({},
            StubProcess(retcode=3), '--version', retcode=None)
        result = self.assertRunBzrSubprocess({},
            StubProcess(out="is free software"), '--version')
        self.assertContainsRe(result[0], 'is free software')
        # Running a subcommand that is missing errors
        failing_process = StubProcess(retcode=3)
        self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
            {'process_args':['--versionn']}, failing_process,
            '--versionn')
        # Unless it is told to expect the error from the subprocess
        result = self.assertRunBzrSubprocess({},
            StubProcess(retcode=3), '--versionn', retcode=3)
        # Or to ignore retcode checking
        result = self.assertRunBzrSubprocess({},
            StubProcess(err="unknown command", retcode=3), '--versionn',
            retcode=None)
        self.assertContainsRe(result[1], 'unknown command')

    def test_env_change_passes_through(self):
        # env_changes is handed to start_bzr_subprocess unchanged.
        changes = {'new':'value', 'changed':'newvalue', 'deleted':None}
        # Compare against a separate copy, not the object passed in.
        expected = {'env_changes': dict(changes)}
        self.assertRunBzrSubprocess(expected, StubProcess(), '',
            env_changes=changes)

    def test_no_working_dir_passed_as_None(self):
        # Without a working_dir argument, None is passed on.
        expected = {'working_dir': None}
        self.assertRunBzrSubprocess(expected, StubProcess(), '')

    def test_no_working_dir_passed_through(self):
        # An explicit working_dir is passed on unchanged.
        expected = {'working_dir': 'dir'}
        self.assertRunBzrSubprocess(expected, StubProcess(), '',
            working_dir='dir')

    def test_run_bzr_subprocess_no_plugins(self):
        # Plugins are not allowed unless asked for.
        expected = {'allow_plugins': False}
        self.assertRunBzrSubprocess(expected, StubProcess(), '')

    def test_allow_plugins(self):
        # allow_plugins=True is passed on.
        expected = {'allow_plugins': True}
        self.assertRunBzrSubprocess(expected, StubProcess(), '',
            allow_plugins=True)
 
2299
 
 
2300
 
 
2301
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):
    """Tests of finish_bzr_subprocess using StubProcess objects."""

    def test_finish_bzr_subprocess_with_error(self):
        """finish_bzr_subprocess accepts the exit code it is told to expect.
        """
        failed = StubProcess(err="unknown command", retcode=3)
        out, err = self.finish_bzr_subprocess(failed, retcode=3)[:2]
        self.assertEqual('', out)
        self.assertContainsRe(err, 'unknown command')

    def test_finish_bzr_subprocess_ignoring_retcode(self):
        """finish_bzr_subprocess ignores the exit code when retcode is None."""
        failed = StubProcess(err="unknown command", retcode=3)
        out, err = self.finish_bzr_subprocess(failed, retcode=None)[:2]
        self.assertEqual('', out)
        self.assertContainsRe(err, 'unknown command')

    def test_finish_subprocess_with_unexpected_retcode(self):
        """finish_bzr_subprocess raises self.failureException when the exit
        code is not the expected one.
        """
        failed = StubProcess(err="unknown command", retcode=3)
        self.assertRaises(self.failureException, self.finish_bzr_subprocess,
                          failed)
 
2325
 
 
2326
 
 
2327
class _DontSpawnProcess(Exception):
 
2328
    """A simple exception which just allows us to skip unnecessary steps"""
 
2329
 
 
2330
 
 
2331
class TestStartBzrSubProcess(tests.TestCase):
    """Tests of start_bzr_subprocess that stop short of spawning anything.

    _popen is overridden to record its arguments and raise
    _DontSpawnProcess.
    """

    def check_popen_state(self):
        """Replace to make assertions when popen is called."""

    def _popen(self, *args, **kwargs):
        """Record the command that is run, then refuse to spawn it.

        check_popen_state is called first, so a test can install a
        replacement that makes assertions at that moment.

        :raises _DontSpawnProcess: always.
        """
        self.check_popen_state()
        self._popen_args = args
        self._popen_kwargs = kwargs
        raise _DontSpawnProcess()

    def test_run_bzr_subprocess_no_plugins(self):
        # By default the command is: python, the bzr script, --no-plugins.
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
        command = self._popen_args[0]
        self.assertEqual(sys.executable, command[0])
        self.assertEqual(self.get_bzr_path(), command[1])
        self.assertEqual(['--no-plugins'], command[2:])

    def test_allow_plugins(self):
        # With allow_plugins=True no --no-plugins argument is added.
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
            allow_plugins=True)
        command = self._popen_args[0]
        self.assertEqual([], command[2:])

    def test_set_env(self):
        # A variable given in env_changes is set when popen is called.
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
        # set in the child
        def check_environment():
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
            env_changes={'EXISTANT_ENV_VAR':'set variable'})
        # not set in the parent
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

    def test_run_bzr_subprocess_env_del(self):
        """run_bzr_subprocess can remove environment variables too."""
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
        def check_environment():
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
        self.check_popen_state = check_environment
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
        try:
            self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess,
                [], env_changes={'EXISTANT_ENV_VAR':None})
            # Still set in parent
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
        finally:
            # Remove the variable even when an assertion above fails, so it
            # cannot leak into the tests that require it to be unset.
            if 'EXISTANT_ENV_VAR' in os.environ:
                del os.environ['EXISTANT_ENV_VAR']

    def test_env_del_missing(self):
        # Asking for the removal of a variable that is not set is harmless.
        self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
        def check_environment():
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
            env_changes={'NON_EXISTANT_ENV_VAR':None})

    def test_working_dir(self):
        """Test that we can specify the working dir for the child"""
        orig_getcwd = osutils.getcwd
        orig_chdir = os.chdir
        chdirs = []
        def chdir(path):
            # Record the request instead of changing directory.
            chdirs.append(path)
        def getcwd():
            return 'current'
        os.chdir = chdir
        try:
            osutils.getcwd = getcwd
            try:
                self.assertRaises(_DontSpawnProcess,
                    self.start_bzr_subprocess, [], working_dir='foo')
            finally:
                osutils.getcwd = orig_getcwd
        finally:
            os.chdir = orig_chdir
        # First into the requested directory, then back to where getcwd said
        # we started.
        self.assertEqual(['foo', 'current'], chdirs)
 
2408
 
 
2409
 
 
2410
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
    """Tests that really need to do things with an external bzr."""

    def test_start_and_stop_bzr_subprocess_send_signal(self):
        """finish_bzr_subprocess can signal a started bzr and collect its
        output and exit code.
        """
        self.disable_missing_extensions_warning()
        process = self.start_bzr_subprocess(['wait-until-signalled'],
                                            skip_if_plan_to_signal=True)
        # Wait for the child's 'running' line before signalling it.
        self.assertEqual('running\n', process.stdout.readline())
        result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
                                            retcode=3)
        self.assertEqual('', result[0])
        self.assertEqual('bzr: interrupted\n', result[1])
 
2425
 
1728
2426
 
1729
2427
class TestKnownFailure(tests.TestCase):
1730
2428
 
1795
2493
        tests.TestCase.setUp(self)
1796
2494
        self.suite = TestUtil.TestSuite()
1797
2495
        self.loader = TestUtil.TestLoader()
1798
 
        self.suite.addTest(self.loader.loadTestsFromModuleNames([
1799
 
            'bzrlib.tests.test_selftest']))
 
2496
        self.suite.addTest(self.loader.loadTestsFromModule(
 
2497
            sys.modules['bzrlib.tests.test_selftest']))
1800
2498
        self.all_names = _test_ids(self.suite)
1801
2499
 
1802
2500
    def test_condition_id_re(self):
1980
2678
        # Running bzr in blackbox mode, normal/expected/user errors should be
1981
2679
        # caught in the regular way and turned into an error message plus exit
1982
2680
        # code.
1983
 
        out, err = self.run_bzr(["log", "/nonexistantpath"], retcode=3)
 
2681
        transport_server = MemoryServer()
 
2682
        transport_server.setUp()
 
2683
        self.addCleanup(transport_server.tearDown)
 
2684
        url = transport_server.get_url()
 
2685
        self.permit_url(url)
 
2686
        out, err = self.run_bzr(["log", "%s/nonexistantpath" % url], retcode=3)
1984
2687
        self.assertEqual(out, '')
1985
2688
        self.assertContainsRe(err,
1986
2689
            'bzr: ERROR: Not a branch: ".*nonexistantpath/".\n')
2112
2815
 
2113
2816
class TestTestSuite(tests.TestCase):
2114
2817
 
 
2818
    def test__test_suite_testmod_names(self):
 
2819
        # Test that a plausible list of test module names are returned
 
2820
        # by _test_suite_testmod_names.
 
2821
        test_list = tests._test_suite_testmod_names()
 
2822
        self.assertSubset([
 
2823
            'bzrlib.tests.blackbox',
 
2824
            'bzrlib.tests.per_transport',
 
2825
            'bzrlib.tests.test_selftest',
 
2826
            ],
 
2827
            test_list)
 
2828
 
 
2829
    def test__test_suite_modules_to_doctest(self):
 
2830
        # Test that a plausible list of modules to doctest is returned
 
2831
        # by _test_suite_modules_to_doctest.
 
2832
        test_list = tests._test_suite_modules_to_doctest()
 
2833
        self.assertSubset([
 
2834
            'bzrlib.timestamp',
 
2835
            ],
 
2836
            test_list)
 
2837
 
2115
2838
    def test_test_suite(self):
2116
 
        # This test is slow, so we do a single test with one test in each
2117
 
        # category
2118
 
        test_list = [
 
2839
        # test_suite() loads the entire test suite to operate. To avoid this
 
2840
        # overhead, and yet still be confident that things are happening,
 
2841
        # we temporarily replace two functions used by test_suite with 
 
2842
        # test doubles that supply a few sample tests to load, and check they
 
2843
        # are loaded.
 
2844
        calls = []
 
2845
        def _test_suite_testmod_names():
 
2846
            calls.append("testmod_names")
 
2847
            return [
 
2848
                'bzrlib.tests.blackbox.test_branch',
 
2849
                'bzrlib.tests.per_transport',
 
2850
                'bzrlib.tests.test_selftest',
 
2851
                ]
 
2852
        original_testmod_names = tests._test_suite_testmod_names
 
2853
        def _test_suite_modules_to_doctest():
 
2854
            calls.append("modules_to_doctest")
 
2855
            return ['bzrlib.timestamp']
 
2856
        orig_modules_to_doctest = tests._test_suite_modules_to_doctest
 
2857
        def restore_names():
 
2858
            tests._test_suite_testmod_names = original_testmod_names
 
2859
            tests._test_suite_modules_to_doctest = orig_modules_to_doctest
 
2860
        self.addCleanup(restore_names)
 
2861
        tests._test_suite_testmod_names = _test_suite_testmod_names
 
2862
        tests._test_suite_modules_to_doctest = _test_suite_modules_to_doctest
 
2863
        expected_test_list = [
2119
2864
            # testmod_names
2120
2865
            'bzrlib.tests.blackbox.test_branch.TestBranch.test_branch',
2121
2866
            ('bzrlib.tests.per_transport.TransportTests'
2122
 
             '.test_abspath(LocalURLServer)'),
 
2867
             '.test_abspath(LocalTransport,LocalURLServer)'),
2123
2868
            'bzrlib.tests.test_selftest.TestTestSuite.test_test_suite',
2124
2869
            # modules_to_doctest
2125
2870
            'bzrlib.timestamp.format_highres_date',
2126
2871
            # plugins can't be tested that way since selftest may be run with
2127
2872
            # --no-plugins
2128
2873
            ]
2129
 
        suite = tests.test_suite(test_list)
2130
 
        self.assertEquals(test_list, _test_ids(suite))
 
2874
        suite = tests.test_suite()
 
2875
        self.assertEqual(set(["testmod_names", "modules_to_doctest"]),
 
2876
            set(calls))
 
2877
        self.assertSubset(expected_test_list, _test_ids(suite))
2131
2878
 
2132
2879
    def test_test_suite_list_and_start(self):
 
2880
        # We cannot test this at the same time as the main load, because we want
 
2881
        # to know that starting_with == None works. So a second load is
 
2882
        # incurred - note that the starting_with parameter causes a partial load
 
2883
        # rather than a full load so this test should be pretty quick.
2133
2884
        test_list = ['bzrlib.tests.test_selftest.TestTestSuite.test_test_suite']
2134
2885
        suite = tests.test_suite(test_list,
2135
2886
                                 ['bzrlib.tests.test_selftest.TestTestSuite'])
2281
3032
                                                self.verbosity)
2282
3033
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2283
3034
        self.assertLength(1, calls)
2284
 
 
2285
 
    def test_done(self):
2286
 
        """run_suite should call result.done()"""
2287
 
        self.calls = 0
2288
 
        def one_more_call(): self.calls += 1
2289
 
        def test_function():
2290
 
            pass
2291
 
        test = unittest.FunctionTestCase(test_function)
2292
 
        class InstrumentedTestResult(tests.ExtendedTestResult):
2293
 
            def done(self): one_more_call()
2294
 
        class MyRunner(tests.TextTestRunner):
2295
 
            def run(self, test):
2296
 
                return InstrumentedTestResult(self.stream, self.descriptions,
2297
 
                                              self.verbosity)
2298
 
        tests.run_suite(test, runner_class=MyRunner, stream=StringIO())
2299
 
        self.assertEquals(1, self.calls)