1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
# TODO: Perhaps there should be an API to find out if bzr running under the
19
# test suite -- some plugins might want to avoid making intrusive changes if
20
# this is the case. However, we want behaviour under to test to diverge as
21
# little as possible, so this should be used rarely if it's added at all.
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
# NOTE: Some classes in here use camelCaseNaming() rather than
25
# underscore_naming(). That's for consistency with unittest; it's not the
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
# new assertFoo() methods.
32
from cStringIO import StringIO
39
from pprint import pformat
44
from subprocess import Popen, PIPE, STDOUT
54
from testtools import content
74
import bzrlib.commands
75
import bzrlib.timestamp
77
import bzrlib.inventory
78
import bzrlib.iterablefile
83
# lsprof not available
85
from bzrlib.merge import merge_inner
88
from bzrlib.smart import client, request, server
90
from bzrlib import symbol_versioning
91
from bzrlib.symbol_versioning import (
99
from bzrlib.transport import get_transport, pathfilter
100
import bzrlib.transport
101
from bzrlib.transport.local import LocalURLServer
102
from bzrlib.transport.memory import MemoryServer
103
from bzrlib.transport.readonly import ReadonlyServer
104
from bzrlib.trace import mutter, note
105
from bzrlib.tests import TestUtil
106
from bzrlib.tests.http_server import HttpServer
107
from bzrlib.tests.TestUtil import (
111
from bzrlib.tests.treeshape import build_tree_contents
112
from bzrlib.ui import NullProgressView
113
from bzrlib.ui.text import TextUIFactory
114
import bzrlib.version_info_formats.format_custom
115
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
117
# Mark this python module as being part of the implementation
118
# of unittest: this gives us better tracebacks where the last
119
# shown frame is the test code, not our assertXYZ.
122
default_transport = LocalURLServer
124
# Subunit result codes, defined here to prevent a hard dependency on subunit.
129
class ExtendedTestResult(unittest._TextTestResult):
130
"""Accepts, reports and accumulates the results of running tests.
132
Compared to the unittest version this class adds support for
133
profiling, benchmarking, stopping as soon as a test fails, and
134
skipping tests. There are further-specialized subclasses for
135
different types of display.
137
When a test finishes, in whatever way, it calls one of the addSuccess,
138
addFailure or addError classes. These in turn may redirect to a more
139
specific case for the special test results supported by our extended
142
Note that just one of these objects is fed the results from many tests.
147
def __init__(self, stream, descriptions, verbosity,
151
"""Construct new TestResult.
153
:param bench_history: Optionally, a writable file object to accumulate
156
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
157
if bench_history is not None:
158
from bzrlib.version import _get_bzr_source_tree
159
src_tree = _get_bzr_source_tree()
162
revision_id = src_tree.get_parent_ids()[0]
164
# XXX: if this is a brand new tree, do the same as if there
168
# XXX: If there's no branch, what should we do?
170
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
171
self._bench_history = bench_history
172
self.ui = ui.ui_factory
175
self.failure_count = 0
176
self.known_failure_count = 0
178
self.not_applicable_count = 0
179
self.unsupported = {}
181
self._overall_start_time = time.time()
182
self._strict = strict
184
def stopTestRun(self):
187
stopTime = time.time()
188
timeTaken = stopTime - self.startTime
190
self.stream.writeln(self.separator2)
191
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
192
run, run != 1 and "s" or "", timeTaken))
193
self.stream.writeln()
194
if not self.wasSuccessful():
195
self.stream.write("FAILED (")
196
failed, errored = map(len, (self.failures, self.errors))
198
self.stream.write("failures=%d" % failed)
200
if failed: self.stream.write(", ")
201
self.stream.write("errors=%d" % errored)
202
if self.known_failure_count:
203
if failed or errored: self.stream.write(", ")
204
self.stream.write("known_failure_count=%d" %
205
self.known_failure_count)
206
self.stream.writeln(")")
208
if self.known_failure_count:
209
self.stream.writeln("OK (known_failures=%d)" %
210
self.known_failure_count)
212
self.stream.writeln("OK")
213
if self.skip_count > 0:
214
skipped = self.skip_count
215
self.stream.writeln('%d test%s skipped' %
216
(skipped, skipped != 1 and "s" or ""))
218
for feature, count in sorted(self.unsupported.items()):
219
self.stream.writeln("Missing feature '%s' skipped %d tests." %
222
ok = self.wasStrictlySuccessful()
224
ok = self.wasSuccessful()
225
if TestCase._first_thread_leaker_id:
227
'%s is leaking threads among %d leaking tests.\n' % (
228
TestCase._first_thread_leaker_id,
229
TestCase._leaking_threads_tests))
230
# We don't report the main thread as an active one.
232
'%d non-main threads were left active in the end.\n'
233
% (TestCase._active_threads - 1))
235
def _extractBenchmarkTime(self, testCase, details=None):
236
"""Add a benchmark time for the current test case."""
237
if details and 'benchtime' in details:
238
return float(''.join(details['benchtime'].iter_bytes()))
239
return getattr(testCase, "_benchtime", None)
241
def _elapsedTestTimeString(self):
242
"""Return a time string for the overall time the current test has taken."""
243
return self._formatTime(time.time() - self._start_time)
245
def _testTimeString(self, testCase):
246
benchmark_time = self._extractBenchmarkTime(testCase)
247
if benchmark_time is not None:
248
return self._formatTime(benchmark_time) + "*"
250
return self._elapsedTestTimeString()
252
def _formatTime(self, seconds):
253
"""Format seconds as milliseconds with leading spaces."""
254
# some benchmarks can take thousands of seconds to run, so we need 8
256
return "%8dms" % (1000 * seconds)
258
def _shortened_test_description(self, test):
260
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
263
def startTest(self, test):
264
unittest.TestResult.startTest(self, test)
267
self.report_test_start(test)
268
test.number = self.count
269
self._recordTestStartTime()
271
def startTests(self):
273
if getattr(sys, 'frozen', None) is None:
274
bzr_path = osutils.realpath(sys.argv[0])
276
bzr_path = sys.executable
278
'bzr selftest: %s\n' % (bzr_path,))
281
bzrlib.__path__[0],))
283
' bzr-%s python-%s %s\n' % (
284
bzrlib.version_string,
285
bzrlib._format_version_tuple(sys.version_info),
286
platform.platform(aliased=1),
288
self.stream.write('\n')
290
def _recordTestStartTime(self):
291
"""Record that a test has started."""
292
self._start_time = time.time()
294
def _cleanupLogFile(self, test):
295
# We can only do this if we have one of our TestCases, not if
297
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
298
if setKeepLogfile is not None:
301
def addError(self, test, err):
302
"""Tell result that test finished with an error.
304
Called from the TestCase run() method when the test
305
fails with an unexpected error.
308
unittest.TestResult.addError(self, test, err)
309
self.error_count += 1
310
self.report_error(test, err)
313
self._cleanupLogFile(test)
315
def addFailure(self, test, err):
316
"""Tell result that test failed.
318
Called from the TestCase run() method when the test
319
fails because e.g. an assert() method failed.
322
unittest.TestResult.addFailure(self, test, err)
323
self.failure_count += 1
324
self.report_failure(test, err)
327
self._cleanupLogFile(test)
329
def addSuccess(self, test, details=None):
330
"""Tell result that test completed successfully.
332
Called from the TestCase run()
334
if self._bench_history is not None:
335
benchmark_time = self._extractBenchmarkTime(test, details)
336
if benchmark_time is not None:
337
self._bench_history.write("%s %s\n" % (
338
self._formatTime(benchmark_time),
340
self.report_success(test)
341
self._cleanupLogFile(test)
342
unittest.TestResult.addSuccess(self, test)
343
test._log_contents = ''
345
def addExpectedFailure(self, test, err):
346
self.known_failure_count += 1
347
self.report_known_failure(test, err)
349
def addNotSupported(self, test, feature):
350
"""The test will not be run because of a missing feature.
352
# this can be called in two different ways: it may be that the
353
# test started running, and then raised (through requireFeature)
354
# UnavailableFeature. Alternatively this method can be called
355
# while probing for features before running the test code proper; in
356
# that case we will see startTest and stopTest, but the test will
357
# never actually run.
358
self.unsupported.setdefault(str(feature), 0)
359
self.unsupported[str(feature)] += 1
360
self.report_unsupported(test, feature)
362
def addSkip(self, test, reason):
363
"""A test has not run for 'reason'."""
365
self.report_skip(test, reason)
367
def addNotApplicable(self, test, reason):
368
self.not_applicable_count += 1
369
self.report_not_applicable(test, reason)
371
def _post_mortem(self):
372
"""Start a PDB post mortem session."""
373
if os.environ.get('BZR_TEST_PDB', None):
374
import pdb;pdb.post_mortem()
376
def progress(self, offset, whence):
377
"""The test is adjusting the count of tests to run."""
378
if whence == SUBUNIT_SEEK_SET:
379
self.num_tests = offset
380
elif whence == SUBUNIT_SEEK_CUR:
381
self.num_tests += offset
383
raise errors.BzrError("Unknown whence %r" % whence)
385
def report_cleaning_up(self):
388
def startTestRun(self):
389
self.startTime = time.time()
391
def report_success(self, test):
394
def wasStrictlySuccessful(self):
395
if self.unsupported or self.known_failure_count:
397
return self.wasSuccessful()
400
class TextTestResult(ExtendedTestResult):
401
"""Displays progress and results of tests in text form"""
403
def __init__(self, stream, descriptions, verbosity,
408
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
409
bench_history, strict)
410
# We no longer pass them around, but just rely on the UIFactory stack
413
warnings.warn("Passing pb to TextTestResult is deprecated")
414
self.pb = self.ui.nested_progress_bar()
415
self.pb.show_pct = False
416
self.pb.show_spinner = False
417
self.pb.show_eta = False,
418
self.pb.show_count = False
419
self.pb.show_bar = False
420
self.pb.update_latency = 0
421
self.pb.show_transport_activity = False
423
def stopTestRun(self):
424
# called when the tests that are going to run have run
427
super(TextTestResult, self).stopTestRun()
429
def startTestRun(self):
430
super(TextTestResult, self).startTestRun()
431
self.pb.update('[test 0/%d] Starting' % (self.num_tests))
433
def printErrors(self):
434
# clear the pb to make room for the error listing
436
super(TextTestResult, self).printErrors()
438
def _progress_prefix_text(self):
439
# the longer this text, the less space we have to show the test
441
a = '[%d' % self.count # total that have been run
442
# tests skipped as known not to be relevant are not important enough
444
## if self.skip_count:
445
## a += ', %d skip' % self.skip_count
446
## if self.known_failure_count:
447
## a += '+%dX' % self.known_failure_count
449
a +='/%d' % self.num_tests
451
runtime = time.time() - self._overall_start_time
453
a += '%dm%ds' % (runtime / 60, runtime % 60)
456
total_fail_count = self.error_count + self.failure_count
458
a += ', %d failed' % total_fail_count
459
# if self.unsupported:
460
# a += ', %d missing' % len(self.unsupported)
464
def report_test_start(self, test):
467
self._progress_prefix_text()
469
+ self._shortened_test_description(test))
471
def _test_description(self, test):
472
return self._shortened_test_description(test)
474
def report_error(self, test, err):
475
ui.ui_factory.note('ERROR: %s\n %s\n' % (
476
self._test_description(test),
480
def report_failure(self, test, err):
481
ui.ui_factory.note('FAIL: %s\n %s\n' % (
482
self._test_description(test),
486
def report_known_failure(self, test, err):
489
def report_skip(self, test, reason):
492
def report_not_applicable(self, test, reason):
495
def report_unsupported(self, test, feature):
496
"""test cannot be run because feature is missing."""
498
def report_cleaning_up(self):
499
self.pb.update('Cleaning up')
502
class VerboseTestResult(ExtendedTestResult):
503
"""Produce long output, with one line per test run plus times"""
505
def _ellipsize_to_right(self, a_string, final_width):
506
"""Truncate and pad a string, keeping the right hand side"""
507
if len(a_string) > final_width:
508
result = '...' + a_string[3-final_width:]
511
return result.ljust(final_width)
513
def startTestRun(self):
514
super(VerboseTestResult, self).startTestRun()
515
self.stream.write('running %d tests...\n' % self.num_tests)
517
def report_test_start(self, test):
519
name = self._shortened_test_description(test)
520
width = osutils.terminal_width()
521
if width is not None:
522
# width needs space for 6 char status, plus 1 for slash, plus an
523
# 11-char time string, plus a trailing blank
524
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
526
self.stream.write(self._ellipsize_to_right(name, width-18))
528
self.stream.write(name)
531
def _error_summary(self, err):
533
return '%s%s' % (indent, err[1])
535
def report_error(self, test, err):
536
self.stream.writeln('ERROR %s\n%s'
537
% (self._testTimeString(test),
538
self._error_summary(err)))
540
def report_failure(self, test, err):
541
self.stream.writeln(' FAIL %s\n%s'
542
% (self._testTimeString(test),
543
self._error_summary(err)))
545
def report_known_failure(self, test, err):
546
self.stream.writeln('XFAIL %s\n%s'
547
% (self._testTimeString(test),
548
self._error_summary(err)))
550
def report_success(self, test):
551
self.stream.writeln(' OK %s' % self._testTimeString(test))
552
for bench_called, stats in getattr(test, '_benchcalls', []):
553
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
554
stats.pprint(file=self.stream)
555
# flush the stream so that we get smooth output. This verbose mode is
556
# used to show the output in PQM.
559
def report_skip(self, test, reason):
560
self.stream.writeln(' SKIP %s\n%s'
561
% (self._testTimeString(test), reason))
563
def report_not_applicable(self, test, reason):
564
self.stream.writeln(' N/A %s\n %s'
565
% (self._testTimeString(test), reason))
567
def report_unsupported(self, test, feature):
568
"""test cannot be run because feature is missing."""
569
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
570
%(self._testTimeString(test), feature))
573
class TextTestRunner(object):
574
stop_on_failure = False
582
result_decorators=None,
584
"""Create a TextTestRunner.
586
:param result_decorators: An optional list of decorators to apply
587
to the result object being used by the runner. Decorators are
588
applied left to right - the first element in the list is the
591
# stream may know claim to know to write unicode strings, but in older
592
# pythons this goes sufficiently wrong that it is a bad idea. (
593
# specifically a built in file with encoding 'UTF-8' will still try
594
# to encode using ascii.
595
new_encoding = osutils.get_terminal_encoding()
596
codec = codecs.lookup(new_encoding)
597
if type(codec) is tuple:
601
encode = codec.encode
602
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
603
stream.encoding = new_encoding
604
self.stream = unittest._WritelnDecorator(stream)
605
self.descriptions = descriptions
606
self.verbosity = verbosity
607
self._bench_history = bench_history
608
self._strict = strict
609
self._result_decorators = result_decorators or []
612
"Run the given test case or test suite."
613
if self.verbosity == 1:
614
result_class = TextTestResult
615
elif self.verbosity >= 2:
616
result_class = VerboseTestResult
617
original_result = result_class(self.stream,
620
bench_history=self._bench_history,
623
# Signal to result objects that look at stop early policy to stop,
624
original_result.stop_early = self.stop_on_failure
625
result = original_result
626
for decorator in self._result_decorators:
627
result = decorator(result)
628
result.stop_early = self.stop_on_failure
629
result.startTestRun()
634
# higher level code uses our extended protocol to determine
635
# what exit code to give.
636
return original_result
639
def iter_suite_tests(suite):
640
"""Return all tests in a suite, recursing through nested suites"""
641
if isinstance(suite, unittest.TestCase):
643
elif isinstance(suite, unittest.TestSuite):
645
for r in iter_suite_tests(item):
648
raise Exception('unknown type %r for object %r'
649
% (type(suite), suite))
652
TestSkipped = testtools.testcase.TestSkipped
655
class TestNotApplicable(TestSkipped):
656
"""A test is not applicable to the situation where it was run.
658
This is only normally raised by parameterized tests, if they find that
659
the instance they're constructed upon does not support one aspect
664
# traceback._some_str fails to format exceptions that have the default
665
# __str__ which does an implicit ascii conversion. However, repr() on those
666
# objects works, for all that its not quite what the doctor may have ordered.
667
def _clever_some_str(value):
672
return repr(value).replace('\\n', '\n')
674
return '<unprintable %s object>' % type(value).__name__
676
traceback._some_str = _clever_some_str
679
# deprecated - use self.knownFailure(), or self.expectFailure.
680
KnownFailure = testtools.testcase._ExpectedFailure
683
class UnavailableFeature(Exception):
684
"""A feature required for this test was not available.
686
This can be considered a specialised form of SkippedTest.
688
The feature should be used to construct the exception.
692
class CommandFailed(Exception):
696
class StringIOWrapper(object):
697
"""A wrapper around cStringIO which just adds an encoding attribute.
699
Internally we can check sys.stdout to see what the output encoding
700
should be. However, cStringIO has no encoding attribute that we can
701
set. So we wrap it instead.
706
def __init__(self, s=None):
708
self.__dict__['_cstring'] = StringIO(s)
710
self.__dict__['_cstring'] = StringIO()
712
def __getattr__(self, name, getattr=getattr):
713
return getattr(self.__dict__['_cstring'], name)
715
def __setattr__(self, name, val):
716
if name == 'encoding':
717
self.__dict__['encoding'] = val
719
return setattr(self._cstring, name, val)
722
class TestUIFactory(TextUIFactory):
723
"""A UI Factory for testing.
725
Hide the progress bar but emit note()s.
727
Allows get_password to be tested without real tty attached.
729
See also CannedInputUIFactory which lets you provide programmatic input in
732
# TODO: Capture progress events at the model level and allow them to be
733
# observed by tests that care.
735
# XXX: Should probably unify more with CannedInputUIFactory or a
736
# particular configuration of TextUIFactory, or otherwise have a clearer
737
# idea of how they're supposed to be different.
738
# See https://bugs.edge.launchpad.net/bzr/+bug/408213
740
def __init__(self, stdout=None, stderr=None, stdin=None):
741
if stdin is not None:
742
# We use a StringIOWrapper to be able to test various
743
# encodings, but the user is still responsible to
744
# encode the string and to set the encoding attribute
745
# of StringIOWrapper.
746
stdin = StringIOWrapper(stdin)
747
super(TestUIFactory, self).__init__(stdin, stdout, stderr)
749
def get_non_echoed_password(self):
750
"""Get password from stdin without trying to handle the echo mode"""
751
password = self.stdin.readline()
754
if password[-1] == '\n':
755
password = password[:-1]
758
def make_progress_view(self):
759
return NullProgressView()
762
class TestCase(testtools.TestCase):
763
"""Base class for bzr unit tests.
765
Tests that need access to disk resources should subclass
766
TestCaseInTempDir not TestCase.
768
Error and debug log messages are redirected from their usual
769
location into a temporary file, the contents of which can be
770
retrieved by _get_log(). We use a real OS file, not an in-memory object,
771
so that it can also capture file IO. When the test completes this file
772
is read into memory and removed from disk.
774
There are also convenience functions to invoke bzr's command-line
775
routine, and to build and check bzr trees.
777
In addition to the usual method of overriding tearDown(), this class also
778
allows subclasses to register functions into the _cleanups list, which is
779
run in order as the object is torn down. It's less likely this will be
780
accidentally overlooked.
783
_active_threads = None
784
_leaking_threads_tests = 0
785
_first_thread_leaker_id = None
786
_log_file_name = None
787
# record lsprof data when performing benchmark calls.
788
_gather_lsprof_in_benchmarks = False
790
def __init__(self, methodName='testMethod'):
791
super(TestCase, self).__init__(methodName)
793
self._directory_isolation = True
794
self.exception_handlers.insert(0,
795
(UnavailableFeature, self._do_unsupported_or_skip))
796
self.exception_handlers.insert(0,
797
(TestNotApplicable, self._do_not_applicable))
800
super(TestCase, self).setUp()
801
for feature in getattr(self, '_test_needs_features', []):
802
self.requireFeature(feature)
803
self._log_contents = None
804
self.addDetail("log", content.Content(content.ContentType("text",
805
"plain", {"charset": "utf8"}),
806
lambda:[self._get_log(keep_log_file=True)]))
807
self._cleanEnvironment()
810
self._benchcalls = []
811
self._benchtime = None
813
self._track_transports()
815
self._clear_debug_flags()
816
TestCase._active_threads = threading.activeCount()
817
self.addCleanup(self._check_leaked_threads)
822
pdb.Pdb().set_trace(sys._getframe().f_back)
824
def _check_leaked_threads(self):
825
active = threading.activeCount()
826
leaked_threads = active - TestCase._active_threads
827
TestCase._active_threads = active
828
# If some tests make the number of threads *decrease*, we'll consider
829
# that they are just observing old threads dieing, not agressively kill
830
# random threads. So we don't report these tests as leaking. The risk
831
# is that we have false positives that way (the test see 2 threads
832
# going away but leak one) but it seems less likely than the actual
833
# false positives (the test see threads going away and does not leak).
834
if leaked_threads > 0:
835
TestCase._leaking_threads_tests += 1
836
if TestCase._first_thread_leaker_id is None:
837
TestCase._first_thread_leaker_id = self.id()
839
def _clear_debug_flags(self):
840
"""Prevent externally set debug flags affecting tests.
842
Tests that want to use debug flags can just set them in the
843
debug_flags set during setup/teardown.
845
self._preserved_debug_flags = set(debug.debug_flags)
846
if 'allow_debug' not in selftest_debug_flags:
847
debug.debug_flags.clear()
848
if 'disable_lock_checks' not in selftest_debug_flags:
849
debug.debug_flags.add('strict_locks')
850
self.addCleanup(self._restore_debug_flags)
852
def _clear_hooks(self):
853
# prevent hooks affecting tests
854
self._preserved_hooks = {}
855
for key, factory in hooks.known_hooks.items():
856
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
857
current_hooks = hooks.known_hooks_key_to_object(key)
858
self._preserved_hooks[parent] = (name, current_hooks)
859
self.addCleanup(self._restoreHooks)
860
for key, factory in hooks.known_hooks.items():
861
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
862
setattr(parent, name, factory())
863
# this hook should always be installed
864
request._install_hook()
866
def disable_directory_isolation(self):
867
"""Turn off directory isolation checks."""
868
self._directory_isolation = False
870
def enable_directory_isolation(self):
871
"""Enable directory isolation checks."""
872
self._directory_isolation = True
874
def _silenceUI(self):
875
"""Turn off UI for duration of test"""
876
# by default the UI is off; tests can turn it on if they want it.
877
saved = ui.ui_factory
879
ui.ui_factory = saved
880
ui.ui_factory = ui.SilentUIFactory()
881
self.addCleanup(_restore)
883
def _check_locks(self):
884
"""Check that all lock take/release actions have been paired."""
885
# We always check for mismatched locks. If a mismatch is found, we
886
# fail unless -Edisable_lock_checks is supplied to selftest, in which
887
# case we just print a warning.
889
acquired_locks = [lock for action, lock in self._lock_actions
890
if action == 'acquired']
891
released_locks = [lock for action, lock in self._lock_actions
892
if action == 'released']
893
broken_locks = [lock for action, lock in self._lock_actions
894
if action == 'broken']
895
# trivially, given the tests for lock acquistion and release, if we
896
# have as many in each list, it should be ok. Some lock tests also
897
# break some locks on purpose and should be taken into account by
898
# considering that breaking a lock is just a dirty way of releasing it.
899
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
900
message = ('Different number of acquired and '
901
'released or broken locks. (%s, %s + %s)' %
902
(acquired_locks, released_locks, broken_locks))
903
if not self._lock_check_thorough:
904
# Rather than fail, just warn
905
print "Broken test %s: %s" % (self, message)
909
def _track_locks(self):
910
"""Track lock activity during tests."""
911
self._lock_actions = []
912
if 'disable_lock_checks' in selftest_debug_flags:
913
self._lock_check_thorough = False
915
self._lock_check_thorough = True
917
self.addCleanup(self._check_locks)
918
_mod_lock.Lock.hooks.install_named_hook('lock_acquired',
919
self._lock_acquired, None)
920
_mod_lock.Lock.hooks.install_named_hook('lock_released',
921
self._lock_released, None)
922
_mod_lock.Lock.hooks.install_named_hook('lock_broken',
923
self._lock_broken, None)
925
def _lock_acquired(self, result):
926
self._lock_actions.append(('acquired', result))
928
def _lock_released(self, result):
929
self._lock_actions.append(('released', result))
931
def _lock_broken(self, result):
932
self._lock_actions.append(('broken', result))
934
def permit_dir(self, name):
935
"""Permit a directory to be used by this test. See permit_url."""
936
name_transport = get_transport(name)
937
self.permit_url(name)
938
self.permit_url(name_transport.base)
940
def permit_url(self, url):
941
"""Declare that url is an ok url to use in this test.
943
Do this for memory transports, temporary test directory etc.
945
Do not do this for the current working directory, /tmp, or any other
946
preexisting non isolated url.
948
if not url.endswith('/'):
950
self._bzr_selftest_roots.append(url)
952
def permit_source_tree_branch_repo(self):
953
"""Permit the source tree bzr is running from to be opened.
955
Some code such as bzrlib.version attempts to read from the bzr branch
956
that bzr is executing from (if any). This method permits that directory
957
to be used in the test suite.
959
path = self.get_source_path()
960
self.record_directory_isolation()
963
workingtree.WorkingTree.open(path)
964
except (errors.NotBranchError, errors.NoWorkingTree):
967
self.enable_directory_isolation()
969
def _preopen_isolate_transport(self, transport):
970
"""Check that all transport openings are done in the test work area."""
971
while isinstance(transport, pathfilter.PathFilteringTransport):
972
# Unwrap pathfiltered transports
973
transport = transport.server.backing_transport.clone(
974
transport._filter('.'))
976
# ReadonlySmartTCPServer_for_testing decorates the backing transport
977
# urls it is given by prepending readonly+. This is appropriate as the
978
# client shouldn't know that the server is readonly (or not readonly).
979
# We could register all servers twice, with readonly+ prepending, but
980
# that makes for a long list; this is about the same but easier to
982
if url.startswith('readonly+'):
983
url = url[len('readonly+'):]
984
self._preopen_isolate_url(url)
986
def _preopen_isolate_url(self, url):
987
if not self._directory_isolation:
989
if self._directory_isolation == 'record':
990
self._bzr_selftest_roots.append(url)
992
# This prevents all transports, including e.g. sftp ones backed on disk
993
# from working unless they are explicitly granted permission. We then
994
# depend on the code that sets up test transports to check that they are
995
# appropriately isolated and enable their use by calling
996
# self.permit_transport()
997
if not osutils.is_inside_any(self._bzr_selftest_roots, url):
998
raise errors.BzrError("Attempt to escape test isolation: %r %r"
999
% (url, self._bzr_selftest_roots))
1001
def record_directory_isolation(self):
1002
"""Gather accessed directories to permit later access.
1004
This is used for tests that access the branch bzr is running from.
1006
self._directory_isolation = "record"
1008
def start_server(self, transport_server, backing_server=None):
1009
"""Start transport_server for this test.
1011
This starts the server, registers a cleanup for it and permits the
1012
server's urls to be used.
1014
if backing_server is None:
1015
transport_server.setUp()
1017
transport_server.setUp(backing_server)
1018
self.addCleanup(transport_server.tearDown)
1019
# Obtain a real transport because if the server supplies a password, it
1020
# will be hidden from the base on the client side.
1021
t = get_transport(transport_server.get_url())
1022
# Some transport servers effectively chroot the backing transport;
1023
# others like SFTPServer don't - users of the transport can walk up the
1024
# transport to read the entire backing transport. This wouldn't matter
1025
# except that the workdir tests are given - and that they expect the
1026
# server's url to point at - is one directory under the safety net. So
1027
# Branch operations into the transport will attempt to walk up one
1028
# directory. Chrooting all servers would avoid this but also mean that
1029
# we wouldn't be testing directly against non-root urls. Alternatively
1030
# getting the test framework to start the server with a backing server
1031
# at the actual safety net directory would work too, but this then
1032
# means that the self.get_url/self.get_transport methods would need
1033
# to transform all their results. On balance its cleaner to handle it
1034
# here, and permit a higher url when we have one of these transports.
1035
if t.base.endswith('/work/'):
1036
# we have safety net/test root/work
1037
t = t.clone('../..')
1038
elif isinstance(transport_server, server.SmartTCPServer_for_testing):
1039
# The smart server adds a path similar to work, which is traversed
1040
# up from by the client. But the server is chrooted - the actual
1041
# backing transport is not escaped from, and VFS requests to the
1042
# root will error (because they try to escape the chroot).
1044
while t2.base != t.base:
1047
self.permit_url(t.base)
1049
def _track_transports(self):
1050
"""Install checks for transport usage."""
1051
# TestCase has no safe place it can write to.
1052
self._bzr_selftest_roots = []
1053
# Currently the easiest way to be sure that nothing is going on is to
1054
# hook into bzr dir opening. This leaves a small window of error for
1055
# transport tests, but they are well known, and we can improve on this
1057
bzrdir.BzrDir.hooks.install_named_hook("pre_open",
1058
self._preopen_isolate_transport, "Check bzr directories are safe.")
1060
def _ndiff_strings(self, a, b):
1061
"""Return ndiff between two strings containing lines.
1063
A trailing newline is added if missing to make the strings
1065
if b and b[-1] != '\n':
1067
if a and a[-1] != '\n':
1069
difflines = difflib.ndiff(a.splitlines(True),
1071
linejunk=lambda x: False,
1072
charjunk=lambda x: False)
1073
return ''.join(difflines)
1075
def assertEqual(self, a, b, message=''):
1079
except UnicodeError, e:
1080
# If we can't compare without getting a UnicodeError, then
1081
# obviously they are different
1082
mutter('UnicodeError: %s', e)
1085
raise AssertionError("%snot equal:\na = %s\nb = %s\n"
1087
pformat(a), pformat(b)))
1089
assertEquals = assertEqual
1091
def assertEqualDiff(self, a, b, message=None):
1092
"""Assert two texts are equal, if not raise an exception.
1094
This is intended for use with multi-line strings where it can
1095
be hard to find the differences by eye.
1097
# TODO: perhaps override assertEquals to call this for strings?
1101
message = "texts not equal:\n"
1103
message = 'first string is missing a final newline.\n'
1105
message = 'second string is missing a final newline.\n'
1106
raise AssertionError(message +
1107
self._ndiff_strings(a, b))
1109
def assertEqualMode(self, mode, mode_test):
1110
self.assertEqual(mode, mode_test,
1111
'mode mismatch %o != %o' % (mode, mode_test))
1113
def assertEqualStat(self, expected, actual):
1114
"""assert that expected and actual are the same stat result.
1116
:param expected: A stat result.
1117
:param actual: A stat result.
1118
:raises AssertionError: If the expected and actual stat values differ
1119
other than by atime.
1121
self.assertEqual(expected.st_size, actual.st_size,
1122
'st_size did not match')
1123
self.assertEqual(expected.st_mtime, actual.st_mtime,
1124
'st_mtime did not match')
1125
self.assertEqual(expected.st_ctime, actual.st_ctime,
1126
'st_ctime did not match')
1127
if sys.platform != 'win32':
1128
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1129
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1130
# odd. Regardless we shouldn't actually try to assert anything
1131
# about their values
1132
self.assertEqual(expected.st_dev, actual.st_dev,
1133
'st_dev did not match')
1134
self.assertEqual(expected.st_ino, actual.st_ino,
1135
'st_ino did not match')
1136
self.assertEqual(expected.st_mode, actual.st_mode,
1137
'st_mode did not match')
1139
def assertLength(self, length, obj_with_len):
1140
"""Assert that obj_with_len is of length length."""
1141
if len(obj_with_len) != length:
1142
self.fail("Incorrect length: wanted %d, got %d for %r" % (
1143
length, len(obj_with_len), obj_with_len))
1145
def assertLogsError(self, exception_class, func, *args, **kwargs):
1146
"""Assert that func(*args, **kwargs) quietly logs a specific exception.
1148
from bzrlib import trace
1150
orig_log_exception_quietly = trace.log_exception_quietly
1153
orig_log_exception_quietly()
1154
captured.append(sys.exc_info())
1155
trace.log_exception_quietly = capture
1156
func(*args, **kwargs)
1158
trace.log_exception_quietly = orig_log_exception_quietly
1159
self.assertLength(1, captured)
1160
err = captured[0][1]
1161
self.assertIsInstance(err, exception_class)
1164
def assertPositive(self, val):
1165
"""Assert that val is greater than 0."""
1166
self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
1168
def assertNegative(self, val):
1169
"""Assert that val is less than 0."""
1170
self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
1172
def assertStartsWith(self, s, prefix):
1173
if not s.startswith(prefix):
1174
raise AssertionError('string %r does not start with %r' % (s, prefix))
1176
def assertEndsWith(self, s, suffix):
1177
"""Asserts that s ends with suffix."""
1178
if not s.endswith(suffix):
1179
raise AssertionError('string %r does not end with %r' % (s, suffix))
1181
def assertContainsRe(self, haystack, needle_re, flags=0):
1182
"""Assert that a contains something matching a regular expression."""
1183
if not re.search(needle_re, haystack, flags):
1184
if '\n' in haystack or len(haystack) > 60:
1185
# a long string, format it in a more readable way
1186
raise AssertionError(
1187
'pattern "%s" not found in\n"""\\\n%s"""\n'
1188
% (needle_re, haystack))
1190
raise AssertionError('pattern "%s" not found in "%s"'
1191
% (needle_re, haystack))
1193
def assertNotContainsRe(self, haystack, needle_re, flags=0):
1194
"""Assert that a does not match a regular expression"""
1195
if re.search(needle_re, haystack, flags):
1196
raise AssertionError('pattern "%s" found in "%s"'
1197
% (needle_re, haystack))
1199
def assertSubset(self, sublist, superlist):
1200
"""Assert that every entry in sublist is present in superlist."""
1201
missing = set(sublist) - set(superlist)
1202
if len(missing) > 0:
1203
raise AssertionError("value(s) %r not present in container %r" %
1204
(missing, superlist))
1206
def assertListRaises(self, excClass, func, *args, **kwargs):
1207
"""Fail unless excClass is raised when the iterator from func is used.
1209
Many functions can return generators this makes sure
1210
to wrap them in a list() call to make sure the whole generator
1211
is run, and that the proper exception is raised.
1214
list(func(*args, **kwargs))
1218
if getattr(excClass,'__name__', None) is not None:
1219
excName = excClass.__name__
1221
excName = str(excClass)
1222
raise self.failureException, "%s not raised" % excName
1224
def assertRaises(self, excClass, callableObj, *args, **kwargs):
1225
"""Assert that a callable raises a particular exception.
1227
:param excClass: As for the except statement, this may be either an
1228
exception class, or a tuple of classes.
1229
:param callableObj: A callable, will be passed ``*args`` and
1232
Returns the exception so that you can examine it.
1235
callableObj(*args, **kwargs)
1239
if getattr(excClass,'__name__', None) is not None:
1240
excName = excClass.__name__
1243
excName = str(excClass)
1244
raise self.failureException, "%s not raised" % excName
1246
def assertIs(self, left, right, message=None):
1247
if not (left is right):
1248
if message is not None:
1249
raise AssertionError(message)
1251
raise AssertionError("%r is not %r." % (left, right))
1253
def assertIsNot(self, left, right, message=None):
1255
if message is not None:
1256
raise AssertionError(message)
1258
raise AssertionError("%r is %r." % (left, right))
1260
def assertTransportMode(self, transport, path, mode):
1261
"""Fail if a path does not have mode "mode".
1263
If modes are not supported on this transport, the assertion is ignored.
1265
if not transport._can_roundtrip_unix_modebits():
1267
path_stat = transport.stat(path)
1268
actual_mode = stat.S_IMODE(path_stat.st_mode)
1269
self.assertEqual(mode, actual_mode,
1270
'mode of %r incorrect (%s != %s)'
1271
% (path, oct(mode), oct(actual_mode)))
1273
def assertIsSameRealPath(self, path1, path2):
1274
"""Fail if path1 and path2 points to different files"""
1275
self.assertEqual(osutils.realpath(path1),
1276
osutils.realpath(path2),
1277
"apparent paths:\na = %s\nb = %s\n," % (path1, path2))
1279
def assertIsInstance(self, obj, kls, msg=None):
1280
"""Fail if obj is not an instance of kls
1282
:param msg: Supplementary message to show if the assertion fails.
1284
if not isinstance(obj, kls):
1285
m = "%r is an instance of %s rather than %s" % (
1286
obj, obj.__class__, kls)
1291
def assertFileEqual(self, content, path):
1292
"""Fail if path does not contain 'content'."""
1293
self.failUnlessExists(path)
1294
f = file(path, 'rb')
1299
self.assertEqualDiff(content, s)
1301
def failUnlessExists(self, path):
1302
"""Fail unless path or paths, which may be abs or relative, exist."""
1303
if not isinstance(path, basestring):
1305
self.failUnlessExists(p)
1307
self.failUnless(osutils.lexists(path),path+" does not exist")
1309
def failIfExists(self, path):
1310
"""Fail if path or paths, which may be abs or relative, exist."""
1311
if not isinstance(path, basestring):
1313
self.failIfExists(p)
1315
self.failIf(osutils.lexists(path),path+" exists")
1317
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1318
"""A helper for callDeprecated and applyDeprecated.
1320
:param a_callable: A callable to call.
1321
:param args: The positional arguments for the callable
1322
:param kwargs: The keyword arguments for the callable
1323
:return: A tuple (warnings, result). result is the result of calling
1324
a_callable(``*args``, ``**kwargs``).
1327
def capture_warnings(msg, cls=None, stacklevel=None):
1328
# we've hooked into a deprecation specific callpath,
1329
# only deprecations should getting sent via it.
1330
self.assertEqual(cls, DeprecationWarning)
1331
local_warnings.append(msg)
1332
original_warning_method = symbol_versioning.warn
1333
symbol_versioning.set_warning_method(capture_warnings)
1335
result = a_callable(*args, **kwargs)
1337
symbol_versioning.set_warning_method(original_warning_method)
1338
return (local_warnings, result)
1340
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
1341
"""Call a deprecated callable without warning the user.
1343
Note that this only captures warnings raised by symbol_versioning.warn,
1344
not other callers that go direct to the warning module.
1346
To test that a deprecated method raises an error, do something like
1349
self.assertRaises(errors.ReservedId,
1350
self.applyDeprecated,
1351
deprecated_in((1, 5, 0)),
1355
:param deprecation_format: The deprecation format that the callable
1356
should have been deprecated with. This is the same type as the
1357
parameter to deprecated_method/deprecated_function. If the
1358
callable is not deprecated with this format, an assertion error
1360
:param a_callable: A callable to call. This may be a bound method or
1361
a regular function. It will be called with ``*args`` and
1363
:param args: The positional arguments for the callable
1364
:param kwargs: The keyword arguments for the callable
1365
:return: The result of a_callable(``*args``, ``**kwargs``)
1367
call_warnings, result = self._capture_deprecation_warnings(a_callable,
1369
expected_first_warning = symbol_versioning.deprecation_string(
1370
a_callable, deprecation_format)
1371
if len(call_warnings) == 0:
1372
self.fail("No deprecation warning generated by call to %s" %
1374
self.assertEqual(expected_first_warning, call_warnings[0])
1377
def callCatchWarnings(self, fn, *args, **kw):
1378
"""Call a callable that raises python warnings.
1380
The caller's responsible for examining the returned warnings.
1382
If the callable raises an exception, the exception is not
1383
caught and propagates up to the caller. In that case, the list
1384
of warnings is not available.
1386
:returns: ([warning_object, ...], fn_result)
1388
# XXX: This is not perfect, because it completely overrides the
1389
# warnings filters, and some code may depend on suppressing particular
1390
# warnings. It's the easiest way to insulate ourselves from -Werror,
1391
# though. -- Andrew, 20071062
1393
def _catcher(message, category, filename, lineno, file=None, line=None):
1394
# despite the name, 'message' is normally(?) a Warning subclass
1396
wlist.append(message)
1397
saved_showwarning = warnings.showwarning
1398
saved_filters = warnings.filters
1400
warnings.showwarning = _catcher
1401
warnings.filters = []
1402
result = fn(*args, **kw)
1404
warnings.showwarning = saved_showwarning
1405
warnings.filters = saved_filters
1406
return wlist, result
1408
def callDeprecated(self, expected, callable, *args, **kwargs):
1409
"""Assert that a callable is deprecated in a particular way.
1411
This is a very precise test for unusual requirements. The
1412
applyDeprecated helper function is probably more suited for most tests
1413
as it allows you to simply specify the deprecation format being used
1414
and will ensure that that is issued for the function being called.
1416
Note that this only captures warnings raised by symbol_versioning.warn,
1417
not other callers that go direct to the warning module. To catch
1418
general warnings, use callCatchWarnings.
1420
:param expected: a list of the deprecation warnings expected, in order
1421
:param callable: The callable to call
1422
:param args: The positional arguments for the callable
1423
:param kwargs: The keyword arguments for the callable
1425
call_warnings, result = self._capture_deprecation_warnings(callable,
1427
self.assertEqual(expected, call_warnings)
1430
def _startLogFile(self):
1431
"""Send bzr and test log messages to a temporary file.
1433
The file is removed as the test is torn down.
1435
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1436
self._log_file = os.fdopen(fileno, 'w+')
1437
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1438
self._log_file_name = name
1439
self.addCleanup(self._finishLogFile)
1441
def _finishLogFile(self):
1442
"""Finished with the log file.
1444
Close the file and delete it, unless setKeepLogfile was called.
1446
if bzrlib.trace._trace_file:
1447
# flush the log file, to get all content
1448
bzrlib.trace._trace_file.flush()
1449
bzrlib.trace.pop_log_file(self._log_memento)
1450
# Cache the log result and delete the file on disk
1451
self._get_log(False)
1453
def thisFailsStrictLockCheck(self):
1454
"""It is known that this test would fail with -Dstrict_locks.
1456
By default, all tests are run with strict lock checking unless
1457
-Edisable_lock_checks is supplied. However there are some tests which
1458
we know fail strict locks at this point that have not been fixed.
1459
They should call this function to disable the strict checking.
1461
This should be used sparingly, it is much better to fix the locking
1462
issues rather than papering over the problem by calling this function.
1464
debug.debug_flags.discard('strict_locks')
1466
def addCleanup(self, callable, *args, **kwargs):
1467
"""Arrange to run a callable when this case is torn down.
1469
Callables are run in the reverse of the order they are registered,
1470
ie last-in first-out.
1472
self._cleanups.append((callable, args, kwargs))
1474
def _cleanEnvironment(self):
1476
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1477
'HOME': os.getcwd(),
1478
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1479
# tests do check our impls match APPDATA
1480
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1484
'BZREMAIL': None, # may still be present in the environment
1486
'BZR_PROGRESS_BAR': None,
1488
'BZR_PLUGIN_PATH': None,
1489
'BZR_CONCURRENCY': None,
1490
# Make sure that any text ui tests are consistent regardless of
1491
# the environment the test case is run in; you may want tests that
1492
# test other combinations. 'dumb' is a reasonable guess for tests
1493
# going to a pipe or a StringIO.
1497
'BZR_COLUMNS': '80',
1499
'SSH_AUTH_SOCK': None,
1503
'https_proxy': None,
1504
'HTTPS_PROXY': None,
1509
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1510
# least. If you do (care), please update this comment
1514
'BZR_REMOTE_PATH': None,
1517
self.addCleanup(self._restoreEnvironment)
1518
for name, value in new_env.iteritems():
1519
self._captureVar(name, value)
1521
def _captureVar(self, name, newvalue):
1522
"""Set an environment variable, and reset it when finished."""
1523
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1525
def _restore_debug_flags(self):
1526
debug.debug_flags.clear()
1527
debug.debug_flags.update(self._preserved_debug_flags)
1529
def _restoreEnvironment(self):
1530
for name, value in self.__old_env.iteritems():
1531
osutils.set_or_unset_env(name, value)
1533
def _restoreHooks(self):
1534
for klass, (name, hooks) in self._preserved_hooks.items():
1535
setattr(klass, name, hooks)
1537
def knownFailure(self, reason):
1538
"""This test has failed for some known reason."""
1539
raise KnownFailure(reason)
1541
def _do_skip(self, result, reason):
1542
addSkip = getattr(result, 'addSkip', None)
1543
if not callable(addSkip):
1544
result.addSuccess(result)
1546
addSkip(self, reason)
1549
def _do_known_failure(self, result, e):
1550
err = sys.exc_info()
1551
addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1552
if addExpectedFailure is not None:
1553
addExpectedFailure(self, err)
1555
result.addSuccess(self)
1558
def _do_not_applicable(self, result, e):
1560
reason = 'No reason given'
1563
addNotApplicable = getattr(result, 'addNotApplicable', None)
1564
if addNotApplicable is not None:
1565
result.addNotApplicable(self, reason)
1567
self._do_skip(result, reason)
1570
def _do_unsupported_or_skip(self, result, e):
1572
addNotSupported = getattr(result, 'addNotSupported', None)
1573
if addNotSupported is not None:
1574
result.addNotSupported(self, reason)
1576
self._do_skip(result, reason)
1578
def time(self, callable, *args, **kwargs):
1579
"""Run callable and accrue the time it takes to the benchmark time.
1581
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
1582
this will cause lsprofile statistics to be gathered and stored in
1585
if self._benchtime is None:
1586
self.addDetail('benchtime', content.Content(content.ContentType(
1587
"text", "plain"), lambda:[str(self._benchtime)]))
1591
if not self._gather_lsprof_in_benchmarks:
1592
return callable(*args, **kwargs)
1594
# record this benchmark
1595
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
1597
self._benchcalls.append(((callable, args, kwargs), stats))
1600
self._benchtime += time.time() - start
1602
def log(self, *args):
1605
def _get_log(self, keep_log_file=False):
1606
"""Internal helper to get the log from bzrlib.trace for this test.
1608
Please use self.getDetails, or self.get_log to access this in test case
1611
:param keep_log_file: When True, if the log is still a file on disk
1612
leave it as a file on disk. When False, if the log is still a file
1613
on disk, the log file is deleted and the log preserved as
1615
:return: A string containing the log.
1617
if self._log_contents is not None:
1619
self._log_contents.decode('utf8')
1620
except UnicodeDecodeError:
1621
unicodestr = self._log_contents.decode('utf8', 'replace')
1622
self._log_contents = unicodestr.encode('utf8')
1623
return self._log_contents
1625
if bzrlib.trace._trace_file:
1626
# flush the log file, to get all content
1627
bzrlib.trace._trace_file.flush()
1628
if self._log_file_name is not None:
1629
logfile = open(self._log_file_name)
1631
log_contents = logfile.read()
1635
log_contents.decode('utf8')
1636
except UnicodeDecodeError:
1637
unicodestr = log_contents.decode('utf8', 'replace')
1638
log_contents = unicodestr.encode('utf8')
1639
if not keep_log_file:
1640
self._log_file.close()
1641
self._log_file = None
1642
# Permit multiple calls to get_log until we clean it up in
1644
self._log_contents = log_contents
1646
os.remove(self._log_file_name)
1648
if sys.platform == 'win32' and e.errno == errno.EACCES:
1649
sys.stderr.write(('Unable to delete log file '
1650
' %r\n' % self._log_file_name))
1653
self._log_file_name = None
1656
return "No log file content and no log file name."
1659
"""Get a unicode string containing the log from bzrlib.trace.
1661
Undecodable characters are replaced.
1663
return u"".join(self.getDetails()['log'].iter_text())
1665
def requireFeature(self, feature):
1666
"""This test requires a specific feature is available.
1668
:raises UnavailableFeature: When feature is not available.
1670
if not feature.available():
1671
raise UnavailableFeature(feature)
1673
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1675
"""Run bazaar command line, splitting up a string command line."""
1676
if isinstance(args, basestring):
1677
# shlex don't understand unicode strings,
1678
# so args should be plain string (bialix 20070906)
1679
args = list(shlex.split(str(args)))
1680
return self._run_bzr_core(args, retcode=retcode,
1681
encoding=encoding, stdin=stdin, working_dir=working_dir,
1684
def _run_bzr_core(self, args, retcode, encoding, stdin,
1686
# Clear chk_map page cache, because the contents are likely to mask
1688
chk_map.clear_cache()
1689
if encoding is None:
1690
encoding = osutils.get_user_encoding()
1691
stdout = StringIOWrapper()
1692
stderr = StringIOWrapper()
1693
stdout.encoding = encoding
1694
stderr.encoding = encoding
1696
self.log('run bzr: %r', args)
1697
# FIXME: don't call into logging here
1698
handler = logging.StreamHandler(stderr)
1699
handler.setLevel(logging.INFO)
1700
logger = logging.getLogger('')
1701
logger.addHandler(handler)
1702
old_ui_factory = ui.ui_factory
1703
ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
1706
if working_dir is not None:
1707
cwd = osutils.getcwd()
1708
os.chdir(working_dir)
1712
result = self.apply_redirected(ui.ui_factory.stdin,
1714
bzrlib.commands.run_bzr_catch_user_errors,
1716
except KeyboardInterrupt:
1717
# Reraise KeyboardInterrupt with contents of redirected stdout
1718
# and stderr as arguments, for tests which are interested in
1719
# stdout and stderr and are expecting the exception.
1720
out = stdout.getvalue()
1721
err = stderr.getvalue()
1723
self.log('output:\n%r', out)
1725
self.log('errors:\n%r', err)
1726
raise KeyboardInterrupt(out, err)
1728
logger.removeHandler(handler)
1729
ui.ui_factory = old_ui_factory
1733
out = stdout.getvalue()
1734
err = stderr.getvalue()
1736
self.log('output:\n%r', out)
1738
self.log('errors:\n%r', err)
1739
if retcode is not None:
1740
self.assertEquals(retcode, result,
1741
message='Unexpected return code')
1742
return result, out, err
1744
def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
1745
working_dir=None, error_regexes=[], output_encoding=None):
1746
"""Invoke bzr, as if it were run from the command line.
1748
The argument list should not include the bzr program name - the
1749
first argument is normally the bzr command. Arguments may be
1750
passed in three ways:
1752
1- A list of strings, eg ["commit", "a"]. This is recommended
1753
when the command contains whitespace or metacharacters, or
1754
is built up at run time.
1756
2- A single string, eg "add a". This is the most convenient
1757
for hardcoded commands.
1759
This runs bzr through the interface that catches and reports
1760
errors, and with logging set to something approximating the
1761
default, so that error reporting can be checked.
1763
This should be the main method for tests that want to exercise the
1764
overall behavior of the bzr application (rather than a unit test
1765
or a functional test of the library.)
1767
This sends the stdout/stderr results into the test's log,
1768
where it may be useful for debugging. See also run_captured.
1770
:keyword stdin: A string to be used as stdin for the command.
1771
:keyword retcode: The status code the command should return;
1773
:keyword working_dir: The directory to run the command in
1774
:keyword error_regexes: A list of expected error messages. If
1775
specified they must be seen in the error output of the command.
1777
retcode, out, err = self._run_bzr_autosplit(
1782
working_dir=working_dir,
1784
self.assertIsInstance(error_regexes, (list, tuple))
1785
for regex in error_regexes:
1786
self.assertContainsRe(err, regex)
1789
def run_bzr_error(self, error_regexes, *args, **kwargs):
1790
"""Run bzr, and check that stderr contains the supplied regexes
1792
:param error_regexes: Sequence of regular expressions which
1793
must each be found in the error output. The relative ordering
1795
:param args: command-line arguments for bzr
1796
:param kwargs: Keyword arguments which are interpreted by run_bzr
1797
This function changes the default value of retcode to be 3,
1798
since in most cases this is run when you expect bzr to fail.
1800
:return: (out, err) The actual output of running the command (in case
1801
you want to do more inspection)
1805
# Make sure that commit is failing because there is nothing to do
1806
self.run_bzr_error(['no changes to commit'],
1807
['commit', '-m', 'my commit comment'])
1808
# Make sure --strict is handling an unknown file, rather than
1809
# giving us the 'nothing to do' error
1810
self.build_tree(['unknown'])
1811
self.run_bzr_error(['Commit refused because there are unknown files'],
1812
['commit', --strict', '-m', 'my commit comment'])
1814
kwargs.setdefault('retcode', 3)
1815
kwargs['error_regexes'] = error_regexes
1816
out, err = self.run_bzr(*args, **kwargs)
1819
def run_bzr_subprocess(self, *args, **kwargs):
1820
"""Run bzr in a subprocess for testing.
1822
This starts a new Python interpreter and runs bzr in there.
1823
This should only be used for tests that have a justifiable need for
1824
this isolation: e.g. they are testing startup time, or signal
1825
handling, or early startup code, etc. Subprocess code can't be
1826
profiled or debugged so easily.
1828
:keyword retcode: The status code that is expected. Defaults to 0. If
1829
None is supplied, the status code is not checked.
1830
:keyword env_changes: A dictionary which lists changes to environment
1831
variables. A value of None will unset the env variable.
1832
The values must be strings. The change will only occur in the
1833
child, so you don't need to fix the environment after running.
1834
:keyword universal_newlines: Convert CRLF => LF
1835
:keyword allow_plugins: By default the subprocess is run with
1836
--no-plugins to ensure test reproducibility. Also, it is possible
1837
for system-wide plugins to create unexpected output on stderr,
1838
which can cause unnecessary test failures.
1840
env_changes = kwargs.get('env_changes', {})
1841
working_dir = kwargs.get('working_dir', None)
1842
allow_plugins = kwargs.get('allow_plugins', False)
1844
if isinstance(args[0], list):
1846
elif isinstance(args[0], basestring):
1847
args = list(shlex.split(args[0]))
1849
raise ValueError("passing varargs to run_bzr_subprocess")
1850
process = self.start_bzr_subprocess(args, env_changes=env_changes,
1851
working_dir=working_dir,
1852
allow_plugins=allow_plugins)
1853
# We distinguish between retcode=None and retcode not passed.
1854
supplied_retcode = kwargs.get('retcode', 0)
1855
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
1856
universal_newlines=kwargs.get('universal_newlines', False),
1859
def start_bzr_subprocess(self, process_args, env_changes=None,
1860
skip_if_plan_to_signal=False,
1862
allow_plugins=False):
1863
"""Start bzr in a subprocess for testing.
1865
This starts a new Python interpreter and runs bzr in there.
1866
This should only be used for tests that have a justifiable need for
1867
this isolation: e.g. they are testing startup time, or signal
1868
handling, or early startup code, etc. Subprocess code can't be
1869
profiled or debugged so easily.
1871
:param process_args: a list of arguments to pass to the bzr executable,
1872
for example ``['--version']``.
1873
:param env_changes: A dictionary which lists changes to environment
1874
variables. A value of None will unset the env variable.
1875
The values must be strings. The change will only occur in the
1876
child, so you don't need to fix the environment after running.
1877
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1879
:param allow_plugins: If False (default) pass --no-plugins to bzr.
1881
:returns: Popen object for the started process.
1883
if skip_if_plan_to_signal:
1884
if not getattr(os, 'kill', None):
1885
raise TestSkipped("os.kill not available.")
1887
if env_changes is None:
1891
def cleanup_environment():
1892
for env_var, value in env_changes.iteritems():
1893
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
1895
def restore_environment():
1896
for env_var, value in old_env.iteritems():
1897
osutils.set_or_unset_env(env_var, value)
1899
bzr_path = self.get_bzr_path()
1902
if working_dir is not None:
1903
cwd = osutils.getcwd()
1904
os.chdir(working_dir)
1907
# win32 subprocess doesn't support preexec_fn
1908
# so we will avoid using it on all platforms, just to
1909
# make sure the code path is used, and we don't break on win32
1910
cleanup_environment()
1911
command = [sys.executable]
1912
# frozen executables don't need the path to bzr
1913
if getattr(sys, "frozen", None) is None:
1914
command.append(bzr_path)
1915
if not allow_plugins:
1916
command.append('--no-plugins')
1917
command.extend(process_args)
1918
process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1920
restore_environment()
1926
def _popen(self, *args, **kwargs):
1927
"""Place a call to Popen.
1929
Allows tests to override this method to intercept the calls made to
1930
Popen for introspection.
1932
return Popen(*args, **kwargs)
1934
def get_source_path(self):
1935
"""Return the path of the directory containing bzrlib."""
1936
return os.path.dirname(os.path.dirname(bzrlib.__file__))
1938
def get_bzr_path(self):
1939
"""Return the path of the 'bzr' executable for this test suite."""
1940
bzr_path = self.get_source_path()+'/bzr'
1941
if not os.path.isfile(bzr_path):
1942
# We are probably installed. Assume sys.argv is the right file
1943
bzr_path = sys.argv[0]
1946
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
1947
universal_newlines=False, process_args=None):
1948
"""Finish the execution of process.
1950
:param process: the Popen object returned from start_bzr_subprocess.
1951
:param retcode: The status code that is expected. Defaults to 0. If
1952
None is supplied, the status code is not checked.
1953
:param send_signal: an optional signal to send to the process.
1954
:param universal_newlines: Convert CRLF => LF
1955
:returns: (stdout, stderr)
1957
if send_signal is not None:
1958
os.kill(process.pid, send_signal)
1959
out, err = process.communicate()
1961
if universal_newlines:
1962
out = out.replace('\r\n', '\n')
1963
err = err.replace('\r\n', '\n')
1965
if retcode is not None and retcode != process.returncode:
1966
if process_args is None:
1967
process_args = "(unknown args)"
1968
mutter('Output of bzr %s:\n%s', process_args, out)
1969
mutter('Error for bzr %s:\n%s', process_args, err)
1970
self.fail('Command bzr %s failed with retcode %s != %s'
1971
% (process_args, retcode, process.returncode))
1974
def check_inventory_shape(self, inv, shape):
1975
"""Compare an inventory to a list of expected names.
1977
Fail if they are not precisely equal.
1980
shape = list(shape) # copy
1981
for path, ie in inv.entries():
1982
name = path.replace('\\', '/')
1983
if ie.kind == 'directory':
1990
self.fail("expected paths not found in inventory: %r" % shape)
1992
self.fail("unexpected paths found in inventory: %r" % extras)
1994
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
1995
a_callable=None, *args, **kwargs):
1996
"""Call callable with redirected std io pipes.
1998
Returns the return code."""
1999
if not callable(a_callable):
2000
raise ValueError("a_callable must be callable.")
2002
stdin = StringIO("")
2004
if getattr(self, "_log_file", None) is not None:
2005
stdout = self._log_file
2009
if getattr(self, "_log_file", None is not None):
2010
stderr = self._log_file
2013
real_stdin = sys.stdin
2014
real_stdout = sys.stdout
2015
real_stderr = sys.stderr
2020
return a_callable(*args, **kwargs)
2022
sys.stdout = real_stdout
2023
sys.stderr = real_stderr
2024
sys.stdin = real_stdin
2026
def reduceLockdirTimeout(self):
2027
"""Reduce the default lock timeout for the duration of the test, so that
2028
if LockContention occurs during a test, it does so quickly.
2030
Tests that expect to provoke LockContention errors should call this.
2032
orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
2034
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
2035
self.addCleanup(resetTimeout)
2036
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
2038
def make_utf8_encoded_stringio(self, encoding_type=None):
2039
"""Return a StringIOWrapper instance, that will encode Unicode
2042
if encoding_type is None:
2043
encoding_type = 'strict'
2045
output_encoding = 'utf-8'
2046
sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
2047
sio.encoding = output_encoding
2050
def disable_verb(self, verb):
2051
"""Disable a smart server verb for one test."""
2052
from bzrlib.smart import request
2053
request_handlers = request.request_handlers
2054
orig_method = request_handlers.get(verb)
2055
request_handlers.remove(verb)
2057
request_handlers.register(verb, orig_method)
2058
self.addCleanup(restoreVerb)
2061
class CapturedCall(object):
2062
"""A helper for capturing smart server calls for easy debug analysis."""
2064
def __init__(self, params, prefix_length):
2065
"""Capture the call with params and skip prefix_length stack frames."""
2068
# The last 5 frames are the __init__, the hook frame, and 3 smart
2069
# client frames. Beyond this we could get more clever, but this is good
2071
stack = traceback.extract_stack()[prefix_length:-5]
2072
self.stack = ''.join(traceback.format_list(stack))
2075
return self.call.method
2078
return self.call.method
2084
class TestCaseWithMemoryTransport(TestCase):
2085
"""Common test class for tests that do not need disk resources.
2087
Tests that need disk resources should derive from TestCaseWithTransport.
2089
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2091
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2092
a directory which does not exist. This serves to help ensure test isolation
2093
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2094
must exist. However, TestCaseWithMemoryTransport does not offer local
2095
file defaults for the transport in tests, nor does it obey the command line
2096
override, so tests that accidentally write to the common directory should
2099
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2100
a .bzr directory that stops us ascending higher into the filesystem.
2106
def __init__(self, methodName='runTest'):
2107
# allow test parameterization after test construction and before test
2108
# execution. Variables that the parameterizer sets need to be
2109
# ones that are not set by setUp, or setUp will trash them.
2110
super(TestCaseWithMemoryTransport, self).__init__(methodName)
2111
self.vfs_transport_factory = default_transport
2112
self.transport_server = None
2113
self.transport_readonly_server = None
2114
self.__vfs_server = None
2116
def get_transport(self, relpath=None):
2117
"""Return a writeable transport.
2119
This transport is for the test scratch space relative to
2122
:param relpath: a path relative to the base url.
2124
t = get_transport(self.get_url(relpath))
2125
self.assertFalse(t.is_readonly())
2128
def get_readonly_transport(self, relpath=None):
2129
"""Return a readonly transport for the test scratch space
2131
This can be used to test that operations which should only need
2132
readonly access in fact do not try to write.
2134
:param relpath: a path relative to the base url.
2136
t = get_transport(self.get_readonly_url(relpath))
2137
self.assertTrue(t.is_readonly())
2140
def create_transport_readonly_server(self):
2141
"""Create a transport server from class defined at init.
2143
This is mostly a hook for daughter classes.
2145
return self.transport_readonly_server()
2147
def get_readonly_server(self):
2148
"""Get the server instance for the readonly transport
2150
This is useful for some tests with specific servers to do diagnostics.
2152
if self.__readonly_server is None:
2153
if self.transport_readonly_server is None:
2154
# readonly decorator requested
2155
self.__readonly_server = ReadonlyServer()
2157
# explicit readonly transport.
2158
self.__readonly_server = self.create_transport_readonly_server()
2159
self.start_server(self.__readonly_server,
2160
self.get_vfs_only_server())
2161
return self.__readonly_server
2163
def get_readonly_url(self, relpath=None):
2164
"""Get a URL for the readonly transport.
2166
This will either be backed by '.' or a decorator to the transport
2167
used by self.get_url()
2168
relpath provides for clients to get a path relative to the base url.
2169
These should only be downwards relative, not upwards.
2171
base = self.get_readonly_server().get_url()
2172
return self._adjust_url(base, relpath)
2174
def get_vfs_only_server(self):
2175
"""Get the vfs only read/write server instance.
2177
This is useful for some tests with specific servers that need
2180
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
2181
is no means to override it.
2183
if self.__vfs_server is None:
2184
self.__vfs_server = MemoryServer()
2185
self.start_server(self.__vfs_server)
2186
return self.__vfs_server
2188
def get_server(self):
2189
"""Get the read/write server instance.
2191
This is useful for some tests with specific servers that need
2194
This is built from the self.transport_server factory. If that is None,
2195
then the self.get_vfs_server is returned.
2197
if self.__server is None:
2198
if (self.transport_server is None or self.transport_server is
2199
self.vfs_transport_factory):
2200
self.__server = self.get_vfs_only_server()
2202
# bring up a decorated means of access to the vfs only server.
2203
self.__server = self.transport_server()
2204
self.start_server(self.__server, self.get_vfs_only_server())
2205
return self.__server
2207
def _adjust_url(self, base, relpath):
2208
"""Get a URL (or maybe a path) for the readwrite transport.
2210
This will either be backed by '.' or to an equivalent non-file based
2212
relpath provides for clients to get a path relative to the base url.
2213
These should only be downwards relative, not upwards.
2215
if relpath is not None and relpath != '.':
2216
if not base.endswith('/'):
2218
# XXX: Really base should be a url; we did after all call
2219
# get_url()! But sometimes it's just a path (from
2220
# LocalAbspathServer), and it'd be wrong to append urlescaped data
2221
# to a non-escaped local path.
2222
if base.startswith('./') or base.startswith('/'):
2225
base += urlutils.escape(relpath)
2228
def get_url(self, relpath=None):
2229
"""Get a URL (or maybe a path) for the readwrite transport.
2231
This will either be backed by '.' or to an equivalent non-file based
2233
relpath provides for clients to get a path relative to the base url.
2234
These should only be downwards relative, not upwards.
2236
base = self.get_server().get_url()
2237
return self._adjust_url(base, relpath)
2239
def get_vfs_only_url(self, relpath=None):
2240
"""Get a URL (or maybe a path for the plain old vfs transport.
2242
This will never be a smart protocol. It always has all the
2243
capabilities of the local filesystem, but it might actually be a
2244
MemoryTransport or some other similar virtual filesystem.
2246
This is the backing transport (if any) of the server returned by
2247
get_url and get_readonly_url.
2249
:param relpath: provides for clients to get a path relative to the base
2250
url. These should only be downwards relative, not upwards.
2253
base = self.get_vfs_only_server().get_url()
2254
return self._adjust_url(base, relpath)
2256
def _create_safety_net(self):
2257
"""Make a fake bzr directory.
2259
This prevents any tests propagating up onto the TEST_ROOT directory's
2262
root = TestCaseWithMemoryTransport.TEST_ROOT
2263
bzrdir.BzrDir.create_standalone_workingtree(root)
2265
def _check_safety_net(self):
2266
"""Check that the safety .bzr directory have not been touched.
2268
_make_test_root have created a .bzr directory to prevent tests from
2269
propagating. This method ensures than a test did not leaked.
2271
root = TestCaseWithMemoryTransport.TEST_ROOT
2272
self.permit_url(get_transport(root).base)
2273
wt = workingtree.WorkingTree.open(root)
2274
last_rev = wt.last_revision()
2275
if last_rev != 'null:':
2276
# The current test have modified the /bzr directory, we need to
2277
# recreate a new one or all the followng tests will fail.
2278
# If you need to inspect its content uncomment the following line
2279
# import pdb; pdb.set_trace()
2280
_rmtree_temp_dir(root + '/.bzr', test_id=self.id())
2281
self._create_safety_net()
2282
raise AssertionError('%s/.bzr should not be modified' % root)
2284
def _make_test_root(self):
2285
if TestCaseWithMemoryTransport.TEST_ROOT is None:
2286
# Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
2287
root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
2289
TestCaseWithMemoryTransport.TEST_ROOT = root
2291
self._create_safety_net()
2293
# The same directory is used by all tests, and we're not
2294
# specifically told when all tests are finished. This will do.
2295
atexit.register(_rmtree_temp_dir, root)
2297
self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
2298
self.addCleanup(self._check_safety_net)
2300
def makeAndChdirToTestDir(self):
2301
"""Create a temporary directories for this one test.
2303
This must set self.test_home_dir and self.test_dir and chdir to
2306
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
2308
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2309
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
2310
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2311
self.permit_dir(self.test_dir)
2313
def make_branch(self, relpath, format=None):
2314
"""Create a branch on the transport at relpath."""
2315
repo = self.make_repository(relpath, format=format)
2316
return repo.bzrdir.create_branch()
2318
def make_bzrdir(self, relpath, format=None):
2320
# might be a relative or absolute path
2321
maybe_a_url = self.get_url(relpath)
2322
segments = maybe_a_url.rsplit('/', 1)
2323
t = get_transport(maybe_a_url)
2324
if len(segments) > 1 and segments[-1] not in ('', '.'):
2328
if isinstance(format, basestring):
2329
format = bzrdir.format_registry.make_bzrdir(format)
2330
return format.initialize_on_transport(t)
2331
except errors.UninitializableFormat:
2332
raise TestSkipped("Format %s is not initializable." % format)
2334
def make_repository(self, relpath, shared=False, format=None):
2335
"""Create a repository on our default transport at relpath.
2337
Note that relpath must be a relative path, not a full url.
2339
# FIXME: If you create a remoterepository this returns the underlying
2340
# real format, which is incorrect. Actually we should make sure that
2341
# RemoteBzrDir returns a RemoteRepository.
2342
# maybe mbp 20070410
2343
made_control = self.make_bzrdir(relpath, format=format)
2344
return made_control.create_repository(shared=shared)
2346
def make_smart_server(self, path):
2347
smart_server = server.SmartTCPServer_for_testing()
2348
self.start_server(smart_server, self.get_server())
2349
remote_transport = get_transport(smart_server.get_url()).clone(path)
2350
return remote_transport
2352
def make_branch_and_memory_tree(self, relpath, format=None):
2353
"""Create a branch on the default transport and a MemoryTree for it."""
2354
b = self.make_branch(relpath, format=format)
2355
return memorytree.MemoryTree.create_on_branch(b)
2357
def make_branch_builder(self, relpath, format=None):
2358
branch = self.make_branch(relpath, format=format)
2359
return branchbuilder.BranchBuilder(branch=branch)
2361
def overrideEnvironmentForTesting(self):
2362
test_home_dir = self.test_home_dir
2363
if isinstance(test_home_dir, unicode):
2364
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2365
os.environ['HOME'] = test_home_dir
2366
os.environ['BZR_HOME'] = test_home_dir
2369
super(TestCaseWithMemoryTransport, self).setUp()
2370
self._make_test_root()
2371
_currentdir = os.getcwdu()
2372
def _leaveDirectory():
2373
os.chdir(_currentdir)
2374
self.addCleanup(_leaveDirectory)
2375
self.makeAndChdirToTestDir()
2376
self.overrideEnvironmentForTesting()
2377
self.__readonly_server = None
2378
self.__server = None
2379
self.reduceLockdirTimeout()
2381
def setup_smart_server_with_call_log(self):
2382
"""Sets up a smart server as the transport server with a call log."""
2383
self.transport_server = server.SmartTCPServer_for_testing
2384
self.hpss_calls = []
2386
# Skip the current stack down to the caller of
2387
# setup_smart_server_with_call_log
2388
prefix_length = len(traceback.extract_stack()) - 2
2389
def capture_hpss_call(params):
2390
self.hpss_calls.append(
2391
CapturedCall(params, prefix_length))
2392
client._SmartClient.hooks.install_named_hook(
2393
'call', capture_hpss_call, None)
2395
def reset_smart_call_log(self):
2396
self.hpss_calls = []
2399
class TestCaseInTempDir(TestCaseWithMemoryTransport):
2400
"""Derived class that runs a test within a temporary directory.
2402
This is useful for tests that need to create a branch, etc.
2404
The directory is created in a slightly complex way: for each
2405
Python invocation, a new temporary top-level directory is created.
2406
All test cases create their own directory within that. If the
2407
tests complete successfully, the directory is removed.
2409
:ivar test_base_dir: The path of the top-level directory for this
2410
test, which contains a home directory and a work directory.
2412
:ivar test_home_dir: An initially empty directory under test_base_dir
2413
which is used as $HOME for this test.
2415
:ivar test_dir: A directory under test_base_dir used as the current
2416
directory when the test proper is run.
2419
OVERRIDE_PYTHON = 'python'
2421
def check_file_contents(self, filename, expect):
2422
self.log("check contents of file %s" % filename)
2423
contents = file(filename, 'r').read()
2424
if contents != expect:
2425
self.log("expected: %r" % expect)
2426
self.log("actually: %r" % contents)
2427
self.fail("contents of %s not as expected" % filename)
2429
def _getTestDirPrefix(self):
2430
# create a directory within the top level test directory
2431
if sys.platform in ('win32', 'cygwin'):
2432
name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
2433
# windows is likely to have path-length limits so use a short name
2434
name_prefix = name_prefix[-30:]
2436
name_prefix = re.sub('[/]', '_', self.id())
2439
def makeAndChdirToTestDir(self):
2440
"""See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
2442
For TestCaseInTempDir we create a temporary directory based on the test
2443
name and then create two subdirs - test and home under it.
2445
name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
2446
self._getTestDirPrefix())
2448
for i in range(100):
2449
if os.path.exists(name):
2450
name = name_prefix + '_' + str(i)
2452
# now create test and home directories within this dir
2453
self.test_base_dir = name
2454
self.addCleanup(self.deleteTestDir)
2455
os.mkdir(self.test_base_dir)
2457
self.permit_dir(self.test_base_dir)
2458
# 'sprouting' and 'init' of a branch both walk up the tree to find
2459
# stacking policy to honour; create a bzr dir with an unshared
2460
# repository (but not a branch - our code would be trying to escape
2461
# then!) to stop them, and permit it to be read.
2462
# control = bzrdir.BzrDir.create(self.test_base_dir)
2463
# control.create_repository()
2464
self.test_home_dir = self.test_base_dir + '/home'
2465
os.mkdir(self.test_home_dir)
2466
self.test_dir = self.test_base_dir + '/work'
2467
os.mkdir(self.test_dir)
2468
os.chdir(self.test_dir)
2469
# put name of test inside
2470
f = file(self.test_base_dir + '/name', 'w')
2476
def deleteTestDir(self):
2477
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2478
_rmtree_temp_dir(self.test_base_dir, test_id=self.id())
2480
def build_tree(self, shape, line_endings='binary', transport=None):
2481
"""Build a test tree according to a pattern.
2483
shape is a sequence of file specifications. If the final
2484
character is '/', a directory is created.
2486
This assumes that all the elements in the tree being built are new.
2488
This doesn't add anything to a branch.
2490
:type shape: list or tuple.
2491
:param line_endings: Either 'binary' or 'native'
2492
in binary mode, exact contents are written in native mode, the
2493
line endings match the default platform endings.
2494
:param transport: A transport to write to, for building trees on VFS's.
2495
If the transport is readonly or None, "." is opened automatically.
2498
if type(shape) not in (list, tuple):
2499
raise AssertionError("Parameter 'shape' should be "
2500
"a list or a tuple. Got %r instead" % (shape,))
2501
# It's OK to just create them using forward slashes on windows.
2502
if transport is None or transport.is_readonly():
2503
transport = get_transport(".")
2505
self.assertIsInstance(name, basestring)
2507
transport.mkdir(urlutils.escape(name[:-1]))
2509
if line_endings == 'binary':
2511
elif line_endings == 'native':
2514
raise errors.BzrError(
2515
'Invalid line ending request %r' % line_endings)
2516
content = "contents of %s%s" % (name.encode('utf-8'), end)
2517
transport.put_bytes_non_atomic(urlutils.escape(name), content)
2519
def build_tree_contents(self, shape):
2520
build_tree_contents(shape)
2522
def assertInWorkingTree(self, path, root_path='.', tree=None):
2523
"""Assert whether path or paths are in the WorkingTree"""
2525
tree = workingtree.WorkingTree.open(root_path)
2526
if not isinstance(path, basestring):
2528
self.assertInWorkingTree(p, tree=tree)
2530
self.assertIsNot(tree.path2id(path), None,
2531
path+' not in working tree.')
2533
def assertNotInWorkingTree(self, path, root_path='.', tree=None):
2534
"""Assert whether path or paths are not in the WorkingTree"""
2536
tree = workingtree.WorkingTree.open(root_path)
2537
if not isinstance(path, basestring):
2539
self.assertNotInWorkingTree(p,tree=tree)
2541
self.assertIs(tree.path2id(path), None, path+' in working tree.')
2544
class TestCaseWithTransport(TestCaseInTempDir):
2545
"""A test case that provides get_url and get_readonly_url facilities.
2547
These back onto two transport servers, one for readonly access and one for
2550
If no explicit class is provided for readonly access, a
2551
ReadonlyTransportDecorator is used instead which allows the use of non disk
2552
based read write transports.
2554
If an explicit class is provided for readonly access, that server and the
2555
readwrite one must both define get_url() as resolving to os.getcwd().
2558
def get_vfs_only_server(self):
2559
"""See TestCaseWithMemoryTransport.
2561
This is useful for some tests with specific servers that need
2564
if self.__vfs_server is None:
2565
self.__vfs_server = self.vfs_transport_factory()
2566
self.start_server(self.__vfs_server)
2567
return self.__vfs_server
2569
def make_branch_and_tree(self, relpath, format=None):
2570
"""Create a branch on the transport and a tree locally.
2572
If the transport is not a LocalTransport, the Tree can't be created on
2573
the transport. In that case if the vfs_transport_factory is
2574
LocalURLServer the working tree is created in the local
2575
directory backing the transport, and the returned tree's branch and
2576
repository will also be accessed locally. Otherwise a lightweight
2577
checkout is created and returned.
2579
We do this because we can't physically create a tree in the local
2580
path, with a branch reference to the transport_factory url, and
2581
a branch + repository in the vfs_transport, unless the vfs_transport
2582
namespace is distinct from the local disk - the two branch objects
2583
would collide. While we could construct a tree with its branch object
2584
pointing at the transport_factory transport in memory, reopening it
2585
would behaving unexpectedly, and has in the past caused testing bugs
2586
when we tried to do it that way.
2588
:param format: The BzrDirFormat.
2589
:returns: the WorkingTree.
2591
# TODO: always use the local disk path for the working tree,
2592
# this obviously requires a format that supports branch references
2593
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2595
b = self.make_branch(relpath, format=format)
2597
return b.bzrdir.create_workingtree()
2598
except errors.NotLocalUrl:
2599
# We can only make working trees locally at the moment. If the
2600
# transport can't support them, then we keep the non-disk-backed
2601
# branch and create a local checkout.
2602
if self.vfs_transport_factory is LocalURLServer:
2603
# the branch is colocated on disk, we cannot create a checkout.
2604
# hopefully callers will expect this.
2605
local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2606
wt = local_controldir.create_workingtree()
2607
if wt.branch._format != b._format:
2609
# Make sure that assigning to wt._branch fixes wt.branch,
2610
# in case the implementation details of workingtree objects
2612
self.assertIs(b, wt.branch)
2615
return b.create_checkout(relpath, lightweight=True)
2617
def assertIsDirectory(self, relpath, transport):
2618
"""Assert that relpath within transport is a directory.
2620
This may not be possible on all transports; in that case it propagates
2621
a TransportNotPossible.
2624
mode = transport.stat(relpath).st_mode
2625
except errors.NoSuchFile:
2626
self.fail("path %s is not a directory; no such file"
2628
if not stat.S_ISDIR(mode):
2629
self.fail("path %s is not a directory; has mode %#o"
2632
def assertTreesEqual(self, left, right):
2633
"""Check that left and right have the same content and properties."""
2634
# we use a tree delta to check for equality of the content, and we
2635
# manually check for equality of other things such as the parents list.
2636
self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
2637
differences = left.changes_from(right)
2638
self.assertFalse(differences.has_changed(),
2639
"Trees %r and %r are different: %r" % (left, right, differences))
2642
super(TestCaseWithTransport, self).setUp()
2643
self.__vfs_server = None
2645
def disable_missing_extensions_warning(self):
2646
"""Some tests expect a precise stderr content.
2648
There is no point in forcing them to duplicate the extension related
2651
config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
2654
class ChrootedTestCase(TestCaseWithTransport):
2655
"""A support class that provides readonly urls outside the local namespace.
2657
This is done by checking if self.transport_server is a MemoryServer. if it
2658
is then we are chrooted already, if it is not then an HttpServer is used
2661
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
2662
be used without needed to redo it when a different
2663
subclass is in use ?
2667
super(ChrootedTestCase, self).setUp()
2668
if not self.vfs_transport_factory == MemoryServer:
2669
self.transport_readonly_server = HttpServer
2672
def condition_id_re(pattern):
2673
"""Create a condition filter which performs a re check on a test's id.
2675
:param pattern: A regular expression string.
2676
:return: A callable that returns True if the re matches.
2678
filter_re = osutils.re_compile_checked(pattern, 0,
2680
def condition(test):
2682
return filter_re.search(test_id)
2686
def condition_isinstance(klass_or_klass_list):
2687
"""Create a condition filter which returns isinstance(param, klass).
2689
:return: A callable which when called with one parameter obj return the
2690
result of isinstance(obj, klass_or_klass_list).
2693
return isinstance(obj, klass_or_klass_list)
2697
def condition_id_in_list(id_list):
2698
"""Create a condition filter which verify that test's id in a list.
2700
:param id_list: A TestIdList object.
2701
:return: A callable that returns True if the test's id appears in the list.
2703
def condition(test):
2704
return id_list.includes(test.id())
2708
def condition_id_startswith(starts):
2709
"""Create a condition filter verifying that test's id starts with a string.
2711
:param starts: A list of string.
2712
:return: A callable that returns True if the test's id starts with one of
2715
def condition(test):
2716
for start in starts:
2717
if test.id().startswith(start):
2723
def exclude_tests_by_condition(suite, condition):
2724
"""Create a test suite which excludes some tests from suite.
2726
:param suite: The suite to get tests from.
2727
:param condition: A callable whose result evaluates True when called with a
2728
test case which should be excluded from the result.
2729
:return: A suite which contains the tests found in suite that fail
2733
for test in iter_suite_tests(suite):
2734
if not condition(test):
2736
return TestUtil.TestSuite(result)
2739
def filter_suite_by_condition(suite, condition):
2740
"""Create a test suite by filtering another one.
2742
:param suite: The source suite.
2743
:param condition: A callable whose result evaluates True when called with a
2744
test case which should be included in the result.
2745
:return: A suite which contains the tests found in suite that pass
2749
for test in iter_suite_tests(suite):
2752
return TestUtil.TestSuite(result)
2755
def filter_suite_by_re(suite, pattern):
2756
"""Create a test suite by filtering another one.
2758
:param suite: the source suite
2759
:param pattern: pattern that names must match
2760
:returns: the newly created suite
2762
condition = condition_id_re(pattern)
2763
result_suite = filter_suite_by_condition(suite, condition)
2767
def filter_suite_by_id_list(suite, test_id_list):
2768
"""Create a test suite by filtering another one.
2770
:param suite: The source suite.
2771
:param test_id_list: A list of the test ids to keep as strings.
2772
:returns: the newly created suite
2774
condition = condition_id_in_list(test_id_list)
2775
result_suite = filter_suite_by_condition(suite, condition)
2779
def filter_suite_by_id_startswith(suite, start):
2780
"""Create a test suite by filtering another one.
2782
:param suite: The source suite.
2783
:param start: A list of string the test id must start with one of.
2784
:returns: the newly created suite
2786
condition = condition_id_startswith(start)
2787
result_suite = filter_suite_by_condition(suite, condition)
2791
def exclude_tests_by_re(suite, pattern):
2792
"""Create a test suite which excludes some tests from suite.
2794
:param suite: The suite to get tests from.
2795
:param pattern: A regular expression string. Test ids that match this
2796
pattern will be excluded from the result.
2797
:return: A TestSuite that contains all the tests from suite without the
2798
tests that matched pattern. The order of tests is the same as it was in
2801
return exclude_tests_by_condition(suite, condition_id_re(pattern))
2804
def preserve_input(something):
2805
"""A helper for performing test suite transformation chains.
2807
:param something: Anything you want to preserve.
2813
def randomize_suite(suite):
2814
"""Return a new TestSuite with suite's tests in random order.
2816
The tests in the input suite are flattened into a single suite in order to
2817
accomplish this. Any nested TestSuites are removed to provide global
2820
tests = list(iter_suite_tests(suite))
2821
random.shuffle(tests)
2822
return TestUtil.TestSuite(tests)
2825
def split_suite_by_condition(suite, condition):
2826
"""Split a test suite into two by a condition.
2828
:param suite: The suite to split.
2829
:param condition: The condition to match on. Tests that match this
2830
condition are returned in the first test suite, ones that do not match
2831
are in the second suite.
2832
:return: A tuple of two test suites, where the first contains tests from
2833
suite matching the condition, and the second contains the remainder
2834
from suite. The order within each output suite is the same as it was in
2839
for test in iter_suite_tests(suite):
2841
matched.append(test)
2843
did_not_match.append(test)
2844
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2847
def split_suite_by_re(suite, pattern):
2848
"""Split a test suite into two by a regular expression.
2850
:param suite: The suite to split.
2851
:param pattern: A regular expression string. Test ids that match this
2852
pattern will be in the first test suite returned, and the others in the
2853
second test suite returned.
2854
:return: A tuple of two test suites, where the first contains tests from
2855
suite matching pattern, and the second contains the remainder from
2856
suite. The order within each output suite is the same as it was in
2859
return split_suite_by_condition(suite, condition_id_re(pattern))
2862
def run_suite(suite, name='test', verbose=False, pattern=".*",
2863
stop_on_failure=False,
2864
transport=None, lsprof_timed=None, bench_history=None,
2865
matching_tests_first=None,
2868
exclude_pattern=None,
2871
suite_decorators=None,
2873
result_decorators=None,
2875
"""Run a test suite for bzr selftest.
2877
:param runner_class: The class of runner to use. Must support the
2878
constructor arguments passed by run_suite which are more than standard
2880
:return: A boolean indicating success.
2882
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
2887
if runner_class is None:
2888
runner_class = TextTestRunner
2891
runner = runner_class(stream=stream,
2893
verbosity=verbosity,
2894
bench_history=bench_history,
2896
result_decorators=result_decorators,
2898
runner.stop_on_failure=stop_on_failure
2899
# built in decorator factories:
2901
random_order(random_seed, runner),
2902
exclude_tests(exclude_pattern),
2904
if matching_tests_first:
2905
decorators.append(tests_first(pattern))
2907
decorators.append(filter_tests(pattern))
2908
if suite_decorators:
2909
decorators.extend(suite_decorators)
2910
# tell the result object how many tests will be running: (except if
2911
# --parallel=fork is being used. Robert said he will provide a better
2912
# progress design later -- vila 20090817)
2913
if fork_decorator not in decorators:
2914
decorators.append(CountingDecorator)
2915
for decorator in decorators:
2916
suite = decorator(suite)
2918
# Done after test suite decoration to allow randomisation etc
2919
# to take effect, though that is of marginal benefit.
2921
stream.write("Listing tests only ...\n")
2922
for t in iter_suite_tests(suite):
2923
stream.write("%s\n" % (t.id()))
2925
result = runner.run(suite)
2927
return result.wasStrictlySuccessful()
2929
return result.wasSuccessful()
2932
# A registry where get() returns a suite decorator.
2933
parallel_registry = registry.Registry()
2936
def fork_decorator(suite):
2937
concurrency = osutils.local_concurrency()
2938
if concurrency == 1:
2940
from testtools import ConcurrentTestSuite
2941
return ConcurrentTestSuite(suite, fork_for_tests)
2942
parallel_registry.register('fork', fork_decorator)
2945
def subprocess_decorator(suite):
2946
concurrency = osutils.local_concurrency()
2947
if concurrency == 1:
2949
from testtools import ConcurrentTestSuite
2950
return ConcurrentTestSuite(suite, reinvoke_for_tests)
2951
parallel_registry.register('subprocess', subprocess_decorator)
2954
def exclude_tests(exclude_pattern):
2955
"""Return a test suite decorator that excludes tests."""
2956
if exclude_pattern is None:
2957
return identity_decorator
2958
def decorator(suite):
2959
return ExcludeDecorator(suite, exclude_pattern)
2963
def filter_tests(pattern):
2965
return identity_decorator
2966
def decorator(suite):
2967
return FilterTestsDecorator(suite, pattern)
2971
def random_order(random_seed, runner):
2972
"""Return a test suite decorator factory for randomising tests order.
2974
:param random_seed: now, a string which casts to a long, or a long.
2975
:param runner: A test runner with a stream attribute to report on.
2977
if random_seed is None:
2978
return identity_decorator
2979
def decorator(suite):
2980
return RandomDecorator(suite, random_seed, runner.stream)
2984
def tests_first(pattern):
2986
return identity_decorator
2987
def decorator(suite):
2988
return TestFirstDecorator(suite, pattern)
2992
def identity_decorator(suite):
2997
class TestDecorator(TestSuite):
2998
"""A decorator for TestCase/TestSuite objects.
3000
Usually, subclasses should override __iter__(used when flattening test
3001
suites), which we do to filter, reorder, parallelise and so on, run() and
3005
def __init__(self, suite):
3006
TestSuite.__init__(self)
3009
def countTestCases(self):
3012
cases += test.countTestCases()
3019
def run(self, result):
3020
# Use iteration on self, not self._tests, to allow subclasses to hook
3023
if result.shouldStop:
3029
class CountingDecorator(TestDecorator):
3030
"""A decorator which calls result.progress(self.countTestCases)."""
3032
def run(self, result):
3033
progress_method = getattr(result, 'progress', None)
3034
if callable(progress_method):
3035
progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
3036
return super(CountingDecorator, self).run(result)
3039
class ExcludeDecorator(TestDecorator):
3040
"""A decorator which excludes test matching an exclude pattern."""
3042
def __init__(self, suite, exclude_pattern):
3043
TestDecorator.__init__(self, suite)
3044
self.exclude_pattern = exclude_pattern
3045
self.excluded = False
3049
return iter(self._tests)
3050
self.excluded = True
3051
suite = exclude_tests_by_re(self, self.exclude_pattern)
3053
self.addTests(suite)
3054
return iter(self._tests)
3057
class FilterTestsDecorator(TestDecorator):
3058
"""A decorator which filters tests to those matching a pattern."""
3060
def __init__(self, suite, pattern):
3061
TestDecorator.__init__(self, suite)
3062
self.pattern = pattern
3063
self.filtered = False
3067
return iter(self._tests)
3068
self.filtered = True
3069
suite = filter_suite_by_re(self, self.pattern)
3071
self.addTests(suite)
3072
return iter(self._tests)
3075
class RandomDecorator(TestDecorator):
3076
"""A decorator which randomises the order of its tests."""
3078
def __init__(self, suite, random_seed, stream):
3079
TestDecorator.__init__(self, suite)
3080
self.random_seed = random_seed
3081
self.randomised = False
3082
self.stream = stream
3086
return iter(self._tests)
3087
self.randomised = True
3088
self.stream.write("Randomizing test order using seed %s\n\n" %
3089
(self.actual_seed()))
3090
# Initialise the random number generator.
3091
random.seed(self.actual_seed())
3092
suite = randomize_suite(self)
3094
self.addTests(suite)
3095
return iter(self._tests)
3097
def actual_seed(self):
3098
if self.random_seed == "now":
3099
# We convert the seed to a long to make it reuseable across
3100
# invocations (because the user can reenter it).
3101
self.random_seed = long(time.time())
3103
# Convert the seed to a long if we can
3105
self.random_seed = long(self.random_seed)
3108
return self.random_seed
3111
class TestFirstDecorator(TestDecorator):
3112
"""A decorator which moves named tests to the front."""
3114
def __init__(self, suite, pattern):
3115
TestDecorator.__init__(self, suite)
3116
self.pattern = pattern
3117
self.filtered = False
3121
return iter(self._tests)
3122
self.filtered = True
3123
suites = split_suite_by_re(self, self.pattern)
3125
self.addTests(suites)
3126
return iter(self._tests)
3129
def partition_tests(suite, count):
3130
"""Partition suite into count lists of tests."""
3132
tests = list(iter_suite_tests(suite))
3133
tests_per_process = int(math.ceil(float(len(tests)) / count))
3134
for block in range(count):
3135
low_test = block * tests_per_process
3136
high_test = low_test + tests_per_process
3137
process_tests = tests[low_test:high_test]
3138
result.append(process_tests)
3142
def fork_for_tests(suite):
3143
"""Take suite and start up one runner per CPU by forking()
3145
:return: An iterable of TestCase-like objects which can each have
3146
run(result) called on them to feed tests to result.
3148
concurrency = osutils.local_concurrency()
3150
from subunit import TestProtocolClient, ProtocolTestCase
3151
from subunit.test_results import AutoTimingTestResultDecorator
3152
class TestInOtherProcess(ProtocolTestCase):
3153
# Should be in subunit, I think. RBC.
3154
def __init__(self, stream, pid):
3155
ProtocolTestCase.__init__(self, stream)
3158
def run(self, result):
3160
ProtocolTestCase.run(self, result)
3162
os.waitpid(self.pid, os.WNOHANG)
3164
test_blocks = partition_tests(suite, concurrency)
3165
for process_tests in test_blocks:
3166
process_suite = TestSuite()
3167
process_suite.addTests(process_tests)
3168
c2pread, c2pwrite = os.pipe()
3173
# Leave stderr and stdout open so we can see test noise
3174
# Close stdin so that the child goes away if it decides to
3175
# read from stdin (otherwise its a roulette to see what
3176
# child actually gets keystrokes for pdb etc).
3179
stream = os.fdopen(c2pwrite, 'wb', 1)
3180
subunit_result = AutoTimingTestResultDecorator(
3181
TestProtocolClient(stream))
3182
process_suite.run(subunit_result)
3187
stream = os.fdopen(c2pread, 'rb', 1)
3188
test = TestInOtherProcess(stream, pid)
3193
def reinvoke_for_tests(suite):
3194
"""Take suite and start up one runner per CPU using subprocess().
3196
:return: An iterable of TestCase-like objects which can each have
3197
run(result) called on them to feed tests to result.
3199
concurrency = osutils.local_concurrency()
3201
from subunit import ProtocolTestCase
3202
class TestInSubprocess(ProtocolTestCase):
3203
def __init__(self, process, name):
3204
ProtocolTestCase.__init__(self, process.stdout)
3205
self.process = process
3206
self.process.stdin.close()
3209
def run(self, result):
3211
ProtocolTestCase.run(self, result)
3214
os.unlink(self.name)
3215
# print "pid %d finished" % finished_process
3216
test_blocks = partition_tests(suite, concurrency)
3217
for process_tests in test_blocks:
3218
# ugly; currently reimplement rather than reuses TestCase methods.
3219
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
3220
if not os.path.isfile(bzr_path):
3221
# We are probably installed. Assume sys.argv is the right file
3222
bzr_path = sys.argv[0]
3223
bzr_path = [bzr_path]
3224
if sys.platform == "win32":
3225
# if we're on windows, we can't execute the bzr script directly
3226
bzr_path = [sys.executable] + bzr_path
3227
fd, test_list_file_name = tempfile.mkstemp()
3228
test_list_file = os.fdopen(fd, 'wb', 1)
3229
for test in process_tests:
3230
test_list_file.write(test.id() + '\n')
3231
test_list_file.close()
3233
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3235
if '--no-plugins' in sys.argv:
3236
argv.append('--no-plugins')
3237
# stderr=STDOUT would be ideal, but until we prevent noise on
3238
# stderr it can interrupt the subunit protocol.
3239
process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
3241
test = TestInSubprocess(process, test_list_file_name)
3244
os.unlink(test_list_file_name)
3249
class ForwardingResult(unittest.TestResult):
3251
def __init__(self, target):
3252
unittest.TestResult.__init__(self)
3253
self.result = target
3255
def startTest(self, test):
3256
self.result.startTest(test)
3258
def stopTest(self, test):
3259
self.result.stopTest(test)
3261
def startTestRun(self):
3262
self.result.startTestRun()
3264
def stopTestRun(self):
3265
self.result.stopTestRun()
3267
def addSkip(self, test, reason):
3268
self.result.addSkip(test, reason)
3270
def addSuccess(self, test):
3271
self.result.addSuccess(test)
3273
def addError(self, test, err):
3274
self.result.addError(test, err)
3276
def addFailure(self, test, err):
3277
self.result.addFailure(test, err)
3278
ForwardingResult = testtools.ExtendedToOriginalDecorator
3281
class ProfileResult(ForwardingResult):
3282
"""Generate profiling data for all activity between start and success.
3284
The profile data is appended to the test's _benchcalls attribute and can
3285
be accessed by the forwarded-to TestResult.
3287
While it might be cleaner do accumulate this in stopTest, addSuccess is
3288
where our existing output support for lsprof is, and this class aims to
3289
fit in with that: while it could be moved it's not necessary to accomplish
3290
test profiling, nor would it be dramatically cleaner.
3293
def startTest(self, test):
3294
self.profiler = bzrlib.lsprof.BzrProfiler()
3295
self.profiler.start()
3296
ForwardingResult.startTest(self, test)
3298
def addSuccess(self, test):
3299
stats = self.profiler.stop()
3301
calls = test._benchcalls
3302
except AttributeError:
3303
test._benchcalls = []
3304
calls = test._benchcalls
3305
calls.append(((test.id(), "", ""), stats))
3306
ForwardingResult.addSuccess(self, test)
3308
def stopTest(self, test):
3309
ForwardingResult.stopTest(self, test)
3310
self.profiler = None
3313
# Controlled by "bzr selftest -E=..." option
3314
# Currently supported:
3315
# -Eallow_debug Will no longer clear debug.debug_flags() so it
3316
# preserves any flags supplied at the command line.
3317
# -Edisable_lock_checks Turns errors in mismatched locks into simple prints
3318
# rather than failing tests. And no longer raise
3319
# LockContention when fctnl locks are not being used
3320
# with proper exclusion rules.
3321
selftest_debug_flags = set()
3324
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
3326
test_suite_factory=None,
3329
matching_tests_first=None,
3332
exclude_pattern=None,
3338
suite_decorators=None,
3342
"""Run the whole test suite under the enhanced runner"""
3343
# XXX: Very ugly way to do this...
3344
# Disable warning about old formats because we don't want it to disturb
3345
# any blackbox tests.
3346
from bzrlib import repository
3347
repository._deprecation_warning_done = True
3349
global default_transport
3350
if transport is None:
3351
transport = default_transport
3352
old_transport = default_transport
3353
default_transport = transport
3354
global selftest_debug_flags
3355
old_debug_flags = selftest_debug_flags
3356
if debug_flags is not None:
3357
selftest_debug_flags = set(debug_flags)
3359
if load_list is None:
3362
keep_only = load_test_id_list(load_list)
3364
starting_with = [test_prefix_alias_registry.resolve_alias(start)
3365
for start in starting_with]
3366
if test_suite_factory is None:
3367
# Reduce loading time by loading modules based on the starting_with
3369
suite = test_suite(keep_only, starting_with)
3371
suite = test_suite_factory()
3373
# But always filter as requested.
3374
suite = filter_suite_by_id_startswith(suite, starting_with)
3375
result_decorators = []
3377
result_decorators.append(ProfileResult)
3378
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
3379
stop_on_failure=stop_on_failure,
3380
transport=transport,
3381
lsprof_timed=lsprof_timed,
3382
bench_history=bench_history,
3383
matching_tests_first=matching_tests_first,
3384
list_only=list_only,
3385
random_seed=random_seed,
3386
exclude_pattern=exclude_pattern,
3388
runner_class=runner_class,
3389
suite_decorators=suite_decorators,
3391
result_decorators=result_decorators,
3394
default_transport = old_transport
3395
selftest_debug_flags = old_debug_flags
3398
def load_test_id_list(file_name):
3399
"""Load a test id list from a text file.
3401
The format is one test id by line. No special care is taken to impose
3402
strict rules, these test ids are used to filter the test suite so a test id
3403
that do not match an existing test will do no harm. This allows user to add
3404
comments, leave blank lines, etc.
3408
ftest = open(file_name, 'rt')
3410
if e.errno != errno.ENOENT:
3413
raise errors.NoSuchFile(file_name)
3415
for test_name in ftest.readlines():
3416
test_list.append(test_name.strip())
3421
def suite_matches_id_list(test_suite, id_list):
3422
"""Warns about tests not appearing or appearing more than once.
3424
:param test_suite: A TestSuite object.
3425
:param test_id_list: The list of test ids that should be found in
3428
:return: (absents, duplicates) absents is a list containing the test found
3429
in id_list but not in test_suite, duplicates is a list containing the
3430
test found multiple times in test_suite.
3432
When using a prefined test id list, it may occurs that some tests do not
3433
exist anymore or that some tests use the same id. This function warns the
3434
tester about potential problems in his workflow (test lists are volatile)
3435
or in the test suite itself (using the same id for several tests does not
3436
help to localize defects).
3438
# Build a dict counting id occurrences
3440
for test in iter_suite_tests(test_suite):
3442
tests[id] = tests.get(id, 0) + 1
3447
occurs = tests.get(id, 0)
3449
not_found.append(id)
3451
duplicates.append(id)
3453
return not_found, duplicates
3456
class TestIdList(object):
3457
"""Test id list to filter a test suite.
3459
Relying on the assumption that test ids are built as:
3460
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
3461
notation, this class offers methods to :
3462
- avoid building a test suite for modules not refered to in the test list,
3463
- keep only the tests listed from the module test suite.
3466
def __init__(self, test_id_list):
3467
# When a test suite needs to be filtered against us we compare test ids
3468
# for equality, so a simple dict offers a quick and simple solution.
3469
self.tests = dict().fromkeys(test_id_list, True)
3471
# While unittest.TestCase have ids like:
3472
# <module>.<class>.<method>[(<param+)],
3473
# doctest.DocTestCase can have ids like:
3476
# <module>.<function>
3477
# <module>.<class>.<method>
3479
# Since we can't predict a test class from its name only, we settle on
3480
# a simple constraint: a test id always begins with its module name.
3483
for test_id in test_id_list:
3484
parts = test_id.split('.')
3485
mod_name = parts.pop(0)
3486
modules[mod_name] = True
3488
mod_name += '.' + part
3489
modules[mod_name] = True
3490
self.modules = modules
3492
def refers_to(self, module_name):
3493
"""Is there tests for the module or one of its sub modules."""
3494
return self.modules.has_key(module_name)
3496
def includes(self, test_id):
3497
return self.tests.has_key(test_id)
3500
class TestPrefixAliasRegistry(registry.Registry):
3501
"""A registry for test prefix aliases.
3503
This helps implement shorcuts for the --starting-with selftest
3504
option. Overriding existing prefixes is not allowed but not fatal (a
3505
warning will be emitted).
3508
def register(self, key, obj, help=None, info=None,
3509
override_existing=False):
3510
"""See Registry.register.
3512
Trying to override an existing alias causes a warning to be emitted,
3513
not a fatal execption.
3516
super(TestPrefixAliasRegistry, self).register(
3517
key, obj, help=help, info=info, override_existing=False)
3519
actual = self.get(key)
3520
note('Test prefix alias %s is already used for %s, ignoring %s'
3521
% (key, actual, obj))
3523
def resolve_alias(self, id_start):
3524
"""Replace the alias by the prefix in the given string.
3526
Using an unknown prefix is an error to help catching typos.
3528
parts = id_start.split('.')
3530
parts[0] = self.get(parts[0])
3532
raise errors.BzrCommandError(
3533
'%s is not a known test prefix alias' % parts[0])
3534
return '.'.join(parts)
3537
test_prefix_alias_registry = TestPrefixAliasRegistry()
3538
"""Registry of test prefix aliases."""
3541
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
3542
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3543
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3545
# Obvious higest levels prefixes, feel free to add your own via a plugin
3546
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3547
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3548
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3549
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
3550
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3553
def _test_suite_testmod_names():
3554
"""Return the standard list of test module names to test."""
3557
'bzrlib.tests.blackbox',
3558
'bzrlib.tests.commands',
3559
'bzrlib.tests.per_branch',
3560
'bzrlib.tests.per_bzrdir',
3561
'bzrlib.tests.per_foreign_vcs',
3562
'bzrlib.tests.per_interrepository',
3563
'bzrlib.tests.per_intertree',
3564
'bzrlib.tests.per_inventory',
3565
'bzrlib.tests.per_interbranch',
3566
'bzrlib.tests.per_lock',
3567
'bzrlib.tests.per_merger',
3568
'bzrlib.tests.per_transport',
3569
'bzrlib.tests.per_tree',
3570
'bzrlib.tests.per_pack_repository',
3571
'bzrlib.tests.per_repository',
3572
'bzrlib.tests.per_repository_chk',
3573
'bzrlib.tests.per_repository_reference',
3574
'bzrlib.tests.per_uifactory',
3575
'bzrlib.tests.per_versionedfile',
3576
'bzrlib.tests.per_workingtree',
3577
'bzrlib.tests.test__annotator',
3578
'bzrlib.tests.test__bencode',
3579
'bzrlib.tests.test__chk_map',
3580
'bzrlib.tests.test__dirstate_helpers',
3581
'bzrlib.tests.test__groupcompress',
3582
'bzrlib.tests.test__known_graph',
3583
'bzrlib.tests.test__rio',
3584
'bzrlib.tests.test__simple_set',
3585
'bzrlib.tests.test__static_tuple',
3586
'bzrlib.tests.test__walkdirs_win32',
3587
'bzrlib.tests.test_ancestry',
3588
'bzrlib.tests.test_annotate',
3589
'bzrlib.tests.test_api',
3590
'bzrlib.tests.test_atomicfile',
3591
'bzrlib.tests.test_bad_files',
3592
'bzrlib.tests.test_bisect_multi',
3593
'bzrlib.tests.test_branch',
3594
'bzrlib.tests.test_branchbuilder',
3595
'bzrlib.tests.test_btree_index',
3596
'bzrlib.tests.test_bugtracker',
3597
'bzrlib.tests.test_bundle',
3598
'bzrlib.tests.test_bzrdir',
3599
'bzrlib.tests.test__chunks_to_lines',
3600
'bzrlib.tests.test_cache_utf8',
3601
'bzrlib.tests.test_chk_map',
3602
'bzrlib.tests.test_chk_serializer',
3603
'bzrlib.tests.test_chunk_writer',
3604
'bzrlib.tests.test_clean_tree',
3605
'bzrlib.tests.test_cleanup',
3606
'bzrlib.tests.test_commands',
3607
'bzrlib.tests.test_commit',
3608
'bzrlib.tests.test_commit_merge',
3609
'bzrlib.tests.test_config',
3610
'bzrlib.tests.test_conflicts',
3611
'bzrlib.tests.test_counted_lock',
3612
'bzrlib.tests.test_crash',
3613
'bzrlib.tests.test_decorators',
3614
'bzrlib.tests.test_delta',
3615
'bzrlib.tests.test_debug',
3616
'bzrlib.tests.test_deprecated_graph',
3617
'bzrlib.tests.test_diff',
3618
'bzrlib.tests.test_directory_service',
3619
'bzrlib.tests.test_dirstate',
3620
'bzrlib.tests.test_email_message',
3621
'bzrlib.tests.test_eol_filters',
3622
'bzrlib.tests.test_errors',
3623
'bzrlib.tests.test_export',
3624
'bzrlib.tests.test_extract',
3625
'bzrlib.tests.test_fetch',
3626
'bzrlib.tests.test_fifo_cache',
3627
'bzrlib.tests.test_filters',
3628
'bzrlib.tests.test_ftp_transport',
3629
'bzrlib.tests.test_foreign',
3630
'bzrlib.tests.test_generate_docs',
3631
'bzrlib.tests.test_generate_ids',
3632
'bzrlib.tests.test_globbing',
3633
'bzrlib.tests.test_gpg',
3634
'bzrlib.tests.test_graph',
3635
'bzrlib.tests.test_groupcompress',
3636
'bzrlib.tests.test_hashcache',
3637
'bzrlib.tests.test_help',
3638
'bzrlib.tests.test_hooks',
3639
'bzrlib.tests.test_http',
3640
'bzrlib.tests.test_http_response',
3641
'bzrlib.tests.test_https_ca_bundle',
3642
'bzrlib.tests.test_identitymap',
3643
'bzrlib.tests.test_ignores',
3644
'bzrlib.tests.test_index',
3645
'bzrlib.tests.test_info',
3646
'bzrlib.tests.test_inv',
3647
'bzrlib.tests.test_inventory_delta',
3648
'bzrlib.tests.test_knit',
3649
'bzrlib.tests.test_lazy_import',
3650
'bzrlib.tests.test_lazy_regex',
3651
'bzrlib.tests.test_lock',
3652
'bzrlib.tests.test_lockable_files',
3653
'bzrlib.tests.test_lockdir',
3654
'bzrlib.tests.test_log',
3655
'bzrlib.tests.test_lru_cache',
3656
'bzrlib.tests.test_lsprof',
3657
'bzrlib.tests.test_mail_client',
3658
'bzrlib.tests.test_memorytree',
3659
'bzrlib.tests.test_merge',
3660
'bzrlib.tests.test_merge3',
3661
'bzrlib.tests.test_merge_core',
3662
'bzrlib.tests.test_merge_directive',
3663
'bzrlib.tests.test_missing',
3664
'bzrlib.tests.test_msgeditor',
3665
'bzrlib.tests.test_multiparent',
3666
'bzrlib.tests.test_mutabletree',
3667
'bzrlib.tests.test_nonascii',
3668
'bzrlib.tests.test_options',
3669
'bzrlib.tests.test_osutils',
3670
'bzrlib.tests.test_osutils_encodings',
3671
'bzrlib.tests.test_pack',
3672
'bzrlib.tests.test_patch',
3673
'bzrlib.tests.test_patches',
3674
'bzrlib.tests.test_permissions',
3675
'bzrlib.tests.test_plugins',
3676
'bzrlib.tests.test_progress',
3677
'bzrlib.tests.test_read_bundle',
3678
'bzrlib.tests.test_reconcile',
3679
'bzrlib.tests.test_reconfigure',
3680
'bzrlib.tests.test_registry',
3681
'bzrlib.tests.test_remote',
3682
'bzrlib.tests.test_rename_map',
3683
'bzrlib.tests.test_repository',
3684
'bzrlib.tests.test_revert',
3685
'bzrlib.tests.test_revision',
3686
'bzrlib.tests.test_revisionspec',
3687
'bzrlib.tests.test_revisiontree',
3688
'bzrlib.tests.test_rio',
3689
'bzrlib.tests.test_rules',
3690
'bzrlib.tests.test_sampler',
3691
'bzrlib.tests.test_script',
3692
'bzrlib.tests.test_selftest',
3693
'bzrlib.tests.test_serializer',
3694
'bzrlib.tests.test_setup',
3695
'bzrlib.tests.test_sftp_transport',
3696
'bzrlib.tests.test_shelf',
3697
'bzrlib.tests.test_shelf_ui',
3698
'bzrlib.tests.test_smart',
3699
'bzrlib.tests.test_smart_add',
3700
'bzrlib.tests.test_smart_request',
3701
'bzrlib.tests.test_smart_transport',
3702
'bzrlib.tests.test_smtp_connection',
3703
'bzrlib.tests.test_source',
3704
'bzrlib.tests.test_ssh_transport',
3705
'bzrlib.tests.test_status',
3706
'bzrlib.tests.test_store',
3707
'bzrlib.tests.test_strace',
3708
'bzrlib.tests.test_subsume',
3709
'bzrlib.tests.test_switch',
3710
'bzrlib.tests.test_symbol_versioning',
3711
'bzrlib.tests.test_tag',
3712
'bzrlib.tests.test_testament',
3713
'bzrlib.tests.test_textfile',
3714
'bzrlib.tests.test_textmerge',
3715
'bzrlib.tests.test_timestamp',
3716
'bzrlib.tests.test_trace',
3717
'bzrlib.tests.test_transactions',
3718
'bzrlib.tests.test_transform',
3719
'bzrlib.tests.test_transport',
3720
'bzrlib.tests.test_transport_log',
3721
'bzrlib.tests.test_tree',
3722
'bzrlib.tests.test_treebuilder',
3723
'bzrlib.tests.test_tsort',
3724
'bzrlib.tests.test_tuned_gzip',
3725
'bzrlib.tests.test_ui',
3726
'bzrlib.tests.test_uncommit',
3727
'bzrlib.tests.test_upgrade',
3728
'bzrlib.tests.test_upgrade_stacked',
3729
'bzrlib.tests.test_urlutils',
3730
'bzrlib.tests.test_version',
3731
'bzrlib.tests.test_version_info',
3732
'bzrlib.tests.test_weave',
3733
'bzrlib.tests.test_whitebox',
3734
'bzrlib.tests.test_win32utils',
3735
'bzrlib.tests.test_workingtree',
3736
'bzrlib.tests.test_workingtree_4',
3737
'bzrlib.tests.test_wsgi',
3738
'bzrlib.tests.test_xml',
3742
def _test_suite_modules_to_doctest():
3743
"""Return the list of modules to doctest."""
3746
'bzrlib.branchbuilder',
3749
'bzrlib.iterablefile',
3753
'bzrlib.symbol_versioning',
3756
'bzrlib.version_info_formats.format_custom',
3760
def test_suite(keep_only=None, starting_with=None):
3761
"""Build and return TestSuite for the whole of bzrlib.
3763
:param keep_only: A list of test ids limiting the suite returned.
3765
:param starting_with: An id limiting the suite returned to the tests
3768
This function can be replaced if you need to change the default test
3769
suite on a global basis, but it is not encouraged.
3772
loader = TestUtil.TestLoader()
3774
if keep_only is not None:
3775
id_filter = TestIdList(keep_only)
3777
# We take precedence over keep_only because *at loading time* using
3778
# both options means we will load less tests for the same final result.
3779
def interesting_module(name):
3780
for start in starting_with:
3782
# Either the module name starts with the specified string
3783
name.startswith(start)
3784
# or it may contain tests starting with the specified string
3785
or start.startswith(name)
3789
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
3791
elif keep_only is not None:
3792
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
3793
def interesting_module(name):
3794
return id_filter.refers_to(name)
3797
loader = TestUtil.TestLoader()
3798
def interesting_module(name):
3799
# No filtering, all modules are interesting
3802
suite = loader.suiteClass()
3804
# modules building their suite with loadTestsFromModuleNames
3805
suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
3807
for mod in _test_suite_modules_to_doctest():
3808
if not interesting_module(mod):
3809
# No tests to keep here, move along
3812
# note that this really does mean "report only" -- doctest
3813
# still runs the rest of the examples
3814
doc_suite = doctest.DocTestSuite(mod,
3815
optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
3816
except ValueError, e:
3817
print '**failed to get doctest for: %s\n%s' % (mod, e)
3819
if len(doc_suite._tests) == 0:
3820
raise errors.BzrError("no doctests found in %s" % (mod,))
3821
suite.addTest(doc_suite)
3823
default_encoding = sys.getdefaultencoding()
3824
for name, plugin in bzrlib.plugin.plugins().items():
3825
if not interesting_module(plugin.module.__name__):
3827
plugin_suite = plugin.test_suite()
3828
# We used to catch ImportError here and turn it into just a warning,
3829
# but really if you don't have --no-plugins this should be a failure.
3830
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
3831
if plugin_suite is None:
3832
plugin_suite = plugin.load_plugin_tests(loader)
3833
if plugin_suite is not None:
3834
suite.addTest(plugin_suite)
3835
if default_encoding != sys.getdefaultencoding():
3836
bzrlib.trace.warning(
3837
'Plugin "%s" tried to reset default encoding to: %s', name,
3838
sys.getdefaultencoding())
3840
sys.setdefaultencoding(default_encoding)
3842
if keep_only is not None:
3843
# Now that the referred modules have loaded their tests, keep only the
3845
suite = filter_suite_by_id_list(suite, id_filter)
3846
# Do some sanity checks on the id_list filtering
3847
not_found, duplicates = suite_matches_id_list(suite, keep_only)
3849
# The tester has used both keep_only and starting_with, so he is
3850
# already aware that some tests are excluded from the list, there
3851
# is no need to tell him which.
3854
# Some tests mentioned in the list are not in the test suite. The
3855
# list may be out of date, report to the tester.
3856
for id in not_found:
3857
bzrlib.trace.warning('"%s" not found in the test suite', id)
3858
for id in duplicates:
3859
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
3864
def multiply_scenarios(scenarios_left, scenarios_right):
3865
"""Multiply two sets of scenarios.
3867
:returns: the cartesian product of the two sets of scenarios, that is
3868
a scenario for every possible combination of a left scenario and a
3872
('%s,%s' % (left_name, right_name),
3873
dict(left_dict.items() + right_dict.items()))
3874
for left_name, left_dict in scenarios_left
3875
for right_name, right_dict in scenarios_right]
3878
def multiply_tests(tests, scenarios, result):
3879
"""Multiply tests_list by scenarios into result.
3881
This is the core workhorse for test parameterisation.
3883
Typically the load_tests() method for a per-implementation test suite will
3884
call multiply_tests and return the result.
3886
:param tests: The tests to parameterise.
3887
:param scenarios: The scenarios to apply: pairs of (scenario_name,
3888
scenario_param_dict).
3889
:param result: A TestSuite to add created tests to.
3891
This returns the passed in result TestSuite with the cross product of all
3892
the tests repeated once for each scenario. Each test is adapted by adding
3893
the scenario name at the end of its id(), and updating the test object's
3894
__dict__ with the scenario_param_dict.
3896
>>> import bzrlib.tests.test_sampler
3897
>>> r = multiply_tests(
3898
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
3899
... [('one', dict(param=1)),
3900
... ('two', dict(param=2))],
3902
>>> tests = list(iter_suite_tests(r))
3906
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
3912
for test in iter_suite_tests(tests):
3913
apply_scenarios(test, scenarios, result)
3917
def apply_scenarios(test, scenarios, result):
3918
"""Apply the scenarios in scenarios to test and add to result.
3920
:param test: The test to apply scenarios to.
3921
:param scenarios: An iterable of scenarios to apply to test.
3923
:seealso: apply_scenario
3925
for scenario in scenarios:
3926
result.addTest(apply_scenario(test, scenario))
3930
def apply_scenario(test, scenario):
3931
"""Copy test and apply scenario to it.
3933
:param test: A test to adapt.
3934
:param scenario: A tuple describing the scenarion.
3935
The first element of the tuple is the new test id.
3936
The second element is a dict containing attributes to set on the
3938
:return: The adapted test.
3940
new_id = "%s(%s)" % (test.id(), scenario[0])
3941
new_test = clone_test(test, new_id)
3942
for name, value in scenario[1].items():
3943
setattr(new_test, name, value)
3947
def clone_test(test, new_id):
3948
"""Clone a test giving it a new id.
3950
:param test: The test to clone.
3951
:param new_id: The id to assign to it.
3952
:return: The new test.
3954
new_test = copy(test)
3955
new_test.id = lambda: new_id
3959
def permute_tests_for_extension(standard_tests, loader, py_module_name,
3961
"""Helper for permutating tests against an extension module.
3963
This is meant to be used inside a modules 'load_tests()' function. It will
3964
create 2 scenarios, and cause all tests in the 'standard_tests' to be run
3965
against both implementations. Setting 'test.module' to the appropriate
3966
module. See bzrlib.tests.test__chk_map.load_tests as an example.
3968
:param standard_tests: A test suite to permute
3969
:param loader: A TestLoader
3970
:param py_module_name: The python path to a python module that can always
3971
be loaded, and will be considered the 'python' implementation. (eg
3972
'bzrlib._chk_map_py')
3973
:param ext_module_name: The python path to an extension module. If the
3974
module cannot be loaded, a single test will be added, which notes that
3975
the module is not available. If it can be loaded, all standard_tests
3976
will be run against that module.
3977
:return: (suite, feature) suite is a test-suite that has all the permuted
3978
tests. feature is the Feature object that can be used to determine if
3979
the module is available.
3982
py_module = __import__(py_module_name, {}, {}, ['NO_SUCH_ATTRIB'])
3984
('python', {'module': py_module}),
3986
suite = loader.suiteClass()
3987
feature = ModuleAvailableFeature(ext_module_name)
3988
if feature.available():
3989
scenarios.append(('C', {'module': feature.module}))
3991
# the compiled module isn't available, so we add a failing test
3992
class FailWithoutFeature(TestCase):
3993
def test_fail(self):
3994
self.requireFeature(feature)
3995
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
3996
result = multiply_tests(standard_tests, scenarios, suite)
3997
return result, feature
4000
def _rmtree_temp_dir(dirname, test_id=None):
4001
# If LANG=C we probably have created some bogus paths
4002
# which rmtree(unicode) will fail to delete
4003
# so make sure we are using rmtree(str) to delete everything
4004
# except on win32, where rmtree(str) will fail
4005
# since it doesn't have the property of byte-stream paths
4006
# (they are either ascii or mbcs)
4007
if sys.platform == 'win32':
4008
# make sure we are using the unicode win32 api
4009
dirname = unicode(dirname)
4011
dirname = dirname.encode(sys.getfilesystemencoding())
4013
osutils.rmtree(dirname)
4015
# We don't want to fail here because some useful display will be lost
4016
# otherwise. Polluting the tmp dir is bad, but not giving all the
4017
# possible info to the test runner is even worse.
4019
ui.ui_factory.clear_term()
4020
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4021
sys.stderr.write('Unable to remove testing dir %s\n%s'
4022
% (os.path.basename(dirname), e))
4025
class Feature(object):
4026
"""An operating system Feature."""
4029
self._available = None
4031
def available(self):
4032
"""Is the feature available?
4034
:return: True if the feature is available.
4036
if self._available is None:
4037
self._available = self._probe()
4038
return self._available
4041
"""Implement this method in concrete features.
4043
:return: True if the feature is available.
4045
raise NotImplementedError
4048
if getattr(self, 'feature_name', None):
4049
return self.feature_name()
4050
return self.__class__.__name__
4053
class _SymlinkFeature(Feature):
4056
return osutils.has_symlinks()
4058
def feature_name(self):
4061
SymlinkFeature = _SymlinkFeature()
4064
class _HardlinkFeature(Feature):
4067
return osutils.has_hardlinks()
4069
def feature_name(self):
4072
HardlinkFeature = _HardlinkFeature()
4075
class _OsFifoFeature(Feature):
4078
return getattr(os, 'mkfifo', None)
4080
def feature_name(self):
4081
return 'filesystem fifos'
4083
OsFifoFeature = _OsFifoFeature()
4086
class _UnicodeFilenameFeature(Feature):
4087
"""Does the filesystem support Unicode filenames?"""
4091
# Check for character combinations unlikely to be covered by any
4092
# single non-unicode encoding. We use the characters
4093
# - greek small letter alpha (U+03B1) and
4094
# - braille pattern dots-123456 (U+283F).
4095
os.stat(u'\u03b1\u283f')
4096
except UnicodeEncodeError:
4098
except (IOError, OSError):
4099
# The filesystem allows the Unicode filename but the file doesn't
4103
# The filesystem allows the Unicode filename and the file exists,
4107
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4110
class _CompatabilityThunkFeature(Feature):
4111
"""This feature is just a thunk to another feature.
4113
It issues a deprecation warning if it is accessed, to let you know that you
4114
should really use a different feature.
4117
def __init__(self, module, name, this_name, dep_version):
4118
super(_CompatabilityThunkFeature, self).__init__()
4119
self._module = module
4121
self._this_name = this_name
4122
self._dep_version = dep_version
4123
self._feature = None
4126
if self._feature is None:
4127
msg = (self._dep_version % self._this_name) + (
4128
' Use %s.%s instead.' % (self._module, self._name))
4129
symbol_versioning.warn(msg, DeprecationWarning)
4130
mod = __import__(self._module, {}, {}, [self._name])
4131
self._feature = getattr(mod, self._name)
4135
return self._feature._probe()
4138
class ModuleAvailableFeature(Feature):
4139
"""This is a feature than describes a module we want to be available.
4141
Declare the name of the module in __init__(), and then after probing, the
4142
module will be available as 'self.module'.
4144
:ivar module: The module if it is available, else None.
4147
def __init__(self, module_name):
4148
super(ModuleAvailableFeature, self).__init__()
4149
self.module_name = module_name
4153
self._module = __import__(self.module_name, {}, {}, [''])
4160
if self.available(): # Make sure the probe has been done
4164
def feature_name(self):
4165
return self.module_name
4168
# This is kept here for compatibility, it is recommended to use
4169
# 'bzrlib.tests.feature.paramiko' instead
4170
ParamikoFeature = _CompatabilityThunkFeature('bzrlib.tests.features',
4171
'paramiko', 'bzrlib.tests.ParamikoFeature', deprecated_in((2,1,0)))
4174
def probe_unicode_in_user_encoding():
4175
"""Try to encode several unicode strings to use in unicode-aware tests.
4176
Return first successfull match.
4178
:return: (unicode value, encoded plain string value) or (None, None)
4180
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
4181
for uni_val in possible_vals:
4183
str_val = uni_val.encode(osutils.get_user_encoding())
4184
except UnicodeEncodeError:
4185
# Try a different character
4188
return uni_val, str_val
4192
def probe_bad_non_ascii(encoding):
4193
"""Try to find [bad] character with code [128..255]
4194
that cannot be decoded to unicode in some encoding.
4195
Return None if all non-ascii characters is valid
4198
for i in xrange(128, 256):
4201
char.decode(encoding)
4202
except UnicodeDecodeError:
4207
class _HTTPSServerFeature(Feature):
4208
"""Some tests want an https Server, check if one is available.
4210
Right now, the only way this is available is under python2.6 which provides
4221
def feature_name(self):
4222
return 'HTTPSServer'
4225
HTTPSServerFeature = _HTTPSServerFeature()
4228
class _UnicodeFilename(Feature):
4229
"""Does the filesystem support Unicode filenames?"""
4234
except UnicodeEncodeError:
4236
except (IOError, OSError):
4237
# The filesystem allows the Unicode filename but the file doesn't
4241
# The filesystem allows the Unicode filename and the file exists,
4245
UnicodeFilename = _UnicodeFilename()
4248
class _UTF8Filesystem(Feature):
4249
"""Is the filesystem UTF-8?"""
4252
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4256
UTF8Filesystem = _UTF8Filesystem()
4259
class _BreakinFeature(Feature):
4260
"""Does this platform support the breakin feature?"""
4263
from bzrlib import breakin
4264
if breakin.determine_signal() is None:
4266
if sys.platform == 'win32':
4267
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4268
# We trigger SIGBREAK via a Console api so we need ctypes to
4269
# access the function
4276
def feature_name(self):
4277
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4280
BreakinFeature = _BreakinFeature()
4283
class _CaseInsCasePresFilenameFeature(Feature):
4284
"""Is the file-system case insensitive, but case-preserving?"""
4287
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4289
# first check truly case-preserving for created files, then check
4290
# case insensitive when opening existing files.
4291
name = osutils.normpath(name)
4292
base, rel = osutils.split(name)
4293
found_rel = osutils.canonical_relpath(base, name)
4294
return (found_rel == rel
4295
and os.path.isfile(name.upper())
4296
and os.path.isfile(name.lower()))
4301
def feature_name(self):
4302
return "case-insensitive case-preserving filesystem"
4304
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4307
class _CaseInsensitiveFilesystemFeature(Feature):
4308
"""Check if underlying filesystem is case-insensitive but *not* case
4311
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4312
# more likely to be case preserving, so this case is rare.
4315
if CaseInsCasePresFilenameFeature.available():
4318
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4319
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4320
TestCaseWithMemoryTransport.TEST_ROOT = root
4322
root = TestCaseWithMemoryTransport.TEST_ROOT
4323
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4325
name_a = osutils.pathjoin(tdir, 'a')
4326
name_A = osutils.pathjoin(tdir, 'A')
4328
result = osutils.isdir(name_A)
4329
_rmtree_temp_dir(tdir)
4332
def feature_name(self):
4333
return 'case-insensitive filesystem'
4335
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4338
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4339
SubUnitFeature = _CompatabilityThunkFeature('bzrlib.tests.features', 'subunit',
4340
'bzrlib.tests.SubUnitFeature', deprecated_in((2,1,0)))
4341
# Only define SubUnitBzrRunner if subunit is available.
4343
from subunit import TestProtocolClient
4344
from subunit.test_results import AutoTimingTestResultDecorator
4345
class SubUnitBzrRunner(TextTestRunner):
4346
def run(self, test):
4347
result = AutoTimingTestResultDecorator(
4348
TestProtocolClient(self.stream))