/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_selftest.py

  • Committer: Jelmer Vernooij
  • Date: 2017-07-23 22:06:41 UTC
  • mfrom: (6738 trunk)
  • mto: This revision was merged to the branch mainline in revision 6739.
  • Revision ID: jelmer@jelmer.uk-20170723220641-69eczax9bmv8d6kk
Merge trunk, address review comments.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
import gc
20
20
import doctest
21
 
from functools import reduce
22
 
from io import BytesIO, StringIO, TextIOWrapper
23
21
import os
24
22
import signal
25
23
import sys
64
62
from ..bzr import (
65
63
    groupcompress_repo,
66
64
    )
67
 
from ..git import (
68
 
    workingtree as git_workingtree,
 
65
from ..sixish import (
 
66
    StringIO,
 
67
    text_type,
69
68
    )
70
69
from ..symbol_versioning import (
71
70
    deprecated_function,
98
97
            "text", "plain", {"charset": "utf8"})))
99
98
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
100
99
        self.assertThat(self.get_log(),
101
 
                        DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
 
100
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
102
101
 
103
102
 
104
103
class TestTreeShape(tests.TestCaseInTempDir):
107
106
        self.requireFeature(features.UnicodeFilenameFeature)
108
107
 
109
108
        filename = u'hell\u00d8'
110
 
        self.build_tree_contents([(filename, b'contents of hello')])
 
109
        self.build_tree_contents([(filename, 'contents of hello')])
111
110
        self.assertPathExists(filename)
112
111
 
113
112
 
140
139
        class MockModule(object):
141
140
            def get_test_permutations(self):
142
141
                return sample_permutation
143
 
        sample_permutation = [(1, 2), (3, 4)]
 
142
        sample_permutation = [(1,2), (3,4)]
144
143
        from .per_transport import get_transport_test_permutations
145
144
        self.assertEqual(sample_permutation,
146
145
                         get_transport_test_permutations(MockModule()))
157
156
        for module in modules:
158
157
            try:
159
158
                permutation_count += len(reduce(getattr,
160
 
                                                (module
161
 
                                                 + ".get_test_permutations").split('.')[1:],
162
 
                                                __import__(module))())
 
159
                    (module + ".get_test_permutations").split('.')[1:],
 
160
                     __import__(module))())
163
161
            except errors.DependencyNotPresent:
164
162
                pass
165
163
        scenarios = transport_test_permutations()
237
235
        from .per_repository import formats_to_scenarios
238
236
        formats = [("(c)", remote.RemoteRepositoryFormat()),
239
237
                   ("(d)", repository.format_registry.get(
240
 
                    b'Bazaar repository format 2a (needs bzr 1.16 or later)\n'))]
 
238
                    'Bazaar repository format 2a (needs bzr 1.16 or later)\n'))]
241
239
        no_vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
242
 
                                                None)
 
240
            None)
243
241
        vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
244
 
                                             vfs_transport_factory="vfs")
 
242
            vfs_transport_factory="vfs")
245
243
        # no_vfs generate scenarios without vfs_transport_factory
246
244
        expected = [
247
245
            ('RemoteRepositoryFormat(c)',
279
277
        input_test = TestTestScenarioApplication("test_apply_scenario")
280
278
        # setup two adapted tests
281
279
        adapted_test1 = apply_scenario(input_test,
282
 
                                       ("new id",
283
 
                                        {"bzrdir_format": "bzr_format",
284
 
                                         "repository_format": "repo_fmt",
285
 
                                         "transport_server": "transport_server",
286
 
                                         "transport_readonly_server": "readonly-server"}))
 
280
            ("new id",
 
281
            {"bzrdir_format":"bzr_format",
 
282
             "repository_format":"repo_fmt",
 
283
             "transport_server":"transport_server",
 
284
             "transport_readonly_server":"readonly-server"}))
287
285
        adapted_test2 = apply_scenario(input_test,
288
 
                                       ("new id 2", {"bzrdir_format": None}))
 
286
            ("new id 2", {"bzrdir_format":None}))
289
287
        # input_test should have been altered.
290
288
        self.assertRaises(AttributeError, getattr, input_test, "bzrdir_format")
291
289
        # the new tests are mutually incompatible, ensuring it has
295
293
        self.assertEqual("repo_fmt", adapted_test1.repository_format)
296
294
        self.assertEqual("transport_server", adapted_test1.transport_server)
297
295
        self.assertEqual("readonly-server",
298
 
                         adapted_test1.transport_readonly_server)
 
296
            adapted_test1.transport_readonly_server)
299
297
        self.assertEqual(
300
298
            "breezy.tests.test_selftest.TestTestScenarioApplication."
301
299
            "test_apply_scenario(new id)",
345
343
                   workingtree_3.WorkingTreeFormat3(),
346
344
                   workingtree_4.WorkingTreeFormat6()]
347
345
        scenarios = make_scenarios(server1, server2, formats,
348
 
                                   remote_server='c', remote_readonly_server='d',
349
 
                                   remote_backing_server='e')
 
346
            remote_server='c', remote_readonly_server='d',
 
347
            remote_backing_server='e')
350
348
        self.assertEqual([
351
349
            ('WorkingTreeFormat4',
352
 
             {'bzrdir_format': formats[0]._matchingcontroldir,
 
350
             {'bzrdir_format': formats[0]._matchingbzrdir,
353
351
              'transport_readonly_server': 'b',
354
352
              'transport_server': 'a',
355
353
              'workingtree_format': formats[0]}),
356
354
            ('WorkingTreeFormat3',
357
 
             {'bzrdir_format': formats[1]._matchingcontroldir,
 
355
             {'bzrdir_format': formats[1]._matchingbzrdir,
358
356
              'transport_readonly_server': 'b',
359
357
              'transport_server': 'a',
360
358
              'workingtree_format': formats[1]}),
361
359
            ('WorkingTreeFormat6',
362
 
             {'bzrdir_format': formats[2]._matchingcontroldir,
 
360
             {'bzrdir_format': formats[2]._matchingbzrdir,
363
361
              'transport_readonly_server': 'b',
364
362
              'transport_server': 'a',
365
363
              'workingtree_format': formats[2]}),
366
364
            ('WorkingTreeFormat6,remote',
367
 
             {'bzrdir_format': formats[2]._matchingcontroldir,
 
365
             {'bzrdir_format': formats[2]._matchingbzrdir,
368
366
              'repo_is_remote': True,
369
367
              'transport_readonly_server': 'd',
370
368
              'transport_server': 'c',
399
397
        smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
400
398
        mem_server = memory.MemoryServer
401
399
        formats = [workingtree_4.WorkingTreeFormat4(),
402
 
                   workingtree_3.WorkingTreeFormat3(), ]
 
400
                   workingtree_3.WorkingTreeFormat3(),]
403
401
        scenarios = make_scenarios(server1, server2, formats)
404
 
        self.assertEqual(9, len(scenarios))
 
402
        self.assertEqual(8, len(scenarios))
405
403
        default_wt_format = workingtree.format_registry.get_default()
406
404
        wt4_format = workingtree_4.WorkingTreeFormat4()
407
405
        wt5_format = workingtree_4.WorkingTreeFormat5()
408
406
        wt6_format = workingtree_4.WorkingTreeFormat6()
409
 
        git_wt_format = git_workingtree.GitWorkingTreeFormat()
410
407
        expected_scenarios = [
411
408
            ('WorkingTreeFormat4',
412
 
             {'bzrdir_format': formats[0]._matchingcontroldir,
 
409
             {'bzrdir_format': formats[0]._matchingbzrdir,
413
410
              'transport_readonly_server': 'b',
414
411
              'transport_server': 'a',
415
412
              'workingtree_format': formats[0],
416
413
              '_workingtree_to_test_tree': return_parameter,
417
414
              }),
418
415
            ('WorkingTreeFormat3',
419
 
             {'bzrdir_format': formats[1]._matchingcontroldir,
 
416
             {'bzrdir_format': formats[1]._matchingbzrdir,
420
417
              'transport_readonly_server': 'b',
421
418
              'transport_server': 'a',
422
419
              'workingtree_format': formats[1],
423
420
              '_workingtree_to_test_tree': return_parameter,
424
 
              }),
 
421
             }),
425
422
            ('WorkingTreeFormat6,remote',
426
 
             {'bzrdir_format': wt6_format._matchingcontroldir,
 
423
             {'bzrdir_format': wt6_format._matchingbzrdir,
427
424
              'repo_is_remote': True,
428
425
              'transport_readonly_server': smart_readonly_server,
429
426
              'transport_server': smart_server,
430
427
              'vfs_transport_factory': mem_server,
431
428
              'workingtree_format': wt6_format,
432
429
              '_workingtree_to_test_tree': return_parameter,
433
 
              }),
 
430
             }),
434
431
            ('RevisionTree',
435
432
             {'_workingtree_to_test_tree': revision_tree_from_workingtree,
436
 
              'bzrdir_format': default_wt_format._matchingcontroldir,
 
433
              'bzrdir_format': default_wt_format._matchingbzrdir,
437
434
              'transport_readonly_server': 'b',
438
435
              'transport_server': 'a',
439
436
              'workingtree_format': default_wt_format,
440
 
              }),
441
 
            ('GitRevisionTree',
442
 
             {'_workingtree_to_test_tree': revision_tree_from_workingtree,
443
 
              'bzrdir_format': git_wt_format._matchingcontroldir,
444
 
              'transport_readonly_server': 'b',
445
 
              'transport_server': 'a',
446
 
              'workingtree_format': git_wt_format,
447
 
              }
448
 
             ),
 
437
             }),
449
438
            ('DirStateRevisionTree,WT4',
450
439
             {'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
451
 
              'bzrdir_format': wt4_format._matchingcontroldir,
 
440
              'bzrdir_format': wt4_format._matchingbzrdir,
452
441
              'transport_readonly_server': 'b',
453
442
              'transport_server': 'a',
454
443
              'workingtree_format': wt4_format,
455
 
              }),
 
444
             }),
456
445
            ('DirStateRevisionTree,WT5',
457
446
             {'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
458
 
              'bzrdir_format': wt5_format._matchingcontroldir,
 
447
              'bzrdir_format': wt5_format._matchingbzrdir,
459
448
              'transport_readonly_server': 'b',
460
449
              'transport_server': 'a',
461
450
              'workingtree_format': wt5_format,
462
 
              }),
 
451
             }),
463
452
            ('PreviewTree',
464
453
             {'_workingtree_to_test_tree': preview_tree_pre,
465
 
              'bzrdir_format': default_wt_format._matchingcontroldir,
 
454
              'bzrdir_format': default_wt_format._matchingbzrdir,
466
455
              'transport_readonly_server': 'b',
467
456
              'transport_server': 'a',
468
457
              'workingtree_format': default_wt_format}),
469
458
            ('PreviewTreePost',
470
459
             {'_workingtree_to_test_tree': preview_tree_post,
471
 
              'bzrdir_format': default_wt_format._matchingcontroldir,
 
460
              'bzrdir_format': default_wt_format._matchingbzrdir,
472
461
              'transport_readonly_server': 'b',
473
462
              'transport_server': 'a',
474
463
              'workingtree_format': default_wt_format}),
475
 
            ]
 
464
             ]
476
465
        self.assertEqual(expected_scenarios, scenarios)
477
466
 
478
467
 
504
493
        format1 = WorkingTreeFormat4()
505
494
        format2 = WorkingTreeFormat3()
506
495
        formats = [("1", str, format1, format2, "converter1"),
507
 
                   ("2", int, format2, format1, "converter2")]
 
496
            ("2", int, format2, format1, "converter2")]
508
497
        scenarios = make_scenarios(server1, server2, formats)
509
498
        self.assertEqual(2, len(scenarios))
510
499
        expected_scenarios = [
511
500
            ("1", {
512
 
                "bzrdir_format": format1._matchingcontroldir,
 
501
                "bzrdir_format": format1._matchingbzrdir,
513
502
                "intertree_class": formats[0][1],
514
503
                "workingtree_format": formats[0][2],
515
504
                "workingtree_format_to": formats[0][3],
519
508
                "transport_readonly_server": server2,
520
509
                }),
521
510
            ("2", {
522
 
                "bzrdir_format": format2._matchingcontroldir,
 
511
                "bzrdir_format": format2._matchingbzrdir,
523
512
                "intertree_class": formats[1][1],
524
513
                "workingtree_format": formats[1][2],
525
514
                "workingtree_format_to": formats[1][3],
541
530
        self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME'])
542
531
 
543
532
    def test_assertEqualStat_equal(self):
544
 
        from ..bzr.tests.test_dirstate import _FakeStat
 
533
        from .test_dirstate import _FakeStat
545
534
        self.build_tree(["foo"])
546
535
        real = os.lstat("foo")
547
536
        fake = _FakeStat(real.st_size, real.st_mtime, real.st_ctime,
548
 
                         real.st_dev, real.st_ino, real.st_mode)
 
537
            real.st_dev, real.st_ino, real.st_mode)
549
538
        self.assertEqualStat(real, fake)
550
539
 
551
540
    def test_assertEqualStat_notequal(self):
552
541
        self.build_tree(["foo", "longname"])
553
542
        self.assertRaises(AssertionError, self.assertEqualStat,
554
 
                          os.lstat("foo"), os.lstat("longname"))
 
543
            os.lstat("foo"), os.lstat("longname"))
555
544
 
556
545
    def test_assertPathExists(self):
557
546
        self.assertPathExists('.')
611
600
        self.assertFalse(osutils.lexists('dir'))
612
601
        self.assertIsInstance(tree, memorytree.MemoryTree)
613
602
        self.assertEqual(format.repository_format.__class__,
614
 
                         tree.branch.repository._format.__class__)
 
603
            tree.branch.repository._format.__class__)
615
604
 
616
605
    def test_make_branch_builder(self):
617
606
        builder = self.make_branch_builder('dir')
635
624
                         the_branch.repository._format.__class__)
636
625
        self.assertEqual(repo_format.get_format_string(),
637
626
                         self.get_transport().get_bytes(
638
 
            'dir/.bzr/repository/format'))
 
627
                            'dir/.bzr/repository/format'))
639
628
 
640
629
    def test_make_branch_builder_with_format_name(self):
641
630
        builder = self.make_branch_builder('dir', format='knit')
646
635
        dir_format = controldir.format_registry.make_controldir('knit')
647
636
        self.assertEqual(dir_format.repository_format.__class__,
648
637
                         the_branch.repository._format.__class__)
649
 
        self.assertEqual(b'Bazaar-NG Knit Repository Format 1',
 
638
        self.assertEqual('Bazaar-NG Knit Repository Format 1',
650
639
                         self.get_transport().get_bytes(
651
 
                             'dir/.bzr/repository/format'))
 
640
                            'dir/.bzr/repository/format'))
652
641
 
653
642
    def test_dangling_locks_cause_failures(self):
654
643
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
687
676
 
688
677
    def test_get_readonly_url_http(self):
689
678
        from .http_server import HttpServer
690
 
        from ..transport.http import HttpTransport
 
679
        from ..transport.http import HttpTransportBase
691
680
        self.transport_server = test_server.LocalURLServer
692
681
        self.transport_readonly_server = HttpServer
693
682
        # calling get_readonly_transport() gives us a HTTP server instance.
696
685
        # the transport returned may be any HttpTransportBase subclass
697
686
        t = transport.get_transport_from_url(url)
698
687
        t2 = transport.get_transport_from_url(url2)
699
 
        self.assertIsInstance(t, HttpTransport)
700
 
        self.assertIsInstance(t2, HttpTransport)
 
688
        self.assertIsInstance(t, HttpTransportBase)
 
689
        self.assertIsInstance(t2, HttpTransportBase)
701
690
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
702
691
 
703
692
    def test_is_directory(self):
706
695
        self.build_tree(['a_dir/', 'a_file'], transport=t)
707
696
        self.assertIsDirectory('a_dir', t)
708
697
        self.assertRaises(AssertionError, self.assertIsDirectory, 'a_file', t)
709
 
        self.assertRaises(
710
 
            AssertionError, self.assertIsDirectory, 'not_here', t)
 
698
        self.assertRaises(AssertionError, self.assertIsDirectory, 'not_here', t)
711
699
 
712
700
    def test_make_branch_builder(self):
713
701
        builder = self.make_branch_builder('dir')
751
739
        self.requireFeature(features.lsprof_feature)
752
740
        terminal = testtools.testresult.doubles.ExtendedTestResult()
753
741
        result = tests.ProfileResult(terminal)
754
 
 
755
742
        class Sample(tests.TestCase):
756
743
            def a(self):
757
744
                self.sample_function()
758
 
 
759
745
            def sample_function(self):
760
746
                pass
761
747
        test = Sample("a")
781
767
        class ShortDelayTestCase(tests.TestCase):
782
768
            def test_short_delay(self):
783
769
                time.sleep(0.003)
784
 
 
785
770
            def test_short_benchmark(self):
786
771
                self.time(time.sleep, 0.003)
787
772
        self.check_timing(ShortDelayTestCase('test_short_delay'),
804
789
 
805
790
        This is used to exercise the test framework.
806
791
        """
807
 
        self.time(str, b'hello', errors='replace')
808
 
        self.time(str, b'world', errors='replace')
 
792
        self.time(unicode, 'hello', errors='replace')
 
793
        self.time(unicode, 'world', errors='replace')
809
794
 
810
795
    def test_lsprofiling(self):
811
796
        """Verbose test result prints lsprof statistics from test cases."""
836
821
        # this should appear in the output stream of our test result.
837
822
        output = result_stream.getvalue()
838
823
        self.assertContainsRe(output,
839
 
                              r"LSProf output for <class 'str'>\(\(b'hello',\), {'errors': 'replace'}\)")
840
 
        self.assertContainsRe(output,
841
 
                              r"LSProf output for <class 'str'>\(\(b'world',\), {'errors': 'replace'}\)")
842
 
        self.assertContainsRe(output,
843
 
                              r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n")
844
 
        self.assertContainsRe(output,
845
 
                              r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?")
 
824
            r"LSProf output for <type 'unicode'>\(\('hello',\), {'errors': 'replace'}\)")
 
825
        self.assertContainsRe(output,
 
826
            r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n")
 
827
        self.assertContainsRe(output,
 
828
            r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?")
 
829
        self.assertContainsRe(output,
 
830
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
846
831
 
847
832
    def test_uses_time_from_testtools(self):
848
833
        """Test case timings in verbose results should use testtools times"""
849
834
        import datetime
850
 
 
851
835
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
852
836
            def startTest(self, test):
853
837
                self.time(datetime.datetime.utcfromtimestamp(1.145))
854
838
                super(TimeAddedVerboseTestResult, self).startTest(test)
855
 
 
856
839
            def addSuccess(self, test):
857
840
                self.time(datetime.datetime.utcfromtimestamp(51.147))
858
841
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
859
 
 
860
842
            def report_tests_starting(self): pass
861
843
        sio = StringIO()
862
844
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
866
848
        """Using knownFailure should trigger several result actions."""
867
849
        class InstrumentedTestResult(tests.ExtendedTestResult):
868
850
            def stopTestRun(self): pass
869
 
 
870
851
            def report_tests_starting(self): pass
871
 
 
872
852
            def report_known_failure(self, test, err=None, details=None):
873
853
                self._call = test, 'known failure'
874
854
        result = InstrumentedTestResult(None, None, None, None)
875
 
 
876
855
        class Test(tests.TestCase):
877
856
            def test_function(self):
878
857
                self.knownFailure('failed!')
899
878
            )
900
879
        _get_test("test_xfail").run(result)
901
880
        self.assertContainsRe(result_stream.getvalue(),
902
 
                              "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
903
 
                              "\\s*(?:Text attachment: )?reason"
904
 
                              "(?:\n-+\n|: {{{)"
905
 
                              "this_fails"
906
 
                              "(?:\n-+\n|}}}\n)")
 
881
            "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
 
882
            "\\s*(?:Text attachment: )?reason"
 
883
            "(?:\n-+\n|: {{{)"
 
884
            "this_fails"
 
885
            "(?:\n-+\n|}}}\n)")
907
886
 
908
887
    def get_passing_test(self):
909
888
        """Return a test object that can't be run usefully."""
915
894
        """Test the behaviour of invoking addNotSupported."""
916
895
        class InstrumentedTestResult(tests.ExtendedTestResult):
917
896
            def stopTestRun(self): pass
918
 
 
919
897
            def report_tests_starting(self): pass
920
 
 
921
898
            def report_unsupported(self, test, feature):
922
899
                self._call = test, feature
923
900
        result = InstrumentedTestResult(None, None, None, None)
961
938
    def test_unavailable_exception(self):
962
939
        """An UnavailableFeature being raised should invoke addNotSupported."""
963
940
        class InstrumentedTestResult(tests.ExtendedTestResult):
964
 
            def stopTestRun(self):
965
 
                pass
966
 
 
967
 
            def report_tests_starting(self):
968
 
                pass
969
 
 
 
941
            def stopTestRun(self): pass
 
942
            def report_tests_starting(self): pass
970
943
            def addNotSupported(self, test, feature):
971
944
                self._call = test, feature
972
945
        result = InstrumentedTestResult(None, None, None, None)
973
946
        feature = features.Feature()
974
 
 
975
947
        class Test(tests.TestCase):
976
948
            def test_function(self):
977
949
                raise tests.UnavailableFeature(feature)
1010
982
        """Starting the first test should trigger startTests."""
1011
983
        class InstrumentedTestResult(tests.ExtendedTestResult):
1012
984
            calls = 0
1013
 
 
1014
 
            def startTests(self):
1015
 
                self.calls += 1
 
985
            def startTests(self): self.calls += 1
1016
986
        result = InstrumentedTestResult(None, None, None, None)
1017
 
 
1018
987
        def test_function():
1019
988
            pass
1020
989
        test = unittest.FunctionTestCase(test_function)
1025
994
        """With multiple tests startTests should still only be called once"""
1026
995
        class InstrumentedTestResult(tests.ExtendedTestResult):
1027
996
            calls = 0
1028
 
 
1029
 
            def startTests(self):
1030
 
                self.calls += 1
 
997
            def startTests(self): self.calls += 1
1031
998
        result = InstrumentedTestResult(None, None, None, None)
1032
999
        suite = unittest.TestSuite([
1033
1000
            unittest.FunctionTestCase(lambda: None),
1067
1034
                self.expectFailure('failed', self.assertTrue, False)
1068
1035
        test = unittest.TestSuite()
1069
1036
        test.addTest(Test("known_failure_test"))
1070
 
 
1071
1037
        def failing_test():
1072
1038
            raise AssertionError('foo')
1073
1039
        test.addTest(unittest.FunctionTestCase(failing_test))
1074
1040
        stream = StringIO()
1075
1041
        runner = tests.TextTestRunner(stream=stream)
1076
 
        self.run_test_runner(runner, test)
1077
 
        self.assertContainsRe(
1078
 
            stream.getvalue(),
 
1042
        result = self.run_test_runner(runner, test)
 
1043
        lines = stream.getvalue().splitlines()
 
1044
        self.assertContainsRe(stream.getvalue(),
1079
1045
            '(?sm)^brz selftest.*$'
1080
1046
            '.*'
1081
1047
            '^======================================================================\n'
1082
1048
            '^FAIL: failing_test\n'
1083
1049
            '^----------------------------------------------------------------------\n'
1084
1050
            'Traceback \\(most recent call last\\):\n'
1085
 
            '  .*'  # File .*, line .*, in failing_test' - but maybe not from .pyc
 
1051
            '  .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
1086
1052
            '    raise AssertionError\\(\'foo\'\\)\n'
1087
1053
            '.*'
1088
1054
            '^----------------------------------------------------------------------\n'
1099
1065
        test = Test("known_failure_test")
1100
1066
        stream = StringIO()
1101
1067
        runner = tests.TextTestRunner(stream=stream)
1102
 
        self.run_test_runner(runner, test)
 
1068
        result = self.run_test_runner(runner, test)
1103
1069
        self.assertContainsRe(stream.getvalue(),
1104
 
                              '\n'
1105
 
                              '-*\n'
1106
 
                              'Ran 1 test in .*\n'
1107
 
                              '\n'
1108
 
                              'OK \\(known_failures=1\\)\n')
 
1070
            '\n'
 
1071
            '-*\n'
 
1072
            'Ran 1 test in .*\n'
 
1073
            '\n'
 
1074
            'OK \\(known_failures=1\\)\n')
1109
1075
 
1110
1076
    def test_unexpected_success_bad(self):
1111
1077
        class Test(tests.TestCase):
1112
1078
            def test_truth(self):
1113
1079
                self.expectFailure("No absolute truth", self.assertTrue, True)
1114
1080
        runner = tests.TextTestRunner(stream=StringIO())
1115
 
        self.run_test_runner(runner, Test("test_truth"))
 
1081
        result = self.run_test_runner(runner, Test("test_truth"))
1116
1082
        self.assertContainsRe(runner.stream.getvalue(),
1117
 
                              "=+\n"
1118
 
                              "FAIL: \\S+\\.test_truth\n"
1119
 
                              "-+\n"
1120
 
                              "(?:.*\n)*"
1121
 
                              "\\s*(?:Text attachment: )?reason"
1122
 
                              "(?:\n-+\n|: {{{)"
1123
 
                              "No absolute truth"
1124
 
                              "(?:\n-+\n|}}}\n)"
1125
 
                              "(?:.*\n)*"
1126
 
                              "-+\n"
1127
 
                              "Ran 1 test in .*\n"
1128
 
                              "\n"
1129
 
                              "FAILED \\(failures=1\\)\n\\Z")
 
1083
            "=+\n"
 
1084
            "FAIL: \\S+\.test_truth\n"
 
1085
            "-+\n"
 
1086
            "(?:.*\n)*"
 
1087
            "\\s*(?:Text attachment: )?reason"
 
1088
            "(?:\n-+\n|: {{{)"
 
1089
            "No absolute truth"
 
1090
            "(?:\n-+\n|}}}\n)"
 
1091
            "(?:.*\n)*"
 
1092
            "-+\n"
 
1093
            "Ran 1 test in .*\n"
 
1094
            "\n"
 
1095
            "FAILED \\(failures=1\\)\n\\Z")
1130
1096
 
1131
1097
    def test_result_decorator(self):
1132
1098
        # decorate results
1133
1099
        calls = []
1134
 
 
1135
1100
        class LoggingDecorator(ExtendedToOriginalDecorator):
1136
1101
            def startTest(self, test):
1137
1102
                ExtendedToOriginalDecorator.startTest(self, test)
1138
1103
                calls.append('start')
1139
 
        test = unittest.FunctionTestCase(lambda: None)
 
1104
        test = unittest.FunctionTestCase(lambda:None)
1140
1105
        stream = StringIO()
1141
1106
        runner = tests.TextTestRunner(stream=stream,
1142
 
                                      result_decorators=[LoggingDecorator])
1143
 
        self.run_test_runner(runner, test)
 
1107
            result_decorators=[LoggingDecorator])
 
1108
        result = self.run_test_runner(runner, test)
1144
1109
        self.assertLength(1, calls)
1145
1110
 
1146
1111
    def test_skipped_test(self):
1157
1122
 
1158
1123
    def test_skipped_from_setup(self):
1159
1124
        calls = []
1160
 
 
1161
1125
        class SkippedSetupTest(tests.TestCase):
1162
1126
 
1163
1127
            def setUp(self):
1180
1144
 
1181
1145
    def test_skipped_from_test(self):
1182
1146
        calls = []
1183
 
 
1184
1147
        class SkippedTest(tests.TestCase):
1185
1148
 
1186
1149
            def setUp(self):
1214
1177
        self.assertTrue(result.wasSuccessful())
1215
1178
        self.assertTrue(result.wasStrictlySuccessful())
1216
1179
        self.assertContainsRe(out.getvalue(),
1217
 
                              r'(?m)not_applicable_test  * N/A')
 
1180
                r'(?m)not_applicable_test  * N/A')
1218
1181
        self.assertContainsRe(out.getvalue(),
1219
 
                              r'(?m)^    this test never runs')
 
1182
                r'(?m)^    this test never runs')
1220
1183
 
1221
1184
    def test_unsupported_features_listed(self):
1222
1185
        """When unsupported features are encountered they are detailed."""
1223
1186
        class Feature1(features.Feature):
1224
 
            def _probe(self):
1225
 
                return False
1226
 
 
 
1187
            def _probe(self): return False
1227
1188
        class Feature2(features.Feature):
1228
 
            def _probe(self):
1229
 
                return False
 
1189
            def _probe(self): return False
1230
1190
        # create sample tests
1231
1191
        test1 = SampleTestCase('_test_pass')
1232
1192
        test1._test_needs_features = [Feature1()]
1237
1197
        test.addTest(test2)
1238
1198
        stream = StringIO()
1239
1199
        runner = tests.TextTestRunner(stream=stream)
1240
 
        self.run_test_runner(runner, test)
 
1200
        result = self.run_test_runner(runner, test)
1241
1201
        lines = stream.getvalue().splitlines()
1242
1202
        self.assertEqual([
1243
1203
            'OK',
1255
1215
        stream = StringIO()
1256
1216
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
1257
1217
        # Need to use the CountingDecorator as that's what sets num_tests
1258
 
        self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1218
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1259
1219
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
1260
1220
 
1261
1221
    def test_startTestRun(self):
1262
1222
        """run should call result.startTestRun()"""
1263
1223
        calls = []
1264
 
 
1265
1224
        class LoggingDecorator(ExtendedToOriginalDecorator):
1266
1225
            def startTestRun(self):
1267
1226
                ExtendedToOriginalDecorator.startTestRun(self)
1268
1227
                calls.append('startTestRun')
1269
 
        test = unittest.FunctionTestCase(lambda: None)
 
1228
        test = unittest.FunctionTestCase(lambda:None)
1270
1229
        stream = StringIO()
1271
1230
        runner = tests.TextTestRunner(stream=stream,
1272
 
                                      result_decorators=[LoggingDecorator])
1273
 
        self.run_test_runner(runner, test)
 
1231
            result_decorators=[LoggingDecorator])
 
1232
        result = self.run_test_runner(runner, test)
1274
1233
        self.assertLength(1, calls)
1275
1234
 
1276
1235
    def test_stopTestRun(self):
1277
1236
        """run should call result.stopTestRun()"""
1278
1237
        calls = []
1279
 
 
1280
1238
        class LoggingDecorator(ExtendedToOriginalDecorator):
1281
1239
            def stopTestRun(self):
1282
1240
                ExtendedToOriginalDecorator.stopTestRun(self)
1283
1241
                calls.append('stopTestRun')
1284
 
        test = unittest.FunctionTestCase(lambda: None)
 
1242
        test = unittest.FunctionTestCase(lambda:None)
1285
1243
        stream = StringIO()
1286
1244
        runner = tests.TextTestRunner(stream=stream,
1287
 
                                      result_decorators=[LoggingDecorator])
1288
 
        self.run_test_runner(runner, test)
 
1245
            result_decorators=[LoggingDecorator])
 
1246
        result = self.run_test_runner(runner, test)
1289
1247
        self.assertLength(1, calls)
1290
1248
 
1291
1249
    def test_unicode_test_output_on_ascii_stream(self):
1294
1252
            def test_log_unicode(self):
1295
1253
                self.log(u"\u2606")
1296
1254
                self.fail("Now print that log!")
1297
 
        bio = BytesIO()
1298
 
        out = TextIOWrapper(bio, 'ascii', 'backslashreplace')
 
1255
        out = StringIO()
1299
1256
        self.overrideAttr(osutils, "get_terminal_encoding",
1300
 
                          lambda trace=False: "ascii")
1301
 
        self.run_test_runner(
1302
 
            tests.TextTestRunner(stream=out),
 
1257
            lambda trace=False: "ascii")
 
1258
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
1303
1259
            FailureWithUnicode("test_log_unicode"))
1304
 
        out.flush()
1305
 
        self.assertContainsRe(bio.getvalue(),
1306
 
                              b"(?:Text attachment: )?log"
1307
 
                              b"(?:\n-+\n|: {{{)"
1308
 
                              b"\\d+\\.\\d+  \\\\u2606"
1309
 
                              b"(?:\n-+\n|}}}\n)")
 
1260
        self.assertContainsRe(out.getvalue(),
 
1261
            "(?:Text attachment: )?log"
 
1262
            "(?:\n-+\n|: {{{)"
 
1263
            "\d+\.\d+  \\\\u2606"
 
1264
            "(?:\n-+\n|}}}\n)")
1310
1265
 
1311
1266
 
1312
1267
class SampleTestCase(tests.TestCase):
1314
1269
    def _test_pass(self):
1315
1270
        pass
1316
1271
 
1317
 
 
1318
1272
class _TestException(Exception):
1319
1273
    pass
1320
1274
 
1337
1291
    def test_assertLength_shows_sequence_in_failure(self):
1338
1292
        a_list = [1, 2, 3]
1339
1293
        exception = self.assertRaises(AssertionError, self.assertLength, 2,
1340
 
                                      a_list)
 
1294
            a_list)
1341
1295
        self.assertEqual('Incorrect length: wanted 2, got 3 for [1, 2, 3]',
1342
 
                         exception.args[0])
 
1296
            exception.args[0])
1343
1297
 
1344
1298
    def test_base_setUp_not_called_causes_failure(self):
1345
1299
        class TestCaseWithBrokenSetUp(tests.TestCase):
1346
1300
            def setUp(self):
1347
 
                pass  # does not call TestCase.setUp
1348
 
 
 
1301
                pass # does not call TestCase.setUp
1349
1302
            def test_foo(self):
1350
1303
                pass
1351
1304
        test = TestCaseWithBrokenSetUp('test_foo')
1357
1310
    def test_base_tearDown_not_called_causes_failure(self):
1358
1311
        class TestCaseWithBrokenTearDown(tests.TestCase):
1359
1312
            def tearDown(self):
1360
 
                pass  # does not call TestCase.tearDown
1361
 
 
 
1313
                pass # does not call TestCase.tearDown
1362
1314
            def test_foo(self):
1363
1315
                pass
1364
1316
        test = TestCaseWithBrokenTearDown('test_foo')
1389
1341
        """
1390
1342
        self.change_selftest_debug_flags({'allow_debug'})
1391
1343
        breezy.debug.debug_flags = {'a-flag'}
1392
 
 
1393
1344
        class TestThatRecordsFlags(tests.TestCase):
1394
1345
            def test_foo(nested_self):
1395
1346
                self.flags = set(breezy.debug.debug_flags)
1440
1391
        self.change_selftest_debug_flags({'allow_debug'})
1441
1392
        # Now run a test that modifies debug.debug_flags.
1442
1393
        breezy.debug.debug_flags = {'original-state'}
1443
 
 
1444
1394
        class TestThatModifiesFlags(tests.TestCase):
1445
1395
            def test_foo(self):
1446
1396
                breezy.debug.debug_flags = {'modified'}
1489
1439
        self.time(time.sleep, 0.007)
1490
1440
 
1491
1441
    def test_time_creates_benchmark_in_result(self):
1492
 
        """The TestCase.time() method accumulates a benchmark time."""
 
1442
        """Test that the TestCase.time() method accumulates a benchmark time."""
1493
1443
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1494
1444
        output_stream = StringIO()
1495
1445
        result = breezy.tests.VerboseTestResult(
1506
1456
        # Note this test won't fail with hooks that the core library doesn't
1507
1457
        # use - but it trigger with a plugin that adds hooks, so its still a
1508
1458
        # useful warning in that case.
1509
 
        self.assertEqual(breezy.branch.BranchHooks(),
1510
 
                         breezy.branch.Branch.hooks)
 
1459
        self.assertEqual(breezy.branch.BranchHooks(), breezy.branch.Branch.hooks)
1511
1460
        self.assertEqual(
1512
1461
            breezy.bzr.smart.server.SmartServerHooks(),
1513
1462
            breezy.bzr.smart.server.SmartTCPServer.hooks)
1537
1486
        self.assertRaises(tests.KnownFailure, self.knownFailure, "A Failure")
1538
1487
 
1539
1488
    def test_open_bzrdir_safe_roots(self):
1540
 
        # even a memory transport should fail to open when its url isn't
 
1489
        # even a memory transport should fail to open when its url isn't 
1541
1490
        # permitted.
1542
1491
        # Manually set one up (TestCase doesn't and shouldn't provide magic
1543
1492
        # machinery)
1547
1496
        t = transport.get_transport_from_url(transport_server.get_url())
1548
1497
        controldir.ControlDir.create(t.base)
1549
1498
        self.assertRaises(errors.BzrError,
1550
 
                          controldir.ControlDir.open_from_transport, t)
 
1499
            controldir.ControlDir.open_from_transport, t)
1551
1500
        # But if we declare this as safe, we can open the bzrdir.
1552
1501
        self.permit_url(t.base)
1553
1502
        self._bzr_selftest_roots.append(t.base)
1556
1505
    def test_requireFeature_available(self):
1557
1506
        """self.requireFeature(available) is a no-op."""
1558
1507
        class Available(features.Feature):
1559
 
            def _probe(self):
1560
 
                return True
 
1508
            def _probe(self):return True
1561
1509
        feature = Available()
1562
1510
        self.requireFeature(feature)
1563
1511
 
1564
1512
    def test_requireFeature_unavailable(self):
1565
1513
        """self.requireFeature(unavailable) raises UnavailableFeature."""
1566
1514
        class Unavailable(features.Feature):
1567
 
            def _probe(self):
1568
 
                return False
 
1515
            def _probe(self):return False
1569
1516
        feature = Unavailable()
1570
1517
        self.assertRaises(tests.UnavailableFeature,
1571
1518
                          self.requireFeature, feature)
1577
1524
    def test_run_enabled_unittest_result(self):
1578
1525
        """Test we revert to regular behaviour when the test is enabled."""
1579
1526
        test = SampleTestCase('_test_pass')
1580
 
 
1581
1527
        class EnabledFeature(object):
1582
1528
            def available(self):
1583
1529
                return True
1589
1535
        self.assertEqual([], result.failures)
1590
1536
 
1591
1537
    def test_run_disabled_unittest_result(self):
1592
 
        """Test our compatibility for disabled tests with unittest results."""
 
1538
        """Test our compatability for disabled tests with unittest results."""
1593
1539
        test = SampleTestCase('_test_pass')
1594
 
 
1595
1540
        class DisabledFeature(object):
1596
1541
            def available(self):
1597
1542
                return False
1605
1550
    def test_run_disabled_supporting_result(self):
1606
1551
        """Test disabled tests behaviour with support aware results."""
1607
1552
        test = SampleTestCase('_test_pass')
1608
 
 
1609
1553
        class DisabledFeature(object):
1610
1554
            def __eq__(self, other):
1611
1555
                return isinstance(other, DisabledFeature)
1612
 
 
1613
1556
            def available(self):
1614
1557
                return False
1615
1558
        the_feature = DisabledFeature()
1616
1559
        test._test_needs_features = [the_feature]
1617
 
 
1618
1560
        class InstrumentedTestResult(unittest.TestResult):
1619
1561
            def __init__(self):
1620
1562
                unittest.TestResult.__init__(self)
1621
1563
                self.calls = []
1622
 
 
1623
1564
            def startTest(self, test):
1624
1565
                self.calls.append(('startTest', test))
1625
 
 
1626
1566
            def stopTest(self, test):
1627
1567
                self.calls.append(('stopTest', test))
1628
 
 
1629
1568
            def addNotSupported(self, test, feature):
1630
1569
                self.calls.append(('addNotSupported', test, feature))
1631
1570
        result = InstrumentedTestResult()
1644
1583
        self.assertEqual([], self._bzr_selftest_roots)
1645
1584
        self.start_server(transport_server)
1646
1585
        self.assertSubset([transport_server.get_url()],
1647
 
                          self._bzr_selftest_roots)
 
1586
            self._bzr_selftest_roots)
1648
1587
 
1649
1588
    def test_assert_list_raises_on_generator(self):
1650
1589
        def generator_which_will_raise():
1682
1621
            raise _NotTestException()
1683
1622
 
1684
1623
        # Wrong exceptions are not intercepted
1685
 
        self.assertRaises(
1686
 
            _NotTestException,
 
1624
        self.assertRaises(_NotTestException,
1687
1625
            self.assertListRaises, _TestException, wrong_exception)
1688
 
        self.assertRaises(
1689
 
            _NotTestException,
 
1626
        self.assertRaises(_NotTestException,
1690
1627
            self.assertListRaises, _TestException, wrong_exception_generator)
1691
1628
 
1692
1629
    def test_assert_list_raises_no_exception(self):
1698
1635
            yield 2
1699
1636
 
1700
1637
        self.assertRaises(AssertionError,
1701
 
                          self.assertListRaises, _TestException, success)
 
1638
            self.assertListRaises, _TestException, success)
1702
1639
 
1703
 
        self.assertRaises(
1704
 
            AssertionError,
 
1640
        self.assertRaises(AssertionError,
1705
1641
            self.assertListRaises, _TestException, success_generator)
1706
1642
 
1707
1643
    def _run_successful_test(self, test):
1711
1647
        return result
1712
1648
 
1713
1649
    def test_overrideAttr_without_value(self):
1714
 
        self.test_attr = 'original'  # Define a test attribute
1715
 
        obj = self  # Make 'obj' visible to the embedded test
1716
 
 
 
1650
        self.test_attr = 'original' # Define a test attribute
 
1651
        obj = self # Make 'obj' visible to the embedded test
1717
1652
        class Test(tests.TestCase):
1718
1653
 
1719
1654
            def setUp(self):
1730
1665
        self.assertEqual('original', obj.test_attr)
1731
1666
 
1732
1667
    def test_overrideAttr_with_value(self):
1733
 
        self.test_attr = 'original'  # Define a test attribute
1734
 
        obj = self  # Make 'obj' visible to the embedded test
1735
 
 
 
1668
        self.test_attr = 'original' # Define a test attribute
 
1669
        obj = self # Make 'obj' visible to the embedded test
1736
1670
        class Test(tests.TestCase):
1737
1671
 
1738
1672
            def setUp(self):
1748
1682
 
1749
1683
    def test_overrideAttr_with_no_existing_value_and_value(self):
1750
1684
        # Do not define the test_attribute
1751
 
        obj = self  # Make 'obj' visible to the embedded test
1752
 
 
 
1685
        obj = self # Make 'obj' visible to the embedded test
1753
1686
        class Test(tests.TestCase):
1754
1687
 
1755
1688
            def setUp(self):
1765
1698
 
1766
1699
    def test_overrideAttr_with_no_existing_value_and_no_value(self):
1767
1700
        # Do not define the test_attribute
1768
 
        obj = self  # Make 'obj' visible to the embedded test
1769
 
 
 
1701
        obj = self # Make 'obj' visible to the embedded test
1770
1702
        class Test(tests.TestCase):
1771
1703
 
1772
1704
            def setUp(self):
1785
1717
        calls = self.recordCalls(
1786
1718
            test_selftest, '_add_numbers')
1787
1719
        self.assertEqual(test_selftest._add_numbers(2, 10),
1788
 
                         12)
 
1720
            12)
1789
1721
        self.assertEqual(calls, [((2, 10), {})])
1790
1722
 
1791
1723
 
1796
1728
class _MissingFeature(features.Feature):
1797
1729
    def _probe(self):
1798
1730
        return False
1799
 
 
1800
 
 
1801
1731
missing_feature = _MissingFeature()
1802
1732
 
1803
1733
 
1840
1770
    return ExampleTests(name)
1841
1771
 
1842
1772
 
 
1773
def _get_skip_reasons(result):
 
1774
    # GZ 2017-06-06: Newer testtools doesn't have this, uses detail instead
 
1775
    return result.skip_reasons
 
1776
 
 
1777
 
1843
1778
class TestTestCaseLogDetails(tests.TestCase):
1844
1779
 
1845
1780
    def _run_test(self, test_name):
1853
1788
        self.assertEqual(1, len(result.failures))
1854
1789
        result_content = result.failures[0][1]
1855
1790
        self.assertContainsRe(result_content,
1856
 
                              '(?m)^(?:Text attachment: )?log(?:$|: )')
 
1791
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1857
1792
        self.assertContainsRe(result_content, 'this was a failing test')
1858
1793
 
1859
1794
    def test_error_has_log(self):
1861
1796
        self.assertEqual(1, len(result.errors))
1862
1797
        result_content = result.errors[0][1]
1863
1798
        self.assertContainsRe(result_content,
1864
 
                              '(?m)^(?:Text attachment: )?log(?:$|: )')
 
1799
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1865
1800
        self.assertContainsRe(result_content, 'this test errored')
1866
1801
 
1867
1802
    def test_skip_has_no_log(self):
1868
1803
        result = self._run_test('test_skip')
1869
 
        reasons = result.skip_reasons
 
1804
        reasons = _get_skip_reasons(result)
1870
1805
        self.assertEqual({'reason'}, set(reasons))
1871
1806
        skips = reasons['reason']
1872
1807
        self.assertEqual(1, len(skips))
1877
1812
        # testtools doesn't know about addNotSupported, so it just gets
1878
1813
        # considered as a skip
1879
1814
        result = self._run_test('test_missing_feature')
1880
 
        reasons = result.skip_reasons
1881
 
        self.assertEqual({str(missing_feature)}, set(reasons))
1882
 
        skips = reasons[str(missing_feature)]
 
1815
        reasons = _get_skip_reasons(result)
 
1816
        self.assertEqual({missing_feature}, set(reasons))
 
1817
        skips = reasons[missing_feature]
1883
1818
        self.assertEqual(1, len(skips))
1884
1819
        test = skips[0]
1885
1820
        self.assertFalse('log' in test.getDetails())
1889
1824
        self.assertEqual(1, len(result.expectedFailures))
1890
1825
        result_content = result.expectedFailures[0][1]
1891
1826
        self.assertNotContainsRe(result_content,
1892
 
                                 '(?m)^(?:Text attachment: )?log(?:$|: )')
 
1827
            '(?m)^(?:Text attachment: )?log(?:$|: )')
1893
1828
        self.assertNotContainsRe(result_content, 'test with expected failure')
1894
1829
 
1895
1830
    def test_unexpected_success_has_log(self):
1968
1903
 
1969
1904
    def test_assert_isinstance(self):
1970
1905
        self.assertIsInstance(2, int)
1971
 
        self.assertIsInstance(u'', str)
 
1906
        self.assertIsInstance(u'', (str, text_type))
1972
1907
        e = self.assertRaises(AssertionError, self.assertIsInstance, None, int)
1973
 
        self.assertIn(
1974
 
            str(e),
1975
 
            ["None is an instance of <type 'NoneType'> rather than "
1976
 
             "<type 'int'>",
1977
 
             "None is an instance of <class 'NoneType'> rather than "
1978
 
             "<class 'int'>"])
 
1908
        self.assertEqual(str(e),
 
1909
            "None is an instance of <type 'NoneType'> rather than <type 'int'>")
1979
1910
        self.assertRaises(AssertionError, self.assertIsInstance, 23.3, int)
1980
1911
        e = self.assertRaises(AssertionError,
1981
 
                              self.assertIsInstance, None, int,
1982
 
                              "it's just not")
1983
 
        self.assertEqual(
1984
 
            str(e),
1985
 
            "None is an instance of <class 'NoneType'> rather "
1986
 
            "than <class 'int'>: it's just not")
 
1912
            self.assertIsInstance, None, int, "it's just not")
 
1913
        self.assertEqual(str(e),
 
1914
            "None is an instance of <type 'NoneType'> rather than <type 'int'>"
 
1915
            ": it's just not")
1987
1916
 
1988
1917
    def test_assertEndsWith(self):
1989
1918
        self.assertEndsWith('foo', 'oo')
1993
1922
        e = self.assertRaises(AssertionError,
1994
1923
                              self.assertEqualDiff, '', '\n')
1995
1924
        self.assertEqual(str(e),
1996
 
                         # Don't blink ! The '+' applies to the second string
1997
 
                         'first string is missing a final newline.\n+ \n')
 
1925
                          # Don't blink ! The '+' applies to the second string
 
1926
                          'first string is missing a final newline.\n+ \n')
1998
1927
        e = self.assertRaises(AssertionError,
1999
1928
                              self.assertEqualDiff, '\n', '')
2000
1929
        self.assertEqual(str(e),
2001
 
                         # Don't blink ! The '-' applies to the second string
2002
 
                         'second string is missing a final newline.\n- \n')
 
1930
                          # Don't blink ! The '-' applies to the second string
 
1931
                          'second string is missing a final newline.\n- \n')
2003
1932
 
2004
1933
 
2005
1934
class TestDeprecations(tests.TestCase):
2008
1937
        sample_object = ApplyDeprecatedHelper()
2009
1938
        # calling an undeprecated callable raises an assertion
2010
1939
        self.assertRaises(AssertionError, self.applyDeprecated,
2011
 
                          deprecated_in((0, 11, 0)),
2012
 
                          sample_object.sample_normal_method)
 
1940
            deprecated_in((0, 11, 0)),
 
1941
            sample_object.sample_normal_method)
2013
1942
        self.assertRaises(AssertionError, self.applyDeprecated,
2014
 
                          deprecated_in((0, 11, 0)),
2015
 
                          sample_undeprecated_function, "a param value")
 
1943
            deprecated_in((0, 11, 0)),
 
1944
            sample_undeprecated_function, "a param value")
2016
1945
        # calling a deprecated callable (function or method) with the wrong
2017
1946
        # expected deprecation fails.
2018
1947
        self.assertRaises(AssertionError, self.applyDeprecated,
2019
 
                          deprecated_in((0, 10, 0)),
2020
 
                          sample_object.sample_deprecated_method,
2021
 
                          "a param value")
 
1948
            deprecated_in((0, 10, 0)),
 
1949
            sample_object.sample_deprecated_method, "a param value")
2022
1950
        self.assertRaises(AssertionError, self.applyDeprecated,
2023
 
                          deprecated_in((0, 10, 0)),
2024
 
                          sample_deprecated_function)
 
1951
            deprecated_in((0, 10, 0)),
 
1952
            sample_deprecated_function)
2025
1953
        # calling a deprecated callable (function or method) with the right
2026
1954
        # expected deprecation returns the functions result.
2027
 
        self.assertEqual(
2028
 
            "a param value",
2029
 
            self.applyDeprecated(
2030
 
                deprecated_in((0, 11, 0)),
2031
 
                sample_object.sample_deprecated_method, "a param value"))
 
1955
        self.assertEqual("a param value",
 
1956
            self.applyDeprecated(deprecated_in((0, 11, 0)),
 
1957
            sample_object.sample_deprecated_method, "a param value"))
2032
1958
        self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 11, 0)),
2033
 
                                                 sample_deprecated_function))
 
1959
            sample_deprecated_function))
2034
1960
        # calling a nested deprecation with the wrong deprecation version
2035
1961
        # fails even if a deeper nested function was deprecated with the
2036
1962
        # supplied version.
2037
 
        self.assertRaises(
2038
 
            AssertionError, self.applyDeprecated,
 
1963
        self.assertRaises(AssertionError, self.applyDeprecated,
2039
1964
            deprecated_in((0, 11, 0)), sample_object.sample_nested_deprecation)
2040
1965
        # calling a nested deprecation with the right deprecation value
2041
1966
        # returns the calls result.
2042
 
        self.assertEqual(
2043
 
            2, self.applyDeprecated(
2044
 
                deprecated_in((0, 10, 0)),
2045
 
                sample_object.sample_nested_deprecation))
 
1967
        self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 10, 0)),
 
1968
            sample_object.sample_nested_deprecation))
2046
1969
 
2047
1970
    def test_callDeprecated(self):
2048
1971
        def testfunc(be_deprecated, result=None):
2079
2002
 
2080
2003
    def test_make_branch_and_tree_with_format(self):
2081
2004
        # we should be able to supply a format to make_branch_and_tree
2082
 
        self.make_branch_and_tree(
2083
 
            'a', format=breezy.bzr.bzrdir.BzrDirMetaFormat1())
 
2005
        self.make_branch_and_tree('a', format=breezy.bzr.bzrdir.BzrDirMetaFormat1())
2084
2006
        self.assertIsInstance(breezy.controldir.ControlDir.open('a')._format,
2085
2007
                              breezy.bzr.bzrdir.BzrDirMetaFormat1)
2086
2008
 
2100
2022
        base = tree.controldir.root_transport.base
2101
2023
        self.assertStartsWith(base, 'file://')
2102
2024
        self.assertEqual(tree.controldir.root_transport,
2103
 
                         tree.branch.controldir.root_transport)
 
2025
                tree.branch.controldir.root_transport)
2104
2026
        self.assertEqual(tree.controldir.root_transport,
2105
 
                         tree.branch.repository.controldir.root_transport)
 
2027
                tree.branch.repository.controldir.root_transport)
2106
2028
 
2107
2029
 
2108
2030
class SelfTestHelper(object):
2109
2031
 
2110
2032
    def run_selftest(self, **kwargs):
2111
2033
        """Run selftest returning its output."""
2112
 
        bio = BytesIO()
2113
 
        output = TextIOWrapper(bio, 'utf-8')
 
2034
        output = StringIO()
2114
2035
        old_transport = breezy.tests.default_transport
2115
2036
        old_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
2116
2037
        tests.TestCaseWithMemoryTransport.TEST_ROOT = None
2119
2040
        finally:
2120
2041
            breezy.tests.default_transport = old_transport
2121
2042
            tests.TestCaseWithMemoryTransport.TEST_ROOT = old_root
2122
 
        output.flush()
2123
 
        output.detach()
2124
 
        bio.seek(0)
2125
 
        return bio
 
2043
        output.seek(0)
 
2044
        return output
2126
2045
 
2127
2046
 
2128
2047
class TestSelftest(tests.TestCase, SelfTestHelper):
2129
2048
    """Tests of breezy.tests.selftest."""
2130
2049
 
2131
 
    def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(
2132
 
            self):
 
2050
    def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(self):
2133
2051
        factory_called = []
2134
 
 
2135
2052
        def factory():
2136
2053
            factory_called.append(True)
2137
2054
            return TestUtil.TestSuite()
2138
2055
        out = StringIO()
2139
2056
        err = StringIO()
2140
2057
        self.apply_redirected(out, err, None, breezy.tests.selftest,
2141
 
                              test_suite_factory=factory)
 
2058
            test_suite_factory=factory)
2142
2059
        self.assertEqual([True], factory_called)
2143
2060
 
2144
2061
    def factory(self):
2145
2062
        """A test suite factory."""
2146
2063
        class Test(tests.TestCase):
2147
 
            def id(self):
2148
 
                return __name__ + ".Test." + self._testMethodName
2149
 
 
2150
2064
            def a(self):
2151
2065
                pass
2152
 
 
2153
2066
            def b(self):
2154
2067
                pass
2155
 
 
2156
 
            def c(telf):
 
2068
            def c(self):
2157
2069
                pass
2158
2070
        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
2159
2071
 
2160
2072
    def test_list_only(self):
2161
2073
        output = self.run_selftest(test_suite_factory=self.factory,
2162
 
                                   list_only=True)
 
2074
            list_only=True)
2163
2075
        self.assertEqual(3, len(output.readlines()))
2164
2076
 
2165
2077
    def test_list_only_filtered(self):
2166
2078
        output = self.run_selftest(test_suite_factory=self.factory,
2167
 
                                   list_only=True, pattern="Test.b")
2168
 
        self.assertEndsWith(output.getvalue(), b"Test.b\n")
 
2079
            list_only=True, pattern="Test.b")
 
2080
        self.assertEndsWith(output.getvalue(), "Test.b\n")
2169
2081
        self.assertLength(1, output.readlines())
2170
2082
 
2171
2083
    def test_list_only_excludes(self):
2172
2084
        output = self.run_selftest(test_suite_factory=self.factory,
2173
 
                                   list_only=True, exclude_pattern="Test.b")
2174
 
        self.assertNotContainsRe(b"Test.b", output.getvalue())
 
2085
            list_only=True, exclude_pattern="Test.b")
 
2086
        self.assertNotContainsRe("Test.b", output.getvalue())
2175
2087
        self.assertLength(2, output.readlines())
2176
2088
 
2177
2089
    def test_lsprof_tests(self):
2178
2090
        self.requireFeature(features.lsprof_feature)
2179
2091
        results = []
2180
 
 
2181
2092
        class Test(object):
2182
2093
            def __call__(test, result):
2183
2094
                test.run(result)
2184
 
 
2185
2095
            def run(test, result):
2186
2096
                results.append(result)
2187
 
 
2188
2097
            def countTestCases(self):
2189
2098
                return 1
2190
2099
        self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
2194
2103
    def test_random(self):
2195
2104
        # test randomising by listing a number of tests.
2196
2105
        output_123 = self.run_selftest(test_suite_factory=self.factory,
2197
 
                                       list_only=True, random_seed="123")
 
2106
            list_only=True, random_seed="123")
2198
2107
        output_234 = self.run_selftest(test_suite_factory=self.factory,
2199
 
                                       list_only=True, random_seed="234")
 
2108
            list_only=True, random_seed="234")
2200
2109
        self.assertNotEqual(output_123, output_234)
2201
2110
        # "Randominzing test order..\n\n
2202
2111
        self.assertLength(5, output_123.readlines())
2205
2114
    def test_random_reuse_is_same_order(self):
2206
2115
        # test randomising by listing a number of tests.
2207
2116
        expected = self.run_selftest(test_suite_factory=self.factory,
2208
 
                                     list_only=True, random_seed="123")
 
2117
            list_only=True, random_seed="123")
2209
2118
        repeated = self.run_selftest(test_suite_factory=self.factory,
2210
 
                                     list_only=True, random_seed="123")
 
2119
            list_only=True, random_seed="123")
2211
2120
        self.assertEqual(expected.getvalue(), repeated.getvalue())
2212
2121
 
2213
2122
    def test_runner_class(self):
2214
2123
        self.requireFeature(features.subunit)
2215
2124
        from subunit import ProtocolTestCase
2216
 
        stream = self.run_selftest(
2217
 
            runner_class=tests.SubUnitBzrRunnerv1,
 
2125
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2218
2126
            test_suite_factory=self.factory)
2219
2127
        test = ProtocolTestCase(stream)
2220
2128
        result = unittest.TestResult()
2223
2131
 
2224
2132
    def test_starting_with_single_argument(self):
2225
2133
        output = self.run_selftest(test_suite_factory=self.factory,
2226
 
                                   starting_with=[
2227
 
                                       'breezy.tests.test_selftest.Test.a'],
2228
 
                                   list_only=True)
2229
 
        self.assertEqual(b'breezy.tests.test_selftest.Test.a\n',
2230
 
                         output.getvalue())
 
2134
            starting_with=['breezy.tests.test_selftest.Test.a'],
 
2135
            list_only=True)
 
2136
        self.assertEqual('breezy.tests.test_selftest.Test.a\n',
 
2137
            output.getvalue())
2231
2138
 
2232
2139
    def test_starting_with_multiple_argument(self):
2233
 
        output = self.run_selftest(
2234
 
            test_suite_factory=self.factory,
 
2140
        output = self.run_selftest(test_suite_factory=self.factory,
2235
2141
            starting_with=['breezy.tests.test_selftest.Test.a',
2236
 
                           'breezy.tests.test_selftest.Test.b'],
 
2142
                'breezy.tests.test_selftest.Test.b'],
2237
2143
            list_only=True)
2238
 
        self.assertEqual(b'breezy.tests.test_selftest.Test.a\n'
2239
 
                         b'breezy.tests.test_selftest.Test.b\n',
2240
 
                         output.getvalue())
 
2144
        self.assertEqual('breezy.tests.test_selftest.Test.a\n'
 
2145
            'breezy.tests.test_selftest.Test.b\n',
 
2146
            output.getvalue())
2241
2147
 
2242
2148
    def check_transport_set(self, transport_server):
2243
2149
        captured_transport = []
2244
 
 
2245
2150
        def seen_transport(a_transport):
2246
2151
            captured_transport.append(a_transport)
2247
 
 
2248
2152
        class Capture(tests.TestCase):
2249
2153
            def a(self):
2250
2154
                seen_transport(breezy.tests.default_transport)
2251
 
 
2252
2155
        def factory():
2253
2156
            return TestUtil.TestSuite([Capture("a")])
2254
 
        self.run_selftest(transport=transport_server,
2255
 
                          test_suite_factory=factory)
 
2157
        self.run_selftest(transport=transport_server, test_suite_factory=factory)
2256
2158
        self.assertEqual(transport_server, captured_transport[0])
2257
2159
 
2258
2160
    def test_transport_sftp(self):
2269
2171
 
2270
2172
    def test_load_list(self):
2271
2173
        # Provide a list with one test - this test.
2272
 
        test_id_line = b'%s\n' % self.id().encode('ascii')
 
2174
        test_id_line = '%s\n' % self.id()
2273
2175
        self.build_tree_contents([('test.list', test_id_line)])
2274
2176
        # And generate a list of the tests in the suite.
2275
2177
        stream = self.run_selftest(load_list='test.list', list_only=True)
2278
2180
    def test_load_unknown(self):
2279
2181
        # Provide a list with one test - this test.
2280
2182
        # And generate a list of the tests in the suite.
2281
 
        self.assertRaises(errors.NoSuchFile, self.run_selftest,
2282
 
                          load_list='missing file name', list_only=True)
 
2183
        err = self.assertRaises(errors.NoSuchFile, self.run_selftest,
 
2184
            load_list='missing file name', list_only=True)
2283
2185
 
2284
2186
 
2285
2187
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2288
2190
 
2289
2191
    def run_subunit_stream(self, test_name):
2290
2192
        from subunit import ProtocolTestCase
2291
 
 
2292
2193
        def factory():
2293
2194
            return TestUtil.TestSuite([_get_test(test_name)])
2294
 
        stream = self.run_selftest(
2295
 
            runner_class=tests.SubUnitBzrRunnerv1,
 
2195
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2296
2196
            test_suite_factory=factory)
2297
2197
        test = ProtocolTestCase(stream)
2298
2198
        result = testtools.TestResult()
2303
2203
    def test_fail_has_log(self):
2304
2204
        content, result = self.run_subunit_stream('test_fail')
2305
2205
        self.assertEqual(1, len(result.failures))
2306
 
        self.assertContainsRe(content, b'(?m)^log$')
2307
 
        self.assertContainsRe(content, b'this test will fail')
 
2206
        self.assertContainsRe(content, '(?m)^log$')
 
2207
        self.assertContainsRe(content, 'this test will fail')
2308
2208
 
2309
2209
    def test_error_has_log(self):
2310
2210
        content, result = self.run_subunit_stream('test_error')
2311
 
        self.assertContainsRe(content, b'(?m)^log$')
2312
 
        self.assertContainsRe(content, b'this test errored')
 
2211
        self.assertContainsRe(content, '(?m)^log$')
 
2212
        self.assertContainsRe(content, 'this test errored')
2313
2213
 
2314
2214
    def test_skip_has_no_log(self):
2315
2215
        content, result = self.run_subunit_stream('test_skip')
2316
 
        self.assertNotContainsRe(content, b'(?m)^log$')
2317
 
        self.assertNotContainsRe(content, b'this test will be skipped')
2318
 
        reasons = result.skip_reasons
 
2216
        self.assertNotContainsRe(content, '(?m)^log$')
 
2217
        self.assertNotContainsRe(content, 'this test will be skipped')
 
2218
        reasons = _get_skip_reasons(result)
2319
2219
        self.assertEqual({'reason'}, set(reasons))
2320
2220
        skips = reasons['reason']
2321
2221
        self.assertEqual(1, len(skips))
2322
 
        # test = skips[0]
 
2222
        test = skips[0]
2323
2223
        # RemotedTestCase doesn't preserve the "details"
2324
 
        # self.assertFalse('log' in test.getDetails())
 
2224
        ## self.assertFalse('log' in test.getDetails())
2325
2225
 
2326
2226
    def test_missing_feature_has_no_log(self):
2327
2227
        content, result = self.run_subunit_stream('test_missing_feature')
2328
 
        self.assertNotContainsRe(content, b'(?m)^log$')
2329
 
        self.assertNotContainsRe(content, b'missing the feature')
2330
 
        reasons = result.skip_reasons
 
2228
        self.assertNotContainsRe(content, '(?m)^log$')
 
2229
        self.assertNotContainsRe(content, 'missing the feature')
 
2230
        reasons = _get_skip_reasons(result)
2331
2231
        self.assertEqual({'_MissingFeature\n'}, set(reasons))
2332
2232
        skips = reasons['_MissingFeature\n']
2333
2233
        self.assertEqual(1, len(skips))
2334
 
        # test = skips[0]
 
2234
        test = skips[0]
2335
2235
        # RemotedTestCase doesn't preserve the "details"
2336
 
        # self.assertFalse('log' in test.getDetails())
 
2236
        ## self.assertFalse('log' in test.getDetails())
2337
2237
 
2338
2238
    def test_xfail_has_no_log(self):
2339
2239
        content, result = self.run_subunit_stream('test_xfail')
2340
 
        self.assertNotContainsRe(content, b'(?m)^log$')
2341
 
        self.assertNotContainsRe(content, b'test with expected failure')
 
2240
        self.assertNotContainsRe(content, '(?m)^log$')
 
2241
        self.assertNotContainsRe(content, 'test with expected failure')
2342
2242
        self.assertEqual(1, len(result.expectedFailures))
2343
2243
        result_content = result.expectedFailures[0][1]
2344
2244
        self.assertNotContainsRe(result_content,
2345
 
                                 '(?m)^(?:Text attachment: )?log(?:$|: )')
 
2245
            '(?m)^(?:Text attachment: )?log(?:$|: )')
2346
2246
        self.assertNotContainsRe(result_content, 'test with expected failure')
2347
2247
 
2348
2248
    def test_unexpected_success_has_log(self):
2349
2249
        content, result = self.run_subunit_stream('test_unexpected_success')
2350
 
        self.assertContainsRe(content, b'(?m)^log$')
2351
 
        self.assertContainsRe(content, b'test with unexpected success')
 
2250
        self.assertContainsRe(content, '(?m)^log$')
 
2251
        self.assertContainsRe(content, 'test with unexpected success')
 
2252
        # GZ 2011-05-18: Old versions of subunit treat unexpected success as a
 
2253
        #                success, if a min version check is added remove this
 
2254
        from subunit import TestProtocolClient as _Client
 
2255
        if _Client.addUnexpectedSuccess.__func__ is _Client.addSuccess.__func__:
 
2256
            self.expectFailure('subunit treats "unexpectedSuccess"'
 
2257
                               ' as a plain success',
 
2258
                self.assertEqual, 1, len(result.unexpectedSuccesses))
2352
2259
        self.assertEqual(1, len(result.unexpectedSuccesses))
2353
 
        # test = result.unexpectedSuccesses[0]
 
2260
        test = result.unexpectedSuccesses[0]
2354
2261
        # RemotedTestCase doesn't preserve the "details"
2355
 
        # self.assertTrue('log' in test.getDetails())
 
2262
        ## self.assertTrue('log' in test.getDetails())
2356
2263
 
2357
2264
    def test_success_has_no_log(self):
2358
2265
        content, result = self.run_subunit_stream('test_success')
2359
2266
        self.assertEqual(1, result.testsRun)
2360
 
        self.assertNotContainsRe(content, b'(?m)^log$')
2361
 
        self.assertNotContainsRe(content, b'this test succeeds')
 
2267
        self.assertNotContainsRe(content, '(?m)^log$')
 
2268
        self.assertNotContainsRe(content, 'this test succeeds')
2362
2269
 
2363
2270
 
2364
2271
class TestRunBzr(tests.TestCase):
2365
2272
 
2366
 
    result = 0
2367
2273
    out = ''
2368
2274
    err = ''
2369
2275
 
2370
 
    def _run_bzr_core(self, argv, encoding=None, stdin=None,
2371
 
                      stdout=None, stderr=None, working_dir=None):
 
2276
    def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
 
2277
                         working_dir=None):
2372
2278
        """Override _run_bzr_core to test how it is invoked by run_bzr.
2373
2279
 
2374
2280
        Attempts to run bzr from inside this class don't actually run it.
2377
2283
        only need to test that it passes the right parameters to run_bzr.
2378
2284
        """
2379
2285
        self.argv = list(argv)
 
2286
        self.retcode = retcode
2380
2287
        self.encoding = encoding
2381
2288
        self.stdin = stdin
2382
2289
        self.working_dir = working_dir
2383
 
        stdout.write(self.out)
2384
 
        stderr.write(self.err)
2385
 
        return self.result
 
2290
        return self.retcode, self.out, self.err
2386
2291
 
2387
2292
    def test_run_bzr_error(self):
2388
2293
        self.out = "It sure does!\n"
2389
 
        self.result = 34
2390
2294
        out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
2391
2295
        self.assertEqual(['rocks'], self.argv)
 
2296
        self.assertEqual(34, self.retcode)
2392
2297
        self.assertEqual('It sure does!\n', out)
2393
2298
        self.assertEqual(out, self.out)
2394
2299
        self.assertEqual('', err)
2397
2302
    def test_run_bzr_error_regexes(self):
2398
2303
        self.out = ''
2399
2304
        self.err = "bzr: ERROR: foobarbaz is not versioned"
2400
 
        self.result = 3
2401
2305
        out, err = self.run_bzr_error(
2402
2306
            ["bzr: ERROR: foobarbaz is not versioned"],
2403
2307
            ['file-id', 'foobarbaz'])
2405
2309
    def test_encoding(self):
2406
2310
        """Test that run_bzr passes encoding to _run_bzr_core"""
2407
2311
        self.run_bzr('foo bar')
2408
 
        self.assertEqual(osutils.get_user_encoding(), self.encoding)
 
2312
        self.assertEqual(None, self.encoding)
2409
2313
        self.assertEqual(['foo', 'bar'], self.argv)
2410
2314
 
2411
2315
        self.run_bzr('foo bar', encoding='baz')
2412
2316
        self.assertEqual('baz', self.encoding)
2413
2317
        self.assertEqual(['foo', 'bar'], self.argv)
2414
2318
 
 
2319
    def test_retcode(self):
 
2320
        """Test that run_bzr passes retcode to _run_bzr_core"""
 
2321
        # Default is retcode == 0
 
2322
        self.run_bzr('foo bar')
 
2323
        self.assertEqual(0, self.retcode)
 
2324
        self.assertEqual(['foo', 'bar'], self.argv)
 
2325
 
 
2326
        self.run_bzr('foo bar', retcode=1)
 
2327
        self.assertEqual(1, self.retcode)
 
2328
        self.assertEqual(['foo', 'bar'], self.argv)
 
2329
 
 
2330
        self.run_bzr('foo bar', retcode=None)
 
2331
        self.assertEqual(None, self.retcode)
 
2332
        self.assertEqual(['foo', 'bar'], self.argv)
 
2333
 
 
2334
        self.run_bzr(['foo', 'bar'], retcode=3)
 
2335
        self.assertEqual(3, self.retcode)
 
2336
        self.assertEqual(['foo', 'bar'], self.argv)
 
2337
 
2415
2338
    def test_stdin(self):
2416
2339
        # test that the stdin keyword to run_bzr is passed through to
2417
2340
        # _run_bzr_core as-is. We do this by overriding
2506
2429
 
2507
2430
class StubProcess(object):
2508
2431
    """A stub process for testing run_bzr_subprocess."""
2509
 
 
 
2432
    
2510
2433
    def __init__(self, out="", err="", retcode=0):
2511
2434
        self.out = out
2512
2435
        self.err = err
2528
2451
                             working_dir=None,
2529
2452
                             allow_plugins=False):
2530
2453
        """capture what run_bzr_subprocess tries to do."""
2531
 
        self.subprocess_calls.append(
2532
 
            {'process_args': process_args,
2533
 
             'env_changes': env_changes,
2534
 
             'skip_if_plan_to_signal': skip_if_plan_to_signal,
2535
 
             'working_dir': working_dir, 'allow_plugins': allow_plugins})
 
2454
        self.subprocess_calls.append({'process_args':process_args,
 
2455
            'env_changes':env_changes,
 
2456
            'skip_if_plan_to_signal':skip_if_plan_to_signal,
 
2457
            'working_dir':working_dir, 'allow_plugins':allow_plugins})
2536
2458
        return self.next_subprocess
2537
2459
 
2538
2460
 
2548
2470
        self.next_subprocess = process
2549
2471
        try:
2550
2472
            result = self.run_bzr_subprocess(*args, **kwargs)
2551
 
        except BaseException:
 
2473
        except:
2552
2474
            self.next_subprocess = None
2553
2475
            for key, expected in expected_args.items():
2554
2476
                self.assertEqual(expected, self.subprocess_calls[-1][key])
2561
2483
 
2562
2484
    def test_run_bzr_subprocess(self):
2563
2485
        """The run_bzr_helper_external command behaves nicely."""
2564
 
        self.assertRunBzrSubprocess({'process_args': ['--version']},
2565
 
                                    StubProcess(), '--version')
2566
 
        self.assertRunBzrSubprocess({'process_args': ['--version']},
2567
 
                                    StubProcess(), ['--version'])
 
2486
        self.assertRunBzrSubprocess({'process_args':['--version']},
 
2487
            StubProcess(), '--version')
 
2488
        self.assertRunBzrSubprocess({'process_args':['--version']},
 
2489
            StubProcess(), ['--version'])
2568
2490
        # retcode=None disables retcode checking
2569
 
        result = self.assertRunBzrSubprocess(
2570
 
            {}, StubProcess(retcode=3), '--version', retcode=None)
2571
 
        result = self.assertRunBzrSubprocess(
2572
 
            {}, StubProcess(out="is free software"), '--version')
 
2491
        result = self.assertRunBzrSubprocess({},
 
2492
            StubProcess(retcode=3), '--version', retcode=None)
 
2493
        result = self.assertRunBzrSubprocess({},
 
2494
            StubProcess(out="is free software"), '--version')
2573
2495
        self.assertContainsRe(result[0], 'is free software')
2574
2496
        # Running a subcommand that is missing errors
2575
2497
        self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
2576
 
                          {'process_args': ['--versionn']
2577
 
                           }, StubProcess(retcode=3),
2578
 
                          '--versionn')
 
2498
            {'process_args':['--versionn']}, StubProcess(retcode=3),
 
2499
            '--versionn')
2579
2500
        # Unless it is told to expect the error from the subprocess
2580
 
        result = self.assertRunBzrSubprocess(
2581
 
            {}, StubProcess(retcode=3), '--versionn', retcode=3)
 
2501
        result = self.assertRunBzrSubprocess({},
 
2502
            StubProcess(retcode=3), '--versionn', retcode=3)
2582
2503
        # Or to ignore retcode checking
2583
 
        result = self.assertRunBzrSubprocess(
2584
 
            {}, StubProcess(err="unknown command", retcode=3),
2585
 
            '--versionn', retcode=None)
 
2504
        result = self.assertRunBzrSubprocess({},
 
2505
            StubProcess(err="unknown command", retcode=3), '--versionn',
 
2506
            retcode=None)
2586
2507
        self.assertContainsRe(result[1], 'unknown command')
2587
2508
 
2588
2509
    def test_env_change_passes_through(self):
2589
2510
        self.assertRunBzrSubprocess(
2590
 
            {'env_changes': {'new': 'value', 'changed': 'newvalue', 'deleted': None}},
 
2511
            {'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}},
2591
2512
            StubProcess(), '',
2592
 
            env_changes={'new': 'value', 'changed': 'newvalue', 'deleted': None})
 
2513
            env_changes={'new':'value', 'changed':'newvalue', 'deleted':None})
2593
2514
 
2594
2515
    def test_no_working_dir_passed_as_None(self):
2595
2516
        self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '')
2596
2517
 
2597
2518
    def test_no_working_dir_passed_through(self):
2598
2519
        self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
2599
 
                                    working_dir='dir')
 
2520
            working_dir='dir')
2600
2521
 
2601
2522
    def test_run_bzr_subprocess_no_plugins(self):
2602
2523
        self.assertRunBzrSubprocess({'allow_plugins': False},
2603
 
                                    StubProcess(), '')
 
2524
            StubProcess(), '')
2604
2525
 
2605
2526
    def test_allow_plugins(self):
2606
2527
        self.assertRunBzrSubprocess({'allow_plugins': True},
2607
 
                                    StubProcess(), '', allow_plugins=True)
 
2528
            StubProcess(), '', allow_plugins=True)
2608
2529
 
2609
2530
 
2610
2531
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):
2672
2593
    def test_set_env(self):
2673
2594
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2674
2595
        # set in the child
2675
 
 
2676
2596
        def check_environment():
2677
2597
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2678
2598
        self.check_popen_state = check_environment
2679
2599
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2680
 
                          env_changes={'EXISTANT_ENV_VAR': 'set variable'})
 
2600
                          env_changes={'EXISTANT_ENV_VAR':'set variable'})
2681
2601
        # not set in the parent
2682
2602
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2683
2603
 
2684
2604
    def test_run_bzr_subprocess_env_del(self):
2685
2605
        """run_bzr_subprocess can remove environment variables too."""
2686
2606
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2687
 
 
2688
2607
        def check_environment():
2689
2608
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2690
2609
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2691
2610
        self.check_popen_state = check_environment
2692
2611
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2693
 
                          env_changes={'EXISTANT_ENV_VAR': None})
 
2612
                          env_changes={'EXISTANT_ENV_VAR':None})
2694
2613
        # Still set in parent
2695
2614
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2696
2615
        del os.environ['EXISTANT_ENV_VAR']
2697
2616
 
2698
2617
    def test_env_del_missing(self):
2699
2618
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2700
 
 
2701
2619
        def check_environment():
2702
2620
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2703
2621
        self.check_popen_state = check_environment
2704
2622
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2705
 
                          env_changes={'NON_EXISTANT_ENV_VAR': None})
 
2623
                          env_changes={'NON_EXISTANT_ENV_VAR':None})
2706
2624
 
2707
2625
    def test_working_dir(self):
2708
2626
        """Test that we can specify the working dir for the child"""
 
2627
        orig_getcwd = osutils.getcwd
 
2628
        orig_chdir = os.chdir
2709
2629
        chdirs = []
2710
 
 
2711
2630
        def chdir(path):
2712
2631
            chdirs.append(path)
2713
2632
        self.overrideAttr(os, 'chdir', chdir)
2714
 
 
2715
2633
        def getcwd():
2716
2634
            return 'current'
2717
2635
        self.overrideAttr(osutils, 'getcwd', getcwd)
2735
2653
        self.disable_missing_extensions_warning()
2736
2654
        process = self.start_bzr_subprocess(['wait-until-signalled'],
2737
2655
                                            skip_if_plan_to_signal=True)
2738
 
        self.assertEqual(b'running\n', process.stdout.readline())
 
2656
        self.assertEqual('running\n', process.stdout.readline())
2739
2657
        result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2740
2658
                                            retcode=3)
2741
 
        self.assertEqual(b'', result[0])
2742
 
        self.assertEqual(b'brz: interrupted\n', result[1])
 
2659
        self.assertEqual('', result[0])
 
2660
        self.assertEqual('brz: interrupted\n', result[1])
2743
2661
 
2744
2662
 
2745
2663
class TestSelftestFiltering(tests.TestCase):
2754
2672
 
2755
2673
    def test_condition_id_re(self):
2756
2674
        test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2757
 
                     'test_condition_id_re')
 
2675
            'test_condition_id_re')
2758
2676
        filtered_suite = tests.filter_suite_by_condition(
2759
2677
            self.suite, tests.condition_id_re('test_condition_id_re'))
2760
2678
        self.assertEqual([test_name], _test_ids(filtered_suite))
2773
2691
        klass = 'breezy.tests.test_selftest.TestSelftestFiltering.'
2774
2692
        start1 = klass + 'test_condition_id_starts'
2775
2693
        start2 = klass + 'test_condition_id_in'
2776
 
        test_names = [klass + 'test_condition_id_in_list',
 
2694
        test_names = [ klass + 'test_condition_id_in_list',
2777
2695
                      klass + 'test_condition_id_startswith',
2778
 
                      ]
 
2696
                     ]
2779
2697
        filtered_suite = tests.filter_suite_by_condition(
2780
2698
            self.suite, tests.condition_id_startswith([start1, start2]))
2781
2699
        self.assertEqual(test_names, _test_ids(filtered_suite))
2789
2707
 
2790
2708
    def test_exclude_tests_by_condition(self):
2791
2709
        excluded_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2792
 
                         'test_exclude_tests_by_condition')
2793
 
        filtered_suite = tests.exclude_tests_by_condition(
2794
 
            self.suite, lambda x: x.id() == excluded_name)
 
2710
            'test_exclude_tests_by_condition')
 
2711
        filtered_suite = tests.exclude_tests_by_condition(self.suite,
 
2712
            lambda x:x.id() == excluded_name)
2795
2713
        self.assertEqual(len(self.all_names) - 1,
2796
 
                         filtered_suite.countTestCases())
 
2714
            filtered_suite.countTestCases())
2797
2715
        self.assertFalse(excluded_name in _test_ids(filtered_suite))
2798
2716
        remaining_names = list(self.all_names)
2799
2717
        remaining_names.remove(excluded_name)
2804
2722
        filtered_suite = tests.exclude_tests_by_re(self.suite,
2805
2723
                                                   'exclude_tests_by_re')
2806
2724
        excluded_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2807
 
                         'test_exclude_tests_by_re')
 
2725
            'test_exclude_tests_by_re')
2808
2726
        self.assertEqual(len(self.all_names) - 1,
2809
 
                         filtered_suite.countTestCases())
 
2727
            filtered_suite.countTestCases())
2810
2728
        self.assertFalse(excluded_name in _test_ids(filtered_suite))
2811
2729
        remaining_names = list(self.all_names)
2812
2730
        remaining_names.remove(excluded_name)
2814
2732
 
2815
2733
    def test_filter_suite_by_condition(self):
2816
2734
        test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2817
 
                     'test_filter_suite_by_condition')
2818
 
        filtered_suite = tests.filter_suite_by_condition(
2819
 
            self.suite, lambda x: x.id() == test_name)
 
2735
            'test_filter_suite_by_condition')
 
2736
        filtered_suite = tests.filter_suite_by_condition(self.suite,
 
2737
            lambda x:x.id() == test_name)
2820
2738
        self.assertEqual([test_name], _test_ids(filtered_suite))
2821
2739
 
2822
2740
    def test_filter_suite_by_re(self):
2823
2741
        filtered_suite = tests.filter_suite_by_re(self.suite,
2824
2742
                                                  'test_filter_suite_by_r')
2825
2743
        filtered_names = _test_ids(filtered_suite)
2826
 
        self.assertEqual(
2827
 
            filtered_names, ['breezy.tests.test_selftest.'
2828
 
                             'TestSelftestFiltering.test_filter_suite_by_re'])
 
2744
        self.assertEqual(filtered_names, ['breezy.tests.test_selftest.'
 
2745
            'TestSelftestFiltering.test_filter_suite_by_re'])
2829
2746
 
2830
2747
    def test_filter_suite_by_id_list(self):
2831
2748
        test_list = ['breezy.tests.test_selftest.'
2856
2773
 
2857
2774
    def test_preserve_input(self):
2858
2775
        # NB: Surely there is something in the stdlib to do this?
2859
 
        self.assertIs(self.suite, tests.preserve_input(self.suite))
2860
 
        self.assertEqual("@#$", tests.preserve_input("@#$"))
 
2776
        self.assertTrue(self.suite is tests.preserve_input(self.suite))
 
2777
        self.assertTrue("@#$" is tests.preserve_input("@#$"))
2861
2778
 
2862
2779
    def test_randomize_suite(self):
2863
2780
        randomized_suite = tests.randomize_suite(self.suite)
2879
2796
        condition = tests.condition_id_re('test_filter_suite_by_r')
2880
2797
        split_suite = tests.split_suite_by_condition(self.suite, condition)
2881
2798
        filtered_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2882
 
                         'test_filter_suite_by_re')
 
2799
            'test_filter_suite_by_re')
2883
2800
        self.assertEqual([filtered_name], _test_ids(split_suite[0]))
2884
2801
        self.assertFalse(filtered_name in _test_ids(split_suite[1]))
2885
2802
        remaining_names = list(self.all_names)
2891
2808
        split_suite = tests.split_suite_by_re(self.suite,
2892
2809
                                              'test_filter_suite_by_r')
2893
2810
        filtered_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2894
 
                         'test_filter_suite_by_re')
 
2811
            'test_filter_suite_by_re')
2895
2812
        self.assertEqual([filtered_name], _test_ids(split_suite[0]))
2896
2813
        self.assertFalse(filtered_name in _test_ids(split_suite[1]))
2897
2814
        remaining_names = list(self.all_names)
2940
2857
        self.permit_url(url)
2941
2858
        out, err = self.run_bzr(["log", "%s/nonexistantpath" % url], retcode=3)
2942
2859
        self.assertEqual(out, '')
2943
 
        self.assertContainsRe(
2944
 
            err, 'brz: ERROR: Not a branch: ".*nonexistantpath/".\n')
 
2860
        self.assertContainsRe(err,
 
2861
            'brz: ERROR: Not a branch: ".*nonexistantpath/".\n')
2945
2862
 
2946
2863
 
2947
2864
class TestTestLoader(tests.TestCase):
2951
2868
        """Gets a TestLoader and a module with one test in it."""
2952
2869
        loader = TestUtil.TestLoader()
2953
2870
        module = {}
2954
 
 
2955
2871
        class Stub(tests.TestCase):
2956
2872
            def test_foo(self):
2957
2873
                pass
2958
 
 
2959
2874
        class MyModule(object):
2960
2875
            pass
2961
2876
        MyModule.a_class = Stub
2965
2880
 
2966
2881
    def test_module_no_load_tests_attribute_loads_classes(self):
2967
2882
        loader, module = self._get_loader_and_module()
2968
 
        self.assertEqual(1, loader.loadTestsFromModule(
2969
 
            module).countTestCases())
 
2883
        self.assertEqual(1, loader.loadTestsFromModule(module).countTestCases())
2970
2884
 
2971
2885
    def test_module_load_tests_attribute_gets_called(self):
2972
2886
        loader, module = self._get_loader_and_module()
2973
 
 
2974
2887
        def load_tests(loader, standard_tests, pattern):
2975
2888
            result = loader.suiteClass()
2976
2889
            for test in tests.iter_suite_tests(standard_tests):
2986
2899
        loader = TestUtil.TestLoader()
2987
2900
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
2988
2901
        self.assertEqual(['breezy.tests.test_sampler.DemoTest.test_nothing'],
2989
 
                         _test_ids(suite))
 
2902
                          _test_ids(suite))
2990
2903
 
2991
2904
    def test_load_tests_from_module_name_with_bogus_module_name(self):
2992
2905
        loader = TestUtil.TestLoader()
3009
2922
 
3010
2923
        suite = TestUtil.TestSuite()
3011
2924
        for id in test_id_list:
3012
 
            t = Stub('test_foo')
 
2925
            t  = Stub('test_foo')
3013
2926
            t.id = _create_test_id(id)
3014
2927
            suite.addTest(t)
3015
2928
        return suite
3063
2976
        dupes = loader.suiteClass()
3064
2977
        for test in tests.iter_suite_tests(suite):
3065
2978
            dupes.addTest(test)
3066
 
            dupes.addTest(test)  # Add it again
 
2979
            dupes.addTest(test) # Add it again
3067
2980
 
3068
 
        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing', ]
 
2981
        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing',]
3069
2982
        not_found, duplicates = tests.suite_matches_id_list(
3070
2983
            dupes, test_list)
3071
2984
        self.assertEqual([], not_found)
3072
2985
        self.assertEqual(['breezy.tests.test_sampler.DemoTest.test_nothing'],
3073
 
                         duplicates)
 
2986
                          duplicates)
3074
2987
 
3075
2988
 
3076
2989
class TestTestSuite(tests.TestCase):
3102
3015
    def test_test_suite(self):
3103
3016
        # test_suite() loads the entire test suite to operate. To avoid this
3104
3017
        # overhead, and yet still be confident that things are happening,
3105
 
        # we temporarily replace two functions used by test_suite with
 
3018
        # we temporarily replace two functions used by test_suite with 
3106
3019
        # test doubles that supply a few sample tests to load, and check they
3107
3020
        # are loaded.
3108
3021
        calls = []
3109
 
 
3110
3022
        def testmod_names():
3111
3023
            calls.append("testmod_names")
3112
3024
            return [
3115
3027
                'breezy.tests.test_selftest',
3116
3028
                ]
3117
3029
        self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names)
3118
 
 
3119
3030
        def doctests():
3120
3031
            calls.append("modules_to_doctest")
3121
3032
            if __doc__ is None:
3131
3042
            # plugins can't be tested that way since selftest may be run with
3132
3043
            # --no-plugins
3133
3044
            ]
 
3045
        if __doc__ is not None:
 
3046
            expected_test_list.extend([
 
3047
                # modules_to_doctest
 
3048
                'breezy.timestamp.format_highres_date',
 
3049
                ])
3134
3050
        suite = tests.test_suite()
3135
 
        self.assertEqual({"testmod_names", "modules_to_doctest"}, set(calls))
 
3051
        self.assertEqual({"testmod_names", "modules_to_doctest"},
 
3052
            set(calls))
3136
3053
        self.assertSubset(expected_test_list, _test_ids(suite))
3137
3054
 
3138
3055
    def test_test_suite_list_and_start(self):
3139
 
        # We cannot test this at the same time as the main load, because we
3140
 
        # want to know that starting_with == None works. So a second load is
3141
 
        # incurred - note that the starting_with parameter causes a partial
3142
 
        # load rather than a full load so this test should be pretty quick.
3143
 
        test_list = [
3144
 
            'breezy.tests.test_selftest.TestTestSuite.test_test_suite']
 
3056
        # We cannot test this at the same time as the main load, because we want
 
3057
        # to know that starting_with == None works. So a second load is
 
3058
        # incurred - note that the starting_with parameter causes a partial load
 
3059
        # rather than a full load so this test should be pretty quick.
 
3060
        test_list = ['breezy.tests.test_selftest.TestTestSuite.test_test_suite']
3145
3061
        suite = tests.test_suite(test_list,
3146
3062
                                 ['breezy.tests.test_selftest.TestTestSuite'])
3147
 
        # test_test_suite_list_and_start is not included
 
3063
        # test_test_suite_list_and_start is not included 
3148
3064
        self.assertEqual(test_list, _test_ids(suite))
3149
3065
 
3150
3066
 
3224
3140
        self.assertEqual(test_list, _test_ids(suite))
3225
3141
 
3226
3142
    def test_exclude_tests(self):
 
3143
        test_list = ['bogus']
3227
3144
        loader = self._create_loader('bogus')
3228
3145
 
3229
3146
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
3247
3164
        tpr.register('bar', 'bBB.aAA.rRR')
3248
3165
        self.assertEqual('bbb.aaa.rrr', tpr.get('bar'))
3249
3166
        self.assertThat(self.get_log(),
3250
 
                        DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
3251
 
                                       doctest.ELLIPSIS))
 
3167
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
 
3168
                           doctest.ELLIPSIS))
3252
3169
 
3253
3170
    def test_get_unknown_prefix(self):
3254
3171
        tpr = self._get_registry()
3261
3178
 
3262
3179
    def test_resolve_unknown_alias(self):
3263
3180
        tpr = self._get_registry()
3264
 
        self.assertRaises(errors.CommandError,
 
3181
        self.assertRaises(errors.BzrCommandError,
3265
3182
                          tpr.resolve_alias, 'I am not a prefix')
3266
3183
 
3267
3184
    def test_predefined_prefixes(self):
3281
3198
        def __init__(self):
3282
3199
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3283
3200
            self.leaks = []
3284
 
 
3285
3201
        def _report_thread_leak(self, test, leaks, alive):
3286
3202
            self.leaks.append((test, leaks))
3287
3203
 
3297
3213
        result.stopTestRun()
3298
3214
        self.assertEqual(result._tests_leaking_threads_count, 0)
3299
3215
        self.assertEqual(result.leaks, [])
3300
 
 
 
3216
        
3301
3217
    def test_thread_leak(self):
3302
3218
        """Ensure a thread that outlives the running of a test is reported
3303
3219
 
3308
3224
        """
3309
3225
        event = threading.Event()
3310
3226
        thread = threading.Thread(name="Leaker", target=event.wait)
3311
 
 
3312
3227
        class Test(tests.TestCase):
3313
3228
            def test_leak(self):
3314
3229
                thread.start()
3334
3249
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
3335
3250
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
3336
3251
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
3337
 
 
3338
3252
        class Test(tests.TestCase):
3339
3253
            def test_first_leak(self):
3340
3254
                thread_b.start()
3341
 
 
3342
3255
            def test_second_no_leak(self):
3343
3256
                pass
3344
 
 
3345
3257
            def test_third_leak(self):
3346
3258
                thread_c.start()
3347
3259
                thread_a.start()
3372
3284
        def __init__(self):
3373
3285
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3374
3286
            self.postcode = None
3375
 
 
3376
3287
        def _post_mortem(self, tb=None):
3377
3288
            """Record the code object at the end of the current traceback"""
3378
3289
            tb = tb or sys.exc_info()[2]
3382
3293
                    tb = next
3383
3294
                    next = next.tb_next
3384
3295
                self.postcode = tb.tb_frame.f_code
3385
 
 
3386
3296
        def report_error(self, test, err):
3387
3297
            pass
3388
 
 
3389
3298
        def report_failure(self, test, err):
3390
3299
            pass
3391
3300
 
3447
3356
                pass
3448
3357
        suite = Stub("test_foo")
3449
3358
        calls = []
3450
 
 
3451
3359
        class MyRunner(tests.TextTestRunner):
3452
3360
            def run(self, test):
3453
3361
                calls.append(test)
3464
3372
        """To be overridden by subclasses that run tests out of process"""
3465
3373
 
3466
3374
    def _run_selftest(self, **kwargs):
3467
 
        bio = BytesIO()
3468
 
        sio = TextIOWrapper(bio, 'utf-8')
3469
 
        self._inject_stream_into_subunit(bio)
 
3375
        sio = StringIO()
 
3376
        self._inject_stream_into_subunit(sio)
3470
3377
        tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3471
 
        sio.flush()
3472
 
        return bio.getvalue()
 
3378
        return sio.getvalue()
3473
3379
 
3474
3380
 
3475
3381
class _ForkedSelftest(_Selftest):
3485
3391
        """
3486
3392
        from subunit import ProtocolTestCase
3487
3393
        _original_init = ProtocolTestCase.__init__
3488
 
 
3489
3394
        def _init_with_passthrough(self, *args, **kwargs):
3490
3395
            _original_init(self, *args, **kwargs)
3491
3396
            self._passthrough = stream
3516
3421
        # together due to the way subunit parses and forwards the streams,
3517
3422
        # so permit extra lines between each part of the error output.
3518
3423
        self.assertContainsRe(out,
3519
 
                              b"Traceback.*:\n"
3520
 
                              b"(?:.*\n)*"
3521
 
                              b".+ in fork_for_tests\n"
3522
 
                              b"(?:.*\n)*"
3523
 
                              b"\\s*workaround_zealous_crypto_random\\(\\)\n"
3524
 
                              b"(?:.*\n)*"
3525
 
                              b"TypeError:")
 
3424
            "Traceback.*:\n"
 
3425
            "(?:.*\n)*"
 
3426
            ".+ in fork_for_tests\n"
 
3427
            "(?:.*\n)*"
 
3428
            "\s*workaround_zealous_crypto_random\(\)\n"
 
3429
            "(?:.*\n)*"
 
3430
            "TypeError:")
3526
3431
 
3527
3432
 
3528
3433
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3531
3436
    class Test(tests.TestCase):
3532
3437
        def test_pass(self):
3533
3438
            pass
3534
 
 
3535
3439
        def test_self_ref(self):
3536
3440
            self.also_self = self.test_self_ref
3537
 
 
3538
3441
        def test_skip(self):
3539
3442
            self.skipTest("Don't need")
3540
3443
 
3553
3456
            gc.disable()
3554
3457
        try:
3555
3458
            output = self._run_selftest(test_suite_factory=self._get_suite,
3556
 
                                        **kwargs)
 
3459
                **kwargs)
3557
3460
        finally:
3558
3461
            if gc_on:
3559
3462
                gc.enable()
3560
3463
            tests.selftest_debug_flags = old_flags
3561
 
        self.assertNotContainsRe(output, b"Uncollected test case.*test_pass")
3562
 
        self.assertContainsRe(output, b"Uncollected test case.*test_self_ref")
 
3464
        self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
 
3465
        self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3563
3466
        return output
3564
3467
 
3565
3468
    def test_testsuite(self):
3567
3470
 
3568
3471
    def test_pattern(self):
3569
3472
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
3570
 
        self.assertNotContainsRe(out, b"test_skip")
 
3473
        self.assertNotContainsRe(out, "test_skip")
3571
3474
 
3572
3475
    def test_exclude_pattern(self):
3573
3476
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
3574
 
        self.assertNotContainsRe(out, b"test_skip")
 
3477
        self.assertNotContainsRe(out, "test_skip")
3575
3478
 
3576
3479
    def test_random_seed(self):
3577
3480
        self._run_selftest_with_suite(random_seed="now")
3578
3481
 
3579
3482
    def test_matching_tests_first(self):
3580
3483
        self._run_selftest_with_suite(matching_tests_first=True,
3581
 
                                      pattern="test_self_ref$")
 
3484
            pattern="test_self_ref$")
3582
3485
 
3583
3486
    def test_starting_with_and_exclude(self):
3584
3487
        out = self._run_selftest_with_suite(starting_with=["bt."],
3585
 
                                            exclude_pattern="test_skip$")
3586
 
        self.assertNotContainsRe(out, b"test_skip")
 
3488
            exclude_pattern="test_skip$")
 
3489
        self.assertNotContainsRe(out, "test_skip")
3587
3490
 
3588
3491
    def test_additonal_decorator(self):
3589
 
        self._run_selftest_with_suite(suite_decorators=[tests.TestDecorator])
 
3492
        out = self._run_selftest_with_suite(
 
3493
            suite_decorators=[tests.TestDecorator])
3590
3494
 
3591
3495
 
3592
3496
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
3595
3499
    _test_needs_features = [features.subunit]
3596
3500
 
3597
3501
    def _run_selftest_with_suite(self, **kwargs):
3598
 
        return TestUncollectedWarnings._run_selftest_with_suite(
3599
 
            self, runner_class=tests.SubUnitBzrRunnerv1, **kwargs)
 
3502
        return TestUncollectedWarnings._run_selftest_with_suite(self,
 
3503
            runner_class=tests.SubUnitBzrRunner, **kwargs)
3600
3504
 
3601
3505
 
3602
3506
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
3609
3513
        self.assertFalse('MYVAR' in os.environ)
3610
3514
        self.overrideEnv('MYVAR', '42')
3611
3515
        # We use an embedded test to make sure we fix the _captureVar bug
3612
 
 
3613
3516
        class Test(tests.TestCase):
3614
3517
            def test_me(self):
3615
3518
                # The first call save the 42 value
3757
3660
            def id(self):
3758
3661
                # We don't need the full class path
3759
3662
                return self._testMethodName
3760
 
 
3761
3663
            def a(self):
3762
3664
                pass
3763
 
 
3764
3665
            def b(self):
3765
3666
                pass
3766
 
 
3767
3667
            def c(self):
3768
3668
                pass
3769
3669
        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3791
3691
 
3792
3692
    def setUp(self):
3793
3693
        super(TestCounterHooks, self).setUp()
3794
 
 
3795
3694
        class Test(tests.TestCase):
3796
3695
 
3797
3696
            def setUp(self):
3798
3697
                super(Test, self).setUp()
3799
3698
                self.hooks = hooks.Hooks()
3800
 
                self.hooks.add_hook('myhook', 'Foo bar blah', (2, 4))
 
3699
                self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
3801
3700
                self.install_counter_hook(self.hooks, 'myhook')
3802
3701
 
3803
3702
            def no_hook(self):