1
# Copyright (C) 2005-2013, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the test framework."""
29
from testtools import (
30
ExtendedToOriginalDecorator,
33
from testtools.content import Content
34
from testtools.content_type import ContentType
35
from testtools.matchers import (
39
import testtools.testresult.doubles
65
from ..sixish import (
69
from ..symbol_versioning import (
80
from ..trace import note, mutter
81
from ..transport import memory
84
def _test_ids(test_suite):
    """Return a list holding the id of each test found in ``test_suite``."""
    collected = []
    for test in tests.iter_suite_tests(test_suite):
        collected.append(test.id())
    return collected
89
class MetaTestLog(tests.TestCase):
    """Check that messages logged by a test are attached to its details."""

    def test_logging(self):
        """Test logs are captured when a test fails."""
        self.log('a test message')
        details = self.getDetails()
        # Bind the captured log content before inspecting it; it was
        # previously referenced without ever being assigned.
        # NOTE(review): assumes the log is registered under the 'log' key.
        log = details['log']
        self.assertThat(log.content_type, Equals(ContentType(
            "text", "plain", {"charset": "utf8"})))
        # The attached content and get_log() must expose the same text.
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
        self.assertThat(self.get_log(),
                        DocTestMatches(u"...a test message\n",
                                       doctest.ELLIPSIS))
103
class TestTreeShape(tests.TestCaseInTempDir):
    """Exercise the tree-building helpers with a non-ASCII file name."""

    def test_unicode_paths(self):
        # Requires the UnicodeFilenameFeature to be available.
        self.requireFeature(features.UnicodeFilenameFeature)

        unicode_name = u'hell\u00d8'
        tree_shape = [(unicode_name, b'contents of hello')]
        self.build_tree_contents(tree_shape)
        self.assertPathExists(unicode_name)
113
class TestClassesAvailable(tests.TestCase):
    """As a convenience we expose Test* classes from breezy.tests"""

    # Every test below passes as long as its import statement succeeds.

    def test_test_case(self):
        # TestCase must be importable from the package.
        from . import TestCase

    def test_test_loader(self):
        # TestLoader must be importable from the package.
        from . import TestLoader

    def test_test_suite(self):
        # TestSuite must be importable from the package.
        from . import TestSuite
126
class TestTransportScenarios(tests.TestCase):
    """A group of tests that test the transport implementation adaption core.

    This is a meta test that the tests are applied to all available
    transport implementations.

    This will be generalised in the future which is why it is in this
    test file even though it is specific to transport tests at the moment.
    """

    def test_get_transport_permutations(self):
        # this checks that get_test_permutations defined by the module is
        # called by the get_transport_test_permutations function.
        class MockModule(object):
            def get_test_permutations(self):
                return sample_permutation
        sample_permutation = [(1, 2), (3, 4)]
        from .per_transport import get_transport_test_permutations
        self.assertEqual(sample_permutation,
                         get_transport_test_permutations(MockModule()))

    def test_scenarios_include_all_modules(self):
        # this checks that the scenario generator returns as many permutations
        # as there are in all the registered transport modules - we assume if
        # this matches its probably doing the right thing especially in
        # combination with the tests for setting the right classes below.
        from .per_transport import transport_test_permutations
        from ..transport import _get_transport_modules
        modules = _get_transport_modules()
        permutation_count = 0
        for module in modules:
            try:
                # __import__ returns the top-level package, so walk the
                # remaining dotted components down to the module's
                # get_test_permutations function before calling it.
                target = __import__(module)
                dotted = (module + ".get_test_permutations").split('.')[1:]
                for attribute in dotted:
                    target = getattr(target, attribute)
                permutation_count += len(target())
            except errors.DependencyNotPresent:
                # A transport whose dependencies are missing contributes
                # nothing to the count.
                pass
        scenarios = transport_test_permutations()
        self.assertEqual(permutation_count, len(scenarios))

    def test_scenarios_include_transport_class(self):
        # This test used to know about all the possible transports and the
        # order they were returned but that seems overly brittle (mbp).
        from .per_transport import transport_test_permutations
        scenarios = transport_test_permutations()
        # there are at least that many builtin transports
        self.assertTrue(len(scenarios) > 6)
        one_scenario = scenarios[0]
        # A scenario is a (name, parameters) pair.
        self.assertIsInstance(one_scenario[0], str)
        self.assertTrue(issubclass(one_scenario[1]["transport_class"],
                                   breezy.transport.Transport))
        self.assertTrue(issubclass(one_scenario[1]["transport_server"],
                                   breezy.transport.Server))
182
class TestBranchScenarios(tests.TestCase):
184
def test_scenarios(self):
185
# check that constructor parameters are passed through to the adapted
187
from .per_branch import make_scenarios
190
formats = [("c", "C"), ("d", "D")]
191
scenarios = make_scenarios(server1, server2, formats)
192
self.assertEqual(2, len(scenarios))
195
{'branch_format': 'c',
196
'bzrdir_format': 'C',
197
'transport_readonly_server': 'b',
198
'transport_server': 'a'}),
200
{'branch_format': 'd',
201
'bzrdir_format': 'D',
202
'transport_readonly_server': 'b',
203
'transport_server': 'a'})],
207
class TestBzrDirScenarios(tests.TestCase):
209
def test_scenarios(self):
210
# check that constructor parameters are passed through to the adapted
212
from .per_controldir import make_scenarios
217
scenarios = make_scenarios(vfs_factory, server1, server2, formats)
220
{'bzrdir_format': 'c',
221
'transport_readonly_server': 'b',
222
'transport_server': 'a',
223
'vfs_transport_factory': 'v'}),
225
{'bzrdir_format': 'd',
226
'transport_readonly_server': 'b',
227
'transport_server': 'a',
228
'vfs_transport_factory': 'v'})],
232
class TestRepositoryScenarios(tests.TestCase):
    """Check the scenarios produced by per_repository.formats_to_scenarios."""

    def test_formats_to_scenarios(self):
        from .per_repository import formats_to_scenarios
        formats = [("(c)", remote.RemoteRepositoryFormat()),
                   ("(d)", repository.format_registry.get(
                       b'Bazaar repository format 2a (needs bzr 1.16 or later)\n'))]
        # NOTE(review): the tail of this call is not visible in the source;
        # it is closed here with an explicit "no vfs factory" argument.
        no_vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
                                                vfs_transport_factory=None)
        vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
                                             vfs_transport_factory="vfs")
        # no_vfs generate scenarios without vfs_transport_factory
        expected = [
            ('RemoteRepositoryFormat(c)',
             {'bzrdir_format': remote.RemoteBzrDirFormat(),
              'repository_format': remote.RemoteRepositoryFormat(),
              'transport_readonly_server': 'readonly',
              'transport_server': 'server'}),
            ('RepositoryFormat2a(d)',
             {'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
              'repository_format': groupcompress_repo.RepositoryFormat2a(),
              'transport_readonly_server': 'readonly',
              'transport_server': 'server'})]
        self.assertEqual(expected, no_vfs_scenarios)
        # With a vfs factory, every scenario also carries that factory.
        self.assertEqual([
            ('RemoteRepositoryFormat(c)',
             {'bzrdir_format': remote.RemoteBzrDirFormat(),
              'repository_format': remote.RemoteRepositoryFormat(),
              'transport_readonly_server': 'readonly',
              'transport_server': 'server',
              'vfs_transport_factory': 'vfs'}),
            ('RepositoryFormat2a(d)',
             {'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
              'repository_format': groupcompress_repo.RepositoryFormat2a(),
              'transport_readonly_server': 'readonly',
              'transport_server': 'server',
              'vfs_transport_factory': 'vfs'})],
            vfs_scenarios)
272
class TestTestScenarioApplication(tests.TestCase):
    """Tests for the test adaption facilities."""

    def test_apply_scenario(self):
        from breezy.tests import apply_scenario
        input_test = TestTestScenarioApplication("test_apply_scenario")
        # setup two adapted tests
        adapted_test1 = apply_scenario(input_test,
            ("new id",
             {"bzrdir_format":"bzr_format",
              "repository_format":"repo_fmt",
              "transport_server":"transport_server",
              "transport_readonly_server":"readonly-server"}))
        adapted_test2 = apply_scenario(input_test,
            ("new id 2", {"bzrdir_format":None}))
        # input_test should not have been altered: the scenario attributes
        # must only exist on the adapted copies.
        self.assertRaises(AttributeError, getattr, input_test, "bzrdir_format")
        # the new tests are mutually incompatible, ensuring it has
        # made new ones, and unspecified elements in the scenario
        # should not have been altered.
        self.assertEqual("bzr_format", adapted_test1.bzrdir_format)
        self.assertEqual("repo_fmt", adapted_test1.repository_format)
        self.assertEqual("transport_server", adapted_test1.transport_server)
        self.assertEqual("readonly-server",
                         adapted_test1.transport_readonly_server)
        # The scenario name is appended, in parentheses, to the test id.
        self.assertEqual(
            "breezy.tests.test_selftest.TestTestScenarioApplication."
            "test_apply_scenario(new id)",
            adapted_test1.id())
        self.assertEqual(None, adapted_test2.bzrdir_format)
        self.assertEqual(
            "breezy.tests.test_selftest.TestTestScenarioApplication."
            "test_apply_scenario(new id 2)",
            adapted_test2.id())
308
class TestInterRepositoryScenarios(tests.TestCase):
310
def test_scenarios(self):
311
# check that constructor parameters are passed through to the adapted
313
from .per_interrepository import make_scenarios
316
formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
317
scenarios = make_scenarios(server1, server2, formats)
320
{'repository_format': 'C1',
321
'repository_format_to': 'C2',
322
'transport_readonly_server': 'b',
323
'transport_server': 'a',
324
'extra_setup': 'C3'}),
326
{'repository_format': 'D1',
327
'repository_format_to': 'D2',
328
'transport_readonly_server': 'b',
329
'transport_server': 'a',
330
'extra_setup': 'D3'})],
334
class TestWorkingTreeScenarios(tests.TestCase):
    """Check the scenarios produced by per_workingtree.make_scenarios."""

    def test_scenarios(self):
        # check that constructor parameters are passed through to the adapted
        # test.
        from .per_workingtree import make_scenarios
        # Placeholder servers; the expected scenarios below report them as
        # 'transport_server' and 'transport_readonly_server' respectively.
        server1 = "a"
        server2 = "b"
        formats = [workingtree_4.WorkingTreeFormat4(),
                   workingtree_3.WorkingTreeFormat3(),
                   workingtree_4.WorkingTreeFormat6()]
        scenarios = make_scenarios(server1, server2, formats,
                                   remote_server='c',
                                   remote_readonly_server='d',
                                   remote_backing_server='e')
        # One scenario per format, plus a remote variant that uses the
        # remote_* servers instead.
        self.assertEqual([
            ('WorkingTreeFormat4',
             {'bzrdir_format': formats[0]._matchingcontroldir,
              'transport_readonly_server': 'b',
              'transport_server': 'a',
              'workingtree_format': formats[0]}),
            ('WorkingTreeFormat3',
             {'bzrdir_format': formats[1]._matchingcontroldir,
              'transport_readonly_server': 'b',
              'transport_server': 'a',
              'workingtree_format': formats[1]}),
            ('WorkingTreeFormat6',
             {'bzrdir_format': formats[2]._matchingcontroldir,
              'transport_readonly_server': 'b',
              'transport_server': 'a',
              'workingtree_format': formats[2]}),
            ('WorkingTreeFormat6,remote',
             {'bzrdir_format': formats[2]._matchingcontroldir,
              'repo_is_remote': True,
              'transport_readonly_server': 'd',
              'transport_server': 'c',
              'vfs_transport_factory': 'e',
              'workingtree_format': formats[2]}),
            ], scenarios)
374
class TestTreeScenarios(tests.TestCase):
376
def test_scenarios(self):
377
# the tree implementation scenario generator is meant to setup one
378
# instance for each working tree format, one additional instance
379
# that will use the default wt format, but create a revision tree for
380
# the tests, and one more that uses the default wt format as a
381
# lightweight checkout of a remote repository. This means that the wt
382
# ones should have the workingtree_to_test_tree attribute set to
383
# 'return_parameter' and the revision one set to
384
# revision_tree_from_workingtree.
386
from .per_tree import (
387
_dirstate_tree_from_workingtree,
392
revision_tree_from_workingtree
396
smart_server = test_server.SmartTCPServer_for_testing
397
smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
398
mem_server = memory.MemoryServer
399
formats = [workingtree_4.WorkingTreeFormat4(),
400
workingtree_3.WorkingTreeFormat3(),]
401
scenarios = make_scenarios(server1, server2, formats)
402
self.assertEqual(8, len(scenarios))
403
default_wt_format = workingtree.format_registry.get_default()
404
wt4_format = workingtree_4.WorkingTreeFormat4()
405
wt5_format = workingtree_4.WorkingTreeFormat5()
406
wt6_format = workingtree_4.WorkingTreeFormat6()
407
expected_scenarios = [
408
('WorkingTreeFormat4',
409
{'bzrdir_format': formats[0]._matchingcontroldir,
410
'transport_readonly_server': 'b',
411
'transport_server': 'a',
412
'workingtree_format': formats[0],
413
'_workingtree_to_test_tree': return_parameter,
415
('WorkingTreeFormat3',
416
{'bzrdir_format': formats[1]._matchingcontroldir,
417
'transport_readonly_server': 'b',
418
'transport_server': 'a',
419
'workingtree_format': formats[1],
420
'_workingtree_to_test_tree': return_parameter,
422
('WorkingTreeFormat6,remote',
423
{'bzrdir_format': wt6_format._matchingcontroldir,
424
'repo_is_remote': True,
425
'transport_readonly_server': smart_readonly_server,
426
'transport_server': smart_server,
427
'vfs_transport_factory': mem_server,
428
'workingtree_format': wt6_format,
429
'_workingtree_to_test_tree': return_parameter,
432
{'_workingtree_to_test_tree': revision_tree_from_workingtree,
433
'bzrdir_format': default_wt_format._matchingcontroldir,
434
'transport_readonly_server': 'b',
435
'transport_server': 'a',
436
'workingtree_format': default_wt_format,
438
('DirStateRevisionTree,WT4',
439
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
440
'bzrdir_format': wt4_format._matchingcontroldir,
441
'transport_readonly_server': 'b',
442
'transport_server': 'a',
443
'workingtree_format': wt4_format,
445
('DirStateRevisionTree,WT5',
446
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
447
'bzrdir_format': wt5_format._matchingcontroldir,
448
'transport_readonly_server': 'b',
449
'transport_server': 'a',
450
'workingtree_format': wt5_format,
453
{'_workingtree_to_test_tree': preview_tree_pre,
454
'bzrdir_format': default_wt_format._matchingcontroldir,
455
'transport_readonly_server': 'b',
456
'transport_server': 'a',
457
'workingtree_format': default_wt_format}),
459
{'_workingtree_to_test_tree': preview_tree_post,
460
'bzrdir_format': default_wt_format._matchingcontroldir,
461
'transport_readonly_server': 'b',
462
'transport_server': 'a',
463
'workingtree_format': default_wt_format}),
465
self.assertEqual(expected_scenarios, scenarios)
468
class TestInterTreeScenarios(tests.TestCase):
469
"""A group of tests that test the InterTreeTestAdapter."""
471
def test_scenarios(self):
472
# check that constructor parameters are passed through to the adapted
474
# for InterTree tests we want the machinery to bring up two trees in
475
# each instance: the base one, and the one we are interacting with.
476
# because each optimiser can be direction specific, we need to test
477
# each optimiser in its chosen direction.
478
# unlike the TestProviderAdapter we dont want to automatically add a
479
# parameterized one for WorkingTree - the optimisers will tell us what
481
from .per_tree import (
484
from .per_intertree import (
487
from ..bzr.workingtree_3 import WorkingTreeFormat3
488
from ..bzr.workingtree_4 import WorkingTreeFormat4
489
input_test = TestInterTreeScenarios(
493
format1 = WorkingTreeFormat4()
494
format2 = WorkingTreeFormat3()
495
formats = [("1", str, format1, format2, "converter1"),
496
("2", int, format2, format1, "converter2")]
497
scenarios = make_scenarios(server1, server2, formats)
498
self.assertEqual(2, len(scenarios))
499
expected_scenarios = [
501
"bzrdir_format": format1._matchingcontroldir,
502
"intertree_class": formats[0][1],
503
"workingtree_format": formats[0][2],
504
"workingtree_format_to": formats[0][3],
505
"mutable_trees_to_test_trees": formats[0][4],
506
"_workingtree_to_test_tree": return_parameter,
507
"transport_server": server1,
508
"transport_readonly_server": server2,
511
"bzrdir_format": format2._matchingcontroldir,
512
"intertree_class": formats[1][1],
513
"workingtree_format": formats[1][2],
514
"workingtree_format_to": formats[1][3],
515
"mutable_trees_to_test_trees": formats[1][4],
516
"_workingtree_to_test_tree": return_parameter,
517
"transport_server": server1,
518
"transport_readonly_server": server2,
521
self.assertEqual(scenarios, expected_scenarios)
524
class TestTestCaseInTempDir(tests.TestCaseInTempDir):
    """Checks on the directories and assertions TestCaseInTempDir offers."""

    def test_home_is_not_working(self):
        # Home and working directories are distinct; the process runs in
        # the test directory and $HOME points at the test home directory.
        self.assertNotEqual(self.test_dir, self.test_home_dir)
        current_dir = osutils.getcwd()
        self.assertIsSameRealPath(self.test_dir, current_dir)
        home_from_env = os.environ['HOME']
        self.assertIsSameRealPath(self.test_home_dir, home_from_env)

    def test_assertEqualStat_equal(self):
        from .test_dirstate import _FakeStat
        self.build_tree(["foo"])
        on_disk = os.lstat("foo")
        # Build a fake stat from the very same field values.
        stat_fields = (on_disk.st_size, on_disk.st_mtime, on_disk.st_ctime,
                       on_disk.st_dev, on_disk.st_ino, on_disk.st_mode)
        synthetic = _FakeStat(*stat_fields)
        self.assertEqualStat(on_disk, synthetic)

    def test_assertEqualStat_notequal(self):
        self.build_tree(["foo", "longname"])
        first_stat = os.lstat("foo")
        second_stat = os.lstat("longname")
        self.assertRaises(AssertionError, self.assertEqualStat,
                          first_stat, second_stat)

    def test_assertPathExists(self):
        self.assertPathExists('.')
        self.build_tree(['foo/', 'foo/bar'])
        self.assertPathExists('foo/bar')
        # A path that was never built must be reported as missing.
        self.assertPathDoesNotExist('foo/foo')
552
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
554
def test_home_is_non_existant_dir_under_root(self):
    """The test_home_dir for TestCaseWithMemoryTransport is missing.

    This is because TestCaseWithMemoryTransport is for tests that do not
    need any disk resources: they should be hooked into breezy in such a
    way that no global settings are being changed by the test (only a
    few tests should need to do that), and having a missing dir as home is
    an effective way to ensure that this is the case.
    """
    # The home directory is a fixed name directly under TEST_ROOT.
    self.assertIsSameRealPath(
        self.TEST_ROOT + "/MemoryTransportMissingHomeDir",
        self.test_home_dir)
    self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME'])
568
def test_cwd_is_TEST_ROOT(self):
    # The test directory is TEST_ROOT, and it is also the current
    # working directory.
    self.assertIsSameRealPath(self.test_dir, self.TEST_ROOT)
    current_dir = osutils.getcwd()
    self.assertIsSameRealPath(self.test_dir, current_dir)
573
def test_BRZ_HOME_and_HOME_are_bytestrings(self):
    """The $BRZ_HOME and $HOME environment variables should not be unicode.

    See https://bugs.launchpad.net/bzr/+bug/464174
    """
    self.assertIsInstance(os.environ['BRZ_HOME'], str)
    self.assertIsInstance(os.environ['HOME'], str)
581
def test_make_branch_and_memory_tree(self):
    """In TestCaseWithMemoryTransport we should not make the branch on disk.

    This is hard to comprehensively robustly test, so we settle for making
    a branch and checking no directory was created at its relpath.
    """
    tree = self.make_branch_and_memory_tree('dir')
    # Guard against regression into MemoryTransport leaking
    # files to disk instead of keeping them in memory.
    self.assertFalse(osutils.lexists('dir'))
    self.assertIsInstance(tree, memorytree.MemoryTree)
593
def test_make_branch_and_memory_tree_with_format(self):
    """make_branch_and_memory_tree should accept a format option."""
    dir_format = bzrdir.BzrDirMetaFormat1()
    dir_format.repository_format = repository.format_registry.get_default()
    memory_tree = self.make_branch_and_memory_tree('dir', format=dir_format)
    # Nothing may appear on disk: the tree has to stay in memory.
    self.assertFalse(osutils.lexists('dir'))
    self.assertIsInstance(memory_tree, memorytree.MemoryTree)
    requested_class = dir_format.repository_format.__class__
    actual_class = memory_tree.branch.repository._format.__class__
    self.assertEqual(requested_class, actual_class)
605
def test_make_branch_builder(self):
    branch_builder = self.make_branch_builder('dir')
    self.assertIsInstance(branch_builder, branchbuilder.BranchBuilder)
    # Nothing may appear on disk: the builder has to work in memory.
    exists_on_disk = osutils.lexists('dir')
    self.assertFalse(exists_on_disk)
612
def test_make_branch_builder_with_format(self):
    # Deliberately use a layout that matches no 'named' format, so the
    # format objects themselves have to be honoured.
    dir_format = bzrdir.BzrDirMetaFormat1()
    repo_format = repository.format_registry.get_default()
    dir_format.repository_format = repo_format
    branch_builder = self.make_branch_builder('dir', format=dir_format)
    built_branch = branch_builder.get_branch()
    # Nothing may appear on disk: the builder has to work in memory.
    self.assertFalse(osutils.lexists('dir'))
    self.assertEqual(dir_format.repository_format.__class__,
                     built_branch.repository._format.__class__)
    # The repository format file must hold the requested format string.
    expected_string = repo_format.get_format_string()
    stored_string = self.get_transport().get_bytes(
        'dir/.bzr/repository/format')
    self.assertEqual(expected_string, stored_string)
629
def test_make_branch_builder_with_format_name(self):
    branch_builder = self.make_branch_builder('dir', format='knit')
    built_branch = branch_builder.get_branch()
    # Nothing may appear on disk: the builder has to work in memory.
    self.assertFalse(osutils.lexists('dir'))
    knit_format = controldir.format_registry.make_controldir('knit')
    self.assertEqual(knit_format.repository_format.__class__,
                     built_branch.repository._format.__class__)
    # The repository format file must hold the knit format string.
    stored_string = self.get_transport().get_bytes(
        'dir/.bzr/repository/format')
    self.assertEqual(b'Bazaar-NG Knit Repository Format 1', stored_string)
642
def test_dangling_locks_cause_failures(self):
643
class TestDanglingLock(tests.TestCaseWithMemoryTransport):
644
def test_function(self):
645
t = self.get_transport_from_path('.')
646
l = lockdir.LockDir(t, 'lock')
649
test = TestDanglingLock('test_function')
651
total_failures = result.errors + result.failures
652
if self._lock_check_thorough:
653
self.assertEqual(1, len(total_failures))
655
# When _lock_check_thorough is disabled, then we don't trigger a
657
self.assertEqual(0, len(total_failures))
660
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
    """Tests for the convenience functions TestCaseWithTransport introduces."""

    def test_get_readonly_url_none(self):
        from ..transport.readonly import ReadonlyTransportDecorator
        self.vfs_transport_factory = memory.MemoryServer
        self.transport_readonly_server = None
        # With no readonly server set, both URLs are expected to open as
        # readonly decorator transports.
        root_url = self.get_readonly_url()
        nested_url = self.get_readonly_url('foo/bar')
        root_transport = transport.get_transport_from_url(root_url)
        nested_transport = transport.get_transport_from_url(nested_url)
        for opened in (root_transport, nested_transport):
            self.assertIsInstance(opened, ReadonlyTransportDecorator)
        self.assertEqual(nested_transport.base[:-1],
                         root_transport.abspath('foo/bar'))

    def test_get_readonly_url_http(self):
        from .http_server import HttpServer
        from ..transport.http import HttpTransport
        self.transport_server = test_server.LocalURLServer
        self.transport_readonly_server = HttpServer
        # With an HTTP readonly server set, both URLs are expected to open
        # as HTTP transports.
        root_url = self.get_readonly_url()
        nested_url = self.get_readonly_url('foo/bar')
        # the transport returned may be any HttpTransportBase subclass
        root_transport = transport.get_transport_from_url(root_url)
        nested_transport = transport.get_transport_from_url(nested_url)
        for opened in (root_transport, nested_transport):
            self.assertIsInstance(opened, HttpTransport)
        self.assertEqual(nested_transport.base[:-1],
                         root_transport.abspath('foo/bar'))

    def test_is_directory(self):
        """Test assertIsDirectory assertion"""
        t = self.get_transport()
        self.build_tree(['a_dir/', 'a_file'], transport=t)
        self.assertIsDirectory('a_dir', t)
        # Neither a plain file nor a missing path counts as a directory.
        for not_a_directory in ('a_file', 'not_here'):
            self.assertRaises(AssertionError, self.assertIsDirectory,
                              not_a_directory, t)

    def test_make_branch_builder(self):
        branch_builder = self.make_branch_builder('dir')
        rev_id = branch_builder.build_commit()
        self.assertPathExists('dir')
        opened_dir = controldir.ControlDir.open('dir')
        # The builder makes a branch without a working tree.
        self.assertRaises(errors.NoWorkingTree, opened_dir.open_workingtree)
        opened_branch = opened_dir.open_branch()
        builder_branch = branch_builder.get_branch()
        self.assertEqual(opened_branch.base, builder_branch.base)
        expected_tip = (1, rev_id)
        self.assertEqual(expected_tip, builder_branch.last_revision_info())
        self.assertEqual(expected_tip, opened_branch.last_revision_info())
713
class TestTestCaseTransports(tests.TestCaseWithTransport):
    """Check TestCaseWithTransport helpers against a memory transport."""

    def setUp(self):
        super(TestTestCaseTransports, self).setUp()
        # Back every test in this class with the in-memory server.
        self.vfs_transport_factory = memory.MemoryServer

    def test_make_controldir_preserves_transport(self):
        # The returned transport is unused, but the call is kept because it
        # may start the test server as a side effect.
        t = self.get_transport()
        result_bzrdir = self.make_controldir('subdir')
        self.assertIsInstance(result_bzrdir.transport,
                              memory.MemoryTransport)
        # should not be on disk, should only be in memory
        self.assertPathDoesNotExist('subdir')
728
class TestChrootedTest(tests.ChrootedTestCase):
    """Check that the readonly URL of a chrooted test is its own root."""

    def test_root_is_root(self):
        t = transport.get_transport_from_url(self.get_readonly_url())
        # Remember the starting URL: cloning to the parent must not move
        # above it.
        url = t.base
        self.assertEqual(url, t.clone('..').base)
736
class TestProfileResult(tests.TestCase):
738
def test_profiles_tests(self):
739
self.requireFeature(features.lsprof_feature)
740
terminal = testtools.testresult.doubles.ExtendedTestResult()
741
result = tests.ProfileResult(terminal)
742
class Sample(tests.TestCase):
744
self.sample_function()
745
def sample_function(self):
749
case = terminal._events[0][1]
750
self.assertLength(1, case._benchcalls)
751
# We must be able to unpack it as the test reporting code wants
752
(_, _, _), stats = case._benchcalls[0]
753
self.assertTrue(callable(stats.pprint))
756
class TestTestResult(tests.TestCase):
758
def check_timing(self, test_case, expected_re):
    """Run ``test_case`` and match its time string against ``expected_re``.

    The test runs against a text result and a recording double at the same
    time; the case taken from the double's first event is the one handed
    to ``_testTimeString``.
    """
    text_result = tests.TextTestResult(StringIO(), descriptions=0,
                                       verbosity=1)
    recorder = testtools.testresult.doubles.ExtendedTestResult()
    combined = MultiTestResult(text_result, recorder)
    test_case.run(combined)
    executed_case = recorder._events[0][1]
    self.assertContainsRe(text_result._testTimeString(executed_case),
                          expected_re)
766
def test_test_reporting(self):
767
class ShortDelayTestCase(tests.TestCase):
768
def test_short_delay(self):
770
def test_short_benchmark(self):
771
self.time(time.sleep, 0.003)
772
self.check_timing(ShortDelayTestCase('test_short_delay'),
774
# if a benchmark time is given, we now show just that time followed by
776
self.check_timing(ShortDelayTestCase('test_short_benchmark'),
779
def test_unittest_reporting_unittest_class(self):
780
# getting the time from a non-breezy test works ok
781
class ShortDelayTestCase(unittest.TestCase):
782
def test_short_delay(self):
784
self.check_timing(ShortDelayTestCase('test_short_delay'),
787
def _time_hello_world_encoding(self):
    """Time two text conversions through ``self.time``.

    This is used to exercise the test framework.
    """
    self.time(text_type, 'hello', errors='replace')
    self.time(text_type, 'world', errors='replace')
795
def test_lsprofiling(self):
796
"""Verbose test result prints lsprof statistics from test cases."""
797
self.requireFeature(features.lsprof_feature)
798
result_stream = StringIO()
799
result = breezy.tests.VerboseTestResult(
804
# we want profile a call of some sort and check it is output by
805
# addSuccess. We dont care about addError or addFailure as they
806
# are not that interesting for performance tuning.
807
# make a new test instance that when run will generate a profile
808
example_test_case = TestTestResult("_time_hello_world_encoding")
809
example_test_case._gather_lsprof_in_benchmarks = True
810
# execute the test, which should succeed and record profiles
811
example_test_case.run(result)
812
# lsprofile_something()
813
# if this worked we want
814
# LSProf output for <built in function unicode> (['hello'], {'errors': 'replace'})
815
# CallCount Recursive Total(ms) Inline(ms) module:lineno(function)
816
# (the lsprof header)
817
# ... an arbitrary number of lines
818
# and the function call which is time.sleep.
819
# 1 0 ??? ??? ???(sleep)
820
# and then repeated but with 'world', rather than 'hello'.
821
# this should appear in the output stream of our test result.
822
output = result_stream.getvalue()
823
self.assertContainsRe(output,
824
r"LSProf output for <type 'unicode'>\(\('hello',\), {'errors': 'replace'}\)")
825
self.assertContainsRe(output,
826
r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n")
827
self.assertContainsRe(output,
828
r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?")
829
self.assertContainsRe(output,
830
r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
832
def test_uses_time_from_testtools(self):
    """Test case timings in verbose results should use testtools times"""
    # Imported locally so the test does not depend on a module-level import.
    import datetime

    class TimeAddedVerboseTestResult(tests.VerboseTestResult):
        # Feed fixed timestamps to the result so the reported duration is
        # deterministic.
        def startTest(self, test):
            self.time(datetime.datetime.utcfromtimestamp(1.145))
            super(TimeAddedVerboseTestResult, self).startTest(test)

        def addSuccess(self, test):
            self.time(datetime.datetime.utcfromtimestamp(51.147))
            super(TimeAddedVerboseTestResult, self).addSuccess(test)

        def report_tests_starting(self): pass

    # Stream that collects the verbose output for inspection.
    sio = StringIO()
    self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
    self.assertEndsWith(sio.getvalue(), "OK 50002ms\n")
847
def test_known_failure(self):
    """Using knownFailure should trigger several result actions."""
    class InstrumentedTestResult(tests.ExtendedTestResult):
        def stopTestRun(self): pass
        def report_tests_starting(self): pass
        def report_known_failure(self, test, err=None, details=None):
            self._call = test, 'known failure'
    result = InstrumentedTestResult(None, None, None, None)
    class Test(tests.TestCase):
        def test_function(self):
            self.knownFailure('failed!')
    test = Test("test_function")
    # Run the test so the result actually receives the known failure.
    test.run(result)
    # it should invoke 'report_known_failure'.
    self.assertEqual(2, len(result._call))
    self.assertEqual(test.id(), result._call[0].id())
    self.assertEqual('known failure', result._call[1])
    # we dont introspec the traceback, if the rest is ok, it would be
    # exceptional for it not to be.
    # it should update the known_failure_count on the object.
    self.assertEqual(1, result.known_failure_count)
    # the result should be successful.
    self.assertTrue(result.wasSuccessful())
871
def test_verbose_report_known_failure(self):
872
# verbose test output formatting
873
result_stream = StringIO()
874
result = breezy.tests.VerboseTestResult(
879
_get_test("test_xfail").run(result)
880
self.assertContainsRe(result_stream.getvalue(),
881
"\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
882
"\\s*(?:Text attachment: )?reason"
887
def get_passing_test(self):
    """Return a test object that passes without checking anything.

    The returned ``unittest.FunctionTestCase`` wraps a no-op function, so
    it can't be run usefully; it only gives result objects a test to be
    handed.
    """
    def passing_test():
        pass
    return unittest.FunctionTestCase(passing_test)
893
def test_add_not_supported(self):
    """Test the behaviour of invoking addNotSupported."""
    class InstrumentedTestResult(tests.ExtendedTestResult):
        def stopTestRun(self):
            pass

        def report_tests_starting(self):
            pass

        def report_unsupported(self, test, feature):
            # Remember the arguments so the test can inspect them.
            self._call = test, feature

    result = InstrumentedTestResult(None, None, None, None)
    sample = SampleTestCase('_test_pass')
    missing_feature = features.Feature()
    result.startTest(sample)
    result.addNotSupported(sample, missing_feature)
    # report_unsupported must have been called with the test and feature.
    self.assertEqual(2, len(result._call))
    self.assertEqual(sample, result._call[0])
    self.assertEqual(missing_feature, result._call[1])
    # An unsupported test does not make the run unsuccessful.
    self.assertTrue(result.wasSuccessful())
    # The test is counted against the feature it could not use...
    self.assertEqual(1, result.unsupported['Feature'])
    # ...and a second report increments that counter.
    result.addNotSupported(sample, missing_feature)
    self.assertEqual(2, result.unsupported['Feature'])
918
def test_verbose_report_unsupported(self):
919
# verbose test output formatting
920
result_stream = StringIO()
921
result = breezy.tests.VerboseTestResult(
926
test = self.get_passing_test()
927
feature = features.Feature()
928
result.startTest(test)
929
prefix = len(result_stream.getvalue())
930
result.report_unsupported(test, feature)
931
output = result_stream.getvalue()[prefix:]
932
lines = output.splitlines()
933
# We don't check for the final '0ms' since it may fail on slow hosts
934
self.assertStartsWith(lines[0], 'NODEP')
935
self.assertEqual(lines[1],
936
" The feature 'Feature' is not available.")
938
def test_unavailable_exception(self):
    """An UnavailableFeature being raised should invoke addNotSupported."""
    class InstrumentedTestResult(tests.ExtendedTestResult):
        def stopTestRun(self): pass
        def report_tests_starting(self): pass
        def addNotSupported(self, test, feature):
            self._call = test, feature
    result = InstrumentedTestResult(None, None, None, None)
    feature = features.Feature()
    class Test(tests.TestCase):
        def test_function(self):
            raise tests.UnavailableFeature(feature)
    test = Test("test_function")
    # Run the test so the UnavailableFeature is actually raised and routed
    # to the result.
    test.run(result)
    # it should invoke 'addNotSupported'.
    self.assertEqual(2, len(result._call))
    self.assertEqual(test.id(), result._call[0].id())
    self.assertEqual(feature, result._call[1])
    # and not count as an error
    self.assertEqual(0, result.error_count)
959
def test_strict_with_unsupported_feature(self):
    """A not-supported test makes the result not strictly successful."""
    result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
    test = self.get_passing_test()
    feature = "Unsupported Feature"
    result.addNotSupported(test, feature)
    self.assertFalse(result.wasStrictlySuccessful())
    # No benchmark time is recorded for the test.
    self.assertEqual(None, result._extractBenchmarkTime(test))
967
def test_strict_with_known_failure(self):
968
result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
969
test = _get_test("test_xfail")
971
self.assertFalse(result.wasStrictlySuccessful())
972
self.assertEqual(None, result._extractBenchmarkTime(test))
974
def test_strict_with_success(self):
    """A plain success leaves the result strictly successful."""
    result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
    test = self.get_passing_test()
    result.addSuccess(test)
    self.assertTrue(result.wasStrictlySuccessful())
    # No benchmark time is recorded for the test.
    self.assertEqual(None, result._extractBenchmarkTime(test))
981
def test_startTests(self):
982
"""Starting the first test should trigger startTests."""
983
class InstrumentedTestResult(tests.ExtendedTestResult):
985
def startTests(self): self.calls += 1
986
result = InstrumentedTestResult(None, None, None, None)
989
test = unittest.FunctionTestCase(test_function)
991
self.assertEqual(1, result.calls)
993
def test_startTests_only_once(self):
994
"""With multiple tests startTests should still only be called once"""
995
class InstrumentedTestResult(tests.ExtendedTestResult):
997
def startTests(self): self.calls += 1
998
result = InstrumentedTestResult(None, None, None, None)
999
suite = unittest.TestSuite([
1000
unittest.FunctionTestCase(lambda: None),
1001
unittest.FunctionTestCase(lambda: None)])
1003
self.assertEqual(1, result.calls)
1004
self.assertEqual(2, result.count)
1007
class TestRunner(tests.TestCase):
1009
def dummy_test(self):
1012
def run_test_runner(self, testrunner, test):
1013
"""Run suite in testrunner, saving global state and restoring it.
1015
This current saves and restores:
1016
TestCaseInTempDir.TEST_ROOT
1018
There should be no tests in this file that use
1019
breezy.tests.TextTestRunner without using this convenience method,
1020
because of our use of global state.
1022
old_root = tests.TestCaseInTempDir.TEST_ROOT
1024
tests.TestCaseInTempDir.TEST_ROOT = None
1025
return testrunner.run(test)
1027
tests.TestCaseInTempDir.TEST_ROOT = old_root
1029
def test_known_failure_failed_run(self):
1030
# run a test that generates a known failure which should be printed in
1031
# the final output when real failures occur.
1032
class Test(tests.TestCase):
1033
def known_failure_test(self):
1034
self.expectFailure('failed', self.assertTrue, False)
1035
test = unittest.TestSuite()
1036
test.addTest(Test("known_failure_test"))
1038
raise AssertionError('foo')
1039
test.addTest(unittest.FunctionTestCase(failing_test))
1041
runner = tests.TextTestRunner(stream=stream)
1042
result = self.run_test_runner(runner, test)
1043
lines = stream.getvalue().splitlines()
1044
self.assertContainsRe(stream.getvalue(),
1045
'(?sm)^brz selftest.*$'
1047
'^======================================================================\n'
1048
'^FAIL: failing_test\n'
1049
'^----------------------------------------------------------------------\n'
1050
'Traceback \\(most recent call last\\):\n'
1051
' .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
1052
' raise AssertionError\\(\'foo\'\\)\n'
1054
'^----------------------------------------------------------------------\n'
1056
'FAILED \\(failures=1, known_failure_count=1\\)'
1059
def test_known_failure_ok_run(self):
1060
# run a test that generates a known failure which should be printed in
1062
class Test(tests.TestCase):
1063
def known_failure_test(self):
1064
self.knownFailure("Never works...")
1065
test = Test("known_failure_test")
1067
runner = tests.TextTestRunner(stream=stream)
1068
result = self.run_test_runner(runner, test)
1069
self.assertContainsRe(stream.getvalue(),
1072
'Ran 1 test in .*\n'
1074
'OK \\(known_failures=1\\)\n')
1076
def test_unexpected_success_bad(self):
1077
class Test(tests.TestCase):
1078
def test_truth(self):
1079
self.expectFailure("No absolute truth", self.assertTrue, True)
1080
runner = tests.TextTestRunner(stream=StringIO())
1081
result = self.run_test_runner(runner, Test("test_truth"))
1082
self.assertContainsRe(runner.stream.getvalue(),
1084
"FAIL: \\S+\\.test_truth\n"
1087
"\\s*(?:Text attachment: )?reason"
1093
"Ran 1 test in .*\n"
1095
"FAILED \\(failures=1\\)\n\\Z")
1097
def test_result_decorator(self):
1100
class LoggingDecorator(ExtendedToOriginalDecorator):
1101
def startTest(self, test):
1102
ExtendedToOriginalDecorator.startTest(self, test)
1103
calls.append('start')
1104
test = unittest.FunctionTestCase(lambda:None)
1106
runner = tests.TextTestRunner(stream=stream,
1107
result_decorators=[LoggingDecorator])
1108
result = self.run_test_runner(runner, test)
1109
self.assertLength(1, calls)
1111
def test_skipped_test(self):
    """A run whose only test raises TestSkipped is still successful."""
    # skipping_test must be hidden in here so it's not run as a real test
    class SkippingTest(tests.TestCase):
        def skipping_test(self):
            raise tests.TestSkipped('test intentionally skipped')

    runner = tests.TextTestRunner(stream=StringIO())
    test = SkippingTest("skipping_test")
    result = self.run_test_runner(runner, test)
    self.assertTrue(result.wasSuccessful())
1123
def test_skipped_from_setup(self):
1125
class SkippedSetupTest(tests.TestCase):
1128
calls.append('setUp')
1129
self.addCleanup(self.cleanup)
1130
raise tests.TestSkipped('skipped setup')
1132
def test_skip(self):
1133
self.fail('test reached')
1136
calls.append('cleanup')
1138
runner = tests.TextTestRunner(stream=StringIO())
1139
test = SkippedSetupTest('test_skip')
1140
result = self.run_test_runner(runner, test)
1141
self.assertTrue(result.wasSuccessful())
1142
# Check if cleanup was called the right number of times.
1143
self.assertEqual(['setUp', 'cleanup'], calls)
1145
def test_skipped_from_test(self):
1147
class SkippedTest(tests.TestCase):
1150
super(SkippedTest, self).setUp()
1151
calls.append('setUp')
1152
self.addCleanup(self.cleanup)
1154
def test_skip(self):
1155
raise tests.TestSkipped('skipped test')
1158
calls.append('cleanup')
1160
runner = tests.TextTestRunner(stream=StringIO())
1161
test = SkippedTest('test_skip')
1162
result = self.run_test_runner(runner, test)
1163
self.assertTrue(result.wasSuccessful())
1164
# Check if cleanup was called the right number of times.
1165
self.assertEqual(['setUp', 'cleanup'], calls)
1167
def test_not_applicable(self):
1168
# run a test that is skipped because it's not applicable
1169
class Test(tests.TestCase):
1170
def not_applicable_test(self):
1171
raise tests.TestNotApplicable('this test never runs')
1173
runner = tests.TextTestRunner(stream=out, verbosity=2)
1174
test = Test("not_applicable_test")
1175
result = self.run_test_runner(runner, test)
1176
self.log(out.getvalue())
1177
self.assertTrue(result.wasSuccessful())
1178
self.assertTrue(result.wasStrictlySuccessful())
1179
self.assertContainsRe(out.getvalue(),
1180
r'(?m)not_applicable_test * N/A')
1181
self.assertContainsRe(out.getvalue(),
1182
r'(?m)^ this test never runs')
1184
def test_unsupported_features_listed(self):
1185
"""When unsupported features are encountered they are detailed."""
1186
class Feature1(features.Feature):
1187
def _probe(self): return False
1188
class Feature2(features.Feature):
1189
def _probe(self): return False
1190
# create sample tests
1191
test1 = SampleTestCase('_test_pass')
1192
test1._test_needs_features = [Feature1()]
1193
test2 = SampleTestCase('_test_pass')
1194
test2._test_needs_features = [Feature2()]
1195
test = unittest.TestSuite()
1199
runner = tests.TextTestRunner(stream=stream)
1200
result = self.run_test_runner(runner, test)
1201
lines = stream.getvalue().splitlines()
1204
"Missing feature 'Feature1' skipped 1 tests.",
1205
"Missing feature 'Feature2' skipped 1 tests.",
1209
def test_verbose_test_count(self):
1210
"""A verbose test run reports the right test count at the start"""
1211
suite = TestUtil.TestSuite([
1212
unittest.FunctionTestCase(lambda:None),
1213
unittest.FunctionTestCase(lambda:None)])
1214
self.assertEqual(suite.countTestCases(), 2)
1216
runner = tests.TextTestRunner(stream=stream, verbosity=2)
1217
# Need to use the CountingDecorator as that's what sets num_tests
1218
result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1219
self.assertStartsWith(stream.getvalue(), "running 2 tests")
1221
def test_startTestRun(self):
1222
"""run should call result.startTestRun()"""
1224
class LoggingDecorator(ExtendedToOriginalDecorator):
1225
def startTestRun(self):
1226
ExtendedToOriginalDecorator.startTestRun(self)
1227
calls.append('startTestRun')
1228
test = unittest.FunctionTestCase(lambda:None)
1230
runner = tests.TextTestRunner(stream=stream,
1231
result_decorators=[LoggingDecorator])
1232
result = self.run_test_runner(runner, test)
1233
self.assertLength(1, calls)
1235
def test_stopTestRun(self):
1236
"""run should call result.stopTestRun()"""
1238
class LoggingDecorator(ExtendedToOriginalDecorator):
1239
def stopTestRun(self):
1240
ExtendedToOriginalDecorator.stopTestRun(self)
1241
calls.append('stopTestRun')
1242
test = unittest.FunctionTestCase(lambda:None)
1244
runner = tests.TextTestRunner(stream=stream,
1245
result_decorators=[LoggingDecorator])
1246
result = self.run_test_runner(runner, test)
1247
self.assertLength(1, calls)
1249
def test_unicode_test_output_on_ascii_stream(self):
1250
"""Showing results should always succeed even on an ascii console"""
1251
class FailureWithUnicode(tests.TestCase):
1252
def test_log_unicode(self):
1254
self.fail("Now print that log!")
1256
self.overrideAttr(osutils, "get_terminal_encoding",
1257
lambda trace=False: "ascii")
1258
result = self.run_test_runner(tests.TextTestRunner(stream=out),
1259
FailureWithUnicode("test_log_unicode"))
1260
self.assertContainsRe(out.getvalue(),
1261
"(?:Text attachment: )?log"
1263
"\\d+\\.\\d+ \\\\u2606"
1267
class SampleTestCase(tests.TestCase):
1269
def _test_pass(self):
1272
class _TestException(Exception):
1276
class TestTestCase(tests.TestCase):
1277
"""Tests that test the core breezy TestCase."""
1279
def test_assertLength_matches_empty(self):
1281
self.assertLength(0, a_list)
1283
def test_assertLength_matches_nonempty(self):
1285
self.assertLength(3, a_list)
1287
def test_assertLength_fails_different(self):
1289
self.assertRaises(AssertionError, self.assertLength, 1, a_list)
1291
def test_assertLength_shows_sequence_in_failure(self):
1293
exception = self.assertRaises(AssertionError, self.assertLength, 2,
1295
self.assertEqual('Incorrect length: wanted 2, got 3 for [1, 2, 3]',
1298
def test_base_setUp_not_called_causes_failure(self):
1299
class TestCaseWithBrokenSetUp(tests.TestCase):
1301
pass # does not call TestCase.setUp
1304
test = TestCaseWithBrokenSetUp('test_foo')
1305
result = unittest.TestResult()
1307
self.assertFalse(result.wasSuccessful())
1308
self.assertEqual(1, result.testsRun)
1310
def test_base_tearDown_not_called_causes_failure(self):
1311
class TestCaseWithBrokenTearDown(tests.TestCase):
1313
pass # does not call TestCase.tearDown
1316
test = TestCaseWithBrokenTearDown('test_foo')
1317
result = unittest.TestResult()
1319
self.assertFalse(result.wasSuccessful())
1320
self.assertEqual(1, result.testsRun)
1322
def test_debug_flags_sanitised(self):
1323
"""The breezy debug flags should be sanitised by setUp."""
1324
if 'allow_debug' in tests.selftest_debug_flags:
1325
raise tests.TestNotApplicable(
1326
'-Eallow_debug option prevents debug flag sanitisation')
1327
# we could set something and run a test that will check
1328
# it gets santised, but this is probably sufficient for now:
1329
# if someone runs the test with -Dsomething it will error.
1331
if self._lock_check_thorough:
1332
flags.add('strict_locks')
1333
self.assertEqual(flags, breezy.debug.debug_flags)
1335
def change_selftest_debug_flags(self, new_flags):
    """Replace ``tests.selftest_debug_flags`` for this test.

    The replacement is installed through ``overrideAttr``.

    :param new_flags: Iterable of flag names; it is copied into a new set
        so the caller's collection is not shared.
    """
    self.overrideAttr(tests, 'selftest_debug_flags', set(new_flags))
1338
def test_allow_debug_flag(self):
1339
"""The -Eallow_debug flag prevents breezy.debug.debug_flags from being
1340
sanitised (i.e. cleared) before running a test.
1342
self.change_selftest_debug_flags({'allow_debug'})
1343
breezy.debug.debug_flags = {'a-flag'}
1344
class TestThatRecordsFlags(tests.TestCase):
1345
def test_foo(nested_self):
1346
self.flags = set(breezy.debug.debug_flags)
1347
test = TestThatRecordsFlags('test_foo')
1348
test.run(self.make_test_result())
1350
if 'disable_lock_checks' not in tests.selftest_debug_flags:
1351
flags.add('strict_locks')
1352
self.assertEqual(flags, self.flags)
1354
def test_disable_lock_checks(self):
    """The -Edisable_lock_checks flag disables thorough checks."""
    class TestThatRecordsFlags(tests.TestCase):
        def test_foo(nested_self):
            # 'self' is the outer test: the inner test stores what it
            # observed there so the outer assertions can read it.
            self.flags = set(breezy.debug.debug_flags)
            self.test_lock_check_thorough = nested_self._lock_check_thorough

    self.change_selftest_debug_flags(set())
    test = TestThatRecordsFlags('test_foo')
    test.run(self.make_test_result())
    # By default we do strict lock checking and thorough lock checks.
    self.assertTrue(self.test_lock_check_thorough)
    self.assertEqual({'strict_locks'}, self.flags)
    # Now set the disable_lock_checks flag, and show that this changed.
    self.change_selftest_debug_flags({'disable_lock_checks'})
    test = TestThatRecordsFlags('test_foo')
    test.run(self.make_test_result())
    self.assertFalse(self.test_lock_check_thorough)
    self.assertEqual(set(), self.flags)
1374
def test_this_fails_strict_lock_check(self):
    """thisFailsStrictLockCheck removes 'strict_locks' from debug flags."""
    class TestThatRecordsFlags(tests.TestCase):
        def test_foo(nested_self):
            # Snapshot the global debug flags before and after the call;
            # results are stored on the outer test ('self').
            self.flags1 = set(breezy.debug.debug_flags)
            self.thisFailsStrictLockCheck()
            self.flags2 = set(breezy.debug.debug_flags)

    # Make sure lock checking is active
    self.change_selftest_debug_flags(set())
    test = TestThatRecordsFlags('test_foo')
    test.run(self.make_test_result())
    self.assertEqual({'strict_locks'}, self.flags1)
    self.assertEqual(set(), self.flags2)
1387
def test_debug_flags_restored(self):
1388
"""The breezy debug flags should be restored to their original state
1389
after the test was run, even if allow_debug is set.
1391
self.change_selftest_debug_flags({'allow_debug'})
1392
# Now run a test that modifies debug.debug_flags.
1393
breezy.debug.debug_flags = {'original-state'}
1394
class TestThatModifiesFlags(tests.TestCase):
1396
breezy.debug.debug_flags = {'modified'}
1397
test = TestThatModifiesFlags('test_foo')
1398
test.run(self.make_test_result())
1399
self.assertEqual({'original-state'}, breezy.debug.debug_flags)
1401
def make_test_result(self):
    """Get a test result that writes to a StringIO.

    :return: A ``tests.TextTestResult`` with descriptions off and
        verbosity 1, writing to a fresh in-memory stream.
    """
    return tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
1405
def inner_test(self):
1406
# the inner child test
1409
def outer_child(self):
1410
# the outer child test
1412
self.inner_test = TestTestCase("inner_child")
1413
result = self.make_test_result()
1414
self.inner_test.run(result)
1415
note("outer finish")
1416
self.addCleanup(osutils.delete_any, self._log_file_name)
1418
def test_trace_nesting(self):
    """Running nested test cases leaves the active trace file unchanged."""
    # this tests that each test case nests its trace facility correctly.
    # we do this by running a test case manually. That test case
    # (outer_child) itself creates and runs a further case, so two levels
    # of children set up and tear down their own logs underneath this
    # test. A bad clean up routine at either level would leave
    # breezy.trace._trace_file pointing somewhere other than where it
    # started, which is what the final assertion detects.
    original_trace = breezy.trace._trace_file
    outer_test = TestTestCase("outer_child")
    result = self.make_test_result()
    outer_test.run(result)
    self.assertEqual(original_trace, breezy.trace._trace_file)
1436
def method_that_times_a_bit_twice(self):
    """Helper test body: time two short sleeps via self.time()."""
    # call self.time twice to ensure it aggregates
    self.time(time.sleep, 0.007)
    self.time(time.sleep, 0.007)
1441
def test_time_creates_benchmark_in_result(self):
1442
"""Test that the TestCase.time() method accumulates a benchmark time."""
1443
sample_test = TestTestCase("method_that_times_a_bit_twice")
1444
output_stream = StringIO()
1445
result = breezy.tests.VerboseTestResult(
1449
sample_test.run(result)
1450
self.assertContainsRe(
1451
output_stream.getvalue(),
1454
def test_hooks_sanitised(self):
1455
"""The breezy hooks should be sanitised by setUp."""
1456
# Note this test won't fail with hooks that the core library doesn't
1457
# use - but it trigger with a plugin that adds hooks, so its still a
1458
# useful warning in that case.
1459
self.assertEqual(breezy.branch.BranchHooks(), breezy.branch.Branch.hooks)
1461
breezy.bzr.smart.server.SmartServerHooks(),
1462
breezy.bzr.smart.server.SmartTCPServer.hooks)
1464
breezy.commands.CommandHooks(), breezy.commands.Command.hooks)
1466
def test__gather_lsprof_in_benchmarks(self):
    """When _gather_lsprof_in_benchmarks is on, accumulate profile data.

    Each self.time() call is individually and separately profiled.
    """
    self.requireFeature(features.lsprof_feature)
    # overrides the class member with an instance member, so there is
    # nothing to restore afterwards
    self._gather_lsprof_in_benchmarks = True
    self.time(time.sleep, 0.000)
    self.time(time.sleep, 0.003)
    # One entry per timed call: ((callable, args, kwargs), stats).
    self.assertEqual(2, len(self._benchcalls))
    self.assertEqual((time.sleep, (0.000,), {}), self._benchcalls[0][0])
    self.assertEqual((time.sleep, (0.003,), {}), self._benchcalls[1][0])
    self.assertIsInstance(self._benchcalls[0][1], breezy.lsprof.Stats)
    self.assertIsInstance(self._benchcalls[1][1], breezy.lsprof.Stats)
    # Empty the list in place once the recorded calls have been checked.
    del self._benchcalls[:]
1484
def test_knownFailure(self):
    """Self.knownFailure() should raise a KnownFailure exception."""
    self.assertRaises(tests.KnownFailure, self.knownFailure, "A Failure")
1488
def test_open_bzrdir_safe_roots(self):
    """Opening a control dir fails until its URL is declared safe."""
    # even a memory transport should fail to open when its url has not
    # been permitted.
    # Manually set one up here rather than relying on a TestCase helper.
    transport_server = memory.MemoryServer()
    transport_server.start_server()
    self.addCleanup(transport_server.stop_server)
    t = transport.get_transport_from_url(transport_server.get_url())
    controldir.ControlDir.create(t.base)
    self.assertRaises(errors.BzrError,
                      controldir.ControlDir.open_from_transport, t)
    # But if we declare this as safe, we can open the bzrdir.
    self.permit_url(t.base)
    self._bzr_selftest_roots.append(t.base)
    controldir.ControlDir.open_from_transport(t)
1505
def test_requireFeature_available(self):
    """self.requireFeature(available) is a no-op."""
    class Available(features.Feature):
        # Probe always succeeds.
        def _probe(self):
            return True

    feature = Available()
    self.requireFeature(feature)
1512
def test_requireFeature_unavailable(self):
    """self.requireFeature(unavailable) raises UnavailableFeature."""
    class Unavailable(features.Feature):
        # Probe always fails.
        def _probe(self):
            return False

    feature = Unavailable()
    self.assertRaises(tests.UnavailableFeature,
                      self.requireFeature, feature)
1520
def test_run_no_parameters(self):
1521
test = SampleTestCase('_test_pass')
1524
def test_run_enabled_unittest_result(self):
1525
"""Test we revert to regular behaviour when the test is enabled."""
1526
test = SampleTestCase('_test_pass')
1527
class EnabledFeature(object):
1528
def available(self):
1530
test._test_needs_features = [EnabledFeature()]
1531
result = unittest.TestResult()
1533
self.assertEqual(1, result.testsRun)
1534
self.assertEqual([], result.errors)
1535
self.assertEqual([], result.failures)
1537
def test_run_disabled_unittest_result(self):
1538
"""Test our compatability for disabled tests with unittest results."""
1539
test = SampleTestCase('_test_pass')
1540
class DisabledFeature(object):
1541
def available(self):
1543
test._test_needs_features = [DisabledFeature()]
1544
result = unittest.TestResult()
1546
self.assertEqual(1, result.testsRun)
1547
self.assertEqual([], result.errors)
1548
self.assertEqual([], result.failures)
1550
def test_run_disabled_supporting_result(self):
1551
"""Test disabled tests behaviour with support aware results."""
1552
test = SampleTestCase('_test_pass')
1553
class DisabledFeature(object):
1554
def __eq__(self, other):
1555
return isinstance(other, DisabledFeature)
1556
def available(self):
1558
the_feature = DisabledFeature()
1559
test._test_needs_features = [the_feature]
1560
class InstrumentedTestResult(unittest.TestResult):
1562
unittest.TestResult.__init__(self)
1564
def startTest(self, test):
1565
self.calls.append(('startTest', test))
1566
def stopTest(self, test):
1567
self.calls.append(('stopTest', test))
1568
def addNotSupported(self, test, feature):
1569
self.calls.append(('addNotSupported', test, feature))
1570
result = InstrumentedTestResult()
1572
case = result.calls[0][1]
1574
('startTest', case),
1575
('addNotSupported', case, the_feature),
1580
def test_start_server_registers_url(self):
    """start_server adds the server's URL to _bzr_selftest_roots."""
    transport_server = memory.MemoryServer()
    # A little strict, but unlikely to be changed soon.
    self.assertEqual([], self._bzr_selftest_roots)
    self.start_server(transport_server)
    self.assertSubset([transport_server.get_url()],
                      self._bzr_selftest_roots)
1588
def test_assert_list_raises_on_generator(self):
1589
def generator_which_will_raise():
1590
# This will not raise until after the first yield
1592
raise _TestException()
1594
e = self.assertListRaises(_TestException, generator_which_will_raise)
1595
self.assertIsInstance(e, _TestException)
1597
e = self.assertListRaises(Exception, generator_which_will_raise)
1598
self.assertIsInstance(e, _TestException)
1600
def test_assert_list_raises_on_plain(self):
1601
def plain_exception():
1602
raise _TestException()
1605
e = self.assertListRaises(_TestException, plain_exception)
1606
self.assertIsInstance(e, _TestException)
1608
e = self.assertListRaises(Exception, plain_exception)
1609
self.assertIsInstance(e, _TestException)
1611
def test_assert_list_raises_assert_wrong_exception(self):
1612
class _NotTestException(Exception):
1615
def wrong_exception():
1616
raise _NotTestException()
1618
def wrong_exception_generator():
1621
raise _NotTestException()
1623
# Wrong exceptions are not intercepted
1624
self.assertRaises(_NotTestException,
1625
self.assertListRaises, _TestException, wrong_exception)
1626
self.assertRaises(_NotTestException,
1627
self.assertListRaises, _TestException, wrong_exception_generator)
1629
def test_assert_list_raises_no_exception(self):
1633
def success_generator():
1637
self.assertRaises(AssertionError,
1638
self.assertListRaises, _TestException, success)
1640
self.assertRaises(AssertionError,
1641
self.assertListRaises, _TestException, success_generator)
1643
def _run_successful_test(self, test):
1644
result = testtools.TestResult()
1646
self.assertTrue(result.wasSuccessful())
1649
def test_overrideAttr_without_value(self):
1650
self.test_attr = 'original' # Define a test attribute
1651
obj = self # Make 'obj' visible to the embedded test
1652
class Test(tests.TestCase):
1655
super(Test, self).setUp()
1656
self.orig = self.overrideAttr(obj, 'test_attr')
1658
def test_value(self):
1659
self.assertEqual('original', self.orig)
1660
self.assertEqual('original', obj.test_attr)
1661
obj.test_attr = 'modified'
1662
self.assertEqual('modified', obj.test_attr)
1664
self._run_successful_test(Test('test_value'))
1665
self.assertEqual('original', obj.test_attr)
1667
def test_overrideAttr_with_value(self):
1668
self.test_attr = 'original' # Define a test attribute
1669
obj = self # Make 'obj' visible to the embedded test
1670
class Test(tests.TestCase):
1673
super(Test, self).setUp()
1674
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1676
def test_value(self):
1677
self.assertEqual('original', self.orig)
1678
self.assertEqual('modified', obj.test_attr)
1680
self._run_successful_test(Test('test_value'))
1681
self.assertEqual('original', obj.test_attr)
1683
def test_overrideAttr_with_no_existing_value_and_value(self):
1684
# Do not define the test_attribute
1685
obj = self # Make 'obj' visible to the embedded test
1686
class Test(tests.TestCase):
1689
tests.TestCase.setUp(self)
1690
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1692
def test_value(self):
1693
self.assertEqual(tests._unitialized_attr, self.orig)
1694
self.assertEqual('modified', obj.test_attr)
1696
self._run_successful_test(Test('test_value'))
1697
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1699
def test_overrideAttr_with_no_existing_value_and_no_value(self):
1700
# Do not define the test_attribute
1701
obj = self # Make 'obj' visible to the embedded test
1702
class Test(tests.TestCase):
1705
tests.TestCase.setUp(self)
1706
self.orig = self.overrideAttr(obj, 'test_attr')
1708
def test_value(self):
1709
self.assertEqual(tests._unitialized_attr, self.orig)
1710
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1712
self._run_successful_test(Test('test_value'))
1713
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1715
def test_recordCalls(self):
1716
from breezy.tests import test_selftest
1717
calls = self.recordCalls(
1718
test_selftest, '_add_numbers')
1719
self.assertEqual(test_selftest._add_numbers(2, 10),
1721
self.assertEqual(calls, [((2, 10), {})])
1724
def _add_numbers(a, b):
1728
class _MissingFeature(features.Feature):
1731
missing_feature = _MissingFeature()
1734
def _get_test(name):
    """Get an instance of a specific example test.

    The example tests are defined inside this function so that they do not
    auto-run as part of the test suite.

    :param name: Name of the ExampleTests method to instantiate, for
        example 'test_fail' or 'test_xfail'.
    :return: An ExampleTests instance bound to that method.
    """

    class ExampleTests(tests.TestCase):

        def test_fail(self):
            mutter('this was a failing test')
            self.fail('this test will fail')

        def test_error(self):
            mutter('this test errored')
            raise RuntimeError('gotcha')

        def test_missing_feature(self):
            mutter('missing the feature')
            self.requireFeature(missing_feature)

        def test_skip(self):
            mutter('this test will be skipped')
            raise tests.TestSkipped('reason')

        def test_success(self):
            mutter('this test succeeds')

        def test_xfail(self):
            mutter('test with expected failure')
            self.knownFailure('this_fails')

        def test_unexpected_success(self):
            mutter('test with unexpected success')
            # The callable does not fail, so the expected failure is not
            # observed.
            self.expectFailure('should_fail', lambda: None)

    return ExampleTests(name)
1773
def _get_skip_reasons(result):
1774
# GZ 2017-06-06: Newer testtools doesn't have this, uses detail instead
1775
return result.skip_reasons
1778
class TestTestCaseLogDetails(tests.TestCase):
1780
def _run_test(self, test_name):
1781
test = _get_test(test_name)
1782
result = testtools.TestResult()
1786
def test_fail_has_log(self):
    """A failing test reports its log text alongside the failure."""
    result = self._run_test('test_fail')
    self.assertEqual(1, len(result.failures))
    # Each entry is indexed as (test, text); [1] is the reported text.
    result_content = result.failures[0][1]
    self.assertContainsRe(result_content,
                          '(?m)^(?:Text attachment: )?log(?:$|: )')
    self.assertContainsRe(result_content, 'this was a failing test')
1794
def test_error_has_log(self):
    """An erroring test reports its log text alongside the error."""
    result = self._run_test('test_error')
    self.assertEqual(1, len(result.errors))
    # Each entry is indexed as (test, text); [1] is the reported text.
    result_content = result.errors[0][1]
    self.assertContainsRe(result_content,
                          '(?m)^(?:Text attachment: )?log(?:$|: )')
    self.assertContainsRe(result_content, 'this test errored')
1802
def test_skip_has_no_log(self):
1803
result = self._run_test('test_skip')
1804
reasons = _get_skip_reasons(result)
1805
self.assertEqual({'reason'}, set(reasons))
1806
skips = reasons['reason']
1807
self.assertEqual(1, len(skips))
1809
self.assertFalse('log' in test.getDetails())
1811
def test_missing_feature_has_no_log(self):
1812
# testtools doesn't know about addNotSupported, so it just gets
1813
# considered as a skip
1814
result = self._run_test('test_missing_feature')
1815
reasons = _get_skip_reasons(result)
1816
self.assertEqual({str(missing_feature)}, set(reasons))
1817
skips = reasons[str(missing_feature)]
1818
self.assertEqual(1, len(skips))
1820
self.assertFalse('log' in test.getDetails())
1822
def test_xfail_has_no_log(self):
    """An expected failure does not carry the test log in its report."""
    result = self._run_test('test_xfail')
    self.assertEqual(1, len(result.expectedFailures))
    result_content = result.expectedFailures[0][1]
    self.assertNotContainsRe(result_content,
                             '(?m)^(?:Text attachment: )?log(?:$|: )')
    self.assertNotContainsRe(result_content, 'test with expected failure')
1830
def test_unexpected_success_has_log(self):
    """An unexpected success keeps the 'log' detail on the test."""
    result = self._run_test('test_unexpected_success')
    self.assertEqual(1, len(result.unexpectedSuccesses))
    # Inconsistency, unexpectedSuccesses is a list of tests,
    # expectedFailures is a list of reasons?
    test = result.unexpectedSuccesses[0]
    details = test.getDetails()
    # assertIn reports the available keys on failure, unlike assertTrue.
    self.assertIn('log', details)
1840
class TestTestCloning(tests.TestCase):
1841
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1843
def test_cloned_testcase_does_not_share_details(self):
1844
"""A TestCase cloned with clone_test does not share mutable attributes
1845
such as details or cleanups.
1847
class Test(tests.TestCase):
1849
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1850
orig_test = Test('test_foo')
1851
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1852
orig_test.run(unittest.TestResult())
1853
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1854
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1856
def test_double_apply_scenario_preserves_first_scenario(self):
1857
"""Applying two levels of scenarios to a test preserves the attributes
1858
added by both scenarios.
1860
class Test(tests.TestCase):
1863
test = Test('test_foo')
1864
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1865
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1866
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1867
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1868
all_tests = list(tests.iter_suite_tests(suite))
1869
self.assertLength(4, all_tests)
1870
all_xys = sorted((t.x, t.y) for t in all_tests)
1871
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1874
# NB: Don't delete this; it's not actually from 0.11!
1875
@deprecated_function(deprecated_in((0, 11, 0)))
1876
def sample_deprecated_function():
1877
"""A deprecated function to test applyDeprecated with."""
1881
def sample_undeprecated_function(a_param):
1882
"""A undeprecated function to test applyDeprecated with."""
1885
class ApplyDeprecatedHelper(object):
1886
"""A helper class for ApplyDeprecated tests."""
1888
@deprecated_method(deprecated_in((0, 11, 0)))
1889
def sample_deprecated_method(self, param_one):
1890
"""A deprecated method for testing with."""
1893
def sample_normal_method(self):
1894
"""A undeprecated method."""
1896
@deprecated_method(deprecated_in((0, 10, 0)))
1897
def sample_nested_deprecation(self):
1898
return sample_deprecated_function()
1901
class TestExtraAssertions(tests.TestCase):
1902
"""Tests for new test assertions in breezy test suite"""
1904
def test_assert_isinstance(self):
1905
self.assertIsInstance(2, int)
1906
self.assertIsInstance(u'', (str, text_type))
1907
e = self.assertRaises(AssertionError, self.assertIsInstance, None, int)
1908
self.assertIn(str(e),
1909
["None is an instance of <type 'NoneType'> rather than <type 'int'>",
1910
"None is an instance of <class 'NoneType'> rather than <class 'int'>"])
1911
self.assertRaises(AssertionError, self.assertIsInstance, 23.3, int)
1912
e = self.assertRaises(AssertionError,
1913
self.assertIsInstance, None, int, "it's just not")
1914
self.assertEqual(str(e),
1915
"None is an instance of <type 'NoneType'> rather than <type 'int'>"
1918
def test_assertEndsWith(self):
    """assertEndsWith passes on a real suffix and fails otherwise."""
    suffix = 'oo'
    # 'foo' ends with the suffix, so this call must not raise.
    self.assertEndsWith('foo', suffix)
    # 'o' does not end with 'oo', so the assertion has to fail.
    self.assertRaises(AssertionError, self.assertEndsWith, 'o', suffix)
1922
def test_assertEqualDiff(self):
    """assertEqualDiff names the side that lacks a final newline."""
    cases = [
        # Don't blink ! The '+' applies to the second string
        (('', '\n'), 'first string is missing a final newline.\n+ \n'),
        # Don't blink ! The '-' applies to the second string
        (('\n', ''), 'second string is missing a final newline.\n- \n'),
    ]
    for (first, second), expected_message in cases:
        failure = self.assertRaises(
            AssertionError, self.assertEqualDiff, first, second)
        self.assertEqual(str(failure), expected_message)
1935
class TestDeprecations(tests.TestCase):
1937
def test_applyDeprecated_not_deprecated(self):
1938
sample_object = ApplyDeprecatedHelper()
1939
# calling an undeprecated callable raises an assertion
1940
self.assertRaises(AssertionError, self.applyDeprecated,
1941
deprecated_in((0, 11, 0)),
1942
sample_object.sample_normal_method)
1943
self.assertRaises(AssertionError, self.applyDeprecated,
1944
deprecated_in((0, 11, 0)),
1945
sample_undeprecated_function, "a param value")
1946
# calling a deprecated callable (function or method) with the wrong
1947
# expected deprecation fails.
1948
self.assertRaises(AssertionError, self.applyDeprecated,
1949
deprecated_in((0, 10, 0)),
1950
sample_object.sample_deprecated_method, "a param value")
1951
self.assertRaises(AssertionError, self.applyDeprecated,
1952
deprecated_in((0, 10, 0)),
1953
sample_deprecated_function)
1954
# calling a deprecated callable (function or method) with the right
1955
# expected deprecation returns the functions result.
1956
self.assertEqual("a param value",
1957
self.applyDeprecated(deprecated_in((0, 11, 0)),
1958
sample_object.sample_deprecated_method, "a param value"))
1959
self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 11, 0)),
1960
sample_deprecated_function))
1961
# calling a nested deprecation with the wrong deprecation version
1962
# fails even if a deeper nested function was deprecated with the
1964
self.assertRaises(AssertionError, self.applyDeprecated,
1965
deprecated_in((0, 11, 0)), sample_object.sample_nested_deprecation)
1966
# calling a nested deprecation with the right deprecation value
1967
# returns the calls result.
1968
self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 10, 0)),
1969
sample_object.sample_nested_deprecation))
1971
def test_callDeprecated(self):
1972
def testfunc(be_deprecated, result=None):
1973
if be_deprecated is True:
1974
symbol_versioning.warn('i am deprecated', DeprecationWarning,
1977
result = self.callDeprecated(['i am deprecated'], testfunc, True)
1978
self.assertIs(None, result)
1979
result = self.callDeprecated([], testfunc, False, 'result')
1980
self.assertEqual('result', result)
1981
self.callDeprecated(['i am deprecated'], testfunc, be_deprecated=True)
1982
self.callDeprecated([], testfunc, be_deprecated=False)
1985
class TestWarningTests(tests.TestCase):
1986
"""Tests for calling methods that raise warnings."""
1988
def test_callCatchWarnings(self):
1990
warnings.warn("this is your last warning")
1992
wlist, result = self.callCatchWarnings(meth, 1, 2)
1993
self.assertEqual(3, result)
1994
# would like just to compare them, but UserWarning doesn't implement
1997
self.assertIsInstance(w0, UserWarning)
1998
self.assertEqual("this is your last warning", str(w0))
2001
class TestConvenienceMakers(tests.TestCaseWithTransport):
    """Exercise the make_* helpers offered by TestCaseWithTransport."""

    def test_make_branch_and_tree_with_format(self):
        """make_branch_and_tree honours an explicitly supplied format."""
        meta_format_class = breezy.bzr.bzrdir.BzrDirMetaFormat1
        self.make_branch_and_tree('a', format=meta_format_class())
        # Re-open the control directory and check which format it reports.
        opened = breezy.controldir.ControlDir.open('a')
        self.assertIsInstance(opened._format, meta_format_class)

    def test_make_branch_and_memory_tree(self):
        """make_branch_and_memory_tree hands back a MemoryTree."""
        memory_tree = self.make_branch_and_memory_tree('a')
        self.assertIsInstance(memory_tree, breezy.memorytree.MemoryTree)

    def test_make_tree_for_local_vfs_backed_transport(self):
        """Trees stay on local disk even when URLs use another transport.

        make_branch_and_tree has to use local branch and repositories
        when the vfs transport and local disk are colocated, even if
        a different transport is in use for url generation.
        """
        self.transport_server = test_server.FakeVFATServer
        url = self.get_url('t1')
        self.assertFalse(url.startswith('file://'))
        tree = self.make_branch_and_tree('t1')
        tree_transport = tree.controldir.root_transport
        self.assertStartsWith(tree_transport.base, 'file://')
        # The tree, its branch and its repository must all share one
        # control directory location.
        branch = tree.branch
        self.assertEqual(tree_transport, branch.controldir.root_transport)
        self.assertEqual(tree_transport,
                         branch.repository.controldir.root_transport)
2031
class SelfTestHelper(object):
2033
def run_selftest(self, **kwargs):
2034
"""Run selftest returning its output."""
2036
old_transport = breezy.tests.default_transport
2037
old_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
2038
tests.TestCaseWithMemoryTransport.TEST_ROOT = None
2040
self.assertEqual(True, tests.selftest(stream=output, **kwargs))
2042
breezy.tests.default_transport = old_transport
2043
tests.TestCaseWithMemoryTransport.TEST_ROOT = old_root
2048
class TestSelftest(tests.TestCase, SelfTestHelper):
2049
"""Tests of breezy.tests.selftest."""
2051
def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(self):
2054
factory_called.append(True)
2055
return TestUtil.TestSuite()
2058
self.apply_redirected(out, err, None, breezy.tests.selftest,
2059
test_suite_factory=factory)
2060
self.assertEqual([True], factory_called)
2063
"""A test suite factory."""
2064
class Test(tests.TestCase):
2071
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
2073
def test_list_only(self):
2074
output = self.run_selftest(test_suite_factory=self.factory,
2076
self.assertEqual(3, len(output.readlines()))
2078
def test_list_only_filtered(self):
    """list_only with a pattern lists just the matching test."""
    selftest_options = {
        'test_suite_factory': self.factory,
        'list_only': True,
        'pattern': "Test.b",
    }
    listing = self.run_selftest(**selftest_options)
    listed_text = listing.getvalue()
    self.assertEndsWith(listed_text, "Test.b\n")
    # Exactly one line of output is expected for the single match.
    self.assertLength(1, listing.readlines())
2084
def test_list_only_excludes(self):
    """list_only with exclude_pattern leaves the excluded test out."""
    output = self.run_selftest(test_suite_factory=self.factory,
                               list_only=True, exclude_pattern="Test.b")
    # assertNotContainsRe takes the text to search first and the pattern
    # second. The arguments used to be the other way round, which searched
    # the literal "Test.b" for the whole listing and so could never fail.
    self.assertNotContainsRe(output.getvalue(), "Test.b")
    # The factory builds three tests; one is excluded, two remain listed.
    self.assertLength(2, output.readlines())
2090
def test_lsprof_tests(self):
2091
self.requireFeature(features.lsprof_feature)
2094
def __call__(test, result):
2096
def run(test, result):
2097
results.append(result)
2098
def countTestCases(self):
2100
self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
2101
self.assertLength(1, results)
2102
self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)
2104
def test_random(self):
    """Listing with different random seeds gives different output."""
    # test randomising by listing a number of tests.
    output_123 = self.run_selftest(test_suite_factory=self.factory,
                                   list_only=True, random_seed="123")
    output_234 = self.run_selftest(test_suite_factory=self.factory,
                                   list_only=True, random_seed="234")
    # Compare the captured text rather than the stream objects: two
    # distinct streams are never equal, so comparing them proved nothing.
    self.assertNotEqual(output_123.getvalue(), output_234.getvalue())
    # Five lines each: presumably the "Randomizing test order..\n\n"
    # header and blank line, followed by one line per test.
    self.assertLength(5, output_123.readlines())
    self.assertLength(5, output_234.readlines())
2115
def test_random_reuse_is_same_order(self):
    """Listing twice with one random seed yields identical output."""
    def list_with_seed():
        # Same factory, same seed: only the invocation differs.
        return self.run_selftest(
            test_suite_factory=self.factory, list_only=True,
            random_seed="123")

    first_listing = list_with_seed()
    second_listing = list_with_seed()
    self.assertEqual(first_listing.getvalue(), second_listing.getvalue())
2123
def test_runner_class(self):
2124
self.requireFeature(features.subunit)
2125
from subunit import ProtocolTestCase
2126
stream = self.run_selftest(
2127
runner_class=tests.SubUnitBzrRunnerv1,
2128
test_suite_factory=self.factory)
2129
test = ProtocolTestCase(stream)
2130
result = unittest.TestResult()
2132
self.assertEqual(3, result.testsRun)
2134
def test_starting_with_single_argument(self):
2135
output = self.run_selftest(test_suite_factory=self.factory,
2136
starting_with=['breezy.tests.test_selftest.Test.a'],
2138
self.assertEqual('breezy.tests.test_selftest.Test.a\n',
2141
def test_starting_with_multiple_argument(self):
2142
output = self.run_selftest(test_suite_factory=self.factory,
2143
starting_with=['breezy.tests.test_selftest.Test.a',
2144
'breezy.tests.test_selftest.Test.b'],
2146
self.assertEqual('breezy.tests.test_selftest.Test.a\n'
2147
'breezy.tests.test_selftest.Test.b\n',
2150
def check_transport_set(self, transport_server):
2151
captured_transport = []
2152
def seen_transport(a_transport):
2153
captured_transport.append(a_transport)
2154
class Capture(tests.TestCase):
2156
seen_transport(breezy.tests.default_transport)
2158
return TestUtil.TestSuite([Capture("a")])
2159
self.run_selftest(transport=transport_server, test_suite_factory=factory)
2160
self.assertEqual(transport_server, captured_transport[0])
2162
def test_transport_sftp(self):
    """The stub SFTP server can be selected as the selftest transport."""
    self.requireFeature(features.paramiko)
    # Imported only after the paramiko feature check has passed.
    from breezy.tests.stub_sftp import SFTPAbsoluteServer
    self.check_transport_set(SFTPAbsoluteServer)
2167
def test_transport_memory(self):
    """The memory server can be selected as the selftest transport."""
    server_class = memory.MemoryServer
    self.check_transport_set(server_class)
2171
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
    """Tests for the load_list option of selftest."""
    # Does IO: reads test.list

    def test_load_list(self):
        """A list file naming this test makes selftest list exactly it."""
        # Provide a list with one test - this test.
        # The id is text, and bytes %-formatting only accepts bytes on
        # Python 3, so encode it before building the line.
        test_id_line = b'%s\n' % self.id().encode('ascii')
        self.build_tree_contents([('test.list', test_id_line)])
        # And generate a list of the tests in the suite.
        stream = self.run_selftest(load_list='test.list', list_only=True)
        self.assertEqual(test_id_line, stream.getvalue())

    def test_load_unknown(self):
        """Pointing load_list at a missing file raises NoSuchFile."""
        # No list file is created here: the name given does not exist.
        self.assertRaises(errors.NoSuchFile, self.run_selftest,
                          load_list='missing file name', list_only=True)
2189
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2191
_test_needs_features = [features.subunit]
2193
def run_subunit_stream(self, test_name):
2194
from subunit import ProtocolTestCase
2196
return TestUtil.TestSuite([_get_test(test_name)])
2197
stream = self.run_selftest(
2198
runner_class=tests.SubUnitBzrRunnerv1,
2199
test_suite_factory=factory)
2200
test = ProtocolTestCase(stream)
2201
result = testtools.TestResult()
2203
content = stream.getvalue()
2204
return content, result
2206
def test_fail_has_log(self):
    """A failing test's subunit stream includes the log detail."""
    stream_text, outcome = self.run_subunit_stream('test_fail')
    failure_count = len(outcome.failures)
    self.assertEqual(1, failure_count)
    # Both the 'log' attachment name and the logged text must appear.
    for pattern in ('(?m)^log$', 'this test will fail'):
        self.assertContainsRe(stream_text, pattern)
2212
def test_error_has_log(self):
    """An erroring test's subunit stream includes the log detail."""
    stream_text, _outcome = self.run_subunit_stream('test_error')
    # Both the 'log' attachment name and the logged text must appear.
    for pattern in ('(?m)^log$', 'this test errored'):
        self.assertContainsRe(stream_text, pattern)
2217
def test_skip_has_no_log(self):
2218
content, result = self.run_subunit_stream('test_skip')
2219
self.assertNotContainsRe(content, '(?m)^log$')
2220
self.assertNotContainsRe(content, 'this test will be skipped')
2221
reasons = _get_skip_reasons(result)
2222
self.assertEqual({'reason'}, set(reasons))
2223
skips = reasons['reason']
2224
self.assertEqual(1, len(skips))
2226
# RemotedTestCase doesn't preserve the "details"
2227
## self.assertFalse('log' in test.getDetails())
2229
def test_missing_feature_has_no_log(self):
2230
content, result = self.run_subunit_stream('test_missing_feature')
2231
self.assertNotContainsRe(content, '(?m)^log$')
2232
self.assertNotContainsRe(content, 'missing the feature')
2233
reasons = _get_skip_reasons(result)
2234
self.assertEqual({'_MissingFeature\n'}, set(reasons))
2235
skips = reasons['_MissingFeature\n']
2236
self.assertEqual(1, len(skips))
2238
# RemotedTestCase doesn't preserve the "details"
2239
## self.assertFalse('log' in test.getDetails())
2241
def test_xfail_has_no_log(self):
    """An expected failure carries no log in the stream or the result."""
    logged_text = 'test with expected failure'
    stream_text, outcome = self.run_subunit_stream('test_xfail')
    self.assertNotContainsRe(stream_text, '(?m)^log$')
    self.assertNotContainsRe(stream_text, logged_text)
    expected_failures = outcome.expectedFailures
    self.assertEqual(1, len(expected_failures))
    # Each entry is indexable; element 1 is the recorded failure text.
    failure_text = expected_failures[0][1]
    self.assertNotContainsRe(
        failure_text, '(?m)^(?:Text attachment: )?log(?:$|: )')
    self.assertNotContainsRe(failure_text, logged_text)
2251
def test_unexpected_success_has_log(self):
    """An unexpected success keeps its log in the subunit stream."""
    content, result = self.run_subunit_stream('test_unexpected_success')
    self.assertContainsRe(content, '(?m)^log$')
    self.assertContainsRe(content, 'test with unexpected success')
    # GZ 2011-05-18: Old versions of subunit treat unexpected success as a
    #                success, if a min version check is added remove this
    from subunit import TestProtocolClient as _Client

    def underlying(method):
        # Python 2 wraps class-level functions in unbound methods that
        # expose __func__; on Python 3 the lookup returns the plain
        # function, which has no __func__, so fall back to the object.
        return getattr(method, '__func__', method)

    if (underlying(_Client.addUnexpectedSuccess)
            is underlying(_Client.addSuccess)):
        self.expectFailure('subunit treats "unexpectedSuccess"'
                           ' as a plain success',
                           self.assertEqual, 1,
                           len(result.unexpectedSuccesses))
    self.assertEqual(1, len(result.unexpectedSuccesses))
    # RemotedTestCase doesn't preserve the "details", so the 'log'
    # detail cannot be checked on the reported test object itself.
2267
def test_success_has_no_log(self):
    """A passing test leaves no log in the subunit stream."""
    stream_text, outcome = self.run_subunit_stream('test_success')
    self.assertEqual(1, outcome.testsRun)
    # Neither the 'log' attachment name nor the logged text may appear.
    for pattern in ('(?m)^log$', 'this test succeeds'):
        self.assertNotContainsRe(stream_text, pattern)
2274
class TestRunBzr(tests.TestCase):
2279
def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
2281
"""Override _run_bzr_core to test how it is invoked by run_bzr.
2283
Attempts to run bzr from inside this class don't actually run it.
2285
We test how run_bzr actually invokes bzr in another location. Here we
2286
only need to test that it passes the right parameters to run_bzr.
2288
self.argv = list(argv)
2289
self.retcode = retcode
2290
self.encoding = encoding
2292
self.working_dir = working_dir
2293
return self.retcode, self.out, self.err
2295
def test_run_bzr_error(self):
    """run_bzr_error forwards argv and retcode and returns out/err."""
    canned_stdout = "It sure does!\n"
    self.out = canned_stdout
    returned = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
    out, err = returned
    # What the overridden _run_bzr_core recorded on this instance.
    self.assertEqual(['rocks'], self.argv)
    self.assertEqual(34, self.retcode)
    # What run_bzr_error handed back to the caller.
    self.assertEqual('It sure does!\n', out)
    self.assertEqual(out, self.out)
    self.assertEqual('', err)
    self.assertEqual(err, self.err)
2305
def test_run_bzr_error_regexes(self):
2307
self.err = b"bzr: ERROR: foobarbaz is not versioned"
2308
out, err = self.run_bzr_error(
2309
[b"bzr: ERROR: foobarbaz is not versioned"],
2310
['file-id', 'foobarbaz'])
2312
def test_encoding(self):
2313
"""Test that run_bzr passes encoding to _run_bzr_core"""
2314
self.run_bzr('foo bar')
2315
self.assertEqual(None, self.encoding)
2316
self.assertEqual(['foo', 'bar'], self.argv)
2318
self.run_bzr('foo bar', encoding='baz')
2319
self.assertEqual('baz', self.encoding)
2320
self.assertEqual(['foo', 'bar'], self.argv)
2322
def test_retcode(self):
2323
"""Test that run_bzr passes retcode to _run_bzr_core"""
2324
# Default is retcode == 0
2325
self.run_bzr('foo bar')
2326
self.assertEqual(0, self.retcode)
2327
self.assertEqual(['foo', 'bar'], self.argv)
2329
self.run_bzr('foo bar', retcode=1)
2330
self.assertEqual(1, self.retcode)
2331
self.assertEqual(['foo', 'bar'], self.argv)
2333
self.run_bzr('foo bar', retcode=None)
2334
self.assertEqual(None, self.retcode)
2335
self.assertEqual(['foo', 'bar'], self.argv)
2337
self.run_bzr(['foo', 'bar'], retcode=3)
2338
self.assertEqual(3, self.retcode)
2339
self.assertEqual(['foo', 'bar'], self.argv)
2341
def test_stdin(self):
2342
# test that the stdin keyword to run_bzr is passed through to
2343
# _run_bzr_core as-is. We do this by overriding
2344
# _run_bzr_core in this class, and then calling run_bzr,
2345
# which is a convenience function for _run_bzr_core, so
2347
self.run_bzr('foo bar', stdin='gam')
2348
self.assertEqual('gam', self.stdin)
2349
self.assertEqual(['foo', 'bar'], self.argv)
2351
self.run_bzr('foo bar', stdin='zippy')
2352
self.assertEqual('zippy', self.stdin)
2353
self.assertEqual(['foo', 'bar'], self.argv)
2355
def test_working_dir(self):
2356
"""Test that run_bzr passes working_dir to _run_bzr_core"""
2357
self.run_bzr('foo bar')
2358
self.assertEqual(None, self.working_dir)
2359
self.assertEqual(['foo', 'bar'], self.argv)
2361
self.run_bzr('foo bar', working_dir='baz')
2362
self.assertEqual('baz', self.working_dir)
2363
self.assertEqual(['foo', 'bar'], self.argv)
2365
def test_reject_extra_keyword_arguments(self):
    """run_bzr raises TypeError when given the error_regex keyword."""
    rejected_kwargs = {'error_regex': ['error message']}
    self.assertRaises(TypeError, self.run_bzr, "foo bar",
                      **rejected_kwargs)
2370
class TestRunBzrCaptured(tests.TestCaseWithTransport):
2371
# Does IO when testing the working_dir parameter.
2373
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
2374
a_callable=None, *args, **kwargs):
2376
self.factory_stdin = getattr(breezy.ui.ui_factory, "stdin", None)
2377
self.factory = breezy.ui.ui_factory
2378
self.working_dir = osutils.getcwd()
2379
stdout.write('foo\n')
2380
stderr.write('bar\n')
2383
def test_stdin(self):
    """The stdin keyword reaches apply_redirected as a readable stream.

    apply_redirected is overridden in this class, so calling run_bzr
    (which ends up in apply_redirected) lets the test inspect the
    stream that was passed down and the one the UI factory saw.
    """
    for payload in ('gam', 'zippy'):
        self.run_bzr(['foo', 'bar'], stdin=payload)
        self.assertEqual(payload, self.stdin.read())
        # The UI factory must have been given that very same stream.
        self.assertTrue(self.stdin is self.factory_stdin)
2395
def test_ui_factory(self):
    """Every run_bzr call gets a fresh TestUIFactory with its own streams.

    The factory's stdout and stderr must be the redirected streams of
    the invoked run_bzr, not the process-wide sys.stdout/sys.stderr.
    """
    factory_before = breezy.ui.ui_factory
    self.run_bzr(['foo'])
    # self.factory is recorded by the overridden apply_redirected.
    factory_during = self.factory
    self.assertFalse(factory_before is factory_during)
    self.assertNotEqual(sys.stdout, factory_during.stdout)
    self.assertNotEqual(sys.stderr, factory_during.stderr)
    # apply_redirected writes these marker strings to each stream.
    self.assertEqual('foo\n', factory_during.stdout.getvalue())
    self.assertEqual('bar\n', factory_during.stderr.getvalue())
    self.assertIsInstance(factory_during, tests.TestUIFactory)
2409
def test_working_dir(self):
2410
self.build_tree(['one/', 'two/'])
2411
cwd = osutils.getcwd()
2413
# Default is to work in the current directory
2414
self.run_bzr(['foo', 'bar'])
2415
self.assertEqual(cwd, self.working_dir)
2417
self.run_bzr(['foo', 'bar'], working_dir=None)
2418
self.assertEqual(cwd, self.working_dir)
2420
# The function should be run in the alternative directory
2421
# but afterwards the current working dir shouldn't be changed
2422
self.run_bzr(['foo', 'bar'], working_dir='one')
2423
self.assertNotEqual(cwd, self.working_dir)
2424
self.assertEndsWith(self.working_dir, 'one')
2425
self.assertEqual(cwd, osutils.getcwd())
2427
self.run_bzr(['foo', 'bar'], working_dir='two')
2428
self.assertNotEqual(cwd, self.working_dir)
2429
self.assertEndsWith(self.working_dir, 'two')
2430
self.assertEqual(cwd, osutils.getcwd())
2433
class StubProcess(object):
2434
"""A stub process for testing run_bzr_subprocess."""
2436
def __init__(self, out="", err="", retcode=0):
2439
self.returncode = retcode
2441
def communicate(self):
2442
return self.out, self.err
2445
class TestWithFakedStartBzrSubprocess(tests.TestCaseWithTransport):
2446
"""Base class for tests testing how we might run bzr."""
2449
super(TestWithFakedStartBzrSubprocess, self).setUp()
2450
self.subprocess_calls = []
2452
def start_bzr_subprocess(self, process_args, env_changes=None,
2453
skip_if_plan_to_signal=False,
2455
allow_plugins=False):
2456
"""capture what run_bzr_subprocess tries to do."""
2457
self.subprocess_calls.append({'process_args':process_args,
2458
'env_changes':env_changes,
2459
'skip_if_plan_to_signal':skip_if_plan_to_signal,
2460
'working_dir':working_dir, 'allow_plugins':allow_plugins})
2461
return self.next_subprocess
2464
class TestRunBzrSubprocess(TestWithFakedStartBzrSubprocess):
2466
def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
2467
"""Run run_bzr_subprocess with args and kwargs using a stubbed process.
2469
Inside TestRunBzrSubprocessCommands we use a stub start_bzr_subprocess
2470
that will return static results. This assertion method populates those
2471
results and also checks the arguments run_bzr_subprocess generates.
2473
self.next_subprocess = process
2475
result = self.run_bzr_subprocess(*args, **kwargs)
2477
self.next_subprocess = None
2478
for key, expected in expected_args.items():
2479
self.assertEqual(expected, self.subprocess_calls[-1][key])
2482
self.next_subprocess = None
2483
for key, expected in expected_args.items():
2484
self.assertEqual(expected, self.subprocess_calls[-1][key])
2487
def test_run_bzr_subprocess(self):
2488
"""The run_bzr_helper_external command behaves nicely."""
2489
self.assertRunBzrSubprocess({'process_args':['--version']},
2490
StubProcess(), '--version')
2491
self.assertRunBzrSubprocess({'process_args':['--version']},
2492
StubProcess(), ['--version'])
2493
# retcode=None disables retcode checking
2494
result = self.assertRunBzrSubprocess({},
2495
StubProcess(retcode=3), '--version', retcode=None)
2496
result = self.assertRunBzrSubprocess({},
2497
StubProcess(out="is free software"), '--version')
2498
self.assertContainsRe(result[0], 'is free software')
2499
# Running a subcommand that is missing errors
2500
self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
2501
{'process_args':['--versionn']}, StubProcess(retcode=3),
2503
# Unless it is told to expect the error from the subprocess
2504
result = self.assertRunBzrSubprocess({},
2505
StubProcess(retcode=3), '--versionn', retcode=3)
2506
# Or to ignore retcode checking
2507
result = self.assertRunBzrSubprocess({},
2508
StubProcess(err="unknown command", retcode=3), '--versionn',
2510
self.assertContainsRe(result[1], 'unknown command')
2512
def test_env_change_passes_through(self):
2513
self.assertRunBzrSubprocess(
2514
{'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}},
2516
env_changes={'new':'value', 'changed':'newvalue', 'deleted':None})
2518
def test_no_working_dir_passed_as_None(self):
    """With no working_dir given, None is what gets passed along."""
    expected_call = {'working_dir': None}
    stub = StubProcess()
    self.assertRunBzrSubprocess(expected_call, stub, '')
2521
def test_no_working_dir_passed_through(self):
2522
self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
2525
def test_run_bzr_subprocess_no_plugins(self):
2526
self.assertRunBzrSubprocess({'allow_plugins': False},
2529
def test_allow_plugins(self):
    """allow_plugins=True is passed through to start_bzr_subprocess."""
    expected_call = {'allow_plugins': True}
    stub = StubProcess()
    self.assertRunBzrSubprocess(expected_call, stub, '',
                                allow_plugins=True)
2534
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):
2536
def test_finish_bzr_subprocess_with_error(self):
2537
"""finish_bzr_subprocess allows specification of the desired exit code.
2539
process = StubProcess(err="unknown command", retcode=3)
2540
result = self.finish_bzr_subprocess(process, retcode=3)
2541
self.assertEqual('', result[0])
2542
self.assertContainsRe(result[1], 'unknown command')
2544
def test_finish_bzr_subprocess_ignoring_retcode(self):
    """Passing retcode=None makes finish_bzr_subprocess skip the check."""
    failing_stub = StubProcess(err="unknown command", retcode=3)
    finished = self.finish_bzr_subprocess(failing_stub, retcode=None)
    # Index 0 is the captured stdout, index 1 the captured stderr.
    captured_out = finished[0]
    self.assertEqual('', captured_out)
    captured_err = finished[1]
    self.assertContainsRe(captured_err, 'unknown command')
2551
def test_finish_subprocess_with_unexpected_retcode(self):
2552
"""finish_bzr_subprocess raises self.failureException if the retcode is
2553
not the expected one.
2555
process = StubProcess(err="unknown command", retcode=3)
2556
self.assertRaises(self.failureException, self.finish_bzr_subprocess,
2560
class _DontSpawnProcess(Exception):
    """Marker exception raised instead of really spawning a process.

    Tests raise it from a stubbed popen so the remaining, unnecessary
    steps of starting a subprocess are skipped.
    """
2564
class TestStartBzrSubProcess(tests.TestCase):
2565
"""Stub test start_bzr_subprocess."""
2567
def _subprocess_log_cleanup(self):
    """Do nothing: no log file is produced, so the base version is inhibited."""
2570
def _popen(self, *args, **kwargs):
2571
"""Override the base version to record the command that is run.
2573
From there we can ensure it is correct without spawning a real process.
2575
self.check_popen_state()
2576
self._popen_args = args
2577
self._popen_kwargs = kwargs
2578
raise _DontSpawnProcess()
2580
def check_popen_state(self):
    """Hook run when popen is called; replace it to make assertions.

    The default implementation checks nothing.
    """
2583
def test_run_bzr_subprocess_no_plugins(self):
    """By default the command line built for popen ends in --no-plugins."""
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
    # The stubbed _popen recorded its positional arguments; the first
    # one is the command list.
    command = self._popen_args[0]
    interpreter = command[0]
    script = command[1]
    extra_args = command[2:]
    self.assertEqual(sys.executable, interpreter)
    self.assertEqual(self.get_brz_path(), script)
    self.assertEqual(['--no-plugins'], extra_args)
2590
def test_allow_plugins(self):
2591
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2593
command = self._popen_args[0]
2594
self.assertEqual([], command[2:])
2596
def test_set_env(self):
2597
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2599
def check_environment():
2600
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2601
self.check_popen_state = check_environment
2602
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2603
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2604
# not set in the parent
2605
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2607
def test_run_bzr_subprocess_env_del(self):
    """run_bzr_subprocess can remove environment variables too."""
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

    def check_environment():
        # Runs when popen is called: the variable must be gone by then.
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

    os.environ['EXISTANT_ENV_VAR'] = 'set variable'
    try:
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          env_changes={'EXISTANT_ENV_VAR': None})
        # Still set in parent
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
    finally:
        # Always undo the change to the real environment, even when an
        # assertion above fails, so the variable cannot leak into later
        # tests. pop() tolerates the variable having been removed already.
        os.environ.pop('EXISTANT_ENV_VAR', None)
2620
def test_env_del_missing(self):
    """Asking to delete an unset environment variable is harmless."""
    variable = 'NON_EXISTANT_ENV_VAR'

    def variable_is_absent():
        self.assertFalse(variable in os.environ)

    # It is absent beforehand and must still be absent at popen time.
    variable_is_absent()
    self.check_popen_state = variable_is_absent
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                      env_changes={variable: None})
2628
def test_working_dir(self):
2629
"""Test that we can specify the working dir for the child"""
2630
orig_getcwd = osutils.getcwd
2631
orig_chdir = os.chdir
2635
self.overrideAttr(os, 'chdir', chdir)
2638
self.overrideAttr(osutils, 'getcwd', getcwd)
2639
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2641
self.assertEqual(['foo', 'current'], chdirs)
2643
def test_get_brz_path_with_cwd_breezy(self):
    """get_brz_path yields "brz" when the source path is empty."""
    def empty_source_path():
        return ""

    def every_path_is_a_file(path):
        return True

    self.get_source_path = empty_source_path
    # Pretend any candidate script exists on disk.
    self.overrideAttr(os.path, "isfile", every_path_is_a_file)
    brz_path = self.get_brz_path()
    self.assertEqual(brz_path, "brz")
2649
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2650
"""Tests that really need to do things with an external bzr."""
2652
def test_start_and_stop_bzr_subprocess_send_signal(self):
2653
"""finish_bzr_subprocess raises self.failureException if the retcode is
2654
not the expected one.
2656
self.disable_missing_extensions_warning()
2657
process = self.start_bzr_subprocess(['wait-until-signalled'],
2658
skip_if_plan_to_signal=True)
2659
self.assertEqual(b'running\n', process.stdout.readline())
2660
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2662
self.assertEqual(b'', result[0])
2663
self.assertEqual(b'brz: interrupted\n', result[1])
2666
class TestSelftestFiltering(tests.TestCase):
2669
super(TestSelftestFiltering, self).setUp()
2670
self.suite = TestUtil.TestSuite()
2671
self.loader = TestUtil.TestLoader()
2672
self.suite.addTest(self.loader.loadTestsFromModule(
2673
sys.modules['breezy.tests.test_selftest']))
2674
self.all_names = _test_ids(self.suite)
2676
def test_condition_id_re(self):
    """condition_id_re selects tests whose id matches the pattern."""
    expected_ids = ['breezy.tests.test_selftest.TestSelftestFiltering.'
                    'test_condition_id_re']
    matches_id = tests.condition_id_re('test_condition_id_re')
    selected = tests.filter_suite_by_condition(self.suite, matches_id)
    self.assertEqual(expected_ids, _test_ids(selected))
2683
def test_condition_id_in_list(self):
    """condition_id_in_list selects the same tests as an equivalent re."""
    wanted_ids = tests.TestIdList(
        ['breezy.tests.test_selftest.TestSelftestFiltering.'
         'test_condition_id_in_list'])
    in_list = tests.condition_id_in_list(wanted_ids)
    by_condition = tests.filter_suite_by_condition(self.suite, in_list)
    # Filtering by a regexp that names this test is the reference result.
    by_regexp = tests.filter_suite_by_re(
        self.suite, 'TestSelftestFiltering.*test_condition_id_in_list')
    self.assertEqual(_test_ids(by_regexp), _test_ids(by_condition))
2693
def test_condition_id_startswith(self):
2694
klass = 'breezy.tests.test_selftest.TestSelftestFiltering.'
2695
start1 = klass + 'test_condition_id_starts'
2696
start2 = klass + 'test_condition_id_in'
2697
test_names = [ klass + 'test_condition_id_in_list',
2698
klass + 'test_condition_id_startswith',
2700
filtered_suite = tests.filter_suite_by_condition(
2701
self.suite, tests.condition_id_startswith([start1, start2]))
2702
self.assertEqual(test_names, _test_ids(filtered_suite))
2704
def test_condition_isinstance(self):
    """condition_isinstance selects every test of the given class."""
    own_class = self.__class__
    is_own_class = tests.condition_isinstance(own_class)
    by_condition = tests.filter_suite_by_condition(self.suite,
                                                   is_own_class)
    # Filtering by the class's id prefix is the reference result.
    by_regexp = tests.filter_suite_by_re(
        self.suite, 'breezy.tests.test_selftest.TestSelftestFiltering.')
    self.assertEqual(_test_ids(by_regexp), _test_ids(by_condition))
2711
def test_exclude_tests_by_condition(self):
    """exclude_tests_by_condition drops only the tests the condition hits."""
    excluded_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                     'test_exclude_tests_by_condition')

    def is_excluded(candidate):
        return candidate.id() == excluded_name

    remaining_suite = tests.exclude_tests_by_condition(self.suite,
                                                       is_excluded)
    # Exactly one test fewer than the full suite.
    expected_count = len(self.all_names) - 1
    self.assertEqual(expected_count, remaining_suite.countTestCases())
    self.assertFalse(excluded_name in _test_ids(remaining_suite))
    # Everything else survives, in the original order.
    expected_names = list(self.all_names)
    expected_names.remove(excluded_name)
    self.assertEqual(expected_names, _test_ids(remaining_suite))
2723
def test_exclude_tests_by_re(self):
    """exclude_tests_by_re drops only the tests whose id matches."""
    self.all_names = _test_ids(self.suite)
    remaining_suite = tests.exclude_tests_by_re(self.suite,
                                                'exclude_tests_by_re')
    excluded_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                     'test_exclude_tests_by_re')
    # Exactly one test fewer than the full suite.
    expected_count = len(self.all_names) - 1
    self.assertEqual(expected_count, remaining_suite.countTestCases())
    self.assertFalse(excluded_name in _test_ids(remaining_suite))
    # Everything else survives, in the original order.
    expected_names = list(self.all_names)
    expected_names.remove(excluded_name)
    self.assertEqual(expected_names, _test_ids(remaining_suite))
2736
def test_filter_suite_by_condition(self):
    """filter_suite_by_condition keeps only tests the condition accepts."""
    wanted_id = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                 'test_filter_suite_by_condition')

    def is_wanted(candidate):
        return candidate.id() == wanted_id

    selected = tests.filter_suite_by_condition(self.suite, is_wanted)
    self.assertEqual([wanted_id], _test_ids(selected))
2743
def test_filter_suite_by_re(self):
    """A regex matching a single test id keeps exactly that test."""
    expected_ids = ['breezy.tests.test_selftest.'
                    'TestSelftestFiltering.test_filter_suite_by_re']
    # The pattern is a prefix of this test's own method name.
    kept = tests.filter_suite_by_re(self.suite, 'test_filter_suite_by_r')
    self.assertEqual(_test_ids(kept), expected_ids)
2750
def test_filter_suite_by_id_list(self):
    """Filtering by an id list keeps exactly the listed test."""
    test_list = ['breezy.tests.test_selftest.'
                 'TestSelftestFiltering.test_filter_suite_by_id_list']
    filtered_suite = tests.filter_suite_by_id_list(
        self.suite, tests.TestIdList(test_list))
    filtered_names = _test_ids(filtered_suite)
    self.assertEqual(
        filtered_names,
        ['breezy.tests.test_selftest.'
         'TestSelftestFiltering.test_filter_suite_by_id_list'])
2761
def test_filter_suite_by_id_startswith(self):
    """Filtering by id prefixes keeps every test matching any prefix."""
    # By design this test may fail if another test is added whose name also
    # begins with one of the start values used.
    klass = 'breezy.tests.test_selftest.TestSelftestFiltering.'
    start1 = klass + 'test_filter_suite_by_id_starts'
    start2 = klass + 'test_filter_suite_by_id_li'
    test_list = [klass + 'test_filter_suite_by_id_list',
                 klass + 'test_filter_suite_by_id_startswith',
                 ]
    filtered_suite = tests.filter_suite_by_id_startswith(
        self.suite, [start1, start2])
    self.assertEqual(
        test_list,
        _test_ids(filtered_suite),
        )
2777
def test_preserve_input(self):
    """preserve_input hands back the very object it was given."""
    # NB: Surely this is something in the stdlib to do this?
    self.assertIs(self.suite, tests.preserve_input(self.suite))
    # Bind the string to a name before checking identity: ``is`` against a
    # string literal only works thanks to constant interning and raises a
    # SyntaxWarning on Python 3.8 and later.
    marker = "@#$"
    self.assertIs(marker, tests.preserve_input(marker))
2782
def test_randomize_suite(self):
    """Randomizing reorders the suite without adding or removing tests."""
    randomized_suite = tests.randomize_suite(self.suite)
    original_ids = _test_ids(self.suite)
    # randomizing should not add or remove test names.
    self.assertEqual(set(original_ids), set(_test_ids(randomized_suite)))
    # Technically, this *can* fail, because random.shuffle(list) can be
    # equal to list. Trying multiple times just pushes the frequency back.
    # As the odds are len(self.all_names)!:1, the failure frequency should
    # be low enough to ignore. RBC 20071021.
    # It should change the order.
    self.assertNotEqual(self.all_names, _test_ids(randomized_suite))
    # But not the length. (Possibly redundant with the set test, but not
    # necessarily.)
    self.assertEqual(len(self.all_names), len(_test_ids(randomized_suite)))
2797
def test_split_suit_by_condition(self):
    """Splitting by condition yields (matching, non-matching) suites."""
    self.all_names = _test_ids(self.suite)
    condition = tests.condition_id_re('test_filter_suite_by_r')
    split_suite = tests.split_suite_by_condition(self.suite, condition)
    filtered_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                     'test_filter_suite_by_re')
    matching_ids = _test_ids(split_suite[0])
    other_ids = _test_ids(split_suite[1])
    self.assertEqual([filtered_name], matching_ids)
    # assertNotIn reports the offending list on failure, unlike assertFalse.
    self.assertNotIn(filtered_name, other_ids)
    # The second suite holds everything else, in the original order.
    remaining_names = list(self.all_names)
    remaining_names.remove(filtered_name)
    self.assertEqual(remaining_names, other_ids)
2809
def test_split_suit_by_re(self):
    """Splitting by regex yields (matching, non-matching) suites."""
    self.all_names = _test_ids(self.suite)
    split_suite = tests.split_suite_by_re(self.suite,
                                          'test_filter_suite_by_r')
    filtered_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                     'test_filter_suite_by_re')
    matching_ids = _test_ids(split_suite[0])
    other_ids = _test_ids(split_suite[1])
    self.assertEqual([filtered_name], matching_ids)
    # assertNotIn reports the offending list on failure, unlike assertFalse.
    self.assertNotIn(filtered_name, other_ids)
    # The second suite holds everything else, in the original order.
    remaining_names = list(self.all_names)
    remaining_names.remove(filtered_name)
    self.assertEqual(remaining_names, other_ids)
2822
class TestCheckTreeShape(tests.TestCaseWithTransport):
    """Tests for the check_tree_shape helper."""

    def test_check_tree_shape(self):
        """A tree built and populated from a shape matches that shape."""
        files = ['a', 'b/', 'b/c']
        tree = self.make_branch_and_tree('.')
        self.build_tree(files)
        # NOTE(review): the statements between build_tree() and the check
        # were not visible in the reviewed listing; the add/lock/unlock
        # sequence below is the conventional one - confirm against history.
        tree.add(files)
        tree.lock_read()
        try:
            self.check_tree_shape(tree, files)
        finally:
            tree.unlock()
2836
class TestBlackboxSupport(tests.TestCase):
    """Tests for testsuite blackbox features."""

    def test_run_bzr_failure_not_caught(self):
        """Unexpected errors raised by a command reach the calling test."""
        # When we run bzr in blackbox mode, we want any unexpected errors to
        # propagate up to the test suite so that it can show the error in the
        # usual way, and we won't get a double traceback.
        error = self.assertRaises(
            AssertionError,
            self.run_bzr, ['assert-fail'])
        # make sure we got the real thing, not an error from somewhere else in
        # the test framework
        self.assertEqual('always fails', str(error))
        # check that there's no traceback in the test log
        self.assertNotContainsRe(self.get_log(), r'Traceback')

    def test_run_bzr_user_error_caught(self):
        """Expected user errors become an error message plus exit code."""
        # Running bzr in blackbox mode, normal/expected/user errors should be
        # caught in the regular way and turned into an error message plus exit
        # code.
        transport_server = memory.MemoryServer()
        transport_server.start_server()
        self.addCleanup(transport_server.stop_server)
        url = transport_server.get_url()
        self.permit_url(url)
        out, err = self.run_bzr(["log", "%s/nonexistantpath" % url], retcode=3)
        self.assertEqual(out, '')
        self.assertContainsRe(
            err,
            b'brz: ERROR: Not a branch: ".*nonexistantpath/".\n')
2867
class TestTestLoader(tests.TestCase):
    """Tests for the test loader."""

    def _get_loader_and_module(self):
        """Gets a TestLoader and a module with one test in it."""
        class Stub(tests.TestCase):
            def test_foo(self):
                pass

        class MyModule(object):
            pass

        # An instance of a plain class stands in for a real module object.
        MyModule.a_class = Stub
        module = MyModule()
        module.__name__ = 'fake_module'
        return TestUtil.TestLoader(), module

    def test_module_no_load_tests_attribute_loads_classes(self):
        """Without load_tests the loader collects the module's test classes."""
        loader, module = self._get_loader_and_module()
        suite = loader.loadTestsFromModule(module)
        self.assertEqual(1, suite.countTestCases())

    def test_module_load_tests_attribute_gets_called(self):
        """A module-level load_tests hook decides what the loader returns."""
        loader, module = self._get_loader_and_module()

        def load_tests(loader, standard_tests, pattern):
            result = loader.suiteClass()
            for test in tests.iter_suite_tests(standard_tests):
                result.addTests([test, test])
            return result

        # add a load_tests() method which multiplies the tests from the module.
        module.__class__.load_tests = staticmethod(load_tests)
        self.assertEqual(
            2 * [str(module.a_class('test_foo'))],
            list(map(str, loader.loadTestsFromModule(module))))

    def test_load_tests_from_module_name_smoke_test(self):
        """Loading by dotted module name finds the sampler's single test."""
        loader = TestUtil.TestLoader()
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        self.assertEqual(['breezy.tests.test_sampler.DemoTest.test_nothing'],
                         _test_ids(suite))

    def test_load_tests_from_module_name_with_bogus_module_name(self):
        """An unimportable module name surfaces as ImportError."""
        loader = TestUtil.TestLoader()
        self.assertRaises(ImportError, loader.loadTestsFromModuleName, 'bogus')
2912
class TestTestIdList(tests.TestCase):
    """Tests for tests.TestIdList and tests.suite_matches_id_list."""

    def _create_id_list(self, test_list):
        """Wrap a list of test ids in a tests.TestIdList."""
        return tests.TestIdList(test_list)

    def _create_suite(self, test_id_list):
        """Build a suite of stub tests reporting the given ids."""

        class Stub(tests.TestCase):
            def test_foo(self):
                pass

        def _create_test_id(test_id):
            # Bind the value now so each stub reports its own id.
            return lambda: test_id

        suite = TestUtil.TestSuite()
        for test_id in test_id_list:
            stub = Stub('test_foo')
            stub.id = _create_test_id(test_id)
            suite.addTest(stub)
        return suite

    def _test_ids(self, test_suite):
        """Get the ids for the tests in a test suite."""
        return [t.id() for t in tests.iter_suite_tests(test_suite)]

    def test_empty_list(self):
        """An empty id list records neither tests nor modules."""
        id_list = self._create_id_list([])
        self.assertEqual({}, id_list.tests)
        self.assertEqual({}, id_list.modules)

    def test_valid_list(self):
        """Modules are referred to, listed ids are included."""
        id_list = self._create_id_list(
            ['mod1.cl1.meth1', 'mod1.cl1.meth2',
             'mod1.func1', 'mod1.cl2.meth2',
             'mod1.submod1',
             'mod1.submod2.cl1.meth1', 'mod1.submod2.cl2.meth2',
             ])
        self.assertTrue(id_list.refers_to('mod1'))
        self.assertTrue(id_list.refers_to('mod1.submod1'))
        self.assertTrue(id_list.refers_to('mod1.submod2'))
        self.assertTrue(id_list.includes('mod1.cl1.meth1'))
        self.assertTrue(id_list.includes('mod1.submod1'))
        self.assertTrue(id_list.includes('mod1.func1'))

    def test_bad_chars_in_params(self):
        """Dots inside a parameter suffix do not confuse module lookup."""
        id_list = self._create_id_list(['mod1.cl1.meth1(xx.yy)'])
        self.assertTrue(id_list.refers_to('mod1'))
        self.assertTrue(id_list.includes('mod1.cl1.meth1(xx.yy)'))

    def test_module_used(self):
        """Every dotted prefix of a listed id counts as referred to."""
        id_list = self._create_id_list(['mod.class.meth'])
        self.assertTrue(id_list.refers_to('mod'))
        self.assertTrue(id_list.refers_to('mod.class'))
        self.assertTrue(id_list.refers_to('mod.class.meth'))

    def test_test_suite_matches_id_list_with_unknown(self):
        """Ids absent from the suite are reported as not found."""
        loader = TestUtil.TestLoader()
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing',
                     'bogus']
        not_found, duplicates = tests.suite_matches_id_list(suite, test_list)
        self.assertEqual(['bogus'], not_found)
        self.assertEqual([], duplicates)

    def test_suite_matches_id_list_with_duplicates(self):
        """Tests present twice in the suite are reported as duplicates."""
        loader = TestUtil.TestLoader()
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        dupes = loader.suiteClass()
        for test in tests.iter_suite_tests(suite):
            dupes.addTest(test)
            dupes.addTest(test)  # Add it again

        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing']
        not_found, duplicates = tests.suite_matches_id_list(
            dupes, test_list)
        self.assertEqual([], not_found)
        self.assertEqual(['breezy.tests.test_sampler.DemoTest.test_nothing'],
                         duplicates)
2992
class TestTestSuite(tests.TestCase):
    """Tests for tests.test_suite() and the module lists feeding it."""

    def test__test_suite_testmod_names(self):
        """A plausible list of test module names is returned."""
        # Test that a plausible list of test module names are returned
        # by _test_suite_testmod_names.
        test_list = tests._test_suite_testmod_names()
        self.assertSubset([
            'breezy.tests.blackbox',
            'breezy.tests.per_transport',
            'breezy.tests.test_selftest',
            ],
            test_list)

    def test__test_suite_modules_to_doctest(self):
        """A plausible list of modules to doctest is returned."""
        # Test that a plausible list of modules to doctest is returned
        # by _test_suite_modules_to_doctest.
        test_list = tests._test_suite_modules_to_doctest()
        if __doc__ is None:
            # When docstrings are stripped, there are no modules to doctest
            self.assertEqual([], test_list)
            return
        self.assertSubset([
            'breezy.timestamp',
            ],
            test_list)

    def test_test_suite(self):
        """test_suite() consults both module lists and loads their tests."""
        # test_suite() loads the entire test suite to operate. To avoid this
        # overhead, and yet still be confident that things are happening,
        # we temporarily replace two functions used by test_suite with
        # test doubles that supply a few sample tests to load, and check they
        # are loaded.
        calls = []

        def testmod_names():
            calls.append("testmod_names")
            return [
                'breezy.tests.blackbox.test_branch',
                'breezy.tests.per_transport',
                'breezy.tests.test_selftest',
                ]
        self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names)

        def doctests():
            calls.append("modules_to_doctest")
            if __doc__ is None:
                return []
            return ['breezy.timestamp']
        self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests)
        expected_test_list = [
            # testmod_names
            'breezy.tests.blackbox.test_branch.TestBranch.test_branch',
            ('breezy.tests.per_transport.TransportTests'
             '.test_abspath(LocalTransport,LocalURLServer)'),
            'breezy.tests.test_selftest.TestTestSuite.test_test_suite',
            # plugins can't be tested that way since selftest may be run with
            # --no-plugins
            ]
        if __doc__ is not None:
            expected_test_list.extend([
                # modules_to_doctest
                'breezy.timestamp.format_highres_date',
                ])
        suite = tests.test_suite()
        self.assertEqual({"testmod_names", "modules_to_doctest"},
                         set(calls))
        self.assertSubset(expected_test_list, _test_ids(suite))

    def test_test_suite_list_and_start(self):
        """keep_only and starting_with together yield just the listed test."""
        # We cannot test this at the same time as the main load, because we want
        # to know that starting_with == None works. So a second load is
        # incurred - note that the starting_with parameter causes a partial load
        # rather than a full load so this test should be pretty quick.
        test_list = ['breezy.tests.test_selftest.TestTestSuite.test_test_suite']
        suite = tests.test_suite(test_list,
                                 ['breezy.tests.test_selftest.TestTestSuite'])
        # test_test_suite_list_and_start is not included
        self.assertEqual(test_list, _test_ids(suite))
3070
class TestLoadTestIdList(tests.TestCaseInTempDir):
    """Tests for tests.load_test_id_list."""

    def _create_test_list_file(self, file_name, content):
        """Write content to file_name as text."""
        # The context manager closes the file even if the write fails.
        with open(file_name, 'wt') as fl:
            fl.write(content)

    def test_load_unknown(self):
        """A missing list file raises NoSuchFile."""
        self.assertRaises(errors.NoSuchFile,
                          tests.load_test_id_list, 'i_do_not_exist')

    def test_load_test_list(self):
        """Each line of a clean file becomes one test id."""
        test_list_fname = 'test.list'
        self._create_test_list_file(test_list_fname,
                                    'mod1.cl1.meth1\nmod2.cl2.meth2\n')
        tlist = tests.load_test_id_list(test_list_fname)
        self.assertEqual(2, len(tlist))
        self.assertEqual('mod1.cl1.meth1', tlist[0])
        self.assertEqual('mod2.cl2.meth2', tlist[1])

    def test_load_dirty_file(self):
        """Surrounding whitespace is stripped; blank lines are kept."""
        test_list_fname = 'test.list'
        self._create_test_list_file(test_list_fname,
                                    ' mod1.cl1.meth1\n\nmod2.cl2.meth2 \n'
                                    'bar baz\n')
        tlist = tests.load_test_id_list(test_list_fname)
        self.assertEqual(4, len(tlist))
        self.assertEqual('mod1.cl1.meth1', tlist[0])
        self.assertEqual('', tlist[1])
        self.assertEqual('mod2.cl2.meth2', tlist[2])
        self.assertEqual('bar baz', tlist[3])
3103
class TestFilteredByModuleTestLoader(tests.TestCase):
    """Tests for FilteredByModuleTestLoader driven by a TestIdList."""

    def _create_loader(self, test_list):
        """Return a loader that only loads modules test_list refers to."""
        id_filter = tests.TestIdList(test_list)
        return TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)

    def test_load_tests(self):
        """A module referred to by the id list is loaded."""
        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing']
        loader = self._create_loader(test_list)
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        self.assertEqual(test_list, _test_ids(suite))

    def test_exclude_tests(self):
        """A module the id list does not refer to yields no tests."""
        test_list = ['bogus']
        loader = self._create_loader(test_list)
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        self.assertEqual([], _test_ids(suite))
3123
class TestFilteredByNameStartTestLoader(tests.TestCase):
    """Tests for FilteredByModuleTestLoader driven by a name prefix."""

    def _create_loader(self, name_start):
        """Return a loader keeping modules related to name_start."""
        def needs_module(name):
            # Keep a module when it lies under the prefix, or when the prefix
            # points somewhere inside the module.
            return name.startswith(name_start) or name_start.startswith(name)
        return TestUtil.FilteredByModuleTestLoader(needs_module)

    def test_load_tests(self):
        """A prefix shorter than the module name still loads the module."""
        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing']
        loader = self._create_loader('breezy.tests.test_samp')

        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        self.assertEqual(test_list, _test_ids(suite))

    def test_load_tests_inside_module(self):
        """A prefix reaching inside the module still loads the module."""
        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing']
        loader = self._create_loader('breezy.tests.test_sampler.Demo')

        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        self.assertEqual(test_list, _test_ids(suite))

    def test_exclude_tests(self):
        """An unrelated prefix yields no tests."""
        loader = self._create_loader('bogus')

        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        self.assertEqual([], _test_ids(suite))
3153
class TestTestPrefixRegistry(tests.TestCase):
    """Tests for tests.TestPrefixAliasRegistry."""

    def _get_registry(self):
        """Return a fresh, empty alias registry."""
        return tests.TestPrefixAliasRegistry()

    def test_register_new_prefix(self):
        """A newly registered alias can be looked up."""
        tpr = self._get_registry()
        tpr.register('foo', 'fff.ooo.ooo')
        self.assertEqual('fff.ooo.ooo', tpr.get('foo'))

    def test_register_existing_prefix(self):
        """Re-registering keeps the first value and logs the attempt."""
        tpr = self._get_registry()
        tpr.register('bar', 'bbb.aaa.rrr')
        tpr.register('bar', 'bBB.aAA.rRR')
        self.assertEqual('bbb.aaa.rrr', tpr.get('bar'))
        self.assertThat(self.get_log(),
                        DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
                                       doctest.ELLIPSIS))

    def test_get_unknown_prefix(self):
        """Looking up an unregistered alias raises KeyError."""
        tpr = self._get_registry()
        self.assertRaises(KeyError, tpr.get, 'I am not a prefix')

    def test_resolve_prefix(self):
        """resolve_alias returns the registered expansion."""
        tpr = self._get_registry()
        tpr.register('bar', 'bb.aa.rr')
        self.assertEqual('bb.aa.rr', tpr.resolve_alias('bar'))

    def test_resolve_unknown_alias(self):
        """Resolving an unregistered alias raises BzrCommandError."""
        tpr = self._get_registry()
        self.assertRaises(errors.BzrCommandError,
                          tpr.resolve_alias, 'I am not a prefix')

    def test_predefined_prefixes(self):
        """The module-level registry ships the documented aliases."""
        tpr = tests.test_prefix_alias_registry
        self.assertEqual('breezy', tpr.resolve_alias('breezy'))
        self.assertEqual('breezy.doc', tpr.resolve_alias('bd'))
        self.assertEqual('breezy.utils', tpr.resolve_alias('bu'))
        self.assertEqual('breezy.tests', tpr.resolve_alias('bt'))
        self.assertEqual('breezy.tests.blackbox', tpr.resolve_alias('bb'))
        self.assertEqual('breezy.plugins', tpr.resolve_alias('bp'))
3197
class TestThreadLeakDetection(tests.TestCase):
    """Ensure when tests leak threads we detect and report it"""

    class LeakRecordingResult(tests.ExtendedTestResult):
        """Result that records each reported leak as a (test, leaks) pair."""

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            self.leaks = []

        def _report_thread_leak(self, test, leaks, alive):
            self.leaks.append((test, leaks))

    def test_testcase_without_addCleanups(self):
        """Check old TestCase instances don't break with leak detection"""
        class Test(unittest.TestCase):
            def runTest(self):
                pass
        result = self.LeakRecordingResult()
        test = Test()
        result.startTestRun()
        test.run(result)
        result.stopTestRun()
        self.assertEqual(result._tests_leaking_threads_count, 0)
        self.assertEqual(result.leaks, [])

    def test_thread_leak(self):
        """Ensure a thread that outlives the running of a test is reported

        Uses a thread that blocks on an event, and is started by the inner
        test case. As the thread outlives the inner case's run, it should be
        detected as a leak, but the event is then set so that the thread can
        be safely joined in cleanup so it's not leaked for real.
        """
        event = threading.Event()
        thread = threading.Thread(name="Leaker", target=event.wait)

        class Test(tests.TestCase):
            def test_leak(self):
                thread.start()
        result = self.LeakRecordingResult()
        test = Test("test_leak")
        # Cleanups run in reverse order: release the thread, then join it.
        self.addCleanup(thread.join)
        self.addCleanup(event.set)
        result.startTestRun()
        test.run(result)
        result.stopTestRun()
        self.assertEqual(result._tests_leaking_threads_count, 1)
        self.assertEqual(result._first_thread_leaker_id, test.id())
        self.assertEqual(result.leaks, [(test, {thread})])
        self.assertContainsString(result.stream.getvalue(), "leaking threads")

    def test_multiple_leaks(self):
        """Check multiple leaks are blamed on the test cases at fault

        Same concept as the previous test, but has one inner test method that
        leaks two threads, and one that doesn't leak at all.
        """
        event = threading.Event()
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
        thread_c = threading.Thread(name="LeakerC", target=event.wait)

        class Test(tests.TestCase):
            def test_first_leak(self):
                thread_b.start()

            def test_second_no_leak(self):
                pass

            def test_third_leak(self):
                thread_c.start()
                thread_a.start()
        result = self.LeakRecordingResult()
        first_test = Test("test_first_leak")
        third_test = Test("test_third_leak")
        # Cleanups run in reverse order: release the threads, then join them.
        self.addCleanup(thread_a.join)
        self.addCleanup(thread_b.join)
        self.addCleanup(thread_c.join)
        self.addCleanup(event.set)
        result.startTestRun()
        unittest.TestSuite(
            [first_test, Test("test_second_no_leak"), third_test]
            ).run(result)
        result.stopTestRun()
        self.assertEqual(result._tests_leaking_threads_count, 2)
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
        self.assertEqual(result.leaks, [
            (first_test, {thread_b}),
            (third_test, {thread_a, thread_c})])
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
3283
class TestPostMortemDebugging(tests.TestCase):
    """Check post mortem debugging works when tests fail or error"""

    class TracebackRecordingResult(tests.ExtendedTestResult):
        """Result that records where the post mortem would have started."""

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            self.postcode = None

        def _post_mortem(self, tb=None):
            """Record the code object at the end of the current traceback"""
            tb = tb or sys.exc_info()[2]
            if tb is None:
                return
            # Follow the chain down to the frame that actually raised.
            while tb.tb_next is not None:
                tb = tb.tb_next
            self.postcode = tb.tb_frame.f_code

        def report_error(self, test, err):
            pass

        def report_failure(self, test, err):
            pass

    def test_location_unittest_error(self):
        """Needs right post mortem traceback with erroring unittest case"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise RuntimeError
        result = self.TracebackRecordingResult()
        Test().run(result)
        self.assertEqual(result.postcode, Test.runTest.__code__)

    def test_location_unittest_failure(self):
        """Needs right post mortem traceback with failing unittest case"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise self.failureException
        result = self.TracebackRecordingResult()
        Test().run(result)
        self.assertEqual(result.postcode, Test.runTest.__code__)

    def test_location_bt_error(self):
        """Needs right post mortem traceback with erroring breezy.tests case"""
        class Test(tests.TestCase):
            def test_error(self):
                raise RuntimeError
        result = self.TracebackRecordingResult()
        Test("test_error").run(result)
        self.assertEqual(result.postcode, Test.test_error.__code__)

    def test_location_bt_failure(self):
        """Needs right post mortem traceback with failing breezy.tests case"""
        class Test(tests.TestCase):
            def test_failure(self):
                raise self.failureException
        result = self.TracebackRecordingResult()
        Test("test_failure").run(result)
        self.assertEqual(result.postcode, Test.test_failure.__code__)

    def test_env_var_triggers_post_mortem(self):
        """Check pdb.post_mortem is called iff BRZ_TEST_PDB is set"""
        import pdb
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
        post_mortem_calls = []
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
        # Unset: the debugger must not be entered.
        self.overrideEnv('BRZ_TEST_PDB', None)
        result._post_mortem(1)
        # Set: the debugger is entered with the traceback passed in.
        self.overrideEnv('BRZ_TEST_PDB', 'on')
        result._post_mortem(2)
        self.assertEqual([2], post_mortem_calls)
3353
class TestRunSuite(tests.TestCase):
    """Tests for tests.run_suite."""

    def test_runner_class(self):
        """run_suite accepts and uses a runner_class keyword argument."""
        class Stub(tests.TestCase):
            def test_foo(self):
                pass
        suite = Stub("test_foo")
        calls = []

        class MyRunner(tests.TextTestRunner):
            def run(self, test):
                # Record the call instead of actually running the test.
                calls.append(test)
                return tests.ExtendedTestResult(self.stream, self.descriptions,
                                                self.verbosity)
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
        self.assertLength(1, calls)
3371
class _Selftest(object):
3372
"""Mixin for tests needing full selftest output"""
3374
def _inject_stream_into_subunit(self, stream):
3375
"""To be overridden by subclasses that run tests out of process"""
3377
def _run_selftest(self, **kwargs):
3379
self._inject_stream_into_subunit(sio)
3380
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3381
return sio.getvalue()
3384
class _ForkedSelftest(_Selftest):
    """Mixin for tests needing full selftest output with forked children"""

    _test_needs_features = [features.subunit]

    def _inject_stream_into_subunit(self, stream):
        """Monkey-patch subunit so the extra output goes to stream not stdout

        Some APIs need rewriting so this kind of bogus hackery can be replaced
        by passing the stream param from run_tests down into ProtocolTestCase.
        """
        from subunit import ProtocolTestCase
        wrapped_init = ProtocolTestCase.__init__

        def _init_with_passthrough(self, *args, **kwargs):
            wrapped_init(self, *args, **kwargs)
            # Redirect the passthrough output once the original __init__ ran.
            self._passthrough = stream

        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)

    def _run_selftest(self, **kwargs):
        """Run selftest through the fork decorator, claiming two cores."""
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
        if getattr(os, "fork", None) is None:
            raise tests.TestNotApplicable("Platform doesn't support forking")
        # Make sure the fork code is actually invoked by claiming two cores
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
        decorators = kwargs.setdefault("suite_decorators", [])
        decorators.append(tests.fork_decorator)
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3412
class TestParallelFork(_ForkedSelftest, tests.TestCase):
    """Check operation of --parallel=fork selftest option"""

    def test_error_in_child_during_fork(self):
        """Error in a forked child during test setup should get reported"""
        class Test(tests.TestCase):
            def testMethod(self):
                pass
        # We don't care what, just break something that a child will run
        self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
        out = self._run_selftest(test_suite_factory=Test)
        # Lines from the tracebacks of the two child processes may be mixed
        # together due to the way subunit parses and forwards the streams,
        # so permit extra lines between each part of the error output.
        # NOTE(review): only the fork_for_tests and
        # workaround_zealous_crypto_random fragments were visible in the
        # reviewed listing; the surrounding fragments are reconstructed -
        # confirm against history.
        self.assertContainsRe(out,
            "Traceback.*:\n"
            "(?:.*\n)*"
            ".+ in fork_for_tests\n"
            "(?:.*\n)*"
            "\\s*workaround_zealous_crypto_random\\(\\)\n"
            "(?:.*\n)*"
            "TypeError:")
3436
class TestUncollectedWarnings(_Selftest, tests.TestCase):
    """Check a test case still alive after being run emits a warning"""

    class Test(tests.TestCase):
        """Sample cases: one passing, one self-referencing, one skipped."""

        def test_pass(self):
            pass

        def test_self_ref(self):
            # The bound method refers back to the instance.
            self.also_self = self.test_self_ref

        def test_skip(self):
            self.skipTest("Don't need")

    def _get_suite(self):
        """Return a suite holding the three sample cases."""
        return TestUtil.TestSuite([
            self.Test("test_pass"),
            self.Test("test_self_ref"),
            self.Test("test_skip"),
            ])

    def _run_selftest_with_suite(self, **kwargs):
        """Run the sample suite with "uncollected_cases" set; return output.

        Asserts that test_self_ref is reported as uncollected and test_pass
        is not.
        """
        old_flags = tests.selftest_debug_flags
        tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
        # Garbage collection is switched off for the run and restored after.
        gc_on = gc.isenabled()
        if gc_on:
            gc.disable()
        try:
            output = self._run_selftest(test_suite_factory=self._get_suite,
                                        **kwargs)
        finally:
            if gc_on:
                gc.enable()
            tests.selftest_debug_flags = old_flags
        self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
        self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
        return output

    def test_testsuite(self):
        self._run_selftest_with_suite()

    def test_pattern(self):
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
        self.assertNotContainsRe(out, "test_skip")

    def test_exclude_pattern(self):
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
        self.assertNotContainsRe(out, "test_skip")

    def test_random_seed(self):
        self._run_selftest_with_suite(random_seed="now")

    def test_matching_tests_first(self):
        self._run_selftest_with_suite(matching_tests_first=True,
                                      pattern="test_self_ref$")

    def test_starting_with_and_exclude(self):
        out = self._run_selftest_with_suite(starting_with=["bt."],
                                            exclude_pattern="test_skip$")
        self.assertNotContainsRe(out, "test_skip")

    def test_additonal_decorator(self):
        # The assertions inside the helper are all that is checked here.
        self._run_selftest_with_suite(
            suite_decorators=[tests.TestDecorator])
3499
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted with subunit"""

    _test_needs_features = [features.subunit]

    def _run_selftest_with_suite(self, **kwargs):
        """Run the parent's checks through the subunit v1 runner."""
        run_with_suite = TestUncollectedWarnings._run_selftest_with_suite
        return run_with_suite(
            self, runner_class=tests.SubUnitBzrRunnerv1, **kwargs)
3509
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted when forking

    Combines the forking selftest mixin with the uncollected-warning tests;
    no methods are added or overridden here.
    """
3513
class TestEnvironHandling(tests.TestCase):
    """Tests for TestCase.overrideEnv."""

    def test_overrideEnv_None_called_twice_doesnt_leak(self):
        """Unsetting a variable twice still restores the original value."""
        self.assertNotIn('MYVAR', os.environ)
        self.overrideEnv('MYVAR', '42')

        # We use an embedded test to make sure we fix the _captureVar bug
        class Test(tests.TestCase):
            def test_me(self):
                # The first call save the 42 value
                self.overrideEnv('MYVAR', None)
                self.assertEqual(None, os.environ.get('MYVAR'))
                # Make sure we can call it twice
                self.overrideEnv('MYVAR', None)
                self.assertEqual(None, os.environ.get('MYVAR'))
        output = StringIO()
        result = tests.TextTestResult(output, 0, 1)
        Test('test_me').run(result)
        if not result.wasStrictlySuccessful():
            self.fail(output.getvalue())
        # We get our value back
        self.assertEqual('42', os.environ.get('MYVAR'))
3536
class TestIsolatedEnv(tests.TestCase):
    """Test isolating tests from os.environ.

    Since we use tests that are already isolated from os.environ a bit of care
    should be taken when designing the tests to avoid bootstrap side-effects.
    The tests start an already clean os.environ which allow doing valid
    assertions about which variables are present or not and design tests around
    these assertions.
    """

    class ScratchMonkey(tests.TestCase):
        """Throwaway case whose environment the tests below manipulate."""

        def test_me(self):
            pass

    def test_basics(self):
        """The baseline values the other tests rely on are in place."""
        # Make sure we know the definition of BRZ_HOME: not part of os.environ
        # for tests.TestCase.
        self.assertIn('BRZ_HOME', tests.isolated_environ)
        self.assertEqual(None, tests.isolated_environ['BRZ_HOME'])
        # Being part of isolated_environ, BRZ_HOME should not appear here
        self.assertNotIn('BRZ_HOME', os.environ)
        # Make sure we know the definition of LINES: part of os.environ for
        # tests.TestCase
        self.assertIn('LINES', tests.isolated_environ)
        self.assertEqual('25', tests.isolated_environ['LINES'])
        self.assertEqual('25', os.environ['LINES'])

    def test_injecting_unknown_variable(self):
        """An injected variable is set, then removed again on restore."""
        # BRZ_HOME is known to be absent from os.environ
        test = self.ScratchMonkey('test_me')
        tests.override_os_environ(test, {'BRZ_HOME': 'foo'})
        self.assertEqual('foo', os.environ['BRZ_HOME'])
        tests.restore_os_environ(test)
        self.assertNotIn('BRZ_HOME', os.environ)

    def test_injecting_known_variable(self):
        """An overridden variable gets its previous value back on restore."""
        test = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(test, {'LINES': '42'})
        self.assertEqual('42', os.environ['LINES'])
        tests.restore_os_environ(test)
        self.assertEqual('25', os.environ['LINES'])

    def test_deleting_variable(self):
        """Overriding with None deletes the variable until restore."""
        test = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(test, {'LINES': None})
        self.assertNotIn('LINES', os.environ)
        tests.restore_os_environ(test)
        self.assertEqual('25', os.environ['LINES'])
3589
class TestDocTestSuiteIsolation(tests.TestCase):
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.

    Since tests.TestCase already provides an isolation from os.environ, we use
    the clean environment as a base for testing. To precisely capture the
    isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
    compare against.

    We want to make sure `tests.DocTestSuite` respects `tests.isolated_environ`,
    not `os.environ` so each test overrides it to suit its needs.
    """

    def get_doctest_suite_for_string(self, klass, string):
        """Build a klass suite whose only doctest is parsed from string."""
        class Finder(doctest.DocTestFinder):

            def find(*args, **kwargs):
                # Ignore whatever object is searched: always offer the one
                # doctest parsed from the supplied string.
                test = doctest.DocTestParser().get_doctest(
                    string, {}, 'foo', 'foo.py', 0)
                return [test]

        return klass(test_finder=Finder())

    def run_doctest_suite_for_string(self, klass, string):
        """Run the doctest in string; return (result, output stream)."""
        suite = self.get_doctest_suite_for_string(klass, string)
        output = StringIO()
        result = tests.TextTestResult(output, 0, 1)
        suite.run(result)
        return result, output

    def assertDocTestStringSucceds(self, klass, string):
        """Fail with the run's output unless the doctest passed cleanly."""
        result, output = self.run_doctest_suite_for_string(klass, string)
        if not result.wasStrictlySuccessful():
            self.fail(output.getvalue())

    def assertDocTestStringFails(self, klass, string):
        """Fail with the run's output if the doctest passed cleanly."""
        result, output = self.run_doctest_suite_for_string(klass, string)
        if result.wasStrictlySuccessful():
            self.fail(output.getvalue())

    def test_injected_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
        test = """
            >>> import os
            >>> os.environ['LINES']
            '42'
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
        # tests.DocTestSuite sees '42'
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)

    def test_deleted_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
        test = """
            >>> import os
            >>> os.environ.get('LINES')
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
        # tests.DocTestSuite sees None
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3654
class TestSelftestExcludePatterns(tests.TestCase):
    """Tests for the selftest -x option at the command level."""

    def setUp(self):
        super(TestSelftestExcludePatterns, self).setUp()
        self.overrideAttr(tests, 'test_suite', self.suite_factory)

    def suite_factory(self, keep_only=None, starting_with=None):
        """A test suite factory with only a few tests."""
        class Test(tests.TestCase):
            def id(self):
                # We don't need the full class path
                return self._testMethodName

            def a(self):
                pass

            def b(self):
                pass

            def c(self):
                pass
        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])

    def assertTestList(self, expected, *selftest_args):
        """Check that 'selftest --list' with the given args lists expected."""
        # We rely on setUp installing the right test suite factory so we can
        # test at the command level without loading the whole test suite
        out, err = self.run_bzr(('selftest', '--list') + selftest_args)
        actual = out.splitlines()
        self.assertEqual(expected, actual)

    def test_full_list(self):
        self.assertTestList(['a', 'b', 'c'])

    def test_single_exclude(self):
        self.assertTestList(['b', 'c'], '-x', 'a')

    def test_mutiple_excludes(self):
        self.assertTestList(['c'], '-x', 'a', '-x', 'b')
3691
class TestCounterHooks(tests.TestCase, SelfTestHelper):
    """Tests for TestCase.install_counter_hook."""

    _test_needs_features = [features.subunit]

    def setUp(self):
        super(TestCounterHooks, self).setUp()

        class Test(tests.TestCase):

            def setUp(self):
                super(Test, self).setUp()
                self.hooks = hooks.Hooks()
                self.hooks.add_hook('myhook', 'Foo bar blah', (2, 4))
                self.install_counter_hook(self.hooks, 'myhook')

            def no_hook(self):
                pass

            def run_hook_once(self):
                for hook in self.hooks['myhook']:
                    hook(self)

        self.test_class = Test

    def assertHookCalls(self, expected_calls, test_name):
        """Run test_name on the inner class and check its 'myhook' count."""
        test = self.test_class(test_name)
        result = unittest.TestResult()
        test.run(result)
        self.assertTrue(hasattr(test, '_counters'))
        self.assertIn('myhook', test._counters)
        self.assertEqual(expected_calls, test._counters['myhook'])

    def test_no_hook(self):
        self.assertHookCalls(0, 'no_hook')

    def test_run_hook_once(self):
        tt = features.testtools
        if tt.module.__version__ < (0, 9, 8):
            raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
        self.assertHookCalls(1, 'run_hook_once')