1
# Copyright (C) 2005-2013, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the test framework."""
21
from functools import reduce
22
from io import BytesIO, TextIOWrapper
31
from testtools import (
32
ExtendedToOriginalDecorator,
35
from testtools.content import Content
36
from testtools.content_type import ContentType
37
from testtools.matchers import (
41
import testtools.testresult.doubles
68
workingtree as git_workingtree,
70
from ..sixish import (
75
from ..symbol_versioning import (
86
from ..trace import note, mutter
87
from ..transport import memory
90
def _test_ids(test_suite):
    """Get the ids for the tests in a test suite.

    :param test_suite: The suite to walk; it is iterated with
        tests.iter_suite_tests.
    :return: A list holding the ``id()`` of each test, in iteration order.
    """
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
95
class MetaTestLog(tests.TestCase):
    """Check that a message logged by a test shows up in its details."""

    def test_logging(self):
        """Test logs are captured when a test fails."""
        self.log('a test message')
        details = self.getDetails()
        # NOTE(review): this lookup was lost from the extracted source; the
        # detail key 'log' is assumed here -- confirm against upstream.
        log = details['log']
        # The detail is expected to be utf8 plain text ...
        self.assertThat(log.content_type, Equals(ContentType(
            "text", "plain", {"charset": "utf8"})))
        # ... whose joined text equals what get_log() reports ...
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
        # ... and which ends with the message logged above.
        self.assertThat(
            self.get_log(),
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
109
class TestTreeShape(tests.TestCaseInTempDir):
    """Check tree-building helpers with non-ASCII file names."""

    def test_unicode_paths(self):
        """build_tree_contents creates a file with a unicode name."""
        # Needs the UnicodeFilenameFeature to be available.
        self.requireFeature(features.UnicodeFilenameFeature)

        filename = u'hell\u00d8'
        self.build_tree_contents([(filename, b'contents of hello')])
        self.assertPathExists(filename)
119
class TestClassesAvailable(tests.TestCase):
    """As a convenience we expose Test* classes from breezy.tests"""

    # Each test passes if the import succeeds; the imported name is
    # deliberately unused.

    def test_test_case(self):
        from . import TestCase

    def test_test_loader(self):
        from . import TestLoader

    def test_test_suite(self):
        from . import TestSuite
132
class TestTransportScenarios(tests.TestCase):
    """A group of tests that test the transport implementation adaption core.

    This is a meta test that the tests are applied to all available
    transports.

    This will be generalised in the future which is why it is in this
    test file even though it is specific to transport tests at the moment.
    """

    def test_get_transport_permutations(self):
        # this checks that get_test_permutations defined by the module is
        # called by the get_transport_test_permutations function.
        class MockModule(object):
            def get_test_permutations(self):
                return sample_permutation
        sample_permutation = [(1, 2), (3, 4)]
        from .per_transport import get_transport_test_permutations
        self.assertEqual(sample_permutation,
                         get_transport_test_permutations(MockModule()))

    def test_scenarios_include_all_modules(self):
        # this checks that the scenario generator returns as many permutations
        # as there are in all the registered transport modules - we assume if
        # this matches its probably doing the right thing especially in
        # combination with the tests for setting the right classes below.
        from .per_transport import transport_test_permutations
        from ..transport import _get_transport_modules
        modules = _get_transport_modules()
        permutation_count = 0
        for module in modules:
            # __import__ returns the top-level package, so the leading
            # component is dropped and the rest is resolved with getattr.
            attr_path = (module + ".get_test_permutations").split('.')[1:]
            try:
                get_permutations = reduce(
                    getattr, attr_path, __import__(module))
                permutation_count += len(get_permutations())
            except errors.DependencyNotPresent:
                # Modules whose dependencies are absent add no permutations.
                pass
        scenarios = transport_test_permutations()
        self.assertEqual(permutation_count, len(scenarios))

    def test_scenarios_include_transport_class(self):
        # This test used to know about all the possible transports and the
        # order they were returned but that seems overly brittle, so only
        # the shape of the first scenario is checked.
        from .per_transport import transport_test_permutations
        scenarios = transport_test_permutations()
        # there are at least that many builtin transports
        self.assertTrue(len(scenarios) > 6)
        one_scenario = scenarios[0]
        self.assertIsInstance(one_scenario[0], str)
        self.assertTrue(issubclass(one_scenario[1]["transport_class"],
                                   breezy.transport.Transport))
        self.assertTrue(issubclass(one_scenario[1]["transport_server"],
                                   breezy.transport.Server))
189
class TestBranchScenarios(tests.TestCase):
191
def test_scenarios(self):
192
# check that constructor parameters are passed through to the adapted
194
from .per_branch import make_scenarios
197
formats = [("c", "C"), ("d", "D")]
198
scenarios = make_scenarios(server1, server2, formats)
199
self.assertEqual(2, len(scenarios))
202
{'branch_format': 'c',
203
'bzrdir_format': 'C',
204
'transport_readonly_server': 'b',
205
'transport_server': 'a'}),
207
{'branch_format': 'd',
208
'bzrdir_format': 'D',
209
'transport_readonly_server': 'b',
210
'transport_server': 'a'})],
214
class TestBzrDirScenarios(tests.TestCase):
216
def test_scenarios(self):
217
# check that constructor parameters are passed through to the adapted
219
from .per_controldir import make_scenarios
224
scenarios = make_scenarios(vfs_factory, server1, server2, formats)
227
{'bzrdir_format': 'c',
228
'transport_readonly_server': 'b',
229
'transport_server': 'a',
230
'vfs_transport_factory': 'v'}),
232
{'bzrdir_format': 'd',
233
'transport_readonly_server': 'b',
234
'transport_server': 'a',
235
'vfs_transport_factory': 'v'})],
239
class TestRepositoryScenarios(tests.TestCase):
241
def test_formats_to_scenarios(self):
242
from .per_repository import formats_to_scenarios
243
formats = [("(c)", remote.RemoteRepositoryFormat()),
244
("(d)", repository.format_registry.get(
245
b'Bazaar repository format 2a (needs bzr 1.16 or later)\n'))]
246
no_vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
248
vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
249
vfs_transport_factory="vfs")
250
# no_vfs generate scenarios without vfs_transport_factory
252
('RemoteRepositoryFormat(c)',
253
{'bzrdir_format': remote.RemoteBzrDirFormat(),
254
'repository_format': remote.RemoteRepositoryFormat(),
255
'transport_readonly_server': 'readonly',
256
'transport_server': 'server'}),
257
('RepositoryFormat2a(d)',
258
{'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
259
'repository_format': groupcompress_repo.RepositoryFormat2a(),
260
'transport_readonly_server': 'readonly',
261
'transport_server': 'server'})]
262
self.assertEqual(expected, no_vfs_scenarios)
264
('RemoteRepositoryFormat(c)',
265
{'bzrdir_format': remote.RemoteBzrDirFormat(),
266
'repository_format': remote.RemoteRepositoryFormat(),
267
'transport_readonly_server': 'readonly',
268
'transport_server': 'server',
269
'vfs_transport_factory': 'vfs'}),
270
('RepositoryFormat2a(d)',
271
{'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
272
'repository_format': groupcompress_repo.RepositoryFormat2a(),
273
'transport_readonly_server': 'readonly',
274
'transport_server': 'server',
275
'vfs_transport_factory': 'vfs'})],
279
class TestTestScenarioApplication(tests.TestCase):
    """Tests for the test adaption facilities."""

    def test_apply_scenario(self):
        """apply_scenario returns new tests and leaves its input alone."""
        from breezy.tests import apply_scenario
        input_test = TestTestScenarioApplication("test_apply_scenario")
        # setup two adapted tests
        # NOTE(review): the "new id" scenario name and the assertEqual /
        # .id() wrappers below were lost from the extracted source and are
        # reconstructed from the surviving expected ids -- confirm.
        adapted_test1 = apply_scenario(
            input_test,
            ("new id",
             {"bzrdir_format": "bzr_format",
              "repository_format": "repo_fmt",
              "transport_server": "transport_server",
              "transport_readonly_server": "readonly-server"}))
        adapted_test2 = apply_scenario(
            input_test,
            ("new id 2", {"bzrdir_format": None}))
        # input_test should not have been altered.
        self.assertRaises(AttributeError, getattr, input_test, "bzrdir_format")
        # the new tests are mutually incompatible, ensuring it has
        # made new ones, and unspecified elements in the scenario
        # should not have been altered.
        self.assertEqual("bzr_format", adapted_test1.bzrdir_format)
        self.assertEqual("repo_fmt", adapted_test1.repository_format)
        self.assertEqual("transport_server", adapted_test1.transport_server)
        self.assertEqual("readonly-server",
                         adapted_test1.transport_readonly_server)
        self.assertEqual(
            "breezy.tests.test_selftest.TestTestScenarioApplication."
            "test_apply_scenario(new id)",
            adapted_test1.id())
        self.assertEqual(None, adapted_test2.bzrdir_format)
        self.assertEqual(
            "breezy.tests.test_selftest.TestTestScenarioApplication."
            "test_apply_scenario(new id 2)",
            adapted_test2.id())
315
class TestInterRepositoryScenarios(tests.TestCase):
317
def test_scenarios(self):
318
# check that constructor parameters are passed through to the adapted
320
from .per_interrepository import make_scenarios
323
formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
324
scenarios = make_scenarios(server1, server2, formats)
327
{'repository_format': 'C1',
328
'repository_format_to': 'C2',
329
'transport_readonly_server': 'b',
330
'transport_server': 'a',
331
'extra_setup': 'C3'}),
333
{'repository_format': 'D1',
334
'repository_format_to': 'D2',
335
'transport_readonly_server': 'b',
336
'transport_server': 'a',
337
'extra_setup': 'D3'})],
341
class TestWorkingTreeScenarios(tests.TestCase):
343
def test_scenarios(self):
344
# check that constructor parameters are passed through to the adapted
346
from .per_workingtree import make_scenarios
349
formats = [workingtree_4.WorkingTreeFormat4(),
350
workingtree_3.WorkingTreeFormat3(),
351
workingtree_4.WorkingTreeFormat6()]
352
scenarios = make_scenarios(server1, server2, formats,
353
remote_server='c', remote_readonly_server='d',
354
remote_backing_server='e')
356
('WorkingTreeFormat4',
357
{'bzrdir_format': formats[0]._matchingcontroldir,
358
'transport_readonly_server': 'b',
359
'transport_server': 'a',
360
'workingtree_format': formats[0]}),
361
('WorkingTreeFormat3',
362
{'bzrdir_format': formats[1]._matchingcontroldir,
363
'transport_readonly_server': 'b',
364
'transport_server': 'a',
365
'workingtree_format': formats[1]}),
366
('WorkingTreeFormat6',
367
{'bzrdir_format': formats[2]._matchingcontroldir,
368
'transport_readonly_server': 'b',
369
'transport_server': 'a',
370
'workingtree_format': formats[2]}),
371
('WorkingTreeFormat6,remote',
372
{'bzrdir_format': formats[2]._matchingcontroldir,
373
'repo_is_remote': True,
374
'transport_readonly_server': 'd',
375
'transport_server': 'c',
376
'vfs_transport_factory': 'e',
377
'workingtree_format': formats[2]}),
381
class TestTreeScenarios(tests.TestCase):
383
def test_scenarios(self):
384
# the tree implementation scenario generator is meant to setup one
385
# instance for each working tree format, one additional instance
386
# that will use the default wt format, but create a revision tree for
387
# the tests, and one more that uses the default wt format as a
388
# lightweight checkout of a remote repository. This means that the wt
389
# ones should have the workingtree_to_test_tree attribute set to
390
# 'return_parameter' and the revision one set to
391
# revision_tree_from_workingtree.
393
from .per_tree import (
394
_dirstate_tree_from_workingtree,
399
revision_tree_from_workingtree
403
smart_server = test_server.SmartTCPServer_for_testing
404
smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
405
mem_server = memory.MemoryServer
406
formats = [workingtree_4.WorkingTreeFormat4(),
407
workingtree_3.WorkingTreeFormat3(), ]
408
scenarios = make_scenarios(server1, server2, formats)
409
self.assertEqual(9, len(scenarios))
410
default_wt_format = workingtree.format_registry.get_default()
411
wt4_format = workingtree_4.WorkingTreeFormat4()
412
wt5_format = workingtree_4.WorkingTreeFormat5()
413
wt6_format = workingtree_4.WorkingTreeFormat6()
414
git_wt_format = git_workingtree.GitWorkingTreeFormat()
415
expected_scenarios = [
416
('WorkingTreeFormat4',
417
{'bzrdir_format': formats[0]._matchingcontroldir,
418
'transport_readonly_server': 'b',
419
'transport_server': 'a',
420
'workingtree_format': formats[0],
421
'_workingtree_to_test_tree': return_parameter,
423
('WorkingTreeFormat3',
424
{'bzrdir_format': formats[1]._matchingcontroldir,
425
'transport_readonly_server': 'b',
426
'transport_server': 'a',
427
'workingtree_format': formats[1],
428
'_workingtree_to_test_tree': return_parameter,
430
('WorkingTreeFormat6,remote',
431
{'bzrdir_format': wt6_format._matchingcontroldir,
432
'repo_is_remote': True,
433
'transport_readonly_server': smart_readonly_server,
434
'transport_server': smart_server,
435
'vfs_transport_factory': mem_server,
436
'workingtree_format': wt6_format,
437
'_workingtree_to_test_tree': return_parameter,
440
{'_workingtree_to_test_tree': revision_tree_from_workingtree,
441
'bzrdir_format': default_wt_format._matchingcontroldir,
442
'transport_readonly_server': 'b',
443
'transport_server': 'a',
444
'workingtree_format': default_wt_format,
447
{'_workingtree_to_test_tree': revision_tree_from_workingtree,
448
'bzrdir_format': git_wt_format._matchingcontroldir,
449
'transport_readonly_server': 'b',
450
'transport_server': 'a',
451
'workingtree_format': git_wt_format,
454
('DirStateRevisionTree,WT4',
455
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
456
'bzrdir_format': wt4_format._matchingcontroldir,
457
'transport_readonly_server': 'b',
458
'transport_server': 'a',
459
'workingtree_format': wt4_format,
461
('DirStateRevisionTree,WT5',
462
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
463
'bzrdir_format': wt5_format._matchingcontroldir,
464
'transport_readonly_server': 'b',
465
'transport_server': 'a',
466
'workingtree_format': wt5_format,
469
{'_workingtree_to_test_tree': preview_tree_pre,
470
'bzrdir_format': default_wt_format._matchingcontroldir,
471
'transport_readonly_server': 'b',
472
'transport_server': 'a',
473
'workingtree_format': default_wt_format}),
475
{'_workingtree_to_test_tree': preview_tree_post,
476
'bzrdir_format': default_wt_format._matchingcontroldir,
477
'transport_readonly_server': 'b',
478
'transport_server': 'a',
479
'workingtree_format': default_wt_format}),
481
self.assertEqual(expected_scenarios, scenarios)
484
class TestInterTreeScenarios(tests.TestCase):
485
"""A group of tests that test the InterTreeTestAdapter."""
487
def test_scenarios(self):
488
# check that constructor parameters are passed through to the adapted
490
# for InterTree tests we want the machinery to bring up two trees in
491
# each instance: the base one, and the one we are interacting with.
492
# because each optimiser can be direction specific, we need to test
493
# each optimiser in its chosen direction.
494
# unlike the TestProviderAdapter we dont want to automatically add a
495
# parameterized one for WorkingTree - the optimisers will tell us what
497
from .per_tree import (
500
from .per_intertree import (
503
from ..bzr.workingtree_3 import WorkingTreeFormat3
504
from ..bzr.workingtree_4 import WorkingTreeFormat4
505
input_test = TestInterTreeScenarios(
509
format1 = WorkingTreeFormat4()
510
format2 = WorkingTreeFormat3()
511
formats = [("1", str, format1, format2, "converter1"),
512
("2", int, format2, format1, "converter2")]
513
scenarios = make_scenarios(server1, server2, formats)
514
self.assertEqual(2, len(scenarios))
515
expected_scenarios = [
517
"bzrdir_format": format1._matchingcontroldir,
518
"intertree_class": formats[0][1],
519
"workingtree_format": formats[0][2],
520
"workingtree_format_to": formats[0][3],
521
"mutable_trees_to_test_trees": formats[0][4],
522
"_workingtree_to_test_tree": return_parameter,
523
"transport_server": server1,
524
"transport_readonly_server": server2,
527
"bzrdir_format": format2._matchingcontroldir,
528
"intertree_class": formats[1][1],
529
"workingtree_format": formats[1][2],
530
"workingtree_format_to": formats[1][3],
531
"mutable_trees_to_test_trees": formats[1][4],
532
"_workingtree_to_test_tree": return_parameter,
533
"transport_server": server1,
534
"transport_readonly_server": server2,
537
self.assertEqual(scenarios, expected_scenarios)
540
class TestTestCaseInTempDir(tests.TestCaseInTempDir):
    """Checks of the directories and assertions of TestCaseInTempDir."""

    def test_home_is_not_working(self):
        """The home directory differs from the working directory."""
        self.assertNotEqual(self.test_dir, self.test_home_dir)
        cwd = osutils.getcwd()
        self.assertIsSameRealPath(self.test_dir, cwd)
        self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME'])

    def test_assertEqualStat_equal(self):
        """assertEqualStat accepts a _FakeStat built from a real lstat."""
        from .test_dirstate import _FakeStat
        self.build_tree(["foo"])
        real = os.lstat("foo")
        fake = _FakeStat(real.st_size, real.st_mtime, real.st_ctime,
                         real.st_dev, real.st_ino, real.st_mode)
        self.assertEqualStat(real, fake)

    def test_assertEqualStat_notequal(self):
        """assertEqualStat rejects the stats of two different files."""
        self.build_tree(["foo", "longname"])
        self.assertRaises(AssertionError, self.assertEqualStat,
                          os.lstat("foo"), os.lstat("longname"))

    def test_assertPathExists(self):
        """assertPathExists and assertPathDoesNotExist on a built tree."""
        self.assertPathExists('.')
        self.build_tree(['foo/', 'foo/bar'])
        self.assertPathExists('foo/bar')
        self.assertPathDoesNotExist('foo/foo')
568
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
570
def test_home_is_non_existant_dir_under_root(self):
571
"""The test_home_dir for TestCaseWithMemoryTransport is missing.
573
This is because TestCaseWithMemoryTransport is for tests that do not
574
need any disk resources: they should be hooked into breezy in such a
575
way that no global settings are being changed by the test (only a
576
few tests should need to do that), and having a missing dir as home is
577
an effective way to ensure that this is the case.
579
self.assertIsSameRealPath(
580
self.TEST_ROOT + "/MemoryTransportMissingHomeDir",
582
self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME'])
584
def test_cwd_is_TEST_ROOT(self):
    """test_dir is TEST_ROOT, and is also the current directory."""
    self.assertIsSameRealPath(self.test_dir, self.TEST_ROOT)
    cwd = osutils.getcwd()
    self.assertIsSameRealPath(self.test_dir, cwd)
589
def test_BRZ_HOME_and_HOME_are_bytestrings(self):
    """The $BRZ_HOME and $HOME environment variables should not be unicode.

    See https://bugs.launchpad.net/bzr/+bug/464174
    """
    self.assertIsInstance(os.environ['BRZ_HOME'], str)
    self.assertIsInstance(os.environ['HOME'], str)
597
def test_make_branch_and_memory_tree(self):
    """In TestCaseWithMemoryTransport we should not make the branch on disk.

    This is hard to comprehensively robustly test, so we settle for making
    a branch and checking no directory was created at its relpath.
    """
    tree = self.make_branch_and_memory_tree('dir')
    # Guard against regression into MemoryTransport leaking
    # files to disk instead of keeping them in memory.
    self.assertFalse(osutils.lexists('dir'))
    self.assertIsInstance(tree, memorytree.MemoryTree)
609
def test_make_branch_and_memory_tree_with_format(self):
    """make_branch_and_memory_tree should accept a format option."""
    # Named control_format so the builtin format() is not shadowed.
    control_format = bzrdir.BzrDirMetaFormat1()
    control_format.repository_format = (
        repository.format_registry.get_default())
    tree = self.make_branch_and_memory_tree('dir', format=control_format)
    # Guard against regression into MemoryTransport leaking
    # files to disk instead of keeping them in memory.
    self.assertFalse(osutils.lexists('dir'))
    self.assertIsInstance(tree, memorytree.MemoryTree)
    self.assertEqual(control_format.repository_format.__class__,
                     tree.branch.repository._format.__class__)
621
def test_make_branch_builder(self):
    """make_branch_builder returns a BranchBuilder and leaves disk alone."""
    builder = self.make_branch_builder('dir')
    self.assertIsInstance(builder, branchbuilder.BranchBuilder)
    # Guard against regression into MemoryTransport leaking
    # files to disk instead of keeping them in memory.
    self.assertFalse(osutils.lexists('dir'))
628
def test_make_branch_builder_with_format(self):
    """make_branch_builder honours a format object passed by the caller."""
    # Use a repo layout that doesn't conform to a 'named' layout, to ensure
    # that the format objects are used.
    # Named control_format so the builtin format() is not shadowed.
    control_format = bzrdir.BzrDirMetaFormat1()
    repo_format = repository.format_registry.get_default()
    control_format.repository_format = repo_format
    builder = self.make_branch_builder('dir', format=control_format)
    the_branch = builder.get_branch()
    # Guard against regression into MemoryTransport leaking
    # files to disk instead of keeping them in memory.
    self.assertFalse(osutils.lexists('dir'))
    self.assertEqual(control_format.repository_format.__class__,
                     the_branch.repository._format.__class__)
    # The format file read back through the transport must carry the
    # format string of the repository format that was requested.
    self.assertEqual(repo_format.get_format_string(),
                     self.get_transport().get_bytes(
                         'dir/.bzr/repository/format'))
645
def test_make_branch_builder_with_format_name(self):
    """make_branch_builder honours a format given by registry name."""
    builder = self.make_branch_builder('dir', format='knit')
    the_branch = builder.get_branch()
    # Guard against regression into MemoryTransport leaking
    # files to disk instead of keeping them in memory.
    self.assertFalse(osutils.lexists('dir'))
    dir_format = controldir.format_registry.make_controldir('knit')
    self.assertEqual(dir_format.repository_format.__class__,
                     the_branch.repository._format.__class__)
    self.assertEqual(b'Bazaar-NG Knit Repository Format 1',
                     self.get_transport().get_bytes(
                         'dir/.bzr/repository/format'))
658
def test_dangling_locks_cause_failures(self):
659
class TestDanglingLock(tests.TestCaseWithMemoryTransport):
660
def test_function(self):
661
t = self.get_transport_from_path('.')
662
l = lockdir.LockDir(t, 'lock')
665
test = TestDanglingLock('test_function')
667
total_failures = result.errors + result.failures
668
if self._lock_check_thorough:
669
self.assertEqual(1, len(total_failures))
671
# When _lock_check_thorough is disabled, then we don't trigger a
673
self.assertEqual(0, len(total_failures))
676
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
    """Tests for the convenience functions TestCaseWithTransport introduces."""

    def test_get_readonly_url_none(self):
        from ..transport.readonly import ReadonlyTransportDecorator
        self.vfs_transport_factory = memory.MemoryServer
        self.transport_readonly_server = None
        # with no readonly server set, the urls from get_readonly_url()
        # open as ReadonlyTransportDecorator instances.
        url = self.get_readonly_url()
        url2 = self.get_readonly_url('foo/bar')
        t = transport.get_transport_from_url(url)
        t2 = transport.get_transport_from_url(url2)
        self.assertIsInstance(t, ReadonlyTransportDecorator)
        self.assertIsInstance(t2, ReadonlyTransportDecorator)
        # base carries one trailing character more than abspath returns.
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))

    def test_get_readonly_url_http(self):
        from .http_server import HttpServer
        from ..transport.http import HttpTransport
        self.transport_server = test_server.LocalURLServer
        self.transport_readonly_server = HttpServer
        # calling get_readonly_transport() gives us a HTTP server instance.
        url = self.get_readonly_url()
        url2 = self.get_readonly_url('foo/bar')
        # the transport returned may be any HttpTransportBase subclass
        t = transport.get_transport_from_url(url)
        t2 = transport.get_transport_from_url(url2)
        self.assertIsInstance(t, HttpTransport)
        self.assertIsInstance(t2, HttpTransport)
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))

    def test_is_directory(self):
        """Test assertIsDirectory assertion"""
        t = self.get_transport()
        self.build_tree(['a_dir/', 'a_file'], transport=t)
        self.assertIsDirectory('a_dir', t)
        # Both a plain file and a missing path must be rejected.
        self.assertRaises(AssertionError, self.assertIsDirectory, 'a_file', t)
        self.assertRaises(
            AssertionError, self.assertIsDirectory, 'not_here', t)

    def test_make_branch_builder(self):
        """A branch built by the builder can be reopened from 'dir'."""
        builder = self.make_branch_builder('dir')
        rev_id = builder.build_commit()
        self.assertPathExists('dir')
        a_dir = controldir.ControlDir.open('dir')
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
        a_branch = a_dir.open_branch()
        builder_branch = builder.get_branch()
        self.assertEqual(a_branch.base, builder_branch.base)
        self.assertEqual((1, rev_id), builder_branch.last_revision_info())
        self.assertEqual((1, rev_id), a_branch.last_revision_info())
730
class TestTestCaseTransports(tests.TestCaseWithTransport):
    """Check that make_controldir uses the configured memory transport."""

    def setUp(self):
        super(TestTestCaseTransports, self).setUp()
        # Every test in this class runs against the memory server.
        self.vfs_transport_factory = memory.MemoryServer

    def test_make_controldir_preserves_transport(self):
        t = self.get_transport()
        result_bzrdir = self.make_controldir('subdir')
        self.assertIsInstance(result_bzrdir.transport,
                              memory.MemoryTransport)
        # should not be on disk, should only be in memory
        self.assertPathDoesNotExist('subdir')
745
class TestChrootedTest(tests.ChrootedTestCase):
    """Check the readonly url of a ChrootedTestCase."""

    def test_root_is_root(self):
        """Cloning '..' from the readonly transport stays at the same url."""
        t = transport.get_transport_from_url(self.get_readonly_url())
        # NOTE(review): the assignment of ``url`` was lost from the extracted
        # source; taking it from the transport base is assumed -- confirm.
        url = t.base
        self.assertEqual(url, t.clone('..').base)
753
class TestProfileResult(tests.TestCase):
755
def test_profiles_tests(self):
756
self.requireFeature(features.lsprof_feature)
757
terminal = testtools.testresult.doubles.ExtendedTestResult()
758
result = tests.ProfileResult(terminal)
760
class Sample(tests.TestCase):
762
self.sample_function()
764
def sample_function(self):
768
case = terminal._events[0][1]
769
self.assertLength(1, case._benchcalls)
770
# We must be able to unpack it as the test reporting code wants
771
(_, _, _), stats = case._benchcalls[0]
772
self.assertTrue(callable(stats.pprint))
775
class TestTestResult(tests.TestCase):
777
def check_timing(self, test_case, expected_re):
    """Run test_case and check how its elapsed time is rendered.

    :param test_case: The test to run.
    :param expected_re: A regular expression that the string returned by
        TextTestResult._testTimeString must contain.
    """
    result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
    # The second result records events, so the test object that actually
    # ran can be read back from the first recorded event.
    capture = testtools.testresult.doubles.ExtendedTestResult()
    test_case.run(MultiTestResult(result, capture))
    run_case = capture._events[0][1]
    timed_string = result._testTimeString(run_case)
    self.assertContainsRe(timed_string, expected_re)
785
def test_test_reporting(self):
786
class ShortDelayTestCase(tests.TestCase):
787
def test_short_delay(self):
790
def test_short_benchmark(self):
791
self.time(time.sleep, 0.003)
792
self.check_timing(ShortDelayTestCase('test_short_delay'),
794
# if a benchmark time is given, we now show just that time followed by
796
self.check_timing(ShortDelayTestCase('test_short_benchmark'),
799
def test_unittest_reporting_unittest_class(self):
800
# getting the time from a non-breezy test works ok
801
class ShortDelayTestCase(unittest.TestCase):
802
def test_short_delay(self):
804
self.check_timing(ShortDelayTestCase('test_short_delay'),
807
def _time_hello_world_encoding(self):
    """Run two text_type conversions through self.time().

    This is used to exercise the test framework.
    """
    self.time(text_type, b'hello', errors='replace')
    self.time(text_type, b'world', errors='replace')
815
def test_lsprofiling(self):
816
"""Verbose test result prints lsprof statistics from test cases."""
817
self.requireFeature(features.lsprof_feature)
818
result_stream = StringIO()
819
result = breezy.tests.VerboseTestResult(
824
# we want profile a call of some sort and check it is output by
825
# addSuccess. We dont care about addError or addFailure as they
826
# are not that interesting for performance tuning.
827
# make a new test instance that when run will generate a profile
828
example_test_case = TestTestResult("_time_hello_world_encoding")
829
example_test_case._gather_lsprof_in_benchmarks = True
830
# execute the test, which should succeed and record profiles
831
example_test_case.run(result)
832
# lsprofile_something()
833
# if this worked we want
834
# LSProf output for <built in function unicode> (['hello'], {'errors': 'replace'})
835
# CallCount Recursive Total(ms) Inline(ms) module:lineno(function)
836
# (the lsprof header)
837
# ... an arbitrary number of lines
838
# and the function call which is time.sleep.
839
# 1 0 ??? ??? ???(sleep)
840
# and then repeated but with 'world', rather than 'hello'.
841
# this should appear in the output stream of our test result.
842
output = result_stream.getvalue()
844
self.assertContainsRe(output,
845
r"LSProf output for <class 'str'>\(\(b'hello',\), {'errors': 'replace'}\)")
846
self.assertContainsRe(output,
847
r"LSProf output for <class 'str'>\(\(b'world',\), {'errors': 'replace'}\)")
849
self.assertContainsRe(output,
850
r"LSProf output for <type 'unicode'>\(\('hello',\), {'errors': 'replace'}\)")
851
self.assertContainsRe(output,
852
r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
853
self.assertContainsRe(output,
854
r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n")
855
self.assertContainsRe(output,
856
r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?")
858
def test_uses_time_from_testtools(self):
859
"""Test case timings in verbose results should use testtools times"""
862
class TimeAddedVerboseTestResult(tests.VerboseTestResult):
863
def startTest(self, test):
864
self.time(datetime.datetime.utcfromtimestamp(1.145))
865
super(TimeAddedVerboseTestResult, self).startTest(test)
867
def addSuccess(self, test):
868
self.time(datetime.datetime.utcfromtimestamp(51.147))
869
super(TimeAddedVerboseTestResult, self).addSuccess(test)
871
def report_tests_starting(self): pass
873
self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
874
self.assertEndsWith(sio.getvalue(), "OK 50002ms\n")
876
def test_known_failure(self):
877
"""Using knownFailure should trigger several result actions."""
878
class InstrumentedTestResult(tests.ExtendedTestResult):
879
def stopTestRun(self): pass
881
def report_tests_starting(self): pass
883
def report_known_failure(self, test, err=None, details=None):
884
self._call = test, 'known failure'
885
result = InstrumentedTestResult(None, None, None, None)
887
class Test(tests.TestCase):
888
def test_function(self):
889
self.knownFailure('failed!')
890
test = Test("test_function")
892
# it should invoke 'report_known_failure'.
893
self.assertEqual(2, len(result._call))
894
self.assertEqual(test.id(), result._call[0].id())
895
self.assertEqual('known failure', result._call[1])
896
# we don't introspect the traceback, if the rest is ok, it would be
897
# exceptional for it not to be.
898
# it should update the known_failure_count on the object.
899
self.assertEqual(1, result.known_failure_count)
900
# the result should be successful.
901
self.assertTrue(result.wasSuccessful())
903
def test_verbose_report_known_failure(self):
904
# verbose test output formatting
905
result_stream = StringIO()
906
result = breezy.tests.VerboseTestResult(
911
_get_test("test_xfail").run(result)
912
self.assertContainsRe(result_stream.getvalue(),
913
"\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
914
"\\s*(?:Text attachment: )?reason"
919
def get_passing_test(self):
    """Return a test object that can't be run usefully."""
    # The wrapped function does nothing, so running the returned test
    # only ever records a success.
    def passing_test():
        pass
    return unittest.FunctionTestCase(passing_test)
925
def test_add_not_supported(self):
    """Test the behaviour of invoking addNotSupported."""
    class InstrumentedTestResult(tests.ExtendedTestResult):
        # Reporting hooks are silenced; report_unsupported records its
        # arguments so the test can inspect them.
        def stopTestRun(self):
            pass

        def report_tests_starting(self):
            pass

        def report_unsupported(self, test, feature):
            self._call = test, feature
    result = InstrumentedTestResult(None, None, None, None)
    test = SampleTestCase('_test_pass')
    feature = features.Feature()
    result.startTest(test)
    result.addNotSupported(test, feature)
    # it should invoke 'report_unsupported'.
    self.assertEqual(2, len(result._call))
    self.assertEqual(test, result._call[0])
    self.assertEqual(feature, result._call[1])
    # the result should be successful.
    self.assertTrue(result.wasSuccessful())
    # it should record the test against a count of tests not run due to
    # this feature.
    self.assertEqual(1, result.unsupported['Feature'])
    # and invoking it again should increment that counter
    result.addNotSupported(test, feature)
    self.assertEqual(2, result.unsupported['Feature'])
952
def test_verbose_report_unsupported(self):
953
# verbose test output formatting
954
result_stream = StringIO()
955
result = breezy.tests.VerboseTestResult(
960
test = self.get_passing_test()
961
feature = features.Feature()
962
result.startTest(test)
963
prefix = len(result_stream.getvalue())
964
result.report_unsupported(test, feature)
965
output = result_stream.getvalue()[prefix:]
966
lines = output.splitlines()
967
# We don't check for the final '0ms' since it may fail on slow hosts
968
self.assertStartsWith(lines[0], 'NODEP')
969
self.assertEqual(lines[1],
970
" The feature 'Feature' is not available.")
972
def test_unavailable_exception(self):
973
"""An UnavailableFeature being raised should invoke addNotSupported."""
974
class InstrumentedTestResult(tests.ExtendedTestResult):
975
def stopTestRun(self):
978
def report_tests_starting(self):
981
def addNotSupported(self, test, feature):
982
self._call = test, feature
983
result = InstrumentedTestResult(None, None, None, None)
984
feature = features.Feature()
986
class Test(tests.TestCase):
987
def test_function(self):
988
raise tests.UnavailableFeature(feature)
989
test = Test("test_function")
991
# it should invoke 'addNotSupported'.
992
self.assertEqual(2, len(result._call))
993
self.assertEqual(test.id(), result._call[0].id())
994
self.assertEqual(feature, result._call[1])
995
# and not count as an error
996
self.assertEqual(0, result.error_count)
998
def test_strict_with_unsupported_feature(self):
    """A result with an unsupported feature is not strictly successful."""
    text_result = tests.TextTestResult(
        StringIO(), descriptions=0, verbosity=1)
    passing = self.get_passing_test()
    unsupported = "Unsupported Feature"
    text_result.addNotSupported(passing, unsupported)
    strictly_ok = text_result.wasStrictlySuccessful()
    self.assertFalse(strictly_ok)
    # The result is expected to report no benchmark time for this test.
    benchmark_time = text_result._extractBenchmarkTime(passing)
    self.assertEqual(None, benchmark_time)
1006
def test_strict_with_known_failure(self):
1007
result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
1008
test = _get_test("test_xfail")
1010
self.assertFalse(result.wasStrictlySuccessful())
1011
self.assertEqual(None, result._extractBenchmarkTime(test))
1013
def test_strict_with_success(self):
    """A result holding only a success is strictly successful."""
    text_result = tests.TextTestResult(
        StringIO(), descriptions=0, verbosity=1)
    passing = self.get_passing_test()
    text_result.addSuccess(passing)
    strictly_ok = text_result.wasStrictlySuccessful()
    self.assertTrue(strictly_ok)
    # The result is expected to report no benchmark time for this test.
    benchmark_time = text_result._extractBenchmarkTime(passing)
    self.assertEqual(None, benchmark_time)
1020
def test_startTests(self):
1021
"""Starting the first test should trigger startTests."""
1022
class InstrumentedTestResult(tests.ExtendedTestResult):
1025
def startTests(self):
1027
result = InstrumentedTestResult(None, None, None, None)
1029
def test_function():
1031
test = unittest.FunctionTestCase(test_function)
1033
self.assertEqual(1, result.calls)
1035
def test_startTests_only_once(self):
1036
"""With multiple tests startTests should still only be called once"""
1037
class InstrumentedTestResult(tests.ExtendedTestResult):
1040
def startTests(self):
1042
result = InstrumentedTestResult(None, None, None, None)
1043
suite = unittest.TestSuite([
1044
unittest.FunctionTestCase(lambda: None),
1045
unittest.FunctionTestCase(lambda: None)])
1047
self.assertEqual(1, result.calls)
1048
self.assertEqual(2, result.count)
1051
class TestRunner(tests.TestCase):
1053
def dummy_test(self):
1056
def run_test_runner(self, testrunner, test):
1057
"""Run suite in testrunner, saving global state and restoring it.
1059
This currently saves and restores:
1060
TestCaseInTempDir.TEST_ROOT
1062
There should be no tests in this file that use
1063
breezy.tests.TextTestRunner without using this convenience method,
1064
because of our use of global state.
1066
old_root = tests.TestCaseInTempDir.TEST_ROOT
1068
tests.TestCaseInTempDir.TEST_ROOT = None
1069
return testrunner.run(test)
1071
tests.TestCaseInTempDir.TEST_ROOT = old_root
1073
def test_known_failure_failed_run(self):
1074
# run a test that generates a known failure which should be printed in
1075
# the final output when real failures occur.
1076
class Test(tests.TestCase):
1077
def known_failure_test(self):
1078
self.expectFailure('failed', self.assertTrue, False)
1079
test = unittest.TestSuite()
1080
test.addTest(Test("known_failure_test"))
1083
raise AssertionError('foo')
1084
test.addTest(unittest.FunctionTestCase(failing_test))
1086
runner = tests.TextTestRunner(stream=stream)
1087
self.run_test_runner(runner, test)
1088
self.assertContainsRe(
1090
'(?sm)^brz selftest.*$'
1092
'^======================================================================\n'
1093
'^FAIL: failing_test\n'
1094
'^----------------------------------------------------------------------\n'
1095
'Traceback \\(most recent call last\\):\n'
1096
' .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
1097
' raise AssertionError\\(\'foo\'\\)\n'
1099
'^----------------------------------------------------------------------\n'
1101
'FAILED \\(failures=1, known_failure_count=1\\)'
1104
def test_known_failure_ok_run(self):
1105
# run a test that generates a known failure which should be printed in
1107
class Test(tests.TestCase):
1108
def known_failure_test(self):
1109
self.knownFailure("Never works...")
1110
test = Test("known_failure_test")
1112
runner = tests.TextTestRunner(stream=stream)
1113
self.run_test_runner(runner, test)
1114
self.assertContainsRe(stream.getvalue(),
1117
'Ran 1 test in .*\n'
1119
'OK \\(known_failures=1\\)\n')
1121
def test_unexpected_success_bad(self):
1122
class Test(tests.TestCase):
1123
def test_truth(self):
1124
self.expectFailure("No absolute truth", self.assertTrue, True)
1125
runner = tests.TextTestRunner(stream=StringIO())
1126
self.run_test_runner(runner, Test("test_truth"))
1127
self.assertContainsRe(runner.stream.getvalue(),
1129
"FAIL: \\S+\\.test_truth\n"
1132
"\\s*(?:Text attachment: )?reason"
1138
"Ran 1 test in .*\n"
1140
"FAILED \\(failures=1\\)\n\\Z")
1142
def test_result_decorator(self):
1146
class LoggingDecorator(ExtendedToOriginalDecorator):
1147
def startTest(self, test):
1148
ExtendedToOriginalDecorator.startTest(self, test)
1149
calls.append('start')
1150
test = unittest.FunctionTestCase(lambda: None)
1152
runner = tests.TextTestRunner(stream=stream,
1153
result_decorators=[LoggingDecorator])
1154
self.run_test_runner(runner, test)
1155
self.assertLength(1, calls)
1157
def test_skipped_test(self):
1158
# run a test that is skipped, and check the suite as a whole still
1160
# skipping_test must be hidden in here so it's not run as a real test
1161
class SkippingTest(tests.TestCase):
1162
def skipping_test(self):
1163
raise tests.TestSkipped('test intentionally skipped')
1164
runner = tests.TextTestRunner(stream=StringIO())
1165
test = SkippingTest("skipping_test")
1166
result = self.run_test_runner(runner, test)
1167
self.assertTrue(result.wasSuccessful())
1169
def test_skipped_from_setup(self):
1172
class SkippedSetupTest(tests.TestCase):
1175
calls.append('setUp')
1176
self.addCleanup(self.cleanup)
1177
raise tests.TestSkipped('skipped setup')
1179
def test_skip(self):
1180
self.fail('test reached')
1183
calls.append('cleanup')
1185
runner = tests.TextTestRunner(stream=StringIO())
1186
test = SkippedSetupTest('test_skip')
1187
result = self.run_test_runner(runner, test)
1188
self.assertTrue(result.wasSuccessful())
1189
# Check if cleanup was called the right number of times.
1190
self.assertEqual(['setUp', 'cleanup'], calls)
1192
def test_skipped_from_test(self):
1195
class SkippedTest(tests.TestCase):
1198
super(SkippedTest, self).setUp()
1199
calls.append('setUp')
1200
self.addCleanup(self.cleanup)
1202
def test_skip(self):
1203
raise tests.TestSkipped('skipped test')
1206
calls.append('cleanup')
1208
runner = tests.TextTestRunner(stream=StringIO())
1209
test = SkippedTest('test_skip')
1210
result = self.run_test_runner(runner, test)
1211
self.assertTrue(result.wasSuccessful())
1212
# Check if cleanup was called the right number of times.
1213
self.assertEqual(['setUp', 'cleanup'], calls)
1215
def test_not_applicable(self):
1216
# run a test that is skipped because it's not applicable
1217
class Test(tests.TestCase):
1218
def not_applicable_test(self):
1219
raise tests.TestNotApplicable('this test never runs')
1221
runner = tests.TextTestRunner(stream=out, verbosity=2)
1222
test = Test("not_applicable_test")
1223
result = self.run_test_runner(runner, test)
1224
self.log(out.getvalue())
1225
self.assertTrue(result.wasSuccessful())
1226
self.assertTrue(result.wasStrictlySuccessful())
1227
self.assertContainsRe(out.getvalue(),
1228
r'(?m)not_applicable_test * N/A')
1229
self.assertContainsRe(out.getvalue(),
1230
r'(?m)^ this test never runs')
1232
def test_unsupported_features_listed(self):
1233
"""When unsupported features are encountered they are detailed."""
1234
class Feature1(features.Feature):
1238
class Feature2(features.Feature):
1241
# create sample tests
1242
test1 = SampleTestCase('_test_pass')
1243
test1._test_needs_features = [Feature1()]
1244
test2 = SampleTestCase('_test_pass')
1245
test2._test_needs_features = [Feature2()]
1246
test = unittest.TestSuite()
1250
runner = tests.TextTestRunner(stream=stream)
1251
self.run_test_runner(runner, test)
1252
lines = stream.getvalue().splitlines()
1255
"Missing feature 'Feature1' skipped 1 tests.",
1256
"Missing feature 'Feature2' skipped 1 tests.",
1260
def test_verbose_test_count(self):
1261
"""A verbose test run reports the right test count at the start"""
1262
suite = TestUtil.TestSuite([
1263
unittest.FunctionTestCase(lambda:None),
1264
unittest.FunctionTestCase(lambda:None)])
1265
self.assertEqual(suite.countTestCases(), 2)
1267
runner = tests.TextTestRunner(stream=stream, verbosity=2)
1268
# Need to use the CountingDecorator as that's what sets num_tests
1269
self.run_test_runner(runner, tests.CountingDecorator(suite))
1270
self.assertStartsWith(stream.getvalue(), "running 2 tests")
1272
def test_startTestRun(self):
1273
"""run should call result.startTestRun()"""
1276
class LoggingDecorator(ExtendedToOriginalDecorator):
1277
def startTestRun(self):
1278
ExtendedToOriginalDecorator.startTestRun(self)
1279
calls.append('startTestRun')
1280
test = unittest.FunctionTestCase(lambda: None)
1282
runner = tests.TextTestRunner(stream=stream,
1283
result_decorators=[LoggingDecorator])
1284
self.run_test_runner(runner, test)
1285
self.assertLength(1, calls)
1287
def test_stopTestRun(self):
1288
"""run should call result.stopTestRun()"""
1291
class LoggingDecorator(ExtendedToOriginalDecorator):
1292
def stopTestRun(self):
1293
ExtendedToOriginalDecorator.stopTestRun(self)
1294
calls.append('stopTestRun')
1295
test = unittest.FunctionTestCase(lambda: None)
1297
runner = tests.TextTestRunner(stream=stream,
1298
result_decorators=[LoggingDecorator])
1299
self.run_test_runner(runner, test)
1300
self.assertLength(1, calls)
1302
def test_unicode_test_output_on_ascii_stream(self):
1303
"""Showing results should always succeed even on an ascii console"""
1304
class FailureWithUnicode(tests.TestCase):
1305
def test_log_unicode(self):
1307
self.fail("Now print that log!")
1310
out = TextIOWrapper(bio, 'ascii', 'backslashreplace')
1312
bio = out = StringIO()
1313
self.overrideAttr(osutils, "get_terminal_encoding",
1314
lambda trace=False: "ascii")
1315
self.run_test_runner(
1316
tests.TextTestRunner(stream=out),
1317
FailureWithUnicode("test_log_unicode"))
1319
self.assertContainsRe(bio.getvalue(),
1320
b"(?:Text attachment: )?log"
1322
b"\\d+\\.\\d+ \\\\u2606"
1323
b"(?:\n-+\n|}}}\n)")
1326
class SampleTestCase(tests.TestCase):
1328
def _test_pass(self):
1332
class _TestException(Exception):
1336
class TestTestCase(tests.TestCase):
1337
"""Tests that test the core breezy TestCase."""
1339
def test_assertLength_matches_empty(self):
1341
self.assertLength(0, a_list)
1343
def test_assertLength_matches_nonempty(self):
1345
self.assertLength(3, a_list)
1347
def test_assertLength_fails_different(self):
1349
self.assertRaises(AssertionError, self.assertLength, 1, a_list)
1351
def test_assertLength_shows_sequence_in_failure(self):
1353
exception = self.assertRaises(AssertionError, self.assertLength, 2,
1355
self.assertEqual('Incorrect length: wanted 2, got 3 for [1, 2, 3]',
1358
def test_base_setUp_not_called_causes_failure(self):
1359
class TestCaseWithBrokenSetUp(tests.TestCase):
1361
pass # does not call TestCase.setUp
1365
test = TestCaseWithBrokenSetUp('test_foo')
1366
result = unittest.TestResult()
1368
self.assertFalse(result.wasSuccessful())
1369
self.assertEqual(1, result.testsRun)
1371
def test_base_tearDown_not_called_causes_failure(self):
1372
class TestCaseWithBrokenTearDown(tests.TestCase):
1374
pass # does not call TestCase.tearDown
1378
test = TestCaseWithBrokenTearDown('test_foo')
1379
result = unittest.TestResult()
1381
self.assertFalse(result.wasSuccessful())
1382
self.assertEqual(1, result.testsRun)
1384
def test_debug_flags_sanitised(self):
1385
"""The breezy debug flags should be sanitised by setUp."""
1386
if 'allow_debug' in tests.selftest_debug_flags:
1387
raise tests.TestNotApplicable(
1388
'-Eallow_debug option prevents debug flag sanitisation')
1389
# we could set something and run a test that will check
1390
# it gets sanitised, but this is probably sufficient for now:
1391
# if someone runs the test with -Dsomething it will error.
1393
if self._lock_check_thorough:
1394
flags.add('strict_locks')
1395
self.assertEqual(flags, breezy.debug.debug_flags)
1397
def change_selftest_debug_flags(self, new_flags):
    """Override tests.selftest_debug_flags with a set built from new_flags.

    The replacement is installed through self.overrideAttr.
    """
    replacement = set(new_flags)
    self.overrideAttr(tests, 'selftest_debug_flags', replacement)
1400
def test_allow_debug_flag(self):
1401
"""The -Eallow_debug flag prevents breezy.debug.debug_flags from being
1402
sanitised (i.e. cleared) before running a test.
1404
self.change_selftest_debug_flags({'allow_debug'})
1405
breezy.debug.debug_flags = {'a-flag'}
1407
class TestThatRecordsFlags(tests.TestCase):
1408
def test_foo(nested_self):
1409
self.flags = set(breezy.debug.debug_flags)
1410
test = TestThatRecordsFlags('test_foo')
1411
test.run(self.make_test_result())
1413
if 'disable_lock_checks' not in tests.selftest_debug_flags:
1414
flags.add('strict_locks')
1415
self.assertEqual(flags, self.flags)
1417
def test_disable_lock_checks(self):
1418
"""The -Edisable_lock_checks flag disables thorough checks."""
1419
class TestThatRecordsFlags(tests.TestCase):
1420
def test_foo(nested_self):
1421
self.flags = set(breezy.debug.debug_flags)
1422
self.test_lock_check_thorough = nested_self._lock_check_thorough
1423
self.change_selftest_debug_flags(set())
1424
test = TestThatRecordsFlags('test_foo')
1425
test.run(self.make_test_result())
1426
# By default we do strict lock checking and thorough lock/unlock
1428
self.assertTrue(self.test_lock_check_thorough)
1429
self.assertEqual({'strict_locks'}, self.flags)
1430
# Now set the disable_lock_checks flag, and show that this changed.
1431
self.change_selftest_debug_flags({'disable_lock_checks'})
1432
test = TestThatRecordsFlags('test_foo')
1433
test.run(self.make_test_result())
1434
self.assertFalse(self.test_lock_check_thorough)
1435
self.assertEqual(set(), self.flags)
1437
def test_this_fails_strict_lock_check(self):
    """'strict_locks' is set before thisFailsStrictLockCheck(), not after.

    An inner test snapshots breezy.debug.debug_flags on either side of the
    thisFailsStrictLockCheck() call and stores both snapshots on this
    (outer) test so they can be asserted on afterwards.
    """
    outer = self  # the inner test records its observations on this test

    class TestThatRecordsFlags(tests.TestCase):
        def test_foo(nested_self):
            outer.flags1 = set(breezy.debug.debug_flags)
            outer.thisFailsStrictLockCheck()
            outer.flags2 = set(breezy.debug.debug_flags)

    # Make sure lock checking is active
    self.change_selftest_debug_flags(set())
    recorder = TestThatRecordsFlags('test_foo')
    recorder.run(self.make_test_result())
    flags_before = self.flags1
    flags_after = self.flags2
    self.assertEqual({'strict_locks'}, flags_before)
    self.assertEqual(set(), flags_after)
1450
def test_debug_flags_restored(self):
1451
"""The breezy debug flags should be restored to their original state
1452
after the test was run, even if allow_debug is set.
1454
self.change_selftest_debug_flags({'allow_debug'})
1455
# Now run a test that modifies debug.debug_flags.
1456
breezy.debug.debug_flags = {'original-state'}
1458
class TestThatModifiesFlags(tests.TestCase):
1460
breezy.debug.debug_flags = {'modified'}
1461
test = TestThatModifiesFlags('test_foo')
1462
test.run(self.make_test_result())
1463
self.assertEqual({'original-state'}, breezy.debug.debug_flags)
1465
def make_test_result(self):
    """Build a tests.TextTestResult whose stream is a fresh StringIO."""
    stream = StringIO()
    return tests.TextTestResult(stream, descriptions=0, verbosity=1)
1469
def inner_test(self):
1470
# the inner child test
1473
def outer_child(self):
1474
# the outer child test
1476
self.inner_test = TestTestCase("inner_child")
1477
result = self.make_test_result()
1478
self.inner_test.run(result)
1479
note("outer finish")
1480
self.addCleanup(osutils.delete_any, self._log_file_name)
1482
def test_trace_nesting(self):
1483
# this tests that each test case nests its trace facility correctly.
1484
# we do this by running a test case manually. That test case (A)
1485
# should setup a new log, log content to it, setup a child case (B),
1486
# which should log independently, then case (A) should log a trailer
1488
# we do two nested children so that we can verify the state of the
1489
# logs after the outer child finishes is correct, which a bad clean
1490
# up routine in tearDown might trigger a fault in our test with only
1491
# one child, we should instead see the bad result inside our test with
1493
# the outer child test
1494
original_trace = breezy.trace._trace_file
1495
outer_test = TestTestCase("outer_child")
1496
result = self.make_test_result()
1497
outer_test.run(result)
1498
self.assertEqual(original_trace, breezy.trace._trace_file)
1500
def method_that_times_a_bit_twice(self):
    """Time two short sleeps via self.time."""
    # self.time is called twice to ensure it aggregates
    for _ in range(2):
        self.time(time.sleep, 0.007)
1505
def test_time_creates_benchmark_in_result(self):
1506
"""The TestCase.time() method accumulates a benchmark time."""
1507
sample_test = TestTestCase("method_that_times_a_bit_twice")
1508
output_stream = StringIO()
1509
result = breezy.tests.VerboseTestResult(
1513
sample_test.run(result)
1514
self.assertContainsRe(
1515
output_stream.getvalue(),
1518
def test_hooks_sanitised(self):
1519
"""The breezy hooks should be sanitised by setUp."""
1520
# Note this test won't fail with hooks that the core library doesn't
1521
# use - but it will trigger with a plugin that adds hooks, so it's still a
1522
# useful warning in that case.
1523
self.assertEqual(breezy.branch.BranchHooks(),
1524
breezy.branch.Branch.hooks)
1526
breezy.bzr.smart.server.SmartServerHooks(),
1527
breezy.bzr.smart.server.SmartTCPServer.hooks)
1529
breezy.commands.CommandHooks(), breezy.commands.Command.hooks)
1531
def test__gather_lsprof_in_benchmarks(self):
1532
"""When _gather_lsprof_in_benchmarks is on, accumulate profile data.
1534
Each self.time() call is individually and separately profiled.
1536
self.requireFeature(features.lsprof_feature)
1537
# overrides the class member with an instance member so no cleanup
1539
self._gather_lsprof_in_benchmarks = True
1540
self.time(time.sleep, 0.000)
1541
self.time(time.sleep, 0.003)
1542
self.assertEqual(2, len(self._benchcalls))
1543
self.assertEqual((time.sleep, (0.000,), {}), self._benchcalls[0][0])
1544
self.assertEqual((time.sleep, (0.003,), {}), self._benchcalls[1][0])
1545
self.assertIsInstance(self._benchcalls[0][1], breezy.lsprof.Stats)
1546
self.assertIsInstance(self._benchcalls[1][1], breezy.lsprof.Stats)
1547
del self._benchcalls[:]
1549
def test_knownFailure(self):
    """Calling self.knownFailure() must raise tests.KnownFailure."""
    expected_exception = tests.KnownFailure
    self.assertRaises(expected_exception, self.knownFailure, "A Failure")
1553
def test_open_bzrdir_safe_roots(self):
1554
# even a memory transport should fail to open when its url isn't
1556
# Manually set one up (TestCase doesn't and shouldn't provide magic
1558
transport_server = memory.MemoryServer()
1559
transport_server.start_server()
1560
self.addCleanup(transport_server.stop_server)
1561
t = transport.get_transport_from_url(transport_server.get_url())
1562
controldir.ControlDir.create(t.base)
1563
self.assertRaises(errors.BzrError,
1564
controldir.ControlDir.open_from_transport, t)
1565
# But if we declare this as safe, we can open the bzrdir.
1566
self.permit_url(t.base)
1567
self._bzr_selftest_roots.append(t.base)
1568
controldir.ControlDir.open_from_transport(t)
1570
def test_requireFeature_available(self):
1571
"""self.requireFeature(available) is a no-op."""
1572
class Available(features.Feature):
1575
feature = Available()
1576
self.requireFeature(feature)
1578
def test_requireFeature_unavailable(self):
1579
"""self.requireFeature(unavailable) raises UnavailableFeature."""
1580
class Unavailable(features.Feature):
1583
feature = Unavailable()
1584
self.assertRaises(tests.UnavailableFeature,
1585
self.requireFeature, feature)
1587
def test_run_no_parameters(self):
1588
test = SampleTestCase('_test_pass')
1591
def test_run_enabled_unittest_result(self):
1592
"""Test we revert to regular behaviour when the test is enabled."""
1593
test = SampleTestCase('_test_pass')
1595
class EnabledFeature(object):
1596
def available(self):
1598
test._test_needs_features = [EnabledFeature()]
1599
result = unittest.TestResult()
1601
self.assertEqual(1, result.testsRun)
1602
self.assertEqual([], result.errors)
1603
self.assertEqual([], result.failures)
1605
def test_run_disabled_unittest_result(self):
1606
"""Test our compatibility for disabled tests with unittest results."""
1607
test = SampleTestCase('_test_pass')
1609
class DisabledFeature(object):
1610
def available(self):
1612
test._test_needs_features = [DisabledFeature()]
1613
result = unittest.TestResult()
1615
self.assertEqual(1, result.testsRun)
1616
self.assertEqual([], result.errors)
1617
self.assertEqual([], result.failures)
1619
def test_run_disabled_supporting_result(self):
1620
"""Test disabled tests behaviour with support aware results."""
1621
test = SampleTestCase('_test_pass')
1623
class DisabledFeature(object):
1624
def __eq__(self, other):
1625
return isinstance(other, DisabledFeature)
1627
def available(self):
1629
the_feature = DisabledFeature()
1630
test._test_needs_features = [the_feature]
1632
class InstrumentedTestResult(unittest.TestResult):
1634
unittest.TestResult.__init__(self)
1637
def startTest(self, test):
1638
self.calls.append(('startTest', test))
1640
def stopTest(self, test):
1641
self.calls.append(('stopTest', test))
1643
def addNotSupported(self, test, feature):
1644
self.calls.append(('addNotSupported', test, feature))
1645
result = InstrumentedTestResult()
1647
case = result.calls[0][1]
1649
('startTest', case),
1650
('addNotSupported', case, the_feature),
1655
def test_start_server_registers_url(self):
    """start_server() puts the server's URL into _bzr_selftest_roots."""
    server = memory.MemoryServer()
    # A little strict (no roots at all beforehand), but unlikely to be
    # changed soon.
    self.assertEqual([], self._bzr_selftest_roots)
    self.start_server(server)
    expected_urls = [server.get_url()]
    self.assertSubset(expected_urls, self._bzr_selftest_roots)
1663
def test_assert_list_raises_on_generator(self):
1664
def generator_which_will_raise():
1665
# This will not raise until after the first yield
1667
raise _TestException()
1669
e = self.assertListRaises(_TestException, generator_which_will_raise)
1670
self.assertIsInstance(e, _TestException)
1672
e = self.assertListRaises(Exception, generator_which_will_raise)
1673
self.assertIsInstance(e, _TestException)
1675
def test_assert_list_raises_on_plain(self):
1676
def plain_exception():
1677
raise _TestException()
1680
e = self.assertListRaises(_TestException, plain_exception)
1681
self.assertIsInstance(e, _TestException)
1683
e = self.assertListRaises(Exception, plain_exception)
1684
self.assertIsInstance(e, _TestException)
1686
def test_assert_list_raises_assert_wrong_exception(self):
1687
class _NotTestException(Exception):
1690
def wrong_exception():
1691
raise _NotTestException()
1693
def wrong_exception_generator():
1696
raise _NotTestException()
1698
# Wrong exceptions are not intercepted
1701
self.assertListRaises, _TestException, wrong_exception)
1704
self.assertListRaises, _TestException, wrong_exception_generator)
1706
def test_assert_list_raises_no_exception(self):
1710
def success_generator():
1714
self.assertRaises(AssertionError,
1715
self.assertListRaises, _TestException, success)
1719
self.assertListRaises, _TestException, success_generator)
1721
def _run_successful_test(self, test):
1722
result = testtools.TestResult()
1724
self.assertTrue(result.wasSuccessful())
1727
def test_overrideAttr_without_value(self):
1728
self.test_attr = 'original' # Define a test attribute
1729
obj = self # Make 'obj' visible to the embedded test
1731
class Test(tests.TestCase):
1734
super(Test, self).setUp()
1735
self.orig = self.overrideAttr(obj, 'test_attr')
1737
def test_value(self):
1738
self.assertEqual('original', self.orig)
1739
self.assertEqual('original', obj.test_attr)
1740
obj.test_attr = 'modified'
1741
self.assertEqual('modified', obj.test_attr)
1743
self._run_successful_test(Test('test_value'))
1744
self.assertEqual('original', obj.test_attr)
1746
def test_overrideAttr_with_value(self):
1747
self.test_attr = 'original' # Define a test attribute
1748
obj = self # Make 'obj' visible to the embedded test
1750
class Test(tests.TestCase):
1753
super(Test, self).setUp()
1754
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1756
def test_value(self):
1757
self.assertEqual('original', self.orig)
1758
self.assertEqual('modified', obj.test_attr)
1760
self._run_successful_test(Test('test_value'))
1761
self.assertEqual('original', obj.test_attr)
1763
def test_overrideAttr_with_no_existing_value_and_value(self):
1764
# Do not define the test_attribute
1765
obj = self # Make 'obj' visible to the embedded test
1767
class Test(tests.TestCase):
1770
tests.TestCase.setUp(self)
1771
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1773
def test_value(self):
1774
self.assertEqual(tests._unitialized_attr, self.orig)
1775
self.assertEqual('modified', obj.test_attr)
1777
self._run_successful_test(Test('test_value'))
1778
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1780
def test_overrideAttr_with_no_existing_value_and_no_value(self):
1781
# Do not define the test_attribute
1782
obj = self # Make 'obj' visible to the embedded test
1784
class Test(tests.TestCase):
1787
tests.TestCase.setUp(self)
1788
self.orig = self.overrideAttr(obj, 'test_attr')
1790
def test_value(self):
1791
self.assertEqual(tests._unitialized_attr, self.orig)
1792
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1794
self._run_successful_test(Test('test_value'))
1795
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1797
def test_recordCalls(self):
1798
from breezy.tests import test_selftest
1799
calls = self.recordCalls(
1800
test_selftest, '_add_numbers')
1801
self.assertEqual(test_selftest._add_numbers(2, 10),
1803
self.assertEqual(calls, [((2, 10), {})])
1806
def _add_numbers(a, b):
1810
class _MissingFeature(features.Feature):
1815
missing_feature = _MissingFeature()
1818
def _get_test(name):
1819
"""Get an instance of a specific example test.
1821
We protect this in a function so that they don't auto-run in the test
1825
class ExampleTests(tests.TestCase):
1827
def test_fail(self):
1828
mutter('this was a failing test')
1829
self.fail('this test will fail')
1831
def test_error(self):
1832
mutter('this test errored')
1833
raise RuntimeError('gotcha')
1835
def test_missing_feature(self):
1836
mutter('missing the feature')
1837
self.requireFeature(missing_feature)
1839
def test_skip(self):
1840
mutter('this test will be skipped')
1841
raise tests.TestSkipped('reason')
1843
def test_success(self):
1844
mutter('this test succeeds')
1846
def test_xfail(self):
1847
mutter('test with expected failure')
1848
self.knownFailure('this_fails')
1850
def test_unexpected_success(self):
1851
mutter('test with unexpected success')
1852
self.expectFailure('should_fail', lambda: None)
1854
return ExampleTests(name)
1857
class TestTestCaseLogDetails(tests.TestCase):
1859
def _run_test(self, test_name):
1860
test = _get_test(test_name)
1861
result = testtools.TestResult()
1865
def test_fail_has_log(self):
    """The report of a failing test contains the log attachment."""
    run_result = self._run_test('test_fail')
    self.assertEqual(1, len(run_result.failures))
    failure_text = run_result.failures[0][1]
    # First the log heading, then the message the example test muttered.
    log_heading = '(?m)^(?:Text attachment: )?log(?:$|: )'
    self.assertContainsRe(failure_text, log_heading)
    self.assertContainsRe(failure_text, 'this was a failing test')
1873
def test_error_has_log(self):
    """The report of an erroring test contains the log attachment."""
    run_result = self._run_test('test_error')
    self.assertEqual(1, len(run_result.errors))
    error_text = run_result.errors[0][1]
    # First the log heading, then the message the example test muttered.
    log_heading = '(?m)^(?:Text attachment: )?log(?:$|: )'
    self.assertContainsRe(error_text, log_heading)
    self.assertContainsRe(error_text, 'this test errored')
1881
def test_skip_has_no_log(self):
1882
result = self._run_test('test_skip')
1883
reasons = result.skip_reasons
1884
self.assertEqual({'reason'}, set(reasons))
1885
skips = reasons['reason']
1886
self.assertEqual(1, len(skips))
1888
self.assertFalse('log' in test.getDetails())
1890
def test_missing_feature_has_no_log(self):
1891
# testtools doesn't know about addNotSupported, so it just gets
1892
# considered as a skip
1893
result = self._run_test('test_missing_feature')
1894
reasons = result.skip_reasons
1895
self.assertEqual({str(missing_feature)}, set(reasons))
1896
skips = reasons[str(missing_feature)]
1897
self.assertEqual(1, len(skips))
1899
self.assertFalse('log' in test.getDetails())
1901
def test_xfail_has_no_log(self):
    """The report of an expected failure carries no log attachment."""
    run_result = self._run_test('test_xfail')
    self.assertEqual(1, len(run_result.expectedFailures))
    xfail_text = run_result.expectedFailures[0][1]
    # Neither the log heading nor the muttered message may show up.
    log_heading = '(?m)^(?:Text attachment: )?log(?:$|: )'
    self.assertNotContainsRe(xfail_text, log_heading)
    self.assertNotContainsRe(xfail_text, 'test with expected failure')
1909
def test_unexpected_success_has_log(self):
    """An unexpectedly successful test still carries its 'log' detail.

    Runs the example ``test_unexpected_success`` test through
    ``self._run_test``, checks that exactly one unexpected success was
    recorded, and that the recorded test has a 'log' entry in its details.
    """
    result = self._run_test('test_unexpected_success')
    self.assertEqual(1, len(result.unexpectedSuccesses))
    # Inconsistency, unexpectedSuccesses is a list of tests,
    # expectedFailures is a list of reasons?
    test = result.unexpectedSuccesses[0]
    details = test.getDetails()
    # assertIn names the missing key and the container on failure, whereas
    # assertTrue('log' in details) would only report that False is not true.
    self.assertIn('log', details)
1919
class TestTestCloning(tests.TestCase):
1920
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1922
def test_cloned_testcase_does_not_share_details(self):
1923
"""A TestCase cloned with clone_test does not share mutable attributes
1924
such as details or cleanups.
1926
class Test(tests.TestCase):
1928
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1929
orig_test = Test('test_foo')
1930
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1931
orig_test.run(unittest.TestResult())
1932
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1933
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1935
def test_double_apply_scenario_preserves_first_scenario(self):
1936
"""Applying two levels of scenarios to a test preserves the attributes
1937
added by both scenarios.
1939
class Test(tests.TestCase):
1942
test = Test('test_foo')
1943
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1944
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1945
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1946
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1947
all_tests = list(tests.iter_suite_tests(suite))
1948
self.assertLength(4, all_tests)
1949
all_xys = sorted((t.x, t.y) for t in all_tests)
1950
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1953
# NB: Don't delete this; it's not actually from 0.11!
1954
@deprecated_function(deprecated_in((0, 11, 0)))
1955
def sample_deprecated_function():
1956
"""A deprecated function to test applyDeprecated with."""
1960
def sample_undeprecated_function(a_param):
1961
"""An undeprecated function to test applyDeprecated with."""
1964
class ApplyDeprecatedHelper(object):
1965
"""A helper class for ApplyDeprecated tests."""
1967
@deprecated_method(deprecated_in((0, 11, 0)))
1968
def sample_deprecated_method(self, param_one):
1969
"""A deprecated method for testing with."""
1972
def sample_normal_method(self):
1973
"""An undeprecated method."""
1975
@deprecated_method(deprecated_in((0, 10, 0)))
1976
def sample_nested_deprecation(self):
1977
return sample_deprecated_function()
1980
class TestExtraAssertions(tests.TestCase):
1981
"""Tests for new test assertions in breezy test suite"""
1983
def test_assert_isinstance(self):
1984
self.assertIsInstance(2, int)
1985
self.assertIsInstance(u'', (str, text_type))
1986
e = self.assertRaises(AssertionError, self.assertIsInstance, None, int)
1989
["None is an instance of <type 'NoneType'> rather than "
1991
"None is an instance of <class 'NoneType'> rather than "
1993
self.assertRaises(AssertionError, self.assertIsInstance, 23.3, int)
1994
e = self.assertRaises(AssertionError,
1995
self.assertIsInstance, None, int,
2000
"None is an instance of <class 'NoneType'> rather "
2001
"than <class 'int'>: it's just not")
2005
"None is an instance of <type 'NoneType'> "
2006
"rather than <type 'int'>: it's just not")
2008
def test_assertEndsWith(self):
    # A string that really ends with the suffix passes silently.
    self.assertEndsWith('foo', 'oo')
    # 'o' does not end with 'oo', so the assertion itself has to fail.
    ends_with = self.assertEndsWith
    self.assertRaises(AssertionError, ends_with, 'o', 'oo')
2012
def test_assertEqualDiff(self):
    # Only the second string has a newline: the message blames the first
    # string and the '+' line refers to the second one.
    failure = self.assertRaises(
        AssertionError, self.assertEqualDiff, '', '\n')
    message = str(failure)
    self.assertEqual(
        message, 'first string is missing a final newline.\n+ \n')

    # Mirror case: the message blames the second string; the '-' line
    # applies to the second string as well.
    failure = self.assertRaises(
        AssertionError, self.assertEqualDiff, '\n', '')
    message = str(failure)
    self.assertEqual(
        message, 'second string is missing a final newline.\n- \n')
2025
class TestDeprecations(tests.TestCase):
2027
def test_applyDeprecated_not_deprecated(self):
2028
sample_object = ApplyDeprecatedHelper()
2029
# calling an undeprecated callable raises an assertion
2030
self.assertRaises(AssertionError, self.applyDeprecated,
2031
deprecated_in((0, 11, 0)),
2032
sample_object.sample_normal_method)
2033
self.assertRaises(AssertionError, self.applyDeprecated,
2034
deprecated_in((0, 11, 0)),
2035
sample_undeprecated_function, "a param value")
2036
# calling a deprecated callable (function or method) with the wrong
2037
# expected deprecation fails.
2038
self.assertRaises(AssertionError, self.applyDeprecated,
2039
deprecated_in((0, 10, 0)),
2040
sample_object.sample_deprecated_method,
2042
self.assertRaises(AssertionError, self.applyDeprecated,
2043
deprecated_in((0, 10, 0)),
2044
sample_deprecated_function)
2045
# calling a deprecated callable (function or method) with the right
2046
# expected deprecation returns the functions result.
2049
self.applyDeprecated(
2050
deprecated_in((0, 11, 0)),
2051
sample_object.sample_deprecated_method, "a param value"))
2052
self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 11, 0)),
2053
sample_deprecated_function))
2054
# calling a nested deprecation with the wrong deprecation version
2055
# fails even if a deeper nested function was deprecated with the
2058
AssertionError, self.applyDeprecated,
2059
deprecated_in((0, 11, 0)), sample_object.sample_nested_deprecation)
2060
# calling a nested deprecation with the right deprecation value
2061
# returns the calls result.
2063
2, self.applyDeprecated(
2064
deprecated_in((0, 10, 0)),
2065
sample_object.sample_nested_deprecation))
2067
def test_callDeprecated(self):
2068
def testfunc(be_deprecated, result=None):
2069
if be_deprecated is True:
2070
symbol_versioning.warn('i am deprecated', DeprecationWarning,
2073
result = self.callDeprecated(['i am deprecated'], testfunc, True)
2074
self.assertIs(None, result)
2075
result = self.callDeprecated([], testfunc, False, 'result')
2076
self.assertEqual('result', result)
2077
self.callDeprecated(['i am deprecated'], testfunc, be_deprecated=True)
2078
self.callDeprecated([], testfunc, be_deprecated=False)
2081
class TestWarningTests(tests.TestCase):
2082
"""Tests for calling methods that raise warnings."""
2084
def test_callCatchWarnings(self):
2086
warnings.warn("this is your last warning")
2088
wlist, result = self.callCatchWarnings(meth, 1, 2)
2089
self.assertEqual(3, result)
2090
# would like just to compare them, but UserWarning doesn't implement
2093
self.assertIsInstance(w0, UserWarning)
2094
self.assertEqual("this is your last warning", str(w0))
2097
class TestConvenienceMakers(tests.TestCaseWithTransport):
    """Exercise the make_* convenience helpers."""

    def test_make_branch_and_tree_with_format(self):
        # make_branch_and_tree must accept an explicitly supplied format.
        requested_format = breezy.bzr.bzrdir.BzrDirMetaFormat1()
        self.make_branch_and_tree('a', format=requested_format)
        reopened = breezy.controldir.ControlDir.open('a')
        self.assertIsInstance(
            reopened._format, breezy.bzr.bzrdir.BzrDirMetaFormat1)

    def test_make_branch_and_memory_tree(self):
        # TestCaseWithTransport hands out a new branch together with a
        # mutable tree.
        memory_tree = self.make_branch_and_memory_tree('a')
        self.assertIsInstance(memory_tree, breezy.memorytree.MemoryTree)

    def test_make_tree_for_local_vfs_backed_transport(self):
        # When the vfs transport and the local disk are colocated,
        # make_branch_and_tree has to use local branches and repositories
        # even though another transport is used to generate urls.
        self.transport_server = test_server.FakeVFATServer
        generated_url = self.get_url('t1')
        self.assertFalse(generated_url.startswith('file://'))
        tree = self.make_branch_and_tree('t1')
        tree_transport = tree.controldir.root_transport
        self.assertStartsWith(tree_transport.base, 'file://')
        branch = tree.branch
        self.assertEqual(tree_transport, branch.controldir.root_transport)
        self.assertEqual(
            tree_transport, branch.repository.controldir.root_transport)
2128
class SelfTestHelper(object):
2130
def run_selftest(self, **kwargs):
2131
"""Run selftest returning its output."""
2134
output = TextIOWrapper(bio, 'utf-8')
2136
bio = output = StringIO()
2137
old_transport = breezy.tests.default_transport
2138
old_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
2139
tests.TestCaseWithMemoryTransport.TEST_ROOT = None
2141
self.assertEqual(True, tests.selftest(stream=output, **kwargs))
2143
breezy.tests.default_transport = old_transport
2144
tests.TestCaseWithMemoryTransport.TEST_ROOT = old_root
2152
class TestSelftest(tests.TestCase, SelfTestHelper):
2153
"""Tests of breezy.tests.selftest."""
2155
def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(
2160
factory_called.append(True)
2161
return TestUtil.TestSuite()
2164
self.apply_redirected(out, err, None, breezy.tests.selftest,
2165
test_suite_factory=factory)
2166
self.assertEqual([True], factory_called)
2169
"""A test suite factory."""
2170
class Test(tests.TestCase):
2172
return __name__ + ".Test." + self._testMethodName
2182
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
2184
def test_list_only(self):
2185
output = self.run_selftest(test_suite_factory=self.factory,
2187
self.assertEqual(3, len(output.readlines()))
2189
def test_list_only_filtered(self):
    # Only ids matching the pattern may appear in the listing.
    listing = self.run_selftest(
        list_only=True, pattern="Test.b", test_suite_factory=self.factory)
    listed = listing.getvalue()
    self.assertEndsWith(listed, b"Test.b\n")
    listed_lines = listing.readlines()
    self.assertLength(1, listed_lines)
2195
def test_list_only_excludes(self):
    # List the factory's tests while excluding the one matching "Test.b".
    output = self.run_selftest(test_suite_factory=self.factory,
                               list_only=True, exclude_pattern="Test.b")
    # assertNotContainsRe takes (haystack, needle_re), like the
    # assertContainsRe calls elsewhere in this file: the listing is the
    # haystack and the excluded id is the pattern.  The arguments used to
    # be swapped, which searched the literal b"Test.b" for the listing
    # instead of searching the listing for the excluded id.
    self.assertNotContainsRe(output.getvalue(), b"Test.b")
    # The two remaining tests are listed, one per line.
    self.assertLength(2, output.readlines())
2201
def test_lsprof_tests(self):
2202
self.requireFeature(features.lsprof_feature)
2206
def __call__(test, result):
2209
def run(test, result):
2210
results.append(result)
2212
def countTestCases(self):
2214
self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
2215
self.assertLength(1, results)
2216
self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)
2218
def test_random(self):
    # test randomising by listing a number of tests.
    output_123 = self.run_selftest(test_suite_factory=self.factory,
                                   list_only=True, random_seed="123")
    output_234 = self.run_selftest(test_suite_factory=self.factory,
                                   list_only=True, random_seed="234")
    # Compare the captured output, not the stream objects: two distinct
    # stream objects compare unequal whatever they contain, which made the
    # previous check vacuous.
    self.assertNotEqual(output_123.getvalue(), output_234.getvalue())
    # "Randomizing test order..\n\n" followed by the listed tests
    self.assertLength(5, output_123.readlines())
    self.assertLength(5, output_234.readlines())
2229
def test_random_reuse_is_same_order(self):
    # Listing twice with the same seed must give the same output.
    def list_with_seed(seed):
        return self.run_selftest(test_suite_factory=self.factory,
                                 list_only=True, random_seed=seed)

    first_run = list_with_seed("123")
    second_run = list_with_seed("123")
    self.assertEqual(first_run.getvalue(), second_run.getvalue())
2237
def test_runner_class(self):
2238
self.requireFeature(features.subunit)
2239
from subunit import ProtocolTestCase
2240
stream = self.run_selftest(
2241
runner_class=tests.SubUnitBzrRunnerv1,
2242
test_suite_factory=self.factory)
2243
test = ProtocolTestCase(stream)
2244
result = unittest.TestResult()
2246
self.assertEqual(3, result.testsRun)
2248
def test_starting_with_single_argument(self):
2249
output = self.run_selftest(test_suite_factory=self.factory,
2251
'breezy.tests.test_selftest.Test.a'],
2253
self.assertEqual(b'breezy.tests.test_selftest.Test.a\n',
2256
def test_starting_with_multiple_argument(self):
2257
output = self.run_selftest(
2258
test_suite_factory=self.factory,
2259
starting_with=['breezy.tests.test_selftest.Test.a',
2260
'breezy.tests.test_selftest.Test.b'],
2262
self.assertEqual(b'breezy.tests.test_selftest.Test.a\n'
2263
b'breezy.tests.test_selftest.Test.b\n',
2266
def check_transport_set(self, transport_server):
2267
captured_transport = []
2269
def seen_transport(a_transport):
2270
captured_transport.append(a_transport)
2272
class Capture(tests.TestCase):
2274
seen_transport(breezy.tests.default_transport)
2277
return TestUtil.TestSuite([Capture("a")])
2278
self.run_selftest(transport=transport_server,
2279
test_suite_factory=factory)
2280
self.assertEqual(transport_server, captured_transport[0])
2282
def test_transport_sftp(self):
    # The stub sftp server is only imported once paramiko is known to be
    # available.
    self.requireFeature(features.paramiko)
    from breezy.tests.stub_sftp import SFTPAbsoluteServer
    self.check_transport_set(SFTPAbsoluteServer)
2287
def test_transport_memory(self):
    # Same check as for sftp, with the in-memory server.
    server_class = memory.MemoryServer
    self.check_transport_set(server_class)
2291
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
    # Needs a real directory: selftest reads test.list from disk.

    def test_load_list(self):
        # Write a list file naming exactly one test: this very test.
        own_id_line = b'%s\n' % self.id().encode('ascii')
        list_file = ('test.list', own_id_line)
        self.build_tree_contents([list_file])
        # Listing the suite restricted to that file must echo the id back.
        listing = self.run_selftest(load_list='test.list', list_only=True)
        self.assertEqual(own_id_line, listing.getvalue())

    def test_load_unknown(self):
        # Pointing load_list at a file that does not exist is an error.
        self.assertRaises(
            errors.NoSuchFile, self.run_selftest,
            load_list='missing file name', list_only=True)
2309
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2311
_test_needs_features = [features.subunit]
2313
def run_subunit_stream(self, test_name):
2314
from subunit import ProtocolTestCase
2317
return TestUtil.TestSuite([_get_test(test_name)])
2318
stream = self.run_selftest(
2319
runner_class=tests.SubUnitBzrRunnerv1,
2320
test_suite_factory=factory)
2321
test = ProtocolTestCase(stream)
2322
result = testtools.TestResult()
2324
content = stream.getvalue()
2325
return content, result
2327
def test_fail_has_log(self):
    # A failing test is reported once and its log travels in the stream.
    stream_bytes, outcome = self.run_subunit_stream('test_fail')
    failure_count = len(outcome.failures)
    self.assertEqual(1, failure_count)
    for expected in (b'(?m)^log$', b'this test will fail'):
        self.assertContainsRe(stream_bytes, expected)
2333
def test_error_has_log(self):
    # An erroring test keeps its log in the stream.
    stream_bytes, _outcome = self.run_subunit_stream('test_error')
    for expected in (b'(?m)^log$', b'this test errored'):
        self.assertContainsRe(stream_bytes, expected)
2338
def test_skip_has_no_log(self):
2339
content, result = self.run_subunit_stream('test_skip')
2340
self.assertNotContainsRe(content, b'(?m)^log$')
2341
self.assertNotContainsRe(content, b'this test will be skipped')
2342
reasons = result.skip_reasons
2343
self.assertEqual({'reason'}, set(reasons))
2344
skips = reasons['reason']
2345
self.assertEqual(1, len(skips))
2347
# RemotedTestCase doesn't preserve the "details"
2348
# self.assertFalse('log' in test.getDetails())
2350
def test_missing_feature_has_no_log(self):
2351
content, result = self.run_subunit_stream('test_missing_feature')
2352
self.assertNotContainsRe(content, b'(?m)^log$')
2353
self.assertNotContainsRe(content, b'missing the feature')
2354
reasons = result.skip_reasons
2355
self.assertEqual({'_MissingFeature\n'}, set(reasons))
2356
skips = reasons['_MissingFeature\n']
2357
self.assertEqual(1, len(skips))
2359
# RemotedTestCase doesn't preserve the "details"
2360
# self.assertFalse('log' in test.getDetails())
2362
def test_xfail_has_no_log(self):
    # Neither the raw stream nor the recorded expected failure may carry
    # the log of a test that failed as expected.
    stream_bytes, outcome = self.run_subunit_stream('test_xfail')
    for unwanted in (b'(?m)^log$', b'test with expected failure'):
        self.assertNotContainsRe(stream_bytes, unwanted)
    xfails = outcome.expectedFailures
    self.assertEqual(1, len(xfails))
    reported_text = xfails[0][1]
    self.assertNotContainsRe(
        reported_text, '(?m)^(?:Text attachment: )?log(?:$|: )')
    self.assertNotContainsRe(reported_text, 'test with expected failure')
2372
def test_unexpected_success_has_log(self):
    # An unexpected success keeps its log in the stream.
    stream_bytes, outcome = self.run_subunit_stream(
        'test_unexpected_success')
    for expected in (b'(?m)^log$', b'test with unexpected success'):
        self.assertContainsRe(stream_bytes, expected)
    surprise_count = len(outcome.unexpectedSuccesses)
    self.assertEqual(1, surprise_count)
    # RemotedTestCase doesn't preserve the "details", so the 'log' detail
    # of outcome.unexpectedSuccesses[0] is not checked here.
2381
def test_success_has_no_log(self):
    # A passing test runs once and leaves no log in the stream.
    stream_bytes, outcome = self.run_subunit_stream('test_success')
    self.assertEqual(1, outcome.testsRun)
    for unwanted in (b'(?m)^log$', b'this test succeeds'):
        self.assertNotContainsRe(stream_bytes, unwanted)
2388
class TestRunBzr(tests.TestCase):
2394
def _run_bzr_core(self, argv, encoding=None, stdin=None,
2395
stdout=None, stderr=None, working_dir=None):
2396
"""Override _run_bzr_core to test how it is invoked by run_bzr.
2398
Attempts to run bzr from inside this class don't actually run it.
2400
We test how run_bzr actually invokes bzr in another location. Here we
2401
only need to test that it passes the right parameters to run_bzr.
2403
self.argv = list(argv)
2404
self.encoding = encoding
2406
self.working_dir = working_dir
2407
stdout.write(self.out)
2408
stderr.write(self.err)
2411
def test_run_bzr_error(self):
2412
self.out = "It sure does!\n"
2414
out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
2415
self.assertEqual(['rocks'], self.argv)
2416
self.assertEqual('It sure does!\n', out)
2417
self.assertEqual(out, self.out)
2418
self.assertEqual('', err)
2419
self.assertEqual(err, self.err)
2421
def test_run_bzr_error_regexes(self):
2423
self.err = "bzr: ERROR: foobarbaz is not versioned"
2425
out, err = self.run_bzr_error(
2426
["bzr: ERROR: foobarbaz is not versioned"],
2427
['file-id', 'foobarbaz'])
2429
def test_encoding(self):
    """run_bzr hands the encoding on to _run_bzr_core."""
    command_line = 'foo bar'
    split_argv = ['foo', 'bar']

    # Without an explicit encoding the user encoding is passed.
    self.run_bzr(command_line)
    self.assertEqual(osutils.get_user_encoding(), self.encoding)
    self.assertEqual(split_argv, self.argv)

    # An explicit encoding is passed through unchanged.
    self.run_bzr(command_line, encoding='baz')
    self.assertEqual('baz', self.encoding)
    self.assertEqual(split_argv, self.argv)
2439
def test_stdin(self):
2440
# test that the stdin keyword to run_bzr is passed through to
2441
# _run_bzr_core as-is. We do this by overriding
2442
# _run_bzr_core in this class, and then calling run_bzr,
2443
# which is a convenience function for _run_bzr_core, so
2445
self.run_bzr('foo bar', stdin='gam')
2446
self.assertEqual('gam', self.stdin)
2447
self.assertEqual(['foo', 'bar'], self.argv)
2449
self.run_bzr('foo bar', stdin='zippy')
2450
self.assertEqual('zippy', self.stdin)
2451
self.assertEqual(['foo', 'bar'], self.argv)
2453
def test_working_dir(self):
    """run_bzr hands working_dir on to _run_bzr_core."""
    command_line = 'foo bar'
    split_argv = ['foo', 'bar']

    # No working_dir given: None is recorded.
    self.run_bzr(command_line)
    self.assertEqual(None, self.working_dir)
    self.assertEqual(split_argv, self.argv)

    # An explicit working_dir is passed through unchanged.
    self.run_bzr(command_line, working_dir='baz')
    self.assertEqual('baz', self.working_dir)
    self.assertEqual(split_argv, self.argv)
2463
def test_reject_extra_keyword_arguments(self):
    # run_bzr is expected to raise TypeError when given error_regex.
    unsupported = {'error_regex': ['error message']}
    self.assertRaises(TypeError, self.run_bzr, "foo bar", **unsupported)
2468
class TestRunBzrCaptured(tests.TestCaseWithTransport):
2469
# Does IO when testing the working_dir parameter.
2471
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
2472
a_callable=None, *args, **kwargs):
2474
self.factory_stdin = getattr(breezy.ui.ui_factory, "stdin", None)
2475
self.factory = breezy.ui.ui_factory
2476
self.working_dir = osutils.getcwd()
2477
stdout.write('foo\n')
2478
stderr.write('bar\n')
2481
def test_stdin(self):
    # The stdin keyword given to run_bzr has to reach apply_redirected as
    # a file-like object.  apply_redirected is overridden in this class,
    # so what it received can be inspected after each call.
    for text in ('gam', 'zippy'):
        self.run_bzr(['foo', 'bar'], stdin=text)
        self.assertEqual(text, self.stdin.read())
        self.assertTrue(self.stdin is self.factory_stdin)
2493
def test_ui_factory(self):
    # Every run_bzr call gets a UI factory of its own: a TestUIFactory
    # whose stdout and stderr are those of the invoked run_bzr.
    factory_before = breezy.ui.ui_factory
    self.run_bzr(['foo'])
    seen = self.factory
    self.assertFalse(factory_before is seen)
    self.assertNotEqual(sys.stdout, seen.stdout)
    self.assertNotEqual(sys.stderr, seen.stderr)
    self.assertEqual('foo\n', seen.stdout.getvalue())
    self.assertEqual('bar\n', seen.stderr.getvalue())
    self.assertIsInstance(seen, tests.TestUIFactory)
2507
def test_working_dir(self):
    self.build_tree(['one/', 'two/'])
    start_dir = osutils.getcwd()
    argv = ['foo', 'bar']

    # With no working_dir the command runs in the current directory.
    self.run_bzr(argv)
    self.assertEqual(start_dir, self.working_dir)

    # An explicit None behaves the same way.
    self.run_bzr(argv, working_dir=None)
    self.assertEqual(start_dir, self.working_dir)

    # A named directory is used for the call only: afterwards the
    # current working directory is unchanged.
    for subdir in ('one', 'two'):
        self.run_bzr(argv, working_dir=subdir)
        self.assertNotEqual(start_dir, self.working_dir)
        self.assertEndsWith(self.working_dir, subdir)
        self.assertEqual(start_dir, osutils.getcwd())
2531
class StubProcess(object):
2532
"""A stub process for testing run_bzr_subprocess."""
2534
def __init__(self, out="", err="", retcode=0):
2537
self.returncode = retcode
2539
def communicate(self):
2540
return self.out, self.err
2543
class TestWithFakedStartBzrSubprocess(tests.TestCaseWithTransport):
2544
"""Base class for tests testing how we might run bzr."""
2547
super(TestWithFakedStartBzrSubprocess, self).setUp()
2548
self.subprocess_calls = []
2550
def start_bzr_subprocess(self, process_args, env_changes=None,
2551
skip_if_plan_to_signal=False,
2553
allow_plugins=False):
2554
"""capture what run_bzr_subprocess tries to do."""
2555
self.subprocess_calls.append(
2556
{'process_args': process_args,
2557
'env_changes': env_changes,
2558
'skip_if_plan_to_signal': skip_if_plan_to_signal,
2559
'working_dir': working_dir, 'allow_plugins': allow_plugins})
2560
return self.next_subprocess
2563
class TestRunBzrSubprocess(TestWithFakedStartBzrSubprocess):
2565
def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
2566
"""Run run_bzr_subprocess with args and kwargs using a stubbed process.
2568
Inside TestRunBzrSubprocessCommands we use a stub start_bzr_subprocess
2569
that will return static results. This assertion method populates those
2570
results and also checks the arguments run_bzr_subprocess generates.
2572
self.next_subprocess = process
2574
result = self.run_bzr_subprocess(*args, **kwargs)
2575
except BaseException:
2576
self.next_subprocess = None
2577
for key, expected in expected_args.items():
2578
self.assertEqual(expected, self.subprocess_calls[-1][key])
2581
self.next_subprocess = None
2582
for key, expected in expected_args.items():
2583
self.assertEqual(expected, self.subprocess_calls[-1][key])
2586
def test_run_bzr_subprocess(self):
2587
"""The run_bzr_helper_external command behaves nicely."""
2588
self.assertRunBzrSubprocess({'process_args': ['--version']},
2589
StubProcess(), '--version')
2590
self.assertRunBzrSubprocess({'process_args': ['--version']},
2591
StubProcess(), ['--version'])
2592
# retcode=None disables retcode checking
2593
result = self.assertRunBzrSubprocess(
2594
{}, StubProcess(retcode=3), '--version', retcode=None)
2595
result = self.assertRunBzrSubprocess(
2596
{}, StubProcess(out="is free software"), '--version')
2597
self.assertContainsRe(result[0], 'is free software')
2598
# Running a subcommand that is missing errors
2599
self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
2600
{'process_args': ['--versionn']
2601
}, StubProcess(retcode=3),
2603
# Unless it is told to expect the error from the subprocess
2604
result = self.assertRunBzrSubprocess(
2605
{}, StubProcess(retcode=3), '--versionn', retcode=3)
2606
# Or to ignore retcode checking
2607
result = self.assertRunBzrSubprocess(
2608
{}, StubProcess(err="unknown command", retcode=3),
2609
'--versionn', retcode=None)
2610
self.assertContainsRe(result[1], 'unknown command')
2612
def test_env_change_passes_through(self):
2613
self.assertRunBzrSubprocess(
2614
{'env_changes': {'new': 'value', 'changed': 'newvalue', 'deleted': None}},
2616
env_changes={'new': 'value', 'changed': 'newvalue', 'deleted': None})
2618
def test_no_working_dir_passed_as_None(self):
2619
self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '')
2621
def test_no_working_dir_passed_through(self):
2622
self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
2625
def test_run_bzr_subprocess_no_plugins(self):
2626
self.assertRunBzrSubprocess({'allow_plugins': False},
2629
def test_allow_plugins(self):
2630
self.assertRunBzrSubprocess({'allow_plugins': True},
2631
StubProcess(), '', allow_plugins=True)
2634
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):
2636
def test_finish_bzr_subprocess_with_error(self):
2637
"""finish_bzr_subprocess allows specification of the desired exit code.
2639
process = StubProcess(err="unknown command", retcode=3)
2640
result = self.finish_bzr_subprocess(process, retcode=3)
2641
self.assertEqual('', result[0])
2642
self.assertContainsRe(result[1], 'unknown command')
2644
def test_finish_bzr_subprocess_ignoring_retcode(self):
    """Passing retcode=None makes finish_bzr_subprocess ignore the exit code."""
    failing = StubProcess(err="unknown command", retcode=3)
    captured = self.finish_bzr_subprocess(failing, retcode=None)
    stdout_text = captured[0]
    stderr_text = captured[1]
    self.assertEqual('', stdout_text)
    self.assertContainsRe(stderr_text, 'unknown command')
2651
def test_finish_subprocess_with_unexpected_retcode(self):
2652
"""finish_bzr_subprocess raises self.failureException if the retcode is
2653
not the expected one.
2655
process = StubProcess(err="unknown command", retcode=3)
2656
self.assertRaises(self.failureException, self.finish_bzr_subprocess,
2660
class _DontSpawnProcess(Exception):
2661
"""A simple exception which just allows us to skip unnecessary steps"""
2664
class TestStartBzrSubProcess(tests.TestCase):
2665
"""Stub test start_bzr_subprocess."""
2667
def _subprocess_log_cleanup(self):
2668
"""Inhibits the base version as we don't produce a log file."""
2670
def _popen(self, *args, **kwargs):
2671
"""Override the base version to record the command that is run.
2673
From there we can ensure it is correct without spawning a real process.
2675
self.check_popen_state()
2676
self._popen_args = args
2677
self._popen_kwargs = kwargs
2678
raise _DontSpawnProcess()
2680
def check_popen_state(self):
    """Hook run when popen is called; replace it to make assertions there."""
2683
def test_run_bzr_subprocess_no_plugins(self):
    # The stubbed _popen records its arguments and aborts, so the command
    # line can be inspected without spawning anything.
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
    command_line = self._popen_args[0]
    interpreter = command_line[0]
    script = command_line[1]
    extra_args = command_line[2:]
    self.assertEqual(sys.executable, interpreter)
    self.assertEqual(self.get_brz_path(), script)
    self.assertEqual(['--no-plugins'], extra_args)
2690
def test_allow_plugins(self):
2691
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2693
command = self._popen_args[0]
2694
self.assertEqual([], command[2:])
2696
def test_set_env(self):
2697
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2700
def check_environment():
2701
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2702
self.check_popen_state = check_environment
2703
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2704
env_changes={'EXISTANT_ENV_VAR': 'set variable'})
2705
# not set in the parent
2706
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2708
def test_run_bzr_subprocess_env_del(self):
    """run_bzr_subprocess can remove environment variables too."""
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

    def check_environment():
        # Runs when popen is called: the variable must be gone by then.
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
    os.environ['EXISTANT_ENV_VAR'] = 'set variable'
    try:
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          env_changes={'EXISTANT_ENV_VAR': None})
        # Still set in parent
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
    finally:
        # Remove the variable even when an assertion above fails, so it
        # cannot leak into tests that run later in the same process.
        if 'EXISTANT_ENV_VAR' in os.environ:
            del os.environ['EXISTANT_ENV_VAR']
2722
def test_env_del_missing(self):
    # Asking for the removal of a variable that is not set is accepted.
    variable = 'NON_EXISTANT_ENV_VAR'
    self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)

    def assert_still_unset():
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)

    self.check_popen_state = assert_still_unset
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                      env_changes={variable: None})
2731
def test_working_dir(self):
2732
"""Test that we can specify the working dir for the child"""
2737
self.overrideAttr(os, 'chdir', chdir)
2741
self.overrideAttr(osutils, 'getcwd', getcwd)
2742
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2744
self.assertEqual(['foo', 'current'], chdirs)
2746
def test_get_brz_path_with_cwd_breezy(self):
    # With an empty source path and every path reported as a file,
    # get_brz_path is expected to answer the bare "brz".
    def empty_source_path():
        return ""

    def always_a_file(path):
        return True

    self.get_source_path = empty_source_path
    self.overrideAttr(os.path, "isfile", always_a_file)
    found = self.get_brz_path()
    self.assertEqual(found, "brz")
2752
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2753
"""Tests that really need to do things with an external bzr."""
2755
def test_start_and_stop_bzr_subprocess_send_signal(self):
2756
"""finish_bzr_subprocess raises self.failureException if the retcode is
2757
not the expected one.
2759
self.disable_missing_extensions_warning()
2760
process = self.start_bzr_subprocess(['wait-until-signalled'],
2761
skip_if_plan_to_signal=True)
2762
self.assertEqual(b'running\n', process.stdout.readline())
2763
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2765
self.assertEqual(b'', result[0])
2766
self.assertEqual(b'brz: interrupted\n', result[1])
2769
class TestSelftestFiltering(tests.TestCase):
2772
super(TestSelftestFiltering, self).setUp()
2773
self.suite = TestUtil.TestSuite()
2774
self.loader = TestUtil.TestLoader()
2775
self.suite.addTest(self.loader.loadTestsFromModule(
2776
sys.modules['breezy.tests.test_selftest']))
2777
self.all_names = _test_ids(self.suite)
2779
def test_condition_id_re(self):
    # Filtering on this test's own name must keep exactly this test.
    own_id = ('breezy.tests.test_selftest.TestSelftestFiltering.'
              'test_condition_id_re')
    matches_name = tests.condition_id_re('test_condition_id_re')
    kept = tests.filter_suite_by_condition(self.suite, matches_name)
    self.assertEqual([own_id], _test_ids(kept))
2786
def test_condition_id_in_list(self):
    # Filtering by an id list must select the same tests as the
    # equivalent regular expression.
    wanted_ids = ['breezy.tests.test_selftest.TestSelftestFiltering.'
                  'test_condition_id_in_list']
    in_list = tests.condition_id_in_list(tests.TestIdList(wanted_ids))
    by_list = tests.filter_suite_by_condition(self.suite, in_list)
    by_regex = tests.filter_suite_by_re(
        self.suite, 'TestSelftestFiltering.*test_condition_id_in_list')
    self.assertEqual(_test_ids(by_regex), _test_ids(by_list))
2796
def test_condition_id_startswith(self):
2797
klass = 'breezy.tests.test_selftest.TestSelftestFiltering.'
2798
start1 = klass + 'test_condition_id_starts'
2799
start2 = klass + 'test_condition_id_in'
2800
test_names = [klass + 'test_condition_id_in_list',
2801
klass + 'test_condition_id_startswith',
2803
filtered_suite = tests.filter_suite_by_condition(
2804
self.suite, tests.condition_id_startswith([start1, start2]))
2805
self.assertEqual(test_names, _test_ids(filtered_suite))
2807
def test_condition_isinstance(self):
    # Selecting by class must agree with selecting by the class's id
    # prefix.
    is_this_class = tests.condition_isinstance(self.__class__)
    by_class = tests.filter_suite_by_condition(self.suite, is_this_class)
    by_regex = tests.filter_suite_by_re(
        self.suite, 'breezy.tests.test_selftest.TestSelftestFiltering.')
    self.assertEqual(_test_ids(by_regex), _test_ids(by_class))
2814
def test_exclude_tests_by_condition(self):
    # Excluding this very test must drop it and nothing else, leaving
    # the order of the other tests alone.
    excluded_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                     'test_exclude_tests_by_condition')

    def is_excluded(test):
        return test.id() == excluded_name

    trimmed = tests.exclude_tests_by_condition(self.suite, is_excluded)
    self.assertEqual(len(self.all_names) - 1, trimmed.countTestCases())
    self.assertFalse(excluded_name in _test_ids(trimmed))
    expected_ids = list(self.all_names)
    expected_ids.remove(excluded_name)
    self.assertEqual(expected_ids, _test_ids(trimmed))
2826
def test_exclude_tests_by_re(self):
    # Excluding by a pattern that matches only this test must drop it and
    # nothing else, leaving the order of the other tests alone.
    self.all_names = _test_ids(self.suite)
    trimmed = tests.exclude_tests_by_re(self.suite, 'exclude_tests_by_re')
    excluded_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                     'test_exclude_tests_by_re')
    expected_count = len(self.all_names) - 1
    self.assertEqual(expected_count, trimmed.countTestCases())
    self.assertFalse(excluded_name in _test_ids(trimmed))
    expected_ids = list(self.all_names)
    expected_ids.remove(excluded_name)
    self.assertEqual(expected_ids, _test_ids(trimmed))
2839
def test_filter_suite_by_condition(self):
    # A condition that accepts only this test's id keeps only this test.
    own_id = ('breezy.tests.test_selftest.TestSelftestFiltering.'
              'test_filter_suite_by_condition')

    def is_this_test(test):
        return test.id() == own_id

    kept = tests.filter_suite_by_condition(self.suite, is_this_test)
    self.assertEqual([own_id], _test_ids(kept))
2846
def test_filter_suite_by_re(self):
2847
filtered_suite = tests.filter_suite_by_re(self.suite,
2848
'test_filter_suite_by_r')
2849
filtered_names = _test_ids(filtered_suite)
2851
filtered_names, ['breezy.tests.test_selftest.'
2852
'TestSelftestFiltering.test_filter_suite_by_re'])
2854
def test_filter_suite_by_id_list(self):
    """Filtering by a TestIdList keeps only the listed test."""
    test_list = ['breezy.tests.test_selftest.'
                 'TestSelftestFiltering.test_filter_suite_by_id_list']
    filtered_suite = tests.filter_suite_by_id_list(
        self.suite, tests.TestIdList(test_list))
    filtered_names = _test_ids(filtered_suite)
    # The expected list was left dangling; compare it against the ids
    # that were just computed.
    self.assertEqual(
        filtered_names,
        ['breezy.tests.test_selftest.'
         'TestSelftestFiltering.test_filter_suite_by_id_list'])
2865
def test_filter_suite_by_id_startswith(self):
    """Filtering by id prefixes keeps every test matching any prefix."""
    # By design this test may fail if another test is added whose name
    # also begins with one of the start values used.
    klass = 'breezy.tests.test_selftest.TestSelftestFiltering.'
    start1 = klass + 'test_filter_suite_by_id_starts'
    start2 = klass + 'test_filter_suite_by_id_li'
    test_list = [klass + 'test_filter_suite_by_id_list',
                 klass + 'test_filter_suite_by_id_startswith',
                 ]
    filtered_suite = tests.filter_suite_by_id_startswith(
        self.suite, [start1, start2])
    self.assertEqual(
        test_list,
        _test_ids(filtered_suite),
        )
2881
def test_preserve_input(self):
    """preserve_input hands back the value it was given."""
    # NB: Surely this is something in the stdlib to do this?
    same_suite = tests.preserve_input(self.suite)
    self.assertIs(self.suite, same_suite)
    marker = "@#$"
    self.assertEqual(marker, tests.preserve_input(marker))
2886
def test_randomize_suite(self):
    """Randomizing reorders the suite without adding or losing tests."""
    shuffled_ids = _test_ids(tests.randomize_suite(self.suite))
    # Randomizing should not add or remove test names.
    self.assertEqual(set(_test_ids(self.suite)), set(shuffled_ids))
    # Technically this *can* fail, because a shuffle may reproduce the
    # original order; retrying would only make that rarer. The odds are
    # 1 in len(self.all_names)!, which is low enough to ignore.
    # RBC 20071021.
    # It should change the order.
    self.assertNotEqual(self.all_names, shuffled_ids)
    # But not the length (the set comparison above would not notice
    # duplicated ids).
    self.assertEqual(len(self.all_names), len(shuffled_ids))
2901
def test_split_suit_by_condition(self):
    """Splitting by condition yields (matching, non-matching) suites."""
    self.all_names = _test_ids(self.suite)
    matcher = tests.condition_id_re('test_filter_suite_by_r')
    halves = tests.split_suite_by_condition(self.suite, matcher)
    picked = ('breezy.tests.test_selftest.TestSelftestFiltering.'
              'test_filter_suite_by_re')
    self.assertEqual([picked], _test_ids(halves[0]))
    self.assertFalse(picked in _test_ids(halves[1]))
    # The second half holds everything else, order preserved.
    leftover = list(self.all_names)
    leftover.remove(picked)
    self.assertEqual(leftover, _test_ids(halves[1]))
2913
def test_split_suit_by_re(self):
    """Splitting by regex yields (matching, non-matching) suites."""
    self.all_names = _test_ids(self.suite)
    halves = tests.split_suite_by_re(self.suite, 'test_filter_suite_by_r')
    picked = ('breezy.tests.test_selftest.TestSelftestFiltering.'
              'test_filter_suite_by_re')
    self.assertEqual([picked], _test_ids(halves[0]))
    self.assertFalse(picked in _test_ids(halves[1]))
    # The second half holds everything else, order preserved.
    leftover = list(self.all_names)
    leftover.remove(picked)
    self.assertEqual(leftover, _test_ids(halves[1]))
2926
class TestCheckTreeShape(tests.TestCaseWithTransport):
    """Exercise the check_tree_shape helper on a small working tree."""

    def test_check_tree_shape(self):
        """A tree built from and versioning `files` passes the check."""
        files = ['a', 'b/', 'b/c']
        tree = self.make_branch_and_tree('.')
        self.build_tree(files)
        # The files must be versioned, otherwise the tree stays empty and
        # the shape check has nothing to compare against.
        tree.add(files)
        tree.lock_read()
        try:
            self.check_tree_shape(tree, files)
        finally:
            tree.unlock()
2940
class TestBlackboxSupport(tests.TestCase):
    """Tests for testsuite blackbox features."""

    def test_run_bzr_failure_not_caught(self):
        """Unexpected errors escape run_bzr instead of being reported."""
        # When we run bzr in blackbox mode, we want any unexpected errors to
        # propagate up to the test suite so that it can show the error in the
        # usual way, and we won't get a double traceback.
        e = self.assertRaises(
            AssertionError,
            self.run_bzr, ['assert-fail'])
        # make sure we got the real thing, not an error from somewhere else in
        # the test framework
        self.assertEqual('always fails', str(e))
        # check that there's no traceback in the test log
        self.assertNotContainsRe(self.get_log(), r'Traceback')

    def test_run_bzr_user_error_caught(self):
        """User errors become an error message plus exit code 3."""
        # Running bzr in blackbox mode, normal/expected/user errors should be
        # caught in the regular way and turned into an error message plus exit
        # code.
        transport_server = memory.MemoryServer()
        transport_server.start_server()
        self.addCleanup(transport_server.stop_server)
        url = transport_server.get_url()
        self.permit_url(url)
        out, err = self.run_bzr(["log", "%s/nonexistantpath" % url], retcode=3)
        self.assertEqual(out, '')
        self.assertContainsRe(
            err, 'brz: ERROR: Not a branch: ".*nonexistantpath/".\n')
2971
class TestTestLoader(tests.TestCase):
    """Tests for the test loader."""

    def _get_loader_and_module(self):
        """Gets a TestLoader and a module with one test in it."""
        loader = TestUtil.TestLoader()

        class Stub(tests.TestCase):
            def test_foo(self):
                pass

        class MyModule(object):
            pass
        MyModule.a_class = Stub
        # An instance stands in for a module object; the loader only needs
        # its attributes and a __name__.
        module = MyModule()
        module.__name__ = 'fake_module'
        return loader, module

    def test_module_no_load_tests_attribute_loads_classes(self):
        """Without load_tests the single stub test is found."""
        loader, module = self._get_loader_and_module()
        self.assertEqual(1, loader.loadTestsFromModule(
            module).countTestCases())

    def test_module_load_tests_attribute_gets_called(self):
        """A module-level load_tests hook decides what gets loaded."""
        loader, module = self._get_loader_and_module()

        def load_tests(loader, standard_tests, pattern):
            result = loader.suiteClass()
            for test in tests.iter_suite_tests(standard_tests):
                result.addTests([test, test])
            return result
        # add a load_tests() method which multiplies the tests from the module.
        module.__class__.load_tests = staticmethod(load_tests)
        self.assertEqual(
            2 * [str(module.a_class('test_foo'))],
            list(map(str, loader.loadTestsFromModule(module))))

    def test_load_tests_from_module_name_smoke_test(self):
        """Loading by dotted module name finds the sampler test."""
        loader = TestUtil.TestLoader()
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        self.assertEqual(['breezy.tests.test_sampler.DemoTest.test_nothing'],
                         _test_ids(suite))

    def test_load_tests_from_module_name_with_bogus_module_name(self):
        """An unknown module name surfaces as ImportError."""
        loader = TestUtil.TestLoader()
        self.assertRaises(ImportError, loader.loadTestsFromModuleName, 'bogus')
3020
class TestTestIdList(tests.TestCase):
    """Tests for tests.TestIdList and tests.suite_matches_id_list."""

    def _create_id_list(self, test_list):
        """Wrap a list of test ids in a TestIdList."""
        return tests.TestIdList(test_list)

    def _create_suite(self, test_id_list):
        """Build a suite of stub tests reporting the given ids."""

        class Stub(tests.TestCase):
            def test_foo(self):
                pass

        def _create_test_id(test_id):
            # Bind the id per call so each stub reports its own value
            # rather than the loop's last one.
            return lambda: test_id

        suite = TestUtil.TestSuite()
        for test_id in test_id_list:
            t = Stub('test_foo')
            t.id = _create_test_id(test_id)
            suite.addTest(t)
        return suite

    def _test_ids(self, test_suite):
        """Get the ids for the tests in a test suite."""
        return [t.id() for t in tests.iter_suite_tests(test_suite)]

    def test_empty_list(self):
        """An empty id list records no tests and no modules."""
        id_list = self._create_id_list([])
        self.assertEqual({}, id_list.tests)
        self.assertEqual({}, id_list.modules)

    def test_valid_list(self):
        """Modules, submodules and test ids in the list are recognised."""
        id_list = self._create_id_list(
            ['mod1.cl1.meth1', 'mod1.cl1.meth2',
             'mod1.func1', 'mod1.cl2.meth2',
             'mod1.submod1',
             'mod1.submod2.cl1.meth1', 'mod1.submod2.cl2.meth2',
             ])
        self.assertTrue(id_list.refers_to('mod1'))
        self.assertTrue(id_list.refers_to('mod1.submod1'))
        self.assertTrue(id_list.refers_to('mod1.submod2'))
        self.assertTrue(id_list.includes('mod1.cl1.meth1'))
        self.assertTrue(id_list.includes('mod1.submod1'))
        self.assertTrue(id_list.includes('mod1.func1'))

    def test_bad_chars_in_params(self):
        """Dots inside a parameter suffix do not confuse module parsing."""
        id_list = self._create_id_list(['mod1.cl1.meth1(xx.yy)'])
        self.assertTrue(id_list.refers_to('mod1'))
        self.assertTrue(id_list.includes('mod1.cl1.meth1(xx.yy)'))

    def test_module_used(self):
        """Every dotted prefix of a listed id is referred to."""
        id_list = self._create_id_list(['mod.class.meth'])
        self.assertTrue(id_list.refers_to('mod'))
        self.assertTrue(id_list.refers_to('mod.class'))
        self.assertTrue(id_list.refers_to('mod.class.meth'))

    def test_test_suite_matches_id_list_with_unknown(self):
        """Ids absent from the suite are reported as not found."""
        loader = TestUtil.TestLoader()
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing',
                     'bogus']
        not_found, duplicates = tests.suite_matches_id_list(suite, test_list)
        self.assertEqual(['bogus'], not_found)
        self.assertEqual([], duplicates)

    def test_suite_matches_id_list_with_duplicates(self):
        """Ids appearing twice in the suite are reported as duplicates."""
        loader = TestUtil.TestLoader()
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        dupes = loader.suiteClass()
        for test in tests.iter_suite_tests(suite):
            dupes.addTest(test)
            dupes.addTest(test)  # Add it again

        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing', ]
        not_found, duplicates = tests.suite_matches_id_list(
            dupes, test_list)
        self.assertEqual([], not_found)
        self.assertEqual(['breezy.tests.test_sampler.DemoTest.test_nothing'],
                         duplicates)
3100
class TestTestSuite(tests.TestCase):
    """Tests for tests.test_suite and the helpers that feed it."""

    def test__test_suite_testmod_names(self):
        """A plausible set of test module names is returned."""
        # Test that a plausible list of test module names are returned
        # by _test_suite_testmod_names.
        test_list = tests._test_suite_testmod_names()
        self.assertSubset([
            'breezy.tests.blackbox',
            'breezy.tests.per_transport',
            'breezy.tests.test_selftest',
            ],
            test_list)

    def test__test_suite_modules_to_doctest(self):
        """A plausible set of modules to doctest is returned."""
        # Test that a plausible list of modules to doctest is returned
        # by _test_suite_modules_to_doctest.
        test_list = tests._test_suite_modules_to_doctest()
        if __doc__ is None:
            # When docstrings are stripped, there are no modules to doctest
            self.assertEqual([], test_list)
            return
        self.assertSubset([
            'breezy.timestamp',
            ],
            test_list)

    def test_test_suite(self):
        """test_suite() loads what the two name providers tell it to."""
        # test_suite() loads the entire test suite to operate. To avoid this
        # overhead, and yet still be confident that things are happening,
        # we temporarily replace two functions used by test_suite with
        # test doubles that supply a few sample tests to load, and check they
        # are loaded.
        calls = []

        def testmod_names():
            calls.append("testmod_names")
            return [
                'breezy.tests.blackbox.test_branch',
                'breezy.tests.per_transport',
                'breezy.tests.test_selftest',
                ]
        self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names)

        def doctests():
            calls.append("modules_to_doctest")
            if __doc__ is None:
                return []
            return ['breezy.timestamp']
        self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests)
        expected_test_list = [
            # testmod_names
            'breezy.tests.blackbox.test_branch.TestBranch.test_branch',
            ('breezy.tests.per_transport.TransportTests'
             '.test_abspath(LocalTransport,LocalURLServer)'),
            'breezy.tests.test_selftest.TestTestSuite.test_test_suite',
            # plugins can't be tested that way since selftest may be run with
            # --no-plugins
            ]
        if __doc__ is not None and not PY3:
            expected_test_list.extend([
                # modules_to_doctest
                'breezy.timestamp.format_highres_date',
                ])
        suite = tests.test_suite()
        # The two assertions contradict each other, so they must be
        # alternatives; the PY3 split mirrors the doctest guard above.
        if PY3:
            self.assertEqual({"testmod_names"}, set(calls))
        else:
            self.assertEqual({"testmod_names", "modules_to_doctest"},
                             set(calls))
        self.assertSubset(expected_test_list, _test_ids(suite))

    def test_test_suite_list_and_start(self):
        """keep_only plus starting_with yields exactly the listed test."""
        # We cannot test this at the same time as the main load, because we
        # want to know that starting_with == None works. So a second load is
        # incurred - note that the starting_with parameter causes a partial
        # load rather than a full load so this test should be pretty quick.
        test_list = [
            'breezy.tests.test_selftest.TestTestSuite.test_test_suite']
        suite = tests.test_suite(test_list,
                                 ['breezy.tests.test_selftest.TestTestSuite'])
        # test_test_suite_list_and_start is not included
        self.assertEqual(test_list, _test_ids(suite))
3184
class TestLoadTestIdList(tests.TestCaseInTempDir):
    """Tests for tests.load_test_id_list."""

    def _create_test_list_file(self, file_name, content):
        """Write `content` to `file_name` as text."""
        # The context manager guarantees the data is flushed and the handle
        # closed before the loader under test reads the file back.
        with open(file_name, 'wt') as fl:
            fl.write(content)

    def test_load_unknown(self):
        """A missing list file raises NoSuchFile."""
        self.assertRaises(errors.NoSuchFile,
                          tests.load_test_id_list, 'i_do_not_exist')

    def test_load_test_list(self):
        """Each line of the file becomes one test id."""
        test_list_fname = 'test.list'
        self._create_test_list_file(test_list_fname,
                                    'mod1.cl1.meth1\nmod2.cl2.meth2\n')
        tlist = tests.load_test_id_list(test_list_fname)
        self.assertEqual(2, len(tlist))
        self.assertEqual('mod1.cl1.meth1', tlist[0])
        self.assertEqual('mod2.cl2.meth2', tlist[1])

    def test_load_dirty_file(self):
        """Whitespace is stripped; blank lines and odd ids are kept."""
        test_list_fname = 'test.list'
        self._create_test_list_file(test_list_fname,
                                    ' mod1.cl1.meth1\n\nmod2.cl2.meth2 \n'
                                    'bar baz\n')
        tlist = tests.load_test_id_list(test_list_fname)
        self.assertEqual(4, len(tlist))
        self.assertEqual('mod1.cl1.meth1', tlist[0])
        self.assertEqual('', tlist[1])
        self.assertEqual('mod2.cl2.meth2', tlist[2])
        self.assertEqual('bar baz', tlist[3])
3217
class TestFilteredByModuleTestLoader(tests.TestCase):
    """Tests for FilteredByModuleTestLoader driven by a TestIdList."""

    def _create_loader(self, test_list):
        """Return a loader that only loads modules referred to by test_list."""
        id_filter = tests.TestIdList(test_list)
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
        # Callers use the return value, so the loader must be handed back.
        return loader

    def test_load_tests(self):
        """A module mentioned in the id list is loaded."""
        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing']
        loader = self._create_loader(test_list)
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        self.assertEqual(test_list, _test_ids(suite))

    def test_exclude_tests(self):
        """A module not mentioned in the id list yields no tests."""
        test_list = ['bogus']
        loader = self._create_loader(test_list)
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        self.assertEqual([], _test_ids(suite))
3237
class TestFilteredByNameStartTestLoader(tests.TestCase):
    """Tests for FilteredByModuleTestLoader driven by a name prefix."""

    def _create_loader(self, name_start):
        """Return a loader restricted to modules compatible with name_start."""
        def needs_module(name):
            # Accept the module if either string is a prefix of the other,
            # so prefixes both shorter and longer than the module name work.
            return name.startswith(name_start) or name_start.startswith(name)
        loader = TestUtil.FilteredByModuleTestLoader(needs_module)
        # Callers use the return value, so the loader must be handed back.
        return loader

    def test_load_tests(self):
        """A prefix shorter than the module name loads the module."""
        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing']
        loader = self._create_loader('breezy.tests.test_samp')

        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        self.assertEqual(test_list, _test_ids(suite))

    def test_load_tests_inside_module(self):
        """A prefix reaching inside the module still loads the module."""
        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing']
        loader = self._create_loader('breezy.tests.test_sampler.Demo')

        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        self.assertEqual(test_list, _test_ids(suite))

    def test_exclude_tests(self):
        """An unrelated prefix yields no tests."""
        loader = self._create_loader('bogus')

        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        self.assertEqual([], _test_ids(suite))
3266
class TestTestPrefixRegistry(tests.TestCase):
    """Tests for tests.TestPrefixAliasRegistry."""

    def _get_registry(self):
        """Return a fresh, empty prefix alias registry."""
        tp_registry = tests.TestPrefixAliasRegistry()
        # Callers use the return value, so the registry must be handed back.
        return tp_registry

    def test_register_new_prefix(self):
        """A newly registered alias can be looked up."""
        tpr = self._get_registry()
        tpr.register('foo', 'fff.ooo.ooo')
        self.assertEqual('fff.ooo.ooo', tpr.get('foo'))

    def test_register_existing_prefix(self):
        """Re-registering keeps the first value and logs the conflict."""
        tpr = self._get_registry()
        tpr.register('bar', 'bbb.aaa.rrr')
        tpr.register('bar', 'bBB.aAA.rRR')
        self.assertEqual('bbb.aaa.rrr', tpr.get('bar'))
        self.assertThat(self.get_log(),
                        DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
                                       doctest.ELLIPSIS))

    def test_get_unknown_prefix(self):
        """Looking up an unregistered alias raises KeyError."""
        tpr = self._get_registry()
        self.assertRaises(KeyError, tpr.get, 'I am not a prefix')

    def test_resolve_prefix(self):
        """resolve_alias returns the registered expansion."""
        tpr = self._get_registry()
        tpr.register('bar', 'bb.aa.rr')
        self.assertEqual('bb.aa.rr', tpr.resolve_alias('bar'))

    def test_resolve_unknown_alias(self):
        """Resolving an unregistered alias raises BzrCommandError."""
        tpr = self._get_registry()
        self.assertRaises(errors.BzrCommandError,
                          tpr.resolve_alias, 'I am not a prefix')

    def test_predefined_prefixes(self):
        """The module-level registry ships the standard short aliases."""
        tpr = tests.test_prefix_alias_registry
        self.assertEqual('breezy', tpr.resolve_alias('breezy'))
        self.assertEqual('breezy.doc', tpr.resolve_alias('bd'))
        self.assertEqual('breezy.utils', tpr.resolve_alias('bu'))
        self.assertEqual('breezy.tests', tpr.resolve_alias('bt'))
        self.assertEqual('breezy.tests.blackbox', tpr.resolve_alias('bb'))
        self.assertEqual('breezy.plugins', tpr.resolve_alias('bp'))
3310
class TestThreadLeakDetection(tests.TestCase):
    """Ensure when tests leak threads we detect and report it"""

    class LeakRecordingResult(tests.ExtendedTestResult):
        """Result that records every reported leak as (test, leaks)."""

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            self.leaks = []

        def _report_thread_leak(self, test, leaks, alive):
            self.leaks.append((test, leaks))

    def test_testcase_without_addCleanups(self):
        """Check old TestCase instances don't break with leak detection"""
        class Test(unittest.TestCase):
            def runTest(self):
                pass
        result = self.LeakRecordingResult()
        test = Test()
        result.startTestRun()
        test.run(result)
        result.stopTestRun()
        self.assertEqual(result._tests_leaking_threads_count, 0)
        self.assertEqual(result.leaks, [])

    def test_thread_leak(self):
        """Ensure a thread that outlives the running of a test is reported

        Uses a thread that blocks on an event, and is started by the inner
        test case. As the thread outlives the inner case's run, it should be
        detected as a leak, but the event is then set so that the thread can
        be safely joined in cleanup so it's not leaked for real.
        """
        event = threading.Event()
        thread = threading.Thread(name="Leaker", target=event.wait)

        class Test(tests.TestCase):
            def test_leak(self):
                thread.start()
        result = self.LeakRecordingResult()
        test = Test("test_leak")
        # Cleanups run in reverse order: the event is set first so that the
        # join that follows cannot block.
        self.addCleanup(thread.join)
        self.addCleanup(event.set)
        result.startTestRun()
        test.run(result)
        result.stopTestRun()
        self.assertEqual(result._tests_leaking_threads_count, 1)
        self.assertEqual(result._first_thread_leaker_id, test.id())
        self.assertEqual(result.leaks, [(test, {thread})])
        self.assertContainsString(result.stream.getvalue(), "leaking threads")

    def test_multiple_leaks(self):
        """Check multiple leaks are blamed on the test cases at fault

        Same concept as the previous test, but has one inner test method that
        leaks two threads, and one that doesn't leak at all.
        """
        event = threading.Event()
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
        thread_c = threading.Thread(name="LeakerC", target=event.wait)

        class Test(tests.TestCase):
            def test_first_leak(self):
                thread_b.start()

            def test_second_no_leak(self):
                pass

            def test_third_leak(self):
                thread_c.start()
                thread_a.start()
        result = self.LeakRecordingResult()
        first_test = Test("test_first_leak")
        third_test = Test("test_third_leak")
        # Cleanups run in reverse order: the event is set first so that the
        # joins that follow cannot block.
        self.addCleanup(thread_a.join)
        self.addCleanup(thread_b.join)
        self.addCleanup(thread_c.join)
        self.addCleanup(event.set)
        result.startTestRun()
        unittest.TestSuite(
            [first_test, Test("test_second_no_leak"), third_test]
            ).run(result)
        result.stopTestRun()
        self.assertEqual(result._tests_leaking_threads_count, 2)
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
        self.assertEqual(result.leaks, [
            (first_test, {thread_b}),
            (third_test, {thread_a, thread_c})])
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
3401
class TestPostMortemDebugging(tests.TestCase):
    """Check post mortem debugging works when tests fail or error"""

    class TracebackRecordingResult(tests.ExtendedTestResult):
        """Result that records where a post mortem would have started."""

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            self.postcode = None

        def _post_mortem(self, tb=None):
            """Record the code object at the end of the current traceback"""
            tb = tb or sys.exc_info()[2]
            if tb is not None:
                # Walk to the innermost frame, the one that actually raised.
                while tb.tb_next is not None:
                    tb = tb.tb_next
                self.postcode = tb.tb_frame.f_code

        def report_error(self, test, err):
            pass

        def report_failure(self, test, err):
            pass

    def test_location_unittest_error(self):
        """Needs right post mortem traceback with erroring unittest case"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise RuntimeError
        result = self.TracebackRecordingResult()
        Test().run(result)
        self.assertEqual(result.postcode, Test.runTest.__code__)

    def test_location_unittest_failure(self):
        """Needs right post mortem traceback with failing unittest case"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise self.failureException
        result = self.TracebackRecordingResult()
        Test().run(result)
        self.assertEqual(result.postcode, Test.runTest.__code__)

    def test_location_bt_error(self):
        """Needs right post mortem traceback with erroring breezy.tests case"""
        class Test(tests.TestCase):
            def test_error(self):
                raise RuntimeError
        result = self.TracebackRecordingResult()
        Test("test_error").run(result)
        self.assertEqual(result.postcode, Test.test_error.__code__)

    def test_location_bt_failure(self):
        """Needs right post mortem traceback with failing breezy.tests case"""
        class Test(tests.TestCase):
            def test_failure(self):
                raise self.failureException
        result = self.TracebackRecordingResult()
        Test("test_failure").run(result)
        self.assertEqual(result.postcode, Test.test_failure.__code__)

    def test_env_var_triggers_post_mortem(self):
        """Check pdb.post_mortem is called iff BRZ_TEST_PDB is set"""
        # Imported locally so the name is bound regardless of what the
        # module imports at the top.
        import pdb
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
        post_mortem_calls = []
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
        self.overrideEnv('BRZ_TEST_PDB', None)
        result._post_mortem(1)
        self.overrideEnv('BRZ_TEST_PDB', 'on')
        result._post_mortem(2)
        self.assertEqual([2], post_mortem_calls)
3474
class TestRunSuite(tests.TestCase):
    """Tests for tests.run_suite."""

    def test_runner_class(self):
        """run_suite accepts and uses a runner_class keyword argument."""
        class Stub(tests.TestCase):
            def test_foo(self):
                pass
        suite = Stub("test_foo")
        calls = []

        class MyRunner(tests.TextTestRunner):
            def run(self, test):
                # Record the invocation instead of running anything.
                calls.append(test)
                return tests.ExtendedTestResult(self.stream, self.descriptions,
                                                self.verbosity)
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
        self.assertLength(1, calls)
3493
class _Selftest(object):
    """Mixin for tests needing full selftest output"""

    def _inject_stream_into_subunit(self, stream):
        """To be overridden by subclasses that run tests out of process"""

    def _run_selftest(self, **kwargs):
        """Run tests.selftest with kwargs and return everything it wrote.

        The value returned is whatever the underlying buffer holds: bytes on
        Python 3, the StringIO contents otherwise.
        """
        if PY3:
            # selftest writes text; keep the raw bytes underneath so the
            # callers' bytes regexes can be applied to the output.
            bio = BytesIO()
            sio = TextIOWrapper(bio, 'utf-8')
        else:
            sio = bio = StringIO()
        self._inject_stream_into_subunit(bio)
        tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
        # Push anything still buffered in the text wrapper down to bio.
        sio.flush()
        return bio.getvalue()
3511
class _ForkedSelftest(_Selftest):
    """Mixin for tests needing full selftest output with forked children"""

    _test_needs_features = [features.subunit]

    def _inject_stream_into_subunit(self, stream):
        """Monkey-patch subunit so the extra output goes to stream not stdout

        Some APIs need rewriting so this kind of bogus hackery can be replaced
        by passing the stream param from run_tests down into ProtocolTestCase.
        """
        from subunit import ProtocolTestCase
        _original_init = ProtocolTestCase.__init__

        def _init_with_passthrough(self, *args, **kwargs):
            _original_init(self, *args, **kwargs)
            self._passthrough = stream
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)

    def _run_selftest(self, **kwargs):
        """Run selftest with the fork decorator appended to the decorators.

        Raises TestNotApplicable on platforms without os.fork.
        """
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
        if getattr(os, "fork", None) is None:
            raise tests.TestNotApplicable("Platform doesn't support forking")
        # Make sure the fork code is actually invoked by claiming two cores
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
        kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3540
class TestParallelFork(_ForkedSelftest, tests.TestCase):
    """Check operation of --parallel=fork selftest option"""

    def test_error_in_child_during_fork(self):
        """Error in a forked child during test setup should get reported"""
        class Test(tests.TestCase):
            def testMethod(self):
                pass
        # We don't care what, just break something that a child will run
        self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
        out = self._run_selftest(test_suite_factory=Test)
        # Lines from the tracebacks of the two child processes may be mixed
        # together due to the way subunit parses and forwards the streams,
        # so permit extra lines between each part of the error output.
        # NOTE(review): the first and last pattern pieces are reconstructed
        # (calling the None override should raise TypeError) - confirm.
        self.assertContainsRe(out,
                              b"Traceback.*:\n"
                              b"(?:.*\n)*"
                              b".+ in fork_for_tests\n"
                              b"(?:.*\n)*"
                              b"\\s*workaround_zealous_crypto_random\\(\\)\n"
                              b"(?:.*\n)*"
                              b"TypeError:")
3564
class TestUncollectedWarnings(_Selftest, tests.TestCase):
    """Check a test case still alive after being run emits a warning"""

    class Test(tests.TestCase):
        """Sample cases: one clean, one self-referencing, one skipped."""

        def test_pass(self):
            pass

        def test_self_ref(self):
            # A bound method stored on the instance creates a reference
            # cycle, so the case stays alive after it has run.
            self.also_self = self.test_self_ref

        def test_skip(self):
            self.skipTest("Don't need")

    def _get_suite(self):
        """Return a suite holding one instance of each sample case."""
        return TestUtil.TestSuite([
            self.Test("test_pass"),
            self.Test("test_self_ref"),
            self.Test("test_skip"),
            ])

    def _run_selftest_with_suite(self, **kwargs):
        """Run selftest over the sample suite and return its output.

        The "uncollected_cases" debug flag is switched on for the run, and
        the output is checked to warn about test_self_ref but not test_pass.
        """
        old_flags = tests.selftest_debug_flags
        tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
        gc_on = gc.isenabled()
        if gc_on:
            # With the collector off, the self-referencing case cannot be
            # reclaimed behind our back while the suite runs.
            gc.disable()
        try:
            output = self._run_selftest(test_suite_factory=self._get_suite,
                                        **kwargs)
        finally:
            if gc_on:
                gc.enable()
            tests.selftest_debug_flags = old_flags
        self.assertNotContainsRe(output, b"Uncollected test case.*test_pass")
        self.assertContainsRe(output, b"Uncollected test case.*test_self_ref")
        return output

    def test_testsuite(self):
        self._run_selftest_with_suite()

    def test_pattern(self):
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
        self.assertNotContainsRe(out, b"test_skip")

    def test_exclude_pattern(self):
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
        self.assertNotContainsRe(out, b"test_skip")

    def test_random_seed(self):
        self._run_selftest_with_suite(random_seed="now")

    def test_matching_tests_first(self):
        self._run_selftest_with_suite(matching_tests_first=True,
                                      pattern="test_self_ref$")

    def test_starting_with_and_exclude(self):
        out = self._run_selftest_with_suite(starting_with=["bt."],
                                            exclude_pattern="test_skip$")
        self.assertNotContainsRe(out, b"test_skip")

    def test_additonal_decorator(self):
        self._run_selftest_with_suite(suite_decorators=[tests.TestDecorator])
3628
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
    """Verify that uncollected-test warnings also appear in subunit runs"""

    _test_needs_features = [features.subunit]

    def _run_selftest_with_suite(self, **kwargs):
        """Delegate to the base implementation with the subunit v1 runner."""
        base_run = TestUncollectedWarnings._run_selftest_with_suite
        return base_run(self, runner_class=tests.SubUnitBzrRunnerv1, **kwargs)
3638
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
    """Verify that uncollected-test warnings also appear in forked runs"""
3642
class TestEnvironHandling(tests.TestCase):
    """Tests for TestCase.overrideEnv bookkeeping."""

    def test_overrideEnv_None_called_twice_doesnt_leak(self):
        """Unsetting a variable twice still restores the outer value."""
        self.assertFalse('MYVAR' in os.environ)
        self.overrideEnv('MYVAR', '42')
        # We use an embedded test to make sure we fix the _captureVar bug

        class Test(tests.TestCase):
            def test_me(self):
                # The first call save the 42 value
                self.overrideEnv('MYVAR', None)
                self.assertEqual(None, os.environ.get('MYVAR'))
                # Make sure we can call it twice
                self.overrideEnv('MYVAR', None)
                self.assertEqual(None, os.environ.get('MYVAR'))
        output = StringIO()
        result = tests.TextTestResult(output, 0, 1)
        Test('test_me').run(result)
        if not result.wasStrictlySuccessful():
            self.fail(output.getvalue())
        # We get our value back
        self.assertEqual('42', os.environ.get('MYVAR'))
3666
class TestIsolatedEnv(tests.TestCase):
    """Test isolating tests from os.environ.

    Since we use tests that are already isolated from os.environ a bit of care
    should be taken when designing the tests to avoid bootstrap side-effects.
    The tests start an already clean os.environ which allow doing valid
    assertions about which variables are present or not and design tests around
    these assertions.
    """

    class ScratchMonkey(tests.TestCase):
        """Throwaway case used only as the target of the overrides."""

        def test_me(self):
            pass

    def test_basics(self):
        """Pin the isolated_environ entries the other tests rely on."""
        # Make sure we know the definition of BRZ_HOME: not part of os.environ
        # for tests.TestCase.
        self.assertTrue('BRZ_HOME' in tests.isolated_environ)
        self.assertEqual(None, tests.isolated_environ['BRZ_HOME'])
        # Being part of isolated_environ, BRZ_HOME should not appear here
        self.assertFalse('BRZ_HOME' in os.environ)
        # Make sure we know the definition of LINES: part of os.environ for
        # tests.TestCase
        self.assertTrue('LINES' in tests.isolated_environ)
        self.assertEqual('25', tests.isolated_environ['LINES'])
        self.assertEqual('25', os.environ['LINES'])

    def test_injecting_unknown_variable(self):
        """An injected variable is set, then removed again on restore."""
        # BRZ_HOME is known to be absent from os.environ
        test = self.ScratchMonkey('test_me')
        tests.override_os_environ(test, {'BRZ_HOME': 'foo'})
        self.assertEqual('foo', os.environ['BRZ_HOME'])
        tests.restore_os_environ(test)
        self.assertFalse('BRZ_HOME' in os.environ)

    def test_injecting_known_variable(self):
        """An overridden variable gets its old value back on restore."""
        test = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(test, {'LINES': '42'})
        self.assertEqual('42', os.environ['LINES'])
        tests.restore_os_environ(test)
        self.assertEqual('25', os.environ['LINES'])

    def test_deleting_variable(self):
        """Overriding with None deletes; restore brings the value back."""
        test = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(test, {'LINES': None})
        self.assertTrue('LINES' not in os.environ)
        tests.restore_os_environ(test)
        self.assertEqual('25', os.environ['LINES'])
3719
class TestDocTestSuiteIsolation(tests.TestCase):
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.

    Since tests.TestCase already provides an isolation from os.environ, we use
    the clean environment as a base for testing. To precisely capture the
    isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
    compare against.

    We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
    not `os.environ` so each test overrides it to suit its needs.
    """

    def get_doctest_suite_for_string(self, klass, string):
        """Build a suite of type klass holding one doctest parsed from string."""
        class Finder(doctest.DocTestFinder):

            def find(*args, **kwargs):
                # Ignore whatever object we are asked to search and always
                # hand back the doctest parsed from the given string.
                test = doctest.DocTestParser().get_doctest(
                    string, {}, 'foo', 'foo.py', 0)
                return [test]

        suite = klass(test_finder=Finder())
        return suite

    def run_doctest_suite_for_string(self, klass, string):
        """Run the doctest and return (result, output stream)."""
        suite = self.get_doctest_suite_for_string(klass, string)
        output = StringIO()
        result = tests.TextTestResult(output, 0, 1)
        suite.run(result)
        return result, output

    def assertDocTestStringSucceds(self, klass, string):
        """Fail, showing the run's output, unless the doctest passes."""
        result, output = self.run_doctest_suite_for_string(klass, string)
        if not result.wasStrictlySuccessful():
            self.fail(output.getvalue())

    def assertDocTestStringFails(self, klass, string):
        """Fail, showing the run's output, if the doctest passes."""
        result, output = self.run_doctest_suite_for_string(klass, string)
        if result.wasStrictlySuccessful():
            self.fail(output.getvalue())

    def test_injected_variable(self):
        """An overridden value is visible only in the isolated suite."""
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
        test = """
            >>> import os
            >>> os.environ['LINES']
            '42'
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
        # tests.DocTestSuite sees '42'
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)

    def test_deleted_variable(self):
        """A variable mapped to None is absent only in the isolated suite."""
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
        test = """
            >>> import os
            >>> os.environ.get('LINES')
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
        # tests.DocTestSuite sees None
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3784
class TestSelftestExcludePatterns(tests.TestCase):
    """Check `selftest --list` honours -x exclude patterns."""

    def setUp(self):
        super(TestSelftestExcludePatterns, self).setUp()
        self.overrideAttr(tests, 'test_suite', self.suite_factory)

    def suite_factory(self, keep_only=None, starting_with=None):
        """A test suite factory with only a few tests."""
        class Test(tests.TestCase):
            def id(self):
                # We don't need the full class path
                return self._testMethodName

            def a(self):
                pass

            def b(self):
                pass

            def c(self):
                pass
        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])

    def assertTestList(self, expected, *selftest_args):
        """Assert `selftest --list` plus selftest_args prints expected ids."""
        # We rely on setUp installing the right test suite factory so we can
        # test at the command level without loading the whole test suite
        out, err = self.run_bzr(('selftest', '--list') + selftest_args)
        actual = out.splitlines()
        self.assertEqual(expected, actual)

    def test_full_list(self):
        self.assertTestList(['a', 'b', 'c'])

    def test_single_exclude(self):
        self.assertTestList(['b', 'c'], '-x', 'a')

    def test_mutiple_excludes(self):
        self.assertTestList(['c'], '-x', 'a', '-x', 'b')
3824
class TestCounterHooks(tests.TestCase, SelfTestHelper):
    """Check that install_counter_hook counts hook invocations."""

    _test_needs_features = [features.subunit]

    def setUp(self):
        super(TestCounterHooks, self).setUp()

        class Test(tests.TestCase):

            def setUp(self):
                super(Test, self).setUp()
                self.hooks = hooks.Hooks()
                self.hooks.add_hook('myhook', 'Foo bar blah', (2, 4))
                self.install_counter_hook(self.hooks, 'myhook')

            def no_hook(self):
                pass

            def run_hook_once(self):
                for hook in self.hooks['myhook']:
                    # NOTE(review): call arguments reconstructed - confirm.
                    hook(self)

        self.test_class = Test

    def assertHookCalls(self, expected_calls, test_name):
        """Run the named inner test and check its 'myhook' counter."""
        test = self.test_class(test_name)
        result = unittest.TestResult()
        # The counters only exist once the inner test's setUp has run.
        test.run(result)
        self.assertTrue(hasattr(test, '_counters'))
        self.assertTrue('myhook' in test._counters)
        self.assertEqual(expected_calls, test._counters['myhook'])

    def test_no_hook(self):
        self.assertHookCalls(0, 'no_hook')

    def test_run_hook_once(self):
        tt = features.testtools
        if tt.module.__version__ < (0, 9, 8):
            raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
        self.assertHookCalls(1, 'run_hook_once')