242
221
'%s is leaking threads among %d leaking tests.\n' % (
243
222
TestCase._first_thread_leaker_id,
244
223
TestCase._leaking_threads_tests))
245
# We don't report the main thread as an active one.
247
'%d non-main threads were left active in the end.\n'
248
% (TestCase._active_threads - 1))
250
def getDescription(self, test):
253
def _extractBenchmarkTime(self, testCase, details=None):
225
def _extractBenchmarkTime(self, testCase):
254
226
"""Add a benchmark time for the current test case."""
255
if details and 'benchtime' in details:
256
return float(''.join(details['benchtime'].iter_bytes()))
257
227
return getattr(testCase, "_benchtime", None)
259
229
def _elapsedTestTimeString(self):
322
292
Called from the TestCase run() method when the test
323
293
fails with an unexpected error.
326
unittest.TestResult.addError(self, test, err)
327
self.error_count += 1
328
self.report_error(test, err)
331
self._cleanupLogFile(test)
295
self._testConcluded(test)
296
if isinstance(err[1], TestNotApplicable):
297
return self._addNotApplicable(test, err)
298
elif isinstance(err[1], UnavailableFeature):
299
return self.addNotSupported(test, err[1].args[0])
301
unittest.TestResult.addError(self, test, err)
302
self.error_count += 1
303
self.report_error(test, err)
306
self._cleanupLogFile(test)
333
308
def addFailure(self, test, err):
334
309
"""Tell result that test failed.
336
311
Called from the TestCase run() method when the test
337
312
fails because e.g. an assert() method failed.
340
unittest.TestResult.addFailure(self, test, err)
341
self.failure_count += 1
342
self.report_failure(test, err)
345
self._cleanupLogFile(test)
314
self._testConcluded(test)
315
if isinstance(err[1], KnownFailure):
316
return self._addKnownFailure(test, err)
318
unittest.TestResult.addFailure(self, test, err)
319
self.failure_count += 1
320
self.report_failure(test, err)
323
self._cleanupLogFile(test)
347
def addSuccess(self, test, details=None):
325
def addSuccess(self, test):
348
326
"""Tell result that test completed successfully.
350
328
Called from the TestCase run()
330
self._testConcluded(test)
352
331
if self._bench_history is not None:
353
benchmark_time = self._extractBenchmarkTime(test, details)
332
benchmark_time = self._extractBenchmarkTime(test)
354
333
if benchmark_time is not None:
355
334
self._bench_history.write("%s %s\n" % (
356
335
self._formatTime(benchmark_time),
360
339
unittest.TestResult.addSuccess(self, test)
361
340
test._log_contents = ''
363
def addExpectedFailure(self, test, err):
342
def _testConcluded(self, test):
343
"""Common code when a test has finished.
345
Called regardless of whether it succeded, failed, etc.
349
def _addKnownFailure(self, test, err):
364
350
self.known_failure_count += 1
365
351
self.report_known_failure(test, err)
368
354
"""The test will not be run because of a missing feature.
370
356
# this can be called in two different ways: it may be that the
371
# test started running, and then raised (through requireFeature)
357
# test started running, and then raised (through addError)
372
358
# UnavailableFeature. Alternatively this method can be called
373
# while probing for features before running the test code proper; in
374
# that case we will see startTest and stopTest, but the test will
375
# never actually run.
359
# while probing for features before running the tests; in that
360
# case we will see startTest and stopTest, but the test will never
376
362
self.unsupported.setdefault(str(feature), 0)
377
363
self.unsupported[str(feature)] += 1
378
364
self.report_unsupported(test, feature)
382
368
self.skip_count += 1
383
369
self.report_skip(test, reason)
385
def addNotApplicable(self, test, reason):
386
self.not_applicable_count += 1
387
self.report_not_applicable(test, reason)
371
def _addNotApplicable(self, test, skip_excinfo):
372
if isinstance(skip_excinfo[1], TestNotApplicable):
373
self.not_applicable_count += 1
374
self.report_not_applicable(test, skip_excinfo)
377
except KeyboardInterrupt:
380
self.addError(test, test.exc_info())
382
# seems best to treat this as success from point-of-view of unittest
383
# -- it actually does nothing so it barely matters :)
384
unittest.TestResult.addSuccess(self, test)
385
test._log_contents = ''
389
def _post_mortem(self):
390
"""Start a PDB post mortem session."""
391
if os.environ.get('BZR_TEST_PDB', None):
392
import pdb;pdb.post_mortem()
387
def printErrorList(self, flavour, errors):
388
for test, err in errors:
389
self.stream.writeln(self.separator1)
390
self.stream.write("%s: " % flavour)
391
self.stream.writeln(self.getDescription(test))
392
if getattr(test, '_get_log', None) is not None:
393
log_contents = test._get_log()
395
self.stream.write('\n')
397
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
398
self.stream.write('\n')
399
self.stream.write(log_contents)
400
self.stream.write('\n')
402
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
403
self.stream.write('\n')
404
self.stream.writeln(self.separator2)
405
self.stream.writeln("%s" % err)
394
407
def progress(self, offset, whence):
395
408
"""The test is adjusting the count of tests to run."""
490
504
return self._shortened_test_description(test)
492
506
def report_error(self, test, err):
493
self.stream.write('ERROR: %s\n %s\n' % (
507
self.pb.note('ERROR: %s\n %s\n',
494
508
self._test_description(test),
498
512
def report_failure(self, test, err):
499
self.stream.write('FAIL: %s\n %s\n' % (
513
self.pb.note('FAIL: %s\n %s\n',
500
514
self._test_description(test),
504
518
def report_known_failure(self, test, err):
519
self.pb.note('XFAIL: %s\n%s\n',
520
self._test_description(test), err[1])
507
522
def report_skip(self, test, reason):
510
def report_not_applicable(self, test, reason):
525
def report_not_applicable(self, test, skip_excinfo):
513
528
def report_unsupported(self, test, feature):
535
550
def report_test_start(self, test):
537
552
name = self._shortened_test_description(test)
538
width = osutils.terminal_width()
539
if width is not None:
540
# width needs space for 6 char status, plus 1 for slash, plus an
541
# 11-char time string, plus a trailing blank
542
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
544
self.stream.write(self._ellipsize_to_right(name, width-18))
546
self.stream.write(name)
553
# width needs space for 6 char status, plus 1 for slash, plus an
554
# 11-char time string, plus a trailing blank
555
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
556
self.stream.write(self._ellipsize_to_right(name,
557
osutils.terminal_width()-18))
547
558
self.stream.flush()
549
560
def _error_summary(self, err):
578
589
self.stream.writeln(' SKIP %s\n%s'
579
590
% (self._testTimeString(test), reason))
581
def report_not_applicable(self, test, reason):
582
self.stream.writeln(' N/A %s\n %s'
583
% (self._testTimeString(test), reason))
592
def report_not_applicable(self, test, skip_excinfo):
593
self.stream.writeln(' N/A %s\n%s'
594
% (self._testTimeString(test),
595
self._error_summary(skip_excinfo)))
585
597
def report_unsupported(self, test, feature):
586
598
"""test cannot be run because feature is missing."""
606
618
applied left to right - the first element in the list is the
607
619
innermost decorator.
609
# stream may know claim to know to write unicode strings, but in older
610
# pythons this goes sufficiently wrong that it is a bad idea. (
611
# specifically a built in file with encoding 'UTF-8' will still try
612
# to encode using ascii.
613
new_encoding = osutils.get_terminal_encoding()
614
codec = codecs.lookup(new_encoding)
615
if type(codec) is tuple:
619
encode = codec.encode
620
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
621
stream.encoding = new_encoding
622
621
self.stream = unittest._WritelnDecorator(stream)
623
622
self.descriptions = descriptions
624
623
self.verbosity = verbosity
682
# traceback._some_str fails to format exceptions that have the default
683
# __str__ which does an implicit ascii conversion. However, repr() on those
684
# objects works, for all that its not quite what the doctor may have ordered.
685
def _clever_some_str(value):
690
return repr(value).replace('\\n', '\n')
692
return '<unprintable %s object>' % type(value).__name__
694
traceback._some_str = _clever_some_str
697
# deprecated - use self.knownFailure(), or self.expectFailure.
698
KnownFailure = testtools.testcase._ExpectedFailure
690
class KnownFailure(AssertionError):
691
"""Indicates that a test failed in a precisely expected manner.
693
Such failures dont block the whole test suite from passing because they are
694
indicators of partially completed code or of future work. We have an
695
explicit error for them so that we can ensure that they are always visible:
696
KnownFailures are always shown in the output of bzr selftest.
701
700
class UnavailableFeature(Exception):
702
701
"""A feature required for this test was not available.
704
This can be considered a specialised form of SkippedTest.
706
703
The feature should be used to construct the exception.
707
class CommandFailed(Exception):
710
711
class StringIOWrapper(object):
711
712
"""A wrapper around cStringIO which just adds an encoding attribute.
798
799
_leaking_threads_tests = 0
799
800
_first_thread_leaker_id = None
800
801
_log_file_name = None
803
_keep_log_file = False
801
804
# record lsprof data when performing benchmark calls.
802
805
_gather_lsprof_in_benchmarks = False
806
attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
807
'_log_contents', '_log_file_name', '_benchtime',
808
'_TestCase__testMethodName', '_TestCase__testMethodDoc',)
804
810
def __init__(self, methodName='testMethod'):
805
811
super(TestCase, self).__init__(methodName)
806
812
self._cleanups = []
807
self._directory_isolation = True
808
self.exception_handlers.insert(0,
809
(UnavailableFeature, self._do_unsupported_or_skip))
810
self.exception_handlers.insert(0,
811
(TestNotApplicable, self._do_not_applicable))
813
self._bzr_test_setUp_run = False
814
self._bzr_test_tearDown_run = False
814
super(TestCase, self).setUp()
815
for feature in getattr(self, '_test_needs_features', []):
816
self.requireFeature(feature)
817
self._log_contents = None
818
self.addDetail("log", content.Content(content.ContentType("text",
819
"plain", {"charset": "utf8"}),
820
lambda:[self._get_log(keep_log_file=True)]))
817
unittest.TestCase.setUp(self)
818
self._bzr_test_setUp_run = True
821
819
self._cleanEnvironment()
822
820
self._silenceUI()
823
821
self._startLogFile()
824
822
self._benchcalls = []
825
823
self._benchtime = None
826
824
self._clear_hooks()
827
self._track_transports()
828
825
self._track_locks()
829
826
self._clear_debug_flags()
830
827
TestCase._active_threads = threading.activeCount()
839
836
active = threading.activeCount()
840
837
leaked_threads = active - TestCase._active_threads
841
838
TestCase._active_threads = active
842
# If some tests make the number of threads *decrease*, we'll consider
843
# that they are just observing old threads dieing, not agressively kill
844
# random threads. So we don't report these tests as leaking. The risk
845
# is that we have false positives that way (the test see 2 threads
846
# going away but leak one) but it seems less likely than the actual
847
# false positives (the test see threads going away and does not leak).
848
if leaked_threads > 0:
849
840
TestCase._leaking_threads_tests += 1
850
841
if TestCase._first_thread_leaker_id is None:
851
842
TestCase._first_thread_leaker_id = self.id()
856
847
Tests that want to use debug flags can just set them in the
857
848
debug_flags set during setup/teardown.
859
# Start with a copy of the current debug flags we can safely modify.
860
self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
850
self._preserved_debug_flags = set(debug.debug_flags)
861
851
if 'allow_debug' not in selftest_debug_flags:
862
852
debug.debug_flags.clear()
863
853
if 'disable_lock_checks' not in selftest_debug_flags:
864
854
debug.debug_flags.add('strict_locks')
855
self.addCleanup(self._restore_debug_flags)
866
857
def _clear_hooks(self):
867
858
# prevent hooks affecting tests
877
868
# this hook should always be installed
878
869
request._install_hook()
880
def disable_directory_isolation(self):
881
"""Turn off directory isolation checks."""
882
self._directory_isolation = False
884
def enable_directory_isolation(self):
885
"""Enable directory isolation checks."""
886
self._directory_isolation = True
888
871
def _silenceUI(self):
889
872
"""Turn off UI for duration of test"""
890
873
# by default the UI is off; tests can turn it on if they want it.
891
self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
874
saved = ui.ui_factory
876
ui.ui_factory = saved
877
ui.ui_factory = ui.SilentUIFactory()
878
self.addCleanup(_restore)
893
880
def _check_locks(self):
894
881
"""Check that all lock take/release actions have been paired."""
941
928
def _lock_broken(self, result):
942
929
self._lock_actions.append(('broken', result))
944
def permit_dir(self, name):
945
"""Permit a directory to be used by this test. See permit_url."""
946
name_transport = get_transport(name)
947
self.permit_url(name)
948
self.permit_url(name_transport.base)
950
def permit_url(self, url):
951
"""Declare that url is an ok url to use in this test.
953
Do this for memory transports, temporary test directory etc.
955
Do not do this for the current working directory, /tmp, or any other
956
preexisting non isolated url.
958
if not url.endswith('/'):
960
self._bzr_selftest_roots.append(url)
962
def permit_source_tree_branch_repo(self):
963
"""Permit the source tree bzr is running from to be opened.
965
Some code such as bzrlib.version attempts to read from the bzr branch
966
that bzr is executing from (if any). This method permits that directory
967
to be used in the test suite.
969
path = self.get_source_path()
970
self.record_directory_isolation()
973
workingtree.WorkingTree.open(path)
974
except (errors.NotBranchError, errors.NoWorkingTree):
977
self.enable_directory_isolation()
979
def _preopen_isolate_transport(self, transport):
980
"""Check that all transport openings are done in the test work area."""
981
while isinstance(transport, pathfilter.PathFilteringTransport):
982
# Unwrap pathfiltered transports
983
transport = transport.server.backing_transport.clone(
984
transport._filter('.'))
986
# ReadonlySmartTCPServer_for_testing decorates the backing transport
987
# urls it is given by prepending readonly+. This is appropriate as the
988
# client shouldn't know that the server is readonly (or not readonly).
989
# We could register all servers twice, with readonly+ prepending, but
990
# that makes for a long list; this is about the same but easier to
992
if url.startswith('readonly+'):
993
url = url[len('readonly+'):]
994
self._preopen_isolate_url(url)
996
def _preopen_isolate_url(self, url):
997
if not self._directory_isolation:
999
if self._directory_isolation == 'record':
1000
self._bzr_selftest_roots.append(url)
1002
# This prevents all transports, including e.g. sftp ones backed on disk
1003
# from working unless they are explicitly granted permission. We then
1004
# depend on the code that sets up test transports to check that they are
1005
# appropriately isolated and enable their use by calling
1006
# self.permit_transport()
1007
if not osutils.is_inside_any(self._bzr_selftest_roots, url):
1008
raise errors.BzrError("Attempt to escape test isolation: %r %r"
1009
% (url, self._bzr_selftest_roots))
1011
def record_directory_isolation(self):
1012
"""Gather accessed directories to permit later access.
1014
This is used for tests that access the branch bzr is running from.
1016
self._directory_isolation = "record"
1018
931
def start_server(self, transport_server, backing_server=None):
1019
932
"""Start transport_server for this test.
1022
935
server's urls to be used.
1024
937
if backing_server is None:
1025
transport_server.start_server()
938
transport_server.setUp()
1027
transport_server.start_server(backing_server)
1028
self.addCleanup(transport_server.stop_server)
1029
# Obtain a real transport because if the server supplies a password, it
1030
# will be hidden from the base on the client side.
1031
t = get_transport(transport_server.get_url())
1032
# Some transport servers effectively chroot the backing transport;
1033
# others like SFTPServer don't - users of the transport can walk up the
1034
# transport to read the entire backing transport. This wouldn't matter
1035
# except that the workdir tests are given - and that they expect the
1036
# server's url to point at - is one directory under the safety net. So
1037
# Branch operations into the transport will attempt to walk up one
1038
# directory. Chrooting all servers would avoid this but also mean that
1039
# we wouldn't be testing directly against non-root urls. Alternatively
1040
# getting the test framework to start the server with a backing server
1041
# at the actual safety net directory would work too, but this then
1042
# means that the self.get_url/self.get_transport methods would need
1043
# to transform all their results. On balance its cleaner to handle it
1044
# here, and permit a higher url when we have one of these transports.
1045
if t.base.endswith('/work/'):
1046
# we have safety net/test root/work
1047
t = t.clone('../..')
1048
elif isinstance(transport_server,
1049
test_server.SmartTCPServer_for_testing):
1050
# The smart server adds a path similar to work, which is traversed
1051
# up from by the client. But the server is chrooted - the actual
1052
# backing transport is not escaped from, and VFS requests to the
1053
# root will error (because they try to escape the chroot).
1055
while t2.base != t.base:
1058
self.permit_url(t.base)
1060
def _track_transports(self):
1061
"""Install checks for transport usage."""
1062
# TestCase has no safe place it can write to.
1063
self._bzr_selftest_roots = []
1064
# Currently the easiest way to be sure that nothing is going on is to
1065
# hook into bzr dir opening. This leaves a small window of error for
1066
# transport tests, but they are well known, and we can improve on this
1068
bzrdir.BzrDir.hooks.install_named_hook("pre_open",
1069
self._preopen_isolate_transport, "Check bzr directories are safe.")
940
transport_server.setUp(backing_server)
941
self.addCleanup(transport_server.tearDown)
1071
943
def _ndiff_strings(self, a, b):
1072
944
"""Return ndiff between two strings containing lines.
1129
1001
:raises AssertionError: If the expected and actual stat values differ
1130
1002
other than by atime.
1132
self.assertEqual(expected.st_size, actual.st_size,
1133
'st_size did not match')
1134
self.assertEqual(expected.st_mtime, actual.st_mtime,
1135
'st_mtime did not match')
1136
self.assertEqual(expected.st_ctime, actual.st_ctime,
1137
'st_ctime did not match')
1138
if sys.platform != 'win32':
1139
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1140
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1141
# odd. Regardless we shouldn't actually try to assert anything
1142
# about their values
1143
self.assertEqual(expected.st_dev, actual.st_dev,
1144
'st_dev did not match')
1145
self.assertEqual(expected.st_ino, actual.st_ino,
1146
'st_ino did not match')
1147
self.assertEqual(expected.st_mode, actual.st_mode,
1148
'st_mode did not match')
1004
self.assertEqual(expected.st_size, actual.st_size)
1005
self.assertEqual(expected.st_mtime, actual.st_mtime)
1006
self.assertEqual(expected.st_ctime, actual.st_ctime)
1007
self.assertEqual(expected.st_dev, actual.st_dev)
1008
self.assertEqual(expected.st_ino, actual.st_ino)
1009
self.assertEqual(expected.st_mode, actual.st_mode)
1150
1011
def assertLength(self, length, obj_with_len):
1151
1012
"""Assert that obj_with_len is of length length."""
1153
1014
self.fail("Incorrect length: wanted %d, got %d for %r" % (
1154
1015
length, len(obj_with_len), obj_with_len))
1156
def assertLogsError(self, exception_class, func, *args, **kwargs):
1157
"""Assert that func(*args, **kwargs) quietly logs a specific exception.
1159
from bzrlib import trace
1161
orig_log_exception_quietly = trace.log_exception_quietly
1164
orig_log_exception_quietly()
1165
captured.append(sys.exc_info())
1166
trace.log_exception_quietly = capture
1167
func(*args, **kwargs)
1169
trace.log_exception_quietly = orig_log_exception_quietly
1170
self.assertLength(1, captured)
1171
err = captured[0][1]
1172
self.assertIsInstance(err, exception_class)
1175
1017
def assertPositive(self, val):
1176
1018
"""Assert that val is greater than 0."""
1177
1019
self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
1303
1141
m += ": " + msg
1144
def expectFailure(self, reason, assertion, *args, **kwargs):
1145
"""Invoke a test, expecting it to fail for the given reason.
1147
This is for assertions that ought to succeed, but currently fail.
1148
(The failure is *expected* but not *wanted*.) Please be very precise
1149
about the failure you're expecting. If a new bug is introduced,
1150
AssertionError should be raised, not KnownFailure.
1152
Frequently, expectFailure should be followed by an opposite assertion.
1155
Intended to be used with a callable that raises AssertionError as the
1156
'assertion' parameter. args and kwargs are passed to the 'assertion'.
1158
Raises KnownFailure if the test fails. Raises AssertionError if the
1163
self.expectFailure('Math is broken', self.assertNotEqual, 54,
1165
self.assertEqual(42, dynamic_val)
1167
This means that a dynamic_val of 54 will cause the test to raise
1168
a KnownFailure. Once math is fixed and the expectFailure is removed,
1169
only a dynamic_val of 42 will allow the test to pass. Anything other
1170
than 54 or 42 will cause an AssertionError.
1173
assertion(*args, **kwargs)
1174
except AssertionError:
1175
raise KnownFailure(reason)
1177
self.fail('Unexpected success. Should have failed: %s' % reason)
1306
1179
def assertFileEqual(self, content, path):
1307
1180
"""Fail if path does not contain 'content'."""
1308
1181
self.failUnlessExists(path)
1314
1187
self.assertEqualDiff(content, s)
1316
def assertDocstring(self, expected_docstring, obj):
1317
"""Fail if obj does not have expected_docstring"""
1319
# With -OO the docstring should be None instead
1320
self.assertIs(obj.__doc__, None)
1322
self.assertEqual(expected_docstring, obj.__doc__)
1324
1189
def failUnlessExists(self, path):
1325
1190
"""Fail unless path or paths, which may be abs or relative, exist."""
1326
1191
if not isinstance(path, basestring):
1467
1332
Close the file and delete it, unless setKeepLogfile was called.
1469
if bzrlib.trace._trace_file:
1470
# flush the log file, to get all content
1471
bzrlib.trace._trace_file.flush()
1334
if self._log_file is None:
1472
1336
bzrlib.trace.pop_log_file(self._log_memento)
1473
# Cache the log result and delete the file on disk
1474
self._get_log(False)
1337
self._log_file.close()
1338
self._log_file = None
1339
if not self._keep_log_file:
1340
os.remove(self._log_file_name)
1341
self._log_file_name = None
1343
def setKeepLogfile(self):
1344
"""Make the logfile not be deleted when _finishLogFile is called."""
1345
self._keep_log_file = True
1476
1347
def thisFailsStrictLockCheck(self):
1477
1348
"""It is known that this test would fail with -Dstrict_locks.
1495
1366
self._cleanups.append((callable, args, kwargs))
1497
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1498
"""Overrides an object attribute restoring it after the test.
1500
:param obj: The object that will be mutated.
1502
:param attr_name: The attribute name we want to preserve/override in
1505
:param new: The optional value we want to set the attribute to.
1507
:returns: The actual attr value.
1509
value = getattr(obj, attr_name)
1510
# The actual value is captured by the call below
1511
self.addCleanup(setattr, obj, attr_name, value)
1512
if new is not _unitialized_attr:
1513
setattr(obj, attr_name, new)
1516
1368
def _cleanEnvironment(self):
1518
1370
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1556
1404
'ftp_proxy': None,
1557
1405
'FTP_PROXY': None,
1558
1406
'BZR_REMOTE_PATH': None,
1559
# Generally speaking, we don't want apport reporting on crashes in
1560
# the test envirnoment unless we're specifically testing apport,
1561
# so that it doesn't leak into the real system environment. We
1562
# use an env var so it propagates to subprocesses.
1563
'APPORT_DISABLE': '1',
1566
1409
self.addCleanup(self._restoreEnvironment)
1567
1410
for name, value in new_env.iteritems():
1568
1411
self._captureVar(name, value)
1570
1413
def _captureVar(self, name, newvalue):
1571
1414
"""Set an environment variable, and reset it when finished."""
1572
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1415
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1417
def _restore_debug_flags(self):
1418
debug.debug_flags.clear()
1419
debug.debug_flags.update(self._preserved_debug_flags)
1574
1421
def _restoreEnvironment(self):
1575
for name, value in self._old_env.iteritems():
1422
for name, value in self.__old_env.iteritems():
1576
1423
osutils.set_or_unset_env(name, value)
1578
1425
def _restoreHooks(self):
1586
1433
def _do_skip(self, result, reason):
1587
1434
addSkip = getattr(result, 'addSkip', None)
1588
1435
if not callable(addSkip):
1589
result.addSuccess(result)
1436
result.addError(self, sys.exc_info())
1591
1438
addSkip(self, reason)
1594
def _do_known_failure(self, result, e):
1595
err = sys.exc_info()
1596
addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1597
if addExpectedFailure is not None:
1598
addExpectedFailure(self, err)
1600
result.addSuccess(self)
1603
def _do_not_applicable(self, result, e):
1605
reason = 'No reason given'
1608
addNotApplicable = getattr(result, 'addNotApplicable', None)
1609
if addNotApplicable is not None:
1610
result.addNotApplicable(self, reason)
1612
self._do_skip(result, reason)
1615
def _do_unsupported_or_skip(self, result, e):
1617
addNotSupported = getattr(result, 'addNotSupported', None)
1618
if addNotSupported is not None:
1619
result.addNotSupported(self, reason)
1621
self._do_skip(result, reason)
1440
def run(self, result=None):
1441
if result is None: result = self.defaultTestResult()
1442
for feature in getattr(self, '_test_needs_features', []):
1443
if not feature.available():
1444
result.startTest(self)
1445
if getattr(result, 'addNotSupported', None):
1446
result.addNotSupported(self, feature)
1448
result.addSuccess(self)
1449
result.stopTest(self)
1453
result.startTest(self)
1454
absent_attr = object()
1456
method_name = getattr(self, '_testMethodName', absent_attr)
1457
if method_name is absent_attr:
1459
method_name = getattr(self, '_TestCase__testMethodName')
1460
testMethod = getattr(self, method_name)
1464
if not self._bzr_test_setUp_run:
1466
"test setUp did not invoke "
1467
"bzrlib.tests.TestCase's setUp")
1468
except KeyboardInterrupt:
1471
except TestSkipped, e:
1472
self._do_skip(result, e.args[0])
1476
result.addError(self, sys.exc_info())
1484
except self.failureException:
1485
result.addFailure(self, sys.exc_info())
1486
except TestSkipped, e:
1488
reason = "No reason given."
1491
self._do_skip(result, reason)
1492
except KeyboardInterrupt:
1496
result.addError(self, sys.exc_info())
1500
if not self._bzr_test_tearDown_run:
1502
"test tearDown did not invoke "
1503
"bzrlib.tests.TestCase's tearDown")
1504
except KeyboardInterrupt:
1508
result.addError(self, sys.exc_info())
1511
if ok: result.addSuccess(self)
1513
result.stopTest(self)
1515
except TestNotApplicable:
1516
# Not moved from the result [yet].
1519
except KeyboardInterrupt:
1524
for attr_name in self.attrs_to_keep:
1525
if attr_name in self.__dict__:
1526
saved_attrs[attr_name] = self.__dict__[attr_name]
1527
self.__dict__ = saved_attrs
1531
self._log_contents = ''
1532
self._bzr_test_tearDown_run = True
1533
unittest.TestCase.tearDown(self)
1623
1535
def time(self, callable, *args, **kwargs):
1624
1536
"""Run callable and accrue the time it takes to the benchmark time.
1645
1555
self._benchtime += time.time() - start
1557
def _runCleanups(self):
1558
"""Run registered cleanup functions.
1560
This should only be called from TestCase.tearDown.
1562
# TODO: Perhaps this should keep running cleanups even if
1563
# one of them fails?
1565
# Actually pop the cleanups from the list so tearDown running
1566
# twice is safe (this happens for skipped tests).
1567
while self._cleanups:
1568
cleanup, args, kwargs = self._cleanups.pop()
1569
cleanup(*args, **kwargs)
1647
1571
def log(self, *args):
1650
1574
def _get_log(self, keep_log_file=False):
1651
"""Internal helper to get the log from bzrlib.trace for this test.
1653
Please use self.getDetails, or self.get_log to access this in test case
1575
"""Get the log from bzrlib.trace calls from this test.
1656
1577
:param keep_log_file: When True, if the log is still a file on disk
1657
1578
leave it as a file on disk. When False, if the log is still a file
1659
1580
self._log_contents.
1660
1581
:return: A string containing the log.
1662
if self._log_contents is not None:
1664
self._log_contents.decode('utf8')
1665
except UnicodeDecodeError:
1666
unicodestr = self._log_contents.decode('utf8', 'replace')
1667
self._log_contents = unicodestr.encode('utf8')
1583
# flush the log file, to get all content
1585
if bzrlib.trace._trace_file:
1586
bzrlib.trace._trace_file.flush()
1587
if self._log_contents:
1588
# XXX: this can hardly contain the content flushed above --vila
1668
1590
return self._log_contents
1670
if bzrlib.trace._trace_file:
1671
# flush the log file, to get all content
1672
bzrlib.trace._trace_file.flush()
1673
1591
if self._log_file_name is not None:
1674
1592
logfile = open(self._log_file_name)
1676
1594
log_contents = logfile.read()
1678
1596
logfile.close()
1680
log_contents.decode('utf8')
1681
except UnicodeDecodeError:
1682
unicodestr = log_contents.decode('utf8', 'replace')
1683
log_contents = unicodestr.encode('utf8')
1684
1597
if not keep_log_file:
1686
max_close_attempts = 100
1687
first_close_error = None
1688
while close_attempts < max_close_attempts:
1691
self._log_file.close()
1692
except IOError, ioe:
1693
if ioe.errno is None:
1694
# No errno implies 'close() called during
1695
# concurrent operation on the same file object', so
1696
# retry. Probably a thread is trying to write to
1698
if first_close_error is None:
1699
first_close_error = ioe
1704
if close_attempts > 1:
1706
'Unable to close log file on first attempt, '
1707
'will retry: %s\n' % (first_close_error,))
1708
if close_attempts == max_close_attempts:
1710
'Unable to close log file after %d attempts.\n'
1711
% (max_close_attempts,))
1712
self._log_file = None
1713
# Permit multiple calls to get_log until we clean it up in
1715
1598
self._log_contents = log_contents
1717
1600
os.remove(self._log_file_name)
1779
1651
os.chdir(working_dir)
1783
result = self.apply_redirected(ui.ui_factory.stdin,
1785
bzrlib.commands.run_bzr_catch_user_errors,
1787
except KeyboardInterrupt:
1788
# Reraise KeyboardInterrupt with contents of redirected stdout
1789
# and stderr as arguments, for tests which are interested in
1790
# stdout and stderr and are expecting the exception.
1791
out = stdout.getvalue()
1792
err = stderr.getvalue()
1794
self.log('output:\n%r', out)
1796
self.log('errors:\n%r', err)
1797
raise KeyboardInterrupt(out, err)
1654
result = self.apply_redirected(ui.ui_factory.stdin,
1656
bzrlib.commands.run_bzr_catch_user_errors,
1799
1659
logger.removeHandler(handler)
1800
1660
ui.ui_factory = old_ui_factory
2342
2203
# recreate a new one or all the followng tests will fail.
2343
2204
# If you need to inspect its content uncomment the following line
2344
2205
# import pdb; pdb.set_trace()
2345
_rmtree_temp_dir(root + '/.bzr', test_id=self.id())
2206
_rmtree_temp_dir(root + '/.bzr')
2346
2207
self._create_safety_net()
2347
2208
raise AssertionError('%s/.bzr should not be modified' % root)
2349
2210
def _make_test_root(self):
2350
2211
if TestCaseWithMemoryTransport.TEST_ROOT is None:
2351
# Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
2352
root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
2212
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
2354
2213
TestCaseWithMemoryTransport.TEST_ROOT = root
2356
2215
self._create_safety_net()
2424
2281
return branchbuilder.BranchBuilder(branch=branch)
2426
2283
def overrideEnvironmentForTesting(self):
2427
test_home_dir = self.test_home_dir
2428
if isinstance(test_home_dir, unicode):
2429
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2430
os.environ['HOME'] = test_home_dir
2431
os.environ['BZR_HOME'] = test_home_dir
2284
os.environ['HOME'] = self.test_home_dir
2285
os.environ['BZR_HOME'] = self.test_home_dir
2433
2287
def setUp(self):
2434
2288
super(TestCaseWithMemoryTransport, self).setUp()
2435
2289
self._make_test_root()
2436
self.addCleanup(os.chdir, os.getcwdu())
2290
_currentdir = os.getcwdu()
2291
def _leaveDirectory():
2292
os.chdir(_currentdir)
2293
self.addCleanup(_leaveDirectory)
2437
2294
self.makeAndChdirToTestDir()
2438
2295
self.overrideEnvironmentForTesting()
2439
2296
self.__readonly_server = None
2511
2368
if os.path.exists(name):
2512
2369
name = name_prefix + '_' + str(i)
2514
# now create test and home directories within this dir
2515
self.test_base_dir = name
2516
self.addCleanup(self.deleteTestDir)
2517
os.mkdir(self.test_base_dir)
2519
self.permit_dir(self.test_base_dir)
2520
# 'sprouting' and 'init' of a branch both walk up the tree to find
2521
# stacking policy to honour; create a bzr dir with an unshared
2522
# repository (but not a branch - our code would be trying to escape
2523
# then!) to stop them, and permit it to be read.
2524
# control = bzrdir.BzrDir.create(self.test_base_dir)
2525
# control.create_repository()
2373
# now create test and home directories within this dir
2374
self.test_base_dir = name
2526
2375
self.test_home_dir = self.test_base_dir + '/home'
2527
2376
os.mkdir(self.test_home_dir)
2528
2377
self.test_dir = self.test_base_dir + '/work'
3296
3127
if not os.path.isfile(bzr_path):
3297
3128
# We are probably installed. Assume sys.argv is the right file
3298
3129
bzr_path = sys.argv[0]
3299
bzr_path = [bzr_path]
3300
if sys.platform == "win32":
3301
# if we're on windows, we can't execute the bzr script directly
3302
bzr_path = [sys.executable] + bzr_path
3303
3130
fd, test_list_file_name = tempfile.mkstemp()
3304
3131
test_list_file = os.fdopen(fd, 'wb', 1)
3305
3132
for test in process_tests:
3306
3133
test_list_file.write(test.id() + '\n')
3307
3134
test_list_file.close()
3309
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3136
argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
3311
3138
if '--no-plugins' in sys.argv:
3312
3139
argv.append('--no-plugins')
3352
3179
def addFailure(self, test, err):
3353
3180
self.result.addFailure(test, err)
3354
ForwardingResult = testtools.ExtendedToOriginalDecorator
3183
class BZRTransformingResult(ForwardingResult):
3185
def addError(self, test, err):
3186
feature = self._error_looks_like('UnavailableFeature: ', err)
3187
if feature is not None:
3188
self.result.addNotSupported(test, feature)
3190
self.result.addError(test, err)
3192
def addFailure(self, test, err):
3193
known = self._error_looks_like('KnownFailure: ', err)
3194
if known is not None:
3195
self.result._addKnownFailure(test, [KnownFailure,
3196
KnownFailure(known), None])
3198
self.result.addFailure(test, err)
3200
def _error_looks_like(self, prefix, err):
3201
"""Deserialize exception and returns the stringify value."""
3205
if isinstance(exc, subunit.RemoteException):
3206
# stringify the exception gives access to the remote traceback
3207
# We search the last line for 'prefix'
3208
lines = str(exc).split('\n')
3209
while lines and not lines[-1]:
3212
if lines[-1].startswith(prefix):
3213
value = lines[-1][len(prefix):]
3357
3217
class ProfileResult(ForwardingResult):
3634
3494
'bzrlib.tests.commands',
3635
3495
'bzrlib.tests.per_branch',
3636
3496
'bzrlib.tests.per_bzrdir',
3637
'bzrlib.tests.per_bzrdir_colo',
3638
'bzrlib.tests.per_foreign_vcs',
3639
3497
'bzrlib.tests.per_interrepository',
3640
3498
'bzrlib.tests.per_intertree',
3641
3499
'bzrlib.tests.per_inventory',
3642
3500
'bzrlib.tests.per_interbranch',
3643
3501
'bzrlib.tests.per_lock',
3644
'bzrlib.tests.per_merger',
3645
3502
'bzrlib.tests.per_transport',
3646
3503
'bzrlib.tests.per_tree',
3647
3504
'bzrlib.tests.per_pack_repository',
3648
3505
'bzrlib.tests.per_repository',
3649
3506
'bzrlib.tests.per_repository_chk',
3650
3507
'bzrlib.tests.per_repository_reference',
3651
'bzrlib.tests.per_uifactory',
3652
3508
'bzrlib.tests.per_versionedfile',
3653
3509
'bzrlib.tests.per_workingtree',
3654
3510
'bzrlib.tests.test__annotator',
3655
'bzrlib.tests.test__bencode',
3656
3511
'bzrlib.tests.test__chk_map',
3657
3512
'bzrlib.tests.test__dirstate_helpers',
3658
3513
'bzrlib.tests.test__groupcompress',
3659
3514
'bzrlib.tests.test__known_graph',
3660
3515
'bzrlib.tests.test__rio',
3661
'bzrlib.tests.test__simple_set',
3662
'bzrlib.tests.test__static_tuple',
3663
3516
'bzrlib.tests.test__walkdirs_win32',
3664
3517
'bzrlib.tests.test_ancestry',
3665
3518
'bzrlib.tests.test_annotate',
3666
3519
'bzrlib.tests.test_api',
3667
3520
'bzrlib.tests.test_atomicfile',
3668
3521
'bzrlib.tests.test_bad_files',
3522
'bzrlib.tests.test_bencode',
3669
3523
'bzrlib.tests.test_bisect_multi',
3670
3524
'bzrlib.tests.test_branch',
3671
3525
'bzrlib.tests.test_branchbuilder',
4040
3885
return new_test
4043
def permute_tests_for_extension(standard_tests, loader, py_module_name,
4045
"""Helper for permutating tests against an extension module.
4047
This is meant to be used inside a modules 'load_tests()' function. It will
4048
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
4049
against both implementations. Setting 'test.module' to the appropriate
4050
module. See bzrlib.tests.test__chk_map.load_tests as an example.
4052
:param standard_tests: A test suite to permute
4053
:param loader: A TestLoader
4054
:param py_module_name: The python path to a python module that can always
4055
be loaded, and will be considered the 'python' implementation. (eg
4056
'bzrlib._chk_map_py')
4057
:param ext_module_name: The python path to an extension module. If the
4058
module cannot be loaded, a single test will be added, which notes that
4059
the module is not available. If it can be loaded, all standard_tests
4060
will be run against that module.
4061
:return: (suite, feature) suite is a test-suite that has all the permuted
4062
tests. feature is the Feature object that can be used to determine if
4063
the module is available.
4066
py_module = __import__(py_module_name, {}, {}, ['NO_SUCH_ATTRIB'])
4068
('python', {'module': py_module}),
4070
suite = loader.suiteClass()
4071
feature = ModuleAvailableFeature(ext_module_name)
4072
if feature.available():
4073
scenarios.append(('C', {'module': feature.module}))
4075
# the compiled module isn't available, so we add a failing test
4076
class FailWithoutFeature(TestCase):
4077
def test_fail(self):
4078
self.requireFeature(feature)
4079
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4080
result = multiply_tests(standard_tests, scenarios, suite)
4081
return result, feature
4084
def _rmtree_temp_dir(dirname, test_id=None):
3888
def _rmtree_temp_dir(dirname):
4085
3889
# If LANG=C we probably have created some bogus paths
4086
3890
# which rmtree(unicode) will fail to delete
4087
3891
# so make sure we are using rmtree(str) to delete everything
4191
3992
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4194
class _CompatabilityThunkFeature(Feature):
4195
"""This feature is just a thunk to another feature.
4197
It issues a deprecation warning if it is accessed, to let you know that you
4198
should really use a different feature.
4201
def __init__(self, dep_version, module, name,
4202
replacement_name, replacement_module=None):
4203
super(_CompatabilityThunkFeature, self).__init__()
4204
self._module = module
4205
if replacement_module is None:
4206
replacement_module = module
4207
self._replacement_module = replacement_module
4209
self._replacement_name = replacement_name
4210
self._dep_version = dep_version
4211
self._feature = None
4214
if self._feature is None:
4215
depr_msg = self._dep_version % ('%s.%s'
4216
% (self._module, self._name))
4217
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4218
self._replacement_name)
4219
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4220
# Import the new feature and use it as a replacement for the
4222
mod = __import__(self._replacement_module, {}, {},
4223
[self._replacement_name])
4224
self._feature = getattr(mod, self._replacement_name)
4228
return self._feature._probe()
4231
class ModuleAvailableFeature(Feature):
4232
"""This is a feature than describes a module we want to be available.
4234
Declare the name of the module in __init__(), and then after probing, the
4235
module will be available as 'self.module'.
4237
:ivar module: The module if it is available, else None.
4240
def __init__(self, module_name):
4241
super(ModuleAvailableFeature, self).__init__()
4242
self.module_name = module_name
4246
self._module = __import__(self.module_name, {}, {}, [''])
4253
if self.available(): # Make sure the probe has been done
4257
def feature_name(self):
4258
return self.module_name
4261
# This is kept here for compatibility, it is recommended to use
4262
# 'bzrlib.tests.feature.paramiko' instead
4263
ParamikoFeature = _CompatabilityThunkFeature(
4264
deprecated_in((2,1,0)),
4265
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4268
3995
def probe_unicode_in_user_encoding():
4269
3996
"""Try to encode several unicode strings to use in unicode-aware tests.
4270
3997
Return first successfull match.
4350
4077
UTF8Filesystem = _UTF8Filesystem()
4353
class _BreakinFeature(Feature):
4354
"""Does this platform support the breakin feature?"""
4357
from bzrlib import breakin
4358
if breakin.determine_signal() is None:
4360
if sys.platform == 'win32':
4361
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4362
# We trigger SIGBREAK via a Console api so we need ctypes to
4363
# access the function
4370
def feature_name(self):
4371
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4374
BreakinFeature = _BreakinFeature()
4377
4080
class _CaseInsCasePresFilenameFeature(Feature):
4378
4081
"""Is the file-system case insensitive, but case-preserving?"""
4429
4132
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4432
class _CaseSensitiveFilesystemFeature(Feature):
4135
class _SubUnitFeature(Feature):
4136
"""Check if subunit is available."""
4434
4138
def _probe(self):
4435
if CaseInsCasePresFilenameFeature.available():
4437
elif CaseInsensitiveFilesystemFeature.available():
4442
4145
def feature_name(self):
4443
return 'case-sensitive filesystem'
4445
# new coding style is for feature instances to be lowercase
4446
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4449
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4450
SubUnitFeature = _CompatabilityThunkFeature(
4451
deprecated_in((2,1,0)),
4452
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4148
SubUnitFeature = _SubUnitFeature()
4453
4149
# Only define SubUnitBzrRunner if subunit is available.
4455
4151
from subunit import TestProtocolClient
4456
from subunit.test_results import AutoTimingTestResultDecorator
4153
from subunit.test_results import AutoTimingTestResultDecorator
4155
AutoTimingTestResultDecorator = lambda x:x
4457
4156
class SubUnitBzrRunner(TextTestRunner):
4458
4157
def run(self, test):
4459
4158
result = AutoTimingTestResultDecorator(