/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: John Arbash Meinel
  • Date: 2010-01-12 22:36:23 UTC
  • mfrom: (4953 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4955.
  • Revision ID: john@arbash-meinel.com-20100112223623-836x5mou0gm5vsep
merge bzr.dev 4953 to clean up the ui factory issues

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
46
46
import tempfile
47
47
import threading
48
48
import time
 
49
import traceback
49
50
import unittest
50
51
import warnings
51
52
 
 
53
import testtools
 
54
# nb: check this before importing anything else from within it
 
55
_testtools_version = getattr(testtools, '__version__', ())
 
56
if _testtools_version < (0, 9, 2):
 
57
    raise ImportError("need at least testtools 0.9.2: %s is %r"
 
58
        % (testtools.__file__, _testtools_version))
 
59
from testtools import content
52
60
 
53
61
from bzrlib import (
54
62
    branchbuilder,
88
96
from bzrlib.symbol_versioning import (
89
97
    DEPRECATED_PARAMETER,
90
98
    deprecated_function,
 
99
    deprecated_in,
91
100
    deprecated_method,
92
101
    deprecated_passed,
93
102
    )
228
237
                '%d non-main threads were left active in the end.\n'
229
238
                % (TestCase._active_threads - 1))
230
239
 
231
 
    def _extractBenchmarkTime(self, testCase):
 
240
    def _extractBenchmarkTime(self, testCase, details=None):
232
241
        """Add a benchmark time for the current test case."""
 
242
        if details and 'benchtime' in details:
 
243
            return float(''.join(details['benchtime'].iter_bytes()))
233
244
        return getattr(testCase, "_benchtime", None)
234
245
 
235
246
    def _elapsedTestTimeString(self):
269
280
        else:
270
281
            bzr_path = sys.executable
271
282
        self.stream.write(
272
 
            'testing: %s\n' % (bzr_path,))
 
283
            'bzr selftest: %s\n' % (bzr_path,))
273
284
        self.stream.write(
274
285
            '   %s\n' % (
275
286
                    bzrlib.__path__[0],))
320
331
            self.stop()
321
332
        self._cleanupLogFile(test)
322
333
 
323
 
    def addSuccess(self, test):
 
334
    def addSuccess(self, test, details=None):
324
335
        """Tell result that test completed successfully.
325
336
 
326
337
        Called from the TestCase run()
327
338
        """
328
339
        if self._bench_history is not None:
329
 
            benchmark_time = self._extractBenchmarkTime(test)
 
340
            benchmark_time = self._extractBenchmarkTime(test, details)
330
341
            if benchmark_time is not None:
331
342
                self._bench_history.write("%s %s\n" % (
332
343
                    self._formatTime(benchmark_time),
362
373
        self.not_applicable_count += 1
363
374
        self.report_not_applicable(test, reason)
364
375
 
365
 
    def printErrorList(self, flavour, errors):
366
 
        for test, err in errors:
367
 
            self.stream.writeln(self.separator1)
368
 
            self.stream.write("%s: " % flavour)
369
 
            self.stream.writeln(self.getDescription(test))
370
 
            if getattr(test, '_get_log', None) is not None:
371
 
                log_contents = test._get_log()
372
 
                if log_contents:
373
 
                    self.stream.write('\n')
374
 
                    self.stream.write(
375
 
                            ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
376
 
                    self.stream.write('\n')
377
 
                    self.stream.write(log_contents)
378
 
                    self.stream.write('\n')
379
 
                    self.stream.write(
380
 
                            ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
381
 
                    self.stream.write('\n')
382
 
            self.stream.writeln(self.separator2)
383
 
            self.stream.writeln("%s" % err)
384
 
 
385
376
    def _post_mortem(self):
386
377
        """Start a PDB post mortem session."""
387
378
        if os.environ.get('BZR_TEST_PDB', None):
467
458
            a += '%dm%ds' % (runtime / 60, runtime % 60)
468
459
        else:
469
460
            a += '%ds' % runtime
470
 
        if self.error_count:
471
 
            a += ', %d err' % self.error_count
472
 
        if self.failure_count:
473
 
            a += ', %d fail' % self.failure_count
 
461
        total_fail_count = self.error_count + self.failure_count
 
462
        if total_fail_count:
 
463
            a += ', %d failed' % total_fail_count
474
464
        # if self.unsupported:
475
465
        #     a += ', %d missing' % len(self.unsupported)
476
466
        a += ']'
499
489
            ))
500
490
 
501
491
    def report_known_failure(self, test, err):
502
 
        ui.ui_factory.note('XFAIL: %s\n%s\n' % (
503
 
            self._test_description(test), err[1]))
 
492
        pass
504
493
 
505
494
    def report_skip(self, test, reason):
506
495
        pass
604
593
            applied left to right - the first element in the list is the 
605
594
            innermost decorator.
606
595
        """
 
596
        # stream may know claim to know to write unicode strings, but in older
 
597
        # pythons this goes sufficiently wrong that it is a bad idea. (
 
598
        # specifically a built in file with encoding 'UTF-8' will still try
 
599
        # to encode using ascii.
 
600
        new_encoding = osutils.get_terminal_encoding()
 
601
        codec = codecs.lookup(new_encoding)
 
602
        if type(codec) is tuple:
 
603
            # Python 2.4
 
604
            encode = codec[0]
 
605
        else:
 
606
            encode = codec.encode
 
607
        stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
 
608
        stream.encoding = new_encoding
607
609
        self.stream = unittest._WritelnDecorator(stream)
608
610
        self.descriptions = descriptions
609
611
        self.verbosity = verbosity
629
631
        for decorator in self._result_decorators:
630
632
            result = decorator(result)
631
633
            result.stop_early = self.stop_on_failure
632
 
        try:
633
 
            import testtools
634
 
        except ImportError:
635
 
            pass
636
 
        else:
637
 
            if isinstance(test, testtools.ConcurrentTestSuite):
638
 
                # We need to catch bzr specific behaviors
639
 
                result = BZRTransformingResult(result)
640
634
        result.startTestRun()
641
635
        try:
642
636
            test.run(result)
660
654
                        % (type(suite), suite))
661
655
 
662
656
 
663
 
class TestSkipped(Exception):
664
 
    """Indicates that a test was intentionally skipped, rather than failing."""
 
657
TestSkipped = testtools.testcase.TestSkipped
665
658
 
666
659
 
667
660
class TestNotApplicable(TestSkipped):
673
666
    """
674
667
 
675
668
 
676
 
class KnownFailure(AssertionError):
677
 
    """Indicates that a test failed in a precisely expected manner.
678
 
 
679
 
    Such failures dont block the whole test suite from passing because they are
680
 
    indicators of partially completed code or of future work. We have an
681
 
    explicit error for them so that we can ensure that they are always visible:
682
 
    KnownFailures are always shown in the output of bzr selftest.
683
 
    """
 
669
# traceback._some_str fails to format exceptions that have the default
 
670
# __str__ which does an implicit ascii conversion. However, repr() on those
 
671
# objects works, for all that its not quite what the doctor may have ordered.
 
672
def _clever_some_str(value):
 
673
    try:
 
674
        return str(value)
 
675
    except:
 
676
        try:
 
677
            return repr(value).replace('\\n', '\n')
 
678
        except:
 
679
            return '<unprintable %s object>' % type(value).__name__
 
680
 
 
681
traceback._some_str = _clever_some_str
 
682
 
 
683
 
 
684
# deprecated - use self.knownFailure(), or self.expectFailure.
 
685
KnownFailure = testtools.testcase._ExpectedFailure
684
686
 
685
687
 
686
688
class UnavailableFeature(Exception):
762
764
        return NullProgressView()
763
765
 
764
766
 
765
 
class TestCase(unittest.TestCase):
 
767
class TestCase(testtools.TestCase):
766
768
    """Base class for bzr unit tests.
767
769
 
768
770
    Tests that need access to disk resources should subclass
787
789
    _leaking_threads_tests = 0
788
790
    _first_thread_leaker_id = None
789
791
    _log_file_name = None
790
 
    _log_contents = ''
791
 
    _keep_log_file = False
792
792
    # record lsprof data when performing benchmark calls.
793
793
    _gather_lsprof_in_benchmarks = False
794
 
    attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
795
 
                     '_log_contents', '_log_file_name', '_benchtime',
796
 
                     '_TestCase__testMethodName', '_TestCase__testMethodDoc',)
797
794
 
798
795
    def __init__(self, methodName='testMethod'):
799
796
        super(TestCase, self).__init__(methodName)
800
797
        self._cleanups = []
801
 
        self._bzr_test_setUp_run = False
802
 
        self._bzr_test_tearDown_run = False
803
798
        self._directory_isolation = True
 
799
        self.exception_handlers.insert(0,
 
800
            (UnavailableFeature, self._do_unsupported_or_skip))
 
801
        self.exception_handlers.insert(0,
 
802
            (TestNotApplicable, self._do_not_applicable))
804
803
 
805
804
    def setUp(self):
806
 
        unittest.TestCase.setUp(self)
807
 
        self._bzr_test_setUp_run = True
 
805
        super(TestCase, self).setUp()
 
806
        for feature in getattr(self, '_test_needs_features', []):
 
807
            self.requireFeature(feature)
 
808
        self._log_contents = None
 
809
        self.addDetail("log", content.Content(content.ContentType("text",
 
810
            "plain", {"charset": "utf8"}),
 
811
            lambda:[self._get_log(keep_log_file=True)]))
808
812
        self._cleanEnvironment()
809
813
        self._silenceUI()
810
814
        self._startLogFile()
1013
1017
        server's urls to be used.
1014
1018
        """
1015
1019
        if backing_server is None:
1016
 
            transport_server.setUp()
 
1020
            transport_server.start_server()
1017
1021
        else:
1018
 
            transport_server.setUp(backing_server)
1019
 
        self.addCleanup(transport_server.tearDown)
 
1022
            transport_server.start_server(backing_server)
 
1023
        self.addCleanup(transport_server.stop_server)
1020
1024
        # Obtain a real transport because if the server supplies a password, it
1021
1025
        # will be hidden from the base on the client side.
1022
1026
        t = get_transport(transport_server.get_url())
1289
1293
                m += ": " + msg
1290
1294
            self.fail(m)
1291
1295
 
1292
 
    def expectFailure(self, reason, assertion, *args, **kwargs):
1293
 
        """Invoke a test, expecting it to fail for the given reason.
1294
 
 
1295
 
        This is for assertions that ought to succeed, but currently fail.
1296
 
        (The failure is *expected* but not *wanted*.)  Please be very precise
1297
 
        about the failure you're expecting.  If a new bug is introduced,
1298
 
        AssertionError should be raised, not KnownFailure.
1299
 
 
1300
 
        Frequently, expectFailure should be followed by an opposite assertion.
1301
 
        See example below.
1302
 
 
1303
 
        Intended to be used with a callable that raises AssertionError as the
1304
 
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
1305
 
 
1306
 
        Raises KnownFailure if the test fails.  Raises AssertionError if the
1307
 
        test succeeds.
1308
 
 
1309
 
        example usage::
1310
 
 
1311
 
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
1312
 
                             dynamic_val)
1313
 
          self.assertEqual(42, dynamic_val)
1314
 
 
1315
 
          This means that a dynamic_val of 54 will cause the test to raise
1316
 
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
1317
 
          only a dynamic_val of 42 will allow the test to pass.  Anything other
1318
 
          than 54 or 42 will cause an AssertionError.
1319
 
        """
1320
 
        try:
1321
 
            assertion(*args, **kwargs)
1322
 
        except AssertionError:
1323
 
            raise KnownFailure(reason)
1324
 
        else:
1325
 
            self.fail('Unexpected success.  Should have failed: %s' % reason)
1326
 
 
1327
1296
    def assertFileEqual(self, content, path):
1328
1297
        """Fail if path does not contain 'content'."""
1329
1298
        self.failUnlessExists(path)
1479
1448
 
1480
1449
        Close the file and delete it, unless setKeepLogfile was called.
1481
1450
        """
1482
 
        if self._log_file is None:
1483
 
            return
 
1451
        if bzrlib.trace._trace_file:
 
1452
            # flush the log file, to get all content
 
1453
            bzrlib.trace._trace_file.flush()
1484
1454
        bzrlib.trace.pop_log_file(self._log_memento)
1485
 
        self._log_file.close()
1486
 
        self._log_file = None
1487
 
        if not self._keep_log_file:
1488
 
            os.remove(self._log_file_name)
1489
 
            self._log_file_name = None
1490
 
 
1491
 
    def setKeepLogfile(self):
1492
 
        """Make the logfile not be deleted when _finishLogFile is called."""
1493
 
        self._keep_log_file = True
 
1455
        # Cache the log result and delete the file on disk
 
1456
        self._get_log(False)
1494
1457
 
1495
1458
    def thisFailsStrictLockCheck(self):
1496
1459
        """It is known that this test would fail with -Dstrict_locks.
1587
1550
        else:
1588
1551
            addSkip(self, reason)
1589
1552
 
1590
 
    def _do_known_failure(self, result):
 
1553
    @staticmethod
 
1554
    def _do_known_failure(self, result, e):
1591
1555
        err = sys.exc_info()
1592
1556
        addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1593
1557
        if addExpectedFailure is not None:
1595
1559
        else:
1596
1560
            result.addSuccess(self)
1597
1561
 
 
1562
    @staticmethod
1598
1563
    def _do_not_applicable(self, result, e):
1599
1564
        if not e.args:
1600
1565
            reason = 'No reason given'
1606
1571
        else:
1607
1572
            self._do_skip(result, reason)
1608
1573
 
1609
 
    def _do_unsupported_or_skip(self, result, reason):
 
1574
    @staticmethod
 
1575
    def _do_unsupported_or_skip(self, result, e):
 
1576
        reason = e.args[0]
1610
1577
        addNotSupported = getattr(result, 'addNotSupported', None)
1611
1578
        if addNotSupported is not None:
1612
1579
            result.addNotSupported(self, reason)
1613
1580
        else:
1614
1581
            self._do_skip(result, reason)
1615
1582
 
1616
 
    def run(self, result=None):
1617
 
        if result is None: result = self.defaultTestResult()
1618
 
        result.startTest(self)
1619
 
        try:
1620
 
            self._run(result)
1621
 
            return result
1622
 
        finally:
1623
 
            result.stopTest(self)
1624
 
 
1625
 
    def _run(self, result):
1626
 
        for feature in getattr(self, '_test_needs_features', []):
1627
 
            if not feature.available():
1628
 
                return self._do_unsupported_or_skip(result, feature)
1629
 
        try:
1630
 
            absent_attr = object()
1631
 
            # Python 2.5
1632
 
            method_name = getattr(self, '_testMethodName', absent_attr)
1633
 
            if method_name is absent_attr:
1634
 
                # Python 2.4
1635
 
                method_name = getattr(self, '_TestCase__testMethodName')
1636
 
            testMethod = getattr(self, method_name)
1637
 
            try:
1638
 
                try:
1639
 
                    self.setUp()
1640
 
                    if not self._bzr_test_setUp_run:
1641
 
                        self.fail(
1642
 
                            "test setUp did not invoke "
1643
 
                            "bzrlib.tests.TestCase's setUp")
1644
 
                except KeyboardInterrupt:
1645
 
                    self._runCleanups()
1646
 
                    raise
1647
 
                except KnownFailure:
1648
 
                    self._do_known_failure(result)
1649
 
                    self.tearDown()
1650
 
                    return
1651
 
                except TestNotApplicable, e:
1652
 
                    self._do_not_applicable(result, e)
1653
 
                    self.tearDown()
1654
 
                    return
1655
 
                except TestSkipped, e:
1656
 
                    self._do_skip(result, e.args[0])
1657
 
                    self.tearDown()
1658
 
                    return result
1659
 
                except UnavailableFeature, e:
1660
 
                    self._do_unsupported_or_skip(result, e.args[0])
1661
 
                    self.tearDown()
1662
 
                    return
1663
 
                except:
1664
 
                    result.addError(self, sys.exc_info())
1665
 
                    self._runCleanups()
1666
 
                    return result
1667
 
 
1668
 
                ok = False
1669
 
                try:
1670
 
                    testMethod()
1671
 
                    ok = True
1672
 
                except KnownFailure:
1673
 
                    self._do_known_failure(result)
1674
 
                except self.failureException:
1675
 
                    result.addFailure(self, sys.exc_info())
1676
 
                except TestNotApplicable, e:
1677
 
                    self._do_not_applicable(result, e)
1678
 
                except TestSkipped, e:
1679
 
                    if not e.args:
1680
 
                        reason = "No reason given."
1681
 
                    else:
1682
 
                        reason = e.args[0]
1683
 
                    self._do_skip(result, reason)
1684
 
                except UnavailableFeature, e:
1685
 
                    self._do_unsupported_or_skip(result, e.args[0])
1686
 
                except KeyboardInterrupt:
1687
 
                    self._runCleanups()
1688
 
                    raise
1689
 
                except:
1690
 
                    result.addError(self, sys.exc_info())
1691
 
 
1692
 
                try:
1693
 
                    self.tearDown()
1694
 
                    if not self._bzr_test_tearDown_run:
1695
 
                        self.fail(
1696
 
                            "test tearDown did not invoke "
1697
 
                            "bzrlib.tests.TestCase's tearDown")
1698
 
                except KeyboardInterrupt:
1699
 
                    self._runCleanups()
1700
 
                    raise
1701
 
                except:
1702
 
                    result.addError(self, sys.exc_info())
1703
 
                    self._runCleanups()
1704
 
                    ok = False
1705
 
                if ok: result.addSuccess(self)
1706
 
                return result
1707
 
            except KeyboardInterrupt:
1708
 
                self._runCleanups()
1709
 
                raise
1710
 
        finally:
1711
 
            saved_attrs = {}
1712
 
            for attr_name in self.attrs_to_keep:
1713
 
                if attr_name in self.__dict__:
1714
 
                    saved_attrs[attr_name] = self.__dict__[attr_name]
1715
 
            self.__dict__ = saved_attrs
1716
 
 
1717
 
    def tearDown(self):
1718
 
        self._runCleanups()
1719
 
        self._log_contents = ''
1720
 
        self._bzr_test_tearDown_run = True
1721
 
        unittest.TestCase.tearDown(self)
1722
 
 
1723
1583
    def time(self, callable, *args, **kwargs):
1724
1584
        """Run callable and accrue the time it takes to the benchmark time.
1725
1585
 
1728
1588
        self._benchcalls.
1729
1589
        """
1730
1590
        if self._benchtime is None:
 
1591
            self.addDetail('benchtime', content.Content(content.ContentType(
 
1592
                "text", "plain"), lambda:[str(self._benchtime)]))
1731
1593
            self._benchtime = 0
1732
1594
        start = time.time()
1733
1595
        try:
1742
1604
        finally:
1743
1605
            self._benchtime += time.time() - start
1744
1606
 
1745
 
    def _runCleanups(self):
1746
 
        """Run registered cleanup functions.
1747
 
 
1748
 
        This should only be called from TestCase.tearDown.
1749
 
        """
1750
 
        # TODO: Perhaps this should keep running cleanups even if
1751
 
        # one of them fails?
1752
 
 
1753
 
        # Actually pop the cleanups from the list so tearDown running
1754
 
        # twice is safe (this happens for skipped tests).
1755
 
        while self._cleanups:
1756
 
            cleanup, args, kwargs = self._cleanups.pop()
1757
 
            cleanup(*args, **kwargs)
1758
 
 
1759
1607
    def log(self, *args):
1760
1608
        mutter(*args)
1761
1609
 
1762
1610
    def _get_log(self, keep_log_file=False):
1763
 
        """Get the log from bzrlib.trace calls from this test.
 
1611
        """Internal helper to get the log from bzrlib.trace for this test.
 
1612
 
 
1613
        Please use self.getDetails, or self.get_log to access this in test case
 
1614
        code.
1764
1615
 
1765
1616
        :param keep_log_file: When True, if the log is still a file on disk
1766
1617
            leave it as a file on disk. When False, if the log is still a file
1768
1619
            self._log_contents.
1769
1620
        :return: A string containing the log.
1770
1621
        """
1771
 
        # flush the log file, to get all content
 
1622
        if self._log_contents is not None:
 
1623
            try:
 
1624
                self._log_contents.decode('utf8')
 
1625
            except UnicodeDecodeError:
 
1626
                unicodestr = self._log_contents.decode('utf8', 'replace')
 
1627
                self._log_contents = unicodestr.encode('utf8')
 
1628
            return self._log_contents
1772
1629
        import bzrlib.trace
1773
1630
        if bzrlib.trace._trace_file:
 
1631
            # flush the log file, to get all content
1774
1632
            bzrlib.trace._trace_file.flush()
1775
 
        if self._log_contents:
1776
 
            # XXX: this can hardly contain the content flushed above --vila
1777
 
            # 20080128
1778
 
            return self._log_contents
1779
1633
        if self._log_file_name is not None:
1780
1634
            logfile = open(self._log_file_name)
1781
1635
            try:
1782
1636
                log_contents = logfile.read()
1783
1637
            finally:
1784
1638
                logfile.close()
 
1639
            try:
 
1640
                log_contents.decode('utf8')
 
1641
            except UnicodeDecodeError:
 
1642
                unicodestr = log_contents.decode('utf8', 'replace')
 
1643
                log_contents = unicodestr.encode('utf8')
1785
1644
            if not keep_log_file:
 
1645
                self._log_file.close()
 
1646
                self._log_file = None
 
1647
                # Permit multiple calls to get_log until we clean it up in
 
1648
                # finishLogFile
1786
1649
                self._log_contents = log_contents
1787
1650
                try:
1788
1651
                    os.remove(self._log_file_name)
1792
1655
                                             ' %r\n' % self._log_file_name))
1793
1656
                    else:
1794
1657
                        raise
 
1658
                self._log_file_name = None
1795
1659
            return log_contents
1796
1660
        else:
1797
 
            return "DELETED log file to reduce memory footprint"
 
1661
            return "No log file content and no log file name."
 
1662
 
 
1663
    def get_log(self):
 
1664
        """Get a unicode string containing the log from bzrlib.trace.
 
1665
 
 
1666
        Undecodable characters are replaced.
 
1667
        """
 
1668
        return u"".join(self.getDetails()['log'].iter_text())
1798
1669
 
1799
1670
    def requireFeature(self, feature):
1800
1671
        """This test requires a specific feature is available.
3219
3090
        if self.randomised:
3220
3091
            return iter(self._tests)
3221
3092
        self.randomised = True
3222
 
        self.stream.writeln("Randomizing test order using seed %s\n" %
 
3093
        self.stream.write("Randomizing test order using seed %s\n\n" %
3223
3094
            (self.actual_seed()))
3224
3095
        # Initialise the random number generator.
3225
3096
        random.seed(self.actual_seed())
3282
3153
    concurrency = osutils.local_concurrency()
3283
3154
    result = []
3284
3155
    from subunit import TestProtocolClient, ProtocolTestCase
 
3156
    from subunit.test_results import AutoTimingTestResultDecorator
3285
3157
    class TestInOtherProcess(ProtocolTestCase):
3286
3158
        # Should be in subunit, I think. RBC.
3287
3159
        def __init__(self, stream, pid):
3310
3182
                sys.stdin.close()
3311
3183
                sys.stdin = None
3312
3184
                stream = os.fdopen(c2pwrite, 'wb', 1)
3313
 
                subunit_result = BzrAutoTimingTestResultDecorator(
 
3185
                subunit_result = AutoTimingTestResultDecorator(
3314
3186
                    TestProtocolClient(stream))
3315
3187
                process_suite.run(subunit_result)
3316
3188
            finally:
3408
3280
 
3409
3281
    def addFailure(self, test, err):
3410
3282
        self.result.addFailure(test, err)
3411
 
 
3412
 
 
3413
 
class BZRTransformingResult(ForwardingResult):
3414
 
 
3415
 
    def addError(self, test, err):
3416
 
        feature = self._error_looks_like('UnavailableFeature: ', err)
3417
 
        if feature is not None:
3418
 
            self.result.addNotSupported(test, feature)
3419
 
        else:
3420
 
            self.result.addError(test, err)
3421
 
 
3422
 
    def addFailure(self, test, err):
3423
 
        known = self._error_looks_like('KnownFailure: ', err)
3424
 
        if known is not None:
3425
 
            self.result.addExpectedFailure(test,
3426
 
                [KnownFailure, KnownFailure(known), None])
3427
 
        else:
3428
 
            self.result.addFailure(test, err)
3429
 
 
3430
 
    def _error_looks_like(self, prefix, err):
3431
 
        """Deserialize exception and returns the stringify value."""
3432
 
        import subunit
3433
 
        value = None
3434
 
        typ, exc, _ = err
3435
 
        if isinstance(exc, subunit.RemoteException):
3436
 
            # stringify the exception gives access to the remote traceback
3437
 
            # We search the last line for 'prefix'
3438
 
            lines = str(exc).split('\n')
3439
 
            while lines and not lines[-1]:
3440
 
                lines.pop(-1)
3441
 
            if lines:
3442
 
                if lines[-1].startswith(prefix):
3443
 
                    value = lines[-1][len(prefix):]
3444
 
        return value
3445
 
 
3446
 
 
3447
 
try:
3448
 
    from subunit.test_results import AutoTimingTestResultDecorator
3449
 
    # Expected failure should be seen as a success not a failure Once subunit
3450
 
    # provide native support for that, BZRTransformingResult and this class
3451
 
    # will become useless.
3452
 
    class BzrAutoTimingTestResultDecorator(AutoTimingTestResultDecorator):
3453
 
 
3454
 
        def addExpectedFailure(self, test, err):
3455
 
            self._before_event()
3456
 
            return self._call_maybe("addExpectedFailure", self._degrade_skip,
3457
 
                                    test, err)
3458
 
except ImportError:
3459
 
    # Let's just define a no-op decorator
3460
 
    BzrAutoTimingTestResultDecorator = lambda x:x
 
3283
ForwardingResult = testtools.ExtendedToOriginalDecorator
3461
3284
 
3462
3285
 
3463
3286
class ProfileResult(ForwardingResult):
3746
3569
        'bzrlib.tests.per_inventory',
3747
3570
        'bzrlib.tests.per_interbranch',
3748
3571
        'bzrlib.tests.per_lock',
 
3572
        'bzrlib.tests.per_merger',
3749
3573
        'bzrlib.tests.per_transport',
3750
3574
        'bzrlib.tests.per_tree',
3751
3575
        'bzrlib.tests.per_pack_repository',
3756
3580
        'bzrlib.tests.per_versionedfile',
3757
3581
        'bzrlib.tests.per_workingtree',
3758
3582
        'bzrlib.tests.test__annotator',
 
3583
        'bzrlib.tests.test__bencode',
3759
3584
        'bzrlib.tests.test__chk_map',
3760
3585
        'bzrlib.tests.test__dirstate_helpers',
3761
3586
        'bzrlib.tests.test__groupcompress',
3769
3594
        'bzrlib.tests.test_api',
3770
3595
        'bzrlib.tests.test_atomicfile',
3771
3596
        'bzrlib.tests.test_bad_files',
3772
 
        'bzrlib.tests.test_bencode',
3773
3597
        'bzrlib.tests.test_bisect_multi',
3774
3598
        'bzrlib.tests.test_branch',
3775
3599
        'bzrlib.tests.test_branchbuilder',
4137
3961
    return new_test
4138
3962
 
4139
3963
 
 
3964
def permute_tests_for_extension(standard_tests, loader, py_module_name,
 
3965
                                ext_module_name):
 
3966
    """Helper for permutating tests against an extension module.
 
3967
 
 
3968
    This is meant to be used inside a modules 'load_tests()' function. It will
 
3969
    create 2 scenarios, and cause all tests in the 'standard_tests' to be run
 
3970
    against both implementations. Setting 'test.module' to the appropriate
 
3971
    module. See bzrlib.tests.test__chk_map.load_tests as an example.
 
3972
 
 
3973
    :param standard_tests: A test suite to permute
 
3974
    :param loader: A TestLoader
 
3975
    :param py_module_name: The python path to a python module that can always
 
3976
        be loaded, and will be considered the 'python' implementation. (eg
 
3977
        'bzrlib._chk_map_py')
 
3978
    :param ext_module_name: The python path to an extension module. If the
 
3979
        module cannot be loaded, a single test will be added, which notes that
 
3980
        the module is not available. If it can be loaded, all standard_tests
 
3981
        will be run against that module.
 
3982
    :return: (suite, feature) suite is a test-suite that has all the permuted
 
3983
        tests. feature is the Feature object that can be used to determine if
 
3984
        the module is available.
 
3985
    """
 
3986
 
 
3987
    py_module = __import__(py_module_name, {}, {}, ['NO_SUCH_ATTRIB'])
 
3988
    scenarios = [
 
3989
        ('python', {'module': py_module}),
 
3990
    ]
 
3991
    suite = loader.suiteClass()
 
3992
    feature = ModuleAvailableFeature(ext_module_name)
 
3993
    if feature.available():
 
3994
        scenarios.append(('C', {'module': feature.module}))
 
3995
    else:
 
3996
        # the compiled module isn't available, so we add a failing test
 
3997
        class FailWithoutFeature(TestCase):
 
3998
            def test_fail(self):
 
3999
                self.requireFeature(feature)
 
4000
        suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
 
4001
    result = multiply_tests(standard_tests, scenarios, suite)
 
4002
    return result, feature
 
4003
 
 
4004
 
4140
4005
def _rmtree_temp_dir(dirname, test_id=None):
4141
4006
    # If LANG=C we probably have created some bogus paths
4142
4007
    # which rmtree(unicode) will fail to delete
4247
4112
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4248
4113
 
4249
4114
 
 
4115
class _CompatabilityThunkFeature(Feature):
 
4116
    """This feature is just a thunk to another feature.
 
4117
 
 
4118
    It issues a deprecation warning if it is accessed, to let you know that you
 
4119
    should really use a different feature.
 
4120
    """
 
4121
 
 
4122
    def __init__(self, module, name, this_name, dep_version):
 
4123
        super(_CompatabilityThunkFeature, self).__init__()
 
4124
        self._module = module
 
4125
        self._name = name
 
4126
        self._this_name = this_name
 
4127
        self._dep_version = dep_version
 
4128
        self._feature = None
 
4129
 
 
4130
    def _ensure(self):
 
4131
        if self._feature is None:
 
4132
            msg = (self._dep_version % self._this_name) + (
 
4133
                   ' Use %s.%s instead.' % (self._module, self._name))
 
4134
            symbol_versioning.warn(msg, DeprecationWarning)
 
4135
            mod = __import__(self._module, {}, {}, [self._name])
 
4136
            self._feature = getattr(mod, self._name)
 
4137
 
 
4138
    def _probe(self):
 
4139
        self._ensure()
 
4140
        return self._feature._probe()
 
4141
 
 
4142
 
4250
4143
class ModuleAvailableFeature(Feature):
4251
4144
    """This is a feature than describes a module we want to be available.
4252
4145
 
4277
4170
        return self.module_name
4278
4171
 
4279
4172
 
 
4173
# This is kept here for compatibility, it is recommended to use
 
4174
# 'bzrlib.tests.feature.paramiko' instead
 
4175
ParamikoFeature = _CompatabilityThunkFeature('bzrlib.tests.features',
 
4176
    'paramiko', 'bzrlib.tests.ParamikoFeature', deprecated_in((2,1,0)))
 
4177
 
4280
4178
 
4281
4179
def probe_unicode_in_user_encoding():
4282
4180
    """Try to encode several unicode strings to use in unicode-aware tests.
4332
4230
HTTPSServerFeature = _HTTPSServerFeature()
4333
4231
 
4334
4232
 
4335
 
class _ParamikoFeature(Feature):
4336
 
    """Is paramiko available?"""
4337
 
 
4338
 
    def _probe(self):
4339
 
        try:
4340
 
            from bzrlib.transport.sftp import SFTPAbsoluteServer
4341
 
            return True
4342
 
        except errors.ParamikoNotPresent:
4343
 
            return False
4344
 
 
4345
 
    def feature_name(self):
4346
 
        return "Paramiko"
4347
 
 
4348
 
 
4349
 
ParamikoFeature = _ParamikoFeature()
4350
 
 
4351
 
 
4352
4233
class _UnicodeFilename(Feature):
4353
4234
    """Does the filesystem support Unicode filenames?"""
4354
4235
 
4459
4340
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4460
4341
 
4461
4342
 
4462
 
class _SubUnitFeature(Feature):
4463
 
    """Check if subunit is available."""
4464
 
 
4465
 
    def _probe(self):
4466
 
        try:
4467
 
            import subunit
4468
 
            return True
4469
 
        except ImportError:
4470
 
            return False
4471
 
 
4472
 
    def feature_name(self):
4473
 
        return 'subunit'
4474
 
 
4475
 
SubUnitFeature = _SubUnitFeature()
 
4343
# Kept for compatibility, use bzrlib.tests.features.subunit instead
 
4344
SubUnitFeature = _CompatabilityThunkFeature('bzrlib.tests.features', 'subunit',
 
4345
    'bzrlib.tests.SubUnitFeature', deprecated_in((2,1,0)))
4476
4346
# Only define SubUnitBzrRunner if subunit is available.
4477
4347
try:
4478
4348
    from subunit import TestProtocolClient
 
4349
    from subunit.test_results import AutoTimingTestResultDecorator
4479
4350
    class SubUnitBzrRunner(TextTestRunner):
4480
4351
        def run(self, test):
4481
 
            result = BzrAutoTimingTestResultDecorator(
 
4352
            result = AutoTimingTestResultDecorator(
4482
4353
                TestProtocolClient(self.stream))
4483
4354
            test.run(result)
4484
4355
            return result