/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_selftest.py

  • Committer: Jelmer Vernooij
  • Date: 2020-01-31 17:43:44 UTC
  • mto: This revision was merged to the branch mainline in revision 7478.
  • Revision ID: jelmer@jelmer.uk-20200131174344-qjhgqm7bdkuqj9sj
Default to running Python 3.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2013, 2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the test framework."""
 
18
 
 
19
import gc
 
20
import doctest
 
21
from functools import reduce
 
22
from io import BytesIO, TextIOWrapper
 
23
import os
 
24
import signal
 
25
import sys
 
26
import threading
 
27
import time
 
28
import unittest
 
29
import warnings
 
30
 
 
31
from testtools import (
 
32
    ExtendedToOriginalDecorator,
 
33
    MultiTestResult,
 
34
    )
 
35
from testtools.content import Content
 
36
from testtools.content_type import ContentType
 
37
from testtools.matchers import (
 
38
    DocTestMatches,
 
39
    Equals,
 
40
    )
 
41
import testtools.testresult.doubles
 
42
 
 
43
import breezy
 
44
from .. import (
 
45
    branchbuilder,
 
46
    controldir,
 
47
    errors,
 
48
    hooks,
 
49
    lockdir,
 
50
    memorytree,
 
51
    osutils,
 
52
    repository,
 
53
    symbol_versioning,
 
54
    tests,
 
55
    transport,
 
56
    workingtree,
 
57
    )
 
58
from ..bzr import (
 
59
    bzrdir,
 
60
    remote,
 
61
    workingtree_3,
 
62
    workingtree_4,
 
63
    )
 
64
from ..bzr import (
 
65
    groupcompress_repo,
 
66
    )
 
67
from ..git import (
 
68
    workingtree as git_workingtree,
 
69
    )
 
70
from ..sixish import (
 
71
    PY3,
 
72
    StringIO,
 
73
    text_type,
 
74
    )
 
75
from ..symbol_versioning import (
 
76
    deprecated_function,
 
77
    deprecated_in,
 
78
    deprecated_method,
 
79
    )
 
80
from . import (
 
81
    features,
 
82
    test_lsprof,
 
83
    test_server,
 
84
    TestUtil,
 
85
    )
 
86
from ..trace import note, mutter
 
87
from ..transport import memory
 
88
 
 
89
 
 
90
def _test_ids(test_suite):
 
91
    """Get the ids for the tests in a test suite."""
 
92
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
 
93
 
 
94
 
 
95
class MetaTestLog(tests.TestCase):
 
96
 
 
97
    def test_logging(self):
 
98
        """Test logs are captured when a test fails."""
 
99
        self.log('a test message')
 
100
        details = self.getDetails()
 
101
        log = details['log']
 
102
        self.assertThat(log.content_type, Equals(ContentType(
 
103
            "text", "plain", {"charset": "utf8"})))
 
104
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
 
105
        self.assertThat(self.get_log(),
 
106
                        DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
 
107
 
 
108
 
 
109
class TestTreeShape(tests.TestCaseInTempDir):
 
110
 
 
111
    def test_unicode_paths(self):
 
112
        self.requireFeature(features.UnicodeFilenameFeature)
 
113
 
 
114
        filename = u'hell\u00d8'
 
115
        self.build_tree_contents([(filename, b'contents of hello')])
 
116
        self.assertPathExists(filename)
 
117
 
 
118
 
 
119
class TestClassesAvailable(tests.TestCase):
 
120
    """As a convenience we expose Test* classes from breezy.tests"""
 
121
 
 
122
    def test_test_case(self):
 
123
        from . import TestCase
 
124
 
 
125
    def test_test_loader(self):
 
126
        from . import TestLoader
 
127
 
 
128
    def test_test_suite(self):
 
129
        from . import TestSuite
 
130
 
 
131
 
 
132
class TestTransportScenarios(tests.TestCase):
 
133
    """A group of tests that test the transport implementation adaption core.
 
134
 
 
135
    This is a meta test that the tests are applied to all available
 
136
    transports.
 
137
 
 
138
    This will be generalised in the future which is why it is in this
 
139
    test file even though it is specific to transport tests at the moment.
 
140
    """
 
141
 
 
142
    def test_get_transport_permutations(self):
 
143
        # this checks that get_test_permutations defined by the module is
 
144
        # called by the get_transport_test_permutations function.
 
145
        class MockModule(object):
 
146
            def get_test_permutations(self):
 
147
                return sample_permutation
 
148
        sample_permutation = [(1, 2), (3, 4)]
 
149
        from .per_transport import get_transport_test_permutations
 
150
        self.assertEqual(sample_permutation,
 
151
                         get_transport_test_permutations(MockModule()))
 
152
 
 
153
    def test_scenarios_include_all_modules(self):
 
154
        # this checks that the scenario generator returns as many permutations
 
155
        # as there are in all the registered transport modules - we assume if
 
156
        # this matches its probably doing the right thing especially in
 
157
        # combination with the tests for setting the right classes below.
 
158
        from .per_transport import transport_test_permutations
 
159
        from ..transport import _get_transport_modules
 
160
        modules = _get_transport_modules()
 
161
        permutation_count = 0
 
162
        for module in modules:
 
163
            try:
 
164
                permutation_count += len(reduce(getattr,
 
165
                                                (module
 
166
                                                 + ".get_test_permutations").split('.')[1:],
 
167
                                                __import__(module))())
 
168
            except errors.DependencyNotPresent:
 
169
                pass
 
170
        scenarios = transport_test_permutations()
 
171
        self.assertEqual(permutation_count, len(scenarios))
 
172
 
 
173
    def test_scenarios_include_transport_class(self):
 
174
        # This test used to know about all the possible transports and the
 
175
        # order they were returned but that seems overly brittle (mbp
 
176
        # 20060307)
 
177
        from .per_transport import transport_test_permutations
 
178
        scenarios = transport_test_permutations()
 
179
        # there are at least that many builtin transports
 
180
        self.assertTrue(len(scenarios) > 6)
 
181
        one_scenario = scenarios[0]
 
182
        self.assertIsInstance(one_scenario[0], str)
 
183
        self.assertTrue(issubclass(one_scenario[1]["transport_class"],
 
184
                                   breezy.transport.Transport))
 
185
        self.assertTrue(issubclass(one_scenario[1]["transport_server"],
 
186
                                   breezy.transport.Server))
 
187
 
 
188
 
 
189
class TestBranchScenarios(tests.TestCase):
 
190
 
 
191
    def test_scenarios(self):
 
192
        # check that constructor parameters are passed through to the adapted
 
193
        # test.
 
194
        from .per_branch import make_scenarios
 
195
        server1 = "a"
 
196
        server2 = "b"
 
197
        formats = [("c", "C"), ("d", "D")]
 
198
        scenarios = make_scenarios(server1, server2, formats)
 
199
        self.assertEqual(2, len(scenarios))
 
200
        self.assertEqual([
 
201
            ('str',
 
202
             {'branch_format': 'c',
 
203
              'bzrdir_format': 'C',
 
204
              'transport_readonly_server': 'b',
 
205
              'transport_server': 'a'}),
 
206
            ('str',
 
207
             {'branch_format': 'd',
 
208
              'bzrdir_format': 'D',
 
209
              'transport_readonly_server': 'b',
 
210
              'transport_server': 'a'})],
 
211
            scenarios)
 
212
 
 
213
 
 
214
class TestBzrDirScenarios(tests.TestCase):
 
215
 
 
216
    def test_scenarios(self):
 
217
        # check that constructor parameters are passed through to the adapted
 
218
        # test.
 
219
        from .per_controldir import make_scenarios
 
220
        vfs_factory = "v"
 
221
        server1 = "a"
 
222
        server2 = "b"
 
223
        formats = ["c", "d"]
 
224
        scenarios = make_scenarios(vfs_factory, server1, server2, formats)
 
225
        self.assertEqual([
 
226
            ('str',
 
227
             {'bzrdir_format': 'c',
 
228
              'transport_readonly_server': 'b',
 
229
              'transport_server': 'a',
 
230
              'vfs_transport_factory': 'v'}),
 
231
            ('str',
 
232
             {'bzrdir_format': 'd',
 
233
              'transport_readonly_server': 'b',
 
234
              'transport_server': 'a',
 
235
              'vfs_transport_factory': 'v'})],
 
236
            scenarios)
 
237
 
 
238
 
 
239
class TestRepositoryScenarios(tests.TestCase):
 
240
 
 
241
    def test_formats_to_scenarios(self):
 
242
        from .per_repository import formats_to_scenarios
 
243
        formats = [("(c)", remote.RemoteRepositoryFormat()),
 
244
                   ("(d)", repository.format_registry.get(
 
245
                    b'Bazaar repository format 2a (needs bzr 1.16 or later)\n'))]
 
246
        no_vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
 
247
                                                None)
 
248
        vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
 
249
                                             vfs_transport_factory="vfs")
 
250
        # no_vfs generate scenarios without vfs_transport_factory
 
251
        expected = [
 
252
            ('RemoteRepositoryFormat(c)',
 
253
             {'bzrdir_format': remote.RemoteBzrDirFormat(),
 
254
              'repository_format': remote.RemoteRepositoryFormat(),
 
255
              'transport_readonly_server': 'readonly',
 
256
              'transport_server': 'server'}),
 
257
            ('RepositoryFormat2a(d)',
 
258
             {'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
 
259
              'repository_format': groupcompress_repo.RepositoryFormat2a(),
 
260
              'transport_readonly_server': 'readonly',
 
261
              'transport_server': 'server'})]
 
262
        self.assertEqual(expected, no_vfs_scenarios)
 
263
        self.assertEqual([
 
264
            ('RemoteRepositoryFormat(c)',
 
265
             {'bzrdir_format': remote.RemoteBzrDirFormat(),
 
266
              'repository_format': remote.RemoteRepositoryFormat(),
 
267
              'transport_readonly_server': 'readonly',
 
268
              'transport_server': 'server',
 
269
              'vfs_transport_factory': 'vfs'}),
 
270
            ('RepositoryFormat2a(d)',
 
271
             {'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
 
272
              'repository_format': groupcompress_repo.RepositoryFormat2a(),
 
273
              'transport_readonly_server': 'readonly',
 
274
              'transport_server': 'server',
 
275
              'vfs_transport_factory': 'vfs'})],
 
276
            vfs_scenarios)
 
277
 
 
278
 
 
279
class TestTestScenarioApplication(tests.TestCase):
 
280
    """Tests for the test adaption facilities."""
 
281
 
 
282
    def test_apply_scenario(self):
 
283
        from breezy.tests import apply_scenario
 
284
        input_test = TestTestScenarioApplication("test_apply_scenario")
 
285
        # setup two adapted tests
 
286
        adapted_test1 = apply_scenario(input_test,
 
287
                                       ("new id",
 
288
                                        {"bzrdir_format": "bzr_format",
 
289
                                         "repository_format": "repo_fmt",
 
290
                                         "transport_server": "transport_server",
 
291
                                         "transport_readonly_server": "readonly-server"}))
 
292
        adapted_test2 = apply_scenario(input_test,
 
293
                                       ("new id 2", {"bzrdir_format": None}))
 
294
        # input_test should have been altered.
 
295
        self.assertRaises(AttributeError, getattr, input_test, "bzrdir_format")
 
296
        # the new tests are mutually incompatible, ensuring it has
 
297
        # made new ones, and unspecified elements in the scenario
 
298
        # should not have been altered.
 
299
        self.assertEqual("bzr_format", adapted_test1.bzrdir_format)
 
300
        self.assertEqual("repo_fmt", adapted_test1.repository_format)
 
301
        self.assertEqual("transport_server", adapted_test1.transport_server)
 
302
        self.assertEqual("readonly-server",
 
303
                         adapted_test1.transport_readonly_server)
 
304
        self.assertEqual(
 
305
            "breezy.tests.test_selftest.TestTestScenarioApplication."
 
306
            "test_apply_scenario(new id)",
 
307
            adapted_test1.id())
 
308
        self.assertEqual(None, adapted_test2.bzrdir_format)
 
309
        self.assertEqual(
 
310
            "breezy.tests.test_selftest.TestTestScenarioApplication."
 
311
            "test_apply_scenario(new id 2)",
 
312
            adapted_test2.id())
 
313
 
 
314
 
 
315
class TestInterRepositoryScenarios(tests.TestCase):
 
316
 
 
317
    def test_scenarios(self):
 
318
        # check that constructor parameters are passed through to the adapted
 
319
        # test.
 
320
        from .per_interrepository import make_scenarios
 
321
        server1 = "a"
 
322
        server2 = "b"
 
323
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
 
324
        scenarios = make_scenarios(server1, server2, formats)
 
325
        self.assertEqual([
 
326
            ('C0,str,str',
 
327
             {'repository_format': 'C1',
 
328
              'repository_format_to': 'C2',
 
329
              'transport_readonly_server': 'b',
 
330
              'transport_server': 'a',
 
331
              'extra_setup': 'C3'}),
 
332
            ('D0,str,str',
 
333
             {'repository_format': 'D1',
 
334
              'repository_format_to': 'D2',
 
335
              'transport_readonly_server': 'b',
 
336
              'transport_server': 'a',
 
337
              'extra_setup': 'D3'})],
 
338
            scenarios)
 
339
 
 
340
 
 
341
class TestWorkingTreeScenarios(tests.TestCase):
 
342
 
 
343
    def test_scenarios(self):
 
344
        # check that constructor parameters are passed through to the adapted
 
345
        # test.
 
346
        from .per_workingtree import make_scenarios
 
347
        server1 = "a"
 
348
        server2 = "b"
 
349
        formats = [workingtree_4.WorkingTreeFormat4(),
 
350
                   workingtree_3.WorkingTreeFormat3(),
 
351
                   workingtree_4.WorkingTreeFormat6()]
 
352
        scenarios = make_scenarios(server1, server2, formats,
 
353
                                   remote_server='c', remote_readonly_server='d',
 
354
                                   remote_backing_server='e')
 
355
        self.assertEqual([
 
356
            ('WorkingTreeFormat4',
 
357
             {'bzrdir_format': formats[0]._matchingcontroldir,
 
358
              'transport_readonly_server': 'b',
 
359
              'transport_server': 'a',
 
360
              'workingtree_format': formats[0]}),
 
361
            ('WorkingTreeFormat3',
 
362
             {'bzrdir_format': formats[1]._matchingcontroldir,
 
363
              'transport_readonly_server': 'b',
 
364
              'transport_server': 'a',
 
365
              'workingtree_format': formats[1]}),
 
366
            ('WorkingTreeFormat6',
 
367
             {'bzrdir_format': formats[2]._matchingcontroldir,
 
368
              'transport_readonly_server': 'b',
 
369
              'transport_server': 'a',
 
370
              'workingtree_format': formats[2]}),
 
371
            ('WorkingTreeFormat6,remote',
 
372
             {'bzrdir_format': formats[2]._matchingcontroldir,
 
373
              'repo_is_remote': True,
 
374
              'transport_readonly_server': 'd',
 
375
              'transport_server': 'c',
 
376
              'vfs_transport_factory': 'e',
 
377
              'workingtree_format': formats[2]}),
 
378
            ], scenarios)
 
379
 
 
380
 
 
381
class TestTreeScenarios(tests.TestCase):
 
382
 
 
383
    def test_scenarios(self):
 
384
        # the tree implementation scenario generator is meant to setup one
 
385
        # instance for each working tree format, one additional instance
 
386
        # that will use the default wt format, but create a revision tree for
 
387
        # the tests, and one more that uses the default wt format as a
 
388
        # lightweight checkout of a remote repository.  This means that the wt
 
389
        # ones should have the workingtree_to_test_tree attribute set to
 
390
        # 'return_parameter' and the revision one set to
 
391
        # revision_tree_from_workingtree.
 
392
 
 
393
        from .per_tree import (
 
394
            _dirstate_tree_from_workingtree,
 
395
            make_scenarios,
 
396
            preview_tree_pre,
 
397
            preview_tree_post,
 
398
            return_parameter,
 
399
            revision_tree_from_workingtree
 
400
            )
 
401
        server1 = "a"
 
402
        server2 = "b"
 
403
        smart_server = test_server.SmartTCPServer_for_testing
 
404
        smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
 
405
        mem_server = memory.MemoryServer
 
406
        formats = [workingtree_4.WorkingTreeFormat4(),
 
407
                   workingtree_3.WorkingTreeFormat3(), ]
 
408
        scenarios = make_scenarios(server1, server2, formats)
 
409
        self.assertEqual(9, len(scenarios))
 
410
        default_wt_format = workingtree.format_registry.get_default()
 
411
        wt4_format = workingtree_4.WorkingTreeFormat4()
 
412
        wt5_format = workingtree_4.WorkingTreeFormat5()
 
413
        wt6_format = workingtree_4.WorkingTreeFormat6()
 
414
        git_wt_format = git_workingtree.GitWorkingTreeFormat()
 
415
        expected_scenarios = [
 
416
            ('WorkingTreeFormat4',
 
417
             {'bzrdir_format': formats[0]._matchingcontroldir,
 
418
              'transport_readonly_server': 'b',
 
419
              'transport_server': 'a',
 
420
              'workingtree_format': formats[0],
 
421
              '_workingtree_to_test_tree': return_parameter,
 
422
              }),
 
423
            ('WorkingTreeFormat3',
 
424
             {'bzrdir_format': formats[1]._matchingcontroldir,
 
425
              'transport_readonly_server': 'b',
 
426
              'transport_server': 'a',
 
427
              'workingtree_format': formats[1],
 
428
              '_workingtree_to_test_tree': return_parameter,
 
429
              }),
 
430
            ('WorkingTreeFormat6,remote',
 
431
             {'bzrdir_format': wt6_format._matchingcontroldir,
 
432
              'repo_is_remote': True,
 
433
              'transport_readonly_server': smart_readonly_server,
 
434
              'transport_server': smart_server,
 
435
              'vfs_transport_factory': mem_server,
 
436
              'workingtree_format': wt6_format,
 
437
              '_workingtree_to_test_tree': return_parameter,
 
438
              }),
 
439
            ('RevisionTree',
 
440
             {'_workingtree_to_test_tree': revision_tree_from_workingtree,
 
441
              'bzrdir_format': default_wt_format._matchingcontroldir,
 
442
              'transport_readonly_server': 'b',
 
443
              'transport_server': 'a',
 
444
              'workingtree_format': default_wt_format,
 
445
              }),
 
446
            ('GitRevisionTree',
 
447
             {'_workingtree_to_test_tree': revision_tree_from_workingtree,
 
448
              'bzrdir_format': git_wt_format._matchingcontroldir,
 
449
              'transport_readonly_server': 'b',
 
450
              'transport_server': 'a',
 
451
              'workingtree_format': git_wt_format,
 
452
              }
 
453
             ),
 
454
            ('DirStateRevisionTree,WT4',
 
455
             {'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
 
456
              'bzrdir_format': wt4_format._matchingcontroldir,
 
457
              'transport_readonly_server': 'b',
 
458
              'transport_server': 'a',
 
459
              'workingtree_format': wt4_format,
 
460
              }),
 
461
            ('DirStateRevisionTree,WT5',
 
462
             {'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
 
463
              'bzrdir_format': wt5_format._matchingcontroldir,
 
464
              'transport_readonly_server': 'b',
 
465
              'transport_server': 'a',
 
466
              'workingtree_format': wt5_format,
 
467
              }),
 
468
            ('PreviewTree',
 
469
             {'_workingtree_to_test_tree': preview_tree_pre,
 
470
              'bzrdir_format': default_wt_format._matchingcontroldir,
 
471
              'transport_readonly_server': 'b',
 
472
              'transport_server': 'a',
 
473
              'workingtree_format': default_wt_format}),
 
474
            ('PreviewTreePost',
 
475
             {'_workingtree_to_test_tree': preview_tree_post,
 
476
              'bzrdir_format': default_wt_format._matchingcontroldir,
 
477
              'transport_readonly_server': 'b',
 
478
              'transport_server': 'a',
 
479
              'workingtree_format': default_wt_format}),
 
480
            ]
 
481
        self.assertEqual(expected_scenarios, scenarios)
 
482
 
 
483
 
 
484
class TestInterTreeScenarios(tests.TestCase):
 
485
    """A group of tests that test the InterTreeTestAdapter."""
 
486
 
 
487
    def test_scenarios(self):
 
488
        # check that constructor parameters are passed through to the adapted
 
489
        # test.
 
490
        # for InterTree tests we want the machinery to bring up two trees in
 
491
        # each instance: the base one, and the one we are interacting with.
 
492
        # because each optimiser can be direction specific, we need to test
 
493
        # each optimiser in its chosen direction.
 
494
        # unlike the TestProviderAdapter we dont want to automatically add a
 
495
        # parameterized one for WorkingTree - the optimisers will tell us what
 
496
        # ones to add.
 
497
        from .per_tree import (
 
498
            return_parameter,
 
499
            )
 
500
        from .per_intertree import (
 
501
            make_scenarios,
 
502
            )
 
503
        from ..bzr.workingtree_3 import WorkingTreeFormat3
 
504
        from ..bzr.workingtree_4 import WorkingTreeFormat4
 
505
        input_test = TestInterTreeScenarios(
 
506
            "test_scenarios")
 
507
        server1 = "a"
 
508
        server2 = "b"
 
509
        format1 = WorkingTreeFormat4()
 
510
        format2 = WorkingTreeFormat3()
 
511
        formats = [("1", str, format1, format2, "converter1"),
 
512
                   ("2", int, format2, format1, "converter2")]
 
513
        scenarios = make_scenarios(server1, server2, formats)
 
514
        self.assertEqual(2, len(scenarios))
 
515
        expected_scenarios = [
 
516
            ("1", {
 
517
                "bzrdir_format": format1._matchingcontroldir,
 
518
                "intertree_class": formats[0][1],
 
519
                "workingtree_format": formats[0][2],
 
520
                "workingtree_format_to": formats[0][3],
 
521
                "mutable_trees_to_test_trees": formats[0][4],
 
522
                "_workingtree_to_test_tree": return_parameter,
 
523
                "transport_server": server1,
 
524
                "transport_readonly_server": server2,
 
525
                }),
 
526
            ("2", {
 
527
                "bzrdir_format": format2._matchingcontroldir,
 
528
                "intertree_class": formats[1][1],
 
529
                "workingtree_format": formats[1][2],
 
530
                "workingtree_format_to": formats[1][3],
 
531
                "mutable_trees_to_test_trees": formats[1][4],
 
532
                "_workingtree_to_test_tree": return_parameter,
 
533
                "transport_server": server1,
 
534
                "transport_readonly_server": server2,
 
535
                }),
 
536
            ]
 
537
        self.assertEqual(scenarios, expected_scenarios)
 
538
 
 
539
 
 
540
class TestTestCaseInTempDir(tests.TestCaseInTempDir):
 
541
 
 
542
    def test_home_is_not_working(self):
 
543
        self.assertNotEqual(self.test_dir, self.test_home_dir)
 
544
        cwd = osutils.getcwd()
 
545
        self.assertIsSameRealPath(self.test_dir, cwd)
 
546
        self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME'])
 
547
 
 
548
    def test_assertEqualStat_equal(self):
 
549
        from .test_dirstate import _FakeStat
 
550
        self.build_tree(["foo"])
 
551
        real = os.lstat("foo")
 
552
        fake = _FakeStat(real.st_size, real.st_mtime, real.st_ctime,
 
553
                         real.st_dev, real.st_ino, real.st_mode)
 
554
        self.assertEqualStat(real, fake)
 
555
 
 
556
    def test_assertEqualStat_notequal(self):
 
557
        self.build_tree(["foo", "longname"])
 
558
        self.assertRaises(AssertionError, self.assertEqualStat,
 
559
                          os.lstat("foo"), os.lstat("longname"))
 
560
 
 
561
    def test_assertPathExists(self):
 
562
        self.assertPathExists('.')
 
563
        self.build_tree(['foo/', 'foo/bar'])
 
564
        self.assertPathExists('foo/bar')
 
565
        self.assertPathDoesNotExist('foo/foo')
 
566
 
 
567
 
 
568
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
 
569
 
 
570
    def test_home_is_non_existant_dir_under_root(self):
 
571
        """The test_home_dir for TestCaseWithMemoryTransport is missing.
 
572
 
 
573
        This is because TestCaseWithMemoryTransport is for tests that do not
 
574
        need any disk resources: they should be hooked into breezy in such a
 
575
        way that no global settings are being changed by the test (only a
 
576
        few tests should need to do that), and having a missing dir as home is
 
577
        an effective way to ensure that this is the case.
 
578
        """
 
579
        self.assertIsSameRealPath(
 
580
            self.TEST_ROOT + "/MemoryTransportMissingHomeDir",
 
581
            self.test_home_dir)
 
582
        self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME'])
 
583
 
 
584
    def test_cwd_is_TEST_ROOT(self):
 
585
        self.assertIsSameRealPath(self.test_dir, self.TEST_ROOT)
 
586
        cwd = osutils.getcwd()
 
587
        self.assertIsSameRealPath(self.test_dir, cwd)
 
588
 
 
589
    def test_BRZ_HOME_and_HOME_are_bytestrings(self):
 
590
        """The $BRZ_HOME and $HOME environment variables should not be unicode.
 
591
 
 
592
        See https://bugs.launchpad.net/bzr/+bug/464174
 
593
        """
 
594
        self.assertIsInstance(os.environ['BRZ_HOME'], str)
 
595
        self.assertIsInstance(os.environ['HOME'], str)
 
596
 
 
597
    def test_make_branch_and_memory_tree(self):
 
598
        """In TestCaseWithMemoryTransport we should not make the branch on disk.
 
599
 
 
600
        This is hard to comprehensively robustly test, so we settle for making
 
601
        a branch and checking no directory was created at its relpath.
 
602
        """
 
603
        tree = self.make_branch_and_memory_tree('dir')
 
604
        # Guard against regression into MemoryTransport leaking
 
605
        # files to disk instead of keeping them in memory.
 
606
        self.assertFalse(osutils.lexists('dir'))
 
607
        self.assertIsInstance(tree, memorytree.MemoryTree)
 
608
 
 
609
    def test_make_branch_and_memory_tree_with_format(self):
 
610
        """make_branch_and_memory_tree should accept a format option."""
 
611
        format = bzrdir.BzrDirMetaFormat1()
 
612
        format.repository_format = repository.format_registry.get_default()
 
613
        tree = self.make_branch_and_memory_tree('dir', format=format)
 
614
        # Guard against regression into MemoryTransport leaking
 
615
        # files to disk instead of keeping them in memory.
 
616
        self.assertFalse(osutils.lexists('dir'))
 
617
        self.assertIsInstance(tree, memorytree.MemoryTree)
 
618
        self.assertEqual(format.repository_format.__class__,
 
619
                         tree.branch.repository._format.__class__)
 
620
 
 
621
    def test_make_branch_builder(self):
 
622
        builder = self.make_branch_builder('dir')
 
623
        self.assertIsInstance(builder, branchbuilder.BranchBuilder)
 
624
        # Guard against regression into MemoryTransport leaking
 
625
        # files to disk instead of keeping them in memory.
 
626
        self.assertFalse(osutils.lexists('dir'))
 
627
 
 
628
    def test_make_branch_builder_with_format(self):
 
629
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
 
630
        # that the format objects are used.
 
631
        format = bzrdir.BzrDirMetaFormat1()
 
632
        repo_format = repository.format_registry.get_default()
 
633
        format.repository_format = repo_format
 
634
        builder = self.make_branch_builder('dir', format=format)
 
635
        the_branch = builder.get_branch()
 
636
        # Guard against regression into MemoryTransport leaking
 
637
        # files to disk instead of keeping them in memory.
 
638
        self.assertFalse(osutils.lexists('dir'))
 
639
        self.assertEqual(format.repository_format.__class__,
 
640
                         the_branch.repository._format.__class__)
 
641
        self.assertEqual(repo_format.get_format_string(),
 
642
                         self.get_transport().get_bytes(
 
643
            'dir/.bzr/repository/format'))
 
644
 
 
645
    def test_make_branch_builder_with_format_name(self):
 
646
        builder = self.make_branch_builder('dir', format='knit')
 
647
        the_branch = builder.get_branch()
 
648
        # Guard against regression into MemoryTransport leaking
 
649
        # files to disk instead of keeping them in memory.
 
650
        self.assertFalse(osutils.lexists('dir'))
 
651
        dir_format = controldir.format_registry.make_controldir('knit')
 
652
        self.assertEqual(dir_format.repository_format.__class__,
 
653
                         the_branch.repository._format.__class__)
 
654
        self.assertEqual(b'Bazaar-NG Knit Repository Format 1',
 
655
                         self.get_transport().get_bytes(
 
656
                             'dir/.bzr/repository/format'))
 
657
 
 
658
    def test_dangling_locks_cause_failures(self):
 
659
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
 
660
            def test_function(self):
 
661
                t = self.get_transport_from_path('.')
 
662
                l = lockdir.LockDir(t, 'lock')
 
663
                l.create()
 
664
                l.attempt_lock()
 
665
        test = TestDanglingLock('test_function')
 
666
        result = test.run()
 
667
        total_failures = result.errors + result.failures
 
668
        if self._lock_check_thorough:
 
669
            self.assertEqual(1, len(total_failures))
 
670
        else:
 
671
            # When _lock_check_thorough is disabled, then we don't trigger a
 
672
            # failure
 
673
            self.assertEqual(0, len(total_failures))
 
674
 
 
675
 
 
676
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
 
677
    """Tests for the convenience functions TestCaseWithTransport introduces."""
 
678
 
 
679
    def test_get_readonly_url_none(self):
 
680
        from ..transport.readonly import ReadonlyTransportDecorator
 
681
        self.vfs_transport_factory = memory.MemoryServer
 
682
        self.transport_readonly_server = None
 
683
        # calling get_readonly_transport() constructs a decorator on the url
 
684
        # for the server
 
685
        url = self.get_readonly_url()
 
686
        url2 = self.get_readonly_url('foo/bar')
 
687
        t = transport.get_transport_from_url(url)
 
688
        t2 = transport.get_transport_from_url(url2)
 
689
        self.assertIsInstance(t, ReadonlyTransportDecorator)
 
690
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
 
691
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
 
692
 
 
693
    def test_get_readonly_url_http(self):
 
694
        from .http_server import HttpServer
 
695
        from ..transport.http import HttpTransport
 
696
        self.transport_server = test_server.LocalURLServer
 
697
        self.transport_readonly_server = HttpServer
 
698
        # calling get_readonly_transport() gives us a HTTP server instance.
 
699
        url = self.get_readonly_url()
 
700
        url2 = self.get_readonly_url('foo/bar')
 
701
        # the transport returned may be any HttpTransportBase subclass
 
702
        t = transport.get_transport_from_url(url)
 
703
        t2 = transport.get_transport_from_url(url2)
 
704
        self.assertIsInstance(t, HttpTransport)
 
705
        self.assertIsInstance(t2, HttpTransport)
 
706
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
 
707
 
 
708
    def test_is_directory(self):
 
709
        """Test assertIsDirectory assertion"""
 
710
        t = self.get_transport()
 
711
        self.build_tree(['a_dir/', 'a_file'], transport=t)
 
712
        self.assertIsDirectory('a_dir', t)
 
713
        self.assertRaises(AssertionError, self.assertIsDirectory, 'a_file', t)
 
714
        self.assertRaises(
 
715
            AssertionError, self.assertIsDirectory, 'not_here', t)
 
716
 
 
717
    def test_make_branch_builder(self):
 
718
        builder = self.make_branch_builder('dir')
 
719
        rev_id = builder.build_commit()
 
720
        self.assertPathExists('dir')
 
721
        a_dir = controldir.ControlDir.open('dir')
 
722
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
 
723
        a_branch = a_dir.open_branch()
 
724
        builder_branch = builder.get_branch()
 
725
        self.assertEqual(a_branch.base, builder_branch.base)
 
726
        self.assertEqual((1, rev_id), builder_branch.last_revision_info())
 
727
        self.assertEqual((1, rev_id), a_branch.last_revision_info())
 
728
 
 
729
 
 
730
class TestTestCaseTransports(tests.TestCaseWithTransport):
 
731
 
 
732
    def setUp(self):
 
733
        super(TestTestCaseTransports, self).setUp()
 
734
        self.vfs_transport_factory = memory.MemoryServer
 
735
 
 
736
    def test_make_controldir_preserves_transport(self):
 
737
        t = self.get_transport()
 
738
        result_bzrdir = self.make_controldir('subdir')
 
739
        self.assertIsInstance(result_bzrdir.transport,
 
740
                              memory.MemoryTransport)
 
741
        # should not be on disk, should only be in memory
 
742
        self.assertPathDoesNotExist('subdir')
 
743
 
 
744
 
 
745
class TestChrootedTest(tests.ChrootedTestCase):
 
746
 
 
747
    def test_root_is_root(self):
 
748
        t = transport.get_transport_from_url(self.get_readonly_url())
 
749
        url = t.base
 
750
        self.assertEqual(url, t.clone('..').base)
 
751
 
 
752
 
 
753
class TestProfileResult(tests.TestCase):
 
754
 
 
755
    def test_profiles_tests(self):
 
756
        self.requireFeature(features.lsprof_feature)
 
757
        terminal = testtools.testresult.doubles.ExtendedTestResult()
 
758
        result = tests.ProfileResult(terminal)
 
759
 
 
760
        class Sample(tests.TestCase):
 
761
            def a(self):
 
762
                self.sample_function()
 
763
 
 
764
            def sample_function(self):
 
765
                pass
 
766
        test = Sample("a")
 
767
        test.run(result)
 
768
        case = terminal._events[0][1]
 
769
        self.assertLength(1, case._benchcalls)
 
770
        # We must be able to unpack it as the test reporting code wants
 
771
        (_, _, _), stats = case._benchcalls[0]
 
772
        self.assertTrue(callable(stats.pprint))
 
773
 
 
774
 
 
775
class TestTestResult(tests.TestCase):
 
776
 
 
777
    def check_timing(self, test_case, expected_re):
 
778
        result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
 
779
        capture = testtools.testresult.doubles.ExtendedTestResult()
 
780
        test_case.run(MultiTestResult(result, capture))
 
781
        run_case = capture._events[0][1]
 
782
        timed_string = result._testTimeString(run_case)
 
783
        self.assertContainsRe(timed_string, expected_re)
 
784
 
 
785
    def test_test_reporting(self):
 
786
        class ShortDelayTestCase(tests.TestCase):
 
787
            def test_short_delay(self):
 
788
                time.sleep(0.003)
 
789
 
 
790
            def test_short_benchmark(self):
 
791
                self.time(time.sleep, 0.003)
 
792
        self.check_timing(ShortDelayTestCase('test_short_delay'),
 
793
                          r"^ +[0-9]+ms$")
 
794
        # if a benchmark time is given, we now show just that time followed by
 
795
        # a star
 
796
        self.check_timing(ShortDelayTestCase('test_short_benchmark'),
 
797
                          r"^ +[0-9]+ms\*$")
 
798
 
 
799
    def test_unittest_reporting_unittest_class(self):
 
800
        # getting the time from a non-breezy test works ok
 
801
        class ShortDelayTestCase(unittest.TestCase):
 
802
            def test_short_delay(self):
 
803
                time.sleep(0.003)
 
804
        self.check_timing(ShortDelayTestCase('test_short_delay'),
 
805
                          r"^ +[0-9]+ms$")
 
806
 
 
807
    def _time_hello_world_encoding(self):
 
808
        """Profile two sleep calls
 
809
 
 
810
        This is used to exercise the test framework.
 
811
        """
 
812
        self.time(text_type, b'hello', errors='replace')
 
813
        self.time(text_type, b'world', errors='replace')
 
814
 
 
815
    def test_lsprofiling(self):
 
816
        """Verbose test result prints lsprof statistics from test cases."""
 
817
        self.requireFeature(features.lsprof_feature)
 
818
        result_stream = StringIO()
 
819
        result = breezy.tests.VerboseTestResult(
 
820
            result_stream,
 
821
            descriptions=0,
 
822
            verbosity=2,
 
823
            )
 
824
        # we want profile a call of some sort and check it is output by
 
825
        # addSuccess. We dont care about addError or addFailure as they
 
826
        # are not that interesting for performance tuning.
 
827
        # make a new test instance that when run will generate a profile
 
828
        example_test_case = TestTestResult("_time_hello_world_encoding")
 
829
        example_test_case._gather_lsprof_in_benchmarks = True
 
830
        # execute the test, which should succeed and record profiles
 
831
        example_test_case.run(result)
 
832
        # lsprofile_something()
 
833
        # if this worked we want
 
834
        # LSProf output for <built in function unicode> (['hello'], {'errors': 'replace'})
 
835
        #    CallCount    Recursive    Total(ms)   Inline(ms) module:lineno(function)
 
836
        # (the lsprof header)
 
837
        # ... an arbitrary number of lines
 
838
        # and the function call which is time.sleep.
 
839
        #           1        0            ???         ???       ???(sleep)
 
840
        # and then repeated but with 'world', rather than 'hello'.
 
841
        # this should appear in the output stream of our test result.
 
842
        output = result_stream.getvalue()
 
843
        if PY3:
 
844
            self.assertContainsRe(output,
 
845
                                  r"LSProf output for <class 'str'>\(\(b'hello',\), {'errors': 'replace'}\)")
 
846
            self.assertContainsRe(output,
 
847
                                  r"LSProf output for <class 'str'>\(\(b'world',\), {'errors': 'replace'}\)")
 
848
        else:
 
849
            self.assertContainsRe(output,
 
850
                                  r"LSProf output for <type 'unicode'>\(\('hello',\), {'errors': 'replace'}\)")
 
851
            self.assertContainsRe(output,
 
852
                                  r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
 
853
        self.assertContainsRe(output,
 
854
                              r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n")
 
855
        self.assertContainsRe(output,
 
856
                              r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?")
 
857
 
 
858
    def test_uses_time_from_testtools(self):
 
859
        """Test case timings in verbose results should use testtools times"""
 
860
        import datetime
 
861
 
 
862
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
 
863
            def startTest(self, test):
 
864
                self.time(datetime.datetime.utcfromtimestamp(1.145))
 
865
                super(TimeAddedVerboseTestResult, self).startTest(test)
 
866
 
 
867
            def addSuccess(self, test):
 
868
                self.time(datetime.datetime.utcfromtimestamp(51.147))
 
869
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
 
870
 
 
871
            def report_tests_starting(self): pass
 
872
        sio = StringIO()
 
873
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
 
874
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
 
875
 
 
876
    def test_known_failure(self):
 
877
        """Using knownFailure should trigger several result actions."""
 
878
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
879
            def stopTestRun(self): pass
 
880
 
 
881
            def report_tests_starting(self): pass
 
882
 
 
883
            def report_known_failure(self, test, err=None, details=None):
 
884
                self._call = test, 'known failure'
 
885
        result = InstrumentedTestResult(None, None, None, None)
 
886
 
 
887
        class Test(tests.TestCase):
 
888
            def test_function(self):
 
889
                self.knownFailure('failed!')
 
890
        test = Test("test_function")
 
891
        test.run(result)
 
892
        # it should invoke 'report_known_failure'.
 
893
        self.assertEqual(2, len(result._call))
 
894
        self.assertEqual(test.id(), result._call[0].id())
 
895
        self.assertEqual('known failure', result._call[1])
 
896
        # we dont introspec the traceback, if the rest is ok, it would be
 
897
        # exceptional for it not to be.
 
898
        # it should update the known_failure_count on the object.
 
899
        self.assertEqual(1, result.known_failure_count)
 
900
        # the result should be successful.
 
901
        self.assertTrue(result.wasSuccessful())
 
902
 
 
903
    def test_verbose_report_known_failure(self):
 
904
        # verbose test output formatting
 
905
        result_stream = StringIO()
 
906
        result = breezy.tests.VerboseTestResult(
 
907
            result_stream,
 
908
            descriptions=0,
 
909
            verbosity=2,
 
910
            )
 
911
        _get_test("test_xfail").run(result)
 
912
        self.assertContainsRe(result_stream.getvalue(),
 
913
                              "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
 
914
                              "\\s*(?:Text attachment: )?reason"
 
915
                              "(?:\n-+\n|: {{{)"
 
916
                              "this_fails"
 
917
                              "(?:\n-+\n|}}}\n)")
 
918
 
 
919
    def get_passing_test(self):
 
920
        """Return a test object that can't be run usefully."""
 
921
        def passing_test():
 
922
            pass
 
923
        return unittest.FunctionTestCase(passing_test)
 
924
 
 
925
    def test_add_not_supported(self):
 
926
        """Test the behaviour of invoking addNotSupported."""
 
927
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
928
            def stopTestRun(self): pass
 
929
 
 
930
            def report_tests_starting(self): pass
 
931
 
 
932
            def report_unsupported(self, test, feature):
 
933
                self._call = test, feature
 
934
        result = InstrumentedTestResult(None, None, None, None)
 
935
        test = SampleTestCase('_test_pass')
 
936
        feature = features.Feature()
 
937
        result.startTest(test)
 
938
        result.addNotSupported(test, feature)
 
939
        # it should invoke 'report_unsupported'.
 
940
        self.assertEqual(2, len(result._call))
 
941
        self.assertEqual(test, result._call[0])
 
942
        self.assertEqual(feature, result._call[1])
 
943
        # the result should be successful.
 
944
        self.assertTrue(result.wasSuccessful())
 
945
        # it should record the test against a count of tests not run due to
 
946
        # this feature.
 
947
        self.assertEqual(1, result.unsupported['Feature'])
 
948
        # and invoking it again should increment that counter
 
949
        result.addNotSupported(test, feature)
 
950
        self.assertEqual(2, result.unsupported['Feature'])
 
951
 
 
952
    def test_verbose_report_unsupported(self):
 
953
        # verbose test output formatting
 
954
        result_stream = StringIO()
 
955
        result = breezy.tests.VerboseTestResult(
 
956
            result_stream,
 
957
            descriptions=0,
 
958
            verbosity=2,
 
959
            )
 
960
        test = self.get_passing_test()
 
961
        feature = features.Feature()
 
962
        result.startTest(test)
 
963
        prefix = len(result_stream.getvalue())
 
964
        result.report_unsupported(test, feature)
 
965
        output = result_stream.getvalue()[prefix:]
 
966
        lines = output.splitlines()
 
967
        # We don't check for the final '0ms' since it may fail on slow hosts
 
968
        self.assertStartsWith(lines[0], 'NODEP')
 
969
        self.assertEqual(lines[1],
 
970
                         "    The feature 'Feature' is not available.")
 
971
 
 
972
    def test_unavailable_exception(self):
 
973
        """An UnavailableFeature being raised should invoke addNotSupported."""
 
974
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
975
            def stopTestRun(self):
 
976
                pass
 
977
 
 
978
            def report_tests_starting(self):
 
979
                pass
 
980
 
 
981
            def addNotSupported(self, test, feature):
 
982
                self._call = test, feature
 
983
        result = InstrumentedTestResult(None, None, None, None)
 
984
        feature = features.Feature()
 
985
 
 
986
        class Test(tests.TestCase):
 
987
            def test_function(self):
 
988
                raise tests.UnavailableFeature(feature)
 
989
        test = Test("test_function")
 
990
        test.run(result)
 
991
        # it should invoke 'addNotSupported'.
 
992
        self.assertEqual(2, len(result._call))
 
993
        self.assertEqual(test.id(), result._call[0].id())
 
994
        self.assertEqual(feature, result._call[1])
 
995
        # and not count as an error
 
996
        self.assertEqual(0, result.error_count)
 
997
 
 
998
    def test_strict_with_unsupported_feature(self):
 
999
        result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
 
1000
        test = self.get_passing_test()
 
1001
        feature = "Unsupported Feature"
 
1002
        result.addNotSupported(test, feature)
 
1003
        self.assertFalse(result.wasStrictlySuccessful())
 
1004
        self.assertEqual(None, result._extractBenchmarkTime(test))
 
1005
 
 
1006
    def test_strict_with_known_failure(self):
 
1007
        result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
 
1008
        test = _get_test("test_xfail")
 
1009
        test.run(result)
 
1010
        self.assertFalse(result.wasStrictlySuccessful())
 
1011
        self.assertEqual(None, result._extractBenchmarkTime(test))
 
1012
 
 
1013
    def test_strict_with_success(self):
 
1014
        result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
 
1015
        test = self.get_passing_test()
 
1016
        result.addSuccess(test)
 
1017
        self.assertTrue(result.wasStrictlySuccessful())
 
1018
        self.assertEqual(None, result._extractBenchmarkTime(test))
 
1019
 
 
1020
    def test_startTests(self):
 
1021
        """Starting the first test should trigger startTests."""
 
1022
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
1023
            calls = 0
 
1024
 
 
1025
            def startTests(self):
 
1026
                self.calls += 1
 
1027
        result = InstrumentedTestResult(None, None, None, None)
 
1028
 
 
1029
        def test_function():
 
1030
            pass
 
1031
        test = unittest.FunctionTestCase(test_function)
 
1032
        test.run(result)
 
1033
        self.assertEqual(1, result.calls)
 
1034
 
 
1035
    def test_startTests_only_once(self):
 
1036
        """With multiple tests startTests should still only be called once"""
 
1037
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
1038
            calls = 0
 
1039
 
 
1040
            def startTests(self):
 
1041
                self.calls += 1
 
1042
        result = InstrumentedTestResult(None, None, None, None)
 
1043
        suite = unittest.TestSuite([
 
1044
            unittest.FunctionTestCase(lambda: None),
 
1045
            unittest.FunctionTestCase(lambda: None)])
 
1046
        suite.run(result)
 
1047
        self.assertEqual(1, result.calls)
 
1048
        self.assertEqual(2, result.count)
 
1049
 
 
1050
 
 
1051
class TestRunner(tests.TestCase):
 
1052
 
 
1053
    def dummy_test(self):
 
1054
        pass
 
1055
 
 
1056
    def run_test_runner(self, testrunner, test):
 
1057
        """Run suite in testrunner, saving global state and restoring it.
 
1058
 
 
1059
        This current saves and restores:
 
1060
        TestCaseInTempDir.TEST_ROOT
 
1061
 
 
1062
        There should be no tests in this file that use
 
1063
        breezy.tests.TextTestRunner without using this convenience method,
 
1064
        because of our use of global state.
 
1065
        """
 
1066
        old_root = tests.TestCaseInTempDir.TEST_ROOT
 
1067
        try:
 
1068
            tests.TestCaseInTempDir.TEST_ROOT = None
 
1069
            return testrunner.run(test)
 
1070
        finally:
 
1071
            tests.TestCaseInTempDir.TEST_ROOT = old_root
 
1072
 
 
1073
    def test_known_failure_failed_run(self):
 
1074
        # run a test that generates a known failure which should be printed in
 
1075
        # the final output when real failures occur.
 
1076
        class Test(tests.TestCase):
 
1077
            def known_failure_test(self):
 
1078
                self.expectFailure('failed', self.assertTrue, False)
 
1079
        test = unittest.TestSuite()
 
1080
        test.addTest(Test("known_failure_test"))
 
1081
 
 
1082
        def failing_test():
 
1083
            raise AssertionError('foo')
 
1084
        test.addTest(unittest.FunctionTestCase(failing_test))
 
1085
        stream = StringIO()
 
1086
        runner = tests.TextTestRunner(stream=stream)
 
1087
        self.run_test_runner(runner, test)
 
1088
        self.assertContainsRe(
 
1089
            stream.getvalue(),
 
1090
            '(?sm)^brz selftest.*$'
 
1091
            '.*'
 
1092
            '^======================================================================\n'
 
1093
            '^FAIL: failing_test\n'
 
1094
            '^----------------------------------------------------------------------\n'
 
1095
            'Traceback \\(most recent call last\\):\n'
 
1096
            '  .*'  # File .*, line .*, in failing_test' - but maybe not from .pyc
 
1097
            '    raise AssertionError\\(\'foo\'\\)\n'
 
1098
            '.*'
 
1099
            '^----------------------------------------------------------------------\n'
 
1100
            '.*'
 
1101
            'FAILED \\(failures=1, known_failure_count=1\\)'
 
1102
            )
 
1103
 
 
1104
    def test_known_failure_ok_run(self):
 
1105
        # run a test that generates a known failure which should be printed in
 
1106
        # the final output.
 
1107
        class Test(tests.TestCase):
 
1108
            def known_failure_test(self):
 
1109
                self.knownFailure("Never works...")
 
1110
        test = Test("known_failure_test")
 
1111
        stream = StringIO()
 
1112
        runner = tests.TextTestRunner(stream=stream)
 
1113
        self.run_test_runner(runner, test)
 
1114
        self.assertContainsRe(stream.getvalue(),
 
1115
                              '\n'
 
1116
                              '-*\n'
 
1117
                              'Ran 1 test in .*\n'
 
1118
                              '\n'
 
1119
                              'OK \\(known_failures=1\\)\n')
 
1120
 
 
1121
    def test_unexpected_success_bad(self):
 
1122
        class Test(tests.TestCase):
 
1123
            def test_truth(self):
 
1124
                self.expectFailure("No absolute truth", self.assertTrue, True)
 
1125
        runner = tests.TextTestRunner(stream=StringIO())
 
1126
        self.run_test_runner(runner, Test("test_truth"))
 
1127
        self.assertContainsRe(runner.stream.getvalue(),
 
1128
                              "=+\n"
 
1129
                              "FAIL: \\S+\\.test_truth\n"
 
1130
                              "-+\n"
 
1131
                              "(?:.*\n)*"
 
1132
                              "\\s*(?:Text attachment: )?reason"
 
1133
                              "(?:\n-+\n|: {{{)"
 
1134
                              "No absolute truth"
 
1135
                              "(?:\n-+\n|}}}\n)"
 
1136
                              "(?:.*\n)*"
 
1137
                              "-+\n"
 
1138
                              "Ran 1 test in .*\n"
 
1139
                              "\n"
 
1140
                              "FAILED \\(failures=1\\)\n\\Z")
 
1141
 
 
1142
    def test_result_decorator(self):
 
1143
        # decorate results
 
1144
        calls = []
 
1145
 
 
1146
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1147
            def startTest(self, test):
 
1148
                ExtendedToOriginalDecorator.startTest(self, test)
 
1149
                calls.append('start')
 
1150
        test = unittest.FunctionTestCase(lambda: None)
 
1151
        stream = StringIO()
 
1152
        runner = tests.TextTestRunner(stream=stream,
 
1153
                                      result_decorators=[LoggingDecorator])
 
1154
        self.run_test_runner(runner, test)
 
1155
        self.assertLength(1, calls)
 
1156
 
 
1157
    def test_skipped_test(self):
 
1158
        # run a test that is skipped, and check the suite as a whole still
 
1159
        # succeeds.
 
1160
        # skipping_test must be hidden in here so it's not run as a real test
 
1161
        class SkippingTest(tests.TestCase):
 
1162
            def skipping_test(self):
 
1163
                raise tests.TestSkipped('test intentionally skipped')
 
1164
        runner = tests.TextTestRunner(stream=StringIO())
 
1165
        test = SkippingTest("skipping_test")
 
1166
        result = self.run_test_runner(runner, test)
 
1167
        self.assertTrue(result.wasSuccessful())
 
1168
 
 
1169
    def test_skipped_from_setup(self):
 
1170
        calls = []
 
1171
 
 
1172
        class SkippedSetupTest(tests.TestCase):
 
1173
 
 
1174
            def setUp(self):
 
1175
                calls.append('setUp')
 
1176
                self.addCleanup(self.cleanup)
 
1177
                raise tests.TestSkipped('skipped setup')
 
1178
 
 
1179
            def test_skip(self):
 
1180
                self.fail('test reached')
 
1181
 
 
1182
            def cleanup(self):
 
1183
                calls.append('cleanup')
 
1184
 
 
1185
        runner = tests.TextTestRunner(stream=StringIO())
 
1186
        test = SkippedSetupTest('test_skip')
 
1187
        result = self.run_test_runner(runner, test)
 
1188
        self.assertTrue(result.wasSuccessful())
 
1189
        # Check if cleanup was called the right number of times.
 
1190
        self.assertEqual(['setUp', 'cleanup'], calls)
 
1191
 
 
1192
    def test_skipped_from_test(self):
 
1193
        calls = []
 
1194
 
 
1195
        class SkippedTest(tests.TestCase):
 
1196
 
 
1197
            def setUp(self):
 
1198
                super(SkippedTest, self).setUp()
 
1199
                calls.append('setUp')
 
1200
                self.addCleanup(self.cleanup)
 
1201
 
 
1202
            def test_skip(self):
 
1203
                raise tests.TestSkipped('skipped test')
 
1204
 
 
1205
            def cleanup(self):
 
1206
                calls.append('cleanup')
 
1207
 
 
1208
        runner = tests.TextTestRunner(stream=StringIO())
 
1209
        test = SkippedTest('test_skip')
 
1210
        result = self.run_test_runner(runner, test)
 
1211
        self.assertTrue(result.wasSuccessful())
 
1212
        # Check if cleanup was called the right number of times.
 
1213
        self.assertEqual(['setUp', 'cleanup'], calls)
 
1214
 
 
1215
    def test_not_applicable(self):
 
1216
        # run a test that is skipped because it's not applicable
 
1217
        class Test(tests.TestCase):
 
1218
            def not_applicable_test(self):
 
1219
                raise tests.TestNotApplicable('this test never runs')
 
1220
        out = StringIO()
 
1221
        runner = tests.TextTestRunner(stream=out, verbosity=2)
 
1222
        test = Test("not_applicable_test")
 
1223
        result = self.run_test_runner(runner, test)
 
1224
        self.log(out.getvalue())
 
1225
        self.assertTrue(result.wasSuccessful())
 
1226
        self.assertTrue(result.wasStrictlySuccessful())
 
1227
        self.assertContainsRe(out.getvalue(),
 
1228
                              r'(?m)not_applicable_test  * N/A')
 
1229
        self.assertContainsRe(out.getvalue(),
 
1230
                              r'(?m)^    this test never runs')
 
1231
 
 
1232
    def test_unsupported_features_listed(self):
 
1233
        """When unsupported features are encountered they are detailed."""
 
1234
        class Feature1(features.Feature):
 
1235
            def _probe(self):
 
1236
                return False
 
1237
 
 
1238
        class Feature2(features.Feature):
 
1239
            def _probe(self):
 
1240
                return False
 
1241
        # create sample tests
 
1242
        test1 = SampleTestCase('_test_pass')
 
1243
        test1._test_needs_features = [Feature1()]
 
1244
        test2 = SampleTestCase('_test_pass')
 
1245
        test2._test_needs_features = [Feature2()]
 
1246
        test = unittest.TestSuite()
 
1247
        test.addTest(test1)
 
1248
        test.addTest(test2)
 
1249
        stream = StringIO()
 
1250
        runner = tests.TextTestRunner(stream=stream)
 
1251
        self.run_test_runner(runner, test)
 
1252
        lines = stream.getvalue().splitlines()
 
1253
        self.assertEqual([
 
1254
            'OK',
 
1255
            "Missing feature 'Feature1' skipped 1 tests.",
 
1256
            "Missing feature 'Feature2' skipped 1 tests.",
 
1257
            ],
 
1258
            lines[-3:])
 
1259
 
 
1260
    def test_verbose_test_count(self):
 
1261
        """A verbose test run reports the right test count at the start"""
 
1262
        suite = TestUtil.TestSuite([
 
1263
            unittest.FunctionTestCase(lambda:None),
 
1264
            unittest.FunctionTestCase(lambda:None)])
 
1265
        self.assertEqual(suite.countTestCases(), 2)
 
1266
        stream = StringIO()
 
1267
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1268
        # Need to use the CountingDecorator as that's what sets num_tests
 
1269
        self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1270
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
 
1271
 
 
1272
    def test_startTestRun(self):
 
1273
        """run should call result.startTestRun()"""
 
1274
        calls = []
 
1275
 
 
1276
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1277
            def startTestRun(self):
 
1278
                ExtendedToOriginalDecorator.startTestRun(self)
 
1279
                calls.append('startTestRun')
 
1280
        test = unittest.FunctionTestCase(lambda: None)
 
1281
        stream = StringIO()
 
1282
        runner = tests.TextTestRunner(stream=stream,
 
1283
                                      result_decorators=[LoggingDecorator])
 
1284
        self.run_test_runner(runner, test)
 
1285
        self.assertLength(1, calls)
 
1286
 
 
1287
    def test_stopTestRun(self):
 
1288
        """run should call result.stopTestRun()"""
 
1289
        calls = []
 
1290
 
 
1291
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1292
            def stopTestRun(self):
 
1293
                ExtendedToOriginalDecorator.stopTestRun(self)
 
1294
                calls.append('stopTestRun')
 
1295
        test = unittest.FunctionTestCase(lambda: None)
 
1296
        stream = StringIO()
 
1297
        runner = tests.TextTestRunner(stream=stream,
 
1298
                                      result_decorators=[LoggingDecorator])
 
1299
        self.run_test_runner(runner, test)
 
1300
        self.assertLength(1, calls)
 
1301
 
 
1302
    def test_unicode_test_output_on_ascii_stream(self):
 
1303
        """Showing results should always succeed even on an ascii console"""
 
1304
        class FailureWithUnicode(tests.TestCase):
 
1305
            def test_log_unicode(self):
 
1306
                self.log(u"\u2606")
 
1307
                self.fail("Now print that log!")
 
1308
        if PY3:
 
1309
            bio = BytesIO()
 
1310
            out = TextIOWrapper(bio, 'ascii', 'backslashreplace')
 
1311
        else:
 
1312
            bio = out = StringIO()
 
1313
        self.overrideAttr(osutils, "get_terminal_encoding",
 
1314
                          lambda trace=False: "ascii")
 
1315
        self.run_test_runner(
 
1316
            tests.TextTestRunner(stream=out),
 
1317
            FailureWithUnicode("test_log_unicode"))
 
1318
        out.flush()
 
1319
        self.assertContainsRe(bio.getvalue(),
 
1320
                              b"(?:Text attachment: )?log"
 
1321
                              b"(?:\n-+\n|: {{{)"
 
1322
                              b"\\d+\\.\\d+  \\\\u2606"
 
1323
                              b"(?:\n-+\n|}}}\n)")
 
1324
 
 
1325
 
 
1326
class SampleTestCase(tests.TestCase):
 
1327
 
 
1328
    def _test_pass(self):
 
1329
        pass
 
1330
 
 
1331
 
 
1332
class _TestException(Exception):
 
1333
    pass
 
1334
 
 
1335
 
 
1336
class TestTestCase(tests.TestCase):
 
1337
    """Tests that test the core breezy TestCase."""
 
1338
 
 
1339
    def test_assertLength_matches_empty(self):
 
1340
        a_list = []
 
1341
        self.assertLength(0, a_list)
 
1342
 
 
1343
    def test_assertLength_matches_nonempty(self):
 
1344
        a_list = [1, 2, 3]
 
1345
        self.assertLength(3, a_list)
 
1346
 
 
1347
    def test_assertLength_fails_different(self):
 
1348
        a_list = []
 
1349
        self.assertRaises(AssertionError, self.assertLength, 1, a_list)
 
1350
 
 
1351
    def test_assertLength_shows_sequence_in_failure(self):
 
1352
        a_list = [1, 2, 3]
 
1353
        exception = self.assertRaises(AssertionError, self.assertLength, 2,
 
1354
                                      a_list)
 
1355
        self.assertEqual('Incorrect length: wanted 2, got 3 for [1, 2, 3]',
 
1356
                         exception.args[0])
 
1357
 
 
1358
    def test_base_setUp_not_called_causes_failure(self):
 
1359
        class TestCaseWithBrokenSetUp(tests.TestCase):
 
1360
            def setUp(self):
 
1361
                pass  # does not call TestCase.setUp
 
1362
 
 
1363
            def test_foo(self):
 
1364
                pass
 
1365
        test = TestCaseWithBrokenSetUp('test_foo')
 
1366
        result = unittest.TestResult()
 
1367
        test.run(result)
 
1368
        self.assertFalse(result.wasSuccessful())
 
1369
        self.assertEqual(1, result.testsRun)
 
1370
 
 
1371
    def test_base_tearDown_not_called_causes_failure(self):
 
1372
        class TestCaseWithBrokenTearDown(tests.TestCase):
 
1373
            def tearDown(self):
 
1374
                pass  # does not call TestCase.tearDown
 
1375
 
 
1376
            def test_foo(self):
 
1377
                pass
 
1378
        test = TestCaseWithBrokenTearDown('test_foo')
 
1379
        result = unittest.TestResult()
 
1380
        test.run(result)
 
1381
        self.assertFalse(result.wasSuccessful())
 
1382
        self.assertEqual(1, result.testsRun)
 
1383
 
 
1384
    def test_debug_flags_sanitised(self):
 
1385
        """The breezy debug flags should be sanitised by setUp."""
 
1386
        if 'allow_debug' in tests.selftest_debug_flags:
 
1387
            raise tests.TestNotApplicable(
 
1388
                '-Eallow_debug option prevents debug flag sanitisation')
 
1389
        # we could set something and run a test that will check
 
1390
        # it gets santised, but this is probably sufficient for now:
 
1391
        # if someone runs the test with -Dsomething it will error.
 
1392
        flags = set()
 
1393
        if self._lock_check_thorough:
 
1394
            flags.add('strict_locks')
 
1395
        self.assertEqual(flags, breezy.debug.debug_flags)
 
1396
 
 
1397
    def change_selftest_debug_flags(self, new_flags):
 
1398
        self.overrideAttr(tests, 'selftest_debug_flags', set(new_flags))
 
1399
 
 
1400
    def test_allow_debug_flag(self):
 
1401
        """The -Eallow_debug flag prevents breezy.debug.debug_flags from being
 
1402
        sanitised (i.e. cleared) before running a test.
 
1403
        """
 
1404
        self.change_selftest_debug_flags({'allow_debug'})
 
1405
        breezy.debug.debug_flags = {'a-flag'}
 
1406
 
 
1407
        class TestThatRecordsFlags(tests.TestCase):
 
1408
            def test_foo(nested_self):
 
1409
                self.flags = set(breezy.debug.debug_flags)
 
1410
        test = TestThatRecordsFlags('test_foo')
 
1411
        test.run(self.make_test_result())
 
1412
        flags = {'a-flag'}
 
1413
        if 'disable_lock_checks' not in tests.selftest_debug_flags:
 
1414
            flags.add('strict_locks')
 
1415
        self.assertEqual(flags, self.flags)
 
1416
 
 
1417
    def test_disable_lock_checks(self):
 
1418
        """The -Edisable_lock_checks flag disables thorough checks."""
 
1419
        class TestThatRecordsFlags(tests.TestCase):
 
1420
            def test_foo(nested_self):
 
1421
                self.flags = set(breezy.debug.debug_flags)
 
1422
                self.test_lock_check_thorough = nested_self._lock_check_thorough
 
1423
        self.change_selftest_debug_flags(set())
 
1424
        test = TestThatRecordsFlags('test_foo')
 
1425
        test.run(self.make_test_result())
 
1426
        # By default we do strict lock checking and thorough lock/unlock
 
1427
        # tracking.
 
1428
        self.assertTrue(self.test_lock_check_thorough)
 
1429
        self.assertEqual({'strict_locks'}, self.flags)
 
1430
        # Now set the disable_lock_checks flag, and show that this changed.
 
1431
        self.change_selftest_debug_flags({'disable_lock_checks'})
 
1432
        test = TestThatRecordsFlags('test_foo')
 
1433
        test.run(self.make_test_result())
 
1434
        self.assertFalse(self.test_lock_check_thorough)
 
1435
        self.assertEqual(set(), self.flags)
 
1436
 
 
1437
    def test_this_fails_strict_lock_check(self):
 
1438
        class TestThatRecordsFlags(tests.TestCase):
 
1439
            def test_foo(nested_self):
 
1440
                self.flags1 = set(breezy.debug.debug_flags)
 
1441
                self.thisFailsStrictLockCheck()
 
1442
                self.flags2 = set(breezy.debug.debug_flags)
 
1443
        # Make sure lock checking is active
 
1444
        self.change_selftest_debug_flags(set())
 
1445
        test = TestThatRecordsFlags('test_foo')
 
1446
        test.run(self.make_test_result())
 
1447
        self.assertEqual({'strict_locks'}, self.flags1)
 
1448
        self.assertEqual(set(), self.flags2)
 
1449
 
 
1450
    def test_debug_flags_restored(self):
 
1451
        """The breezy debug flags should be restored to their original state
 
1452
        after the test was run, even if allow_debug is set.
 
1453
        """
 
1454
        self.change_selftest_debug_flags({'allow_debug'})
 
1455
        # Now run a test that modifies debug.debug_flags.
 
1456
        breezy.debug.debug_flags = {'original-state'}
 
1457
 
 
1458
        class TestThatModifiesFlags(tests.TestCase):
 
1459
            def test_foo(self):
 
1460
                breezy.debug.debug_flags = {'modified'}
 
1461
        test = TestThatModifiesFlags('test_foo')
 
1462
        test.run(self.make_test_result())
 
1463
        self.assertEqual({'original-state'}, breezy.debug.debug_flags)
 
1464
 
 
1465
    def make_test_result(self):
 
1466
        """Get a test result that writes to a StringIO."""
 
1467
        return tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
 
1468
 
 
1469
    def inner_test(self):
 
1470
        # the inner child test
 
1471
        note("inner_test")
 
1472
 
 
1473
    def outer_child(self):
 
1474
        # the outer child test
 
1475
        note("outer_start")
 
1476
        self.inner_test = TestTestCase("inner_child")
 
1477
        result = self.make_test_result()
 
1478
        self.inner_test.run(result)
 
1479
        note("outer finish")
 
1480
        self.addCleanup(osutils.delete_any, self._log_file_name)
 
1481
 
 
1482
    def test_trace_nesting(self):
 
1483
        # this tests that each test case nests its trace facility correctly.
 
1484
        # we do this by running a test case manually. That test case (A)
 
1485
        # should setup a new log, log content to it, setup a child case (B),
 
1486
        # which should log independently, then case (A) should log a trailer
 
1487
        # and return.
 
1488
        # we do two nested children so that we can verify the state of the
 
1489
        # logs after the outer child finishes is correct, which a bad clean
 
1490
        # up routine in tearDown might trigger a fault in our test with only
 
1491
        # one child, we should instead see the bad result inside our test with
 
1492
        # the two children.
 
1493
        # the outer child test
 
1494
        original_trace = breezy.trace._trace_file
 
1495
        outer_test = TestTestCase("outer_child")
 
1496
        result = self.make_test_result()
 
1497
        outer_test.run(result)
 
1498
        self.assertEqual(original_trace, breezy.trace._trace_file)
 
1499
 
 
1500
    def method_that_times_a_bit_twice(self):
 
1501
        # call self.time twice to ensure it aggregates
 
1502
        self.time(time.sleep, 0.007)
 
1503
        self.time(time.sleep, 0.007)
 
1504
 
 
1505
    def test_time_creates_benchmark_in_result(self):
 
1506
        """The TestCase.time() method accumulates a benchmark time."""
 
1507
        sample_test = TestTestCase("method_that_times_a_bit_twice")
 
1508
        output_stream = StringIO()
 
1509
        result = breezy.tests.VerboseTestResult(
 
1510
            output_stream,
 
1511
            descriptions=0,
 
1512
            verbosity=2)
 
1513
        sample_test.run(result)
 
1514
        self.assertContainsRe(
 
1515
            output_stream.getvalue(),
 
1516
            r"\d+ms\*\n$")
 
1517
 
 
1518
    def test_hooks_sanitised(self):
 
1519
        """The breezy hooks should be sanitised by setUp."""
 
1520
        # Note this test won't fail with hooks that the core library doesn't
 
1521
        # use - but it trigger with a plugin that adds hooks, so its still a
 
1522
        # useful warning in that case.
 
1523
        self.assertEqual(breezy.branch.BranchHooks(),
 
1524
                         breezy.branch.Branch.hooks)
 
1525
        self.assertEqual(
 
1526
            breezy.bzr.smart.server.SmartServerHooks(),
 
1527
            breezy.bzr.smart.server.SmartTCPServer.hooks)
 
1528
        self.assertEqual(
 
1529
            breezy.commands.CommandHooks(), breezy.commands.Command.hooks)
 
1530
 
 
1531
    def test__gather_lsprof_in_benchmarks(self):
 
1532
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
 
1533
 
 
1534
        Each self.time() call is individually and separately profiled.
 
1535
        """
 
1536
        self.requireFeature(features.lsprof_feature)
 
1537
        # overrides the class member with an instance member so no cleanup
 
1538
        # needed.
 
1539
        self._gather_lsprof_in_benchmarks = True
 
1540
        self.time(time.sleep, 0.000)
 
1541
        self.time(time.sleep, 0.003)
 
1542
        self.assertEqual(2, len(self._benchcalls))
 
1543
        self.assertEqual((time.sleep, (0.000,), {}), self._benchcalls[0][0])
 
1544
        self.assertEqual((time.sleep, (0.003,), {}), self._benchcalls[1][0])
 
1545
        self.assertIsInstance(self._benchcalls[0][1], breezy.lsprof.Stats)
 
1546
        self.assertIsInstance(self._benchcalls[1][1], breezy.lsprof.Stats)
 
1547
        del self._benchcalls[:]
 
1548
 
 
1549
    def test_knownFailure(self):
 
1550
        """Self.knownFailure() should raise a KnownFailure exception."""
 
1551
        self.assertRaises(tests.KnownFailure, self.knownFailure, "A Failure")
 
1552
 
 
1553
    def test_open_bzrdir_safe_roots(self):
 
1554
        # even a memory transport should fail to open when its url isn't
 
1555
        # permitted.
 
1556
        # Manually set one up (TestCase doesn't and shouldn't provide magic
 
1557
        # machinery)
 
1558
        transport_server = memory.MemoryServer()
 
1559
        transport_server.start_server()
 
1560
        self.addCleanup(transport_server.stop_server)
 
1561
        t = transport.get_transport_from_url(transport_server.get_url())
 
1562
        controldir.ControlDir.create(t.base)
 
1563
        self.assertRaises(errors.BzrError,
 
1564
                          controldir.ControlDir.open_from_transport, t)
 
1565
        # But if we declare this as safe, we can open the bzrdir.
 
1566
        self.permit_url(t.base)
 
1567
        self._bzr_selftest_roots.append(t.base)
 
1568
        controldir.ControlDir.open_from_transport(t)
 
1569
 
 
1570
    def test_requireFeature_available(self):
 
1571
        """self.requireFeature(available) is a no-op."""
 
1572
        class Available(features.Feature):
 
1573
            def _probe(self):
 
1574
                return True
 
1575
        feature = Available()
 
1576
        self.requireFeature(feature)
 
1577
 
 
1578
    def test_requireFeature_unavailable(self):
 
1579
        """self.requireFeature(unavailable) raises UnavailableFeature."""
 
1580
        class Unavailable(features.Feature):
 
1581
            def _probe(self):
 
1582
                return False
 
1583
        feature = Unavailable()
 
1584
        self.assertRaises(tests.UnavailableFeature,
 
1585
                          self.requireFeature, feature)
 
1586
 
 
1587
    def test_run_no_parameters(self):
 
1588
        test = SampleTestCase('_test_pass')
 
1589
        test.run()
 
1590
 
 
1591
    def test_run_enabled_unittest_result(self):
 
1592
        """Test we revert to regular behaviour when the test is enabled."""
 
1593
        test = SampleTestCase('_test_pass')
 
1594
 
 
1595
        class EnabledFeature(object):
 
1596
            def available(self):
 
1597
                return True
 
1598
        test._test_needs_features = [EnabledFeature()]
 
1599
        result = unittest.TestResult()
 
1600
        test.run(result)
 
1601
        self.assertEqual(1, result.testsRun)
 
1602
        self.assertEqual([], result.errors)
 
1603
        self.assertEqual([], result.failures)
 
1604
 
 
1605
    def test_run_disabled_unittest_result(self):
 
1606
        """Test our compatibility for disabled tests with unittest results."""
 
1607
        test = SampleTestCase('_test_pass')
 
1608
 
 
1609
        class DisabledFeature(object):
 
1610
            def available(self):
 
1611
                return False
 
1612
        test._test_needs_features = [DisabledFeature()]
 
1613
        result = unittest.TestResult()
 
1614
        test.run(result)
 
1615
        self.assertEqual(1, result.testsRun)
 
1616
        self.assertEqual([], result.errors)
 
1617
        self.assertEqual([], result.failures)
 
1618
 
 
1619
    def test_run_disabled_supporting_result(self):
 
1620
        """Test disabled tests behaviour with support aware results."""
 
1621
        test = SampleTestCase('_test_pass')
 
1622
 
 
1623
        class DisabledFeature(object):
 
1624
            def __eq__(self, other):
 
1625
                return isinstance(other, DisabledFeature)
 
1626
 
 
1627
            def available(self):
 
1628
                return False
 
1629
        the_feature = DisabledFeature()
 
1630
        test._test_needs_features = [the_feature]
 
1631
 
 
1632
        class InstrumentedTestResult(unittest.TestResult):
 
1633
            def __init__(self):
 
1634
                unittest.TestResult.__init__(self)
 
1635
                self.calls = []
 
1636
 
 
1637
            def startTest(self, test):
 
1638
                self.calls.append(('startTest', test))
 
1639
 
 
1640
            def stopTest(self, test):
 
1641
                self.calls.append(('stopTest', test))
 
1642
 
 
1643
            def addNotSupported(self, test, feature):
 
1644
                self.calls.append(('addNotSupported', test, feature))
 
1645
        result = InstrumentedTestResult()
 
1646
        test.run(result)
 
1647
        case = result.calls[0][1]
 
1648
        self.assertEqual([
 
1649
            ('startTest', case),
 
1650
            ('addNotSupported', case, the_feature),
 
1651
            ('stopTest', case),
 
1652
            ],
 
1653
            result.calls)
 
1654
 
 
1655
    def test_start_server_registers_url(self):
 
1656
        transport_server = memory.MemoryServer()
 
1657
        # A little strict, but unlikely to be changed soon.
 
1658
        self.assertEqual([], self._bzr_selftest_roots)
 
1659
        self.start_server(transport_server)
 
1660
        self.assertSubset([transport_server.get_url()],
 
1661
                          self._bzr_selftest_roots)
 
1662
 
 
1663
    def test_assert_list_raises_on_generator(self):
 
1664
        def generator_which_will_raise():
 
1665
            # This will not raise until after the first yield
 
1666
            yield 1
 
1667
            raise _TestException()
 
1668
 
 
1669
        e = self.assertListRaises(_TestException, generator_which_will_raise)
 
1670
        self.assertIsInstance(e, _TestException)
 
1671
 
 
1672
        e = self.assertListRaises(Exception, generator_which_will_raise)
 
1673
        self.assertIsInstance(e, _TestException)
 
1674
 
 
1675
    def test_assert_list_raises_on_plain(self):
 
1676
        def plain_exception():
 
1677
            raise _TestException()
 
1678
            return []
 
1679
 
 
1680
        e = self.assertListRaises(_TestException, plain_exception)
 
1681
        self.assertIsInstance(e, _TestException)
 
1682
 
 
1683
        e = self.assertListRaises(Exception, plain_exception)
 
1684
        self.assertIsInstance(e, _TestException)
 
1685
 
 
1686
    def test_assert_list_raises_assert_wrong_exception(self):
 
1687
        class _NotTestException(Exception):
 
1688
            pass
 
1689
 
 
1690
        def wrong_exception():
 
1691
            raise _NotTestException()
 
1692
 
 
1693
        def wrong_exception_generator():
 
1694
            yield 1
 
1695
            yield 2
 
1696
            raise _NotTestException()
 
1697
 
 
1698
        # Wrong exceptions are not intercepted
 
1699
        self.assertRaises(
 
1700
            _NotTestException,
 
1701
            self.assertListRaises, _TestException, wrong_exception)
 
1702
        self.assertRaises(
 
1703
            _NotTestException,
 
1704
            self.assertListRaises, _TestException, wrong_exception_generator)
 
1705
 
 
1706
    def test_assert_list_raises_no_exception(self):
 
1707
        def success():
 
1708
            return []
 
1709
 
 
1710
        def success_generator():
 
1711
            yield 1
 
1712
            yield 2
 
1713
 
 
1714
        self.assertRaises(AssertionError,
 
1715
                          self.assertListRaises, _TestException, success)
 
1716
 
 
1717
        self.assertRaises(
 
1718
            AssertionError,
 
1719
            self.assertListRaises, _TestException, success_generator)
 
1720
 
 
1721
    def _run_successful_test(self, test):
 
1722
        result = testtools.TestResult()
 
1723
        test.run(result)
 
1724
        self.assertTrue(result.wasSuccessful())
 
1725
        return result
 
1726
 
 
1727
    def test_overrideAttr_without_value(self):
 
1728
        self.test_attr = 'original'  # Define a test attribute
 
1729
        obj = self  # Make 'obj' visible to the embedded test
 
1730
 
 
1731
        class Test(tests.TestCase):
 
1732
 
 
1733
            def setUp(self):
 
1734
                super(Test, self).setUp()
 
1735
                self.orig = self.overrideAttr(obj, 'test_attr')
 
1736
 
 
1737
            def test_value(self):
 
1738
                self.assertEqual('original', self.orig)
 
1739
                self.assertEqual('original', obj.test_attr)
 
1740
                obj.test_attr = 'modified'
 
1741
                self.assertEqual('modified', obj.test_attr)
 
1742
 
 
1743
        self._run_successful_test(Test('test_value'))
 
1744
        self.assertEqual('original', obj.test_attr)
 
1745
 
 
1746
    def test_overrideAttr_with_value(self):
 
1747
        self.test_attr = 'original'  # Define a test attribute
 
1748
        obj = self  # Make 'obj' visible to the embedded test
 
1749
 
 
1750
        class Test(tests.TestCase):
 
1751
 
 
1752
            def setUp(self):
 
1753
                super(Test, self).setUp()
 
1754
                self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
 
1755
 
 
1756
            def test_value(self):
 
1757
                self.assertEqual('original', self.orig)
 
1758
                self.assertEqual('modified', obj.test_attr)
 
1759
 
 
1760
        self._run_successful_test(Test('test_value'))
 
1761
        self.assertEqual('original', obj.test_attr)
 
1762
 
 
1763
    def test_overrideAttr_with_no_existing_value_and_value(self):
 
1764
        # Do not define the test_attribute
 
1765
        obj = self  # Make 'obj' visible to the embedded test
 
1766
 
 
1767
        class Test(tests.TestCase):
 
1768
 
 
1769
            def setUp(self):
 
1770
                tests.TestCase.setUp(self)
 
1771
                self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
 
1772
 
 
1773
            def test_value(self):
 
1774
                self.assertEqual(tests._unitialized_attr, self.orig)
 
1775
                self.assertEqual('modified', obj.test_attr)
 
1776
 
 
1777
        self._run_successful_test(Test('test_value'))
 
1778
        self.assertRaises(AttributeError, getattr, obj, 'test_attr')
 
1779
 
 
1780
    def test_overrideAttr_with_no_existing_value_and_no_value(self):
 
1781
        # Do not define the test_attribute
 
1782
        obj = self  # Make 'obj' visible to the embedded test
 
1783
 
 
1784
        class Test(tests.TestCase):
 
1785
 
 
1786
            def setUp(self):
 
1787
                tests.TestCase.setUp(self)
 
1788
                self.orig = self.overrideAttr(obj, 'test_attr')
 
1789
 
 
1790
            def test_value(self):
 
1791
                self.assertEqual(tests._unitialized_attr, self.orig)
 
1792
                self.assertRaises(AttributeError, getattr, obj, 'test_attr')
 
1793
 
 
1794
        self._run_successful_test(Test('test_value'))
 
1795
        self.assertRaises(AttributeError, getattr, obj, 'test_attr')
 
1796
 
 
1797
    def test_recordCalls(self):
 
1798
        from breezy.tests import test_selftest
 
1799
        calls = self.recordCalls(
 
1800
            test_selftest, '_add_numbers')
 
1801
        self.assertEqual(test_selftest._add_numbers(2, 10),
 
1802
                         12)
 
1803
        self.assertEqual(calls, [((2, 10), {})])
 
1804
 
 
1805
 
 
1806
def _add_numbers(a, b):
 
1807
    return a + b
 
1808
 
 
1809
 
 
1810
class _MissingFeature(features.Feature):
 
1811
    def _probe(self):
 
1812
        return False
 
1813
 
 
1814
 
 
1815
missing_feature = _MissingFeature()
 
1816
 
 
1817
 
 
1818
def _get_test(name):
 
1819
    """Get an instance of a specific example test.
 
1820
 
 
1821
    We protect this in a function so that they don't auto-run in the test
 
1822
    suite.
 
1823
    """
 
1824
 
 
1825
    class ExampleTests(tests.TestCase):
 
1826
 
 
1827
        def test_fail(self):
 
1828
            mutter('this was a failing test')
 
1829
            self.fail('this test will fail')
 
1830
 
 
1831
        def test_error(self):
 
1832
            mutter('this test errored')
 
1833
            raise RuntimeError('gotcha')
 
1834
 
 
1835
        def test_missing_feature(self):
 
1836
            mutter('missing the feature')
 
1837
            self.requireFeature(missing_feature)
 
1838
 
 
1839
        def test_skip(self):
 
1840
            mutter('this test will be skipped')
 
1841
            raise tests.TestSkipped('reason')
 
1842
 
 
1843
        def test_success(self):
 
1844
            mutter('this test succeeds')
 
1845
 
 
1846
        def test_xfail(self):
 
1847
            mutter('test with expected failure')
 
1848
            self.knownFailure('this_fails')
 
1849
 
 
1850
        def test_unexpected_success(self):
 
1851
            mutter('test with unexpected success')
 
1852
            self.expectFailure('should_fail', lambda: None)
 
1853
 
 
1854
    return ExampleTests(name)
 
1855
 
 
1856
 
 
1857
class TestTestCaseLogDetails(tests.TestCase):
 
1858
 
 
1859
    def _run_test(self, test_name):
 
1860
        test = _get_test(test_name)
 
1861
        result = testtools.TestResult()
 
1862
        test.run(result)
 
1863
        return result
 
1864
 
 
1865
    def test_fail_has_log(self):
 
1866
        result = self._run_test('test_fail')
 
1867
        self.assertEqual(1, len(result.failures))
 
1868
        result_content = result.failures[0][1]
 
1869
        self.assertContainsRe(result_content,
 
1870
                              '(?m)^(?:Text attachment: )?log(?:$|: )')
 
1871
        self.assertContainsRe(result_content, 'this was a failing test')
 
1872
 
 
1873
    def test_error_has_log(self):
 
1874
        result = self._run_test('test_error')
 
1875
        self.assertEqual(1, len(result.errors))
 
1876
        result_content = result.errors[0][1]
 
1877
        self.assertContainsRe(result_content,
 
1878
                              '(?m)^(?:Text attachment: )?log(?:$|: )')
 
1879
        self.assertContainsRe(result_content, 'this test errored')
 
1880
 
 
1881
    def test_skip_has_no_log(self):
 
1882
        result = self._run_test('test_skip')
 
1883
        reasons = result.skip_reasons
 
1884
        self.assertEqual({'reason'}, set(reasons))
 
1885
        skips = reasons['reason']
 
1886
        self.assertEqual(1, len(skips))
 
1887
        test = skips[0]
 
1888
        self.assertFalse('log' in test.getDetails())
 
1889
 
 
1890
    def test_missing_feature_has_no_log(self):
 
1891
        # testtools doesn't know about addNotSupported, so it just gets
 
1892
        # considered as a skip
 
1893
        result = self._run_test('test_missing_feature')
 
1894
        reasons = result.skip_reasons
 
1895
        self.assertEqual({str(missing_feature)}, set(reasons))
 
1896
        skips = reasons[str(missing_feature)]
 
1897
        self.assertEqual(1, len(skips))
 
1898
        test = skips[0]
 
1899
        self.assertFalse('log' in test.getDetails())
 
1900
 
 
1901
    def test_xfail_has_no_log(self):
 
1902
        result = self._run_test('test_xfail')
 
1903
        self.assertEqual(1, len(result.expectedFailures))
 
1904
        result_content = result.expectedFailures[0][1]
 
1905
        self.assertNotContainsRe(result_content,
 
1906
                                 '(?m)^(?:Text attachment: )?log(?:$|: )')
 
1907
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
1908
 
 
1909
    def test_unexpected_success_has_log(self):
 
1910
        result = self._run_test('test_unexpected_success')
 
1911
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
1912
        # Inconsistency, unexpectedSuccesses is a list of tests,
 
1913
        # expectedFailures is a list of reasons?
 
1914
        test = result.unexpectedSuccesses[0]
 
1915
        details = test.getDetails()
 
1916
        self.assertTrue('log' in details)
 
1917
 
 
1918
 
 
1919
class TestTestCloning(tests.TestCase):
 
1920
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
 
1921
 
 
1922
    def test_cloned_testcase_does_not_share_details(self):
 
1923
        """A TestCase cloned with clone_test does not share mutable attributes
 
1924
        such as details or cleanups.
 
1925
        """
 
1926
        class Test(tests.TestCase):
 
1927
            def test_foo(self):
 
1928
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
 
1929
        orig_test = Test('test_foo')
 
1930
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
 
1931
        orig_test.run(unittest.TestResult())
 
1932
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
 
1933
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
 
1934
 
 
1935
    def test_double_apply_scenario_preserves_first_scenario(self):
 
1936
        """Applying two levels of scenarios to a test preserves the attributes
 
1937
        added by both scenarios.
 
1938
        """
 
1939
        class Test(tests.TestCase):
 
1940
            def test_foo(self):
 
1941
                pass
 
1942
        test = Test('test_foo')
 
1943
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
 
1944
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
 
1945
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
 
1946
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
 
1947
        all_tests = list(tests.iter_suite_tests(suite))
 
1948
        self.assertLength(4, all_tests)
 
1949
        all_xys = sorted((t.x, t.y) for t in all_tests)
 
1950
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1951
 
 
1952
 
 
1953
# NB: Don't delete this; it's not actually from 0.11!
 
1954
@deprecated_function(deprecated_in((0, 11, 0)))
 
1955
def sample_deprecated_function():
 
1956
    """A deprecated function to test applyDeprecated with."""
 
1957
    return 2
 
1958
 
 
1959
 
 
1960
def sample_undeprecated_function(a_param):
 
1961
    """A undeprecated function to test applyDeprecated with."""
 
1962
 
 
1963
 
 
1964
class ApplyDeprecatedHelper(object):
 
1965
    """A helper class for ApplyDeprecated tests."""
 
1966
 
 
1967
    @deprecated_method(deprecated_in((0, 11, 0)))
 
1968
    def sample_deprecated_method(self, param_one):
 
1969
        """A deprecated method for testing with."""
 
1970
        return param_one
 
1971
 
 
1972
    def sample_normal_method(self):
 
1973
        """A undeprecated method."""
 
1974
 
 
1975
    @deprecated_method(deprecated_in((0, 10, 0)))
 
1976
    def sample_nested_deprecation(self):
 
1977
        return sample_deprecated_function()
 
1978
 
 
1979
 
 
1980
class TestExtraAssertions(tests.TestCase):
 
1981
    """Tests for new test assertions in breezy test suite"""
 
1982
 
 
1983
    def test_assert_isinstance(self):
 
1984
        self.assertIsInstance(2, int)
 
1985
        self.assertIsInstance(u'', (str, text_type))
 
1986
        e = self.assertRaises(AssertionError, self.assertIsInstance, None, int)
 
1987
        self.assertIn(
 
1988
            str(e),
 
1989
            ["None is an instance of <type 'NoneType'> rather than "
 
1990
             "<type 'int'>",
 
1991
             "None is an instance of <class 'NoneType'> rather than "
 
1992
             "<class 'int'>"])
 
1993
        self.assertRaises(AssertionError, self.assertIsInstance, 23.3, int)
 
1994
        e = self.assertRaises(AssertionError,
 
1995
                              self.assertIsInstance, None, int,
 
1996
                              "it's just not")
 
1997
        if PY3:
 
1998
            self.assertEqual(
 
1999
                str(e),
 
2000
                "None is an instance of <class 'NoneType'> rather "
 
2001
                "than <class 'int'>: it's just not")
 
2002
        else:
 
2003
            self.assertEqual(
 
2004
                str(e),
 
2005
                "None is an instance of <type 'NoneType'> "
 
2006
                "rather than <type 'int'>: it's just not")
 
2007
 
 
2008
    def test_assertEndsWith(self):
 
2009
        self.assertEndsWith('foo', 'oo')
 
2010
        self.assertRaises(AssertionError, self.assertEndsWith, 'o', 'oo')
 
2011
 
 
2012
    def test_assertEqualDiff(self):
 
2013
        e = self.assertRaises(AssertionError,
 
2014
                              self.assertEqualDiff, '', '\n')
 
2015
        self.assertEqual(str(e),
 
2016
                         # Don't blink ! The '+' applies to the second string
 
2017
                         'first string is missing a final newline.\n+ \n')
 
2018
        e = self.assertRaises(AssertionError,
 
2019
                              self.assertEqualDiff, '\n', '')
 
2020
        self.assertEqual(str(e),
 
2021
                         # Don't blink ! The '-' applies to the second string
 
2022
                         'second string is missing a final newline.\n- \n')
 
2023
 
 
2024
 
 
2025
class TestDeprecations(tests.TestCase):
 
2026
 
 
2027
    def test_applyDeprecated_not_deprecated(self):
 
2028
        sample_object = ApplyDeprecatedHelper()
 
2029
        # calling an undeprecated callable raises an assertion
 
2030
        self.assertRaises(AssertionError, self.applyDeprecated,
 
2031
                          deprecated_in((0, 11, 0)),
 
2032
                          sample_object.sample_normal_method)
 
2033
        self.assertRaises(AssertionError, self.applyDeprecated,
 
2034
                          deprecated_in((0, 11, 0)),
 
2035
                          sample_undeprecated_function, "a param value")
 
2036
        # calling a deprecated callable (function or method) with the wrong
 
2037
        # expected deprecation fails.
 
2038
        self.assertRaises(AssertionError, self.applyDeprecated,
 
2039
                          deprecated_in((0, 10, 0)),
 
2040
                          sample_object.sample_deprecated_method,
 
2041
                          "a param value")
 
2042
        self.assertRaises(AssertionError, self.applyDeprecated,
 
2043
                          deprecated_in((0, 10, 0)),
 
2044
                          sample_deprecated_function)
 
2045
        # calling a deprecated callable (function or method) with the right
 
2046
        # expected deprecation returns the functions result.
 
2047
        self.assertEqual(
 
2048
            "a param value",
 
2049
            self.applyDeprecated(
 
2050
                deprecated_in((0, 11, 0)),
 
2051
                sample_object.sample_deprecated_method, "a param value"))
 
2052
        self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 11, 0)),
 
2053
                                                 sample_deprecated_function))
 
2054
        # calling a nested deprecation with the wrong deprecation version
 
2055
        # fails even if a deeper nested function was deprecated with the
 
2056
        # supplied version.
 
2057
        self.assertRaises(
 
2058
            AssertionError, self.applyDeprecated,
 
2059
            deprecated_in((0, 11, 0)), sample_object.sample_nested_deprecation)
 
2060
        # calling a nested deprecation with the right deprecation value
 
2061
        # returns the calls result.
 
2062
        self.assertEqual(
 
2063
            2, self.applyDeprecated(
 
2064
                deprecated_in((0, 10, 0)),
 
2065
                sample_object.sample_nested_deprecation))
 
2066
 
 
2067
    def test_callDeprecated(self):
 
2068
        def testfunc(be_deprecated, result=None):
 
2069
            if be_deprecated is True:
 
2070
                symbol_versioning.warn('i am deprecated', DeprecationWarning,
 
2071
                                       stacklevel=1)
 
2072
            return result
 
2073
        result = self.callDeprecated(['i am deprecated'], testfunc, True)
 
2074
        self.assertIs(None, result)
 
2075
        result = self.callDeprecated([], testfunc, False, 'result')
 
2076
        self.assertEqual('result', result)
 
2077
        self.callDeprecated(['i am deprecated'], testfunc, be_deprecated=True)
 
2078
        self.callDeprecated([], testfunc, be_deprecated=False)
 
2079
 
 
2080
 
 
2081
class TestWarningTests(tests.TestCase):
 
2082
    """Tests for calling methods that raise warnings."""
 
2083
 
 
2084
    def test_callCatchWarnings(self):
 
2085
        def meth(a, b):
 
2086
            warnings.warn("this is your last warning")
 
2087
            return a + b
 
2088
        wlist, result = self.callCatchWarnings(meth, 1, 2)
 
2089
        self.assertEqual(3, result)
 
2090
        # would like just to compare them, but UserWarning doesn't implement
 
2091
        # eq well
 
2092
        w0, = wlist
 
2093
        self.assertIsInstance(w0, UserWarning)
 
2094
        self.assertEqual("this is your last warning", str(w0))
 
2095
 
 
2096
 
 
2097
class TestConvenienceMakers(tests.TestCaseWithTransport):
 
2098
    """Test for the make_* convenience functions."""
 
2099
 
 
2100
    def test_make_branch_and_tree_with_format(self):
 
2101
        # we should be able to supply a format to make_branch_and_tree
 
2102
        self.make_branch_and_tree(
 
2103
            'a', format=breezy.bzr.bzrdir.BzrDirMetaFormat1())
 
2104
        self.assertIsInstance(breezy.controldir.ControlDir.open('a')._format,
 
2105
                              breezy.bzr.bzrdir.BzrDirMetaFormat1)
 
2106
 
 
2107
    def test_make_branch_and_memory_tree(self):
 
2108
        # we should be able to get a new branch and a mutable tree from
 
2109
        # TestCaseWithTransport
 
2110
        tree = self.make_branch_and_memory_tree('a')
 
2111
        self.assertIsInstance(tree, breezy.memorytree.MemoryTree)
 
2112
 
 
2113
    def test_make_tree_for_local_vfs_backed_transport(self):
 
2114
        # make_branch_and_tree has to use local branch and repositories
 
2115
        # when the vfs transport and local disk are colocated, even if
 
2116
        # a different transport is in use for url generation.
 
2117
        self.transport_server = test_server.FakeVFATServer
 
2118
        self.assertFalse(self.get_url('t1').startswith('file://'))
 
2119
        tree = self.make_branch_and_tree('t1')
 
2120
        base = tree.controldir.root_transport.base
 
2121
        self.assertStartsWith(base, 'file://')
 
2122
        self.assertEqual(tree.controldir.root_transport,
 
2123
                         tree.branch.controldir.root_transport)
 
2124
        self.assertEqual(tree.controldir.root_transport,
 
2125
                         tree.branch.repository.controldir.root_transport)
 
2126
 
 
2127
 
 
2128
class SelfTestHelper(object):
 
2129
 
 
2130
    def run_selftest(self, **kwargs):
 
2131
        """Run selftest returning its output."""
 
2132
        if PY3:
 
2133
            bio = BytesIO()
 
2134
            output = TextIOWrapper(bio, 'utf-8')
 
2135
        else:
 
2136
            bio = output = StringIO()
 
2137
        old_transport = breezy.tests.default_transport
 
2138
        old_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
 
2139
        tests.TestCaseWithMemoryTransport.TEST_ROOT = None
 
2140
        try:
 
2141
            self.assertEqual(True, tests.selftest(stream=output, **kwargs))
 
2142
        finally:
 
2143
            breezy.tests.default_transport = old_transport
 
2144
            tests.TestCaseWithMemoryTransport.TEST_ROOT = old_root
 
2145
        if PY3:
 
2146
            output.flush()
 
2147
            output.detach()
 
2148
        bio.seek(0)
 
2149
        return bio
 
2150
 
 
2151
 
 
2152
class TestSelftest(tests.TestCase, SelfTestHelper):
 
2153
    """Tests of breezy.tests.selftest."""
 
2154
 
 
2155
    def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(
 
2156
            self):
 
2157
        factory_called = []
 
2158
 
 
2159
        def factory():
 
2160
            factory_called.append(True)
 
2161
            return TestUtil.TestSuite()
 
2162
        out = StringIO()
 
2163
        err = StringIO()
 
2164
        self.apply_redirected(out, err, None, breezy.tests.selftest,
 
2165
                              test_suite_factory=factory)
 
2166
        self.assertEqual([True], factory_called)
 
2167
 
 
2168
    def factory(self):
 
2169
        """A test suite factory."""
 
2170
        class Test(tests.TestCase):
 
2171
            def id(self):
 
2172
                return __name__ + ".Test." + self._testMethodName
 
2173
 
 
2174
            def a(self):
 
2175
                pass
 
2176
 
 
2177
            def b(self):
 
2178
                pass
 
2179
 
 
2180
            def c(telf):
 
2181
                pass
 
2182
        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
 
2183
 
 
2184
    def test_list_only(self):
 
2185
        output = self.run_selftest(test_suite_factory=self.factory,
 
2186
                                   list_only=True)
 
2187
        self.assertEqual(3, len(output.readlines()))
 
2188
 
 
2189
    def test_list_only_filtered(self):
 
2190
        output = self.run_selftest(test_suite_factory=self.factory,
 
2191
                                   list_only=True, pattern="Test.b")
 
2192
        self.assertEndsWith(output.getvalue(), b"Test.b\n")
 
2193
        self.assertLength(1, output.readlines())
 
2194
 
 
2195
    def test_list_only_excludes(self):
 
2196
        output = self.run_selftest(test_suite_factory=self.factory,
 
2197
                                   list_only=True, exclude_pattern="Test.b")
 
2198
        self.assertNotContainsRe(b"Test.b", output.getvalue())
 
2199
        self.assertLength(2, output.readlines())
 
2200
 
 
2201
    def test_lsprof_tests(self):
 
2202
        self.requireFeature(features.lsprof_feature)
 
2203
        results = []
 
2204
 
 
2205
        class Test(object):
 
2206
            def __call__(test, result):
 
2207
                test.run(result)
 
2208
 
 
2209
            def run(test, result):
 
2210
                results.append(result)
 
2211
 
 
2212
            def countTestCases(self):
 
2213
                return 1
 
2214
        self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
 
2215
        self.assertLength(1, results)
 
2216
        self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)
 
2217
 
 
2218
    def test_random(self):
 
2219
        # test randomising by listing a number of tests.
 
2220
        output_123 = self.run_selftest(test_suite_factory=self.factory,
 
2221
                                       list_only=True, random_seed="123")
 
2222
        output_234 = self.run_selftest(test_suite_factory=self.factory,
 
2223
                                       list_only=True, random_seed="234")
 
2224
        self.assertNotEqual(output_123, output_234)
 
2225
        # "Randominzing test order..\n\n
 
2226
        self.assertLength(5, output_123.readlines())
 
2227
        self.assertLength(5, output_234.readlines())
 
2228
 
 
2229
    def test_random_reuse_is_same_order(self):
 
2230
        # test randomising by listing a number of tests.
 
2231
        expected = self.run_selftest(test_suite_factory=self.factory,
 
2232
                                     list_only=True, random_seed="123")
 
2233
        repeated = self.run_selftest(test_suite_factory=self.factory,
 
2234
                                     list_only=True, random_seed="123")
 
2235
        self.assertEqual(expected.getvalue(), repeated.getvalue())
 
2236
 
 
2237
    def test_runner_class(self):
 
2238
        self.requireFeature(features.subunit)
 
2239
        from subunit import ProtocolTestCase
 
2240
        stream = self.run_selftest(
 
2241
            runner_class=tests.SubUnitBzrRunnerv1,
 
2242
            test_suite_factory=self.factory)
 
2243
        test = ProtocolTestCase(stream)
 
2244
        result = unittest.TestResult()
 
2245
        test.run(result)
 
2246
        self.assertEqual(3, result.testsRun)
 
2247
 
 
2248
    def test_starting_with_single_argument(self):
 
2249
        output = self.run_selftest(test_suite_factory=self.factory,
 
2250
                                   starting_with=[
 
2251
                                       'breezy.tests.test_selftest.Test.a'],
 
2252
                                   list_only=True)
 
2253
        self.assertEqual(b'breezy.tests.test_selftest.Test.a\n',
 
2254
                         output.getvalue())
 
2255
 
 
2256
    def test_starting_with_multiple_argument(self):
 
2257
        output = self.run_selftest(
 
2258
            test_suite_factory=self.factory,
 
2259
            starting_with=['breezy.tests.test_selftest.Test.a',
 
2260
                           'breezy.tests.test_selftest.Test.b'],
 
2261
            list_only=True)
 
2262
        self.assertEqual(b'breezy.tests.test_selftest.Test.a\n'
 
2263
                         b'breezy.tests.test_selftest.Test.b\n',
 
2264
                         output.getvalue())
 
2265
 
 
2266
    def check_transport_set(self, transport_server):
 
2267
        captured_transport = []
 
2268
 
 
2269
        def seen_transport(a_transport):
 
2270
            captured_transport.append(a_transport)
 
2271
 
 
2272
        class Capture(tests.TestCase):
 
2273
            def a(self):
 
2274
                seen_transport(breezy.tests.default_transport)
 
2275
 
 
2276
        def factory():
 
2277
            return TestUtil.TestSuite([Capture("a")])
 
2278
        self.run_selftest(transport=transport_server,
 
2279
                          test_suite_factory=factory)
 
2280
        self.assertEqual(transport_server, captured_transport[0])
 
2281
 
 
2282
    def test_transport_sftp(self):
 
2283
        self.requireFeature(features.paramiko)
 
2284
        from breezy.tests import stub_sftp
 
2285
        self.check_transport_set(stub_sftp.SFTPAbsoluteServer)
 
2286
 
 
2287
    def test_transport_memory(self):
 
2288
        self.check_transport_set(memory.MemoryServer)
 
2289
 
 
2290
 
 
2291
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
 
2292
    # Does IO: reads test.list
 
2293
 
 
2294
    def test_load_list(self):
 
2295
        # Provide a list with one test - this test.
 
2296
        test_id_line = b'%s\n' % self.id().encode('ascii')
 
2297
        self.build_tree_contents([('test.list', test_id_line)])
 
2298
        # And generate a list of the tests in  the suite.
 
2299
        stream = self.run_selftest(load_list='test.list', list_only=True)
 
2300
        self.assertEqual(test_id_line, stream.getvalue())
 
2301
 
 
2302
    def test_load_unknown(self):
 
2303
        # Provide a list with one test - this test.
 
2304
        # And generate a list of the tests in  the suite.
 
2305
        self.assertRaises(errors.NoSuchFile, self.run_selftest,
 
2306
                          load_list='missing file name', list_only=True)
 
2307
 
 
2308
 
 
2309
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
 
2310
 
 
2311
    _test_needs_features = [features.subunit]
 
2312
 
 
2313
    def run_subunit_stream(self, test_name):
 
2314
        from subunit import ProtocolTestCase
 
2315
 
 
2316
        def factory():
 
2317
            return TestUtil.TestSuite([_get_test(test_name)])
 
2318
        stream = self.run_selftest(
 
2319
            runner_class=tests.SubUnitBzrRunnerv1,
 
2320
            test_suite_factory=factory)
 
2321
        test = ProtocolTestCase(stream)
 
2322
        result = testtools.TestResult()
 
2323
        test.run(result)
 
2324
        content = stream.getvalue()
 
2325
        return content, result
 
2326
 
 
2327
    def test_fail_has_log(self):
 
2328
        content, result = self.run_subunit_stream('test_fail')
 
2329
        self.assertEqual(1, len(result.failures))
 
2330
        self.assertContainsRe(content, b'(?m)^log$')
 
2331
        self.assertContainsRe(content, b'this test will fail')
 
2332
 
 
2333
    def test_error_has_log(self):
 
2334
        content, result = self.run_subunit_stream('test_error')
 
2335
        self.assertContainsRe(content, b'(?m)^log$')
 
2336
        self.assertContainsRe(content, b'this test errored')
 
2337
 
 
2338
    def test_skip_has_no_log(self):
 
2339
        content, result = self.run_subunit_stream('test_skip')
 
2340
        self.assertNotContainsRe(content, b'(?m)^log$')
 
2341
        self.assertNotContainsRe(content, b'this test will be skipped')
 
2342
        reasons = result.skip_reasons
 
2343
        self.assertEqual({'reason'}, set(reasons))
 
2344
        skips = reasons['reason']
 
2345
        self.assertEqual(1, len(skips))
 
2346
        # test = skips[0]
 
2347
        # RemotedTestCase doesn't preserve the "details"
 
2348
        # self.assertFalse('log' in test.getDetails())
 
2349
 
 
2350
    def test_missing_feature_has_no_log(self):
 
2351
        content, result = self.run_subunit_stream('test_missing_feature')
 
2352
        self.assertNotContainsRe(content, b'(?m)^log$')
 
2353
        self.assertNotContainsRe(content, b'missing the feature')
 
2354
        reasons = result.skip_reasons
 
2355
        self.assertEqual({'_MissingFeature\n'}, set(reasons))
 
2356
        skips = reasons['_MissingFeature\n']
 
2357
        self.assertEqual(1, len(skips))
 
2358
        # test = skips[0]
 
2359
        # RemotedTestCase doesn't preserve the "details"
 
2360
        # self.assertFalse('log' in test.getDetails())
 
2361
 
 
2362
    def test_xfail_has_no_log(self):
 
2363
        content, result = self.run_subunit_stream('test_xfail')
 
2364
        self.assertNotContainsRe(content, b'(?m)^log$')
 
2365
        self.assertNotContainsRe(content, b'test with expected failure')
 
2366
        self.assertEqual(1, len(result.expectedFailures))
 
2367
        result_content = result.expectedFailures[0][1]
 
2368
        self.assertNotContainsRe(result_content,
 
2369
                                 '(?m)^(?:Text attachment: )?log(?:$|: )')
 
2370
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
2371
 
 
2372
    def test_unexpected_success_has_log(self):
 
2373
        content, result = self.run_subunit_stream('test_unexpected_success')
 
2374
        self.assertContainsRe(content, b'(?m)^log$')
 
2375
        self.assertContainsRe(content, b'test with unexpected success')
 
2376
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
2377
        # test = result.unexpectedSuccesses[0]
 
2378
        # RemotedTestCase doesn't preserve the "details"
 
2379
        # self.assertTrue('log' in test.getDetails())
 
2380
 
 
2381
    def test_success_has_no_log(self):
 
2382
        content, result = self.run_subunit_stream('test_success')
 
2383
        self.assertEqual(1, result.testsRun)
 
2384
        self.assertNotContainsRe(content, b'(?m)^log$')
 
2385
        self.assertNotContainsRe(content, b'this test succeeds')
 
2386
 
 
2387
 
 
2388
class TestRunBzr(tests.TestCase):
 
2389
 
 
2390
    result = 0
 
2391
    out = ''
 
2392
    err = ''
 
2393
 
 
2394
    def _run_bzr_core(self, argv, encoding=None, stdin=None,
 
2395
                      stdout=None, stderr=None, working_dir=None):
 
2396
        """Override _run_bzr_core to test how it is invoked by run_bzr.
 
2397
 
 
2398
        Attempts to run bzr from inside this class don't actually run it.
 
2399
 
 
2400
        We test how run_bzr actually invokes bzr in another location.  Here we
 
2401
        only need to test that it passes the right parameters to run_bzr.
 
2402
        """
 
2403
        self.argv = list(argv)
 
2404
        self.encoding = encoding
 
2405
        self.stdin = stdin
 
2406
        self.working_dir = working_dir
 
2407
        stdout.write(self.out)
 
2408
        stderr.write(self.err)
 
2409
        return self.result
 
2410
 
 
2411
    def test_run_bzr_error(self):
 
2412
        self.out = "It sure does!\n"
 
2413
        self.result = 34
 
2414
        out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
 
2415
        self.assertEqual(['rocks'], self.argv)
 
2416
        self.assertEqual('It sure does!\n', out)
 
2417
        self.assertEqual(out, self.out)
 
2418
        self.assertEqual('', err)
 
2419
        self.assertEqual(err, self.err)
 
2420
 
 
2421
    def test_run_bzr_error_regexes(self):
 
2422
        self.out = ''
 
2423
        self.err = "bzr: ERROR: foobarbaz is not versioned"
 
2424
        self.result = 3
 
2425
        out, err = self.run_bzr_error(
 
2426
            ["bzr: ERROR: foobarbaz is not versioned"],
 
2427
            ['file-id', 'foobarbaz'])
 
2428
 
 
2429
    def test_encoding(self):
 
2430
        """Test that run_bzr passes encoding to _run_bzr_core"""
 
2431
        self.run_bzr('foo bar')
 
2432
        self.assertEqual(osutils.get_user_encoding(), self.encoding)
 
2433
        self.assertEqual(['foo', 'bar'], self.argv)
 
2434
 
 
2435
        self.run_bzr('foo bar', encoding='baz')
 
2436
        self.assertEqual('baz', self.encoding)
 
2437
        self.assertEqual(['foo', 'bar'], self.argv)
 
2438
 
 
2439
    def test_stdin(self):
 
2440
        # test that the stdin keyword to run_bzr is passed through to
 
2441
        # _run_bzr_core as-is. We do this by overriding
 
2442
        # _run_bzr_core in this class, and then calling run_bzr,
 
2443
        # which is a convenience function for _run_bzr_core, so
 
2444
        # should invoke it.
 
2445
        self.run_bzr('foo bar', stdin='gam')
 
2446
        self.assertEqual('gam', self.stdin)
 
2447
        self.assertEqual(['foo', 'bar'], self.argv)
 
2448
 
 
2449
        self.run_bzr('foo bar', stdin='zippy')
 
2450
        self.assertEqual('zippy', self.stdin)
 
2451
        self.assertEqual(['foo', 'bar'], self.argv)
 
2452
 
 
2453
    def test_working_dir(self):
 
2454
        """Test that run_bzr passes working_dir to _run_bzr_core"""
 
2455
        self.run_bzr('foo bar')
 
2456
        self.assertEqual(None, self.working_dir)
 
2457
        self.assertEqual(['foo', 'bar'], self.argv)
 
2458
 
 
2459
        self.run_bzr('foo bar', working_dir='baz')
 
2460
        self.assertEqual('baz', self.working_dir)
 
2461
        self.assertEqual(['foo', 'bar'], self.argv)
 
2462
 
 
2463
    def test_reject_extra_keyword_arguments(self):
 
2464
        self.assertRaises(TypeError, self.run_bzr, "foo bar",
 
2465
                          error_regex=['error message'])
 
2466
 
 
2467
 
 
2468
class TestRunBzrCaptured(tests.TestCaseWithTransport):
 
2469
    # Does IO when testing the working_dir parameter.
 
2470
 
 
2471
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
2472
                         a_callable=None, *args, **kwargs):
 
2473
        self.stdin = stdin
 
2474
        self.factory_stdin = getattr(breezy.ui.ui_factory, "stdin", None)
 
2475
        self.factory = breezy.ui.ui_factory
 
2476
        self.working_dir = osutils.getcwd()
 
2477
        stdout.write('foo\n')
 
2478
        stderr.write('bar\n')
 
2479
        return 0
 
2480
 
 
2481
    def test_stdin(self):
 
2482
        # test that the stdin keyword to _run_bzr_core is passed through to
 
2483
        # apply_redirected as a StringIO. We do this by overriding
 
2484
        # apply_redirected in this class, and then calling _run_bzr_core,
 
2485
        # which calls apply_redirected.
 
2486
        self.run_bzr(['foo', 'bar'], stdin='gam')
 
2487
        self.assertEqual('gam', self.stdin.read())
 
2488
        self.assertTrue(self.stdin is self.factory_stdin)
 
2489
        self.run_bzr(['foo', 'bar'], stdin='zippy')
 
2490
        self.assertEqual('zippy', self.stdin.read())
 
2491
        self.assertTrue(self.stdin is self.factory_stdin)
 
2492
 
 
2493
    def test_ui_factory(self):
 
2494
        # each invocation of self.run_bzr should get its
 
2495
        # own UI factory, which is an instance of TestUIFactory,
 
2496
        # with stdin, stdout and stderr attached to the stdin,
 
2497
        # stdout and stderr of the invoked run_bzr
 
2498
        current_factory = breezy.ui.ui_factory
 
2499
        self.run_bzr(['foo'])
 
2500
        self.assertFalse(current_factory is self.factory)
 
2501
        self.assertNotEqual(sys.stdout, self.factory.stdout)
 
2502
        self.assertNotEqual(sys.stderr, self.factory.stderr)
 
2503
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
 
2504
        self.assertEqual('bar\n', self.factory.stderr.getvalue())
 
2505
        self.assertIsInstance(self.factory, tests.TestUIFactory)
 
2506
 
 
2507
    def test_working_dir(self):
 
2508
        self.build_tree(['one/', 'two/'])
 
2509
        cwd = osutils.getcwd()
 
2510
 
 
2511
        # Default is to work in the current directory
 
2512
        self.run_bzr(['foo', 'bar'])
 
2513
        self.assertEqual(cwd, self.working_dir)
 
2514
 
 
2515
        self.run_bzr(['foo', 'bar'], working_dir=None)
 
2516
        self.assertEqual(cwd, self.working_dir)
 
2517
 
 
2518
        # The function should be run in the alternative directory
 
2519
        # but afterwards the current working dir shouldn't be changed
 
2520
        self.run_bzr(['foo', 'bar'], working_dir='one')
 
2521
        self.assertNotEqual(cwd, self.working_dir)
 
2522
        self.assertEndsWith(self.working_dir, 'one')
 
2523
        self.assertEqual(cwd, osutils.getcwd())
 
2524
 
 
2525
        self.run_bzr(['foo', 'bar'], working_dir='two')
 
2526
        self.assertNotEqual(cwd, self.working_dir)
 
2527
        self.assertEndsWith(self.working_dir, 'two')
 
2528
        self.assertEqual(cwd, osutils.getcwd())
 
2529
 
 
2530
 
 
2531
class StubProcess(object):
 
2532
    """A stub process for testing run_bzr_subprocess."""
 
2533
 
 
2534
    def __init__(self, out="", err="", retcode=0):
 
2535
        self.out = out
 
2536
        self.err = err
 
2537
        self.returncode = retcode
 
2538
 
 
2539
    def communicate(self):
 
2540
        return self.out, self.err
 
2541
 
 
2542
 
 
2543
class TestWithFakedStartBzrSubprocess(tests.TestCaseWithTransport):
 
2544
    """Base class for tests testing how we might run bzr."""
 
2545
 
 
2546
    def setUp(self):
 
2547
        super(TestWithFakedStartBzrSubprocess, self).setUp()
 
2548
        self.subprocess_calls = []
 
2549
 
 
2550
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
2551
                             skip_if_plan_to_signal=False,
 
2552
                             working_dir=None,
 
2553
                             allow_plugins=False):
 
2554
        """capture what run_bzr_subprocess tries to do."""
 
2555
        self.subprocess_calls.append(
 
2556
            {'process_args': process_args,
 
2557
             'env_changes': env_changes,
 
2558
             'skip_if_plan_to_signal': skip_if_plan_to_signal,
 
2559
             'working_dir': working_dir, 'allow_plugins': allow_plugins})
 
2560
        return self.next_subprocess
 
2561
 
 
2562
 
 
2563
class TestRunBzrSubprocess(TestWithFakedStartBzrSubprocess):
 
2564
 
 
2565
    def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
 
2566
        """Run run_bzr_subprocess with args and kwargs using a stubbed process.
 
2567
 
 
2568
        Inside TestRunBzrSubprocessCommands we use a stub start_bzr_subprocess
 
2569
        that will return static results. This assertion method populates those
 
2570
        results and also checks the arguments run_bzr_subprocess generates.
 
2571
        """
 
2572
        self.next_subprocess = process
 
2573
        try:
 
2574
            result = self.run_bzr_subprocess(*args, **kwargs)
 
2575
        except BaseException:
 
2576
            self.next_subprocess = None
 
2577
            for key, expected in expected_args.items():
 
2578
                self.assertEqual(expected, self.subprocess_calls[-1][key])
 
2579
            raise
 
2580
        else:
 
2581
            self.next_subprocess = None
 
2582
            for key, expected in expected_args.items():
 
2583
                self.assertEqual(expected, self.subprocess_calls[-1][key])
 
2584
            return result
 
2585
 
 
2586
    def test_run_bzr_subprocess(self):
 
2587
        """The run_bzr_helper_external command behaves nicely."""
 
2588
        self.assertRunBzrSubprocess({'process_args': ['--version']},
 
2589
                                    StubProcess(), '--version')
 
2590
        self.assertRunBzrSubprocess({'process_args': ['--version']},
 
2591
                                    StubProcess(), ['--version'])
 
2592
        # retcode=None disables retcode checking
 
2593
        result = self.assertRunBzrSubprocess(
 
2594
            {}, StubProcess(retcode=3), '--version', retcode=None)
 
2595
        result = self.assertRunBzrSubprocess(
 
2596
            {}, StubProcess(out="is free software"), '--version')
 
2597
        self.assertContainsRe(result[0], 'is free software')
 
2598
        # Running a subcommand that is missing errors
 
2599
        self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
 
2600
                          {'process_args': ['--versionn']
 
2601
                           }, StubProcess(retcode=3),
 
2602
                          '--versionn')
 
2603
        # Unless it is told to expect the error from the subprocess
 
2604
        result = self.assertRunBzrSubprocess(
 
2605
            {}, StubProcess(retcode=3), '--versionn', retcode=3)
 
2606
        # Or to ignore retcode checking
 
2607
        result = self.assertRunBzrSubprocess(
 
2608
            {}, StubProcess(err="unknown command", retcode=3),
 
2609
            '--versionn', retcode=None)
 
2610
        self.assertContainsRe(result[1], 'unknown command')
 
2611
 
 
2612
    def test_env_change_passes_through(self):
 
2613
        self.assertRunBzrSubprocess(
 
2614
            {'env_changes': {'new': 'value', 'changed': 'newvalue', 'deleted': None}},
 
2615
            StubProcess(), '',
 
2616
            env_changes={'new': 'value', 'changed': 'newvalue', 'deleted': None})
 
2617
 
 
2618
    def test_no_working_dir_passed_as_None(self):
 
2619
        self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '')
 
2620
 
 
2621
    def test_no_working_dir_passed_through(self):
 
2622
        self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
 
2623
                                    working_dir='dir')
 
2624
 
 
2625
    def test_run_bzr_subprocess_no_plugins(self):
 
2626
        self.assertRunBzrSubprocess({'allow_plugins': False},
 
2627
                                    StubProcess(), '')
 
2628
 
 
2629
    def test_allow_plugins(self):
 
2630
        self.assertRunBzrSubprocess({'allow_plugins': True},
 
2631
                                    StubProcess(), '', allow_plugins=True)
 
2632
 
 
2633
 
 
2634
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):
 
2635
 
 
2636
    def test_finish_bzr_subprocess_with_error(self):
 
2637
        """finish_bzr_subprocess allows specification of the desired exit code.
 
2638
        """
 
2639
        process = StubProcess(err="unknown command", retcode=3)
 
2640
        result = self.finish_bzr_subprocess(process, retcode=3)
 
2641
        self.assertEqual('', result[0])
 
2642
        self.assertContainsRe(result[1], 'unknown command')
 
2643
 
 
2644
    def test_finish_bzr_subprocess_ignoring_retcode(self):
 
2645
        """finish_bzr_subprocess allows the exit code to be ignored."""
 
2646
        process = StubProcess(err="unknown command", retcode=3)
 
2647
        result = self.finish_bzr_subprocess(process, retcode=None)
 
2648
        self.assertEqual('', result[0])
 
2649
        self.assertContainsRe(result[1], 'unknown command')
 
2650
 
 
2651
    def test_finish_subprocess_with_unexpected_retcode(self):
 
2652
        """finish_bzr_subprocess raises self.failureException if the retcode is
 
2653
        not the expected one.
 
2654
        """
 
2655
        process = StubProcess(err="unknown command", retcode=3)
 
2656
        self.assertRaises(self.failureException, self.finish_bzr_subprocess,
 
2657
                          process)
 
2658
 
 
2659
 
 
2660
class _DontSpawnProcess(Exception):
 
2661
    """A simple exception which just allows us to skip unnecessary steps"""
 
2662
 
 
2663
 
 
2664
class TestStartBzrSubProcess(tests.TestCase):
 
2665
    """Stub test start_bzr_subprocess."""
 
2666
 
 
2667
    def _subprocess_log_cleanup(self):
 
2668
        """Inhibits the base version as we don't produce a log file."""
 
2669
 
 
2670
    def _popen(self, *args, **kwargs):
 
2671
        """Override the base version to record the command that is run.
 
2672
 
 
2673
        From there we can ensure it is correct without spawning a real process.
 
2674
        """
 
2675
        self.check_popen_state()
 
2676
        self._popen_args = args
 
2677
        self._popen_kwargs = kwargs
 
2678
        raise _DontSpawnProcess()
 
2679
 
 
2680
    def check_popen_state(self):
 
2681
        """Replace to make assertions when popen is called."""
 
2682
 
 
2683
    def test_run_bzr_subprocess_no_plugins(self):
 
2684
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
 
2685
        command = self._popen_args[0]
 
2686
        self.assertEqual(sys.executable, command[0])
 
2687
        self.assertEqual(self.get_brz_path(), command[1])
 
2688
        self.assertEqual(['--no-plugins'], command[2:])
 
2689
 
 
2690
    def test_allow_plugins(self):
 
2691
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
 
2692
                          allow_plugins=True)
 
2693
        command = self._popen_args[0]
 
2694
        self.assertEqual([], command[2:])
 
2695
 
 
2696
    def test_set_env(self):
 
2697
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
 
2698
        # set in the child
 
2699
 
 
2700
        def check_environment():
 
2701
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
 
2702
        self.check_popen_state = check_environment
 
2703
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
 
2704
                          env_changes={'EXISTANT_ENV_VAR': 'set variable'})
 
2705
        # not set in theparent
 
2706
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
 
2707
 
 
2708
    def test_run_bzr_subprocess_env_del(self):
 
2709
        """run_bzr_subprocess can remove environment variables too."""
 
2710
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
 
2711
 
 
2712
        def check_environment():
 
2713
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
 
2714
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
 
2715
        self.check_popen_state = check_environment
 
2716
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
 
2717
                          env_changes={'EXISTANT_ENV_VAR': None})
 
2718
        # Still set in parent
 
2719
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
 
2720
        del os.environ['EXISTANT_ENV_VAR']
 
2721
 
 
2722
    def test_env_del_missing(self):
 
2723
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
 
2724
 
 
2725
        def check_environment():
 
2726
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
 
2727
        self.check_popen_state = check_environment
 
2728
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
 
2729
                          env_changes={'NON_EXISTANT_ENV_VAR': None})
 
2730
 
 
2731
    def test_working_dir(self):
 
2732
        """Test that we can specify the working dir for the child"""
 
2733
        chdirs = []
 
2734
 
 
2735
        def chdir(path):
 
2736
            chdirs.append(path)
 
2737
        self.overrideAttr(os, 'chdir', chdir)
 
2738
 
 
2739
        def getcwd():
 
2740
            return 'current'
 
2741
        self.overrideAttr(osutils, 'getcwd', getcwd)
 
2742
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
 
2743
                          working_dir='foo')
 
2744
        self.assertEqual(['foo', 'current'], chdirs)
 
2745
 
 
2746
    def test_get_brz_path_with_cwd_breezy(self):
 
2747
        self.get_source_path = lambda: ""
 
2748
        self.overrideAttr(os.path, "isfile", lambda path: True)
 
2749
        self.assertEqual(self.get_brz_path(), "brz")
 
2750
 
 
2751
 
 
2752
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
 
2753
    """Tests that really need to do things with an external bzr."""
 
2754
 
 
2755
    def test_start_and_stop_bzr_subprocess_send_signal(self):
 
2756
        """finish_bzr_subprocess raises self.failureException if the retcode is
 
2757
        not the expected one.
 
2758
        """
 
2759
        self.disable_missing_extensions_warning()
 
2760
        process = self.start_bzr_subprocess(['wait-until-signalled'],
 
2761
                                            skip_if_plan_to_signal=True)
 
2762
        self.assertEqual(b'running\n', process.stdout.readline())
 
2763
        result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
 
2764
                                            retcode=3)
 
2765
        self.assertEqual(b'', result[0])
 
2766
        self.assertEqual(b'brz: interrupted\n', result[1])
 
2767
 
 
2768
 
 
2769
class TestSelftestFiltering(tests.TestCase):
 
2770
 
 
2771
    def setUp(self):
 
2772
        super(TestSelftestFiltering, self).setUp()
 
2773
        self.suite = TestUtil.TestSuite()
 
2774
        self.loader = TestUtil.TestLoader()
 
2775
        self.suite.addTest(self.loader.loadTestsFromModule(
 
2776
            sys.modules['breezy.tests.test_selftest']))
 
2777
        self.all_names = _test_ids(self.suite)
 
2778
 
 
2779
    def test_condition_id_re(self):
 
2780
        test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
 
2781
                     'test_condition_id_re')
 
2782
        filtered_suite = tests.filter_suite_by_condition(
 
2783
            self.suite, tests.condition_id_re('test_condition_id_re'))
 
2784
        self.assertEqual([test_name], _test_ids(filtered_suite))
 
2785
 
 
2786
    def test_condition_id_in_list(self):
 
2787
        test_names = ['breezy.tests.test_selftest.TestSelftestFiltering.'
 
2788
                      'test_condition_id_in_list']
 
2789
        id_list = tests.TestIdList(test_names)
 
2790
        filtered_suite = tests.filter_suite_by_condition(
 
2791
            self.suite, tests.condition_id_in_list(id_list))
 
2792
        my_pattern = 'TestSelftestFiltering.*test_condition_id_in_list'
 
2793
        re_filtered = tests.filter_suite_by_re(self.suite, my_pattern)
 
2794
        self.assertEqual(_test_ids(re_filtered), _test_ids(filtered_suite))
 
2795
 
 
2796
    def test_condition_id_startswith(self):
 
2797
        klass = 'breezy.tests.test_selftest.TestSelftestFiltering.'
 
2798
        start1 = klass + 'test_condition_id_starts'
 
2799
        start2 = klass + 'test_condition_id_in'
 
2800
        test_names = [klass + 'test_condition_id_in_list',
 
2801
                      klass + 'test_condition_id_startswith',
 
2802
                      ]
 
2803
        filtered_suite = tests.filter_suite_by_condition(
 
2804
            self.suite, tests.condition_id_startswith([start1, start2]))
 
2805
        self.assertEqual(test_names, _test_ids(filtered_suite))
 
2806
 
 
2807
    def test_condition_isinstance(self):
 
2808
        filtered_suite = tests.filter_suite_by_condition(
 
2809
            self.suite, tests.condition_isinstance(self.__class__))
 
2810
        class_pattern = 'breezy.tests.test_selftest.TestSelftestFiltering.'
 
2811
        re_filtered = tests.filter_suite_by_re(self.suite, class_pattern)
 
2812
        self.assertEqual(_test_ids(re_filtered), _test_ids(filtered_suite))
 
2813
 
 
2814
    def test_exclude_tests_by_condition(self):
 
2815
        excluded_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
 
2816
                         'test_exclude_tests_by_condition')
 
2817
        filtered_suite = tests.exclude_tests_by_condition(
 
2818
            self.suite, lambda x: x.id() == excluded_name)
 
2819
        self.assertEqual(len(self.all_names) - 1,
 
2820
                         filtered_suite.countTestCases())
 
2821
        self.assertFalse(excluded_name in _test_ids(filtered_suite))
 
2822
        remaining_names = list(self.all_names)
 
2823
        remaining_names.remove(excluded_name)
 
2824
        self.assertEqual(remaining_names, _test_ids(filtered_suite))
 
2825
 
 
2826
    def test_exclude_tests_by_re(self):
 
2827
        self.all_names = _test_ids(self.suite)
 
2828
        filtered_suite = tests.exclude_tests_by_re(self.suite,
 
2829
                                                   'exclude_tests_by_re')
 
2830
        excluded_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
 
2831
                         'test_exclude_tests_by_re')
 
2832
        self.assertEqual(len(self.all_names) - 1,
 
2833
                         filtered_suite.countTestCases())
 
2834
        self.assertFalse(excluded_name in _test_ids(filtered_suite))
 
2835
        remaining_names = list(self.all_names)
 
2836
        remaining_names.remove(excluded_name)
 
2837
        self.assertEqual(remaining_names, _test_ids(filtered_suite))
 
2838
 
 
2839
    def test_filter_suite_by_condition(self):
 
2840
        test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
 
2841
                     'test_filter_suite_by_condition')
 
2842
        filtered_suite = tests.filter_suite_by_condition(
 
2843
            self.suite, lambda x: x.id() == test_name)
 
2844
        self.assertEqual([test_name], _test_ids(filtered_suite))
 
2845
 
 
2846
    def test_filter_suite_by_re(self):
 
2847
        filtered_suite = tests.filter_suite_by_re(self.suite,
 
2848
                                                  'test_filter_suite_by_r')
 
2849
        filtered_names = _test_ids(filtered_suite)
 
2850
        self.assertEqual(
 
2851
            filtered_names, ['breezy.tests.test_selftest.'
 
2852
                             'TestSelftestFiltering.test_filter_suite_by_re'])
 
2853
 
 
2854
    def test_filter_suite_by_id_list(self):
 
2855
        test_list = ['breezy.tests.test_selftest.'
 
2856
                     'TestSelftestFiltering.test_filter_suite_by_id_list']
 
2857
        filtered_suite = tests.filter_suite_by_id_list(
 
2858
            self.suite, tests.TestIdList(test_list))
 
2859
        filtered_names = _test_ids(filtered_suite)
 
2860
        self.assertEqual(
 
2861
            filtered_names,
 
2862
            ['breezy.tests.test_selftest.'
 
2863
             'TestSelftestFiltering.test_filter_suite_by_id_list'])
 
2864
 
 
2865
    def test_filter_suite_by_id_startswith(self):
 
2866
        # By design this test may fail if another test is added whose name also
 
2867
        # begins with one of the start value used.
 
2868
        klass = 'breezy.tests.test_selftest.TestSelftestFiltering.'
 
2869
        start1 = klass + 'test_filter_suite_by_id_starts'
 
2870
        start2 = klass + 'test_filter_suite_by_id_li'
 
2871
        test_list = [klass + 'test_filter_suite_by_id_list',
 
2872
                     klass + 'test_filter_suite_by_id_startswith',
 
2873
                     ]
 
2874
        filtered_suite = tests.filter_suite_by_id_startswith(
 
2875
            self.suite, [start1, start2])
 
2876
        self.assertEqual(
 
2877
            test_list,
 
2878
            _test_ids(filtered_suite),
 
2879
            )
 
2880
 
 
2881
    def test_preserve_input(self):
 
2882
        # NB: Surely this is something in the stdlib to do this?
 
2883
        self.assertIs(self.suite, tests.preserve_input(self.suite))
 
2884
        self.assertEqual("@#$", tests.preserve_input("@#$"))
 
2885
 
 
2886
    def test_randomize_suite(self):
 
2887
        randomized_suite = tests.randomize_suite(self.suite)
 
2888
        # randomizing should not add or remove test names.
 
2889
        self.assertEqual(set(_test_ids(self.suite)),
 
2890
                         set(_test_ids(randomized_suite)))
 
2891
        # Technically, this *can* fail, because random.shuffle(list) can be
 
2892
        # equal to list. Trying multiple times just pushes the frequency back.
 
2893
        # As its len(self.all_names)!:1, the failure frequency should be low
 
2894
        # enough to ignore. RBC 20071021.
 
2895
        # It should change the order.
 
2896
        self.assertNotEqual(self.all_names, _test_ids(randomized_suite))
 
2897
        # But not the length. (Possibly redundant with the set test, but not
 
2898
        # necessarily.)
 
2899
        self.assertEqual(len(self.all_names), len(_test_ids(randomized_suite)))
 
2900
 
 
2901
    def test_split_suit_by_condition(self):
 
2902
        self.all_names = _test_ids(self.suite)
 
2903
        condition = tests.condition_id_re('test_filter_suite_by_r')
 
2904
        split_suite = tests.split_suite_by_condition(self.suite, condition)
 
2905
        filtered_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
 
2906
                         'test_filter_suite_by_re')
 
2907
        self.assertEqual([filtered_name], _test_ids(split_suite[0]))
 
2908
        self.assertFalse(filtered_name in _test_ids(split_suite[1]))
 
2909
        remaining_names = list(self.all_names)
 
2910
        remaining_names.remove(filtered_name)
 
2911
        self.assertEqual(remaining_names, _test_ids(split_suite[1]))
 
2912
 
 
2913
    def test_split_suit_by_re(self):
 
2914
        self.all_names = _test_ids(self.suite)
 
2915
        split_suite = tests.split_suite_by_re(self.suite,
 
2916
                                              'test_filter_suite_by_r')
 
2917
        filtered_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
 
2918
                         'test_filter_suite_by_re')
 
2919
        self.assertEqual([filtered_name], _test_ids(split_suite[0]))
 
2920
        self.assertFalse(filtered_name in _test_ids(split_suite[1]))
 
2921
        remaining_names = list(self.all_names)
 
2922
        remaining_names.remove(filtered_name)
 
2923
        self.assertEqual(remaining_names, _test_ids(split_suite[1]))
 
2924
 
 
2925
 
 
2926
class TestCheckTreeShape(tests.TestCaseWithTransport):
 
2927
 
 
2928
    def test_check_tree_shape(self):
 
2929
        files = ['a', 'b/', 'b/c']
 
2930
        tree = self.make_branch_and_tree('.')
 
2931
        self.build_tree(files)
 
2932
        tree.add(files)
 
2933
        tree.lock_read()
 
2934
        try:
 
2935
            self.check_tree_shape(tree, files)
 
2936
        finally:
 
2937
            tree.unlock()
 
2938
 
 
2939
 
 
2940
class TestBlackboxSupport(tests.TestCase):
 
2941
    """Tests for testsuite blackbox features."""
 
2942
 
 
2943
    def test_run_bzr_failure_not_caught(self):
 
2944
        # When we run bzr in blackbox mode, we want any unexpected errors to
 
2945
        # propagate up to the test suite so that it can show the error in the
 
2946
        # usual way, and we won't get a double traceback.
 
2947
        e = self.assertRaises(
 
2948
            AssertionError,
 
2949
            self.run_bzr, ['assert-fail'])
 
2950
        # make sure we got the real thing, not an error from somewhere else in
 
2951
        # the test framework
 
2952
        self.assertEqual('always fails', str(e))
 
2953
        # check that there's no traceback in the test log
 
2954
        self.assertNotContainsRe(self.get_log(), r'Traceback')
 
2955
 
 
2956
    def test_run_bzr_user_error_caught(self):
 
2957
        # Running bzr in blackbox mode, normal/expected/user errors should be
 
2958
        # caught in the regular way and turned into an error message plus exit
 
2959
        # code.
 
2960
        transport_server = memory.MemoryServer()
 
2961
        transport_server.start_server()
 
2962
        self.addCleanup(transport_server.stop_server)
 
2963
        url = transport_server.get_url()
 
2964
        self.permit_url(url)
 
2965
        out, err = self.run_bzr(["log", "%s/nonexistantpath" % url], retcode=3)
 
2966
        self.assertEqual(out, '')
 
2967
        self.assertContainsRe(
 
2968
            err, 'brz: ERROR: Not a branch: ".*nonexistantpath/".\n')
 
2969
 
 
2970
 
 
2971
class TestTestLoader(tests.TestCase):
 
2972
    """Tests for the test loader."""
 
2973
 
 
2974
    def _get_loader_and_module(self):
 
2975
        """Gets a TestLoader and a module with one test in it."""
 
2976
        loader = TestUtil.TestLoader()
 
2977
        module = {}
 
2978
 
 
2979
        class Stub(tests.TestCase):
 
2980
            def test_foo(self):
 
2981
                pass
 
2982
 
 
2983
        class MyModule(object):
 
2984
            pass
 
2985
        MyModule.a_class = Stub
 
2986
        module = MyModule()
 
2987
        module.__name__ = 'fake_module'
 
2988
        return loader, module
 
2989
 
 
2990
    def test_module_no_load_tests_attribute_loads_classes(self):
 
2991
        loader, module = self._get_loader_and_module()
 
2992
        self.assertEqual(1, loader.loadTestsFromModule(
 
2993
            module).countTestCases())
 
2994
 
 
2995
    def test_module_load_tests_attribute_gets_called(self):
 
2996
        loader, module = self._get_loader_and_module()
 
2997
 
 
2998
        def load_tests(loader, standard_tests, pattern):
 
2999
            result = loader.suiteClass()
 
3000
            for test in tests.iter_suite_tests(standard_tests):
 
3001
                result.addTests([test, test])
 
3002
            return result
 
3003
        # add a load_tests() method which multiplies the tests from the module.
 
3004
        module.__class__.load_tests = staticmethod(load_tests)
 
3005
        self.assertEqual(
 
3006
            2 * [str(module.a_class('test_foo'))],
 
3007
            list(map(str, loader.loadTestsFromModule(module))))
 
3008
 
 
3009
    def test_load_tests_from_module_name_smoke_test(self):
 
3010
        loader = TestUtil.TestLoader()
 
3011
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
 
3012
        self.assertEqual(['breezy.tests.test_sampler.DemoTest.test_nothing'],
 
3013
                         _test_ids(suite))
 
3014
 
 
3015
    def test_load_tests_from_module_name_with_bogus_module_name(self):
 
3016
        loader = TestUtil.TestLoader()
 
3017
        self.assertRaises(ImportError, loader.loadTestsFromModuleName, 'bogus')
 
3018
 
 
3019
 
 
3020
class TestTestIdList(tests.TestCase):
 
3021
 
 
3022
    def _create_id_list(self, test_list):
 
3023
        return tests.TestIdList(test_list)
 
3024
 
 
3025
    def _create_suite(self, test_id_list):
 
3026
 
 
3027
        class Stub(tests.TestCase):
 
3028
            def test_foo(self):
 
3029
                pass
 
3030
 
 
3031
        def _create_test_id(id):
 
3032
            return lambda: id
 
3033
 
 
3034
        suite = TestUtil.TestSuite()
 
3035
        for id in test_id_list:
 
3036
            t = Stub('test_foo')
 
3037
            t.id = _create_test_id(id)
 
3038
            suite.addTest(t)
 
3039
        return suite
 
3040
 
 
3041
    def _test_ids(self, test_suite):
 
3042
        """Get the ids for the tests in a test suite."""
 
3043
        return [t.id() for t in tests.iter_suite_tests(test_suite)]
 
3044
 
 
3045
    def test_empty_list(self):
 
3046
        id_list = self._create_id_list([])
 
3047
        self.assertEqual({}, id_list.tests)
 
3048
        self.assertEqual({}, id_list.modules)
 
3049
 
 
3050
    def test_valid_list(self):
 
3051
        id_list = self._create_id_list(
 
3052
            ['mod1.cl1.meth1', 'mod1.cl1.meth2',
 
3053
             'mod1.func1', 'mod1.cl2.meth2',
 
3054
             'mod1.submod1',
 
3055
             'mod1.submod2.cl1.meth1', 'mod1.submod2.cl2.meth2',
 
3056
             ])
 
3057
        self.assertTrue(id_list.refers_to('mod1'))
 
3058
        self.assertTrue(id_list.refers_to('mod1.submod1'))
 
3059
        self.assertTrue(id_list.refers_to('mod1.submod2'))
 
3060
        self.assertTrue(id_list.includes('mod1.cl1.meth1'))
 
3061
        self.assertTrue(id_list.includes('mod1.submod1'))
 
3062
        self.assertTrue(id_list.includes('mod1.func1'))
 
3063
 
 
3064
    def test_bad_chars_in_params(self):
 
3065
        id_list = self._create_id_list(['mod1.cl1.meth1(xx.yy)'])
 
3066
        self.assertTrue(id_list.refers_to('mod1'))
 
3067
        self.assertTrue(id_list.includes('mod1.cl1.meth1(xx.yy)'))
 
3068
 
 
3069
    def test_module_used(self):
 
3070
        id_list = self._create_id_list(['mod.class.meth'])
 
3071
        self.assertTrue(id_list.refers_to('mod'))
 
3072
        self.assertTrue(id_list.refers_to('mod.class'))
 
3073
        self.assertTrue(id_list.refers_to('mod.class.meth'))
 
3074
 
 
3075
    def test_test_suite_matches_id_list_with_unknown(self):
 
3076
        loader = TestUtil.TestLoader()
 
3077
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
 
3078
        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing',
 
3079
                     'bogus']
 
3080
        not_found, duplicates = tests.suite_matches_id_list(suite, test_list)
 
3081
        self.assertEqual(['bogus'], not_found)
 
3082
        self.assertEqual([], duplicates)
 
3083
 
 
3084
    def test_suite_matches_id_list_with_duplicates(self):
 
3085
        loader = TestUtil.TestLoader()
 
3086
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
 
3087
        dupes = loader.suiteClass()
 
3088
        for test in tests.iter_suite_tests(suite):
 
3089
            dupes.addTest(test)
 
3090
            dupes.addTest(test)  # Add it again
 
3091
 
 
3092
        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing', ]
 
3093
        not_found, duplicates = tests.suite_matches_id_list(
 
3094
            dupes, test_list)
 
3095
        self.assertEqual([], not_found)
 
3096
        self.assertEqual(['breezy.tests.test_sampler.DemoTest.test_nothing'],
 
3097
                         duplicates)
 
3098
 
 
3099
 
 
3100
class TestTestSuite(tests.TestCase):
 
3101
 
 
3102
    def test__test_suite_testmod_names(self):
 
3103
        # Test that a plausible list of test module names are returned
 
3104
        # by _test_suite_testmod_names.
 
3105
        test_list = tests._test_suite_testmod_names()
 
3106
        self.assertSubset([
 
3107
            'breezy.tests.blackbox',
 
3108
            'breezy.tests.per_transport',
 
3109
            'breezy.tests.test_selftest',
 
3110
            ],
 
3111
            test_list)
 
3112
 
 
3113
    def test__test_suite_modules_to_doctest(self):
 
3114
        # Test that a plausible list of modules to doctest is returned
 
3115
        # by _test_suite_modules_to_doctest.
 
3116
        test_list = tests._test_suite_modules_to_doctest()
 
3117
        if __doc__ is None:
 
3118
            # When docstrings are stripped, there are no modules to doctest
 
3119
            self.assertEqual([], test_list)
 
3120
            return
 
3121
        self.assertSubset([
 
3122
            'breezy.timestamp',
 
3123
            ],
 
3124
            test_list)
 
3125
 
 
3126
    def test_test_suite(self):
 
3127
        # test_suite() loads the entire test suite to operate. To avoid this
 
3128
        # overhead, and yet still be confident that things are happening,
 
3129
        # we temporarily replace two functions used by test_suite with
 
3130
        # test doubles that supply a few sample tests to load, and check they
 
3131
        # are loaded.
 
3132
        calls = []
 
3133
 
 
3134
        def testmod_names():
 
3135
            calls.append("testmod_names")
 
3136
            return [
 
3137
                'breezy.tests.blackbox.test_branch',
 
3138
                'breezy.tests.per_transport',
 
3139
                'breezy.tests.test_selftest',
 
3140
                ]
 
3141
        self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names)
 
3142
 
 
3143
        def doctests():
 
3144
            calls.append("modules_to_doctest")
 
3145
            if __doc__ is None:
 
3146
                return []
 
3147
            return ['breezy.timestamp']
 
3148
        self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests)
 
3149
        expected_test_list = [
 
3150
            # testmod_names
 
3151
            'breezy.tests.blackbox.test_branch.TestBranch.test_branch',
 
3152
            ('breezy.tests.per_transport.TransportTests'
 
3153
             '.test_abspath(LocalTransport,LocalURLServer)'),
 
3154
            'breezy.tests.test_selftest.TestTestSuite.test_test_suite',
 
3155
            # plugins can't be tested that way since selftest may be run with
 
3156
            # --no-plugins
 
3157
            ]
 
3158
        if __doc__ is not None and not PY3:
 
3159
            expected_test_list.extend([
 
3160
                # modules_to_doctest
 
3161
                'breezy.timestamp.format_highres_date',
 
3162
                ])
 
3163
        suite = tests.test_suite()
 
3164
        if PY3:
 
3165
            self.assertEqual({"testmod_names"}, set(calls))
 
3166
        else:
 
3167
            self.assertEqual({"testmod_names", "modules_to_doctest"},
 
3168
                             set(calls))
 
3169
        self.assertSubset(expected_test_list, _test_ids(suite))
 
3170
 
 
3171
    def test_test_suite_list_and_start(self):
 
3172
        # We cannot test this at the same time as the main load, because we
 
3173
        # want to know that starting_with == None works. So a second load is
 
3174
        # incurred - note that the starting_with parameter causes a partial
 
3175
        # load rather than a full load so this test should be pretty quick.
 
3176
        test_list = [
 
3177
            'breezy.tests.test_selftest.TestTestSuite.test_test_suite']
 
3178
        suite = tests.test_suite(test_list,
 
3179
                                 ['breezy.tests.test_selftest.TestTestSuite'])
 
3180
        # test_test_suite_list_and_start is not included
 
3181
        self.assertEqual(test_list, _test_ids(suite))
 
3182
 
 
3183
 
 
3184
class TestLoadTestIdList(tests.TestCaseInTempDir):
 
3185
 
 
3186
    def _create_test_list_file(self, file_name, content):
 
3187
        fl = open(file_name, 'wt')
 
3188
        fl.write(content)
 
3189
        fl.close()
 
3190
 
 
3191
    def test_load_unknown(self):
 
3192
        self.assertRaises(errors.NoSuchFile,
 
3193
                          tests.load_test_id_list, 'i_do_not_exist')
 
3194
 
 
3195
    def test_load_test_list(self):
 
3196
        test_list_fname = 'test.list'
 
3197
        self._create_test_list_file(test_list_fname,
 
3198
                                    'mod1.cl1.meth1\nmod2.cl2.meth2\n')
 
3199
        tlist = tests.load_test_id_list(test_list_fname)
 
3200
        self.assertEqual(2, len(tlist))
 
3201
        self.assertEqual('mod1.cl1.meth1', tlist[0])
 
3202
        self.assertEqual('mod2.cl2.meth2', tlist[1])
 
3203
 
 
3204
    def test_load_dirty_file(self):
 
3205
        test_list_fname = 'test.list'
 
3206
        self._create_test_list_file(test_list_fname,
 
3207
                                    '  mod1.cl1.meth1\n\nmod2.cl2.meth2  \n'
 
3208
                                    'bar baz\n')
 
3209
        tlist = tests.load_test_id_list(test_list_fname)
 
3210
        self.assertEqual(4, len(tlist))
 
3211
        self.assertEqual('mod1.cl1.meth1', tlist[0])
 
3212
        self.assertEqual('', tlist[1])
 
3213
        self.assertEqual('mod2.cl2.meth2', tlist[2])
 
3214
        self.assertEqual('bar baz', tlist[3])
 
3215
 
 
3216
 
 
3217
class TestFilteredByModuleTestLoader(tests.TestCase):
 
3218
 
 
3219
    def _create_loader(self, test_list):
 
3220
        id_filter = tests.TestIdList(test_list)
 
3221
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
3222
        return loader
 
3223
 
 
3224
    def test_load_tests(self):
 
3225
        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing']
 
3226
        loader = self._create_loader(test_list)
 
3227
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
 
3228
        self.assertEqual(test_list, _test_ids(suite))
 
3229
 
 
3230
    def test_exclude_tests(self):
 
3231
        test_list = ['bogus']
 
3232
        loader = self._create_loader(test_list)
 
3233
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
 
3234
        self.assertEqual([], _test_ids(suite))
 
3235
 
 
3236
 
 
3237
class TestFilteredByNameStartTestLoader(tests.TestCase):
 
3238
 
 
3239
    def _create_loader(self, name_start):
 
3240
        def needs_module(name):
 
3241
            return name.startswith(name_start) or name_start.startswith(name)
 
3242
        loader = TestUtil.FilteredByModuleTestLoader(needs_module)
 
3243
        return loader
 
3244
 
 
3245
    def test_load_tests(self):
 
3246
        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing']
 
3247
        loader = self._create_loader('breezy.tests.test_samp')
 
3248
 
 
3249
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
 
3250
        self.assertEqual(test_list, _test_ids(suite))
 
3251
 
 
3252
    def test_load_tests_inside_module(self):
 
3253
        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing']
 
3254
        loader = self._create_loader('breezy.tests.test_sampler.Demo')
 
3255
 
 
3256
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
 
3257
        self.assertEqual(test_list, _test_ids(suite))
 
3258
 
 
3259
    def test_exclude_tests(self):
 
3260
        loader = self._create_loader('bogus')
 
3261
 
 
3262
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
 
3263
        self.assertEqual([], _test_ids(suite))
 
3264
 
 
3265
 
 
3266
class TestTestPrefixRegistry(tests.TestCase):
 
3267
 
 
3268
    def _get_registry(self):
 
3269
        tp_registry = tests.TestPrefixAliasRegistry()
 
3270
        return tp_registry
 
3271
 
 
3272
    def test_register_new_prefix(self):
 
3273
        tpr = self._get_registry()
 
3274
        tpr.register('foo', 'fff.ooo.ooo')
 
3275
        self.assertEqual('fff.ooo.ooo', tpr.get('foo'))
 
3276
 
 
3277
    def test_register_existing_prefix(self):
 
3278
        tpr = self._get_registry()
 
3279
        tpr.register('bar', 'bbb.aaa.rrr')
 
3280
        tpr.register('bar', 'bBB.aAA.rRR')
 
3281
        self.assertEqual('bbb.aaa.rrr', tpr.get('bar'))
 
3282
        self.assertThat(self.get_log(),
 
3283
                        DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
 
3284
                                       doctest.ELLIPSIS))
 
3285
 
 
3286
    def test_get_unknown_prefix(self):
 
3287
        tpr = self._get_registry()
 
3288
        self.assertRaises(KeyError, tpr.get, 'I am not a prefix')
 
3289
 
 
3290
    def test_resolve_prefix(self):
 
3291
        tpr = self._get_registry()
 
3292
        tpr.register('bar', 'bb.aa.rr')
 
3293
        self.assertEqual('bb.aa.rr', tpr.resolve_alias('bar'))
 
3294
 
 
3295
    def test_resolve_unknown_alias(self):
 
3296
        tpr = self._get_registry()
 
3297
        self.assertRaises(errors.BzrCommandError,
 
3298
                          tpr.resolve_alias, 'I am not a prefix')
 
3299
 
 
3300
    def test_predefined_prefixes(self):
 
3301
        tpr = tests.test_prefix_alias_registry
 
3302
        self.assertEqual('breezy', tpr.resolve_alias('breezy'))
 
3303
        self.assertEqual('breezy.doc', tpr.resolve_alias('bd'))
 
3304
        self.assertEqual('breezy.utils', tpr.resolve_alias('bu'))
 
3305
        self.assertEqual('breezy.tests', tpr.resolve_alias('bt'))
 
3306
        self.assertEqual('breezy.tests.blackbox', tpr.resolve_alias('bb'))
 
3307
        self.assertEqual('breezy.plugins', tpr.resolve_alias('bp'))
 
3308
 
 
3309
 
 
3310
class TestThreadLeakDetection(tests.TestCase):
 
3311
    """Ensure when tests leak threads we detect and report it"""
 
3312
 
 
3313
    class LeakRecordingResult(tests.ExtendedTestResult):
 
3314
        def __init__(self):
 
3315
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3316
            self.leaks = []
 
3317
 
 
3318
        def _report_thread_leak(self, test, leaks, alive):
 
3319
            self.leaks.append((test, leaks))
 
3320
 
 
3321
    def test_testcase_without_addCleanups(self):
 
3322
        """Check old TestCase instances don't break with leak detection"""
 
3323
        class Test(unittest.TestCase):
 
3324
            def runTest(self):
 
3325
                pass
 
3326
        result = self.LeakRecordingResult()
 
3327
        test = Test()
 
3328
        result.startTestRun()
 
3329
        test.run(result)
 
3330
        result.stopTestRun()
 
3331
        self.assertEqual(result._tests_leaking_threads_count, 0)
 
3332
        self.assertEqual(result.leaks, [])
 
3333
 
 
3334
    def test_thread_leak(self):
 
3335
        """Ensure a thread that outlives the running of a test is reported
 
3336
 
 
3337
        Uses a thread that blocks on an event, and is started by the inner
 
3338
        test case. As the thread outlives the inner case's run, it should be
 
3339
        detected as a leak, but the event is then set so that the thread can
 
3340
        be safely joined in cleanup so it's not leaked for real.
 
3341
        """
 
3342
        event = threading.Event()
 
3343
        thread = threading.Thread(name="Leaker", target=event.wait)
 
3344
 
 
3345
        class Test(tests.TestCase):
 
3346
            def test_leak(self):
 
3347
                thread.start()
 
3348
        result = self.LeakRecordingResult()
 
3349
        test = Test("test_leak")
 
3350
        self.addCleanup(thread.join)
 
3351
        self.addCleanup(event.set)
 
3352
        result.startTestRun()
 
3353
        test.run(result)
 
3354
        result.stopTestRun()
 
3355
        self.assertEqual(result._tests_leaking_threads_count, 1)
 
3356
        self.assertEqual(result._first_thread_leaker_id, test.id())
 
3357
        self.assertEqual(result.leaks, [(test, {thread})])
 
3358
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3359
 
 
3360
    def test_multiple_leaks(self):
 
3361
        """Check multiple leaks are blamed on the test cases at fault
 
3362
 
 
3363
        Same concept as the previous test, but has one inner test method that
 
3364
        leaks two threads, and one that doesn't leak at all.
 
3365
        """
 
3366
        event = threading.Event()
 
3367
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
 
3368
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
 
3369
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
 
3370
 
 
3371
        class Test(tests.TestCase):
 
3372
            def test_first_leak(self):
 
3373
                thread_b.start()
 
3374
 
 
3375
            def test_second_no_leak(self):
 
3376
                pass
 
3377
 
 
3378
            def test_third_leak(self):
 
3379
                thread_c.start()
 
3380
                thread_a.start()
 
3381
        result = self.LeakRecordingResult()
 
3382
        first_test = Test("test_first_leak")
 
3383
        third_test = Test("test_third_leak")
 
3384
        self.addCleanup(thread_a.join)
 
3385
        self.addCleanup(thread_b.join)
 
3386
        self.addCleanup(thread_c.join)
 
3387
        self.addCleanup(event.set)
 
3388
        result.startTestRun()
 
3389
        unittest.TestSuite(
 
3390
            [first_test, Test("test_second_no_leak"), third_test]
 
3391
            ).run(result)
 
3392
        result.stopTestRun()
 
3393
        self.assertEqual(result._tests_leaking_threads_count, 2)
 
3394
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
 
3395
        self.assertEqual(result.leaks, [
 
3396
            (first_test, {thread_b}),
 
3397
            (third_test, {thread_a, thread_c})])
 
3398
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3399
 
 
3400
 
 
3401
class TestPostMortemDebugging(tests.TestCase):
 
3402
    """Check post mortem debugging works when tests fail or error"""
 
3403
 
 
3404
    class TracebackRecordingResult(tests.ExtendedTestResult):
 
3405
        def __init__(self):
 
3406
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3407
            self.postcode = None
 
3408
 
 
3409
        def _post_mortem(self, tb=None):
 
3410
            """Record the code object at the end of the current traceback"""
 
3411
            tb = tb or sys.exc_info()[2]
 
3412
            if tb is not None:
 
3413
                next = tb.tb_next
 
3414
                while next is not None:
 
3415
                    tb = next
 
3416
                    next = next.tb_next
 
3417
                self.postcode = tb.tb_frame.f_code
 
3418
 
 
3419
        def report_error(self, test, err):
 
3420
            pass
 
3421
 
 
3422
        def report_failure(self, test, err):
 
3423
            pass
 
3424
 
 
3425
    def test_location_unittest_error(self):
 
3426
        """Needs right post mortem traceback with erroring unittest case"""
 
3427
        class Test(unittest.TestCase):
 
3428
            def runTest(self):
 
3429
                raise RuntimeError
 
3430
        result = self.TracebackRecordingResult()
 
3431
        Test().run(result)
 
3432
        self.assertEqual(result.postcode, Test.runTest.__code__)
 
3433
 
 
3434
    def test_location_unittest_failure(self):
 
3435
        """Needs right post mortem traceback with failing unittest case"""
 
3436
        class Test(unittest.TestCase):
 
3437
            def runTest(self):
 
3438
                raise self.failureException
 
3439
        result = self.TracebackRecordingResult()
 
3440
        Test().run(result)
 
3441
        self.assertEqual(result.postcode, Test.runTest.__code__)
 
3442
 
 
3443
    def test_location_bt_error(self):
 
3444
        """Needs right post mortem traceback with erroring breezy.tests case"""
 
3445
        class Test(tests.TestCase):
 
3446
            def test_error(self):
 
3447
                raise RuntimeError
 
3448
        result = self.TracebackRecordingResult()
 
3449
        Test("test_error").run(result)
 
3450
        self.assertEqual(result.postcode, Test.test_error.__code__)
 
3451
 
 
3452
    def test_location_bt_failure(self):
 
3453
        """Needs right post mortem traceback with failing breezy.tests case"""
 
3454
        class Test(tests.TestCase):
 
3455
            def test_failure(self):
 
3456
                raise self.failureException
 
3457
        result = self.TracebackRecordingResult()
 
3458
        Test("test_failure").run(result)
 
3459
        self.assertEqual(result.postcode, Test.test_failure.__code__)
 
3460
 
 
3461
    def test_env_var_triggers_post_mortem(self):
 
3462
        """Check pdb.post_mortem is called iff BRZ_TEST_PDB is set"""
 
3463
        import pdb
 
3464
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
 
3465
        post_mortem_calls = []
 
3466
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
 
3467
        self.overrideEnv('BRZ_TEST_PDB', None)
 
3468
        result._post_mortem(1)
 
3469
        self.overrideEnv('BRZ_TEST_PDB', 'on')
 
3470
        result._post_mortem(2)
 
3471
        self.assertEqual([2], post_mortem_calls)
 
3472
 
 
3473
 
 
3474
class TestRunSuite(tests.TestCase):
 
3475
 
 
3476
    def test_runner_class(self):
 
3477
        """run_suite accepts and uses a runner_class keyword argument."""
 
3478
        class Stub(tests.TestCase):
 
3479
            def test_foo(self):
 
3480
                pass
 
3481
        suite = Stub("test_foo")
 
3482
        calls = []
 
3483
 
 
3484
        class MyRunner(tests.TextTestRunner):
 
3485
            def run(self, test):
 
3486
                calls.append(test)
 
3487
                return tests.ExtendedTestResult(self.stream, self.descriptions,
 
3488
                                                self.verbosity)
 
3489
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
 
3490
        self.assertLength(1, calls)
 
3491
 
 
3492
 
 
3493
class _Selftest(object):
 
3494
    """Mixin for tests needing full selftest output"""
 
3495
 
 
3496
    def _inject_stream_into_subunit(self, stream):
 
3497
        """To be overridden by subclasses that run tests out of process"""
 
3498
 
 
3499
    def _run_selftest(self, **kwargs):
 
3500
        if PY3:
 
3501
            bio = BytesIO()
 
3502
            sio = TextIOWrapper(bio, 'utf-8')
 
3503
        else:
 
3504
            sio = bio = StringIO()
 
3505
        self._inject_stream_into_subunit(bio)
 
3506
        tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
 
3507
        sio.flush()
 
3508
        return bio.getvalue()
 
3509
 
 
3510
 
 
3511
class _ForkedSelftest(_Selftest):
 
3512
    """Mixin for tests needing full selftest output with forked children"""
 
3513
 
 
3514
    _test_needs_features = [features.subunit]
 
3515
 
 
3516
    def _inject_stream_into_subunit(self, stream):
 
3517
        """Monkey-patch subunit so the extra output goes to stream not stdout
 
3518
 
 
3519
        Some APIs need rewriting so this kind of bogus hackery can be replaced
 
3520
        by passing the stream param from run_tests down into ProtocolTestCase.
 
3521
        """
 
3522
        from subunit import ProtocolTestCase
 
3523
        _original_init = ProtocolTestCase.__init__
 
3524
 
 
3525
        def _init_with_passthrough(self, *args, **kwargs):
 
3526
            _original_init(self, *args, **kwargs)
 
3527
            self._passthrough = stream
 
3528
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
 
3529
 
 
3530
    def _run_selftest(self, **kwargs):
 
3531
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
 
3532
        if getattr(os, "fork", None) is None:
 
3533
            raise tests.TestNotApplicable("Platform doesn't support forking")
 
3534
        # Make sure the fork code is actually invoked by claiming two cores
 
3535
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
 
3536
        kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
 
3537
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
 
3538
 
 
3539
 
 
3540
class TestParallelFork(_ForkedSelftest, tests.TestCase):
 
3541
    """Check operation of --parallel=fork selftest option"""
 
3542
 
 
3543
    def test_error_in_child_during_fork(self):
 
3544
        """Error in a forked child during test setup should get reported"""
 
3545
        class Test(tests.TestCase):
 
3546
            def testMethod(self):
 
3547
                pass
 
3548
        # We don't care what, just break something that a child will run
 
3549
        self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
 
3550
        out = self._run_selftest(test_suite_factory=Test)
 
3551
        # Lines from the tracebacks of the two child processes may be mixed
 
3552
        # together due to the way subunit parses and forwards the streams,
 
3553
        # so permit extra lines between each part of the error output.
 
3554
        self.assertContainsRe(out,
 
3555
                              b"Traceback.*:\n"
 
3556
                              b"(?:.*\n)*"
 
3557
                              b".+ in fork_for_tests\n"
 
3558
                              b"(?:.*\n)*"
 
3559
                              b"\\s*workaround_zealous_crypto_random\\(\\)\n"
 
3560
                              b"(?:.*\n)*"
 
3561
                              b"TypeError:")
 
3562
 
 
3563
 
 
3564
class TestUncollectedWarnings(_Selftest, tests.TestCase):
 
3565
    """Check a test case still alive after being run emits a warning"""
 
3566
 
 
3567
    class Test(tests.TestCase):
 
3568
        def test_pass(self):
 
3569
            pass
 
3570
 
 
3571
        def test_self_ref(self):
 
3572
            self.also_self = self.test_self_ref
 
3573
 
 
3574
        def test_skip(self):
 
3575
            self.skipTest("Don't need")
 
3576
 
 
3577
    def _get_suite(self):
 
3578
        return TestUtil.TestSuite([
 
3579
            self.Test("test_pass"),
 
3580
            self.Test("test_self_ref"),
 
3581
            self.Test("test_skip"),
 
3582
            ])
 
3583
 
 
3584
    def _run_selftest_with_suite(self, **kwargs):
 
3585
        old_flags = tests.selftest_debug_flags
 
3586
        tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
 
3587
        gc_on = gc.isenabled()
 
3588
        if gc_on:
 
3589
            gc.disable()
 
3590
        try:
 
3591
            output = self._run_selftest(test_suite_factory=self._get_suite,
 
3592
                                        **kwargs)
 
3593
        finally:
 
3594
            if gc_on:
 
3595
                gc.enable()
 
3596
            tests.selftest_debug_flags = old_flags
 
3597
        self.assertNotContainsRe(output, b"Uncollected test case.*test_pass")
 
3598
        self.assertContainsRe(output, b"Uncollected test case.*test_self_ref")
 
3599
        return output
 
3600
 
 
3601
    def test_testsuite(self):
 
3602
        self._run_selftest_with_suite()
 
3603
 
 
3604
    def test_pattern(self):
 
3605
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
 
3606
        self.assertNotContainsRe(out, b"test_skip")
 
3607
 
 
3608
    def test_exclude_pattern(self):
 
3609
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
 
3610
        self.assertNotContainsRe(out, b"test_skip")
 
3611
 
 
3612
    def test_random_seed(self):
 
3613
        self._run_selftest_with_suite(random_seed="now")
 
3614
 
 
3615
    def test_matching_tests_first(self):
 
3616
        self._run_selftest_with_suite(matching_tests_first=True,
 
3617
                                      pattern="test_self_ref$")
 
3618
 
 
3619
    def test_starting_with_and_exclude(self):
 
3620
        out = self._run_selftest_with_suite(starting_with=["bt."],
 
3621
                                            exclude_pattern="test_skip$")
 
3622
        self.assertNotContainsRe(out, b"test_skip")
 
3623
 
 
3624
    def test_additonal_decorator(self):
 
3625
        self._run_selftest_with_suite(suite_decorators=[tests.TestDecorator])
 
3626
 
 
3627
 
 
3628
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
 
3629
    """Check warnings from tests staying alive are emitted with subunit"""
 
3630
 
 
3631
    _test_needs_features = [features.subunit]
 
3632
 
 
3633
    def _run_selftest_with_suite(self, **kwargs):
 
3634
        return TestUncollectedWarnings._run_selftest_with_suite(
 
3635
            self, runner_class=tests.SubUnitBzrRunnerv1, **kwargs)
 
3636
 
 
3637
 
 
3638
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
 
3639
    """Check warnings from tests staying alive are emitted when forking"""
 
3640
 
 
3641
 
 
3642
class TestEnvironHandling(tests.TestCase):
 
3643
 
 
3644
    def test_overrideEnv_None_called_twice_doesnt_leak(self):
 
3645
        self.assertFalse('MYVAR' in os.environ)
 
3646
        self.overrideEnv('MYVAR', '42')
 
3647
        # We use an embedded test to make sure we fix the _captureVar bug
 
3648
 
 
3649
        class Test(tests.TestCase):
 
3650
            def test_me(self):
 
3651
                # The first call save the 42 value
 
3652
                self.overrideEnv('MYVAR', None)
 
3653
                self.assertEqual(None, os.environ.get('MYVAR'))
 
3654
                # Make sure we can call it twice
 
3655
                self.overrideEnv('MYVAR', None)
 
3656
                self.assertEqual(None, os.environ.get('MYVAR'))
 
3657
        output = StringIO()
 
3658
        result = tests.TextTestResult(output, 0, 1)
 
3659
        Test('test_me').run(result)
 
3660
        if not result.wasStrictlySuccessful():
 
3661
            self.fail(output.getvalue())
 
3662
        # We get our value back
 
3663
        self.assertEqual('42', os.environ.get('MYVAR'))
 
3664
 
 
3665
 
 
3666
class TestIsolatedEnv(tests.TestCase):
 
3667
    """Test isolating tests from os.environ.
 
3668
 
 
3669
    Since we use tests that are already isolated from os.environ a bit of care
 
3670
    should be taken when designing the tests to avoid bootstrap side-effects.
 
3671
    The tests start an already clean os.environ which allow doing valid
 
3672
    assertions about which variables are present or not and design tests around
 
3673
    these assertions.
 
3674
    """
 
3675
 
 
3676
    class ScratchMonkey(tests.TestCase):
 
3677
 
 
3678
        def test_me(self):
 
3679
            pass
 
3680
 
 
3681
    def test_basics(self):
 
3682
        # Make sure we know the definition of BRZ_HOME: not part of os.environ
 
3683
        # for tests.TestCase.
 
3684
        self.assertTrue('BRZ_HOME' in tests.isolated_environ)
 
3685
        self.assertEqual(None, tests.isolated_environ['BRZ_HOME'])
 
3686
        # Being part of isolated_environ, BRZ_HOME should not appear here
 
3687
        self.assertFalse('BRZ_HOME' in os.environ)
 
3688
        # Make sure we know the definition of LINES: part of os.environ for
 
3689
        # tests.TestCase
 
3690
        self.assertTrue('LINES' in tests.isolated_environ)
 
3691
        self.assertEqual('25', tests.isolated_environ['LINES'])
 
3692
        self.assertEqual('25', os.environ['LINES'])
 
3693
 
 
3694
    def test_injecting_unknown_variable(self):
 
3695
        # BRZ_HOME is known to be absent from os.environ
 
3696
        test = self.ScratchMonkey('test_me')
 
3697
        tests.override_os_environ(test, {'BRZ_HOME': 'foo'})
 
3698
        self.assertEqual('foo', os.environ['BRZ_HOME'])
 
3699
        tests.restore_os_environ(test)
 
3700
        self.assertFalse('BRZ_HOME' in os.environ)
 
3701
 
 
3702
    def test_injecting_known_variable(self):
 
3703
        test = self.ScratchMonkey('test_me')
 
3704
        # LINES is known to be present in os.environ
 
3705
        tests.override_os_environ(test, {'LINES': '42'})
 
3706
        self.assertEqual('42', os.environ['LINES'])
 
3707
        tests.restore_os_environ(test)
 
3708
        self.assertEqual('25', os.environ['LINES'])
 
3709
 
 
3710
    def test_deleting_variable(self):
 
3711
        test = self.ScratchMonkey('test_me')
 
3712
        # LINES is known to be present in os.environ
 
3713
        tests.override_os_environ(test, {'LINES': None})
 
3714
        self.assertTrue('LINES' not in os.environ)
 
3715
        tests.restore_os_environ(test)
 
3716
        self.assertEqual('25', os.environ['LINES'])
 
3717
 
 
3718
 
 
3719
class TestDocTestSuiteIsolation(tests.TestCase):
 
3720
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.
 
3721
 
 
3722
    Since tests.TestCase alreay provides an isolation from os.environ, we use
 
3723
    the clean environment as a base for testing. To precisely capture the
 
3724
    isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
 
3725
    compare against.
 
3726
 
 
3727
    We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
 
3728
    not `os.environ` so each test overrides it to suit its needs.
 
3729
 
 
3730
    """
 
3731
 
 
3732
    def get_doctest_suite_for_string(self, klass, string):
 
3733
        class Finder(doctest.DocTestFinder):
 
3734
 
 
3735
            def find(*args, **kwargs):
 
3736
                test = doctest.DocTestParser().get_doctest(
 
3737
                    string, {}, 'foo', 'foo.py', 0)
 
3738
                return [test]
 
3739
 
 
3740
        suite = klass(test_finder=Finder())
 
3741
        return suite
 
3742
 
 
3743
    def run_doctest_suite_for_string(self, klass, string):
 
3744
        suite = self.get_doctest_suite_for_string(klass, string)
 
3745
        output = StringIO()
 
3746
        result = tests.TextTestResult(output, 0, 1)
 
3747
        suite.run(result)
 
3748
        return result, output
 
3749
 
 
3750
    def assertDocTestStringSucceds(self, klass, string):
 
3751
        result, output = self.run_doctest_suite_for_string(klass, string)
 
3752
        if not result.wasStrictlySuccessful():
 
3753
            self.fail(output.getvalue())
 
3754
 
 
3755
    def assertDocTestStringFails(self, klass, string):
 
3756
        result, output = self.run_doctest_suite_for_string(klass, string)
 
3757
        if result.wasStrictlySuccessful():
 
3758
            self.fail(output.getvalue())
 
3759
 
 
3760
    def test_injected_variable(self):
 
3761
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
 
3762
        test = """
 
3763
            >>> import os
 
3764
            >>> os.environ['LINES']
 
3765
            '42'
 
3766
            """
 
3767
        # doctest.DocTestSuite fails as it sees '25'
 
3768
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
 
3769
        # tests.DocTestSuite sees '42'
 
3770
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
 
3771
 
 
3772
    def test_deleted_variable(self):
 
3773
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
 
3774
        test = """
 
3775
            >>> import os
 
3776
            >>> os.environ.get('LINES')
 
3777
            """
 
3778
        # doctest.DocTestSuite fails as it sees '25'
 
3779
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
 
3780
        # tests.DocTestSuite sees None
 
3781
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
 
3782
 
 
3783
 
 
3784
class TestSelftestExcludePatterns(tests.TestCase):
 
3785
 
 
3786
    def setUp(self):
 
3787
        super(TestSelftestExcludePatterns, self).setUp()
 
3788
        self.overrideAttr(tests, 'test_suite', self.suite_factory)
 
3789
 
 
3790
    def suite_factory(self, keep_only=None, starting_with=None):
 
3791
        """A test suite factory with only a few tests."""
 
3792
        class Test(tests.TestCase):
 
3793
            def id(self):
 
3794
                # We don't need the full class path
 
3795
                return self._testMethodName
 
3796
 
 
3797
            def a(self):
 
3798
                pass
 
3799
 
 
3800
            def b(self):
 
3801
                pass
 
3802
 
 
3803
            def c(self):
 
3804
                pass
 
3805
        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
 
3806
 
 
3807
    def assertTestList(self, expected, *selftest_args):
 
3808
        # We rely on setUp installing the right test suite factory so we can
 
3809
        # test at the command level without loading the whole test suite
 
3810
        out, err = self.run_bzr(('selftest', '--list') + selftest_args)
 
3811
        actual = out.splitlines()
 
3812
        self.assertEqual(expected, actual)
 
3813
 
 
3814
    def test_full_list(self):
 
3815
        self.assertTestList(['a', 'b', 'c'])
 
3816
 
 
3817
    def test_single_exclude(self):
 
3818
        self.assertTestList(['b', 'c'], '-x', 'a')
 
3819
 
 
3820
    def test_mutiple_excludes(self):
 
3821
        self.assertTestList(['c'], '-x', 'a', '-x', 'b')
 
3822
 
 
3823
 
 
3824
class TestCounterHooks(tests.TestCase, SelfTestHelper):
 
3825
 
 
3826
    _test_needs_features = [features.subunit]
 
3827
 
 
3828
    def setUp(self):
 
3829
        super(TestCounterHooks, self).setUp()
 
3830
 
 
3831
        class Test(tests.TestCase):
 
3832
 
 
3833
            def setUp(self):
 
3834
                super(Test, self).setUp()
 
3835
                self.hooks = hooks.Hooks()
 
3836
                self.hooks.add_hook('myhook', 'Foo bar blah', (2, 4))
 
3837
                self.install_counter_hook(self.hooks, 'myhook')
 
3838
 
 
3839
            def no_hook(self):
 
3840
                pass
 
3841
 
 
3842
            def run_hook_once(self):
 
3843
                for hook in self.hooks['myhook']:
 
3844
                    hook(self)
 
3845
 
 
3846
        self.test_class = Test
 
3847
 
 
3848
    def assertHookCalls(self, expected_calls, test_name):
 
3849
        test = self.test_class(test_name)
 
3850
        result = unittest.TestResult()
 
3851
        test.run(result)
 
3852
        self.assertTrue(hasattr(test, '_counters'))
 
3853
        self.assertTrue('myhook' in test._counters)
 
3854
        self.assertEqual(expected_calls, test._counters['myhook'])
 
3855
 
 
3856
    def test_no_hook(self):
 
3857
        self.assertHookCalls(0, 'no_hook')
 
3858
 
 
3859
    def test_run_hook_once(self):
 
3860
        tt = features.testtools
 
3861
        if tt.module.__version__ < (0, 9, 8):
 
3862
            raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
 
3863
        self.assertHookCalls(1, 'run_hook_once')