333
342
def test_scenarios(self):
334
343
# check that constructor parameters are passed through to the adapted
336
from bzrlib.tests.per_workingtree import make_scenarios
345
from .per_workingtree import make_scenarios
339
formats = [workingtree.WorkingTreeFormat2(),
340
workingtree.WorkingTreeFormat3(),]
341
scenarios = make_scenarios(server1, server2, formats)
348
formats = [workingtree_4.WorkingTreeFormat4(),
349
workingtree_3.WorkingTreeFormat3(),
350
workingtree_4.WorkingTreeFormat6()]
351
scenarios = make_scenarios(server1, server2, formats,
352
remote_server='c', remote_readonly_server='d',
353
remote_backing_server='e')
342
354
self.assertEqual([
343
('WorkingTreeFormat2',
344
{'bzrdir_format': formats[0]._matchingbzrdir,
355
('WorkingTreeFormat4',
356
{'bzrdir_format': formats[0]._matchingcontroldir,
345
357
'transport_readonly_server': 'b',
346
358
'transport_server': 'a',
347
359
'workingtree_format': formats[0]}),
348
360
('WorkingTreeFormat3',
349
{'bzrdir_format': formats[1]._matchingbzrdir,
350
'transport_readonly_server': 'b',
351
'transport_server': 'a',
352
'workingtree_format': formats[1]})],
361
{'bzrdir_format': formats[1]._matchingcontroldir,
362
'transport_readonly_server': 'b',
363
'transport_server': 'a',
364
'workingtree_format': formats[1]}),
365
('WorkingTreeFormat6',
366
{'bzrdir_format': formats[2]._matchingcontroldir,
367
'transport_readonly_server': 'b',
368
'transport_server': 'a',
369
'workingtree_format': formats[2]}),
370
('WorkingTreeFormat6,remote',
371
{'bzrdir_format': formats[2]._matchingcontroldir,
372
'repo_is_remote': True,
373
'transport_readonly_server': 'd',
374
'transport_server': 'c',
375
'vfs_transport_factory': 'e',
376
'workingtree_format': formats[2]}),
356
380
class TestTreeScenarios(tests.TestCase):
358
382
def test_scenarios(self):
359
383
# the tree implementation scenario generator is meant to setup one
360
# instance for each working tree format, and one additional instance
384
# instance for each working tree format, one additional instance
361
385
# that will use the default wt format, but create a revision tree for
362
# the tests. this means that the wt ones should have the
363
# workingtree_to_test_tree attribute set to 'return_parameter' and the
364
# revision one set to revision_tree_from_workingtree.
386
# the tests, and one more that uses the default wt format as a
387
# lightweight checkout of a remote repository. This means that the wt
388
# ones should have the workingtree_to_test_tree attribute set to
389
# 'return_parameter' and the revision one set to
390
# revision_tree_from_workingtree.
366
from bzrlib.tests.per_tree import (
392
from .per_tree import (
367
393
_dirstate_tree_from_workingtree,
369
395
preview_tree_pre,
376
formats = [workingtree.WorkingTreeFormat2(),
377
workingtree.WorkingTreeFormat3(),]
402
smart_server = test_server.SmartTCPServer_for_testing
403
smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
404
mem_server = memory.MemoryServer
405
formats = [workingtree_4.WorkingTreeFormat4(),
406
workingtree_3.WorkingTreeFormat3(),]
378
407
scenarios = make_scenarios(server1, server2, formats)
379
self.assertEqual(7, len(scenarios))
380
default_wt_format = workingtree.WorkingTreeFormat4._default_format
381
wt4_format = workingtree.WorkingTreeFormat4()
382
wt5_format = workingtree.WorkingTreeFormat5()
408
self.assertEqual(9, len(scenarios))
409
default_wt_format = workingtree.format_registry.get_default()
410
wt4_format = workingtree_4.WorkingTreeFormat4()
411
wt5_format = workingtree_4.WorkingTreeFormat5()
412
wt6_format = workingtree_4.WorkingTreeFormat6()
413
git_wt_format = git_workingtree.GitWorkingTreeFormat()
383
414
expected_scenarios = [
384
('WorkingTreeFormat2',
385
{'bzrdir_format': formats[0]._matchingbzrdir,
415
('WorkingTreeFormat4',
416
{'bzrdir_format': formats[0]._matchingcontroldir,
386
417
'transport_readonly_server': 'b',
387
418
'transport_server': 'a',
388
419
'workingtree_format': formats[0],
389
420
'_workingtree_to_test_tree': return_parameter,
391
422
('WorkingTreeFormat3',
392
{'bzrdir_format': formats[1]._matchingbzrdir,
423
{'bzrdir_format': formats[1]._matchingcontroldir,
393
424
'transport_readonly_server': 'b',
394
425
'transport_server': 'a',
395
426
'workingtree_format': formats[1],
396
427
'_workingtree_to_test_tree': return_parameter,
429
('WorkingTreeFormat6,remote',
430
{'bzrdir_format': wt6_format._matchingcontroldir,
431
'repo_is_remote': True,
432
'transport_readonly_server': smart_readonly_server,
433
'transport_server': smart_server,
434
'vfs_transport_factory': mem_server,
435
'workingtree_format': wt6_format,
436
'_workingtree_to_test_tree': return_parameter,
399
439
{'_workingtree_to_test_tree': revision_tree_from_workingtree,
400
'bzrdir_format': default_wt_format._matchingbzrdir,
440
'bzrdir_format': default_wt_format._matchingcontroldir,
401
441
'transport_readonly_server': 'b',
402
442
'transport_server': 'a',
403
443
'workingtree_format': default_wt_format,
446
{'_workingtree_to_test_tree': revision_tree_from_workingtree,
447
'bzrdir_format': git_wt_format._matchingcontroldir,
448
'transport_readonly_server': 'b',
449
'transport_server': 'a',
450
'workingtree_format': git_wt_format,
405
453
('DirStateRevisionTree,WT4',
406
454
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
407
'bzrdir_format': wt4_format._matchingbzrdir,
455
'bzrdir_format': wt4_format._matchingcontroldir,
408
456
'transport_readonly_server': 'b',
409
457
'transport_server': 'a',
410
458
'workingtree_format': wt4_format,
412
460
('DirStateRevisionTree,WT5',
413
461
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
414
'bzrdir_format': wt5_format._matchingbzrdir,
462
'bzrdir_format': wt5_format._matchingcontroldir,
415
463
'transport_readonly_server': 'b',
416
464
'transport_server': 'a',
417
465
'workingtree_format': wt5_format,
420
468
{'_workingtree_to_test_tree': preview_tree_pre,
421
'bzrdir_format': default_wt_format._matchingbzrdir,
469
'bzrdir_format': default_wt_format._matchingcontroldir,
422
470
'transport_readonly_server': 'b',
423
471
'transport_server': 'a',
424
472
'workingtree_format': default_wt_format}),
425
473
('PreviewTreePost',
426
474
{'_workingtree_to_test_tree': preview_tree_post,
427
'bzrdir_format': default_wt_format._matchingbzrdir,
475
'bzrdir_format': default_wt_format._matchingcontroldir,
428
476
'transport_readonly_server': 'b',
429
477
'transport_server': 'a',
430
478
'workingtree_format': default_wt_format}),
592
646
the_branch = builder.get_branch()
593
647
# Guard against regression into MemoryTransport leaking
594
648
# files to disk instead of keeping them in memory.
595
self.failIf(osutils.lexists('dir'))
596
dir_format = bzrdir.format_registry.make_bzrdir('knit')
649
self.assertFalse(osutils.lexists('dir'))
650
dir_format = controldir.format_registry.make_controldir('knit')
597
651
self.assertEqual(dir_format.repository_format.__class__,
598
652
the_branch.repository._format.__class__)
599
self.assertEqual('Bazaar-NG Knit Repository Format 1',
653
self.assertEqual(b'Bazaar-NG Knit Repository Format 1',
600
654
self.get_transport().get_bytes(
601
655
'dir/.bzr/repository/format'))
603
657
def test_dangling_locks_cause_failures(self):
604
658
class TestDanglingLock(tests.TestCaseWithMemoryTransport):
605
659
def test_function(self):
606
t = self.get_transport('.')
660
t = self.get_transport_from_path('.')
607
661
l = lockdir.LockDir(t, 'lock')
610
664
test = TestDanglingLock('test_function')
611
665
result = test.run()
666
total_failures = result.errors + result.failures
612
667
if self._lock_check_thorough:
613
self.assertEqual(1, len(result.errors))
668
self.assertEqual(1, len(total_failures))
615
670
# When _lock_check_thorough is disabled, then we don't trigger a
617
self.assertEqual(0, len(result.errors))
672
self.assertEqual(0, len(total_failures))
620
675
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
676
"""Tests for the convenience functions TestCaseWithTransport introduces."""
623
678
def test_get_readonly_url_none(self):
624
from bzrlib.transport import get_transport
625
from bzrlib.transport.readonly import ReadonlyTransportDecorator
679
from ..transport.readonly import ReadonlyTransportDecorator
626
680
self.vfs_transport_factory = memory.MemoryServer
627
681
self.transport_readonly_server = None
628
682
# calling get_readonly_transport() constructs a decorator on the url
630
684
url = self.get_readonly_url()
631
685
url2 = self.get_readonly_url('foo/bar')
632
t = get_transport(url)
633
t2 = get_transport(url2)
634
self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
686
t = transport.get_transport_from_url(url)
687
t2 = transport.get_transport_from_url(url2)
688
self.assertIsInstance(t, ReadonlyTransportDecorator)
689
self.assertIsInstance(t2, ReadonlyTransportDecorator)
636
690
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
638
692
def test_get_readonly_url_http(self):
639
from bzrlib.tests.http_server import HttpServer
640
from bzrlib.transport import get_transport
641
from bzrlib.transport.http import HttpTransportBase
693
from .http_server import HttpServer
694
from ..transport.http import HttpTransport
642
695
self.transport_server = test_server.LocalURLServer
643
696
self.transport_readonly_server = HttpServer
644
697
# calling get_readonly_transport() gives us a HTTP server instance.
645
698
url = self.get_readonly_url()
646
699
url2 = self.get_readonly_url('foo/bar')
647
700
# the transport returned may be any HttpTransportBase subclass
648
t = get_transport(url)
649
t2 = get_transport(url2)
650
self.failUnless(isinstance(t, HttpTransportBase))
651
self.failUnless(isinstance(t2, HttpTransportBase))
701
t = transport.get_transport_from_url(url)
702
t2 = transport.get_transport_from_url(url2)
703
self.assertIsInstance(t, HttpTransport)
704
self.assertIsInstance(t2, HttpTransport)
652
705
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
654
707
def test_is_directory(self):
743
792
r"^ +[0-9]+ms\*$")
745
794
def test_unittest_reporting_unittest_class(self):
746
# getting the time from a non-bzrlib test works ok
795
# getting the time from a non-breezy test works ok
747
796
class ShortDelayTestCase(unittest.TestCase):
748
797
def test_short_delay(self):
749
798
time.sleep(0.003)
750
799
self.check_timing(ShortDelayTestCase('test_short_delay'),
753
def _patch_get_bzr_source_tree(self):
754
# Reading from the actual source tree breaks isolation, but we don't
755
# want to assume that that's *all* that would happen.
756
self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
758
def test_assigned_benchmark_file_stores_date(self):
759
self._patch_get_bzr_source_tree()
761
result = bzrlib.tests.TextTestResult(self._log_file,
766
output_string = output.getvalue()
767
# if you are wondering about the regexp please read the comment in
768
# test_bench_history (bzrlib.tests.test_selftest.TestRunner)
769
# XXX: what comment? -- Andrew Bennetts
770
self.assertContainsRe(output_string, "--date [0-9.]+")
772
def test_benchhistory_records_test_times(self):
773
self._patch_get_bzr_source_tree()
774
result_stream = StringIO()
775
result = bzrlib.tests.TextTestResult(
779
bench_history=result_stream
782
# we want to profile a call and check that its test duration is recorded
783
# make a new test instance that when run will generate a benchmark
784
example_test_case = TestTestResult("_time_hello_world_encoding")
785
# execute the test, which should succeed and record times
786
example_test_case.run(result)
787
lines = result_stream.getvalue().splitlines()
788
self.assertEqual(2, len(lines))
789
self.assertContainsRe(lines[1],
790
" *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
791
"._time_hello_world_encoding")
793
802
def _time_hello_world_encoding(self):
794
803
"""Profile two sleep calls
796
805
This is used to exercise the test framework.
798
self.time(unicode, 'hello', errors='replace')
799
self.time(unicode, 'world', errors='replace')
807
self.time(text_type, b'hello', errors='replace')
808
self.time(text_type, b'world', errors='replace')
801
810
def test_lsprofiling(self):
802
811
"""Verbose test result prints lsprof statistics from test cases."""
803
self.requireFeature(test_lsprof.LSProfFeature)
812
self.requireFeature(features.lsprof_feature)
804
813
result_stream = StringIO()
805
result = bzrlib.tests.VerboseTestResult(
806
unittest._WritelnDecorator(result_stream),
814
result = breezy.tests.VerboseTestResult(
826
835
# and then repeated but with 'world', rather than 'hello'.
827
836
# this should appear in the output stream of our test result.
828
837
output = result_stream.getvalue()
829
self.assertContainsRe(output,
830
r"LSProf output for <type 'unicode'>\(\('hello',\), {'errors': 'replace'}\)")
839
self.assertContainsRe(output,
840
r"LSProf output for <class 'str'>\(\(b'hello',\), {'errors': 'replace'}\)")
841
self.assertContainsRe(output,
842
r"LSProf output for <class 'str'>\(\(b'world',\), {'errors': 'replace'}\)")
844
self.assertContainsRe(output,
845
r"LSProf output for <type 'unicode'>\(\('hello',\), {'errors': 'replace'}\)")
846
self.assertContainsRe(output,
847
r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
831
848
self.assertContainsRe(output,
832
849
r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n")
833
850
self.assertContainsRe(output,
834
851
r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?")
835
self.assertContainsRe(output,
836
r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
853
def test_uses_time_from_testtools(self):
854
"""Test case timings in verbose results should use testtools times"""
856
class TimeAddedVerboseTestResult(tests.VerboseTestResult):
857
def startTest(self, test):
858
self.time(datetime.datetime.utcfromtimestamp(1.145))
859
super(TimeAddedVerboseTestResult, self).startTest(test)
860
def addSuccess(self, test):
861
self.time(datetime.datetime.utcfromtimestamp(51.147))
862
super(TimeAddedVerboseTestResult, self).addSuccess(test)
863
def report_tests_starting(self): pass
865
self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
866
self.assertEndsWith(sio.getvalue(), "OK 50002ms\n")
838
868
def test_known_failure(self):
839
"""A KnownFailure being raised should trigger several result actions."""
869
"""Using knownFailure should trigger several result actions."""
840
870
class InstrumentedTestResult(tests.ExtendedTestResult):
841
871
def stopTestRun(self): pass
842
def startTests(self): pass
843
def report_test_start(self, test): pass
872
def report_tests_starting(self): pass
844
873
def report_known_failure(self, test, err=None, details=None):
845
874
self._call = test, 'known failure'
846
875
result = InstrumentedTestResult(None, None, None, None)
847
876
class Test(tests.TestCase):
848
877
def test_function(self):
849
raise tests.KnownFailure('failed!')
878
self.knownFailure('failed!')
850
879
test = Test("test_function")
852
881
# it should invoke 'report_known_failure'.
1643
1696
class Test(tests.TestCase):
1645
1698
def setUp(self):
1646
tests.TestCase.setUp(self)
1699
super(Test, self).setUp()
1647
1700
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1649
1702
def test_value(self):
1650
1703
self.assertEqual('original', self.orig)
1651
1704
self.assertEqual('modified', obj.test_attr)
1653
test = Test('test_value')
1654
test.run(unittest.TestResult())
1706
self._run_successful_test(Test('test_value'))
1655
1707
self.assertEqual('original', obj.test_attr)
1709
def test_overrideAttr_with_no_existing_value_and_value(self):
1710
# Do not define the test_attribute
1711
obj = self # Make 'obj' visible to the embedded test
1712
class Test(tests.TestCase):
1715
tests.TestCase.setUp(self)
1716
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1718
def test_value(self):
1719
self.assertEqual(tests._unitialized_attr, self.orig)
1720
self.assertEqual('modified', obj.test_attr)
1722
self._run_successful_test(Test('test_value'))
1723
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1725
def test_overrideAttr_with_no_existing_value_and_no_value(self):
1726
# Do not define the test_attribute
1727
obj = self # Make 'obj' visible to the embedded test
1728
class Test(tests.TestCase):
1731
tests.TestCase.setUp(self)
1732
self.orig = self.overrideAttr(obj, 'test_attr')
1734
def test_value(self):
1735
self.assertEqual(tests._unitialized_attr, self.orig)
1736
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1738
self._run_successful_test(Test('test_value'))
1739
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1741
def test_recordCalls(self):
1742
from breezy.tests import test_selftest
1743
calls = self.recordCalls(
1744
test_selftest, '_add_numbers')
1745
self.assertEqual(test_selftest._add_numbers(2, 10),
1747
self.assertEqual(calls, [((2, 10), {})])
1750
def _add_numbers(a, b):
1754
class _MissingFeature(features.Feature):
1757
missing_feature = _MissingFeature()
1760
def _get_test(name):
1761
"""Get an instance of a specific example test.
1763
We protect this in a function so that they don't auto-run in the test
1767
class ExampleTests(tests.TestCase):
1769
def test_fail(self):
1770
mutter('this was a failing test')
1771
self.fail('this test will fail')
1773
def test_error(self):
1774
mutter('this test errored')
1775
raise RuntimeError('gotcha')
1777
def test_missing_feature(self):
1778
mutter('missing the feature')
1779
self.requireFeature(missing_feature)
1781
def test_skip(self):
1782
mutter('this test will be skipped')
1783
raise tests.TestSkipped('reason')
1785
def test_success(self):
1786
mutter('this test succeeds')
1788
def test_xfail(self):
1789
mutter('test with expected failure')
1790
self.knownFailure('this_fails')
1792
def test_unexpected_success(self):
1793
mutter('test with unexpected success')
1794
self.expectFailure('should_fail', lambda: None)
1796
return ExampleTests(name)
1799
class TestTestCaseLogDetails(tests.TestCase):
1801
def _run_test(self, test_name):
1802
test = _get_test(test_name)
1803
result = testtools.TestResult()
1807
def test_fail_has_log(self):
1808
result = self._run_test('test_fail')
1809
self.assertEqual(1, len(result.failures))
1810
result_content = result.failures[0][1]
1811
self.assertContainsRe(result_content,
1812
'(?m)^(?:Text attachment: )?log(?:$|: )')
1813
self.assertContainsRe(result_content, 'this was a failing test')
1815
def test_error_has_log(self):
1816
result = self._run_test('test_error')
1817
self.assertEqual(1, len(result.errors))
1818
result_content = result.errors[0][1]
1819
self.assertContainsRe(result_content,
1820
'(?m)^(?:Text attachment: )?log(?:$|: )')
1821
self.assertContainsRe(result_content, 'this test errored')
1823
def test_skip_has_no_log(self):
1824
result = self._run_test('test_skip')
1825
reasons = result.skip_reasons
1826
self.assertEqual({'reason'}, set(reasons))
1827
skips = reasons['reason']
1828
self.assertEqual(1, len(skips))
1830
self.assertFalse('log' in test.getDetails())
1832
def test_missing_feature_has_no_log(self):
1833
# testtools doesn't know about addNotSupported, so it just gets
1834
# considered as a skip
1835
result = self._run_test('test_missing_feature')
1836
reasons = result.skip_reasons
1837
self.assertEqual({str(missing_feature)}, set(reasons))
1838
skips = reasons[str(missing_feature)]
1839
self.assertEqual(1, len(skips))
1841
self.assertFalse('log' in test.getDetails())
1843
def test_xfail_has_no_log(self):
1844
result = self._run_test('test_xfail')
1845
self.assertEqual(1, len(result.expectedFailures))
1846
result_content = result.expectedFailures[0][1]
1847
self.assertNotContainsRe(result_content,
1848
'(?m)^(?:Text attachment: )?log(?:$|: )')
1849
self.assertNotContainsRe(result_content, 'test with expected failure')
1851
def test_unexpected_success_has_log(self):
1852
result = self._run_test('test_unexpected_success')
1853
self.assertEqual(1, len(result.unexpectedSuccesses))
1854
# Inconsistency, unexpectedSuccesses is a list of tests,
1855
# expectedFailures is a list of reasons?
1856
test = result.unexpectedSuccesses[0]
1857
details = test.getDetails()
1858
self.assertTrue('log' in details)
1861
class TestTestCloning(tests.TestCase):
1862
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1864
def test_cloned_testcase_does_not_share_details(self):
1865
"""A TestCase cloned with clone_test does not share mutable attributes
1866
such as details or cleanups.
1868
class Test(tests.TestCase):
1870
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1871
orig_test = Test('test_foo')
1872
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1873
orig_test.run(unittest.TestResult())
1874
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1875
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1877
def test_double_apply_scenario_preserves_first_scenario(self):
1878
"""Applying two levels of scenarios to a test preserves the attributes
1879
added by both scenarios.
1881
class Test(tests.TestCase):
1884
test = Test('test_foo')
1885
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1886
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1887
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1888
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1889
all_tests = list(tests.iter_suite_tests(suite))
1890
self.assertLength(4, all_tests)
1891
all_xys = sorted((t.x, t.y) for t in all_tests)
1892
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1658
1895
# NB: Don't delete this; it's not actually from 0.11!
1659
1896
@deprecated_function(deprecated_in((0, 11, 0)))
1971
2221
load_list='missing file name', list_only=True)
2224
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2226
_test_needs_features = [features.subunit]
2228
def run_subunit_stream(self, test_name):
2229
from subunit import ProtocolTestCase
2231
return TestUtil.TestSuite([_get_test(test_name)])
2232
stream = self.run_selftest(
2233
runner_class=tests.SubUnitBzrRunnerv1,
2234
test_suite_factory=factory)
2235
test = ProtocolTestCase(stream)
2236
result = testtools.TestResult()
2238
content = stream.getvalue()
2239
return content, result
2241
def test_fail_has_log(self):
2242
content, result = self.run_subunit_stream('test_fail')
2243
self.assertEqual(1, len(result.failures))
2244
self.assertContainsRe(content, b'(?m)^log$')
2245
self.assertContainsRe(content, b'this test will fail')
2247
def test_error_has_log(self):
2248
content, result = self.run_subunit_stream('test_error')
2249
self.assertContainsRe(content, b'(?m)^log$')
2250
self.assertContainsRe(content, b'this test errored')
2252
def test_skip_has_no_log(self):
2253
content, result = self.run_subunit_stream('test_skip')
2254
self.assertNotContainsRe(content, b'(?m)^log$')
2255
self.assertNotContainsRe(content, b'this test will be skipped')
2256
reasons = result.skip_reasons
2257
self.assertEqual({'reason'}, set(reasons))
2258
skips = reasons['reason']
2259
self.assertEqual(1, len(skips))
2261
# RemotedTestCase doesn't preserve the "details"
2262
## self.assertFalse('log' in test.getDetails())
2264
def test_missing_feature_has_no_log(self):
2265
content, result = self.run_subunit_stream('test_missing_feature')
2266
self.assertNotContainsRe(content, b'(?m)^log$')
2267
self.assertNotContainsRe(content, b'missing the feature')
2268
reasons = result.skip_reasons
2269
self.assertEqual({'_MissingFeature\n'}, set(reasons))
2270
skips = reasons['_MissingFeature\n']
2271
self.assertEqual(1, len(skips))
2273
# RemotedTestCase doesn't preserve the "details"
2274
## self.assertFalse('log' in test.getDetails())
2276
def test_xfail_has_no_log(self):
2277
content, result = self.run_subunit_stream('test_xfail')
2278
self.assertNotContainsRe(content, b'(?m)^log$')
2279
self.assertNotContainsRe(content, b'test with expected failure')
2280
self.assertEqual(1, len(result.expectedFailures))
2281
result_content = result.expectedFailures[0][1]
2282
self.assertNotContainsRe(result_content,
2283
'(?m)^(?:Text attachment: )?log(?:$|: )')
2284
self.assertNotContainsRe(result_content, 'test with expected failure')
2286
def test_unexpected_success_has_log(self):
2287
content, result = self.run_subunit_stream('test_unexpected_success')
2288
self.assertContainsRe(content, b'(?m)^log$')
2289
self.assertContainsRe(content, b'test with unexpected success')
2290
self.assertEqual(1, len(result.unexpectedSuccesses))
2291
test = result.unexpectedSuccesses[0]
2292
# RemotedTestCase doesn't preserve the "details"
2293
## self.assertTrue('log' in test.getDetails())
2295
def test_success_has_no_log(self):
2296
content, result = self.run_subunit_stream('test_success')
2297
self.assertEqual(1, result.testsRun)
2298
self.assertNotContainsRe(content, b'(?m)^log$')
2299
self.assertNotContainsRe(content, b'this test succeeds')
1974
2302
class TestRunBzr(tests.TestCase):
1979
def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
2308
def _run_bzr_core(self, argv, encoding=None, stdin=None,
2309
stdout=None, stderr=None, working_dir=None):
1981
2310
"""Override _run_bzr_core to test how it is invoked by run_bzr.
1983
2312
Attempts to run bzr from inside this class don't actually run it.
2264
2576
class TestStartBzrSubProcess(tests.TestCase):
2577
"""Stub test start_bzr_subprocess."""
2266
def check_popen_state(self):
2267
"""Replace to make assertions when popen is called."""
2579
def _subprocess_log_cleanup(self):
2580
"""Inhibits the base version as we don't produce a log file."""
2269
2582
def _popen(self, *args, **kwargs):
2270
"""Record the command that is run, so that we can ensure it is correct"""
2583
"""Override the base version to record the command that is run.
2585
From there we can ensure it is correct without spawning a real process.
2271
2587
self.check_popen_state()
2272
2588
self._popen_args = args
2273
2589
self._popen_kwargs = kwargs
2274
2590
raise _DontSpawnProcess()
2592
def check_popen_state(self):
2593
"""Replace to make assertions when popen is called."""
2276
2595
def test_run_bzr_subprocess_no_plugins(self):
2277
2596
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2278
2597
command = self._popen_args[0]
2279
2598
self.assertEqual(sys.executable, command[0])
2280
self.assertEqual(self.get_bzr_path(), command[1])
2599
self.assertEqual(self.get_brz_path(), command[1])
2281
2600
self.assertEqual(['--no-plugins'], command[2:])
2283
2602
def test_allow_plugins(self):
2284
2603
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2286
2605
command = self._popen_args[0]
2287
2606
self.assertEqual([], command[2:])
2289
2608
def test_set_env(self):
2290
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2609
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2291
2610
# set in the child
2292
2611
def check_environment():
2293
2612
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2294
2613
self.check_popen_state = check_environment
2295
2614
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2296
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2615
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2297
2616
# not set in the parent
2298
2617
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2300
2619
def test_run_bzr_subprocess_env_del(self):
2301
2620
"""run_bzr_subprocess can remove environment variables too."""
2302
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2621
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2303
2622
def check_environment():
2304
2623
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2305
2624
os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2306
2625
self.check_popen_state = check_environment
2307
2626
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2308
env_changes={'EXISTANT_ENV_VAR':None})
2627
env_changes={'EXISTANT_ENV_VAR':None})
2309
2628
# Still set in parent
2310
2629
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2311
2630
del os.environ['EXISTANT_ENV_VAR']
2313
2632
def test_env_del_missing(self):
2314
self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
2633
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2315
2634
def check_environment():
2316
2635
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2317
2636
self.check_popen_state = check_environment
2318
2637
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2319
env_changes={'NON_EXISTANT_ENV_VAR':None})
2638
env_changes={'NON_EXISTANT_ENV_VAR':None})
2321
2640
def test_working_dir(self):
2322
2641
"""Test that we can specify the working dir for the child"""
2350
2668
self.disable_missing_extensions_warning()
2351
2669
process = self.start_bzr_subprocess(['wait-until-signalled'],
2352
2670
skip_if_plan_to_signal=True)
2353
self.assertEqual('running\n', process.stdout.readline())
2671
self.assertEqual(b'running\n', process.stdout.readline())
2354
2672
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2356
self.assertEqual('', result[0])
2357
self.assertEqual('bzr: interrupted\n', result[1])
2360
class TestFeature(tests.TestCase):
2362
def test_caching(self):
2363
"""Feature._probe is called by the feature at most once."""
2364
class InstrumentedFeature(tests.Feature):
2366
super(InstrumentedFeature, self).__init__()
2369
self.calls.append('_probe')
2371
feature = InstrumentedFeature()
2373
self.assertEqual(['_probe'], feature.calls)
2375
self.assertEqual(['_probe'], feature.calls)
2377
def test_named_str(self):
2378
"""Feature.__str__ should thunk to feature_name()."""
2379
class NamedFeature(tests.Feature):
2380
def feature_name(self):
2382
feature = NamedFeature()
2383
self.assertEqual('symlinks', str(feature))
2385
def test_default_str(self):
2386
"""Feature.__str__ should default to __class__.__name__."""
2387
class NamedFeature(tests.Feature):
2389
feature = NamedFeature()
2390
self.assertEqual('NamedFeature', str(feature))
2393
class TestUnavailableFeature(tests.TestCase):
2395
def test_access_feature(self):
2396
feature = tests.Feature()
2397
exception = tests.UnavailableFeature(feature)
2398
self.assertIs(feature, exception.args[0])
2401
simple_thunk_feature = tests._CompatabilityThunkFeature(
2402
deprecated_in((2, 1, 0)),
2403
'bzrlib.tests.test_selftest',
2404
'simple_thunk_feature','UnicodeFilename',
2405
replacement_module='bzrlib.tests'
2408
class Test_CompatibilityFeature(tests.TestCase):
2410
def test_does_thunk(self):
2411
res = self.callDeprecated(
2412
['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2413
' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2414
simple_thunk_feature.available)
2415
self.assertEqual(tests.UnicodeFilename.available(), res)
2418
class TestModuleAvailableFeature(tests.TestCase):
2420
def test_available_module(self):
2421
feature = tests.ModuleAvailableFeature('bzrlib.tests')
2422
self.assertEqual('bzrlib.tests', feature.module_name)
2423
self.assertEqual('bzrlib.tests', str(feature))
2424
self.assertTrue(feature.available())
2425
self.assertIs(tests, feature.module)
2427
def test_unavailable_module(self):
2428
feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
2429
self.assertEqual('bzrlib.no_such_module_exists', str(feature))
2430
self.assertFalse(feature.available())
2431
self.assertIs(None, feature.module)
2674
self.assertEqual(b'', result[0])
2675
self.assertEqual(b'brz: interrupted\n', result[1])
2434
2678
class TestSelftestFiltering(tests.TestCase):
2436
2680
def setUp(self):
2437
tests.TestCase.setUp(self)
2681
super(TestSelftestFiltering, self).setUp()
2438
2682
self.suite = TestUtil.TestSuite()
2439
2683
self.loader = TestUtil.TestLoader()
2440
2684
self.suite.addTest(self.loader.loadTestsFromModule(
2441
sys.modules['bzrlib.tests.test_selftest']))
2685
sys.modules['breezy.tests.test_selftest']))
2442
2686
self.all_names = _test_ids(self.suite)
2444
2688
def test_condition_id_re(self):
    """Filtering by ``condition_id_re`` keeps only the matching test id."""
    # Only the breezy-qualified id is expected; the stale bzrlib-qualified
    # assignment left an unterminated parenthesis behind.
    test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                 'test_condition_id_re')
    filtered_suite = tests.filter_suite_by_condition(
        self.suite, tests.condition_id_re('test_condition_id_re'))
    self.assertEqual([test_name], _test_ids(filtered_suite))
2451
2695
def test_condition_id_in_list(self):
2452
test_names = ['bzrlib.tests.test_selftest.TestSelftestFiltering.'
2696
test_names = ['breezy.tests.test_selftest.TestSelftestFiltering.'
2453
2697
'test_condition_id_in_list']
2454
2698
id_list = tests.TestIdList(test_names)
2455
2699
filtered_suite = tests.filter_suite_by_condition(
2953
3202
def test_predefined_prefixes(self):
    """Each predefined prefix alias resolves to its breezy package name."""
    tpr = tests.test_prefix_alias_registry
    # The bzrlib-era expectations contradicted the breezy ones (one alias
    # cannot resolve to both) and used the deprecated assertEquals alias,
    # so only the breezy assertions are kept.
    self.assertEqual('breezy', tpr.resolve_alias('breezy'))
    self.assertEqual('breezy.doc', tpr.resolve_alias('bd'))
    self.assertEqual('breezy.utils', tpr.resolve_alias('bu'))
    self.assertEqual('breezy.tests', tpr.resolve_alias('bt'))
    self.assertEqual('breezy.tests.blackbox', tpr.resolve_alias('bb'))
    self.assertEqual('breezy.plugins', tpr.resolve_alias('bp'))
3212
class TestThreadLeakDetection(tests.TestCase):
3213
"""Ensure when tests leak threads we detect and report it"""
3215
class LeakRecordingResult(tests.ExtendedTestResult):
3217
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3219
def _report_thread_leak(self, test, leaks, alive):
3220
self.leaks.append((test, leaks))
3222
def test_testcase_without_addCleanups(self):
3223
"""Check old TestCase instances don't break with leak detection"""
3224
class Test(unittest.TestCase):
3227
result = self.LeakRecordingResult()
3229
result.startTestRun()
3231
result.stopTestRun()
3232
self.assertEqual(result._tests_leaking_threads_count, 0)
3233
self.assertEqual(result.leaks, [])
3235
def test_thread_leak(self):
3236
"""Ensure a thread that outlives the running of a test is reported
3238
Uses a thread that blocks on an event, and is started by the inner
3239
test case. As the thread outlives the inner case's run, it should be
3240
detected as a leak, but the event is then set so that the thread can
3241
be safely joined in cleanup so it's not leaked for real.
3243
event = threading.Event()
3244
thread = threading.Thread(name="Leaker", target=event.wait)
3245
class Test(tests.TestCase):
3246
def test_leak(self):
3248
result = self.LeakRecordingResult()
3249
test = Test("test_leak")
3250
self.addCleanup(thread.join)
3251
self.addCleanup(event.set)
3252
result.startTestRun()
3254
result.stopTestRun()
3255
self.assertEqual(result._tests_leaking_threads_count, 1)
3256
self.assertEqual(result._first_thread_leaker_id, test.id())
3257
self.assertEqual(result.leaks, [(test, {thread})])
3258
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3260
def test_multiple_leaks(self):
3261
"""Check multiple leaks are blamed on the test cases at fault
3263
Same concept as the previous test, but has one inner test method that
3264
leaks two threads, and one that doesn't leak at all.
3266
event = threading.Event()
3267
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3268
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3269
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3270
class Test(tests.TestCase):
3271
def test_first_leak(self):
3273
def test_second_no_leak(self):
3275
def test_third_leak(self):
3278
result = self.LeakRecordingResult()
3279
first_test = Test("test_first_leak")
3280
third_test = Test("test_third_leak")
3281
self.addCleanup(thread_a.join)
3282
self.addCleanup(thread_b.join)
3283
self.addCleanup(thread_c.join)
3284
self.addCleanup(event.set)
3285
result.startTestRun()
3287
[first_test, Test("test_second_no_leak"), third_test]
3289
result.stopTestRun()
3290
self.assertEqual(result._tests_leaking_threads_count, 2)
3291
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3292
self.assertEqual(result.leaks, [
3293
(first_test, {thread_b}),
3294
(third_test, {thread_a, thread_c})])
3295
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3298
class TestPostMortemDebugging(tests.TestCase):
3299
"""Check post mortem debugging works when tests fail or error"""
3301
class TracebackRecordingResult(tests.ExtendedTestResult):
3303
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3304
self.postcode = None
3305
def _post_mortem(self, tb=None):
3306
"""Record the code object at the end of the current traceback"""
3307
tb = tb or sys.exc_info()[2]
3310
while next is not None:
3313
self.postcode = tb.tb_frame.f_code
3314
def report_error(self, test, err):
3316
def report_failure(self, test, err):
3319
def test_location_unittest_error(self):
3320
"""Needs right post mortem traceback with erroring unittest case"""
3321
class Test(unittest.TestCase):
3324
result = self.TracebackRecordingResult()
3326
self.assertEqual(result.postcode, Test.runTest.__code__)
3328
def test_location_unittest_failure(self):
3329
"""Needs right post mortem traceback with failing unittest case"""
3330
class Test(unittest.TestCase):
3332
raise self.failureException
3333
result = self.TracebackRecordingResult()
3335
self.assertEqual(result.postcode, Test.runTest.__code__)
3337
def test_location_bt_error(self):
3338
"""Needs right post mortem traceback with erroring breezy.tests case"""
3339
class Test(tests.TestCase):
3340
def test_error(self):
3342
result = self.TracebackRecordingResult()
3343
Test("test_error").run(result)
3344
self.assertEqual(result.postcode, Test.test_error.__code__)
3346
def test_location_bt_failure(self):
    """Needs right post mortem traceback with failing breezy.tests case"""

    class Test(tests.TestCase):

        def test_failure(self):
            raise self.failureException

    recorder = self.TracebackRecordingResult()
    failing_case = Test("test_failure")
    failing_case.run(recorder)
    # The recorded code object must be that of the failing test method.
    expected_code = Test.test_failure.__code__
    self.assertEqual(recorder.postcode, expected_code)
3355
def test_env_var_triggers_post_mortem(self):
3356
"""Check pdb.post_mortem is called iff BRZ_TEST_PDB is set"""
3358
result = tests.ExtendedTestResult(StringIO(), 0, 1)
3359
post_mortem_calls = []
3360
self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3361
self.overrideEnv('BRZ_TEST_PDB', None)
3362
result._post_mortem(1)
3363
self.overrideEnv('BRZ_TEST_PDB', 'on')
3364
result._post_mortem(2)
3365
self.assertEqual([2], post_mortem_calls)
2963
3368
class TestRunSuite(tests.TestCase):
2976
3381
self.verbosity)
2977
3382
tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2978
3383
self.assertLength(1, calls)
3386
class _Selftest(object):
3387
"""Mixin for tests needing full selftest output"""
3389
def _inject_stream_into_subunit(self, stream):
3390
"""To be overridden by subclasses that run tests out of process"""
3392
def _run_selftest(self, **kwargs):
3395
sio = TextIOWrapper(bio, 'utf-8')
3397
sio = bio = StringIO()
3398
self._inject_stream_into_subunit(bio)
3399
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3401
return bio.getvalue()
3404
class _ForkedSelftest(_Selftest):
3405
"""Mixin for tests needing full selftest output with forked children"""
3407
_test_needs_features = [features.subunit]
3409
def _inject_stream_into_subunit(self, stream):
3410
"""Monkey-patch subunit so the extra output goes to stream not stdout
3412
Some APIs need rewriting so this kind of bogus hackery can be replaced
3413
by passing the stream param from run_tests down into ProtocolTestCase.
3415
from subunit import ProtocolTestCase
3416
_original_init = ProtocolTestCase.__init__
3417
def _init_with_passthrough(self, *args, **kwargs):
3418
_original_init(self, *args, **kwargs)
3419
self._passthrough = stream
3420
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3422
def _run_selftest(self, **kwargs):
    """Run the inherited ``_run_selftest`` with ``tests.fork_decorator`` added.

    Raises ``tests.TestNotApplicable`` when ``os`` has no usable ``fork``.
    The decorator is appended to ``kwargs['suite_decorators']``, creating
    the list if the caller did not pass one.
    """
    # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
    fork = getattr(os, "fork", None)
    if fork is None:
        raise tests.TestNotApplicable("Platform doesn't support forking")
    # Make sure the fork code is actually invoked by claiming two cores
    self.overrideAttr(osutils, "local_concurrency", lambda: 2)
    decorators = kwargs.setdefault("suite_decorators", [])
    decorators.append(tests.fork_decorator)
    return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3432
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3433
"""Check operation of --parallel=fork selftest option"""
3435
def test_error_in_child_during_fork(self):
3436
"""Error in a forked child during test setup should get reported"""
3437
class Test(tests.TestCase):
3438
def testMethod(self):
3440
# We don't care what, just break something that a child will run
3441
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3442
out = self._run_selftest(test_suite_factory=Test)
3443
# Lines from the tracebacks of the two child processes may be mixed
3444
# together due to the way subunit parses and forwards the streams,
3445
# so permit extra lines between each part of the error output.
3446
self.assertContainsRe(out,
3449
b".+ in fork_for_tests\n"
3451
b"\\s*workaround_zealous_crypto_random\\(\\)\n"
3456
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3457
"""Check a test case still alive after being run emits a warning"""
3459
class Test(tests.TestCase):
3460
def test_pass(self):
3462
def test_self_ref(self):
3463
self.also_self = self.test_self_ref
3464
def test_skip(self):
3465
self.skipTest("Don't need")
3467
def _get_suite(self):
3468
return TestUtil.TestSuite([
3469
self.Test("test_pass"),
3470
self.Test("test_self_ref"),
3471
self.Test("test_skip"),
3474
def _run_selftest_with_suite(self, **kwargs):
3475
old_flags = tests.selftest_debug_flags
3476
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3477
gc_on = gc.isenabled()
3481
output = self._run_selftest(test_suite_factory=self._get_suite,
3486
tests.selftest_debug_flags = old_flags
3487
self.assertNotContainsRe(output, b"Uncollected test case.*test_pass")
3488
self.assertContainsRe(output, b"Uncollected test case.*test_self_ref")
3491
def test_testsuite(self):
    # No extra selftest options: the shared helper does all the checking.
    self._run_selftest_with_suite()
3494
def test_pattern(self):
    # Select only test_pass / test_self_ref; test_skip must not show up.
    selected = "test_(?:pass|self_ref)$"
    out = self._run_selftest_with_suite(pattern=selected)
    self.assertNotContainsRe(out, b"test_skip")
3498
def test_exclude_pattern(self):
    # Exclude test_skip explicitly; it must not show up in the output.
    excluded = "test_skip$"
    out = self._run_selftest_with_suite(exclude_pattern=excluded)
    self.assertNotContainsRe(out, b"test_skip")
3502
def test_random_seed(self):
    # Same checks as the plain run, with random_seed="now" passed through.
    seed = "now"
    self._run_selftest_with_suite(random_seed=seed)
3505
def test_matching_tests_first(self):
    # Same checks as the plain run, with matching_tests_first enabled and
    # a pattern that matches test_self_ref.
    self._run_selftest_with_suite(
        pattern="test_self_ref$",
        matching_tests_first=True)
3509
def test_starting_with_and_exclude(self):
    # Combine starting_with with an exclusion of test_skip; the excluded
    # test must not show up in the output.
    prefixes = ["bt."]
    out = self._run_selftest_with_suite(
        starting_with=prefixes, exclude_pattern="test_skip$")
    self.assertNotContainsRe(out, b"test_skip")
3514
def test_additonal_decorator(self):
    # Pass an extra suite decorator through selftest. The returned output
    # is not inspected here, so it is no longer bound to an unused local;
    # _run_selftest_with_suite performs the assertions itself.
    self._run_selftest_with_suite(
        suite_decorators=[tests.TestDecorator])
3519
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted with subunit"""

    _test_needs_features = [features.subunit]

    def _run_selftest_with_suite(self, **kwargs):
        # Delegate to the parent implementation, forcing the subunit v1
        # runner on top of whatever options the caller supplied.
        parent_run = TestUncollectedWarnings._run_selftest_with_suite
        return parent_run(
            self, runner_class=tests.SubUnitBzrRunnerv1, **kwargs)
3529
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted when forking"""
    # No body of its own: the behaviour comes from combining the forking
    # mixin with the inherited uncollected-warning tests.
3533
class TestEnvironHandling(tests.TestCase):
3535
def test_overrideEnv_None_called_twice_doesnt_leak(self):
3536
self.assertFalse('MYVAR' in os.environ)
3537
self.overrideEnv('MYVAR', '42')
3538
# We use an embedded test to make sure we fix the _captureVar bug
3539
class Test(tests.TestCase):
3541
# The first call save the 42 value
3542
self.overrideEnv('MYVAR', None)
3543
self.assertEqual(None, os.environ.get('MYVAR'))
3544
# Make sure we can call it twice
3545
self.overrideEnv('MYVAR', None)
3546
self.assertEqual(None, os.environ.get('MYVAR'))
3548
result = tests.TextTestResult(output, 0, 1)
3549
Test('test_me').run(result)
3550
if not result.wasStrictlySuccessful():
3551
self.fail(output.getvalue())
3552
# We get our value back
3553
self.assertEqual('42', os.environ.get('MYVAR'))
3556
class TestIsolatedEnv(tests.TestCase):
3557
"""Test isolating tests from os.environ.
3559
Since we use tests that are already isolated from os.environ a bit of care
3560
should be taken when designing the tests to avoid bootstrap side-effects.
3561
The tests start an already clean os.environ which allow doing valid
3562
assertions about which variables are present or not and design tests around
3566
class ScratchMonkey(tests.TestCase):
3571
def test_basics(self):
3572
# Make sure we know the definition of BRZ_HOME: not part of os.environ
3573
# for tests.TestCase.
3574
self.assertTrue('BRZ_HOME' in tests.isolated_environ)
3575
self.assertEqual(None, tests.isolated_environ['BRZ_HOME'])
3576
# Being part of isolated_environ, BRZ_HOME should not appear here
3577
self.assertFalse('BRZ_HOME' in os.environ)
3578
# Make sure we know the definition of LINES: part of os.environ for
3580
self.assertTrue('LINES' in tests.isolated_environ)
3581
self.assertEqual('25', tests.isolated_environ['LINES'])
3582
self.assertEqual('25', os.environ['LINES'])
3584
def test_injecting_unknown_variable(self):
    # BRZ_HOME is known to be absent from os.environ
    monkey = self.ScratchMonkey('test_me')
    overrides = {'BRZ_HOME': 'foo'}
    tests.override_os_environ(monkey, overrides)
    self.assertEqual('foo', os.environ['BRZ_HOME'])
    # Restoring must remove the variable again rather than leave it set.
    tests.restore_os_environ(monkey)
    self.assertFalse('BRZ_HOME' in os.environ)
3592
def test_injecting_known_variable(self):
    monkey = self.ScratchMonkey('test_me')
    # LINES is known to be present in os.environ
    overrides = {'LINES': '42'}
    tests.override_os_environ(monkey, overrides)
    self.assertEqual('42', os.environ['LINES'])
    # Restoring must bring back the value that was there before.
    tests.restore_os_environ(monkey)
    self.assertEqual('25', os.environ['LINES'])
3600
def test_deleting_variable(self):
    monkey = self.ScratchMonkey('test_me')
    # LINES is known to be present in os.environ
    overrides = {'LINES': None}
    # A None value asks for the variable to be removed.
    tests.override_os_environ(monkey, overrides)
    self.assertTrue('LINES' not in os.environ)
    # Restoring must bring back the value that was there before.
    tests.restore_os_environ(monkey)
    self.assertEqual('25', os.environ['LINES'])
3609
class TestDocTestSuiteIsolation(tests.TestCase):
3610
"""Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3612
Since tests.TestCase alreay provides an isolation from os.environ, we use
3613
the clean environment as a base for testing. To precisely capture the
3614
isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3617
We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
3618
not `os.environ` so each test overrides it to suit its needs.
3622
def get_doctest_suite_for_string(self, klass, string):
3623
class Finder(doctest.DocTestFinder):
3625
def find(*args, **kwargs):
3626
test = doctest.DocTestParser().get_doctest(
3627
string, {}, 'foo', 'foo.py', 0)
3630
suite = klass(test_finder=Finder())
3633
def run_doctest_suite_for_string(self, klass, string):
3634
suite = self.get_doctest_suite_for_string(klass, string)
3636
result = tests.TextTestResult(output, 0, 1)
3638
return result, output
3640
def assertDocTestStringSucceds(self, klass, string):
    """Fail, reporting the captured output, unless the doctest run is strictly successful.

    :param klass: Passed through to ``run_doctest_suite_for_string``.
    :param string: Passed through to ``run_doctest_suite_for_string``.
    """
    outcome, captured = self.run_doctest_suite_for_string(klass, string)
    if outcome.wasStrictlySuccessful():
        return
    self.fail(captured.getvalue())
3645
def assertDocTestStringFails(self, klass, string):
    """Fail, reporting the captured output, if the doctest run is strictly successful.

    :param klass: Passed through to ``run_doctest_suite_for_string``.
    :param string: Passed through to ``run_doctest_suite_for_string``.
    """
    outcome, captured = self.run_doctest_suite_for_string(klass, string)
    if not outcome.wasStrictlySuccessful():
        return
    self.fail(captured.getvalue())
3650
def test_injected_variable(self):
3651
self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3654
>>> os.environ['LINES']
3657
# doctest.DocTestSuite fails as it sees '25'
3658
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3659
# tests.DocTestSuite sees '42'
3660
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3662
def test_deleted_variable(self):
3663
self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3666
>>> os.environ.get('LINES')
3668
# doctest.DocTestSuite fails as it sees '25'
3669
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3670
# tests.DocTestSuite sees None
3671
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3674
class TestSelftestExcludePatterns(tests.TestCase):
3677
super(TestSelftestExcludePatterns, self).setUp()
3678
self.overrideAttr(tests, 'test_suite', self.suite_factory)
3680
def suite_factory(self, keep_only=None, starting_with=None):
3681
"""A test suite factory with only a few tests."""
3682
class Test(tests.TestCase):
3684
# We don't need the full class path
3685
return self._testMethodName
3692
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3694
def assertTestList(self, expected, *selftest_args):
    """Assert that ``selftest --list`` with extra arguments prints ``expected``.

    :param expected: The expected output lines, in order.
    :param selftest_args: Extra command-line arguments appended after
        ``selftest --list``.
    """
    # We rely on setUp installing the right test suite factory so we can
    # test at the command level without loading the whole test suite
    command = ('selftest', '--list') + selftest_args
    out, err = self.run_bzr(command)
    self.assertEqual(expected, out.splitlines())
3701
def test_full_list(self):
    # With no exclusions every test in the small suite is listed.
    everything = ['a', 'b', 'c']
    self.assertTestList(everything)
3704
def test_single_exclude(self):
    # A single -x drops the matching test from the listing.
    remaining = ['b', 'c']
    self.assertTestList(remaining, '-x', 'a')
3707
def test_mutiple_excludes(self):
    # Repeated -x options accumulate: both 'a' and 'b' are dropped.
    remaining = ['c']
    self.assertTestList(remaining, '-x', 'a', '-x', 'b')
3711
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3713
_test_needs_features = [features.subunit]
3716
super(TestCounterHooks, self).setUp()
3717
class Test(tests.TestCase):
3720
super(Test, self).setUp()
3721
self.hooks = hooks.Hooks()
3722
self.hooks.add_hook('myhook', 'Foo bar blah', (2, 4))
3723
self.install_counter_hook(self.hooks, 'myhook')
3728
def run_hook_once(self):
3729
for hook in self.hooks['myhook']:
3732
self.test_class = Test
3734
def assertHookCalls(self, expected_calls, test_name):
3735
test = self.test_class(test_name)
3736
result = unittest.TestResult()
3738
self.assertTrue(hasattr(test, '_counters'))
3739
self.assertTrue('myhook' in test._counters)
3740
self.assertEqual(expected_calls, test._counters['myhook'])
3742
def test_no_hook(self):
    # Expect a zero count for the inner test named 'no_hook'.
    expected_calls = 0
    self.assertHookCalls(expected_calls, 'no_hook')
3745
def test_run_hook_once(self):
    # Skip when the installed testtools predates 0.9.8 (see the skip
    # message); otherwise expect a count of one for 'run_hook_once'.
    testtools_version = features.testtools.module.__version__
    if testtools_version < (0, 9, 8):
        raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
    self.assertHookCalls(1, 'run_hook_once')