104
106
import bzrlib.trace
105
107
from bzrlib.transport import (
110
import bzrlib.transport
111
111
from bzrlib.trace import mutter, note
112
112
from bzrlib.tests import (
116
from bzrlib.tests.http_server import HttpServer
117
from bzrlib.tests.TestUtil import (
121
from bzrlib.tests.treeshape import build_tree_contents
122
117
from bzrlib.ui import NullProgressView
123
118
from bzrlib.ui.text import TextUIFactory
124
119
import bzrlib.version_info_formats.format_custom
140
135
SUBUNIT_SEEK_SET = 0
141
136
SUBUNIT_SEEK_CUR = 1
138
# These are intentionally brought into this namespace. That way plugins, etc
139
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
140
TestSuite = TestUtil.TestSuite
141
TestLoader = TestUtil.TestLoader
144
class ExtendedTestResult(unittest._TextTestResult):
143
class ExtendedTestResult(testtools.TextTestResult):
145
144
"""Accepts, reports and accumulates the results of running tests.
147
146
Compared to the unittest version this class adds support for
168
167
:param bench_history: Optionally, a writable file object to accumulate
169
168
benchmark results.
171
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
170
testtools.TextTestResult.__init__(self, stream)
172
171
if bench_history is not None:
173
172
from bzrlib.version import _get_bzr_source_tree
174
173
src_tree = _get_bzr_source_tree()
201
200
actionTaken = "Ran"
202
201
stopTime = time.time()
203
202
timeTaken = stopTime - self.startTime
205
self.stream.writeln(self.separator2)
206
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
203
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
204
# the parent class method is similar, we have to duplicate it
205
self._show_list('ERROR', self.errors)
206
self._show_list('FAIL', self.failures)
207
self.stream.write(self.sep2)
208
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
207
209
run, run != 1 and "s" or "", timeTaken))
208
self.stream.writeln()
209
210
if not self.wasSuccessful():
210
211
self.stream.write("FAILED (")
211
212
failed, errored = map(len, (self.failures, self.errors))
218
219
if failed or errored: self.stream.write(", ")
219
220
self.stream.write("known_failure_count=%d" %
220
221
self.known_failure_count)
221
self.stream.writeln(")")
222
self.stream.write(")\n")
223
224
if self.known_failure_count:
224
self.stream.writeln("OK (known_failures=%d)" %
225
self.stream.write("OK (known_failures=%d)\n" %
225
226
self.known_failure_count)
227
self.stream.writeln("OK")
228
self.stream.write("OK\n")
228
229
if self.skip_count > 0:
229
230
skipped = self.skip_count
230
self.stream.writeln('%d test%s skipped' %
231
self.stream.write('%d test%s skipped\n' %
231
232
(skipped, skipped != 1 and "s" or ""))
232
233
if self.unsupported:
233
234
for feature, count in sorted(self.unsupported.items()):
234
self.stream.writeln("Missing feature '%s' skipped %d tests." %
235
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
235
236
(feature, count))
237
238
ok = self.wasStrictlySuccessful()
276
277
def _shortened_test_description(self, test):
278
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
279
what = re.sub(r'^bzrlib\.tests\.', '', what)
281
282
def startTest(self, test):
282
unittest.TestResult.startTest(self, test)
283
super(ExtendedTestResult, self).startTest(test)
283
284
if self.count == 0:
284
285
self.startTests()
285
286
self.report_test_start(test)
358
359
self.report_success(test)
359
360
self._cleanupLogFile(test)
360
unittest.TestResult.addSuccess(self, test)
361
super(ExtendedTestResult, self).addSuccess(test)
361
362
test._log_contents = ''
363
364
def addExpectedFailure(self, test, err):
551
552
return '%s%s' % (indent, err[1])
553
554
def report_error(self, test, err):
554
self.stream.writeln('ERROR %s\n%s'
555
self.stream.write('ERROR %s\n%s\n'
555
556
% (self._testTimeString(test),
556
557
self._error_summary(err)))
558
559
def report_failure(self, test, err):
559
self.stream.writeln(' FAIL %s\n%s'
560
self.stream.write(' FAIL %s\n%s\n'
560
561
% (self._testTimeString(test),
561
562
self._error_summary(err)))
563
564
def report_known_failure(self, test, err):
564
self.stream.writeln('XFAIL %s\n%s'
565
self.stream.write('XFAIL %s\n%s\n'
565
566
% (self._testTimeString(test),
566
567
self._error_summary(err)))
568
569
def report_success(self, test):
569
self.stream.writeln(' OK %s' % self._testTimeString(test))
570
self.stream.write(' OK %s\n' % self._testTimeString(test))
570
571
for bench_called, stats in getattr(test, '_benchcalls', []):
571
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
572
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
572
573
stats.pprint(file=self.stream)
573
574
# flush the stream so that we get smooth output. This verbose mode is
574
575
# used to show the output in PQM.
575
576
self.stream.flush()
577
578
def report_skip(self, test, reason):
578
self.stream.writeln(' SKIP %s\n%s'
579
self.stream.write(' SKIP %s\n%s\n'
579
580
% (self._testTimeString(test), reason))
581
582
def report_not_applicable(self, test, reason):
582
self.stream.writeln(' N/A %s\n %s'
583
self.stream.write(' N/A %s\n %s\n'
583
584
% (self._testTimeString(test), reason))
585
586
def report_unsupported(self, test, feature):
586
587
"""test cannot be run because feature is missing."""
587
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
588
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
588
589
%(self._testTimeString(test), feature))
619
620
encode = codec.encode
620
621
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
621
622
stream.encoding = new_encoding
622
self.stream = unittest._WritelnDecorator(stream)
623
624
self.descriptions = descriptions
624
625
self.verbosity = verbosity
625
626
self._bench_history = bench_history
749
750
# XXX: Should probably unify more with CannedInputUIFactory or a
750
751
# particular configuration of TextUIFactory, or otherwise have a clearer
751
752
# idea of how they're supposed to be different.
752
# See https://bugs.edge.launchpad.net/bzr/+bug/408213
753
# See https://bugs.launchpad.net/bzr/+bug/408213
754
755
def __init__(self, stdout=None, stderr=None, stdin=None):
755
756
if stdin is not None:
846
847
# going away but leak one) but it seems less likely than the actual
847
848
# false positives (the test sees threads going away and does not leak).
848
849
if leaked_threads > 0:
850
if 'threads' in selftest_debug_flags:
851
print '%s is leaking, active is now %d' % (self.id(), active)
849
852
TestCase._leaking_threads_tests += 1
850
853
if TestCase._first_thread_leaker_id is None:
851
854
TestCase._first_thread_leaker_id = self.id()
1028
1031
self.addCleanup(transport_server.stop_server)
1029
1032
# Obtain a real transport because if the server supplies a password, it
1030
1033
# will be hidden from the base on the client side.
1031
t = get_transport(transport_server.get_url())
1034
t = _mod_transport.get_transport(transport_server.get_url())
1032
1035
# Some transport servers effectively chroot the backing transport;
1033
1036
# others like SFTPServer don't - users of the transport can walk up the
1034
1037
# transport to read the entire backing transport. This wouldn't matter
1456
1459
The file is removed as the test is torn down.
1458
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1459
self._log_file = os.fdopen(fileno, 'w+')
1461
self._log_file = StringIO()
1460
1462
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1461
self._log_file_name = name
1462
1463
self.addCleanup(self._finishLogFile)
1464
1465
def _finishLogFile(self):
1666
1667
unicodestr = self._log_contents.decode('utf8', 'replace')
1667
1668
self._log_contents = unicodestr.encode('utf8')
1668
1669
return self._log_contents
1670
if bzrlib.trace._trace_file:
1671
# flush the log file, to get all content
1672
bzrlib.trace._trace_file.flush()
1673
if self._log_file_name is not None:
1674
logfile = open(self._log_file_name)
1676
log_contents = logfile.read()
1670
if self._log_file is not None:
1671
log_contents = self._log_file.getvalue()
1680
1673
log_contents.decode('utf8')
1681
1674
except UnicodeDecodeError:
1682
1675
unicodestr = log_contents.decode('utf8', 'replace')
1683
1676
log_contents = unicodestr.encode('utf8')
1684
1677
if not keep_log_file:
1686
max_close_attempts = 100
1687
first_close_error = None
1688
while close_attempts < max_close_attempts:
1691
self._log_file.close()
1692
except IOError, ioe:
1693
if ioe.errno is None:
1694
# No errno implies 'close() called during
1695
# concurrent operation on the same file object', so
1696
# retry. Probably a thread is trying to write to
1698
if first_close_error is None:
1699
first_close_error = ioe
1704
if close_attempts > 1:
1706
'Unable to close log file on first attempt, '
1707
'will retry: %s\n' % (first_close_error,))
1708
if close_attempts == max_close_attempts:
1710
'Unable to close log file after %d attempts.\n'
1711
% (max_close_attempts,))
1712
1678
self._log_file = None
1713
1679
# Permit multiple calls to get_log until we clean it up in
1714
1680
# finishLogFile
1715
1681
self._log_contents = log_contents
1717
os.remove(self._log_file_name)
1719
if sys.platform == 'win32' and e.errno == errno.EACCES:
1720
sys.stderr.write(('Unable to delete log file '
1721
' %r\n' % self._log_file_name))
1724
self._log_file_name = None
1725
1682
return log_contents
1727
return "No log file content and no log file name."
1684
return "No log file content."
1729
1686
def get_log(self):
1730
1687
"""Get a unicode string containing the log from bzrlib.trace.
1945
1902
variables. A value of None will unset the env variable.
1946
1903
The values must be strings. The change will only occur in the
1947
1904
child, so you don't need to fix the environment after running.
1948
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1905
:param skip_if_plan_to_signal: raise TestSkipped when true and system
1906
doesn't support signalling subprocesses.
1950
1907
:param allow_plugins: If False (default) pass --no-plugins to bzr.
1952
1909
:returns: Popen object for the started process.
1954
1911
if skip_if_plan_to_signal:
1955
if not getattr(os, 'kill', None):
1956
raise TestSkipped("os.kill not available.")
1912
if os.name != "posix":
1913
raise TestSkipped("Sending signals not supported")
1958
1915
if env_changes is None:
1959
1916
env_changes = {}
2385
2344
# might be a relative or absolute path
2386
2345
maybe_a_url = self.get_url(relpath)
2387
2346
segments = maybe_a_url.rsplit('/', 1)
2388
t = get_transport(maybe_a_url)
2347
t = _mod_transport.get_transport(maybe_a_url)
2389
2348
if len(segments) > 1 and segments[-1] not in ('', '.'):
2390
2349
t.ensure_base()
2391
2350
if format is None:
2408
2367
made_control = self.make_bzrdir(relpath, format=format)
2409
2368
return made_control.create_repository(shared=shared)
2411
def make_smart_server(self, path):
2370
def make_smart_server(self, path, backing_server=None):
2371
if backing_server is None:
2372
backing_server = self.get_server()
2412
2373
smart_server = test_server.SmartTCPServer_for_testing()
2413
self.start_server(smart_server, self.get_server())
2414
remote_transport = get_transport(smart_server.get_url()).clone(path)
2374
self.start_server(smart_server, backing_server)
2375
remote_transport = _mod_transport.get_transport(smart_server.get_url()
2415
2377
return remote_transport
2417
2379
def make_branch_and_memory_tree(self, relpath, format=None):
2433
2395
def setUp(self):
2434
2396
super(TestCaseWithMemoryTransport, self).setUp()
2397
# Ensure that ConnectedTransport doesn't leak sockets
2398
def get_transport_with_cleanup(*args, **kwargs):
2399
t = orig_get_transport(*args, **kwargs)
2400
if isinstance(t, _mod_transport.ConnectedTransport):
2401
self.addCleanup(t.disconnect)
2404
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2405
get_transport_with_cleanup)
2435
2406
self._make_test_root()
2436
2407
self.addCleanup(os.chdir, os.getcwdu())
2437
2408
self.makeAndChdirToTestDir()
2562
2537
"a list or a tuple. Got %r instead" % (shape,))
2563
2538
# It's OK to just create them using forward slashes on windows.
2564
2539
if transport is None or transport.is_readonly():
2565
transport = get_transport(".")
2540
transport = _mod_transport.get_transport(".")
2566
2541
for name in shape:
2567
2542
self.assertIsInstance(name, basestring)
2568
2543
if name[-1] == '/':
2578
2553
content = "contents of %s%s" % (name.encode('utf-8'), end)
2579
2554
transport.put_bytes_non_atomic(urlutils.escape(name), content)
2581
def build_tree_contents(self, shape):
2582
build_tree_contents(shape)
2556
build_tree_contents = staticmethod(treeshape.build_tree_contents)
2584
2558
def assertInWorkingTree(self, path, root_path='.', tree=None):
2585
2559
"""Assert whether path or paths are in the WorkingTree"""
2728
2702
def setUp(self):
2703
from bzrlib.tests import http_server
2729
2704
super(ChrootedTestCase, self).setUp()
2730
2705
if not self.vfs_transport_factory == memory.MemoryServer:
2731
self.transport_readonly_server = HttpServer
2706
self.transport_readonly_server = http_server.HttpServer
2734
2709
def condition_id_re(pattern):
3191
3165
def partition_tests(suite, count):
3192
3166
"""Partition suite into count lists of tests."""
3194
tests = list(iter_suite_tests(suite))
3195
tests_per_process = int(math.ceil(float(len(tests)) / count))
3196
for block in range(count):
3197
low_test = block * tests_per_process
3198
high_test = low_test + tests_per_process
3199
process_tests = tests[low_test:high_test]
3200
result.append(process_tests)
3167
# This just assigns tests in a round-robin fashion. On one hand this
3168
# splits up blocks of related tests that might run faster if they shared
3169
# resources, but on the other it avoids assigning blocks of slow tests to
3170
# just one partition. So the slowest partition shouldn't be much slower
3172
partitions = [list() for i in range(count)]
3173
tests = iter_suite_tests(suite)
3174
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3175
partition.append(test)
3204
3179
def workaround_zealous_crypto_random():
3239
3214
test_blocks = partition_tests(suite, concurrency)
3240
3215
for process_tests in test_blocks:
3241
process_suite = TestSuite()
3216
process_suite = TestUtil.TestSuite()
3242
3217
process_suite.addTests(process_tests)
3243
3218
c2pread, c2pwrite = os.pipe()
3244
3219
pid = os.fork()
3311
3286
if '--no-plugins' in sys.argv:
3312
3287
argv.append('--no-plugins')
3313
# stderr=STDOUT would be ideal, but until we prevent noise on
3314
# stderr it can interrupt the subunit protocol.
3315
process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
3288
# stderr=subprocess.STDOUT would be ideal, but until we prevent
3289
# noise on stderr it can interrupt the subunit protocol.
3290
process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3291
stdout=subprocess.PIPE,
3292
stderr=subprocess.PIPE,
3317
3294
test = TestInSubprocess(process, test_list_file_name)
3318
3295
result.append(test)
3633
3615
'bzrlib.tests.blackbox',
3634
3616
'bzrlib.tests.commands',
3617
'bzrlib.tests.doc_generate',
3635
3618
'bzrlib.tests.per_branch',
3636
'bzrlib.tests.per_bzrdir',
3637
'bzrlib.tests.per_bzrdir_colo',
3619
'bzrlib.tests.per_controldir',
3620
'bzrlib.tests.per_controldir_colo',
3638
3621
'bzrlib.tests.per_foreign_vcs',
3639
3622
'bzrlib.tests.per_interrepository',
3640
3623
'bzrlib.tests.per_intertree',
3653
3636
'bzrlib.tests.per_workingtree',
3654
3637
'bzrlib.tests.test__annotator',
3655
3638
'bzrlib.tests.test__bencode',
3639
'bzrlib.tests.test__btree_serializer',
3656
3640
'bzrlib.tests.test__chk_map',
3657
3641
'bzrlib.tests.test__dirstate_helpers',
3658
3642
'bzrlib.tests.test__groupcompress',
3701
3685
'bzrlib.tests.test_export',
3702
3686
'bzrlib.tests.test_extract',
3703
3687
'bzrlib.tests.test_fetch',
3688
'bzrlib.tests.test_fixtures',
3704
3689
'bzrlib.tests.test_fifo_cache',
3705
3690
'bzrlib.tests.test_filters',
3706
3691
'bzrlib.tests.test_ftp_transport',
3727
3712
'bzrlib.tests.test_knit',
3728
3713
'bzrlib.tests.test_lazy_import',
3729
3714
'bzrlib.tests.test_lazy_regex',
3715
'bzrlib.tests.test_library_state',
3730
3716
'bzrlib.tests.test_lock',
3731
3717
'bzrlib.tests.test_lockable_files',
3732
3718
'bzrlib.tests.test_lockdir',
3789
3775
'bzrlib.tests.test_switch',
3790
3776
'bzrlib.tests.test_symbol_versioning',
3791
3777
'bzrlib.tests.test_tag',
3778
'bzrlib.tests.test_test_server',
3792
3779
'bzrlib.tests.test_testament',
3793
3780
'bzrlib.tests.test_textfile',
3794
3781
'bzrlib.tests.test_textmerge',
3800
3787
'bzrlib.tests.test_transport_log',
3801
3788
'bzrlib.tests.test_tree',
3802
3789
'bzrlib.tests.test_treebuilder',
3790
'bzrlib.tests.test_treeshape',
3803
3791
'bzrlib.tests.test_tsort',
3804
3792
'bzrlib.tests.test_tuned_gzip',
3805
3793
'bzrlib.tests.test_ui',
3809
3797
'bzrlib.tests.test_urlutils',
3810
3798
'bzrlib.tests.test_version',
3811
3799
'bzrlib.tests.test_version_info',
3800
'bzrlib.tests.test_versionedfile',
3812
3801
'bzrlib.tests.test_weave',
3813
3802
'bzrlib.tests.test_whitebox',
3814
3803
'bzrlib.tests.test_win32utils',
3982
3972
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
3983
3973
... [('one', dict(param=1)),
3984
3974
... ('two', dict(param=2))],
3975
... TestUtil.TestSuite())
3986
3976
>>> tests = list(iter_suite_tests(r))
4102
4092
if test_id != None:
4103
4093
ui.ui_factory.clear_term()
4104
4094
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4095
# Ugly, but the last thing we want here is to fail, so bear with it.
4096
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4097
).encode('ascii', 'replace')
4105
4098
sys.stderr.write('Unable to remove testing dir %s\n%s'
4106
% (os.path.basename(dirname), e))
4099
% (os.path.basename(dirname), printable_e))
4109
4102
class Feature(object):