1
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
# TODO: Perhaps there should be an API to find out if bzr is running under the
19
# test suite -- some plugins might want to avoid making intrusive changes if
20
# this is the case. However, we want behaviour under test to diverge as
21
# little as possible, so this should be used rarely if it's added at all.
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
# NOTE: Some classes in here use camelCaseNaming() rather than
25
# underscore_naming(). That's for consistency with unittest; it's not the
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
# new assertFoo() methods.
31
from cStringIO import StringIO
37
from pprint import pformat
42
from subprocess import Popen, PIPE
65
import bzrlib.commands
66
import bzrlib.timestamp
68
import bzrlib.inventory
69
import bzrlib.iterablefile
74
# lsprof not available
76
from bzrlib.merge import merge_inner
79
from bzrlib.smart import client, server
81
from bzrlib import symbol_versioning
82
from bzrlib.symbol_versioning import (
89
from bzrlib.transport import get_transport
90
import bzrlib.transport
91
from bzrlib.transport.local import LocalURLServer
92
from bzrlib.transport.memory import MemoryServer
93
from bzrlib.transport.readonly import ReadonlyServer
94
from bzrlib.trace import mutter, note
95
from bzrlib.tests import TestUtil
96
from bzrlib.tests.http_server import HttpServer
97
from bzrlib.tests.TestUtil import (
101
from bzrlib.tests.treeshape import build_tree_contents
102
import bzrlib.version_info_formats.format_custom
103
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
105
# Mark this python module as being part of the implementation
106
# of unittest: this gives us better tracebacks where the last
107
# shown frame is the test code, not our assertXYZ.
110
default_transport = LocalURLServer
113
class ExtendedTestResult(unittest._TextTestResult):
114
"""Accepts, reports and accumulates the results of running tests.
116
Compared to the unittest version this class adds support for
117
profiling, benchmarking, stopping as soon as a test fails, and
118
skipping tests. There are further-specialized subclasses for
119
different types of display.
121
When a test finishes, in whatever way, it calls one of the addSuccess,
122
addFailure or addError classes. These in turn may redirect to a more
123
specific case for the special test results supported by our extended
126
Note that just one of these objects is fed the results from many tests.
131
def __init__(self, stream, descriptions, verbosity,
135
"""Construct new TestResult.
137
:param bench_history: Optionally, a writable file object to accumulate
140
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
141
if bench_history is not None:
142
from bzrlib.version import _get_bzr_source_tree
143
src_tree = _get_bzr_source_tree()
146
revision_id = src_tree.get_parent_ids()[0]
148
# XXX: if this is a brand new tree, do the same as if there
152
# XXX: If there's no branch, what should we do?
154
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
155
self._bench_history = bench_history
156
self.ui = ui.ui_factory
157
self.num_tests = num_tests
159
self.failure_count = 0
160
self.known_failure_count = 0
162
self.not_applicable_count = 0
163
self.unsupported = {}
165
self._overall_start_time = time.time()
167
def _extractBenchmarkTime(self, testCase):
168
"""Add a benchmark time for the current test case."""
169
return getattr(testCase, "_benchtime", None)
171
def _elapsedTestTimeString(self):
172
"""Return a time string for the overall time the current test has taken."""
173
return self._formatTime(time.time() - self._start_time)
175
def _testTimeString(self, testCase):
176
benchmark_time = self._extractBenchmarkTime(testCase)
177
if benchmark_time is not None:
179
self._formatTime(benchmark_time),
180
self._elapsedTestTimeString())
182
return " %s" % self._elapsedTestTimeString()
184
def _formatTime(self, seconds):
185
"""Format seconds as milliseconds with leading spaces."""
186
# some benchmarks can take thousands of seconds to run, so we need 8
188
return "%8dms" % (1000 * seconds)
190
def _shortened_test_description(self, test):
192
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
195
def startTest(self, test):
    """Note that test is about to run.

    The base unittest result does its own bookkeeping first; then the
    start is reported, the test is stamped with the current value of
    ``self.count`` and the start time is recorded.

    :param test: The test that is starting.
    """
    unittest.TestResult.startTest(self, test)
    self.report_test_start(test)
    # Stamp the test after reporting, so the reporter decides the count.
    setattr(test, 'number', self.count)
    self._recordTestStartTime()
201
def _recordTestStartTime(self):
202
"""Record that a test has started."""
203
self._start_time = time.time()
205
def _cleanupLogFile(self, test):
206
# We can only do this if we have one of our TestCases, not if
208
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
209
if setKeepLogfile is not None:
212
def addError(self, test, err):
213
"""Tell result that test finished with an error.
215
Called from the TestCase run() method when the test
216
fails with an unexpected error.
218
self._testConcluded(test)
219
if isinstance(err[1], TestSkipped):
220
return self._addSkipped(test, err)
221
elif isinstance(err[1], UnavailableFeature):
222
return self.addNotSupported(test, err[1].args[0])
224
unittest.TestResult.addError(self, test, err)
225
self.error_count += 1
226
self.report_error(test, err)
229
self._cleanupLogFile(test)
231
def addFailure(self, test, err):
232
"""Tell result that test failed.
234
Called from the TestCase run() method when the test
235
fails because e.g. an assert() method failed.
237
self._testConcluded(test)
238
if isinstance(err[1], KnownFailure):
239
return self._addKnownFailure(test, err)
241
unittest.TestResult.addFailure(self, test, err)
242
self.failure_count += 1
243
self.report_failure(test, err)
246
self._cleanupLogFile(test)
248
def addSuccess(self, test):
249
"""Tell result that test completed successfully.
251
Called from the TestCase run()
253
self._testConcluded(test)
254
if self._bench_history is not None:
255
benchmark_time = self._extractBenchmarkTime(test)
256
if benchmark_time is not None:
257
self._bench_history.write("%s %s\n" % (
258
self._formatTime(benchmark_time),
260
self.report_success(test)
261
self._cleanupLogFile(test)
262
unittest.TestResult.addSuccess(self, test)
263
test._log_contents = ''
265
def _testConcluded(self, test):
266
"""Common code when a test has finished.
268
Called regardless of whether it succeded, failed, etc.
272
def _addKnownFailure(self, test, err):
273
self.known_failure_count += 1
274
self.report_known_failure(test, err)
276
def addNotSupported(self, test, feature):
277
"""The test will not be run because of a missing feature.
279
# this can be called in two different ways: it may be that the
280
# test started running, and then raised (through addError)
281
# UnavailableFeature. Alternatively this method can be called
282
# while probing for features before running the tests; in that
283
# case we will see startTest and stopTest, but the test will never
285
self.unsupported.setdefault(str(feature), 0)
286
self.unsupported[str(feature)] += 1
287
self.report_unsupported(test, feature)
289
def _addSkipped(self, test, skip_excinfo):
290
if isinstance(skip_excinfo[1], TestNotApplicable):
291
self.not_applicable_count += 1
292
self.report_not_applicable(test, skip_excinfo)
295
self.report_skip(test, skip_excinfo)
298
except KeyboardInterrupt:
301
self.addError(test, test._exc_info())
303
# seems best to treat this as success from point-of-view of unittest
304
# -- it actually does nothing so it barely matters :)
305
unittest.TestResult.addSuccess(self, test)
306
test._log_contents = ''
308
def printErrorList(self, flavour, errors):
309
for test, err in errors:
310
self.stream.writeln(self.separator1)
311
self.stream.write("%s: " % flavour)
312
self.stream.writeln(self.getDescription(test))
313
if getattr(test, '_get_log', None) is not None:
314
self.stream.write('\n')
316
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
317
self.stream.write('\n')
318
self.stream.write(test._get_log())
319
self.stream.write('\n')
321
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
322
self.stream.write('\n')
323
self.stream.writeln(self.separator2)
324
self.stream.writeln("%s" % err)
329
def report_cleaning_up(self):
332
def report_success(self, test):
335
def wasStrictlySuccessful(self):
336
if self.unsupported or self.known_failure_count:
338
return self.wasSuccessful()
341
class TextTestResult(ExtendedTestResult):
342
"""Displays progress and results of tests in text form"""
344
def __init__(self, stream, descriptions, verbosity,
349
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
350
bench_history, num_tests)
352
self.pb = self.ui.nested_progress_bar()
353
self._supplied_pb = False
356
self._supplied_pb = True
357
self.pb.show_pct = False
358
self.pb.show_spinner = False
359
self.pb.show_eta = False,
360
self.pb.show_count = False
361
self.pb.show_bar = False
363
def report_starting(self):
    """Show the initial progress message before the first test runs.

    ``self.num_tests`` may be None when the total is not known (the
    progress prefix code checks for that too), so the total is only
    shown when it is available instead of failing in the ``%d`` format.
    """
    if self.num_tests is None:
        self.pb.update('[test 0] starting...')
    else:
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
366
def _progress_prefix_text(self):
367
# the longer this text, the less space we have to show the test
369
a = '[%d' % self.count # total that have been run
370
# tests skipped as known not to be relevant are not important enough
372
## if self.skip_count:
373
## a += ', %d skip' % self.skip_count
374
## if self.known_failure_count:
375
## a += '+%dX' % self.known_failure_count
376
if self.num_tests is not None:
377
a +='/%d' % self.num_tests
379
runtime = time.time() - self._overall_start_time
381
a += '%dm%ds' % (runtime / 60, runtime % 60)
385
a += ', %d err' % self.error_count
386
if self.failure_count:
387
a += ', %d fail' % self.failure_count
389
a += ', %d missing' % len(self.unsupported)
393
def report_test_start(self, test):
396
self._progress_prefix_text()
398
+ self._shortened_test_description(test))
400
def _test_description(self, test):
401
return self._shortened_test_description(test)
403
def report_error(self, test, err):
404
self.pb.note('ERROR: %s\n %s\n',
405
self._test_description(test),
409
def report_failure(self, test, err):
410
self.pb.note('FAIL: %s\n %s\n',
411
self._test_description(test),
415
def report_known_failure(self, test, err):
    """Emit an XFAIL note naming the test and the failure value.

    :param test: The test that failed in the expected way.
    :param err: The exception information; ``err[1]`` is shown.
    """
    description = self._test_description(test)
    failure = err[1]
    self.pb.note('XFAIL: %s\n%s\n', description, failure)
419
def report_skip(self, test, skip_excinfo):
422
def report_not_applicable(self, test, skip_excinfo):
425
def report_unsupported(self, test, feature):
    """Report nothing when test cannot run because feature is missing."""
428
def report_cleaning_up(self):
    """Tell the progress bar that cleanup is under way."""
    progress = self.pb
    progress.update('cleaning up...')
432
if not self._supplied_pb:
436
class VerboseTestResult(ExtendedTestResult):
437
"""Produce long output, with one line per test run plus times"""
439
def _ellipsize_to_right(self, a_string, final_width):
440
"""Truncate and pad a string, keeping the right hand side"""
441
if len(a_string) > final_width:
442
result = '...' + a_string[3-final_width:]
445
return result.ljust(final_width)
447
def report_starting(self):
    """Write the opening line announcing how many tests will run.

    ``self.num_tests`` may be None when the total is not known, so the
    count is left out in that case instead of failing in the ``%d``
    format.
    """
    if self.num_tests is None:
        self.stream.write('running tests...\n')
    else:
        self.stream.write('running %d tests...\n' % self.num_tests)
450
def report_test_start(self, test):
452
name = self._shortened_test_description(test)
453
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
454
# numbers, plus a trailing blank
455
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
456
self.stream.write(self._ellipsize_to_right(name,
457
osutils.terminal_width()-30))
460
def _error_summary(self, err):
462
return '%s%s' % (indent, err[1])
464
def report_error(self, test, err):
    """Write an ERROR line with the test's timing, then the error summary.

    :param test: The test that raised an error.
    :param err: The exception information for the error.
    """
    timing = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln('ERROR %s\n%s' % (timing, summary))
469
def report_failure(self, test, err):
    """Write a FAIL line with the test's timing, then the error summary.

    :param test: The test that failed.
    :param err: The exception information for the failure.
    """
    timing = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln(' FAIL %s\n%s' % (timing, summary))
474
def report_known_failure(self, test, err):
    """Write an XFAIL line with the test's timing, then the error summary.

    :param test: The test that failed in the expected way.
    :param err: The exception information for the failure.
    """
    timing = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln('XFAIL %s\n%s' % (timing, summary))
479
def report_success(self, test):
480
self.stream.writeln(' OK %s' % self._testTimeString(test))
481
for bench_called, stats in getattr(test, '_benchcalls', []):
482
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
483
stats.pprint(file=self.stream)
484
# flush the stream so that we get smooth output. This verbose mode is
485
# used to show the output in PQM.
488
def report_skip(self, test, skip_excinfo):
    """Write a SKIP line with the test's timing, then the skip summary.

    :param test: The test that was skipped.
    :param skip_excinfo: The exception information for the skip.
    """
    timing = self._testTimeString(test)
    summary = self._error_summary(skip_excinfo)
    self.stream.writeln(' SKIP %s\n%s' % (timing, summary))
493
def report_not_applicable(self, test, skip_excinfo):
    """Write an N/A line with the test's timing, then the skip summary.

    :param test: The test that was not applicable.
    :param skip_excinfo: The exception information for the skip.
    """
    timing = self._testTimeString(test)
    summary = self._error_summary(skip_excinfo)
    self.stream.writeln(' N/A %s\n%s' % (timing, summary))
498
def report_unsupported(self, test, feature):
    """Write a NODEP line saying which feature the test was missing.

    :param test: The test that could not be run.
    :param feature: The feature that is not available.
    """
    timing = self._testTimeString(test)
    self.stream.writeln("NODEP %s\n The feature '%s' is not available."
                        % (timing, feature))
504
class TextTestRunner(object):
505
stop_on_failure = False
514
self.stream = unittest._WritelnDecorator(stream)
515
self.descriptions = descriptions
516
self.verbosity = verbosity
517
self._bench_history = bench_history
518
self.list_only = list_only
521
"Run the given test case or test suite."
522
startTime = time.time()
523
if self.verbosity == 1:
524
result_class = TextTestResult
525
elif self.verbosity >= 2:
526
result_class = VerboseTestResult
527
result = result_class(self.stream,
530
bench_history=self._bench_history,
531
num_tests=test.countTestCases(),
533
result.stop_early = self.stop_on_failure
534
result.report_starting()
536
if self.verbosity >= 2:
537
self.stream.writeln("Listing tests only ...\n")
539
for t in iter_suite_tests(test):
540
self.stream.writeln("%s" % (t.id()))
542
actionTaken = "Listed"
545
run = result.testsRun
547
stopTime = time.time()
548
timeTaken = stopTime - startTime
550
self.stream.writeln(result.separator2)
551
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
552
run, run != 1 and "s" or "", timeTaken))
553
self.stream.writeln()
554
if not result.wasSuccessful():
555
self.stream.write("FAILED (")
556
failed, errored = map(len, (result.failures, result.errors))
558
self.stream.write("failures=%d" % failed)
560
if failed: self.stream.write(", ")
561
self.stream.write("errors=%d" % errored)
562
if result.known_failure_count:
563
if failed or errored: self.stream.write(", ")
564
self.stream.write("known_failure_count=%d" %
565
result.known_failure_count)
566
self.stream.writeln(")")
568
if result.known_failure_count:
569
self.stream.writeln("OK (known_failures=%d)" %
570
result.known_failure_count)
572
self.stream.writeln("OK")
573
if result.skip_count > 0:
574
skipped = result.skip_count
575
self.stream.writeln('%d test%s skipped' %
576
(skipped, skipped != 1 and "s" or ""))
577
if result.unsupported:
578
for feature, count in sorted(result.unsupported.items()):
579
self.stream.writeln("Missing feature '%s' skipped %d tests." %
585
def iter_suite_tests(suite):
586
"""Return all tests in a suite, recursing through nested suites"""
587
for item in suite._tests:
588
if isinstance(item, unittest.TestCase):
590
elif isinstance(item, unittest.TestSuite):
591
for r in iter_suite_tests(item):
594
raise Exception('unknown object %r inside test suite %r'
598
class TestSkipped(Exception):
    """Raised to signal that a test was deliberately skipped.

    A skip is intentional and is not treated as a failure.
    """
602
class TestNotApplicable(TestSkipped):
603
"""A test is not applicable to the situation where it was run.
605
This is only normally raised by parameterized tests, if they find that
606
the instance they're constructed upon does not support one aspect
611
class KnownFailure(AssertionError):
612
"""Indicates that a test failed in a precisely expected manner.
614
Such failures dont block the whole test suite from passing because they are
615
indicators of partially completed code or of future work. We have an
616
explicit error for them so that we can ensure that they are always visible:
617
KnownFailures are always shown in the output of bzr selftest.
621
class UnavailableFeature(Exception):
622
"""A feature required for this test was not available.
624
The feature should be used to construct the exception.
628
class CommandFailed(Exception):
632
class StringIOWrapper(object):
633
"""A wrapper around cStringIO which just adds an encoding attribute.
635
Internally we can check sys.stdout to see what the output encoding
636
should be. However, cStringIO has no encoding attribute that we can
637
set. So we wrap it instead.
642
def __init__(self, s=None):
644
self.__dict__['_cstring'] = StringIO(s)
646
self.__dict__['_cstring'] = StringIO()
648
def __getattr__(self, name, getattr=getattr):
649
return getattr(self.__dict__['_cstring'], name)
651
def __setattr__(self, name, val):
652
if name == 'encoding':
653
self.__dict__['encoding'] = val
655
return setattr(self._cstring, name, val)
658
class TestUIFactory(ui.CLIUIFactory):
659
"""A UI Factory for testing.
661
Hide the progress bar but emit note()s.
663
Allows get_password to be tested without real tty attached.
670
super(TestUIFactory, self).__init__()
671
if stdin is not None:
672
# We use a StringIOWrapper to be able to test various
673
# encodings, but the user is still responsible to
674
# encode the string and to set the encoding attribute
675
# of StringIOWrapper.
676
self.stdin = StringIOWrapper(stdin)
678
self.stdout = sys.stdout
682
self.stderr = sys.stderr
687
"""See progress.ProgressBar.clear()."""
689
def clear_term(self):
    """See progress.ProgressBar.clear_term().

    Does nothing: this UI factory hides the progress bar.
    """
696
"""See progress.ProgressBar.finished()."""
698
def note(self, fmt_string, *args, **kwargs):
    """See progress.ProgressBar.note().

    Writes fmt_string, interpolated with the positional args and followed
    by a newline, to ``self.stdout``. Keyword arguments are accepted but
    not used.
    """
    template = fmt_string + "\n"
    self.stdout.write(template % args)
702
def progress_bar(self):
705
def nested_progress_bar(self):
708
def update(self, message, count=None, total=None):
    """See progress.ProgressBar.update(); nothing is displayed here."""
711
def get_non_echoed_password(self, prompt):
712
"""Get password from stdin without trying to handle the echo mode"""
714
self.stdout.write(prompt.encode(self.stdout.encoding, 'replace'))
715
password = self.stdin.readline()
718
if password[-1] == '\n':
719
password = password[:-1]
723
def _report_leaked_threads():
    """Warn about the thread leaks recorded on TestCase.

    Names the first test recorded as leaking threads and the number of
    leaking tests, both read from class attributes of TestCase.
    """
    first_leaker = TestCase._first_thread_leaker_id
    leaking_tests = TestCase._leaking_threads_tests
    bzrlib.trace.warning('%s is leaking threads among %d leaking tests',
                         first_leaker, leaking_tests)
729
class TestCase(unittest.TestCase):
730
"""Base class for bzr unit tests.
732
Tests that need access to disk resources should subclass
733
TestCaseInTempDir not TestCase.
735
Error and debug log messages are redirected from their usual
736
location into a temporary file, the contents of which can be
737
retrieved by _get_log(). We use a real OS file, not an in-memory object,
738
so that it can also capture file IO. When the test completes this file
739
is read into memory and removed from disk.
741
There are also convenience functions to invoke bzr's command-line
742
routine, and to build and check bzr trees.
744
In addition to the usual method of overriding tearDown(), this class also
745
allows subclasses to register functions into the _cleanups list, which is
746
run in order as the object is torn down. It's less likely this will be
747
accidentally overlooked.
750
_active_threads = None
751
_leaking_threads_tests = 0
752
_first_thread_leaker_id = None
753
_log_file_name = None
755
_keep_log_file = False
756
# record lsprof data when performing benchmark calls.
757
_gather_lsprof_in_benchmarks = False
758
attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
759
'_log_contents', '_log_file_name', '_benchtime',
760
'_TestCase__testMethodName')
762
def __init__(self, methodName='testMethod'):
763
super(TestCase, self).__init__(methodName)
767
unittest.TestCase.setUp(self)
768
self._cleanEnvironment()
771
self._benchcalls = []
772
self._benchtime = None
774
self._clear_debug_flags()
775
TestCase._active_threads = threading.activeCount()
776
self.addCleanup(self._check_leaked_threads)
778
def _check_leaked_threads(self):
779
active = threading.activeCount()
780
leaked_threads = active - TestCase._active_threads
781
TestCase._active_threads = active
783
TestCase._leaking_threads_tests += 1
784
if TestCase._first_thread_leaker_id is None:
785
TestCase._first_thread_leaker_id = self.id()
786
# we're not specifically told when all tests are finished.
787
# This will do. We use a function to avoid keeping a reference
788
# to a TestCase object.
789
atexit.register(_report_leaked_threads)
791
def _clear_debug_flags(self):
792
"""Prevent externally set debug flags affecting tests.
794
Tests that want to use debug flags can just set them in the
795
debug_flags set during setup/teardown.
797
self._preserved_debug_flags = set(debug.debug_flags)
798
if 'allow_debug' not in selftest_debug_flags:
799
debug.debug_flags.clear()
800
self.addCleanup(self._restore_debug_flags)
802
def _clear_hooks(self):
803
# prevent hooks affecting tests
805
import bzrlib.smart.client
806
import bzrlib.smart.server
807
self._preserved_hooks = {
808
bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
809
bzrlib.mutabletree.MutableTree: bzrlib.mutabletree.MutableTree.hooks,
810
bzrlib.smart.client._SmartClient: bzrlib.smart.client._SmartClient.hooks,
811
bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
812
bzrlib.commands.Command: bzrlib.commands.Command.hooks,
814
self.addCleanup(self._restoreHooks)
815
# reset all hooks to an empty instance of the appropriate type
816
bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
817
bzrlib.smart.client._SmartClient.hooks = bzrlib.smart.client.SmartClientHooks()
818
bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
819
bzrlib.commands.Command.hooks = bzrlib.commands.CommandHooks()
821
def _silenceUI(self):
822
"""Turn off UI for duration of test"""
823
# by default the UI is off; tests can turn it on if they want it.
824
saved = ui.ui_factory
826
ui.ui_factory = saved
827
ui.ui_factory = ui.SilentUIFactory()
828
self.addCleanup(_restore)
830
def _ndiff_strings(self, a, b):
831
"""Return ndiff between two strings containing lines.
833
A trailing newline is added if missing to make the strings
835
if b and b[-1] != '\n':
837
if a and a[-1] != '\n':
839
difflines = difflib.ndiff(a.splitlines(True),
841
linejunk=lambda x: False,
842
charjunk=lambda x: False)
843
return ''.join(difflines)
845
def assertEqual(self, a, b, message=''):
849
except UnicodeError, e:
850
# If we can't compare without getting a UnicodeError, then
851
# obviously they are different
852
mutter('UnicodeError: %s', e)
855
raise AssertionError("%snot equal:\na = %s\nb = %s\n"
857
pformat(a), pformat(b)))
859
assertEquals = assertEqual
861
def assertEqualDiff(self, a, b, message=None):
862
"""Assert two texts are equal, if not raise an exception.
864
This is intended for use with multi-line strings where it can
865
be hard to find the differences by eye.
867
# TODO: perhaps override assertEquals to call this for strings?
871
message = "texts not equal:\n"
873
message = 'first string is missing a final newline.\n'
875
message = 'second string is missing a final newline.\n'
876
raise AssertionError(message +
877
self._ndiff_strings(a, b))
879
def assertEqualMode(self, mode, mode_test):
    """Assert that two file modes are equal, reporting both in octal.

    :param mode: The first mode.
    :param mode_test: The second mode.
    """
    mismatch = 'mode mismatch %o != %o' % (mode, mode_test)
    self.assertEqual(mode, mode_test, mismatch)
883
def assertEqualStat(self, expected, actual):
884
"""assert that expected and actual are the same stat result.
886
:param expected: A stat result.
887
:param actual: A stat result.
888
:raises AssertionError: If the expected and actual stat values differ
891
self.assertEqual(expected.st_size, actual.st_size)
892
self.assertEqual(expected.st_mtime, actual.st_mtime)
893
self.assertEqual(expected.st_ctime, actual.st_ctime)
894
self.assertEqual(expected.st_dev, actual.st_dev)
895
self.assertEqual(expected.st_ino, actual.st_ino)
896
self.assertEqual(expected.st_mode, actual.st_mode)
898
def assertPositive(self, val):
    """Assert that val is greater than 0."""
    is_positive = val > 0
    self.assertTrue(is_positive,
                    'expected a positive value, but got %s' % val)
902
def assertNegative(self, val):
    """Assert that val is less than 0."""
    is_negative = val < 0
    self.assertTrue(is_negative,
                    'expected a negative value, but got %s' % val)
906
def assertStartsWith(self, s, prefix):
    """Assert that s starts with prefix.

    :raises AssertionError: If s does not begin with prefix.
    """
    if s.startswith(prefix):
        return
    raise AssertionError('string %r does not start with %r' % (s, prefix))
910
def assertEndsWith(self, s, suffix):
    """Assert that s ends with suffix.

    :raises AssertionError: If s does not finish with suffix.
    """
    if s.endswith(suffix):
        return
    raise AssertionError('string %r does not end with %r' % (s, suffix))
915
def assertContainsRe(self, haystack, needle_re, flags=0):
    """Assert that haystack contains something matching needle_re.

    :param haystack: The string to search.
    :param needle_re: The regular expression to look for.
    :param flags: Flags passed to ``re.search``.
    :raises AssertionError: If nothing in haystack matches.
    """
    if re.search(needle_re, haystack, flags):
        return
    if '\n' in haystack or len(haystack) > 60:
        # a long string, format it in a more readable way
        template = 'pattern "%s" not found in\n"""\\\n%s"""\n'
    else:
        template = 'pattern "%s" not found in "%s"'
    raise AssertionError(template % (needle_re, haystack))
927
def assertNotContainsRe(self, haystack, needle_re, flags=0):
    """Assert that nothing in haystack matches needle_re.

    :param haystack: The string to search.
    :param needle_re: The regular expression that must not be found.
    :param flags: Flags passed to ``re.search``.
    :raises AssertionError: If a match is found.
    """
    match = re.search(needle_re, haystack, flags)
    if not match:
        return
    raise AssertionError('pattern "%s" found in "%s"'
                         % (needle_re, haystack))
933
def assertSubset(self, sublist, superlist):
934
"""Assert that every entry in sublist is present in superlist."""
935
missing = set(sublist) - set(superlist)
937
raise AssertionError("value(s) %r not present in container %r" %
938
(missing, superlist))
940
def assertListRaises(self, excClass, func, *args, **kwargs):
941
"""Fail unless excClass is raised when the iterator from func is used.
943
Many functions can return generators this makes sure
944
to wrap them in a list() call to make sure the whole generator
945
is run, and that the proper exception is raised.
948
list(func(*args, **kwargs))
952
if getattr(excClass,'__name__', None) is not None:
953
excName = excClass.__name__
955
excName = str(excClass)
956
raise self.failureException, "%s not raised" % excName
958
def assertRaises(self, excClass, callableObj, *args, **kwargs):
959
"""Assert that a callable raises a particular exception.
961
:param excClass: As for the except statement, this may be either an
962
exception class, or a tuple of classes.
963
:param callableObj: A callable, will be passed ``*args`` and
966
Returns the exception so that you can examine it.
969
callableObj(*args, **kwargs)
973
if getattr(excClass,'__name__', None) is not None:
974
excName = excClass.__name__
977
excName = str(excClass)
978
raise self.failureException, "%s not raised" % excName
980
def assertIs(self, left, right, message=None):
    """Assert that left and right are the same object.

    :param message: Optional text for the failure; when None a default
        naming both objects is used.
    :raises AssertionError: If left is not right.
    """
    if left is right:
        return
    if message is None:
        message = "%r is not %r." % (left, right)
    raise AssertionError(message)
987
def assertIsNot(self, left, right, message=None):
989
if message is not None:
990
raise AssertionError(message)
992
raise AssertionError("%r is %r." % (left, right))
994
def assertTransportMode(self, transport, path, mode):
995
"""Fail if a path does not have mode mode.
997
If modes are not supported on this transport, the assertion is ignored.
999
if not transport._can_roundtrip_unix_modebits():
1001
path_stat = transport.stat(path)
1002
actual_mode = stat.S_IMODE(path_stat.st_mode)
1003
self.assertEqual(mode, actual_mode,
1004
'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
1006
def assertIsSameRealPath(self, path1, path2):
    """Fail if path1 and path2 point to different files.

    Both paths are passed through ``osutils.realpath`` before being
    compared; the failure message shows the paths as they were given.
    """
    real1 = osutils.realpath(path1)
    real2 = osutils.realpath(path2)
    message = "apparent paths:\na = %s\nb = %s\n," % (path1, path2)
    self.assertEqual(real1, real2, message)
1012
def assertIsInstance(self, obj, kls):
    """Fail if obj is not an instance of kls.

    The failure message names obj, its actual class and the expected class.
    """
    if isinstance(obj, kls):
        return
    actual = obj.__class__
    self.fail("%r is an instance of %s rather than %s" % (obj, actual, kls))
1018
def expectFailure(self, reason, assertion, *args, **kwargs):
1019
"""Invoke a test, expecting it to fail for the given reason.
1021
This is for assertions that ought to succeed, but currently fail.
1022
(The failure is *expected* but not *wanted*.) Please be very precise
1023
about the failure you're expecting. If a new bug is introduced,
1024
AssertionError should be raised, not KnownFailure.
1026
Frequently, expectFailure should be followed by an opposite assertion.
1029
Intended to be used with a callable that raises AssertionError as the
1030
'assertion' parameter. args and kwargs are passed to the 'assertion'.
1032
Raises KnownFailure if the test fails. Raises AssertionError if the
1037
self.expectFailure('Math is broken', self.assertNotEqual, 54,
1039
self.assertEqual(42, dynamic_val)
1041
This means that a dynamic_val of 54 will cause the test to raise
1042
a KnownFailure. Once math is fixed and the expectFailure is removed,
1043
only a dynamic_val of 42 will allow the test to pass. Anything other
1044
than 54 or 42 will cause an AssertionError.
1047
assertion(*args, **kwargs)
1048
except AssertionError:
1049
raise KnownFailure(reason)
1051
self.fail('Unexpected success. Should have failed: %s' % reason)
1053
def assertFileEqual(self, content, path):
1054
"""Fail if path does not contain 'content'."""
1055
self.failUnlessExists(path)
1056
f = file(path, 'rb')
1061
self.assertEqualDiff(content, s)
1063
def failUnlessExists(self, path):
1064
"""Fail unless path or paths, which may be abs or relative, exist."""
1065
if not isinstance(path, basestring):
1067
self.failUnlessExists(p)
1069
self.failUnless(osutils.lexists(path),path+" does not exist")
1071
def failIfExists(self, path):
1072
"""Fail if path or paths, which may be abs or relative, exist."""
1073
if not isinstance(path, basestring):
1075
self.failIfExists(p)
1077
self.failIf(osutils.lexists(path),path+" exists")
1079
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1080
"""A helper for callDeprecated and applyDeprecated.
1082
:param a_callable: A callable to call.
1083
:param args: The positional arguments for the callable
1084
:param kwargs: The keyword arguments for the callable
1085
:return: A tuple (warnings, result). result is the result of calling
1086
a_callable(``*args``, ``**kwargs``).
1089
def capture_warnings(msg, cls=None, stacklevel=None):
1090
# we've hooked into a deprecation specific callpath,
1091
# only deprecations should getting sent via it.
1092
self.assertEqual(cls, DeprecationWarning)
1093
local_warnings.append(msg)
1094
original_warning_method = symbol_versioning.warn
1095
symbol_versioning.set_warning_method(capture_warnings)
1097
result = a_callable(*args, **kwargs)
1099
symbol_versioning.set_warning_method(original_warning_method)
1100
return (local_warnings, result)
1102
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
    """Call a deprecated callable without warning the user.

    Note that this only captures warnings raised by symbol_versioning.warn,
    not other callers that go direct to the warning module.

    To test that a deprecated method raises an error, wrap the call in
    assertRaises, passing self.applyDeprecated as the callable followed
    by the deprecation format, the deprecated callable and its arguments.

    :param deprecation_format: The deprecation format that the callable
        should have been deprecated with. This is the same type as the
        parameter to deprecated_method/deprecated_function. If the
        callable is not deprecated with this format, an assertion error
        is raised.
    :param a_callable: A callable to call. This may be a bound method or
        a regular function. It will be called with ``*args`` and
        ``**kwargs``.
    :param args: The positional arguments for the callable
    :param kwargs: The keyword arguments for the callable
    :return: The result of a_callable(``*args``, ``**kwargs``)
    """
    call_warnings, result = self._capture_deprecation_warnings(
        a_callable, *args, **kwargs)
    expected_first_warning = symbol_versioning.deprecation_string(
        a_callable, deprecation_format)
    if not call_warnings:
        self.fail("No deprecation warning generated by call to %s" %
            a_callable)
    # Only the first warning is checked; later ones are ignored.
    self.assertEqual(expected_first_warning, call_warnings[0])
    return result
1139
def callCatchWarnings(self, fn, *args, **kw):
    """Call a callable that raises python warnings.

    The caller's responsible for examining the returned warnings.

    If the callable raises an exception, the exception is not
    caught and propagates up to the caller. In that case, the list
    of warnings is not available.

    :param fn: The callable to invoke with ``*args`` and ``**kw``.
    :returns: ([warning_object, ...], fn_result)
    """
    # XXX: This is not perfect, because it completely overrides the
    # warnings filters, and some code may depend on suppressing particular
    # warnings. It's the easiest way to insulate ourselves from -Werror,
    # though.
    wlist = []

    def _catcher(message, category, filename, lineno, file=None, line=None):
        # despite the name, 'message' is normally(?) a Warning subclass
        # instance
        wlist.append(message)

    saved_showwarning = warnings.showwarning
    saved_filters = warnings.filters
    try:
        warnings.showwarning = _catcher
        warnings.filters = []
        result = fn(*args, **kw)
    finally:
        # Restore the global warnings state whatever happened in fn.
        warnings.showwarning = saved_showwarning
        warnings.filters = saved_filters
    return wlist, result
1170
def callDeprecated(self, expected, callable, *args, **kwargs):
    """Assert that a callable is deprecated in a particular way.

    This is a very precise test for unusual requirements. The
    applyDeprecated helper function is probably more suited for most tests
    as it allows you to simply specify the deprecation format being used
    and will ensure that that is issued for the function being called.

    Note that this only captures warnings raised by symbol_versioning.warn,
    not other callers that go direct to the warning module.  To catch
    general warnings, use callCatchWarnings.

    :param expected: a list of the deprecation warnings expected, in order
    :param callable: The callable to call
    :param args: The positional arguments for the callable
    :param kwargs: The keyword arguments for the callable
    :return: The result of callable(``*args``, ``**kwargs``)
    """
    call_warnings, result = self._capture_deprecation_warnings(
        callable, *args, **kwargs)
    # The whole list must match: same warnings, same order.
    self.assertEqual(expected, call_warnings)
    return result
1192
def _startLogFile(self):
    """Send bzr and test log messages to a temporary file.

    The file is removed as the test is torn down (see _finishLogFile,
    which is registered here as a cleanup).
    """
    fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
    self._log_file = os.fdopen(fileno, 'w+')
    # The memento is what pop_log_file needs to undo this push later.
    self._log_memento = bzrlib.trace.push_log_file(self._log_file)
    self._log_file_name = name
    self.addCleanup(self._finishLogFile)
1203
def _finishLogFile(self):
1204
"""Finished with the log file.
1206
Close the file and delete it, unless setKeepLogfile was called.
1208
if self._log_file is None:
1210
bzrlib.trace.pop_log_file(self._log_memento)
1211
self._log_file.close()
1212
self._log_file = None
1213
if not self._keep_log_file:
1214
os.remove(self._log_file_name)
1215
self._log_file_name = None
1217
def setKeepLogfile(self):
    """Make the logfile not be deleted when _finishLogFile is called."""
    self._keep_log_file = True
1221
def addCleanup(self, callable, *args, **kwargs):
    """Arrange to run a callable when this case is torn down.

    Callables are run in the reverse of the order they are registered,
    ie last-in first-out.

    :param callable: The cleanup to run.
    :param args: Positional arguments passed to the cleanup.
    :param kwargs: Keyword arguments passed to the cleanup.
    """
    entry = (callable, args, kwargs)
    self._cleanups.append(entry)
1229
def _cleanEnvironment(self):
1231
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1232
'HOME': os.getcwd(),
1233
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1234
# tests do check our impls match APPDATA
1235
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1237
'BZREMAIL': None, # may still be present in the environment
1239
'BZR_PROGRESS_BAR': None,
1241
'BZR_PLUGIN_PATH': None,
1243
'SSH_AUTH_SOCK': None,
1247
'https_proxy': None,
1248
'HTTPS_PROXY': None,
1253
# Nobody cares about these ones AFAIK. So far at
1254
# least. If you do (care), please update this comment
1258
'BZR_REMOTE_PATH': None,
1261
self.addCleanup(self._restoreEnvironment)
1262
for name, value in new_env.iteritems():
1263
self._captureVar(name, value)
1265
def _captureVar(self, name, newvalue):
    """Set an environment variable, and reset it when finished.

    :param name: Name of the environment variable.
    :param newvalue: Value handed to osutils.set_or_unset_env.
    """
    # Remember whatever set_or_unset_env reports so that
    # _restoreEnvironment can put it back.
    previous = osutils.set_or_unset_env(name, newvalue)
    self.__old_env[name] = previous
1269
def _restore_debug_flags(self):
    """Reset debug.debug_flags to the flags preserved for this test."""
    flags = debug.debug_flags
    # Mutate in place so other references to the flag set stay valid.
    flags.clear()
    flags.update(self._preserved_debug_flags)
1273
def _restoreEnvironment(self):
    """Put back every environment variable recorded by _captureVar."""
    for name, value in self.__old_env.iteritems():
        osutils.set_or_unset_env(name, value)
1277
def _restoreHooks(self):
1278
for klass, hooks in self._preserved_hooks.items():
1279
setattr(klass, 'hooks', hooks)
1281
def knownFailure(self, reason):
    """This test has failed for some known reason.

    :param reason: Explanation carried by the exception.
    :raises KnownFailure: Always.
    """
    raise KnownFailure(reason)
1285
def run(self, result=None):
    """Run the test, short-circuiting when a needed feature is missing.

    If any feature listed in ``_test_needs_features`` is unavailable the
    test body is not run: the result is told the test is unsupported (or,
    for results without ``addNotSupported``, that it succeeded).

    After a real run, the instance ``__dict__`` is reduced to the
    attributes named in ``attrs_to_keep``.

    :param result: The TestResult to report to; a default one is created
        when omitted.
    """
    if result is None:
        result = self.defaultTestResult()
    for feature in getattr(self, '_test_needs_features', []):
        if feature.available():
            continue
        result.startTest(self)
        if getattr(result, 'addNotSupported', None):
            result.addNotSupported(self, feature)
        else:
            result.addSuccess(self)
        result.stopTest(self)
        return
    try:
        return unittest.TestCase.run(self, result)
    finally:
        # Drop everything the test stored on itself except the attributes
        # explicitly marked for keeping.
        saved_attrs = {}
        absent_attr = object()
        for attr_name in self.attrs_to_keep:
            attr = getattr(self, attr_name, absent_attr)
            if attr is not absent_attr:
                saved_attrs[attr_name] = attr
        self.__dict__ = saved_attrs
1309
unittest.TestCase.tearDown(self)
1311
def time(self, callable, *args, **kwargs):
1312
"""Run callable and accrue the time it takes to the benchmark time.
1314
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
1315
this will cause lsprofile statistics to be gathered and stored in
1318
if self._benchtime is None:
1322
if not self._gather_lsprof_in_benchmarks:
1323
return callable(*args, **kwargs)
1325
# record this benchmark
1326
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
1328
self._benchcalls.append(((callable, args, kwargs), stats))
1331
self._benchtime += time.time() - start
1333
def _runCleanups(self):
1334
"""Run registered cleanup functions.
1336
This should only be called from TestCase.tearDown.
1338
# TODO: Perhaps this should keep running cleanups even if
1339
# one of them fails?
1341
# Actually pop the cleanups from the list so tearDown running
1342
# twice is safe (this happens for skipped tests).
1343
while self._cleanups:
1344
cleanup, args, kwargs = self._cleanups.pop()
1345
cleanup(*args, **kwargs)
1347
def log(self, *args):
1350
def _get_log(self, keep_log_file=False):
    """Get the log from bzrlib.trace calls from this test.

    :param keep_log_file: When True, if the log is still a file on disk
        leave it as a file on disk. When False, if the log is still a file
        on disk, the log file is deleted and the log preserved as
        self._log_contents.
    :return: A string containing the log.
    """
    # flush the log file, to get all content
    if bzrlib.trace._trace_file:
        bzrlib.trace._trace_file.flush()
    if self._log_contents:
        # XXX: this can hardly contain the content flushed above --vila
        return self._log_contents
    if self._log_file_name is None:
        return "DELETED log file to reduce memory footprint"
    logfile = open(self._log_file_name)
    try:
        log_contents = logfile.read()
    finally:
        logfile.close()
    if not keep_log_file:
        self._log_contents = log_contents
        try:
            os.remove(self._log_file_name)
        except OSError:
            e = sys.exc_info()[1]
            # On win32 the file may still be held open; report it and
            # carry on rather than failing the test.
            if sys.platform == 'win32' and e.errno == errno.EACCES:
                sys.stderr.write(('Unable to delete log file '
                                  ' %r\n' % self._log_file_name))
            else:
                raise
    return log_contents
1387
def requireFeature(self, feature):
    """This test requires a specific feature is available.

    :param feature: An object whose ``available()`` method says whether
        the feature can be used.
    :raises UnavailableFeature: When feature is not available.
    """
    if feature.available():
        return
    raise UnavailableFeature(feature)
1395
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1397
"""Run bazaar command line, splitting up a string command line."""
1398
if isinstance(args, basestring):
1399
# shlex don't understand unicode strings,
1400
# so args should be plain string (bialix 20070906)
1401
args = list(shlex.split(str(args)))
1402
return self._run_bzr_core(args, retcode=retcode,
1403
encoding=encoding, stdin=stdin, working_dir=working_dir,
1406
def _run_bzr_core(self, args, retcode, encoding, stdin,
        working_dir):
    """Run bzr in-process with captured output and return (out, err).

    :param args: List of command-line arguments (no program name).
    :param retcode: Expected return code, or None to skip the check.
    :param encoding: Encoding set on the captured streams; defaults to
        the user encoding when None.
    :param stdin: Passed to the TestUIFactory as its stdin.
    :param working_dir: Directory to chdir into for the duration of the
        command, or None to stay put.
    :return: (out, err) as captured from the command.
    """
    if encoding is None:
        encoding = osutils.get_user_encoding()
    stdout = StringIOWrapper()
    stderr = StringIOWrapper()
    stdout.encoding = encoding
    stderr.encoding = encoding

    self.log('run bzr: %r', args)
    # FIXME: don't call into logging here
    handler = logging.StreamHandler(stderr)
    handler.setLevel(logging.INFO)
    logger = logging.getLogger('')
    logger.addHandler(handler)
    old_ui_factory = ui.ui_factory
    ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)

    cwd = None
    if working_dir is not None:
        cwd = osutils.getcwd()
        os.chdir(working_dir)

    try:
        result = self.apply_redirected(ui.ui_factory.stdin,
            stdout, stderr,
            bzrlib.commands.run_bzr_catch_user_errors,
            args)
    finally:
        # Undo every piece of global state touched above.
        logger.removeHandler(handler)
        ui.ui_factory = old_ui_factory
        if cwd is not None:
            os.chdir(cwd)

    out = stdout.getvalue()
    err = stderr.getvalue()
    if out:
        self.log('output:\n%r', out)
    if err:
        self.log('errors:\n%r', err)
    if retcode is not None:
        self.assertEquals(retcode, result,
                          message='Unexpected return code')
    return out, err
1451
def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
            working_dir=None, error_regexes=(), output_encoding=None):
    """Invoke bzr, as if it were run from the command line.

    The argument list should not include the bzr program name - the
    first argument is normally the bzr command.  Arguments may be
    passed in two ways:

    1- A list of strings, eg ["commit", "a"].  This is recommended
    when the command contains whitespace or metacharacters, or
    is built up at run time.

    2- A single string, eg "add a".  This is the most convenient
    for hardcoded commands.

    This runs bzr through the interface that catches and reports
    errors, and with logging set to something approximating the
    default, so that error reporting can be checked.

    This should be the main method for tests that want to exercise the
    overall behavior of the bzr application (rather than a unit test
    or a functional test of the library.)

    This sends the stdout/stderr results into the test's log,
    where it may be useful for debugging.

    :keyword stdin: A string to be used as stdin for the command.
    :keyword retcode: The status code the command should return;
        default 0.
    :keyword working_dir: The directory to run the command in
    :keyword error_regexes: A sequence of expected error messages.  If
        specified they must be seen in the error output of the command.
    :keyword output_encoding: Accepted for compatibility; not used here.
    :return: (out, err)
    """
    out, err = self._run_bzr_autosplit(
        args,
        retcode=retcode,
        encoding=encoding,
        stdin=stdin,
        working_dir=working_dir,
        )
    for regex in error_regexes:
        self.assertContainsRe(err, regex)
    return out, err
1495
def run_bzr_error(self, error_regexes, *args, **kwargs):
    """Run bzr, and check that stderr contains the supplied regexes

    :param error_regexes: Sequence of regular expressions which
        must each be found in the error output. The relative ordering
        is not enforced.
    :param args: command-line arguments for bzr
    :param kwargs: Keyword arguments which are interpreted by run_bzr
        This function changes the default value of retcode to be 3,
        since in most cases this is run when you expect bzr to fail.

    :return: (out, err) The actual output of running the command (in case
        you want to do more inspection)

    Examples of use::

        # Make sure that commit is failing because there is nothing to do
        self.run_bzr_error(['no changes to commit'],
                           ['commit', '-m', 'my commit comment'])
        # Make sure --strict is handling an unknown file, rather than
        # giving us the 'nothing to do' error
        self.build_tree(['unknown'])
        self.run_bzr_error(['Commit refused because there are unknown files'],
                           ['commit', '--strict', '-m', 'my commit comment'])
    """
    # An explicit retcode from the caller wins over the default of 3.
    kwargs.setdefault('retcode', 3)
    kwargs['error_regexes'] = error_regexes
    out, err = self.run_bzr(*args, **kwargs)
    return out, err
1525
def run_bzr_subprocess(self, *args, **kwargs):
    """Run bzr in a subprocess for testing.

    This starts a new Python interpreter and runs bzr in there.
    This should only be used for tests that have a justifiable need for
    this isolation: e.g. they are testing startup time, or signal
    handling, or early startup code, etc.  Subprocess code can't be
    profiled or debugged so easily.

    :param args: Exactly one positional argument: either a list of
        arguments or a single command-line string.
    :keyword retcode: The status code that is expected.  Defaults to 0.  If
        None is supplied, the status code is not checked.
    :keyword env_changes: A dictionary which lists changes to environment
        variables. A value of None will unset the env variable.
        The values must be strings. The change will only occur in the
        child, so you don't need to fix the environment after running.
    :keyword universal_newlines: Convert CRLF => LF
    :keyword allow_plugins: By default the subprocess is run with
        --no-plugins to ensure test reproducibility. Also, it is possible
        for system-wide plugins to create unexpected output on stderr,
        which can cause unnecessary test failures.
    :raises ValueError: If the arguments are spread over several
        positional parameters.
    """
    env_changes = kwargs.get('env_changes', {})
    working_dir = kwargs.get('working_dir', None)
    allow_plugins = kwargs.get('allow_plugins', False)
    if len(args) != 1:
        raise ValueError("passing varargs to run_bzr_subprocess")
    if isinstance(args[0], list):
        args = args[0]
    elif isinstance(args[0], basestring):
        args = list(shlex.split(args[0]))
    process = self.start_bzr_subprocess(args, env_changes=env_changes,
                                        working_dir=working_dir,
                                        allow_plugins=allow_plugins)
    # We distinguish between retcode=None and retcode not passed.
    supplied_retcode = kwargs.get('retcode', 0)
    return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
        universal_newlines=kwargs.get('universal_newlines', False),
        process_args=args)
1565
def start_bzr_subprocess(self, process_args, env_changes=None,
                         skip_if_plan_to_signal=False,
                         working_dir=None,
                         allow_plugins=False):
    """Start bzr in a subprocess for testing.

    This starts a new Python interpreter and runs bzr in there.
    This should only be used for tests that have a justifiable need for
    this isolation: e.g. they are testing startup time, or signal
    handling, or early startup code, etc.  Subprocess code can't be
    profiled or debugged so easily.

    :param process_args: a list of arguments to pass to the bzr executable,
        for example ``['--version']``.
    :param env_changes: A dictionary which lists changes to environment
        variables. A value of None will unset the env variable.
        The values must be strings. The change will only occur in the
        child, so you don't need to fix the environment after running.
    :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
        is not available.
    :param working_dir: Directory to start the child in, or None.
    :param allow_plugins: If False (default) pass --no-plugins to bzr.

    :returns: Popen object for the started process.
    """
    if skip_if_plan_to_signal:
        if not getattr(os, 'kill', None):
            raise TestSkipped("os.kill not available.")

    if env_changes is None:
        env_changes = {}
    old_env = {}

    def cleanup_environment():
        for env_var, value in env_changes.iteritems():
            old_env[env_var] = osutils.set_or_unset_env(env_var, value)

    def restore_environment():
        for env_var, value in old_env.iteritems():
            osutils.set_or_unset_env(env_var, value)

    bzr_path = self.get_bzr_path()

    cwd = None
    if working_dir is not None:
        cwd = osutils.getcwd()
        os.chdir(working_dir)

    try:
        # win32 subprocess doesn't support preexec_fn
        # so we will avoid using it on all platforms, just to
        # make sure the code path is used, and we don't break on win32
        cleanup_environment()
        command = [sys.executable]
        # frozen executables don't need the path to bzr
        if getattr(sys, "frozen", None) is None:
            command.append(bzr_path)
        if not allow_plugins:
            command.append('--no-plugins')
        command.extend(process_args)
        process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
    finally:
        # The parent's environment and cwd were only changed so the child
        # would inherit them; put both back.
        restore_environment()
        if cwd is not None:
            os.chdir(cwd)

    return process
1632
def _popen(self, *args, **kwargs):
1633
"""Place a call to Popen.
1635
Allows tests to override this method to intercept the calls made to
1636
Popen for introspection.
1638
return Popen(*args, **kwargs)
1640
def get_bzr_path(self):
    """Return the path of the 'bzr' executable for this test suite."""
    # Look for a 'bzr' script next to the bzrlib package first.
    source_root = os.path.dirname(os.path.dirname(bzrlib.__file__))
    bzr_path = source_root + '/bzr'
    if not os.path.isfile(bzr_path):
        # We are probably installed. Assume sys.argv is the right file
        bzr_path = sys.argv[0]
    return bzr_path
1648
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
1649
universal_newlines=False, process_args=None):
1650
"""Finish the execution of process.
1652
:param process: the Popen object returned from start_bzr_subprocess.
1653
:param retcode: The status code that is expected. Defaults to 0. If
1654
None is supplied, the status code is not checked.
1655
:param send_signal: an optional signal to send to the process.
1656
:param universal_newlines: Convert CRLF => LF
1657
:returns: (stdout, stderr)
1659
if send_signal is not None:
1660
os.kill(process.pid, send_signal)
1661
out, err = process.communicate()
1663
if universal_newlines:
1664
out = out.replace('\r\n', '\n')
1665
err = err.replace('\r\n', '\n')
1667
if retcode is not None and retcode != process.returncode:
1668
if process_args is None:
1669
process_args = "(unknown args)"
1670
mutter('Output of bzr %s:\n%s', process_args, out)
1671
mutter('Error for bzr %s:\n%s', process_args, err)
1672
self.fail('Command bzr %s failed with retcode %s != %s'
1673
% (process_args, retcode, process.returncode))
1676
def check_inventory_shape(self, inv, shape):
1677
"""Compare an inventory to a list of expected names.
1679
Fail if they are not precisely equal.
1682
shape = list(shape) # copy
1683
for path, ie in inv.entries():
1684
name = path.replace('\\', '/')
1685
if ie.kind == 'directory':
1692
self.fail("expected paths not found in inventory: %r" % shape)
1694
self.fail("unexpected paths found in inventory: %r" % extras)
1696
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
                     a_callable=None, *args, **kwargs):
    """Call callable with redirected std io pipes.

    :param stdin: Stream installed as sys.stdin; an empty StringIO when
        None.
    :param stdout: Stream installed as sys.stdout; when None the test
        log file is used if there is one, otherwise a fresh StringIO.
    :param stderr: Stream installed as sys.stderr; same defaulting as
        stdout.
    :param a_callable: The callable to run with ``*args``/``**kwargs``.
    :raises ValueError: If a_callable is not callable.

    Returns the return code."""
    if not callable(a_callable):
        raise ValueError("a_callable must be callable.")
    if stdin is None:
        stdin = StringIO("")
    if stdout is None:
        if getattr(self, "_log_file", None) is not None:
            stdout = self._log_file
        else:
            stdout = StringIO()
    if stderr is None:
        # The original test read getattr(self, "_log_file", None is not
        # None): the misplaced parenthesis made True the getattr default,
        # so a missing _log_file led to an AttributeError below.
        if getattr(self, "_log_file", None) is not None:
            stderr = self._log_file
        else:
            stderr = StringIO()
    real_stdin = sys.stdin
    real_stdout = sys.stdout
    real_stderr = sys.stderr
    try:
        sys.stdout = stdout
        sys.stderr = stderr
        sys.stdin = stdin
        return a_callable(*args, **kwargs)
    finally:
        sys.stdout = real_stdout
        sys.stderr = real_stderr
        sys.stdin = real_stdin
1728
def reduceLockdirTimeout(self):
    """Reduce the default lock timeout for the duration of the test, so that
    if LockContention occurs during a test, it does so quickly.

    Tests that expect to provoke LockContention errors should call this.
    """
    orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS

    def resetTimeout():
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout

    # Register the restore before changing the value.
    self.addCleanup(resetTimeout)
    bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
1740
def make_utf8_encoded_stringio(self, encoding_type=None):
    """Return a writer over an in-memory buffer that encodes Unicode
    input to UTF-8.

    :param encoding_type: The codec error-handling scheme handed to the
        writer (``errors=``); 'strict' when None.
    :return: The UTF-8 stream writer, with an ``encoding`` attribute set
        to 'utf-8'.
    """
    if encoding_type is None:
        encoding_type = 'strict'
    sio = StringIO()
    output_encoding = 'utf-8'
    sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
    sio.encoding = output_encoding
    return sio
1753
class TestCaseWithMemoryTransport(TestCase):
1754
"""Common test class for tests that do not need disk resources.
1756
Tests that need disk resources should derive from TestCaseWithTransport.
1758
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1760
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1761
a directory which does not exist. This serves to help ensure test isolation
1762
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1763
must exist. However, TestCaseWithMemoryTransport does not offer local
1764
file defaults for the transport in tests, nor does it obey the command line
1765
override, so tests that accidentally write to the common directory should
1768
:cvar TEST_ROOT: Directory containing all temporary directories, plus
1769
a .bzr directory that stops us ascending higher into the filesystem.
1775
def __init__(self, methodName='runTest'):
    """Set the transport factories that parameterization may override.

    :param methodName: Name of the test method to run.
    """
    # allow test parameterization after test construction and before test
    # execution. Variables that the parameterizer sets need to be
    # ones that are not set by setUp, or setUp will trash them.
    super(TestCaseWithMemoryTransport, self).__init__(methodName)
    self.vfs_transport_factory = default_transport
    self.transport_server = None
    self.transport_readonly_server = None
    self.__vfs_server = None
1785
def get_transport(self, relpath=None):
    """Return a writeable transport.

    This transport is for the test scratch space relative to
    the URL returned by self.get_url().

    :param relpath: a path relative to the base url.
    """
    t = get_transport(self.get_url(relpath))
    # A read-only transport here would mean the test is misconfigured.
    self.assertFalse(t.is_readonly())
    return t
1797
def get_readonly_transport(self, relpath=None):
    """Return a readonly transport for the test scratch space

    This can be used to test that operations which should only need
    readonly access in fact do not try to write.

    :param relpath: a path relative to the base url.
    """
    t = get_transport(self.get_readonly_url(relpath))
    self.assertTrue(t.is_readonly())
    return t
1809
def create_transport_readonly_server(self):
    """Create a transport server from class defined at init.

    This is mostly a hook for daughter classes.

    :return: A new instance built by self.transport_readonly_server.
    """
    factory = self.transport_readonly_server
    return factory()
1816
def get_readonly_server(self):
    """Get the server instance for the readonly transport

    This is useful for some tests with specific servers to do diagnostics.

    The server is created and set up on first use, on top of the vfs-only
    server, and torn down as a test cleanup.
    """
    if self.__readonly_server is None:
        if self.transport_readonly_server is None:
            # readonly decorator requested
            # bring up the server
            self.__readonly_server = ReadonlyServer()
        else:
            self.__readonly_server = self.create_transport_readonly_server()
        # Either kind of server is backed by the vfs-only server.
        self.__readonly_server.setUp(self.get_vfs_only_server())
        self.addCleanup(self.__readonly_server.tearDown)
    return self.__readonly_server
1833
def get_readonly_url(self, relpath=None):
    """Get a URL for the readonly transport.

    This will either be backed by '.' or a decorator to the transport
    used by self.get_url()
    relpath provides for clients to get a path relative to the base url.
    These should only be downwards relative, not upwards.
    """
    server = self.get_readonly_server()
    return self._adjust_url(server.get_url(), relpath)
1844
def get_vfs_only_server(self):
    """Get the vfs only read/write server instance.

    This is useful for some tests with specific servers that need
    diagnostics.

    For TestCaseWithMemoryTransport this is always a MemoryServer, and there
    is no means to override it.
    """
    if self.__vfs_server is None:
        # Created lazily; torn down as a cleanup of this test.
        self.__vfs_server = MemoryServer()
        self.__vfs_server.setUp()
        self.addCleanup(self.__vfs_server.tearDown)
    return self.__vfs_server
1859
def get_server(self):
    """Get the read/write server instance.

    This is useful for some tests with specific servers that need
    diagnostics.

    This is built from the self.transport_server factory. If that is None
    (or is the vfs transport factory), the result of
    self.get_vfs_only_server() is returned.
    """
    if self.__server is None:
        if (self.transport_server is None
            or self.transport_server is self.vfs_transport_factory):
            return self.get_vfs_only_server()
        # bring up a decorated means of access to the vfs only server.
        self.__server = self.transport_server()
        try:
            self.__server.setUp(self.get_vfs_only_server())
        except TypeError:
            e = sys.exc_info()[1]
            # This should never happen; the try:Except here is to assist
            # developers having to update code rather than seeing an
            # uninformative TypeError.
            raise Exception("Old server API in use: %s, %s"
                            % (self.__server, e))
        self.addCleanup(self.__server.tearDown)
    return self.__server
1884
def _adjust_url(self, base, relpath):
1885
"""Get a URL (or maybe a path) for the readwrite transport.
1887
This will either be backed by '.' or to an equivalent non-file based
1889
relpath provides for clients to get a path relative to the base url.
1890
These should only be downwards relative, not upwards.
1892
if relpath is not None and relpath != '.':
1893
if not base.endswith('/'):
1895
# XXX: Really base should be a url; we did after all call
1896
# get_url()! But sometimes it's just a path (from
1897
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1898
# to a non-escaped local path.
1899
if base.startswith('./') or base.startswith('/'):
1902
base += urlutils.escape(relpath)
1905
def get_url(self, relpath=None):
    """Get a URL (or maybe a path) for the readwrite transport.

    This will either be backed by '.' or to an equivalent non-file based
    facility.
    relpath provides for clients to get a path relative to the base url.
    These should only be downwards relative, not upwards.
    """
    server = self.get_server()
    return self._adjust_url(server.get_url(), relpath)
1916
def get_vfs_only_url(self, relpath=None):
    """Get a URL (or maybe a path) for the plain old vfs transport.

    This will never be a smart protocol.  It always has all the
    capabilities of the local filesystem, but it might actually be a
    MemoryTransport or some other similar virtual filesystem.

    This is the backing transport (if any) of the server returned by
    get_url and get_readonly_url.

    :param relpath: provides for clients to get a path relative to the base
        url.  These should only be downwards relative, not upwards.
    :return: A URL
    """
    server = self.get_vfs_only_server()
    return self._adjust_url(server.get_url(), relpath)
1933
def _create_safety_net(self):
    """Make a fake bzr directory.

    This prevents any tests propagating up onto the TEST_ROOT directory's
    real branch: a standalone working tree is created at TEST_ROOT itself.
    """
    root = TestCaseWithMemoryTransport.TEST_ROOT
    bzrdir.BzrDir.create_standalone_workingtree(root)
1942
def _check_safety_net(self):
    """Check that the safety .bzr directory has not been touched.

    _make_test_root has created a .bzr directory to prevent tests from
    propagating. This method ensures that a test did not leak.

    :raises AssertionError: If the working tree at TEST_ROOT has a last
        revision other than 'null:'.
    """
    root = TestCaseWithMemoryTransport.TEST_ROOT
    wt = workingtree.WorkingTree.open(root)
    last_rev = wt.last_revision()
    if last_rev != 'null:':
        # The current test has modified the /bzr directory, we need to
        # recreate a new one or all the following tests will fail.
        # If you need to inspect its content uncomment the following line
        # import pdb; pdb.set_trace()
        _rmtree_temp_dir(root + '/.bzr')
        self._create_safety_net()
        raise AssertionError('%s/.bzr should not be modified' % root)
1960
def _make_test_root(self):
    """Create the shared TEST_ROOT directory on first use.

    Every call also registers _check_safety_net as a cleanup for this
    test.
    """
    if TestCaseWithMemoryTransport.TEST_ROOT is None:
        root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
        TestCaseWithMemoryTransport.TEST_ROOT = root

        self._create_safety_net()

        # The same directory is used by all tests, and we're not
        # specifically told when all tests are finished.  This will do.
        atexit.register(_rmtree_temp_dir, root)

    self.addCleanup(self._check_safety_net)
1973
def makeAndChdirToTestDir(self):
    """Create the temporary directories for this one test.

    This must set self.test_home_dir and self.test_dir and chdir to
    self.test_dir.

    For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
    """
    root = TestCaseWithMemoryTransport.TEST_ROOT
    os.chdir(root)
    self.test_dir = root
    # This name is only computed here, not created on disk.
    self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1985
def make_branch(self, relpath, format=None):
    """Create a branch on the transport at relpath.

    :param relpath: Where to create it, passed to make_repository.
    :param format: Format passed through to make_repository.
    :return: The branch created in the new repository's bzrdir.
    """
    repo = self.make_repository(relpath, format=format)
    return repo.bzrdir.create_branch()
1990
def make_bzrdir(self, relpath, format=None):
1992
# might be a relative or absolute path
1993
maybe_a_url = self.get_url(relpath)
1994
segments = maybe_a_url.rsplit('/', 1)
1995
t = get_transport(maybe_a_url)
1996
if len(segments) > 1 and segments[-1] not in ('', '.'):
2000
if isinstance(format, basestring):
2001
format = bzrdir.format_registry.make_bzrdir(format)
2002
return format.initialize_on_transport(t)
2003
except errors.UninitializableFormat:
2004
raise TestSkipped("Format %s is not initializable." % format)
2006
def make_repository(self, relpath, shared=False, format=None):
    """Create a repository on our default transport at relpath.

    Note that relpath must be a relative path, not a full url.

    :param shared: Passed to create_repository on the new control dir.
    :param format: Passed to make_bzrdir.
    """
    # FIXME: If you create a remoterepository this returns the underlying
    # real format, which is incorrect.  Actually we should make sure that
    # RemoteBzrDir returns a RemoteRepository.
    # maybe  mbp 20070410
    made_control = self.make_bzrdir(relpath, format=format)
    return made_control.create_repository(shared=shared)
2018
def make_branch_and_memory_tree(self, relpath, format=None):
    """Create a branch on the default transport and a MemoryTree for it.

    :param relpath: Where to create the branch.
    :param format: Format passed through to make_branch.
    """
    branch = self.make_branch(relpath, format=format)
    return memorytree.MemoryTree.create_on_branch(branch)
2023
def make_branch_builder(self, relpath, format=None):
    """Return a BranchBuilder working on a transport at relpath.

    :param relpath: Path relative to the test's base url.
    :param format: Format handed to the BranchBuilder.
    """
    url = self.get_url(relpath)
    # Build the transport once; the original built it twice and threw the
    # first one away.
    tran = get_transport(url)
    return branchbuilder.BranchBuilder(tran, format=format)
2028
def overrideEnvironmentForTesting(self):
    """Point HOME and BZR_HOME at this test's home directory."""
    home = self.test_home_dir
    os.environ['HOME'] = home
    os.environ['BZR_HOME'] = home
2033
super(TestCaseWithMemoryTransport, self).setUp()
2034
self._make_test_root()
2035
_currentdir = os.getcwdu()
2036
def _leaveDirectory():
2037
os.chdir(_currentdir)
2038
self.addCleanup(_leaveDirectory)
2039
self.makeAndChdirToTestDir()
2040
self.overrideEnvironmentForTesting()
2041
self.__readonly_server = None
2042
self.__server = None
2043
self.reduceLockdirTimeout()
2045
def setup_smart_server_with_call_log(self):
    """Sets up a smart server as the transport server with a call log.

    Each logged entry in self.hpss_calls is a (params, stack) pair, where
    stack is the formatted call stack at the time of the call.
    """
    self.transport_server = server.SmartTCPServer_for_testing
    self.hpss_calls = []

    def capture_hpss_call(params):
        import traceback
        self.hpss_calls.append((params, traceback.format_stack()))

    client._SmartClient.hooks.install_named_hook(
        'call', capture_hpss_call, None)
2055
def reset_smart_call_log(self):
    """Discard the smart-server calls logged so far."""
    self.hpss_calls = []
2059
class TestCaseInTempDir(TestCaseWithMemoryTransport):
    """Derived class that runs a test within a temporary directory.

    This is useful for tests that need to create a branch, etc.

    The directory is created in a slightly complex way: for each
    Python invocation, a new temporary top-level directory is created.
    All test cases create their own directory within that.  If the
    tests complete successfully, the directory is removed.

    :ivar test_base_dir: The path of the top-level directory for this
    test, which contains a home directory and a work directory.

    :ivar test_home_dir: An initially empty directory under test_base_dir
    which is used as $HOME for this test.

    :ivar test_dir: A directory under test_base_dir used as the current
    directory when the test proper is run.
    """

    OVERRIDE_PYTHON = 'python'

    def check_file_contents(self, filename, expect):
        """Fail the test unless the file's contents equal expect."""
        self.log("check contents of file %s" % filename)
        # Close the handle explicitly instead of leaving it to the garbage
        # collector.
        f = open(filename, 'r')
        try:
            contents = f.read()
        finally:
            f.close()
        if contents != expect:
            self.log("expected: %r" % expect)
            self.log("actually: %r" % contents)
            self.fail("contents of %s not as expected" % filename)

    def _getTestDirPrefix(self):
        """Return a directory name derived from this test's id."""
        # create a directory within the top level test directory
        if sys.platform == 'win32':
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
            # windows is likely to have path-length limits so use a short name
            name_prefix = name_prefix[-30:]
        else:
            name_prefix = re.sub('[/]', '_', self.id())
        # NOTE(review): restored line (missing from damaged source) -- confirm.
        return name_prefix

    def makeAndChdirToTestDir(self):
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().

        For TestCaseInTempDir we create a temporary directory based on the test
        name and then create two subdirs - test and home under it.
        """
        name_prefix = osutils.pathjoin(self.TEST_ROOT, self._getTestDirPrefix())
        # NOTE(review): the initial assignment and the else branch of this
        # loop were missing from the damaged source and are reconstructed
        # from the surviving lines -- confirm.
        name = name_prefix
        for i in range(100):
            if os.path.exists(name):
                name = name_prefix + '_' + str(i)
            else:
                os.mkdir(name)
                break
        # now create test and home directories within this dir
        self.test_base_dir = name
        self.test_home_dir = self.test_base_dir + '/home'
        os.mkdir(self.test_home_dir)
        self.test_dir = self.test_base_dir + '/work'
        os.mkdir(self.test_dir)
        os.chdir(self.test_dir)
        # put name of test inside
        f = file(self.test_base_dir + '/name', 'w')
        # NOTE(review): the write/close lines were missing from the damaged
        # source; reconstructed from the comment above -- confirm.
        try:
            f.write(self.id())
        finally:
            f.close()
        self.addCleanup(self.deleteTestDir)

    def deleteTestDir(self):
        """Leave the test directory and remove test_base_dir."""
        os.chdir(self.TEST_ROOT)
        _rmtree_temp_dir(self.test_base_dir)

    def build_tree(self, shape, line_endings='binary', transport=None):
        """Build a test tree according to a pattern.

        shape is a sequence of file specifications.  If the final
        character is '/', a directory is created.

        This assumes that all the elements in the tree being built are new.

        This doesn't add anything to a branch.

        :type shape:    list or tuple.
        :param line_endings: Either 'binary' or 'native'
            in binary mode, exact contents are written in native mode, the
            line endings match the default platform endings.
        :param transport: A transport to write to, for building trees on VFS's.
            If the transport is readonly or None, "." is opened automatically.
        :return: None
        """
        if type(shape) not in (list, tuple):
            raise AssertionError("Parameter 'shape' should be "
                "a list or a tuple. Got %r instead" % (shape,))
        # It's OK to just create them using forward slashes on windows.
        if transport is None or transport.is_readonly():
            transport = get_transport(".")
        for name in shape:
            self.assertIsInstance(name, basestring)
            # NOTE(review): the directory test and the two "end = ..."
            # assignments were missing from the damaged source; they are
            # reconstructed from the docstring -- confirm.
            if name[-1] == '/':
                transport.mkdir(urlutils.escape(name[:-1]))
            else:
                if line_endings == 'binary':
                    end = '\n'
                elif line_endings == 'native':
                    end = os.linesep
                else:
                    raise errors.BzrError(
                        'Invalid line ending request %r' % line_endings)
                content = "contents of %s%s" % (name.encode('utf-8'), end)
                transport.put_bytes_non_atomic(urlutils.escape(name), content)

    def build_tree_contents(self, shape):
        """Delegate to the module-level build_tree_contents()."""
        build_tree_contents(shape)

    def assertInWorkingTree(self, path, root_path='.', tree=None):
        """Assert whether path or paths are in the WorkingTree"""
        # NOTE(review): "if tree is None" guard restored -- confirm.
        if tree is None:
            tree = workingtree.WorkingTree.open(root_path)
        if not isinstance(path, basestring):
            # A collection of paths: check each one against the same tree.
            for p in path:
                self.assertInWorkingTree(p, tree=tree)
        else:
            self.assertIsNot(tree.path2id(path), None,
                path+' not in working tree.')

    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
        """Assert whether path or paths are not in the WorkingTree"""
        # NOTE(review): "if tree is None" guard restored -- confirm.
        if tree is None:
            tree = workingtree.WorkingTree.open(root_path)
        if not isinstance(path, basestring):
            # A collection of paths: check each one against the same tree.
            for p in path:
                self.assertNotInWorkingTree(p,tree=tree)
        else:
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
2196
class TestCaseWithTransport(TestCaseInTempDir):
    """A test case that provides get_url and get_readonly_url facilities.

    These back onto two transport servers, one for readonly access and one for
    read write access.

    If no explicit class is provided for readonly access, a
    ReadonlyTransportDecorator is used instead which allows the use of non disk
    based read write transports.

    If an explicit class is provided for readonly access, that server and the
    readwrite one must both define get_url() as resolving to os.getcwd().
    """

    def get_vfs_only_server(self):
        """See TestCaseWithMemoryTransport.

        The server is created from self.vfs_transport_factory on first use,
        set up, registered for tear-down, and then reused on later calls.
        """
        if self.__vfs_server is None:
            self.__vfs_server = self.vfs_transport_factory()
            self.__vfs_server.setUp()
            self.addCleanup(self.__vfs_server.tearDown)
        return self.__vfs_server

    def make_branch_and_tree(self, relpath, format=None):
        """Create a branch on the transport and a tree locally.

        If the transport is not a LocalTransport, the Tree can't be created on
        the transport.  In that case if the vfs_transport_factory is
        LocalURLServer the working tree is created in the local
        directory backing the transport, and the returned tree's branch and
        repository will also be accessed locally. Otherwise a lightweight
        checkout is created and returned.

        :param format: The BzrDirFormat.
        :returns: the WorkingTree.
        """
        # TODO: always use the local disk path for the working tree,
        # this obviously requires a format that supports branch references
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
        b = self.make_branch(relpath, format=format)
        try:
            return b.bzrdir.create_workingtree()
        except errors.NotLocalUrl:
            # We can only make working trees locally at the moment.  If the
            # transport can't support them, then we keep the non-disk-backed
            # branch and create a local checkout.
            if self.vfs_transport_factory is LocalURLServer:
                # the branch is colocated on disk, we cannot create a checkout.
                # hopefully callers will expect this.
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
                wt = local_controldir.create_workingtree()
                if wt.branch._format != b._format:
                    # NOTE(review): assignment restored from the comment
                    # below (line missing in damaged source) -- confirm.
                    wt._branch = b
                    # Make sure that assigning to wt._branch fixes wt.branch,
                    # in case the implementation details of workingtree objects
                    # change.
                    self.assertIs(b, wt.branch)
                # NOTE(review): restored return -- confirm.
                return wt
            else:
                return b.create_checkout(relpath, lightweight=True)

    def assertIsDirectory(self, relpath, transport):
        """Assert that relpath within transport is a directory.

        This may not be possible on all transports; in that case it propagates
        a TransportNotPossible.
        """
        try:
            mode = transport.stat(relpath).st_mode
        except errors.NoSuchFile:
            # NOTE(review): format arguments restored to match the
            # placeholders in the surviving message strings -- confirm.
            self.fail("path %s is not a directory; no such file"
                      % (relpath))
        if not stat.S_ISDIR(mode):
            self.fail("path %s is not a directory; has mode %#o"
                      % (relpath, mode))

    def assertTreesEqual(self, left, right):
        """Check that left and right have the same content and properties."""
        # we use a tree delta to check for equality of the content, and we
        # manually check for equality of other things such as the parents list.
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
        differences = left.changes_from(right)
        self.assertFalse(differences.has_changed(),
            "Trees %r and %r are different: %r" % (left, right, differences))

    # NOTE(review): the "def" header was missing from the damaged source; the
    # name is inferred from the super(...).setUp() call -- confirm.
    def setUp(self):
        """Run the base setUp and start with no vfs server created."""
        super(TestCaseWithTransport, self).setUp()
        self.__vfs_server = None
2290
class ChrootedTestCase(TestCaseWithTransport):
    """A support class that provides readonly urls outside the local namespace.

    This is done by checking if self.transport_server is a MemoryServer. if it
    is then we are chrooted already, if it is not then an HttpServer is used
    for readonly urls.

    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
                       be used without needed to redo it when a different
                       subclass is in use ?
    """

    # NOTE(review): the "def" header was missing from the damaged source; the
    # name is inferred from the super(...).setUp() call -- confirm.
    def setUp(self):
        """Use HttpServer for readonly urls unless backed by MemoryServer."""
        super(ChrootedTestCase, self).setUp()
        if not self.vfs_transport_factory == MemoryServer:
            self.transport_readonly_server = HttpServer
2308
def condition_id_re(pattern):
    """Create a condition filter which performs a re check on a test's id.

    :param pattern: A regular expression string.
    :return: A callable that returns True if the re matches.
    """
    filter_re = re.compile(pattern)
    def condition(test):
        # The result of search() is returned as is: a match object (truthy)
        # or None.
        return filter_re.search(test.id())
    # NOTE(review): restored return (missing from damaged source) -- confirm.
    return condition
2321
def condition_isinstance(klass_or_klass_list):
    """Create a condition filter which returns isinstance(param, klass).

    :return: A callable which when called with one parameter obj return the
        result of isinstance(obj, klass_or_klass_list).
    """
    # NOTE(review): the inner "def" and the final return were missing from
    # the damaged source; reconstructed from the docstring -- confirm.
    def condition(obj):
        return isinstance(obj, klass_or_klass_list)
    return condition
2332
def condition_id_in_list(id_list):
    """Create a condition filter which verify that test's id in a list.

    :param id_list: A TestIdList object.
    :return: A callable that returns True if the test's id appears in the list.
    """
    def condition(test):
        return id_list.includes(test.id())
    # NOTE(review): restored return (missing from damaged source) -- confirm.
    return condition
2343
def condition_id_startswith(starts):
    """Create a condition filter verifying that test's id starts with a string.

    :param starts: A list of string.
    :return: A callable that returns True if the test's id starts with one of
        the given strings.
    """
    def condition(test):
        test_id = test.id()
        for start in starts:
            if test_id.startswith(start):
                return True
        return False
    # NOTE(review): the return statements were missing from the damaged
    # source; reconstructed from the docstring -- confirm.
    return condition
2358
def exclude_tests_by_condition(suite, condition):
    """Create a test suite which excludes some tests from suite.

    :param suite: The suite to get tests from.
    :param condition: A callable whose result evaluates True when called with a
        test case which should be excluded from the result.
    :return: A suite which contains the tests found in suite that fail
        condition.
    """
    # NOTE(review): the accumulator setup and append were missing from the
    # damaged source; reconstructed from the surviving return -- confirm.
    result = []
    for test in iter_suite_tests(suite):
        if not condition(test):
            result.append(test)
    return TestUtil.TestSuite(result)
2374
def filter_suite_by_condition(suite, condition):
    """Create a test suite by filtering another one.

    :param suite: The source suite.
    :param condition: A callable whose result evaluates True when called with a
        test case which should be included in the result.
    :return: A suite which contains the tests found in suite that pass
        condition.
    """
    # NOTE(review): the accumulator setup, the condition test and the append
    # were missing from the damaged source; reconstructed from the docstring
    # and the surviving return -- confirm.
    result = []
    for test in iter_suite_tests(suite):
        if condition(test):
            result.append(test)
    return TestUtil.TestSuite(result)
2390
def filter_suite_by_re(suite, pattern):
    """Create a test suite by filtering another one.

    :param suite:           the source suite
    :param pattern:         pattern that names must match
    :returns: the newly created suite
    """
    condition = condition_id_re(pattern)
    result_suite = filter_suite_by_condition(suite, condition)
    # NOTE(review): restored return (missing from damaged source) -- confirm.
    return result_suite
2402
def filter_suite_by_id_list(suite, test_id_list):
    """Create a test suite by filtering another one.

    :param suite: The source suite.
    :param test_id_list: A list of the test ids to keep as strings.
    :returns: the newly created suite
    """
    condition = condition_id_in_list(test_id_list)
    result_suite = filter_suite_by_condition(suite, condition)
    # NOTE(review): restored return (missing from damaged source) -- confirm.
    return result_suite
2414
def filter_suite_by_id_startswith(suite, start):
    """Create a test suite by filtering another one.

    :param suite: The source suite.
    :param start: A list of string the test id must start with one of.
    :returns: the newly created suite
    """
    condition = condition_id_startswith(start)
    result_suite = filter_suite_by_condition(suite, condition)
    # NOTE(review): restored return (missing from damaged source) -- confirm.
    return result_suite
2426
def exclude_tests_by_re(suite, pattern):
    """Create a test suite which excludes some tests from suite.

    :param suite: The suite to get tests from.
    :param pattern: A regular expression string. Test ids that match this
        pattern will be excluded from the result.
    :return: A TestSuite that contains all the tests from suite without the
        tests that matched pattern. The order of tests is the same as it was in
        suite.
    """
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
2439
def preserve_input(something):
    """A helper for performing test suite transformation chains.

    :param something: Anything you want to preserve.
    :return: Something.
    """
    # NOTE(review): the body was missing from the damaged source; returning
    # the argument unchanged is inferred from the name and from its use as
    # the no-op counterpart of randomize_suite in run_suite -- confirm.
    return something
2448
def randomize_suite(suite):
    """Return a new TestSuite with suite's tests in random order.

    The tests in the input suite are flattened into a single suite in order to
    accomplish this. Any nested TestSuites are removed to provide global
    randomness.
    """
    tests = list(iter_suite_tests(suite))
    # Shuffles in place using the module-level random generator.
    random.shuffle(tests)
    return TestUtil.TestSuite(tests)
2460
def split_suite_by_condition(suite, condition):
    """Split a test suite into two by a condition.

    :param suite: The suite to split.
    :param condition: The condition to match on. Tests that match this
        condition are returned in the first test suite, ones that do not match
        are in the second suite.
    :return: A tuple of two test suites, where the first contains tests from
        suite matching the condition, and the second contains the remainder
        from suite. The order within each output suite is the same as it was in
        suite.
    """
    # NOTE(review): the list setup and the if/else around the two appends
    # were missing from the damaged source; reconstructed from the docstring
    # and the surviving append lines -- confirm.
    matched = []
    did_not_match = []
    for test in iter_suite_tests(suite):
        if condition(test):
            matched.append(test)
        else:
            did_not_match.append(test)
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2482
def split_suite_by_re(suite, pattern):
    """Split a test suite into two by a regular expression.

    :param suite: The suite to split.
    :param pattern: A regular expression string. Test ids that match this
        pattern will be in the first test suite returned, and the others in the
        second test suite returned.
    :return: A tuple of two test suites, where the first contains tests from
        suite matching pattern, and the second contains the remainder from
        suite. The order within each output suite is the same as it was in
        suite.
    """
    return split_suite_by_condition(suite, condition_id_re(pattern))
2497
# NOTE(review): several lines of this function were missing from the damaged
# source (the list_only, random_seed, strict and runner_class parameters with
# their defaults, the verbosity selection, the runner's descriptions argument
# and a number of else/try branches).  They are reconstructed from how the
# names are used in the surviving lines -- confirm against the upstream file.
def run_suite(suite, name='test', verbose=False, pattern=".*",
              stop_on_failure=False,
              transport=None, lsprof_timed=None, bench_history=None,
              matching_tests_first=None,
              list_only=False,
              random_seed=None,
              exclude_pattern=None,
              strict=False,
              runner_class=None):
    """Run a test suite for bzr selftest.

    :param suite: The suite to filter, reorder and run.
    :param pattern: Regular expression on test ids; anything other than
        '.*' filters the suite (or, with matching_tests_first, only moves
        the matching tests to the front).
    :param random_seed: None to keep the order, "now" to seed from the
        clock, otherwise the seed used to randomize the order.
    :param exclude_pattern: Regular expression of test ids to drop.
    :param runner_class: The class of runner to use. Must support the
        constructor arguments passed by run_suite which are more than standard
        python uses.
    :return: A boolean indicating success.
    """
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
    if verbose:
        verbosity = 2
    else:
        verbosity = 1
    if runner_class is None:
        runner_class = TextTestRunner
    runner = runner_class(stream=sys.stdout,
                            descriptions=0,
                            verbosity=verbosity,
                            bench_history=bench_history,
                            list_only=list_only,
                            )
    runner.stop_on_failure=stop_on_failure
    # Initialise the random number generator and display the seed used.
    # We convert the seed to a long to make it reuseable across invocations.
    random_order = False
    if random_seed is not None:
        random_order = True
        if random_seed == "now":
            random_seed = long(time.time())
        else:
            # Convert the seed to a long if we can
            try:
                random_seed = long(random_seed)
            except (TypeError, ValueError):
                # Not numeric: use the seed as given.
                pass
        runner.stream.writeln("Randomizing test order using seed %s\n" %
            (random_seed))
        random.seed(random_seed)
    # Customise the list of tests if requested
    if exclude_pattern is not None:
        suite = exclude_tests_by_re(suite, exclude_pattern)
    if random_order:
        order_changer = randomize_suite
    else:
        order_changer = preserve_input
    if pattern != '.*' or random_order:
        if matching_tests_first:
            # Keep every test, matching ones first, each half reordered
            # independently.
            suites = map(order_changer, split_suite_by_re(suite, pattern))
            suite = TestUtil.TestSuite(suites)
        else:
            suite = order_changer(filter_suite_by_re(suite, pattern))

    result = runner.run(suite)

    if strict:
        return result.wasStrictlySuccessful()

    return result.wasSuccessful()
2565
# Controlled by "bzr selftest -E=..." option
selftest_debug_flags = set()
2569
# NOTE(review): about half of the signature lines, the try/finally and two
# else branches were missing from the damaged source.  The parameter names
# are taken from their uses in the surviving body; their order and defaults
# are reconstructed -- confirm against the upstream file.
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
             transport=None,
             test_suite_factory=None,
             lsprof_timed=None,
             bench_history=None,
             matching_tests_first=None,
             list_only=False,
             random_seed=None,
             exclude_pattern=None,
             strict=False,
             load_list=None,
             debug_flags=None,
             starting_with=None,
             runner_class=None,
             ):
    """Run the whole test suite under the enhanced runner

    :param transport: Becomes the module-level default_transport for the
        duration of the run; None keeps the current default.
    :param test_suite_factory: Optional callable building the suite; when
        None, test_suite(keep_only, starting_with) is used.
    :param load_list: Optional name of a file of test ids, read with
        load_test_id_list().
    :param debug_flags: Optional iterable replacing selftest_debug_flags for
        the duration of the run.
    :return: The result of run_suite().
    """
    # XXX: Very ugly way to do this...
    # Disable warning about old formats because we don't want it to disturb
    # any blackbox tests.
    from bzrlib import repository
    repository._deprecation_warning_done = True

    global default_transport
    if transport is None:
        transport = default_transport
    old_transport = default_transport
    default_transport = transport
    global selftest_debug_flags
    old_debug_flags = selftest_debug_flags
    if debug_flags is not None:
        selftest_debug_flags = set(debug_flags)
    try:
        if load_list is None:
            keep_only = None
        else:
            keep_only = load_test_id_list(load_list)
        if test_suite_factory is None:
            suite = test_suite(keep_only, starting_with)
        else:
            suite = test_suite_factory()
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
                     stop_on_failure=stop_on_failure,
                     transport=transport,
                     lsprof_timed=lsprof_timed,
                     bench_history=bench_history,
                     matching_tests_first=matching_tests_first,
                     list_only=list_only,
                     random_seed=random_seed,
                     exclude_pattern=exclude_pattern,
                     strict=strict,
                     runner_class=runner_class,
                     )
    finally:
        # Put the module-level settings back however the run ended.
        default_transport = old_transport
        selftest_debug_flags = old_debug_flags
2626
def load_test_id_list(file_name):
    """Load a test id list from a text file.

    The format is one test id by line.  No special care is taken to impose
    strict rules, these test ids are used to filter the test suite so a test id
    that do not match an existing test will do no harm. This allows user to add
    comments, leave blank lines, etc.

    :param file_name: Path of the file to read.
    :return: A list with one stripped string per line of the file.
    :raises errors.NoSuchFile: If the file does not exist; any other IOError
        from open() propagates unchanged.
    """
    # NOTE(review): the try/except lines around open() were missing from the
    # damaged source; reconstructed from the surviving errno test and raise
    # -- confirm.
    try:
        ftest = open(file_name, 'rt')
    except IOError:
        # sys.exc_info() rather than a bound name keeps this spelling valid
        # on every Python version the file may run under.
        e = sys.exc_info()[1]
        if e.errno != errno.ENOENT:
            raise
        raise errors.NoSuchFile(file_name)
    # Close the file even if reading fails.
    try:
        return [test_name.strip() for test_name in ftest.readlines()]
    finally:
        ftest.close()
2649
def suite_matches_id_list(test_suite, id_list):
    """Warns about tests not appearing or appearing more than once.

    :param test_suite: A TestSuite object.
    :param id_list: The list of test ids that should be found in
         test_suite.

    :return: (absents, duplicates) absents is a list containing the test found
        in id_list but not in test_suite, duplicates is a list containing the
        test found multiple times in test_suite.

    When using a prefined test id list, it may occurs that some tests do not
    exist anymore or that some tests use the same id. This function warns the
    tester about potential problems in his workflow (test lists are volatile)
    or in the test suite itself (using the same id for several tests does not
    help to localize defects).
    """
    # NOTE(review): the dict/list setup, the id lookup, the second loop header
    # and the two occurrence tests were missing from the damaged source;
    # reconstructed from the docstring and surviving lines -- confirm.
    # Build a dict counting id occurrences
    tests = dict()
    for test in iter_suite_tests(test_suite):
        test_id = test.id()
        tests[test_id] = tests.get(test_id, 0) + 1

    not_found = []
    duplicates = []
    for test_id in id_list:
        occurs = tests.get(test_id, 0)
        if not occurs:
            not_found.append(test_id)
        elif occurs > 1:
            duplicates.append(test_id)

    return not_found, duplicates
2684
class TestIdList(object):
    """Test id list to filter a test suite.

    Relying on the assumption that test ids are built as:
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
    notation, this class offers methods to :
    - avoid building a test suite for modules not refered to in the test list,
    - keep only the tests listed from the module test suite.
    """

    def __init__(self, test_id_list):
        """Index test_id_list by full id and by every dotted prefix.

        :param test_id_list: An iterable of test id strings.
        """
        # When a test suite needs to be filtered against us we compare test ids
        # for equality, so a simple dict offers a quick and simple solution.
        self.tests = dict().fromkeys(test_id_list, True)

        # While unittest.TestCase have ids like:
        # <module>.<class>.<method>[(<param+)],
        # doctest.DocTestCase can have ids like:
        # <module>.<function>
        # <module>.<class>.<method>

        # Since we can't predict a test class from its name only, we settle on
        # a simple constraint: a test id always begins with its module name.

        # NOTE(review): "modules = {}" and the inner "for part in parts" were
        # missing from the damaged source; reconstructed from the surviving
        # lines -- confirm.
        modules = {}
        for test_id in test_id_list:
            parts = test_id.split('.')
            mod_name = parts.pop(0)
            modules[mod_name] = True
            # Record every longer dotted prefix too, since any of them may be
            # the module that holds the test.
            for part in parts:
                mod_name += '.' + part
                modules[mod_name] = True
        self.modules = modules

    def refers_to(self, module_name):
        """Is there tests for the module or one of its sub modules."""
        # "in" replaces the deprecated dict.has_key() with the same result.
        return module_name in self.modules

    def includes(self, test_id):
        """Return True if test_id is one of the ids given at construction."""
        return test_id in self.tests
2728
class TestPrefixAliasRegistry(registry.Registry):
    """A registry for test prefix aliases.

    This helps implement shorcuts for the --starting-with selftest
    option. Overriding existing prefixes is not allowed but not fatal (a
    warning will be emitted).
    """

    def register(self, key, obj, help=None, info=None,
                 override_existing=False):
        """See Registry.register.

        Trying to override an existing alias causes a warning to be emitted,
        not a fatal execption.
        """
        # NOTE(review): the try/except lines were missing from the damaged
        # source; KeyError is assumed to be what Registry.register raises for
        # an existing key -- confirm.
        try:
            # override_existing is deliberately forced to False here.
            super(TestPrefixAliasRegistry, self).register(
                key, obj, help=help, info=info, override_existing=False)
        except KeyError:
            actual = self.get(key)
            note('Test prefix alias %s is already used for %s, ignoring %s'
                 % (key, actual, obj))

    def resolve_alias(self, id_start):
        """Replace the alias by the prefix in the given string.

        Using an unknown prefix is an error to help catching typos.

        :param id_start: A dotted string whose first component is an alias.
        :return: id_start with its first component replaced by the
            registered prefix.
        :raises errors.BzrCommandError: If the first component is not a
            registered alias.
        """
        parts = id_start.split('.')
        # NOTE(review): the try/except lines were missing from the damaged
        # source; KeyError is assumed to be what Registry.get raises for an
        # unknown key -- confirm.
        try:
            parts[0] = self.get(parts[0])
        except KeyError:
            raise errors.BzrCommandError(
                '%s is not a known test prefix alias' % parts[0])
        return '.'.join(parts)
2765
test_prefix_alias_registry = TestPrefixAliasRegistry()
"""Registry of test prefix aliases."""


# This alias allows to detect typos ('bzrlin.') by making all valid test ids
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
test_prefix_alias_registry.register('bzrlib', 'bzrlib')

# Obvious highest level prefixes, feel free to add your own via a plugin
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
2781
# NOTE(review): many lines of this function were missing from the damaged
# source (list openers/closers, some list entries, and most if/else/try/
# continue/return lines).  Control flow is reconstructed from the surviving
# lines and comments; list entries that are reconstructed are marked
# individually.  Confirm against the upstream file before merging.
def test_suite(keep_only=None, starting_with=None):
    """Build and return TestSuite for the whole of bzrlib.

    :param keep_only: A list of test ids limiting the suite returned.

    :param starting_with: An id limiting the suite returned to the tests
         starting with it.

    This function can be replaced if you need to change the default test
    suite on a global basis, but it is not encouraged.
    """
    testmod_names = [
                   # NOTE(review): one entry preceding 'blackbox' was missing
                   # from the damaged source; 'bzrlib.doc' is a guess based on
                   # the 'bd' prefix alias -- confirm.
                   'bzrlib.doc',
                   'bzrlib.tests.blackbox',
                   'bzrlib.tests.branch_implementations',
                   'bzrlib.tests.bzrdir_implementations',
                   'bzrlib.tests.commands',
                   'bzrlib.tests.interrepository_implementations',
                   'bzrlib.tests.intertree_implementations',
                   'bzrlib.tests.inventory_implementations',
                   'bzrlib.tests.per_interbranch',
                   'bzrlib.tests.per_lock',
                   'bzrlib.tests.per_repository',
                   'bzrlib.tests.per_repository_reference',
                   'bzrlib.tests.test__dirstate_helpers',
                   'bzrlib.tests.test__walkdirs_win32',
                   'bzrlib.tests.test_ancestry',
                   'bzrlib.tests.test_annotate',
                   'bzrlib.tests.test_api',
                   'bzrlib.tests.test_atomicfile',
                   'bzrlib.tests.test_bad_files',
                   'bzrlib.tests.test_bisect_multi',
                   'bzrlib.tests.test_branch',
                   'bzrlib.tests.test_branchbuilder',
                   'bzrlib.tests.test_btree_index',
                   'bzrlib.tests.test_bugtracker',
                   'bzrlib.tests.test_bundle',
                   'bzrlib.tests.test_bzrdir',
                   'bzrlib.tests.test_cache_utf8',
                   'bzrlib.tests.test_chunk_writer',
                   'bzrlib.tests.test__chunks_to_lines',
                   'bzrlib.tests.test_commands',
                   'bzrlib.tests.test_commit',
                   'bzrlib.tests.test_commit_merge',
                   'bzrlib.tests.test_config',
                   'bzrlib.tests.test_conflicts',
                   'bzrlib.tests.test_counted_lock',
                   'bzrlib.tests.test_decorators',
                   'bzrlib.tests.test_delta',
                   'bzrlib.tests.test_deprecated_graph',
                   'bzrlib.tests.test_diff',
                   'bzrlib.tests.test_directory_service',
                   'bzrlib.tests.test_dirstate',
                   'bzrlib.tests.test_email_message',
                   'bzrlib.tests.test_errors',
                   'bzrlib.tests.test_export',
                   'bzrlib.tests.test_extract',
                   'bzrlib.tests.test_fetch',
                   'bzrlib.tests.test_fifo_cache',
                   'bzrlib.tests.test_ftp_transport',
                   'bzrlib.tests.test_foreign',
                   'bzrlib.tests.test_generate_docs',
                   'bzrlib.tests.test_generate_ids',
                   'bzrlib.tests.test_globbing',
                   'bzrlib.tests.test_gpg',
                   'bzrlib.tests.test_graph',
                   'bzrlib.tests.test_hashcache',
                   'bzrlib.tests.test_help',
                   'bzrlib.tests.test_hooks',
                   'bzrlib.tests.test_http',
                   'bzrlib.tests.test_http_implementations',
                   'bzrlib.tests.test_http_response',
                   'bzrlib.tests.test_https_ca_bundle',
                   'bzrlib.tests.test_identitymap',
                   'bzrlib.tests.test_ignores',
                   'bzrlib.tests.test_index',
                   'bzrlib.tests.test_info',
                   'bzrlib.tests.test_inv',
                   'bzrlib.tests.test_knit',
                   'bzrlib.tests.test_lazy_import',
                   'bzrlib.tests.test_lazy_regex',
                   'bzrlib.tests.test_lockable_files',
                   'bzrlib.tests.test_lockdir',
                   'bzrlib.tests.test_log',
                   'bzrlib.tests.test_lru_cache',
                   'bzrlib.tests.test_lsprof',
                   'bzrlib.tests.test_mail_client',
                   'bzrlib.tests.test_memorytree',
                   'bzrlib.tests.test_merge',
                   'bzrlib.tests.test_merge3',
                   'bzrlib.tests.test_merge_core',
                   'bzrlib.tests.test_merge_directive',
                   'bzrlib.tests.test_missing',
                   'bzrlib.tests.test_msgeditor',
                   'bzrlib.tests.test_multiparent',
                   'bzrlib.tests.test_mutabletree',
                   'bzrlib.tests.test_nonascii',
                   'bzrlib.tests.test_options',
                   'bzrlib.tests.test_osutils',
                   'bzrlib.tests.test_osutils_encodings',
                   'bzrlib.tests.test_pack',
                   'bzrlib.tests.test_pack_repository',
                   'bzrlib.tests.test_patch',
                   'bzrlib.tests.test_patches',
                   'bzrlib.tests.test_permissions',
                   'bzrlib.tests.test_plugins',
                   'bzrlib.tests.test_progress',
                   'bzrlib.tests.test_read_bundle',
                   'bzrlib.tests.test_reconcile',
                   'bzrlib.tests.test_reconfigure',
                   'bzrlib.tests.test_registry',
                   'bzrlib.tests.test_remote',
                   'bzrlib.tests.test_repository',
                   'bzrlib.tests.test_revert',
                   'bzrlib.tests.test_revision',
                   'bzrlib.tests.test_revisionspec',
                   'bzrlib.tests.test_revisiontree',
                   'bzrlib.tests.test_rio',
                   'bzrlib.tests.test_rules',
                   'bzrlib.tests.test_sampler',
                   'bzrlib.tests.test_selftest',
                   'bzrlib.tests.test_setup',
                   'bzrlib.tests.test_sftp_transport',
                   'bzrlib.tests.test_shelf',
                   'bzrlib.tests.test_shelf_ui',
                   'bzrlib.tests.test_smart',
                   'bzrlib.tests.test_smart_add',
                   'bzrlib.tests.test_smart_request',
                   'bzrlib.tests.test_smart_transport',
                   'bzrlib.tests.test_smtp_connection',
                   'bzrlib.tests.test_source',
                   'bzrlib.tests.test_ssh_transport',
                   'bzrlib.tests.test_status',
                   'bzrlib.tests.test_store',
                   'bzrlib.tests.test_strace',
                   'bzrlib.tests.test_subsume',
                   'bzrlib.tests.test_switch',
                   'bzrlib.tests.test_symbol_versioning',
                   'bzrlib.tests.test_tag',
                   'bzrlib.tests.test_testament',
                   'bzrlib.tests.test_textfile',
                   'bzrlib.tests.test_textmerge',
                   'bzrlib.tests.test_timestamp',
                   'bzrlib.tests.test_trace',
                   'bzrlib.tests.test_transactions',
                   'bzrlib.tests.test_transform',
                   'bzrlib.tests.test_transport',
                   'bzrlib.tests.test_transport_implementations',
                   'bzrlib.tests.test_transport_log',
                   'bzrlib.tests.test_tree',
                   'bzrlib.tests.test_treebuilder',
                   'bzrlib.tests.test_tsort',
                   'bzrlib.tests.test_tuned_gzip',
                   'bzrlib.tests.test_ui',
                   'bzrlib.tests.test_uncommit',
                   'bzrlib.tests.test_upgrade',
                   'bzrlib.tests.test_upgrade_stacked',
                   'bzrlib.tests.test_urlutils',
                   'bzrlib.tests.test_version',
                   'bzrlib.tests.test_version_info',
                   'bzrlib.tests.test_versionedfile',
                   'bzrlib.tests.test_weave',
                   'bzrlib.tests.test_whitebox',
                   'bzrlib.tests.test_win32utils',
                   'bzrlib.tests.test_workingtree',
                   'bzrlib.tests.test_workingtree_4',
                   'bzrlib.tests.test_wsgi',
                   'bzrlib.tests.test_xml',
                   'bzrlib.tests.tree_implementations',
                   'bzrlib.tests.workingtree_implementations',
                   'bzrlib.util.tests.test_bencode',
                   ]

    loader = TestUtil.TestLoader()

    # Build the id filter whenever keep_only is given.  It used to be created
    # only in the "elif keep_only" branch below, so asking for both
    # starting_with and keep_only reached filter_suite_by_id_list() with
    # id_filter unbound.
    id_filter = None
    if keep_only is not None:
        id_filter = TestIdList(keep_only)

    if starting_with:
        starting_with = [test_prefix_alias_registry.resolve_alias(start)
                         for start in starting_with]
        # We take precedence over keep_only because *at loading time* using
        # both options means we will load less tests for the same final result.
        def interesting_module(name):
            for start in starting_with:
                if (
                    # Either the module name starts with the specified string
                    name.startswith(start)
                    # or it may contain tests starting with the specified string
                    or start.startswith(name)
                    ):
                    return True
            return False
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)

    elif keep_only is not None:
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
        def interesting_module(name):
            return id_filter.refers_to(name)

    else:
        loader = TestUtil.TestLoader()
        def interesting_module(name):
            # No filtering, all modules are interesting
            return True

    suite = loader.suiteClass()

    # modules building their suite with loadTestsFromModuleNames
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))

    modules_to_doctest = [
        'bzrlib',                    # NOTE(review): reconstructed -- confirm
        'bzrlib.branchbuilder',
        'bzrlib.export',             # NOTE(review): reconstructed -- confirm
        'bzrlib.inventory',          # NOTE(review): reconstructed -- confirm
        'bzrlib.iterablefile',
        'bzrlib.lockdir',            # NOTE(review): reconstructed -- confirm
        'bzrlib.merge3',             # NOTE(review): reconstructed -- confirm
        'bzrlib.option',             # NOTE(review): reconstructed -- confirm
        'bzrlib.symbol_versioning',
        'bzrlib.tests',              # NOTE(review): reconstructed -- confirm
        'bzrlib.timestamp',          # NOTE(review): reconstructed -- confirm
        'bzrlib.version_info_formats.format_custom',
        ]

    for mod in modules_to_doctest:
        if not interesting_module(mod):
            # No tests to keep here, move along
            continue
        try:
            # note that this really does mean "report only" -- doctest
            # still runs the rest of the examples
            doc_suite = doctest.DocTestSuite(mod,
                optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
        except ValueError:
            e = sys.exc_info()[1]
            sys.stdout.write(
                '**failed to get doctest for: %s\n%s\n' % (mod, e))
            raise
        if len(doc_suite._tests) == 0:
            raise errors.BzrError("no doctests found in %s" % (mod,))
        suite.addTest(doc_suite)

    default_encoding = sys.getdefaultencoding()
    for name, plugin in bzrlib.plugin.plugins().items():
        if not interesting_module(plugin.module.__name__):
            continue
        plugin_suite = plugin.test_suite()
        # We used to catch ImportError here and turn it into just a warning,
        # but really if you don't have --no-plugins this should be a failure.
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
        if plugin_suite is None:
            plugin_suite = plugin.load_plugin_tests(loader)
        if plugin_suite is not None:
            suite.addTest(plugin_suite)
        if default_encoding != sys.getdefaultencoding():
            bzrlib.trace.warning(
                'Plugin "%s" tried to reset default encoding to: %s', name,
                sys.getdefaultencoding())
            # NOTE(review): reload(sys) is reconstructed; presumably needed
            # to make setdefaultencoding available again -- confirm.
            reload(sys)
            sys.setdefaultencoding(default_encoding)

    if starting_with:
        suite = filter_suite_by_id_startswith(suite, starting_with)

    if keep_only is not None:
        # Now that the referred modules have loaded their tests, keep only the
        # requested ones.
        suite = filter_suite_by_id_list(suite, id_filter)
        # Do some sanity checks on the id_list filtering
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
        if starting_with:
            # The tester has used both keep_only and starting_with, so he is
            # already aware that some tests are excluded from the list, there
            # is no need to tell him which.
            pass
        else:
            # Some tests mentioned in the list are not in the test suite. The
            # list may be out of date, report to the tester.
            for test_id in not_found:
                bzrlib.trace.warning('"%s" not found in the test suite',
                                     test_id)
        for test_id in duplicates:
            bzrlib.trace.warning('"%s" is used as an id by several tests',
                                 test_id)

    return suite
3065
def multiply_tests_from_modules(module_name_list, scenario_iter, loader=None):
    """Adapt all tests in some given modules to given scenarios.

    This is the recommended public interface for test parameterization.
    Typically the test_suite() method for a per-implementation test
    suite will call multiply_tests_from_modules and return the
    result.

    :param module_name_list: List of fully-qualified names of test
        modules.
    :param scenario_iter: Iterable of pairs of (scenario_name,
        scenario_param_dict).
    :param loader: If provided, will be used instead of a new
        bzrlib.tests.TestLoader() instance.
    :return: A new suite (an instance of loader.suiteClass) holding the
        adapted tests.

    This returns a new TestSuite containing the cross product of
    all the tests in all the modules, each repeated for each scenario.
    Each test is adapted by adding the scenario name at the end
    of its name, and updating the test object's __dict__ with the
    scenario_param_dict.

    >>> r = multiply_tests_from_modules(
    ...     ['bzrlib.tests.test_sampler'],
    ...     [('one', dict(param=1)),
    ...      ('two', dict(param=2))])
    >>> tests = list(iter_suite_tests(r))
    >>> tests[0].id()
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
    >>> tests[0].param
    1
    >>> tests[1].param
    2
    """
    # XXX: Isn't load_tests() a better way to provide the same functionality
    # without forcing a predefined TestScenarioApplier ? --vila 080215
    if loader is None:
        # Only build a default loader when the caller did not supply one.
        loader = TestUtil.TestLoader()

    suite = loader.suiteClass()

    adapter = TestScenarioApplier()
    # Materialise the scenarios: the adapter walks them once per test, so a
    # one-shot iterator would be exhausted after the first test.
    adapter.scenarios = list(scenario_iter)
    adapt_modules(module_name_list, adapter, loader, suite)
    return suite
3113
def multiply_scenarios(scenarios_left, scenarios_right):
    """Multiply two sets of scenarios.

    :param scenarios_left: Sequence of (name, param_dict) pairs.
    :param scenarios_right: Sequence of (name, param_dict) pairs. It is
        walked once per left scenario, so it must be re-iterable.
    :returns: the cartesian product of the two sets of scenarios, that is
        a scenario for every possible combination of a left scenario and a
        right scenario. Each combined scenario is named
        '<left_name>,<right_name>' and carries the merged parameters; when
        both sides define the same key the right-hand value wins.
    """
    combined = []
    for left_name, left_dict in scenarios_left:
        for right_name, right_dict in scenarios_right:
            # Copy first so neither input dict is mutated, then let the
            # right-hand parameters override the left-hand ones.
            params = dict(left_dict)
            params.update(right_dict)
            combined.append(('%s,%s' % (left_name, right_name), params))
    return combined
3128
def adapt_modules(mods_list, adapter, loader, suite):
    """Load the tests of every module in mods_list and adapt them into suite.

    :param mods_list: Module names passed to
        loader.loadTestsFromModuleNames().
    :param adapter: Object whose adapt(test) yields the adapted tests.
    :param loader: Test loader used to load the modules.
    :param suite: Suite that receives the adapted tests.
    """
    loaded_tests = loader.loadTestsFromModuleNames(mods_list)
    adapt_tests(loaded_tests, adapter, suite)
3134
def adapt_tests(tests_list, adapter, suite):
    """Adapt each test found in tests_list and add the results to suite.

    :param tests_list: Test or suite walked with iter_suite_tests().
    :param adapter: Object whose adapt(test) returns the adapted tests.
    :param suite: Suite that receives every adapted test via addTests().
    """
    for single_test in iter_suite_tests(tests_list):
        adapted = adapter.adapt(single_test)
        suite.addTests(adapted)
3140
def _rmtree_temp_dir(dirname):
    """Remove the temporary test directory tree rooted at dirname.

    :param dirname: Path of the directory to delete.
    :raises OSError: If removal fails, except for a permission error on
        win32, which is reported on stderr instead.
    """
    # If LANG=C we probably have created some bogus paths
    # which rmtree(unicode) will fail to delete
    # so make sure we are using rmtree(str) to delete everything
    # except on win32, where rmtree(str) will fail
    # since it doesn't have the property of byte-stream paths
    # (they are either ascii or mbcs)
    if sys.platform == 'win32':
        # make sure we are using the unicode win32 api
        dirname = unicode(dirname)
    else:
        dirname = dirname.encode(sys.getfilesystemencoding())
    try:
        osutils.rmtree(dirname)
    except OSError:
        # Fetch the active exception through sys.exc_info() so this clause
        # does not depend on a version-specific "except ..., e" spelling.
        e = sys.exc_info()[1]
        if sys.platform == 'win32' and e.errno == errno.EACCES:
            # Best effort on win32: report the failure and carry on.
            sys.stderr.write(('Permission denied: '
                              'unable to remove testing dir '
                              '%s\n' % os.path.basename(dirname)))
        else:
            # Anything else is unexpected; let the caller see it.
            raise
3163
class Feature(object):
    """An operating system Feature.

    Subclasses implement _probe(); available() runs it lazily and caches
    the answer on the instance.
    """

    def __init__(self):
        # None means "not probed yet".
        self._available = None

    def available(self):
        """Is the feature available?

        :return: True if the feature is available.
        """
        # Note: a probe result of None is indistinguishable from "not probed
        # yet", so such a probe is run again on the next call.
        if self._available is None:
            self._available = self._probe()
        return self._available

    def _probe(self):
        """Implement this method in concrete features.

        :return: True if the feature is available.
        """
        raise NotImplementedError

    def __str__(self):
        """Return feature_name() when the subclass defines it, else the
        class name.
        """
        if getattr(self, 'feature_name', None):
            return self.feature_name()
        return self.__class__.__name__
3191
# Feature subclass for symlink support: the visible probe line returns
# osutils.has_symlinks(). A module-level instance, SymlinkFeature, is created
# right after the class.
# NOTE(review): this listing is truncated -- the header of the method holding
# the `return` (presumably `_probe`, which Feature.available() calls) and the
# body of feature_name() are not visible here; restore them from the upstream
# file before relying on this block.
class _SymlinkFeature(Feature):
3194
return osutils.has_symlinks()
3196
def feature_name(self):
3199
SymlinkFeature = _SymlinkFeature()
3202
# Feature subclass for hardlink support: the visible probe line returns
# osutils.has_hardlinks(). A module-level instance, HardlinkFeature, is
# created right after the class.
# NOTE(review): this listing is truncated -- the header of the method holding
# the `return` (presumably `_probe`, which Feature.available() calls) and the
# body of feature_name() are not visible here; restore them from the upstream
# file before relying on this block.
class _HardlinkFeature(Feature):
3205
return osutils.has_hardlinks()
3207
def feature_name(self):
3210
HardlinkFeature = _HardlinkFeature()
3213
class _OsFifoFeature(Feature):
    """Does the os module provide mkfifo()?"""

    def _probe(self):
        """Return os.mkfifo when it exists, otherwise None.

        Callers should treat the result as a truth value only.
        """
        return getattr(os, 'mkfifo', None)

    def feature_name(self):
        """Return the human-readable name of this feature."""
        return 'filesystem fifos'

OsFifoFeature = _OsFifoFeature()
3224
class _UnicodeFilenameFeature(Feature):
    """Does the filesystem support Unicode filenames?"""

    def _probe(self):
        """Stat an unlikely Unicode filename to see whether it can be encoded.

        :return: False when the name cannot be encoded for the filesystem,
            True otherwise (whether or not such a file exists).
        """
        try:
            # Check for character combinations unlikely to be covered by any
            # single non-unicode encoding. We use the characters
            # - greek small letter alpha (U+03B1) and
            # - braille pattern dots-123456 (U+283F).
            os.stat(u'\u03b1\u283f')
        except UnicodeEncodeError:
            return False
        except (IOError, OSError):
            # The filesystem allows the Unicode filename but the file doesn't
            # exist.
            return True
        else:
            # The filesystem allows the Unicode filename and the file exists,
            # for some reason.
            return True

UnicodeFilenameFeature = _UnicodeFilenameFeature()
3248
class TestScenarioApplier(object):
    """A tool to apply scenarios to tests.

    The `scenarios` attribute must be set by the user before adapt() is
    called; it is a sequence of (scenario_name, scenario_param_dict) pairs.
    """

    def adapt(self, test):
        """Return a TestSuite containing a copy of test for each scenario.

        :param test: The test to copy.
        :return: A unittest.TestSuite with one adapted copy per entry of
            self.scenarios, in scenario order.
        """
        result = unittest.TestSuite()
        for scenario in self.scenarios:
            result.addTest(self.adapt_test_to_scenario(test, scenario))
        return result

    def adapt_test_to_scenario(self, test, scenario):
        """Copy test and apply scenario to it.

        :param test: A test to adapt.
        :param scenario: A tuple describing the scenario.
            The first element of the tuple is the new test id.
            The second element is a dict containing attributes to set on the
            test.
        :return: The adapted test.
        """
        from copy import deepcopy
        # Work on a deep copy so the original test is left untouched.
        new_test = deepcopy(test)
        for name, value in scenario[1].items():
            setattr(new_test, name, value)
        # Compute the id once, before id() is replaced on the instance.
        new_id = "%s(%s)" % (new_test.id(), scenario[0])
        new_test.id = lambda: new_id
        return new_test
3277
def probe_unicode_in_user_encoding():
    """Try to encode several unicode strings to use in unicode-aware tests.
    Return first successful match.

    :return: (unicode value, encoded plain string value) or (None, None)
    """
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
    for uni_val in possible_vals:
        try:
            str_val = uni_val.encode(osutils.get_user_encoding())
        except UnicodeEncodeError:
            # Try a different character
            pass
        else:
            return uni_val, str_val
    # None of the candidates can be represented in the user encoding.
    return None, None
3295
def probe_bad_non_ascii(encoding):
    """Try to find [bad] character with code [128..255]
    that cannot be decoded to unicode in some encoding.
    Return None if all non-ascii characters are valid
    for given encoding.

    :param encoding: Name of the encoding to probe.
    :return: The first single-character byte string in the range that
        fails to decode, or None.
    """
    for i in xrange(128, 256):
        char = chr(i)
        try:
            char.decode(encoding)
        except UnicodeDecodeError:
            return char
    return None
3310
# Feature subclass used by tests that need an FTP server. The only visible
# probe statement is the import of bzrlib.tests.ftp_server. A module-level
# instance, FTPServerFeature, is created right after the class.
# NOTE(review): this listing is truncated -- the end of the docstring, the
# method header and error handling around the import, and the body of
# feature_name() are not visible here; restore them from the upstream file
# before relying on this block.
class _FTPServerFeature(Feature):
3311
"""Some tests want an FTP Server, check if one is available.
3313
Right now, the only way this is available is if 'medusa' is installed.
3314
http://www.amk.ca/python/code/medusa.html
3319
import bzrlib.tests.ftp_server
3324
def feature_name(self):
3328
FTPServerFeature = _FTPServerFeature()
3331
# Feature subclass used by tests that need an https server; feature_name()
# reports it as 'HTTPSServer'. A module-level instance, HTTPSServerFeature,
# is created right after the class.
# NOTE(review): this listing is truncated -- the end of the docstring and the
# whole probe method are not visible here; restore them from the upstream
# file before relying on this block.
class _HTTPSServerFeature(Feature):
3332
"""Some tests want an https Server, check if one is available.
3334
Right now, the only way this is available is under python2.6 which provides
3345
def feature_name(self):
3346
return 'HTTPSServer'
3349
HTTPSServerFeature = _HTTPSServerFeature()
3352
# Feature subclass asking whether the filesystem supports Unicode filenames.
# The visible handlers distinguish a UnicodeEncodeError from an
# IOError/OSError raised by the probed call. A module-level instance,
# UnicodeFilename, is created right after the class.
# NOTE(review): this listing is truncated -- the probe method header, the
# call being guarded and every return statement are not visible here; restore
# them from the upstream file before relying on this block.
class _UnicodeFilename(Feature):
3353
"""Does the filesystem support Unicode filenames?"""
3358
except UnicodeEncodeError:
3360
except (IOError, OSError):
3361
# The filesystem allows the Unicode filename but the file doesn't
3365
# The filesystem allows the Unicode filename and the file exists,
3369
UnicodeFilename = _UnicodeFilename()
3372
class _UTF8Filesystem(Feature):
    """Is the filesystem UTF-8?"""

    def _probe(self):
        """Return True when osutils._fs_enc names UTF-8 (either spelling)."""
        return osutils._fs_enc.upper() in ('UTF-8', 'UTF8')

UTF8Filesystem = _UTF8Filesystem()
3383
class _CaseInsCasePresFilenameFeature(Feature):
    """Is the file-system case insensitive, but case-preserving?"""

    def _probe(self):
        """Create a mixed-case temporary file and inspect how it is seen.

        :return: True when the created name keeps its case and the file can
            also be found through its upper- and lower-cased paths.
        """
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
        try:
            # first check truly case-preserving for created files, then check
            # case insensitive when opening existing files.
            name = osutils.normpath(name)
            base, rel = osutils.split(name)
            found_rel = osutils.canonical_relpath(base, name)
            return (found_rel == rel
                    and os.path.isfile(name.upper())
                    and os.path.isfile(name.lower()))
        finally:
            # mkstemp() leaves both the descriptor and the file to the
            # caller; release them whatever the probe outcome.
            os.close(fileno)
            os.remove(name)

    def feature_name(self):
        """Return the human-readable name of this feature."""
        return "case-insensitive case-preserving filesystem"

CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
3407
class _CaseInsensitiveFilesystemFeature(Feature):
    """Check if underlying filesystem is case-insensitive but *not* case
    preserving.
    """
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
    # more likely to be case preserving, so this case is rare.

    def _probe(self):
        """Create directory 'a' in a scratch directory and look for 'A'.

        :return: False when the filesystem is case-preserving (as reported
            by CaseInsCasePresFilenameFeature); otherwise whether the
            differently-cased name resolves to the created directory.
        """
        if CaseInsCasePresFilenameFeature.available():
            return False

        # Reuse the shared test root when one exists, otherwise create it
        # and record it for later users.
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
            TestCaseWithMemoryTransport.TEST_ROOT = root
        else:
            root = TestCaseWithMemoryTransport.TEST_ROOT
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
                               dir=root)
        try:
            name_a = osutils.pathjoin(tdir, 'a')
            name_A = osutils.pathjoin(tdir, 'A')
            os.mkdir(name_a)
            result = osutils.isdir(name_A)
        finally:
            # Always drop the scratch directory, even if the probe failed.
            _rmtree_temp_dir(tdir)
        return result

    def feature_name(self):
        """Return the human-readable name of this feature."""
        return 'case-insensitive filesystem'

CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()