212
212
osutils.set_or_unset_env(var, value)
215
def _clear__type_equality_funcs(test):
216
"""Cleanup bound methods stored on TestCase instances
218
Clear the dict breaking a few (mostly) harmless cycles in the affected
219
unittests released with Python 2.6 and initial Python 2.7 versions.
221
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
222
shipped in Oneiric, an object with no clear method was used, hence the
223
extra complications, see bug 809048 for details.
225
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
226
if type_equality_funcs is not None:
227
tef_clear = getattr(type_equality_funcs, "clear", None)
228
if tef_clear is None:
229
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
230
if tef_instance_dict is not None:
231
tef_clear = tef_instance_dict.clear
232
if tef_clear is not None:
215
236
class ExtendedTestResult(testtools.TextTestResult):
216
237
"""Accepts, reports and accumulates the results of running tests.
385
406
getDetails = getattr(test, "getDetails", None)
386
407
if getDetails is not None:
387
408
getDetails().clear()
388
# Clear _type_equality_funcs to try to stop TestCase instances
389
# from wasting memory. 'clear' is not available in all Python
390
# versions (bug 809048)
391
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
392
if type_equality_funcs is not None:
393
tef_clear = getattr(type_equality_funcs, "clear", None)
394
if tef_clear is None:
395
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
396
if tef_instance_dict is not None:
397
tef_clear = tef_instance_dict.clear
398
if tef_clear is not None:
409
_clear__type_equality_funcs(test)
400
410
self._traceback_from_test = None
402
412
def startTests(self):
503
513
self.not_applicable_count += 1
504
514
self.report_not_applicable(test, reason)
516
def _count_stored_tests(self):
517
"""Count of tests instances kept alive due to not succeeding"""
518
return self.error_count + self.failure_count + self.known_failure_count
506
520
def _post_mortem(self, tb=None):
507
521
"""Start a PDB post mortem session."""
508
522
if os.environ.get('BZR_TEST_PDB', None):
983
997
for feature in getattr(self, '_test_needs_features', []):
984
998
self.requireFeature(feature)
985
999
self._cleanEnvironment()
986
self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
987
config.CommandLineSection())
1000
if bzrlib.global_state is not None:
1001
self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
1002
config.CommandLineSection())
988
1003
self._silenceUI()
989
1004
self._startLogFile()
990
1005
self._benchcalls = []
2568
2583
root = TestCaseWithMemoryTransport.TEST_ROOT
2570
# Make sure we get a readable and accessible home for .bzr.log
2571
# and/or config files, and not fallback to weird defaults (see
2572
# http://pad.lv/825027).
2573
self.assertIs(None, os.environ.get('BZR_HOME', None))
2574
os.environ['BZR_HOME'] = root
2575
wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2576
del os.environ['BZR_HOME']
2577
except Exception, e:
2578
self.fail("Fail to initialize the safety net: %r\nExiting\n" % (e,))
2584
# Make sure we get a readable and accessible home for .bzr.log
2585
# and/or config files, and not fallback to weird defaults (see
2586
# http://pad.lv/825027).
2587
self.assertIs(None, os.environ.get('BZR_HOME', None))
2588
os.environ['BZR_HOME'] = root
2589
wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2590
del os.environ['BZR_HOME']
2579
2591
# Hack for speed: remember the raw bytes of the dirstate file so that
2580
2592
# we don't need to re-open the wt to check it hasn't changed.
2581
2593
TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2947
2959
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2949
2961
format = self.resolve_format(format=format)
2962
if not format.supports_workingtrees:
2963
b = self.make_branch(relpath+'.branch', format=format)
2964
return b.create_checkout(relpath, lightweight=True)
2950
2965
b = self.make_branch(relpath, format=format)
2952
2967
return b.bzrdir.create_workingtree()
3251
3266
result_decorators=result_decorators,
3253
3268
runner.stop_on_failure=stop_on_failure
3269
if isinstance(suite, unittest.TestSuite):
3270
# Empty out _tests list of passed suite and populate new TestSuite
3271
suite._tests[:], suite = [], TestSuite(suite)
3254
3272
# built in decorator factories:
3256
3274
random_order(random_seed, runner),
3355
3373
class TestDecorator(TestUtil.TestSuite):
3356
3374
"""A decorator for TestCase/TestSuite objects.
3358
Usually, subclasses should override __iter__(used when flattening test
3359
suites), which we do to filter, reorder, parallelise and so on, run() and
3376
Contains rather than flattening suite passed on construction
3363
def __init__(self, suite):
3364
TestUtil.TestSuite.__init__(self)
3367
def countTestCases(self):
3370
cases += test.countTestCases()
3377
def run(self, result):
3378
# Use iteration on self, not self._tests, to allow subclasses to hook
3381
if result.shouldStop:
3379
def __init__(self, suite=None):
3380
super(TestDecorator, self).__init__()
3381
if suite is not None:
3384
# Don't need subclass run method with suite emptying
3385
run = unittest.TestSuite.run
3387
3388
class CountingDecorator(TestDecorator):
3398
3399
"""A decorator which excludes test matching an exclude pattern."""
3400
3401
def __init__(self, suite, exclude_pattern):
3401
TestDecorator.__init__(self, suite)
3402
self.exclude_pattern = exclude_pattern
3403
self.excluded = False
3407
return iter(self._tests)
3408
self.excluded = True
3409
suite = exclude_tests_by_re(self, self.exclude_pattern)
3411
self.addTests(suite)
3412
return iter(self._tests)
3402
super(ExcludeDecorator, self).__init__(
3403
exclude_tests_by_re(suite, exclude_pattern))
3415
3406
class FilterTestsDecorator(TestDecorator):
3416
3407
"""A decorator which filters tests to those matching a pattern."""
3418
3409
def __init__(self, suite, pattern):
3419
TestDecorator.__init__(self, suite)
3420
self.pattern = pattern
3421
self.filtered = False
3425
return iter(self._tests)
3426
self.filtered = True
3427
suite = filter_suite_by_re(self, self.pattern)
3429
self.addTests(suite)
3430
return iter(self._tests)
3410
super(FilterTestsDecorator, self).__init__(
3411
filter_suite_by_re(suite, pattern))
3433
3414
class RandomDecorator(TestDecorator):
3434
3415
"""A decorator which randomises the order of its tests."""
3436
3417
def __init__(self, suite, random_seed, stream):
3437
TestDecorator.__init__(self, suite)
3438
self.random_seed = random_seed
3439
self.randomised = False
3440
self.stream = stream
3444
return iter(self._tests)
3445
self.randomised = True
3446
self.stream.write("Randomizing test order using seed %s\n\n" %
3447
(self.actual_seed()))
3418
random_seed = self.actual_seed(random_seed)
3419
stream.write("Randomizing test order using seed %s\n\n" %
3448
3421
# Initialise the random number generator.
3449
random.seed(self.actual_seed())
3450
suite = randomize_suite(self)
3452
self.addTests(suite)
3453
return iter(self._tests)
3422
random.seed(random_seed)
3423
super(RandomDecorator, self).__init__(randomize_suite(suite))
3455
def actual_seed(self):
3456
if self.random_seed == "now":
3426
def actual_seed(seed):
3457
3428
# We convert the seed to a long to make it reuseable across
3458
3429
# invocations (because the user can reenter it).
3459
self.random_seed = long(time.time())
3430
return long(time.time())
3461
3432
# Convert the seed to a long if we can
3463
self.random_seed = long(self.random_seed)
3435
except (TypeError, ValueError):
3466
return self.random_seed
3469
3440
class TestFirstDecorator(TestDecorator):
3470
3441
"""A decorator which moves named tests to the front."""
3472
3443
def __init__(self, suite, pattern):
3473
TestDecorator.__init__(self, suite)
3474
self.pattern = pattern
3475
self.filtered = False
3479
return iter(self._tests)
3480
self.filtered = True
3481
suites = split_suite_by_re(self, self.pattern)
3483
self.addTests(suites)
3484
return iter(self._tests)
3444
super(TestFirstDecorator, self).__init__()
3445
self.addTests(split_suite_by_re(suite, pattern))
3487
3448
def partition_tests(suite, count):
3534
3495
os.waitpid(self.pid, 0)
3536
3497
test_blocks = partition_tests(suite, concurrency)
3498
# Clear the tests from the original suite so it doesn't keep them alive
3499
suite._tests[:] = []
3537
3500
for process_tests in test_blocks:
3538
process_suite = TestUtil.TestSuite()
3539
process_suite.addTests(process_tests)
3501
process_suite = TestUtil.TestSuite(process_tests)
3502
# Also clear each split list so new suite has only reference
3503
process_tests[:] = []
3540
3504
c2pread, c2pwrite = os.pipe()
3541
3505
pid = os.fork()
3548
3512
# read from stdin (otherwise its a roulette to see what
3549
3513
# child actually gets keystrokes for pdb etc).
3550
3514
sys.stdin.close()
3515
# GZ 2011-06-16: Why set stdin to None? Breaks multi fork.
3552
3517
stream = os.fdopen(c2pwrite, 'wb', 1)
3553
3518
subunit_result = AutoTimingTestResultDecorator(
3554
TestProtocolClient(stream))
3519
SubUnitBzrProtocolClient(stream))
3555
3520
process_suite.run(subunit_result)
3522
# GZ 2011-06-16: Is always exiting with silent success
3523
# really the right thing? Hurts debugging.
3559
3526
os.close(c2pwrite)
3666
3633
# with proper exclusion rules.
3667
3634
# -Ethreads Will display thread ident at creation/join time to
3668
3635
# help track thread leaks
3636
# -Euncollected_cases Display the identity of any test cases that weren't
3637
# deallocated after being completed.
3670
3638
# -Econfig_stats Will collect statistics using addDetail
3671
3639
selftest_debug_flags = set()
4470
4438
from subunit.test_results import AutoTimingTestResultDecorator
4471
4439
class SubUnitBzrProtocolClient(TestProtocolClient):
4441
def stopTest(self, test):
4442
super(SubUnitBzrProtocolClient, self).stopTest(test)
4443
_clear__type_equality_funcs(test)
4473
4445
def addSuccess(self, test, details=None):
4474
4446
# The subunit client always includes the details in the subunit
4475
4447
# stream, but we don't want to include it in ours.