216
217
fails with an unexpected error.
218
219
self._testConcluded(test)
219
if isinstance(err[1], TestSkipped):
220
return self._addSkipped(test, err)
220
if isinstance(err[1], TestNotApplicable):
221
return self._addNotApplicable(test, err)
221
222
elif isinstance(err[1], UnavailableFeature):
222
223
return self.addNotSupported(test, err[1].args[0])
286
287
self.unsupported[str(feature)] += 1
287
288
self.report_unsupported(test, feature)
289
def _addSkipped(self, test, skip_excinfo):
290
def addSkip(self, test, reason):
291
"""A test has not run for 'reason'."""
293
self.report_skip(test, reason)
295
def _addNotApplicable(self, test, skip_excinfo):
290
296
if isinstance(skip_excinfo[1], TestNotApplicable):
291
297
self.not_applicable_count += 1
292
298
self.report_not_applicable(test, skip_excinfo)
295
self.report_skip(test, skip_excinfo)
298
301
except KeyboardInterrupt:
301
self.addError(test, test._exc_info())
304
self.addError(test, test.exc_info())
303
306
# seems best to treat this as success from point-of-view of unittest
304
307
# -- it actually does nothing so it barely matters :)
485
488
# used to show the output in PQM.
486
489
self.stream.flush()
488
def report_skip(self, test, skip_excinfo):
491
def report_skip(self, test, reason):
489
492
self.stream.writeln(' SKIP %s\n%s'
490
% (self._testTimeString(test),
491
self._error_summary(skip_excinfo)))
493
% (self._testTimeString(test), reason))
493
495
def report_not_applicable(self, test, skip_excinfo):
494
496
self.stream.writeln(' N/A %s\n%s'
585
587
def iter_suite_tests(suite):
586
588
"""Return all tests in a suite, recursing through nested suites"""
587
for item in suite._tests:
588
if isinstance(item, unittest.TestCase):
590
elif isinstance(item, unittest.TestSuite):
589
if isinstance(suite, unittest.TestCase):
591
elif isinstance(suite, unittest.TestSuite):
592
for item in suite._tests:
591
593
for r in iter_suite_tests(item):
594
raise Exception('unknown object %r inside test suite %r'
596
raise Exception('unknown type %r for object %r'
597
% (type(suite), suite))
598
600
class TestSkipped(Exception):
775
780
TestCase._active_threads = threading.activeCount()
776
781
self.addCleanup(self._check_leaked_threads)
786
pdb.Pdb().set_trace(sys._getframe().f_back)
789
absent_attr = object()
790
exc_info = getattr(self, '_exc_info', absent_attr)
791
if exc_info is absent_attr:
792
exc_info = getattr(self, '_TestCase__exc_info')
778
795
def _check_leaked_threads(self):
779
796
active = threading.activeCount()
780
797
leaked_threads = active - TestCase._active_threads
802
819
def _clear_hooks(self):
803
820
# prevent hooks affecting tests
805
import bzrlib.smart.client
806
import bzrlib.smart.server
807
self._preserved_hooks = {
808
bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
809
bzrlib.mutabletree.MutableTree: bzrlib.mutabletree.MutableTree.hooks,
810
bzrlib.smart.client._SmartClient: bzrlib.smart.client._SmartClient.hooks,
811
bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
812
bzrlib.commands.Command: bzrlib.commands.Command.hooks,
821
self._preserved_hooks = {}
822
for key, factory in hooks.known_hooks.items():
823
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
824
current_hooks = hooks.known_hooks_key_to_object(key)
825
self._preserved_hooks[parent] = (name, current_hooks)
814
826
self.addCleanup(self._restoreHooks)
815
# reset all hooks to an empty instance of the appropriate type
816
bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
817
bzrlib.smart.client._SmartClient.hooks = bzrlib.smart.client.SmartClientHooks()
818
bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
819
bzrlib.commands.Command.hooks = bzrlib.commands.CommandHooks()
827
for key, factory in hooks.known_hooks.items():
828
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
829
setattr(parent, name, factory())
821
831
def _silenceUI(self):
822
832
"""Turn off UI for duration of test"""
895
905
self.assertEqual(expected.st_ino, actual.st_ino)
896
906
self.assertEqual(expected.st_mode, actual.st_mode)
908
def assertLength(self, length, obj_with_len):
909
"""Assert that obj_with_len is of length length."""
910
if len(obj_with_len) != length:
911
self.fail("Incorrect length: wanted %d, got %d for %r" % (
912
length, len(obj_with_len), obj_with_len))
898
914
def assertPositive(self, val):
899
915
"""Assert that val is greater than 0."""
900
916
self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
1276
1292
osutils.set_or_unset_env(name, value)
1278
1294
def _restoreHooks(self):
1279
for klass, hooks in self._preserved_hooks.items():
1280
setattr(klass, 'hooks', hooks)
1295
for klass, (name, hooks) in self._preserved_hooks.items():
1296
setattr(klass, name, hooks)
1282
1298
def knownFailure(self, reason):
1283
1299
"""This test has failed for some known reason."""
1284
1300
raise KnownFailure(reason)
1302
def _do_skip(self, result, reason):
1303
addSkip = getattr(result, 'addSkip', None)
1304
if not callable(addSkip):
1305
result.addError(self, self.exc_info())
1307
addSkip(self, reason)
1286
1309
def run(self, result=None):
1287
1310
if result is None: result = self.defaultTestResult()
1288
1311
for feature in getattr(self, '_test_needs_features', []):
1295
1318
result.stopTest(self)
1298
return unittest.TestCase.run(self, result)
1322
result.startTest(self)
1323
absent_attr = object()
1325
method_name = getattr(self, '_testMethodName', absent_attr)
1326
if method_name is absent_attr:
1328
method_name = getattr(self, '_TestCase__testMethodName')
1329
testMethod = getattr(self, method_name)
1333
if not self._bzr_test_setUp_run:
1335
"test setUp did not invoke "
1336
"bzrlib.tests.TestCase's setUp")
1337
except KeyboardInterrupt:
1339
except TestSkipped, e:
1340
self._do_skip(result, e.args[0])
1344
result.addError(self, self.exc_info())
1351
except self.failureException:
1352
result.addFailure(self, self.exc_info())
1353
except TestSkipped, e:
1355
reason = "No reason given."
1358
self._do_skip(result, reason)
1359
except KeyboardInterrupt:
1362
result.addError(self, self.exc_info())
1366
if not self._bzr_test_tearDown_run:
1368
"test tearDown did not invoke "
1369
"bzrlib.tests.TestCase's tearDown")
1370
except KeyboardInterrupt:
1373
result.addError(self, self.exc_info())
1375
if ok: result.addSuccess(self)
1377
result.stopTest(self)
1379
except TestNotApplicable:
1380
# Not moved from the result [yet].
1382
except KeyboardInterrupt:
1300
1385
saved_attrs = {}
1301
1386
absent_attr = object()
1841
class CapturedCall(object):
1842
"""A helper for capturing smart server calls for easy debug analysis."""
1844
def __init__(self, params, prefix_length):
1845
"""Capture the call with params and skip prefix_length stack frames."""
1848
# The last 5 frames are the __init__, the hook frame, and 3 smart
1849
# client frames. Beyond this we could get more clever, but this is good
1851
stack = traceback.extract_stack()[prefix_length:-5]
1852
self.stack = ''.join(traceback.format_list(stack))
1855
return self.call.method
1858
return self.call.method
1754
1864
class TestCaseWithMemoryTransport(TestCase):
1755
1865
"""Common test class for tests that do not need disk resources.
2016
2126
made_control = self.make_bzrdir(relpath, format=format)
2017
2127
return made_control.create_repository(shared=shared)
2129
def make_smart_server(self, path):
2130
smart_server = server.SmartTCPServer_for_testing()
2131
smart_server.setUp(self.get_server())
2132
remote_transport = get_transport(smart_server.get_url()).clone(path)
2133
self.addCleanup(smart_server.tearDown)
2134
return remote_transport
2019
2136
def make_branch_and_memory_tree(self, relpath, format=None):
2020
2137
"""Create a branch on the default transport and a MemoryTree for it."""
2021
2138
b = self.make_branch(relpath, format=format)
2022
2139
return memorytree.MemoryTree.create_on_branch(b)
2024
2141
def make_branch_builder(self, relpath, format=None):
2025
url = self.get_url(relpath)
2026
tran = get_transport(url)
2027
return branchbuilder.BranchBuilder(get_transport(url), format=format)
2142
return branchbuilder.BranchBuilder(self.get_transport(relpath),
2029
2145
def overrideEnvironmentForTesting(self):
2030
2146
os.environ['HOME'] = self.test_home_dir
2047
2163
"""Sets up a smart server as the transport server with a call log."""
2048
2164
self.transport_server = server.SmartTCPServer_for_testing
2049
2165
self.hpss_calls = []
2167
# Skip the current stack down to the caller of
2168
# setup_smart_server_with_call_log
2169
prefix_length = len(traceback.extract_stack()) - 2
2050
2170
def capture_hpss_call(params):
2052
self.hpss_calls.append((params, traceback.format_stack()))
2171
self.hpss_calls.append(
2172
CapturedCall(params, prefix_length))
2053
2173
client._SmartClient.hooks.install_named_hook(
2054
2174
'call', capture_hpss_call, None)
3066
def multiply_tests_from_modules(module_name_list, scenario_iter, loader=None):
3067
"""Adapt all tests in some given modules to given scenarios.
3069
This is the recommended public interface for test parameterization.
3070
Typically the test_suite() method for a per-implementation test
3071
suite will call multiply_tests_from_modules and return the
3074
:param module_name_list: List of fully-qualified names of test
3076
:param scenario_iter: Iterable of pairs of (scenario_name,
3077
scenario_param_dict).
3078
:param loader: If provided, will be used instead of a new
3079
bzrlib.tests.TestLoader() instance.
3081
This returns a new TestSuite containing the cross product of
3082
all the tests in all the modules, each repeated for each scenario.
3083
Each test is adapted by adding the scenario name at the end
3084
of its name, and updating the test object's __dict__ with the
3085
scenario_param_dict.
3087
>>> r = multiply_tests_from_modules(
3088
... ['bzrlib.tests.test_sampler'],
3089
... [('one', dict(param=1)),
3090
... ('two', dict(param=2))])
3091
>>> tests = list(iter_suite_tests(r))
3095
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
3101
# XXX: Isn't load_tests() a better way to provide the same functionality
3102
# without forcing a predefined TestScenarioApplier ? --vila 080215
3104
loader = TestUtil.TestLoader()
3106
suite = loader.suiteClass()
3108
adapter = TestScenarioApplier()
3109
adapter.scenarios = list(scenario_iter)
3110
adapt_modules(module_name_list, adapter, loader, suite)
3114
3188
def multiply_scenarios(scenarios_left, scenarios_right):
3115
3189
"""Multiply two sets of scenarios.
3125
3199
for right_name, right_dict in scenarios_right]
3129
def adapt_modules(mods_list, adapter, loader, suite):
3130
"""Adapt the modules in mods_list using adapter and add to suite."""
3131
tests = loader.loadTestsFromModuleNames(mods_list)
3132
adapt_tests(tests, adapter, suite)
3135
def adapt_tests(tests_list, adapter, suite):
3136
"""Adapt the tests in tests_list using adapter and add to suite."""
3137
for test in iter_suite_tests(tests_list):
3138
suite.addTests(adapter.adapt(test))
3202
def multiply_tests(tests, scenarios, result):
3203
"""Multiply tests_list by scenarios into result.
3205
This is the core workhorse for test parameterisation.
3207
Typically the load_tests() method for a per-implementation test suite will
3208
call multiply_tests and return the result.
3210
:param tests: The tests to parameterise.
3211
:param scenarios: The scenarios to apply: pairs of (scenario_name,
3212
scenario_param_dict).
3213
:param result: A TestSuite to add created tests to.
3215
This returns the passed in result TestSuite with the cross product of all
3216
the tests repeated once for each scenario. Each test is adapted by adding
3217
the scenario name at the end of its id(), and updating the test object's
3218
__dict__ with the scenario_param_dict.
3220
>>> r = multiply_tests(
3221
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
3222
... [('one', dict(param=1)),
3223
... ('two', dict(param=2))],
3225
>>> tests = list(iter_suite_tests(r))
3229
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
3235
for test in iter_suite_tests(tests):
3236
apply_scenarios(test, scenarios, result)
3240
def apply_scenarios(test, scenarios, result):
3241
"""Apply the scenarios in scenarios to test and add to result.
3243
:param test: The test to apply scenarios to.
3244
:param scenarios: An iterable of scenarios to apply to test.
3246
:seealso: apply_scenario
3248
for scenario in scenarios:
3249
result.addTest(apply_scenario(test, scenario))
3253
def apply_scenario(test, scenario):
3254
"""Copy test and apply scenario to it.
3256
:param test: A test to adapt.
3257
:param scenario: A tuple describing the scenarion.
3258
The first element of the tuple is the new test id.
3259
The second element is a dict containing attributes to set on the
3261
:return: The adapted test.
3263
new_id = "%s(%s)" % (test.id(), scenario[0])
3264
new_test = clone_test(test, new_id)
3265
for name, value in scenario[1].items():
3266
setattr(new_test, name, value)
3270
def clone_test(test, new_id):
3271
"""Clone a test giving it a new id.
3273
:param test: The test to clone.
3274
:param new_id: The id to assign to it.
3275
:return: The new test.
3277
from copy import deepcopy
3278
new_test = deepcopy(test)
3279
new_test.id = lambda: new_id
3141
3283
def _rmtree_temp_dir(dirname):
3246
3388
UnicodeFilenameFeature = _UnicodeFilenameFeature()
3249
class TestScenarioApplier(object):
3250
"""A tool to apply scenarios to tests."""
3252
def adapt(self, test):
3253
"""Return a TestSuite containing a copy of test for each scenario."""
3254
result = unittest.TestSuite()
3255
for scenario in self.scenarios:
3256
result.addTest(self.adapt_test_to_scenario(test, scenario))
3259
def adapt_test_to_scenario(self, test, scenario):
3260
"""Copy test and apply scenario to it.
3262
:param test: A test to adapt.
3263
:param scenario: A tuple describing the scenarion.
3264
The first element of the tuple is the new test id.
3265
The second element is a dict containing attributes to set on the
3267
:return: The adapted test.
3269
from copy import deepcopy
3270
new_test = deepcopy(test)
3271
for name, value in scenario[1].items():
3272
setattr(new_test, name, value)
3273
new_id = "%s(%s)" % (new_test.id(), scenario[0])
3274
new_test.id = lambda: new_id
3278
3391
def probe_unicode_in_user_encoding():
3279
3392
"""Try to encode several unicode strings to use in unicode-aware tests.
3280
3393
Return first successfull match.