87
97
from bzrlib.symbol_versioning import (
88
98
DEPRECATED_PARAMETER,
89
99
deprecated_function,
90
101
deprecated_method,
91
102
deprecated_passed,
93
104
import bzrlib.trace
94
from bzrlib.transport import get_transport, pathfilter
105
from bzrlib.transport import (
95
110
import bzrlib.transport
96
from bzrlib.transport.local import LocalURLServer
97
from bzrlib.transport.memory import MemoryServer
98
from bzrlib.transport.readonly import ReadonlyServer
99
111
from bzrlib.trace import mutter, note
100
from bzrlib.tests import TestUtil
112
from bzrlib.tests import (
101
117
from bzrlib.tests.http_server import HttpServer
102
118
from bzrlib.tests.TestUtil import (
106
from bzrlib.tests.treeshape import build_tree_contents
107
122
from bzrlib.ui import NullProgressView
108
123
from bzrlib.ui.text import TextUIFactory
109
124
import bzrlib.version_info_formats.format_custom
222
242
'%s is leaking threads among %d leaking tests.\n' % (
223
243
TestCase._first_thread_leaker_id,
224
244
TestCase._leaking_threads_tests))
226
def _extractBenchmarkTime(self, testCase):
245
# We don't report the main thread as an active one.
247
'%d non-main threads were left active in the end.\n'
248
% (TestCase._active_threads - 1))
250
def getDescription(self, test):
253
def _extractBenchmarkTime(self, testCase, details=None):
227
254
"""Add a benchmark time for the current test case."""
255
if details and 'benchtime' in details:
256
return float(''.join(details['benchtime'].iter_bytes()))
228
257
return getattr(testCase, "_benchtime", None)
230
259
def _elapsedTestTimeString(self):
293
322
Called from the TestCase run() method when the test
294
323
fails with an unexpected error.
296
self._testConcluded(test)
297
if isinstance(err[1], TestNotApplicable):
298
return self._addNotApplicable(test, err)
299
elif isinstance(err[1], UnavailableFeature):
300
return self.addNotSupported(test, err[1].args[0])
303
unittest.TestResult.addError(self, test, err)
304
self.error_count += 1
305
self.report_error(test, err)
308
self._cleanupLogFile(test)
326
unittest.TestResult.addError(self, test, err)
327
self.error_count += 1
328
self.report_error(test, err)
331
self._cleanupLogFile(test)
310
333
def addFailure(self, test, err):
311
334
"""Tell result that test failed.
313
336
Called from the TestCase run() method when the test
314
337
fails because e.g. an assert() method failed.
316
self._testConcluded(test)
317
if isinstance(err[1], KnownFailure):
318
return self._addKnownFailure(test, err)
321
unittest.TestResult.addFailure(self, test, err)
322
self.failure_count += 1
323
self.report_failure(test, err)
326
self._cleanupLogFile(test)
340
unittest.TestResult.addFailure(self, test, err)
341
self.failure_count += 1
342
self.report_failure(test, err)
345
self._cleanupLogFile(test)
328
def addSuccess(self, test):
347
def addSuccess(self, test, details=None):
329
348
"""Tell result that test completed successfully.
331
350
Called from the TestCase run()
333
self._testConcluded(test)
334
352
if self._bench_history is not None:
335
benchmark_time = self._extractBenchmarkTime(test)
353
benchmark_time = self._extractBenchmarkTime(test, details)
336
354
if benchmark_time is not None:
337
355
self._bench_history.write("%s %s\n" % (
338
356
self._formatTime(benchmark_time),
342
360
unittest.TestResult.addSuccess(self, test)
343
361
test._log_contents = ''
345
def _testConcluded(self, test):
346
"""Common code when a test has finished.
348
Called regardless of whether it succeded, failed, etc.
352
def _addKnownFailure(self, test, err):
363
def addExpectedFailure(self, test, err):
353
364
self.known_failure_count += 1
354
365
self.report_known_failure(test, err)
357
368
"""The test will not be run because of a missing feature.
359
370
# this can be called in two different ways: it may be that the
360
# test started running, and then raised (through addError)
371
# test started running, and then raised (through requireFeature)
361
372
# UnavailableFeature. Alternatively this method can be called
362
# while probing for features before running the tests; in that
363
# case we will see startTest and stopTest, but the test will never
373
# while probing for features before running the test code proper; in
374
# that case we will see startTest and stopTest, but the test will
375
# never actually run.
365
376
self.unsupported.setdefault(str(feature), 0)
366
377
self.unsupported[str(feature)] += 1
367
378
self.report_unsupported(test, feature)
371
382
self.skip_count += 1
372
383
self.report_skip(test, reason)
374
def _addNotApplicable(self, test, skip_excinfo):
375
if isinstance(skip_excinfo[1], TestNotApplicable):
376
self.not_applicable_count += 1
377
self.report_not_applicable(test, skip_excinfo)
380
except KeyboardInterrupt:
383
self.addError(test, test.exc_info())
385
# seems best to treat this as success from point-of-view of unittest
386
# -- it actually does nothing so it barely matters :)
387
unittest.TestResult.addSuccess(self, test)
388
test._log_contents = ''
390
def printErrorList(self, flavour, errors):
391
for test, err in errors:
392
self.stream.writeln(self.separator1)
393
self.stream.write("%s: " % flavour)
394
self.stream.writeln(self.getDescription(test))
395
if getattr(test, '_get_log', None) is not None:
396
log_contents = test._get_log()
398
self.stream.write('\n')
400
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
401
self.stream.write('\n')
402
self.stream.write(log_contents)
403
self.stream.write('\n')
405
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
406
self.stream.write('\n')
407
self.stream.writeln(self.separator2)
408
self.stream.writeln("%s" % err)
385
def addNotApplicable(self, test, reason):
386
self.not_applicable_count += 1
387
self.report_not_applicable(test, reason)
410
389
def _post_mortem(self):
411
390
"""Start a PDB post mortem session."""
512
490
return self._shortened_test_description(test)
514
492
def report_error(self, test, err):
515
self.pb.note('ERROR: %s\n %s\n',
493
self.stream.write('ERROR: %s\n %s\n' % (
516
494
self._test_description(test),
520
498
def report_failure(self, test, err):
521
self.pb.note('FAIL: %s\n %s\n',
499
self.stream.write('FAIL: %s\n %s\n' % (
522
500
self._test_description(test),
526
504
def report_known_failure(self, test, err):
527
self.pb.note('XFAIL: %s\n%s\n',
528
self._test_description(test), err[1])
530
507
def report_skip(self, test, reason):
533
def report_not_applicable(self, test, skip_excinfo):
510
def report_not_applicable(self, test, reason):
536
513
def report_unsupported(self, test, feature):
558
535
def report_test_start(self, test):
560
537
name = self._shortened_test_description(test)
561
# width needs space for 6 char status, plus 1 for slash, plus an
562
# 11-char time string, plus a trailing blank
563
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
564
self.stream.write(self._ellipsize_to_right(name,
565
osutils.terminal_width()-18))
538
width = osutils.terminal_width()
539
if width is not None:
540
# width needs space for 6 char status, plus 1 for slash, plus an
541
# 11-char time string, plus a trailing blank
542
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
544
self.stream.write(self._ellipsize_to_right(name, width-18))
546
self.stream.write(name)
566
547
self.stream.flush()
568
549
def _error_summary(self, err):
597
578
self.stream.writeln(' SKIP %s\n%s'
598
579
% (self._testTimeString(test), reason))
600
def report_not_applicable(self, test, skip_excinfo):
601
self.stream.writeln(' N/A %s\n%s'
602
% (self._testTimeString(test),
603
self._error_summary(skip_excinfo)))
581
def report_not_applicable(self, test, reason):
582
self.stream.writeln(' N/A %s\n %s'
583
% (self._testTimeString(test), reason))
605
585
def report_unsupported(self, test, feature):
606
586
"""test cannot be run because feature is missing."""
626
606
applied left to right - the first element in the list is the
627
607
innermost decorator.
609
# stream may know claim to know to write unicode strings, but in older
610
# pythons this goes sufficiently wrong that it is a bad idea. (
611
# specifically a built in file with encoding 'UTF-8' will still try
612
# to encode using ascii.
613
new_encoding = osutils.get_terminal_encoding()
614
codec = codecs.lookup(new_encoding)
615
if type(codec) is tuple:
619
encode = codec.encode
620
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
621
stream.encoding = new_encoding
629
622
self.stream = unittest._WritelnDecorator(stream)
630
623
self.descriptions = descriptions
631
624
self.verbosity = verbosity
698
class KnownFailure(AssertionError):
699
"""Indicates that a test failed in a precisely expected manner.
701
Such failures dont block the whole test suite from passing because they are
702
indicators of partially completed code or of future work. We have an
703
explicit error for them so that we can ensure that they are always visible:
704
KnownFailures are always shown in the output of bzr selftest.
682
# traceback._some_str fails to format exceptions that have the default
683
# __str__ which does an implicit ascii conversion. However, repr() on those
684
# objects works, for all that its not quite what the doctor may have ordered.
685
def _clever_some_str(value):
690
return repr(value).replace('\\n', '\n')
692
return '<unprintable %s object>' % type(value).__name__
694
traceback._some_str = _clever_some_str
697
# deprecated - use self.knownFailure(), or self.expectFailure.
698
KnownFailure = testtools.testcase._ExpectedFailure
708
701
class UnavailableFeature(Exception):
709
702
"""A feature required for this test was not available.
704
This can be considered a specialised form of SkippedTest.
711
706
The feature should be used to construct the exception.
715
class CommandFailed(Exception):
719
710
class StringIOWrapper(object):
720
711
"""A wrapper around cStringIO which just adds an encoding attribute.
807
798
_leaking_threads_tests = 0
808
799
_first_thread_leaker_id = None
809
800
_log_file_name = None
811
_keep_log_file = False
812
801
# record lsprof data when performing benchmark calls.
813
802
_gather_lsprof_in_benchmarks = False
814
attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
815
'_log_contents', '_log_file_name', '_benchtime',
816
'_TestCase__testMethodName', '_TestCase__testMethodDoc',)
818
804
def __init__(self, methodName='testMethod'):
819
805
super(TestCase, self).__init__(methodName)
820
806
self._cleanups = []
821
self._bzr_test_setUp_run = False
822
self._bzr_test_tearDown_run = False
823
807
self._directory_isolation = True
808
self.exception_handlers.insert(0,
809
(UnavailableFeature, self._do_unsupported_or_skip))
810
self.exception_handlers.insert(0,
811
(TestNotApplicable, self._do_not_applicable))
826
unittest.TestCase.setUp(self)
827
self._bzr_test_setUp_run = True
814
super(TestCase, self).setUp()
815
for feature in getattr(self, '_test_needs_features', []):
816
self.requireFeature(feature)
817
self._log_contents = None
818
self.addDetail("log", content.Content(content.ContentType("text",
819
"plain", {"charset": "utf8"}),
820
lambda:[self._get_log(keep_log_file=True)]))
828
821
self._cleanEnvironment()
829
822
self._silenceUI()
830
823
self._startLogFile()
846
839
active = threading.activeCount()
847
840
leaked_threads = active - TestCase._active_threads
848
841
TestCase._active_threads = active
842
# If some tests make the number of threads *decrease*, we'll consider
843
# that they are just observing old threads dieing, not agressively kill
844
# random threads. So we don't report these tests as leaking. The risk
845
# is that we have false positives that way (the test see 2 threads
846
# going away but leak one) but it seems less likely than the actual
847
# false positives (the test see threads going away and does not leak).
848
if leaked_threads > 0:
850
849
TestCase._leaking_threads_tests += 1
851
850
if TestCase._first_thread_leaker_id is None:
852
851
TestCase._first_thread_leaker_id = self.id()
857
856
Tests that want to use debug flags can just set them in the
858
857
debug_flags set during setup/teardown.
860
self._preserved_debug_flags = set(debug.debug_flags)
859
# Start with a copy of the current debug flags we can safely modify.
860
self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
861
861
if 'allow_debug' not in selftest_debug_flags:
862
862
debug.debug_flags.clear()
863
863
if 'disable_lock_checks' not in selftest_debug_flags:
864
864
debug.debug_flags.add('strict_locks')
865
self.addCleanup(self._restore_debug_flags)
867
866
def _clear_hooks(self):
868
867
# prevent hooks affecting tests
1133
1129
:raises AssertionError: If the expected and actual stat values differ
1134
1130
other than by atime.
1136
self.assertEqual(expected.st_size, actual.st_size)
1137
self.assertEqual(expected.st_mtime, actual.st_mtime)
1138
self.assertEqual(expected.st_ctime, actual.st_ctime)
1139
self.assertEqual(expected.st_dev, actual.st_dev)
1140
self.assertEqual(expected.st_ino, actual.st_ino)
1141
self.assertEqual(expected.st_mode, actual.st_mode)
1132
self.assertEqual(expected.st_size, actual.st_size,
1133
'st_size did not match')
1134
self.assertEqual(expected.st_mtime, actual.st_mtime,
1135
'st_mtime did not match')
1136
self.assertEqual(expected.st_ctime, actual.st_ctime,
1137
'st_ctime did not match')
1138
if sys.platform != 'win32':
1139
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1140
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1141
# odd. Regardless we shouldn't actually try to assert anything
1142
# about their values
1143
self.assertEqual(expected.st_dev, actual.st_dev,
1144
'st_dev did not match')
1145
self.assertEqual(expected.st_ino, actual.st_ino,
1146
'st_ino did not match')
1147
self.assertEqual(expected.st_mode, actual.st_mode,
1148
'st_mode did not match')
1143
1150
def assertLength(self, length, obj_with_len):
1144
1151
"""Assert that obj_with_len is of length length."""
1146
1153
self.fail("Incorrect length: wanted %d, got %d for %r" % (
1147
1154
length, len(obj_with_len), obj_with_len))
1156
def assertLogsError(self, exception_class, func, *args, **kwargs):
1157
"""Assert that func(*args, **kwargs) quietly logs a specific exception.
1159
from bzrlib import trace
1161
orig_log_exception_quietly = trace.log_exception_quietly
1164
orig_log_exception_quietly()
1165
captured.append(sys.exc_info())
1166
trace.log_exception_quietly = capture
1167
func(*args, **kwargs)
1169
trace.log_exception_quietly = orig_log_exception_quietly
1170
self.assertLength(1, captured)
1171
err = captured[0][1]
1172
self.assertIsInstance(err, exception_class)
1149
1175
def assertPositive(self, val):
1150
1176
"""Assert that val is greater than 0."""
1151
1177
self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
1273
1303
m += ": " + msg
1276
def expectFailure(self, reason, assertion, *args, **kwargs):
1277
"""Invoke a test, expecting it to fail for the given reason.
1279
This is for assertions that ought to succeed, but currently fail.
1280
(The failure is *expected* but not *wanted*.) Please be very precise
1281
about the failure you're expecting. If a new bug is introduced,
1282
AssertionError should be raised, not KnownFailure.
1284
Frequently, expectFailure should be followed by an opposite assertion.
1287
Intended to be used with a callable that raises AssertionError as the
1288
'assertion' parameter. args and kwargs are passed to the 'assertion'.
1290
Raises KnownFailure if the test fails. Raises AssertionError if the
1295
self.expectFailure('Math is broken', self.assertNotEqual, 54,
1297
self.assertEqual(42, dynamic_val)
1299
This means that a dynamic_val of 54 will cause the test to raise
1300
a KnownFailure. Once math is fixed and the expectFailure is removed,
1301
only a dynamic_val of 42 will allow the test to pass. Anything other
1302
than 54 or 42 will cause an AssertionError.
1305
assertion(*args, **kwargs)
1306
except AssertionError:
1307
raise KnownFailure(reason)
1309
self.fail('Unexpected success. Should have failed: %s' % reason)
1311
1306
def assertFileEqual(self, content, path):
1312
1307
"""Fail if path does not contain 'content'."""
1313
1308
self.failUnlessExists(path)
1319
1314
self.assertEqualDiff(content, s)
1316
def assertDocstring(self, expected_docstring, obj):
1317
"""Fail if obj does not have expected_docstring"""
1319
# With -OO the docstring should be None instead
1320
self.assertIs(obj.__doc__, None)
1322
self.assertEqual(expected_docstring, obj.__doc__)
1321
1324
def failUnlessExists(self, path):
1322
1325
"""Fail unless path or paths, which may be abs or relative, exist."""
1323
1326
if not isinstance(path, basestring):
1464
1467
Close the file and delete it, unless setKeepLogfile was called.
1466
if self._log_file is None:
1469
if bzrlib.trace._trace_file:
1470
# flush the log file, to get all content
1471
bzrlib.trace._trace_file.flush()
1468
1472
bzrlib.trace.pop_log_file(self._log_memento)
1469
self._log_file.close()
1470
self._log_file = None
1471
if not self._keep_log_file:
1472
os.remove(self._log_file_name)
1473
self._log_file_name = None
1475
def setKeepLogfile(self):
1476
"""Make the logfile not be deleted when _finishLogFile is called."""
1477
self._keep_log_file = True
1473
# Cache the log result and delete the file on disk
1474
self._get_log(False)
1479
1476
def thisFailsStrictLockCheck(self):
1480
1477
"""It is known that this test would fail with -Dstrict_locks.
1498
1495
self._cleanups.append((callable, args, kwargs))
1497
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1498
"""Overrides an object attribute restoring it after the test.
1500
:param obj: The object that will be mutated.
1502
:param attr_name: The attribute name we want to preserve/override in
1505
:param new: The optional value we want to set the attribute to.
1507
:returns: The actual attr value.
1509
value = getattr(obj, attr_name)
1510
# The actual value is captured by the call below
1511
self.addCleanup(setattr, obj, attr_name, value)
1512
if new is not _unitialized_attr:
1513
setattr(obj, attr_name, new)
1500
1516
def _cleanEnvironment(self):
1502
1518
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1536
1556
'ftp_proxy': None,
1537
1557
'FTP_PROXY': None,
1538
1558
'BZR_REMOTE_PATH': None,
1559
# Generally speaking, we don't want apport reporting on crashes in
1560
# the test envirnoment unless we're specifically testing apport,
1561
# so that it doesn't leak into the real system environment. We
1562
# use an env var so it propagates to subprocesses.
1563
'APPORT_DISABLE': '1',
1541
1566
self.addCleanup(self._restoreEnvironment)
1542
1567
for name, value in new_env.iteritems():
1543
1568
self._captureVar(name, value)
1545
1570
def _captureVar(self, name, newvalue):
1546
1571
"""Set an environment variable, and reset it when finished."""
1547
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1549
def _restore_debug_flags(self):
1550
debug.debug_flags.clear()
1551
debug.debug_flags.update(self._preserved_debug_flags)
1572
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1553
1574
def _restoreEnvironment(self):
1554
for name, value in self.__old_env.iteritems():
1575
for name, value in self._old_env.iteritems():
1555
1576
osutils.set_or_unset_env(name, value)
1557
1578
def _restoreHooks(self):
1565
1586
def _do_skip(self, result, reason):
1566
1587
addSkip = getattr(result, 'addSkip', None)
1567
1588
if not callable(addSkip):
1568
result.addError(self, sys.exc_info())
1589
result.addSuccess(result)
1570
1591
addSkip(self, reason)
1572
def run(self, result=None):
1573
if result is None: result = self.defaultTestResult()
1574
for feature in getattr(self, '_test_needs_features', []):
1575
if not feature.available():
1576
result.startTest(self)
1577
if getattr(result, 'addNotSupported', None):
1578
result.addNotSupported(self, feature)
1580
result.addSuccess(self)
1581
result.stopTest(self)
1585
result.startTest(self)
1586
absent_attr = object()
1588
method_name = getattr(self, '_testMethodName', absent_attr)
1589
if method_name is absent_attr:
1591
method_name = getattr(self, '_TestCase__testMethodName')
1592
testMethod = getattr(self, method_name)
1596
if not self._bzr_test_setUp_run:
1598
"test setUp did not invoke "
1599
"bzrlib.tests.TestCase's setUp")
1600
except KeyboardInterrupt:
1603
except TestSkipped, e:
1604
self._do_skip(result, e.args[0])
1608
result.addError(self, sys.exc_info())
1616
except self.failureException:
1617
result.addFailure(self, sys.exc_info())
1618
except TestSkipped, e:
1620
reason = "No reason given."
1623
self._do_skip(result, reason)
1624
except KeyboardInterrupt:
1628
result.addError(self, sys.exc_info())
1632
if not self._bzr_test_tearDown_run:
1634
"test tearDown did not invoke "
1635
"bzrlib.tests.TestCase's tearDown")
1636
except KeyboardInterrupt:
1640
result.addError(self, sys.exc_info())
1643
if ok: result.addSuccess(self)
1645
result.stopTest(self)
1647
except TestNotApplicable:
1648
# Not moved from the result [yet].
1651
except KeyboardInterrupt:
1656
for attr_name in self.attrs_to_keep:
1657
if attr_name in self.__dict__:
1658
saved_attrs[attr_name] = self.__dict__[attr_name]
1659
self.__dict__ = saved_attrs
1663
self._log_contents = ''
1664
self._bzr_test_tearDown_run = True
1665
unittest.TestCase.tearDown(self)
1594
def _do_known_failure(self, result, e):
1595
err = sys.exc_info()
1596
addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1597
if addExpectedFailure is not None:
1598
addExpectedFailure(self, err)
1600
result.addSuccess(self)
1603
def _do_not_applicable(self, result, e):
1605
reason = 'No reason given'
1608
addNotApplicable = getattr(result, 'addNotApplicable', None)
1609
if addNotApplicable is not None:
1610
result.addNotApplicable(self, reason)
1612
self._do_skip(result, reason)
1615
def _do_unsupported_or_skip(self, result, e):
1617
addNotSupported = getattr(result, 'addNotSupported', None)
1618
if addNotSupported is not None:
1619
result.addNotSupported(self, reason)
1621
self._do_skip(result, reason)
1667
1623
def time(self, callable, *args, **kwargs):
1668
1624
"""Run callable and accrue the time it takes to the benchmark time.
1687
1645
self._benchtime += time.time() - start
1689
def _runCleanups(self):
1690
"""Run registered cleanup functions.
1692
This should only be called from TestCase.tearDown.
1694
# TODO: Perhaps this should keep running cleanups even if
1695
# one of them fails?
1697
# Actually pop the cleanups from the list so tearDown running
1698
# twice is safe (this happens for skipped tests).
1699
while self._cleanups:
1700
cleanup, args, kwargs = self._cleanups.pop()
1701
cleanup(*args, **kwargs)
1703
1647
def log(self, *args):
1706
1650
def _get_log(self, keep_log_file=False):
1707
"""Get the log from bzrlib.trace calls from this test.
1651
"""Internal helper to get the log from bzrlib.trace for this test.
1653
Please use self.getDetails, or self.get_log to access this in test case
1709
1656
:param keep_log_file: When True, if the log is still a file on disk
1710
1657
leave it as a file on disk. When False, if the log is still a file
1712
1659
self._log_contents.
1713
1660
:return: A string containing the log.
1715
# flush the log file, to get all content
1662
if self._log_contents is not None:
1664
self._log_contents.decode('utf8')
1665
except UnicodeDecodeError:
1666
unicodestr = self._log_contents.decode('utf8', 'replace')
1667
self._log_contents = unicodestr.encode('utf8')
1668
return self._log_contents
1716
1669
import bzrlib.trace
1717
1670
if bzrlib.trace._trace_file:
1671
# flush the log file, to get all content
1718
1672
bzrlib.trace._trace_file.flush()
1719
if self._log_contents:
1720
# XXX: this can hardly contain the content flushed above --vila
1722
return self._log_contents
1723
1673
if self._log_file_name is not None:
1724
1674
logfile = open(self._log_file_name)
1726
1676
log_contents = logfile.read()
1728
1678
logfile.close()
1680
log_contents.decode('utf8')
1681
except UnicodeDecodeError:
1682
unicodestr = log_contents.decode('utf8', 'replace')
1683
log_contents = unicodestr.encode('utf8')
1729
1684
if not keep_log_file:
1686
max_close_attempts = 100
1687
first_close_error = None
1688
while close_attempts < max_close_attempts:
1691
self._log_file.close()
1692
except IOError, ioe:
1693
if ioe.errno is None:
1694
# No errno implies 'close() called during
1695
# concurrent operation on the same file object', so
1696
# retry. Probably a thread is trying to write to
1698
if first_close_error is None:
1699
first_close_error = ioe
1704
if close_attempts > 1:
1706
'Unable to close log file on first attempt, '
1707
'will retry: %s\n' % (first_close_error,))
1708
if close_attempts == max_close_attempts:
1710
'Unable to close log file after %d attempts.\n'
1711
% (max_close_attempts,))
1712
self._log_file = None
1713
# Permit multiple calls to get_log until we clean it up in
1730
1715
self._log_contents = log_contents
1732
1717
os.remove(self._log_file_name)
1783
1779
os.chdir(working_dir)
1786
result = self.apply_redirected(ui.ui_factory.stdin,
1788
bzrlib.commands.run_bzr_catch_user_errors,
1783
result = self.apply_redirected(ui.ui_factory.stdin,
1785
bzrlib.commands.run_bzr_catch_user_errors,
1787
except KeyboardInterrupt:
1788
# Reraise KeyboardInterrupt with contents of redirected stdout
1789
# and stderr as arguments, for tests which are interested in
1790
# stdout and stderr and are expecting the exception.
1791
out = stdout.getvalue()
1792
err = stderr.getvalue()
1794
self.log('output:\n%r', out)
1796
self.log('errors:\n%r', err)
1797
raise KeyboardInterrupt(out, err)
1791
1799
logger.removeHandler(handler)
1792
1800
ui.ui_factory = old_ui_factory
2422
2424
return branchbuilder.BranchBuilder(branch=branch)
2424
2426
def overrideEnvironmentForTesting(self):
2425
os.environ['HOME'] = self.test_home_dir
2426
os.environ['BZR_HOME'] = self.test_home_dir
2427
test_home_dir = self.test_home_dir
2428
if isinstance(test_home_dir, unicode):
2429
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2430
os.environ['HOME'] = test_home_dir
2431
os.environ['BZR_HOME'] = test_home_dir
2428
2433
def setUp(self):
2429
2434
super(TestCaseWithMemoryTransport, self).setUp()
2430
2435
self._make_test_root()
2431
_currentdir = os.getcwdu()
2432
def _leaveDirectory():
2433
os.chdir(_currentdir)
2434
self.addCleanup(_leaveDirectory)
2436
self.addCleanup(os.chdir, os.getcwdu())
2435
2437
self.makeAndChdirToTestDir()
2436
2438
self.overrideEnvironmentForTesting()
2437
2439
self.__readonly_server = None
3287
3299
if not os.path.isfile(bzr_path):
3288
3300
# We are probably installed. Assume sys.argv is the right file
3289
3301
bzr_path = sys.argv[0]
3302
bzr_path = [bzr_path]
3303
if sys.platform == "win32":
3304
# if we're on windows, we can't execute the bzr script directly
3305
bzr_path = [sys.executable] + bzr_path
3290
3306
fd, test_list_file_name = tempfile.mkstemp()
3291
3307
test_list_file = os.fdopen(fd, 'wb', 1)
3292
3308
for test in process_tests:
3293
3309
test_list_file.write(test.id() + '\n')
3294
3310
test_list_file.close()
3296
argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
3312
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3298
3314
if '--no-plugins' in sys.argv:
3299
3315
argv.append('--no-plugins')
3339
3355
def addFailure(self, test, err):
3340
3356
self.result.addFailure(test, err)
3343
class BZRTransformingResult(ForwardingResult):
3345
def addError(self, test, err):
3346
feature = self._error_looks_like('UnavailableFeature: ', err)
3347
if feature is not None:
3348
self.result.addNotSupported(test, feature)
3350
self.result.addError(test, err)
3352
def addFailure(self, test, err):
3353
known = self._error_looks_like('KnownFailure: ', err)
3354
if known is not None:
3355
self.result._addKnownFailure(test, [KnownFailure,
3356
KnownFailure(known), None])
3358
self.result.addFailure(test, err)
3360
def _error_looks_like(self, prefix, err):
3361
"""Deserialize exception and returns the stringify value."""
3365
if isinstance(exc, subunit.RemoteException):
3366
# stringify the exception gives access to the remote traceback
3367
# We search the last line for 'prefix'
3368
lines = str(exc).split('\n')
3369
while lines and not lines[-1]:
3372
if lines[-1].startswith(prefix):
3373
value = lines[-1][len(prefix):]
3357
ForwardingResult = testtools.ExtendedToOriginalDecorator
3377
3360
class ProfileResult(ForwardingResult):
3654
3637
'bzrlib.tests.commands',
3655
3638
'bzrlib.tests.per_branch',
3656
3639
'bzrlib.tests.per_bzrdir',
3640
'bzrlib.tests.per_bzrdir_colo',
3641
'bzrlib.tests.per_foreign_vcs',
3657
3642
'bzrlib.tests.per_interrepository',
3658
3643
'bzrlib.tests.per_intertree',
3659
3644
'bzrlib.tests.per_inventory',
3660
3645
'bzrlib.tests.per_interbranch',
3661
3646
'bzrlib.tests.per_lock',
3647
'bzrlib.tests.per_merger',
3662
3648
'bzrlib.tests.per_transport',
3663
3649
'bzrlib.tests.per_tree',
3664
3650
'bzrlib.tests.per_pack_repository',
3665
3651
'bzrlib.tests.per_repository',
3666
3652
'bzrlib.tests.per_repository_chk',
3667
3653
'bzrlib.tests.per_repository_reference',
3654
'bzrlib.tests.per_uifactory',
3668
3655
'bzrlib.tests.per_versionedfile',
3669
3656
'bzrlib.tests.per_workingtree',
3670
3657
'bzrlib.tests.test__annotator',
3658
'bzrlib.tests.test__bencode',
3671
3659
'bzrlib.tests.test__chk_map',
3672
3660
'bzrlib.tests.test__dirstate_helpers',
3673
3661
'bzrlib.tests.test__groupcompress',
3674
3662
'bzrlib.tests.test__known_graph',
3675
3663
'bzrlib.tests.test__rio',
3664
'bzrlib.tests.test__simple_set',
3665
'bzrlib.tests.test__static_tuple',
3676
3666
'bzrlib.tests.test__walkdirs_win32',
3677
3667
'bzrlib.tests.test_ancestry',
3678
3668
'bzrlib.tests.test_annotate',
3679
3669
'bzrlib.tests.test_api',
3680
3670
'bzrlib.tests.test_atomicfile',
3681
3671
'bzrlib.tests.test_bad_files',
3682
'bzrlib.tests.test_bencode',
3683
3672
'bzrlib.tests.test_bisect_multi',
3684
3673
'bzrlib.tests.test_branch',
3685
3674
'bzrlib.tests.test_branchbuilder',
4046
4043
return new_test
4049
def _rmtree_temp_dir(dirname):
4046
def permute_tests_for_extension(standard_tests, loader, py_module_name,
4048
"""Helper for permutating tests against an extension module.
4050
This is meant to be used inside a modules 'load_tests()' function. It will
4051
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
4052
against both implementations. Setting 'test.module' to the appropriate
4053
module. See bzrlib.tests.test__chk_map.load_tests as an example.
4055
:param standard_tests: A test suite to permute
4056
:param loader: A TestLoader
4057
:param py_module_name: The python path to a python module that can always
4058
be loaded, and will be considered the 'python' implementation. (eg
4059
'bzrlib._chk_map_py')
4060
:param ext_module_name: The python path to an extension module. If the
4061
module cannot be loaded, a single test will be added, which notes that
4062
the module is not available. If it can be loaded, all standard_tests
4063
will be run against that module.
4064
:return: (suite, feature) suite is a test-suite that has all the permuted
4065
tests. feature is the Feature object that can be used to determine if
4066
the module is available.
4069
py_module = __import__(py_module_name, {}, {}, ['NO_SUCH_ATTRIB'])
4071
('python', {'module': py_module}),
4073
suite = loader.suiteClass()
4074
feature = ModuleAvailableFeature(ext_module_name)
4075
if feature.available():
4076
scenarios.append(('C', {'module': feature.module}))
4078
# the compiled module isn't available, so we add a failing test
4079
class FailWithoutFeature(TestCase):
4080
def test_fail(self):
4081
self.requireFeature(feature)
4082
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4083
result = multiply_tests(standard_tests, scenarios, suite)
4084
return result, feature
4087
def _rmtree_temp_dir(dirname, test_id=None):
4050
4088
# If LANG=C we probably have created some bogus paths
4051
4089
# which rmtree(unicode) will fail to delete
4052
4090
# so make sure we are using rmtree(str) to delete everything
4064
4102
# We don't want to fail here because some useful display will be lost
4065
4103
# otherwise. Polluting the tmp dir is bad, but not giving all the
4066
4104
# possible info to the test runner is even worse.
4106
ui.ui_factory.clear_term()
4107
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4108
# Ugly, but the last thing we want here is to fail, so bear with it.
4109
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4110
).encode('ascii', 'replace')
4067
4111
sys.stderr.write('Unable to remove testing dir %s\n%s'
4068
% (os.path.basename(dirname), e))
4112
% (os.path.basename(dirname), printable_e))
4071
4115
class Feature(object):
4153
4197
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4200
class _CompatabilityThunkFeature(Feature):
4201
"""This feature is just a thunk to another feature.
4203
It issues a deprecation warning if it is accessed, to let you know that you
4204
should really use a different feature.
4207
def __init__(self, dep_version, module, name,
4208
replacement_name, replacement_module=None):
4209
super(_CompatabilityThunkFeature, self).__init__()
4210
self._module = module
4211
if replacement_module is None:
4212
replacement_module = module
4213
self._replacement_module = replacement_module
4215
self._replacement_name = replacement_name
4216
self._dep_version = dep_version
4217
self._feature = None
4220
if self._feature is None:
4221
depr_msg = self._dep_version % ('%s.%s'
4222
% (self._module, self._name))
4223
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4224
self._replacement_name)
4225
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4226
# Import the new feature and use it as a replacement for the
4228
mod = __import__(self._replacement_module, {}, {},
4229
[self._replacement_name])
4230
self._feature = getattr(mod, self._replacement_name)
4234
return self._feature._probe()
4237
class ModuleAvailableFeature(Feature):
4238
"""This is a feature that describes a module we want to be available.
4240
Declare the name of the module in __init__(), and then after probing, the
4241
module will be available as 'self.module'.
4243
:ivar module: The module if it is available, else None.
4246
def __init__(self, module_name):
4247
super(ModuleAvailableFeature, self).__init__()
4248
self.module_name = module_name
4252
self._module = __import__(self.module_name, {}, {}, [''])
4259
if self.available(): # Make sure the probe has been done
4263
def feature_name(self):
4264
return self.module_name
4267
# This is kept here for compatibility, it is recommended to use
4268
# 'bzrlib.tests.features.paramiko' instead
4269
ParamikoFeature = _CompatabilityThunkFeature(
4270
deprecated_in((2,1,0)),
4271
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4156
4274
def probe_unicode_in_user_encoding():
4157
4275
"""Try to encode several unicode strings to use in unicode-aware tests.
4158
4276
Return the first successful match.
4238
4356
UTF8Filesystem = _UTF8Filesystem()
4359
class _BreakinFeature(Feature):
4360
"""Does this platform support the breakin feature?"""
4363
from bzrlib import breakin
4364
if breakin.determine_signal() is None:
4366
if sys.platform == 'win32':
4367
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4368
# We trigger SIGBREAK via a Console api so we need ctypes to
4369
# access the function
4376
def feature_name(self):
4377
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4380
BreakinFeature = _BreakinFeature()
4241
4383
class _CaseInsCasePresFilenameFeature(Feature):
4242
4384
"""Is the file-system case insensitive, but case-preserving?"""
4293
4435
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4296
class _SubUnitFeature(Feature):
4297
"""Check if subunit is available."""
4438
class _CaseSensitiveFilesystemFeature(Feature):
4299
4440
def _probe(self):
4441
if CaseInsCasePresFilenameFeature.available():
4443
elif CaseInsensitiveFilesystemFeature.available():
4306
4448
def feature_name(self):
4309
SubUnitFeature = _SubUnitFeature()
4449
return 'case-sensitive filesystem'
4451
# new coding style is for feature instances to be lowercase
4452
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4455
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4456
SubUnitFeature = _CompatabilityThunkFeature(
4457
deprecated_in((2,1,0)),
4458
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4310
4459
# Only define SubUnitBzrRunner if subunit is available.
4312
4461
from subunit import TestProtocolClient
4314
from subunit.test_results import AutoTimingTestResultDecorator
4316
AutoTimingTestResultDecorator = lambda x:x
4462
from subunit.test_results import AutoTimingTestResultDecorator
4317
4463
class SubUnitBzrRunner(TextTestRunner):
4318
4464
def run(self, test):
4319
4465
result = AutoTimingTestResultDecorator(