1
# Copyright (C) 2005-2010 Canonical Ltd
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Testing framework extensions"""
19
18
# TODO: Perhaps there should be an API to find out if bzr running under the
20
19
# test suite -- some plugins might want to avoid making intrusive changes if
102
101
deprecated_passed,
104
103
import bzrlib.trace
105
from bzrlib.transport import (
104
from bzrlib.transport import get_transport, pathfilter
110
105
import bzrlib.transport
106
from bzrlib.transport.local import LocalURLServer
107
from bzrlib.transport.memory import MemoryServer
108
from bzrlib.transport.readonly import ReadonlyServer
111
109
from bzrlib.trace import mutter, note
112
from bzrlib.tests import (
110
from bzrlib.tests import TestUtil
116
111
from bzrlib.tests.http_server import HttpServer
117
112
from bzrlib.tests.TestUtil import (
129
124
# shown frame is the test code, not our assertXYZ.
132
default_transport = test_server.LocalURLServer
135
_unitialized_attr = object()
136
"""A sentinel needed to act as a default value in a method signature."""
127
default_transport = LocalURLServer
139
129
# Subunit result codes, defined here to prevent a hard dependency on subunit.
140
130
SUBUNIT_SEEK_SET = 0
247
237
'%d non-main threads were left active in the end.\n'
248
238
% (TestCase._active_threads - 1))
250
def getDescription(self, test):
253
240
def _extractBenchmarkTime(self, testCase, details=None):
254
241
"""Add a benchmark time for the current test case."""
255
242
if details and 'benchtime' in details:
490
477
return self._shortened_test_description(test)
492
479
def report_error(self, test, err):
493
self.stream.write('ERROR: %s\n %s\n' % (
480
ui.ui_factory.note('ERROR: %s\n %s\n' % (
494
481
self._test_description(test),
498
485
def report_failure(self, test, err):
499
self.stream.write('FAIL: %s\n %s\n' % (
486
ui.ui_factory.note('FAIL: %s\n %s\n' % (
500
487
self._test_description(test),
856
847
Tests that want to use debug flags can just set them in the
857
848
debug_flags set during setup/teardown.
859
# Start with a copy of the current debug flags we can safely modify.
860
self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
850
self._preserved_debug_flags = set(debug.debug_flags)
861
851
if 'allow_debug' not in selftest_debug_flags:
862
852
debug.debug_flags.clear()
863
853
if 'disable_lock_checks' not in selftest_debug_flags:
864
854
debug.debug_flags.add('strict_locks')
855
self.addCleanup(self._restore_debug_flags)
866
857
def _clear_hooks(self):
867
858
# prevent hooks affecting tests
888
879
def _silenceUI(self):
889
880
"""Turn off UI for duration of test"""
890
881
# by default the UI is off; tests can turn it on if they want it.
891
self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
882
saved = ui.ui_factory
884
ui.ui_factory = saved
885
ui.ui_factory = ui.SilentUIFactory()
886
self.addCleanup(_restore)
893
888
def _check_locks(self):
894
889
"""Check that all lock take/release actions have been paired."""
923
918
self._lock_check_thorough = False
925
920
self._lock_check_thorough = True
927
922
self.addCleanup(self._check_locks)
928
923
_mod_lock.Lock.hooks.install_named_hook('lock_acquired',
929
924
self._lock_acquired, None)
1022
1017
server's urls to be used.
1024
1019
if backing_server is None:
1025
transport_server.start_server()
1020
transport_server.setUp()
1027
transport_server.start_server(backing_server)
1028
self.addCleanup(transport_server.stop_server)
1022
transport_server.setUp(backing_server)
1023
self.addCleanup(transport_server.tearDown)
1029
1024
# Obtain a real transport because if the server supplies a password, it
1030
1025
# will be hidden from the base on the client side.
1031
1026
t = get_transport(transport_server.get_url())
1045
1040
if t.base.endswith('/work/'):
1046
1041
# we have safety net/test root/work
1047
1042
t = t.clone('../..')
1048
elif isinstance(transport_server,
1049
test_server.SmartTCPServer_for_testing):
1043
elif isinstance(transport_server, server.SmartTCPServer_for_testing):
1050
1044
# The smart server adds a path similar to work, which is traversed
1051
1045
# up from by the client. But the server is chrooted - the actual
1052
1046
# backing transport is not escaped from, and VFS requests to the
1207
1201
raise AssertionError('pattern "%s" found in "%s"'
1208
1202
% (needle_re, haystack))
1210
def assertContainsString(self, haystack, needle):
1211
if haystack.find(needle) == -1:
1212
self.fail("string %r not found in '''%s'''" % (needle, haystack))
1214
1204
def assertSubset(self, sublist, superlist):
1215
1205
"""Assert that every entry in sublist is present in superlist."""
1216
1206
missing = set(sublist) - set(superlist)
1314
1304
self.assertEqualDiff(content, s)
1316
def assertDocstring(self, expected_docstring, obj):
1317
"""Fail if obj does not have expected_docstring"""
1319
# With -OO the docstring should be None instead
1320
self.assertIs(obj.__doc__, None)
1322
self.assertEqual(expected_docstring, obj.__doc__)
1324
1306
def failUnlessExists(self, path):
1325
1307
"""Fail unless path or paths, which may be abs or relative, exist."""
1326
1308
if not isinstance(path, basestring):
1495
1477
self._cleanups.append((callable, args, kwargs))
1497
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1498
"""Overrides an object attribute restoring it after the test.
1500
:param obj: The object that will be mutated.
1502
:param attr_name: The attribute name we want to preserve/override in
1505
:param new: The optional value we want to set the attribute to.
1507
:returns: The actual attr value.
1509
value = getattr(obj, attr_name)
1510
# The actual value is captured by the call below
1511
self.addCleanup(setattr, obj, attr_name, value)
1512
if new is not _unitialized_attr:
1513
setattr(obj, attr_name, new)
1516
1479
def _cleanEnvironment(self):
1518
1481
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1528
1491
'BZR_PROGRESS_BAR': None,
1529
1492
'BZR_LOG': None,
1530
1493
'BZR_PLUGIN_PATH': None,
1531
'BZR_DISABLE_PLUGINS': None,
1532
'BZR_PLUGINS_AT': None,
1533
1494
'BZR_CONCURRENCY': None,
1534
1495
# Make sure that any text ui tests are consistent regardless of
1535
1496
# the environment the test case is run in; you may want tests that
1556
1517
'ftp_proxy': None,
1557
1518
'FTP_PROXY': None,
1558
1519
'BZR_REMOTE_PATH': None,
1559
# Generally speaking, we don't want apport reporting on crashes in
1560
# the test envirnoment unless we're specifically testing apport,
1561
# so that it doesn't leak into the real system environment. We
1562
# use an env var so it propagates to subprocesses.
1563
'APPORT_DISABLE': '1',
1566
1522
self.addCleanup(self._restoreEnvironment)
1567
1523
for name, value in new_env.iteritems():
1568
1524
self._captureVar(name, value)
1570
1526
def _captureVar(self, name, newvalue):
1571
1527
"""Set an environment variable, and reset it when finished."""
1572
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1528
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1530
def _restore_debug_flags(self):
1531
debug.debug_flags.clear()
1532
debug.debug_flags.update(self._preserved_debug_flags)
1574
1534
def _restoreEnvironment(self):
1575
for name, value in self._old_env.iteritems():
1535
for name, value in self.__old_env.iteritems():
1576
1536
osutils.set_or_unset_env(name, value)
1578
1538
def _restoreHooks(self):
1682
1642
unicodestr = log_contents.decode('utf8', 'replace')
1683
1643
log_contents = unicodestr.encode('utf8')
1684
1644
if not keep_log_file:
1686
max_close_attempts = 100
1687
first_close_error = None
1688
while close_attempts < max_close_attempts:
1691
self._log_file.close()
1692
except IOError, ioe:
1693
if ioe.errno is None:
1694
# No errno implies 'close() called during
1695
# concurrent operation on the same file object', so
1696
# retry. Probably a thread is trying to write to
1698
if first_close_error is None:
1699
first_close_error = ioe
1704
if close_attempts > 1:
1706
'Unable to close log file on first attempt, '
1707
'will retry: %s\n' % (first_close_error,))
1708
if close_attempts == max_close_attempts:
1710
'Unable to close log file after %d attempts.\n'
1711
% (max_close_attempts,))
1645
self._log_file.close()
1712
1646
self._log_file = None
1713
1647
# Permit multiple calls to get_log until we clean it up in
1714
1648
# finishLogFile
2101
2035
Tests that expect to provoke LockContention errors should call this.
2103
self.overrideAttr(bzrlib.lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
2037
orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
2039
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
2040
self.addCleanup(resetTimeout)
2041
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
2105
2043
def make_utf8_encoded_stringio(self, encoding_type=None):
2106
2044
"""Return a StringIOWrapper instance, that will encode Unicode
2120
2058
request_handlers = request.request_handlers
2121
2059
orig_method = request_handlers.get(verb)
2122
2060
request_handlers.remove(verb)
2123
self.addCleanup(request_handlers.register, verb, orig_method)
2062
request_handlers.register(verb, orig_method)
2063
self.addCleanup(restoreVerb)
2126
2066
class CapturedCall(object):
2217
2157
if self.__readonly_server is None:
2218
2158
if self.transport_readonly_server is None:
2219
2159
# readonly decorator requested
2220
self.__readonly_server = test_server.ReadonlyServer()
2160
self.__readonly_server = ReadonlyServer()
2222
2162
# explicit readonly transport.
2223
2163
self.__readonly_server = self.create_transport_readonly_server()
2246
2186
is no means to override it.
2248
2188
if self.__vfs_server is None:
2249
self.__vfs_server = memory.MemoryServer()
2189
self.__vfs_server = MemoryServer()
2250
2190
self.start_server(self.__vfs_server)
2251
2191
return self.__vfs_server
2409
2349
return made_control.create_repository(shared=shared)
2411
2351
def make_smart_server(self, path):
2412
smart_server = test_server.SmartTCPServer_for_testing()
2352
smart_server = server.SmartTCPServer_for_testing()
2413
2353
self.start_server(smart_server, self.get_server())
2414
2354
remote_transport = get_transport(smart_server.get_url()).clone(path)
2415
2355
return remote_transport
2433
2373
def setUp(self):
2434
2374
super(TestCaseWithMemoryTransport, self).setUp()
2435
2375
self._make_test_root()
2436
self.addCleanup(os.chdir, os.getcwdu())
2376
_currentdir = os.getcwdu()
2377
def _leaveDirectory():
2378
os.chdir(_currentdir)
2379
self.addCleanup(_leaveDirectory)
2437
2380
self.makeAndChdirToTestDir()
2438
2381
self.overrideEnvironmentForTesting()
2439
2382
self.__readonly_server = None
2443
2386
def setup_smart_server_with_call_log(self):
2444
2387
"""Sets up a smart server as the transport server with a call log."""
2445
self.transport_server = test_server.SmartTCPServer_for_testing
2388
self.transport_server = server.SmartTCPServer_for_testing
2446
2389
self.hpss_calls = []
2447
2390
import traceback
2448
2391
# Skip the current stack down to the caller of
2661
2604
# We can only make working trees locally at the moment. If the
2662
2605
# transport can't support them, then we keep the non-disk-backed
2663
2606
# branch and create a local checkout.
2664
if self.vfs_transport_factory is test_server.LocalURLServer:
2607
if self.vfs_transport_factory is LocalURLServer:
2665
2608
# the branch is colocated on disk, we cannot create a checkout.
2666
2609
# hopefully callers will expect this.
2667
2610
local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2728
2671
def setUp(self):
2729
2672
super(ChrootedTestCase, self).setUp()
2730
if not self.vfs_transport_factory == memory.MemoryServer:
2673
if not self.vfs_transport_factory == MemoryServer:
2731
2674
self.transport_readonly_server = HttpServer
3204
def workaround_zealous_crypto_random():
3205
"""Crypto.Random want to help us being secure, but we don't care here.
3207
This workaround some test failure related to the sftp server. Once paramiko
3208
stop using the controversial API in Crypto.Random, we may get rid of it.
3211
from Crypto.Random import atfork
3217
3147
def fork_for_tests(suite):
3218
3148
"""Take suite and start up one runner per CPU by forking()
3235
3165
ProtocolTestCase.run(self, result)
3237
os.waitpid(self.pid, 0)
3167
os.waitpid(self.pid, os.WNOHANG)
3239
3169
test_blocks = partition_tests(suite, concurrency)
3240
3170
for process_tests in test_blocks:
3618
3547
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3619
3548
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3621
# Obvious highest levels prefixes, feel free to add your own via a plugin
3550
# Obvious higest levels prefixes, feel free to add your own via a plugin
3622
3551
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3623
3552
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3624
3553
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3634
3563
'bzrlib.tests.commands',
3635
3564
'bzrlib.tests.per_branch',
3636
3565
'bzrlib.tests.per_bzrdir',
3637
'bzrlib.tests.per_bzrdir_colo',
3638
3566
'bzrlib.tests.per_foreign_vcs',
3639
3567
'bzrlib.tests.per_interrepository',
3640
3568
'bzrlib.tests.per_intertree',
3680
3608
'bzrlib.tests.test_chunk_writer',
3681
3609
'bzrlib.tests.test_clean_tree',
3682
3610
'bzrlib.tests.test_cleanup',
3683
'bzrlib.tests.test_cmdline',
3684
3611
'bzrlib.tests.test_commands',
3685
3612
'bzrlib.tests.test_commit',
3686
3613
'bzrlib.tests.test_commit_merge',
3720
3647
'bzrlib.tests.test_identitymap',
3721
3648
'bzrlib.tests.test_ignores',
3722
3649
'bzrlib.tests.test_index',
3723
'bzrlib.tests.test_import_tariff',
3724
3650
'bzrlib.tests.test_info',
3725
3651
'bzrlib.tests.test_inv',
3726
3652
'bzrlib.tests.test_inventory_delta',
3734
3660
'bzrlib.tests.test_lru_cache',
3735
3661
'bzrlib.tests.test_lsprof',
3736
3662
'bzrlib.tests.test_mail_client',
3737
'bzrlib.tests.test_matchers',
3738
3663
'bzrlib.tests.test_memorytree',
3739
3664
'bzrlib.tests.test_merge',
3740
3665
'bzrlib.tests.test_merge3',
4198
4119
should really use a different feature.
4201
def __init__(self, dep_version, module, name,
4202
replacement_name, replacement_module=None):
4122
def __init__(self, module, name, this_name, dep_version):
4203
4123
super(_CompatabilityThunkFeature, self).__init__()
4204
4124
self._module = module
4205
if replacement_module is None:
4206
replacement_module = module
4207
self._replacement_module = replacement_module
4208
4125
self._name = name
4209
self._replacement_name = replacement_name
4126
self._this_name = this_name
4210
4127
self._dep_version = dep_version
4211
4128
self._feature = None
4213
4130
def _ensure(self):
4214
4131
if self._feature is None:
4215
depr_msg = self._dep_version % ('%s.%s'
4216
% (self._module, self._name))
4217
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4218
self._replacement_name)
4219
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4220
# Import the new feature and use it as a replacement for the
4222
mod = __import__(self._replacement_module, {}, {},
4223
[self._replacement_name])
4224
self._feature = getattr(mod, self._replacement_name)
4132
msg = (self._dep_version % self._this_name) + (
4133
' Use %s.%s instead.' % (self._module, self._name))
4134
symbol_versioning.warn(msg, DeprecationWarning)
4135
mod = __import__(self._module, {}, {}, [self._name])
4136
self._feature = getattr(mod, self._name)
4226
4138
def _probe(self):
4253
4165
if self.available(): # Make sure the probe has been done
4254
4166
return self._module
4257
4169
def feature_name(self):
4258
4170
return self.module_name
4261
4173
# This is kept here for compatibility, it is recommended to use
4262
4174
# 'bzrlib.tests.feature.paramiko' instead
4263
ParamikoFeature = _CompatabilityThunkFeature(
4264
deprecated_in((2,1,0)),
4265
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4175
ParamikoFeature = _CompatabilityThunkFeature('bzrlib.tests.features',
4176
'paramiko', 'bzrlib.tests.ParamikoFeature', deprecated_in((2,1,0)))
4268
4179
def probe_unicode_in_user_encoding():
4429
4340
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4432
class _CaseSensitiveFilesystemFeature(Feature):
4435
if CaseInsCasePresFilenameFeature.available():
4437
elif CaseInsensitiveFilesystemFeature.available():
4442
def feature_name(self):
4443
return 'case-sensitive filesystem'
4445
# new coding style is for feature instances to be lowercase
4446
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4449
4343
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4450
SubUnitFeature = _CompatabilityThunkFeature(
4451
deprecated_in((2,1,0)),
4452
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4344
SubUnitFeature = _CompatabilityThunkFeature('bzrlib.tests.features', 'subunit',
4345
'bzrlib.tests.SubUnitFeature', deprecated_in((2,1,0)))
4453
4346
# Only define SubUnitBzrRunner if subunit is available.
4455
4348
from subunit import TestProtocolClient
4463
4356
except ImportError:
4466
class _PosixPermissionsFeature(Feature):
4470
# create temporary file and check if specified perms are maintained.
4473
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4474
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4477
os.chmod(name, write_perms)
4479
read_perms = os.stat(name).st_mode & 0777
4481
return (write_perms == read_perms)
4483
return (os.name == 'posix') and has_perms()
4485
def feature_name(self):
4486
return 'POSIX permissions support'
4488
posix_permissions_feature = _PosixPermissionsFeature()