104
105
import bzrlib.trace
105
106
from bzrlib.transport import (
110
import bzrlib.transport
111
110
from bzrlib.trace import mutter, note
112
111
from bzrlib.tests import (
116
from bzrlib.tests.http_server import HttpServer
117
from bzrlib.tests.TestUtil import (
121
from bzrlib.tests.treeshape import build_tree_contents
122
116
from bzrlib.ui import NullProgressView
123
117
from bzrlib.ui.text import TextUIFactory
124
118
import bzrlib.version_info_formats.format_custom
140
134
SUBUNIT_SEEK_SET = 0
141
135
SUBUNIT_SEEK_CUR = 1
137
# These are intentionally brought into this namespace. That way plugins, etc
138
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
139
TestSuite = TestUtil.TestSuite
140
TestLoader = TestUtil.TestLoader
144
class ExtendedTestResult(unittest._TextTestResult):
142
class ExtendedTestResult(testtools.TextTestResult):
145
143
"""Accepts, reports and accumulates the results of running tests.
147
145
Compared to the unittest version this class adds support for
168
166
:param bench_history: Optionally, a writable file object to accumulate
169
167
benchmark results.
171
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
169
testtools.TextTestResult.__init__(self, stream)
172
170
if bench_history is not None:
173
171
from bzrlib.version import _get_bzr_source_tree
174
172
src_tree = _get_bzr_source_tree()
196
194
self._overall_start_time = time.time()
197
195
self._strict = strict
196
self._first_thread_leaker_id = None
197
self._tests_leaking_threads_count = 0
199
199
def stopTestRun(self):
200
200
run = self.testsRun
201
201
actionTaken = "Ran"
202
202
stopTime = time.time()
203
203
timeTaken = stopTime - self.startTime
205
self.stream.writeln(self.separator2)
206
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
204
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
205
# the parent class method is similar, we have to duplicate it
206
self._show_list('ERROR', self.errors)
207
self._show_list('FAIL', self.failures)
208
self.stream.write(self.sep2)
209
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
207
210
run, run != 1 and "s" or "", timeTaken))
208
self.stream.writeln()
209
211
if not self.wasSuccessful():
210
212
self.stream.write("FAILED (")
211
213
failed, errored = map(len, (self.failures, self.errors))
218
220
if failed or errored: self.stream.write(", ")
219
221
self.stream.write("known_failure_count=%d" %
220
222
self.known_failure_count)
221
self.stream.writeln(")")
223
self.stream.write(")\n")
223
225
if self.known_failure_count:
224
self.stream.writeln("OK (known_failures=%d)" %
226
self.stream.write("OK (known_failures=%d)\n" %
225
227
self.known_failure_count)
227
self.stream.writeln("OK")
229
self.stream.write("OK\n")
228
230
if self.skip_count > 0:
229
231
skipped = self.skip_count
230
self.stream.writeln('%d test%s skipped' %
232
self.stream.write('%d test%s skipped\n' %
231
233
(skipped, skipped != 1 and "s" or ""))
232
234
if self.unsupported:
233
235
for feature, count in sorted(self.unsupported.items()):
234
self.stream.writeln("Missing feature '%s' skipped %d tests." %
236
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
235
237
(feature, count))
237
239
ok = self.wasStrictlySuccessful()
239
241
ok = self.wasSuccessful()
240
if TestCase._first_thread_leaker_id:
242
if self._first_thread_leaker_id:
241
243
self.stream.write(
242
244
'%s is leaking threads among %d leaking tests.\n' % (
243
TestCase._first_thread_leaker_id,
244
TestCase._leaking_threads_tests))
245
self._first_thread_leaker_id,
246
self._tests_leaking_threads_count))
245
247
# We don't report the main thread as an active one.
246
248
self.stream.write(
247
249
'%d non-main threads were left active in the end.\n'
248
% (TestCase._active_threads - 1))
250
% (len(self._active_threads) - 1))
250
252
def getDescription(self, test):
276
278
def _shortened_test_description(self, test):
278
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
280
what = re.sub(r'^bzrlib\.tests\.', '', what)
281
283
def startTest(self, test):
282
unittest.TestResult.startTest(self, test)
284
super(ExtendedTestResult, self).startTest(test)
283
285
if self.count == 0:
284
286
self.startTests()
285
288
self.report_test_start(test)
286
289
test.number = self.count
287
290
self._recordTestStartTime()
291
# Only check for thread leaks if the test case supports cleanups
292
addCleanup = getattr(test, "addCleanup", None)
293
if addCleanup is not None:
294
addCleanup(self._check_leaked_threads, test)
289
296
def startTests(self):
291
if getattr(sys, 'frozen', None) is None:
292
bzr_path = osutils.realpath(sys.argv[0])
294
bzr_path = sys.executable
296
'bzr selftest: %s\n' % (bzr_path,))
299
bzrlib.__path__[0],))
301
' bzr-%s python-%s %s\n' % (
302
bzrlib.version_string,
303
bzrlib._format_version_tuple(sys.version_info),
304
platform.platform(aliased=1),
306
self.stream.write('\n')
297
self.report_tests_starting()
298
self._active_threads = threading.enumerate()
300
def _check_leaked_threads(self, test):
301
"""See if any threads have leaked since last call
303
A sample of live threads is stored in the _active_threads attribute,
304
when this method runs it compares the current live threads and any not
305
in the previous sample are treated as having leaked.
307
now_active_threads = set(threading.enumerate())
308
threads_leaked = now_active_threads.difference(self._active_threads)
310
self._report_thread_leak(test, threads_leaked, now_active_threads)
311
self._tests_leaking_threads_count += 1
312
if self._first_thread_leaker_id is None:
313
self._first_thread_leaker_id = test.id()
314
self._active_threads = now_active_threads
308
316
def _recordTestStartTime(self):
309
317
"""Record that a test has started."""
310
318
self._start_time = time.time()
312
def _cleanupLogFile(self, test):
313
# We can only do this if we have one of our TestCases, not if
315
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
316
if setKeepLogfile is not None:
319
320
def addError(self, test, err):
320
321
"""Tell result that test finished with an error.
323
324
fails with an unexpected error.
325
326
self._post_mortem()
326
unittest.TestResult.addError(self, test, err)
327
super(ExtendedTestResult, self).addError(test, err)
327
328
self.error_count += 1
328
329
self.report_error(test, err)
329
330
if self.stop_early:
331
self._cleanupLogFile(test)
333
333
def addFailure(self, test, err):
334
334
"""Tell result that test failed.
337
337
fails because e.g. an assert() method failed.
339
339
self._post_mortem()
340
unittest.TestResult.addFailure(self, test, err)
340
super(ExtendedTestResult, self).addFailure(test, err)
341
341
self.failure_count += 1
342
342
self.report_failure(test, err)
343
343
if self.stop_early:
345
self._cleanupLogFile(test)
347
346
def addSuccess(self, test, details=None):
348
347
"""Tell result that test completed successfully.
356
355
self._formatTime(benchmark_time),
358
357
self.report_success(test)
359
self._cleanupLogFile(test)
360
unittest.TestResult.addSuccess(self, test)
358
super(ExtendedTestResult, self).addSuccess(test)
361
359
test._log_contents = ''
363
361
def addExpectedFailure(self, test, err):
401
399
raise errors.BzrError("Unknown whence %r" % whence)
403
def report_cleaning_up(self):
401
def report_tests_starting(self):
402
"""Display information before the test run begins"""
403
if getattr(sys, 'frozen', None) is None:
404
bzr_path = osutils.realpath(sys.argv[0])
406
bzr_path = sys.executable
408
'bzr selftest: %s\n' % (bzr_path,))
411
bzrlib.__path__[0],))
413
' bzr-%s python-%s %s\n' % (
414
bzrlib.version_string,
415
bzrlib._format_version_tuple(sys.version_info),
416
platform.platform(aliased=1),
418
self.stream.write('\n')
420
def report_test_start(self, test):
421
"""Display information on the test just about to be run"""
423
def _report_thread_leak(self, test, leaked_threads, active_threads):
424
"""Display information on a test that leaked one or more threads"""
425
# GZ 2010-09-09: A leak summary reported separately from the general
426
# thread debugging would be nice. Tests under subunit
427
# need something not using stream, perhaps adding a
428
# testtools details object would be fitting.
429
if 'threads' in selftest_debug_flags:
430
self.stream.write('%s is leaking, active is now %d\n' %
431
(test.id(), len(active_threads)))
406
433
def startTestRun(self):
407
434
self.startTime = time.time()
444
471
self.pb.finished()
445
472
super(TextTestResult, self).stopTestRun()
447
def startTestRun(self):
448
super(TextTestResult, self).startTestRun()
474
def report_tests_starting(self):
475
super(TextTestResult, self).report_tests_starting()
449
476
self.pb.update('[test 0/%d] Starting' % (self.num_tests))
451
def printErrors(self):
452
# clear the pb to make room for the error listing
454
super(TextTestResult, self).printErrors()
456
478
def _progress_prefix_text(self):
457
479
# the longer this text, the less space we have to show the test
513
534
def report_unsupported(self, test, feature):
514
535
"""test cannot be run because feature is missing."""
516
def report_cleaning_up(self):
517
self.pb.update('Cleaning up')
520
538
class VerboseTestResult(ExtendedTestResult):
521
539
"""Produce long output, with one line per test run plus times"""
528
546
result = a_string
529
547
return result.ljust(final_width)
531
def startTestRun(self):
532
super(VerboseTestResult, self).startTestRun()
549
def report_tests_starting(self):
533
550
self.stream.write('running %d tests...\n' % self.num_tests)
551
super(VerboseTestResult, self).report_tests_starting()
535
553
def report_test_start(self, test):
537
554
name = self._shortened_test_description(test)
538
555
width = osutils.terminal_width()
539
556
if width is not None:
551
568
return '%s%s' % (indent, err[1])
553
570
def report_error(self, test, err):
554
self.stream.writeln('ERROR %s\n%s'
571
self.stream.write('ERROR %s\n%s\n'
555
572
% (self._testTimeString(test),
556
573
self._error_summary(err)))
558
575
def report_failure(self, test, err):
559
self.stream.writeln(' FAIL %s\n%s'
576
self.stream.write(' FAIL %s\n%s\n'
560
577
% (self._testTimeString(test),
561
578
self._error_summary(err)))
563
580
def report_known_failure(self, test, err):
564
self.stream.writeln('XFAIL %s\n%s'
581
self.stream.write('XFAIL %s\n%s\n'
565
582
% (self._testTimeString(test),
566
583
self._error_summary(err)))
568
585
def report_success(self, test):
569
self.stream.writeln(' OK %s' % self._testTimeString(test))
586
self.stream.write(' OK %s\n' % self._testTimeString(test))
570
587
for bench_called, stats in getattr(test, '_benchcalls', []):
571
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
588
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
572
589
stats.pprint(file=self.stream)
573
590
# flush the stream so that we get smooth output. This verbose mode is
574
591
# used to show the output in PQM.
575
592
self.stream.flush()
577
594
def report_skip(self, test, reason):
578
self.stream.writeln(' SKIP %s\n%s'
595
self.stream.write(' SKIP %s\n%s\n'
579
596
% (self._testTimeString(test), reason))
581
598
def report_not_applicable(self, test, reason):
582
self.stream.writeln(' N/A %s\n %s'
599
self.stream.write(' N/A %s\n %s\n'
583
600
% (self._testTimeString(test), reason))
585
602
def report_unsupported(self, test, feature):
586
603
"""test cannot be run because feature is missing."""
587
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
604
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
588
605
%(self._testTimeString(test), feature))
749
766
# XXX: Should probably unify more with CannedInputUIFactory or a
750
767
# particular configuration of TextUIFactory, or otherwise have a clearer
751
768
# idea of how they're supposed to be different.
752
# See https://bugs.edge.launchpad.net/bzr/+bug/408213
769
# See https://bugs.launchpad.net/bzr/+bug/408213
754
771
def __init__(self, stdout=None, stderr=None, stdin=None):
755
772
if stdin is not None:
789
806
routine, and to build and check bzr trees.
791
808
In addition to the usual method of overriding tearDown(), this class also
792
allows subclasses to register functions into the _cleanups list, which is
809
allows subclasses to register cleanup functions via addCleanup, which are
793
810
run in order as the object is torn down. It's less likely this will be
794
811
accidentally overlooked.
797
_active_threads = None
798
_leaking_threads_tests = 0
799
_first_thread_leaker_id = None
800
_log_file_name = None
801
815
# record lsprof data when performing benchmark calls.
802
816
_gather_lsprof_in_benchmarks = False
804
818
def __init__(self, methodName='testMethod'):
805
819
super(TestCase, self).__init__(methodName)
807
820
self._directory_isolation = True
808
821
self.exception_handlers.insert(0,
809
822
(UnavailableFeature, self._do_unsupported_or_skip))
827
840
self._track_transports()
828
841
self._track_locks()
829
842
self._clear_debug_flags()
830
TestCase._active_threads = threading.activeCount()
831
self.addCleanup(self._check_leaked_threads)
834
845
# debug a frame up.
836
847
pdb.Pdb().set_trace(sys._getframe().f_back)
838
def _check_leaked_threads(self):
839
active = threading.activeCount()
840
leaked_threads = active - TestCase._active_threads
841
TestCase._active_threads = active
842
# If some tests make the number of threads *decrease*, we'll consider
843
# that they are just observing old threads dying, not aggressively killing
844
# random threads. So we don't report these tests as leaking. The risk
845
# is that we have false positives that way (the test sees 2 threads
846
# going away but leaks one) but it seems less likely than the actual
847
# false positives (the test sees threads going away and does not leak).
848
if leaked_threads > 0:
849
TestCase._leaking_threads_tests += 1
850
if TestCase._first_thread_leaker_id is None:
851
TestCase._first_thread_leaker_id = self.id()
849
def discardDetail(self, name):
850
"""Extend the addDetail, getDetails api so we can remove a detail.
852
eg. bzr always adds the 'log' detail at startup, but we don't want to
853
include it for skipped, xfail, etc tests.
855
It is safe to call this for a detail that doesn't exist, in case this
856
gets called multiple times.
858
# We cheat. details is stored in __details which means we shouldn't
859
# touch it. But getDetails() returns the dict directly, so we can
861
details = self.getDetails()
853
865
def _clear_debug_flags(self):
854
866
"""Prevent externally set debug flags affecting tests.
1028
1040
self.addCleanup(transport_server.stop_server)
1029
1041
# Obtain a real transport because if the server supplies a password, it
1030
1042
# will be hidden from the base on the client side.
1031
t = get_transport(transport_server.get_url())
1043
t = _mod_transport.get_transport(transport_server.get_url())
1032
1044
# Some transport servers effectively chroot the backing transport;
1033
1045
# others like SFTPServer don't - users of the transport can walk up the
1034
1046
# transport to read the entire backing transport. This wouldn't matter
1456
1468
The file is removed as the test is torn down.
1458
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1459
self._log_file = os.fdopen(fileno, 'w+')
1470
self._log_file = StringIO()
1460
1471
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1461
self._log_file_name = name
1462
1472
self.addCleanup(self._finishLogFile)
1464
1474
def _finishLogFile(self):
1487
1497
debug.debug_flags.discard('strict_locks')
1489
def addCleanup(self, callable, *args, **kwargs):
1490
"""Arrange to run a callable when this case is torn down.
1492
Callables are run in the reverse of the order they are registered,
1493
ie last-in first-out.
1495
self._cleanups.append((callable, args, kwargs))
1497
1499
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1498
1500
"""Overrides an object attribute restoring it after the test.
1583
1585
"""This test has failed for some known reason."""
1584
1586
raise KnownFailure(reason)
1588
def _suppress_log(self):
1589
"""Remove the log info from details."""
1590
self.discardDetail('log')
1586
1592
def _do_skip(self, result, reason):
1593
self._suppress_log()
1587
1594
addSkip = getattr(result, 'addSkip', None)
1588
1595
if not callable(addSkip):
1589
1596
result.addSuccess(result)
1612
1621
self._do_skip(result, reason)
1624
def _report_skip(self, result, err):
1625
"""Override the default _report_skip.
1627
We want to strip the 'log' detail. If we wait until _do_skip, it has
1628
already been formatted into the 'reason' string, and we can't pull it
1631
self._suppress_log()
1632
super(TestCase, self)._report_skip(self, result, err)
1635
def _report_expected_failure(self, result, err):
1638
See _report_skip for motivation.
1640
self._suppress_log()
1641
super(TestCase, self)._report_expected_failure(self, result, err)
1615
1644
def _do_unsupported_or_skip(self, result, e):
1616
1645
reason = e.args[0]
1646
self._suppress_log()
1617
1647
addNotSupported = getattr(result, 'addNotSupported', None)
1618
1648
if addNotSupported is not None:
1619
1649
result.addNotSupported(self, reason)
1666
1696
unicodestr = self._log_contents.decode('utf8', 'replace')
1667
1697
self._log_contents = unicodestr.encode('utf8')
1668
1698
return self._log_contents
1670
if bzrlib.trace._trace_file:
1671
# flush the log file, to get all content
1672
bzrlib.trace._trace_file.flush()
1673
if self._log_file_name is not None:
1674
logfile = open(self._log_file_name)
1676
log_contents = logfile.read()
1699
if self._log_file is not None:
1700
log_contents = self._log_file.getvalue()
1680
1702
log_contents.decode('utf8')
1681
1703
except UnicodeDecodeError:
1682
1704
unicodestr = log_contents.decode('utf8', 'replace')
1683
1705
log_contents = unicodestr.encode('utf8')
1684
1706
if not keep_log_file:
1686
max_close_attempts = 100
1687
first_close_error = None
1688
while close_attempts < max_close_attempts:
1691
self._log_file.close()
1692
except IOError, ioe:
1693
if ioe.errno is None:
1694
# No errno implies 'close() called during
1695
# concurrent operation on the same file object', so
1696
# retry. Probably a thread is trying to write to
1698
if first_close_error is None:
1699
first_close_error = ioe
1704
if close_attempts > 1:
1706
'Unable to close log file on first attempt, '
1707
'will retry: %s\n' % (first_close_error,))
1708
if close_attempts == max_close_attempts:
1710
'Unable to close log file after %d attempts.\n'
1711
% (max_close_attempts,))
1712
1707
self._log_file = None
1713
1708
# Permit multiple calls to get_log until we clean it up in
1714
1709
# finishLogFile
1715
1710
self._log_contents = log_contents
1717
os.remove(self._log_file_name)
1719
if sys.platform == 'win32' and e.errno == errno.EACCES:
1720
sys.stderr.write(('Unable to delete log file '
1721
' %r\n' % self._log_file_name))
1724
self._log_file_name = None
1725
1711
return log_contents
1727
return "No log file content and no log file name."
1713
return "No log file content."
1729
1715
def get_log(self):
1730
1716
"""Get a unicode string containing the log from bzrlib.trace.
1945
1931
variables. A value of None will unset the env variable.
1946
1932
The values must be strings. The change will only occur in the
1947
1933
child, so you don't need to fix the environment after running.
1948
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1934
:param skip_if_plan_to_signal: raise TestSkipped when true and system
1935
doesn't support signalling subprocesses.
1950
1936
:param allow_plugins: If False (default) pass --no-plugins to bzr.
1952
1938
:returns: Popen object for the started process.
1954
1940
if skip_if_plan_to_signal:
1955
if not getattr(os, 'kill', None):
1956
raise TestSkipped("os.kill not available.")
1941
if os.name != "posix":
1942
raise TestSkipped("Sending signals not supported")
1958
1944
if env_changes is None:
1959
1945
env_changes = {}
2408
2396
made_control = self.make_bzrdir(relpath, format=format)
2409
2397
return made_control.create_repository(shared=shared)
2411
def make_smart_server(self, path):
2399
def make_smart_server(self, path, backing_server=None):
2400
if backing_server is None:
2401
backing_server = self.get_server()
2412
2402
smart_server = test_server.SmartTCPServer_for_testing()
2413
self.start_server(smart_server, self.get_server())
2414
remote_transport = get_transport(smart_server.get_url()).clone(path)
2403
self.start_server(smart_server, backing_server)
2404
remote_transport = _mod_transport.get_transport(smart_server.get_url()
2415
2406
return remote_transport
2417
2408
def make_branch_and_memory_tree(self, relpath, format=None):
2433
2424
def setUp(self):
2434
2425
super(TestCaseWithMemoryTransport, self).setUp()
2426
# Ensure that ConnectedTransport doesn't leak sockets
2427
def get_transport_with_cleanup(*args, **kwargs):
2428
t = orig_get_transport(*args, **kwargs)
2429
if isinstance(t, _mod_transport.ConnectedTransport):
2430
self.addCleanup(t.disconnect)
2433
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2434
get_transport_with_cleanup)
2435
2435
self._make_test_root()
2436
2436
self.addCleanup(os.chdir, os.getcwdu())
2437
2437
self.makeAndChdirToTestDir()
2578
2582
content = "contents of %s%s" % (name.encode('utf-8'), end)
2579
2583
transport.put_bytes_non_atomic(urlutils.escape(name), content)
2581
def build_tree_contents(self, shape):
2582
build_tree_contents(shape)
2585
build_tree_contents = staticmethod(treeshape.build_tree_contents)
2584
2587
def assertInWorkingTree(self, path, root_path='.', tree=None):
2585
2588
"""Assert whether path or paths are in the WorkingTree"""
3191
3197
def partition_tests(suite, count):
3192
3198
"""Partition suite into count lists of tests."""
3194
tests = list(iter_suite_tests(suite))
3195
tests_per_process = int(math.ceil(float(len(tests)) / count))
3196
for block in range(count):
3197
low_test = block * tests_per_process
3198
high_test = low_test + tests_per_process
3199
process_tests = tests[low_test:high_test]
3200
result.append(process_tests)
3199
# This just assigns tests in a round-robin fashion. On one hand this
3200
# splits up blocks of related tests that might run faster if they shared
3201
# resources, but on the other it avoids assigning blocks of slow tests to
3202
# just one partition. So the slowest partition shouldn't be much slower
3204
partitions = [list() for i in range(count)]
3205
tests = iter_suite_tests(suite)
3206
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3207
partition.append(test)
3204
3211
def workaround_zealous_crypto_random():
3311
3318
if '--no-plugins' in sys.argv:
3312
3319
argv.append('--no-plugins')
3313
# stderr=STDOUT would be ideal, but until we prevent noise on
3314
# stderr it can interrupt the subunit protocol.
3315
process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
3320
# stderr=subprocess.STDOUT would be ideal, but until we prevent
3321
# noise on stderr it can interrupt the subunit protocol.
3322
process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3323
stdout=subprocess.PIPE,
3324
stderr=subprocess.PIPE,
3317
3326
test = TestInSubprocess(process, test_list_file_name)
3318
3327
result.append(test)
3633
3647
'bzrlib.tests.blackbox',
3634
3648
'bzrlib.tests.commands',
3649
'bzrlib.tests.doc_generate',
3635
3650
'bzrlib.tests.per_branch',
3636
3651
'bzrlib.tests.per_bzrdir',
3637
'bzrlib.tests.per_bzrdir_colo',
3652
'bzrlib.tests.per_controldir',
3653
'bzrlib.tests.per_controldir_colo',
3638
3654
'bzrlib.tests.per_foreign_vcs',
3639
3655
'bzrlib.tests.per_interrepository',
3640
3656
'bzrlib.tests.per_intertree',
3653
3669
'bzrlib.tests.per_workingtree',
3654
3670
'bzrlib.tests.test__annotator',
3655
3671
'bzrlib.tests.test__bencode',
3672
'bzrlib.tests.test__btree_serializer',
3656
3673
'bzrlib.tests.test__chk_map',
3657
3674
'bzrlib.tests.test__dirstate_helpers',
3658
3675
'bzrlib.tests.test__groupcompress',
3701
3718
'bzrlib.tests.test_export',
3702
3719
'bzrlib.tests.test_extract',
3703
3720
'bzrlib.tests.test_fetch',
3721
'bzrlib.tests.test_fixtures',
3704
3722
'bzrlib.tests.test_fifo_cache',
3705
3723
'bzrlib.tests.test_filters',
3706
3724
'bzrlib.tests.test_ftp_transport',
3727
3745
'bzrlib.tests.test_knit',
3728
3746
'bzrlib.tests.test_lazy_import',
3729
3747
'bzrlib.tests.test_lazy_regex',
3748
'bzrlib.tests.test_library_state',
3730
3749
'bzrlib.tests.test_lock',
3731
3750
'bzrlib.tests.test_lockable_files',
3732
3751
'bzrlib.tests.test_lockdir',
3789
3808
'bzrlib.tests.test_switch',
3790
3809
'bzrlib.tests.test_symbol_versioning',
3791
3810
'bzrlib.tests.test_tag',
3811
'bzrlib.tests.test_test_server',
3792
3812
'bzrlib.tests.test_testament',
3793
3813
'bzrlib.tests.test_textfile',
3794
3814
'bzrlib.tests.test_textmerge',
3800
3820
'bzrlib.tests.test_transport_log',
3801
3821
'bzrlib.tests.test_tree',
3802
3822
'bzrlib.tests.test_treebuilder',
3823
'bzrlib.tests.test_treeshape',
3803
3824
'bzrlib.tests.test_tsort',
3804
3825
'bzrlib.tests.test_tuned_gzip',
3805
3826
'bzrlib.tests.test_ui',
3809
3830
'bzrlib.tests.test_urlutils',
3810
3831
'bzrlib.tests.test_version',
3811
3832
'bzrlib.tests.test_version_info',
3833
'bzrlib.tests.test_versionedfile',
3812
3834
'bzrlib.tests.test_weave',
3813
3835
'bzrlib.tests.test_whitebox',
3814
3836
'bzrlib.tests.test_win32utils',
4035
4058
:param new_id: The id to assign to it.
4036
4059
:return: The new test.
4038
new_test = copy(test)
4061
new_test = copy.copy(test)
4039
4062
new_test.id = lambda: new_id
4063
# XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
4064
# causes cloned tests to share the 'details' dict. This makes it hard to
4065
# read the test output for parameterized tests, because tracebacks will be
4066
# associated with irrelevant tests.
4068
details = new_test._TestCase__details
4069
except AttributeError:
4070
# must be a different version of testtools than expected. Do nothing.
4073
# Reset the '__details' dict.
4074
new_test._TestCase__details = {}
4040
4075
return new_test
4102
4137
if test_id != None:
4103
4138
ui.ui_factory.clear_term()
4104
4139
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4140
# Ugly, but the last thing we want here is to fail, so bear with it.
4141
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4142
).encode('ascii', 'replace')
4105
4143
sys.stderr.write('Unable to remove testing dir %s\n%s'
4106
% (os.path.basename(dirname), e))
4144
% (os.path.basename(dirname), printable_e))
4109
4147
class Feature(object):
4455
4504
from subunit import TestProtocolClient
4456
4505
from subunit.test_results import AutoTimingTestResultDecorator
4506
class SubUnitBzrProtocolClient(TestProtocolClient):
4508
def addSuccess(self, test, details=None):
4509
# The subunit client always includes the details in the subunit
4510
# stream, but we don't want to include it in ours.
4511
if details is not None and 'log' in details:
4513
return super(SubUnitBzrProtocolClient, self).addSuccess(
4457
4516
class SubUnitBzrRunner(TextTestRunner):
4458
4517
def run(self, test):
4459
4518
result = AutoTimingTestResultDecorator(
4460
TestProtocolClient(self.stream))
4519
SubUnitBzrProtocolClient(self.stream))
4461
4520
test.run(result)
4463
4522
except ImportError: