/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Vincent Ladeuil
  • Date: 2010-02-09 20:49:50 UTC
  • mto: (5029.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 5030.
  • Revision ID: v.ladeuil+lp@free.fr-20100209204950-p86omh9xnn0w124a
selftest -s bt.test_bzrdir passing

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
"""Testing framework extensions"""
18
17
 
19
18
# TODO: Perhaps there should be an API to find out if bzr running under the
20
19
# test suite -- some plugins might want to avoid making intrusive changes if
102
101
    deprecated_passed,
103
102
    )
104
103
import bzrlib.trace
105
 
from bzrlib.transport import (
106
 
    get_transport,
107
 
    memory,
108
 
    pathfilter,
109
 
    )
 
104
from bzrlib.transport import get_transport, pathfilter
110
105
import bzrlib.transport
111
106
from bzrlib.trace import mutter, note
112
107
from bzrlib.tests import (
490
485
        return self._shortened_test_description(test)
491
486
 
492
487
    def report_error(self, test, err):
493
 
        self.stream.write('ERROR: %s\n    %s\n' % (
 
488
        ui.ui_factory.note('ERROR: %s\n    %s\n' % (
494
489
            self._test_description(test),
495
490
            err[1],
496
491
            ))
497
492
 
498
493
    def report_failure(self, test, err):
499
 
        self.stream.write('FAIL: %s\n    %s\n' % (
 
494
        ui.ui_factory.note('FAIL: %s\n    %s\n' % (
500
495
            self._test_description(test),
501
496
            err[1],
502
497
            ))
707
702
    """
708
703
 
709
704
 
 
705
class CommandFailed(Exception):
 
706
    pass
 
707
 
 
708
 
710
709
class StringIOWrapper(object):
711
710
    """A wrapper around cStringIO which just adds an encoding attribute.
712
711
 
1207
1206
            raise AssertionError('pattern "%s" found in "%s"'
1208
1207
                    % (needle_re, haystack))
1209
1208
 
1210
 
    def assertContainsString(self, haystack, needle):
1211
 
        if haystack.find(needle) == -1:
1212
 
            self.fail("string %r not found in '''%s'''" % (needle, haystack))
1213
 
 
1214
1209
    def assertSubset(self, sublist, superlist):
1215
1210
        """Assert that every entry in sublist is present in superlist."""
1216
1211
        missing = set(sublist) - set(superlist)
1313
1308
            f.close()
1314
1309
        self.assertEqualDiff(content, s)
1315
1310
 
1316
 
    def assertDocstring(self, expected_docstring, obj):
1317
 
        """Fail if obj does not have expected_docstring"""
1318
 
        if __doc__ is None:
1319
 
            # With -OO the docstring should be None instead
1320
 
            self.assertIs(obj.__doc__, None)
1321
 
        else:
1322
 
            self.assertEqual(expected_docstring, obj.__doc__)
1323
 
 
1324
1311
    def failUnlessExists(self, path):
1325
1312
        """Fail unless path or paths, which may be abs or relative, exist."""
1326
1313
        if not isinstance(path, basestring):
1528
1515
            'BZR_PROGRESS_BAR': None,
1529
1516
            'BZR_LOG': None,
1530
1517
            'BZR_PLUGIN_PATH': None,
1531
 
            'BZR_DISABLE_PLUGINS': None,
1532
 
            'BZR_PLUGINS_AT': None,
1533
1518
            'BZR_CONCURRENCY': None,
1534
1519
            # Make sure that any text ui tests are consistent regardless of
1535
1520
            # the environment the test case is run in; you may want tests that
1562
1547
            # use an env var so it propagates to subprocesses.
1563
1548
            'APPORT_DISABLE': '1',
1564
1549
        }
1565
 
        self._old_env = {}
 
1550
        self.__old_env = {}
1566
1551
        self.addCleanup(self._restoreEnvironment)
1567
1552
        for name, value in new_env.iteritems():
1568
1553
            self._captureVar(name, value)
1569
1554
 
1570
1555
    def _captureVar(self, name, newvalue):
1571
1556
        """Set an environment variable, and reset it when finished."""
1572
 
        self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1557
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1573
1558
 
1574
1559
    def _restoreEnvironment(self):
1575
 
        for name, value in self._old_env.iteritems():
 
1560
        for name, value in self.__old_env.iteritems():
1576
1561
            osutils.set_or_unset_env(name, value)
1577
1562
 
1578
1563
    def _restoreHooks(self):
1682
1667
                unicodestr = log_contents.decode('utf8', 'replace')
1683
1668
                log_contents = unicodestr.encode('utf8')
1684
1669
            if not keep_log_file:
1685
 
                close_attempts = 0
1686
 
                max_close_attempts = 100
1687
 
                first_close_error = None
1688
 
                while close_attempts < max_close_attempts:
1689
 
                    close_attempts += 1
1690
 
                    try:
1691
 
                        self._log_file.close()
1692
 
                    except IOError, ioe:
1693
 
                        if ioe.errno is None:
1694
 
                            # No errno implies 'close() called during
1695
 
                            # concurrent operation on the same file object', so
1696
 
                            # retry.  Probably a thread is trying to write to
1697
 
                            # the log file.
1698
 
                            if first_close_error is None:
1699
 
                                first_close_error = ioe
1700
 
                            continue
1701
 
                        raise
1702
 
                    else:
1703
 
                        break
1704
 
                if close_attempts > 1:
1705
 
                    sys.stderr.write(
1706
 
                        'Unable to close log file on first attempt, '
1707
 
                        'will retry: %s\n' % (first_close_error,))
1708
 
                    if close_attempts == max_close_attempts:
1709
 
                        sys.stderr.write(
1710
 
                            'Unable to close log file after %d attempts.\n'
1711
 
                            % (max_close_attempts,))
 
1670
                self._log_file.close()
1712
1671
                self._log_file = None
1713
1672
                # Permit multiple calls to get_log until we clean it up in
1714
1673
                # finishLogFile
2246
2205
        is no means to override it.
2247
2206
        """
2248
2207
        if self.__vfs_server is None:
2249
 
            self.__vfs_server = memory.MemoryServer()
 
2208
            self.__vfs_server = test_server.MemoryServer()
2250
2209
            self.start_server(self.__vfs_server)
2251
2210
        return self.__vfs_server
2252
2211
 
2409
2368
        return made_control.create_repository(shared=shared)
2410
2369
 
2411
2370
    def make_smart_server(self, path):
2412
 
        smart_server = test_server.SmartTCPServer_for_testing()
 
2371
        smart_server = server.SmartTCPServer_for_testing()
2413
2372
        self.start_server(smart_server, self.get_server())
2414
2373
        remote_transport = get_transport(smart_server.get_url()).clone(path)
2415
2374
        return remote_transport
2727
2686
 
2728
2687
    def setUp(self):
2729
2688
        super(ChrootedTestCase, self).setUp()
2730
 
        if not self.vfs_transport_factory == memory.MemoryServer:
 
2689
        if not self.vfs_transport_factory == test_server.MemoryServer:
2731
2690
            self.transport_readonly_server = HttpServer
2732
2691
 
2733
2692
 
3201
3160
    return result
3202
3161
 
3203
3162
 
3204
 
def workaround_zealous_crypto_random():
3205
 
    """Crypto.Random want to help us being secure, but we don't care here.
3206
 
 
3207
 
    This workaround some test failure related to the sftp server. Once paramiko
3208
 
    stop using the controversial API in Crypto.Random, we may get rid of it.
3209
 
    """
3210
 
    try:
3211
 
        from Crypto.Random import atfork
3212
 
        atfork()
3213
 
    except ImportError:
3214
 
        pass
3215
 
 
3216
 
 
3217
3163
def fork_for_tests(suite):
3218
3164
    """Take suite and start up one runner per CPU by forking()
3219
3165
 
3234
3180
            try:
3235
3181
                ProtocolTestCase.run(self, result)
3236
3182
            finally:
3237
 
                os.waitpid(self.pid, 0)
 
3183
                os.waitpid(self.pid, os.WNOHANG)
3238
3184
 
3239
3185
    test_blocks = partition_tests(suite, concurrency)
3240
3186
    for process_tests in test_blocks:
3243
3189
        c2pread, c2pwrite = os.pipe()
3244
3190
        pid = os.fork()
3245
3191
        if pid == 0:
3246
 
            workaround_zealous_crypto_random()
3247
3192
            try:
3248
3193
                os.close(c2pread)
3249
3194
                # Leave stderr and stdout open so we can see test noise
3634
3579
        'bzrlib.tests.commands',
3635
3580
        'bzrlib.tests.per_branch',
3636
3581
        'bzrlib.tests.per_bzrdir',
3637
 
        'bzrlib.tests.per_bzrdir_colo',
3638
3582
        'bzrlib.tests.per_foreign_vcs',
3639
3583
        'bzrlib.tests.per_interrepository',
3640
3584
        'bzrlib.tests.per_intertree',
3680
3624
        'bzrlib.tests.test_chunk_writer',
3681
3625
        'bzrlib.tests.test_clean_tree',
3682
3626
        'bzrlib.tests.test_cleanup',
3683
 
        'bzrlib.tests.test_cmdline',
3684
3627
        'bzrlib.tests.test_commands',
3685
3628
        'bzrlib.tests.test_commit',
3686
3629
        'bzrlib.tests.test_commit_merge',
3720
3663
        'bzrlib.tests.test_identitymap',
3721
3664
        'bzrlib.tests.test_ignores',
3722
3665
        'bzrlib.tests.test_index',
3723
 
        'bzrlib.tests.test_import_tariff',
3724
3666
        'bzrlib.tests.test_info',
3725
3667
        'bzrlib.tests.test_inv',
3726
3668
        'bzrlib.tests.test_inventory_delta',
3734
3676
        'bzrlib.tests.test_lru_cache',
3735
3677
        'bzrlib.tests.test_lsprof',
3736
3678
        'bzrlib.tests.test_mail_client',
3737
 
        'bzrlib.tests.test_matchers',
3738
3679
        'bzrlib.tests.test_memorytree',
3739
3680
        'bzrlib.tests.test_merge',
3740
3681
        'bzrlib.tests.test_merge3',
3820
3761
 
3821
3762
 
3822
3763
def _test_suite_modules_to_doctest():
3823
 
    """Return the list of modules to doctest."""
3824
 
    if __doc__ is None:
3825
 
        # GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
3826
 
        return []
 
3764
    """Return the list of modules to doctest."""   
3827
3765
    return [
3828
3766
        'bzrlib',
3829
3767
        'bzrlib.branchbuilder',
4429
4367
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4430
4368
 
4431
4369
 
4432
 
class _CaseSensitiveFilesystemFeature(Feature):
4433
 
 
4434
 
    def _probe(self):
4435
 
        if CaseInsCasePresFilenameFeature.available():
4436
 
            return False
4437
 
        elif CaseInsensitiveFilesystemFeature.available():
4438
 
            return False
4439
 
        else:
4440
 
            return True
4441
 
 
4442
 
    def feature_name(self):
4443
 
        return 'case-sensitive filesystem'
4444
 
 
4445
 
# new coding style is for feature instances to be lowercase
4446
 
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4447
 
 
4448
 
 
4449
4370
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4450
4371
SubUnitFeature = _CompatabilityThunkFeature(
4451
4372
    deprecated_in((2,1,0)),
4462
4383
            return result
4463
4384
except ImportError:
4464
4385
    pass
4465
 
 
4466
 
class _PosixPermissionsFeature(Feature):
4467
 
 
4468
 
    def _probe(self):
4469
 
        def has_perms():
4470
 
            # create temporary file and check if specified perms are maintained.
4471
 
            import tempfile
4472
 
 
4473
 
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4474
 
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4475
 
            fd, name = f
4476
 
            os.close(fd)
4477
 
            os.chmod(name, write_perms)
4478
 
 
4479
 
            read_perms = os.stat(name).st_mode & 0777
4480
 
            os.unlink(name)
4481
 
            return (write_perms == read_perms)
4482
 
 
4483
 
        return (os.name == 'posix') and has_perms()
4484
 
 
4485
 
    def feature_name(self):
4486
 
        return 'POSIX permissions support'
4487
 
 
4488
 
posix_permissions_feature = _PosixPermissionsFeature()