228
237
'%d non-main threads were left active in the end.\n'
229
238
% (TestCase._active_threads - 1))
231
def _extractBenchmarkTime(self, testCase):
240
def _extractBenchmarkTime(self, testCase, details=None):
232
241
"""Add a benchmark time for the current test case."""
242
if details and 'benchtime' in details:
243
return float(''.join(details['benchtime'].iter_bytes()))
233
244
return getattr(testCase, "_benchtime", None)
235
246
def _elapsedTestTimeString(self):
321
332
self._cleanupLogFile(test)
323
def addSuccess(self, test):
334
def addSuccess(self, test, details=None):
324
335
"""Tell result that test completed successfully.
326
337
Called from the TestCase run()
328
339
if self._bench_history is not None:
329
benchmark_time = self._extractBenchmarkTime(test)
340
benchmark_time = self._extractBenchmarkTime(test, details)
330
341
if benchmark_time is not None:
331
342
self._bench_history.write("%s %s\n" % (
332
343
self._formatTime(benchmark_time),
362
373
self.not_applicable_count += 1
363
374
self.report_not_applicable(test, reason)
365
def printErrorList(self, flavour, errors):
366
for test, err in errors:
367
self.stream.writeln(self.separator1)
368
self.stream.write("%s: " % flavour)
369
self.stream.writeln(self.getDescription(test))
370
if getattr(test, '_get_log', None) is not None:
371
log_contents = test._get_log()
373
self.stream.write('\n')
375
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
376
self.stream.write('\n')
377
self.stream.write(log_contents)
378
self.stream.write('\n')
380
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
381
self.stream.write('\n')
382
self.stream.writeln(self.separator2)
383
self.stream.writeln("%s" % err)
385
376
def _post_mortem(self):
386
377
"""Start a PDB post mortem session."""
387
378
if os.environ.get('BZR_TEST_PDB', None):
604
593
applied left to right - the first element in the list is the
605
594
innermost decorator.
596
# stream may claim to know how to write unicode strings, but in older
597
# pythons this goes sufficiently wrong that it is a bad idea. (
598
# specifically a built in file with encoding 'UTF-8' will still try
599
# to encode using ascii.)
600
new_encoding = osutils.get_terminal_encoding()
601
codec = codecs.lookup(new_encoding)
602
if type(codec) is tuple:
606
encode = codec.encode
607
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
608
stream.encoding = new_encoding
607
609
self.stream = unittest._WritelnDecorator(stream)
608
610
self.descriptions = descriptions
609
611
self.verbosity = verbosity
676
class KnownFailure(AssertionError):
677
"""Indicates that a test failed in a precisely expected manner.
679
Such failures don't block the whole test suite from passing because they are
680
indicators of partially completed code or of future work. We have an
681
explicit error for them so that we can ensure that they are always visible:
682
KnownFailures are always shown in the output of bzr selftest.
669
# traceback._some_str fails to format exceptions that have the default
670
# __str__ which does an implicit ascii conversion. However, repr() on those
671
# objects works, for all that it's not quite what the doctor may have ordered.
672
def _clever_some_str(value):
677
return repr(value).replace('\\n', '\n')
679
return '<unprintable %s object>' % type(value).__name__
681
traceback._some_str = _clever_some_str
684
# deprecated - use self.knownFailure(), or self.expectFailure.
685
KnownFailure = testtools.testcase._ExpectedFailure
686
688
class UnavailableFeature(Exception):
787
789
_leaking_threads_tests = 0
788
790
_first_thread_leaker_id = None
789
791
_log_file_name = None
791
_keep_log_file = False
792
792
# record lsprof data when performing benchmark calls.
793
793
_gather_lsprof_in_benchmarks = False
794
attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
795
'_log_contents', '_log_file_name', '_benchtime',
796
'_TestCase__testMethodName', '_TestCase__testMethodDoc',)
798
795
def __init__(self, methodName='testMethod'):
799
796
super(TestCase, self).__init__(methodName)
800
797
self._cleanups = []
801
self._bzr_test_setUp_run = False
802
self._bzr_test_tearDown_run = False
803
798
self._directory_isolation = True
799
self.exception_handlers.insert(0,
800
(UnavailableFeature, self._do_unsupported_or_skip))
801
self.exception_handlers.insert(0,
802
(TestNotApplicable, self._do_not_applicable))
806
unittest.TestCase.setUp(self)
807
self._bzr_test_setUp_run = True
805
super(TestCase, self).setUp()
806
for feature in getattr(self, '_test_needs_features', []):
807
self.requireFeature(feature)
808
self._log_contents = None
809
self.addDetail("log", content.Content(content.ContentType("text",
810
"plain", {"charset": "utf8"}),
811
lambda:[self._get_log(keep_log_file=True)]))
808
812
self._cleanEnvironment()
809
813
self._silenceUI()
810
814
self._startLogFile()
1013
1017
server's urls to be used.
1015
1019
if backing_server is None:
1016
transport_server.setUp()
1020
transport_server.start_server()
1018
transport_server.setUp(backing_server)
1019
self.addCleanup(transport_server.tearDown)
1022
transport_server.start_server(backing_server)
1023
self.addCleanup(transport_server.stop_server)
1020
1024
# Obtain a real transport because if the server supplies a password, it
1021
1025
# will be hidden from the base on the client side.
1022
1026
t = get_transport(transport_server.get_url())
1289
1293
m += ": " + msg
1292
def expectFailure(self, reason, assertion, *args, **kwargs):
1293
"""Invoke a test, expecting it to fail for the given reason.
1295
This is for assertions that ought to succeed, but currently fail.
1296
(The failure is *expected* but not *wanted*.) Please be very precise
1297
about the failure you're expecting. If a new bug is introduced,
1298
AssertionError should be raised, not KnownFailure.
1300
Frequently, expectFailure should be followed by an opposite assertion.
1303
Intended to be used with a callable that raises AssertionError as the
1304
'assertion' parameter. args and kwargs are passed to the 'assertion'.
1306
Raises KnownFailure if the test fails. Raises AssertionError if the
1311
self.expectFailure('Math is broken', self.assertNotEqual, 54,
1313
self.assertEqual(42, dynamic_val)
1315
This means that a dynamic_val of 54 will cause the test to raise
1316
a KnownFailure. Once math is fixed and the expectFailure is removed,
1317
only a dynamic_val of 42 will allow the test to pass. Anything other
1318
than 54 or 42 will cause an AssertionError.
1321
assertion(*args, **kwargs)
1322
except AssertionError:
1323
raise KnownFailure(reason)
1325
self.fail('Unexpected success. Should have failed: %s' % reason)
1327
1296
def assertFileEqual(self, content, path):
1328
1297
"""Fail if path does not contain 'content'."""
1329
1298
self.failUnlessExists(path)
1480
1449
Close the file and delete it, unless setKeepLogfile was called.
1482
if self._log_file is None:
1451
if bzrlib.trace._trace_file:
1452
# flush the log file, to get all content
1453
bzrlib.trace._trace_file.flush()
1484
1454
bzrlib.trace.pop_log_file(self._log_memento)
1485
self._log_file.close()
1486
self._log_file = None
1487
if not self._keep_log_file:
1488
os.remove(self._log_file_name)
1489
self._log_file_name = None
1491
def setKeepLogfile(self):
1492
"""Make the logfile not be deleted when _finishLogFile is called."""
1493
self._keep_log_file = True
1455
# Cache the log result and delete the file on disk
1456
self._get_log(False)
1495
1458
def thisFailsStrictLockCheck(self):
1496
1459
"""It is known that this test would fail with -Dstrict_locks.
1607
1572
self._do_skip(result, reason)
1609
def _do_unsupported_or_skip(self, result, reason):
1575
def _do_unsupported_or_skip(self, result, e):
1610
1577
addNotSupported = getattr(result, 'addNotSupported', None)
1611
1578
if addNotSupported is not None:
1612
1579
result.addNotSupported(self, reason)
1614
1581
self._do_skip(result, reason)
1616
def run(self, result=None):
1617
if result is None: result = self.defaultTestResult()
1618
result.startTest(self)
1623
result.stopTest(self)
1625
def _run(self, result):
1626
for feature in getattr(self, '_test_needs_features', []):
1627
if not feature.available():
1628
return self._do_unsupported_or_skip(result, feature)
1630
absent_attr = object()
1632
method_name = getattr(self, '_testMethodName', absent_attr)
1633
if method_name is absent_attr:
1635
method_name = getattr(self, '_TestCase__testMethodName')
1636
testMethod = getattr(self, method_name)
1640
if not self._bzr_test_setUp_run:
1642
"test setUp did not invoke "
1643
"bzrlib.tests.TestCase's setUp")
1644
except KeyboardInterrupt:
1647
except KnownFailure:
1648
self._do_known_failure(result)
1651
except TestNotApplicable, e:
1652
self._do_not_applicable(result, e)
1655
except TestSkipped, e:
1656
self._do_skip(result, e.args[0])
1659
except UnavailableFeature, e:
1660
self._do_unsupported_or_skip(result, e.args[0])
1664
result.addError(self, sys.exc_info())
1672
except KnownFailure:
1673
self._do_known_failure(result)
1674
except self.failureException:
1675
result.addFailure(self, sys.exc_info())
1676
except TestNotApplicable, e:
1677
self._do_not_applicable(result, e)
1678
except TestSkipped, e:
1680
reason = "No reason given."
1683
self._do_skip(result, reason)
1684
except UnavailableFeature, e:
1685
self._do_unsupported_or_skip(result, e.args[0])
1686
except KeyboardInterrupt:
1690
result.addError(self, sys.exc_info())
1694
if not self._bzr_test_tearDown_run:
1696
"test tearDown did not invoke "
1697
"bzrlib.tests.TestCase's tearDown")
1698
except KeyboardInterrupt:
1702
result.addError(self, sys.exc_info())
1705
if ok: result.addSuccess(self)
1707
except KeyboardInterrupt:
1712
for attr_name in self.attrs_to_keep:
1713
if attr_name in self.__dict__:
1714
saved_attrs[attr_name] = self.__dict__[attr_name]
1715
self.__dict__ = saved_attrs
1719
self._log_contents = ''
1720
self._bzr_test_tearDown_run = True
1721
unittest.TestCase.tearDown(self)
1723
1583
def time(self, callable, *args, **kwargs):
1724
1584
"""Run callable and accrue the time it takes to the benchmark time.
1743
1605
self._benchtime += time.time() - start
1745
def _runCleanups(self):
1746
"""Run registered cleanup functions.
1748
This should only be called from TestCase.tearDown.
1750
# TODO: Perhaps this should keep running cleanups even if
1751
# one of them fails?
1753
# Actually pop the cleanups from the list so tearDown running
1754
# twice is safe (this happens for skipped tests).
1755
while self._cleanups:
1756
cleanup, args, kwargs = self._cleanups.pop()
1757
cleanup(*args, **kwargs)
1759
1607
def log(self, *args):
1762
1610
def _get_log(self, keep_log_file=False):
1763
"""Get the log from bzrlib.trace calls from this test.
1611
"""Internal helper to get the log from bzrlib.trace for this test.
1613
Please use self.getDetails, or self.get_log to access this in test case
1765
1616
:param keep_log_file: When True, if the log is still a file on disk
1766
1617
leave it as a file on disk. When False, if the log is still a file
1768
1619
self._log_contents.
1769
1620
:return: A string containing the log.
1771
# flush the log file, to get all content
1622
if self._log_contents is not None:
1624
self._log_contents.decode('utf8')
1625
except UnicodeDecodeError:
1626
unicodestr = self._log_contents.decode('utf8', 'replace')
1627
self._log_contents = unicodestr.encode('utf8')
1628
return self._log_contents
1772
1629
import bzrlib.trace
1773
1630
if bzrlib.trace._trace_file:
1631
# flush the log file, to get all content
1774
1632
bzrlib.trace._trace_file.flush()
1775
if self._log_contents:
1776
# XXX: this can hardly contain the content flushed above --vila
1778
return self._log_contents
1779
1633
if self._log_file_name is not None:
1780
1634
logfile = open(self._log_file_name)
1782
1636
log_contents = logfile.read()
1784
1638
logfile.close()
1640
log_contents.decode('utf8')
1641
except UnicodeDecodeError:
1642
unicodestr = log_contents.decode('utf8', 'replace')
1643
log_contents = unicodestr.encode('utf8')
1785
1644
if not keep_log_file:
1645
self._log_file.close()
1646
self._log_file = None
1647
# Permit multiple calls to get_log until we clean it up in
1786
1649
self._log_contents = log_contents
1788
1651
os.remove(self._log_file_name)
3409
3281
def addFailure(self, test, err):
3410
3282
self.result.addFailure(test, err)
3413
class BZRTransformingResult(ForwardingResult):
3415
def addError(self, test, err):
3416
feature = self._error_looks_like('UnavailableFeature: ', err)
3417
if feature is not None:
3418
self.result.addNotSupported(test, feature)
3420
self.result.addError(test, err)
3422
def addFailure(self, test, err):
3423
known = self._error_looks_like('KnownFailure: ', err)
3424
if known is not None:
3425
self.result.addExpectedFailure(test,
3426
[KnownFailure, KnownFailure(known), None])
3428
self.result.addFailure(test, err)
3430
def _error_looks_like(self, prefix, err):
3431
"""Deserialize exception and returns the stringify value."""
3435
if isinstance(exc, subunit.RemoteException):
3436
# stringify the exception gives access to the remote traceback
3437
# We search the last line for 'prefix'
3438
lines = str(exc).split('\n')
3439
while lines and not lines[-1]:
3442
if lines[-1].startswith(prefix):
3443
value = lines[-1][len(prefix):]
3448
from subunit.test_results import AutoTimingTestResultDecorator
3449
# Expected failure should be seen as a success not a failure. Once subunit
3450
# provides native support for that, BZRTransformingResult and this class
3451
# will become useless.
3452
class BzrAutoTimingTestResultDecorator(AutoTimingTestResultDecorator):
3454
def addExpectedFailure(self, test, err):
3455
self._before_event()
3456
return self._call_maybe("addExpectedFailure", self._degrade_skip,
3459
# Let's just define a no-op decorator
3460
BzrAutoTimingTestResultDecorator = lambda x:x
3283
ForwardingResult = testtools.ExtendedToOriginalDecorator
3463
3286
class ProfileResult(ForwardingResult):
4137
3961
return new_test
3964
def permute_tests_for_extension(standard_tests, loader, py_module_name,
3966
"""Helper for permutating tests against an extension module.
3968
This is meant to be used inside a module's 'load_tests()' function. It will
3969
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
3970
against both implementations. Setting 'test.module' to the appropriate
3971
module. See bzrlib.tests.test__chk_map.load_tests as an example.
3973
:param standard_tests: A test suite to permute
3974
:param loader: A TestLoader
3975
:param py_module_name: The python path to a python module that can always
3976
be loaded, and will be considered the 'python' implementation. (eg
3977
'bzrlib._chk_map_py')
3978
:param ext_module_name: The python path to an extension module. If the
3979
module cannot be loaded, a single test will be added, which notes that
3980
the module is not available. If it can be loaded, all standard_tests
3981
will be run against that module.
3982
:return: (suite, feature) suite is a test-suite that has all the permuted
3983
tests. feature is the Feature object that can be used to determine if
3984
the module is available.
3987
py_module = __import__(py_module_name, {}, {}, ['NO_SUCH_ATTRIB'])
3989
('python', {'module': py_module}),
3991
suite = loader.suiteClass()
3992
feature = ModuleAvailableFeature(ext_module_name)
3993
if feature.available():
3994
scenarios.append(('C', {'module': feature.module}))
3996
# the compiled module isn't available, so we add a failing test
3997
class FailWithoutFeature(TestCase):
3998
def test_fail(self):
3999
self.requireFeature(feature)
4000
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4001
result = multiply_tests(standard_tests, scenarios, suite)
4002
return result, feature
4140
4005
def _rmtree_temp_dir(dirname, test_id=None):
4141
4006
# If LANG=C we probably have created some bogus paths
4142
4007
# which rmtree(unicode) will fail to delete
4247
4112
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4115
class _CompatabilityThunkFeature(Feature):
4116
"""This feature is just a thunk to another feature.
4118
It issues a deprecation warning if it is accessed, to let you know that you
4119
should really use a different feature.
4122
def __init__(self, module, name, this_name, dep_version):
4123
super(_CompatabilityThunkFeature, self).__init__()
4124
self._module = module
4126
self._this_name = this_name
4127
self._dep_version = dep_version
4128
self._feature = None
4131
if self._feature is None:
4132
msg = (self._dep_version % self._this_name) + (
4133
' Use %s.%s instead.' % (self._module, self._name))
4134
symbol_versioning.warn(msg, DeprecationWarning)
4135
mod = __import__(self._module, {}, {}, [self._name])
4136
self._feature = getattr(mod, self._name)
4140
return self._feature._probe()
4250
4143
class ModuleAvailableFeature(Feature):
4251
4144
"""This is a feature that describes a module we want to be available.
4459
4340
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4462
class _SubUnitFeature(Feature):
4463
"""Check if subunit is available."""
4472
def feature_name(self):
4475
SubUnitFeature = _SubUnitFeature()
4343
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4344
SubUnitFeature = _CompatabilityThunkFeature('bzrlib.tests.features', 'subunit',
4345
'bzrlib.tests.SubUnitFeature', deprecated_in((2,1,0)))
4476
4346
# Only define SubUnitBzrRunner if subunit is available.
4478
4348
from subunit import TestProtocolClient
4349
from subunit.test_results import AutoTimingTestResultDecorator
4479
4350
class SubUnitBzrRunner(TextTestRunner):
4480
4351
def run(self, test):
4481
result = BzrAutoTimingTestResultDecorator(
4352
result = AutoTimingTestResultDecorator(
4482
4353
TestProtocolClient(self.stream))
4483
4354
test.run(result)