333
338
def test_scenarios(self):
334
339
# check that constructor parameters are passed through to the adapted
336
from bzrlib.tests.per_workingtree import make_scenarios
341
from .per_workingtree import make_scenarios
339
formats = [workingtree.WorkingTreeFormat2(),
340
workingtree.WorkingTreeFormat3(),]
341
scenarios = make_scenarios(server1, server2, formats)
344
formats = [workingtree_4.WorkingTreeFormat4(),
345
workingtree_3.WorkingTreeFormat3(),
346
workingtree_4.WorkingTreeFormat6()]
347
scenarios = make_scenarios(server1, server2, formats,
348
remote_server='c', remote_readonly_server='d',
349
remote_backing_server='e')
342
350
self.assertEqual([
343
('WorkingTreeFormat2',
344
{'bzrdir_format': formats[0]._matchingbzrdir,
351
('WorkingTreeFormat4',
352
{'bzrdir_format': formats[0]._matchingcontroldir,
345
353
'transport_readonly_server': 'b',
346
354
'transport_server': 'a',
347
355
'workingtree_format': formats[0]}),
348
356
('WorkingTreeFormat3',
349
{'bzrdir_format': formats[1]._matchingbzrdir,
350
'transport_readonly_server': 'b',
351
'transport_server': 'a',
352
'workingtree_format': formats[1]})],
357
{'bzrdir_format': formats[1]._matchingcontroldir,
358
'transport_readonly_server': 'b',
359
'transport_server': 'a',
360
'workingtree_format': formats[1]}),
361
('WorkingTreeFormat6',
362
{'bzrdir_format': formats[2]._matchingcontroldir,
363
'transport_readonly_server': 'b',
364
'transport_server': 'a',
365
'workingtree_format': formats[2]}),
366
('WorkingTreeFormat6,remote',
367
{'bzrdir_format': formats[2]._matchingcontroldir,
368
'repo_is_remote': True,
369
'transport_readonly_server': 'd',
370
'transport_server': 'c',
371
'vfs_transport_factory': 'e',
372
'workingtree_format': formats[2]}),
356
376
class TestTreeScenarios(tests.TestCase):
358
378
def test_scenarios(self):
359
379
# the tree implementation scenario generator is meant to setup one
360
# instance for each working tree format, and one additional instance
380
# instance for each working tree format, one additional instance
361
381
# that will use the default wt format, but create a revision tree for
362
# the tests. this means that the wt ones should have the
363
# workingtree_to_test_tree attribute set to 'return_parameter' and the
364
# revision one set to revision_tree_from_workingtree.
382
# the tests, and one more that uses the default wt format as a
383
# lightweight checkout of a remote repository. This means that the wt
384
# ones should have the workingtree_to_test_tree attribute set to
385
# 'return_parameter' and the revision one set to
386
# revision_tree_from_workingtree.
366
from bzrlib.tests.per_tree import (
388
from .per_tree import (
367
389
_dirstate_tree_from_workingtree,
369
391
preview_tree_pre,
376
formats = [workingtree.WorkingTreeFormat2(),
377
workingtree.WorkingTreeFormat3(),]
398
smart_server = test_server.SmartTCPServer_for_testing
399
smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
400
mem_server = memory.MemoryServer
401
formats = [workingtree_4.WorkingTreeFormat4(),
402
workingtree_3.WorkingTreeFormat3(),]
378
403
scenarios = make_scenarios(server1, server2, formats)
379
self.assertEqual(7, len(scenarios))
380
default_wt_format = workingtree.WorkingTreeFormat4._default_format
381
wt4_format = workingtree.WorkingTreeFormat4()
382
wt5_format = workingtree.WorkingTreeFormat5()
404
self.assertEqual(8, len(scenarios))
405
default_wt_format = workingtree.format_registry.get_default()
406
wt4_format = workingtree_4.WorkingTreeFormat4()
407
wt5_format = workingtree_4.WorkingTreeFormat5()
408
wt6_format = workingtree_4.WorkingTreeFormat6()
383
409
expected_scenarios = [
384
('WorkingTreeFormat2',
385
{'bzrdir_format': formats[0]._matchingbzrdir,
410
('WorkingTreeFormat4',
411
{'bzrdir_format': formats[0]._matchingcontroldir,
386
412
'transport_readonly_server': 'b',
387
413
'transport_server': 'a',
388
414
'workingtree_format': formats[0],
389
415
'_workingtree_to_test_tree': return_parameter,
391
417
('WorkingTreeFormat3',
392
{'bzrdir_format': formats[1]._matchingbzrdir,
418
{'bzrdir_format': formats[1]._matchingcontroldir,
393
419
'transport_readonly_server': 'b',
394
420
'transport_server': 'a',
395
421
'workingtree_format': formats[1],
396
422
'_workingtree_to_test_tree': return_parameter,
424
('WorkingTreeFormat6,remote',
425
{'bzrdir_format': wt6_format._matchingcontroldir,
426
'repo_is_remote': True,
427
'transport_readonly_server': smart_readonly_server,
428
'transport_server': smart_server,
429
'vfs_transport_factory': mem_server,
430
'workingtree_format': wt6_format,
431
'_workingtree_to_test_tree': return_parameter,
399
434
{'_workingtree_to_test_tree': revision_tree_from_workingtree,
400
'bzrdir_format': default_wt_format._matchingbzrdir,
435
'bzrdir_format': default_wt_format._matchingcontroldir,
401
436
'transport_readonly_server': 'b',
402
437
'transport_server': 'a',
403
438
'workingtree_format': default_wt_format,
405
440
('DirStateRevisionTree,WT4',
406
441
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
407
'bzrdir_format': wt4_format._matchingbzrdir,
442
'bzrdir_format': wt4_format._matchingcontroldir,
408
443
'transport_readonly_server': 'b',
409
444
'transport_server': 'a',
410
445
'workingtree_format': wt4_format,
412
447
('DirStateRevisionTree,WT5',
413
448
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
414
'bzrdir_format': wt5_format._matchingbzrdir,
449
'bzrdir_format': wt5_format._matchingcontroldir,
415
450
'transport_readonly_server': 'b',
416
451
'transport_server': 'a',
417
452
'workingtree_format': wt5_format,
420
455
{'_workingtree_to_test_tree': preview_tree_pre,
421
'bzrdir_format': default_wt_format._matchingbzrdir,
456
'bzrdir_format': default_wt_format._matchingcontroldir,
422
457
'transport_readonly_server': 'b',
423
458
'transport_server': 'a',
424
459
'workingtree_format': default_wt_format}),
425
460
('PreviewTreePost',
426
461
{'_workingtree_to_test_tree': preview_tree_post,
427
'bzrdir_format': default_wt_format._matchingbzrdir,
462
'bzrdir_format': default_wt_format._matchingcontroldir,
428
463
'transport_readonly_server': 'b',
429
464
'transport_server': 'a',
430
465
'workingtree_format': default_wt_format}),
592
633
the_branch = builder.get_branch()
593
634
# Guard against regression into MemoryTransport leaking
594
635
# files to disk instead of keeping them in memory.
595
self.failIf(osutils.lexists('dir'))
596
dir_format = bzrdir.format_registry.make_bzrdir('knit')
636
self.assertFalse(osutils.lexists('dir'))
637
dir_format = controldir.format_registry.make_controldir('knit')
597
638
self.assertEqual(dir_format.repository_format.__class__,
598
639
the_branch.repository._format.__class__)
599
self.assertEqual('Bazaar-NG Knit Repository Format 1',
640
self.assertEqual(b'Bazaar-NG Knit Repository Format 1',
600
641
self.get_transport().get_bytes(
601
642
'dir/.bzr/repository/format'))
603
644
def test_dangling_locks_cause_failures(self):
604
645
class TestDanglingLock(tests.TestCaseWithMemoryTransport):
605
646
def test_function(self):
606
t = self.get_transport('.')
647
t = self.get_transport_from_path('.')
607
648
l = lockdir.LockDir(t, 'lock')
610
651
test = TestDanglingLock('test_function')
611
652
result = test.run()
653
total_failures = result.errors + result.failures
612
654
if self._lock_check_thorough:
613
self.assertEqual(1, len(result.errors))
655
self.assertEqual(1, len(total_failures))
615
657
# When _lock_check_thorough is disabled, then we don't trigger a
617
self.assertEqual(0, len(result.errors))
659
self.assertEqual(0, len(total_failures))
620
662
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
663
"""Tests for the convenience functions TestCaseWithTransport introduces."""
623
665
def test_get_readonly_url_none(self):
624
from bzrlib.transport import get_transport
625
from bzrlib.transport.readonly import ReadonlyTransportDecorator
666
from ..transport.readonly import ReadonlyTransportDecorator
626
667
self.vfs_transport_factory = memory.MemoryServer
627
668
self.transport_readonly_server = None
628
669
# calling get_readonly_transport() constructs a decorator on the url
630
671
url = self.get_readonly_url()
631
672
url2 = self.get_readonly_url('foo/bar')
632
t = get_transport(url)
633
t2 = get_transport(url2)
634
self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
673
t = transport.get_transport_from_url(url)
674
t2 = transport.get_transport_from_url(url2)
675
self.assertIsInstance(t, ReadonlyTransportDecorator)
676
self.assertIsInstance(t2, ReadonlyTransportDecorator)
636
677
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
638
679
def test_get_readonly_url_http(self):
639
from bzrlib.tests.http_server import HttpServer
640
from bzrlib.transport import get_transport
641
from bzrlib.transport.http import HttpTransportBase
680
from .http_server import HttpServer
681
from ..transport.http import HttpTransport
642
682
self.transport_server = test_server.LocalURLServer
643
683
self.transport_readonly_server = HttpServer
644
684
# calling get_readonly_transport() gives us a HTTP server instance.
645
685
url = self.get_readonly_url()
646
686
url2 = self.get_readonly_url('foo/bar')
647
687
# the transport returned may be any HttpTransportBase subclass
648
t = get_transport(url)
649
t2 = get_transport(url2)
650
self.failUnless(isinstance(t, HttpTransportBase))
651
self.failUnless(isinstance(t2, HttpTransportBase))
688
t = transport.get_transport_from_url(url)
689
t2 = transport.get_transport_from_url(url2)
690
self.assertIsInstance(t, HttpTransport)
691
self.assertIsInstance(t2, HttpTransport)
652
692
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
654
694
def test_is_directory(self):
743
779
r"^ +[0-9]+ms\*$")
745
781
def test_unittest_reporting_unittest_class(self):
746
# getting the time from a non-bzrlib test works ok
782
# getting the time from a non-breezy test works ok
747
783
class ShortDelayTestCase(unittest.TestCase):
748
784
def test_short_delay(self):
749
785
time.sleep(0.003)
750
786
self.check_timing(ShortDelayTestCase('test_short_delay'),
753
def _patch_get_bzr_source_tree(self):
754
# Reading from the actual source tree breaks isolation, but we don't
755
# want to assume that thats *all* that would happen.
756
self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
758
def test_assigned_benchmark_file_stores_date(self):
759
self._patch_get_bzr_source_tree()
761
result = bzrlib.tests.TextTestResult(self._log_file,
766
output_string = output.getvalue()
767
# if you are wondering about the regexp please read the comment in
768
# test_bench_history (bzrlib.tests.test_selftest.TestRunner)
769
# XXX: what comment? -- Andrew Bennetts
770
self.assertContainsRe(output_string, "--date [0-9.]+")
772
def test_benchhistory_records_test_times(self):
773
self._patch_get_bzr_source_tree()
774
result_stream = StringIO()
775
result = bzrlib.tests.TextTestResult(
779
bench_history=result_stream
782
# we want profile a call and check that its test duration is recorded
783
# make a new test instance that when run will generate a benchmark
784
example_test_case = TestTestResult("_time_hello_world_encoding")
785
# execute the test, which should succeed and record times
786
example_test_case.run(result)
787
lines = result_stream.getvalue().splitlines()
788
self.assertEqual(2, len(lines))
789
self.assertContainsRe(lines[1],
790
" *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
791
"._time_hello_world_encoding")
793
789
def _time_hello_world_encoding(self):
794
790
"""Profile two sleep calls
796
792
This is used to exercise the test framework.
798
self.time(unicode, 'hello', errors='replace')
799
self.time(unicode, 'world', errors='replace')
794
self.time(text_type, b'hello', errors='replace')
795
self.time(text_type, b'world', errors='replace')
801
797
def test_lsprofiling(self):
802
798
"""Verbose test result prints lsprof statistics from test cases."""
803
self.requireFeature(test_lsprof.LSProfFeature)
799
self.requireFeature(features.lsprof_feature)
804
800
result_stream = StringIO()
805
result = bzrlib.tests.VerboseTestResult(
806
unittest._WritelnDecorator(result_stream),
801
result = breezy.tests.VerboseTestResult(
826
822
# and then repeated but with 'world', rather than 'hello'.
827
823
# this should appear in the output stream of our test result.
828
824
output = result_stream.getvalue()
829
self.assertContainsRe(output,
830
r"LSProf output for <type 'unicode'>\(\('hello',\), {'errors': 'replace'}\)")
826
self.assertContainsRe(output,
827
r"LSProf output for <class 'str'>\(\(b'hello',\), {'errors': 'replace'}\)")
828
self.assertContainsRe(output,
829
r"LSProf output for <class 'str'>\(\(b'world',\), {'errors': 'replace'}\)")
831
self.assertContainsRe(output,
832
r"LSProf output for <type 'unicode'>\(\('hello',\), {'errors': 'replace'}\)")
833
self.assertContainsRe(output,
834
r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
831
835
self.assertContainsRe(output,
832
836
r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n")
833
837
self.assertContainsRe(output,
834
838
r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?")
835
self.assertContainsRe(output,
836
r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
840
def test_uses_time_from_testtools(self):
841
"""Test case timings in verbose results should use testtools times"""
843
class TimeAddedVerboseTestResult(tests.VerboseTestResult):
844
def startTest(self, test):
845
self.time(datetime.datetime.utcfromtimestamp(1.145))
846
super(TimeAddedVerboseTestResult, self).startTest(test)
847
def addSuccess(self, test):
848
self.time(datetime.datetime.utcfromtimestamp(51.147))
849
super(TimeAddedVerboseTestResult, self).addSuccess(test)
850
def report_tests_starting(self): pass
852
self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
853
self.assertEndsWith(sio.getvalue(), "OK 50002ms\n")
838
855
def test_known_failure(self):
839
"""A KnownFailure being raised should trigger several result actions."""
856
"""Using knownFailure should trigger several result actions."""
840
857
class InstrumentedTestResult(tests.ExtendedTestResult):
841
858
def stopTestRun(self): pass
842
def startTests(self): pass
843
def report_test_start(self, test): pass
859
def report_tests_starting(self): pass
844
860
def report_known_failure(self, test, err=None, details=None):
845
861
self._call = test, 'known failure'
846
862
result = InstrumentedTestResult(None, None, None, None)
847
863
class Test(tests.TestCase):
848
864
def test_function(self):
849
raise tests.KnownFailure('failed!')
865
self.knownFailure('failed!')
850
866
test = Test("test_function")
852
868
# it should invoke 'report_known_failure'.
1643
1678
class Test(tests.TestCase):
1645
1680
def setUp(self):
1646
tests.TestCase.setUp(self)
1681
super(Test, self).setUp()
1647
1682
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1649
1684
def test_value(self):
1650
1685
self.assertEqual('original', self.orig)
1651
1686
self.assertEqual('modified', obj.test_attr)
1653
test = Test('test_value')
1654
test.run(unittest.TestResult())
1688
self._run_successful_test(Test('test_value'))
1655
1689
self.assertEqual('original', obj.test_attr)
1691
def test_overrideAttr_with_no_existing_value_and_value(self):
1692
# Do not define the test_attribute
1693
obj = self # Make 'obj' visible to the embedded test
1694
class Test(tests.TestCase):
1697
tests.TestCase.setUp(self)
1698
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1700
def test_value(self):
1701
self.assertEqual(tests._unitialized_attr, self.orig)
1702
self.assertEqual('modified', obj.test_attr)
1704
self._run_successful_test(Test('test_value'))
1705
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1707
def test_overrideAttr_with_no_existing_value_and_no_value(self):
1708
# Do not define the test_attribute
1709
obj = self # Make 'obj' visible to the embedded test
1710
class Test(tests.TestCase):
1713
tests.TestCase.setUp(self)
1714
self.orig = self.overrideAttr(obj, 'test_attr')
1716
def test_value(self):
1717
self.assertEqual(tests._unitialized_attr, self.orig)
1718
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1720
self._run_successful_test(Test('test_value'))
1721
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1723
def test_recordCalls(self):
1724
from breezy.tests import test_selftest
1725
calls = self.recordCalls(
1726
test_selftest, '_add_numbers')
1727
self.assertEqual(test_selftest._add_numbers(2, 10),
1729
self.assertEqual(calls, [((2, 10), {})])
1732
def _add_numbers(a, b):
1736
class _MissingFeature(features.Feature):
1739
missing_feature = _MissingFeature()
1742
def _get_test(name):
1743
"""Get an instance of a specific example test.
1745
We protect this in a function so that they don't auto-run in the test
1749
class ExampleTests(tests.TestCase):
1751
def test_fail(self):
1752
mutter('this was a failing test')
1753
self.fail('this test will fail')
1755
def test_error(self):
1756
mutter('this test errored')
1757
raise RuntimeError('gotcha')
1759
def test_missing_feature(self):
1760
mutter('missing the feature')
1761
self.requireFeature(missing_feature)
1763
def test_skip(self):
1764
mutter('this test will be skipped')
1765
raise tests.TestSkipped('reason')
1767
def test_success(self):
1768
mutter('this test succeeds')
1770
def test_xfail(self):
1771
mutter('test with expected failure')
1772
self.knownFailure('this_fails')
1774
def test_unexpected_success(self):
1775
mutter('test with unexpected success')
1776
self.expectFailure('should_fail', lambda: None)
1778
return ExampleTests(name)
1781
def _get_skip_reasons(result):
1782
# GZ 2017-06-06: Newer testtools doesn't have this, uses detail instead
1783
return result.skip_reasons
1786
class TestTestCaseLogDetails(tests.TestCase):
1788
def _run_test(self, test_name):
1789
test = _get_test(test_name)
1790
result = testtools.TestResult()
1794
def test_fail_has_log(self):
1795
result = self._run_test('test_fail')
1796
self.assertEqual(1, len(result.failures))
1797
result_content = result.failures[0][1]
1798
self.assertContainsRe(result_content,
1799
'(?m)^(?:Text attachment: )?log(?:$|: )')
1800
self.assertContainsRe(result_content, 'this was a failing test')
1802
def test_error_has_log(self):
1803
result = self._run_test('test_error')
1804
self.assertEqual(1, len(result.errors))
1805
result_content = result.errors[0][1]
1806
self.assertContainsRe(result_content,
1807
'(?m)^(?:Text attachment: )?log(?:$|: )')
1808
self.assertContainsRe(result_content, 'this test errored')
1810
def test_skip_has_no_log(self):
1811
result = self._run_test('test_skip')
1812
reasons = _get_skip_reasons(result)
1813
self.assertEqual({'reason'}, set(reasons))
1814
skips = reasons['reason']
1815
self.assertEqual(1, len(skips))
1817
self.assertFalse('log' in test.getDetails())
1819
def test_missing_feature_has_no_log(self):
1820
# testtools doesn't know about addNotSupported, so it just gets
1821
# considered as a skip
1822
result = self._run_test('test_missing_feature')
1823
reasons = _get_skip_reasons(result)
1824
self.assertEqual({str(missing_feature)}, set(reasons))
1825
skips = reasons[str(missing_feature)]
1826
self.assertEqual(1, len(skips))
1828
self.assertFalse('log' in test.getDetails())
1830
def test_xfail_has_no_log(self):
1831
result = self._run_test('test_xfail')
1832
self.assertEqual(1, len(result.expectedFailures))
1833
result_content = result.expectedFailures[0][1]
1834
self.assertNotContainsRe(result_content,
1835
'(?m)^(?:Text attachment: )?log(?:$|: )')
1836
self.assertNotContainsRe(result_content, 'test with expected failure')
1838
def test_unexpected_success_has_log(self):
1839
result = self._run_test('test_unexpected_success')
1840
self.assertEqual(1, len(result.unexpectedSuccesses))
1841
# Inconsistency, unexpectedSuccesses is a list of tests,
1842
# expectedFailures is a list of reasons?
1843
test = result.unexpectedSuccesses[0]
1844
details = test.getDetails()
1845
self.assertTrue('log' in details)
1848
class TestTestCloning(tests.TestCase):
1849
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1851
def test_cloned_testcase_does_not_share_details(self):
1852
"""A TestCase cloned with clone_test does not share mutable attributes
1853
such as details or cleanups.
1855
class Test(tests.TestCase):
1857
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1858
orig_test = Test('test_foo')
1859
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1860
orig_test.run(unittest.TestResult())
1861
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1862
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1864
def test_double_apply_scenario_preserves_first_scenario(self):
1865
"""Applying two levels of scenarios to a test preserves the attributes
1866
added by both scenarios.
1868
class Test(tests.TestCase):
1871
test = Test('test_foo')
1872
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1873
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1874
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1875
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1876
all_tests = list(tests.iter_suite_tests(suite))
1877
self.assertLength(4, all_tests)
1878
all_xys = sorted((t.x, t.y) for t in all_tests)
1879
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1658
1882
# NB: Don't delete this; it's not actually from 0.11!
1659
1883
@deprecated_function(deprecated_in((0, 11, 0)))
1971
2199
load_list='missing file name', list_only=True)
2202
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2204
_test_needs_features = [features.subunit]
2206
def run_subunit_stream(self, test_name):
2207
from subunit import ProtocolTestCase
2209
return TestUtil.TestSuite([_get_test(test_name)])
2210
stream = self.run_selftest(
2211
runner_class=tests.SubUnitBzrRunnerv1,
2212
test_suite_factory=factory)
2213
test = ProtocolTestCase(stream)
2214
result = testtools.TestResult()
2216
content = stream.getvalue()
2217
return content, result
2219
def test_fail_has_log(self):
2220
content, result = self.run_subunit_stream('test_fail')
2221
self.assertEqual(1, len(result.failures))
2222
self.assertContainsRe(content, '(?m)^log$')
2223
self.assertContainsRe(content, 'this test will fail')
2225
def test_error_has_log(self):
2226
content, result = self.run_subunit_stream('test_error')
2227
self.assertContainsRe(content, '(?m)^log$')
2228
self.assertContainsRe(content, 'this test errored')
2230
def test_skip_has_no_log(self):
2231
content, result = self.run_subunit_stream('test_skip')
2232
self.assertNotContainsRe(content, '(?m)^log$')
2233
self.assertNotContainsRe(content, 'this test will be skipped')
2234
reasons = _get_skip_reasons(result)
2235
self.assertEqual({'reason'}, set(reasons))
2236
skips = reasons['reason']
2237
self.assertEqual(1, len(skips))
2239
# RemotedTestCase doesn't preserve the "details"
2240
## self.assertFalse('log' in test.getDetails())
2242
def test_missing_feature_has_no_log(self):
2243
content, result = self.run_subunit_stream('test_missing_feature')
2244
self.assertNotContainsRe(content, '(?m)^log$')
2245
self.assertNotContainsRe(content, 'missing the feature')
2246
reasons = _get_skip_reasons(result)
2247
self.assertEqual({'_MissingFeature\n'}, set(reasons))
2248
skips = reasons['_MissingFeature\n']
2249
self.assertEqual(1, len(skips))
2251
# RemotedTestCase doesn't preserve the "details"
2252
## self.assertFalse('log' in test.getDetails())
2254
def test_xfail_has_no_log(self):
2255
content, result = self.run_subunit_stream('test_xfail')
2256
self.assertNotContainsRe(content, '(?m)^log$')
2257
self.assertNotContainsRe(content, 'test with expected failure')
2258
self.assertEqual(1, len(result.expectedFailures))
2259
result_content = result.expectedFailures[0][1]
2260
self.assertNotContainsRe(result_content,
2261
'(?m)^(?:Text attachment: )?log(?:$|: )')
2262
self.assertNotContainsRe(result_content, 'test with expected failure')
2264
def test_unexpected_success_has_log(self):
2265
content, result = self.run_subunit_stream('test_unexpected_success')
2266
self.assertContainsRe(content, '(?m)^log$')
2267
self.assertContainsRe(content, 'test with unexpected success')
2268
# GZ 2011-05-18: Old versions of subunit treat unexpected success as a
2269
# success, if a min version check is added remove this
2270
from subunit import TestProtocolClient as _Client
2271
if _Client.addUnexpectedSuccess.__func__ is _Client.addSuccess.__func__:
2272
self.expectFailure('subunit treats "unexpectedSuccess"'
2273
' as a plain success',
2274
self.assertEqual, 1, len(result.unexpectedSuccesses))
2275
self.assertEqual(1, len(result.unexpectedSuccesses))
2276
test = result.unexpectedSuccesses[0]
2277
# RemotedTestCase doesn't preserve the "details"
2278
## self.assertTrue('log' in test.getDetails())
2280
def test_success_has_no_log(self):
2281
content, result = self.run_subunit_stream('test_success')
2282
self.assertEqual(1, result.testsRun)
2283
self.assertNotContainsRe(content, '(?m)^log$')
2284
self.assertNotContainsRe(content, 'this test succeeds')
1974
2287
class TestRunBzr(tests.TestCase):
2264
2577
class TestStartBzrSubProcess(tests.TestCase):
2578
"""Stub test start_bzr_subprocess."""
2266
def check_popen_state(self):
2267
"""Replace to make assertions when popen is called."""
2580
def _subprocess_log_cleanup(self):
2581
"""Inhibits the base version as we don't produce a log file."""
2269
2583
def _popen(self, *args, **kwargs):
2270
"""Record the command that is run, so that we can ensure it is correct"""
2584
"""Override the base version to record the command that is run.
2586
From there we can ensure it is correct without spawning a real process.
2271
2588
self.check_popen_state()
2272
2589
self._popen_args = args
2273
2590
self._popen_kwargs = kwargs
2274
2591
raise _DontSpawnProcess()
2593
def check_popen_state(self):
2594
"""Replace to make assertions when popen is called."""
2276
2596
def test_run_bzr_subprocess_no_plugins(self):
2277
2597
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2278
2598
command = self._popen_args[0]
2279
2599
self.assertEqual(sys.executable, command[0])
2280
self.assertEqual(self.get_bzr_path(), command[1])
2600
self.assertEqual(self.get_brz_path(), command[1])
2281
2601
self.assertEqual(['--no-plugins'], command[2:])
2283
2603
def test_allow_plugins(self):
2284
2604
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2286
2606
command = self._popen_args[0]
2287
2607
self.assertEqual([], command[2:])
2289
2609
def test_set_env(self):
2290
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2610
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2291
2611
# set in the child
2292
2612
def check_environment():
2293
2613
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2294
2614
self.check_popen_state = check_environment
2295
2615
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2296
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2616
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2297
2617
# not set in theparent
2298
2618
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2300
2620
def test_run_bzr_subprocess_env_del(self):
2301
2621
"""run_bzr_subprocess can remove environment variables too."""
2302
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2622
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2303
2623
def check_environment():
2304
2624
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2305
2625
os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2306
2626
self.check_popen_state = check_environment
2307
2627
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2308
env_changes={'EXISTANT_ENV_VAR':None})
2628
env_changes={'EXISTANT_ENV_VAR':None})
2309
2629
# Still set in parent
2310
2630
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2311
2631
del os.environ['EXISTANT_ENV_VAR']
2313
2633
def test_env_del_missing(self):
2314
self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
2634
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2315
2635
def check_environment():
2316
2636
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2317
2637
self.check_popen_state = check_environment
2318
2638
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2319
env_changes={'NON_EXISTANT_ENV_VAR':None})
2639
env_changes={'NON_EXISTANT_ENV_VAR':None})
2321
2641
def test_working_dir(self):
2322
2642
"""Test that we can specify the working dir for the child"""
2350
2669
self.disable_missing_extensions_warning()
2351
2670
process = self.start_bzr_subprocess(['wait-until-signalled'],
2352
2671
skip_if_plan_to_signal=True)
2353
self.assertEqual('running\n', process.stdout.readline())
2672
self.assertEqual(b'running\n', process.stdout.readline())
2354
2673
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2356
self.assertEqual('', result[0])
2357
self.assertEqual('bzr: interrupted\n', result[1])
2360
class TestFeature(tests.TestCase):
2362
def test_caching(self):
2363
"""Feature._probe is called by the feature at most once."""
2364
class InstrumentedFeature(tests.Feature):
2366
super(InstrumentedFeature, self).__init__()
2369
self.calls.append('_probe')
2371
feature = InstrumentedFeature()
2373
self.assertEqual(['_probe'], feature.calls)
2375
self.assertEqual(['_probe'], feature.calls)
2377
def test_named_str(self):
2378
"""Feature.__str__ should thunk to feature_name()."""
2379
class NamedFeature(tests.Feature):
2380
def feature_name(self):
2382
feature = NamedFeature()
2383
self.assertEqual('symlinks', str(feature))
2385
def test_default_str(self):
2386
"""Feature.__str__ should default to __class__.__name__."""
2387
class NamedFeature(tests.Feature):
2389
feature = NamedFeature()
2390
self.assertEqual('NamedFeature', str(feature))
2393
class TestUnavailableFeature(tests.TestCase):
2395
def test_access_feature(self):
2396
feature = tests.Feature()
2397
exception = tests.UnavailableFeature(feature)
2398
self.assertIs(feature, exception.args[0])
2401
simple_thunk_feature = tests._CompatabilityThunkFeature(
2402
deprecated_in((2, 1, 0)),
2403
'bzrlib.tests.test_selftest',
2404
'simple_thunk_feature','UnicodeFilename',
2405
replacement_module='bzrlib.tests'
2408
class Test_CompatibilityFeature(tests.TestCase):
2410
def test_does_thunk(self):
2411
res = self.callDeprecated(
2412
['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2413
' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2414
simple_thunk_feature.available)
2415
self.assertEqual(tests.UnicodeFilename.available(), res)
2418
class TestModuleAvailableFeature(tests.TestCase):
2420
def test_available_module(self):
2421
feature = tests.ModuleAvailableFeature('bzrlib.tests')
2422
self.assertEqual('bzrlib.tests', feature.module_name)
2423
self.assertEqual('bzrlib.tests', str(feature))
2424
self.assertTrue(feature.available())
2425
self.assertIs(tests, feature.module)
2427
def test_unavailable_module(self):
2428
feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
2429
self.assertEqual('bzrlib.no_such_module_exists', str(feature))
2430
self.assertFalse(feature.available())
2431
self.assertIs(None, feature.module)
2675
self.assertEqual(b'', result[0])
2676
self.assertEqual(b'brz: interrupted\n', result[1])
2434
2679
class TestSelftestFiltering(tests.TestCase):
2436
2681
def setUp(self):
2437
tests.TestCase.setUp(self)
2682
super(TestSelftestFiltering, self).setUp()
2438
2683
self.suite = TestUtil.TestSuite()
2439
2684
self.loader = TestUtil.TestLoader()
2440
2685
self.suite.addTest(self.loader.loadTestsFromModule(
2441
sys.modules['bzrlib.tests.test_selftest']))
2686
sys.modules['breezy.tests.test_selftest']))
2442
2687
self.all_names = _test_ids(self.suite)
2444
2689
def test_condition_id_re(self):
2445
test_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
2690
test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2446
2691
'test_condition_id_re')
2447
2692
filtered_suite = tests.filter_suite_by_condition(
2448
2693
self.suite, tests.condition_id_re('test_condition_id_re'))
2449
2694
self.assertEqual([test_name], _test_ids(filtered_suite))
2451
2696
def test_condition_id_in_list(self):
2452
test_names = ['bzrlib.tests.test_selftest.TestSelftestFiltering.'
2697
test_names = ['breezy.tests.test_selftest.TestSelftestFiltering.'
2453
2698
'test_condition_id_in_list']
2454
2699
id_list = tests.TestIdList(test_names)
2455
2700
filtered_suite = tests.filter_suite_by_condition(
2953
3203
def test_predefined_prefixes(self):
2954
3204
tpr = tests.test_prefix_alias_registry
2955
self.assertEquals('bzrlib', tpr.resolve_alias('bzrlib'))
2956
self.assertEquals('bzrlib.doc', tpr.resolve_alias('bd'))
2957
self.assertEquals('bzrlib.utils', tpr.resolve_alias('bu'))
2958
self.assertEquals('bzrlib.tests', tpr.resolve_alias('bt'))
2959
self.assertEquals('bzrlib.tests.blackbox', tpr.resolve_alias('bb'))
2960
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3205
self.assertEqual('breezy', tpr.resolve_alias('breezy'))
3206
self.assertEqual('breezy.doc', tpr.resolve_alias('bd'))
3207
self.assertEqual('breezy.utils', tpr.resolve_alias('bu'))
3208
self.assertEqual('breezy.tests', tpr.resolve_alias('bt'))
3209
self.assertEqual('breezy.tests.blackbox', tpr.resolve_alias('bb'))
3210
self.assertEqual('breezy.plugins', tpr.resolve_alias('bp'))
3213
class TestThreadLeakDetection(tests.TestCase):
3214
"""Ensure when tests leak threads we detect and report it"""
3216
class LeakRecordingResult(tests.ExtendedTestResult):
3218
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3220
def _report_thread_leak(self, test, leaks, alive):
3221
self.leaks.append((test, leaks))
3223
def test_testcase_without_addCleanups(self):
3224
"""Check old TestCase instances don't break with leak detection"""
3225
class Test(unittest.TestCase):
3228
result = self.LeakRecordingResult()
3230
result.startTestRun()
3232
result.stopTestRun()
3233
self.assertEqual(result._tests_leaking_threads_count, 0)
3234
self.assertEqual(result.leaks, [])
3236
def test_thread_leak(self):
3237
"""Ensure a thread that outlives the running of a test is reported
3239
Uses a thread that blocks on an event, and is started by the inner
3240
test case. As the thread outlives the inner case's run, it should be
3241
detected as a leak, but the event is then set so that the thread can
3242
be safely joined in cleanup so it's not leaked for real.
3244
event = threading.Event()
3245
thread = threading.Thread(name="Leaker", target=event.wait)
3246
class Test(tests.TestCase):
3247
def test_leak(self):
3249
result = self.LeakRecordingResult()
3250
test = Test("test_leak")
3251
self.addCleanup(thread.join)
3252
self.addCleanup(event.set)
3253
result.startTestRun()
3255
result.stopTestRun()
3256
self.assertEqual(result._tests_leaking_threads_count, 1)
3257
self.assertEqual(result._first_thread_leaker_id, test.id())
3258
self.assertEqual(result.leaks, [(test, {thread})])
3259
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3261
def test_multiple_leaks(self):
3262
"""Check multiple leaks are blamed on the test cases at fault
3264
Same concept as the previous test, but has one inner test method that
3265
leaks two threads, and one that doesn't leak at all.
3267
event = threading.Event()
3268
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3269
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3270
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3271
class Test(tests.TestCase):
3272
def test_first_leak(self):
3274
def test_second_no_leak(self):
3276
def test_third_leak(self):
3279
result = self.LeakRecordingResult()
3280
first_test = Test("test_first_leak")
3281
third_test = Test("test_third_leak")
3282
self.addCleanup(thread_a.join)
3283
self.addCleanup(thread_b.join)
3284
self.addCleanup(thread_c.join)
3285
self.addCleanup(event.set)
3286
result.startTestRun()
3288
[first_test, Test("test_second_no_leak"), third_test]
3290
result.stopTestRun()
3291
self.assertEqual(result._tests_leaking_threads_count, 2)
3292
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3293
self.assertEqual(result.leaks, [
3294
(first_test, {thread_b}),
3295
(third_test, {thread_a, thread_c})])
3296
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3299
class TestPostMortemDebugging(tests.TestCase):
3300
"""Check post mortem debugging works when tests fail or error"""
3302
class TracebackRecordingResult(tests.ExtendedTestResult):
3304
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3305
self.postcode = None
3306
def _post_mortem(self, tb=None):
3307
"""Record the code object at the end of the current traceback"""
3308
tb = tb or sys.exc_info()[2]
3311
while next is not None:
3314
self.postcode = tb.tb_frame.f_code
3315
def report_error(self, test, err):
3317
def report_failure(self, test, err):
3320
def test_location_unittest_error(self):
3321
"""Needs right post mortem traceback with erroring unittest case"""
3322
class Test(unittest.TestCase):
3325
result = self.TracebackRecordingResult()
3327
self.assertEqual(result.postcode, Test.runTest.__code__)
3329
def test_location_unittest_failure(self):
3330
"""Needs right post mortem traceback with failing unittest case"""
3331
class Test(unittest.TestCase):
3333
raise self.failureException
3334
result = self.TracebackRecordingResult()
3336
self.assertEqual(result.postcode, Test.runTest.__code__)
3338
def test_location_bt_error(self):
3339
"""Needs right post mortem traceback with erroring breezy.tests case"""
3340
class Test(tests.TestCase):
3341
def test_error(self):
3343
result = self.TracebackRecordingResult()
3344
Test("test_error").run(result)
3345
self.assertEqual(result.postcode, Test.test_error.__code__)
3347
def test_location_bt_failure(self):
3348
"""Needs right post mortem traceback with failing breezy.tests case"""
3349
class Test(tests.TestCase):
3350
def test_failure(self):
3351
raise self.failureException
3352
result = self.TracebackRecordingResult()
3353
Test("test_failure").run(result)
3354
self.assertEqual(result.postcode, Test.test_failure.__code__)
3356
def test_env_var_triggers_post_mortem(self):
3357
"""Check pdb.post_mortem is called iff BRZ_TEST_PDB is set"""
3359
result = tests.ExtendedTestResult(StringIO(), 0, 1)
3360
post_mortem_calls = []
3361
self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3362
self.overrideEnv('BRZ_TEST_PDB', None)
3363
result._post_mortem(1)
3364
self.overrideEnv('BRZ_TEST_PDB', 'on')
3365
result._post_mortem(2)
3366
self.assertEqual([2], post_mortem_calls)
2963
3369
class TestRunSuite(tests.TestCase):
2976
3382
self.verbosity)
2977
3383
tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2978
3384
self.assertLength(1, calls)
3387
class _Selftest(object):
3388
"""Mixin for tests needing full selftest output"""
3390
def _inject_stream_into_subunit(self, stream):
3391
"""To be overridden by subclasses that run tests out of process"""
3393
def _run_selftest(self, **kwargs):
3395
self._inject_stream_into_subunit(sio)
3396
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3397
return sio.getvalue()
3400
class _ForkedSelftest(_Selftest):
3401
"""Mixin for tests needing full selftest output with forked children"""
3403
_test_needs_features = [features.subunit]
3405
def _inject_stream_into_subunit(self, stream):
3406
"""Monkey-patch subunit so the extra output goes to stream not stdout
3408
Some APIs need rewriting so this kind of bogus hackery can be replaced
3409
by passing the stream param from run_tests down into ProtocolTestCase.
3411
from subunit import ProtocolTestCase
3412
_original_init = ProtocolTestCase.__init__
3413
def _init_with_passthrough(self, *args, **kwargs):
3414
_original_init(self, *args, **kwargs)
3415
self._passthrough = stream
3416
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3418
def _run_selftest(self, **kwargs):
3419
# GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3420
if getattr(os, "fork", None) is None:
3421
raise tests.TestNotApplicable("Platform doesn't support forking")
3422
# Make sure the fork code is actually invoked by claiming two cores
3423
self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3424
kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3425
return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3428
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3429
"""Check operation of --parallel=fork selftest option"""
3431
def test_error_in_child_during_fork(self):
3432
"""Error in a forked child during test setup should get reported"""
3433
class Test(tests.TestCase):
3434
def testMethod(self):
3436
# We don't care what, just break something that a child will run
3437
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3438
out = self._run_selftest(test_suite_factory=Test)
3439
# Lines from the tracebacks of the two child processes may be mixed
3440
# together due to the way subunit parses and forwards the streams,
3441
# so permit extra lines between each part of the error output.
3442
self.assertContainsRe(out,
3445
".+ in fork_for_tests\n"
3447
"\\s*workaround_zealous_crypto_random\\(\\)\n"
3452
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3453
"""Check a test case still alive after being run emits a warning"""
3455
class Test(tests.TestCase):
3456
def test_pass(self):
3458
def test_self_ref(self):
3459
self.also_self = self.test_self_ref
3460
def test_skip(self):
3461
self.skipTest("Don't need")
3463
def _get_suite(self):
3464
return TestUtil.TestSuite([
3465
self.Test("test_pass"),
3466
self.Test("test_self_ref"),
3467
self.Test("test_skip"),
3470
def _run_selftest_with_suite(self, **kwargs):
3471
old_flags = tests.selftest_debug_flags
3472
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3473
gc_on = gc.isenabled()
3477
output = self._run_selftest(test_suite_factory=self._get_suite,
3482
tests.selftest_debug_flags = old_flags
3483
self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3484
self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3487
def test_testsuite(self):
3488
self._run_selftest_with_suite()
3490
def test_pattern(self):
3491
out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
3492
self.assertNotContainsRe(out, "test_skip")
3494
def test_exclude_pattern(self):
3495
out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
3496
self.assertNotContainsRe(out, "test_skip")
3498
def test_random_seed(self):
3499
self._run_selftest_with_suite(random_seed="now")
3501
def test_matching_tests_first(self):
3502
self._run_selftest_with_suite(matching_tests_first=True,
3503
pattern="test_self_ref$")
3505
def test_starting_with_and_exclude(self):
3506
out = self._run_selftest_with_suite(starting_with=["bt."],
3507
exclude_pattern="test_skip$")
3508
self.assertNotContainsRe(out, "test_skip")
3510
def test_additonal_decorator(self):
3511
out = self._run_selftest_with_suite(
3512
suite_decorators=[tests.TestDecorator])
3515
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
3516
"""Check warnings from tests staying alive are emitted with subunit"""
3518
_test_needs_features = [features.subunit]
3520
def _run_selftest_with_suite(self, **kwargs):
3521
return TestUncollectedWarnings._run_selftest_with_suite(
3522
self, runner_class=tests.SubUnitBzrRunnerv1, **kwargs)
3525
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
3526
"""Check warnings from tests staying alive are emitted when forking"""
3529
class TestEnvironHandling(tests.TestCase):
3531
def test_overrideEnv_None_called_twice_doesnt_leak(self):
3532
self.assertFalse('MYVAR' in os.environ)
3533
self.overrideEnv('MYVAR', '42')
3534
# We use an embedded test to make sure we fix the _captureVar bug
3535
class Test(tests.TestCase):
3537
# The first call save the 42 value
3538
self.overrideEnv('MYVAR', None)
3539
self.assertEqual(None, os.environ.get('MYVAR'))
3540
# Make sure we can call it twice
3541
self.overrideEnv('MYVAR', None)
3542
self.assertEqual(None, os.environ.get('MYVAR'))
3544
result = tests.TextTestResult(output, 0, 1)
3545
Test('test_me').run(result)
3546
if not result.wasStrictlySuccessful():
3547
self.fail(output.getvalue())
3548
# We get our value back
3549
self.assertEqual('42', os.environ.get('MYVAR'))
3552
class TestIsolatedEnv(tests.TestCase):
3553
"""Test isolating tests from os.environ.
3555
Since we use tests that are already isolated from os.environ a bit of care
3556
should be taken when designing the tests to avoid bootstrap side-effects.
3557
The tests start an already clean os.environ which allow doing valid
3558
assertions about which variables are present or not and design tests around
3562
class ScratchMonkey(tests.TestCase):
3567
def test_basics(self):
3568
# Make sure we know the definition of BRZ_HOME: not part of os.environ
3569
# for tests.TestCase.
3570
self.assertTrue('BRZ_HOME' in tests.isolated_environ)
3571
self.assertEqual(None, tests.isolated_environ['BRZ_HOME'])
3572
# Being part of isolated_environ, BRZ_HOME should not appear here
3573
self.assertFalse('BRZ_HOME' in os.environ)
3574
# Make sure we know the definition of LINES: part of os.environ for
3576
self.assertTrue('LINES' in tests.isolated_environ)
3577
self.assertEqual('25', tests.isolated_environ['LINES'])
3578
self.assertEqual('25', os.environ['LINES'])
3580
def test_injecting_unknown_variable(self):
3581
# BRZ_HOME is known to be absent from os.environ
3582
test = self.ScratchMonkey('test_me')
3583
tests.override_os_environ(test, {'BRZ_HOME': 'foo'})
3584
self.assertEqual('foo', os.environ['BRZ_HOME'])
3585
tests.restore_os_environ(test)
3586
self.assertFalse('BRZ_HOME' in os.environ)
3588
def test_injecting_known_variable(self):
3589
test = self.ScratchMonkey('test_me')
3590
# LINES is known to be present in os.environ
3591
tests.override_os_environ(test, {'LINES': '42'})
3592
self.assertEqual('42', os.environ['LINES'])
3593
tests.restore_os_environ(test)
3594
self.assertEqual('25', os.environ['LINES'])
3596
def test_deleting_variable(self):
3597
test = self.ScratchMonkey('test_me')
3598
# LINES is known to be present in os.environ
3599
tests.override_os_environ(test, {'LINES': None})
3600
self.assertTrue('LINES' not in os.environ)
3601
tests.restore_os_environ(test)
3602
self.assertEqual('25', os.environ['LINES'])
3605
class TestDocTestSuiteIsolation(tests.TestCase):
3606
"""Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3608
Since tests.TestCase alreay provides an isolation from os.environ, we use
3609
the clean environment as a base for testing. To precisely capture the
3610
isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3613
We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
3614
not `os.environ` so each test overrides it to suit its needs.
3618
def get_doctest_suite_for_string(self, klass, string):
3619
class Finder(doctest.DocTestFinder):
3621
def find(*args, **kwargs):
3622
test = doctest.DocTestParser().get_doctest(
3623
string, {}, 'foo', 'foo.py', 0)
3626
suite = klass(test_finder=Finder())
3629
def run_doctest_suite_for_string(self, klass, string):
3630
suite = self.get_doctest_suite_for_string(klass, string)
3632
result = tests.TextTestResult(output, 0, 1)
3634
return result, output
3636
def assertDocTestStringSucceds(self, klass, string):
3637
result, output = self.run_doctest_suite_for_string(klass, string)
3638
if not result.wasStrictlySuccessful():
3639
self.fail(output.getvalue())
3641
def assertDocTestStringFails(self, klass, string):
3642
result, output = self.run_doctest_suite_for_string(klass, string)
3643
if result.wasStrictlySuccessful():
3644
self.fail(output.getvalue())
3646
def test_injected_variable(self):
3647
self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3650
>>> os.environ['LINES']
3653
# doctest.DocTestSuite fails as it sees '25'
3654
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3655
# tests.DocTestSuite sees '42'
3656
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3658
def test_deleted_variable(self):
3659
self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3662
>>> os.environ.get('LINES')
3664
# doctest.DocTestSuite fails as it sees '25'
3665
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3666
# tests.DocTestSuite sees None
3667
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3670
class TestSelftestExcludePatterns(tests.TestCase):
3673
super(TestSelftestExcludePatterns, self).setUp()
3674
self.overrideAttr(tests, 'test_suite', self.suite_factory)
3676
def suite_factory(self, keep_only=None, starting_with=None):
3677
"""A test suite factory with only a few tests."""
3678
class Test(tests.TestCase):
3680
# We don't need the full class path
3681
return self._testMethodName
3688
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3690
def assertTestList(self, expected, *selftest_args):
3691
# We rely on setUp installing the right test suite factory so we can
3692
# test at the command level without loading the whole test suite
3693
out, err = self.run_bzr(('selftest', '--list') + selftest_args)
3694
actual = out.splitlines()
3695
self.assertEqual(expected, actual)
3697
def test_full_list(self):
3698
self.assertTestList(['a', 'b', 'c'])
3700
def test_single_exclude(self):
3701
self.assertTestList(['b', 'c'], '-x', 'a')
3703
def test_mutiple_excludes(self):
3704
self.assertTestList(['c'], '-x', 'a', '-x', 'b')
3707
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3709
_test_needs_features = [features.subunit]
3712
super(TestCounterHooks, self).setUp()
3713
class Test(tests.TestCase):
3716
super(Test, self).setUp()
3717
self.hooks = hooks.Hooks()
3718
self.hooks.add_hook('myhook', 'Foo bar blah', (2, 4))
3719
self.install_counter_hook(self.hooks, 'myhook')
3724
def run_hook_once(self):
3725
for hook in self.hooks['myhook']:
3728
self.test_class = Test
3730
def assertHookCalls(self, expected_calls, test_name):
3731
test = self.test_class(test_name)
3732
result = unittest.TestResult()
3734
self.assertTrue(hasattr(test, '_counters'))
3735
self.assertTrue('myhook' in test._counters)
3736
self.assertEqual(expected_calls, test._counters['myhook'])
3738
def test_no_hook(self):
3739
self.assertHookCalls(0, 'no_hook')
3741
def test_run_hook_once(self):
3742
tt = features.testtools
3743
if tt.module.__version__ < (0, 9, 8):
3744
raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
3745
self.assertHookCalls(1, 'run_hook_once')