1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
# TODO: Perhaps there should be an API to find out if bzr running under the
19
# test suite -- some plugins might want to avoid making intrusive changes if
20
# this is the case. However, we want behaviour under to test to diverge as
21
# little as possible, so this should be used rarely if it's added at all.
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
# NOTE: Some classes in here use camelCaseNaming() rather than
25
# underscore_naming(). That's for consistency with unittest; it's not the
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
# new assertFoo() methods.
32
from cStringIO import StringIO
39
from pprint import pformat
44
from subprocess import Popen, PIPE, STDOUT
70
import bzrlib.commands
71
import bzrlib.timestamp
73
import bzrlib.inventory
74
import bzrlib.iterablefile
79
# lsprof not available
81
from bzrlib.merge import merge_inner
84
from bzrlib.smart import client, request, server
86
from bzrlib import symbol_versioning
87
from bzrlib.symbol_versioning import (
94
from bzrlib.transport import get_transport, pathfilter
95
import bzrlib.transport
96
from bzrlib.transport.local import LocalURLServer
97
from bzrlib.transport.memory import MemoryServer
98
from bzrlib.transport.readonly import ReadonlyServer
99
from bzrlib.trace import mutter, note
100
from bzrlib.tests import TestUtil
101
from bzrlib.tests.http_server import HttpServer
102
from bzrlib.tests.TestUtil import (
106
from bzrlib.tests.treeshape import build_tree_contents
107
from bzrlib.ui import NullProgressView
108
from bzrlib.ui.text import TextUIFactory
109
import bzrlib.version_info_formats.format_custom
110
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
112
# Mark this python module as being part of the implementation
113
# of unittest: this gives us better tracebacks where the last
114
# shown frame is the test code, not our assertXYZ.
117
default_transport = LocalURLServer
119
# Subunit result codes, defined here to prevent a hard dependency on subunit.
124
class ExtendedTestResult(unittest._TextTestResult):
125
"""Accepts, reports and accumulates the results of running tests.
127
Compared to the unittest version this class adds support for
128
profiling, benchmarking, stopping as soon as a test fails, and
129
skipping tests. There are further-specialized subclasses for
130
different types of display.
132
When a test finishes, in whatever way, it calls one of the addSuccess,
133
addFailure or addError classes. These in turn may redirect to a more
134
specific case for the special test results supported by our extended
137
Note that just one of these objects is fed the results from many tests.
142
def __init__(self, stream, descriptions, verbosity,
146
"""Construct new TestResult.
148
:param bench_history: Optionally, a writable file object to accumulate
151
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
152
if bench_history is not None:
153
from bzrlib.version import _get_bzr_source_tree
154
src_tree = _get_bzr_source_tree()
157
revision_id = src_tree.get_parent_ids()[0]
159
# XXX: if this is a brand new tree, do the same as if there
163
# XXX: If there's no branch, what should we do?
165
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
166
self._bench_history = bench_history
167
self.ui = ui.ui_factory
170
self.failure_count = 0
171
self.known_failure_count = 0
173
self.not_applicable_count = 0
174
self.unsupported = {}
176
self._overall_start_time = time.time()
177
self._strict = strict
179
def stopTestRun(self):
182
stopTime = time.time()
183
timeTaken = stopTime - self.startTime
185
self.stream.writeln(self.separator2)
186
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
187
run, run != 1 and "s" or "", timeTaken))
188
self.stream.writeln()
189
if not self.wasSuccessful():
190
self.stream.write("FAILED (")
191
failed, errored = map(len, (self.failures, self.errors))
193
self.stream.write("failures=%d" % failed)
195
if failed: self.stream.write(", ")
196
self.stream.write("errors=%d" % errored)
197
if self.known_failure_count:
198
if failed or errored: self.stream.write(", ")
199
self.stream.write("known_failure_count=%d" %
200
self.known_failure_count)
201
self.stream.writeln(")")
203
if self.known_failure_count:
204
self.stream.writeln("OK (known_failures=%d)" %
205
self.known_failure_count)
207
self.stream.writeln("OK")
208
if self.skip_count > 0:
209
skipped = self.skip_count
210
self.stream.writeln('%d test%s skipped' %
211
(skipped, skipped != 1 and "s" or ""))
213
for feature, count in sorted(self.unsupported.items()):
214
self.stream.writeln("Missing feature '%s' skipped %d tests." %
217
ok = self.wasStrictlySuccessful()
219
ok = self.wasSuccessful()
220
if TestCase._first_thread_leaker_id:
222
'%s is leaking threads among %d leaking tests.\n' % (
223
TestCase._first_thread_leaker_id,
224
TestCase._leaking_threads_tests))
225
# We don't report the main thread as an active one.
227
'%d non-main threads were left active in the end.\n'
228
% (TestCase._active_threads - 1))
230
def _extractBenchmarkTime(self, testCase):
231
"""Add a benchmark time for the current test case."""
232
return getattr(testCase, "_benchtime", None)
234
def _elapsedTestTimeString(self):
235
"""Return a time string for the overall time the current test has taken."""
236
return self._formatTime(time.time() - self._start_time)
238
def _testTimeString(self, testCase):
239
benchmark_time = self._extractBenchmarkTime(testCase)
240
if benchmark_time is not None:
241
return self._formatTime(benchmark_time) + "*"
243
return self._elapsedTestTimeString()
245
def _formatTime(self, seconds):
246
"""Format seconds as milliseconds with leading spaces."""
247
# some benchmarks can take thousands of seconds to run, so we need 8
249
return "%8dms" % (1000 * seconds)
251
def _shortened_test_description(self, test):
253
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
256
def startTest(self, test):
257
unittest.TestResult.startTest(self, test)
260
self.report_test_start(test)
261
test.number = self.count
262
self._recordTestStartTime()
264
def startTests(self):
266
if getattr(sys, 'frozen', None) is None:
267
bzr_path = osutils.realpath(sys.argv[0])
269
bzr_path = sys.executable
271
'testing: %s\n' % (bzr_path,))
274
bzrlib.__path__[0],))
276
' bzr-%s python-%s %s\n' % (
277
bzrlib.version_string,
278
bzrlib._format_version_tuple(sys.version_info),
279
platform.platform(aliased=1),
281
self.stream.write('\n')
283
def _recordTestStartTime(self):
284
"""Record that a test has started."""
285
self._start_time = time.time()
287
def _cleanupLogFile(self, test):
288
# We can only do this if we have one of our TestCases, not if
290
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
291
if setKeepLogfile is not None:
294
def addError(self, test, err):
295
"""Tell result that test finished with an error.
297
Called from the TestCase run() method when the test
298
fails with an unexpected error.
300
self._testConcluded(test)
301
if isinstance(err[1], TestNotApplicable):
302
return self._addNotApplicable(test, err)
303
elif isinstance(err[1], UnavailableFeature):
304
return self.addNotSupported(test, err[1].args[0])
307
unittest.TestResult.addError(self, test, err)
308
self.error_count += 1
309
self.report_error(test, err)
312
self._cleanupLogFile(test)
314
def addFailure(self, test, err):
315
"""Tell result that test failed.
317
Called from the TestCase run() method when the test
318
fails because e.g. an assert() method failed.
320
self._testConcluded(test)
321
if isinstance(err[1], KnownFailure):
322
return self._addKnownFailure(test, err)
325
unittest.TestResult.addFailure(self, test, err)
326
self.failure_count += 1
327
self.report_failure(test, err)
330
self._cleanupLogFile(test)
332
def addSuccess(self, test):
333
"""Tell result that test completed successfully.
335
Called from the TestCase run()
337
self._testConcluded(test)
338
if self._bench_history is not None:
339
benchmark_time = self._extractBenchmarkTime(test)
340
if benchmark_time is not None:
341
self._bench_history.write("%s %s\n" % (
342
self._formatTime(benchmark_time),
344
self.report_success(test)
345
self._cleanupLogFile(test)
346
unittest.TestResult.addSuccess(self, test)
347
test._log_contents = ''
349
def _testConcluded(self, test):
350
"""Common code when a test has finished.
352
Called regardless of whether it succeded, failed, etc.
356
def _addKnownFailure(self, test, err):
357
self.known_failure_count += 1
358
self.report_known_failure(test, err)
360
def addNotSupported(self, test, feature):
361
"""The test will not be run because of a missing feature.
363
# this can be called in two different ways: it may be that the
364
# test started running, and then raised (through addError)
365
# UnavailableFeature. Alternatively this method can be called
366
# while probing for features before running the tests; in that
367
# case we will see startTest and stopTest, but the test will never
369
self.unsupported.setdefault(str(feature), 0)
370
self.unsupported[str(feature)] += 1
371
self.report_unsupported(test, feature)
373
def addSkip(self, test, reason):
374
"""A test has not run for 'reason'."""
376
self.report_skip(test, reason)
378
def _addNotApplicable(self, test, skip_excinfo):
379
if isinstance(skip_excinfo[1], TestNotApplicable):
380
self.not_applicable_count += 1
381
self.report_not_applicable(test, skip_excinfo)
384
except KeyboardInterrupt:
387
self.addError(test, test.exc_info())
389
# seems best to treat this as success from point-of-view of unittest
390
# -- it actually does nothing so it barely matters :)
391
unittest.TestResult.addSuccess(self, test)
392
test._log_contents = ''
394
def printErrorList(self, flavour, errors):
395
for test, err in errors:
396
self.stream.writeln(self.separator1)
397
self.stream.write("%s: " % flavour)
398
self.stream.writeln(self.getDescription(test))
399
if getattr(test, '_get_log', None) is not None:
400
log_contents = test._get_log()
402
self.stream.write('\n')
404
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
405
self.stream.write('\n')
406
self.stream.write(log_contents)
407
self.stream.write('\n')
409
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
410
self.stream.write('\n')
411
self.stream.writeln(self.separator2)
412
self.stream.writeln("%s" % err)
414
def _post_mortem(self):
415
"""Start a PDB post mortem session."""
416
if os.environ.get('BZR_TEST_PDB', None):
417
import pdb;pdb.post_mortem()
419
def progress(self, offset, whence):
420
"""The test is adjusting the count of tests to run."""
421
if whence == SUBUNIT_SEEK_SET:
422
self.num_tests = offset
423
elif whence == SUBUNIT_SEEK_CUR:
424
self.num_tests += offset
426
raise errors.BzrError("Unknown whence %r" % whence)
428
def report_cleaning_up(self):
431
def startTestRun(self):
432
self.startTime = time.time()
434
def report_success(self, test):
437
def wasStrictlySuccessful(self):
438
if self.unsupported or self.known_failure_count:
440
return self.wasSuccessful()
443
class TextTestResult(ExtendedTestResult):
444
"""Displays progress and results of tests in text form"""
446
def __init__(self, stream, descriptions, verbosity,
451
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
452
bench_history, strict)
453
# We no longer pass them around, but just rely on the UIFactory stack
456
warnings.warn("Passing pb to TextTestResult is deprecated")
457
self.pb = self.ui.nested_progress_bar()
458
self.pb.show_pct = False
459
self.pb.show_spinner = False
460
self.pb.show_eta = False,
461
self.pb.show_count = False
462
self.pb.show_bar = False
463
self.pb.update_latency = 0
464
self.pb.show_transport_activity = False
466
def stopTestRun(self):
467
# called when the tests that are going to run have run
470
super(TextTestResult, self).stopTestRun()
472
def startTestRun(self):
473
super(TextTestResult, self).startTestRun()
474
self.pb.update('[test 0/%d] Starting' % (self.num_tests))
476
def printErrors(self):
477
# clear the pb to make room for the error listing
479
super(TextTestResult, self).printErrors()
481
def _progress_prefix_text(self):
482
# the longer this text, the less space we have to show the test
484
a = '[%d' % self.count # total that have been run
485
# tests skipped as known not to be relevant are not important enough
487
## if self.skip_count:
488
## a += ', %d skip' % self.skip_count
489
## if self.known_failure_count:
490
## a += '+%dX' % self.known_failure_count
492
a +='/%d' % self.num_tests
494
runtime = time.time() - self._overall_start_time
496
a += '%dm%ds' % (runtime / 60, runtime % 60)
500
a += ', %d err' % self.error_count
501
if self.failure_count:
502
a += ', %d fail' % self.failure_count
503
# if self.unsupported:
504
# a += ', %d missing' % len(self.unsupported)
508
def report_test_start(self, test):
511
self._progress_prefix_text()
513
+ self._shortened_test_description(test))
515
def _test_description(self, test):
516
return self._shortened_test_description(test)
518
def report_error(self, test, err):
519
ui.ui_factory.note('ERROR: %s\n %s\n' % (
520
self._test_description(test),
524
def report_failure(self, test, err):
525
ui.ui_factory.note('FAIL: %s\n %s\n' % (
526
self._test_description(test),
530
def report_known_failure(self, test, err):
531
ui.ui_factory.note('XFAIL: %s\n%s\n' % (
532
self._test_description(test), err[1]))
534
def report_skip(self, test, reason):
537
def report_not_applicable(self, test, skip_excinfo):
540
def report_unsupported(self, test, feature):
541
"""test cannot be run because feature is missing."""
543
def report_cleaning_up(self):
544
self.pb.update('Cleaning up')
547
class VerboseTestResult(ExtendedTestResult):
548
"""Produce long output, with one line per test run plus times"""
550
def _ellipsize_to_right(self, a_string, final_width):
551
"""Truncate and pad a string, keeping the right hand side"""
552
if len(a_string) > final_width:
553
result = '...' + a_string[3-final_width:]
556
return result.ljust(final_width)
558
def startTestRun(self):
559
super(VerboseTestResult, self).startTestRun()
560
self.stream.write('running %d tests...\n' % self.num_tests)
562
def report_test_start(self, test):
564
name = self._shortened_test_description(test)
565
# width needs space for 6 char status, plus 1 for slash, plus an
566
# 11-char time string, plus a trailing blank
567
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
568
self.stream.write(self._ellipsize_to_right(name,
569
osutils.terminal_width()-18))
572
def _error_summary(self, err):
574
return '%s%s' % (indent, err[1])
576
def report_error(self, test, err):
577
self.stream.writeln('ERROR %s\n%s'
578
% (self._testTimeString(test),
579
self._error_summary(err)))
581
def report_failure(self, test, err):
582
self.stream.writeln(' FAIL %s\n%s'
583
% (self._testTimeString(test),
584
self._error_summary(err)))
586
def report_known_failure(self, test, err):
587
self.stream.writeln('XFAIL %s\n%s'
588
% (self._testTimeString(test),
589
self._error_summary(err)))
591
def report_success(self, test):
592
self.stream.writeln(' OK %s' % self._testTimeString(test))
593
for bench_called, stats in getattr(test, '_benchcalls', []):
594
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
595
stats.pprint(file=self.stream)
596
# flush the stream so that we get smooth output. This verbose mode is
597
# used to show the output in PQM.
600
def report_skip(self, test, reason):
601
self.stream.writeln(' SKIP %s\n%s'
602
% (self._testTimeString(test), reason))
604
def report_not_applicable(self, test, skip_excinfo):
605
self.stream.writeln(' N/A %s\n%s'
606
% (self._testTimeString(test),
607
self._error_summary(skip_excinfo)))
609
def report_unsupported(self, test, feature):
610
"""test cannot be run because feature is missing."""
611
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
612
%(self._testTimeString(test), feature))
615
class TextTestRunner(object):
616
stop_on_failure = False
624
result_decorators=None,
626
"""Create a TextTestRunner.
628
:param result_decorators: An optional list of decorators to apply
629
to the result object being used by the runner. Decorators are
630
applied left to right - the first element in the list is the
633
self.stream = unittest._WritelnDecorator(stream)
634
self.descriptions = descriptions
635
self.verbosity = verbosity
636
self._bench_history = bench_history
637
self._strict = strict
638
self._result_decorators = result_decorators or []
641
"Run the given test case or test suite."
642
if self.verbosity == 1:
643
result_class = TextTestResult
644
elif self.verbosity >= 2:
645
result_class = VerboseTestResult
646
original_result = result_class(self.stream,
649
bench_history=self._bench_history,
652
# Signal to result objects that look at stop early policy to stop,
653
original_result.stop_early = self.stop_on_failure
654
result = original_result
655
for decorator in self._result_decorators:
656
result = decorator(result)
657
result.stop_early = self.stop_on_failure
663
if isinstance(test, testtools.ConcurrentTestSuite):
664
# We need to catch bzr specific behaviors
665
result = BZRTransformingResult(result)
666
result.startTestRun()
671
# higher level code uses our extended protocol to determine
672
# what exit code to give.
673
return original_result
676
def iter_suite_tests(suite):
677
"""Return all tests in a suite, recursing through nested suites"""
678
if isinstance(suite, unittest.TestCase):
680
elif isinstance(suite, unittest.TestSuite):
682
for r in iter_suite_tests(item):
685
raise Exception('unknown type %r for object %r'
686
% (type(suite), suite))
689
class TestSkipped(Exception):
690
"""Indicates that a test was intentionally skipped, rather than failing."""
693
class TestNotApplicable(TestSkipped):
694
"""A test is not applicable to the situation where it was run.
696
This is only normally raised by parameterized tests, if they find that
697
the instance they're constructed upon does not support one aspect
702
class KnownFailure(AssertionError):
703
"""Indicates that a test failed in a precisely expected manner.
705
Such failures dont block the whole test suite from passing because they are
706
indicators of partially completed code or of future work. We have an
707
explicit error for them so that we can ensure that they are always visible:
708
KnownFailures are always shown in the output of bzr selftest.
712
class UnavailableFeature(Exception):
713
"""A feature required for this test was not available.
715
The feature should be used to construct the exception.
719
class CommandFailed(Exception):
723
class StringIOWrapper(object):
724
"""A wrapper around cStringIO which just adds an encoding attribute.
726
Internally we can check sys.stdout to see what the output encoding
727
should be. However, cStringIO has no encoding attribute that we can
728
set. So we wrap it instead.
733
def __init__(self, s=None):
735
self.__dict__['_cstring'] = StringIO(s)
737
self.__dict__['_cstring'] = StringIO()
739
def __getattr__(self, name, getattr=getattr):
740
return getattr(self.__dict__['_cstring'], name)
742
def __setattr__(self, name, val):
743
if name == 'encoding':
744
self.__dict__['encoding'] = val
746
return setattr(self._cstring, name, val)
749
class TestUIFactory(TextUIFactory):
750
"""A UI Factory for testing.
752
Hide the progress bar but emit note()s.
754
Allows get_password to be tested without real tty attached.
756
See also CannedInputUIFactory which lets you provide programmatic input in
759
# TODO: Capture progress events at the model level and allow them to be
760
# observed by tests that care.
762
# XXX: Should probably unify more with CannedInputUIFactory or a
763
# particular configuration of TextUIFactory, or otherwise have a clearer
764
# idea of how they're supposed to be different.
765
# See https://bugs.edge.launchpad.net/bzr/+bug/408213
767
def __init__(self, stdout=None, stderr=None, stdin=None):
768
if stdin is not None:
769
# We use a StringIOWrapper to be able to test various
770
# encodings, but the user is still responsible to
771
# encode the string and to set the encoding attribute
772
# of StringIOWrapper.
773
stdin = StringIOWrapper(stdin)
774
super(TestUIFactory, self).__init__(stdin, stdout, stderr)
776
def get_non_echoed_password(self):
777
"""Get password from stdin without trying to handle the echo mode"""
778
password = self.stdin.readline()
781
if password[-1] == '\n':
782
password = password[:-1]
785
def make_progress_view(self):
786
return NullProgressView()
789
class TestCase(unittest.TestCase):
790
"""Base class for bzr unit tests.
792
Tests that need access to disk resources should subclass
793
TestCaseInTempDir not TestCase.
795
Error and debug log messages are redirected from their usual
796
location into a temporary file, the contents of which can be
797
retrieved by _get_log(). We use a real OS file, not an in-memory object,
798
so that it can also capture file IO. When the test completes this file
799
is read into memory and removed from disk.
801
There are also convenience functions to invoke bzr's command-line
802
routine, and to build and check bzr trees.
804
In addition to the usual method of overriding tearDown(), this class also
805
allows subclasses to register functions into the _cleanups list, which is
806
run in order as the object is torn down. It's less likely this will be
807
accidentally overlooked.
810
_active_threads = None
811
_leaking_threads_tests = 0
812
_first_thread_leaker_id = None
813
_log_file_name = None
815
_keep_log_file = False
816
# record lsprof data when performing benchmark calls.
817
_gather_lsprof_in_benchmarks = False
818
attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
819
'_log_contents', '_log_file_name', '_benchtime',
820
'_TestCase__testMethodName', '_TestCase__testMethodDoc',)
822
def __init__(self, methodName='testMethod'):
823
super(TestCase, self).__init__(methodName)
825
self._bzr_test_setUp_run = False
826
self._bzr_test_tearDown_run = False
827
self._directory_isolation = True
830
unittest.TestCase.setUp(self)
831
self._bzr_test_setUp_run = True
832
self._cleanEnvironment()
835
self._benchcalls = []
836
self._benchtime = None
838
self._track_transports()
840
self._clear_debug_flags()
841
TestCase._active_threads = threading.activeCount()
842
self.addCleanup(self._check_leaked_threads)
847
pdb.Pdb().set_trace(sys._getframe().f_back)
849
def _check_leaked_threads(self):
850
active = threading.activeCount()
851
leaked_threads = active - TestCase._active_threads
852
TestCase._active_threads = active
853
# If some tests make the number of threads *decrease*, we'll consider
854
# that they are just observing old threads dieing, not agressively kill
855
# random threads. So we don't report these tests as leaking. The risk
856
# is that we have false positives that way (the test see 2 threads
857
# going away but leak one) but it seems less likely than the actual
858
# false positives (the test see threads going away and does not leak).
859
if leaked_threads > 0:
860
TestCase._leaking_threads_tests += 1
861
if TestCase._first_thread_leaker_id is None:
862
TestCase._first_thread_leaker_id = self.id()
864
def _clear_debug_flags(self):
865
"""Prevent externally set debug flags affecting tests.
867
Tests that want to use debug flags can just set them in the
868
debug_flags set during setup/teardown.
870
self._preserved_debug_flags = set(debug.debug_flags)
871
if 'allow_debug' not in selftest_debug_flags:
872
debug.debug_flags.clear()
873
if 'disable_lock_checks' not in selftest_debug_flags:
874
debug.debug_flags.add('strict_locks')
875
self.addCleanup(self._restore_debug_flags)
877
def _clear_hooks(self):
878
# prevent hooks affecting tests
879
self._preserved_hooks = {}
880
for key, factory in hooks.known_hooks.items():
881
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
882
current_hooks = hooks.known_hooks_key_to_object(key)
883
self._preserved_hooks[parent] = (name, current_hooks)
884
self.addCleanup(self._restoreHooks)
885
for key, factory in hooks.known_hooks.items():
886
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
887
setattr(parent, name, factory())
888
# this hook should always be installed
889
request._install_hook()
891
def disable_directory_isolation(self):
892
"""Turn off directory isolation checks."""
893
self._directory_isolation = False
895
def enable_directory_isolation(self):
896
"""Enable directory isolation checks."""
897
self._directory_isolation = True
899
def _silenceUI(self):
900
"""Turn off UI for duration of test"""
901
# by default the UI is off; tests can turn it on if they want it.
902
saved = ui.ui_factory
904
ui.ui_factory = saved
905
ui.ui_factory = ui.SilentUIFactory()
906
self.addCleanup(_restore)
908
def _check_locks(self):
909
"""Check that all lock take/release actions have been paired."""
910
# We always check for mismatched locks. If a mismatch is found, we
911
# fail unless -Edisable_lock_checks is supplied to selftest, in which
912
# case we just print a warning.
914
acquired_locks = [lock for action, lock in self._lock_actions
915
if action == 'acquired']
916
released_locks = [lock for action, lock in self._lock_actions
917
if action == 'released']
918
broken_locks = [lock for action, lock in self._lock_actions
919
if action == 'broken']
920
# trivially, given the tests for lock acquistion and release, if we
921
# have as many in each list, it should be ok. Some lock tests also
922
# break some locks on purpose and should be taken into account by
923
# considering that breaking a lock is just a dirty way of releasing it.
924
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
925
message = ('Different number of acquired and '
926
'released or broken locks. (%s, %s + %s)' %
927
(acquired_locks, released_locks, broken_locks))
928
if not self._lock_check_thorough:
929
# Rather than fail, just warn
930
print "Broken test %s: %s" % (self, message)
934
def _track_locks(self):
935
"""Track lock activity during tests."""
936
self._lock_actions = []
937
if 'disable_lock_checks' in selftest_debug_flags:
938
self._lock_check_thorough = False
940
self._lock_check_thorough = True
942
self.addCleanup(self._check_locks)
943
_mod_lock.Lock.hooks.install_named_hook('lock_acquired',
944
self._lock_acquired, None)
945
_mod_lock.Lock.hooks.install_named_hook('lock_released',
946
self._lock_released, None)
947
_mod_lock.Lock.hooks.install_named_hook('lock_broken',
948
self._lock_broken, None)
950
def _lock_acquired(self, result):
951
self._lock_actions.append(('acquired', result))
953
def _lock_released(self, result):
954
self._lock_actions.append(('released', result))
956
def _lock_broken(self, result):
957
self._lock_actions.append(('broken', result))
959
def permit_dir(self, name):
960
"""Permit a directory to be used by this test. See permit_url."""
961
name_transport = get_transport(name)
962
self.permit_url(name)
963
self.permit_url(name_transport.base)
965
def permit_url(self, url):
966
"""Declare that url is an ok url to use in this test.
968
Do this for memory transports, temporary test directory etc.
970
Do not do this for the current working directory, /tmp, or any other
971
preexisting non isolated url.
973
if not url.endswith('/'):
975
self._bzr_selftest_roots.append(url)
977
def permit_source_tree_branch_repo(self):
978
"""Permit the source tree bzr is running from to be opened.
980
Some code such as bzrlib.version attempts to read from the bzr branch
981
that bzr is executing from (if any). This method permits that directory
982
to be used in the test suite.
984
path = self.get_source_path()
985
self.record_directory_isolation()
988
workingtree.WorkingTree.open(path)
989
except (errors.NotBranchError, errors.NoWorkingTree):
992
self.enable_directory_isolation()
994
def _preopen_isolate_transport(self, transport):
995
"""Check that all transport openings are done in the test work area."""
996
while isinstance(transport, pathfilter.PathFilteringTransport):
997
# Unwrap pathfiltered transports
998
transport = transport.server.backing_transport.clone(
999
transport._filter('.'))
1000
url = transport.base
1001
# ReadonlySmartTCPServer_for_testing decorates the backing transport
1002
# urls it is given by prepending readonly+. This is appropriate as the
1003
# client shouldn't know that the server is readonly (or not readonly).
1004
# We could register all servers twice, with readonly+ prepending, but
1005
# that makes for a long list; this is about the same but easier to
1007
if url.startswith('readonly+'):
1008
url = url[len('readonly+'):]
1009
self._preopen_isolate_url(url)
1011
def _preopen_isolate_url(self, url):
1012
if not self._directory_isolation:
1014
if self._directory_isolation == 'record':
1015
self._bzr_selftest_roots.append(url)
1017
# This prevents all transports, including e.g. sftp ones backed on disk
1018
# from working unless they are explicitly granted permission. We then
1019
# depend on the code that sets up test transports to check that they are
1020
# appropriately isolated and enable their use by calling
1021
# self.permit_transport()
1022
if not osutils.is_inside_any(self._bzr_selftest_roots, url):
1023
raise errors.BzrError("Attempt to escape test isolation: %r %r"
1024
% (url, self._bzr_selftest_roots))
1026
def record_directory_isolation(self):
1027
"""Gather accessed directories to permit later access.
1029
This is used for tests that access the branch bzr is running from.
1031
self._directory_isolation = "record"
1033
def start_server(self, transport_server, backing_server=None):
1034
"""Start transport_server for this test.
1036
This starts the server, registers a cleanup for it and permits the
1037
server's urls to be used.
1039
if backing_server is None:
1040
transport_server.setUp()
1042
transport_server.setUp(backing_server)
1043
self.addCleanup(transport_server.tearDown)
1044
# Obtain a real transport because if the server supplies a password, it
1045
# will be hidden from the base on the client side.
1046
t = get_transport(transport_server.get_url())
1047
# Some transport servers effectively chroot the backing transport;
1048
# others like SFTPServer don't - users of the transport can walk up the
1049
# transport to read the entire backing transport. This wouldn't matter
1050
# except that the workdir tests are given - and that they expect the
1051
# server's url to point at - is one directory under the safety net. So
1052
# Branch operations into the transport will attempt to walk up one
1053
# directory. Chrooting all servers would avoid this but also mean that
1054
# we wouldn't be testing directly against non-root urls. Alternatively
1055
# getting the test framework to start the server with a backing server
1056
# at the actual safety net directory would work too, but this then
1057
# means that the self.get_url/self.get_transport methods would need
1058
# to transform all their results. On balance its cleaner to handle it
1059
# here, and permit a higher url when we have one of these transports.
1060
if t.base.endswith('/work/'):
1061
# we have safety net/test root/work
1062
t = t.clone('../..')
1063
elif isinstance(transport_server, server.SmartTCPServer_for_testing):
1064
# The smart server adds a path similar to work, which is traversed
1065
# up from by the client. But the server is chrooted - the actual
1066
# backing transport is not escaped from, and VFS requests to the
1067
# root will error (because they try to escape the chroot).
1069
while t2.base != t.base:
1072
self.permit_url(t.base)
1074
def _track_transports(self):
1075
"""Install checks for transport usage."""
1076
# TestCase has no safe place it can write to.
1077
self._bzr_selftest_roots = []
1078
# Currently the easiest way to be sure that nothing is going on is to
1079
# hook into bzr dir opening. This leaves a small window of error for
1080
# transport tests, but they are well known, and we can improve on this
1082
bzrdir.BzrDir.hooks.install_named_hook("pre_open",
1083
self._preopen_isolate_transport, "Check bzr directories are safe.")
1085
def _ndiff_strings(self, a, b):
1086
"""Return ndiff between two strings containing lines.
1088
A trailing newline is added if missing to make the strings
1090
if b and b[-1] != '\n':
1092
if a and a[-1] != '\n':
1094
difflines = difflib.ndiff(a.splitlines(True),
1096
linejunk=lambda x: False,
1097
charjunk=lambda x: False)
1098
return ''.join(difflines)
1100
def assertEqual(self, a, b, message=''):
1104
except UnicodeError, e:
1105
# If we can't compare without getting a UnicodeError, then
1106
# obviously they are different
1107
mutter('UnicodeError: %s', e)
1110
raise AssertionError("%snot equal:\na = %s\nb = %s\n"
1112
pformat(a), pformat(b)))
1114
assertEquals = assertEqual
1116
def assertEqualDiff(self, a, b, message=None):
1117
"""Assert two texts are equal, if not raise an exception.
1119
This is intended for use with multi-line strings where it can
1120
be hard to find the differences by eye.
1122
# TODO: perhaps override assertEquals to call this for strings?
1126
message = "texts not equal:\n"
1128
message = 'first string is missing a final newline.\n'
1130
message = 'second string is missing a final newline.\n'
1131
raise AssertionError(message +
1132
self._ndiff_strings(a, b))
1134
def assertEqualMode(self, mode, mode_test):
1135
self.assertEqual(mode, mode_test,
1136
'mode mismatch %o != %o' % (mode, mode_test))
1138
def assertEqualStat(self, expected, actual):
1139
"""assert that expected and actual are the same stat result.
1141
:param expected: A stat result.
1142
:param actual: A stat result.
1143
:raises AssertionError: If the expected and actual stat values differ
1144
other than by atime.
1146
self.assertEqual(expected.st_size, actual.st_size)
1147
self.assertEqual(expected.st_mtime, actual.st_mtime)
1148
self.assertEqual(expected.st_ctime, actual.st_ctime)
1149
self.assertEqual(expected.st_dev, actual.st_dev)
1150
self.assertEqual(expected.st_ino, actual.st_ino)
1151
self.assertEqual(expected.st_mode, actual.st_mode)
1153
def assertLength(self, length, obj_with_len):
1154
"""Assert that obj_with_len is of length length."""
1155
if len(obj_with_len) != length:
1156
self.fail("Incorrect length: wanted %d, got %d for %r" % (
1157
length, len(obj_with_len), obj_with_len))
1159
def assertLogsError(self, exception_class, func, *args, **kwargs):
1160
"""Assert that func(*args, **kwargs) quietly logs a specific exception.
1162
from bzrlib import trace
1164
orig_log_exception_quietly = trace.log_exception_quietly
1167
orig_log_exception_quietly()
1168
captured.append(sys.exc_info())
1169
trace.log_exception_quietly = capture
1170
func(*args, **kwargs)
1172
trace.log_exception_quietly = orig_log_exception_quietly
1173
self.assertLength(1, captured)
1174
err = captured[0][1]
1175
self.assertIsInstance(err, exception_class)
1178
def assertPositive(self, val):
1179
"""Assert that val is greater than 0."""
1180
self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
1182
def assertNegative(self, val):
1183
"""Assert that val is less than 0."""
1184
self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
1186
def assertStartsWith(self, s, prefix):
1187
if not s.startswith(prefix):
1188
raise AssertionError('string %r does not start with %r' % (s, prefix))
1190
def assertEndsWith(self, s, suffix):
1191
"""Asserts that s ends with suffix."""
1192
if not s.endswith(suffix):
1193
raise AssertionError('string %r does not end with %r' % (s, suffix))
1195
def assertContainsRe(self, haystack, needle_re, flags=0):
1196
"""Assert that a contains something matching a regular expression."""
1197
if not re.search(needle_re, haystack, flags):
1198
if '\n' in haystack or len(haystack) > 60:
1199
# a long string, format it in a more readable way
1200
raise AssertionError(
1201
'pattern "%s" not found in\n"""\\\n%s"""\n'
1202
% (needle_re, haystack))
1204
raise AssertionError('pattern "%s" not found in "%s"'
1205
% (needle_re, haystack))
1207
def assertNotContainsRe(self, haystack, needle_re, flags=0):
1208
"""Assert that a does not match a regular expression"""
1209
if re.search(needle_re, haystack, flags):
1210
raise AssertionError('pattern "%s" found in "%s"'
1211
% (needle_re, haystack))
1213
def assertSubset(self, sublist, superlist):
1214
"""Assert that every entry in sublist is present in superlist."""
1215
missing = set(sublist) - set(superlist)
1216
if len(missing) > 0:
1217
raise AssertionError("value(s) %r not present in container %r" %
1218
(missing, superlist))
1220
def assertListRaises(self, excClass, func, *args, **kwargs):
1221
"""Fail unless excClass is raised when the iterator from func is used.
1223
Many functions can return generators this makes sure
1224
to wrap them in a list() call to make sure the whole generator
1225
is run, and that the proper exception is raised.
1228
list(func(*args, **kwargs))
1232
if getattr(excClass,'__name__', None) is not None:
1233
excName = excClass.__name__
1235
excName = str(excClass)
1236
raise self.failureException, "%s not raised" % excName
1238
def assertRaises(self, excClass, callableObj, *args, **kwargs):
1239
"""Assert that a callable raises a particular exception.
1241
:param excClass: As for the except statement, this may be either an
1242
exception class, or a tuple of classes.
1243
:param callableObj: A callable, will be passed ``*args`` and
1246
Returns the exception so that you can examine it.
1249
callableObj(*args, **kwargs)
1253
if getattr(excClass,'__name__', None) is not None:
1254
excName = excClass.__name__
1257
excName = str(excClass)
1258
raise self.failureException, "%s not raised" % excName
1260
def assertIs(self, left, right, message=None):
1261
if not (left is right):
1262
if message is not None:
1263
raise AssertionError(message)
1265
raise AssertionError("%r is not %r." % (left, right))
1267
def assertIsNot(self, left, right, message=None):
1269
if message is not None:
1270
raise AssertionError(message)
1272
raise AssertionError("%r is %r." % (left, right))
1274
def assertTransportMode(self, transport, path, mode):
1275
"""Fail if a path does not have mode "mode".
1277
If modes are not supported on this transport, the assertion is ignored.
1279
if not transport._can_roundtrip_unix_modebits():
1281
path_stat = transport.stat(path)
1282
actual_mode = stat.S_IMODE(path_stat.st_mode)
1283
self.assertEqual(mode, actual_mode,
1284
'mode of %r incorrect (%s != %s)'
1285
% (path, oct(mode), oct(actual_mode)))
1287
def assertIsSameRealPath(self, path1, path2):
1288
"""Fail if path1 and path2 points to different files"""
1289
self.assertEqual(osutils.realpath(path1),
1290
osutils.realpath(path2),
1291
"apparent paths:\na = %s\nb = %s\n," % (path1, path2))
1293
def assertIsInstance(self, obj, kls, msg=None):
1294
"""Fail if obj is not an instance of kls
1296
:param msg: Supplementary message to show if the assertion fails.
1298
if not isinstance(obj, kls):
1299
m = "%r is an instance of %s rather than %s" % (
1300
obj, obj.__class__, kls)
1305
def expectFailure(self, reason, assertion, *args, **kwargs):
1306
"""Invoke a test, expecting it to fail for the given reason.
1308
This is for assertions that ought to succeed, but currently fail.
1309
(The failure is *expected* but not *wanted*.) Please be very precise
1310
about the failure you're expecting. If a new bug is introduced,
1311
AssertionError should be raised, not KnownFailure.
1313
Frequently, expectFailure should be followed by an opposite assertion.
1316
Intended to be used with a callable that raises AssertionError as the
1317
'assertion' parameter. args and kwargs are passed to the 'assertion'.
1319
Raises KnownFailure if the test fails. Raises AssertionError if the
1324
self.expectFailure('Math is broken', self.assertNotEqual, 54,
1326
self.assertEqual(42, dynamic_val)
1328
This means that a dynamic_val of 54 will cause the test to raise
1329
a KnownFailure. Once math is fixed and the expectFailure is removed,
1330
only a dynamic_val of 42 will allow the test to pass. Anything other
1331
than 54 or 42 will cause an AssertionError.
1334
assertion(*args, **kwargs)
1335
except AssertionError:
1336
raise KnownFailure(reason)
1338
self.fail('Unexpected success. Should have failed: %s' % reason)
1340
def assertFileEqual(self, content, path):
1341
"""Fail if path does not contain 'content'."""
1342
self.failUnlessExists(path)
1343
f = file(path, 'rb')
1348
self.assertEqualDiff(content, s)
1350
def failUnlessExists(self, path):
1351
"""Fail unless path or paths, which may be abs or relative, exist."""
1352
if not isinstance(path, basestring):
1354
self.failUnlessExists(p)
1356
self.failUnless(osutils.lexists(path),path+" does not exist")
1358
def failIfExists(self, path):
1359
"""Fail if path or paths, which may be abs or relative, exist."""
1360
if not isinstance(path, basestring):
1362
self.failIfExists(p)
1364
self.failIf(osutils.lexists(path),path+" exists")
1366
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1367
"""A helper for callDeprecated and applyDeprecated.
1369
:param a_callable: A callable to call.
1370
:param args: The positional arguments for the callable
1371
:param kwargs: The keyword arguments for the callable
1372
:return: A tuple (warnings, result). result is the result of calling
1373
a_callable(``*args``, ``**kwargs``).
1376
def capture_warnings(msg, cls=None, stacklevel=None):
1377
# we've hooked into a deprecation specific callpath,
1378
# only deprecations should getting sent via it.
1379
self.assertEqual(cls, DeprecationWarning)
1380
local_warnings.append(msg)
1381
original_warning_method = symbol_versioning.warn
1382
symbol_versioning.set_warning_method(capture_warnings)
1384
result = a_callable(*args, **kwargs)
1386
symbol_versioning.set_warning_method(original_warning_method)
1387
return (local_warnings, result)
1389
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
1390
"""Call a deprecated callable without warning the user.
1392
Note that this only captures warnings raised by symbol_versioning.warn,
1393
not other callers that go direct to the warning module.
1395
To test that a deprecated method raises an error, do something like
1398
self.assertRaises(errors.ReservedId,
1399
self.applyDeprecated,
1400
deprecated_in((1, 5, 0)),
1404
:param deprecation_format: The deprecation format that the callable
1405
should have been deprecated with. This is the same type as the
1406
parameter to deprecated_method/deprecated_function. If the
1407
callable is not deprecated with this format, an assertion error
1409
:param a_callable: A callable to call. This may be a bound method or
1410
a regular function. It will be called with ``*args`` and
1412
:param args: The positional arguments for the callable
1413
:param kwargs: The keyword arguments for the callable
1414
:return: The result of a_callable(``*args``, ``**kwargs``)
1416
call_warnings, result = self._capture_deprecation_warnings(a_callable,
1418
expected_first_warning = symbol_versioning.deprecation_string(
1419
a_callable, deprecation_format)
1420
if len(call_warnings) == 0:
1421
self.fail("No deprecation warning generated by call to %s" %
1423
self.assertEqual(expected_first_warning, call_warnings[0])
1426
def callCatchWarnings(self, fn, *args, **kw):
1427
"""Call a callable that raises python warnings.
1429
The caller's responsible for examining the returned warnings.
1431
If the callable raises an exception, the exception is not
1432
caught and propagates up to the caller. In that case, the list
1433
of warnings is not available.
1435
:returns: ([warning_object, ...], fn_result)
1437
# XXX: This is not perfect, because it completely overrides the
1438
# warnings filters, and some code may depend on suppressing particular
1439
# warnings. It's the easiest way to insulate ourselves from -Werror,
1440
# though. -- Andrew, 20071062
1442
def _catcher(message, category, filename, lineno, file=None, line=None):
1443
# despite the name, 'message' is normally(?) a Warning subclass
1445
wlist.append(message)
1446
saved_showwarning = warnings.showwarning
1447
saved_filters = warnings.filters
1449
warnings.showwarning = _catcher
1450
warnings.filters = []
1451
result = fn(*args, **kw)
1453
warnings.showwarning = saved_showwarning
1454
warnings.filters = saved_filters
1455
return wlist, result
1457
def callDeprecated(self, expected, callable, *args, **kwargs):
1458
"""Assert that a callable is deprecated in a particular way.
1460
This is a very precise test for unusual requirements. The
1461
applyDeprecated helper function is probably more suited for most tests
1462
as it allows you to simply specify the deprecation format being used
1463
and will ensure that that is issued for the function being called.
1465
Note that this only captures warnings raised by symbol_versioning.warn,
1466
not other callers that go direct to the warning module. To catch
1467
general warnings, use callCatchWarnings.
1469
:param expected: a list of the deprecation warnings expected, in order
1470
:param callable: The callable to call
1471
:param args: The positional arguments for the callable
1472
:param kwargs: The keyword arguments for the callable
1474
call_warnings, result = self._capture_deprecation_warnings(callable,
1476
self.assertEqual(expected, call_warnings)
1479
def _startLogFile(self):
1480
"""Send bzr and test log messages to a temporary file.
1482
The file is removed as the test is torn down.
1484
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1485
self._log_file = os.fdopen(fileno, 'w+')
1486
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1487
self._log_file_name = name
1488
self.addCleanup(self._finishLogFile)
1490
def _finishLogFile(self):
1491
"""Finished with the log file.
1493
Close the file and delete it, unless setKeepLogfile was called.
1495
if self._log_file is None:
1497
bzrlib.trace.pop_log_file(self._log_memento)
1498
self._log_file.close()
1499
self._log_file = None
1500
if not self._keep_log_file:
1501
os.remove(self._log_file_name)
1502
self._log_file_name = None
1504
def setKeepLogfile(self):
1505
"""Make the logfile not be deleted when _finishLogFile is called."""
1506
self._keep_log_file = True
1508
def thisFailsStrictLockCheck(self):
1509
"""It is known that this test would fail with -Dstrict_locks.
1511
By default, all tests are run with strict lock checking unless
1512
-Edisable_lock_checks is supplied. However there are some tests which
1513
we know fail strict locks at this point that have not been fixed.
1514
They should call this function to disable the strict checking.
1516
This should be used sparingly, it is much better to fix the locking
1517
issues rather than papering over the problem by calling this function.
1519
debug.debug_flags.discard('strict_locks')
1521
def addCleanup(self, callable, *args, **kwargs):
1522
"""Arrange to run a callable when this case is torn down.
1524
Callables are run in the reverse of the order they are registered,
1525
ie last-in first-out.
1527
self._cleanups.append((callable, args, kwargs))
1529
def _cleanEnvironment(self):
1531
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1532
'HOME': os.getcwd(),
1533
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1534
# tests do check our impls match APPDATA
1535
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1539
'BZREMAIL': None, # may still be present in the environment
1541
'BZR_PROGRESS_BAR': None,
1543
'BZR_PLUGIN_PATH': None,
1544
# Make sure that any text ui tests are consistent regardless of
1545
# the environment the test case is run in; you may want tests that
1546
# test other combinations. 'dumb' is a reasonable guess for tests
1547
# going to a pipe or a StringIO.
1552
'SSH_AUTH_SOCK': None,
1556
'https_proxy': None,
1557
'HTTPS_PROXY': None,
1562
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1563
# least. If you do (care), please update this comment
1567
'BZR_REMOTE_PATH': None,
1570
self.addCleanup(self._restoreEnvironment)
1571
for name, value in new_env.iteritems():
1572
self._captureVar(name, value)
1574
def _captureVar(self, name, newvalue):
1575
"""Set an environment variable, and reset it when finished."""
1576
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1578
def _restore_debug_flags(self):
1579
debug.debug_flags.clear()
1580
debug.debug_flags.update(self._preserved_debug_flags)
1582
def _restoreEnvironment(self):
1583
for name, value in self.__old_env.iteritems():
1584
osutils.set_or_unset_env(name, value)
1586
def _restoreHooks(self):
1587
for klass, (name, hooks) in self._preserved_hooks.items():
1588
setattr(klass, name, hooks)
1590
def knownFailure(self, reason):
1591
"""This test has failed for some known reason."""
1592
raise KnownFailure(reason)
1594
def _do_skip(self, result, reason):
1595
addSkip = getattr(result, 'addSkip', None)
1596
if not callable(addSkip):
1597
result.addError(self, sys.exc_info())
1599
addSkip(self, reason)
1601
def run(self, result=None):
1602
if result is None: result = self.defaultTestResult()
1603
for feature in getattr(self, '_test_needs_features', []):
1604
if not feature.available():
1605
result.startTest(self)
1606
if getattr(result, 'addNotSupported', None):
1607
result.addNotSupported(self, feature)
1609
result.addSuccess(self)
1610
result.stopTest(self)
1614
result.startTest(self)
1615
absent_attr = object()
1617
method_name = getattr(self, '_testMethodName', absent_attr)
1618
if method_name is absent_attr:
1620
method_name = getattr(self, '_TestCase__testMethodName')
1621
testMethod = getattr(self, method_name)
1625
if not self._bzr_test_setUp_run:
1627
"test setUp did not invoke "
1628
"bzrlib.tests.TestCase's setUp")
1629
except KeyboardInterrupt:
1632
except TestSkipped, e:
1633
self._do_skip(result, e.args[0])
1637
result.addError(self, sys.exc_info())
1645
except self.failureException:
1646
result.addFailure(self, sys.exc_info())
1647
except TestSkipped, e:
1649
reason = "No reason given."
1652
self._do_skip(result, reason)
1653
except KeyboardInterrupt:
1657
result.addError(self, sys.exc_info())
1661
if not self._bzr_test_tearDown_run:
1663
"test tearDown did not invoke "
1664
"bzrlib.tests.TestCase's tearDown")
1665
except KeyboardInterrupt:
1669
result.addError(self, sys.exc_info())
1672
if ok: result.addSuccess(self)
1674
result.stopTest(self)
1676
except TestNotApplicable:
1677
# Not moved from the result [yet].
1680
except KeyboardInterrupt:
1685
for attr_name in self.attrs_to_keep:
1686
if attr_name in self.__dict__:
1687
saved_attrs[attr_name] = self.__dict__[attr_name]
1688
self.__dict__ = saved_attrs
1692
self._log_contents = ''
1693
self._bzr_test_tearDown_run = True
1694
unittest.TestCase.tearDown(self)
1696
def time(self, callable, *args, **kwargs):
1697
"""Run callable and accrue the time it takes to the benchmark time.
1699
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
1700
this will cause lsprofile statistics to be gathered and stored in
1703
if self._benchtime is None:
1707
if not self._gather_lsprof_in_benchmarks:
1708
return callable(*args, **kwargs)
1710
# record this benchmark
1711
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
1713
self._benchcalls.append(((callable, args, kwargs), stats))
1716
self._benchtime += time.time() - start
1718
def _runCleanups(self):
1719
"""Run registered cleanup functions.
1721
This should only be called from TestCase.tearDown.
1723
# TODO: Perhaps this should keep running cleanups even if
1724
# one of them fails?
1726
# Actually pop the cleanups from the list so tearDown running
1727
# twice is safe (this happens for skipped tests).
1728
while self._cleanups:
1729
cleanup, args, kwargs = self._cleanups.pop()
1730
cleanup(*args, **kwargs)
1732
def log(self, *args):
1735
def _get_log(self, keep_log_file=False):
1736
"""Get the log from bzrlib.trace calls from this test.
1738
:param keep_log_file: When True, if the log is still a file on disk
1739
leave it as a file on disk. When False, if the log is still a file
1740
on disk, the log file is deleted and the log preserved as
1742
:return: A string containing the log.
1744
# flush the log file, to get all content
1746
if bzrlib.trace._trace_file:
1747
bzrlib.trace._trace_file.flush()
1748
if self._log_contents:
1749
# XXX: this can hardly contain the content flushed above --vila
1751
return self._log_contents
1752
if self._log_file_name is not None:
1753
logfile = open(self._log_file_name)
1755
log_contents = logfile.read()
1758
if not keep_log_file:
1759
self._log_contents = log_contents
1761
os.remove(self._log_file_name)
1763
if sys.platform == 'win32' and e.errno == errno.EACCES:
1764
sys.stderr.write(('Unable to delete log file '
1765
' %r\n' % self._log_file_name))
1770
return "DELETED log file to reduce memory footprint"
1772
def requireFeature(self, feature):
1773
"""This test requires a specific feature is available.
1775
:raises UnavailableFeature: When feature is not available.
1777
if not feature.available():
1778
raise UnavailableFeature(feature)
1780
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1782
"""Run bazaar command line, splitting up a string command line."""
1783
if isinstance(args, basestring):
1784
# shlex don't understand unicode strings,
1785
# so args should be plain string (bialix 20070906)
1786
args = list(shlex.split(str(args)))
1787
return self._run_bzr_core(args, retcode=retcode,
1788
encoding=encoding, stdin=stdin, working_dir=working_dir,
1791
def _run_bzr_core(self, args, retcode, encoding, stdin,
1793
if encoding is None:
1794
encoding = osutils.get_user_encoding()
1795
stdout = StringIOWrapper()
1796
stderr = StringIOWrapper()
1797
stdout.encoding = encoding
1798
stderr.encoding = encoding
1800
self.log('run bzr: %r', args)
1801
# FIXME: don't call into logging here
1802
handler = logging.StreamHandler(stderr)
1803
handler.setLevel(logging.INFO)
1804
logger = logging.getLogger('')
1805
logger.addHandler(handler)
1806
old_ui_factory = ui.ui_factory
1807
ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
1810
if working_dir is not None:
1811
cwd = osutils.getcwd()
1812
os.chdir(working_dir)
1815
result = self.apply_redirected(ui.ui_factory.stdin,
1817
bzrlib.commands.run_bzr_catch_user_errors,
1820
logger.removeHandler(handler)
1821
ui.ui_factory = old_ui_factory
1825
out = stdout.getvalue()
1826
err = stderr.getvalue()
1828
self.log('output:\n%r', out)
1830
self.log('errors:\n%r', err)
1831
if retcode is not None:
1832
self.assertEquals(retcode, result,
1833
message='Unexpected return code')
1834
return result, out, err
1836
def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
1837
working_dir=None, error_regexes=[], output_encoding=None):
1838
"""Invoke bzr, as if it were run from the command line.
1840
The argument list should not include the bzr program name - the
1841
first argument is normally the bzr command. Arguments may be
1842
passed in three ways:
1844
1- A list of strings, eg ["commit", "a"]. This is recommended
1845
when the command contains whitespace or metacharacters, or
1846
is built up at run time.
1848
2- A single string, eg "add a". This is the most convenient
1849
for hardcoded commands.
1851
This runs bzr through the interface that catches and reports
1852
errors, and with logging set to something approximating the
1853
default, so that error reporting can be checked.
1855
This should be the main method for tests that want to exercise the
1856
overall behavior of the bzr application (rather than a unit test
1857
or a functional test of the library.)
1859
This sends the stdout/stderr results into the test's log,
1860
where it may be useful for debugging. See also run_captured.
1862
:keyword stdin: A string to be used as stdin for the command.
1863
:keyword retcode: The status code the command should return;
1865
:keyword working_dir: The directory to run the command in
1866
:keyword error_regexes: A list of expected error messages. If
1867
specified they must be seen in the error output of the command.
1869
retcode, out, err = self._run_bzr_autosplit(
1874
working_dir=working_dir,
1876
self.assertIsInstance(error_regexes, (list, tuple))
1877
for regex in error_regexes:
1878
self.assertContainsRe(err, regex)
1881
def run_bzr_error(self, error_regexes, *args, **kwargs):
1882
"""Run bzr, and check that stderr contains the supplied regexes
1884
:param error_regexes: Sequence of regular expressions which
1885
must each be found in the error output. The relative ordering
1887
:param args: command-line arguments for bzr
1888
:param kwargs: Keyword arguments which are interpreted by run_bzr
1889
This function changes the default value of retcode to be 3,
1890
since in most cases this is run when you expect bzr to fail.
1892
:return: (out, err) The actual output of running the command (in case
1893
you want to do more inspection)
1897
# Make sure that commit is failing because there is nothing to do
1898
self.run_bzr_error(['no changes to commit'],
1899
['commit', '-m', 'my commit comment'])
1900
# Make sure --strict is handling an unknown file, rather than
1901
# giving us the 'nothing to do' error
1902
self.build_tree(['unknown'])
1903
self.run_bzr_error(['Commit refused because there are unknown files'],
1904
['commit', --strict', '-m', 'my commit comment'])
1906
kwargs.setdefault('retcode', 3)
1907
kwargs['error_regexes'] = error_regexes
1908
out, err = self.run_bzr(*args, **kwargs)
1911
def run_bzr_subprocess(self, *args, **kwargs):
1912
"""Run bzr in a subprocess for testing.
1914
This starts a new Python interpreter and runs bzr in there.
1915
This should only be used for tests that have a justifiable need for
1916
this isolation: e.g. they are testing startup time, or signal
1917
handling, or early startup code, etc. Subprocess code can't be
1918
profiled or debugged so easily.
1920
:keyword retcode: The status code that is expected. Defaults to 0. If
1921
None is supplied, the status code is not checked.
1922
:keyword env_changes: A dictionary which lists changes to environment
1923
variables. A value of None will unset the env variable.
1924
The values must be strings. The change will only occur in the
1925
child, so you don't need to fix the environment after running.
1926
:keyword universal_newlines: Convert CRLF => LF
1927
:keyword allow_plugins: By default the subprocess is run with
1928
--no-plugins to ensure test reproducibility. Also, it is possible
1929
for system-wide plugins to create unexpected output on stderr,
1930
which can cause unnecessary test failures.
1932
env_changes = kwargs.get('env_changes', {})
1933
working_dir = kwargs.get('working_dir', None)
1934
allow_plugins = kwargs.get('allow_plugins', False)
1936
if isinstance(args[0], list):
1938
elif isinstance(args[0], basestring):
1939
args = list(shlex.split(args[0]))
1941
raise ValueError("passing varargs to run_bzr_subprocess")
1942
process = self.start_bzr_subprocess(args, env_changes=env_changes,
1943
working_dir=working_dir,
1944
allow_plugins=allow_plugins)
1945
# We distinguish between retcode=None and retcode not passed.
1946
supplied_retcode = kwargs.get('retcode', 0)
1947
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
1948
universal_newlines=kwargs.get('universal_newlines', False),
1951
def start_bzr_subprocess(self, process_args, env_changes=None,
1952
skip_if_plan_to_signal=False,
1954
allow_plugins=False):
1955
"""Start bzr in a subprocess for testing.
1957
This starts a new Python interpreter and runs bzr in there.
1958
This should only be used for tests that have a justifiable need for
1959
this isolation: e.g. they are testing startup time, or signal
1960
handling, or early startup code, etc. Subprocess code can't be
1961
profiled or debugged so easily.
1963
:param process_args: a list of arguments to pass to the bzr executable,
1964
for example ``['--version']``.
1965
:param env_changes: A dictionary which lists changes to environment
1966
variables. A value of None will unset the env variable.
1967
The values must be strings. The change will only occur in the
1968
child, so you don't need to fix the environment after running.
1969
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1971
:param allow_plugins: If False (default) pass --no-plugins to bzr.
1973
:returns: Popen object for the started process.
1975
if skip_if_plan_to_signal:
1976
if not getattr(os, 'kill', None):
1977
raise TestSkipped("os.kill not available.")
1979
if env_changes is None:
1983
def cleanup_environment():
1984
for env_var, value in env_changes.iteritems():
1985
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
1987
def restore_environment():
1988
for env_var, value in old_env.iteritems():
1989
osutils.set_or_unset_env(env_var, value)
1991
bzr_path = self.get_bzr_path()
1994
if working_dir is not None:
1995
cwd = osutils.getcwd()
1996
os.chdir(working_dir)
1999
# win32 subprocess doesn't support preexec_fn
2000
# so we will avoid using it on all platforms, just to
2001
# make sure the code path is used, and we don't break on win32
2002
cleanup_environment()
2003
command = [sys.executable]
2004
# frozen executables don't need the path to bzr
2005
if getattr(sys, "frozen", None) is None:
2006
command.append(bzr_path)
2007
if not allow_plugins:
2008
command.append('--no-plugins')
2009
command.extend(process_args)
2010
process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
2012
restore_environment()
2018
def _popen(self, *args, **kwargs):
2019
"""Place a call to Popen.
2021
Allows tests to override this method to intercept the calls made to
2022
Popen for introspection.
2024
return Popen(*args, **kwargs)
2026
def get_source_path(self):
2027
"""Return the path of the directory containing bzrlib."""
2028
return os.path.dirname(os.path.dirname(bzrlib.__file__))
2030
def get_bzr_path(self):
2031
"""Return the path of the 'bzr' executable for this test suite."""
2032
bzr_path = self.get_source_path()+'/bzr'
2033
if not os.path.isfile(bzr_path):
2034
# We are probably installed. Assume sys.argv is the right file
2035
bzr_path = sys.argv[0]
2038
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
2039
universal_newlines=False, process_args=None):
2040
"""Finish the execution of process.
2042
:param process: the Popen object returned from start_bzr_subprocess.
2043
:param retcode: The status code that is expected. Defaults to 0. If
2044
None is supplied, the status code is not checked.
2045
:param send_signal: an optional signal to send to the process.
2046
:param universal_newlines: Convert CRLF => LF
2047
:returns: (stdout, stderr)
2049
if send_signal is not None:
2050
os.kill(process.pid, send_signal)
2051
out, err = process.communicate()
2053
if universal_newlines:
2054
out = out.replace('\r\n', '\n')
2055
err = err.replace('\r\n', '\n')
2057
if retcode is not None and retcode != process.returncode:
2058
if process_args is None:
2059
process_args = "(unknown args)"
2060
mutter('Output of bzr %s:\n%s', process_args, out)
2061
mutter('Error for bzr %s:\n%s', process_args, err)
2062
self.fail('Command bzr %s failed with retcode %s != %s'
2063
% (process_args, retcode, process.returncode))
2066
def check_inventory_shape(self, inv, shape):
2067
"""Compare an inventory to a list of expected names.
2069
Fail if they are not precisely equal.
2072
shape = list(shape) # copy
2073
for path, ie in inv.entries():
2074
name = path.replace('\\', '/')
2075
if ie.kind == 'directory':
2082
self.fail("expected paths not found in inventory: %r" % shape)
2084
self.fail("unexpected paths found in inventory: %r" % extras)
2086
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
2087
a_callable=None, *args, **kwargs):
2088
"""Call callable with redirected std io pipes.
2090
Returns the return code."""
2091
if not callable(a_callable):
2092
raise ValueError("a_callable must be callable.")
2094
stdin = StringIO("")
2096
if getattr(self, "_log_file", None) is not None:
2097
stdout = self._log_file
2101
if getattr(self, "_log_file", None is not None):
2102
stderr = self._log_file
2105
real_stdin = sys.stdin
2106
real_stdout = sys.stdout
2107
real_stderr = sys.stderr
2112
return a_callable(*args, **kwargs)
2114
sys.stdout = real_stdout
2115
sys.stderr = real_stderr
2116
sys.stdin = real_stdin
2118
def reduceLockdirTimeout(self):
2119
"""Reduce the default lock timeout for the duration of the test, so that
2120
if LockContention occurs during a test, it does so quickly.
2122
Tests that expect to provoke LockContention errors should call this.
2124
orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
2126
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
2127
self.addCleanup(resetTimeout)
2128
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
2130
def make_utf8_encoded_stringio(self, encoding_type=None):
2131
"""Return a StringIOWrapper instance, that will encode Unicode
2134
if encoding_type is None:
2135
encoding_type = 'strict'
2137
output_encoding = 'utf-8'
2138
sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
2139
sio.encoding = output_encoding
2142
def disable_verb(self, verb):
2143
"""Disable a smart server verb for one test."""
2144
from bzrlib.smart import request
2145
request_handlers = request.request_handlers
2146
orig_method = request_handlers.get(verb)
2147
request_handlers.remove(verb)
2149
request_handlers.register(verb, orig_method)
2150
self.addCleanup(restoreVerb)
2153
class CapturedCall(object):
2154
"""A helper for capturing smart server calls for easy debug analysis."""
2156
def __init__(self, params, prefix_length):
2157
"""Capture the call with params and skip prefix_length stack frames."""
2160
# The last 5 frames are the __init__, the hook frame, and 3 smart
2161
# client frames. Beyond this we could get more clever, but this is good
2163
stack = traceback.extract_stack()[prefix_length:-5]
2164
self.stack = ''.join(traceback.format_list(stack))
2167
return self.call.method
2170
return self.call.method
2176
class TestCaseWithMemoryTransport(TestCase):
2177
"""Common test class for tests that do not need disk resources.
2179
Tests that need disk resources should derive from TestCaseWithTransport.
2181
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2183
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2184
a directory which does not exist. This serves to help ensure test isolation
2185
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2186
must exist. However, TestCaseWithMemoryTransport does not offer local
2187
file defaults for the transport in tests, nor does it obey the command line
2188
override, so tests that accidentally write to the common directory should
2191
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2192
a .bzr directory that stops us ascending higher into the filesystem.
2198
def __init__(self, methodName='runTest'):
2199
# allow test parameterization after test construction and before test
2200
# execution. Variables that the parameterizer sets need to be
2201
# ones that are not set by setUp, or setUp will trash them.
2202
super(TestCaseWithMemoryTransport, self).__init__(methodName)
2203
self.vfs_transport_factory = default_transport
2204
self.transport_server = None
2205
self.transport_readonly_server = None
2206
self.__vfs_server = None
2208
def get_transport(self, relpath=None):
2209
"""Return a writeable transport.
2211
This transport is for the test scratch space relative to
2214
:param relpath: a path relative to the base url.
2216
t = get_transport(self.get_url(relpath))
2217
self.assertFalse(t.is_readonly())
2220
def get_readonly_transport(self, relpath=None):
2221
"""Return a readonly transport for the test scratch space
2223
This can be used to test that operations which should only need
2224
readonly access in fact do not try to write.
2226
:param relpath: a path relative to the base url.
2228
t = get_transport(self.get_readonly_url(relpath))
2229
self.assertTrue(t.is_readonly())
2232
def create_transport_readonly_server(self):
2233
"""Create a transport server from class defined at init.
2235
This is mostly a hook for daughter classes.
2237
return self.transport_readonly_server()
2239
def get_readonly_server(self):
2240
"""Get the server instance for the readonly transport
2242
This is useful for some tests with specific servers to do diagnostics.
2244
if self.__readonly_server is None:
2245
if self.transport_readonly_server is None:
2246
# readonly decorator requested
2247
self.__readonly_server = ReadonlyServer()
2249
# explicit readonly transport.
2250
self.__readonly_server = self.create_transport_readonly_server()
2251
self.start_server(self.__readonly_server,
2252
self.get_vfs_only_server())
2253
return self.__readonly_server
2255
def get_readonly_url(self, relpath=None):
2256
"""Get a URL for the readonly transport.
2258
This will either be backed by '.' or a decorator to the transport
2259
used by self.get_url()
2260
relpath provides for clients to get a path relative to the base url.
2261
These should only be downwards relative, not upwards.
2263
base = self.get_readonly_server().get_url()
2264
return self._adjust_url(base, relpath)
2266
def get_vfs_only_server(self):
2267
"""Get the vfs only read/write server instance.
2269
This is useful for some tests with specific servers that need
2272
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
2273
is no means to override it.
2275
if self.__vfs_server is None:
2276
self.__vfs_server = MemoryServer()
2277
self.start_server(self.__vfs_server)
2278
return self.__vfs_server
2280
def get_server(self):
2281
"""Get the read/write server instance.
2283
This is useful for some tests with specific servers that need
2286
This is built from the self.transport_server factory. If that is None,
2287
then the self.get_vfs_server is returned.
2289
if self.__server is None:
2290
if (self.transport_server is None or self.transport_server is
2291
self.vfs_transport_factory):
2292
self.__server = self.get_vfs_only_server()
2294
# bring up a decorated means of access to the vfs only server.
2295
self.__server = self.transport_server()
2296
self.start_server(self.__server, self.get_vfs_only_server())
2297
return self.__server
2299
def _adjust_url(self, base, relpath):
2300
"""Get a URL (or maybe a path) for the readwrite transport.
2302
This will either be backed by '.' or to an equivalent non-file based
2304
relpath provides for clients to get a path relative to the base url.
2305
These should only be downwards relative, not upwards.
2307
if relpath is not None and relpath != '.':
2308
if not base.endswith('/'):
2310
# XXX: Really base should be a url; we did after all call
2311
# get_url()! But sometimes it's just a path (from
2312
# LocalAbspathServer), and it'd be wrong to append urlescaped data
2313
# to a non-escaped local path.
2314
if base.startswith('./') or base.startswith('/'):
2317
base += urlutils.escape(relpath)
2320
def get_url(self, relpath=None):
2321
"""Get a URL (or maybe a path) for the readwrite transport.
2323
This will either be backed by '.' or to an equivalent non-file based
2325
relpath provides for clients to get a path relative to the base url.
2326
These should only be downwards relative, not upwards.
2328
base = self.get_server().get_url()
2329
return self._adjust_url(base, relpath)
2331
def get_vfs_only_url(self, relpath=None):
2332
"""Get a URL (or maybe a path for the plain old vfs transport.
2334
This will never be a smart protocol. It always has all the
2335
capabilities of the local filesystem, but it might actually be a
2336
MemoryTransport or some other similar virtual filesystem.
2338
This is the backing transport (if any) of the server returned by
2339
get_url and get_readonly_url.
2341
:param relpath: provides for clients to get a path relative to the base
2342
url. These should only be downwards relative, not upwards.
2345
base = self.get_vfs_only_server().get_url()
2346
return self._adjust_url(base, relpath)
2348
def _create_safety_net(self):
2349
"""Make a fake bzr directory.
2351
This prevents any tests propagating up onto the TEST_ROOT directory's
2354
root = TestCaseWithMemoryTransport.TEST_ROOT
2355
bzrdir.BzrDir.create_standalone_workingtree(root)
2357
def _check_safety_net(self):
2358
"""Check that the safety .bzr directory have not been touched.
2360
_make_test_root have created a .bzr directory to prevent tests from
2361
propagating. This method ensures than a test did not leaked.
2363
root = TestCaseWithMemoryTransport.TEST_ROOT
2364
self.permit_url(get_transport(root).base)
2365
wt = workingtree.WorkingTree.open(root)
2366
last_rev = wt.last_revision()
2367
if last_rev != 'null:':
2368
# The current test have modified the /bzr directory, we need to
2369
# recreate a new one or all the followng tests will fail.
2370
# If you need to inspect its content uncomment the following line
2371
# import pdb; pdb.set_trace()
2372
_rmtree_temp_dir(root + '/.bzr')
2373
self._create_safety_net()
2374
raise AssertionError('%s/.bzr should not be modified' % root)
2376
def _make_test_root(self):
2377
if TestCaseWithMemoryTransport.TEST_ROOT is None:
2378
# Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
2379
root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
2381
TestCaseWithMemoryTransport.TEST_ROOT = root
2383
self._create_safety_net()
2385
# The same directory is used by all tests, and we're not
2386
# specifically told when all tests are finished. This will do.
2387
atexit.register(_rmtree_temp_dir, root)
2389
self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
2390
self.addCleanup(self._check_safety_net)
2392
def makeAndChdirToTestDir(self):
2393
"""Create a temporary directories for this one test.
2395
This must set self.test_home_dir and self.test_dir and chdir to
2398
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
2400
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2401
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
2402
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2403
self.permit_dir(self.test_dir)
2405
def make_branch(self, relpath, format=None):
2406
"""Create a branch on the transport at relpath."""
2407
repo = self.make_repository(relpath, format=format)
2408
return repo.bzrdir.create_branch()
2410
def make_bzrdir(self, relpath, format=None):
2412
# might be a relative or absolute path
2413
maybe_a_url = self.get_url(relpath)
2414
segments = maybe_a_url.rsplit('/', 1)
2415
t = get_transport(maybe_a_url)
2416
if len(segments) > 1 and segments[-1] not in ('', '.'):
2420
if isinstance(format, basestring):
2421
format = bzrdir.format_registry.make_bzrdir(format)
2422
return format.initialize_on_transport(t)
2423
except errors.UninitializableFormat:
2424
raise TestSkipped("Format %s is not initializable." % format)
2426
def make_repository(self, relpath, shared=False, format=None):
2427
"""Create a repository on our default transport at relpath.
2429
Note that relpath must be a relative path, not a full url.
2431
# FIXME: If you create a remoterepository this returns the underlying
2432
# real format, which is incorrect. Actually we should make sure that
2433
# RemoteBzrDir returns a RemoteRepository.
2434
# maybe mbp 20070410
2435
made_control = self.make_bzrdir(relpath, format=format)
2436
return made_control.create_repository(shared=shared)
2438
def make_smart_server(self, path):
2439
smart_server = server.SmartTCPServer_for_testing()
2440
self.start_server(smart_server, self.get_server())
2441
remote_transport = get_transport(smart_server.get_url()).clone(path)
2442
return remote_transport
2444
def make_branch_and_memory_tree(self, relpath, format=None):
2445
"""Create a branch on the default transport and a MemoryTree for it."""
2446
b = self.make_branch(relpath, format=format)
2447
return memorytree.MemoryTree.create_on_branch(b)
2449
def make_branch_builder(self, relpath, format=None):
2450
branch = self.make_branch(relpath, format=format)
2451
return branchbuilder.BranchBuilder(branch=branch)
2453
def overrideEnvironmentForTesting(self):
2454
os.environ['HOME'] = self.test_home_dir
2455
os.environ['BZR_HOME'] = self.test_home_dir
2458
super(TestCaseWithMemoryTransport, self).setUp()
2459
self._make_test_root()
2460
_currentdir = os.getcwdu()
2461
def _leaveDirectory():
2462
os.chdir(_currentdir)
2463
self.addCleanup(_leaveDirectory)
2464
self.makeAndChdirToTestDir()
2465
self.overrideEnvironmentForTesting()
2466
self.__readonly_server = None
2467
self.__server = None
2468
self.reduceLockdirTimeout()
2470
def setup_smart_server_with_call_log(self):
2471
"""Sets up a smart server as the transport server with a call log."""
2472
self.transport_server = server.SmartTCPServer_for_testing
2473
self.hpss_calls = []
2475
# Skip the current stack down to the caller of
2476
# setup_smart_server_with_call_log
2477
prefix_length = len(traceback.extract_stack()) - 2
2478
def capture_hpss_call(params):
2479
self.hpss_calls.append(
2480
CapturedCall(params, prefix_length))
2481
client._SmartClient.hooks.install_named_hook(
2482
'call', capture_hpss_call, None)
2484
def reset_smart_call_log(self):
2485
self.hpss_calls = []
2488
class TestCaseInTempDir(TestCaseWithMemoryTransport):
2489
"""Derived class that runs a test within a temporary directory.
2491
This is useful for tests that need to create a branch, etc.
2493
The directory is created in a slightly complex way: for each
2494
Python invocation, a new temporary top-level directory is created.
2495
All test cases create their own directory within that. If the
2496
tests complete successfully, the directory is removed.
2498
:ivar test_base_dir: The path of the top-level directory for this
2499
test, which contains a home directory and a work directory.
2501
:ivar test_home_dir: An initially empty directory under test_base_dir
2502
which is used as $HOME for this test.
2504
:ivar test_dir: A directory under test_base_dir used as the current
2505
directory when the test proper is run.
2508
OVERRIDE_PYTHON = 'python'
2510
def check_file_contents(self, filename, expect):
2511
self.log("check contents of file %s" % filename)
2512
contents = file(filename, 'r').read()
2513
if contents != expect:
2514
self.log("expected: %r" % expect)
2515
self.log("actually: %r" % contents)
2516
self.fail("contents of %s not as expected" % filename)
2518
def _getTestDirPrefix(self):
2519
# create a directory within the top level test directory
2520
if sys.platform in ('win32', 'cygwin'):
2521
name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
2522
# windows is likely to have path-length limits so use a short name
2523
name_prefix = name_prefix[-30:]
2525
name_prefix = re.sub('[/]', '_', self.id())
2528
def makeAndChdirToTestDir(self):
2529
"""See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
2531
For TestCaseInTempDir we create a temporary directory based on the test
2532
name and then create two subdirs - test and home under it.
2534
name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
2535
self._getTestDirPrefix())
2537
for i in range(100):
2538
if os.path.exists(name):
2539
name = name_prefix + '_' + str(i)
2541
# now create test and home directories within this dir
2542
self.test_base_dir = name
2543
self.addCleanup(self.deleteTestDir)
2544
os.mkdir(self.test_base_dir)
2546
self.permit_dir(self.test_base_dir)
2547
# 'sprouting' and 'init' of a branch both walk up the tree to find
2548
# stacking policy to honour; create a bzr dir with an unshared
2549
# repository (but not a branch - our code would be trying to escape
2550
# then!) to stop them, and permit it to be read.
2551
# control = bzrdir.BzrDir.create(self.test_base_dir)
2552
# control.create_repository()
2553
self.test_home_dir = self.test_base_dir + '/home'
2554
os.mkdir(self.test_home_dir)
2555
self.test_dir = self.test_base_dir + '/work'
2556
os.mkdir(self.test_dir)
2557
os.chdir(self.test_dir)
2558
# put name of test inside
2559
f = file(self.test_base_dir + '/name', 'w')
2565
def deleteTestDir(self):
2566
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2567
_rmtree_temp_dir(self.test_base_dir)
2569
def build_tree(self, shape, line_endings='binary', transport=None):
2570
"""Build a test tree according to a pattern.
2572
shape is a sequence of file specifications. If the final
2573
character is '/', a directory is created.
2575
This assumes that all the elements in the tree being built are new.
2577
This doesn't add anything to a branch.
2579
:type shape: list or tuple.
2580
:param line_endings: Either 'binary' or 'native'
2581
in binary mode, exact contents are written in native mode, the
2582
line endings match the default platform endings.
2583
:param transport: A transport to write to, for building trees on VFS's.
2584
If the transport is readonly or None, "." is opened automatically.
2587
if type(shape) not in (list, tuple):
2588
raise AssertionError("Parameter 'shape' should be "
2589
"a list or a tuple. Got %r instead" % (shape,))
2590
# It's OK to just create them using forward slashes on windows.
2591
if transport is None or transport.is_readonly():
2592
transport = get_transport(".")
2594
self.assertIsInstance(name, basestring)
2596
transport.mkdir(urlutils.escape(name[:-1]))
2598
if line_endings == 'binary':
2600
elif line_endings == 'native':
2603
raise errors.BzrError(
2604
'Invalid line ending request %r' % line_endings)
2605
content = "contents of %s%s" % (name.encode('utf-8'), end)
2606
transport.put_bytes_non_atomic(urlutils.escape(name), content)
2608
def build_tree_contents(self, shape):
2609
build_tree_contents(shape)
2611
def assertInWorkingTree(self, path, root_path='.', tree=None):
2612
"""Assert whether path or paths are in the WorkingTree"""
2614
tree = workingtree.WorkingTree.open(root_path)
2615
if not isinstance(path, basestring):
2617
self.assertInWorkingTree(p, tree=tree)
2619
self.assertIsNot(tree.path2id(path), None,
2620
path+' not in working tree.')
2622
def assertNotInWorkingTree(self, path, root_path='.', tree=None):
2623
"""Assert whether path or paths are not in the WorkingTree"""
2625
tree = workingtree.WorkingTree.open(root_path)
2626
if not isinstance(path, basestring):
2628
self.assertNotInWorkingTree(p,tree=tree)
2630
self.assertIs(tree.path2id(path), None, path+' in working tree.')
2633
class TestCaseWithTransport(TestCaseInTempDir):
2634
"""A test case that provides get_url and get_readonly_url facilities.
2636
These back onto two transport servers, one for readonly access and one for
2639
If no explicit class is provided for readonly access, a
2640
ReadonlyTransportDecorator is used instead which allows the use of non disk
2641
based read write transports.
2643
If an explicit class is provided for readonly access, that server and the
2644
readwrite one must both define get_url() as resolving to os.getcwd().
2647
def get_vfs_only_server(self):
2648
"""See TestCaseWithMemoryTransport.
2650
This is useful for some tests with specific servers that need
2653
if self.__vfs_server is None:
2654
self.__vfs_server = self.vfs_transport_factory()
2655
self.start_server(self.__vfs_server)
2656
return self.__vfs_server
2658
def make_branch_and_tree(self, relpath, format=None):
2659
"""Create a branch on the transport and a tree locally.
2661
If the transport is not a LocalTransport, the Tree can't be created on
2662
the transport. In that case if the vfs_transport_factory is
2663
LocalURLServer the working tree is created in the local
2664
directory backing the transport, and the returned tree's branch and
2665
repository will also be accessed locally. Otherwise a lightweight
2666
checkout is created and returned.
2668
We do this because we can't physically create a tree in the local
2669
path, with a branch reference to the transport_factory url, and
2670
a branch + repository in the vfs_transport, unless the vfs_transport
2671
namespace is distinct from the local disk - the two branch objects
2672
would collide. While we could construct a tree with its branch object
2673
pointing at the transport_factory transport in memory, reopening it
2674
would behaving unexpectedly, and has in the past caused testing bugs
2675
when we tried to do it that way.
2677
:param format: The BzrDirFormat.
2678
:returns: the WorkingTree.
2680
# TODO: always use the local disk path for the working tree,
2681
# this obviously requires a format that supports branch references
2682
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2684
b = self.make_branch(relpath, format=format)
2686
return b.bzrdir.create_workingtree()
2687
except errors.NotLocalUrl:
2688
# We can only make working trees locally at the moment. If the
2689
# transport can't support them, then we keep the non-disk-backed
2690
# branch and create a local checkout.
2691
if self.vfs_transport_factory is LocalURLServer:
2692
# the branch is colocated on disk, we cannot create a checkout.
2693
# hopefully callers will expect this.
2694
local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2695
wt = local_controldir.create_workingtree()
2696
if wt.branch._format != b._format:
2698
# Make sure that assigning to wt._branch fixes wt.branch,
2699
# in case the implementation details of workingtree objects
2701
self.assertIs(b, wt.branch)
2704
return b.create_checkout(relpath, lightweight=True)
2706
def assertIsDirectory(self, relpath, transport):
2707
"""Assert that relpath within transport is a directory.
2709
This may not be possible on all transports; in that case it propagates
2710
a TransportNotPossible.
2713
mode = transport.stat(relpath).st_mode
2714
except errors.NoSuchFile:
2715
self.fail("path %s is not a directory; no such file"
2717
if not stat.S_ISDIR(mode):
2718
self.fail("path %s is not a directory; has mode %#o"
2721
def assertTreesEqual(self, left, right):
2722
"""Check that left and right have the same content and properties."""
2723
# we use a tree delta to check for equality of the content, and we
2724
# manually check for equality of other things such as the parents list.
2725
self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
2726
differences = left.changes_from(right)
2727
self.assertFalse(differences.has_changed(),
2728
"Trees %r and %r are different: %r" % (left, right, differences))
2731
super(TestCaseWithTransport, self).setUp()
2732
self.__vfs_server = None
2734
def disable_missing_extensions_warning(self):
2735
"""Some tests expect a precise stderr content.
2737
There is no point in forcing them to duplicate the extension related
2740
config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
2743
class ChrootedTestCase(TestCaseWithTransport):
2744
"""A support class that provides readonly urls outside the local namespace.
2746
This is done by checking if self.transport_server is a MemoryServer. if it
2747
is then we are chrooted already, if it is not then an HttpServer is used
2750
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
2751
be used without needed to redo it when a different
2752
subclass is in use ?
2756
super(ChrootedTestCase, self).setUp()
2757
if not self.vfs_transport_factory == MemoryServer:
2758
self.transport_readonly_server = HttpServer
2761
def condition_id_re(pattern):
2762
"""Create a condition filter which performs a re check on a test's id.
2764
:param pattern: A regular expression string.
2765
:return: A callable that returns True if the re matches.
2767
filter_re = osutils.re_compile_checked(pattern, 0,
2769
def condition(test):
2771
return filter_re.search(test_id)
2775
def condition_isinstance(klass_or_klass_list):
2776
"""Create a condition filter which returns isinstance(param, klass).
2778
:return: A callable which when called with one parameter obj return the
2779
result of isinstance(obj, klass_or_klass_list).
2782
return isinstance(obj, klass_or_klass_list)
2786
def condition_id_in_list(id_list):
2787
"""Create a condition filter which verify that test's id in a list.
2789
:param id_list: A TestIdList object.
2790
:return: A callable that returns True if the test's id appears in the list.
2792
def condition(test):
2793
return id_list.includes(test.id())
2797
def condition_id_startswith(starts):
2798
"""Create a condition filter verifying that test's id starts with a string.
2800
:param starts: A list of string.
2801
:return: A callable that returns True if the test's id starts with one of
2804
def condition(test):
2805
for start in starts:
2806
if test.id().startswith(start):
2812
def exclude_tests_by_condition(suite, condition):
2813
"""Create a test suite which excludes some tests from suite.
2815
:param suite: The suite to get tests from.
2816
:param condition: A callable whose result evaluates True when called with a
2817
test case which should be excluded from the result.
2818
:return: A suite which contains the tests found in suite that fail
2822
for test in iter_suite_tests(suite):
2823
if not condition(test):
2825
return TestUtil.TestSuite(result)
2828
def filter_suite_by_condition(suite, condition):
2829
"""Create a test suite by filtering another one.
2831
:param suite: The source suite.
2832
:param condition: A callable whose result evaluates True when called with a
2833
test case which should be included in the result.
2834
:return: A suite which contains the tests found in suite that pass
2838
for test in iter_suite_tests(suite):
2841
return TestUtil.TestSuite(result)
2844
def filter_suite_by_re(suite, pattern):
2845
"""Create a test suite by filtering another one.
2847
:param suite: the source suite
2848
:param pattern: pattern that names must match
2849
:returns: the newly created suite
2851
condition = condition_id_re(pattern)
2852
result_suite = filter_suite_by_condition(suite, condition)
2856
def filter_suite_by_id_list(suite, test_id_list):
2857
"""Create a test suite by filtering another one.
2859
:param suite: The source suite.
2860
:param test_id_list: A list of the test ids to keep as strings.
2861
:returns: the newly created suite
2863
condition = condition_id_in_list(test_id_list)
2864
result_suite = filter_suite_by_condition(suite, condition)
2868
def filter_suite_by_id_startswith(suite, start):
2869
"""Create a test suite by filtering another one.
2871
:param suite: The source suite.
2872
:param start: A list of string the test id must start with one of.
2873
:returns: the newly created suite
2875
condition = condition_id_startswith(start)
2876
result_suite = filter_suite_by_condition(suite, condition)
2880
def exclude_tests_by_re(suite, pattern):
2881
"""Create a test suite which excludes some tests from suite.
2883
:param suite: The suite to get tests from.
2884
:param pattern: A regular expression string. Test ids that match this
2885
pattern will be excluded from the result.
2886
:return: A TestSuite that contains all the tests from suite without the
2887
tests that matched pattern. The order of tests is the same as it was in
2890
return exclude_tests_by_condition(suite, condition_id_re(pattern))
2893
def preserve_input(something):
2894
"""A helper for performing test suite transformation chains.
2896
:param something: Anything you want to preserve.
2902
def randomize_suite(suite):
2903
"""Return a new TestSuite with suite's tests in random order.
2905
The tests in the input suite are flattened into a single suite in order to
2906
accomplish this. Any nested TestSuites are removed to provide global
2909
tests = list(iter_suite_tests(suite))
2910
random.shuffle(tests)
2911
return TestUtil.TestSuite(tests)
2914
def split_suite_by_condition(suite, condition):
2915
"""Split a test suite into two by a condition.
2917
:param suite: The suite to split.
2918
:param condition: The condition to match on. Tests that match this
2919
condition are returned in the first test suite, ones that do not match
2920
are in the second suite.
2921
:return: A tuple of two test suites, where the first contains tests from
2922
suite matching the condition, and the second contains the remainder
2923
from suite. The order within each output suite is the same as it was in
2928
for test in iter_suite_tests(suite):
2930
matched.append(test)
2932
did_not_match.append(test)
2933
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2936
def split_suite_by_re(suite, pattern):
2937
"""Split a test suite into two by a regular expression.
2939
:param suite: The suite to split.
2940
:param pattern: A regular expression string. Test ids that match this
2941
pattern will be in the first test suite returned, and the others in the
2942
second test suite returned.
2943
:return: A tuple of two test suites, where the first contains tests from
2944
suite matching pattern, and the second contains the remainder from
2945
suite. The order within each output suite is the same as it was in
2948
return split_suite_by_condition(suite, condition_id_re(pattern))
2951
def run_suite(suite, name='test', verbose=False, pattern=".*",
2952
stop_on_failure=False,
2953
transport=None, lsprof_timed=None, bench_history=None,
2954
matching_tests_first=None,
2957
exclude_pattern=None,
2960
suite_decorators=None,
2962
result_decorators=None,
2964
"""Run a test suite for bzr selftest.
2966
:param runner_class: The class of runner to use. Must support the
2967
constructor arguments passed by run_suite which are more than standard
2969
:return: A boolean indicating success.
2971
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
2976
if runner_class is None:
2977
runner_class = TextTestRunner
2980
runner = runner_class(stream=stream,
2982
verbosity=verbosity,
2983
bench_history=bench_history,
2985
result_decorators=result_decorators,
2987
runner.stop_on_failure=stop_on_failure
2988
# built in decorator factories:
2990
random_order(random_seed, runner),
2991
exclude_tests(exclude_pattern),
2993
if matching_tests_first:
2994
decorators.append(tests_first(pattern))
2996
decorators.append(filter_tests(pattern))
2997
if suite_decorators:
2998
decorators.extend(suite_decorators)
2999
# tell the result object how many tests will be running: (except if
3000
# --parallel=fork is being used. Robert said he will provide a better
3001
# progress design later -- vila 20090817)
3002
if fork_decorator not in decorators:
3003
decorators.append(CountingDecorator)
3004
for decorator in decorators:
3005
suite = decorator(suite)
3007
# Done after test suite decoration to allow randomisation etc
3008
# to take effect, though that is of marginal benefit.
3010
stream.write("Listing tests only ...\n")
3011
for t in iter_suite_tests(suite):
3012
stream.write("%s\n" % (t.id()))
3014
result = runner.run(suite)
3016
return result.wasStrictlySuccessful()
3018
return result.wasSuccessful()
3021
# A registry where get() returns a suite decorator.
3022
parallel_registry = registry.Registry()
3025
def fork_decorator(suite):
3026
concurrency = osutils.local_concurrency()
3027
if concurrency == 1:
3029
from testtools import ConcurrentTestSuite
3030
return ConcurrentTestSuite(suite, fork_for_tests)
3031
parallel_registry.register('fork', fork_decorator)
3034
def subprocess_decorator(suite):
3035
concurrency = osutils.local_concurrency()
3036
if concurrency == 1:
3038
from testtools import ConcurrentTestSuite
3039
return ConcurrentTestSuite(suite, reinvoke_for_tests)
3040
parallel_registry.register('subprocess', subprocess_decorator)
3043
def exclude_tests(exclude_pattern):
3044
"""Return a test suite decorator that excludes tests."""
3045
if exclude_pattern is None:
3046
return identity_decorator
3047
def decorator(suite):
3048
return ExcludeDecorator(suite, exclude_pattern)
3052
def filter_tests(pattern):
3054
return identity_decorator
3055
def decorator(suite):
3056
return FilterTestsDecorator(suite, pattern)
3060
def random_order(random_seed, runner):
3061
"""Return a test suite decorator factory for randomising tests order.
3063
:param random_seed: now, a string which casts to a long, or a long.
3064
:param runner: A test runner with a stream attribute to report on.
3066
if random_seed is None:
3067
return identity_decorator
3068
def decorator(suite):
3069
return RandomDecorator(suite, random_seed, runner.stream)
3073
def tests_first(pattern):
3075
return identity_decorator
3076
def decorator(suite):
3077
return TestFirstDecorator(suite, pattern)
3081
def identity_decorator(suite):
3086
class TestDecorator(TestSuite):
3087
"""A decorator for TestCase/TestSuite objects.
3089
Usually, subclasses should override __iter__(used when flattening test
3090
suites), which we do to filter, reorder, parallelise and so on, run() and
3094
def __init__(self, suite):
3095
TestSuite.__init__(self)
3098
def countTestCases(self):
3101
cases += test.countTestCases()
3108
def run(self, result):
3109
# Use iteration on self, not self._tests, to allow subclasses to hook
3112
if result.shouldStop:
3118
class CountingDecorator(TestDecorator):
3119
"""A decorator which calls result.progress(self.countTestCases)."""
3121
def run(self, result):
3122
progress_method = getattr(result, 'progress', None)
3123
if callable(progress_method):
3124
progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
3125
return super(CountingDecorator, self).run(result)
3128
class ExcludeDecorator(TestDecorator):
3129
"""A decorator which excludes test matching an exclude pattern."""
3131
def __init__(self, suite, exclude_pattern):
3132
TestDecorator.__init__(self, suite)
3133
self.exclude_pattern = exclude_pattern
3134
self.excluded = False
3138
return iter(self._tests)
3139
self.excluded = True
3140
suite = exclude_tests_by_re(self, self.exclude_pattern)
3142
self.addTests(suite)
3143
return iter(self._tests)
3146
class FilterTestsDecorator(TestDecorator):
3147
"""A decorator which filters tests to those matching a pattern."""
3149
def __init__(self, suite, pattern):
3150
TestDecorator.__init__(self, suite)
3151
self.pattern = pattern
3152
self.filtered = False
3156
return iter(self._tests)
3157
self.filtered = True
3158
suite = filter_suite_by_re(self, self.pattern)
3160
self.addTests(suite)
3161
return iter(self._tests)
3164
class RandomDecorator(TestDecorator):
3165
"""A decorator which randomises the order of its tests."""
3167
def __init__(self, suite, random_seed, stream):
3168
TestDecorator.__init__(self, suite)
3169
self.random_seed = random_seed
3170
self.randomised = False
3171
self.stream = stream
3175
return iter(self._tests)
3176
self.randomised = True
3177
self.stream.writeln("Randomizing test order using seed %s\n" %
3178
(self.actual_seed()))
3179
# Initialise the random number generator.
3180
random.seed(self.actual_seed())
3181
suite = randomize_suite(self)
3183
self.addTests(suite)
3184
return iter(self._tests)
3186
def actual_seed(self):
3187
if self.random_seed == "now":
3188
# We convert the seed to a long to make it reuseable across
3189
# invocations (because the user can reenter it).
3190
self.random_seed = long(time.time())
3192
# Convert the seed to a long if we can
3194
self.random_seed = long(self.random_seed)
3197
return self.random_seed
3200
class TestFirstDecorator(TestDecorator):
3201
"""A decorator which moves named tests to the front."""
3203
def __init__(self, suite, pattern):
3204
TestDecorator.__init__(self, suite)
3205
self.pattern = pattern
3206
self.filtered = False
3210
return iter(self._tests)
3211
self.filtered = True
3212
suites = split_suite_by_re(self, self.pattern)
3214
self.addTests(suites)
3215
return iter(self._tests)
3218
def partition_tests(suite, count):
3219
"""Partition suite into count lists of tests."""
3221
tests = list(iter_suite_tests(suite))
3222
tests_per_process = int(math.ceil(float(len(tests)) / count))
3223
for block in range(count):
3224
low_test = block * tests_per_process
3225
high_test = low_test + tests_per_process
3226
process_tests = tests[low_test:high_test]
3227
result.append(process_tests)
3231
def fork_for_tests(suite):
3232
"""Take suite and start up one runner per CPU by forking()
3234
:return: An iterable of TestCase-like objects which can each have
3235
run(result) called on them to feed tests to result.
3237
concurrency = osutils.local_concurrency()
3239
from subunit import TestProtocolClient, ProtocolTestCase
3241
from subunit.test_results import AutoTimingTestResultDecorator
3243
AutoTimingTestResultDecorator = lambda x:x
3244
class TestInOtherProcess(ProtocolTestCase):
3245
# Should be in subunit, I think. RBC.
3246
def __init__(self, stream, pid):
3247
ProtocolTestCase.__init__(self, stream)
3250
def run(self, result):
3252
ProtocolTestCase.run(self, result)
3254
os.waitpid(self.pid, os.WNOHANG)
3256
test_blocks = partition_tests(suite, concurrency)
3257
for process_tests in test_blocks:
3258
process_suite = TestSuite()
3259
process_suite.addTests(process_tests)
3260
c2pread, c2pwrite = os.pipe()
3265
# Leave stderr and stdout open so we can see test noise
3266
# Close stdin so that the child goes away if it decides to
3267
# read from stdin (otherwise its a roulette to see what
3268
# child actually gets keystrokes for pdb etc).
3271
stream = os.fdopen(c2pwrite, 'wb', 1)
3272
subunit_result = AutoTimingTestResultDecorator(
3273
TestProtocolClient(stream))
3274
process_suite.run(subunit_result)
3279
stream = os.fdopen(c2pread, 'rb', 1)
3280
test = TestInOtherProcess(stream, pid)
3285
def reinvoke_for_tests(suite):
3286
"""Take suite and start up one runner per CPU using subprocess().
3288
:return: An iterable of TestCase-like objects which can each have
3289
run(result) called on them to feed tests to result.
3291
concurrency = osutils.local_concurrency()
3293
from subunit import ProtocolTestCase
3294
class TestInSubprocess(ProtocolTestCase):
3295
def __init__(self, process, name):
3296
ProtocolTestCase.__init__(self, process.stdout)
3297
self.process = process
3298
self.process.stdin.close()
3301
def run(self, result):
3303
ProtocolTestCase.run(self, result)
3306
os.unlink(self.name)
3307
# print "pid %d finished" % finished_process
3308
test_blocks = partition_tests(suite, concurrency)
3309
for process_tests in test_blocks:
3310
# ugly; currently reimplement rather than reuses TestCase methods.
3311
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
3312
if not os.path.isfile(bzr_path):
3313
# We are probably installed. Assume sys.argv is the right file
3314
bzr_path = sys.argv[0]
3315
fd, test_list_file_name = tempfile.mkstemp()
3316
test_list_file = os.fdopen(fd, 'wb', 1)
3317
for test in process_tests:
3318
test_list_file.write(test.id() + '\n')
3319
test_list_file.close()
3321
argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
3323
if '--no-plugins' in sys.argv:
3324
argv.append('--no-plugins')
3325
# stderr=STDOUT would be ideal, but until we prevent noise on
3326
# stderr it can interrupt the subunit protocol.
3327
process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
3329
test = TestInSubprocess(process, test_list_file_name)
3332
os.unlink(test_list_file_name)
3337
class ForwardingResult(unittest.TestResult):
3339
def __init__(self, target):
3340
unittest.TestResult.__init__(self)
3341
self.result = target
3343
def startTest(self, test):
3344
self.result.startTest(test)
3346
def stopTest(self, test):
3347
self.result.stopTest(test)
3349
def startTestRun(self):
3350
self.result.startTestRun()
3352
def stopTestRun(self):
3353
self.result.stopTestRun()
3355
def addSkip(self, test, reason):
3356
self.result.addSkip(test, reason)
3358
def addSuccess(self, test):
3359
self.result.addSuccess(test)
3361
def addError(self, test, err):
3362
self.result.addError(test, err)
3364
def addFailure(self, test, err):
3365
self.result.addFailure(test, err)
3368
class BZRTransformingResult(ForwardingResult):
3370
def addError(self, test, err):
3371
feature = self._error_looks_like('UnavailableFeature: ', err)
3372
if feature is not None:
3373
self.result.addNotSupported(test, feature)
3375
self.result.addError(test, err)
3377
def addFailure(self, test, err):
3378
known = self._error_looks_like('KnownFailure: ', err)
3379
if known is not None:
3380
self.result._addKnownFailure(test, [KnownFailure,
3381
KnownFailure(known), None])
3383
self.result.addFailure(test, err)
3385
def _error_looks_like(self, prefix, err):
3386
"""Deserialize exception and returns the stringify value."""
3390
if isinstance(exc, subunit.RemoteException):
3391
# stringify the exception gives access to the remote traceback
3392
# We search the last line for 'prefix'
3393
lines = str(exc).split('\n')
3394
while lines and not lines[-1]:
3397
if lines[-1].startswith(prefix):
3398
value = lines[-1][len(prefix):]
3402
class ProfileResult(ForwardingResult):
3403
"""Generate profiling data for all activity between start and success.
3405
The profile data is appended to the test's _benchcalls attribute and can
3406
be accessed by the forwarded-to TestResult.
3408
While it might be cleaner do accumulate this in stopTest, addSuccess is
3409
where our existing output support for lsprof is, and this class aims to
3410
fit in with that: while it could be moved it's not necessary to accomplish
3411
test profiling, nor would it be dramatically cleaner.
3414
def startTest(self, test):
3415
self.profiler = bzrlib.lsprof.BzrProfiler()
3416
self.profiler.start()
3417
ForwardingResult.startTest(self, test)
3419
def addSuccess(self, test):
3420
stats = self.profiler.stop()
3422
calls = test._benchcalls
3423
except AttributeError:
3424
test._benchcalls = []
3425
calls = test._benchcalls
3426
calls.append(((test.id(), "", ""), stats))
3427
ForwardingResult.addSuccess(self, test)
3429
def stopTest(self, test):
3430
ForwardingResult.stopTest(self, test)
3431
self.profiler = None
3434
# Controlled by "bzr selftest -E=..." option
3435
# Currently supported:
3436
# -Eallow_debug Will no longer clear debug.debug_flags() so it
3437
# preserves any flags supplied at the command line.
3438
# -Edisable_lock_checks Turns errors in mismatched locks into simple prints
3439
# rather than failing tests. And no longer raise
3440
# LockContention when fctnl locks are not being used
3441
# with proper exclusion rules.
3442
selftest_debug_flags = set()
3445
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
3447
test_suite_factory=None,
3450
matching_tests_first=None,
3453
exclude_pattern=None,
3459
suite_decorators=None,
3463
"""Run the whole test suite under the enhanced runner"""
3464
# XXX: Very ugly way to do this...
3465
# Disable warning about old formats because we don't want it to disturb
3466
# any blackbox tests.
3467
from bzrlib import repository
3468
repository._deprecation_warning_done = True
3470
global default_transport
3471
if transport is None:
3472
transport = default_transport
3473
old_transport = default_transport
3474
default_transport = transport
3475
global selftest_debug_flags
3476
old_debug_flags = selftest_debug_flags
3477
if debug_flags is not None:
3478
selftest_debug_flags = set(debug_flags)
3480
if load_list is None:
3483
keep_only = load_test_id_list(load_list)
3485
starting_with = [test_prefix_alias_registry.resolve_alias(start)
3486
for start in starting_with]
3487
if test_suite_factory is None:
3488
# Reduce loading time by loading modules based on the starting_with
3490
suite = test_suite(keep_only, starting_with)
3492
suite = test_suite_factory()
3494
# But always filter as requested.
3495
suite = filter_suite_by_id_startswith(suite, starting_with)
3496
result_decorators = []
3498
result_decorators.append(ProfileResult)
3499
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
3500
stop_on_failure=stop_on_failure,
3501
transport=transport,
3502
lsprof_timed=lsprof_timed,
3503
bench_history=bench_history,
3504
matching_tests_first=matching_tests_first,
3505
list_only=list_only,
3506
random_seed=random_seed,
3507
exclude_pattern=exclude_pattern,
3509
runner_class=runner_class,
3510
suite_decorators=suite_decorators,
3512
result_decorators=result_decorators,
3515
default_transport = old_transport
3516
selftest_debug_flags = old_debug_flags
3519
def load_test_id_list(file_name):
3520
"""Load a test id list from a text file.
3522
The format is one test id by line. No special care is taken to impose
3523
strict rules, these test ids are used to filter the test suite so a test id
3524
that do not match an existing test will do no harm. This allows user to add
3525
comments, leave blank lines, etc.
3529
ftest = open(file_name, 'rt')
3531
if e.errno != errno.ENOENT:
3534
raise errors.NoSuchFile(file_name)
3536
for test_name in ftest.readlines():
3537
test_list.append(test_name.strip())
3542
def suite_matches_id_list(test_suite, id_list):
3543
"""Warns about tests not appearing or appearing more than once.
3545
:param test_suite: A TestSuite object.
3546
:param test_id_list: The list of test ids that should be found in
3549
:return: (absents, duplicates) absents is a list containing the test found
3550
in id_list but not in test_suite, duplicates is a list containing the
3551
test found multiple times in test_suite.
3553
When using a prefined test id list, it may occurs that some tests do not
3554
exist anymore or that some tests use the same id. This function warns the
3555
tester about potential problems in his workflow (test lists are volatile)
3556
or in the test suite itself (using the same id for several tests does not
3557
help to localize defects).
3559
# Build a dict counting id occurrences
3561
for test in iter_suite_tests(test_suite):
3563
tests[id] = tests.get(id, 0) + 1
3568
occurs = tests.get(id, 0)
3570
not_found.append(id)
3572
duplicates.append(id)
3574
return not_found, duplicates
3577
class TestIdList(object):
3578
"""Test id list to filter a test suite.
3580
Relying on the assumption that test ids are built as:
3581
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
3582
notation, this class offers methods to :
3583
- avoid building a test suite for modules not refered to in the test list,
3584
- keep only the tests listed from the module test suite.
3587
def __init__(self, test_id_list):
3588
# When a test suite needs to be filtered against us we compare test ids
3589
# for equality, so a simple dict offers a quick and simple solution.
3590
self.tests = dict().fromkeys(test_id_list, True)
3592
# While unittest.TestCase have ids like:
3593
# <module>.<class>.<method>[(<param+)],
3594
# doctest.DocTestCase can have ids like:
3597
# <module>.<function>
3598
# <module>.<class>.<method>
3600
# Since we can't predict a test class from its name only, we settle on
3601
# a simple constraint: a test id always begins with its module name.
3604
for test_id in test_id_list:
3605
parts = test_id.split('.')
3606
mod_name = parts.pop(0)
3607
modules[mod_name] = True
3609
mod_name += '.' + part
3610
modules[mod_name] = True
3611
self.modules = modules
3613
def refers_to(self, module_name):
3614
"""Is there tests for the module or one of its sub modules."""
3615
return self.modules.has_key(module_name)
3617
def includes(self, test_id):
3618
return self.tests.has_key(test_id)
3621
class TestPrefixAliasRegistry(registry.Registry):
3622
"""A registry for test prefix aliases.
3624
This helps implement shorcuts for the --starting-with selftest
3625
option. Overriding existing prefixes is not allowed but not fatal (a
3626
warning will be emitted).
3629
def register(self, key, obj, help=None, info=None,
3630
override_existing=False):
3631
"""See Registry.register.
3633
Trying to override an existing alias causes a warning to be emitted,
3634
not a fatal execption.
3637
super(TestPrefixAliasRegistry, self).register(
3638
key, obj, help=help, info=info, override_existing=False)
3640
actual = self.get(key)
3641
note('Test prefix alias %s is already used for %s, ignoring %s'
3642
% (key, actual, obj))
3644
def resolve_alias(self, id_start):
3645
"""Replace the alias by the prefix in the given string.
3647
Using an unknown prefix is an error to help catching typos.
3649
parts = id_start.split('.')
3651
parts[0] = self.get(parts[0])
3653
raise errors.BzrCommandError(
3654
'%s is not a known test prefix alias' % parts[0])
3655
return '.'.join(parts)
3658
test_prefix_alias_registry = TestPrefixAliasRegistry()
3659
"""Registry of test prefix aliases."""
3662
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
3663
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3664
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3666
# Obvious higest levels prefixes, feel free to add your own via a plugin
3667
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3668
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3669
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3670
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
3671
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3674
def _test_suite_testmod_names():
3675
"""Return the standard list of test module names to test."""
3678
'bzrlib.tests.blackbox',
3679
'bzrlib.tests.commands',
3680
'bzrlib.tests.per_branch',
3681
'bzrlib.tests.per_bzrdir',
3682
'bzrlib.tests.per_interrepository',
3683
'bzrlib.tests.per_intertree',
3684
'bzrlib.tests.per_inventory',
3685
'bzrlib.tests.per_interbranch',
3686
'bzrlib.tests.per_lock',
3687
'bzrlib.tests.per_transport',
3688
'bzrlib.tests.per_tree',
3689
'bzrlib.tests.per_pack_repository',
3690
'bzrlib.tests.per_repository',
3691
'bzrlib.tests.per_repository_chk',
3692
'bzrlib.tests.per_repository_reference',
3693
'bzrlib.tests.per_uifactory',
3694
'bzrlib.tests.per_versionedfile',
3695
'bzrlib.tests.per_workingtree',
3696
'bzrlib.tests.test__annotator',
3697
'bzrlib.tests.test__chk_map',
3698
'bzrlib.tests.test__dirstate_helpers',
3699
'bzrlib.tests.test__groupcompress',
3700
'bzrlib.tests.test__known_graph',
3701
'bzrlib.tests.test__rio',
3702
'bzrlib.tests.test__simple_set',
3703
'bzrlib.tests.test__static_tuple',
3704
'bzrlib.tests.test__walkdirs_win32',
3705
'bzrlib.tests.test_ancestry',
3706
'bzrlib.tests.test_annotate',
3707
'bzrlib.tests.test_api',
3708
'bzrlib.tests.test_atomicfile',
3709
'bzrlib.tests.test_bad_files',
3710
'bzrlib.tests.test_bencode',
3711
'bzrlib.tests.test_bisect_multi',
3712
'bzrlib.tests.test_branch',
3713
'bzrlib.tests.test_branchbuilder',
3714
'bzrlib.tests.test_btree_index',
3715
'bzrlib.tests.test_bugtracker',
3716
'bzrlib.tests.test_bundle',
3717
'bzrlib.tests.test_bzrdir',
3718
'bzrlib.tests.test__chunks_to_lines',
3719
'bzrlib.tests.test_cache_utf8',
3720
'bzrlib.tests.test_chk_map',
3721
'bzrlib.tests.test_chk_serializer',
3722
'bzrlib.tests.test_chunk_writer',
3723
'bzrlib.tests.test_clean_tree',
3724
'bzrlib.tests.test_commands',
3725
'bzrlib.tests.test_commit',
3726
'bzrlib.tests.test_commit_merge',
3727
'bzrlib.tests.test_config',
3728
'bzrlib.tests.test_conflicts',
3729
'bzrlib.tests.test_counted_lock',
3730
'bzrlib.tests.test_crash',
3731
'bzrlib.tests.test_decorators',
3732
'bzrlib.tests.test_delta',
3733
'bzrlib.tests.test_debug',
3734
'bzrlib.tests.test_deprecated_graph',
3735
'bzrlib.tests.test_diff',
3736
'bzrlib.tests.test_directory_service',
3737
'bzrlib.tests.test_dirstate',
3738
'bzrlib.tests.test_email_message',
3739
'bzrlib.tests.test_eol_filters',
3740
'bzrlib.tests.test_errors',
3741
'bzrlib.tests.test_export',
3742
'bzrlib.tests.test_extract',
3743
'bzrlib.tests.test_fetch',
3744
'bzrlib.tests.test_fifo_cache',
3745
'bzrlib.tests.test_filters',
3746
'bzrlib.tests.test_ftp_transport',
3747
'bzrlib.tests.test_foreign',
3748
'bzrlib.tests.test_generate_docs',
3749
'bzrlib.tests.test_generate_ids',
3750
'bzrlib.tests.test_globbing',
3751
'bzrlib.tests.test_gpg',
3752
'bzrlib.tests.test_graph',
3753
'bzrlib.tests.test_groupcompress',
3754
'bzrlib.tests.test_hashcache',
3755
'bzrlib.tests.test_help',
3756
'bzrlib.tests.test_hooks',
3757
'bzrlib.tests.test_http',
3758
'bzrlib.tests.test_http_response',
3759
'bzrlib.tests.test_https_ca_bundle',
3760
'bzrlib.tests.test_identitymap',
3761
'bzrlib.tests.test_ignores',
3762
'bzrlib.tests.test_index',
3763
'bzrlib.tests.test_info',
3764
'bzrlib.tests.test_inv',
3765
'bzrlib.tests.test_inventory_delta',
3766
'bzrlib.tests.test_knit',
3767
'bzrlib.tests.test_lazy_import',
3768
'bzrlib.tests.test_lazy_regex',
3769
'bzrlib.tests.test_lock',
3770
'bzrlib.tests.test_lockable_files',
3771
'bzrlib.tests.test_lockdir',
3772
'bzrlib.tests.test_log',
3773
'bzrlib.tests.test_lru_cache',
3774
'bzrlib.tests.test_lsprof',
3775
'bzrlib.tests.test_mail_client',
3776
'bzrlib.tests.test_memorytree',
3777
'bzrlib.tests.test_merge',
3778
'bzrlib.tests.test_merge3',
3779
'bzrlib.tests.test_merge_core',
3780
'bzrlib.tests.test_merge_directive',
3781
'bzrlib.tests.test_missing',
3782
'bzrlib.tests.test_msgeditor',
3783
'bzrlib.tests.test_multiparent',
3784
'bzrlib.tests.test_mutabletree',
3785
'bzrlib.tests.test_nonascii',
3786
'bzrlib.tests.test_options',
3787
'bzrlib.tests.test_osutils',
3788
'bzrlib.tests.test_osutils_encodings',
3789
'bzrlib.tests.test_pack',
3790
'bzrlib.tests.test_patch',
3791
'bzrlib.tests.test_patches',
3792
'bzrlib.tests.test_permissions',
3793
'bzrlib.tests.test_plugins',
3794
'bzrlib.tests.test_progress',
3795
'bzrlib.tests.test_read_bundle',
3796
'bzrlib.tests.test_reconcile',
3797
'bzrlib.tests.test_reconfigure',
3798
'bzrlib.tests.test_registry',
3799
'bzrlib.tests.test_remote',
3800
'bzrlib.tests.test_rename_map',
3801
'bzrlib.tests.test_repository',
3802
'bzrlib.tests.test_revert',
3803
'bzrlib.tests.test_revision',
3804
'bzrlib.tests.test_revisionspec',
3805
'bzrlib.tests.test_revisiontree',
3806
'bzrlib.tests.test_rio',
3807
'bzrlib.tests.test_rules',
3808
'bzrlib.tests.test_sampler',
3809
'bzrlib.tests.test_script',
3810
'bzrlib.tests.test_selftest',
3811
'bzrlib.tests.test_serializer',
3812
'bzrlib.tests.test_setup',
3813
'bzrlib.tests.test_sftp_transport',
3814
'bzrlib.tests.test_shelf',
3815
'bzrlib.tests.test_shelf_ui',
3816
'bzrlib.tests.test_smart',
3817
'bzrlib.tests.test_smart_add',
3818
'bzrlib.tests.test_smart_request',
3819
'bzrlib.tests.test_smart_transport',
3820
'bzrlib.tests.test_smtp_connection',
3821
'bzrlib.tests.test_source',
3822
'bzrlib.tests.test_ssh_transport',
3823
'bzrlib.tests.test_status',
3824
'bzrlib.tests.test_store',
3825
'bzrlib.tests.test_strace',
3826
'bzrlib.tests.test_subsume',
3827
'bzrlib.tests.test_switch',
3828
'bzrlib.tests.test_symbol_versioning',
3829
'bzrlib.tests.test_tag',
3830
'bzrlib.tests.test_testament',
3831
'bzrlib.tests.test_textfile',
3832
'bzrlib.tests.test_textmerge',
3833
'bzrlib.tests.test_timestamp',
3834
'bzrlib.tests.test_trace',
3835
'bzrlib.tests.test_transactions',
3836
'bzrlib.tests.test_transform',
3837
'bzrlib.tests.test_transport',
3838
'bzrlib.tests.test_transport_log',
3839
'bzrlib.tests.test_tree',
3840
'bzrlib.tests.test_treebuilder',
3841
'bzrlib.tests.test_tsort',
3842
'bzrlib.tests.test_tuned_gzip',
3843
'bzrlib.tests.test_ui',
3844
'bzrlib.tests.test_uncommit',
3845
'bzrlib.tests.test_upgrade',
3846
'bzrlib.tests.test_upgrade_stacked',
3847
'bzrlib.tests.test_urlutils',
3848
'bzrlib.tests.test_version',
3849
'bzrlib.tests.test_version_info',
3850
'bzrlib.tests.test_weave',
3851
'bzrlib.tests.test_whitebox',
3852
'bzrlib.tests.test_win32utils',
3853
'bzrlib.tests.test_workingtree',
3854
'bzrlib.tests.test_workingtree_4',
3855
'bzrlib.tests.test_wsgi',
3856
'bzrlib.tests.test_xml',
3860
def _test_suite_modules_to_doctest():
3861
"""Return the list of modules to doctest."""
3864
'bzrlib.branchbuilder',
3867
'bzrlib.iterablefile',
3871
'bzrlib.symbol_versioning',
3874
'bzrlib.version_info_formats.format_custom',
3878
def test_suite(keep_only=None, starting_with=None):
3879
"""Build and return TestSuite for the whole of bzrlib.
3881
:param keep_only: A list of test ids limiting the suite returned.
3883
:param starting_with: An id limiting the suite returned to the tests
3886
This function can be replaced if you need to change the default test
3887
suite on a global basis, but it is not encouraged.
3890
loader = TestUtil.TestLoader()
3892
if keep_only is not None:
3893
id_filter = TestIdList(keep_only)
3895
# We take precedence over keep_only because *at loading time* using
3896
# both options means we will load less tests for the same final result.
3897
def interesting_module(name):
3898
for start in starting_with:
3900
# Either the module name starts with the specified string
3901
name.startswith(start)
3902
# or it may contain tests starting with the specified string
3903
or start.startswith(name)
3907
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
3909
elif keep_only is not None:
3910
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
3911
def interesting_module(name):
3912
return id_filter.refers_to(name)
3915
loader = TestUtil.TestLoader()
3916
def interesting_module(name):
3917
# No filtering, all modules are interesting
3920
suite = loader.suiteClass()
3922
# modules building their suite with loadTestsFromModuleNames
3923
suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
3925
for mod in _test_suite_modules_to_doctest():
3926
if not interesting_module(mod):
3927
# No tests to keep here, move along
3930
# note that this really does mean "report only" -- doctest
3931
# still runs the rest of the examples
3932
doc_suite = doctest.DocTestSuite(mod,
3933
optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
3934
except ValueError, e:
3935
print '**failed to get doctest for: %s\n%s' % (mod, e)
3937
if len(doc_suite._tests) == 0:
3938
raise errors.BzrError("no doctests found in %s" % (mod,))
3939
suite.addTest(doc_suite)
3941
default_encoding = sys.getdefaultencoding()
3942
for name, plugin in bzrlib.plugin.plugins().items():
3943
if not interesting_module(plugin.module.__name__):
3945
plugin_suite = plugin.test_suite()
3946
# We used to catch ImportError here and turn it into just a warning,
3947
# but really if you don't have --no-plugins this should be a failure.
3948
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
3949
if plugin_suite is None:
3950
plugin_suite = plugin.load_plugin_tests(loader)
3951
if plugin_suite is not None:
3952
suite.addTest(plugin_suite)
3953
if default_encoding != sys.getdefaultencoding():
3954
bzrlib.trace.warning(
3955
'Plugin "%s" tried to reset default encoding to: %s', name,
3956
sys.getdefaultencoding())
3958
sys.setdefaultencoding(default_encoding)
3960
if keep_only is not None:
3961
# Now that the referred modules have loaded their tests, keep only the
3963
suite = filter_suite_by_id_list(suite, id_filter)
3964
# Do some sanity checks on the id_list filtering
3965
not_found, duplicates = suite_matches_id_list(suite, keep_only)
3967
# The tester has used both keep_only and starting_with, so he is
3968
# already aware that some tests are excluded from the list, there
3969
# is no need to tell him which.
3972
# Some tests mentioned in the list are not in the test suite. The
3973
# list may be out of date, report to the tester.
3974
for id in not_found:
3975
bzrlib.trace.warning('"%s" not found in the test suite', id)
3976
for id in duplicates:
3977
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
3982
def multiply_scenarios(scenarios_left, scenarios_right):
3983
"""Multiply two sets of scenarios.
3985
:returns: the cartesian product of the two sets of scenarios, that is
3986
a scenario for every possible combination of a left scenario and a
3990
('%s,%s' % (left_name, right_name),
3991
dict(left_dict.items() + right_dict.items()))
3992
for left_name, left_dict in scenarios_left
3993
for right_name, right_dict in scenarios_right]
3996
def multiply_tests(tests, scenarios, result):
3997
"""Multiply tests_list by scenarios into result.
3999
This is the core workhorse for test parameterisation.
4001
Typically the load_tests() method for a per-implementation test suite will
4002
call multiply_tests and return the result.
4004
:param tests: The tests to parameterise.
4005
:param scenarios: The scenarios to apply: pairs of (scenario_name,
4006
scenario_param_dict).
4007
:param result: A TestSuite to add created tests to.
4009
This returns the passed in result TestSuite with the cross product of all
4010
the tests repeated once for each scenario. Each test is adapted by adding
4011
the scenario name at the end of its id(), and updating the test object's
4012
__dict__ with the scenario_param_dict.
4014
>>> import bzrlib.tests.test_sampler
4015
>>> r = multiply_tests(
4016
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
4017
... [('one', dict(param=1)),
4018
... ('two', dict(param=2))],
4020
>>> tests = list(iter_suite_tests(r))
4024
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
4030
for test in iter_suite_tests(tests):
4031
apply_scenarios(test, scenarios, result)
4035
def apply_scenarios(test, scenarios, result):
4036
"""Apply the scenarios in scenarios to test and add to result.
4038
:param test: The test to apply scenarios to.
4039
:param scenarios: An iterable of scenarios to apply to test.
4041
:seealso: apply_scenario
4043
for scenario in scenarios:
4044
result.addTest(apply_scenario(test, scenario))
4048
def apply_scenario(test, scenario):
4049
"""Copy test and apply scenario to it.
4051
:param test: A test to adapt.
4052
:param scenario: A tuple describing the scenarion.
4053
The first element of the tuple is the new test id.
4054
The second element is a dict containing attributes to set on the
4056
:return: The adapted test.
4058
new_id = "%s(%s)" % (test.id(), scenario[0])
4059
new_test = clone_test(test, new_id)
4060
for name, value in scenario[1].items():
4061
setattr(new_test, name, value)
4065
def clone_test(test, new_id):
4066
"""Clone a test giving it a new id.
4068
:param test: The test to clone.
4069
:param new_id: The id to assign to it.
4070
:return: The new test.
4072
new_test = copy(test)
4073
new_test.id = lambda: new_id
4077
def _rmtree_temp_dir(dirname):
4078
# If LANG=C we probably have created some bogus paths
4079
# which rmtree(unicode) will fail to delete
4080
# so make sure we are using rmtree(str) to delete everything
4081
# except on win32, where rmtree(str) will fail
4082
# since it doesn't have the property of byte-stream paths
4083
# (they are either ascii or mbcs)
4084
if sys.platform == 'win32':
4085
# make sure we are using the unicode win32 api
4086
dirname = unicode(dirname)
4088
dirname = dirname.encode(sys.getfilesystemencoding())
4090
osutils.rmtree(dirname)
4092
# We don't want to fail here because some useful display will be lost
4093
# otherwise. Polluting the tmp dir is bad, but not giving all the
4094
# possible info to the test runner is even worse.
4095
sys.stderr.write('Unable to remove testing dir %s\n%s'
4096
% (os.path.basename(dirname), e))
4099
class Feature(object):
4100
"""An operating system Feature."""
4103
self._available = None
4105
def available(self):
4106
"""Is the feature available?
4108
:return: True if the feature is available.
4110
if self._available is None:
4111
self._available = self._probe()
4112
return self._available
4115
"""Implement this method in concrete features.
4117
:return: True if the feature is available.
4119
raise NotImplementedError
4122
if getattr(self, 'feature_name', None):
4123
return self.feature_name()
4124
return self.__class__.__name__
4127
class _SymlinkFeature(Feature):
4130
return osutils.has_symlinks()
4132
def feature_name(self):
4135
SymlinkFeature = _SymlinkFeature()
4138
class _HardlinkFeature(Feature):
4141
return osutils.has_hardlinks()
4143
def feature_name(self):
4146
HardlinkFeature = _HardlinkFeature()
4149
class _OsFifoFeature(Feature):
4152
return getattr(os, 'mkfifo', None)
4154
def feature_name(self):
4155
return 'filesystem fifos'
4157
OsFifoFeature = _OsFifoFeature()
4160
class _UnicodeFilenameFeature(Feature):
4161
"""Does the filesystem support Unicode filenames?"""
4165
# Check for character combinations unlikely to be covered by any
4166
# single non-unicode encoding. We use the characters
4167
# - greek small letter alpha (U+03B1) and
4168
# - braille pattern dots-123456 (U+283F).
4169
os.stat(u'\u03b1\u283f')
4170
except UnicodeEncodeError:
4172
except (IOError, OSError):
4173
# The filesystem allows the Unicode filename but the file doesn't
4177
# The filesystem allows the Unicode filename and the file exists,
4181
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4184
def probe_unicode_in_user_encoding():
4185
"""Try to encode several unicode strings to use in unicode-aware tests.
4186
Return first successfull match.
4188
:return: (unicode value, encoded plain string value) or (None, None)
4190
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
4191
for uni_val in possible_vals:
4193
str_val = uni_val.encode(osutils.get_user_encoding())
4194
except UnicodeEncodeError:
4195
# Try a different character
4198
return uni_val, str_val
4202
def probe_bad_non_ascii(encoding):
4203
"""Try to find [bad] character with code [128..255]
4204
that cannot be decoded to unicode in some encoding.
4205
Return None if all non-ascii characters is valid
4208
for i in xrange(128, 256):
4211
char.decode(encoding)
4212
except UnicodeDecodeError:
4217
class _HTTPSServerFeature(Feature):
4218
"""Some tests want an https Server, check if one is available.
4220
Right now, the only way this is available is under python2.6 which provides
4231
def feature_name(self):
4232
return 'HTTPSServer'
4235
HTTPSServerFeature = _HTTPSServerFeature()
4238
class _ParamikoFeature(Feature):
4239
"""Is paramiko available?"""
4243
from bzrlib.transport.sftp import SFTPAbsoluteServer
4245
except errors.ParamikoNotPresent:
4248
def feature_name(self):
4252
ParamikoFeature = _ParamikoFeature()
4255
class _UnicodeFilename(Feature):
4256
"""Does the filesystem support Unicode filenames?"""
4261
except UnicodeEncodeError:
4263
except (IOError, OSError):
4264
# The filesystem allows the Unicode filename but the file doesn't
4268
# The filesystem allows the Unicode filename and the file exists,
4272
UnicodeFilename = _UnicodeFilename()
4275
class _UTF8Filesystem(Feature):
4276
"""Is the filesystem UTF-8?"""
4279
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4283
UTF8Filesystem = _UTF8Filesystem()
4286
class _BreakinFeature(Feature):
4287
"""Does this platform support the breakin feature?"""
4290
from bzrlib import breakin
4291
if breakin.determine_signal() is None:
4293
if sys.platform == 'win32':
4294
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4295
# We trigger SIGBREAK via a Console api so we need ctypes to
4296
# access the function
4301
def feature_name(self):
4302
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4305
BreakinFeature = _BreakinFeature()
4308
class _CaseInsCasePresFilenameFeature(Feature):
4309
"""Is the file-system case insensitive, but case-preserving?"""
4312
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4314
# first check truly case-preserving for created files, then check
4315
# case insensitive when opening existing files.
4316
name = osutils.normpath(name)
4317
base, rel = osutils.split(name)
4318
found_rel = osutils.canonical_relpath(base, name)
4319
return (found_rel == rel
4320
and os.path.isfile(name.upper())
4321
and os.path.isfile(name.lower()))
4326
def feature_name(self):
4327
return "case-insensitive case-preserving filesystem"
4329
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4332
class _CaseInsensitiveFilesystemFeature(Feature):
4333
"""Check if underlying filesystem is case-insensitive but *not* case
4336
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4337
# more likely to be case preserving, so this case is rare.
4340
if CaseInsCasePresFilenameFeature.available():
4343
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4344
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4345
TestCaseWithMemoryTransport.TEST_ROOT = root
4347
root = TestCaseWithMemoryTransport.TEST_ROOT
4348
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4350
name_a = osutils.pathjoin(tdir, 'a')
4351
name_A = osutils.pathjoin(tdir, 'A')
4353
result = osutils.isdir(name_A)
4354
_rmtree_temp_dir(tdir)
4357
def feature_name(self):
4358
return 'case-insensitive filesystem'
4360
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4363
class _SubUnitFeature(Feature):
4364
"""Check if subunit is available."""
4373
def feature_name(self):
4376
SubUnitFeature = _SubUnitFeature()
4377
# Only define SubUnitBzrRunner if subunit is available.
4379
from subunit import TestProtocolClient
4381
from subunit.test_results import AutoTimingTestResultDecorator
4383
AutoTimingTestResultDecorator = lambda x:x
4384
class SubUnitBzrRunner(TextTestRunner):
4385
def run(self, test):
4386
result = AutoTimingTestResultDecorator(
4387
TestProtocolClient(self.stream))