322
292
Called from the TestCase run() method when the test
323
293
fails with an unexpected error.
326
unittest.TestResult.addError(self, test, err)
327
self.error_count += 1
328
self.report_error(test, err)
331
self._cleanupLogFile(test)
295
self._testConcluded(test)
296
if isinstance(err[1], TestNotApplicable):
297
return self._addNotApplicable(test, err)
298
elif isinstance(err[1], UnavailableFeature):
299
return self.addNotSupported(test, err[1].args[0])
301
unittest.TestResult.addError(self, test, err)
302
self.error_count += 1
303
self.report_error(test, err)
306
self._cleanupLogFile(test)
333
308
def addFailure(self, test, err):
334
309
"""Tell result that test failed.
336
311
Called from the TestCase run() method when the test
337
312
fails because e.g. an assert() method failed.
340
unittest.TestResult.addFailure(self, test, err)
341
self.failure_count += 1
342
self.report_failure(test, err)
345
self._cleanupLogFile(test)
314
self._testConcluded(test)
315
if isinstance(err[1], KnownFailure):
316
return self._addKnownFailure(test, err)
318
unittest.TestResult.addFailure(self, test, err)
319
self.failure_count += 1
320
self.report_failure(test, err)
323
self._cleanupLogFile(test)
347
def addSuccess(self, test, details=None):
325
def addSuccess(self, test):
348
326
"""Tell result that test completed successfully.
350
328
Called from the TestCase run()
330
self._testConcluded(test)
352
331
if self._bench_history is not None:
353
benchmark_time = self._extractBenchmarkTime(test, details)
332
benchmark_time = self._extractBenchmarkTime(test)
354
333
if benchmark_time is not None:
355
334
self._bench_history.write("%s %s\n" % (
356
335
self._formatTime(benchmark_time),
382
368
self.skip_count += 1
383
369
self.report_skip(test, reason)
385
def addNotApplicable(self, test, reason):
386
self.not_applicable_count += 1
387
self.report_not_applicable(test, reason)
371
def _addNotApplicable(self, test, skip_excinfo):
372
if isinstance(skip_excinfo[1], TestNotApplicable):
373
self.not_applicable_count += 1
374
self.report_not_applicable(test, skip_excinfo)
377
except KeyboardInterrupt:
380
self.addError(test, test.exc_info())
382
# seems best to treat this as success from point-of-view of unittest
383
# -- it actually does nothing so it barely matters :)
384
unittest.TestResult.addSuccess(self, test)
385
test._log_contents = ''
389
def _post_mortem(self):
390
"""Start a PDB post mortem session."""
391
if os.environ.get('BZR_TEST_PDB', None):
392
import pdb;pdb.post_mortem()
387
def printErrorList(self, flavour, errors):
388
for test, err in errors:
389
self.stream.writeln(self.separator1)
390
self.stream.write("%s: " % flavour)
391
self.stream.writeln(self.getDescription(test))
392
if getattr(test, '_get_log', None) is not None:
393
log_contents = test._get_log()
395
self.stream.write('\n')
397
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
398
self.stream.write('\n')
399
self.stream.write(log_contents)
400
self.stream.write('\n')
402
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
403
self.stream.write('\n')
404
self.stream.writeln(self.separator2)
405
self.stream.writeln("%s" % err)
394
407
def progress(self, offset, whence):
395
408
"""The test is adjusting the count of tests to run."""
490
504
return self._shortened_test_description(test)
492
506
def report_error(self, test, err):
493
self.stream.write('ERROR: %s\n %s\n' % (
507
self.pb.note('ERROR: %s\n %s\n',
494
508
self._test_description(test),
498
512
def report_failure(self, test, err):
499
self.stream.write('FAIL: %s\n %s\n' % (
513
self.pb.note('FAIL: %s\n %s\n',
500
514
self._test_description(test),
504
518
def report_known_failure(self, test, err):
519
self.pb.note('XFAIL: %s\n%s\n',
520
self._test_description(test), err[1])
507
522
def report_skip(self, test, reason):
510
def report_not_applicable(self, test, reason):
525
def report_not_applicable(self, test, skip_excinfo):
513
528
def report_unsupported(self, test, feature):
682
# traceback._some_str fails to format exceptions that have the default
683
# __str__ which does an implicit ascii conversion. However, repr() on those
684
# objects works, for all that its not quite what the doctor may have ordered.
685
def _clever_some_str(value):
690
return repr(value).replace('\\n', '\n')
692
return '<unprintable %s object>' % type(value).__name__
694
traceback._some_str = _clever_some_str
697
# deprecated - use self.knownFailure(), or self.expectFailure.
698
KnownFailure = testtools.testcase._ExpectedFailure
690
class KnownFailure(AssertionError):
691
"""Indicates that a test failed in a precisely expected manner.
693
Such failures dont block the whole test suite from passing because they are
694
indicators of partially completed code or of future work. We have an
695
explicit error for them so that we can ensure that they are always visible:
696
KnownFailures are always shown in the output of bzr selftest.
701
700
class UnavailableFeature(Exception):
702
701
"""A feature required for this test was not available.
704
This can be considered a specialised form of SkippedTest.
706
703
The feature should be used to construct the exception.
707
class CommandFailed(Exception):
710
711
class StringIOWrapper(object):
711
712
"""A wrapper around cStringIO which just adds an encoding attribute.
798
799
_leaking_threads_tests = 0
799
800
_first_thread_leaker_id = None
800
801
_log_file_name = None
803
_keep_log_file = False
801
804
# record lsprof data when performing benchmark calls.
802
805
_gather_lsprof_in_benchmarks = False
806
attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
807
'_log_contents', '_log_file_name', '_benchtime',
808
'_TestCase__testMethodName', '_TestCase__testMethodDoc',)
804
810
def __init__(self, methodName='testMethod'):
805
811
super(TestCase, self).__init__(methodName)
806
812
self._cleanups = []
807
self._directory_isolation = True
808
self.exception_handlers.insert(0,
809
(UnavailableFeature, self._do_unsupported_or_skip))
810
self.exception_handlers.insert(0,
811
(TestNotApplicable, self._do_not_applicable))
813
self._bzr_test_setUp_run = False
814
self._bzr_test_tearDown_run = False
814
super(TestCase, self).setUp()
815
for feature in getattr(self, '_test_needs_features', []):
816
self.requireFeature(feature)
817
self._log_contents = None
818
self.addDetail("log", content.Content(content.ContentType("text",
819
"plain", {"charset": "utf8"}),
820
lambda:[self._get_log(keep_log_file=True)]))
817
unittest.TestCase.setUp(self)
818
self._bzr_test_setUp_run = True
821
819
self._cleanEnvironment()
822
820
self._silenceUI()
823
821
self._startLogFile()
824
822
self._benchcalls = []
825
823
self._benchtime = None
826
824
self._clear_hooks()
827
self._track_transports()
828
825
self._track_locks()
829
826
self._clear_debug_flags()
830
827
TestCase._active_threads = threading.activeCount()
941
928
def _lock_broken(self, result):
942
929
self._lock_actions.append(('broken', result))
944
def permit_dir(self, name):
945
"""Permit a directory to be used by this test. See permit_url."""
946
name_transport = get_transport(name)
947
self.permit_url(name)
948
self.permit_url(name_transport.base)
950
def permit_url(self, url):
951
"""Declare that url is an ok url to use in this test.
953
Do this for memory transports, temporary test directory etc.
955
Do not do this for the current working directory, /tmp, or any other
956
preexisting non isolated url.
958
if not url.endswith('/'):
960
self._bzr_selftest_roots.append(url)
962
def permit_source_tree_branch_repo(self):
963
"""Permit the source tree bzr is running from to be opened.
965
Some code such as bzrlib.version attempts to read from the bzr branch
966
that bzr is executing from (if any). This method permits that directory
967
to be used in the test suite.
969
path = self.get_source_path()
970
self.record_directory_isolation()
973
workingtree.WorkingTree.open(path)
974
except (errors.NotBranchError, errors.NoWorkingTree):
977
self.enable_directory_isolation()
979
def _preopen_isolate_transport(self, transport):
980
"""Check that all transport openings are done in the test work area."""
981
while isinstance(transport, pathfilter.PathFilteringTransport):
982
# Unwrap pathfiltered transports
983
transport = transport.server.backing_transport.clone(
984
transport._filter('.'))
986
# ReadonlySmartTCPServer_for_testing decorates the backing transport
987
# urls it is given by prepending readonly+. This is appropriate as the
988
# client shouldn't know that the server is readonly (or not readonly).
989
# We could register all servers twice, with readonly+ prepending, but
990
# that makes for a long list; this is about the same but easier to
992
if url.startswith('readonly+'):
993
url = url[len('readonly+'):]
994
self._preopen_isolate_url(url)
996
def _preopen_isolate_url(self, url):
997
if not self._directory_isolation:
999
if self._directory_isolation == 'record':
1000
self._bzr_selftest_roots.append(url)
1002
# This prevents all transports, including e.g. sftp ones backed on disk
1003
# from working unless they are explicitly granted permission. We then
1004
# depend on the code that sets up test transports to check that they are
1005
# appropriately isolated and enable their use by calling
1006
# self.permit_transport()
1007
if not osutils.is_inside_any(self._bzr_selftest_roots, url):
1008
raise errors.BzrError("Attempt to escape test isolation: %r %r"
1009
% (url, self._bzr_selftest_roots))
1011
def record_directory_isolation(self):
1012
"""Gather accessed directories to permit later access.
1014
This is used for tests that access the branch bzr is running from.
1016
self._directory_isolation = "record"
1018
def start_server(self, transport_server, backing_server=None):
1019
"""Start transport_server for this test.
1021
This starts the server, registers a cleanup for it and permits the
1022
server's urls to be used.
1024
if backing_server is None:
1025
transport_server.start_server()
1027
transport_server.start_server(backing_server)
1028
self.addCleanup(transport_server.stop_server)
1029
# Obtain a real transport because if the server supplies a password, it
1030
# will be hidden from the base on the client side.
1031
t = get_transport(transport_server.get_url())
1032
# Some transport servers effectively chroot the backing transport;
1033
# others like SFTPServer don't - users of the transport can walk up the
1034
# transport to read the entire backing transport. This wouldn't matter
1035
# except that the workdir tests are given - and that they expect the
1036
# server's url to point at - is one directory under the safety net. So
1037
# Branch operations into the transport will attempt to walk up one
1038
# directory. Chrooting all servers would avoid this but also mean that
1039
# we wouldn't be testing directly against non-root urls. Alternatively
1040
# getting the test framework to start the server with a backing server
1041
# at the actual safety net directory would work too, but this then
1042
# means that the self.get_url/self.get_transport methods would need
1043
# to transform all their results. On balance its cleaner to handle it
1044
# here, and permit a higher url when we have one of these transports.
1045
if t.base.endswith('/work/'):
1046
# we have safety net/test root/work
1047
t = t.clone('../..')
1048
elif isinstance(transport_server,
1049
test_server.SmartTCPServer_for_testing):
1050
# The smart server adds a path similar to work, which is traversed
1051
# up from by the client. But the server is chrooted - the actual
1052
# backing transport is not escaped from, and VFS requests to the
1053
# root will error (because they try to escape the chroot).
1055
while t2.base != t.base:
1058
self.permit_url(t.base)
1060
def _track_transports(self):
1061
"""Install checks for transport usage."""
1062
# TestCase has no safe place it can write to.
1063
self._bzr_selftest_roots = []
1064
# Currently the easiest way to be sure that nothing is going on is to
1065
# hook into bzr dir opening. This leaves a small window of error for
1066
# transport tests, but they are well known, and we can improve on this
1068
bzrdir.BzrDir.hooks.install_named_hook("pre_open",
1069
self._preopen_isolate_transport, "Check bzr directories are safe.")
1071
931
def _ndiff_strings(self, a, b):
1072
932
"""Return ndiff between two strings containing lines.
1129
989
:raises AssertionError: If the expected and actual stat values differ
1130
990
other than by atime.
1132
self.assertEqual(expected.st_size, actual.st_size,
1133
'st_size did not match')
1134
self.assertEqual(expected.st_mtime, actual.st_mtime,
1135
'st_mtime did not match')
1136
self.assertEqual(expected.st_ctime, actual.st_ctime,
1137
'st_ctime did not match')
1138
if sys.platform != 'win32':
1139
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1140
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1141
# odd. Regardless we shouldn't actually try to assert anything
1142
# about their values
1143
self.assertEqual(expected.st_dev, actual.st_dev,
1144
'st_dev did not match')
1145
self.assertEqual(expected.st_ino, actual.st_ino,
1146
'st_ino did not match')
1147
self.assertEqual(expected.st_mode, actual.st_mode,
1148
'st_mode did not match')
992
self.assertEqual(expected.st_size, actual.st_size)
993
self.assertEqual(expected.st_mtime, actual.st_mtime)
994
self.assertEqual(expected.st_ctime, actual.st_ctime)
995
self.assertEqual(expected.st_dev, actual.st_dev)
996
self.assertEqual(expected.st_ino, actual.st_ino)
997
self.assertEqual(expected.st_mode, actual.st_mode)
1150
999
def assertLength(self, length, obj_with_len):
1151
1000
"""Assert that obj_with_len is of length length."""
1153
1002
self.fail("Incorrect length: wanted %d, got %d for %r" % (
1154
1003
length, len(obj_with_len), obj_with_len))
1156
def assertLogsError(self, exception_class, func, *args, **kwargs):
1157
"""Assert that func(*args, **kwargs) quietly logs a specific exception.
1159
from bzrlib import trace
1161
orig_log_exception_quietly = trace.log_exception_quietly
1164
orig_log_exception_quietly()
1165
captured.append(sys.exc_info())
1166
trace.log_exception_quietly = capture
1167
func(*args, **kwargs)
1169
trace.log_exception_quietly = orig_log_exception_quietly
1170
self.assertLength(1, captured)
1171
err = captured[0][1]
1172
self.assertIsInstance(err, exception_class)
1175
1005
def assertPositive(self, val):
1176
1006
"""Assert that val is greater than 0."""
1177
1007
self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
1303
1129
m += ": " + msg
1132
def expectFailure(self, reason, assertion, *args, **kwargs):
1133
"""Invoke a test, expecting it to fail for the given reason.
1135
This is for assertions that ought to succeed, but currently fail.
1136
(The failure is *expected* but not *wanted*.) Please be very precise
1137
about the failure you're expecting. If a new bug is introduced,
1138
AssertionError should be raised, not KnownFailure.
1140
Frequently, expectFailure should be followed by an opposite assertion.
1143
Intended to be used with a callable that raises AssertionError as the
1144
'assertion' parameter. args and kwargs are passed to the 'assertion'.
1146
Raises KnownFailure if the test fails. Raises AssertionError if the
1151
self.expectFailure('Math is broken', self.assertNotEqual, 54,
1153
self.assertEqual(42, dynamic_val)
1155
This means that a dynamic_val of 54 will cause the test to raise
1156
a KnownFailure. Once math is fixed and the expectFailure is removed,
1157
only a dynamic_val of 42 will allow the test to pass. Anything other
1158
than 54 or 42 will cause an AssertionError.
1161
assertion(*args, **kwargs)
1162
except AssertionError:
1163
raise KnownFailure(reason)
1165
self.fail('Unexpected success. Should have failed: %s' % reason)
1306
1167
def assertFileEqual(self, content, path):
1307
1168
"""Fail if path does not contain 'content'."""
1308
1169
self.failUnlessExists(path)
1495
1354
self._cleanups.append((callable, args, kwargs))
1497
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1498
"""Overrides an object attribute restoring it after the test.
1500
:param obj: The object that will be mutated.
1502
:param attr_name: The attribute name we want to preserve/override in
1505
:param new: The optional value we want to set the attribute to.
1507
:returns: The actual attr value.
1509
value = getattr(obj, attr_name)
1510
# The actual value is captured by the call below
1511
self.addCleanup(setattr, obj, attr_name, value)
1512
if new is not _unitialized_attr:
1513
setattr(obj, attr_name, new)
1516
1356
def _cleanEnvironment(self):
1518
1358
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1556
1392
'ftp_proxy': None,
1557
1393
'FTP_PROXY': None,
1558
1394
'BZR_REMOTE_PATH': None,
1559
# Generally speaking, we don't want apport reporting on crashes in
1560
# the test envirnoment unless we're specifically testing apport,
1561
# so that it doesn't leak into the real system environment. We
1562
# use an env var so it propagates to subprocesses.
1563
'APPORT_DISABLE': '1',
1566
1397
self.addCleanup(self._restoreEnvironment)
1567
1398
for name, value in new_env.iteritems():
1568
1399
self._captureVar(name, value)
1570
1401
def _captureVar(self, name, newvalue):
1571
1402
"""Set an environment variable, and reset it when finished."""
1572
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1403
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1405
def _restore_debug_flags(self):
1406
debug.debug_flags.clear()
1407
debug.debug_flags.update(self._preserved_debug_flags)
1574
1409
def _restoreEnvironment(self):
1575
for name, value in self._old_env.iteritems():
1410
for name, value in self.__old_env.iteritems():
1576
1411
osutils.set_or_unset_env(name, value)
1578
1413
def _restoreHooks(self):
1586
1421
def _do_skip(self, result, reason):
1587
1422
addSkip = getattr(result, 'addSkip', None)
1588
1423
if not callable(addSkip):
1589
result.addSuccess(result)
1424
result.addError(self, sys.exc_info())
1591
1426
addSkip(self, reason)
1594
def _do_known_failure(self, result, e):
1595
err = sys.exc_info()
1596
addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1597
if addExpectedFailure is not None:
1598
addExpectedFailure(self, err)
1600
result.addSuccess(self)
1603
def _do_not_applicable(self, result, e):
1605
reason = 'No reason given'
1608
addNotApplicable = getattr(result, 'addNotApplicable', None)
1609
if addNotApplicable is not None:
1610
result.addNotApplicable(self, reason)
1612
self._do_skip(result, reason)
1615
def _do_unsupported_or_skip(self, result, e):
1617
addNotSupported = getattr(result, 'addNotSupported', None)
1618
if addNotSupported is not None:
1619
result.addNotSupported(self, reason)
1621
self._do_skip(result, reason)
1428
def run(self, result=None):
1429
if result is None: result = self.defaultTestResult()
1430
for feature in getattr(self, '_test_needs_features', []):
1431
if not feature.available():
1432
result.startTest(self)
1433
if getattr(result, 'addNotSupported', None):
1434
result.addNotSupported(self, feature)
1436
result.addSuccess(self)
1437
result.stopTest(self)
1441
result.startTest(self)
1442
absent_attr = object()
1444
method_name = getattr(self, '_testMethodName', absent_attr)
1445
if method_name is absent_attr:
1447
method_name = getattr(self, '_TestCase__testMethodName')
1448
testMethod = getattr(self, method_name)
1452
if not self._bzr_test_setUp_run:
1454
"test setUp did not invoke "
1455
"bzrlib.tests.TestCase's setUp")
1456
except KeyboardInterrupt:
1459
except TestSkipped, e:
1460
self._do_skip(result, e.args[0])
1464
result.addError(self, sys.exc_info())
1472
except self.failureException:
1473
result.addFailure(self, sys.exc_info())
1474
except TestSkipped, e:
1476
reason = "No reason given."
1479
self._do_skip(result, reason)
1480
except KeyboardInterrupt:
1484
result.addError(self, sys.exc_info())
1488
if not self._bzr_test_tearDown_run:
1490
"test tearDown did not invoke "
1491
"bzrlib.tests.TestCase's tearDown")
1492
except KeyboardInterrupt:
1496
result.addError(self, sys.exc_info())
1499
if ok: result.addSuccess(self)
1501
result.stopTest(self)
1503
except TestNotApplicable:
1504
# Not moved from the result [yet].
1507
except KeyboardInterrupt:
1512
for attr_name in self.attrs_to_keep:
1513
if attr_name in self.__dict__:
1514
saved_attrs[attr_name] = self.__dict__[attr_name]
1515
self.__dict__ = saved_attrs
1519
self._log_contents = ''
1520
self._bzr_test_tearDown_run = True
1521
unittest.TestCase.tearDown(self)
1623
1523
def time(self, callable, *args, **kwargs):
1624
1524
"""Run callable and accrue the time it takes to the benchmark time.
1659
1568
self._log_contents.
1660
1569
:return: A string containing the log.
1662
if self._log_contents is not None:
1664
self._log_contents.decode('utf8')
1665
except UnicodeDecodeError:
1666
unicodestr = self._log_contents.decode('utf8', 'replace')
1667
self._log_contents = unicodestr.encode('utf8')
1571
# flush the log file, to get all content
1573
if bzrlib.trace._trace_file:
1574
bzrlib.trace._trace_file.flush()
1575
if self._log_contents:
1576
# XXX: this can hardly contain the content flushed above --vila
1668
1578
return self._log_contents
1670
if bzrlib.trace._trace_file:
1671
# flush the log file, to get all content
1672
bzrlib.trace._trace_file.flush()
1673
1579
if self._log_file_name is not None:
1674
1580
logfile = open(self._log_file_name)
1676
1582
log_contents = logfile.read()
1678
1584
logfile.close()
1680
log_contents.decode('utf8')
1681
except UnicodeDecodeError:
1682
unicodestr = log_contents.decode('utf8', 'replace')
1683
log_contents = unicodestr.encode('utf8')
1684
1585
if not keep_log_file:
1686
max_close_attempts = 100
1687
first_close_error = None
1688
while close_attempts < max_close_attempts:
1691
self._log_file.close()
1692
except IOError, ioe:
1693
if ioe.errno is None:
1694
# No errno implies 'close() called during
1695
# concurrent operation on the same file object', so
1696
# retry. Probably a thread is trying to write to
1698
if first_close_error is None:
1699
first_close_error = ioe
1704
if close_attempts > 1:
1706
'Unable to close log file on first attempt, '
1707
'will retry: %s\n' % (first_close_error,))
1708
if close_attempts == max_close_attempts:
1710
'Unable to close log file after %d attempts.\n'
1711
% (max_close_attempts,))
1712
self._log_file = None
1713
# Permit multiple calls to get_log until we clean it up in
1715
1586
self._log_contents = log_contents
1717
1588
os.remove(self._log_file_name)
2260
2112
then the self.get_vfs_server is returned.
2262
2114
if self.__server is None:
2263
if (self.transport_server is None or self.transport_server is
2264
self.vfs_transport_factory):
2265
self.__server = self.get_vfs_only_server()
2115
if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
2116
return self.get_vfs_only_server()
2267
2118
# bring up a decorated means of access to the vfs only server.
2268
2119
self.__server = self.transport_server()
2269
self.start_server(self.__server, self.get_vfs_only_server())
2121
self.__server.setUp(self.get_vfs_only_server())
2122
except TypeError, e:
2123
# This should never happen; the try:Except here is to assist
2124
# developers having to update code rather than seeing an
2125
# uninformative TypeError.
2126
raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
2127
self.addCleanup(self.__server.tearDown)
2270
2128
return self.__server
2272
2130
def _adjust_url(self, base, relpath):
3352
3177
def addFailure(self, test, err):
3353
3178
self.result.addFailure(test, err)
3354
ForwardingResult = testtools.ExtendedToOriginalDecorator
3181
class BZRTransformingResult(ForwardingResult):
3183
def addError(self, test, err):
3184
feature = self._error_looks_like('UnavailableFeature: ', err)
3185
if feature is not None:
3186
self.result.addNotSupported(test, feature)
3188
self.result.addError(test, err)
3190
def addFailure(self, test, err):
3191
known = self._error_looks_like('KnownFailure: ', err)
3192
if known is not None:
3193
self.result._addKnownFailure(test, [KnownFailure,
3194
KnownFailure(known), None])
3196
self.result.addFailure(test, err)
3198
def _error_looks_like(self, prefix, err):
3199
"""Deserialize exception and returns the stringify value."""
3203
if isinstance(exc, subunit.RemoteException):
3204
# stringify the exception gives access to the remote traceback
3205
# We search the last line for 'prefix'
3206
lines = str(exc).split('\n')
3207
while lines and not lines[-1]:
3210
if lines[-1].startswith(prefix):
3211
value = lines[-1][len(prefix):]
3357
3215
class ProfileResult(ForwardingResult):
3634
3492
'bzrlib.tests.commands',
3635
3493
'bzrlib.tests.per_branch',
3636
3494
'bzrlib.tests.per_bzrdir',
3637
'bzrlib.tests.per_bzrdir_colo',
3638
'bzrlib.tests.per_foreign_vcs',
3639
3495
'bzrlib.tests.per_interrepository',
3640
3496
'bzrlib.tests.per_intertree',
3641
3497
'bzrlib.tests.per_inventory',
3642
3498
'bzrlib.tests.per_interbranch',
3643
3499
'bzrlib.tests.per_lock',
3644
'bzrlib.tests.per_merger',
3645
3500
'bzrlib.tests.per_transport',
3646
3501
'bzrlib.tests.per_tree',
3647
3502
'bzrlib.tests.per_pack_repository',
3648
3503
'bzrlib.tests.per_repository',
3649
3504
'bzrlib.tests.per_repository_chk',
3650
3505
'bzrlib.tests.per_repository_reference',
3651
'bzrlib.tests.per_uifactory',
3652
3506
'bzrlib.tests.per_versionedfile',
3653
3507
'bzrlib.tests.per_workingtree',
3654
3508
'bzrlib.tests.test__annotator',
3655
'bzrlib.tests.test__bencode',
3656
3509
'bzrlib.tests.test__chk_map',
3657
3510
'bzrlib.tests.test__dirstate_helpers',
3658
3511
'bzrlib.tests.test__groupcompress',
3659
3512
'bzrlib.tests.test__known_graph',
3660
3513
'bzrlib.tests.test__rio',
3661
'bzrlib.tests.test__simple_set',
3662
'bzrlib.tests.test__static_tuple',
3663
3514
'bzrlib.tests.test__walkdirs_win32',
3664
3515
'bzrlib.tests.test_ancestry',
3665
3516
'bzrlib.tests.test_annotate',
3666
3517
'bzrlib.tests.test_api',
3667
3518
'bzrlib.tests.test_atomicfile',
3668
3519
'bzrlib.tests.test_bad_files',
3520
'bzrlib.tests.test_bencode',
3669
3521
'bzrlib.tests.test_bisect_multi',
3670
3522
'bzrlib.tests.test_branch',
3671
3523
'bzrlib.tests.test_branchbuilder',
4039
3883
return new_test
4042
def permute_tests_for_extension(standard_tests, loader, py_module_name,
4044
"""Helper for permutating tests against an extension module.
4046
This is meant to be used inside a modules 'load_tests()' function. It will
4047
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
4048
against both implementations. Setting 'test.module' to the appropriate
4049
module. See bzrlib.tests.test__chk_map.load_tests as an example.
4051
:param standard_tests: A test suite to permute
4052
:param loader: A TestLoader
4053
:param py_module_name: The python path to a python module that can always
4054
be loaded, and will be considered the 'python' implementation. (eg
4055
'bzrlib._chk_map_py')
4056
:param ext_module_name: The python path to an extension module. If the
4057
module cannot be loaded, a single test will be added, which notes that
4058
the module is not available. If it can be loaded, all standard_tests
4059
will be run against that module.
4060
:return: (suite, feature) suite is a test-suite that has all the permuted
4061
tests. feature is the Feature object that can be used to determine if
4062
the module is available.
4065
py_module = __import__(py_module_name, {}, {}, ['NO_SUCH_ATTRIB'])
4067
('python', {'module': py_module}),
4069
suite = loader.suiteClass()
4070
feature = ModuleAvailableFeature(ext_module_name)
4071
if feature.available():
4072
scenarios.append(('C', {'module': feature.module}))
4074
# the compiled module isn't available, so we add a failing test
4075
class FailWithoutFeature(TestCase):
4076
def test_fail(self):
4077
self.requireFeature(feature)
4078
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4079
result = multiply_tests(standard_tests, scenarios, suite)
4080
return result, feature
4083
def _rmtree_temp_dir(dirname, test_id=None):
3886
def _rmtree_temp_dir(dirname):
4084
3887
# If LANG=C we probably have created some bogus paths
4085
3888
# which rmtree(unicode) will fail to delete
4086
3889
# so make sure we are using rmtree(str) to delete everything
4190
3990
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4193
class _CompatabilityThunkFeature(Feature):
4194
"""This feature is just a thunk to another feature.
4196
It issues a deprecation warning if it is accessed, to let you know that you
4197
should really use a different feature.
4200
def __init__(self, dep_version, module, name,
4201
replacement_name, replacement_module=None):
4202
super(_CompatabilityThunkFeature, self).__init__()
4203
self._module = module
4204
if replacement_module is None:
4205
replacement_module = module
4206
self._replacement_module = replacement_module
4208
self._replacement_name = replacement_name
4209
self._dep_version = dep_version
4210
self._feature = None
4213
if self._feature is None:
4214
depr_msg = self._dep_version % ('%s.%s'
4215
% (self._module, self._name))
4216
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4217
self._replacement_name)
4218
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4219
# Import the new feature and use it as a replacement for the
4221
mod = __import__(self._replacement_module, {}, {},
4222
[self._replacement_name])
4223
self._feature = getattr(mod, self._replacement_name)
4227
return self._feature._probe()
4230
class ModuleAvailableFeature(Feature):
4231
"""This is a feature than describes a module we want to be available.
4233
Declare the name of the module in __init__(), and then after probing, the
4234
module will be available as 'self.module'.
4236
:ivar module: The module if it is available, else None.
4239
def __init__(self, module_name):
4240
super(ModuleAvailableFeature, self).__init__()
4241
self.module_name = module_name
4245
self._module = __import__(self.module_name, {}, {}, [''])
4252
if self.available(): # Make sure the probe has been done
4256
def feature_name(self):
4257
return self.module_name
4260
# This is kept here for compatibility, it is recommended to use
4261
# 'bzrlib.tests.feature.paramiko' instead
4262
ParamikoFeature = _CompatabilityThunkFeature(
4263
deprecated_in((2,1,0)),
4264
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4267
3993
def probe_unicode_in_user_encoding():
4268
3994
"""Try to encode several unicode strings to use in unicode-aware tests.
4269
3995
Return first successfull match.
4428
4130
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4431
class _CaseSensitiveFilesystemFeature(Feature):
4133
class _SubUnitFeature(Feature):
4134
"""Check if subunit is available."""
4433
4136
def _probe(self):
4434
if CaseInsCasePresFilenameFeature.available():
4436
elif CaseInsensitiveFilesystemFeature.available():
4441
4143
def feature_name(self):
4442
return 'case-sensitive filesystem'
4444
# new coding style is for feature instances to be lowercase
4445
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4448
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4449
SubUnitFeature = _CompatabilityThunkFeature(
4450
deprecated_in((2,1,0)),
4451
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4146
SubUnitFeature = _SubUnitFeature()
4452
4147
# Only define SubUnitBzrRunner if subunit is available.
4454
4149
from subunit import TestProtocolClient
4455
from subunit.test_results import AutoTimingTestResultDecorator
4151
from subunit.test_results import AutoTimingTestResultDecorator
4153
AutoTimingTestResultDecorator = lambda x:x
4456
4154
class SubUnitBzrRunner(TextTestRunner):
4457
4155
def run(self, test):
4458
4156
result = AutoTimingTestResultDecorator(