333
339
def test_scenarios(self):
334
340
# check that constructor parameters are passed through to the adapted
336
from bzrlib.tests.per_workingtree import make_scenarios
342
from .per_workingtree import make_scenarios
339
formats = [workingtree.WorkingTreeFormat2(),
340
workingtree.WorkingTreeFormat3(),]
341
scenarios = make_scenarios(server1, server2, formats)
345
formats = [workingtree_4.WorkingTreeFormat4(),
346
workingtree_3.WorkingTreeFormat3(),
347
workingtree_4.WorkingTreeFormat6()]
348
scenarios = make_scenarios(server1, server2, formats,
349
remote_server='c', remote_readonly_server='d',
350
remote_backing_server='e')
342
351
self.assertEqual([
343
('WorkingTreeFormat2',
344
{'bzrdir_format': formats[0]._matchingbzrdir,
352
('WorkingTreeFormat4',
353
{'bzrdir_format': formats[0]._matchingcontroldir,
345
354
'transport_readonly_server': 'b',
346
355
'transport_server': 'a',
347
356
'workingtree_format': formats[0]}),
348
357
('WorkingTreeFormat3',
349
{'bzrdir_format': formats[1]._matchingbzrdir,
350
'transport_readonly_server': 'b',
351
'transport_server': 'a',
352
'workingtree_format': formats[1]})],
358
{'bzrdir_format': formats[1]._matchingcontroldir,
359
'transport_readonly_server': 'b',
360
'transport_server': 'a',
361
'workingtree_format': formats[1]}),
362
('WorkingTreeFormat6',
363
{'bzrdir_format': formats[2]._matchingcontroldir,
364
'transport_readonly_server': 'b',
365
'transport_server': 'a',
366
'workingtree_format': formats[2]}),
367
('WorkingTreeFormat6,remote',
368
{'bzrdir_format': formats[2]._matchingcontroldir,
369
'repo_is_remote': True,
370
'transport_readonly_server': 'd',
371
'transport_server': 'c',
372
'vfs_transport_factory': 'e',
373
'workingtree_format': formats[2]}),
356
377
class TestTreeScenarios(tests.TestCase):
358
379
def test_scenarios(self):
359
380
# the tree implementation scenario generator is meant to setup one
360
# instance for each working tree format, and one additional instance
381
# instance for each working tree format, one additional instance
361
382
# that will use the default wt format, but create a revision tree for
362
# the tests. this means that the wt ones should have the
363
# workingtree_to_test_tree attribute set to 'return_parameter' and the
364
# revision one set to revision_tree_from_workingtree.
383
# the tests, and one more that uses the default wt format as a
384
# lightweight checkout of a remote repository. This means that the wt
385
# ones should have the workingtree_to_test_tree attribute set to
386
# 'return_parameter' and the revision one set to
387
# revision_tree_from_workingtree.
366
from bzrlib.tests.per_tree import (
389
from .per_tree import (
367
390
_dirstate_tree_from_workingtree,
369
392
preview_tree_pre,
376
formats = [workingtree.WorkingTreeFormat2(),
377
workingtree.WorkingTreeFormat3(),]
399
smart_server = test_server.SmartTCPServer_for_testing
400
smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
401
mem_server = memory.MemoryServer
402
formats = [workingtree_4.WorkingTreeFormat4(),
403
workingtree_3.WorkingTreeFormat3(),]
378
404
scenarios = make_scenarios(server1, server2, formats)
379
self.assertEqual(7, len(scenarios))
380
default_wt_format = workingtree.WorkingTreeFormat4._default_format
381
wt4_format = workingtree.WorkingTreeFormat4()
382
wt5_format = workingtree.WorkingTreeFormat5()
405
self.assertEqual(8, len(scenarios))
406
default_wt_format = workingtree.format_registry.get_default()
407
wt4_format = workingtree_4.WorkingTreeFormat4()
408
wt5_format = workingtree_4.WorkingTreeFormat5()
409
wt6_format = workingtree_4.WorkingTreeFormat6()
383
410
expected_scenarios = [
384
('WorkingTreeFormat2',
385
{'bzrdir_format': formats[0]._matchingbzrdir,
411
('WorkingTreeFormat4',
412
{'bzrdir_format': formats[0]._matchingcontroldir,
386
413
'transport_readonly_server': 'b',
387
414
'transport_server': 'a',
388
415
'workingtree_format': formats[0],
389
416
'_workingtree_to_test_tree': return_parameter,
391
418
('WorkingTreeFormat3',
392
{'bzrdir_format': formats[1]._matchingbzrdir,
419
{'bzrdir_format': formats[1]._matchingcontroldir,
393
420
'transport_readonly_server': 'b',
394
421
'transport_server': 'a',
395
422
'workingtree_format': formats[1],
396
423
'_workingtree_to_test_tree': return_parameter,
425
('WorkingTreeFormat6,remote',
426
{'bzrdir_format': wt6_format._matchingcontroldir,
427
'repo_is_remote': True,
428
'transport_readonly_server': smart_readonly_server,
429
'transport_server': smart_server,
430
'vfs_transport_factory': mem_server,
431
'workingtree_format': wt6_format,
432
'_workingtree_to_test_tree': return_parameter,
399
435
{'_workingtree_to_test_tree': revision_tree_from_workingtree,
400
'bzrdir_format': default_wt_format._matchingbzrdir,
436
'bzrdir_format': default_wt_format._matchingcontroldir,
401
437
'transport_readonly_server': 'b',
402
438
'transport_server': 'a',
403
439
'workingtree_format': default_wt_format,
405
441
('DirStateRevisionTree,WT4',
406
442
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
407
'bzrdir_format': wt4_format._matchingbzrdir,
443
'bzrdir_format': wt4_format._matchingcontroldir,
408
444
'transport_readonly_server': 'b',
409
445
'transport_server': 'a',
410
446
'workingtree_format': wt4_format,
412
448
('DirStateRevisionTree,WT5',
413
449
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
414
'bzrdir_format': wt5_format._matchingbzrdir,
450
'bzrdir_format': wt5_format._matchingcontroldir,
415
451
'transport_readonly_server': 'b',
416
452
'transport_server': 'a',
417
453
'workingtree_format': wt5_format,
420
456
{'_workingtree_to_test_tree': preview_tree_pre,
421
'bzrdir_format': default_wt_format._matchingbzrdir,
457
'bzrdir_format': default_wt_format._matchingcontroldir,
422
458
'transport_readonly_server': 'b',
423
459
'transport_server': 'a',
424
460
'workingtree_format': default_wt_format}),
425
461
('PreviewTreePost',
426
462
{'_workingtree_to_test_tree': preview_tree_post,
427
'bzrdir_format': default_wt_format._matchingbzrdir,
463
'bzrdir_format': default_wt_format._matchingcontroldir,
428
464
'transport_readonly_server': 'b',
429
465
'transport_server': 'a',
430
466
'workingtree_format': default_wt_format}),
592
634
the_branch = builder.get_branch()
593
635
# Guard against regression into MemoryTransport leaking
594
636
# files to disk instead of keeping them in memory.
595
self.failIf(osutils.lexists('dir'))
596
dir_format = bzrdir.format_registry.make_bzrdir('knit')
637
self.assertFalse(osutils.lexists('dir'))
638
dir_format = controldir.format_registry.make_controldir('knit')
597
639
self.assertEqual(dir_format.repository_format.__class__,
598
640
the_branch.repository._format.__class__)
599
self.assertEqual('Bazaar-NG Knit Repository Format 1',
641
self.assertEqual(b'Bazaar-NG Knit Repository Format 1',
600
642
self.get_transport().get_bytes(
601
643
'dir/.bzr/repository/format'))
603
645
def test_dangling_locks_cause_failures(self):
604
646
class TestDanglingLock(tests.TestCaseWithMemoryTransport):
605
647
def test_function(self):
606
t = self.get_transport('.')
648
t = self.get_transport_from_path('.')
607
649
l = lockdir.LockDir(t, 'lock')
610
652
test = TestDanglingLock('test_function')
611
653
result = test.run()
654
total_failures = result.errors + result.failures
612
655
if self._lock_check_thorough:
613
self.assertEqual(1, len(result.errors))
656
self.assertEqual(1, len(total_failures))
615
658
# When _lock_check_thorough is disabled, then we don't trigger a
617
self.assertEqual(0, len(result.errors))
660
self.assertEqual(0, len(total_failures))
620
663
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
664
"""Tests for the convenience functions TestCaseWithTransport introduces."""
623
666
def test_get_readonly_url_none(self):
624
from bzrlib.transport import get_transport
625
from bzrlib.transport.readonly import ReadonlyTransportDecorator
667
from ..transport.readonly import ReadonlyTransportDecorator
626
668
self.vfs_transport_factory = memory.MemoryServer
627
669
self.transport_readonly_server = None
628
670
# calling get_readonly_transport() constructs a decorator on the url
630
672
url = self.get_readonly_url()
631
673
url2 = self.get_readonly_url('foo/bar')
632
t = get_transport(url)
633
t2 = get_transport(url2)
634
self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
674
t = transport.get_transport_from_url(url)
675
t2 = transport.get_transport_from_url(url2)
676
self.assertIsInstance(t, ReadonlyTransportDecorator)
677
self.assertIsInstance(t2, ReadonlyTransportDecorator)
636
678
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
638
680
def test_get_readonly_url_http(self):
    """get_readonly_url() yields HTTP URLs when the readonly server is HTTP.

    Both the root URL and a sub-path URL must open as ``HttpTransport``
    instances, and the sub-path transport must sit at ``foo/bar`` relative
    to the root transport.
    """
    from .http_server import HttpServer
    from ..transport.http import HttpTransport
    self.transport_server = test_server.LocalURLServer
    self.transport_readonly_server = HttpServer
    # calling get_readonly_transport() gives us a HTTP server instance.
    url = self.get_readonly_url()
    url2 = self.get_readonly_url('foo/bar')
    # the transport returned may be any HttpTransport subclass
    t = transport.get_transport_from_url(url)
    t2 = transport.get_transport_from_url(url2)
    self.assertIsInstance(t, HttpTransport)
    self.assertIsInstance(t2, HttpTransport)
    # t2.base carries a trailing slash that abspath() does not produce.
    self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
654
695
def test_is_directory(self):
743
780
r"^ +[0-9]+ms\*$")
745
782
def test_unittest_reporting_unittest_class(self):
746
# getting the time from a non-bzrlib test works ok
783
# getting the time from a non-breezy test works ok
747
784
class ShortDelayTestCase(unittest.TestCase):
748
785
def test_short_delay(self):
749
786
time.sleep(0.003)
750
787
self.check_timing(ShortDelayTestCase('test_short_delay'),
753
def _patch_get_bzr_source_tree(self):
754
# Reading from the actual source tree breaks isolation, but we don't
755
# want to assume that that's *all* that would happen.
756
self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
758
def test_assigned_benchmark_file_stores_date(self):
759
self._patch_get_bzr_source_tree()
761
result = bzrlib.tests.TextTestResult(self._log_file,
766
output_string = output.getvalue()
767
# if you are wondering about the regexp please read the comment in
768
# test_bench_history (bzrlib.tests.test_selftest.TestRunner)
769
# XXX: what comment? -- Andrew Bennetts
770
self.assertContainsRe(output_string, "--date [0-9.]+")
772
def test_benchhistory_records_test_times(self):
773
self._patch_get_bzr_source_tree()
774
result_stream = StringIO()
775
result = bzrlib.tests.TextTestResult(
779
bench_history=result_stream
782
# we want to profile a call and check that its test duration is recorded
783
# make a new test instance that when run will generate a benchmark
784
example_test_case = TestTestResult("_time_hello_world_encoding")
785
# execute the test, which should succeed and record times
786
example_test_case.run(result)
787
lines = result_stream.getvalue().splitlines()
788
self.assertEqual(2, len(lines))
789
self.assertContainsRe(lines[1],
790
" *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
791
"._time_hello_world_encoding")
793
790
def _time_hello_world_encoding(self):
794
791
"""Profile two sleep calls
796
793
This is used to exercise the test framework.
798
self.time(unicode, 'hello', errors='replace')
799
self.time(unicode, 'world', errors='replace')
795
self.time(text_type, b'hello', errors='replace')
796
self.time(text_type, b'world', errors='replace')
801
798
def test_lsprofiling(self):
802
799
"""Verbose test result prints lsprof statistics from test cases."""
803
self.requireFeature(test_lsprof.LSProfFeature)
800
self.requireFeature(features.lsprof_feature)
804
801
result_stream = StringIO()
805
result = bzrlib.tests.VerboseTestResult(
806
unittest._WritelnDecorator(result_stream),
802
result = breezy.tests.VerboseTestResult(
826
823
# and then repeated but with 'world', rather than 'hello'.
827
824
# this should appear in the output stream of our test result.
828
825
output = result_stream.getvalue()
829
self.assertContainsRe(output,
830
r"LSProf output for <type 'unicode'>\(\('hello',\), {'errors': 'replace'}\)")
827
self.assertContainsRe(output,
828
r"LSProf output for <class 'str'>\(\(b'hello',\), {'errors': 'replace'}\)")
829
self.assertContainsRe(output,
830
r"LSProf output for <class 'str'>\(\(b'world',\), {'errors': 'replace'}\)")
832
self.assertContainsRe(output,
833
r"LSProf output for <type 'unicode'>\(\('hello',\), {'errors': 'replace'}\)")
834
self.assertContainsRe(output,
835
r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
831
836
self.assertContainsRe(output,
832
837
r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n")
833
838
self.assertContainsRe(output,
834
839
r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?")
835
self.assertContainsRe(output,
836
r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
841
def test_uses_time_from_testtools(self):
842
"""Test case timings in verbose results should use testtools times"""
844
class TimeAddedVerboseTestResult(tests.VerboseTestResult):
845
def startTest(self, test):
846
self.time(datetime.datetime.utcfromtimestamp(1.145))
847
super(TimeAddedVerboseTestResult, self).startTest(test)
848
def addSuccess(self, test):
849
self.time(datetime.datetime.utcfromtimestamp(51.147))
850
super(TimeAddedVerboseTestResult, self).addSuccess(test)
851
def report_tests_starting(self): pass
853
self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
854
self.assertEndsWith(sio.getvalue(), "OK 50002ms\n")
838
856
def test_known_failure(self):
839
"""A KnownFailure being raised should trigger several result actions."""
857
"""Using knownFailure should trigger several result actions."""
840
858
class InstrumentedTestResult(tests.ExtendedTestResult):
841
859
def stopTestRun(self): pass
842
def startTests(self): pass
843
def report_test_start(self, test): pass
860
def report_tests_starting(self): pass
844
861
def report_known_failure(self, test, err=None, details=None):
845
862
self._call = test, 'known failure'
846
863
result = InstrumentedTestResult(None, None, None, None)
847
864
class Test(tests.TestCase):
848
865
def test_function(self):
849
raise tests.KnownFailure('failed!')
866
self.knownFailure('failed!')
850
867
test = Test("test_function")
852
869
# it should invoke 'report_known_failure'.
1643
1684
class Test(tests.TestCase):
1645
1686
def setUp(self):
1646
tests.TestCase.setUp(self)
1687
super(Test, self).setUp()
1647
1688
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1649
1690
def test_value(self):
1650
1691
self.assertEqual('original', self.orig)
1651
1692
self.assertEqual('modified', obj.test_attr)
1653
test = Test('test_value')
1654
test.run(unittest.TestResult())
1694
self._run_successful_test(Test('test_value'))
1655
1695
self.assertEqual('original', obj.test_attr)
1697
def test_overrideAttr_with_no_existing_value_and_value(self):
1698
# Do not define the test_attribute
1699
obj = self # Make 'obj' visible to the embedded test
1700
class Test(tests.TestCase):
1703
tests.TestCase.setUp(self)
1704
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1706
def test_value(self):
1707
self.assertEqual(tests._unitialized_attr, self.orig)
1708
self.assertEqual('modified', obj.test_attr)
1710
self._run_successful_test(Test('test_value'))
1711
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1713
def test_overrideAttr_with_no_existing_value_and_no_value(self):
1714
# Do not define the test_attribute
1715
obj = self # Make 'obj' visible to the embedded test
1716
class Test(tests.TestCase):
1719
tests.TestCase.setUp(self)
1720
self.orig = self.overrideAttr(obj, 'test_attr')
1722
def test_value(self):
1723
self.assertEqual(tests._unitialized_attr, self.orig)
1724
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1726
self._run_successful_test(Test('test_value'))
1727
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1729
def test_recordCalls(self):
1730
from breezy.tests import test_selftest
1731
calls = self.recordCalls(
1732
test_selftest, '_add_numbers')
1733
self.assertEqual(test_selftest._add_numbers(2, 10),
1735
self.assertEqual(calls, [((2, 10), {})])
1738
def _add_numbers(a, b):
1742
class _MissingFeature(features.Feature):
1745
missing_feature = _MissingFeature()
1748
def _get_test(name):
1749
"""Get an instance of a specific example test.
1751
We protect this in a function so that they don't auto-run in the test
1755
class ExampleTests(tests.TestCase):
    """One sample test per outcome; each writes a marker line via mutter()."""

    def test_fail(self):
        """Log a marker, then fail through self.fail()."""
        mutter('this was a failing test')
        self.fail('this test will fail')

    def test_error(self):
        """Log a marker, then raise a RuntimeError."""
        mutter('this test errored')
        raise RuntimeError('gotcha')

    def test_missing_feature(self):
        """Log a marker, then require the module-level missing_feature."""
        mutter('missing the feature')
        self.requireFeature(missing_feature)

    def test_skip(self):
        """Log a marker, then raise TestSkipped with the text 'reason'."""
        mutter('this test will be skipped')
        raise tests.TestSkipped('reason')

    def test_success(self):
        """Log a marker and return normally."""
        mutter('this test succeeds')

    def test_xfail(self):
        """Log a marker, then report a known failure."""
        mutter('test with expected failure')
        self.knownFailure('this_fails')

    def test_unexpected_success(self):
        """Log a marker, then expect a failure from a callable that succeeds."""
        mutter('test with unexpected success')
        self.expectFailure('should_fail', lambda: None)
1784
return ExampleTests(name)
1787
class TestTestCaseLogDetails(tests.TestCase):
1789
def _run_test(self, test_name):
1790
test = _get_test(test_name)
1791
result = testtools.TestResult()
1795
def test_fail_has_log(self):
    """The failure text for 'test_fail' contains a log section and its marker."""
    outcome = self._run_test('test_fail')
    self.assertEqual(1, len(outcome.failures))
    # Each entry is indexed as (test, text); only the text is inspected.
    failure_text = outcome.failures[0][1]
    self.assertContainsRe(
        failure_text, '(?m)^(?:Text attachment: )?log(?:$|: )')
    self.assertContainsRe(failure_text, 'this was a failing test')
1803
def test_error_has_log(self):
    """The error text for 'test_error' contains a log section and its marker."""
    outcome = self._run_test('test_error')
    self.assertEqual(1, len(outcome.errors))
    # Each entry is indexed as (test, text); only the text is inspected.
    error_text = outcome.errors[0][1]
    self.assertContainsRe(
        error_text, '(?m)^(?:Text attachment: )?log(?:$|: )')
    self.assertContainsRe(error_text, 'this test errored')
1811
def test_skip_has_no_log(self):
1812
result = self._run_test('test_skip')
1813
reasons = result.skip_reasons
1814
self.assertEqual({'reason'}, set(reasons))
1815
skips = reasons['reason']
1816
self.assertEqual(1, len(skips))
1818
self.assertFalse('log' in test.getDetails())
1820
def test_missing_feature_has_no_log(self):
1821
# testtools doesn't know about addNotSupported, so it just gets
1822
# considered as a skip
1823
result = self._run_test('test_missing_feature')
1824
reasons = result.skip_reasons
1825
self.assertEqual({str(missing_feature)}, set(reasons))
1826
skips = reasons[str(missing_feature)]
1827
self.assertEqual(1, len(skips))
1829
self.assertFalse('log' in test.getDetails())
1831
def test_xfail_has_no_log(self):
    """The expected-failure text for 'test_xfail' carries no log section."""
    outcome = self._run_test('test_xfail')
    self.assertEqual(1, len(outcome.expectedFailures))
    xfail_text = outcome.expectedFailures[0][1]
    self.assertNotContainsRe(
        xfail_text, '(?m)^(?:Text attachment: )?log(?:$|: )')
    self.assertNotContainsRe(xfail_text, 'test with expected failure')
1839
def test_unexpected_success_has_log(self):
    """An unexpectedly successful test keeps a 'log' entry in its details."""
    result = self._run_test('test_unexpected_success')
    self.assertEqual(1, len(result.unexpectedSuccesses))
    # Inconsistency, unexpectedSuccesses is a list of tests,
    # expectedFailures is a list of reasons?
    test = result.unexpectedSuccesses[0]
    details = test.getDetails()
    # assertIn reports the available detail keys when the check fails.
    self.assertIn('log', details)
1849
class TestTestCloning(tests.TestCase):
1850
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1852
def test_cloned_testcase_does_not_share_details(self):
1853
"""A TestCase cloned with clone_test does not share mutable attributes
1854
such as details or cleanups.
1856
class Test(tests.TestCase):
1858
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1859
orig_test = Test('test_foo')
1860
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1861
orig_test.run(unittest.TestResult())
1862
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1863
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1865
def test_double_apply_scenario_preserves_first_scenario(self):
1866
"""Applying two levels of scenarios to a test preserves the attributes
1867
added by both scenarios.
1869
class Test(tests.TestCase):
1872
test = Test('test_foo')
1873
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1874
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1875
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1876
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1877
all_tests = list(tests.iter_suite_tests(suite))
1878
self.assertLength(4, all_tests)
1879
all_xys = sorted((t.x, t.y) for t in all_tests)
1880
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1658
1883
# NB: Don't delete this; it's not actually from 0.11!
1659
1884
@deprecated_function(deprecated_in((0, 11, 0)))
1971
2209
load_list='missing file name', list_only=True)
2212
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2214
_test_needs_features = [features.subunit]
2216
def run_subunit_stream(self, test_name):
2217
from subunit import ProtocolTestCase
2219
return TestUtil.TestSuite([_get_test(test_name)])
2220
stream = self.run_selftest(
2221
runner_class=tests.SubUnitBzrRunnerv1,
2222
test_suite_factory=factory)
2223
test = ProtocolTestCase(stream)
2224
result = testtools.TestResult()
2226
content = stream.getvalue()
2227
return content, result
2229
def test_fail_has_log(self):
    """The subunit stream for 'test_fail' has a log part and the failure text."""
    stream_bytes, outcome = self.run_subunit_stream('test_fail')
    self.assertEqual(1, len(outcome.failures))
    self.assertContainsRe(stream_bytes, b'(?m)^log$')
    self.assertContainsRe(stream_bytes, b'this test will fail')
2235
def test_error_has_log(self):
    """The subunit stream for 'test_error' has a log part and its marker."""
    # The decoded result is returned too, but only the raw stream is checked.
    stream_bytes, _outcome = self.run_subunit_stream('test_error')
    self.assertContainsRe(stream_bytes, b'(?m)^log$')
    self.assertContainsRe(stream_bytes, b'this test errored')
2240
def test_skip_has_no_log(self):
2241
content, result = self.run_subunit_stream('test_skip')
2242
self.assertNotContainsRe(content, b'(?m)^log$')
2243
self.assertNotContainsRe(content, b'this test will be skipped')
2244
reasons = result.skip_reasons
2245
self.assertEqual({'reason'}, set(reasons))
2246
skips = reasons['reason']
2247
self.assertEqual(1, len(skips))
2249
# RemotedTestCase doesn't preserve the "details"
2250
## self.assertFalse('log' in test.getDetails())
2252
def test_missing_feature_has_no_log(self):
2253
content, result = self.run_subunit_stream('test_missing_feature')
2254
self.assertNotContainsRe(content, b'(?m)^log$')
2255
self.assertNotContainsRe(content, b'missing the feature')
2256
reasons = result.skip_reasons
2257
self.assertEqual({'_MissingFeature\n'}, set(reasons))
2258
skips = reasons['_MissingFeature\n']
2259
self.assertEqual(1, len(skips))
2261
# RemotedTestCase doesn't preserve the "details"
2262
## self.assertFalse('log' in test.getDetails())
2264
def test_xfail_has_no_log(self):
    """Neither the subunit stream nor the xfail text carries the test log."""
    stream_bytes, outcome = self.run_subunit_stream('test_xfail')
    # Raw stream: no log part and no marker line.
    self.assertNotContainsRe(stream_bytes, b'(?m)^log$')
    self.assertNotContainsRe(stream_bytes, b'test with expected failure')
    # Decoded result: exactly one expected failure, also without a log.
    self.assertEqual(1, len(outcome.expectedFailures))
    xfail_text = outcome.expectedFailures[0][1]
    self.assertNotContainsRe(
        xfail_text, '(?m)^(?:Text attachment: )?log(?:$|: )')
    self.assertNotContainsRe(xfail_text, 'test with expected failure')
2274
def test_unexpected_success_has_log(self):
    """The subunit stream for an unexpected success includes the test log."""
    stream_bytes, outcome = self.run_subunit_stream(
        'test_unexpected_success')
    self.assertContainsRe(stream_bytes, b'(?m)^log$')
    self.assertContainsRe(stream_bytes, b'test with unexpected success')
    self.assertEqual(1, len(outcome.unexpectedSuccesses))
    test = outcome.unexpectedSuccesses[0]
    # RemotedTestCase doesn't preserve the "details"
    ## self.assertTrue('log' in test.getDetails())
2283
def test_success_has_no_log(self):
    """A passing test contributes no log part to the subunit stream."""
    stream_bytes, outcome = self.run_subunit_stream('test_success')
    self.assertEqual(1, outcome.testsRun)
    self.assertNotContainsRe(stream_bytes, b'(?m)^log$')
    self.assertNotContainsRe(stream_bytes, b'this test succeeds')
1974
2290
class TestRunBzr(tests.TestCase):
1979
def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
2296
def _run_bzr_core(self, argv, encoding=None, stdin=None,
2297
stdout=None, stderr=None, working_dir=None):
1981
2298
"""Override _run_bzr_core to test how it is invoked by run_bzr.
1983
2300
Attempts to run bzr from inside this class don't actually run it.
2264
2564
class TestStartBzrSubProcess(tests.TestCase):
2565
"""Stub test start_bzr_subprocess."""
2266
def check_popen_state(self):
2267
"""Replace to make assertions when popen is called."""
2567
def _subprocess_log_cleanup(self):
2568
"""Inhibits the base version as we don't produce a log file."""
2269
2570
def _popen(self, *args, **kwargs):
2270
"""Record the command that is run, so that we can ensure it is correct"""
2571
"""Override the base version to record the command that is run.
2573
From there we can ensure it is correct without spawning a real process.
2271
2575
self.check_popen_state()
2272
2576
self._popen_args = args
2273
2577
self._popen_kwargs = kwargs
2274
2578
raise _DontSpawnProcess()
2580
def check_popen_state(self):
2581
"""Replace to make assertions when popen is called."""
2276
2583
def test_run_bzr_subprocess_no_plugins(self):
    """By default the recorded command line ends with --no-plugins."""
    # _popen is stubbed to record its arguments and raise instead of spawning.
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
    command = self._popen_args[0]
    self.assertEqual(sys.executable, command[0])
    self.assertEqual(self.get_brz_path(), command[1])
    self.assertEqual(['--no-plugins'], command[2:])
2283
2590
def test_allow_plugins(self):
2284
2591
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2286
2593
command = self._popen_args[0]
2287
2594
self.assertEqual([], command[2:])
2289
2596
def test_set_env(self):
    """env_changes entries are visible at popen time but not afterwards."""
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

    # set in the child
    def check_environment():
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])

    # _popen calls check_popen_state before recording its arguments.
    self.check_popen_state = check_environment
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                      env_changes={'EXISTANT_ENV_VAR': 'set variable'})
    # not set in the parent
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2300
2607
def test_run_bzr_subprocess_env_del(self):
    """run_bzr_subprocess can remove environment variables too."""
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

    def check_environment():
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

    os.environ['EXISTANT_ENV_VAR'] = 'set variable'
    try:
        # _popen calls check_popen_state before recording its arguments.
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          env_changes={'EXISTANT_ENV_VAR': None})
        # Still set in parent
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
    finally:
        # Always undo the variable set above, even when an assertion fails,
        # so it cannot leak into later tests.
        os.environ.pop('EXISTANT_ENV_VAR', None)
2313
2620
def test_env_del_missing(self):
    """Asking to delete a variable that is not set leaves it unset at popen time."""
    self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)

    def check_environment():
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)

    # _popen calls check_popen_state before recording its arguments.
    self.check_popen_state = check_environment
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                      env_changes={'NON_EXISTANT_ENV_VAR': None})
2321
2628
def test_working_dir(self):
2322
2629
"""Test that we can specify the working dir for the child"""
2350
2656
self.disable_missing_extensions_warning()
2351
2657
process = self.start_bzr_subprocess(['wait-until-signalled'],
2352
2658
skip_if_plan_to_signal=True)
2353
self.assertEqual('running\n', process.stdout.readline())
2659
self.assertEqual(b'running\n', process.stdout.readline())
2354
2660
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2356
self.assertEqual('', result[0])
2357
self.assertEqual('bzr: interrupted\n', result[1])
2360
class TestFeature(tests.TestCase):
2362
def test_caching(self):
2363
"""Feature._probe is called by the feature at most once."""
2364
class InstrumentedFeature(tests.Feature):
2366
super(InstrumentedFeature, self).__init__()
2369
self.calls.append('_probe')
2371
feature = InstrumentedFeature()
2373
self.assertEqual(['_probe'], feature.calls)
2375
self.assertEqual(['_probe'], feature.calls)
2377
def test_named_str(self):
2378
"""Feature.__str__ should thunk to feature_name()."""
2379
class NamedFeature(tests.Feature):
2380
def feature_name(self):
2382
feature = NamedFeature()
2383
self.assertEqual('symlinks', str(feature))
2385
def test_default_str(self):
2386
"""Feature.__str__ should default to __class__.__name__."""
2387
class NamedFeature(tests.Feature):
2389
feature = NamedFeature()
2390
self.assertEqual('NamedFeature', str(feature))
2393
class TestUnavailableFeature(tests.TestCase):
    """Checks on the UnavailableFeature exception."""

    def test_access_feature(self):
        """The feature passed to UnavailableFeature is its first argument."""
        wrapped = tests.Feature()
        error = tests.UnavailableFeature(wrapped)
        self.assertIs(wrapped, error.args[0])
2401
simple_thunk_feature = tests._CompatabilityThunkFeature(
2402
deprecated_in((2, 1, 0)),
2403
'bzrlib.tests.test_selftest',
2404
'simple_thunk_feature','UnicodeFilename',
2405
replacement_module='bzrlib.tests'
2408
class Test_CompatibilityFeature(tests.TestCase):
2410
def test_does_thunk(self):
2411
res = self.callDeprecated(
2412
['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2413
' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2414
simple_thunk_feature.available)
2415
self.assertEqual(tests.UnicodeFilename.available(), res)
2418
class TestModuleAvailableFeature(tests.TestCase):
    """Checks on ModuleAvailableFeature for importable and missing modules."""

    def test_available_module(self):
        """An importable module is reported available and exposed as .module."""
        module_name = 'bzrlib.tests'
        probe = tests.ModuleAvailableFeature(module_name)
        self.assertEqual('bzrlib.tests', probe.module_name)
        self.assertEqual('bzrlib.tests', str(probe))
        self.assertTrue(probe.available())
        self.assertIs(tests, probe.module)

    def test_unavailable_module(self):
        """A missing module is reported unavailable and .module is None."""
        probe = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
        self.assertEqual('bzrlib.no_such_module_exists', str(probe))
        self.assertFalse(probe.available())
        self.assertIs(None, probe.module)
2662
self.assertEqual(b'', result[0])
2663
self.assertEqual(b'brz: interrupted\n', result[1])
2434
2666
class TestSelftestFiltering(tests.TestCase):
2436
2668
def setUp(self):
    """Load this module's tests into self.suite and record their ids."""
    super(TestSelftestFiltering, self).setUp()
    self.suite = TestUtil.TestSuite()
    self.loader = TestUtil.TestLoader()
    self.suite.addTest(self.loader.loadTestsFromModule(
        sys.modules['breezy.tests.test_selftest']))
    self.all_names = _test_ids(self.suite)
2444
2676
def test_condition_id_re(self):
2445
test_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
2677
test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2446
2678
'test_condition_id_re')
2447
2679
filtered_suite = tests.filter_suite_by_condition(
2448
2680
self.suite, tests.condition_id_re('test_condition_id_re'))
2449
2681
self.assertEqual([test_name], _test_ids(filtered_suite))
2451
2683
def test_condition_id_in_list(self):
2452
test_names = ['bzrlib.tests.test_selftest.TestSelftestFiltering.'
2684
test_names = ['breezy.tests.test_selftest.TestSelftestFiltering.'
2453
2685
'test_condition_id_in_list']
2454
2686
id_list = tests.TestIdList(test_names)
2455
2687
filtered_suite = tests.filter_suite_by_condition(
2953
3190
def test_predefined_prefixes(self):
2954
3191
tpr = tests.test_prefix_alias_registry
2955
self.assertEquals('bzrlib', tpr.resolve_alias('bzrlib'))
2956
self.assertEquals('bzrlib.doc', tpr.resolve_alias('bd'))
2957
self.assertEquals('bzrlib.utils', tpr.resolve_alias('bu'))
2958
self.assertEquals('bzrlib.tests', tpr.resolve_alias('bt'))
2959
self.assertEquals('bzrlib.tests.blackbox', tpr.resolve_alias('bb'))
2960
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3192
self.assertEqual('breezy', tpr.resolve_alias('breezy'))
3193
self.assertEqual('breezy.doc', tpr.resolve_alias('bd'))
3194
self.assertEqual('breezy.utils', tpr.resolve_alias('bu'))
3195
self.assertEqual('breezy.tests', tpr.resolve_alias('bt'))
3196
self.assertEqual('breezy.tests.blackbox', tpr.resolve_alias('bb'))
3197
self.assertEqual('breezy.plugins', tpr.resolve_alias('bp'))
3200
class TestThreadLeakDetection(tests.TestCase):
3201
"""Ensure when tests leak threads we detect and report it"""
3203
class LeakRecordingResult(tests.ExtendedTestResult):
3205
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3207
def _report_thread_leak(self, test, leaks, alive):
3208
self.leaks.append((test, leaks))
3210
def test_testcase_without_addCleanups(self):
3211
"""Check old TestCase instances don't break with leak detection"""
3212
class Test(unittest.TestCase):
3215
result = self.LeakRecordingResult()
3217
result.startTestRun()
3219
result.stopTestRun()
3220
self.assertEqual(result._tests_leaking_threads_count, 0)
3221
self.assertEqual(result.leaks, [])
3223
def test_thread_leak(self):
3224
"""Ensure a thread that outlives the running of a test is reported
3226
Uses a thread that blocks on an event, and is started by the inner
3227
test case. As the thread outlives the inner case's run, it should be
3228
detected as a leak, but the event is then set so that the thread can
3229
be safely joined in cleanup so it's not leaked for real.
3231
event = threading.Event()
3232
thread = threading.Thread(name="Leaker", target=event.wait)
3233
class Test(tests.TestCase):
3234
def test_leak(self):
3236
result = self.LeakRecordingResult()
3237
test = Test("test_leak")
3238
self.addCleanup(thread.join)
3239
self.addCleanup(event.set)
3240
result.startTestRun()
3242
result.stopTestRun()
3243
self.assertEqual(result._tests_leaking_threads_count, 1)
3244
self.assertEqual(result._first_thread_leaker_id, test.id())
3245
self.assertEqual(result.leaks, [(test, {thread})])
3246
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3248
def test_multiple_leaks(self):
3249
"""Check multiple leaks are blamed on the test cases at fault
3251
Same concept as the previous test, but has one inner test method that
3252
leaks two threads, and one that doesn't leak at all.
3254
event = threading.Event()
3255
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3256
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3257
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3258
class Test(tests.TestCase):
3259
def test_first_leak(self):
3261
def test_second_no_leak(self):
3263
def test_third_leak(self):
3266
result = self.LeakRecordingResult()
3267
first_test = Test("test_first_leak")
3268
third_test = Test("test_third_leak")
3269
self.addCleanup(thread_a.join)
3270
self.addCleanup(thread_b.join)
3271
self.addCleanup(thread_c.join)
3272
self.addCleanup(event.set)
3273
result.startTestRun()
3275
[first_test, Test("test_second_no_leak"), third_test]
3277
result.stopTestRun()
3278
self.assertEqual(result._tests_leaking_threads_count, 2)
3279
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3280
self.assertEqual(result.leaks, [
3281
(first_test, {thread_b}),
3282
(third_test, {thread_a, thread_c})])
3283
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3286
class TestPostMortemDebugging(tests.TestCase):
3287
"""Check post mortem debugging works when tests fail or error"""
3289
class TracebackRecordingResult(tests.ExtendedTestResult):
3291
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3292
self.postcode = None
3293
def _post_mortem(self, tb=None):
3294
"""Record the code object at the end of the current traceback"""
3295
tb = tb or sys.exc_info()[2]
3298
while next is not None:
3301
self.postcode = tb.tb_frame.f_code
3302
def report_error(self, test, err):
3304
def report_failure(self, test, err):
3307
def test_location_unittest_error(self):
3308
"""Needs right post mortem traceback with erroring unittest case"""
3309
class Test(unittest.TestCase):
3312
result = self.TracebackRecordingResult()
3314
self.assertEqual(result.postcode, Test.runTest.__code__)
3316
def test_location_unittest_failure(self):
3317
"""Needs right post mortem traceback with failing unittest case"""
3318
class Test(unittest.TestCase):
3320
raise self.failureException
3321
result = self.TracebackRecordingResult()
3323
self.assertEqual(result.postcode, Test.runTest.__code__)
3325
def test_location_bt_error(self):
3326
"""Needs right post mortem traceback with erroring breezy.tests case"""
3327
class Test(tests.TestCase):
3328
def test_error(self):
3330
result = self.TracebackRecordingResult()
3331
Test("test_error").run(result)
3332
self.assertEqual(result.postcode, Test.test_error.__code__)
3334
def test_location_bt_failure(self):
3335
"""Needs right post mortem traceback with failing breezy.tests case"""
3336
class Test(tests.TestCase):
3337
def test_failure(self):
3338
raise self.failureException
3339
result = self.TracebackRecordingResult()
3340
Test("test_failure").run(result)
3341
self.assertEqual(result.postcode, Test.test_failure.__code__)
3343
def test_env_var_triggers_post_mortem(self):
3344
"""Check pdb.post_mortem is called iff BRZ_TEST_PDB is set"""
3346
result = tests.ExtendedTestResult(StringIO(), 0, 1)
3347
post_mortem_calls = []
3348
self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3349
self.overrideEnv('BRZ_TEST_PDB', None)
3350
result._post_mortem(1)
3351
self.overrideEnv('BRZ_TEST_PDB', 'on')
3352
result._post_mortem(2)
3353
self.assertEqual([2], post_mortem_calls)
2963
3356
class TestRunSuite(tests.TestCase):
2976
3369
self.verbosity)
2977
3370
tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2978
3371
self.assertLength(1, calls)
3374
class _Selftest(object):
3375
"""Mixin for tests needing full selftest output"""
3377
def _inject_stream_into_subunit(self, stream):
3378
"""To be overridden by subclasses that run tests out of process"""
3380
def _run_selftest(self, **kwargs):
3383
sio = TextIOWrapper(bio, 'utf-8')
3385
sio = bio = StringIO()
3386
self._inject_stream_into_subunit(bio)
3387
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3389
return bio.getvalue()
3392
class _ForkedSelftest(_Selftest):
3393
"""Mixin for tests needing full selftest output with forked children"""
3395
_test_needs_features = [features.subunit]
3397
def _inject_stream_into_subunit(self, stream):
3398
"""Monkey-patch subunit so the extra output goes to stream not stdout
3400
Some APIs need rewriting so this kind of bogus hackery can be replaced
3401
by passing the stream param from run_tests down into ProtocolTestCase.
3403
from subunit import ProtocolTestCase
3404
_original_init = ProtocolTestCase.__init__
3405
def _init_with_passthrough(self, *args, **kwargs):
3406
_original_init(self, *args, **kwargs)
3407
self._passthrough = stream
3408
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3410
def _run_selftest(self, **kwargs):
3411
# GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3412
if getattr(os, "fork", None) is None:
3413
raise tests.TestNotApplicable("Platform doesn't support forking")
3414
# Make sure the fork code is actually invoked by claiming two cores
3415
self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3416
kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3417
return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3420
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3421
"""Check operation of --parallel=fork selftest option"""
3423
def test_error_in_child_during_fork(self):
3424
"""Error in a forked child during test setup should get reported"""
3425
class Test(tests.TestCase):
3426
def testMethod(self):
3428
# We don't care what, just break something that a child will run
3429
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3430
out = self._run_selftest(test_suite_factory=Test)
3431
# Lines from the tracebacks of the two child processes may be mixed
3432
# together due to the way subunit parses and forwards the streams,
3433
# so permit extra lines between each part of the error output.
3434
self.assertContainsRe(out,
3437
b".+ in fork_for_tests\n"
3439
b"\\s*workaround_zealous_crypto_random\\(\\)\n"
3444
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3445
"""Check a test case still alive after being run emits a warning"""
3447
class Test(tests.TestCase):
3448
def test_pass(self):
3450
def test_self_ref(self):
3451
self.also_self = self.test_self_ref
3452
def test_skip(self):
3453
self.skipTest("Don't need")
3455
def _get_suite(self):
3456
return TestUtil.TestSuite([
3457
self.Test("test_pass"),
3458
self.Test("test_self_ref"),
3459
self.Test("test_skip"),
3462
def _run_selftest_with_suite(self, **kwargs):
3463
old_flags = tests.selftest_debug_flags
3464
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3465
gc_on = gc.isenabled()
3469
output = self._run_selftest(test_suite_factory=self._get_suite,
3474
tests.selftest_debug_flags = old_flags
3475
self.assertNotContainsRe(output, b"Uncollected test case.*test_pass")
3476
self.assertContainsRe(output, b"Uncollected test case.*test_self_ref")
3479
def test_testsuite(self):
3480
self._run_selftest_with_suite()
3482
def test_pattern(self):
3483
out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
3484
self.assertNotContainsRe(out, b"test_skip")
3486
def test_exclude_pattern(self):
3487
out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
3488
self.assertNotContainsRe(out, b"test_skip")
3490
def test_random_seed(self):
3491
self._run_selftest_with_suite(random_seed="now")
3493
def test_matching_tests_first(self):
3494
self._run_selftest_with_suite(matching_tests_first=True,
3495
pattern="test_self_ref$")
3497
def test_starting_with_and_exclude(self):
3498
out = self._run_selftest_with_suite(starting_with=["bt."],
3499
exclude_pattern="test_skip$")
3500
self.assertNotContainsRe(out, b"test_skip")
3502
def test_additonal_decorator(self):
3503
out = self._run_selftest_with_suite(
3504
suite_decorators=[tests.TestDecorator])
3507
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
3508
"""Check warnings from tests staying alive are emitted with subunit"""
3510
_test_needs_features = [features.subunit]
3512
def _run_selftest_with_suite(self, **kwargs):
3513
return TestUncollectedWarnings._run_selftest_with_suite(
3514
self, runner_class=tests.SubUnitBzrRunnerv1, **kwargs)
3517
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
3518
"""Check warnings from tests staying alive are emitted when forking"""
3521
class TestEnvironHandling(tests.TestCase):
3523
def test_overrideEnv_None_called_twice_doesnt_leak(self):
3524
self.assertFalse('MYVAR' in os.environ)
3525
self.overrideEnv('MYVAR', '42')
3526
# We use an embedded test to make sure we fix the _captureVar bug
3527
class Test(tests.TestCase):
3529
# The first call save the 42 value
3530
self.overrideEnv('MYVAR', None)
3531
self.assertEqual(None, os.environ.get('MYVAR'))
3532
# Make sure we can call it twice
3533
self.overrideEnv('MYVAR', None)
3534
self.assertEqual(None, os.environ.get('MYVAR'))
3536
result = tests.TextTestResult(output, 0, 1)
3537
Test('test_me').run(result)
3538
if not result.wasStrictlySuccessful():
3539
self.fail(output.getvalue())
3540
# We get our value back
3541
self.assertEqual('42', os.environ.get('MYVAR'))
3544
class TestIsolatedEnv(tests.TestCase):
3545
"""Test isolating tests from os.environ.
3547
Since we use tests that are already isolated from os.environ a bit of care
3548
should be taken when designing the tests to avoid bootstrap side-effects.
3549
The tests start an already clean os.environ which allow doing valid
3550
assertions about which variables are present or not and design tests around
3554
class ScratchMonkey(tests.TestCase):
3559
def test_basics(self):
3560
# Make sure we know the definition of BRZ_HOME: not part of os.environ
3561
# for tests.TestCase.
3562
self.assertTrue('BRZ_HOME' in tests.isolated_environ)
3563
self.assertEqual(None, tests.isolated_environ['BRZ_HOME'])
3564
# Being part of isolated_environ, BRZ_HOME should not appear here
3565
self.assertFalse('BRZ_HOME' in os.environ)
3566
# Make sure we know the definition of LINES: part of os.environ for
3568
self.assertTrue('LINES' in tests.isolated_environ)
3569
self.assertEqual('25', tests.isolated_environ['LINES'])
3570
self.assertEqual('25', os.environ['LINES'])
3572
def test_injecting_unknown_variable(self):
3573
# BRZ_HOME is known to be absent from os.environ
3574
test = self.ScratchMonkey('test_me')
3575
tests.override_os_environ(test, {'BRZ_HOME': 'foo'})
3576
self.assertEqual('foo', os.environ['BRZ_HOME'])
3577
tests.restore_os_environ(test)
3578
self.assertFalse('BRZ_HOME' in os.environ)
3580
def test_injecting_known_variable(self):
3581
test = self.ScratchMonkey('test_me')
3582
# LINES is known to be present in os.environ
3583
tests.override_os_environ(test, {'LINES': '42'})
3584
self.assertEqual('42', os.environ['LINES'])
3585
tests.restore_os_environ(test)
3586
self.assertEqual('25', os.environ['LINES'])
3588
def test_deleting_variable(self):
3589
test = self.ScratchMonkey('test_me')
3590
# LINES is known to be present in os.environ
3591
tests.override_os_environ(test, {'LINES': None})
3592
self.assertTrue('LINES' not in os.environ)
3593
tests.restore_os_environ(test)
3594
self.assertEqual('25', os.environ['LINES'])
3597
class TestDocTestSuiteIsolation(tests.TestCase):
3598
"""Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3600
Since tests.TestCase alreay provides an isolation from os.environ, we use
3601
the clean environment as a base for testing. To precisely capture the
3602
isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3605
We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
3606
not `os.environ` so each test overrides it to suit its needs.
3610
def get_doctest_suite_for_string(self, klass, string):
3611
class Finder(doctest.DocTestFinder):
3613
def find(*args, **kwargs):
3614
test = doctest.DocTestParser().get_doctest(
3615
string, {}, 'foo', 'foo.py', 0)
3618
suite = klass(test_finder=Finder())
3621
def run_doctest_suite_for_string(self, klass, string):
3622
suite = self.get_doctest_suite_for_string(klass, string)
3624
result = tests.TextTestResult(output, 0, 1)
3626
return result, output
3628
def assertDocTestStringSucceds(self, klass, string):
3629
result, output = self.run_doctest_suite_for_string(klass, string)
3630
if not result.wasStrictlySuccessful():
3631
self.fail(output.getvalue())
3633
def assertDocTestStringFails(self, klass, string):
3634
result, output = self.run_doctest_suite_for_string(klass, string)
3635
if result.wasStrictlySuccessful():
3636
self.fail(output.getvalue())
3638
def test_injected_variable(self):
3639
self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3642
>>> os.environ['LINES']
3645
# doctest.DocTestSuite fails as it sees '25'
3646
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3647
# tests.DocTestSuite sees '42'
3648
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3650
def test_deleted_variable(self):
3651
self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3654
>>> os.environ.get('LINES')
3656
# doctest.DocTestSuite fails as it sees '25'
3657
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3658
# tests.DocTestSuite sees None
3659
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3662
class TestSelftestExcludePatterns(tests.TestCase):
3665
super(TestSelftestExcludePatterns, self).setUp()
3666
self.overrideAttr(tests, 'test_suite', self.suite_factory)
3668
def suite_factory(self, keep_only=None, starting_with=None):
3669
"""A test suite factory with only a few tests."""
3670
class Test(tests.TestCase):
3672
# We don't need the full class path
3673
return self._testMethodName
3680
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3682
def assertTestList(self, expected, *selftest_args):
3683
# We rely on setUp installing the right test suite factory so we can
3684
# test at the command level without loading the whole test suite
3685
out, err = self.run_bzr(('selftest', '--list') + selftest_args)
3686
actual = out.splitlines()
3687
self.assertEqual(expected, actual)
3689
def test_full_list(self):
3690
self.assertTestList(['a', 'b', 'c'])
3692
def test_single_exclude(self):
3693
self.assertTestList(['b', 'c'], '-x', 'a')
3695
def test_mutiple_excludes(self):
3696
self.assertTestList(['c'], '-x', 'a', '-x', 'b')
3699
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3701
_test_needs_features = [features.subunit]
3704
super(TestCounterHooks, self).setUp()
3705
class Test(tests.TestCase):
3708
super(Test, self).setUp()
3709
self.hooks = hooks.Hooks()
3710
self.hooks.add_hook('myhook', 'Foo bar blah', (2, 4))
3711
self.install_counter_hook(self.hooks, 'myhook')
3716
def run_hook_once(self):
3717
for hook in self.hooks['myhook']:
3720
self.test_class = Test
3722
def assertHookCalls(self, expected_calls, test_name):
3723
test = self.test_class(test_name)
3724
result = unittest.TestResult()
3726
self.assertTrue(hasattr(test, '_counters'))
3727
self.assertTrue('myhook' in test._counters)
3728
self.assertEqual(expected_calls, test._counters['myhook'])
3730
def test_no_hook(self):
3731
self.assertHookCalls(0, 'no_hook')
3733
def test_run_hook_once(self):
3734
tt = features.testtools
3735
if tt.module.__version__ < (0, 9, 8):
3736
raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
3737
self.assertHookCalls(1, 'run_hook_once')