26
27
# general style of bzrlib. Please continue that consistency when adding e.g.
27
28
# new assertFoo() methods.
30
33
from cStringIO import StringIO
40
from pprint import pformat
45
from subprocess import Popen, PIPE, STDOUT
55
# nb: check this before importing anything else from within it
56
_testtools_version = getattr(testtools, '__version__', ())
57
if _testtools_version < (0, 9, 2):
58
raise ImportError("need at least testtools 0.9.2: %s is %r"
59
% (testtools.__file__, _testtools_version))
60
from testtools import content
44
79
import bzrlib.branch
45
import bzrlib.bzrdir as bzrdir
46
80
import bzrlib.commands
47
import bzrlib.errors as errors
81
import bzrlib.timestamp
48
83
import bzrlib.inventory
49
84
import bzrlib.iterablefile
50
85
import bzrlib.lockdir
89
# lsprof not available
51
91
from bzrlib.merge import merge_inner
52
92
import bzrlib.merge3
54
import bzrlib.osutils as osutils
55
93
import bzrlib.plugin
56
from bzrlib.revision import common_ancestor
94
from bzrlib.smart import client, request, server
57
95
import bzrlib.store
96
from bzrlib import symbol_versioning
97
from bzrlib.symbol_versioning import (
58
104
import bzrlib.trace
59
from bzrlib.transport import urlescape, get_transport
105
from bzrlib.transport import (
60
110
import bzrlib.transport
61
from bzrlib.transport.local import LocalRelpathServer
62
from bzrlib.transport.readonly import ReadonlyServer
63
from bzrlib.trace import mutter
64
from bzrlib.tests.TestUtil import TestLoader, TestSuite
65
from bzrlib.tests.treeshape import build_tree_contents
111
from bzrlib.trace import mutter, note
112
from bzrlib.tests import (
117
from bzrlib.tests.http_server import HttpServer
118
from bzrlib.tests.TestUtil import (
122
from bzrlib.ui import NullProgressView
123
from bzrlib.ui.text import TextUIFactory
124
import bzrlib.version_info_formats.format_custom
66
125
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
68
default_transport = LocalRelpathServer
71
MODULES_TO_DOCTEST = [
83
def packages_to_test():
84
"""Return a list of packages to test.
86
The packages are not globally imported so that import failures are
87
triggered when running selftest, not when importing the command.
90
import bzrlib.tests.blackbox
91
import bzrlib.tests.branch_implementations
92
import bzrlib.tests.bzrdir_implementations
93
import bzrlib.tests.interrepository_implementations
94
import bzrlib.tests.interversionedfile_implementations
95
import bzrlib.tests.repository_implementations
96
import bzrlib.tests.revisionstore_implementations
97
import bzrlib.tests.workingtree_implementations
100
bzrlib.tests.blackbox,
101
bzrlib.tests.branch_implementations,
102
bzrlib.tests.bzrdir_implementations,
103
bzrlib.tests.interrepository_implementations,
104
bzrlib.tests.interversionedfile_implementations,
105
bzrlib.tests.repository_implementations,
106
bzrlib.tests.revisionstore_implementations,
107
bzrlib.tests.workingtree_implementations,
111
class _MyResult(unittest._TextTestResult):
112
"""Custom TestResult.
114
Shows output in a different format, including displaying runtime for tests.
127
# Mark this python module as being part of the implementation
128
# of unittest: this gives us better tracebacks where the last
129
# shown frame is the test code, not our assertXYZ.
132
default_transport = test_server.LocalURLServer
135
_unitialized_attr = object()
136
"""A sentinel needed to act as a default value in a method signature."""
139
# Subunit result codes, defined here to prevent a hard dependency on subunit.
144
class ExtendedTestResult(unittest._TextTestResult):
145
"""Accepts, reports and accumulates the results of running tests.
147
Compared to the unittest version this class adds support for
148
profiling, benchmarking, stopping as soon as a test fails, and
149
skipping tests. There are further-specialized subclasses for
150
different types of display.
152
When a test finishes, in whatever way, it calls one of the addSuccess,
153
addFailure or addError classes. These in turn may redirect to a more
154
specific case for the special test results supported by our extended
157
Note that just one of these objects is fed the results from many tests.
116
160
stop_early = False
118
def _elapsedTime(self):
119
return "%5dms" % (1000 * (time.time() - self._start_time))
162
def __init__(self, stream, descriptions, verbosity,
166
"""Construct new TestResult.
168
:param bench_history: Optionally, a writable file object to accumulate
171
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
172
if bench_history is not None:
173
from bzrlib.version import _get_bzr_source_tree
174
src_tree = _get_bzr_source_tree()
177
revision_id = src_tree.get_parent_ids()[0]
179
# XXX: if this is a brand new tree, do the same as if there
183
# XXX: If there's no branch, what should we do?
185
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
186
self._bench_history = bench_history
187
self.ui = ui.ui_factory
190
self.failure_count = 0
191
self.known_failure_count = 0
193
self.not_applicable_count = 0
194
self.unsupported = {}
196
self._overall_start_time = time.time()
197
self._strict = strict
199
def stopTestRun(self):
202
stopTime = time.time()
203
timeTaken = stopTime - self.startTime
205
self.stream.writeln(self.separator2)
206
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
207
run, run != 1 and "s" or "", timeTaken))
208
self.stream.writeln()
209
if not self.wasSuccessful():
210
self.stream.write("FAILED (")
211
failed, errored = map(len, (self.failures, self.errors))
213
self.stream.write("failures=%d" % failed)
215
if failed: self.stream.write(", ")
216
self.stream.write("errors=%d" % errored)
217
if self.known_failure_count:
218
if failed or errored: self.stream.write(", ")
219
self.stream.write("known_failure_count=%d" %
220
self.known_failure_count)
221
self.stream.writeln(")")
223
if self.known_failure_count:
224
self.stream.writeln("OK (known_failures=%d)" %
225
self.known_failure_count)
227
self.stream.writeln("OK")
228
if self.skip_count > 0:
229
skipped = self.skip_count
230
self.stream.writeln('%d test%s skipped' %
231
(skipped, skipped != 1 and "s" or ""))
233
for feature, count in sorted(self.unsupported.items()):
234
self.stream.writeln("Missing feature '%s' skipped %d tests." %
237
ok = self.wasStrictlySuccessful()
239
ok = self.wasSuccessful()
240
if TestCase._first_thread_leaker_id:
242
'%s is leaking threads among %d leaking tests.\n' % (
243
TestCase._first_thread_leaker_id,
244
TestCase._leaking_threads_tests))
245
# We don't report the main thread as an active one.
247
'%d non-main threads were left active in the end.\n'
248
% (TestCase._active_threads - 1))
250
def getDescription(self, test):
253
def _extractBenchmarkTime(self, testCase, details=None):
254
"""Add a benchmark time for the current test case."""
255
if details and 'benchtime' in details:
256
return float(''.join(details['benchtime'].iter_bytes()))
257
return getattr(testCase, "_benchtime", None)
259
def _elapsedTestTimeString(self):
260
"""Return a time string for the overall time the current test has taken."""
261
return self._formatTime(time.time() - self._start_time)
263
def _testTimeString(self, testCase):
264
benchmark_time = self._extractBenchmarkTime(testCase)
265
if benchmark_time is not None:
266
return self._formatTime(benchmark_time) + "*"
268
return self._elapsedTestTimeString()
270
def _formatTime(self, seconds):
271
"""Format seconds as milliseconds with leading spaces."""
272
# some benchmarks can take thousands of seconds to run, so we need 8
274
return "%8dms" % (1000 * seconds)
276
def _shortened_test_description(self, test):
278
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
121
281
def startTest(self, test):
122
282
unittest.TestResult.startTest(self, test)
123
# In a short description, the important words are in
124
# the beginning, but in an id, the important words are
126
SHOW_DESCRIPTIONS = False
128
width = osutils.terminal_width()
129
name_width = width - 15
131
if SHOW_DESCRIPTIONS:
132
what = test.shortDescription()
134
if len(what) > name_width:
135
what = what[:name_width-3] + '...'
138
if what.startswith('bzrlib.tests.'):
140
if len(what) > name_width:
141
what = '...' + what[3-name_width:]
142
what = what.ljust(name_width)
143
self.stream.write(what)
285
self.report_test_start(test)
286
test.number = self.count
287
self._recordTestStartTime()
289
def startTests(self):
291
if getattr(sys, 'frozen', None) is None:
292
bzr_path = osutils.realpath(sys.argv[0])
294
bzr_path = sys.executable
296
'bzr selftest: %s\n' % (bzr_path,))
299
bzrlib.__path__[0],))
301
' bzr-%s python-%s %s\n' % (
302
bzrlib.version_string,
303
bzrlib._format_version_tuple(sys.version_info),
304
platform.platform(aliased=1),
306
self.stream.write('\n')
308
def _recordTestStartTime(self):
309
"""Record that a test has started."""
145
310
self._start_time = time.time()
312
def _cleanupLogFile(self, test):
313
# We can only do this if we have one of our TestCases, not if
315
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
316
if setKeepLogfile is not None:
147
319
def addError(self, test, err):
148
if isinstance(err[1], TestSkipped):
149
return self.addSkipped(test, err)
320
"""Tell result that test finished with an error.
322
Called from the TestCase run() method when the test
323
fails with an unexpected error.
150
326
unittest.TestResult.addError(self, test, err)
152
self.stream.writeln("ERROR %s" % self._elapsedTime())
154
self.stream.write('E')
327
self.error_count += 1
328
self.report_error(test, err)
156
329
if self.stop_early:
331
self._cleanupLogFile(test)
159
333
def addFailure(self, test, err):
334
"""Tell result that test failed.
336
Called from the TestCase run() method when the test
337
fails because e.g. an assert() method failed.
160
340
unittest.TestResult.addFailure(self, test, err)
162
self.stream.writeln(" FAIL %s" % self._elapsedTime())
164
self.stream.write('F')
341
self.failure_count += 1
342
self.report_failure(test, err)
166
343
if self.stop_early:
169
def addSuccess(self, test):
171
self.stream.writeln(' OK %s' % self._elapsedTime())
173
self.stream.write('~')
175
unittest.TestResult.addSuccess(self, test)
177
def addSkipped(self, test, skip_excinfo):
179
print >>self.stream, ' SKIP %s' % self._elapsedTime()
180
print >>self.stream, ' %s' % skip_excinfo[1]
182
self.stream.write('S')
184
# seems best to treat this as success from point-of-view of unittest
185
# -- it actually does nothing so it barely matters :)
186
unittest.TestResult.addSuccess(self, test)
188
def printErrorList(self, flavour, errors):
189
for test, err in errors:
190
self.stream.writeln(self.separator1)
191
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
192
if getattr(test, '_get_log', None) is not None:
194
print >>self.stream, \
195
('vvvv[log from %s]' % test.id()).ljust(78,'-')
196
print >>self.stream, test._get_log()
197
print >>self.stream, \
198
('^^^^[log from %s]' % test.id()).ljust(78,'-')
199
self.stream.writeln(self.separator2)
200
self.stream.writeln("%s" % err)
203
class TextTestRunner(unittest.TextTestRunner):
345
self._cleanupLogFile(test)
347
def addSuccess(self, test, details=None):
348
"""Tell result that test completed successfully.
350
Called from the TestCase run()
352
if self._bench_history is not None:
353
benchmark_time = self._extractBenchmarkTime(test, details)
354
if benchmark_time is not None:
355
self._bench_history.write("%s %s\n" % (
356
self._formatTime(benchmark_time),
358
self.report_success(test)
359
self._cleanupLogFile(test)
360
unittest.TestResult.addSuccess(self, test)
361
test._log_contents = ''
363
def addExpectedFailure(self, test, err):
    """Record a known (expected) failure and report it.

    :param test: The test that failed in the expected way.
    :param err: The error information passed through to
        ``report_known_failure``.
    """
    self.known_failure_count = self.known_failure_count + 1
    self.report_known_failure(test, err)
367
def addNotSupported(self, test, feature):
    """Count and report a test that cannot run for lack of a feature.

    :param test: The test that will not be run.
    :param feature: The missing feature; its ``str()`` form is the key
        used in ``self.unsupported``.
    """
    # Two call paths lead here: either the test started and raised
    # UnavailableFeature (through requireFeature), or features were probed
    # before the test code proper ran. In the latter case startTest and
    # stopTest are still seen, but the test never actually runs.
    feature_name = str(feature)
    self.unsupported[feature_name] = self.unsupported.get(feature_name, 0) + 1
    self.report_unsupported(test, feature)
380
def addSkip(self, test, reason):
381
"""A test has not run for 'reason'."""
383
self.report_skip(test, reason)
385
def addNotApplicable(self, test, reason):
    """Record a test that does not apply to this situation and report it.

    :param test: The test that was not applicable.
    :param reason: Explanation passed through to ``report_not_applicable``.
    """
    self.not_applicable_count = self.not_applicable_count + 1
    self.report_not_applicable(test, reason)
389
def _post_mortem(self):
390
"""Start a PDB post mortem session."""
391
if os.environ.get('BZR_TEST_PDB', None):
392
import pdb;pdb.post_mortem()
394
def progress(self, offset, whence):
    """Adjust the number of tests expected to run.

    :param offset: The new total (for SUBUNIT_SEEK_SET) or the amount to
        add to the current total (for SUBUNIT_SEEK_CUR).
    :param whence: One of SUBUNIT_SEEK_SET or SUBUNIT_SEEK_CUR.
    :raises errors.BzrError: If ``whence`` is neither of those codes.
    """
    if whence == SUBUNIT_SEEK_SET:
        self.num_tests = offset
        return
    if whence == SUBUNIT_SEEK_CUR:
        self.num_tests += offset
        return
    raise errors.BzrError("Unknown whence %r" % whence)
403
def report_cleaning_up(self):
406
def startTestRun(self):
407
self.startTime = time.time()
409
def report_success(self, test):
412
def wasStrictlySuccessful(self):
    """Tell whether the run succeeded under strict rules.

    Any unsupported feature or known failure counts as a strict failure;
    otherwise the answer is whatever ``wasSuccessful()`` reports.
    """
    strict_blockers = self.unsupported or self.known_failure_count
    if not strict_blockers:
        return self.wasSuccessful()
    return False
418
class TextTestResult(ExtendedTestResult):
419
"""Displays progress and results of tests in text form"""
421
def __init__(self, stream, descriptions, verbosity,
426
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
427
bench_history, strict)
428
# We no longer pass them around, but just rely on the UIFactory stack
431
warnings.warn("Passing pb to TextTestResult is deprecated")
432
self.pb = self.ui.nested_progress_bar()
433
self.pb.show_pct = False
434
self.pb.show_spinner = False
435
self.pb.show_eta = False,
436
self.pb.show_count = False
437
self.pb.show_bar = False
438
self.pb.update_latency = 0
439
self.pb.show_transport_activity = False
441
def stopTestRun(self):
442
# called when the tests that are going to run have run
445
super(TextTestResult, self).stopTestRun()
447
def startTestRun(self):
448
super(TextTestResult, self).startTestRun()
449
self.pb.update('[test 0/%d] Starting' % (self.num_tests))
451
def printErrors(self):
452
# clear the pb to make room for the error listing
454
super(TextTestResult, self).printErrors()
456
def _progress_prefix_text(self):
457
# the longer this text, the less space we have to show the test
459
a = '[%d' % self.count # total that have been run
460
# tests skipped as known not to be relevant are not important enough
462
## if self.skip_count:
463
## a += ', %d skip' % self.skip_count
464
## if self.known_failure_count:
465
## a += '+%dX' % self.known_failure_count
467
a +='/%d' % self.num_tests
469
runtime = time.time() - self._overall_start_time
471
a += '%dm%ds' % (runtime / 60, runtime % 60)
474
total_fail_count = self.error_count + self.failure_count
476
a += ', %d failed' % total_fail_count
477
# if self.unsupported:
478
# a += ', %d missing' % len(self.unsupported)
482
def report_test_start(self, test):
485
self._progress_prefix_text()
487
+ self._shortened_test_description(test))
489
def _test_description(self, test):
490
return self._shortened_test_description(test)
492
def report_error(self, test, err):
493
self.stream.write('ERROR: %s\n %s\n' % (
494
self._test_description(test),
498
def report_failure(self, test, err):
499
self.stream.write('FAIL: %s\n %s\n' % (
500
self._test_description(test),
504
def report_known_failure(self, test, err):
507
def report_skip(self, test, reason):
510
def report_not_applicable(self, test, reason):
513
def report_unsupported(self, test, feature):
514
"""test cannot be run because feature is missing."""
516
def report_cleaning_up(self):
517
self.pb.update('Cleaning up')
520
class VerboseTestResult(ExtendedTestResult):
521
"""Produce long output, with one line per test run plus times"""
523
def _ellipsize_to_right(self, a_string, final_width):
524
"""Truncate and pad a string, keeping the right hand side"""
525
if len(a_string) > final_width:
526
result = '...' + a_string[3-final_width:]
529
return result.ljust(final_width)
531
def startTestRun(self):
532
super(VerboseTestResult, self).startTestRun()
533
self.stream.write('running %d tests...\n' % self.num_tests)
535
def report_test_start(self, test):
537
name = self._shortened_test_description(test)
538
width = osutils.terminal_width()
539
if width is not None:
540
# width needs space for 6 char status, plus 1 for slash, plus an
541
# 11-char time string, plus a trailing blank
542
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
544
self.stream.write(self._ellipsize_to_right(name, width-18))
546
self.stream.write(name)
549
def _error_summary(self, err):
551
return '%s%s' % (indent, err[1])
553
def report_error(self, test, err):
    """Write the ERROR line for a test, followed by the error summary.

    :param test: The test that raised an unexpected error.
    :param err: The error information handed to ``_error_summary``.
    """
    timing = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln('ERROR %s\n%s' % (timing, summary))
558
def report_failure(self, test, err):
    """Write the FAIL line for a test, followed by the error summary.

    :param test: The test that failed.
    :param err: The error information handed to ``_error_summary``.
    """
    timing = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln(' FAIL %s\n%s' % (timing, summary))
563
def report_known_failure(self, test, err):
    """Write the XFAIL line for a test, followed by the error summary.

    :param test: The test that failed in an expected way.
    :param err: The error information handed to ``_error_summary``.
    """
    timing = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln('XFAIL %s\n%s' % (timing, summary))
568
def report_success(self, test):
569
self.stream.writeln(' OK %s' % self._testTimeString(test))
570
for bench_called, stats in getattr(test, '_benchcalls', []):
571
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
572
stats.pprint(file=self.stream)
573
# flush the stream so that we get smooth output. This verbose mode is
574
# used to show the output in PQM.
577
def report_skip(self, test, reason):
    """Write the SKIP line for a test, followed by the reason.

    :param test: The test that was skipped.
    :param reason: Why the test was skipped; written on the next line.
    """
    timing = self._testTimeString(test)
    self.stream.writeln(' SKIP %s\n%s' % (timing, reason))
581
def report_not_applicable(self, test, reason):
582
self.stream.writeln(' N/A %s\n %s'
583
% (self._testTimeString(test), reason))
585
def report_unsupported(self, test, feature):
586
"""test cannot be run because feature is missing."""
587
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
588
%(self._testTimeString(test), feature))
591
class TextTestRunner(object):
204
592
stop_on_failure = False
206
def _makeResult(self):
207
result = _MyResult(self.stream, self.descriptions, self.verbosity)
208
result.stop_early = self.stop_on_failure
600
result_decorators=None,
602
"""Create a TextTestRunner.
604
:param result_decorators: An optional list of decorators to apply
605
to the result object being used by the runner. Decorators are
606
applied left to right - the first element in the list is the
609
# stream may claim to know how to write unicode strings, but in older
610
# pythons this goes sufficiently wrong that it is a bad idea. (
611
# specifically a built in file with encoding 'UTF-8' will still try
612
# to encode using ascii.
613
new_encoding = osutils.get_terminal_encoding()
614
codec = codecs.lookup(new_encoding)
615
if type(codec) is tuple:
619
encode = codec.encode
620
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
621
stream.encoding = new_encoding
622
self.stream = unittest._WritelnDecorator(stream)
623
self.descriptions = descriptions
624
self.verbosity = verbosity
625
self._bench_history = bench_history
626
self._strict = strict
627
self._result_decorators = result_decorators or []
630
"Run the given test case or test suite."
631
if self.verbosity == 1:
632
result_class = TextTestResult
633
elif self.verbosity >= 2:
634
result_class = VerboseTestResult
635
original_result = result_class(self.stream,
638
bench_history=self._bench_history,
641
# Signal to result objects that look at stop early policy to stop,
642
original_result.stop_early = self.stop_on_failure
643
result = original_result
644
for decorator in self._result_decorators:
645
result = decorator(result)
646
result.stop_early = self.stop_on_failure
647
result.startTestRun()
652
# higher level code uses our extended protocol to determine
653
# what exit code to give.
654
return original_result
212
657
def iter_suite_tests(suite):
213
658
"""Return all tests in a suite, recursing through nested suites"""
214
for item in suite._tests:
215
if isinstance(item, unittest.TestCase):
217
elif isinstance(item, unittest.TestSuite):
659
if isinstance(suite, unittest.TestCase):
661
elif isinstance(suite, unittest.TestSuite):
218
663
for r in iter_suite_tests(item):
221
raise Exception('unknown object %r inside test suite %r'
225
class TestSkipped(Exception):
226
"""Indicates that a test was intentionally skipped, rather than failing."""
230
class CommandFailed(Exception):
233
class TestCase(unittest.TestCase):
666
raise Exception('unknown type %r for object %r'
667
% (type(suite), suite))
670
TestSkipped = testtools.testcase.TestSkipped
673
class TestNotApplicable(TestSkipped):
674
"""A test is not applicable to the situation where it was run.
676
This is only normally raised by parameterized tests, if they find that
677
the instance they're constructed upon does not support one aspect
682
# traceback._some_str fails to format exceptions that have the default
683
# __str__ which does an implicit ascii conversion. However, repr() on those
684
# objects works, for all that it's not quite what the doctor may have ordered.
685
def _clever_some_str(value):
690
return repr(value).replace('\\n', '\n')
692
return '<unprintable %s object>' % type(value).__name__
694
traceback._some_str = _clever_some_str
697
# deprecated - use self.knownFailure(), or self.expectFailure.
698
KnownFailure = testtools.testcase._ExpectedFailure
701
class UnavailableFeature(Exception):
702
"""A feature required for this test was not available.
704
This can be considered a specialised form of SkippedTest.
706
The feature should be used to construct the exception.
710
class StringIOWrapper(object):
711
"""A wrapper around cStringIO which just adds an encoding attribute.
713
Internally we can check sys.stdout to see what the output encoding
714
should be. However, cStringIO has no encoding attribute that we can
715
set. So we wrap it instead.
720
def __init__(self, s=None):
722
self.__dict__['_cstring'] = StringIO(s)
724
self.__dict__['_cstring'] = StringIO()
726
def __getattr__(self, name, getattr=getattr):
727
return getattr(self.__dict__['_cstring'], name)
729
def __setattr__(self, name, val):
730
if name == 'encoding':
731
self.__dict__['encoding'] = val
733
return setattr(self._cstring, name, val)
736
class TestUIFactory(TextUIFactory):
737
"""A UI Factory for testing.
739
Hide the progress bar but emit note()s.
741
Allows get_password to be tested without real tty attached.
743
See also CannedInputUIFactory which lets you provide programmatic input in
746
# TODO: Capture progress events at the model level and allow them to be
747
# observed by tests that care.
749
# XXX: Should probably unify more with CannedInputUIFactory or a
750
# particular configuration of TextUIFactory, or otherwise have a clearer
751
# idea of how they're supposed to be different.
752
# See https://bugs.launchpad.net/bzr/+bug/408213
754
def __init__(self, stdout=None, stderr=None, stdin=None):
755
if stdin is not None:
756
# We use a StringIOWrapper to be able to test various
757
# encodings, but the user is still responsible to
758
# encode the string and to set the encoding attribute
759
# of StringIOWrapper.
760
stdin = StringIOWrapper(stdin)
761
super(TestUIFactory, self).__init__(stdin, stdout, stderr)
763
def get_non_echoed_password(self):
764
"""Get password from stdin without trying to handle the echo mode"""
765
password = self.stdin.readline()
768
if password[-1] == '\n':
769
password = password[:-1]
772
def make_progress_view(self):
773
return NullProgressView()
776
class TestCase(testtools.TestCase):
234
777
"""Base class for bzr unit tests.
236
Tests that need access to disk resources should subclass
779
Tests that need access to disk resources should subclass
237
780
TestCaseInTempDir not TestCase.
239
782
Error and debug log messages are redirected from their usual
349
1456
The file is removed as the test is torn down.
351
1458
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
352
encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
353
self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
354
self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
1459
self._log_file = os.fdopen(fileno, 'w+')
1460
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
355
1461
self._log_file_name = name
356
1462
self.addCleanup(self._finishLogFile)
358
1464
def _finishLogFile(self):
359
1465
"""Finished with the log file.
361
Read contents into memory, close, and delete.
363
bzrlib.trace.disable_test_log(self._log_nonce)
364
self._log_file.seek(0)
365
self._log_contents = self._log_file.read()
366
self._log_file.close()
367
os.remove(self._log_file_name)
368
self._log_file = self._log_file_name = None
370
def addCleanup(self, callable):
1467
Close the file and delete it, unless setKeepLogfile was called.
1469
if bzrlib.trace._trace_file:
1470
# flush the log file, to get all content
1471
bzrlib.trace._trace_file.flush()
1472
bzrlib.trace.pop_log_file(self._log_memento)
1473
# Cache the log result and delete the file on disk
1474
self._get_log(False)
1476
def thisFailsStrictLockCheck(self):
1477
"""It is known that this test would fail with -Dstrict_locks.
1479
By default, all tests are run with strict lock checking unless
1480
-Edisable_lock_checks is supplied. However there are some tests which
1481
we know fail strict locks at this point that have not been fixed.
1482
They should call this function to disable the strict checking.
1484
This should be used sparingly, it is much better to fix the locking
1485
issues rather than papering over the problem by calling this function.
1487
debug.debug_flags.discard('strict_locks')
1489
def addCleanup(self, callable, *args, **kwargs):
371
1490
"""Arrange to run a callable when this case is torn down.
373
Callables are run in the reverse of the order they are registered,
1492
Callables are run in the reverse of the order they are registered,
374
1493
ie last-in first-out.
376
if callable in self._cleanups:
377
raise ValueError("cleanup function %r already registered on %s"
379
self._cleanups.append(callable)
1495
self._cleanups.append((callable, args, kwargs))
1497
def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1498
"""Overrides an object attribute restoring it after the test.
1500
:param obj: The object that will be mutated.
1502
:param attr_name: The attribute name we want to preserve/override in
1505
:param new: The optional value we want to set the attribute to.
1507
:returns: The actual attr value.
1509
value = getattr(obj, attr_name)
1510
# The actual value is captured by the call below
1511
self.addCleanup(setattr, obj, attr_name, value)
1512
if new is not _unitialized_attr:
1513
setattr(obj, attr_name, new)
381
1516
def _cleanEnvironment(self):
1518
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
383
1519
'HOME': os.getcwd(),
384
'APPDATA': os.getcwd(),
1520
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1521
# tests do check our impls match APPDATA
1522
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1526
'BZREMAIL': None, # may still be present in the environment
1527
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
1528
'BZR_PROGRESS_BAR': None,
1530
'BZR_PLUGIN_PATH': None,
1531
'BZR_DISABLE_PLUGINS': None,
1532
'BZR_PLUGINS_AT': None,
1533
'BZR_CONCURRENCY': None,
1534
# Make sure that any text ui tests are consistent regardless of
1535
# the environment the test case is run in; you may want tests that
1536
# test other combinations. 'dumb' is a reasonable guess for tests
1537
# going to a pipe or a StringIO.
1541
'BZR_COLUMNS': '80',
1543
'SSH_AUTH_SOCK': None,
1547
'https_proxy': None,
1548
'HTTPS_PROXY': None,
1553
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1554
# least. If you do (care), please update this comment
1558
'BZR_REMOTE_PATH': None,
1559
# Generally speaking, we don't want apport reporting on crashes in
1560
# the test environment unless we're specifically testing apport,
1561
# so that it doesn't leak into the real system environment. We
1562
# use an env var so it propagates to subprocesses.
1563
'APPORT_DISABLE': '1',
389
1566
self.addCleanup(self._restoreEnvironment)
390
1567
for name, value in new_env.iteritems():
391
1568
self._captureVar(name, value)
394
1570
def _captureVar(self, name, newvalue):
395
"""Set an environment variable, preparing it to be reset when finished."""
396
self.__old_env[name] = os.environ.get(name, None)
398
if name in os.environ:
401
os.environ[name] = newvalue
404
def _restoreVar(name, value):
406
if name in os.environ:
409
os.environ[name] = value
1571
"""Set an environment variable, and reset it when finished."""
1572
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
411
1574
def _restoreEnvironment(self):
412
for name, value in self.__old_env.iteritems():
413
self._restoreVar(name, value)
417
unittest.TestCase.tearDown(self)
419
def _runCleanups(self):
420
"""Run registered cleanup functions.
422
This should only be called from TestCase.tearDown.
1575
for name, value in self._old_env.iteritems():
1576
osutils.set_or_unset_env(name, value)
1578
def _restoreHooks(self):
1579
for klass, (name, hooks) in self._preserved_hooks.items():
1580
setattr(klass, name, hooks)
1582
def knownFailure(self, reason):
    """Abort the test, flagging it as failing for a known reason.

    :param reason: Description of why the failure is expected.
    :raises KnownFailure: Always, carrying ``reason``.
    """
    failure = KnownFailure(reason)
    raise failure
1586
def _do_skip(self, result, reason):
1587
addSkip = getattr(result, 'addSkip', None)
1588
if not callable(addSkip):
1589
result.addSuccess(result)
1591
addSkip(self, reason)
1594
def _do_known_failure(self, result, e):
1595
err = sys.exc_info()
1596
addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1597
if addExpectedFailure is not None:
1598
addExpectedFailure(self, err)
1600
result.addSuccess(self)
1603
def _do_not_applicable(self, result, e):
1605
reason = 'No reason given'
1608
addNotApplicable = getattr(result, 'addNotApplicable', None)
1609
if addNotApplicable is not None:
1610
result.addNotApplicable(self, reason)
1612
self._do_skip(result, reason)
1615
def _do_unsupported_or_skip(self, result, e):
1617
addNotSupported = getattr(result, 'addNotSupported', None)
1618
if addNotSupported is not None:
1619
result.addNotSupported(self, reason)
1621
self._do_skip(result, reason)
1623
def time(self, callable, *args, **kwargs):
1624
"""Run callable and accrue the time it takes to the benchmark time.
1626
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
1627
this will cause lsprofile statistics to be gathered and stored in
424
# TODO: Perhaps this should keep running cleanups even if
426
for cleanup_fn in reversed(self._cleanups):
1630
if self._benchtime is None:
1631
self.addDetail('benchtime', content.Content(content.ContentType(
1632
"text", "plain"), lambda:[str(self._benchtime)]))
1636
if not self._gather_lsprof_in_benchmarks:
1637
return callable(*args, **kwargs)
1639
# record this benchmark
1640
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
1642
self._benchcalls.append(((callable, args, kwargs), stats))
1645
self._benchtime += time.time() - start
429
1647
def log(self, *args):
433
"""Return as a string the log for this test"""
434
if self._log_file_name:
435
return open(self._log_file_name).read()
1650
def _get_log(self, keep_log_file=False):
1651
"""Internal helper to get the log from bzrlib.trace for this test.
1653
Please use self.getDetails, or self.get_log to access this in test case
1656
:param keep_log_file: When True, if the log is still a file on disk
1657
leave it as a file on disk. When False, if the log is still a file
1658
on disk, the log file is deleted and the log preserved as
1660
:return: A string containing the log.
1662
if self._log_contents is not None:
1664
self._log_contents.decode('utf8')
1665
except UnicodeDecodeError:
1666
unicodestr = self._log_contents.decode('utf8', 'replace')
1667
self._log_contents = unicodestr.encode('utf8')
437
1668
return self._log_contents
438
# TODO: Delete the log after it's been read in
440
def capture(self, cmd, retcode=0):
    """Shortcut that splits cmd into words, runs, and returns stdout.

    :param cmd: Command line as one string; it is split on whitespace.
    :param retcode: Expected return code, forwarded to run_bzr_captured.
    :return: The first element of what run_bzr_captured returns.
    """
    argv = cmd.split()
    return self.run_bzr_captured(argv, retcode=retcode)[0]
444
def run_bzr_captured(self, argv, retcode=0):
445
"""Invoke bzr and return (stdout, stderr).
447
Useful for code that wants to check the contents of the
448
output, the way error messages are presented, etc.
450
This should be the main method for tests that want to exercise the
451
overall behavior of the bzr application (rather than a unit test
452
or a functional test of the library.)
454
Much of the old code runs bzr by forking a new copy of Python, but
455
that is slower, harder to debug, and generally not necessary.
457
This runs bzr through the interface that catches and reports
458
errors, and with logging set to something approximating the
459
default, so that error reporting can be checked.
461
argv -- arguments to invoke bzr
462
retcode -- expected return code, or None for don't-care.
466
self.log('run bzr: %s', ' '.join(argv))
1670
if bzrlib.trace._trace_file:
1671
# flush the log file, to get all content
1672
bzrlib.trace._trace_file.flush()
1673
if self._log_file_name is not None:
1674
logfile = open(self._log_file_name)
1676
log_contents = logfile.read()
1680
log_contents.decode('utf8')
1681
except UnicodeDecodeError:
1682
unicodestr = log_contents.decode('utf8', 'replace')
1683
log_contents = unicodestr.encode('utf8')
1684
if not keep_log_file:
1686
max_close_attempts = 100
1687
first_close_error = None
1688
while close_attempts < max_close_attempts:
1691
self._log_file.close()
1692
except IOError, ioe:
1693
if ioe.errno is None:
1694
# No errno implies 'close() called during
1695
# concurrent operation on the same file object', so
1696
# retry. Probably a thread is trying to write to
1698
if first_close_error is None:
1699
first_close_error = ioe
1704
if close_attempts > 1:
1706
'Unable to close log file on first attempt, '
1707
'will retry: %s\n' % (first_close_error,))
1708
if close_attempts == max_close_attempts:
1710
'Unable to close log file after %d attempts.\n'
1711
% (max_close_attempts,))
1712
self._log_file = None
1713
# Permit multiple calls to get_log until we clean it up in
1715
self._log_contents = log_contents
1717
os.remove(self._log_file_name)
1719
if sys.platform == 'win32' and e.errno == errno.EACCES:
1720
sys.stderr.write(('Unable to delete log file '
1721
' %r\n' % self._log_file_name))
1724
self._log_file_name = None
1727
return "No log file content and no log file name."
1730
"""Get a unicode string containing the log from bzrlib.trace.
1732
Undecodable characters are replaced.
1734
return u"".join(self.getDetails()['log'].iter_text())
1736
def requireFeature(self, feature):
    """This test requires a specific feature is available.

    :param feature: An object whose ``available()`` method reports whether
        the feature can be used.
    :raises UnavailableFeature: When feature is not available.
    """
    if not feature.available():
        raise UnavailableFeature(feature)
1744
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1746
"""Run bazaar command line, splitting up a string command line."""
1747
if isinstance(args, basestring):
1748
# shlex don't understand unicode strings,
1749
# so args should be plain string (bialix 20070906)
1750
args = list(shlex.split(str(args)))
1751
return self._run_bzr_core(args, retcode=retcode,
1752
encoding=encoding, stdin=stdin, working_dir=working_dir,
1755
def _run_bzr_core(self, args, retcode, encoding, stdin,
1757
# Clear chk_map page cache, because the contents are likely to mask
1759
chk_map.clear_cache()
1760
if encoding is None:
1761
encoding = osutils.get_user_encoding()
1762
stdout = StringIOWrapper()
1763
stderr = StringIOWrapper()
1764
stdout.encoding = encoding
1765
stderr.encoding = encoding
1767
self.log('run bzr: %r', args)
467
1768
# FIXME: don't call into logging here
468
1769
handler = logging.StreamHandler(stderr)
469
handler.setFormatter(bzrlib.trace.QuietFormatter())
470
1770
handler.setLevel(logging.INFO)
471
1771
logger = logging.getLogger('')
472
1772
logger.addHandler(handler)
1773
old_ui_factory = ui.ui_factory
1774
ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
1777
if working_dir is not None:
1778
cwd = osutils.getcwd()
1779
os.chdir(working_dir)
474
result = self.apply_redirected(None, stdout, stderr,
475
bzrlib.commands.run_bzr_catch_errors,
1783
result = self.apply_redirected(ui.ui_factory.stdin,
1785
bzrlib.commands.run_bzr_catch_user_errors,
1787
except KeyboardInterrupt:
1788
# Reraise KeyboardInterrupt with contents of redirected stdout
1789
# and stderr as arguments, for tests which are interested in
1790
# stdout and stderr and are expecting the exception.
1791
out = stdout.getvalue()
1792
err = stderr.getvalue()
1794
self.log('output:\n%r', out)
1796
self.log('errors:\n%r', err)
1797
raise KeyboardInterrupt(out, err)
478
1799
logger.removeHandler(handler)
1800
ui.ui_factory = old_ui_factory
479
1804
out = stdout.getvalue()
480
1805
err = stderr.getvalue()
482
self.log('output:\n%s', out)
1807
self.log('output:\n%r', out)
484
self.log('errors:\n%s', err)
1809
self.log('errors:\n%r', err)
485
1810
if retcode is not None:
486
self.assertEquals(result, retcode)
1811
self.assertEquals(retcode, result,
1812
message='Unexpected return code')
1813
return result, out, err
489
def run_bzr(self, *args, **kwargs):
1815
def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
1816
working_dir=None, error_regexes=[], output_encoding=None):
490
1817
"""Invoke bzr, as if it were run from the command line.
1819
The argument list should not include the bzr program name - the
1820
first argument is normally the bzr command. Arguments may be
1821
passed in three ways:
1823
1- A list of strings, eg ["commit", "a"]. This is recommended
1824
when the command contains whitespace or metacharacters, or
1825
is built up at run time.
1827
2- A single string, eg "add a". This is the most convenient
1828
for hardcoded commands.
1830
This runs bzr through the interface that catches and reports
1831
errors, and with logging set to something approximating the
1832
default, so that error reporting can be checked.
492
1834
This should be the main method for tests that want to exercise the
493
1835
overall behavior of the bzr application (rather than a unit test
494
1836
or a functional test of the library.)
496
1838
This sends the stdout/stderr results into the test's log,
497
1839
where it may be useful for debugging. See also run_captured.
499
retcode = kwargs.pop('retcode', 0)
500
return self.run_bzr_captured(args, retcode)
1841
:keyword stdin: A string to be used as stdin for the command.
1842
:keyword retcode: The status code the command should return;
1844
:keyword working_dir: The directory to run the command in
1845
:keyword error_regexes: A list of expected error messages. If
1846
specified they must be seen in the error output of the command.
1848
retcode, out, err = self._run_bzr_autosplit(
1853
working_dir=working_dir,
1855
self.assertIsInstance(error_regexes, (list, tuple))
1856
for regex in error_regexes:
1857
self.assertContainsRe(err, regex)
1860
def run_bzr_error(self, error_regexes, *args, **kwargs):
1861
"""Run bzr, and check that stderr contains the supplied regexes
1863
:param error_regexes: Sequence of regular expressions which
1864
must each be found in the error output. The relative ordering
1866
:param args: command-line arguments for bzr
1867
:param kwargs: Keyword arguments which are interpreted by run_bzr
1868
This function changes the default value of retcode to be 3,
1869
since in most cases this is run when you expect bzr to fail.
1871
:return: (out, err) The actual output of running the command (in case
1872
you want to do more inspection)
1876
# Make sure that commit is failing because there is nothing to do
1877
self.run_bzr_error(['no changes to commit'],
1878
['commit', '-m', 'my commit comment'])
1879
# Make sure --strict is handling an unknown file, rather than
1880
# giving us the 'nothing to do' error
1881
self.build_tree(['unknown'])
1882
self.run_bzr_error(['Commit refused because there are unknown files'],
1883
['commit', --strict', '-m', 'my commit comment'])
1885
kwargs.setdefault('retcode', 3)
1886
kwargs['error_regexes'] = error_regexes
1887
out, err = self.run_bzr(*args, **kwargs)
1890
def run_bzr_subprocess(self, *args, **kwargs):
1891
"""Run bzr in a subprocess for testing.
1893
This starts a new Python interpreter and runs bzr in there.
1894
This should only be used for tests that have a justifiable need for
1895
this isolation: e.g. they are testing startup time, or signal
1896
handling, or early startup code, etc. Subprocess code can't be
1897
profiled or debugged so easily.
1899
:keyword retcode: The status code that is expected. Defaults to 0. If
1900
None is supplied, the status code is not checked.
1901
:keyword env_changes: A dictionary which lists changes to environment
1902
variables. A value of None will unset the env variable.
1903
The values must be strings. The change will only occur in the
1904
child, so you don't need to fix the environment after running.
1905
:keyword universal_newlines: Convert CRLF => LF
1906
:keyword allow_plugins: By default the subprocess is run with
1907
--no-plugins to ensure test reproducibility. Also, it is possible
1908
for system-wide plugins to create unexpected output on stderr,
1909
which can cause unnecessary test failures.
1911
env_changes = kwargs.get('env_changes', {})
1912
working_dir = kwargs.get('working_dir', None)
1913
allow_plugins = kwargs.get('allow_plugins', False)
1915
if isinstance(args[0], list):
1917
elif isinstance(args[0], basestring):
1918
args = list(shlex.split(args[0]))
1920
raise ValueError("passing varargs to run_bzr_subprocess")
1921
process = self.start_bzr_subprocess(args, env_changes=env_changes,
1922
working_dir=working_dir,
1923
allow_plugins=allow_plugins)
1924
# We distinguish between retcode=None and retcode not passed.
1925
supplied_retcode = kwargs.get('retcode', 0)
1926
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
1927
universal_newlines=kwargs.get('universal_newlines', False),
1930
def start_bzr_subprocess(self, process_args, env_changes=None,
1931
skip_if_plan_to_signal=False,
1933
allow_plugins=False):
1934
"""Start bzr in a subprocess for testing.
1936
This starts a new Python interpreter and runs bzr in there.
1937
This should only be used for tests that have a justifiable need for
1938
this isolation: e.g. they are testing startup time, or signal
1939
handling, or early startup code, etc. Subprocess code can't be
1940
profiled or debugged so easily.
1942
:param process_args: a list of arguments to pass to the bzr executable,
1943
for example ``['--version']``.
1944
:param env_changes: A dictionary which lists changes to environment
1945
variables. A value of None will unset the env variable.
1946
The values must be strings. The change will only occur in the
1947
child, so you don't need to fix the environment after running.
1948
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1950
:param allow_plugins: If False (default) pass --no-plugins to bzr.
1952
:returns: Popen object for the started process.
1954
if skip_if_plan_to_signal:
1955
if not getattr(os, 'kill', None):
1956
raise TestSkipped("os.kill not available.")
1958
if env_changes is None:
1962
def cleanup_environment():
1963
for env_var, value in env_changes.iteritems():
1964
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
1966
def restore_environment():
1967
for env_var, value in old_env.iteritems():
1968
osutils.set_or_unset_env(env_var, value)
1970
bzr_path = self.get_bzr_path()
1973
if working_dir is not None:
1974
cwd = osutils.getcwd()
1975
os.chdir(working_dir)
1978
# win32 subprocess doesn't support preexec_fn
1979
# so we will avoid using it on all platforms, just to
1980
# make sure the code path is used, and we don't break on win32
1981
cleanup_environment()
1982
command = [sys.executable]
1983
# frozen executables don't need the path to bzr
1984
if getattr(sys, "frozen", None) is None:
1985
command.append(bzr_path)
1986
if not allow_plugins:
1987
command.append('--no-plugins')
1988
command.extend(process_args)
1989
process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1991
restore_environment()
1997
def _popen(self, *args, **kwargs):
    """Place a call to Popen.

    Allows tests to override this method to intercept the calls made to
    Popen for introspection.

    :return: The Popen object created from ``args`` and ``kwargs``.
    """
    return Popen(*args, **kwargs)
2005
def get_source_path(self):
    """Return the path of the directory containing bzrlib."""
    # bzrlib.__file__ lives inside the package directory, so go up twice:
    # once to the package directory, once more to its parent.
    package_dir = os.path.dirname(bzrlib.__file__)
    return os.path.dirname(package_dir)
2009
def get_bzr_path(self):
2010
"""Return the path of the 'bzr' executable for this test suite."""
2011
bzr_path = self.get_source_path()+'/bzr'
2012
if not os.path.isfile(bzr_path):
2013
# We are probably installed. Assume sys.argv is the right file
2014
bzr_path = sys.argv[0]
2017
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
2018
universal_newlines=False, process_args=None):
2019
"""Finish the execution of process.
2021
:param process: the Popen object returned from start_bzr_subprocess.
2022
:param retcode: The status code that is expected. Defaults to 0. If
2023
None is supplied, the status code is not checked.
2024
:param send_signal: an optional signal to send to the process.
2025
:param universal_newlines: Convert CRLF => LF
2026
:returns: (stdout, stderr)
2028
if send_signal is not None:
2029
os.kill(process.pid, send_signal)
2030
out, err = process.communicate()
2032
if universal_newlines:
2033
out = out.replace('\r\n', '\n')
2034
err = err.replace('\r\n', '\n')
2036
if retcode is not None and retcode != process.returncode:
2037
if process_args is None:
2038
process_args = "(unknown args)"
2039
mutter('Output of bzr %s:\n%s', process_args, out)
2040
mutter('Error for bzr %s:\n%s', process_args, err)
2041
self.fail('Command bzr %s failed with retcode %s != %s'
2042
% (process_args, retcode, process.returncode))
502
2045
def check_inventory_shape(self, inv, shape):
503
2046
"""Compare an inventory to a list of expected names.
551
2094
sys.stderr = real_stderr
552
2095
sys.stdin = real_stdin
554
def merge(self, branch_from, wt_to):
555
"""A helper for tests to do a ui-less merge.
557
This should move to the main library when someone has time to integrate
560
# minimal ui-less merge.
561
wt_to.branch.fetch(branch_from)
562
base_rev = common_ancestor(branch_from.last_revision(),
563
wt_to.branch.last_revision(),
564
wt_to.branch.repository)
565
merge_inner(wt_to.branch, branch_from.basis_tree(),
566
wt_to.branch.repository.revision_tree(base_rev),
568
wt_to.add_pending_merge(branch_from.last_revision())
571
BzrTestBase = TestCase
574
class TestCaseInTempDir(TestCase):
2097
def reduceLockdirTimeout(self):
    """Reduce the default lock timeout for the duration of the test, so that
    if LockContention occurs during a test, it does so quickly.

    Tests that expect to provoke LockContention errors should call this.
    """
    # The override goes through self.overrideAttr rather than a plain
    # assignment to bzrlib.lockdir.
    self.overrideAttr(bzrlib.lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
2105
def make_utf8_encoded_stringio(self, encoding_type=None):
2106
"""Return a StringIOWrapper instance, that will encode Unicode
2109
if encoding_type is None:
2110
encoding_type = 'strict'
2112
output_encoding = 'utf-8'
2113
sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
2114
sio.encoding = output_encoding
2117
def disable_verb(self, verb):
    """Disable a smart server verb for one test.

    :param verb: The key of the handler to remove from
        ``request.request_handlers``.
    """
    from bzrlib.smart import request
    request_handlers = request.request_handlers
    # Remember the current handler before removing it so that the cleanup
    # can register it again under the same verb.
    orig_method = request_handlers.get(verb)
    request_handlers.remove(verb)
    self.addCleanup(request_handlers.register, verb, orig_method)
2126
class CapturedCall(object):
2127
"""A helper for capturing smart server calls for easy debug analysis."""
2129
def __init__(self, params, prefix_length):
2130
"""Capture the call with params and skip prefix_length stack frames."""
2133
# The last 5 frames are the __init__, the hook frame, and 3 smart
2134
# client frames. Beyond this we could get more clever, but this is good
2136
stack = traceback.extract_stack()[prefix_length:-5]
2137
self.stack = ''.join(traceback.format_list(stack))
2140
return self.call.method
2143
return self.call.method
2149
class TestCaseWithMemoryTransport(TestCase):
2150
"""Common test class for tests that do not need disk resources.
2152
Tests that need disk resources should derive from TestCaseWithTransport.
2154
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2156
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2157
a directory which does not exist. This serves to help ensure test isolation
2158
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2159
must exist. However, TestCaseWithMemoryTransport does not offer local
2160
file defaults for the transport in tests, nor does it obey the command line
2161
override, so tests that accidentally write to the common directory should
2164
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2165
a .bzr directory that stops us ascending higher into the filesystem.
2171
def __init__(self, methodName='runTest'):
    """Initialise the transport-related attributes of the test case.

    :param methodName: Name of the test method, passed to the base class.
    """
    # allow test parameterization after test construction and before test
    # execution. Variables that the parameterizer sets need to be
    # ones that are not set by setUp, or setUp will trash them.
    super(TestCaseWithMemoryTransport, self).__init__(methodName)
    self.vfs_transport_factory = default_transport
    self.transport_server = None
    self.transport_readonly_server = None
    self.__vfs_server = None
2181
def get_transport(self, relpath=None):
2182
"""Return a writeable transport.
2184
This transport is for the test scratch space relative to
2187
:param relpath: a path relative to the base url.
2189
t = get_transport(self.get_url(relpath))
2190
self.assertFalse(t.is_readonly())
2193
def get_readonly_transport(self, relpath=None):
2194
"""Return a readonly transport for the test scratch space
2196
This can be used to test that operations which should only need
2197
readonly access in fact do not try to write.
2199
:param relpath: a path relative to the base url.
2201
t = get_transport(self.get_readonly_url(relpath))
2202
self.assertTrue(t.is_readonly())
2205
def create_transport_readonly_server(self):
    """Create a transport server from class defined at init.

    This is mostly a hook for daughter classes.

    :return: The result of calling ``self.transport_readonly_server`` with
        no arguments.
    """
    return self.transport_readonly_server()
2212
def get_readonly_server(self):
2213
"""Get the server instance for the readonly transport
2215
This is useful for some tests with specific servers to do diagnostics.
2217
if self.__readonly_server is None:
2218
if self.transport_readonly_server is None:
2219
# readonly decorator requested
2220
self.__readonly_server = test_server.ReadonlyServer()
2222
# explicit readonly transport.
2223
self.__readonly_server = self.create_transport_readonly_server()
2224
self.start_server(self.__readonly_server,
2225
self.get_vfs_only_server())
2226
return self.__readonly_server
2228
def get_readonly_url(self, relpath=None):
    """Get a URL for the readonly transport.

    This will either be backed by '.' or a decorator to the transport
    used by self.get_url()
    relpath provides for clients to get a path relative to the base url.
    These should only be downwards relative, not upwards.

    :param relpath: Optional path, handed to ``self._adjust_url`` together
        with the readonly server's base URL.
    """
    base = self.get_readonly_server().get_url()
    return self._adjust_url(base, relpath)
2239
def get_vfs_only_server(self):
    """Get the vfs only read/write server instance.

    For TestCaseWithMemoryTransport this is always a MemoryServer, and there
    is no means to override it.

    :return: The server held in ``self.__vfs_server``, created and started
        on the first call.
    """
    if self.__vfs_server is None:
        # NOTE(review): nesting of start_server under this ``if`` is
        # reconstructed; the extracted text carried no indentation.
        self.__vfs_server = memory.MemoryServer()
        self.start_server(self.__vfs_server)
    return self.__vfs_server
2253
def get_server(self):
2254
"""Get the read/write server instance.
2256
This is useful for some tests with specific servers that need
2259
This is built from the self.transport_server factory. If that is None,
2260
then the self.get_vfs_server is returned.
2262
if self.__server is None:
2263
if (self.transport_server is None or self.transport_server is
2264
self.vfs_transport_factory):
2265
self.__server = self.get_vfs_only_server()
2267
# bring up a decorated means of access to the vfs only server.
2268
self.__server = self.transport_server()
2269
self.start_server(self.__server, self.get_vfs_only_server())
2270
return self.__server
2272
def _adjust_url(self, base, relpath):
2273
"""Get a URL (or maybe a path) for the readwrite transport.
2275
This will either be backed by '.' or to an equivalent non-file based
2277
relpath provides for clients to get a path relative to the base url.
2278
These should only be downwards relative, not upwards.
2280
if relpath is not None and relpath != '.':
2281
if not base.endswith('/'):
2283
# XXX: Really base should be a url; we did after all call
2284
# get_url()! But sometimes it's just a path (from
2285
# LocalAbspathServer), and it'd be wrong to append urlescaped data
2286
# to a non-escaped local path.
2287
if base.startswith('./') or base.startswith('/'):
2290
base += urlutils.escape(relpath)
2293
def get_url(self, relpath=None):
    """Get a URL (or maybe a path) for the readwrite transport.

    relpath provides for clients to get a path relative to the base url.
    These should only be downwards relative, not upwards.

    :param relpath: Optional path, handed to ``self._adjust_url`` together
        with the base URL of ``self.get_server()``.
    """
    base = self.get_server().get_url()
    return self._adjust_url(base, relpath)
2304
def get_vfs_only_url(self, relpath=None):
2305
"""Get a URL (or maybe a path for the plain old vfs transport.
2307
This will never be a smart protocol. It always has all the
2308
capabilities of the local filesystem, but it might actually be a
2309
MemoryTransport or some other similar virtual filesystem.
2311
This is the backing transport (if any) of the server returned by
2312
get_url and get_readonly_url.
2314
:param relpath: provides for clients to get a path relative to the base
2315
url. These should only be downwards relative, not upwards.
2318
base = self.get_vfs_only_server().get_url()
2319
return self._adjust_url(base, relpath)
2321
def _create_safety_net(self):
    """Make a fake bzr directory.

    Creates a standalone working tree at TEST_ROOT.
    """
    root = TestCaseWithMemoryTransport.TEST_ROOT
    bzrdir.BzrDir.create_standalone_workingtree(root)
2330
def _check_safety_net(self):
    """Check that the safety .bzr directory has not been touched.

    _make_test_root has created a .bzr directory to prevent tests from
    propagating. This method ensures that a test did not leak.

    :raises AssertionError: When the working tree at TEST_ROOT has a last
        revision other than 'null:'.
    """
    root = TestCaseWithMemoryTransport.TEST_ROOT
    self.permit_url(get_transport(root).base)
    wt = workingtree.WorkingTree.open(root)
    last_rev = wt.last_revision()
    if last_rev != 'null:':
        # NOTE(review): nesting of the next three statements under this
        # ``if`` is reconstructed; the extracted text had no indentation.
        # The current test has modified the /bzr directory, we need to
        # recreate a new one or all the following tests will fail.
        # If you need to inspect its content uncomment the following line
        # import pdb; pdb.set_trace()
        _rmtree_temp_dir(root + '/.bzr', test_id=self.id())
        self._create_safety_net()
        raise AssertionError('%s/.bzr should not be modified' % root)
2349
def _make_test_root(self):
2350
if TestCaseWithMemoryTransport.TEST_ROOT is None:
2351
# Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
2352
root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
2354
TestCaseWithMemoryTransport.TEST_ROOT = root
2356
self._create_safety_net()
2358
# The same directory is used by all tests, and we're not
2359
# specifically told when all tests are finished. This will do.
2360
atexit.register(_rmtree_temp_dir, root)
2362
self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
2363
self.addCleanup(self._check_safety_net)
2365
def makeAndChdirToTestDir(self):
    """Set up the directories for this one test and chdir there.

    This must set self.test_home_dir and self.test_dir.

    For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
    """
    os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
    self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
    # Nothing here creates this home directory; only its name is recorded.
    self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
    self.permit_dir(self.test_dir)
2378
def make_branch(self, relpath, format=None):
    """Create a branch on the transport at relpath.

    :param relpath: Location handed to ``self.make_repository``.
    :param format: Optional format, forwarded unchanged.
    :return: The result of ``create_branch()`` on the new repository's
        bzrdir.
    """
    repo = self.make_repository(relpath, format=format)
    return repo.bzrdir.create_branch()
2383
def make_bzrdir(self, relpath, format=None):
2385
# might be a relative or absolute path
2386
maybe_a_url = self.get_url(relpath)
2387
segments = maybe_a_url.rsplit('/', 1)
2388
t = get_transport(maybe_a_url)
2389
if len(segments) > 1 and segments[-1] not in ('', '.'):
2393
if isinstance(format, basestring):
2394
format = bzrdir.format_registry.make_bzrdir(format)
2395
return format.initialize_on_transport(t)
2396
except errors.UninitializableFormat:
2397
raise TestSkipped("Format %s is not initializable." % format)
2399
def make_repository(self, relpath, shared=False, format=None):
    """Create a repository on our default transport at relpath.

    Note that relpath must be a relative path, not a full url.

    :param shared: Forwarded to ``create_repository`` on the new bzrdir.
    :param format: Forwarded to ``self.make_bzrdir``.
    """
    # FIXME: If you create a remoterepository this returns the underlying
    # real format, which is incorrect.  Actually we should make sure that
    # RemoteBzrDir returns a RemoteRepository.
    # maybe  mbp 20070410
    made_control = self.make_bzrdir(relpath, format=format)
    return made_control.create_repository(shared=shared)
2411
def make_smart_server(self, path):
    """Start a smart TCP server and return a transport onto ``path``.

    The server is started with ``self.get_server()`` as its second
    ``start_server`` argument.

    :param path: Location passed to ``clone()`` on the server's transport.
    :return: The cloned transport.
    """
    smart_server = test_server.SmartTCPServer_for_testing()
    self.start_server(smart_server, self.get_server())
    remote_transport = get_transport(smart_server.get_url()).clone(path)
    return remote_transport
2417
def make_branch_and_memory_tree(self, relpath, format=None):
    """Create a branch on the default transport and a MemoryTree for it.

    :param relpath: Location handed to ``self.make_branch``.
    :param format: Optional format, forwarded to ``self.make_branch``.
    """
    b = self.make_branch(relpath, format=format)
    return memorytree.MemoryTree.create_on_branch(b)
2422
def make_branch_builder(self, relpath, format=None):
    """Create a branch at relpath and return a BranchBuilder wrapping it.

    :param relpath: Location handed to ``self.make_branch``.
    :param format: Optional format, forwarded to ``self.make_branch``.
    """
    branch = self.make_branch(relpath, format=format)
    return branchbuilder.BranchBuilder(branch=branch)
2426
def overrideEnvironmentForTesting(self):
    """Point HOME and BZR_HOME at ``self.test_home_dir``.

    A unicode ``test_home_dir`` is first encoded with the filesystem
    encoding, so the value stored in ``os.environ`` is a byte string.
    """
    test_home_dir = self.test_home_dir
    if isinstance(test_home_dir, unicode):
        test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
    os.environ['HOME'] = test_home_dir
    os.environ['BZR_HOME'] = test_home_dir
2434
super(TestCaseWithMemoryTransport, self).setUp()
2435
self._make_test_root()
2436
self.addCleanup(os.chdir, os.getcwdu())
2437
self.makeAndChdirToTestDir()
2438
self.overrideEnvironmentForTesting()
2439
self.__readonly_server = None
2440
self.__server = None
2441
self.reduceLockdirTimeout()
2443
def setup_smart_server_with_call_log(self):
2444
"""Sets up a smart server as the transport server with a call log."""
2445
self.transport_server = test_server.SmartTCPServer_for_testing
2446
self.hpss_calls = []
2448
# Skip the current stack down to the caller of
2449
# setup_smart_server_with_call_log
2450
prefix_length = len(traceback.extract_stack()) - 2
2451
def capture_hpss_call(params):
2452
self.hpss_calls.append(
2453
CapturedCall(params, prefix_length))
2454
client._SmartClient.hooks.install_named_hook(
2455
'call', capture_hpss_call, None)
2457
def reset_smart_call_log(self):
    """Discard the recorded calls by rebinding hpss_calls to an empty list."""
    self.hpss_calls = []
2461
class TestCaseInTempDir(TestCaseWithMemoryTransport):
575
2462
"""Derived class that runs a test within a temporary directory.
577
2464
This is useful for tests that need to create a branch, etc.
861
2724
for readonly urls.
863
2726
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
864
be used without needed to redo it when a different
2727
be used without needed to redo it when a different
865
2728
subclass is in use ?
868
2731
def setUp(self):
869
2732
super(ChrootedTestCase, self).setUp()
870
if not self.transport_server == bzrlib.transport.memory.MemoryServer:
871
self.transport_readonly_server = bzrlib.transport.http.HttpServer
2733
if not self.vfs_transport_factory == memory.MemoryServer:
2734
self.transport_readonly_server = HttpServer
2737
def condition_id_re(pattern):
2738
"""Create a condition filter which performs a re check on a test's id.
2740
:param pattern: A regular expression string.
2741
:return: A callable that returns True if the re matches.
2743
filter_re = osutils.re_compile_checked(pattern, 0,
2745
def condition(test):
2747
return filter_re.search(test_id)
2751
def condition_isinstance(klass_or_klass_list):
2752
"""Create a condition filter which returns isinstance(param, klass).
2754
:return: A callable which when called with one parameter obj return the
2755
result of isinstance(obj, klass_or_klass_list).
2758
return isinstance(obj, klass_or_klass_list)
2762
def condition_id_in_list(id_list):
2763
"""Create a condition filter which verify that test's id in a list.
2765
:param id_list: A TestIdList object.
2766
:return: A callable that returns True if the test's id appears in the list.
2768
def condition(test):
2769
return id_list.includes(test.id())
2773
def condition_id_startswith(starts):
2774
"""Create a condition filter verifying that test's id starts with a string.
2776
:param starts: A list of string.
2777
:return: A callable that returns True if the test's id starts with one of
2780
def condition(test):
2781
for start in starts:
2782
if test.id().startswith(start):
2788
def exclude_tests_by_condition(suite, condition):
2789
"""Create a test suite which excludes some tests from suite.
2791
:param suite: The suite to get tests from.
2792
:param condition: A callable whose result evaluates True when called with a
2793
test case which should be excluded from the result.
2794
:return: A suite which contains the tests found in suite that fail
2798
for test in iter_suite_tests(suite):
2799
if not condition(test):
2801
return TestUtil.TestSuite(result)
2804
def filter_suite_by_condition(suite, condition):
2805
"""Create a test suite by filtering another one.
2807
:param suite: The source suite.
2808
:param condition: A callable whose result evaluates True when called with a
2809
test case which should be included in the result.
2810
:return: A suite which contains the tests found in suite that pass
2814
for test in iter_suite_tests(suite):
2817
return TestUtil.TestSuite(result)
874
2820
def filter_suite_by_re(suite, pattern):
876
filter_re = re.compile(pattern)
2821
"""Create a test suite by filtering another one.
2823
:param suite: the source suite
2824
:param pattern: pattern that names must match
2825
:returns: the newly created suite
2827
condition = condition_id_re(pattern)
2828
result_suite = filter_suite_by_condition(suite, condition)
2832
def filter_suite_by_id_list(suite, test_id_list):
2833
"""Create a test suite by filtering another one.
2835
:param suite: The source suite.
2836
:param test_id_list: A list of the test ids to keep as strings.
2837
:returns: the newly created suite
2839
condition = condition_id_in_list(test_id_list)
2840
result_suite = filter_suite_by_condition(suite, condition)
2844
def filter_suite_by_id_startswith(suite, start):
2845
"""Create a test suite by filtering another one.
2847
:param suite: The source suite.
2848
:param start: A list of string the test id must start with one of.
2849
:returns: the newly created suite
2851
condition = condition_id_startswith(start)
2852
result_suite = filter_suite_by_condition(suite, condition)
2856
def exclude_tests_by_re(suite, pattern):
2857
"""Create a test suite which excludes some tests from suite.
2859
:param suite: The suite to get tests from.
2860
:param pattern: A regular expression string. Test ids that match this
2861
pattern will be excluded from the result.
2862
:return: A TestSuite that contains all the tests from suite without the
2863
tests that matched pattern. The order of tests is the same as it was in
2866
return exclude_tests_by_condition(suite, condition_id_re(pattern))
2869
def preserve_input(something):
2870
"""A helper for performing test suite transformation chains.
2872
:param something: Anything you want to preserve.
2878
def randomize_suite(suite):
2879
"""Return a new TestSuite with suite's tests in random order.
2881
The tests in the input suite are flattened into a single suite in order to
2882
accomplish this. Any nested TestSuites are removed to provide global
2885
tests = list(iter_suite_tests(suite))
2886
random.shuffle(tests)
2887
return TestUtil.TestSuite(tests)
2890
def split_suite_by_condition(suite, condition):
2891
"""Split a test suite into two by a condition.
2893
:param suite: The suite to split.
2894
:param condition: The condition to match on. Tests that match this
2895
condition are returned in the first test suite, ones that do not match
2896
are in the second suite.
2897
:return: A tuple of two test suites, where the first contains tests from
2898
suite matching the condition, and the second contains the remainder
2899
from suite. The order within each output suite is the same as it was in
877
2904
for test in iter_suite_tests(suite):
878
if filter_re.search(test.id()):
2906
matched.append(test)
2908
did_not_match.append(test)
2909
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2912
def split_suite_by_re(suite, pattern):
2913
"""Split a test suite into two by a regular expression.
2915
:param suite: The suite to split.
2916
:param pattern: A regular expression string. Test ids that match this
2917
pattern will be in the first test suite returned, and the others in the
2918
second test suite returned.
2919
:return: A tuple of two test suites, where the first contains tests from
2920
suite matching pattern, and the second contains the remainder from
2921
suite. The order within each output suite is the same as it was in
2924
return split_suite_by_condition(suite, condition_id_re(pattern))
883
2927
def run_suite(suite, name='test', verbose=False, pattern=".*",
884
stop_on_failure=False, keep_output=False,
886
TestCaseInTempDir._TEST_NAME = name
2928
stop_on_failure=False,
2929
transport=None, lsprof_timed=None, bench_history=None,
2930
matching_tests_first=None,
2933
exclude_pattern=None,
2936
suite_decorators=None,
2938
result_decorators=None,
2940
"""Run a test suite for bzr selftest.
2942
:param runner_class: The class of runner to use. Must support the
2943
constructor arguments passed by run_suite which are more than standard
2945
:return: A boolean indicating success.
2947
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
891
runner = TextTestRunner(stream=sys.stdout,
2952
if runner_class is None:
2953
runner_class = TextTestRunner
2956
runner = runner_class(stream=stream,
2958
verbosity=verbosity,
2959
bench_history=bench_history,
2961
result_decorators=result_decorators,
894
2963
runner.stop_on_failure=stop_on_failure
896
suite = filter_suite_by_re(suite, pattern)
2964
# built in decorator factories:
2966
random_order(random_seed, runner),
2967
exclude_tests(exclude_pattern),
2969
if matching_tests_first:
2970
decorators.append(tests_first(pattern))
2972
decorators.append(filter_tests(pattern))
2973
if suite_decorators:
2974
decorators.extend(suite_decorators)
2975
# tell the result object how many tests will be running: (except if
2976
# --parallel=fork is being used. Robert said he will provide a better
2977
# progress design later -- vila 20090817)
2978
if fork_decorator not in decorators:
2979
decorators.append(CountingDecorator)
2980
for decorator in decorators:
2981
suite = decorator(suite)
2983
# Done after test suite decoration to allow randomisation etc
2984
# to take effect, though that is of marginal benefit.
2986
stream.write("Listing tests only ...\n")
2987
for t in iter_suite_tests(suite):
2988
stream.write("%s\n" % (t.id()))
897
2990
result = runner.run(suite)
898
# This is still a little bogus,
899
# but only a little. Folk not using our testrunner will
900
# have to delete their temp directories themselves.
901
test_root = TestCaseInTempDir.TEST_ROOT
902
if result.wasSuccessful() or not keep_output:
903
if test_root is not None:
904
print 'Deleting test root %s...' % test_root
906
shutil.rmtree(test_root)
2992
return result.wasStrictlySuccessful()
910
print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
911
return result.wasSuccessful()
2994
return result.wasSuccessful()
2997
# A registry where get() returns a suite decorator.
2998
parallel_registry = registry.Registry()
3001
def fork_decorator(suite):
3002
concurrency = osutils.local_concurrency()
3003
if concurrency == 1:
3005
from testtools import ConcurrentTestSuite
3006
return ConcurrentTestSuite(suite, fork_for_tests)
3007
parallel_registry.register('fork', fork_decorator)
3010
def subprocess_decorator(suite):
3011
concurrency = osutils.local_concurrency()
3012
if concurrency == 1:
3014
from testtools import ConcurrentTestSuite
3015
return ConcurrentTestSuite(suite, reinvoke_for_tests)
3016
parallel_registry.register('subprocess', subprocess_decorator)
3019
def exclude_tests(exclude_pattern):
3020
"""Return a test suite decorator that excludes tests."""
3021
if exclude_pattern is None:
3022
return identity_decorator
3023
def decorator(suite):
3024
return ExcludeDecorator(suite, exclude_pattern)
3028
def filter_tests(pattern):
3030
return identity_decorator
3031
def decorator(suite):
3032
return FilterTestsDecorator(suite, pattern)
3036
def random_order(random_seed, runner):
3037
"""Return a test suite decorator factory for randomising tests order.
3039
:param random_seed: now, a string which casts to a long, or a long.
3040
:param runner: A test runner with a stream attribute to report on.
3042
if random_seed is None:
3043
return identity_decorator
3044
def decorator(suite):
3045
return RandomDecorator(suite, random_seed, runner.stream)
3049
def tests_first(pattern):
3051
return identity_decorator
3052
def decorator(suite):
3053
return TestFirstDecorator(suite, pattern)
3057
def identity_decorator(suite):
3062
class TestDecorator(TestSuite):
3063
"""A decorator for TestCase/TestSuite objects.
3065
Usually, subclasses should override __iter__(used when flattening test
3066
suites), which we do to filter, reorder, parallelise and so on, run() and
3070
def __init__(self, suite):
3071
TestSuite.__init__(self)
3074
def countTestCases(self):
3077
cases += test.countTestCases()
3084
def run(self, result):
3085
# Use iteration on self, not self._tests, to allow subclasses to hook
3088
if result.shouldStop:
3094
class CountingDecorator(TestDecorator):
3095
"""A decorator which calls result.progress(self.countTestCases)."""
3097
def run(self, result):
3098
progress_method = getattr(result, 'progress', None)
3099
if callable(progress_method):
3100
progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
3101
return super(CountingDecorator, self).run(result)
3104
class ExcludeDecorator(TestDecorator):
3105
"""A decorator which excludes test matching an exclude pattern."""
3107
def __init__(self, suite, exclude_pattern):
3108
TestDecorator.__init__(self, suite)
3109
self.exclude_pattern = exclude_pattern
3110
self.excluded = False
3114
return iter(self._tests)
3115
self.excluded = True
3116
suite = exclude_tests_by_re(self, self.exclude_pattern)
3118
self.addTests(suite)
3119
return iter(self._tests)
3122
class FilterTestsDecorator(TestDecorator):
3123
"""A decorator which filters tests to those matching a pattern."""
3125
def __init__(self, suite, pattern):
3126
TestDecorator.__init__(self, suite)
3127
self.pattern = pattern
3128
self.filtered = False
3132
return iter(self._tests)
3133
self.filtered = True
3134
suite = filter_suite_by_re(self, self.pattern)
3136
self.addTests(suite)
3137
return iter(self._tests)
3140
class RandomDecorator(TestDecorator):
3141
"""A decorator which randomises the order of its tests."""
3143
def __init__(self, suite, random_seed, stream):
3144
TestDecorator.__init__(self, suite)
3145
self.random_seed = random_seed
3146
self.randomised = False
3147
self.stream = stream
3151
return iter(self._tests)
3152
self.randomised = True
3153
self.stream.write("Randomizing test order using seed %s\n\n" %
3154
(self.actual_seed()))
3155
# Initialise the random number generator.
3156
random.seed(self.actual_seed())
3157
suite = randomize_suite(self)
3159
self.addTests(suite)
3160
return iter(self._tests)
3162
def actual_seed(self):
3163
if self.random_seed == "now":
3164
# We convert the seed to a long to make it reuseable across
3165
# invocations (because the user can reenter it).
3166
self.random_seed = long(time.time())
3168
# Convert the seed to a long if we can
3170
self.random_seed = long(self.random_seed)
3173
return self.random_seed
3176
class TestFirstDecorator(TestDecorator):
3177
"""A decorator which moves named tests to the front."""
3179
def __init__(self, suite, pattern):
3180
TestDecorator.__init__(self, suite)
3181
self.pattern = pattern
3182
self.filtered = False
3186
return iter(self._tests)
3187
self.filtered = True
3188
suites = split_suite_by_re(self, self.pattern)
3190
self.addTests(suites)
3191
return iter(self._tests)
3194
def partition_tests(suite, count):
3195
"""Partition suite into count lists of tests."""
3197
tests = list(iter_suite_tests(suite))
3198
tests_per_process = int(math.ceil(float(len(tests)) / count))
3199
for block in range(count):
3200
low_test = block * tests_per_process
3201
high_test = low_test + tests_per_process
3202
process_tests = tests[low_test:high_test]
3203
result.append(process_tests)
3207
def workaround_zealous_crypto_random():
3208
"""Crypto.Random want to help us being secure, but we don't care here.
3210
This workaround some test failure related to the sftp server. Once paramiko
3211
stop using the controversial API in Crypto.Random, we may get rid of it.
3214
from Crypto.Random import atfork
3220
def fork_for_tests(suite):
3221
"""Take suite and start up one runner per CPU by forking()
3223
:return: An iterable of TestCase-like objects which can each have
3224
run(result) called on them to feed tests to result.
3226
concurrency = osutils.local_concurrency()
3228
from subunit import TestProtocolClient, ProtocolTestCase
3229
from subunit.test_results import AutoTimingTestResultDecorator
3230
class TestInOtherProcess(ProtocolTestCase):
3231
# Should be in subunit, I think. RBC.
3232
def __init__(self, stream, pid):
3233
ProtocolTestCase.__init__(self, stream)
3236
def run(self, result):
3238
ProtocolTestCase.run(self, result)
3240
os.waitpid(self.pid, 0)
3242
test_blocks = partition_tests(suite, concurrency)
3243
for process_tests in test_blocks:
3244
process_suite = TestSuite()
3245
process_suite.addTests(process_tests)
3246
c2pread, c2pwrite = os.pipe()
3249
workaround_zealous_crypto_random()
3252
# Leave stderr and stdout open so we can see test noise
3253
# Close stdin so that the child goes away if it decides to
3254
# read from stdin (otherwise its a roulette to see what
3255
# child actually gets keystrokes for pdb etc).
3258
stream = os.fdopen(c2pwrite, 'wb', 1)
3259
subunit_result = AutoTimingTestResultDecorator(
3260
TestProtocolClient(stream))
3261
process_suite.run(subunit_result)
3266
stream = os.fdopen(c2pread, 'rb', 1)
3267
test = TestInOtherProcess(stream, pid)
3272
def reinvoke_for_tests(suite):
3273
"""Take suite and start up one runner per CPU using subprocess().
3275
:return: An iterable of TestCase-like objects which can each have
3276
run(result) called on them to feed tests to result.
3278
concurrency = osutils.local_concurrency()
3280
from subunit import ProtocolTestCase
3281
class TestInSubprocess(ProtocolTestCase):
3282
def __init__(self, process, name):
3283
ProtocolTestCase.__init__(self, process.stdout)
3284
self.process = process
3285
self.process.stdin.close()
3288
def run(self, result):
3290
ProtocolTestCase.run(self, result)
3293
os.unlink(self.name)
3294
# print "pid %d finished" % finished_process
3295
test_blocks = partition_tests(suite, concurrency)
3296
for process_tests in test_blocks:
3297
# ugly; currently reimplement rather than reuses TestCase methods.
3298
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
3299
if not os.path.isfile(bzr_path):
3300
# We are probably installed. Assume sys.argv is the right file
3301
bzr_path = sys.argv[0]
3302
bzr_path = [bzr_path]
3303
if sys.platform == "win32":
3304
# if we're on windows, we can't execute the bzr script directly
3305
bzr_path = [sys.executable] + bzr_path
3306
fd, test_list_file_name = tempfile.mkstemp()
3307
test_list_file = os.fdopen(fd, 'wb', 1)
3308
for test in process_tests:
3309
test_list_file.write(test.id() + '\n')
3310
test_list_file.close()
3312
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3314
if '--no-plugins' in sys.argv:
3315
argv.append('--no-plugins')
3316
# stderr=STDOUT would be ideal, but until we prevent noise on
3317
# stderr it can interrupt the subunit protocol.
3318
process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
3320
test = TestInSubprocess(process, test_list_file_name)
3323
os.unlink(test_list_file_name)
3328
class ForwardingResult(unittest.TestResult):
3330
def __init__(self, target):
3331
unittest.TestResult.__init__(self)
3332
self.result = target
3334
def startTest(self, test):
3335
self.result.startTest(test)
3337
def stopTest(self, test):
3338
self.result.stopTest(test)
3340
def startTestRun(self):
3341
self.result.startTestRun()
3343
def stopTestRun(self):
3344
self.result.stopTestRun()
3346
def addSkip(self, test, reason):
3347
self.result.addSkip(test, reason)
3349
def addSuccess(self, test):
3350
self.result.addSuccess(test)
3352
def addError(self, test, err):
3353
self.result.addError(test, err)
3355
def addFailure(self, test, err):
3356
self.result.addFailure(test, err)
3357
ForwardingResult = testtools.ExtendedToOriginalDecorator
3360
class ProfileResult(ForwardingResult):
3361
"""Generate profiling data for all activity between start and success.
3363
The profile data is appended to the test's _benchcalls attribute and can
3364
be accessed by the forwarded-to TestResult.
3366
While it might be cleaner do accumulate this in stopTest, addSuccess is
3367
where our existing output support for lsprof is, and this class aims to
3368
fit in with that: while it could be moved it's not necessary to accomplish
3369
test profiling, nor would it be dramatically cleaner.
3372
def startTest(self, test):
3373
self.profiler = bzrlib.lsprof.BzrProfiler()
3374
self.profiler.start()
3375
ForwardingResult.startTest(self, test)
3377
def addSuccess(self, test):
3378
stats = self.profiler.stop()
3380
calls = test._benchcalls
3381
except AttributeError:
3382
test._benchcalls = []
3383
calls = test._benchcalls
3384
calls.append(((test.id(), "", ""), stats))
3385
ForwardingResult.addSuccess(self, test)
3387
def stopTest(self, test):
3388
ForwardingResult.stopTest(self, test)
3389
self.profiler = None
3392
# Controlled by "bzr selftest -E=..." option
3393
# Currently supported:
3394
# -Eallow_debug Will no longer clear debug.debug_flags() so it
3395
# preserves any flags supplied at the command line.
3396
# -Edisable_lock_checks Turns errors in mismatched locks into simple prints
3397
# rather than failing tests. And no longer raise
3398
# LockContention when fctnl locks are not being used
3399
# with proper exclusion rules.
3400
selftest_debug_flags = set()
914
3403
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
3405
test_suite_factory=None,
3408
matching_tests_first=None,
3411
exclude_pattern=None,
3417
suite_decorators=None,
917
3421
"""Run the whole test suite under the enhanced runner"""
3422
# XXX: Very ugly way to do this...
3423
# Disable warning about old formats because we don't want it to disturb
3424
# any blackbox tests.
3425
from bzrlib import repository
3426
repository._deprecation_warning_done = True
918
3428
global default_transport
919
3429
if transport is None:
920
3430
transport = default_transport
921
3431
old_transport = default_transport
922
3432
default_transport = transport
3433
global selftest_debug_flags
3434
old_debug_flags = selftest_debug_flags
3435
if debug_flags is not None:
3436
selftest_debug_flags = set(debug_flags)
3438
if load_list is None:
3441
keep_only = load_test_id_list(load_list)
3443
starting_with = [test_prefix_alias_registry.resolve_alias(start)
3444
for start in starting_with]
3445
if test_suite_factory is None:
3446
# Reduce loading time by loading modules based on the starting_with
3448
suite = test_suite(keep_only, starting_with)
3450
suite = test_suite_factory()
3452
# But always filter as requested.
3453
suite = filter_suite_by_id_startswith(suite, starting_with)
3454
result_decorators = []
3456
result_decorators.append(ProfileResult)
925
3457
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
926
stop_on_failure=stop_on_failure, keep_output=keep_output,
3458
stop_on_failure=stop_on_failure,
3459
transport=transport,
3460
lsprof_timed=lsprof_timed,
3461
bench_history=bench_history,
3462
matching_tests_first=matching_tests_first,
3463
list_only=list_only,
3464
random_seed=random_seed,
3465
exclude_pattern=exclude_pattern,
3467
runner_class=runner_class,
3468
suite_decorators=suite_decorators,
3470
result_decorators=result_decorators,
929
3473
default_transport = old_transport
934
"""Build and return TestSuite for the whole program."""
935
from doctest import DocTestSuite
937
global MODULES_TO_DOCTEST
940
'bzrlib.tests.test_ancestry',
941
'bzrlib.tests.test_annotate',
942
'bzrlib.tests.test_api',
943
'bzrlib.tests.test_bad_files',
944
'bzrlib.tests.test_basis_inventory',
945
'bzrlib.tests.test_branch',
946
'bzrlib.tests.test_bzrdir',
947
'bzrlib.tests.test_command',
948
'bzrlib.tests.test_commit',
949
'bzrlib.tests.test_commit_merge',
950
'bzrlib.tests.test_config',
951
'bzrlib.tests.test_conflicts',
952
'bzrlib.tests.test_decorators',
953
'bzrlib.tests.test_diff',
954
'bzrlib.tests.test_doc_generate',
955
'bzrlib.tests.test_errors',
956
'bzrlib.tests.test_escaped_store',
957
'bzrlib.tests.test_fetch',
958
'bzrlib.tests.test_gpg',
959
'bzrlib.tests.test_graph',
960
'bzrlib.tests.test_hashcache',
961
'bzrlib.tests.test_http',
962
'bzrlib.tests.test_identitymap',
963
'bzrlib.tests.test_inv',
964
'bzrlib.tests.test_knit',
965
'bzrlib.tests.test_lockdir',
966
'bzrlib.tests.test_lockable_files',
967
'bzrlib.tests.test_log',
968
'bzrlib.tests.test_merge',
969
'bzrlib.tests.test_merge3',
970
'bzrlib.tests.test_merge_core',
971
'bzrlib.tests.test_missing',
972
'bzrlib.tests.test_msgeditor',
973
'bzrlib.tests.test_nonascii',
974
'bzrlib.tests.test_options',
975
'bzrlib.tests.test_osutils',
976
'bzrlib.tests.test_permissions',
977
'bzrlib.tests.test_plugins',
978
'bzrlib.tests.test_progress',
979
'bzrlib.tests.test_reconcile',
980
'bzrlib.tests.test_repository',
981
'bzrlib.tests.test_revision',
982
'bzrlib.tests.test_revisionnamespaces',
983
'bzrlib.tests.test_revprops',
984
'bzrlib.tests.test_rio',
985
'bzrlib.tests.test_sampler',
986
'bzrlib.tests.test_selftest',
987
'bzrlib.tests.test_setup',
988
'bzrlib.tests.test_sftp_transport',
989
'bzrlib.tests.test_smart_add',
990
'bzrlib.tests.test_source',
991
'bzrlib.tests.test_store',
992
'bzrlib.tests.test_symbol_versioning',
993
'bzrlib.tests.test_testament',
994
'bzrlib.tests.test_trace',
995
'bzrlib.tests.test_transactions',
996
'bzrlib.tests.test_transform',
997
'bzrlib.tests.test_transport',
998
'bzrlib.tests.test_tsort',
999
'bzrlib.tests.test_ui',
1000
'bzrlib.tests.test_upgrade',
1001
'bzrlib.tests.test_versionedfile',
1002
'bzrlib.tests.test_weave',
1003
'bzrlib.tests.test_whitebox',
1004
'bzrlib.tests.test_workingtree',
1005
'bzrlib.tests.test_xml',
1007
test_transport_implementations = [
1008
'bzrlib.tests.test_transport_implementations']
1010
TestCase.BZRPATH = osutils.pathjoin(
1011
osutils.realpath(osutils.dirname(bzrlib.__path__[0])), 'bzr')
1012
print '%10s: %s' % ('bzr', osutils.realpath(sys.argv[0]))
1013
print '%10s: %s' % ('bzrlib', bzrlib.__path__[0])
1016
# python2.4's TestLoader.loadTestsFromNames gives very poor
1017
# errors if it fails to load a named module - no indication of what's
1018
# actually wrong, just "no such module". We should probably override that
1019
# class, but for the moment just load them ourselves. (mbp 20051202)
1020
loader = TestLoader()
1021
from bzrlib.transport import TransportTestProviderAdapter
1022
adapter = TransportTestProviderAdapter()
1023
adapt_modules(test_transport_implementations, adapter, loader, suite)
1024
for mod_name in testmod_names:
1025
mod = _load_module_by_name(mod_name)
1026
suite.addTest(loader.loadTestsFromModule(mod))
1027
for package in packages_to_test():
1028
suite.addTest(package.test_suite())
1029
for m in MODULES_TO_TEST:
1030
suite.addTest(loader.loadTestsFromModule(m))
1031
for m in (MODULES_TO_DOCTEST):
1032
suite.addTest(DocTestSuite(m))
1033
for name, plugin in bzrlib.plugin.all_plugins().items():
1034
if getattr(plugin, 'test_suite', None) is not None:
1035
suite.addTest(plugin.test_suite())
3474
selftest_debug_flags = old_debug_flags
3477
def load_test_id_list(file_name):
3478
"""Load a test id list from a text file.
3480
The format is one test id by line. No special care is taken to impose
3481
strict rules, these test ids are used to filter the test suite so a test id
3482
that do not match an existing test will do no harm. This allows user to add
3483
comments, leave blank lines, etc.
3487
ftest = open(file_name, 'rt')
3489
if e.errno != errno.ENOENT:
3492
raise errors.NoSuchFile(file_name)
3494
for test_name in ftest.readlines():
3495
test_list.append(test_name.strip())
3500
def suite_matches_id_list(test_suite, id_list):
3501
"""Warns about tests not appearing or appearing more than once.
3503
:param test_suite: A TestSuite object.
3504
:param test_id_list: The list of test ids that should be found in
3507
:return: (absents, duplicates) absents is a list containing the test found
3508
in id_list but not in test_suite, duplicates is a list containing the
3509
test found multiple times in test_suite.
3511
When using a prefined test id list, it may occurs that some tests do not
3512
exist anymore or that some tests use the same id. This function warns the
3513
tester about potential problems in his workflow (test lists are volatile)
3514
or in the test suite itself (using the same id for several tests does not
3515
help to localize defects).
3517
# Build a dict counting id occurrences
3519
for test in iter_suite_tests(test_suite):
3521
tests[id] = tests.get(id, 0) + 1
3526
occurs = tests.get(id, 0)
3528
not_found.append(id)
3530
duplicates.append(id)
3532
return not_found, duplicates
3535
class TestIdList(object):
3536
"""Test id list to filter a test suite.
3538
Relying on the assumption that test ids are built as:
3539
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
3540
notation, this class offers methods to :
3541
- avoid building a test suite for modules not refered to in the test list,
3542
- keep only the tests listed from the module test suite.
3545
def __init__(self, test_id_list):
3546
# When a test suite needs to be filtered against us we compare test ids
3547
# for equality, so a simple dict offers a quick and simple solution.
3548
self.tests = dict().fromkeys(test_id_list, True)
3550
# While unittest.TestCase have ids like:
3551
# <module>.<class>.<method>[(<param+)],
3552
# doctest.DocTestCase can have ids like:
3555
# <module>.<function>
3556
# <module>.<class>.<method>
3558
# Since we can't predict a test class from its name only, we settle on
3559
# a simple constraint: a test id always begins with its module name.
3562
for test_id in test_id_list:
3563
parts = test_id.split('.')
3564
mod_name = parts.pop(0)
3565
modules[mod_name] = True
3567
mod_name += '.' + part
3568
modules[mod_name] = True
3569
self.modules = modules
3571
def refers_to(self, module_name):
3572
"""Is there tests for the module or one of its sub modules."""
3573
return self.modules.has_key(module_name)
3575
def includes(self, test_id):
3576
return self.tests.has_key(test_id)
3579
class TestPrefixAliasRegistry(registry.Registry):
3580
"""A registry for test prefix aliases.
3582
This helps implement shorcuts for the --starting-with selftest
3583
option. Overriding existing prefixes is not allowed but not fatal (a
3584
warning will be emitted).
3587
def register(self, key, obj, help=None, info=None,
3588
override_existing=False):
3589
"""See Registry.register.
3591
Trying to override an existing alias causes a warning to be emitted,
3592
not a fatal execption.
3595
super(TestPrefixAliasRegistry, self).register(
3596
key, obj, help=help, info=info, override_existing=False)
3598
actual = self.get(key)
3599
note('Test prefix alias %s is already used for %s, ignoring %s'
3600
% (key, actual, obj))
3602
def resolve_alias(self, id_start):
3603
"""Replace the alias by the prefix in the given string.
3605
Using an unknown prefix is an error to help catching typos.
3607
parts = id_start.split('.')
3609
parts[0] = self.get(parts[0])
3611
raise errors.BzrCommandError(
3612
'%s is not a known test prefix alias' % parts[0])
3613
return '.'.join(parts)
3616
test_prefix_alias_registry = TestPrefixAliasRegistry()
3617
"""Registry of test prefix aliases."""
3620
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
3621
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3622
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3624
# Obvious highest levels prefixes, feel free to add your own via a plugin
3625
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3626
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3627
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3628
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
3629
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3632
def _test_suite_testmod_names():
3633
"""Return the standard list of test module names to test."""
3636
'bzrlib.tests.blackbox',
3637
'bzrlib.tests.commands',
3638
'bzrlib.tests.per_branch',
3639
'bzrlib.tests.per_bzrdir',
3640
'bzrlib.tests.per_bzrdir_colo',
3641
'bzrlib.tests.per_foreign_vcs',
3642
'bzrlib.tests.per_interrepository',
3643
'bzrlib.tests.per_intertree',
3644
'bzrlib.tests.per_inventory',
3645
'bzrlib.tests.per_interbranch',
3646
'bzrlib.tests.per_lock',
3647
'bzrlib.tests.per_merger',
3648
'bzrlib.tests.per_transport',
3649
'bzrlib.tests.per_tree',
3650
'bzrlib.tests.per_pack_repository',
3651
'bzrlib.tests.per_repository',
3652
'bzrlib.tests.per_repository_chk',
3653
'bzrlib.tests.per_repository_reference',
3654
'bzrlib.tests.per_uifactory',
3655
'bzrlib.tests.per_versionedfile',
3656
'bzrlib.tests.per_workingtree',
3657
'bzrlib.tests.test__annotator',
3658
'bzrlib.tests.test__bencode',
3659
'bzrlib.tests.test__chk_map',
3660
'bzrlib.tests.test__dirstate_helpers',
3661
'bzrlib.tests.test__groupcompress',
3662
'bzrlib.tests.test__known_graph',
3663
'bzrlib.tests.test__rio',
3664
'bzrlib.tests.test__simple_set',
3665
'bzrlib.tests.test__static_tuple',
3666
'bzrlib.tests.test__walkdirs_win32',
3667
'bzrlib.tests.test_ancestry',
3668
'bzrlib.tests.test_annotate',
3669
'bzrlib.tests.test_api',
3670
'bzrlib.tests.test_atomicfile',
3671
'bzrlib.tests.test_bad_files',
3672
'bzrlib.tests.test_bisect_multi',
3673
'bzrlib.tests.test_branch',
3674
'bzrlib.tests.test_branchbuilder',
3675
'bzrlib.tests.test_btree_index',
3676
'bzrlib.tests.test_bugtracker',
3677
'bzrlib.tests.test_bundle',
3678
'bzrlib.tests.test_bzrdir',
3679
'bzrlib.tests.test__chunks_to_lines',
3680
'bzrlib.tests.test_cache_utf8',
3681
'bzrlib.tests.test_chk_map',
3682
'bzrlib.tests.test_chk_serializer',
3683
'bzrlib.tests.test_chunk_writer',
3684
'bzrlib.tests.test_clean_tree',
3685
'bzrlib.tests.test_cleanup',
3686
'bzrlib.tests.test_cmdline',
3687
'bzrlib.tests.test_commands',
3688
'bzrlib.tests.test_commit',
3689
'bzrlib.tests.test_commit_merge',
3690
'bzrlib.tests.test_config',
3691
'bzrlib.tests.test_conflicts',
3692
'bzrlib.tests.test_counted_lock',
3693
'bzrlib.tests.test_crash',
3694
'bzrlib.tests.test_decorators',
3695
'bzrlib.tests.test_delta',
3696
'bzrlib.tests.test_debug',
3697
'bzrlib.tests.test_deprecated_graph',
3698
'bzrlib.tests.test_diff',
3699
'bzrlib.tests.test_directory_service',
3700
'bzrlib.tests.test_dirstate',
3701
'bzrlib.tests.test_email_message',
3702
'bzrlib.tests.test_eol_filters',
3703
'bzrlib.tests.test_errors',
3704
'bzrlib.tests.test_export',
3705
'bzrlib.tests.test_extract',
3706
'bzrlib.tests.test_fetch',
3707
'bzrlib.tests.test_fifo_cache',
3708
'bzrlib.tests.test_filters',
3709
'bzrlib.tests.test_ftp_transport',
3710
'bzrlib.tests.test_foreign',
3711
'bzrlib.tests.test_generate_docs',
3712
'bzrlib.tests.test_generate_ids',
3713
'bzrlib.tests.test_globbing',
3714
'bzrlib.tests.test_gpg',
3715
'bzrlib.tests.test_graph',
3716
'bzrlib.tests.test_groupcompress',
3717
'bzrlib.tests.test_hashcache',
3718
'bzrlib.tests.test_help',
3719
'bzrlib.tests.test_hooks',
3720
'bzrlib.tests.test_http',
3721
'bzrlib.tests.test_http_response',
3722
'bzrlib.tests.test_https_ca_bundle',
3723
'bzrlib.tests.test_identitymap',
3724
'bzrlib.tests.test_ignores',
3725
'bzrlib.tests.test_index',
3726
'bzrlib.tests.test_import_tariff',
3727
'bzrlib.tests.test_info',
3728
'bzrlib.tests.test_inv',
3729
'bzrlib.tests.test_inventory_delta',
3730
'bzrlib.tests.test_knit',
3731
'bzrlib.tests.test_lazy_import',
3732
'bzrlib.tests.test_lazy_regex',
3733
'bzrlib.tests.test_lock',
3734
'bzrlib.tests.test_lockable_files',
3735
'bzrlib.tests.test_lockdir',
3736
'bzrlib.tests.test_log',
3737
'bzrlib.tests.test_lru_cache',
3738
'bzrlib.tests.test_lsprof',
3739
'bzrlib.tests.test_mail_client',
3740
'bzrlib.tests.test_matchers',
3741
'bzrlib.tests.test_memorytree',
3742
'bzrlib.tests.test_merge',
3743
'bzrlib.tests.test_merge3',
3744
'bzrlib.tests.test_merge_core',
3745
'bzrlib.tests.test_merge_directive',
3746
'bzrlib.tests.test_missing',
3747
'bzrlib.tests.test_msgeditor',
3748
'bzrlib.tests.test_multiparent',
3749
'bzrlib.tests.test_mutabletree',
3750
'bzrlib.tests.test_nonascii',
3751
'bzrlib.tests.test_options',
3752
'bzrlib.tests.test_osutils',
3753
'bzrlib.tests.test_osutils_encodings',
3754
'bzrlib.tests.test_pack',
3755
'bzrlib.tests.test_patch',
3756
'bzrlib.tests.test_patches',
3757
'bzrlib.tests.test_permissions',
3758
'bzrlib.tests.test_plugins',
3759
'bzrlib.tests.test_progress',
3760
'bzrlib.tests.test_read_bundle',
3761
'bzrlib.tests.test_reconcile',
3762
'bzrlib.tests.test_reconfigure',
3763
'bzrlib.tests.test_registry',
3764
'bzrlib.tests.test_remote',
3765
'bzrlib.tests.test_rename_map',
3766
'bzrlib.tests.test_repository',
3767
'bzrlib.tests.test_revert',
3768
'bzrlib.tests.test_revision',
3769
'bzrlib.tests.test_revisionspec',
3770
'bzrlib.tests.test_revisiontree',
3771
'bzrlib.tests.test_rio',
3772
'bzrlib.tests.test_rules',
3773
'bzrlib.tests.test_sampler',
3774
'bzrlib.tests.test_script',
3775
'bzrlib.tests.test_selftest',
3776
'bzrlib.tests.test_serializer',
3777
'bzrlib.tests.test_setup',
3778
'bzrlib.tests.test_sftp_transport',
3779
'bzrlib.tests.test_shelf',
3780
'bzrlib.tests.test_shelf_ui',
3781
'bzrlib.tests.test_smart',
3782
'bzrlib.tests.test_smart_add',
3783
'bzrlib.tests.test_smart_request',
3784
'bzrlib.tests.test_smart_transport',
3785
'bzrlib.tests.test_smtp_connection',
3786
'bzrlib.tests.test_source',
3787
'bzrlib.tests.test_ssh_transport',
3788
'bzrlib.tests.test_status',
3789
'bzrlib.tests.test_store',
3790
'bzrlib.tests.test_strace',
3791
'bzrlib.tests.test_subsume',
3792
'bzrlib.tests.test_switch',
3793
'bzrlib.tests.test_symbol_versioning',
3794
'bzrlib.tests.test_tag',
3795
'bzrlib.tests.test_testament',
3796
'bzrlib.tests.test_textfile',
3797
'bzrlib.tests.test_textmerge',
3798
'bzrlib.tests.test_timestamp',
3799
'bzrlib.tests.test_trace',
3800
'bzrlib.tests.test_transactions',
3801
'bzrlib.tests.test_transform',
3802
'bzrlib.tests.test_transport',
3803
'bzrlib.tests.test_transport_log',
3804
'bzrlib.tests.test_tree',
3805
'bzrlib.tests.test_treebuilder',
3806
'bzrlib.tests.test_tsort',
3807
'bzrlib.tests.test_tuned_gzip',
3808
'bzrlib.tests.test_ui',
3809
'bzrlib.tests.test_uncommit',
3810
'bzrlib.tests.test_upgrade',
3811
'bzrlib.tests.test_upgrade_stacked',
3812
'bzrlib.tests.test_urlutils',
3813
'bzrlib.tests.test_version',
3814
'bzrlib.tests.test_version_info',
3815
'bzrlib.tests.test_weave',
3816
'bzrlib.tests.test_whitebox',
3817
'bzrlib.tests.test_win32utils',
3818
'bzrlib.tests.test_workingtree',
3819
'bzrlib.tests.test_workingtree_4',
3820
'bzrlib.tests.test_wsgi',
3821
'bzrlib.tests.test_xml',
3825
def _test_suite_modules_to_doctest():
3826
"""Return the list of modules to doctest."""
3828
# GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
3832
'bzrlib.branchbuilder',
3833
'bzrlib.decorators',
3836
'bzrlib.iterablefile',
3840
'bzrlib.symbol_versioning',
3843
'bzrlib.version_info_formats.format_custom',
3847
def test_suite(keep_only=None, starting_with=None):
3848
"""Build and return TestSuite for the whole of bzrlib.
3850
:param keep_only: A list of test ids limiting the suite returned.
3852
:param starting_with: An id limiting the suite returned to the tests
3855
This function can be replaced if you need to change the default test
3856
suite on a global basis, but it is not encouraged.
3859
loader = TestUtil.TestLoader()
3861
if keep_only is not None:
3862
id_filter = TestIdList(keep_only)
3864
# We take precedence over keep_only because *at loading time* using
3865
# both options means we will load fewer tests for the same final result.
3866
def interesting_module(name):
3867
for start in starting_with:
3869
# Either the module name starts with the specified string
3870
name.startswith(start)
3871
# or it may contain tests starting with the specified string
3872
or start.startswith(name)
3876
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
3878
elif keep_only is not None:
3879
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
3880
def interesting_module(name):
3881
return id_filter.refers_to(name)
3884
loader = TestUtil.TestLoader()
3885
def interesting_module(name):
3886
# No filtering, all modules are interesting
3889
suite = loader.suiteClass()
3891
# modules building their suite with loadTestsFromModuleNames
3892
suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
3894
for mod in _test_suite_modules_to_doctest():
3895
if not interesting_module(mod):
3896
# No tests to keep here, move along
3899
# note that this really does mean "report only" -- doctest
3900
# still runs the rest of the examples
3901
doc_suite = doctest.DocTestSuite(mod,
3902
optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
3903
except ValueError, e:
3904
print '**failed to get doctest for: %s\n%s' % (mod, e)
3906
if len(doc_suite._tests) == 0:
3907
raise errors.BzrError("no doctests found in %s" % (mod,))
3908
suite.addTest(doc_suite)
3910
default_encoding = sys.getdefaultencoding()
3911
for name, plugin in bzrlib.plugin.plugins().items():
3912
if not interesting_module(plugin.module.__name__):
3914
plugin_suite = plugin.test_suite()
3915
# We used to catch ImportError here and turn it into just a warning,
3916
# but really if you don't have --no-plugins this should be a failure.
3917
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
3918
if plugin_suite is None:
3919
plugin_suite = plugin.load_plugin_tests(loader)
3920
if plugin_suite is not None:
3921
suite.addTest(plugin_suite)
3922
if default_encoding != sys.getdefaultencoding():
3923
bzrlib.trace.warning(
3924
'Plugin "%s" tried to reset default encoding to: %s', name,
3925
sys.getdefaultencoding())
3927
sys.setdefaultencoding(default_encoding)
3929
if keep_only is not None:
3930
# Now that the referred modules have loaded their tests, keep only the
3932
suite = filter_suite_by_id_list(suite, id_filter)
3933
# Do some sanity checks on the id_list filtering
3934
not_found, duplicates = suite_matches_id_list(suite, keep_only)
3936
# The tester has used both keep_only and starting_with, so he is
3937
# already aware that some tests are excluded from the list, there
3938
# is no need to tell him which.
3941
# Some tests mentioned in the list are not in the test suite. The
3942
# list may be out of date, report to the tester.
3943
for id in not_found:
3944
bzrlib.trace.warning('"%s" not found in the test suite', id)
3945
for id in duplicates:
3946
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
1039
def adapt_modules(mods_list, adapter, loader, suite):
    """Load every module named in mods_list, adapt its tests, fill suite.

    :param mods_list: An iterable of dotted module names to load.
    :param adapter: An object whose adapt(test) returns the adapted tests
        for a single test.
    :param loader: The loader used to collect the tests of each module.
    :param suite: The suite the adapted tests are added to.
    """
    for module_name in mods_list:
        # Import first, then let the loader collect the module's tests.
        loaded_module = _load_module_by_name(module_name)
        module_tests = loader.loadTestsFromModule(loaded_module)
        for single_test in iter_suite_tests(module_tests):
            adapted_tests = adapter.adapt(single_test)
            suite.addTests(adapted_tests)
1047
def _load_module_by_name(mod_name):
1048
parts = mod_name.split('.')
1049
module = __import__(mod_name)
1051
# for historical reasons python returns the top-level module even though
1052
# it loads the submodule; we need to walk down to get the one we want.
1054
module = getattr(module, parts.pop(0))
3951
def multiply_scenarios(scenarios_left, scenarios_right):
    """Combine two collections of scenarios into their cross product.

    :param scenarios_left: An iterable of (name, param_dict) pairs.
    :param scenarios_right: An iterable of (name, param_dict) pairs.
    :returns: A list with one scenario for every pairing of a left scenario
        with a right scenario.  Each combined name is 'left,right' and each
        combined dict holds the parameters of both sides; on a key clash
        the right hand value is kept.
    """
    product = []
    for left_name, left_dict in scenarios_left:
        for right_name, right_dict in scenarios_right:
            combined_name = '%s,%s' % (left_name, right_name)
            # Start from the left parameters and overlay the right ones so
            # that the right side wins when both define the same key.
            combined_params = dict(left_dict)
            combined_params.update(right_dict)
            product.append((combined_name, combined_params))
    return product
3965
def multiply_tests(tests, scenarios, result):
3966
"""Multiply tests by scenarios into result.
3968
This is the core workhorse for test parameterisation.
3970
Typically the load_tests() method for a per-implementation test suite will
3971
call multiply_tests and return the result.
3973
:param tests: The tests to parameterise.
3974
:param scenarios: The scenarios to apply: pairs of (scenario_name,
3975
scenario_param_dict).
3976
:param result: A TestSuite to add created tests to.
3978
This returns the passed in result TestSuite with the cross product of all
3979
the tests repeated once for each scenario. Each test is adapted by adding
3980
the scenario name at the end of its id(), and updating the test object's
3981
__dict__ with the scenario_param_dict.
3983
>>> import bzrlib.tests.test_sampler
3984
>>> r = multiply_tests(
3985
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
3986
... [('one', dict(param=1)),
3987
... ('two', dict(param=2))],
3989
>>> tests = list(iter_suite_tests(r))
3993
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
3999
for test in iter_suite_tests(tests):
4000
apply_scenarios(test, scenarios, result)
4004
def apply_scenarios(test, scenarios, result):
4005
"""Apply the scenarios in scenarios to test and add to result.
4007
:param test: The test to apply scenarios to.
4008
:param scenarios: An iterable of scenarios to apply to test.
4010
:seealso: apply_scenario
4012
for scenario in scenarios:
4013
result.addTest(apply_scenario(test, scenario))
4017
def apply_scenario(test, scenario):
    """Return a copy of test customised for a single scenario.

    :param test: A test to adapt.
    :param scenario: A tuple describing the scenario.
        The first element of the tuple is the name appended, in
        parentheses, to the id of the test.
        The second element is a dict containing attributes to set on the
        copied test.
    :return: The adapted test.
    """
    adapted = clone_test(test, "%s(%s)" % (test.id(), scenario[0]))
    attributes = scenario[1]
    for attr_name, attr_value in attributes.items():
        setattr(adapted, attr_name, attr_value)
    return adapted
4034
def clone_test(test, new_id):
    """Make a shallow copy of a test that reports a different id.

    :param test: The test to clone.
    :param new_id: The value the clone's id() must return.
    :return: The new test.
    """
    duplicate = copy(test)

    def _fixed_id():
        return new_id

    # Shadow the id() method on the instance only; the original test and
    # its class are left untouched.
    duplicate.id = _fixed_id
    return duplicate
4046
def permute_tests_for_extension(standard_tests, loader, py_module_name,
                                ext_module_name):
    """Run a set of tests against both implementations of a module.

    This is meant to be used inside a module's 'load_tests()' function. It
    will create up to 2 scenarios so that all tests in 'standard_tests' are
    run against each available implementation, with 'test.module' set to the
    module under test. See bzrlib.tests.test__chk_map.load_tests as an
    example.

    :param standard_tests: A test suite to permute
    :param loader: A TestLoader
    :param py_module_name: The python path to a python module that can always
        be loaded, and will be considered the 'python' implementation. (eg
        'bzrlib._chk_map_py')
    :param ext_module_name: The python path to an extension module. If the
        module cannot be loaded, a single test will be added, which notes that
        the module is not available. If it can be loaded, all standard_tests
        will be run against that module.
    :return: (suite, feature) suite is a test-suite that has all the permuted
        tests. feature is the Feature object that can be used to determine if
        the module is available.
    """
    # A non-empty fromlist makes __import__ return the named submodule
    # rather than the top-level package.
    python_impl = __import__(py_module_name, {}, {}, ['NO_SUCH_ATTRIB'])
    scenarios = [('python', {'module': python_impl})]
    suite = loader.suiteClass()
    feature = ModuleAvailableFeature(ext_module_name)
    if not feature.available():
        # the compiled module isn't available, so we add a test that
        # requires the feature, making its absence visible in the results
        class FailWithoutFeature(TestCase):
            def test_fail(self):
                self.requireFeature(feature)
        suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
    else:
        scenarios.append(('C', {'module': feature.module}))
    permuted = multiply_tests(standard_tests, scenarios, suite)
    return permuted, feature
4087
def _rmtree_temp_dir(dirname, test_id=None):
4088
# If LANG=C we probably have created some bogus paths
4089
# which rmtree(unicode) will fail to delete
4090
# so make sure we are using rmtree(str) to delete everything
4091
# except on win32, where rmtree(str) will fail
4092
# since it doesn't have the property of byte-stream paths
4093
# (they are either ascii or mbcs)
4094
if sys.platform == 'win32':
4095
# make sure we are using the unicode win32 api
4096
dirname = unicode(dirname)
4098
dirname = dirname.encode(sys.getfilesystemencoding())
4100
osutils.rmtree(dirname)
4102
# We don't want to fail here because some useful display will be lost
4103
# otherwise. Polluting the tmp dir is bad, but not giving all the
4104
# possible info to the test runner is even worse.
4106
ui.ui_factory.clear_term()
4107
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4108
# Ugly, but the last thing we want here is fail, so bear with it.
4109
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4110
).encode('ascii', 'replace')
4111
sys.stderr.write('Unable to remove testing dir %s\n%s'
4112
% (os.path.basename(dirname), printable_e))
4115
class Feature(object):
4116
"""An operating system Feature."""
4119
self._available = None
4121
def available(self):
4122
"""Is the feature available?
4124
:return: True if the feature is available.
4126
if self._available is None:
4127
self._available = self._probe()
4128
return self._available
4131
"""Implement this method in concrete features.
4133
:return: True if the feature is available.
4135
raise NotImplementedError
4138
if getattr(self, 'feature_name', None):
4139
return self.feature_name()
4140
return self.__class__.__name__
4143
class _SymlinkFeature(Feature):
4146
return osutils.has_symlinks()
4148
def feature_name(self):
4151
SymlinkFeature = _SymlinkFeature()
4154
class _HardlinkFeature(Feature):
4157
return osutils.has_hardlinks()
4159
def feature_name(self):
4162
HardlinkFeature = _HardlinkFeature()
4165
class _OsFifoFeature(Feature):
4168
return getattr(os, 'mkfifo', None)
4170
def feature_name(self):
4171
return 'filesystem fifos'
4173
OsFifoFeature = _OsFifoFeature()
4176
class _UnicodeFilenameFeature(Feature):
4177
"""Does the filesystem support Unicode filenames?"""
4181
# Check for character combinations unlikely to be covered by any
4182
# single non-unicode encoding. We use the characters
4183
# - greek small letter alpha (U+03B1) and
4184
# - braille pattern dots-123456 (U+283F).
4185
os.stat(u'\u03b1\u283f')
4186
except UnicodeEncodeError:
4188
except (IOError, OSError):
4189
# The filesystem allows the Unicode filename but the file doesn't
4193
# The filesystem allows the Unicode filename and the file exists,
4197
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4200
class _CompatabilityThunkFeature(Feature):
4201
"""This feature is just a thunk to another feature.
4203
It issues a deprecation warning if it is accessed, to let you know that you
4204
should really use a different feature.
4207
def __init__(self, dep_version, module, name,
4208
replacement_name, replacement_module=None):
4209
super(_CompatabilityThunkFeature, self).__init__()
4210
self._module = module
4211
if replacement_module is None:
4212
replacement_module = module
4213
self._replacement_module = replacement_module
4215
self._replacement_name = replacement_name
4216
self._dep_version = dep_version
4217
self._feature = None
4220
if self._feature is None:
4221
depr_msg = self._dep_version % ('%s.%s'
4222
% (self._module, self._name))
4223
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4224
self._replacement_name)
4225
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4226
# Import the new feature and use it as a replacement for the
4228
mod = __import__(self._replacement_module, {}, {},
4229
[self._replacement_name])
4230
self._feature = getattr(mod, self._replacement_name)
4234
return self._feature._probe()
4237
class ModuleAvailableFeature(Feature):
4238
"""This is a feature that describes a module we want to be available.
4240
Declare the name of the module in __init__(), and then after probing, the
4241
module will be available as 'self.module'.
4243
:ivar module: The module if it is available, else None.
4246
def __init__(self, module_name):
4247
super(ModuleAvailableFeature, self).__init__()
4248
self.module_name = module_name
4252
self._module = __import__(self.module_name, {}, {}, [''])
4259
if self.available(): # Make sure the probe has been done
4263
def feature_name(self):
4264
return self.module_name
4267
# This is kept here for compatibility, it is recommended to use
4268
# 'bzrlib.tests.features.paramiko' instead
4269
ParamikoFeature = _CompatabilityThunkFeature(
4270
deprecated_in((2,1,0)),
4271
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4274
def probe_unicode_in_user_encoding():
    """Find a non-ascii unicode string the user encoding can represent.

    Several candidate strings are tried in turn and the first one that
    encodes successfully is returned, for use in unicode-aware tests.

    :return: (unicode value, encoded plain string value) or (None, None)
    """
    candidates = [u'm\xb5', u'\xe1', u'\u0410']
    for candidate in candidates:
        try:
            encoded = candidate.encode(osutils.get_user_encoding())
        except UnicodeEncodeError:
            # Not representable in the user encoding, try the next one
            continue
        return candidate, encoded
    return None, None
4292
def probe_bad_non_ascii(encoding):
4293
"""Try to find [bad] character with code [128..255]
4294
that cannot be decoded to unicode in some encoding.
4295
Return None if all non-ascii characters are valid
4298
for i in xrange(128, 256):
4301
char.decode(encoding)
4302
except UnicodeDecodeError:
4307
class _HTTPSServerFeature(Feature):
4308
"""Some tests want an https Server, check if one is available.
4310
Right now, the only way this is available is under python2.6 which provides
4321
def feature_name(self):
4322
return 'HTTPSServer'
4325
HTTPSServerFeature = _HTTPSServerFeature()
4328
class _UnicodeFilename(Feature):
4329
"""Does the filesystem support Unicode filenames?"""
4334
except UnicodeEncodeError:
4336
except (IOError, OSError):
4337
# The filesystem allows the Unicode filename but the file doesn't
4341
# The filesystem allows the Unicode filename and the file exists,
4345
UnicodeFilename = _UnicodeFilename()
4348
class _UTF8Filesystem(Feature):
4349
"""Is the filesystem UTF-8?"""
4352
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4356
UTF8Filesystem = _UTF8Filesystem()
4359
class _BreakinFeature(Feature):
4360
"""Does this platform support the breakin feature?"""
4363
from bzrlib import breakin
4364
if breakin.determine_signal() is None:
4366
if sys.platform == 'win32':
4367
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4368
# We trigger SIGBREAK via a Console api so we need ctypes to
4369
# access the function
4376
def feature_name(self):
4377
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4380
BreakinFeature = _BreakinFeature()
4383
class _CaseInsCasePresFilenameFeature(Feature):
4384
"""Is the file-system case insensitive, but case-preserving?"""
4387
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4389
# first check truly case-preserving for created files, then check
4390
# case insensitive when opening existing files.
4391
name = osutils.normpath(name)
4392
base, rel = osutils.split(name)
4393
found_rel = osutils.canonical_relpath(base, name)
4394
return (found_rel == rel
4395
and os.path.isfile(name.upper())
4396
and os.path.isfile(name.lower()))
4401
def feature_name(self):
4402
return "case-insensitive case-preserving filesystem"
4404
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4407
class _CaseInsensitiveFilesystemFeature(Feature):
4408
"""Check if underlying filesystem is case-insensitive but *not* case
4411
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4412
# more likely to be case preserving, so this case is rare.
4415
if CaseInsCasePresFilenameFeature.available():
4418
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4419
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4420
TestCaseWithMemoryTransport.TEST_ROOT = root
4422
root = TestCaseWithMemoryTransport.TEST_ROOT
4423
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4425
name_a = osutils.pathjoin(tdir, 'a')
4426
name_A = osutils.pathjoin(tdir, 'A')
4428
result = osutils.isdir(name_A)
4429
_rmtree_temp_dir(tdir)
4432
def feature_name(self):
4433
return 'case-insensitive filesystem'
4435
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4438
class _CaseSensitiveFilesystemFeature(Feature):
4441
if CaseInsCasePresFilenameFeature.available():
4443
elif CaseInsensitiveFilesystemFeature.available():
4448
def feature_name(self):
4449
return 'case-sensitive filesystem'
4451
# new coding style is for feature instances to be lowercase
4452
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4455
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4456
SubUnitFeature = _CompatabilityThunkFeature(
4457
deprecated_in((2,1,0)),
4458
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4459
# Only define SubUnitBzrRunner if subunit is available.
4461
from subunit import TestProtocolClient
4462
from subunit.test_results import AutoTimingTestResultDecorator
4463
class SubUnitBzrRunner(TextTestRunner):
4464
def run(self, test):
4465
result = AutoTimingTestResultDecorator(
4466
TestProtocolClient(self.stream))
4472
class _PosixPermissionsFeature(Feature):
4476
# create temporary file and check if specified perms are maintained.
4479
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4480
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4483
os.chmod(name, write_perms)
4485
read_perms = os.stat(name).st_mode & 0777
4487
return (write_perms == read_perms)
4489
return (os.name == 'posix') and has_perms()
4491
def feature_name(self):
4492
return 'POSIX permissions support'
4494
posix_permissions_feature = _PosixPermissionsFeature()