97
88
from bzrlib.symbol_versioning import (
98
89
DEPRECATED_PARAMETER,
99
90
deprecated_function,
101
91
deprecated_method,
102
92
deprecated_passed,
104
94
import bzrlib.trace
105
from bzrlib.transport import (
95
from bzrlib.transport import get_transport, pathfilter
110
96
import bzrlib.transport
97
from bzrlib.transport.local import LocalURLServer
98
from bzrlib.transport.memory import MemoryServer
99
from bzrlib.transport.readonly import ReadonlyServer
111
100
from bzrlib.trace import mutter, note
112
from bzrlib.tests import (
101
from bzrlib.tests import TestUtil
117
102
from bzrlib.tests.http_server import HttpServer
118
103
from bzrlib.tests.TestUtil import (
107
from bzrlib.tests.treeshape import build_tree_contents
122
108
from bzrlib.ui import NullProgressView
123
109
from bzrlib.ui.text import TextUIFactory
124
110
import bzrlib.version_info_formats.format_custom
247
228
'%d non-main threads were left active in the end.\n'
248
229
% (TestCase._active_threads - 1))
250
def getDescription(self, test):
253
def _extractBenchmarkTime(self, testCase, details=None):
231
def _extractBenchmarkTime(self, testCase):
254
232
"""Add a benchmark time for the current test case."""
255
if details and 'benchtime' in details:
256
return float(''.join(details['benchtime'].iter_bytes()))
257
233
return getattr(testCase, "_benchtime", None)
259
235
def _elapsedTestTimeString(self):
345
321
self._cleanupLogFile(test)
347
def addSuccess(self, test, details=None):
323
def addSuccess(self, test):
348
324
"""Tell result that test completed successfully.
350
326
Called from the TestCase run()
352
328
if self._bench_history is not None:
353
benchmark_time = self._extractBenchmarkTime(test, details)
329
benchmark_time = self._extractBenchmarkTime(test)
354
330
if benchmark_time is not None:
355
331
self._bench_history.write("%s %s\n" % (
356
332
self._formatTime(benchmark_time),
386
362
self.not_applicable_count += 1
387
363
self.report_not_applicable(test, reason)
365
def printErrorList(self, flavour, errors):
366
for test, err in errors:
367
self.stream.writeln(self.separator1)
368
self.stream.write("%s: " % flavour)
369
self.stream.writeln(self.getDescription(test))
370
if getattr(test, '_get_log', None) is not None:
371
log_contents = test._get_log()
373
self.stream.write('\n')
375
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
376
self.stream.write('\n')
377
self.stream.write(log_contents)
378
self.stream.write('\n')
380
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
381
self.stream.write('\n')
382
self.stream.writeln(self.separator2)
383
self.stream.writeln("%s" % err)
389
385
def _post_mortem(self):
390
386
"""Start a PDB post mortem session."""
391
387
if os.environ.get('BZR_TEST_PDB', None):
490
487
return self._shortened_test_description(test)
492
489
def report_error(self, test, err):
493
self.stream.write('ERROR: %s\n %s\n' % (
490
ui.ui_factory.note('ERROR: %s\n %s\n' % (
494
491
self._test_description(test),
498
495
def report_failure(self, test, err):
499
self.stream.write('FAIL: %s\n %s\n' % (
496
ui.ui_factory.note('FAIL: %s\n %s\n' % (
500
497
self._test_description(test),
504
501
def report_known_failure(self, test, err):
502
ui.ui_factory.note('XFAIL: %s\n%s\n' % (
503
self._test_description(test), err[1]))
507
505
def report_skip(self, test, reason):
535
533
def report_test_start(self, test):
537
535
name = self._shortened_test_description(test)
538
width = osutils.terminal_width()
539
if width is not None:
540
# width needs space for 6 char status, plus 1 for slash, plus an
541
# 11-char time string, plus a trailing blank
542
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
544
self.stream.write(self._ellipsize_to_right(name, width-18))
546
self.stream.write(name)
536
# width needs space for 6 char status, plus 1 for slash, plus an
537
# 11-char time string, plus a trailing blank
538
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
539
self.stream.write(self._ellipsize_to_right(name,
540
osutils.terminal_width()-18))
547
541
self.stream.flush()
549
543
def _error_summary(self, err):
606
600
applied left to right - the first element in the list is the
607
601
innermost decorator.
609
# stream may know claim to know to write unicode strings, but in older
610
# pythons this goes sufficiently wrong that it is a bad idea. (
611
# specifically a built in file with encoding 'UTF-8' will still try
612
# to encode using ascii.
613
new_encoding = osutils.get_terminal_encoding()
614
codec = codecs.lookup(new_encoding)
615
if type(codec) is tuple:
619
encode = codec.encode
620
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
621
stream.encoding = new_encoding
622
603
self.stream = unittest._WritelnDecorator(stream)
623
604
self.descriptions = descriptions
624
605
self.verbosity = verbosity
682
# traceback._some_str fails to format exceptions that have the default
683
# __str__ which does an implicit ascii conversion. However, repr() on those
684
# objects works, for all that its not quite what the doctor may have ordered.
685
def _clever_some_str(value):
690
return repr(value).replace('\\n', '\n')
692
return '<unprintable %s object>' % type(value).__name__
694
traceback._some_str = _clever_some_str
697
# deprecated - use self.knownFailure(), or self.expectFailure.
698
KnownFailure = testtools.testcase._ExpectedFailure
672
class KnownFailure(AssertionError):
673
"""Indicates that a test failed in a precisely expected manner.
675
Such failures dont block the whole test suite from passing because they are
676
indicators of partially completed code or of future work. We have an
677
explicit error for them so that we can ensure that they are always visible:
678
KnownFailures are always shown in the output of bzr selftest.
701
682
class UnavailableFeature(Exception):
798
783
_leaking_threads_tests = 0
799
784
_first_thread_leaker_id = None
800
785
_log_file_name = None
787
_keep_log_file = False
801
788
# record lsprof data when performing benchmark calls.
802
789
_gather_lsprof_in_benchmarks = False
790
attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
791
'_log_contents', '_log_file_name', '_benchtime',
792
'_TestCase__testMethodName', '_TestCase__testMethodDoc',)
804
794
def __init__(self, methodName='testMethod'):
805
795
super(TestCase, self).__init__(methodName)
806
796
self._cleanups = []
797
self._bzr_test_setUp_run = False
798
self._bzr_test_tearDown_run = False
807
799
self._directory_isolation = True
808
self.exception_handlers.insert(0,
809
(UnavailableFeature, self._do_unsupported_or_skip))
810
self.exception_handlers.insert(0,
811
(TestNotApplicable, self._do_not_applicable))
814
super(TestCase, self).setUp()
815
for feature in getattr(self, '_test_needs_features', []):
816
self.requireFeature(feature)
817
self._log_contents = None
818
self.addDetail("log", content.Content(content.ContentType("text",
819
"plain", {"charset": "utf8"}),
820
lambda:[self._get_log(keep_log_file=True)]))
802
unittest.TestCase.setUp(self)
803
self._bzr_test_setUp_run = True
821
804
self._cleanEnvironment()
822
805
self._silenceUI()
823
806
self._startLogFile()
856
839
Tests that want to use debug flags can just set them in the
857
840
debug_flags set during setup/teardown.
859
# Start with a copy of the current debug flags we can safely modify.
860
self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
842
self._preserved_debug_flags = set(debug.debug_flags)
861
843
if 'allow_debug' not in selftest_debug_flags:
862
844
debug.debug_flags.clear()
863
845
if 'disable_lock_checks' not in selftest_debug_flags:
864
846
debug.debug_flags.add('strict_locks')
847
self.addCleanup(self._restore_debug_flags)
866
849
def _clear_hooks(self):
867
850
# prevent hooks affecting tests
1022
1009
server's urls to be used.
1024
1011
if backing_server is None:
1025
transport_server.start_server()
1012
transport_server.setUp()
1027
transport_server.start_server(backing_server)
1028
self.addCleanup(transport_server.stop_server)
1014
transport_server.setUp(backing_server)
1015
self.addCleanup(transport_server.tearDown)
1029
1016
# Obtain a real transport because if the server supplies a password, it
1030
1017
# will be hidden from the base on the client side.
1031
1018
t = get_transport(transport_server.get_url())
1045
1032
if t.base.endswith('/work/'):
1046
1033
# we have safety net/test root/work
1047
1034
t = t.clone('../..')
1048
elif isinstance(transport_server,
1049
test_server.SmartTCPServer_for_testing):
1035
elif isinstance(transport_server, server.SmartTCPServer_for_testing):
1050
1036
# The smart server adds a path similar to work, which is traversed
1051
1037
# up from by the client. But the server is chrooted - the actual
1052
1038
# backing transport is not escaped from, and VFS requests to the
1129
1115
:raises AssertionError: If the expected and actual stat values differ
1130
1116
other than by atime.
1132
self.assertEqual(expected.st_size, actual.st_size,
1133
'st_size did not match')
1134
self.assertEqual(expected.st_mtime, actual.st_mtime,
1135
'st_mtime did not match')
1136
self.assertEqual(expected.st_ctime, actual.st_ctime,
1137
'st_ctime did not match')
1138
if sys.platform != 'win32':
1139
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1140
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1141
# odd. Regardless we shouldn't actually try to assert anything
1142
# about their values
1143
self.assertEqual(expected.st_dev, actual.st_dev,
1144
'st_dev did not match')
1145
self.assertEqual(expected.st_ino, actual.st_ino,
1146
'st_ino did not match')
1147
self.assertEqual(expected.st_mode, actual.st_mode,
1148
'st_mode did not match')
1118
self.assertEqual(expected.st_size, actual.st_size)
1119
self.assertEqual(expected.st_mtime, actual.st_mtime)
1120
self.assertEqual(expected.st_ctime, actual.st_ctime)
1121
self.assertEqual(expected.st_dev, actual.st_dev)
1122
self.assertEqual(expected.st_ino, actual.st_ino)
1123
self.assertEqual(expected.st_mode, actual.st_mode)
1150
1125
def assertLength(self, length, obj_with_len):
1151
1126
"""Assert that obj_with_len is of length length."""
1303
1274
m += ": " + msg
1277
def expectFailure(self, reason, assertion, *args, **kwargs):
1278
"""Invoke a test, expecting it to fail for the given reason.
1280
This is for assertions that ought to succeed, but currently fail.
1281
(The failure is *expected* but not *wanted*.) Please be very precise
1282
about the failure you're expecting. If a new bug is introduced,
1283
AssertionError should be raised, not KnownFailure.
1285
Frequently, expectFailure should be followed by an opposite assertion.
1288
Intended to be used with a callable that raises AssertionError as the
1289
'assertion' parameter. args and kwargs are passed to the 'assertion'.
1291
Raises KnownFailure if the test fails. Raises AssertionError if the
1296
self.expectFailure('Math is broken', self.assertNotEqual, 54,
1298
self.assertEqual(42, dynamic_val)
1300
This means that a dynamic_val of 54 will cause the test to raise
1301
a KnownFailure. Once math is fixed and the expectFailure is removed,
1302
only a dynamic_val of 42 will allow the test to pass. Anything other
1303
than 54 or 42 will cause an AssertionError.
1306
assertion(*args, **kwargs)
1307
except AssertionError:
1308
raise KnownFailure(reason)
1310
self.fail('Unexpected success. Should have failed: %s' % reason)
1306
1312
def assertFileEqual(self, content, path):
1307
1313
"""Fail if path does not contain 'content'."""
1308
1314
self.failUnlessExists(path)
1314
1320
self.assertEqualDiff(content, s)
1316
def assertDocstring(self, expected_docstring, obj):
1317
"""Fail if obj does not have expected_docstring"""
1319
# With -OO the docstring should be None instead
1320
self.assertIs(obj.__doc__, None)
1322
self.assertEqual(expected_docstring, obj.__doc__)
1324
1322
def failUnlessExists(self, path):
1325
1323
"""Fail unless path or paths, which may be abs or relative, exist."""
1326
1324
if not isinstance(path, basestring):
1467
1465
Close the file and delete it, unless setKeepLogfile was called.
1469
if bzrlib.trace._trace_file:
1470
# flush the log file, to get all content
1471
bzrlib.trace._trace_file.flush()
1467
if self._log_file is None:
1472
1469
bzrlib.trace.pop_log_file(self._log_memento)
1473
# Cache the log result and delete the file on disk
1474
self._get_log(False)
1470
self._log_file.close()
1471
self._log_file = None
1472
if not self._keep_log_file:
1473
os.remove(self._log_file_name)
1474
self._log_file_name = None
1476
def setKeepLogfile(self):
1477
"""Make the logfile not be deleted when _finishLogFile is called."""
1478
self._keep_log_file = True
1476
1480
def thisFailsStrictLockCheck(self):
1477
1481
"""It is known that this test would fail with -Dstrict_locks.
1495
1499
self._cleanups.append((callable, args, kwargs))
1497
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1498
"""Overrides an object attribute restoring it after the test.
1500
:param obj: The object that will be mutated.
1502
:param attr_name: The attribute name we want to preserve/override in
1505
:param new: The optional value we want to set the attribute to.
1507
:returns: The actual attr value.
1509
value = getattr(obj, attr_name)
1510
# The actual value is captured by the call below
1511
self.addCleanup(setattr, obj, attr_name, value)
1512
if new is not _unitialized_attr:
1513
setattr(obj, attr_name, new)
1516
1501
def _cleanEnvironment(self):
1518
1503
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1556
1537
'ftp_proxy': None,
1557
1538
'FTP_PROXY': None,
1558
1539
'BZR_REMOTE_PATH': None,
1559
# Generally speaking, we don't want apport reporting on crashes in
1560
# the test envirnoment unless we're specifically testing apport,
1561
# so that it doesn't leak into the real system environment. We
1562
# use an env var so it propagates to subprocesses.
1563
'APPORT_DISABLE': '1',
1566
1542
self.addCleanup(self._restoreEnvironment)
1567
1543
for name, value in new_env.iteritems():
1568
1544
self._captureVar(name, value)
1570
1546
def _captureVar(self, name, newvalue):
1571
1547
"""Set an environment variable, and reset it when finished."""
1572
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1548
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1550
def _restore_debug_flags(self):
1551
debug.debug_flags.clear()
1552
debug.debug_flags.update(self._preserved_debug_flags)
1574
1554
def _restoreEnvironment(self):
1575
for name, value in self._old_env.iteritems():
1555
for name, value in self.__old_env.iteritems():
1576
1556
osutils.set_or_unset_env(name, value)
1578
1558
def _restoreHooks(self):
1612
1590
self._do_skip(result, reason)
1615
def _do_unsupported_or_skip(self, result, e):
1592
def _do_unsupported_or_skip(self, result, reason):
1617
1593
addNotSupported = getattr(result, 'addNotSupported', None)
1618
1594
if addNotSupported is not None:
1619
1595
result.addNotSupported(self, reason)
1621
1597
self._do_skip(result, reason)
1599
def run(self, result=None):
1600
if result is None: result = self.defaultTestResult()
1601
result.startTest(self)
1606
result.stopTest(self)
1608
def _run(self, result):
1609
for feature in getattr(self, '_test_needs_features', []):
1610
if not feature.available():
1611
return self._do_unsupported_or_skip(result, feature)
1613
absent_attr = object()
1615
method_name = getattr(self, '_testMethodName', absent_attr)
1616
if method_name is absent_attr:
1618
method_name = getattr(self, '_TestCase__testMethodName')
1619
testMethod = getattr(self, method_name)
1623
if not self._bzr_test_setUp_run:
1625
"test setUp did not invoke "
1626
"bzrlib.tests.TestCase's setUp")
1627
except KeyboardInterrupt:
1630
except KnownFailure:
1631
self._do_known_failure(result)
1634
except TestNotApplicable, e:
1635
self._do_not_applicable(result, e)
1638
except TestSkipped, e:
1639
self._do_skip(result, e.args[0])
1642
except UnavailableFeature, e:
1643
self._do_unsupported_or_skip(result, e.args[0])
1647
result.addError(self, sys.exc_info())
1655
except KnownFailure:
1656
self._do_known_failure(result)
1657
except self.failureException:
1658
result.addFailure(self, sys.exc_info())
1659
except TestNotApplicable, e:
1660
self._do_not_applicable(result, e)
1661
except TestSkipped, e:
1663
reason = "No reason given."
1666
self._do_skip(result, reason)
1667
except UnavailableFeature, e:
1668
self._do_unsupported_or_skip(result, e.args[0])
1669
except KeyboardInterrupt:
1673
result.addError(self, sys.exc_info())
1677
if not self._bzr_test_tearDown_run:
1679
"test tearDown did not invoke "
1680
"bzrlib.tests.TestCase's tearDown")
1681
except KeyboardInterrupt:
1685
result.addError(self, sys.exc_info())
1688
if ok: result.addSuccess(self)
1690
except KeyboardInterrupt:
1695
for attr_name in self.attrs_to_keep:
1696
if attr_name in self.__dict__:
1697
saved_attrs[attr_name] = self.__dict__[attr_name]
1698
self.__dict__ = saved_attrs
1702
self._log_contents = ''
1703
self._bzr_test_tearDown_run = True
1704
unittest.TestCase.tearDown(self)
1623
1706
def time(self, callable, *args, **kwargs):
1624
1707
"""Run callable and accrue the time it takes to the benchmark time.
1645
1726
self._benchtime += time.time() - start
1728
def _runCleanups(self):
1729
"""Run registered cleanup functions.
1731
This should only be called from TestCase.tearDown.
1733
# TODO: Perhaps this should keep running cleanups even if
1734
# one of them fails?
1736
# Actually pop the cleanups from the list so tearDown running
1737
# twice is safe (this happens for skipped tests).
1738
while self._cleanups:
1739
cleanup, args, kwargs = self._cleanups.pop()
1740
cleanup(*args, **kwargs)
1647
1742
def log(self, *args):
1650
1745
def _get_log(self, keep_log_file=False):
1651
"""Internal helper to get the log from bzrlib.trace for this test.
1653
Please use self.getDetails, or self.get_log to access this in test case
1746
"""Get the log from bzrlib.trace calls from this test.
1656
1748
:param keep_log_file: When True, if the log is still a file on disk
1657
1749
leave it as a file on disk. When False, if the log is still a file
1659
1751
self._log_contents.
1660
1752
:return: A string containing the log.
1662
if self._log_contents is not None:
1664
self._log_contents.decode('utf8')
1665
except UnicodeDecodeError:
1666
unicodestr = self._log_contents.decode('utf8', 'replace')
1667
self._log_contents = unicodestr.encode('utf8')
1754
# flush the log file, to get all content
1756
if bzrlib.trace._trace_file:
1757
bzrlib.trace._trace_file.flush()
1758
if self._log_contents:
1759
# XXX: this can hardly contain the content flushed above --vila
1668
1761
return self._log_contents
1670
if bzrlib.trace._trace_file:
1671
# flush the log file, to get all content
1672
bzrlib.trace._trace_file.flush()
1673
1762
if self._log_file_name is not None:
1674
1763
logfile = open(self._log_file_name)
1676
1765
log_contents = logfile.read()
1678
1767
logfile.close()
1680
log_contents.decode('utf8')
1681
except UnicodeDecodeError:
1682
unicodestr = log_contents.decode('utf8', 'replace')
1683
log_contents = unicodestr.encode('utf8')
1684
1768
if not keep_log_file:
1686
max_close_attempts = 100
1687
first_close_error = None
1688
while close_attempts < max_close_attempts:
1691
self._log_file.close()
1692
except IOError, ioe:
1693
if ioe.errno is None:
1694
# No errno implies 'close() called during
1695
# concurrent operation on the same file object', so
1696
# retry. Probably a thread is trying to write to
1698
if first_close_error is None:
1699
first_close_error = ioe
1704
if close_attempts > 1:
1706
'Unable to close log file on first attempt, '
1707
'will retry: %s\n' % (first_close_error,))
1708
if close_attempts == max_close_attempts:
1710
'Unable to close log file after %d attempts.\n'
1711
% (max_close_attempts,))
1712
self._log_file = None
1713
# Permit multiple calls to get_log until we clean it up in
1715
1769
self._log_contents = log_contents
1717
1771
os.remove(self._log_file_name)
1779
1825
os.chdir(working_dir)
1783
result = self.apply_redirected(ui.ui_factory.stdin,
1785
bzrlib.commands.run_bzr_catch_user_errors,
1787
except KeyboardInterrupt:
1788
# Reraise KeyboardInterrupt with contents of redirected stdout
1789
# and stderr as arguments, for tests which are interested in
1790
# stdout and stderr and are expecting the exception.
1791
out = stdout.getvalue()
1792
err = stderr.getvalue()
1794
self.log('output:\n%r', out)
1796
self.log('errors:\n%r', err)
1797
raise KeyboardInterrupt(out, err)
1828
result = self.apply_redirected(ui.ui_factory.stdin,
1830
bzrlib.commands.run_bzr_catch_user_errors,
1799
1833
logger.removeHandler(handler)
1800
1834
ui.ui_factory = old_ui_factory
2101
2135
Tests that expect to provoke LockContention errors should call this.
2103
self.overrideAttr(bzrlib.lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
2137
orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
2139
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
2140
self.addCleanup(resetTimeout)
2141
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
2105
2143
def make_utf8_encoded_stringio(self, encoding_type=None):
2106
2144
"""Return a StringIOWrapper instance, that will encode Unicode
2424
2464
return branchbuilder.BranchBuilder(branch=branch)
2426
2466
def overrideEnvironmentForTesting(self):
2427
test_home_dir = self.test_home_dir
2428
if isinstance(test_home_dir, unicode):
2429
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2430
os.environ['HOME'] = test_home_dir
2431
os.environ['BZR_HOME'] = test_home_dir
2467
os.environ['HOME'] = self.test_home_dir
2468
os.environ['BZR_HOME'] = self.test_home_dir
2433
2470
def setUp(self):
2434
2471
super(TestCaseWithMemoryTransport, self).setUp()
2435
2472
self._make_test_root()
2436
self.addCleanup(os.chdir, os.getcwdu())
2473
_currentdir = os.getcwdu()
2474
def _leaveDirectory():
2475
os.chdir(_currentdir)
2476
self.addCleanup(_leaveDirectory)
2437
2477
self.makeAndChdirToTestDir()
2438
2478
self.overrideEnvironmentForTesting()
2439
2479
self.__readonly_server = None
3295
3321
if not os.path.isfile(bzr_path):
3296
3322
# We are probably installed. Assume sys.argv is the right file
3297
3323
bzr_path = sys.argv[0]
3298
bzr_path = [bzr_path]
3299
if sys.platform == "win32":
3300
# if we're on windows, we can't execute the bzr script directly
3301
bzr_path = [sys.executable] + bzr_path
3302
3324
fd, test_list_file_name = tempfile.mkstemp()
3303
3325
test_list_file = os.fdopen(fd, 'wb', 1)
3304
3326
for test in process_tests:
3305
3327
test_list_file.write(test.id() + '\n')
3306
3328
test_list_file.close()
3308
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3330
argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
3310
3332
if '--no-plugins' in sys.argv:
3311
3333
argv.append('--no-plugins')
3351
3373
def addFailure(self, test, err):
3352
3374
self.result.addFailure(test, err)
3353
ForwardingResult = testtools.ExtendedToOriginalDecorator
3377
class BZRTransformingResult(ForwardingResult):
3379
def addError(self, test, err):
3380
feature = self._error_looks_like('UnavailableFeature: ', err)
3381
if feature is not None:
3382
self.result.addNotSupported(test, feature)
3384
self.result.addError(test, err)
3386
def addFailure(self, test, err):
3387
known = self._error_looks_like('KnownFailure: ', err)
3388
if known is not None:
3389
self.result.addExpectedFailure(test,
3390
[KnownFailure, KnownFailure(known), None])
3392
self.result.addFailure(test, err)
3394
def _error_looks_like(self, prefix, err):
3395
"""Deserialize exception and returns the stringify value."""
3399
if isinstance(exc, subunit.RemoteException):
3400
# stringify the exception gives access to the remote traceback
3401
# We search the last line for 'prefix'
3402
lines = str(exc).split('\n')
3403
while lines and not lines[-1]:
3406
if lines[-1].startswith(prefix):
3407
value = lines[-1][len(prefix):]
3412
from subunit.test_results import AutoTimingTestResultDecorator
3413
# Expected failure should be seen as a success not a failure Once subunit
3414
# provide native support for that, BZRTransformingResult and this class
3415
# will become useless.
3416
class BzrAutoTimingTestResultDecorator(AutoTimingTestResultDecorator):
3418
def addExpectedFailure(self, test, err):
3419
self._before_event()
3420
return self._call_maybe("addExpectedFailure", self._degrade_skip,
3423
# Let's just define a no-op decorator
3424
BzrAutoTimingTestResultDecorator = lambda x:x
3356
3427
class ProfileResult(ForwardingResult):
3617
3688
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3618
3689
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3620
# Obvious highest levels prefixes, feel free to add your own via a plugin
3691
# Obvious higest levels prefixes, feel free to add your own via a plugin
3621
3692
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3622
3693
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3623
3694
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3633
3704
'bzrlib.tests.commands',
3634
3705
'bzrlib.tests.per_branch',
3635
3706
'bzrlib.tests.per_bzrdir',
3636
'bzrlib.tests.per_bzrdir_colo',
3637
3707
'bzrlib.tests.per_foreign_vcs',
3638
3708
'bzrlib.tests.per_interrepository',
3639
3709
'bzrlib.tests.per_intertree',
3640
3710
'bzrlib.tests.per_inventory',
3641
3711
'bzrlib.tests.per_interbranch',
3642
3712
'bzrlib.tests.per_lock',
3643
'bzrlib.tests.per_merger',
3644
3713
'bzrlib.tests.per_transport',
3645
3714
'bzrlib.tests.per_tree',
3646
3715
'bzrlib.tests.per_pack_repository',
4038
4101
return new_test
4041
def permute_tests_for_extension(standard_tests, loader, py_module_name,
4043
"""Helper for permutating tests against an extension module.
4045
This is meant to be used inside a modules 'load_tests()' function. It will
4046
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
4047
against both implementations. Setting 'test.module' to the appropriate
4048
module. See bzrlib.tests.test__chk_map.load_tests as an example.
4050
:param standard_tests: A test suite to permute
4051
:param loader: A TestLoader
4052
:param py_module_name: The python path to a python module that can always
4053
be loaded, and will be considered the 'python' implementation. (eg
4054
'bzrlib._chk_map_py')
4055
:param ext_module_name: The python path to an extension module. If the
4056
module cannot be loaded, a single test will be added, which notes that
4057
the module is not available. If it can be loaded, all standard_tests
4058
will be run against that module.
4059
:return: (suite, feature) suite is a test-suite that has all the permuted
4060
tests. feature is the Feature object that can be used to determine if
4061
the module is available.
4064
py_module = __import__(py_module_name, {}, {}, ['NO_SUCH_ATTRIB'])
4066
('python', {'module': py_module}),
4068
suite = loader.suiteClass()
4069
feature = ModuleAvailableFeature(ext_module_name)
4070
if feature.available():
4071
scenarios.append(('C', {'module': feature.module}))
4073
# the compiled module isn't available, so we add a failing test
4074
class FailWithoutFeature(TestCase):
4075
def test_fail(self):
4076
self.requireFeature(feature)
4077
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4078
result = multiply_tests(standard_tests, scenarios, suite)
4079
return result, feature
4082
def _rmtree_temp_dir(dirname, test_id=None):
4104
def _rmtree_temp_dir(dirname):
4083
4105
# If LANG=C we probably have created some bogus paths
4084
4106
# which rmtree(unicode) will fail to delete
4085
4107
# so make sure we are using rmtree(str) to delete everything
4189
4208
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4192
class _CompatabilityThunkFeature(Feature):
4193
"""This feature is just a thunk to another feature.
4195
It issues a deprecation warning if it is accessed, to let you know that you
4196
should really use a different feature.
4199
def __init__(self, dep_version, module, name,
4200
replacement_name, replacement_module=None):
4201
super(_CompatabilityThunkFeature, self).__init__()
4202
self._module = module
4203
if replacement_module is None:
4204
replacement_module = module
4205
self._replacement_module = replacement_module
4207
self._replacement_name = replacement_name
4208
self._dep_version = dep_version
4209
self._feature = None
4212
if self._feature is None:
4213
depr_msg = self._dep_version % ('%s.%s'
4214
% (self._module, self._name))
4215
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4216
self._replacement_name)
4217
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4218
# Import the new feature and use it as a replacement for the
4220
mod = __import__(self._replacement_module, {}, {},
4221
[self._replacement_name])
4222
self._feature = getattr(mod, self._replacement_name)
4226
return self._feature._probe()
4229
class ModuleAvailableFeature(Feature):
4230
"""This is a feature than describes a module we want to be available.
4232
Declare the name of the module in __init__(), and then after probing, the
4233
module will be available as 'self.module'.
4235
:ivar module: The module if it is available, else None.
4238
def __init__(self, module_name):
4239
super(ModuleAvailableFeature, self).__init__()
4240
self.module_name = module_name
4244
self._module = __import__(self.module_name, {}, {}, [''])
4251
if self.available(): # Make sure the probe has been done
4255
def feature_name(self):
4256
return self.module_name
4259
# This is kept here for compatibility, it is recommended to use
4260
# 'bzrlib.tests.feature.paramiko' instead
4261
ParamikoFeature = _CompatabilityThunkFeature(
4262
deprecated_in((2,1,0)),
4263
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4266
4211
def probe_unicode_in_user_encoding():
4267
4212
"""Try to encode several unicode strings to use in unicode-aware tests.
4268
4213
Return first successfull match.
4427
4387
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4430
class _CaseSensitiveFilesystemFeature(Feature):
4390
class _SubUnitFeature(Feature):
4391
"""Check if subunit is available."""
4432
4393
def _probe(self):
4433
if CaseInsCasePresFilenameFeature.available():
4435
elif CaseInsensitiveFilesystemFeature.available():
4440
4400
def feature_name(self):
4441
return 'case-sensitive filesystem'
4443
# new coding style is for feature instances to be lowercase
4444
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4447
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4448
SubUnitFeature = _CompatabilityThunkFeature(
4449
deprecated_in((2,1,0)),
4450
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4403
SubUnitFeature = _SubUnitFeature()
4451
4404
# Only define SubUnitBzrRunner if subunit is available.
4453
4406
from subunit import TestProtocolClient
4454
from subunit.test_results import AutoTimingTestResultDecorator
4455
4407
class SubUnitBzrRunner(TextTestRunner):
4456
4408
def run(self, test):
4457
result = AutoTimingTestResultDecorator(
4409
result = BzrAutoTimingTestResultDecorator(
4458
4410
TestProtocolClient(self.stream))
4459
4411
test.run(result)
4461
4413
except ImportError:
4464
class _PosixPermissionsFeature(Feature):
4468
# create temporary file and check if specified perms are maintained.
4471
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4472
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4475
os.chmod(name, write_perms)
4477
read_perms = os.stat(name).st_mode & 0777
4479
return (write_perms == read_perms)
4481
return (os.name == 'posix') and has_perms()
4483
def feature_name(self):
4484
return 'POSIX permissions support'
4486
posix_permissions_feature = _PosixPermissionsFeature()