/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-01-07 17:02:44 UTC
  • mfrom: (4934.1.14 2.1.0rc1-set-mtime)
  • Revision ID: pqm@pqm.ubuntu.com-20100107170244-3cgdapvuokgf8l42
(jam,
        gz) (bug #488724) Set the mtime of files touched in a TreeTransform.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
"""Testing framework extensions"""
18
17
 
19
18
# TODO: Perhaps there should be an API to find out if bzr running under the
20
19
# test suite -- some plugins might want to avoid making intrusive changes if
102
101
    deprecated_passed,
103
102
    )
104
103
import bzrlib.trace
105
 
from bzrlib.transport import (
106
 
    get_transport,
107
 
    memory,
108
 
    pathfilter,
109
 
    )
 
104
from bzrlib.transport import get_transport, pathfilter
110
105
import bzrlib.transport
 
106
from bzrlib.transport.local import LocalURLServer
 
107
from bzrlib.transport.memory import MemoryServer
 
108
from bzrlib.transport.readonly import ReadonlyServer
111
109
from bzrlib.trace import mutter, note
112
 
from bzrlib.tests import (
113
 
    test_server,
114
 
    TestUtil,
115
 
    )
 
110
from bzrlib.tests import TestUtil
116
111
from bzrlib.tests.http_server import HttpServer
117
112
from bzrlib.tests.TestUtil import (
118
113
                          TestSuite,
129
124
# shown frame is the test code, not our assertXYZ.
130
125
__unittest = 1
131
126
 
132
 
default_transport = test_server.LocalURLServer
133
 
 
134
 
 
135
 
_unitialized_attr = object()
136
 
"""A sentinel needed to act as a default value in a method signature."""
137
 
 
 
127
default_transport = LocalURLServer
138
128
 
139
129
# Subunit result codes, defined here to prevent a hard dependency on subunit.
140
130
SUBUNIT_SEEK_SET = 0
247
237
                '%d non-main threads were left active in the end.\n'
248
238
                % (TestCase._active_threads - 1))
249
239
 
250
 
    def getDescription(self, test):
251
 
        return test.id()
252
 
 
253
240
    def _extractBenchmarkTime(self, testCase, details=None):
254
241
        """Add a benchmark time for the current test case."""
255
242
        if details and 'benchtime' in details:
490
477
        return self._shortened_test_description(test)
491
478
 
492
479
    def report_error(self, test, err):
493
 
        self.stream.write('ERROR: %s\n    %s\n' % (
 
480
        ui.ui_factory.note('ERROR: %s\n    %s\n' % (
494
481
            self._test_description(test),
495
482
            err[1],
496
483
            ))
497
484
 
498
485
    def report_failure(self, test, err):
499
 
        self.stream.write('FAIL: %s\n    %s\n' % (
 
486
        ui.ui_factory.note('FAIL: %s\n    %s\n' % (
500
487
            self._test_description(test),
501
488
            err[1],
502
489
            ))
707
694
    """
708
695
 
709
696
 
 
697
class CommandFailed(Exception):
 
698
    pass
 
699
 
 
700
 
710
701
class StringIOWrapper(object):
711
702
    """A wrapper around cStringIO which just adds an encoding attribute.
712
703
 
856
847
        Tests that want to use debug flags can just set them in the
857
848
        debug_flags set during setup/teardown.
858
849
        """
859
 
        # Start with a copy of the current debug flags we can safely modify.
860
 
        self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
 
850
        self._preserved_debug_flags = set(debug.debug_flags)
861
851
        if 'allow_debug' not in selftest_debug_flags:
862
852
            debug.debug_flags.clear()
863
853
        if 'disable_lock_checks' not in selftest_debug_flags:
864
854
            debug.debug_flags.add('strict_locks')
 
855
        self.addCleanup(self._restore_debug_flags)
865
856
 
866
857
    def _clear_hooks(self):
867
858
        # prevent hooks affecting tests
888
879
    def _silenceUI(self):
889
880
        """Turn off UI for duration of test"""
890
881
        # by default the UI is off; tests can turn it on if they want it.
891
 
        self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
 
882
        saved = ui.ui_factory
 
883
        def _restore():
 
884
            ui.ui_factory = saved
 
885
        ui.ui_factory = ui.SilentUIFactory()
 
886
        self.addCleanup(_restore)
892
887
 
893
888
    def _check_locks(self):
894
889
        """Check that all lock take/release actions have been paired."""
923
918
            self._lock_check_thorough = False
924
919
        else:
925
920
            self._lock_check_thorough = True
926
 
 
 
921
            
927
922
        self.addCleanup(self._check_locks)
928
923
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
929
924
                                                self._lock_acquired, None)
1022
1017
        server's urls to be used.
1023
1018
        """
1024
1019
        if backing_server is None:
1025
 
            transport_server.start_server()
 
1020
            transport_server.setUp()
1026
1021
        else:
1027
 
            transport_server.start_server(backing_server)
1028
 
        self.addCleanup(transport_server.stop_server)
 
1022
            transport_server.setUp(backing_server)
 
1023
        self.addCleanup(transport_server.tearDown)
1029
1024
        # Obtain a real transport because if the server supplies a password, it
1030
1025
        # will be hidden from the base on the client side.
1031
1026
        t = get_transport(transport_server.get_url())
1045
1040
        if t.base.endswith('/work/'):
1046
1041
            # we have safety net/test root/work
1047
1042
            t = t.clone('../..')
1048
 
        elif isinstance(transport_server,
1049
 
                        test_server.SmartTCPServer_for_testing):
 
1043
        elif isinstance(transport_server, server.SmartTCPServer_for_testing):
1050
1044
            # The smart server adds a path similar to work, which is traversed
1051
1045
            # up from by the client. But the server is chrooted - the actual
1052
1046
            # backing transport is not escaped from, and VFS requests to the
1207
1201
            raise AssertionError('pattern "%s" found in "%s"'
1208
1202
                    % (needle_re, haystack))
1209
1203
 
1210
 
    def assertContainsString(self, haystack, needle):
1211
 
        if haystack.find(needle) == -1:
1212
 
            self.fail("string %r not found in '''%s'''" % (needle, haystack))
1213
 
 
1214
1204
    def assertSubset(self, sublist, superlist):
1215
1205
        """Assert that every entry in sublist is present in superlist."""
1216
1206
        missing = set(sublist) - set(superlist)
1313
1303
            f.close()
1314
1304
        self.assertEqualDiff(content, s)
1315
1305
 
1316
 
    def assertDocstring(self, expected_docstring, obj):
1317
 
        """Fail if obj does not have expected_docstring"""
1318
 
        if __doc__ is None:
1319
 
            # With -OO the docstring should be None instead
1320
 
            self.assertIs(obj.__doc__, None)
1321
 
        else:
1322
 
            self.assertEqual(expected_docstring, obj.__doc__)
1323
 
 
1324
1306
    def failUnlessExists(self, path):
1325
1307
        """Fail unless path or paths, which may be abs or relative, exist."""
1326
1308
        if not isinstance(path, basestring):
1494
1476
        """
1495
1477
        self._cleanups.append((callable, args, kwargs))
1496
1478
 
1497
 
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1498
 
        """Overrides an object attribute restoring it after the test.
1499
 
 
1500
 
        :param obj: The object that will be mutated.
1501
 
 
1502
 
        :param attr_name: The attribute name we want to preserve/override in
1503
 
            the object.
1504
 
 
1505
 
        :param new: The optional value we want to set the attribute to.
1506
 
 
1507
 
        :returns: The actual attr value.
1508
 
        """
1509
 
        value = getattr(obj, attr_name)
1510
 
        # The actual value is captured by the call below
1511
 
        self.addCleanup(setattr, obj, attr_name, value)
1512
 
        if new is not _unitialized_attr:
1513
 
            setattr(obj, attr_name, new)
1514
 
        return value
1515
 
 
1516
1479
    def _cleanEnvironment(self):
1517
1480
        new_env = {
1518
1481
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1528
1491
            'BZR_PROGRESS_BAR': None,
1529
1492
            'BZR_LOG': None,
1530
1493
            'BZR_PLUGIN_PATH': None,
1531
 
            'BZR_DISABLE_PLUGINS': None,
1532
 
            'BZR_PLUGINS_AT': None,
1533
1494
            'BZR_CONCURRENCY': None,
1534
1495
            # Make sure that any text ui tests are consistent regardless of
1535
1496
            # the environment the test case is run in; you may want tests that
1556
1517
            'ftp_proxy': None,
1557
1518
            'FTP_PROXY': None,
1558
1519
            'BZR_REMOTE_PATH': None,
1559
 
            # Generally speaking, we don't want apport reporting on crashes in
1560
 
            # the test envirnoment unless we're specifically testing apport,
1561
 
            # so that it doesn't leak into the real system environment.  We
1562
 
            # use an env var so it propagates to subprocesses.
1563
 
            'APPORT_DISABLE': '1',
1564
1520
        }
1565
 
        self._old_env = {}
 
1521
        self.__old_env = {}
1566
1522
        self.addCleanup(self._restoreEnvironment)
1567
1523
        for name, value in new_env.iteritems():
1568
1524
            self._captureVar(name, value)
1569
1525
 
1570
1526
    def _captureVar(self, name, newvalue):
1571
1527
        """Set an environment variable, and reset it when finished."""
1572
 
        self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1528
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1529
 
 
1530
    def _restore_debug_flags(self):
 
1531
        debug.debug_flags.clear()
 
1532
        debug.debug_flags.update(self._preserved_debug_flags)
1573
1533
 
1574
1534
    def _restoreEnvironment(self):
1575
 
        for name, value in self._old_env.iteritems():
 
1535
        for name, value in self.__old_env.iteritems():
1576
1536
            osutils.set_or_unset_env(name, value)
1577
1537
 
1578
1538
    def _restoreHooks(self):
1682
1642
                unicodestr = log_contents.decode('utf8', 'replace')
1683
1643
                log_contents = unicodestr.encode('utf8')
1684
1644
            if not keep_log_file:
1685
 
                close_attempts = 0
1686
 
                max_close_attempts = 100
1687
 
                first_close_error = None
1688
 
                while close_attempts < max_close_attempts:
1689
 
                    close_attempts += 1
1690
 
                    try:
1691
 
                        self._log_file.close()
1692
 
                    except IOError, ioe:
1693
 
                        if ioe.errno is None:
1694
 
                            # No errno implies 'close() called during
1695
 
                            # concurrent operation on the same file object', so
1696
 
                            # retry.  Probably a thread is trying to write to
1697
 
                            # the log file.
1698
 
                            if first_close_error is None:
1699
 
                                first_close_error = ioe
1700
 
                            continue
1701
 
                        raise
1702
 
                    else:
1703
 
                        break
1704
 
                if close_attempts > 1:
1705
 
                    sys.stderr.write(
1706
 
                        'Unable to close log file on first attempt, '
1707
 
                        'will retry: %s\n' % (first_close_error,))
1708
 
                    if close_attempts == max_close_attempts:
1709
 
                        sys.stderr.write(
1710
 
                            'Unable to close log file after %d attempts.\n'
1711
 
                            % (max_close_attempts,))
 
1645
                self._log_file.close()
1712
1646
                self._log_file = None
1713
1647
                # Permit multiple calls to get_log until we clean it up in
1714
1648
                # finishLogFile
2100
2034
 
2101
2035
        Tests that expect to provoke LockContention errors should call this.
2102
2036
        """
2103
 
        self.overrideAttr(bzrlib.lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
 
2037
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
2038
        def resetTimeout():
 
2039
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
2040
        self.addCleanup(resetTimeout)
 
2041
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
2104
2042
 
2105
2043
    def make_utf8_encoded_stringio(self, encoding_type=None):
2106
2044
        """Return a StringIOWrapper instance, that will encode Unicode
2120
2058
        request_handlers = request.request_handlers
2121
2059
        orig_method = request_handlers.get(verb)
2122
2060
        request_handlers.remove(verb)
2123
 
        self.addCleanup(request_handlers.register, verb, orig_method)
 
2061
        def restoreVerb():
 
2062
            request_handlers.register(verb, orig_method)
 
2063
        self.addCleanup(restoreVerb)
2124
2064
 
2125
2065
 
2126
2066
class CapturedCall(object):
2217
2157
        if self.__readonly_server is None:
2218
2158
            if self.transport_readonly_server is None:
2219
2159
                # readonly decorator requested
2220
 
                self.__readonly_server = test_server.ReadonlyServer()
 
2160
                self.__readonly_server = ReadonlyServer()
2221
2161
            else:
2222
2162
                # explicit readonly transport.
2223
2163
                self.__readonly_server = self.create_transport_readonly_server()
2246
2186
        is no means to override it.
2247
2187
        """
2248
2188
        if self.__vfs_server is None:
2249
 
            self.__vfs_server = memory.MemoryServer()
 
2189
            self.__vfs_server = MemoryServer()
2250
2190
            self.start_server(self.__vfs_server)
2251
2191
        return self.__vfs_server
2252
2192
 
2409
2349
        return made_control.create_repository(shared=shared)
2410
2350
 
2411
2351
    def make_smart_server(self, path):
2412
 
        smart_server = test_server.SmartTCPServer_for_testing()
 
2352
        smart_server = server.SmartTCPServer_for_testing()
2413
2353
        self.start_server(smart_server, self.get_server())
2414
2354
        remote_transport = get_transport(smart_server.get_url()).clone(path)
2415
2355
        return remote_transport
2433
2373
    def setUp(self):
2434
2374
        super(TestCaseWithMemoryTransport, self).setUp()
2435
2375
        self._make_test_root()
2436
 
        self.addCleanup(os.chdir, os.getcwdu())
 
2376
        _currentdir = os.getcwdu()
 
2377
        def _leaveDirectory():
 
2378
            os.chdir(_currentdir)
 
2379
        self.addCleanup(_leaveDirectory)
2437
2380
        self.makeAndChdirToTestDir()
2438
2381
        self.overrideEnvironmentForTesting()
2439
2382
        self.__readonly_server = None
2442
2385
 
2443
2386
    def setup_smart_server_with_call_log(self):
2444
2387
        """Sets up a smart server as the transport server with a call log."""
2445
 
        self.transport_server = test_server.SmartTCPServer_for_testing
 
2388
        self.transport_server = server.SmartTCPServer_for_testing
2446
2389
        self.hpss_calls = []
2447
2390
        import traceback
2448
2391
        # Skip the current stack down to the caller of
2661
2604
            # We can only make working trees locally at the moment.  If the
2662
2605
            # transport can't support them, then we keep the non-disk-backed
2663
2606
            # branch and create a local checkout.
2664
 
            if self.vfs_transport_factory is test_server.LocalURLServer:
 
2607
            if self.vfs_transport_factory is LocalURLServer:
2665
2608
                # the branch is colocated on disk, we cannot create a checkout.
2666
2609
                # hopefully callers will expect this.
2667
2610
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2727
2670
 
2728
2671
    def setUp(self):
2729
2672
        super(ChrootedTestCase, self).setUp()
2730
 
        if not self.vfs_transport_factory == memory.MemoryServer:
 
2673
        if not self.vfs_transport_factory == MemoryServer:
2731
2674
            self.transport_readonly_server = HttpServer
2732
2675
 
2733
2676
 
3201
3144
    return result
3202
3145
 
3203
3146
 
3204
 
def workaround_zealous_crypto_random():
3205
 
    """Crypto.Random want to help us being secure, but we don't care here.
3206
 
 
3207
 
    This workaround some test failure related to the sftp server. Once paramiko
3208
 
    stop using the controversial API in Crypto.Random, we may get rid of it.
3209
 
    """
3210
 
    try:
3211
 
        from Crypto.Random import atfork
3212
 
        atfork()
3213
 
    except ImportError:
3214
 
        pass
3215
 
 
3216
 
 
3217
3147
def fork_for_tests(suite):
3218
3148
    """Take suite and start up one runner per CPU by forking()
3219
3149
 
3234
3164
            try:
3235
3165
                ProtocolTestCase.run(self, result)
3236
3166
            finally:
3237
 
                os.waitpid(self.pid, 0)
 
3167
                os.waitpid(self.pid, os.WNOHANG)
3238
3168
 
3239
3169
    test_blocks = partition_tests(suite, concurrency)
3240
3170
    for process_tests in test_blocks:
3243
3173
        c2pread, c2pwrite = os.pipe()
3244
3174
        pid = os.fork()
3245
3175
        if pid == 0:
3246
 
            workaround_zealous_crypto_random()
3247
3176
            try:
3248
3177
                os.close(c2pread)
3249
3178
                # Leave stderr and stdout open so we can see test noise
3618
3547
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3619
3548
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3620
3549
 
3621
 
# Obvious highest levels prefixes, feel free to add your own via a plugin
 
3550
# Obvious higest levels prefixes, feel free to add your own via a plugin
3622
3551
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3623
3552
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3624
3553
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3634
3563
        'bzrlib.tests.commands',
3635
3564
        'bzrlib.tests.per_branch',
3636
3565
        'bzrlib.tests.per_bzrdir',
3637
 
        'bzrlib.tests.per_bzrdir_colo',
3638
3566
        'bzrlib.tests.per_foreign_vcs',
3639
3567
        'bzrlib.tests.per_interrepository',
3640
3568
        'bzrlib.tests.per_intertree',
3680
3608
        'bzrlib.tests.test_chunk_writer',
3681
3609
        'bzrlib.tests.test_clean_tree',
3682
3610
        'bzrlib.tests.test_cleanup',
3683
 
        'bzrlib.tests.test_cmdline',
3684
3611
        'bzrlib.tests.test_commands',
3685
3612
        'bzrlib.tests.test_commit',
3686
3613
        'bzrlib.tests.test_commit_merge',
3720
3647
        'bzrlib.tests.test_identitymap',
3721
3648
        'bzrlib.tests.test_ignores',
3722
3649
        'bzrlib.tests.test_index',
3723
 
        'bzrlib.tests.test_import_tariff',
3724
3650
        'bzrlib.tests.test_info',
3725
3651
        'bzrlib.tests.test_inv',
3726
3652
        'bzrlib.tests.test_inventory_delta',
3734
3660
        'bzrlib.tests.test_lru_cache',
3735
3661
        'bzrlib.tests.test_lsprof',
3736
3662
        'bzrlib.tests.test_mail_client',
3737
 
        'bzrlib.tests.test_matchers',
3738
3663
        'bzrlib.tests.test_memorytree',
3739
3664
        'bzrlib.tests.test_merge',
3740
3665
        'bzrlib.tests.test_merge3',
3820
3745
 
3821
3746
 
3822
3747
def _test_suite_modules_to_doctest():
3823
 
    """Return the list of modules to doctest."""
3824
 
    if __doc__ is None:
3825
 
        # GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
3826
 
        return []
 
3748
    """Return the list of modules to doctest."""   
3827
3749
    return [
3828
3750
        'bzrlib',
3829
3751
        'bzrlib.branchbuilder',
3830
 
        'bzrlib.decorators',
3831
3752
        'bzrlib.export',
3832
3753
        'bzrlib.inventory',
3833
3754
        'bzrlib.iterablefile',
4198
4119
    should really use a different feature.
4199
4120
    """
4200
4121
 
4201
 
    def __init__(self, dep_version, module, name,
4202
 
                 replacement_name, replacement_module=None):
 
4122
    def __init__(self, module, name, this_name, dep_version):
4203
4123
        super(_CompatabilityThunkFeature, self).__init__()
4204
4124
        self._module = module
4205
 
        if replacement_module is None:
4206
 
            replacement_module = module
4207
 
        self._replacement_module = replacement_module
4208
4125
        self._name = name
4209
 
        self._replacement_name = replacement_name
 
4126
        self._this_name = this_name
4210
4127
        self._dep_version = dep_version
4211
4128
        self._feature = None
4212
4129
 
4213
4130
    def _ensure(self):
4214
4131
        if self._feature is None:
4215
 
            depr_msg = self._dep_version % ('%s.%s'
4216
 
                                            % (self._module, self._name))
4217
 
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4218
 
                                               self._replacement_name)
4219
 
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4220
 
            # Import the new feature and use it as a replacement for the
4221
 
            # deprecated one.
4222
 
            mod = __import__(self._replacement_module, {}, {},
4223
 
                             [self._replacement_name])
4224
 
            self._feature = getattr(mod, self._replacement_name)
 
4132
            msg = (self._dep_version % self._this_name) + (
 
4133
                   ' Use %s.%s instead.' % (self._module, self._name))
 
4134
            symbol_versioning.warn(msg, DeprecationWarning)
 
4135
            mod = __import__(self._module, {}, {}, [self._name])
 
4136
            self._feature = getattr(mod, self._name)
4225
4137
 
4226
4138
    def _probe(self):
4227
4139
        self._ensure()
4253
4165
        if self.available(): # Make sure the probe has been done
4254
4166
            return self._module
4255
4167
        return None
4256
 
 
 
4168
    
4257
4169
    def feature_name(self):
4258
4170
        return self.module_name
4259
4171
 
4260
4172
 
4261
4173
# This is kept here for compatibility, it is recommended to use
4262
4174
# 'bzrlib.tests.feature.paramiko' instead
4263
 
ParamikoFeature = _CompatabilityThunkFeature(
4264
 
    deprecated_in((2,1,0)),
4265
 
    'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
 
4175
ParamikoFeature = _CompatabilityThunkFeature('bzrlib.tests.features',
 
4176
    'paramiko', 'bzrlib.tests.ParamikoFeature', deprecated_in((2,1,0)))
4266
4177
 
4267
4178
 
4268
4179
def probe_unicode_in_user_encoding():
4429
4340
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4430
4341
 
4431
4342
 
4432
 
class _CaseSensitiveFilesystemFeature(Feature):
4433
 
 
4434
 
    def _probe(self):
4435
 
        if CaseInsCasePresFilenameFeature.available():
4436
 
            return False
4437
 
        elif CaseInsensitiveFilesystemFeature.available():
4438
 
            return False
4439
 
        else:
4440
 
            return True
4441
 
 
4442
 
    def feature_name(self):
4443
 
        return 'case-sensitive filesystem'
4444
 
 
4445
 
# new coding style is for feature instances to be lowercase
4446
 
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4447
 
 
4448
 
 
4449
4343
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4450
 
SubUnitFeature = _CompatabilityThunkFeature(
4451
 
    deprecated_in((2,1,0)),
4452
 
    'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
 
4344
SubUnitFeature = _CompatabilityThunkFeature('bzrlib.tests.features', 'subunit',
 
4345
    'bzrlib.tests.SubUnitFeature', deprecated_in((2,1,0)))
4453
4346
# Only define SubUnitBzrRunner if subunit is available.
4454
4347
try:
4455
4348
    from subunit import TestProtocolClient
4462
4355
            return result
4463
4356
except ImportError:
4464
4357
    pass
4465
 
 
4466
 
class _PosixPermissionsFeature(Feature):
4467
 
 
4468
 
    def _probe(self):
4469
 
        def has_perms():
4470
 
            # create temporary file and check if specified perms are maintained.
4471
 
            import tempfile
4472
 
 
4473
 
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4474
 
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4475
 
            fd, name = f
4476
 
            os.close(fd)
4477
 
            os.chmod(name, write_perms)
4478
 
 
4479
 
            read_perms = os.stat(name).st_mode & 0777
4480
 
            os.unlink(name)
4481
 
            return (write_perms == read_perms)
4482
 
 
4483
 
        return (os.name == 'posix') and has_perms()
4484
 
 
4485
 
    def feature_name(self):
4486
 
        return 'POSIX permissions support'
4487
 
 
4488
 
posix_permissions_feature = _PosixPermissionsFeature()