97
87
from bzrlib.symbol_versioning import (
98
88
DEPRECATED_PARAMETER,
99
89
deprecated_function,
101
90
deprecated_method,
102
91
deprecated_passed,
104
93
import bzrlib.trace
105
from bzrlib.transport import (
94
from bzrlib.transport import get_transport, pathfilter
110
95
import bzrlib.transport
96
from bzrlib.transport.local import LocalURLServer
97
from bzrlib.transport.memory import MemoryServer
98
from bzrlib.transport.readonly import ReadonlyServer
111
99
from bzrlib.trace import mutter, note
112
from bzrlib.tests import (
100
from bzrlib.tests import TestUtil
116
101
from bzrlib.tests.http_server import HttpServer
117
102
from bzrlib.tests.TestUtil import (
247
227
'%d non-main threads were left active in the end.\n'
248
228
% (TestCase._active_threads - 1))
250
def getDescription(self, test):
253
def _extractBenchmarkTime(self, testCase, details=None):
230
def _extractBenchmarkTime(self, testCase):
254
231
"""Add a benchmark time for the current test case."""
255
if details and 'benchtime' in details:
256
return float(''.join(details['benchtime'].iter_bytes()))
257
232
return getattr(testCase, "_benchtime", None)
259
234
def _elapsedTestTimeString(self):
322
297
Called from the TestCase run() method when the test
323
298
fails with an unexpected error.
326
unittest.TestResult.addError(self, test, err)
327
self.error_count += 1
328
self.report_error(test, err)
331
self._cleanupLogFile(test)
300
self._testConcluded(test)
301
if isinstance(err[1], TestNotApplicable):
302
return self._addNotApplicable(test, err)
303
elif isinstance(err[1], UnavailableFeature):
304
return self.addNotSupported(test, err[1].args[0])
307
unittest.TestResult.addError(self, test, err)
308
self.error_count += 1
309
self.report_error(test, err)
312
self._cleanupLogFile(test)
333
314
def addFailure(self, test, err):
334
315
"""Tell result that test failed.
336
317
Called from the TestCase run() method when the test
337
318
fails because e.g. an assert() method failed.
340
unittest.TestResult.addFailure(self, test, err)
341
self.failure_count += 1
342
self.report_failure(test, err)
345
self._cleanupLogFile(test)
320
self._testConcluded(test)
321
if isinstance(err[1], KnownFailure):
322
return self._addKnownFailure(test, err)
325
unittest.TestResult.addFailure(self, test, err)
326
self.failure_count += 1
327
self.report_failure(test, err)
330
self._cleanupLogFile(test)
347
def addSuccess(self, test, details=None):
332
def addSuccess(self, test):
348
333
"""Tell result that test completed successfully.
350
335
Called from the TestCase run()
337
self._testConcluded(test)
352
338
if self._bench_history is not None:
353
benchmark_time = self._extractBenchmarkTime(test, details)
339
benchmark_time = self._extractBenchmarkTime(test)
354
340
if benchmark_time is not None:
355
341
self._bench_history.write("%s %s\n" % (
356
342
self._formatTime(benchmark_time),
360
346
unittest.TestResult.addSuccess(self, test)
361
347
test._log_contents = ''
363
def addExpectedFailure(self, test, err):
349
def _testConcluded(self, test):
350
"""Common code when a test has finished.
352
Called regardless of whether it succeeded, failed, etc.
356
def _addKnownFailure(self, test, err):
364
357
self.known_failure_count += 1
365
358
self.report_known_failure(test, err)
368
361
"""The test will not be run because of a missing feature.
370
363
# this can be called in two different ways: it may be that the
371
# test started running, and then raised (through requireFeature)
364
# test started running, and then raised (through addError)
372
365
# UnavailableFeature. Alternatively this method can be called
373
# while probing for features before running the test code proper; in
374
# that case we will see startTest and stopTest, but the test will
375
# never actually run.
366
# while probing for features before running the tests; in that
367
# case we will see startTest and stopTest, but the test will never
376
369
self.unsupported.setdefault(str(feature), 0)
377
370
self.unsupported[str(feature)] += 1
378
371
self.report_unsupported(test, feature)
382
375
self.skip_count += 1
383
376
self.report_skip(test, reason)
385
def addNotApplicable(self, test, reason):
386
self.not_applicable_count += 1
387
self.report_not_applicable(test, reason)
378
def _addNotApplicable(self, test, skip_excinfo):
379
if isinstance(skip_excinfo[1], TestNotApplicable):
380
self.not_applicable_count += 1
381
self.report_not_applicable(test, skip_excinfo)
384
except KeyboardInterrupt:
387
self.addError(test, test.exc_info())
389
# seems best to treat this as success from point-of-view of unittest
390
# -- it actually does nothing so it barely matters :)
391
unittest.TestResult.addSuccess(self, test)
392
test._log_contents = ''
394
def printErrorList(self, flavour, errors):
395
for test, err in errors:
396
self.stream.writeln(self.separator1)
397
self.stream.write("%s: " % flavour)
398
self.stream.writeln(self.getDescription(test))
399
if getattr(test, '_get_log', None) is not None:
400
log_contents = test._get_log()
402
self.stream.write('\n')
404
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
405
self.stream.write('\n')
406
self.stream.write(log_contents)
407
self.stream.write('\n')
409
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
410
self.stream.write('\n')
411
self.stream.writeln(self.separator2)
412
self.stream.writeln("%s" % err)
389
414
def _post_mortem(self):
390
415
"""Start a PDB post mortem session."""
490
516
return self._shortened_test_description(test)
492
518
def report_error(self, test, err):
493
self.stream.write('ERROR: %s\n %s\n' % (
519
ui.ui_factory.note('ERROR: %s\n %s\n' % (
494
520
self._test_description(test),
498
524
def report_failure(self, test, err):
499
self.stream.write('FAIL: %s\n %s\n' % (
525
ui.ui_factory.note('FAIL: %s\n %s\n' % (
500
526
self._test_description(test),
504
530
def report_known_failure(self, test, err):
531
ui.ui_factory.note('XFAIL: %s\n%s\n' % (
532
self._test_description(test), err[1]))
507
534
def report_skip(self, test, reason):
510
def report_not_applicable(self, test, reason):
537
def report_not_applicable(self, test, skip_excinfo):
513
540
def report_unsupported(self, test, feature):
535
562
def report_test_start(self, test):
537
564
name = self._shortened_test_description(test)
538
width = osutils.terminal_width()
539
if width is not None:
540
# width needs space for 6 char status, plus 1 for slash, plus an
541
# 11-char time string, plus a trailing blank
542
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
544
self.stream.write(self._ellipsize_to_right(name, width-18))
546
self.stream.write(name)
565
# width needs space for 6 char status, plus 1 for slash, plus an
566
# 11-char time string, plus a trailing blank
567
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
568
self.stream.write(self._ellipsize_to_right(name,
569
osutils.terminal_width()-18))
547
570
self.stream.flush()
549
572
def _error_summary(self, err):
578
601
self.stream.writeln(' SKIP %s\n%s'
579
602
% (self._testTimeString(test), reason))
581
def report_not_applicable(self, test, reason):
582
self.stream.writeln(' N/A %s\n %s'
583
% (self._testTimeString(test), reason))
604
def report_not_applicable(self, test, skip_excinfo):
605
self.stream.writeln(' N/A %s\n%s'
606
% (self._testTimeString(test),
607
self._error_summary(skip_excinfo)))
585
609
def report_unsupported(self, test, feature):
586
610
"""test cannot be run because feature is missing."""
606
630
applied left to right - the first element in the list is the
607
631
innermost decorator.
609
# stream may claim to know how to write unicode strings, but in older
610
# pythons this goes sufficiently wrong that it is a bad idea. (
611
# specifically a built in file with encoding 'UTF-8' will still try
612
# to encode using ascii.)
613
new_encoding = osutils.get_terminal_encoding()
614
codec = codecs.lookup(new_encoding)
615
if type(codec) is tuple:
619
encode = codec.encode
620
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
621
stream.encoding = new_encoding
622
633
self.stream = unittest._WritelnDecorator(stream)
623
634
self.descriptions = descriptions
624
635
self.verbosity = verbosity
682
# traceback._some_str fails to format exceptions that have the default
683
# __str__ which does an implicit ascii conversion. However, repr() on those
684
# objects works, for all that it's not quite what the doctor may have ordered.
685
def _clever_some_str(value):
690
return repr(value).replace('\\n', '\n')
692
return '<unprintable %s object>' % type(value).__name__
694
traceback._some_str = _clever_some_str
697
# deprecated - use self.knownFailure(), or self.expectFailure.
698
KnownFailure = testtools.testcase._ExpectedFailure
702
class KnownFailure(AssertionError):
703
"""Indicates that a test failed in a precisely expected manner.
705
Such failures don't block the whole test suite from passing because they are
706
indicators of partially completed code or of future work. We have an
707
explicit error for them so that we can ensure that they are always visible:
708
KnownFailures are always shown in the output of bzr selftest.
701
712
class UnavailableFeature(Exception):
702
713
"""A feature required for this test was not available.
704
This can be considered a specialised form of SkippedTest.
706
715
The feature should be used to construct the exception.
719
class CommandFailed(Exception):
710
723
class StringIOWrapper(object):
711
724
"""A wrapper around cStringIO which just adds an encoding attribute.
798
811
_leaking_threads_tests = 0
799
812
_first_thread_leaker_id = None
800
813
_log_file_name = None
815
_keep_log_file = False
801
816
# record lsprof data when performing benchmark calls.
802
817
_gather_lsprof_in_benchmarks = False
818
attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
819
'_log_contents', '_log_file_name', '_benchtime',
820
'_TestCase__testMethodName', '_TestCase__testMethodDoc',)
804
822
def __init__(self, methodName='testMethod'):
805
823
super(TestCase, self).__init__(methodName)
806
824
self._cleanups = []
825
self._bzr_test_setUp_run = False
826
self._bzr_test_tearDown_run = False
807
827
self._directory_isolation = True
808
self.exception_handlers.insert(0,
809
(UnavailableFeature, self._do_unsupported_or_skip))
810
self.exception_handlers.insert(0,
811
(TestNotApplicable, self._do_not_applicable))
814
super(TestCase, self).setUp()
815
for feature in getattr(self, '_test_needs_features', []):
816
self.requireFeature(feature)
817
self._log_contents = None
818
self.addDetail("log", content.Content(content.ContentType("text",
819
"plain", {"charset": "utf8"}),
820
lambda:[self._get_log(keep_log_file=True)]))
830
unittest.TestCase.setUp(self)
831
self._bzr_test_setUp_run = True
821
832
self._cleanEnvironment()
822
833
self._silenceUI()
823
834
self._startLogFile()
856
867
Tests that want to use debug flags can just set them in the
857
868
debug_flags set during setup/teardown.
859
# Start with a copy of the current debug flags we can safely modify.
860
self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
870
self._preserved_debug_flags = set(debug.debug_flags)
861
871
if 'allow_debug' not in selftest_debug_flags:
862
872
debug.debug_flags.clear()
863
873
if 'disable_lock_checks' not in selftest_debug_flags:
864
874
debug.debug_flags.add('strict_locks')
875
self.addCleanup(self._restore_debug_flags)
866
877
def _clear_hooks(self):
867
878
# prevent hooks affecting tests
1129
1143
:raises AssertionError: If the expected and actual stat values differ
1130
1144
other than by atime.
1132
self.assertEqual(expected.st_size, actual.st_size,
1133
'st_size did not match')
1134
self.assertEqual(expected.st_mtime, actual.st_mtime,
1135
'st_mtime did not match')
1136
self.assertEqual(expected.st_ctime, actual.st_ctime,
1137
'st_ctime did not match')
1138
if sys.platform != 'win32':
1139
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1140
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1141
# odd. Regardless we shouldn't actually try to assert anything
1142
# about their values
1143
self.assertEqual(expected.st_dev, actual.st_dev,
1144
'st_dev did not match')
1145
self.assertEqual(expected.st_ino, actual.st_ino,
1146
'st_ino did not match')
1147
self.assertEqual(expected.st_mode, actual.st_mode,
1148
'st_mode did not match')
1146
self.assertEqual(expected.st_size, actual.st_size)
1147
self.assertEqual(expected.st_mtime, actual.st_mtime)
1148
self.assertEqual(expected.st_ctime, actual.st_ctime)
1149
self.assertEqual(expected.st_dev, actual.st_dev)
1150
self.assertEqual(expected.st_ino, actual.st_ino)
1151
self.assertEqual(expected.st_mode, actual.st_mode)
1150
1153
def assertLength(self, length, obj_with_len):
1151
1154
"""Assert that obj_with_len is of length length."""
1303
1302
m += ": " + msg
1305
def expectFailure(self, reason, assertion, *args, **kwargs):
1306
"""Invoke a test, expecting it to fail for the given reason.
1308
This is for assertions that ought to succeed, but currently fail.
1309
(The failure is *expected* but not *wanted*.) Please be very precise
1310
about the failure you're expecting. If a new bug is introduced,
1311
AssertionError should be raised, not KnownFailure.
1313
Frequently, expectFailure should be followed by an opposite assertion.
1316
Intended to be used with a callable that raises AssertionError as the
1317
'assertion' parameter. args and kwargs are passed to the 'assertion'.
1319
Raises KnownFailure if the test fails. Raises AssertionError if the
1324
self.expectFailure('Math is broken', self.assertNotEqual, 54,
1326
self.assertEqual(42, dynamic_val)
1328
This means that a dynamic_val of 54 will cause the test to raise
1329
a KnownFailure. Once math is fixed and the expectFailure is removed,
1330
only a dynamic_val of 42 will allow the test to pass. Anything other
1331
than 54 or 42 will cause an AssertionError.
1334
assertion(*args, **kwargs)
1335
except AssertionError:
1336
raise KnownFailure(reason)
1338
self.fail('Unexpected success. Should have failed: %s' % reason)
1306
1340
def assertFileEqual(self, content, path):
1307
1341
"""Fail if path does not contain 'content'."""
1308
1342
self.failUnlessExists(path)
1314
1348
self.assertEqualDiff(content, s)
1316
def assertDocstring(self, expected_docstring, obj):
1317
"""Fail if obj does not have expected_docstring"""
1319
# With -OO the docstring should be None instead
1320
self.assertIs(obj.__doc__, None)
1322
self.assertEqual(expected_docstring, obj.__doc__)
1324
1350
def failUnlessExists(self, path):
1325
1351
"""Fail unless path or paths, which may be abs or relative, exist."""
1326
1352
if not isinstance(path, basestring):
1467
1493
Close the file and delete it, unless setKeepLogfile was called.
1469
if bzrlib.trace._trace_file:
1470
# flush the log file, to get all content
1471
bzrlib.trace._trace_file.flush()
1495
if self._log_file is None:
1472
1497
bzrlib.trace.pop_log_file(self._log_memento)
1473
# Cache the log result and delete the file on disk
1474
self._get_log(False)
1498
self._log_file.close()
1499
self._log_file = None
1500
if not self._keep_log_file:
1501
os.remove(self._log_file_name)
1502
self._log_file_name = None
1504
def setKeepLogfile(self):
1505
"""Make the logfile not be deleted when _finishLogFile is called."""
1506
self._keep_log_file = True
1476
1508
def thisFailsStrictLockCheck(self):
1477
1509
"""It is known that this test would fail with -Dstrict_locks.
1495
1527
self._cleanups.append((callable, args, kwargs))
1497
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1498
"""Overrides an object attribute restoring it after the test.
1500
:param obj: The object that will be mutated.
1502
:param attr_name: The attribute name we want to preserve/override in
1505
:param new: The optional value we want to set the attribute to.
1507
:returns: The actual attr value.
1509
value = getattr(obj, attr_name)
1510
# The actual value is captured by the call below
1511
self.addCleanup(setattr, obj, attr_name, value)
1512
if new is not _unitialized_attr:
1513
setattr(obj, attr_name, new)
1516
1529
def _cleanEnvironment(self):
1518
1531
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1556
1565
'ftp_proxy': None,
1557
1566
'FTP_PROXY': None,
1558
1567
'BZR_REMOTE_PATH': None,
1559
# Generally speaking, we don't want apport reporting on crashes in
1560
# the test environment unless we're specifically testing apport,
1561
# so that it doesn't leak into the real system environment. We
1562
# use an env var so it propagates to subprocesses.
1563
'APPORT_DISABLE': '1',
1566
1570
self.addCleanup(self._restoreEnvironment)
1567
1571
for name, value in new_env.iteritems():
1568
1572
self._captureVar(name, value)
1570
1574
def _captureVar(self, name, newvalue):
1571
1575
"""Set an environment variable, and reset it when finished."""
1572
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1576
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1578
def _restore_debug_flags(self):
1579
debug.debug_flags.clear()
1580
debug.debug_flags.update(self._preserved_debug_flags)
1574
1582
def _restoreEnvironment(self):
1575
for name, value in self._old_env.iteritems():
1583
for name, value in self.__old_env.iteritems():
1576
1584
osutils.set_or_unset_env(name, value)
1578
1586
def _restoreHooks(self):
1586
1594
def _do_skip(self, result, reason):
1587
1595
addSkip = getattr(result, 'addSkip', None)
1588
1596
if not callable(addSkip):
1589
result.addSuccess(result)
1597
result.addError(self, sys.exc_info())
1591
1599
addSkip(self, reason)
1594
def _do_known_failure(self, result, e):
1595
err = sys.exc_info()
1596
addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1597
if addExpectedFailure is not None:
1598
addExpectedFailure(self, err)
1600
result.addSuccess(self)
1603
def _do_not_applicable(self, result, e):
1605
reason = 'No reason given'
1608
addNotApplicable = getattr(result, 'addNotApplicable', None)
1609
if addNotApplicable is not None:
1610
result.addNotApplicable(self, reason)
1612
self._do_skip(result, reason)
1615
def _do_unsupported_or_skip(self, result, e):
1617
addNotSupported = getattr(result, 'addNotSupported', None)
1618
if addNotSupported is not None:
1619
result.addNotSupported(self, reason)
1621
self._do_skip(result, reason)
1601
def run(self, result=None):
1602
if result is None: result = self.defaultTestResult()
1603
for feature in getattr(self, '_test_needs_features', []):
1604
if not feature.available():
1605
result.startTest(self)
1606
if getattr(result, 'addNotSupported', None):
1607
result.addNotSupported(self, feature)
1609
result.addSuccess(self)
1610
result.stopTest(self)
1614
result.startTest(self)
1615
absent_attr = object()
1617
method_name = getattr(self, '_testMethodName', absent_attr)
1618
if method_name is absent_attr:
1620
method_name = getattr(self, '_TestCase__testMethodName')
1621
testMethod = getattr(self, method_name)
1625
if not self._bzr_test_setUp_run:
1627
"test setUp did not invoke "
1628
"bzrlib.tests.TestCase's setUp")
1629
except KeyboardInterrupt:
1632
except TestSkipped, e:
1633
self._do_skip(result, e.args[0])
1637
result.addError(self, sys.exc_info())
1645
except self.failureException:
1646
result.addFailure(self, sys.exc_info())
1647
except TestSkipped, e:
1649
reason = "No reason given."
1652
self._do_skip(result, reason)
1653
except KeyboardInterrupt:
1657
result.addError(self, sys.exc_info())
1661
if not self._bzr_test_tearDown_run:
1663
"test tearDown did not invoke "
1664
"bzrlib.tests.TestCase's tearDown")
1665
except KeyboardInterrupt:
1669
result.addError(self, sys.exc_info())
1672
if ok: result.addSuccess(self)
1674
result.stopTest(self)
1676
except TestNotApplicable:
1677
# Not moved from the result [yet].
1680
except KeyboardInterrupt:
1685
for attr_name in self.attrs_to_keep:
1686
if attr_name in self.__dict__:
1687
saved_attrs[attr_name] = self.__dict__[attr_name]
1688
self.__dict__ = saved_attrs
1692
self._log_contents = ''
1693
self._bzr_test_tearDown_run = True
1694
unittest.TestCase.tearDown(self)
1623
1696
def time(self, callable, *args, **kwargs):
1624
1697
"""Run callable and accrue the time it takes to the benchmark time.
1645
1716
self._benchtime += time.time() - start
1718
def _runCleanups(self):
1719
"""Run registered cleanup functions.
1721
This should only be called from TestCase.tearDown.
1723
# TODO: Perhaps this should keep running cleanups even if
1724
# one of them fails?
1726
# Actually pop the cleanups from the list so tearDown running
1727
# twice is safe (this happens for skipped tests).
1728
while self._cleanups:
1729
cleanup, args, kwargs = self._cleanups.pop()
1730
cleanup(*args, **kwargs)
1647
1732
def log(self, *args):
1650
1735
def _get_log(self, keep_log_file=False):
1651
"""Internal helper to get the log from bzrlib.trace for this test.
1653
Please use self.getDetails, or self.get_log to access this in test case
1736
"""Get the log from bzrlib.trace calls from this test.
1656
1738
:param keep_log_file: When True, if the log is still a file on disk
1657
1739
leave it as a file on disk. When False, if the log is still a file
1659
1741
self._log_contents.
1660
1742
:return: A string containing the log.
1662
if self._log_contents is not None:
1664
self._log_contents.decode('utf8')
1665
except UnicodeDecodeError:
1666
unicodestr = self._log_contents.decode('utf8', 'replace')
1667
self._log_contents = unicodestr.encode('utf8')
1744
# flush the log file, to get all content
1746
if bzrlib.trace._trace_file:
1747
bzrlib.trace._trace_file.flush()
1748
if self._log_contents:
1749
# XXX: this can hardly contain the content flushed above --vila
1668
1751
return self._log_contents
1670
if bzrlib.trace._trace_file:
1671
# flush the log file, to get all content
1672
bzrlib.trace._trace_file.flush()
1673
1752
if self._log_file_name is not None:
1674
1753
logfile = open(self._log_file_name)
1676
1755
log_contents = logfile.read()
1678
1757
logfile.close()
1680
log_contents.decode('utf8')
1681
except UnicodeDecodeError:
1682
unicodestr = log_contents.decode('utf8', 'replace')
1683
log_contents = unicodestr.encode('utf8')
1684
1758
if not keep_log_file:
1686
max_close_attempts = 100
1687
first_close_error = None
1688
while close_attempts < max_close_attempts:
1691
self._log_file.close()
1692
except IOError, ioe:
1693
if ioe.errno is None:
1694
# No errno implies 'close() called during
1695
# concurrent operation on the same file object', so
1696
# retry. Probably a thread is trying to write to
1698
if first_close_error is None:
1699
first_close_error = ioe
1704
if close_attempts > 1:
1706
'Unable to close log file on first attempt, '
1707
'will retry: %s\n' % (first_close_error,))
1708
if close_attempts == max_close_attempts:
1710
'Unable to close log file after %d attempts.\n'
1711
% (max_close_attempts,))
1712
self._log_file = None
1713
# Permit multiple calls to get_log until we clean it up in
1715
1759
self._log_contents = log_contents
1717
1761
os.remove(self._log_file_name)
1779
1812
os.chdir(working_dir)
1783
result = self.apply_redirected(ui.ui_factory.stdin,
1785
bzrlib.commands.run_bzr_catch_user_errors,
1787
except KeyboardInterrupt:
1788
# Reraise KeyboardInterrupt with contents of redirected stdout
1789
# and stderr as arguments, for tests which are interested in
1790
# stdout and stderr and are expecting the exception.
1791
out = stdout.getvalue()
1792
err = stderr.getvalue()
1794
self.log('output:\n%r', out)
1796
self.log('errors:\n%r', err)
1797
raise KeyboardInterrupt(out, err)
1815
result = self.apply_redirected(ui.ui_factory.stdin,
1817
bzrlib.commands.run_bzr_catch_user_errors,
1799
1820
logger.removeHandler(handler)
1800
1821
ui.ui_factory = old_ui_factory
2424
2451
return branchbuilder.BranchBuilder(branch=branch)
2426
2453
def overrideEnvironmentForTesting(self):
2427
test_home_dir = self.test_home_dir
2428
if isinstance(test_home_dir, unicode):
2429
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2430
os.environ['HOME'] = test_home_dir
2431
os.environ['BZR_HOME'] = test_home_dir
2454
os.environ['HOME'] = self.test_home_dir
2455
os.environ['BZR_HOME'] = self.test_home_dir
2433
2457
def setUp(self):
2434
2458
super(TestCaseWithMemoryTransport, self).setUp()
2435
2459
self._make_test_root()
2436
self.addCleanup(os.chdir, os.getcwdu())
2460
_currentdir = os.getcwdu()
2461
def _leaveDirectory():
2462
os.chdir(_currentdir)
2463
self.addCleanup(_leaveDirectory)
2437
2464
self.makeAndChdirToTestDir()
2438
2465
self.overrideEnvironmentForTesting()
2439
2466
self.__readonly_server = None
3296
3312
if not os.path.isfile(bzr_path):
3297
3313
# We are probably installed. Assume sys.argv is the right file
3298
3314
bzr_path = sys.argv[0]
3299
bzr_path = [bzr_path]
3300
if sys.platform == "win32":
3301
# if we're on windows, we can't execute the bzr script directly
3302
bzr_path = [sys.executable] + bzr_path
3303
3315
fd, test_list_file_name = tempfile.mkstemp()
3304
3316
test_list_file = os.fdopen(fd, 'wb', 1)
3305
3317
for test in process_tests:
3306
3318
test_list_file.write(test.id() + '\n')
3307
3319
test_list_file.close()
3309
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3321
argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
3311
3323
if '--no-plugins' in sys.argv:
3312
3324
argv.append('--no-plugins')
3352
3364
def addFailure(self, test, err):
3353
3365
self.result.addFailure(test, err)
3354
ForwardingResult = testtools.ExtendedToOriginalDecorator
3368
class BZRTransformingResult(ForwardingResult):
3370
def addError(self, test, err):
3371
feature = self._error_looks_like('UnavailableFeature: ', err)
3372
if feature is not None:
3373
self.result.addNotSupported(test, feature)
3375
self.result.addError(test, err)
3377
def addFailure(self, test, err):
3378
known = self._error_looks_like('KnownFailure: ', err)
3379
if known is not None:
3380
self.result._addKnownFailure(test, [KnownFailure,
3381
KnownFailure(known), None])
3383
self.result.addFailure(test, err)
3385
def _error_looks_like(self, prefix, err):
3386
"""Deserialize exception and return the stringified value."""
3390
if isinstance(exc, subunit.RemoteException):
3391
# stringify the exception gives access to the remote traceback
3392
# We search the last line for 'prefix'
3393
lines = str(exc).split('\n')
3394
while lines and not lines[-1]:
3397
if lines[-1].startswith(prefix):
3398
value = lines[-1][len(prefix):]
3357
3402
class ProfileResult(ForwardingResult):
3634
3679
'bzrlib.tests.commands',
3635
3680
'bzrlib.tests.per_branch',
3636
3681
'bzrlib.tests.per_bzrdir',
3637
'bzrlib.tests.per_bzrdir_colo',
3638
3682
'bzrlib.tests.per_foreign_vcs',
3639
3683
'bzrlib.tests.per_interrepository',
3640
3684
'bzrlib.tests.per_intertree',
3641
3685
'bzrlib.tests.per_inventory',
3642
3686
'bzrlib.tests.per_interbranch',
3643
3687
'bzrlib.tests.per_lock',
3644
'bzrlib.tests.per_merger',
3645
3688
'bzrlib.tests.per_transport',
3646
3689
'bzrlib.tests.per_tree',
3647
3690
'bzrlib.tests.per_pack_repository',
4040
4076
return new_test
4043
def permute_tests_for_extension(standard_tests, loader, py_module_name,
4045
"""Helper for permutating tests against an extension module.
4047
This is meant to be used inside a module's 'load_tests()' function. It will
4048
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
4049
against both implementations. Setting 'test.module' to the appropriate
4050
module. See bzrlib.tests.test__chk_map.load_tests as an example.
4052
:param standard_tests: A test suite to permute
4053
:param loader: A TestLoader
4054
:param py_module_name: The python path to a python module that can always
4055
be loaded, and will be considered the 'python' implementation. (eg
4056
'bzrlib._chk_map_py')
4057
:param ext_module_name: The python path to an extension module. If the
4058
module cannot be loaded, a single test will be added, which notes that
4059
the module is not available. If it can be loaded, all standard_tests
4060
will be run against that module.
4061
:return: (suite, feature) suite is a test-suite that has all the permuted
4062
tests. feature is the Feature object that can be used to determine if
4063
the module is available.
4066
py_module = __import__(py_module_name, {}, {}, ['NO_SUCH_ATTRIB'])
4068
('python', {'module': py_module}),
4070
suite = loader.suiteClass()
4071
feature = ModuleAvailableFeature(ext_module_name)
4072
if feature.available():
4073
scenarios.append(('C', {'module': feature.module}))
4075
# the compiled module isn't available, so we add a failing test
4076
class FailWithoutFeature(TestCase):
4077
def test_fail(self):
4078
self.requireFeature(feature)
4079
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4080
result = multiply_tests(standard_tests, scenarios, suite)
4081
return result, feature
4084
def _rmtree_temp_dir(dirname, test_id=None):
4079
def _rmtree_temp_dir(dirname):
4085
4080
# If LANG=C we probably have created some bogus paths
4086
4081
# which rmtree(unicode) will fail to delete
4087
4082
# so make sure we are using rmtree(str) to delete everything
4191
4183
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4194
class _CompatabilityThunkFeature(Feature):
4195
"""This feature is just a thunk to another feature.
4197
It issues a deprecation warning if it is accessed, to let you know that you
4198
should really use a different feature.
4201
def __init__(self, dep_version, module, name,
4202
replacement_name, replacement_module=None):
4203
super(_CompatabilityThunkFeature, self).__init__()
4204
self._module = module
4205
if replacement_module is None:
4206
replacement_module = module
4207
self._replacement_module = replacement_module
4209
self._replacement_name = replacement_name
4210
self._dep_version = dep_version
4211
self._feature = None
4214
if self._feature is None:
4215
depr_msg = self._dep_version % ('%s.%s'
4216
% (self._module, self._name))
4217
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4218
self._replacement_name)
4219
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4220
# Import the new feature and use it as a replacement for the
4222
mod = __import__(self._replacement_module, {}, {},
4223
[self._replacement_name])
4224
self._feature = getattr(mod, self._replacement_name)
4228
return self._feature._probe()
4231
class ModuleAvailableFeature(Feature):
4232
"""This is a feature that describes a module we want to be available.
4234
Declare the name of the module in __init__(), and then after probing, the
4235
module will be available as 'self.module'.
4237
:ivar module: The module if it is available, else None.
4240
def __init__(self, module_name):
4241
super(ModuleAvailableFeature, self).__init__()
4242
self.module_name = module_name
4246
self._module = __import__(self.module_name, {}, {}, [''])
4253
if self.available(): # Make sure the probe has been done
4257
def feature_name(self):
4258
return self.module_name
4261
# This is kept here for compatibility, it is recommended to use
4262
# 'bzrlib.tests.feature.paramiko' instead
4263
ParamikoFeature = _CompatabilityThunkFeature(
4264
deprecated_in((2,1,0)),
4265
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4268
4186
def probe_unicode_in_user_encoding():
4269
4187
"""Try to encode several unicode strings to use in unicode-aware tests.
4270
4188
Return first successful match.
4429
4362
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4432
class _CaseSensitiveFilesystemFeature(Feature):
4365
class _SubUnitFeature(Feature):
4366
"""Check if subunit is available."""
4434
4368
def _probe(self):
4435
if CaseInsCasePresFilenameFeature.available():
4437
elif CaseInsensitiveFilesystemFeature.available():
4442
4375
def feature_name(self):
4443
return 'case-sensitive filesystem'
4445
# new coding style is for feature instances to be lowercase
4446
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4449
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4450
SubUnitFeature = _CompatabilityThunkFeature(
4451
deprecated_in((2,1,0)),
4452
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4378
SubUnitFeature = _SubUnitFeature()
4453
4379
# Only define SubUnitBzrRunner if subunit is available.
4455
4381
from subunit import TestProtocolClient
4456
from subunit.test_results import AutoTimingTestResultDecorator
4383
from subunit.test_results import AutoTimingTestResultDecorator
4385
AutoTimingTestResultDecorator = lambda x:x
4457
4386
class SubUnitBzrRunner(TextTestRunner):
4458
4387
def run(self, test):
4459
4388
result = AutoTimingTestResultDecorator(