/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_selftest.py

  • Committer: Gustav Hartvigsson
  • Date: 2021-01-09 21:36:27 UTC
  • Revision ID: gustav.hartvigsson@gmail.com-20210109213627-h1xwcutzy9m7a99b
Added 'Case Preserving Working Tree Use Cases' from Canonical Wiki

* Added a page from the Canonical Bazaar wiki
  with information on the semantics of case
  preserving filesystems and how a case insensitive
  filesystem works.
  
  * Needs re-work, but this will do as it is the
    same information as what was on the linked
    page in the current documentation.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2013, 2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the test framework."""
 
18
 
 
19
import gc
 
20
import doctest
 
21
from functools import reduce
 
22
from io import BytesIO, StringIO, TextIOWrapper
 
23
import os
 
24
import signal
 
25
import sys
 
26
import threading
 
27
import time
 
28
import unittest
 
29
import warnings
 
30
 
 
31
from testtools import (
 
32
    ExtendedToOriginalDecorator,
 
33
    MultiTestResult,
 
34
    )
 
35
from testtools.content import Content
 
36
from testtools.content_type import ContentType
 
37
from testtools.matchers import (
 
38
    DocTestMatches,
 
39
    Equals,
 
40
    )
 
41
import testtools.testresult.doubles
 
42
 
 
43
import breezy
 
44
from .. import (
 
45
    branchbuilder,
 
46
    controldir,
 
47
    errors,
 
48
    hooks,
 
49
    lockdir,
 
50
    memorytree,
 
51
    osutils,
 
52
    repository,
 
53
    symbol_versioning,
 
54
    tests,
 
55
    transport,
 
56
    workingtree,
 
57
    )
 
58
from ..bzr import (
 
59
    bzrdir,
 
60
    remote,
 
61
    workingtree_3,
 
62
    workingtree_4,
 
63
    )
 
64
from ..bzr import (
 
65
    groupcompress_repo,
 
66
    )
 
67
from ..git import (
 
68
    workingtree as git_workingtree,
 
69
    )
 
70
from ..symbol_versioning import (
 
71
    deprecated_function,
 
72
    deprecated_in,
 
73
    deprecated_method,
 
74
    )
 
75
from . import (
 
76
    features,
 
77
    test_lsprof,
 
78
    test_server,
 
79
    TestUtil,
 
80
    )
 
81
from ..trace import note, mutter
 
82
from ..transport import memory
 
83
 
 
84
 
 
85
def _test_ids(test_suite):
    """Return a list with the id() of each test yielded for test_suite.

    The tests are enumerated with tests.iter_suite_tests, in the order that
    helper yields them.
    """
    collected_ids = []
    for test in tests.iter_suite_tests(test_suite):
        collected_ids.append(test.id())
    return collected_ids
 
88
 
 
89
 
 
90
class MetaTestLog(tests.TestCase):
    """Check that text passed to self.log() is attached to the test details."""

    def test_logging(self):
        """Test logs are captured when a test fails."""
        self.log('a test message')
        log_detail = self.getDetails()['log']
        # The 'log' detail must be declared as utf8-encoded plain text.
        utf8_plain_text = ContentType("text", "plain", {"charset": "utf8"})
        self.assertThat(log_detail.content_type, Equals(utf8_plain_text))
        # The detail content and get_log() must expose the same text.
        detail_text = u"".join(log_detail.iter_text())
        self.assertThat(detail_text, Equals(self.get_log()))
        # The log is allowed a leading prefix but must end with our message.
        self.assertThat(
            self.get_log(),
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
 
102
 
 
103
 
 
104
class TestTreeShape(tests.TestCaseInTempDir):
    """Check that build_tree_contents copes with a non-ASCII file name."""

    def test_unicode_paths(self):
        """A file with a non-ASCII name exists after being built."""
        # Skipped unless the UnicodeFilenameFeature is available.
        self.requireFeature(features.UnicodeFilenameFeature)

        unicode_name = u'hell\u00d8'
        tree_shape = [(unicode_name, b'contents of hello')]
        self.build_tree_contents(tree_shape)
        self.assertPathExists(unicode_name)
 
112
 
 
113
 
 
114
class TestClassesAvailable(tests.TestCase):
    """As a convenience we expose Test* classes from breezy.tests"""

    # Every test below passes as long as its import does not raise; the
    # imported name is intentionally left unused.

    def test_test_case(self):
        """TestCase can be imported from this package."""
        from . import TestCase

    def test_test_loader(self):
        """TestLoader can be imported from this package."""
        from . import TestLoader

    def test_test_suite(self):
        """TestSuite can be imported from this package."""
        from . import TestSuite
 
125
 
 
126
 
 
127
class TestTransportScenarios(tests.TestCase):
    """A group of tests that test the transport implementation adaption core.

    This is a meta test that the tests are applied to all available
    transports.

    This will be generalised in the future which is why it is in this
    test file even though it is specific to transport tests at the moment.
    """

    def test_get_transport_permutations(self):
        """get_transport_test_permutations returns the module's own list.

        The value handed back must be whatever the module's
        get_test_permutations() produced.
        """
        from .per_transport import get_transport_test_permutations
        sample_permutation = [(1, 2), (3, 4)]

        class MockModule(object):
            """Stand-in for a transport module."""

            def get_test_permutations(self):
                return sample_permutation

        permutations = get_transport_test_permutations(MockModule())
        self.assertEqual(sample_permutation, permutations)

    def test_scenarios_include_all_modules(self):
        """The scenario count equals the sum over all transport modules.

        We assume that if the totals match the generator is probably doing
        the right thing, especially in combination with the tests for
        setting the right classes below.
        """
        from .per_transport import transport_test_permutations
        from ..transport import _get_transport_modules
        permutation_count = 0
        for module in _get_transport_modules():
            try:
                # __import__ returns the top-level package, so walk down the
                # remaining dotted components to reach the module's
                # get_test_permutations callable.
                attribute_path = (
                    module + ".get_test_permutations").split('.')[1:]
                get_permutations = reduce(
                    getattr, attribute_path, __import__(module))
                permutation_count += len(get_permutations())
            except errors.DependencyNotPresent:
                # Modules whose dependencies are missing contribute nothing.
                pass
        scenarios = transport_test_permutations()
        self.assertEqual(permutation_count, len(scenarios))

    def test_scenarios_include_transport_class(self):
        """Each scenario carries a transport class and a server class.

        This test used to know about all the possible transports and the
        order they were returned but that seems overly brittle (mbp
        20060307), so only the first scenario is inspected.
        """
        from .per_transport import transport_test_permutations
        scenarios = transport_test_permutations()
        # there are at least that many builtin transports
        self.assertTrue(len(scenarios) > 6)
        first_scenario = scenarios[0]
        self.assertIsInstance(first_scenario[0], str)
        transport_class = first_scenario[1]["transport_class"]
        self.assertTrue(
            issubclass(transport_class, breezy.transport.Transport))
        server_class = first_scenario[1]["transport_server"]
        self.assertTrue(issubclass(server_class, breezy.transport.Server))
 
182
 
 
183
 
 
184
class TestBranchScenarios(tests.TestCase):
    """Check the scenarios produced by per_branch.make_scenarios."""

    def test_scenarios(self):
        """Constructor parameters are passed through to the adapted test."""
        from .per_branch import make_scenarios

        def expected_scenario(branch_format, bzrdir_format):
            # Both servers are the same for every scenario; only the format
            # pair varies.
            return ('str', {
                'branch_format': branch_format,
                'bzrdir_format': bzrdir_format,
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                })

        format_pairs = [("c", "C"), ("d", "D")]
        scenarios = make_scenarios("a", "b", format_pairs)
        self.assertEqual(2, len(scenarios))
        self.assertEqual(
            [expected_scenario('c', 'C'), expected_scenario('d', 'D')],
            scenarios)
 
207
 
 
208
 
 
209
class TestBzrDirScenarios(tests.TestCase):
    """Check the scenarios produced by per_controldir.make_scenarios."""

    def test_scenarios(self):
        """Constructor parameters are passed through to the adapted test."""
        from .per_controldir import make_scenarios

        def expected_scenario(bzrdir_format):
            # The vfs factory and both servers are shared by every scenario.
            return ('str', {
                'bzrdir_format': bzrdir_format,
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                'vfs_transport_factory': 'v',
                })

        scenarios = make_scenarios("v", "a", "b", ["c", "d"])
        self.assertEqual(
            [expected_scenario('c'), expected_scenario('d')],
            scenarios)
 
232
 
 
233
 
 
234
class TestRepositoryScenarios(tests.TestCase):
    """Check the scenarios produced by per_repository.formats_to_scenarios."""

    def test_formats_to_scenarios(self):
        """Scenarios carry vfs_transport_factory only when one is given."""
        from .per_repository import formats_to_scenarios

        def expected_scenarios(extra_params):
            # Build fresh format instances for each expectation; every
            # scenario additionally receives extra_params.
            remote_params = {
                'bzrdir_format': remote.RemoteBzrDirFormat(),
                'repository_format': remote.RemoteRepositoryFormat(),
                'transport_readonly_server': 'readonly',
                'transport_server': 'server',
                }
            format_2a_params = {
                'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
                'repository_format': groupcompress_repo.RepositoryFormat2a(),
                'transport_readonly_server': 'readonly',
                'transport_server': 'server',
                }
            remote_params.update(extra_params)
            format_2a_params.update(extra_params)
            return [
                ('RemoteRepositoryFormat(c)', remote_params),
                ('RepositoryFormat2a(d)', format_2a_params),
                ]

        format_2a = repository.format_registry.get(
            b'Bazaar repository format 2a (needs bzr 1.16 or later)\n')
        formats = [("(c)", remote.RemoteRepositoryFormat()),
                   ("(d)", format_2a)]
        no_vfs_scenarios = formats_to_scenarios(
            formats, "server", "readonly", None)
        vfs_scenarios = formats_to_scenarios(
            formats, "server", "readonly", vfs_transport_factory="vfs")
        # no_vfs generate scenarios without vfs_transport_factory
        self.assertEqual(expected_scenarios({}), no_vfs_scenarios)
        self.assertEqual(
            expected_scenarios({'vfs_transport_factory': 'vfs'}),
            vfs_scenarios)
 
272
 
 
273
 
 
274
class TestTestScenarioApplication(tests.TestCase):
    """Tests for the test adaption facilities."""

    def test_apply_scenario(self):
        """apply_scenario returns a new, parameterised test each time."""
        from breezy.tests import apply_scenario
        input_test = TestTestScenarioApplication("test_apply_scenario")
        # setup two adapted tests
        full_scenario = (
            "new id",
            {"bzrdir_format": "bzr_format",
             "repository_format": "repo_fmt",
             "transport_server": "transport_server",
             "transport_readonly_server": "readonly-server"})
        minimal_scenario = ("new id 2", {"bzrdir_format": None})
        adapted_test1 = apply_scenario(input_test, full_scenario)
        adapted_test2 = apply_scenario(input_test, minimal_scenario)
        # input_test itself must not have gained any scenario attribute.
        self.assertRaises(AttributeError, getattr, input_test, "bzrdir_format")
        # the new tests are mutually incompatible, ensuring it has
        # made new ones, and unspecified elements in the scenario
        # should not have been altered.
        self.assertEqual("bzr_format", adapted_test1.bzrdir_format)
        self.assertEqual("repo_fmt", adapted_test1.repository_format)
        self.assertEqual("transport_server", adapted_test1.transport_server)
        self.assertEqual(
            "readonly-server", adapted_test1.transport_readonly_server)
        # The scenario name is appended in parentheses to the test id.
        self.assertEqual(
            "breezy.tests.test_selftest.TestTestScenarioApplication."
            "test_apply_scenario(new id)",
            adapted_test1.id())
        self.assertEqual(None, adapted_test2.bzrdir_format)
        self.assertEqual(
            "breezy.tests.test_selftest.TestTestScenarioApplication."
            "test_apply_scenario(new id 2)",
            adapted_test2.id())
 
308
 
 
309
 
 
310
class TestInterRepositoryScenarios(tests.TestCase):
    """Check the scenarios produced by per_interrepository.make_scenarios."""

    def test_scenarios(self):
        """Constructor parameters are passed through to the adapted test."""
        from .per_interrepository import make_scenarios

        def expected_scenario(name, format_from, format_to, extra_setup):
            # Both servers are shared; the remaining values come from the
            # corresponding 4-tuple in the formats list.
            return (name, {
                'repository_format': format_from,
                'repository_format_to': format_to,
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                'extra_setup': extra_setup,
                })

        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
        scenarios = make_scenarios("a", "b", formats)
        self.assertEqual(
            [expected_scenario('C0,str,str', 'C1', 'C2', 'C3'),
             expected_scenario('D0,str,str', 'D1', 'D2', 'D3')],
            scenarios)
 
334
 
 
335
 
 
336
class TestWorkingTreeScenarios(tests.TestCase):
    """Check the scenarios produced by per_workingtree.make_scenarios."""

    def test_scenarios(self):
        """Constructor parameters are passed through to the adapted test.

        One scenario is expected per supplied format, followed by a single
        'WorkingTreeFormat6,remote' scenario built from the remote servers.
        """
        from .per_workingtree import make_scenarios

        def local_scenario(name, wt_format):
            # Scenario expected for a format served by the plain servers.
            return (name, {
                'bzrdir_format': wt_format._matchingcontroldir,
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                'workingtree_format': wt_format,
                })

        format_4 = workingtree_4.WorkingTreeFormat4()
        format_3 = workingtree_3.WorkingTreeFormat3()
        format_6 = workingtree_4.WorkingTreeFormat6()
        scenarios = make_scenarios(
            "a", "b", [format_4, format_3, format_6],
            remote_server='c', remote_readonly_server='d',
            remote_backing_server='e')
        remote_scenario = (
            'WorkingTreeFormat6,remote',
            {'bzrdir_format': format_6._matchingcontroldir,
             'repo_is_remote': True,
             'transport_readonly_server': 'd',
             'transport_server': 'c',
             'vfs_transport_factory': 'e',
             'workingtree_format': format_6})
        expected = [
            local_scenario('WorkingTreeFormat4', format_4),
            local_scenario('WorkingTreeFormat3', format_3),
            local_scenario('WorkingTreeFormat6', format_6),
            remote_scenario,
            ]
        self.assertEqual(expected, scenarios)
 
374
 
 
375
 
 
376
class TestTreeScenarios(tests.TestCase):
    """Check the scenarios produced by per_tree.make_scenarios."""

    def test_scenarios(self):
        """The tree scenario generator yields the expected nine scenarios.

        The expected list holds one scenario per supplied working tree
        format and a remote WorkingTreeFormat6 one, all converting with
        return_parameter, followed by revision tree, dirstate revision tree
        and preview tree scenarios that each use their own
        _workingtree_to_test_tree converter.
        """
        from .per_tree import (
            _dirstate_tree_from_workingtree,
            make_scenarios,
            preview_tree_pre,
            preview_tree_post,
            return_parameter,
            revision_tree_from_workingtree
            )

        def local_scenario(name, wt_format, converter):
            # Scenario expected for a tree served by the plain servers.
            return (name, {
                '_workingtree_to_test_tree': converter,
                'bzrdir_format': wt_format._matchingcontroldir,
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                'workingtree_format': wt_format,
                })

        formats = [workingtree_4.WorkingTreeFormat4(),
                   workingtree_3.WorkingTreeFormat3()]
        scenarios = make_scenarios("a", "b", formats)
        self.assertEqual(9, len(scenarios))
        default_wt_format = workingtree.format_registry.get_default()
        wt4_format = workingtree_4.WorkingTreeFormat4()
        wt5_format = workingtree_4.WorkingTreeFormat5()
        wt6_format = workingtree_4.WorkingTreeFormat6()
        git_wt_format = git_workingtree.GitWorkingTreeFormat()
        # The remote scenario is expected to use the smart TCP test servers
        # with a memory server as its vfs transport factory.
        remote_scenario = (
            'WorkingTreeFormat6,remote',
            {'bzrdir_format': wt6_format._matchingcontroldir,
             'repo_is_remote': True,
             'transport_readonly_server':
                 test_server.ReadonlySmartTCPServer_for_testing,
             'transport_server': test_server.SmartTCPServer_for_testing,
             'vfs_transport_factory': memory.MemoryServer,
             'workingtree_format': wt6_format,
             '_workingtree_to_test_tree': return_parameter,
             })
        expected_scenarios = [
            local_scenario('WorkingTreeFormat4', formats[0],
                           return_parameter),
            local_scenario('WorkingTreeFormat3', formats[1],
                           return_parameter),
            remote_scenario,
            local_scenario('RevisionTree', default_wt_format,
                           revision_tree_from_workingtree),
            local_scenario('GitRevisionTree', git_wt_format,
                           revision_tree_from_workingtree),
            local_scenario('DirStateRevisionTree,WT4', wt4_format,
                           _dirstate_tree_from_workingtree),
            local_scenario('DirStateRevisionTree,WT5', wt5_format,
                           _dirstate_tree_from_workingtree),
            local_scenario('PreviewTree', default_wt_format,
                           preview_tree_pre),
            local_scenario('PreviewTreePost', default_wt_format,
                           preview_tree_post),
            ]
        self.assertEqual(expected_scenarios, scenarios)
 
477
 
 
478
 
 
479
class TestInterTreeScenarios(tests.TestCase):
    """A group of tests that test the InterTreeTestAdapter."""

    def test_scenarios(self):
        """Constructor parameters are passed through to the adapted test.

        Each entry in the formats list is a 5-tuple of (scenario name,
        intertree class, source format, target format, tree converter), and
        each one must appear unchanged in the matching scenario.
        """
        # for InterTree tests we want the machinery to bring up two trees in
        # each instance: the base one, and the one we are interacting with.
        # because each optimiser can be direction specific, we need to test
        # each optimiser in its chosen direction.
        # unlike the TestProviderAdapter we dont want to automatically add a
        # parameterized one for WorkingTree - the optimisers will tell us what
        # ones to add.
        from .per_tree import (
            return_parameter,
            )
        from .per_intertree import (
            make_scenarios,
            )
        from ..bzr.workingtree_3 import WorkingTreeFormat3
        from ..bzr.workingtree_4 import WorkingTreeFormat4
        server1 = "a"
        server2 = "b"
        format1 = WorkingTreeFormat4()
        format2 = WorkingTreeFormat3()
        # str and int merely stand in for intertree classes here.
        formats = [("1", str, format1, format2, "converter1"),
                   ("2", int, format2, format1, "converter2")]
        scenarios = make_scenarios(server1, server2, formats)
        self.assertEqual(2, len(scenarios))
        expected_scenarios = [
            ("1", {
                "bzrdir_format": format1._matchingcontroldir,
                "intertree_class": formats[0][1],
                "workingtree_format": formats[0][2],
                "workingtree_format_to": formats[0][3],
                "mutable_trees_to_test_trees": formats[0][4],
                "_workingtree_to_test_tree": return_parameter,
                "transport_server": server1,
                "transport_readonly_server": server2,
                }),
            ("2", {
                "bzrdir_format": format2._matchingcontroldir,
                "intertree_class": formats[1][1],
                "workingtree_format": formats[1][2],
                "workingtree_format_to": formats[1][3],
                "mutable_trees_to_test_trees": formats[1][4],
                "_workingtree_to_test_tree": return_parameter,
                "transport_server": server1,
                "transport_readonly_server": server2,
                }),
            ]
        # Expected value first, matching the other scenario tests in this
        # file.
        self.assertEqual(expected_scenarios, scenarios)
 
533
 
 
534
 
 
535
class TestTestCaseInTempDir(tests.TestCaseInTempDir):
    """Tests for the environment and helpers of TestCaseInTempDir."""

    def test_home_is_not_working(self):
        """The home dir is distinct from the working (test) dir."""
        self.assertNotEqual(self.test_dir, self.test_home_dir)
        current_dir = osutils.getcwd()
        self.assertIsSameRealPath(self.test_dir, current_dir)
        self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME'])

    def test_assertEqualStat_equal(self):
        """assertEqualStat accepts a fake stat copied from a real one."""
        from ..bzr.tests.test_dirstate import _FakeStat
        self.build_tree(["foo"])
        real_stat = os.lstat("foo")
        fake_stat = _FakeStat(
            real_stat.st_size, real_stat.st_mtime, real_stat.st_ctime,
            real_stat.st_dev, real_stat.st_ino, real_stat.st_mode)
        self.assertEqualStat(real_stat, fake_stat)

    def test_assertEqualStat_notequal(self):
        """assertEqualStat fails for the stats of two different files."""
        self.build_tree(["foo", "longname"])
        foo_stat = os.lstat("foo")
        longname_stat = os.lstat("longname")
        self.assertRaises(
            AssertionError, self.assertEqualStat, foo_stat, longname_stat)

    def test_assertPathExists(self):
        """assertPathExists and assertPathDoesNotExist see the built tree."""
        self.assertPathExists('.')
        self.build_tree(['foo/', 'foo/bar'])
        self.assertPathExists('foo/bar')
        self.assertPathDoesNotExist('foo/foo')
 
561
 
 
562
 
 
563
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
    """Tests for the environment and helpers of TestCaseWithMemoryTransport."""

    def test_home_is_non_existant_dir_under_root(self):
        """The test_home_dir for TestCaseWithMemoryTransport is missing.

        This is because TestCaseWithMemoryTransport is for tests that do not
        need any disk resources: they should be hooked into breezy in such a
        way that no global settings are being changed by the test (only a
        few tests should need to do that), and having a missing dir as home is
        an effective way to ensure that this is the case.
        """
        expected_home = self.TEST_ROOT + "/MemoryTransportMissingHomeDir"
        self.assertIsSameRealPath(expected_home, self.test_home_dir)
        self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME'])

    def test_cwd_is_TEST_ROOT(self):
        """Both test_dir and the current directory are TEST_ROOT."""
        self.assertIsSameRealPath(self.test_dir, self.TEST_ROOT)
        current_dir = osutils.getcwd()
        self.assertIsSameRealPath(self.test_dir, current_dir)

    def test_BRZ_HOME_and_HOME_are_bytestrings(self):
        """The $BRZ_HOME and $HOME environment variables should be str.

        See https://bugs.launchpad.net/bzr/+bug/464174
        """
        for variable in ('BRZ_HOME', 'HOME'):
            self.assertIsInstance(os.environ[variable], str)

    def test_make_branch_and_memory_tree(self):
        """In TestCaseWithMemoryTransport we should not make the branch on disk.

        This is hard to comprehensively robustly test, so we settle for making
        a branch and checking no directory was created at its relpath.
        """
        memory_tree = self.make_branch_and_memory_tree('dir')
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.assertFalse(osutils.lexists('dir'))
        self.assertIsInstance(memory_tree, memorytree.MemoryTree)

    def test_make_branch_and_memory_tree_with_format(self):
        """make_branch_and_memory_tree should accept a format option."""
        dir_format = bzrdir.BzrDirMetaFormat1()
        dir_format.repository_format = repository.format_registry.get_default()
        memory_tree = self.make_branch_and_memory_tree(
            'dir', format=dir_format)
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.assertFalse(osutils.lexists('dir'))
        self.assertIsInstance(memory_tree, memorytree.MemoryTree)
        self.assertEqual(
            dir_format.repository_format.__class__,
            memory_tree.branch.repository._format.__class__)

    def test_make_branch_builder(self):
        """make_branch_builder returns a BranchBuilder and stays off disk."""
        branch_builder = self.make_branch_builder('dir')
        self.assertIsInstance(branch_builder, branchbuilder.BranchBuilder)
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.assertFalse(osutils.lexists('dir'))

    def test_make_branch_builder_with_format(self):
        """make_branch_builder honours a format object."""
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
        # that the format objects are used.
        dir_format = bzrdir.BzrDirMetaFormat1()
        repo_format = repository.format_registry.get_default()
        dir_format.repository_format = repo_format
        branch_builder = self.make_branch_builder('dir', format=dir_format)
        built_branch = branch_builder.get_branch()
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.assertFalse(osutils.lexists('dir'))
        self.assertEqual(
            dir_format.repository_format.__class__,
            built_branch.repository._format.__class__)
        # The format marker stored in the repository must match too.
        self.assertEqual(
            repo_format.get_format_string(),
            self.get_transport().get_bytes('dir/.bzr/repository/format'))

    def test_make_branch_builder_with_format_name(self):
        """make_branch_builder honours a format given by registry name."""
        branch_builder = self.make_branch_builder('dir', format='knit')
        built_branch = branch_builder.get_branch()
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.assertFalse(osutils.lexists('dir'))
        knit_dir_format = controldir.format_registry.make_controldir('knit')
        self.assertEqual(
            knit_dir_format.repository_format.__class__,
            built_branch.repository._format.__class__)
        self.assertEqual(
            b'Bazaar-NG Knit Repository Format 1',
            self.get_transport().get_bytes('dir/.bzr/repository/format'))

    def test_dangling_locks_cause_failures(self):
        """A lock left held by a test is reported when checks are thorough."""
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
            def test_function(self):
                # Take a lock and deliberately never release it.
                lock_transport = self.get_transport_from_path('.')
                dangling_lock = lockdir.LockDir(lock_transport, 'lock')
                dangling_lock.create()
                dangling_lock.attempt_lock()

        result = TestDanglingLock('test_function').run()
        total_failures = result.errors + result.failures
        # When _lock_check_thorough is disabled, then we don't trigger a
        # failure
        expected_failure_count = 1 if self._lock_check_thorough else 0
        self.assertEqual(expected_failure_count, len(total_failures))
 
669
 
 
670
 
 
671
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
    """Exercise the helper methods that TestCaseWithTransport provides."""

    def test_get_readonly_url_none(self):
        """With no readonly server set, readonly URLs open as decorators."""
        from ..transport.readonly import ReadonlyTransportDecorator
        self.vfs_transport_factory = memory.MemoryServer
        self.transport_readonly_server = None
        # Asking for readonly URLs here has to hand back URLs that open as a
        # readonly decorator.
        root_url = self.get_readonly_url()
        nested_url = self.get_readonly_url('foo/bar')
        root_transport = transport.get_transport_from_url(root_url)
        nested_transport = transport.get_transport_from_url(nested_url)
        for opened in (root_transport, nested_transport):
            self.assertIsInstance(opened, ReadonlyTransportDecorator)
        # The nested base, minus its final character, must be the path the
        # root transport computes for 'foo/bar'.
        self.assertEqual(
            nested_transport.base[:-1], root_transport.abspath('foo/bar'))

    def test_get_readonly_url_http(self):
        """With HttpServer as readonly server, URLs open as HttpTransport."""
        from .http_server import HttpServer
        from ..transport.http import HttpTransport
        self.transport_server = test_server.LocalURLServer
        self.transport_readonly_server = HttpServer
        # The readonly URLs are now expected to point at an HTTP server.
        root_url = self.get_readonly_url()
        nested_url = self.get_readonly_url('foo/bar')
        # Any HttpTransport subclass is acceptable for the opened transports.
        root_transport = transport.get_transport_from_url(root_url)
        nested_transport = transport.get_transport_from_url(nested_url)
        for opened in (root_transport, nested_transport):
            self.assertIsInstance(opened, HttpTransport)
        self.assertEqual(
            nested_transport.base[:-1], root_transport.abspath('foo/bar'))

    def test_is_directory(self):
        """assertIsDirectory passes for a directory and fails otherwise"""
        tree_transport = self.get_transport()
        self.build_tree(['a_dir/', 'a_file'], transport=tree_transport)
        self.assertIsDirectory('a_dir', tree_transport)
        # A plain file and a missing path must both be rejected.
        for not_a_directory in ('a_file', 'not_here'):
            self.assertRaises(
                AssertionError, self.assertIsDirectory, not_a_directory,
                tree_transport)

    def test_make_branch_builder(self):
        """make_branch_builder creates a branch on disk without a tree."""
        branch_builder = self.make_branch_builder('dir')
        committed_rev = branch_builder.build_commit()
        self.assertPathExists('dir')
        opened_dir = controldir.ControlDir.open('dir')
        self.assertRaises(errors.NoWorkingTree, opened_dir.open_workingtree)
        reopened_branch = opened_dir.open_branch()
        built_branch = branch_builder.get_branch()
        self.assertEqual(reopened_branch.base, built_branch.base)
        # Both views of the branch must agree on the single commit made.
        expected_info = (1, committed_rev)
        for branch in (built_branch, reopened_branch):
            self.assertEqual(expected_info, branch.last_revision_info())
 
723
 
 
724
 
 
725
class TestTestCaseTransports(tests.TestCaseWithTransport):
    """Checks that run with MemoryServer as the vfs transport factory."""

    def setUp(self):
        super().setUp()
        self.vfs_transport_factory = memory.MemoryServer

    def test_make_controldir_preserves_transport(self):
        """make_controldir keeps the control dir on a MemoryTransport."""
        # The returned transport is not inspected by this test.
        # NOTE(review): presumably the call is only needed for its side
        # effects - confirm before removing it.
        unused_transport = self.get_transport()
        made_controldir = self.make_controldir('subdir')
        self.assertIsInstance(
            made_controldir.transport, memory.MemoryTransport)
        # The control directory lives in memory only, never on disk.
        self.assertPathDoesNotExist('subdir')
 
738
 
 
739
 
 
740
class TestChrootedTest(tests.ChrootedTestCase):
    """Checks on the readonly transport of a ChrootedTestCase."""

    def test_root_is_root(self):
        """Cloning '..' from the readonly transport stays at the same base."""
        readonly_url = self.get_readonly_url()
        chrooted = transport.get_transport_from_url(readonly_url)
        root_base = chrooted.base
        parent_base = chrooted.clone('..').base
        self.assertEqual(root_base, parent_base)
 
746
 
 
747
 
 
748
class TestProfileResult(tests.TestCase):
    """Tests for tests.ProfileResult."""

    def test_profiles_tests(self):
        """Running a test through ProfileResult records one benchmark call."""
        self.requireFeature(features.lsprof_feature)
        recorder = testtools.testresult.doubles.ExtendedTestResult()
        profiling_result = tests.ProfileResult(recorder)

        class Sample(tests.TestCase):
            def a(self):
                self.sample_function()

            def sample_function(self):
                pass

        sample = Sample("a")
        sample.run(profiling_result)
        # The first event forwarded to the recorder carries the test that ran.
        profiled_case = recorder._events[0][1]
        self.assertLength(1, profiled_case._benchcalls)
        # The entry has to unpack as a 3-tuple plus stats, which is the shape
        # the test reporting code wants.
        (_a, _b, _c), profile_stats = profiled_case._benchcalls[0]
        self.assertTrue(callable(profile_stats.pprint))
 
768
 
 
769
 
 
770
class TestTestResult(tests.TestCase):
    """Tests for the reporting done by the breezy test result classes."""

    def check_timing(self, test_case, expected_re):
        """Run test_case and check the time string reported for it.

        :param test_case: The test to run against a TextTestResult.
        :param expected_re: Regular expression the time string must contain.
        """
        text_result = tests.TextTestResult(
            StringIO(), descriptions=0, verbosity=1)
        recorder = testtools.testresult.doubles.ExtendedTestResult()
        test_case.run(MultiTestResult(text_result, recorder))
        # The first event seen by the recorder carries the test that ran.
        executed_case = recorder._events[0][1]
        reported_time = text_result._testTimeString(executed_case)
        self.assertContainsRe(reported_time, expected_re)

    def test_test_reporting(self):
        """Plain and benchmarked tests both report a time in milliseconds."""
        class ShortDelayTestCase(tests.TestCase):
            def test_short_delay(self):
                time.sleep(0.003)

            def test_short_benchmark(self):
                self.time(time.sleep, 0.003)

        plain_case = ShortDelayTestCase('test_short_delay')
        self.check_timing(plain_case, r"^ +[0-9]+ms$")
        # When a benchmark time was recorded, only that time is shown and it
        # is followed by a star.
        benchmark_case = ShortDelayTestCase('test_short_benchmark')
        self.check_timing(benchmark_case, r"^ +[0-9]+ms\*$")

    def test_unittest_reporting_unittest_class(self):
        """Timing is also reported for a plain unittest.TestCase."""
        # The test class below deliberately does not derive from breezy.
        class ShortDelayTestCase(unittest.TestCase):
            def test_short_delay(self):
                time.sleep(0.003)

        plain_case = ShortDelayTestCase('test_short_delay')
        self.check_timing(plain_case, r"^ +[0-9]+ms$")

    def _time_hello_world_encoding(self):
        """Time two str() calls, one on b'hello' and one on b'world'.

        This is used to exercise the test framework.
        """
        for payload in (b'hello', b'world'):
            self.time(str, payload, errors='replace')

    def test_lsprofiling(self):
        """Verbose test result prints lsprof statistics from test cases."""
        self.requireFeature(features.lsprof_feature)
        result_stream = StringIO()
        verbose_result = breezy.tests.VerboseTestResult(
            result_stream, descriptions=0, verbosity=2)
        # A profiled call has to show up in what addSuccess writes out.
        # addError and addFailure are not checked; they matter little for
        # performance tuning.
        # Build a fresh instance of this class whose test method is the
        # timing helper, and ask it to gather lsprof data while timing.
        profiled_case = TestTestResult("_time_hello_world_encoding")
        profiled_case._gather_lsprof_in_benchmarks = True
        # Running it should succeed and record one profile per timed call.
        profiled_case.run(verbose_result)
        # The stream should now hold, for the b'hello' call and then again
        # for the b'world' call:
        #  - an "LSProf output for <class 'str'>(...)" heading,
        #  - the lsprof column header line,
        #  - an arbitrary number of statistics lines.
        # The last pattern is wrapped in "(...)?" and so is optional.
        output = result_stream.getvalue()
        expected_patterns = (
            r"LSProf output for <class 'str'>\(\(b'hello',\), {'errors': 'replace'}\)",
            r"LSProf output for <class 'str'>\(\(b'world',\), {'errors': 'replace'}\)",
            r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n",
            r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?",
        )
        for pattern in expected_patterns:
            self.assertContainsRe(output, pattern)

    def test_uses_time_from_testtools(self):
        """Test case timings in verbose results should use testtools times"""
        import datetime

        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
            # Feeds fixed timestamps in through time() right before the
            # start and success notifications are passed on.
            def startTest(self, test):
                self.time(datetime.datetime.utcfromtimestamp(1.145))
                super().startTest(test)

            def addSuccess(self, test):
                self.time(datetime.datetime.utcfromtimestamp(51.147))
                super().addSuccess(test)

            def report_tests_starting(self):
                pass

        output = StringIO()
        passing = self.get_passing_test()
        passing.run(TimeAddedVerboseTestResult(output, 0, 2))
        # 51.147s - 1.145s is the 50002ms that must be reported.
        self.assertEndsWith(output.getvalue(), "OK    50002ms\n")

    def test_known_failure(self):
        """Using knownFailure should trigger several result actions."""
        class InstrumentedTestResult(tests.ExtendedTestResult):
            def stopTestRun(self):
                pass

            def report_tests_starting(self):
                pass

            def report_known_failure(self, test, err=None, details=None):
                self._call = (test, 'known failure')

        result = InstrumentedTestResult(None, None, None, None)

        class Test(tests.TestCase):
            def test_function(self):
                self.knownFailure('failed!')

        failing_case = Test("test_function")
        failing_case.run(result)
        # report_known_failure must have been invoked for our test.
        reported = result._call
        self.assertEqual(2, len(reported))
        self.assertEqual(failing_case.id(), reported[0].id())
        self.assertEqual('known failure', reported[1])
        # The traceback is not inspected; if everything else is right it
        # would be exceptional for it to be wrong.
        # The known failure counter on the result must have been bumped.
        self.assertEqual(1, result.known_failure_count)
        # A known failure still leaves the result successful.
        self.assertTrue(result.wasSuccessful())

    def test_verbose_report_known_failure(self):
        """Verbose output for a known failure shows XFAIL and the reason."""
        result_stream = StringIO()
        verbose_result = breezy.tests.VerboseTestResult(
            result_stream, descriptions=0, verbosity=2)
        _get_test("test_xfail").run(verbose_result)
        expected_re = (
            "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
            "\\s*(?:Text attachment: )?reason"
            "(?:\n-+\n|: {{{)"
            "this_fails"
            "(?:\n-+\n|}}}\n)")
        self.assertContainsRe(result_stream.getvalue(), expected_re)

    def get_passing_test(self):
        """Return a FunctionTestCase wrapping a function that does nothing."""
        def passing_test():
            pass

        wrapped = unittest.FunctionTestCase(passing_test)
        return wrapped

    def test_add_not_supported(self):
        """Test the behaviour of invoking addNotSupported."""
        class InstrumentedTestResult(tests.ExtendedTestResult):
            def stopTestRun(self):
                pass

            def report_tests_starting(self):
                pass

            def report_unsupported(self, test, feature):
                self._call = (test, feature)

        result = InstrumentedTestResult(None, None, None, None)
        sample = SampleTestCase('_test_pass')
        missing_feature = features.Feature()
        result.startTest(sample)
        result.addNotSupported(sample, missing_feature)
        # report_unsupported must have been handed the test and the feature.
        reported = result._call
        self.assertEqual(2, len(reported))
        self.assertEqual(sample, reported[0])
        self.assertEqual(missing_feature, reported[1])
        # An unsupported feature still leaves the result successful.
        self.assertTrue(result.wasSuccessful())
        # The test is counted under the 'Feature' key of result.unsupported...
        self.assertEqual(1, result.unsupported['Feature'])
        # ...and reporting it a second time increments that counter.
        result.addNotSupported(sample, missing_feature)
        self.assertEqual(2, result.unsupported['Feature'])

    def test_verbose_report_unsupported(self):
        """Verbose output for an unsupported feature shows NODEP and why."""
        result_stream = StringIO()
        verbose_result = breezy.tests.VerboseTestResult(
            result_stream, descriptions=0, verbosity=2)
        passing = self.get_passing_test()
        missing_feature = features.Feature()
        verbose_result.startTest(passing)
        # Remember how much startTest wrote so only the report is examined.
        already_written = len(result_stream.getvalue())
        verbose_result.report_unsupported(passing, missing_feature)
        report_lines = result_stream.getvalue()[already_written:].splitlines()
        # The trailing '0ms' is not checked since it may fail on slow hosts.
        self.assertStartsWith(report_lines[0], 'NODEP')
        self.assertEqual(report_lines[1],
                         "    The feature 'Feature' is not available.")

    def test_unavailable_exception(self):
        """An UnavailableFeature being raised should invoke addNotSupported."""
        class InstrumentedTestResult(tests.ExtendedTestResult):
            def stopTestRun(self):
                pass

            def report_tests_starting(self):
                pass

            def addNotSupported(self, test, feature):
                self._call = (test, feature)

        result = InstrumentedTestResult(None, None, None, None)
        feature = features.Feature()

        class Test(tests.TestCase):
            def test_function(self):
                raise tests.UnavailableFeature(feature)

        raising_case = Test("test_function")
        raising_case.run(result)
        # addNotSupported must have been invoked for our test and feature.
        reported = result._call
        self.assertEqual(2, len(reported))
        self.assertEqual(raising_case.id(), reported[0].id())
        self.assertEqual(feature, reported[1])
        # The missing feature must not be counted as an error.
        self.assertEqual(0, result.error_count)

    def test_strict_with_unsupported_feature(self):
        """An unsupported feature makes the result not strictly successful."""
        text_result = tests.TextTestResult(
            StringIO(), descriptions=0, verbosity=1)
        passing = self.get_passing_test()
        text_result.addNotSupported(passing, "Unsupported Feature")
        self.assertFalse(text_result.wasStrictlySuccessful())
        self.assertEqual(None, text_result._extractBenchmarkTime(passing))

    def test_strict_with_known_failure(self):
        """A known failure makes the result not strictly successful."""
        text_result = tests.TextTestResult(
            StringIO(), descriptions=0, verbosity=1)
        xfail_case = _get_test("test_xfail")
        xfail_case.run(text_result)
        self.assertFalse(text_result.wasStrictlySuccessful())
        self.assertEqual(None, text_result._extractBenchmarkTime(xfail_case))

    def test_strict_with_success(self):
        """A plain success keeps the result strictly successful."""
        text_result = tests.TextTestResult(
            StringIO(), descriptions=0, verbosity=1)
        passing = self.get_passing_test()
        text_result.addSuccess(passing)
        self.assertTrue(text_result.wasStrictlySuccessful())
        self.assertEqual(None, text_result._extractBenchmarkTime(passing))

    def test_startTests(self):
        """Starting the first test should trigger startTests."""
        class InstrumentedTestResult(tests.ExtendedTestResult):
            # Number of times startTests has been invoked.
            calls = 0

            def startTests(self):
                self.calls += 1

        result = InstrumentedTestResult(None, None, None, None)

        def test_function():
            pass

        single_case = unittest.FunctionTestCase(test_function)
        single_case.run(result)
        self.assertEqual(1, result.calls)

    def test_startTests_only_once(self):
        """With multiple tests startTests should still only be called once"""
        class InstrumentedTestResult(tests.ExtendedTestResult):
            # Number of times startTests has been invoked.
            calls = 0

            def startTests(self):
                self.calls += 1

        result = InstrumentedTestResult(None, None, None, None)
        first_case = unittest.FunctionTestCase(lambda: None)
        second_case = unittest.FunctionTestCase(lambda: None)
        unittest.TestSuite([first_case, second_case]).run(result)
        self.assertEqual(1, result.calls)
        self.assertEqual(2, result.count)
 
1038
 
 
1039
 
 
1040
class TestRunner(tests.TestCase):
    """Tests for tests.TextTestRunner."""

    def dummy_test(self):
        pass

    def run_test_runner(self, testrunner, test):
        """Run test in testrunner, saving and restoring global state.

        The state currently saved and restored is:
        TestCaseInTempDir.TEST_ROOT

        Because of our use of global state, no test in this file should use
        breezy.tests.TextTestRunner without going through this convenience
        method.

        :param testrunner: The runner whose run() method is invoked.
        :param test: The test or suite handed to the runner.
        :return: Whatever testrunner.run(test) returns.
        """
        saved_root = tests.TestCaseInTempDir.TEST_ROOT
        try:
            tests.TestCaseInTempDir.TEST_ROOT = None
            run_result = testrunner.run(test)
        finally:
            tests.TestCaseInTempDir.TEST_ROOT = saved_root
        return run_result

    def test_known_failure_failed_run(self):
        """A failed run mentions known failures in its final summary."""
        # One test produces a known failure, a second one really fails; the
        # known failure has to be printed in the final output as well.
        class Test(tests.TestCase):
            def known_failure_test(self):
                self.expectFailure('failed', self.assertTrue, False)

        test = unittest.TestSuite()
        test.addTest(Test("known_failure_test"))

        def failing_test():
            raise AssertionError('foo')

        test.addTest(unittest.FunctionTestCase(failing_test))
        stream = StringIO()
        self.run_test_runner(tests.TextTestRunner(stream=stream), test)
        expected_output_re = (
            '(?sm)^brz selftest.*$'
            '.*'
            '^======================================================================\n'
            '^FAIL: failing_test\n'
            '^----------------------------------------------------------------------\n'
            'Traceback \\(most recent call last\\):\n'
            '  .*'  # File .*, line .*, in failing_test' - but maybe not from .pyc
            '    raise AssertionError\\(\'foo\'\\)\n'
            '.*'
            '^----------------------------------------------------------------------\n'
            '.*'
            'FAILED \\(failures=1, known_failure_count=1\\)'
            )
        self.assertContainsRe(stream.getvalue(), expected_output_re)

    def test_known_failure_ok_run(self):
        """A successful run mentions known failures in its final summary."""
        class Test(tests.TestCase):
            def known_failure_test(self):
                self.knownFailure("Never works...")

        known_failure_case = Test("known_failure_test")
        stream = StringIO()
        runner = tests.TextTestRunner(stream=stream)
        self.run_test_runner(runner, known_failure_case)
        expected_output_re = (
            '\n'
            '-*\n'
            'Ran 1 test in .*\n'
            '\n'
            'OK \\(known_failures=1\\)\n')
        self.assertContainsRe(stream.getvalue(), expected_output_re)

    def test_unexpected_success_bad(self):
        """An expected failure that passes is reported as a failure."""
        class Test(tests.TestCase):
            def test_truth(self):
                self.expectFailure("No absolute truth", self.assertTrue, True)

        runner = tests.TextTestRunner(stream=StringIO())
        self.run_test_runner(runner, Test("test_truth"))
        expected_output_re = (
            "=+\n"
            "FAIL: \\S+\\.test_truth\n"
            "-+\n"
            "(?:.*\n)*"
            "\\s*(?:Text attachment: )?reason"
            "(?:\n-+\n|: {{{)"
            "No absolute truth"
            "(?:\n-+\n|}}}\n)"
            "(?:.*\n)*"
            "-+\n"
            "Ran 1 test in .*\n"
            "\n"
            "FAILED \\(failures=1\\)\n\\Z")
        self.assertContainsRe(runner.stream.getvalue(), expected_output_re)

    def test_result_decorator(self):
        """Classes passed as result_decorators see the test start."""
        calls = []

        class LoggingDecorator(ExtendedToOriginalDecorator):
            def startTest(self, test):
                super().startTest(test)
                calls.append('start')

        noop_case = unittest.FunctionTestCase(lambda: None)
        runner = tests.TextTestRunner(
            stream=StringIO(), result_decorators=[LoggingDecorator])
        self.run_test_runner(runner, noop_case)
        self.assertLength(1, calls)

    def test_skipped_test(self):
        """A run consisting of a skipped test is still successful."""
        # skipping_test is defined in a local class so that it is never
        # collected and run as a real test.
        class SkippingTest(tests.TestCase):
            def skipping_test(self):
                raise tests.TestSkipped('test intentionally skipped')

        runner = tests.TextTestRunner(stream=StringIO())
        skipped_case = SkippingTest("skipping_test")
        run_result = self.run_test_runner(runner, skipped_case)
        self.assertTrue(run_result.wasSuccessful())

    def test_skipped_from_setup(self):
        """Cleanups registered before a skip in setUp run exactly once."""
        calls = []

        class SkippedSetupTest(tests.TestCase):

            def setUp(self):
                calls.append('setUp')
                self.addCleanup(self.cleanup)
                raise tests.TestSkipped('skipped setup')

            def test_skip(self):
                self.fail('test reached')

            def cleanup(self):
                calls.append('cleanup')

        runner = tests.TextTestRunner(stream=StringIO())
        skipped_case = SkippedSetupTest('test_skip')
        run_result = self.run_test_runner(runner, skipped_case)
        self.assertTrue(run_result.wasSuccessful())
        # setUp ran once and so did the cleanup it registered.
        self.assertEqual(['setUp', 'cleanup'], calls)

    def test_skipped_from_test(self):
        """Cleanups run exactly once when the test body raises a skip."""
        calls = []

        class SkippedTest(tests.TestCase):

            def setUp(self):
                super().setUp()
                calls.append('setUp')
                self.addCleanup(self.cleanup)

            def test_skip(self):
                raise tests.TestSkipped('skipped test')

            def cleanup(self):
                calls.append('cleanup')

        runner = tests.TextTestRunner(stream=StringIO())
        skipped_case = SkippedTest('test_skip')
        run_result = self.run_test_runner(runner, skipped_case)
        self.assertTrue(run_result.wasSuccessful())
        # setUp ran once and so did the cleanup it registered.
        self.assertEqual(['setUp', 'cleanup'], calls)

    def test_not_applicable(self):
        """A not-applicable test is shown as N/A and the run succeeds."""
        class Test(tests.TestCase):
            def not_applicable_test(self):
                raise tests.TestNotApplicable('this test never runs')

        out = StringIO()
        runner = tests.TextTestRunner(stream=out, verbosity=2)
        not_applicable_case = Test("not_applicable_test")
        run_result = self.run_test_runner(runner, not_applicable_case)
        self.log(out.getvalue())
        self.assertTrue(run_result.wasSuccessful())
        self.assertTrue(run_result.wasStrictlySuccessful())
        for expected_re in (r'(?m)not_applicable_test  * N/A',
                            r'(?m)^    this test never runs'):
            self.assertContainsRe(out.getvalue(), expected_re)

    def test_unsupported_features_listed(self):
        """When unsupported features are encountered they are detailed."""
        class Feature1(features.Feature):
            def _probe(self):
                return False

        class Feature2(features.Feature):
            def _probe(self):
                return False

        # Two sample tests, each needing a different unavailable feature.
        first_case = SampleTestCase('_test_pass')
        first_case._test_needs_features = [Feature1()]
        second_case = SampleTestCase('_test_pass')
        second_case._test_needs_features = [Feature2()]
        suite = unittest.TestSuite()
        for case in (first_case, second_case):
            suite.addTest(case)
        stream = StringIO()
        self.run_test_runner(tests.TextTestRunner(stream=stream), suite)
        # Only the last three lines of the output are checked.
        closing_lines = stream.getvalue().splitlines()[-3:]
        self.assertEqual([
            'OK',
            "Missing feature 'Feature1' skipped 1 tests.",
            "Missing feature 'Feature2' skipped 1 tests.",
            ],
            closing_lines)

    def test_verbose_test_count(self):
        """A verbose test run reports the right test count at the start"""
        suite = TestUtil.TestSuite([
            unittest.FunctionTestCase(lambda: None),
            unittest.FunctionTestCase(lambda: None)])
        self.assertEqual(suite.countTestCases(), 2)
        stream = StringIO()
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
        # The suite is wrapped in CountingDecorator because that is what
        # sets num_tests.
        counted_suite = tests.CountingDecorator(suite)
        self.run_test_runner(runner, counted_suite)
        self.assertStartsWith(stream.getvalue(), "running 2 tests")

    def test_startTestRun(self):
        """run should call result.startTestRun()"""
        calls = []

        class LoggingDecorator(ExtendedToOriginalDecorator):
            def startTestRun(self):
                super().startTestRun()
                calls.append('startTestRun')

        noop_case = unittest.FunctionTestCase(lambda: None)
        runner = tests.TextTestRunner(
            stream=StringIO(), result_decorators=[LoggingDecorator])
        self.run_test_runner(runner, noop_case)
        self.assertLength(1, calls)

    def test_stopTestRun(self):
        """run should call result.stopTestRun()"""
        calls = []

        class LoggingDecorator(ExtendedToOriginalDecorator):
            def stopTestRun(self):
                super().stopTestRun()
                calls.append('stopTestRun')

        noop_case = unittest.FunctionTestCase(lambda: None)
        runner = tests.TextTestRunner(
            stream=StringIO(), result_decorators=[LoggingDecorator])
        self.run_test_runner(runner, noop_case)
        self.assertLength(1, calls)

    def test_unicode_test_output_on_ascii_stream(self):
        """Showing results should always succeed even on an ascii console"""
        class FailureWithUnicode(tests.TestCase):
            def test_log_unicode(self):
                self.log(u"\u2606")
                self.fail("Now print that log!")

        raw_output = BytesIO()
        # An ascii-only text stream that backslash-escapes anything else.
        ascii_stream = TextIOWrapper(raw_output, 'ascii', 'backslashreplace')
        self.overrideAttr(osutils, "get_terminal_encoding",
                          lambda trace=False: "ascii")
        runner = tests.TextTestRunner(stream=ascii_stream)
        self.run_test_runner(runner, FailureWithUnicode("test_log_unicode"))
        ascii_stream.flush()
        expected_output_re = (
            b"(?:Text attachment: )?log"
            b"(?:\n-+\n|: {{{)"
            b"\\d+\\.\\d+  \\\\u2606"
            b"(?:\n-+\n|}}}\n)")
        self.assertContainsRe(raw_output.getvalue(), expected_output_re)
 
1310
 
 
1311
 
 
1312
class SampleTestCase(tests.TestCase):
    """Minimal test case whose only method does nothing."""

    def _test_pass(self):
        # Nothing to do: the method body is intentionally empty.
        return None
 
1316
 
 
1317
 
 
1318
class _TestException(Exception):
    """Plain Exception subclass that adds no behaviour of its own."""
 
1320
 
 
1321
 
 
1322
class TestTestCase(tests.TestCase):
 
1323
    """Tests that test the core breezy TestCase."""
 
1324
 
 
1325
    def test_assertLength_matches_empty(self):
 
1326
        a_list = []
 
1327
        self.assertLength(0, a_list)
 
1328
 
 
1329
    def test_assertLength_matches_nonempty(self):
 
1330
        a_list = [1, 2, 3]
 
1331
        self.assertLength(3, a_list)
 
1332
 
 
1333
    def test_assertLength_fails_different(self):
 
1334
        a_list = []
 
1335
        self.assertRaises(AssertionError, self.assertLength, 1, a_list)
 
1336
 
 
1337
    def test_assertLength_shows_sequence_in_failure(self):
 
1338
        a_list = [1, 2, 3]
 
1339
        exception = self.assertRaises(AssertionError, self.assertLength, 2,
 
1340
                                      a_list)
 
1341
        self.assertEqual('Incorrect length: wanted 2, got 3 for [1, 2, 3]',
 
1342
                         exception.args[0])
 
1343
 
 
1344
    def test_base_setUp_not_called_causes_failure(self):
 
1345
        class TestCaseWithBrokenSetUp(tests.TestCase):
 
1346
            def setUp(self):
 
1347
                pass  # does not call TestCase.setUp
 
1348
 
 
1349
            def test_foo(self):
 
1350
                pass
 
1351
        test = TestCaseWithBrokenSetUp('test_foo')
 
1352
        result = unittest.TestResult()
 
1353
        test.run(result)
 
1354
        self.assertFalse(result.wasSuccessful())
 
1355
        self.assertEqual(1, result.testsRun)
 
1356
 
 
1357
    def test_base_tearDown_not_called_causes_failure(self):
 
1358
        class TestCaseWithBrokenTearDown(tests.TestCase):
 
1359
            def tearDown(self):
 
1360
                pass  # does not call TestCase.tearDown
 
1361
 
 
1362
            def test_foo(self):
 
1363
                pass
 
1364
        test = TestCaseWithBrokenTearDown('test_foo')
 
1365
        result = unittest.TestResult()
 
1366
        test.run(result)
 
1367
        self.assertFalse(result.wasSuccessful())
 
1368
        self.assertEqual(1, result.testsRun)
 
1369
 
 
1370
    def test_debug_flags_sanitised(self):
 
1371
        """The breezy debug flags should be sanitised by setUp."""
 
1372
        if 'allow_debug' in tests.selftest_debug_flags:
 
1373
            raise tests.TestNotApplicable(
 
1374
                '-Eallow_debug option prevents debug flag sanitisation')
 
1375
        # we could set something and run a test that will check
 
1376
        # it gets santised, but this is probably sufficient for now:
 
1377
        # if someone runs the test with -Dsomething it will error.
 
1378
        flags = set()
 
1379
        if self._lock_check_thorough:
 
1380
            flags.add('strict_locks')
 
1381
        self.assertEqual(flags, breezy.debug.debug_flags)
 
1382
 
 
1383
    def change_selftest_debug_flags(self, new_flags):
 
1384
        self.overrideAttr(tests, 'selftest_debug_flags', set(new_flags))
 
1385
 
 
1386
    def test_allow_debug_flag(self):
 
1387
        """The -Eallow_debug flag prevents breezy.debug.debug_flags from being
 
1388
        sanitised (i.e. cleared) before running a test.
 
1389
        """
 
1390
        self.change_selftest_debug_flags({'allow_debug'})
 
1391
        breezy.debug.debug_flags = {'a-flag'}
 
1392
 
 
1393
        class TestThatRecordsFlags(tests.TestCase):
 
1394
            def test_foo(nested_self):
 
1395
                self.flags = set(breezy.debug.debug_flags)
 
1396
        test = TestThatRecordsFlags('test_foo')
 
1397
        test.run(self.make_test_result())
 
1398
        flags = {'a-flag'}
 
1399
        if 'disable_lock_checks' not in tests.selftest_debug_flags:
 
1400
            flags.add('strict_locks')
 
1401
        self.assertEqual(flags, self.flags)
 
1402
 
 
1403
    def test_disable_lock_checks(self):
 
1404
        """The -Edisable_lock_checks flag disables thorough checks."""
 
1405
        class TestThatRecordsFlags(tests.TestCase):
 
1406
            def test_foo(nested_self):
 
1407
                self.flags = set(breezy.debug.debug_flags)
 
1408
                self.test_lock_check_thorough = nested_self._lock_check_thorough
 
1409
        self.change_selftest_debug_flags(set())
 
1410
        test = TestThatRecordsFlags('test_foo')
 
1411
        test.run(self.make_test_result())
 
1412
        # By default we do strict lock checking and thorough lock/unlock
 
1413
        # tracking.
 
1414
        self.assertTrue(self.test_lock_check_thorough)
 
1415
        self.assertEqual({'strict_locks'}, self.flags)
 
1416
        # Now set the disable_lock_checks flag, and show that this changed.
 
1417
        self.change_selftest_debug_flags({'disable_lock_checks'})
 
1418
        test = TestThatRecordsFlags('test_foo')
 
1419
        test.run(self.make_test_result())
 
1420
        self.assertFalse(self.test_lock_check_thorough)
 
1421
        self.assertEqual(set(), self.flags)
 
1422
 
 
1423
    def test_this_fails_strict_lock_check(self):
 
1424
        class TestThatRecordsFlags(tests.TestCase):
 
1425
            def test_foo(nested_self):
 
1426
                self.flags1 = set(breezy.debug.debug_flags)
 
1427
                self.thisFailsStrictLockCheck()
 
1428
                self.flags2 = set(breezy.debug.debug_flags)
 
1429
        # Make sure lock checking is active
 
1430
        self.change_selftest_debug_flags(set())
 
1431
        test = TestThatRecordsFlags('test_foo')
 
1432
        test.run(self.make_test_result())
 
1433
        self.assertEqual({'strict_locks'}, self.flags1)
 
1434
        self.assertEqual(set(), self.flags2)
 
1435
 
 
1436
    def test_debug_flags_restored(self):
 
1437
        """The breezy debug flags should be restored to their original state
 
1438
        after the test was run, even if allow_debug is set.
 
1439
        """
 
1440
        self.change_selftest_debug_flags({'allow_debug'})
 
1441
        # Now run a test that modifies debug.debug_flags.
 
1442
        breezy.debug.debug_flags = {'original-state'}
 
1443
 
 
1444
        class TestThatModifiesFlags(tests.TestCase):
 
1445
            def test_foo(self):
 
1446
                breezy.debug.debug_flags = {'modified'}
 
1447
        test = TestThatModifiesFlags('test_foo')
 
1448
        test.run(self.make_test_result())
 
1449
        self.assertEqual({'original-state'}, breezy.debug.debug_flags)
 
1450
 
 
1451
    def make_test_result(self):
 
1452
        """Get a test result that writes to a StringIO."""
 
1453
        return tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
 
1454
 
 
1455
    def inner_test(self):
 
1456
        # the inner child test
 
1457
        note("inner_test")
 
1458
 
 
1459
    def outer_child(self):
 
1460
        # the outer child test
 
1461
        note("outer_start")
 
1462
        self.inner_test = TestTestCase("inner_child")
 
1463
        result = self.make_test_result()
 
1464
        self.inner_test.run(result)
 
1465
        note("outer finish")
 
1466
        self.addCleanup(osutils.delete_any, self._log_file_name)
 
1467
 
 
1468
    def test_trace_nesting(self):
 
1469
        # this tests that each test case nests its trace facility correctly.
 
1470
        # we do this by running a test case manually. That test case (A)
 
1471
        # should setup a new log, log content to it, setup a child case (B),
 
1472
        # which should log independently, then case (A) should log a trailer
 
1473
        # and return.
 
1474
        # we do two nested children so that we can verify the state of the
 
1475
        # logs after the outer child finishes is correct, which a bad clean
 
1476
        # up routine in tearDown might trigger a fault in our test with only
 
1477
        # one child, we should instead see the bad result inside our test with
 
1478
        # the two children.
 
1479
        # the outer child test
 
1480
        original_trace = breezy.trace._trace_file
 
1481
        outer_test = TestTestCase("outer_child")
 
1482
        result = self.make_test_result()
 
1483
        outer_test.run(result)
 
1484
        self.assertEqual(original_trace, breezy.trace._trace_file)
 
1485
 
 
1486
    def method_that_times_a_bit_twice(self):
 
1487
        # call self.time twice to ensure it aggregates
 
1488
        self.time(time.sleep, 0.007)
 
1489
        self.time(time.sleep, 0.007)
 
1490
 
 
1491
    def test_time_creates_benchmark_in_result(self):
 
1492
        """The TestCase.time() method accumulates a benchmark time."""
 
1493
        sample_test = TestTestCase("method_that_times_a_bit_twice")
 
1494
        output_stream = StringIO()
 
1495
        result = breezy.tests.VerboseTestResult(
 
1496
            output_stream,
 
1497
            descriptions=0,
 
1498
            verbosity=2)
 
1499
        sample_test.run(result)
 
1500
        self.assertContainsRe(
 
1501
            output_stream.getvalue(),
 
1502
            r"\d+ms\*\n$")
 
1503
 
 
1504
    def test_hooks_sanitised(self):
 
1505
        """The breezy hooks should be sanitised by setUp."""
 
1506
        # Note this test won't fail with hooks that the core library doesn't
 
1507
        # use - but it trigger with a plugin that adds hooks, so its still a
 
1508
        # useful warning in that case.
 
1509
        self.assertEqual(breezy.branch.BranchHooks(),
 
1510
                         breezy.branch.Branch.hooks)
 
1511
        self.assertEqual(
 
1512
            breezy.bzr.smart.server.SmartServerHooks(),
 
1513
            breezy.bzr.smart.server.SmartTCPServer.hooks)
 
1514
        self.assertEqual(
 
1515
            breezy.commands.CommandHooks(), breezy.commands.Command.hooks)
 
1516
 
 
1517
    def test__gather_lsprof_in_benchmarks(self):
 
1518
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
 
1519
 
 
1520
        Each self.time() call is individually and separately profiled.
 
1521
        """
 
1522
        self.requireFeature(features.lsprof_feature)
 
1523
        # overrides the class member with an instance member so no cleanup
 
1524
        # needed.
 
1525
        self._gather_lsprof_in_benchmarks = True
 
1526
        self.time(time.sleep, 0.000)
 
1527
        self.time(time.sleep, 0.003)
 
1528
        self.assertEqual(2, len(self._benchcalls))
 
1529
        self.assertEqual((time.sleep, (0.000,), {}), self._benchcalls[0][0])
 
1530
        self.assertEqual((time.sleep, (0.003,), {}), self._benchcalls[1][0])
 
1531
        self.assertIsInstance(self._benchcalls[0][1], breezy.lsprof.Stats)
 
1532
        self.assertIsInstance(self._benchcalls[1][1], breezy.lsprof.Stats)
 
1533
        del self._benchcalls[:]
 
1534
 
 
1535
    def test_knownFailure(self):
 
1536
        """Self.knownFailure() should raise a KnownFailure exception."""
 
1537
        self.assertRaises(tests.KnownFailure, self.knownFailure, "A Failure")
 
1538
 
 
1539
    def test_open_bzrdir_safe_roots(self):
 
1540
        # even a memory transport should fail to open when its url isn't
 
1541
        # permitted.
 
1542
        # Manually set one up (TestCase doesn't and shouldn't provide magic
 
1543
        # machinery)
 
1544
        transport_server = memory.MemoryServer()
 
1545
        transport_server.start_server()
 
1546
        self.addCleanup(transport_server.stop_server)
 
1547
        t = transport.get_transport_from_url(transport_server.get_url())
 
1548
        controldir.ControlDir.create(t.base)
 
1549
        self.assertRaises(errors.BzrError,
 
1550
                          controldir.ControlDir.open_from_transport, t)
 
1551
        # But if we declare this as safe, we can open the bzrdir.
 
1552
        self.permit_url(t.base)
 
1553
        self._bzr_selftest_roots.append(t.base)
 
1554
        controldir.ControlDir.open_from_transport(t)
 
1555
 
 
1556
    def test_requireFeature_available(self):
 
1557
        """self.requireFeature(available) is a no-op."""
 
1558
        class Available(features.Feature):
 
1559
            def _probe(self):
 
1560
                return True
 
1561
        feature = Available()
 
1562
        self.requireFeature(feature)
 
1563
 
 
1564
    def test_requireFeature_unavailable(self):
 
1565
        """self.requireFeature(unavailable) raises UnavailableFeature."""
 
1566
        class Unavailable(features.Feature):
 
1567
            def _probe(self):
 
1568
                return False
 
1569
        feature = Unavailable()
 
1570
        self.assertRaises(tests.UnavailableFeature,
 
1571
                          self.requireFeature, feature)
 
1572
 
 
1573
    def test_run_no_parameters(self):
 
1574
        test = SampleTestCase('_test_pass')
 
1575
        test.run()
 
1576
 
 
1577
    def test_run_enabled_unittest_result(self):
 
1578
        """Test we revert to regular behaviour when the test is enabled."""
 
1579
        test = SampleTestCase('_test_pass')
 
1580
 
 
1581
        class EnabledFeature(object):
 
1582
            def available(self):
 
1583
                return True
 
1584
        test._test_needs_features = [EnabledFeature()]
 
1585
        result = unittest.TestResult()
 
1586
        test.run(result)
 
1587
        self.assertEqual(1, result.testsRun)
 
1588
        self.assertEqual([], result.errors)
 
1589
        self.assertEqual([], result.failures)
 
1590
 
 
1591
    def test_run_disabled_unittest_result(self):
 
1592
        """Test our compatibility for disabled tests with unittest results."""
 
1593
        test = SampleTestCase('_test_pass')
 
1594
 
 
1595
        class DisabledFeature(object):
 
1596
            def available(self):
 
1597
                return False
 
1598
        test._test_needs_features = [DisabledFeature()]
 
1599
        result = unittest.TestResult()
 
1600
        test.run(result)
 
1601
        self.assertEqual(1, result.testsRun)
 
1602
        self.assertEqual([], result.errors)
 
1603
        self.assertEqual([], result.failures)
 
1604
 
 
1605
    def test_run_disabled_supporting_result(self):
 
1606
        """Test disabled tests behaviour with support aware results."""
 
1607
        test = SampleTestCase('_test_pass')
 
1608
 
 
1609
        class DisabledFeature(object):
 
1610
            def __eq__(self, other):
 
1611
                return isinstance(other, DisabledFeature)
 
1612
 
 
1613
            def available(self):
 
1614
                return False
 
1615
        the_feature = DisabledFeature()
 
1616
        test._test_needs_features = [the_feature]
 
1617
 
 
1618
        class InstrumentedTestResult(unittest.TestResult):
 
1619
            def __init__(self):
 
1620
                unittest.TestResult.__init__(self)
 
1621
                self.calls = []
 
1622
 
 
1623
            def startTest(self, test):
 
1624
                self.calls.append(('startTest', test))
 
1625
 
 
1626
            def stopTest(self, test):
 
1627
                self.calls.append(('stopTest', test))
 
1628
 
 
1629
            def addNotSupported(self, test, feature):
 
1630
                self.calls.append(('addNotSupported', test, feature))
 
1631
        result = InstrumentedTestResult()
 
1632
        test.run(result)
 
1633
        case = result.calls[0][1]
 
1634
        self.assertEqual([
 
1635
            ('startTest', case),
 
1636
            ('addNotSupported', case, the_feature),
 
1637
            ('stopTest', case),
 
1638
            ],
 
1639
            result.calls)
 
1640
 
 
1641
    def test_start_server_registers_url(self):
 
1642
        transport_server = memory.MemoryServer()
 
1643
        # A little strict, but unlikely to be changed soon.
 
1644
        self.assertEqual([], self._bzr_selftest_roots)
 
1645
        self.start_server(transport_server)
 
1646
        self.assertSubset([transport_server.get_url()],
 
1647
                          self._bzr_selftest_roots)
 
1648
 
 
1649
    def test_assert_list_raises_on_generator(self):
 
1650
        def generator_which_will_raise():
 
1651
            # This will not raise until after the first yield
 
1652
            yield 1
 
1653
            raise _TestException()
 
1654
 
 
1655
        e = self.assertListRaises(_TestException, generator_which_will_raise)
 
1656
        self.assertIsInstance(e, _TestException)
 
1657
 
 
1658
        e = self.assertListRaises(Exception, generator_which_will_raise)
 
1659
        self.assertIsInstance(e, _TestException)
 
1660
 
 
1661
    def test_assert_list_raises_on_plain(self):
 
1662
        def plain_exception():
 
1663
            raise _TestException()
 
1664
            return []
 
1665
 
 
1666
        e = self.assertListRaises(_TestException, plain_exception)
 
1667
        self.assertIsInstance(e, _TestException)
 
1668
 
 
1669
        e = self.assertListRaises(Exception, plain_exception)
 
1670
        self.assertIsInstance(e, _TestException)
 
1671
 
 
1672
    def test_assert_list_raises_assert_wrong_exception(self):
 
1673
        class _NotTestException(Exception):
 
1674
            pass
 
1675
 
 
1676
        def wrong_exception():
 
1677
            raise _NotTestException()
 
1678
 
 
1679
        def wrong_exception_generator():
 
1680
            yield 1
 
1681
            yield 2
 
1682
            raise _NotTestException()
 
1683
 
 
1684
        # Wrong exceptions are not intercepted
 
1685
        self.assertRaises(
 
1686
            _NotTestException,
 
1687
            self.assertListRaises, _TestException, wrong_exception)
 
1688
        self.assertRaises(
 
1689
            _NotTestException,
 
1690
            self.assertListRaises, _TestException, wrong_exception_generator)
 
1691
 
 
1692
    def test_assert_list_raises_no_exception(self):
 
1693
        def success():
 
1694
            return []
 
1695
 
 
1696
        def success_generator():
 
1697
            yield 1
 
1698
            yield 2
 
1699
 
 
1700
        self.assertRaises(AssertionError,
 
1701
                          self.assertListRaises, _TestException, success)
 
1702
 
 
1703
        self.assertRaises(
 
1704
            AssertionError,
 
1705
            self.assertListRaises, _TestException, success_generator)
 
1706
 
 
1707
    def _run_successful_test(self, test):
 
1708
        result = testtools.TestResult()
 
1709
        test.run(result)
 
1710
        self.assertTrue(result.wasSuccessful())
 
1711
        return result
 
1712
 
 
1713
    def test_overrideAttr_without_value(self):
 
1714
        self.test_attr = 'original'  # Define a test attribute
 
1715
        obj = self  # Make 'obj' visible to the embedded test
 
1716
 
 
1717
        class Test(tests.TestCase):
 
1718
 
 
1719
            def setUp(self):
 
1720
                super(Test, self).setUp()
 
1721
                self.orig = self.overrideAttr(obj, 'test_attr')
 
1722
 
 
1723
            def test_value(self):
 
1724
                self.assertEqual('original', self.orig)
 
1725
                self.assertEqual('original', obj.test_attr)
 
1726
                obj.test_attr = 'modified'
 
1727
                self.assertEqual('modified', obj.test_attr)
 
1728
 
 
1729
        self._run_successful_test(Test('test_value'))
 
1730
        self.assertEqual('original', obj.test_attr)
 
1731
 
 
1732
    def test_overrideAttr_with_value(self):
 
1733
        self.test_attr = 'original'  # Define a test attribute
 
1734
        obj = self  # Make 'obj' visible to the embedded test
 
1735
 
 
1736
        class Test(tests.TestCase):
 
1737
 
 
1738
            def setUp(self):
 
1739
                super(Test, self).setUp()
 
1740
                self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
 
1741
 
 
1742
            def test_value(self):
 
1743
                self.assertEqual('original', self.orig)
 
1744
                self.assertEqual('modified', obj.test_attr)
 
1745
 
 
1746
        self._run_successful_test(Test('test_value'))
 
1747
        self.assertEqual('original', obj.test_attr)
 
1748
 
 
1749
    def test_overrideAttr_with_no_existing_value_and_value(self):
 
1750
        # Do not define the test_attribute
 
1751
        obj = self  # Make 'obj' visible to the embedded test
 
1752
 
 
1753
        class Test(tests.TestCase):
 
1754
 
 
1755
            def setUp(self):
 
1756
                tests.TestCase.setUp(self)
 
1757
                self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
 
1758
 
 
1759
            def test_value(self):
 
1760
                self.assertEqual(tests._unitialized_attr, self.orig)
 
1761
                self.assertEqual('modified', obj.test_attr)
 
1762
 
 
1763
        self._run_successful_test(Test('test_value'))
 
1764
        self.assertRaises(AttributeError, getattr, obj, 'test_attr')
 
1765
 
 
1766
    def test_overrideAttr_with_no_existing_value_and_no_value(self):
 
1767
        # Do not define the test_attribute
 
1768
        obj = self  # Make 'obj' visible to the embedded test
 
1769
 
 
1770
        class Test(tests.TestCase):
 
1771
 
 
1772
            def setUp(self):
 
1773
                tests.TestCase.setUp(self)
 
1774
                self.orig = self.overrideAttr(obj, 'test_attr')
 
1775
 
 
1776
            def test_value(self):
 
1777
                self.assertEqual(tests._unitialized_attr, self.orig)
 
1778
                self.assertRaises(AttributeError, getattr, obj, 'test_attr')
 
1779
 
 
1780
        self._run_successful_test(Test('test_value'))
 
1781
        self.assertRaises(AttributeError, getattr, obj, 'test_attr')
 
1782
 
 
1783
    def test_recordCalls(self):
 
1784
        from breezy.tests import test_selftest
 
1785
        calls = self.recordCalls(
 
1786
            test_selftest, '_add_numbers')
 
1787
        self.assertEqual(test_selftest._add_numbers(2, 10),
 
1788
                         12)
 
1789
        self.assertEqual(calls, [((2, 10), {})])
 
1790
 
 
1791
 
 
1792
def _add_numbers(a, b):
    """Return ``a + b``; the target that test_recordCalls wraps."""
    total = a + b
    return total
 
1794
 
 
1795
 
 
1796
class _MissingFeature(features.Feature):
    """Feature whose probe always reports False."""

    def _probe(self):
        """Return False unconditionally."""
        return False
 
1799
 
 
1800
 
 
1801
# Module-level _MissingFeature instance: the example test_missing_feature
# built by _get_test requires it, and TestTestCaseLogDetails uses its str()
# as the expected skip reason.
missing_feature = _MissingFeature()
 
1802
 
 
1803
 
 
1804
def _get_test(name):
    """Return an instance of the example test called ``name``.

    The example class is defined inside this function so that its tests
    don't auto-run as part of the test suite.
    """

    class ExampleTests(tests.TestCase):

        # Fails through self.fail after writing to the log.
        def test_fail(self):
            mutter('this was a failing test')
            self.fail('this test will fail')

        # Errors by raising a RuntimeError after writing to the log.
        def test_error(self):
            mutter('this test errored')
            raise RuntimeError('gotcha')

        # Requires the module-level missing_feature.
        def test_missing_feature(self):
            mutter('missing the feature')
            self.requireFeature(missing_feature)

        # Raises TestSkipped.
        def test_skip(self):
            mutter('this test will be skipped')
            raise tests.TestSkipped('reason')

        # Only writes to the log.
        def test_success(self):
            mutter('this test succeeds')

        # Declares a known failure.
        def test_xfail(self):
            mutter('test with expected failure')
            self.knownFailure('this_fails')

        # Expects a failure from a callable that does nothing.
        def test_unexpected_success(self):
            mutter('test with unexpected success')
            self.expectFailure('should_fail', lambda: None)

    example = ExampleTests(name)
    return example
 
1841
 
 
1842
 
 
1843
class TestTestCaseLogDetails(tests.TestCase):
    """Check which example test outcomes carry the test log."""

    def _run_test(self, test_name):
        """Run the named example test and return its testtools result."""
        outcome = testtools.TestResult()
        _get_test(test_name).run(outcome)
        return outcome

    def test_fail_has_log(self):
        failures = self._run_test('test_fail').failures
        self.assertEqual(1, len(failures))
        failure_text = failures[0][1]
        self.assertContainsRe(
            failure_text, '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertContainsRe(failure_text, 'this was a failing test')

    def test_error_has_log(self):
        errors_seen = self._run_test('test_error').errors
        self.assertEqual(1, len(errors_seen))
        error_text = errors_seen[0][1]
        self.assertContainsRe(
            error_text, '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertContainsRe(error_text, 'this test errored')

    def test_skip_has_no_log(self):
        skip_reasons = self._run_test('test_skip').skip_reasons
        self.assertEqual({'reason'}, set(skip_reasons))
        skipped = skip_reasons['reason']
        self.assertEqual(1, len(skipped))
        self.assertNotIn('log', skipped[0].getDetails())

    def test_missing_feature_has_no_log(self):
        # testtools doesn't know about addNotSupported, so it just gets
        # considered as a skip
        skip_reasons = self._run_test('test_missing_feature').skip_reasons
        reason = str(missing_feature)
        self.assertEqual({reason}, set(skip_reasons))
        skipped = skip_reasons[reason]
        self.assertEqual(1, len(skipped))
        self.assertNotIn('log', skipped[0].getDetails())

    def test_xfail_has_no_log(self):
        expected_failures = self._run_test('test_xfail').expectedFailures
        self.assertEqual(1, len(expected_failures))
        xfail_text = expected_failures[0][1]
        self.assertNotContainsRe(
            xfail_text, '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertNotContainsRe(xfail_text, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        successes = self._run_test(
            'test_unexpected_success').unexpectedSuccesses
        self.assertEqual(1, len(successes))
        # Inconsistency, unexpectedSuccesses is a list of tests,
        # expectedFailures is a list of reasons?
        self.assertIn('log', successes[0].getDetails())
 
1903
 
 
1904
 
 
1905
class TestTestCloning(tests.TestCase):
    """Tests that test cloning of TestCases (as used by multiply_tests)."""

    def test_cloned_testcase_does_not_share_details(self):
        """A TestCase cloned with clone_test does not share mutable attributes
        such as details or cleanups.
        """
        class Test(tests.TestCase):
            def test_foo(self):
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))

        original = Test('test_foo')
        clone = tests.clone_test(original, original.id() + '(cloned)')
        original.run(unittest.TestResult())
        # Only the test that actually ran has gained the detail.
        self.assertEqual('foo', original.getDetails()['foo'].iter_bytes())
        self.assertIsNone(clone.getDetails().get('foo'))

    def test_double_apply_scenario_preserves_first_scenario(self):
        """Applying two levels of scenarios to a test preserves the attributes
        added by both scenarios.
        """
        class Test(tests.TestCase):
            def test_foo(self):
                pass

        base_test = Test('test_foo')
        x_scenarios = [('x=%d' % v, {'x': v}) for v in (1, 2)]
        y_scenarios = [('y=%d' % v, {'y': v}) for v in (1, 2)]
        by_x = tests.multiply_tests(
            base_test, x_scenarios, unittest.TestSuite())
        by_x_and_y = tests.multiply_tests(
            by_x, y_scenarios, unittest.TestSuite())
        all_tests = list(tests.iter_suite_tests(by_x_and_y))
        self.assertLength(4, all_tests)
        all_xys = [(t.x, t.y) for t in all_tests]
        all_xys.sort()
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1937
 
 
1938
 
 
1939
# NB: Don't delete this; it's not actually from 0.11!
# It is a fixture for the applyDeprecated tests, and
# ApplyDeprecatedHelper.sample_nested_deprecation calls it.
@deprecated_function(deprecated_in((0, 11, 0)))
def sample_deprecated_function():
    """A deprecated function to test applyDeprecated with."""
    return 2
 
1944
 
 
1945
 
 
1946
def sample_undeprecated_function(a_param):
    """An undeprecated function to test applyDeprecated with.

    The argument is accepted but ignored; nothing is returned.
    """
    return None
 
1948
 
 
1949
 
 
1950
class ApplyDeprecatedHelper(object):
    """Holder of sample methods used by the applyDeprecated tests."""

    @deprecated_method(deprecated_in((0, 11, 0)))
    def sample_deprecated_method(self, param_one):
        """Deprecated sample method; hands its argument straight back."""
        return param_one

    def sample_normal_method(self):
        """Sample method carrying no deprecation marker."""
        return None

    @deprecated_method(deprecated_in((0, 10, 0)))
    def sample_nested_deprecation(self):
        # A deprecated method that itself calls a deprecated function.
        nested_result = sample_deprecated_function()
        return nested_result
 
1964
 
 
1965
 
 
1966
class TestExtraAssertions(tests.TestCase):
    """Checks of the extra assertion helpers the breezy test suite adds."""

    def test_assert_isinstance(self):
        # Matching types pass silently.
        self.assertIsInstance(2, int)
        self.assertIsInstance(u'', str)
        # A mismatch raises, naming both the actual and the expected type.
        failure = self.assertRaises(
            AssertionError, self.assertIsInstance, None, int)
        acceptable_messages = [
            "None is an instance of <type 'NoneType'> rather than "
            "<type 'int'>",
            "None is an instance of <class 'NoneType'> rather than "
            "<class 'int'>",
        ]
        self.assertIn(str(failure), acceptable_messages)
        self.assertRaises(AssertionError, self.assertIsInstance, 23.3, int)
        # A caller-supplied message is appended to the generated one.
        failure = self.assertRaises(
            AssertionError, self.assertIsInstance, None, int,
            "it's just not")
        message_with_suffix = (
            "None is an instance of <class 'NoneType'> rather "
            "than <class 'int'>: it's just not")
        self.assertEqual(str(failure), message_with_suffix)

    def test_assertEndsWith(self):
        self.assertEndsWith('foo', 'oo')
        # A suffix longer than the string cannot match.
        self.assertRaises(AssertionError, self.assertEndsWith, 'o', 'oo')

    def test_assertEqualDiff(self):
        failure = self.assertRaises(
            AssertionError, self.assertEqualDiff, '', '\n')
        # Look closely: the '+' refers to the second string.
        self.assertEqual(
            str(failure),
            'first string is missing a final newline.\n+ \n')
        failure = self.assertRaises(
            AssertionError, self.assertEqualDiff, '\n', '')
        # Look closely: the '-' refers to the second string.
        self.assertEqual(
            str(failure),
            'second string is missing a final newline.\n- \n')
 
2003
 
 
2004
 
 
2005
class TestDeprecations(tests.TestCase):
    """Exercise the applyDeprecated and callDeprecated test helpers."""

    def test_applyDeprecated_not_deprecated(self):
        helper = ApplyDeprecatedHelper()
        in_0_10 = deprecated_in((0, 10, 0))
        in_0_11 = deprecated_in((0, 11, 0))
        # A callable that is not deprecated at all triggers an assertion.
        self.assertRaises(
            AssertionError, self.applyDeprecated,
            in_0_11, helper.sample_normal_method)
        self.assertRaises(
            AssertionError, self.applyDeprecated,
            in_0_11, sample_undeprecated_function, "a param value")
        # A deprecated callable (method or function) checked against the
        # wrong deprecation also fails.
        self.assertRaises(
            AssertionError, self.applyDeprecated,
            in_0_10, helper.sample_deprecated_method, "a param value")
        self.assertRaises(
            AssertionError, self.applyDeprecated,
            in_0_10, sample_deprecated_function)
        # With the matching deprecation the callable's own result is
        # handed back.
        method_result = self.applyDeprecated(
            in_0_11, helper.sample_deprecated_method, "a param value")
        self.assertEqual("a param value", method_result)
        function_result = self.applyDeprecated(
            in_0_11, sample_deprecated_function)
        self.assertEqual(2, function_result)
        # For a nested deprecation, supplying the version of the inner
        # function rather than that of the outer method is still a failure.
        self.assertRaises(
            AssertionError, self.applyDeprecated,
            in_0_11, helper.sample_nested_deprecation)
        # Supplying the outer method's version yields the call's result.
        nested_result = self.applyDeprecated(
            in_0_10, helper.sample_nested_deprecation)
        self.assertEqual(2, nested_result)

    def test_callDeprecated(self):
        def maybe_warn(be_deprecated, result=None):
            if be_deprecated is True:
                symbol_versioning.warn('i am deprecated', DeprecationWarning,
                                       stacklevel=1)
            return result

        # Positional arguments are forwarded to the callable.
        returned = self.callDeprecated(['i am deprecated'], maybe_warn, True)
        self.assertIs(None, returned)
        returned = self.callDeprecated([], maybe_warn, False, 'result')
        self.assertEqual('result', returned)
        # Keyword arguments are forwarded as well.
        self.callDeprecated(
            ['i am deprecated'], maybe_warn, be_deprecated=True)
        self.callDeprecated([], maybe_warn, be_deprecated=False)
 
2059
 
 
2060
 
 
2061
class TestWarningTests(tests.TestCase):
    """Checks for helpers that call code which emits warnings."""

    def test_callCatchWarnings(self):
        def add_with_warning(a, b):
            warnings.warn("this is your last warning")
            return a + b

        caught, total = self.callCatchWarnings(add_with_warning, 1, 2)
        self.assertEqual(3, total)
        # Comparing warning objects directly would be nicer, but UserWarning
        # has no useful equality, so check type and text separately.
        [only_warning] = caught
        self.assertIsInstance(only_warning, UserWarning)
        self.assertEqual("this is your last warning", str(only_warning))
 
2075
 
 
2076
 
 
2077
class TestConvenienceMakers(tests.TestCaseWithTransport):
    """Checks of the make_* convenience helpers."""

    def test_make_branch_and_tree_with_format(self):
        # make_branch_and_tree must honour an explicitly supplied format.
        meta_format = breezy.bzr.bzrdir.BzrDirMetaFormat1()
        self.make_branch_and_tree('a', format=meta_format)
        opened = breezy.controldir.ControlDir.open('a')
        self.assertIsInstance(
            opened._format, breezy.bzr.bzrdir.BzrDirMetaFormat1)

    def test_make_branch_and_memory_tree(self):
        # TestCaseWithTransport can hand out a fresh branch together with a
        # mutable, memory-backed tree.
        memory_tree = self.make_branch_and_memory_tree('a')
        self.assertIsInstance(memory_tree, breezy.memorytree.MemoryTree)

    def test_make_tree_for_local_vfs_backed_transport(self):
        # When the vfs transport and the local disk are colocated,
        # make_branch_and_tree must use a local branch and repository even
        # though url generation goes through a different transport.
        self.transport_server = test_server.FakeVFATServer
        generated_url = self.get_url('t1')
        self.assertFalse(generated_url.startswith('file://'))
        tree = self.make_branch_and_tree('t1')
        self.assertStartsWith(
            tree.controldir.root_transport.base, 'file://')
        self.assertEqual(
            tree.controldir.root_transport,
            tree.branch.controldir.root_transport)
        self.assertEqual(
            tree.controldir.root_transport,
            tree.branch.repository.controldir.root_transport)
 
2106
 
 
2107
 
 
2108
class SelfTestHelper(object):
    """Mixin giving test cases a way to run selftest and capture its output."""

    def run_selftest(self, **kwargs):
        """Run selftest with ``kwargs`` and return what it wrote.

        The run must report success. The returned BytesIO holds the utf-8
        encoded output and is rewound to its start.
        """
        raw_output = BytesIO()
        text_output = TextIOWrapper(raw_output, 'utf-8')
        # Remember the module-level state so it can be put back whatever
        # happens during the run.
        saved_transport = breezy.tests.default_transport
        saved_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
        tests.TestCaseWithMemoryTransport.TEST_ROOT = None
        try:
            succeeded = tests.selftest(stream=text_output, **kwargs)
            self.assertEqual(True, succeeded)
        finally:
            breezy.tests.default_transport = saved_transport
            tests.TestCaseWithMemoryTransport.TEST_ROOT = saved_root
        text_output.flush()
        # Detach the wrapper from the byte buffer, which is used from here on.
        text_output.detach()
        raw_output.seek(0)
        return raw_output
 
2126
 
 
2127
 
 
2128
class TestSelftest(tests.TestCase, SelfTestHelper):
    """Tests of breezy.tests.selftest."""

    def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(
            self):
        # selftest must call the supplied test_suite_factory exactly once.
        factory_called = []

        def factory():
            factory_called.append(True)
            return TestUtil.TestSuite()
        out = StringIO()
        err = StringIO()
        self.apply_redirected(out, err, None, breezy.tests.selftest,
                              test_suite_factory=factory)
        self.assertEqual([True], factory_called)

    def factory(self):
        """A test suite factory.

        Returns a suite of three do-nothing tests whose ids end in
        ``.Test.a``, ``.Test.b`` and ``.Test.c``.
        """
        class Test(tests.TestCase):
            def id(self):
                return __name__ + ".Test." + self._testMethodName

            def a(self):
                pass

            def b(self):
                pass

            def c(self):
                pass
        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])

    def test_list_only(self):
        output = self.run_selftest(test_suite_factory=self.factory,
                                   list_only=True)
        self.assertEqual(3, len(output.readlines()))

    def test_list_only_filtered(self):
        output = self.run_selftest(test_suite_factory=self.factory,
                                   list_only=True, pattern="Test.b")
        self.assertEndsWith(output.getvalue(), b"Test.b\n")
        self.assertLength(1, output.readlines())

    def test_list_only_excludes(self):
        output = self.run_selftest(test_suite_factory=self.factory,
                                   list_only=True, exclude_pattern="Test.b")
        # The haystack comes first and the pattern second; with the two
        # swapped this check could never notice a listed Test.b.
        self.assertNotContainsRe(output.getvalue(), b"Test.b")
        self.assertLength(2, output.readlines())

    def test_lsprof_tests(self):
        self.requireFeature(features.lsprof_feature)
        results = []

        # Minimal suite-like object that only records the result object
        # it is asked to run against.
        class Test(object):
            def __call__(self, result):
                self.run(result)

            def run(self, result):
                results.append(result)

            def countTestCases(self):
                return 1
        self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
        self.assertLength(1, results)
        self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)

    def test_random(self):
        # test randomising by listing a number of tests.
        output_123 = self.run_selftest(test_suite_factory=self.factory,
                                       list_only=True, random_seed="123")
        output_234 = self.run_selftest(test_suite_factory=self.factory,
                                       list_only=True, random_seed="234")
        # Compare the captured bytes: two distinct BytesIO objects never
        # compare equal, so comparing the streams themselves proves nothing.
        self.assertNotEqual(output_123.getvalue(), output_234.getvalue())
        # "Randomizing test order..\n\n" precedes the three test ids.
        self.assertLength(5, output_123.readlines())
        self.assertLength(5, output_234.readlines())

    def test_random_reuse_is_same_order(self):
        # Listing twice with the same seed must give the same output.
        expected = self.run_selftest(test_suite_factory=self.factory,
                                     list_only=True, random_seed="123")
        repeated = self.run_selftest(test_suite_factory=self.factory,
                                     list_only=True, random_seed="123")
        self.assertEqual(expected.getvalue(), repeated.getvalue())

    def test_runner_class(self):
        self.requireFeature(features.subunit)
        from subunit import ProtocolTestCase
        stream = self.run_selftest(
            runner_class=tests.SubUnitBzrRunnerv1,
            test_suite_factory=self.factory)
        # Replay the subunit stream to count the tests it reports.
        test = ProtocolTestCase(stream)
        result = unittest.TestResult()
        test.run(result)
        self.assertEqual(3, result.testsRun)

    def test_starting_with_single_argument(self):
        output = self.run_selftest(test_suite_factory=self.factory,
                                   starting_with=[
                                       'breezy.tests.test_selftest.Test.a'],
                                   list_only=True)
        self.assertEqual(b'breezy.tests.test_selftest.Test.a\n',
                         output.getvalue())

    def test_starting_with_multiple_argument(self):
        output = self.run_selftest(
            test_suite_factory=self.factory,
            starting_with=['breezy.tests.test_selftest.Test.a',
                           'breezy.tests.test_selftest.Test.b'],
            list_only=True)
        self.assertEqual(b'breezy.tests.test_selftest.Test.a\n'
                         b'breezy.tests.test_selftest.Test.b\n',
                         output.getvalue())

    def check_transport_set(self, transport_server):
        """Check that selftest(transport=...) sets the default transport.

        A one-test suite records breezy.tests.default_transport while it
        runs; the recorded value must equal ``transport_server``.
        """
        captured_transport = []

        def seen_transport(a_transport):
            captured_transport.append(a_transport)

        class Capture(tests.TestCase):
            def a(self):
                seen_transport(breezy.tests.default_transport)

        def factory():
            return TestUtil.TestSuite([Capture("a")])
        self.run_selftest(transport=transport_server,
                          test_suite_factory=factory)
        self.assertEqual(transport_server, captured_transport[0])

    def test_transport_sftp(self):
        self.requireFeature(features.paramiko)
        from breezy.tests import stub_sftp
        self.check_transport_set(stub_sftp.SFTPAbsoluteServer)

    def test_transport_memory(self):
        self.check_transport_set(memory.MemoryServer)
 
2265
 
 
2266
 
 
2267
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
    # Touches the disk: selftest reads test.list

    def test_load_list(self):
        # Write a list file naming a single test: this very one.
        own_id_line = self.id().encode('ascii') + b'\n'
        self.build_tree_contents([('test.list', own_id_line)])
        # Listing the suite restricted by that file must echo the same id.
        listing = self.run_selftest(load_list='test.list', list_only=True)
        self.assertEqual(own_id_line, listing.getvalue())

    def test_load_unknown(self):
        # Pointing load_list at a file that does not exist is an error.
        self.assertRaises(
            errors.NoSuchFile, self.run_selftest,
            load_list='missing file name', list_only=True)
 
2283
 
 
2284
 
 
2285
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
    """Check which outcomes carry a log when reported through subunit."""

    _test_needs_features = [features.subunit]

    def run_subunit_stream(self, test_name):
        """Run one sample test under the subunit v1 runner.

        Returns a pair: the bytes of the subunit stream, and a
        testtools.TestResult filled in by replaying that stream.
        """
        from subunit import ProtocolTestCase

        def single_test_suite():
            return TestUtil.TestSuite([_get_test(test_name)])

        raw_stream = self.run_selftest(
            runner_class=tests.SubUnitBzrRunnerv1,
            test_suite_factory=single_test_suite)
        replayed = testtools.TestResult()
        ProtocolTestCase(raw_stream).run(replayed)
        return raw_stream.getvalue(), replayed

    def test_fail_has_log(self):
        content, result = self.run_subunit_stream('test_fail')
        self.assertEqual(1, len(result.failures))
        self.assertContainsRe(content, b'(?m)^log$')
        self.assertContainsRe(content, b'this test will fail')

    def test_error_has_log(self):
        content, result = self.run_subunit_stream('test_error')
        self.assertContainsRe(content, b'(?m)^log$')
        self.assertContainsRe(content, b'this test errored')

    def test_skip_has_no_log(self):
        content, result = self.run_subunit_stream('test_skip')
        self.assertNotContainsRe(content, b'(?m)^log$')
        self.assertNotContainsRe(content, b'this test will be skipped')
        skip_reasons = result.skip_reasons
        self.assertEqual({'reason'}, set(skip_reasons))
        skipped = skip_reasons['reason']
        self.assertEqual(1, len(skipped))
        # The skipped test's details are not inspected here:
        # RemotedTestCase doesn't preserve the "details".

    def test_missing_feature_has_no_log(self):
        content, result = self.run_subunit_stream('test_missing_feature')
        self.assertNotContainsRe(content, b'(?m)^log$')
        self.assertNotContainsRe(content, b'missing the feature')
        skip_reasons = result.skip_reasons
        self.assertEqual({'_MissingFeature\n'}, set(skip_reasons))
        skipped = skip_reasons['_MissingFeature\n']
        self.assertEqual(1, len(skipped))
        # The skipped test's details are not inspected here:
        # RemotedTestCase doesn't preserve the "details".

    def test_xfail_has_no_log(self):
        content, result = self.run_subunit_stream('test_xfail')
        self.assertNotContainsRe(content, b'(?m)^log$')
        self.assertNotContainsRe(content, b'test with expected failure')
        self.assertEqual(1, len(result.expectedFailures))
        # Each entry pairs the test with the text recorded for it.
        _, recorded_text = result.expectedFailures[0]
        self.assertNotContainsRe(
            recorded_text, '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertNotContainsRe(recorded_text, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        content, result = self.run_subunit_stream('test_unexpected_success')
        self.assertContainsRe(content, b'(?m)^log$')
        self.assertContainsRe(content, b'test with unexpected success')
        self.assertEqual(1, len(result.unexpectedSuccesses))
        # The test's details are not inspected here:
        # RemotedTestCase doesn't preserve the "details".

    def test_success_has_no_log(self):
        content, result = self.run_subunit_stream('test_success')
        self.assertEqual(1, result.testsRun)
        self.assertNotContainsRe(content, b'(?m)^log$')
        self.assertNotContainsRe(content, b'this test succeeds')
 
2362
 
 
2363
 
 
2364
class TestRunBzr(tests.TestCase):
    """Check how run_bzr and run_bzr_error call down into _run_bzr_core."""

    # Canned values that the _run_bzr_core override below hands back.
    result = 0
    out = ''
    err = ''

    def _run_bzr_core(self, argv, encoding=None, stdin=None,
                      stdout=None, stderr=None, working_dir=None):
        """Stand-in for _run_bzr_core that records how it was invoked.

        Nothing is actually run from inside this class; how bzr really gets
        invoked is tested elsewhere. The arguments are stored on the test,
        the canned ``out`` and ``err`` are written to the given streams and
        the canned ``result`` is returned.
        """
        self.argv = list(argv)
        self.encoding, self.stdin = encoding, stdin
        self.working_dir = working_dir
        stdout.write(self.out)
        stderr.write(self.err)
        return self.result

    def test_run_bzr_error(self):
        self.out = "It sure does!\n"
        self.result = 34
        out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
        self.assertEqual(['rocks'], self.argv)
        # Both streams come back exactly as the stub wrote them.
        self.assertEqual('It sure does!\n', out)
        self.assertEqual(out, self.out)
        self.assertEqual('', err)
        self.assertEqual(err, self.err)

    def test_run_bzr_error_regexes(self):
        self.out = ''
        self.err = "bzr: ERROR: foobarbaz is not versioned"
        self.result = 3
        expected_errors = ["bzr: ERROR: foobarbaz is not versioned"]
        out, err = self.run_bzr_error(
            expected_errors, ['file-id', 'foobarbaz'])

    def test_encoding(self):
        """run_bzr hands the encoding on to _run_bzr_core."""
        # Without an explicit encoding the user encoding is used.
        self.run_bzr('foo bar')
        self.assertEqual(osutils.get_user_encoding(), self.encoding)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr('foo bar', encoding='baz')
        self.assertEqual('baz', self.encoding)
        self.assertEqual(['foo', 'bar'], self.argv)

    def test_stdin(self):
        # run_bzr is a convenience wrapper around _run_bzr_core, which this
        # class overrides; the stdin keyword must therefore arrive in the
        # override untouched.
        self.run_bzr('foo bar', stdin='gam')
        self.assertEqual('gam', self.stdin)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr('foo bar', stdin='zippy')
        self.assertEqual('zippy', self.stdin)
        self.assertEqual(['foo', 'bar'], self.argv)

    def test_working_dir(self):
        """run_bzr hands working_dir on to _run_bzr_core."""
        self.run_bzr('foo bar')
        self.assertEqual(None, self.working_dir)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr('foo bar', working_dir='baz')
        self.assertEqual('baz', self.working_dir)
        self.assertEqual(['foo', 'bar'], self.argv)

    def test_reject_extra_keyword_arguments(self):
        # Keywords that run_bzr does not know are a TypeError.
        self.assertRaises(
            TypeError, self.run_bzr, "foo bar",
            error_regex=['error message'])
 
2442
 
 
2443
 
 
2444
class TestRunBzrCaptured(tests.TestCaseWithTransport):
    # Touches the disk when checking the working_dir parameter.

    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
                         a_callable=None, *args, **kwargs):
        # Override that never invokes a_callable: it records the stdin it
        # was given, the current ui factory (and that factory's stdin) and
        # the current directory, then writes fixed text to both streams.
        current_factory = breezy.ui.ui_factory
        self.stdin = stdin
        self.factory_stdin = getattr(current_factory, "stdin", None)
        self.factory = current_factory
        self.working_dir = osutils.getcwd()
        stdout.write('foo\n')
        stderr.write('bar\n')
        return 0

    def test_stdin(self):
        # _run_bzr_core calls apply_redirected, which this class overrides,
        # so the stdin keyword must reach the override as a StringIO that
        # is also the stdin of the ui factory.
        for text in ('gam', 'zippy'):
            self.run_bzr(['foo', 'bar'], stdin=text)
            self.assertEqual(text, self.stdin.read())
            self.assertTrue(self.stdin is self.factory_stdin)

    def test_ui_factory(self):
        # Every self.run_bzr call gets a UI factory of its own: a
        # TestUIFactory whose stdin, stdout and stderr are those of the
        # invoked run_bzr.
        factory_before = breezy.ui.ui_factory
        self.run_bzr(['foo'])
        used_factory = self.factory
        self.assertFalse(factory_before is used_factory)
        self.assertNotEqual(sys.stdout, used_factory.stdout)
        self.assertNotEqual(sys.stderr, used_factory.stderr)
        self.assertEqual('foo\n', used_factory.stdout.getvalue())
        self.assertEqual('bar\n', used_factory.stderr.getvalue())
        self.assertIsInstance(used_factory, tests.TestUIFactory)

    def test_working_dir(self):
        self.build_tree(['one/', 'two/'])
        start_dir = osutils.getcwd()

        # With no working_dir the command runs in the current directory.
        self.run_bzr(['foo', 'bar'])
        self.assertEqual(start_dir, self.working_dir)

        self.run_bzr(['foo', 'bar'], working_dir=None)
        self.assertEqual(start_dir, self.working_dir)

        # With a working_dir the command runs there, and the current
        # directory is back to where it was once run_bzr returns.
        for subdir in ('one', 'two'):
            self.run_bzr(['foo', 'bar'], working_dir=subdir)
            self.assertNotEqual(start_dir, self.working_dir)
            self.assertEndsWith(self.working_dir, subdir)
            self.assertEqual(start_dir, osutils.getcwd())
 
2505
 
 
2506
 
 
2507
class StubProcess(object):
    """Minimal stand-in for a process, for testing run_bzr_subprocess."""

    def __init__(self, out="", err="", retcode=0):
        # Canned output streams and exit status.
        self.out, self.err = out, err
        self.returncode = retcode

    def communicate(self):
        """Return the canned ``(out, err)`` pair."""
        return (self.out, self.err)
 
2517
 
 
2518
 
 
2519
class TestWithFakedStartBzrSubprocess(tests.TestCaseWithTransport):
    """Base for tests of the ways bzr can be run as a subprocess."""

    def setUp(self):
        super(TestWithFakedStartBzrSubprocess, self).setUp()
        # One dict per start_bzr_subprocess call, oldest first.
        self.subprocess_calls = []

    def start_bzr_subprocess(self, process_args, env_changes=None,
                             skip_if_plan_to_signal=False,
                             working_dir=None,
                             allow_plugins=False):
        """Record what run_bzr_subprocess asked for; start nothing."""
        call_record = dict(
            process_args=process_args,
            env_changes=env_changes,
            skip_if_plan_to_signal=skip_if_plan_to_signal,
            working_dir=working_dir,
            allow_plugins=allow_plugins,
        )
        self.subprocess_calls.append(call_record)
        return self.next_subprocess
 
2537
 
 
2538
 
 
2539
class TestRunBzrSubprocess(TestWithFakedStartBzrSubprocess):
    """Checks of run_bzr_subprocess against a faked start_bzr_subprocess."""

    def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
        """Call run_bzr_subprocess with a stub process and check the call.

        The inherited stub start_bzr_subprocess returns ``process``.
        Whether run_bzr_subprocess returns or raises, every entry of
        ``expected_args`` is compared with the most recently recorded call.
        The result of run_bzr_subprocess is returned.
        """
        self.next_subprocess = process
        try:
            outcome = self.run_bzr_subprocess(*args, **kwargs)
        finally:
            # Same bookkeeping on success and on failure; an exception from
            # run_bzr_subprocess carries on once the checks have passed.
            self.next_subprocess = None
            for name, wanted in expected_args.items():
                self.assertEqual(wanted, self.subprocess_calls[-1][name])
        return outcome

    def test_run_bzr_subprocess(self):
        """The run_bzr_helper_external command behaves nicely."""
        # The arguments may be given as a string or as a list.
        self.assertRunBzrSubprocess({'process_args': ['--version']},
                                    StubProcess(), '--version')
        self.assertRunBzrSubprocess({'process_args': ['--version']},
                                    StubProcess(), ['--version'])
        # No retcode checking happens when retcode=None
        result = self.assertRunBzrSubprocess(
            {}, StubProcess(retcode=3), '--version', retcode=None)
        result = self.assertRunBzrSubprocess(
            {}, StubProcess(out="is free software"), '--version')
        self.assertContainsRe(result[0], 'is free software')
        # An unexpected non-zero exit is reported as a failure
        self.assertRaises(
            AssertionError, self.assertRunBzrSubprocess,
            {'process_args': ['--versionn']}, StubProcess(retcode=3),
            '--versionn')
        # ... unless that exit code was announced up front
        result = self.assertRunBzrSubprocess(
            {}, StubProcess(retcode=3), '--versionn', retcode=3)
        # ... or retcode checking is switched off
        result = self.assertRunBzrSubprocess(
            {}, StubProcess(err="unknown command", retcode=3),
            '--versionn', retcode=None)
        self.assertContainsRe(result[1], 'unknown command')

    def test_env_change_passes_through(self):
        wanted_changes = {
            'new': 'value', 'changed': 'newvalue', 'deleted': None}
        # A separate copy is passed in so that the expectation stays
        # independent of the dict the code under test receives.
        self.assertRunBzrSubprocess(
            {'env_changes': wanted_changes},
            StubProcess(), '',
            env_changes=dict(wanted_changes))

    def test_no_working_dir_passed_as_None(self):
        self.assertRunBzrSubprocess(
            {'working_dir': None}, StubProcess(), '')

    def test_no_working_dir_passed_through(self):
        self.assertRunBzrSubprocess(
            {'working_dir': 'dir'}, StubProcess(), '', working_dir='dir')

    def test_run_bzr_subprocess_no_plugins(self):
        self.assertRunBzrSubprocess(
            {'allow_plugins': False}, StubProcess(), '')

    def test_allow_plugins(self):
        self.assertRunBzrSubprocess(
            {'allow_plugins': True}, StubProcess(), '', allow_plugins=True)
 
2608
 
 
2609
 
 
2610
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):
    """Check how finish_bzr_subprocess treats a stub's exit code and output.

    Every test feeds it a ``StubProcess`` that reports "unknown command" as
    its error output and exits with code 3.
    """

    def test_finish_bzr_subprocess_with_error(self):
        """A non-zero exit code is accepted when it is the one requested."""
        stub = StubProcess(err="unknown command", retcode=3)
        finished = self.finish_bzr_subprocess(stub, retcode=3)
        self.assertEqual('', finished[0])
        self.assertContainsRe(finished[1], 'unknown command')

    def test_finish_bzr_subprocess_ignoring_retcode(self):
        """Passing retcode=None switches exit code checking off."""
        stub = StubProcess(err="unknown command", retcode=3)
        finished = self.finish_bzr_subprocess(stub, retcode=None)
        self.assertEqual('', finished[0])
        self.assertContainsRe(finished[1], 'unknown command')

    def test_finish_subprocess_with_unexpected_retcode(self):
        """An exit code other than the expected one is a test failure.

        No retcode is passed, so the default expectation applies and the
        stub's exit code of 3 must raise self.failureException.
        """
        stub = StubProcess(err="unknown command", retcode=3)
        self.assertRaises(
            self.failureException, self.finish_bzr_subprocess, stub)
 
2634
 
 
2635
 
 
2636
class _DontSpawnProcess(Exception):
 
2637
    """A simple exception which just allows us to skip unnecessary steps"""
 
2638
 
 
2639
 
 
2640
class TestStartBzrSubProcess(tests.TestCase):
    """Check start_bzr_subprocess without spawning a real child process.

    ``_popen`` is overridden to record its arguments and then raise
    ``_DontSpawnProcess``.  Each test therefore expects
    ``start_bzr_subprocess`` to fail with that exception and afterwards
    inspects what was recorded, or what ``check_popen_state`` observed at
    the moment ``_popen`` was called.
    """

    def _subprocess_log_cleanup(self):
        """Inhibits the base version as we don't produce a log file."""

    def _popen(self, *args, **kwargs):
        """Override the base version to record the command that is run.

        From there we can ensure it is correct without spawning a real process.

        :param args: Positional arguments of the call; stored in
            ``self._popen_args``.
        :param kwargs: Keyword arguments of the call; stored in
            ``self._popen_kwargs``.
        :raises _DontSpawnProcess: Always, after the arguments are recorded.
        """
        # Run the per-test hook first so it sees the state (environment,
        # working directory) in effect at the time of the call.
        self.check_popen_state()
        self._popen_args = args
        self._popen_kwargs = kwargs
        raise _DontSpawnProcess()

    def check_popen_state(self):
        """Replace to make assertions when popen is called."""

    def test_run_bzr_subprocess_no_plugins(self):
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
        command = self._popen_args[0]
        self.assertEqual(sys.executable, command[0])
        self.assertEqual(self.get_brz_path(), command[1])
        self.assertEqual(['--no-plugins'], command[2:])

    def test_allow_plugins(self):
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          allow_plugins=True)
        command = self._popen_args[0]
        self.assertEqual([], command[2:])

    def test_set_env(self):
        # assertFalse(... in os.environ) is kept on purpose: a containment
        # assertion would print the whole environment when it fails.
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

        # set in the child
        def check_environment():
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          env_changes={'EXISTANT_ENV_VAR': 'set variable'})
        # not set in the parent
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

    def test_run_bzr_subprocess_env_del(self):
        """run_bzr_subprocess can remove environment variables too."""
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

        def check_environment():
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
        try:
            self.check_popen_state = check_environment
            self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess,
                              [], env_changes={'EXISTANT_ENV_VAR': None})
            # Still set in parent
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
        finally:
            # Remove the variable even when an assertion above fails, so it
            # cannot leak into the environment seen by later tests.  pop()
            # with a default tolerates the variable being gone already.
            os.environ.pop('EXISTANT_ENV_VAR', None)

    def test_env_del_missing(self):
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)

        def check_environment():
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          env_changes={'NON_EXISTANT_ENV_VAR': None})

    def test_working_dir(self):
        """Test that we can specify the working dir for the child"""
        chdirs = []

        # Record every directory change instead of performing it.
        def chdir(path):
            chdirs.append(path)
        self.overrideAttr(os, 'chdir', chdir)

        def getcwd():
            return 'current'
        self.overrideAttr(osutils, 'getcwd', getcwd)
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          working_dir='foo')
        # First into the requested directory, then back to the faked
        # current directory.
        self.assertEqual(['foo', 'current'], chdirs)

    def test_get_brz_path_with_cwd_breezy(self):
        # With an empty source path and every path reported as an existing
        # file, the computed path is expected to be the bare "brz".
        self.get_source_path = lambda: ""
        self.overrideAttr(os.path, "isfile", lambda path: True)
        self.assertEqual(self.get_brz_path(), "brz")
 
2726
 
 
2727
 
 
2728
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
    """Tests that genuinely have to run an external bzr process."""

    def test_start_and_stop_bzr_subprocess_send_signal(self):
        """A started subprocess can be stopped by sending it SIGINT.

        The child runs ``wait-until-signalled`` and must first print
        ``running``.  Finishing it with SIGINT is expected to give exit code
        3, empty first output and ``brz: interrupted`` as second output.
        """
        self.disable_missing_extensions_warning()
        child = self.start_bzr_subprocess(
            ['wait-until-signalled'], skip_if_plan_to_signal=True)
        first_line = child.stdout.readline()
        self.assertEqual(b'running\n', first_line)
        finished = self.finish_bzr_subprocess(
            child, send_signal=signal.SIGINT, retcode=3)
        self.assertEqual(b'', finished[0])
        self.assertEqual(b'brz: interrupted\n', finished[1])
 
2743
 
 
2744
 
 
2745
class TestSelftestFiltering(tests.TestCase):
    """Tests for the suite filtering, excluding and splitting helpers.

    The suite being filtered is loaded from this very module, so the
    expected ids below are ids of tests in this class.  Several tests select
    by id prefix or by regular expression: adding or renaming a test method
    in this class can change what those selections match.
    """

    def setUp(self):
        super().setUp()
        self.suite = TestUtil.TestSuite()
        self.loader = TestUtil.TestLoader()
        self.suite.addTest(self.loader.loadTestsFromModule(
            sys.modules['breezy.tests.test_selftest']))
        # Ids of every test in the unfiltered suite; the tests below compare
        # filtered results against this list.
        self.all_names = _test_ids(self.suite)

    def _all_names_without(self, test_id):
        """Return a copy of ``self.all_names`` with ``test_id`` removed.

        :param test_id: Id expected to be present in ``self.all_names``.
        :raises ValueError: If ``test_id`` is not in ``self.all_names``.
        """
        remaining = list(self.all_names)
        remaining.remove(test_id)
        return remaining

    def test_condition_id_re(self):
        test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                     'test_condition_id_re')
        filtered_suite = tests.filter_suite_by_condition(
            self.suite, tests.condition_id_re('test_condition_id_re'))
        self.assertEqual([test_name], _test_ids(filtered_suite))

    def test_condition_id_in_list(self):
        test_names = ['breezy.tests.test_selftest.TestSelftestFiltering.'
                      'test_condition_id_in_list']
        id_list = tests.TestIdList(test_names)
        filtered_suite = tests.filter_suite_by_condition(
            self.suite, tests.condition_id_in_list(id_list))
        # Filtering by regex must select the same tests as the id list.
        my_pattern = 'TestSelftestFiltering.*test_condition_id_in_list'
        re_filtered = tests.filter_suite_by_re(self.suite, my_pattern)
        self.assertEqual(_test_ids(re_filtered), _test_ids(filtered_suite))

    def test_condition_id_startswith(self):
        klass = 'breezy.tests.test_selftest.TestSelftestFiltering.'
        start1 = klass + 'test_condition_id_starts'
        start2 = klass + 'test_condition_id_in'
        test_names = [klass + 'test_condition_id_in_list',
                      klass + 'test_condition_id_startswith',
                      ]
        filtered_suite = tests.filter_suite_by_condition(
            self.suite, tests.condition_id_startswith([start1, start2]))
        self.assertEqual(test_names, _test_ids(filtered_suite))

    def test_condition_isinstance(self):
        filtered_suite = tests.filter_suite_by_condition(
            self.suite, tests.condition_isinstance(self.__class__))
        # Selecting by this class must match selecting by its id prefix.
        class_pattern = 'breezy.tests.test_selftest.TestSelftestFiltering.'
        re_filtered = tests.filter_suite_by_re(self.suite, class_pattern)
        self.assertEqual(_test_ids(re_filtered), _test_ids(filtered_suite))

    def test_exclude_tests_by_condition(self):
        excluded_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                         'test_exclude_tests_by_condition')
        filtered_suite = tests.exclude_tests_by_condition(
            self.suite, lambda x: x.id() == excluded_name)
        self.assertEqual(len(self.all_names) - 1,
                         filtered_suite.countTestCases())
        filtered_names = _test_ids(filtered_suite)
        self.assertNotIn(excluded_name, filtered_names)
        self.assertEqual(self._all_names_without(excluded_name),
                         filtered_names)

    def test_exclude_tests_by_re(self):
        filtered_suite = tests.exclude_tests_by_re(self.suite,
                                                   'exclude_tests_by_re')
        excluded_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                         'test_exclude_tests_by_re')
        self.assertEqual(len(self.all_names) - 1,
                         filtered_suite.countTestCases())
        filtered_names = _test_ids(filtered_suite)
        self.assertNotIn(excluded_name, filtered_names)
        self.assertEqual(self._all_names_without(excluded_name),
                         filtered_names)

    def test_filter_suite_by_condition(self):
        test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                     'test_filter_suite_by_condition')
        filtered_suite = tests.filter_suite_by_condition(
            self.suite, lambda x: x.id() == test_name)
        self.assertEqual([test_name], _test_ids(filtered_suite))

    def test_filter_suite_by_re(self):
        filtered_suite = tests.filter_suite_by_re(self.suite,
                                                  'test_filter_suite_by_r')
        filtered_names = _test_ids(filtered_suite)
        self.assertEqual(
            filtered_names, ['breezy.tests.test_selftest.'
                             'TestSelftestFiltering.test_filter_suite_by_re'])

    def test_filter_suite_by_id_list(self):
        test_list = ['breezy.tests.test_selftest.'
                     'TestSelftestFiltering.test_filter_suite_by_id_list']
        filtered_suite = tests.filter_suite_by_id_list(
            self.suite, tests.TestIdList(test_list))
        filtered_names = _test_ids(filtered_suite)
        self.assertEqual(
            filtered_names,
            ['breezy.tests.test_selftest.'
             'TestSelftestFiltering.test_filter_suite_by_id_list'])

    def test_filter_suite_by_id_startswith(self):
        # By design this test may fail if another test is added whose name also
        # begins with one of the start value used.
        klass = 'breezy.tests.test_selftest.TestSelftestFiltering.'
        start1 = klass + 'test_filter_suite_by_id_starts'
        start2 = klass + 'test_filter_suite_by_id_li'
        test_list = [klass + 'test_filter_suite_by_id_list',
                     klass + 'test_filter_suite_by_id_startswith',
                     ]
        filtered_suite = tests.filter_suite_by_id_startswith(
            self.suite, [start1, start2])
        self.assertEqual(
            test_list,
            _test_ids(filtered_suite),
            )

    def test_preserve_input(self):
        # NB: Surely this is something in the stdlib to do this?
        self.assertIs(self.suite, tests.preserve_input(self.suite))
        self.assertEqual("@#$", tests.preserve_input("@#$"))

    def test_randomize_suite(self):
        randomized_suite = tests.randomize_suite(self.suite)
        randomized_names = _test_ids(randomized_suite)
        # randomizing should not add or remove test names.
        self.assertEqual(set(_test_ids(self.suite)),
                         set(randomized_names))
        # Technically, this *can* fail, because random.shuffle(list) can be
        # equal to list. Trying multiple times just pushes the frequency back.
        # As its len(self.all_names)!:1, the failure frequency should be low
        # enough to ignore. RBC 20071021.
        # It should change the order.
        self.assertNotEqual(self.all_names, randomized_names)
        # But not the length. (Possibly redundant with the set test, but not
        # necessarily.)
        self.assertEqual(len(self.all_names), len(randomized_names))

    def test_split_suit_by_condition(self):
        condition = tests.condition_id_re('test_filter_suite_by_r')
        split_suite = tests.split_suite_by_condition(self.suite, condition)
        filtered_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                         'test_filter_suite_by_re')
        # Element 0 holds the matching tests, element 1 everything else.
        self.assertEqual([filtered_name], _test_ids(split_suite[0]))
        unmatched_names = _test_ids(split_suite[1])
        self.assertNotIn(filtered_name, unmatched_names)
        self.assertEqual(self._all_names_without(filtered_name),
                         unmatched_names)

    def test_split_suit_by_re(self):
        split_suite = tests.split_suite_by_re(self.suite,
                                              'test_filter_suite_by_r')
        filtered_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                         'test_filter_suite_by_re')
        # Element 0 holds the matching tests, element 1 everything else.
        self.assertEqual([filtered_name], _test_ids(split_suite[0]))
        unmatched_names = _test_ids(split_suite[1])
        self.assertNotIn(filtered_name, unmatched_names)
        self.assertEqual(self._all_names_without(filtered_name),
                         unmatched_names)
 
2900
 
 
2901
 
 
2902
class TestCheckTreeShape(tests.TestCaseWithTransport):
    """Test the check_tree_shape assertion helper."""

    def test_check_tree_shape(self):
        # Build and version one file, one directory and one nested file,
        # then check the tree reports exactly that shape.
        shape = ['a', 'b/', 'b/c']
        working_tree = self.make_branch_and_tree('.')
        self.build_tree(shape)
        working_tree.add(shape)
        # The shape check runs with the tree read-locked; the lock is
        # released whether or not the check passes.
        working_tree.lock_read()
        try:
            self.check_tree_shape(working_tree, shape)
        finally:
            working_tree.unlock()
 
2914
 
 
2915
 
 
2916
class TestBlackboxSupport(tests.TestCase):
    """Tests for the blackbox support offered by the test suite."""

    def test_run_bzr_failure_not_caught(self):
        # Unexpected errors raised while running bzr in blackbox mode must
        # propagate to the test suite, which then reports them in the usual
        # way without producing a second traceback.
        raised = self.assertRaises(
            AssertionError, self.run_bzr, ['assert-fail'])
        # Confirm this is the command's own error and not one coming from
        # elsewhere in the test framework.
        self.assertEqual('always fails', str(raised))
        # The test log must not contain a traceback.
        log_text = self.get_log()
        self.assertNotContainsRe(log_text, r'Traceback')

    def test_run_bzr_user_error_caught(self):
        # Normal/expected/user errors hit while running bzr in blackbox mode
        # are caught in the regular way and become an error message plus an
        # exit code.
        server = memory.MemoryServer()
        server.start_server()
        self.addCleanup(server.stop_server)
        base_url = server.get_url()
        self.permit_url(base_url)
        missing_location = "%s/nonexistantpath" % base_url
        out, err = self.run_bzr(["log", missing_location], retcode=3)
        self.assertEqual(out, '')
        self.assertContainsRe(
            err, 'brz: ERROR: Not a branch: ".*nonexistantpath/".\n')
 
2945
 
 
2946
 
 
2947
class TestTestLoader(tests.TestCase):
    """Tests for the test loader."""

    def _get_loader_and_module(self):
        """Gets a TestLoader and a module with one test in it.

        The "module" is an instance of a throwaway class created on each
        call, so attributes later set on its class do not leak between
        tests.

        :return: A ``(loader, module)`` pair; ``module.a_class`` is a
            TestCase subclass with the single test ``test_foo`` and
            ``module.__name__`` is ``'fake_module'``.
        """
        loader = TestUtil.TestLoader()

        class Stub(tests.TestCase):
            def test_foo(self):
                pass

        class MyModule:
            pass
        MyModule.a_class = Stub
        module = MyModule()
        module.__name__ = 'fake_module'
        return loader, module

    def test_module_no_load_tests_attribute_loads_classes(self):
        loader, module = self._get_loader_and_module()
        self.assertEqual(1, loader.loadTestsFromModule(
            module).countTestCases())

    def test_module_load_tests_attribute_gets_called(self):
        loader, module = self._get_loader_and_module()

        def load_tests(loader, standard_tests, pattern):
            result = loader.suiteClass()
            for test in tests.iter_suite_tests(standard_tests):
                result.addTests([test, test])
            return result
        # add a load_tests() method which multiplies the tests from the module.
        # staticmethod keeps the fake module instance from being bound as
        # the first argument.
        module.__class__.load_tests = staticmethod(load_tests)
        self.assertEqual(
            2 * [str(module.a_class('test_foo'))],
            list(map(str, loader.loadTestsFromModule(module))))

    def test_load_tests_from_module_name_smoke_test(self):
        loader = TestUtil.TestLoader()
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        self.assertEqual(['breezy.tests.test_sampler.DemoTest.test_nothing'],
                         _test_ids(suite))

    def test_load_tests_from_module_name_with_bogus_module_name(self):
        loader = TestUtil.TestLoader()
        self.assertRaises(ImportError, loader.loadTestsFromModuleName, 'bogus')
 
2994
 
 
2995
 
 
2996
class TestTestIdList(tests.TestCase):
    """Tests for tests.TestIdList and tests.suite_matches_id_list."""

    def _create_id_list(self, test_list):
        # Wrap a plain list of test ids in a TestIdList.
        return tests.TestIdList(test_list)

    def _create_suite(self, test_id_list):
        # Build a suite with one stub test per requested id; each stub's
        # id() is replaced so that it reports the requested id.

        class Stub(tests.TestCase):
            def test_foo(self):
                pass

        def _fixed_id(test_id):
            # Bind the id per call so each stub keeps its own value.
            return lambda: test_id

        suite = TestUtil.TestSuite()
        for wanted_id in test_id_list:
            stub_test = Stub('test_foo')
            stub_test.id = _fixed_id(wanted_id)
            suite.addTest(stub_test)
        return suite

    def _test_ids(self, test_suite):
        """Return the id of every test found in test_suite."""
        found = []
        for test in tests.iter_suite_tests(test_suite):
            found.append(test.id())
        return found

    def test_empty_list(self):
        empty = self._create_id_list([])
        self.assertEqual({}, empty.tests)
        self.assertEqual({}, empty.modules)

    def test_valid_list(self):
        names = [
            'mod1.cl1.meth1', 'mod1.cl1.meth2',
            'mod1.func1', 'mod1.cl2.meth2',
            'mod1.submod1',
            'mod1.submod2.cl1.meth1', 'mod1.submod2.cl2.meth2',
        ]
        id_list = self._create_id_list(names)
        for referenced in ('mod1', 'mod1.submod1', 'mod1.submod2'):
            self.assertTrue(id_list.refers_to(referenced))
        for included in ('mod1.cl1.meth1', 'mod1.submod1', 'mod1.func1'):
            self.assertTrue(id_list.includes(included))

    def test_bad_chars_in_params(self):
        # The dots inside the parenthesised parameters must not confuse the
        # lookup.
        parametrized = 'mod1.cl1.meth1(xx.yy)'
        id_list = self._create_id_list([parametrized])
        self.assertTrue(id_list.refers_to('mod1'))
        self.assertTrue(id_list.includes(parametrized))

    def test_module_used(self):
        id_list = self._create_id_list(['mod.class.meth'])
        for referenced in ('mod', 'mod.class', 'mod.class.meth'):
            self.assertTrue(id_list.refers_to(referenced))

    def test_test_suite_matches_id_list_with_unknown(self):
        sampler_suite = TestUtil.TestLoader().loadTestsFromModuleName(
            'breezy.tests.test_sampler')
        wanted = ['breezy.tests.test_sampler.DemoTest.test_nothing',
                  'bogus']
        not_found, duplicates = tests.suite_matches_id_list(
            sampler_suite, wanted)
        self.assertEqual(['bogus'], not_found)
        self.assertEqual([], duplicates)

    def test_suite_matches_id_list_with_duplicates(self):
        loader = TestUtil.TestLoader()
        sampler_suite = loader.loadTestsFromModuleName(
            'breezy.tests.test_sampler')
        # Add every test twice so that each id shows up as a duplicate.
        doubled = loader.suiteClass()
        for test in tests.iter_suite_tests(sampler_suite):
            for _ in range(2):
                doubled.addTest(test)

        wanted = ['breezy.tests.test_sampler.DemoTest.test_nothing']
        not_found, duplicates = tests.suite_matches_id_list(doubled, wanted)
        self.assertEqual([], not_found)
        self.assertEqual(['breezy.tests.test_sampler.DemoTest.test_nothing'],
                         duplicates)
 
3074
 
 
3075
 
 
3076
class TestTestSuite(tests.TestCase):
    """Tests for tests.test_suite and the two name lists it is built from."""

    def test__test_suite_testmod_names(self):
        # _test_suite_testmod_names should return a plausible list of test
        # module names.
        plausible = [
            'breezy.tests.blackbox',
            'breezy.tests.per_transport',
            'breezy.tests.test_selftest',
        ]
        self.assertSubset(plausible, tests._test_suite_testmod_names())

    def test__test_suite_modules_to_doctest(self):
        # _test_suite_modules_to_doctest should return a plausible list of
        # modules to doctest.
        module_names = tests._test_suite_modules_to_doctest()
        if __doc__ is None:
            # When docstrings are stripped, there are no modules to doctest
            self.assertEqual([], module_names)
        else:
            self.assertSubset(['breezy.timestamp'], module_names)

    def test_test_suite(self):
        # test_suite() normally loads the entire test suite.  To avoid that
        # cost while still checking that things happen, the two functions
        # it uses are temporarily replaced by test doubles that supply a few
        # sample tests to load; the test then checks they were loaded.
        calls = []

        def testmod_names():
            calls.append("testmod_names")
            return [
                'breezy.tests.blackbox.test_branch',
                'breezy.tests.per_transport',
                'breezy.tests.test_selftest',
            ]

        def doctests():
            calls.append("modules_to_doctest")
            return [] if __doc__ is None else ['breezy.timestamp']

        self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names)
        self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests)
        suite = tests.test_suite()
        self.assertEqual({"testmod_names", "modules_to_doctest"}, set(calls))
        # One sample id from each module supplied by testmod_names.  Plugins
        # can't be tested that way since selftest may be run with
        # --no-plugins.
        expected_ids = [
            'breezy.tests.blackbox.test_branch.TestBranch.test_branch',
            ('breezy.tests.per_transport.TransportTests'
             '.test_abspath(LocalTransport,LocalURLServer)'),
            'breezy.tests.test_selftest.TestTestSuite.test_test_suite',
        ]
        self.assertSubset(expected_ids, _test_ids(suite))

    def test_test_suite_list_and_start(self):
        # This cannot be checked together with the main load, because that
        # test needs to know that starting_with == None works.  A second
        # load is therefore incurred; the starting_with parameter makes it a
        # partial load rather than a full one, so it should be quick.
        wanted = [
            'breezy.tests.test_selftest.TestTestSuite.test_test_suite']
        starting_with = ['breezy.tests.test_selftest.TestTestSuite']
        suite = tests.test_suite(wanted, starting_with)
        # test_test_suite_list_and_start is not included
        self.assertEqual(wanted, _test_ids(suite))
 
3149
 
 
3150
 
 
3151
class TestLoadTestIdList(tests.TestCaseInTempDir):
    """Tests for tests.load_test_id_list, which reads test ids from a file."""

    def _create_test_list_file(self, file_name, content):
        """Write ``content`` to the text file ``file_name``.

        :param file_name: Path of the file to create; opened in ``'wt'``
            mode.
        :param content: Text to write.
        """
        # The context manager closes the file even if write() raises, so a
        # failed write cannot leak the open handle.
        with open(file_name, 'wt') as fl:
            fl.write(content)

    def test_load_unknown(self):
        self.assertRaises(errors.NoSuchFile,
                          tests.load_test_id_list, 'i_do_not_exist')

    def test_load_test_list(self):
        test_list_fname = 'test.list'
        self._create_test_list_file(test_list_fname,
                                    'mod1.cl1.meth1\nmod2.cl2.meth2\n')
        tlist = tests.load_test_id_list(test_list_fname)
        self.assertEqual(2, len(tlist))
        self.assertEqual('mod1.cl1.meth1', tlist[0])
        self.assertEqual('mod2.cl2.meth2', tlist[1])

    def test_load_dirty_file(self):
        test_list_fname = 'test.list'
        self._create_test_list_file(test_list_fname,
                                    '  mod1.cl1.meth1\n\nmod2.cl2.meth2  \n'
                                    'bar baz\n')
        tlist = tests.load_test_id_list(test_list_fname)
        # Surrounding whitespace is expected to be stripped, while the empty
        # line and the entry with an inner space are kept as entries.
        self.assertEqual(4, len(tlist))
        self.assertEqual('mod1.cl1.meth1', tlist[0])
        self.assertEqual('', tlist[1])
        self.assertEqual('mod2.cl2.meth2', tlist[2])
        self.assertEqual('bar baz', tlist[3])
 
3182
 
 
3183
 
 
3184
class TestFilteredByModuleTestLoader(tests.TestCase):
    """Test FilteredByModuleTestLoader driven by a TestIdList."""

    def _create_loader(self, test_list):
        # The loader is given the id list's refers_to method as its filter.
        wanted_ids = tests.TestIdList(test_list)
        return TestUtil.FilteredByModuleTestLoader(wanted_ids.refers_to)

    def test_load_tests(self):
        wanted = ['breezy.tests.test_sampler.DemoTest.test_nothing']
        loaded = self._create_loader(wanted).loadTestsFromModuleName(
            'breezy.tests.test_sampler')
        self.assertEqual(wanted, _test_ids(loaded))

    def test_exclude_tests(self):
        # An id list that does not mention the module yields no tests.
        loaded = self._create_loader(['bogus']).loadTestsFromModuleName(
            'breezy.tests.test_sampler')
        self.assertEqual([], _test_ids(loaded))
 
3202
 
 
3203
 
 
3204
class TestFilteredByNameStartTestLoader(tests.TestCase):
    """Test FilteredByModuleTestLoader driven by a name prefix."""

    def _create_loader(self, name_start):
        # A module is needed when its name begins with the prefix, or when
        # the prefix begins with (i.e. points somewhere inside) the module
        # name.
        def needs_module(name):
            if name.startswith(name_start):
                return True
            return name_start.startswith(name)
        return TestUtil.FilteredByModuleTestLoader(needs_module)

    def test_load_tests(self):
        expected_ids = ['breezy.tests.test_sampler.DemoTest.test_nothing']
        prefix_loader = self._create_loader('breezy.tests.test_samp')
        loaded = prefix_loader.loadTestsFromModuleName(
            'breezy.tests.test_sampler')
        self.assertEqual(expected_ids, _test_ids(loaded))

    def test_load_tests_inside_module(self):
        # Here the prefix is longer than the module name.
        expected_ids = ['breezy.tests.test_sampler.DemoTest.test_nothing']
        prefix_loader = self._create_loader('breezy.tests.test_sampler.Demo')
        loaded = prefix_loader.loadTestsFromModuleName(
            'breezy.tests.test_sampler')
        self.assertEqual(expected_ids, _test_ids(loaded))

    def test_exclude_tests(self):
        prefix_loader = self._create_loader('bogus')
        loaded = prefix_loader.loadTestsFromModuleName(
            'breezy.tests.test_sampler')
        self.assertEqual([], _test_ids(loaded))
 
3231
 
 
3232
 
 
3233
class TestTestPrefixRegistry(tests.TestCase):
    """Tests for tests.TestPrefixAliasRegistry and the predefined aliases."""

    def _get_registry(self):
        # Each test works on a registry of its own.
        return tests.TestPrefixAliasRegistry()

    def test_register_new_prefix(self):
        registry = self._get_registry()
        registry.register('foo', 'fff.ooo.ooo')
        self.assertEqual('fff.ooo.ooo', registry.get('foo'))

    def test_register_existing_prefix(self):
        registry = self._get_registry()
        registry.register('bar', 'bbb.aaa.rrr')
        registry.register('bar', 'bBB.aAA.rRR')
        # The first registration is kept ...
        self.assertEqual('bbb.aaa.rrr', registry.get('bar'))
        # ... and the log mentions the alias together with both values.
        log_matcher = DocTestMatches(
            "...bar...bbb.aaa.rrr...BB.aAA.rRR", doctest.ELLIPSIS)
        self.assertThat(self.get_log(), log_matcher)

    def test_get_unknown_prefix(self):
        registry = self._get_registry()
        self.assertRaises(KeyError, registry.get, 'I am not a prefix')

    def test_resolve_prefix(self):
        registry = self._get_registry()
        registry.register('bar', 'bb.aa.rr')
        self.assertEqual('bb.aa.rr', registry.resolve_alias('bar'))

    def test_resolve_unknown_alias(self):
        registry = self._get_registry()
        self.assertRaises(
            errors.CommandError, registry.resolve_alias, 'I am not a prefix')

    def test_predefined_prefixes(self):
        registry = tests.test_prefix_alias_registry
        # (alias, expected expansion) pairs, checked in this order.
        predefined = [
            ('breezy', 'breezy'),
            ('bd', 'breezy.doc'),
            ('bu', 'breezy.utils'),
            ('bt', 'breezy.tests'),
            ('bb', 'breezy.tests.blackbox'),
            ('bp', 'breezy.plugins'),
        ]
        for alias, expansion in predefined:
            self.assertEqual(expansion, registry.resolve_alias(alias))
 
3275
 
 
3276
 
 
3277
class TestThreadLeakDetection(tests.TestCase):
    """Ensure when tests leak threads we detect and report it"""

    class LeakRecordingResult(tests.ExtendedTestResult):
        """Result remembering every (test, leaked threads) pair reported."""

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            self.leaks = []

        def _report_thread_leak(self, test, leaks, alive):
            # ``alive`` is accepted but not recorded.
            self.leaks.append((test, leaks))

    @staticmethod
    def _drive(result, runnable):
        """Run ``runnable`` inside a complete test run on ``result``."""
        result.startTestRun()
        runnable.run(result)
        result.stopTestRun()

    def test_testcase_without_addCleanups(self):
        """Check old TestCase instances don't break with leak detection"""
        class Test(unittest.TestCase):
            def runTest(self):
                pass

        result = self.LeakRecordingResult()
        self._drive(result, Test())
        self.assertEqual(result._tests_leaking_threads_count, 0)
        self.assertEqual(result.leaks, [])

    def test_thread_leak(self):
        """Ensure a thread that outlives the running of a test is reported

        The inner test case starts a thread blocked on an event. Because the
        thread is still alive once the inner case has finished, it should be
        reported as a leak. Afterwards the event is set so cleanup can join
        the thread and nothing is leaked for real.
        """
        release = threading.Event()
        leaker = threading.Thread(name="Leaker", target=release.wait)

        class Test(tests.TestCase):
            def test_leak(self):
                leaker.start()

        result = self.LeakRecordingResult()
        inner = Test("test_leak")
        # Cleanups run last-in first-out: unblock the thread, then join it.
        self.addCleanup(leaker.join)
        self.addCleanup(release.set)
        self._drive(result, inner)
        self.assertEqual(result._tests_leaking_threads_count, 1)
        self.assertEqual(result._first_thread_leaker_id, inner.id())
        self.assertEqual(result.leaks, [(inner, {leaker})])
        self.assertContainsString(result.stream.getvalue(), "leaking threads")

    def test_multiple_leaks(self):
        """Check multiple leaks are blamed on the test cases at fault

        Like test_thread_leak, but the inner class has one method leaking a
        single thread, one leaking two threads and one leaking none.
        """
        release = threading.Event()
        thread_a = threading.Thread(name="LeakerA", target=release.wait)
        thread_b = threading.Thread(name="LeakerB", target=release.wait)
        thread_c = threading.Thread(name="LeakerC", target=release.wait)

        class Test(tests.TestCase):
            def test_first_leak(self):
                thread_b.start()

            def test_second_no_leak(self):
                pass

            def test_third_leak(self):
                thread_c.start()
                thread_a.start()

        result = self.LeakRecordingResult()
        first_test = Test("test_first_leak")
        third_test = Test("test_third_leak")
        # Registered before the event release so that, running in reverse
        # order, the threads are unblocked before they are joined.
        for leaker in (thread_a, thread_b, thread_c):
            self.addCleanup(leaker.join)
        self.addCleanup(release.set)
        suite = unittest.TestSuite(
            [first_test, Test("test_second_no_leak"), third_test])
        self._drive(result, suite)
        self.assertEqual(result._tests_leaking_threads_count, 2)
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
        expected_leaks = [
            (first_test, {thread_b}),
            (third_test, {thread_a, thread_c}),
        ]
        self.assertEqual(result.leaks, expected_leaks)
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3366
 
 
3367
 
 
3368
class TestPostMortemDebugging(tests.TestCase):
    """Check post mortem debugging works when tests fail or error"""

    class TracebackRecordingResult(tests.ExtendedTestResult):
        """Result that stores the code object where a traceback ended."""

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            self.postcode = None

        def _post_mortem(self, tb=None):
            """Record the code object at the end of the current traceback"""
            tb = tb or sys.exc_info()[2]
            if tb is None:
                return
            # Follow the chain down to the innermost traceback entry.
            while tb.tb_next is not None:
                tb = tb.tb_next
            self.postcode = tb.tb_frame.f_code

        def report_error(self, test, err):
            """Deliberately do nothing."""

        def report_failure(self, test, err):
            """Deliberately do nothing."""

    def test_location_unittest_error(self):
        """Needs right post mortem traceback with erroring unittest case"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise RuntimeError

        recorder = self.TracebackRecordingResult()
        Test().run(recorder)
        self.assertEqual(recorder.postcode, Test.runTest.__code__)

    def test_location_unittest_failure(self):
        """Needs right post mortem traceback with failing unittest case"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise self.failureException

        recorder = self.TracebackRecordingResult()
        Test().run(recorder)
        self.assertEqual(recorder.postcode, Test.runTest.__code__)

    def test_location_bt_error(self):
        """Needs right post mortem traceback with erroring breezy.tests case"""
        class Test(tests.TestCase):
            def test_error(self):
                raise RuntimeError

        recorder = self.TracebackRecordingResult()
        Test("test_error").run(recorder)
        self.assertEqual(recorder.postcode, Test.test_error.__code__)

    def test_location_bt_failure(self):
        """Needs right post mortem traceback with failing breezy.tests case"""
        class Test(tests.TestCase):
            def test_failure(self):
                raise self.failureException

        recorder = self.TracebackRecordingResult()
        Test("test_failure").run(recorder)
        self.assertEqual(recorder.postcode, Test.test_failure.__code__)

    def test_env_var_triggers_post_mortem(self):
        """Check pdb.post_mortem is called iff BRZ_TEST_PDB is set"""
        import pdb
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
        recorded = []
        self.overrideAttr(pdb, "post_mortem", recorded.append)
        # First with the variable unset, then with it set; only the marker
        # passed while it is set should reach the pdb.post_mortem stand-in.
        for setting, marker in ((None, 1), ('on', 2)):
            self.overrideEnv('BRZ_TEST_PDB', setting)
            result._post_mortem(marker)
        self.assertEqual([2], recorded)
 
3439
 
 
3440
 
 
3441
class TestRunSuite(tests.TestCase):
    """Check keyword arguments accepted by tests.run_suite."""

    def test_runner_class(self):
        """run_suite accepts and uses a runner_class keyword argument."""
        seen = []

        class Stub(tests.TestCase):
            def test_foo(self):
                pass

        class MyRunner(tests.TextTestRunner):
            """Runner that only notes what it was asked to run."""

            def run(self, test):
                seen.append(test)
                return tests.ExtendedTestResult(
                    self.stream, self.descriptions, self.verbosity)

        suite = Stub("test_foo")
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
        # The custom runner must have been invoked exactly once.
        self.assertLength(1, seen)
 
3458
 
 
3459
 
 
3460
class _Selftest(object):
    """Mixin for tests needing full selftest output"""

    def _inject_stream_into_subunit(self, stream):
        """To be overridden by subclasses that run tests out of process"""

    def _run_selftest(self, **kwargs):
        """Run tests.selftest and return the bytes written to its stream.

        :param kwargs: extra keyword arguments forwarded to tests.selftest.
        :return: the content of the underlying byte buffer after flushing.
        """
        raw = BytesIO()
        # selftest gets a text stream; the hook gets the byte buffer under it.
        text = TextIOWrapper(raw, encoding='utf-8')
        self._inject_stream_into_subunit(raw)
        tests.selftest(stream=text, stop_on_failure=False, **kwargs)
        # Push any text still buffered in the wrapper down to ``raw``.
        text.flush()
        return raw.getvalue()
 
3473
 
 
3474
 
 
3475
class _ForkedSelftest(_Selftest):
    """Mixin for tests needing full selftest output with forked children"""

    _test_needs_features = [features.subunit]

    def _inject_stream_into_subunit(self, stream):
        """Monkey-patch subunit so the extra output goes to stream not stdout

        Some APIs need rewriting so this kind of bogus hackery can be replaced
        by passing the stream param from run_tests down into ProtocolTestCase.

        :param stream: object stored as ``_passthrough`` on every
            ProtocolTestCase created while the override is in place.
        """
        from subunit import ProtocolTestCase
        _original_init = ProtocolTestCase.__init__

        def _init_with_passthrough(case, *args, **kwargs):
            # Run the real constructor, then force the passthrough target.
            _original_init(case, *args, **kwargs)
            case._passthrough = stream
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)

    def _run_selftest(self, **kwargs):
        """Run selftest with the fork decorator appended to the decorators.

        :param kwargs: keyword arguments forwarded to the parent
            ``_run_selftest``. A ``suite_decorators`` entry, when given, is
            copied before the fork decorator is added, so the caller's list
            is never modified; ``None`` is treated like an empty list.
        :raises tests.TestNotApplicable: when ``os.fork`` is unavailable.
        :return: whatever the parent ``_run_selftest`` returns.
        """
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
        if getattr(os, "fork", None) is None:
            raise tests.TestNotApplicable("Platform doesn't support forking")
        # Make sure the fork code is actually invoked by claiming two cores
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
        # Build a private list rather than appending to the caller's object.
        decorators = list(kwargs.get("suite_decorators") or ())
        decorators.append(tests.fork_decorator)
        kwargs["suite_decorators"] = decorators
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
 
3502
 
 
3503
 
 
3504
class TestParallelFork(_ForkedSelftest, tests.TestCase):
    """Check operation of --parallel=fork selftest option"""

    def test_error_in_child_during_fork(self):
        """Error in a forked child during test setup should get reported"""
        class Test(tests.TestCase):
            def testMethod(self):
                pass

        # Lines from the tracebacks of the two child processes may be mixed
        # together due to the way subunit parses and forwards the streams,
        # so any number of extra lines is allowed between the parts of the
        # error output that must appear.
        any_lines = b"(?:.*\n)*"
        expected_re = any_lines.join([
            b"Traceback.*:\n",
            b".+ in fork_for_tests\n",
            b"\\s*workaround_zealous_crypto_random\\(\\)\n",
            b"TypeError:",
        ])
        # We don't care what, just break something that a child will run
        self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
        out = self._run_selftest(test_suite_factory=Test)
        self.assertContainsRe(out, expected_re)
 
3526
 
 
3527
 
 
3528
class TestUncollectedWarnings(_Selftest, tests.TestCase):
    """Check a test case still alive after being run emits a warning"""

    class Test(tests.TestCase):
        """Sample cases: one passing, one self-referencing, one skipped."""

        def test_pass(self):
            pass

        def test_self_ref(self):
            # Store a bound method of this instance on the instance itself.
            self.also_self = self.test_self_ref

        def test_skip(self):
            self.skipTest("Don't need")

    def _get_suite(self):
        """Return a suite holding the three sample cases, in a fixed order."""
        names = ("test_pass", "test_self_ref", "test_skip")
        return TestUtil.TestSuite([self.Test(name) for name in names])

    def _run_selftest_with_suite(self, **kwargs):
        """Run selftest over the sample suite and check the warnings emitted.

        The "uncollected_cases" debug flag is added and the garbage collector
        is turned off for the duration of the run; both are put back
        afterwards, even when the run raises.

        :param kwargs: extra keyword arguments forwarded to _run_selftest.
        :return: the selftest output, as bytes.
        """
        saved_flags = tests.selftest_debug_flags
        tests.selftest_debug_flags = saved_flags.union(["uncollected_cases"])
        gc_was_enabled = gc.isenabled()
        # NOTE(review): presumably gc is disabled so that the reference made
        # by test_self_ref is not collected during the run -- confirm.
        if gc_was_enabled:
            gc.disable()
        try:
            output = self._run_selftest(
                test_suite_factory=self._get_suite, **kwargs)
        finally:
            if gc_was_enabled:
                gc.enable()
            tests.selftest_debug_flags = saved_flags
        self.assertNotContainsRe(output, b"Uncollected test case.*test_pass")
        self.assertContainsRe(output, b"Uncollected test case.*test_self_ref")
        return output

    def test_testsuite(self):
        self._run_selftest_with_suite()

    def test_pattern(self):
        output = self._run_selftest_with_suite(
            pattern="test_(?:pass|self_ref)$")
        self.assertNotContainsRe(output, b"test_skip")

    def test_exclude_pattern(self):
        output = self._run_selftest_with_suite(exclude_pattern="test_skip$")
        self.assertNotContainsRe(output, b"test_skip")

    def test_random_seed(self):
        self._run_selftest_with_suite(random_seed="now")

    def test_matching_tests_first(self):
        self._run_selftest_with_suite(
            matching_tests_first=True, pattern="test_self_ref$")

    def test_starting_with_and_exclude(self):
        output = self._run_selftest_with_suite(
            starting_with=["bt."], exclude_pattern="test_skip$")
        self.assertNotContainsRe(output, b"test_skip")

    def test_additonal_decorator(self):
        self._run_selftest_with_suite(suite_decorators=[tests.TestDecorator])
 
3590
 
 
3591
 
 
3592
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted with subunit"""

    _test_needs_features = [features.subunit]

    def _run_selftest_with_suite(self, **kwargs):
        """Run the parent checks with SubUnitBzrRunnerv1 as runner class."""
        run_parent = TestUncollectedWarnings._run_selftest_with_suite
        return run_parent(
            self, runner_class=tests.SubUnitBzrRunnerv1, **kwargs)
 
3600
 
 
3601
 
 
3602
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted when forking

    Adds no tests of its own: it combines the forking selftest mixin with
    the test methods inherited from TestUncollectedWarnings.
    """
 
3604
 
 
3605
 
 
3606
class TestEnvironHandling(tests.TestCase):
    """Check overrideEnv puts the outer value back after nested overrides."""

    def test_overrideEnv_None_called_twice_doesnt_leak(self):
        self.assertFalse('MYVAR' in os.environ)
        self.overrideEnv('MYVAR', '42')

        # We use an embedded test to make sure we fix the _captureVar bug
        class Test(tests.TestCase):
            def test_me(self):
                # The first pass saves the 42 value; the second pass makes
                # sure overrideEnv can be called again for the same variable.
                for _ in range(2):
                    self.overrideEnv('MYVAR', None)
                    self.assertEqual(None, os.environ.get('MYVAR'))

        stream = StringIO()
        inner_result = tests.TextTestResult(stream, 0, 1)
        Test('test_me').run(inner_result)
        if not inner_result.wasStrictlySuccessful():
            self.fail(stream.getvalue())
        # We get our value back
        self.assertEqual('42', os.environ.get('MYVAR'))
 
3628
 
 
3629
 
 
3630
class TestIsolatedEnv(tests.TestCase):
    """Test isolating tests from os.environ.

    These tests are themselves already isolated from os.environ, so they have
    to be designed with some care to avoid bootstrap side-effects. Each test
    starts from an already clean os.environ, which makes it valid to assert
    which variables are present or absent and to build the tests on those
    assertions.
    """

    class ScratchMonkey(tests.TestCase):
        """Do-nothing test case handed to the override/restore helpers."""

        def test_me(self):
            pass

    def test_basics(self):
        isolated = tests.isolated_environ
        # Make sure we know the definition of BRZ_HOME: not part of os.environ
        # for tests.TestCase.
        self.assertTrue('BRZ_HOME' in isolated)
        self.assertEqual(None, isolated['BRZ_HOME'])
        # Being part of isolated_environ, BRZ_HOME should not appear here
        self.assertFalse('BRZ_HOME' in os.environ)
        # Make sure we know the definition of LINES: part of os.environ for
        # tests.TestCase
        self.assertTrue('LINES' in isolated)
        self.assertEqual('25', isolated['LINES'])
        self.assertEqual('25', os.environ['LINES'])

    def test_injecting_unknown_variable(self):
        # BRZ_HOME is known to be absent from os.environ
        monkey = self.ScratchMonkey('test_me')
        tests.override_os_environ(monkey, {'BRZ_HOME': 'foo'})
        self.assertEqual('foo', os.environ['BRZ_HOME'])
        tests.restore_os_environ(monkey)
        self.assertFalse('BRZ_HOME' in os.environ)

    def test_injecting_known_variable(self):
        # LINES is known to be present in os.environ
        monkey = self.ScratchMonkey('test_me')
        tests.override_os_environ(monkey, {'LINES': '42'})
        self.assertEqual('42', os.environ['LINES'])
        tests.restore_os_environ(monkey)
        self.assertEqual('25', os.environ['LINES'])

    def test_deleting_variable(self):
        # LINES is known to be present in os.environ
        monkey = self.ScratchMonkey('test_me')
        tests.override_os_environ(monkey, {'LINES': None})
        self.assertTrue('LINES' not in os.environ)
        tests.restore_os_environ(monkey)
        self.assertEqual('25', os.environ['LINES'])
 
3681
 
 
3682
 
 
3683
class TestDocTestSuiteIsolation(tests.TestCase):
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.

    tests.TestCase already isolates us from os.environ, so that clean
    environment is the baseline here. doctest.DocTestSuite is used as the
    point of comparison to capture precisely what isolation
    tests.DocTestSuite adds.

    `tests.DocTestSuite` must respect `tests.isolated_environ` rather than
    `os.environ`, so each test overrides the former to suit its needs.
    """

    def get_doctest_suite_for_string(self, klass, string):
        """Build a ``klass`` suite whose only doctest is parsed from string."""
        class Finder(doctest.DocTestFinder):
            """Finder that ignores its arguments and returns one doctest."""

            def find(*args, **kwargs):
                parser = doctest.DocTestParser()
                return [parser.get_doctest(string, {}, 'foo', 'foo.py', 0)]

        return klass(test_finder=Finder())

    def run_doctest_suite_for_string(self, klass, string):
        """Run the suite for ``string``; return (result, output stream)."""
        stream = StringIO()
        result = tests.TextTestResult(stream, 0, 1)
        self.get_doctest_suite_for_string(klass, string).run(result)
        return result, stream

    def assertDocTestStringSucceds(self, klass, string):
        """Fail, showing the run output, unless the doctest run succeeds."""
        outcome, stream = self.run_doctest_suite_for_string(klass, string)
        if not outcome.wasStrictlySuccessful():
            self.fail(stream.getvalue())

    def assertDocTestStringFails(self, klass, string):
        """Fail, showing the run output, if the doctest run succeeds."""
        outcome, stream = self.run_doctest_suite_for_string(klass, string)
        if outcome.wasStrictlySuccessful():
            self.fail(stream.getvalue())

    def test_injected_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
        snippet = """
            >>> import os
            >>> os.environ['LINES']
            '42'
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, snippet)
        # tests.DocTestSuite sees '42'
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, snippet)

    def test_deleted_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
        snippet = """
            >>> import os
            >>> os.environ.get('LINES')
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, snippet)
        # tests.DocTestSuite sees None
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, snippet)
 
3746
 
 
3747
 
 
3748
class TestSelftestExcludePatterns(tests.TestCase):
    """Check the -x option of ``selftest --list`` against a tiny suite."""

    def setUp(self):
        """Replace tests.test_suite with this class's small suite factory."""
        super(TestSelftestExcludePatterns, self).setUp()
        self.overrideAttr(tests, 'test_suite', self.suite_factory)

    def suite_factory(self, keep_only=None, starting_with=None):
        """A test suite factory with only a few tests."""
        class Test(tests.TestCase):
            def id(self):
                # We don't need the full class path
                return self._testMethodName

            def a(self):
                pass

            def b(self):
                pass

            def c(self):
                pass

        return TestUtil.TestSuite([Test(name) for name in ("a", "b", "c")])

    def assertTestList(self, expected, *selftest_args):
        """Assert ``selftest --list`` plus the given args lists ``expected``."""
        # We rely on setUp installing the right test suite factory so we can
        # test at the command level without loading the whole test suite
        command = ('selftest', '--list') + selftest_args
        out, err = self.run_bzr(command)
        self.assertEqual(expected, out.splitlines())

    def test_full_list(self):
        self.assertTestList(['a', 'b', 'c'])

    def test_single_exclude(self):
        self.assertTestList(['b', 'c'], '-x', 'a')

    def test_mutiple_excludes(self):
        self.assertTestList(['c'], '-x', 'a', '-x', 'b')
 
3786
 
 
3787
 
 
3788
class TestCounterHooks(tests.TestCase, SelfTestHelper):
 
3789
 
 
3790
    _test_needs_features = [features.subunit]
 
3791
 
 
3792
    def setUp(self):
 
3793
        super(TestCounterHooks, self).setUp()
 
3794
 
 
3795
        class Test(tests.TestCase):
 
3796
 
 
3797
            def setUp(self):
 
3798
                super(Test, self).setUp()
 
3799
                self.hooks = hooks.Hooks()
 
3800
                self.hooks.add_hook('myhook', 'Foo bar blah', (2, 4))
 
3801
                self.install_counter_hook(self.hooks, 'myhook')
 
3802
 
 
3803
            def no_hook(self):
 
3804
                pass
 
3805
 
 
3806
            def run_hook_once(self):
 
3807
                for hook in self.hooks['myhook']:
 
3808
                    hook(self)
 
3809
 
 
3810
        self.test_class = Test
 
3811
 
 
3812
    def assertHookCalls(self, expected_calls, test_name):
 
3813
        test = self.test_class(test_name)
 
3814
        result = unittest.TestResult()
 
3815
        test.run(result)
 
3816
        self.assertTrue(hasattr(test, '_counters'))
 
3817
        self.assertTrue('myhook' in test._counters)
 
3818
        self.assertEqual(expected_calls, test._counters['myhook'])
 
3819
 
 
3820
    def test_no_hook(self):
 
3821
        self.assertHookCalls(0, 'no_hook')
 
3822
 
 
3823
    def test_run_hook_once(self):
 
3824
        tt = features.testtools
 
3825
        if tt.module.__version__ < (0, 9, 8):
 
3826
            raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
 
3827
        self.assertHookCalls(1, 'run_hook_once')