215
217
fails with an unexpected error.
217
219
self._testConcluded(test)
218
if isinstance(err[1], TestSkipped):
219
return self._addSkipped(test, err)
220
if isinstance(err[1], TestNotApplicable):
221
return self._addNotApplicable(test, err)
220
222
elif isinstance(err[1], UnavailableFeature):
221
223
return self.addNotSupported(test, err[1].args[0])
285
287
self.unsupported[str(feature)] += 1
286
288
self.report_unsupported(test, feature)
288
def _addSkipped(self, test, skip_excinfo):
290
def addSkip(self, test, reason):
291
"""A test has not run for 'reason'."""
293
self.report_skip(test, reason)
295
def _addNotApplicable(self, test, skip_excinfo):
289
296
if isinstance(skip_excinfo[1], TestNotApplicable):
290
297
self.not_applicable_count += 1
291
298
self.report_not_applicable(test, skip_excinfo)
294
self.report_skip(test, skip_excinfo)
297
301
except KeyboardInterrupt:
300
self.addError(test, test._exc_info())
304
self.addError(test, test.exc_info())
302
306
# seems best to treat this as success from point-of-view of unittest
303
307
# -- it actually does nothing so it barely matters :)
395
399
self._progress_prefix_text()
397
401
+ self._shortened_test_description(test))
399
403
def _test_description(self, test):
400
404
return self._shortened_test_description(test)
402
406
def report_error(self, test, err):
403
self.pb.note('ERROR: %s\n %s\n',
407
self.pb.note('ERROR: %s\n %s\n',
404
408
self._test_description(test),
408
412
def report_failure(self, test, err):
409
self.pb.note('FAIL: %s\n %s\n',
413
self.pb.note('FAIL: %s\n %s\n',
410
414
self._test_description(test),
484
488
# used to show the output in PQM.
485
489
self.stream.flush()
487
def report_skip(self, test, skip_excinfo):
491
def report_skip(self, test, reason):
488
492
self.stream.writeln(' SKIP %s\n%s'
489
% (self._testTimeString(test),
490
self._error_summary(skip_excinfo)))
493
% (self._testTimeString(test), reason))
492
495
def report_not_applicable(self, test, skip_excinfo):
493
496
self.stream.writeln(' N/A %s\n%s'
584
587
def iter_suite_tests(suite):
585
588
"""Return all tests in a suite, recursing through nested suites"""
586
for item in suite._tests:
587
if isinstance(item, unittest.TestCase):
589
elif isinstance(item, unittest.TestSuite):
589
if isinstance(suite, unittest.TestCase):
591
elif isinstance(suite, unittest.TestSuite):
592
for item in suite._tests:
590
593
for r in iter_suite_tests(item):
593
raise Exception('unknown object %r inside test suite %r'
596
raise Exception('unknown type %r for object %r'
597
% (type(suite), suite))
597
600
class TestSkipped(Exception):
736
739
retrieved by _get_log(). We use a real OS file, not an in-memory object,
737
740
so that it can also capture file IO. When the test completes this file
738
741
is read into memory and removed from disk.
740
743
There are also convenience functions to invoke bzr's command-line
741
744
routine, and to build and check bzr trees.
743
746
In addition to the usual method of overriding tearDown(), this class also
744
747
allows subclasses to register functions into the _cleanups list, which is
745
748
run in order as the object is torn down. It's less likely this will be
774
777
TestCase._active_threads = threading.activeCount()
775
778
self.addCleanup(self._check_leaked_threads)
783
pdb.Pdb().set_trace(sys._getframe().f_back)
786
absent_attr = object()
787
exc_info = getattr(self, '_exc_info', absent_attr)
788
if exc_info is absent_attr:
789
exc_info = getattr(self, '_TestCase__exc_info')
777
792
def _check_leaked_threads(self):
778
793
active = threading.activeCount()
779
794
leaked_threads = active - TestCase._active_threads
801
816
def _clear_hooks(self):
802
817
# prevent hooks affecting tests
804
import bzrlib.smart.client
805
import bzrlib.smart.server
806
self._preserved_hooks = {
807
bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
808
bzrlib.mutabletree.MutableTree: bzrlib.mutabletree.MutableTree.hooks,
809
bzrlib.smart.client._SmartClient: bzrlib.smart.client._SmartClient.hooks,
810
bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
818
self._preserved_hooks = {}
819
for key, factory in hooks.known_hooks.items():
820
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
821
current_hooks = hooks.known_hooks_key_to_object(key)
822
self._preserved_hooks[parent] = (name, current_hooks)
812
823
self.addCleanup(self._restoreHooks)
813
# reset all hooks to an empty instance of the appropriate type
814
bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
815
bzrlib.smart.client._SmartClient.hooks = bzrlib.smart.client.SmartClientHooks()
816
bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
824
for key, factory in hooks.known_hooks.items():
825
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
826
setattr(parent, name, factory())
818
828
def _silenceUI(self):
819
829
"""Turn off UI for duration of test"""
1272
1282
osutils.set_or_unset_env(name, value)
1274
1284
def _restoreHooks(self):
1275
for klass, hooks in self._preserved_hooks.items():
1276
setattr(klass, 'hooks', hooks)
1285
for klass, (name, hooks) in self._preserved_hooks.items():
1286
setattr(klass, name, hooks)
1278
1288
def knownFailure(self, reason):
1279
1289
"""This test has failed for some known reason."""
1280
1290
raise KnownFailure(reason)
1292
def _do_skip(self, result, reason):
1293
addSkip = getattr(result, 'addSkip', None)
1294
if not callable(addSkip):
1295
result.addError(self, self.exc_info())
1297
addSkip(self, reason)
1282
1299
def run(self, result=None):
1283
1300
if result is None: result = self.defaultTestResult()
1284
1301
for feature in getattr(self, '_test_needs_features', []):
1291
1308
result.stopTest(self)
1294
return unittest.TestCase.run(self, result)
1312
result.startTest(self)
1313
absent_attr = object()
1315
method_name = getattr(self, '_testMethodName', absent_attr)
1316
if method_name is absent_attr:
1318
method_name = getattr(self, '_TestCase__testMethodName')
1319
testMethod = getattr(self, method_name)
1323
except KeyboardInterrupt:
1325
except TestSkipped, e:
1326
self._do_skip(result, e.args[0])
1330
result.addError(self, self.exc_info())
1337
except self.failureException:
1338
result.addFailure(self, self.exc_info())
1339
except TestSkipped, e:
1341
reason = "No reason given."
1344
self._do_skip(result, reason)
1345
except KeyboardInterrupt:
1348
result.addError(self, self.exc_info())
1352
except KeyboardInterrupt:
1355
result.addError(self, self.exc_info())
1357
if ok: result.addSuccess(self)
1359
result.stopTest(self)
1361
except TestNotApplicable:
1362
# Not moved from the result [yet].
1364
except KeyboardInterrupt:
1296
1367
saved_attrs = {}
1297
1368
absent_attr = object()
1522
1594
def run_bzr_subprocess(self, *args, **kwargs):
1523
1595
"""Run bzr in a subprocess for testing.
1525
This starts a new Python interpreter and runs bzr in there.
1597
This starts a new Python interpreter and runs bzr in there.
1526
1598
This should only be used for tests that have a justifiable need for
1527
1599
this isolation: e.g. they are testing startup time, or signal
1528
handling, or early startup code, etc. Subprocess code can't be
1600
handling, or early startup code, etc. Subprocess code can't be
1529
1601
profiled or debugged so easily.
1531
1603
:keyword retcode: The status code that is expected. Defaults to 0. If
1822
class CapturedCall(object):
1823
"""A helper for capturing smart server calls for easy debug analysis."""
1825
def __init__(self, params, prefix_length):
1826
"""Capture the call with params and skip prefix_length stack frames."""
1829
# The last 5 frames are the __init__, the hook frame, and 3 smart
1830
# client frames. Beyond this we could get more clever, but this is good
1832
stack = traceback.extract_stack()[prefix_length:-5]
1833
self.stack = ''.join(traceback.format_list(stack))
1836
return self.call.method
1839
return self.call.method
1750
1845
class TestCaseWithMemoryTransport(TestCase):
1751
1846
"""Common test class for tests that do not need disk resources.
1970
2065
def makeAndChdirToTestDir(self):
1971
2066
"""Create a temporary directories for this one test.
1973
2068
This must set self.test_home_dir and self.test_dir and chdir to
1976
2071
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1978
2073
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1979
2074
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1980
2075
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1982
2077
def make_branch(self, relpath, format=None):
1983
2078
"""Create a branch on the transport at relpath."""
1984
2079
repo = self.make_repository(relpath, format=format)
2003
2098
def make_repository(self, relpath, shared=False, format=None):
2004
2099
"""Create a repository on our default transport at relpath.
2006
2101
Note that relpath must be a relative path, not a full url.
2008
2103
# FIXME: If you create a remoterepository this returns the underlying
2009
# real format, which is incorrect. Actually we should make sure that
2104
# real format, which is incorrect. Actually we should make sure that
2010
2105
# RemoteBzrDir returns a RemoteRepository.
2011
2106
# maybe mbp 20070410
2012
2107
made_control = self.make_bzrdir(relpath, format=format)
2018
2113
return memorytree.MemoryTree.create_on_branch(b)
2020
2115
def make_branch_builder(self, relpath, format=None):
2021
url = self.get_url(relpath)
2022
tran = get_transport(url)
2023
return branchbuilder.BranchBuilder(get_transport(url), format=format)
2116
return branchbuilder.BranchBuilder(self.get_transport(relpath),
2025
2119
def overrideEnvironmentForTesting(self):
2026
2120
os.environ['HOME'] = self.test_home_dir
2027
2121
os.environ['BZR_HOME'] = self.test_home_dir
2029
2123
def setUp(self):
2030
2124
super(TestCaseWithMemoryTransport, self).setUp()
2031
2125
self._make_test_root()
2039
2133
self.__server = None
2040
2134
self.reduceLockdirTimeout()
2136
def setup_smart_server_with_call_log(self):
2137
"""Sets up a smart server as the transport server with a call log."""
2138
self.transport_server = server.SmartTCPServer_for_testing
2139
self.hpss_calls = []
2141
# Skip the current stack down to the caller of
2142
# setup_smart_server_with_call_log
2143
prefix_length = len(traceback.extract_stack()) - 2
2144
def capture_hpss_call(params):
2145
self.hpss_calls.append(
2146
CapturedCall(params, prefix_length))
2147
client._SmartClient.hooks.install_named_hook(
2148
'call', capture_hpss_call, None)
2150
def reset_smart_call_log(self):
2151
self.hpss_calls = []
2043
2154
class TestCaseInTempDir(TestCaseWithMemoryTransport):
2044
2155
"""Derived class that runs a test within a temporary directory.
2485
2596
list_only=False,
2486
2597
random_seed=None,
2487
2598
exclude_pattern=None,
2601
"""Run a test suite for bzr selftest.
2603
:param runner_class: The class of runner to use. Must support the
2604
constructor arguments passed by run_suite which are more than standard
2606
:return: A boolean indicating success.
2489
2608
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
2494
runner = TextTestRunner(stream=sys.stdout,
2613
if runner_class is None:
2614
runner_class = TextTestRunner
2615
runner = runner_class(stream=sys.stdout,
2495
2616
descriptions=0,
2496
2617
verbosity=verbosity,
2497
2618
bench_history=bench_history,
2797
2923
'bzrlib.tests.test_counted_lock',
2798
2924
'bzrlib.tests.test_decorators',
2799
2925
'bzrlib.tests.test_delta',
2926
'bzrlib.tests.test_debug',
2800
2927
'bzrlib.tests.test_deprecated_graph',
2801
2928
'bzrlib.tests.test_diff',
2802
2929
'bzrlib.tests.test_directory_service',
2803
2930
'bzrlib.tests.test_dirstate',
2804
2931
'bzrlib.tests.test_email_message',
2805
2932
'bzrlib.tests.test_errors',
2933
'bzrlib.tests.test_export',
2806
2934
'bzrlib.tests.test_extract',
2807
2935
'bzrlib.tests.test_fetch',
2808
2936
'bzrlib.tests.test_fifo_cache',
3033
def multiply_tests_from_modules(module_name_list, scenario_iter, loader=None):
3034
"""Adapt all tests in some given modules to given scenarios.
3036
This is the recommended public interface for test parameterization.
3037
Typically the test_suite() method for a per-implementation test
3038
suite will call multiply_tests_from_modules and return the
3041
:param module_name_list: List of fully-qualified names of test
3043
:param scenario_iter: Iterable of pairs of (scenario_name,
3044
scenario_param_dict).
3045
:param loader: If provided, will be used instead of a new
3046
bzrlib.tests.TestLoader() instance.
3048
This returns a new TestSuite containing the cross product of
3049
all the tests in all the modules, each repeated for each scenario.
3050
Each test is adapted by adding the scenario name at the end
3051
of its name, and updating the test object's __dict__ with the
3052
scenario_param_dict.
3054
>>> r = multiply_tests_from_modules(
3055
... ['bzrlib.tests.test_sampler'],
3056
... [('one', dict(param=1)),
3057
... ('two', dict(param=2))])
3058
>>> tests = list(iter_suite_tests(r))
3062
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
3068
# XXX: Isn't load_tests() a better way to provide the same functionality
3069
# without forcing a predefined TestScenarioApplier ? --vila 080215
3071
loader = TestUtil.TestLoader()
3073
suite = loader.suiteClass()
3075
adapter = TestScenarioApplier()
3076
adapter.scenarios = list(scenario_iter)
3077
adapt_modules(module_name_list, adapter, loader, suite)
3081
3162
def multiply_scenarios(scenarios_left, scenarios_right):
3082
3163
"""Multiply two sets of scenarios.
3092
3173
for right_name, right_dict in scenarios_right]
3096
def adapt_modules(mods_list, adapter, loader, suite):
3097
"""Adapt the modules in mods_list using adapter and add to suite."""
3098
tests = loader.loadTestsFromModuleNames(mods_list)
3099
adapt_tests(tests, adapter, suite)
3102
def adapt_tests(tests_list, adapter, suite):
3103
"""Adapt the tests in tests_list using adapter and add to suite."""
3104
for test in iter_suite_tests(tests_list):
3105
suite.addTests(adapter.adapt(test))
3176
def multiply_tests(tests, scenarios, result):
3177
"""Multiply tests_list by scenarios into result.
3179
This is the core workhorse for test parameterisation.
3181
Typically the load_tests() method for a per-implementation test suite will
3182
call multiply_tests and return the result.
3184
:param tests: The tests to parameterise.
3185
:param scenarios: The scenarios to apply: pairs of (scenario_name,
3186
scenario_param_dict).
3187
:param result: A TestSuite to add created tests to.
3189
This returns the passed in result TestSuite with the cross product of all
3190
the tests repeated once for each scenario. Each test is adapted by adding
3191
the scenario name at the end of its id(), and updating the test object's
3192
__dict__ with the scenario_param_dict.
3194
>>> r = multiply_tests(
3195
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
3196
... [('one', dict(param=1)),
3197
... ('two', dict(param=2))],
3199
>>> tests = list(iter_suite_tests(r))
3203
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
3209
for test in iter_suite_tests(tests):
3210
apply_scenarios(test, scenarios, result)
3214
def apply_scenarios(test, scenarios, result):
3215
"""Apply the scenarios in scenarios to test and add to result.
3217
:param test: The test to apply scenarios to.
3218
:param scenarios: An iterable of scenarios to apply to test.
3220
:seealso: apply_scenario
3222
for scenario in scenarios:
3223
result.addTest(apply_scenario(test, scenario))
3227
def apply_scenario(test, scenario):
3228
"""Copy test and apply scenario to it.
3230
:param test: A test to adapt.
3231
:param scenario: A tuple describing the scenarion.
3232
The first element of the tuple is the new test id.
3233
The second element is a dict containing attributes to set on the
3235
:return: The adapted test.
3237
new_id = "%s(%s)" % (test.id(), scenario[0])
3238
new_test = clone_test(test, new_id)
3239
for name, value in scenario[1].items():
3240
setattr(new_test, name, value)
3244
def clone_test(test, new_id):
3245
"""Clone a test giving it a new id.
3247
:param test: The test to clone.
3248
:param new_id: The id to assign to it.
3249
:return: The new test.
3251
from copy import deepcopy
3252
new_test = deepcopy(test)
3253
new_test.id = lambda: new_id
3108
3257
def _rmtree_temp_dir(dirname):
3213
3362
UnicodeFilenameFeature = _UnicodeFilenameFeature()
3216
class TestScenarioApplier(object):
3217
"""A tool to apply scenarios to tests."""
3219
def adapt(self, test):
3220
"""Return a TestSuite containing a copy of test for each scenario."""
3221
result = unittest.TestSuite()
3222
for scenario in self.scenarios:
3223
result.addTest(self.adapt_test_to_scenario(test, scenario))
3226
def adapt_test_to_scenario(self, test, scenario):
3227
"""Copy test and apply scenario to it.
3229
:param test: A test to adapt.
3230
:param scenario: A tuple describing the scenarion.
3231
The first element of the tuple is the new test id.
3232
The second element is a dict containing attributes to set on the
3234
:return: The adapted test.
3236
from copy import deepcopy
3237
new_test = deepcopy(test)
3238
for name, value in scenario[1].items():
3239
setattr(new_test, name, value)
3240
new_id = "%s(%s)" % (new_test.id(), scenario[0])
3241
new_test.id = lambda: new_id
3245
3365
def probe_unicode_in_user_encoding():
3246
3366
"""Try to encode several unicode strings to use in unicode-aware tests.
3247
3367
Return first successfull match.