/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Jelmer Vernooij
  • Date: 2010-03-21 21:39:33 UTC
  • mfrom: (5102 +trunk)
  • mto: This revision was merged to the branch mainline in revision 5143.
  • Revision ID: jelmer@samba.org-20100321213933-fexeh9zcoz8oaju2
merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
46
46
import tempfile
47
47
import threading
48
48
import time
 
49
import traceback
49
50
import unittest
50
51
import warnings
51
52
 
 
53
import testtools
 
54
# nb: check this before importing anything else from within it
 
55
_testtools_version = getattr(testtools, '__version__', ())
 
56
if _testtools_version < (0, 9, 2):
 
57
    raise ImportError("need at least testtools 0.9.2: %s is %r"
 
58
        % (testtools.__file__, _testtools_version))
 
59
from testtools import content
52
60
 
53
61
from bzrlib import (
54
62
    branchbuilder,
88
96
from bzrlib.symbol_versioning import (
89
97
    DEPRECATED_PARAMETER,
90
98
    deprecated_function,
 
99
    deprecated_in,
91
100
    deprecated_method,
92
101
    deprecated_passed,
93
102
    )
94
103
import bzrlib.trace
95
 
from bzrlib.transport import get_transport, pathfilter
 
104
from bzrlib.transport import (
 
105
    get_transport,
 
106
    memory,
 
107
    pathfilter,
 
108
    )
96
109
import bzrlib.transport
97
 
from bzrlib.transport.local import LocalURLServer
98
 
from bzrlib.transport.memory import MemoryServer
99
 
from bzrlib.transport.readonly import ReadonlyServer
100
110
from bzrlib.trace import mutter, note
101
 
from bzrlib.tests import TestUtil
 
111
from bzrlib.tests import (
 
112
    test_server,
 
113
    TestUtil,
 
114
    )
102
115
from bzrlib.tests.http_server import HttpServer
103
116
from bzrlib.tests.TestUtil import (
104
117
                          TestSuite,
115
128
# shown frame is the test code, not our assertXYZ.
116
129
__unittest = 1
117
130
 
118
 
default_transport = LocalURLServer
 
131
default_transport = test_server.LocalURLServer
 
132
 
 
133
 
 
134
_unitialized_attr = object()
 
135
"""A sentinel needed to act as a default value in a method signature."""
 
136
 
119
137
 
120
138
# Subunit result codes, defined here to prevent a hard dependency on subunit.
121
139
SUBUNIT_SEEK_SET = 0
228
246
                '%d non-main threads were left active in the end.\n'
229
247
                % (TestCase._active_threads - 1))
230
248
 
231
 
    def _extractBenchmarkTime(self, testCase):
 
249
    def getDescription(self, test):
 
250
        return test.id()
 
251
 
 
252
    def _extractBenchmarkTime(self, testCase, details=None):
232
253
        """Add a benchmark time for the current test case."""
 
254
        if details and 'benchtime' in details:
 
255
            return float(''.join(details['benchtime'].iter_bytes()))
233
256
        return getattr(testCase, "_benchtime", None)
234
257
 
235
258
    def _elapsedTestTimeString(self):
269
292
        else:
270
293
            bzr_path = sys.executable
271
294
        self.stream.write(
272
 
            'testing: %s\n' % (bzr_path,))
 
295
            'bzr selftest: %s\n' % (bzr_path,))
273
296
        self.stream.write(
274
297
            '   %s\n' % (
275
298
                    bzrlib.__path__[0],))
320
343
            self.stop()
321
344
        self._cleanupLogFile(test)
322
345
 
323
 
    def addSuccess(self, test):
 
346
    def addSuccess(self, test, details=None):
324
347
        """Tell result that test completed successfully.
325
348
 
326
349
        Called from the TestCase run()
327
350
        """
328
351
        if self._bench_history is not None:
329
 
            benchmark_time = self._extractBenchmarkTime(test)
 
352
            benchmark_time = self._extractBenchmarkTime(test, details)
330
353
            if benchmark_time is not None:
331
354
                self._bench_history.write("%s %s\n" % (
332
355
                    self._formatTime(benchmark_time),
362
385
        self.not_applicable_count += 1
363
386
        self.report_not_applicable(test, reason)
364
387
 
365
 
    def printErrorList(self, flavour, errors):
366
 
        for test, err in errors:
367
 
            self.stream.writeln(self.separator1)
368
 
            self.stream.write("%s: " % flavour)
369
 
            self.stream.writeln(self.getDescription(test))
370
 
            if getattr(test, '_get_log', None) is not None:
371
 
                log_contents = test._get_log()
372
 
                if log_contents:
373
 
                    self.stream.write('\n')
374
 
                    self.stream.write(
375
 
                            ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
376
 
                    self.stream.write('\n')
377
 
                    self.stream.write(log_contents)
378
 
                    self.stream.write('\n')
379
 
                    self.stream.write(
380
 
                            ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
381
 
                    self.stream.write('\n')
382
 
            self.stream.writeln(self.separator2)
383
 
            self.stream.writeln("%s" % err)
384
 
 
385
388
    def _post_mortem(self):
386
389
        """Start a PDB post mortem session."""
387
390
        if os.environ.get('BZR_TEST_PDB', None):
467
470
            a += '%dm%ds' % (runtime / 60, runtime % 60)
468
471
        else:
469
472
            a += '%ds' % runtime
470
 
        if self.error_count:
471
 
            a += ', %d err' % self.error_count
472
 
        if self.failure_count:
473
 
            a += ', %d fail' % self.failure_count
 
473
        total_fail_count = self.error_count + self.failure_count
 
474
        if total_fail_count:
 
475
            a += ', %d failed' % total_fail_count
474
476
        # if self.unsupported:
475
477
        #     a += ', %d missing' % len(self.unsupported)
476
478
        a += ']'
487
489
        return self._shortened_test_description(test)
488
490
 
489
491
    def report_error(self, test, err):
490
 
        ui.ui_factory.note('ERROR: %s\n    %s\n' % (
 
492
        self.ui.note('ERROR: %s\n    %s\n' % (
491
493
            self._test_description(test),
492
494
            err[1],
493
495
            ))
494
496
 
495
497
    def report_failure(self, test, err):
496
 
        ui.ui_factory.note('FAIL: %s\n    %s\n' % (
 
498
        self.ui.note('FAIL: %s\n    %s\n' % (
497
499
            self._test_description(test),
498
500
            err[1],
499
501
            ))
500
502
 
501
503
    def report_known_failure(self, test, err):
502
 
        ui.ui_factory.note('XFAIL: %s\n%s\n' % (
503
 
            self._test_description(test), err[1]))
 
504
        pass
504
505
 
505
506
    def report_skip(self, test, reason):
506
507
        pass
604
605
            applied left to right - the first element in the list is the 
605
606
            innermost decorator.
606
607
        """
 
608
        # stream may know claim to know to write unicode strings, but in older
 
609
        # pythons this goes sufficiently wrong that it is a bad idea. (
 
610
        # specifically a built in file with encoding 'UTF-8' will still try
 
611
        # to encode using ascii.
 
612
        new_encoding = osutils.get_terminal_encoding()
 
613
        codec = codecs.lookup(new_encoding)
 
614
        if type(codec) is tuple:
 
615
            # Python 2.4
 
616
            encode = codec[0]
 
617
        else:
 
618
            encode = codec.encode
 
619
        stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
 
620
        stream.encoding = new_encoding
607
621
        self.stream = unittest._WritelnDecorator(stream)
608
622
        self.descriptions = descriptions
609
623
        self.verbosity = verbosity
629
643
        for decorator in self._result_decorators:
630
644
            result = decorator(result)
631
645
            result.stop_early = self.stop_on_failure
632
 
        try:
633
 
            import testtools
634
 
        except ImportError:
635
 
            pass
636
 
        else:
637
 
            if isinstance(test, testtools.ConcurrentTestSuite):
638
 
                # We need to catch bzr specific behaviors
639
 
                result = BZRTransformingResult(result)
640
646
        result.startTestRun()
641
647
        try:
642
648
            test.run(result)
660
666
                        % (type(suite), suite))
661
667
 
662
668
 
663
 
class TestSkipped(Exception):
664
 
    """Indicates that a test was intentionally skipped, rather than failing."""
 
669
TestSkipped = testtools.testcase.TestSkipped
665
670
 
666
671
 
667
672
class TestNotApplicable(TestSkipped):
673
678
    """
674
679
 
675
680
 
676
 
class KnownFailure(AssertionError):
677
 
    """Indicates that a test failed in a precisely expected manner.
678
 
 
679
 
    Such failures dont block the whole test suite from passing because they are
680
 
    indicators of partially completed code or of future work. We have an
681
 
    explicit error for them so that we can ensure that they are always visible:
682
 
    KnownFailures are always shown in the output of bzr selftest.
683
 
    """
 
681
# traceback._some_str fails to format exceptions that have the default
 
682
# __str__ which does an implicit ascii conversion. However, repr() on those
 
683
# objects works, for all that its not quite what the doctor may have ordered.
 
684
def _clever_some_str(value):
 
685
    try:
 
686
        return str(value)
 
687
    except:
 
688
        try:
 
689
            return repr(value).replace('\\n', '\n')
 
690
        except:
 
691
            return '<unprintable %s object>' % type(value).__name__
 
692
 
 
693
traceback._some_str = _clever_some_str
 
694
 
 
695
 
 
696
# deprecated - use self.knownFailure(), or self.expectFailure.
 
697
KnownFailure = testtools.testcase._ExpectedFailure
684
698
 
685
699
 
686
700
class UnavailableFeature(Exception):
692
706
    """
693
707
 
694
708
 
695
 
class CommandFailed(Exception):
696
 
    pass
697
 
 
698
 
 
699
709
class StringIOWrapper(object):
700
710
    """A wrapper around cStringIO which just adds an encoding attribute.
701
711
 
762
772
        return NullProgressView()
763
773
 
764
774
 
765
 
class TestCase(unittest.TestCase):
 
775
class TestCase(testtools.TestCase):
766
776
    """Base class for bzr unit tests.
767
777
 
768
778
    Tests that need access to disk resources should subclass
787
797
    _leaking_threads_tests = 0
788
798
    _first_thread_leaker_id = None
789
799
    _log_file_name = None
790
 
    _log_contents = ''
791
 
    _keep_log_file = False
792
800
    # record lsprof data when performing benchmark calls.
793
801
    _gather_lsprof_in_benchmarks = False
794
 
    attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
795
 
                     '_log_contents', '_log_file_name', '_benchtime',
796
 
                     '_TestCase__testMethodName', '_TestCase__testMethodDoc',)
797
802
 
798
803
    def __init__(self, methodName='testMethod'):
799
804
        super(TestCase, self).__init__(methodName)
800
805
        self._cleanups = []
801
 
        self._bzr_test_setUp_run = False
802
 
        self._bzr_test_tearDown_run = False
803
806
        self._directory_isolation = True
 
807
        self.exception_handlers.insert(0,
 
808
            (UnavailableFeature, self._do_unsupported_or_skip))
 
809
        self.exception_handlers.insert(0,
 
810
            (TestNotApplicable, self._do_not_applicable))
804
811
 
805
812
    def setUp(self):
806
 
        unittest.TestCase.setUp(self)
807
 
        self._bzr_test_setUp_run = True
 
813
        super(TestCase, self).setUp()
 
814
        for feature in getattr(self, '_test_needs_features', []):
 
815
            self.requireFeature(feature)
 
816
        self._log_contents = None
 
817
        self.addDetail("log", content.Content(content.ContentType("text",
 
818
            "plain", {"charset": "utf8"}),
 
819
            lambda:[self._get_log(keep_log_file=True)]))
808
820
        self._cleanEnvironment()
809
821
        self._silenceUI()
810
822
        self._startLogFile()
843
855
        Tests that want to use debug flags can just set them in the
844
856
        debug_flags set during setup/teardown.
845
857
        """
846
 
        self._preserved_debug_flags = set(debug.debug_flags)
 
858
        # Start with a copy of the current debug flags we can safely modify.
 
859
        self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
847
860
        if 'allow_debug' not in selftest_debug_flags:
848
861
            debug.debug_flags.clear()
849
862
        if 'disable_lock_checks' not in selftest_debug_flags:
850
863
            debug.debug_flags.add('strict_locks')
851
 
        self.addCleanup(self._restore_debug_flags)
852
864
 
853
865
    def _clear_hooks(self):
854
866
        # prevent hooks affecting tests
875
887
    def _silenceUI(self):
876
888
        """Turn off UI for duration of test"""
877
889
        # by default the UI is off; tests can turn it on if they want it.
878
 
        saved = ui.ui_factory
879
 
        def _restore():
880
 
            ui.ui_factory = saved
881
 
        ui.ui_factory = ui.SilentUIFactory()
882
 
        self.addCleanup(_restore)
 
890
        self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
883
891
 
884
892
    def _check_locks(self):
885
893
        """Check that all lock take/release actions have been paired."""
914
922
            self._lock_check_thorough = False
915
923
        else:
916
924
            self._lock_check_thorough = True
917
 
            
 
925
 
918
926
        self.addCleanup(self._check_locks)
919
927
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
920
928
                                                self._lock_acquired, None)
1013
1021
        server's urls to be used.
1014
1022
        """
1015
1023
        if backing_server is None:
1016
 
            transport_server.setUp()
 
1024
            transport_server.start_server()
1017
1025
        else:
1018
 
            transport_server.setUp(backing_server)
1019
 
        self.addCleanup(transport_server.tearDown)
 
1026
            transport_server.start_server(backing_server)
 
1027
        self.addCleanup(transport_server.stop_server)
1020
1028
        # Obtain a real transport because if the server supplies a password, it
1021
1029
        # will be hidden from the base on the client side.
1022
1030
        t = get_transport(transport_server.get_url())
1036
1044
        if t.base.endswith('/work/'):
1037
1045
            # we have safety net/test root/work
1038
1046
            t = t.clone('../..')
1039
 
        elif isinstance(transport_server, server.SmartTCPServer_for_testing):
 
1047
        elif isinstance(transport_server,
 
1048
                        test_server.SmartTCPServer_for_testing):
1040
1049
            # The smart server adds a path similar to work, which is traversed
1041
1050
            # up from by the client. But the server is chrooted - the actual
1042
1051
            # backing transport is not escaped from, and VFS requests to the
1197
1206
            raise AssertionError('pattern "%s" found in "%s"'
1198
1207
                    % (needle_re, haystack))
1199
1208
 
 
1209
    def assertContainsString(self, haystack, needle):
 
1210
        if haystack.find(needle) == -1:
 
1211
            self.fail("string %r not found in '''%s'''" % (needle, haystack))
 
1212
 
1200
1213
    def assertSubset(self, sublist, superlist):
1201
1214
        """Assert that every entry in sublist is present in superlist."""
1202
1215
        missing = set(sublist) - set(superlist)
1289
1302
                m += ": " + msg
1290
1303
            self.fail(m)
1291
1304
 
1292
 
    def expectFailure(self, reason, assertion, *args, **kwargs):
1293
 
        """Invoke a test, expecting it to fail for the given reason.
1294
 
 
1295
 
        This is for assertions that ought to succeed, but currently fail.
1296
 
        (The failure is *expected* but not *wanted*.)  Please be very precise
1297
 
        about the failure you're expecting.  If a new bug is introduced,
1298
 
        AssertionError should be raised, not KnownFailure.
1299
 
 
1300
 
        Frequently, expectFailure should be followed by an opposite assertion.
1301
 
        See example below.
1302
 
 
1303
 
        Intended to be used with a callable that raises AssertionError as the
1304
 
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
1305
 
 
1306
 
        Raises KnownFailure if the test fails.  Raises AssertionError if the
1307
 
        test succeeds.
1308
 
 
1309
 
        example usage::
1310
 
 
1311
 
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
1312
 
                             dynamic_val)
1313
 
          self.assertEqual(42, dynamic_val)
1314
 
 
1315
 
          This means that a dynamic_val of 54 will cause the test to raise
1316
 
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
1317
 
          only a dynamic_val of 42 will allow the test to pass.  Anything other
1318
 
          than 54 or 42 will cause an AssertionError.
1319
 
        """
1320
 
        try:
1321
 
            assertion(*args, **kwargs)
1322
 
        except AssertionError:
1323
 
            raise KnownFailure(reason)
1324
 
        else:
1325
 
            self.fail('Unexpected success.  Should have failed: %s' % reason)
1326
 
 
1327
1305
    def assertFileEqual(self, content, path):
1328
1306
        """Fail if path does not contain 'content'."""
1329
1307
        self.failUnlessExists(path)
1479
1457
 
1480
1458
        Close the file and delete it, unless setKeepLogfile was called.
1481
1459
        """
1482
 
        if self._log_file is None:
1483
 
            return
 
1460
        if bzrlib.trace._trace_file:
 
1461
            # flush the log file, to get all content
 
1462
            bzrlib.trace._trace_file.flush()
1484
1463
        bzrlib.trace.pop_log_file(self._log_memento)
1485
 
        self._log_file.close()
1486
 
        self._log_file = None
1487
 
        if not self._keep_log_file:
1488
 
            os.remove(self._log_file_name)
1489
 
            self._log_file_name = None
1490
 
 
1491
 
    def setKeepLogfile(self):
1492
 
        """Make the logfile not be deleted when _finishLogFile is called."""
1493
 
        self._keep_log_file = True
 
1464
        # Cache the log result and delete the file on disk
 
1465
        self._get_log(False)
1494
1466
 
1495
1467
    def thisFailsStrictLockCheck(self):
1496
1468
        """It is known that this test would fail with -Dstrict_locks.
1513
1485
        """
1514
1486
        self._cleanups.append((callable, args, kwargs))
1515
1487
 
 
1488
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
 
1489
        """Overrides an object attribute restoring it after the test.
 
1490
 
 
1491
        :param obj: The object that will be mutated.
 
1492
 
 
1493
        :param attr_name: The attribute name we want to preserve/override in
 
1494
            the object.
 
1495
 
 
1496
        :param new: The optional value we want to set the attribute to.
 
1497
 
 
1498
        :returns: The actual attr value.
 
1499
        """
 
1500
        value = getattr(obj, attr_name)
 
1501
        # The actual value is captured by the call below
 
1502
        self.addCleanup(setattr, obj, attr_name, value)
 
1503
        if new is not _unitialized_attr:
 
1504
            setattr(obj, attr_name, new)
 
1505
        return value
 
1506
 
1516
1507
    def _cleanEnvironment(self):
1517
1508
        new_env = {
1518
1509
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1528
1519
            'BZR_PROGRESS_BAR': None,
1529
1520
            'BZR_LOG': None,
1530
1521
            'BZR_PLUGIN_PATH': None,
 
1522
            'BZR_DISABLE_PLUGINS': None,
1531
1523
            'BZR_CONCURRENCY': None,
1532
1524
            # Make sure that any text ui tests are consistent regardless of
1533
1525
            # the environment the test case is run in; you may want tests that
1554
1546
            'ftp_proxy': None,
1555
1547
            'FTP_PROXY': None,
1556
1548
            'BZR_REMOTE_PATH': None,
 
1549
            # Generally speaking, we don't want apport reporting on crashes in
 
1550
            # the test envirnoment unless we're specifically testing apport,
 
1551
            # so that it doesn't leak into the real system environment.  We
 
1552
            # use an env var so it propagates to subprocesses.
 
1553
            'APPORT_DISABLE': '1',
1557
1554
        }
1558
 
        self.__old_env = {}
 
1555
        self._old_env = {}
1559
1556
        self.addCleanup(self._restoreEnvironment)
1560
1557
        for name, value in new_env.iteritems():
1561
1558
            self._captureVar(name, value)
1562
1559
 
1563
1560
    def _captureVar(self, name, newvalue):
1564
1561
        """Set an environment variable, and reset it when finished."""
1565
 
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1566
 
 
1567
 
    def _restore_debug_flags(self):
1568
 
        debug.debug_flags.clear()
1569
 
        debug.debug_flags.update(self._preserved_debug_flags)
 
1562
        self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1570
1563
 
1571
1564
    def _restoreEnvironment(self):
1572
 
        for name, value in self.__old_env.iteritems():
 
1565
        for name, value in self._old_env.iteritems():
1573
1566
            osutils.set_or_unset_env(name, value)
1574
1567
 
1575
1568
    def _restoreHooks(self):
1587
1580
        else:
1588
1581
            addSkip(self, reason)
1589
1582
 
1590
 
    def _do_known_failure(self, result):
 
1583
    @staticmethod
 
1584
    def _do_known_failure(self, result, e):
1591
1585
        err = sys.exc_info()
1592
1586
        addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1593
1587
        if addExpectedFailure is not None:
1595
1589
        else:
1596
1590
            result.addSuccess(self)
1597
1591
 
 
1592
    @staticmethod
1598
1593
    def _do_not_applicable(self, result, e):
1599
1594
        if not e.args:
1600
1595
            reason = 'No reason given'
1606
1601
        else:
1607
1602
            self._do_skip(result, reason)
1608
1603
 
1609
 
    def _do_unsupported_or_skip(self, result, reason):
 
1604
    @staticmethod
 
1605
    def _do_unsupported_or_skip(self, result, e):
 
1606
        reason = e.args[0]
1610
1607
        addNotSupported = getattr(result, 'addNotSupported', None)
1611
1608
        if addNotSupported is not None:
1612
1609
            result.addNotSupported(self, reason)
1613
1610
        else:
1614
1611
            self._do_skip(result, reason)
1615
1612
 
1616
 
    def run(self, result=None):
1617
 
        if result is None: result = self.defaultTestResult()
1618
 
        result.startTest(self)
1619
 
        try:
1620
 
            self._run(result)
1621
 
            return result
1622
 
        finally:
1623
 
            result.stopTest(self)
1624
 
 
1625
 
    def _run(self, result):
1626
 
        for feature in getattr(self, '_test_needs_features', []):
1627
 
            if not feature.available():
1628
 
                return self._do_unsupported_or_skip(result, feature)
1629
 
        try:
1630
 
            absent_attr = object()
1631
 
            # Python 2.5
1632
 
            method_name = getattr(self, '_testMethodName', absent_attr)
1633
 
            if method_name is absent_attr:
1634
 
                # Python 2.4
1635
 
                method_name = getattr(self, '_TestCase__testMethodName')
1636
 
            testMethod = getattr(self, method_name)
1637
 
            try:
1638
 
                try:
1639
 
                    self.setUp()
1640
 
                    if not self._bzr_test_setUp_run:
1641
 
                        self.fail(
1642
 
                            "test setUp did not invoke "
1643
 
                            "bzrlib.tests.TestCase's setUp")
1644
 
                except KeyboardInterrupt:
1645
 
                    self._runCleanups()
1646
 
                    raise
1647
 
                except KnownFailure:
1648
 
                    self._do_known_failure(result)
1649
 
                    self.tearDown()
1650
 
                    return
1651
 
                except TestNotApplicable, e:
1652
 
                    self._do_not_applicable(result, e)
1653
 
                    self.tearDown()
1654
 
                    return
1655
 
                except TestSkipped, e:
1656
 
                    self._do_skip(result, e.args[0])
1657
 
                    self.tearDown()
1658
 
                    return result
1659
 
                except UnavailableFeature, e:
1660
 
                    self._do_unsupported_or_skip(result, e.args[0])
1661
 
                    self.tearDown()
1662
 
                    return
1663
 
                except:
1664
 
                    result.addError(self, sys.exc_info())
1665
 
                    self._runCleanups()
1666
 
                    return result
1667
 
 
1668
 
                ok = False
1669
 
                try:
1670
 
                    testMethod()
1671
 
                    ok = True
1672
 
                except KnownFailure:
1673
 
                    self._do_known_failure(result)
1674
 
                except self.failureException:
1675
 
                    result.addFailure(self, sys.exc_info())
1676
 
                except TestNotApplicable, e:
1677
 
                    self._do_not_applicable(result, e)
1678
 
                except TestSkipped, e:
1679
 
                    if not e.args:
1680
 
                        reason = "No reason given."
1681
 
                    else:
1682
 
                        reason = e.args[0]
1683
 
                    self._do_skip(result, reason)
1684
 
                except UnavailableFeature, e:
1685
 
                    self._do_unsupported_or_skip(result, e.args[0])
1686
 
                except KeyboardInterrupt:
1687
 
                    self._runCleanups()
1688
 
                    raise
1689
 
                except:
1690
 
                    result.addError(self, sys.exc_info())
1691
 
 
1692
 
                try:
1693
 
                    self.tearDown()
1694
 
                    if not self._bzr_test_tearDown_run:
1695
 
                        self.fail(
1696
 
                            "test tearDown did not invoke "
1697
 
                            "bzrlib.tests.TestCase's tearDown")
1698
 
                except KeyboardInterrupt:
1699
 
                    self._runCleanups()
1700
 
                    raise
1701
 
                except:
1702
 
                    result.addError(self, sys.exc_info())
1703
 
                    self._runCleanups()
1704
 
                    ok = False
1705
 
                if ok: result.addSuccess(self)
1706
 
                return result
1707
 
            except KeyboardInterrupt:
1708
 
                self._runCleanups()
1709
 
                raise
1710
 
        finally:
1711
 
            saved_attrs = {}
1712
 
            for attr_name in self.attrs_to_keep:
1713
 
                if attr_name in self.__dict__:
1714
 
                    saved_attrs[attr_name] = self.__dict__[attr_name]
1715
 
            self.__dict__ = saved_attrs
1716
 
 
1717
 
    def tearDown(self):
1718
 
        self._runCleanups()
1719
 
        self._log_contents = ''
1720
 
        self._bzr_test_tearDown_run = True
1721
 
        unittest.TestCase.tearDown(self)
1722
 
 
1723
1613
    def time(self, callable, *args, **kwargs):
1724
1614
        """Run callable and accrue the time it takes to the benchmark time.
1725
1615
 
1728
1618
        self._benchcalls.
1729
1619
        """
1730
1620
        if self._benchtime is None:
 
1621
            self.addDetail('benchtime', content.Content(content.ContentType(
 
1622
                "text", "plain"), lambda:[str(self._benchtime)]))
1731
1623
            self._benchtime = 0
1732
1624
        start = time.time()
1733
1625
        try:
1742
1634
        finally:
1743
1635
            self._benchtime += time.time() - start
1744
1636
 
1745
 
    def _runCleanups(self):
1746
 
        """Run registered cleanup functions.
1747
 
 
1748
 
        This should only be called from TestCase.tearDown.
1749
 
        """
1750
 
        # TODO: Perhaps this should keep running cleanups even if
1751
 
        # one of them fails?
1752
 
 
1753
 
        # Actually pop the cleanups from the list so tearDown running
1754
 
        # twice is safe (this happens for skipped tests).
1755
 
        while self._cleanups:
1756
 
            cleanup, args, kwargs = self._cleanups.pop()
1757
 
            cleanup(*args, **kwargs)
1758
 
 
1759
1637
    def log(self, *args):
1760
1638
        mutter(*args)
1761
1639
 
1762
1640
    def _get_log(self, keep_log_file=False):
1763
 
        """Get the log from bzrlib.trace calls from this test.
 
1641
        """Internal helper to get the log from bzrlib.trace for this test.
 
1642
 
 
1643
        Please use self.getDetails, or self.get_log to access this in test case
 
1644
        code.
1764
1645
 
1765
1646
        :param keep_log_file: When True, if the log is still a file on disk
1766
1647
            leave it as a file on disk. When False, if the log is still a file
1768
1649
            self._log_contents.
1769
1650
        :return: A string containing the log.
1770
1651
        """
1771
 
        # flush the log file, to get all content
 
1652
        if self._log_contents is not None:
 
1653
            try:
 
1654
                self._log_contents.decode('utf8')
 
1655
            except UnicodeDecodeError:
 
1656
                unicodestr = self._log_contents.decode('utf8', 'replace')
 
1657
                self._log_contents = unicodestr.encode('utf8')
 
1658
            return self._log_contents
1772
1659
        import bzrlib.trace
1773
1660
        if bzrlib.trace._trace_file:
 
1661
            # flush the log file, to get all content
1774
1662
            bzrlib.trace._trace_file.flush()
1775
 
        if self._log_contents:
1776
 
            # XXX: this can hardly contain the content flushed above --vila
1777
 
            # 20080128
1778
 
            return self._log_contents
1779
1663
        if self._log_file_name is not None:
1780
1664
            logfile = open(self._log_file_name)
1781
1665
            try:
1782
1666
                log_contents = logfile.read()
1783
1667
            finally:
1784
1668
                logfile.close()
 
1669
            try:
 
1670
                log_contents.decode('utf8')
 
1671
            except UnicodeDecodeError:
 
1672
                unicodestr = log_contents.decode('utf8', 'replace')
 
1673
                log_contents = unicodestr.encode('utf8')
1785
1674
            if not keep_log_file:
 
1675
                close_attempts = 0
 
1676
                max_close_attempts = 100
 
1677
                first_close_error = None
 
1678
                while close_attempts < max_close_attempts:
 
1679
                    close_attempts += 1
 
1680
                    try:
 
1681
                        self._log_file.close()
 
1682
                    except IOError, ioe:
 
1683
                        if ioe.errno is None:
 
1684
                            # No errno implies 'close() called during
 
1685
                            # concurrent operation on the same file object', so
 
1686
                            # retry.  Probably a thread is trying to write to
 
1687
                            # the log file.
 
1688
                            if first_close_error is None:
 
1689
                                first_close_error = ioe
 
1690
                            continue
 
1691
                        raise
 
1692
                    else:
 
1693
                        break
 
1694
                if close_attempts > 1:
 
1695
                    sys.stderr.write(
 
1696
                        'Unable to close log file on first attempt, '
 
1697
                        'will retry: %s\n' % (first_close_error,))
 
1698
                    if close_attempts == max_close_attempts:
 
1699
                        sys.stderr.write(
 
1700
                            'Unable to close log file after %d attempts.\n'
 
1701
                            % (max_close_attempts,))
 
1702
                self._log_file = None
 
1703
                # Permit multiple calls to get_log until we clean it up in
 
1704
                # finishLogFile
1786
1705
                self._log_contents = log_contents
1787
1706
                try:
1788
1707
                    os.remove(self._log_file_name)
1792
1711
                                             ' %r\n' % self._log_file_name))
1793
1712
                    else:
1794
1713
                        raise
 
1714
                self._log_file_name = None
1795
1715
            return log_contents
1796
1716
        else:
1797
 
            return "DELETED log file to reduce memory footprint"
 
1717
            return "No log file content and no log file name."
 
1718
 
 
1719
    def get_log(self):
 
1720
        """Get a unicode string containing the log from bzrlib.trace.
 
1721
 
 
1722
        Undecodable characters are replaced.
 
1723
        """
 
1724
        return u"".join(self.getDetails()['log'].iter_text())
1798
1725
 
1799
1726
    def requireFeature(self, feature):
1800
1727
        """This test requires a specific feature is available.
2163
2090
 
2164
2091
        Tests that expect to provoke LockContention errors should call this.
2165
2092
        """
2166
 
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
2167
 
        def resetTimeout():
2168
 
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
2169
 
        self.addCleanup(resetTimeout)
2170
 
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
2093
        self.overrideAttr(bzrlib.lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
2171
2094
 
2172
2095
    def make_utf8_encoded_stringio(self, encoding_type=None):
2173
2096
        """Return a StringIOWrapper instance, that will encode Unicode
2187
2110
        request_handlers = request.request_handlers
2188
2111
        orig_method = request_handlers.get(verb)
2189
2112
        request_handlers.remove(verb)
2190
 
        def restoreVerb():
2191
 
            request_handlers.register(verb, orig_method)
2192
 
        self.addCleanup(restoreVerb)
 
2113
        self.addCleanup(request_handlers.register, verb, orig_method)
2193
2114
 
2194
2115
 
2195
2116
class CapturedCall(object):
2286
2207
        if self.__readonly_server is None:
2287
2208
            if self.transport_readonly_server is None:
2288
2209
                # readonly decorator requested
2289
 
                self.__readonly_server = ReadonlyServer()
 
2210
                self.__readonly_server = test_server.ReadonlyServer()
2290
2211
            else:
2291
2212
                # explicit readonly transport.
2292
2213
                self.__readonly_server = self.create_transport_readonly_server()
2315
2236
        is no means to override it.
2316
2237
        """
2317
2238
        if self.__vfs_server is None:
2318
 
            self.__vfs_server = MemoryServer()
 
2239
            self.__vfs_server = memory.MemoryServer()
2319
2240
            self.start_server(self.__vfs_server)
2320
2241
        return self.__vfs_server
2321
2242
 
2478
2399
        return made_control.create_repository(shared=shared)
2479
2400
 
2480
2401
    def make_smart_server(self, path):
2481
 
        smart_server = server.SmartTCPServer_for_testing()
 
2402
        smart_server = test_server.SmartTCPServer_for_testing()
2482
2403
        self.start_server(smart_server, self.get_server())
2483
2404
        remote_transport = get_transport(smart_server.get_url()).clone(path)
2484
2405
        return remote_transport
2502
2423
    def setUp(self):
2503
2424
        super(TestCaseWithMemoryTransport, self).setUp()
2504
2425
        self._make_test_root()
2505
 
        _currentdir = os.getcwdu()
2506
 
        def _leaveDirectory():
2507
 
            os.chdir(_currentdir)
2508
 
        self.addCleanup(_leaveDirectory)
 
2426
        self.addCleanup(os.chdir, os.getcwdu())
2509
2427
        self.makeAndChdirToTestDir()
2510
2428
        self.overrideEnvironmentForTesting()
2511
2429
        self.__readonly_server = None
2514
2432
 
2515
2433
    def setup_smart_server_with_call_log(self):
2516
2434
        """Sets up a smart server as the transport server with a call log."""
2517
 
        self.transport_server = server.SmartTCPServer_for_testing
 
2435
        self.transport_server = test_server.SmartTCPServer_for_testing
2518
2436
        self.hpss_calls = []
2519
2437
        import traceback
2520
2438
        # Skip the current stack down to the caller of
2733
2651
            # We can only make working trees locally at the moment.  If the
2734
2652
            # transport can't support them, then we keep the non-disk-backed
2735
2653
            # branch and create a local checkout.
2736
 
            if self.vfs_transport_factory is LocalURLServer:
 
2654
            if self.vfs_transport_factory is test_server.LocalURLServer:
2737
2655
                # the branch is colocated on disk, we cannot create a checkout.
2738
2656
                # hopefully callers will expect this.
2739
2657
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2799
2717
 
2800
2718
    def setUp(self):
2801
2719
        super(ChrootedTestCase, self).setUp()
2802
 
        if not self.vfs_transport_factory == MemoryServer:
 
2720
        if not self.vfs_transport_factory == memory.MemoryServer:
2803
2721
            self.transport_readonly_server = HttpServer
2804
2722
 
2805
2723
 
3219
3137
        if self.randomised:
3220
3138
            return iter(self._tests)
3221
3139
        self.randomised = True
3222
 
        self.stream.writeln("Randomizing test order using seed %s\n" %
 
3140
        self.stream.write("Randomizing test order using seed %s\n\n" %
3223
3141
            (self.actual_seed()))
3224
3142
        # Initialise the random number generator.
3225
3143
        random.seed(self.actual_seed())
3282
3200
    concurrency = osutils.local_concurrency()
3283
3201
    result = []
3284
3202
    from subunit import TestProtocolClient, ProtocolTestCase
 
3203
    from subunit.test_results import AutoTimingTestResultDecorator
3285
3204
    class TestInOtherProcess(ProtocolTestCase):
3286
3205
        # Should be in subunit, I think. RBC.
3287
3206
        def __init__(self, stream, pid):
3310
3229
                sys.stdin.close()
3311
3230
                sys.stdin = None
3312
3231
                stream = os.fdopen(c2pwrite, 'wb', 1)
3313
 
                subunit_result = BzrAutoTimingTestResultDecorator(
 
3232
                subunit_result = AutoTimingTestResultDecorator(
3314
3233
                    TestProtocolClient(stream))
3315
3234
                process_suite.run(subunit_result)
3316
3235
            finally:
3408
3327
 
3409
3328
    def addFailure(self, test, err):
3410
3329
        self.result.addFailure(test, err)
3411
 
 
3412
 
 
3413
 
class BZRTransformingResult(ForwardingResult):
3414
 
 
3415
 
    def addError(self, test, err):
3416
 
        feature = self._error_looks_like('UnavailableFeature: ', err)
3417
 
        if feature is not None:
3418
 
            self.result.addNotSupported(test, feature)
3419
 
        else:
3420
 
            self.result.addError(test, err)
3421
 
 
3422
 
    def addFailure(self, test, err):
3423
 
        known = self._error_looks_like('KnownFailure: ', err)
3424
 
        if known is not None:
3425
 
            self.result.addExpectedFailure(test,
3426
 
                [KnownFailure, KnownFailure(known), None])
3427
 
        else:
3428
 
            self.result.addFailure(test, err)
3429
 
 
3430
 
    def _error_looks_like(self, prefix, err):
3431
 
        """Deserialize exception and returns the stringify value."""
3432
 
        import subunit
3433
 
        value = None
3434
 
        typ, exc, _ = err
3435
 
        if isinstance(exc, subunit.RemoteException):
3436
 
            # stringify the exception gives access to the remote traceback
3437
 
            # We search the last line for 'prefix'
3438
 
            lines = str(exc).split('\n')
3439
 
            while lines and not lines[-1]:
3440
 
                lines.pop(-1)
3441
 
            if lines:
3442
 
                if lines[-1].startswith(prefix):
3443
 
                    value = lines[-1][len(prefix):]
3444
 
        return value
3445
 
 
3446
 
 
3447
 
try:
3448
 
    from subunit.test_results import AutoTimingTestResultDecorator
3449
 
    # Expected failure should be seen as a success not a failure Once subunit
3450
 
    # provide native support for that, BZRTransformingResult and this class
3451
 
    # will become useless.
3452
 
    class BzrAutoTimingTestResultDecorator(AutoTimingTestResultDecorator):
3453
 
 
3454
 
        def addExpectedFailure(self, test, err):
3455
 
            self._before_event()
3456
 
            return self._call_maybe("addExpectedFailure", self._degrade_skip,
3457
 
                                    test, err)
3458
 
except ImportError:
3459
 
    # Let's just define a no-op decorator
3460
 
    BzrAutoTimingTestResultDecorator = lambda x:x
 
3330
ForwardingResult = testtools.ExtendedToOriginalDecorator
3461
3331
 
3462
3332
 
3463
3333
class ProfileResult(ForwardingResult):
3724
3594
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3725
3595
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3726
3596
 
3727
 
# Obvious higest levels prefixes, feel free to add your own via a plugin
 
3597
# Obvious highest levels prefixes, feel free to add your own via a plugin
3728
3598
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3729
3599
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3730
3600
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3757
3627
        'bzrlib.tests.per_versionedfile',
3758
3628
        'bzrlib.tests.per_workingtree',
3759
3629
        'bzrlib.tests.test__annotator',
 
3630
        'bzrlib.tests.test__bencode',
3760
3631
        'bzrlib.tests.test__chk_map',
3761
3632
        'bzrlib.tests.test__dirstate_helpers',
3762
3633
        'bzrlib.tests.test__groupcompress',
3770
3641
        'bzrlib.tests.test_api',
3771
3642
        'bzrlib.tests.test_atomicfile',
3772
3643
        'bzrlib.tests.test_bad_files',
3773
 
        'bzrlib.tests.test_bencode',
3774
3644
        'bzrlib.tests.test_bisect_multi',
3775
3645
        'bzrlib.tests.test_branch',
3776
3646
        'bzrlib.tests.test_branchbuilder',
3785
3655
        'bzrlib.tests.test_chunk_writer',
3786
3656
        'bzrlib.tests.test_clean_tree',
3787
3657
        'bzrlib.tests.test_cleanup',
 
3658
        'bzrlib.tests.test_cmdline',
3788
3659
        'bzrlib.tests.test_commands',
3789
3660
        'bzrlib.tests.test_commit',
3790
3661
        'bzrlib.tests.test_commit_merge',
3824
3695
        'bzrlib.tests.test_identitymap',
3825
3696
        'bzrlib.tests.test_ignores',
3826
3697
        'bzrlib.tests.test_index',
 
3698
        'bzrlib.tests.test_import_tariff',
3827
3699
        'bzrlib.tests.test_info',
3828
3700
        'bzrlib.tests.test_inv',
3829
3701
        'bzrlib.tests.test_inventory_delta',
3926
3798
    return [
3927
3799
        'bzrlib',
3928
3800
        'bzrlib.branchbuilder',
 
3801
        'bzrlib.decorators',
3929
3802
        'bzrlib.export',
3930
3803
        'bzrlib.inventory',
3931
3804
        'bzrlib.iterablefile',
4138
4011
    return new_test
4139
4012
 
4140
4013
 
 
4014
def permute_tests_for_extension(standard_tests, loader, py_module_name,
 
4015
                                ext_module_name):
 
4016
    """Helper for permutating tests against an extension module.
 
4017
 
 
4018
    This is meant to be used inside a modules 'load_tests()' function. It will
 
4019
    create 2 scenarios, and cause all tests in the 'standard_tests' to be run
 
4020
    against both implementations. Setting 'test.module' to the appropriate
 
4021
    module. See bzrlib.tests.test__chk_map.load_tests as an example.
 
4022
 
 
4023
    :param standard_tests: A test suite to permute
 
4024
    :param loader: A TestLoader
 
4025
    :param py_module_name: The python path to a python module that can always
 
4026
        be loaded, and will be considered the 'python' implementation. (eg
 
4027
        'bzrlib._chk_map_py')
 
4028
    :param ext_module_name: The python path to an extension module. If the
 
4029
        module cannot be loaded, a single test will be added, which notes that
 
4030
        the module is not available. If it can be loaded, all standard_tests
 
4031
        will be run against that module.
 
4032
    :return: (suite, feature) suite is a test-suite that has all the permuted
 
4033
        tests. feature is the Feature object that can be used to determine if
 
4034
        the module is available.
 
4035
    """
 
4036
 
 
4037
    py_module = __import__(py_module_name, {}, {}, ['NO_SUCH_ATTRIB'])
 
4038
    scenarios = [
 
4039
        ('python', {'module': py_module}),
 
4040
    ]
 
4041
    suite = loader.suiteClass()
 
4042
    feature = ModuleAvailableFeature(ext_module_name)
 
4043
    if feature.available():
 
4044
        scenarios.append(('C', {'module': feature.module}))
 
4045
    else:
 
4046
        # the compiled module isn't available, so we add a failing test
 
4047
        class FailWithoutFeature(TestCase):
 
4048
            def test_fail(self):
 
4049
                self.requireFeature(feature)
 
4050
        suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
 
4051
    result = multiply_tests(standard_tests, scenarios, suite)
 
4052
    return result, feature
 
4053
 
 
4054
 
4141
4055
def _rmtree_temp_dir(dirname, test_id=None):
4142
4056
    # If LANG=C we probably have created some bogus paths
4143
4057
    # which rmtree(unicode) will fail to delete
4248
4162
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4249
4163
 
4250
4164
 
 
4165
class _CompatabilityThunkFeature(Feature):
 
4166
    """This feature is just a thunk to another feature.
 
4167
 
 
4168
    It issues a deprecation warning if it is accessed, to let you know that you
 
4169
    should really use a different feature.
 
4170
    """
 
4171
 
 
4172
    def __init__(self, dep_version, module, name,
 
4173
                 replacement_name, replacement_module=None):
 
4174
        super(_CompatabilityThunkFeature, self).__init__()
 
4175
        self._module = module
 
4176
        if replacement_module is None:
 
4177
            replacement_module = module
 
4178
        self._replacement_module = replacement_module
 
4179
        self._name = name
 
4180
        self._replacement_name = replacement_name
 
4181
        self._dep_version = dep_version
 
4182
        self._feature = None
 
4183
 
 
4184
    def _ensure(self):
 
4185
        if self._feature is None:
 
4186
            depr_msg = self._dep_version % ('%s.%s'
 
4187
                                            % (self._module, self._name))
 
4188
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
 
4189
                                               self._replacement_name)
 
4190
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
 
4191
            # Import the new feature and use it as a replacement for the
 
4192
            # deprecated one.
 
4193
            mod = __import__(self._replacement_module, {}, {},
 
4194
                             [self._replacement_name])
 
4195
            self._feature = getattr(mod, self._replacement_name)
 
4196
 
 
4197
    def _probe(self):
 
4198
        self._ensure()
 
4199
        return self._feature._probe()
 
4200
 
 
4201
 
4251
4202
class ModuleAvailableFeature(Feature):
4252
4203
    """This is a feature than describes a module we want to be available.
4253
4204
 
4273
4224
        if self.available(): # Make sure the probe has been done
4274
4225
            return self._module
4275
4226
        return None
4276
 
    
 
4227
 
4277
4228
    def feature_name(self):
4278
4229
        return self.module_name
4279
4230
 
4280
4231
 
 
4232
# This is kept here for compatibility, it is recommended to use
 
4233
# 'bzrlib.tests.feature.paramiko' instead
 
4234
ParamikoFeature = _CompatabilityThunkFeature(
 
4235
    deprecated_in((2,1,0)),
 
4236
    'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
 
4237
 
4281
4238
 
4282
4239
def probe_unicode_in_user_encoding():
4283
4240
    """Try to encode several unicode strings to use in unicode-aware tests.
4333
4290
HTTPSServerFeature = _HTTPSServerFeature()
4334
4291
 
4335
4292
 
4336
 
class _ParamikoFeature(Feature):
4337
 
    """Is paramiko available?"""
4338
 
 
4339
 
    def _probe(self):
4340
 
        try:
4341
 
            from bzrlib.transport.sftp import SFTPAbsoluteServer
4342
 
            return True
4343
 
        except errors.ParamikoNotPresent:
4344
 
            return False
4345
 
 
4346
 
    def feature_name(self):
4347
 
        return "Paramiko"
4348
 
 
4349
 
 
4350
 
ParamikoFeature = _ParamikoFeature()
4351
 
 
4352
 
 
4353
4293
class _UnicodeFilename(Feature):
4354
4294
    """Does the filesystem support Unicode filenames?"""
4355
4295
 
4460
4400
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4461
4401
 
4462
4402
 
4463
 
class _SubUnitFeature(Feature):
4464
 
    """Check if subunit is available."""
 
4403
class _CaseSensitiveFilesystemFeature(Feature):
4465
4404
 
4466
4405
    def _probe(self):
4467
 
        try:
4468
 
            import subunit
 
4406
        if CaseInsCasePresFilenameFeature.available():
 
4407
            return False
 
4408
        elif CaseInsensitiveFilesystemFeature.available():
 
4409
            return False
 
4410
        else:
4469
4411
            return True
4470
 
        except ImportError:
4471
 
            return False
4472
4412
 
4473
4413
    def feature_name(self):
4474
 
        return 'subunit'
4475
 
 
4476
 
SubUnitFeature = _SubUnitFeature()
 
4414
        return 'case-sensitive filesystem'
 
4415
 
 
4416
# new coding style is for feature instances to be lowercase
 
4417
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
 
4418
 
 
4419
 
 
4420
# Kept for compatibility, use bzrlib.tests.features.subunit instead
 
4421
SubUnitFeature = _CompatabilityThunkFeature(
 
4422
    deprecated_in((2,1,0)),
 
4423
    'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4477
4424
# Only define SubUnitBzrRunner if subunit is available.
4478
4425
try:
4479
4426
    from subunit import TestProtocolClient
 
4427
    from subunit.test_results import AutoTimingTestResultDecorator
4480
4428
    class SubUnitBzrRunner(TextTestRunner):
4481
4429
        def run(self, test):
4482
 
            result = BzrAutoTimingTestResultDecorator(
 
4430
            result = AutoTimingTestResultDecorator(
4483
4431
                TestProtocolClient(self.stream))
4484
4432
            test.run(result)
4485
4433
            return result