/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

Tags: bzr-2.0.3
Prepare 2.0.3 final

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the test framework."""
 
18
 
 
19
from cStringIO import StringIO
 
20
import os
 
21
import signal
 
22
import sys
 
23
import time
 
24
import unittest
 
25
import warnings
 
26
 
 
27
import bzrlib
 
28
from bzrlib import (
 
29
    branchbuilder,
 
30
    bzrdir,
 
31
    debug,
 
32
    errors,
 
33
    lockdir,
 
34
    memorytree,
 
35
    osutils,
 
36
    progress,
 
37
    remote,
 
38
    repository,
 
39
    symbol_versioning,
 
40
    tests,
 
41
    workingtree,
 
42
    )
 
43
from bzrlib.repofmt import (
 
44
    groupcompress_repo,
 
45
    pack_repo,
 
46
    weaverepo,
 
47
    )
 
48
from bzrlib.symbol_versioning import (
 
49
    deprecated_function,
 
50
    deprecated_in,
 
51
    deprecated_method,
 
52
    )
 
53
from bzrlib.tests import (
 
54
    SubUnitFeature,
 
55
    test_lsprof,
 
56
    test_sftp_transport,
 
57
    TestUtil,
 
58
    )
 
59
from bzrlib.trace import note
 
60
from bzrlib.transport.memory import MemoryServer, MemoryTransport
 
61
from bzrlib.version import _get_bzr_source_tree
 
62
 
 
63
 
 
64
def _test_ids(test_suite):
    """Return a list with the id() of each test found in test_suite."""
    collected = []
    for test in tests.iter_suite_tests(test_suite):
        collected.append(test.id())
    return collected
 
67
 
 
68
 
 
69
class SelftestTests(tests.TestCase):
    """Checks of TestUtil._load_module_by_name."""

    def test_import_tests(self):
        # Loading this very module by name must expose the class defined
        # right here.
        loaded = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
        self.assertEqual(loaded.SelftestTests, SelftestTests)

    def test_import_test_failure(self):
        # A name that cannot be imported is reported as ImportError.
        loader = TestUtil._load_module_by_name
        self.assertRaises(ImportError, loader, 'bzrlib.no-name-yet')
 
79
 
 
80
class MetaTestLog(tests.TestCase):
    """Checks that messages passed to self.log() reach the test log."""

    def test_logging(self):
        """Test logs are captured when a test fails."""
        self.log('a test message')
        # Flush so the message is visible when the log is read back.
        self._log_file.flush()
        captured = self._get_log(keep_log_file=True)
        self.assertContainsRe(captured, 'a test message\n')
 
88
 
 
89
 
 
90
class TestUnicodeFilename(tests.TestCase):
    """Smoke test for the tests.UnicodeFilename feature."""

    def test_probe_passes(self):
        """UnicodeFilename._probe passes."""
        # The outcome of the probe is platform dependent, so the only thing
        # that can be checked is that calling it does not raise.
        feature = tests.UnicodeFilename
        feature._probe()
 
97
 
 
98
 
 
99
class TestTreeShape(tests.TestCaseInTempDir):
    """Checks of the tree building helpers with non-ascii names."""

    def test_unicode_paths(self):
        # Only meaningful where unicode file names are available.
        self.requireFeature(tests.UnicodeFilename)

        unicode_name = u'hell\u00d8'
        entries = [(unicode_name, 'contents of hello')]
        self.build_tree_contents(entries)
        self.failUnlessExists(unicode_name)
 
107
 
 
108
 
 
109
class TestTransportScenarios(tests.TestCase):
    """Meta tests for the transport implementation adaption core.

    They check that the tests get applied to all available transports.

    The checks live in this file, even though they are specific to
    transport tests at the moment, because the mechanism is expected to be
    generalised in the future.
    """

    def test_get_transport_permutations(self):
        # get_transport_test_permutations must return whatever the module's
        # own get_test_permutations returns.
        from bzrlib.tests.per_transport import get_transport_test_permutations
        sample_permutation = [(1,2), (3,4)]

        class MockModule(object):
            def get_test_permutations(self):
                return sample_permutation

        found = get_transport_test_permutations(MockModule())
        self.assertEqual(sample_permutation, found)

    def test_scenarios_include_all_modules(self):
        # The scenario generator should return as many permutations as the
        # registered transport modules provide in total.  A matching count,
        # together with the class checks in the test below, is taken as
        # good enough evidence that it does the right thing.
        from bzrlib.tests.per_transport import transport_test_permutations
        from bzrlib.transport import _get_transport_modules
        expected_count = 0
        for module_name in _get_transport_modules():
            dotted = module_name + ".get_test_permutations"
            try:
                # __import__ returns the top-level package, so walk down the
                # remaining dotted components to reach the function.
                target = __import__(module_name)
                for attribute in dotted.split('.')[1:]:
                    target = getattr(target, attribute)
                expected_count += len(target())
            except errors.DependencyNotPresent:
                # Transports whose dependencies are missing contribute
                # nothing.
                pass
        generated = transport_test_permutations()
        self.assertEqual(expected_count, len(generated))

    def test_scenarios_include_transport_class(self):
        # This test used to know about all the possible transports and the
        # order they were returned but that seems overly brittle (mbp
        # 20060307)
        from bzrlib.tests.per_transport import transport_test_permutations
        generated = transport_test_permutations()
        # there are at least that many builtin transports
        self.assertTrue(len(generated) > 6)
        first = generated[0]
        self.assertIsInstance(first[0], str)
        parameters = first[1]
        self.assertTrue(issubclass(parameters["transport_class"],
                                   bzrlib.transport.Transport))
        self.assertTrue(issubclass(parameters["transport_server"],
                                   bzrlib.transport.Server))
 
163
 
 
164
 
 
165
class TestBranchScenarios(tests.TestCase):
    """Checks of per_branch.make_scenarios."""

    def test_scenarios(self):
        # The servers and each (branch format, bzrdir format) pair must be
        # passed through to the generated scenarios.
        from bzrlib.tests.per_branch import make_scenarios
        transport_server = "a"
        readonly_server = "b"
        format_pairs = [("c", "C"), ("d", "D")]
        generated = make_scenarios(transport_server, readonly_server,
                                   format_pairs)
        self.assertEqual(2, len(generated))
        expected = [
            ('str', dict(branch_format='c',
                         bzrdir_format='C',
                         transport_readonly_server='b',
                         transport_server='a')),
            ('str', dict(branch_format='d',
                         bzrdir_format='D',
                         transport_readonly_server='b',
                         transport_server='a')),
            ]
        self.assertEqual(expected, generated)
 
188
 
 
189
 
 
190
class TestBzrDirScenarios(tests.TestCase):
    """Checks of per_bzrdir.make_scenarios."""

    def test_scenarios(self):
        # The vfs factory, both servers and each format must be passed
        # through to the generated scenarios.
        from bzrlib.tests.per_bzrdir import make_scenarios
        vfs_factory = "v"
        transport_server = "a"
        readonly_server = "b"
        generated = make_scenarios(vfs_factory, transport_server,
                                   readonly_server, ["c", "d"])
        expected = [
            ('str', dict(bzrdir_format='c',
                         transport_readonly_server='b',
                         transport_server='a',
                         vfs_transport_factory='v')),
            ('str', dict(bzrdir_format='d',
                         transport_readonly_server='b',
                         transport_server='a',
                         vfs_transport_factory='v')),
            ]
        self.assertEqual(expected, generated)
 
213
 
 
214
 
 
215
class TestRepositoryScenarios(tests.TestCase):
    """Checks of per_repository.formats_to_scenarios."""

    def test_formats_to_scenarios(self):
        from bzrlib.tests.per_repository import formats_to_scenarios
        formats = [("(c)", remote.RemoteRepositoryFormat()),
                   ("(d)", repository.format_registry.get(
                    'Bazaar repository format 2a (needs bzr 1.16 or later)\n'))]
        no_vfs_scenarios = formats_to_scenarios(
            formats, "server", "readonly", None)
        vfs_scenarios = formats_to_scenarios(
            formats, "server", "readonly", vfs_transport_factory="vfs")

        def expected_scenarios(**extra_parameters):
            # Build the expectation from freshly created format objects;
            # extra_parameters are added to every scenario's parameters.
            remote_parameters = {
                'bzrdir_format': remote.RemoteBzrDirFormat(),
                'repository_format': remote.RemoteRepositoryFormat(),
                'transport_readonly_server': 'readonly',
                'transport_server': 'server',
                }
            format_2a_parameters = {
                'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
                'repository_format': groupcompress_repo.RepositoryFormat2a(),
                'transport_readonly_server': 'readonly',
                'transport_server': 'server',
                }
            for parameters in (remote_parameters, format_2a_parameters):
                parameters.update(extra_parameters)
            return [
                ('RemoteRepositoryFormat(c)', remote_parameters),
                ('RepositoryFormat2a(d)', format_2a_parameters),
                ]

        # no_vfs generate scenarios without vfs_transport_factory
        self.assertEqual(expected_scenarios(), no_vfs_scenarios)
        self.assertEqual(expected_scenarios(vfs_transport_factory='vfs'),
                         vfs_scenarios)
 
253
 
 
254
 
 
255
class TestTestScenarioApplication(tests.TestCase):
    """Tests for the test adaption facilities."""

    def test_apply_scenario(self):
        from bzrlib.tests import apply_scenario
        input_test = TestTestScenarioApplication("test_apply_scenario")
        # setup two adapted tests
        adapted_test1 = apply_scenario(input_test,
            ("new id",
            {"bzrdir_format":"bzr_format",
             "repository_format":"repo_fmt",
             "transport_server":"transport_server",
             "transport_readonly_server":"readonly-server"}))
        adapted_test2 = apply_scenario(input_test,
            ("new id 2", {"bzrdir_format":None}))
        # input_test should NOT have been altered: the scenario attributes
        # must only appear on the adapted tests, so looking one up on the
        # input test has to fail.
        self.assertRaises(AttributeError, getattr, input_test, "bzrdir_format")
        # the new tests are mutually incompatible, ensuring it has
        # made new ones, and unspecified elements in the scenario
        # should not have been altered.
        self.assertEqual("bzr_format", adapted_test1.bzrdir_format)
        self.assertEqual("repo_fmt", adapted_test1.repository_format)
        self.assertEqual("transport_server", adapted_test1.transport_server)
        self.assertEqual("readonly-server",
            adapted_test1.transport_readonly_server)
        # The scenario name is appended, in parentheses, to the test id.
        self.assertEqual(
            "bzrlib.tests.test_selftest.TestTestScenarioApplication."
            "test_apply_scenario(new id)",
            adapted_test1.id())
        self.assertEqual(None, adapted_test2.bzrdir_format)
        self.assertEqual(
            "bzrlib.tests.test_selftest.TestTestScenarioApplication."
            "test_apply_scenario(new id 2)",
            adapted_test2.id())
 
289
 
 
290
 
 
291
class TestInterRepositoryScenarios(tests.TestCase):
    """Checks of per_interrepository.make_scenarios."""

    def test_scenarios(self):
        # The servers and each format triple must be passed through to the
        # generated scenarios.
        from bzrlib.tests.per_interrepository import make_scenarios
        transport_server = "a"
        readonly_server = "b"
        format_triples = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
        generated = make_scenarios(transport_server, readonly_server,
                                   format_triples)
        expected = [
            ('C0,str,str', dict(repository_format='C1',
                                repository_format_to='C2',
                                transport_readonly_server='b',
                                transport_server='a')),
            ('D0,str,str', dict(repository_format='D1',
                                repository_format_to='D2',
                                transport_readonly_server='b',
                                transport_server='a')),
            ]
        self.assertEqual(expected, generated)
 
313
 
 
314
 
 
315
class TestWorkingTreeScenarios(tests.TestCase):
    """Checks of per_workingtree.make_scenarios."""

    def test_scenarios(self):
        # The servers and each working tree format must be passed through
        # to the generated scenarios.
        from bzrlib.tests.per_workingtree import make_scenarios
        transport_server = "a"
        readonly_server = "b"
        formats = [workingtree.WorkingTreeFormat2(),
                   workingtree.WorkingTreeFormat3(),]
        generated = make_scenarios(transport_server, readonly_server, formats)
        wt2_format, wt3_format = formats
        expected = [
            ('WorkingTreeFormat2',
             dict(bzrdir_format=wt2_format._matchingbzrdir,
                  transport_readonly_server='b',
                  transport_server='a',
                  workingtree_format=wt2_format)),
            ('WorkingTreeFormat3',
             dict(bzrdir_format=wt3_format._matchingbzrdir,
                  transport_readonly_server='b',
                  transport_server='a',
                  workingtree_format=wt3_format)),
            ]
        self.assertEqual(expected, generated)
 
338
 
 
339
 
 
340
class TestTreeScenarios(tests.TestCase):
    """Checks of per_tree.make_scenarios."""

    def test_scenarios(self):
        # The tree implementation scenario generator is expected to give one
        # scenario per working tree format passed in, whose
        # _workingtree_to_test_tree is return_parameter, plus further
        # scenarios (revision tree, dirstate revision trees, preview trees)
        # that each use their own converter.

        from bzrlib.tests.per_tree import (
            _dirstate_tree_from_workingtree,
            make_scenarios,
            preview_tree_pre,
            preview_tree_post,
            return_parameter,
            revision_tree_from_workingtree
            )
        transport_server = "a"
        readonly_server = "b"
        formats = [workingtree.WorkingTreeFormat2(),
                   workingtree.WorkingTreeFormat3(),]
        generated = make_scenarios(transport_server, readonly_server, formats)
        self.assertEqual(7, len(generated))
        default_wt_format = workingtree.WorkingTreeFormat4._default_format
        wt4_format = workingtree.WorkingTreeFormat4()
        wt5_format = workingtree.WorkingTreeFormat5()

        def scenario(name, wt_format, to_test_tree):
            # All expected scenarios share this shape; only the name, the
            # working tree format and the converter vary.
            return (name, {
                'bzrdir_format': wt_format._matchingbzrdir,
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                'workingtree_format': wt_format,
                '_workingtree_to_test_tree': to_test_tree,
                })

        expected = [
            scenario('WorkingTreeFormat2', formats[0], return_parameter),
            scenario('WorkingTreeFormat3', formats[1], return_parameter),
            scenario('RevisionTree', default_wt_format,
                     revision_tree_from_workingtree),
            scenario('DirStateRevisionTree,WT4', wt4_format,
                     _dirstate_tree_from_workingtree),
            scenario('DirStateRevisionTree,WT5', wt5_format,
                     _dirstate_tree_from_workingtree),
            scenario('PreviewTree', default_wt_format, preview_tree_pre),
            scenario('PreviewTreePost', default_wt_format, preview_tree_post),
            ]
        self.assertEqual(expected, generated)
 
417
 
 
418
 
 
419
class TestInterTreeScenarios(tests.TestCase):
    """A group of tests that test the InterTreeTestAdapter."""

    def test_scenarios(self):
        # check that constructor parameters are passed through to the adapted
        # test.
        # for InterTree tests we want the machinery to bring up two trees in
        # each instance: the base one, and the one we are interacting with.
        # because each optimiser can be direction specific, we need to test
        # each optimiser in its chosen direction.
        # unlike the TestProviderAdapter we dont want to automatically add a
        # parameterized one for WorkingTree - the optimisers will tell us what
        # ones to add.
        from bzrlib.tests.per_tree import return_parameter
        from bzrlib.tests.per_intertree import make_scenarios
        from bzrlib.workingtree import WorkingTreeFormat2, WorkingTreeFormat3
        server1 = "a"
        server2 = "b"
        format1 = WorkingTreeFormat2()
        format2 = WorkingTreeFormat3()
        # Each entry is (scenario name, intertree class, source format,
        # target format, mutable-trees converter); str and int merely stand
        # in for real InterTree classes.
        formats = [("1", str, format1, format2, "converter1"),
            ("2", int, format2, format1, "converter2")]
        scenarios = make_scenarios(server1, server2, formats)
        self.assertEqual(2, len(scenarios))
        expected_scenarios = [
            ("1", {
                "bzrdir_format": format1._matchingbzrdir,
                "intertree_class": formats[0][1],
                "workingtree_format": formats[0][2],
                "workingtree_format_to": formats[0][3],
                "mutable_trees_to_test_trees": formats[0][4],
                "_workingtree_to_test_tree": return_parameter,
                "transport_server": server1,
                "transport_readonly_server": server2,
                }),
            ("2", {
                "bzrdir_format": format2._matchingbzrdir,
                "intertree_class": formats[1][1],
                "workingtree_format": formats[1][2],
                "workingtree_format_to": formats[1][3],
                "mutable_trees_to_test_trees": formats[1][4],
                "_workingtree_to_test_tree": return_parameter,
                "transport_server": server1,
                "transport_readonly_server": server2,
                }),
            ]
        # Expected value first, as in the other scenario tests of this file.
        self.assertEqual(expected_scenarios, scenarios)
 
473
 
 
474
 
 
475
class TestTestCaseInTempDir(tests.TestCaseInTempDir):
    """Checks of the environment and helpers of TestCaseInTempDir."""

    def test_home_is_not_working(self):
        # The home directory and the working directory are distinct; the
        # process runs in the latter and $HOME points at the former.
        self.assertNotEqual(self.test_dir, self.test_home_dir)
        self.assertIsSameRealPath(self.test_dir, osutils.getcwd())
        self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME'])

    def test_assertEqualStat_equal(self):
        from bzrlib.tests.test_dirstate import _FakeStat
        self.build_tree(["foo"])
        real_stat = os.lstat("foo")
        # A fake stat carrying the same values must compare as equal.
        fake_stat = _FakeStat(
            real_stat.st_size,
            real_stat.st_mtime,
            real_stat.st_ctime,
            real_stat.st_dev,
            real_stat.st_ino,
            real_stat.st_mode)
        self.assertEqualStat(real_stat, fake_stat)

    def test_assertEqualStat_notequal(self):
        self.build_tree(["foo", "bar"])
        # Stats of two different files must not compare as equal.
        foo_stat = os.lstat("foo")
        bar_stat = os.lstat("bar")
        self.assertRaises(AssertionError, self.assertEqualStat,
            foo_stat, bar_stat)
 
495
 
 
496
 
 
497
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
    """Checks of the environment and helpers of TestCaseWithMemoryTransport."""

    def test_home_is_non_existant_dir_under_root(self):
        """The test_home_dir for TestCaseWithMemoryTransport is missing.

        This is because TestCaseWithMemoryTransport is for tests that do not
        need any disk resources: they should be hooked into bzrlib in such a
        way that no global settings are being changed by the test (only a
        few tests should need to do that), and having a missing dir as home is
        an effective way to ensure that this is the case.
        """
        expected_home = self.TEST_ROOT + "/MemoryTransportMissingHomeDir"
        self.assertIsSameRealPath(expected_home, self.test_home_dir)
        self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME'])

    def test_cwd_is_TEST_ROOT(self):
        self.assertIsSameRealPath(self.test_dir, self.TEST_ROOT)
        self.assertIsSameRealPath(self.test_dir, osutils.getcwd())

    def test_make_branch_and_memory_tree(self):
        """In TestCaseWithMemoryTransport we should not make the branch on disk.

        This is hard to comprehensively robustly test, so we settle for making
        a branch and checking no directory was created at its relpath.
        """
        memory_tree = self.make_branch_and_memory_tree('dir')
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.failIf(osutils.lexists('dir'))
        self.assertIsInstance(memory_tree, memorytree.MemoryTree)

    def test_make_branch_and_memory_tree_with_format(self):
        """make_branch_and_memory_tree should accept a format option."""
        dir_format = bzrdir.BzrDirMetaFormat1()
        dir_format.repository_format = weaverepo.RepositoryFormat7()
        memory_tree = self.make_branch_and_memory_tree('dir',
                                                       format=dir_format)
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.failIf(osutils.lexists('dir'))
        self.assertIsInstance(memory_tree, memorytree.MemoryTree)
        actual_repo_format = memory_tree.branch.repository._format
        self.assertEqual(dir_format.repository_format.__class__,
            actual_repo_format.__class__)

    def test_make_branch_builder(self):
        branch_builder = self.make_branch_builder('dir')
        self.assertIsInstance(branch_builder, branchbuilder.BranchBuilder)
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.failIf(osutils.lexists('dir'))

    def test_make_branch_builder_with_format(self):
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
        # that the format objects are used.
        dir_format = bzrdir.BzrDirMetaFormat1()
        repo_format = weaverepo.RepositoryFormat7()
        dir_format.repository_format = repo_format
        branch_builder = self.make_branch_builder('dir', format=dir_format)
        built_branch = branch_builder.get_branch()
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.failIf(osutils.lexists('dir'))
        self.assertEqual(dir_format.repository_format.__class__,
                         built_branch.repository._format.__class__)
        # The format marker written through the transport must be the one
        # of the requested repository format.
        stored_format = self.get_transport().get_bytes(
            'dir/.bzr/repository/format')
        self.assertEqual(repo_format.get_format_string(), stored_format)

    def test_make_branch_builder_with_format_name(self):
        branch_builder = self.make_branch_builder('dir', format='knit')
        built_branch = branch_builder.get_branch()
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.failIf(osutils.lexists('dir'))
        knit_dir_format = bzrdir.format_registry.make_bzrdir('knit')
        self.assertEqual(knit_dir_format.repository_format.__class__,
                         built_branch.repository._format.__class__)
        stored_format = self.get_transport().get_bytes(
            'dir/.bzr/repository/format')
        self.assertEqual('Bazaar-NG Knit Repository Format 1', stored_format)

    def test_safety_net(self):
        """No test should modify the safety .bzr directory.

        We just test that the _check_safety_net private method raises
        AssertionError, it's easier than building a test suite with the same
        test.
        """
        # Oops, a commit in the current directory (i.e. without local .bzr
        # directory) will crawl up the hierarchy to find a .bzr directory.
        commit_command = ['commit', '-mfoo', '--unchanged']
        self.run_bzr(commit_command)
        # But we have a safety net in place.
        self.assertRaises(AssertionError, self._check_safety_net)

    def test_dangling_locks_cause_failures(self):
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
            def test_function(self):
                # Take a lock and deliberately never release it.
                transport = self.get_transport('.')
                dangling = lockdir.LockDir(transport, 'lock')
                dangling.create()
                dangling.attempt_lock()

        result = TestDanglingLock('test_function').run()
        if self._lock_check_thorough:
            expected_error_count = 1
        else:
            # When _lock_check_thorough is disabled, then we don't trigger a
            # failure
            expected_error_count = 0
        self.assertEqual(expected_error_count, len(result.errors))
 
607
 
 
608
 
 
609
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
    """Tests for the convenience functions TestCaseWithTransport introduces."""

    def test_get_readonly_url_none(self):
        from bzrlib.transport import get_transport
        from bzrlib.transport.memory import MemoryServer
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
        self.vfs_transport_factory = MemoryServer
        self.transport_readonly_server = None
        # calling get_readonly_transport() constructs a decorator on the url
        # for the server
        url = self.get_readonly_url()
        url2 = self.get_readonly_url('foo/bar')
        t = get_transport(url)
        t2 = get_transport(url2)
        # assertIsInstance reports the offending object on failure, which a
        # bare truth check on isinstance() does not.
        self.assertIsInstance(t, ReadonlyTransportDecorator)
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
        # The sub-path url must name the same location as resolving the
        # relative path on the root transport (base ends with a slash).
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))

    def test_get_readonly_url_http(self):
        from bzrlib.tests.http_server import HttpServer
        from bzrlib.transport import get_transport
        from bzrlib.transport.local import LocalURLServer
        from bzrlib.transport.http import HttpTransportBase
        self.transport_server = LocalURLServer
        self.transport_readonly_server = HttpServer
        # calling get_readonly_transport() gives us a HTTP server instance.
        url = self.get_readonly_url()
        url2 = self.get_readonly_url('foo/bar')
        # the transport returned may be any HttpTransportBase subclass
        t = get_transport(url)
        t2 = get_transport(url2)
        self.assertIsInstance(t, HttpTransportBase)
        self.assertIsInstance(t2, HttpTransportBase)
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))

    def test_is_directory(self):
        """Test assertIsDirectory assertion"""
        t = self.get_transport()
        self.build_tree(['a_dir/', 'a_file'], transport=t)
        self.assertIsDirectory('a_dir', t)
        # Both a plain file and a missing path must be rejected.
        self.assertRaises(AssertionError, self.assertIsDirectory, 'a_file', t)
        self.assertRaises(AssertionError, self.assertIsDirectory, 'not_here', t)

    def test_make_branch_builder(self):
        builder = self.make_branch_builder('dir')
        rev_id = builder.build_commit()
        self.failUnlessExists('dir')
        a_dir = bzrdir.BzrDir.open('dir')
        # The builder is expected to create a branch without a working tree.
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
        a_branch = a_dir.open_branch()
        builder_branch = builder.get_branch()
        # The branch opened from disk and the builder's branch are the same
        # branch and both see the single commit.
        self.assertEqual(a_branch.base, builder_branch.base)
        self.assertEqual((1, rev_id), builder_branch.last_revision_info())
        self.assertEqual((1, rev_id), a_branch.last_revision_info())
 
664
 
 
665
 
 
666
class TestTestCaseTransports(tests.TestCaseWithTransport):
    """Checks that make_bzrdir honours the configured vfs transport."""

    def setUp(self):
        super(TestTestCaseTransports, self).setUp()
        # Run every test of this class against the memory transport.
        self.vfs_transport_factory = MemoryServer

    def test_make_bzrdir_preserves_transport(self):
        # The transport is requested before the bzrdir is made, as the
        # original test did; its value is not otherwise needed.
        unused_transport = self.get_transport()
        made_bzrdir = self.make_bzrdir('subdir')
        self.assertIsInstance(made_bzrdir.transport, MemoryTransport)
        # should not be on disk, should only be in memory
        self.failIfExists('subdir')
 
679
 
 
680
 
 
681
class TestChrootedTest(tests.ChrootedTestCase):
    """Tests for the readonly URL handed out by ChrootedTestCase."""

    def test_root_is_root(self):
        """Cloning the readonly transport to '..' leaves its base unchanged."""
        from bzrlib.transport import get_transport
        readonly_transport = get_transport(self.get_readonly_url())
        root_url = readonly_transport.base
        parent_url = readonly_transport.clone('..').base
        self.assertEqual(root_url, parent_url)
 
688
 
 
689
 
 
690
class TestTestResult(tests.TestCase):
 
691
 
 
692
    def check_timing(self, test_case, expected_re):
 
693
        result = bzrlib.tests.TextTestResult(self._log_file,
 
694
                descriptions=0,
 
695
                verbosity=1,
 
696
                )
 
697
        test_case.run(result)
 
698
        timed_string = result._testTimeString(test_case)
 
699
        self.assertContainsRe(timed_string, expected_re)
 
700
 
 
701
    def test_test_reporting(self):
 
702
        class ShortDelayTestCase(tests.TestCase):
 
703
            def test_short_delay(self):
 
704
                time.sleep(0.003)
 
705
            def test_short_benchmark(self):
 
706
                self.time(time.sleep, 0.003)
 
707
        self.check_timing(ShortDelayTestCase('test_short_delay'),
 
708
                          r"^ +[0-9]+ms$")
 
709
        # if a benchmark time is given, we now show just that time followed by
 
710
        # a star
 
711
        self.check_timing(ShortDelayTestCase('test_short_benchmark'),
 
712
                          r"^ +[0-9]+ms\*$")
 
713
 
 
714
    def test_unittest_reporting_unittest_class(self):
 
715
        # getting the time from a non-bzrlib test works ok
 
716
        class ShortDelayTestCase(unittest.TestCase):
 
717
            def test_short_delay(self):
 
718
                time.sleep(0.003)
 
719
        self.check_timing(ShortDelayTestCase('test_short_delay'),
 
720
                          r"^ +[0-9]+ms$")
 
721
 
 
722
    def test_assigned_benchmark_file_stores_date(self):
 
723
        output = StringIO()
 
724
        result = bzrlib.tests.TextTestResult(self._log_file,
 
725
                                        descriptions=0,
 
726
                                        verbosity=1,
 
727
                                        bench_history=output
 
728
                                        )
 
729
        output_string = output.getvalue()
 
730
        # if you are wondering about the regexp please read the comment in
 
731
        # test_bench_history (bzrlib.tests.test_selftest.TestRunner)
 
732
        # XXX: what comment?  -- Andrew Bennetts
 
733
        self.assertContainsRe(output_string, "--date [0-9.]+")
 
734
 
 
735
    def test_benchhistory_records_test_times(self):
 
736
        result_stream = StringIO()
 
737
        result = bzrlib.tests.TextTestResult(
 
738
            self._log_file,
 
739
            descriptions=0,
 
740
            verbosity=1,
 
741
            bench_history=result_stream
 
742
            )
 
743
 
 
744
        # we want profile a call and check that its test duration is recorded
 
745
        # make a new test instance that when run will generate a benchmark
 
746
        example_test_case = TestTestResult("_time_hello_world_encoding")
 
747
        # execute the test, which should succeed and record times
 
748
        example_test_case.run(result)
 
749
        lines = result_stream.getvalue().splitlines()
 
750
        self.assertEqual(2, len(lines))
 
751
        self.assertContainsRe(lines[1],
 
752
            " *[0-9]+ms bzrlib.tests.test_selftest.TestTestResult"
 
753
            "._time_hello_world_encoding")
 
754
 
 
755
    def _time_hello_world_encoding(self):
 
756
        """Profile two sleep calls
 
757
 
 
758
        This is used to exercise the test framework.
 
759
        """
 
760
        self.time(unicode, 'hello', errors='replace')
 
761
        self.time(unicode, 'world', errors='replace')
 
762
 
 
763
    def test_lsprofiling(self):
 
764
        """Verbose test result prints lsprof statistics from test cases."""
 
765
        self.requireFeature(test_lsprof.LSProfFeature)
 
766
        result_stream = StringIO()
 
767
        result = bzrlib.tests.VerboseTestResult(
 
768
            unittest._WritelnDecorator(result_stream),
 
769
            descriptions=0,
 
770
            verbosity=2,
 
771
            )
 
772
        # we want profile a call of some sort and check it is output by
 
773
        # addSuccess. We dont care about addError or addFailure as they
 
774
        # are not that interesting for performance tuning.
 
775
        # make a new test instance that when run will generate a profile
 
776
        example_test_case = TestTestResult("_time_hello_world_encoding")
 
777
        example_test_case._gather_lsprof_in_benchmarks = True
 
778
        # execute the test, which should succeed and record profiles
 
779
        example_test_case.run(result)
 
780
        # lsprofile_something()
 
781
        # if this worked we want
 
782
        # LSProf output for <built in function unicode> (['hello'], {'errors': 'replace'})
 
783
        #    CallCount    Recursive    Total(ms)   Inline(ms) module:lineno(function)
 
784
        # (the lsprof header)
 
785
        # ... an arbitrary number of lines
 
786
        # and the function call which is time.sleep.
 
787
        #           1        0            ???         ???       ???(sleep)
 
788
        # and then repeated but with 'world', rather than 'hello'.
 
789
        # this should appear in the output stream of our test result.
 
790
        output = result_stream.getvalue()
 
791
        self.assertContainsRe(output,
 
792
            r"LSProf output for <type 'unicode'>\(\('hello',\), {'errors': 'replace'}\)")
 
793
        self.assertContainsRe(output,
 
794
            r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n")
 
795
        self.assertContainsRe(output,
 
796
            r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?")
 
797
        self.assertContainsRe(output,
 
798
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
 
799
 
 
800
    def test_known_failure(self):
 
801
        """A KnownFailure being raised should trigger several result actions."""
 
802
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
803
            def done(self): pass
 
804
            def startTests(self): pass
 
805
            def report_test_start(self, test): pass
 
806
            def report_known_failure(self, test, err):
 
807
                self._call = test, err
 
808
        result = InstrumentedTestResult(None, None, None, None)
 
809
        def test_function():
 
810
            raise tests.KnownFailure('failed!')
 
811
        test = unittest.FunctionTestCase(test_function)
 
812
        test.run(result)
 
813
        # it should invoke 'report_known_failure'.
 
814
        self.assertEqual(2, len(result._call))
 
815
        self.assertEqual(test, result._call[0])
 
816
        self.assertEqual(tests.KnownFailure, result._call[1][0])
 
817
        self.assertIsInstance(result._call[1][1], tests.KnownFailure)
 
818
        # we dont introspec the traceback, if the rest is ok, it would be
 
819
        # exceptional for it not to be.
 
820
        # it should update the known_failure_count on the object.
 
821
        self.assertEqual(1, result.known_failure_count)
 
822
        # the result should be successful.
 
823
        self.assertTrue(result.wasSuccessful())
 
824
 
 
825
    def test_verbose_report_known_failure(self):
 
826
        # verbose test output formatting
 
827
        result_stream = StringIO()
 
828
        result = bzrlib.tests.VerboseTestResult(
 
829
            unittest._WritelnDecorator(result_stream),
 
830
            descriptions=0,
 
831
            verbosity=2,
 
832
            )
 
833
        test = self.get_passing_test()
 
834
        result.startTest(test)
 
835
        prefix = len(result_stream.getvalue())
 
836
        # the err parameter has the shape:
 
837
        # (class, exception object, traceback)
 
838
        # KnownFailures dont get their tracebacks shown though, so we
 
839
        # can skip that.
 
840
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
 
841
        result.report_known_failure(test, err)
 
842
        output = result_stream.getvalue()[prefix:]
 
843
        lines = output.splitlines()
 
844
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
 
845
        self.assertEqual(lines[1], '    foo')
 
846
        self.assertEqual(2, len(lines))
 
847
 
 
848
    def get_passing_test(self):
 
849
        """Return a test object that can't be run usefully."""
 
850
        def passing_test():
 
851
            pass
 
852
        return unittest.FunctionTestCase(passing_test)
 
853
 
 
854
    def test_add_not_supported(self):
 
855
        """Test the behaviour of invoking addNotSupported."""
 
856
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
857
            def done(self): pass
 
858
            def startTests(self): pass
 
859
            def report_test_start(self, test): pass
 
860
            def report_unsupported(self, test, feature):
 
861
                self._call = test, feature
 
862
        result = InstrumentedTestResult(None, None, None, None)
 
863
        test = SampleTestCase('_test_pass')
 
864
        feature = tests.Feature()
 
865
        result.startTest(test)
 
866
        result.addNotSupported(test, feature)
 
867
        # it should invoke 'report_unsupported'.
 
868
        self.assertEqual(2, len(result._call))
 
869
        self.assertEqual(test, result._call[0])
 
870
        self.assertEqual(feature, result._call[1])
 
871
        # the result should be successful.
 
872
        self.assertTrue(result.wasSuccessful())
 
873
        # it should record the test against a count of tests not run due to
 
874
        # this feature.
 
875
        self.assertEqual(1, result.unsupported['Feature'])
 
876
        # and invoking it again should increment that counter
 
877
        result.addNotSupported(test, feature)
 
878
        self.assertEqual(2, result.unsupported['Feature'])
 
879
 
 
880
    def test_verbose_report_unsupported(self):
 
881
        # verbose test output formatting
 
882
        result_stream = StringIO()
 
883
        result = bzrlib.tests.VerboseTestResult(
 
884
            unittest._WritelnDecorator(result_stream),
 
885
            descriptions=0,
 
886
            verbosity=2,
 
887
            )
 
888
        test = self.get_passing_test()
 
889
        feature = tests.Feature()
 
890
        result.startTest(test)
 
891
        prefix = len(result_stream.getvalue())
 
892
        result.report_unsupported(test, feature)
 
893
        output = result_stream.getvalue()[prefix:]
 
894
        lines = output.splitlines()
 
895
        self.assertEqual(lines, ['NODEP        0ms',
 
896
                                 "    The feature 'Feature' is not available."])
 
897
 
 
898
    def test_unavailable_exception(self):
 
899
        """An UnavailableFeature being raised should invoke addNotSupported."""
 
900
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
901
            def done(self): pass
 
902
            def startTests(self): pass
 
903
            def report_test_start(self, test): pass
 
904
            def addNotSupported(self, test, feature):
 
905
                self._call = test, feature
 
906
        result = InstrumentedTestResult(None, None, None, None)
 
907
        feature = tests.Feature()
 
908
        def test_function():
 
909
            raise tests.UnavailableFeature(feature)
 
910
        test = unittest.FunctionTestCase(test_function)
 
911
        test.run(result)
 
912
        # it should invoke 'addNotSupported'.
 
913
        self.assertEqual(2, len(result._call))
 
914
        self.assertEqual(test, result._call[0])
 
915
        self.assertEqual(feature, result._call[1])
 
916
        # and not count as an error
 
917
        self.assertEqual(0, result.error_count)
 
918
 
 
919
    def test_strict_with_unsupported_feature(self):
 
920
        result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
 
921
                                             verbosity=1)
 
922
        test = self.get_passing_test()
 
923
        feature = "Unsupported Feature"
 
924
        result.addNotSupported(test, feature)
 
925
        self.assertFalse(result.wasStrictlySuccessful())
 
926
        self.assertEqual(None, result._extractBenchmarkTime(test))
 
927
 
 
928
    def test_strict_with_known_failure(self):
 
929
        result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
 
930
                                             verbosity=1)
 
931
        test = self.get_passing_test()
 
932
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
 
933
        result._addKnownFailure(test, err)
 
934
        self.assertFalse(result.wasStrictlySuccessful())
 
935
        self.assertEqual(None, result._extractBenchmarkTime(test))
 
936
 
 
937
    def test_strict_with_success(self):
 
938
        result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
 
939
                                             verbosity=1)
 
940
        test = self.get_passing_test()
 
941
        result.addSuccess(test)
 
942
        self.assertTrue(result.wasStrictlySuccessful())
 
943
        self.assertEqual(None, result._extractBenchmarkTime(test))
 
944
 
 
945
    def test_startTests(self):
 
946
        """Starting the first test should trigger startTests."""
 
947
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
948
            calls = 0
 
949
            def startTests(self): self.calls += 1
 
950
            def report_test_start(self, test): pass
 
951
        result = InstrumentedTestResult(None, None, None, None)
 
952
        def test_function():
 
953
            pass
 
954
        test = unittest.FunctionTestCase(test_function)
 
955
        test.run(result)
 
956
        self.assertEquals(1, result.calls)
 
957
 
 
958
 
 
959
class TestUnicodeFilenameFeature(tests.TestCase):
    """Smoke test for tests.UnicodeFilenameFeature."""

    def test_probe_passes(self):
        """UnicodeFilenameFeature._probe passes."""
        # Only check that probing completes without raising: the behaviour
        # depends on the platform, so the outcome is not inspected.
        probe = tests.UnicodeFilenameFeature._probe
        probe()
 
966
 
 
967
 
 
968
class TestRunner(tests.TestCase):
 
969
 
 
970
    def dummy_test(self):
        # Deliberately empty: test_bench_history instantiates this method by
        # name when it needs a test that simply passes.
        pass
 
972
 
 
973
    def run_test_runner(self, testrunner, test):
 
974
        """Run suite in testrunner, saving global state and restoring it.
 
975
 
 
976
        This current saves and restores:
 
977
        TestCaseInTempDir.TEST_ROOT
 
978
 
 
979
        There should be no tests in this file that use
 
980
        bzrlib.tests.TextTestRunner without using this convenience method,
 
981
        because of our use of global state.
 
982
        """
 
983
        old_root = tests.TestCaseInTempDir.TEST_ROOT
 
984
        try:
 
985
            tests.TestCaseInTempDir.TEST_ROOT = None
 
986
            return testrunner.run(test)
 
987
        finally:
 
988
            tests.TestCaseInTempDir.TEST_ROOT = old_root
 
989
 
 
990
    def test_known_failure_failed_run(self):
 
991
        # run a test that generates a known failure which should be printed in
 
992
        # the final output when real failures occur.
 
993
        def known_failure_test():
 
994
            raise tests.KnownFailure('failed')
 
995
        test = unittest.TestSuite()
 
996
        test.addTest(unittest.FunctionTestCase(known_failure_test))
 
997
        def failing_test():
 
998
            raise AssertionError('foo')
 
999
        test.addTest(unittest.FunctionTestCase(failing_test))
 
1000
        stream = StringIO()
 
1001
        runner = tests.TextTestRunner(stream=stream)
 
1002
        result = self.run_test_runner(runner, test)
 
1003
        lines = stream.getvalue().splitlines()
 
1004
        self.assertContainsRe(stream.getvalue(),
 
1005
            '(?sm)^testing.*$'
 
1006
            '.*'
 
1007
            '^======================================================================\n'
 
1008
            '^FAIL: unittest.FunctionTestCase \\(failing_test\\)\n'
 
1009
            '^----------------------------------------------------------------------\n'
 
1010
            'Traceback \\(most recent call last\\):\n'
 
1011
            '  .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
 
1012
            '    raise AssertionError\\(\'foo\'\\)\n'
 
1013
            '.*'
 
1014
            '^----------------------------------------------------------------------\n'
 
1015
            '.*'
 
1016
            'FAILED \\(failures=1, known_failure_count=1\\)'
 
1017
            )
 
1018
 
 
1019
    def test_known_failure_ok_run(self):
 
1020
        # run a test that generates a known failure which should be printed in the final output.
 
1021
        def known_failure_test():
 
1022
            raise tests.KnownFailure('failed')
 
1023
        test = unittest.FunctionTestCase(known_failure_test)
 
1024
        stream = StringIO()
 
1025
        runner = tests.TextTestRunner(stream=stream)
 
1026
        result = self.run_test_runner(runner, test)
 
1027
        self.assertContainsRe(stream.getvalue(),
 
1028
            '\n'
 
1029
            '-*\n'
 
1030
            'Ran 1 test in .*\n'
 
1031
            '\n'
 
1032
            'OK \\(known_failures=1\\)\n')
 
1033
 
 
1034
    def test_skipped_test(self):
 
1035
        # run a test that is skipped, and check the suite as a whole still
 
1036
        # succeeds.
 
1037
        # skipping_test must be hidden in here so it's not run as a real test
 
1038
        class SkippingTest(tests.TestCase):
 
1039
            def skipping_test(self):
 
1040
                raise tests.TestSkipped('test intentionally skipped')
 
1041
        runner = tests.TextTestRunner(stream=self._log_file)
 
1042
        test = SkippingTest("skipping_test")
 
1043
        result = self.run_test_runner(runner, test)
 
1044
        self.assertTrue(result.wasSuccessful())
 
1045
 
 
1046
    def test_skipped_from_setup(self):
 
1047
        calls = []
 
1048
        class SkippedSetupTest(tests.TestCase):
 
1049
 
 
1050
            def setUp(self):
 
1051
                calls.append('setUp')
 
1052
                self.addCleanup(self.cleanup)
 
1053
                raise tests.TestSkipped('skipped setup')
 
1054
 
 
1055
            def test_skip(self):
 
1056
                self.fail('test reached')
 
1057
 
 
1058
            def cleanup(self):
 
1059
                calls.append('cleanup')
 
1060
 
 
1061
        runner = tests.TextTestRunner(stream=self._log_file)
 
1062
        test = SkippedSetupTest('test_skip')
 
1063
        result = self.run_test_runner(runner, test)
 
1064
        self.assertTrue(result.wasSuccessful())
 
1065
        # Check if cleanup was called the right number of times.
 
1066
        self.assertEqual(['setUp', 'cleanup'], calls)
 
1067
 
 
1068
    def test_skipped_from_test(self):
 
1069
        calls = []
 
1070
        class SkippedTest(tests.TestCase):
 
1071
 
 
1072
            def setUp(self):
 
1073
                tests.TestCase.setUp(self)
 
1074
                calls.append('setUp')
 
1075
                self.addCleanup(self.cleanup)
 
1076
 
 
1077
            def test_skip(self):
 
1078
                raise tests.TestSkipped('skipped test')
 
1079
 
 
1080
            def cleanup(self):
 
1081
                calls.append('cleanup')
 
1082
 
 
1083
        runner = tests.TextTestRunner(stream=self._log_file)
 
1084
        test = SkippedTest('test_skip')
 
1085
        result = self.run_test_runner(runner, test)
 
1086
        self.assertTrue(result.wasSuccessful())
 
1087
        # Check if cleanup was called the right number of times.
 
1088
        self.assertEqual(['setUp', 'cleanup'], calls)
 
1089
 
 
1090
    def test_not_applicable(self):
 
1091
        # run a test that is skipped because it's not applicable
 
1092
        def not_applicable_test():
 
1093
            raise tests.TestNotApplicable('this test never runs')
 
1094
        out = StringIO()
 
1095
        runner = tests.TextTestRunner(stream=out, verbosity=2)
 
1096
        test = unittest.FunctionTestCase(not_applicable_test)
 
1097
        result = self.run_test_runner(runner, test)
 
1098
        self._log_file.write(out.getvalue())
 
1099
        self.assertTrue(result.wasSuccessful())
 
1100
        self.assertTrue(result.wasStrictlySuccessful())
 
1101
        self.assertContainsRe(out.getvalue(),
 
1102
                r'(?m)not_applicable_test   * N/A')
 
1103
        self.assertContainsRe(out.getvalue(),
 
1104
                r'(?m)^    this test never runs')
 
1105
 
 
1106
    def test_not_applicable_demo(self):
 
1107
        # just so you can see it in the test output
 
1108
        raise tests.TestNotApplicable('this test is just a demonstation')
 
1109
 
 
1110
    def test_unsupported_features_listed(self):
 
1111
        """When unsupported features are encountered they are detailed."""
 
1112
        class Feature1(tests.Feature):
 
1113
            def _probe(self): return False
 
1114
        class Feature2(tests.Feature):
 
1115
            def _probe(self): return False
 
1116
        # create sample tests
 
1117
        test1 = SampleTestCase('_test_pass')
 
1118
        test1._test_needs_features = [Feature1()]
 
1119
        test2 = SampleTestCase('_test_pass')
 
1120
        test2._test_needs_features = [Feature2()]
 
1121
        test = unittest.TestSuite()
 
1122
        test.addTest(test1)
 
1123
        test.addTest(test2)
 
1124
        stream = StringIO()
 
1125
        runner = tests.TextTestRunner(stream=stream)
 
1126
        result = self.run_test_runner(runner, test)
 
1127
        lines = stream.getvalue().splitlines()
 
1128
        self.assertEqual([
 
1129
            'OK',
 
1130
            "Missing feature 'Feature1' skipped 1 tests.",
 
1131
            "Missing feature 'Feature2' skipped 1 tests.",
 
1132
            ],
 
1133
            lines[-3:])
 
1134
 
 
1135
    def test_bench_history(self):
 
1136
        # tests that the running the benchmark produces a history file
 
1137
        # containing a timestamp and the revision id of the bzrlib source which
 
1138
        # was tested.
 
1139
        workingtree = _get_bzr_source_tree()
 
1140
        test = TestRunner('dummy_test')
 
1141
        output = StringIO()
 
1142
        runner = tests.TextTestRunner(stream=self._log_file,
 
1143
                                      bench_history=output)
 
1144
        result = self.run_test_runner(runner, test)
 
1145
        output_string = output.getvalue()
 
1146
        self.assertContainsRe(output_string, "--date [0-9.]+")
 
1147
        if workingtree is not None:
 
1148
            revision_id = workingtree.get_parent_ids()[0]
 
1149
            self.assertEndsWith(output_string.rstrip(), revision_id)
 
1150
 
 
1151
    def assertLogDeleted(self, test):
 
1152
        log = test._get_log()
 
1153
        self.assertEqual("DELETED log file to reduce memory footprint", log)
 
1154
        self.assertEqual('', test._log_contents)
 
1155
        self.assertIs(None, test._log_file_name)
 
1156
 
 
1157
    def test_success_log_deleted(self):
 
1158
        """Successful tests have their log deleted"""
 
1159
 
 
1160
        class LogTester(tests.TestCase):
 
1161
 
 
1162
            def test_success(self):
 
1163
                self.log('this will be removed\n')
 
1164
 
 
1165
        sio = StringIO()
 
1166
        runner = tests.TextTestRunner(stream=sio)
 
1167
        test = LogTester('test_success')
 
1168
        result = self.run_test_runner(runner, test)
 
1169
 
 
1170
        self.assertLogDeleted(test)
 
1171
 
 
1172
    def test_skipped_log_deleted(self):
 
1173
        """Skipped tests have their log deleted"""
 
1174
 
 
1175
        class LogTester(tests.TestCase):
 
1176
 
 
1177
            def test_skipped(self):
 
1178
                self.log('this will be removed\n')
 
1179
                raise tests.TestSkipped()
 
1180
 
 
1181
        sio = StringIO()
 
1182
        runner = tests.TextTestRunner(stream=sio)
 
1183
        test = LogTester('test_skipped')
 
1184
        result = self.run_test_runner(runner, test)
 
1185
 
 
1186
        self.assertLogDeleted(test)
 
1187
 
 
1188
    def test_not_aplicable_log_deleted(self):
 
1189
        """Not applicable tests have their log deleted"""
 
1190
 
 
1191
        class LogTester(tests.TestCase):
 
1192
 
 
1193
            def test_not_applicable(self):
 
1194
                self.log('this will be removed\n')
 
1195
                raise tests.TestNotApplicable()
 
1196
 
 
1197
        sio = StringIO()
 
1198
        runner = tests.TextTestRunner(stream=sio)
 
1199
        test = LogTester('test_not_applicable')
 
1200
        result = self.run_test_runner(runner, test)
 
1201
 
 
1202
        self.assertLogDeleted(test)
 
1203
 
 
1204
    def test_known_failure_log_deleted(self):
 
1205
        """Know failure tests have their log deleted"""
 
1206
 
 
1207
        class LogTester(tests.TestCase):
 
1208
 
 
1209
            def test_known_failure(self):
 
1210
                self.log('this will be removed\n')
 
1211
                raise tests.KnownFailure()
 
1212
 
 
1213
        sio = StringIO()
 
1214
        runner = tests.TextTestRunner(stream=sio)
 
1215
        test = LogTester('test_known_failure')
 
1216
        result = self.run_test_runner(runner, test)
 
1217
 
 
1218
        self.assertLogDeleted(test)
 
1219
 
 
1220
    def test_fail_log_kept(self):
 
1221
        """Failed tests have their log kept"""
 
1222
 
 
1223
        class LogTester(tests.TestCase):
 
1224
 
 
1225
            def test_fail(self):
 
1226
                self.log('this will be kept\n')
 
1227
                self.fail('this test fails')
 
1228
 
 
1229
        sio = StringIO()
 
1230
        runner = tests.TextTestRunner(stream=sio)
 
1231
        test = LogTester('test_fail')
 
1232
        result = self.run_test_runner(runner, test)
 
1233
 
 
1234
        text = sio.getvalue()
 
1235
        self.assertContainsRe(text, 'this will be kept')
 
1236
        self.assertContainsRe(text, 'this test fails')
 
1237
 
 
1238
        log = test._get_log()
 
1239
        self.assertContainsRe(log, 'this will be kept')
 
1240
        self.assertEqual(log, test._log_contents)
 
1241
 
 
1242
    def test_error_log_kept(self):
 
1243
        """Tests with errors have their log kept"""
 
1244
 
 
1245
        class LogTester(tests.TestCase):
 
1246
 
 
1247
            def test_error(self):
 
1248
                self.log('this will be kept\n')
 
1249
                raise ValueError('random exception raised')
 
1250
 
 
1251
        sio = StringIO()
 
1252
        runner = tests.TextTestRunner(stream=sio)
 
1253
        test = LogTester('test_error')
 
1254
        result = self.run_test_runner(runner, test)
 
1255
 
 
1256
        text = sio.getvalue()
 
1257
        self.assertContainsRe(text, 'this will be kept')
 
1258
        self.assertContainsRe(text, 'random exception raised')
 
1259
 
 
1260
        log = test._get_log()
 
1261
        self.assertContainsRe(log, 'this will be kept')
 
1262
        self.assertEqual(log, test._log_contents)
 
1263
 
 
1264
 
 
1265
class SampleTestCase(tests.TestCase):
    """Minimal test case that other tests in this module instantiate."""

    def _test_pass(self):
        # Deliberately empty; callers construct SampleTestCase('_test_pass')
        # when they need a test whose body does nothing.
        pass
 
1269
 
 
1270
class _TestException(Exception):
    """Private exception type defined for use by tests in this module."""
    pass
 
1272
 
 
1273
 
 
1274
class TestTestCase(tests.TestCase):
 
1275
    """Tests that test the core bzrlib TestCase."""
 
1276
 
 
1277
    def test_assertLength_matches_empty(self):
 
1278
        a_list = []
 
1279
        self.assertLength(0, a_list)
 
1280
 
 
1281
    def test_assertLength_matches_nonempty(self):
 
1282
        a_list = [1, 2, 3]
 
1283
        self.assertLength(3, a_list)
 
1284
 
 
1285
    def test_assertLength_fails_different(self):
 
1286
        a_list = []
 
1287
        self.assertRaises(AssertionError, self.assertLength, 1, a_list)
 
1288
 
 
1289
    def test_assertLength_shows_sequence_in_failure(self):
 
1290
        a_list = [1, 2, 3]
 
1291
        exception = self.assertRaises(AssertionError, self.assertLength, 2,
 
1292
            a_list)
 
1293
        self.assertEqual('Incorrect length: wanted 2, got 3 for [1, 2, 3]',
 
1294
            exception.args[0])
 
1295
 
 
1296
    def test_base_setUp_not_called_causes_failure(self):
 
1297
        class TestCaseWithBrokenSetUp(tests.TestCase):
 
1298
            def setUp(self):
 
1299
                pass # does not call TestCase.setUp
 
1300
            def test_foo(self):
 
1301
                pass
 
1302
        test = TestCaseWithBrokenSetUp('test_foo')
 
1303
        result = unittest.TestResult()
 
1304
        test.run(result)
 
1305
        self.assertFalse(result.wasSuccessful())
 
1306
        self.assertEqual(1, result.testsRun)
 
1307
 
 
1308
    def test_base_tearDown_not_called_causes_failure(self):
 
1309
        class TestCaseWithBrokenTearDown(tests.TestCase):
 
1310
            def tearDown(self):
 
1311
                pass # does not call TestCase.tearDown
 
1312
            def test_foo(self):
 
1313
                pass
 
1314
        test = TestCaseWithBrokenTearDown('test_foo')
 
1315
        result = unittest.TestResult()
 
1316
        test.run(result)
 
1317
        self.assertFalse(result.wasSuccessful())
 
1318
        self.assertEqual(1, result.testsRun)
 
1319
 
 
1320
    def test_debug_flags_sanitised(self):
 
1321
        """The bzrlib debug flags should be sanitised by setUp."""
 
1322
        if 'allow_debug' in tests.selftest_debug_flags:
 
1323
            raise tests.TestNotApplicable(
 
1324
                '-Eallow_debug option prevents debug flag sanitisation')
 
1325
        # we could set something and run a test that will check
 
1326
        # it gets santised, but this is probably sufficient for now:
 
1327
        # if someone runs the test with -Dsomething it will error.
 
1328
        flags = set()
 
1329
        if self._lock_check_thorough:
 
1330
            flags.add('strict_locks')
 
1331
        self.assertEqual(flags, bzrlib.debug.debug_flags)
 
1332
 
 
1333
    def change_selftest_debug_flags(self, new_flags):
 
1334
        orig_selftest_flags = tests.selftest_debug_flags
 
1335
        self.addCleanup(self._restore_selftest_debug_flags, orig_selftest_flags)
 
1336
        tests.selftest_debug_flags = set(new_flags)
 
1337
 
 
1338
    def _restore_selftest_debug_flags(self, flags):
        """Assign flags back to tests.selftest_debug_flags.

        :param flags: The value to store in tests.selftest_debug_flags.
        """
        tests.selftest_debug_flags = flags
 
1340
 
 
1341
    def test_allow_debug_flag(self):
 
1342
        """The -Eallow_debug flag prevents bzrlib.debug.debug_flags from being
 
1343
        sanitised (i.e. cleared) before running a test.
 
1344
        """
 
1345
        self.change_selftest_debug_flags(set(['allow_debug']))
 
1346
        bzrlib.debug.debug_flags = set(['a-flag'])
 
1347
        class TestThatRecordsFlags(tests.TestCase):
 
1348
            def test_foo(nested_self):
 
1349
                self.flags = set(bzrlib.debug.debug_flags)
 
1350
        test = TestThatRecordsFlags('test_foo')
 
1351
        test.run(self.make_test_result())
 
1352
        flags = set(['a-flag'])
 
1353
        if 'disable_lock_checks' not in tests.selftest_debug_flags:
 
1354
            flags.add('strict_locks')
 
1355
        self.assertEqual(flags, self.flags)
 
1356
 
 
1357
    def test_disable_lock_checks(self):
 
1358
        """The -Edisable_lock_checks flag disables thorough checks."""
 
1359
        class TestThatRecordsFlags(tests.TestCase):
 
1360
            def test_foo(nested_self):
 
1361
                self.flags = set(bzrlib.debug.debug_flags)
 
1362
                self.test_lock_check_thorough = nested_self._lock_check_thorough
 
1363
        self.change_selftest_debug_flags(set())
 
1364
        test = TestThatRecordsFlags('test_foo')
 
1365
        test.run(self.make_test_result())
 
1366
        # By default we do strict lock checking and thorough lock/unlock
 
1367
        # tracking.
 
1368
        self.assertTrue(self.test_lock_check_thorough)
 
1369
        self.assertEqual(set(['strict_locks']), self.flags)
 
1370
        # Now set the disable_lock_checks flag, and show that this changed.
 
1371
        self.change_selftest_debug_flags(set(['disable_lock_checks']))
 
1372
        test = TestThatRecordsFlags('test_foo')
 
1373
        test.run(self.make_test_result())
 
1374
        self.assertFalse(self.test_lock_check_thorough)
 
1375
        self.assertEqual(set(), self.flags)
 
1376
 
 
1377
    def test_this_fails_strict_lock_check(self):
 
1378
        class TestThatRecordsFlags(tests.TestCase):
 
1379
            def test_foo(nested_self):
 
1380
                self.flags1 = set(bzrlib.debug.debug_flags)
 
1381
                self.thisFailsStrictLockCheck()
 
1382
                self.flags2 = set(bzrlib.debug.debug_flags)
 
1383
        # Make sure lock checking is active
 
1384
        self.change_selftest_debug_flags(set())
 
1385
        test = TestThatRecordsFlags('test_foo')
 
1386
        test.run(self.make_test_result())
 
1387
        self.assertEqual(set(['strict_locks']), self.flags1)
 
1388
        self.assertEqual(set(), self.flags2)
 
1389
 
 
1390
    def test_debug_flags_restored(self):
 
1391
        """The bzrlib debug flags should be restored to their original state
 
1392
        after the test was run, even if allow_debug is set.
 
1393
        """
 
1394
        self.change_selftest_debug_flags(set(['allow_debug']))
 
1395
        # Now run a test that modifies debug.debug_flags.
 
1396
        bzrlib.debug.debug_flags = set(['original-state'])
 
1397
        class TestThatModifiesFlags(tests.TestCase):
 
1398
            def test_foo(self):
 
1399
                bzrlib.debug.debug_flags = set(['modified'])
 
1400
        test = TestThatModifiesFlags('test_foo')
 
1401
        test.run(self.make_test_result())
 
1402
        self.assertEqual(set(['original-state']), bzrlib.debug.debug_flags)
 
1403
 
 
1404
    def make_test_result(self):
 
1405
        return tests.TextTestResult(self._log_file, descriptions=0, verbosity=1)
 
1406
 
 
1407
    def inner_test(self):
 
1408
        # the inner child test
 
1409
        note("inner_test")
 
1410
 
 
1411
    def outer_child(self):
 
1412
        # the outer child test
 
1413
        note("outer_start")
 
1414
        self.inner_test = TestTestCase("inner_child")
 
1415
        result = self.make_test_result()
 
1416
        self.inner_test.run(result)
 
1417
        note("outer finish")
 
1418
 
 
1419
    def test_trace_nesting(self):
 
1420
        # this tests that each test case nests its trace facility correctly.
 
1421
        # we do this by running a test case manually. That test case (A)
 
1422
        # should setup a new log, log content to it, setup a child case (B),
 
1423
        # which should log independently, then case (A) should log a trailer
 
1424
        # and return.
 
1425
        # we do two nested children so that we can verify the state of the
 
1426
        # logs after the outer child finishes is correct, which a bad clean
 
1427
        # up routine in tearDown might trigger a fault in our test with only
 
1428
        # one child, we should instead see the bad result inside our test with
 
1429
        # the two children.
 
1430
        # the outer child test
 
1431
        original_trace = bzrlib.trace._trace_file
 
1432
        outer_test = TestTestCase("outer_child")
 
1433
        result = self.make_test_result()
 
1434
        outer_test.run(result)
 
1435
        self.assertEqual(original_trace, bzrlib.trace._trace_file)
 
1436
 
 
1437
    def method_that_times_a_bit_twice(self):
 
1438
        # call self.time twice to ensure it aggregates
 
1439
        self.time(time.sleep, 0.007)
 
1440
        self.time(time.sleep, 0.007)
 
1441
 
 
1442
    def test_time_creates_benchmark_in_result(self):
 
1443
        """Test that the TestCase.time() method accumulates a benchmark time."""
 
1444
        sample_test = TestTestCase("method_that_times_a_bit_twice")
 
1445
        output_stream = StringIO()
 
1446
        result = bzrlib.tests.VerboseTestResult(
 
1447
            unittest._WritelnDecorator(output_stream),
 
1448
            descriptions=0,
 
1449
            verbosity=2)
 
1450
        sample_test.run(result)
 
1451
        self.assertContainsRe(
 
1452
            output_stream.getvalue(),
 
1453
            r"\d+ms\*\n$")
 
1454
 
 
1455
    def test_hooks_sanitised(self):
 
1456
        """The bzrlib hooks should be sanitised by setUp."""
 
1457
        # Note this test won't fail with hooks that the core library doesn't
 
1458
        # use - but it trigger with a plugin that adds hooks, so its still a
 
1459
        # useful warning in that case.
 
1460
        self.assertEqual(bzrlib.branch.BranchHooks(),
 
1461
            bzrlib.branch.Branch.hooks)
 
1462
        self.assertEqual(bzrlib.smart.server.SmartServerHooks(),
 
1463
            bzrlib.smart.server.SmartTCPServer.hooks)
 
1464
        self.assertEqual(bzrlib.commands.CommandHooks(),
 
1465
            bzrlib.commands.Command.hooks)
 
1466
 
 
1467
    def test__gather_lsprof_in_benchmarks(self):
 
1468
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
 
1469
 
 
1470
        Each self.time() call is individually and separately profiled.
 
1471
        """
 
1472
        self.requireFeature(test_lsprof.LSProfFeature)
 
1473
        # overrides the class member with an instance member so no cleanup
 
1474
        # needed.
 
1475
        self._gather_lsprof_in_benchmarks = True
 
1476
        self.time(time.sleep, 0.000)
 
1477
        self.time(time.sleep, 0.003)
 
1478
        self.assertEqual(2, len(self._benchcalls))
 
1479
        self.assertEqual((time.sleep, (0.000,), {}), self._benchcalls[0][0])
 
1480
        self.assertEqual((time.sleep, (0.003,), {}), self._benchcalls[1][0])
 
1481
        self.assertIsInstance(self._benchcalls[0][1], bzrlib.lsprof.Stats)
 
1482
        self.assertIsInstance(self._benchcalls[1][1], bzrlib.lsprof.Stats)
 
1483
 
 
1484
    def test_knownFailure(self):
 
1485
        """Self.knownFailure() should raise a KnownFailure exception."""
 
1486
        self.assertRaises(tests.KnownFailure, self.knownFailure, "A Failure")
 
1487
 
 
1488
    def test_requireFeature_available(self):
 
1489
        """self.requireFeature(available) is a no-op."""
 
1490
        class Available(tests.Feature):
 
1491
            def _probe(self):return True
 
1492
        feature = Available()
 
1493
        self.requireFeature(feature)
 
1494
 
 
1495
    def test_requireFeature_unavailable(self):
 
1496
        """self.requireFeature(unavailable) raises UnavailableFeature."""
 
1497
        class Unavailable(tests.Feature):
 
1498
            def _probe(self):return False
 
1499
        feature = Unavailable()
 
1500
        self.assertRaises(tests.UnavailableFeature,
 
1501
                          self.requireFeature, feature)
 
1502
 
 
1503
    def test_run_no_parameters(self):
 
1504
        test = SampleTestCase('_test_pass')
 
1505
        test.run()
 
1506
 
 
1507
    def test_run_enabled_unittest_result(self):
 
1508
        """Test we revert to regular behaviour when the test is enabled."""
 
1509
        test = SampleTestCase('_test_pass')
 
1510
        class EnabledFeature(object):
 
1511
            def available(self):
 
1512
                return True
 
1513
        test._test_needs_features = [EnabledFeature()]
 
1514
        result = unittest.TestResult()
 
1515
        test.run(result)
 
1516
        self.assertEqual(1, result.testsRun)
 
1517
        self.assertEqual([], result.errors)
 
1518
        self.assertEqual([], result.failures)
 
1519
 
 
1520
    def test_run_disabled_unittest_result(self):
 
1521
        """Test our compatability for disabled tests with unittest results."""
 
1522
        test = SampleTestCase('_test_pass')
 
1523
        class DisabledFeature(object):
 
1524
            def available(self):
 
1525
                return False
 
1526
        test._test_needs_features = [DisabledFeature()]
 
1527
        result = unittest.TestResult()
 
1528
        test.run(result)
 
1529
        self.assertEqual(1, result.testsRun)
 
1530
        self.assertEqual([], result.errors)
 
1531
        self.assertEqual([], result.failures)
 
1532
 
 
1533
    def test_run_disabled_supporting_result(self):
 
1534
        """Test disabled tests behaviour with support aware results."""
 
1535
        test = SampleTestCase('_test_pass')
 
1536
        class DisabledFeature(object):
 
1537
            def available(self):
 
1538
                return False
 
1539
        the_feature = DisabledFeature()
 
1540
        test._test_needs_features = [the_feature]
 
1541
        class InstrumentedTestResult(unittest.TestResult):
 
1542
            def __init__(self):
 
1543
                unittest.TestResult.__init__(self)
 
1544
                self.calls = []
 
1545
            def startTest(self, test):
 
1546
                self.calls.append(('startTest', test))
 
1547
            def stopTest(self, test):
 
1548
                self.calls.append(('stopTest', test))
 
1549
            def addNotSupported(self, test, feature):
 
1550
                self.calls.append(('addNotSupported', test, feature))
 
1551
        result = InstrumentedTestResult()
 
1552
        test.run(result)
 
1553
        self.assertEqual([
 
1554
            ('startTest', test),
 
1555
            ('addNotSupported', test, the_feature),
 
1556
            ('stopTest', test),
 
1557
            ],
 
1558
            result.calls)
 
1559
 
 
1560
    def test_assert_list_raises_on_generator(self):
 
1561
        def generator_which_will_raise():
 
1562
            # This will not raise until after the first yield
 
1563
            yield 1
 
1564
            raise _TestException()
 
1565
 
 
1566
        e = self.assertListRaises(_TestException, generator_which_will_raise)
 
1567
        self.assertIsInstance(e, _TestException)
 
1568
 
 
1569
        e = self.assertListRaises(Exception, generator_which_will_raise)
 
1570
        self.assertIsInstance(e, _TestException)
 
1571
 
 
1572
    def test_assert_list_raises_on_plain(self):
 
1573
        def plain_exception():
 
1574
            raise _TestException()
 
1575
            return []
 
1576
 
 
1577
        e = self.assertListRaises(_TestException, plain_exception)
 
1578
        self.assertIsInstance(e, _TestException)
 
1579
 
 
1580
        e = self.assertListRaises(Exception, plain_exception)
 
1581
        self.assertIsInstance(e, _TestException)
 
1582
 
 
1583
    def test_assert_list_raises_assert_wrong_exception(self):
 
1584
        class _NotTestException(Exception):
 
1585
            pass
 
1586
 
 
1587
        def wrong_exception():
 
1588
            raise _NotTestException()
 
1589
 
 
1590
        def wrong_exception_generator():
 
1591
            yield 1
 
1592
            yield 2
 
1593
            raise _NotTestException()
 
1594
 
 
1595
        # Wrong exceptions are not intercepted
 
1596
        self.assertRaises(_NotTestException,
 
1597
            self.assertListRaises, _TestException, wrong_exception)
 
1598
        self.assertRaises(_NotTestException,
 
1599
            self.assertListRaises, _TestException, wrong_exception_generator)
 
1600
 
 
1601
    def test_assert_list_raises_no_exception(self):
 
1602
        def success():
 
1603
            return []
 
1604
 
 
1605
        def success_generator():
 
1606
            yield 1
 
1607
            yield 2
 
1608
 
 
1609
        self.assertRaises(AssertionError,
 
1610
            self.assertListRaises, _TestException, success)
 
1611
 
 
1612
        self.assertRaises(AssertionError,
 
1613
            self.assertListRaises, _TestException, success_generator)
 
1614
 
 
1615
 
 
1616
# NB: Don't delete this; it's not actually from 0.11!
 
1617
@deprecated_function(deprecated_in((0, 11, 0)))
def sample_deprecated_function():
    """Return 2; a deprecated function to test applyDeprecated with."""
    return 2
 
1621
 
 
1622
 
 
1623
def sample_undeprecated_function(a_param):
    """An undeprecated function to test applyDeprecated with.

    :param a_param: accepted and ignored.
    :return: None.
    """
    return None
 
1625
 
 
1626
 
 
1627
class ApplyDeprecatedHelper(object):
    """Sample deprecated and undeprecated methods for applyDeprecated tests."""

    @deprecated_method(deprecated_in((0, 11, 0)))
    def sample_deprecated_method(self, param_one):
        """Return param_one unchanged; deprecated for testing purposes."""
        return param_one

    def sample_normal_method(self):
        """An undeprecated method that does nothing."""

    @deprecated_method(deprecated_in((0, 10, 0)))
    def sample_nested_deprecation(self):
        """Deprecated method that calls the deprecated sample function."""
        nested_result = sample_deprecated_function()
        return nested_result
 
1641
 
 
1642
 
 
1643
class TestExtraAssertions(tests.TestCase):
    """Tests for new test assertions in bzrlib test suite"""

    def test_assert_isinstance(self):
        """assertIsInstance passes for instances and reports mismatches."""
        base_message = (
            "None is an instance of <type 'NoneType'> rather than <type 'int'>")
        self.assertIsInstance(2, int)
        self.assertIsInstance(u'', basestring)
        plain_error = self.assertRaises(
            AssertionError, self.assertIsInstance, None, int)
        self.assertEquals(str(plain_error), base_message)
        self.assertRaises(AssertionError, self.assertIsInstance, 23.3, int)
        # A caller-supplied message is appended after a colon.
        annotated_error = self.assertRaises(AssertionError,
            self.assertIsInstance, None, int, "it's just not")
        self.assertEquals(str(annotated_error),
            base_message + ": it's just not")

    def test_assertEndsWith(self):
        """assertEndsWith accepts a real suffix and rejects a longer one."""
        self.assertEndsWith('foo', 'oo')
        self.assertRaises(AssertionError, self.assertEndsWith, 'o', 'oo')

    def test_applyDeprecated_not_deprecated(self):
        """applyDeprecated checks that the expected deprecation is emitted."""
        sample_object = ApplyDeprecatedHelper()
        in_0_10 = deprecated_in((0, 10, 0))
        in_0_11 = deprecated_in((0, 11, 0))
        # calling an undeprecated callable raises an assertion
        self.assertRaises(AssertionError, self.applyDeprecated,
            in_0_11, sample_object.sample_normal_method)
        self.assertRaises(AssertionError, self.applyDeprecated,
            in_0_11, sample_undeprecated_function, "a param value")
        # calling a deprecated callable (function or method) with the wrong
        # expected deprecation fails.
        self.assertRaises(AssertionError, self.applyDeprecated,
            in_0_10, sample_object.sample_deprecated_method, "a param value")
        self.assertRaises(AssertionError, self.applyDeprecated,
            in_0_10, sample_deprecated_function)
        # calling a deprecated callable (function or method) with the right
        # expected deprecation returns the function's result.
        method_result = self.applyDeprecated(in_0_11,
            sample_object.sample_deprecated_method, "a param value")
        self.assertEqual("a param value", method_result)
        function_result = self.applyDeprecated(
            in_0_11, sample_deprecated_function)
        self.assertEqual(2, function_result)
        # calling a nested deprecation with the wrong deprecation version
        # fails even if a deeper nested function was deprecated with the
        # supplied version.
        self.assertRaises(AssertionError, self.applyDeprecated,
            in_0_11, sample_object.sample_nested_deprecation)
        # calling a nested deprecation with the right deprecation value
        # returns the call's result.
        nested_result = self.applyDeprecated(
            in_0_10, sample_object.sample_nested_deprecation)
        self.assertEqual(2, nested_result)

    def test_callDeprecated(self):
        """callDeprecated checks the warnings and returns the call's result."""

        def testfunc(be_deprecated, result=None):
            if be_deprecated is True:
                symbol_versioning.warn('i am deprecated', DeprecationWarning,
                                       stacklevel=1)
            return result

        outcome = self.callDeprecated(['i am deprecated'], testfunc, True)
        self.assertIs(None, outcome)
        outcome = self.callDeprecated([], testfunc, False, 'result')
        self.assertEqual('result', outcome)
        # Keyword arguments are passed through to the callable as well.
        self.callDeprecated(['i am deprecated'], testfunc, be_deprecated=True)
        self.callDeprecated([], testfunc, be_deprecated=False)
 
1709
 
 
1710
 
 
1711
class TestWarningTests(tests.TestCase):
    """Tests for calling methods that raise warnings."""

    def test_callCatchWarnings(self):
        """callCatchWarnings returns the warnings list and the call result."""

        def meth(a, b):
            warnings.warn("this is your last warning")
            return a + b

        caught_warnings, total = self.callCatchWarnings(meth, 1, 2)
        self.assertEquals(3, total)
        # would like just to compare them, but UserWarning doesn't implement
        # eq well
        (only_warning,) = caught_warnings
        self.assertIsInstance(only_warning, UserWarning)
        self.assertEquals("this is your last warning", str(only_warning))
 
1725
 
 
1726
 
 
1727
class TestConvenienceMakers(tests.TestCaseWithTransport):
    """Test for the make_* convenience functions."""

    def test_make_branch_and_tree_with_format(self):
        """make_branch_and_tree honours an explicitly supplied format."""
        bzrdir_module = bzrlib.bzrdir
        # we should be able to supply a format to make_branch_and_tree
        self.make_branch_and_tree(
            'a', format=bzrdir_module.BzrDirMetaFormat1())
        self.make_branch_and_tree('b', format=bzrdir_module.BzrDirFormat6())
        format_of_a = bzrdir_module.BzrDir.open('a')._format
        self.assertIsInstance(format_of_a, bzrdir_module.BzrDirMetaFormat1)
        format_of_b = bzrdir_module.BzrDir.open('b')._format
        self.assertIsInstance(format_of_b, bzrdir_module.BzrDirFormat6)

    def test_make_branch_and_memory_tree(self):
        """make_branch_and_memory_tree returns a MemoryTree."""
        # we should be able to get a new branch and a mutable tree from
        # TestCaseWithTransport
        memory_tree = self.make_branch_and_memory_tree('a')
        self.assertIsInstance(memory_tree, bzrlib.memorytree.MemoryTree)
 
1744
 
 
1745
 
 
1746
class TestSFTPMakeBranchAndTree(test_sftp_transport.TestCaseWithSFTPServer):
    """make_branch_and_tree when the test is served over SFTP."""

    def test_make_tree_for_sftp_branch(self):
        """Transports backed by local directories create local trees."""
        # NB: This is arguably a bug in the definition of make_branch_and_tree.
        tree = self.make_branch_and_tree('t1')
        base = tree.bzrdir.root_transport.base
        on_sftp = base.startswith('sftp')
        self.failIf(on_sftp,
                'base %r is on sftp but should be local' % base)
        # The tree, its branch and its repository share one root transport.
        branch_transport = tree.branch.bzrdir.root_transport
        self.assertEquals(tree.bzrdir.root_transport, branch_transport)
        repo_transport = tree.branch.repository.bzrdir.root_transport
        self.assertEquals(tree.bzrdir.root_transport, repo_transport)
 
1759
 
 
1760
 
 
1761
class SelfTestHelper:
    """Mixin providing run_selftest() for tests of tests.selftest."""

    def run_selftest(self, **kwargs):
        """Run selftest returning its output.

        :param kwargs: passed straight through to tests.selftest.
        :return: the StringIO that selftest wrote to, rewound to the start.

        bzrlib.tests.default_transport and
        TestCaseWithMemoryTransport.TEST_ROOT are put back afterwards, even
        if selftest raises or does not return True.
        """
        stream = StringIO()
        memory_case = tests.TestCaseWithMemoryTransport
        saved_transport = bzrlib.tests.default_transport
        saved_root = memory_case.TEST_ROOT
        memory_case.TEST_ROOT = None
        try:
            succeeded = tests.selftest(stream=stream, **kwargs)
            self.assertEqual(True, succeeded)
        finally:
            bzrlib.tests.default_transport = saved_transport
            memory_case.TEST_ROOT = saved_root
        stream.seek(0)
        return stream
 
1776
 
 
1777
 
 
1778
class TestSelftest(tests.TestCase, SelfTestHelper):
    """Tests of bzrlib.tests.selftest."""

    def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(self):
        """selftest calls the supplied test_suite_factory exactly once."""
        factory_called = []

        def factory():
            factory_called.append(True)
            return TestUtil.TestSuite()

        out = StringIO()
        err = StringIO()
        # NOTE(review): if apply_redirected takes (stdin, stdout, stderr,
        # a_callable, ...) as the override in TestRunBzrCaptured does, then
        # 'out' is bound to stdin and 'err' to stdout here -- confirm.
        self.apply_redirected(out, err, None, bzrlib.tests.selftest,
            test_suite_factory=factory)
        self.assertEqual([True], factory_called)

    def factory(self):
        """A test suite factory: returns a suite of three empty tests."""

        class Test(tests.TestCase):

            def a(self):
                pass

            def b(self):
                pass

            def c(self):
                pass

        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])

    def test_list_only(self):
        """list_only prints one line per test in the suite."""
        output = self.run_selftest(test_suite_factory=self.factory,
            list_only=True)
        self.assertEqual(3, len(output.readlines()))

    def test_list_only_filtered(self):
        """list_only combined with pattern lists only the matching test."""
        output = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, pattern="Test.b")
        self.assertEndsWith(output.getvalue(), "Test.b\n")
        self.assertLength(1, output.readlines())

    def test_list_only_excludes(self):
        """list_only combined with exclude_pattern omits the matching test."""
        output = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, exclude_pattern="Test.b")
        # The text being searched comes first and the pattern second, as in
        # the assertContainsRe calls elsewhere in this file.  The arguments
        # used to be swapped, which made this check pass unconditionally.
        self.assertNotContainsRe(output.getvalue(), "Test.b")
        self.assertLength(2, output.readlines())

    def test_random(self):
        """Listing with a random_seed prints a header plus the three tests."""
        # test randomising by listing a number of tests.
        output_123 = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, random_seed="123")
        output_234 = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, random_seed="234")
        # NOTE(review): this compares the two stream objects rather than
        # their contents, so it presumably always passes -- consider
        # comparing the listed test ids instead.
        self.assertNotEqual(output_123, output_234)
        # Five lines: two for the "Randomizing test order" header (per the
        # original comment), then one line per test.
        self.assertLength(5, output_123.readlines())
        self.assertLength(5, output_234.readlines())

    def test_random_reuse_is_same_order(self):
        """Listing twice with the same random_seed gives identical output."""
        # test randomising by listing a number of tests.
        expected = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, random_seed="123")
        repeated = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, random_seed="123")
        self.assertEqual(expected.getvalue(), repeated.getvalue())

    def test_runner_class(self):
        """The runner_class argument selects the runner used by selftest."""
        self.requireFeature(SubUnitFeature)
        from subunit import ProtocolTestCase
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
            test_suite_factory=self.factory)
        # Replay the captured stream; it must describe the three tests.
        test = ProtocolTestCase(stream)
        result = unittest.TestResult()
        test.run(result)
        self.assertEqual(3, result.testsRun)

    def test_starting_with_single_argument(self):
        """starting_with given one prefix lists only the matching test."""
        output = self.run_selftest(test_suite_factory=self.factory,
            starting_with=['bzrlib.tests.test_selftest.Test.a'],
            list_only=True)
        self.assertEqual('bzrlib.tests.test_selftest.Test.a\n',
            output.getvalue())

    def test_starting_with_multiple_argument(self):
        """starting_with given two prefixes lists both matching tests."""
        output = self.run_selftest(test_suite_factory=self.factory,
            starting_with=['bzrlib.tests.test_selftest.Test.a',
                'bzrlib.tests.test_selftest.Test.b'],
            list_only=True)
        self.assertEqual('bzrlib.tests.test_selftest.Test.a\n'
            'bzrlib.tests.test_selftest.Test.b\n',
            output.getvalue())

    def check_transport_set(self, transport_server):
        """Run selftest with transport_server and check tests observe it.

        :param transport_server: value passed as selftest's ``transport``; a
            nested test records bzrlib.tests.default_transport, which must
            equal this value.
        """
        captured_transport = []

        def seen_transport(a_transport):
            captured_transport.append(a_transport)

        class Capture(tests.TestCase):

            def a(self):
                seen_transport(bzrlib.tests.default_transport)

        def factory():
            return TestUtil.TestSuite([Capture("a")])

        self.run_selftest(transport=transport_server,
            test_suite_factory=factory)
        self.assertEqual(transport_server, captured_transport[0])

    def test_transport_sftp(self):
        """selftest's transport argument works with the SFTP server."""
        try:
            import bzrlib.transport.sftp
        except errors.ParamikoNotPresent:
            raise tests.TestSkipped("Paramiko not present")
        self.check_transport_set(bzrlib.transport.sftp.SFTPAbsoluteServer)

    def test_transport_memory(self):
        """selftest's transport argument works with the memory server."""
        self.check_transport_set(bzrlib.transport.memory.MemoryServer)
 
1886
 
 
1887
 
 
1888
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
    """Tests of selftest's load_list argument."""
    # Does IO: reads test.list

    def test_load_list(self):
        """load_list restricts the listing to the ids named in the file."""
        # Provide a list with one test - this test.
        test_id_line = '%s\n' % self.id()
        self.build_tree_contents([('test.list', test_id_line)])
        # And generate a list of the tests in the suite.
        stream = self.run_selftest(load_list='test.list', list_only=True)
        self.assertEqual(test_id_line, stream.getvalue())

    def test_load_unknown(self):
        """load_list naming a file that does not exist raises NoSuchFile."""
        # No list file is created here, so selftest cannot read one.
        self.assertRaises(errors.NoSuchFile, self.run_selftest,
            load_list='missing file name', list_only=True)
 
1904
 
 
1905
 
 
1906
class TestRunBzr(tests.TestCase):
    """Check the arguments run_bzr and run_bzr_error give to _run_bzr_core.

    _run_bzr_core is overridden to record its arguments and to return the
    canned ``out`` and ``err`` strings below, so bzr itself is never run.
    """

    # Canned output returned by the overridden _run_bzr_core; individual
    # tests shadow these on the instance.
    out = ''
    err = ''

    def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
                      working_dir=None):
        """Override _run_bzr_core to test how it is invoked by run_bzr.

        Attempts to run bzr from inside this class don't actually run it.

        We test how run_bzr actually invokes bzr in another location.
        Here we only need to test that run_bzr passes the right
        parameters to _run_bzr_core.

        :return: the (out, err) pair currently set on this test.
        """
        self.argv = list(argv)
        self.retcode = retcode
        self.encoding = encoding
        self.stdin = stdin
        self.working_dir = working_dir
        return self.out, self.err

    def test_run_bzr_error(self):
        """run_bzr_error forwards argv and retcode and returns the output."""
        self.out = "It sure does!\n"
        out, _unused_err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
        self.assertEqual(['rocks'], self.argv)
        self.assertEqual(34, self.retcode)
        self.assertEqual(out, 'It sure does!\n')

    def test_run_bzr_error_regexes(self):
        """run_bzr_error accepts stderr that matches the given regex."""
        self.out = ''
        self.err = "bzr: ERROR: foobarbaz is not versioned"
        # Nothing is asserted here directly; the test passes when the call
        # itself does not raise.
        self.run_bzr_error(
                ["bzr: ERROR: foobarbaz is not versioned"],
                ['file-id', 'foobarbaz'])

    def test_encoding(self):
        """Test that run_bzr passes encoding to _run_bzr_core"""
        self.run_bzr('foo bar')
        self.assertEqual(None, self.encoding)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr('foo bar', encoding='baz')
        self.assertEqual('baz', self.encoding)
        self.assertEqual(['foo', 'bar'], self.argv)

    def test_retcode(self):
        """Test that run_bzr passes retcode to _run_bzr_core"""
        # Default is retcode == 0
        self.run_bzr('foo bar')
        self.assertEqual(0, self.retcode)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr('foo bar', retcode=1)
        self.assertEqual(1, self.retcode)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr('foo bar', retcode=None)
        self.assertEqual(None, self.retcode)
        self.assertEqual(['foo', 'bar'], self.argv)

        # A list of arguments is accepted as well as a single string.
        self.run_bzr(['foo', 'bar'], retcode=3)
        self.assertEqual(3, self.retcode)
        self.assertEqual(['foo', 'bar'], self.argv)

    def test_stdin(self):
        """Test that run_bzr passes stdin to _run_bzr_core as-is"""
        # _run_bzr_core is overridden in this class, and run_bzr is a
        # convenience function for _run_bzr_core, so calling run_bzr should
        # invoke the override with the stdin keyword untouched.
        self.run_bzr('foo bar', stdin='gam')
        self.assertEqual('gam', self.stdin)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr('foo bar', stdin='zippy')
        self.assertEqual('zippy', self.stdin)
        self.assertEqual(['foo', 'bar'], self.argv)

    def test_working_dir(self):
        """Test that run_bzr passes working_dir to _run_bzr_core"""
        self.run_bzr('foo bar')
        self.assertEqual(None, self.working_dir)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr('foo bar', working_dir='baz')
        self.assertEqual('baz', self.working_dir)
        self.assertEqual(['foo', 'bar'], self.argv)

    def test_reject_extra_keyword_arguments(self):
        """run_bzr raises TypeError for a keyword it does not accept."""
        self.assertRaises(TypeError, self.run_bzr, "foo bar",
                          error_regex=['error message'])
 
1998
 
 
1999
 
 
2000
class TestRunBzrCaptured(tests.TestCaseWithTransport):
    """Check the context in which run_bzr invokes apply_redirected.

    apply_redirected is overridden here: instead of running anything it
    records the stdin object, the current UI factory and the current
    directory so that the tests can inspect them afterwards.
    """
    # Does IO when testing the working_dir parameter.

    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
                         a_callable=None, *args, **kwargs):
        """Record the call context, write fixed output and return 0."""
        active_factory = bzrlib.ui.ui_factory
        self.stdin = stdin
        self.factory_stdin = getattr(active_factory, "stdin", None)
        self.factory = active_factory
        self.working_dir = osutils.getcwd()
        stdout.write('foo\n')
        stderr.write('bar\n')
        return 0

    def test_stdin(self):
        # The stdin keyword must reach apply_redirected as a readable
        # object holding the given text, and that same object must be the
        # stdin of the UI factory in use during the call.
        for text in ('gam', 'zippy'):
            self.run_bzr(['foo', 'bar'], stdin=text)
            self.assertEqual(text, self.stdin.read())
            self.assertTrue(self.stdin is self.factory_stdin)

    def test_ui_factory(self):
        # each invocation of self.run_bzr should get its
        # own UI factory, which is an instance of TestUIFactory,
        # with stdin, stdout and stderr attached to the stdin,
        # stdout and stderr of the invoked run_bzr
        factory_before = bzrlib.ui.ui_factory
        self.run_bzr(['foo'])
        used_factory = self.factory
        self.assertFalse(factory_before is used_factory)
        self.assertNotEqual(sys.stdout, used_factory.stdout)
        self.assertNotEqual(sys.stderr, used_factory.stderr)
        self.assertEqual('foo\n', used_factory.stdout.getvalue())
        self.assertEqual('bar\n', used_factory.stderr.getvalue())
        self.assertIsInstance(used_factory, tests.TestUIFactory)

    def test_working_dir(self):
        self.build_tree(['one/', 'two/'])
        start_dir = osutils.getcwd()

        # Default is to work in the current directory
        self.run_bzr(['foo', 'bar'])
        self.assertEqual(start_dir, self.working_dir)

        self.run_bzr(['foo', 'bar'], working_dir=None)
        self.assertEqual(start_dir, self.working_dir)

        # The function should be run in the alternative directory
        # but afterwards the current working dir shouldn't be changed
        for subdir in ('one', 'two'):
            self.run_bzr(['foo', 'bar'], working_dir=subdir)
            self.assertNotEqual(start_dir, self.working_dir)
            self.assertEndsWith(self.working_dir, subdir)
            self.assertEqual(start_dir, osutils.getcwd())
 
2061
 
 
2062
 
 
2063
class StubProcess(object):
    """Stand-in for a process object when testing run_bzr_subprocess.

    It only carries canned output text, error text and a return code.
    """

    def __init__(self, out="", err="", retcode=0):
        """Store the canned output, error text and return code."""
        self.out, self.err, self.returncode = out, err, retcode

    def communicate(self):
        """Return the canned (out, err) pair."""
        return (self.out, self.err)
 
2073
 
 
2074
 
 
2075
class TestRunBzrSubprocess(tests.TestCaseWithTransport):
    """Check the arguments run_bzr_subprocess gives start_bzr_subprocess.

    start_bzr_subprocess is overridden with a stub that records its
    arguments and returns a canned process object.
    """

    def setUp(self):
        tests.TestCaseWithTransport.setUp(self)
        # One dict per start_bzr_subprocess call, most recent last.
        self.subprocess_calls = []

    def start_bzr_subprocess(self, process_args, env_changes=None,
                             skip_if_plan_to_signal=False,
                             working_dir=None,
                             allow_plugins=False):
        """capture what run_bzr_subprocess tries to do."""
        self.subprocess_calls.append({
            'process_args': process_args,
            'env_changes': env_changes,
            'skip_if_plan_to_signal': skip_if_plan_to_signal,
            'working_dir': working_dir,
            'allow_plugins': allow_plugins,
            })
        return self.next_subprocess

    def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
        """Run run_bzr_subprocess with args and kwargs using a stubbed process.

        Inside this class we use a stub start_bzr_subprocess that will return
        static results. This assertion method populates those results and
        also checks the arguments run_bzr_subprocess generates.

        :param expected_args: dict mapping start_bzr_subprocess argument
            names to the values the most recent call must have received.
        :param process: the object the stub start_bzr_subprocess returns.
        :return: whatever run_bzr_subprocess returns.
        """
        self.next_subprocess = process
        try:
            return self.run_bzr_subprocess(*args, **kwargs)
        finally:
            # The recorded arguments are checked whether or not the call
            # raised; a failing check replaces the original outcome.
            self.next_subprocess = None
            for key, expected in expected_args.iteritems():
                self.assertEqual(expected, self.subprocess_calls[-1][key])

    def test_run_bzr_subprocess(self):
        """run_bzr_subprocess forwards its arguments and checks the retcode."""
        self.assertRunBzrSubprocess({'process_args':['--version']},
            StubProcess(), '--version')
        self.assertRunBzrSubprocess({'process_args':['--version']},
            StubProcess(), ['--version'])
        # retcode=None disables retcode checking
        self.assertRunBzrSubprocess({},
            StubProcess(retcode=3), '--version', retcode=None)
        result = self.assertRunBzrSubprocess({},
            StubProcess(out="is free software"), '--version')
        self.assertContainsRe(result[0], 'is free software')
        # Running a subcommand that is missing errors
        self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
            {'process_args':['--versionn']}, StubProcess(retcode=3),
            '--versionn')
        # Unless it is told to expect the error from the subprocess
        self.assertRunBzrSubprocess({},
            StubProcess(retcode=3), '--versionn', retcode=3)
        # Or to ignore retcode checking
        result = self.assertRunBzrSubprocess({},
            StubProcess(err="unknown command", retcode=3), '--versionn',
            retcode=None)
        self.assertContainsRe(result[1], 'unknown command')

    def test_env_change_passes_through(self):
        self.assertRunBzrSubprocess(
            {'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}},
            StubProcess(), '',
            env_changes={'new':'value', 'changed':'newvalue', 'deleted':None})

    def test_no_working_dir_passed_as_None(self):
        self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '')

    def test_no_working_dir_passed_through(self):
        # Despite the name, this checks that an explicit working_dir is
        # handed on unchanged.
        self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
            working_dir='dir')

    def test_run_bzr_subprocess_no_plugins(self):
        self.assertRunBzrSubprocess({'allow_plugins': False},
            StubProcess(), '')

    def test_allow_plugins(self):
        self.assertRunBzrSubprocess({'allow_plugins': True},
            StubProcess(), '', allow_plugins=True)
 
2158
 
 
2159
 
 
2160
class _DontSpawnProcess(Exception):
 
2161
    """A simple exception which just allows us to skip unnecessary steps"""
 
2162
 
 
2163
 
 
2164
class TestStartBzrSubProcess(tests.TestCase):
    """Check what start_bzr_subprocess sets up before launching a child.

    _popen is overridden to record its arguments and raise
    _DontSpawnProcess, so these tests never start a process.
    """

    def check_popen_state(self):
        """Replace to make assertions when popen is called."""

    def _popen(self, *args, **kwargs):
        """Record the command that is run, so that we can ensure it is correct"""
        self.check_popen_state()
        self._popen_args = args
        self._popen_kwargs = kwargs
        raise _DontSpawnProcess()

    def test_run_bzr_subprocess_no_plugins(self):
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
        command = self._popen_args[0]
        self.assertEqual(sys.executable, command[0])
        self.assertEqual(self.get_bzr_path(), command[1])
        self.assertEqual(['--no-plugins'], command[2:])

    def test_allow_plugins(self):
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
            allow_plugins=True)
        command = self._popen_args[0]
        self.assertEqual([], command[2:])

    def test_set_env(self):
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
        # set in the child
        def check_environment():
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
            env_changes={'EXISTANT_ENV_VAR':'set variable'})
        # not set in the parent
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

    def test_run_bzr_subprocess_env_del(self):
        """run_bzr_subprocess can remove environment variables too."""
        self.failIf('EXISTANT_ENV_VAR' in os.environ)
        def check_environment():
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
        try:
            self.check_popen_state = check_environment
            self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess,
                [], env_changes={'EXISTANT_ENV_VAR':None})
            # Still set in parent
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
        finally:
            # Remove the variable even when an assertion above fails, so a
            # failure here cannot leak it into the tests that run later.
            if 'EXISTANT_ENV_VAR' in os.environ:
                del os.environ['EXISTANT_ENV_VAR']

    def test_env_del_missing(self):
        self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
        def check_environment():
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
            env_changes={'NON_EXISTANT_ENV_VAR':None})

    def test_working_dir(self):
        """Test that we can specify the working dir for the child"""
        orig_getcwd = osutils.getcwd
        orig_chdir = os.chdir
        chdirs = []
        def chdir(path):
            chdirs.append(path)
        def getcwd():
            return 'current'
        # Swap in fakes so that no real directory change happens; every
        # requested chdir is recorded instead.
        os.chdir = chdir
        osutils.getcwd = getcwd
        try:
            self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                working_dir='foo')
        finally:
            osutils.getcwd = orig_getcwd
            os.chdir = orig_chdir
        # First into the requested directory, then back to the fake cwd.
        self.assertEqual(['foo', 'current'], chdirs)
 
2241
 
 
2242
 
 
2243
class TestBzrSubprocess(tests.TestCaseWithTransport):
    """Tests that use start_bzr_subprocess and finish_bzr_subprocess."""

    def test_start_and_stop_bzr_subprocess(self):
        """We can start and perform other test actions while that process is
        still alive.
        """
        process = self.start_bzr_subprocess(['--version'])
        result = self.finish_bzr_subprocess(process)
        self.assertContainsRe(result[0], 'is free software')
        self.assertEqual('', result[1])

    def test_start_and_stop_bzr_subprocess_with_error(self):
        """finish_bzr_subprocess allows specification of the desired exit code.
        """
        process = self.start_bzr_subprocess(['--versionn'])
        result = self.finish_bzr_subprocess(process, retcode=3)
        self.assertEqual('', result[0])
        self.assertContainsRe(result[1], 'unknown command')

    def test_start_and_stop_bzr_subprocess_ignoring_retcode(self):
        """finish_bzr_subprocess allows the exit code to be ignored."""
        process = self.start_bzr_subprocess(['--versionn'])
        result = self.finish_bzr_subprocess(process, retcode=None)
        self.assertEqual('', result[0])
        self.assertContainsRe(result[1], 'unknown command')

    def test_start_and_stop_bzr_subprocess_with_unexpected_retcode(self):
        """finish_bzr_subprocess raises self.failureException if the retcode is
        not the expected one.
        """
        process = self.start_bzr_subprocess(['--versionn'])
        self.assertRaises(self.failureException, self.finish_bzr_subprocess,
                          process)

    def test_start_and_stop_bzr_subprocess_send_signal(self):
        """finish_bzr_subprocess can send a signal to the process before
        collecting its output.
        """
        process = self.start_bzr_subprocess(['wait-until-signalled'],
                                            skip_if_plan_to_signal=True)
        # Reading this line first means the signal is only sent once the
        # child has reported that it is running.
        self.assertEqual('running\n', process.stdout.readline())
        result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
                                            retcode=3)
        self.assertEqual('', result[0])
        self.assertEqual('bzr: interrupted\n', result[1])

    def test_start_and_stop_working_dir(self):
        """start_bzr_subprocess runs the child in the given working_dir."""
        self.make_branch_and_tree('one')
        process = self.start_bzr_subprocess(['root'], working_dir='one')
        result = self.finish_bzr_subprocess(process, universal_newlines=True)
        self.assertEndsWith(result[0], 'one\n')
        self.assertEqual('', result[1])
 
2296
 
 
2297
 
 
2298
class TestKnownFailure(tests.TestCase):
    """Tests for KnownFailure and TestCase.expectFailure."""

    def test_known_failure(self):
        """Check that KnownFailure is defined appropriately."""
        # a KnownFailure is an assertion error for compatibility with unaware
        # runners.
        self.assertIsInstance(tests.KnownFailure(""), AssertionError)

    def test_expect_failure(self):
        """expectFailure raises KnownFailure when the wrapped check fails,
        and an AssertionError when it unexpectedly succeeds.
        """
        # assertRaises fails the test when nothing is raised, so a silent
        # expectFailure can no longer slip through unnoticed.
        e = self.assertRaises(tests.KnownFailure, self.expectFailure,
                              "Doomed to failure", self.assertTrue, False)
        self.assertEqual('Doomed to failure', e.args[0])
        e = self.assertRaises(AssertionError, self.expectFailure,
                              "Doomed to failure", self.assertTrue, True)
        self.assertEqual('Unexpected success.  Should have failed:'
                         ' Doomed to failure', e.args[0])
 
2318
 
 
2319
 
 
2320
class TestFeature(tests.TestCase):
    """Tests for the probing and naming behaviour of tests.Feature."""

    def test_caching(self):
        """Feature._probe is called by the feature at most once."""
        class InstrumentedFeature(tests.Feature):
            def __init__(self):
                super(InstrumentedFeature, self).__init__()
                self.calls = []
            def _probe(self):
                self.calls.append('_probe')
                return False
        probed = InstrumentedFeature()
        # However often availability is queried, only one probe is recorded.
        for _attempt in range(2):
            probed.available()
            self.assertEqual(['_probe'], probed.calls)

    def test_named_str(self):
        """Feature.__str__ should thunk to feature_name()."""
        class NamedFeature(tests.Feature):
            def feature_name(self):
                return 'symlinks'
        self.assertEqual('symlinks', str(NamedFeature()))

    def test_default_str(self):
        """Feature.__str__ should default to __class__.__name__."""
        # The class name itself is the expected value, so it must not change.
        class NamedFeature(tests.Feature):
            pass
        self.assertEqual('NamedFeature', str(NamedFeature()))
 
2351
 
 
2352
 
 
2353
class TestUnavailableFeature(tests.TestCase):
    """Tests for the UnavailableFeature exception."""

    def test_access_feature(self):
        """The feature passed to UnavailableFeature is its first argument."""
        missing = tests.Feature()
        error = tests.UnavailableFeature(missing)
        self.assertIs(missing, error.args[0])
 
2359
 
 
2360
 
 
2361
class TestSelftestFiltering(tests.TestCase):
    """Tests for the suite filtering helpers, run against this module.

    The suite under test is loaded from bzrlib.tests.test_selftest itself,
    so the expected ids below are the ids of this class's own test methods.
    """

    # Common prefix of the id of every test method in this class.
    _id_prefix = 'bzrlib.tests.test_selftest.TestSelftestFiltering.'

    def setUp(self):
        tests.TestCase.setUp(self)
        this_module = sys.modules['bzrlib.tests.test_selftest']
        self.suite = TestUtil.TestSuite()
        self.loader = TestUtil.TestLoader()
        self.suite.addTest(self.loader.loadTestsFromModule(this_module))
        self.all_names = _test_ids(self.suite)

    def _own_id(self, method_name):
        """Return the full test id of a method of this class."""
        return self._id_prefix + method_name

    def _all_names_without(self, test_id):
        """Return a copy of self.all_names with test_id removed."""
        remaining = list(self.all_names)
        remaining.remove(test_id)
        return remaining

    def test_condition_id_re(self):
        test_name = self._own_id('test_condition_id_re')
        condition = tests.condition_id_re('test_condition_id_re')
        matching = tests.filter_suite_by_condition(self.suite, condition)
        self.assertEqual([test_name], _test_ids(matching))

    def test_condition_id_in_list(self):
        test_names = [self._own_id('test_condition_id_in_list')]
        id_list = tests.TestIdList(test_names)
        condition = tests.condition_id_in_list(id_list)
        by_list = tests.filter_suite_by_condition(self.suite, condition)
        my_pattern = 'TestSelftestFiltering.*test_condition_id_in_list'
        by_re = tests.filter_suite_by_re(self.suite, my_pattern)
        self.assertEqual(_test_ids(by_re), _test_ids(by_list))

    def test_condition_id_startswith(self):
        prefixes = [
            self._own_id('test_condition_id_starts'),
            self._own_id('test_condition_id_in'),
            ]
        expected_ids = [
            self._own_id('test_condition_id_in_list'),
            self._own_id('test_condition_id_startswith'),
            ]
        condition = tests.condition_id_startswith(prefixes)
        matching = tests.filter_suite_by_condition(self.suite, condition)
        self.assertEqual(expected_ids, _test_ids(matching))

    def test_condition_isinstance(self):
        condition = tests.condition_isinstance(self.__class__)
        by_class = tests.filter_suite_by_condition(self.suite, condition)
        by_re = tests.filter_suite_by_re(self.suite, self._id_prefix)
        self.assertEqual(_test_ids(by_re), _test_ids(by_class))

    def test_exclude_tests_by_condition(self):
        excluded_name = self._own_id('test_exclude_tests_by_condition')
        def is_excluded(test):
            return test.id() == excluded_name
        filtered_suite = tests.exclude_tests_by_condition(self.suite,
                                                          is_excluded)
        self.assertEqual(len(self.all_names) - 1,
                         filtered_suite.countTestCases())
        self.assertFalse(excluded_name in _test_ids(filtered_suite))
        self.assertEqual(self._all_names_without(excluded_name),
                         _test_ids(filtered_suite))

    def test_exclude_tests_by_re(self):
        self.all_names = _test_ids(self.suite)
        excluded_name = self._own_id('test_exclude_tests_by_re')
        filtered_suite = tests.exclude_tests_by_re(self.suite,
                                                   'exclude_tests_by_re')
        self.assertEqual(len(self.all_names) - 1,
                         filtered_suite.countTestCases())
        self.assertFalse(excluded_name in _test_ids(filtered_suite))
        self.assertEqual(self._all_names_without(excluded_name),
                         _test_ids(filtered_suite))

    def test_filter_suite_by_condition(self):
        test_name = self._own_id('test_filter_suite_by_condition')
        def is_wanted(test):
            return test.id() == test_name
        filtered_suite = tests.filter_suite_by_condition(self.suite,
                                                         is_wanted)
        self.assertEqual([test_name], _test_ids(filtered_suite))

    def test_filter_suite_by_re(self):
        filtered_suite = tests.filter_suite_by_re(self.suite,
                                                  'test_filter_suite_by_r')
        self.assertEqual(_test_ids(filtered_suite),
                         [self._own_id('test_filter_suite_by_re')])

    def test_filter_suite_by_id_list(self):
        wanted_id = self._own_id('test_filter_suite_by_id_list')
        filtered_suite = tests.filter_suite_by_id_list(
            self.suite, tests.TestIdList([wanted_id]))
        self.assertEqual(_test_ids(filtered_suite), [wanted_id])

    def test_filter_suite_by_id_startswith(self):
        # By design this test may fail if another test is added whose name also
        # begins with one of the start values used.
        prefixes = [
            self._own_id('test_filter_suite_by_id_starts'),
            self._own_id('test_filter_suite_by_id_li'),
            ]
        expected_ids = [
            self._own_id('test_filter_suite_by_id_list'),
            self._own_id('test_filter_suite_by_id_startswith'),
            ]
        filtered_suite = tests.filter_suite_by_id_startswith(
            self.suite, prefixes)
        self.assertEqual(expected_ids, _test_ids(filtered_suite))

    def test_preserve_input(self):
        # NB: Surely there is something in the stdlib to do this?
        self.assertTrue(self.suite is tests.preserve_input(self.suite))
        marker = "@#$"
        self.assertTrue(marker is tests.preserve_input(marker))

    def test_randomize_suite(self):
        randomized_suite = tests.randomize_suite(self.suite)
        # randomizing should not add or remove test names.
        self.assertEqual(set(_test_ids(self.suite)),
                         set(_test_ids(randomized_suite)))
        # Technically, this *can* fail, because random.shuffle(list) can be
        # equal to list. Trying multiple times just pushes the frequency back.
        # As it's len(self.all_names)!:1, the failure frequency should be low
        # enough to ignore. RBC 20071021.
        # It should change the order.
        self.assertNotEqual(self.all_names, _test_ids(randomized_suite))
        # But not the length. (Possibly redundant with the set test, but not
        # necessarily.)
        self.assertEqual(len(self.all_names), len(_test_ids(randomized_suite)))

    def test_split_suit_by_condition(self):
        self.all_names = _test_ids(self.suite)
        filtered_name = self._own_id('test_filter_suite_by_re')
        condition = tests.condition_id_re('test_filter_suite_by_r')
        split_suite = tests.split_suite_by_condition(self.suite, condition)
        # The first part holds the matches, the second part everything else.
        self.assertEqual([filtered_name], _test_ids(split_suite[0]))
        self.assertFalse(filtered_name in _test_ids(split_suite[1]))
        self.assertEqual(self._all_names_without(filtered_name),
                         _test_ids(split_suite[1]))

    def test_split_suit_by_re(self):
        self.all_names = _test_ids(self.suite)
        filtered_name = self._own_id('test_filter_suite_by_re')
        split_suite = tests.split_suite_by_re(self.suite,
                                              'test_filter_suite_by_r')
        # The first part holds the matches, the second part everything else.
        self.assertEqual([filtered_name], _test_ids(split_suite[0]))
        self.assertFalse(filtered_name in _test_ids(split_suite[1]))
        self.assertEqual(self._all_names_without(filtered_name),
                         _test_ids(split_suite[1]))
 
2515
 
 
2516
 
 
2517
class TestCheckInventoryShape(tests.TestCaseWithTransport):
    """Tests for the check_inventory_shape helper."""

    def test_check_inventory_shape(self):
        """check_inventory_shape accepts a tree holding exactly the paths
        that were added to it.
        """
        paths = ['a', 'b/', 'b/c']
        work_tree = self.make_branch_and_tree('.')
        self.build_tree(paths)
        work_tree.add(paths)
        # The inventory is read while the tree is locked.
        work_tree.lock_read()
        try:
            self.check_inventory_shape(work_tree.inventory, paths)
        finally:
            work_tree.unlock()
 
2529
 
 
2530
 
 
2531
class TestBlackboxSupport(tests.TestCase):
    """Tests for testsuite blackbox features."""

    def test_run_bzr_failure_not_caught(self):
        # When we run bzr in blackbox mode, we want any unexpected errors to
        # propagate up to the test suite so that it can show the error in the
        # usual way, and we won't get a double traceback.
        raised = self.assertRaises(AssertionError,
                                   self.run_bzr, ['assert-fail'])
        # make sure we got the real thing, not an error from somewhere else in
        # the test framework
        self.assertEquals('always fails', str(raised))
        # check that there's no traceback in the test log
        log_text = self._get_log(keep_log_file=True)
        self.assertNotContainsRe(log_text, r'Traceback')

    def test_run_bzr_user_error_caught(self):
        # Running bzr in blackbox mode, normal/expected/user errors should be
        # caught in the regular way and turned into an error message plus exit
        # code.
        command = ["log", "/nonexistantpath"]
        stdout_text, stderr_text = self.run_bzr(command, retcode=3)
        self.assertEqual(stdout_text, '')
        self.assertContainsRe(stderr_text,
            'bzr: ERROR: Not a branch: ".*nonexistantpath/".\n')
 
2556
 
 
2557
 
 
2558
class TestTestLoader(tests.TestCase):
    """Tests for the test loader."""

    def _get_loader_and_module(self):
        """Gets a TestLoader and a module with one test in it."""
        class Stub(tests.TestCase):
            def test_foo(self):
                pass
        # An instance of a plain class stands in for a module; its only
        # attribute is the test case class above.
        class MyModule(object):
            a_class = Stub
        return TestUtil.TestLoader(), MyModule()

    def test_module_no_load_tests_attribute_loads_classes(self):
        loader, module = self._get_loader_and_module()
        loaded = loader.loadTestsFromModule(module)
        self.assertEqual(1, loaded.countTestCases())

    def test_module_load_tests_attribute_gets_called(self):
        loader, module = self._get_loader_and_module()
        # 'self' is here because we're faking the module with a class. Regular
        # load_tests do not need that :)
        def load_tests(self, standard_tests, module, loader):
            doubled = loader.suiteClass()
            for test in tests.iter_suite_tests(standard_tests):
                doubled.addTests([test, test])
            return doubled
        # add a load_tests() method which multiplies the tests from the module.
        type(module).load_tests = load_tests
        loaded = loader.loadTestsFromModule(module)
        self.assertEqual(2, loaded.countTestCases())

    def test_load_tests_from_module_name_smoke_test(self):
        loader = TestUtil.TestLoader()
        suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
        expected_ids = ['bzrlib.tests.test_sampler.DemoTest.test_nothing']
        self.assertEquals(expected_ids, _test_ids(suite))

    def test_load_tests_from_module_name_with_bogus_module_name(self):
        loader = TestUtil.TestLoader()
        self.assertRaises(ImportError, loader.loadTestsFromModuleName, 'bogus')
 
2600
 
 
2601
 
 
2602
class TestTestIdList(tests.TestCase):
    """Tests for tests.TestIdList and tests.suite_matches_id_list."""

    def _create_id_list(self, test_list):
        """Wrap a list of test ids in a TestIdList."""
        return tests.TestIdList(test_list)

    def _create_suite(self, test_id_list):
        """Build a suite of stub tests reporting the given ids."""

        class Stub(tests.TestCase):
            def test_foo(self):
                pass

        def _fixed_id(value):
            # A factory, so each test keeps its own id rather than sharing
            # the loop variable.
            def report_id():
                return value
            return report_id

        suite = TestUtil.TestSuite()
        for test_id in test_id_list:
            stub_test = Stub('test_foo')
            stub_test.id = _fixed_id(test_id)
            suite.addTest(stub_test)
        return suite

    def _test_ids(self, test_suite):
        """Get the ids for the tests in a test suite."""
        ids = []
        for test in tests.iter_suite_tests(test_suite):
            ids.append(test.id())
        return ids

    def test_empty_list(self):
        id_list = self._create_id_list([])
        self.assertEquals({}, id_list.tests)
        self.assertEquals({}, id_list.modules)

    def test_valid_list(self):
        id_list = self._create_id_list(
            ['mod1.cl1.meth1', 'mod1.cl1.meth2',
             'mod1.func1', 'mod1.cl2.meth2',
             'mod1.submod1',
             'mod1.submod2.cl1.meth1', 'mod1.submod2.cl2.meth2',
             ])
        for module_name in ('mod1', 'mod1.submod1', 'mod1.submod2'):
            self.assertTrue(id_list.refers_to(module_name))
        for test_id in ('mod1.cl1.meth1', 'mod1.submod1', 'mod1.func1'):
            self.assertTrue(id_list.includes(test_id))

    def test_bad_chars_in_params(self):
        parametrized_id = 'mod1.cl1.meth1(xx.yy)'
        id_list = self._create_id_list([parametrized_id])
        self.assertTrue(id_list.refers_to('mod1'))
        self.assertTrue(id_list.includes(parametrized_id))

    def test_module_used(self):
        id_list = self._create_id_list(['mod.class.meth'])
        for name in ('mod', 'mod.class', 'mod.class.meth'):
            self.assertTrue(id_list.refers_to(name))

    def test_test_suite_matches_id_list_with_unknown(self):
        loader = TestUtil.TestLoader()
        suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
        wanted_ids = ['bzrlib.tests.test_sampler.DemoTest.test_nothing',
                      'bogus']
        not_found, duplicates = tests.suite_matches_id_list(suite, wanted_ids)
        self.assertEquals(['bogus'], not_found)
        self.assertEquals([], duplicates)

    def test_suite_matches_id_list_with_duplicates(self):
        loader = TestUtil.TestLoader()
        suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
        dupes = loader.suiteClass()
        for test in tests.iter_suite_tests(suite):
            # Every test goes in twice so that it is seen as a duplicate.
            dupes.addTest(test)
            dupes.addTest(test)

        sampler_id = 'bzrlib.tests.test_sampler.DemoTest.test_nothing'
        not_found, duplicates = tests.suite_matches_id_list(
            dupes, [sampler_id])
        self.assertEquals([], not_found)
        self.assertEquals([sampler_id], duplicates)
 
2680
 
 
2681
 
 
2682
class TestTestSuite(tests.TestCase):
    """Tests for tests.test_suite when given a list of test ids."""

    def test_test_suite(self):
        """test_suite(ids) yields exactly the requested ids, in order."""
        # Loading the entire test suite is slow, so a single test covers one
        # id from each category.
        testmod_ids = [
            'bzrlib.tests.blackbox.test_branch.TestBranch.test_branch',
            ('bzrlib.tests.per_transport.TransportTests'
             '.test_abspath(LocalURLServer)'),
            'bzrlib.tests.test_selftest.TestTestSuite.test_test_suite',
            ]
        doctest_ids = [
            'bzrlib.timestamp.format_highres_date',
            ]
        # Plugins can't be tested that way since selftest may be run with
        # --no-plugins.
        expected_ids = testmod_ids + doctest_ids
        loaded = tests.test_suite(expected_ids)
        self.assertEquals(expected_ids, _test_ids(loaded))

    def test_test_suite_list_and_start(self):
        """An id list combined with a starting-with list still filters."""
        # This cannot share the load done by test_test_suite, because that
        # test needs to show that starting_with == None works. So a second
        # full load is incurred.
        wanted = ['bzrlib.tests.test_selftest.TestTestSuite.test_test_suite']
        starting_with = ['bzrlib.tests.test_selftest.TestTestSuite']
        loaded = tests.test_suite(wanted, starting_with)
        # test_test_suite_list_and_start is not included
        self.assertEquals(wanted, _test_ids(loaded))
 
2710
 
 
2711
 
 
2712
class TestLoadTestIdList(tests.TestCaseInTempDir):
    """Tests for tests.load_test_id_list."""

    def _create_test_list_file(self, file_name, content):
        """Write content to file_name, opened in 'wt' mode.

        :param file_name: Path of the file to create (or truncate).
        :param content: The string written to the file.
        """
        fl = open(file_name, 'wt')
        try:
            fl.write(content)
        finally:
            # Close the handle even when write() raises, so a failing write
            # does not leave the file open.
            fl.close()

    def test_load_unknown(self):
        """Loading a file that was never created raises errors.NoSuchFile."""
        self.assertRaises(errors.NoSuchFile,
                          tests.load_test_id_list, 'i_do_not_exist')

    def test_load_test_list(self):
        """Each line of a clean file becomes one entry, in file order."""
        test_list_fname = 'test.list'
        self._create_test_list_file(test_list_fname,
                                    'mod1.cl1.meth1\nmod2.cl2.meth2\n')
        tlist = tests.load_test_id_list(test_list_fname)
        self.assertEquals(2, len(tlist))
        self.assertEquals('mod1.cl1.meth1', tlist[0])
        self.assertEquals('mod2.cl2.meth2', tlist[1])

    def test_load_dirty_file(self):
        """Surrounding whitespace is stripped; empty lines stay as ''."""
        test_list_fname = 'test.list'
        self._create_test_list_file(test_list_fname,
                                    '  mod1.cl1.meth1\n\nmod2.cl2.meth2  \n'
                                    'bar baz\n')
        tlist = tests.load_test_id_list(test_list_fname)
        self.assertEquals(4, len(tlist))
        self.assertEquals('mod1.cl1.meth1', tlist[0])
        self.assertEquals('', tlist[1])
        self.assertEquals('mod2.cl2.meth2', tlist[2])
        # Whitespace inside a line is kept untouched.
        self.assertEquals('bar baz', tlist[3])
 
2743
 
 
2744
 
 
2745
class TestFilteredByModuleTestLoader(tests.TestCase):
    """Tests for TestUtil.FilteredByModuleTestLoader fed by a TestIdList."""

    def _create_loader(self, test_list):
        """Return a loader filtered by a TestIdList built from test_list.

        :param test_list: Test ids used to build the tests.TestIdList whose
            refers_to method becomes the loader's filter.
        """
        needs_module = tests.TestIdList(test_list).refers_to
        return TestUtil.FilteredByModuleTestLoader(needs_module)

    def test_load_tests(self):
        """Loading the sampler module yields the listed sampler test."""
        wanted = ['bzrlib.tests.test_sampler.DemoTest.test_nothing']
        filtered_loader = self._create_loader(wanted)

        loaded = filtered_loader.loadTestsFromModuleName(
            'bzrlib.tests.test_sampler')
        self.assertEquals(wanted, _test_ids(loaded))

    def test_exclude_tests(self):
        """Loading the sampler module with an unrelated id yields no tests."""
        filtered_loader = self._create_loader(['bogus'])

        loaded = filtered_loader.loadTestsFromModuleName(
            'bzrlib.tests.test_sampler')
        self.assertEquals([], _test_ids(loaded))
 
2765
 
 
2766
 
 
2767
class TestFilteredByNameStartTestLoader(tests.TestCase):
    """Tests for FilteredByModuleTestLoader driven by a name-prefix filter."""

    def _create_loader(self, name_start):
        """Return a loader whose filter compares names with name_start.

        :param name_start: The string the filter closure is built around.
        """
        def needs_module(name):
            # Accept names that begin with name_start, and also names that
            # are themselves a leading part of name_start.
            return name.startswith(name_start) or name_start.startswith(name)
        loader = TestUtil.FilteredByModuleTestLoader(needs_module)
        return loader

    def test_load_tests(self):
        """A prefix that stops inside the module name loads the module."""
        test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing']
        loader = self._create_loader('bzrlib.tests.test_samp')

        suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
        self.assertEquals(test_list, _test_ids(suite))

    def test_load_tests_inside_module(self):
        """A prefix that extends past the module name loads the module."""
        test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing']
        loader = self._create_loader('bzrlib.tests.test_sampler.Demo')

        suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
        self.assertEquals(test_list, _test_ids(suite))

    def test_exclude_tests(self):
        """A prefix unrelated to the module name loads no tests."""
        # The original assigned an unused ``test_list`` local here; only the
        # prefix handed to _create_loader matters.
        loader = self._create_loader('bogus')

        suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
        self.assertEquals([], _test_ids(suite))
 
2795
 
 
2796
 
 
2797
class TestTestPrefixRegistry(tests.TestCase):
    """Tests for tests.TestPrefixAliasRegistry and the predefined aliases."""

    def _get_registry(self):
        """Return a new tests.TestPrefixAliasRegistry instance."""
        return tests.TestPrefixAliasRegistry()

    def test_register_new_prefix(self):
        """A newly registered alias can be read back with get()."""
        registry = self._get_registry()
        registry.register('foo', 'fff.ooo.ooo')
        self.assertEquals('fff.ooo.ooo', registry.get('foo'))

    def test_register_existing_prefix(self):
        """Registering an alias twice keeps the first value and logs it."""
        registry = self._get_registry()
        for target in ('bbb.aaa.rrr', 'bBB.aAA.rRR'):
            registry.register('bar', target)
        self.assertEquals('bbb.aaa.rrr', registry.get('bar'))
        # The log must mention the alias, then the first and second targets.
        log_text = self._get_log(keep_log_file=True)
        self.assertContainsRe(log_text,
                              r'.*bar.*bbb.aaa.rrr.*bBB.aAA.rRR')

    def test_get_unknown_prefix(self):
        """get() raises KeyError for an alias that was never registered."""
        registry = self._get_registry()
        self.assertRaises(KeyError, registry.get, 'I am not a prefix')

    def test_resolve_prefix(self):
        """resolve_alias() returns the value registered for the alias."""
        registry = self._get_registry()
        registry.register('bar', 'bb.aa.rr')
        self.assertEquals('bb.aa.rr', registry.resolve_alias('bar'))

    def test_resolve_unknown_alias(self):
        """resolve_alias() raises BzrCommandError for an unknown alias."""
        registry = self._get_registry()
        self.assertRaises(errors.BzrCommandError,
                          registry.resolve_alias, 'I am not a prefix')

    def test_predefined_prefixes(self):
        """The shared registry resolves each predefined alias."""
        registry = tests.test_prefix_alias_registry
        # (expected module name, alias) pairs, checked in this order.
        predefined = (
            ('bzrlib', 'bzrlib'),
            ('bzrlib.doc', 'bd'),
            ('bzrlib.utils', 'bu'),
            ('bzrlib.tests', 'bt'),
            ('bzrlib.tests.blackbox', 'bb'),
            ('bzrlib.plugins', 'bp'),
            )
        for module_name, alias in predefined:
            self.assertEquals(module_name, registry.resolve_alias(alias))
 
2838
 
 
2839
 
 
2840
class TestRunSuite(tests.TestCase):
    """Tests for tests.run_suite."""

    def test_runner_class(self):
        """run_suite accepts and uses a runner_class keyword argument."""
        class Stub(tests.TestCase):
            def test_foo(self):
                pass

        seen = []

        class MyRunner(tests.TextTestRunner):
            def run(self, test):
                # Record the test handed over instead of running it.
                seen.append(test)
                result = tests.ExtendedTestResult(
                    self.stream, self.descriptions, self.verbosity)
                return result

        single_test = Stub("test_foo")
        tests.run_suite(single_test, runner_class=MyRunner, stream=StringIO())
        self.assertLength(1, seen)

    def test_done(self):
        """run_suite should call result.done()"""
        self.calls = 0

        def one_more_call():
            self.calls += 1

        def test_function():
            pass

        class InstrumentedTestResult(tests.ExtendedTestResult):
            def done(self):
                # Count each done() invocation on the enclosing test case.
                one_more_call()

        class MyRunner(tests.TextTestRunner):
            def run(self, test):
                result = InstrumentedTestResult(
                    self.stream, self.descriptions, self.verbosity)
                return result

        wrapped = unittest.FunctionTestCase(test_function)
        tests.run_suite(wrapped, runner_class=MyRunner, stream=StringIO())
        self.assertEquals(1, self.calls)