2836
2864
return iter(self._tests)
2867
def partition_tests(suite, count):
    """Partition suite into count lists of tests.

    The suite is flattened with iter_suite_tests() and cut into ``count``
    consecutive slices of ceil(len(tests) / count) tests each.  Because the
    slice size is rounded up, trailing slices may be shorter than the others
    or empty.

    :param suite: The suite whose tests should be distributed.
    :param count: How many lists to produce.  It is used as a divisor, so it
        must not be zero.
    :return: A list containing ``count`` lists of tests.
    """
    tests = list(iter_suite_tests(suite))
    # Round up so that count slices are always enough to hold every test.
    tests_per_process = int(math.ceil(float(len(tests)) / count))
    result = []
    for block in range(count):
        low_test = block * tests_per_process
        high_test = low_test + tests_per_process
        result.append(tests[low_test:high_test])
    return result
2880
def fork_for_tests(suite):
    """Take suite and start up one runner per CPU by forking()

    The suite is split with partition_tests() into local_concurrency()
    blocks.  For each block a pipe is created and a child is forked: the
    child runs its block against a subunit TestProtocolClient writing to the
    pipe, while the parent wraps the read end in a ProtocolTestCase subclass.

    :param suite: The suite whose tests should be spread over the children.
    :return: An iterable of TestCase-like objects which can each have
        run(result) called on them to feed tests to result, and
        cleanup() called on them to stop them/kill children/end threads.
    """
    concurrency = local_concurrency()
    result = []
    from subunit import TestProtocolClient, ProtocolTestCase

    class TestInOtherProcess(ProtocolTestCase):
        # Should be in subunit, I think. RBC.
        def __init__(self, stream, pid):
            ProtocolTestCase.__init__(self, stream)
            # Remembered so that run() can reap the child afterwards.
            self.pid = pid

        def run(self, result):
            try:
                ProtocolTestCase.run(self, result)
            finally:
                # WNOHANG: collect the child's status if it has already
                # exited, but never block the parent waiting for it.
                os.waitpid(self.pid, os.WNOHANG)

    test_blocks = partition_tests(suite, concurrency)
    for process_tests in test_blocks:
        process_suite = TestSuite()
        process_suite.addTests(process_tests)
        c2pread, c2pwrite = os.pipe()
        # NOTE(review): the fork()/_exit() scaffolding was restored from
        # context (pid was used without being bound) -- confirm against the
        # upstream revision.
        pid = os.fork()
        if pid == 0:
            # Child: run this block and report through the pipe.
            try:
                os.close(c2pread)
                # Leave stderr and stdout open so we can see test noise
                # Close stdin so that the child goes away if it decides to
                # read from stdin (otherwise its a roulette to see what
                # child actually gets keystrokes for pdb etc.
                sys.stdin.close()
                sys.stdin = None
                stream = os.fdopen(c2pwrite, 'wb', 0)
                subunit_result = TestProtocolClient(stream)
                process_suite.run(subunit_result)
            finally:
                # The child must never return into the parent's code path.
                os._exit(0)
        else:
            # Parent: keep only the read end and expose it as a test.
            os.close(c2pwrite)
            stream = os.fdopen(c2pread, 'rb', 1)
            test = TestInOtherProcess(stream, pid)
            result.append(test)
    return result
2931
def reinvoke_for_tests(suite):
    """Take suite and start up one runner per CPU using subprocess().

    The suite is split with partition_tests() into local_concurrency()
    blocks.  The ids of each block are written to a temporary file and a
    fresh ``bzr selftest --load-list <file>`` process is started for it; the
    process's stdout is wrapped in a ProtocolTestCase subclass.

    :param suite: The suite whose tests should be spread over subprocesses.
    :return: An iterable of TestCase-like objects which can each have
        run(result) called on them to feed tests to result, and
        cleanup() called on them to stop them/kill children/end threads.
    """
    concurrency = local_concurrency()
    result = []
    from subunit import TestProtocolClient, ProtocolTestCase

    class TestInSubprocess(ProtocolTestCase):
        def __init__(self, process, name):
            ProtocolTestCase.__init__(self, process.stdout)
            self.process = process
            self.process.stdin.close()
            # Path of the test list file, removed once the run is over.
            self.name = name

        def run(self, result):
            try:
                ProtocolTestCase.run(self, result)
            finally:
                # NOTE(review): waiting for the child here was restored from
                # context -- confirm against the upstream revision.
                self.process.wait()
                os.unlink(self.name)

    test_blocks = partition_tests(suite, concurrency)
    for process_tests in test_blocks:
        # ugly; currently reimplement rather than reuses TestCase methods.
        bzr_path = os.path.join(
            os.path.dirname(os.path.dirname(bzrlib.__file__)), 'bzr')
        if not os.path.isfile(bzr_path):
            # We are probably installed. Assume sys.argv is the right file
            bzr_path = sys.argv[0]
        fd, test_list_file_name = tempfile.mkstemp()
        test_list_file = os.fdopen(fd, 'wb', 1)
        try:
            for test in process_tests:
                test_list_file.write(test.id() + '\n')
        finally:
            test_list_file.close()
        try:
            # NOTE(review): the '--subunit' flag and the bufsize argument
            # below were restored from context (both statements were left
            # unclosed) -- confirm against the upstream revision.
            argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
                    '--subunit']
            # Note that without subunit in core - epic FAIL. But we don't
            # really care anyway because load-list means we don't get plugin
            # tests.
            if '--no-plugins' in sys.argv:
                argv.append('--no-plugins')
            # stderr=STDOUT would be ideal, but until we prevent noise on
            # stderr it can interrupt the subunit protocol.
            process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
                            bufsize=1)
            test = TestInSubprocess(process, test_list_file_name)
            result.append(test)
        except:
            # The subprocess never got going, so nobody else will remove the
            # list file; clean it up and let the error propagate.
            os.unlink(test_list_file_name)
            raise
    return result
2986
def cpucount(content):
    """Return the processor count described by /proc/cpuinfo style text.

    Every line starting with 'processor' is read as ``processor : <index>``.
    The result is the index found on the last such line plus one.

    :param content: The text to scan, one entry per line.
    :return: The number of processors as an int.
    :raises ValueError: If no line starts with 'processor', or if the text
        after the colon on such a line is not an integer.
    """
    concurrency = None
    for line in content.splitlines():
        if line.startswith('processor'):
            # Indices are zero based, so the count is the index plus one.
            concurrency = int(line[line.find(':')+1:]) + 1
    if concurrency is None:
        raise ValueError('no processor entries found in cpuinfo content')
    return concurrency
2995
def local_concurrency():
    """Return how many test processes to run on this machine.

    Reads /proc/cpuinfo and hands its content to cpucount().

    :return: The value computed by cpucount(), or 1 when /proc/cpuinfo
        cannot be read or parsed.
    """
    try:
        cpuinfo = open('/proc/cpuinfo', 'rb')
        try:
            content = cpuinfo.read()
        finally:
            cpuinfo.close()
        concurrency = cpucount(content)
    except Exception:
        # Deliberately best effort: any failure to read or parse the file
        # falls back to a default.
        # NOTE(review): the fallback value of 1 was restored from context --
        # confirm against the upstream revision.
        concurrency = 1
    return concurrency
3004
class BZRTransformingResult(unittest.TestResult):
    """A TestResult that forwards every event to another result object.

    Errors and failures are inspected first: when they carry a
    subunit.RemoteException whose text ends with a line starting with
    'UnavailableFeature: ' or 'KnownFailure: ', they are reported to the
    target through addNotSupported() or _addKnownFailure() instead of
    addError() or addFailure().
    """

    def __init__(self, target):
        """Create a result forwarding to target.

        :param target: The result object that receives the forwarded calls.
        """
        unittest.TestResult.__init__(self)
        self.result = target

    def startTest(self, test):
        self.result.startTest(test)

    def stopTest(self, test):
        self.result.stopTest(test)

    def addError(self, test, err):
        """Forward an error, or a missing feature if err describes one."""
        feature = self._error_looks_like('UnavailableFeature: ', err)
        if feature is not None:
            self.result.addNotSupported(test, feature)
        else:
            self.result.addError(test, err)

    def addFailure(self, test, err):
        """Forward a failure, or a known failure if err describes one."""
        known = self._error_looks_like('KnownFailure: ', err)
        if known is not None:
            # Rebuild an exc_info style triple around a local KnownFailure;
            # no traceback object is available on this side.
            self.result._addKnownFailure(test, [KnownFailure,
                                                KnownFailure(known), None])
        else:
            self.result.addFailure(test, err)

    def addSkip(self, test, reason):
        self.result.addSkip(test, reason)

    def addSuccess(self, test):
        self.result.addSuccess(test)

    def _error_looks_like(self, prefix, err):
        """Deserialize exception and returns the stringify value.

        :param prefix: The text the last line of the stringified exception
            has to start with.
        :param err: A (type, value, traceback) triple.
        :return: What follows prefix on the last non-empty line of the
            stringified exception, or None when the value is not a
            subunit.RemoteException or that line does not start with prefix.
        """
        import subunit
        value = None
        typ, exc, _ = err
        if isinstance(exc, subunit.RemoteException):
            # stringify the exception gives access to the remote traceback
            # We search the last line for 'prefix'
            lines = str(exc).split('\n')
            # Drop the empty entries produced by trailing newlines rather
            # than assuming there is exactly one of them.
            while lines and not lines[-1]:
                lines.pop()
            if lines:
                last = lines[-1]
                if last.startswith(prefix):
                    value = last[len(prefix):]
        return value
2839
3055
# Controlled by "bzr selftest -E=..." option
# Starts out as an empty set.
selftest_debug_flags = set()