1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
# TODO: Perhaps there should be an API to find out if bzr is running under the
19
# test suite -- some plugins might want to avoid making intrusive changes if
20
# this is the case. However, we want behaviour under test to diverge as
21
# little as possible, so this should be used rarely if it's added at all.
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
# NOTE: Some classes in here use camelCaseNaming() rather than
25
# underscore_naming(). That's for consistency with unittest; it's not the
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
# new assertFoo() methods.
32
from cStringIO import StringIO
39
from pprint import pformat
44
from subprocess import Popen, PIPE, STDOUT
71
import bzrlib.commands
72
import bzrlib.timestamp
74
import bzrlib.inventory
75
import bzrlib.iterablefile
80
# lsprof not available
82
from bzrlib.merge import merge_inner
85
from bzrlib.smart import client, request, server
87
from bzrlib import symbol_versioning
88
from bzrlib.symbol_versioning import (
95
from bzrlib.transport import get_transport, pathfilter
96
import bzrlib.transport
97
from bzrlib.transport.local import LocalURLServer
98
from bzrlib.transport.memory import MemoryServer
99
from bzrlib.transport.readonly import ReadonlyServer
100
from bzrlib.trace import mutter, note
101
from bzrlib.tests import TestUtil
102
from bzrlib.tests.http_server import HttpServer
103
from bzrlib.tests.TestUtil import (
107
from bzrlib.tests.treeshape import build_tree_contents
108
from bzrlib.ui import NullProgressView
109
from bzrlib.ui.text import TextUIFactory
110
import bzrlib.version_info_formats.format_custom
111
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
113
# Mark this python module as being part of the implementation
114
# of unittest: this gives us better tracebacks where the last
115
# shown frame is the test code, not our assertXYZ.
118
default_transport = LocalURLServer
120
# Subunit result codes, defined here to prevent a hard dependency on subunit.
125
class ExtendedTestResult(unittest._TextTestResult):
126
"""Accepts, reports and accumulates the results of running tests.
128
Compared to the unittest version this class adds support for
129
profiling, benchmarking, stopping as soon as a test fails, and
130
skipping tests. There are further-specialized subclasses for
131
different types of display.
133
When a test finishes, in whatever way, it calls one of the addSuccess,
134
addFailure or addError classes. These in turn may redirect to a more
135
specific case for the special test results supported by our extended
138
Note that just one of these objects is fed the results from many tests.
143
def __init__(self, stream, descriptions, verbosity,
147
"""Construct new TestResult.
149
:param bench_history: Optionally, a writable file object to accumulate
152
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
153
if bench_history is not None:
154
from bzrlib.version import _get_bzr_source_tree
155
src_tree = _get_bzr_source_tree()
158
revision_id = src_tree.get_parent_ids()[0]
160
# XXX: if this is a brand new tree, do the same as if there
164
# XXX: If there's no branch, what should we do?
166
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
167
self._bench_history = bench_history
168
self.ui = ui.ui_factory
171
self.failure_count = 0
172
self.known_failure_count = 0
174
self.not_applicable_count = 0
175
self.unsupported = {}
177
self._overall_start_time = time.time()
178
self._strict = strict
180
def stopTestRun(self):
183
stopTime = time.time()
184
timeTaken = stopTime - self.startTime
186
self.stream.writeln(self.separator2)
187
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
188
run, run != 1 and "s" or "", timeTaken))
189
self.stream.writeln()
190
if not self.wasSuccessful():
191
self.stream.write("FAILED (")
192
failed, errored = map(len, (self.failures, self.errors))
194
self.stream.write("failures=%d" % failed)
196
if failed: self.stream.write(", ")
197
self.stream.write("errors=%d" % errored)
198
if self.known_failure_count:
199
if failed or errored: self.stream.write(", ")
200
self.stream.write("known_failure_count=%d" %
201
self.known_failure_count)
202
self.stream.writeln(")")
204
if self.known_failure_count:
205
self.stream.writeln("OK (known_failures=%d)" %
206
self.known_failure_count)
208
self.stream.writeln("OK")
209
if self.skip_count > 0:
210
skipped = self.skip_count
211
self.stream.writeln('%d test%s skipped' %
212
(skipped, skipped != 1 and "s" or ""))
214
for feature, count in sorted(self.unsupported.items()):
215
self.stream.writeln("Missing feature '%s' skipped %d tests." %
218
ok = self.wasStrictlySuccessful()
220
ok = self.wasSuccessful()
221
if TestCase._first_thread_leaker_id:
223
'%s is leaking threads among %d leaking tests.\n' % (
224
TestCase._first_thread_leaker_id,
225
TestCase._leaking_threads_tests))
226
# We don't report the main thread as an active one.
228
'%d non-main threads were left active in the end.\n'
229
% (TestCase._active_threads - 1))
231
def _extractBenchmarkTime(self, testCase):
    """Return the benchmark time recorded on testCase, if any.

    :param testCase: The test object to inspect.
    :return: The value of testCase._benchtime, or None when the test has
        no such attribute.
    """
    return getattr(testCase, "_benchtime", None)
235
def _elapsedTestTimeString(self):
    """Return a time string for the overall time the current test has taken.

    The elapsed time is measured from self._start_time and rendered with
    self._formatTime.
    """
    elapsed = time.time() - self._start_time
    return self._formatTime(elapsed)
239
def _testTimeString(self, testCase):
    """Return the time string to display for testCase.

    A benchmark time recorded on the test takes precedence and is marked
    with a trailing "*"; otherwise the elapsed time of the current test is
    used.
    """
    benchmark_time = self._extractBenchmarkTime(testCase)
    if benchmark_time is None:
        return self._elapsedTestTimeString()
    return self._formatTime(benchmark_time) + "*"
246
def _formatTime(self, seconds):
    """Format seconds as milliseconds with leading spaces."""
    # Some benchmarks can take thousands of seconds to run, so the
    # millisecond count is right-aligned in an 8-character field.
    return "%8dms" % (1000 * seconds)
252
def _shortened_test_description(self, test):
254
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
257
def startTest(self, test):
258
unittest.TestResult.startTest(self, test)
261
self.report_test_start(test)
262
test.number = self.count
263
self._recordTestStartTime()
265
def startTests(self):
267
if getattr(sys, 'frozen', None) is None:
268
bzr_path = osutils.realpath(sys.argv[0])
270
bzr_path = sys.executable
272
'testing: %s\n' % (bzr_path,))
275
bzrlib.__path__[0],))
277
' bzr-%s python-%s %s\n' % (
278
bzrlib.version_string,
279
bzrlib._format_version_tuple(sys.version_info),
280
platform.platform(aliased=1),
282
self.stream.write('\n')
284
def _recordTestStartTime(self):
    """Record that a test has started.

    Stores the current time in self._start_time.
    """
    self._start_time = time.time()
288
def _cleanupLogFile(self, test):
289
# We can only do this if we have one of our TestCases, not if
291
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
292
if setKeepLogfile is not None:
295
def addError(self, test, err):
296
"""Tell result that test finished with an error.
298
Called from the TestCase run() method when the test
299
fails with an unexpected error.
301
self._testConcluded(test)
302
if isinstance(err[1], TestNotApplicable):
303
return self._addNotApplicable(test, err)
304
elif isinstance(err[1], UnavailableFeature):
305
return self.addNotSupported(test, err[1].args[0])
308
unittest.TestResult.addError(self, test, err)
309
self.error_count += 1
310
self.report_error(test, err)
313
self._cleanupLogFile(test)
315
def addFailure(self, test, err):
316
"""Tell result that test failed.
318
Called from the TestCase run() method when the test
319
fails because e.g. an assert() method failed.
321
self._testConcluded(test)
322
if isinstance(err[1], KnownFailure):
323
return self._addKnownFailure(test, err)
326
unittest.TestResult.addFailure(self, test, err)
327
self.failure_count += 1
328
self.report_failure(test, err)
331
self._cleanupLogFile(test)
333
def addSuccess(self, test):
334
"""Tell result that test completed successfully.
336
Called from the TestCase run()
338
self._testConcluded(test)
339
if self._bench_history is not None:
340
benchmark_time = self._extractBenchmarkTime(test)
341
if benchmark_time is not None:
342
self._bench_history.write("%s %s\n" % (
343
self._formatTime(benchmark_time),
345
self.report_success(test)
346
self._cleanupLogFile(test)
347
unittest.TestResult.addSuccess(self, test)
348
test._log_contents = ''
350
def _testConcluded(self, test):
351
"""Common code when a test has finished.
353
Called regardless of whether it succeded, failed, etc.
357
def _addKnownFailure(self, test, err):
    """Count a known failure of test and report it.

    :param test: The test that failed.
    :param err: The error information for the failure; passed unchanged
        to report_known_failure.
    """
    self.known_failure_count += 1
    self.report_known_failure(test, err)
361
def addNotSupported(self, test, feature):
    """The test will not be run because of a missing feature.

    :param test: The test that could not be run.
    :param feature: The missing feature. str(feature) is the key under
        which the occurrence is counted in self.unsupported.
    """
    # This can be called in two different ways: it may be that the test
    # started running, and then raised (through addError)
    # UnavailableFeature. Alternatively this method can be called while
    # probing for features before running the tests; in that case we will
    # see startTest and stopTest, but the test itself will never run.
    feature_name = str(feature)
    self.unsupported[feature_name] = self.unsupported.get(feature_name, 0) + 1
    self.report_unsupported(test, feature)
374
def addSkip(self, test, reason):
375
"""A test has not run for 'reason'."""
377
self.report_skip(test, reason)
379
def _addNotApplicable(self, test, skip_excinfo):
380
if isinstance(skip_excinfo[1], TestNotApplicable):
381
self.not_applicable_count += 1
382
self.report_not_applicable(test, skip_excinfo)
385
except KeyboardInterrupt:
388
self.addError(test, test.exc_info())
390
# seems best to treat this as success from point-of-view of unittest
391
# -- it actually does nothing so it barely matters :)
392
unittest.TestResult.addSuccess(self, test)
393
test._log_contents = ''
395
def printErrorList(self, flavour, errors):
396
for test, err in errors:
397
self.stream.writeln(self.separator1)
398
self.stream.write("%s: " % flavour)
399
self.stream.writeln(self.getDescription(test))
400
if getattr(test, '_get_log', None) is not None:
401
log_contents = test._get_log()
403
self.stream.write('\n')
405
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
406
self.stream.write('\n')
407
self.stream.write(log_contents)
408
self.stream.write('\n')
410
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
411
self.stream.write('\n')
412
self.stream.writeln(self.separator2)
413
self.stream.writeln("%s" % err)
415
def _post_mortem(self):
    """Start a PDB post mortem session.

    Does nothing unless the BZR_TEST_PDB environment variable is set to a
    non-empty value.
    """
    if os.environ.get('BZR_TEST_PDB', None):
        # Imported lazily: pdb is only needed when debugging is requested.
        import pdb
        pdb.post_mortem()
420
def progress(self, offset, whence):
421
"""The test is adjusting the count of tests to run."""
422
if whence == SUBUNIT_SEEK_SET:
423
self.num_tests = offset
424
elif whence == SUBUNIT_SEEK_CUR:
425
self.num_tests += offset
427
raise errors.BzrError("Unknown whence %r" % whence)
429
def report_cleaning_up(self):
432
def startTestRun(self):
    """Record the time at which the test run started in self.startTime."""
    self.startTime = time.time()
435
def report_success(self, test):
438
def wasStrictlySuccessful(self):
439
if self.unsupported or self.known_failure_count:
441
return self.wasSuccessful()
444
class TextTestResult(ExtendedTestResult):
445
"""Displays progress and results of tests in text form"""
447
def __init__(self, stream, descriptions, verbosity,
452
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
453
bench_history, strict)
454
# We no longer pass them around, but just rely on the UIFactory stack
457
warnings.warn("Passing pb to TextTestResult is deprecated")
458
# Take a progress bar from the UI factory and switch off all of its
# optional decorations.
self.pb = self.ui.nested_progress_bar()
self.pb.show_pct = False
self.pb.show_spinner = False
# This used to read "False," -- the trailing comma stored the one-element
# tuple (False,), which is truthy, instead of the boolean False.
self.pb.show_eta = False
self.pb.show_count = False
self.pb.show_bar = False
self.pb.update_latency = 0
self.pb.show_transport_activity = False
467
def stopTestRun(self):
468
# called when the tests that are going to run have run
471
super(TextTestResult, self).stopTestRun()
473
def startTestRun(self):
    """Start the run and show an initial "[test 0/N] Starting" status.

    N is self.num_tests; the status is shown through self.pb.update.
    """
    super(TextTestResult, self).startTestRun()
    # One-element tuple so the value is always treated as a single argument.
    self.pb.update('[test 0/%d] Starting' % (self.num_tests,))
477
def printErrors(self):
478
# clear the pb to make room for the error listing
480
super(TextTestResult, self).printErrors()
482
def _progress_prefix_text(self):
483
# the longer this text, the less space we have to show the test
485
a = '[%d' % self.count # total that have been run
486
# tests skipped as known not to be relevant are not important enough
488
## if self.skip_count:
489
## a += ', %d skip' % self.skip_count
490
## if self.known_failure_count:
491
## a += '+%dX' % self.known_failure_count
493
a +='/%d' % self.num_tests
495
runtime = time.time() - self._overall_start_time
497
a += '%dm%ds' % (runtime / 60, runtime % 60)
501
a += ', %d err' % self.error_count
502
if self.failure_count:
503
a += ', %d fail' % self.failure_count
504
# if self.unsupported:
505
# a += ', %d missing' % len(self.unsupported)
509
def report_test_start(self, test):
512
self._progress_prefix_text()
514
+ self._shortened_test_description(test))
516
def _test_description(self, test):
    """Return the description of test used in report messages.

    This is the result of self._shortened_test_description(test).
    """
    return self._shortened_test_description(test)
519
def report_error(self, test, err):
520
ui.ui_factory.note('ERROR: %s\n %s\n' % (
521
self._test_description(test),
525
def report_failure(self, test, err):
526
ui.ui_factory.note('FAIL: %s\n %s\n' % (
527
self._test_description(test),
531
def report_known_failure(self, test, err):
    """Emit an XFAIL note for test through the UI factory.

    :param test: The test that failed in the expected way.
    :param err: The error information; err[1] is included in the note.
    """
    description = self._test_description(test)
    ui.ui_factory.note('XFAIL: %s\n%s\n' % (description, err[1]))
535
def report_skip(self, test, reason):
538
def report_not_applicable(self, test, skip_excinfo):
541
def report_unsupported(self, test, feature):
    """test cannot be run because feature is missing.

    This implementation reports nothing.
    """
544
def report_cleaning_up(self):
    """Show 'Cleaning up' on the progress bar."""
    self.pb.update('Cleaning up')
548
class VerboseTestResult(ExtendedTestResult):
549
"""Produce long output, with one line per test run plus times"""
551
def _ellipsize_to_right(self, a_string, final_width):
552
"""Truncate and pad a string, keeping the right hand side"""
553
if len(a_string) > final_width:
554
result = '...' + a_string[3-final_width:]
557
return result.ljust(final_width)
559
def startTestRun(self):
    """Start the run and write how many tests are about to run.

    The count written to self.stream is self.num_tests.
    """
    super(VerboseTestResult, self).startTestRun()
    self.stream.write('running %d tests...\n' % self.num_tests)
563
def report_test_start(self, test):
565
name = self._shortened_test_description(test)
566
# width needs space for 6 char status, plus 1 for slash, plus an
567
# 11-char time string, plus a trailing blank
568
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
569
self.stream.write(self._ellipsize_to_right(name,
570
osutils.terminal_width()-18))
573
def _error_summary(self, err):
575
return '%s%s' % (indent, err[1])
577
def report_error(self, test, err):
    """Write an ERROR line for test, followed by a summary of err.

    :param test: The test that raised the error.
    :param err: The error information, summarised by _error_summary.
    """
    elapsed = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln('ERROR %s\n%s' % (elapsed, summary))
582
def report_failure(self, test, err):
    """Write a FAIL line for test, followed by a summary of err.

    :param test: The test that failed.
    :param err: The error information, summarised by _error_summary.
    """
    elapsed = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln(' FAIL %s\n%s' % (elapsed, summary))
587
def report_known_failure(self, test, err):
    """Write an XFAIL line for test, followed by a summary of err.

    :param test: The test that failed in the expected way.
    :param err: The error information, summarised by _error_summary.
    """
    elapsed = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln('XFAIL %s\n%s' % (elapsed, summary))
592
def report_success(self, test):
593
self.stream.writeln(' OK %s' % self._testTimeString(test))
594
for bench_called, stats in getattr(test, '_benchcalls', []):
595
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
596
stats.pprint(file=self.stream)
597
# flush the stream so that we get smooth output. This verbose mode is
598
# used to show the output in PQM.
601
def report_skip(self, test, reason):
    """Write a SKIP line for test, followed by the reason it was skipped."""
    elapsed = self._testTimeString(test)
    self.stream.writeln(' SKIP %s\n%s' % (elapsed, reason))
605
def report_not_applicable(self, test, skip_excinfo):
    """Write an N/A line for test, followed by a summary of skip_excinfo.

    :param test: The test that was not applicable.
    :param skip_excinfo: The error information, summarised by
        _error_summary.
    """
    elapsed = self._testTimeString(test)
    summary = self._error_summary(skip_excinfo)
    self.stream.writeln(' N/A %s\n%s' % (elapsed, summary))
610
def report_unsupported(self, test, feature):
    """test cannot be run because feature is missing.

    Writes a NODEP line naming the missing feature to self.stream.
    """
    elapsed = self._testTimeString(test)
    self.stream.writeln("NODEP %s\n The feature '%s' is not available."
                        % (elapsed, feature))
616
class TextTestRunner(object):
617
stop_on_failure = False
625
result_decorators=None,
627
"""Create a TextTestRunner.
629
:param result_decorators: An optional list of decorators to apply
630
to the result object being used by the runner. Decorators are
631
applied left to right - the first element in the list is the
634
self.stream = unittest._WritelnDecorator(stream)
635
self.descriptions = descriptions
636
self.verbosity = verbosity
637
self._bench_history = bench_history
638
self._strict = strict
639
self._result_decorators = result_decorators or []
642
"Run the given test case or test suite."
643
if self.verbosity == 1:
644
result_class = TextTestResult
645
elif self.verbosity >= 2:
646
result_class = VerboseTestResult
647
original_result = result_class(self.stream,
650
bench_history=self._bench_history,
653
# Signal to result objects that look at stop early policy to stop,
654
original_result.stop_early = self.stop_on_failure
655
result = original_result
656
for decorator in self._result_decorators:
657
result = decorator(result)
658
result.stop_early = self.stop_on_failure
664
if isinstance(test, testtools.ConcurrentTestSuite):
665
# We need to catch bzr specific behaviors
666
result = BZRTransformingResult(result)
667
result.startTestRun()
672
# higher level code uses our extended protocol to determine
673
# what exit code to give.
674
return original_result
677
def iter_suite_tests(suite):
678
"""Return all tests in a suite, recursing through nested suites"""
679
if isinstance(suite, unittest.TestCase):
681
elif isinstance(suite, unittest.TestSuite):
683
for r in iter_suite_tests(item):
686
raise Exception('unknown type %r for object %r'
687
% (type(suite), suite))
690
class TestSkipped(Exception):
    """Indicates that a test was intentionally skipped, rather than failing."""
694
class TestNotApplicable(TestSkipped):
695
"""A test is not applicable to the situation where it was run.
697
This is only normally raised by parameterized tests, if they find that
698
the instance they're constructed upon does not support one aspect
703
class KnownFailure(AssertionError):
    """Indicates that a test failed in a precisely expected manner.

    Such failures don't block the whole test suite from passing because they
    are indicators of partially completed code or of future work. We have an
    explicit error for them so that we can ensure that they are always
    visible: KnownFailures are always shown in the output of bzr selftest.
    """
713
class UnavailableFeature(Exception):
    """A feature required for this test was not available.

    The feature should be used to construct the exception.
    """
720
class CommandFailed(Exception):
724
class StringIOWrapper(object):
725
"""A wrapper around cStringIO which just adds an encoding attribute.
727
Internally we can check sys.stdout to see what the output encoding
728
should be. However, cStringIO has no encoding attribute that we can
729
set. So we wrap it instead.
734
def __init__(self, s=None):
736
self.__dict__['_cstring'] = StringIO(s)
738
self.__dict__['_cstring'] = StringIO()
740
def __getattr__(self, name, getattr=getattr):
741
return getattr(self.__dict__['_cstring'], name)
743
def __setattr__(self, name, val):
744
if name == 'encoding':
745
self.__dict__['encoding'] = val
747
return setattr(self._cstring, name, val)
750
class TestUIFactory(TextUIFactory):
751
"""A UI Factory for testing.
753
Hide the progress bar but emit note()s.
755
Allows get_password to be tested without real tty attached.
757
See also CannedInputUIFactory which lets you provide programmatic input in
760
# TODO: Capture progress events at the model level and allow them to be
761
# observed by tests that care.
763
# XXX: Should probably unify more with CannedInputUIFactory or a
764
# particular configuration of TextUIFactory, or otherwise have a clearer
765
# idea of how they're supposed to be different.
766
# See https://bugs.edge.launchpad.net/bzr/+bug/408213
768
def __init__(self, stdout=None, stderr=None, stdin=None):
    """Create the factory, wrapping any supplied stdin text.

    :param stdout: Passed through to the parent constructor.
    :param stderr: Passed through to the parent constructor.
    :param stdin: Optional input text; when not None it is wrapped in a
        StringIOWrapper before being passed on.
    """
    if stdin is not None:
        # We use a StringIOWrapper to be able to test various
        # encodings, but the user is still responsible to
        # encode the string and to set the encoding attribute
        # of StringIOWrapper.
        stdin = StringIOWrapper(stdin)
    # Note the parent's positional order: stdin first, then stdout, stderr.
    super(TestUIFactory, self).__init__(stdin, stdout, stderr)
777
def get_non_echoed_password(self):
778
"""Get password from stdin without trying to handle the echo mode"""
779
password = self.stdin.readline()
782
if password[-1] == '\n':
783
password = password[:-1]
786
def make_progress_view(self):
    """Return a new NullProgressView."""
    return NullProgressView()
790
class TestCase(unittest.TestCase):
791
"""Base class for bzr unit tests.
793
Tests that need access to disk resources should subclass
794
TestCaseInTempDir not TestCase.
796
Error and debug log messages are redirected from their usual
797
location into a temporary file, the contents of which can be
798
retrieved by _get_log(). We use a real OS file, not an in-memory object,
799
so that it can also capture file IO. When the test completes this file
800
is read into memory and removed from disk.
802
There are also convenience functions to invoke bzr's command-line
803
routine, and to build and check bzr trees.
805
In addition to the usual method of overriding tearDown(), this class also
806
allows subclasses to register functions into the _cleanups list, which is
807
run in order as the object is torn down. It's less likely this will be
808
accidentally overlooked.
811
_active_threads = None
812
_leaking_threads_tests = 0
813
_first_thread_leaker_id = None
814
_log_file_name = None
816
_keep_log_file = False
817
# record lsprof data when performing benchmark calls.
818
_gather_lsprof_in_benchmarks = False
819
attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
820
'_log_contents', '_log_file_name', '_benchtime',
821
'_TestCase__testMethodName', '_TestCase__testMethodDoc',)
823
def __init__(self, methodName='testMethod'):
824
super(TestCase, self).__init__(methodName)
826
self._bzr_test_setUp_run = False
827
self._bzr_test_tearDown_run = False
828
self._directory_isolation = True
831
unittest.TestCase.setUp(self)
832
self._bzr_test_setUp_run = True
833
self._cleanEnvironment()
836
self._benchcalls = []
837
self._benchtime = None
839
self._track_transports()
841
self._clear_debug_flags()
842
TestCase._active_threads = threading.activeCount()
843
self.addCleanup(self._check_leaked_threads)
848
pdb.Pdb().set_trace(sys._getframe().f_back)
850
def _check_leaked_threads(self):
    """Record whether more threads are active now than at the last check.

    Compares the current thread count with TestCase._active_threads and
    stores the new count there. If the count grew,
    TestCase._leaking_threads_tests is incremented and the id of the first
    such test is kept in TestCase._first_thread_leaker_id.
    """
    active = threading.activeCount()
    leaked_threads = active - TestCase._active_threads
    TestCase._active_threads = active
    # If some tests make the number of threads *decrease*, we'll consider
    # that they are just observing old threads dying, not aggressively
    # killing random threads. So we don't report these tests as leaking.
    # The risk is that we miss a leak that way (the test sees 2 threads
    # going away but leaks one) but that seems less likely than a test
    # which sees threads going away and does not leak.
    if leaked_threads <= 0:
        return
    TestCase._leaking_threads_tests += 1
    if TestCase._first_thread_leaker_id is None:
        TestCase._first_thread_leaker_id = self.id()
865
def _clear_debug_flags(self):
    """Prevent externally set debug flags affecting tests.

    Tests that want to use debug flags can just set them in the
    debug_flags set during setup/teardown.
    """
    # Keep a copy of the flags as they were before the test touched them;
    # presumably this is what _restore_debug_flags puts back -- confirm.
    self._preserved_debug_flags = set(debug.debug_flags)
    if 'allow_debug' not in selftest_debug_flags:
        debug.debug_flags.clear()
    if 'disable_lock_checks' not in selftest_debug_flags:
        debug.debug_flags.add('strict_locks')
    self.addCleanup(self._restore_debug_flags)
878
def _clear_hooks(self):
    """Replace every known hook point with a newly created one.

    The hook objects in place beforehand are remembered in
    self._preserved_hooks, and self._restoreHooks is registered as a
    cleanup.
    """
    # prevent hooks affecting tests
    self._preserved_hooks = {}
    for key, factory in hooks.known_hooks.items():
        parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
        current_hooks = hooks.known_hooks_key_to_object(key)
        # NOTE(review): entries are keyed by parent only, so two hook points
        # on the same parent would overwrite each other -- confirm that
        # parents are unique.
        self._preserved_hooks[parent] = (name, current_hooks)
    # The cleanup is registered before any hook point is replaced.
    self.addCleanup(self._restoreHooks)
    for key, factory in hooks.known_hooks.items():
        parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
        setattr(parent, name, factory())
    # this hook should always be installed
    request._install_hook()
892
def disable_directory_isolation(self):
    """Turn off directory isolation checks."""
    self._directory_isolation = False
896
def enable_directory_isolation(self):
    """Enable directory isolation checks."""
    self._directory_isolation = True
900
def _silenceUI(self):
901
"""Turn off UI for duration of test"""
902
# by default the UI is off; tests can turn it on if they want it.
903
saved = ui.ui_factory
905
ui.ui_factory = saved
906
ui.ui_factory = ui.SilentUIFactory()
907
self.addCleanup(_restore)
909
def _check_locks(self):
910
"""Check that all lock take/release actions have been paired."""
911
# We always check for mismatched locks. If a mismatch is found, we
912
# fail unless -Edisable_lock_checks is supplied to selftest, in which
913
# case we just print a warning.
915
acquired_locks = [lock for action, lock in self._lock_actions
916
if action == 'acquired']
917
released_locks = [lock for action, lock in self._lock_actions
918
if action == 'released']
919
broken_locks = [lock for action, lock in self._lock_actions
920
if action == 'broken']
921
# trivially, given the tests for lock acquistion and release, if we
922
# have as many in each list, it should be ok. Some lock tests also
923
# break some locks on purpose and should be taken into account by
924
# considering that breaking a lock is just a dirty way of releasing it.
925
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
926
message = ('Different number of acquired and '
927
'released or broken locks. (%s, %s + %s)' %
928
(acquired_locks, released_locks, broken_locks))
929
if not self._lock_check_thorough:
930
# Rather than fail, just warn
931
print "Broken test %s: %s" % (self, message)
935
def _track_locks(self):
936
"""Track lock activity during tests."""
937
self._lock_actions = []
938
if 'disable_lock_checks' in selftest_debug_flags:
939
self._lock_check_thorough = False
941
self._lock_check_thorough = True
943
self.addCleanup(self._check_locks)
944
_mod_lock.Lock.hooks.install_named_hook('lock_acquired',
945
self._lock_acquired, None)
946
_mod_lock.Lock.hooks.install_named_hook('lock_released',
947
self._lock_released, None)
948
_mod_lock.Lock.hooks.install_named_hook('lock_broken',
949
self._lock_broken, None)
951
def _lock_acquired(self, result):
    """Record an 'acquired' entry for result in self._lock_actions."""
    self._lock_actions.append(('acquired', result))
954
def _lock_released(self, result):
    """Record a 'released' entry for result in self._lock_actions."""
    self._lock_actions.append(('released', result))
957
def _lock_broken(self, result):
    """Record a 'broken' entry for result in self._lock_actions."""
    self._lock_actions.append(('broken', result))
960
def permit_dir(self, name):
    """Permit a directory to be used by this test. See permit_url.

    Both name as given and the base url of the transport opened on it are
    permitted.
    """
    name_transport = get_transport(name)
    self.permit_url(name)
    self.permit_url(name_transport.base)
966
def permit_url(self, url):
967
"""Declare that url is an ok url to use in this test.
969
Do this for memory transports, temporary test directory etc.
971
Do not do this for the current working directory, /tmp, or any other
972
preexisting non isolated url.
974
if not url.endswith('/'):
976
self._bzr_selftest_roots.append(url)
978
def permit_source_tree_branch_repo(self):
979
"""Permit the source tree bzr is running from to be opened.
981
Some code such as bzrlib.version attempts to read from the bzr branch
982
that bzr is executing from (if any). This method permits that directory
983
to be used in the test suite.
985
path = self.get_source_path()
986
self.record_directory_isolation()
989
workingtree.WorkingTree.open(path)
990
except (errors.NotBranchError, errors.NoWorkingTree):
993
self.enable_directory_isolation()
995
def _preopen_isolate_transport(self, transport):
    """Check that all transport openings are done in the test work area.

    :param transport: The transport about to be opened. Its base url, with
        any path filtering and 'readonly+' prefix removed, is handed to
        _preopen_isolate_url.
    """
    while isinstance(transport, pathfilter.PathFilteringTransport):
        # Unwrap pathfiltered transports
        transport = transport.server.backing_transport.clone(
            transport._filter('.'))
    url = transport.base
    # ReadonlySmartTCPServer_for_testing decorates the backing transport
    # urls it is given by prepending readonly+. This is appropriate as the
    # client shouldn't know that the server is readonly (or not readonly).
    # We could register all servers twice, with readonly+ prepending, but
    # that makes for a long list; stripping the prefix here is about the
    # same but simpler.
    readonly_prefix = 'readonly+'
    if url.startswith(readonly_prefix):
        url = url[len(readonly_prefix):]
    self._preopen_isolate_url(url)
1012
def _preopen_isolate_url(self, url):
1013
if not self._directory_isolation:
1015
if self._directory_isolation == 'record':
1016
self._bzr_selftest_roots.append(url)
1018
# This prevents all transports, including e.g. sftp ones backed on disk
1019
# from working unless they are explicitly granted permission. We then
1020
# depend on the code that sets up test transports to check that they are
1021
# appropriately isolated and enable their use by calling
1022
# self.permit_transport()
1023
if not osutils.is_inside_any(self._bzr_selftest_roots, url):
1024
raise errors.BzrError("Attempt to escape test isolation: %r %r"
1025
% (url, self._bzr_selftest_roots))
1027
def record_directory_isolation(self):
    """Gather accessed directories to permit later access.

    This is used for tests that access the branch bzr is running from.
    """
    self._directory_isolation = "record"
1034
def start_server(self, transport_server, backing_server=None):
1035
"""Start transport_server for this test.
1037
This starts the server, registers a cleanup for it and permits the
1038
server's urls to be used.
1040
if backing_server is None:
1041
transport_server.setUp()
1043
transport_server.setUp(backing_server)
1044
self.addCleanup(transport_server.tearDown)
1045
# Obtain a real transport because if the server supplies a password, it
1046
# will be hidden from the base on the client side.
1047
t = get_transport(transport_server.get_url())
1048
# Some transport servers effectively chroot the backing transport;
1049
# others like SFTPServer don't - users of the transport can walk up the
1050
# transport to read the entire backing transport. This wouldn't matter
1051
# except that the workdir tests are given - and that they expect the
1052
# server's url to point at - is one directory under the safety net. So
1053
# Branch operations into the transport will attempt to walk up one
1054
# directory. Chrooting all servers would avoid this but also mean that
1055
# we wouldn't be testing directly against non-root urls. Alternatively
1056
# getting the test framework to start the server with a backing server
1057
# at the actual safety net directory would work too, but this then
1058
# means that the self.get_url/self.get_transport methods would need
1059
# to transform all their results. On balance its cleaner to handle it
1060
# here, and permit a higher url when we have one of these transports.
1061
if t.base.endswith('/work/'):
1062
# we have safety net/test root/work
1063
t = t.clone('../..')
1064
elif isinstance(transport_server, server.SmartTCPServer_for_testing):
1065
# The smart server adds a path similar to work, which is traversed
1066
# up from by the client. But the server is chrooted - the actual
1067
# backing transport is not escaped from, and VFS requests to the
1068
# root will error (because they try to escape the chroot).
1070
while t2.base != t.base:
1073
self.permit_url(t.base)
1075
def _track_transports(self):
    """Install the hook that checks transport usage during this test."""
    # TestCase has no safe place it can write to, so no roots are allowed
    # to begin with.
    self._bzr_selftest_roots = []
    # Hooking bzr dir opening is currently the easiest way to be sure
    # nothing escapes. It leaves a small window of error for transport
    # tests, but those are well known and this can be improved later.
    open_hooks = bzrdir.BzrDir.hooks
    open_hooks.install_named_hook(
        "pre_open",
        self._preopen_isolate_transport,
        "Check bzr directories are safe.")
1086
def _ndiff_strings(self, a, b):
    """Return the ndiff of two multi-line strings, joined into one string.

    A non-empty string lacking a final newline gets one appended before
    the comparison so that the last lines can compare as identical.

    :param a: The first text.
    :param b: The second text.
    :return: The concatenated output lines of difflib.ndiff.
    """
    def newline_terminated(text):
        if text and text[-1] != '\n':
            return text + '\n'
        return text

    # Nothing is treated as junk: every line and character takes part.
    nothing_is_junk = lambda x: False
    delta = difflib.ndiff(newline_terminated(a).splitlines(True),
                          newline_terminated(b).splitlines(True),
                          linejunk=nothing_is_junk,
                          charjunk=nothing_is_junk)
    return ''.join(delta)
1101
def assertEqual(self, a, b, message=''):
    """Fail with a pretty-printed report unless a == b.

    :param a: First value.
    :param b: Second value.
    :param message: Optional text placed, on its own line, before the
        report.
    :raises AssertionError: If the values are not equal, or cannot be
        compared without a UnicodeError.
    """
    try:
        if a == b:
            return
    except UnicodeError:
        # If we can't compare without getting a UnicodeError, then
        # obviously they are different
        mutter('UnicodeError: %s', sys.exc_info()[1])
    prefix = message
    if prefix:
        prefix += '\n'
    report = "%snot equal:\na = %s\nb = %s\n" % (
        prefix, pformat(a), pformat(b))
    raise AssertionError(report)

assertEquals = assertEqual
1117
def assertEqualDiff(self, a, b, message=None):
    """Assert that two texts are equal, showing an ndiff when they are not.

    Intended for multi-line strings, where differences are hard to spot
    by eye.

    :param a: First text.
    :param b: Second text.
    :param message: Optional heading for the diff; replaced by a more
        specific heading when the texts differ only by a final newline.
    :raises AssertionError: If the texts differ.
    """
    # TODO: perhaps override assertEquals to call this for strings?
    if a == b:
        return
    heading = message
    if heading is None:
        heading = "texts not equal:\n"
    if a + '\n' == b:
        heading = 'first string is missing a final newline.\n'
    if a == b + '\n':
        heading = 'second string is missing a final newline.\n'
    raise AssertionError(heading + self._ndiff_strings(a, b))
1135
def assertEqualMode(self, mode, mode_test):
    """Assert that two mode values are equal, reporting both in octal."""
    detail = 'mode mismatch %o != %o' % (mode, mode_test)
    self.assertEqual(mode, mode_test, detail)
1139
def assertEqualStat(self, expected, actual):
    """Assert that two stat results describe the same file state.

    Size, mtime, ctime, device, inode and mode are compared, in that
    order; atime is deliberately left out.

    :param expected: A stat result.
    :param actual: A stat result.
    :raises AssertionError: If the expected and actual stat values differ
        other than by atime.
    """
    compared_fields = ('st_size', 'st_mtime', 'st_ctime',
                       'st_dev', 'st_ino', 'st_mode')
    for field in compared_fields:
        self.assertEqual(getattr(expected, field), getattr(actual, field))
1154
def assertLength(self, length, obj_with_len):
    """Assert that len(obj_with_len) equals length.

    :param length: The expected length.
    :param obj_with_len: Any object supporting len().
    """
    if len(obj_with_len) == length:
        return
    details = (length, len(obj_with_len), obj_with_len)
    self.fail("Incorrect length: wanted %d, got %d for %r" % details)
1160
def assertLogsError(self, exception_class, func, *args, **kwargs):
    """Assert that func(*args, **kwargs) quietly logs a specific exception.

    trace.log_exception_quietly is temporarily wrapped so each call still
    logs but also records sys.exc_info(). Exactly one such call must
    happen, and the exception recorded must be an exception_class.

    :param exception_class: The class the logged exception must have.
    :param func: The callable to invoke.
    :return: The logged exception instance.
    """
    from bzrlib import trace
    recorded = []
    real_log_exception_quietly = trace.log_exception_quietly

    def recording_logger():
        real_log_exception_quietly()
        recorded.append(sys.exc_info())

    try:
        trace.log_exception_quietly = recording_logger
        func(*args, **kwargs)
    finally:
        # Always put the real logger back, even if func raised.
        trace.log_exception_quietly = real_log_exception_quietly
    self.assertLength(1, recorded)
    err = recorded[0][1]
    self.assertIsInstance(err, exception_class)
    return err
1179
def assertPositive(self, val):
    """Assert that val is strictly greater than zero."""
    is_positive = val > 0
    self.assertTrue(is_positive,
                    'expected a positive value, but got %s' % val)
1183
def assertNegative(self, val):
    """Assert that val is strictly less than zero."""
    is_negative = val < 0
    self.assertTrue(is_negative,
                    'expected a negative value, but got %s' % val)
1187
def assertStartsWith(self, s, prefix):
    """Assert that s starts with prefix."""
    if s.startswith(prefix):
        return
    raise AssertionError('string %r does not start with %r' % (s, prefix))
1191
def assertEndsWith(self, s, suffix):
    """Assert that s ends with suffix."""
    if s.endswith(suffix):
        return
    raise AssertionError('string %r does not end with %r' % (s, suffix))
1196
def assertContainsRe(self, haystack, needle_re, flags=0):
    """Assert that haystack contains something matching needle_re.

    :param haystack: The string to search.
    :param needle_re: The regular expression to search for.
    :param flags: Flags passed on to re.search.
    :raises AssertionError: If nothing in haystack matches.
    """
    if re.search(needle_re, haystack, flags):
        return
    if '\n' in haystack or len(haystack) > 60:
        # a long string, format it in a more readable way
        template = 'pattern "%s" not found in\n"""\\\n%s"""\n'
    else:
        template = 'pattern "%s" not found in "%s"'
    raise AssertionError(template % (needle_re, haystack))
1208
def assertNotContainsRe(self, haystack, needle_re, flags=0):
    """Assert that nothing in haystack matches needle_re.

    :param haystack: The string to search.
    :param needle_re: The regular expression that must not match.
    :param flags: Flags passed on to re.search.
    """
    found = re.search(needle_re, haystack, flags)
    if not found:
        return
    raise AssertionError('pattern "%s" found in "%s"'
                         % (needle_re, haystack))
1214
def assertSubset(self, sublist, superlist):
    """Assert that every entry in sublist is present in superlist.

    Both arguments are converted to sets for the comparison.
    """
    missing = set(sublist).difference(set(superlist))
    if not missing:
        return
    raise AssertionError("value(s) %r not present in container %r" %
                         (missing, superlist))
1221
def assertListRaises(self, excClass, func, *args, **kwargs):
    """Fail unless excClass is raised when the iterator from func is used.

    Many functions return generators; the result of func is wrapped in
    list() so the whole generator runs and the expected exception has a
    chance to be raised.

    :param excClass: An exception class, or a tuple of classes.
    :param func: A callable returning an iterable.
    :return: The exception that was raised.
    """
    try:
        list(func(*args, **kwargs))
    except excClass:
        return sys.exc_info()[1]
    excName = getattr(excClass, '__name__', None)
    if excName is None:
        excName = str(excClass)
    raise self.failureException("%s not raised" % excName)
1239
def assertRaises(self, excClass, callableObj, *args, **kwargs):
    """Assert that a callable raises a particular exception.

    :param excClass: As for the except statement, this may be either an
        exception class, or a tuple of classes.
    :param callableObj: A callable, will be passed ``*args`` and
        ``**kwargs``.

    Returns the exception so that you can examine it.
    """
    try:
        callableObj(*args, **kwargs)
    except excClass:
        return sys.exc_info()[1]
    excName = getattr(excClass, '__name__', None)
    if excName is None:
        # probably a tuple
        excName = str(excClass)
    raise self.failureException("%s not raised" % excName)
1261
def assertIs(self, left, right, message=None):
    """Assert that left and right are the same object.

    :param message: Optional text used instead of the default report.
    """
    if left is right:
        return
    if message is None:
        message = "%r is not %r." % (left, right)
    raise AssertionError(message)
1268
def assertIsNot(self, left, right, message=None):
    """Assert that left and right are different objects.

    :param message: Optional text used instead of the default report.
    """
    if left is not right:
        return
    if message is None:
        message = "%r is %r." % (left, right)
    raise AssertionError(message)
1275
def assertTransportMode(self, transport, path, mode):
    """Fail if a path does not have mode "mode".

    If modes are not supported on this transport, the assertion is ignored.

    :param transport: The transport on which to stat path.
    :param path: The path to check.
    :param mode: The expected permission bits.
    """
    if transport._can_roundtrip_unix_modebits():
        found_mode = stat.S_IMODE(transport.stat(path).st_mode)
        detail = ('mode of %r incorrect (%s != %s)'
                  % (path, oct(mode), oct(found_mode)))
        self.assertEqual(mode, found_mode, detail)
1288
def assertIsSameRealPath(self, path1, path2):
    """Fail if path1 and path2 resolve to different real paths."""
    apparent = "apparent paths:\na = %s\nb = %s\n," % (path1, path2)
    real1 = osutils.realpath(path1)
    real2 = osutils.realpath(path2)
    self.assertEqual(real1, real2, apparent)
1294
def assertIsInstance(self, obj, kls, msg=None):
    """Fail if obj is not an instance of kls.

    :param obj: The object to check.
    :param kls: A class, or tuple of classes, as accepted by isinstance.
    :param msg: Supplementary message to show if the assertion fails.
    """
    if isinstance(obj, kls):
        return
    report = "%r is an instance of %s rather than %s" % (
        obj, obj.__class__, kls)
    if msg:
        report += ": " + msg
    self.fail(report)
1306
def expectFailure(self, reason, assertion, *args, **kwargs):
    """Run an assertion that is currently expected to fail.

    Use this for assertions that ought to succeed but presently do not:
    the failure is *expected*, not *wanted*. Be precise about the failure
    you expect, so that a newly introduced bug still surfaces as an
    AssertionError rather than as a KnownFailure. Frequently the call
    should be followed by the opposite assertion, as in the example.

    :param reason: Why the assertion is expected to fail.
    :param assertion: A callable that raises AssertionError on failure;
        it is called with args and kwargs.
    :raises KnownFailure: If the assertion fails, as expected.
    :raises AssertionError: Through self.fail, if the assertion
        unexpectedly succeeds.

    example usage::

      self.expectFailure('Math is broken', self.assertNotEqual, 54,
                         dynamic_val)
      self.assertEqual(42, dynamic_val)

      A dynamic_val of 54 makes the test raise KnownFailure. Once math is
      fixed and the expectFailure is removed, only a dynamic_val of 42
      lets the test pass. Anything other than 54 or 42 causes an
      AssertionError.
    """
    try:
        assertion(*args, **kwargs)
    except AssertionError:
        raise KnownFailure(reason)
    # Reached only when the assertion raised nothing at all.
    self.fail('Unexpected success. Should have failed: %s' % reason)
1341
def assertFileEqual(self, content, path):
    """Fail if the file at path does not contain exactly 'content'.

    :param content: The expected contents.
    :param path: The file to read; it is opened in binary mode.
    """
    self.failUnlessExists(path)
    stream = open(path, 'rb')
    try:
        found = stream.read()
    finally:
        stream.close()
    self.assertEqualDiff(content, found)
1351
def failUnlessExists(self, path):
    """Fail unless path or paths, which may be abs or relative, exist.

    :param path: A single path string, or an iterable of paths, each of
        which is checked in turn.
    """
    if isinstance(path, basestring):
        self.failUnless(osutils.lexists(path),path+" does not exist")
        return
    for each_path in path:
        self.failUnlessExists(each_path)
1359
def failIfExists(self, path):
    """Fail if path or paths, which may be abs or relative, exist.

    :param path: A single path string, or an iterable of paths, each of
        which is checked in turn.
    """
    if isinstance(path, basestring):
        self.failIf(osutils.lexists(path),path+" exists")
        return
    for each_path in path:
        self.failIfExists(each_path)
1367
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
    """Call a_callable while collecting symbol_versioning warnings.

    This is the shared helper behind callDeprecated and applyDeprecated.

    :param a_callable: A callable to call.
    :param args: The positional arguments for the callable
    :param kwargs: The keyword arguments for the callable
    :return: A tuple (warnings, result). result is the result of calling
        a_callable(``*args``, ``**kwargs``).
    """
    collected = []

    def capture_warnings(msg, cls=None, stacklevel=None):
        # This hooks a deprecation-specific call path, so only
        # deprecations should ever arrive here.
        self.assertEqual(cls, DeprecationWarning)
        collected.append(msg)

    previous_method = symbol_versioning.warn
    symbol_versioning.set_warning_method(capture_warnings)
    try:
        result = a_callable(*args, **kwargs)
    finally:
        # Restore the previous warning method even if the call raised.
        symbol_versioning.set_warning_method(previous_method)
    return (collected, result)
1390
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
    """Call a deprecated callable without warning the user.

    Only warnings raised through symbol_versioning.warn are captured, not
    those from callers that go direct to the warning module.

    To test that a deprecated method raises an error, wrap this method in
    assertRaises, for example::

        self.assertRaises(errors.ReservedId,
            self.applyDeprecated,
            deprecated_in((1, 5, 0)),
            br.append_revision,
            'current:')

    :param deprecation_format: The deprecation format that the callable
        should have been deprecated with. This is the same type as the
        parameter to deprecated_method/deprecated_function. The test
        fails if the first warning issued does not match this format, or
        if no warning is issued at all.
    :param a_callable: A callable to call. This may be a bound method or
        a regular function. It will be called with ``*args`` and
        ``**kwargs``.
    :param args: The positional arguments for the callable
    :param kwargs: The keyword arguments for the callable
    :return: The result of a_callable(``*args``, ``**kwargs``)
    """
    captured = self._capture_deprecation_warnings(
        a_callable, *args, **kwargs)
    call_warnings, result = captured
    expected_first_warning = symbol_versioning.deprecation_string(
        a_callable, deprecation_format)
    if len(call_warnings) == 0:
        self.fail("No deprecation warning generated by call to %s" %
            a_callable)
    self.assertEqual(expected_first_warning, call_warnings[0])
    return result
1427
def callCatchWarnings(self, fn, *args, **kw):
    """Call a callable that raises python warnings, collecting them.

    Examining the returned warnings is the caller's job. If the callable
    raises an exception it is not caught, and the warnings collected so
    far are not available.

    :param fn: The callable to invoke with ``*args`` and ``**kw``.
    :returns: ([warning_object, ...], fn_result)
    """
    # XXX: This is not perfect, because it completely overrides the
    # warnings filters, and some code may depend on suppressing particular
    # warnings. It's the easiest way to insulate ourselves from -Werror,
    # though. -- Andrew, 20071062
    caught = []

    def _catcher(message, category, filename, lineno, file=None, line=None):
        # despite the name, 'message' is normally(?) a Warning subclass
        # instance
        caught.append(message)

    previous_showwarning = warnings.showwarning
    previous_filters = warnings.filters
    try:
        warnings.showwarning = _catcher
        warnings.filters = []
        result = fn(*args, **kw)
    finally:
        warnings.showwarning = previous_showwarning
        warnings.filters = previous_filters
    return caught, result
1458
def callDeprecated(self, expected, callable, *args, **kwargs):
    """Assert that a callable is deprecated in a particular way.

    This is a very precise test for unusual requirements. applyDeprecated
    is probably better suited to most tests: it only needs the deprecation
    format and checks that it is issued for the function being called.

    Only warnings raised through symbol_versioning.warn are captured, not
    those from callers that go direct to the warning module. To catch
    general warnings, use callCatchWarnings.

    :param expected: a list of the deprecation warnings expected, in order
    :param callable: The callable to call
    :param args: The positional arguments for the callable
    :param kwargs: The keyword arguments for the callable
    :return: The result of calling the callable.
    """
    captured = self._capture_deprecation_warnings(
        callable, *args, **kwargs)
    call_warnings, result = captured
    self.assertEqual(expected, call_warnings)
    return result
1480
def _startLogFile(self):
    """Send bzr and test log messages to a temporary file.

    The file is created with tempfile.mkstemp and pushed as the bzrlib
    trace log file; _finishLogFile is registered as a cleanup so the file
    is dealt with when the test is torn down.
    """
    handle, log_path = tempfile.mkstemp(suffix='.log', prefix='testbzr')
    self._log_file = os.fdopen(handle, 'w+')
    self._log_memento = bzrlib.trace.push_log_file(self._log_file)
    self._log_file_name = log_path
    self.addCleanup(self._finishLogFile)
1491
def _finishLogFile(self):
    """Finished with the log file.

    Pop it from bzrlib.trace, close it and delete it, unless
    setKeepLogfile was called. Does nothing when no log file is open.
    """
    log_file = self._log_file
    if log_file is None:
        return
    bzrlib.trace.pop_log_file(self._log_memento)
    log_file.close()
    self._log_file = None
    if self._keep_log_file:
        return
    os.remove(self._log_file_name)
    self._log_file_name = None
1505
def setKeepLogfile(self):
    """Ask _finishLogFile to leave the log file on disk."""
    setattr(self, '_keep_log_file', True)
1509
def thisFailsStrictLockCheck(self):
    """Mark this test as known to fail with -Dstrict_locks.

    All tests run with strict lock checking by default, unless
    -Edisable_lock_checks is supplied. Some tests are known to fail
    strict locks and have not been fixed yet; they call this to turn the
    check off by discarding the 'strict_locks' debug flag.

    Use sparingly: fixing the locking issue is much better than papering
    over it with this call.
    """
    active_flags = debug.debug_flags
    active_flags.discard('strict_locks')
1522
def addCleanup(self, callable, *args, **kwargs):
    """Arrange to run a callable when this case is torn down.

    Callables are run in the reverse of the order they are registered,
    ie last-in first-out.

    :param callable: The cleanup to run.
    :param args: Positional arguments to call it with.
    :param kwargs: Keyword arguments to call it with.
    """
    entry = (callable, args, kwargs)
    self._cleanups.append(entry)
1530
def _cleanEnvironment(self):
1532
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1533
'HOME': os.getcwd(),
1534
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1535
# tests do check our impls match APPDATA
1536
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1540
'BZREMAIL': None, # may still be present in the environment
1542
'BZR_PROGRESS_BAR': None,
1544
'BZR_PLUGIN_PATH': None,
1545
# Make sure that any text ui tests are consistent regardless of
1546
# the environment the test case is run in; you may want tests that
1547
# test other combinations. 'dumb' is a reasonable guess for tests
1548
# going to a pipe or a StringIO.
1553
'SSH_AUTH_SOCK': None,
1557
'https_proxy': None,
1558
'HTTPS_PROXY': None,
1563
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1564
# least. If you do (care), please update this comment
1568
'BZR_REMOTE_PATH': None,
1571
self.addCleanup(self._restoreEnvironment)
1572
for name, value in new_env.iteritems():
1573
self._captureVar(name, value)
1575
def _captureVar(self, name, newvalue):
    """Set an environment variable, remembering what it replaced.

    The value returned by osutils.set_or_unset_env is recorded under name
    so that _restoreEnvironment can put it back.
    """
    replaced = osutils.set_or_unset_env(name, newvalue)
    self.__old_env[name] = replaced
1579
def _restore_debug_flags(self):
    """Reset debug.debug_flags to the flags preserved for this test."""
    active_flags = debug.debug_flags
    active_flags.clear()
    active_flags.update(self._preserved_debug_flags)
1583
def _restoreEnvironment(self):
    """Put back every environment variable recorded by _captureVar."""
    for var_name, old_value in self.__old_env.iteritems():
        osutils.set_or_unset_env(var_name, old_value)
1587
def _restoreHooks(self):
    """Reinstall each preserved hooks object on the class it came from.

    self._preserved_hooks maps a class to an (attribute name, hooks) pair.
    """
    for klass, preserved in self._preserved_hooks.items():
        attr_name, hooks = preserved
        setattr(klass, attr_name, hooks)
1591
def knownFailure(self, reason):
    """Report that this test has failed for a known reason.

    :param reason: The explanation carried by the exception.
    :raises KnownFailure: Always.
    """
    failure = KnownFailure(reason)
    raise failure
1595
def _do_skip(self, result, reason):
    """Report this test as skipped on result.

    A result without a callable addSkip is given an error built from
    sys.exc_info() instead.

    :param result: The test result object to report to.
    :param reason: Why the test was skipped.
    """
    report_skip = getattr(result, 'addSkip', None)
    if callable(report_skip):
        report_skip(self, reason)
    else:
        result.addError(self, sys.exc_info())
1602
def run(self, result=None):
1603
if result is None: result = self.defaultTestResult()
1604
for feature in getattr(self, '_test_needs_features', []):
1605
if not feature.available():
1606
result.startTest(self)
1607
if getattr(result, 'addNotSupported', None):
1608
result.addNotSupported(self, feature)
1610
result.addSuccess(self)
1611
result.stopTest(self)
1615
result.startTest(self)
1616
absent_attr = object()
1618
method_name = getattr(self, '_testMethodName', absent_attr)
1619
if method_name is absent_attr:
1621
method_name = getattr(self, '_TestCase__testMethodName')
1622
testMethod = getattr(self, method_name)
1626
if not self._bzr_test_setUp_run:
1628
"test setUp did not invoke "
1629
"bzrlib.tests.TestCase's setUp")
1630
except KeyboardInterrupt:
1633
except TestSkipped, e:
1634
self._do_skip(result, e.args[0])
1638
result.addError(self, sys.exc_info())
1646
except self.failureException:
1647
result.addFailure(self, sys.exc_info())
1648
except TestSkipped, e:
1650
reason = "No reason given."
1653
self._do_skip(result, reason)
1654
except KeyboardInterrupt:
1658
result.addError(self, sys.exc_info())
1662
if not self._bzr_test_tearDown_run:
1664
"test tearDown did not invoke "
1665
"bzrlib.tests.TestCase's tearDown")
1666
except KeyboardInterrupt:
1670
result.addError(self, sys.exc_info())
1673
if ok: result.addSuccess(self)
1675
result.stopTest(self)
1677
except TestNotApplicable:
1678
# Not moved from the result [yet].
1681
except KeyboardInterrupt:
1686
for attr_name in self.attrs_to_keep:
1687
if attr_name in self.__dict__:
1688
saved_attrs[attr_name] = self.__dict__[attr_name]
1689
self.__dict__ = saved_attrs
1693
self._log_contents = ''
1694
self._bzr_test_tearDown_run = True
1695
unittest.TestCase.tearDown(self)
1697
def time(self, callable, *args, **kwargs):
    """Run callable and accrue the time it takes to the benchmark time.

    If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
    lsprofile statistics are also gathered and stored in
    self._benchcalls.

    :param callable: The callable to time.
    :return: Whatever the callable returns.
    """
    if self._benchtime is None:
        self._benchtime = 0
    started = time.time()
    try:
        if self._gather_lsprof_in_benchmarks:
            # record this benchmark
            ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
            stats.sort()
            self._benchcalls.append(((callable, args, kwargs), stats))
            return ret
        return callable(*args, **kwargs)
    finally:
        # Accrue the elapsed time even when the callable raises.
        self._benchtime += time.time() - started
1719
def _runCleanups(self):
    """Run registered cleanup functions, most recently added first.

    This should only be called from TestCase.tearDown.
    """
    # TODO: Perhaps this should keep running cleanups even if
    # one of them fails?

    # Each entry is popped before it runs, so tearDown running twice is
    # safe (this happens for skipped tests).
    while self._cleanups:
        entry = self._cleanups.pop()
        cleanup, args, kwargs = entry
        cleanup(*args, **kwargs)
1733
def log(self, *args):
1736
def _get_log(self, keep_log_file=False):
    """Get the log from bzrlib.trace calls from this test.

    :param keep_log_file: When True, if the log is still a file on disk
        leave it as a file on disk. When False, if the log is still a file
        on disk, the log file is deleted and the log preserved as
        self._log_contents.
    :return: A string containing the log.
    """
    # flush the log file, to get all content
    import bzrlib.trace
    if bzrlib.trace._trace_file:
        bzrlib.trace._trace_file.flush()
    if self._log_contents:
        # XXX: this can hardly contain the content flushed above --vila
        # 20080128
        return self._log_contents
    if self._log_file_name is None:
        return "DELETED log file to reduce memory footprint"
    logfile = open(self._log_file_name)
    try:
        log_contents = logfile.read()
    finally:
        logfile.close()
    if not keep_log_file:
        self._log_contents = log_contents
        try:
            os.remove(self._log_file_name)
        except OSError:
            e = sys.exc_info()[1]
            # On win32 an access error is only reported on stderr; any
            # other failure to delete propagates.
            if not (sys.platform == 'win32' and e.errno == errno.EACCES):
                raise
            sys.stderr.write(('Unable to delete log file '
                              ' %r\n' % self._log_file_name))
    return log_contents
1773
def requireFeature(self, feature):
    """Declare that this test needs a specific feature to be available.

    :param feature: An object with an available() method.
    :raises UnavailableFeature: When feature is not available.
    """
    if feature.available():
        return
    raise UnavailableFeature(feature)
1781
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
        working_dir):
    """Run bazaar command line, splitting up a string command line.

    A string args is split with shlex into an argument list; anything
    else is passed through unchanged. All other parameters are forwarded
    to _run_bzr_core, whose result is returned.
    """
    if isinstance(args, basestring):
        # shlex don't understand unicode strings,
        # so args should be plain string (bialix 20070906)
        plain_command = str(args)
        args = list(shlex.split(plain_command))
    return self._run_bzr_core(
        args,
        retcode=retcode,
        encoding=encoding,
        stdin=stdin,
        working_dir=working_dir)
1792
def _run_bzr_core(self, args, retcode, encoding, stdin,
        working_dir):
    """Run a bzr command in-process and capture its output.

    :param args: The argument list for the command.
    :param retcode: The expected return code, or None to skip the check.
    :param encoding: Encoding set on the captured stdout and stderr; the
        user encoding is used when None.
    :param stdin: Passed to the TestUIFactory installed for the run.
    :param working_dir: Directory to change into for the run, or None.
    :return: A tuple (result, out, err).
    """
    # Clear chk_map page cache, because the contents are likely to mask
    # locking errors.
    chk_map.clear_cache()
    if encoding is None:
        encoding = osutils.get_user_encoding()
    stdout = StringIOWrapper()
    stdout.encoding = encoding
    stderr = StringIOWrapper()
    stderr.encoding = encoding

    self.log('run bzr: %r', args)
    # FIXME: don't call into logging here
    log_handler = logging.StreamHandler(stderr)
    log_handler.setLevel(logging.INFO)
    root_logger = logging.getLogger('')
    root_logger.addHandler(log_handler)
    old_ui_factory = ui.ui_factory
    ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)

    original_cwd = None
    if working_dir is not None:
        original_cwd = osutils.getcwd()
        os.chdir(working_dir)

    try:
        result = self.apply_redirected(ui.ui_factory.stdin,
            stdout, stderr,
            bzrlib.commands.run_bzr_catch_user_errors,
            args)
    finally:
        # Undo the logging, ui and directory changes whatever happened.
        root_logger.removeHandler(log_handler)
        ui.ui_factory = old_ui_factory
        if original_cwd is not None:
            os.chdir(original_cwd)

    out = stdout.getvalue()
    err = stderr.getvalue()
    if out:
        self.log('output:\n%r', out)
    if err:
        self.log('errors:\n%r', err)
    if retcode is not None:
        self.assertEquals(retcode, result,
                          message='Unexpected return code')
    return result, out, err
1840
def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
            working_dir=None, error_regexes=[], output_encoding=None):
    """Invoke bzr, as if it were run from the command line.

    The argument list should not include the bzr program name - the
    first argument is normally the bzr command. Arguments may be
    passed as:

    1- A list of strings, eg ["commit", "a"]. This is recommended
    when the command contains whitespace or metacharacters, or
    is built up at run time.

    2- A single string, eg "add a". This is the most convenient
    for hardcoded commands.

    bzr is run through the interface that catches and reports errors,
    with logging set to something approximating the default, so that
    error reporting can be checked. This should be the main method for
    tests that exercise the overall behavior of the bzr application
    (rather than a unit test or a functional test of the library.)

    The stdout/stderr results are also sent into the test's log, where
    they may be useful for debugging.

    :keyword stdin: A string to be used as stdin for the command.
    :keyword retcode: The status code the command should return.
    :keyword encoding: Encoding for the captured output.
    :keyword working_dir: The directory to run the command in
    :keyword error_regexes: A list of expected error messages. If
        specified they must be seen in the error output of the command.
    :keyword output_encoding: Accepted but not consulted in this method.
    :return: A tuple (out, err).
    """
    _unused_code, out, err = self._run_bzr_autosplit(
        args=args,
        retcode=retcode,
        encoding=encoding,
        stdin=stdin,
        working_dir=working_dir,
        )
    self.assertIsInstance(error_regexes, (list, tuple))
    for expected_error in error_regexes:
        self.assertContainsRe(err, expected_error)
    return out, err
1885
def run_bzr_error(self, error_regexes, *args, **kwargs):
    """Run bzr, and check that stderr contains the supplied regexes

    :param error_regexes: Sequence of regular expressions which
        must each be found in the error output.
    :param args: command-line arguments for bzr
    :param kwargs: Keyword arguments which are interpreted by run_bzr
        This function changes the default value of retcode to be 3,
        since in most cases this is run when you expect bzr to fail.

    :return: (out, err) The actual output of running the command (in case
        you want to do more inspection)

    Examples of use::

        # Make sure that commit is failing because there is nothing to do
        self.run_bzr_error(['no changes to commit'],
                           ['commit', '-m', 'my commit comment'])
        # Make sure --strict is handling an unknown file, rather than
        # giving us the 'nothing to do' error
        self.build_tree(['unknown'])
        self.run_bzr_error(['Commit refused because there are unknown files'],
                           ['commit', '--strict', '-m', 'my commit comment'])
    """
    kwargs.setdefault('retcode', 3)
    kwargs['error_regexes'] = error_regexes
    output = self.run_bzr(*args, **kwargs)
    out, err = output
    return out, err
1915
def run_bzr_subprocess(self, *args, **kwargs):
    """Run bzr in a subprocess for testing.

    This starts a new Python interpreter and runs bzr in there.
    Only use it for tests with a justifiable need for the isolation, e.g.
    tests of startup time, signal handling or early startup code.
    Subprocess code can't be profiled or debugged so easily.

    Exactly one positional argument is accepted: either a list of
    arguments, or a single string which is split with shlex.

    :keyword retcode: The status code that is expected. Defaults to 0. If
        None is supplied, the status code is not checked.
    :keyword env_changes: A dictionary which lists changes to environment
        variables. A value of None will unset the env variable.
        The values must be strings. The change will only occur in the
        child, so you don't need to fix the environment after running.
    :keyword working_dir: Passed on to start_bzr_subprocess.
    :keyword universal_newlines: Convert CRLF => LF
    :keyword allow_plugins: By default the subprocess is run with
        --no-plugins to ensure test reproducibility. Also, it is possible
        for system-wide plugins to create unexpected output on stderr,
        which can cause unnecessary test failures.
    :raises ValueError: If the number of positional arguments is not one.
    :return: Whatever finish_bzr_subprocess returns.
    """
    env_changes = kwargs.get('env_changes', {})
    working_dir = kwargs.get('working_dir', None)
    allow_plugins = kwargs.get('allow_plugins', False)
    if len(args) != 1:
        raise ValueError("passing varargs to run_bzr_subprocess")
    command = args[0]
    if isinstance(command, list):
        args = command
    elif isinstance(command, basestring):
        args = list(shlex.split(command))
    process = self.start_bzr_subprocess(args, env_changes=env_changes,
                                        working_dir=working_dir,
                                        allow_plugins=allow_plugins)
    # We distinguish between retcode=None and retcode not passed.
    supplied_retcode = kwargs.get('retcode', 0)
    return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
        universal_newlines=kwargs.get('universal_newlines', False),
        process_args=args)
1955
def start_bzr_subprocess(self, process_args, env_changes=None,
                         skip_if_plan_to_signal=False,
                         working_dir=None,
                         allow_plugins=False):
    """Start bzr in a subprocess for testing.

    This starts a new Python interpreter and runs bzr in there.
    Only use it for tests with a justifiable need for the isolation, e.g.
    tests of startup time, signal handling or early startup code.
    Subprocess code can't be profiled or debugged so easily.

    :param process_args: a list of arguments to pass to the bzr executable,
        for example ``['--version']``.
    :param env_changes: A dictionary which lists changes to environment
        variables. A value of None will unset the env variable.
        The values must be strings. The change will only occur in the
        child, so you don't need to fix the environment after running.
    :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
        is not available.
    :param working_dir: Directory to start the process in, or None.
    :param allow_plugins: If False (default) pass --no-plugins to bzr.

    :returns: Popen object for the started process.
    """
    if skip_if_plan_to_signal and not getattr(os, 'kill', None):
        raise TestSkipped("os.kill not available.")

    if env_changes is None:
        env_changes = {}
    saved_env = {}

    def cleanup_environment():
        for env_var, value in env_changes.iteritems():
            saved_env[env_var] = osutils.set_or_unset_env(env_var, value)

    def restore_environment():
        for env_var, value in saved_env.iteritems():
            osutils.set_or_unset_env(env_var, value)

    bzr_path = self.get_bzr_path()

    original_cwd = None
    if working_dir is not None:
        original_cwd = osutils.getcwd()
        os.chdir(working_dir)

    try:
        # win32 subprocess doesn't support preexec_fn, so it is avoided on
        # all platforms: the same code path is used everywhere and win32
        # doesn't get broken. The environment is changed in this process
        # just for the duration of the Popen call.
        cleanup_environment()
        command = [sys.executable]
        # frozen executables don't need the path to bzr
        if getattr(sys, "frozen", None) is None:
            command.append(bzr_path)
        if not allow_plugins:
            command.append('--no-plugins')
        command.extend(process_args)
        process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
    finally:
        restore_environment()
        if original_cwd is not None:
            os.chdir(original_cwd)

    return process
2022
def _popen(self, *args, **kwargs):
    """Place a call to Popen and return the resulting object.

    This exists so tests can override it to intercept the calls made to
    Popen for introspection.
    """
    process = Popen(*args, **kwargs)
    return process
2030
def get_source_path(self):
    """Return the path of the directory containing bzrlib."""
    package_dir = os.path.dirname(bzrlib.__file__)
    return os.path.dirname(package_dir)
2034
def get_bzr_path(self):
    """Return the path of the 'bzr' executable for this test suite.

    The 'bzr' file in the source directory is used when it exists;
    otherwise sys.argv[0] is returned.
    """
    candidate = self.get_source_path()+'/bzr'
    if os.path.isfile(candidate):
        return candidate
    # We are probably installed. Assume sys.argv is the right file
    return sys.argv[0]
2042
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
                          universal_newlines=False, process_args=None):
    """Finish the execution of process.

    :param process: the Popen object returned from start_bzr_subprocess.
    :param retcode: The status code that is expected. Defaults to 0. If
        None is supplied, the status code is not checked.
    :param send_signal: an optional signal to send to the process.
    :param universal_newlines: Convert CRLF => LF
    :param process_args: The arguments the process was started with; only
        used when reporting an unexpected status code.
    :returns: A two-item list [stdout, stderr].
    """
    if send_signal is not None:
        os.kill(process.pid, send_signal)
    out, err = process.communicate()

    if universal_newlines:
        out = '\n'.join(out.split('\r\n'))
        err = '\n'.join(err.split('\r\n'))

    status_checked = retcode is not None
    if status_checked and retcode != process.returncode:
        if process_args is None:
            process_args = "(unknown args)"
        # Put both streams in the log before failing the test.
        mutter('Output of bzr %s:\n%s', process_args, out)
        mutter('Error for bzr %s:\n%s', process_args, err)
        self.fail('Command bzr %s failed with retcode %s != %s'
                  % (process_args, retcode, process.returncode))
    return [out, err]
2070
def check_inventory_shape(self, inv, shape):
    """Compare an inventory to a list of expected names.

    Fail if they are not precisely equal.

    :param inv: an object whose entries() yields (path, entry) pairs.
    :param shape: iterable of expected paths using '/' separators;
        directories are written with a trailing '/'.
    """
    extras = []
    shape = list(shape)             # copy
    for path, ie in inv.entries():
        name = path.replace('\\', '/')
        if ie.kind == 'directory':
            name = name + '/'
        if name in shape:
            shape.remove(name)
        else:
            extras.append(name)
    # Whatever is left in shape was expected but never seen.
    if shape:
        self.fail("expected paths not found in inventory: %r" % shape)
    if extras:
        self.fail("unexpected paths found in inventory: %r" % extras)
2090
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
                     a_callable=None, *args, **kwargs):
    """Call callable with redirected std io pipes.

    :param stdin: file-like object to install as sys.stdin; an empty
        StringIO is used when None.
    :param stdout: file-like object to install as sys.stdout; when None
        the test log file is used if there is one, else a fresh StringIO.
    :param stderr: as stdout, but for sys.stderr.
    :param a_callable: the callable to run; it receives *args and **kwargs.
    :raises ValueError: if a_callable is not callable.

    Returns the return code."""
    if not callable(a_callable):
        raise ValueError("a_callable must be callable.")
    # Look the log file up once.  The default must be None (not the result
    # of a comparison), otherwise a test without a _log_file attribute
    # would try to read self._log_file and blow up.
    log_file = getattr(self, "_log_file", None)
    if stdin is None:
        stdin = StringIO("")
    if stdout is None:
        if log_file is not None:
            stdout = log_file
        else:
            stdout = StringIO()
    if stderr is None:
        if log_file is not None:
            stderr = log_file
        else:
            stderr = StringIO()
    real_stdin = sys.stdin
    real_stdout = sys.stdout
    real_stderr = sys.stderr
    try:
        sys.stdout = stdout
        sys.stderr = stderr
        sys.stdin = stdin
        return a_callable(*args, **kwargs)
    finally:
        # Always put the real streams back, even if the callable raised.
        sys.stdout = real_stdout
        sys.stderr = real_stderr
        sys.stdin = real_stdin
2122
def reduceLockdirTimeout(self):
    """Reduce the default lock timeout for the duration of the test, so that
    if LockContention occurs during a test, it does so quickly.

    Tests that expect to provoke LockContention errors should call this.
    """
    orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
    def resetTimeout():
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
    # Register the restore before changing the value so the original
    # timeout is put back when the test's cleanups run.
    self.addCleanup(resetTimeout)
    bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
2134
def make_utf8_encoded_stringio(self, encoding_type=None):
    """Return a StringIOWrapper instance, that will encode Unicode
    input to UTF-8.

    :param encoding_type: the codec error handling scheme to use
        ('strict' when None).
    :return: a codecs stream writer wrapping a fresh StringIO, with an
        extra ``encoding`` attribute set to 'utf-8'.
    """
    if encoding_type is None:
        encoding_type = 'strict'
    sio = StringIO()
    output_encoding = 'utf-8'
    sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
    # Callers inspect .encoding to learn how the stream encodes text.
    sio.encoding = output_encoding
    return sio
2146
def disable_verb(self, verb):
    """Disable a smart server verb for one test.

    The handler registered for verb is removed from the request handler
    registry, and a cleanup is added that registers it again.

    :param verb: the name of the verb to disable.
    """
    from bzrlib.smart import request
    request_handlers = request.request_handlers
    orig_method = request_handlers.get(verb)
    request_handlers.remove(verb)
    def restoreVerb():
        request_handlers.register(verb, orig_method)
    self.addCleanup(restoreVerb)
2157
class CapturedCall(object):
    """A helper for capturing smart server calls for easy debug analysis.

    :ivar call: the params object the instance was created with.
    :ivar stack: formatted stack trace (a string) of where the call was made.
    """

    def __init__(self, params, prefix_length):
        """Capture the call with params and skip prefix_length stack frames."""
        self.call = params
        import traceback
        # The last 5 frames are the __init__, the hook frame, and 3 smart
        # client frames. Beyond this we could get more clever, but this is
        # good enough for now.
        stack = traceback.extract_stack()[prefix_length:-5]
        self.stack = ''.join(traceback.format_list(stack))

    def __str__(self):
        return self.call.method

    def __repr__(self):
        return self.call.method
2180
class TestCaseWithMemoryTransport(TestCase):
2181
"""Common test class for tests that do not need disk resources.
2183
Tests that need disk resources should derive from TestCaseWithTransport.
2185
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2187
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2188
a directory which does not exist. This serves to help ensure test isolation
2189
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2190
must exist. However, TestCaseWithMemoryTransport does not offer local
2191
file defaults for the transport in tests, nor does it obey the command line
2192
override, so tests that accidentally write to the common directory should
2195
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2196
a .bzr directory that stops us ascending higher into the filesystem.
2202
def __init__(self, methodName='runTest'):
    """Create the test and initialise its transport server settings.

    :param methodName: name of the test method to run, passed on to the
        base class constructor.
    """
    # allow test parameterization after test construction and before test
    # execution. Variables that the parameterizer sets need to be
    # ones that are not set by setUp, or setUp will trash them.
    super(TestCaseWithMemoryTransport, self).__init__(methodName)
    self.vfs_transport_factory = default_transport
    self.transport_server = None
    self.transport_readonly_server = None
    self.__vfs_server = None
2212
def get_transport(self, relpath=None):
    """Return a writeable transport.

    The transport is opened on self.get_url(relpath), i.e. on the test
    scratch space.

    :param relpath: a path relative to the base url.
    :return: the transport; the test fails if it is readonly.
    """
    t = get_transport(self.get_url(relpath))
    self.assertFalse(t.is_readonly())
    return t
2224
def get_readonly_transport(self, relpath=None):
    """Return a readonly transport for the test scratch space

    This can be used to test that operations which should only need
    readonly access in fact do not try to write.

    :param relpath: a path relative to the base url.
    :return: the transport; the test fails if it is not readonly.
    """
    t = get_transport(self.get_readonly_url(relpath))
    self.assertTrue(t.is_readonly())
    return t
2236
def create_transport_readonly_server(self):
    """Create a transport server from class defined at init.

    This is mostly a hook for daughter classes.

    :return: the result of calling self.transport_readonly_server().
    """
    return self.transport_readonly_server()
2243
def get_readonly_server(self):
    """Get the server instance for the readonly transport

    This is useful for some tests with specific servers to do diagnostics.

    The server is created and started on first use, backed by the vfs only
    server, and the same instance is returned on later calls.
    """
    if self.__readonly_server is None:
        if self.transport_readonly_server is None:
            # readonly decorator requested
            self.__readonly_server = ReadonlyServer()
        else:
            # explicit readonly transport.
            self.__readonly_server = self.create_transport_readonly_server()
        self.start_server(self.__readonly_server,
            self.get_vfs_only_server())
    return self.__readonly_server
2259
def get_readonly_url(self, relpath=None):
    """Get a URL for the readonly transport.

    This will either be backed by '.' or a decorator to the transport
    used by self.get_url()
    relpath provides for clients to get a path relative to the base url.
    These should only be downwards relative, not upwards.
    """
    base = self.get_readonly_server().get_url()
    return self._adjust_url(base, relpath)
2270
def get_vfs_only_server(self):
    """Get the vfs only read/write server instance.

    This is useful for some tests with specific servers that need
    diagnostics.

    For TestCaseWithMemoryTransport this is always a MemoryServer, and there
    is no means to override it.
    """
    if self.__vfs_server is None:
        # Created and started lazily, then reused by later calls.
        self.__vfs_server = MemoryServer()
        self.start_server(self.__vfs_server)
    return self.__vfs_server
2284
def get_server(self):
    """Get the read/write server instance.

    This is useful for some tests with specific servers that need
    diagnostics.

    This is built from the self.transport_server factory. If that is None
    (or is the vfs transport factory itself), then the server from
    self.get_vfs_only_server() is returned.
    """
    if self.__server is None:
        if (self.transport_server is None or self.transport_server is
            self.vfs_transport_factory):
            self.__server = self.get_vfs_only_server()
        else:
            # bring up a decorated means of access to the vfs only server.
            self.__server = self.transport_server()
            self.start_server(self.__server, self.get_vfs_only_server())
    return self.__server
2303
def _adjust_url(self, base, relpath):
    """Get a URL (or maybe a path) for the readwrite transport.

    This will either be backed by '.' or to an equivalent non-file based
    facility.
    relpath provides for clients to get a path relative to the base url.
    These should only be downwards relative, not upwards.

    :param base: the url (or local path) to extend.
    :param relpath: path to append; None and '.' leave base unchanged.
    :return: base joined with relpath.
    """
    if relpath is not None and relpath != '.':
        if not base.endswith('/'):
            base = base + '/'
        # XXX: Really base should be a url; we did after all call
        # get_url()!  But sometimes it's just a path (from
        # LocalAbspathServer), and it'd be wrong to append urlescaped data
        # to a non-escaped local path.
        if base.startswith('./') or base.startswith('/'):
            base += relpath
        else:
            base += urlutils.escape(relpath)
    return base
2324
def get_url(self, relpath=None):
    """Get a URL (or maybe a path) for the readwrite transport.

    This will either be backed by '.' or to an equivalent non-file based
    facility.
    relpath provides for clients to get a path relative to the base url.
    These should only be downwards relative, not upwards.
    """
    base = self.get_server().get_url()
    return self._adjust_url(base, relpath)
2335
def get_vfs_only_url(self, relpath=None):
    """Get a URL (or maybe a path) for the plain old vfs transport.

    This will never be a smart protocol.  It always has all the
    capabilities of the local filesystem, but it might actually be a
    MemoryTransport or some other similar virtual filesystem.

    This is the backing transport (if any) of the server returned by
    get_url and get_readonly_url.

    :param relpath: provides for clients to get a path relative to the base
        url.  These should only be downwards relative, not upwards.
    :return: A URL
    """
    base = self.get_vfs_only_server().get_url()
    return self._adjust_url(base, relpath)
2352
def _create_safety_net(self):
    """Make a fake bzr directory.

    This prevents any tests propagating up onto the TEST_ROOT directory's
    real branch: a standalone working tree is created in TEST_ROOT itself.
    """
    root = TestCaseWithMemoryTransport.TEST_ROOT
    bzrdir.BzrDir.create_standalone_workingtree(root)
2361
def _check_safety_net(self):
    """Check that the safety .bzr directory has not been touched.

    _make_test_root creates a .bzr directory to prevent tests from
    propagating. This method ensures that a test did not leak into it.

    :raises AssertionError: if the working tree in TEST_ROOT has a last
        revision other than 'null:'.
    """
    root = TestCaseWithMemoryTransport.TEST_ROOT
    self.permit_url(get_transport(root).base)
    wt = workingtree.WorkingTree.open(root)
    last_rev = wt.last_revision()
    if last_rev != 'null:':
        # The current test has modified the /bzr directory, we need to
        # recreate a new one or all the following tests will fail.
        # If you need to inspect its content uncomment the following line
        # import pdb; pdb.set_trace()
        _rmtree_temp_dir(root + '/.bzr')
        self._create_safety_net()
        raise AssertionError('%s/.bzr should not be modified' % root)
2380
def _make_test_root(self):
    """Create the shared TEST_ROOT directory on first use.

    The directory is stored on the TestCaseWithMemoryTransport class, gets
    a safety-net .bzr directory, and is removed at interpreter exit.  On
    every call the directory is permitted for this test and a cleanup that
    checks the safety net is registered.
    """
    if TestCaseWithMemoryTransport.TEST_ROOT is None:
        # Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
        # NOTE(review): the continuation line of this call was not visible;
        # suffix='.tmp' is assumed - confirm against the original.
        root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
                                                suffix='.tmp'))
        TestCaseWithMemoryTransport.TEST_ROOT = root

        self._create_safety_net()

        # The same directory is used by all tests, and we're not
        # specifically told when all tests are finished.  This will do.
        atexit.register(_rmtree_temp_dir, root)

    self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
    self.addCleanup(self._check_safety_net)
2396
def makeAndChdirToTestDir(self):
    """Create the temporary directories for this one test.

    This must set self.test_home_dir and self.test_dir and chdir to
    self.test_dir.

    For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
    """
    os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
    self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
    # The home directory path is set but never created here.
    self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
    self.permit_dir(self.test_dir)
2409
def make_branch(self, relpath, format=None):
    """Create a branch on the transport at relpath.

    A repository is created first (see make_repository), then a branch is
    created in the same control directory.
    """
    repo = self.make_repository(relpath, format=format)
    return repo.bzrdir.create_branch()
2414
def make_bzrdir(self, relpath, format=None):
2416
# might be a relative or absolute path
2417
maybe_a_url = self.get_url(relpath)
2418
segments = maybe_a_url.rsplit('/', 1)
2419
t = get_transport(maybe_a_url)
2420
if len(segments) > 1 and segments[-1] not in ('', '.'):
2424
if isinstance(format, basestring):
2425
format = bzrdir.format_registry.make_bzrdir(format)
2426
return format.initialize_on_transport(t)
2427
except errors.UninitializableFormat:
2428
raise TestSkipped("Format %s is not initializable." % format)
2430
def make_repository(self, relpath, shared=False, format=None):
    """Create a repository on our default transport at relpath.

    Note that relpath must be a relative path, not a full url.

    :param shared: passed on to create_repository().
    :param format: passed on to make_bzrdir().
    """
    # FIXME: If you create a remoterepository this returns the underlying
    # real format, which is incorrect.  Actually we should make sure that
    # RemoteBzrDir returns a RemoteRepository.
    # maybe  mbp 20070410
    made_control = self.make_bzrdir(relpath, format=format)
    return made_control.create_repository(shared=shared)
2442
def make_smart_server(self, path):
    """Start a smart TCP server over self.get_server() and open path on it.

    :param path: path to clone the remote transport to.
    :return: a transport on the new smart server, cloned to path.
    """
    smart_server = server.SmartTCPServer_for_testing()
    self.start_server(smart_server, self.get_server())
    remote_transport = get_transport(smart_server.get_url()).clone(path)
    return remote_transport
2448
def make_branch_and_memory_tree(self, relpath, format=None):
    """Create a branch on the default transport and a MemoryTree for it."""
    b = self.make_branch(relpath, format=format)
    return memorytree.MemoryTree.create_on_branch(b)
2453
def make_branch_builder(self, relpath, format=None):
    """Create a branch at relpath and return a BranchBuilder for it."""
    branch = self.make_branch(relpath, format=format)
    return branchbuilder.BranchBuilder(branch=branch)
2457
def overrideEnvironmentForTesting(self):
    """Point HOME and BZR_HOME at this test's home directory."""
    os.environ['HOME'] = self.test_home_dir
    os.environ['BZR_HOME'] = self.test_home_dir
2462
def setUp(self):
    """Prepare the test root, working directory and environment.

    The directory that was current when the test started is restored by a
    cleanup.
    """
    super(TestCaseWithMemoryTransport, self).setUp()
    self._make_test_root()
    _currentdir = os.getcwdu()
    def _leaveDirectory():
        os.chdir(_currentdir)
    self.addCleanup(_leaveDirectory)
    self.makeAndChdirToTestDir()
    self.overrideEnvironmentForTesting()
    # Servers are created lazily by get_readonly_server()/get_server().
    self.__readonly_server = None
    self.__server = None
    self.reduceLockdirTimeout()
2474
def setup_smart_server_with_call_log(self):
    """Sets up a smart server as the transport server with a call log.

    Every smart client call is appended to self.hpss_calls as a
    CapturedCall.
    """
    self.transport_server = server.SmartTCPServer_for_testing
    self.hpss_calls = []
    import traceback
    # Skip the current stack down to the caller of
    # setup_smart_server_with_call_log
    prefix_length = len(traceback.extract_stack()) - 2
    def capture_hpss_call(params):
        self.hpss_calls.append(
            CapturedCall(params, prefix_length))
    client._SmartClient.hooks.install_named_hook(
        'call', capture_hpss_call, None)
2488
def reset_smart_call_log(self):
    """Forget every smart server call captured so far."""
    self.hpss_calls = []
2492
class TestCaseInTempDir(TestCaseWithMemoryTransport):
2493
"""Derived class that runs a test within a temporary directory.
2495
This is useful for tests that need to create a branch, etc.
2497
The directory is created in a slightly complex way: for each
2498
Python invocation, a new temporary top-level directory is created.
2499
All test cases create their own directory within that. If the
2500
tests complete successfully, the directory is removed.
2502
:ivar test_base_dir: The path of the top-level directory for this
2503
test, which contains a home directory and a work directory.
2505
:ivar test_home_dir: An initially empty directory under test_base_dir
2506
which is used as $HOME for this test.
2508
:ivar test_dir: A directory under test_base_dir used as the current
2509
directory when the test proper is run.
2512
OVERRIDE_PYTHON = 'python'
2514
def check_file_contents(self, filename, expect):
    """Fail the test unless filename contains exactly expect.

    :param filename: path of the file to read (opened in text mode).
    :param expect: the expected contents.
    """
    self.log("check contents of file %s" % filename)
    f = open(filename, 'r')
    try:
        contents = f.read()
    finally:
        # Close explicitly rather than relying on garbage collection.
        f.close()
    if contents != expect:
        self.log("expected: %r" % expect)
        self.log("actually: %r" % contents)
        self.fail("contents of %s not as expected" % filename)
2522
def _getTestDirPrefix(self):
    """Return a directory name derived from this test's id.

    :return: the test id with characters unsuitable for a directory name
        replaced by '_'; on win32 and cygwin only the last 30 characters
        are kept.
    """
    # create a directory within the top level test directory
    if sys.platform in ('win32', 'cygwin'):
        name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
        # windows is likely to have path-length limits so use a short name
        name_prefix = name_prefix[-30:]
    else:
        name_prefix = re.sub('[/]', '_', self.id())
    return name_prefix
2532
def makeAndChdirToTestDir(self):
2533
"""See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
2535
For TestCaseInTempDir we create a temporary directory based on the test
2536
name and then create two subdirs - test and home under it.
2538
name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
2539
self._getTestDirPrefix())
2541
for i in range(100):
2542
if os.path.exists(name):
2543
name = name_prefix + '_' + str(i)
2545
# now create test and home directories within this dir
2546
self.test_base_dir = name
2547
self.addCleanup(self.deleteTestDir)
2548
os.mkdir(self.test_base_dir)
2550
self.permit_dir(self.test_base_dir)
2551
# 'sprouting' and 'init' of a branch both walk up the tree to find
2552
# stacking policy to honour; create a bzr dir with an unshared
2553
# repository (but not a branch - our code would be trying to escape
2554
# then!) to stop them, and permit it to be read.
2555
# control = bzrdir.BzrDir.create(self.test_base_dir)
2556
# control.create_repository()
2557
self.test_home_dir = self.test_base_dir + '/home'
2558
os.mkdir(self.test_home_dir)
2559
self.test_dir = self.test_base_dir + '/work'
2560
os.mkdir(self.test_dir)
2561
os.chdir(self.test_dir)
2562
# put name of test inside
2563
f = file(self.test_base_dir + '/name', 'w')
2569
def deleteTestDir(self):
    """Remove this test's base directory."""
    # Move to TEST_ROOT first so the current directory is not inside the
    # tree being deleted.
    os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
    _rmtree_temp_dir(self.test_base_dir)
2573
def build_tree(self, shape, line_endings='binary', transport=None):
2574
"""Build a test tree according to a pattern.
2576
shape is a sequence of file specifications. If the final
2577
character is '/', a directory is created.
2579
This assumes that all the elements in the tree being built are new.
2581
This doesn't add anything to a branch.
2583
:type shape: list or tuple.
2584
:param line_endings: Either 'binary' or 'native'
2585
in binary mode, exact contents are written in native mode, the
2586
line endings match the default platform endings.
2587
:param transport: A transport to write to, for building trees on VFS's.
2588
If the transport is readonly or None, "." is opened automatically.
2591
if type(shape) not in (list, tuple):
2592
raise AssertionError("Parameter 'shape' should be "
2593
"a list or a tuple. Got %r instead" % (shape,))
2594
# It's OK to just create them using forward slashes on windows.
2595
if transport is None or transport.is_readonly():
2596
transport = get_transport(".")
2598
self.assertIsInstance(name, basestring)
2600
transport.mkdir(urlutils.escape(name[:-1]))
2602
if line_endings == 'binary':
2604
elif line_endings == 'native':
2607
raise errors.BzrError(
2608
'Invalid line ending request %r' % line_endings)
2609
content = "contents of %s%s" % (name.encode('utf-8'), end)
2610
transport.put_bytes_non_atomic(urlutils.escape(name), content)
2612
def build_tree_contents(self, shape):
    """Call the module-level build_tree_contents() function with shape."""
    build_tree_contents(shape)
2615
def assertInWorkingTree(self, path, root_path='.', tree=None):
    """Assert whether path or paths are in the WorkingTree

    :param path: a single path string, or an iterable of paths.
    :param root_path: where to open the working tree when tree is None.
    :param tree: the tree to check; opened from root_path when None.
    """
    if tree is None:
        tree = workingtree.WorkingTree.open(root_path)
    if not isinstance(path, basestring):
        for p in path:
            self.assertInWorkingTree(p, tree=tree)
    else:
        self.assertIsNot(tree.path2id(path), None,
            path+' not in working tree.')
2626
def assertNotInWorkingTree(self, path, root_path='.', tree=None):
    """Assert whether path or paths are not in the WorkingTree

    :param path: a single path string, or an iterable of paths.
    :param root_path: where to open the working tree when tree is None.
    :param tree: the tree to check; opened from root_path when None.
    """
    if tree is None:
        tree = workingtree.WorkingTree.open(root_path)
    if not isinstance(path, basestring):
        for p in path:
            self.assertNotInWorkingTree(p,tree=tree)
    else:
        self.assertIs(tree.path2id(path), None, path+' in working tree.')
2637
class TestCaseWithTransport(TestCaseInTempDir):
2638
"""A test case that provides get_url and get_readonly_url facilities.
2640
These back onto two transport servers, one for readonly access and one for
2643
If no explicit class is provided for readonly access, a
2644
ReadonlyTransportDecorator is used instead which allows the use of non disk
2645
based read write transports.
2647
If an explicit class is provided for readonly access, that server and the
2648
readwrite one must both define get_url() as resolving to os.getcwd().
2651
def get_vfs_only_server(self):
    """See TestCaseWithMemoryTransport.

    This is useful for some tests with specific servers that need
    diagnostics.

    Here the server is built by calling self.vfs_transport_factory; it is
    created and started on first use and reused afterwards.
    """
    if self.__vfs_server is None:
        self.__vfs_server = self.vfs_transport_factory()
        self.start_server(self.__vfs_server)
    return self.__vfs_server
2662
def make_branch_and_tree(self, relpath, format=None):
2663
"""Create a branch on the transport and a tree locally.
2665
If the transport is not a LocalTransport, the Tree can't be created on
2666
the transport. In that case if the vfs_transport_factory is
2667
LocalURLServer the working tree is created in the local
2668
directory backing the transport, and the returned tree's branch and
2669
repository will also be accessed locally. Otherwise a lightweight
2670
checkout is created and returned.
2672
We do this because we can't physically create a tree in the local
2673
path, with a branch reference to the transport_factory url, and
2674
a branch + repository in the vfs_transport, unless the vfs_transport
2675
namespace is distinct from the local disk - the two branch objects
2676
would collide. While we could construct a tree with its branch object
2677
pointing at the transport_factory transport in memory, reopening it
2678
would behaving unexpectedly, and has in the past caused testing bugs
2679
when we tried to do it that way.
2681
:param format: The BzrDirFormat.
2682
:returns: the WorkingTree.
2684
# TODO: always use the local disk path for the working tree,
2685
# this obviously requires a format that supports branch references
2686
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2688
b = self.make_branch(relpath, format=format)
2690
return b.bzrdir.create_workingtree()
2691
except errors.NotLocalUrl:
2692
# We can only make working trees locally at the moment. If the
2693
# transport can't support them, then we keep the non-disk-backed
2694
# branch and create a local checkout.
2695
if self.vfs_transport_factory is LocalURLServer:
2696
# the branch is colocated on disk, we cannot create a checkout.
2697
# hopefully callers will expect this.
2698
local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2699
wt = local_controldir.create_workingtree()
2700
if wt.branch._format != b._format:
2702
# Make sure that assigning to wt._branch fixes wt.branch,
2703
# in case the implementation details of workingtree objects
2705
self.assertIs(b, wt.branch)
2708
return b.create_checkout(relpath, lightweight=True)
2710
def assertIsDirectory(self, relpath, transport):
    """Assert that relpath within transport is a directory.

    This may not be possible on all transports; in that case it propagates
    a TransportNotPossible.

    :param relpath: the path to stat, relative to transport.
    :param transport: the transport to stat relpath on.
    """
    try:
        mode = transport.stat(relpath).st_mode
    except errors.NoSuchFile:
        self.fail("path %s is not a directory; no such file"
                  % (relpath))
    if not stat.S_ISDIR(mode):
        self.fail("path %s is not a directory; has mode %#o"
                  % (relpath, mode))
2725
def assertTreesEqual(self, left, right):
    """Check that left and right have the same content and properties."""
    # we use a tree delta to check for equality of the content, and we
    # manually check for equality of other things such as the parents list.
    self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
    differences = left.changes_from(right)
    self.assertFalse(differences.has_changed(),
        "Trees %r and %r are different: %r" % (left, right, differences))
2735
def setUp(self):
    """Run the base class setUp and reset the cached vfs server."""
    super(TestCaseWithTransport, self).setUp()
    self.__vfs_server = None
2738
def disable_missing_extensions_warning(self):
    """Some tests expect a precise stderr content.

    There is no point in forcing them to duplicate the extension related
    warning, so the 'ignore_missing_extensions' option is set to True in
    the global configuration.
    """
    config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
2747
class ChrootedTestCase(TestCaseWithTransport):
    """A support class that provides readonly urls outside the local namespace.

    This is done by checking if self.vfs_transport_factory is a MemoryServer.
    If it is then we are chrooted already, if it is not then an HttpServer
    is used as the readonly server.

    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
                       be used without needed to redo it when a different
                       subclass is in use ?
    """

    def setUp(self):
        super(ChrootedTestCase, self).setUp()
        if not self.vfs_transport_factory == MemoryServer:
            self.transport_readonly_server = HttpServer
2765
def condition_id_re(pattern):
2766
"""Create a condition filter which performs a re check on a test's id.
2768
:param pattern: A regular expression string.
2769
:return: A callable that returns True if the re matches.
2771
filter_re = osutils.re_compile_checked(pattern, 0,
2773
def condition(test):
2775
return filter_re.search(test_id)
2779
def condition_isinstance(klass_or_klass_list):
    """Create a condition filter which returns isinstance(param, klass).

    :param klass_or_klass_list: anything isinstance() accepts as its second
        argument.
    :return: A callable which when called with one parameter obj return the
        result of isinstance(obj, klass_or_klass_list).
    """
    def condition(obj):
        return isinstance(obj, klass_or_klass_list)
    return condition
2790
def condition_id_in_list(id_list):
    """Create a condition filter which verify that test's id in a list.

    :param id_list: A TestIdList object.
    :return: A callable that returns True if the test's id appears in the list.
    """
    def condition(test):
        return id_list.includes(test.id())
    return condition
2801
def condition_id_startswith(starts):
    """Create a condition filter verifying that test's id starts with a string.

    :param starts: A list of string.
    :return: A callable that returns True if the test's id starts with one of
        the given strings.
    """
    def condition(test):
        for start in starts:
            if test.id().startswith(start):
                return True
        return False
    return condition
2816
def exclude_tests_by_condition(suite, condition):
    """Create a test suite which excludes some tests from suite.

    :param suite: The suite to get tests from.
    :param condition: A callable whose result evaluates True when called with a
        test case which should be excluded from the result.
    :return: A suite which contains the tests found in suite that fail
        condition.
    """
    result = []
    for test in iter_suite_tests(suite):
        if not condition(test):
            result.append(test)
    return TestUtil.TestSuite(result)
2832
def filter_suite_by_condition(suite, condition):
    """Create a test suite by filtering another one.

    :param suite: The source suite.
    :param condition: A callable whose result evaluates True when called with a
        test case which should be included in the result.
    :return: A suite which contains the tests found in suite that pass
        condition.
    """
    result = []
    for test in iter_suite_tests(suite):
        if condition(test):
            result.append(test)
    return TestUtil.TestSuite(result)
2848
def filter_suite_by_re(suite, pattern):
    """Create a test suite by filtering another one.

    :param suite:           the source suite
    :param pattern:         pattern that names must match
    :returns: the newly created suite
    """
    condition = condition_id_re(pattern)
    result_suite = filter_suite_by_condition(suite, condition)
    return result_suite
2860
def filter_suite_by_id_list(suite, test_id_list):
    """Create a test suite by filtering another one.

    :param suite: The source suite.
    :param test_id_list: A list of the test ids to keep as strings.
    :returns: the newly created suite
    """
    condition = condition_id_in_list(test_id_list)
    result_suite = filter_suite_by_condition(suite, condition)
    return result_suite
2872
def filter_suite_by_id_startswith(suite, start):
    """Create a test suite by filtering another one.

    :param suite: The source suite.
    :param start: A list of string the test id must start with one of.
    :returns: the newly created suite
    """
    condition = condition_id_startswith(start)
    result_suite = filter_suite_by_condition(suite, condition)
    return result_suite
2884
def exclude_tests_by_re(suite, pattern):
    """Create a test suite which excludes some tests from suite.

    :param suite: The suite to get tests from.
    :param pattern: A regular expression string. Test ids that match this
        pattern will be excluded from the result.
    :return: A TestSuite that contains all the tests from suite without the
        tests that matched pattern. The order of tests is the same as it was in
        suite.
    """
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
2897
def preserve_input(something):
    """A helper for performing test suite transformation chains.

    :param something: Anything you want to preserve.
    :return: something, unchanged.
    """
    return something
2906
def randomize_suite(suite):
    """Return a new TestSuite with suite's tests in random order.

    The tests in the input suite are flattened into a single suite in order to
    accomplish this. Any nested TestSuites are removed to provide global
    randomness.
    """
    tests = list(iter_suite_tests(suite))
    random.shuffle(tests)
    return TestUtil.TestSuite(tests)
2918
def split_suite_by_condition(suite, condition):
    """Split a test suite into two by a condition.

    :param suite: The suite to split.
    :param condition: The condition to match on. Tests that match this
        condition are returned in the first test suite, ones that do not match
        are in the second suite.
    :return: A tuple of two test suites, where the first contains tests from
        suite matching the condition, and the second contains the remainder
        from suite. The order within each output suite is the same as it was in
        suite.
    """
    matched = []
    did_not_match = []
    for test in iter_suite_tests(suite):
        if condition(test):
            matched.append(test)
        else:
            did_not_match.append(test)
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2940
def split_suite_by_re(suite, pattern):
    """Split a test suite into two by a regular expression.

    :param suite: The suite to split.
    :param pattern: A regular expression string. Test ids that match this
        pattern will be in the first test suite returned, and the others in the
        second test suite returned.
    :return: A tuple of two test suites, where the first contains tests from
        suite matching pattern, and the second contains the remainder from
        suite. The order within each output suite is the same as it was in
        suite.
    """
    return split_suite_by_condition(suite, condition_id_re(pattern))
2955
def run_suite(suite, name='test', verbose=False, pattern=".*",
2956
stop_on_failure=False,
2957
transport=None, lsprof_timed=None, bench_history=None,
2958
matching_tests_first=None,
2961
exclude_pattern=None,
2964
suite_decorators=None,
2966
result_decorators=None,
2968
"""Run a test suite for bzr selftest.
2970
:param runner_class: The class of runner to use. Must support the
2971
constructor arguments passed by run_suite which are more than standard
2973
:return: A boolean indicating success.
2975
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
2980
if runner_class is None:
2981
runner_class = TextTestRunner
2984
runner = runner_class(stream=stream,
2986
verbosity=verbosity,
2987
bench_history=bench_history,
2989
result_decorators=result_decorators,
2991
runner.stop_on_failure=stop_on_failure
2992
# built in decorator factories:
2994
random_order(random_seed, runner),
2995
exclude_tests(exclude_pattern),
2997
if matching_tests_first:
2998
decorators.append(tests_first(pattern))
3000
decorators.append(filter_tests(pattern))
3001
if suite_decorators:
3002
decorators.extend(suite_decorators)
3003
# tell the result object how many tests will be running: (except if
3004
# --parallel=fork is being used. Robert said he will provide a better
3005
# progress design later -- vila 20090817)
3006
if fork_decorator not in decorators:
3007
decorators.append(CountingDecorator)
3008
for decorator in decorators:
3009
suite = decorator(suite)
3011
# Done after test suite decoration to allow randomisation etc
3012
# to take effect, though that is of marginal benefit.
3014
stream.write("Listing tests only ...\n")
3015
for t in iter_suite_tests(suite):
3016
stream.write("%s\n" % (t.id()))
3018
result = runner.run(suite)
3020
return result.wasStrictlySuccessful()
3022
return result.wasSuccessful()
3025
# A registry where get() returns a suite decorator.
3026
parallel_registry = registry.Registry()
3029
def fork_decorator(suite):
    """Wrap suite so that its tests run concurrently via fork_for_tests.

    :return: suite itself when osutils.local_concurrency() reports 1,
        otherwise a testtools ConcurrentTestSuite around it.
    """
    concurrency = osutils.local_concurrency()
    if concurrency == 1:
        return suite
    # Imported lazily so testtools is only needed when actually used.
    from testtools import ConcurrentTestSuite
    return ConcurrentTestSuite(suite, fork_for_tests)
3035
parallel_registry.register('fork', fork_decorator)
3038
def subprocess_decorator(suite):
    """Wrap suite so that its tests run concurrently via reinvoke_for_tests.

    :return: suite itself when osutils.local_concurrency() reports 1,
        otherwise a testtools ConcurrentTestSuite around it.
    """
    concurrency = osutils.local_concurrency()
    if concurrency == 1:
        return suite
    # Imported lazily so testtools is only needed when actually used.
    from testtools import ConcurrentTestSuite
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
3044
parallel_registry.register('subprocess', subprocess_decorator)
3047
def exclude_tests(exclude_pattern):
    """Return a test suite decorator that excludes tests.

    :param exclude_pattern: The pattern handed to ExcludeDecorator, or None
        to exclude nothing.
    :return: identity_decorator when exclude_pattern is None, otherwise a
        callable that wraps the suite it is given in an ExcludeDecorator.
    """
    if exclude_pattern is None:
        return identity_decorator
    def decorator(suite):
        return ExcludeDecorator(suite, exclude_pattern)
    return decorator
3056
def filter_tests(pattern):
3058
return identity_decorator
3059
def decorator(suite):
3060
return FilterTestsDecorator(suite, pattern)
3064
def random_order(random_seed, runner):
    """Return a test suite decorator factory for randomising tests order.

    :param random_seed: now, a string which casts to a long, or a long.
        None disables randomisation.
    :param runner: A test runner with a stream attribute to report on.
    :return: identity_decorator when random_seed is None, otherwise a
        callable that wraps the suite it is given in a RandomDecorator.
    """
    if random_seed is None:
        return identity_decorator
    def decorator(suite):
        # runner.stream is looked up when the suite is decorated, not now.
        return RandomDecorator(suite, random_seed, runner.stream)
    return decorator
3077
def tests_first(pattern):
3079
return identity_decorator
3080
def decorator(suite):
3081
return TestFirstDecorator(suite, pattern)
3085
def identity_decorator(suite):
3090
class TestDecorator(TestSuite):
3091
"""A decorator for TestCase/TestSuite objects.
3093
Usually, subclasses should override __iter__(used when flattening test
3094
suites), which we do to filter, reorder, parallelise and so on, run() and
3098
def __init__(self, suite):
3099
TestSuite.__init__(self)
3102
def countTestCases(self):
3105
cases += test.countTestCases()
3112
def run(self, result):
3113
# Use iteration on self, not self._tests, to allow subclasses to hook
3116
if result.shouldStop:
3122
class CountingDecorator(TestDecorator):
    """A decorator which calls result.progress(self.countTestCases)."""

    def run(self, result):
        """Report the number of test cases to ``result``, then run them.

        ``result.progress`` is only called when the result object has a
        callable attribute of that name; it receives the test count and
        SUBUNIT_SEEK_SET.

        :param result: The result object collecting the outcomes.
        :return: Whatever the parent class ``run`` returns.
        """
        progress_method = getattr(result, 'progress', None)
        if callable(progress_method):
            progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
        return super(CountingDecorator, self).run(result)
3132
class ExcludeDecorator(TestDecorator):
3133
"""A decorator which excludes test matching an exclude pattern."""
3135
def __init__(self, suite, exclude_pattern):
3136
TestDecorator.__init__(self, suite)
3137
self.exclude_pattern = exclude_pattern
3138
self.excluded = False
3142
return iter(self._tests)
3143
self.excluded = True
3144
suite = exclude_tests_by_re(self, self.exclude_pattern)
3146
self.addTests(suite)
3147
return iter(self._tests)
3150
class FilterTestsDecorator(TestDecorator):
3151
"""A decorator which filters tests to those matching a pattern."""
3153
def __init__(self, suite, pattern):
3154
TestDecorator.__init__(self, suite)
3155
self.pattern = pattern
3156
self.filtered = False
3160
return iter(self._tests)
3161
self.filtered = True
3162
suite = filter_suite_by_re(self, self.pattern)
3164
self.addTests(suite)
3165
return iter(self._tests)
3168
class RandomDecorator(TestDecorator):
3169
"""A decorator which randomises the order of its tests."""
3171
def __init__(self, suite, random_seed, stream):
3172
TestDecorator.__init__(self, suite)
3173
self.random_seed = random_seed
3174
self.randomised = False
3175
self.stream = stream
3179
return iter(self._tests)
3180
self.randomised = True
3181
self.stream.writeln("Randomizing test order using seed %s\n" %
3182
(self.actual_seed()))
3183
# Initialise the random number generator.
3184
random.seed(self.actual_seed())
3185
suite = randomize_suite(self)
3187
self.addTests(suite)
3188
return iter(self._tests)
3190
def actual_seed(self):
3191
if self.random_seed == "now":
3192
# We convert the seed to a long to make it reusable across
3193
# invocations (because the user can reenter it).
3194
self.random_seed = long(time.time())
3196
# Convert the seed to a long if we can
3198
self.random_seed = long(self.random_seed)
3201
return self.random_seed
3204
class TestFirstDecorator(TestDecorator):
3205
"""A decorator which moves named tests to the front."""
3207
def __init__(self, suite, pattern):
3208
TestDecorator.__init__(self, suite)
3209
self.pattern = pattern
3210
self.filtered = False
3214
return iter(self._tests)
3215
self.filtered = True
3216
suites = split_suite_by_re(self, self.pattern)
3218
self.addTests(suites)
3219
return iter(self._tests)
3222
def partition_tests(suite, count):
    """Partition suite into count lists of tests.

    The tests are flattened with iter_suite_tests and cut into ``count``
    consecutive slices of equal size (the size is rounded up, so the last
    slices may be shorter or empty).

    :param suite: The suite whose tests are to be distributed.
    :param count: The number of lists to produce.
    :return: A list of ``count`` lists of tests.
    """
    tests = list(iter_suite_tests(suite))
    tests_per_process = int(math.ceil(float(len(tests)) / count))
    return [tests[block * tests_per_process:(block + 1) * tests_per_process]
            for block in range(count)]
3235
def fork_for_tests(suite):
3236
"""Take suite and start up one runner per CPU by forking()
3238
:return: An iterable of TestCase-like objects which can each have
3239
run(result) called on them to feed tests to result.
3241
concurrency = osutils.local_concurrency()
3243
from subunit import TestProtocolClient, ProtocolTestCase
3245
from subunit.test_results import AutoTimingTestResultDecorator
3247
AutoTimingTestResultDecorator = lambda x:x
3248
class TestInOtherProcess(ProtocolTestCase):
3249
# Should be in subunit, I think. RBC.
3250
def __init__(self, stream, pid):
3251
ProtocolTestCase.__init__(self, stream)
3254
def run(self, result):
3256
ProtocolTestCase.run(self, result)
3258
os.waitpid(self.pid, os.WNOHANG)
3260
test_blocks = partition_tests(suite, concurrency)
3261
for process_tests in test_blocks:
3262
process_suite = TestSuite()
3263
process_suite.addTests(process_tests)
3264
c2pread, c2pwrite = os.pipe()
3269
# Leave stderr and stdout open so we can see test noise
3270
# Close stdin so that the child goes away if it decides to
3271
# read from stdin (otherwise it's a roulette to see what
3272
# child actually gets keystrokes for pdb etc).
3275
stream = os.fdopen(c2pwrite, 'wb', 1)
3276
subunit_result = AutoTimingTestResultDecorator(
3277
TestProtocolClient(stream))
3278
process_suite.run(subunit_result)
3283
stream = os.fdopen(c2pread, 'rb', 1)
3284
test = TestInOtherProcess(stream, pid)
3289
def reinvoke_for_tests(suite):
3290
"""Take suite and start up one runner per CPU using subprocess().
3292
:return: An iterable of TestCase-like objects which can each have
3293
run(result) called on them to feed tests to result.
3295
concurrency = osutils.local_concurrency()
3297
from subunit import ProtocolTestCase
3298
class TestInSubprocess(ProtocolTestCase):
3299
def __init__(self, process, name):
3300
ProtocolTestCase.__init__(self, process.stdout)
3301
self.process = process
3302
self.process.stdin.close()
3305
def run(self, result):
3307
ProtocolTestCase.run(self, result)
3310
os.unlink(self.name)
3311
# print "pid %d finished" % finished_process
3312
test_blocks = partition_tests(suite, concurrency)
3313
for process_tests in test_blocks:
3314
# ugly; currently reimplements rather than reuses TestCase methods.
3315
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
3316
if not os.path.isfile(bzr_path):
3317
# We are probably installed. Assume sys.argv is the right file
3318
bzr_path = sys.argv[0]
3319
fd, test_list_file_name = tempfile.mkstemp()
3320
test_list_file = os.fdopen(fd, 'wb', 1)
3321
for test in process_tests:
3322
test_list_file.write(test.id() + '\n')
3323
test_list_file.close()
3325
argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
3327
if '--no-plugins' in sys.argv:
3328
argv.append('--no-plugins')
3329
# stderr=STDOUT would be ideal, but until we prevent noise on
3330
# stderr it can interrupt the subunit protocol.
3331
process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
3333
test = TestInSubprocess(process, test_list_file_name)
3336
os.unlink(test_list_file_name)
3341
class ForwardingResult(unittest.TestResult):
    """A TestResult that passes the events it receives on to another result.

    Each overridden method simply calls the method of the same name on the
    target; subclasses override individual methods to transform or observe
    events on the way through.
    """

    def __init__(self, target):
        """Create a result forwarding to ``target``.

        :param target: The result object that receives the forwarded calls.
        """
        unittest.TestResult.__init__(self)
        self.result = target

    def startTest(self, test):
        self.result.startTest(test)

    def stopTest(self, test):
        self.result.stopTest(test)

    def startTestRun(self):
        self.result.startTestRun()

    def stopTestRun(self):
        self.result.stopTestRun()

    def addSkip(self, test, reason):
        self.result.addSkip(test, reason)

    def addSuccess(self, test):
        self.result.addSuccess(test)

    def addError(self, test, err):
        self.result.addError(test, err)

    def addFailure(self, test, err):
        self.result.addFailure(test, err)
3372
class BZRTransformingResult(ForwardingResult):
    """Forwarding result that recognises bzr-specific remote outcomes.

    Errors whose remote traceback ends in an 'UnavailableFeature: ' line are
    forwarded as addNotSupported, and failures ending in a 'KnownFailure: '
    line are forwarded as _addKnownFailure; everything else is forwarded
    unchanged.
    """

    def addError(self, test, err):
        feature = self._error_looks_like('UnavailableFeature: ', err)
        if feature is not None:
            self.result.addNotSupported(test, feature)
        else:
            self.result.addError(test, err)

    def addFailure(self, test, err):
        known = self._error_looks_like('KnownFailure: ', err)
        if known is not None:
            self.result._addKnownFailure(test, [KnownFailure,
                                                 KnownFailure(known), None])
        else:
            self.result.addFailure(test, err)

    def _error_looks_like(self, prefix, err):
        """Return the text following ``prefix`` on the last line of ``err``.

        :param prefix: The marker the last non-empty line must start with.
        :param err: An exc_info style sequence; only its second item (the
            exception instance) is examined.
        :return: The remainder of the last non-empty line of the stringified
            exception when the exception is a subunit.RemoteException and
            that line starts with ``prefix``; None otherwise.
        """
        # Imported locally, like the other subunit users in this module.
        import subunit
        exc = err[1]
        if not isinstance(exc, subunit.RemoteException):
            return None
        # stringify the exception gives access to the remote traceback
        # We search the last line for 'prefix'
        lines = str(exc).split('\n')
        while lines and not lines[-1]:
            lines.pop()
        if lines and lines[-1].startswith(prefix):
            return lines[-1][len(prefix):]
        return None
3406
class ProfileResult(ForwardingResult):
    """Generate profiling data for all activity between start and success.

    The profile data is appended to the test's _benchcalls attribute and can
    be accessed by the forwarded-to TestResult.

    While it might be cleaner to accumulate this in stopTest, addSuccess is
    where our existing output support for lsprof is, and this class aims to
    fit in with that: while it could be moved it's not necessary to accomplish
    test profiling, nor would it be dramatically cleaner.
    """

    def startTest(self, test):
        """Start a fresh profiler, then forward the event."""
        self.profiler = bzrlib.lsprof.BzrProfiler()
        self.profiler.start()
        ForwardingResult.startTest(self, test)

    def addSuccess(self, test):
        """Stop the profiler and record its stats on ``test``.

        The stats are appended to ``test._benchcalls`` (created as an empty
        list when the test does not have it yet) before the success is
        forwarded.
        """
        stats = self.profiler.stop()
        try:
            calls = test._benchcalls
        except AttributeError:
            test._benchcalls = []
            calls = test._benchcalls
        calls.append(((test.id(), "", ""), stats))
        ForwardingResult.addSuccess(self, test)

    def stopTest(self, test):
        """Forward the event and drop the reference to the profiler."""
        # NOTE(review): the profiler is only stopped in addSuccess; confirm
        # that leaving it running for non-successful tests is intended.
        ForwardingResult.stopTest(self, test)
        self.profiler = None
3438
# Controlled by "bzr selftest -E=..." option
3439
# Currently supported:
3440
# -Eallow_debug Will no longer clear debug.debug_flags() so it
3441
# preserves any flags supplied at the command line.
3442
# -Edisable_lock_checks Turns errors in mismatched locks into simple prints
3443
# rather than failing tests. And no longer raise
3444
# LockContention when fcntl locks are not being used
3445
# with proper exclusion rules.
3446
selftest_debug_flags = set()
3449
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
3451
test_suite_factory=None,
3454
matching_tests_first=None,
3457
exclude_pattern=None,
3463
suite_decorators=None,
3467
"""Run the whole test suite under the enhanced runner"""
3468
# XXX: Very ugly way to do this...
3469
# Disable warning about old formats because we don't want it to disturb
3470
# any blackbox tests.
3471
from bzrlib import repository
3472
repository._deprecation_warning_done = True
3474
global default_transport
3475
if transport is None:
3476
transport = default_transport
3477
old_transport = default_transport
3478
default_transport = transport
3479
global selftest_debug_flags
3480
old_debug_flags = selftest_debug_flags
3481
if debug_flags is not None:
3482
selftest_debug_flags = set(debug_flags)
3484
if load_list is None:
3487
keep_only = load_test_id_list(load_list)
3489
starting_with = [test_prefix_alias_registry.resolve_alias(start)
3490
for start in starting_with]
3491
if test_suite_factory is None:
3492
# Reduce loading time by loading modules based on the starting_with
3494
suite = test_suite(keep_only, starting_with)
3496
suite = test_suite_factory()
3498
# But always filter as requested.
3499
suite = filter_suite_by_id_startswith(suite, starting_with)
3500
result_decorators = []
3502
result_decorators.append(ProfileResult)
3503
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
3504
stop_on_failure=stop_on_failure,
3505
transport=transport,
3506
lsprof_timed=lsprof_timed,
3507
bench_history=bench_history,
3508
matching_tests_first=matching_tests_first,
3509
list_only=list_only,
3510
random_seed=random_seed,
3511
exclude_pattern=exclude_pattern,
3513
runner_class=runner_class,
3514
suite_decorators=suite_decorators,
3516
result_decorators=result_decorators,
3519
default_transport = old_transport
3520
selftest_debug_flags = old_debug_flags
3523
def load_test_id_list(file_name):
    """Load a test id list from a text file.

    The format is one test id per line.  No special care is taken to impose
    strict rules: these test ids are used to filter the test suite, so a test
    id that does not match an existing test will do no harm.  This allows
    the user to add comments, leave blank lines, etc.

    :param file_name: The path of the file to read.
    :return: The list of lines of the file, each stripped of surrounding
        whitespace, in file order.
    :raises errors.NoSuchFile: When the file does not exist.  Any other
        IOError raised while opening the file is propagated unchanged.
    """
    try:
        ftest = open(file_name, 'rt')
    except IOError:
        # Fetch the exception through sys.exc_info() so that no
        # version-specific "except ... as/," binding syntax is needed.
        e = sys.exc_info()[1]
        if e.errno != errno.ENOENT:
            raise
        raise errors.NoSuchFile(file_name)
    try:
        return [test_name.strip() for test_name in ftest.readlines()]
    finally:
        # Close the file even when reading fails.
        ftest.close()
3546
def suite_matches_id_list(test_suite, id_list):
    """Warns about tests not appearing or appearing more than once.

    :param test_suite: A TestSuite object.
    :param id_list: The list of test ids that should be found in
         test_suite.

    :return: (absents, duplicates) absents is a list containing the test found
        in id_list but not in test_suite, duplicates is a list containing the
        test found multiple times in test_suite.

    When using a predefined test id list, it may occur that some tests do not
    exist anymore or that some tests use the same id. This function warns the
    tester about potential problems in his workflow (test lists are volatile)
    or in the test suite itself (using the same id for several tests does not
    help to localize defects).
    """
    # Build a dict counting id occurrences
    occurrences = {}
    for test in iter_suite_tests(test_suite):
        test_id = test.id()
        occurrences[test_id] = occurrences.get(test_id, 0) + 1

    not_found = []
    duplicates = []
    for test_id in id_list:
        occurs = occurrences.get(test_id, 0)
        if not occurs:
            not_found.append(test_id)
        elif occurs > 1:
            duplicates.append(test_id)

    return not_found, duplicates
3581
class TestIdList(object):
    """Test id list to filter a test suite.

    Relying on the assumption that test ids are built as:
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
    notation, this class offers methods to :
    - avoid building a test suite for modules not referred to in the test list,
    - keep only the tests listed from the module test suite.
    """

    def __init__(self, test_id_list):
        """Index ``test_id_list`` by full id and by dotted prefix.

        :param test_id_list: An iterable of test id strings.
        """
        # When a test suite needs to be filtered against us we compare test ids
        # for equality, so a simple dict offers a quick and simple solution.
        self.tests = dict().fromkeys(test_id_list, True)

        # While unittest.TestCase have ids like:
        # <module>.<class>.<method>[(<param+)],
        # doctest.DocTestCase can have ids like:
        # <module>.<function>
        # <module>.<class>.<method>

        # Since we can't predict a test class from its name only, we settle on
        # a simple constraint: a test id always begins with its module name.

        modules = {}
        for test_id in test_id_list:
            parts = test_id.split('.')
            mod_name = parts.pop(0)
            modules[mod_name] = True
            # Record every dotted prefix, as any of them may name a module.
            for part in parts:
                mod_name += '.' + part
                modules[mod_name] = True
        self.modules = modules

    def refers_to(self, module_name):
        """Is there tests for the module or one of its sub modules."""
        return module_name in self.modules

    def includes(self, test_id):
        """Is ``test_id`` one of the ids this list was built from."""
        return test_id in self.tests
3625
class TestPrefixAliasRegistry(registry.Registry):
    """A registry for test prefix aliases.

    This helps implement shortcuts for the --starting-with selftest
    option. Overriding existing prefixes is not allowed but not fatal (a
    warning will be emitted).
    """

    def register(self, key, obj, help=None, info=None,
                 override_existing=False):
        """See Registry.register.

        Trying to override an existing alias causes a warning to be emitted,
        not a fatal exception.  ``override_existing`` is accepted for
        signature compatibility only: False is always passed to the base
        class.
        """
        try:
            super(TestPrefixAliasRegistry, self).register(
                key, obj, help=help, info=info, override_existing=False)
        except KeyError:
            # NOTE(review): assumes Registry.register reports an already
            # registered key with KeyError -- confirm against bzrlib.registry.
            actual = self.get(key)
            note('Test prefix alias %s is already used for %s, ignoring %s'
                 % (key, actual, obj))

    def resolve_alias(self, id_start):
        """Replace the alias by the prefix in the given string.

        Using an unknown prefix is an error to help catching typos.

        :param id_start: A dotted string whose first component is an alias.
        :return: ``id_start`` with its first component replaced by the value
            registered for it.
        :raises errors.BzrCommandError: When the first component is not a
            registered alias.
        """
        parts = id_start.split('.')
        try:
            parts[0] = self.get(parts[0])
        except KeyError:
            # NOTE(review): assumes Registry.get raises KeyError for unknown
            # keys -- confirm against bzrlib.registry.
            raise errors.BzrCommandError(
                '%s is not a known test prefix alias' % parts[0])
        return '.'.join(parts)
3662
test_prefix_alias_registry = TestPrefixAliasRegistry()
3663
"""Registry of test prefix aliases."""
3666
# This alias lets us detect typos ('bzrlin.') by making all valid test ids
3667
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3668
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3670
# Obvious highest-level prefixes; feel free to add your own via a plugin
3671
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3672
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3673
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3674
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
3675
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3678
def _test_suite_testmod_names():
3679
"""Return the standard list of test module names to test."""
3682
'bzrlib.tests.blackbox',
3683
'bzrlib.tests.commands',
3684
'bzrlib.tests.per_branch',
3685
'bzrlib.tests.per_bzrdir',
3686
'bzrlib.tests.per_foreign_vcs',
3687
'bzrlib.tests.per_interrepository',
3688
'bzrlib.tests.per_intertree',
3689
'bzrlib.tests.per_inventory',
3690
'bzrlib.tests.per_interbranch',
3691
'bzrlib.tests.per_lock',
3692
'bzrlib.tests.per_transport',
3693
'bzrlib.tests.per_tree',
3694
'bzrlib.tests.per_pack_repository',
3695
'bzrlib.tests.per_repository',
3696
'bzrlib.tests.per_repository_chk',
3697
'bzrlib.tests.per_repository_reference',
3698
'bzrlib.tests.per_uifactory',
3699
'bzrlib.tests.per_versionedfile',
3700
'bzrlib.tests.per_workingtree',
3701
'bzrlib.tests.test__annotator',
3702
'bzrlib.tests.test__chk_map',
3703
'bzrlib.tests.test__dirstate_helpers',
3704
'bzrlib.tests.test__groupcompress',
3705
'bzrlib.tests.test__known_graph',
3706
'bzrlib.tests.test__rio',
3707
'bzrlib.tests.test__simple_set',
3708
'bzrlib.tests.test__static_tuple',
3709
'bzrlib.tests.test__walkdirs_win32',
3710
'bzrlib.tests.test_ancestry',
3711
'bzrlib.tests.test_annotate',
3712
'bzrlib.tests.test_api',
3713
'bzrlib.tests.test_atomicfile',
3714
'bzrlib.tests.test_bad_files',
3715
'bzrlib.tests.test_bencode',
3716
'bzrlib.tests.test_bisect_multi',
3717
'bzrlib.tests.test_branch',
3718
'bzrlib.tests.test_branchbuilder',
3719
'bzrlib.tests.test_btree_index',
3720
'bzrlib.tests.test_bugtracker',
3721
'bzrlib.tests.test_bundle',
3722
'bzrlib.tests.test_bzrdir',
3723
'bzrlib.tests.test__chunks_to_lines',
3724
'bzrlib.tests.test_cache_utf8',
3725
'bzrlib.tests.test_chk_map',
3726
'bzrlib.tests.test_chk_serializer',
3727
'bzrlib.tests.test_chunk_writer',
3728
'bzrlib.tests.test_clean_tree',
3729
'bzrlib.tests.test_cleanup',
3730
'bzrlib.tests.test_commands',
3731
'bzrlib.tests.test_commit',
3732
'bzrlib.tests.test_commit_merge',
3733
'bzrlib.tests.test_config',
3734
'bzrlib.tests.test_conflicts',
3735
'bzrlib.tests.test_counted_lock',
3736
'bzrlib.tests.test_crash',
3737
'bzrlib.tests.test_decorators',
3738
'bzrlib.tests.test_delta',
3739
'bzrlib.tests.test_debug',
3740
'bzrlib.tests.test_deprecated_graph',
3741
'bzrlib.tests.test_diff',
3742
'bzrlib.tests.test_directory_service',
3743
'bzrlib.tests.test_dirstate',
3744
'bzrlib.tests.test_email_message',
3745
'bzrlib.tests.test_eol_filters',
3746
'bzrlib.tests.test_errors',
3747
'bzrlib.tests.test_export',
3748
'bzrlib.tests.test_extract',
3749
'bzrlib.tests.test_fetch',
3750
'bzrlib.tests.test_fifo_cache',
3751
'bzrlib.tests.test_filters',
3752
'bzrlib.tests.test_ftp_transport',
3753
'bzrlib.tests.test_foreign',
3754
'bzrlib.tests.test_generate_docs',
3755
'bzrlib.tests.test_generate_ids',
3756
'bzrlib.tests.test_globbing',
3757
'bzrlib.tests.test_gpg',
3758
'bzrlib.tests.test_graph',
3759
'bzrlib.tests.test_groupcompress',
3760
'bzrlib.tests.test_hashcache',
3761
'bzrlib.tests.test_help',
3762
'bzrlib.tests.test_hooks',
3763
'bzrlib.tests.test_http',
3764
'bzrlib.tests.test_http_response',
3765
'bzrlib.tests.test_https_ca_bundle',
3766
'bzrlib.tests.test_identitymap',
3767
'bzrlib.tests.test_ignores',
3768
'bzrlib.tests.test_index',
3769
'bzrlib.tests.test_info',
3770
'bzrlib.tests.test_inv',
3771
'bzrlib.tests.test_inventory_delta',
3772
'bzrlib.tests.test_knit',
3773
'bzrlib.tests.test_lazy_import',
3774
'bzrlib.tests.test_lazy_regex',
3775
'bzrlib.tests.test_lock',
3776
'bzrlib.tests.test_lockable_files',
3777
'bzrlib.tests.test_lockdir',
3778
'bzrlib.tests.test_log',
3779
'bzrlib.tests.test_lru_cache',
3780
'bzrlib.tests.test_lsprof',
3781
'bzrlib.tests.test_mail_client',
3782
'bzrlib.tests.test_memorytree',
3783
'bzrlib.tests.test_merge',
3784
'bzrlib.tests.test_merge3',
3785
'bzrlib.tests.test_merge_core',
3786
'bzrlib.tests.test_merge_directive',
3787
'bzrlib.tests.test_missing',
3788
'bzrlib.tests.test_msgeditor',
3789
'bzrlib.tests.test_multiparent',
3790
'bzrlib.tests.test_mutabletree',
3791
'bzrlib.tests.test_nonascii',
3792
'bzrlib.tests.test_options',
3793
'bzrlib.tests.test_osutils',
3794
'bzrlib.tests.test_osutils_encodings',
3795
'bzrlib.tests.test_pack',
3796
'bzrlib.tests.test_patch',
3797
'bzrlib.tests.test_patches',
3798
'bzrlib.tests.test_permissions',
3799
'bzrlib.tests.test_plugins',
3800
'bzrlib.tests.test_progress',
3801
'bzrlib.tests.test_read_bundle',
3802
'bzrlib.tests.test_reconcile',
3803
'bzrlib.tests.test_reconfigure',
3804
'bzrlib.tests.test_registry',
3805
'bzrlib.tests.test_remote',
3806
'bzrlib.tests.test_rename_map',
3807
'bzrlib.tests.test_repository',
3808
'bzrlib.tests.test_revert',
3809
'bzrlib.tests.test_revision',
3810
'bzrlib.tests.test_revisionspec',
3811
'bzrlib.tests.test_revisiontree',
3812
'bzrlib.tests.test_rio',
3813
'bzrlib.tests.test_rules',
3814
'bzrlib.tests.test_sampler',
3815
'bzrlib.tests.test_script',
3816
'bzrlib.tests.test_selftest',
3817
'bzrlib.tests.test_serializer',
3818
'bzrlib.tests.test_setup',
3819
'bzrlib.tests.test_sftp_transport',
3820
'bzrlib.tests.test_shelf',
3821
'bzrlib.tests.test_shelf_ui',
3822
'bzrlib.tests.test_smart',
3823
'bzrlib.tests.test_smart_add',
3824
'bzrlib.tests.test_smart_request',
3825
'bzrlib.tests.test_smart_transport',
3826
'bzrlib.tests.test_smtp_connection',
3827
'bzrlib.tests.test_source',
3828
'bzrlib.tests.test_ssh_transport',
3829
'bzrlib.tests.test_status',
3830
'bzrlib.tests.test_store',
3831
'bzrlib.tests.test_strace',
3832
'bzrlib.tests.test_subsume',
3833
'bzrlib.tests.test_switch',
3834
'bzrlib.tests.test_symbol_versioning',
3835
'bzrlib.tests.test_tag',
3836
'bzrlib.tests.test_testament',
3837
'bzrlib.tests.test_textfile',
3838
'bzrlib.tests.test_textmerge',
3839
'bzrlib.tests.test_timestamp',
3840
'bzrlib.tests.test_trace',
3841
'bzrlib.tests.test_transactions',
3842
'bzrlib.tests.test_transform',
3843
'bzrlib.tests.test_transport',
3844
'bzrlib.tests.test_transport_log',
3845
'bzrlib.tests.test_tree',
3846
'bzrlib.tests.test_treebuilder',
3847
'bzrlib.tests.test_tsort',
3848
'bzrlib.tests.test_tuned_gzip',
3849
'bzrlib.tests.test_ui',
3850
'bzrlib.tests.test_uncommit',
3851
'bzrlib.tests.test_upgrade',
3852
'bzrlib.tests.test_upgrade_stacked',
3853
'bzrlib.tests.test_urlutils',
3854
'bzrlib.tests.test_version',
3855
'bzrlib.tests.test_version_info',
3856
'bzrlib.tests.test_weave',
3857
'bzrlib.tests.test_whitebox',
3858
'bzrlib.tests.test_win32utils',
3859
'bzrlib.tests.test_workingtree',
3860
'bzrlib.tests.test_workingtree_4',
3861
'bzrlib.tests.test_wsgi',
3862
'bzrlib.tests.test_xml',
3866
def _test_suite_modules_to_doctest():
3867
"""Return the list of modules to doctest."""
3870
'bzrlib.branchbuilder',
3873
'bzrlib.iterablefile',
3877
'bzrlib.symbol_versioning',
3880
'bzrlib.version_info_formats.format_custom',
3884
def test_suite(keep_only=None, starting_with=None):
3885
"""Build and return TestSuite for the whole of bzrlib.
3887
:param keep_only: A list of test ids limiting the suite returned.
3889
:param starting_with: An id limiting the suite returned to the tests
3892
This function can be replaced if you need to change the default test
3893
suite on a global basis, but it is not encouraged.
3896
loader = TestUtil.TestLoader()
3898
if keep_only is not None:
3899
id_filter = TestIdList(keep_only)
3901
# We take precedence over keep_only because *at loading time* using
3902
# both options means we will load less tests for the same final result.
3903
def interesting_module(name):
3904
for start in starting_with:
3906
# Either the module name starts with the specified string
3907
name.startswith(start)
3908
# or it may contain tests starting with the specified string
3909
or start.startswith(name)
3913
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
3915
elif keep_only is not None:
3916
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
3917
def interesting_module(name):
3918
return id_filter.refers_to(name)
3921
loader = TestUtil.TestLoader()
3922
def interesting_module(name):
3923
# No filtering, all modules are interesting
3926
suite = loader.suiteClass()
3928
# modules building their suite with loadTestsFromModuleNames
3929
suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
3931
for mod in _test_suite_modules_to_doctest():
3932
if not interesting_module(mod):
3933
# No tests to keep here, move along
3936
# note that this really does mean "report only" -- doctest
3937
# still runs the rest of the examples
3938
doc_suite = doctest.DocTestSuite(mod,
3939
optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
3940
except ValueError, e:
3941
print '**failed to get doctest for: %s\n%s' % (mod, e)
3943
if len(doc_suite._tests) == 0:
3944
raise errors.BzrError("no doctests found in %s" % (mod,))
3945
suite.addTest(doc_suite)
3947
default_encoding = sys.getdefaultencoding()
3948
for name, plugin in bzrlib.plugin.plugins().items():
3949
if not interesting_module(plugin.module.__name__):
3951
plugin_suite = plugin.test_suite()
3952
# We used to catch ImportError here and turn it into just a warning,
3953
# but really if you don't have --no-plugins this should be a failure.
3954
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
3955
if plugin_suite is None:
3956
plugin_suite = plugin.load_plugin_tests(loader)
3957
if plugin_suite is not None:
3958
suite.addTest(plugin_suite)
3959
if default_encoding != sys.getdefaultencoding():
3960
bzrlib.trace.warning(
3961
'Plugin "%s" tried to reset default encoding to: %s', name,
3962
sys.getdefaultencoding())
3964
sys.setdefaultencoding(default_encoding)
3966
if keep_only is not None:
3967
# Now that the referred modules have loaded their tests, keep only the
3969
suite = filter_suite_by_id_list(suite, id_filter)
3970
# Do some sanity checks on the id_list filtering
3971
not_found, duplicates = suite_matches_id_list(suite, keep_only)
3973
# The tester has used both keep_only and starting_with, so he is
3974
# already aware that some tests are excluded from the list, there
3975
# is no need to tell him which.
3978
# Some tests mentioned in the list are not in the test suite. The
3979
# list may be out of date, report to the tester.
3980
for id in not_found:
3981
bzrlib.trace.warning('"%s" not found in the test suite', id)
3982
for id in duplicates:
3983
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
3988
def multiply_scenarios(scenarios_left, scenarios_right):
    """Multiply two sets of scenarios.

    :param scenarios_left: An iterable of (name, param_dict) pairs.
    :param scenarios_right: An iterable of (name, param_dict) pairs.
    :returns: the cartesian product of the two sets of scenarios, that is
        a scenario for every possible combination of a left scenario and a
        right scenario.  Each combined name is 'left_name,right_name' and
        each combined dict is a new dict; when both dicts define the same
        key the value from the right scenario wins.
    """
    product = []
    for left_name, left_dict in scenarios_left:
        for right_name, right_dict in scenarios_right:
            # Copy then update: no intermediate item lists are built and the
            # input dicts are left untouched.
            params = dict(left_dict)
            params.update(right_dict)
            product.append(('%s,%s' % (left_name, right_name), params))
    return product
4002
def multiply_tests(tests, scenarios, result):
4003
"""Multiply tests_list by scenarios into result.
4005
This is the core workhorse for test parameterisation.
4007
Typically the load_tests() method for a per-implementation test suite will
4008
call multiply_tests and return the result.
4010
:param tests: The tests to parameterise.
4011
:param scenarios: The scenarios to apply: pairs of (scenario_name,
4012
scenario_param_dict).
4013
:param result: A TestSuite to add created tests to.
4015
This returns the passed in result TestSuite with the cross product of all
4016
the tests repeated once for each scenario. Each test is adapted by adding
4017
the scenario name at the end of its id(), and updating the test object's
4018
__dict__ with the scenario_param_dict.
4020
>>> import bzrlib.tests.test_sampler
4021
>>> r = multiply_tests(
4022
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
4023
... [('one', dict(param=1)),
4024
... ('two', dict(param=2))],
4026
>>> tests = list(iter_suite_tests(r))
4030
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
4036
for test in iter_suite_tests(tests):
4037
apply_scenarios(test, scenarios, result)
4041
def apply_scenarios(test, scenarios, result):
    """Apply the scenarios in scenarios to test and add to result.

    :param test: The test to apply scenarios to.
    :param scenarios: An iterable of scenarios to apply to test.
    :param result: The object whose addTest method receives each adapted
        test.
    :return: result
    :seealso: apply_scenario
    """
    for scenario in scenarios:
        result.addTest(apply_scenario(test, scenario))
    return result
4054
def apply_scenario(test, scenario):
    """Copy test and apply scenario to it.

    :param test: A test to adapt.
    :param scenario: A tuple describing the scenario.
        The first element of the tuple is the name appended, in parentheses,
        to the id of the test to form the new test id.
        The second element is a dict containing attributes to set on the
        test.
    :return: The adapted test.
    """
    scenario_name, attributes = scenario[0], scenario[1]
    new_id = "%s(%s)" % (test.id(), scenario_name)
    new_test = clone_test(test, new_id)
    for name, value in attributes.items():
        setattr(new_test, name, value)
    return new_test
4071
def clone_test(test, new_id):
    """Clone a test giving it a new id.

    The clone is a shallow copy, so it shares attribute values with the
    original test; only its ``id`` is replaced.

    :param test: The test to clone.
    :param new_id: The id to assign to it.
    :return: The new test.
    """
    new_test = copy(test)
    # Shadow the id() method on the instance only; the class is untouched.
    new_test.id = lambda: new_id
    return new_test
4083
def _rmtree_temp_dir(dirname):
4084
# If LANG=C we probably have created some bogus paths
4085
# which rmtree(unicode) will fail to delete
4086
# so make sure we are using rmtree(str) to delete everything
4087
# except on win32, where rmtree(str) will fail
4088
# since it doesn't have the property of byte-stream paths
4089
# (they are either ascii or mbcs)
4090
if sys.platform == 'win32':
4091
# make sure we are using the unicode win32 api
4092
dirname = unicode(dirname)
4094
dirname = dirname.encode(sys.getfilesystemencoding())
4096
osutils.rmtree(dirname)
4098
# We don't want to fail here because some useful display will be lost
4099
# otherwise. Polluting the tmp dir is bad, but not giving all the
4100
# possible info to the test runner is even worse.
4101
sys.stderr.write('Unable to remove testing dir %s\n%s'
4102
% (os.path.basename(dirname), e))
4105
class Feature(object):
4106
"""An operating system Feature."""
4109
self._available = None
4111
def available(self):
4112
"""Is the feature available?
4114
:return: True if the feature is available.
4116
if self._available is None:
4117
self._available = self._probe()
4118
return self._available
4121
"""Implement this method in concrete features.
4123
:return: True if the feature is available.
4125
raise NotImplementedError
4128
if getattr(self, 'feature_name', None):
4129
return self.feature_name()
4130
return self.__class__.__name__
4133
class _SymlinkFeature(Feature):
    """Does the platform support symbolic links?"""

    def _probe(self):
        return osutils.has_symlinks()

    def feature_name(self):
        # NOTE(review): the return line is missing from the reviewed text;
        # 'symlinks' is assumed -- confirm against the upstream file.
        return 'symlinks'


SymlinkFeature = _SymlinkFeature()
4144
class _HardlinkFeature(Feature):
    """Does the platform support hard links?"""

    def _probe(self):
        return osutils.has_hardlinks()

    def feature_name(self):
        # NOTE(review): the return line is missing from the reviewed text;
        # 'hardlinks' is assumed -- confirm against the upstream file.
        return 'hardlinks'


HardlinkFeature = _HardlinkFeature()
4155
class _OsFifoFeature(Feature):
    """Does the os module provide mkfifo?"""

    def _probe(self):
        # Returns the mkfifo function itself (truthy) or None when the
        # platform's os module does not have it.
        return getattr(os, 'mkfifo', None)

    def feature_name(self):
        return 'filesystem fifos'


OsFifoFeature = _OsFifoFeature()
4166
class _UnicodeFilenameFeature(Feature):
    """Does the filesystem support Unicode filenames?"""

    def _probe(self):
        try:
            # Check for character combinations unlikely to be covered by any
            # single non-unicode encoding. We use the characters
            # - greek small letter alpha (U+03B1) and
            # - braille pattern dots-123456 (U+283F).
            os.stat(u'\u03b1\u283f')
        except UnicodeEncodeError:
            # The name cannot even be encoded for the filesystem.
            return False
        except (IOError, OSError):
            # The filesystem allows the Unicode filename but the file doesn't
            # exist.
            return True
        else:
            # The filesystem allows the Unicode filename and the file exists,
            # for some reason.
            return True


UnicodeFilenameFeature = _UnicodeFilenameFeature()
4190
def probe_unicode_in_user_encoding():
    """Try to encode several unicode strings to use in unicode-aware tests.

    Return the first successful match.

    :return: (unicode value, encoded plain string value) or (None, None)
    """
    # The user encoding is the same for every candidate, so look it up once.
    user_encoding = osutils.get_user_encoding()
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
    for uni_val in possible_vals:
        try:
            str_val = uni_val.encode(user_encoding)
        except UnicodeEncodeError:
            # Try a different character
            continue
        return uni_val, str_val
    # None of the candidates can be represented in the user encoding.
    return None, None
4208
def probe_bad_non_ascii(encoding):
    """Try to find a [bad] character with code [128..255]
    that cannot be decoded to unicode in some encoding.

    :param encoding: Name of the encoding to probe.
    :return: The first single-byte string that fails to decode, or None if
        all non-ascii characters are valid for the given encoding.
    """
    import struct
    for code in range(128, 256):
        # Build the candidate as a one-byte byte string; struct.pack yields a
        # byte string (which has a decode method) on any interpreter.
        char = struct.pack('B', code)
        try:
            char.decode(encoding)
        except UnicodeDecodeError:
            return char
    return None
4223
class _HTTPSServerFeature(Feature):
    """Some tests want an https Server, check if one is available.

    Right now, the only way this is available is under python2.6 which provides
    an ssl module.
    """

    def _probe(self):
        # NOTE(review): the probe body is missing from the reviewed text; an
        # import check for the ssl module is assumed from the docstring --
        # confirm against the upstream file.
        try:
            import ssl
            return True
        except ImportError:
            return False

    def feature_name(self):
        return 'HTTPSServer'


HTTPSServerFeature = _HTTPSServerFeature()
4244
class _ParamikoFeature(Feature):
    """Is paramiko available?"""

    def _probe(self):
        try:
            # Only the import matters here; the name itself is not used.
            from bzrlib.transport.sftp import SFTPAbsoluteServer
            return True
        except errors.ParamikoNotPresent:
            return False

    def feature_name(self):
        # NOTE(review): the return line is missing from the reviewed text;
        # 'Paramiko' is assumed -- confirm against the upstream file.
        return 'Paramiko'


ParamikoFeature = _ParamikoFeature()
4261
class _UnicodeFilename(Feature):
    """Does the filesystem support Unicode filenames?"""

    def _probe(self):
        try:
            # NOTE(review): the probed call is missing from the reviewed
            # text; a stat of greek small letter alpha (U+03B1) is assumed --
            # confirm against the upstream file.
            os.stat(u'\u03b1')
        except UnicodeEncodeError:
            # The name cannot even be encoded for the filesystem.
            return False
        except (IOError, OSError):
            # The filesystem allows the Unicode filename but the file doesn't
            # exist.
            return True
        else:
            # The filesystem allows the Unicode filename and the file exists,
            # for some reason.
            return True


UnicodeFilename = _UnicodeFilename()
4281
class _UTF8Filesystem(Feature):
    """Is the filesystem UTF-8?"""

    def _probe(self):
        # Accept both spellings of the codec name, case-insensitively.
        return osutils._fs_enc.upper() in ('UTF-8', 'UTF8')


UTF8Filesystem = _UTF8Filesystem()
4292
class _BreakinFeature(Feature):
    """Does this platform support the breakin feature?"""

    def _probe(self):
        from bzrlib import breakin
        if breakin.determine_signal() is None:
            return False
        if sys.platform == 'win32':
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
            # We trigger SIGBREAK via a Console api so we need ctypes to
            # access the function
            # NOTE(review): the ctypes check itself is missing from the
            # reviewed text; a local import test is assumed -- confirm
            # against the upstream file.
            try:
                import ctypes
            except ImportError:
                return False
        return True

    def feature_name(self):
        return "SIGQUIT or SIGBREAK w/ctypes on win32"


BreakinFeature = _BreakinFeature()
4314
class _CaseInsCasePresFilenameFeature(Feature):
    """Is the file-system case insensitive, but case-preserving?"""

    def _probe(self):
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
        try:
            # first check truly case-preserving for created files, then check
            # case insensitive when opening existing files.
            name = osutils.normpath(name)
            base, rel = osutils.split(name)
            found_rel = osutils.canonical_relpath(base, name)
            return (found_rel == rel
                    and os.path.isfile(name.upper())
                    and os.path.isfile(name.lower()))
        finally:
            # Always release the descriptor and delete the probe file, even
            # when one of the checks above raises.
            os.close(fileno)
            os.remove(name)

    def feature_name(self):
        return "case-insensitive case-preserving filesystem"


CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4338
class _CaseInsensitiveFilesystemFeature(Feature):
    """Check if underlying filesystem is case-insensitive but *not* case
    preserving.
    """
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
    # more likely to be case preserving, so this case is rare.

    def _probe(self):
        if CaseInsCasePresFilenameFeature.available():
            # Case-preserving filesystems are covered by the other feature.
            return False

        if TestCaseWithMemoryTransport.TEST_ROOT is None:
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
            TestCaseWithMemoryTransport.TEST_ROOT = root
        else:
            root = TestCaseWithMemoryTransport.TEST_ROOT
        # NOTE(review): the continuation of this call and the directory
        # creation below are missing from the reviewed text; dir=root and
        # os.mkdir(name_a) are assumed -- confirm against the upstream file.
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
                               dir=root)
        try:
            name_a = osutils.pathjoin(tdir, 'a')
            name_A = osutils.pathjoin(tdir, 'A')
            os.mkdir(name_a)
            # If 'A' resolves to the directory just created as 'a', the
            # filesystem is ignoring case.
            return osutils.isdir(name_A)
        finally:
            # Remove the probe directory even if the check above raises.
            _rmtree_temp_dir(tdir)

    def feature_name(self):
        return 'case-insensitive filesystem'


CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4369
class _SubUnitFeature(Feature):
    """Check if subunit is available."""

    def _probe(self):
        # NOTE(review): the probe body is missing from the reviewed text; an
        # import check for the subunit package is assumed from the class
        # docstring -- confirm against the upstream file.
        try:
            import subunit
            return True
        except ImportError:
            return False

    def feature_name(self):
        # NOTE(review): the return line is missing from the reviewed text;
        # 'subunit' is assumed -- confirm against the upstream file.
        return 'subunit'


SubUnitFeature = _SubUnitFeature()
4383
# Only define SubUnitBzrRunner if subunit is available.
4385
from subunit import TestProtocolClient
4387
from subunit.test_results import AutoTimingTestResultDecorator
4389
AutoTimingTestResultDecorator = lambda x:x
4390
class SubUnitBzrRunner(TextTestRunner):
4391
def run(self, test):
4392
result = AutoTimingTestResultDecorator(
4393
TestProtocolClient(self.stream))