343
333
def test_scenarios(self):
344
334
# check that constructor parameters are passed through to the adapted
346
from .per_workingtree import make_scenarios
336
from bzrlib.tests.per_workingtree import make_scenarios
349
formats = [workingtree_4.WorkingTreeFormat4(),
350
workingtree_3.WorkingTreeFormat3(),
351
workingtree_4.WorkingTreeFormat6()]
352
scenarios = make_scenarios(server1, server2, formats,
353
remote_server='c', remote_readonly_server='d',
354
remote_backing_server='e')
339
formats = [workingtree.WorkingTreeFormat2(),
340
workingtree.WorkingTreeFormat3(),]
341
scenarios = make_scenarios(server1, server2, formats)
355
342
self.assertEqual([
356
('WorkingTreeFormat4',
357
{'bzrdir_format': formats[0]._matchingcontroldir,
343
('WorkingTreeFormat2',
344
{'bzrdir_format': formats[0]._matchingbzrdir,
358
345
'transport_readonly_server': 'b',
359
346
'transport_server': 'a',
360
347
'workingtree_format': formats[0]}),
361
348
('WorkingTreeFormat3',
362
{'bzrdir_format': formats[1]._matchingcontroldir,
363
'transport_readonly_server': 'b',
364
'transport_server': 'a',
365
'workingtree_format': formats[1]}),
366
('WorkingTreeFormat6',
367
{'bzrdir_format': formats[2]._matchingcontroldir,
368
'transport_readonly_server': 'b',
369
'transport_server': 'a',
370
'workingtree_format': formats[2]}),
371
('WorkingTreeFormat6,remote',
372
{'bzrdir_format': formats[2]._matchingcontroldir,
373
'repo_is_remote': True,
374
'transport_readonly_server': 'd',
375
'transport_server': 'c',
376
'vfs_transport_factory': 'e',
377
'workingtree_format': formats[2]}),
349
{'bzrdir_format': formats[1]._matchingbzrdir,
350
'transport_readonly_server': 'b',
351
'transport_server': 'a',
352
'workingtree_format': formats[1]})],
381
356
class TestTreeScenarios(tests.TestCase):
383
358
def test_scenarios(self):
384
359
# the tree implementation scenario generator is meant to setup one
385
# instance for each working tree format, one additional instance
360
# instance for each working tree format, and one additional instance
386
361
# that will use the default wt format, but create a revision tree for
387
# the tests, and one more that uses the default wt format as a
388
# lightweight checkout of a remote repository. This means that the wt
389
# ones should have the workingtree_to_test_tree attribute set to
390
# 'return_parameter' and the revision one set to
391
# revision_tree_from_workingtree.
362
# the tests. this means that the wt ones should have the
363
# workingtree_to_test_tree attribute set to 'return_parameter' and the
364
# revision one set to revision_tree_from_workingtree.
393
from .per_tree import (
366
from bzrlib.tests.per_tree import (
394
367
_dirstate_tree_from_workingtree,
396
369
preview_tree_pre,
403
smart_server = test_server.SmartTCPServer_for_testing
404
smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
405
mem_server = memory.MemoryServer
406
formats = [workingtree_4.WorkingTreeFormat4(),
407
workingtree_3.WorkingTreeFormat3(), ]
376
formats = [workingtree.WorkingTreeFormat2(),
377
workingtree.WorkingTreeFormat3(),]
408
378
scenarios = make_scenarios(server1, server2, formats)
409
self.assertEqual(9, len(scenarios))
410
default_wt_format = workingtree.format_registry.get_default()
411
wt4_format = workingtree_4.WorkingTreeFormat4()
412
wt5_format = workingtree_4.WorkingTreeFormat5()
413
wt6_format = workingtree_4.WorkingTreeFormat6()
414
git_wt_format = git_workingtree.GitWorkingTreeFormat()
379
self.assertEqual(7, len(scenarios))
380
default_wt_format = workingtree.WorkingTreeFormat4._default_format
381
wt4_format = workingtree.WorkingTreeFormat4()
382
wt5_format = workingtree.WorkingTreeFormat5()
415
383
expected_scenarios = [
416
('WorkingTreeFormat4',
417
{'bzrdir_format': formats[0]._matchingcontroldir,
384
('WorkingTreeFormat2',
385
{'bzrdir_format': formats[0]._matchingbzrdir,
418
386
'transport_readonly_server': 'b',
419
387
'transport_server': 'a',
420
388
'workingtree_format': formats[0],
421
389
'_workingtree_to_test_tree': return_parameter,
423
391
('WorkingTreeFormat3',
424
{'bzrdir_format': formats[1]._matchingcontroldir,
392
{'bzrdir_format': formats[1]._matchingbzrdir,
425
393
'transport_readonly_server': 'b',
426
394
'transport_server': 'a',
427
395
'workingtree_format': formats[1],
428
396
'_workingtree_to_test_tree': return_parameter,
430
('WorkingTreeFormat6,remote',
431
{'bzrdir_format': wt6_format._matchingcontroldir,
432
'repo_is_remote': True,
433
'transport_readonly_server': smart_readonly_server,
434
'transport_server': smart_server,
435
'vfs_transport_factory': mem_server,
436
'workingtree_format': wt6_format,
437
'_workingtree_to_test_tree': return_parameter,
440
399
{'_workingtree_to_test_tree': revision_tree_from_workingtree,
441
'bzrdir_format': default_wt_format._matchingcontroldir,
400
'bzrdir_format': default_wt_format._matchingbzrdir,
442
401
'transport_readonly_server': 'b',
443
402
'transport_server': 'a',
444
403
'workingtree_format': default_wt_format,
447
{'_workingtree_to_test_tree': revision_tree_from_workingtree,
448
'bzrdir_format': git_wt_format._matchingcontroldir,
449
'transport_readonly_server': 'b',
450
'transport_server': 'a',
451
'workingtree_format': git_wt_format,
454
405
('DirStateRevisionTree,WT4',
455
406
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
456
'bzrdir_format': wt4_format._matchingcontroldir,
407
'bzrdir_format': wt4_format._matchingbzrdir,
457
408
'transport_readonly_server': 'b',
458
409
'transport_server': 'a',
459
410
'workingtree_format': wt4_format,
461
412
('DirStateRevisionTree,WT5',
462
413
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
463
'bzrdir_format': wt5_format._matchingcontroldir,
414
'bzrdir_format': wt5_format._matchingbzrdir,
464
415
'transport_readonly_server': 'b',
465
416
'transport_server': 'a',
466
417
'workingtree_format': wt5_format,
469
420
{'_workingtree_to_test_tree': preview_tree_pre,
470
'bzrdir_format': default_wt_format._matchingcontroldir,
421
'bzrdir_format': default_wt_format._matchingbzrdir,
471
422
'transport_readonly_server': 'b',
472
423
'transport_server': 'a',
473
424
'workingtree_format': default_wt_format}),
474
425
('PreviewTreePost',
475
426
{'_workingtree_to_test_tree': preview_tree_post,
476
'bzrdir_format': default_wt_format._matchingcontroldir,
427
'bzrdir_format': default_wt_format._matchingbzrdir,
477
428
'transport_readonly_server': 'b',
478
429
'transport_server': 'a',
479
430
'workingtree_format': default_wt_format}),
481
432
self.assertEqual(expected_scenarios, scenarios)
603
548
tree = self.make_branch_and_memory_tree('dir')
604
549
# Guard against regression into MemoryTransport leaking
605
550
# files to disk instead of keeping them in memory.
606
self.assertFalse(osutils.lexists('dir'))
551
self.failIf(osutils.lexists('dir'))
607
552
self.assertIsInstance(tree, memorytree.MemoryTree)
609
554
def test_make_branch_and_memory_tree_with_format(self):
610
555
"""make_branch_and_memory_tree should accept a format option."""
611
556
format = bzrdir.BzrDirMetaFormat1()
612
format.repository_format = repository.format_registry.get_default()
557
format.repository_format = weaverepo.RepositoryFormat7()
613
558
tree = self.make_branch_and_memory_tree('dir', format=format)
614
559
# Guard against regression into MemoryTransport leaking
615
560
# files to disk instead of keeping them in memory.
616
self.assertFalse(osutils.lexists('dir'))
561
self.failIf(osutils.lexists('dir'))
617
562
self.assertIsInstance(tree, memorytree.MemoryTree)
618
563
self.assertEqual(format.repository_format.__class__,
619
tree.branch.repository._format.__class__)
564
tree.branch.repository._format.__class__)
621
566
def test_make_branch_builder(self):
622
567
builder = self.make_branch_builder('dir')
623
568
self.assertIsInstance(builder, branchbuilder.BranchBuilder)
624
569
# Guard against regression into MemoryTransport leaking
625
570
# files to disk instead of keeping them in memory.
626
self.assertFalse(osutils.lexists('dir'))
571
self.failIf(osutils.lexists('dir'))
628
573
def test_make_branch_builder_with_format(self):
629
574
# Use a repo layout that doesn't conform to a 'named' layout, to ensure
630
575
# that the format objects are used.
631
576
format = bzrdir.BzrDirMetaFormat1()
632
repo_format = repository.format_registry.get_default()
577
repo_format = weaverepo.RepositoryFormat7()
633
578
format.repository_format = repo_format
634
579
builder = self.make_branch_builder('dir', format=format)
635
580
the_branch = builder.get_branch()
636
581
# Guard against regression into MemoryTransport leaking
637
582
# files to disk instead of keeping them in memory.
638
self.assertFalse(osutils.lexists('dir'))
583
self.failIf(osutils.lexists('dir'))
639
584
self.assertEqual(format.repository_format.__class__,
640
585
the_branch.repository._format.__class__)
641
586
self.assertEqual(repo_format.get_format_string(),
642
587
self.get_transport().get_bytes(
643
'dir/.bzr/repository/format'))
588
'dir/.bzr/repository/format'))
645
590
def test_make_branch_builder_with_format_name(self):
646
591
builder = self.make_branch_builder('dir', format='knit')
647
592
the_branch = builder.get_branch()
648
593
# Guard against regression into MemoryTransport leaking
649
594
# files to disk instead of keeping them in memory.
650
self.assertFalse(osutils.lexists('dir'))
651
dir_format = controldir.format_registry.make_controldir('knit')
595
self.failIf(osutils.lexists('dir'))
596
dir_format = bzrdir.format_registry.make_bzrdir('knit')
652
597
self.assertEqual(dir_format.repository_format.__class__,
653
598
the_branch.repository._format.__class__)
654
self.assertEqual(b'Bazaar-NG Knit Repository Format 1',
599
self.assertEqual('Bazaar-NG Knit Repository Format 1',
655
600
self.get_transport().get_bytes(
656
'dir/.bzr/repository/format'))
601
'dir/.bzr/repository/format'))
658
603
def test_dangling_locks_cause_failures(self):
659
604
class TestDanglingLock(tests.TestCaseWithMemoryTransport):
660
605
def test_function(self):
661
t = self.get_transport_from_path('.')
606
t = self.get_transport('.')
662
607
l = lockdir.LockDir(t, 'lock')
666
611
result = test.run()
667
612
total_failures = result.errors + result.failures
668
613
if self._lock_check_thorough:
669
self.assertEqual(1, len(total_failures))
614
self.assertLength(1, total_failures)
671
616
# When _lock_check_thorough is disabled, then we don't trigger a
673
self.assertEqual(0, len(total_failures))
618
self.assertLength(0, total_failures)
676
621
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
677
622
"""Tests for the convenience functions TestCaseWithTransport introduces."""
679
624
def test_get_readonly_url_none(self):
680
from ..transport.readonly import ReadonlyTransportDecorator
625
from bzrlib.transport.readonly import ReadonlyTransportDecorator
681
626
self.vfs_transport_factory = memory.MemoryServer
682
627
self.transport_readonly_server = None
683
628
# calling get_readonly_transport() constructs a decorator on the url
685
630
url = self.get_readonly_url()
686
631
url2 = self.get_readonly_url('foo/bar')
687
t = transport.get_transport_from_url(url)
688
t2 = transport.get_transport_from_url(url2)
689
self.assertIsInstance(t, ReadonlyTransportDecorator)
690
self.assertIsInstance(t2, ReadonlyTransportDecorator)
632
t = transport.get_transport(url)
633
t2 = transport.get_transport(url2)
634
self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
691
636
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
693
638
def test_get_readonly_url_http(self):
694
from .http_server import HttpServer
695
from ..transport.http import HttpTransport
639
from bzrlib.tests.http_server import HttpServer
640
from bzrlib.transport.http import HttpTransportBase
696
641
self.transport_server = test_server.LocalURLServer
697
642
self.transport_readonly_server = HttpServer
698
643
# calling get_readonly_transport() gives us a HTTP server instance.
699
644
url = self.get_readonly_url()
700
645
url2 = self.get_readonly_url('foo/bar')
701
646
# the transport returned may be any HttpTransportBase subclass
702
t = transport.get_transport_from_url(url)
703
t2 = transport.get_transport_from_url(url2)
704
self.assertIsInstance(t, HttpTransport)
705
self.assertIsInstance(t2, HttpTransport)
647
t = transport.get_transport(url)
648
t2 = transport.get_transport(url2)
649
self.failUnless(isinstance(t, HttpTransportBase))
650
self.failUnless(isinstance(t2, HttpTransportBase))
706
651
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
708
653
def test_is_directory(self):
797
741
r"^ +[0-9]+ms\*$")
799
743
def test_unittest_reporting_unittest_class(self):
800
# getting the time from a non-breezy test works ok
744
# getting the time from a non-bzrlib test works ok
801
745
class ShortDelayTestCase(unittest.TestCase):
802
746
def test_short_delay(self):
803
747
time.sleep(0.003)
804
748
self.check_timing(ShortDelayTestCase('test_short_delay'),
751
def _patch_get_bzr_source_tree(self):
752
# Reading from the actual source tree breaks isolation, but we don't
753
# want to assume that that's *all* that would happen.
754
self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
756
def test_assigned_benchmark_file_stores_date(self):
757
self._patch_get_bzr_source_tree()
759
result = bzrlib.tests.TextTestResult(self._log_file,
764
output_string = output.getvalue()
765
# if you are wondering about the regexp please read the comment in
766
# test_bench_history (bzrlib.tests.test_selftest.TestRunner)
767
# XXX: what comment? -- Andrew Bennetts
768
self.assertContainsRe(output_string, "--date [0-9.]+")
770
def test_benchhistory_records_test_times(self):
771
self._patch_get_bzr_source_tree()
772
result_stream = StringIO()
773
result = bzrlib.tests.TextTestResult(
777
bench_history=result_stream
780
# we want to profile a call and check that its test duration is recorded
781
# make a new test instance that when run will generate a benchmark
782
example_test_case = TestTestResult("_time_hello_world_encoding")
783
# execute the test, which should succeed and record times
784
example_test_case.run(result)
785
lines = result_stream.getvalue().splitlines()
786
self.assertEqual(2, len(lines))
787
self.assertContainsRe(lines[1],
788
" *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
789
"._time_hello_world_encoding")
807
791
def _time_hello_world_encoding(self):
808
792
"""Profile two sleep calls
810
794
This is used to exercise the test framework.
812
self.time(text_type, b'hello', errors='replace')
813
self.time(text_type, b'world', errors='replace')
796
self.time(unicode, 'hello', errors='replace')
797
self.time(unicode, 'world', errors='replace')
815
799
def test_lsprofiling(self):
816
800
"""Verbose test result prints lsprof statistics from test cases."""
817
self.requireFeature(features.lsprof_feature)
801
self.requireFeature(test_lsprof.LSProfFeature)
818
802
result_stream = StringIO()
819
result = breezy.tests.VerboseTestResult(
803
result = bzrlib.tests.VerboseTestResult(
804
unittest._WritelnDecorator(result_stream),
840
824
# and then repeated but with 'world', rather than 'hello'.
841
825
# this should appear in the output stream of our test result.
842
826
output = result_stream.getvalue()
844
self.assertContainsRe(output,
845
r"LSProf output for <class 'str'>\(\(b'hello',\), {'errors': 'replace'}\)")
846
self.assertContainsRe(output,
847
r"LSProf output for <class 'str'>\(\(b'world',\), {'errors': 'replace'}\)")
849
self.assertContainsRe(output,
850
r"LSProf output for <type 'unicode'>\(\('hello',\), {'errors': 'replace'}\)")
851
self.assertContainsRe(output,
852
r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
853
self.assertContainsRe(output,
854
r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n")
855
self.assertContainsRe(output,
856
r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?")
858
def test_uses_time_from_testtools(self):
859
"""Test case timings in verbose results should use testtools times"""
862
class TimeAddedVerboseTestResult(tests.VerboseTestResult):
863
def startTest(self, test):
864
self.time(datetime.datetime.utcfromtimestamp(1.145))
865
super(TimeAddedVerboseTestResult, self).startTest(test)
867
def addSuccess(self, test):
868
self.time(datetime.datetime.utcfromtimestamp(51.147))
869
super(TimeAddedVerboseTestResult, self).addSuccess(test)
871
def report_tests_starting(self): pass
873
self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
874
self.assertEndsWith(sio.getvalue(), "OK 50002ms\n")
827
self.assertContainsRe(output,
828
r"LSProf output for <type 'unicode'>\(\('hello',\), {'errors': 'replace'}\)")
829
self.assertContainsRe(output,
830
r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n")
831
self.assertContainsRe(output,
832
r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?")
833
self.assertContainsRe(output,
834
r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
876
836
def test_known_failure(self):
877
"""Using knownFailure should trigger several result actions."""
837
"""A KnownFailure being raised should trigger several result actions."""
878
838
class InstrumentedTestResult(tests.ExtendedTestResult):
879
839
def stopTestRun(self): pass
881
def report_tests_starting(self): pass
840
def startTests(self): pass
841
def report_test_start(self, test): pass
883
842
def report_known_failure(self, test, err=None, details=None):
884
843
self._call = test, 'known failure'
885
844
result = InstrumentedTestResult(None, None, None, None)
887
845
class Test(tests.TestCase):
888
846
def test_function(self):
889
self.knownFailure('failed!')
847
raise tests.KnownFailure('failed!')
890
848
test = Test("test_function")
892
850
# it should invoke 'report_known_failure'.
1260
def test_verbose_test_count(self):
1261
"""A verbose test run reports the right test count at the start"""
1262
suite = TestUtil.TestSuite([
1263
unittest.FunctionTestCase(lambda:None),
1264
unittest.FunctionTestCase(lambda:None)])
1265
self.assertEqual(suite.countTestCases(), 2)
1267
runner = tests.TextTestRunner(stream=stream, verbosity=2)
1268
# Need to use the CountingDecorator as that's what sets num_tests
1269
self.run_test_runner(runner, tests.CountingDecorator(suite))
1270
self.assertStartsWith(stream.getvalue(), "running 2 tests")
1191
def _patch_get_bzr_source_tree(self):
1192
# Reading from the actual source tree breaks isolation, but we don't
1193
# want to assume that that's *all* that would happen.
1194
self._get_source_tree_calls = []
1196
self._get_source_tree_calls.append("called")
1198
self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', new_get)
1200
def test_bench_history(self):
1201
# tests that running the benchmark passes bench_history into
1202
# the test result object. We can tell that happens if
1203
# _get_bzr_source_tree is called.
1204
self._patch_get_bzr_source_tree()
1205
test = TestRunner('dummy_test')
1207
runner = tests.TextTestRunner(stream=self._log_file,
1208
bench_history=output)
1209
result = self.run_test_runner(runner, test)
1210
output_string = output.getvalue()
1211
self.assertContainsRe(output_string, "--date [0-9.]+")
1212
self.assertLength(1, self._get_source_tree_calls)
1272
1214
def test_startTestRun(self):
1273
1215
"""run should call result.startTestRun()"""
1276
class LoggingDecorator(ExtendedToOriginalDecorator):
1217
class LoggingDecorator(tests.ForwardingResult):
1277
1218
def startTestRun(self):
1278
ExtendedToOriginalDecorator.startTestRun(self)
1219
tests.ForwardingResult.startTestRun(self)
1279
1220
calls.append('startTestRun')
1280
test = unittest.FunctionTestCase(lambda: None)
1221
test = unittest.FunctionTestCase(lambda:None)
1281
1222
stream = StringIO()
1282
1223
runner = tests.TextTestRunner(stream=stream,
1283
result_decorators=[LoggingDecorator])
1284
self.run_test_runner(runner, test)
1224
result_decorators=[LoggingDecorator])
1225
result = self.run_test_runner(runner, test)
1285
1226
self.assertLength(1, calls)
1287
1228
def test_stopTestRun(self):
1288
1229
"""run should call result.stopTestRun()"""
1291
class LoggingDecorator(ExtendedToOriginalDecorator):
1231
class LoggingDecorator(tests.ForwardingResult):
1292
1232
def stopTestRun(self):
1293
ExtendedToOriginalDecorator.stopTestRun(self)
1233
tests.ForwardingResult.stopTestRun(self)
1294
1234
calls.append('stopTestRun')
1295
test = unittest.FunctionTestCase(lambda: None)
1235
test = unittest.FunctionTestCase(lambda:None)
1296
1236
stream = StringIO()
1297
1237
runner = tests.TextTestRunner(stream=stream,
1298
result_decorators=[LoggingDecorator])
1299
self.run_test_runner(runner, test)
1238
result_decorators=[LoggingDecorator])
1239
result = self.run_test_runner(runner, test)
1300
1240
self.assertLength(1, calls)
1302
def test_unicode_test_output_on_ascii_stream(self):
1303
"""Showing results should always succeed even on an ascii console"""
1304
class FailureWithUnicode(tests.TestCase):
1305
def test_log_unicode(self):
1307
self.fail("Now print that log!")
1310
out = TextIOWrapper(bio, 'ascii', 'backslashreplace')
1312
bio = out = StringIO()
1313
self.overrideAttr(osutils, "get_terminal_encoding",
1314
lambda trace=False: "ascii")
1315
self.run_test_runner(
1316
tests.TextTestRunner(stream=out),
1317
FailureWithUnicode("test_log_unicode"))
1319
self.assertContainsRe(bio.getvalue(),
1320
b"(?:Text attachment: )?log"
1322
b"\\d+\\.\\d+ \\\\u2606"
1323
b"(?:\n-+\n|}}}\n)")
1326
1243
class SampleTestCase(tests.TestCase):
1328
1245
def _test_pass(self):
1332
1248
class _TestException(Exception):
1336
1252
class TestTestCase(tests.TestCase):
1337
"""Tests that test the core breezy TestCase."""
1253
"""Tests that test the core bzrlib TestCase."""
1339
1255
def test_assertLength_matches_empty(self):
1740
1631
obj.test_attr = 'modified'
1741
1632
self.assertEqual('modified', obj.test_attr)
1743
self._run_successful_test(Test('test_value'))
1634
test = Test('test_value')
1635
test.run(unittest.TestResult())
1744
1636
self.assertEqual('original', obj.test_attr)
1746
1638
def test_overrideAttr_with_value(self):
1747
self.test_attr = 'original' # Define a test attribute
1748
obj = self # Make 'obj' visible to the embedded test
1639
self.test_attr = 'original' # Define a test attribute
1640
obj = self # Make 'obj' visible to the embedded test
1750
1641
class Test(tests.TestCase):
1752
1643
def setUp(self):
1753
super(Test, self).setUp()
1644
tests.TestCase.setUp(self)
1754
1645
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1756
1647
def test_value(self):
1757
1648
self.assertEqual('original', self.orig)
1758
1649
self.assertEqual('modified', obj.test_attr)
1760
self._run_successful_test(Test('test_value'))
1651
test = Test('test_value')
1652
test.run(unittest.TestResult())
1761
1653
self.assertEqual('original', obj.test_attr)
1763
def test_overrideAttr_with_no_existing_value_and_value(self):
1764
# Do not define the test_attribute
1765
obj = self # Make 'obj' visible to the embedded test
1767
class Test(tests.TestCase):
1770
tests.TestCase.setUp(self)
1771
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1773
def test_value(self):
1774
self.assertEqual(tests._unitialized_attr, self.orig)
1775
self.assertEqual('modified', obj.test_attr)
1777
self._run_successful_test(Test('test_value'))
1778
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1780
def test_overrideAttr_with_no_existing_value_and_no_value(self):
1781
# Do not define the test_attribute
1782
obj = self # Make 'obj' visible to the embedded test
1784
class Test(tests.TestCase):
1787
tests.TestCase.setUp(self)
1788
self.orig = self.overrideAttr(obj, 'test_attr')
1790
def test_value(self):
1791
self.assertEqual(tests._unitialized_attr, self.orig)
1792
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1794
self._run_successful_test(Test('test_value'))
1795
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1797
def test_recordCalls(self):
1798
from breezy.tests import test_selftest
1799
calls = self.recordCalls(
1800
test_selftest, '_add_numbers')
1801
self.assertEqual(test_selftest._add_numbers(2, 10),
1803
self.assertEqual(calls, [((2, 10), {})])
1806
def _add_numbers(a, b):
1810
class _MissingFeature(features.Feature):
1815
missing_feature = _MissingFeature()
1818
def _get_test(name):
1819
"""Get an instance of a specific example test.
1821
We protect this in a function so that they don't auto-run in the test
1825
class ExampleTests(tests.TestCase):
1827
def test_fail(self):
1828
mutter('this was a failing test')
1829
self.fail('this test will fail')
1831
def test_error(self):
1832
mutter('this test errored')
1833
raise RuntimeError('gotcha')
1835
def test_missing_feature(self):
1836
mutter('missing the feature')
1837
self.requireFeature(missing_feature)
1839
def test_skip(self):
1840
mutter('this test will be skipped')
1841
raise tests.TestSkipped('reason')
1843
def test_success(self):
1844
mutter('this test succeeds')
1846
def test_xfail(self):
1847
mutter('test with expected failure')
1848
self.knownFailure('this_fails')
1850
def test_unexpected_success(self):
1851
mutter('test with unexpected success')
1852
self.expectFailure('should_fail', lambda: None)
1854
return ExampleTests(name)
1857
class TestTestCaseLogDetails(tests.TestCase):
1859
def _run_test(self, test_name):
1860
test = _get_test(test_name)
1861
result = testtools.TestResult()
1865
def test_fail_has_log(self):
1866
result = self._run_test('test_fail')
1867
self.assertEqual(1, len(result.failures))
1868
result_content = result.failures[0][1]
1869
self.assertContainsRe(result_content,
1870
'(?m)^(?:Text attachment: )?log(?:$|: )')
1871
self.assertContainsRe(result_content, 'this was a failing test')
1873
def test_error_has_log(self):
1874
result = self._run_test('test_error')
1875
self.assertEqual(1, len(result.errors))
1876
result_content = result.errors[0][1]
1877
self.assertContainsRe(result_content,
1878
'(?m)^(?:Text attachment: )?log(?:$|: )')
1879
self.assertContainsRe(result_content, 'this test errored')
1881
def test_skip_has_no_log(self):
1882
result = self._run_test('test_skip')
1883
reasons = result.skip_reasons
1884
self.assertEqual({'reason'}, set(reasons))
1885
skips = reasons['reason']
1886
self.assertEqual(1, len(skips))
1888
self.assertFalse('log' in test.getDetails())
1890
def test_missing_feature_has_no_log(self):
1891
# testtools doesn't know about addNotSupported, so it just gets
1892
# considered as a skip
1893
result = self._run_test('test_missing_feature')
1894
reasons = result.skip_reasons
1895
self.assertEqual({str(missing_feature)}, set(reasons))
1896
skips = reasons[str(missing_feature)]
1897
self.assertEqual(1, len(skips))
1899
self.assertFalse('log' in test.getDetails())
1901
def test_xfail_has_no_log(self):
1902
result = self._run_test('test_xfail')
1903
self.assertEqual(1, len(result.expectedFailures))
1904
result_content = result.expectedFailures[0][1]
1905
self.assertNotContainsRe(result_content,
1906
'(?m)^(?:Text attachment: )?log(?:$|: )')
1907
self.assertNotContainsRe(result_content, 'test with expected failure')
1909
def test_unexpected_success_has_log(self):
1910
result = self._run_test('test_unexpected_success')
1911
self.assertEqual(1, len(result.unexpectedSuccesses))
1912
# Inconsistency, unexpectedSuccesses is a list of tests,
1913
# expectedFailures is a list of reasons?
1914
test = result.unexpectedSuccesses[0]
1915
details = test.getDetails()
1916
self.assertTrue('log' in details)
1919
class TestTestCloning(tests.TestCase):
1920
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1922
def test_cloned_testcase_does_not_share_details(self):
1923
"""A TestCase cloned with clone_test does not share mutable attributes
1924
such as details or cleanups.
1926
class Test(tests.TestCase):
1928
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1929
orig_test = Test('test_foo')
1930
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1931
orig_test.run(unittest.TestResult())
1932
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1933
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1935
def test_double_apply_scenario_preserves_first_scenario(self):
1936
"""Applying two levels of scenarios to a test preserves the attributes
1937
added by both scenarios.
1939
class Test(tests.TestCase):
1942
test = Test('test_foo')
1943
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1944
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1945
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1946
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1947
all_tests = list(tests.iter_suite_tests(suite))
1948
self.assertLength(4, all_tests)
1949
all_xys = sorted((t.x, t.y) for t in all_tests)
1950
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1953
1656
# NB: Don't delete this; it's not actually from 0.11!
1954
1657
@deprecated_function(deprecated_in((0, 11, 0)))
2028
1719
sample_object = ApplyDeprecatedHelper()
2029
1720
# calling an undeprecated callable raises an assertion
2030
1721
self.assertRaises(AssertionError, self.applyDeprecated,
2031
deprecated_in((0, 11, 0)),
2032
sample_object.sample_normal_method)
1722
deprecated_in((0, 11, 0)),
1723
sample_object.sample_normal_method)
2033
1724
self.assertRaises(AssertionError, self.applyDeprecated,
2034
deprecated_in((0, 11, 0)),
2035
sample_undeprecated_function, "a param value")
1725
deprecated_in((0, 11, 0)),
1726
sample_undeprecated_function, "a param value")
2036
1727
# calling a deprecated callable (function or method) with the wrong
2037
1728
# expected deprecation fails.
2038
1729
self.assertRaises(AssertionError, self.applyDeprecated,
2039
deprecated_in((0, 10, 0)),
2040
sample_object.sample_deprecated_method,
1730
deprecated_in((0, 10, 0)),
1731
sample_object.sample_deprecated_method, "a param value")
2042
1732
self.assertRaises(AssertionError, self.applyDeprecated,
2043
deprecated_in((0, 10, 0)),
2044
sample_deprecated_function)
1733
deprecated_in((0, 10, 0)),
1734
sample_deprecated_function)
2045
1735
# calling a deprecated callable (function or method) with the right
2046
1736
# expected deprecation returns the function's result.
2049
self.applyDeprecated(
2050
deprecated_in((0, 11, 0)),
2051
sample_object.sample_deprecated_method, "a param value"))
1737
self.assertEqual("a param value",
1738
self.applyDeprecated(deprecated_in((0, 11, 0)),
1739
sample_object.sample_deprecated_method, "a param value"))
2052
1740
self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 11, 0)),
2053
sample_deprecated_function))
1741
sample_deprecated_function))
2054
1742
# calling a nested deprecation with the wrong deprecation version
2055
1743
# fails even if a deeper nested function was deprecated with the
2056
1744
# supplied version.
2058
AssertionError, self.applyDeprecated,
1745
self.assertRaises(AssertionError, self.applyDeprecated,
2059
1746
deprecated_in((0, 11, 0)), sample_object.sample_nested_deprecation)
2060
1747
# calling a nested deprecation with the right deprecation value
2061
1748
# returns the call's result.
2063
2, self.applyDeprecated(
2064
deprecated_in((0, 10, 0)),
2065
sample_object.sample_nested_deprecation))
1749
self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 10, 0)),
1750
sample_object.sample_nested_deprecation))
2067
1752
def test_callDeprecated(self):
2068
1753
def testfunc(be_deprecated, result=None):
2117
1804
self.transport_server = test_server.FakeVFATServer
2118
1805
self.assertFalse(self.get_url('t1').startswith('file://'))
2119
1806
tree = self.make_branch_and_tree('t1')
2120
base = tree.controldir.root_transport.base
1807
base = tree.bzrdir.root_transport.base
2121
1808
self.assertStartsWith(base, 'file://')
2122
self.assertEqual(tree.controldir.root_transport,
2123
tree.branch.controldir.root_transport)
2124
self.assertEqual(tree.controldir.root_transport,
2125
tree.branch.repository.controldir.root_transport)
2128
class SelfTestHelper(object):
1809
self.assertEquals(tree.bzrdir.root_transport,
1810
tree.branch.bzrdir.root_transport)
1811
self.assertEquals(tree.bzrdir.root_transport,
1812
tree.branch.repository.bzrdir.root_transport)
1815
class SelfTestHelper:
2130
1817
def run_selftest(self, **kwargs):
2131
1818
"""Run selftest returning its output."""
2134
output = TextIOWrapper(bio, 'utf-8')
2136
bio = output = StringIO()
2137
old_transport = breezy.tests.default_transport
1820
old_transport = bzrlib.tests.default_transport
2138
1821
old_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
2139
1822
tests.TestCaseWithMemoryTransport.TEST_ROOT = None
2141
1824
self.assertEqual(True, tests.selftest(stream=output, **kwargs))
2143
breezy.tests.default_transport = old_transport
1826
bzrlib.tests.default_transport = old_transport
2144
1827
tests.TestCaseWithMemoryTransport.TEST_ROOT = old_root
2152
1832
class TestSelftest(tests.TestCase, SelfTestHelper):
2153
"""Tests of breezy.tests.selftest."""
1833
"""Tests of bzrlib.tests.selftest."""
2155
def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(
1835
def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(self):
2157
1836
factory_called = []
2160
1838
factory_called.append(True)
2161
1839
return TestUtil.TestSuite()
2162
1840
out = StringIO()
2163
1841
err = StringIO()
2164
self.apply_redirected(out, err, None, breezy.tests.selftest,
2165
test_suite_factory=factory)
1842
self.apply_redirected(out, err, None, bzrlib.tests.selftest,
1843
test_suite_factory=factory)
2166
1844
self.assertEqual([True], factory_called)
2168
1846
def factory(self):
2169
1847
"""A test suite factory."""
2170
1848
class Test(tests.TestCase):
2172
return __name__ + ".Test." + self._testMethodName
2182
1855
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
2184
1857
def test_list_only(self):
2185
1858
output = self.run_selftest(test_suite_factory=self.factory,
2187
1860
self.assertEqual(3, len(output.readlines()))
2189
1862
def test_list_only_filtered(self):
2190
1863
output = self.run_selftest(test_suite_factory=self.factory,
2191
list_only=True, pattern="Test.b")
2192
self.assertEndsWith(output.getvalue(), b"Test.b\n")
1864
list_only=True, pattern="Test.b")
1865
self.assertEndsWith(output.getvalue(), "Test.b\n")
2193
1866
self.assertLength(1, output.readlines())
2195
1868
def test_list_only_excludes(self):
2196
1869
output = self.run_selftest(test_suite_factory=self.factory,
2197
list_only=True, exclude_pattern="Test.b")
2198
self.assertNotContainsRe(b"Test.b", output.getvalue())
1870
list_only=True, exclude_pattern="Test.b")
1871
self.assertNotContainsRe("Test.b", output.getvalue())
2199
1872
self.assertLength(2, output.readlines())
2201
1874
def test_lsprof_tests(self):
2202
self.requireFeature(features.lsprof_feature)
1875
self.requireFeature(test_lsprof.LSProfFeature)
2205
1877
class Test(object):
2206
1878
def __call__(test, result):
2207
1879
test.run(result)
2209
1880
def run(test, result):
2210
results.append(result)
1881
self.assertIsInstance(result, tests.ForwardingResult)
1882
calls.append("called")
2212
1883
def countTestCases(self):
2214
1885
self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
2215
self.assertLength(1, results)
2216
self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)
1886
self.assertLength(1, calls)
2218
1888
def test_random(self):
2219
1889
# test randomising by listing a number of tests.
2220
1890
output_123 = self.run_selftest(test_suite_factory=self.factory,
2221
list_only=True, random_seed="123")
1891
list_only=True, random_seed="123")
2222
1892
output_234 = self.run_selftest(test_suite_factory=self.factory,
2223
list_only=True, random_seed="234")
1893
list_only=True, random_seed="234")
2224
1894
self.assertNotEqual(output_123, output_234)
2225
1895
# "Randominzing test order..\n\n
2226
1896
self.assertLength(5, output_123.readlines())
2302
1965
def test_load_unknown(self):
2303
1966
# Provide a list with one test - this test.
2304
1967
# And generate a list of the tests in the suite.
2305
self.assertRaises(errors.NoSuchFile, self.run_selftest,
2306
load_list='missing file name', list_only=True)
2309
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2311
_test_needs_features = [features.subunit]
2313
def run_subunit_stream(self, test_name):
2314
from subunit import ProtocolTestCase
2317
return TestUtil.TestSuite([_get_test(test_name)])
2318
stream = self.run_selftest(
2319
runner_class=tests.SubUnitBzrRunnerv1,
2320
test_suite_factory=factory)
2321
test = ProtocolTestCase(stream)
2322
result = testtools.TestResult()
2324
content = stream.getvalue()
2325
return content, result
2327
def test_fail_has_log(self):
2328
content, result = self.run_subunit_stream('test_fail')
2329
self.assertEqual(1, len(result.failures))
2330
self.assertContainsRe(content, b'(?m)^log$')
2331
self.assertContainsRe(content, b'this test will fail')
2333
def test_error_has_log(self):
2334
content, result = self.run_subunit_stream('test_error')
2335
self.assertContainsRe(content, b'(?m)^log$')
2336
self.assertContainsRe(content, b'this test errored')
2338
def test_skip_has_no_log(self):
2339
content, result = self.run_subunit_stream('test_skip')
2340
self.assertNotContainsRe(content, b'(?m)^log$')
2341
self.assertNotContainsRe(content, b'this test will be skipped')
2342
reasons = result.skip_reasons
2343
self.assertEqual({'reason'}, set(reasons))
2344
skips = reasons['reason']
2345
self.assertEqual(1, len(skips))
2347
# RemotedTestCase doesn't preserve the "details"
2348
# self.assertFalse('log' in test.getDetails())
2350
def test_missing_feature_has_no_log(self):
2351
content, result = self.run_subunit_stream('test_missing_feature')
2352
self.assertNotContainsRe(content, b'(?m)^log$')
2353
self.assertNotContainsRe(content, b'missing the feature')
2354
reasons = result.skip_reasons
2355
self.assertEqual({'_MissingFeature\n'}, set(reasons))
2356
skips = reasons['_MissingFeature\n']
2357
self.assertEqual(1, len(skips))
2359
# RemotedTestCase doesn't preserve the "details"
2360
# self.assertFalse('log' in test.getDetails())
2362
def test_xfail_has_no_log(self):
2363
content, result = self.run_subunit_stream('test_xfail')
2364
self.assertNotContainsRe(content, b'(?m)^log$')
2365
self.assertNotContainsRe(content, b'test with expected failure')
2366
self.assertEqual(1, len(result.expectedFailures))
2367
result_content = result.expectedFailures[0][1]
2368
self.assertNotContainsRe(result_content,
2369
'(?m)^(?:Text attachment: )?log(?:$|: )')
2370
self.assertNotContainsRe(result_content, 'test with expected failure')
2372
def test_unexpected_success_has_log(self):
2373
content, result = self.run_subunit_stream('test_unexpected_success')
2374
self.assertContainsRe(content, b'(?m)^log$')
2375
self.assertContainsRe(content, b'test with unexpected success')
2376
self.assertEqual(1, len(result.unexpectedSuccesses))
2377
# test = result.unexpectedSuccesses[0]
2378
# RemotedTestCase doesn't preserve the "details"
2379
# self.assertTrue('log' in test.getDetails())
2381
def test_success_has_no_log(self):
2382
content, result = self.run_subunit_stream('test_success')
2383
self.assertEqual(1, result.testsRun)
2384
self.assertNotContainsRe(content, b'(?m)^log$')
2385
self.assertNotContainsRe(content, b'this test succeeds')
1968
err = self.assertRaises(errors.NoSuchFile, self.run_selftest,
1969
load_list='missing file name', list_only=True)
2388
1972
class TestRunBzr(tests.TestCase):
2394
def _run_bzr_core(self, argv, encoding=None, stdin=None,
2395
stdout=None, stderr=None, working_dir=None):
1977
def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
2396
1979
"""Override _run_bzr_core to test how it is invoked by run_bzr.
2398
1981
Attempts to run bzr from inside this class don't actually run it.
2572
2171
self.next_subprocess = process
2574
2173
result = self.run_bzr_subprocess(*args, **kwargs)
2575
except BaseException:
2576
2175
self.next_subprocess = None
2577
for key, expected in expected_args.items():
2176
for key, expected in expected_args.iteritems():
2578
2177
self.assertEqual(expected, self.subprocess_calls[-1][key])
2581
2180
self.next_subprocess = None
2582
for key, expected in expected_args.items():
2181
for key, expected in expected_args.iteritems():
2583
2182
self.assertEqual(expected, self.subprocess_calls[-1][key])
2586
2185
def test_run_bzr_subprocess(self):
2587
2186
"""The run_bzr_helper_external command behaves nicely."""
2588
self.assertRunBzrSubprocess({'process_args': ['--version']},
2589
StubProcess(), '--version')
2590
self.assertRunBzrSubprocess({'process_args': ['--version']},
2591
StubProcess(), ['--version'])
2187
self.assertRunBzrSubprocess({'process_args':['--version']},
2188
StubProcess(), '--version')
2189
self.assertRunBzrSubprocess({'process_args':['--version']},
2190
StubProcess(), ['--version'])
2592
2191
# retcode=None disables retcode checking
2593
result = self.assertRunBzrSubprocess(
2594
{}, StubProcess(retcode=3), '--version', retcode=None)
2595
result = self.assertRunBzrSubprocess(
2596
{}, StubProcess(out="is free software"), '--version')
2192
result = self.assertRunBzrSubprocess({},
2193
StubProcess(retcode=3), '--version', retcode=None)
2194
result = self.assertRunBzrSubprocess({},
2195
StubProcess(out="is free software"), '--version')
2597
2196
self.assertContainsRe(result[0], 'is free software')
2598
2197
# Running a subcommand that is missing errors
2599
2198
self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
2600
{'process_args': ['--versionn']
2601
}, StubProcess(retcode=3),
2199
{'process_args':['--versionn']}, StubProcess(retcode=3),
2603
2201
# Unless it is told to expect the error from the subprocess
2604
result = self.assertRunBzrSubprocess(
2605
{}, StubProcess(retcode=3), '--versionn', retcode=3)
2202
result = self.assertRunBzrSubprocess({},
2203
StubProcess(retcode=3), '--versionn', retcode=3)
2606
2204
# Or to ignore retcode checking
2607
result = self.assertRunBzrSubprocess(
2608
{}, StubProcess(err="unknown command", retcode=3),
2609
'--versionn', retcode=None)
2205
result = self.assertRunBzrSubprocess({},
2206
StubProcess(err="unknown command", retcode=3), '--versionn',
2610
2208
self.assertContainsRe(result[1], 'unknown command')
2612
2210
def test_env_change_passes_through(self):
2613
2211
self.assertRunBzrSubprocess(
2614
{'env_changes': {'new': 'value', 'changed': 'newvalue', 'deleted': None}},
2212
{'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}},
2615
2213
StubProcess(), '',
2616
env_changes={'new': 'value', 'changed': 'newvalue', 'deleted': None})
2214
env_changes={'new':'value', 'changed':'newvalue', 'deleted':None})
2618
2216
def test_no_working_dir_passed_as_None(self):
2619
2217
self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '')
2621
2219
def test_no_working_dir_passed_through(self):
2622
2220
self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
2625
2223
def test_run_bzr_subprocess_no_plugins(self):
2626
2224
self.assertRunBzrSubprocess({'allow_plugins': False},
2629
2227
def test_allow_plugins(self):
2630
2228
self.assertRunBzrSubprocess({'allow_plugins': True},
2631
StubProcess(), '', allow_plugins=True)
2229
StubProcess(), '', allow_plugins=True)
2634
2232
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):
2664
2262
class TestStartBzrSubProcess(tests.TestCase):
2665
"""Stub test start_bzr_subprocess."""
2667
def _subprocess_log_cleanup(self):
2668
"""Inhibits the base version as we don't produce a log file."""
2264
def check_popen_state(self):
2265
"""Replace to make assertions when popen is called."""
2670
2267
def _popen(self, *args, **kwargs):
2671
"""Override the base version to record the command that is run.
2673
From there we can ensure it is correct without spawning a real process.
2268
"""Record the command that is run, so that we can ensure it is correct"""
2675
2269
self.check_popen_state()
2676
2270
self._popen_args = args
2677
2271
self._popen_kwargs = kwargs
2678
2272
raise _DontSpawnProcess()
2680
def check_popen_state(self):
2681
"""Replace to make assertions when popen is called."""
2683
2274
def test_run_bzr_subprocess_no_plugins(self):
2684
2275
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2685
2276
command = self._popen_args[0]
2686
2277
self.assertEqual(sys.executable, command[0])
2687
self.assertEqual(self.get_brz_path(), command[1])
2278
self.assertEqual(self.get_bzr_path(), command[1])
2688
2279
self.assertEqual(['--no-plugins'], command[2:])
2690
2281
def test_allow_plugins(self):
2691
2282
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2693
2284
command = self._popen_args[0]
2694
2285
self.assertEqual([], command[2:])
2696
2287
def test_set_env(self):
2697
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2288
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2698
2289
# set in the child
2700
2290
def check_environment():
2701
2291
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2702
2292
self.check_popen_state = check_environment
2703
2293
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2704
env_changes={'EXISTANT_ENV_VAR': 'set variable'})
2294
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2705
2295
# not set in theparent
2706
2296
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2708
2298
def test_run_bzr_subprocess_env_del(self):
2709
2299
"""run_bzr_subprocess can remove environment variables too."""
2710
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2300
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2712
2301
def check_environment():
2713
2302
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2714
2303
os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2715
2304
self.check_popen_state = check_environment
2716
2305
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2717
env_changes={'EXISTANT_ENV_VAR': None})
2306
env_changes={'EXISTANT_ENV_VAR':None})
2718
2307
# Still set in parent
2719
2308
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2720
2309
del os.environ['EXISTANT_ENV_VAR']
2722
2311
def test_env_del_missing(self):
2723
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2312
self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
2725
2313
def check_environment():
2726
2314
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2727
2315
self.check_popen_state = check_environment
2728
2316
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2729
env_changes={'NON_EXISTANT_ENV_VAR': None})
2317
env_changes={'NON_EXISTANT_ENV_VAR':None})
2731
2319
def test_working_dir(self):
2732
2320
"""Test that we can specify the working dir for the child"""
2321
orig_getcwd = osutils.getcwd
2322
orig_chdir = os.chdir
2735
2324
def chdir(path):
2736
2325
chdirs.append(path)
2737
self.overrideAttr(os, 'chdir', chdir)
2741
self.overrideAttr(osutils, 'getcwd', getcwd)
2742
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2330
osutils.getcwd = getcwd
2332
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2335
osutils.getcwd = orig_getcwd
2337
os.chdir = orig_chdir
2744
2338
self.assertEqual(['foo', 'current'], chdirs)
2746
def test_get_brz_path_with_cwd_breezy(self):
2747
self.get_source_path = lambda: ""
2748
self.overrideAttr(os.path, "isfile", lambda path: True)
2749
self.assertEqual(self.get_brz_path(), "brz")
2752
2341
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2753
2342
"""Tests that really need to do things with an external bzr."""
2759
2348
self.disable_missing_extensions_warning()
2760
2349
process = self.start_bzr_subprocess(['wait-until-signalled'],
2761
2350
skip_if_plan_to_signal=True)
2762
self.assertEqual(b'running\n', process.stdout.readline())
2351
self.assertEqual('running\n', process.stdout.readline())
2763
2352
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2765
self.assertEqual(b'', result[0])
2766
self.assertEqual(b'brz: interrupted\n', result[1])
2354
self.assertEqual('', result[0])
2355
self.assertEqual('bzr: interrupted\n', result[1])
2358
class TestFeature(tests.TestCase):
2360
def test_caching(self):
2361
"""Feature._probe is called by the feature at most once."""
2362
class InstrumentedFeature(tests.Feature):
2364
super(InstrumentedFeature, self).__init__()
2367
self.calls.append('_probe')
2369
feature = InstrumentedFeature()
2371
self.assertEqual(['_probe'], feature.calls)
2373
self.assertEqual(['_probe'], feature.calls)
2375
def test_named_str(self):
2376
"""Feature.__str__ should thunk to feature_name()."""
2377
class NamedFeature(tests.Feature):
2378
def feature_name(self):
2380
feature = NamedFeature()
2381
self.assertEqual('symlinks', str(feature))
2383
def test_default_str(self):
2384
"""Feature.__str__ should default to __class__.__name__."""
2385
class NamedFeature(tests.Feature):
2387
feature = NamedFeature()
2388
self.assertEqual('NamedFeature', str(feature))
2391
class TestUnavailableFeature(tests.TestCase):
2393
def test_access_feature(self):
2394
feature = tests.Feature()
2395
exception = tests.UnavailableFeature(feature)
2396
self.assertIs(feature, exception.args[0])
2399
simple_thunk_feature = tests._CompatabilityThunkFeature(
2400
deprecated_in((2, 1, 0)),
2401
'bzrlib.tests.test_selftest',
2402
'simple_thunk_feature','UnicodeFilename',
2403
replacement_module='bzrlib.tests'
2406
class Test_CompatibilityFeature(tests.TestCase):
2408
def test_does_thunk(self):
2409
res = self.callDeprecated(
2410
['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2411
' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2412
simple_thunk_feature.available)
2413
self.assertEqual(tests.UnicodeFilename.available(), res)
2416
class TestModuleAvailableFeature(tests.TestCase):
2418
def test_available_module(self):
2419
feature = tests.ModuleAvailableFeature('bzrlib.tests')
2420
self.assertEqual('bzrlib.tests', feature.module_name)
2421
self.assertEqual('bzrlib.tests', str(feature))
2422
self.assertTrue(feature.available())
2423
self.assertIs(tests, feature.module)
2425
def test_unavailable_module(self):
2426
feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
2427
self.assertEqual('bzrlib.no_such_module_exists', str(feature))
2428
self.assertFalse(feature.available())
2429
self.assertIs(None, feature.module)
2769
2432
class TestSelftestFiltering(tests.TestCase):
2771
2434
def setUp(self):
2772
super(TestSelftestFiltering, self).setUp()
2435
tests.TestCase.setUp(self)
2773
2436
self.suite = TestUtil.TestSuite()
2774
2437
self.loader = TestUtil.TestLoader()
2775
2438
self.suite.addTest(self.loader.loadTestsFromModule(
2776
sys.modules['breezy.tests.test_selftest']))
2439
sys.modules['bzrlib.tests.test_selftest']))
2777
2440
self.all_names = _test_ids(self.suite)
2779
2442
def test_condition_id_re(self):
2780
test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2781
'test_condition_id_re')
2443
test_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
2444
'test_condition_id_re')
2782
2445
filtered_suite = tests.filter_suite_by_condition(
2783
2446
self.suite, tests.condition_id_re('test_condition_id_re'))
2784
2447
self.assertEqual([test_name], _test_ids(filtered_suite))
2786
2449
def test_condition_id_in_list(self):
2787
test_names = ['breezy.tests.test_selftest.TestSelftestFiltering.'
2450
test_names = ['bzrlib.tests.test_selftest.TestSelftestFiltering.'
2788
2451
'test_condition_id_in_list']
2789
2452
id_list = tests.TestIdList(test_names)
2790
2453
filtered_suite = tests.filter_suite_by_condition(
2827
2490
self.all_names = _test_ids(self.suite)
2828
2491
filtered_suite = tests.exclude_tests_by_re(self.suite,
2829
2492
'exclude_tests_by_re')
2830
excluded_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2831
'test_exclude_tests_by_re')
2493
excluded_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
2494
'test_exclude_tests_by_re')
2832
2495
self.assertEqual(len(self.all_names) - 1,
2833
filtered_suite.countTestCases())
2496
filtered_suite.countTestCases())
2834
2497
self.assertFalse(excluded_name in _test_ids(filtered_suite))
2835
2498
remaining_names = list(self.all_names)
2836
2499
remaining_names.remove(excluded_name)
2837
2500
self.assertEqual(remaining_names, _test_ids(filtered_suite))
2839
2502
def test_filter_suite_by_condition(self):
2840
test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2841
'test_filter_suite_by_condition')
2842
filtered_suite = tests.filter_suite_by_condition(
2843
self.suite, lambda x: x.id() == test_name)
2503
test_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
2504
'test_filter_suite_by_condition')
2505
filtered_suite = tests.filter_suite_by_condition(self.suite,
2506
lambda x:x.id() == test_name)
2844
2507
self.assertEqual([test_name], _test_ids(filtered_suite))
2846
2509
def test_filter_suite_by_re(self):
2847
2510
filtered_suite = tests.filter_suite_by_re(self.suite,
2848
2511
'test_filter_suite_by_r')
2849
2512
filtered_names = _test_ids(filtered_suite)
2851
filtered_names, ['breezy.tests.test_selftest.'
2852
'TestSelftestFiltering.test_filter_suite_by_re'])
2513
self.assertEqual(filtered_names, ['bzrlib.tests.test_selftest.'
2514
'TestSelftestFiltering.test_filter_suite_by_re'])
2854
2516
def test_filter_suite_by_id_list(self):
2855
test_list = ['breezy.tests.test_selftest.'
2517
test_list = ['bzrlib.tests.test_selftest.'
2856
2518
'TestSelftestFiltering.test_filter_suite_by_id_list']
2857
2519
filtered_suite = tests.filter_suite_by_id_list(
2858
2520
self.suite, tests.TestIdList(test_list))
2859
2521
filtered_names = _test_ids(filtered_suite)
2860
2522
self.assertEqual(
2861
2523
filtered_names,
2862
['breezy.tests.test_selftest.'
2524
['bzrlib.tests.test_selftest.'
2863
2525
'TestSelftestFiltering.test_filter_suite_by_id_list'])
2865
2527
def test_filter_suite_by_id_startswith(self):
2866
2528
# By design this test may fail if another test is added whose name also
2867
2529
# begins with one of the start value used.
2868
klass = 'breezy.tests.test_selftest.TestSelftestFiltering.'
2530
klass = 'bzrlib.tests.test_selftest.TestSelftestFiltering.'
2869
2531
start1 = klass + 'test_filter_suite_by_id_starts'
2870
2532
start2 = klass + 'test_filter_suite_by_id_li'
2871
2533
test_list = [klass + 'test_filter_suite_by_id_list',
3119
2776
self.assertEqual([], test_list)
3121
2778
self.assertSubset([
3126
2783
def test_test_suite(self):
3127
2784
# test_suite() loads the entire test suite to operate. To avoid this
3128
2785
# overhead, and yet still be confident that things are happening,
3129
# we temporarily replace two functions used by test_suite with
2786
# we temporarily replace two functions used by test_suite with
3130
2787
# test doubles that supply a few sample tests to load, and check they
3134
2790
def testmod_names():
3135
2791
calls.append("testmod_names")
3137
'breezy.tests.blackbox.test_branch',
3138
'breezy.tests.per_transport',
3139
'breezy.tests.test_selftest',
2793
'bzrlib.tests.blackbox.test_branch',
2794
'bzrlib.tests.per_transport',
2795
'bzrlib.tests.test_selftest',
3141
2797
self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names)
3143
2798
def doctests():
3144
2799
calls.append("modules_to_doctest")
3145
2800
if __doc__ is None:
3147
return ['breezy.timestamp']
2802
return ['bzrlib.timestamp']
3148
2803
self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests)
3149
2804
expected_test_list = [
3150
2805
# testmod_names
3151
'breezy.tests.blackbox.test_branch.TestBranch.test_branch',
3152
('breezy.tests.per_transport.TransportTests'
2806
'bzrlib.tests.blackbox.test_branch.TestBranch.test_branch',
2807
('bzrlib.tests.per_transport.TransportTests'
3153
2808
'.test_abspath(LocalTransport,LocalURLServer)'),
3154
'breezy.tests.test_selftest.TestTestSuite.test_test_suite',
2809
'bzrlib.tests.test_selftest.TestTestSuite.test_test_suite',
3155
2810
# plugins can't be tested that way since selftest may be run with
3158
if __doc__ is not None and not PY3:
2813
if __doc__ is not None:
3159
2814
expected_test_list.extend([
3160
2815
# modules_to_doctest
3161
'breezy.timestamp.format_highres_date',
2816
'bzrlib.timestamp.format_highres_date',
3163
2818
suite = tests.test_suite()
3165
self.assertEqual({"testmod_names"}, set(calls))
3167
self.assertEqual({"testmod_names", "modules_to_doctest"},
2819
self.assertEqual(set(["testmod_names", "modules_to_doctest"]),
3169
2821
self.assertSubset(expected_test_list, _test_ids(suite))
3171
2823
def test_test_suite_list_and_start(self):
3172
# We cannot test this at the same time as the main load, because we
3173
# want to know that starting_with == None works. So a second load is
3174
# incurred - note that the starting_with parameter causes a partial
3175
# load rather than a full load so this test should be pretty quick.
3177
'breezy.tests.test_selftest.TestTestSuite.test_test_suite']
2824
# We cannot test this at the same time as the main load, because we want
2825
# to know that starting_with == None works. So a second load is
2826
# incurred - note that the starting_with parameter causes a partial load
2827
# rather than a full load so this test should be pretty quick.
2828
test_list = ['bzrlib.tests.test_selftest.TestTestSuite.test_test_suite']
3178
2829
suite = tests.test_suite(test_list,
3179
['breezy.tests.test_selftest.TestTestSuite'])
3180
# test_test_suite_list_and_start is not included
3181
self.assertEqual(test_list, _test_ids(suite))
2830
['bzrlib.tests.test_selftest.TestTestSuite'])
2831
# test_test_suite_list_and_start is not included
2832
self.assertEquals(test_list, _test_ids(suite))
3184
2835
class TestLoadTestIdList(tests.TestCaseInTempDir):
3300
2951
def test_predefined_prefixes(self):
3301
2952
tpr = tests.test_prefix_alias_registry
3302
self.assertEqual('breezy', tpr.resolve_alias('breezy'))
3303
self.assertEqual('breezy.doc', tpr.resolve_alias('bd'))
3304
self.assertEqual('breezy.utils', tpr.resolve_alias('bu'))
3305
self.assertEqual('breezy.tests', tpr.resolve_alias('bt'))
3306
self.assertEqual('breezy.tests.blackbox', tpr.resolve_alias('bb'))
3307
self.assertEqual('breezy.plugins', tpr.resolve_alias('bp'))
3310
class TestThreadLeakDetection(tests.TestCase):
3311
"""Ensure when tests leak threads we detect and report it"""
3313
class LeakRecordingResult(tests.ExtendedTestResult):
3315
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3318
def _report_thread_leak(self, test, leaks, alive):
3319
self.leaks.append((test, leaks))
3321
def test_testcase_without_addCleanups(self):
3322
"""Check old TestCase instances don't break with leak detection"""
3323
class Test(unittest.TestCase):
3326
result = self.LeakRecordingResult()
3328
result.startTestRun()
3330
result.stopTestRun()
3331
self.assertEqual(result._tests_leaking_threads_count, 0)
3332
self.assertEqual(result.leaks, [])
3334
def test_thread_leak(self):
3335
"""Ensure a thread that outlives the running of a test is reported
3337
Uses a thread that blocks on an event, and is started by the inner
3338
test case. As the thread outlives the inner case's run, it should be
3339
detected as a leak, but the event is then set so that the thread can
3340
be safely joined in cleanup so it's not leaked for real.
3342
event = threading.Event()
3343
thread = threading.Thread(name="Leaker", target=event.wait)
3345
class Test(tests.TestCase):
3346
def test_leak(self):
3348
result = self.LeakRecordingResult()
3349
test = Test("test_leak")
3350
self.addCleanup(thread.join)
3351
self.addCleanup(event.set)
3352
result.startTestRun()
3354
result.stopTestRun()
3355
self.assertEqual(result._tests_leaking_threads_count, 1)
3356
self.assertEqual(result._first_thread_leaker_id, test.id())
3357
self.assertEqual(result.leaks, [(test, {thread})])
3358
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3360
def test_multiple_leaks(self):
3361
"""Check multiple leaks are blamed on the test cases at fault
3363
Same concept as the previous test, but has one inner test method that
3364
leaks two threads, and one that doesn't leak at all.
3366
event = threading.Event()
3367
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3368
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3369
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3371
class Test(tests.TestCase):
3372
def test_first_leak(self):
3375
def test_second_no_leak(self):
3378
def test_third_leak(self):
3381
result = self.LeakRecordingResult()
3382
first_test = Test("test_first_leak")
3383
third_test = Test("test_third_leak")
3384
self.addCleanup(thread_a.join)
3385
self.addCleanup(thread_b.join)
3386
self.addCleanup(thread_c.join)
3387
self.addCleanup(event.set)
3388
result.startTestRun()
3390
[first_test, Test("test_second_no_leak"), third_test]
3392
result.stopTestRun()
3393
self.assertEqual(result._tests_leaking_threads_count, 2)
3394
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3395
self.assertEqual(result.leaks, [
3396
(first_test, {thread_b}),
3397
(third_test, {thread_a, thread_c})])
3398
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3401
class TestPostMortemDebugging(tests.TestCase):
3402
"""Check post mortem debugging works when tests fail or error"""
3404
class TracebackRecordingResult(tests.ExtendedTestResult):
3406
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3407
self.postcode = None
3409
def _post_mortem(self, tb=None):
3410
"""Record the code object at the end of the current traceback"""
3411
tb = tb or sys.exc_info()[2]
3414
while next is not None:
3417
self.postcode = tb.tb_frame.f_code
3419
def report_error(self, test, err):
3422
def report_failure(self, test, err):
3425
def test_location_unittest_error(self):
3426
"""Needs right post mortem traceback with erroring unittest case"""
3427
class Test(unittest.TestCase):
3430
result = self.TracebackRecordingResult()
3432
self.assertEqual(result.postcode, Test.runTest.__code__)
3434
def test_location_unittest_failure(self):
3435
"""Needs right post mortem traceback with failing unittest case"""
3436
class Test(unittest.TestCase):
3438
raise self.failureException
3439
result = self.TracebackRecordingResult()
3441
self.assertEqual(result.postcode, Test.runTest.__code__)
3443
def test_location_bt_error(self):
3444
"""Needs right post mortem traceback with erroring breezy.tests case"""
3445
class Test(tests.TestCase):
3446
def test_error(self):
3448
result = self.TracebackRecordingResult()
3449
Test("test_error").run(result)
3450
self.assertEqual(result.postcode, Test.test_error.__code__)
3452
def test_location_bt_failure(self):
3453
"""Needs right post mortem traceback with failing breezy.tests case"""
3454
class Test(tests.TestCase):
3455
def test_failure(self):
3456
raise self.failureException
3457
result = self.TracebackRecordingResult()
3458
Test("test_failure").run(result)
3459
self.assertEqual(result.postcode, Test.test_failure.__code__)
3461
def test_env_var_triggers_post_mortem(self):
3462
"""Check pdb.post_mortem is called iff BRZ_TEST_PDB is set"""
3464
result = tests.ExtendedTestResult(StringIO(), 0, 1)
3465
post_mortem_calls = []
3466
self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3467
self.overrideEnv('BRZ_TEST_PDB', None)
3468
result._post_mortem(1)
3469
self.overrideEnv('BRZ_TEST_PDB', 'on')
3470
result._post_mortem(2)
3471
self.assertEqual([2], post_mortem_calls)
2953
self.assertEquals('bzrlib', tpr.resolve_alias('bzrlib'))
2954
self.assertEquals('bzrlib.doc', tpr.resolve_alias('bd'))
2955
self.assertEquals('bzrlib.utils', tpr.resolve_alias('bu'))
2956
self.assertEquals('bzrlib.tests', tpr.resolve_alias('bt'))
2957
self.assertEquals('bzrlib.tests.blackbox', tpr.resolve_alias('bb'))
2958
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3474
2961
class TestRunSuite(tests.TestCase):
3488
2974
self.verbosity)
3489
2975
tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
3490
2976
self.assertLength(1, calls)
3493
class _Selftest(object):
3494
"""Mixin for tests needing full selftest output"""
3496
def _inject_stream_into_subunit(self, stream):
    """Hook for subclasses that run tests out of process; no-op here"""
3499
def _run_selftest(self, **kwargs):
3502
sio = TextIOWrapper(bio, 'utf-8')
3504
sio = bio = StringIO()
3505
self._inject_stream_into_subunit(bio)
3506
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3508
return bio.getvalue()
3511
class _ForkedSelftest(_Selftest):
3512
"""Mixin for tests needing full selftest output with forked children"""
3514
_test_needs_features = [features.subunit]
3516
def _inject_stream_into_subunit(self, stream):
    """Patch subunit so its passthrough output lands in stream

    ProtocolTestCase.__init__ is wrapped so that every instance created
    while the override is active gets ``_passthrough`` set to ``stream``.
    This is a stopgap: the cleaner fix is for run_tests to hand the stream
    parameter down to ProtocolTestCase directly.
    """
    from subunit import ProtocolTestCase
    wrapped_init = ProtocolTestCase.__init__

    def _init_with_passthrough(self, *args, **kwargs):
        wrapped_init(self, *args, **kwargs)
        # Set after the real __init__ so it cannot be overwritten there.
        self._passthrough = stream

    self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3530
def _run_selftest(self, **kwargs):
    """Run selftest with the fork decorator appended to suite_decorators

    Raises tests.TestNotApplicable when ``os.fork`` is unavailable.
    """
    # GZ 2011-05-26: a PosixSystem feature would make this check redundant
    fork = getattr(os, "fork", None)
    if fork is None:
        raise tests.TestNotApplicable("Platform doesn't support forking")
    # Report two cores so the forking code path is really taken
    self.overrideAttr(osutils, "local_concurrency", lambda: 2)
    decorators = kwargs.setdefault("suite_decorators", [])
    decorators.append(tests.fork_decorator)
    return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3540
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3541
"""Check operation of --parallel=fork selftest option"""
3543
def test_error_in_child_during_fork(self):
3544
"""Error in a forked child during test setup should get reported"""
3545
class Test(tests.TestCase):
3546
def testMethod(self):
3548
# We don't care what, just break something that a child will run
3549
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3550
out = self._run_selftest(test_suite_factory=Test)
3551
# Lines from the tracebacks of the two child processes may be mixed
3552
# together due to the way subunit parses and forwards the streams,
3553
# so permit extra lines between each part of the error output.
3554
self.assertContainsRe(out,
3557
b".+ in fork_for_tests\n"
3559
b"\\s*workaround_zealous_crypto_random\\(\\)\n"
3564
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3565
"""Check a test case still alive after being run emits a warning"""
3567
class Test(tests.TestCase):
3568
def test_pass(self):
3571
def test_self_ref(self):
3572
self.also_self = self.test_self_ref
3574
def test_skip(self):
3575
self.skipTest("Don't need")
3577
def _get_suite(self):
3578
return TestUtil.TestSuite([
3579
self.Test("test_pass"),
3580
self.Test("test_self_ref"),
3581
self.Test("test_skip"),
3584
def _run_selftest_with_suite(self, **kwargs):
3585
old_flags = tests.selftest_debug_flags
3586
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3587
gc_on = gc.isenabled()
3591
output = self._run_selftest(test_suite_factory=self._get_suite,
3596
tests.selftest_debug_flags = old_flags
3597
self.assertNotContainsRe(output, b"Uncollected test case.*test_pass")
3598
self.assertContainsRe(output, b"Uncollected test case.*test_self_ref")
3601
def test_testsuite(self):
    """Run the suite with no extra selftest options"""
    self._run_selftest_with_suite()
3604
def test_pattern(self):
    """A pattern selecting only pass/self_ref keeps test_skip out"""
    selected = "test_(?:pass|self_ref)$"
    output = self._run_selftest_with_suite(pattern=selected)
    self.assertNotContainsRe(output, b"test_skip")
3608
def test_exclude_pattern(self):
    """Excluding test_skip removes it from the selftest output"""
    excluded = "test_skip$"
    output = self._run_selftest_with_suite(exclude_pattern=excluded)
    self.assertNotContainsRe(output, b"test_skip")
3612
def test_random_seed(self):
    """Run the suite with random_seed set to "now\""""
    options = {"random_seed": "now"}
    self._run_selftest_with_suite(**options)
3615
def test_matching_tests_first(self):
    """Run the suite with tests matching test_self_ref ordered first"""
    self._run_selftest_with_suite(
        matching_tests_first=True,
        pattern="test_self_ref$",
    )
3619
def test_starting_with_and_exclude(self):
    """Combining starting_with and an exclude pattern drops test_skip"""
    output = self._run_selftest_with_suite(
        starting_with=["bt."],
        exclude_pattern="test_skip$",
    )
    self.assertNotContainsRe(output, b"test_skip")
3624
def test_additonal_decorator(self):
    """Run the suite with an extra tests.TestDecorator suite decorator"""
    extra_decorators = [tests.TestDecorator]
    self._run_selftest_with_suite(suite_decorators=extra_decorators)
3628
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
    """Uncollected test case warnings must also show up in subunit output"""

    _test_needs_features = [features.subunit]

    def _run_selftest_with_suite(self, **kwargs):
        """Delegate to the base implementation using the subunit v1 runner"""
        base_run = TestUncollectedWarnings._run_selftest_with_suite
        return base_run(
            self, runner_class=tests.SubUnitBzrRunnerv1, **kwargs)
3638
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
    """Uncollected test case warnings must also show up when forking"""
3642
class TestEnvironHandling(tests.TestCase):
3644
def test_overrideEnv_None_called_twice_doesnt_leak(self):
3645
self.assertFalse('MYVAR' in os.environ)
3646
self.overrideEnv('MYVAR', '42')
3647
# We use an embedded test to make sure we fix the _captureVar bug
3649
class Test(tests.TestCase):
3651
# The first call saves the 42 value
3652
self.overrideEnv('MYVAR', None)
3653
self.assertEqual(None, os.environ.get('MYVAR'))
3654
# Make sure we can call it twice
3655
self.overrideEnv('MYVAR', None)
3656
self.assertEqual(None, os.environ.get('MYVAR'))
3658
result = tests.TextTestResult(output, 0, 1)
3659
Test('test_me').run(result)
3660
if not result.wasStrictlySuccessful():
3661
self.fail(output.getvalue())
3662
# We get our value back
3663
self.assertEqual('42', os.environ.get('MYVAR'))
3666
class TestIsolatedEnv(tests.TestCase):
3667
"""Test isolating tests from os.environ.
3669
Since we use tests that are already isolated from os.environ a bit of care
3670
should be taken when designing the tests to avoid bootstrap side-effects.
3671
The tests start with an already clean os.environ, which allows doing valid
3672
assertions about which variables are present or not and design tests around
3676
class ScratchMonkey(tests.TestCase):
3681
def test_basics(self):
3682
# Make sure we know the definition of BRZ_HOME: not part of os.environ
3683
# for tests.TestCase.
3684
self.assertTrue('BRZ_HOME' in tests.isolated_environ)
3685
self.assertEqual(None, tests.isolated_environ['BRZ_HOME'])
3686
# Being part of isolated_environ, BRZ_HOME should not appear here
3687
self.assertFalse('BRZ_HOME' in os.environ)
3688
# Make sure we know the definition of LINES: part of os.environ for
3690
self.assertTrue('LINES' in tests.isolated_environ)
3691
self.assertEqual('25', tests.isolated_environ['LINES'])
3692
self.assertEqual('25', os.environ['LINES'])
3694
def test_injecting_unknown_variable(self):
    """Overriding a variable absent from os.environ sets it, restoring removes it"""
    # BRZ_HOME is known to be absent from os.environ
    test = self.ScratchMonkey('test_me')
    tests.override_os_environ(test, {'BRZ_HOME': 'foo'})
    self.assertEqual('foo', os.environ['BRZ_HOME'])
    tests.restore_os_environ(test)
    # assertNotIn names the key and the container when it fails, whereas
    # assertFalse(... in ...) would only report "True is not false".
    self.assertNotIn('BRZ_HOME', os.environ)
3702
def test_injecting_known_variable(self):
    """Overriding LINES changes os.environ; restoring brings back '25'"""
    monkey = self.ScratchMonkey('test_me')
    # LINES is expected to already be set in os.environ at this point
    overrides = {'LINES': '42'}
    tests.override_os_environ(monkey, overrides)
    self.assertEqual('42', os.environ['LINES'])
    tests.restore_os_environ(monkey)
    self.assertEqual('25', os.environ['LINES'])
3710
def test_deleting_variable(self):
    """Overriding LINES with None removes it; restoring brings back '25'"""
    test = self.ScratchMonkey('test_me')
    # LINES is known to be present in os.environ
    tests.override_os_environ(test, {'LINES': None})
    # assertNotIn gives a failure message naming the key, unlike
    # assertTrue(... not in ...).
    self.assertNotIn('LINES', os.environ)
    tests.restore_os_environ(test)
    self.assertEqual('25', os.environ['LINES'])
3719
class TestDocTestSuiteIsolation(tests.TestCase):
3720
"""Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3722
Since tests.TestCase already provides an isolation from os.environ, we use
3723
the clean environment as a base for testing. To precisely capture the
3724
isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3727
We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
3728
not `os.environ` so each test overrides it to suit its needs.
3732
def get_doctest_suite_for_string(self, klass, string):
3733
class Finder(doctest.DocTestFinder):
3735
def find(*args, **kwargs):
3736
test = doctest.DocTestParser().get_doctest(
3737
string, {}, 'foo', 'foo.py', 0)
3740
suite = klass(test_finder=Finder())
3743
def run_doctest_suite_for_string(self, klass, string):
3744
suite = self.get_doctest_suite_for_string(klass, string)
3746
result = tests.TextTestResult(output, 0, 1)
3748
return result, output
3750
def assertDocTestStringSucceds(self, klass, string):
    """Fail with the captured output unless the doctest run was strictly successful"""
    result, output = self.run_doctest_suite_for_string(klass, string)
    if result.wasStrictlySuccessful():
        return
    self.fail(output.getvalue())
3755
def assertDocTestStringFails(self, klass, string):
    """Fail with the captured output if the doctest run was strictly successful"""
    result, output = self.run_doctest_suite_for_string(klass, string)
    if not result.wasStrictlySuccessful():
        return
    self.fail(output.getvalue())
3760
def test_injected_variable(self):
3761
self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3764
>>> os.environ['LINES']
3767
# doctest.DocTestSuite fails as it sees '25'
3768
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3769
# tests.DocTestSuite sees '42'
3770
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3772
def test_deleted_variable(self):
3773
self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3776
>>> os.environ.get('LINES')
3778
# doctest.DocTestSuite fails as it sees '25'
3779
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3780
# tests.DocTestSuite sees None
3781
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3784
class TestSelftestExcludePatterns(tests.TestCase):
3787
super(TestSelftestExcludePatterns, self).setUp()
3788
self.overrideAttr(tests, 'test_suite', self.suite_factory)
3790
def suite_factory(self, keep_only=None, starting_with=None):
3791
"""A test suite factory with only a few tests."""
3792
class Test(tests.TestCase):
3794
# We don't need the full class path
3795
return self._testMethodName
3805
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3807
def assertTestList(self, expected, *selftest_args):
    """Assert that ``selftest --list`` with the given args prints ``expected``

    ``expected`` is compared against the command's stdout split into lines.
    """
    # setUp is relied upon to install the small suite factory, which lets
    # the command run without loading the whole test suite.
    command = ('selftest', '--list') + selftest_args
    out, err = self.run_bzr(command)
    listed = out.splitlines()
    self.assertEqual(expected, listed)
3814
def test_full_list(self):
    """With no exclusions all three tests are listed"""
    everything = ['a', 'b', 'c']
    self.assertTestList(everything)
3817
def test_single_exclude(self):
    """A single -x option removes the matching test from the list"""
    remaining = ['b', 'c']
    self.assertTestList(remaining, '-x', 'a')
3820
def test_mutiple_excludes(self):
    """Repeated -x options each remove their matching test"""
    remaining = ['c']
    self.assertTestList(remaining, '-x', 'a', '-x', 'b')
3824
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3826
_test_needs_features = [features.subunit]
3829
super(TestCounterHooks, self).setUp()
3831
class Test(tests.TestCase):
3834
super(Test, self).setUp()
3835
self.hooks = hooks.Hooks()
3836
self.hooks.add_hook('myhook', 'Foo bar blah', (2, 4))
3837
self.install_counter_hook(self.hooks, 'myhook')
3842
def run_hook_once(self):
3843
for hook in self.hooks['myhook']:
3846
self.test_class = Test
3848
def assertHookCalls(self, expected_calls, test_name):
3849
test = self.test_class(test_name)
3850
result = unittest.TestResult()
3852
self.assertTrue(hasattr(test, '_counters'))
3853
self.assertTrue('myhook' in test._counters)
3854
self.assertEqual(expected_calls, test._counters['myhook'])
3856
def test_no_hook(self):
    """The 'no_hook' test is expected to record zero 'myhook' calls"""
    expected_calls = 0
    self.assertHookCalls(expected_calls, 'no_hook')
3859
def test_run_hook_once(self):
    """The 'run_hook_once' test is expected to record one 'myhook' call

    Skipped when the installed testtools is older than 0.9.8.
    """
    installed = features.testtools.module.__version__
    required = (0, 9, 8)
    if installed < required:
        raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
    self.assertHookCalls(1, 'run_hook_once')