1
# Copyright (C) 2005-2010 Canonical Ltd
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Testing framework extensions"""
19
18
# TODO: Perhaps there should be an API to find out if bzr running under the
20
19
# test suite -- some plugins might want to avoid making intrusive changes if
102
101
deprecated_passed,
104
103
import bzrlib.trace
105
from bzrlib.transport import (
104
from bzrlib.transport import get_transport, pathfilter
110
105
import bzrlib.transport
106
from bzrlib.transport.local import LocalURLServer
107
from bzrlib.transport.memory import MemoryServer
108
from bzrlib.transport.readonly import ReadonlyServer
111
109
from bzrlib.trace import mutter, note
112
from bzrlib.tests import (
110
from bzrlib.tests import TestUtil
116
111
from bzrlib.tests.http_server import HttpServer
117
112
from bzrlib.tests.TestUtil import (
129
124
# shown frame is the test code, not our assertXYZ.
132
default_transport = test_server.LocalURLServer
135
_unitialized_attr = object()
136
"""A sentinel needed to act as a default value in a method signature."""
127
default_transport = LocalURLServer
139
129
# Subunit result codes, defined here to prevent a hard dependency on subunit.
140
130
SUBUNIT_SEEK_SET = 0
490
480
return self._shortened_test_description(test)
492
482
def report_error(self, test, err):
493
self.stream.write('ERROR: %s\n %s\n' % (
483
ui.ui_factory.note('ERROR: %s\n %s\n' % (
494
484
self._test_description(test),
498
488
def report_failure(self, test, err):
499
self.stream.write('FAIL: %s\n %s\n' % (
489
ui.ui_factory.note('FAIL: %s\n %s\n' % (
500
490
self._test_description(test),
856
850
Tests that want to use debug flags can just set them in the
857
851
debug_flags set during setup/teardown.
859
# Start with a copy of the current debug flags we can safely modify.
860
self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
853
self._preserved_debug_flags = set(debug.debug_flags)
861
854
if 'allow_debug' not in selftest_debug_flags:
862
855
debug.debug_flags.clear()
863
856
if 'disable_lock_checks' not in selftest_debug_flags:
864
857
debug.debug_flags.add('strict_locks')
858
self.addCleanup(self._restore_debug_flags)
866
860
def _clear_hooks(self):
867
861
# prevent hooks affecting tests
888
882
def _silenceUI(self):
889
883
"""Turn off UI for duration of test"""
890
884
# by default the UI is off; tests can turn it on if they want it.
891
self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
885
saved = ui.ui_factory
887
ui.ui_factory = saved
888
ui.ui_factory = ui.SilentUIFactory()
889
self.addCleanup(_restore)
893
891
def _check_locks(self):
894
892
"""Check that all lock take/release actions have been paired."""
923
921
self._lock_check_thorough = False
925
923
self._lock_check_thorough = True
927
925
self.addCleanup(self._check_locks)
928
926
_mod_lock.Lock.hooks.install_named_hook('lock_acquired',
929
927
self._lock_acquired, None)
1045
1043
if t.base.endswith('/work/'):
1046
1044
# we have safety net/test root/work
1047
1045
t = t.clone('../..')
1048
elif isinstance(transport_server,
1049
test_server.SmartTCPServer_for_testing):
1046
elif isinstance(transport_server, server.SmartTCPServer_for_testing):
1050
1047
# The smart server adds a path similar to work, which is traversed
1051
1048
# up from by the client. But the server is chrooted - the actual
1052
1049
# backing transport is not escaped from, and VFS requests to the
1207
1204
raise AssertionError('pattern "%s" found in "%s"'
1208
1205
% (needle_re, haystack))
1210
def assertContainsString(self, haystack, needle):
1211
if haystack.find(needle) == -1:
1212
self.fail("string %r not found in '''%s'''" % (needle, haystack))
1214
1207
def assertSubset(self, sublist, superlist):
1215
1208
"""Assert that every entry in sublist is present in superlist."""
1216
1209
missing = set(sublist) - set(superlist)
1314
1307
self.assertEqualDiff(content, s)
1316
def assertDocstring(self, expected_docstring, obj):
1317
"""Fail if obj does not have expected_docstring"""
1319
# With -OO the docstring should be None instead
1320
self.assertIs(obj.__doc__, None)
1322
self.assertEqual(expected_docstring, obj.__doc__)
1324
1309
def failUnlessExists(self, path):
1325
1310
"""Fail unless path or paths, which may be abs or relative, exist."""
1326
1311
if not isinstance(path, basestring):
1495
1480
self._cleanups.append((callable, args, kwargs))
1497
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1498
"""Overrides an object attribute restoring it after the test.
1500
:param obj: The object that will be mutated.
1502
:param attr_name: The attribute name we want to preserve/override in
1505
:param new: The optional value we want to set the attribute to.
1507
:returns: The actual attr value.
1509
value = getattr(obj, attr_name)
1510
# The actual value is captured by the call below
1511
self.addCleanup(setattr, obj, attr_name, value)
1512
if new is not _unitialized_attr:
1513
setattr(obj, attr_name, new)
1516
1482
def _cleanEnvironment(self):
1518
1484
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1528
1494
'BZR_PROGRESS_BAR': None,
1529
1495
'BZR_LOG': None,
1530
1496
'BZR_PLUGIN_PATH': None,
1531
'BZR_DISABLE_PLUGINS': None,
1532
'BZR_PLUGINS_AT': None,
1533
1497
'BZR_CONCURRENCY': None,
1534
1498
# Make sure that any text ui tests are consistent regardless of
1535
1499
# the environment the test case is run in; you may want tests that
1556
1520
'ftp_proxy': None,
1557
1521
'FTP_PROXY': None,
1558
1522
'BZR_REMOTE_PATH': None,
1559
# Generally speaking, we don't want apport reporting on crashes in
1560
# the test envirnoment unless we're specifically testing apport,
1561
# so that it doesn't leak into the real system environment. We
1562
# use an env var so it propagates to subprocesses.
1563
'APPORT_DISABLE': '1',
1566
1525
self.addCleanup(self._restoreEnvironment)
1567
1526
for name, value in new_env.iteritems():
1568
1527
self._captureVar(name, value)
1570
1529
def _captureVar(self, name, newvalue):
1571
1530
"""Set an environment variable, and reset it when finished."""
1572
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1531
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1533
def _restore_debug_flags(self):
1534
debug.debug_flags.clear()
1535
debug.debug_flags.update(self._preserved_debug_flags)
1574
1537
def _restoreEnvironment(self):
1575
for name, value in self._old_env.iteritems():
1538
for name, value in self.__old_env.iteritems():
1576
1539
osutils.set_or_unset_env(name, value)
1578
1541
def _restoreHooks(self):
1682
1645
unicodestr = log_contents.decode('utf8', 'replace')
1683
1646
log_contents = unicodestr.encode('utf8')
1684
1647
if not keep_log_file:
1686
max_close_attempts = 100
1687
first_close_error = None
1688
while close_attempts < max_close_attempts:
1691
self._log_file.close()
1692
except IOError, ioe:
1693
if ioe.errno is None:
1694
# No errno implies 'close() called during
1695
# concurrent operation on the same file object', so
1696
# retry. Probably a thread is trying to write to
1698
if first_close_error is None:
1699
first_close_error = ioe
1704
if close_attempts > 1:
1706
'Unable to close log file on first attempt, '
1707
'will retry: %s\n' % (first_close_error,))
1708
if close_attempts == max_close_attempts:
1710
'Unable to close log file after %d attempts.\n'
1711
% (max_close_attempts,))
1648
self._log_file.close()
1712
1649
self._log_file = None
1713
1650
# Permit multiple calls to get_log until we clean it up in
1714
1651
# finishLogFile
2101
2038
Tests that expect to provoke LockContention errors should call this.
2103
self.overrideAttr(bzrlib.lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
2040
orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
2042
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
2043
self.addCleanup(resetTimeout)
2044
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
2105
2046
def make_utf8_encoded_stringio(self, encoding_type=None):
2106
2047
"""Return a StringIOWrapper instance, that will encode Unicode
2120
2061
request_handlers = request.request_handlers
2121
2062
orig_method = request_handlers.get(verb)
2122
2063
request_handlers.remove(verb)
2123
self.addCleanup(request_handlers.register, verb, orig_method)
2065
request_handlers.register(verb, orig_method)
2066
self.addCleanup(restoreVerb)
2126
2069
class CapturedCall(object):
2217
2160
if self.__readonly_server is None:
2218
2161
if self.transport_readonly_server is None:
2219
2162
# readonly decorator requested
2220
self.__readonly_server = test_server.ReadonlyServer()
2163
self.__readonly_server = ReadonlyServer()
2222
2165
# explicit readonly transport.
2223
2166
self.__readonly_server = self.create_transport_readonly_server()
2246
2189
is no means to override it.
2248
2191
if self.__vfs_server is None:
2249
self.__vfs_server = memory.MemoryServer()
2192
self.__vfs_server = MemoryServer()
2250
2193
self.start_server(self.__vfs_server)
2251
2194
return self.__vfs_server
2409
2352
return made_control.create_repository(shared=shared)
2411
2354
def make_smart_server(self, path):
2412
smart_server = test_server.SmartTCPServer_for_testing()
2355
smart_server = server.SmartTCPServer_for_testing()
2413
2356
self.start_server(smart_server, self.get_server())
2414
2357
remote_transport = get_transport(smart_server.get_url()).clone(path)
2415
2358
return remote_transport
2433
2376
def setUp(self):
2434
2377
super(TestCaseWithMemoryTransport, self).setUp()
2435
2378
self._make_test_root()
2436
self.addCleanup(os.chdir, os.getcwdu())
2379
_currentdir = os.getcwdu()
2380
def _leaveDirectory():
2381
os.chdir(_currentdir)
2382
self.addCleanup(_leaveDirectory)
2437
2383
self.makeAndChdirToTestDir()
2438
2384
self.overrideEnvironmentForTesting()
2439
2385
self.__readonly_server = None
2443
2389
def setup_smart_server_with_call_log(self):
2444
2390
"""Sets up a smart server as the transport server with a call log."""
2445
self.transport_server = test_server.SmartTCPServer_for_testing
2391
self.transport_server = server.SmartTCPServer_for_testing
2446
2392
self.hpss_calls = []
2447
2393
import traceback
2448
2394
# Skip the current stack down to the caller of
2661
2607
# We can only make working trees locally at the moment. If the
2662
2608
# transport can't support them, then we keep the non-disk-backed
2663
2609
# branch and create a local checkout.
2664
if self.vfs_transport_factory is test_server.LocalURLServer:
2610
if self.vfs_transport_factory is LocalURLServer:
2665
2611
# the branch is colocated on disk, we cannot create a checkout.
2666
2612
# hopefully callers will expect this.
2667
2613
local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2728
2674
def setUp(self):
2729
2675
super(ChrootedTestCase, self).setUp()
2730
if not self.vfs_transport_factory == memory.MemoryServer:
2676
if not self.vfs_transport_factory == MemoryServer:
2731
2677
self.transport_readonly_server = HttpServer
3204
def workaround_zealous_crypto_random():
3205
"""Crypto.Random want to help us being secure, but we don't care here.
3207
This workaround some test failure related to the sftp server. Once paramiko
3208
stop using the controversial API in Crypto.Random, we may get rid of it.
3211
from Crypto.Random import atfork
3217
3150
def fork_for_tests(suite):
3218
3151
"""Take suite and start up one runner per CPU by forking()
3235
3168
ProtocolTestCase.run(self, result)
3237
os.waitpid(self.pid, 0)
3170
os.waitpid(self.pid, os.WNOHANG)
3239
3172
test_blocks = partition_tests(suite, concurrency)
3240
3173
for process_tests in test_blocks:
3618
3550
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3619
3551
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3621
# Obvious highest levels prefixes, feel free to add your own via a plugin
3553
# Obvious higest levels prefixes, feel free to add your own via a plugin
3622
3554
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3623
3555
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3624
3556
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3634
3566
'bzrlib.tests.commands',
3635
3567
'bzrlib.tests.per_branch',
3636
3568
'bzrlib.tests.per_bzrdir',
3637
'bzrlib.tests.per_bzrdir_colo',
3638
3569
'bzrlib.tests.per_foreign_vcs',
3639
3570
'bzrlib.tests.per_interrepository',
3640
3571
'bzrlib.tests.per_intertree',
3680
3611
'bzrlib.tests.test_chunk_writer',
3681
3612
'bzrlib.tests.test_clean_tree',
3682
3613
'bzrlib.tests.test_cleanup',
3683
'bzrlib.tests.test_cmdline',
3684
3614
'bzrlib.tests.test_commands',
3685
3615
'bzrlib.tests.test_commit',
3686
3616
'bzrlib.tests.test_commit_merge',
3720
3650
'bzrlib.tests.test_identitymap',
3721
3651
'bzrlib.tests.test_ignores',
3722
3652
'bzrlib.tests.test_index',
3723
'bzrlib.tests.test_import_tariff',
3724
3653
'bzrlib.tests.test_info',
3725
3654
'bzrlib.tests.test_inv',
3726
3655
'bzrlib.tests.test_inventory_delta',
3734
3663
'bzrlib.tests.test_lru_cache',
3735
3664
'bzrlib.tests.test_lsprof',
3736
3665
'bzrlib.tests.test_mail_client',
3737
'bzrlib.tests.test_matchers',
3738
3666
'bzrlib.tests.test_memorytree',
3739
3667
'bzrlib.tests.test_merge',
3740
3668
'bzrlib.tests.test_merge3',
4198
4123
should really use a different feature.
4201
def __init__(self, dep_version, module, name,
4202
replacement_name, replacement_module=None):
4126
def __init__(self, module, name, this_name, dep_version):
4203
4127
super(_CompatabilityThunkFeature, self).__init__()
4204
4128
self._module = module
4205
if replacement_module is None:
4206
replacement_module = module
4207
self._replacement_module = replacement_module
4208
4129
self._name = name
4209
self._replacement_name = replacement_name
4130
self._this_name = this_name
4210
4131
self._dep_version = dep_version
4211
4132
self._feature = None
4213
4134
def _ensure(self):
4214
4135
if self._feature is None:
4215
depr_msg = self._dep_version % ('%s.%s'
4216
% (self._module, self._name))
4217
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4218
self._replacement_name)
4219
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4220
# Import the new feature and use it as a replacement for the
4222
mod = __import__(self._replacement_module, {}, {},
4223
[self._replacement_name])
4224
self._feature = getattr(mod, self._replacement_name)
4136
msg = (self._dep_version % self._this_name) + (
4137
' Use %s.%s instead.' % (self._module, self._name))
4138
symbol_versioning.warn(msg, DeprecationWarning)
4139
mod = __import__(self._module, {}, {}, [self._name])
4140
self._feature = getattr(mod, self._name)
4226
4142
def _probe(self):
4253
4169
if self.available(): # Make sure the probe has been done
4254
4170
return self._module
4257
4173
def feature_name(self):
4258
4174
return self.module_name
4261
4177
# This is kept here for compatibility, it is recommended to use
4262
4178
# 'bzrlib.tests.feature.paramiko' instead
4263
ParamikoFeature = _CompatabilityThunkFeature(
4264
deprecated_in((2,1,0)),
4265
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4179
ParamikoFeature = _CompatabilityThunkFeature('bzrlib.tests.features',
4180
'paramiko', 'bzrlib.tests.ParamikoFeature', deprecated_in((2,1,0)))
4268
4183
def probe_unicode_in_user_encoding():
4429
4344
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4432
class _CaseSensitiveFilesystemFeature(Feature):
4435
if CaseInsCasePresFilenameFeature.available():
4437
elif CaseInsensitiveFilesystemFeature.available():
4442
def feature_name(self):
4443
return 'case-sensitive filesystem'
4445
# new coding style is for feature instances to be lowercase
4446
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4449
4347
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4450
SubUnitFeature = _CompatabilityThunkFeature(
4451
deprecated_in((2,1,0)),
4452
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4348
SubUnitFeature = _CompatabilityThunkFeature('bzrlib.tests.features', 'subunit',
4349
'bzrlib.tests.SubUnitFeature', deprecated_in((2,1,0)))
4453
4350
# Only define SubUnitBzrRunner if subunit is available.
4455
4352
from subunit import TestProtocolClient
4463
4360
except ImportError:
4466
class _PosixPermissionsFeature(Feature):
4470
# create temporary file and check if specified perms are maintained.
4473
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4474
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4477
os.chmod(name, write_perms)
4479
read_perms = os.stat(name).st_mode & 0777
4481
return (write_perms == read_perms)
4483
return (os.name == 'posix') and has_perms()
4485
def feature_name(self):
4486
return 'POSIX permissions support'
4488
posix_permissions_feature = _PosixPermissionsFeature()