1
# Copyright (C) 2005-2010 Canonical Ltd
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Testing framework extensions"""
19
18
# TODO: Perhaps there should be an API to find out if bzr running under the
20
19
# test suite -- some plugins might want to avoid making intrusive changes if
102
101
deprecated_passed,
104
103
import bzrlib.trace
105
from bzrlib.transport import (
104
from bzrlib.transport import get_transport, pathfilter
110
105
import bzrlib.transport
106
from bzrlib.transport.local import LocalURLServer
107
from bzrlib.transport.memory import MemoryServer
108
from bzrlib.transport.readonly import ReadonlyServer
111
109
from bzrlib.trace import mutter, note
112
from bzrlib.tests import (
110
from bzrlib.tests import TestUtil
116
111
from bzrlib.tests.http_server import HttpServer
117
112
from bzrlib.tests.TestUtil import (
129
124
# shown frame is the test code, not our assertXYZ.
132
default_transport = test_server.LocalURLServer
127
default_transport = LocalURLServer
135
130
_unitialized_attr = object()
490
485
return self._shortened_test_description(test)
492
487
def report_error(self, test, err):
493
self.stream.write('ERROR: %s\n %s\n' % (
488
ui.ui_factory.note('ERROR: %s\n %s\n' % (
494
489
self._test_description(test),
498
493
def report_failure(self, test, err):
499
self.stream.write('FAIL: %s\n %s\n' % (
494
ui.ui_factory.note('FAIL: %s\n %s\n' % (
500
495
self._test_description(test),
1045
1044
if t.base.endswith('/work/'):
1046
1045
# we have safety net/test root/work
1047
1046
t = t.clone('../..')
1048
elif isinstance(transport_server,
1049
test_server.SmartTCPServer_for_testing):
1047
elif isinstance(transport_server, server.SmartTCPServer_for_testing):
1050
1048
# The smart server adds a path similar to work, which is traversed
1051
1049
# up from by the client. But the server is chrooted - the actual
1052
1050
# backing transport is not escaped from, and VFS requests to the
1207
1205
raise AssertionError('pattern "%s" found in "%s"'
1208
1206
% (needle_re, haystack))
1210
def assertContainsString(self, haystack, needle):
1211
if haystack.find(needle) == -1:
1212
self.fail("string %r not found in '''%s'''" % (needle, haystack))
1214
1208
def assertSubset(self, sublist, superlist):
1215
1209
"""Assert that every entry in sublist is present in superlist."""
1216
1210
missing = set(sublist) - set(superlist)
1314
1308
self.assertEqualDiff(content, s)
1316
def assertDocstring(self, expected_docstring, obj):
1317
"""Fail if obj does not have expected_docstring"""
1319
# With -OO the docstring should be None instead
1320
self.assertIs(obj.__doc__, None)
1322
self.assertEqual(expected_docstring, obj.__doc__)
1324
1310
def failUnlessExists(self, path):
1325
1311
"""Fail unless path or paths, which may be abs or relative, exist."""
1326
1312
if not isinstance(path, basestring):
1528
1514
'BZR_PROGRESS_BAR': None,
1529
1515
'BZR_LOG': None,
1530
1516
'BZR_PLUGIN_PATH': None,
1531
'BZR_DISABLE_PLUGINS': None,
1532
'BZR_PLUGINS_AT': None,
1533
1517
'BZR_CONCURRENCY': None,
1534
1518
# Make sure that any text ui tests are consistent regardless of
1535
1519
# the environment the test case is run in; you may want tests that
1556
1540
'ftp_proxy': None,
1557
1541
'FTP_PROXY': None,
1558
1542
'BZR_REMOTE_PATH': None,
1559
# Generally speaking, we don't want apport reporting on crashes in
1560
# the test envirnoment unless we're specifically testing apport,
1561
# so that it doesn't leak into the real system environment. We
1562
# use an env var so it propagates to subprocesses.
1563
'APPORT_DISABLE': '1',
1566
1545
self.addCleanup(self._restoreEnvironment)
1567
1546
for name, value in new_env.iteritems():
1568
1547
self._captureVar(name, value)
1570
1549
def _captureVar(self, name, newvalue):
1571
1550
"""Set an environment variable, and reset it when finished."""
1572
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1551
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1574
1553
def _restoreEnvironment(self):
1575
for name, value in self._old_env.iteritems():
1554
for name, value in self.__old_env.iteritems():
1576
1555
osutils.set_or_unset_env(name, value)
1578
1557
def _restoreHooks(self):
1682
1661
unicodestr = log_contents.decode('utf8', 'replace')
1683
1662
log_contents = unicodestr.encode('utf8')
1684
1663
if not keep_log_file:
1686
max_close_attempts = 100
1687
first_close_error = None
1688
while close_attempts < max_close_attempts:
1691
self._log_file.close()
1692
except IOError, ioe:
1693
if ioe.errno is None:
1694
# No errno implies 'close() called during
1695
# concurrent operation on the same file object', so
1696
# retry. Probably a thread is trying to write to
1698
if first_close_error is None:
1699
first_close_error = ioe
1704
if close_attempts > 1:
1706
'Unable to close log file on first attempt, '
1707
'will retry: %s\n' % (first_close_error,))
1708
if close_attempts == max_close_attempts:
1710
'Unable to close log file after %d attempts.\n'
1711
% (max_close_attempts,))
1664
self._log_file.close()
1712
1665
self._log_file = None
1713
1666
# Permit multiple calls to get_log until we clean it up in
1714
1667
# finishLogFile
2217
2170
if self.__readonly_server is None:
2218
2171
if self.transport_readonly_server is None:
2219
2172
# readonly decorator requested
2220
self.__readonly_server = test_server.ReadonlyServer()
2173
self.__readonly_server = ReadonlyServer()
2222
2175
# explicit readonly transport.
2223
2176
self.__readonly_server = self.create_transport_readonly_server()
2246
2199
is no means to override it.
2248
2201
if self.__vfs_server is None:
2249
self.__vfs_server = memory.MemoryServer()
2202
self.__vfs_server = MemoryServer()
2250
2203
self.start_server(self.__vfs_server)
2251
2204
return self.__vfs_server
2409
2362
return made_control.create_repository(shared=shared)
2411
2364
def make_smart_server(self, path):
2412
smart_server = test_server.SmartTCPServer_for_testing()
2365
smart_server = server.SmartTCPServer_for_testing()
2413
2366
self.start_server(smart_server, self.get_server())
2414
2367
remote_transport = get_transport(smart_server.get_url()).clone(path)
2415
2368
return remote_transport
2443
2396
def setup_smart_server_with_call_log(self):
2444
2397
"""Sets up a smart server as the transport server with a call log."""
2445
self.transport_server = test_server.SmartTCPServer_for_testing
2398
self.transport_server = server.SmartTCPServer_for_testing
2446
2399
self.hpss_calls = []
2447
2400
import traceback
2448
2401
# Skip the current stack down to the caller of
2661
2614
# We can only make working trees locally at the moment. If the
2662
2615
# transport can't support them, then we keep the non-disk-backed
2663
2616
# branch and create a local checkout.
2664
if self.vfs_transport_factory is test_server.LocalURLServer:
2617
if self.vfs_transport_factory is LocalURLServer:
2665
2618
# the branch is colocated on disk, we cannot create a checkout.
2666
2619
# hopefully callers will expect this.
2667
2620
local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2728
2681
def setUp(self):
2729
2682
super(ChrootedTestCase, self).setUp()
2730
if not self.vfs_transport_factory == memory.MemoryServer:
2683
if not self.vfs_transport_factory == MemoryServer:
2731
2684
self.transport_readonly_server = HttpServer
3204
def workaround_zealous_crypto_random():
3205
"""Crypto.Random want to help us being secure, but we don't care here.
3207
This workaround some test failure related to the sftp server. Once paramiko
3208
stop using the controversial API in Crypto.Random, we may get rid of it.
3211
from Crypto.Random import atfork
3217
3157
def fork_for_tests(suite):
3218
3158
"""Take suite and start up one runner per CPU by forking()
3235
3175
ProtocolTestCase.run(self, result)
3237
os.waitpid(self.pid, 0)
3177
os.waitpid(self.pid, os.WNOHANG)
3239
3179
test_blocks = partition_tests(suite, concurrency)
3240
3180
for process_tests in test_blocks:
3618
3557
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3619
3558
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3621
# Obvious highest levels prefixes, feel free to add your own via a plugin
3560
# Obvious higest levels prefixes, feel free to add your own via a plugin
3622
3561
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3623
3562
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3624
3563
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3634
3573
'bzrlib.tests.commands',
3635
3574
'bzrlib.tests.per_branch',
3636
3575
'bzrlib.tests.per_bzrdir',
3637
'bzrlib.tests.per_bzrdir_colo',
3638
3576
'bzrlib.tests.per_foreign_vcs',
3639
3577
'bzrlib.tests.per_interrepository',
3640
3578
'bzrlib.tests.per_intertree',
3680
3618
'bzrlib.tests.test_chunk_writer',
3681
3619
'bzrlib.tests.test_clean_tree',
3682
3620
'bzrlib.tests.test_cleanup',
3683
'bzrlib.tests.test_cmdline',
3684
3621
'bzrlib.tests.test_commands',
3685
3622
'bzrlib.tests.test_commit',
3686
3623
'bzrlib.tests.test_commit_merge',
3720
3657
'bzrlib.tests.test_identitymap',
3721
3658
'bzrlib.tests.test_ignores',
3722
3659
'bzrlib.tests.test_index',
3723
'bzrlib.tests.test_import_tariff',
3724
3660
'bzrlib.tests.test_info',
3725
3661
'bzrlib.tests.test_inv',
3726
3662
'bzrlib.tests.test_inventory_delta',
4197
4130
should really use a different feature.
4200
def __init__(self, dep_version, module, name,
4201
replacement_name, replacement_module=None):
4133
def __init__(self, module, name, this_name, dep_version):
4202
4134
super(_CompatabilityThunkFeature, self).__init__()
4203
4135
self._module = module
4204
if replacement_module is None:
4205
replacement_module = module
4206
self._replacement_module = replacement_module
4207
4136
self._name = name
4208
self._replacement_name = replacement_name
4137
self._this_name = this_name
4209
4138
self._dep_version = dep_version
4210
4139
self._feature = None
4212
4141
def _ensure(self):
4213
4142
if self._feature is None:
4214
depr_msg = self._dep_version % ('%s.%s'
4215
% (self._module, self._name))
4216
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4217
self._replacement_name)
4218
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4219
# Import the new feature and use it as a replacement for the
4221
mod = __import__(self._replacement_module, {}, {},
4222
[self._replacement_name])
4223
self._feature = getattr(mod, self._replacement_name)
4143
msg = (self._dep_version % self._this_name) + (
4144
' Use %s.%s instead.' % (self._module, self._name))
4145
symbol_versioning.warn(msg, DeprecationWarning)
4146
mod = __import__(self._module, {}, {}, [self._name])
4147
self._feature = getattr(mod, self._name)
4225
4149
def _probe(self):
4252
4176
if self.available(): # Make sure the probe has been done
4253
4177
return self._module
4256
4180
def feature_name(self):
4257
4181
return self.module_name
4260
4184
# This is kept here for compatibility, it is recommended to use
4261
4185
# 'bzrlib.tests.feature.paramiko' instead
4262
ParamikoFeature = _CompatabilityThunkFeature(
4263
deprecated_in((2,1,0)),
4264
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4186
ParamikoFeature = _CompatabilityThunkFeature('bzrlib.tests.features',
4187
'paramiko', 'bzrlib.tests.ParamikoFeature', deprecated_in((2,1,0)))
4267
4190
def probe_unicode_in_user_encoding():
4428
4351
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4431
class _CaseSensitiveFilesystemFeature(Feature):
4434
if CaseInsCasePresFilenameFeature.available():
4436
elif CaseInsensitiveFilesystemFeature.available():
4441
def feature_name(self):
4442
return 'case-sensitive filesystem'
4444
# new coding style is for feature instances to be lowercase
4445
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4448
4354
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4449
SubUnitFeature = _CompatabilityThunkFeature(
4450
deprecated_in((2,1,0)),
4451
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4355
SubUnitFeature = _CompatabilityThunkFeature('bzrlib.tests.features', 'subunit',
4356
'bzrlib.tests.SubUnitFeature', deprecated_in((2,1,0)))
4452
4357
# Only define SubUnitBzrRunner if subunit is available.
4454
4359
from subunit import TestProtocolClient
4462
4367
except ImportError:
4465
class _PosixPermissionsFeature(Feature):
4469
# create temporary file and check if specified perms are maintained.
4472
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4473
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4476
os.chmod(name, write_perms)
4478
read_perms = os.stat(name).st_mode & 0777
4480
return (write_perms == read_perms)
4482
return (os.name == 'posix') and has_perms()
4484
def feature_name(self):
4485
return 'POSIX permissions support'
4487
posix_permissions_feature = _PosixPermissionsFeature()