333
338
def test_scenarios(self):
334
339
# check that constructor parameters are passed through to the adapted
336
from bzrlib.tests.per_workingtree import make_scenarios
341
from .per_workingtree import make_scenarios
339
formats = [workingtree.WorkingTreeFormat2(),
340
workingtree.WorkingTreeFormat3(),]
341
scenarios = make_scenarios(server1, server2, formats)
344
formats = [workingtree_4.WorkingTreeFormat4(),
345
workingtree_3.WorkingTreeFormat3(),
346
workingtree_4.WorkingTreeFormat6()]
347
scenarios = make_scenarios(server1, server2, formats,
348
remote_server='c', remote_readonly_server='d',
349
remote_backing_server='e')
342
350
self.assertEqual([
343
('WorkingTreeFormat2',
344
{'bzrdir_format': formats[0]._matchingbzrdir,
351
('WorkingTreeFormat4',
352
{'bzrdir_format': formats[0]._matchingcontroldir,
345
353
'transport_readonly_server': 'b',
346
354
'transport_server': 'a',
347
355
'workingtree_format': formats[0]}),
348
356
('WorkingTreeFormat3',
349
{'bzrdir_format': formats[1]._matchingbzrdir,
350
'transport_readonly_server': 'b',
351
'transport_server': 'a',
352
'workingtree_format': formats[1]})],
357
{'bzrdir_format': formats[1]._matchingcontroldir,
358
'transport_readonly_server': 'b',
359
'transport_server': 'a',
360
'workingtree_format': formats[1]}),
361
('WorkingTreeFormat6',
362
{'bzrdir_format': formats[2]._matchingcontroldir,
363
'transport_readonly_server': 'b',
364
'transport_server': 'a',
365
'workingtree_format': formats[2]}),
366
('WorkingTreeFormat6,remote',
367
{'bzrdir_format': formats[2]._matchingcontroldir,
368
'repo_is_remote': True,
369
'transport_readonly_server': 'd',
370
'transport_server': 'c',
371
'vfs_transport_factory': 'e',
372
'workingtree_format': formats[2]}),
356
376
class TestTreeScenarios(tests.TestCase):
358
378
def test_scenarios(self):
359
379
# the tree implementation scenario generator is meant to set up one
360
# instance for each working tree format, and one additional instance
380
# instance for each working tree format, one additional instance
361
381
# that will use the default wt format, but create a revision tree for
362
# the tests. this means that the wt ones should have the
363
# workingtree_to_test_tree attribute set to 'return_parameter' and the
364
# revision one set to revision_tree_from_workingtree.
382
# the tests, and one more that uses the default wt format as a
383
# lightweight checkout of a remote repository. This means that the wt
384
# ones should have the workingtree_to_test_tree attribute set to
385
# 'return_parameter' and the revision one set to
386
# revision_tree_from_workingtree.
366
from bzrlib.tests.per_tree import (
388
from .per_tree import (
367
389
_dirstate_tree_from_workingtree,
369
391
preview_tree_pre,
376
formats = [workingtree.WorkingTreeFormat2(),
377
workingtree.WorkingTreeFormat3(),]
398
smart_server = test_server.SmartTCPServer_for_testing
399
smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
400
mem_server = memory.MemoryServer
401
formats = [workingtree_4.WorkingTreeFormat4(),
402
workingtree_3.WorkingTreeFormat3(), ]
378
403
scenarios = make_scenarios(server1, server2, formats)
379
self.assertEqual(7, len(scenarios))
380
default_wt_format = workingtree.WorkingTreeFormat4._default_format
381
wt4_format = workingtree.WorkingTreeFormat4()
382
wt5_format = workingtree.WorkingTreeFormat5()
404
self.assertEqual(9, len(scenarios))
405
default_wt_format = workingtree.format_registry.get_default()
406
wt4_format = workingtree_4.WorkingTreeFormat4()
407
wt5_format = workingtree_4.WorkingTreeFormat5()
408
wt6_format = workingtree_4.WorkingTreeFormat6()
409
git_wt_format = git_workingtree.GitWorkingTreeFormat()
383
410
expected_scenarios = [
384
('WorkingTreeFormat2',
385
{'bzrdir_format': formats[0]._matchingbzrdir,
411
('WorkingTreeFormat4',
412
{'bzrdir_format': formats[0]._matchingcontroldir,
386
413
'transport_readonly_server': 'b',
387
414
'transport_server': 'a',
388
415
'workingtree_format': formats[0],
389
416
'_workingtree_to_test_tree': return_parameter,
391
418
('WorkingTreeFormat3',
392
{'bzrdir_format': formats[1]._matchingbzrdir,
419
{'bzrdir_format': formats[1]._matchingcontroldir,
393
420
'transport_readonly_server': 'b',
394
421
'transport_server': 'a',
395
422
'workingtree_format': formats[1],
396
423
'_workingtree_to_test_tree': return_parameter,
425
('WorkingTreeFormat6,remote',
426
{'bzrdir_format': wt6_format._matchingcontroldir,
427
'repo_is_remote': True,
428
'transport_readonly_server': smart_readonly_server,
429
'transport_server': smart_server,
430
'vfs_transport_factory': mem_server,
431
'workingtree_format': wt6_format,
432
'_workingtree_to_test_tree': return_parameter,
399
435
{'_workingtree_to_test_tree': revision_tree_from_workingtree,
400
'bzrdir_format': default_wt_format._matchingbzrdir,
436
'bzrdir_format': default_wt_format._matchingcontroldir,
401
437
'transport_readonly_server': 'b',
402
438
'transport_server': 'a',
403
439
'workingtree_format': default_wt_format,
442
{'_workingtree_to_test_tree': revision_tree_from_workingtree,
443
'bzrdir_format': git_wt_format._matchingcontroldir,
444
'transport_readonly_server': 'b',
445
'transport_server': 'a',
446
'workingtree_format': git_wt_format,
405
449
('DirStateRevisionTree,WT4',
406
450
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
407
'bzrdir_format': wt4_format._matchingbzrdir,
451
'bzrdir_format': wt4_format._matchingcontroldir,
408
452
'transport_readonly_server': 'b',
409
453
'transport_server': 'a',
410
454
'workingtree_format': wt4_format,
412
456
('DirStateRevisionTree,WT5',
413
457
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
414
'bzrdir_format': wt5_format._matchingbzrdir,
458
'bzrdir_format': wt5_format._matchingcontroldir,
415
459
'transport_readonly_server': 'b',
416
460
'transport_server': 'a',
417
461
'workingtree_format': wt5_format,
420
464
{'_workingtree_to_test_tree': preview_tree_pre,
421
'bzrdir_format': default_wt_format._matchingbzrdir,
465
'bzrdir_format': default_wt_format._matchingcontroldir,
422
466
'transport_readonly_server': 'b',
423
467
'transport_server': 'a',
424
468
'workingtree_format': default_wt_format}),
425
469
('PreviewTreePost',
426
470
{'_workingtree_to_test_tree': preview_tree_post,
427
'bzrdir_format': default_wt_format._matchingbzrdir,
471
'bzrdir_format': default_wt_format._matchingcontroldir,
428
472
'transport_readonly_server': 'b',
429
473
'transport_server': 'a',
430
474
'workingtree_format': default_wt_format}),
432
476
self.assertEqual(expected_scenarios, scenarios)
548
598
tree = self.make_branch_and_memory_tree('dir')
549
599
# Guard against regression into MemoryTransport leaking
550
600
# files to disk instead of keeping them in memory.
551
self.failIf(osutils.lexists('dir'))
601
self.assertFalse(osutils.lexists('dir'))
552
602
self.assertIsInstance(tree, memorytree.MemoryTree)
554
604
def test_make_branch_and_memory_tree_with_format(self):
555
605
"""make_branch_and_memory_tree should accept a format option."""
556
606
format = bzrdir.BzrDirMetaFormat1()
557
format.repository_format = weaverepo.RepositoryFormat7()
607
format.repository_format = repository.format_registry.get_default()
558
608
tree = self.make_branch_and_memory_tree('dir', format=format)
559
609
# Guard against regression into MemoryTransport leaking
560
610
# files to disk instead of keeping them in memory.
561
self.failIf(osutils.lexists('dir'))
611
self.assertFalse(osutils.lexists('dir'))
562
612
self.assertIsInstance(tree, memorytree.MemoryTree)
563
613
self.assertEqual(format.repository_format.__class__,
564
tree.branch.repository._format.__class__)
614
tree.branch.repository._format.__class__)
566
616
def test_make_branch_builder(self):
567
617
builder = self.make_branch_builder('dir')
568
618
self.assertIsInstance(builder, branchbuilder.BranchBuilder)
569
619
# Guard against regression into MemoryTransport leaking
570
620
# files to disk instead of keeping them in memory.
571
self.failIf(osutils.lexists('dir'))
621
self.assertFalse(osutils.lexists('dir'))
573
623
def test_make_branch_builder_with_format(self):
574
624
# Use a repo layout that doesn't conform to a 'named' layout, to ensure
575
625
# that the format objects are used.
576
626
format = bzrdir.BzrDirMetaFormat1()
577
repo_format = weaverepo.RepositoryFormat7()
627
repo_format = repository.format_registry.get_default()
578
628
format.repository_format = repo_format
579
629
builder = self.make_branch_builder('dir', format=format)
580
630
the_branch = builder.get_branch()
581
631
# Guard against regression into MemoryTransport leaking
582
632
# files to disk instead of keeping them in memory.
583
self.failIf(osutils.lexists('dir'))
633
self.assertFalse(osutils.lexists('dir'))
584
634
self.assertEqual(format.repository_format.__class__,
585
635
the_branch.repository._format.__class__)
586
636
self.assertEqual(repo_format.get_format_string(),
587
637
self.get_transport().get_bytes(
588
'dir/.bzr/repository/format'))
638
'dir/.bzr/repository/format'))
590
640
def test_make_branch_builder_with_format_name(self):
591
641
builder = self.make_branch_builder('dir', format='knit')
592
642
the_branch = builder.get_branch()
593
643
# Guard against regression into MemoryTransport leaking
594
644
# files to disk instead of keeping them in memory.
595
self.failIf(osutils.lexists('dir'))
596
dir_format = bzrdir.format_registry.make_bzrdir('knit')
645
self.assertFalse(osutils.lexists('dir'))
646
dir_format = controldir.format_registry.make_controldir('knit')
597
647
self.assertEqual(dir_format.repository_format.__class__,
598
648
the_branch.repository._format.__class__)
599
self.assertEqual('Bazaar-NG Knit Repository Format 1',
649
self.assertEqual(b'Bazaar-NG Knit Repository Format 1',
600
650
self.get_transport().get_bytes(
601
'dir/.bzr/repository/format'))
651
'dir/.bzr/repository/format'))
603
653
def test_dangling_locks_cause_failures(self):
604
654
class TestDanglingLock(tests.TestCaseWithMemoryTransport):
605
655
def test_function(self):
606
t = self.get_transport('.')
656
t = self.get_transport_from_path('.')
607
657
l = lockdir.LockDir(t, 'lock')
610
660
test = TestDanglingLock('test_function')
611
661
result = test.run()
662
total_failures = result.errors + result.failures
612
663
if self._lock_check_thorough:
613
self.assertEqual(1, len(result.errors))
664
self.assertEqual(1, len(total_failures))
615
666
# When _lock_check_thorough is disabled, then we don't trigger a
617
self.assertEqual(0, len(result.errors))
668
self.assertEqual(0, len(total_failures))
620
671
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
672
"""Tests for the convenience functions TestCaseWithTransport introduces."""
623
674
def test_get_readonly_url_none(self):
624
from bzrlib.transport import get_transport
625
from bzrlib.transport.readonly import ReadonlyTransportDecorator
675
from ..transport.readonly import ReadonlyTransportDecorator
626
676
self.vfs_transport_factory = memory.MemoryServer
627
677
self.transport_readonly_server = None
628
678
# calling get_readonly_transport() constructs a decorator on the url
630
680
url = self.get_readonly_url()
631
681
url2 = self.get_readonly_url('foo/bar')
632
t = get_transport(url)
633
t2 = get_transport(url2)
634
self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
682
t = transport.get_transport_from_url(url)
683
t2 = transport.get_transport_from_url(url2)
684
self.assertIsInstance(t, ReadonlyTransportDecorator)
685
self.assertIsInstance(t2, ReadonlyTransportDecorator)
636
686
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
638
688
def test_get_readonly_url_http(self):
639
from bzrlib.tests.http_server import HttpServer
640
from bzrlib.transport import get_transport
641
from bzrlib.transport.http import HttpTransportBase
689
from .http_server import HttpServer
690
from ..transport.http import HttpTransport
642
691
self.transport_server = test_server.LocalURLServer
643
692
self.transport_readonly_server = HttpServer
644
693
# calling get_readonly_transport() gives us a HTTP server instance.
645
694
url = self.get_readonly_url()
646
695
url2 = self.get_readonly_url('foo/bar')
647
696
# the transport returned may be any HttpTransportBase subclass
648
t = get_transport(url)
649
t2 = get_transport(url2)
650
self.failUnless(isinstance(t, HttpTransportBase))
651
self.failUnless(isinstance(t2, HttpTransportBase))
697
t = transport.get_transport_from_url(url)
698
t2 = transport.get_transport_from_url(url2)
699
self.assertIsInstance(t, HttpTransport)
700
self.assertIsInstance(t2, HttpTransport)
652
701
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
654
703
def test_is_directory(self):
743
792
r"^ +[0-9]+ms\*$")
745
794
def test_unittest_reporting_unittest_class(self):
746
# getting the time from a non-bzrlib test works ok
795
# getting the time from a non-breezy test works ok
747
796
class ShortDelayTestCase(unittest.TestCase):
748
797
def test_short_delay(self):
749
798
time.sleep(0.003)
750
799
self.check_timing(ShortDelayTestCase('test_short_delay'),
753
def _patch_get_bzr_source_tree(self):
754
# Reading from the actual source tree breaks isolation, but we don't
755
# want to assume that that's *all* that would happen.
756
self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
758
def test_assigned_benchmark_file_stores_date(self):
759
self._patch_get_bzr_source_tree()
761
result = bzrlib.tests.TextTestResult(self._log_file,
766
output_string = output.getvalue()
767
# if you are wondering about the regexp please read the comment in
768
# test_bench_history (bzrlib.tests.test_selftest.TestRunner)
769
# XXX: what comment? -- Andrew Bennetts
770
self.assertContainsRe(output_string, "--date [0-9.]+")
772
def test_benchhistory_records_test_times(self):
773
self._patch_get_bzr_source_tree()
774
result_stream = StringIO()
775
result = bzrlib.tests.TextTestResult(
779
bench_history=result_stream
782
# we want to profile a call and check that its test duration is recorded
783
# make a new test instance that when run will generate a benchmark
784
example_test_case = TestTestResult("_time_hello_world_encoding")
785
# execute the test, which should succeed and record times
786
example_test_case.run(result)
787
lines = result_stream.getvalue().splitlines()
788
self.assertEqual(2, len(lines))
789
self.assertContainsRe(lines[1],
790
" *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
791
"._time_hello_world_encoding")
793
802
def _time_hello_world_encoding(self):
794
803
"""Profile two sleep calls
796
805
This is used to exercise the test framework.
798
self.time(unicode, 'hello', errors='replace')
799
self.time(unicode, 'world', errors='replace')
807
self.time(str, b'hello', errors='replace')
808
self.time(str, b'world', errors='replace')
801
810
def test_lsprofiling(self):
802
811
"""Verbose test result prints lsprof statistics from test cases."""
803
self.requireFeature(test_lsprof.LSProfFeature)
812
self.requireFeature(features.lsprof_feature)
804
813
result_stream = StringIO()
805
result = bzrlib.tests.VerboseTestResult(
806
unittest._WritelnDecorator(result_stream),
814
result = breezy.tests.VerboseTestResult(
827
836
# this should appear in the output stream of our test result.
828
837
output = result_stream.getvalue()
829
838
self.assertContainsRe(output,
830
r"LSProf output for <type 'unicode'>\(\('hello',\), {'errors': 'replace'}\)")
831
self.assertContainsRe(output,
832
r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n")
833
self.assertContainsRe(output,
834
r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?")
835
self.assertContainsRe(output,
836
r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
839
r"LSProf output for <class 'str'>\(\(b'hello',\), {'errors': 'replace'}\)")
840
self.assertContainsRe(output,
841
r"LSProf output for <class 'str'>\(\(b'world',\), {'errors': 'replace'}\)")
842
self.assertContainsRe(output,
843
r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n")
844
self.assertContainsRe(output,
845
r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?")
847
def test_uses_time_from_testtools(self):
848
"""Test case timings in verbose results should use testtools times"""
851
class TimeAddedVerboseTestResult(tests.VerboseTestResult):
852
def startTest(self, test):
853
self.time(datetime.datetime.utcfromtimestamp(1.145))
854
super(TimeAddedVerboseTestResult, self).startTest(test)
856
def addSuccess(self, test):
857
self.time(datetime.datetime.utcfromtimestamp(51.147))
858
super(TimeAddedVerboseTestResult, self).addSuccess(test)
860
def report_tests_starting(self): pass
862
self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
863
self.assertEndsWith(sio.getvalue(), "OK 50002ms\n")
838
865
def test_known_failure(self):
839
"""A KnownFailure being raised should trigger several result actions."""
866
"""Using knownFailure should trigger several result actions."""
840
867
class InstrumentedTestResult(tests.ExtendedTestResult):
841
868
def stopTestRun(self): pass
842
def startTests(self): pass
843
def report_test_start(self, test): pass
870
def report_tests_starting(self): pass
844
872
def report_known_failure(self, test, err=None, details=None):
845
873
self._call = test, 'known failure'
846
874
result = InstrumentedTestResult(None, None, None, None)
847
876
class Test(tests.TestCase):
848
877
def test_function(self):
849
raise tests.KnownFailure('failed!')
878
self.knownFailure('failed!')
850
879
test = Test("test_function")
852
881
# it should invoke 'report_known_failure'.
1193
def _patch_get_bzr_source_tree(self):
1194
# Reading from the actual source tree breaks isolation, but we don't
1195
# want to assume that that's *all* that would happen.
1196
self._get_source_tree_calls = []
1198
self._get_source_tree_calls.append("called")
1200
self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', new_get)
1202
def test_bench_history(self):
1203
# tests that running the benchmark passes bench_history into
1204
# the test result object. We can tell that happens if
1205
# _get_bzr_source_tree is called.
1206
self._patch_get_bzr_source_tree()
1207
test = TestRunner('dummy_test')
1209
runner = tests.TextTestRunner(stream=self._log_file,
1210
bench_history=output)
1211
result = self.run_test_runner(runner, test)
1212
output_string = output.getvalue()
1213
self.assertContainsRe(output_string, "--date [0-9.]+")
1214
self.assertLength(1, self._get_source_tree_calls)
1249
def test_verbose_test_count(self):
1250
"""A verbose test run reports the right test count at the start"""
1251
suite = TestUtil.TestSuite([
1252
unittest.FunctionTestCase(lambda:None),
1253
unittest.FunctionTestCase(lambda:None)])
1254
self.assertEqual(suite.countTestCases(), 2)
1256
runner = tests.TextTestRunner(stream=stream, verbosity=2)
1257
# Need to use the CountingDecorator as that's what sets num_tests
1258
self.run_test_runner(runner, tests.CountingDecorator(suite))
1259
self.assertStartsWith(stream.getvalue(), "running 2 tests")
1216
1261
def test_startTestRun(self):
1217
1262
"""run should call result.startTestRun()"""
1219
class LoggingDecorator(tests.ForwardingResult):
1265
class LoggingDecorator(ExtendedToOriginalDecorator):
1220
1266
def startTestRun(self):
1221
tests.ForwardingResult.startTestRun(self)
1267
ExtendedToOriginalDecorator.startTestRun(self)
1222
1268
calls.append('startTestRun')
1223
test = unittest.FunctionTestCase(lambda:None)
1269
test = unittest.FunctionTestCase(lambda: None)
1224
1270
stream = StringIO()
1225
1271
runner = tests.TextTestRunner(stream=stream,
1226
result_decorators=[LoggingDecorator])
1227
result = self.run_test_runner(runner, test)
1272
result_decorators=[LoggingDecorator])
1273
self.run_test_runner(runner, test)
1228
1274
self.assertLength(1, calls)
1230
1276
def test_stopTestRun(self):
1231
1277
"""run should call result.stopTestRun()"""
1233
class LoggingDecorator(tests.ForwardingResult):
1280
class LoggingDecorator(ExtendedToOriginalDecorator):
1234
1281
def stopTestRun(self):
1235
tests.ForwardingResult.stopTestRun(self)
1282
ExtendedToOriginalDecorator.stopTestRun(self)
1236
1283
calls.append('stopTestRun')
1237
test = unittest.FunctionTestCase(lambda:None)
1284
test = unittest.FunctionTestCase(lambda: None)
1238
1285
stream = StringIO()
1239
1286
runner = tests.TextTestRunner(stream=stream,
1240
result_decorators=[LoggingDecorator])
1241
result = self.run_test_runner(runner, test)
1287
result_decorators=[LoggingDecorator])
1288
self.run_test_runner(runner, test)
1242
1289
self.assertLength(1, calls)
1291
def test_unicode_test_output_on_ascii_stream(self):
1292
"""Showing results should always succeed even on an ascii console"""
1293
class FailureWithUnicode(tests.TestCase):
1294
def test_log_unicode(self):
1296
self.fail("Now print that log!")
1298
out = TextIOWrapper(bio, 'ascii', 'backslashreplace')
1299
self.overrideAttr(osutils, "get_terminal_encoding",
1300
lambda trace=False: "ascii")
1301
self.run_test_runner(
1302
tests.TextTestRunner(stream=out),
1303
FailureWithUnicode("test_log_unicode"))
1305
self.assertContainsRe(bio.getvalue(),
1306
b"(?:Text attachment: )?log"
1308
b"\\d+\\.\\d+ \\\\u2606"
1309
b"(?:\n-+\n|}}}\n)")
1245
1312
class SampleTestCase(tests.TestCase):
1247
1314
def _test_pass(self):
1250
1318
class _TestException(Exception):
1254
1322
class TestTestCase(tests.TestCase):
1255
"""Tests that test the core bzrlib TestCase."""
1323
"""Tests that test the core breezy TestCase."""
1257
1325
def test_assertLength_matches_empty(self):
1633
1726
obj.test_attr = 'modified'
1634
1727
self.assertEqual('modified', obj.test_attr)
1636
test = Test('test_value')
1637
test.run(unittest.TestResult())
1729
self._run_successful_test(Test('test_value'))
1638
1730
self.assertEqual('original', obj.test_attr)
1640
1732
def test_overrideAttr_with_value(self):
1641
self.test_attr = 'original' # Define a test attribute
1642
obj = self # Make 'obj' visible to the embedded test
1733
self.test_attr = 'original' # Define a test attribute
1734
obj = self # Make 'obj' visible to the embedded test
1643
1736
class Test(tests.TestCase):
1645
1738
def setUp(self):
1646
tests.TestCase.setUp(self)
1739
super(Test, self).setUp()
1647
1740
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1649
1742
def test_value(self):
1650
1743
self.assertEqual('original', self.orig)
1651
1744
self.assertEqual('modified', obj.test_attr)
1653
test = Test('test_value')
1654
test.run(unittest.TestResult())
1746
self._run_successful_test(Test('test_value'))
1655
1747
self.assertEqual('original', obj.test_attr)
1749
def test_overrideAttr_with_no_existing_value_and_value(self):
1750
# Do not define the test_attribute
1751
obj = self # Make 'obj' visible to the embedded test
1753
class Test(tests.TestCase):
1756
tests.TestCase.setUp(self)
1757
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1759
def test_value(self):
1760
self.assertEqual(tests._unitialized_attr, self.orig)
1761
self.assertEqual('modified', obj.test_attr)
1763
self._run_successful_test(Test('test_value'))
1764
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1766
def test_overrideAttr_with_no_existing_value_and_no_value(self):
1767
# Do not define the test_attribute
1768
obj = self # Make 'obj' visible to the embedded test
1770
class Test(tests.TestCase):
1773
tests.TestCase.setUp(self)
1774
self.orig = self.overrideAttr(obj, 'test_attr')
1776
def test_value(self):
1777
self.assertEqual(tests._unitialized_attr, self.orig)
1778
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1780
self._run_successful_test(Test('test_value'))
1781
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1783
def test_recordCalls(self):
1784
from breezy.tests import test_selftest
1785
calls = self.recordCalls(
1786
test_selftest, '_add_numbers')
1787
self.assertEqual(test_selftest._add_numbers(2, 10),
1789
self.assertEqual(calls, [((2, 10), {})])
1792
def _add_numbers(a, b):
1796
class _MissingFeature(features.Feature):
1801
missing_feature = _MissingFeature()
1804
def _get_test(name):
1805
"""Get an instance of a specific example test.
1807
We protect this in a function so that they don't auto-run in the test
1811
class ExampleTests(tests.TestCase):
1813
def test_fail(self):
1814
mutter('this was a failing test')
1815
self.fail('this test will fail')
1817
def test_error(self):
1818
mutter('this test errored')
1819
raise RuntimeError('gotcha')
1821
def test_missing_feature(self):
1822
mutter('missing the feature')
1823
self.requireFeature(missing_feature)
1825
def test_skip(self):
1826
mutter('this test will be skipped')
1827
raise tests.TestSkipped('reason')
1829
def test_success(self):
1830
mutter('this test succeeds')
1832
def test_xfail(self):
1833
mutter('test with expected failure')
1834
self.knownFailure('this_fails')
1836
def test_unexpected_success(self):
1837
mutter('test with unexpected success')
1838
self.expectFailure('should_fail', lambda: None)
1840
return ExampleTests(name)
1843
class TestTestCaseLogDetails(tests.TestCase):
1845
def _run_test(self, test_name):
1846
test = _get_test(test_name)
1847
result = testtools.TestResult()
1851
def test_fail_has_log(self):
1852
result = self._run_test('test_fail')
1853
self.assertEqual(1, len(result.failures))
1854
result_content = result.failures[0][1]
1855
self.assertContainsRe(result_content,
1856
'(?m)^(?:Text attachment: )?log(?:$|: )')
1857
self.assertContainsRe(result_content, 'this was a failing test')
1859
def test_error_has_log(self):
1860
result = self._run_test('test_error')
1861
self.assertEqual(1, len(result.errors))
1862
result_content = result.errors[0][1]
1863
self.assertContainsRe(result_content,
1864
'(?m)^(?:Text attachment: )?log(?:$|: )')
1865
self.assertContainsRe(result_content, 'this test errored')
1867
def test_skip_has_no_log(self):
1868
result = self._run_test('test_skip')
1869
reasons = result.skip_reasons
1870
self.assertEqual({'reason'}, set(reasons))
1871
skips = reasons['reason']
1872
self.assertEqual(1, len(skips))
1874
self.assertFalse('log' in test.getDetails())
1876
def test_missing_feature_has_no_log(self):
1877
# testtools doesn't know about addNotSupported, so it just gets
1878
# considered as a skip
1879
result = self._run_test('test_missing_feature')
1880
reasons = result.skip_reasons
1881
self.assertEqual({str(missing_feature)}, set(reasons))
1882
skips = reasons[str(missing_feature)]
1883
self.assertEqual(1, len(skips))
1885
self.assertFalse('log' in test.getDetails())
1887
def test_xfail_has_no_log(self):
1888
result = self._run_test('test_xfail')
1889
self.assertEqual(1, len(result.expectedFailures))
1890
result_content = result.expectedFailures[0][1]
1891
self.assertNotContainsRe(result_content,
1892
'(?m)^(?:Text attachment: )?log(?:$|: )')
1893
self.assertNotContainsRe(result_content, 'test with expected failure')
1895
def test_unexpected_success_has_log(self):
1896
result = self._run_test('test_unexpected_success')
1897
self.assertEqual(1, len(result.unexpectedSuccesses))
1898
# Inconsistency, unexpectedSuccesses is a list of tests,
1899
# expectedFailures is a list of reasons?
1900
test = result.unexpectedSuccesses[0]
1901
details = test.getDetails()
1902
self.assertTrue('log' in details)
1905
class TestTestCloning(tests.TestCase):
1906
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1908
def test_cloned_testcase_does_not_share_details(self):
1909
"""A TestCase cloned with clone_test does not share mutable attributes
1910
such as details or cleanups.
1912
class Test(tests.TestCase):
1914
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1915
orig_test = Test('test_foo')
1916
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1917
orig_test.run(unittest.TestResult())
1918
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1919
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1921
def test_double_apply_scenario_preserves_first_scenario(self):
1922
"""Applying two levels of scenarios to a test preserves the attributes
1923
added by both scenarios.
1925
class Test(tests.TestCase):
1928
test = Test('test_foo')
1929
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1930
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1931
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1932
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1933
all_tests = list(tests.iter_suite_tests(suite))
1934
self.assertLength(4, all_tests)
1935
all_xys = sorted((t.x, t.y) for t in all_tests)
1936
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1658
1939
# NB: Don't delete this; it's not actually from 0.11!
1659
1940
@deprecated_function(deprecated_in((0, 11, 0)))
1721
2008
sample_object = ApplyDeprecatedHelper()
1722
2009
# calling an undeprecated callable raises an assertion
1723
2010
self.assertRaises(AssertionError, self.applyDeprecated,
1724
deprecated_in((0, 11, 0)),
1725
sample_object.sample_normal_method)
2011
deprecated_in((0, 11, 0)),
2012
sample_object.sample_normal_method)
1726
2013
self.assertRaises(AssertionError, self.applyDeprecated,
1727
deprecated_in((0, 11, 0)),
1728
sample_undeprecated_function, "a param value")
2014
deprecated_in((0, 11, 0)),
2015
sample_undeprecated_function, "a param value")
1729
2016
# calling a deprecated callable (function or method) with the wrong
1730
2017
# expected deprecation fails.
1731
2018
self.assertRaises(AssertionError, self.applyDeprecated,
1732
deprecated_in((0, 10, 0)),
1733
sample_object.sample_deprecated_method, "a param value")
2019
deprecated_in((0, 10, 0)),
2020
sample_object.sample_deprecated_method,
1734
2022
self.assertRaises(AssertionError, self.applyDeprecated,
1735
deprecated_in((0, 10, 0)),
1736
sample_deprecated_function)
2023
deprecated_in((0, 10, 0)),
2024
sample_deprecated_function)
1737
2025
# calling a deprecated callable (function or method) with the right
1738
2026
# expected deprecation returns the function's result.
1739
self.assertEqual("a param value",
1740
self.applyDeprecated(deprecated_in((0, 11, 0)),
1741
sample_object.sample_deprecated_method, "a param value"))
2029
self.applyDeprecated(
2030
deprecated_in((0, 11, 0)),
2031
sample_object.sample_deprecated_method, "a param value"))
1742
2032
self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 11, 0)),
1743
sample_deprecated_function))
2033
sample_deprecated_function))
1744
2034
# calling a nested deprecation with the wrong deprecation version
1745
2035
# fails even if a deeper nested function was deprecated with the
1746
2036
# supplied version.
1747
self.assertRaises(AssertionError, self.applyDeprecated,
2038
AssertionError, self.applyDeprecated,
1748
2039
deprecated_in((0, 11, 0)), sample_object.sample_nested_deprecation)
1749
2040
# calling a nested deprecation with the right deprecation value
1750
2041
# returns the call's result.
1751
self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 10, 0)),
1752
sample_object.sample_nested_deprecation))
2043
2, self.applyDeprecated(
2044
deprecated_in((0, 10, 0)),
2045
sample_object.sample_nested_deprecation))
1754
2047
def test_callDeprecated(self):
1755
2048
def testfunc(be_deprecated, result=None):
1806
2097
self.transport_server = test_server.FakeVFATServer
1807
2098
self.assertFalse(self.get_url('t1').startswith('file://'))
1808
2099
tree = self.make_branch_and_tree('t1')
1809
base = tree.bzrdir.root_transport.base
2100
base = tree.controldir.root_transport.base
1810
2101
self.assertStartsWith(base, 'file://')
1811
self.assertEquals(tree.bzrdir.root_transport,
1812
tree.branch.bzrdir.root_transport)
1813
self.assertEquals(tree.bzrdir.root_transport,
1814
tree.branch.repository.bzrdir.root_transport)
1817
class SelfTestHelper:
2102
self.assertEqual(tree.controldir.root_transport,
2103
tree.branch.controldir.root_transport)
2104
self.assertEqual(tree.controldir.root_transport,
2105
tree.branch.repository.controldir.root_transport)
2108
class SelfTestHelper(object):
1819
2110
def run_selftest(self, **kwargs):
1820
2111
"""Run selftest returning its output."""
1822
old_transport = bzrlib.tests.default_transport
2113
output = TextIOWrapper(bio, 'utf-8')
2114
old_transport = breezy.tests.default_transport
1823
2115
old_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
1824
2116
tests.TestCaseWithMemoryTransport.TEST_ROOT = None
1826
2118
self.assertEqual(True, tests.selftest(stream=output, **kwargs))
1828
bzrlib.tests.default_transport = old_transport
2120
breezy.tests.default_transport = old_transport
1829
2121
tests.TestCaseWithMemoryTransport.TEST_ROOT = old_root
1834
2128
class TestSelftest(tests.TestCase, SelfTestHelper):
1835
"""Tests of bzrlib.tests.selftest."""
2129
"""Tests of breezy.tests.selftest."""
1837
def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(self):
2131
def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(
1838
2133
factory_called = []
1840
2136
factory_called.append(True)
1841
2137
return TestUtil.TestSuite()
1842
2138
out = StringIO()
1843
2139
err = StringIO()
1844
self.apply_redirected(out, err, None, bzrlib.tests.selftest,
1845
test_suite_factory=factory)
2140
self.apply_redirected(out, err, None, breezy.tests.selftest,
2141
test_suite_factory=factory)
1846
2142
self.assertEqual([True], factory_called)
1848
2144
def factory(self):
1849
2145
"""A test suite factory."""
1850
2146
class Test(tests.TestCase):
2148
return __name__ + ".Test." + self._testMethodName
1857
2158
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
1859
2160
def test_list_only(self):
1860
2161
output = self.run_selftest(test_suite_factory=self.factory,
1862
2163
self.assertEqual(3, len(output.readlines()))
1864
2165
def test_list_only_filtered(self):
1865
2166
output = self.run_selftest(test_suite_factory=self.factory,
1866
list_only=True, pattern="Test.b")
1867
self.assertEndsWith(output.getvalue(), "Test.b\n")
2167
list_only=True, pattern="Test.b")
2168
self.assertEndsWith(output.getvalue(), b"Test.b\n")
1868
2169
self.assertLength(1, output.readlines())
1870
2171
def test_list_only_excludes(self):
1871
2172
output = self.run_selftest(test_suite_factory=self.factory,
1872
list_only=True, exclude_pattern="Test.b")
1873
self.assertNotContainsRe("Test.b", output.getvalue())
2173
list_only=True, exclude_pattern="Test.b")
2174
self.assertNotContainsRe(b"Test.b", output.getvalue())
1874
2175
self.assertLength(2, output.readlines())
1876
2177
def test_lsprof_tests(self):
1877
self.requireFeature(test_lsprof.LSProfFeature)
2178
self.requireFeature(features.lsprof_feature)
1879
2181
class Test(object):
1880
2182
def __call__(test, result):
1881
2183
test.run(result)
1882
2185
def run(test, result):
1883
self.assertIsInstance(result, tests.ForwardingResult)
1884
calls.append("called")
2186
results.append(result)
1885
2188
def countTestCases(self):
1887
2190
self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
1888
self.assertLength(1, calls)
2191
self.assertLength(1, results)
2192
self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)
1890
2194
def test_random(self):
1891
2195
# test randomising by listing a number of tests.
1892
2196
output_123 = self.run_selftest(test_suite_factory=self.factory,
1893
list_only=True, random_seed="123")
2197
list_only=True, random_seed="123")
1894
2198
output_234 = self.run_selftest(test_suite_factory=self.factory,
1895
list_only=True, random_seed="234")
2199
list_only=True, random_seed="234")
1896
2200
self.assertNotEqual(output_123, output_234)
1897
2201
# "Randominzing test order..\n\n
1898
2202
self.assertLength(5, output_123.readlines())
1967
2278
def test_load_unknown(self):
1968
2279
# Provide a list with one test - this test.
1969
2280
# And generate a list of the tests in the suite.
1970
err = self.assertRaises(errors.NoSuchFile, self.run_selftest,
1971
load_list='missing file name', list_only=True)
2281
self.assertRaises(errors.NoSuchFile, self.run_selftest,
2282
load_list='missing file name', list_only=True)
2285
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2287
_test_needs_features = [features.subunit]
2289
def run_subunit_stream(self, test_name):
2290
from subunit import ProtocolTestCase
2293
return TestUtil.TestSuite([_get_test(test_name)])
2294
stream = self.run_selftest(
2295
runner_class=tests.SubUnitBzrRunnerv1,
2296
test_suite_factory=factory)
2297
test = ProtocolTestCase(stream)
2298
result = testtools.TestResult()
2300
content = stream.getvalue()
2301
return content, result
2303
def test_fail_has_log(self):
2304
content, result = self.run_subunit_stream('test_fail')
2305
self.assertEqual(1, len(result.failures))
2306
self.assertContainsRe(content, b'(?m)^log$')
2307
self.assertContainsRe(content, b'this test will fail')
2309
def test_error_has_log(self):
2310
content, result = self.run_subunit_stream('test_error')
2311
self.assertContainsRe(content, b'(?m)^log$')
2312
self.assertContainsRe(content, b'this test errored')
2314
def test_skip_has_no_log(self):
2315
content, result = self.run_subunit_stream('test_skip')
2316
self.assertNotContainsRe(content, b'(?m)^log$')
2317
self.assertNotContainsRe(content, b'this test will be skipped')
2318
reasons = result.skip_reasons
2319
self.assertEqual({'reason'}, set(reasons))
2320
skips = reasons['reason']
2321
self.assertEqual(1, len(skips))
2323
# RemotedTestCase doesn't preserve the "details"
2324
# self.assertFalse('log' in test.getDetails())
2326
def test_missing_feature_has_no_log(self):
2327
content, result = self.run_subunit_stream('test_missing_feature')
2328
self.assertNotContainsRe(content, b'(?m)^log$')
2329
self.assertNotContainsRe(content, b'missing the feature')
2330
reasons = result.skip_reasons
2331
self.assertEqual({'_MissingFeature\n'}, set(reasons))
2332
skips = reasons['_MissingFeature\n']
2333
self.assertEqual(1, len(skips))
2335
# RemotedTestCase doesn't preserve the "details"
2336
# self.assertFalse('log' in test.getDetails())
2338
def test_xfail_has_no_log(self):
2339
content, result = self.run_subunit_stream('test_xfail')
2340
self.assertNotContainsRe(content, b'(?m)^log$')
2341
self.assertNotContainsRe(content, b'test with expected failure')
2342
self.assertEqual(1, len(result.expectedFailures))
2343
result_content = result.expectedFailures[0][1]
2344
self.assertNotContainsRe(result_content,
2345
'(?m)^(?:Text attachment: )?log(?:$|: )')
2346
self.assertNotContainsRe(result_content, 'test with expected failure')
2348
def test_unexpected_success_has_log(self):
2349
content, result = self.run_subunit_stream('test_unexpected_success')
2350
self.assertContainsRe(content, b'(?m)^log$')
2351
self.assertContainsRe(content, b'test with unexpected success')
2352
self.assertEqual(1, len(result.unexpectedSuccesses))
2353
# test = result.unexpectedSuccesses[0]
2354
# RemotedTestCase doesn't preserve the "details"
2355
# self.assertTrue('log' in test.getDetails())
2357
def test_success_has_no_log(self):
2358
content, result = self.run_subunit_stream('test_success')
2359
self.assertEqual(1, result.testsRun)
2360
self.assertNotContainsRe(content, b'(?m)^log$')
2361
self.assertNotContainsRe(content, b'this test succeeds')
1974
2364
class TestRunBzr(tests.TestCase):
1979
def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
2370
def _run_bzr_core(self, argv, encoding=None, stdin=None,
2371
stdout=None, stderr=None, working_dir=None):
1981
2372
"""Override _run_bzr_core to test how it is invoked by run_bzr.
1983
2374
Attempts to run bzr from inside this class don't actually run it.
2173
2548
self.next_subprocess = process
2175
2550
result = self.run_bzr_subprocess(*args, **kwargs)
2551
except BaseException:
2177
2552
self.next_subprocess = None
2178
for key, expected in expected_args.iteritems():
2553
for key, expected in expected_args.items():
2179
2554
self.assertEqual(expected, self.subprocess_calls[-1][key])
2182
2557
self.next_subprocess = None
2183
for key, expected in expected_args.iteritems():
2558
for key, expected in expected_args.items():
2184
2559
self.assertEqual(expected, self.subprocess_calls[-1][key])
2187
2562
def test_run_bzr_subprocess(self):
2188
2563
"""The run_bzr_helper_external command behaves nicely."""
2189
self.assertRunBzrSubprocess({'process_args':['--version']},
2190
StubProcess(), '--version')
2191
self.assertRunBzrSubprocess({'process_args':['--version']},
2192
StubProcess(), ['--version'])
2564
self.assertRunBzrSubprocess({'process_args': ['--version']},
2565
StubProcess(), '--version')
2566
self.assertRunBzrSubprocess({'process_args': ['--version']},
2567
StubProcess(), ['--version'])
2193
2568
# retcode=None disables retcode checking
2194
result = self.assertRunBzrSubprocess({},
2195
StubProcess(retcode=3), '--version', retcode=None)
2196
result = self.assertRunBzrSubprocess({},
2197
StubProcess(out="is free software"), '--version')
2569
result = self.assertRunBzrSubprocess(
2570
{}, StubProcess(retcode=3), '--version', retcode=None)
2571
result = self.assertRunBzrSubprocess(
2572
{}, StubProcess(out="is free software"), '--version')
2198
2573
self.assertContainsRe(result[0], 'is free software')
2199
2574
# Running a subcommand that is missing errors
2200
2575
self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
2201
{'process_args':['--versionn']}, StubProcess(retcode=3),
2576
{'process_args': ['--versionn']
2577
}, StubProcess(retcode=3),
2203
2579
# Unless it is told to expect the error from the subprocess
2204
result = self.assertRunBzrSubprocess({},
2205
StubProcess(retcode=3), '--versionn', retcode=3)
2580
result = self.assertRunBzrSubprocess(
2581
{}, StubProcess(retcode=3), '--versionn', retcode=3)
2206
2582
# Or to ignore retcode checking
2207
result = self.assertRunBzrSubprocess({},
2208
StubProcess(err="unknown command", retcode=3), '--versionn',
2583
result = self.assertRunBzrSubprocess(
2584
{}, StubProcess(err="unknown command", retcode=3),
2585
'--versionn', retcode=None)
2210
2586
self.assertContainsRe(result[1], 'unknown command')
2212
2588
def test_env_change_passes_through(self):
2213
2589
self.assertRunBzrSubprocess(
2214
{'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}},
2590
{'env_changes': {'new': 'value', 'changed': 'newvalue', 'deleted': None}},
2215
2591
StubProcess(), '',
2216
env_changes={'new':'value', 'changed':'newvalue', 'deleted':None})
2592
env_changes={'new': 'value', 'changed': 'newvalue', 'deleted': None})
2218
2594
def test_no_working_dir_passed_as_None(self):
2219
2595
self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '')
2221
2597
def test_no_working_dir_passed_through(self):
2222
2598
self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
2225
2601
def test_run_bzr_subprocess_no_plugins(self):
2226
2602
self.assertRunBzrSubprocess({'allow_plugins': False},
2229
2605
def test_allow_plugins(self):
2230
2606
self.assertRunBzrSubprocess({'allow_plugins': True},
2231
StubProcess(), '', allow_plugins=True)
2607
StubProcess(), '', allow_plugins=True)
2234
2610
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):
2264
2640
class TestStartBzrSubProcess(tests.TestCase):
2641
"""Stub test start_bzr_subprocess."""
2266
def check_popen_state(self):
2267
"""Replace to make assertions when popen is called."""
2643
def _subprocess_log_cleanup(self):
2644
"""Inhibits the base version as we don't produce a log file."""
2269
2646
def _popen(self, *args, **kwargs):
2270
"""Record the command that is run, so that we can ensure it is correct"""
2647
"""Override the base version to record the command that is run.
2649
From there we can ensure it is correct without spawning a real process.
2271
2651
self.check_popen_state()
2272
2652
self._popen_args = args
2273
2653
self._popen_kwargs = kwargs
2274
2654
raise _DontSpawnProcess()
2656
def check_popen_state(self):
2657
"""Replace to make assertions when popen is called."""
2276
2659
def test_run_bzr_subprocess_no_plugins(self):
2277
2660
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2278
2661
command = self._popen_args[0]
2279
2662
self.assertEqual(sys.executable, command[0])
2280
self.assertEqual(self.get_bzr_path(), command[1])
2663
self.assertEqual(self.get_brz_path(), command[1])
2281
2664
self.assertEqual(['--no-plugins'], command[2:])
2283
2666
def test_allow_plugins(self):
2284
2667
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2286
2669
command = self._popen_args[0]
2287
2670
self.assertEqual([], command[2:])
2289
2672
def test_set_env(self):
2290
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2673
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2291
2674
# set in the child
2292
2676
def check_environment():
2293
2677
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2294
2678
self.check_popen_state = check_environment
2295
2679
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2296
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2680
env_changes={'EXISTANT_ENV_VAR': 'set variable'})
2297
2681
# not set in theparent
2298
2682
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2300
2684
def test_run_bzr_subprocess_env_del(self):
2301
2685
"""run_bzr_subprocess can remove environment variables too."""
2302
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2686
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2303
2688
def check_environment():
2304
2689
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2305
2690
os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2306
2691
self.check_popen_state = check_environment
2307
2692
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2308
env_changes={'EXISTANT_ENV_VAR':None})
2693
env_changes={'EXISTANT_ENV_VAR': None})
2309
2694
# Still set in parent
2310
2695
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2311
2696
del os.environ['EXISTANT_ENV_VAR']
2313
2698
def test_env_del_missing(self):
2314
self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
2699
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2315
2701
def check_environment():
2316
2702
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2317
2703
self.check_popen_state = check_environment
2318
2704
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2319
env_changes={'NON_EXISTANT_ENV_VAR':None})
2705
env_changes={'NON_EXISTANT_ENV_VAR': None})
2321
2707
def test_working_dir(self):
2322
2708
"""Test that we can specify the working dir for the child"""
2323
orig_getcwd = osutils.getcwd
2324
orig_chdir = os.chdir
2326
2711
def chdir(path):
2327
2712
chdirs.append(path)
2332
osutils.getcwd = getcwd
2334
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2337
osutils.getcwd = orig_getcwd
2339
os.chdir = orig_chdir
2713
self.overrideAttr(os, 'chdir', chdir)
2717
self.overrideAttr(osutils, 'getcwd', getcwd)
2718
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2340
2720
self.assertEqual(['foo', 'current'], chdirs)
2722
def test_get_brz_path_with_cwd_breezy(self):
2723
self.get_source_path = lambda: ""
2724
self.overrideAttr(os.path, "isfile", lambda path: True)
2725
self.assertEqual(self.get_brz_path(), "brz")
2343
2728
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2344
2729
"""Tests that really need to do things with an external bzr."""
2350
2735
self.disable_missing_extensions_warning()
2351
2736
process = self.start_bzr_subprocess(['wait-until-signalled'],
2352
2737
skip_if_plan_to_signal=True)
2353
self.assertEqual('running\n', process.stdout.readline())
2738
self.assertEqual(b'running\n', process.stdout.readline())
2354
2739
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2356
self.assertEqual('', result[0])
2357
self.assertEqual('bzr: interrupted\n', result[1])
2360
class TestFeature(tests.TestCase):
2362
def test_caching(self):
2363
"""Feature._probe is called by the feature at most once."""
2364
class InstrumentedFeature(tests.Feature):
2366
super(InstrumentedFeature, self).__init__()
2369
self.calls.append('_probe')
2371
feature = InstrumentedFeature()
2373
self.assertEqual(['_probe'], feature.calls)
2375
self.assertEqual(['_probe'], feature.calls)
2377
def test_named_str(self):
2378
"""Feature.__str__ should thunk to feature_name()."""
2379
class NamedFeature(tests.Feature):
2380
def feature_name(self):
2382
feature = NamedFeature()
2383
self.assertEqual('symlinks', str(feature))
2385
def test_default_str(self):
2386
"""Feature.__str__ should default to __class__.__name__."""
2387
class NamedFeature(tests.Feature):
2389
feature = NamedFeature()
2390
self.assertEqual('NamedFeature', str(feature))
2393
class TestUnavailableFeature(tests.TestCase):
2395
def test_access_feature(self):
2396
feature = tests.Feature()
2397
exception = tests.UnavailableFeature(feature)
2398
self.assertIs(feature, exception.args[0])
2401
simple_thunk_feature = tests._CompatabilityThunkFeature(
2402
deprecated_in((2, 1, 0)),
2403
'bzrlib.tests.test_selftest',
2404
'simple_thunk_feature','UnicodeFilename',
2405
replacement_module='bzrlib.tests'
2408
class Test_CompatibilityFeature(tests.TestCase):
2410
def test_does_thunk(self):
2411
res = self.callDeprecated(
2412
['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2413
' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2414
simple_thunk_feature.available)
2415
self.assertEqual(tests.UnicodeFilename.available(), res)
2418
class TestModuleAvailableFeature(tests.TestCase):
2420
def test_available_module(self):
2421
feature = tests.ModuleAvailableFeature('bzrlib.tests')
2422
self.assertEqual('bzrlib.tests', feature.module_name)
2423
self.assertEqual('bzrlib.tests', str(feature))
2424
self.assertTrue(feature.available())
2425
self.assertIs(tests, feature.module)
2427
def test_unavailable_module(self):
2428
feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
2429
self.assertEqual('bzrlib.no_such_module_exists', str(feature))
2430
self.assertFalse(feature.available())
2431
self.assertIs(None, feature.module)
2741
self.assertEqual(b'', result[0])
2742
self.assertEqual(b'brz: interrupted\n', result[1])
2434
2745
class TestSelftestFiltering(tests.TestCase):
2436
2747
def setUp(self):
2437
tests.TestCase.setUp(self)
2748
super(TestSelftestFiltering, self).setUp()
2438
2749
self.suite = TestUtil.TestSuite()
2439
2750
self.loader = TestUtil.TestLoader()
2440
2751
self.suite.addTest(self.loader.loadTestsFromModule(
2441
sys.modules['bzrlib.tests.test_selftest']))
2752
sys.modules['breezy.tests.test_selftest']))
2442
2753
self.all_names = _test_ids(self.suite)
2444
2755
def test_condition_id_re(self):
2445
test_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
2446
'test_condition_id_re')
2756
test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2757
'test_condition_id_re')
2447
2758
filtered_suite = tests.filter_suite_by_condition(
2448
2759
self.suite, tests.condition_id_re('test_condition_id_re'))
2449
2760
self.assertEqual([test_name], _test_ids(filtered_suite))
2451
2762
def test_condition_id_in_list(self):
2452
test_names = ['bzrlib.tests.test_selftest.TestSelftestFiltering.'
2763
test_names = ['breezy.tests.test_selftest.TestSelftestFiltering.'
2453
2764
'test_condition_id_in_list']
2454
2765
id_list = tests.TestIdList(test_names)
2455
2766
filtered_suite = tests.filter_suite_by_condition(
2492
2803
self.all_names = _test_ids(self.suite)
2493
2804
filtered_suite = tests.exclude_tests_by_re(self.suite,
2494
2805
'exclude_tests_by_re')
2495
excluded_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
2496
'test_exclude_tests_by_re')
2806
excluded_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2807
'test_exclude_tests_by_re')
2497
2808
self.assertEqual(len(self.all_names) - 1,
2498
filtered_suite.countTestCases())
2809
filtered_suite.countTestCases())
2499
2810
self.assertFalse(excluded_name in _test_ids(filtered_suite))
2500
2811
remaining_names = list(self.all_names)
2501
2812
remaining_names.remove(excluded_name)
2502
2813
self.assertEqual(remaining_names, _test_ids(filtered_suite))
2504
2815
def test_filter_suite_by_condition(self):
2505
test_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
2506
'test_filter_suite_by_condition')
2507
filtered_suite = tests.filter_suite_by_condition(self.suite,
2508
lambda x:x.id() == test_name)
2816
test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2817
'test_filter_suite_by_condition')
2818
filtered_suite = tests.filter_suite_by_condition(
2819
self.suite, lambda x: x.id() == test_name)
2509
2820
self.assertEqual([test_name], _test_ids(filtered_suite))
2511
2822
def test_filter_suite_by_re(self):
2512
2823
filtered_suite = tests.filter_suite_by_re(self.suite,
2513
2824
'test_filter_suite_by_r')
2514
2825
filtered_names = _test_ids(filtered_suite)
2515
self.assertEqual(filtered_names, ['bzrlib.tests.test_selftest.'
2516
'TestSelftestFiltering.test_filter_suite_by_re'])
2827
filtered_names, ['breezy.tests.test_selftest.'
2828
'TestSelftestFiltering.test_filter_suite_by_re'])
2518
2830
def test_filter_suite_by_id_list(self):
2519
test_list = ['bzrlib.tests.test_selftest.'
2831
test_list = ['breezy.tests.test_selftest.'
2520
2832
'TestSelftestFiltering.test_filter_suite_by_id_list']
2521
2833
filtered_suite = tests.filter_suite_by_id_list(
2522
2834
self.suite, tests.TestIdList(test_list))
2523
2835
filtered_names = _test_ids(filtered_suite)
2524
2836
self.assertEqual(
2525
2837
filtered_names,
2526
['bzrlib.tests.test_selftest.'
2838
['breezy.tests.test_selftest.'
2527
2839
'TestSelftestFiltering.test_filter_suite_by_id_list'])
2529
2841
def test_filter_suite_by_id_startswith(self):
2530
2842
# By design this test may fail if another test is added whose name also
2531
2843
# begins with one of the start value used.
2532
klass = 'bzrlib.tests.test_selftest.TestSelftestFiltering.'
2844
klass = 'breezy.tests.test_selftest.TestSelftestFiltering.'
2533
2845
start1 = klass + 'test_filter_suite_by_id_starts'
2534
2846
start2 = klass + 'test_filter_suite_by_id_li'
2535
2847
test_list = [klass + 'test_filter_suite_by_id_list',
2778
3095
self.assertEqual([], test_list)
2780
3097
self.assertSubset([
2785
3102
def test_test_suite(self):
2786
3103
# test_suite() loads the entire test suite to operate. To avoid this
2787
3104
# overhead, and yet still be confident that things are happening,
2788
# we temporarily replace two functions used by test_suite with
3105
# we temporarily replace two functions used by test_suite with
2789
3106
# test doubles that supply a few sample tests to load, and check they
2792
3110
def testmod_names():
2793
3111
calls.append("testmod_names")
2795
'bzrlib.tests.blackbox.test_branch',
2796
'bzrlib.tests.per_transport',
2797
'bzrlib.tests.test_selftest',
3113
'breezy.tests.blackbox.test_branch',
3114
'breezy.tests.per_transport',
3115
'breezy.tests.test_selftest',
2799
3117
self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names)
2800
3119
def doctests():
2801
3120
calls.append("modules_to_doctest")
2802
3121
if __doc__ is None:
2804
return ['bzrlib.timestamp']
3123
return ['breezy.timestamp']
2805
3124
self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests)
2806
3125
expected_test_list = [
2807
3126
# testmod_names
2808
'bzrlib.tests.blackbox.test_branch.TestBranch.test_branch',
2809
('bzrlib.tests.per_transport.TransportTests'
3127
'breezy.tests.blackbox.test_branch.TestBranch.test_branch',
3128
('breezy.tests.per_transport.TransportTests'
2810
3129
'.test_abspath(LocalTransport,LocalURLServer)'),
2811
'bzrlib.tests.test_selftest.TestTestSuite.test_test_suite',
3130
'breezy.tests.test_selftest.TestTestSuite.test_test_suite',
2812
3131
# plugins can't be tested that way since selftest may be run with
2815
if __doc__ is not None:
2816
expected_test_list.extend([
2817
# modules_to_doctest
2818
'bzrlib.timestamp.format_highres_date',
2820
3134
suite = tests.test_suite()
2821
self.assertEqual(set(["testmod_names", "modules_to_doctest"]),
3135
self.assertEqual({"testmod_names", "modules_to_doctest"}, set(calls))
2823
3136
self.assertSubset(expected_test_list, _test_ids(suite))
2825
3138
def test_test_suite_list_and_start(self):
2826
# We cannot test this at the same time as the main load, because we want
2827
# to know that starting_with == None works. So a second load is
2828
# incurred - note that the starting_with parameter causes a partial load
2829
# rather than a full load so this test should be pretty quick.
2830
test_list = ['bzrlib.tests.test_selftest.TestTestSuite.test_test_suite']
3139
# We cannot test this at the same time as the main load, because we
3140
# want to know that starting_with == None works. So a second load is
3141
# incurred - note that the starting_with parameter causes a partial
3142
# load rather than a full load so this test should be pretty quick.
3144
'breezy.tests.test_selftest.TestTestSuite.test_test_suite']
2831
3145
suite = tests.test_suite(test_list,
2832
['bzrlib.tests.test_selftest.TestTestSuite'])
2833
# test_test_suite_list_and_start is not included
2834
self.assertEquals(test_list, _test_ids(suite))
3146
['breezy.tests.test_selftest.TestTestSuite'])
3147
# test_test_suite_list_and_start is not included
3148
self.assertEqual(test_list, _test_ids(suite))
2837
3151
class TestLoadTestIdList(tests.TestCaseInTempDir):
2943
3257
def test_resolve_prefix(self):
2944
3258
tpr = self._get_registry()
2945
3259
tpr.register('bar', 'bb.aa.rr')
2946
self.assertEquals('bb.aa.rr', tpr.resolve_alias('bar'))
3260
self.assertEqual('bb.aa.rr', tpr.resolve_alias('bar'))
2948
3262
def test_resolve_unknown_alias(self):
2949
3263
tpr = self._get_registry()
2950
self.assertRaises(errors.BzrCommandError,
3264
self.assertRaises(errors.CommandError,
2951
3265
tpr.resolve_alias, 'I am not a prefix')
2953
3267
def test_predefined_prefixes(self):
2954
3268
tpr = tests.test_prefix_alias_registry
2955
self.assertEquals('bzrlib', tpr.resolve_alias('bzrlib'))
2956
self.assertEquals('bzrlib.doc', tpr.resolve_alias('bd'))
2957
self.assertEquals('bzrlib.utils', tpr.resolve_alias('bu'))
2958
self.assertEquals('bzrlib.tests', tpr.resolve_alias('bt'))
2959
self.assertEquals('bzrlib.tests.blackbox', tpr.resolve_alias('bb'))
2960
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3269
self.assertEqual('breezy', tpr.resolve_alias('breezy'))
3270
self.assertEqual('breezy.doc', tpr.resolve_alias('bd'))
3271
self.assertEqual('breezy.utils', tpr.resolve_alias('bu'))
3272
self.assertEqual('breezy.tests', tpr.resolve_alias('bt'))
3273
self.assertEqual('breezy.tests.blackbox', tpr.resolve_alias('bb'))
3274
self.assertEqual('breezy.plugins', tpr.resolve_alias('bp'))
3277
class TestThreadLeakDetection(tests.TestCase):
3278
"""Ensure when tests leak threads we detect and report it"""
3280
class LeakRecordingResult(tests.ExtendedTestResult):
3282
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3285
def _report_thread_leak(self, test, leaks, alive):
3286
self.leaks.append((test, leaks))
3288
def test_testcase_without_addCleanups(self):
3289
"""Check old TestCase instances don't break with leak detection"""
3290
class Test(unittest.TestCase):
3293
result = self.LeakRecordingResult()
3295
result.startTestRun()
3297
result.stopTestRun()
3298
self.assertEqual(result._tests_leaking_threads_count, 0)
3299
self.assertEqual(result.leaks, [])
3301
def test_thread_leak(self):
3302
"""Ensure a thread that outlives the running of a test is reported
3304
Uses a thread that blocks on an event, and is started by the inner
3305
test case. As the thread outlives the inner case's run, it should be
3306
detected as a leak, but the event is then set so that the thread can
3307
be safely joined in cleanup so it's not leaked for real.
3309
event = threading.Event()
3310
thread = threading.Thread(name="Leaker", target=event.wait)
3312
class Test(tests.TestCase):
3313
def test_leak(self):
3315
result = self.LeakRecordingResult()
3316
test = Test("test_leak")
3317
self.addCleanup(thread.join)
3318
self.addCleanup(event.set)
3319
result.startTestRun()
3321
result.stopTestRun()
3322
self.assertEqual(result._tests_leaking_threads_count, 1)
3323
self.assertEqual(result._first_thread_leaker_id, test.id())
3324
self.assertEqual(result.leaks, [(test, {thread})])
3325
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3327
def test_multiple_leaks(self):
3328
"""Check multiple leaks are blamed on the test cases at fault
3330
Same concept as the previous test, but has one inner test method that
3331
leaks two threads, and one that doesn't leak at all.
3333
event = threading.Event()
3334
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3335
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3336
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3338
class Test(tests.TestCase):
3339
def test_first_leak(self):
3342
def test_second_no_leak(self):
3345
def test_third_leak(self):
3348
result = self.LeakRecordingResult()
3349
first_test = Test("test_first_leak")
3350
third_test = Test("test_third_leak")
3351
self.addCleanup(thread_a.join)
3352
self.addCleanup(thread_b.join)
3353
self.addCleanup(thread_c.join)
3354
self.addCleanup(event.set)
3355
result.startTestRun()
3357
[first_test, Test("test_second_no_leak"), third_test]
3359
result.stopTestRun()
3360
self.assertEqual(result._tests_leaking_threads_count, 2)
3361
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3362
self.assertEqual(result.leaks, [
3363
(first_test, {thread_b}),
3364
(third_test, {thread_a, thread_c})])
3365
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3368
class TestPostMortemDebugging(tests.TestCase):
3369
"""Check post mortem debugging works when tests fail or error"""
3371
class TracebackRecordingResult(tests.ExtendedTestResult):
3373
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3374
self.postcode = None
3376
def _post_mortem(self, tb=None):
3377
"""Record the code object at the end of the current traceback"""
3378
tb = tb or sys.exc_info()[2]
3381
while next is not None:
3384
self.postcode = tb.tb_frame.f_code
3386
def report_error(self, test, err):
3389
def report_failure(self, test, err):
3392
def test_location_unittest_error(self):
3393
"""Needs right post mortem traceback with erroring unittest case"""
3394
class Test(unittest.TestCase):
3397
result = self.TracebackRecordingResult()
3399
self.assertEqual(result.postcode, Test.runTest.__code__)
3401
def test_location_unittest_failure(self):
3402
"""Needs right post mortem traceback with failing unittest case"""
3403
class Test(unittest.TestCase):
3405
raise self.failureException
3406
result = self.TracebackRecordingResult()
3408
self.assertEqual(result.postcode, Test.runTest.__code__)
3410
def test_location_bt_error(self):
3411
"""Needs right post mortem traceback with erroring breezy.tests case"""
3412
class Test(tests.TestCase):
3413
def test_error(self):
3415
result = self.TracebackRecordingResult()
3416
Test("test_error").run(result)
3417
self.assertEqual(result.postcode, Test.test_error.__code__)
3419
def test_location_bt_failure(self):
3420
"""Needs right post mortem traceback with failing breezy.tests case"""
3421
class Test(tests.TestCase):
3422
def test_failure(self):
3423
raise self.failureException
3424
result = self.TracebackRecordingResult()
3425
Test("test_failure").run(result)
3426
self.assertEqual(result.postcode, Test.test_failure.__code__)
3428
def test_env_var_triggers_post_mortem(self):
3429
"""Check pdb.post_mortem is called iff BRZ_TEST_PDB is set"""
3431
result = tests.ExtendedTestResult(StringIO(), 0, 1)
3432
post_mortem_calls = []
3433
self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3434
self.overrideEnv('BRZ_TEST_PDB', None)
3435
result._post_mortem(1)
3436
self.overrideEnv('BRZ_TEST_PDB', 'on')
3437
result._post_mortem(2)
3438
self.assertEqual([2], post_mortem_calls)
2963
3441
class TestRunSuite(tests.TestCase):
2976
3455
self.verbosity)
2977
3456
tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2978
3457
self.assertLength(1, calls)
3460
class _Selftest(object):
3461
"""Mixin for tests needing full selftest output"""
3463
def _inject_stream_into_subunit(self, stream):
3464
"""To be overridden by subclasses that run tests out of process"""
3466
def _run_selftest(self, **kwargs):
3468
sio = TextIOWrapper(bio, 'utf-8')
3469
self._inject_stream_into_subunit(bio)
3470
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3472
return bio.getvalue()
3475
class _ForkedSelftest(_Selftest):
3476
"""Mixin for tests needing full selftest output with forked children"""
3478
_test_needs_features = [features.subunit]
3480
def _inject_stream_into_subunit(self, stream):
3481
"""Monkey-patch subunit so the extra output goes to stream not stdout
3483
Some APIs need rewriting so this kind of bogus hackery can be replaced
3484
by passing the stream param from run_tests down into ProtocolTestCase.
3486
from subunit import ProtocolTestCase
3487
_original_init = ProtocolTestCase.__init__
3489
def _init_with_passthrough(self, *args, **kwargs):
3490
_original_init(self, *args, **kwargs)
3491
self._passthrough = stream
3492
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3494
def _run_selftest(self, **kwargs):
3495
# GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3496
if getattr(os, "fork", None) is None:
3497
raise tests.TestNotApplicable("Platform doesn't support forking")
3498
# Make sure the fork code is actually invoked by claiming two cores
3499
self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3500
kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3501
return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3504
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3505
"""Check operation of --parallel=fork selftest option"""
3507
def test_error_in_child_during_fork(self):
3508
"""Error in a forked child during test setup should get reported"""
3509
class Test(tests.TestCase):
3510
def testMethod(self):
3512
# We don't care what, just break something that a child will run
3513
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3514
out = self._run_selftest(test_suite_factory=Test)
3515
# Lines from the tracebacks of the two child processes may be mixed
3516
# together due to the way subunit parses and forwards the streams,
3517
# so permit extra lines between each part of the error output.
3518
self.assertContainsRe(out,
3521
b".+ in fork_for_tests\n"
3523
b"\\s*workaround_zealous_crypto_random\\(\\)\n"
3528
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3529
"""Check a test case still alive after being run emits a warning"""
3531
class Test(tests.TestCase):
3532
def test_pass(self):
3535
def test_self_ref(self):
3536
self.also_self = self.test_self_ref
3538
def test_skip(self):
3539
self.skipTest("Don't need")
3541
def _get_suite(self):
3542
return TestUtil.TestSuite([
3543
self.Test("test_pass"),
3544
self.Test("test_self_ref"),
3545
self.Test("test_skip"),
3548
def _run_selftest_with_suite(self, **kwargs):
3549
old_flags = tests.selftest_debug_flags
3550
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3551
gc_on = gc.isenabled()
3555
output = self._run_selftest(test_suite_factory=self._get_suite,
3560
tests.selftest_debug_flags = old_flags
3561
self.assertNotContainsRe(output, b"Uncollected test case.*test_pass")
3562
self.assertContainsRe(output, b"Uncollected test case.*test_self_ref")
3565
def test_testsuite(self):
3566
self._run_selftest_with_suite()
3568
def test_pattern(self):
3569
out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
3570
self.assertNotContainsRe(out, b"test_skip")
3572
def test_exclude_pattern(self):
3573
out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
3574
self.assertNotContainsRe(out, b"test_skip")
3576
def test_random_seed(self):
3577
self._run_selftest_with_suite(random_seed="now")
3579
def test_matching_tests_first(self):
3580
self._run_selftest_with_suite(matching_tests_first=True,
3581
pattern="test_self_ref$")
3583
def test_starting_with_and_exclude(self):
3584
out = self._run_selftest_with_suite(starting_with=["bt."],
3585
exclude_pattern="test_skip$")
3586
self.assertNotContainsRe(out, b"test_skip")
3588
def test_additonal_decorator(self):
3589
self._run_selftest_with_suite(suite_decorators=[tests.TestDecorator])
3592
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
3593
"""Check warnings from tests staying alive are emitted with subunit"""
3595
_test_needs_features = [features.subunit]
3597
def _run_selftest_with_suite(self, **kwargs):
3598
return TestUncollectedWarnings._run_selftest_with_suite(
3599
self, runner_class=tests.SubUnitBzrRunnerv1, **kwargs)
3602
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
3603
"""Check warnings from tests staying alive are emitted when forking"""
3606
class TestEnvironHandling(tests.TestCase):
3608
def test_overrideEnv_None_called_twice_doesnt_leak(self):
3609
self.assertFalse('MYVAR' in os.environ)
3610
self.overrideEnv('MYVAR', '42')
3611
# We use an embedded test to make sure we fix the _captureVar bug
3613
class Test(tests.TestCase):
3615
# The first call saves the 42 value
3616
self.overrideEnv('MYVAR', None)
3617
self.assertEqual(None, os.environ.get('MYVAR'))
3618
# Make sure we can call it twice
3619
self.overrideEnv('MYVAR', None)
3620
self.assertEqual(None, os.environ.get('MYVAR'))
3622
result = tests.TextTestResult(output, 0, 1)
3623
Test('test_me').run(result)
3624
if not result.wasStrictlySuccessful():
3625
self.fail(output.getvalue())
3626
# We get our value back
3627
self.assertEqual('42', os.environ.get('MYVAR'))
3630
class TestIsolatedEnv(tests.TestCase):
3631
"""Test isolating tests from os.environ.
3633
Since we use tests that are already isolated from os.environ a bit of care
3634
should be taken when designing the tests to avoid bootstrap side-effects.
3635
The tests start with an already clean os.environ, which lets us make valid
3636
assertions about which variables are present or not and design tests around
3640
class ScratchMonkey(tests.TestCase):
3645
def test_basics(self):
3646
# Make sure we know the definition of BRZ_HOME: not part of os.environ
3647
# for tests.TestCase.
3648
self.assertTrue('BRZ_HOME' in tests.isolated_environ)
3649
self.assertEqual(None, tests.isolated_environ['BRZ_HOME'])
3650
# Being part of isolated_environ, BRZ_HOME should not appear here
3651
self.assertFalse('BRZ_HOME' in os.environ)
3652
# Make sure we know the definition of LINES: part of os.environ for
3654
self.assertTrue('LINES' in tests.isolated_environ)
3655
self.assertEqual('25', tests.isolated_environ['LINES'])
3656
self.assertEqual('25', os.environ['LINES'])
3658
def test_injecting_unknown_variable(self):
3659
# BRZ_HOME is known to be absent from os.environ
3660
test = self.ScratchMonkey('test_me')
3661
tests.override_os_environ(test, {'BRZ_HOME': 'foo'})
3662
self.assertEqual('foo', os.environ['BRZ_HOME'])
3663
tests.restore_os_environ(test)
3664
self.assertFalse('BRZ_HOME' in os.environ)
3666
def test_injecting_known_variable(self):
3667
test = self.ScratchMonkey('test_me')
3668
# LINES is known to be present in os.environ
3669
tests.override_os_environ(test, {'LINES': '42'})
3670
self.assertEqual('42', os.environ['LINES'])
3671
tests.restore_os_environ(test)
3672
self.assertEqual('25', os.environ['LINES'])
3674
def test_deleting_variable(self):
3675
test = self.ScratchMonkey('test_me')
3676
# LINES is known to be present in os.environ
3677
tests.override_os_environ(test, {'LINES': None})
3678
self.assertTrue('LINES' not in os.environ)
3679
tests.restore_os_environ(test)
3680
self.assertEqual('25', os.environ['LINES'])
3683
class TestDocTestSuiteIsolation(tests.TestCase):
3684
"""Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3686
Since tests.TestCase already provides isolation from os.environ, we use
3687
the clean environment as a base for testing. To precisely capture the
3688
isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3691
We want to make sure `tests.DocTestSuite` respects `tests.isolated_environ`,
3692
not `os.environ` so each test overrides it to suit its needs.
3696
def get_doctest_suite_for_string(self, klass, string):
3697
class Finder(doctest.DocTestFinder):
3699
def find(*args, **kwargs):
3700
test = doctest.DocTestParser().get_doctest(
3701
string, {}, 'foo', 'foo.py', 0)
3704
suite = klass(test_finder=Finder())
3707
def run_doctest_suite_for_string(self, klass, string):
3708
suite = self.get_doctest_suite_for_string(klass, string)
3710
result = tests.TextTestResult(output, 0, 1)
3712
return result, output
3714
def assertDocTestStringSucceds(self, klass, string):
3715
result, output = self.run_doctest_suite_for_string(klass, string)
3716
if not result.wasStrictlySuccessful():
3717
self.fail(output.getvalue())
3719
def assertDocTestStringFails(self, klass, string):
3720
result, output = self.run_doctest_suite_for_string(klass, string)
3721
if result.wasStrictlySuccessful():
3722
self.fail(output.getvalue())
3724
def test_injected_variable(self):
3725
self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3728
>>> os.environ['LINES']
3731
# doctest.DocTestSuite fails as it sees '25'
3732
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3733
# tests.DocTestSuite sees '42'
3734
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3736
def test_deleted_variable(self):
3737
self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3740
>>> os.environ.get('LINES')
3742
# doctest.DocTestSuite fails as it sees '25'
3743
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3744
# tests.DocTestSuite sees None
3745
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3748
class TestSelftestExcludePatterns(tests.TestCase):
3751
super(TestSelftestExcludePatterns, self).setUp()
3752
self.overrideAttr(tests, 'test_suite', self.suite_factory)
3754
def suite_factory(self, keep_only=None, starting_with=None):
3755
"""A test suite factory with only a few tests."""
3756
class Test(tests.TestCase):
3758
# We don't need the full class path
3759
return self._testMethodName
3769
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3771
def assertTestList(self, expected, *selftest_args):
3772
# We rely on setUp installing the right test suite factory so we can
3773
# test at the command level without loading the whole test suite
3774
out, err = self.run_bzr(('selftest', '--list') + selftest_args)
3775
actual = out.splitlines()
3776
self.assertEqual(expected, actual)
3778
def test_full_list(self):
3779
self.assertTestList(['a', 'b', 'c'])
3781
def test_single_exclude(self):
3782
self.assertTestList(['b', 'c'], '-x', 'a')
3784
def test_mutiple_excludes(self):
3785
self.assertTestList(['c'], '-x', 'a', '-x', 'b')
3788
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3790
_test_needs_features = [features.subunit]
3793
super(TestCounterHooks, self).setUp()
3795
class Test(tests.TestCase):
3798
super(Test, self).setUp()
3799
self.hooks = hooks.Hooks()
3800
self.hooks.add_hook('myhook', 'Foo bar blah', (2, 4))
3801
self.install_counter_hook(self.hooks, 'myhook')
3806
def run_hook_once(self):
3807
for hook in self.hooks['myhook']:
3810
self.test_class = Test
3812
def assertHookCalls(self, expected_calls, test_name):
3813
test = self.test_class(test_name)
3814
result = unittest.TestResult()
3816
self.assertTrue(hasattr(test, '_counters'))
3817
self.assertTrue('myhook' in test._counters)
3818
self.assertEqual(expected_calls, test._counters['myhook'])
3820
def test_no_hook(self):
3821
self.assertHookCalls(0, 'no_hook')
3823
def test_run_hook_once(self):
3824
tt = features.testtools
3825
if tt.module.__version__ < (0, 9, 8):
3826
raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
3827
self.assertHookCalls(1, 'run_hook_once')