/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Ian Clatworthy
  • Date: 2009-04-03 00:07:49 UTC
  • mfrom: (4241 +trunk)
  • mto: (4265.1.1 bbc-merge)
  • mto: This revision was merged to the branch mainline in revision 4280.
  • Revision ID: ian.clatworthy@canonical.com-20090403000749-sevl9klctwfi8jml
merge bzr.dev r4241

Show diffs side-by-side

added added

removed removed

Lines of Context:
545
545
            actionTaken = "Listed"
546
546
        else:
547
547
            try:
548
 
                from testtools import ThreadsafeForwardingResult
 
548
                import testtools
549
549
            except ImportError:
550
550
                test.run(result)
551
551
            else:
552
 
                if type(result) == ThreadsafeForwardingResult:
 
552
                if isinstance(test, testtools.ConcurrentTestSuite):
 
553
                    # We need to catch bzr specific behaviors
553
554
                    test.run(BZRTransformingResult(result))
554
555
                else:
555
556
                    test.run(result)
794
795
        import pdb
795
796
        pdb.Pdb().set_trace(sys._getframe().f_back)
796
797
 
797
 
    def exc_info(self):
798
 
        absent_attr = object()
799
 
        exc_info = getattr(self, '_exc_info', absent_attr)
800
 
        if exc_info is absent_attr:
801
 
            exc_info = getattr(self, '_TestCase__exc_info')
802
 
        return exc_info()
803
 
 
804
798
    def _check_leaked_threads(self):
805
799
        active = threading.activeCount()
806
800
        leaked_threads = active - TestCase._active_threads
1280
1274
            'NO_PROXY': None,
1281
1275
            'all_proxy': None,
1282
1276
            'ALL_PROXY': None,
1283
 
            # Nobody cares about these ones AFAIK. So far at
 
1277
            # Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1284
1278
            # least. If you do (care), please update this comment
1285
 
            # -- vila 20061212
 
1279
            # -- vila 20080401
1286
1280
            'ftp_proxy': None,
1287
1281
            'FTP_PROXY': None,
1288
1282
            'BZR_REMOTE_PATH': None,
1315
1309
    def _do_skip(self, result, reason):
1316
1310
        addSkip = getattr(result, 'addSkip', None)
1317
1311
        if not callable(addSkip):
1318
 
            result.addError(self, self.exc_info())
 
1312
            result.addError(self, sys.exc_info())
1319
1313
        else:
1320
1314
            addSkip(self, reason)
1321
1315
 
1354
1348
                        self.tearDown()
1355
1349
                        return
1356
1350
                    except:
1357
 
                        result.addError(self, self.exc_info())
 
1351
                        result.addError(self, sys.exc_info())
1358
1352
                        return
1359
1353
 
1360
1354
                    ok = False
1362
1356
                        testMethod()
1363
1357
                        ok = True
1364
1358
                    except self.failureException:
1365
 
                        result.addFailure(self, self.exc_info())
 
1359
                        result.addFailure(self, sys.exc_info())
1366
1360
                    except TestSkipped, e:
1367
1361
                        if not e.args:
1368
1362
                            reason = "No reason given."
1372
1366
                    except KeyboardInterrupt:
1373
1367
                        raise
1374
1368
                    except:
1375
 
                        result.addError(self, self.exc_info())
 
1369
                        result.addError(self, sys.exc_info())
1376
1370
 
1377
1371
                    try:
1378
1372
                        self.tearDown()
1383
1377
                    except KeyboardInterrupt:
1384
1378
                        raise
1385
1379
                    except:
1386
 
                        result.addError(self, self.exc_info())
 
1380
                        result.addError(self, sys.exc_info())
1387
1381
                        ok = False
1388
1382
                    if ok: result.addSuccess(self)
1389
1383
                finally:
2683
2677
 
2684
2678
# A registry where get() returns a suite decorator.
2685
2679
parallel_registry = registry.Registry()
 
2680
 
 
2681
 
2686
2682
def fork_decorator(suite):
2687
2683
    concurrency = local_concurrency()
2688
2684
    if concurrency == 1:
2690
2686
    from testtools import ConcurrentTestSuite
2691
2687
    return ConcurrentTestSuite(suite, fork_for_tests)
2692
2688
parallel_registry.register('fork', fork_decorator)
 
2689
 
 
2690
 
2693
2691
def subprocess_decorator(suite):
2694
2692
    concurrency = local_concurrency()
2695
2693
    if concurrency == 1:
2881
2879
    """Take suite and start up one runner per CPU by forking()
2882
2880
 
2883
2881
    :return: An iterable of TestCase-like objects which can each have
2884
 
        run(result) called on them to feed tests to result, and
2885
 
        cleanup() called on them to stop them/kill children/end threads.
 
2882
        run(result) called on them to feed tests to result.
2886
2883
    """
2887
2884
    concurrency = local_concurrency()
2888
2885
    result = []
2898
2895
                ProtocolTestCase.run(self, result)
2899
2896
            finally:
2900
2897
                os.waitpid(self.pid, os.WNOHANG)
2901
 
            # print "pid %d finished" % finished_process
2902
2898
 
2903
2899
    test_blocks = partition_tests(suite, concurrency)
2904
2900
    for process_tests in test_blocks:
2912
2908
                # Leave stderr and stdout open so we can see test noise
2913
2909
                # Close stdin so that the child goes away if it decides to
2914
2910
                # read from stdin (otherwise its a roulette to see what
2915
 
                # child actually gets keystrokes for pdb etc.
 
2911
                # child actually gets keystrokes for pdb etc).
2916
2912
                sys.stdin.close()
2917
2913
                sys.stdin = None
2918
 
                stream = os.fdopen(c2pwrite, 'wb', 0)
 
2914
                stream = os.fdopen(c2pwrite, 'wb', 1)
2919
2915
                subunit_result = TestProtocolClient(stream)
2920
2916
                process_suite.run(subunit_result)
2921
2917
            finally:
2932
2928
    """Take suite and start up one runner per CPU using subprocess().
2933
2929
 
2934
2930
    :return: An iterable of TestCase-like objects which can each have
2935
 
        run(result) called on them to feed tests to result, and
2936
 
        cleanup() called on them to stop them/kill children/end threads.
 
2931
        run(result) called on them to feed tests to result.
2937
2932
    """
2938
2933
    concurrency = local_concurrency()
2939
2934
    result = []
3328
3323
                   'bzrlib.tests.test_directory_service',
3329
3324
                   'bzrlib.tests.test_dirstate',
3330
3325
                   'bzrlib.tests.test_email_message',
 
3326
                   'bzrlib.tests.test_eol_filters',
3331
3327
                   'bzrlib.tests.test_errors',
3332
3328
                   'bzrlib.tests.test_export',
3333
3329
                   'bzrlib.tests.test_extract',
3593
3589
    the scenario name at the end of its id(), and updating the test object's
3594
3590
    __dict__ with the scenario_param_dict.
3595
3591
 
 
3592
    >>> import bzrlib.tests.test_sampler
3596
3593
    >>> r = multiply_tests(
3597
3594
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
3598
3595
    ...     [('one', dict(param=1)),