/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-12-21 06:03:07 UTC
  • mfrom: (4665.7.3 serve-init)
  • Revision ID: pqm@pqm.ubuntu.com-20091221060307-uvja3vdy1o6dzzy0
(mbp) example debian init script

Show diffs side-by-side

added added

removed removed

Lines of Context:
53
53
from bzrlib import (
54
54
    branchbuilder,
55
55
    bzrdir,
 
56
    chk_map,
 
57
    config,
56
58
    debug,
57
59
    errors,
58
60
    hooks,
90
92
    deprecated_passed,
91
93
    )
92
94
import bzrlib.trace
93
 
from bzrlib.transport import get_transport
 
95
from bzrlib.transport import get_transport, pathfilter
94
96
import bzrlib.transport
95
97
from bzrlib.transport.local import LocalURLServer
96
98
from bzrlib.transport.memory import MemoryServer
221
223
                '%s is leaking threads among %d leaking tests.\n' % (
222
224
                TestCase._first_thread_leaker_id,
223
225
                TestCase._leaking_threads_tests))
 
226
            # We don't report the main thread as an active one.
 
227
            self.stream.write(
 
228
                '%d non-main threads were left active in the end.\n'
 
229
                % (TestCase._active_threads - 1))
224
230
 
225
231
    def _extractBenchmarkTime(self, testCase):
226
232
        """Add a benchmark time for the current test case."""
292
298
        Called from the TestCase run() method when the test
293
299
        fails with an unexpected error.
294
300
        """
295
 
        self._testConcluded(test)
296
 
        if isinstance(err[1], TestNotApplicable):
297
 
            return self._addNotApplicable(test, err)
298
 
        elif isinstance(err[1], UnavailableFeature):
299
 
            return self.addNotSupported(test, err[1].args[0])
300
 
        else:
301
 
            unittest.TestResult.addError(self, test, err)
302
 
            self.error_count += 1
303
 
            self.report_error(test, err)
304
 
            if self.stop_early:
305
 
                self.stop()
306
 
            self._cleanupLogFile(test)
 
301
        self._post_mortem()
 
302
        unittest.TestResult.addError(self, test, err)
 
303
        self.error_count += 1
 
304
        self.report_error(test, err)
 
305
        if self.stop_early:
 
306
            self.stop()
 
307
        self._cleanupLogFile(test)
307
308
 
308
309
    def addFailure(self, test, err):
309
310
        """Tell result that test failed.
311
312
        Called from the TestCase run() method when the test
312
313
        fails because e.g. an assert() method failed.
313
314
        """
314
 
        self._testConcluded(test)
315
 
        if isinstance(err[1], KnownFailure):
316
 
            return self._addKnownFailure(test, err)
317
 
        else:
318
 
            unittest.TestResult.addFailure(self, test, err)
319
 
            self.failure_count += 1
320
 
            self.report_failure(test, err)
321
 
            if self.stop_early:
322
 
                self.stop()
323
 
            self._cleanupLogFile(test)
 
315
        self._post_mortem()
 
316
        unittest.TestResult.addFailure(self, test, err)
 
317
        self.failure_count += 1
 
318
        self.report_failure(test, err)
 
319
        if self.stop_early:
 
320
            self.stop()
 
321
        self._cleanupLogFile(test)
324
322
 
325
323
    def addSuccess(self, test):
326
324
        """Tell result that test completed successfully.
327
325
 
328
326
        Called from the TestCase run()
329
327
        """
330
 
        self._testConcluded(test)
331
328
        if self._bench_history is not None:
332
329
            benchmark_time = self._extractBenchmarkTime(test)
333
330
            if benchmark_time is not None:
339
336
        unittest.TestResult.addSuccess(self, test)
340
337
        test._log_contents = ''
341
338
 
342
 
    def _testConcluded(self, test):
343
 
        """Common code when a test has finished.
344
 
 
345
 
        Called regardless of whether it succeded, failed, etc.
346
 
        """
347
 
        pass
348
 
 
349
 
    def _addKnownFailure(self, test, err):
 
339
    def addExpectedFailure(self, test, err):
350
340
        self.known_failure_count += 1
351
341
        self.report_known_failure(test, err)
352
342
 
354
344
        """The test will not be run because of a missing feature.
355
345
        """
356
346
        # this can be called in two different ways: it may be that the
357
 
        # test started running, and then raised (through addError)
 
347
        # test started running, and then raised (through requireFeature)
358
348
        # UnavailableFeature.  Alternatively this method can be called
359
 
        # while probing for features before running the tests; in that
360
 
        # case we will see startTest and stopTest, but the test will never
361
 
        # actually run.
 
349
        # while probing for features before running the test code proper; in
 
350
        # that case we will see startTest and stopTest, but the test will
 
351
        # never actually run.
362
352
        self.unsupported.setdefault(str(feature), 0)
363
353
        self.unsupported[str(feature)] += 1
364
354
        self.report_unsupported(test, feature)
368
358
        self.skip_count += 1
369
359
        self.report_skip(test, reason)
370
360
 
371
 
    def _addNotApplicable(self, test, skip_excinfo):
372
 
        if isinstance(skip_excinfo[1], TestNotApplicable):
373
 
            self.not_applicable_count += 1
374
 
            self.report_not_applicable(test, skip_excinfo)
375
 
        try:
376
 
            test.tearDown()
377
 
        except KeyboardInterrupt:
378
 
            raise
379
 
        except:
380
 
            self.addError(test, test.exc_info())
381
 
        else:
382
 
            # seems best to treat this as success from point-of-view of unittest
383
 
            # -- it actually does nothing so it barely matters :)
384
 
            unittest.TestResult.addSuccess(self, test)
385
 
            test._log_contents = ''
 
361
    def addNotApplicable(self, test, reason):
 
362
        self.not_applicable_count += 1
 
363
        self.report_not_applicable(test, reason)
386
364
 
387
365
    def printErrorList(self, flavour, errors):
388
366
        for test, err in errors:
404
382
            self.stream.writeln(self.separator2)
405
383
            self.stream.writeln("%s" % err)
406
384
 
 
385
    def _post_mortem(self):
 
386
        """Start a PDB post mortem session."""
 
387
        if os.environ.get('BZR_TEST_PDB', None):
 
388
            import pdb;pdb.post_mortem()
 
389
 
407
390
    def progress(self, offset, whence):
408
391
        """The test is adjusting the count of tests to run."""
409
392
        if whence == SUBUNIT_SEEK_SET:
488
471
            a += ', %d err' % self.error_count
489
472
        if self.failure_count:
490
473
            a += ', %d fail' % self.failure_count
491
 
        if self.unsupported:
492
 
            a += ', %d missing' % len(self.unsupported)
 
474
        # if self.unsupported:
 
475
        #     a += ', %d missing' % len(self.unsupported)
493
476
        a += ']'
494
477
        return a
495
478
 
504
487
        return self._shortened_test_description(test)
505
488
 
506
489
    def report_error(self, test, err):
507
 
        self.pb.note('ERROR: %s\n    %s\n',
 
490
        ui.ui_factory.note('ERROR: %s\n    %s\n' % (
508
491
            self._test_description(test),
509
492
            err[1],
510
 
            )
 
493
            ))
511
494
 
512
495
    def report_failure(self, test, err):
513
 
        self.pb.note('FAIL: %s\n    %s\n',
 
496
        ui.ui_factory.note('FAIL: %s\n    %s\n' % (
514
497
            self._test_description(test),
515
498
            err[1],
516
 
            )
 
499
            ))
517
500
 
518
501
    def report_known_failure(self, test, err):
519
 
        self.pb.note('XFAIL: %s\n%s\n',
520
 
            self._test_description(test), err[1])
 
502
        ui.ui_factory.note('XFAIL: %s\n%s\n' % (
 
503
            self._test_description(test), err[1]))
521
504
 
522
505
    def report_skip(self, test, reason):
523
506
        pass
524
507
 
525
 
    def report_not_applicable(self, test, skip_excinfo):
 
508
    def report_not_applicable(self, test, reason):
526
509
        pass
527
510
 
528
511
    def report_unsupported(self, test, feature):
550
533
    def report_test_start(self, test):
551
534
        self.count += 1
552
535
        name = self._shortened_test_description(test)
553
 
        # width needs space for 6 char status, plus 1 for slash, plus an
554
 
        # 11-char time string, plus a trailing blank
555
 
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
556
 
        self.stream.write(self._ellipsize_to_right(name,
557
 
                          osutils.terminal_width()-18))
 
536
        width = osutils.terminal_width()
 
537
        if width is not None:
 
538
            # width needs space for 6 char status, plus 1 for slash, plus an
 
539
            # 11-char time string, plus a trailing blank
 
540
            # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
 
541
            # space
 
542
            self.stream.write(self._ellipsize_to_right(name, width-18))
 
543
        else:
 
544
            self.stream.write(name)
558
545
        self.stream.flush()
559
546
 
560
547
    def _error_summary(self, err):
589
576
        self.stream.writeln(' SKIP %s\n%s'
590
577
                % (self._testTimeString(test), reason))
591
578
 
592
 
    def report_not_applicable(self, test, skip_excinfo):
593
 
        self.stream.writeln('  N/A %s\n%s'
594
 
                % (self._testTimeString(test),
595
 
                   self._error_summary(skip_excinfo)))
 
579
    def report_not_applicable(self, test, reason):
 
580
        self.stream.writeln('  N/A %s\n    %s'
 
581
                % (self._testTimeString(test), reason))
596
582
 
597
583
    def report_unsupported(self, test, feature):
598
584
        """test cannot be run because feature is missing."""
700
686
class UnavailableFeature(Exception):
701
687
    """A feature required for this test was not available.
702
688
 
 
689
    This can be considered a specialised form of SkippedTest.
 
690
 
703
691
    The feature should be used to construct the exception.
704
692
    """
705
693
 
812
800
        self._cleanups = []
813
801
        self._bzr_test_setUp_run = False
814
802
        self._bzr_test_tearDown_run = False
 
803
        self._directory_isolation = True
815
804
 
816
805
    def setUp(self):
817
806
        unittest.TestCase.setUp(self)
822
811
        self._benchcalls = []
823
812
        self._benchtime = None
824
813
        self._clear_hooks()
 
814
        self._track_transports()
825
815
        self._track_locks()
826
816
        self._clear_debug_flags()
827
817
        TestCase._active_threads = threading.activeCount()
836
826
        active = threading.activeCount()
837
827
        leaked_threads = active - TestCase._active_threads
838
828
        TestCase._active_threads = active
839
 
        if leaked_threads:
 
829
        # If some tests make the number of threads *decrease*, we'll consider
 
830
        # that they are just observing old threads dieing, not agressively kill
 
831
        # random threads. So we don't report these tests as leaking. The risk
 
832
        # is that we have false positives that way (the test see 2 threads
 
833
        # going away but leak one) but it seems less likely than the actual
 
834
        # false positives (the test see threads going away and does not leak).
 
835
        if leaked_threads > 0:
840
836
            TestCase._leaking_threads_tests += 1
841
837
            if TestCase._first_thread_leaker_id is None:
842
838
                TestCase._first_thread_leaker_id = self.id()
868
864
        # this hook should always be installed
869
865
        request._install_hook()
870
866
 
 
867
    def disable_directory_isolation(self):
 
868
        """Turn off directory isolation checks."""
 
869
        self._directory_isolation = False
 
870
 
 
871
    def enable_directory_isolation(self):
 
872
        """Enable directory isolation checks."""
 
873
        self._directory_isolation = True
 
874
 
871
875
    def _silenceUI(self):
872
876
        """Turn off UI for duration of test"""
873
877
        # by default the UI is off; tests can turn it on if they want it.
928
932
    def _lock_broken(self, result):
929
933
        self._lock_actions.append(('broken', result))
930
934
 
 
935
    def permit_dir(self, name):
 
936
        """Permit a directory to be used by this test. See permit_url."""
 
937
        name_transport = get_transport(name)
 
938
        self.permit_url(name)
 
939
        self.permit_url(name_transport.base)
 
940
 
 
941
    def permit_url(self, url):
 
942
        """Declare that url is an ok url to use in this test.
 
943
        
 
944
        Do this for memory transports, temporary test directory etc.
 
945
        
 
946
        Do not do this for the current working directory, /tmp, or any other
 
947
        preexisting non isolated url.
 
948
        """
 
949
        if not url.endswith('/'):
 
950
            url += '/'
 
951
        self._bzr_selftest_roots.append(url)
 
952
 
 
953
    def permit_source_tree_branch_repo(self):
 
954
        """Permit the source tree bzr is running from to be opened.
 
955
 
 
956
        Some code such as bzrlib.version attempts to read from the bzr branch
 
957
        that bzr is executing from (if any). This method permits that directory
 
958
        to be used in the test suite.
 
959
        """
 
960
        path = self.get_source_path()
 
961
        self.record_directory_isolation()
 
962
        try:
 
963
            try:
 
964
                workingtree.WorkingTree.open(path)
 
965
            except (errors.NotBranchError, errors.NoWorkingTree):
 
966
                return
 
967
        finally:
 
968
            self.enable_directory_isolation()
 
969
 
 
970
    def _preopen_isolate_transport(self, transport):
 
971
        """Check that all transport openings are done in the test work area."""
 
972
        while isinstance(transport, pathfilter.PathFilteringTransport):
 
973
            # Unwrap pathfiltered transports
 
974
            transport = transport.server.backing_transport.clone(
 
975
                transport._filter('.'))
 
976
        url = transport.base
 
977
        # ReadonlySmartTCPServer_for_testing decorates the backing transport
 
978
        # urls it is given by prepending readonly+. This is appropriate as the
 
979
        # client shouldn't know that the server is readonly (or not readonly).
 
980
        # We could register all servers twice, with readonly+ prepending, but
 
981
        # that makes for a long list; this is about the same but easier to
 
982
        # read.
 
983
        if url.startswith('readonly+'):
 
984
            url = url[len('readonly+'):]
 
985
        self._preopen_isolate_url(url)
 
986
 
 
987
    def _preopen_isolate_url(self, url):
 
988
        if not self._directory_isolation:
 
989
            return
 
990
        if self._directory_isolation == 'record':
 
991
            self._bzr_selftest_roots.append(url)
 
992
            return
 
993
        # This prevents all transports, including e.g. sftp ones backed on disk
 
994
        # from working unless they are explicitly granted permission. We then
 
995
        # depend on the code that sets up test transports to check that they are
 
996
        # appropriately isolated and enable their use by calling
 
997
        # self.permit_transport()
 
998
        if not osutils.is_inside_any(self._bzr_selftest_roots, url):
 
999
            raise errors.BzrError("Attempt to escape test isolation: %r %r"
 
1000
                % (url, self._bzr_selftest_roots))
 
1001
 
 
1002
    def record_directory_isolation(self):
 
1003
        """Gather accessed directories to permit later access.
 
1004
        
 
1005
        This is used for tests that access the branch bzr is running from.
 
1006
        """
 
1007
        self._directory_isolation = "record"
 
1008
 
931
1009
    def start_server(self, transport_server, backing_server=None):
932
1010
        """Start transport_server for this test.
933
1011
 
939
1017
        else:
940
1018
            transport_server.setUp(backing_server)
941
1019
        self.addCleanup(transport_server.tearDown)
 
1020
        # Obtain a real transport because if the server supplies a password, it
 
1021
        # will be hidden from the base on the client side.
 
1022
        t = get_transport(transport_server.get_url())
 
1023
        # Some transport servers effectively chroot the backing transport;
 
1024
        # others like SFTPServer don't - users of the transport can walk up the
 
1025
        # transport to read the entire backing transport. This wouldn't matter
 
1026
        # except that the workdir tests are given - and that they expect the
 
1027
        # server's url to point at - is one directory under the safety net. So
 
1028
        # Branch operations into the transport will attempt to walk up one
 
1029
        # directory. Chrooting all servers would avoid this but also mean that
 
1030
        # we wouldn't be testing directly against non-root urls. Alternatively
 
1031
        # getting the test framework to start the server with a backing server
 
1032
        # at the actual safety net directory would work too, but this then
 
1033
        # means that the self.get_url/self.get_transport methods would need
 
1034
        # to transform all their results. On balance its cleaner to handle it
 
1035
        # here, and permit a higher url when we have one of these transports.
 
1036
        if t.base.endswith('/work/'):
 
1037
            # we have safety net/test root/work
 
1038
            t = t.clone('../..')
 
1039
        elif isinstance(transport_server, server.SmartTCPServer_for_testing):
 
1040
            # The smart server adds a path similar to work, which is traversed
 
1041
            # up from by the client. But the server is chrooted - the actual
 
1042
            # backing transport is not escaped from, and VFS requests to the
 
1043
            # root will error (because they try to escape the chroot).
 
1044
            t2 = t.clone('..')
 
1045
            while t2.base != t.base:
 
1046
                t = t2
 
1047
                t2 = t.clone('..')
 
1048
        self.permit_url(t.base)
 
1049
 
 
1050
    def _track_transports(self):
 
1051
        """Install checks for transport usage."""
 
1052
        # TestCase has no safe place it can write to.
 
1053
        self._bzr_selftest_roots = []
 
1054
        # Currently the easiest way to be sure that nothing is going on is to
 
1055
        # hook into bzr dir opening. This leaves a small window of error for
 
1056
        # transport tests, but they are well known, and we can improve on this
 
1057
        # step.
 
1058
        bzrdir.BzrDir.hooks.install_named_hook("pre_open",
 
1059
            self._preopen_isolate_transport, "Check bzr directories are safe.")
942
1060
 
943
1061
    def _ndiff_strings(self, a, b):
944
1062
        """Return ndiff between two strings containing lines.
982
1100
            return
983
1101
        if message is None:
984
1102
            message = "texts not equal:\n"
 
1103
        if a + '\n' == b:
 
1104
            message = 'first string is missing a final newline.\n'
985
1105
        if a == b + '\n':
986
 
            message = 'first string is missing a final newline.\n'
987
 
        if a + '\n' == b:
988
1106
            message = 'second string is missing a final newline.\n'
989
1107
        raise AssertionError(message +
990
1108
                             self._ndiff_strings(a, b))
1001
1119
        :raises AssertionError: If the expected and actual stat values differ
1002
1120
            other than by atime.
1003
1121
        """
1004
 
        self.assertEqual(expected.st_size, actual.st_size)
1005
 
        self.assertEqual(expected.st_mtime, actual.st_mtime)
1006
 
        self.assertEqual(expected.st_ctime, actual.st_ctime)
1007
 
        self.assertEqual(expected.st_dev, actual.st_dev)
1008
 
        self.assertEqual(expected.st_ino, actual.st_ino)
1009
 
        self.assertEqual(expected.st_mode, actual.st_mode)
 
1122
        self.assertEqual(expected.st_size, actual.st_size,
 
1123
                         'st_size did not match')
 
1124
        self.assertEqual(expected.st_mtime, actual.st_mtime,
 
1125
                         'st_mtime did not match')
 
1126
        self.assertEqual(expected.st_ctime, actual.st_ctime,
 
1127
                         'st_ctime did not match')
 
1128
        if sys.platform != 'win32':
 
1129
            # On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
 
1130
            # is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
 
1131
            # odd. Regardless we shouldn't actually try to assert anything
 
1132
            # about their values
 
1133
            self.assertEqual(expected.st_dev, actual.st_dev,
 
1134
                             'st_dev did not match')
 
1135
            self.assertEqual(expected.st_ino, actual.st_ino,
 
1136
                             'st_ino did not match')
 
1137
        self.assertEqual(expected.st_mode, actual.st_mode,
 
1138
                         'st_mode did not match')
1010
1139
 
1011
1140
    def assertLength(self, length, obj_with_len):
1012
1141
        """Assert that obj_with_len is of length length."""
1014
1143
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
1015
1144
                length, len(obj_with_len), obj_with_len))
1016
1145
 
 
1146
    def assertLogsError(self, exception_class, func, *args, **kwargs):
 
1147
        """Assert that func(*args, **kwargs) quietly logs a specific exception.
 
1148
        """
 
1149
        from bzrlib import trace
 
1150
        captured = []
 
1151
        orig_log_exception_quietly = trace.log_exception_quietly
 
1152
        try:
 
1153
            def capture():
 
1154
                orig_log_exception_quietly()
 
1155
                captured.append(sys.exc_info())
 
1156
            trace.log_exception_quietly = capture
 
1157
            func(*args, **kwargs)
 
1158
        finally:
 
1159
            trace.log_exception_quietly = orig_log_exception_quietly
 
1160
        self.assertLength(1, captured)
 
1161
        err = captured[0][1]
 
1162
        self.assertIsInstance(err, exception_class)
 
1163
        return err
 
1164
 
1017
1165
    def assertPositive(self, val):
1018
1166
        """Assert that val is greater than 0."""
1019
1167
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
1380
1528
            'BZR_PROGRESS_BAR': None,
1381
1529
            'BZR_LOG': None,
1382
1530
            'BZR_PLUGIN_PATH': None,
 
1531
            'BZR_CONCURRENCY': None,
1383
1532
            # Make sure that any text ui tests are consistent regardless of
1384
1533
            # the environment the test case is run in; you may want tests that
1385
1534
            # test other combinations.  'dumb' is a reasonable guess for tests
1387
1536
            'TERM': 'dumb',
1388
1537
            'LINES': '25',
1389
1538
            'COLUMNS': '80',
 
1539
            'BZR_COLUMNS': '80',
1390
1540
            # SSH Agent
1391
1541
            'SSH_AUTH_SOCK': None,
1392
1542
            # Proxies
1433
1583
    def _do_skip(self, result, reason):
1434
1584
        addSkip = getattr(result, 'addSkip', None)
1435
1585
        if not callable(addSkip):
1436
 
            result.addError(self, sys.exc_info())
 
1586
            result.addSuccess(result)
1437
1587
        else:
1438
1588
            addSkip(self, reason)
1439
1589
 
 
1590
    def _do_known_failure(self, result):
 
1591
        err = sys.exc_info()
 
1592
        addExpectedFailure = getattr(result, 'addExpectedFailure', None)
 
1593
        if addExpectedFailure is not None:
 
1594
            addExpectedFailure(self, err)
 
1595
        else:
 
1596
            result.addSuccess(self)
 
1597
 
 
1598
    def _do_not_applicable(self, result, e):
 
1599
        if not e.args:
 
1600
            reason = 'No reason given'
 
1601
        else:
 
1602
            reason = e.args[0]
 
1603
        addNotApplicable = getattr(result, 'addNotApplicable', None)
 
1604
        if addNotApplicable is not None:
 
1605
            result.addNotApplicable(self, reason)
 
1606
        else:
 
1607
            self._do_skip(result, reason)
 
1608
 
 
1609
    def _do_unsupported_or_skip(self, result, reason):
 
1610
        addNotSupported = getattr(result, 'addNotSupported', None)
 
1611
        if addNotSupported is not None:
 
1612
            result.addNotSupported(self, reason)
 
1613
        else:
 
1614
            self._do_skip(result, reason)
 
1615
 
1440
1616
    def run(self, result=None):
1441
1617
        if result is None: result = self.defaultTestResult()
 
1618
        result.startTest(self)
 
1619
        try:
 
1620
            self._run(result)
 
1621
            return result
 
1622
        finally:
 
1623
            result.stopTest(self)
 
1624
 
 
1625
    def _run(self, result):
1442
1626
        for feature in getattr(self, '_test_needs_features', []):
1443
1627
            if not feature.available():
1444
 
                result.startTest(self)
1445
 
                if getattr(result, 'addNotSupported', None):
1446
 
                    result.addNotSupported(self, feature)
1447
 
                else:
1448
 
                    result.addSuccess(self)
1449
 
                result.stopTest(self)
1450
 
                return result
 
1628
                return self._do_unsupported_or_skip(result, feature)
1451
1629
        try:
 
1630
            absent_attr = object()
 
1631
            # Python 2.5
 
1632
            method_name = getattr(self, '_testMethodName', absent_attr)
 
1633
            if method_name is absent_attr:
 
1634
                # Python 2.4
 
1635
                method_name = getattr(self, '_TestCase__testMethodName')
 
1636
            testMethod = getattr(self, method_name)
1452
1637
            try:
1453
 
                result.startTest(self)
1454
 
                absent_attr = object()
1455
 
                # Python 2.5
1456
 
                method_name = getattr(self, '_testMethodName', absent_attr)
1457
 
                if method_name is absent_attr:
1458
 
                    # Python 2.4
1459
 
                    method_name = getattr(self, '_TestCase__testMethodName')
1460
 
                testMethod = getattr(self, method_name)
1461
 
                try:
1462
 
                    try:
1463
 
                        self.setUp()
1464
 
                        if not self._bzr_test_setUp_run:
1465
 
                            self.fail(
1466
 
                                "test setUp did not invoke "
1467
 
                                "bzrlib.tests.TestCase's setUp")
1468
 
                    except KeyboardInterrupt:
1469
 
                        self._runCleanups()
1470
 
                        raise
1471
 
                    except TestSkipped, e:
1472
 
                        self._do_skip(result, e.args[0])
1473
 
                        self.tearDown()
1474
 
                        return result
1475
 
                    except:
1476
 
                        result.addError(self, sys.exc_info())
1477
 
                        self._runCleanups()
1478
 
                        return result
1479
 
 
 
1638
                try:
 
1639
                    self.setUp()
 
1640
                    if not self._bzr_test_setUp_run:
 
1641
                        self.fail(
 
1642
                            "test setUp did not invoke "
 
1643
                            "bzrlib.tests.TestCase's setUp")
 
1644
                except KeyboardInterrupt:
 
1645
                    self._runCleanups()
 
1646
                    raise
 
1647
                except KnownFailure:
 
1648
                    self._do_known_failure(result)
 
1649
                    self.tearDown()
 
1650
                    return
 
1651
                except TestNotApplicable, e:
 
1652
                    self._do_not_applicable(result, e)
 
1653
                    self.tearDown()
 
1654
                    return
 
1655
                except TestSkipped, e:
 
1656
                    self._do_skip(result, e.args[0])
 
1657
                    self.tearDown()
 
1658
                    return result
 
1659
                except UnavailableFeature, e:
 
1660
                    self._do_unsupported_or_skip(result, e.args[0])
 
1661
                    self.tearDown()
 
1662
                    return
 
1663
                except:
 
1664
                    result.addError(self, sys.exc_info())
 
1665
                    self._runCleanups()
 
1666
                    return result
 
1667
 
 
1668
                ok = False
 
1669
                try:
 
1670
                    testMethod()
 
1671
                    ok = True
 
1672
                except KnownFailure:
 
1673
                    self._do_known_failure(result)
 
1674
                except self.failureException:
 
1675
                    result.addFailure(self, sys.exc_info())
 
1676
                except TestNotApplicable, e:
 
1677
                    self._do_not_applicable(result, e)
 
1678
                except TestSkipped, e:
 
1679
                    if not e.args:
 
1680
                        reason = "No reason given."
 
1681
                    else:
 
1682
                        reason = e.args[0]
 
1683
                    self._do_skip(result, reason)
 
1684
                except UnavailableFeature, e:
 
1685
                    self._do_unsupported_or_skip(result, e.args[0])
 
1686
                except KeyboardInterrupt:
 
1687
                    self._runCleanups()
 
1688
                    raise
 
1689
                except:
 
1690
                    result.addError(self, sys.exc_info())
 
1691
 
 
1692
                try:
 
1693
                    self.tearDown()
 
1694
                    if not self._bzr_test_tearDown_run:
 
1695
                        self.fail(
 
1696
                            "test tearDown did not invoke "
 
1697
                            "bzrlib.tests.TestCase's tearDown")
 
1698
                except KeyboardInterrupt:
 
1699
                    self._runCleanups()
 
1700
                    raise
 
1701
                except:
 
1702
                    result.addError(self, sys.exc_info())
 
1703
                    self._runCleanups()
1480
1704
                    ok = False
1481
 
                    try:
1482
 
                        testMethod()
1483
 
                        ok = True
1484
 
                    except self.failureException:
1485
 
                        result.addFailure(self, sys.exc_info())
1486
 
                    except TestSkipped, e:
1487
 
                        if not e.args:
1488
 
                            reason = "No reason given."
1489
 
                        else:
1490
 
                            reason = e.args[0]
1491
 
                        self._do_skip(result, reason)
1492
 
                    except KeyboardInterrupt:
1493
 
                        self._runCleanups()
1494
 
                        raise
1495
 
                    except:
1496
 
                        result.addError(self, sys.exc_info())
1497
 
 
1498
 
                    try:
1499
 
                        self.tearDown()
1500
 
                        if not self._bzr_test_tearDown_run:
1501
 
                            self.fail(
1502
 
                                "test tearDown did not invoke "
1503
 
                                "bzrlib.tests.TestCase's tearDown")
1504
 
                    except KeyboardInterrupt:
1505
 
                        self._runCleanups()
1506
 
                        raise
1507
 
                    except:
1508
 
                        result.addError(self, sys.exc_info())
1509
 
                        self._runCleanups()
1510
 
                        ok = False
1511
 
                    if ok: result.addSuccess(self)
1512
 
                finally:
1513
 
                    result.stopTest(self)
 
1705
                if ok: result.addSuccess(self)
1514
1706
                return result
1515
 
            except TestNotApplicable:
1516
 
                # Not moved from the result [yet].
1517
 
                self._runCleanups()
1518
 
                raise
1519
1707
            except KeyboardInterrupt:
1520
1708
                self._runCleanups()
1521
1709
                raise
1629
1817
 
1630
1818
    def _run_bzr_core(self, args, retcode, encoding, stdin,
1631
1819
            working_dir):
 
1820
        # Clear chk_map page cache, because the contents are likely to mask
 
1821
        # locking errors.
 
1822
        chk_map.clear_cache()
1632
1823
        if encoding is None:
1633
1824
            encoding = osutils.get_user_encoding()
1634
1825
        stdout = StringIOWrapper()
1651
1842
            os.chdir(working_dir)
1652
1843
 
1653
1844
        try:
1654
 
            result = self.apply_redirected(ui.ui_factory.stdin,
1655
 
                stdout, stderr,
1656
 
                bzrlib.commands.run_bzr_catch_user_errors,
1657
 
                args)
 
1845
            try:
 
1846
                result = self.apply_redirected(ui.ui_factory.stdin,
 
1847
                    stdout, stderr,
 
1848
                    bzrlib.commands.run_bzr_catch_user_errors,
 
1849
                    args)
 
1850
            except KeyboardInterrupt:
 
1851
                # Reraise KeyboardInterrupt with contents of redirected stdout
 
1852
                # and stderr as arguments, for tests which are interested in
 
1853
                # stdout and stderr and are expecting the exception.
 
1854
                out = stdout.getvalue()
 
1855
                err = stderr.getvalue()
 
1856
                if out:
 
1857
                    self.log('output:\n%r', out)
 
1858
                if err:
 
1859
                    self.log('errors:\n%r', err)
 
1860
                raise KeyboardInterrupt(out, err)
1658
1861
        finally:
1659
1862
            logger.removeHandler(handler)
1660
1863
            ui.ui_factory = old_ui_factory
1670
1873
        if retcode is not None:
1671
1874
            self.assertEquals(retcode, result,
1672
1875
                              message='Unexpected return code')
1673
 
        return out, err
 
1876
        return result, out, err
1674
1877
 
1675
1878
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
1676
1879
                working_dir=None, error_regexes=[], output_encoding=None):
1705
1908
        :keyword error_regexes: A list of expected error messages.  If
1706
1909
            specified they must be seen in the error output of the command.
1707
1910
        """
1708
 
        out, err = self._run_bzr_autosplit(
 
1911
        retcode, out, err = self._run_bzr_autosplit(
1709
1912
            args=args,
1710
1913
            retcode=retcode,
1711
1914
            encoding=encoding,
1862
2065
        """
1863
2066
        return Popen(*args, **kwargs)
1864
2067
 
 
2068
    def get_source_path(self):
 
2069
        """Return the path of the directory containing bzrlib."""
 
2070
        return os.path.dirname(os.path.dirname(bzrlib.__file__))
 
2071
 
1865
2072
    def get_bzr_path(self):
1866
2073
        """Return the path of the 'bzr' executable for this test suite."""
1867
 
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
2074
        bzr_path = self.get_source_path()+'/bzr'
1868
2075
        if not os.path.isfile(bzr_path):
1869
2076
            # We are probably installed. Assume sys.argv is the right file
1870
2077
            bzr_path = sys.argv[0]
2196
2403
        propagating. This method ensures than a test did not leaked.
2197
2404
        """
2198
2405
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2406
        self.permit_url(get_transport(root).base)
2199
2407
        wt = workingtree.WorkingTree.open(root)
2200
2408
        last_rev = wt.last_revision()
2201
2409
        if last_rev != 'null:':
2203
2411
            # recreate a new one or all the followng tests will fail.
2204
2412
            # If you need to inspect its content uncomment the following line
2205
2413
            # import pdb; pdb.set_trace()
2206
 
            _rmtree_temp_dir(root + '/.bzr')
 
2414
            _rmtree_temp_dir(root + '/.bzr', test_id=self.id())
2207
2415
            self._create_safety_net()
2208
2416
            raise AssertionError('%s/.bzr should not be modified' % root)
2209
2417
 
2210
2418
    def _make_test_root(self):
2211
2419
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
2212
 
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
2420
            # Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
 
2421
            root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
 
2422
                                                    suffix='.tmp'))
2213
2423
            TestCaseWithMemoryTransport.TEST_ROOT = root
2214
2424
 
2215
2425
            self._create_safety_net()
2218
2428
            # specifically told when all tests are finished.  This will do.
2219
2429
            atexit.register(_rmtree_temp_dir, root)
2220
2430
 
 
2431
        self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
2221
2432
        self.addCleanup(self._check_safety_net)
2222
2433
 
2223
2434
    def makeAndChdirToTestDir(self):
2231
2442
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2232
2443
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
2233
2444
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
2445
        self.permit_dir(self.test_dir)
2234
2446
 
2235
2447
    def make_branch(self, relpath, format=None):
2236
2448
        """Create a branch on the transport at relpath."""
2281
2493
        return branchbuilder.BranchBuilder(branch=branch)
2282
2494
 
2283
2495
    def overrideEnvironmentForTesting(self):
2284
 
        os.environ['HOME'] = self.test_home_dir
2285
 
        os.environ['BZR_HOME'] = self.test_home_dir
 
2496
        test_home_dir = self.test_home_dir
 
2497
        if isinstance(test_home_dir, unicode):
 
2498
            test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
 
2499
        os.environ['HOME'] = test_home_dir
 
2500
        os.environ['BZR_HOME'] = test_home_dir
2286
2501
 
2287
2502
    def setUp(self):
2288
2503
        super(TestCaseWithMemoryTransport, self).setUp()
2368
2583
            if os.path.exists(name):
2369
2584
                name = name_prefix + '_' + str(i)
2370
2585
            else:
2371
 
                os.mkdir(name)
 
2586
                # now create test and home directories within this dir
 
2587
                self.test_base_dir = name
 
2588
                self.addCleanup(self.deleteTestDir)
 
2589
                os.mkdir(self.test_base_dir)
2372
2590
                break
2373
 
        # now create test and home directories within this dir
2374
 
        self.test_base_dir = name
 
2591
        self.permit_dir(self.test_base_dir)
 
2592
        # 'sprouting' and 'init' of a branch both walk up the tree to find
 
2593
        # stacking policy to honour; create a bzr dir with an unshared
 
2594
        # repository (but not a branch - our code would be trying to escape
 
2595
        # then!) to stop them, and permit it to be read.
 
2596
        # control = bzrdir.BzrDir.create(self.test_base_dir)
 
2597
        # control.create_repository()
2375
2598
        self.test_home_dir = self.test_base_dir + '/home'
2376
2599
        os.mkdir(self.test_home_dir)
2377
2600
        self.test_dir = self.test_base_dir + '/work'
2383
2606
            f.write(self.id())
2384
2607
        finally:
2385
2608
            f.close()
2386
 
        self.addCleanup(self.deleteTestDir)
2387
2609
 
2388
2610
    def deleteTestDir(self):
2389
2611
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2390
 
        _rmtree_temp_dir(self.test_base_dir)
 
2612
        _rmtree_temp_dir(self.test_base_dir, test_id=self.id())
2391
2613
 
2392
2614
    def build_tree(self, shape, line_endings='binary', transport=None):
2393
2615
        """Build a test tree according to a pattern.
2554
2776
        super(TestCaseWithTransport, self).setUp()
2555
2777
        self.__vfs_server = None
2556
2778
 
 
2779
    def disable_missing_extensions_warning(self):
 
2780
        """Some tests expect a precise stderr content.
 
2781
 
 
2782
        There is no point in forcing them to duplicate the extension related
 
2783
        warning.
 
2784
        """
 
2785
        config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
 
2786
 
2557
2787
 
2558
2788
class ChrootedTestCase(TestCaseWithTransport):
2559
2789
    """A support class that provides readonly urls outside the local namespace.
3052
3282
    concurrency = osutils.local_concurrency()
3053
3283
    result = []
3054
3284
    from subunit import TestProtocolClient, ProtocolTestCase
3055
 
    try:
3056
 
        from subunit.test_results import AutoTimingTestResultDecorator
3057
 
    except ImportError:
3058
 
        AutoTimingTestResultDecorator = lambda x:x
3059
3285
    class TestInOtherProcess(ProtocolTestCase):
3060
3286
        # Should be in subunit, I think. RBC.
3061
3287
        def __init__(self, stream, pid):
3084
3310
                sys.stdin.close()
3085
3311
                sys.stdin = None
3086
3312
                stream = os.fdopen(c2pwrite, 'wb', 1)
3087
 
                subunit_result = AutoTimingTestResultDecorator(
 
3313
                subunit_result = BzrAutoTimingTestResultDecorator(
3088
3314
                    TestProtocolClient(stream))
3089
3315
                process_suite.run(subunit_result)
3090
3316
            finally:
3127
3353
        if not os.path.isfile(bzr_path):
3128
3354
            # We are probably installed. Assume sys.argv is the right file
3129
3355
            bzr_path = sys.argv[0]
 
3356
        bzr_path = [bzr_path]
 
3357
        if sys.platform == "win32":
 
3358
            # if we're on windows, we can't execute the bzr script directly
 
3359
            bzr_path = [sys.executable] + bzr_path
3130
3360
        fd, test_list_file_name = tempfile.mkstemp()
3131
3361
        test_list_file = os.fdopen(fd, 'wb', 1)
3132
3362
        for test in process_tests:
3133
3363
            test_list_file.write(test.id() + '\n')
3134
3364
        test_list_file.close()
3135
3365
        try:
3136
 
            argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
 
3366
            argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3137
3367
                '--subunit']
3138
3368
            if '--no-plugins' in sys.argv:
3139
3369
                argv.append('--no-plugins')
3192
3422
    def addFailure(self, test, err):
3193
3423
        known = self._error_looks_like('KnownFailure: ', err)
3194
3424
        if known is not None:
3195
 
            self.result._addKnownFailure(test, [KnownFailure,
3196
 
                                                KnownFailure(known), None])
 
3425
            self.result.addExpectedFailure(test,
 
3426
                [KnownFailure, KnownFailure(known), None])
3197
3427
        else:
3198
3428
            self.result.addFailure(test, err)
3199
3429
 
3214
3444
        return value
3215
3445
 
3216
3446
 
 
3447
try:
 
3448
    from subunit.test_results import AutoTimingTestResultDecorator
 
3449
    # Expected failure should be seen as a success not a failure Once subunit
 
3450
    # provide native support for that, BZRTransformingResult and this class
 
3451
    # will become useless.
 
3452
    class BzrAutoTimingTestResultDecorator(AutoTimingTestResultDecorator):
 
3453
 
 
3454
        def addExpectedFailure(self, test, err):
 
3455
            self._before_event()
 
3456
            return self._call_maybe("addExpectedFailure", self._degrade_skip,
 
3457
                                    test, err)
 
3458
except ImportError:
 
3459
    # Let's just define a no-op decorator
 
3460
    BzrAutoTimingTestResultDecorator = lambda x:x
 
3461
 
 
3462
 
3217
3463
class ProfileResult(ForwardingResult):
3218
3464
    """Generate profiling data for all activity between start and success.
3219
3465
    
3494
3740
        'bzrlib.tests.commands',
3495
3741
        'bzrlib.tests.per_branch',
3496
3742
        'bzrlib.tests.per_bzrdir',
 
3743
        'bzrlib.tests.per_foreign_vcs',
3497
3744
        'bzrlib.tests.per_interrepository',
3498
3745
        'bzrlib.tests.per_intertree',
3499
3746
        'bzrlib.tests.per_inventory',
3500
3747
        'bzrlib.tests.per_interbranch',
3501
3748
        'bzrlib.tests.per_lock',
 
3749
        'bzrlib.tests.per_merger',
3502
3750
        'bzrlib.tests.per_transport',
3503
3751
        'bzrlib.tests.per_tree',
3504
3752
        'bzrlib.tests.per_pack_repository',
3505
3753
        'bzrlib.tests.per_repository',
3506
3754
        'bzrlib.tests.per_repository_chk',
3507
3755
        'bzrlib.tests.per_repository_reference',
 
3756
        'bzrlib.tests.per_uifactory',
3508
3757
        'bzrlib.tests.per_versionedfile',
3509
3758
        'bzrlib.tests.per_workingtree',
3510
3759
        'bzrlib.tests.test__annotator',
3513
3762
        'bzrlib.tests.test__groupcompress',
3514
3763
        'bzrlib.tests.test__known_graph',
3515
3764
        'bzrlib.tests.test__rio',
 
3765
        'bzrlib.tests.test__simple_set',
 
3766
        'bzrlib.tests.test__static_tuple',
3516
3767
        'bzrlib.tests.test__walkdirs_win32',
3517
3768
        'bzrlib.tests.test_ancestry',
3518
3769
        'bzrlib.tests.test_annotate',
3533
3784
        'bzrlib.tests.test_chk_serializer',
3534
3785
        'bzrlib.tests.test_chunk_writer',
3535
3786
        'bzrlib.tests.test_clean_tree',
 
3787
        'bzrlib.tests.test_cleanup',
3536
3788
        'bzrlib.tests.test_commands',
3537
3789
        'bzrlib.tests.test_commit',
3538
3790
        'bzrlib.tests.test_commit_merge',
3618
3870
        'bzrlib.tests.test_rio',
3619
3871
        'bzrlib.tests.test_rules',
3620
3872
        'bzrlib.tests.test_sampler',
 
3873
        'bzrlib.tests.test_script',
3621
3874
        'bzrlib.tests.test_selftest',
3622
3875
        'bzrlib.tests.test_serializer',
3623
3876
        'bzrlib.tests.test_setup',
3885
4138
    return new_test
3886
4139
 
3887
4140
 
3888
 
def _rmtree_temp_dir(dirname):
 
4141
def _rmtree_temp_dir(dirname, test_id=None):
3889
4142
    # If LANG=C we probably have created some bogus paths
3890
4143
    # which rmtree(unicode) will fail to delete
3891
4144
    # so make sure we are using rmtree(str) to delete everything
3903
4156
        # We don't want to fail here because some useful display will be lost
3904
4157
        # otherwise. Polluting the tmp dir is bad, but not giving all the
3905
4158
        # possible info to the test runner is even worse.
 
4159
        if test_id != None:
 
4160
            ui.ui_factory.clear_term()
 
4161
            sys.stderr.write('\nWhile running: %s\n' % (test_id,))
3906
4162
        sys.stderr.write('Unable to remove testing dir %s\n%s'
3907
4163
                         % (os.path.basename(dirname), e))
3908
4164
 
3992
4248
UnicodeFilenameFeature = _UnicodeFilenameFeature()
3993
4249
 
3994
4250
 
 
4251
class ModuleAvailableFeature(Feature):
 
4252
    """This is a feature than describes a module we want to be available.
 
4253
 
 
4254
    Declare the name of the module in __init__(), and then after probing, the
 
4255
    module will be available as 'self.module'.
 
4256
 
 
4257
    :ivar module: The module if it is available, else None.
 
4258
    """
 
4259
 
 
4260
    def __init__(self, module_name):
 
4261
        super(ModuleAvailableFeature, self).__init__()
 
4262
        self.module_name = module_name
 
4263
 
 
4264
    def _probe(self):
 
4265
        try:
 
4266
            self._module = __import__(self.module_name, {}, {}, [''])
 
4267
            return True
 
4268
        except ImportError:
 
4269
            return False
 
4270
 
 
4271
    @property
 
4272
    def module(self):
 
4273
        if self.available(): # Make sure the probe has been done
 
4274
            return self._module
 
4275
        return None
 
4276
    
 
4277
    def feature_name(self):
 
4278
        return self.module_name
 
4279
 
 
4280
 
 
4281
 
3995
4282
def probe_unicode_in_user_encoding():
3996
4283
    """Try to encode several unicode strings to use in unicode-aware tests.
3997
4284
    Return first successfull match.
4046
4333
HTTPSServerFeature = _HTTPSServerFeature()
4047
4334
 
4048
4335
 
 
4336
class _ParamikoFeature(Feature):
 
4337
    """Is paramiko available?"""
 
4338
 
 
4339
    def _probe(self):
 
4340
        try:
 
4341
            from bzrlib.transport.sftp import SFTPAbsoluteServer
 
4342
            return True
 
4343
        except errors.ParamikoNotPresent:
 
4344
            return False
 
4345
 
 
4346
    def feature_name(self):
 
4347
        return "Paramiko"
 
4348
 
 
4349
 
 
4350
ParamikoFeature = _ParamikoFeature()
 
4351
 
 
4352
 
4049
4353
class _UnicodeFilename(Feature):
4050
4354
    """Does the filesystem support Unicode filenames?"""
4051
4355
 
4077
4381
UTF8Filesystem = _UTF8Filesystem()
4078
4382
 
4079
4383
 
 
4384
class _BreakinFeature(Feature):
 
4385
    """Does this platform support the breakin feature?"""
 
4386
 
 
4387
    def _probe(self):
 
4388
        from bzrlib import breakin
 
4389
        if breakin.determine_signal() is None:
 
4390
            return False
 
4391
        if sys.platform == 'win32':
 
4392
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
 
4393
            # We trigger SIGBREAK via a Console api so we need ctypes to
 
4394
            # access the function
 
4395
            try:
 
4396
                import ctypes
 
4397
            except OSError:
 
4398
                return False
 
4399
        return True
 
4400
 
 
4401
    def feature_name(self):
 
4402
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
 
4403
 
 
4404
 
 
4405
BreakinFeature = _BreakinFeature()
 
4406
 
 
4407
 
4080
4408
class _CaseInsCasePresFilenameFeature(Feature):
4081
4409
    """Is the file-system case insensitive, but case-preserving?"""
4082
4410
 
4149
4477
# Only define SubUnitBzrRunner if subunit is available.
4150
4478
try:
4151
4479
    from subunit import TestProtocolClient
4152
 
    try:
4153
 
        from subunit.test_results import AutoTimingTestResultDecorator
4154
 
    except ImportError:
4155
 
        AutoTimingTestResultDecorator = lambda x:x
4156
4480
    class SubUnitBzrRunner(TextTestRunner):
4157
4481
        def run(self, test):
4158
 
            result = AutoTimingTestResultDecorator(
 
4482
            result = BzrAutoTimingTestResultDecorator(
4159
4483
                TestProtocolClient(self.stream))
4160
4484
            test.run(result)
4161
4485
            return result