292
298
Called from the TestCase run() method when the test
293
299
fails with an unexpected error.
295
self._testConcluded(test)
296
if isinstance(err[1], TestNotApplicable):
297
return self._addNotApplicable(test, err)
298
elif isinstance(err[1], UnavailableFeature):
299
return self.addNotSupported(test, err[1].args[0])
301
unittest.TestResult.addError(self, test, err)
302
self.error_count += 1
303
self.report_error(test, err)
306
self._cleanupLogFile(test)
302
unittest.TestResult.addError(self, test, err)
303
self.error_count += 1
304
self.report_error(test, err)
307
self._cleanupLogFile(test)
308
309
def addFailure(self, test, err):
309
310
"""Tell result that test failed.
311
312
Called from the TestCase run() method when the test
312
313
fails because e.g. an assert() method failed.
314
self._testConcluded(test)
315
if isinstance(err[1], KnownFailure):
316
return self._addKnownFailure(test, err)
318
unittest.TestResult.addFailure(self, test, err)
319
self.failure_count += 1
320
self.report_failure(test, err)
323
self._cleanupLogFile(test)
316
unittest.TestResult.addFailure(self, test, err)
317
self.failure_count += 1
318
self.report_failure(test, err)
321
self._cleanupLogFile(test)
325
323
def addSuccess(self, test):
326
324
"""Tell result that test completed successfully.
328
326
Called from the TestCase run()
330
self._testConcluded(test)
331
328
if self._bench_history is not None:
332
329
benchmark_time = self._extractBenchmarkTime(test)
333
330
if benchmark_time is not None:
368
358
self.skip_count += 1
369
359
self.report_skip(test, reason)
371
def _addNotApplicable(self, test, skip_excinfo):
372
if isinstance(skip_excinfo[1], TestNotApplicable):
373
self.not_applicable_count += 1
374
self.report_not_applicable(test, skip_excinfo)
377
except KeyboardInterrupt:
380
self.addError(test, test.exc_info())
382
# seems best to treat this as success from point-of-view of unittest
383
# -- it actually does nothing so it barely matters :)
384
unittest.TestResult.addSuccess(self, test)
385
test._log_contents = ''
361
def addNotApplicable(self, test, reason):
    """Count test as not applicable and report it.

    Increments ``self.not_applicable_count`` and then hands ``test`` and
    ``reason`` to ``self.report_not_applicable``.
    """
    self.not_applicable_count = self.not_applicable_count + 1
    self.report_not_applicable(test, reason)
387
365
def printErrorList(self, flavour, errors):
388
366
for test, err in errors:
504
487
return self._shortened_test_description(test)
506
489
def report_error(self, test, err):
507
self.pb.note('ERROR: %s\n %s\n',
490
ui.ui_factory.note('ERROR: %s\n %s\n' % (
508
491
self._test_description(test),
512
495
def report_failure(self, test, err):
513
self.pb.note('FAIL: %s\n %s\n',
496
ui.ui_factory.note('FAIL: %s\n %s\n' % (
514
497
self._test_description(test),
518
501
def report_known_failure(self, test, err):
    """Send an XFAIL note for test to the ui factory.

    The note contains the test description followed by ``err[1]``.
    """
    emit_note = ui.ui_factory.note
    message = 'XFAIL: %s\n%s\n' % (self._test_description(test), err[1])
    emit_note(message)
522
505
def report_skip(self, test, reason):
525
def report_not_applicable(self, test, skip_excinfo):
508
def report_not_applicable(self, test, reason):
528
511
def report_unsupported(self, test, feature):
550
533
def report_test_start(self, test):
552
535
name = self._shortened_test_description(test)
553
# width needs space for 6 char status, plus 1 for slash, plus an
554
# 11-char time string, plus a trailing blank
555
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
556
self.stream.write(self._ellipsize_to_right(name,
557
osutils.terminal_width()-18))
536
width = osutils.terminal_width()
537
if width is not None:
538
# width needs space for 6 char status, plus 1 for slash, plus an
539
# 11-char time string, plus a trailing blank
540
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
542
self.stream.write(self._ellipsize_to_right(name, width-18))
544
self.stream.write(name)
558
545
self.stream.flush()
560
547
def _error_summary(self, err):
928
932
def _lock_broken(self, result):
    """Append a ``('broken', result)`` entry to ``self._lock_actions``."""
    broken_event = ('broken', result)
    self._lock_actions.append(broken_event)
935
def permit_dir(self, name):
    """Permit a directory to be used by this test. See permit_url.

    Two urls are permitted: ``name`` exactly as given, and the ``base`` of
    the transport that get_transport() returns for ``name``.
    """
    # Open the transport first so a failure here permits nothing.
    opened = get_transport(name)
    self.permit_url(name)
    self.permit_url(opened.base)
941
def permit_url(self, url):
942
"""Declare that url is an ok url to use in this test.
944
Do this for memory transports, temporary test directory etc.
946
Do not do this for the current working directory, /tmp, or any other
947
preexisting non isolated url.
949
if not url.endswith('/'):
951
self._bzr_selftest_roots.append(url)
953
def permit_source_tree_branch_repo(self):
954
"""Permit the source tree bzr is running from to be opened.
956
Some code such as bzrlib.version attempts to read from the bzr branch
957
that bzr is executing from (if any). This method permits that directory
958
to be used in the test suite.
960
path = self.get_source_path()
961
self.record_directory_isolation()
964
workingtree.WorkingTree.open(path)
965
except (errors.NotBranchError, errors.NoWorkingTree):
968
self.enable_directory_isolation()
970
def _preopen_isolate_transport(self, transport):
971
"""Check that all transport openings are done in the test work area."""
972
while isinstance(transport, pathfilter.PathFilteringTransport):
973
# Unwrap pathfiltered transports
974
transport = transport.server.backing_transport.clone(
975
transport._filter('.'))
977
# ReadonlySmartTCPServer_for_testing decorates the backing transport
978
# urls it is given by prepending readonly+. This is appropriate as the
979
# client shouldn't know that the server is readonly (or not readonly).
980
# We could register all servers twice, with readonly+ prepending, but
981
# that makes for a long list; this is about the same but easier to
983
if url.startswith('readonly+'):
984
url = url[len('readonly+'):]
985
self._preopen_isolate_url(url)
987
def _preopen_isolate_url(self, url):
988
if not self._directory_isolation:
990
if self._directory_isolation == 'record':
991
self._bzr_selftest_roots.append(url)
993
# This prevents all transports, including e.g. sftp ones backed on disk
994
# from working unless they are explicitly granted permission. We then
995
# depend on the code that sets up test transports to check that they are
996
# appropriately isolated and enable their use by calling
997
# self.permit_transport()
998
if not osutils.is_inside_any(self._bzr_selftest_roots, url):
999
raise errors.BzrError("Attempt to escape test isolation: %r %r"
1000
% (url, self._bzr_selftest_roots))
1002
def record_directory_isolation(self):
1003
"""Gather accessed directories to permit later access.
1005
This is used for tests that access the branch bzr is running from.
1007
self._directory_isolation = "record"
931
1009
def start_server(self, transport_server, backing_server=None):
932
1010
"""Start transport_server for this test.
940
1018
transport_server.setUp(backing_server)
941
1019
self.addCleanup(transport_server.tearDown)
1020
# Obtain a real transport because if the server supplies a password, it
1021
# will be hidden from the base on the client side.
1022
t = get_transport(transport_server.get_url())
1023
# Some transport servers effectively chroot the backing transport;
1024
# others like SFTPServer don't - users of the transport can walk up the
1025
# transport to read the entire backing transport. This wouldn't matter
1026
# except that the workdir tests are given - and that they expect the
1027
# server's url to point at - is one directory under the safety net. So
1028
# Branch operations into the transport will attempt to walk up one
1029
# directory. Chrooting all servers would avoid this but also mean that
1030
# we wouldn't be testing directly against non-root urls. Alternatively
1031
# getting the test framework to start the server with a backing server
1032
# at the actual safety net directory would work too, but this then
1033
# means that the self.get_url/self.get_transport methods would need
1034
# to transform all their results. On balance it's cleaner to handle it
1035
# here, and permit a higher url when we have one of these transports.
1036
if t.base.endswith('/work/'):
1037
# we have safety net/test root/work
1038
t = t.clone('../..')
1039
elif isinstance(transport_server, server.SmartTCPServer_for_testing):
1040
# The smart server adds a path similar to work, which is traversed
1041
# up from by the client. But the server is chrooted - the actual
1042
# backing transport is not escaped from, and VFS requests to the
1043
# root will error (because they try to escape the chroot).
1045
while t2.base != t.base:
1048
self.permit_url(t.base)
1050
def _track_transports(self):
1051
"""Install checks for transport usage."""
1052
# TestCase has no safe place it can write to.
1053
self._bzr_selftest_roots = []
1054
# Currently the easiest way to be sure that nothing is going on is to
1055
# hook into bzr dir opening. This leaves a small window of error for
1056
# transport tests, but they are well known, and we can improve on this
1058
bzrdir.BzrDir.hooks.install_named_hook("pre_open",
1059
self._preopen_isolate_transport, "Check bzr directories are safe.")
943
1061
def _ndiff_strings(self, a, b):
944
1062
"""Return ndiff between two strings containing lines.
1001
1119
:raises AssertionError: If the expected and actual stat values differ
1002
1120
other than by atime.
1004
self.assertEqual(expected.st_size, actual.st_size)
1005
self.assertEqual(expected.st_mtime, actual.st_mtime)
1006
self.assertEqual(expected.st_ctime, actual.st_ctime)
1007
self.assertEqual(expected.st_dev, actual.st_dev)
1008
self.assertEqual(expected.st_ino, actual.st_ino)
1009
self.assertEqual(expected.st_mode, actual.st_mode)
1122
self.assertEqual(expected.st_size, actual.st_size,
1123
'st_size did not match')
1124
self.assertEqual(expected.st_mtime, actual.st_mtime,
1125
'st_mtime did not match')
1126
self.assertEqual(expected.st_ctime, actual.st_ctime,
1127
'st_ctime did not match')
1128
if sys.platform != 'win32':
1129
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1130
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1131
# odd. Regardless we shouldn't actually try to assert anything
1132
# about their values
1133
self.assertEqual(expected.st_dev, actual.st_dev,
1134
'st_dev did not match')
1135
self.assertEqual(expected.st_ino, actual.st_ino,
1136
'st_ino did not match')
1137
self.assertEqual(expected.st_mode, actual.st_mode,
1138
'st_mode did not match')
1011
1140
def assertLength(self, length, obj_with_len):
1012
1141
"""Assert that obj_with_len is of length length."""
1014
1143
self.fail("Incorrect length: wanted %d, got %d for %r" % (
1015
1144
length, len(obj_with_len), obj_with_len))
1146
def assertLogsError(self, exception_class, func, *args, **kwargs):
1147
"""Assert that func(*args, **kwargs) quietly logs a specific exception.
1149
from bzrlib import trace
1151
orig_log_exception_quietly = trace.log_exception_quietly
1154
orig_log_exception_quietly()
1155
captured.append(sys.exc_info())
1156
trace.log_exception_quietly = capture
1157
func(*args, **kwargs)
1159
trace.log_exception_quietly = orig_log_exception_quietly
1160
self.assertLength(1, captured)
1161
err = captured[0][1]
1162
self.assertIsInstance(err, exception_class)
1017
1165
def assertPositive(self, val):
    """Assert that val is greater than 0.

    On failure the message reports the offending value.
    """
    is_positive = val > 0
    failure_text = 'expected a positive value, but got %s' % val
    self.assertTrue(is_positive, failure_text)
1433
1583
def _do_skip(self, result, reason):
    """Report this test as skipped on result.

    :param result: The test result object being reported to.
    :param reason: Why the test was skipped; passed to ``result.addSkip``.

    Results that do not provide a callable ``addSkip`` cannot record a
    skip, so the test is reported through ``addSuccess`` instead.
    """
    addSkip = getattr(result, 'addSkip', None)
    if not callable(addSkip):
        # addSuccess takes the test case; passing the result object here
        # would record the wrong object as the succeeding test.
        result.addSuccess(self)
    else:
        addSkip(self, reason)
1590
def _do_known_failure(self, result):
    """Report the exception currently being handled as a known failure.

    When result has an ``addExpectedFailure`` attribute that is not None,
    it is called with this test and ``sys.exc_info()``. Otherwise the test
    is reported through ``result.addSuccess``.
    """
    exc_details = sys.exc_info()
    report_expected = getattr(result, 'addExpectedFailure', None)
    if report_expected is None:
        result.addSuccess(self)
        return
    report_expected(self, exc_details)
1598
def _do_not_applicable(self, result, e):
1600
reason = 'No reason given'
1603
addNotApplicable = getattr(result, 'addNotApplicable', None)
1604
if addNotApplicable is not None:
1605
result.addNotApplicable(self, reason)
1607
self._do_skip(result, reason)
1609
def _do_unsupported_or_skip(self, result, reason):
    """Report an unsupported feature, or a skip if result cannot.

    When result has an ``addNotSupported`` attribute that is not None, it
    is called with this test and ``reason``. Otherwise the report is
    delegated to ``self._do_skip``.
    """
    if getattr(result, 'addNotSupported', None) is None:
        self._do_skip(result, reason)
    else:
        result.addNotSupported(self, reason)
1440
1616
def run(self, result=None):
1441
1617
if result is None: result = self.defaultTestResult()
1618
result.startTest(self)
1623
result.stopTest(self)
1625
def _run(self, result):
1442
1626
for feature in getattr(self, '_test_needs_features', []):
1443
1627
if not feature.available():
1444
result.startTest(self)
1445
if getattr(result, 'addNotSupported', None):
1446
result.addNotSupported(self, feature)
1448
result.addSuccess(self)
1449
result.stopTest(self)
1628
return self._do_unsupported_or_skip(result, feature)
1630
absent_attr = object()
1632
method_name = getattr(self, '_testMethodName', absent_attr)
1633
if method_name is absent_attr:
1635
method_name = getattr(self, '_TestCase__testMethodName')
1636
testMethod = getattr(self, method_name)
1453
result.startTest(self)
1454
absent_attr = object()
1456
method_name = getattr(self, '_testMethodName', absent_attr)
1457
if method_name is absent_attr:
1459
method_name = getattr(self, '_TestCase__testMethodName')
1460
testMethod = getattr(self, method_name)
1464
if not self._bzr_test_setUp_run:
1466
"test setUp did not invoke "
1467
"bzrlib.tests.TestCase's setUp")
1468
except KeyboardInterrupt:
1471
except TestSkipped, e:
1472
self._do_skip(result, e.args[0])
1476
result.addError(self, sys.exc_info())
1640
if not self._bzr_test_setUp_run:
1642
"test setUp did not invoke "
1643
"bzrlib.tests.TestCase's setUp")
1644
except KeyboardInterrupt:
1647
except KnownFailure:
1648
self._do_known_failure(result)
1651
except TestNotApplicable, e:
1652
self._do_not_applicable(result, e)
1655
except TestSkipped, e:
1656
self._do_skip(result, e.args[0])
1659
except UnavailableFeature, e:
1660
self._do_unsupported_or_skip(result, e.args[0])
1664
result.addError(self, sys.exc_info())
1672
except KnownFailure:
1673
self._do_known_failure(result)
1674
except self.failureException:
1675
result.addFailure(self, sys.exc_info())
1676
except TestNotApplicable, e:
1677
self._do_not_applicable(result, e)
1678
except TestSkipped, e:
1680
reason = "No reason given."
1683
self._do_skip(result, reason)
1684
except UnavailableFeature, e:
1685
self._do_unsupported_or_skip(result, e.args[0])
1686
except KeyboardInterrupt:
1690
result.addError(self, sys.exc_info())
1694
if not self._bzr_test_tearDown_run:
1696
"test tearDown did not invoke "
1697
"bzrlib.tests.TestCase's tearDown")
1698
except KeyboardInterrupt:
1702
result.addError(self, sys.exc_info())
1484
except self.failureException:
1485
result.addFailure(self, sys.exc_info())
1486
except TestSkipped, e:
1488
reason = "No reason given."
1491
self._do_skip(result, reason)
1492
except KeyboardInterrupt:
1496
result.addError(self, sys.exc_info())
1500
if not self._bzr_test_tearDown_run:
1502
"test tearDown did not invoke "
1503
"bzrlib.tests.TestCase's tearDown")
1504
except KeyboardInterrupt:
1508
result.addError(self, sys.exc_info())
1511
if ok: result.addSuccess(self)
1513
result.stopTest(self)
1705
if ok: result.addSuccess(self)
1515
except TestNotApplicable:
1516
# Not moved from the result [yet].
1519
1707
except KeyboardInterrupt:
1520
1708
self._runCleanups()
3494
3740
'bzrlib.tests.commands',
3495
3741
'bzrlib.tests.per_branch',
3496
3742
'bzrlib.tests.per_bzrdir',
3743
'bzrlib.tests.per_foreign_vcs',
3497
3744
'bzrlib.tests.per_interrepository',
3498
3745
'bzrlib.tests.per_intertree',
3499
3746
'bzrlib.tests.per_inventory',
3500
3747
'bzrlib.tests.per_interbranch',
3501
3748
'bzrlib.tests.per_lock',
3749
'bzrlib.tests.per_merger',
3502
3750
'bzrlib.tests.per_transport',
3503
3751
'bzrlib.tests.per_tree',
3504
3752
'bzrlib.tests.per_pack_repository',
3505
3753
'bzrlib.tests.per_repository',
3506
3754
'bzrlib.tests.per_repository_chk',
3507
3755
'bzrlib.tests.per_repository_reference',
3756
'bzrlib.tests.per_uifactory',
3508
3757
'bzrlib.tests.per_versionedfile',
3509
3758
'bzrlib.tests.per_workingtree',
3510
3759
'bzrlib.tests.test__annotator',
3992
4248
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4251
class ModuleAvailableFeature(Feature):
4252
"""This is a feature that describes a module we want to be available.
4254
Declare the name of the module in __init__(), and then after probing, the
4255
module will be available as 'self.module'.
4257
:ivar module: The module if it is available, else None.
4260
def __init__(self, module_name):
4261
super(ModuleAvailableFeature, self).__init__()
4262
self.module_name = module_name
4266
self._module = __import__(self.module_name, {}, {}, [''])
4273
if self.available(): # Make sure the probe has been done
4277
def feature_name(self):
4278
return self.module_name
3995
4282
def probe_unicode_in_user_encoding():
3996
4283
"""Try to encode several unicode strings to use in unicode-aware tests.
3997
4284
Return first successful match.