1
# Copyright (C) 2005-2013, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the test framework."""
29
from testtools import (
30
ExtendedToOriginalDecorator,
33
from testtools.content import Content
34
from testtools.content_type import ContentType
35
from testtools.matchers import (
39
import testtools.testresult.doubles
65
from ..sixish import (
68
from ..symbol_versioning import (
79
from ..trace import note, mutter
80
from ..transport import memory
83
def _test_ids(test_suite):
    """Return the ids of all tests found in ``test_suite``.

    The suite is walked with ``tests.iter_suite_tests`` and the result of
    each test's ``id()`` is collected into a list, in iteration order.
    """
    return [
        case.id()
        for case in tests.iter_suite_tests(test_suite)
    ]
88
class MetaTestLog(tests.TestCase):
    """Check that a message logged by a test is exposed via its details."""

    def test_logging(self):
        """Test logs are captured when a test fails."""
        self.log('a test message')
        details = self.getDetails()
        # The assertions below inspect the log attachment, so it has to be
        # pulled out of the details mapping first.
        # NOTE(review): the detail is assumed to be registered under the
        # name 'log' -- confirm against TestCase's log attachment code.
        log = details['log']
        self.assertThat(log.content_type, Equals(ContentType(
            "text", "plain", {"charset": "utf8"})))
        # The attachment text and get_log() must agree ...
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
        # ... and the log must end with the message written above.
        self.assertThat(self.get_log(),
                        DocTestMatches(u"...a test message\n",
                                       doctest.ELLIPSIS))
102
class TestTreeShape(tests.TestCaseInTempDir):
    """Tree-building helper checks that need a real temporary directory."""

    def test_unicode_paths(self):
        """A file with a non-ASCII name can be created and then found."""
        # Skipped by the framework when the feature is not available.
        self.requireFeature(features.UnicodeFilenameFeature)

        unicode_name = u'hell\u00d8'
        entries = [(unicode_name, 'contents of hello')]
        self.build_tree_contents(entries)
        self.assertPathExists(unicode_name)
112
class TestClassesAvailable(tests.TestCase):
    """As a convenience we expose Test* classes from breezy.tests"""

    # Each test passes as long as the import itself succeeds; the imported
    # name is deliberately not used afterwards.

    def test_test_case(self):
        """``TestCase`` is importable from the tests package."""
        from . import TestCase

    def test_test_loader(self):
        """``TestLoader`` is importable from the tests package."""
        from . import TestLoader

    def test_test_suite(self):
        """``TestSuite`` is importable from the tests package."""
        from . import TestSuite
125
class TestTransportScenarios(tests.TestCase):
    """A group of tests that test the transport implementation adaption core.

    This is a meta test that the tests are applied to all available
    transports.

    This will be generalised in the future which is why it is in this
    test file even though it is specific to transport tests at the moment.
    """

    def test_get_transport_permutations(self):
        """The module's get_test_permutations result is passed through."""
        # this checks that get_test_permutations defined by the module is
        # called by the get_transport_test_permutations function.
        class MockModule(object):
            def get_test_permutations(self):
                return sample_permutation
        sample_permutation = [(1, 2), (3, 4)]
        from .per_transport import get_transport_test_permutations
        self.assertEqual(sample_permutation,
                         get_transport_test_permutations(MockModule()))

    def test_scenarios_include_all_modules(self):
        """One scenario exists per permutation of every transport module."""
        # this checks that the scenario generator returns as many permutations
        # as there are in all the registered transport modules - we assume if
        # this matches its probably doing the right thing especially in
        # combination with the tests for setting the right classes below.
        from .per_transport import transport_test_permutations
        from ..transport import _get_transport_modules
        modules = _get_transport_modules()
        permutation_count = 0
        for module in modules:
            try:
                # __import__ returns the top-level package, so walk the
                # remaining dotted components down to the module's
                # get_test_permutations function before calling it.
                get_permutations = reduce(
                    getattr,
                    (module + ".get_test_permutations").split('.')[1:],
                    __import__(module))
                permutation_count += len(get_permutations())
            except errors.DependencyNotPresent:
                # NOTE(review): handler restored as a no-op so that a
                # transport with missing dependencies adds nothing to the
                # count -- confirm this matches transport_test_permutations.
                pass
        scenarios = transport_test_permutations()
        self.assertEqual(permutation_count, len(scenarios))

    def test_scenarios_include_transport_class(self):
        """Scenarios carry a name, a transport class and a server class."""
        # This test used to know about all the possible transports and the
        # order they were returned but that seems overly brittle (mbp)
        from .per_transport import transport_test_permutations
        scenarios = transport_test_permutations()
        # there are at least that many builtin transports
        self.assertTrue(len(scenarios) > 6)
        one_scenario = scenarios[0]
        self.assertIsInstance(one_scenario[0], str)
        self.assertTrue(issubclass(one_scenario[1]["transport_class"],
                                   breezy.transport.Transport))
        self.assertTrue(issubclass(one_scenario[1]["transport_server"],
                                   breezy.transport.Server))
181
class TestBranchScenarios(tests.TestCase):
183
def test_scenarios(self):
184
# check that constructor parameters are passed through to the adapted
186
from .per_branch import make_scenarios
189
formats = [("c", "C"), ("d", "D")]
190
scenarios = make_scenarios(server1, server2, formats)
191
self.assertEqual(2, len(scenarios))
194
{'branch_format': 'c',
195
'bzrdir_format': 'C',
196
'transport_readonly_server': 'b',
197
'transport_server': 'a'}),
199
{'branch_format': 'd',
200
'bzrdir_format': 'D',
201
'transport_readonly_server': 'b',
202
'transport_server': 'a'})],
206
class TestBzrDirScenarios(tests.TestCase):
208
def test_scenarios(self):
209
# check that constructor parameters are passed through to the adapted
211
from .per_controldir import make_scenarios
216
scenarios = make_scenarios(vfs_factory, server1, server2, formats)
219
{'bzrdir_format': 'c',
220
'transport_readonly_server': 'b',
221
'transport_server': 'a',
222
'vfs_transport_factory': 'v'}),
224
{'bzrdir_format': 'd',
225
'transport_readonly_server': 'b',
226
'transport_server': 'a',
227
'vfs_transport_factory': 'v'})],
231
class TestRepositoryScenarios(tests.TestCase):
233
def test_formats_to_scenarios(self):
234
from .per_repository import formats_to_scenarios
235
formats = [("(c)", remote.RemoteRepositoryFormat()),
236
("(d)", repository.format_registry.get(
237
'Bazaar repository format 2a (needs bzr 1.16 or later)\n'))]
238
no_vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
240
vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
241
vfs_transport_factory="vfs")
242
# no_vfs generate scenarios without vfs_transport_factory
244
('RemoteRepositoryFormat(c)',
245
{'bzrdir_format': remote.RemoteBzrDirFormat(),
246
'repository_format': remote.RemoteRepositoryFormat(),
247
'transport_readonly_server': 'readonly',
248
'transport_server': 'server'}),
249
('RepositoryFormat2a(d)',
250
{'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
251
'repository_format': groupcompress_repo.RepositoryFormat2a(),
252
'transport_readonly_server': 'readonly',
253
'transport_server': 'server'})]
254
self.assertEqual(expected, no_vfs_scenarios)
256
('RemoteRepositoryFormat(c)',
257
{'bzrdir_format': remote.RemoteBzrDirFormat(),
258
'repository_format': remote.RemoteRepositoryFormat(),
259
'transport_readonly_server': 'readonly',
260
'transport_server': 'server',
261
'vfs_transport_factory': 'vfs'}),
262
('RepositoryFormat2a(d)',
263
{'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
264
'repository_format': groupcompress_repo.RepositoryFormat2a(),
265
'transport_readonly_server': 'readonly',
266
'transport_server': 'server',
267
'vfs_transport_factory': 'vfs'})],
271
class TestTestScenarioApplication(tests.TestCase):
272
"""Tests for the test adaption facilities."""
274
def test_apply_scenario(self):
275
from breezy.tests import apply_scenario
276
input_test = TestTestScenarioApplication("test_apply_scenario")
277
# setup two adapted tests
278
adapted_test1 = apply_scenario(input_test,
280
{"bzrdir_format":"bzr_format",
281
"repository_format":"repo_fmt",
282
"transport_server":"transport_server",
283
"transport_readonly_server":"readonly-server"}))
284
adapted_test2 = apply_scenario(input_test,
285
("new id 2", {"bzrdir_format":None}))
286
# input_test should have been altered.
287
self.assertRaises(AttributeError, getattr, input_test, "bzrdir_format")
288
# the new tests are mutually incompatible, ensuring it has
289
# made new ones, and unspecified elements in the scenario
290
# should not have been altered.
291
self.assertEqual("bzr_format", adapted_test1.bzrdir_format)
292
self.assertEqual("repo_fmt", adapted_test1.repository_format)
293
self.assertEqual("transport_server", adapted_test1.transport_server)
294
self.assertEqual("readonly-server",
295
adapted_test1.transport_readonly_server)
297
"breezy.tests.test_selftest.TestTestScenarioApplication."
298
"test_apply_scenario(new id)",
300
self.assertEqual(None, adapted_test2.bzrdir_format)
302
"breezy.tests.test_selftest.TestTestScenarioApplication."
303
"test_apply_scenario(new id 2)",
307
class TestInterRepositoryScenarios(tests.TestCase):
309
def test_scenarios(self):
310
# check that constructor parameters are passed through to the adapted
312
from .per_interrepository import make_scenarios
315
formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
316
scenarios = make_scenarios(server1, server2, formats)
319
{'repository_format': 'C1',
320
'repository_format_to': 'C2',
321
'transport_readonly_server': 'b',
322
'transport_server': 'a',
323
'extra_setup': 'C3'}),
325
{'repository_format': 'D1',
326
'repository_format_to': 'D2',
327
'transport_readonly_server': 'b',
328
'transport_server': 'a',
329
'extra_setup': 'D3'})],
333
class TestWorkingTreeScenarios(tests.TestCase):
335
def test_scenarios(self):
336
# check that constructor parameters are passed through to the adapted
338
from .per_workingtree import make_scenarios
341
formats = [workingtree_4.WorkingTreeFormat4(),
342
workingtree_3.WorkingTreeFormat3(),
343
workingtree_4.WorkingTreeFormat6()]
344
scenarios = make_scenarios(server1, server2, formats,
345
remote_server='c', remote_readonly_server='d',
346
remote_backing_server='e')
348
('WorkingTreeFormat4',
349
{'bzrdir_format': formats[0]._matchingbzrdir,
350
'transport_readonly_server': 'b',
351
'transport_server': 'a',
352
'workingtree_format': formats[0]}),
353
('WorkingTreeFormat3',
354
{'bzrdir_format': formats[1]._matchingbzrdir,
355
'transport_readonly_server': 'b',
356
'transport_server': 'a',
357
'workingtree_format': formats[1]}),
358
('WorkingTreeFormat6',
359
{'bzrdir_format': formats[2]._matchingbzrdir,
360
'transport_readonly_server': 'b',
361
'transport_server': 'a',
362
'workingtree_format': formats[2]}),
363
('WorkingTreeFormat6,remote',
364
{'bzrdir_format': formats[2]._matchingbzrdir,
365
'repo_is_remote': True,
366
'transport_readonly_server': 'd',
367
'transport_server': 'c',
368
'vfs_transport_factory': 'e',
369
'workingtree_format': formats[2]}),
373
class TestTreeScenarios(tests.TestCase):
375
def test_scenarios(self):
376
# the tree implementation scenario generator is meant to setup one
377
# instance for each working tree format, one additional instance
378
# that will use the default wt format, but create a revision tree for
379
# the tests, and one more that uses the default wt format as a
380
# lightweight checkout of a remote repository. This means that the wt
381
# ones should have the workingtree_to_test_tree attribute set to
382
# 'return_parameter' and the revision one set to
383
# revision_tree_from_workingtree.
385
from .per_tree import (
386
_dirstate_tree_from_workingtree,
391
revision_tree_from_workingtree
395
smart_server = test_server.SmartTCPServer_for_testing
396
smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
397
mem_server = memory.MemoryServer
398
formats = [workingtree_4.WorkingTreeFormat4(),
399
workingtree_3.WorkingTreeFormat3(),]
400
scenarios = make_scenarios(server1, server2, formats)
401
self.assertEqual(8, len(scenarios))
402
default_wt_format = workingtree.format_registry.get_default()
403
wt4_format = workingtree_4.WorkingTreeFormat4()
404
wt5_format = workingtree_4.WorkingTreeFormat5()
405
wt6_format = workingtree_4.WorkingTreeFormat6()
406
expected_scenarios = [
407
('WorkingTreeFormat4',
408
{'bzrdir_format': formats[0]._matchingbzrdir,
409
'transport_readonly_server': 'b',
410
'transport_server': 'a',
411
'workingtree_format': formats[0],
412
'_workingtree_to_test_tree': return_parameter,
414
('WorkingTreeFormat3',
415
{'bzrdir_format': formats[1]._matchingbzrdir,
416
'transport_readonly_server': 'b',
417
'transport_server': 'a',
418
'workingtree_format': formats[1],
419
'_workingtree_to_test_tree': return_parameter,
421
('WorkingTreeFormat6,remote',
422
{'bzrdir_format': wt6_format._matchingbzrdir,
423
'repo_is_remote': True,
424
'transport_readonly_server': smart_readonly_server,
425
'transport_server': smart_server,
426
'vfs_transport_factory': mem_server,
427
'workingtree_format': wt6_format,
428
'_workingtree_to_test_tree': return_parameter,
431
{'_workingtree_to_test_tree': revision_tree_from_workingtree,
432
'bzrdir_format': default_wt_format._matchingbzrdir,
433
'transport_readonly_server': 'b',
434
'transport_server': 'a',
435
'workingtree_format': default_wt_format,
437
('DirStateRevisionTree,WT4',
438
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
439
'bzrdir_format': wt4_format._matchingbzrdir,
440
'transport_readonly_server': 'b',
441
'transport_server': 'a',
442
'workingtree_format': wt4_format,
444
('DirStateRevisionTree,WT5',
445
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
446
'bzrdir_format': wt5_format._matchingbzrdir,
447
'transport_readonly_server': 'b',
448
'transport_server': 'a',
449
'workingtree_format': wt5_format,
452
{'_workingtree_to_test_tree': preview_tree_pre,
453
'bzrdir_format': default_wt_format._matchingbzrdir,
454
'transport_readonly_server': 'b',
455
'transport_server': 'a',
456
'workingtree_format': default_wt_format}),
458
{'_workingtree_to_test_tree': preview_tree_post,
459
'bzrdir_format': default_wt_format._matchingbzrdir,
460
'transport_readonly_server': 'b',
461
'transport_server': 'a',
462
'workingtree_format': default_wt_format}),
464
self.assertEqual(expected_scenarios, scenarios)
467
class TestInterTreeScenarios(tests.TestCase):
468
"""A group of tests that test the InterTreeTestAdapter."""
470
def test_scenarios(self):
471
# check that constructor parameters are passed through to the adapted
473
# for InterTree tests we want the machinery to bring up two trees in
474
# each instance: the base one, and the one we are interacting with.
475
# because each optimiser can be direction specific, we need to test
476
# each optimiser in its chosen direction.
477
# unlike the TestProviderAdapter we dont want to automatically add a
478
# parameterized one for WorkingTree - the optimisers will tell us what
480
from .per_tree import (
483
from .per_intertree import (
486
from ..bzr.workingtree_3 import WorkingTreeFormat3
487
from ..bzr.workingtree_4 import WorkingTreeFormat4
488
input_test = TestInterTreeScenarios(
492
format1 = WorkingTreeFormat4()
493
format2 = WorkingTreeFormat3()
494
formats = [("1", str, format1, format2, "converter1"),
495
("2", int, format2, format1, "converter2")]
496
scenarios = make_scenarios(server1, server2, formats)
497
self.assertEqual(2, len(scenarios))
498
expected_scenarios = [
500
"bzrdir_format": format1._matchingbzrdir,
501
"intertree_class": formats[0][1],
502
"workingtree_format": formats[0][2],
503
"workingtree_format_to": formats[0][3],
504
"mutable_trees_to_test_trees": formats[0][4],
505
"_workingtree_to_test_tree": return_parameter,
506
"transport_server": server1,
507
"transport_readonly_server": server2,
510
"bzrdir_format": format2._matchingbzrdir,
511
"intertree_class": formats[1][1],
512
"workingtree_format": formats[1][2],
513
"workingtree_format_to": formats[1][3],
514
"mutable_trees_to_test_trees": formats[1][4],
515
"_workingtree_to_test_tree": return_parameter,
516
"transport_server": server1,
517
"transport_readonly_server": server2,
520
self.assertEqual(scenarios, expected_scenarios)
523
class TestTestCaseInTempDir(tests.TestCaseInTempDir):
    """Checks of the directories and assertions TestCaseInTempDir offers."""

    def test_home_is_not_working(self):
        """The working directory and the home directory are distinct."""
        self.assertNotEqual(self.test_dir, self.test_home_dir)
        current_dir = osutils.getcwd()
        self.assertIsSameRealPath(self.test_dir, current_dir)
        home_from_env = os.environ['HOME']
        self.assertIsSameRealPath(self.test_home_dir, home_from_env)

    def test_assertEqualStat_equal(self):
        """A fake stat built from a real one's fields compares equal."""
        from .test_dirstate import _FakeStat
        self.build_tree(["foo"])
        on_disk = os.lstat("foo")
        stand_in = _FakeStat(
            on_disk.st_size,
            on_disk.st_mtime,
            on_disk.st_ctime,
            on_disk.st_dev,
            on_disk.st_ino,
            on_disk.st_mode,
        )
        self.assertEqualStat(on_disk, stand_in)

    def test_assertEqualStat_notequal(self):
        """Stats of two different files make assertEqualStat fail."""
        self.build_tree(["foo", "longname"])
        first_stat = os.lstat("foo")
        second_stat = os.lstat("longname")
        self.assertRaises(AssertionError, self.assertEqualStat,
                          first_stat, second_stat)

    def test_assertPathExists(self):
        """assertPathExists and assertPathDoesNotExist agree with the tree."""
        self.assertPathExists('.')
        self.build_tree(['foo/', 'foo/bar'])
        self.assertPathExists('foo/bar')
        self.assertPathDoesNotExist('foo/foo')
551
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
553
def test_home_is_non_existant_dir_under_root(self):
554
"""The test_home_dir for TestCaseWithMemoryTransport is missing.
556
This is because TestCaseWithMemoryTransport is for tests that do not
557
need any disk resources: they should be hooked into breezy in such a
558
way that no global settings are being changed by the test (only a
559
few tests should need to do that), and having a missing dir as home is
560
an effective way to ensure that this is the case.
562
self.assertIsSameRealPath(
563
self.TEST_ROOT + "/MemoryTransportMissingHomeDir",
565
self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME'])
567
def test_cwd_is_TEST_ROOT(self):
    """Both test_dir and the current directory resolve to TEST_ROOT."""
    self.assertIsSameRealPath(self.test_dir, self.TEST_ROOT)
    current_dir = osutils.getcwd()
    self.assertIsSameRealPath(self.test_dir, current_dir)
572
def test_BRZ_HOME_and_HOME_are_bytestrings(self):
573
"""The $BRZ_HOME and $HOME environment variables should not be unicode.
575
See https://bugs.launchpad.net/bzr/+bug/464174
577
self.assertIsInstance(os.environ['BRZ_HOME'], str)
578
self.assertIsInstance(os.environ['HOME'], str)
580
def test_make_branch_and_memory_tree(self):
581
"""In TestCaseWithMemoryTransport we should not make the branch on disk.
583
This is hard to comprehensively robustly test, so we settle for making
584
a branch and checking no directory was created at its relpath.
586
tree = self.make_branch_and_memory_tree('dir')
587
# Guard against regression into MemoryTransport leaking
588
# files to disk instead of keeping them in memory.
589
self.assertFalse(osutils.lexists('dir'))
590
self.assertIsInstance(tree, memorytree.MemoryTree)
592
def test_make_branch_and_memory_tree_with_format(self):
593
"""make_branch_and_memory_tree should accept a format option."""
594
format = bzrdir.BzrDirMetaFormat1()
595
format.repository_format = repository.format_registry.get_default()
596
tree = self.make_branch_and_memory_tree('dir', format=format)
597
# Guard against regression into MemoryTransport leaking
598
# files to disk instead of keeping them in memory.
599
self.assertFalse(osutils.lexists('dir'))
600
self.assertIsInstance(tree, memorytree.MemoryTree)
601
self.assertEqual(format.repository_format.__class__,
602
tree.branch.repository._format.__class__)
604
def test_make_branch_builder(self):
605
builder = self.make_branch_builder('dir')
606
self.assertIsInstance(builder, branchbuilder.BranchBuilder)
607
# Guard against regression into MemoryTransport leaking
608
# files to disk instead of keeping them in memory.
609
self.assertFalse(osutils.lexists('dir'))
611
def test_make_branch_builder_with_format(self):
    """make_branch_builder uses the format object it is handed."""
    # Use a repo layout that doesn't conform to a 'named' layout, to ensure
    # that the format objects are used.
    dir_format = bzrdir.BzrDirMetaFormat1()
    repo_format = repository.format_registry.get_default()
    dir_format.repository_format = repo_format
    builder = self.make_branch_builder('dir', format=dir_format)
    built_branch = builder.get_branch()
    # Guard against regression into MemoryTransport leaking
    # files to disk instead of keeping them in memory.
    self.assertFalse(osutils.lexists('dir'))
    self.assertEqual(dir_format.repository_format.__class__,
                     built_branch.repository._format.__class__)
    # The format marker stored on the transport must be the one belonging
    # to the repository format that was requested.
    expected_marker = repo_format.get_format_string()
    stored_marker = self.get_transport().get_bytes(
        'dir/.bzr/repository/format')
    self.assertEqual(expected_marker, stored_marker)
628
def test_make_branch_builder_with_format_name(self):
    """make_branch_builder also accepts a format given by name."""
    builder = self.make_branch_builder('dir', format='knit')
    built_branch = builder.get_branch()
    # Guard against regression into MemoryTransport leaking
    # files to disk instead of keeping them in memory.
    self.assertFalse(osutils.lexists('dir'))
    knit_dir_format = controldir.format_registry.make_controldir('knit')
    self.assertEqual(knit_dir_format.repository_format.__class__,
                     built_branch.repository._format.__class__)
    stored_marker = self.get_transport().get_bytes(
        'dir/.bzr/repository/format')
    self.assertEqual('Bazaar-NG Knit Repository Format 1', stored_marker)
641
def test_dangling_locks_cause_failures(self):
642
class TestDanglingLock(tests.TestCaseWithMemoryTransport):
643
def test_function(self):
644
t = self.get_transport_from_path('.')
645
l = lockdir.LockDir(t, 'lock')
648
test = TestDanglingLock('test_function')
650
total_failures = result.errors + result.failures
651
if self._lock_check_thorough:
652
self.assertEqual(1, len(total_failures))
654
# When _lock_check_thorough is disabled, then we don't trigger a
656
self.assertEqual(0, len(total_failures))
659
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
    """Tests for the convenience functions TestCaseWithTransport introduces."""

    def test_get_readonly_url_none(self):
        """With no readonly server set, URLs open as readonly decorators."""
        from ..transport.readonly import ReadonlyTransportDecorator
        self.vfs_transport_factory = memory.MemoryServer
        self.transport_readonly_server = None
        # Both URLs are expected to open as ReadonlyTransportDecorator
        # instances, which is what the assertions below check.
        root_url = self.get_readonly_url()
        nested_url = self.get_readonly_url('foo/bar')
        root_transport = transport.get_transport_from_url(root_url)
        nested_transport = transport.get_transport_from_url(nested_url)
        self.assertIsInstance(root_transport, ReadonlyTransportDecorator)
        self.assertIsInstance(nested_transport, ReadonlyTransportDecorator)
        # base carries one trailing character that abspath() does not.
        self.assertEqual(nested_transport.base[:-1],
                         root_transport.abspath('foo/bar'))

    def test_get_readonly_url_http(self):
        """With HttpServer as readonly server, URLs open as HTTP transports."""
        from .http_server import HttpServer
        from ..transport.http import HttpTransportBase
        self.transport_server = test_server.LocalURLServer
        self.transport_readonly_server = HttpServer
        # calling get_readonly_transport() gives us a HTTP server instance.
        root_url = self.get_readonly_url()
        nested_url = self.get_readonly_url('foo/bar')
        # the transport returned may be any HttpTransportBase subclass
        root_transport = transport.get_transport_from_url(root_url)
        nested_transport = transport.get_transport_from_url(nested_url)
        self.assertIsInstance(root_transport, HttpTransportBase)
        self.assertIsInstance(nested_transport, HttpTransportBase)
        self.assertEqual(nested_transport.base[:-1],
                         root_transport.abspath('foo/bar'))

    def test_is_directory(self):
        """Test assertIsDirectory assertion"""
        t = self.get_transport()
        self.build_tree(['a_dir/', 'a_file'], transport=t)
        self.assertIsDirectory('a_dir', t)
        # A plain file and a missing path must both be rejected.
        for not_a_dir in ('a_file', 'not_here'):
            self.assertRaises(AssertionError, self.assertIsDirectory,
                              not_a_dir, t)

    def test_make_branch_builder(self):
        """The builder's branch is on disk, has no tree, and has the commit."""
        builder = self.make_branch_builder('dir')
        rev_id = builder.build_commit()
        self.assertPathExists('dir')
        opened_dir = controldir.ControlDir.open('dir')
        self.assertRaises(errors.NoWorkingTree, opened_dir.open_workingtree)
        opened_branch = opened_dir.open_branch()
        builder_branch = builder.get_branch()
        self.assertEqual(opened_branch.base, builder_branch.base)
        expected_info = (1, rev_id)
        self.assertEqual(expected_info, builder_branch.last_revision_info())
        self.assertEqual(expected_info, opened_branch.last_revision_info())
712
class TestTestCaseTransports(tests.TestCaseWithTransport):
    """Check that control dirs are created on the configured transport."""

    def setUp(self):
        """Run the base setUp, then select the in-memory transport server."""
        super(TestTestCaseTransports, self).setUp()
        self.vfs_transport_factory = memory.MemoryServer

    def test_make_controldir_preserves_transport(self):
        """make_controldir returns a control dir on a MemoryTransport."""
        # The returned transport is not needed; the call is kept because
        # any side effects of get_transport() are not visible from here.
        self.get_transport()
        result_bzrdir = self.make_controldir('subdir')
        self.assertIsInstance(result_bzrdir.transport,
                              memory.MemoryTransport)
        # should not be on disk, should only be in memory
        self.assertPathDoesNotExist('subdir')
727
class TestChrootedTest(tests.ChrootedTestCase):
    """Check the readonly transport handed out by ChrootedTestCase."""

    def test_root_is_root(self):
        """Cloning '..' from the readonly root stays at the same base."""
        t = transport.get_transport_from_url(self.get_readonly_url())
        # Record the base before cloning so the comparison has a reference.
        # NOTE(review): `url` is taken to be the transport's own base, as
        # the test name implies -- confirm.
        url = t.base
        self.assertEqual(url, t.clone('..').base)
735
class TestProfileResult(tests.TestCase):
737
def test_profiles_tests(self):
738
self.requireFeature(features.lsprof_feature)
739
terminal = testtools.testresult.doubles.ExtendedTestResult()
740
result = tests.ProfileResult(terminal)
741
class Sample(tests.TestCase):
743
self.sample_function()
744
def sample_function(self):
748
case = terminal._events[0][1]
749
self.assertLength(1, case._benchcalls)
750
# We must be able to unpack it as the test reporting code wants
751
(_, _, _), stats = case._benchcalls[0]
752
self.assertTrue(callable(stats.pprint))
755
class TestTestResult(tests.TestCase):
757
def check_timing(self, test_case, expected_re):
    """Run ``test_case`` and match its reported time against a regex.

    :param test_case: the test to run.
    :param expected_re: regular expression the string produced by
        ``_testTimeString`` must contain.
    """
    text_result = tests.TextTestResult(StringIO(), descriptions=0,
                                       verbosity=1)
    recorder = testtools.testresult.doubles.ExtendedTestResult()
    # Feed both results at once: one formats the time, the other records
    # the events so the test object that actually ran can be retrieved.
    test_case.run(MultiTestResult(text_result, recorder))
    first_event = recorder._events[0]
    run_case = first_event[1]
    timed_string = text_result._testTimeString(run_case)
    self.assertContainsRe(timed_string, expected_re)
765
def test_test_reporting(self):
766
class ShortDelayTestCase(tests.TestCase):
767
def test_short_delay(self):
769
def test_short_benchmark(self):
770
self.time(time.sleep, 0.003)
771
self.check_timing(ShortDelayTestCase('test_short_delay'),
773
# if a benchmark time is given, we now show just that time followed by
775
self.check_timing(ShortDelayTestCase('test_short_benchmark'),
778
def test_unittest_reporting_unittest_class(self):
779
# getting the time from a non-breezy test works ok
780
class ShortDelayTestCase(unittest.TestCase):
781
def test_short_delay(self):
783
self.check_timing(ShortDelayTestCase('test_short_delay'),
786
def _time_hello_world_encoding(self):
787
"""Profile two sleep calls
789
This is used to exercise the test framework.
791
self.time(unicode, 'hello', errors='replace')
792
self.time(unicode, 'world', errors='replace')
794
def test_lsprofiling(self):
795
"""Verbose test result prints lsprof statistics from test cases."""
796
self.requireFeature(features.lsprof_feature)
797
result_stream = StringIO()
798
result = breezy.tests.VerboseTestResult(
803
# we want profile a call of some sort and check it is output by
804
# addSuccess. We dont care about addError or addFailure as they
805
# are not that interesting for performance tuning.
806
# make a new test instance that when run will generate a profile
807
example_test_case = TestTestResult("_time_hello_world_encoding")
808
example_test_case._gather_lsprof_in_benchmarks = True
809
# execute the test, which should succeed and record profiles
810
example_test_case.run(result)
811
# lsprofile_something()
812
# if this worked we want
813
# LSProf output for <built in function unicode> (['hello'], {'errors': 'replace'})
814
# CallCount Recursive Total(ms) Inline(ms) module:lineno(function)
815
# (the lsprof header)
816
# ... an arbitrary number of lines
817
# and the function call which is time.sleep.
818
# 1 0 ??? ??? ???(sleep)
819
# and then repeated but with 'world', rather than 'hello'.
820
# this should appear in the output stream of our test result.
821
output = result_stream.getvalue()
822
self.assertContainsRe(output,
823
r"LSProf output for <type 'unicode'>\(\('hello',\), {'errors': 'replace'}\)")
824
self.assertContainsRe(output,
825
r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n")
826
self.assertContainsRe(output,
827
r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?")
828
self.assertContainsRe(output,
829
r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
831
def test_uses_time_from_testtools(self):
832
"""Test case timings in verbose results should use testtools times"""
834
class TimeAddedVerboseTestResult(tests.VerboseTestResult):
835
def startTest(self, test):
836
self.time(datetime.datetime.utcfromtimestamp(1.145))
837
super(TimeAddedVerboseTestResult, self).startTest(test)
838
def addSuccess(self, test):
839
self.time(datetime.datetime.utcfromtimestamp(51.147))
840
super(TimeAddedVerboseTestResult, self).addSuccess(test)
841
def report_tests_starting(self): pass
843
self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
844
self.assertEndsWith(sio.getvalue(), "OK 50002ms\n")
846
def test_known_failure(self):
    """Using knownFailure should trigger several result actions."""
    class InstrumentedTestResult(tests.ExtendedTestResult):
        def stopTestRun(self): pass
        def report_tests_starting(self): pass
        def report_known_failure(self, test, err=None, details=None):
            # Record the call so the outer test can inspect it.
            self._call = test, 'known failure'
    result = InstrumentedTestResult(None, None, None, None)
    class Test(tests.TestCase):
        def test_function(self):
            self.knownFailure('failed!')
    test = Test("test_function")
    # The test has to be run against the instrumented result, otherwise
    # result._call is never set and the assertions below cannot hold.
    test.run(result)
    # it should invoke 'report_known_failure'.
    self.assertEqual(2, len(result._call))
    self.assertEqual(test.id(), result._call[0].id())
    self.assertEqual('known failure', result._call[1])
    # we don't introspect the traceback, if the rest is ok, it would be
    # exceptional for it not to be.
    # it should update the known_failure_count on the object.
    self.assertEqual(1, result.known_failure_count)
    # the result should be successful.
    self.assertTrue(result.wasSuccessful())
870
def test_verbose_report_known_failure(self):
871
# verbose test output formatting
872
result_stream = StringIO()
873
result = breezy.tests.VerboseTestResult(
878
_get_test("test_xfail").run(result)
879
self.assertContainsRe(result_stream.getvalue(),
880
"\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
881
"\\s*(?:Text attachment: )?reason"
886
def get_passing_test(self):
887
"""Return a test object that can't be run usefully."""
890
return unittest.FunctionTestCase(passing_test)
892
def test_add_not_supported(self):
893
"""Test the behaviour of invoking addNotSupported."""
894
class InstrumentedTestResult(tests.ExtendedTestResult):
895
def stopTestRun(self): pass
896
def report_tests_starting(self): pass
897
def report_unsupported(self, test, feature):
898
self._call = test, feature
899
result = InstrumentedTestResult(None, None, None, None)
900
test = SampleTestCase('_test_pass')
901
feature = features.Feature()
902
result.startTest(test)
903
result.addNotSupported(test, feature)
904
# it should invoke 'report_unsupported'.
905
self.assertEqual(2, len(result._call))
906
self.assertEqual(test, result._call[0])
907
self.assertEqual(feature, result._call[1])
908
# the result should be successful.
909
self.assertTrue(result.wasSuccessful())
910
# it should record the test against a count of tests not run due to
912
self.assertEqual(1, result.unsupported['Feature'])
913
# and invoking it again should increment that counter
914
result.addNotSupported(test, feature)
915
self.assertEqual(2, result.unsupported['Feature'])
917
def test_verbose_report_unsupported(self):
918
# verbose test output formatting
919
result_stream = StringIO()
920
result = breezy.tests.VerboseTestResult(
925
test = self.get_passing_test()
926
feature = features.Feature()
927
result.startTest(test)
928
prefix = len(result_stream.getvalue())
929
result.report_unsupported(test, feature)
930
output = result_stream.getvalue()[prefix:]
931
lines = output.splitlines()
932
# We don't check for the final '0ms' since it may fail on slow hosts
933
self.assertStartsWith(lines[0], 'NODEP')
934
self.assertEqual(lines[1],
935
" The feature 'Feature' is not available.")
937
def test_unavailable_exception(self):
    """An UnavailableFeature being raised should invoke addNotSupported."""
    class InstrumentedTestResult(tests.ExtendedTestResult):
        def stopTestRun(self): pass
        def report_tests_starting(self): pass
        def addNotSupported(self, test, feature):
            # Record the call so the outer test can inspect it.
            self._call = test, feature
    result = InstrumentedTestResult(None, None, None, None)
    feature = features.Feature()
    class Test(tests.TestCase):
        def test_function(self):
            raise tests.UnavailableFeature(feature)
    test = Test("test_function")
    # The test has to be run against the instrumented result, otherwise
    # result._call is never set and the assertions below cannot hold.
    test.run(result)
    # it should invoke 'addNotSupported'.
    self.assertEqual(2, len(result._call))
    self.assertEqual(test.id(), result._call[0].id())
    self.assertEqual(feature, result._call[1])
    # and not count as an error
    self.assertEqual(0, result.error_count)
958
def test_strict_with_unsupported_feature(self):
    # A run that hit an unsupported feature is not *strictly* successful.
    result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
    passing = self.get_passing_test()
    result.addNotSupported(passing, "Unsupported Feature")
    self.assertFalse(result.wasStrictlySuccessful())
    # No benchmark time is available for the test.
    self.assertEqual(None, result._extractBenchmarkTime(passing))
966
def test_strict_with_known_failure(self):
    # A run containing a known failure is not *strictly* successful.
    result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
    test = _get_test("test_xfail")
    # Run the example test so the known failure reaches the result.
    test.run(result)
    self.assertFalse(result.wasStrictlySuccessful())
    self.assertEqual(None, result._extractBenchmarkTime(test))
973
def test_strict_with_success(self):
    # A run with nothing but a success is strictly successful.
    result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
    passing = self.get_passing_test()
    result.addSuccess(passing)
    self.assertTrue(result.wasStrictlySuccessful())
    # No benchmark time is available for the test.
    self.assertEqual(None, result._extractBenchmarkTime(passing))
980
def test_startTests(self):
    """Starting the first test should trigger startTests."""
    class InstrumentedTestResult(tests.ExtendedTestResult):
        # Number of times startTests has been invoked.
        calls = 0

        def startTests(self):
            self.calls += 1

    result = InstrumentedTestResult(None, None, None, None)

    def test_function():
        pass

    test = unittest.FunctionTestCase(test_function)
    test.run(result)
    self.assertEqual(1, result.calls)
992
def test_startTests_only_once(self):
    """With multiple tests startTests should still only be called once"""
    class InstrumentedTestResult(tests.ExtendedTestResult):
        # Number of times startTests has been invoked.
        calls = 0

        def startTests(self):
            self.calls += 1

    result = InstrumentedTestResult(None, None, None, None)
    suite = unittest.TestSuite([
        unittest.FunctionTestCase(lambda: None),
        unittest.FunctionTestCase(lambda: None)])
    # Both tests go through the same result object.
    suite.run(result)
    self.assertEqual(1, result.calls)
    self.assertEqual(2, result.count)
1006
class TestRunner(tests.TestCase):
1008
def dummy_test(self):
    # Intentionally empty placeholder test method.
    pass
1011
def run_test_runner(self, testrunner, test):
    """Run suite in testrunner, saving global state and restoring it.

    This currently saves and restores:
    TestCaseInTempDir.TEST_ROOT

    There should be no tests in this file that use
    breezy.tests.TextTestRunner without using this convenience method,
    because of our use of global state.

    :param testrunner: The runner whose run() method is invoked.
    :param test: The test or suite handed to testrunner.run().
    :return: Whatever testrunner.run(test) returns.
    """
    old_root = tests.TestCaseInTempDir.TEST_ROOT
    try:
        tests.TestCaseInTempDir.TEST_ROOT = None
        return testrunner.run(test)
    finally:
        # Put the saved value back whether the run returns or raises.
        tests.TestCaseInTempDir.TEST_ROOT = old_root
1028
def test_known_failure_failed_run(self):
1029
# run a test that generates a known failure which should be printed in
1030
# the final output when real failures occur.
1031
class Test(tests.TestCase):
1032
def known_failure_test(self):
1033
self.expectFailure('failed', self.assertTrue, False)
1034
test = unittest.TestSuite()
1035
test.addTest(Test("known_failure_test"))
1037
raise AssertionError('foo')
1038
test.addTest(unittest.FunctionTestCase(failing_test))
1040
runner = tests.TextTestRunner(stream=stream)
1041
result = self.run_test_runner(runner, test)
1042
lines = stream.getvalue().splitlines()
1043
self.assertContainsRe(stream.getvalue(),
1044
'(?sm)^brz selftest.*$'
1046
'^======================================================================\n'
1047
'^FAIL: failing_test\n'
1048
'^----------------------------------------------------------------------\n'
1049
'Traceback \\(most recent call last\\):\n'
1050
' .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
1051
' raise AssertionError\\(\'foo\'\\)\n'
1053
'^----------------------------------------------------------------------\n'
1055
'FAILED \\(failures=1, known_failure_count=1\\)'
1058
def test_known_failure_ok_run(self):
1059
# run a test that generates a known failure which should be printed in
1061
class Test(tests.TestCase):
1062
def known_failure_test(self):
1063
self.knownFailure("Never works...")
1064
test = Test("known_failure_test")
1066
runner = tests.TextTestRunner(stream=stream)
1067
result = self.run_test_runner(runner, test)
1068
self.assertContainsRe(stream.getvalue(),
1071
'Ran 1 test in .*\n'
1073
'OK \\(known_failures=1\\)\n')
1075
def test_unexpected_success_bad(self):
1076
class Test(tests.TestCase):
1077
def test_truth(self):
1078
self.expectFailure("No absolute truth", self.assertTrue, True)
1079
runner = tests.TextTestRunner(stream=StringIO())
1080
result = self.run_test_runner(runner, Test("test_truth"))
1081
self.assertContainsRe(runner.stream.getvalue(),
1083
"FAIL: \\S+\.test_truth\n"
1086
"\\s*(?:Text attachment: )?reason"
1092
"Ran 1 test in .*\n"
1094
"FAILED \\(failures=1\\)\n\\Z")
1096
def test_result_decorator(self):
    # Result decorators handed to the runner must see the test events.
    calls = []

    class LoggingDecorator(ExtendedToOriginalDecorator):
        def startTest(self, test):
            ExtendedToOriginalDecorator.startTest(self, test)
            calls.append('start')

    test = unittest.FunctionTestCase(lambda: None)
    stream = StringIO()
    runner = tests.TextTestRunner(stream=stream,
                                  result_decorators=[LoggingDecorator])
    result = self.run_test_runner(runner, test)
    self.assertLength(1, calls)
1110
def test_skipped_test(self):
    # Run a test that raises TestSkipped and check that the run as a whole
    # is still reported as successful.
    # skipping_test must be hidden in here so it's not run as a real test
    class SkippingTest(tests.TestCase):
        def skipping_test(self):
            raise tests.TestSkipped('test intentionally skipped')

    runner = tests.TextTestRunner(stream=StringIO())
    outcome = self.run_test_runner(runner, SkippingTest("skipping_test"))
    self.assertTrue(outcome.wasSuccessful())
1122
def test_skipped_from_setup(self):
    # Ordered record of the hooks the nested test actually ran.
    calls = []

    class SkippedSetupTest(tests.TestCase):

        def setUp(self):
            calls.append('setUp')
            self.addCleanup(self.cleanup)
            raise tests.TestSkipped('skipped setup')

        def test_skip(self):
            self.fail('test reached')

        def cleanup(self):
            calls.append('cleanup')

    runner = tests.TextTestRunner(stream=StringIO())
    test = SkippedSetupTest('test_skip')
    result = self.run_test_runner(runner, test)
    self.assertTrue(result.wasSuccessful())
    # Check if cleanup was called the right number of times.
    self.assertEqual(['setUp', 'cleanup'], calls)
1144
def test_skipped_from_test(self):
    # Ordered record of the hooks the nested test actually ran.
    calls = []

    class SkippedTest(tests.TestCase):

        def setUp(self):
            super(SkippedTest, self).setUp()
            calls.append('setUp')
            self.addCleanup(self.cleanup)

        def test_skip(self):
            raise tests.TestSkipped('skipped test')

        def cleanup(self):
            calls.append('cleanup')

    runner = tests.TextTestRunner(stream=StringIO())
    test = SkippedTest('test_skip')
    result = self.run_test_runner(runner, test)
    self.assertTrue(result.wasSuccessful())
    # Check if cleanup was called the right number of times.
    self.assertEqual(['setUp', 'cleanup'], calls)
1166
def test_not_applicable(self):
    # run a test that is skipped because it's not applicable
    class Test(tests.TestCase):
        def not_applicable_test(self):
            raise tests.TestNotApplicable('this test never runs')

    # Verbose output is captured here so it can be matched below.
    out = StringIO()
    runner = tests.TextTestRunner(stream=out, verbosity=2)
    test = Test("not_applicable_test")
    result = self.run_test_runner(runner, test)
    self.log(out.getvalue())
    self.assertTrue(result.wasSuccessful())
    self.assertTrue(result.wasStrictlySuccessful())
    self.assertContainsRe(out.getvalue(),
                          r'(?m)not_applicable_test * N/A')
    self.assertContainsRe(out.getvalue(),
                          r'(?m)^ this test never runs')
1183
def test_unsupported_features_listed(self):
1184
"""When unsupported features are encountered they are detailed."""
1185
class Feature1(features.Feature):
1186
def _probe(self): return False
1187
class Feature2(features.Feature):
1188
def _probe(self): return False
1189
# create sample tests
1190
test1 = SampleTestCase('_test_pass')
1191
test1._test_needs_features = [Feature1()]
1192
test2 = SampleTestCase('_test_pass')
1193
test2._test_needs_features = [Feature2()]
1194
test = unittest.TestSuite()
1198
runner = tests.TextTestRunner(stream=stream)
1199
result = self.run_test_runner(runner, test)
1200
lines = stream.getvalue().splitlines()
1203
"Missing feature 'Feature1' skipped 1 tests.",
1204
"Missing feature 'Feature2' skipped 1 tests.",
1208
def test_verbose_test_count(self):
    """A verbose test run reports the right test count at the start"""
    suite = TestUtil.TestSuite([
        unittest.FunctionTestCase(lambda: None),
        unittest.FunctionTestCase(lambda: None)])
    self.assertEqual(suite.countTestCases(), 2)
    stream = StringIO()
    runner = tests.TextTestRunner(stream=stream, verbosity=2)
    # Need to use the CountingDecorator as that's what sets num_tests
    result = self.run_test_runner(runner, tests.CountingDecorator(suite))
    self.assertStartsWith(stream.getvalue(), "running 2 tests")
1220
def test_startTestRun(self):
    """run should call result.startTestRun()"""
    calls = []

    class LoggingDecorator(ExtendedToOriginalDecorator):
        def startTestRun(self):
            ExtendedToOriginalDecorator.startTestRun(self)
            calls.append('startTestRun')

    test = unittest.FunctionTestCase(lambda: None)
    stream = StringIO()
    runner = tests.TextTestRunner(stream=stream,
                                  result_decorators=[LoggingDecorator])
    result = self.run_test_runner(runner, test)
    self.assertLength(1, calls)
1234
def test_stopTestRun(self):
    """run should call result.stopTestRun()"""
    calls = []

    class LoggingDecorator(ExtendedToOriginalDecorator):
        def stopTestRun(self):
            ExtendedToOriginalDecorator.stopTestRun(self)
            calls.append('stopTestRun')

    test = unittest.FunctionTestCase(lambda: None)
    stream = StringIO()
    runner = tests.TextTestRunner(stream=stream,
                                  result_decorators=[LoggingDecorator])
    result = self.run_test_runner(runner, test)
    self.assertLength(1, calls)
1248
def test_unicode_test_output_on_ascii_stream(self):
1249
"""Showing results should always succeed even on an ascii console"""
1250
class FailureWithUnicode(tests.TestCase):
1251
def test_log_unicode(self):
1253
self.fail("Now print that log!")
1255
self.overrideAttr(osutils, "get_terminal_encoding",
1256
lambda trace=False: "ascii")
1257
result = self.run_test_runner(tests.TextTestRunner(stream=out),
1258
FailureWithUnicode("test_log_unicode"))
1259
self.assertContainsRe(out.getvalue(),
1260
"(?:Text attachment: )?log"
1262
"\d+\.\d+ \\\\u2606"
1266
class SampleTestCase(tests.TestCase):
    """Minimal test case used as a fixture by the tests in this module."""

    def _test_pass(self):
        # Leading underscore keeps this out of normal test discovery.
        pass
1271
class _TestException(Exception):
    """Exception type raised on purpose by the assertListRaises tests."""
    pass
1275
class TestTestCase(tests.TestCase):
1276
"""Tests that test the core breezy TestCase."""
1278
def test_assertLength_matches_empty(self):
    # An empty sequence has length 0.
    a_list = []
    self.assertLength(0, a_list)
1282
def test_assertLength_matches_nonempty(self):
    # Any three-element sequence satisfies the assertion.
    a_list = [1, 2, 3]
    self.assertLength(3, a_list)
1286
def test_assertLength_fails_different(self):
    # A sequence whose length is not the wanted one must fail.
    a_list = []
    self.assertRaises(AssertionError, self.assertLength, 1, a_list)
1290
def test_assertLength_shows_sequence_in_failure(self):
    # The list is fixed by the expected message below.
    a_list = [1, 2, 3]
    exception = self.assertRaises(AssertionError, self.assertLength, 2,
                                  a_list)
    # NOTE(review): the original second argument was lost; the first
    # exception argument is presumed to carry the message -- confirm.
    self.assertEqual('Incorrect length: wanted 2, got 3 for [1, 2, 3]',
                     exception.args[0])
1297
def test_base_setUp_not_called_causes_failure(self):
    class TestCaseWithBrokenSetUp(tests.TestCase):
        def setUp(self):
            pass  # does not call TestCase.setUp

        def test_foo(self):
            pass

    test = TestCaseWithBrokenSetUp('test_foo')
    result = unittest.TestResult()
    test.run(result)
    # The broken setUp must be reported, while the test still counts as run.
    self.assertFalse(result.wasSuccessful())
    self.assertEqual(1, result.testsRun)
1309
def test_base_tearDown_not_called_causes_failure(self):
    class TestCaseWithBrokenTearDown(tests.TestCase):
        def tearDown(self):
            pass  # does not call TestCase.tearDown

        def test_foo(self):
            pass

    test = TestCaseWithBrokenTearDown('test_foo')
    result = unittest.TestResult()
    test.run(result)
    # The broken tearDown must be reported, while the test still counts as
    # run.
    self.assertFalse(result.wasSuccessful())
    self.assertEqual(1, result.testsRun)
1321
def test_debug_flags_sanitised(self):
    """The breezy debug flags should be sanitised by setUp."""
    if 'allow_debug' in tests.selftest_debug_flags:
        raise tests.TestNotApplicable(
            '-Eallow_debug option prevents debug flag sanitisation')
    # we could set something and run a test that will check
    # it gets santised, but this is probably sufficient for now:
    # if someone runs the test with -Dsomething it will error.
    flags = set()
    if self._lock_check_thorough:
        # Thorough lock checking adds this one flag.
        flags.add('strict_locks')
    self.assertEqual(flags, breezy.debug.debug_flags)
1334
def change_selftest_debug_flags(self, new_flags):
    """Override tests.selftest_debug_flags with a set built from new_flags.

    The override is installed through overrideAttr.
    """
    replacement = set(new_flags)
    self.overrideAttr(tests, 'selftest_debug_flags', replacement)
1337
def test_allow_debug_flag(self):
    """The -Eallow_debug flag prevents breezy.debug.debug_flags from being
    sanitised (i.e. cleared) before running a test.
    """
    self.change_selftest_debug_flags({'allow_debug'})
    breezy.debug.debug_flags = {'a-flag'}

    class TestThatRecordsFlags(tests.TestCase):
        def test_foo(nested_self):
            # Store on the *outer* test so it can be checked afterwards.
            self.flags = set(breezy.debug.debug_flags)

    test = TestThatRecordsFlags('test_foo')
    test.run(self.make_test_result())
    # The flag set before the run must survive into the nested test.
    flags = {'a-flag'}
    if 'disable_lock_checks' not in tests.selftest_debug_flags:
        flags.add('strict_locks')
    self.assertEqual(flags, self.flags)
1353
def test_disable_lock_checks(self):
    """The -Edisable_lock_checks flag disables thorough checks."""
    class TestThatRecordsFlags(tests.TestCase):
        def test_foo(nested_self):
            # Both values are stored on the *outer* test for inspection.
            self.flags = set(breezy.debug.debug_flags)
            self.test_lock_check_thorough = nested_self._lock_check_thorough

    # With no selftest flags set, lock checking is strict and thorough.
    self.change_selftest_debug_flags(set())
    TestThatRecordsFlags('test_foo').run(self.make_test_result())
    self.assertTrue(self.test_lock_check_thorough)
    self.assertEqual({'strict_locks'}, self.flags)
    # Now set the disable_lock_checks flag, and show that this changed.
    self.change_selftest_debug_flags({'disable_lock_checks'})
    TestThatRecordsFlags('test_foo').run(self.make_test_result())
    self.assertFalse(self.test_lock_check_thorough)
    self.assertEqual(set(), self.flags)
1373
def test_this_fails_strict_lock_check(self):
    class TestThatRecordsFlags(tests.TestCase):
        def test_foo(nested_self):
            # Snapshot the debug flags on either side of the call; all three
            # statements act on the *outer* test.
            self.flags1 = set(breezy.debug.debug_flags)
            self.thisFailsStrictLockCheck()
            self.flags2 = set(breezy.debug.debug_flags)

    # Make sure lock checking is active
    self.change_selftest_debug_flags(set())
    TestThatRecordsFlags('test_foo').run(self.make_test_result())
    self.assertEqual({'strict_locks'}, self.flags1)
    self.assertEqual(set(), self.flags2)
1386
def test_debug_flags_restored(self):
    """The breezy debug flags should be restored to their original state
    after the test was run, even if allow_debug is set.
    """
    self.change_selftest_debug_flags({'allow_debug'})
    # Now run a test that modifies debug.debug_flags.
    breezy.debug.debug_flags = {'original-state'}

    class TestThatModifiesFlags(tests.TestCase):
        def test_foo(self):
            breezy.debug.debug_flags = {'modified'}

    test = TestThatModifiesFlags('test_foo')
    test.run(self.make_test_result())
    self.assertEqual({'original-state'}, breezy.debug.debug_flags)
1400
def make_test_result(self):
    """Build a TextTestResult whose output goes to a fresh StringIO."""
    sink = StringIO()
    return tests.TextTestResult(sink, descriptions=0, verbosity=1)
1404
def inner_test(self):
1405
# the inner child test
1408
def outer_child(self):
1409
# the outer child test
1411
self.inner_test = TestTestCase("inner_child")
1412
result = self.make_test_result()
1413
self.inner_test.run(result)
1414
note("outer finish")
1415
self.addCleanup(osutils.delete_any, self._log_file_name)
1417
def test_trace_nesting(self):
    # Each test case must nest its trace facility correctly.  We check
    # this by running a test case (A) by hand: it sets up its own log,
    # writes to it, runs a child case (B) that logs independently, and then
    # writes a trailer of its own.
    # Two levels of children are used so that the state of the logs after
    # the outer child finishes can be verified; a bad clean up routine in
    # tearDown would otherwise show up as a fault in this test rather than
    # inside the outer child test.
    trace_before = breezy.trace._trace_file
    outer = TestTestCase("outer_child")
    outer.run(self.make_test_result())
    # Running the nested tests must leave the global trace file untouched.
    self.assertEqual(trace_before, breezy.trace._trace_file)
1435
def method_that_times_a_bit_twice(self):
    # Two timed calls, so the benchmark time has to be aggregated.
    for _ in (1, 2):
        self.time(time.sleep, 0.007)
1440
def test_time_creates_benchmark_in_result(self):
1441
"""Test that the TestCase.time() method accumulates a benchmark time."""
1442
sample_test = TestTestCase("method_that_times_a_bit_twice")
1443
output_stream = StringIO()
1444
result = breezy.tests.VerboseTestResult(
1448
sample_test.run(result)
1449
self.assertContainsRe(
1450
output_stream.getvalue(),
1453
def test_hooks_sanitised(self):
    """The breezy hooks should be sanitised by setUp."""
    # Note this test won't fail with hooks that the core library doesn't
    # use - but it trigger with a plugin that adds hooks, so its still a
    # useful warning in that case.
    self.assertEqual(breezy.branch.BranchHooks(), breezy.branch.Branch.hooks)
    self.assertEqual(
        breezy.smart.server.SmartServerHooks(),
        breezy.smart.server.SmartTCPServer.hooks)
    self.assertEqual(
        breezy.commands.CommandHooks(), breezy.commands.Command.hooks)
1465
def test__gather_lsprof_in_benchmarks(self):
    """When _gather_lsprof_in_benchmarks is on, accumulate profile data.

    Each self.time() call is individually and separately profiled.
    """
    self.requireFeature(features.lsprof_feature)
    # Setting the attribute on the instance shadows the class member, so
    # nothing has to be undone afterwards.
    self._gather_lsprof_in_benchmarks = True
    self.time(time.sleep, 0.000)
    self.time(time.sleep, 0.003)
    self.assertEqual(2, len(self._benchcalls))
    first, second = self._benchcalls[0], self._benchcalls[1]
    # Each entry pairs the (callable, args, kwargs) triple with its stats.
    self.assertEqual((time.sleep, (0.000,), {}), first[0])
    self.assertEqual((time.sleep, (0.003,), {}), second[0])
    self.assertIsInstance(first[1], breezy.lsprof.Stats)
    self.assertIsInstance(second[1], breezy.lsprof.Stats)
    # Empty the list in place.
    del self._benchcalls[:]
1483
def test_knownFailure(self):
    """Self.knownFailure() should raise a KnownFailure exception."""
    reason = "A Failure"
    self.assertRaises(tests.KnownFailure, self.knownFailure, reason)
1487
def test_open_bzrdir_safe_roots(self):
    # Even a memory transport must refuse to open a control dir while its
    # url has not been permitted.
    # The server is set up by hand here rather than through test helpers.
    server = memory.MemoryServer()
    server.start_server()
    self.addCleanup(server.stop_server)
    t = transport.get_transport_from_url(server.get_url())
    controldir.ControlDir.create(t.base)
    open_from_transport = controldir.ControlDir.open_from_transport
    self.assertRaises(errors.BzrError, open_from_transport, t)
    # But if we declare this as safe, we can open the bzrdir.
    self.permit_url(t.base)
    self._bzr_selftest_roots.append(t.base)
    open_from_transport(t)
1504
def test_requireFeature_available(self):
    """self.requireFeature(available) is a no-op."""
    class Available(features.Feature):
        def _probe(self):
            return True

    # Must simply return without raising.
    self.requireFeature(Available())
1511
def test_requireFeature_unavailable(self):
    """self.requireFeature(unavailable) raises UnavailableFeature."""
    class Unavailable(features.Feature):
        def _probe(self):
            return False

    absent = Unavailable()
    self.assertRaises(tests.UnavailableFeature, self.requireFeature, absent)
1519
def test_run_no_parameters(self):
    test = SampleTestCase('_test_pass')
    # run() must work without an explicit result object.
    test.run()
1523
def test_run_enabled_unittest_result(self):
    """Test we revert to regular behaviour when the test is enabled."""
    test = SampleTestCase('_test_pass')

    class EnabledFeature(object):
        def available(self):
            return True

    test._test_needs_features = [EnabledFeature()]
    result = unittest.TestResult()
    test.run(result)
    self.assertEqual(1, result.testsRun)
    self.assertEqual([], result.errors)
    self.assertEqual([], result.failures)
1536
def test_run_disabled_unittest_result(self):
    """Test our compatability for disabled tests with unittest results."""
    test = SampleTestCase('_test_pass')

    class DisabledFeature(object):
        def available(self):
            return False

    test._test_needs_features = [DisabledFeature()]
    result = unittest.TestResult()
    test.run(result)
    # A plain unittest result still counts the test, with no error or
    # failure recorded.
    self.assertEqual(1, result.testsRun)
    self.assertEqual([], result.errors)
    self.assertEqual([], result.failures)
1549
def test_run_disabled_supporting_result(self):
    """Test disabled tests behaviour with support aware results."""
    test = SampleTestCase('_test_pass')

    class DisabledFeature(object):
        def __eq__(self, other):
            return isinstance(other, DisabledFeature)

        def available(self):
            return False

    the_feature = DisabledFeature()
    test._test_needs_features = [the_feature]

    class InstrumentedTestResult(unittest.TestResult):
        def __init__(self):
            unittest.TestResult.__init__(self)
            # Ordered log of the result methods invoked by the run.
            self.calls = []

        def startTest(self, test):
            self.calls.append(('startTest', test))

        def stopTest(self, test):
            self.calls.append(('stopTest', test))

        def addNotSupported(self, test, feature):
            self.calls.append(('addNotSupported', test, feature))

    result = InstrumentedTestResult()
    test.run(result)
    case = result.calls[0][1]
    # NOTE(review): the tail of the expected list was lost from this copy;
    # the closing stopTest entry is presumed -- confirm against upstream.
    self.assertEqual([
        ('startTest', case),
        ('addNotSupported', case, the_feature),
        ('stopTest', case),
        ],
        result.calls)
1579
def test_start_server_registers_url(self):
    server = memory.MemoryServer()
    # A little strict, but unlikely to be changed soon.
    self.assertEqual([], self._bzr_selftest_roots)
    self.start_server(server)
    # Starting the server must have added its url to the roots list.
    expected_urls = [server.get_url()]
    self.assertSubset(expected_urls, self._bzr_selftest_roots)
1587
def test_assert_list_raises_on_generator(self):
    def generator_which_will_raise():
        # This will not raise until after the first yield
        yield 1
        raise _TestException()

    # The exact exception class is accepted ...
    e = self.assertListRaises(_TestException, generator_which_will_raise)
    self.assertIsInstance(e, _TestException)

    # ... and so is a base class of it.
    e = self.assertListRaises(Exception, generator_which_will_raise)
    self.assertIsInstance(e, _TestException)
1599
def test_assert_list_raises_on_plain(self):
    def plain_exception():
        raise _TestException()

    # Try the exact class first, then a base class; either way the raised
    # instance is handed back.
    caught = self.assertListRaises(_TestException, plain_exception)
    self.assertIsInstance(caught, _TestException)

    caught = self.assertListRaises(Exception, plain_exception)
    self.assertIsInstance(caught, _TestException)
1610
def test_assert_list_raises_assert_wrong_exception(self):
    class _NotTestException(Exception):
        pass

    def wrong_exception():
        raise _NotTestException()

    def wrong_exception_generator():
        # Yield first so the exception only surfaces during iteration.
        yield 1
        yield 2
        raise _NotTestException()

    # Wrong exceptions are not intercepted
    self.assertRaises(_NotTestException,
        self.assertListRaises, _TestException, wrong_exception)
    self.assertRaises(_NotTestException,
        self.assertListRaises, _TestException, wrong_exception_generator)
1628
def test_assert_list_raises_no_exception(self):
    # Neither callable raises: one returns a list, the other is a
    # generator that runs to completion.
    def success():
        return []

    def success_generator():
        yield 1
        yield 2

    self.assertRaises(AssertionError,
        self.assertListRaises, _TestException, success)

    self.assertRaises(AssertionError,
        self.assertListRaises, _TestException, success_generator)
1642
def _run_successful_test(self, test):
    """Run test against a fresh testtools.TestResult and require success.

    :param test: The test case to run.
    """
    result = testtools.TestResult()
    test.run(result)
    self.assertTrue(result.wasSuccessful())
1648
def test_overrideAttr_without_value(self):
    self.test_attr = 'original'  # Define a test attribute
    obj = self  # Make 'obj' visible to the embedded test

    class Test(tests.TestCase):

        def setUp(self):
            super(Test, self).setUp()
            self.orig = self.overrideAttr(obj, 'test_attr')

        def test_value(self):
            self.assertEqual('original', self.orig)
            self.assertEqual('original', obj.test_attr)
            obj.test_attr = 'modified'
            self.assertEqual('modified', obj.test_attr)

    self._run_successful_test(Test('test_value'))
    # The change made inside the nested test has been undone.
    self.assertEqual('original', obj.test_attr)
1666
def test_overrideAttr_with_value(self):
    self.test_attr = 'original'  # Define a test attribute
    obj = self  # Make 'obj' visible to the embedded test

    class Test(tests.TestCase):

        def setUp(self):
            super(Test, self).setUp()
            self.orig = self.overrideAttr(obj, 'test_attr', new='modified')

        def test_value(self):
            self.assertEqual('original', self.orig)
            self.assertEqual('modified', obj.test_attr)

    self._run_successful_test(Test('test_value'))
    # The original value is back once the nested test has finished.
    self.assertEqual('original', obj.test_attr)
1682
def test_overrideAttr_with_no_existing_value_and_value(self):
    # Do not define the test_attribute
    obj = self  # Make 'obj' visible to the embedded test

    class Test(tests.TestCase):

        def setUp(self):
            tests.TestCase.setUp(self)
            self.orig = self.overrideAttr(obj, 'test_attr', new='modified')

        def test_value(self):
            self.assertEqual(tests._unitialized_attr, self.orig)
            self.assertEqual('modified', obj.test_attr)

    self._run_successful_test(Test('test_value'))
    # The attribute did not exist before, so it must be gone again.
    self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1698
def test_overrideAttr_with_no_existing_value_and_no_value(self):
    # Do not define the test_attribute
    obj = self  # Make 'obj' visible to the embedded test

    class Test(tests.TestCase):

        def setUp(self):
            tests.TestCase.setUp(self)
            self.orig = self.overrideAttr(obj, 'test_attr')

        def test_value(self):
            self.assertEqual(tests._unitialized_attr, self.orig)
            self.assertRaises(AttributeError, getattr, obj, 'test_attr')

    self._run_successful_test(Test('test_value'))
    # Still undefined after the nested test has finished.
    self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1714
def test_recordCalls(self):
    from breezy.tests import test_selftest
    calls = self.recordCalls(
        test_selftest, '_add_numbers')
    # NOTE(review): the expected value was lost from this copy; 12 presumes
    # _add_numbers returns the sum of its arguments -- confirm.
    self.assertEqual(test_selftest._add_numbers(2, 10),
                     12)
    # The single call above was recorded as (args, kwargs).
    self.assertEqual(calls, [((2, 10), {})])
1723
def _add_numbers(a, b):
    # Target function for test_recordCalls.
    # NOTE(review): the body was lost from this copy; addition is presumed
    # from the name -- confirm against upstream.
    return a + b
1727
class _MissingFeature(features.Feature):
    # NOTE(review): the class body was lost from this copy; a probe that
    # always reports the feature as absent is presumed from the name --
    # confirm against upstream.
    def _probe(self):
        return False
1730
missing_feature = _MissingFeature()
1733
def _get_test(name):
    """Build one of the example tests, selected by method name.

    The example class lives inside this function so that its methods are
    not picked up and run as part of the normal test suite.
    """

    class ExampleTests(tests.TestCase):

        def test_fail(self):
            mutter('this was a failing test')
            self.fail('this test will fail')

        def test_error(self):
            mutter('this test errored')
            raise RuntimeError('gotcha')

        def test_missing_feature(self):
            mutter('missing the feature')
            self.requireFeature(missing_feature)

        def test_skip(self):
            mutter('this test will be skipped')
            raise tests.TestSkipped('reason')

        def test_success(self):
            mutter('this test succeeds')

        def test_xfail(self):
            mutter('test with expected failure')
            self.knownFailure('this_fails')

        def test_unexpected_success(self):
            mutter('test with unexpected success')
            self.expectFailure('should_fail', lambda: None)

    example = ExampleTests(name)
    return example
1772
def _get_skip_reasons(result):
    # GZ 2017-06-06: Newer testtools doesn't have this, uses detail instead
    reasons = result.skip_reasons
    return reasons
1777
class TestTestCaseLogDetails(tests.TestCase):

    def _run_test(self, test_name):
        """Run the named example test and return its testtools.TestResult."""
        test = _get_test(test_name)
        result = testtools.TestResult()
        test.run(result)
        return result

    def test_fail_has_log(self):
        result = self._run_test('test_fail')
        self.assertEqual(1, len(result.failures))
        result_content = result.failures[0][1]
        self.assertContainsRe(result_content,
            '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertContainsRe(result_content, 'this was a failing test')

    def test_error_has_log(self):
        result = self._run_test('test_error')
        self.assertEqual(1, len(result.errors))
        result_content = result.errors[0][1]
        self.assertContainsRe(result_content,
            '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertContainsRe(result_content, 'this test errored')

    def test_skip_has_no_log(self):
        result = self._run_test('test_skip')
        reasons = _get_skip_reasons(result)
        self.assertEqual({'reason'}, set(reasons))
        skips = reasons['reason']
        self.assertEqual(1, len(skips))
        # The only skipped test is the one whose details are inspected.
        test = skips[0]
        self.assertFalse('log' in test.getDetails())

    def test_missing_feature_has_no_log(self):
        # testtools doesn't know about addNotSupported, so it just gets
        # considered as a skip
        result = self._run_test('test_missing_feature')
        reasons = _get_skip_reasons(result)
        self.assertEqual({missing_feature}, set(reasons))
        skips = reasons[missing_feature]
        self.assertEqual(1, len(skips))
        # The only skipped test is the one whose details are inspected.
        test = skips[0]
        self.assertFalse('log' in test.getDetails())

    def test_xfail_has_no_log(self):
        result = self._run_test('test_xfail')
        self.assertEqual(1, len(result.expectedFailures))
        result_content = result.expectedFailures[0][1]
        self.assertNotContainsRe(result_content,
            '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertNotContainsRe(result_content, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        result = self._run_test('test_unexpected_success')
        self.assertEqual(1, len(result.unexpectedSuccesses))
        # Inconsistency, unexpectedSuccesses is a list of tests,
        # expectedFailures is a list of reasons?
        test = result.unexpectedSuccesses[0]
        details = test.getDetails()
        self.assertTrue('log' in details)
1839
class TestTestCloning(tests.TestCase):
    """Tests that test cloning of TestCases (as used by multiply_tests)."""

    def test_cloned_testcase_does_not_share_details(self):
        """A TestCase cloned with clone_test does not share mutable attributes
        such as details or cleanups.
        """
        class Test(tests.TestCase):
            def test_foo(self):
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))

        orig_test = Test('test_foo')
        # Clone before running, so only the original gains the detail.
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
        orig_test.run(unittest.TestResult())
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
        self.assertEqual(None, cloned_test.getDetails().get('foo'))

    def test_double_apply_scenario_preserves_first_scenario(self):
        """Applying two levels of scenarios to a test preserves the attributes
        added by both scenarios.
        """
        class Test(tests.TestCase):
            def test_foo(self):
                pass

        test = Test('test_foo')
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
        all_tests = list(tests.iter_suite_tests(suite))
        # Two x scenarios times two y scenarios.
        self.assertLength(4, all_tests)
        all_xys = sorted((t.x, t.y) for t in all_tests)
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1873
# NB: Don't delete this; it's not actually from 0.11!
1874
@deprecated_function(deprecated_in((0, 11, 0)))
1875
def sample_deprecated_function():
1876
"""A deprecated function to test applyDeprecated with."""
1880
def sample_undeprecated_function(a_param):
    """Plain, non-deprecated function used by the applyDeprecated tests."""
1884
class ApplyDeprecatedHelper(object):
1885
"""A helper class for ApplyDeprecated tests."""
1887
@deprecated_method(deprecated_in((0, 11, 0)))
1888
def sample_deprecated_method(self, param_one):
1889
"""A deprecated method for testing with."""
1892
def sample_normal_method(self):
1893
"""A undeprecated method."""
1895
@deprecated_method(deprecated_in((0, 10, 0)))
1896
def sample_nested_deprecation(self):
1897
return sample_deprecated_function()
1900
class TestExtraAssertions(tests.TestCase):
    """Tests for new test assertions in breezy test suite"""

    def test_assert_isinstance(self):
        """assertIsInstance passes on a match and reports both types on failure."""
        self.assertIsInstance(2, int)
        self.assertIsInstance(u'', basestring)
        e = self.assertRaises(AssertionError, self.assertIsInstance, None, int)
        self.assertEqual(str(e),
            "None is an instance of <type 'NoneType'> rather than <type 'int'>")
        self.assertRaises(AssertionError, self.assertIsInstance, 23.3, int)
        # An optional trailing message is appended after a colon.
        e = self.assertRaises(AssertionError,
            self.assertIsInstance, None, int, "it's just not")
        self.assertEqual(str(e),
            "None is an instance of <type 'NoneType'> rather than <type 'int'>"
            ": it's just not")

    def test_assertEndsWith(self):
        """assertEndsWith passes on a real suffix and fails otherwise."""
        self.assertEndsWith('foo', 'oo')
        self.assertRaises(AssertionError, self.assertEndsWith, 'o', 'oo')

    def test_assertEqualDiff(self):
        """assertEqualDiff reports which string lacks a final newline."""
        e = self.assertRaises(AssertionError,
                              self.assertEqualDiff, '', '\n')
        self.assertEqual(str(e),
                         # Don't blink ! The '+' applies to the second string
                         'first string is missing a final newline.\n+ \n')
        e = self.assertRaises(AssertionError,
                              self.assertEqualDiff, '\n', '')
        self.assertEqual(str(e),
                         # Don't blink ! The '-' applies to the second string
                         'second string is missing a final newline.\n- \n')
1933
class TestDeprecations(tests.TestCase):
    """Tests for the applyDeprecated and callDeprecated test helpers."""

    def test_applyDeprecated_not_deprecated(self):
        """applyDeprecated only accepts callables deprecated in the given version."""
        sample_object = ApplyDeprecatedHelper()
        # calling an undeprecated callable raises an assertion
        self.assertRaises(AssertionError, self.applyDeprecated,
            deprecated_in((0, 11, 0)),
            sample_object.sample_normal_method)
        self.assertRaises(AssertionError, self.applyDeprecated,
            deprecated_in((0, 11, 0)),
            sample_undeprecated_function, "a param value")
        # calling a deprecated callable (function or method) with the wrong
        # expected deprecation fails.
        self.assertRaises(AssertionError, self.applyDeprecated,
            deprecated_in((0, 10, 0)),
            sample_object.sample_deprecated_method, "a param value")
        self.assertRaises(AssertionError, self.applyDeprecated,
            deprecated_in((0, 10, 0)),
            sample_deprecated_function)
        # calling a deprecated callable (function or method) with the right
        # expected deprecation returns the function's result.
        self.assertEqual("a param value",
            self.applyDeprecated(deprecated_in((0, 11, 0)),
            sample_object.sample_deprecated_method, "a param value"))
        self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 11, 0)),
            sample_deprecated_function))
        # calling a nested deprecation with the wrong deprecation version
        # fails even if a deeper nested function was deprecated with the
        # supplied version.
        self.assertRaises(AssertionError, self.applyDeprecated,
            deprecated_in((0, 11, 0)), sample_object.sample_nested_deprecation)
        # calling a nested deprecation with the right deprecation value
        # returns the call's result.
        self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 10, 0)),
            sample_object.sample_nested_deprecation))

    def test_callDeprecated(self):
        """callDeprecated checks the emitted warnings and returns the result."""
        def testfunc(be_deprecated, result=None):
            if be_deprecated is True:
                symbol_versioning.warn('i am deprecated', DeprecationWarning,
                                       stacklevel=1)
            return result
        result = self.callDeprecated(['i am deprecated'], testfunc, True)
        self.assertIs(None, result)
        result = self.callDeprecated([], testfunc, False, 'result')
        self.assertEqual('result', result)
        # Keyword arguments are forwarded to the callable as well.
        self.callDeprecated(['i am deprecated'], testfunc, be_deprecated=True)
        self.callDeprecated([], testfunc, be_deprecated=False)
1983
class TestWarningTests(tests.TestCase):
    """Tests for calling methods that raise warnings."""

    def test_callCatchWarnings(self):
        """callCatchWarnings returns the caught warnings and the call result."""
        def meth(a, b):
            warnings.warn("this is your last warning")
            return a + b
        wlist, result = self.callCatchWarnings(meth, 1, 2)
        self.assertEqual(3, result)
        # would like just to compare them, but UserWarning doesn't implement
        # __eq__ well
        # Unpacking a one-element sequence also checks exactly one warning
        # was caught.
        w0, = wlist
        self.assertIsInstance(w0, UserWarning)
        self.assertEqual("this is your last warning", str(w0))
1999
class TestConvenienceMakers(tests.TestCaseWithTransport):
    """Test for the make_* convenience functions."""

    def test_make_branch_and_tree_with_format(self):
        # we should be able to supply a format to make_branch_and_tree
        self.make_branch_and_tree(
            'a', format=breezy.bzr.bzrdir.BzrDirMetaFormat1())
        self.assertIsInstance(breezy.controldir.ControlDir.open('a')._format,
                              breezy.bzr.bzrdir.BzrDirMetaFormat1)

    def test_make_branch_and_memory_tree(self):
        # we should be able to get a new branch and a mutable tree from
        # TestCaseWithTransport
        tree = self.make_branch_and_memory_tree('a')
        self.assertIsInstance(tree, breezy.memorytree.MemoryTree)

    def test_make_tree_for_local_vfs_backed_transport(self):
        # make_branch_and_tree has to use local branch and repositories
        # when the vfs transport and local disk are colocated, even if
        # a different transport is in use for url generation.
        self.transport_server = test_server.FakeVFATServer
        self.assertFalse(self.get_url('t1').startswith('file://'))
        tree = self.make_branch_and_tree('t1')
        base = tree.controldir.root_transport.base
        self.assertStartsWith(base, 'file://')
        # Tree, branch and repository must all share one root transport.
        self.assertEqual(tree.controldir.root_transport,
                         tree.branch.controldir.root_transport)
        self.assertEqual(tree.controldir.root_transport,
                         tree.branch.repository.controldir.root_transport)
2029
class SelfTestHelper(object):
    """Mixin giving test cases a way to run selftest and capture its output."""

    def run_selftest(self, **kwargs):
        """Run selftest returning its output.

        :param kwargs: Passed straight through to tests.selftest.
        :return: A StringIO holding the selftest output, rewound to the
            start.
        """
        output = StringIO()
        # Remember the module-level state so it is restored in the finally
        # clause once the nested selftest has run.
        old_transport = breezy.tests.default_transport
        old_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
        tests.TestCaseWithMemoryTransport.TEST_ROOT = None
        try:
            self.assertEqual(True, tests.selftest(stream=output, **kwargs))
        finally:
            breezy.tests.default_transport = old_transport
            tests.TestCaseWithMemoryTransport.TEST_ROOT = old_root
        output.seek(0)
        return output
2046
class TestSelftest(tests.TestCase, SelfTestHelper):
    """Tests of breezy.tests.selftest."""

    def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(self):
        factory_called = []
        def factory():
            factory_called.append(True)
            return TestUtil.TestSuite()
        out = StringIO()
        err = StringIO()
        self.apply_redirected(out, err, None, breezy.tests.selftest,
            test_suite_factory=factory)
        self.assertEqual([True], factory_called)

    def factory(self):
        """A test suite factory."""
        class Test(tests.TestCase):
            def a(self):
                pass
            def b(self):
                pass
            def c(self):
                pass
        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])

    def test_list_only(self):
        output = self.run_selftest(test_suite_factory=self.factory,
            list_only=True)
        self.assertLength(3, output.readlines())

    def test_list_only_filtered(self):
        output = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, pattern="Test.b")
        self.assertEndsWith(output.getvalue(), "Test.b\n")
        self.assertLength(1, output.readlines())

    def test_list_only_excludes(self):
        output = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, exclude_pattern="Test.b")
        # The haystack comes first and the pattern second; with the two
        # swapped the assertion could never fail.
        self.assertNotContainsRe(output.getvalue(), "Test.b")
        self.assertLength(2, output.readlines())

    def test_lsprof_tests(self):
        self.requireFeature(features.lsprof_feature)
        results = []
        # A minimal stand-in for a test; its first argument is named 'test'
        # instead of 'self'.
        class Test(object):
            def __call__(test, result):
                test.run(result)
            def run(test, result):
                results.append(result)
            def countTestCases(self):
                return 1
        self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
        self.assertLength(1, results)
        self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)

    def test_random(self):
        # test randomising by listing a number of tests.
        output_123 = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, random_seed="123")
        output_234 = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, random_seed="234")
        # Compare the captured text: two distinct StringIO objects never
        # compare equal, whatever they contain.
        self.assertNotEqual(output_123.getvalue(), output_234.getvalue())
        # "Randomizing test order..\n\n
        self.assertLength(5, output_123.readlines())
        self.assertLength(5, output_234.readlines())

    def test_random_reuse_is_same_order(self):
        # the same seed must list the tests in the same order.
        expected = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, random_seed="123")
        repeated = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, random_seed="123")
        self.assertEqual(expected.getvalue(), repeated.getvalue())

    def test_runner_class(self):
        self.requireFeature(features.subunit)
        from subunit import ProtocolTestCase
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
            test_suite_factory=self.factory)
        # Replay the captured subunit stream into a plain result and count
        # the tests it reports.
        test = ProtocolTestCase(stream)
        result = unittest.TestResult()
        test.run(result)
        self.assertEqual(3, result.testsRun)

    def test_starting_with_single_argument(self):
        output = self.run_selftest(test_suite_factory=self.factory,
            starting_with=['breezy.tests.test_selftest.Test.a'],
            list_only=True)
        self.assertEqual('breezy.tests.test_selftest.Test.a\n',
            output.getvalue())

    def test_starting_with_multiple_argument(self):
        output = self.run_selftest(test_suite_factory=self.factory,
            starting_with=['breezy.tests.test_selftest.Test.a',
                'breezy.tests.test_selftest.Test.b'],
            list_only=True)
        self.assertEqual('breezy.tests.test_selftest.Test.a\n'
            'breezy.tests.test_selftest.Test.b\n',
            output.getvalue())

    def check_transport_set(self, transport_server):
        """Check selftest installs transport_server as the default transport.

        :param transport_server: The server class passed to selftest via
            its transport argument.
        """
        captured_transport = []
        def seen_transport(a_transport):
            captured_transport.append(a_transport)
        class Capture(tests.TestCase):
            def a(self):
                seen_transport(breezy.tests.default_transport)
        def factory():
            return TestUtil.TestSuite([Capture("a")])
        self.run_selftest(transport=transport_server, test_suite_factory=factory)
        self.assertEqual(transport_server, captured_transport[0])

    def test_transport_sftp(self):
        self.requireFeature(features.paramiko)
        from breezy.tests import stub_sftp
        self.check_transport_set(stub_sftp.SFTPAbsoluteServer)

    def test_transport_memory(self):
        self.check_transport_set(memory.MemoryServer)
2168
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
    """Tests for the load_list argument of selftest."""
    # Does IO: reads test.list

    def test_load_list(self):
        # Provide a list with one test - this test.
        test_id_line = '%s\n' % self.id()
        self.build_tree_contents([('test.list', test_id_line)])
        # And generate a list of the tests in the suite.
        stream = self.run_selftest(load_list='test.list', list_only=True)
        self.assertEqual(test_id_line, stream.getvalue())

    def test_load_unknown(self):
        # Name a list file that does not exist; selftest must report it
        # rather than silently running nothing.
        self.assertRaises(errors.NoSuchFile, self.run_selftest,
            load_list='missing file name', list_only=True)
2186
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
    """Check which test outcomes carry a log in the subunit stream."""

    _test_needs_features = [features.subunit]

    def run_subunit_stream(self, test_name):
        """Run one sample test through the subunit runner.

        :param test_name: Name handed to _get_test to build the test to run.
        :return: A (content, result) tuple: the raw stream text and a
            testtools.TestResult populated by replaying that stream.
        """
        from subunit import ProtocolTestCase
        def factory():
            return TestUtil.TestSuite([_get_test(test_name)])
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
            test_suite_factory=factory)
        test = ProtocolTestCase(stream)
        result = testtools.TestResult()
        test.run(result)
        content = stream.getvalue()
        return content, result

    def test_fail_has_log(self):
        content, result = self.run_subunit_stream('test_fail')
        self.assertEqual(1, len(result.failures))
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'this test will fail')

    def test_error_has_log(self):
        content, result = self.run_subunit_stream('test_error')
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'this test errored')

    def test_skip_has_no_log(self):
        content, result = self.run_subunit_stream('test_skip')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'this test will be skipped')
        reasons = _get_skip_reasons(result)
        self.assertEqual({'reason'}, set(reasons))
        skips = reasons['reason']
        self.assertEqual(1, len(skips))
        # RemotedTestCase doesn't preserve the "details", so the skipped
        # test's details cannot be checked for a missing 'log' entry here.

    def test_missing_feature_has_no_log(self):
        content, result = self.run_subunit_stream('test_missing_feature')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'missing the feature')
        reasons = _get_skip_reasons(result)
        self.assertEqual({'_MissingFeature\n'}, set(reasons))
        skips = reasons['_MissingFeature\n']
        self.assertEqual(1, len(skips))
        # RemotedTestCase doesn't preserve the "details", so the skipped
        # test's details cannot be checked for a missing 'log' entry here.

    def test_xfail_has_no_log(self):
        content, result = self.run_subunit_stream('test_xfail')
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'test with expected failure')
        self.assertEqual(1, len(result.expectedFailures))
        result_content = result.expectedFailures[0][1]
        self.assertNotContainsRe(result_content,
            '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertNotContainsRe(result_content, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        content, result = self.run_subunit_stream('test_unexpected_success')
        self.assertContainsRe(content, '(?m)^log$')
        self.assertContainsRe(content, 'test with unexpected success')
        # GZ 2011-05-18: Old versions of subunit treat unexpected success as a
        #                success, if a min version check is added remove this
        from subunit import TestProtocolClient as _Client
        if _Client.addUnexpectedSuccess.__func__ is _Client.addSuccess.__func__:
            self.expectFailure('subunit treats "unexpectedSuccess"'
                               ' as a plain success',
                self.assertEqual, 1, len(result.unexpectedSuccesses))
        self.assertEqual(1, len(result.unexpectedSuccesses))
        # RemotedTestCase doesn't preserve the "details", so the test's
        # details cannot be checked for a 'log' entry here.

    def test_success_has_no_log(self):
        content, result = self.run_subunit_stream('test_success')
        self.assertEqual(1, result.testsRun)
        self.assertNotContainsRe(content, '(?m)^log$')
        self.assertNotContainsRe(content, 'this test succeeds')
2270
class TestRunBzr(tests.TestCase):
    """Tests of how run_bzr and run_bzr_error call _run_bzr_core."""

    # Canned output handed back by the stubbed _run_bzr_core below.
    out = ''
    err = ''

    def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
                      working_dir=None):
        """Override _run_bzr_core to test how it is invoked by run_bzr.

        Attempts to run bzr from inside this class don't actually run it.

        We test how run_bzr actually invokes bzr in another location.  Here we
        only need to test that it passes the right parameters to run_bzr.
        """
        # Record every argument so the tests can inspect what was passed.
        self.argv = list(argv)
        self.retcode = retcode
        self.encoding = encoding
        self.stdin = stdin
        self.working_dir = working_dir
        return self.retcode, self.out, self.err

    def test_run_bzr_error(self):
        self.out = "It sure does!\n"
        out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
        self.assertEqual(['rocks'], self.argv)
        self.assertEqual(34, self.retcode)
        self.assertEqual('It sure does!\n', out)
        self.assertEqual(out, self.out)
        self.assertEqual('', err)
        self.assertEqual(err, self.err)

    def test_run_bzr_error_regexes(self):
        self.out = ''
        self.err = "bzr: ERROR: foobarbaz is not versioned"
        out, err = self.run_bzr_error(
            ["bzr: ERROR: foobarbaz is not versioned"],
            ['file-id', 'foobarbaz'])

    def test_encoding(self):
        """Test that run_bzr passes encoding to _run_bzr_core"""
        self.run_bzr('foo bar')
        self.assertEqual(None, self.encoding)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr('foo bar', encoding='baz')
        self.assertEqual('baz', self.encoding)
        self.assertEqual(['foo', 'bar'], self.argv)

    def test_retcode(self):
        """Test that run_bzr passes retcode to _run_bzr_core"""
        # Default is retcode == 0
        self.run_bzr('foo bar')
        self.assertEqual(0, self.retcode)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr('foo bar', retcode=1)
        self.assertEqual(1, self.retcode)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr('foo bar', retcode=None)
        self.assertEqual(None, self.retcode)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr(['foo', 'bar'], retcode=3)
        self.assertEqual(3, self.retcode)
        self.assertEqual(['foo', 'bar'], self.argv)

    def test_stdin(self):
        # test that the stdin keyword to run_bzr is passed through to
        # _run_bzr_core as-is. We do this by overriding
        # _run_bzr_core in this class, and then calling run_bzr,
        # which is a convenience function for _run_bzr_core, so
        # should invoke it.
        self.run_bzr('foo bar', stdin='gam')
        self.assertEqual('gam', self.stdin)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr('foo bar', stdin='zippy')
        self.assertEqual('zippy', self.stdin)
        self.assertEqual(['foo', 'bar'], self.argv)

    def test_working_dir(self):
        """Test that run_bzr passes working_dir to _run_bzr_core"""
        self.run_bzr('foo bar')
        self.assertEqual(None, self.working_dir)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr('foo bar', working_dir='baz')
        self.assertEqual('baz', self.working_dir)
        self.assertEqual(['foo', 'bar'], self.argv)

    def test_reject_extra_keyword_arguments(self):
        self.assertRaises(TypeError, self.run_bzr, "foo bar",
                          error_regex=['error message'])
2366
class TestRunBzrCaptured(tests.TestCaseWithTransport):
    """Tests of what run_bzr hands to apply_redirected."""
    # Does IO when testing the working_dir parameter.

    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
                         a_callable=None, *args, **kwargs):
        """Stub that records the call environment instead of running anything.

        It stores the stdin it was given, the ui factory and working
        directory in force at call time, writes marker text to the supplied
        stdout and stderr, and returns 0.
        """
        self.stdin = stdin
        self.factory_stdin = getattr(breezy.ui.ui_factory, "stdin", None)
        self.factory = breezy.ui.ui_factory
        self.working_dir = osutils.getcwd()
        stdout.write('foo\n')
        stderr.write('bar\n')
        return 0

    def test_stdin(self):
        # test that the stdin keyword to _run_bzr_core is passed through to
        # apply_redirected as a StringIO. We do this by overriding
        # apply_redirected in this class, and then calling _run_bzr_core,
        # which calls apply_redirected.
        self.run_bzr(['foo', 'bar'], stdin='gam')
        self.assertEqual('gam', self.stdin.read())
        self.assertIs(self.stdin, self.factory_stdin)
        self.run_bzr(['foo', 'bar'], stdin='zippy')
        self.assertEqual('zippy', self.stdin.read())
        self.assertIs(self.stdin, self.factory_stdin)

    def test_ui_factory(self):
        # each invocation of self.run_bzr should get its
        # own UI factory, which is an instance of TestUIFactory,
        # with stdin, stdout and stderr attached to the stdin,
        # stdout and stderr of the invoked run_bzr
        current_factory = breezy.ui.ui_factory
        self.run_bzr(['foo'])
        self.assertIsNot(current_factory, self.factory)
        self.assertNotEqual(sys.stdout, self.factory.stdout)
        self.assertNotEqual(sys.stderr, self.factory.stderr)
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
        self.assertEqual('bar\n', self.factory.stderr.getvalue())
        self.assertIsInstance(self.factory, tests.TestUIFactory)

    def test_working_dir(self):
        self.build_tree(['one/', 'two/'])
        cwd = osutils.getcwd()

        # Default is to work in the current directory
        self.run_bzr(['foo', 'bar'])
        self.assertEqual(cwd, self.working_dir)

        self.run_bzr(['foo', 'bar'], working_dir=None)
        self.assertEqual(cwd, self.working_dir)

        # The function should be run in the alternative directory
        # but afterwards the current working dir shouldn't be changed
        self.run_bzr(['foo', 'bar'], working_dir='one')
        self.assertNotEqual(cwd, self.working_dir)
        self.assertEndsWith(self.working_dir, 'one')
        self.assertEqual(cwd, osutils.getcwd())

        self.run_bzr(['foo', 'bar'], working_dir='two')
        self.assertNotEqual(cwd, self.working_dir)
        self.assertEndsWith(self.working_dir, 'two')
        self.assertEqual(cwd, osutils.getcwd())
2429
class StubProcess(object):
    """A stub process for testing run_bzr_subprocess."""

    def __init__(self, out="", err="", retcode=0):
        """Store the canned results this fake process will report.

        :param out: Text returned as standard output by communicate().
        :param err: Text returned as standard error by communicate().
        :param retcode: Value exposed as the returncode attribute.
        """
        self.out = out
        self.err = err
        self.returncode = retcode

    def communicate(self):
        """Return the canned (out, err) pair."""
        return self.out, self.err
2441
class TestWithFakedStartBzrSubprocess(tests.TestCaseWithTransport):
    """Base class for tests testing how we might run bzr."""

    def setUp(self):
        super(TestWithFakedStartBzrSubprocess, self).setUp()
        # One dict per start_bzr_subprocess call, in call order.
        self.subprocess_calls = []

    def start_bzr_subprocess(self, process_args, env_changes=None,
                             skip_if_plan_to_signal=False,
                             working_dir=None,
                             allow_plugins=False):
        """capture what run_bzr_subprocess tries to do.

        Records the arguments in self.subprocess_calls and returns
        self.next_subprocess, which the test must have set beforehand.
        """
        self.subprocess_calls.append({
            'process_args': process_args,
            'env_changes': env_changes,
            'skip_if_plan_to_signal': skip_if_plan_to_signal,
            'working_dir': working_dir,
            'allow_plugins': allow_plugins,
            })
        return self.next_subprocess
2460
class TestRunBzrSubprocess(TestWithFakedStartBzrSubprocess):
    """Tests of run_bzr_subprocess against a stubbed process."""

    def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
        """Run run_bzr_subprocess with args and kwargs using a stubbed process.

        Inside TestRunBzrSubprocessCommands we use a stub start_bzr_subprocess
        that will return static results. This assertion method populates those
        results and also checks the arguments run_bzr_subprocess generates.

        :param expected_args: Dict of recorded-call keys to expected values.
        :param process: The stub process start_bzr_subprocess should return.
        :return: Whatever run_bzr_subprocess returns.
        """
        self.next_subprocess = process
        try:
            return self.run_bzr_subprocess(*args, **kwargs)
        finally:
            # Check the recorded call whether or not the run raised, so a
            # wrong argument is reported even when the command also failed.
            self.next_subprocess = None
            for key, expected in expected_args.items():
                self.assertEqual(expected, self.subprocess_calls[-1][key])

    def test_run_bzr_subprocess(self):
        """The run_bzr_helper_external command behaves nicely."""
        self.assertRunBzrSubprocess({'process_args': ['--version']},
            StubProcess(), '--version')
        self.assertRunBzrSubprocess({'process_args': ['--version']},
            StubProcess(), ['--version'])
        # retcode=None disables retcode checking
        result = self.assertRunBzrSubprocess({},
            StubProcess(retcode=3), '--version', retcode=None)
        result = self.assertRunBzrSubprocess({},
            StubProcess(out="is free software"), '--version')
        self.assertContainsRe(result[0], 'is free software')
        # Running a subcommand that is missing errors
        self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
            {'process_args': ['--versionn']}, StubProcess(retcode=3),
            '--versionn')
        # Unless it is told to expect the error from the subprocess
        result = self.assertRunBzrSubprocess({},
            StubProcess(retcode=3), '--versionn', retcode=3)
        # Or to ignore retcode checking
        result = self.assertRunBzrSubprocess({},
            StubProcess(err="unknown command", retcode=3), '--versionn',
            retcode=None)
        self.assertContainsRe(result[1], 'unknown command')

    def test_env_change_passes_through(self):
        self.assertRunBzrSubprocess(
            {'env_changes': {'new': 'value', 'changed': 'newvalue', 'deleted': None}},
            StubProcess(), '',
            env_changes={'new': 'value', 'changed': 'newvalue', 'deleted': None})

    def test_no_working_dir_passed_as_None(self):
        self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '')

    def test_no_working_dir_passed_through(self):
        self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
            working_dir='dir')

    def test_run_bzr_subprocess_no_plugins(self):
        self.assertRunBzrSubprocess({'allow_plugins': False},
            StubProcess(), '')

    def test_allow_plugins(self):
        self.assertRunBzrSubprocess({'allow_plugins': True},
            StubProcess(), '', allow_plugins=True)
2530
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):
    """Tests of finish_bzr_subprocess against a stubbed process."""

    def test_finish_bzr_subprocess_with_error(self):
        """finish_bzr_subprocess allows specification of the desired exit code.
        """
        process = StubProcess(err="unknown command", retcode=3)
        result = self.finish_bzr_subprocess(process, retcode=3)
        self.assertEqual('', result[0])
        self.assertContainsRe(result[1], 'unknown command')

    def test_finish_bzr_subprocess_ignoring_retcode(self):
        """finish_bzr_subprocess allows the exit code to be ignored."""
        process = StubProcess(err="unknown command", retcode=3)
        result = self.finish_bzr_subprocess(process, retcode=None)
        self.assertEqual('', result[0])
        self.assertContainsRe(result[1], 'unknown command')

    def test_finish_subprocess_with_unexpected_retcode(self):
        """finish_bzr_subprocess raises self.failureException if the retcode is
        not the expected one.
        """
        process = StubProcess(err="unknown command", retcode=3)
        # No retcode is passed, so the stub's exit code of 3 is unexpected.
        self.assertRaises(self.failureException, self.finish_bzr_subprocess,
                          process)
2556
class _DontSpawnProcess(Exception):
2557
"""A simple exception which just allows us to skip unnecessary steps"""
2560
class TestStartBzrSubProcess(tests.TestCase):
    """Stub test start_bzr_subprocess."""

    def _subprocess_log_cleanup(self):
        """Inhibits the base version as we don't produce a log file."""

    def _popen(self, *args, **kwargs):
        """Override the base version to record the command that is run.

        From there we can ensure it is correct without spawning a real process.
        """
        self.check_popen_state()
        self._popen_args = args
        self._popen_kwargs = kwargs
        raise _DontSpawnProcess()

    def check_popen_state(self):
        """Replace to make assertions when popen is called."""

    def test_run_bzr_subprocess_no_plugins(self):
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
        command = self._popen_args[0]
        self.assertEqual(sys.executable, command[0])
        self.assertEqual(self.get_brz_path(), command[1])
        self.assertEqual(['--no-plugins'], command[2:])

    def test_allow_plugins(self):
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          allow_plugins=True)
        command = self._popen_args[0]
        self.assertEqual([], command[2:])

    def test_set_env(self):
        self.assertNotIn('EXISTANT_ENV_VAR', os.environ)
        # set in the child
        def check_environment():
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          env_changes={'EXISTANT_ENV_VAR': 'set variable'})
        # not set in the parent
        self.assertNotIn('EXISTANT_ENV_VAR', os.environ)

    def test_run_bzr_subprocess_env_del(self):
        """run_bzr_subprocess can remove environment variables too."""
        self.assertNotIn('EXISTANT_ENV_VAR', os.environ)
        def check_environment():
            self.assertNotIn('EXISTANT_ENV_VAR', os.environ)
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
        try:
            self.check_popen_state = check_environment
            self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                              env_changes={'EXISTANT_ENV_VAR': None})
            # Still set in parent
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
        finally:
            # Always remove the variable, so a failed assertion above cannot
            # leak it into later tests.
            os.environ.pop('EXISTANT_ENV_VAR', None)

    def test_env_del_missing(self):
        self.assertNotIn('NON_EXISTANT_ENV_VAR', os.environ)
        def check_environment():
            self.assertNotIn('NON_EXISTANT_ENV_VAR', os.environ)
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          env_changes={'NON_EXISTANT_ENV_VAR': None})

    def test_working_dir(self):
        """Test that we can specify the working dir for the child"""
        chdirs = []
        def chdir(path):
            chdirs.append(path)
        self.overrideAttr(os, 'chdir', chdir)
        def getcwd():
            return 'current'
        self.overrideAttr(osutils, 'getcwd', getcwd)
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          working_dir='foo')
        # First into the requested directory, then back to where getcwd
        # said we started.
        self.assertEqual(['foo', 'current'], chdirs)

    def test_get_brz_path_with_cwd_breezy(self):
        self.get_source_path = lambda: ""
        self.overrideAttr(os.path, "isfile", lambda path: True)
        self.assertEqual(self.get_brz_path(), "brz")
2645
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
    """Tests that really need to do things with an external bzr."""

    def test_start_and_stop_bzr_subprocess_send_signal(self):
        """finish_bzr_subprocess can interrupt a running child with a signal.

        The child is started, confirmed to be running, sent SIGINT, and its
        exit code and output are then checked.
        """
        self.disable_missing_extensions_warning()
        process = self.start_bzr_subprocess(['wait-until-signalled'],
                                            skip_if_plan_to_signal=True)
        self.assertEqual('running\n', process.stdout.readline())
        result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
                                            retcode=3)
        self.assertEqual('', result[0])
        self.assertEqual('brz: interrupted\n', result[1])
2662
class TestSelftestFiltering(tests.TestCase):
2665
def setUp(self):
    """Load every test of this module into self.suite and record their ids."""
    super(TestSelftestFiltering, self).setUp()
    self.suite = TestUtil.TestSuite()
    self.loader = TestUtil.TestLoader()
    self.suite.addTest(self.loader.loadTestsFromModule(
        sys.modules['breezy.tests.test_selftest']))
    self.all_names = _test_ids(self.suite)
2672
def test_condition_id_re(self):
    """condition_id_re selects tests whose id matches the pattern."""
    test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                 'test_condition_id_re')
    filtered_suite = tests.filter_suite_by_condition(
        self.suite, tests.condition_id_re('test_condition_id_re'))
    self.assertEqual([test_name], _test_ids(filtered_suite))
2679
def test_condition_id_in_list(self):
    """condition_id_in_list selects the same tests as an equivalent regex."""
    test_names = ['breezy.tests.test_selftest.TestSelftestFiltering.'
                  'test_condition_id_in_list']
    id_list = tests.TestIdList(test_names)
    filtered_suite = tests.filter_suite_by_condition(
        self.suite, tests.condition_id_in_list(id_list))
    my_pattern = 'TestSelftestFiltering.*test_condition_id_in_list'
    re_filtered = tests.filter_suite_by_re(self.suite, my_pattern)
    self.assertEqual(_test_ids(re_filtered), _test_ids(filtered_suite))
2689
def test_condition_id_startswith(self):
    """condition_id_startswith accepts ids matching any of the prefixes."""
    klass = 'breezy.tests.test_selftest.TestSelftestFiltering.'
    start1 = klass + 'test_condition_id_starts'
    start2 = klass + 'test_condition_id_in'
    test_names = [
        klass + 'test_condition_id_in_list',
        klass + 'test_condition_id_startswith',
        ]
    filtered_suite = tests.filter_suite_by_condition(
        self.suite, tests.condition_id_startswith([start1, start2]))
    self.assertEqual(test_names, _test_ids(filtered_suite))
2700
def test_condition_isinstance(self):
    """condition_isinstance selects the same tests as a class-name regex."""
    filtered_suite = tests.filter_suite_by_condition(
        self.suite, tests.condition_isinstance(self.__class__))
    class_pattern = 'breezy.tests.test_selftest.TestSelftestFiltering.'
    re_filtered = tests.filter_suite_by_re(self.suite, class_pattern)
    self.assertEqual(_test_ids(re_filtered), _test_ids(filtered_suite))
2707
def test_exclude_tests_by_condition(self):
    """exclude_tests_by_condition drops matching tests and keeps the order."""
    excluded_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                     'test_exclude_tests_by_condition')
    filtered_suite = tests.exclude_tests_by_condition(
        self.suite, lambda x: x.id() == excluded_name)
    self.assertEqual(len(self.all_names) - 1,
                     filtered_suite.countTestCases())
    self.assertNotIn(excluded_name, _test_ids(filtered_suite))
    # Everything else must survive, in the original order.
    remaining_names = list(self.all_names)
    remaining_names.remove(excluded_name)
    self.assertEqual(remaining_names, _test_ids(filtered_suite))
2719
def test_exclude_tests_by_re(self):
    """exclude_tests_by_re drops tests matching the regex and keeps the order."""
    self.all_names = _test_ids(self.suite)
    filtered_suite = tests.exclude_tests_by_re(self.suite,
                                               'exclude_tests_by_re')
    excluded_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                     'test_exclude_tests_by_re')
    self.assertEqual(len(self.all_names) - 1,
                     filtered_suite.countTestCases())
    self.assertNotIn(excluded_name, _test_ids(filtered_suite))
    # Everything else must survive, in the original order.
    remaining_names = list(self.all_names)
    remaining_names.remove(excluded_name)
    self.assertEqual(remaining_names, _test_ids(filtered_suite))
2732
def test_filter_suite_by_condition(self):
    """filter_suite_by_condition keeps only tests the condition accepts."""
    test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                 'test_filter_suite_by_condition')
    filtered_suite = tests.filter_suite_by_condition(
        self.suite, lambda x: x.id() == test_name)
    self.assertEqual([test_name], _test_ids(filtered_suite))
2739
def test_filter_suite_by_re(self):
    """Filtering by a regex keeps only the test whose id matches it."""
    expected_ids = ['breezy.tests.test_selftest.'
                    'TestSelftestFiltering.test_filter_suite_by_re']
    matched = tests.filter_suite_by_re(self.suite, 'test_filter_suite_by_r')
    self.assertEqual(_test_ids(matched), expected_ids)
2746
def test_filter_suite_by_id_list(self):
2747
test_list = ['breezy.tests.test_selftest.'
2748
'TestSelftestFiltering.test_filter_suite_by_id_list']
2749
filtered_suite = tests.filter_suite_by_id_list(
2750
self.suite, tests.TestIdList(test_list))
2751
filtered_names = _test_ids(filtered_suite)
2754
['breezy.tests.test_selftest.'
2755
'TestSelftestFiltering.test_filter_suite_by_id_list'])
2757
def test_filter_suite_by_id_startswith(self):
2758
# By design this test may fail if another test is added whose name also
2759
# begins with one of the start value used.
2760
klass = 'breezy.tests.test_selftest.TestSelftestFiltering.'
2761
start1 = klass + 'test_filter_suite_by_id_starts'
2762
start2 = klass + 'test_filter_suite_by_id_li'
2763
test_list = [klass + 'test_filter_suite_by_id_list',
2764
klass + 'test_filter_suite_by_id_startswith',
2766
filtered_suite = tests.filter_suite_by_id_startswith(
2767
self.suite, [start1, start2])
2770
_test_ids(filtered_suite),
2773
def test_preserve_input(self):
    """preserve_input hands back the very object it was given."""
    # NB: Surely this is something in the stdlib to do this?
    self.assertIs(self.suite, tests.preserve_input(self.suite))
    # Bind the string to a name first: an identity test against a literal
    # emits SyntaxWarning on Python 3.8+ and depends on the two separate
    # literals happening to be interned as one object.
    marker = "@#$"
    self.assertIs(marker, tests.preserve_input(marker))
2778
def test_randomize_suite(self):
    """Randomizing reorders the suite without adding or dropping tests."""
    randomized_suite = tests.randomize_suite(self.suite)
    shuffled_ids = _test_ids(randomized_suite)
    # Same membership before and after shuffling.
    self.assertEqual(set(_test_ids(self.suite)), set(shuffled_ids))
    # The order is expected to differ. A shuffle can legitimately return
    # the original order, but the odds are 1 in len(self.all_names)!, which
    # is low enough to ignore (RBC 20071021).
    self.assertNotEqual(self.all_names, shuffled_ids)
    # The length must be unchanged too (possibly redundant with the set
    # comparison above).
    self.assertEqual(len(self.all_names), len(shuffled_ids))
2793
def test_split_suit_by_condition(self):
    """Splitting by condition puts the match first and the rest second."""
    self.all_names = _test_ids(self.suite)
    condition = tests.condition_id_re('test_filter_suite_by_r')
    split_suite = tests.split_suite_by_condition(self.suite, condition)
    filtered_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                     'test_filter_suite_by_re')
    self.assertEqual([filtered_name], _test_ids(split_suite[0]))
    other_ids = _test_ids(split_suite[1])
    # assertNotIn shows the offending list on failure, which
    # assertFalse(x in y) cannot.
    self.assertNotIn(filtered_name, other_ids)
    remaining_names = list(self.all_names)
    remaining_names.remove(filtered_name)
    self.assertEqual(remaining_names, other_ids)
2805
def test_split_suit_by_re(self):
    """Splitting by regex puts the match first and the rest second."""
    self.all_names = _test_ids(self.suite)
    split_suite = tests.split_suite_by_re(self.suite,
                                          'test_filter_suite_by_r')
    filtered_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
                     'test_filter_suite_by_re')
    self.assertEqual([filtered_name], _test_ids(split_suite[0]))
    other_ids = _test_ids(split_suite[1])
    # assertNotIn shows the offending list on failure, which
    # assertFalse(x in y) cannot.
    self.assertNotIn(filtered_name, other_ids)
    remaining_names = list(self.all_names)
    remaining_names.remove(filtered_name)
    self.assertEqual(remaining_names, other_ids)
2818
class TestCheckTreeShape(tests.TestCaseWithTransport):
2820
def test_check_tree_shape(self):
2821
files = ['a', 'b/', 'b/c']
2822
tree = self.make_branch_and_tree('.')
2823
self.build_tree(files)
2827
self.check_tree_shape(tree, files)
2832
class TestBlackboxSupport(tests.TestCase):
2833
"""Tests for testsuite blackbox features."""
2835
def test_run_bzr_failure_not_caught(self):
2836
# When we run bzr in blackbox mode, we want any unexpected errors to
2837
# propagate up to the test suite so that it can show the error in the
2838
# usual way, and we won't get a double traceback.
2839
e = self.assertRaises(
2841
self.run_bzr, ['assert-fail'])
2842
# make sure we got the real thing, not an error from somewhere else in
2843
# the test framework
2844
self.assertEqual('always fails', str(e))
2845
# check that there's no traceback in the test log
2846
self.assertNotContainsRe(self.get_log(), r'Traceback')
2848
def test_run_bzr_user_error_caught(self):
    """A user error in blackbox mode becomes an error message on stderr."""
    # Normal/expected/user errors are not propagated as exceptions: run_bzr
    # is asked for retcode 3 and the message is checked on stderr.
    server = memory.MemoryServer()
    server.start_server()
    self.addCleanup(server.stop_server)
    base_url = server.get_url()
    self.permit_url(base_url)
    missing_branch = "%s/nonexistantpath" % base_url
    out, err = self.run_bzr(["log", missing_branch], retcode=3)
    self.assertEqual(out, '')
    self.assertContainsRe(
        err, 'brz: ERROR: Not a branch: ".*nonexistantpath/".\n')
2863
class TestTestLoader(tests.TestCase):
2864
"""Tests for the test loader."""
2866
def _get_loader_and_module(self):
2867
"""Gets a TestLoader and a module with one test in it."""
2868
loader = TestUtil.TestLoader()
2870
class Stub(tests.TestCase):
2873
class MyModule(object):
2875
MyModule.a_class = Stub
2877
module.__name__ = 'fake_module'
2878
return loader, module
2880
def test_module_no_load_tests_attribute_loads_classes(self):
    """Without a load_tests attribute the module yields one test case."""
    loader, module = self._get_loader_and_module()
    loaded = loader.loadTestsFromModule(module)
    self.assertEqual(1, loaded.countTestCases())
2884
def test_module_load_tests_attribute_gets_called(self):
2885
loader, module = self._get_loader_and_module()
2886
def load_tests(loader, standard_tests, pattern):
2887
result = loader.suiteClass()
2888
for test in tests.iter_suite_tests(standard_tests):
2889
result.addTests([test, test])
2891
# add a load_tests() method which multiplies the tests from the module.
2892
module.__class__.load_tests = staticmethod(load_tests)
2894
2 * [str(module.a_class('test_foo'))],
2895
list(map(str, loader.loadTestsFromModule(module))))
2897
def test_load_tests_from_module_name_smoke_test(self):
2898
loader = TestUtil.TestLoader()
2899
suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
2900
self.assertEqual(['breezy.tests.test_sampler.DemoTest.test_nothing'],
2903
def test_load_tests_from_module_name_with_bogus_module_name(self):
    """Loading from the module name 'bogus' raises ImportError."""
    bogus_loader = TestUtil.TestLoader()
    self.assertRaises(
        ImportError, bogus_loader.loadTestsFromModuleName, 'bogus')
2908
class TestTestIdList(tests.TestCase):
2910
def _create_id_list(self, test_list):
    """Wrap test_list in a tests.TestIdList and return it."""
    id_list = tests.TestIdList(test_list)
    return id_list
2913
def _create_suite(self, test_id_list):
2915
class Stub(tests.TestCase):
2919
def _create_test_id(id):
2922
suite = TestUtil.TestSuite()
2923
for id in test_id_list:
2924
t = Stub('test_foo')
2925
t.id = _create_test_id(id)
2929
def _test_ids(self, test_suite):
    """Return the id of every test yielded by iterating test_suite."""
    collected = []
    for case in tests.iter_suite_tests(test_suite):
        collected.append(case.id())
    return collected
2933
def test_empty_list(self):
    """An id list built from nothing has empty tests and modules maps."""
    empty = self._create_id_list([])
    for mapping in (empty.tests, empty.modules):
        self.assertEqual({}, mapping)
2938
def test_valid_list(self):
2939
id_list = self._create_id_list(
2940
['mod1.cl1.meth1', 'mod1.cl1.meth2',
2941
'mod1.func1', 'mod1.cl2.meth2',
2943
'mod1.submod2.cl1.meth1', 'mod1.submod2.cl2.meth2',
2945
self.assertTrue(id_list.refers_to('mod1'))
2946
self.assertTrue(id_list.refers_to('mod1.submod1'))
2947
self.assertTrue(id_list.refers_to('mod1.submod2'))
2948
self.assertTrue(id_list.includes('mod1.cl1.meth1'))
2949
self.assertTrue(id_list.includes('mod1.submod1'))
2950
self.assertTrue(id_list.includes('mod1.func1'))
2952
def test_bad_chars_in_params(self):
    """An id with a dotted, parenthesised parameter suffix is handled."""
    parametrized_id = 'mod1.cl1.meth1(xx.yy)'
    id_list = self._create_id_list([parametrized_id])
    self.assertTrue(id_list.refers_to('mod1'))
    self.assertTrue(id_list.includes(parametrized_id))
2957
def test_module_used(self):
    """refers_to is true for every dotted prefix of a listed test id."""
    id_list = self._create_id_list(['mod.class.meth'])
    for prefix in ('mod', 'mod.class', 'mod.class.meth'):
        self.assertTrue(id_list.refers_to(prefix))
2963
def test_test_suite_matches_id_list_with_unknown(self):
2964
loader = TestUtil.TestLoader()
2965
suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
2966
test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing',
2968
not_found, duplicates = tests.suite_matches_id_list(suite, test_list)
2969
self.assertEqual(['bogus'], not_found)
2970
self.assertEqual([], duplicates)
2972
def test_suite_matches_id_list_with_duplicates(self):
2973
loader = TestUtil.TestLoader()
2974
suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
2975
dupes = loader.suiteClass()
2976
for test in tests.iter_suite_tests(suite):
2978
dupes.addTest(test) # Add it again
2980
test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing',]
2981
not_found, duplicates = tests.suite_matches_id_list(
2983
self.assertEqual([], not_found)
2984
self.assertEqual(['breezy.tests.test_sampler.DemoTest.test_nothing'],
2988
class TestTestSuite(tests.TestCase):
2990
def test__test_suite_testmod_names(self):
2991
# Test that a plausible list of test module names are returned
2992
# by _test_suite_testmod_names.
2993
test_list = tests._test_suite_testmod_names()
2995
'breezy.tests.blackbox',
2996
'breezy.tests.per_transport',
2997
'breezy.tests.test_selftest',
3001
def test__test_suite_modules_to_doctest(self):
3002
# Test that a plausible list of modules to doctest is returned
3003
# by _test_suite_modules_to_doctest.
3004
test_list = tests._test_suite_modules_to_doctest()
3006
# When docstrings are stripped, there are no modules to doctest
3007
self.assertEqual([], test_list)
3014
def test_test_suite(self):
3015
# test_suite() loads the entire test suite to operate. To avoid this
3016
# overhead, and yet still be confident that things are happening,
3017
# we temporarily replace two functions used by test_suite with
3018
# test doubles that supply a few sample tests to load, and check they
3021
def testmod_names():
3022
calls.append("testmod_names")
3024
'breezy.tests.blackbox.test_branch',
3025
'breezy.tests.per_transport',
3026
'breezy.tests.test_selftest',
3028
self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names)
3030
calls.append("modules_to_doctest")
3033
return ['breezy.timestamp']
3034
self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests)
3035
expected_test_list = [
3037
'breezy.tests.blackbox.test_branch.TestBranch.test_branch',
3038
('breezy.tests.per_transport.TransportTests'
3039
'.test_abspath(LocalTransport,LocalURLServer)'),
3040
'breezy.tests.test_selftest.TestTestSuite.test_test_suite',
3041
# plugins can't be tested that way since selftest may be run with
3044
if __doc__ is not None:
3045
expected_test_list.extend([
3046
# modules_to_doctest
3047
'breezy.timestamp.format_highres_date',
3049
suite = tests.test_suite()
3050
self.assertEqual({"testmod_names", "modules_to_doctest"},
3052
self.assertSubset(expected_test_list, _test_ids(suite))
3054
def test_test_suite_list_and_start(self):
    """test_suite called with an id list and a start prefix loads only the list."""
    # This cannot share the main load with test_test_suite, because that
    # test has to show that starting_with == None works. A second load is
    # therefore incurred, but the starting_with parameter makes it a partial
    # load rather than a full one, so this should be pretty quick.
    wanted = ['breezy.tests.test_selftest.TestTestSuite.test_test_suite']
    prefixes = ['breezy.tests.test_selftest.TestTestSuite']
    loaded = tests.test_suite(wanted, prefixes)
    # test_test_suite_list_and_start is not included
    self.assertEqual(wanted, _test_ids(loaded))
3066
class TestLoadTestIdList(tests.TestCaseInTempDir):
3068
def _create_test_list_file(self, file_name, content):
3069
fl = open(file_name, 'wt')
3073
def test_load_unknown(self):
    """Loading the id list 'i_do_not_exist' raises errors.NoSuchFile."""
    missing_path = 'i_do_not_exist'
    self.assertRaises(
        errors.NoSuchFile, tests.load_test_id_list, missing_path)
3077
def test_load_test_list(self):
    """A two-line list file loads as its two ids, in file order."""
    list_path = 'test.list'
    self._create_test_list_file(
        list_path, 'mod1.cl1.meth1\nmod2.cl2.meth2\n')
    loaded = tests.load_test_id_list(list_path)
    self.assertEqual(2, len(loaded))
    first, second = loaded[0], loaded[1]
    self.assertEqual('mod1.cl1.meth1', first)
    self.assertEqual('mod2.cl2.meth2', second)
3086
def test_load_dirty_file(self):
3087
test_list_fname = 'test.list'
3088
self._create_test_list_file(test_list_fname,
3089
' mod1.cl1.meth1\n\nmod2.cl2.meth2 \n'
3091
tlist = tests.load_test_id_list(test_list_fname)
3092
self.assertEqual(4, len(tlist))
3093
self.assertEqual('mod1.cl1.meth1', tlist[0])
3094
self.assertEqual('', tlist[1])
3095
self.assertEqual('mod2.cl2.meth2', tlist[2])
3096
self.assertEqual('bar baz', tlist[3])
3099
class TestFilteredByModuleTestLoader(tests.TestCase):
3101
def _create_loader(self, test_list):
    """Build a FilteredByModuleTestLoader filtered by a TestIdList.

    :param test_list: test ids wrapped in a tests.TestIdList; its
        refers_to method is handed to the loader as the module filter.
    :return: the configured loader.
    """
    id_filter = tests.TestIdList(test_list)
    loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
    # The tests in this class call loadTestsFromModuleName on the result,
    # so the loader has to be returned.
    return loader
3106
def test_load_tests(self):
    """A listed sampler test is the only thing loaded from its module."""
    wanted_ids = ['breezy.tests.test_sampler.DemoTest.test_nothing']
    sampler_suite = self._create_loader(wanted_ids).loadTestsFromModuleName(
        'breezy.tests.test_sampler')
    self.assertEqual(wanted_ids, _test_ids(sampler_suite))
3112
def test_exclude_tests(self):
    """With only 'bogus' listed, nothing is loaded from the sampler module."""
    bogus_loader = self._create_loader(['bogus'])
    sampler_suite = bogus_loader.loadTestsFromModuleName(
        'breezy.tests.test_sampler')
    self.assertEqual([], _test_ids(sampler_suite))
3119
class TestFilteredByNameStartTestLoader(tests.TestCase):
3121
def _create_loader(self, name_start):
    """Build a FilteredByModuleTestLoader filtered by a name prefix.

    :param name_start: prefix compared against each name given to the
        loader's filter.
    :return: the configured loader.
    """
    def needs_module(name):
        # Accept a name that extends the prefix, and also a name that the
        # prefix itself extends.
        return name.startswith(name_start) or name_start.startswith(name)
    loader = TestUtil.FilteredByModuleTestLoader(needs_module)
    # The tests in this class call loadTestsFromModuleName on the result,
    # so the loader has to be returned.
    return loader
3127
def test_load_tests(self):
    """A prefix of the module name still loads the sampler test."""
    expected_ids = ['breezy.tests.test_sampler.DemoTest.test_nothing']
    prefix_loader = self._create_loader('breezy.tests.test_samp')

    sampler_suite = prefix_loader.loadTestsFromModuleName(
        'breezy.tests.test_sampler')
    self.assertEqual(expected_ids, _test_ids(sampler_suite))
3134
def test_load_tests_inside_module(self):
    """A prefix reaching into the module's classes still loads the test."""
    expected_ids = ['breezy.tests.test_sampler.DemoTest.test_nothing']
    prefix_loader = self._create_loader('breezy.tests.test_sampler.Demo')

    sampler_suite = prefix_loader.loadTestsFromModuleName(
        'breezy.tests.test_sampler')
    self.assertEqual(expected_ids, _test_ids(sampler_suite))
3141
def test_exclude_tests(self):
    """With the prefix 'bogus', nothing is loaded from the sampler module."""
    # The former local `test_list = ['bogus']` was never read; the prefix is
    # passed to _create_loader directly.
    loader = self._create_loader('bogus')

    suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
    self.assertEqual([], _test_ids(suite))
3149
class TestTestPrefixRegistry(tests.TestCase):
3151
def _get_registry(self):
    """Return a fresh tests.TestPrefixAliasRegistry for one test."""
    tp_registry = tests.TestPrefixAliasRegistry()
    # Every test in this class calls register/get/resolve_alias on the
    # result, so the registry has to be returned.
    return tp_registry
3155
def test_register_new_prefix(self):
    """A newly registered alias can be fetched back with get."""
    registry = self._get_registry()
    alias, target = 'foo', 'fff.ooo.ooo'
    registry.register(alias, target)
    self.assertEqual(target, registry.get(alias))
3160
def test_register_existing_prefix(self):
3161
tpr = self._get_registry()
3162
tpr.register('bar', 'bbb.aaa.rrr')
3163
tpr.register('bar', 'bBB.aAA.rRR')
3164
self.assertEqual('bbb.aaa.rrr', tpr.get('bar'))
3165
self.assertThat(self.get_log(),
3166
DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
3169
def test_get_unknown_prefix(self):
    """get raises KeyError for an alias that was never registered."""
    registry = self._get_registry()
    unknown_alias = 'I am not a prefix'
    self.assertRaises(KeyError, registry.get, unknown_alias)
3173
def test_resolve_prefix(self):
    """resolve_alias returns the value registered for the alias."""
    registry = self._get_registry()
    alias, target = 'bar', 'bb.aa.rr'
    registry.register(alias, target)
    self.assertEqual(target, registry.resolve_alias(alias))
3178
def test_resolve_unknown_alias(self):
    """resolve_alias raises BzrCommandError for an unregistered alias."""
    registry = self._get_registry()
    unknown_alias = 'I am not a prefix'
    self.assertRaises(
        errors.BzrCommandError, registry.resolve_alias, unknown_alias)
3183
def test_predefined_prefixes(self):
    """The module-level registry resolves each predefined alias."""
    registry = tests.test_prefix_alias_registry
    expected_aliases = [
        ('breezy', 'breezy'),
        ('bd', 'breezy.doc'),
        ('bu', 'breezy.utils'),
        ('bt', 'breezy.tests'),
        ('bb', 'breezy.tests.blackbox'),
        ('bp', 'breezy.plugins'),
    ]
    for alias, module_name in expected_aliases:
        self.assertEqual(module_name, registry.resolve_alias(alias))
3193
class TestThreadLeakDetection(tests.TestCase):
3194
"""Ensure when tests leak threads we detect and report it"""
3196
class LeakRecordingResult(tests.ExtendedTestResult):
3198
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3200
def _report_thread_leak(self, test, leaks, alive):
3201
self.leaks.append((test, leaks))
3203
def test_testcase_without_addCleanups(self):
3204
"""Check old TestCase instances don't break with leak detection"""
3205
class Test(unittest.TestCase):
3208
result = self.LeakRecordingResult()
3210
result.startTestRun()
3212
result.stopTestRun()
3213
self.assertEqual(result._tests_leaking_threads_count, 0)
3214
self.assertEqual(result.leaks, [])
3216
def test_thread_leak(self):
3217
"""Ensure a thread that outlives the running of a test is reported
3219
Uses a thread that blocks on an event, and is started by the inner
3220
test case. As the thread outlives the inner case's run, it should be
3221
detected as a leak, but the event is then set so that the thread can
3222
be safely joined in cleanup so it's not leaked for real.
3224
event = threading.Event()
3225
thread = threading.Thread(name="Leaker", target=event.wait)
3226
class Test(tests.TestCase):
3227
def test_leak(self):
3229
result = self.LeakRecordingResult()
3230
test = Test("test_leak")
3231
self.addCleanup(thread.join)
3232
self.addCleanup(event.set)
3233
result.startTestRun()
3235
result.stopTestRun()
3236
self.assertEqual(result._tests_leaking_threads_count, 1)
3237
self.assertEqual(result._first_thread_leaker_id, test.id())
3238
self.assertEqual(result.leaks, [(test, {thread})])
3239
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3241
def test_multiple_leaks(self):
3242
"""Check multiple leaks are blamed on the test cases at fault
3244
Same concept as the previous test, but has one inner test method that
3245
leaks two threads, and one that doesn't leak at all.
3247
event = threading.Event()
3248
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3249
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3250
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3251
class Test(tests.TestCase):
3252
def test_first_leak(self):
3254
def test_second_no_leak(self):
3256
def test_third_leak(self):
3259
result = self.LeakRecordingResult()
3260
first_test = Test("test_first_leak")
3261
third_test = Test("test_third_leak")
3262
self.addCleanup(thread_a.join)
3263
self.addCleanup(thread_b.join)
3264
self.addCleanup(thread_c.join)
3265
self.addCleanup(event.set)
3266
result.startTestRun()
3268
[first_test, Test("test_second_no_leak"), third_test]
3270
result.stopTestRun()
3271
self.assertEqual(result._tests_leaking_threads_count, 2)
3272
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3273
self.assertEqual(result.leaks, [
3274
(first_test, {thread_b}),
3275
(third_test, {thread_a, thread_c})])
3276
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3279
class TestPostMortemDebugging(tests.TestCase):
3280
"""Check post mortem debugging works when tests fail or error"""
3282
class TracebackRecordingResult(tests.ExtendedTestResult):
3284
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3285
self.postcode = None
3286
def _post_mortem(self, tb=None):
3287
"""Record the code object at the end of the current traceback"""
3288
tb = tb or sys.exc_info()[2]
3291
while next is not None:
3294
self.postcode = tb.tb_frame.f_code
3295
def report_error(self, test, err):
3297
def report_failure(self, test, err):
3300
def test_location_unittest_error(self):
3301
"""Needs right post mortem traceback with erroring unittest case"""
3302
class Test(unittest.TestCase):
3305
result = self.TracebackRecordingResult()
3307
self.assertEqual(result.postcode, Test.runTest.__code__)
3309
def test_location_unittest_failure(self):
3310
"""Needs right post mortem traceback with failing unittest case"""
3311
class Test(unittest.TestCase):
3313
raise self.failureException
3314
result = self.TracebackRecordingResult()
3316
self.assertEqual(result.postcode, Test.runTest.__code__)
3318
def test_location_bt_error(self):
3319
"""Needs right post mortem traceback with erroring breezy.tests case"""
3320
class Test(tests.TestCase):
3321
def test_error(self):
3323
result = self.TracebackRecordingResult()
3324
Test("test_error").run(result)
3325
self.assertEqual(result.postcode, Test.test_error.__code__)
3327
def test_location_bt_failure(self):
    """Needs right post mortem traceback with failing breezy.tests case"""
    class Test(tests.TestCase):
        def test_failure(self):
            raise self.failureException
    recorder = self.TracebackRecordingResult()
    failing_case = Test("test_failure")
    failing_case.run(recorder)
    # The recorded code object must be the failing method's own code.
    expected_code = Test.test_failure.__code__
    self.assertEqual(recorder.postcode, expected_code)
3336
def test_env_var_triggers_post_mortem(self):
3337
"""Check pdb.post_mortem is called iff BRZ_TEST_PDB is set"""
3339
result = tests.ExtendedTestResult(StringIO(), 0, 1)
3340
post_mortem_calls = []
3341
self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3342
self.overrideEnv('BRZ_TEST_PDB', None)
3343
result._post_mortem(1)
3344
self.overrideEnv('BRZ_TEST_PDB', 'on')
3345
result._post_mortem(2)
3346
self.assertEqual([2], post_mortem_calls)
3349
class TestRunSuite(tests.TestCase):
3351
def test_runner_class(self):
3352
"""run_suite accepts and uses a runner_class keyword argument."""
3353
class Stub(tests.TestCase):
3356
suite = Stub("test_foo")
3358
class MyRunner(tests.TextTestRunner):
3359
def run(self, test):
3361
return tests.ExtendedTestResult(self.stream, self.descriptions,
3363
tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
3364
self.assertLength(1, calls)
3367
class _Selftest(object):
    """Mixin for tests needing full selftest output"""

    def _inject_stream_into_subunit(self, stream):
        """To be overridden by subclasses that run tests out of process"""

    def _run_selftest(self, **kwargs):
        """Run tests.selftest and return everything written to its stream.

        :param kwargs: extra keyword arguments passed through to
            tests.selftest.
        :return: the text collected in the output stream.
        """
        # `sio` must be bound before use; StringIO is the buffer type used
        # for result streams elsewhere in this module.
        sio = StringIO()
        self._inject_stream_into_subunit(sio)
        tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
        return sio.getvalue()
3380
class _ForkedSelftest(_Selftest):
    """Mixin for tests needing full selftest output with forked children"""

    _test_needs_features = [features.subunit]

    def _inject_stream_into_subunit(self, stream):
        """Monkey-patch subunit so the extra output goes to stream not stdout

        Some APIs need rewriting so this kind of bogus hackery can be replaced
        by passing the stream param from run_tests down into ProtocolTestCase.
        """
        from subunit import ProtocolTestCase
        wrapped_init = ProtocolTestCase.__init__

        def patched_init(case, *args, **kwargs):
            # Run the real constructor, then redirect passthrough output.
            wrapped_init(case, *args, **kwargs)
            case._passthrough = stream

        self.overrideAttr(ProtocolTestCase, "__init__", patched_init)

    def _run_selftest(self, **kwargs):
        """Run selftest with tests.fork_decorator appended to the decorators."""
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
        fork = getattr(os, "fork", None)
        if fork is None:
            raise tests.TestNotApplicable("Platform doesn't support forking")

        # Make sure the fork code is actually invoked by claiming two cores
        def two_cores():
            return 2

        self.overrideAttr(osutils, "local_concurrency", two_cores)
        decorators = kwargs.setdefault("suite_decorators", [])
        decorators.append(tests.fork_decorator)
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3408
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3409
"""Check operation of --parallel=fork selftest option"""
3411
def test_error_in_child_during_fork(self):
3412
"""Error in a forked child during test setup should get reported"""
3413
class Test(tests.TestCase):
3414
def testMethod(self):
3416
# We don't care what, just break something that a child will run
3417
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3418
out = self._run_selftest(test_suite_factory=Test)
3419
# Lines from the tracebacks of the two child processes may be mixed
3420
# together due to the way subunit parses and forwards the streams,
3421
# so permit extra lines between each part of the error output.
3422
self.assertContainsRe(out,
3425
".+ in fork_for_tests\n"
3427
"\s*workaround_zealous_crypto_random\(\)\n"
3432
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3433
"""Check a test case still alive after being run emits a warning"""
3435
class Test(tests.TestCase):
3436
def test_pass(self):
3438
def test_self_ref(self):
3439
self.also_self = self.test_self_ref
3440
def test_skip(self):
3441
self.skipTest("Don't need")
3443
def _get_suite(self):
3444
return TestUtil.TestSuite([
3445
self.Test("test_pass"),
3446
self.Test("test_self_ref"),
3447
self.Test("test_skip"),
3450
def _run_selftest_with_suite(self, **kwargs):
3451
old_flags = tests.selftest_debug_flags
3452
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3453
gc_on = gc.isenabled()
3457
output = self._run_selftest(test_suite_factory=self._get_suite,
3462
tests.selftest_debug_flags = old_flags
3463
self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3464
self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3467
def test_testsuite(self):
    """Run the suite with no extra selftest options."""
    no_options = {}
    self._run_selftest_with_suite(**no_options)
3470
def test_pattern(self):
    """With a pattern selecting pass/self_ref, test_skip is not reported."""
    selected_only = "test_(?:pass|self_ref)$"
    report = self._run_selftest_with_suite(pattern=selected_only)
    self.assertNotContainsRe(report, "test_skip")
3474
def test_exclude_pattern(self):
    """With test_skip excluded by pattern, it is not reported."""
    skip_only = "test_skip$"
    report = self._run_selftest_with_suite(exclude_pattern=skip_only)
    self.assertNotContainsRe(report, "test_skip")
3478
def test_random_seed(self):
    """Run the suite with random_seed set to "now"."""
    seed = "now"
    self._run_selftest_with_suite(random_seed=seed)
3481
def test_matching_tests_first(self):
    """Run the suite with matching_tests_first and a self_ref pattern."""
    options = {
        "matching_tests_first": True,
        "pattern": "test_self_ref$",
    }
    self._run_selftest_with_suite(**options)
3485
def test_starting_with_and_exclude(self):
    """starting_with plus an exclude pattern keeps test_skip out of the output."""
    prefixes = ["bt."]
    report = self._run_selftest_with_suite(
        starting_with=prefixes, exclude_pattern="test_skip$")
    self.assertNotContainsRe(report, "test_skip")
3490
def test_additonal_decorator(self):
    """Run the suite with tests.TestDecorator as an extra suite decorator."""
    # The returned output was bound to an unused local; the checks happen
    # inside _run_selftest_with_suite, so the result is simply dropped.
    self._run_selftest_with_suite(
        suite_decorators=[tests.TestDecorator])
3495
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted with subunit"""

    _test_needs_features = [features.subunit]

    def _run_selftest_with_suite(self, **kwargs):
        """Run the parent's selftest with tests.SubUnitBzrRunner as runner."""
        subunit_runner = tests.SubUnitBzrRunner
        return TestUncollectedWarnings._run_selftest_with_suite(
            self, runner_class=subunit_runner, **kwargs)
3505
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
    """Check that uncollected test case warnings are emitted when forking.

    Adds no methods of its own: it only combines _ForkedSelftest with the
    TestUncollectedWarnings tests.
    """
3509
class TestEnvironHandling(tests.TestCase):
3511
def test_overrideEnv_None_called_twice_doesnt_leak(self):
3512
self.assertFalse('MYVAR' in os.environ)
3513
self.overrideEnv('MYVAR', '42')
3514
# We use an embedded test to make sure we fix the _captureVar bug
3515
class Test(tests.TestCase):
3517
# The first call save the 42 value
3518
self.overrideEnv('MYVAR', None)
3519
self.assertEqual(None, os.environ.get('MYVAR'))
3520
# Make sure we can call it twice
3521
self.overrideEnv('MYVAR', None)
3522
self.assertEqual(None, os.environ.get('MYVAR'))
3524
result = tests.TextTestResult(output, 0, 1)
3525
Test('test_me').run(result)
3526
if not result.wasStrictlySuccessful():
3527
self.fail(output.getvalue())
3528
# We get our value back
3529
self.assertEqual('42', os.environ.get('MYVAR'))
3532
class TestIsolatedEnv(tests.TestCase):
3533
"""Test isolating tests from os.environ.
3535
Since we use tests that are already isolated from os.environ a bit of care
3536
should be taken when designing the tests to avoid bootstrap side-effects.
3537
The tests start an already clean os.environ which allow doing valid
3538
assertions about which variables are present or not and design tests around
3542
class ScratchMonkey(tests.TestCase):
3547
def test_basics(self):
    """Check the BRZ_HOME and LINES values that the other tests rely on."""
    # Make sure we know the definition of BRZ_HOME: not part of os.environ
    # for tests.TestCase.
    self.assertIn('BRZ_HOME', tests.isolated_environ)
    self.assertIsNone(tests.isolated_environ['BRZ_HOME'])
    # Being part of isolated_environ, BRZ_HOME should not appear here
    self.assertNotIn('BRZ_HOME', os.environ)
    # Make sure we know the definition of LINES: part of os.environ for
    # tests.TestCase.
    self.assertIn('LINES', tests.isolated_environ)
    self.assertEqual('25', tests.isolated_environ['LINES'])
    self.assertEqual('25', os.environ['LINES'])
3560
def test_injecting_unknown_variable(self):
    """Overriding an absent variable sets it; restoring removes it again."""
    # BRZ_HOME is known to be absent from os.environ
    test = self.ScratchMonkey('test_me')
    tests.override_os_environ(test, {'BRZ_HOME': 'foo'})
    self.assertEqual('foo', os.environ['BRZ_HOME'])
    tests.restore_os_environ(test)
    # assertNotIn gives a readable failure instead of "True is not false".
    self.assertNotIn('BRZ_HOME', os.environ)
3568
def test_injecting_known_variable(self):
    """Overriding LINES changes it; restoring brings back '25'."""
    monkey = self.ScratchMonkey('test_me')
    # LINES is known to be present in os.environ
    overrides = {'LINES': '42'}
    tests.override_os_environ(monkey, overrides)
    self.assertEqual('42', os.environ['LINES'])
    tests.restore_os_environ(monkey)
    self.assertEqual('25', os.environ['LINES'])
3576
def test_deleting_variable(self):
    """Overriding LINES with None removes it; restoring brings back '25'."""
    test = self.ScratchMonkey('test_me')
    # LINES is known to be present in os.environ
    tests.override_os_environ(test, {'LINES': None})
    # assertNotIn gives a readable failure instead of "False is not true".
    self.assertNotIn('LINES', os.environ)
    tests.restore_os_environ(test)
    self.assertEqual('25', os.environ['LINES'])
3585
class TestDocTestSuiteIsolation(tests.TestCase):
3586
"""Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3588
Since tests.TestCase alreay provides an isolation from os.environ, we use
3589
the clean environment as a base for testing. To precisely capture the
3590
isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3593
We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
3594
not `os.environ` so each test overrides it to suit its needs.
3598
def get_doctest_suite_for_string(self, klass, string):
3599
class Finder(doctest.DocTestFinder):
3601
def find(*args, **kwargs):
3602
test = doctest.DocTestParser().get_doctest(
3603
string, {}, 'foo', 'foo.py', 0)
3606
suite = klass(test_finder=Finder())
3609
def run_doctest_suite_for_string(self, klass, string):
3610
suite = self.get_doctest_suite_for_string(klass, string)
3612
result = tests.TextTestResult(output, 0, 1)
3614
return result, output
3616
def assertDocTestStringSucceds(self, klass, string):
    """Fail with the captured output unless the run is strictly successful."""
    outcome, stream = self.run_doctest_suite_for_string(klass, string)
    if outcome.wasStrictlySuccessful():
        return
    self.fail(stream.getvalue())
3621
def assertDocTestStringFails(self, klass, string):
    """Fail with the captured output if the run is strictly successful."""
    outcome, stream = self.run_doctest_suite_for_string(klass, string)
    if not outcome.wasStrictlySuccessful():
        return
    self.fail(stream.getvalue())
3626
def test_injected_variable(self):
3627
self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3630
>>> os.environ['LINES']
3633
# doctest.DocTestSuite fails as it sees '25'
3634
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3635
# tests.DocTestSuite sees '42'
3636
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3638
def test_deleted_variable(self):
3639
self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3642
>>> os.environ.get('LINES')
3644
# doctest.DocTestSuite fails as it sees '25'
3645
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3646
# tests.DocTestSuite sees None
3647
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3650
class TestSelftestExcludePatterns(tests.TestCase):
3653
super(TestSelftestExcludePatterns, self).setUp()
3654
self.overrideAttr(tests, 'test_suite', self.suite_factory)
3656
def suite_factory(self, keep_only=None, starting_with=None):
3657
"""A test suite factory with only a few tests."""
3658
class Test(tests.TestCase):
3660
# We don't need the full class path
3661
return self._testMethodName
3668
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3670
def assertTestList(self, expected, *selftest_args):
    """Assert that `selftest --list` plus selftest_args prints expected."""
    # setUp has installed the small suite factory, so this exercises the
    # command level without loading the whole test suite.
    command = ('selftest', '--list') + selftest_args
    listing, _unused_err = self.run_bzr(command)
    self.assertEqual(expected, listing.splitlines())
3677
def test_full_list(self):
    """With no exclusion every test of the small suite is listed."""
    everything = ['a', 'b', 'c']
    self.assertTestList(everything)
3680
def test_single_exclude(self):
    """A single -x option removes the matching test from the listing."""
    without_a = ['b', 'c']
    self.assertTestList(without_a, '-x', 'a')
3683
def test_mutiple_excludes(self):
    """Repeated -x options each remove their matching test."""
    only_c = ['c']
    self.assertTestList(only_c, '-x', 'a', '-x', 'b')
3687
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3689
_test_needs_features = [features.subunit]
3692
super(TestCounterHooks, self).setUp()
3693
class Test(tests.TestCase):
3696
super(Test, self).setUp()
3697
self.hooks = hooks.Hooks()
3698
self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
3699
self.install_counter_hook(self.hooks, 'myhook')
3704
def run_hook_once(self):
3705
for hook in self.hooks['myhook']:
3708
self.test_class = Test
3710
def assertHookCalls(self, expected_calls, test_name):
3711
test = self.test_class(test_name)
3712
result = unittest.TestResult()
3714
self.assertTrue(hasattr(test, '_counters'))
3715
self.assertTrue('myhook' in test._counters)
3716
self.assertEqual(expected_calls, test._counters['myhook'])
3718
def test_no_hook(self):
    """Expect zero recorded 'myhook' calls for the inner no_hook test."""
    self.assertHookCalls(expected_calls=0, test_name='no_hook')
3721
def test_run_hook_once(self):
    """Expect one recorded 'myhook' call for the inner run_hook_once test."""
    minimum_version = (0, 9, 8)
    installed_version = features.testtools.module.__version__
    if installed_version < minimum_version:
        raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
    self.assertHookCalls(1, 'run_hook_once')