1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
# TODO: Perhaps there should be an API to find out if bzr running under the
19
# test suite -- some plugins might want to avoid making intrusive changes if
20
# this is the case. However, we want behaviour under to test to diverge as
21
# little as possible, so this should be used rarely if it's added at all.
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
# NOTE: Some classes in here use camelCaseNaming() rather than
25
# underscore_naming(). That's for consistency with unittest; it's not the
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
# new assertFoo() methods.
31
from cStringIO import StringIO
37
from pprint import pformat
42
from subprocess import Popen, PIPE
62
import bzrlib.commands
63
import bzrlib.timestamp
65
import bzrlib.inventory
66
import bzrlib.iterablefile
71
# lsprof not available
73
from bzrlib.merge import merge_inner
76
from bzrlib.revision import common_ancestor
78
from bzrlib import symbol_versioning
79
from bzrlib.symbol_versioning import (
89
from bzrlib.transport import get_transport
90
import bzrlib.transport
91
from bzrlib.transport.local import LocalURLServer
92
from bzrlib.transport.memory import MemoryServer
93
from bzrlib.transport.readonly import ReadonlyServer
94
from bzrlib.trace import mutter, note
95
from bzrlib.tests import TestUtil
96
from bzrlib.tests.http_server import HttpServer
97
from bzrlib.tests.TestUtil import (
101
from bzrlib.tests.treeshape import build_tree_contents
102
import bzrlib.version_info_formats.format_custom
103
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
105
# Mark this python module as being part of the implementation
106
# of unittest: this gives us better tracebacks where the last
107
# shown frame is the test code, not our assertXYZ.
110
default_transport = LocalURLServer
113
MODULES_TO_DOCTEST = [
123
bzrlib.version_info_formats.format_custom,
124
# quoted to avoid module-loading circularity
129
def packages_to_test():
130
"""Return a list of packages to test.
132
The packages are not globally imported so that import failures are
133
triggered when running selftest, not when importing the command.
136
import bzrlib.tests.blackbox
137
import bzrlib.tests.branch_implementations
138
import bzrlib.tests.bzrdir_implementations
139
import bzrlib.tests.commands
140
import bzrlib.tests.interrepository_implementations
141
import bzrlib.tests.interversionedfile_implementations
142
import bzrlib.tests.intertree_implementations
143
import bzrlib.tests.inventory_implementations
144
import bzrlib.tests.per_lock
145
import bzrlib.tests.repository_implementations
146
import bzrlib.tests.revisionstore_implementations
147
import bzrlib.tests.tree_implementations
148
import bzrlib.tests.workingtree_implementations
151
bzrlib.tests.blackbox,
152
bzrlib.tests.branch_implementations,
153
bzrlib.tests.bzrdir_implementations,
154
bzrlib.tests.commands,
155
bzrlib.tests.interrepository_implementations,
156
bzrlib.tests.interversionedfile_implementations,
157
bzrlib.tests.intertree_implementations,
158
bzrlib.tests.inventory_implementations,
159
bzrlib.tests.per_lock,
160
bzrlib.tests.repository_implementations,
161
bzrlib.tests.revisionstore_implementations,
162
bzrlib.tests.tree_implementations,
163
bzrlib.tests.workingtree_implementations,
167
class ExtendedTestResult(unittest._TextTestResult):
168
"""Accepts, reports and accumulates the results of running tests.
170
Compared to the unittest version this class adds support for
171
profiling, benchmarking, stopping as soon as a test fails, and
172
skipping tests. There are further-specialized subclasses for
173
different types of display.
175
When a test finishes, in whatever way, it calls one of the addSuccess,
176
addFailure or addError classes. These in turn may redirect to a more
177
specific case for the special test results supported by our extended
180
Note that just one of these objects is fed the results from many tests.
185
def __init__(self, stream, descriptions, verbosity,
189
"""Construct new TestResult.
191
:param bench_history: Optionally, a writable file object to accumulate
194
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
195
if bench_history is not None:
196
from bzrlib.version import _get_bzr_source_tree
197
src_tree = _get_bzr_source_tree()
200
revision_id = src_tree.get_parent_ids()[0]
202
# XXX: if this is a brand new tree, do the same as if there
206
# XXX: If there's no branch, what should we do?
208
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
209
self._bench_history = bench_history
210
self.ui = ui.ui_factory
211
self.num_tests = num_tests
213
self.failure_count = 0
214
self.known_failure_count = 0
216
self.not_applicable_count = 0
217
self.unsupported = {}
219
self._overall_start_time = time.time()
221
def _extractBenchmarkTime(self, testCase):
222
"""Add a benchmark time for the current test case."""
223
return getattr(testCase, "_benchtime", None)
225
def _elapsedTestTimeString(self):
226
"""Return a time string for the overall time the current test has taken."""
227
return self._formatTime(time.time() - self._start_time)
229
def _testTimeString(self, testCase):
230
benchmark_time = self._extractBenchmarkTime(testCase)
231
if benchmark_time is not None:
233
self._formatTime(benchmark_time),
234
self._elapsedTestTimeString())
236
return " %s" % self._elapsedTestTimeString()
238
def _formatTime(self, seconds):
239
"""Format seconds as milliseconds with leading spaces."""
240
# some benchmarks can take thousands of seconds to run, so we need 8
242
return "%8dms" % (1000 * seconds)
244
def _shortened_test_description(self, test):
246
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
249
def startTest(self, test):
250
unittest.TestResult.startTest(self, test)
251
self.report_test_start(test)
252
test.number = self.count
253
self._recordTestStartTime()
255
def _recordTestStartTime(self):
256
"""Record that a test has started."""
257
self._start_time = time.time()
259
def _cleanupLogFile(self, test):
260
# We can only do this if we have one of our TestCases, not if
262
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
263
if setKeepLogfile is not None:
266
def addError(self, test, err):
267
"""Tell result that test finished with an error.
269
Called from the TestCase run() method when the test
270
fails with an unexpected error.
272
self._testConcluded(test)
273
if isinstance(err[1], TestSkipped):
274
return self._addSkipped(test, err)
275
elif isinstance(err[1], UnavailableFeature):
276
return self.addNotSupported(test, err[1].args[0])
278
self._cleanupLogFile(test)
279
unittest.TestResult.addError(self, test, err)
280
self.error_count += 1
281
self.report_error(test, err)
285
def addFailure(self, test, err):
286
"""Tell result that test failed.
288
Called from the TestCase run() method when the test
289
fails because e.g. an assert() method failed.
291
self._testConcluded(test)
292
if isinstance(err[1], KnownFailure):
293
return self._addKnownFailure(test, err)
295
self._cleanupLogFile(test)
296
unittest.TestResult.addFailure(self, test, err)
297
self.failure_count += 1
298
self.report_failure(test, err)
302
def addSuccess(self, test):
303
"""Tell result that test completed successfully.
305
Called from the TestCase run()
307
self._testConcluded(test)
308
if self._bench_history is not None:
309
benchmark_time = self._extractBenchmarkTime(test)
310
if benchmark_time is not None:
311
self._bench_history.write("%s %s\n" % (
312
self._formatTime(benchmark_time),
314
self.report_success(test)
315
self._cleanupLogFile(test)
316
unittest.TestResult.addSuccess(self, test)
318
def _testConcluded(self, test):
319
"""Common code when a test has finished.
321
Called regardless of whether it succeded, failed, etc.
325
def _addKnownFailure(self, test, err):
326
self.known_failure_count += 1
327
self.report_known_failure(test, err)
329
def addNotSupported(self, test, feature):
330
"""The test will not be run because of a missing feature.
332
# this can be called in two different ways: it may be that the
333
# test started running, and then raised (through addError)
334
# UnavailableFeature. Alternatively this method can be called
335
# while probing for features before running the tests; in that
336
# case we will see startTest and stopTest, but the test will never
338
self.unsupported.setdefault(str(feature), 0)
339
self.unsupported[str(feature)] += 1
340
self.report_unsupported(test, feature)
342
def _addSkipped(self, test, skip_excinfo):
343
if isinstance(skip_excinfo[1], TestNotApplicable):
344
self.not_applicable_count += 1
345
self.report_not_applicable(test, skip_excinfo)
348
self.report_skip(test, skip_excinfo)
351
except KeyboardInterrupt:
354
self.addError(test, test._exc_info())
356
# seems best to treat this as success from point-of-view of unittest
357
# -- it actually does nothing so it barely matters :)
358
unittest.TestResult.addSuccess(self, test)
360
def printErrorList(self, flavour, errors):
361
for test, err in errors:
362
self.stream.writeln(self.separator1)
363
self.stream.write("%s: " % flavour)
364
self.stream.writeln(self.getDescription(test))
365
if getattr(test, '_get_log', None) is not None:
366
self.stream.write('\n')
368
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
369
self.stream.write('\n')
370
self.stream.write(test._get_log())
371
self.stream.write('\n')
373
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
374
self.stream.write('\n')
375
self.stream.writeln(self.separator2)
376
self.stream.writeln("%s" % err)
381
def report_cleaning_up(self):
384
def report_success(self, test):
387
def wasStrictlySuccessful(self):
388
if self.unsupported or self.known_failure_count:
390
return self.wasSuccessful()
393
class TextTestResult(ExtendedTestResult):
394
"""Displays progress and results of tests in text form"""
396
def __init__(self, stream, descriptions, verbosity,
401
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
402
bench_history, num_tests)
404
self.pb = self.ui.nested_progress_bar()
405
self._supplied_pb = False
408
self._supplied_pb = True
409
self.pb.show_pct = False
410
self.pb.show_spinner = False
411
self.pb.show_eta = False,
412
self.pb.show_count = False
413
self.pb.show_bar = False
415
def report_starting(self):
416
self.pb.update('[test 0/%d] starting...' % (self.num_tests))
418
def _progress_prefix_text(self):
419
a = '[%d' % self.count
420
if self.num_tests is not None:
421
a +='/%d' % self.num_tests
422
a += ' in %ds' % (time.time() - self._overall_start_time)
424
a += ', %d errors' % self.error_count
425
if self.failure_count:
426
a += ', %d failed' % self.failure_count
427
if self.known_failure_count:
428
a += ', %d known failures' % self.known_failure_count
430
a += ', %d skipped' % self.skip_count
432
a += ', %d missing features' % len(self.unsupported)
436
def report_test_start(self, test):
439
self._progress_prefix_text()
441
+ self._shortened_test_description(test))
443
def _test_description(self, test):
444
return self._shortened_test_description(test)
446
def report_error(self, test, err):
447
self.pb.note('ERROR: %s\n %s\n',
448
self._test_description(test),
452
def report_failure(self, test, err):
453
self.pb.note('FAIL: %s\n %s\n',
454
self._test_description(test),
458
def report_known_failure(self, test, err):
459
self.pb.note('XFAIL: %s\n%s\n',
460
self._test_description(test), err[1])
462
def report_skip(self, test, skip_excinfo):
465
def report_not_applicable(self, test, skip_excinfo):
468
def report_unsupported(self, test, feature):
469
"""test cannot be run because feature is missing."""
471
def report_cleaning_up(self):
472
self.pb.update('cleaning up...')
475
if not self._supplied_pb:
479
class VerboseTestResult(ExtendedTestResult):
480
"""Produce long output, with one line per test run plus times"""
482
def _ellipsize_to_right(self, a_string, final_width):
483
"""Truncate and pad a string, keeping the right hand side"""
484
if len(a_string) > final_width:
485
result = '...' + a_string[3-final_width:]
488
return result.ljust(final_width)
490
def report_starting(self):
491
self.stream.write('running %d tests...\n' % self.num_tests)
493
def report_test_start(self, test):
495
name = self._shortened_test_description(test)
496
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
497
# numbers, plus a trailing blank
498
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
499
self.stream.write(self._ellipsize_to_right(name,
500
osutils.terminal_width()-30))
503
def _error_summary(self, err):
505
return '%s%s' % (indent, err[1])
507
def report_error(self, test, err):
508
self.stream.writeln('ERROR %s\n%s'
509
% (self._testTimeString(test),
510
self._error_summary(err)))
512
def report_failure(self, test, err):
513
self.stream.writeln(' FAIL %s\n%s'
514
% (self._testTimeString(test),
515
self._error_summary(err)))
517
def report_known_failure(self, test, err):
518
self.stream.writeln('XFAIL %s\n%s'
519
% (self._testTimeString(test),
520
self._error_summary(err)))
522
def report_success(self, test):
523
self.stream.writeln(' OK %s' % self._testTimeString(test))
524
for bench_called, stats in getattr(test, '_benchcalls', []):
525
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
526
stats.pprint(file=self.stream)
527
# flush the stream so that we get smooth output. This verbose mode is
528
# used to show the output in PQM.
531
def report_skip(self, test, skip_excinfo):
532
self.stream.writeln(' SKIP %s\n%s'
533
% (self._testTimeString(test),
534
self._error_summary(skip_excinfo)))
536
def report_not_applicable(self, test, skip_excinfo):
537
self.stream.writeln(' N/A %s\n%s'
538
% (self._testTimeString(test),
539
self._error_summary(skip_excinfo)))
541
def report_unsupported(self, test, feature):
542
"""test cannot be run because feature is missing."""
543
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
544
%(self._testTimeString(test), feature))
547
class TextTestRunner(object):
548
stop_on_failure = False
557
self.stream = unittest._WritelnDecorator(stream)
558
self.descriptions = descriptions
559
self.verbosity = verbosity
560
self._bench_history = bench_history
561
self.list_only = list_only
564
"Run the given test case or test suite."
565
startTime = time.time()
566
if self.verbosity == 1:
567
result_class = TextTestResult
568
elif self.verbosity >= 2:
569
result_class = VerboseTestResult
570
result = result_class(self.stream,
573
bench_history=self._bench_history,
574
num_tests=test.countTestCases(),
576
result.stop_early = self.stop_on_failure
577
result.report_starting()
579
if self.verbosity >= 2:
580
self.stream.writeln("Listing tests only ...\n")
582
for t in iter_suite_tests(test):
583
self.stream.writeln("%s" % (t.id()))
585
actionTaken = "Listed"
588
run = result.testsRun
590
stopTime = time.time()
591
timeTaken = stopTime - startTime
593
self.stream.writeln(result.separator2)
594
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
595
run, run != 1 and "s" or "", timeTaken))
596
self.stream.writeln()
597
if not result.wasSuccessful():
598
self.stream.write("FAILED (")
599
failed, errored = map(len, (result.failures, result.errors))
601
self.stream.write("failures=%d" % failed)
603
if failed: self.stream.write(", ")
604
self.stream.write("errors=%d" % errored)
605
if result.known_failure_count:
606
if failed or errored: self.stream.write(", ")
607
self.stream.write("known_failure_count=%d" %
608
result.known_failure_count)
609
self.stream.writeln(")")
611
if result.known_failure_count:
612
self.stream.writeln("OK (known_failures=%d)" %
613
result.known_failure_count)
615
self.stream.writeln("OK")
616
if result.skip_count > 0:
617
skipped = result.skip_count
618
self.stream.writeln('%d test%s skipped' %
619
(skipped, skipped != 1 and "s" or ""))
620
if result.unsupported:
621
for feature, count in sorted(result.unsupported.items()):
622
self.stream.writeln("Missing feature '%s' skipped %d tests." %
628
def iter_suite_tests(suite):
629
"""Return all tests in a suite, recursing through nested suites"""
630
for item in suite._tests:
631
if isinstance(item, unittest.TestCase):
633
elif isinstance(item, unittest.TestSuite):
634
for r in iter_suite_tests(item):
637
raise Exception('unknown object %r inside test suite %r'
641
class TestSkipped(Exception):
642
"""Indicates that a test was intentionally skipped, rather than failing."""
645
class TestNotApplicable(TestSkipped):
646
"""A test is not applicable to the situation where it was run.
648
This is only normally raised by parameterized tests, if they find that
649
the instance they're constructed upon does not support one aspect
654
class KnownFailure(AssertionError):
655
"""Indicates that a test failed in a precisely expected manner.
657
Such failures dont block the whole test suite from passing because they are
658
indicators of partially completed code or of future work. We have an
659
explicit error for them so that we can ensure that they are always visible:
660
KnownFailures are always shown in the output of bzr selftest.
664
class UnavailableFeature(Exception):
665
"""A feature required for this test was not available.
667
The feature should be used to construct the exception.
671
class CommandFailed(Exception):
675
class StringIOWrapper(object):
676
"""A wrapper around cStringIO which just adds an encoding attribute.
678
Internally we can check sys.stdout to see what the output encoding
679
should be. However, cStringIO has no encoding attribute that we can
680
set. So we wrap it instead.
685
def __init__(self, s=None):
687
self.__dict__['_cstring'] = StringIO(s)
689
self.__dict__['_cstring'] = StringIO()
691
def __getattr__(self, name, getattr=getattr):
692
return getattr(self.__dict__['_cstring'], name)
694
def __setattr__(self, name, val):
695
if name == 'encoding':
696
self.__dict__['encoding'] = val
698
return setattr(self._cstring, name, val)
701
class TestUIFactory(ui.CLIUIFactory):
702
"""A UI Factory for testing.
704
Hide the progress bar but emit note()s.
706
Allows get_password to be tested without real tty attached.
713
super(TestUIFactory, self).__init__()
714
if stdin is not None:
715
# We use a StringIOWrapper to be able to test various
716
# encodings, but the user is still responsible to
717
# encode the string and to set the encoding attribute
718
# of StringIOWrapper.
719
self.stdin = StringIOWrapper(stdin)
721
self.stdout = sys.stdout
725
self.stderr = sys.stderr
730
"""See progress.ProgressBar.clear()."""
732
def clear_term(self):
733
"""See progress.ProgressBar.clear_term()."""
735
def clear_term(self):
736
"""See progress.ProgressBar.clear_term()."""
739
"""See progress.ProgressBar.finished()."""
741
def note(self, fmt_string, *args, **kwargs):
742
"""See progress.ProgressBar.note()."""
743
self.stdout.write((fmt_string + "\n") % args)
745
def progress_bar(self):
748
def nested_progress_bar(self):
751
def update(self, message, count=None, total=None):
752
"""See progress.ProgressBar.update()."""
754
def get_non_echoed_password(self, prompt):
755
"""Get password from stdin without trying to handle the echo mode"""
757
self.stdout.write(prompt.encode(self.stdout.encoding, 'replace'))
758
password = self.stdin.readline()
761
if password[-1] == '\n':
762
password = password[:-1]
766
class TestCase(unittest.TestCase):
767
"""Base class for bzr unit tests.
769
Tests that need access to disk resources should subclass
770
TestCaseInTempDir not TestCase.
772
Error and debug log messages are redirected from their usual
773
location into a temporary file, the contents of which can be
774
retrieved by _get_log(). We use a real OS file, not an in-memory object,
775
so that it can also capture file IO. When the test completes this file
776
is read into memory and removed from disk.
778
There are also convenience functions to invoke bzr's command-line
779
routine, and to build and check bzr trees.
781
In addition to the usual method of overriding tearDown(), this class also
782
allows subclasses to register functions into the _cleanups list, which is
783
run in order as the object is torn down. It's less likely this will be
784
accidentally overlooked.
787
_log_file_name = None
789
_keep_log_file = False
790
# record lsprof data when performing benchmark calls.
791
_gather_lsprof_in_benchmarks = False
793
def __init__(self, methodName='testMethod'):
794
super(TestCase, self).__init__(methodName)
798
unittest.TestCase.setUp(self)
799
self._cleanEnvironment()
800
bzrlib.trace.disable_default_logging()
803
self._benchcalls = []
804
self._benchtime = None
806
self._clear_debug_flags()
808
def _clear_debug_flags(self):
809
"""Prevent externally set debug flags affecting tests.
811
Tests that want to use debug flags can just set them in the
812
debug_flags set during setup/teardown.
814
self._preserved_debug_flags = set(debug.debug_flags)
815
debug.debug_flags.clear()
816
self.addCleanup(self._restore_debug_flags)
818
def _clear_hooks(self):
819
# prevent hooks affecting tests
821
import bzrlib.smart.server
822
self._preserved_hooks = {
823
bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
824
bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
826
self.addCleanup(self._restoreHooks)
827
# reset all hooks to an empty instance of the appropriate type
828
bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
829
bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
831
def _silenceUI(self):
832
"""Turn off UI for duration of test"""
833
# by default the UI is off; tests can turn it on if they want it.
834
saved = ui.ui_factory
836
ui.ui_factory = saved
837
ui.ui_factory = ui.SilentUIFactory()
838
self.addCleanup(_restore)
840
def _ndiff_strings(self, a, b):
841
"""Return ndiff between two strings containing lines.
843
A trailing newline is added if missing to make the strings
845
if b and b[-1] != '\n':
847
if a and a[-1] != '\n':
849
difflines = difflib.ndiff(a.splitlines(True),
851
linejunk=lambda x: False,
852
charjunk=lambda x: False)
853
return ''.join(difflines)
855
def assertEqual(self, a, b, message=''):
859
except UnicodeError, e:
860
# If we can't compare without getting a UnicodeError, then
861
# obviously they are different
862
mutter('UnicodeError: %s', e)
865
raise AssertionError("%snot equal:\na = %s\nb = %s\n"
867
pformat(a), pformat(b)))
869
assertEquals = assertEqual
871
def assertEqualDiff(self, a, b, message=None):
872
"""Assert two texts are equal, if not raise an exception.
874
This is intended for use with multi-line strings where it can
875
be hard to find the differences by eye.
877
# TODO: perhaps override assertEquals to call this for strings?
881
message = "texts not equal:\n"
882
raise AssertionError(message +
883
self._ndiff_strings(a, b))
885
def assertEqualMode(self, mode, mode_test):
886
self.assertEqual(mode, mode_test,
887
'mode mismatch %o != %o' % (mode, mode_test))
889
def assertPositive(self, val):
890
"""Assert that val is greater than 0."""
891
self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
893
def assertNegative(self, val):
894
"""Assert that val is less than 0."""
895
self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
897
def assertStartsWith(self, s, prefix):
898
if not s.startswith(prefix):
899
raise AssertionError('string %r does not start with %r' % (s, prefix))
901
def assertEndsWith(self, s, suffix):
902
"""Asserts that s ends with suffix."""
903
if not s.endswith(suffix):
904
raise AssertionError('string %r does not end with %r' % (s, suffix))
906
def assertContainsRe(self, haystack, needle_re):
907
"""Assert that a contains something matching a regular expression."""
908
if not re.search(needle_re, haystack):
909
if '\n' in haystack or len(haystack) > 60:
910
# a long string, format it in a more readable way
911
raise AssertionError(
912
'pattern "%s" not found in\n"""\\\n%s"""\n'
913
% (needle_re, haystack))
915
raise AssertionError('pattern "%s" not found in "%s"'
916
% (needle_re, haystack))
918
def assertNotContainsRe(self, haystack, needle_re):
919
"""Assert that a does not match a regular expression"""
920
if re.search(needle_re, haystack):
921
raise AssertionError('pattern "%s" found in "%s"'
922
% (needle_re, haystack))
924
def assertSubset(self, sublist, superlist):
925
"""Assert that every entry in sublist is present in superlist."""
926
missing = set(sublist) - set(superlist)
928
raise AssertionError("value(s) %r not present in container %r" %
929
(missing, superlist))
931
def assertListRaises(self, excClass, func, *args, **kwargs):
932
"""Fail unless excClass is raised when the iterator from func is used.
934
Many functions can return generators this makes sure
935
to wrap them in a list() call to make sure the whole generator
936
is run, and that the proper exception is raised.
939
list(func(*args, **kwargs))
943
if getattr(excClass,'__name__', None) is not None:
944
excName = excClass.__name__
946
excName = str(excClass)
947
raise self.failureException, "%s not raised" % excName
949
def assertRaises(self, excClass, callableObj, *args, **kwargs):
950
"""Assert that a callable raises a particular exception.
952
:param excClass: As for the except statement, this may be either an
953
exception class, or a tuple of classes.
954
:param callableObj: A callable, will be passed ``*args`` and
957
Returns the exception so that you can examine it.
960
callableObj(*args, **kwargs)
964
if getattr(excClass,'__name__', None) is not None:
965
excName = excClass.__name__
968
excName = str(excClass)
969
raise self.failureException, "%s not raised" % excName
971
def assertIs(self, left, right, message=None):
972
if not (left is right):
973
if message is not None:
974
raise AssertionError(message)
976
raise AssertionError("%r is not %r." % (left, right))
978
def assertIsNot(self, left, right, message=None):
980
if message is not None:
981
raise AssertionError(message)
983
raise AssertionError("%r is %r." % (left, right))
985
def assertTransportMode(self, transport, path, mode):
986
"""Fail if a path does not have mode mode.
988
If modes are not supported on this transport, the assertion is ignored.
990
if not transport._can_roundtrip_unix_modebits():
992
path_stat = transport.stat(path)
993
actual_mode = stat.S_IMODE(path_stat.st_mode)
994
self.assertEqual(mode, actual_mode,
995
'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
997
def assertIsSameRealPath(self, path1, path2):
998
"""Fail if path1 and path2 points to different files"""
999
self.assertEqual(osutils.realpath(path1),
1000
osutils.realpath(path2),
1001
"apparent paths:\na = %s\nb = %s\n," % (path1, path2))
1003
def assertIsInstance(self, obj, kls):
1004
"""Fail if obj is not an instance of kls"""
1005
if not isinstance(obj, kls):
1006
self.fail("%r is an instance of %s rather than %s" % (
1007
obj, obj.__class__, kls))
1009
def expectFailure(self, reason, assertion, *args, **kwargs):
1010
"""Invoke a test, expecting it to fail for the given reason.
1012
This is for assertions that ought to succeed, but currently fail.
1013
(The failure is *expected* but not *wanted*.) Please be very precise
1014
about the failure you're expecting. If a new bug is introduced,
1015
AssertionError should be raised, not KnownFailure.
1017
Frequently, expectFailure should be followed by an opposite assertion.
1020
Intended to be used with a callable that raises AssertionError as the
1021
'assertion' parameter. args and kwargs are passed to the 'assertion'.
1023
Raises KnownFailure if the test fails. Raises AssertionError if the
1028
self.expectFailure('Math is broken', self.assertNotEqual, 54,
1030
self.assertEqual(42, dynamic_val)
1032
This means that a dynamic_val of 54 will cause the test to raise
1033
a KnownFailure. Once math is fixed and the expectFailure is removed,
1034
only a dynamic_val of 42 will allow the test to pass. Anything other
1035
than 54 or 42 will cause an AssertionError.
1038
assertion(*args, **kwargs)
1039
except AssertionError:
1040
raise KnownFailure(reason)
1042
self.fail('Unexpected success. Should have failed: %s' % reason)
1044
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1045
"""A helper for callDeprecated and applyDeprecated.
1047
:param a_callable: A callable to call.
1048
:param args: The positional arguments for the callable
1049
:param kwargs: The keyword arguments for the callable
1050
:return: A tuple (warnings, result). result is the result of calling
1051
a_callable(``*args``, ``**kwargs``).
1054
def capture_warnings(msg, cls=None, stacklevel=None):
1055
# we've hooked into a deprecation specific callpath,
1056
# only deprecations should getting sent via it.
1057
self.assertEqual(cls, DeprecationWarning)
1058
local_warnings.append(msg)
1059
original_warning_method = symbol_versioning.warn
1060
symbol_versioning.set_warning_method(capture_warnings)
1062
result = a_callable(*args, **kwargs)
1064
symbol_versioning.set_warning_method(original_warning_method)
1065
return (local_warnings, result)
1067
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
1068
"""Call a deprecated callable without warning the user.
1070
Note that this only captures warnings raised by symbol_versioning.warn,
1071
not other callers that go direct to the warning module.
1073
To test that a deprecated method raises an error, do something like
1076
self.assertRaises(errors.ReservedId,
1077
self.applyDeprecated, zero_ninetyone,
1078
br.append_revision, 'current:')
1080
:param deprecation_format: The deprecation format that the callable
1081
should have been deprecated with. This is the same type as the
1082
parameter to deprecated_method/deprecated_function. If the
1083
callable is not deprecated with this format, an assertion error
1085
:param a_callable: A callable to call. This may be a bound method or
1086
a regular function. It will be called with ``*args`` and
1088
:param args: The positional arguments for the callable
1089
:param kwargs: The keyword arguments for the callable
1090
:return: The result of a_callable(``*args``, ``**kwargs``)
1092
call_warnings, result = self._capture_deprecation_warnings(a_callable,
1094
expected_first_warning = symbol_versioning.deprecation_string(
1095
a_callable, deprecation_format)
1096
if len(call_warnings) == 0:
1097
self.fail("No deprecation warning generated by call to %s" %
1099
self.assertEqual(expected_first_warning, call_warnings[0])
1102
def callCatchWarnings(self, fn, *args, **kw):
1103
"""Call a callable that raises python warnings.
1105
The caller's responsible for examining the returned warnings.
1107
If the callable raises an exception, the exception is not
1108
caught and propagates up to the caller. In that case, the list
1109
of warnings is not available.
1111
:returns: ([warning_object, ...], fn_result)
1113
# XXX: This is not perfect, because it completely overrides the
1114
# warnings filters, and some code may depend on suppressing particular
1115
# warnings. It's the easiest way to insulate ourselves from -Werror,
1116
# though. -- Andrew, 20071062
1118
def _catcher(message, category, filename, lineno, file=None):
1119
# despite the name, 'message' is normally(?) a Warning subclass
1121
wlist.append(message)
1122
saved_showwarning = warnings.showwarning
1123
saved_filters = warnings.filters
1125
warnings.showwarning = _catcher
1126
warnings.filters = []
1127
result = fn(*args, **kw)
1129
warnings.showwarning = saved_showwarning
1130
warnings.filters = saved_filters
1131
return wlist, result
1133
def callDeprecated(self, expected, callable, *args, **kwargs):
1134
"""Assert that a callable is deprecated in a particular way.
1136
This is a very precise test for unusual requirements. The
1137
applyDeprecated helper function is probably more suited for most tests
1138
as it allows you to simply specify the deprecation format being used
1139
and will ensure that that is issued for the function being called.
1141
Note that this only captures warnings raised by symbol_versioning.warn,
1142
not other callers that go direct to the warning module. To catch
1143
general warnings, use callCatchWarnings.
1145
:param expected: a list of the deprecation warnings expected, in order
1146
:param callable: The callable to call
1147
:param args: The positional arguments for the callable
1148
:param kwargs: The keyword arguments for the callable
1150
call_warnings, result = self._capture_deprecation_warnings(callable,
1152
self.assertEqual(expected, call_warnings)
1155
def _startLogFile(self):
1156
"""Send bzr and test log messages to a temporary file.
1158
The file is removed as the test is torn down.
1160
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1161
self._log_file = os.fdopen(fileno, 'w+')
1162
self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
1163
self._log_file_name = name
1164
self.addCleanup(self._finishLogFile)
1166
def _finishLogFile(self):
1167
"""Finished with the log file.
1169
Close the file and delete it, unless setKeepLogfile was called.
1171
if self._log_file is None:
1173
bzrlib.trace.disable_test_log(self._log_nonce)
1174
self._log_file.close()
1175
self._log_file = None
1176
if not self._keep_log_file:
1177
os.remove(self._log_file_name)
1178
self._log_file_name = None
1180
def setKeepLogfile(self):
1181
"""Make the logfile not be deleted when _finishLogFile is called."""
1182
self._keep_log_file = True
1184
def addCleanup(self, callable):
1185
"""Arrange to run a callable when this case is torn down.
1187
Callables are run in the reverse of the order they are registered,
1188
ie last-in first-out.
1190
if callable in self._cleanups:
1191
raise ValueError("cleanup function %r already registered on %s"
1193
self._cleanups.append(callable)
1195
def _cleanEnvironment(self):
1197
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1198
'HOME': os.getcwd(),
1199
'APPDATA': None, # bzr now use Win32 API and don't rely on APPDATA
1200
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1202
'BZREMAIL': None, # may still be present in the environment
1204
'BZR_PROGRESS_BAR': None,
1207
'SSH_AUTH_SOCK': None,
1211
'https_proxy': None,
1212
'HTTPS_PROXY': None,
1217
# Nobody cares about these ones AFAIK. So far at
1218
# least. If you do (care), please update this comment
1222
'BZR_REMOTE_PATH': None,
1225
self.addCleanup(self._restoreEnvironment)
1226
for name, value in new_env.iteritems():
1227
self._captureVar(name, value)
1229
def _captureVar(self, name, newvalue):
1230
"""Set an environment variable, and reset it when finished."""
1231
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1233
def _restore_debug_flags(self):
1234
debug.debug_flags.clear()
1235
debug.debug_flags.update(self._preserved_debug_flags)
1237
def _restoreEnvironment(self):
1238
for name, value in self.__old_env.iteritems():
1239
osutils.set_or_unset_env(name, value)
1241
def _restoreHooks(self):
1242
for klass, hooks in self._preserved_hooks.items():
1243
setattr(klass, 'hooks', hooks)
1245
def knownFailure(self, reason):
1246
"""This test has failed for some known reason."""
1247
raise KnownFailure(reason)
1249
def run(self, result=None):
1250
if result is None: result = self.defaultTestResult()
1251
for feature in getattr(self, '_test_needs_features', []):
1252
if not feature.available():
1253
result.startTest(self)
1254
if getattr(result, 'addNotSupported', None):
1255
result.addNotSupported(self, feature)
1257
result.addSuccess(self)
1258
result.stopTest(self)
1260
return unittest.TestCase.run(self, result)
1264
unittest.TestCase.tearDown(self)
1266
def time(self, callable, *args, **kwargs):
1267
"""Run callable and accrue the time it takes to the benchmark time.
1269
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
1270
this will cause lsprofile statistics to be gathered and stored in
1273
if self._benchtime is None:
1277
if not self._gather_lsprof_in_benchmarks:
1278
return callable(*args, **kwargs)
1280
# record this benchmark
1281
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
1283
self._benchcalls.append(((callable, args, kwargs), stats))
1286
self._benchtime += time.time() - start
1288
def _runCleanups(self):
1289
"""Run registered cleanup functions.
1291
This should only be called from TestCase.tearDown.
1293
# TODO: Perhaps this should keep running cleanups even if
1294
# one of them fails?
1296
# Actually pop the cleanups from the list so tearDown running
1297
# twice is safe (this happens for skipped tests).
1298
while self._cleanups:
1299
self._cleanups.pop()()
1301
def log(self, *args):
1304
def _get_log(self, keep_log_file=False):
1305
"""Get the log from bzrlib.trace calls from this test.
1307
:param keep_log_file: When True, if the log is still a file on disk
1308
leave it as a file on disk. When False, if the log is still a file
1309
on disk, the log file is deleted and the log preserved as
1311
:return: A string containing the log.
1313
# flush the log file, to get all content
1315
bzrlib.trace._trace_file.flush()
1316
if self._log_contents:
1317
# XXX: this can hardly contain the content flushed above --vila
1319
return self._log_contents
1320
if self._log_file_name is not None:
1321
logfile = open(self._log_file_name)
1323
log_contents = logfile.read()
1326
if not keep_log_file:
1327
self._log_contents = log_contents
1329
os.remove(self._log_file_name)
1331
if sys.platform == 'win32' and e.errno == errno.EACCES:
1332
sys.stderr.write(('Unable to delete log file '
1333
' %r\n' % self._log_file_name))
1338
return "DELETED log file to reduce memory footprint"
1340
def requireFeature(self, feature):
1341
"""This test requires a specific feature is available.
1343
:raises UnavailableFeature: When feature is not available.
1345
if not feature.available():
1346
raise UnavailableFeature(feature)
1348
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1350
"""Run bazaar command line, splitting up a string command line."""
1351
if isinstance(args, basestring):
1352
# shlex don't understand unicode strings,
1353
# so args should be plain string (bialix 20070906)
1354
args = list(shlex.split(str(args)))
1355
return self._run_bzr_core(args, retcode=retcode,
1356
encoding=encoding, stdin=stdin, working_dir=working_dir,
1359
def _run_bzr_core(self, args, retcode, encoding, stdin,
1361
if encoding is None:
1362
encoding = bzrlib.user_encoding
1363
stdout = StringIOWrapper()
1364
stderr = StringIOWrapper()
1365
stdout.encoding = encoding
1366
stderr.encoding = encoding
1368
self.log('run bzr: %r', args)
1369
# FIXME: don't call into logging here
1370
handler = logging.StreamHandler(stderr)
1371
handler.setLevel(logging.INFO)
1372
logger = logging.getLogger('')
1373
logger.addHandler(handler)
1374
old_ui_factory = ui.ui_factory
1375
ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
1378
if working_dir is not None:
1379
cwd = osutils.getcwd()
1380
os.chdir(working_dir)
1383
result = self.apply_redirected(ui.ui_factory.stdin,
1385
bzrlib.commands.run_bzr_catch_user_errors,
1388
logger.removeHandler(handler)
1389
ui.ui_factory = old_ui_factory
1393
out = stdout.getvalue()
1394
err = stderr.getvalue()
1396
self.log('output:\n%r', out)
1398
self.log('errors:\n%r', err)
1399
if retcode is not None:
1400
self.assertEquals(retcode, result,
1401
message='Unexpected return code')
1404
def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
1405
working_dir=None, error_regexes=[], output_encoding=None):
1406
"""Invoke bzr, as if it were run from the command line.
1408
The argument list should not include the bzr program name - the
1409
first argument is normally the bzr command. Arguments may be
1410
passed in three ways:
1412
1- A list of strings, eg ["commit", "a"]. This is recommended
1413
when the command contains whitespace or metacharacters, or
1414
is built up at run time.
1416
2- A single string, eg "add a". This is the most convenient
1417
for hardcoded commands.
1419
This runs bzr through the interface that catches and reports
1420
errors, and with logging set to something approximating the
1421
default, so that error reporting can be checked.
1423
This should be the main method for tests that want to exercise the
1424
overall behavior of the bzr application (rather than a unit test
1425
or a functional test of the library.)
1427
This sends the stdout/stderr results into the test's log,
1428
where it may be useful for debugging. See also run_captured.
1430
:keyword stdin: A string to be used as stdin for the command.
1431
:keyword retcode: The status code the command should return;
1433
:keyword working_dir: The directory to run the command in
1434
:keyword error_regexes: A list of expected error messages. If
1435
specified they must be seen in the error output of the command.
1437
out, err = self._run_bzr_autosplit(
1442
working_dir=working_dir,
1444
for regex in error_regexes:
1445
self.assertContainsRe(err, regex)
1448
def run_bzr_error(self, error_regexes, *args, **kwargs):
1449
"""Run bzr, and check that stderr contains the supplied regexes
1451
:param error_regexes: Sequence of regular expressions which
1452
must each be found in the error output. The relative ordering
1454
:param args: command-line arguments for bzr
1455
:param kwargs: Keyword arguments which are interpreted by run_bzr
1456
This function changes the default value of retcode to be 3,
1457
since in most cases this is run when you expect bzr to fail.
1459
:return: (out, err) The actual output of running the command (in case
1460
you want to do more inspection)
1464
# Make sure that commit is failing because there is nothing to do
1465
self.run_bzr_error(['no changes to commit'],
1466
['commit', '-m', 'my commit comment'])
1467
# Make sure --strict is handling an unknown file, rather than
1468
# giving us the 'nothing to do' error
1469
self.build_tree(['unknown'])
1470
self.run_bzr_error(['Commit refused because there are unknown files'],
1471
['commit', --strict', '-m', 'my commit comment'])
1473
kwargs.setdefault('retcode', 3)
1474
kwargs['error_regexes'] = error_regexes
1475
out, err = self.run_bzr(*args, **kwargs)
1478
def run_bzr_subprocess(self, *args, **kwargs):
1479
"""Run bzr in a subprocess for testing.
1481
This starts a new Python interpreter and runs bzr in there.
1482
This should only be used for tests that have a justifiable need for
1483
this isolation: e.g. they are testing startup time, or signal
1484
handling, or early startup code, etc. Subprocess code can't be
1485
profiled or debugged so easily.
1487
:keyword retcode: The status code that is expected. Defaults to 0. If
1488
None is supplied, the status code is not checked.
1489
:keyword env_changes: A dictionary which lists changes to environment
1490
variables. A value of None will unset the env variable.
1491
The values must be strings. The change will only occur in the
1492
child, so you don't need to fix the environment after running.
1493
:keyword universal_newlines: Convert CRLF => LF
1494
:keyword allow_plugins: By default the subprocess is run with
1495
--no-plugins to ensure test reproducibility. Also, it is possible
1496
for system-wide plugins to create unexpected output on stderr,
1497
which can cause unnecessary test failures.
1499
env_changes = kwargs.get('env_changes', {})
1500
working_dir = kwargs.get('working_dir', None)
1501
allow_plugins = kwargs.get('allow_plugins', False)
1503
if isinstance(args[0], list):
1505
elif isinstance(args[0], basestring):
1506
args = list(shlex.split(args[0]))
1508
symbol_versioning.warn(zero_ninetyone %
1509
"passing varargs to run_bzr_subprocess",
1510
DeprecationWarning, stacklevel=3)
1511
process = self.start_bzr_subprocess(args, env_changes=env_changes,
1512
working_dir=working_dir,
1513
allow_plugins=allow_plugins)
1514
# We distinguish between retcode=None and retcode not passed.
1515
supplied_retcode = kwargs.get('retcode', 0)
1516
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
1517
universal_newlines=kwargs.get('universal_newlines', False),
1520
def start_bzr_subprocess(self, process_args, env_changes=None,
1521
skip_if_plan_to_signal=False,
1523
allow_plugins=False):
1524
"""Start bzr in a subprocess for testing.
1526
This starts a new Python interpreter and runs bzr in there.
1527
This should only be used for tests that have a justifiable need for
1528
this isolation: e.g. they are testing startup time, or signal
1529
handling, or early startup code, etc. Subprocess code can't be
1530
profiled or debugged so easily.
1532
:param process_args: a list of arguments to pass to the bzr executable,
1533
for example ``['--version']``.
1534
:param env_changes: A dictionary which lists changes to environment
1535
variables. A value of None will unset the env variable.
1536
The values must be strings. The change will only occur in the
1537
child, so you don't need to fix the environment after running.
1538
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1540
:param allow_plugins: If False (default) pass --no-plugins to bzr.
1542
:returns: Popen object for the started process.
1544
if skip_if_plan_to_signal:
1545
if not getattr(os, 'kill', None):
1546
raise TestSkipped("os.kill not available.")
1548
if env_changes is None:
1552
def cleanup_environment():
1553
for env_var, value in env_changes.iteritems():
1554
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
1556
def restore_environment():
1557
for env_var, value in old_env.iteritems():
1558
osutils.set_or_unset_env(env_var, value)
1560
bzr_path = self.get_bzr_path()
1563
if working_dir is not None:
1564
cwd = osutils.getcwd()
1565
os.chdir(working_dir)
1568
# win32 subprocess doesn't support preexec_fn
1569
# so we will avoid using it on all platforms, just to
1570
# make sure the code path is used, and we don't break on win32
1571
cleanup_environment()
1572
command = [sys.executable, bzr_path]
1573
if not allow_plugins:
1574
command.append('--no-plugins')
1575
command.extend(process_args)
1576
process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1578
restore_environment()
1584
def _popen(self, *args, **kwargs):
1585
"""Place a call to Popen.
1587
Allows tests to override this method to intercept the calls made to
1588
Popen for introspection.
1590
return Popen(*args, **kwargs)
1592
def get_bzr_path(self):
1593
"""Return the path of the 'bzr' executable for this test suite."""
1594
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
1595
if not os.path.isfile(bzr_path):
1596
# We are probably installed. Assume sys.argv is the right file
1597
bzr_path = sys.argv[0]
1600
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
1601
universal_newlines=False, process_args=None):
1602
"""Finish the execution of process.
1604
:param process: the Popen object returned from start_bzr_subprocess.
1605
:param retcode: The status code that is expected. Defaults to 0. If
1606
None is supplied, the status code is not checked.
1607
:param send_signal: an optional signal to send to the process.
1608
:param universal_newlines: Convert CRLF => LF
1609
:returns: (stdout, stderr)
1611
if send_signal is not None:
1612
os.kill(process.pid, send_signal)
1613
out, err = process.communicate()
1615
if universal_newlines:
1616
out = out.replace('\r\n', '\n')
1617
err = err.replace('\r\n', '\n')
1619
if retcode is not None and retcode != process.returncode:
1620
if process_args is None:
1621
process_args = "(unknown args)"
1622
mutter('Output of bzr %s:\n%s', process_args, out)
1623
mutter('Error for bzr %s:\n%s', process_args, err)
1624
self.fail('Command bzr %s failed with retcode %s != %s'
1625
% (process_args, retcode, process.returncode))
1628
def check_inventory_shape(self, inv, shape):
1629
"""Compare an inventory to a list of expected names.
1631
Fail if they are not precisely equal.
1634
shape = list(shape) # copy
1635
for path, ie in inv.entries():
1636
name = path.replace('\\', '/')
1637
if ie.kind == 'directory':
1644
self.fail("expected paths not found in inventory: %r" % shape)
1646
self.fail("unexpected paths found in inventory: %r" % extras)
1648
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
1649
a_callable=None, *args, **kwargs):
1650
"""Call callable with redirected std io pipes.
1652
Returns the return code."""
1653
if not callable(a_callable):
1654
raise ValueError("a_callable must be callable.")
1656
stdin = StringIO("")
1658
if getattr(self, "_log_file", None) is not None:
1659
stdout = self._log_file
1663
if getattr(self, "_log_file", None is not None):
1664
stderr = self._log_file
1667
real_stdin = sys.stdin
1668
real_stdout = sys.stdout
1669
real_stderr = sys.stderr
1674
return a_callable(*args, **kwargs)
1676
sys.stdout = real_stdout
1677
sys.stderr = real_stderr
1678
sys.stdin = real_stdin
1680
def reduceLockdirTimeout(self):
1681
"""Reduce the default lock timeout for the duration of the test, so that
1682
if LockContention occurs during a test, it does so quickly.
1684
Tests that expect to provoke LockContention errors should call this.
1686
orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
1688
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
1689
self.addCleanup(resetTimeout)
1690
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
1692
def make_utf8_encoded_stringio(self, encoding_type=None):
1693
"""Return a StringIOWrapper instance, that will encode Unicode
1696
if encoding_type is None:
1697
encoding_type = 'strict'
1699
output_encoding = 'utf-8'
1700
sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
1701
sio.encoding = output_encoding
1705
class TestCaseWithMemoryTransport(TestCase):
1706
"""Common test class for tests that do not need disk resources.
1708
Tests that need disk resources should derive from TestCaseWithTransport.
1710
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1712
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1713
a directory which does not exist. This serves to help ensure test isolation
1714
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1715
must exist. However, TestCaseWithMemoryTransport does not offer local
1716
file defaults for the transport in tests, nor does it obey the command line
1717
override, so tests that accidentally write to the common directory should
1720
:cvar TEST_ROOT: Directory containing all temporary directories, plus
1721
a .bzr directory that stops us ascending higher into the filesystem.
1727
def __init__(self, methodName='runTest'):
1728
# allow test parameterization after test construction and before test
1729
# execution. Variables that the parameterizer sets need to be
1730
# ones that are not set by setUp, or setUp will trash them.
1731
super(TestCaseWithMemoryTransport, self).__init__(methodName)
1732
self.vfs_transport_factory = default_transport
1733
self.transport_server = None
1734
self.transport_readonly_server = None
1735
self.__vfs_server = None
1737
def get_transport(self, relpath=None):
1738
"""Return a writeable transport.
1740
This transport is for the test scratch space relative to
1743
:param relpath: a path relative to the base url.
1745
t = get_transport(self.get_url(relpath))
1746
self.assertFalse(t.is_readonly())
1749
def get_readonly_transport(self, relpath=None):
1750
"""Return a readonly transport for the test scratch space
1752
This can be used to test that operations which should only need
1753
readonly access in fact do not try to write.
1755
:param relpath: a path relative to the base url.
1757
t = get_transport(self.get_readonly_url(relpath))
1758
self.assertTrue(t.is_readonly())
1761
def create_transport_readonly_server(self):
1762
"""Create a transport server from class defined at init.
1764
This is mostly a hook for daughter classes.
1766
return self.transport_readonly_server()
1768
def get_readonly_server(self):
1769
"""Get the server instance for the readonly transport
1771
This is useful for some tests with specific servers to do diagnostics.
1773
if self.__readonly_server is None:
1774
if self.transport_readonly_server is None:
1775
# readonly decorator requested
1776
# bring up the server
1777
self.__readonly_server = ReadonlyServer()
1778
self.__readonly_server.setUp(self.get_vfs_only_server())
1780
self.__readonly_server = self.create_transport_readonly_server()
1781
self.__readonly_server.setUp(self.get_vfs_only_server())
1782
self.addCleanup(self.__readonly_server.tearDown)
1783
return self.__readonly_server
1785
def get_readonly_url(self, relpath=None):
1786
"""Get a URL for the readonly transport.
1788
This will either be backed by '.' or a decorator to the transport
1789
used by self.get_url()
1790
relpath provides for clients to get a path relative to the base url.
1791
These should only be downwards relative, not upwards.
1793
base = self.get_readonly_server().get_url()
1794
return self._adjust_url(base, relpath)
1796
def get_vfs_only_server(self):
1797
"""Get the vfs only read/write server instance.
1799
This is useful for some tests with specific servers that need
1802
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
1803
is no means to override it.
1805
if self.__vfs_server is None:
1806
self.__vfs_server = MemoryServer()
1807
self.__vfs_server.setUp()
1808
self.addCleanup(self.__vfs_server.tearDown)
1809
return self.__vfs_server
1811
def get_server(self):
1812
"""Get the read/write server instance.
1814
This is useful for some tests with specific servers that need
1817
This is built from the self.transport_server factory. If that is None,
1818
then the self.get_vfs_server is returned.
1820
if self.__server is None:
1821
if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
1822
return self.get_vfs_only_server()
1824
# bring up a decorated means of access to the vfs only server.
1825
self.__server = self.transport_server()
1827
self.__server.setUp(self.get_vfs_only_server())
1828
except TypeError, e:
1829
# This should never happen; the try:Except here is to assist
1830
# developers having to update code rather than seeing an
1831
# uninformative TypeError.
1832
raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
1833
self.addCleanup(self.__server.tearDown)
1834
return self.__server
1836
def _adjust_url(self, base, relpath):
1837
"""Get a URL (or maybe a path) for the readwrite transport.
1839
This will either be backed by '.' or to an equivalent non-file based
1841
relpath provides for clients to get a path relative to the base url.
1842
These should only be downwards relative, not upwards.
1844
if relpath is not None and relpath != '.':
1845
if not base.endswith('/'):
1847
# XXX: Really base should be a url; we did after all call
1848
# get_url()! But sometimes it's just a path (from
1849
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1850
# to a non-escaped local path.
1851
if base.startswith('./') or base.startswith('/'):
1854
base += urlutils.escape(relpath)
1857
def get_url(self, relpath=None):
1858
"""Get a URL (or maybe a path) for the readwrite transport.
1860
This will either be backed by '.' or to an equivalent non-file based
1862
relpath provides for clients to get a path relative to the base url.
1863
These should only be downwards relative, not upwards.
1865
base = self.get_server().get_url()
1866
return self._adjust_url(base, relpath)
1868
def get_vfs_only_url(self, relpath=None):
1869
"""Get a URL (or maybe a path for the plain old vfs transport.
1871
This will never be a smart protocol. It always has all the
1872
capabilities of the local filesystem, but it might actually be a
1873
MemoryTransport or some other similar virtual filesystem.
1875
This is the backing transport (if any) of the server returned by
1876
get_url and get_readonly_url.
1878
:param relpath: provides for clients to get a path relative to the base
1879
url. These should only be downwards relative, not upwards.
1882
base = self.get_vfs_only_server().get_url()
1883
return self._adjust_url(base, relpath)
1885
def _create_safety_net(self):
1886
"""Make a fake bzr directory.
1888
This prevents any tests propagating up onto the TEST_ROOT directory's
1891
root = TestCaseWithMemoryTransport.TEST_ROOT
1892
bzrdir.BzrDir.create_standalone_workingtree(root)
1894
def _check_safety_net(self):
1895
"""Check that the safety .bzr directory have not been touched.
1897
_make_test_root have created a .bzr directory to prevent tests from
1898
propagating. This method ensures than a test did not leaked.
1900
root = TestCaseWithMemoryTransport.TEST_ROOT
1901
wt = workingtree.WorkingTree.open(root)
1902
last_rev = wt.last_revision()
1903
if last_rev != 'null:':
1904
# The current test have modified the /bzr directory, we need to
1905
# recreate a new one or all the followng tests will fail.
1906
# If you need to inspect its content uncomment the following line
1907
# import pdb; pdb.set_trace()
1908
_rmtree_temp_dir(root + '/.bzr')
1909
self._create_safety_net()
1910
raise AssertionError('%s/.bzr should not be modified' % root)
1912
def _make_test_root(self):
1913
if TestCaseWithMemoryTransport.TEST_ROOT is None:
1914
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
1915
TestCaseWithMemoryTransport.TEST_ROOT = root
1917
self._create_safety_net()
1919
# The same directory is used by all tests, and we're not
1920
# specifically told when all tests are finished. This will do.
1921
atexit.register(_rmtree_temp_dir, root)
1923
self.addCleanup(self._check_safety_net)
1925
def makeAndChdirToTestDir(self):
1926
"""Create a temporary directories for this one test.
1928
This must set self.test_home_dir and self.test_dir and chdir to
1931
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1933
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1934
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1935
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1937
def make_branch(self, relpath, format=None):
1938
"""Create a branch on the transport at relpath."""
1939
repo = self.make_repository(relpath, format=format)
1940
return repo.bzrdir.create_branch()
1942
def make_bzrdir(self, relpath, format=None):
1944
# might be a relative or absolute path
1945
maybe_a_url = self.get_url(relpath)
1946
segments = maybe_a_url.rsplit('/', 1)
1947
t = get_transport(maybe_a_url)
1948
if len(segments) > 1 and segments[-1] not in ('', '.'):
1952
if isinstance(format, basestring):
1953
format = bzrdir.format_registry.make_bzrdir(format)
1954
return format.initialize_on_transport(t)
1955
except errors.UninitializableFormat:
1956
raise TestSkipped("Format %s is not initializable." % format)
1958
def make_repository(self, relpath, shared=False, format=None):
1959
"""Create a repository on our default transport at relpath.
1961
Note that relpath must be a relative path, not a full url.
1963
# FIXME: If you create a remoterepository this returns the underlying
1964
# real format, which is incorrect. Actually we should make sure that
1965
# RemoteBzrDir returns a RemoteRepository.
1966
# maybe mbp 20070410
1967
made_control = self.make_bzrdir(relpath, format=format)
1968
return made_control.create_repository(shared=shared)
1970
def make_branch_and_memory_tree(self, relpath, format=None):
1971
"""Create a branch on the default transport and a MemoryTree for it."""
1972
b = self.make_branch(relpath, format=format)
1973
return memorytree.MemoryTree.create_on_branch(b)
1975
def overrideEnvironmentForTesting(self):
1976
os.environ['HOME'] = self.test_home_dir
1977
os.environ['BZR_HOME'] = self.test_home_dir
1980
super(TestCaseWithMemoryTransport, self).setUp()
1981
self._make_test_root()
1982
_currentdir = os.getcwdu()
1983
def _leaveDirectory():
1984
os.chdir(_currentdir)
1985
self.addCleanup(_leaveDirectory)
1986
self.makeAndChdirToTestDir()
1987
self.overrideEnvironmentForTesting()
1988
self.__readonly_server = None
1989
self.__server = None
1990
self.reduceLockdirTimeout()
1993
class TestCaseInTempDir(TestCaseWithMemoryTransport):
1994
"""Derived class that runs a test within a temporary directory.
1996
This is useful for tests that need to create a branch, etc.
1998
The directory is created in a slightly complex way: for each
1999
Python invocation, a new temporary top-level directory is created.
2000
All test cases create their own directory within that. If the
2001
tests complete successfully, the directory is removed.
2003
:ivar test_base_dir: The path of the top-level directory for this
2004
test, which contains a home directory and a work directory.
2006
:ivar test_home_dir: An initially empty directory under test_base_dir
2007
which is used as $HOME for this test.
2009
:ivar test_dir: A directory under test_base_dir used as the current
2010
directory when the test proper is run.
2013
OVERRIDE_PYTHON = 'python'
2015
def check_file_contents(self, filename, expect):
2016
self.log("check contents of file %s" % filename)
2017
contents = file(filename, 'r').read()
2018
if contents != expect:
2019
self.log("expected: %r" % expect)
2020
self.log("actually: %r" % contents)
2021
self.fail("contents of %s not as expected" % filename)
2023
def makeAndChdirToTestDir(self):
2024
"""See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
2026
For TestCaseInTempDir we create a temporary directory based on the test
2027
name and then create two subdirs - test and home under it.
2029
# create a directory within the top level test directory
2030
candidate_dir = osutils.mkdtemp(dir=self.TEST_ROOT)
2031
# now create test and home directories within this dir
2032
self.test_base_dir = candidate_dir
2033
self.test_home_dir = self.test_base_dir + '/home'
2034
os.mkdir(self.test_home_dir)
2035
self.test_dir = self.test_base_dir + '/work'
2036
os.mkdir(self.test_dir)
2037
os.chdir(self.test_dir)
2038
# put name of test inside
2039
f = file(self.test_base_dir + '/name', 'w')
2044
self.addCleanup(self.deleteTestDir)
2046
def deleteTestDir(self):
2047
os.chdir(self.TEST_ROOT)
2048
_rmtree_temp_dir(self.test_base_dir)
2050
def build_tree(self, shape, line_endings='binary', transport=None):
2051
"""Build a test tree according to a pattern.
2053
shape is a sequence of file specifications. If the final
2054
character is '/', a directory is created.
2056
This assumes that all the elements in the tree being built are new.
2058
This doesn't add anything to a branch.
2060
:type shape: list or tuple.
2061
:param line_endings: Either 'binary' or 'native'
2062
in binary mode, exact contents are written in native mode, the
2063
line endings match the default platform endings.
2064
:param transport: A transport to write to, for building trees on VFS's.
2065
If the transport is readonly or None, "." is opened automatically.
2068
if type(shape) not in (list, tuple):
2069
raise AssertionError("Parameter 'shape' should be "
2070
"a list or a tuple. Got %r instead" % (shape,))
2071
# It's OK to just create them using forward slashes on windows.
2072
if transport is None or transport.is_readonly():
2073
transport = get_transport(".")
2075
self.assert_(isinstance(name, basestring))
2077
transport.mkdir(urlutils.escape(name[:-1]))
2079
if line_endings == 'binary':
2081
elif line_endings == 'native':
2084
raise errors.BzrError(
2085
'Invalid line ending request %r' % line_endings)
2086
content = "contents of %s%s" % (name.encode('utf-8'), end)
2087
transport.put_bytes_non_atomic(urlutils.escape(name), content)
2089
def build_tree_contents(self, shape):
2090
build_tree_contents(shape)
2092
def assertFileEqual(self, content, path):
2093
"""Fail if path does not contain 'content'."""
2094
self.failUnlessExists(path)
2095
f = file(path, 'rb')
2100
self.assertEqualDiff(content, s)
2102
def failUnlessExists(self, path):
2103
"""Fail unless path or paths, which may be abs or relative, exist."""
2104
if not isinstance(path, basestring):
2106
self.failUnlessExists(p)
2108
self.failUnless(osutils.lexists(path),path+" does not exist")
2110
def failIfExists(self, path):
2111
"""Fail if path or paths, which may be abs or relative, exist."""
2112
if not isinstance(path, basestring):
2114
self.failIfExists(p)
2116
self.failIf(osutils.lexists(path),path+" exists")
2118
def assertInWorkingTree(self, path, root_path='.', tree=None):
2119
"""Assert whether path or paths are in the WorkingTree"""
2121
tree = workingtree.WorkingTree.open(root_path)
2122
if not isinstance(path, basestring):
2124
self.assertInWorkingTree(p,tree=tree)
2126
self.assertIsNot(tree.path2id(path), None,
2127
path+' not in working tree.')
2129
def assertNotInWorkingTree(self, path, root_path='.', tree=None):
2130
"""Assert whether path or paths are not in the WorkingTree"""
2132
tree = workingtree.WorkingTree.open(root_path)
2133
if not isinstance(path, basestring):
2135
self.assertNotInWorkingTree(p,tree=tree)
2137
self.assertIs(tree.path2id(path), None, path+' in working tree.')
2140
class TestCaseWithTransport(TestCaseInTempDir):
2141
"""A test case that provides get_url and get_readonly_url facilities.
2143
These back onto two transport servers, one for readonly access and one for
2146
If no explicit class is provided for readonly access, a
2147
ReadonlyTransportDecorator is used instead which allows the use of non disk
2148
based read write transports.
2150
If an explicit class is provided for readonly access, that server and the
2151
readwrite one must both define get_url() as resolving to os.getcwd().
2154
def get_vfs_only_server(self):
2155
"""See TestCaseWithMemoryTransport.
2157
This is useful for some tests with specific servers that need
2160
if self.__vfs_server is None:
2161
self.__vfs_server = self.vfs_transport_factory()
2162
self.__vfs_server.setUp()
2163
self.addCleanup(self.__vfs_server.tearDown)
2164
return self.__vfs_server
2166
def make_branch_and_tree(self, relpath, format=None):
2167
"""Create a branch on the transport and a tree locally.
2169
If the transport is not a LocalTransport, the Tree can't be created on
2170
the transport. In that case if the vfs_transport_factory is
2171
LocalURLServer the working tree is created in the local
2172
directory backing the transport, and the returned tree's branch and
2173
repository will also be accessed locally. Otherwise a lightweight
2174
checkout is created and returned.
2176
:param format: The BzrDirFormat.
2177
:returns: the WorkingTree.
2179
# TODO: always use the local disk path for the working tree,
2180
# this obviously requires a format that supports branch references
2181
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2183
b = self.make_branch(relpath, format=format)
2185
return b.bzrdir.create_workingtree()
2186
except errors.NotLocalUrl:
2187
# We can only make working trees locally at the moment. If the
2188
# transport can't support them, then we keep the non-disk-backed
2189
# branch and create a local checkout.
2190
if self.vfs_transport_factory is LocalURLServer:
2191
# the branch is colocated on disk, we cannot create a checkout.
2192
# hopefully callers will expect this.
2193
local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2194
return local_controldir.create_workingtree()
2196
return b.create_checkout(relpath, lightweight=True)
2198
def assertIsDirectory(self, relpath, transport):
2199
"""Assert that relpath within transport is a directory.
2201
This may not be possible on all transports; in that case it propagates
2202
a TransportNotPossible.
2205
mode = transport.stat(relpath).st_mode
2206
except errors.NoSuchFile:
2207
self.fail("path %s is not a directory; no such file"
2209
if not stat.S_ISDIR(mode):
2210
self.fail("path %s is not a directory; has mode %#o"
2213
def assertTreesEqual(self, left, right):
2214
"""Check that left and right have the same content and properties."""
2215
# we use a tree delta to check for equality of the content, and we
2216
# manually check for equality of other things such as the parents list.
2217
self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
2218
differences = left.changes_from(right)
2219
self.assertFalse(differences.has_changed(),
2220
"Trees %r and %r are different: %r" % (left, right, differences))
2223
super(TestCaseWithTransport, self).setUp()
2224
self.__vfs_server = None
2227
class ChrootedTestCase(TestCaseWithTransport):
2228
"""A support class that provides readonly urls outside the local namespace.
2230
This is done by checking if self.transport_server is a MemoryServer. if it
2231
is then we are chrooted already, if it is not then an HttpServer is used
2234
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
2235
be used without needed to redo it when a different
2236
subclass is in use ?
2240
super(ChrootedTestCase, self).setUp()
2241
if not self.vfs_transport_factory == MemoryServer:
2242
self.transport_readonly_server = HttpServer
2245
def condition_id_re(pattern):
2246
"""Create a condition filter which performs a re check on a test's id.
2248
:param pattern: A regular expression string.
2249
:return: A callable that returns True if the re matches.
2251
filter_re = re.compile(pattern)
2252
def condition(test):
2254
return filter_re.search(test_id)
2258
def condition_isinstance(klass_or_klass_list):
2259
"""Create a condition filter which returns isinstance(param, klass).
2261
:return: A callable which when called with one parameter obj return the
2262
result of isinstance(obj, klass_or_klass_list).
2265
return isinstance(obj, klass_or_klass_list)
2269
def condition_id_in_list(id_list):
2270
"""Create a condition filter which verify that test's id in a list.
2272
:param name: A TestIdList object.
2273
:return: A callable that returns True if the test's id appears in the list.
2275
def condition(test):
2276
return id_list.test_in(test.id())
2280
def exclude_tests_by_condition(suite, condition):
2281
"""Create a test suite which excludes some tests from suite.
2283
:param suite: The suite to get tests from.
2284
:param condition: A callable whose result evaluates True when called with a
2285
test case which should be excluded from the result.
2286
:return: A suite which contains the tests found in suite that fail
2290
for test in iter_suite_tests(suite):
2291
if not condition(test):
2293
return TestUtil.TestSuite(result)
2296
def filter_suite_by_condition(suite, condition):
2297
"""Create a test suite by filtering another one.
2299
:param suite: The source suite.
2300
:param condition: A callable whose result evaluates True when called with a
2301
test case which should be included in the result.
2302
:return: A suite which contains the tests found in suite that pass
2306
for test in iter_suite_tests(suite):
2309
return TestUtil.TestSuite(result)
2312
def filter_suite_by_re(suite, pattern, exclude_pattern=DEPRECATED_PARAMETER,
2313
random_order=DEPRECATED_PARAMETER):
2314
"""Create a test suite by filtering another one.
2316
:param suite: the source suite
2317
:param pattern: pattern that names must match
2318
:param exclude_pattern: A pattern that names must not match. This parameter
2319
is deprecated as of bzrlib 1.0. Please use the separate function
2320
exclude_tests_by_re instead.
2321
:param random_order: If True, tests in the new suite will be put in
2322
random order. This parameter is deprecated as of bzrlib 1.0. Please
2323
use the separate function randomize_suite instead.
2324
:returns: the newly created suite
2326
if deprecated_passed(exclude_pattern):
2327
symbol_versioning.warn(
2328
one_zero % "passing exclude_pattern to filter_suite_by_re",
2329
DeprecationWarning, stacklevel=2)
2330
if exclude_pattern is not None:
2331
suite = exclude_tests_by_re(suite, exclude_pattern)
2332
condition = condition_id_re(pattern)
2333
result_suite = filter_suite_by_condition(suite, condition)
2334
if deprecated_passed(random_order):
2335
symbol_versioning.warn(
2336
one_zero % "passing random_order to filter_suite_by_re",
2337
DeprecationWarning, stacklevel=2)
2339
result_suite = randomize_suite(result_suite)
2343
def filter_suite_by_id_list(suite, test_id_list):
2344
"""Create a test suite by filtering another one.
2346
:param suite: The source suite.
2347
:param test_id_list: A list of the test ids to keep as strings.
2348
:returns: the newly created suite
2350
condition = condition_id_in_list(test_id_list)
2351
result_suite = filter_suite_by_condition(suite, condition)
2355
def exclude_tests_by_re(suite, pattern):
2356
"""Create a test suite which excludes some tests from suite.
2358
:param suite: The suite to get tests from.
2359
:param pattern: A regular expression string. Test ids that match this
2360
pattern will be excluded from the result.
2361
:return: A TestSuite that contains all the tests from suite without the
2362
tests that matched pattern. The order of tests is the same as it was in
2365
return exclude_tests_by_condition(suite, condition_id_re(pattern))
2368
def preserve_input(something):
2369
"""A helper for performing test suite transformation chains.
2371
:param something: Anything you want to preserve.
2377
def randomize_suite(suite):
2378
"""Return a new TestSuite with suite's tests in random order.
2380
The tests in the input suite are flattened into a single suite in order to
2381
accomplish this. Any nested TestSuites are removed to provide global
2384
tests = list(iter_suite_tests(suite))
2385
random.shuffle(tests)
2386
return TestUtil.TestSuite(tests)
2389
@deprecated_function(one_zero)
2390
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
2391
random_order=False, append_rest=True):
2392
"""DEPRECATED: Create a test suite by sorting another one.
2394
This method has been decomposed into separate helper methods that should be
2396
- filter_suite_by_re
2397
- exclude_tests_by_re
2401
:param suite: the source suite
2402
:param pattern: pattern that names must match in order to go
2403
first in the new suite
2404
:param exclude_pattern: pattern that names must not match, if any
2405
:param random_order: if True, tests in the new suite will be put in
2406
random order (with all tests matching pattern
2408
:param append_rest: if False, pattern is a strict filter and not
2409
just an ordering directive
2410
:returns: the newly created suite
2412
if exclude_pattern is not None:
2413
suite = exclude_tests_by_re(suite, exclude_pattern)
2415
order_changer = randomize_suite
2417
order_changer = preserve_input
2419
suites = map(order_changer, split_suite_by_re(suite, pattern))
2420
return TestUtil.TestSuite(suites)
2422
return order_changer(filter_suite_by_re(suite, pattern))
2425
def split_suite_by_re(suite, pattern):
2426
"""Split a test suite into two by a regular expression.
2428
:param suite: The suite to split.
2429
:param pattern: A regular expression string. Test ids that match this
2430
pattern will be in the first test suite returned, and the others in the
2431
second test suite returned.
2432
:return: A tuple of two test suites, where the first contains tests from
2433
suite matching pattern, and the second contains the remainder from
2434
suite. The order within each output suite is the same as it was in
2439
filter_re = re.compile(pattern)
2440
for test in iter_suite_tests(suite):
2442
if filter_re.search(test_id):
2443
matched.append(test)
2445
did_not_match.append(test)
2446
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2449
def run_suite(suite, name='test', verbose=False, pattern=".*",
2450
stop_on_failure=False,
2451
transport=None, lsprof_timed=None, bench_history=None,
2452
matching_tests_first=None,
2455
exclude_pattern=None,
2457
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
2462
runner = TextTestRunner(stream=sys.stdout,
2464
verbosity=verbosity,
2465
bench_history=bench_history,
2466
list_only=list_only,
2468
runner.stop_on_failure=stop_on_failure
2469
# Initialise the random number generator and display the seed used.
2470
# We convert the seed to a long to make it reuseable across invocations.
2471
random_order = False
2472
if random_seed is not None:
2474
if random_seed == "now":
2475
random_seed = long(time.time())
2477
# Convert the seed to a long if we can
2479
random_seed = long(random_seed)
2482
runner.stream.writeln("Randomizing test order using seed %s\n" %
2484
random.seed(random_seed)
2485
# Customise the list of tests if requested
2486
if exclude_pattern is not None:
2487
suite = exclude_tests_by_re(suite, exclude_pattern)
2489
order_changer = randomize_suite
2491
order_changer = preserve_input
2492
if pattern != '.*' or random_order:
2493
if matching_tests_first:
2494
suites = map(order_changer, split_suite_by_re(suite, pattern))
2495
suite = TestUtil.TestSuite(suites)
2497
suite = order_changer(filter_suite_by_re(suite, pattern))
2499
result = runner.run(suite)
2502
return result.wasStrictlySuccessful()
2504
return result.wasSuccessful()
2507
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
2509
test_suite_factory=None,
2512
matching_tests_first=None,
2515
exclude_pattern=None,
2519
"""Run the whole test suite under the enhanced runner"""
2520
# XXX: Very ugly way to do this...
2521
# Disable warning about old formats because we don't want it to disturb
2522
# any blackbox tests.
2523
from bzrlib import repository
2524
repository._deprecation_warning_done = True
2526
global default_transport
2527
if transport is None:
2528
transport = default_transport
2529
old_transport = default_transport
2530
default_transport = transport
2532
if load_list is None:
2535
keep_only = load_test_id_list(load_list)
2536
if test_suite_factory is None:
2537
suite = test_suite(keep_only)
2539
suite = test_suite_factory()
2540
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
2541
stop_on_failure=stop_on_failure,
2542
transport=transport,
2543
lsprof_timed=lsprof_timed,
2544
bench_history=bench_history,
2545
matching_tests_first=matching_tests_first,
2546
list_only=list_only,
2547
random_seed=random_seed,
2548
exclude_pattern=exclude_pattern,
2551
default_transport = old_transport
2554
def load_test_id_list(file_name):
2555
"""Load a test id list from a text file.
2557
The format is one test id by line. No special care is taken to impose
2558
strict rules, these test ids are used to filter the test suite so a test id
2559
that do not match an existing test will do no harm. This allows user to add
2560
comments, leave blank lines, etc.
2564
ftest = open(file_name, 'rt')
2566
if e.errno != errno.ENOENT:
2569
raise errors.NoSuchFile(file_name)
2571
for test_name in ftest.readlines():
2572
test_list.append(test_name.strip())
2577
class TestIdList(object):
2578
"""Test id list to filter a test suite.
2580
Relying on the assumption that test ids are built as:
2581
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
2582
notation, this class offers methods to :
2583
- avoid building a test suite for modules not refered to in the test list,
2584
- keep only the tests listed from the module test suite.
2587
def __init__(self, test_id_list):
2588
# When a test suite needs to be filtered against us we compare test ids
2589
# for equality, so a simple dict offers a quick and simple solution.
2590
self.tests = dict().fromkeys(test_id_list, True)
2592
# While unittest.TestCase have ids like:
2593
# <module>.<class>.<method>[(<param+)],
2594
# doctest.DocTestCase can have ids like:
2597
# <module>.<function>
2598
# <module>.<class>.<method>
2600
# Since we can't predict a test class from its name only, we settle on
2601
# a simple constraint: a test id always begins with its module name.
2604
for test_id in test_id_list:
2605
parts = test_id.split('.')
2606
mod_name = parts.pop(0)
2607
modules[mod_name] = True
2609
mod_name += '.' + part
2610
modules[mod_name] = True
2611
self.modules = modules
2613
def is_module_name_used(self, module_name):
2614
"""Is there tests for the module or one of its sub modules."""
2615
return self.modules.has_key(module_name)
2617
def test_in(self, test_id):
2618
return self.tests.has_key(test_id)
2621
def test_suite(keep_only=None):
2622
"""Build and return TestSuite for the whole of bzrlib.
2624
:param keep_only: A list of test ids limiting the suite returned.
2626
This function can be replaced if you need to change the default test
2627
suite on a global basis, but it is not encouraged.
2630
'bzrlib.util.tests.test_bencode',
2631
'bzrlib.tests.test__dirstate_helpers',
2632
'bzrlib.tests.test_ancestry',
2633
'bzrlib.tests.test_annotate',
2634
'bzrlib.tests.test_api',
2635
'bzrlib.tests.test_atomicfile',
2636
'bzrlib.tests.test_bad_files',
2637
'bzrlib.tests.test_bisect_multi',
2638
'bzrlib.tests.test_branch',
2639
'bzrlib.tests.test_branchbuilder',
2640
'bzrlib.tests.test_bugtracker',
2641
'bzrlib.tests.test_bundle',
2642
'bzrlib.tests.test_bzrdir',
2643
'bzrlib.tests.test_cache_utf8',
2644
'bzrlib.tests.test_commands',
2645
'bzrlib.tests.test_commit',
2646
'bzrlib.tests.test_commit_merge',
2647
'bzrlib.tests.test_config',
2648
'bzrlib.tests.test_conflicts',
2649
'bzrlib.tests.test_counted_lock',
2650
'bzrlib.tests.test_decorators',
2651
'bzrlib.tests.test_delta',
2652
'bzrlib.tests.test_deprecated_graph',
2653
'bzrlib.tests.test_diff',
2654
'bzrlib.tests.test_dirstate',
2655
'bzrlib.tests.test_email_message',
2656
'bzrlib.tests.test_errors',
2657
'bzrlib.tests.test_escaped_store',
2658
'bzrlib.tests.test_extract',
2659
'bzrlib.tests.test_fetch',
2660
'bzrlib.tests.test_ftp_transport',
2661
'bzrlib.tests.test_generate_docs',
2662
'bzrlib.tests.test_generate_ids',
2663
'bzrlib.tests.test_globbing',
2664
'bzrlib.tests.test_gpg',
2665
'bzrlib.tests.test_graph',
2666
'bzrlib.tests.test_hashcache',
2667
'bzrlib.tests.test_help',
2668
'bzrlib.tests.test_hooks',
2669
'bzrlib.tests.test_http',
2670
'bzrlib.tests.test_http_implementations',
2671
'bzrlib.tests.test_http_response',
2672
'bzrlib.tests.test_https_ca_bundle',
2673
'bzrlib.tests.test_identitymap',
2674
'bzrlib.tests.test_ignores',
2675
'bzrlib.tests.test_index',
2676
'bzrlib.tests.test_info',
2677
'bzrlib.tests.test_inv',
2678
'bzrlib.tests.test_knit',
2679
'bzrlib.tests.test_lazy_import',
2680
'bzrlib.tests.test_lazy_regex',
2681
'bzrlib.tests.test_lockdir',
2682
'bzrlib.tests.test_lockable_files',
2683
'bzrlib.tests.test_log',
2684
'bzrlib.tests.test_lsprof',
2685
'bzrlib.tests.test_lru_cache',
2686
'bzrlib.tests.test_mail_client',
2687
'bzrlib.tests.test_memorytree',
2688
'bzrlib.tests.test_merge',
2689
'bzrlib.tests.test_merge3',
2690
'bzrlib.tests.test_merge_core',
2691
'bzrlib.tests.test_merge_directive',
2692
'bzrlib.tests.test_missing',
2693
'bzrlib.tests.test_msgeditor',
2694
'bzrlib.tests.test_multiparent',
2695
'bzrlib.tests.test_nonascii',
2696
'bzrlib.tests.test_options',
2697
'bzrlib.tests.test_osutils',
2698
'bzrlib.tests.test_osutils_encodings',
2699
'bzrlib.tests.test_pack',
2700
'bzrlib.tests.test_patch',
2701
'bzrlib.tests.test_patches',
2702
'bzrlib.tests.test_permissions',
2703
'bzrlib.tests.test_plugins',
2704
'bzrlib.tests.test_progress',
2705
'bzrlib.tests.test_reconfigure',
2706
'bzrlib.tests.test_reconcile',
2707
'bzrlib.tests.test_registry',
2708
'bzrlib.tests.test_remote',
2709
'bzrlib.tests.test_repository',
2710
'bzrlib.tests.test_revert',
2711
'bzrlib.tests.test_revision',
2712
'bzrlib.tests.test_revisionnamespaces',
2713
'bzrlib.tests.test_revisiontree',
2714
'bzrlib.tests.test_rio',
2715
'bzrlib.tests.test_sampler',
2716
'bzrlib.tests.test_selftest',
2717
'bzrlib.tests.test_setup',
2718
'bzrlib.tests.test_sftp_transport',
2719
'bzrlib.tests.test_smart',
2720
'bzrlib.tests.test_smart_add',
2721
'bzrlib.tests.test_smart_transport',
2722
'bzrlib.tests.test_smtp_connection',
2723
'bzrlib.tests.test_source',
2724
'bzrlib.tests.test_ssh_transport',
2725
'bzrlib.tests.test_status',
2726
'bzrlib.tests.test_store',
2727
'bzrlib.tests.test_strace',
2728
'bzrlib.tests.test_subsume',
2729
'bzrlib.tests.test_switch',
2730
'bzrlib.tests.test_symbol_versioning',
2731
'bzrlib.tests.test_tag',
2732
'bzrlib.tests.test_testament',
2733
'bzrlib.tests.test_textfile',
2734
'bzrlib.tests.test_textmerge',
2735
'bzrlib.tests.test_timestamp',
2736
'bzrlib.tests.test_trace',
2737
'bzrlib.tests.test_transactions',
2738
'bzrlib.tests.test_transform',
2739
'bzrlib.tests.test_transport',
2740
'bzrlib.tests.test_tree',
2741
'bzrlib.tests.test_treebuilder',
2742
'bzrlib.tests.test_tsort',
2743
'bzrlib.tests.test_tuned_gzip',
2744
'bzrlib.tests.test_ui',
2745
'bzrlib.tests.test_upgrade',
2746
'bzrlib.tests.test_urlutils',
2747
'bzrlib.tests.test_versionedfile',
2748
'bzrlib.tests.test_version',
2749
'bzrlib.tests.test_version_info',
2750
'bzrlib.tests.test_weave',
2751
'bzrlib.tests.test_whitebox',
2752
'bzrlib.tests.test_win32utils',
2753
'bzrlib.tests.test_workingtree',
2754
'bzrlib.tests.test_workingtree_4',
2755
'bzrlib.tests.test_wsgi',
2756
'bzrlib.tests.test_xml',
2758
test_transport_implementations = [
2759
'bzrlib.tests.test_transport_implementations',
2760
'bzrlib.tests.test_read_bundle',
2762
suite = TestUtil.TestSuite()
2763
loader = TestUtil.TestLoader()
2765
if keep_only is not None:
2766
id_filter = TestIdList(keep_only)
2768
# modules building their suite with loadTestsFromModuleNames
2769
if keep_only is None:
2770
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
2772
for mod in [m for m in testmod_names
2773
if id_filter.is_module_name_used(m)]:
2774
mod_suite = loader.loadTestsFromModuleNames([mod])
2775
mod_suite = filter_suite_by_id_list(mod_suite, id_filter)
2776
suite.addTest(mod_suite)
2778
# modules adapted for transport implementations
2779
from bzrlib.tests.test_transport_implementations import TransportTestProviderAdapter
2780
adapter = TransportTestProviderAdapter()
2781
if keep_only is None:
2782
adapt_modules(test_transport_implementations, adapter, loader, suite)
2784
for mod in [m for m in test_transport_implementations
2785
if id_filter.is_module_name_used(m)]:
2786
mod_suite = TestUtil.TestSuite()
2787
adapt_modules([mod], adapter, loader, mod_suite)
2788
mod_suite = filter_suite_by_id_list(mod_suite, id_filter)
2789
suite.addTest(mod_suite)
2791
# modules defining their own test_suite()
2792
for package in [p for p in packages_to_test()
2793
if (keep_only is None
2794
or id_filter.is_module_name_used(p.__name__))]:
2795
pack_suite = package.test_suite()
2796
if keep_only is not None:
2797
pack_suite = filter_suite_by_id_list(pack_suite, id_filter)
2798
suite.addTest(pack_suite)
2800
# XXX: MODULES_TO_TEST should be obsoleted ?
2801
for mod in [m for m in MODULES_TO_TEST
2802
if keep_only is None or id_filter.is_module_name_used(m)]:
2803
mod_suite = loader.loadTestsFromModule(mod)
2804
if keep_only is not None:
2805
mod_suite = filter_suite_by_id_list(mod_suite, id_filter)
2806
suite.addTest(mod_suite)
2808
for mod in MODULES_TO_DOCTEST:
2810
doc_suite = doctest.DocTestSuite(mod)
2811
except ValueError, e:
2812
print '**failed to get doctest for: %s\n%s' % (mod, e)
2814
if keep_only is not None:
2815
# DocTests may use ids which doesn't contain the module name
2816
doc_suite = filter_suite_by_id_list(doc_suite, id_filter)
2817
suite.addTest(doc_suite)
2819
default_encoding = sys.getdefaultencoding()
2820
for name, plugin in [(n, p) for (n, p) in bzrlib.plugin.plugins().items()
2821
if (keep_only is None
2822
or id_filter.is_module_name_used(
2823
p.module.__name__))]:
2825
plugin_suite = plugin.test_suite()
2826
except ImportError, e:
2827
bzrlib.trace.warning(
2828
'Unable to test plugin "%s": %s', name, e)
2830
if plugin_suite is not None:
2831
if keep_only is not None:
2832
plugin_suite = filter_suite_by_id_list(plugin_suite,
2834
suite.addTest(plugin_suite)
2835
if default_encoding != sys.getdefaultencoding():
2836
bzrlib.trace.warning(
2837
'Plugin "%s" tried to reset default encoding to: %s', name,
2838
sys.getdefaultencoding())
2840
sys.setdefaultencoding(default_encoding)
2844
def multiply_tests_from_modules(module_name_list, scenario_iter):
2845
"""Adapt all tests in some given modules to given scenarios.
2847
This is the recommended public interface for test parameterization.
2848
Typically the test_suite() method for a per-implementation test
2849
suite will call multiply_tests_from_modules and return the
2852
:param module_name_list: List of fully-qualified names of test
2854
:param scenario_iter: Iterable of pairs of (scenario_name,
2855
scenario_param_dict).
2857
This returns a new TestSuite containing the cross product of
2858
all the tests in all the modules, each repeated for each scenario.
2859
Each test is adapted by adding the scenario name at the end
2860
of its name, and updating the test object's __dict__ with the
2861
scenario_param_dict.
2863
>>> r = multiply_tests_from_modules(
2864
... ['bzrlib.tests.test_sampler'],
2865
... [('one', dict(param=1)),
2866
... ('two', dict(param=2))])
2867
>>> tests = list(iter_suite_tests(r))
2871
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
2877
loader = TestLoader()
2879
adapter = TestScenarioApplier()
2880
adapter.scenarios = list(scenario_iter)
2881
adapt_modules(module_name_list, adapter, loader, suite)
2885
def multiply_scenarios(scenarios_left, scenarios_right):
2886
"""Multiply two sets of scenarios.
2888
:returns: the cartesian product of the two sets of scenarios, that is
2889
a scenario for every possible combination of a left scenario and a
2893
('%s,%s' % (left_name, right_name),
2894
dict(left_dict.items() + right_dict.items()))
2895
for left_name, left_dict in scenarios_left
2896
for right_name, right_dict in scenarios_right]
2900
def adapt_modules(mods_list, adapter, loader, suite):
2901
"""Adapt the modules in mods_list using adapter and add to suite."""
2902
for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
2903
suite.addTests(adapter.adapt(test))
2906
def adapt_tests(tests_list, adapter, loader, suite):
2907
"""Adapt the tests in tests_list using adapter and add to suite."""
2908
for test in tests_list:
2909
suite.addTests(adapter.adapt(loader.loadTestsFromName(test)))
2912
def _rmtree_temp_dir(dirname):
2913
# If LANG=C we probably have created some bogus paths
2914
# which rmtree(unicode) will fail to delete
2915
# so make sure we are using rmtree(str) to delete everything
2916
# except on win32, where rmtree(str) will fail
2917
# since it doesn't have the property of byte-stream paths
2918
# (they are either ascii or mbcs)
2919
if sys.platform == 'win32':
2920
# make sure we are using the unicode win32 api
2921
dirname = unicode(dirname)
2923
dirname = dirname.encode(sys.getfilesystemencoding())
2925
osutils.rmtree(dirname)
2927
if sys.platform == 'win32' and e.errno == errno.EACCES:
2928
sys.stderr.write(('Permission denied: '
2929
'unable to remove testing dir '
2930
'%s\n' % os.path.basename(dirname)))
2935
class Feature(object):
2936
"""An operating system Feature."""
2939
self._available = None
2941
def available(self):
2942
"""Is the feature available?
2944
:return: True if the feature is available.
2946
if self._available is None:
2947
self._available = self._probe()
2948
return self._available
2951
"""Implement this method in concrete features.
2953
:return: True if the feature is available.
2955
raise NotImplementedError
2958
if getattr(self, 'feature_name', None):
2959
return self.feature_name()
2960
return self.__class__.__name__
2963
class _SymlinkFeature(Feature):
2966
return osutils.has_symlinks()
2968
def feature_name(self):
2971
SymlinkFeature = _SymlinkFeature()
2974
class _OsFifoFeature(Feature):
2977
return getattr(os, 'mkfifo', None)
2979
def feature_name(self):
2980
return 'filesystem fifos'
2982
OsFifoFeature = _OsFifoFeature()
2985
class TestScenarioApplier(object):
2986
"""A tool to apply scenarios to tests."""
2988
def adapt(self, test):
2989
"""Return a TestSuite containing a copy of test for each scenario."""
2990
result = unittest.TestSuite()
2991
for scenario in self.scenarios:
2992
result.addTest(self.adapt_test_to_scenario(test, scenario))
2995
def adapt_test_to_scenario(self, test, scenario):
2996
"""Copy test and apply scenario to it.
2998
:param test: A test to adapt.
2999
:param scenario: A tuple describing the scenarion.
3000
The first element of the tuple is the new test id.
3001
The second element is a dict containing attributes to set on the
3003
:return: The adapted test.
3005
from copy import deepcopy
3006
new_test = deepcopy(test)
3007
for name, value in scenario[1].items():
3008
setattr(new_test, name, value)
3009
new_id = "%s(%s)" % (new_test.id(), scenario[0])
3010
new_test.id = lambda: new_id
3014
def probe_unicode_in_user_encoding():
3015
"""Try to encode several unicode strings to use in unicode-aware tests.
3016
Return first successfull match.
3018
:return: (unicode value, encoded plain string value) or (None, None)
3020
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
3021
for uni_val in possible_vals:
3023
str_val = uni_val.encode(bzrlib.user_encoding)
3024
except UnicodeEncodeError:
3025
# Try a different character
3028
return uni_val, str_val
3032
def probe_bad_non_ascii(encoding):
3033
"""Try to find [bad] character with code [128..255]
3034
that cannot be decoded to unicode in some encoding.
3035
Return None if all non-ascii characters is valid
3038
for i in xrange(128, 256):
3041
char.decode(encoding)
3042
except UnicodeDecodeError:
3047
class _FTPServerFeature(Feature):
3048
"""Some tests want an FTP Server, check if one is available.
3050
Right now, the only way this is available is if 'medusa' is installed.
3051
http://www.amk.ca/python/code/medusa.html
3056
import bzrlib.tests.ftp_server
3061
def feature_name(self):
3064
FTPServerFeature = _FTPServerFeature()
3067
class _CaseInsensitiveFilesystemFeature(Feature):
3068
"""Check if underlined filesystem is case-insensitive
3069
(e.g. on Windows, Cygwin, MacOS)
3073
if TestCaseWithMemoryTransport.TEST_ROOT is None:
3074
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
3075
TestCaseWithMemoryTransport.TEST_ROOT = root
3077
root = TestCaseWithMemoryTransport.TEST_ROOT
3078
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
3080
name_a = osutils.pathjoin(tdir, 'a')
3081
name_A = osutils.pathjoin(tdir, 'A')
3083
result = osutils.isdir(name_A)
3084
_rmtree_temp_dir(tdir)
3087
def feature_name(self):
3088
return 'case-insensitive filesystem'
3090
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()