104
106
import bzrlib.trace
105
107
from bzrlib.transport import (
110
import bzrlib.transport
111
111
from bzrlib.trace import mutter, note
112
112
from bzrlib.tests import (
116
117
from bzrlib.tests.http_server import HttpServer
117
118
from bzrlib.tests.TestUtil import (
121
from bzrlib.tests.treeshape import build_tree_contents
122
122
from bzrlib.ui import NullProgressView
123
123
from bzrlib.ui.text import TextUIFactory
124
124
import bzrlib.version_info_formats.format_custom
168
168
:param bench_history: Optionally, a writable file object to accumulate
169
169
benchmark results.
171
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
171
testtools.TextTestResult.__init__(self, stream)
172
172
if bench_history is not None:
173
173
from bzrlib.version import _get_bzr_source_tree
174
174
src_tree = _get_bzr_source_tree()
201
201
actionTaken = "Ran"
202
202
stopTime = time.time()
203
203
timeTaken = stopTime - self.startTime
205
self.stream.writeln(self.separator2)
206
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
204
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
205
# the parent class method is similar have to duplicate
206
self._show_list('ERROR', self.errors)
207
self._show_list('FAIL', self.failures)
208
self.stream.write(self.sep2)
209
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
207
210
run, run != 1 and "s" or "", timeTaken))
208
self.stream.writeln()
209
211
if not self.wasSuccessful():
210
212
self.stream.write("FAILED (")
211
213
failed, errored = map(len, (self.failures, self.errors))
218
220
if failed or errored: self.stream.write(", ")
219
221
self.stream.write("known_failure_count=%d" %
220
222
self.known_failure_count)
221
self.stream.writeln(")")
223
self.stream.write(")\n")
223
225
if self.known_failure_count:
224
self.stream.writeln("OK (known_failures=%d)" %
226
self.stream.write("OK (known_failures=%d)\n" %
225
227
self.known_failure_count)
227
self.stream.writeln("OK")
229
self.stream.write("OK\n")
228
230
if self.skip_count > 0:
229
231
skipped = self.skip_count
230
self.stream.writeln('%d test%s skipped' %
232
self.stream.write('%d test%s skipped\n' %
231
233
(skipped, skipped != 1 and "s" or ""))
232
234
if self.unsupported:
233
235
for feature, count in sorted(self.unsupported.items()):
234
self.stream.writeln("Missing feature '%s' skipped %d tests." %
236
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
235
237
(feature, count))
237
239
ok = self.wasStrictlySuccessful()
276
278
def _shortened_test_description(self, test):
278
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
280
what = re.sub(r'^bzrlib\.tests\.', '', what)
281
283
def startTest(self, test):
282
unittest.TestResult.startTest(self, test)
284
super(ExtendedTestResult, self).startTest(test)
283
285
if self.count == 0:
284
286
self.startTests()
285
287
self.report_test_start(test)
358
360
self.report_success(test)
359
361
self._cleanupLogFile(test)
360
unittest.TestResult.addSuccess(self, test)
362
super(ExtendedTestResult, self).addSuccess(test)
361
363
test._log_contents = ''
363
365
def addExpectedFailure(self, test, err):
551
553
return '%s%s' % (indent, err[1])
553
555
def report_error(self, test, err):
554
self.stream.writeln('ERROR %s\n%s'
556
self.stream.write('ERROR %s\n%s\n'
555
557
% (self._testTimeString(test),
556
558
self._error_summary(err)))
558
560
def report_failure(self, test, err):
559
self.stream.writeln(' FAIL %s\n%s'
561
self.stream.write(' FAIL %s\n%s\n'
560
562
% (self._testTimeString(test),
561
563
self._error_summary(err)))
563
565
def report_known_failure(self, test, err):
564
self.stream.writeln('XFAIL %s\n%s'
566
self.stream.write('XFAIL %s\n%s\n'
565
567
% (self._testTimeString(test),
566
568
self._error_summary(err)))
568
570
def report_success(self, test):
569
self.stream.writeln(' OK %s' % self._testTimeString(test))
571
self.stream.write(' OK %s\n' % self._testTimeString(test))
570
572
for bench_called, stats in getattr(test, '_benchcalls', []):
571
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
573
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
572
574
stats.pprint(file=self.stream)
573
575
# flush the stream so that we get smooth output. This verbose mode is
574
576
# used to show the output in PQM.
575
577
self.stream.flush()
577
579
def report_skip(self, test, reason):
578
self.stream.writeln(' SKIP %s\n%s'
580
self.stream.write(' SKIP %s\n%s\n'
579
581
% (self._testTimeString(test), reason))
581
583
def report_not_applicable(self, test, reason):
582
self.stream.writeln(' N/A %s\n %s'
584
self.stream.write(' N/A %s\n %s\n'
583
585
% (self._testTimeString(test), reason))
585
587
def report_unsupported(self, test, feature):
586
588
"""test cannot be run because feature is missing."""
587
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
589
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
588
590
%(self._testTimeString(test), feature))
619
621
encode = codec.encode
620
622
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
621
623
stream.encoding = new_encoding
622
self.stream = unittest._WritelnDecorator(stream)
623
625
self.descriptions = descriptions
624
626
self.verbosity = verbosity
625
627
self._bench_history = bench_history
749
751
# XXX: Should probably unify more with CannedInputUIFactory or a
750
752
# particular configuration of TextUIFactory, or otherwise have a clearer
751
753
# idea of how they're supposed to be different.
752
# See https://bugs.edge.launchpad.net/bzr/+bug/408213
754
# See https://bugs.launchpad.net/bzr/+bug/408213
754
756
def __init__(self, stdout=None, stderr=None, stdin=None):
755
757
if stdin is not None:
1028
1030
self.addCleanup(transport_server.stop_server)
1029
1031
# Obtain a real transport because if the server supplies a password, it
1030
1032
# will be hidden from the base on the client side.
1031
t = get_transport(transport_server.get_url())
1033
t = _mod_transport.get_transport(transport_server.get_url())
1032
1034
# Some transport servers effectively chroot the backing transport;
1033
1035
# others like SFTPServer don't - users of the transport can walk up the
1034
1036
# transport to read the entire backing transport. This wouldn't matter
2385
2389
# might be a relative or absolute path
2386
2390
maybe_a_url = self.get_url(relpath)
2387
2391
segments = maybe_a_url.rsplit('/', 1)
2388
t = get_transport(maybe_a_url)
2392
t = _mod_transport.get_transport(maybe_a_url)
2389
2393
if len(segments) > 1 and segments[-1] not in ('', '.'):
2390
2394
t.ensure_base()
2391
2395
if format is None:
2408
2412
made_control = self.make_bzrdir(relpath, format=format)
2409
2413
return made_control.create_repository(shared=shared)
2411
def make_smart_server(self, path):
2415
def make_smart_server(self, path, backing_server=None):
2416
if backing_server is None:
2417
backing_server = self.get_server()
2412
2418
smart_server = test_server.SmartTCPServer_for_testing()
2413
self.start_server(smart_server, self.get_server())
2414
remote_transport = get_transport(smart_server.get_url()).clone(path)
2419
self.start_server(smart_server, backing_server)
2420
remote_transport = _mod_transport.get_transport(smart_server.get_url()
2415
2422
return remote_transport
2417
2424
def make_branch_and_memory_tree(self, relpath, format=None):
2562
2573
"a list or a tuple. Got %r instead" % (shape,))
2563
2574
# It's OK to just create them using forward slashes on windows.
2564
2575
if transport is None or transport.is_readonly():
2565
transport = get_transport(".")
2576
transport = _mod_transport.get_transport(".")
2566
2577
for name in shape:
2567
2578
self.assertIsInstance(name, basestring)
2568
2579
if name[-1] == '/':
2578
2589
content = "contents of %s%s" % (name.encode('utf-8'), end)
2579
2590
transport.put_bytes_non_atomic(urlutils.escape(name), content)
2581
def build_tree_contents(self, shape):
2582
build_tree_contents(shape)
2592
build_tree_contents = staticmethod(treeshape.build_tree_contents)
2584
2594
def assertInWorkingTree(self, path, root_path='.', tree=None):
2585
2595
"""Assert whether path or paths are in the WorkingTree"""
3191
3200
def partition_tests(suite, count):
3192
3201
"""Partition suite into count lists of tests."""
3194
tests = list(iter_suite_tests(suite))
3195
tests_per_process = int(math.ceil(float(len(tests)) / count))
3196
for block in range(count):
3197
low_test = block * tests_per_process
3198
high_test = low_test + tests_per_process
3199
process_tests = tests[low_test:high_test]
3200
result.append(process_tests)
3202
# This just assigns tests in a round-robin fashion. On one hand this
3203
# splits up blocks of related tests that might run faster if they shared
3204
# resources, but on the other it avoids assigning blocks of slow tests to
3205
# just one partition. So the slowest partition shouldn't be much slower
3207
partitions = [list() for i in range(count)]
3208
tests = iter_suite_tests(suite)
3209
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3210
partition.append(test)
3204
3214
def workaround_zealous_crypto_random():
3311
3321
if '--no-plugins' in sys.argv:
3312
3322
argv.append('--no-plugins')
3313
# stderr=STDOUT would be ideal, but until we prevent noise on
3314
# stderr it can interrupt the subunit protocol.
3315
process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
3323
# stderr=subprocess.STDOUT would be ideal, but until we prevent
3324
# noise on stderr it can interrupt the subunit protocol.
3325
process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3326
stdout=subprocess.PIPE,
3327
stderr=subprocess.PIPE,
3317
3329
test = TestInSubprocess(process, test_list_file_name)
3318
3330
result.append(test)
3633
3648
'bzrlib.tests.blackbox',
3634
3649
'bzrlib.tests.commands',
3650
'bzrlib.tests.doc_generate',
3635
3651
'bzrlib.tests.per_branch',
3636
'bzrlib.tests.per_bzrdir',
3637
'bzrlib.tests.per_bzrdir_colo',
3652
'bzrlib.tests.per_controldir',
3653
'bzrlib.tests.per_controldir_colo',
3638
3654
'bzrlib.tests.per_foreign_vcs',
3639
3655
'bzrlib.tests.per_interrepository',
3640
3656
'bzrlib.tests.per_intertree',
3701
3717
'bzrlib.tests.test_export',
3702
3718
'bzrlib.tests.test_extract',
3703
3719
'bzrlib.tests.test_fetch',
3720
'bzrlib.tests.test_fixtures',
3704
3721
'bzrlib.tests.test_fifo_cache',
3705
3722
'bzrlib.tests.test_filters',
3706
3723
'bzrlib.tests.test_ftp_transport',
3727
3744
'bzrlib.tests.test_knit',
3728
3745
'bzrlib.tests.test_lazy_import',
3729
3746
'bzrlib.tests.test_lazy_regex',
3747
'bzrlib.tests.test_library_state',
3730
3748
'bzrlib.tests.test_lock',
3731
3749
'bzrlib.tests.test_lockable_files',
3732
3750
'bzrlib.tests.test_lockdir',
3800
3818
'bzrlib.tests.test_transport_log',
3801
3819
'bzrlib.tests.test_tree',
3802
3820
'bzrlib.tests.test_treebuilder',
3821
'bzrlib.tests.test_treeshape',
3803
3822
'bzrlib.tests.test_tsort',
3804
3823
'bzrlib.tests.test_tuned_gzip',
3805
3824
'bzrlib.tests.test_ui',
3809
3828
'bzrlib.tests.test_urlutils',
3810
3829
'bzrlib.tests.test_version',
3811
3830
'bzrlib.tests.test_version_info',
3831
'bzrlib.tests.test_versionedfile',
3812
3832
'bzrlib.tests.test_weave',
3813
3833
'bzrlib.tests.test_whitebox',
3814
3834
'bzrlib.tests.test_win32utils',
4102
4123
if test_id != None:
4103
4124
ui.ui_factory.clear_term()
4104
4125
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4126
# Ugly, but the last thing we want here is fail, so bear with it.
4127
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4128
).encode('ascii', 'replace')
4105
4129
sys.stderr.write('Unable to remove testing dir %s\n%s'
4106
% (os.path.basename(dirname), e))
4130
% (os.path.basename(dirname), printable_e))
4109
4133
class Feature(object):