97
86
from bzrlib.symbol_versioning import (
98
87
DEPRECATED_PARAMETER,
99
88
deprecated_function,
101
89
deprecated_method,
102
90
deprecated_passed,
104
92
import bzrlib.trace
105
from bzrlib.transport import (
93
from bzrlib.transport import get_transport
110
94
import bzrlib.transport
95
from bzrlib.transport.local import LocalURLServer
96
from bzrlib.transport.memory import MemoryServer
97
from bzrlib.transport.readonly import ReadonlyServer
111
98
from bzrlib.trace import mutter, note
112
from bzrlib.tests import (
99
from bzrlib.tests import TestUtil
117
100
from bzrlib.tests.http_server import HttpServer
118
101
from bzrlib.tests.TestUtil import (
105
from bzrlib.tests.treeshape import build_tree_contents
122
106
from bzrlib.ui import NullProgressView
123
107
from bzrlib.ui.text import TextUIFactory
124
108
import bzrlib.version_info_formats.format_custom
242
221
'%s is leaking threads among %d leaking tests.\n' % (
243
222
TestCase._first_thread_leaker_id,
244
223
TestCase._leaking_threads_tests))
245
# We don't report the main thread as an active one.
247
'%d non-main threads were left active in the end.\n'
248
% (TestCase._active_threads - 1))
250
def getDescription(self, test):
253
def _extractBenchmarkTime(self, testCase, details=None):
225
def _extractBenchmarkTime(self, testCase):
254
226
"""Add a benchmark time for the current test case."""
255
if details and 'benchtime' in details:
256
return float(''.join(details['benchtime'].iter_bytes()))
257
227
return getattr(testCase, "_benchtime", None)
259
229
def _elapsedTestTimeString(self):
322
292
Called from the TestCase run() method when the test
323
293
fails with an unexpected error.
326
unittest.TestResult.addError(self, test, err)
327
self.error_count += 1
328
self.report_error(test, err)
331
self._cleanupLogFile(test)
295
self._testConcluded(test)
296
if isinstance(err[1], TestNotApplicable):
297
return self._addNotApplicable(test, err)
298
elif isinstance(err[1], UnavailableFeature):
299
return self.addNotSupported(test, err[1].args[0])
302
unittest.TestResult.addError(self, test, err)
303
self.error_count += 1
304
self.report_error(test, err)
307
self._cleanupLogFile(test)
333
309
def addFailure(self, test, err):
334
310
"""Tell result that test failed.
336
312
Called from the TestCase run() method when the test
337
313
fails because e.g. an assert() method failed.
340
unittest.TestResult.addFailure(self, test, err)
341
self.failure_count += 1
342
self.report_failure(test, err)
345
self._cleanupLogFile(test)
315
self._testConcluded(test)
316
if isinstance(err[1], KnownFailure):
317
return self._addKnownFailure(test, err)
320
unittest.TestResult.addFailure(self, test, err)
321
self.failure_count += 1
322
self.report_failure(test, err)
325
self._cleanupLogFile(test)
347
def addSuccess(self, test, details=None):
327
def addSuccess(self, test):
348
328
"""Tell result that test completed successfully.
350
330
Called from the TestCase run()
332
self._testConcluded(test)
352
333
if self._bench_history is not None:
353
benchmark_time = self._extractBenchmarkTime(test, details)
334
benchmark_time = self._extractBenchmarkTime(test)
354
335
if benchmark_time is not None:
355
336
self._bench_history.write("%s %s\n" % (
356
337
self._formatTime(benchmark_time),
360
341
unittest.TestResult.addSuccess(self, test)
361
342
test._log_contents = ''
363
def addExpectedFailure(self, test, err):
344
def _testConcluded(self, test):
345
"""Common code when a test has finished.
347
Called regardless of whether it succeeded, failed, etc.
351
def _addKnownFailure(self, test, err):
364
352
self.known_failure_count += 1
365
353
self.report_known_failure(test, err)
368
356
"""The test will not be run because of a missing feature.
370
358
# this can be called in two different ways: it may be that the
371
# test started running, and then raised (through requireFeature)
359
# test started running, and then raised (through addError)
372
360
# UnavailableFeature. Alternatively this method can be called
373
# while probing for features before running the test code proper; in
374
# that case we will see startTest and stopTest, but the test will
375
# never actually run.
361
# while probing for features before running the tests; in that
362
# case we will see startTest and stopTest, but the test will never
376
364
self.unsupported.setdefault(str(feature), 0)
377
365
self.unsupported[str(feature)] += 1
378
366
self.report_unsupported(test, feature)
382
370
self.skip_count += 1
383
371
self.report_skip(test, reason)
385
def addNotApplicable(self, test, reason):
386
self.not_applicable_count += 1
387
self.report_not_applicable(test, reason)
373
def _addNotApplicable(self, test, skip_excinfo):
374
if isinstance(skip_excinfo[1], TestNotApplicable):
375
self.not_applicable_count += 1
376
self.report_not_applicable(test, skip_excinfo)
379
except KeyboardInterrupt:
382
self.addError(test, test.exc_info())
384
# seems best to treat this as success from point-of-view of unittest
385
# -- it actually does nothing so it barely matters :)
386
unittest.TestResult.addSuccess(self, test)
387
test._log_contents = ''
389
def printErrorList(self, flavour, errors):
390
for test, err in errors:
391
self.stream.writeln(self.separator1)
392
self.stream.write("%s: " % flavour)
393
self.stream.writeln(self.getDescription(test))
394
if getattr(test, '_get_log', None) is not None:
395
log_contents = test._get_log()
397
self.stream.write('\n')
399
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
400
self.stream.write('\n')
401
self.stream.write(log_contents)
402
self.stream.write('\n')
404
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
405
self.stream.write('\n')
406
self.stream.writeln(self.separator2)
407
self.stream.writeln("%s" % err)
389
409
def _post_mortem(self):
390
410
"""Start a PDB post mortem session."""
490
511
return self._shortened_test_description(test)
492
513
def report_error(self, test, err):
493
self.stream.write('ERROR: %s\n %s\n' % (
514
self.pb.note('ERROR: %s\n %s\n',
494
515
self._test_description(test),
498
519
def report_failure(self, test, err):
499
self.stream.write('FAIL: %s\n %s\n' % (
520
self.pb.note('FAIL: %s\n %s\n',
500
521
self._test_description(test),
504
525
def report_known_failure(self, test, err):
526
self.pb.note('XFAIL: %s\n%s\n',
527
self._test_description(test), err[1])
507
529
def report_skip(self, test, reason):
510
def report_not_applicable(self, test, reason):
532
def report_not_applicable(self, test, skip_excinfo):
513
535
def report_unsupported(self, test, feature):
535
557
def report_test_start(self, test):
537
559
name = self._shortened_test_description(test)
538
width = osutils.terminal_width()
539
if width is not None:
540
# width needs space for 6 char status, plus 1 for slash, plus an
541
# 11-char time string, plus a trailing blank
542
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
544
self.stream.write(self._ellipsize_to_right(name, width-18))
546
self.stream.write(name)
560
# width needs space for 6 char status, plus 1 for slash, plus an
561
# 11-char time string, plus a trailing blank
562
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
563
self.stream.write(self._ellipsize_to_right(name,
564
osutils.terminal_width()-18))
547
565
self.stream.flush()
549
567
def _error_summary(self, err):
578
596
self.stream.writeln(' SKIP %s\n%s'
579
597
% (self._testTimeString(test), reason))
581
def report_not_applicable(self, test, reason):
582
self.stream.writeln(' N/A %s\n %s'
583
% (self._testTimeString(test), reason))
599
def report_not_applicable(self, test, skip_excinfo):
600
self.stream.writeln(' N/A %s\n%s'
601
% (self._testTimeString(test),
602
self._error_summary(skip_excinfo)))
585
604
def report_unsupported(self, test, feature):
586
605
"""test cannot be run because feature is missing."""
606
625
applied left to right - the first element in the list is the
607
626
innermost decorator.
609
# stream may claim to know how to write unicode strings, but in older
610
# pythons this goes sufficiently wrong that it is a bad idea. (
611
# specifically a built in file with encoding 'UTF-8' will still try
612
# to encode using ascii).
613
new_encoding = osutils.get_terminal_encoding()
614
codec = codecs.lookup(new_encoding)
615
if type(codec) is tuple:
619
encode = codec.encode
620
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
621
stream.encoding = new_encoding
622
628
self.stream = unittest._WritelnDecorator(stream)
623
629
self.descriptions = descriptions
624
630
self.verbosity = verbosity
682
# traceback._some_str fails to format exceptions that have the default
683
# __str__ which does an implicit ascii conversion. However, repr() on those
684
# objects works, for all that it's not quite what the doctor may have ordered.
685
def _clever_some_str(value):
690
return repr(value).replace('\\n', '\n')
692
return '<unprintable %s object>' % type(value).__name__
694
traceback._some_str = _clever_some_str
697
# deprecated - use self.knownFailure(), or self.expectFailure.
698
KnownFailure = testtools.testcase._ExpectedFailure
697
class KnownFailure(AssertionError):
698
"""Indicates that a test failed in a precisely expected manner.
700
Such failures don't block the whole test suite from passing because they are
701
indicators of partially completed code or of future work. We have an
702
explicit error for them so that we can ensure that they are always visible:
703
KnownFailures are always shown in the output of bzr selftest.
701
707
class UnavailableFeature(Exception):
702
708
"""A feature required for this test was not available.
704
This can be considered a specialised form of SkippedTest.
706
710
The feature should be used to construct the exception.
714
class CommandFailed(Exception):
710
718
class StringIOWrapper(object):
711
719
"""A wrapper around cStringIO which just adds an encoding attribute.
798
806
_leaking_threads_tests = 0
799
807
_first_thread_leaker_id = None
800
808
_log_file_name = None
810
_keep_log_file = False
801
811
# record lsprof data when performing benchmark calls.
802
812
_gather_lsprof_in_benchmarks = False
813
attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
814
'_log_contents', '_log_file_name', '_benchtime',
815
'_TestCase__testMethodName', '_TestCase__testMethodDoc',)
804
817
def __init__(self, methodName='testMethod'):
805
818
super(TestCase, self).__init__(methodName)
806
819
self._cleanups = []
807
self._directory_isolation = True
808
self.exception_handlers.insert(0,
809
(UnavailableFeature, self._do_unsupported_or_skip))
810
self.exception_handlers.insert(0,
811
(TestNotApplicable, self._do_not_applicable))
820
self._bzr_test_setUp_run = False
821
self._bzr_test_tearDown_run = False
814
super(TestCase, self).setUp()
815
for feature in getattr(self, '_test_needs_features', []):
816
self.requireFeature(feature)
817
self._log_contents = None
818
self.addDetail("log", content.Content(content.ContentType("text",
819
"plain", {"charset": "utf8"}),
820
lambda:[self._get_log(keep_log_file=True)]))
824
unittest.TestCase.setUp(self)
825
self._bzr_test_setUp_run = True
821
826
self._cleanEnvironment()
822
827
self._silenceUI()
823
828
self._startLogFile()
824
829
self._benchcalls = []
825
830
self._benchtime = None
826
831
self._clear_hooks()
827
self._track_transports()
828
832
self._track_locks()
829
833
self._clear_debug_flags()
830
834
TestCase._active_threads = threading.activeCount()
839
843
active = threading.activeCount()
840
844
leaked_threads = active - TestCase._active_threads
841
845
TestCase._active_threads = active
842
# If some tests make the number of threads *decrease*, we'll consider
843
# that they are just observing old threads dying, not aggressively killing
844
# random threads. So we don't report these tests as leaking. The risk
845
# is that we have false positives that way (the test sees 2 threads
846
# going away but leak one) but it seems less likely than the actual
847
# false positives (the test sees threads going away and does not leak).
848
if leaked_threads > 0:
849
847
TestCase._leaking_threads_tests += 1
850
848
if TestCase._first_thread_leaker_id is None:
851
849
TestCase._first_thread_leaker_id = self.id()
856
854
Tests that want to use debug flags can just set them in the
857
855
debug_flags set during setup/teardown.
859
# Start with a copy of the current debug flags we can safely modify.
860
self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
857
self._preserved_debug_flags = set(debug.debug_flags)
861
858
if 'allow_debug' not in selftest_debug_flags:
862
859
debug.debug_flags.clear()
863
860
if 'disable_lock_checks' not in selftest_debug_flags:
864
861
debug.debug_flags.add('strict_locks')
862
self.addCleanup(self._restore_debug_flags)
866
864
def _clear_hooks(self):
867
865
# prevent hooks affecting tests
877
875
# this hook should always be installed
878
876
request._install_hook()
880
def disable_directory_isolation(self):
881
"""Turn off directory isolation checks."""
882
self._directory_isolation = False
884
def enable_directory_isolation(self):
885
"""Enable directory isolation checks."""
886
self._directory_isolation = True
888
878
def _silenceUI(self):
889
879
"""Turn off UI for duration of test"""
890
880
# by default the UI is off; tests can turn it on if they want it.
891
self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
881
saved = ui.ui_factory
883
ui.ui_factory = saved
884
ui.ui_factory = ui.SilentUIFactory()
885
self.addCleanup(_restore)
893
887
def _check_locks(self):
894
888
"""Check that all lock take/release actions have been paired."""
941
935
def _lock_broken(self, result):
942
936
self._lock_actions.append(('broken', result))
944
def permit_dir(self, name):
945
"""Permit a directory to be used by this test. See permit_url."""
946
name_transport = get_transport(name)
947
self.permit_url(name)
948
self.permit_url(name_transport.base)
950
def permit_url(self, url):
951
"""Declare that url is an ok url to use in this test.
953
Do this for memory transports, temporary test directory etc.
955
Do not do this for the current working directory, /tmp, or any other
956
preexisting non isolated url.
958
if not url.endswith('/'):
960
self._bzr_selftest_roots.append(url)
962
def permit_source_tree_branch_repo(self):
963
"""Permit the source tree bzr is running from to be opened.
965
Some code such as bzrlib.version attempts to read from the bzr branch
966
that bzr is executing from (if any). This method permits that directory
967
to be used in the test suite.
969
path = self.get_source_path()
970
self.record_directory_isolation()
973
workingtree.WorkingTree.open(path)
974
except (errors.NotBranchError, errors.NoWorkingTree):
977
self.enable_directory_isolation()
979
def _preopen_isolate_transport(self, transport):
980
"""Check that all transport openings are done in the test work area."""
981
while isinstance(transport, pathfilter.PathFilteringTransport):
982
# Unwrap pathfiltered transports
983
transport = transport.server.backing_transport.clone(
984
transport._filter('.'))
986
# ReadonlySmartTCPServer_for_testing decorates the backing transport
987
# urls it is given by prepending readonly+. This is appropriate as the
988
# client shouldn't know that the server is readonly (or not readonly).
989
# We could register all servers twice, with readonly+ prepending, but
990
# that makes for a long list; this is about the same but easier to
992
if url.startswith('readonly+'):
993
url = url[len('readonly+'):]
994
self._preopen_isolate_url(url)
996
def _preopen_isolate_url(self, url):
997
if not self._directory_isolation:
999
if self._directory_isolation == 'record':
1000
self._bzr_selftest_roots.append(url)
1002
# This prevents all transports, including e.g. sftp ones backed on disk
1003
# from working unless they are explicitly granted permission. We then
1004
# depend on the code that sets up test transports to check that they are
1005
# appropriately isolated and enable their use by calling
1006
# self.permit_transport()
1007
if not osutils.is_inside_any(self._bzr_selftest_roots, url):
1008
raise errors.BzrError("Attempt to escape test isolation: %r %r"
1009
% (url, self._bzr_selftest_roots))
1011
def record_directory_isolation(self):
1012
"""Gather accessed directories to permit later access.
1014
This is used for tests that access the branch bzr is running from.
1016
self._directory_isolation = "record"
1018
938
def start_server(self, transport_server, backing_server=None):
1019
939
"""Start transport_server for this test.
1022
942
server's urls to be used.
1024
944
if backing_server is None:
1025
transport_server.start_server()
945
transport_server.setUp()
1027
transport_server.start_server(backing_server)
1028
self.addCleanup(transport_server.stop_server)
1029
# Obtain a real transport because if the server supplies a password, it
1030
# will be hidden from the base on the client side.
1031
t = get_transport(transport_server.get_url())
1032
# Some transport servers effectively chroot the backing transport;
1033
# others like SFTPServer don't - users of the transport can walk up the
1034
# transport to read the entire backing transport. This wouldn't matter
1035
# except that the workdir tests are given - and that they expect the
1036
# server's url to point at - is one directory under the safety net. So
1037
# Branch operations into the transport will attempt to walk up one
1038
# directory. Chrooting all servers would avoid this but also mean that
1039
# we wouldn't be testing directly against non-root urls. Alternatively
1040
# getting the test framework to start the server with a backing server
1041
# at the actual safety net directory would work too, but this then
1042
# means that the self.get_url/self.get_transport methods would need
1043
# to transform all their results. On balance it's cleaner to handle it
1044
# here, and permit a higher url when we have one of these transports.
1045
if t.base.endswith('/work/'):
1046
# we have safety net/test root/work
1047
t = t.clone('../..')
1048
elif isinstance(transport_server,
1049
test_server.SmartTCPServer_for_testing):
1050
# The smart server adds a path similar to work, which is traversed
1051
# up from by the client. But the server is chrooted - the actual
1052
# backing transport is not escaped from, and VFS requests to the
1053
# root will error (because they try to escape the chroot).
1055
while t2.base != t.base:
1058
self.permit_url(t.base)
1060
def _track_transports(self):
1061
"""Install checks for transport usage."""
1062
# TestCase has no safe place it can write to.
1063
self._bzr_selftest_roots = []
1064
# Currently the easiest way to be sure that nothing is going on is to
1065
# hook into bzr dir opening. This leaves a small window of error for
1066
# transport tests, but they are well known, and we can improve on this
1068
bzrdir.BzrDir.hooks.install_named_hook("pre_open",
1069
self._preopen_isolate_transport, "Check bzr directories are safe.")
947
transport_server.setUp(backing_server)
948
self.addCleanup(transport_server.tearDown)
1071
950
def _ndiff_strings(self, a, b):
1072
951
"""Return ndiff between two strings containing lines.
1129
1008
:raises AssertionError: If the expected and actual stat values differ
1130
1009
other than by atime.
1132
self.assertEqual(expected.st_size, actual.st_size,
1133
'st_size did not match')
1134
self.assertEqual(expected.st_mtime, actual.st_mtime,
1135
'st_mtime did not match')
1136
self.assertEqual(expected.st_ctime, actual.st_ctime,
1137
'st_ctime did not match')
1138
if sys.platform != 'win32':
1139
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1140
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1141
# odd. Regardless we shouldn't actually try to assert anything
1142
# about their values
1143
self.assertEqual(expected.st_dev, actual.st_dev,
1144
'st_dev did not match')
1145
self.assertEqual(expected.st_ino, actual.st_ino,
1146
'st_ino did not match')
1147
self.assertEqual(expected.st_mode, actual.st_mode,
1148
'st_mode did not match')
1011
self.assertEqual(expected.st_size, actual.st_size)
1012
self.assertEqual(expected.st_mtime, actual.st_mtime)
1013
self.assertEqual(expected.st_ctime, actual.st_ctime)
1014
self.assertEqual(expected.st_dev, actual.st_dev)
1015
self.assertEqual(expected.st_ino, actual.st_ino)
1016
self.assertEqual(expected.st_mode, actual.st_mode)
1150
1018
def assertLength(self, length, obj_with_len):
1151
1019
"""Assert that obj_with_len is of length length."""
1153
1021
self.fail("Incorrect length: wanted %d, got %d for %r" % (
1154
1022
length, len(obj_with_len), obj_with_len))
1156
def assertLogsError(self, exception_class, func, *args, **kwargs):
1157
"""Assert that func(*args, **kwargs) quietly logs a specific exception.
1159
from bzrlib import trace
1161
orig_log_exception_quietly = trace.log_exception_quietly
1164
orig_log_exception_quietly()
1165
captured.append(sys.exc_info())
1166
trace.log_exception_quietly = capture
1167
func(*args, **kwargs)
1169
trace.log_exception_quietly = orig_log_exception_quietly
1170
self.assertLength(1, captured)
1171
err = captured[0][1]
1172
self.assertIsInstance(err, exception_class)
1175
1024
def assertPositive(self, val):
1176
1025
"""Assert that val is greater than 0."""
1177
1026
self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
1303
1148
m += ": " + msg
1151
def expectFailure(self, reason, assertion, *args, **kwargs):
1152
"""Invoke a test, expecting it to fail for the given reason.
1154
This is for assertions that ought to succeed, but currently fail.
1155
(The failure is *expected* but not *wanted*.) Please be very precise
1156
about the failure you're expecting. If a new bug is introduced,
1157
AssertionError should be raised, not KnownFailure.
1159
Frequently, expectFailure should be followed by an opposite assertion.
1162
Intended to be used with a callable that raises AssertionError as the
1163
'assertion' parameter. args and kwargs are passed to the 'assertion'.
1165
Raises KnownFailure if the test fails. Raises AssertionError if the
1170
self.expectFailure('Math is broken', self.assertNotEqual, 54,
1172
self.assertEqual(42, dynamic_val)
1174
This means that a dynamic_val of 54 will cause the test to raise
1175
a KnownFailure. Once math is fixed and the expectFailure is removed,
1176
only a dynamic_val of 42 will allow the test to pass. Anything other
1177
than 54 or 42 will cause an AssertionError.
1180
assertion(*args, **kwargs)
1181
except AssertionError:
1182
raise KnownFailure(reason)
1184
self.fail('Unexpected success. Should have failed: %s' % reason)
1306
1186
def assertFileEqual(self, content, path):
1307
1187
"""Fail if path does not contain 'content'."""
1308
1188
self.failUnlessExists(path)
1314
1194
self.assertEqualDiff(content, s)
1316
def assertDocstring(self, expected_docstring, obj):
1317
"""Fail if obj does not have expected_docstring"""
1319
# With -OO the docstring should be None instead
1320
self.assertIs(obj.__doc__, None)
1322
self.assertEqual(expected_docstring, obj.__doc__)
1324
1196
def failUnlessExists(self, path):
1325
1197
"""Fail unless path or paths, which may be abs or relative, exist."""
1326
1198
if not isinstance(path, basestring):
1467
1339
Close the file and delete it, unless setKeepLogfile was called.
1469
if bzrlib.trace._trace_file:
1470
# flush the log file, to get all content
1471
bzrlib.trace._trace_file.flush()
1341
if self._log_file is None:
1472
1343
bzrlib.trace.pop_log_file(self._log_memento)
1473
# Cache the log result and delete the file on disk
1474
self._get_log(False)
1344
self._log_file.close()
1345
self._log_file = None
1346
if not self._keep_log_file:
1347
os.remove(self._log_file_name)
1348
self._log_file_name = None
1350
def setKeepLogfile(self):
1351
"""Make the logfile not be deleted when _finishLogFile is called."""
1352
self._keep_log_file = True
1476
1354
def thisFailsStrictLockCheck(self):
1477
1355
"""It is known that this test would fail with -Dstrict_locks.
1495
1373
self._cleanups.append((callable, args, kwargs))
1497
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1498
"""Overrides an object attribute restoring it after the test.
1500
:param obj: The object that will be mutated.
1502
:param attr_name: The attribute name we want to preserve/override in
1505
:param new: The optional value we want to set the attribute to.
1507
:returns: The actual attr value.
1509
value = getattr(obj, attr_name)
1510
# The actual value is captured by the call below
1511
self.addCleanup(setattr, obj, attr_name, value)
1512
if new is not _unitialized_attr:
1513
setattr(obj, attr_name, new)
1516
1375
def _cleanEnvironment(self):
1518
1377
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1556
1411
'ftp_proxy': None,
1557
1412
'FTP_PROXY': None,
1558
1413
'BZR_REMOTE_PATH': None,
1559
# Generally speaking, we don't want apport reporting on crashes in
1560
# the test environment unless we're specifically testing apport,
1561
# so that it doesn't leak into the real system environment. We
1562
# use an env var so it propagates to subprocesses.
1563
'APPORT_DISABLE': '1',
1566
1416
self.addCleanup(self._restoreEnvironment)
1567
1417
for name, value in new_env.iteritems():
1568
1418
self._captureVar(name, value)
1570
1420
def _captureVar(self, name, newvalue):
1571
1421
"""Set an environment variable, and reset it when finished."""
1572
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1422
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1424
def _restore_debug_flags(self):
1425
debug.debug_flags.clear()
1426
debug.debug_flags.update(self._preserved_debug_flags)
1574
1428
def _restoreEnvironment(self):
1575
for name, value in self._old_env.iteritems():
1429
for name, value in self.__old_env.iteritems():
1576
1430
osutils.set_or_unset_env(name, value)
1578
1432
def _restoreHooks(self):
1586
1440
def _do_skip(self, result, reason):
1587
1441
addSkip = getattr(result, 'addSkip', None)
1588
1442
if not callable(addSkip):
1589
result.addSuccess(result)
1443
result.addError(self, sys.exc_info())
1591
1445
addSkip(self, reason)
1594
def _do_known_failure(self, result, e):
1595
err = sys.exc_info()
1596
addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1597
if addExpectedFailure is not None:
1598
addExpectedFailure(self, err)
1600
result.addSuccess(self)
1603
def _do_not_applicable(self, result, e):
1605
reason = 'No reason given'
1608
addNotApplicable = getattr(result, 'addNotApplicable', None)
1609
if addNotApplicable is not None:
1610
result.addNotApplicable(self, reason)
1612
self._do_skip(result, reason)
1615
def _do_unsupported_or_skip(self, result, e):
1617
addNotSupported = getattr(result, 'addNotSupported', None)
1618
if addNotSupported is not None:
1619
result.addNotSupported(self, reason)
1621
self._do_skip(result, reason)
1447
def run(self, result=None):
1448
if result is None: result = self.defaultTestResult()
1449
for feature in getattr(self, '_test_needs_features', []):
1450
if not feature.available():
1451
result.startTest(self)
1452
if getattr(result, 'addNotSupported', None):
1453
result.addNotSupported(self, feature)
1455
result.addSuccess(self)
1456
result.stopTest(self)
1460
result.startTest(self)
1461
absent_attr = object()
1463
method_name = getattr(self, '_testMethodName', absent_attr)
1464
if method_name is absent_attr:
1466
method_name = getattr(self, '_TestCase__testMethodName')
1467
testMethod = getattr(self, method_name)
1471
if not self._bzr_test_setUp_run:
1473
"test setUp did not invoke "
1474
"bzrlib.tests.TestCase's setUp")
1475
except KeyboardInterrupt:
1478
except TestSkipped, e:
1479
self._do_skip(result, e.args[0])
1483
result.addError(self, sys.exc_info())
1491
except self.failureException:
1492
result.addFailure(self, sys.exc_info())
1493
except TestSkipped, e:
1495
reason = "No reason given."
1498
self._do_skip(result, reason)
1499
except KeyboardInterrupt:
1503
result.addError(self, sys.exc_info())
1507
if not self._bzr_test_tearDown_run:
1509
"test tearDown did not invoke "
1510
"bzrlib.tests.TestCase's tearDown")
1511
except KeyboardInterrupt:
1515
result.addError(self, sys.exc_info())
1518
if ok: result.addSuccess(self)
1520
result.stopTest(self)
1522
except TestNotApplicable:
1523
# Not moved from the result [yet].
1526
except KeyboardInterrupt:
1531
for attr_name in self.attrs_to_keep:
1532
if attr_name in self.__dict__:
1533
saved_attrs[attr_name] = self.__dict__[attr_name]
1534
self.__dict__ = saved_attrs
1538
self._log_contents = ''
1539
self._bzr_test_tearDown_run = True
1540
unittest.TestCase.tearDown(self)
1623
1542
def time(self, callable, *args, **kwargs):
1624
1543
"""Run callable and accrue the time it takes to the benchmark time.
1645
1562
self._benchtime += time.time() - start
1564
def _runCleanups(self):
1565
"""Run registered cleanup functions.
1567
This should only be called from TestCase.tearDown.
1569
# TODO: Perhaps this should keep running cleanups even if
1570
# one of them fails?
1572
# Actually pop the cleanups from the list so tearDown running
1573
# twice is safe (this happens for skipped tests).
1574
while self._cleanups:
1575
cleanup, args, kwargs = self._cleanups.pop()
1576
cleanup(*args, **kwargs)
1647
1578
def log(self, *args):
1650
1581
def _get_log(self, keep_log_file=False):
1651
"""Internal helper to get the log from bzrlib.trace for this test.
1653
Please use self.getDetails, or self.get_log to access this in test case
1582
"""Get the log from bzrlib.trace calls from this test.
1656
1584
:param keep_log_file: When True, if the log is still a file on disk
1657
1585
leave it as a file on disk. When False, if the log is still a file
1659
1587
self._log_contents.
1660
1588
:return: A string containing the log.
1662
if self._log_contents is not None:
1664
self._log_contents.decode('utf8')
1665
except UnicodeDecodeError:
1666
unicodestr = self._log_contents.decode('utf8', 'replace')
1667
self._log_contents = unicodestr.encode('utf8')
1590
# flush the log file, to get all content
1592
if bzrlib.trace._trace_file:
1593
bzrlib.trace._trace_file.flush()
1594
if self._log_contents:
1595
# XXX: this can hardly contain the content flushed above --vila
1668
1597
return self._log_contents
1670
if bzrlib.trace._trace_file:
1671
# flush the log file, to get all content
1672
bzrlib.trace._trace_file.flush()
1673
1598
if self._log_file_name is not None:
1674
1599
logfile = open(self._log_file_name)
1676
1601
log_contents = logfile.read()
1678
1603
logfile.close()
1680
log_contents.decode('utf8')
1681
except UnicodeDecodeError:
1682
unicodestr = log_contents.decode('utf8', 'replace')
1683
log_contents = unicodestr.encode('utf8')
1684
1604
if not keep_log_file:
1686
max_close_attempts = 100
1687
first_close_error = None
1688
while close_attempts < max_close_attempts:
1691
self._log_file.close()
1692
except IOError, ioe:
1693
if ioe.errno is None:
1694
# No errno implies 'close() called during
1695
# concurrent operation on the same file object', so
1696
# retry. Probably a thread is trying to write to
1698
if first_close_error is None:
1699
first_close_error = ioe
1704
if close_attempts > 1:
1706
'Unable to close log file on first attempt, '
1707
'will retry: %s\n' % (first_close_error,))
1708
if close_attempts == max_close_attempts:
1710
'Unable to close log file after %d attempts.\n'
1711
% (max_close_attempts,))
1712
self._log_file = None
1713
# Permit multiple calls to get_log until we clean it up in
1715
1605
self._log_contents = log_contents
1717
1607
os.remove(self._log_file_name)
1779
1658
os.chdir(working_dir)
1783
result = self.apply_redirected(ui.ui_factory.stdin,
1785
bzrlib.commands.run_bzr_catch_user_errors,
1787
except KeyboardInterrupt:
1788
# Reraise KeyboardInterrupt with contents of redirected stdout
1789
# and stderr as arguments, for tests which are interested in
1790
# stdout and stderr and are expecting the exception.
1791
out = stdout.getvalue()
1792
err = stderr.getvalue()
1794
self.log('output:\n%r', out)
1796
self.log('errors:\n%r', err)
1797
raise KeyboardInterrupt(out, err)
1661
result = self.apply_redirected(ui.ui_factory.stdin,
1663
bzrlib.commands.run_bzr_catch_user_errors,
1799
1666
logger.removeHandler(handler)
1800
1667
ui.ui_factory = old_ui_factory
2342
2210
# recreate a new one or all the following tests will fail.
2343
2211
# If you need to inspect its content uncomment the following line
2344
2212
# import pdb; pdb.set_trace()
2345
_rmtree_temp_dir(root + '/.bzr', test_id=self.id())
2213
_rmtree_temp_dir(root + '/.bzr')
2346
2214
self._create_safety_net()
2347
2215
raise AssertionError('%s/.bzr should not be modified' % root)
2349
2217
def _make_test_root(self):
2350
2218
if TestCaseWithMemoryTransport.TEST_ROOT is None:
2351
# Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
2352
root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
2219
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
2354
2220
TestCaseWithMemoryTransport.TEST_ROOT = root
2356
2222
self._create_safety_net()
2424
2288
return branchbuilder.BranchBuilder(branch=branch)
2426
2290
def overrideEnvironmentForTesting(self):
2427
test_home_dir = self.test_home_dir
2428
if isinstance(test_home_dir, unicode):
2429
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2430
os.environ['HOME'] = test_home_dir
2431
os.environ['BZR_HOME'] = test_home_dir
2291
os.environ['HOME'] = self.test_home_dir
2292
os.environ['BZR_HOME'] = self.test_home_dir
2433
2294
def setUp(self):
2434
2295
super(TestCaseWithMemoryTransport, self).setUp()
2435
2296
self._make_test_root()
2436
self.addCleanup(os.chdir, os.getcwdu())
2297
_currentdir = os.getcwdu()
2298
def _leaveDirectory():
2299
os.chdir(_currentdir)
2300
self.addCleanup(_leaveDirectory)
2437
2301
self.makeAndChdirToTestDir()
2438
2302
self.overrideEnvironmentForTesting()
2439
2303
self.__readonly_server = None
2511
2375
if os.path.exists(name):
2512
2376
name = name_prefix + '_' + str(i)
2514
# now create test and home directories within this dir
2515
self.test_base_dir = name
2516
self.addCleanup(self.deleteTestDir)
2517
os.mkdir(self.test_base_dir)
2519
self.permit_dir(self.test_base_dir)
2520
# 'sprouting' and 'init' of a branch both walk up the tree to find
2521
# stacking policy to honour; create a bzr dir with an unshared
2522
# repository (but not a branch - our code would be trying to escape
2523
# then!) to stop them, and permit it to be read.
2524
# control = bzrdir.BzrDir.create(self.test_base_dir)
2525
# control.create_repository()
2380
# now create test and home directories within this dir
2381
self.test_base_dir = name
2526
2382
self.test_home_dir = self.test_base_dir + '/home'
2527
2383
os.mkdir(self.test_home_dir)
2528
2384
self.test_dir = self.test_base_dir + '/work'
3295
3134
if not os.path.isfile(bzr_path):
3296
3135
# We are probably installed. Assume sys.argv is the right file
3297
3136
bzr_path = sys.argv[0]
3298
bzr_path = [bzr_path]
3299
if sys.platform == "win32":
3300
# if we're on windows, we can't execute the bzr script directly
3301
bzr_path = [sys.executable] + bzr_path
3302
3137
fd, test_list_file_name = tempfile.mkstemp()
3303
3138
test_list_file = os.fdopen(fd, 'wb', 1)
3304
3139
for test in process_tests:
3305
3140
test_list_file.write(test.id() + '\n')
3306
3141
test_list_file.close()
3308
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3143
argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
3310
3145
if '--no-plugins' in sys.argv:
3311
3146
argv.append('--no-plugins')
3351
3186
def addFailure(self, test, err):
3352
3187
self.result.addFailure(test, err)
3353
ForwardingResult = testtools.ExtendedToOriginalDecorator
3190
class BZRTransformingResult(ForwardingResult):
3192
def addError(self, test, err):
3193
feature = self._error_looks_like('UnavailableFeature: ', err)
3194
if feature is not None:
3195
self.result.addNotSupported(test, feature)
3197
self.result.addError(test, err)
3199
def addFailure(self, test, err):
3200
known = self._error_looks_like('KnownFailure: ', err)
3201
if known is not None:
3202
self.result._addKnownFailure(test, [KnownFailure,
3203
KnownFailure(known), None])
3205
self.result.addFailure(test, err)
3207
def _error_looks_like(self, prefix, err):
3208
"""Deserialize exception and returns the stringify value."""
3212
if isinstance(exc, subunit.RemoteException):
3213
# stringify the exception gives access to the remote traceback
3214
# We search the last line for 'prefix'
3215
lines = str(exc).split('\n')
3216
while lines and not lines[-1]:
3219
if lines[-1].startswith(prefix):
3220
value = lines[-1][len(prefix):]
3356
3224
class ProfileResult(ForwardingResult):
3633
3501
'bzrlib.tests.commands',
3634
3502
'bzrlib.tests.per_branch',
3635
3503
'bzrlib.tests.per_bzrdir',
3636
'bzrlib.tests.per_bzrdir_colo',
3637
'bzrlib.tests.per_foreign_vcs',
3638
3504
'bzrlib.tests.per_interrepository',
3639
3505
'bzrlib.tests.per_intertree',
3640
3506
'bzrlib.tests.per_inventory',
3641
3507
'bzrlib.tests.per_interbranch',
3642
3508
'bzrlib.tests.per_lock',
3643
'bzrlib.tests.per_merger',
3644
3509
'bzrlib.tests.per_transport',
3645
3510
'bzrlib.tests.per_tree',
3646
3511
'bzrlib.tests.per_pack_repository',
3647
3512
'bzrlib.tests.per_repository',
3648
3513
'bzrlib.tests.per_repository_chk',
3649
3514
'bzrlib.tests.per_repository_reference',
3650
'bzrlib.tests.per_uifactory',
3651
3515
'bzrlib.tests.per_versionedfile',
3652
3516
'bzrlib.tests.per_workingtree',
3653
3517
'bzrlib.tests.test__annotator',
3654
'bzrlib.tests.test__bencode',
3655
3518
'bzrlib.tests.test__chk_map',
3656
3519
'bzrlib.tests.test__dirstate_helpers',
3657
3520
'bzrlib.tests.test__groupcompress',
3658
3521
'bzrlib.tests.test__known_graph',
3659
3522
'bzrlib.tests.test__rio',
3660
'bzrlib.tests.test__simple_set',
3661
'bzrlib.tests.test__static_tuple',
3662
3523
'bzrlib.tests.test__walkdirs_win32',
3663
3524
'bzrlib.tests.test_ancestry',
3664
3525
'bzrlib.tests.test_annotate',
3665
3526
'bzrlib.tests.test_api',
3666
3527
'bzrlib.tests.test_atomicfile',
3667
3528
'bzrlib.tests.test_bad_files',
3529
'bzrlib.tests.test_bencode',
3668
3530
'bzrlib.tests.test_bisect_multi',
3669
3531
'bzrlib.tests.test_branch',
3670
3532
'bzrlib.tests.test_branchbuilder',
4039
3892
return new_test
4042
def permute_tests_for_extension(standard_tests, loader, py_module_name,
4044
"""Helper for permuting tests against an extension module.
4046
This is meant to be used inside a module's 'load_tests()' function. It will
4047
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
4048
against both implementations. Setting 'test.module' to the appropriate
4049
module. See bzrlib.tests.test__chk_map.load_tests as an example.
4051
:param standard_tests: A test suite to permute
4052
:param loader: A TestLoader
4053
:param py_module_name: The python path to a python module that can always
4054
be loaded, and will be considered the 'python' implementation. (eg
4055
'bzrlib._chk_map_py')
4056
:param ext_module_name: The python path to an extension module. If the
4057
module cannot be loaded, a single test will be added, which notes that
4058
the module is not available. If it can be loaded, all standard_tests
4059
will be run against that module.
4060
:return: (suite, feature) suite is a test-suite that has all the permuted
4061
tests. feature is the Feature object that can be used to determine if
4062
the module is available.
4065
py_module = __import__(py_module_name, {}, {}, ['NO_SUCH_ATTRIB'])
4067
('python', {'module': py_module}),
4069
suite = loader.suiteClass()
4070
feature = ModuleAvailableFeature(ext_module_name)
4071
if feature.available():
4072
scenarios.append(('C', {'module': feature.module}))
4074
# the compiled module isn't available, so we add a failing test
4075
class FailWithoutFeature(TestCase):
4076
def test_fail(self):
4077
self.requireFeature(feature)
4078
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4079
result = multiply_tests(standard_tests, scenarios, suite)
4080
return result, feature
4083
def _rmtree_temp_dir(dirname, test_id=None):
3895
def _rmtree_temp_dir(dirname):
4084
3896
# If LANG=C we probably have created some bogus paths
4085
3897
# which rmtree(unicode) will fail to delete
4086
3898
# so make sure we are using rmtree(str) to delete everything
4190
3999
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4193
class _CompatabilityThunkFeature(Feature):
4194
"""This feature is just a thunk to another feature.
4196
It issues a deprecation warning if it is accessed, to let you know that you
4197
should really use a different feature.
4200
def __init__(self, dep_version, module, name,
4201
replacement_name, replacement_module=None):
4202
super(_CompatabilityThunkFeature, self).__init__()
4203
self._module = module
4204
if replacement_module is None:
4205
replacement_module = module
4206
self._replacement_module = replacement_module
4208
self._replacement_name = replacement_name
4209
self._dep_version = dep_version
4210
self._feature = None
4213
if self._feature is None:
4214
depr_msg = self._dep_version % ('%s.%s'
4215
% (self._module, self._name))
4216
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4217
self._replacement_name)
4218
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4219
# Import the new feature and use it as a replacement for the
4221
mod = __import__(self._replacement_module, {}, {},
4222
[self._replacement_name])
4223
self._feature = getattr(mod, self._replacement_name)
4227
return self._feature._probe()
4230
class ModuleAvailableFeature(Feature):
4231
"""This is a feature that describes a module we want to be available.
4233
Declare the name of the module in __init__(), and then after probing, the
4234
module will be available as 'self.module'.
4236
:ivar module: The module if it is available, else None.
4239
def __init__(self, module_name):
4240
super(ModuleAvailableFeature, self).__init__()
4241
self.module_name = module_name
4245
self._module = __import__(self.module_name, {}, {}, [''])
4252
if self.available(): # Make sure the probe has been done
4256
def feature_name(self):
4257
return self.module_name
4260
# This is kept here for compatibility, it is recommended to use
4261
# 'bzrlib.tests.feature.paramiko' instead
4262
ParamikoFeature = _CompatabilityThunkFeature(
4263
deprecated_in((2,1,0)),
4264
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4267
4002
def probe_unicode_in_user_encoding():
4268
4003
"""Try to encode several unicode strings to use in unicode-aware tests.
4269
4004
Return first successful match.
4349
4084
UTF8Filesystem = _UTF8Filesystem()
4352
class _BreakinFeature(Feature):
4353
"""Does this platform support the breakin feature?"""
4356
from bzrlib import breakin
4357
if breakin.determine_signal() is None:
4359
if sys.platform == 'win32':
4360
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4361
# We trigger SIGBREAK via a Console api so we need ctypes to
4362
# access the function
4369
def feature_name(self):
4370
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4373
BreakinFeature = _BreakinFeature()
4376
4087
class _CaseInsCasePresFilenameFeature(Feature):
4377
4088
"""Is the file-system case insensitive, but case-preserving?"""
4428
4139
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4431
class _CaseSensitiveFilesystemFeature(Feature):
4142
class _SubUnitFeature(Feature):
4143
"""Check if subunit is available."""
4433
4145
def _probe(self):
4434
if CaseInsCasePresFilenameFeature.available():
4436
elif CaseInsensitiveFilesystemFeature.available():
4441
4152
def feature_name(self):
4442
return 'case-sensitive filesystem'
4444
# new coding style is for feature instances to be lowercase
4445
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4448
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4449
SubUnitFeature = _CompatabilityThunkFeature(
4450
deprecated_in((2,1,0)),
4451
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4155
SubUnitFeature = _SubUnitFeature()
4452
4156
# Only define SubUnitBzrRunner if subunit is available.
4454
4158
from subunit import TestProtocolClient
4455
from subunit.test_results import AutoTimingTestResultDecorator
4160
from subunit.test_results import AutoTimingTestResultDecorator
4162
AutoTimingTestResultDecorator = lambda x:x
4456
4163
class SubUnitBzrRunner(TextTestRunner):
4457
4164
def run(self, test):
4458
4165
result = AutoTimingTestResultDecorator(