1
# Copyright (C) 2005-2013, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the test framework."""
21
from functools import reduce
22
from io import BytesIO, StringIO, TextIOWrapper
31
from testtools import (
32
ExtendedToOriginalDecorator,
35
from testtools.content import Content
36
from testtools.content_type import ContentType
37
from testtools.matchers import (
41
import testtools.testresult.doubles
68
workingtree as git_workingtree,
70
from ..symbol_versioning import (
81
from ..trace import note, mutter
82
from ..transport import memory
85
def _test_ids(test_suite):
86
"""Get the ids for the tests in a test suite."""
87
return [t.id() for t in tests.iter_suite_tests(test_suite)]
90
class MetaTestLog(tests.TestCase):
92
def test_logging(self):
93
"""Test logs are captured when a test fails."""
94
self.log('a test message')
95
details = self.getDetails()
97
self.assertThat(log.content_type, Equals(ContentType(
98
"text", "plain", {"charset": "utf8"})))
99
self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
100
self.assertThat(self.get_log(),
101
DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
104
class TestTreeShape(tests.TestCaseInTempDir):
106
def test_unicode_paths(self):
107
self.requireFeature(features.UnicodeFilenameFeature)
109
filename = u'hell\u00d8'
110
self.build_tree_contents([(filename, b'contents of hello')])
111
self.assertPathExists(filename)
114
class TestClassesAvailable(tests.TestCase):
115
"""As a convenience we expose Test* classes from breezy.tests"""
117
def test_test_case(self):
118
from . import TestCase
120
def test_test_loader(self):
121
from . import TestLoader
123
def test_test_suite(self):
124
from . import TestSuite
127
class TestTransportScenarios(tests.TestCase):
128
"""A group of tests that test the transport implementation adaption core.
130
This is a meta test that the tests are applied to all available
133
This will be generalised in the future which is why it is in this
134
test file even though it is specific to transport tests at the moment.
137
def test_get_transport_permutations(self):
138
# this checks that get_test_permutations defined by the module is
139
# called by the get_transport_test_permutations function.
140
class MockModule(object):
141
def get_test_permutations(self):
142
return sample_permutation
143
sample_permutation = [(1, 2), (3, 4)]
144
from .per_transport import get_transport_test_permutations
145
self.assertEqual(sample_permutation,
146
get_transport_test_permutations(MockModule()))
148
def test_scenarios_include_all_modules(self):
149
# this checks that the scenario generator returns as many permutations
150
# as there are in all the registered transport modules - we assume if
151
# this matches its probably doing the right thing especially in
152
# combination with the tests for setting the right classes below.
153
from .per_transport import transport_test_permutations
154
from ..transport import _get_transport_modules
155
modules = _get_transport_modules()
156
permutation_count = 0
157
for module in modules:
159
permutation_count += len(reduce(getattr,
161
+ ".get_test_permutations").split('.')[1:],
162
__import__(module))())
163
except errors.DependencyNotPresent:
165
scenarios = transport_test_permutations()
166
self.assertEqual(permutation_count, len(scenarios))
168
def test_scenarios_include_transport_class(self):
169
# This test used to know about all the possible transports and the
170
# order they were returned but that seems overly brittle (mbp
172
from .per_transport import transport_test_permutations
173
scenarios = transport_test_permutations()
174
# there are at least that many builtin transports
175
self.assertTrue(len(scenarios) > 6)
176
one_scenario = scenarios[0]
177
self.assertIsInstance(one_scenario[0], str)
178
self.assertTrue(issubclass(one_scenario[1]["transport_class"],
179
breezy.transport.Transport))
180
self.assertTrue(issubclass(one_scenario[1]["transport_server"],
181
breezy.transport.Server))
184
class TestBranchScenarios(tests.TestCase):
186
def test_scenarios(self):
187
# check that constructor parameters are passed through to the adapted
189
from .per_branch import make_scenarios
192
formats = [("c", "C"), ("d", "D")]
193
scenarios = make_scenarios(server1, server2, formats)
194
self.assertEqual(2, len(scenarios))
197
{'branch_format': 'c',
198
'bzrdir_format': 'C',
199
'transport_readonly_server': 'b',
200
'transport_server': 'a'}),
202
{'branch_format': 'd',
203
'bzrdir_format': 'D',
204
'transport_readonly_server': 'b',
205
'transport_server': 'a'})],
209
class TestBzrDirScenarios(tests.TestCase):
211
def test_scenarios(self):
212
# check that constructor parameters are passed through to the adapted
214
from .per_controldir import make_scenarios
219
scenarios = make_scenarios(vfs_factory, server1, server2, formats)
222
{'bzrdir_format': 'c',
223
'transport_readonly_server': 'b',
224
'transport_server': 'a',
225
'vfs_transport_factory': 'v'}),
227
{'bzrdir_format': 'd',
228
'transport_readonly_server': 'b',
229
'transport_server': 'a',
230
'vfs_transport_factory': 'v'})],
234
class TestRepositoryScenarios(tests.TestCase):
236
def test_formats_to_scenarios(self):
237
from .per_repository import formats_to_scenarios
238
formats = [("(c)", remote.RemoteRepositoryFormat()),
239
("(d)", repository.format_registry.get(
240
b'Bazaar repository format 2a (needs bzr 1.16 or later)\n'))]
241
no_vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
243
vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
244
vfs_transport_factory="vfs")
245
# no_vfs generate scenarios without vfs_transport_factory
247
('RemoteRepositoryFormat(c)',
248
{'bzrdir_format': remote.RemoteBzrDirFormat(),
249
'repository_format': remote.RemoteRepositoryFormat(),
250
'transport_readonly_server': 'readonly',
251
'transport_server': 'server'}),
252
('RepositoryFormat2a(d)',
253
{'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
254
'repository_format': groupcompress_repo.RepositoryFormat2a(),
255
'transport_readonly_server': 'readonly',
256
'transport_server': 'server'})]
257
self.assertEqual(expected, no_vfs_scenarios)
259
('RemoteRepositoryFormat(c)',
260
{'bzrdir_format': remote.RemoteBzrDirFormat(),
261
'repository_format': remote.RemoteRepositoryFormat(),
262
'transport_readonly_server': 'readonly',
263
'transport_server': 'server',
264
'vfs_transport_factory': 'vfs'}),
265
('RepositoryFormat2a(d)',
266
{'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
267
'repository_format': groupcompress_repo.RepositoryFormat2a(),
268
'transport_readonly_server': 'readonly',
269
'transport_server': 'server',
270
'vfs_transport_factory': 'vfs'})],
274
class TestTestScenarioApplication(tests.TestCase):
275
"""Tests for the test adaption facilities."""
277
def test_apply_scenario(self):
278
from breezy.tests import apply_scenario
279
input_test = TestTestScenarioApplication("test_apply_scenario")
280
# setup two adapted tests
281
adapted_test1 = apply_scenario(input_test,
283
{"bzrdir_format": "bzr_format",
284
"repository_format": "repo_fmt",
285
"transport_server": "transport_server",
286
"transport_readonly_server": "readonly-server"}))
287
adapted_test2 = apply_scenario(input_test,
288
("new id 2", {"bzrdir_format": None}))
289
# input_test should have been altered.
290
self.assertRaises(AttributeError, getattr, input_test, "bzrdir_format")
291
# the new tests are mutually incompatible, ensuring it has
292
# made new ones, and unspecified elements in the scenario
293
# should not have been altered.
294
self.assertEqual("bzr_format", adapted_test1.bzrdir_format)
295
self.assertEqual("repo_fmt", adapted_test1.repository_format)
296
self.assertEqual("transport_server", adapted_test1.transport_server)
297
self.assertEqual("readonly-server",
298
adapted_test1.transport_readonly_server)
300
"breezy.tests.test_selftest.TestTestScenarioApplication."
301
"test_apply_scenario(new id)",
303
self.assertEqual(None, adapted_test2.bzrdir_format)
305
"breezy.tests.test_selftest.TestTestScenarioApplication."
306
"test_apply_scenario(new id 2)",
310
class TestInterRepositoryScenarios(tests.TestCase):
312
def test_scenarios(self):
313
# check that constructor parameters are passed through to the adapted
315
from .per_interrepository import make_scenarios
318
formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
319
scenarios = make_scenarios(server1, server2, formats)
322
{'repository_format': 'C1',
323
'repository_format_to': 'C2',
324
'transport_readonly_server': 'b',
325
'transport_server': 'a',
326
'extra_setup': 'C3'}),
328
{'repository_format': 'D1',
329
'repository_format_to': 'D2',
330
'transport_readonly_server': 'b',
331
'transport_server': 'a',
332
'extra_setup': 'D3'})],
336
class TestWorkingTreeScenarios(tests.TestCase):
338
def test_scenarios(self):
339
# check that constructor parameters are passed through to the adapted
341
from .per_workingtree import make_scenarios
344
formats = [workingtree_4.WorkingTreeFormat4(),
345
workingtree_3.WorkingTreeFormat3(),
346
workingtree_4.WorkingTreeFormat6()]
347
scenarios = make_scenarios(server1, server2, formats,
348
remote_server='c', remote_readonly_server='d',
349
remote_backing_server='e')
351
('WorkingTreeFormat4',
352
{'bzrdir_format': formats[0]._matchingcontroldir,
353
'transport_readonly_server': 'b',
354
'transport_server': 'a',
355
'workingtree_format': formats[0]}),
356
('WorkingTreeFormat3',
357
{'bzrdir_format': formats[1]._matchingcontroldir,
358
'transport_readonly_server': 'b',
359
'transport_server': 'a',
360
'workingtree_format': formats[1]}),
361
('WorkingTreeFormat6',
362
{'bzrdir_format': formats[2]._matchingcontroldir,
363
'transport_readonly_server': 'b',
364
'transport_server': 'a',
365
'workingtree_format': formats[2]}),
366
('WorkingTreeFormat6,remote',
367
{'bzrdir_format': formats[2]._matchingcontroldir,
368
'repo_is_remote': True,
369
'transport_readonly_server': 'd',
370
'transport_server': 'c',
371
'vfs_transport_factory': 'e',
372
'workingtree_format': formats[2]}),
376
class TestTreeScenarios(tests.TestCase):
378
def test_scenarios(self):
379
# the tree implementation scenario generator is meant to setup one
380
# instance for each working tree format, one additional instance
381
# that will use the default wt format, but create a revision tree for
382
# the tests, and one more that uses the default wt format as a
383
# lightweight checkout of a remote repository. This means that the wt
384
# ones should have the workingtree_to_test_tree attribute set to
385
# 'return_parameter' and the revision one set to
386
# revision_tree_from_workingtree.
388
from .per_tree import (
389
_dirstate_tree_from_workingtree,
394
revision_tree_from_workingtree
398
smart_server = test_server.SmartTCPServer_for_testing
399
smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
400
mem_server = memory.MemoryServer
401
formats = [workingtree_4.WorkingTreeFormat4(),
402
workingtree_3.WorkingTreeFormat3(), ]
403
scenarios = make_scenarios(server1, server2, formats)
404
self.assertEqual(9, len(scenarios))
405
default_wt_format = workingtree.format_registry.get_default()
406
wt4_format = workingtree_4.WorkingTreeFormat4()
407
wt5_format = workingtree_4.WorkingTreeFormat5()
408
wt6_format = workingtree_4.WorkingTreeFormat6()
409
git_wt_format = git_workingtree.GitWorkingTreeFormat()
410
expected_scenarios = [
411
('WorkingTreeFormat4',
412
{'bzrdir_format': formats[0]._matchingcontroldir,
413
'transport_readonly_server': 'b',
414
'transport_server': 'a',
415
'workingtree_format': formats[0],
416
'_workingtree_to_test_tree': return_parameter,
418
('WorkingTreeFormat3',
419
{'bzrdir_format': formats[1]._matchingcontroldir,
420
'transport_readonly_server': 'b',
421
'transport_server': 'a',
422
'workingtree_format': formats[1],
423
'_workingtree_to_test_tree': return_parameter,
425
('WorkingTreeFormat6,remote',
426
{'bzrdir_format': wt6_format._matchingcontroldir,
427
'repo_is_remote': True,
428
'transport_readonly_server': smart_readonly_server,
429
'transport_server': smart_server,
430
'vfs_transport_factory': mem_server,
431
'workingtree_format': wt6_format,
432
'_workingtree_to_test_tree': return_parameter,
435
{'_workingtree_to_test_tree': revision_tree_from_workingtree,
436
'bzrdir_format': default_wt_format._matchingcontroldir,
437
'transport_readonly_server': 'b',
438
'transport_server': 'a',
439
'workingtree_format': default_wt_format,
442
{'_workingtree_to_test_tree': revision_tree_from_workingtree,
443
'bzrdir_format': git_wt_format._matchingcontroldir,
444
'transport_readonly_server': 'b',
445
'transport_server': 'a',
446
'workingtree_format': git_wt_format,
449
('DirStateRevisionTree,WT4',
450
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
451
'bzrdir_format': wt4_format._matchingcontroldir,
452
'transport_readonly_server': 'b',
453
'transport_server': 'a',
454
'workingtree_format': wt4_format,
456
('DirStateRevisionTree,WT5',
457
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
458
'bzrdir_format': wt5_format._matchingcontroldir,
459
'transport_readonly_server': 'b',
460
'transport_server': 'a',
461
'workingtree_format': wt5_format,
464
{'_workingtree_to_test_tree': preview_tree_pre,
465
'bzrdir_format': default_wt_format._matchingcontroldir,
466
'transport_readonly_server': 'b',
467
'transport_server': 'a',
468
'workingtree_format': default_wt_format}),
470
{'_workingtree_to_test_tree': preview_tree_post,
471
'bzrdir_format': default_wt_format._matchingcontroldir,
472
'transport_readonly_server': 'b',
473
'transport_server': 'a',
474
'workingtree_format': default_wt_format}),
476
self.assertEqual(expected_scenarios, scenarios)
479
class TestInterTreeScenarios(tests.TestCase):
480
"""A group of tests that test the InterTreeTestAdapter."""
482
def test_scenarios(self):
483
# check that constructor parameters are passed through to the adapted
485
# for InterTree tests we want the machinery to bring up two trees in
486
# each instance: the base one, and the one we are interacting with.
487
# because each optimiser can be direction specific, we need to test
488
# each optimiser in its chosen direction.
489
# unlike the TestProviderAdapter we dont want to automatically add a
490
# parameterized one for WorkingTree - the optimisers will tell us what
492
from .per_tree import (
495
from .per_intertree import (
498
from ..bzr.workingtree_3 import WorkingTreeFormat3
499
from ..bzr.workingtree_4 import WorkingTreeFormat4
500
input_test = TestInterTreeScenarios(
504
format1 = WorkingTreeFormat4()
505
format2 = WorkingTreeFormat3()
506
formats = [("1", str, format1, format2, "converter1"),
507
("2", int, format2, format1, "converter2")]
508
scenarios = make_scenarios(server1, server2, formats)
509
self.assertEqual(2, len(scenarios))
510
expected_scenarios = [
512
"bzrdir_format": format1._matchingcontroldir,
513
"intertree_class": formats[0][1],
514
"workingtree_format": formats[0][2],
515
"workingtree_format_to": formats[0][3],
516
"mutable_trees_to_test_trees": formats[0][4],
517
"_workingtree_to_test_tree": return_parameter,
518
"transport_server": server1,
519
"transport_readonly_server": server2,
522
"bzrdir_format": format2._matchingcontroldir,
523
"intertree_class": formats[1][1],
524
"workingtree_format": formats[1][2],
525
"workingtree_format_to": formats[1][3],
526
"mutable_trees_to_test_trees": formats[1][4],
527
"_workingtree_to_test_tree": return_parameter,
528
"transport_server": server1,
529
"transport_readonly_server": server2,
532
self.assertEqual(scenarios, expected_scenarios)
535
class TestTestCaseInTempDir(tests.TestCaseInTempDir):
537
def test_home_is_not_working(self):
538
self.assertNotEqual(self.test_dir, self.test_home_dir)
539
cwd = osutils.getcwd()
540
self.assertIsSameRealPath(self.test_dir, cwd)
541
self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME'])
543
def test_assertEqualStat_equal(self):
544
from ..bzr.tests.test_dirstate import _FakeStat
545
self.build_tree(["foo"])
546
real = os.lstat("foo")
547
fake = _FakeStat(real.st_size, real.st_mtime, real.st_ctime,
548
real.st_dev, real.st_ino, real.st_mode)
549
self.assertEqualStat(real, fake)
551
def test_assertEqualStat_notequal(self):
552
self.build_tree(["foo", "longname"])
553
self.assertRaises(AssertionError, self.assertEqualStat,
554
os.lstat("foo"), os.lstat("longname"))
556
def test_assertPathExists(self):
557
self.assertPathExists('.')
558
self.build_tree(['foo/', 'foo/bar'])
559
self.assertPathExists('foo/bar')
560
self.assertPathDoesNotExist('foo/foo')
563
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
565
def test_home_is_non_existant_dir_under_root(self):
566
"""The test_home_dir for TestCaseWithMemoryTransport is missing.
568
This is because TestCaseWithMemoryTransport is for tests that do not
569
need any disk resources: they should be hooked into breezy in such a
570
way that no global settings are being changed by the test (only a
571
few tests should need to do that), and having a missing dir as home is
572
an effective way to ensure that this is the case.
574
self.assertIsSameRealPath(
575
self.TEST_ROOT + "/MemoryTransportMissingHomeDir",
577
self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME'])
579
def test_cwd_is_TEST_ROOT(self):
580
self.assertIsSameRealPath(self.test_dir, self.TEST_ROOT)
581
cwd = osutils.getcwd()
582
self.assertIsSameRealPath(self.test_dir, cwd)
584
def test_BRZ_HOME_and_HOME_are_bytestrings(self):
585
"""The $BRZ_HOME and $HOME environment variables should not be unicode.
587
See https://bugs.launchpad.net/bzr/+bug/464174
589
self.assertIsInstance(os.environ['BRZ_HOME'], str)
590
self.assertIsInstance(os.environ['HOME'], str)
592
def test_make_branch_and_memory_tree(self):
593
"""In TestCaseWithMemoryTransport we should not make the branch on disk.
595
This is hard to comprehensively robustly test, so we settle for making
596
a branch and checking no directory was created at its relpath.
598
tree = self.make_branch_and_memory_tree('dir')
599
# Guard against regression into MemoryTransport leaking
600
# files to disk instead of keeping them in memory.
601
self.assertFalse(osutils.lexists('dir'))
602
self.assertIsInstance(tree, memorytree.MemoryTree)
604
def test_make_branch_and_memory_tree_with_format(self):
605
"""make_branch_and_memory_tree should accept a format option."""
606
format = bzrdir.BzrDirMetaFormat1()
607
format.repository_format = repository.format_registry.get_default()
608
tree = self.make_branch_and_memory_tree('dir', format=format)
609
# Guard against regression into MemoryTransport leaking
610
# files to disk instead of keeping them in memory.
611
self.assertFalse(osutils.lexists('dir'))
612
self.assertIsInstance(tree, memorytree.MemoryTree)
613
self.assertEqual(format.repository_format.__class__,
614
tree.branch.repository._format.__class__)
616
def test_make_branch_builder(self):
617
builder = self.make_branch_builder('dir')
618
self.assertIsInstance(builder, branchbuilder.BranchBuilder)
619
# Guard against regression into MemoryTransport leaking
620
# files to disk instead of keeping them in memory.
621
self.assertFalse(osutils.lexists('dir'))
623
def test_make_branch_builder_with_format(self):
624
# Use a repo layout that doesn't conform to a 'named' layout, to ensure
625
# that the format objects are used.
626
format = bzrdir.BzrDirMetaFormat1()
627
repo_format = repository.format_registry.get_default()
628
format.repository_format = repo_format
629
builder = self.make_branch_builder('dir', format=format)
630
the_branch = builder.get_branch()
631
# Guard against regression into MemoryTransport leaking
632
# files to disk instead of keeping them in memory.
633
self.assertFalse(osutils.lexists('dir'))
634
self.assertEqual(format.repository_format.__class__,
635
the_branch.repository._format.__class__)
636
self.assertEqual(repo_format.get_format_string(),
637
self.get_transport().get_bytes(
638
'dir/.bzr/repository/format'))
640
def test_make_branch_builder_with_format_name(self):
641
builder = self.make_branch_builder('dir', format='knit')
642
the_branch = builder.get_branch()
643
# Guard against regression into MemoryTransport leaking
644
# files to disk instead of keeping them in memory.
645
self.assertFalse(osutils.lexists('dir'))
646
dir_format = controldir.format_registry.make_controldir('knit')
647
self.assertEqual(dir_format.repository_format.__class__,
648
the_branch.repository._format.__class__)
649
self.assertEqual(b'Bazaar-NG Knit Repository Format 1',
650
self.get_transport().get_bytes(
651
'dir/.bzr/repository/format'))
653
def test_dangling_locks_cause_failures(self):
654
class TestDanglingLock(tests.TestCaseWithMemoryTransport):
655
def test_function(self):
656
t = self.get_transport_from_path('.')
657
l = lockdir.LockDir(t, 'lock')
660
test = TestDanglingLock('test_function')
662
total_failures = result.errors + result.failures
663
if self._lock_check_thorough:
664
self.assertEqual(1, len(total_failures))
666
# When _lock_check_thorough is disabled, then we don't trigger a
668
self.assertEqual(0, len(total_failures))
671
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
672
"""Tests for the convenience functions TestCaseWithTransport introduces."""
674
def test_get_readonly_url_none(self):
675
from ..transport.readonly import ReadonlyTransportDecorator
676
self.vfs_transport_factory = memory.MemoryServer
677
self.transport_readonly_server = None
678
# calling get_readonly_transport() constructs a decorator on the url
680
url = self.get_readonly_url()
681
url2 = self.get_readonly_url('foo/bar')
682
t = transport.get_transport_from_url(url)
683
t2 = transport.get_transport_from_url(url2)
684
self.assertIsInstance(t, ReadonlyTransportDecorator)
685
self.assertIsInstance(t2, ReadonlyTransportDecorator)
686
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
688
def test_get_readonly_url_http(self):
689
from .http_server import HttpServer
690
from ..transport.http import HttpTransport
691
self.transport_server = test_server.LocalURLServer
692
self.transport_readonly_server = HttpServer
693
# calling get_readonly_transport() gives us a HTTP server instance.
694
url = self.get_readonly_url()
695
url2 = self.get_readonly_url('foo/bar')
696
# the transport returned may be any HttpTransportBase subclass
697
t = transport.get_transport_from_url(url)
698
t2 = transport.get_transport_from_url(url2)
699
self.assertIsInstance(t, HttpTransport)
700
self.assertIsInstance(t2, HttpTransport)
701
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
703
def test_is_directory(self):
704
"""Test assertIsDirectory assertion"""
705
t = self.get_transport()
706
self.build_tree(['a_dir/', 'a_file'], transport=t)
707
self.assertIsDirectory('a_dir', t)
708
self.assertRaises(AssertionError, self.assertIsDirectory, 'a_file', t)
710
AssertionError, self.assertIsDirectory, 'not_here', t)
712
def test_make_branch_builder(self):
713
builder = self.make_branch_builder('dir')
714
rev_id = builder.build_commit()
715
self.assertPathExists('dir')
716
a_dir = controldir.ControlDir.open('dir')
717
self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
718
a_branch = a_dir.open_branch()
719
builder_branch = builder.get_branch()
720
self.assertEqual(a_branch.base, builder_branch.base)
721
self.assertEqual((1, rev_id), builder_branch.last_revision_info())
722
self.assertEqual((1, rev_id), a_branch.last_revision_info())
725
class TestTestCaseTransports(tests.TestCaseWithTransport):
728
super(TestTestCaseTransports, self).setUp()
729
self.vfs_transport_factory = memory.MemoryServer
731
def test_make_controldir_preserves_transport(self):
732
t = self.get_transport()
733
result_bzrdir = self.make_controldir('subdir')
734
self.assertIsInstance(result_bzrdir.transport,
735
memory.MemoryTransport)
736
# should not be on disk, should only be in memory
737
self.assertPathDoesNotExist('subdir')
740
class TestChrootedTest(tests.ChrootedTestCase):
742
def test_root_is_root(self):
743
t = transport.get_transport_from_url(self.get_readonly_url())
745
self.assertEqual(url, t.clone('..').base)
748
class TestProfileResult(tests.TestCase):
750
def test_profiles_tests(self):
751
self.requireFeature(features.lsprof_feature)
752
terminal = testtools.testresult.doubles.ExtendedTestResult()
753
result = tests.ProfileResult(terminal)
755
class Sample(tests.TestCase):
757
self.sample_function()
759
def sample_function(self):
763
case = terminal._events[0][1]
764
self.assertLength(1, case._benchcalls)
765
# We must be able to unpack it as the test reporting code wants
766
(_, _, _), stats = case._benchcalls[0]
767
self.assertTrue(callable(stats.pprint))
770
class TestTestResult(tests.TestCase):
772
def check_timing(self, test_case, expected_re):
773
result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
774
capture = testtools.testresult.doubles.ExtendedTestResult()
775
test_case.run(MultiTestResult(result, capture))
776
run_case = capture._events[0][1]
777
timed_string = result._testTimeString(run_case)
778
self.assertContainsRe(timed_string, expected_re)
780
def test_test_reporting(self):
781
class ShortDelayTestCase(tests.TestCase):
782
def test_short_delay(self):
785
def test_short_benchmark(self):
786
self.time(time.sleep, 0.003)
787
self.check_timing(ShortDelayTestCase('test_short_delay'),
789
# if a benchmark time is given, we now show just that time followed by
791
self.check_timing(ShortDelayTestCase('test_short_benchmark'),
794
def test_unittest_reporting_unittest_class(self):
795
# getting the time from a non-breezy test works ok
796
class ShortDelayTestCase(unittest.TestCase):
797
def test_short_delay(self):
799
self.check_timing(ShortDelayTestCase('test_short_delay'),
802
def _time_hello_world_encoding(self):
803
"""Profile two sleep calls
805
This is used to exercise the test framework.
807
self.time(str, b'hello', errors='replace')
808
self.time(str, b'world', errors='replace')
810
def test_lsprofiling(self):
811
"""Verbose test result prints lsprof statistics from test cases."""
812
self.requireFeature(features.lsprof_feature)
813
result_stream = StringIO()
814
result = breezy.tests.VerboseTestResult(
819
# we want profile a call of some sort and check it is output by
820
# addSuccess. We dont care about addError or addFailure as they
821
# are not that interesting for performance tuning.
822
# make a new test instance that when run will generate a profile
823
example_test_case = TestTestResult("_time_hello_world_encoding")
824
example_test_case._gather_lsprof_in_benchmarks = True
825
# execute the test, which should succeed and record profiles
826
example_test_case.run(result)
827
# lsprofile_something()
828
# if this worked we want
829
# LSProf output for <built in function unicode> (['hello'], {'errors': 'replace'})
830
# CallCount Recursive Total(ms) Inline(ms) module:lineno(function)
831
# (the lsprof header)
832
# ... an arbitrary number of lines
833
# and the function call which is time.sleep.
834
# 1 0 ??? ??? ???(sleep)
835
# and then repeated but with 'world', rather than 'hello'.
836
# this should appear in the output stream of our test result.
837
output = result_stream.getvalue()
838
self.assertContainsRe(output,
839
r"LSProf output for <class 'str'>\(\(b'hello',\), {'errors': 'replace'}\)")
840
self.assertContainsRe(output,
841
r"LSProf output for <class 'str'>\(\(b'world',\), {'errors': 'replace'}\)")
842
self.assertContainsRe(output,
843
r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n")
844
self.assertContainsRe(output,
845
r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?")
847
def test_uses_time_from_testtools(self):
848
"""Test case timings in verbose results should use testtools times"""
851
class TimeAddedVerboseTestResult(tests.VerboseTestResult):
852
def startTest(self, test):
853
self.time(datetime.datetime.utcfromtimestamp(1.145))
854
super(TimeAddedVerboseTestResult, self).startTest(test)
856
def addSuccess(self, test):
857
self.time(datetime.datetime.utcfromtimestamp(51.147))
858
super(TimeAddedVerboseTestResult, self).addSuccess(test)
860
def report_tests_starting(self): pass
862
self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
863
self.assertEndsWith(sio.getvalue(), "OK 50002ms\n")
865
def test_known_failure(self):
866
"""Using knownFailure should trigger several result actions."""
867
class InstrumentedTestResult(tests.ExtendedTestResult):
868
def stopTestRun(self): pass
870
def report_tests_starting(self): pass
872
def report_known_failure(self, test, err=None, details=None):
873
self._call = test, 'known failure'
874
result = InstrumentedTestResult(None, None, None, None)
876
class Test(tests.TestCase):
877
def test_function(self):
878
self.knownFailure('failed!')
879
test = Test("test_function")
881
# it should invoke 'report_known_failure'.
882
self.assertEqual(2, len(result._call))
883
self.assertEqual(test.id(), result._call[0].id())
884
self.assertEqual('known failure', result._call[1])
885
# we dont introspec the traceback, if the rest is ok, it would be
886
# exceptional for it not to be.
887
# it should update the known_failure_count on the object.
888
self.assertEqual(1, result.known_failure_count)
889
# the result should be successful.
890
self.assertTrue(result.wasSuccessful())
892
def test_verbose_report_known_failure(self):
893
# verbose test output formatting
894
result_stream = StringIO()
895
result = breezy.tests.VerboseTestResult(
900
_get_test("test_xfail").run(result)
901
self.assertContainsRe(result_stream.getvalue(),
902
"\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
903
"\\s*(?:Text attachment: )?reason"
908
def get_passing_test(self):
909
"""Return a test object that can't be run usefully."""
912
return unittest.FunctionTestCase(passing_test)
914
def test_add_not_supported(self):
915
"""Test the behaviour of invoking addNotSupported."""
916
class InstrumentedTestResult(tests.ExtendedTestResult):
917
def stopTestRun(self): pass
919
def report_tests_starting(self): pass
921
def report_unsupported(self, test, feature):
922
self._call = test, feature
923
result = InstrumentedTestResult(None, None, None, None)
924
test = SampleTestCase('_test_pass')
925
feature = features.Feature()
926
result.startTest(test)
927
result.addNotSupported(test, feature)
928
# it should invoke 'report_unsupported'.
929
self.assertEqual(2, len(result._call))
930
self.assertEqual(test, result._call[0])
931
self.assertEqual(feature, result._call[1])
932
# the result should be successful.
933
self.assertTrue(result.wasSuccessful())
934
# it should record the test against a count of tests not run due to
936
self.assertEqual(1, result.unsupported['Feature'])
937
# and invoking it again should increment that counter
938
result.addNotSupported(test, feature)
939
self.assertEqual(2, result.unsupported['Feature'])
941
def test_verbose_report_unsupported(self):
942
# verbose test output formatting
943
result_stream = StringIO()
944
result = breezy.tests.VerboseTestResult(
949
test = self.get_passing_test()
950
feature = features.Feature()
951
result.startTest(test)
952
prefix = len(result_stream.getvalue())
953
result.report_unsupported(test, feature)
954
output = result_stream.getvalue()[prefix:]
955
lines = output.splitlines()
956
# We don't check for the final '0ms' since it may fail on slow hosts
957
self.assertStartsWith(lines[0], 'NODEP')
958
self.assertEqual(lines[1],
959
" The feature 'Feature' is not available.")
961
def test_unavailable_exception(self):
962
"""An UnavailableFeature being raised should invoke addNotSupported."""
963
class InstrumentedTestResult(tests.ExtendedTestResult):
964
def stopTestRun(self):
967
def report_tests_starting(self):
970
def addNotSupported(self, test, feature):
971
self._call = test, feature
972
result = InstrumentedTestResult(None, None, None, None)
973
feature = features.Feature()
975
class Test(tests.TestCase):
976
def test_function(self):
977
raise tests.UnavailableFeature(feature)
978
test = Test("test_function")
980
# it should invoke 'addNotSupported'.
981
self.assertEqual(2, len(result._call))
982
self.assertEqual(test.id(), result._call[0].id())
983
self.assertEqual(feature, result._call[1])
984
# and not count as an error
985
self.assertEqual(0, result.error_count)
987
def test_strict_with_unsupported_feature(self):
988
result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
989
test = self.get_passing_test()
990
feature = "Unsupported Feature"
991
result.addNotSupported(test, feature)
992
self.assertFalse(result.wasStrictlySuccessful())
993
self.assertEqual(None, result._extractBenchmarkTime(test))
995
def test_strict_with_known_failure(self):
996
result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
997
test = _get_test("test_xfail")
999
self.assertFalse(result.wasStrictlySuccessful())
1000
self.assertEqual(None, result._extractBenchmarkTime(test))
1002
def test_strict_with_success(self):
1003
result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
1004
test = self.get_passing_test()
1005
result.addSuccess(test)
1006
self.assertTrue(result.wasStrictlySuccessful())
1007
self.assertEqual(None, result._extractBenchmarkTime(test))
1009
def test_startTests(self):
1010
"""Starting the first test should trigger startTests."""
1011
class InstrumentedTestResult(tests.ExtendedTestResult):
1014
def startTests(self):
1016
result = InstrumentedTestResult(None, None, None, None)
1018
def test_function():
1020
test = unittest.FunctionTestCase(test_function)
1022
self.assertEqual(1, result.calls)
1024
def test_startTests_only_once(self):
1025
"""With multiple tests startTests should still only be called once"""
1026
class InstrumentedTestResult(tests.ExtendedTestResult):
1029
def startTests(self):
1031
result = InstrumentedTestResult(None, None, None, None)
1032
suite = unittest.TestSuite([
1033
unittest.FunctionTestCase(lambda: None),
1034
unittest.FunctionTestCase(lambda: None)])
1036
self.assertEqual(1, result.calls)
1037
self.assertEqual(2, result.count)
1040
class TestRunner(tests.TestCase):
1042
def dummy_test(self):
1045
def run_test_runner(self, testrunner, test):
1046
"""Run suite in testrunner, saving global state and restoring it.
1048
This current saves and restores:
1049
TestCaseInTempDir.TEST_ROOT
1051
There should be no tests in this file that use
1052
breezy.tests.TextTestRunner without using this convenience method,
1053
because of our use of global state.
1055
old_root = tests.TestCaseInTempDir.TEST_ROOT
1057
tests.TestCaseInTempDir.TEST_ROOT = None
1058
return testrunner.run(test)
1060
tests.TestCaseInTempDir.TEST_ROOT = old_root
1062
def test_known_failure_failed_run(self):
1063
# run a test that generates a known failure which should be printed in
1064
# the final output when real failures occur.
1065
class Test(tests.TestCase):
1066
def known_failure_test(self):
1067
self.expectFailure('failed', self.assertTrue, False)
1068
test = unittest.TestSuite()
1069
test.addTest(Test("known_failure_test"))
1072
raise AssertionError('foo')
1073
test.addTest(unittest.FunctionTestCase(failing_test))
1075
runner = tests.TextTestRunner(stream=stream)
1076
self.run_test_runner(runner, test)
1077
self.assertContainsRe(
1079
'(?sm)^brz selftest.*$'
1081
'^======================================================================\n'
1082
'^FAIL: failing_test\n'
1083
'^----------------------------------------------------------------------\n'
1084
'Traceback \\(most recent call last\\):\n'
1085
' .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
1086
' raise AssertionError\\(\'foo\'\\)\n'
1088
'^----------------------------------------------------------------------\n'
1090
'FAILED \\(failures=1, known_failure_count=1\\)'
1093
def test_known_failure_ok_run(self):
1094
# run a test that generates a known failure which should be printed in
1096
class Test(tests.TestCase):
1097
def known_failure_test(self):
1098
self.knownFailure("Never works...")
1099
test = Test("known_failure_test")
1101
runner = tests.TextTestRunner(stream=stream)
1102
self.run_test_runner(runner, test)
1103
self.assertContainsRe(stream.getvalue(),
1106
'Ran 1 test in .*\n'
1108
'OK \\(known_failures=1\\)\n')
1110
def test_unexpected_success_bad(self):
1111
class Test(tests.TestCase):
1112
def test_truth(self):
1113
self.expectFailure("No absolute truth", self.assertTrue, True)
1114
runner = tests.TextTestRunner(stream=StringIO())
1115
self.run_test_runner(runner, Test("test_truth"))
1116
self.assertContainsRe(runner.stream.getvalue(),
1118
"FAIL: \\S+\\.test_truth\n"
1121
"\\s*(?:Text attachment: )?reason"
1127
"Ran 1 test in .*\n"
1129
"FAILED \\(failures=1\\)\n\\Z")
1131
def test_result_decorator(self):
1135
class LoggingDecorator(ExtendedToOriginalDecorator):
1136
def startTest(self, test):
1137
ExtendedToOriginalDecorator.startTest(self, test)
1138
calls.append('start')
1139
test = unittest.FunctionTestCase(lambda: None)
1141
runner = tests.TextTestRunner(stream=stream,
1142
result_decorators=[LoggingDecorator])
1143
self.run_test_runner(runner, test)
1144
self.assertLength(1, calls)
1146
def test_skipped_test(self):
1147
# run a test that is skipped, and check the suite as a whole still
1149
# skipping_test must be hidden in here so it's not run as a real test
1150
class SkippingTest(tests.TestCase):
1151
def skipping_test(self):
1152
raise tests.TestSkipped('test intentionally skipped')
1153
runner = tests.TextTestRunner(stream=StringIO())
1154
test = SkippingTest("skipping_test")
1155
result = self.run_test_runner(runner, test)
1156
self.assertTrue(result.wasSuccessful())
1158
def test_skipped_from_setup(self):
1161
class SkippedSetupTest(tests.TestCase):
1164
calls.append('setUp')
1165
self.addCleanup(self.cleanup)
1166
raise tests.TestSkipped('skipped setup')
1168
def test_skip(self):
1169
self.fail('test reached')
1172
calls.append('cleanup')
1174
runner = tests.TextTestRunner(stream=StringIO())
1175
test = SkippedSetupTest('test_skip')
1176
result = self.run_test_runner(runner, test)
1177
self.assertTrue(result.wasSuccessful())
1178
# Check if cleanup was called the right number of times.
1179
self.assertEqual(['setUp', 'cleanup'], calls)
1181
def test_skipped_from_test(self):
1184
class SkippedTest(tests.TestCase):
1187
super(SkippedTest, self).setUp()
1188
calls.append('setUp')
1189
self.addCleanup(self.cleanup)
1191
def test_skip(self):
1192
raise tests.TestSkipped('skipped test')
1195
calls.append('cleanup')
1197
runner = tests.TextTestRunner(stream=StringIO())
1198
test = SkippedTest('test_skip')
1199
result = self.run_test_runner(runner, test)
1200
self.assertTrue(result.wasSuccessful())
1201
# Check if cleanup was called the right number of times.
1202
self.assertEqual(['setUp', 'cleanup'], calls)
1204
def test_not_applicable(self):
1205
# run a test that is skipped because it's not applicable
1206
class Test(tests.TestCase):
1207
def not_applicable_test(self):
1208
raise tests.TestNotApplicable('this test never runs')
1210
runner = tests.TextTestRunner(stream=out, verbosity=2)
1211
test = Test("not_applicable_test")
1212
result = self.run_test_runner(runner, test)
1213
self.log(out.getvalue())
1214
self.assertTrue(result.wasSuccessful())
1215
self.assertTrue(result.wasStrictlySuccessful())
1216
self.assertContainsRe(out.getvalue(),
1217
r'(?m)not_applicable_test * N/A')
1218
self.assertContainsRe(out.getvalue(),
1219
r'(?m)^ this test never runs')
1221
def test_unsupported_features_listed(self):
1222
"""When unsupported features are encountered they are detailed."""
1223
class Feature1(features.Feature):
1227
class Feature2(features.Feature):
1230
# create sample tests
1231
test1 = SampleTestCase('_test_pass')
1232
test1._test_needs_features = [Feature1()]
1233
test2 = SampleTestCase('_test_pass')
1234
test2._test_needs_features = [Feature2()]
1235
test = unittest.TestSuite()
1239
runner = tests.TextTestRunner(stream=stream)
1240
self.run_test_runner(runner, test)
1241
lines = stream.getvalue().splitlines()
1244
"Missing feature 'Feature1' skipped 1 tests.",
1245
"Missing feature 'Feature2' skipped 1 tests.",
1249
def test_verbose_test_count(self):
1250
"""A verbose test run reports the right test count at the start"""
1251
suite = TestUtil.TestSuite([
1252
unittest.FunctionTestCase(lambda:None),
1253
unittest.FunctionTestCase(lambda:None)])
1254
self.assertEqual(suite.countTestCases(), 2)
1256
runner = tests.TextTestRunner(stream=stream, verbosity=2)
1257
# Need to use the CountingDecorator as that's what sets num_tests
1258
self.run_test_runner(runner, tests.CountingDecorator(suite))
1259
self.assertStartsWith(stream.getvalue(), "running 2 tests")
1261
def test_startTestRun(self):
1262
"""run should call result.startTestRun()"""
1265
class LoggingDecorator(ExtendedToOriginalDecorator):
1266
def startTestRun(self):
1267
ExtendedToOriginalDecorator.startTestRun(self)
1268
calls.append('startTestRun')
1269
test = unittest.FunctionTestCase(lambda: None)
1271
runner = tests.TextTestRunner(stream=stream,
1272
result_decorators=[LoggingDecorator])
1273
self.run_test_runner(runner, test)
1274
self.assertLength(1, calls)
1276
def test_stopTestRun(self):
1277
"""run should call result.stopTestRun()"""
1280
class LoggingDecorator(ExtendedToOriginalDecorator):
1281
def stopTestRun(self):
1282
ExtendedToOriginalDecorator.stopTestRun(self)
1283
calls.append('stopTestRun')
1284
test = unittest.FunctionTestCase(lambda: None)
1286
runner = tests.TextTestRunner(stream=stream,
1287
result_decorators=[LoggingDecorator])
1288
self.run_test_runner(runner, test)
1289
self.assertLength(1, calls)
1291
def test_unicode_test_output_on_ascii_stream(self):
1292
"""Showing results should always succeed even on an ascii console"""
1293
class FailureWithUnicode(tests.TestCase):
1294
def test_log_unicode(self):
1296
self.fail("Now print that log!")
1298
out = TextIOWrapper(bio, 'ascii', 'backslashreplace')
1299
self.overrideAttr(osutils, "get_terminal_encoding",
1300
lambda trace=False: "ascii")
1301
self.run_test_runner(
1302
tests.TextTestRunner(stream=out),
1303
FailureWithUnicode("test_log_unicode"))
1305
self.assertContainsRe(bio.getvalue(),
1306
b"(?:Text attachment: )?log"
1308
b"\\d+\\.\\d+ \\\\u2606"
1309
b"(?:\n-+\n|}}}\n)")
1312
class SampleTestCase(tests.TestCase):
1314
def _test_pass(self):
1318
class _TestException(Exception):
1322
class TestTestCase(tests.TestCase):
1323
"""Tests that test the core breezy TestCase."""
1325
def test_assertLength_matches_empty(self):
1327
self.assertLength(0, a_list)
1329
def test_assertLength_matches_nonempty(self):
1331
self.assertLength(3, a_list)
1333
def test_assertLength_fails_different(self):
1335
self.assertRaises(AssertionError, self.assertLength, 1, a_list)
1337
def test_assertLength_shows_sequence_in_failure(self):
1339
exception = self.assertRaises(AssertionError, self.assertLength, 2,
1341
self.assertEqual('Incorrect length: wanted 2, got 3 for [1, 2, 3]',
1344
def test_base_setUp_not_called_causes_failure(self):
1345
class TestCaseWithBrokenSetUp(tests.TestCase):
1347
pass # does not call TestCase.setUp
1351
test = TestCaseWithBrokenSetUp('test_foo')
1352
result = unittest.TestResult()
1354
self.assertFalse(result.wasSuccessful())
1355
self.assertEqual(1, result.testsRun)
1357
def test_base_tearDown_not_called_causes_failure(self):
1358
class TestCaseWithBrokenTearDown(tests.TestCase):
1360
pass # does not call TestCase.tearDown
1364
test = TestCaseWithBrokenTearDown('test_foo')
1365
result = unittest.TestResult()
1367
self.assertFalse(result.wasSuccessful())
1368
self.assertEqual(1, result.testsRun)
1370
def test_debug_flags_sanitised(self):
1371
"""The breezy debug flags should be sanitised by setUp."""
1372
if 'allow_debug' in tests.selftest_debug_flags:
1373
raise tests.TestNotApplicable(
1374
'-Eallow_debug option prevents debug flag sanitisation')
1375
# we could set something and run a test that will check
1376
# it gets santised, but this is probably sufficient for now:
1377
# if someone runs the test with -Dsomething it will error.
1379
if self._lock_check_thorough:
1380
flags.add('strict_locks')
1381
self.assertEqual(flags, breezy.debug.debug_flags)
1383
def change_selftest_debug_flags(self, new_flags):
1384
self.overrideAttr(tests, 'selftest_debug_flags', set(new_flags))
1386
def test_allow_debug_flag(self):
1387
"""The -Eallow_debug flag prevents breezy.debug.debug_flags from being
1388
sanitised (i.e. cleared) before running a test.
1390
self.change_selftest_debug_flags({'allow_debug'})
1391
breezy.debug.debug_flags = {'a-flag'}
1393
class TestThatRecordsFlags(tests.TestCase):
1394
def test_foo(nested_self):
1395
self.flags = set(breezy.debug.debug_flags)
1396
test = TestThatRecordsFlags('test_foo')
1397
test.run(self.make_test_result())
1399
if 'disable_lock_checks' not in tests.selftest_debug_flags:
1400
flags.add('strict_locks')
1401
self.assertEqual(flags, self.flags)
1403
def test_disable_lock_checks(self):
1404
"""The -Edisable_lock_checks flag disables thorough checks."""
1405
class TestThatRecordsFlags(tests.TestCase):
1406
def test_foo(nested_self):
1407
self.flags = set(breezy.debug.debug_flags)
1408
self.test_lock_check_thorough = nested_self._lock_check_thorough
1409
self.change_selftest_debug_flags(set())
1410
test = TestThatRecordsFlags('test_foo')
1411
test.run(self.make_test_result())
1412
# By default we do strict lock checking and thorough lock/unlock
1414
self.assertTrue(self.test_lock_check_thorough)
1415
self.assertEqual({'strict_locks'}, self.flags)
1416
# Now set the disable_lock_checks flag, and show that this changed.
1417
self.change_selftest_debug_flags({'disable_lock_checks'})
1418
test = TestThatRecordsFlags('test_foo')
1419
test.run(self.make_test_result())
1420
self.assertFalse(self.test_lock_check_thorough)
1421
self.assertEqual(set(), self.flags)
1423
def test_this_fails_strict_lock_check(self):
1424
class TestThatRecordsFlags(tests.TestCase):
1425
def test_foo(nested_self):
1426
self.flags1 = set(breezy.debug.debug_flags)
1427
self.thisFailsStrictLockCheck()
1428
self.flags2 = set(breezy.debug.debug_flags)
1429
# Make sure lock checking is active
1430
self.change_selftest_debug_flags(set())
1431
test = TestThatRecordsFlags('test_foo')
1432
test.run(self.make_test_result())
1433
self.assertEqual({'strict_locks'}, self.flags1)
1434
self.assertEqual(set(), self.flags2)
1436
def test_debug_flags_restored(self):
1437
"""The breezy debug flags should be restored to their original state
1438
after the test was run, even if allow_debug is set.
1440
self.change_selftest_debug_flags({'allow_debug'})
1441
# Now run a test that modifies debug.debug_flags.
1442
breezy.debug.debug_flags = {'original-state'}
1444
class TestThatModifiesFlags(tests.TestCase):
1446
breezy.debug.debug_flags = {'modified'}
1447
test = TestThatModifiesFlags('test_foo')
1448
test.run(self.make_test_result())
1449
self.assertEqual({'original-state'}, breezy.debug.debug_flags)
1451
def make_test_result(self):
1452
"""Get a test result that writes to a StringIO."""
1453
return tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
1455
def inner_test(self):
1456
# the inner child test
1459
def outer_child(self):
1460
# the outer child test
1462
self.inner_test = TestTestCase("inner_child")
1463
result = self.make_test_result()
1464
self.inner_test.run(result)
1465
note("outer finish")
1466
self.addCleanup(osutils.delete_any, self._log_file_name)
1468
def test_trace_nesting(self):
1469
# this tests that each test case nests its trace facility correctly.
1470
# we do this by running a test case manually. That test case (A)
1471
# should setup a new log, log content to it, setup a child case (B),
1472
# which should log independently, then case (A) should log a trailer
1474
# we do two nested children so that we can verify the state of the
1475
# logs after the outer child finishes is correct, which a bad clean
1476
# up routine in tearDown might trigger a fault in our test with only
1477
# one child, we should instead see the bad result inside our test with
1479
# the outer child test
1480
original_trace = breezy.trace._trace_file
1481
outer_test = TestTestCase("outer_child")
1482
result = self.make_test_result()
1483
outer_test.run(result)
1484
self.assertEqual(original_trace, breezy.trace._trace_file)
1486
def method_that_times_a_bit_twice(self):
1487
# call self.time twice to ensure it aggregates
1488
self.time(time.sleep, 0.007)
1489
self.time(time.sleep, 0.007)
1491
def test_time_creates_benchmark_in_result(self):
1492
"""The TestCase.time() method accumulates a benchmark time."""
1493
sample_test = TestTestCase("method_that_times_a_bit_twice")
1494
output_stream = StringIO()
1495
result = breezy.tests.VerboseTestResult(
1499
sample_test.run(result)
1500
self.assertContainsRe(
1501
output_stream.getvalue(),
1504
def test_hooks_sanitised(self):
1505
"""The breezy hooks should be sanitised by setUp."""
1506
# Note this test won't fail with hooks that the core library doesn't
1507
# use - but it trigger with a plugin that adds hooks, so its still a
1508
# useful warning in that case.
1509
self.assertEqual(breezy.branch.BranchHooks(),
1510
breezy.branch.Branch.hooks)
1512
breezy.bzr.smart.server.SmartServerHooks(),
1513
breezy.bzr.smart.server.SmartTCPServer.hooks)
1515
breezy.commands.CommandHooks(), breezy.commands.Command.hooks)
1517
def test__gather_lsprof_in_benchmarks(self):
1518
"""When _gather_lsprof_in_benchmarks is on, accumulate profile data.
1520
Each self.time() call is individually and separately profiled.
1522
self.requireFeature(features.lsprof_feature)
1523
# overrides the class member with an instance member so no cleanup
1525
self._gather_lsprof_in_benchmarks = True
1526
self.time(time.sleep, 0.000)
1527
self.time(time.sleep, 0.003)
1528
self.assertEqual(2, len(self._benchcalls))
1529
self.assertEqual((time.sleep, (0.000,), {}), self._benchcalls[0][0])
1530
self.assertEqual((time.sleep, (0.003,), {}), self._benchcalls[1][0])
1531
self.assertIsInstance(self._benchcalls[0][1], breezy.lsprof.Stats)
1532
self.assertIsInstance(self._benchcalls[1][1], breezy.lsprof.Stats)
1533
del self._benchcalls[:]
1535
def test_knownFailure(self):
1536
"""Self.knownFailure() should raise a KnownFailure exception."""
1537
self.assertRaises(tests.KnownFailure, self.knownFailure, "A Failure")
1539
def test_open_bzrdir_safe_roots(self):
1540
# even a memory transport should fail to open when its url isn't
1542
# Manually set one up (TestCase doesn't and shouldn't provide magic
1544
transport_server = memory.MemoryServer()
1545
transport_server.start_server()
1546
self.addCleanup(transport_server.stop_server)
1547
t = transport.get_transport_from_url(transport_server.get_url())
1548
controldir.ControlDir.create(t.base)
1549
self.assertRaises(errors.BzrError,
1550
controldir.ControlDir.open_from_transport, t)
1551
# But if we declare this as safe, we can open the bzrdir.
1552
self.permit_url(t.base)
1553
self._bzr_selftest_roots.append(t.base)
1554
controldir.ControlDir.open_from_transport(t)
1556
def test_requireFeature_available(self):
1557
"""self.requireFeature(available) is a no-op."""
1558
class Available(features.Feature):
1561
feature = Available()
1562
self.requireFeature(feature)
1564
def test_requireFeature_unavailable(self):
1565
"""self.requireFeature(unavailable) raises UnavailableFeature."""
1566
class Unavailable(features.Feature):
1569
feature = Unavailable()
1570
self.assertRaises(tests.UnavailableFeature,
1571
self.requireFeature, feature)
1573
def test_run_no_parameters(self):
1574
test = SampleTestCase('_test_pass')
1577
def test_run_enabled_unittest_result(self):
1578
"""Test we revert to regular behaviour when the test is enabled."""
1579
test = SampleTestCase('_test_pass')
1581
class EnabledFeature(object):
1582
def available(self):
1584
test._test_needs_features = [EnabledFeature()]
1585
result = unittest.TestResult()
1587
self.assertEqual(1, result.testsRun)
1588
self.assertEqual([], result.errors)
1589
self.assertEqual([], result.failures)
1591
def test_run_disabled_unittest_result(self):
1592
"""Test our compatibility for disabled tests with unittest results."""
1593
test = SampleTestCase('_test_pass')
1595
class DisabledFeature(object):
1596
def available(self):
1598
test._test_needs_features = [DisabledFeature()]
1599
result = unittest.TestResult()
1601
self.assertEqual(1, result.testsRun)
1602
self.assertEqual([], result.errors)
1603
self.assertEqual([], result.failures)
1605
def test_run_disabled_supporting_result(self):
1606
"""Test disabled tests behaviour with support aware results."""
1607
test = SampleTestCase('_test_pass')
1609
class DisabledFeature(object):
1610
def __eq__(self, other):
1611
return isinstance(other, DisabledFeature)
1613
def available(self):
1615
the_feature = DisabledFeature()
1616
test._test_needs_features = [the_feature]
1618
class InstrumentedTestResult(unittest.TestResult):
1620
unittest.TestResult.__init__(self)
1623
def startTest(self, test):
1624
self.calls.append(('startTest', test))
1626
def stopTest(self, test):
1627
self.calls.append(('stopTest', test))
1629
def addNotSupported(self, test, feature):
1630
self.calls.append(('addNotSupported', test, feature))
1631
result = InstrumentedTestResult()
1633
case = result.calls[0][1]
1635
('startTest', case),
1636
('addNotSupported', case, the_feature),
1641
def test_start_server_registers_url(self):
1642
transport_server = memory.MemoryServer()
1643
# A little strict, but unlikely to be changed soon.
1644
self.assertEqual([], self._bzr_selftest_roots)
1645
self.start_server(transport_server)
1646
self.assertSubset([transport_server.get_url()],
1647
self._bzr_selftest_roots)
1649
def test_assert_list_raises_on_generator(self):
1650
def generator_which_will_raise():
1651
# This will not raise until after the first yield
1653
raise _TestException()
1655
e = self.assertListRaises(_TestException, generator_which_will_raise)
1656
self.assertIsInstance(e, _TestException)
1658
e = self.assertListRaises(Exception, generator_which_will_raise)
1659
self.assertIsInstance(e, _TestException)
1661
def test_assert_list_raises_on_plain(self):
1662
def plain_exception():
1663
raise _TestException()
1666
e = self.assertListRaises(_TestException, plain_exception)
1667
self.assertIsInstance(e, _TestException)
1669
e = self.assertListRaises(Exception, plain_exception)
1670
self.assertIsInstance(e, _TestException)
1672
def test_assert_list_raises_assert_wrong_exception(self):
1673
class _NotTestException(Exception):
1676
def wrong_exception():
1677
raise _NotTestException()
1679
def wrong_exception_generator():
1682
raise _NotTestException()
1684
# Wrong exceptions are not intercepted
1687
self.assertListRaises, _TestException, wrong_exception)
1690
self.assertListRaises, _TestException, wrong_exception_generator)
1692
def test_assert_list_raises_no_exception(self):
1696
def success_generator():
1700
self.assertRaises(AssertionError,
1701
self.assertListRaises, _TestException, success)
1705
self.assertListRaises, _TestException, success_generator)
1707
def _run_successful_test(self, test):
1708
result = testtools.TestResult()
1710
self.assertTrue(result.wasSuccessful())
1713
def test_overrideAttr_without_value(self):
1714
self.test_attr = 'original' # Define a test attribute
1715
obj = self # Make 'obj' visible to the embedded test
1717
class Test(tests.TestCase):
1720
super(Test, self).setUp()
1721
self.orig = self.overrideAttr(obj, 'test_attr')
1723
def test_value(self):
1724
self.assertEqual('original', self.orig)
1725
self.assertEqual('original', obj.test_attr)
1726
obj.test_attr = 'modified'
1727
self.assertEqual('modified', obj.test_attr)
1729
self._run_successful_test(Test('test_value'))
1730
self.assertEqual('original', obj.test_attr)
1732
def test_overrideAttr_with_value(self):
1733
self.test_attr = 'original' # Define a test attribute
1734
obj = self # Make 'obj' visible to the embedded test
1736
class Test(tests.TestCase):
1739
super(Test, self).setUp()
1740
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1742
def test_value(self):
1743
self.assertEqual('original', self.orig)
1744
self.assertEqual('modified', obj.test_attr)
1746
self._run_successful_test(Test('test_value'))
1747
self.assertEqual('original', obj.test_attr)
1749
def test_overrideAttr_with_no_existing_value_and_value(self):
1750
# Do not define the test_attribute
1751
obj = self # Make 'obj' visible to the embedded test
1753
class Test(tests.TestCase):
1756
tests.TestCase.setUp(self)
1757
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1759
def test_value(self):
1760
self.assertEqual(tests._unitialized_attr, self.orig)
1761
self.assertEqual('modified', obj.test_attr)
1763
self._run_successful_test(Test('test_value'))
1764
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1766
def test_overrideAttr_with_no_existing_value_and_no_value(self):
1767
# Do not define the test_attribute
1768
obj = self # Make 'obj' visible to the embedded test
1770
class Test(tests.TestCase):
1773
tests.TestCase.setUp(self)
1774
self.orig = self.overrideAttr(obj, 'test_attr')
1776
def test_value(self):
1777
self.assertEqual(tests._unitialized_attr, self.orig)
1778
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1780
self._run_successful_test(Test('test_value'))
1781
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1783
def test_recordCalls(self):
1784
from breezy.tests import test_selftest
1785
calls = self.recordCalls(
1786
test_selftest, '_add_numbers')
1787
self.assertEqual(test_selftest._add_numbers(2, 10),
1789
self.assertEqual(calls, [((2, 10), {})])
1792
def _add_numbers(a, b):
1796
class _MissingFeature(features.Feature):
1801
missing_feature = _MissingFeature()
1804
def _get_test(name):
1805
"""Get an instance of a specific example test.
1807
We protect this in a function so that they don't auto-run in the test
1811
class ExampleTests(tests.TestCase):
1813
def test_fail(self):
1814
mutter('this was a failing test')
1815
self.fail('this test will fail')
1817
def test_error(self):
1818
mutter('this test errored')
1819
raise RuntimeError('gotcha')
1821
def test_missing_feature(self):
1822
mutter('missing the feature')
1823
self.requireFeature(missing_feature)
1825
def test_skip(self):
1826
mutter('this test will be skipped')
1827
raise tests.TestSkipped('reason')
1829
def test_success(self):
1830
mutter('this test succeeds')
1832
def test_xfail(self):
1833
mutter('test with expected failure')
1834
self.knownFailure('this_fails')
1836
def test_unexpected_success(self):
1837
mutter('test with unexpected success')
1838
self.expectFailure('should_fail', lambda: None)
1840
return ExampleTests(name)
1843
class TestTestCaseLogDetails(tests.TestCase):
1845
def _run_test(self, test_name):
1846
test = _get_test(test_name)
1847
result = testtools.TestResult()
1851
def test_fail_has_log(self):
1852
result = self._run_test('test_fail')
1853
self.assertEqual(1, len(result.failures))
1854
result_content = result.failures[0][1]
1855
self.assertContainsRe(result_content,
1856
'(?m)^(?:Text attachment: )?log(?:$|: )')
1857
self.assertContainsRe(result_content, 'this was a failing test')
1859
def test_error_has_log(self):
1860
result = self._run_test('test_error')
1861
self.assertEqual(1, len(result.errors))
1862
result_content = result.errors[0][1]
1863
self.assertContainsRe(result_content,
1864
'(?m)^(?:Text attachment: )?log(?:$|: )')
1865
self.assertContainsRe(result_content, 'this test errored')
1867
def test_skip_has_no_log(self):
1868
result = self._run_test('test_skip')
1869
reasons = result.skip_reasons
1870
self.assertEqual({'reason'}, set(reasons))
1871
skips = reasons['reason']
1872
self.assertEqual(1, len(skips))
1874
self.assertFalse('log' in test.getDetails())
1876
def test_missing_feature_has_no_log(self):
1877
# testtools doesn't know about addNotSupported, so it just gets
1878
# considered as a skip
1879
result = self._run_test('test_missing_feature')
1880
reasons = result.skip_reasons
1881
self.assertEqual({str(missing_feature)}, set(reasons))
1882
skips = reasons[str(missing_feature)]
1883
self.assertEqual(1, len(skips))
1885
self.assertFalse('log' in test.getDetails())
1887
def test_xfail_has_no_log(self):
1888
result = self._run_test('test_xfail')
1889
self.assertEqual(1, len(result.expectedFailures))
1890
result_content = result.expectedFailures[0][1]
1891
self.assertNotContainsRe(result_content,
1892
'(?m)^(?:Text attachment: )?log(?:$|: )')
1893
self.assertNotContainsRe(result_content, 'test with expected failure')
1895
def test_unexpected_success_has_log(self):
1896
result = self._run_test('test_unexpected_success')
1897
self.assertEqual(1, len(result.unexpectedSuccesses))
1898
# Inconsistency, unexpectedSuccesses is a list of tests,
1899
# expectedFailures is a list of reasons?
1900
test = result.unexpectedSuccesses[0]
1901
details = test.getDetails()
1902
self.assertTrue('log' in details)
1905
class TestTestCloning(tests.TestCase):
1906
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1908
def test_cloned_testcase_does_not_share_details(self):
1909
"""A TestCase cloned with clone_test does not share mutable attributes
1910
such as details or cleanups.
1912
class Test(tests.TestCase):
1914
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1915
orig_test = Test('test_foo')
1916
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1917
orig_test.run(unittest.TestResult())
1918
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1919
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1921
def test_double_apply_scenario_preserves_first_scenario(self):
1922
"""Applying two levels of scenarios to a test preserves the attributes
1923
added by both scenarios.
1925
class Test(tests.TestCase):
1928
test = Test('test_foo')
1929
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1930
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1931
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1932
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1933
all_tests = list(tests.iter_suite_tests(suite))
1934
self.assertLength(4, all_tests)
1935
all_xys = sorted((t.x, t.y) for t in all_tests)
1936
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1939
# NB: Don't delete this; it's not actually from 0.11!
1940
@deprecated_function(deprecated_in((0, 11, 0)))
1941
def sample_deprecated_function():
1942
"""A deprecated function to test applyDeprecated with."""
1946
def sample_undeprecated_function(a_param):
1947
"""A undeprecated function to test applyDeprecated with."""
1950
class ApplyDeprecatedHelper(object):
1951
"""A helper class for ApplyDeprecated tests."""
1953
@deprecated_method(deprecated_in((0, 11, 0)))
1954
def sample_deprecated_method(self, param_one):
1955
"""A deprecated method for testing with."""
1958
def sample_normal_method(self):
1959
"""A undeprecated method."""
1961
@deprecated_method(deprecated_in((0, 10, 0)))
1962
def sample_nested_deprecation(self):
1963
return sample_deprecated_function()
1966
class TestExtraAssertions(tests.TestCase):
1967
"""Tests for new test assertions in breezy test suite"""
1969
def test_assert_isinstance(self):
1970
self.assertIsInstance(2, int)
1971
self.assertIsInstance(u'', str)
1972
e = self.assertRaises(AssertionError, self.assertIsInstance, None, int)
1975
["None is an instance of <type 'NoneType'> rather than "
1977
"None is an instance of <class 'NoneType'> rather than "
1979
self.assertRaises(AssertionError, self.assertIsInstance, 23.3, int)
1980
e = self.assertRaises(AssertionError,
1981
self.assertIsInstance, None, int,
1985
"None is an instance of <class 'NoneType'> rather "
1986
"than <class 'int'>: it's just not")
1988
def test_assertEndsWith(self):
1989
self.assertEndsWith('foo', 'oo')
1990
self.assertRaises(AssertionError, self.assertEndsWith, 'o', 'oo')
1992
def test_assertEqualDiff(self):
1993
e = self.assertRaises(AssertionError,
1994
self.assertEqualDiff, '', '\n')
1995
self.assertEqual(str(e),
1996
# Don't blink ! The '+' applies to the second string
1997
'first string is missing a final newline.\n+ \n')
1998
e = self.assertRaises(AssertionError,
1999
self.assertEqualDiff, '\n', '')
2000
self.assertEqual(str(e),
2001
# Don't blink ! The '-' applies to the second string
2002
'second string is missing a final newline.\n- \n')
2005
class TestDeprecations(tests.TestCase):
2007
def test_applyDeprecated_not_deprecated(self):
2008
sample_object = ApplyDeprecatedHelper()
2009
# calling an undeprecated callable raises an assertion
2010
self.assertRaises(AssertionError, self.applyDeprecated,
2011
deprecated_in((0, 11, 0)),
2012
sample_object.sample_normal_method)
2013
self.assertRaises(AssertionError, self.applyDeprecated,
2014
deprecated_in((0, 11, 0)),
2015
sample_undeprecated_function, "a param value")
2016
# calling a deprecated callable (function or method) with the wrong
2017
# expected deprecation fails.
2018
self.assertRaises(AssertionError, self.applyDeprecated,
2019
deprecated_in((0, 10, 0)),
2020
sample_object.sample_deprecated_method,
2022
self.assertRaises(AssertionError, self.applyDeprecated,
2023
deprecated_in((0, 10, 0)),
2024
sample_deprecated_function)
2025
# calling a deprecated callable (function or method) with the right
2026
# expected deprecation returns the functions result.
2029
self.applyDeprecated(
2030
deprecated_in((0, 11, 0)),
2031
sample_object.sample_deprecated_method, "a param value"))
2032
self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 11, 0)),
2033
sample_deprecated_function))
2034
# calling a nested deprecation with the wrong deprecation version
2035
# fails even if a deeper nested function was deprecated with the
2038
AssertionError, self.applyDeprecated,
2039
deprecated_in((0, 11, 0)), sample_object.sample_nested_deprecation)
2040
# calling a nested deprecation with the right deprecation value
2041
# returns the calls result.
2043
2, self.applyDeprecated(
2044
deprecated_in((0, 10, 0)),
2045
sample_object.sample_nested_deprecation))
2047
def test_callDeprecated(self):
2048
def testfunc(be_deprecated, result=None):
2049
if be_deprecated is True:
2050
symbol_versioning.warn('i am deprecated', DeprecationWarning,
2053
result = self.callDeprecated(['i am deprecated'], testfunc, True)
2054
self.assertIs(None, result)
2055
result = self.callDeprecated([], testfunc, False, 'result')
2056
self.assertEqual('result', result)
2057
self.callDeprecated(['i am deprecated'], testfunc, be_deprecated=True)
2058
self.callDeprecated([], testfunc, be_deprecated=False)
2061
class TestWarningTests(tests.TestCase):
2062
"""Tests for calling methods that raise warnings."""
2064
def test_callCatchWarnings(self):
2066
warnings.warn("this is your last warning")
2068
wlist, result = self.callCatchWarnings(meth, 1, 2)
2069
self.assertEqual(3, result)
2070
# would like just to compare them, but UserWarning doesn't implement
2073
self.assertIsInstance(w0, UserWarning)
2074
self.assertEqual("this is your last warning", str(w0))
2077
class TestConvenienceMakers(tests.TestCaseWithTransport):
2078
"""Test for the make_* convenience functions."""
2080
def test_make_branch_and_tree_with_format(self):
2081
# we should be able to supply a format to make_branch_and_tree
2082
self.make_branch_and_tree(
2083
'a', format=breezy.bzr.bzrdir.BzrDirMetaFormat1())
2084
self.assertIsInstance(breezy.controldir.ControlDir.open('a')._format,
2085
breezy.bzr.bzrdir.BzrDirMetaFormat1)
2087
def test_make_branch_and_memory_tree(self):
2088
# we should be able to get a new branch and a mutable tree from
2089
# TestCaseWithTransport
2090
tree = self.make_branch_and_memory_tree('a')
2091
self.assertIsInstance(tree, breezy.memorytree.MemoryTree)
2093
def test_make_tree_for_local_vfs_backed_transport(self):
2094
# make_branch_and_tree has to use local branch and repositories
2095
# when the vfs transport and local disk are colocated, even if
2096
# a different transport is in use for url generation.
2097
self.transport_server = test_server.FakeVFATServer
2098
self.assertFalse(self.get_url('t1').startswith('file://'))
2099
tree = self.make_branch_and_tree('t1')
2100
base = tree.controldir.root_transport.base
2101
self.assertStartsWith(base, 'file://')
2102
self.assertEqual(tree.controldir.root_transport,
2103
tree.branch.controldir.root_transport)
2104
self.assertEqual(tree.controldir.root_transport,
2105
tree.branch.repository.controldir.root_transport)
2108
class SelfTestHelper(object):
2110
def run_selftest(self, **kwargs):
2111
"""Run selftest returning its output."""
2113
output = TextIOWrapper(bio, 'utf-8')
2114
old_transport = breezy.tests.default_transport
2115
old_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
2116
tests.TestCaseWithMemoryTransport.TEST_ROOT = None
2118
self.assertEqual(True, tests.selftest(stream=output, **kwargs))
2120
breezy.tests.default_transport = old_transport
2121
tests.TestCaseWithMemoryTransport.TEST_ROOT = old_root
2128
class TestSelftest(tests.TestCase, SelfTestHelper):
2129
"""Tests of breezy.tests.selftest."""
2131
def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(
2136
factory_called.append(True)
2137
return TestUtil.TestSuite()
2140
self.apply_redirected(out, err, None, breezy.tests.selftest,
2141
test_suite_factory=factory)
2142
self.assertEqual([True], factory_called)
2145
"""A test suite factory."""
2146
class Test(tests.TestCase):
2148
return __name__ + ".Test." + self._testMethodName
2158
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
2160
def test_list_only(self):
2161
output = self.run_selftest(test_suite_factory=self.factory,
2163
self.assertEqual(3, len(output.readlines()))
2165
def test_list_only_filtered(self):
2166
output = self.run_selftest(test_suite_factory=self.factory,
2167
list_only=True, pattern="Test.b")
2168
self.assertEndsWith(output.getvalue(), b"Test.b\n")
2169
self.assertLength(1, output.readlines())
2171
def test_list_only_excludes(self):
2172
output = self.run_selftest(test_suite_factory=self.factory,
2173
list_only=True, exclude_pattern="Test.b")
2174
self.assertNotContainsRe(b"Test.b", output.getvalue())
2175
self.assertLength(2, output.readlines())
2177
def test_lsprof_tests(self):
2178
self.requireFeature(features.lsprof_feature)
2182
def __call__(test, result):
2185
def run(test, result):
2186
results.append(result)
2188
def countTestCases(self):
2190
self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
2191
self.assertLength(1, results)
2192
self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)
2194
def test_random(self):
2195
# test randomising by listing a number of tests.
2196
output_123 = self.run_selftest(test_suite_factory=self.factory,
2197
list_only=True, random_seed="123")
2198
output_234 = self.run_selftest(test_suite_factory=self.factory,
2199
list_only=True, random_seed="234")
2200
self.assertNotEqual(output_123, output_234)
2201
# "Randominzing test order..\n\n
2202
self.assertLength(5, output_123.readlines())
2203
self.assertLength(5, output_234.readlines())
2205
def test_random_reuse_is_same_order(self):
2206
# test randomising by listing a number of tests.
2207
expected = self.run_selftest(test_suite_factory=self.factory,
2208
list_only=True, random_seed="123")
2209
repeated = self.run_selftest(test_suite_factory=self.factory,
2210
list_only=True, random_seed="123")
2211
self.assertEqual(expected.getvalue(), repeated.getvalue())
2213
def test_runner_class(self):
2214
self.requireFeature(features.subunit)
2215
from subunit import ProtocolTestCase
2216
stream = self.run_selftest(
2217
runner_class=tests.SubUnitBzrRunnerv1,
2218
test_suite_factory=self.factory)
2219
test = ProtocolTestCase(stream)
2220
result = unittest.TestResult()
2222
self.assertEqual(3, result.testsRun)
2224
def test_starting_with_single_argument(self):
2225
output = self.run_selftest(test_suite_factory=self.factory,
2227
'breezy.tests.test_selftest.Test.a'],
2229
self.assertEqual(b'breezy.tests.test_selftest.Test.a\n',
2232
def test_starting_with_multiple_argument(self):
2233
output = self.run_selftest(
2234
test_suite_factory=self.factory,
2235
starting_with=['breezy.tests.test_selftest.Test.a',
2236
'breezy.tests.test_selftest.Test.b'],
2238
self.assertEqual(b'breezy.tests.test_selftest.Test.a\n'
2239
b'breezy.tests.test_selftest.Test.b\n',
2242
def check_transport_set(self, transport_server):
2243
captured_transport = []
2245
def seen_transport(a_transport):
2246
captured_transport.append(a_transport)
2248
class Capture(tests.TestCase):
2250
seen_transport(breezy.tests.default_transport)
2253
return TestUtil.TestSuite([Capture("a")])
2254
self.run_selftest(transport=transport_server,
2255
test_suite_factory=factory)
2256
self.assertEqual(transport_server, captured_transport[0])
2258
def test_transport_sftp(self):
2259
self.requireFeature(features.paramiko)
2260
from breezy.tests import stub_sftp
2261
self.check_transport_set(stub_sftp.SFTPAbsoluteServer)
2263
def test_transport_memory(self):
2264
self.check_transport_set(memory.MemoryServer)
2267
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
2268
# Does IO: reads test.list
2270
def test_load_list(self):
2271
# Provide a list with one test - this test.
2272
test_id_line = b'%s\n' % self.id().encode('ascii')
2273
self.build_tree_contents([('test.list', test_id_line)])
2274
# And generate a list of the tests in the suite.
2275
stream = self.run_selftest(load_list='test.list', list_only=True)
2276
self.assertEqual(test_id_line, stream.getvalue())
2278
def test_load_unknown(self):
2279
# Provide a list with one test - this test.
2280
# And generate a list of the tests in the suite.
2281
self.assertRaises(errors.NoSuchFile, self.run_selftest,
2282
load_list='missing file name', list_only=True)
2285
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2287
_test_needs_features = [features.subunit]
2289
def run_subunit_stream(self, test_name):
2290
from subunit import ProtocolTestCase
2293
return TestUtil.TestSuite([_get_test(test_name)])
2294
stream = self.run_selftest(
2295
runner_class=tests.SubUnitBzrRunnerv1,
2296
test_suite_factory=factory)
2297
test = ProtocolTestCase(stream)
2298
result = testtools.TestResult()
2300
content = stream.getvalue()
2301
return content, result
2303
def test_fail_has_log(self):
2304
content, result = self.run_subunit_stream('test_fail')
2305
self.assertEqual(1, len(result.failures))
2306
self.assertContainsRe(content, b'(?m)^log$')
2307
self.assertContainsRe(content, b'this test will fail')
2309
def test_error_has_log(self):
2310
content, result = self.run_subunit_stream('test_error')
2311
self.assertContainsRe(content, b'(?m)^log$')
2312
self.assertContainsRe(content, b'this test errored')
2314
def test_skip_has_no_log(self):
2315
content, result = self.run_subunit_stream('test_skip')
2316
self.assertNotContainsRe(content, b'(?m)^log$')
2317
self.assertNotContainsRe(content, b'this test will be skipped')
2318
reasons = result.skip_reasons
2319
self.assertEqual({'reason'}, set(reasons))
2320
skips = reasons['reason']
2321
self.assertEqual(1, len(skips))
2323
# RemotedTestCase doesn't preserve the "details"
2324
# self.assertFalse('log' in test.getDetails())
2326
def test_missing_feature_has_no_log(self):
2327
content, result = self.run_subunit_stream('test_missing_feature')
2328
self.assertNotContainsRe(content, b'(?m)^log$')
2329
self.assertNotContainsRe(content, b'missing the feature')
2330
reasons = result.skip_reasons
2331
self.assertEqual({'_MissingFeature\n'}, set(reasons))
2332
skips = reasons['_MissingFeature\n']
2333
self.assertEqual(1, len(skips))
2335
# RemotedTestCase doesn't preserve the "details"
2336
# self.assertFalse('log' in test.getDetails())
2338
def test_xfail_has_no_log(self):
2339
content, result = self.run_subunit_stream('test_xfail')
2340
self.assertNotContainsRe(content, b'(?m)^log$')
2341
self.assertNotContainsRe(content, b'test with expected failure')
2342
self.assertEqual(1, len(result.expectedFailures))
2343
result_content = result.expectedFailures[0][1]
2344
self.assertNotContainsRe(result_content,
2345
'(?m)^(?:Text attachment: )?log(?:$|: )')
2346
self.assertNotContainsRe(result_content, 'test with expected failure')
2348
def test_unexpected_success_has_log(self):
2349
content, result = self.run_subunit_stream('test_unexpected_success')
2350
self.assertContainsRe(content, b'(?m)^log$')
2351
self.assertContainsRe(content, b'test with unexpected success')
2352
self.assertEqual(1, len(result.unexpectedSuccesses))
2353
# test = result.unexpectedSuccesses[0]
2354
# RemotedTestCase doesn't preserve the "details"
2355
# self.assertTrue('log' in test.getDetails())
2357
def test_success_has_no_log(self):
2358
content, result = self.run_subunit_stream('test_success')
2359
self.assertEqual(1, result.testsRun)
2360
self.assertNotContainsRe(content, b'(?m)^log$')
2361
self.assertNotContainsRe(content, b'this test succeeds')
2364
class TestRunBzr(tests.TestCase):
2370
def _run_bzr_core(self, argv, encoding=None, stdin=None,
2371
stdout=None, stderr=None, working_dir=None):
2372
"""Override _run_bzr_core to test how it is invoked by run_bzr.
2374
Attempts to run bzr from inside this class don't actually run it.
2376
We test how run_bzr actually invokes bzr in another location. Here we
2377
only need to test that it passes the right parameters to run_bzr.
2379
self.argv = list(argv)
2380
self.encoding = encoding
2382
self.working_dir = working_dir
2383
stdout.write(self.out)
2384
stderr.write(self.err)
2387
def test_run_bzr_error(self):
2388
self.out = "It sure does!\n"
2390
out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
2391
self.assertEqual(['rocks'], self.argv)
2392
self.assertEqual('It sure does!\n', out)
2393
self.assertEqual(out, self.out)
2394
self.assertEqual('', err)
2395
self.assertEqual(err, self.err)
2397
def test_run_bzr_error_regexes(self):
2399
self.err = "bzr: ERROR: foobarbaz is not versioned"
2401
out, err = self.run_bzr_error(
2402
["bzr: ERROR: foobarbaz is not versioned"],
2403
['file-id', 'foobarbaz'])
2405
def test_encoding(self):
2406
"""Test that run_bzr passes encoding to _run_bzr_core"""
2407
self.run_bzr('foo bar')
2408
self.assertEqual(osutils.get_user_encoding(), self.encoding)
2409
self.assertEqual(['foo', 'bar'], self.argv)
2411
self.run_bzr('foo bar', encoding='baz')
2412
self.assertEqual('baz', self.encoding)
2413
self.assertEqual(['foo', 'bar'], self.argv)
2415
def test_stdin(self):
2416
# test that the stdin keyword to run_bzr is passed through to
2417
# _run_bzr_core as-is. We do this by overriding
2418
# _run_bzr_core in this class, and then calling run_bzr,
2419
# which is a convenience function for _run_bzr_core, so
2421
self.run_bzr('foo bar', stdin='gam')
2422
self.assertEqual('gam', self.stdin)
2423
self.assertEqual(['foo', 'bar'], self.argv)
2425
self.run_bzr('foo bar', stdin='zippy')
2426
self.assertEqual('zippy', self.stdin)
2427
self.assertEqual(['foo', 'bar'], self.argv)
2429
def test_working_dir(self):
2430
"""Test that run_bzr passes working_dir to _run_bzr_core"""
2431
self.run_bzr('foo bar')
2432
self.assertEqual(None, self.working_dir)
2433
self.assertEqual(['foo', 'bar'], self.argv)
2435
self.run_bzr('foo bar', working_dir='baz')
2436
self.assertEqual('baz', self.working_dir)
2437
self.assertEqual(['foo', 'bar'], self.argv)
2439
def test_reject_extra_keyword_arguments(self):
2440
self.assertRaises(TypeError, self.run_bzr, "foo bar",
2441
error_regex=['error message'])
2444
class TestRunBzrCaptured(tests.TestCaseWithTransport):
2445
# Does IO when testing the working_dir parameter.
2447
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
2448
a_callable=None, *args, **kwargs):
2450
self.factory_stdin = getattr(breezy.ui.ui_factory, "stdin", None)
2451
self.factory = breezy.ui.ui_factory
2452
self.working_dir = osutils.getcwd()
2453
stdout.write('foo\n')
2454
stderr.write('bar\n')
2457
def test_stdin(self):
2458
# test that the stdin keyword to _run_bzr_core is passed through to
2459
# apply_redirected as a StringIO. We do this by overriding
2460
# apply_redirected in this class, and then calling _run_bzr_core,
2461
# which calls apply_redirected.
2462
self.run_bzr(['foo', 'bar'], stdin='gam')
2463
self.assertEqual('gam', self.stdin.read())
2464
self.assertTrue(self.stdin is self.factory_stdin)
2465
self.run_bzr(['foo', 'bar'], stdin='zippy')
2466
self.assertEqual('zippy', self.stdin.read())
2467
self.assertTrue(self.stdin is self.factory_stdin)
2469
def test_ui_factory(self):
2470
# each invocation of self.run_bzr should get its
2471
# own UI factory, which is an instance of TestUIFactory,
2472
# with stdin, stdout and stderr attached to the stdin,
2473
# stdout and stderr of the invoked run_bzr
2474
current_factory = breezy.ui.ui_factory
2475
self.run_bzr(['foo'])
2476
self.assertFalse(current_factory is self.factory)
2477
self.assertNotEqual(sys.stdout, self.factory.stdout)
2478
self.assertNotEqual(sys.stderr, self.factory.stderr)
2479
self.assertEqual('foo\n', self.factory.stdout.getvalue())
2480
self.assertEqual('bar\n', self.factory.stderr.getvalue())
2481
self.assertIsInstance(self.factory, tests.TestUIFactory)
2483
def test_working_dir(self):
2484
self.build_tree(['one/', 'two/'])
2485
cwd = osutils.getcwd()
2487
# Default is to work in the current directory
2488
self.run_bzr(['foo', 'bar'])
2489
self.assertEqual(cwd, self.working_dir)
2491
self.run_bzr(['foo', 'bar'], working_dir=None)
2492
self.assertEqual(cwd, self.working_dir)
2494
# The function should be run in the alternative directory
2495
# but afterwards the current working dir shouldn't be changed
2496
self.run_bzr(['foo', 'bar'], working_dir='one')
2497
self.assertNotEqual(cwd, self.working_dir)
2498
self.assertEndsWith(self.working_dir, 'one')
2499
self.assertEqual(cwd, osutils.getcwd())
2501
self.run_bzr(['foo', 'bar'], working_dir='two')
2502
self.assertNotEqual(cwd, self.working_dir)
2503
self.assertEndsWith(self.working_dir, 'two')
2504
self.assertEqual(cwd, osutils.getcwd())
2507
class StubProcess(object):
2508
"""A stub process for testing run_bzr_subprocess."""
2510
def __init__(self, out="", err="", retcode=0):
2513
self.returncode = retcode
2515
def communicate(self):
2516
return self.out, self.err
2519
class TestWithFakedStartBzrSubprocess(tests.TestCaseWithTransport):
2520
"""Base class for tests testing how we might run bzr."""
2523
super(TestWithFakedStartBzrSubprocess, self).setUp()
2524
self.subprocess_calls = []
2526
def start_bzr_subprocess(self, process_args, env_changes=None,
2527
skip_if_plan_to_signal=False,
2529
allow_plugins=False):
2530
"""capture what run_bzr_subprocess tries to do."""
2531
self.subprocess_calls.append(
2532
{'process_args': process_args,
2533
'env_changes': env_changes,
2534
'skip_if_plan_to_signal': skip_if_plan_to_signal,
2535
'working_dir': working_dir, 'allow_plugins': allow_plugins})
2536
return self.next_subprocess
2539
class TestRunBzrSubprocess(TestWithFakedStartBzrSubprocess):
2541
def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
2542
"""Run run_bzr_subprocess with args and kwargs using a stubbed process.
2544
Inside TestRunBzrSubprocessCommands we use a stub start_bzr_subprocess
2545
that will return static results. This assertion method populates those
2546
results and also checks the arguments run_bzr_subprocess generates.
2548
self.next_subprocess = process
2550
result = self.run_bzr_subprocess(*args, **kwargs)
2551
except BaseException:
2552
self.next_subprocess = None
2553
for key, expected in expected_args.items():
2554
self.assertEqual(expected, self.subprocess_calls[-1][key])
2557
self.next_subprocess = None
2558
for key, expected in expected_args.items():
2559
self.assertEqual(expected, self.subprocess_calls[-1][key])
2562
def test_run_bzr_subprocess(self):
2563
"""The run_bzr_helper_external command behaves nicely."""
2564
self.assertRunBzrSubprocess({'process_args': ['--version']},
2565
StubProcess(), '--version')
2566
self.assertRunBzrSubprocess({'process_args': ['--version']},
2567
StubProcess(), ['--version'])
2568
# retcode=None disables retcode checking
2569
result = self.assertRunBzrSubprocess(
2570
{}, StubProcess(retcode=3), '--version', retcode=None)
2571
result = self.assertRunBzrSubprocess(
2572
{}, StubProcess(out="is free software"), '--version')
2573
self.assertContainsRe(result[0], 'is free software')
2574
# Running a subcommand that is missing errors
2575
self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
2576
{'process_args': ['--versionn']
2577
}, StubProcess(retcode=3),
2579
# Unless it is told to expect the error from the subprocess
2580
result = self.assertRunBzrSubprocess(
2581
{}, StubProcess(retcode=3), '--versionn', retcode=3)
2582
# Or to ignore retcode checking
2583
result = self.assertRunBzrSubprocess(
2584
{}, StubProcess(err="unknown command", retcode=3),
2585
'--versionn', retcode=None)
2586
self.assertContainsRe(result[1], 'unknown command')
2588
def test_env_change_passes_through(self):
2589
self.assertRunBzrSubprocess(
2590
{'env_changes': {'new': 'value', 'changed': 'newvalue', 'deleted': None}},
2592
env_changes={'new': 'value', 'changed': 'newvalue', 'deleted': None})
2594
def test_no_working_dir_passed_as_None(self):
2595
self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '')
2597
def test_no_working_dir_passed_through(self):
2598
self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
2601
def test_run_bzr_subprocess_no_plugins(self):
2602
self.assertRunBzrSubprocess({'allow_plugins': False},
2605
def test_allow_plugins(self):
2606
self.assertRunBzrSubprocess({'allow_plugins': True},
2607
StubProcess(), '', allow_plugins=True)
2610
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):
2612
def test_finish_bzr_subprocess_with_error(self):
2613
"""finish_bzr_subprocess allows specification of the desired exit code.
2615
process = StubProcess(err="unknown command", retcode=3)
2616
result = self.finish_bzr_subprocess(process, retcode=3)
2617
self.assertEqual('', result[0])
2618
self.assertContainsRe(result[1], 'unknown command')
2620
def test_finish_bzr_subprocess_ignoring_retcode(self):
2621
"""finish_bzr_subprocess allows the exit code to be ignored."""
2622
process = StubProcess(err="unknown command", retcode=3)
2623
result = self.finish_bzr_subprocess(process, retcode=None)
2624
self.assertEqual('', result[0])
2625
self.assertContainsRe(result[1], 'unknown command')
2627
def test_finish_subprocess_with_unexpected_retcode(self):
2628
"""finish_bzr_subprocess raises self.failureException if the retcode is
2629
not the expected one.
2631
process = StubProcess(err="unknown command", retcode=3)
2632
self.assertRaises(self.failureException, self.finish_bzr_subprocess,
2636
class _DontSpawnProcess(Exception):
2637
"""A simple exception which just allows us to skip unnecessary steps"""
2640
class TestStartBzrSubProcess(tests.TestCase):
2641
"""Stub test start_bzr_subprocess."""
2643
def _subprocess_log_cleanup(self):
2644
"""Inhibits the base version as we don't produce a log file."""
2646
def _popen(self, *args, **kwargs):
2647
"""Override the base version to record the command that is run.
2649
From there we can ensure it is correct without spawning a real process.
2651
self.check_popen_state()
2652
self._popen_args = args
2653
self._popen_kwargs = kwargs
2654
raise _DontSpawnProcess()
2656
def check_popen_state(self):
2657
"""Replace to make assertions when popen is called."""
2659
def test_run_bzr_subprocess_no_plugins(self):
2660
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2661
command = self._popen_args[0]
2662
self.assertEqual(sys.executable, command[0])
2663
self.assertEqual(self.get_brz_path(), command[1])
2664
self.assertEqual(['--no-plugins'], command[2:])
2666
def test_allow_plugins(self):
2667
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2669
command = self._popen_args[0]
2670
self.assertEqual([], command[2:])
2672
def test_set_env(self):
2673
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2676
def check_environment():
2677
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2678
self.check_popen_state = check_environment
2679
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2680
env_changes={'EXISTANT_ENV_VAR': 'set variable'})
2681
# not set in theparent
2682
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2684
def test_run_bzr_subprocess_env_del(self):
2685
"""run_bzr_subprocess can remove environment variables too."""
2686
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2688
def check_environment():
2689
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2690
os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2691
self.check_popen_state = check_environment
2692
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2693
env_changes={'EXISTANT_ENV_VAR': None})
2694
# Still set in parent
2695
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2696
del os.environ['EXISTANT_ENV_VAR']
2698
def test_env_del_missing(self):
2699
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2701
def check_environment():
2702
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2703
self.check_popen_state = check_environment
2704
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2705
env_changes={'NON_EXISTANT_ENV_VAR': None})
2707
def test_working_dir(self):
2708
"""Test that we can specify the working dir for the child"""
2713
self.overrideAttr(os, 'chdir', chdir)
2717
self.overrideAttr(osutils, 'getcwd', getcwd)
2718
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2720
self.assertEqual(['foo', 'current'], chdirs)
2722
def test_get_brz_path_with_cwd_breezy(self):
2723
self.get_source_path = lambda: ""
2724
self.overrideAttr(os.path, "isfile", lambda path: True)
2725
self.assertEqual(self.get_brz_path(), "brz")
2728
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2729
"""Tests that really need to do things with an external bzr."""
2731
def test_start_and_stop_bzr_subprocess_send_signal(self):
2732
"""finish_bzr_subprocess raises self.failureException if the retcode is
2733
not the expected one.
2735
self.disable_missing_extensions_warning()
2736
process = self.start_bzr_subprocess(['wait-until-signalled'],
2737
skip_if_plan_to_signal=True)
2738
self.assertEqual(b'running\n', process.stdout.readline())
2739
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2741
self.assertEqual(b'', result[0])
2742
self.assertEqual(b'brz: interrupted\n', result[1])
2745
class TestSelftestFiltering(tests.TestCase):
2748
super(TestSelftestFiltering, self).setUp()
2749
self.suite = TestUtil.TestSuite()
2750
self.loader = TestUtil.TestLoader()
2751
self.suite.addTest(self.loader.loadTestsFromModule(
2752
sys.modules['breezy.tests.test_selftest']))
2753
self.all_names = _test_ids(self.suite)
2755
def test_condition_id_re(self):
2756
test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2757
'test_condition_id_re')
2758
filtered_suite = tests.filter_suite_by_condition(
2759
self.suite, tests.condition_id_re('test_condition_id_re'))
2760
self.assertEqual([test_name], _test_ids(filtered_suite))
2762
def test_condition_id_in_list(self):
2763
test_names = ['breezy.tests.test_selftest.TestSelftestFiltering.'
2764
'test_condition_id_in_list']
2765
id_list = tests.TestIdList(test_names)
2766
filtered_suite = tests.filter_suite_by_condition(
2767
self.suite, tests.condition_id_in_list(id_list))
2768
my_pattern = 'TestSelftestFiltering.*test_condition_id_in_list'
2769
re_filtered = tests.filter_suite_by_re(self.suite, my_pattern)
2770
self.assertEqual(_test_ids(re_filtered), _test_ids(filtered_suite))
2772
def test_condition_id_startswith(self):
2773
klass = 'breezy.tests.test_selftest.TestSelftestFiltering.'
2774
start1 = klass + 'test_condition_id_starts'
2775
start2 = klass + 'test_condition_id_in'
2776
test_names = [klass + 'test_condition_id_in_list',
2777
klass + 'test_condition_id_startswith',
2779
filtered_suite = tests.filter_suite_by_condition(
2780
self.suite, tests.condition_id_startswith([start1, start2]))
2781
self.assertEqual(test_names, _test_ids(filtered_suite))
2783
def test_condition_isinstance(self):
2784
filtered_suite = tests.filter_suite_by_condition(
2785
self.suite, tests.condition_isinstance(self.__class__))
2786
class_pattern = 'breezy.tests.test_selftest.TestSelftestFiltering.'
2787
re_filtered = tests.filter_suite_by_re(self.suite, class_pattern)
2788
self.assertEqual(_test_ids(re_filtered), _test_ids(filtered_suite))
2790
def test_exclude_tests_by_condition(self):
2791
excluded_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2792
'test_exclude_tests_by_condition')
2793
filtered_suite = tests.exclude_tests_by_condition(
2794
self.suite, lambda x: x.id() == excluded_name)
2795
self.assertEqual(len(self.all_names) - 1,
2796
filtered_suite.countTestCases())
2797
self.assertFalse(excluded_name in _test_ids(filtered_suite))
2798
remaining_names = list(self.all_names)
2799
remaining_names.remove(excluded_name)
2800
self.assertEqual(remaining_names, _test_ids(filtered_suite))
2802
def test_exclude_tests_by_re(self):
2803
self.all_names = _test_ids(self.suite)
2804
filtered_suite = tests.exclude_tests_by_re(self.suite,
2805
'exclude_tests_by_re')
2806
excluded_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2807
'test_exclude_tests_by_re')
2808
self.assertEqual(len(self.all_names) - 1,
2809
filtered_suite.countTestCases())
2810
self.assertFalse(excluded_name in _test_ids(filtered_suite))
2811
remaining_names = list(self.all_names)
2812
remaining_names.remove(excluded_name)
2813
self.assertEqual(remaining_names, _test_ids(filtered_suite))
2815
def test_filter_suite_by_condition(self):
2816
test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2817
'test_filter_suite_by_condition')
2818
filtered_suite = tests.filter_suite_by_condition(
2819
self.suite, lambda x: x.id() == test_name)
2820
self.assertEqual([test_name], _test_ids(filtered_suite))
2822
def test_filter_suite_by_re(self):
2823
filtered_suite = tests.filter_suite_by_re(self.suite,
2824
'test_filter_suite_by_r')
2825
filtered_names = _test_ids(filtered_suite)
2827
filtered_names, ['breezy.tests.test_selftest.'
2828
'TestSelftestFiltering.test_filter_suite_by_re'])
2830
def test_filter_suite_by_id_list(self):
2831
test_list = ['breezy.tests.test_selftest.'
2832
'TestSelftestFiltering.test_filter_suite_by_id_list']
2833
filtered_suite = tests.filter_suite_by_id_list(
2834
self.suite, tests.TestIdList(test_list))
2835
filtered_names = _test_ids(filtered_suite)
2838
['breezy.tests.test_selftest.'
2839
'TestSelftestFiltering.test_filter_suite_by_id_list'])
2841
def test_filter_suite_by_id_startswith(self):
2842
# By design this test may fail if another test is added whose name also
2843
# begins with one of the start value used.
2844
klass = 'breezy.tests.test_selftest.TestSelftestFiltering.'
2845
start1 = klass + 'test_filter_suite_by_id_starts'
2846
start2 = klass + 'test_filter_suite_by_id_li'
2847
test_list = [klass + 'test_filter_suite_by_id_list',
2848
klass + 'test_filter_suite_by_id_startswith',
2850
filtered_suite = tests.filter_suite_by_id_startswith(
2851
self.suite, [start1, start2])
2854
_test_ids(filtered_suite),
2857
def test_preserve_input(self):
2858
# NB: Surely this is something in the stdlib to do this?
2859
self.assertIs(self.suite, tests.preserve_input(self.suite))
2860
self.assertEqual("@#$", tests.preserve_input("@#$"))
2862
def test_randomize_suite(self):
2863
randomized_suite = tests.randomize_suite(self.suite)
2864
# randomizing should not add or remove test names.
2865
self.assertEqual(set(_test_ids(self.suite)),
2866
set(_test_ids(randomized_suite)))
2867
# Technically, this *can* fail, because random.shuffle(list) can be
2868
# equal to list. Trying multiple times just pushes the frequency back.
2869
# As its len(self.all_names)!:1, the failure frequency should be low
2870
# enough to ignore. RBC 20071021.
2871
# It should change the order.
2872
self.assertNotEqual(self.all_names, _test_ids(randomized_suite))
2873
# But not the length. (Possibly redundant with the set test, but not
2875
self.assertEqual(len(self.all_names), len(_test_ids(randomized_suite)))
2877
def test_split_suit_by_condition(self):
2878
self.all_names = _test_ids(self.suite)
2879
condition = tests.condition_id_re('test_filter_suite_by_r')
2880
split_suite = tests.split_suite_by_condition(self.suite, condition)
2881
filtered_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2882
'test_filter_suite_by_re')
2883
self.assertEqual([filtered_name], _test_ids(split_suite[0]))
2884
self.assertFalse(filtered_name in _test_ids(split_suite[1]))
2885
remaining_names = list(self.all_names)
2886
remaining_names.remove(filtered_name)
2887
self.assertEqual(remaining_names, _test_ids(split_suite[1]))
2889
def test_split_suit_by_re(self):
2890
self.all_names = _test_ids(self.suite)
2891
split_suite = tests.split_suite_by_re(self.suite,
2892
'test_filter_suite_by_r')
2893
filtered_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2894
'test_filter_suite_by_re')
2895
self.assertEqual([filtered_name], _test_ids(split_suite[0]))
2896
self.assertFalse(filtered_name in _test_ids(split_suite[1]))
2897
remaining_names = list(self.all_names)
2898
remaining_names.remove(filtered_name)
2899
self.assertEqual(remaining_names, _test_ids(split_suite[1]))
2902
class TestCheckTreeShape(tests.TestCaseWithTransport):
2904
def test_check_tree_shape(self):
2905
files = ['a', 'b/', 'b/c']
2906
tree = self.make_branch_and_tree('.')
2907
self.build_tree(files)
2911
self.check_tree_shape(tree, files)
2916
class TestBlackboxSupport(tests.TestCase):
2917
"""Tests for testsuite blackbox features."""
2919
def test_run_bzr_failure_not_caught(self):
2920
# When we run bzr in blackbox mode, we want any unexpected errors to
2921
# propagate up to the test suite so that it can show the error in the
2922
# usual way, and we won't get a double traceback.
2923
e = self.assertRaises(
2925
self.run_bzr, ['assert-fail'])
2926
# make sure we got the real thing, not an error from somewhere else in
2927
# the test framework
2928
self.assertEqual('always fails', str(e))
2929
# check that there's no traceback in the test log
2930
self.assertNotContainsRe(self.get_log(), r'Traceback')
2932
def test_run_bzr_user_error_caught(self):
2933
# Running bzr in blackbox mode, normal/expected/user errors should be
2934
# caught in the regular way and turned into an error message plus exit
2936
transport_server = memory.MemoryServer()
2937
transport_server.start_server()
2938
self.addCleanup(transport_server.stop_server)
2939
url = transport_server.get_url()
2940
self.permit_url(url)
2941
out, err = self.run_bzr(["log", "%s/nonexistantpath" % url], retcode=3)
2942
self.assertEqual(out, '')
2943
self.assertContainsRe(
2944
err, 'brz: ERROR: Not a branch: ".*nonexistantpath/".\n')
2947
class TestTestLoader(tests.TestCase):
2948
"""Tests for the test loader."""
2950
def _get_loader_and_module(self):
2951
"""Gets a TestLoader and a module with one test in it."""
2952
loader = TestUtil.TestLoader()
2955
class Stub(tests.TestCase):
2959
class MyModule(object):
2961
MyModule.a_class = Stub
2963
module.__name__ = 'fake_module'
2964
return loader, module
2966
def test_module_no_load_tests_attribute_loads_classes(self):
2967
loader, module = self._get_loader_and_module()
2968
self.assertEqual(1, loader.loadTestsFromModule(
2969
module).countTestCases())
2971
def test_module_load_tests_attribute_gets_called(self):
2972
loader, module = self._get_loader_and_module()
2974
def load_tests(loader, standard_tests, pattern):
2975
result = loader.suiteClass()
2976
for test in tests.iter_suite_tests(standard_tests):
2977
result.addTests([test, test])
2979
# add a load_tests() method which multiplies the tests from the module.
2980
module.__class__.load_tests = staticmethod(load_tests)
2982
2 * [str(module.a_class('test_foo'))],
2983
list(map(str, loader.loadTestsFromModule(module))))
2985
def test_load_tests_from_module_name_smoke_test(self):
2986
loader = TestUtil.TestLoader()
2987
suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
2988
self.assertEqual(['breezy.tests.test_sampler.DemoTest.test_nothing'],
2991
def test_load_tests_from_module_name_with_bogus_module_name(self):
2992
loader = TestUtil.TestLoader()
2993
self.assertRaises(ImportError, loader.loadTestsFromModuleName, 'bogus')
2996
class TestTestIdList(tests.TestCase):
2998
def _create_id_list(self, test_list):
2999
return tests.TestIdList(test_list)
3001
def _create_suite(self, test_id_list):
3003
class Stub(tests.TestCase):
3007
def _create_test_id(id):
3010
suite = TestUtil.TestSuite()
3011
for id in test_id_list:
3012
t = Stub('test_foo')
3013
t.id = _create_test_id(id)
3017
def _test_ids(self, test_suite):
3018
"""Get the ids for the tests in a test suite."""
3019
return [t.id() for t in tests.iter_suite_tests(test_suite)]
3021
def test_empty_list(self):
3022
id_list = self._create_id_list([])
3023
self.assertEqual({}, id_list.tests)
3024
self.assertEqual({}, id_list.modules)
3026
def test_valid_list(self):
3027
id_list = self._create_id_list(
3028
['mod1.cl1.meth1', 'mod1.cl1.meth2',
3029
'mod1.func1', 'mod1.cl2.meth2',
3031
'mod1.submod2.cl1.meth1', 'mod1.submod2.cl2.meth2',
3033
self.assertTrue(id_list.refers_to('mod1'))
3034
self.assertTrue(id_list.refers_to('mod1.submod1'))
3035
self.assertTrue(id_list.refers_to('mod1.submod2'))
3036
self.assertTrue(id_list.includes('mod1.cl1.meth1'))
3037
self.assertTrue(id_list.includes('mod1.submod1'))
3038
self.assertTrue(id_list.includes('mod1.func1'))
3040
def test_bad_chars_in_params(self):
3041
id_list = self._create_id_list(['mod1.cl1.meth1(xx.yy)'])
3042
self.assertTrue(id_list.refers_to('mod1'))
3043
self.assertTrue(id_list.includes('mod1.cl1.meth1(xx.yy)'))
3045
def test_module_used(self):
3046
id_list = self._create_id_list(['mod.class.meth'])
3047
self.assertTrue(id_list.refers_to('mod'))
3048
self.assertTrue(id_list.refers_to('mod.class'))
3049
self.assertTrue(id_list.refers_to('mod.class.meth'))
3051
def test_test_suite_matches_id_list_with_unknown(self):
3052
loader = TestUtil.TestLoader()
3053
suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
3054
test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing',
3056
not_found, duplicates = tests.suite_matches_id_list(suite, test_list)
3057
self.assertEqual(['bogus'], not_found)
3058
self.assertEqual([], duplicates)
3060
def test_suite_matches_id_list_with_duplicates(self):
3061
loader = TestUtil.TestLoader()
3062
suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
3063
dupes = loader.suiteClass()
3064
for test in tests.iter_suite_tests(suite):
3066
dupes.addTest(test) # Add it again
3068
test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing', ]
3069
not_found, duplicates = tests.suite_matches_id_list(
3071
self.assertEqual([], not_found)
3072
self.assertEqual(['breezy.tests.test_sampler.DemoTest.test_nothing'],
3076
class TestTestSuite(tests.TestCase):
3078
def test__test_suite_testmod_names(self):
3079
# Test that a plausible list of test module names are returned
3080
# by _test_suite_testmod_names.
3081
test_list = tests._test_suite_testmod_names()
3083
'breezy.tests.blackbox',
3084
'breezy.tests.per_transport',
3085
'breezy.tests.test_selftest',
3089
def test__test_suite_modules_to_doctest(self):
3090
# Test that a plausible list of modules to doctest is returned
3091
# by _test_suite_modules_to_doctest.
3092
test_list = tests._test_suite_modules_to_doctest()
3094
# When docstrings are stripped, there are no modules to doctest
3095
self.assertEqual([], test_list)
3102
def test_test_suite(self):
3103
# test_suite() loads the entire test suite to operate. To avoid this
3104
# overhead, and yet still be confident that things are happening,
3105
# we temporarily replace two functions used by test_suite with
3106
# test doubles that supply a few sample tests to load, and check they
3110
def testmod_names():
3111
calls.append("testmod_names")
3113
'breezy.tests.blackbox.test_branch',
3114
'breezy.tests.per_transport',
3115
'breezy.tests.test_selftest',
3117
self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names)
3120
calls.append("modules_to_doctest")
3123
return ['breezy.timestamp']
3124
self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests)
3125
expected_test_list = [
3127
'breezy.tests.blackbox.test_branch.TestBranch.test_branch',
3128
('breezy.tests.per_transport.TransportTests'
3129
'.test_abspath(LocalTransport,LocalURLServer)'),
3130
'breezy.tests.test_selftest.TestTestSuite.test_test_suite',
3131
# plugins can't be tested that way since selftest may be run with
3134
suite = tests.test_suite()
3135
self.assertEqual({"testmod_names", "modules_to_doctest"}, set(calls))
3136
self.assertSubset(expected_test_list, _test_ids(suite))
3138
def test_test_suite_list_and_start(self):
3139
# We cannot test this at the same time as the main load, because we
3140
# want to know that starting_with == None works. So a second load is
3141
# incurred - note that the starting_with parameter causes a partial
3142
# load rather than a full load so this test should be pretty quick.
3144
'breezy.tests.test_selftest.TestTestSuite.test_test_suite']
3145
suite = tests.test_suite(test_list,
3146
['breezy.tests.test_selftest.TestTestSuite'])
3147
# test_test_suite_list_and_start is not included
3148
self.assertEqual(test_list, _test_ids(suite))
3151
class TestLoadTestIdList(tests.TestCaseInTempDir):
3153
def _create_test_list_file(self, file_name, content):
3154
fl = open(file_name, 'wt')
3158
def test_load_unknown(self):
3159
self.assertRaises(errors.NoSuchFile,
3160
tests.load_test_id_list, 'i_do_not_exist')
3162
def test_load_test_list(self):
3163
test_list_fname = 'test.list'
3164
self._create_test_list_file(test_list_fname,
3165
'mod1.cl1.meth1\nmod2.cl2.meth2\n')
3166
tlist = tests.load_test_id_list(test_list_fname)
3167
self.assertEqual(2, len(tlist))
3168
self.assertEqual('mod1.cl1.meth1', tlist[0])
3169
self.assertEqual('mod2.cl2.meth2', tlist[1])
3171
def test_load_dirty_file(self):
3172
test_list_fname = 'test.list'
3173
self._create_test_list_file(test_list_fname,
3174
' mod1.cl1.meth1\n\nmod2.cl2.meth2 \n'
3176
tlist = tests.load_test_id_list(test_list_fname)
3177
self.assertEqual(4, len(tlist))
3178
self.assertEqual('mod1.cl1.meth1', tlist[0])
3179
self.assertEqual('', tlist[1])
3180
self.assertEqual('mod2.cl2.meth2', tlist[2])
3181
self.assertEqual('bar baz', tlist[3])
3184
class TestFilteredByModuleTestLoader(tests.TestCase):
3186
def _create_loader(self, test_list):
3187
id_filter = tests.TestIdList(test_list)
3188
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
3191
def test_load_tests(self):
3192
test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing']
3193
loader = self._create_loader(test_list)
3194
suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
3195
self.assertEqual(test_list, _test_ids(suite))
3197
def test_exclude_tests(self):
3198
test_list = ['bogus']
3199
loader = self._create_loader(test_list)
3200
suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
3201
self.assertEqual([], _test_ids(suite))
3204
class TestFilteredByNameStartTestLoader(tests.TestCase):
3206
def _create_loader(self, name_start):
3207
def needs_module(name):
3208
return name.startswith(name_start) or name_start.startswith(name)
3209
loader = TestUtil.FilteredByModuleTestLoader(needs_module)
3212
def test_load_tests(self):
3213
test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing']
3214
loader = self._create_loader('breezy.tests.test_samp')
3216
suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
3217
self.assertEqual(test_list, _test_ids(suite))
3219
def test_load_tests_inside_module(self):
3220
test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing']
3221
loader = self._create_loader('breezy.tests.test_sampler.Demo')
3223
suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
3224
self.assertEqual(test_list, _test_ids(suite))
3226
def test_exclude_tests(self):
3227
loader = self._create_loader('bogus')
3229
suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
3230
self.assertEqual([], _test_ids(suite))
3233
class TestTestPrefixRegistry(tests.TestCase):
3235
def _get_registry(self):
3236
tp_registry = tests.TestPrefixAliasRegistry()
3239
def test_register_new_prefix(self):
3240
tpr = self._get_registry()
3241
tpr.register('foo', 'fff.ooo.ooo')
3242
self.assertEqual('fff.ooo.ooo', tpr.get('foo'))
3244
def test_register_existing_prefix(self):
3245
tpr = self._get_registry()
3246
tpr.register('bar', 'bbb.aaa.rrr')
3247
tpr.register('bar', 'bBB.aAA.rRR')
3248
self.assertEqual('bbb.aaa.rrr', tpr.get('bar'))
3249
self.assertThat(self.get_log(),
3250
DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
3253
def test_get_unknown_prefix(self):
3254
tpr = self._get_registry()
3255
self.assertRaises(KeyError, tpr.get, 'I am not a prefix')
3257
def test_resolve_prefix(self):
3258
tpr = self._get_registry()
3259
tpr.register('bar', 'bb.aa.rr')
3260
self.assertEqual('bb.aa.rr', tpr.resolve_alias('bar'))
3262
def test_resolve_unknown_alias(self):
3263
tpr = self._get_registry()
3264
self.assertRaises(errors.CommandError,
3265
tpr.resolve_alias, 'I am not a prefix')
3267
def test_predefined_prefixes(self):
3268
tpr = tests.test_prefix_alias_registry
3269
self.assertEqual('breezy', tpr.resolve_alias('breezy'))
3270
self.assertEqual('breezy.doc', tpr.resolve_alias('bd'))
3271
self.assertEqual('breezy.utils', tpr.resolve_alias('bu'))
3272
self.assertEqual('breezy.tests', tpr.resolve_alias('bt'))
3273
self.assertEqual('breezy.tests.blackbox', tpr.resolve_alias('bb'))
3274
self.assertEqual('breezy.plugins', tpr.resolve_alias('bp'))
3277
class TestThreadLeakDetection(tests.TestCase):
3278
"""Ensure when tests leak threads we detect and report it"""
3280
class LeakRecordingResult(tests.ExtendedTestResult):
3282
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3285
def _report_thread_leak(self, test, leaks, alive):
3286
self.leaks.append((test, leaks))
3288
def test_testcase_without_addCleanups(self):
3289
"""Check old TestCase instances don't break with leak detection"""
3290
class Test(unittest.TestCase):
3293
result = self.LeakRecordingResult()
3295
result.startTestRun()
3297
result.stopTestRun()
3298
self.assertEqual(result._tests_leaking_threads_count, 0)
3299
self.assertEqual(result.leaks, [])
3301
def test_thread_leak(self):
3302
"""Ensure a thread that outlives the running of a test is reported
3304
Uses a thread that blocks on an event, and is started by the inner
3305
test case. As the thread outlives the inner case's run, it should be
3306
detected as a leak, but the event is then set so that the thread can
3307
be safely joined in cleanup so it's not leaked for real.
3309
event = threading.Event()
3310
thread = threading.Thread(name="Leaker", target=event.wait)
3312
class Test(tests.TestCase):
3313
def test_leak(self):
3315
result = self.LeakRecordingResult()
3316
test = Test("test_leak")
3317
self.addCleanup(thread.join)
3318
self.addCleanup(event.set)
3319
result.startTestRun()
3321
result.stopTestRun()
3322
self.assertEqual(result._tests_leaking_threads_count, 1)
3323
self.assertEqual(result._first_thread_leaker_id, test.id())
3324
self.assertEqual(result.leaks, [(test, {thread})])
3325
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3327
def test_multiple_leaks(self):
3328
"""Check multiple leaks are blamed on the test cases at fault
3330
Same concept as the previous test, but has one inner test method that
3331
leaks two threads, and one that doesn't leak at all.
3333
event = threading.Event()
3334
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3335
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3336
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3338
class Test(tests.TestCase):
3339
def test_first_leak(self):
3342
def test_second_no_leak(self):
3345
def test_third_leak(self):
3348
result = self.LeakRecordingResult()
3349
first_test = Test("test_first_leak")
3350
third_test = Test("test_third_leak")
3351
self.addCleanup(thread_a.join)
3352
self.addCleanup(thread_b.join)
3353
self.addCleanup(thread_c.join)
3354
self.addCleanup(event.set)
3355
result.startTestRun()
3357
[first_test, Test("test_second_no_leak"), third_test]
3359
result.stopTestRun()
3360
self.assertEqual(result._tests_leaking_threads_count, 2)
3361
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3362
self.assertEqual(result.leaks, [
3363
(first_test, {thread_b}),
3364
(third_test, {thread_a, thread_c})])
3365
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3368
class TestPostMortemDebugging(tests.TestCase):
3369
"""Check post mortem debugging works when tests fail or error"""
3371
class TracebackRecordingResult(tests.ExtendedTestResult):
3373
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3374
self.postcode = None
3376
def _post_mortem(self, tb=None):
3377
"""Record the code object at the end of the current traceback"""
3378
tb = tb or sys.exc_info()[2]
3381
while next is not None:
3384
self.postcode = tb.tb_frame.f_code
3386
def report_error(self, test, err):
3389
def report_failure(self, test, err):
3392
def test_location_unittest_error(self):
3393
"""Needs right post mortem traceback with erroring unittest case"""
3394
class Test(unittest.TestCase):
3397
result = self.TracebackRecordingResult()
3399
self.assertEqual(result.postcode, Test.runTest.__code__)
3401
def test_location_unittest_failure(self):
3402
"""Needs right post mortem traceback with failing unittest case"""
3403
class Test(unittest.TestCase):
3405
raise self.failureException
3406
result = self.TracebackRecordingResult()
3408
self.assertEqual(result.postcode, Test.runTest.__code__)
3410
def test_location_bt_error(self):
3411
"""Needs right post mortem traceback with erroring breezy.tests case"""
3412
class Test(tests.TestCase):
3413
def test_error(self):
3415
result = self.TracebackRecordingResult()
3416
Test("test_error").run(result)
3417
self.assertEqual(result.postcode, Test.test_error.__code__)
3419
def test_location_bt_failure(self):
3420
"""Needs right post mortem traceback with failing breezy.tests case"""
3421
class Test(tests.TestCase):
3422
def test_failure(self):
3423
raise self.failureException
3424
result = self.TracebackRecordingResult()
3425
Test("test_failure").run(result)
3426
self.assertEqual(result.postcode, Test.test_failure.__code__)
3428
def test_env_var_triggers_post_mortem(self):
3429
"""Check pdb.post_mortem is called iff BRZ_TEST_PDB is set"""
3431
result = tests.ExtendedTestResult(StringIO(), 0, 1)
3432
post_mortem_calls = []
3433
self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3434
self.overrideEnv('BRZ_TEST_PDB', None)
3435
result._post_mortem(1)
3436
self.overrideEnv('BRZ_TEST_PDB', 'on')
3437
result._post_mortem(2)
3438
self.assertEqual([2], post_mortem_calls)
3441
class TestRunSuite(tests.TestCase):
3443
def test_runner_class(self):
3444
"""run_suite accepts and uses a runner_class keyword argument."""
3445
class Stub(tests.TestCase):
3448
suite = Stub("test_foo")
3451
class MyRunner(tests.TextTestRunner):
3452
def run(self, test):
3454
return tests.ExtendedTestResult(self.stream, self.descriptions,
3456
tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
3457
self.assertLength(1, calls)
3460
class _Selftest(object):
3461
"""Mixin for tests needing full selftest output"""
3463
def _inject_stream_into_subunit(self, stream):
3464
"""To be overridden by subclasses that run tests out of process"""
3466
def _run_selftest(self, **kwargs):
3468
sio = TextIOWrapper(bio, 'utf-8')
3469
self._inject_stream_into_subunit(bio)
3470
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3472
return bio.getvalue()
3475
class _ForkedSelftest(_Selftest):
3476
"""Mixin for tests needing full selftest output with forked children"""
3478
_test_needs_features = [features.subunit]
3480
def _inject_stream_into_subunit(self, stream):
3481
"""Monkey-patch subunit so the extra output goes to stream not stdout
3483
Some APIs need rewriting so this kind of bogus hackery can be replaced
3484
by passing the stream param from run_tests down into ProtocolTestCase.
3486
from subunit import ProtocolTestCase
3487
_original_init = ProtocolTestCase.__init__
3489
def _init_with_passthrough(self, *args, **kwargs):
3490
_original_init(self, *args, **kwargs)
3491
self._passthrough = stream
3492
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3494
def _run_selftest(self, **kwargs):
3495
# GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3496
if getattr(os, "fork", None) is None:
3497
raise tests.TestNotApplicable("Platform doesn't support forking")
3498
# Make sure the fork code is actually invoked by claiming two cores
3499
self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3500
kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3501
return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3504
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3505
"""Check operation of --parallel=fork selftest option"""
3507
def test_error_in_child_during_fork(self):
3508
"""Error in a forked child during test setup should get reported"""
3509
class Test(tests.TestCase):
3510
def testMethod(self):
3512
# We don't care what, just break something that a child will run
3513
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3514
out = self._run_selftest(test_suite_factory=Test)
3515
# Lines from the tracebacks of the two child processes may be mixed
3516
# together due to the way subunit parses and forwards the streams,
3517
# so permit extra lines between each part of the error output.
3518
self.assertContainsRe(out,
3521
b".+ in fork_for_tests\n"
3523
b"\\s*workaround_zealous_crypto_random\\(\\)\n"
3528
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3529
"""Check a test case still alive after being run emits a warning"""
3531
class Test(tests.TestCase):
3532
def test_pass(self):
3535
def test_self_ref(self):
3536
self.also_self = self.test_self_ref
3538
def test_skip(self):
3539
self.skipTest("Don't need")
3541
def _get_suite(self):
3542
return TestUtil.TestSuite([
3543
self.Test("test_pass"),
3544
self.Test("test_self_ref"),
3545
self.Test("test_skip"),
3548
def _run_selftest_with_suite(self, **kwargs):
3549
old_flags = tests.selftest_debug_flags
3550
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3551
gc_on = gc.isenabled()
3555
output = self._run_selftest(test_suite_factory=self._get_suite,
3560
tests.selftest_debug_flags = old_flags
3561
self.assertNotContainsRe(output, b"Uncollected test case.*test_pass")
3562
self.assertContainsRe(output, b"Uncollected test case.*test_self_ref")
3565
def test_testsuite(self):
3566
self._run_selftest_with_suite()
3568
def test_pattern(self):
3569
out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
3570
self.assertNotContainsRe(out, b"test_skip")
3572
def test_exclude_pattern(self):
3573
out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
3574
self.assertNotContainsRe(out, b"test_skip")
3576
def test_random_seed(self):
3577
self._run_selftest_with_suite(random_seed="now")
3579
def test_matching_tests_first(self):
3580
self._run_selftest_with_suite(matching_tests_first=True,
3581
pattern="test_self_ref$")
3583
def test_starting_with_and_exclude(self):
3584
out = self._run_selftest_with_suite(starting_with=["bt."],
3585
exclude_pattern="test_skip$")
3586
self.assertNotContainsRe(out, b"test_skip")
3588
def test_additonal_decorator(self):
3589
self._run_selftest_with_suite(suite_decorators=[tests.TestDecorator])
3592
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
3593
"""Check warnings from tests staying alive are emitted with subunit"""
3595
_test_needs_features = [features.subunit]
3597
def _run_selftest_with_suite(self, **kwargs):
3598
return TestUncollectedWarnings._run_selftest_with_suite(
3599
self, runner_class=tests.SubUnitBzrRunnerv1, **kwargs)
3602
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
3603
"""Check warnings from tests staying alive are emitted when forking"""
3606
class TestEnvironHandling(tests.TestCase):
3608
def test_overrideEnv_None_called_twice_doesnt_leak(self):
3609
self.assertFalse('MYVAR' in os.environ)
3610
self.overrideEnv('MYVAR', '42')
3611
# We use an embedded test to make sure we fix the _captureVar bug
3613
class Test(tests.TestCase):
3615
# The first call save the 42 value
3616
self.overrideEnv('MYVAR', None)
3617
self.assertEqual(None, os.environ.get('MYVAR'))
3618
# Make sure we can call it twice
3619
self.overrideEnv('MYVAR', None)
3620
self.assertEqual(None, os.environ.get('MYVAR'))
3622
result = tests.TextTestResult(output, 0, 1)
3623
Test('test_me').run(result)
3624
if not result.wasStrictlySuccessful():
3625
self.fail(output.getvalue())
3626
# We get our value back
3627
self.assertEqual('42', os.environ.get('MYVAR'))
3630
class TestIsolatedEnv(tests.TestCase):
3631
"""Test isolating tests from os.environ.
3633
Since we use tests that are already isolated from os.environ a bit of care
3634
should be taken when designing the tests to avoid bootstrap side-effects.
3635
The tests start an already clean os.environ which allow doing valid
3636
assertions about which variables are present or not and design tests around
3640
class ScratchMonkey(tests.TestCase):
3645
def test_basics(self):
3646
# Make sure we know the definition of BRZ_HOME: not part of os.environ
3647
# for tests.TestCase.
3648
self.assertTrue('BRZ_HOME' in tests.isolated_environ)
3649
self.assertEqual(None, tests.isolated_environ['BRZ_HOME'])
3650
# Being part of isolated_environ, BRZ_HOME should not appear here
3651
self.assertFalse('BRZ_HOME' in os.environ)
3652
# Make sure we know the definition of LINES: part of os.environ for
3654
self.assertTrue('LINES' in tests.isolated_environ)
3655
self.assertEqual('25', tests.isolated_environ['LINES'])
3656
self.assertEqual('25', os.environ['LINES'])
3658
def test_injecting_unknown_variable(self):
3659
# BRZ_HOME is known to be absent from os.environ
3660
test = self.ScratchMonkey('test_me')
3661
tests.override_os_environ(test, {'BRZ_HOME': 'foo'})
3662
self.assertEqual('foo', os.environ['BRZ_HOME'])
3663
tests.restore_os_environ(test)
3664
self.assertFalse('BRZ_HOME' in os.environ)
3666
def test_injecting_known_variable(self):
3667
test = self.ScratchMonkey('test_me')
3668
# LINES is known to be present in os.environ
3669
tests.override_os_environ(test, {'LINES': '42'})
3670
self.assertEqual('42', os.environ['LINES'])
3671
tests.restore_os_environ(test)
3672
self.assertEqual('25', os.environ['LINES'])
3674
def test_deleting_variable(self):
3675
test = self.ScratchMonkey('test_me')
3676
# LINES is known to be present in os.environ
3677
tests.override_os_environ(test, {'LINES': None})
3678
self.assertTrue('LINES' not in os.environ)
3679
tests.restore_os_environ(test)
3680
self.assertEqual('25', os.environ['LINES'])
3683
class TestDocTestSuiteIsolation(tests.TestCase):
3684
"""Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3686
Since tests.TestCase alreay provides an isolation from os.environ, we use
3687
the clean environment as a base for testing. To precisely capture the
3688
isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3691
We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
3692
not `os.environ` so each test overrides it to suit its needs.
3696
def get_doctest_suite_for_string(self, klass, string):
3697
class Finder(doctest.DocTestFinder):
3699
def find(*args, **kwargs):
3700
test = doctest.DocTestParser().get_doctest(
3701
string, {}, 'foo', 'foo.py', 0)
3704
suite = klass(test_finder=Finder())
3707
def run_doctest_suite_for_string(self, klass, string):
3708
suite = self.get_doctest_suite_for_string(klass, string)
3710
result = tests.TextTestResult(output, 0, 1)
3712
return result, output
3714
def assertDocTestStringSucceds(self, klass, string):
3715
result, output = self.run_doctest_suite_for_string(klass, string)
3716
if not result.wasStrictlySuccessful():
3717
self.fail(output.getvalue())
3719
def assertDocTestStringFails(self, klass, string):
3720
result, output = self.run_doctest_suite_for_string(klass, string)
3721
if result.wasStrictlySuccessful():
3722
self.fail(output.getvalue())
3724
def test_injected_variable(self):
3725
self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3728
>>> os.environ['LINES']
3731
# doctest.DocTestSuite fails as it sees '25'
3732
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3733
# tests.DocTestSuite sees '42'
3734
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3736
def test_deleted_variable(self):
3737
self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3740
>>> os.environ.get('LINES')
3742
# doctest.DocTestSuite fails as it sees '25'
3743
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3744
# tests.DocTestSuite sees None
3745
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3748
class TestSelftestExcludePatterns(tests.TestCase):
3751
super(TestSelftestExcludePatterns, self).setUp()
3752
self.overrideAttr(tests, 'test_suite', self.suite_factory)
3754
def suite_factory(self, keep_only=None, starting_with=None):
3755
"""A test suite factory with only a few tests."""
3756
class Test(tests.TestCase):
3758
# We don't need the full class path
3759
return self._testMethodName
3769
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3771
def assertTestList(self, expected, *selftest_args):
3772
# We rely on setUp installing the right test suite factory so we can
3773
# test at the command level without loading the whole test suite
3774
out, err = self.run_bzr(('selftest', '--list') + selftest_args)
3775
actual = out.splitlines()
3776
self.assertEqual(expected, actual)
3778
def test_full_list(self):
3779
self.assertTestList(['a', 'b', 'c'])
3781
def test_single_exclude(self):
3782
self.assertTestList(['b', 'c'], '-x', 'a')
3784
def test_mutiple_excludes(self):
3785
self.assertTestList(['c'], '-x', 'a', '-x', 'b')
3788
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3790
_test_needs_features = [features.subunit]
3793
super(TestCounterHooks, self).setUp()
3795
class Test(tests.TestCase):
3798
super(Test, self).setUp()
3799
self.hooks = hooks.Hooks()
3800
self.hooks.add_hook('myhook', 'Foo bar blah', (2, 4))
3801
self.install_counter_hook(self.hooks, 'myhook')
3806
def run_hook_once(self):
3807
for hook in self.hooks['myhook']:
3810
self.test_class = Test
3812
def assertHookCalls(self, expected_calls, test_name):
3813
test = self.test_class(test_name)
3814
result = unittest.TestResult()
3816
self.assertTrue(hasattr(test, '_counters'))
3817
self.assertTrue('myhook' in test._counters)
3818
self.assertEqual(expected_calls, test._counters['myhook'])
3820
def test_no_hook(self):
3821
self.assertHookCalls(0, 'no_hook')
3823
def test_run_hook_once(self):
3824
tt = features.testtools
3825
if tt.module.__version__ < (0, 9, 8):
3826
raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
3827
self.assertHookCalls(1, 'run_hook_once')