88
98
from bzrlib.symbol_versioning import (
89
99
DEPRECATED_PARAMETER,
90
100
deprecated_function,
91
102
deprecated_method,
92
103
deprecated_passed,
94
105
import bzrlib.trace
95
from bzrlib.transport import get_transport, pathfilter
96
import bzrlib.transport
97
from bzrlib.transport.local import LocalURLServer
98
from bzrlib.transport.memory import MemoryServer
99
from bzrlib.transport.readonly import ReadonlyServer
106
from bzrlib.transport import (
100
110
from bzrlib.trace import mutter, note
101
from bzrlib.tests import TestUtil
102
from bzrlib.tests.http_server import HttpServer
103
from bzrlib.tests.TestUtil import (
107
from bzrlib.tests.treeshape import build_tree_contents
111
from bzrlib.tests import (
108
116
from bzrlib.ui import NullProgressView
109
117
from bzrlib.ui.text import TextUIFactory
110
118
import bzrlib.version_info_formats.format_custom
115
123
# shown frame is the test code, not our assertXYZ.
118
default_transport = LocalURLServer
126
default_transport = test_server.LocalURLServer
129
_unitialized_attr = object()
130
"""A sentinel needed to act as a default value in a method signature."""
120
133
# Subunit result codes, defined here to prevent a hard dependency on subunit.
121
134
SUBUNIT_SEEK_SET = 0
122
135
SUBUNIT_SEEK_CUR = 1
137
# These are intentionally brought into this namespace. That way plugins, etc
138
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
139
TestSuite = TestUtil.TestSuite
140
TestLoader = TestUtil.TestLoader
125
class ExtendedTestResult(unittest._TextTestResult):
142
class ExtendedTestResult(testtools.TextTestResult):
126
143
"""Accepts, reports and accumulates the results of running tests.
128
145
Compared to the unittest version this class adds support for
177
194
self._overall_start_time = time.time()
178
195
self._strict = strict
196
self._first_thread_leaker_id = None
197
self._tests_leaking_threads_count = 0
180
199
def stopTestRun(self):
181
200
run = self.testsRun
182
201
actionTaken = "Ran"
183
202
stopTime = time.time()
184
203
timeTaken = stopTime - self.startTime
186
self.stream.writeln(self.separator2)
187
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
204
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
205
# the parent class method is similar have to duplicate
206
self._show_list('ERROR', self.errors)
207
self._show_list('FAIL', self.failures)
208
self.stream.write(self.sep2)
209
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
188
210
run, run != 1 and "s" or "", timeTaken))
189
self.stream.writeln()
190
211
if not self.wasSuccessful():
191
212
self.stream.write("FAILED (")
192
213
failed, errored = map(len, (self.failures, self.errors))
199
220
if failed or errored: self.stream.write(", ")
200
221
self.stream.write("known_failure_count=%d" %
201
222
self.known_failure_count)
202
self.stream.writeln(")")
223
self.stream.write(")\n")
204
225
if self.known_failure_count:
205
self.stream.writeln("OK (known_failures=%d)" %
226
self.stream.write("OK (known_failures=%d)\n" %
206
227
self.known_failure_count)
208
self.stream.writeln("OK")
229
self.stream.write("OK\n")
209
230
if self.skip_count > 0:
210
231
skipped = self.skip_count
211
self.stream.writeln('%d test%s skipped' %
232
self.stream.write('%d test%s skipped\n' %
212
233
(skipped, skipped != 1 and "s" or ""))
213
234
if self.unsupported:
214
235
for feature, count in sorted(self.unsupported.items()):
215
self.stream.writeln("Missing feature '%s' skipped %d tests." %
236
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
216
237
(feature, count))
218
239
ok = self.wasStrictlySuccessful()
220
241
ok = self.wasSuccessful()
221
if TestCase._first_thread_leaker_id:
242
if self._first_thread_leaker_id:
222
243
self.stream.write(
223
244
'%s is leaking threads among %d leaking tests.\n' % (
224
TestCase._first_thread_leaker_id,
225
TestCase._leaking_threads_tests))
245
self._first_thread_leaker_id,
246
self._tests_leaking_threads_count))
226
247
# We don't report the main thread as an active one.
227
248
self.stream.write(
228
249
'%d non-main threads were left active in the end.\n'
229
% (TestCase._active_threads - 1))
231
def _extractBenchmarkTime(self, testCase):
250
% (len(self._active_threads) - 1))
252
def getDescription(self, test):
255
def _extractBenchmarkTime(self, testCase, details=None):
    """Return the benchmark time recorded for testCase, if any.

    :param testCase: The test whose benchmark time is wanted.
    :param details: Optional mapping of details.  When it holds a
        'benchtime' entry, the chunks yielded by that entry's iter_bytes()
        are joined and parsed as a float.
    :return: The float parsed from details, otherwise the test's
        _benchtime attribute, or None when that attribute is absent.
    """
    if not details or 'benchtime' not in details:
        # Nothing usable in details: fall back to the test's attribute.
        return getattr(testCase, "_benchtime", None)
    chunks = details['benchtime'].iter_bytes()
    return float(''.join(chunks))
235
261
def _elapsedTestTimeString(self):
252
278
def _shortened_test_description(self, test):
254
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
280
what = re.sub(r'^bzrlib\.tests\.', '', what)
257
283
def startTest(self, test):
258
unittest.TestResult.startTest(self, test)
284
super(ExtendedTestResult, self).startTest(test)
259
285
if self.count == 0:
260
286
self.startTests()
261
288
self.report_test_start(test)
262
289
test.number = self.count
263
290
self._recordTestStartTime()
291
# Only check for thread leaks if the test case supports cleanups
292
addCleanup = getattr(test, "addCleanup", None)
293
if addCleanup is not None:
294
addCleanup(self._check_leaked_threads, test)
265
296
def startTests(self):
267
if getattr(sys, 'frozen', None) is None:
268
bzr_path = osutils.realpath(sys.argv[0])
270
bzr_path = sys.executable
272
'testing: %s\n' % (bzr_path,))
275
bzrlib.__path__[0],))
277
' bzr-%s python-%s %s\n' % (
278
bzrlib.version_string,
279
bzrlib._format_version_tuple(sys.version_info),
280
platform.platform(aliased=1),
282
self.stream.write('\n')
297
self.report_tests_starting()
298
self._active_threads = threading.enumerate()
300
def _check_leaked_threads(self, test):
301
"""See if any threads have leaked since last call
303
A sample of live threads is stored in the _active_threads attribute,
304
when this method runs it compares the current live threads and any not
305
in the previous sample are treated as having leaked.
307
now_active_threads = set(threading.enumerate())
308
threads_leaked = now_active_threads.difference(self._active_threads)
310
self._report_thread_leak(test, threads_leaked, now_active_threads)
311
self._tests_leaking_threads_count += 1
312
if self._first_thread_leaker_id is None:
313
self._first_thread_leaker_id = test.id()
314
self._active_threads = now_active_threads
284
316
def _recordTestStartTime(self):
    """Store the current time.time() value in self._start_time."""
    started_at = time.time()
    self._start_time = started_at
288
def _cleanupLogFile(self, test):
289
# We can only do this if we have one of our TestCases, not if
291
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
292
if setKeepLogfile is not None:
295
320
def addError(self, test, err):
296
321
"""Tell result that test finished with an error.
313
337
fails because e.g. an assert() method failed.
315
339
self._post_mortem()
316
unittest.TestResult.addFailure(self, test, err)
340
super(ExtendedTestResult, self).addFailure(test, err)
317
341
self.failure_count += 1
318
342
self.report_failure(test, err)
319
343
if self.stop_early:
321
self._cleanupLogFile(test)
323
def addSuccess(self, test):
346
def addSuccess(self, test, details=None):
324
347
"""Tell result that test completed successfully.
326
349
Called from the TestCase run()
328
351
if self._bench_history is not None:
329
benchmark_time = self._extractBenchmarkTime(test)
352
benchmark_time = self._extractBenchmarkTime(test, details)
330
353
if benchmark_time is not None:
331
354
self._bench_history.write("%s %s\n" % (
332
355
self._formatTime(benchmark_time),
334
357
self.report_success(test)
335
self._cleanupLogFile(test)
336
unittest.TestResult.addSuccess(self, test)
358
super(ExtendedTestResult, self).addSuccess(test)
337
359
test._log_contents = ''
339
361
def addExpectedFailure(self, test, err):
362
384
self.not_applicable_count += 1
363
385
self.report_not_applicable(test, reason)
365
def printErrorList(self, flavour, errors):
366
for test, err in errors:
367
self.stream.writeln(self.separator1)
368
self.stream.write("%s: " % flavour)
369
self.stream.writeln(self.getDescription(test))
370
if getattr(test, '_get_log', None) is not None:
371
log_contents = test._get_log()
373
self.stream.write('\n')
375
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
376
self.stream.write('\n')
377
self.stream.write(log_contents)
378
self.stream.write('\n')
380
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
381
self.stream.write('\n')
382
self.stream.writeln(self.separator2)
383
self.stream.writeln("%s" % err)
385
387
def _post_mortem(self):
386
388
"""Start a PDB post mortem session."""
387
389
if os.environ.get('BZR_TEST_PDB', None):
397
399
raise errors.BzrError("Unknown whence %r" % whence)
399
def report_cleaning_up(self):
401
def report_tests_starting(self):
402
"""Display information before the test run begins"""
403
if getattr(sys, 'frozen', None) is None:
404
bzr_path = osutils.realpath(sys.argv[0])
406
bzr_path = sys.executable
408
'bzr selftest: %s\n' % (bzr_path,))
411
bzrlib.__path__[0],))
413
' bzr-%s python-%s %s\n' % (
414
bzrlib.version_string,
415
bzrlib._format_version_tuple(sys.version_info),
416
platform.platform(aliased=1),
418
self.stream.write('\n')
420
def report_test_start(self, test):
421
"""Display information on the test just about to be run"""
423
def _report_thread_leak(self, test, leaked_threads, active_threads):
424
"""Display information on a test that leaked one or more threads"""
425
# GZ 2010-09-09: A leak summary reported separately from the general
426
# thread debugging would be nice. Tests under subunit
427
# need something not using stream, perhaps adding a
428
# testtools details object would be fitting.
429
if 'threads' in selftest_debug_flags:
430
self.stream.write('%s is leaking, active is now %d\n' %
431
(test.id(), len(active_threads)))
402
433
def startTestRun(self):
403
434
self.startTime = time.time()
487
511
return self._shortened_test_description(test)
489
513
def report_error(self, test, err):
490
ui.ui_factory.note('ERROR: %s\n %s\n' % (
514
self.stream.write('ERROR: %s\n %s\n' % (
491
515
self._test_description(test),
495
519
def report_failure(self, test, err):
496
ui.ui_factory.note('FAIL: %s\n %s\n' % (
520
self.stream.write('FAIL: %s\n %s\n' % (
497
521
self._test_description(test),
501
525
def report_known_failure(self, test, err):
502
ui.ui_factory.note('XFAIL: %s\n%s\n' % (
503
self._test_description(test), err[1]))
505
528
def report_skip(self, test, reason):
549
568
return '%s%s' % (indent, err[1])
551
570
def report_error(self, test, err):
552
self.stream.writeln('ERROR %s\n%s'
571
self.stream.write('ERROR %s\n%s\n'
553
572
% (self._testTimeString(test),
554
573
self._error_summary(err)))
556
575
def report_failure(self, test, err):
557
self.stream.writeln(' FAIL %s\n%s'
576
self.stream.write(' FAIL %s\n%s\n'
558
577
% (self._testTimeString(test),
559
578
self._error_summary(err)))
561
580
def report_known_failure(self, test, err):
562
self.stream.writeln('XFAIL %s\n%s'
581
self.stream.write('XFAIL %s\n%s\n'
563
582
% (self._testTimeString(test),
564
583
self._error_summary(err)))
566
585
def report_success(self, test):
567
self.stream.writeln(' OK %s' % self._testTimeString(test))
586
self.stream.write(' OK %s\n' % self._testTimeString(test))
568
587
for bench_called, stats in getattr(test, '_benchcalls', []):
569
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
588
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
570
589
stats.pprint(file=self.stream)
571
590
# flush the stream so that we get smooth output. This verbose mode is
572
591
# used to show the output in PQM.
573
592
self.stream.flush()
575
594
def report_skip(self, test, reason):
576
self.stream.writeln(' SKIP %s\n%s'
595
self.stream.write(' SKIP %s\n%s\n'
577
596
% (self._testTimeString(test), reason))
579
598
def report_not_applicable(self, test, reason):
580
self.stream.writeln(' N/A %s\n %s'
599
self.stream.write(' N/A %s\n %s\n'
581
600
% (self._testTimeString(test), reason))
583
602
def report_unsupported(self, test, feature):
584
603
"""test cannot be run because feature is missing."""
585
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
604
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
586
605
%(self._testTimeString(test), feature))
676
class KnownFailure(AssertionError):
677
"""Indicates that a test failed in a precisely expected manner.
679
Such failures don't block the whole test suite from passing because they are
680
indicators of partially completed code or of future work. We have an
681
explicit error for them so that we can ensure that they are always visible:
682
KnownFailures are always shown in the output of bzr selftest.
699
# traceback._some_str fails to format exceptions that have the default
700
# __str__ which does an implicit ascii conversion. However, repr() on those
701
# objects works, for all that it's not quite what the doctor may have ordered.
702
def _clever_some_str(value):
707
return repr(value).replace('\\n', '\n')
709
return '<unprintable %s object>' % type(value).__name__
711
traceback._some_str = _clever_some_str
714
# deprecated - use self.knownFailure(), or self.expectFailure.
715
KnownFailure = testtools.testcase._ExpectedFailure
686
718
class UnavailableFeature(Exception):
778
806
routine, and to build and check bzr trees.
780
808
In addition to the usual method of overriding tearDown(), this class also
781
allows subclasses to register functions into the _cleanups list, which is
809
allows subclasses to register cleanup functions via addCleanup, which are
782
810
run in order as the object is torn down. It's less likely this will be
783
811
accidentally overlooked.
786
_active_threads = None
787
_leaking_threads_tests = 0
788
_first_thread_leaker_id = None
789
_log_file_name = None
791
_keep_log_file = False
792
815
# record lsprof data when performing benchmark calls.
793
816
_gather_lsprof_in_benchmarks = False
794
attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
795
'_log_contents', '_log_file_name', '_benchtime',
796
'_TestCase__testMethodName', '_TestCase__testMethodDoc',)
798
818
def __init__(self, methodName='testMethod'):
799
819
super(TestCase, self).__init__(methodName)
801
self._bzr_test_setUp_run = False
802
self._bzr_test_tearDown_run = False
803
820
self._directory_isolation = True
821
self.exception_handlers.insert(0,
822
(UnavailableFeature, self._do_unsupported_or_skip))
823
self.exception_handlers.insert(0,
824
(TestNotApplicable, self._do_not_applicable))
806
unittest.TestCase.setUp(self)
807
self._bzr_test_setUp_run = True
827
super(TestCase, self).setUp()
828
for feature in getattr(self, '_test_needs_features', []):
829
self.requireFeature(feature)
830
self._log_contents = None
831
self.addDetail("log", content.Content(content.ContentType("text",
832
"plain", {"charset": "utf8"}),
833
lambda:[self._get_log(keep_log_file=True)]))
808
834
self._cleanEnvironment()
809
835
self._silenceUI()
810
836
self._startLogFile()
814
840
self._track_transports()
815
841
self._track_locks()
816
842
self._clear_debug_flags()
817
TestCase._active_threads = threading.activeCount()
818
self.addCleanup(self._check_leaked_threads)
821
845
# debug a frame up.
823
847
pdb.Pdb().set_trace(sys._getframe().f_back)
825
def _check_leaked_threads(self):
    """Note whether the live thread count grew since the last sample.

    Compares threading.activeCount() with the count stored in
    TestCase._active_threads, then stores the new count there.  When the
    count went up, TestCase._leaking_threads_tests is incremented and, if
    no earlier test has been recorded, this test's id is saved in
    TestCase._first_thread_leaker_id.
    """
    now_alive = threading.activeCount()
    growth = now_alive - TestCase._active_threads
    TestCase._active_threads = now_alive
    # A shrinking count is taken to mean the test merely observed older
    # threads dying, not that it aggressively killed random threads, so
    # such tests are not reported as leaking.  This can hide a real leak
    # (the test sees two threads go away but leaks one), but that seems
    # less likely than wrongly blaming a test that only saw threads going
    # away and did not leak.
    if growth <= 0:
        return
    TestCase._leaking_threads_tests += 1
    if TestCase._first_thread_leaker_id is None:
        TestCase._first_thread_leaker_id = self.id()
849
def discardDetail(self, name):
850
"""Extend the addDetail, getDetails api so we can remove a detail.
852
eg. bzr always adds the 'log' detail at startup, but we don't want to
853
include it for skipped, xfail, etc tests.
855
It is safe to call this for a detail that doesn't exist, in case this
856
gets called multiple times.
858
# We cheat. details is stored in __details which means we shouldn't
859
# touch it. but getDetails() returns the dict directly, so we can
861
details = self.getDetails()
840
865
def _clear_debug_flags(self):
841
866
"""Prevent externally set debug flags affecting tests.
1013
1034
server's urls to be used.
1015
1036
if backing_server is None:
1016
transport_server.setUp()
1037
transport_server.start_server()
1018
transport_server.setUp(backing_server)
1019
self.addCleanup(transport_server.tearDown)
1039
transport_server.start_server(backing_server)
1040
self.addCleanup(transport_server.stop_server)
1020
1041
# Obtain a real transport because if the server supplies a password, it
1021
1042
# will be hidden from the base on the client side.
1022
t = get_transport(transport_server.get_url())
1043
t = _mod_transport.get_transport(transport_server.get_url())
1023
1044
# Some transport servers effectively chroot the backing transport;
1024
1045
# others like SFTPServer don't - users of the transport can walk up the
1025
1046
# transport to read the entire backing transport. This wouldn't matter
1289
1315
m += ": " + msg
1292
def expectFailure(self, reason, assertion, *args, **kwargs):
1293
"""Invoke a test, expecting it to fail for the given reason.
1295
This is for assertions that ought to succeed, but currently fail.
1296
(The failure is *expected* but not *wanted*.) Please be very precise
1297
about the failure you're expecting. If a new bug is introduced,
1298
AssertionError should be raised, not KnownFailure.
1300
Frequently, expectFailure should be followed by an opposite assertion.
1303
Intended to be used with a callable that raises AssertionError as the
1304
'assertion' parameter. args and kwargs are passed to the 'assertion'.
1306
Raises KnownFailure if the test fails. Raises AssertionError if the
1311
self.expectFailure('Math is broken', self.assertNotEqual, 54,
1313
self.assertEqual(42, dynamic_val)
1315
This means that a dynamic_val of 54 will cause the test to raise
1316
a KnownFailure. Once math is fixed and the expectFailure is removed,
1317
only a dynamic_val of 42 will allow the test to pass. Anything other
1318
than 54 or 42 will cause an AssertionError.
1321
assertion(*args, **kwargs)
1322
except AssertionError:
1323
raise KnownFailure(reason)
1325
self.fail('Unexpected success. Should have failed: %s' % reason)
1327
1318
def assertFileEqual(self, content, path):
1328
1319
"""Fail if path does not contain 'content'."""
1329
1320
self.failUnlessExists(path)
1480
1477
Close the file and delete it, unless setKeepLogfile was called.
1482
if self._log_file is None:
1479
if bzrlib.trace._trace_file:
1480
# flush the log file, to get all content
1481
bzrlib.trace._trace_file.flush()
1484
1482
bzrlib.trace.pop_log_file(self._log_memento)
1485
self._log_file.close()
1486
self._log_file = None
1487
if not self._keep_log_file:
1488
os.remove(self._log_file_name)
1489
self._log_file_name = None
1491
def setKeepLogfile(self):
    """Ask for the log file to be kept when _finishLogFile runs.

    Only sets the _keep_log_file flag on this test to True.
    """
    setattr(self, '_keep_log_file', True)
1483
# Cache the log result and delete the file on disk
1484
self._get_log(False)
1495
1486
def thisFailsStrictLockCheck(self):
1496
1487
"""It is known that this test would fail with -Dstrict_locks.
1506
1497
debug.debug_flags.discard('strict_locks')
1508
def addCleanup(self, callable, *args, **kwargs):
1509
"""Arrange to run a callable when this case is torn down.
1511
Callables are run in the reverse of the order they are registered,
1512
ie last-in first-out.
1499
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1500
"""Overrides an object attribute restoring it after the test.
1502
:param obj: The object that will be mutated.
1504
:param attr_name: The attribute name we want to preserve/override in
1507
:param new: The optional value we want to set the attribute to.
1509
:returns: The actual attr value.
1514
self._cleanups.append((callable, args, kwargs))
1511
value = getattr(obj, attr_name)
1512
# The actual value is captured by the call below
1513
self.addCleanup(setattr, obj, attr_name, value)
1514
if new is not _unitialized_attr:
1515
setattr(obj, attr_name, new)
1516
1518
def _cleanEnvironment(self):
1554
1558
'ftp_proxy': None,
1555
1559
'FTP_PROXY': None,
1556
1560
'BZR_REMOTE_PATH': None,
1561
# Generally speaking, we don't want apport reporting on crashes in
1562
# the test environment unless we're specifically testing apport,
1563
# so that it doesn't leak into the real system environment. We
1564
# use an env var so it propagates to subprocesses.
1565
'APPORT_DISABLE': '1',
1559
1568
self.addCleanup(self._restoreEnvironment)
1560
1569
for name, value in new_env.iteritems():
1561
1570
self._captureVar(name, value)
1563
1572
def _captureVar(self, name, newvalue):
1564
1573
"""Set an environment variable, and reset it when finished."""
1565
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1567
def _restore_debug_flags(self):
    """Reset debug.debug_flags to the saved _preserved_debug_flags."""
    active_flags = debug.debug_flags
    # Empty the live set first so that only the preserved flags remain.
    active_flags.clear()
    active_flags.update(self._preserved_debug_flags)
1574
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1571
1576
def _restoreEnvironment(self):
1572
for name, value in self.__old_env.iteritems():
1577
for name, value in self._old_env.iteritems():
1573
1578
osutils.set_or_unset_env(name, value)
1575
1580
def _restoreHooks(self):
1580
1585
"""This test has failed for some known reason."""
1581
1586
raise KnownFailure(reason)
1588
def _suppress_log(self):
    """Discard the 'log' entry from this test's details."""
    detail_name = 'log'
    self.discardDetail(detail_name)
1583
1592
def _do_skip(self, result, reason):
    """Report this test to result as skipped.

    The 'log' detail is dropped first.  If result provides a callable
    addSkip, it is called with this test and reason; otherwise the test
    is reported through result.addSuccess.

    :param result: The test result object to notify.
    :param reason: Why the test was skipped.
    """
    self._suppress_log()
    report_skip = getattr(result, 'addSkip', None)
    if callable(report_skip):
        report_skip(self, reason)
    else:
        # addSuccess takes the test that succeeded, so pass this test
        # rather than the result object itself.
        result.addSuccess(self)
1590
def _do_known_failure(self, result):
1601
def _do_known_failure(self, result, e):
1602
self._suppress_log()
1591
1603
err = sys.exc_info()
1592
1604
addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1593
1605
if addExpectedFailure is not None:
1596
1608
result.addSuccess(self)
1598
1611
def _do_not_applicable(self, result, e):
1600
1613
reason = 'No reason given'
1602
1615
reason = e.args[0]
1616
self._suppress_log ()
1603
1617
addNotApplicable = getattr(result, 'addNotApplicable', None)
1604
1618
if addNotApplicable is not None:
1605
1619
result.addNotApplicable(self, reason)
1607
1621
self._do_skip(result, reason)
1609
def _do_unsupported_or_skip(self, result, reason):
1624
def _report_skip(self, result, err):
1625
"""Override the default _report_skip.
1627
We want to strip the 'log' detail. If we wait until _do_skip, it has
1628
already been formatted into the 'reason' string, and we can't pull it
1631
self._suppress_log()
1632
super(TestCase, self)._report_skip(self, result, err)
1635
def _report_expected_failure(self, result, err):
1638
See _report_skip for motivation.
1640
self._suppress_log()
1641
super(TestCase, self)._report_expected_failure(self, result, err)
1644
def _do_unsupported_or_skip(self, result, e):
1646
self._suppress_log()
1610
1647
addNotSupported = getattr(result, 'addNotSupported', None)
1611
1648
if addNotSupported is not None:
1612
1649
result.addNotSupported(self, reason)
1614
1651
self._do_skip(result, reason)
1616
def run(self, result=None):
1617
if result is None: result = self.defaultTestResult()
1618
result.startTest(self)
1623
result.stopTest(self)
1625
def _run(self, result):
1626
for feature in getattr(self, '_test_needs_features', []):
1627
if not feature.available():
1628
return self._do_unsupported_or_skip(result, feature)
1630
absent_attr = object()
1632
method_name = getattr(self, '_testMethodName', absent_attr)
1633
if method_name is absent_attr:
1635
method_name = getattr(self, '_TestCase__testMethodName')
1636
testMethod = getattr(self, method_name)
1640
if not self._bzr_test_setUp_run:
1642
"test setUp did not invoke "
1643
"bzrlib.tests.TestCase's setUp")
1644
except KeyboardInterrupt:
1647
except KnownFailure:
1648
self._do_known_failure(result)
1651
except TestNotApplicable, e:
1652
self._do_not_applicable(result, e)
1655
except TestSkipped, e:
1656
self._do_skip(result, e.args[0])
1659
except UnavailableFeature, e:
1660
self._do_unsupported_or_skip(result, e.args[0])
1664
result.addError(self, sys.exc_info())
1672
except KnownFailure:
1673
self._do_known_failure(result)
1674
except self.failureException:
1675
result.addFailure(self, sys.exc_info())
1676
except TestNotApplicable, e:
1677
self._do_not_applicable(result, e)
1678
except TestSkipped, e:
1680
reason = "No reason given."
1683
self._do_skip(result, reason)
1684
except UnavailableFeature, e:
1685
self._do_unsupported_or_skip(result, e.args[0])
1686
except KeyboardInterrupt:
1690
result.addError(self, sys.exc_info())
1694
if not self._bzr_test_tearDown_run:
1696
"test tearDown did not invoke "
1697
"bzrlib.tests.TestCase's tearDown")
1698
except KeyboardInterrupt:
1702
result.addError(self, sys.exc_info())
1705
if ok: result.addSuccess(self)
1707
except KeyboardInterrupt:
1712
for attr_name in self.attrs_to_keep:
1713
if attr_name in self.__dict__:
1714
saved_attrs[attr_name] = self.__dict__[attr_name]
1715
self.__dict__ = saved_attrs
1719
self._log_contents = ''
1720
self._bzr_test_tearDown_run = True
1721
unittest.TestCase.tearDown(self)
1723
1653
def time(self, callable, *args, **kwargs):
1724
1654
"""Run callable and accrue the time it takes to the benchmark time.
1743
1675
self._benchtime += time.time() - start
1745
def _runCleanups(self):
1746
"""Run registered cleanup functions.
1748
This should only be called from TestCase.tearDown.
1750
# TODO: Perhaps this should keep running cleanups even if
1751
# one of them fails?
1753
# Actually pop the cleanups from the list so tearDown running
1754
# twice is safe (this happens for skipped tests).
1755
while self._cleanups:
1756
cleanup, args, kwargs = self._cleanups.pop()
1757
cleanup(*args, **kwargs)
1759
1677
def log(self, *args):
1762
1680
def _get_log(self, keep_log_file=False):
1763
"""Get the log from bzrlib.trace calls from this test.
1681
"""Internal helper to get the log from bzrlib.trace for this test.
1683
Please use self.getDetails, or self.get_log to access this in test case
1765
1686
:param keep_log_file: When True, if the log is still a file on disk
1766
1687
leave it as a file on disk. When False, if the log is still a file
1768
1689
self._log_contents.
1769
1690
:return: A string containing the log.
1771
# flush the log file, to get all content
1773
if bzrlib.trace._trace_file:
1774
bzrlib.trace._trace_file.flush()
1775
if self._log_contents:
1776
# XXX: this can hardly contain the content flushed above --vila
1692
if self._log_contents is not None:
1694
self._log_contents.decode('utf8')
1695
except UnicodeDecodeError:
1696
unicodestr = self._log_contents.decode('utf8', 'replace')
1697
self._log_contents = unicodestr.encode('utf8')
1778
1698
return self._log_contents
1779
if self._log_file_name is not None:
1780
logfile = open(self._log_file_name)
1699
if self._log_file is not None:
1700
log_contents = self._log_file.getvalue()
1782
log_contents = logfile.read()
1702
log_contents.decode('utf8')
1703
except UnicodeDecodeError:
1704
unicodestr = log_contents.decode('utf8', 'replace')
1705
log_contents = unicodestr.encode('utf8')
1785
1706
if not keep_log_file:
1707
self._log_file = None
1708
# Permit multiple calls to get_log until we clean it up in
1786
1710
self._log_contents = log_contents
1788
os.remove(self._log_file_name)
1790
if sys.platform == 'win32' and e.errno == errno.EACCES:
1791
sys.stderr.write(('Unable to delete log file '
1792
' %r\n' % self._log_file_name))
1795
1711
return log_contents
1797
return "DELETED log file to reduce memory footprint"
1713
return "No log file content."
1716
"""Get a unicode string containing the log from bzrlib.trace.
1718
Undecodable characters are replaced.
1720
return u"".join(self.getDetails()['log'].iter_text())
1799
1722
def requireFeature(self, feature):
1800
1723
"""This test requires a specific feature is available.
2477
2396
made_control = self.make_bzrdir(relpath, format=format)
2478
2397
return made_control.create_repository(shared=shared)
2480
def make_smart_server(self, path):
2481
smart_server = server.SmartTCPServer_for_testing()
2482
self.start_server(smart_server, self.get_server())
2483
remote_transport = get_transport(smart_server.get_url()).clone(path)
2399
def make_smart_server(self, path, backing_server=None):
2400
if backing_server is None:
2401
backing_server = self.get_server()
2402
smart_server = test_server.SmartTCPServer_for_testing()
2403
self.start_server(smart_server, backing_server)
2404
remote_transport = _mod_transport.get_transport(smart_server.get_url()
2484
2406
return remote_transport
2486
2408
def make_branch_and_memory_tree(self, relpath, format=None):
2502
2424
def setUp(self):
2503
2425
super(TestCaseWithMemoryTransport, self).setUp()
2426
# Ensure that ConnectedTransport doesn't leak sockets
2427
def get_transport_with_cleanup(*args, **kwargs):
2428
t = orig_get_transport(*args, **kwargs)
2429
if isinstance(t, _mod_transport.ConnectedTransport):
2430
self.addCleanup(t.disconnect)
2433
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2434
get_transport_with_cleanup)
2504
2435
self._make_test_root()
2505
_currentdir = os.getcwdu()
2506
def _leaveDirectory():
2507
os.chdir(_currentdir)
2508
self.addCleanup(_leaveDirectory)
2436
self.addCleanup(os.chdir, os.getcwdu())
2509
2437
self.makeAndChdirToTestDir()
2510
2438
self.overrideEnvironmentForTesting()
2511
2439
self.__readonly_server = None
3263
3197
def partition_tests(suite, count):
3264
3198
"""Partition suite into count lists of tests."""
3266
tests = list(iter_suite_tests(suite))
3267
tests_per_process = int(math.ceil(float(len(tests)) / count))
3268
for block in range(count):
3269
low_test = block * tests_per_process
3270
high_test = low_test + tests_per_process
3271
process_tests = tests[low_test:high_test]
3272
result.append(process_tests)
3199
# This just assigns tests in a round-robin fashion. On one hand this
3200
# splits up blocks of related tests that might run faster if they shared
3201
# resources, but on the other it avoids assigning blocks of slow tests to
3202
# just one partition. So the slowest partition shouldn't be much slower
3204
partitions = [list() for i in range(count)]
3205
tests = iter_suite_tests(suite)
3206
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3207
partition.append(test)
3211
def workaround_zealous_crypto_random():
3212
"""Crypto.Random wants to help us be secure, but we don't care here.
3214
This works around some test failures related to the sftp server. Once paramiko
3215
stops using the controversial API in Crypto.Random, we may get rid of it.
3218
from Crypto.Random import atfork
3276
3224
def fork_for_tests(suite):
3409
3361
def addFailure(self, test, err):
    """Forward a test failure to the wrapped result object."""
    forward = self.result.addFailure
    forward(test, err)
3413
class BZRTransformingResult(ForwardingResult):
3415
def addError(self, test, err):
3416
feature = self._error_looks_like('UnavailableFeature: ', err)
3417
if feature is not None:
3418
self.result.addNotSupported(test, feature)
3420
self.result.addError(test, err)
3422
def addFailure(self, test, err):
3423
known = self._error_looks_like('KnownFailure: ', err)
3424
if known is not None:
3425
self.result.addExpectedFailure(test,
3426
[KnownFailure, KnownFailure(known), None])
3428
self.result.addFailure(test, err)
3430
def _error_looks_like(self, prefix, err):
3431
"""Deserialize exception and return the stringified value."""
3435
if isinstance(exc, subunit.RemoteException):
3436
# stringifying the exception gives access to the remote traceback
3437
# We search the last line for 'prefix'
3438
lines = str(exc).split('\n')
3439
while lines and not lines[-1]:
3442
if lines[-1].startswith(prefix):
3443
value = lines[-1][len(prefix):]
3448
from subunit.test_results import AutoTimingTestResultDecorator
3449
# Expected failure should be seen as a success, not a failure. Once subunit
3450
# provides native support for that, BZRTransformingResult and this class
3451
# will become useless.
3452
class BzrAutoTimingTestResultDecorator(AutoTimingTestResultDecorator):
3454
def addExpectedFailure(self, test, err):
3455
self._before_event()
3456
return self._call_maybe("addExpectedFailure", self._degrade_skip,
3459
# Let's just define a no-op decorator
3460
BzrAutoTimingTestResultDecorator = lambda x:x
3363
ForwardingResult = testtools.ExtendedToOriginalDecorator
3463
3366
class ProfileResult(ForwardingResult):
3824
3738
'bzrlib.tests.test_identitymap',
3825
3739
'bzrlib.tests.test_ignores',
3826
3740
'bzrlib.tests.test_index',
3741
'bzrlib.tests.test_import_tariff',
3827
3742
'bzrlib.tests.test_info',
3828
3743
'bzrlib.tests.test_inv',
3829
3744
'bzrlib.tests.test_inventory_delta',
3830
3745
'bzrlib.tests.test_knit',
3831
3746
'bzrlib.tests.test_lazy_import',
3832
3747
'bzrlib.tests.test_lazy_regex',
3748
'bzrlib.tests.test_library_state',
3833
3749
'bzrlib.tests.test_lock',
3834
3750
'bzrlib.tests.test_lockable_files',
3835
3751
'bzrlib.tests.test_lockdir',
4134
4059
:param new_id: The id to assign to it.
4135
4060
:return: The new test.
4137
new_test = copy(test)
4062
new_test = copy.copy(test)
4138
4063
new_test.id = lambda: new_id
4064
# XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
4065
# causes cloned tests to share the 'details' dict. This makes it hard to
4066
# read the test output for parameterized tests, because tracebacks will be
4067
# associated with irrelevant tests.
4069
details = new_test._TestCase__details
4070
except AttributeError:
4071
# must be a different version of testtools than expected. Do nothing.
4074
# Reset the '__details' dict.
4075
new_test._TestCase__details = {}
4139
4076
return new_test
4079
def permute_tests_for_extension(standard_tests, loader, py_module_name,
4081
"""Helper for permutating tests against an extension module.
4083
This is meant to be used inside a module's 'load_tests()' function. It will
4084
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
4085
against both implementations. Setting 'test.module' to the appropriate
4086
module. See bzrlib.tests.test__chk_map.load_tests as an example.
4088
:param standard_tests: A test suite to permute
4089
:param loader: A TestLoader
4090
:param py_module_name: The python path to a python module that can always
4091
be loaded, and will be considered the 'python' implementation. (eg
4092
'bzrlib._chk_map_py')
4093
:param ext_module_name: The python path to an extension module. If the
4094
module cannot be loaded, a single test will be added, which notes that
4095
the module is not available. If it can be loaded, all standard_tests
4096
will be run against that module.
4097
:return: (suite, feature) suite is a test-suite that has all the permuted
4098
tests. feature is the Feature object that can be used to determine if
4099
the module is available.
4102
py_module = __import__(py_module_name, {}, {}, ['NO_SUCH_ATTRIB'])
4104
('python', {'module': py_module}),
4106
suite = loader.suiteClass()
4107
feature = ModuleAvailableFeature(ext_module_name)
4108
if feature.available():
4109
scenarios.append(('C', {'module': feature.module}))
4111
# the compiled module isn't available, so we add a failing test
4112
class FailWithoutFeature(TestCase):
4113
def test_fail(self):
4114
self.requireFeature(feature)
4115
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4116
result = multiply_tests(standard_tests, scenarios, suite)
4117
return result, feature
4142
4120
def _rmtree_temp_dir(dirname, test_id=None):
4143
4121
# If LANG=C we probably have created some bogus paths
4144
4122
# which rmtree(unicode) will fail to delete
4249
4230
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4233
class _CompatabilityThunkFeature(Feature):
4234
"""This feature is just a thunk to another feature.
4236
It issues a deprecation warning if it is accessed, to let you know that you
4237
should really use a different feature.
4240
def __init__(self, dep_version, module, name,
4241
replacement_name, replacement_module=None):
4242
super(_CompatabilityThunkFeature, self).__init__()
4243
self._module = module
4244
if replacement_module is None:
4245
replacement_module = module
4246
self._replacement_module = replacement_module
4248
self._replacement_name = replacement_name
4249
self._dep_version = dep_version
4250
self._feature = None
4253
if self._feature is None:
4254
depr_msg = self._dep_version % ('%s.%s'
4255
% (self._module, self._name))
4256
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4257
self._replacement_name)
4258
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4259
# Import the new feature and use it as a replacement for the
4261
mod = __import__(self._replacement_module, {}, {},
4262
[self._replacement_name])
4263
self._feature = getattr(mod, self._replacement_name)
4267
return self._feature._probe()
4252
4270
class ModuleAvailableFeature(Feature):
4253
4271
"""This is a feature that describes a module we want to be available.
4461
4479
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4464
class _SubUnitFeature(Feature):
4465
"""Check if subunit is available."""
4482
class _CaseSensitiveFilesystemFeature(Feature):
4467
4484
def _probe(self):
4485
if CaseInsCasePresFilenameFeature.available():
4487
elif CaseInsensitiveFilesystemFeature.available():
4474
4492
def feature_name(self):
4477
SubUnitFeature = _SubUnitFeature()
4493
return 'case-sensitive filesystem'
4495
# new coding style is for feature instances to be lowercase
4496
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4499
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4500
SubUnitFeature = _CompatabilityThunkFeature(
4501
deprecated_in((2,1,0)),
4502
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4478
4503
# Only define SubUnitBzrRunner if subunit is available.
4480
4505
from subunit import TestProtocolClient
4506
from subunit.test_results import AutoTimingTestResultDecorator
4507
class SubUnitBzrProtocolClient(TestProtocolClient):
4509
def addSuccess(self, test, details=None):
4510
# The subunit client always includes the details in the subunit
4511
# stream, but we don't want to include it in ours.
4512
if details is not None and 'log' in details:
4514
return super(SubUnitBzrProtocolClient, self).addSuccess(
4481
4517
class SubUnitBzrRunner(TextTestRunner):
4482
4518
def run(self, test):
4483
result = BzrAutoTimingTestResultDecorator(
4484
TestProtocolClient(self.stream))
4519
result = AutoTimingTestResultDecorator(
4520
SubUnitBzrProtocolClient(self.stream))
4485
4521
test.run(result)
4487
4523
except ImportError:
4526
class _PosixPermissionsFeature(Feature):
4530
# create temporary file and check if specified perms are maintained.
4533
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4534
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4537
os.chmod(name, write_perms)
4539
read_perms = os.stat(name).st_mode & 0777
4541
return (write_perms == read_perms)
4543
return (os.name == 'posix') and has_perms()
4545
def feature_name(self):
4546
return 'POSIX permissions support'
4548
posix_permissions_feature = _PosixPermissionsFeature()