/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Vincent Ladeuil
  • Date: 2010-01-25 17:48:22 UTC
  • mto: (4987.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 4988.
  • Revision ID: v.ladeuil+lp@free.fr-20100125174822-nce4l19sbwx83jvq
Deploying the new overrideAttr facility further reduces the complexity
and make the code clearer.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
"""Testing framework extensions"""
18
17
 
19
18
# TODO: Perhaps there should be an API to find out if bzr running under the
20
19
# test suite -- some plugins might want to avoid making intrusive changes if
102
101
    deprecated_passed,
103
102
    )
104
103
import bzrlib.trace
105
 
from bzrlib.transport import (
106
 
    get_transport,
107
 
    memory,
108
 
    pathfilter,
109
 
    )
 
104
from bzrlib.transport import get_transport, pathfilter
110
105
import bzrlib.transport
 
106
from bzrlib.transport.local import LocalURLServer
 
107
from bzrlib.transport.memory import MemoryServer
 
108
from bzrlib.transport.readonly import ReadonlyServer
111
109
from bzrlib.trace import mutter, note
112
 
from bzrlib.tests import (
113
 
    test_server,
114
 
    TestUtil,
115
 
    )
 
110
from bzrlib.tests import TestUtil
116
111
from bzrlib.tests.http_server import HttpServer
117
112
from bzrlib.tests.TestUtil import (
118
113
                          TestSuite,
129
124
# shown frame is the test code, not our assertXYZ.
130
125
__unittest = 1
131
126
 
132
 
default_transport = test_server.LocalURLServer
 
127
default_transport = LocalURLServer
133
128
 
134
129
 
135
130
_unitialized_attr = object()
490
485
        return self._shortened_test_description(test)
491
486
 
492
487
    def report_error(self, test, err):
493
 
        self.stream.write('ERROR: %s\n    %s\n' % (
 
488
        ui.ui_factory.note('ERROR: %s\n    %s\n' % (
494
489
            self._test_description(test),
495
490
            err[1],
496
491
            ))
497
492
 
498
493
    def report_failure(self, test, err):
499
 
        self.stream.write('FAIL: %s\n    %s\n' % (
 
494
        ui.ui_factory.note('FAIL: %s\n    %s\n' % (
500
495
            self._test_description(test),
501
496
            err[1],
502
497
            ))
707
702
    """
708
703
 
709
704
 
 
705
class CommandFailed(Exception):
 
706
    pass
 
707
 
 
708
 
710
709
class StringIOWrapper(object):
711
710
    """A wrapper around cStringIO which just adds an encoding attribute.
712
711
 
1045
1044
        if t.base.endswith('/work/'):
1046
1045
            # we have safety net/test root/work
1047
1046
            t = t.clone('../..')
1048
 
        elif isinstance(transport_server,
1049
 
                        test_server.SmartTCPServer_for_testing):
 
1047
        elif isinstance(transport_server, server.SmartTCPServer_for_testing):
1050
1048
            # The smart server adds a path similar to work, which is traversed
1051
1049
            # up from by the client. But the server is chrooted - the actual
1052
1050
            # backing transport is not escaped from, and VFS requests to the
1207
1205
            raise AssertionError('pattern "%s" found in "%s"'
1208
1206
                    % (needle_re, haystack))
1209
1207
 
1210
 
    def assertContainsString(self, haystack, needle):
1211
 
        if haystack.find(needle) == -1:
1212
 
            self.fail("string %r not found in '''%s'''" % (needle, haystack))
1213
 
 
1214
1208
    def assertSubset(self, sublist, superlist):
1215
1209
        """Assert that every entry in sublist is present in superlist."""
1216
1210
        missing = set(sublist) - set(superlist)
1313
1307
            f.close()
1314
1308
        self.assertEqualDiff(content, s)
1315
1309
 
1316
 
    def assertDocstring(self, expected_docstring, obj):
1317
 
        """Fail if obj does not have expected_docstring"""
1318
 
        if __doc__ is None:
1319
 
            # With -OO the docstring should be None instead
1320
 
            self.assertIs(obj.__doc__, None)
1321
 
        else:
1322
 
            self.assertEqual(expected_docstring, obj.__doc__)
1323
 
 
1324
1310
    def failUnlessExists(self, path):
1325
1311
        """Fail unless path or paths, which may be abs or relative, exist."""
1326
1312
        if not isinstance(path, basestring):
1528
1514
            'BZR_PROGRESS_BAR': None,
1529
1515
            'BZR_LOG': None,
1530
1516
            'BZR_PLUGIN_PATH': None,
1531
 
            'BZR_DISABLE_PLUGINS': None,
1532
 
            'BZR_PLUGINS_AT': None,
1533
1517
            'BZR_CONCURRENCY': None,
1534
1518
            # Make sure that any text ui tests are consistent regardless of
1535
1519
            # the environment the test case is run in; you may want tests that
1556
1540
            'ftp_proxy': None,
1557
1541
            'FTP_PROXY': None,
1558
1542
            'BZR_REMOTE_PATH': None,
1559
 
            # Generally speaking, we don't want apport reporting on crashes in
1560
 
            # the test envirnoment unless we're specifically testing apport,
1561
 
            # so that it doesn't leak into the real system environment.  We
1562
 
            # use an env var so it propagates to subprocesses.
1563
 
            'APPORT_DISABLE': '1',
1564
1543
        }
1565
 
        self._old_env = {}
 
1544
        self.__old_env = {}
1566
1545
        self.addCleanup(self._restoreEnvironment)
1567
1546
        for name, value in new_env.iteritems():
1568
1547
            self._captureVar(name, value)
1569
1548
 
1570
1549
    def _captureVar(self, name, newvalue):
1571
1550
        """Set an environment variable, and reset it when finished."""
1572
 
        self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1551
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1573
1552
 
1574
1553
    def _restoreEnvironment(self):
1575
 
        for name, value in self._old_env.iteritems():
 
1554
        for name, value in self.__old_env.iteritems():
1576
1555
            osutils.set_or_unset_env(name, value)
1577
1556
 
1578
1557
    def _restoreHooks(self):
1682
1661
                unicodestr = log_contents.decode('utf8', 'replace')
1683
1662
                log_contents = unicodestr.encode('utf8')
1684
1663
            if not keep_log_file:
1685
 
                close_attempts = 0
1686
 
                max_close_attempts = 100
1687
 
                first_close_error = None
1688
 
                while close_attempts < max_close_attempts:
1689
 
                    close_attempts += 1
1690
 
                    try:
1691
 
                        self._log_file.close()
1692
 
                    except IOError, ioe:
1693
 
                        if ioe.errno is None:
1694
 
                            # No errno implies 'close() called during
1695
 
                            # concurrent operation on the same file object', so
1696
 
                            # retry.  Probably a thread is trying to write to
1697
 
                            # the log file.
1698
 
                            if first_close_error is None:
1699
 
                                first_close_error = ioe
1700
 
                            continue
1701
 
                        raise
1702
 
                    else:
1703
 
                        break
1704
 
                if close_attempts > 1:
1705
 
                    sys.stderr.write(
1706
 
                        'Unable to close log file on first attempt, '
1707
 
                        'will retry: %s\n' % (first_close_error,))
1708
 
                    if close_attempts == max_close_attempts:
1709
 
                        sys.stderr.write(
1710
 
                            'Unable to close log file after %d attempts.\n'
1711
 
                            % (max_close_attempts,))
 
1664
                self._log_file.close()
1712
1665
                self._log_file = None
1713
1666
                # Permit multiple calls to get_log until we clean it up in
1714
1667
                # finishLogFile
2217
2170
        if self.__readonly_server is None:
2218
2171
            if self.transport_readonly_server is None:
2219
2172
                # readonly decorator requested
2220
 
                self.__readonly_server = test_server.ReadonlyServer()
 
2173
                self.__readonly_server = ReadonlyServer()
2221
2174
            else:
2222
2175
                # explicit readonly transport.
2223
2176
                self.__readonly_server = self.create_transport_readonly_server()
2246
2199
        is no means to override it.
2247
2200
        """
2248
2201
        if self.__vfs_server is None:
2249
 
            self.__vfs_server = memory.MemoryServer()
 
2202
            self.__vfs_server = MemoryServer()
2250
2203
            self.start_server(self.__vfs_server)
2251
2204
        return self.__vfs_server
2252
2205
 
2409
2362
        return made_control.create_repository(shared=shared)
2410
2363
 
2411
2364
    def make_smart_server(self, path):
2412
 
        smart_server = test_server.SmartTCPServer_for_testing()
 
2365
        smart_server = server.SmartTCPServer_for_testing()
2413
2366
        self.start_server(smart_server, self.get_server())
2414
2367
        remote_transport = get_transport(smart_server.get_url()).clone(path)
2415
2368
        return remote_transport
2442
2395
 
2443
2396
    def setup_smart_server_with_call_log(self):
2444
2397
        """Sets up a smart server as the transport server with a call log."""
2445
 
        self.transport_server = test_server.SmartTCPServer_for_testing
 
2398
        self.transport_server = server.SmartTCPServer_for_testing
2446
2399
        self.hpss_calls = []
2447
2400
        import traceback
2448
2401
        # Skip the current stack down to the caller of
2661
2614
            # We can only make working trees locally at the moment.  If the
2662
2615
            # transport can't support them, then we keep the non-disk-backed
2663
2616
            # branch and create a local checkout.
2664
 
            if self.vfs_transport_factory is test_server.LocalURLServer:
 
2617
            if self.vfs_transport_factory is LocalURLServer:
2665
2618
                # the branch is colocated on disk, we cannot create a checkout.
2666
2619
                # hopefully callers will expect this.
2667
2620
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2727
2680
 
2728
2681
    def setUp(self):
2729
2682
        super(ChrootedTestCase, self).setUp()
2730
 
        if not self.vfs_transport_factory == memory.MemoryServer:
 
2683
        if not self.vfs_transport_factory == MemoryServer:
2731
2684
            self.transport_readonly_server = HttpServer
2732
2685
 
2733
2686
 
3201
3154
    return result
3202
3155
 
3203
3156
 
3204
 
def workaround_zealous_crypto_random():
3205
 
    """Crypto.Random want to help us being secure, but we don't care here.
3206
 
 
3207
 
    This workaround some test failure related to the sftp server. Once paramiko
3208
 
    stop using the controversial API in Crypto.Random, we may get rid of it.
3209
 
    """
3210
 
    try:
3211
 
        from Crypto.Random import atfork
3212
 
        atfork()
3213
 
    except ImportError:
3214
 
        pass
3215
 
 
3216
 
 
3217
3157
def fork_for_tests(suite):
3218
3158
    """Take suite and start up one runner per CPU by forking()
3219
3159
 
3234
3174
            try:
3235
3175
                ProtocolTestCase.run(self, result)
3236
3176
            finally:
3237
 
                os.waitpid(self.pid, 0)
 
3177
                os.waitpid(self.pid, os.WNOHANG)
3238
3178
 
3239
3179
    test_blocks = partition_tests(suite, concurrency)
3240
3180
    for process_tests in test_blocks:
3243
3183
        c2pread, c2pwrite = os.pipe()
3244
3184
        pid = os.fork()
3245
3185
        if pid == 0:
3246
 
            workaround_zealous_crypto_random()
3247
3186
            try:
3248
3187
                os.close(c2pread)
3249
3188
                # Leave stderr and stdout open so we can see test noise
3618
3557
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3619
3558
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3620
3559
 
3621
 
# Obvious highest levels prefixes, feel free to add your own via a plugin
 
3560
# Obvious higest levels prefixes, feel free to add your own via a plugin
3622
3561
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3623
3562
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3624
3563
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3634
3573
        'bzrlib.tests.commands',
3635
3574
        'bzrlib.tests.per_branch',
3636
3575
        'bzrlib.tests.per_bzrdir',
3637
 
        'bzrlib.tests.per_bzrdir_colo',
3638
3576
        'bzrlib.tests.per_foreign_vcs',
3639
3577
        'bzrlib.tests.per_interrepository',
3640
3578
        'bzrlib.tests.per_intertree',
3680
3618
        'bzrlib.tests.test_chunk_writer',
3681
3619
        'bzrlib.tests.test_clean_tree',
3682
3620
        'bzrlib.tests.test_cleanup',
3683
 
        'bzrlib.tests.test_cmdline',
3684
3621
        'bzrlib.tests.test_commands',
3685
3622
        'bzrlib.tests.test_commit',
3686
3623
        'bzrlib.tests.test_commit_merge',
3720
3657
        'bzrlib.tests.test_identitymap',
3721
3658
        'bzrlib.tests.test_ignores',
3722
3659
        'bzrlib.tests.test_index',
3723
 
        'bzrlib.tests.test_import_tariff',
3724
3660
        'bzrlib.tests.test_info',
3725
3661
        'bzrlib.tests.test_inv',
3726
3662
        'bzrlib.tests.test_inventory_delta',
3819
3755
 
3820
3756
 
3821
3757
def _test_suite_modules_to_doctest():
3822
 
    """Return the list of modules to doctest."""
3823
 
    if __doc__ is None:
3824
 
        # GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
3825
 
        return []
 
3758
    """Return the list of modules to doctest."""   
3826
3759
    return [
3827
3760
        'bzrlib',
3828
3761
        'bzrlib.branchbuilder',
4197
4130
    should really use a different feature.
4198
4131
    """
4199
4132
 
4200
 
    def __init__(self, dep_version, module, name,
4201
 
                 replacement_name, replacement_module=None):
 
4133
    def __init__(self, module, name, this_name, dep_version):
4202
4134
        super(_CompatabilityThunkFeature, self).__init__()
4203
4135
        self._module = module
4204
 
        if replacement_module is None:
4205
 
            replacement_module = module
4206
 
        self._replacement_module = replacement_module
4207
4136
        self._name = name
4208
 
        self._replacement_name = replacement_name
 
4137
        self._this_name = this_name
4209
4138
        self._dep_version = dep_version
4210
4139
        self._feature = None
4211
4140
 
4212
4141
    def _ensure(self):
4213
4142
        if self._feature is None:
4214
 
            depr_msg = self._dep_version % ('%s.%s'
4215
 
                                            % (self._module, self._name))
4216
 
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4217
 
                                               self._replacement_name)
4218
 
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4219
 
            # Import the new feature and use it as a replacement for the
4220
 
            # deprecated one.
4221
 
            mod = __import__(self._replacement_module, {}, {},
4222
 
                             [self._replacement_name])
4223
 
            self._feature = getattr(mod, self._replacement_name)
 
4143
            msg = (self._dep_version % self._this_name) + (
 
4144
                   ' Use %s.%s instead.' % (self._module, self._name))
 
4145
            symbol_versioning.warn(msg, DeprecationWarning)
 
4146
            mod = __import__(self._module, {}, {}, [self._name])
 
4147
            self._feature = getattr(mod, self._name)
4224
4148
 
4225
4149
    def _probe(self):
4226
4150
        self._ensure()
4252
4176
        if self.available(): # Make sure the probe has been done
4253
4177
            return self._module
4254
4178
        return None
4255
 
 
 
4179
    
4256
4180
    def feature_name(self):
4257
4181
        return self.module_name
4258
4182
 
4259
4183
 
4260
4184
# This is kept here for compatibility, it is recommended to use
4261
4185
# 'bzrlib.tests.feature.paramiko' instead
4262
 
ParamikoFeature = _CompatabilityThunkFeature(
4263
 
    deprecated_in((2,1,0)),
4264
 
    'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
 
4186
ParamikoFeature = _CompatabilityThunkFeature('bzrlib.tests.features',
 
4187
    'paramiko', 'bzrlib.tests.ParamikoFeature', deprecated_in((2,1,0)))
4265
4188
 
4266
4189
 
4267
4190
def probe_unicode_in_user_encoding():
4428
4351
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4429
4352
 
4430
4353
 
4431
 
class _CaseSensitiveFilesystemFeature(Feature):
4432
 
 
4433
 
    def _probe(self):
4434
 
        if CaseInsCasePresFilenameFeature.available():
4435
 
            return False
4436
 
        elif CaseInsensitiveFilesystemFeature.available():
4437
 
            return False
4438
 
        else:
4439
 
            return True
4440
 
 
4441
 
    def feature_name(self):
4442
 
        return 'case-sensitive filesystem'
4443
 
 
4444
 
# new coding style is for feature instances to be lowercase
4445
 
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4446
 
 
4447
 
 
4448
4354
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4449
 
SubUnitFeature = _CompatabilityThunkFeature(
4450
 
    deprecated_in((2,1,0)),
4451
 
    'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
 
4355
SubUnitFeature = _CompatabilityThunkFeature('bzrlib.tests.features', 'subunit',
 
4356
    'bzrlib.tests.SubUnitFeature', deprecated_in((2,1,0)))
4452
4357
# Only define SubUnitBzrRunner if subunit is available.
4453
4358
try:
4454
4359
    from subunit import TestProtocolClient
4461
4366
            return result
4462
4367
except ImportError:
4463
4368
    pass
4464
 
 
4465
 
class _PosixPermissionsFeature(Feature):
4466
 
 
4467
 
    def _probe(self):
4468
 
        def has_perms():
4469
 
            # create temporary file and check if specified perms are maintained.
4470
 
            import tempfile
4471
 
 
4472
 
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4473
 
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4474
 
            fd, name = f
4475
 
            os.close(fd)
4476
 
            os.chmod(name, write_perms)
4477
 
 
4478
 
            read_perms = os.stat(name).st_mode & 0777
4479
 
            os.unlink(name)
4480
 
            return (write_perms == read_perms)
4481
 
 
4482
 
        return (os.name == 'posix') and has_perms()
4483
 
 
4484
 
    def feature_name(self):
4485
 
        return 'POSIX permissions support'
4486
 
 
4487
 
posix_permissions_feature = _PosixPermissionsFeature()