/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_selftest.py

  • Committer: Martin
  • Date: 2017-06-18 10:15:11 UTC
  • mto: This revision was merged to the branch mainline in revision 6715.
  • Revision ID: gzlist@googlemail.com-20170618101511-fri1mouxt1hc09r8
Make _simple_set tests pass on py3 and with random hash

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2013, 2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the test framework."""
 
18
 
 
19
import gc
 
20
import doctest
 
21
import os
 
22
import signal
 
23
import sys
 
24
import threading
 
25
import time
 
26
import unittest
 
27
import warnings
 
28
 
 
29
from testtools import (
 
30
    ExtendedToOriginalDecorator,
 
31
    MultiTestResult,
 
32
    )
 
33
from testtools.content import Content
 
34
from testtools.content_type import ContentType
 
35
from testtools.matchers import (
 
36
    DocTestMatches,
 
37
    Equals,
 
38
    )
 
39
import testtools.testresult.doubles
 
40
 
 
41
import breezy
 
42
from .. import (
 
43
    branchbuilder,
 
44
    controldir,
 
45
    errors,
 
46
    hooks,
 
47
    lockdir,
 
48
    memorytree,
 
49
    osutils,
 
50
    repository,
 
51
    symbol_versioning,
 
52
    tests,
 
53
    transport,
 
54
    workingtree,
 
55
    )
 
56
from ..bzr import (
 
57
    bzrdir,
 
58
    remote,
 
59
    workingtree_3,
 
60
    workingtree_4,
 
61
    )
 
62
from ..bzr import (
 
63
    groupcompress_repo,
 
64
    )
 
65
from ..sixish import (
 
66
    StringIO,
 
67
    text_type,
 
68
    )
 
69
from ..symbol_versioning import (
 
70
    deprecated_function,
 
71
    deprecated_in,
 
72
    deprecated_method,
 
73
    )
 
74
from . import (
 
75
    features,
 
76
    test_lsprof,
 
77
    test_server,
 
78
    TestUtil,
 
79
    )
 
80
from ..trace import note, mutter
 
81
from ..transport import memory
 
82
 
 
83
 
 
84
def _test_ids(test_suite):
    """Return the id() of each test found by iterating over test_suite."""
    collected_ids = []
    for test in tests.iter_suite_tests(test_suite):
        collected_ids.append(test.id())
    return collected_ids
 
87
 
 
88
 
 
89
class MetaTestLog(tests.TestCase):
    """Check that text passed to self.log() shows up in the test details."""

    def test_logging(self):
        """Test logs are captured when a test fails."""
        self.log('a test message')
        log_detail = self.getDetails()['log']
        # The 'log' detail must be advertised as utf8 plain text.
        expected_type = ContentType("text", "plain", {"charset": "utf8"})
        self.assertThat(log_detail.content_type, Equals(expected_type))
        # The detail content and get_log() must agree with each other.
        detail_text = u"".join(log_detail.iter_text())
        self.assertThat(detail_text, Equals(self.get_log()))
        # The logged message must be the last thing in the log.
        message_matcher = DocTestMatches(
            u"...a test message\n", doctest.ELLIPSIS)
        self.assertThat(self.get_log(), message_matcher)
 
101
 
 
102
 
 
103
class TestTreeShape(tests.TestCaseInTempDir):
    """Check build_tree_contents with a non-ASCII file name."""

    def test_unicode_paths(self):
        # Needs the UnicodeFilenameFeature to be available.
        self.requireFeature(features.UnicodeFilenameFeature)

        unicode_name = u'hell\u00d8'
        tree_shape = [(unicode_name, 'contents of hello')]
        self.build_tree_contents(tree_shape)
        self.assertPathExists(unicode_name)
 
111
 
 
112
 
 
113
class TestClassesAvailable(tests.TestCase):
    """As a convenience we expose Test* classes from breezy.tests"""

    # Each test passes if the import succeeds; the imported name itself is
    # deliberately unused.

    def test_test_case(self):
        from . import TestCase as _exported_test_case

    def test_test_loader(self):
        from . import TestLoader as _exported_test_loader

    def test_test_suite(self):
        from . import TestSuite as _exported_test_suite
 
124
 
 
125
 
 
126
class TestTransportScenarios(tests.TestCase):
    """A group of tests that test the transport implementation adaption core.

    This is a meta test that the tests are applied to all available
    transports.

    This will be generalised in the future which is why it is in this
    test file even though it is specific to transport tests at the moment.
    """

    def test_get_transport_permutations(self):
        # this checks that get_test_permutations defined by the module is
        # called by the get_transport_test_permutations function.
        class MockModule(object):
            def get_test_permutations(self):
                # Resolved from the enclosing scope when called, so it is
                # fine that the name is bound after this class is defined.
                return sample_permutation
        sample_permutation = [(1, 2), (3, 4)]
        from .per_transport import get_transport_test_permutations
        self.assertEqual(sample_permutation,
                         get_transport_test_permutations(MockModule()))

    def test_scenarios_include_all_modules(self):
        # this checks that the scenario generator returns as many permutations
        # as there are in all the registered transport modules - we assume if
        # this matches its probably doing the right thing especially in
        # combination with the tests for setting the right classes below.

        # reduce() is only a builtin on Python 2; functools provides it on
        # both Python 2 and Python 3.
        from functools import reduce
        from .per_transport import transport_test_permutations
        from ..transport import _get_transport_modules
        modules = _get_transport_modules()
        permutation_count = 0
        for module in modules:
            # __import__ of a dotted name returns the top-level package, so
            # walk the remaining components with getattr to reach the
            # module's get_test_permutations function.
            attribute_path = (module + ".get_test_permutations").split('.')[1:]
            try:
                get_permutations = reduce(
                    getattr, attribute_path, __import__(module))
                permutation_count += len(get_permutations())
            except errors.DependencyNotPresent:
                # Deliberate best-effort: a transport whose dependency is
                # missing is not counted (presumably the scenario generator
                # skips it as well -- the final assertion relies on that).
                pass
        scenarios = transport_test_permutations()
        self.assertEqual(permutation_count, len(scenarios))

    def test_scenarios_include_transport_class(self):
        # This test used to know about all the possible transports and the
        # order they were returned but that seems overly brittle (mbp
        # 20060307)
        from .per_transport import transport_test_permutations
        scenarios = transport_test_permutations()
        # there are at least that many builtin transports
        self.assertTrue(len(scenarios) > 6)
        # Each scenario is a (name, parameters) pair; spot-check the first.
        one_scenario = scenarios[0]
        self.assertIsInstance(one_scenario[0], str)
        self.assertTrue(issubclass(one_scenario[1]["transport_class"],
                                   breezy.transport.Transport))
        self.assertTrue(issubclass(one_scenario[1]["transport_server"],
                                   breezy.transport.Server))
 
180
 
 
181
 
 
182
class TestBranchScenarios(tests.TestCase):
    """Check the scenarios produced by per_branch.make_scenarios."""

    def test_scenarios(self):
        # check that constructor parameters are passed through to the adapted
        # test.
        from .per_branch import make_scenarios
        transport_server = "a"
        readonly_server = "b"
        # (branch format, bzrdir format) pairs; plain strings stand in for
        # real format objects.
        format_pairs = [("c", "C"), ("d", "D")]
        scenarios = make_scenarios(
            transport_server, readonly_server, format_pairs)
        self.assertEqual(2, len(scenarios))
        expected = [
            ('str', {
                'branch_format': 'c',
                'bzrdir_format': 'C',
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                }),
            ('str', {
                'branch_format': 'd',
                'bzrdir_format': 'D',
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                }),
            ]
        self.assertEqual(expected, scenarios)
 
205
 
 
206
 
 
207
class TestBzrDirScenarios(tests.TestCase):
    """Check the scenarios produced by per_controldir.make_scenarios."""

    def test_scenarios(self):
        # check that constructor parameters are passed through to the adapted
        # test.
        from .per_controldir import make_scenarios
        factory = "v"
        transport_server = "a"
        readonly_server = "b"
        # Plain strings stand in for real format objects.
        dir_formats = ["c", "d"]
        scenarios = make_scenarios(
            factory, transport_server, readonly_server, dir_formats)
        expected = [
            ('str', {
                'bzrdir_format': 'c',
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                'vfs_transport_factory': 'v',
                }),
            ('str', {
                'bzrdir_format': 'd',
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                'vfs_transport_factory': 'v',
                }),
            ]
        self.assertEqual(expected, scenarios)
 
230
 
 
231
 
 
232
class TestRepositoryScenarios(tests.TestCase):
    """Check the scenarios produced by per_repository.formats_to_scenarios."""

    def test_formats_to_scenarios(self):
        from .per_repository import formats_to_scenarios
        remote_format = remote.RemoteRepositoryFormat()
        format_2a = repository.format_registry.get(
            'Bazaar repository format 2a (needs bzr 1.16 or later)\n')
        formats = [("(c)", remote_format), ("(d)", format_2a)]
        no_vfs_scenarios = formats_to_scenarios(
            formats, "server", "readonly", None)
        vfs_scenarios = formats_to_scenarios(
            formats, "server", "readonly", vfs_transport_factory="vfs")
        # no_vfs generate scenarios without vfs_transport_factory
        expected_no_vfs = [
            ('RemoteRepositoryFormat(c)', {
                'bzrdir_format': remote.RemoteBzrDirFormat(),
                'repository_format': remote.RemoteRepositoryFormat(),
                'transport_readonly_server': 'readonly',
                'transport_server': 'server',
                }),
            ('RepositoryFormat2a(d)', {
                'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
                'repository_format': groupcompress_repo.RepositoryFormat2a(),
                'transport_readonly_server': 'readonly',
                'transport_server': 'server',
                }),
            ]
        self.assertEqual(expected_no_vfs, no_vfs_scenarios)
        # With a factory given, every scenario also carries it.
        expected_vfs = [
            ('RemoteRepositoryFormat(c)', {
                'bzrdir_format': remote.RemoteBzrDirFormat(),
                'repository_format': remote.RemoteRepositoryFormat(),
                'transport_readonly_server': 'readonly',
                'transport_server': 'server',
                'vfs_transport_factory': 'vfs',
                }),
            ('RepositoryFormat2a(d)', {
                'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
                'repository_format': groupcompress_repo.RepositoryFormat2a(),
                'transport_readonly_server': 'readonly',
                'transport_server': 'server',
                'vfs_transport_factory': 'vfs',
                }),
            ]
        self.assertEqual(expected_vfs, vfs_scenarios)
 
270
 
 
271
 
 
272
class TestTestScenarioApplication(tests.TestCase):
    """Tests for the test adaption facilities."""

    def test_apply_scenario(self):
        from breezy.tests import apply_scenario
        input_test = TestTestScenarioApplication("test_apply_scenario")
        # setup two adapted tests
        full_parameters = {
            "bzrdir_format": "bzr_format",
            "repository_format": "repo_fmt",
            "transport_server": "transport_server",
            "transport_readonly_server": "readonly-server",
            }
        adapted_test1 = apply_scenario(input_test, ("new id", full_parameters))
        adapted_test2 = apply_scenario(
            input_test, ("new id 2", {"bzrdir_format": None}))
        # input_test should NOT have been altered: it must still lack the
        # attributes that the scenarios set on the adapted tests.
        self.assertRaises(AttributeError, getattr, input_test, "bzrdir_format")
        # the new tests are mutually incompatible, ensuring it has
        # made new ones, and unspecified elements in the scenario
        # should not have been altered.
        self.assertEqual("bzr_format", adapted_test1.bzrdir_format)
        self.assertEqual("repo_fmt", adapted_test1.repository_format)
        self.assertEqual("transport_server", adapted_test1.transport_server)
        self.assertEqual(
            "readonly-server", adapted_test1.transport_readonly_server)
        # The scenario name is appended, in parentheses, to the test id.
        self.assertEqual(
            "breezy.tests.test_selftest.TestTestScenarioApplication."
            "test_apply_scenario(new id)",
            adapted_test1.id())
        self.assertEqual(None, adapted_test2.bzrdir_format)
        self.assertEqual(
            "breezy.tests.test_selftest.TestTestScenarioApplication."
            "test_apply_scenario(new id 2)",
            adapted_test2.id())
 
306
 
 
307
 
 
308
class TestInterRepositoryScenarios(tests.TestCase):
    """Check the scenarios produced by per_interrepository.make_scenarios."""

    def test_scenarios(self):
        # check that constructor parameters are passed through to the adapted
        # test.
        from .per_interrepository import make_scenarios
        transport_server = "a"
        readonly_server = "b"
        # Each entry is (label, source format, target format, extra setup);
        # plain strings stand in for the real objects.
        format_tuples = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
        scenarios = make_scenarios(
            transport_server, readonly_server, format_tuples)
        expected = [
            ('C0,str,str', {
                'repository_format': 'C1',
                'repository_format_to': 'C2',
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                'extra_setup': 'C3',
                }),
            ('D0,str,str', {
                'repository_format': 'D1',
                'repository_format_to': 'D2',
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                'extra_setup': 'D3',
                }),
            ]
        self.assertEqual(expected, scenarios)
 
332
 
 
333
 
 
334
class TestWorkingTreeScenarios(tests.TestCase):
    """Check the scenarios produced by per_workingtree.make_scenarios."""

    def test_scenarios(self):
        # check that constructor parameters are passed through to the adapted
        # test.
        from .per_workingtree import make_scenarios
        wt4 = workingtree_4.WorkingTreeFormat4()
        wt3 = workingtree_3.WorkingTreeFormat3()
        wt6 = workingtree_4.WorkingTreeFormat6()
        scenarios = make_scenarios(
            "a", "b", [wt4, wt3, wt6],
            remote_server='c', remote_readonly_server='d',
            remote_backing_server='e')

        def local_scenario(name, wt_format):
            # Scenario expected for a format using the plain servers.
            return (name, {
                'bzrdir_format': wt_format._matchingbzrdir,
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                'workingtree_format': wt_format,
                })

        expected = [
            local_scenario('WorkingTreeFormat4', wt4),
            local_scenario('WorkingTreeFormat3', wt3),
            local_scenario('WorkingTreeFormat6', wt6),
            # The remote scenario uses the remote_* servers instead.
            ('WorkingTreeFormat6,remote', {
                'bzrdir_format': wt6._matchingbzrdir,
                'repo_is_remote': True,
                'transport_readonly_server': 'd',
                'transport_server': 'c',
                'vfs_transport_factory': 'e',
                'workingtree_format': wt6,
                }),
            ]
        self.assertEqual(expected, scenarios)
 
372
 
 
373
 
 
374
class TestTreeScenarios(tests.TestCase):
    """Check the scenarios produced by per_tree.make_scenarios."""

    def test_scenarios(self):
        # the tree implementation scenario generator is meant to setup one
        # instance for each working tree format, one additional instance
        # that will use the default wt format, but create a revision tree for
        # the tests, and one more that uses the default wt format as a
        # lightweight checkout of a remote repository.  This means that the wt
        # ones should have the workingtree_to_test_tree attribute set to
        # 'return_parameter' and the revision one set to
        # revision_tree_from_workingtree.

        from .per_tree import (
            _dirstate_tree_from_workingtree,
            make_scenarios,
            preview_tree_pre,
            preview_tree_post,
            return_parameter,
            revision_tree_from_workingtree
            )
        smart_server = test_server.SmartTCPServer_for_testing
        smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
        mem_server = memory.MemoryServer
        formats = [workingtree_4.WorkingTreeFormat4(),
                   workingtree_3.WorkingTreeFormat3()]
        scenarios = make_scenarios("a", "b", formats)
        self.assertEqual(8, len(scenarios))
        default_wt_format = workingtree.format_registry.get_default()
        wt4_format = workingtree_4.WorkingTreeFormat4()
        wt5_format = workingtree_4.WorkingTreeFormat5()
        wt6_format = workingtree_4.WorkingTreeFormat6()

        def local_scenario(name, wt_format, to_test_tree):
            # Scenario expected when the plain 'a'/'b' servers are used.
            return (name, {
                '_workingtree_to_test_tree': to_test_tree,
                'bzrdir_format': wt_format._matchingbzrdir,
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                'workingtree_format': wt_format,
                })

        expected_scenarios = [
            local_scenario('WorkingTreeFormat4', formats[0], return_parameter),
            local_scenario('WorkingTreeFormat3', formats[1], return_parameter),
            # The remote scenario uses the smart servers with a memory
            # server as its vfs transport factory.
            ('WorkingTreeFormat6,remote', {
                'bzrdir_format': wt6_format._matchingbzrdir,
                'repo_is_remote': True,
                'transport_readonly_server': smart_readonly_server,
                'transport_server': smart_server,
                'vfs_transport_factory': mem_server,
                'workingtree_format': wt6_format,
                '_workingtree_to_test_tree': return_parameter,
                }),
            local_scenario('RevisionTree', default_wt_format,
                           revision_tree_from_workingtree),
            local_scenario('DirStateRevisionTree,WT4', wt4_format,
                           _dirstate_tree_from_workingtree),
            local_scenario('DirStateRevisionTree,WT5', wt5_format,
                           _dirstate_tree_from_workingtree),
            local_scenario('PreviewTree', default_wt_format,
                           preview_tree_pre),
            local_scenario('PreviewTreePost', default_wt_format,
                           preview_tree_post),
            ]
        self.assertEqual(expected_scenarios, scenarios)
 
466
 
 
467
 
 
468
class TestInterTreeScenarios(tests.TestCase):
    """A group of tests that test the InterTreeTestAdapter."""

    def test_scenarios(self):
        # check that constructor parameters are passed through to the adapted
        # test.
        # for InterTree tests we want the machinery to bring up two trees in
        # each instance: the base one, and the one we are interacting with.
        # because each optimiser can be direction specific, we need to test
        # each optimiser in its chosen direction.
        # unlike the TestProviderAdapter we dont want to automatically add a
        # parameterized one for WorkingTree - the optimisers will tell us what
        # ones to add.
        from .per_tree import (
            return_parameter,
            )
        from .per_intertree import (
            make_scenarios,
            )
        from ..bzr.workingtree_3 import WorkingTreeFormat3
        from ..bzr.workingtree_4 import WorkingTreeFormat4
        server1 = "a"
        server2 = "b"
        format1 = WorkingTreeFormat4()
        format2 = WorkingTreeFormat3()
        # Each entry is (name, intertree class, source format, target format,
        # converter); str/int and the converter strings are only stand-ins.
        formats = [("1", str, format1, format2, "converter1"),
            ("2", int, format2, format1, "converter2")]
        scenarios = make_scenarios(server1, server2, formats)
        self.assertEqual(2, len(scenarios))
        expected_scenarios = [
            ("1", {
                "bzrdir_format": format1._matchingbzrdir,
                "intertree_class": formats[0][1],
                "workingtree_format": formats[0][2],
                "workingtree_format_to": formats[0][3],
                "mutable_trees_to_test_trees": formats[0][4],
                "_workingtree_to_test_tree": return_parameter,
                "transport_server": server1,
                "transport_readonly_server": server2,
                }),
            ("2", {
                "bzrdir_format": format2._matchingbzrdir,
                "intertree_class": formats[1][1],
                "workingtree_format": formats[1][2],
                "workingtree_format_to": formats[1][3],
                "mutable_trees_to_test_trees": formats[1][4],
                "_workingtree_to_test_tree": return_parameter,
                "transport_server": server1,
                "transport_readonly_server": server2,
                }),
            ]
        # Expected value first, as in the other scenario tests in this file,
        # so a failure labels the two values the right way round.
        self.assertEqual(expected_scenarios, scenarios)
 
522
 
 
523
 
 
524
class TestTestCaseInTempDir(tests.TestCaseInTempDir):
    """Tests for the directories and assertions of TestCaseInTempDir."""

    def test_home_is_not_working(self):
        # The home directory and the working directory must be distinct.
        self.assertNotEqual(self.test_dir, self.test_home_dir)
        current_dir = osutils.getcwd()
        self.assertIsSameRealPath(self.test_dir, current_dir)
        self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME'])

    def test_assertEqualStat_equal(self):
        from .test_dirstate import _FakeStat
        self.build_tree(["foo"])
        real_stat = os.lstat("foo")
        # A fake stat built from the real stat's own fields must compare
        # equal to it.
        fake_stat = _FakeStat(
            real_stat.st_size, real_stat.st_mtime, real_stat.st_ctime,
            real_stat.st_dev, real_stat.st_ino, real_stat.st_mode)
        self.assertEqualStat(real_stat, fake_stat)

    def test_assertEqualStat_notequal(self):
        self.build_tree(["foo", "longname"])
        foo_stat = os.lstat("foo")
        longname_stat = os.lstat("longname")
        self.assertRaises(
            AssertionError, self.assertEqualStat, foo_stat, longname_stat)

    def test_assertPathExists(self):
        self.assertPathExists('.')
        self.build_tree(['foo/', 'foo/bar'])
        self.assertPathExists('foo/bar')
        self.assertPathDoesNotExist('foo/foo')
 
550
 
 
551
 
 
552
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
    """Tests for the environment and helpers of TestCaseWithMemoryTransport."""

    def test_home_is_non_existant_dir_under_root(self):
        """The test_home_dir for TestCaseWithMemoryTransport is missing.

        This is because TestCaseWithMemoryTransport is for tests that do not
        need any disk resources: they should be hooked into breezy in such a
        way that no global settings are being changed by the test (only a
        few tests should need to do that), and having a missing dir as home is
        an effective way to ensure that this is the case.
        """
        expected_home = self.TEST_ROOT + "/MemoryTransportMissingHomeDir"
        self.assertIsSameRealPath(expected_home, self.test_home_dir)
        self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME'])

    def test_cwd_is_TEST_ROOT(self):
        self.assertIsSameRealPath(self.test_dir, self.TEST_ROOT)
        current_dir = osutils.getcwd()
        self.assertIsSameRealPath(self.test_dir, current_dir)

    def test_BRZ_HOME_and_HOME_are_bytestrings(self):
        """The $BRZ_HOME and $HOME environment variables should not be unicode.

        See https://bugs.launchpad.net/bzr/+bug/464174
        """
        for variable in ('BRZ_HOME', 'HOME'):
            self.assertIsInstance(os.environ[variable], str)

    def test_make_branch_and_memory_tree(self):
        """In TestCaseWithMemoryTransport we should not make the branch on disk.

        This is hard to comprehensively robustly test, so we settle for making
        a branch and checking no directory was created at its relpath.
        """
        memory_tree = self.make_branch_and_memory_tree('dir')
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.assertFalse(osutils.lexists('dir'))
        self.assertIsInstance(memory_tree, memorytree.MemoryTree)

    def test_make_branch_and_memory_tree_with_format(self):
        """make_branch_and_memory_tree should accept a format option."""
        dir_format = bzrdir.BzrDirMetaFormat1()
        dir_format.repository_format = repository.format_registry.get_default()
        memory_tree = self.make_branch_and_memory_tree('dir', format=dir_format)
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.assertFalse(osutils.lexists('dir'))
        self.assertIsInstance(memory_tree, memorytree.MemoryTree)
        self.assertEqual(
            dir_format.repository_format.__class__,
            memory_tree.branch.repository._format.__class__)

    def test_make_branch_builder(self):
        branch_builder = self.make_branch_builder('dir')
        self.assertIsInstance(branch_builder, branchbuilder.BranchBuilder)
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.assertFalse(osutils.lexists('dir'))

    def test_make_branch_builder_with_format(self):
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
        # that the format objects are used.
        dir_format = bzrdir.BzrDirMetaFormat1()
        repo_format = repository.format_registry.get_default()
        dir_format.repository_format = repo_format
        branch_builder = self.make_branch_builder('dir', format=dir_format)
        the_branch = branch_builder.get_branch()
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.assertFalse(osutils.lexists('dir'))
        self.assertEqual(
            dir_format.repository_format.__class__,
            the_branch.repository._format.__class__)
        # The format file read back through the transport must hold the
        # requested repository format's format string.
        expected_format_string = repo_format.get_format_string()
        stored_format_string = self.get_transport().get_bytes(
            'dir/.bzr/repository/format')
        self.assertEqual(expected_format_string, stored_format_string)

    def test_make_branch_builder_with_format_name(self):
        branch_builder = self.make_branch_builder('dir', format='knit')
        the_branch = branch_builder.get_branch()
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.assertFalse(osutils.lexists('dir'))
        knit_dir_format = controldir.format_registry.make_controldir('knit')
        self.assertEqual(
            knit_dir_format.repository_format.__class__,
            the_branch.repository._format.__class__)
        stored_format_string = self.get_transport().get_bytes(
            'dir/.bzr/repository/format')
        self.assertEqual(
            'Bazaar-NG Knit Repository Format 1', stored_format_string)

    def test_dangling_locks_cause_failures(self):
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
            def test_function(self):
                # Take a lock and never release it.
                lock_transport = self.get_transport_from_path('.')
                dangling = lockdir.LockDir(lock_transport, 'lock')
                dangling.create()
                dangling.attempt_lock()
        result = TestDanglingLock('test_function').run()
        total_failures = result.errors + result.failures
        # When _lock_check_thorough is disabled, then we don't trigger a
        # failure
        expected_failure_count = 1 if self._lock_check_thorough else 0
        self.assertEqual(expected_failure_count, len(total_failures))
 
658
 
 
659
 
 
660
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
    """Tests for the convenience functions TestCaseWithTransport introduces."""

    def test_get_readonly_url_none(self):
        from ..transport.readonly import ReadonlyTransportDecorator
        self.vfs_transport_factory = memory.MemoryServer
        self.transport_readonly_server = None
        # calling get_readonly_transport() constructs a decorator on the url
        # for the server
        base_url = self.get_readonly_url()
        nested_url = self.get_readonly_url('foo/bar')
        base_transport = transport.get_transport_from_url(base_url)
        nested_transport = transport.get_transport_from_url(nested_url)
        self.assertIsInstance(base_transport, ReadonlyTransportDecorator)
        self.assertIsInstance(nested_transport, ReadonlyTransportDecorator)
        # Compare with the last character of the nested base dropped.
        self.assertEqual(
            nested_transport.base[:-1], base_transport.abspath('foo/bar'))

    def test_get_readonly_url_http(self):
        from .http_server import HttpServer
        from ..transport.http import HttpTransportBase
        self.transport_server = test_server.LocalURLServer
        self.transport_readonly_server = HttpServer
        # calling get_readonly_transport() gives us a HTTP server instance.
        base_url = self.get_readonly_url()
        nested_url = self.get_readonly_url('foo/bar')
        # the transport returned may be any HttpTransportBase subclass
        base_transport = transport.get_transport_from_url(base_url)
        nested_transport = transport.get_transport_from_url(nested_url)
        self.assertIsInstance(base_transport, HttpTransportBase)
        self.assertIsInstance(nested_transport, HttpTransportBase)
        self.assertEqual(
            nested_transport.base[:-1], base_transport.abspath('foo/bar'))

    def test_is_directory(self):
        """Test assertIsDirectory assertion"""
        tree_transport = self.get_transport()
        self.build_tree(['a_dir/', 'a_file'], transport=tree_transport)
        self.assertIsDirectory('a_dir', tree_transport)
        # A plain file and a missing path must both be rejected.
        self.assertRaises(
            AssertionError, self.assertIsDirectory, 'a_file', tree_transport)
        self.assertRaises(
            AssertionError, self.assertIsDirectory, 'not_here', tree_transport)

    def test_make_branch_builder(self):
        branch_builder = self.make_branch_builder('dir')
        committed_rev_id = branch_builder.build_commit()
        self.assertPathExists('dir')
        opened_dir = controldir.ControlDir.open('dir')
        # The control dir must have a branch but no working tree.
        self.assertRaises(errors.NoWorkingTree, opened_dir.open_workingtree)
        opened_branch = opened_dir.open_branch()
        builder_branch = branch_builder.get_branch()
        self.assertEqual(opened_branch.base, builder_branch.base)
        expected_info = (1, committed_rev_id)
        self.assertEqual(expected_info, builder_branch.last_revision_info())
        self.assertEqual(expected_info, opened_branch.last_revision_info())
 
711
 
 
712
 
 
713
class TestTestCaseTransports(tests.TestCaseWithTransport):
    """Tests run with a MemoryServer as the vfs transport factory."""

    def setUp(self):
        super(TestTestCaseTransports, self).setUp()
        self.vfs_transport_factory = memory.MemoryServer

    def test_make_controldir_preserves_transport(self):
        """make_controldir builds on a memory transport, not on disk."""
        # NOTE(review): the returned transport is unused; the call is kept in
        # case it has side effects the rest of the test relies on -- confirm.
        self.get_transport()
        made = self.make_controldir('subdir')
        self.assertIsInstance(made.transport, memory.MemoryTransport)
        # should not be on disk, should only be in memory
        self.assertPathDoesNotExist('subdir')
 
726
 
 
727
 
 
728
class TestChrootedTest(tests.ChrootedTestCase):
    """Tests for tests.ChrootedTestCase."""

    def test_root_is_root(self):
        """Cloning '..' from the readonly root gives the same base URL."""
        root = transport.get_transport_from_url(self.get_readonly_url())
        self.assertEqual(root.base, root.clone('..').base)
 
734
 
 
735
 
 
736
class TestProfileResult(tests.TestCase):
    """Tests for tests.ProfileResult."""

    def test_profiles_tests(self):
        """A test run through ProfileResult ends up with one benchcall."""
        self.requireFeature(features.lsprof_feature)
        recorder = testtools.testresult.doubles.ExtendedTestResult()
        profiling = tests.ProfileResult(recorder)
        class Sample(tests.TestCase):
            def a(self):
                self.sample_function()
            def sample_function(self):
                pass
        Sample("a").run(profiling)
        ran_case = recorder._events[0][1]
        self.assertLength(1, ran_case._benchcalls)
        # We must be able to unpack it as the test reporting code wants
        (_, _, _), stats = ran_case._benchcalls[0]
        self.assertTrue(callable(stats.pprint))
 
754
 
 
755
 
 
756
class TestTestResult(tests.TestCase):
 
757
 
 
758
    def check_timing(self, test_case, expected_re):
 
759
        result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
 
760
        capture = testtools.testresult.doubles.ExtendedTestResult()
 
761
        test_case.run(MultiTestResult(result, capture))
 
762
        run_case = capture._events[0][1]
 
763
        timed_string = result._testTimeString(run_case)
 
764
        self.assertContainsRe(timed_string, expected_re)
 
765
 
 
766
    def test_test_reporting(self):
 
767
        class ShortDelayTestCase(tests.TestCase):
 
768
            def test_short_delay(self):
 
769
                time.sleep(0.003)
 
770
            def test_short_benchmark(self):
 
771
                self.time(time.sleep, 0.003)
 
772
        self.check_timing(ShortDelayTestCase('test_short_delay'),
 
773
                          r"^ +[0-9]+ms$")
 
774
        # if a benchmark time is given, we now show just that time followed by
 
775
        # a star
 
776
        self.check_timing(ShortDelayTestCase('test_short_benchmark'),
 
777
                          r"^ +[0-9]+ms\*$")
 
778
 
 
779
    def test_unittest_reporting_unittest_class(self):
 
780
        # getting the time from a non-breezy test works ok
 
781
        class ShortDelayTestCase(unittest.TestCase):
 
782
            def test_short_delay(self):
 
783
                time.sleep(0.003)
 
784
        self.check_timing(ShortDelayTestCase('test_short_delay'),
 
785
                          r"^ +[0-9]+ms$")
 
786
 
 
787
    def _time_hello_world_encoding(self):
 
788
        """Profile two sleep calls
 
789
 
 
790
        This is used to exercise the test framework.
 
791
        """
 
792
        self.time(unicode, 'hello', errors='replace')
 
793
        self.time(unicode, 'world', errors='replace')
 
794
 
 
795
    def test_lsprofiling(self):
 
796
        """Verbose test result prints lsprof statistics from test cases."""
 
797
        self.requireFeature(features.lsprof_feature)
 
798
        result_stream = StringIO()
 
799
        result = breezy.tests.VerboseTestResult(
 
800
            result_stream,
 
801
            descriptions=0,
 
802
            verbosity=2,
 
803
            )
 
804
        # we want profile a call of some sort and check it is output by
 
805
        # addSuccess. We dont care about addError or addFailure as they
 
806
        # are not that interesting for performance tuning.
 
807
        # make a new test instance that when run will generate a profile
 
808
        example_test_case = TestTestResult("_time_hello_world_encoding")
 
809
        example_test_case._gather_lsprof_in_benchmarks = True
 
810
        # execute the test, which should succeed and record profiles
 
811
        example_test_case.run(result)
 
812
        # lsprofile_something()
 
813
        # if this worked we want
 
814
        # LSProf output for <built in function unicode> (['hello'], {'errors': 'replace'})
 
815
        #    CallCount    Recursive    Total(ms)   Inline(ms) module:lineno(function)
 
816
        # (the lsprof header)
 
817
        # ... an arbitrary number of lines
 
818
        # and the function call which is time.sleep.
 
819
        #           1        0            ???         ???       ???(sleep)
 
820
        # and then repeated but with 'world', rather than 'hello'.
 
821
        # this should appear in the output stream of our test result.
 
822
        output = result_stream.getvalue()
 
823
        self.assertContainsRe(output,
 
824
            r"LSProf output for <type 'unicode'>\(\('hello',\), {'errors': 'replace'}\)")
 
825
        self.assertContainsRe(output,
 
826
            r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n")
 
827
        self.assertContainsRe(output,
 
828
            r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?")
 
829
        self.assertContainsRe(output,
 
830
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
 
831
 
 
832
    def test_uses_time_from_testtools(self):
 
833
        """Test case timings in verbose results should use testtools times"""
 
834
        import datetime
 
835
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
 
836
            def startTest(self, test):
 
837
                self.time(datetime.datetime.utcfromtimestamp(1.145))
 
838
                super(TimeAddedVerboseTestResult, self).startTest(test)
 
839
            def addSuccess(self, test):
 
840
                self.time(datetime.datetime.utcfromtimestamp(51.147))
 
841
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
 
842
            def report_tests_starting(self): pass
 
843
        sio = StringIO()
 
844
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
 
845
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
 
846
 
 
847
    def test_known_failure(self):
 
848
        """Using knownFailure should trigger several result actions."""
 
849
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
850
            def stopTestRun(self): pass
 
851
            def report_tests_starting(self): pass
 
852
            def report_known_failure(self, test, err=None, details=None):
 
853
                self._call = test, 'known failure'
 
854
        result = InstrumentedTestResult(None, None, None, None)
 
855
        class Test(tests.TestCase):
 
856
            def test_function(self):
 
857
                self.knownFailure('failed!')
 
858
        test = Test("test_function")
 
859
        test.run(result)
 
860
        # it should invoke 'report_known_failure'.
 
861
        self.assertEqual(2, len(result._call))
 
862
        self.assertEqual(test.id(), result._call[0].id())
 
863
        self.assertEqual('known failure', result._call[1])
 
864
        # we dont introspec the traceback, if the rest is ok, it would be
 
865
        # exceptional for it not to be.
 
866
        # it should update the known_failure_count on the object.
 
867
        self.assertEqual(1, result.known_failure_count)
 
868
        # the result should be successful.
 
869
        self.assertTrue(result.wasSuccessful())
 
870
 
 
871
    def test_verbose_report_known_failure(self):
 
872
        # verbose test output formatting
 
873
        result_stream = StringIO()
 
874
        result = breezy.tests.VerboseTestResult(
 
875
            result_stream,
 
876
            descriptions=0,
 
877
            verbosity=2,
 
878
            )
 
879
        _get_test("test_xfail").run(result)
 
880
        self.assertContainsRe(result_stream.getvalue(),
 
881
            "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
 
882
            "\\s*(?:Text attachment: )?reason"
 
883
            "(?:\n-+\n|: {{{)"
 
884
            "this_fails"
 
885
            "(?:\n-+\n|}}}\n)")
 
886
 
 
887
    def get_passing_test(self):
 
888
        """Return a test object that can't be run usefully."""
 
889
        def passing_test():
 
890
            pass
 
891
        return unittest.FunctionTestCase(passing_test)
 
892
 
 
893
    def test_add_not_supported(self):
 
894
        """Test the behaviour of invoking addNotSupported."""
 
895
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
896
            def stopTestRun(self): pass
 
897
            def report_tests_starting(self): pass
 
898
            def report_unsupported(self, test, feature):
 
899
                self._call = test, feature
 
900
        result = InstrumentedTestResult(None, None, None, None)
 
901
        test = SampleTestCase('_test_pass')
 
902
        feature = features.Feature()
 
903
        result.startTest(test)
 
904
        result.addNotSupported(test, feature)
 
905
        # it should invoke 'report_unsupported'.
 
906
        self.assertEqual(2, len(result._call))
 
907
        self.assertEqual(test, result._call[0])
 
908
        self.assertEqual(feature, result._call[1])
 
909
        # the result should be successful.
 
910
        self.assertTrue(result.wasSuccessful())
 
911
        # it should record the test against a count of tests not run due to
 
912
        # this feature.
 
913
        self.assertEqual(1, result.unsupported['Feature'])
 
914
        # and invoking it again should increment that counter
 
915
        result.addNotSupported(test, feature)
 
916
        self.assertEqual(2, result.unsupported['Feature'])
 
917
 
 
918
    def test_verbose_report_unsupported(self):
 
919
        # verbose test output formatting
 
920
        result_stream = StringIO()
 
921
        result = breezy.tests.VerboseTestResult(
 
922
            result_stream,
 
923
            descriptions=0,
 
924
            verbosity=2,
 
925
            )
 
926
        test = self.get_passing_test()
 
927
        feature = features.Feature()
 
928
        result.startTest(test)
 
929
        prefix = len(result_stream.getvalue())
 
930
        result.report_unsupported(test, feature)
 
931
        output = result_stream.getvalue()[prefix:]
 
932
        lines = output.splitlines()
 
933
        # We don't check for the final '0ms' since it may fail on slow hosts
 
934
        self.assertStartsWith(lines[0], 'NODEP')
 
935
        self.assertEqual(lines[1],
 
936
                         "    The feature 'Feature' is not available.")
 
937
 
 
938
    def test_unavailable_exception(self):
 
939
        """An UnavailableFeature being raised should invoke addNotSupported."""
 
940
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
941
            def stopTestRun(self): pass
 
942
            def report_tests_starting(self): pass
 
943
            def addNotSupported(self, test, feature):
 
944
                self._call = test, feature
 
945
        result = InstrumentedTestResult(None, None, None, None)
 
946
        feature = features.Feature()
 
947
        class Test(tests.TestCase):
 
948
            def test_function(self):
 
949
                raise tests.UnavailableFeature(feature)
 
950
        test = Test("test_function")
 
951
        test.run(result)
 
952
        # it should invoke 'addNotSupported'.
 
953
        self.assertEqual(2, len(result._call))
 
954
        self.assertEqual(test.id(), result._call[0].id())
 
955
        self.assertEqual(feature, result._call[1])
 
956
        # and not count as an error
 
957
        self.assertEqual(0, result.error_count)
 
958
 
 
959
    def test_strict_with_unsupported_feature(self):
 
960
        result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
 
961
        test = self.get_passing_test()
 
962
        feature = "Unsupported Feature"
 
963
        result.addNotSupported(test, feature)
 
964
        self.assertFalse(result.wasStrictlySuccessful())
 
965
        self.assertEqual(None, result._extractBenchmarkTime(test))
 
966
 
 
967
    def test_strict_with_known_failure(self):
 
968
        result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
 
969
        test = _get_test("test_xfail")
 
970
        test.run(result)
 
971
        self.assertFalse(result.wasStrictlySuccessful())
 
972
        self.assertEqual(None, result._extractBenchmarkTime(test))
 
973
 
 
974
    def test_strict_with_success(self):
 
975
        result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
 
976
        test = self.get_passing_test()
 
977
        result.addSuccess(test)
 
978
        self.assertTrue(result.wasStrictlySuccessful())
 
979
        self.assertEqual(None, result._extractBenchmarkTime(test))
 
980
 
 
981
    def test_startTests(self):
 
982
        """Starting the first test should trigger startTests."""
 
983
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
984
            calls = 0
 
985
            def startTests(self): self.calls += 1
 
986
        result = InstrumentedTestResult(None, None, None, None)
 
987
        def test_function():
 
988
            pass
 
989
        test = unittest.FunctionTestCase(test_function)
 
990
        test.run(result)
 
991
        self.assertEqual(1, result.calls)
 
992
 
 
993
    def test_startTests_only_once(self):
 
994
        """With multiple tests startTests should still only be called once"""
 
995
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
996
            calls = 0
 
997
            def startTests(self): self.calls += 1
 
998
        result = InstrumentedTestResult(None, None, None, None)
 
999
        suite = unittest.TestSuite([
 
1000
            unittest.FunctionTestCase(lambda: None),
 
1001
            unittest.FunctionTestCase(lambda: None)])
 
1002
        suite.run(result)
 
1003
        self.assertEqual(1, result.calls)
 
1004
        self.assertEqual(2, result.count)
 
1005
 
 
1006
 
 
1007
class TestRunner(tests.TestCase):
 
1008
 
 
1009
    def dummy_test(self):
 
1010
        pass
 
1011
 
 
1012
    def run_test_runner(self, testrunner, test):
 
1013
        """Run suite in testrunner, saving global state and restoring it.
 
1014
 
 
1015
        This current saves and restores:
 
1016
        TestCaseInTempDir.TEST_ROOT
 
1017
 
 
1018
        There should be no tests in this file that use
 
1019
        breezy.tests.TextTestRunner without using this convenience method,
 
1020
        because of our use of global state.
 
1021
        """
 
1022
        old_root = tests.TestCaseInTempDir.TEST_ROOT
 
1023
        try:
 
1024
            tests.TestCaseInTempDir.TEST_ROOT = None
 
1025
            return testrunner.run(test)
 
1026
        finally:
 
1027
            tests.TestCaseInTempDir.TEST_ROOT = old_root
 
1028
 
 
1029
    def test_known_failure_failed_run(self):
 
1030
        # run a test that generates a known failure which should be printed in
 
1031
        # the final output when real failures occur.
 
1032
        class Test(tests.TestCase):
 
1033
            def known_failure_test(self):
 
1034
                self.expectFailure('failed', self.assertTrue, False)
 
1035
        test = unittest.TestSuite()
 
1036
        test.addTest(Test("known_failure_test"))
 
1037
        def failing_test():
 
1038
            raise AssertionError('foo')
 
1039
        test.addTest(unittest.FunctionTestCase(failing_test))
 
1040
        stream = StringIO()
 
1041
        runner = tests.TextTestRunner(stream=stream)
 
1042
        result = self.run_test_runner(runner, test)
 
1043
        lines = stream.getvalue().splitlines()
 
1044
        self.assertContainsRe(stream.getvalue(),
 
1045
            '(?sm)^brz selftest.*$'
 
1046
            '.*'
 
1047
            '^======================================================================\n'
 
1048
            '^FAIL: failing_test\n'
 
1049
            '^----------------------------------------------------------------------\n'
 
1050
            'Traceback \\(most recent call last\\):\n'
 
1051
            '  .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
 
1052
            '    raise AssertionError\\(\'foo\'\\)\n'
 
1053
            '.*'
 
1054
            '^----------------------------------------------------------------------\n'
 
1055
            '.*'
 
1056
            'FAILED \\(failures=1, known_failure_count=1\\)'
 
1057
            )
 
1058
 
 
1059
    def test_known_failure_ok_run(self):
 
1060
        # run a test that generates a known failure which should be printed in
 
1061
        # the final output.
 
1062
        class Test(tests.TestCase):
 
1063
            def known_failure_test(self):
 
1064
                self.knownFailure("Never works...")
 
1065
        test = Test("known_failure_test")
 
1066
        stream = StringIO()
 
1067
        runner = tests.TextTestRunner(stream=stream)
 
1068
        result = self.run_test_runner(runner, test)
 
1069
        self.assertContainsRe(stream.getvalue(),
 
1070
            '\n'
 
1071
            '-*\n'
 
1072
            'Ran 1 test in .*\n'
 
1073
            '\n'
 
1074
            'OK \\(known_failures=1\\)\n')
 
1075
 
 
1076
    def test_unexpected_success_bad(self):
 
1077
        class Test(tests.TestCase):
 
1078
            def test_truth(self):
 
1079
                self.expectFailure("No absolute truth", self.assertTrue, True)
 
1080
        runner = tests.TextTestRunner(stream=StringIO())
 
1081
        result = self.run_test_runner(runner, Test("test_truth"))
 
1082
        self.assertContainsRe(runner.stream.getvalue(),
 
1083
            "=+\n"
 
1084
            "FAIL: \\S+\.test_truth\n"
 
1085
            "-+\n"
 
1086
            "(?:.*\n)*"
 
1087
            "\\s*(?:Text attachment: )?reason"
 
1088
            "(?:\n-+\n|: {{{)"
 
1089
            "No absolute truth"
 
1090
            "(?:\n-+\n|}}}\n)"
 
1091
            "(?:.*\n)*"
 
1092
            "-+\n"
 
1093
            "Ran 1 test in .*\n"
 
1094
            "\n"
 
1095
            "FAILED \\(failures=1\\)\n\\Z")
 
1096
 
 
1097
    def test_result_decorator(self):
 
1098
        # decorate results
 
1099
        calls = []
 
1100
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1101
            def startTest(self, test):
 
1102
                ExtendedToOriginalDecorator.startTest(self, test)
 
1103
                calls.append('start')
 
1104
        test = unittest.FunctionTestCase(lambda:None)
 
1105
        stream = StringIO()
 
1106
        runner = tests.TextTestRunner(stream=stream,
 
1107
            result_decorators=[LoggingDecorator])
 
1108
        result = self.run_test_runner(runner, test)
 
1109
        self.assertLength(1, calls)
 
1110
 
 
1111
    def test_skipped_test(self):
 
1112
        # run a test that is skipped, and check the suite as a whole still
 
1113
        # succeeds.
 
1114
        # skipping_test must be hidden in here so it's not run as a real test
 
1115
        class SkippingTest(tests.TestCase):
 
1116
            def skipping_test(self):
 
1117
                raise tests.TestSkipped('test intentionally skipped')
 
1118
        runner = tests.TextTestRunner(stream=StringIO())
 
1119
        test = SkippingTest("skipping_test")
 
1120
        result = self.run_test_runner(runner, test)
 
1121
        self.assertTrue(result.wasSuccessful())
 
1122
 
 
1123
    def test_skipped_from_setup(self):
 
1124
        calls = []
 
1125
        class SkippedSetupTest(tests.TestCase):
 
1126
 
 
1127
            def setUp(self):
 
1128
                calls.append('setUp')
 
1129
                self.addCleanup(self.cleanup)
 
1130
                raise tests.TestSkipped('skipped setup')
 
1131
 
 
1132
            def test_skip(self):
 
1133
                self.fail('test reached')
 
1134
 
 
1135
            def cleanup(self):
 
1136
                calls.append('cleanup')
 
1137
 
 
1138
        runner = tests.TextTestRunner(stream=StringIO())
 
1139
        test = SkippedSetupTest('test_skip')
 
1140
        result = self.run_test_runner(runner, test)
 
1141
        self.assertTrue(result.wasSuccessful())
 
1142
        # Check if cleanup was called the right number of times.
 
1143
        self.assertEqual(['setUp', 'cleanup'], calls)
 
1144
 
 
1145
    def test_skipped_from_test(self):
 
1146
        calls = []
 
1147
        class SkippedTest(tests.TestCase):
 
1148
 
 
1149
            def setUp(self):
 
1150
                super(SkippedTest, self).setUp()
 
1151
                calls.append('setUp')
 
1152
                self.addCleanup(self.cleanup)
 
1153
 
 
1154
            def test_skip(self):
 
1155
                raise tests.TestSkipped('skipped test')
 
1156
 
 
1157
            def cleanup(self):
 
1158
                calls.append('cleanup')
 
1159
 
 
1160
        runner = tests.TextTestRunner(stream=StringIO())
 
1161
        test = SkippedTest('test_skip')
 
1162
        result = self.run_test_runner(runner, test)
 
1163
        self.assertTrue(result.wasSuccessful())
 
1164
        # Check if cleanup was called the right number of times.
 
1165
        self.assertEqual(['setUp', 'cleanup'], calls)
 
1166
 
 
1167
    def test_not_applicable(self):
 
1168
        # run a test that is skipped because it's not applicable
 
1169
        class Test(tests.TestCase):
 
1170
            def not_applicable_test(self):
 
1171
                raise tests.TestNotApplicable('this test never runs')
 
1172
        out = StringIO()
 
1173
        runner = tests.TextTestRunner(stream=out, verbosity=2)
 
1174
        test = Test("not_applicable_test")
 
1175
        result = self.run_test_runner(runner, test)
 
1176
        self.log(out.getvalue())
 
1177
        self.assertTrue(result.wasSuccessful())
 
1178
        self.assertTrue(result.wasStrictlySuccessful())
 
1179
        self.assertContainsRe(out.getvalue(),
 
1180
                r'(?m)not_applicable_test  * N/A')
 
1181
        self.assertContainsRe(out.getvalue(),
 
1182
                r'(?m)^    this test never runs')
 
1183
 
 
1184
    def test_unsupported_features_listed(self):
 
1185
        """When unsupported features are encountered they are detailed."""
 
1186
        class Feature1(features.Feature):
 
1187
            def _probe(self): return False
 
1188
        class Feature2(features.Feature):
 
1189
            def _probe(self): return False
 
1190
        # create sample tests
 
1191
        test1 = SampleTestCase('_test_pass')
 
1192
        test1._test_needs_features = [Feature1()]
 
1193
        test2 = SampleTestCase('_test_pass')
 
1194
        test2._test_needs_features = [Feature2()]
 
1195
        test = unittest.TestSuite()
 
1196
        test.addTest(test1)
 
1197
        test.addTest(test2)
 
1198
        stream = StringIO()
 
1199
        runner = tests.TextTestRunner(stream=stream)
 
1200
        result = self.run_test_runner(runner, test)
 
1201
        lines = stream.getvalue().splitlines()
 
1202
        self.assertEqual([
 
1203
            'OK',
 
1204
            "Missing feature 'Feature1' skipped 1 tests.",
 
1205
            "Missing feature 'Feature2' skipped 1 tests.",
 
1206
            ],
 
1207
            lines[-3:])
 
1208
 
 
1209
    def test_verbose_test_count(self):
 
1210
        """A verbose test run reports the right test count at the start"""
 
1211
        suite = TestUtil.TestSuite([
 
1212
            unittest.FunctionTestCase(lambda:None),
 
1213
            unittest.FunctionTestCase(lambda:None)])
 
1214
        self.assertEqual(suite.countTestCases(), 2)
 
1215
        stream = StringIO()
 
1216
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1217
        # Need to use the CountingDecorator as that's what sets num_tests
 
1218
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1219
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
 
1220
 
 
1221
    def test_startTestRun(self):
 
1222
        """run should call result.startTestRun()"""
 
1223
        calls = []
 
1224
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1225
            def startTestRun(self):
 
1226
                ExtendedToOriginalDecorator.startTestRun(self)
 
1227
                calls.append('startTestRun')
 
1228
        test = unittest.FunctionTestCase(lambda:None)
 
1229
        stream = StringIO()
 
1230
        runner = tests.TextTestRunner(stream=stream,
 
1231
            result_decorators=[LoggingDecorator])
 
1232
        result = self.run_test_runner(runner, test)
 
1233
        self.assertLength(1, calls)
 
1234
 
 
1235
    def test_stopTestRun(self):
 
1236
        """run should call result.stopTestRun()"""
 
1237
        calls = []
 
1238
        class LoggingDecorator(ExtendedToOriginalDecorator):
 
1239
            def stopTestRun(self):
 
1240
                ExtendedToOriginalDecorator.stopTestRun(self)
 
1241
                calls.append('stopTestRun')
 
1242
        test = unittest.FunctionTestCase(lambda:None)
 
1243
        stream = StringIO()
 
1244
        runner = tests.TextTestRunner(stream=stream,
 
1245
            result_decorators=[LoggingDecorator])
 
1246
        result = self.run_test_runner(runner, test)
 
1247
        self.assertLength(1, calls)
 
1248
 
 
1249
    def test_unicode_test_output_on_ascii_stream(self):
 
1250
        """Showing results should always succeed even on an ascii console"""
 
1251
        class FailureWithUnicode(tests.TestCase):
 
1252
            def test_log_unicode(self):
 
1253
                self.log(u"\u2606")
 
1254
                self.fail("Now print that log!")
 
1255
        out = StringIO()
 
1256
        self.overrideAttr(osutils, "get_terminal_encoding",
 
1257
            lambda trace=False: "ascii")
 
1258
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
 
1259
            FailureWithUnicode("test_log_unicode"))
 
1260
        self.assertContainsRe(out.getvalue(),
 
1261
            "(?:Text attachment: )?log"
 
1262
            "(?:\n-+\n|: {{{)"
 
1263
            "\d+\.\d+  \\\\u2606"
 
1264
            "(?:\n-+\n|}}}\n)")
 
1265
 
 
1266
 
 
1267
class SampleTestCase(tests.TestCase):
    """Minimal test case used as a fixture by other tests in this module."""

    def _test_pass(self):
        """Do nothing, so the test passes."""
        pass
 
1271
 
 
1272
class _TestException(Exception):
    """Exception type private to this test module."""
 
1274
 
 
1275
 
 
1276
class TestTestCase(tests.TestCase):
 
1277
    """Tests that test the core breezy TestCase."""
 
1278
 
 
1279
    def test_assertLength_matches_empty(self):
 
1280
        a_list = []
 
1281
        self.assertLength(0, a_list)
 
1282
 
 
1283
    def test_assertLength_matches_nonempty(self):
 
1284
        a_list = [1, 2, 3]
 
1285
        self.assertLength(3, a_list)
 
1286
 
 
1287
    def test_assertLength_fails_different(self):
 
1288
        a_list = []
 
1289
        self.assertRaises(AssertionError, self.assertLength, 1, a_list)
 
1290
 
 
1291
    def test_assertLength_shows_sequence_in_failure(self):
 
1292
        a_list = [1, 2, 3]
 
1293
        exception = self.assertRaises(AssertionError, self.assertLength, 2,
 
1294
            a_list)
 
1295
        self.assertEqual('Incorrect length: wanted 2, got 3 for [1, 2, 3]',
 
1296
            exception.args[0])
 
1297
 
 
1298
    def test_base_setUp_not_called_causes_failure(self):
 
1299
        class TestCaseWithBrokenSetUp(tests.TestCase):
 
1300
            def setUp(self):
 
1301
                pass # does not call TestCase.setUp
 
1302
            def test_foo(self):
 
1303
                pass
 
1304
        test = TestCaseWithBrokenSetUp('test_foo')
 
1305
        result = unittest.TestResult()
 
1306
        test.run(result)
 
1307
        self.assertFalse(result.wasSuccessful())
 
1308
        self.assertEqual(1, result.testsRun)
 
1309
 
 
1310
    def test_base_tearDown_not_called_causes_failure(self):
 
1311
        class TestCaseWithBrokenTearDown(tests.TestCase):
 
1312
            def tearDown(self):
 
1313
                pass # does not call TestCase.tearDown
 
1314
            def test_foo(self):
 
1315
                pass
 
1316
        test = TestCaseWithBrokenTearDown('test_foo')
 
1317
        result = unittest.TestResult()
 
1318
        test.run(result)
 
1319
        self.assertFalse(result.wasSuccessful())
 
1320
        self.assertEqual(1, result.testsRun)
 
1321
 
 
1322
    def test_debug_flags_sanitised(self):
 
1323
        """The breezy debug flags should be sanitised by setUp."""
 
1324
        if 'allow_debug' in tests.selftest_debug_flags:
 
1325
            raise tests.TestNotApplicable(
 
1326
                '-Eallow_debug option prevents debug flag sanitisation')
 
1327
        # we could set something and run a test that will check
 
1328
        # it gets santised, but this is probably sufficient for now:
 
1329
        # if someone runs the test with -Dsomething it will error.
 
1330
        flags = set()
 
1331
        if self._lock_check_thorough:
 
1332
            flags.add('strict_locks')
 
1333
        self.assertEqual(flags, breezy.debug.debug_flags)
 
1334
 
 
1335
    def change_selftest_debug_flags(self, new_flags):
 
1336
        self.overrideAttr(tests, 'selftest_debug_flags', set(new_flags))
 
1337
 
 
1338
    def test_allow_debug_flag(self):
 
1339
        """The -Eallow_debug flag prevents breezy.debug.debug_flags from being
 
1340
        sanitised (i.e. cleared) before running a test.
 
1341
        """
 
1342
        self.change_selftest_debug_flags({'allow_debug'})
 
1343
        breezy.debug.debug_flags = {'a-flag'}
 
1344
        class TestThatRecordsFlags(tests.TestCase):
 
1345
            def test_foo(nested_self):
 
1346
                self.flags = set(breezy.debug.debug_flags)
 
1347
        test = TestThatRecordsFlags('test_foo')
 
1348
        test.run(self.make_test_result())
 
1349
        flags = {'a-flag'}
 
1350
        if 'disable_lock_checks' not in tests.selftest_debug_flags:
 
1351
            flags.add('strict_locks')
 
1352
        self.assertEqual(flags, self.flags)
 
1353
 
 
1354
    def test_disable_lock_checks(self):
 
1355
        """The -Edisable_lock_checks flag disables thorough checks."""
 
1356
        class TestThatRecordsFlags(tests.TestCase):
 
1357
            def test_foo(nested_self):
 
1358
                self.flags = set(breezy.debug.debug_flags)
 
1359
                self.test_lock_check_thorough = nested_self._lock_check_thorough
 
1360
        self.change_selftest_debug_flags(set())
 
1361
        test = TestThatRecordsFlags('test_foo')
 
1362
        test.run(self.make_test_result())
 
1363
        # By default we do strict lock checking and thorough lock/unlock
 
1364
        # tracking.
 
1365
        self.assertTrue(self.test_lock_check_thorough)
 
1366
        self.assertEqual({'strict_locks'}, self.flags)
 
1367
        # Now set the disable_lock_checks flag, and show that this changed.
 
1368
        self.change_selftest_debug_flags({'disable_lock_checks'})
 
1369
        test = TestThatRecordsFlags('test_foo')
 
1370
        test.run(self.make_test_result())
 
1371
        self.assertFalse(self.test_lock_check_thorough)
 
1372
        self.assertEqual(set(), self.flags)
 
1373
 
 
1374
    def test_this_fails_strict_lock_check(self):
 
1375
        class TestThatRecordsFlags(tests.TestCase):
 
1376
            def test_foo(nested_self):
 
1377
                self.flags1 = set(breezy.debug.debug_flags)
 
1378
                self.thisFailsStrictLockCheck()
 
1379
                self.flags2 = set(breezy.debug.debug_flags)
 
1380
        # Make sure lock checking is active
 
1381
        self.change_selftest_debug_flags(set())
 
1382
        test = TestThatRecordsFlags('test_foo')
 
1383
        test.run(self.make_test_result())
 
1384
        self.assertEqual({'strict_locks'}, self.flags1)
 
1385
        self.assertEqual(set(), self.flags2)
 
1386
 
 
1387
    def test_debug_flags_restored(self):
 
1388
        """The breezy debug flags should be restored to their original state
 
1389
        after the test was run, even if allow_debug is set.
 
1390
        """
 
1391
        self.change_selftest_debug_flags({'allow_debug'})
 
1392
        # Now run a test that modifies debug.debug_flags.
 
1393
        breezy.debug.debug_flags = {'original-state'}
 
1394
        class TestThatModifiesFlags(tests.TestCase):
 
1395
            def test_foo(self):
 
1396
                breezy.debug.debug_flags = {'modified'}
 
1397
        test = TestThatModifiesFlags('test_foo')
 
1398
        test.run(self.make_test_result())
 
1399
        self.assertEqual({'original-state'}, breezy.debug.debug_flags)
 
1400
 
 
1401
    def make_test_result(self):
 
1402
        """Get a test result that writes to a StringIO."""
 
1403
        return tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
 
1404
 
 
1405
    def inner_test(self):
 
1406
        # the inner child test
 
1407
        note("inner_test")
 
1408
 
 
1409
    def outer_child(self):
 
1410
        # the outer child test
 
1411
        note("outer_start")
 
1412
        self.inner_test = TestTestCase("inner_child")
 
1413
        result = self.make_test_result()
 
1414
        self.inner_test.run(result)
 
1415
        note("outer finish")
 
1416
        self.addCleanup(osutils.delete_any, self._log_file_name)
 
1417
 
 
1418
    def test_trace_nesting(self):
 
1419
        # this tests that each test case nests its trace facility correctly.
 
1420
        # we do this by running a test case manually. That test case (A)
 
1421
        # should setup a new log, log content to it, setup a child case (B),
 
1422
        # which should log independently, then case (A) should log a trailer
 
1423
        # and return.
 
1424
        # we do two nested children so that we can verify the state of the
 
1425
        # logs after the outer child finishes is correct, which a bad clean
 
1426
        # up routine in tearDown might trigger a fault in our test with only
 
1427
        # one child, we should instead see the bad result inside our test with
 
1428
        # the two children.
 
1429
        # the outer child test
 
1430
        original_trace = breezy.trace._trace_file
 
1431
        outer_test = TestTestCase("outer_child")
 
1432
        result = self.make_test_result()
 
1433
        outer_test.run(result)
 
1434
        self.assertEqual(original_trace, breezy.trace._trace_file)
 
1435
 
 
1436
    def method_that_times_a_bit_twice(self):
 
1437
        # call self.time twice to ensure it aggregates
 
1438
        self.time(time.sleep, 0.007)
 
1439
        self.time(time.sleep, 0.007)
 
1440
 
 
1441
    def test_time_creates_benchmark_in_result(self):
 
1442
        """Test that the TestCase.time() method accumulates a benchmark time."""
 
1443
        sample_test = TestTestCase("method_that_times_a_bit_twice")
 
1444
        output_stream = StringIO()
 
1445
        result = breezy.tests.VerboseTestResult(
 
1446
            output_stream,
 
1447
            descriptions=0,
 
1448
            verbosity=2)
 
1449
        sample_test.run(result)
 
1450
        self.assertContainsRe(
 
1451
            output_stream.getvalue(),
 
1452
            r"\d+ms\*\n$")
 
1453
 
 
1454
    def test_hooks_sanitised(self):
 
1455
        """The breezy hooks should be sanitised by setUp."""
 
1456
        # Note this test won't fail with hooks that the core library doesn't
 
1457
        # use - but it trigger with a plugin that adds hooks, so its still a
 
1458
        # useful warning in that case.
 
1459
        self.assertEqual(breezy.branch.BranchHooks(), breezy.branch.Branch.hooks)
 
1460
        self.assertEqual(
 
1461
            breezy.bzr.smart.server.SmartServerHooks(),
 
1462
            breezy.bzr.smart.server.SmartTCPServer.hooks)
 
1463
        self.assertEqual(
 
1464
            breezy.commands.CommandHooks(), breezy.commands.Command.hooks)
 
1465
 
 
1466
    def test__gather_lsprof_in_benchmarks(self):
 
1467
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
 
1468
 
 
1469
        Each self.time() call is individually and separately profiled.
 
1470
        """
 
1471
        self.requireFeature(features.lsprof_feature)
 
1472
        # overrides the class member with an instance member so no cleanup
 
1473
        # needed.
 
1474
        self._gather_lsprof_in_benchmarks = True
 
1475
        self.time(time.sleep, 0.000)
 
1476
        self.time(time.sleep, 0.003)
 
1477
        self.assertEqual(2, len(self._benchcalls))
 
1478
        self.assertEqual((time.sleep, (0.000,), {}), self._benchcalls[0][0])
 
1479
        self.assertEqual((time.sleep, (0.003,), {}), self._benchcalls[1][0])
 
1480
        self.assertIsInstance(self._benchcalls[0][1], breezy.lsprof.Stats)
 
1481
        self.assertIsInstance(self._benchcalls[1][1], breezy.lsprof.Stats)
 
1482
        del self._benchcalls[:]
 
1483
 
 
1484
    def test_knownFailure(self):
 
1485
        """Self.knownFailure() should raise a KnownFailure exception."""
 
1486
        self.assertRaises(tests.KnownFailure, self.knownFailure, "A Failure")
 
1487
 
 
1488
    def test_open_bzrdir_safe_roots(self):
 
1489
        # even a memory transport should fail to open when its url isn't 
 
1490
        # permitted.
 
1491
        # Manually set one up (TestCase doesn't and shouldn't provide magic
 
1492
        # machinery)
 
1493
        transport_server = memory.MemoryServer()
 
1494
        transport_server.start_server()
 
1495
        self.addCleanup(transport_server.stop_server)
 
1496
        t = transport.get_transport_from_url(transport_server.get_url())
 
1497
        controldir.ControlDir.create(t.base)
 
1498
        self.assertRaises(errors.BzrError,
 
1499
            controldir.ControlDir.open_from_transport, t)
 
1500
        # But if we declare this as safe, we can open the bzrdir.
 
1501
        self.permit_url(t.base)
 
1502
        self._bzr_selftest_roots.append(t.base)
 
1503
        controldir.ControlDir.open_from_transport(t)
 
1504
 
 
1505
    def test_requireFeature_available(self):
 
1506
        """self.requireFeature(available) is a no-op."""
 
1507
        class Available(features.Feature):
 
1508
            def _probe(self):return True
 
1509
        feature = Available()
 
1510
        self.requireFeature(feature)
 
1511
 
 
1512
    def test_requireFeature_unavailable(self):
 
1513
        """self.requireFeature(unavailable) raises UnavailableFeature."""
 
1514
        class Unavailable(features.Feature):
 
1515
            def _probe(self):return False
 
1516
        feature = Unavailable()
 
1517
        self.assertRaises(tests.UnavailableFeature,
 
1518
                          self.requireFeature, feature)
 
1519
 
 
1520
    def test_run_no_parameters(self):
 
1521
        test = SampleTestCase('_test_pass')
 
1522
        test.run()
 
1523
 
 
1524
    def test_run_enabled_unittest_result(self):
 
1525
        """Test we revert to regular behaviour when the test is enabled."""
 
1526
        test = SampleTestCase('_test_pass')
 
1527
        class EnabledFeature(object):
 
1528
            def available(self):
 
1529
                return True
 
1530
        test._test_needs_features = [EnabledFeature()]
 
1531
        result = unittest.TestResult()
 
1532
        test.run(result)
 
1533
        self.assertEqual(1, result.testsRun)
 
1534
        self.assertEqual([], result.errors)
 
1535
        self.assertEqual([], result.failures)
 
1536
 
 
1537
    def test_run_disabled_unittest_result(self):
 
1538
        """Test our compatability for disabled tests with unittest results."""
 
1539
        test = SampleTestCase('_test_pass')
 
1540
        class DisabledFeature(object):
 
1541
            def available(self):
 
1542
                return False
 
1543
        test._test_needs_features = [DisabledFeature()]
 
1544
        result = unittest.TestResult()
 
1545
        test.run(result)
 
1546
        self.assertEqual(1, result.testsRun)
 
1547
        self.assertEqual([], result.errors)
 
1548
        self.assertEqual([], result.failures)
 
1549
 
 
1550
    def test_run_disabled_supporting_result(self):
 
1551
        """Test disabled tests behaviour with support aware results."""
 
1552
        test = SampleTestCase('_test_pass')
 
1553
        class DisabledFeature(object):
 
1554
            def __eq__(self, other):
 
1555
                return isinstance(other, DisabledFeature)
 
1556
            def available(self):
 
1557
                return False
 
1558
        the_feature = DisabledFeature()
 
1559
        test._test_needs_features = [the_feature]
 
1560
        class InstrumentedTestResult(unittest.TestResult):
 
1561
            def __init__(self):
 
1562
                unittest.TestResult.__init__(self)
 
1563
                self.calls = []
 
1564
            def startTest(self, test):
 
1565
                self.calls.append(('startTest', test))
 
1566
            def stopTest(self, test):
 
1567
                self.calls.append(('stopTest', test))
 
1568
            def addNotSupported(self, test, feature):
 
1569
                self.calls.append(('addNotSupported', test, feature))
 
1570
        result = InstrumentedTestResult()
 
1571
        test.run(result)
 
1572
        case = result.calls[0][1]
 
1573
        self.assertEqual([
 
1574
            ('startTest', case),
 
1575
            ('addNotSupported', case, the_feature),
 
1576
            ('stopTest', case),
 
1577
            ],
 
1578
            result.calls)
 
1579
 
 
1580
    def test_start_server_registers_url(self):
 
1581
        transport_server = memory.MemoryServer()
 
1582
        # A little strict, but unlikely to be changed soon.
 
1583
        self.assertEqual([], self._bzr_selftest_roots)
 
1584
        self.start_server(transport_server)
 
1585
        self.assertSubset([transport_server.get_url()],
 
1586
            self._bzr_selftest_roots)
 
1587
 
 
1588
    def test_assert_list_raises_on_generator(self):
 
1589
        def generator_which_will_raise():
 
1590
            # This will not raise until after the first yield
 
1591
            yield 1
 
1592
            raise _TestException()
 
1593
 
 
1594
        e = self.assertListRaises(_TestException, generator_which_will_raise)
 
1595
        self.assertIsInstance(e, _TestException)
 
1596
 
 
1597
        e = self.assertListRaises(Exception, generator_which_will_raise)
 
1598
        self.assertIsInstance(e, _TestException)
 
1599
 
 
1600
    def test_assert_list_raises_on_plain(self):
 
1601
        def plain_exception():
 
1602
            raise _TestException()
 
1603
            return []
 
1604
 
 
1605
        e = self.assertListRaises(_TestException, plain_exception)
 
1606
        self.assertIsInstance(e, _TestException)
 
1607
 
 
1608
        e = self.assertListRaises(Exception, plain_exception)
 
1609
        self.assertIsInstance(e, _TestException)
 
1610
 
 
1611
    def test_assert_list_raises_assert_wrong_exception(self):
 
1612
        class _NotTestException(Exception):
 
1613
            pass
 
1614
 
 
1615
        def wrong_exception():
 
1616
            raise _NotTestException()
 
1617
 
 
1618
        def wrong_exception_generator():
 
1619
            yield 1
 
1620
            yield 2
 
1621
            raise _NotTestException()
 
1622
 
 
1623
        # Wrong exceptions are not intercepted
 
1624
        self.assertRaises(_NotTestException,
 
1625
            self.assertListRaises, _TestException, wrong_exception)
 
1626
        self.assertRaises(_NotTestException,
 
1627
            self.assertListRaises, _TestException, wrong_exception_generator)
 
1628
 
 
1629
    def test_assert_list_raises_no_exception(self):
 
1630
        def success():
 
1631
            return []
 
1632
 
 
1633
        def success_generator():
 
1634
            yield 1
 
1635
            yield 2
 
1636
 
 
1637
        self.assertRaises(AssertionError,
 
1638
            self.assertListRaises, _TestException, success)
 
1639
 
 
1640
        self.assertRaises(AssertionError,
 
1641
            self.assertListRaises, _TestException, success_generator)
 
1642
 
 
1643
    def _run_successful_test(self, test):
 
1644
        result = testtools.TestResult()
 
1645
        test.run(result)
 
1646
        self.assertTrue(result.wasSuccessful())
 
1647
        return result
 
1648
 
 
1649
    def test_overrideAttr_without_value(self):
 
1650
        self.test_attr = 'original' # Define a test attribute
 
1651
        obj = self # Make 'obj' visible to the embedded test
 
1652
        class Test(tests.TestCase):
 
1653
 
 
1654
            def setUp(self):
 
1655
                super(Test, self).setUp()
 
1656
                self.orig = self.overrideAttr(obj, 'test_attr')
 
1657
 
 
1658
            def test_value(self):
 
1659
                self.assertEqual('original', self.orig)
 
1660
                self.assertEqual('original', obj.test_attr)
 
1661
                obj.test_attr = 'modified'
 
1662
                self.assertEqual('modified', obj.test_attr)
 
1663
 
 
1664
        self._run_successful_test(Test('test_value'))
 
1665
        self.assertEqual('original', obj.test_attr)
 
1666
 
 
1667
    def test_overrideAttr_with_value(self):
 
1668
        self.test_attr = 'original' # Define a test attribute
 
1669
        obj = self # Make 'obj' visible to the embedded test
 
1670
        class Test(tests.TestCase):
 
1671
 
 
1672
            def setUp(self):
 
1673
                super(Test, self).setUp()
 
1674
                self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
 
1675
 
 
1676
            def test_value(self):
 
1677
                self.assertEqual('original', self.orig)
 
1678
                self.assertEqual('modified', obj.test_attr)
 
1679
 
 
1680
        self._run_successful_test(Test('test_value'))
 
1681
        self.assertEqual('original', obj.test_attr)
 
1682
 
 
1683
    def test_overrideAttr_with_no_existing_value_and_value(self):
 
1684
        # Do not define the test_attribute
 
1685
        obj = self # Make 'obj' visible to the embedded test
 
1686
        class Test(tests.TestCase):
 
1687
 
 
1688
            def setUp(self):
 
1689
                tests.TestCase.setUp(self)
 
1690
                self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
 
1691
 
 
1692
            def test_value(self):
 
1693
                self.assertEqual(tests._unitialized_attr, self.orig)
 
1694
                self.assertEqual('modified', obj.test_attr)
 
1695
 
 
1696
        self._run_successful_test(Test('test_value'))
 
1697
        self.assertRaises(AttributeError, getattr, obj, 'test_attr')
 
1698
 
 
1699
    def test_overrideAttr_with_no_existing_value_and_no_value(self):
 
1700
        # Do not define the test_attribute
 
1701
        obj = self # Make 'obj' visible to the embedded test
 
1702
        class Test(tests.TestCase):
 
1703
 
 
1704
            def setUp(self):
 
1705
                tests.TestCase.setUp(self)
 
1706
                self.orig = self.overrideAttr(obj, 'test_attr')
 
1707
 
 
1708
            def test_value(self):
 
1709
                self.assertEqual(tests._unitialized_attr, self.orig)
 
1710
                self.assertRaises(AttributeError, getattr, obj, 'test_attr')
 
1711
 
 
1712
        self._run_successful_test(Test('test_value'))
 
1713
        self.assertRaises(AttributeError, getattr, obj, 'test_attr')
 
1714
 
 
1715
    def test_recordCalls(self):
 
1716
        from breezy.tests import test_selftest
 
1717
        calls = self.recordCalls(
 
1718
            test_selftest, '_add_numbers')
 
1719
        self.assertEqual(test_selftest._add_numbers(2, 10),
 
1720
            12)
 
1721
        self.assertEqual(calls, [((2, 10), {})])
 
1722
 
 
1723
 
 
1724
def _add_numbers(a, b):
 
1725
    return a + b
 
1726
 
 
1727
 
 
1728
class _MissingFeature(features.Feature):
    """A feature whose probe always reports it as missing."""

    def _probe(self):
        return False


# Shared instance used by the example tests and the skip-reason checks.
missing_feature = _MissingFeature()
 
1732
 
 
1733
 
 
1734
def _get_test(name):
    """Get an instance of a specific example test.

    We protect this in a function so that they don't auto-run in the test
    suite.

    :param name: Name of the ExampleTests method to build a test case for.
    :return: An ExampleTests instance for that method.
    """

    class ExampleTests(tests.TestCase):
        # One method per outcome; each writes a distinctive line through
        # mutter() before producing that outcome.

        def test_fail(self):
            mutter('this was a failing test')
            self.fail('this test will fail')

        def test_error(self):
            mutter('this test errored')
            raise RuntimeError('gotcha')

        def test_missing_feature(self):
            mutter('missing the feature')
            self.requireFeature(missing_feature)

        def test_skip(self):
            mutter('this test will be skipped')
            raise tests.TestSkipped('reason')

        def test_success(self):
            mutter('this test succeeds')

        def test_xfail(self):
            mutter('test with expected failure')
            self.knownFailure('this_fails')

        def test_unexpected_success(self):
            mutter('test with unexpected success')
            self.expectFailure('should_fail', lambda: None)

    example = ExampleTests(name)
    return example
 
1771
 
 
1772
 
 
1773
def _get_skip_reasons(result):
 
1774
    # GZ 2017-06-06: Newer testtools doesn't have this, uses detail instead
 
1775
    return result.skip_reasons
 
1776
 
 
1777
 
 
1778
class TestTestCaseLogDetails(tests.TestCase):
    """Check which example-test outcomes carry the log in their results."""

    def _run_test(self, test_name):
        """Run the example test called test_name and return its result."""
        example = _get_test(test_name)
        outcome = testtools.TestResult()
        example.run(outcome)
        return outcome

    def test_fail_has_log(self):
        outcome = self._run_test('test_fail')
        self.assertEqual(1, len(outcome.failures))
        report = outcome.failures[0][1]
        # The report must have a log section holding the muttered line.
        log_heading = '(?m)^(?:Text attachment: )?log(?:$|: )'
        self.assertContainsRe(report, log_heading)
        self.assertContainsRe(report, 'this was a failing test')

    def test_error_has_log(self):
        outcome = self._run_test('test_error')
        self.assertEqual(1, len(outcome.errors))
        report = outcome.errors[0][1]
        # The report must have a log section holding the muttered line.
        log_heading = '(?m)^(?:Text attachment: )?log(?:$|: )'
        self.assertContainsRe(report, log_heading)
        self.assertContainsRe(report, 'this test errored')

    def test_skip_has_no_log(self):
        outcome = self._run_test('test_skip')
        reasons = _get_skip_reasons(outcome)
        self.assertEqual({'reason'}, set(reasons))
        skipped = reasons['reason']
        self.assertEqual(1, len(skipped))
        skipped_test = skipped[0]
        self.assertFalse('log' in skipped_test.getDetails())

    def test_missing_feature_has_no_log(self):
        # testtools doesn't know about addNotSupported, so it just gets
        # considered as a skip
        outcome = self._run_test('test_missing_feature')
        reasons = _get_skip_reasons(outcome)
        self.assertEqual({missing_feature}, set(reasons))
        skipped = reasons[missing_feature]
        self.assertEqual(1, len(skipped))
        skipped_test = skipped[0]
        self.assertFalse('log' in skipped_test.getDetails())

    def test_xfail_has_no_log(self):
        outcome = self._run_test('test_xfail')
        self.assertEqual(1, len(outcome.expectedFailures))
        report = outcome.expectedFailures[0][1]
        # Neither a log section nor the muttered line may appear.
        log_heading = '(?m)^(?:Text attachment: )?log(?:$|: )'
        self.assertNotContainsRe(report, log_heading)
        self.assertNotContainsRe(report, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        outcome = self._run_test('test_unexpected_success')
        self.assertEqual(1, len(outcome.unexpectedSuccesses))
        # Inconsistency, unexpectedSuccesses is a list of tests,
        # expectedFailures is a list of reasons?
        succeeded_test = outcome.unexpectedSuccesses[0]
        self.assertTrue('log' in succeeded_test.getDetails())
 
1838
 
 
1839
 
 
1840
class TestTestCloning(tests.TestCase):
    """Tests that test cloning of TestCases (as used by multiply_tests)."""

    def test_cloned_testcase_does_not_share_details(self):
        """A TestCase cloned with clone_test does not share mutable attributes
        such as details or cleanups.
        """
        class Test(tests.TestCase):

            def test_foo(self):
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))

        original = Test('test_foo')
        clone = tests.clone_test(original, original.id() + '(cloned)')
        # Only the original is run, so only it should gain the detail.
        original.run(unittest.TestResult())
        original_detail = original.getDetails()['foo']
        self.assertEqual('foo', original_detail.iter_bytes())
        self.assertEqual(None, clone.getDetails().get('foo'))

    def test_double_apply_scenario_preserves_first_scenario(self):
        """Applying two levels of scenarios to a test preserves the attributes
        added by both scenarios.
        """
        class Test(tests.TestCase):

            def test_foo(self):
                pass

        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
        # Multiply by x first, then multiply the resulting suite by y.
        by_x = tests.multiply_tests(
            Test('test_foo'), scenarios_x, unittest.TestSuite())
        by_x_and_y = tests.multiply_tests(
            by_x, scenarios_y, unittest.TestSuite())
        all_tests = list(tests.iter_suite_tests(by_x_and_y))
        self.assertLength(4, all_tests)
        all_xys = sorted([(t.x, t.y) for t in all_tests])
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1872
 
 
1873
 
 
1874
# NB: Don't delete this; it's not actually from 0.11!
 
1875
@deprecated_function(deprecated_in((0, 11, 0)))
def sample_deprecated_function():
    """Return 2; a deprecated function for applyDeprecated tests to call."""
    return 2
 
1879
 
 
1880
 
 
1881
def sample_undeprecated_function(a_param):
    """An undeprecated function for applyDeprecated tests to call.

    The argument is accepted and ignored; nothing is returned.
    """
    return None
 
1883
 
 
1884
 
 
1885
class ApplyDeprecatedHelper(object):
    """Methods with assorted deprecation states for ApplyDeprecated tests."""

    @deprecated_method(deprecated_in((0, 11, 0)))
    def sample_deprecated_method(self, param_one):
        """Deprecated method that hands its argument straight back."""
        return param_one

    def sample_normal_method(self):
        """Method carrying no deprecation marker at all."""

    @deprecated_method(deprecated_in((0, 10, 0)))
    def sample_nested_deprecation(self):
        # Deprecated in 0.10 itself, and delegates to a function that is
        # deprecated in a different version (0.11).
        nested_result = sample_deprecated_function()
        return nested_result
 
1899
 
 
1900
 
 
1901
class TestExtraAssertions(tests.TestCase):
    """Tests for new test assertions in breezy test suite"""

    def test_assert_isinstance(self):
        """assertIsInstance accepts matching types and reports mismatches."""
        self.assertIsInstance(2, int)
        self.assertIsInstance(u'', (str, text_type))
        # Build the expected text from the real type objects rather than a
        # literal: they render as "<type 'int'>" on Python 2 but as
        # "<class 'int'>" on Python 3.
        expected = "None is an instance of %s rather than %s" % (
            type(None), int)
        e = self.assertRaises(AssertionError, self.assertIsInstance, None, int)
        self.assertEqual(str(e), expected)
        self.assertRaises(AssertionError, self.assertIsInstance, 23.3, int)
        # A caller supplied message is appended after a colon.
        e = self.assertRaises(AssertionError,
            self.assertIsInstance, None, int, "it's just not")
        self.assertEqual(str(e), expected + ": it's just not")

    def test_assertEndsWith(self):
        """assertEndsWith passes on a real suffix and fails otherwise."""
        self.assertEndsWith('foo', 'oo')
        self.assertRaises(AssertionError, self.assertEndsWith, 'o', 'oo')

    def test_assertEqualDiff(self):
        """assertEqualDiff says which string lacks its final newline."""
        e = self.assertRaises(AssertionError,
                              self.assertEqualDiff, '', '\n')
        self.assertEqual(str(e),
                          # Don't blink ! The '+' applies to the second string
                          'first string is missing a final newline.\n+ \n')
        e = self.assertRaises(AssertionError,
                              self.assertEqualDiff, '\n', '')
        self.assertEqual(str(e),
                          # Don't blink ! The '-' applies to the second string
                          'second string is missing a final newline.\n- \n')
 
1932
 
 
1933
 
 
1934
class TestDeprecations(tests.TestCase):
    """Exercise the applyDeprecated and callDeprecated helpers."""

    def test_applyDeprecated_not_deprecated(self):
        helper = ApplyDeprecatedHelper()

        def assert_rejected(version, a_callable, *args):
            # applyDeprecated must fail for this version/callable pairing.
            self.assertRaises(AssertionError, self.applyDeprecated,
                              deprecated_in(version), a_callable, *args)

        def apply_expecting(version, a_callable, *args):
            return self.applyDeprecated(
                deprecated_in(version), a_callable, *args)

        # A callable that is not deprecated at all is rejected.
        assert_rejected((0, 11, 0), helper.sample_normal_method)
        assert_rejected((0, 11, 0), sample_undeprecated_function,
                        "a param value")
        # A deprecated callable (function or method) checked against the
        # wrong deprecation version is rejected as well.
        assert_rejected((0, 10, 0), helper.sample_deprecated_method,
                        "a param value")
        assert_rejected((0, 10, 0), sample_deprecated_function)
        # With the right deprecation version the callable's own result is
        # handed back.
        self.assertEqual(
            "a param value",
            apply_expecting((0, 11, 0), helper.sample_deprecated_method,
                            "a param value"))
        self.assertEqual(
            2, apply_expecting((0, 11, 0), sample_deprecated_function))
        # For a nested deprecation, the version of the deeper function does
        # not count: only the outer callable's version is accepted.
        assert_rejected((0, 11, 0), helper.sample_nested_deprecation)
        self.assertEqual(
            2, apply_expecting((0, 10, 0), helper.sample_nested_deprecation))

    def test_callDeprecated(self):
        def testfunc(be_deprecated, result=None):
            # Warn only when explicitly asked to with the value True.
            if be_deprecated is True:
                symbol_versioning.warn('i am deprecated', DeprecationWarning,
                                       stacklevel=1)
            return result
        outcome = self.callDeprecated(['i am deprecated'], testfunc, True)
        self.assertIs(None, outcome)
        outcome = self.callDeprecated([], testfunc, False, 'result')
        self.assertEqual('result', outcome)
        # Keyword arguments are forwarded to the callable too.
        self.callDeprecated(['i am deprecated'], testfunc, be_deprecated=True)
        self.callDeprecated([], testfunc, be_deprecated=False)
 
1982
 
 
1983
 
 
1984
class TestWarningTests(tests.TestCase):
    """Tests for invoking callables that emit warnings."""

    def test_callCatchWarnings(self):
        def meth(a, b):
            warnings.warn("this is your last warning")
            return a + b
        caught, total = self.callCatchWarnings(meth, 1, 2)
        self.assertEqual(3, total)
        # UserWarning doesn't implement equality usefully, so inspect the
        # single captured warning's type and text instead of comparing it.
        (only_warning,) = caught
        self.assertIsInstance(only_warning, UserWarning)
        self.assertEqual("this is your last warning", str(only_warning))
 
1998
 
 
1999
 
 
2000
class TestConvenienceMakers(tests.TestCaseWithTransport):
    """Checks for the make_* convenience helpers."""

    def test_make_branch_and_tree_with_format(self):
        # make_branch_and_tree must accept an explicitly supplied format.
        requested_format = breezy.bzr.bzrdir.BzrDirMetaFormat1()
        self.make_branch_and_tree('a', format=requested_format)
        opened = breezy.controldir.ControlDir.open('a')
        self.assertIsInstance(opened._format,
                              breezy.bzr.bzrdir.BzrDirMetaFormat1)

    def test_make_branch_and_memory_tree(self):
        # TestCaseWithTransport can hand out a new branch together with a
        # mutable tree.
        memory_tree = self.make_branch_and_memory_tree('a')
        self.assertIsInstance(memory_tree, breezy.memorytree.MemoryTree)

    def test_make_tree_for_local_vfs_backed_transport(self):
        # When the vfs transport and the local disk are colocated,
        # make_branch_and_tree has to use local branches and repositories,
        # even if urls are generated through a different transport.
        self.transport_server = test_server.FakeVFATServer
        url = self.get_url('t1')
        self.assertFalse(url.startswith('file://'))
        tree = self.make_branch_and_tree('t1')
        tree_base = tree.controldir.root_transport.base
        self.assertStartsWith(tree_base, 'file://')
        branch_root = tree.branch.controldir.root_transport
        self.assertEqual(tree.controldir.root_transport, branch_root)
        repo_root = tree.branch.repository.controldir.root_transport
        self.assertEqual(tree.controldir.root_transport, repo_root)
 
2028
 
 
2029
 
 
2030
class SelfTestHelper(object):
    """Mixin that runs tests.selftest and captures what it writes."""

    def run_selftest(self, **kwargs):
        """Run selftest and hand back the stream it wrote to.

        :param kwargs: Passed straight through to tests.selftest.
        :return: A StringIO holding the output, rewound to the start.
        """
        captured = StringIO()
        memory_case = tests.TestCaseWithMemoryTransport
        # Remember the global state so it can be put back afterwards.
        saved_transport = breezy.tests.default_transport
        saved_root = memory_case.TEST_ROOT
        memory_case.TEST_ROOT = None
        try:
            succeeded = tests.selftest(stream=captured, **kwargs)
            self.assertEqual(True, succeeded)
        finally:
            breezy.tests.default_transport = saved_transport
            memory_case.TEST_ROOT = saved_root
        captured.seek(0)
        return captured
 
2045
 
 
2046
 
 
2047
class TestSelftest(tests.TestCase, SelfTestHelper):
    """Tests of breezy.tests.selftest."""

    def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(self):
        factory_called = []
        def factory():
            factory_called.append(True)
            return TestUtil.TestSuite()
        out = StringIO()
        err = StringIO()
        self.apply_redirected(out, err, None, breezy.tests.selftest,
            test_suite_factory=factory)
        self.assertEqual([True], factory_called)

    def factory(self):
        """A test suite factory producing three no-op tests: a, b and c."""
        class Test(tests.TestCase):
            def a(self):
                pass
            def b(self):
                pass
            def c(self):
                pass
        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])

    def test_list_only(self):
        output = self.run_selftest(test_suite_factory=self.factory,
            list_only=True)
        self.assertEqual(3, len(output.readlines()))

    def test_list_only_filtered(self):
        output = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, pattern="Test.b")
        self.assertEndsWith(output.getvalue(), "Test.b\n")
        self.assertLength(1, output.readlines())

    def test_list_only_excludes(self):
        output = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, exclude_pattern="Test.b")
        self.assertNotContainsRe("Test.b", output.getvalue())
        self.assertLength(2, output.readlines())

    def test_lsprof_tests(self):
        self.requireFeature(features.lsprof_feature)
        results = []
        # A minimal stand-in for a suite that records the result object it
        # is run with.
        class Test(object):
            def __call__(test, result):
                test.run(result)
            def run(test, result):
                results.append(result)
            def countTestCases(self):
                return 1
        self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
        self.assertLength(1, results)
        self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)

    def test_random(self):
        # test randomising by listing a number of tests.
        output_123 = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, random_seed="123")
        output_234 = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, random_seed="234")
        # Compare the text that was written: two distinct StringIO objects
        # never compare equal, so comparing the streams proves nothing.
        self.assertNotEqual(output_123.getvalue(), output_234.getvalue())
        # Three test ids plus the "Randomizing test order..\n\n" header.
        self.assertLength(5, output_123.readlines())
        self.assertLength(5, output_234.readlines())

    def test_random_reuse_is_same_order(self):
        # listing twice with the same seed must give the same order.
        expected = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, random_seed="123")
        repeated = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, random_seed="123")
        self.assertEqual(expected.getvalue(), repeated.getvalue())

    def test_runner_class(self):
        self.requireFeature(features.subunit)
        from subunit import ProtocolTestCase
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
            test_suite_factory=self.factory)
        # Replay the subunit stream to count the tests that were run.
        test = ProtocolTestCase(stream)
        result = unittest.TestResult()
        test.run(result)
        self.assertEqual(3, result.testsRun)

    def test_starting_with_single_argument(self):
        output = self.run_selftest(test_suite_factory=self.factory,
            starting_with=['breezy.tests.test_selftest.Test.a'],
            list_only=True)
        self.assertEqual('breezy.tests.test_selftest.Test.a\n',
            output.getvalue())

    def test_starting_with_multiple_argument(self):
        output = self.run_selftest(test_suite_factory=self.factory,
            starting_with=['breezy.tests.test_selftest.Test.a',
                'breezy.tests.test_selftest.Test.b'],
            list_only=True)
        self.assertEqual('breezy.tests.test_selftest.Test.a\n'
            'breezy.tests.test_selftest.Test.b\n',
            output.getvalue())

    def check_transport_set(self, transport_server):
        """Check selftest makes transport_server the default transport.

        :param transport_server: Value passed to selftest as ``transport``;
            a test run inside that selftest must see it as
            breezy.tests.default_transport.
        """
        captured_transport = []
        def seen_transport(a_transport):
            captured_transport.append(a_transport)
        class Capture(tests.TestCase):
            def a(self):
                seen_transport(breezy.tests.default_transport)
        def factory():
            return TestUtil.TestSuite([Capture("a")])
        self.run_selftest(transport=transport_server, test_suite_factory=factory)
        self.assertEqual(transport_server, captured_transport[0])

    def test_transport_sftp(self):
        self.requireFeature(features.paramiko)
        from breezy.tests import stub_sftp
        self.check_transport_set(stub_sftp.SFTPAbsoluteServer)

    def test_transport_memory(self):
        self.check_transport_set(memory.MemoryServer)
 
2167
 
 
2168
 
 
2169
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
    """Tests of selftest's load_list option."""
    # Does IO: reads test.list

    def test_load_list(self):
        # Provide a list with one test - this test.
        test_id_line = '%s\n' % self.id()
        self.build_tree_contents([('test.list', test_id_line)])
        # And generate a list of the tests in  the suite.
        stream = self.run_selftest(load_list='test.list', list_only=True)
        self.assertEqual(test_id_line, stream.getvalue())

    def test_load_unknown(self):
        # Naming a list file that does not exist must raise NoSuchFile
        # rather than silently listing nothing.
        self.assertRaises(errors.NoSuchFile, self.run_selftest,
            load_list='missing file name', list_only=True)
 
2185
 
 
2186
 
 
2187
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
    """Check which test outcomes carry a log in the subunit stream."""

    _test_needs_features = [features.subunit]

    def run_subunit_stream(self, test_name):
        """Run one sample test through SubUnitBzrRunner.

        :param test_name: Name handed to _get_test to obtain the test.
        :return: (content, result): the text of the stream produced by the
            run, and a testtools.TestResult filled by replaying that stream.
        """
        from subunit import ProtocolTestCase
        def factory():
            return TestUtil.TestSuite([_get_test(test_name)])
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
            test_suite_factory=factory)
        test = ProtocolTestCase(stream)
        result = testtools.TestResult()
        test.run(result)
        content = stream.getvalue()
        return content, result

    def test_fail_has_log(self):
        content, result = self.run_subunit_stream('test_fail')
        self.assertEqual(1, len(result.failures))
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'this test will fail')

    def test_error_has_log(self):
        content, result = self.run_subunit_stream('test_error')
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'this test errored')

    def test_skip_has_no_log(self):
        content, result = self.run_subunit_stream('test_skip')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'this test will be skipped')
        reasons = _get_skip_reasons(result)
        self.assertEqual({'reason'}, set(reasons))
        skips = reasons['reason']
        self.assertEqual(1, len(skips))
        # RemotedTestCase doesn't preserve the "details", so the skipped
        # test itself cannot be checked for the absence of a 'log' detail.

    def test_missing_feature_has_no_log(self):
        content, result = self.run_subunit_stream('test_missing_feature')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'missing the feature')
        reasons = _get_skip_reasons(result)
        self.assertEqual({'_MissingFeature\n'}, set(reasons))
        skips = reasons['_MissingFeature\n']
        self.assertEqual(1, len(skips))
        # RemotedTestCase doesn't preserve the "details", so the skipped
        # test itself cannot be checked for the absence of a 'log' detail.

    def test_xfail_has_no_log(self):
        content, result = self.run_subunit_stream('test_xfail')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'test with expected failure')
        self.assertEqual(1, len(result.expectedFailures))
        result_content = result.expectedFailures[0][1]
        self.assertNotContainsRe(result_content,
            '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertNotContainsRe(result_content, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        content, result = self.run_subunit_stream('test_unexpected_success')
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'test with unexpected success')
        # GZ 2011-05-18: Old versions of subunit treat unexpected success as a
        #                success, if a min version check is added remove this
        from subunit import TestProtocolClient as _Client
        def underlying(method):
            # On Python 2 a method looked up on the class is an unbound
            # method wrapping the function; on Python 3 it already is the
            # plain function and has no __func__ attribute.
            return getattr(method, '__func__', method)
        if (underlying(_Client.addUnexpectedSuccess)
                is underlying(_Client.addSuccess)):
            self.expectFailure('subunit treats "unexpectedSuccess"'
                               ' as a plain success',
                self.assertEqual, 1, len(result.unexpectedSuccesses))
        self.assertEqual(1, len(result.unexpectedSuccesses))
        # RemotedTestCase doesn't preserve the "details", so the remote test
        # itself cannot be checked for the presence of a 'log' detail.

    def test_success_has_no_log(self):
        content, result = self.run_subunit_stream('test_success')
        self.assertEqual(1, result.testsRun)
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'this test succeeds')
 
2269
 
 
2270
 
 
2271
class TestRunBzr(tests.TestCase):
    """Check how run_bzr and run_bzr_error invoke _run_bzr_core."""

    out = ''
    err = ''

    def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
                         working_dir=None):
        """Record the arguments instead of running bzr.

        This replaces _run_bzr_core so that nothing is actually run from
        inside this class; how bzr really gets invoked is tested elsewhere.
        Each argument is stored on self, and the stored retcode is returned
        together with the canned self.out and self.err.
        """
        self.argv = list(argv)
        self.retcode, self.encoding = retcode, encoding
        self.stdin, self.working_dir = stdin, working_dir
        return self.retcode, self.out, self.err

    def test_run_bzr_error(self):
        self.out = "It sure does!\n"
        out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
        self.assertEqual(['rocks'], self.argv)
        self.assertEqual(34, self.retcode)
        # The canned output and error text come back untouched.
        self.assertEqual('It sure does!\n', out)
        self.assertEqual(out, self.out)
        self.assertEqual('', err)
        self.assertEqual(err, self.err)

    def test_run_bzr_error_regexes(self):
        self.out = ''
        self.err = "bzr: ERROR: foobarbaz is not versioned"
        error_regexes = ["bzr: ERROR: foobarbaz is not versioned"]
        out, err = self.run_bzr_error(error_regexes, ['file-id', 'foobarbaz'])

    def test_encoding(self):
        """run_bzr hands its encoding argument on to _run_bzr_core."""
        for kwargs, expected in [({}, None), ({'encoding': 'baz'}, 'baz')]:
            self.run_bzr('foo bar', **kwargs)
            self.assertEqual(expected, self.encoding)
            self.assertEqual(['foo', 'bar'], self.argv)

    def test_retcode(self):
        """run_bzr hands its retcode argument on to _run_bzr_core."""
        cases = [
            # With no retcode given, the default of 0 is what gets passed.
            ('foo bar', {}, 0),
            ('foo bar', {'retcode': 1}, 1),
            ('foo bar', {'retcode': None}, None),
            (['foo', 'bar'], {'retcode': 3}, 3),
            ]
        for command, kwargs, expected in cases:
            self.run_bzr(command, **kwargs)
            self.assertEqual(expected, self.retcode)
            self.assertEqual(['foo', 'bar'], self.argv)

    def test_stdin(self):
        # run_bzr is a convenience wrapper around _run_bzr_core, which this
        # class overrides, so the stdin keyword given to run_bzr should
        # arrive in the override exactly as supplied.
        for text in ('gam', 'zippy'):
            self.run_bzr('foo bar', stdin=text)
            self.assertEqual(text, self.stdin)
            self.assertEqual(['foo', 'bar'], self.argv)

    def test_working_dir(self):
        """run_bzr hands its working_dir argument on to _run_bzr_core."""
        for kwargs, expected in [({}, None), ({'working_dir': 'baz'}, 'baz')]:
            self.run_bzr('foo bar', **kwargs)
            self.assertEqual(expected, self.working_dir)
            self.assertEqual(['foo', 'bar'], self.argv)

    def test_reject_extra_keyword_arguments(self):
        unknown_kwargs = {'error_regex': ['error message']}
        self.assertRaises(TypeError, self.run_bzr, "foo bar",
                          **unknown_kwargs)
 
2365
 
 
2366
 
 
2367
class TestRunBzrCaptured(tests.TestCaseWithTransport):
    """Check the environment run_bzr sets up around apply_redirected."""
    # Does IO when testing the working_dir parameter.

    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
                         a_callable=None, *args, **kwargs):
        """Record the calling context instead of invoking a_callable.

        The stdin object, the current ui factory (and its stdin, if it has
        one) and the working directory are stored on self; a fixed line is
        written to each output stream and 0 is returned.
        """
        self.stdin = stdin
        active_factory = breezy.ui.ui_factory
        self.factory_stdin = getattr(active_factory, "stdin", None)
        self.factory = breezy.ui.ui_factory
        self.working_dir = osutils.getcwd()
        stdout.write('foo\n')
        stderr.write('bar\n')
        return 0

    def test_stdin(self):
        # apply_redirected is overridden in this class and is what
        # _run_bzr_core calls, so the stdin keyword should reach it as a
        # StringIO which is also the ui factory's stdin.
        for text in ('gam', 'zippy'):
            self.run_bzr(['foo', 'bar'], stdin=text)
            self.assertEqual(text, self.stdin.read())
            self.assertTrue(self.stdin is self.factory_stdin)

    def test_ui_factory(self):
        # Every run_bzr invocation should get a UI factory of its own: a
        # TestUIFactory whose stdin, stdout and stderr are those of the
        # invoked run_bzr.
        factory_before = breezy.ui.ui_factory
        self.run_bzr(['foo'])
        used_factory = self.factory
        self.assertFalse(factory_before is used_factory)
        self.assertNotEqual(sys.stdout, used_factory.stdout)
        self.assertNotEqual(sys.stderr, used_factory.stderr)
        self.assertEqual('foo\n', used_factory.stdout.getvalue())
        self.assertEqual('bar\n', used_factory.stderr.getvalue())
        self.assertIsInstance(used_factory, tests.TestUIFactory)

    def test_working_dir(self):
        self.build_tree(['one/', 'two/'])
        cwd = osutils.getcwd()

        # Without a working_dir, or with None, the current directory is used.
        self.run_bzr(['foo', 'bar'])
        self.assertEqual(cwd, self.working_dir)

        self.run_bzr(['foo', 'bar'], working_dir=None)
        self.assertEqual(cwd, self.working_dir)

        # Given a directory, the call runs inside it, but the current
        # working directory is back to what it was afterwards.
        for subdir in ('one', 'two'):
            self.run_bzr(['foo', 'bar'], working_dir=subdir)
            self.assertNotEqual(cwd, self.working_dir)
            self.assertEndsWith(self.working_dir, subdir)
            self.assertEqual(cwd, osutils.getcwd())
 
2428
 
 
2429
 
 
2430
class StubProcess(object):
    """Stand-in for a process, used when testing run_bzr_subprocess.

    It only stores canned output, error text and a return code.
    """

    def __init__(self, out="", err="", retcode=0):
        # The return code is exposed as ``returncode``.
        self.returncode = retcode
        self.out, self.err = out, err

    def communicate(self):
        """Return the canned (out, err) pair."""
        return (self.out, self.err)
 
2440
 
 
2441
 
 
2442
class TestWithFakedStartBzrSubprocess(tests.TestCaseWithTransport):
    """Base for tests that look at how bzr would be run as a subprocess."""

    def setUp(self):
        super(TestWithFakedStartBzrSubprocess, self).setUp()
        # One dict per start_bzr_subprocess call, oldest first.
        self.subprocess_calls = []

    def start_bzr_subprocess(self, process_args, env_changes=None,
                             skip_if_plan_to_signal=False,
                             working_dir=None,
                             allow_plugins=False):
        """Record the request instead of starting anything.

        The arguments are appended to self.subprocess_calls as a dict keyed
        by parameter name, and self.next_subprocess is returned.
        """
        call_record = {
            'process_args': process_args,
            'env_changes': env_changes,
            'skip_if_plan_to_signal': skip_if_plan_to_signal,
            'working_dir': working_dir,
            'allow_plugins': allow_plugins,
            }
        self.subprocess_calls.append(call_record)
        return self.next_subprocess
 
2459
 
 
2460
 
 
2461
class TestRunBzrSubprocess(TestWithFakedStartBzrSubprocess):
    """Check the arguments run_bzr_subprocess passes on and what it returns."""

    def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
        """Run run_bzr_subprocess with args and kwargs using a stubbed process.

        The stub start_bzr_subprocess inherited by this class returns
        self.next_subprocess. This assertion method installs ``process`` as
        that result and checks the arguments run_bzr_subprocess generates.

        :param expected_args: Dict mapping keys of the most recent recorded
            call to the values they must have.
        :param process: Object the stubbed start_bzr_subprocess will return.
        :return: Whatever run_bzr_subprocess returns. An exception it raises
            propagates once the recorded arguments have been checked.
        """
        self.next_subprocess = process
        try:
            return self.run_bzr_subprocess(*args, **kwargs)
        finally:
            # Reset the stub and check the recorded call whether or not
            # run_bzr_subprocess raised.
            self.next_subprocess = None
            for key, expected in expected_args.items():
                self.assertEqual(expected, self.subprocess_calls[-1][key])

    def test_run_bzr_subprocess(self):
        """The run_bzr_helper_external command behaves nicely."""
        self.assertRunBzrSubprocess({'process_args':['--version']},
            StubProcess(), '--version')
        self.assertRunBzrSubprocess({'process_args':['--version']},
            StubProcess(), ['--version'])
        # retcode=None disables retcode checking
        result = self.assertRunBzrSubprocess({},
            StubProcess(retcode=3), '--version', retcode=None)
        result = self.assertRunBzrSubprocess({},
            StubProcess(out="is free software"), '--version')
        self.assertContainsRe(result[0], 'is free software')
        # Running a subcommand that is missing errors
        self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
            {'process_args':['--versionn']}, StubProcess(retcode=3),
            '--versionn')
        # Unless it is told to expect the error from the subprocess
        result = self.assertRunBzrSubprocess({},
            StubProcess(retcode=3), '--versionn', retcode=3)
        # Or to ignore retcode checking
        result = self.assertRunBzrSubprocess({},
            StubProcess(err="unknown command", retcode=3), '--versionn',
            retcode=None)
        self.assertContainsRe(result[1], 'unknown command')

    def test_env_change_passes_through(self):
        self.assertRunBzrSubprocess(
            {'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}},
            StubProcess(), '',
            env_changes={'new':'value', 'changed':'newvalue', 'deleted':None})

    def test_no_working_dir_passed_as_None(self):
        self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '')

    def test_no_working_dir_passed_through(self):
        self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
            working_dir='dir')

    def test_run_bzr_subprocess_no_plugins(self):
        self.assertRunBzrSubprocess({'allow_plugins': False},
            StubProcess(), '')

    def test_allow_plugins(self):
        self.assertRunBzrSubprocess({'allow_plugins': True},
            StubProcess(), '', allow_plugins=True)
 
2529
 
 
2530
 
 
2531
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):
    """Checks of finish_bzr_subprocess driven by StubProcess objects."""

    def _unknown_command_process(self):
        """Build a stub with err='unknown command' and retcode=3."""
        return StubProcess(err="unknown command", retcode=3)

    def test_finish_bzr_subprocess_with_error(self):
        """finish_bzr_subprocess accepts the exit code named by retcode."""
        stub = self._unknown_command_process()
        outcome = self.finish_bzr_subprocess(stub, retcode=3)
        self.assertEqual('', outcome[0])
        self.assertContainsRe(outcome[1], 'unknown command')

    def test_finish_bzr_subprocess_ignoring_retcode(self):
        """finish_bzr_subprocess skips the exit code check for retcode=None."""
        stub = self._unknown_command_process()
        outcome = self.finish_bzr_subprocess(stub, retcode=None)
        self.assertEqual('', outcome[0])
        self.assertContainsRe(outcome[1], 'unknown command')

    def test_finish_subprocess_with_unexpected_retcode(self):
        """finish_bzr_subprocess raises self.failureException when called
        without a retcode on a stub whose retcode is 3.
        """
        stub = self._unknown_command_process()
        # Callable form on purpose: the base class's assertRaises may not
        # support use as a context manager.
        self.assertRaises(
            self.failureException, self.finish_bzr_subprocess, stub)
 
2555
 
 
2556
 
 
2557
class _DontSpawnProcess(Exception):
    """Raised by a stubbed-out popen instead of spawning a real process.

    Tests catch it to stop as soon as the popen arguments are recorded.
    """
 
2559
 
 
2560
 
 
2561
class TestStartBzrSubProcess(tests.TestCase):
    """Stub test start_bzr_subprocess.

    _popen is replaced so that no child process is spawned: it records
    the arguments it was given and raises _DontSpawnProcess, which every
    test here expects from start_bzr_subprocess.
    """

    def _subprocess_log_cleanup(self):
        """Inhibits the base version as we don't produce a log file."""

    def _popen(self, *args, **kwargs):
        """Override the base version to record the command that is run.

        From there we can ensure it is correct without spawning a real process.

        :raises _DontSpawnProcess: always, after recording the arguments.
        """
        # Run the hook first so a test can inspect the state (for example
        # os.environ) at the moment popen is called.
        self.check_popen_state()
        self._popen_args = args
        self._popen_kwargs = kwargs
        raise _DontSpawnProcess()

    def check_popen_state(self):
        """Replace to make assertions when popen is called."""

    def test_run_bzr_subprocess_no_plugins(self):
        """By default the command is python, brz, then --no-plugins."""
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
        command = self._popen_args[0]
        self.assertEqual(sys.executable, command[0])
        self.assertEqual(self.get_brz_path(), command[1])
        self.assertEqual(['--no-plugins'], command[2:])

    def test_allow_plugins(self):
        """With allow_plugins=True no extra arguments follow the brz path."""
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          allow_plugins=True)
        command = self._popen_args[0]
        self.assertEqual([], command[2:])

    def test_set_env(self):
        """env_changes values are set when popen runs and gone afterwards."""
        self.assertNotIn('EXISTANT_ENV_VAR', os.environ)
        # set in the child
        def check_environment():
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          env_changes={'EXISTANT_ENV_VAR': 'set variable'})
        # not set in the parent
        self.assertNotIn('EXISTANT_ENV_VAR', os.environ)

    def test_run_bzr_subprocess_env_del(self):
        """run_bzr_subprocess can remove environment variables too."""
        self.assertNotIn('EXISTANT_ENV_VAR', os.environ)
        def check_environment():
            self.assertNotIn('EXISTANT_ENV_VAR', os.environ)
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
        try:
            self.check_popen_state = check_environment
            self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess,
                              [], env_changes={'EXISTANT_ENV_VAR': None})
            # Still set in parent
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
        finally:
            # Always undo the change to the real environment, even when an
            # assertion above fails, so the variable cannot leak into
            # later tests.
            os.environ.pop('EXISTANT_ENV_VAR', None)

    def test_env_del_missing(self):
        """Asking to delete a variable that is not set is harmless."""
        self.assertNotIn('NON_EXISTANT_ENV_VAR', os.environ)
        def check_environment():
            self.assertNotIn('NON_EXISTANT_ENV_VAR', os.environ)
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          env_changes={'NON_EXISTANT_ENV_VAR': None})

    def test_working_dir(self):
        """Test that we can specify the working dir for the child"""
        chdirs = []
        def chdir(path):
            chdirs.append(path)
        # Record directory changes instead of performing them.
        self.overrideAttr(os, 'chdir', chdir)
        def getcwd():
            return 'current'
        self.overrideAttr(osutils, 'getcwd', getcwd)
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          working_dir='foo')
        # First into the requested directory, then back to the fake cwd.
        self.assertEqual(['foo', 'current'], chdirs)

    def test_get_brz_path_with_cwd_breezy(self):
        """With an empty source path and every file present, path is 'brz'."""
        self.get_source_path = lambda: ""
        self.overrideAttr(os.path, "isfile", lambda path: True)
        self.assertEqual(self.get_brz_path(), "brz")
 
2644
 
 
2645
 
 
2646
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
    """Tests that really need to do things with an external bzr."""

    def test_start_and_stop_bzr_subprocess_send_signal(self):
        """A started subprocess can be finished by sending it SIGINT.

        The 'wait-until-signalled' command is expected to print 'running';
        finishing it with SIGINT is expected to give retcode 3, no further
        output, and 'brz: interrupted' as the second element of the result.
        """
        self.disable_missing_extensions_warning()
        child = self.start_bzr_subprocess(
            ['wait-until-signalled'], skip_if_plan_to_signal=True)
        first_line = child.stdout.readline()
        self.assertEqual('running\n', first_line)
        outcome = self.finish_bzr_subprocess(
            child, send_signal=signal.SIGINT, retcode=3)
        self.assertEqual('', outcome[0])
        self.assertEqual('brz: interrupted\n', outcome[1])
 
2661
 
 
2662
 
 
2663
class TestSelftestFiltering(tests.TestCase):
    """Tests for the suite filtering helpers of breezy.tests.

    Every test works on a suite loaded from this very module, so the
    expected ids below name tests of this class. Adding or renaming tests
    here can therefore change what the prefix and regex based tests match.
    """

    def setUp(self):
        """Load this module's tests into self.suite and record their ids."""
        super(TestSelftestFiltering, self).setUp()
        self.suite = TestUtil.TestSuite()
        self.loader = TestUtil.TestLoader()
        self.suite.addTest(self.loader.loadTestsFromModule(
            sys.modules['breezy.tests.test_selftest']))
        self.all_names = _test_ids(self.suite)

    def test_condition_id_re(self):
        test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                     'test_condition_id_re')
        filtered_suite = tests.filter_suite_by_condition(
            self.suite, tests.condition_id_re('test_condition_id_re'))
        self.assertEqual([test_name], _test_ids(filtered_suite))

    def test_condition_id_in_list(self):
        test_names = ['breezy.tests.test_selftest.TestSelftestFiltering.'
                      'test_condition_id_in_list']
        id_list = tests.TestIdList(test_names)
        filtered_suite = tests.filter_suite_by_condition(
            self.suite, tests.condition_id_in_list(id_list))
        my_pattern = 'TestSelftestFiltering.*test_condition_id_in_list'
        re_filtered = tests.filter_suite_by_re(self.suite, my_pattern)
        self.assertEqual(_test_ids(re_filtered), _test_ids(filtered_suite))

    def test_condition_id_startswith(self):
        klass = 'breezy.tests.test_selftest.TestSelftestFiltering.'
        start1 = klass + 'test_condition_id_starts'
        start2 = klass + 'test_condition_id_in'
        test_names = [klass + 'test_condition_id_in_list',
                      klass + 'test_condition_id_startswith',
                      ]
        filtered_suite = tests.filter_suite_by_condition(
            self.suite, tests.condition_id_startswith([start1, start2]))
        self.assertEqual(test_names, _test_ids(filtered_suite))

    def test_condition_isinstance(self):
        filtered_suite = tests.filter_suite_by_condition(
            self.suite, tests.condition_isinstance(self.__class__))
        class_pattern = 'breezy.tests.test_selftest.TestSelftestFiltering.'
        re_filtered = tests.filter_suite_by_re(self.suite, class_pattern)
        self.assertEqual(_test_ids(re_filtered), _test_ids(filtered_suite))

    def test_exclude_tests_by_condition(self):
        excluded_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                         'test_exclude_tests_by_condition')
        filtered_suite = tests.exclude_tests_by_condition(
            self.suite, lambda x: x.id() == excluded_name)
        self.assertEqual(len(self.all_names) - 1,
                         filtered_suite.countTestCases())
        self.assertNotIn(excluded_name, _test_ids(filtered_suite))
        remaining_names = list(self.all_names)
        remaining_names.remove(excluded_name)
        self.assertEqual(remaining_names, _test_ids(filtered_suite))

    def test_exclude_tests_by_re(self):
        # self.all_names is already computed by setUp for this suite.
        filtered_suite = tests.exclude_tests_by_re(self.suite,
                                                   'exclude_tests_by_re')
        excluded_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                         'test_exclude_tests_by_re')
        self.assertEqual(len(self.all_names) - 1,
                         filtered_suite.countTestCases())
        self.assertNotIn(excluded_name, _test_ids(filtered_suite))
        remaining_names = list(self.all_names)
        remaining_names.remove(excluded_name)
        self.assertEqual(remaining_names, _test_ids(filtered_suite))

    def test_filter_suite_by_condition(self):
        test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                     'test_filter_suite_by_condition')
        filtered_suite = tests.filter_suite_by_condition(
            self.suite, lambda x: x.id() == test_name)
        self.assertEqual([test_name], _test_ids(filtered_suite))

    def test_filter_suite_by_re(self):
        filtered_suite = tests.filter_suite_by_re(self.suite,
                                                  'test_filter_suite_by_r')
        filtered_names = _test_ids(filtered_suite)
        self.assertEqual(filtered_names, [
            'breezy.tests.test_selftest.'
            'TestSelftestFiltering.test_filter_suite_by_re'])

    def test_filter_suite_by_id_list(self):
        test_list = ['breezy.tests.test_selftest.'
                     'TestSelftestFiltering.test_filter_suite_by_id_list']
        filtered_suite = tests.filter_suite_by_id_list(
            self.suite, tests.TestIdList(test_list))
        filtered_names = _test_ids(filtered_suite)
        self.assertEqual(
            filtered_names,
            ['breezy.tests.test_selftest.'
             'TestSelftestFiltering.test_filter_suite_by_id_list'])

    def test_filter_suite_by_id_startswith(self):
        # By design this test may fail if another test is added whose name also
        # begins with one of the start values used.
        klass = 'breezy.tests.test_selftest.TestSelftestFiltering.'
        start1 = klass + 'test_filter_suite_by_id_starts'
        start2 = klass + 'test_filter_suite_by_id_li'
        test_list = [klass + 'test_filter_suite_by_id_list',
                     klass + 'test_filter_suite_by_id_startswith',
                     ]
        filtered_suite = tests.filter_suite_by_id_startswith(
            self.suite, [start1, start2])
        self.assertEqual(
            test_list,
            _test_ids(filtered_suite),
            )

    def test_preserve_input(self):
        # NB: Surely there is something in the stdlib to do this?
        self.assertIs(self.suite, tests.preserve_input(self.suite))
        # Bind the string to a name so the identity check is made on one
        # object, rather than on two literals that merely happen to be
        # interned together (newer Pythons also warn about "is" with a
        # literal).
        text = "@#$"
        self.assertIs(text, tests.preserve_input(text))

    def test_randomize_suite(self):
        randomized_suite = tests.randomize_suite(self.suite)
        # randomizing should not add or remove test names.
        self.assertEqual(set(_test_ids(self.suite)),
                         set(_test_ids(randomized_suite)))
        # Technically, this *can* fail, because random.shuffle(list) can be
        # equal to list. Trying multiple times just pushes the frequency back.
        # As its len(self.all_names)!:1, the failure frequency should be low
        # enough to ignore. RBC 20071021.
        # It should change the order.
        self.assertNotEqual(self.all_names, _test_ids(randomized_suite))
        # But not the length. (Possibly redundant with the set test, but not
        # necessarily.)
        self.assertEqual(len(self.all_names), len(_test_ids(randomized_suite)))

    def test_split_suit_by_condition(self):
        # self.all_names is already computed by setUp for this suite.
        condition = tests.condition_id_re('test_filter_suite_by_r')
        split_suite = tests.split_suite_by_condition(self.suite, condition)
        filtered_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                         'test_filter_suite_by_re')
        self.assertEqual([filtered_name], _test_ids(split_suite[0]))
        self.assertNotIn(filtered_name, _test_ids(split_suite[1]))
        remaining_names = list(self.all_names)
        remaining_names.remove(filtered_name)
        self.assertEqual(remaining_names, _test_ids(split_suite[1]))

    def test_split_suit_by_re(self):
        # self.all_names is already computed by setUp for this suite.
        split_suite = tests.split_suite_by_re(self.suite,
                                              'test_filter_suite_by_r')
        filtered_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                         'test_filter_suite_by_re')
        self.assertEqual([filtered_name], _test_ids(split_suite[0]))
        self.assertNotIn(filtered_name, _test_ids(split_suite[1]))
        remaining_names = list(self.all_names)
        remaining_names.remove(filtered_name)
        self.assertEqual(remaining_names, _test_ids(split_suite[1]))
 
2817
 
 
2818
 
 
2819
class TestCheckTreeShape(tests.TestCaseWithTransport):
    """Exercise the check_tree_shape helper."""

    def test_check_tree_shape(self):
        """Build and add a list of paths, then check the tree against it."""
        paths = ['a', 'b/', 'b/c']
        wt = self.make_branch_and_tree('.')
        self.build_tree(paths)
        wt.add(paths)
        # The shape check runs under a read lock that is always released.
        wt.lock_read()
        try:
            self.check_tree_shape(wt, paths)
        finally:
            wt.unlock()
 
2831
 
 
2832
 
 
2833
class TestBlackboxSupport(tests.TestCase):
    """Tests for testsuite blackbox features."""

    def test_run_bzr_failure_not_caught(self):
        """Unexpected errors from run_bzr reach the test unchanged."""
        # In blackbox mode an unexpected error must propagate up to the
        # test suite so it is shown in the usual way, without a double
        # traceback.
        raised = self.assertRaises(
            AssertionError, self.run_bzr, ['assert-fail'])
        # The message proves this is the command's own error rather than
        # one from somewhere else in the test framework.
        self.assertEqual('always fails', str(raised))
        # The test log must not contain a traceback either.
        log_text = self.get_log()
        self.assertNotContainsRe(log_text, r'Traceback')

    def test_run_bzr_user_error_caught(self):
        """Expected user errors become an error message plus exit code."""
        # In blackbox mode, normal/expected/user errors are caught in the
        # regular way and reported on stderr with a retcode.
        server = memory.MemoryServer()
        server.start_server()
        self.addCleanup(server.stop_server)
        base_url = server.get_url()
        self.permit_url(base_url)
        target = "%s/nonexistantpath" % base_url
        out, err = self.run_bzr(["log", target], retcode=3)
        self.assertEqual(out, '')
        self.assertContainsRe(
            err, 'brz: ERROR: Not a branch: ".*nonexistantpath/".\n')
 
2862
 
 
2863
 
 
2864
class TestTestLoader(tests.TestCase):
    """Tests for the test loader."""

    def _get_loader_and_module(self):
        """Gets a TestLoader and a module with one test in it.

        The "module" is an instance of a throwaway class whose a_class
        attribute is a TestCase with a single test, named 'fake_module'.

        :return: a (loader, module) tuple.
        """
        loader = TestUtil.TestLoader()
        class Stub(tests.TestCase):
            def test_foo(self):
                pass
        class MyModule(object):
            pass
        MyModule.a_class = Stub
        module = MyModule()
        module.__name__ = 'fake_module'
        return loader, module

    def test_module_no_load_tests_attribute_loads_classes(self):
        """Without load_tests the single test of the module is loaded."""
        loader, module = self._get_loader_and_module()
        self.assertEqual(1, loader.loadTestsFromModule(module).countTestCases())

    def test_module_load_tests_attribute_gets_called(self):
        """A load_tests attribute decides which tests the module yields."""
        loader, module = self._get_loader_and_module()
        def load_tests(loader, standard_tests, pattern):
            result = loader.suiteClass()
            for test in tests.iter_suite_tests(standard_tests):
                result.addTests([test, test])
            return result
        # add a load_tests() method which multiplies the tests from the module.
        module.__class__.load_tests = staticmethod(load_tests)
        self.assertEqual(
            2 * [str(module.a_class('test_foo'))],
            list(map(str, loader.loadTestsFromModule(module))))

    def test_load_tests_from_module_name_smoke_test(self):
        """Loading breezy.tests.test_sampler by name finds its one test."""
        loader = TestUtil.TestLoader()
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        self.assertEqual(['breezy.tests.test_sampler.DemoTest.test_nothing'],
                         _test_ids(suite))

    def test_load_tests_from_module_name_with_bogus_module_name(self):
        """An unimportable module name raises ImportError."""
        loader = TestUtil.TestLoader()
        self.assertRaises(ImportError, loader.loadTestsFromModuleName, 'bogus')
 
2907
 
 
2908
 
 
2909
class TestTestIdList(tests.TestCase):
    """Tests for tests.TestIdList and tests.suite_matches_id_list."""

    def _create_id_list(self, test_list):
        """Wrap test_list in a tests.TestIdList."""
        return tests.TestIdList(test_list)

    def _create_suite(self, test_id_list):
        """Return a suite with one stub test per id in test_id_list."""

        class Stub(tests.TestCase):
            def test_foo(self):
                pass

        def _fixed_id(test_id):
            # A factory, so each test gets a callable bound to its own id.
            return lambda: test_id

        result = TestUtil.TestSuite()
        for test_id in test_id_list:
            stub = Stub('test_foo')
            stub.id = _fixed_id(test_id)
            result.addTest(stub)
        return result

    def _test_ids(self, test_suite):
        """Get the ids for the tests in a test suite."""
        ids = []
        for case in tests.iter_suite_tests(test_suite):
            ids.append(case.id())
        return ids

    def test_empty_list(self):
        empty = self._create_id_list([])
        self.assertEqual({}, empty.tests)
        self.assertEqual({}, empty.modules)

    def test_valid_list(self):
        names = [
            'mod1.cl1.meth1', 'mod1.cl1.meth2',
            'mod1.func1', 'mod1.cl2.meth2',
            'mod1.submod1',
            'mod1.submod2.cl1.meth1', 'mod1.submod2.cl2.meth2',
            ]
        id_list = self._create_id_list(names)
        for module_name in ('mod1', 'mod1.submod1', 'mod1.submod2'):
            self.assertTrue(id_list.refers_to(module_name))
        for test_id in ('mod1.cl1.meth1', 'mod1.submod1', 'mod1.func1'):
            self.assertTrue(id_list.includes(test_id))

    def test_bad_chars_in_params(self):
        parametrized = 'mod1.cl1.meth1(xx.yy)'
        id_list = self._create_id_list([parametrized])
        self.assertTrue(id_list.refers_to('mod1'))
        self.assertTrue(id_list.includes('mod1.cl1.meth1(xx.yy)'))

    def test_module_used(self):
        id_list = self._create_id_list(['mod.class.meth'])
        for prefix in ('mod', 'mod.class', 'mod.class.meth'):
            self.assertTrue(id_list.refers_to(prefix))

    def test_test_suite_matches_id_list_with_unknown(self):
        sampler = TestUtil.TestLoader().loadTestsFromModuleName(
            'breezy.tests.test_sampler')
        wanted = ['breezy.tests.test_sampler.DemoTest.test_nothing',
                  'bogus']
        not_found, duplicates = tests.suite_matches_id_list(sampler, wanted)
        self.assertEqual(['bogus'], not_found)
        self.assertEqual([], duplicates)

    def test_suite_matches_id_list_with_duplicates(self):
        loader = TestUtil.TestLoader()
        sampler = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        doubled = loader.suiteClass()
        for case in tests.iter_suite_tests(sampler):
            # Every test goes in twice so that it shows up as a duplicate.
            for _ in range(2):
                doubled.addTest(case)

        wanted = ['breezy.tests.test_sampler.DemoTest.test_nothing']
        not_found, duplicates = tests.suite_matches_id_list(doubled, wanted)
        self.assertEqual([], not_found)
        self.assertEqual(['breezy.tests.test_sampler.DemoTest.test_nothing'],
                         duplicates)
 
2987
 
 
2988
 
 
2989
class TestTestSuite(tests.TestCase):
    """Tests for tests.test_suite and the helpers it uses."""

    def test__test_suite_testmod_names(self):
        """_test_suite_testmod_names returns a plausible module list."""
        names = tests._test_suite_testmod_names()
        plausible = [
            'breezy.tests.blackbox',
            'breezy.tests.per_transport',
            'breezy.tests.test_selftest',
            ]
        self.assertSubset(plausible, names)

    def test__test_suite_modules_to_doctest(self):
        """_test_suite_modules_to_doctest returns a plausible module list."""
        names = tests._test_suite_modules_to_doctest()
        if __doc__ is not None:
            self.assertSubset(['breezy.timestamp'], names)
        else:
            # When docstrings are stripped, there are no modules to doctest
            self.assertEqual([], names)

    def test_test_suite(self):
        """test_suite() loads what the two name-providing helpers return."""
        # test_suite() loads the entire test suite to operate. To avoid
        # that overhead while still checking that things happen, the two
        # functions it uses are temporarily replaced with test doubles
        # that supply a few sample tests to load.
        calls = []

        def fake_testmod_names():
            calls.append("testmod_names")
            return [
                'breezy.tests.blackbox.test_branch',
                'breezy.tests.per_transport',
                'breezy.tests.test_selftest',
                ]

        def fake_doctests():
            calls.append("modules_to_doctest")
            if __doc__ is None:
                return []
            return ['breezy.timestamp']

        self.overrideAttr(
            tests, '_test_suite_testmod_names', fake_testmod_names)
        self.overrideAttr(
            tests, '_test_suite_modules_to_doctest', fake_doctests)
        expected_ids = [
            # testmod_names
            'breezy.tests.blackbox.test_branch.TestBranch.test_branch',
            ('breezy.tests.per_transport.TransportTests'
             '.test_abspath(LocalTransport,LocalURLServer)'),
            'breezy.tests.test_selftest.TestTestSuite.test_test_suite',
            # plugins can't be tested that way since selftest may be run with
            # --no-plugins
            ]
        if __doc__ is not None:
            # modules_to_doctest
            expected_ids.append('breezy.timestamp.format_highres_date')
        loaded = tests.test_suite()
        self.assertEqual({"testmod_names", "modules_to_doctest"},
                         set(calls))
        self.assertSubset(expected_ids, _test_ids(loaded))

    def test_test_suite_list_and_start(self):
        """test_suite() honours both a test list and a starting prefix."""
        # This cannot be checked together with the main load, because we
        # want to know that starting_with == None works. So a second load
        # is incurred - the starting_with parameter causes a partial load
        # rather than a full load, so this test should be pretty quick.
        wanted = ['breezy.tests.test_selftest.TestTestSuite.test_test_suite']
        prefixes = ['breezy.tests.test_selftest.TestTestSuite']
        loaded = tests.test_suite(wanted, prefixes)
        # test_test_suite_list_and_start is not included
        self.assertEqual(wanted, _test_ids(loaded))
 
3065
 
 
3066
 
 
3067
class TestLoadTestIdList(tests.TestCaseInTempDir):
    """Tests for tests.load_test_id_list."""

    def _create_test_list_file(self, file_name, content):
        """Write content to file_name, opened in text mode.

        :param file_name: path of the file to create or overwrite.
        :param content: text to write.
        """
        # The with block closes the file even if write() raises.
        with open(file_name, 'wt') as fl:
            fl.write(content)

    def test_load_unknown(self):
        """A missing file raises errors.NoSuchFile."""
        self.assertRaises(errors.NoSuchFile,
                          tests.load_test_id_list, 'i_do_not_exist')

    def test_load_test_list(self):
        """One id per line is loaded, in file order."""
        test_list_fname = 'test.list'
        self._create_test_list_file(test_list_fname,
                                    'mod1.cl1.meth1\nmod2.cl2.meth2\n')
        tlist = tests.load_test_id_list(test_list_fname)
        self.assertEqual(2, len(tlist))
        self.assertEqual('mod1.cl1.meth1', tlist[0])
        self.assertEqual('mod2.cl2.meth2', tlist[1])

    def test_load_dirty_file(self):
        """Surrounding whitespace is stripped; empty lines are kept."""
        test_list_fname = 'test.list'
        self._create_test_list_file(test_list_fname,
                                    '  mod1.cl1.meth1\n\nmod2.cl2.meth2  \n'
                                    'bar baz\n')
        tlist = tests.load_test_id_list(test_list_fname)
        self.assertEqual(4, len(tlist))
        self.assertEqual('mod1.cl1.meth1', tlist[0])
        self.assertEqual('', tlist[1])
        self.assertEqual('mod2.cl2.meth2', tlist[2])
        self.assertEqual('bar baz', tlist[3])
 
3098
 
 
3099
 
 
3100
class TestFilteredByModuleTestLoader(tests.TestCase):
    """FilteredByModuleTestLoader driven by a TestIdList."""

    def _create_loader(self, test_list):
        """Return a loader filtering with TestIdList(test_list).refers_to."""
        wanted_ids = tests.TestIdList(test_list)
        return TestUtil.FilteredByModuleTestLoader(wanted_ids.refers_to)

    def test_load_tests(self):
        wanted = ['breezy.tests.test_sampler.DemoTest.test_nothing']
        loaded = self._create_loader(wanted).loadTestsFromModuleName(
            'breezy.tests.test_sampler')
        self.assertEqual(wanted, _test_ids(loaded))

    def test_exclude_tests(self):
        unrelated = ['bogus']
        loaded = self._create_loader(unrelated).loadTestsFromModuleName(
            'breezy.tests.test_sampler')
        self.assertEqual([], _test_ids(loaded))
 
3118
 
 
3119
 
 
3120
class TestFilteredByNameStartTestLoader(tests.TestCase):
    """FilteredByModuleTestLoader driven by a name prefix."""

    def _create_loader(self, name_start):
        """Return a loader that keeps modules related to name_start.

        A module name is kept when it starts with name_start, or when
        name_start starts with it (so that a prefix reaching inside a
        module still causes that module to be loaded).
        """
        def needs_module(name):
            return name.startswith(name_start) or name_start.startswith(name)
        loader = TestUtil.FilteredByModuleTestLoader(needs_module)
        return loader

    def test_load_tests(self):
        """A prefix of the module name loads the module's tests."""
        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing']
        loader = self._create_loader('breezy.tests.test_samp')

        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        self.assertEqual(test_list, _test_ids(suite))

    def test_load_tests_inside_module(self):
        """A prefix reaching into the module still loads its tests."""
        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing']
        loader = self._create_loader('breezy.tests.test_sampler.Demo')

        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        self.assertEqual(test_list, _test_ids(suite))

    def test_exclude_tests(self):
        """An unrelated prefix loads nothing from the module."""
        loader = self._create_loader('bogus')

        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        self.assertEqual([], _test_ids(suite))
 
3148
 
 
3149
 
 
3150
class TestTestPrefixRegistry(tests.TestCase):
    """Tests for tests.TestPrefixAliasRegistry and the predefined aliases."""

    def _get_registry(self):
        """Return a fresh tests.TestPrefixAliasRegistry."""
        return tests.TestPrefixAliasRegistry()

    def test_register_new_prefix(self):
        registry = self._get_registry()
        registry.register('foo', 'fff.ooo.ooo')
        self.assertEqual('fff.ooo.ooo', registry.get('foo'))

    def test_register_existing_prefix(self):
        registry = self._get_registry()
        registry.register('bar', 'bbb.aaa.rrr')
        registry.register('bar', 'bBB.aAA.rRR')
        # The first registration is the one that remains ...
        self.assertEqual('bbb.aaa.rrr', registry.get('bar'))
        # ... and the log mentions the alias along with both values.
        log_text = self.get_log()
        expected_log = DocTestMatches(
            "...bar...bbb.aaa.rrr...BB.aAA.rRR", doctest.ELLIPSIS)
        self.assertThat(log_text, expected_log)

    def test_get_unknown_prefix(self):
        registry = self._get_registry()
        self.assertRaises(KeyError, registry.get, 'I am not a prefix')

    def test_resolve_prefix(self):
        registry = self._get_registry()
        registry.register('bar', 'bb.aa.rr')
        self.assertEqual('bb.aa.rr', registry.resolve_alias('bar'))

    def test_resolve_unknown_alias(self):
        registry = self._get_registry()
        self.assertRaises(
            errors.BzrCommandError,
            registry.resolve_alias, 'I am not a prefix')

    def test_predefined_prefixes(self):
        registry = tests.test_prefix_alias_registry
        # (alias, expected full prefix), checked in this order.
        predefined = [
            ('breezy', 'breezy'),
            ('bd', 'breezy.doc'),
            ('bu', 'breezy.utils'),
            ('bt', 'breezy.tests'),
            ('bb', 'breezy.tests.blackbox'),
            ('bp', 'breezy.plugins'),
            ]
        for alias, full_prefix in predefined:
            self.assertEqual(full_prefix, registry.resolve_alias(alias))
 
3192
 
 
3193
 
 
3194
class TestThreadLeakDetection(tests.TestCase):
    """Ensure when tests leak threads we detect and report it"""

    class LeakRecordingResult(tests.ExtendedTestResult):
        """Result keeping a (test, leaks) pair for every reported leak"""

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            self.leaks = []

        def _report_thread_leak(self, test, leaks, alive):
            # Only the test and its leaked threads are recorded; the alive
            # argument is ignored.
            self.leaks.append((test, leaks))

    def test_testcase_without_addCleanups(self):
        """Check old TestCase instances don't break with leak detection"""
        class Test(unittest.TestCase):
            def runTest(self):
                pass

        recorder = self.LeakRecordingResult()
        case = Test()
        recorder.startTestRun()
        case.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 0)
        self.assertEqual(recorder.leaks, [])

    def test_thread_leak(self):
        """Ensure a thread that outlives the running of a test is reported

        The inner test case starts a thread that blocks waiting on an event.
        Because that thread is still running once the inner case has
        finished, it should be detected as a leak. The event is then set
        during cleanup so the thread can be joined safely and is not leaked
        for real.
        """
        release = threading.Event()
        leaker = threading.Thread(name="Leaker", target=release.wait)

        class Test(tests.TestCase):
            def test_leak(self):
                leaker.start()

        recorder = self.LeakRecordingResult()
        case = Test("test_leak")
        # Cleanups run in reverse order of registration, so the event is set
        # before the thread gets joined.
        self.addCleanup(leaker.join)
        self.addCleanup(release.set)
        recorder.startTestRun()
        case.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 1)
        self.assertEqual(recorder._first_thread_leaker_id, case.id())
        self.assertEqual(recorder.leaks, [(case, {leaker})])
        self.assertContainsString(
            recorder.stream.getvalue(), "leaking threads")

    def test_multiple_leaks(self):
        """Check multiple leaks are blamed on the test cases at fault

        Same idea as test_thread_leak, with three inner test methods: one
        leaking a single thread, one leaking nothing, and one leaking two
        threads.
        """
        release = threading.Event()
        leaker_a = threading.Thread(name="LeakerA", target=release.wait)
        leaker_b = threading.Thread(name="LeakerB", target=release.wait)
        leaker_c = threading.Thread(name="LeakerC", target=release.wait)

        class Test(tests.TestCase):
            def test_first_leak(self):
                leaker_b.start()

            def test_second_no_leak(self):
                pass

            def test_third_leak(self):
                leaker_c.start()
                leaker_a.start()

        recorder = self.LeakRecordingResult()
        first_case = Test("test_first_leak")
        third_case = Test("test_third_leak")
        # As above, the event is set before any of the joins happen.
        self.addCleanup(leaker_a.join)
        self.addCleanup(leaker_b.join)
        self.addCleanup(leaker_c.join)
        self.addCleanup(release.set)
        recorder.startTestRun()
        suite = unittest.TestSuite(
            [first_case, Test("test_second_no_leak"), third_case])
        suite.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 2)
        self.assertEqual(
            recorder._first_thread_leaker_id, first_case.id())
        expected_leaks = [
            (first_case, {leaker_b}),
            (third_case, {leaker_a, leaker_c}),
            ]
        self.assertEqual(recorder.leaks, expected_leaks)
        self.assertContainsString(
            recorder.stream.getvalue(), "leaking threads")
 
3278
 
 
3279
 
 
3280
class TestPostMortemDebugging(tests.TestCase):
    """Check post mortem debugging works when tests fail or error"""

    class TracebackRecordingResult(tests.ExtendedTestResult):
        """Result remembering the code object a post mortem would start at"""

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            self.postcode = None

        def _post_mortem(self, tb=None):
            """Record the code object at the end of the current traceback"""
            if not tb:
                tb = sys.exc_info()[2]
            if tb is None:
                return
            # Follow the chain down to the innermost traceback entry.
            while tb.tb_next is not None:
                tb = tb.tb_next
            self.postcode = tb.tb_frame.f_code

        def report_error(self, test, err):
            pass

        def report_failure(self, test, err):
            pass

    def test_location_unittest_error(self):
        """Needs right post mortem traceback with erroring unittest case"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise RuntimeError

        recorder = self.TracebackRecordingResult()
        Test().run(recorder)
        self.assertEqual(recorder.postcode, Test.runTest.__code__)

    def test_location_unittest_failure(self):
        """Needs right post mortem traceback with failing unittest case"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise self.failureException

        recorder = self.TracebackRecordingResult()
        Test().run(recorder)
        self.assertEqual(recorder.postcode, Test.runTest.__code__)

    def test_location_bt_error(self):
        """Needs right post mortem traceback with erroring breezy.tests case"""
        class Test(tests.TestCase):
            def test_error(self):
                raise RuntimeError

        recorder = self.TracebackRecordingResult()
        Test("test_error").run(recorder)
        self.assertEqual(recorder.postcode, Test.test_error.__code__)

    def test_location_bt_failure(self):
        """Needs right post mortem traceback with failing breezy.tests case"""
        class Test(tests.TestCase):
            def test_failure(self):
                raise self.failureException

        recorder = self.TracebackRecordingResult()
        Test("test_failure").run(recorder)
        self.assertEqual(recorder.postcode, Test.test_failure.__code__)

    def test_env_var_triggers_post_mortem(self):
        """Check pdb.post_mortem is called iff BRZ_TEST_PDB is set"""
        import pdb
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
        seen_tracebacks = []
        # Stand-in for pdb.post_mortem that only records its argument.
        self.overrideAttr(pdb, "post_mortem", seen_tracebacks.append)
        # Variable unset: the call with 1 must not reach pdb.
        self.overrideEnv('BRZ_TEST_PDB', None)
        result._post_mortem(1)
        # Variable set: the call with 2 must reach pdb.
        self.overrideEnv('BRZ_TEST_PDB', 'on')
        result._post_mortem(2)
        self.assertEqual([2], seen_tracebacks)
 
3348
 
 
3349
 
 
3350
class TestRunSuite(tests.TestCase):
    """Tests for tests.run_suite."""

    def test_runner_class(self):
        """run_suite accepts and uses a runner_class keyword argument."""
        class Stub(tests.TestCase):
            def test_foo(self):
                pass

        seen_by_runner = []

        class MyRunner(tests.TextTestRunner):
            def run(self, test):
                # Record what was handed over rather than running it.
                seen_by_runner.append(test)
                return tests.ExtendedTestResult(
                    self.stream, self.descriptions, self.verbosity)

        tests.run_suite(
            Stub("test_foo"), runner_class=MyRunner, stream=StringIO())
        self.assertLength(1, seen_by_runner)
 
3366
 
 
3367
 
 
3368
class _Selftest(object):
    """Mixin for tests needing full selftest output"""

    def _inject_stream_into_subunit(self, stream):
        """To be overridden by subclasses that run tests out of process"""

    def _run_selftest(self, **kwargs):
        """Run tests.selftest and return everything written to its stream

        :param kwargs: Extra keyword arguments handed on to tests.selftest.
        :return: The content of the stream given to tests.selftest.
        """
        captured = StringIO()
        self._inject_stream_into_subunit(captured)
        tests.selftest(stream=captured, stop_on_failure=False, **kwargs)
        return captured.getvalue()
 
3379
 
 
3380
 
 
3381
class _ForkedSelftest(_Selftest):
    """Mixin for tests needing full selftest output with forked children"""

    _test_needs_features = [features.subunit]

    def _inject_stream_into_subunit(self, stream):
        """Monkey-patch subunit so the extra output goes to stream not stdout

        Some APIs need rewriting so this kind of bogus hackery can be replaced
        by passing the stream param from run_tests down into ProtocolTestCase.

        :param stream: Object stored as _passthrough on each ProtocolTestCase
            created while the override is in place.
        """
        from subunit import ProtocolTestCase
        _original_init = ProtocolTestCase.__init__

        def _init_with_passthrough(protocol_case, *args, **kwargs):
            # Run the real constructor, then replace whatever passthrough it
            # configured with our stream.
            _original_init(protocol_case, *args, **kwargs)
            protocol_case._passthrough = stream
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)

    def _run_selftest(self, **kwargs):
        """Run selftest with tests.fork_decorator added to the decorators

        :param kwargs: Keyword arguments for the inherited _run_selftest; any
            suite_decorators given are kept, with tests.fork_decorator added
            after them.
        :return: Whatever the inherited _run_selftest returns.
        :raises tests.TestNotApplicable: If the os module has no fork.
        """
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
        if getattr(os, "fork", None) is None:
            raise tests.TestNotApplicable("Platform doesn't support forking")
        # Make sure the fork code is actually invoked by claiming two cores
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
        # Work on a copy so a list passed in by the caller is left untouched.
        decorators = list(kwargs.get("suite_decorators", ()))
        decorators.append(tests.fork_decorator)
        kwargs["suite_decorators"] = decorators
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
 
3407
 
 
3408
 
 
3409
class TestParallelFork(_ForkedSelftest, tests.TestCase):
    """Check operation of --parallel=fork selftest option"""

    def test_error_in_child_during_fork(self):
        """Error in a forked child during test setup should get reported"""
        class Test(tests.TestCase):
            def testMethod(self):
                pass
        # We don't care what, just break something that a child will run
        self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
        out = self._run_selftest(test_suite_factory=Test)
        # Lines from the tracebacks of the two child processes may be mixed
        # together due to the way subunit parses and forwards the streams,
        # so permit extra lines between each part of the error output.
        # The piece holding regex escapes is a raw literal, so that "\s" and
        # "\(" are not invalid string escapes; its newline is kept as a
        # separate plain literal, leaving the compiled pattern unchanged.
        self.assertContainsRe(out,
            "Traceback.*:\n"
            "(?:.*\n)*"
            ".+ in fork_for_tests\n"
            "(?:.*\n)*"
            r"\s*workaround_zealous_crypto_random\(\)" "\n"
            "(?:.*\n)*"
            "TypeError:")
 
3431
 
 
3432
 
 
3433
class TestUncollectedWarnings(_Selftest, tests.TestCase):
    """Check a test case still alive after being run emits a warning"""

    class Test(tests.TestCase):
        """Sample cases: one passing, one referencing itself, one skipped"""

        def test_pass(self):
            pass

        def test_self_ref(self):
            # Storing a bound method on the instance makes the test case
            # refer to itself.
            self.also_self = self.test_self_ref

        def test_skip(self):
            self.skipTest("Don't need")

    def _get_suite(self):
        """Return a suite holding one instance of each sample case"""
        return TestUtil.TestSuite([
            self.Test("test_pass"),
            self.Test("test_self_ref"),
            self.Test("test_skip"),
            ])

    def _run_selftest_with_suite(self, **kwargs):
        """Run the sample suite with the uncollected_cases flag switched on

        The garbage collector is disabled for the duration of the run when it
        was enabled, and both it and the debug flags are put back afterwards.
        The output is then checked: test_pass must not be reported as
        uncollected while test_self_ref must be.

        :param kwargs: Extra keyword arguments for _run_selftest.
        :return: The selftest output.
        """
        saved_flags = tests.selftest_debug_flags
        tests.selftest_debug_flags = saved_flags.union(["uncollected_cases"])
        gc_was_enabled = gc.isenabled()
        if gc_was_enabled:
            gc.disable()
        try:
            output = self._run_selftest(
                test_suite_factory=self._get_suite, **kwargs)
        finally:
            if gc_was_enabled:
                gc.enable()
            tests.selftest_debug_flags = saved_flags
        self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
        self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
        return output

    def test_testsuite(self):
        self._run_selftest_with_suite()

    def test_pattern(self):
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
        self.assertNotContainsRe(out, "test_skip")

    def test_exclude_pattern(self):
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
        self.assertNotContainsRe(out, "test_skip")

    def test_random_seed(self):
        self._run_selftest_with_suite(random_seed="now")

    def test_matching_tests_first(self):
        self._run_selftest_with_suite(
            matching_tests_first=True, pattern="test_self_ref$")

    def test_starting_with_and_exclude(self):
        out = self._run_selftest_with_suite(
            starting_with=["bt."], exclude_pattern="test_skip$")
        self.assertNotContainsRe(out, "test_skip")

    def test_additonal_decorator(self):
        # The checks inside _run_selftest_with_suite are all that is needed
        # here, so the returned output is not kept.
        self._run_selftest_with_suite(suite_decorators=[tests.TestDecorator])
 
3494
 
 
3495
 
 
3496
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted with subunit"""

    _test_needs_features = [features.subunit]

    def _run_selftest_with_suite(self, **kwargs):
        """Same as the base class run, using tests.SubUnitBzrRunner"""
        base_run = TestUncollectedWarnings._run_selftest_with_suite
        return base_run(
            self, runner_class=tests.SubUnitBzrRunner, **kwargs)
 
3504
 
 
3505
 
 
3506
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted when forking

    All test methods are inherited from TestUncollectedWarnings.
    """
 
3508
 
 
3509
 
 
3510
class TestEnvironHandling(tests.TestCase):
    """Tests for overrideEnv restoring the environment afterwards."""

    def test_overrideEnv_None_called_twice_doesnt_leak(self):
        self.assertFalse('MYVAR' in os.environ)
        self.overrideEnv('MYVAR', '42')

        # An embedded test is used to make sure the _captureVar bug is fixed
        class Test(tests.TestCase):
            def test_me(self):
                # Unset the variable twice in a row: the first call saves
                # the 42 value, and calling it a second time must be
                # possible too.
                for _ in range(2):
                    self.overrideEnv('MYVAR', None)
                    self.assertEqual(None, os.environ.get('MYVAR'))

        stream = StringIO()
        inner_result = tests.TextTestResult(stream, 0, 1)
        Test('test_me').run(inner_result)
        if not inner_result.wasStrictlySuccessful():
            self.fail(stream.getvalue())
        # Once the embedded test is over, our own value is back
        self.assertEqual('42', os.environ.get('MYVAR'))
 
3531
 
 
3532
 
 
3533
class TestIsolatedEnv(tests.TestCase):
    """Test isolating tests from os.environ.

    These tests are themselves already isolated from os.environ, so some care
    is needed when designing them to avoid bootstrap side-effects. They start
    from an already clean os.environ, which makes it possible to assert which
    variables are present or absent and to build the tests around those
    assertions.
    """

    class ScratchMonkey(tests.TestCase):
        """Throwaway test case whose environment gets overridden."""

        def test_me(self):
            pass

    def test_basics(self):
        isolated = tests.isolated_environ
        # Make sure we know the definition of BRZ_HOME: not part of os.environ
        # for tests.TestCase.
        self.assertTrue('BRZ_HOME' in isolated)
        self.assertEqual(None, isolated['BRZ_HOME'])
        # Being part of isolated_environ, BRZ_HOME should not appear here
        self.assertFalse('BRZ_HOME' in os.environ)
        # Make sure we know the definition of LINES: part of os.environ for
        # tests.TestCase
        self.assertTrue('LINES' in isolated)
        self.assertEqual('25', isolated['LINES'])
        self.assertEqual('25', os.environ['LINES'])

    def test_injecting_unknown_variable(self):
        # BRZ_HOME is known to be absent from os.environ
        monkey = self.ScratchMonkey('test_me')
        tests.override_os_environ(monkey, {'BRZ_HOME': 'foo'})
        self.assertEqual('foo', os.environ['BRZ_HOME'])
        tests.restore_os_environ(monkey)
        self.assertFalse('BRZ_HOME' in os.environ)

    def test_injecting_known_variable(self):
        monkey = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(monkey, {'LINES': '42'})
        self.assertEqual('42', os.environ['LINES'])
        tests.restore_os_environ(monkey)
        self.assertEqual('25', os.environ['LINES'])

    def test_deleting_variable(self):
        monkey = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(monkey, {'LINES': None})
        self.assertTrue('LINES' not in os.environ)
        tests.restore_os_environ(monkey)
        self.assertEqual('25', os.environ['LINES'])
 
3584
 
 
3585
 
 
3586
class TestDocTestSuiteIsolation(tests.TestCase):
    """Test that `tests.IsolatedDocTestSuite` isolates doc tests from os.environ.

    Since tests.TestCase already provides an isolation from os.environ, the
    clean environment is used as a base for testing. To capture precisely the
    isolation that the isolated suite adds, each doctest is also run through
    doctest.DocTestSuite for comparison.

    The isolated suite should respect `tests.isolated_environ`, not
    `os.environ`, so each test overrides `tests.isolated_environ` to suit its
    needs.
    """

    def get_doctest_suite_for_string(self, klass, string):
        """Return a klass suite whose only doctest is parsed from string

        :param klass: Callable accepting a test_finder keyword argument.
        :param string: Text of the doctest.
        """
        class Finder(doctest.DocTestFinder):

            def find(*args, **kwargs):
                # Whatever is asked for, hand back the one doctest parsed
                # from the given string.
                parser = doctest.DocTestParser()
                return [parser.get_doctest(string, {}, 'foo', 'foo.py', 0)]

        return klass(test_finder=Finder())

    def run_doctest_suite_for_string(self, klass, string):
        """Run the suite built from string; return (result, output stream)"""
        suite = self.get_doctest_suite_for_string(klass, string)
        stream = StringIO()
        result = tests.TextTestResult(stream, 0, 1)
        suite.run(result)
        return result, stream

    def assertDocTestStringSucceds(self, klass, string):
        """Fail, showing the run output, unless the doctest run succeeds"""
        result, stream = self.run_doctest_suite_for_string(klass, string)
        if result.wasStrictlySuccessful():
            return
        self.fail(stream.getvalue())

    def assertDocTestStringFails(self, klass, string):
        """Fail, showing the run output, if the doctest run succeeds"""
        result, stream = self.run_doctest_suite_for_string(klass, string)
        if not result.wasStrictlySuccessful():
            return
        self.fail(stream.getvalue())

    def test_injected_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
        doctest_text = """
            >>> import os
            >>> os.environ['LINES']
            '42'
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, doctest_text)
        # tests.IsolatedDocTestSuite sees '42'
        self.assertDocTestStringSucceds(
            tests.IsolatedDocTestSuite, doctest_text)

    def test_deleted_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
        doctest_text = """
            >>> import os
            >>> os.environ.get('LINES')
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, doctest_text)
        # tests.IsolatedDocTestSuite sees None
        self.assertDocTestStringSucceds(
            tests.IsolatedDocTestSuite, doctest_text)
 
3649
 
 
3650
 
 
3651
class TestSelftestExcludePatterns(tests.TestCase):
    """Tests for the -x option of the selftest command."""

    def setUp(self):
        super(TestSelftestExcludePatterns, self).setUp()
        # Replace tests.test_suite with our small factory for each test.
        self.overrideAttr(tests, 'test_suite', self.suite_factory)

    def suite_factory(self, keep_only=None, starting_with=None):
        """A test suite factory with only a few tests."""
        class Test(tests.TestCase):
            def id(self):
                # The bare method name is enough, no need for the class path
                return self._testMethodName

            def a(self):
                pass

            def b(self):
                pass

            def c(self):
                pass

        return TestUtil.TestSuite([Test(name) for name in ("a", "b", "c")])

    def assertTestList(self, expected, *selftest_args):
        """Check 'selftest --list' prints exactly the expected lines

        :param expected: List of the expected output lines.
        :param selftest_args: Extra arguments for the selftest command.
        """
        # setUp has installed the right test suite factory, so this can be
        # checked at the command level without loading the whole test suite
        command = ('selftest', '--list') + selftest_args
        out, _ = self.run_bzr(command)
        self.assertEqual(expected, out.splitlines())

    def test_full_list(self):
        self.assertTestList(['a', 'b', 'c'])

    def test_single_exclude(self):
        self.assertTestList(['b', 'c'], '-x', 'a')

    def test_mutiple_excludes(self):
        self.assertTestList(['c'], '-x', 'a', '-x', 'b')
 
3686
 
 
3687
 
 
3688
class TestCounterHooks(tests.TestCase, SelfTestHelper):
    """Tests for counting hook calls with install_counter_hook."""

    _test_needs_features = [features.subunit]

    def setUp(self):
        super(TestCounterHooks, self).setUp()

        class Test(tests.TestCase):

            def setUp(self):
                super(Test, self).setUp()
                # One hook point named 'myhook', with a counter installed
                # on it.
                self.hooks = hooks.Hooks()
                self.hooks.add_hook('myhook', 'Foo bar blah', (2, 4))
                self.install_counter_hook(self.hooks, 'myhook')

            def no_hook(self):
                pass

            def run_hook_once(self):
                for callback in self.hooks['myhook']:
                    callback(self)

        self.test_class = Test

    def assertHookCalls(self, expected_calls, test_name):
        """Run the named inner test and check its 'myhook' counter

        :param expected_calls: Expected value of the 'myhook' counter.
        :param test_name: Name of the method of the inner test class to run.
        """
        case = self.test_class(test_name)
        case.run(unittest.TestResult())
        self.assertTrue(hasattr(case, '_counters'))
        self.assertTrue('myhook' in case._counters)
        self.assertEqual(expected_calls, case._counters['myhook'])

    def test_no_hook(self):
        self.assertHookCalls(0, 'no_hook')

    def test_run_hook_once(self):
        testtools_feature = features.testtools
        if testtools_feature.module.__version__ < (0, 9, 8):
            raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
        self.assertHookCalls(1, 'run_hook_once')