1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
# TODO: Perhaps there should be an API to find out if bzr is running under the
19
# test suite -- some plugins might want to avoid making intrusive changes if
20
# this is the case. However, we want behaviour under test to diverge as
21
# little as possible, so this should be used rarely if it's added at all.
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
# NOTE: Some classes in here use camelCaseNaming() rather than
25
# underscore_naming(). That's for consistency with unittest; it's not the
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
# new assertFoo() methods.
31
from cStringIO import StringIO
38
from pprint import pformat
43
from subprocess import Popen, PIPE, STDOUT
67
import bzrlib.commands
68
import bzrlib.timestamp
70
import bzrlib.inventory
71
import bzrlib.iterablefile
76
# lsprof not available
78
from bzrlib.merge import merge_inner
81
from bzrlib.smart import client, request, server
83
from bzrlib import symbol_versioning
84
from bzrlib.symbol_versioning import (
91
from bzrlib.transport import get_transport
92
import bzrlib.transport
93
from bzrlib.transport.local import LocalURLServer
94
from bzrlib.transport.memory import MemoryServer
95
from bzrlib.transport.readonly import ReadonlyServer
96
from bzrlib.trace import mutter, note
97
from bzrlib.tests import TestUtil
98
from bzrlib.tests.http_server import HttpServer
99
from bzrlib.tests.TestUtil import (
103
from bzrlib.tests.treeshape import build_tree_contents
104
import bzrlib.version_info_formats.format_custom
105
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
107
# Mark this python module as being part of the implementation
108
# of unittest: this gives us better tracebacks where the last
109
# shown frame is the test code, not our assertXYZ.
112
default_transport = LocalURLServer
115
class ExtendedTestResult(unittest._TextTestResult):
116
"""Accepts, reports and accumulates the results of running tests.
118
Compared to the unittest version this class adds support for
119
profiling, benchmarking, stopping as soon as a test fails, and
120
skipping tests. There are further-specialized subclasses for
121
different types of display.
123
When a test finishes, in whatever way, it calls one of the addSuccess,
124
addFailure or addError classes. These in turn may redirect to a more
125
specific case for the special test results supported by our extended
128
Note that just one of these objects is fed the results from many tests.
133
def __init__(self, stream, descriptions, verbosity,
138
"""Construct new TestResult.
140
:param bench_history: Optionally, a writable file object to accumulate
143
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
144
if bench_history is not None:
145
from bzrlib.version import _get_bzr_source_tree
146
src_tree = _get_bzr_source_tree()
149
revision_id = src_tree.get_parent_ids()[0]
151
# XXX: if this is a brand new tree, do the same as if there
155
# XXX: If there's no branch, what should we do?
157
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
158
self._bench_history = bench_history
159
self.ui = ui.ui_factory
160
self.num_tests = num_tests
162
self.failure_count = 0
163
self.known_failure_count = 0
165
self.not_applicable_count = 0
166
self.unsupported = {}
168
self._overall_start_time = time.time()
169
self._strict = strict
173
ok = self.wasStrictlySuccessful()
175
ok = self.wasSuccessful()
177
self.stream.write('tests passed\n')
179
self.stream.write('tests failed\n')
180
if TestCase._first_thread_leaker_id:
182
'%s is leaking threads among %d leaking tests.\n' % (
183
TestCase._first_thread_leaker_id,
184
TestCase._leaking_threads_tests))
186
def _extractBenchmarkTime(self, testCase):
    """Return the benchmark time recorded on testCase, if there is one.

    :param testCase: The test case to inspect.
    :return: The value of testCase._benchtime, or None when the test case
        has no such attribute.
    """
    try:
        return testCase._benchtime
    except AttributeError:
        return None
190
def _elapsedTestTimeString(self):
    """Return a formatted string for the time the current test has taken.

    The elapsed time is measured from self._start_time up to now and is
    rendered by self._formatTime().
    """
    elapsed = time.time() - self._start_time
    return self._formatTime(elapsed)
194
def _testTimeString(self, testCase):
195
benchmark_time = self._extractBenchmarkTime(testCase)
196
if benchmark_time is not None:
198
self._formatTime(benchmark_time),
199
self._elapsedTestTimeString())
201
return " %s" % self._elapsedTestTimeString()
203
def _formatTime(self, seconds):
    """Render a duration in seconds as right-aligned whole milliseconds.

    :param seconds: The duration, in seconds.
    :return: The millisecond count padded with leading spaces to eight
        characters and followed by 'ms'.
    """
    # Some benchmarks take thousands of seconds to run, which is why the
    # millisecond field is eight characters wide.
    milliseconds = 1000 * seconds
    return "%8dms" % milliseconds
209
def _shortened_test_description(self, test):
211
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
214
def startTest(self, test):
215
unittest.TestResult.startTest(self, test)
218
self.report_test_start(test)
219
test.number = self.count
220
self._recordTestStartTime()
222
def startTests(self):
224
'testing: %s\n' % (osutils.realpath(sys.argv[0]),))
226
' %s (%s python%s)\n' % (
228
bzrlib.version_string,
229
bzrlib._format_version_tuple(sys.version_info),
231
self.stream.write('\n')
233
def _recordTestStartTime(self):
    """Store the current time in self._start_time as the test's start."""
    started_at = time.time()
    self._start_time = started_at
237
def _cleanupLogFile(self, test):
238
# We can only do this if we have one of our TestCases, not if
240
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
241
if setKeepLogfile is not None:
244
def addError(self, test, err):
245
"""Tell result that test finished with an error.
247
Called from the TestCase run() method when the test
248
fails with an unexpected error.
250
self._testConcluded(test)
251
if isinstance(err[1], TestNotApplicable):
252
return self._addNotApplicable(test, err)
253
elif isinstance(err[1], UnavailableFeature):
254
return self.addNotSupported(test, err[1].args[0])
256
unittest.TestResult.addError(self, test, err)
257
self.error_count += 1
258
self.report_error(test, err)
261
self._cleanupLogFile(test)
263
def addFailure(self, test, err):
264
"""Tell result that test failed.
266
Called from the TestCase run() method when the test
267
fails because e.g. an assert() method failed.
269
self._testConcluded(test)
270
if isinstance(err[1], KnownFailure):
271
return self._addKnownFailure(test, err)
273
unittest.TestResult.addFailure(self, test, err)
274
self.failure_count += 1
275
self.report_failure(test, err)
278
self._cleanupLogFile(test)
280
def addSuccess(self, test):
281
"""Tell result that test completed successfully.
283
Called from the TestCase run()
285
self._testConcluded(test)
286
if self._bench_history is not None:
287
benchmark_time = self._extractBenchmarkTime(test)
288
if benchmark_time is not None:
289
self._bench_history.write("%s %s\n" % (
290
self._formatTime(benchmark_time),
292
self.report_success(test)
293
self._cleanupLogFile(test)
294
unittest.TestResult.addSuccess(self, test)
295
test._log_contents = ''
297
def _testConcluded(self, test):
298
"""Common code when a test has finished.
300
Called regardless of whether it succeded, failed, etc.
304
def _addKnownFailure(self, test, err):
    """Count a known failure and hand it to the reporter.

    :param test: The test that failed in the expected manner.
    :param err: The error information passed on to
        self.report_known_failure().
    """
    self.known_failure_count = self.known_failure_count + 1
    report = self.report_known_failure
    report(test, err)
308
def addNotSupported(self, test, feature):
    """Record that test was not run because feature is missing.

    The counter stored in self.unsupported under str(feature) is incremented
    and the event is forwarded to self.report_unsupported().
    """
    # There are two ways of getting here: the test may have started running
    # and then raised UnavailableFeature (which arrives through addError),
    # or this method may be called while probing for features before the
    # tests are run. In the latter case startTest and stopTest are seen,
    # but the test itself never runs.
    feature_name = str(feature)
    self.unsupported[feature_name] = self.unsupported.get(feature_name, 0) + 1
    self.report_unsupported(test, feature)
321
def addSkip(self, test, reason):
322
"""A test has not run for 'reason'."""
324
self.report_skip(test, reason)
326
def _addNotApplicable(self, test, skip_excinfo):
327
if isinstance(skip_excinfo[1], TestNotApplicable):
328
self.not_applicable_count += 1
329
self.report_not_applicable(test, skip_excinfo)
332
except KeyboardInterrupt:
335
self.addError(test, test.exc_info())
337
# seems best to treat this as success from point-of-view of unittest
338
# -- it actually does nothing so it barely matters :)
339
unittest.TestResult.addSuccess(self, test)
340
test._log_contents = ''
342
def printErrorList(self, flavour, errors):
343
for test, err in errors:
344
self.stream.writeln(self.separator1)
345
self.stream.write("%s: " % flavour)
346
self.stream.writeln(self.getDescription(test))
347
if getattr(test, '_get_log', None) is not None:
348
self.stream.write('\n')
350
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
351
self.stream.write('\n')
352
self.stream.write(test._get_log())
353
self.stream.write('\n')
355
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
356
self.stream.write('\n')
357
self.stream.writeln(self.separator2)
358
self.stream.writeln("%s" % err)
363
def report_cleaning_up(self):
366
def report_success(self, test):
369
def wasStrictlySuccessful(self):
370
if self.unsupported or self.known_failure_count:
372
return self.wasSuccessful()
375
class TextTestResult(ExtendedTestResult):
376
"""Displays progress and results of tests in text form"""
378
def __init__(self, stream, descriptions, verbosity,
384
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
385
bench_history, num_tests, strict)
387
self.pb = self.ui.nested_progress_bar()
388
self._supplied_pb = False
391
self._supplied_pb = True
392
self.pb.show_pct = False
393
self.pb.show_spinner = False
394
# Must be a plain boolean: with a trailing comma this would store the
# truthy one-element tuple (False,) and the ETA would not be switched off.
self.pb.show_eta = False
395
self.pb.show_count = False
396
self.pb.show_bar = False
398
def report_starting(self):
    """Show the initial progress message, which includes the test total."""
    starting_message = '[test 0/%d] Starting' % (self.num_tests)
    self.pb.update(starting_message)
401
def _progress_prefix_text(self):
402
# the longer this text, the less space we have to show the test
404
a = '[%d' % self.count # total that have been run
405
# tests skipped as known not to be relevant are not important enough
407
## if self.skip_count:
408
## a += ', %d skip' % self.skip_count
409
## if self.known_failure_count:
410
## a += '+%dX' % self.known_failure_count
411
if self.num_tests is not None:
412
a +='/%d' % self.num_tests
414
runtime = time.time() - self._overall_start_time
416
a += '%dm%ds' % (runtime / 60, runtime % 60)
420
a += ', %d err' % self.error_count
421
if self.failure_count:
422
a += ', %d fail' % self.failure_count
424
a += ', %d missing' % len(self.unsupported)
428
def report_test_start(self, test):
431
self._progress_prefix_text()
433
+ self._shortened_test_description(test))
435
def _test_description(self, test):
    """Return the description of test, as shortened by
    self._shortened_test_description().
    """
    description = self._shortened_test_description(test)
    return description
438
def report_error(self, test, err):
439
self.pb.note('ERROR: %s\n %s\n',
440
self._test_description(test),
444
def report_failure(self, test, err):
445
self.pb.note('FAIL: %s\n %s\n',
446
self._test_description(test),
450
def report_known_failure(self, test, err):
    """Send an XFAIL note for test through the progress bar.

    :param test: The test that failed in the expected manner.
    :param err: Error information; err[1] is shown below the description.
    """
    description = self._test_description(test)
    failure = err[1]
    self.pb.note('XFAIL: %s\n%s\n', description, failure)
454
def report_skip(self, test, reason):
457
def report_not_applicable(self, test, skip_excinfo):
460
def report_unsupported(self, test, feature):
461
"""test cannot be run because feature is missing."""
463
def report_cleaning_up(self):
    """Update the progress bar to say that cleanup is under way."""
    progress = self.pb
    progress.update('Cleaning up')
467
if not self._supplied_pb:
471
class VerboseTestResult(ExtendedTestResult):
472
"""Produce long output, with one line per test run plus times"""
474
def _ellipsize_to_right(self, a_string, final_width):
475
"""Truncate and pad a string, keeping the right hand side"""
476
if len(a_string) > final_width:
477
result = '...' + a_string[3-final_width:]
480
return result.ljust(final_width)
482
def report_starting(self):
    """Write a line to the stream saying how many tests will be run."""
    announcement = 'running %d tests...\n' % self.num_tests
    self.stream.write(announcement)
485
def report_test_start(self, test):
487
name = self._shortened_test_description(test)
488
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
489
# numbers, plus a trailing blank
490
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
491
self.stream.write(self._ellipsize_to_right(name,
492
osutils.terminal_width()-30))
495
def _error_summary(self, err):
497
return '%s%s' % (indent, err[1])
499
def report_error(self, test, err):
    """Write an ERROR status with the test's timing and the error summary.

    :param test: The test that raised the error.
    :param err: Error information, summarised by self._error_summary().
    """
    timing = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln('ERROR %s\n%s' % (timing, summary))
504
def report_failure(self, test, err):
    """Write a FAIL status with the test's timing and the failure summary.

    :param test: The test that failed.
    :param err: Error information, summarised by self._error_summary().
    """
    timing = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln(' FAIL %s\n%s' % (timing, summary))
509
def report_known_failure(self, test, err):
    """Write an XFAIL status with the test's timing and the failure summary.

    :param test: The test that failed in the expected manner.
    :param err: Error information, summarised by self._error_summary().
    """
    timing = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln('XFAIL %s\n%s' % (timing, summary))
514
def report_success(self, test):
515
self.stream.writeln(' OK %s' % self._testTimeString(test))
516
for bench_called, stats in getattr(test, '_benchcalls', []):
517
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
518
stats.pprint(file=self.stream)
519
# flush the stream so that we get smooth output. This verbose mode is
520
# used to show the output in PQM.
523
def report_skip(self, test, reason):
    """Write a SKIP status with the test's timing, followed by the reason.

    :param test: The test that was skipped.
    :param reason: The reason for the skip, written on the following line.
    """
    timing = self._testTimeString(test)
    self.stream.writeln(' SKIP %s\n%s' % (timing, reason))
527
def report_not_applicable(self, test, skip_excinfo):
    """Write an N/A status with the test's timing and the skip summary.

    :param test: The test that was not applicable.
    :param skip_excinfo: Error information, summarised by
        self._error_summary().
    """
    timing = self._testTimeString(test)
    summary = self._error_summary(skip_excinfo)
    self.stream.writeln(' N/A %s\n%s' % (timing, summary))
532
def report_unsupported(self, test, feature):
    """Write a NODEP status saying which feature the test was missing.

    :param test: The test that cannot be run.
    :param feature: The missing feature, interpolated into the message.
    """
    timing = self._testTimeString(test)
    message = "NODEP %s\n The feature '%s' is not available." % (
        timing, feature)
    self.stream.writeln(message)
538
class TextTestRunner(object):
539
stop_on_failure = False
549
self.stream = unittest._WritelnDecorator(stream)
550
self.descriptions = descriptions
551
self.verbosity = verbosity
552
self._bench_history = bench_history
553
self.list_only = list_only
554
self._strict = strict
557
"Run the given test case or test suite."
558
startTime = time.time()
559
if self.verbosity == 1:
560
result_class = TextTestResult
561
elif self.verbosity >= 2:
562
result_class = VerboseTestResult
563
result = result_class(self.stream,
566
bench_history=self._bench_history,
567
num_tests=test.countTestCases(),
570
result.stop_early = self.stop_on_failure
571
result.report_starting()
573
if self.verbosity >= 2:
574
self.stream.writeln("Listing tests only ...\n")
576
for t in iter_suite_tests(test):
577
self.stream.writeln("%s" % (t.id()))
586
if isinstance(test, testtools.ConcurrentTestSuite):
587
# We need to catch bzr specific behaviors
588
test.run(BZRTransformingResult(result))
591
run = result.testsRun
593
stopTime = time.time()
594
timeTaken = stopTime - startTime
596
self.stream.writeln(result.separator2)
597
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
598
run, run != 1 and "s" or "", timeTaken))
599
self.stream.writeln()
600
if not result.wasSuccessful():
601
self.stream.write("FAILED (")
602
failed, errored = map(len, (result.failures, result.errors))
604
self.stream.write("failures=%d" % failed)
606
if failed: self.stream.write(", ")
607
self.stream.write("errors=%d" % errored)
608
if result.known_failure_count:
609
if failed or errored: self.stream.write(", ")
610
self.stream.write("known_failure_count=%d" %
611
result.known_failure_count)
612
self.stream.writeln(")")
614
if result.known_failure_count:
615
self.stream.writeln("OK (known_failures=%d)" %
616
result.known_failure_count)
618
self.stream.writeln("OK")
619
if result.skip_count > 0:
620
skipped = result.skip_count
621
self.stream.writeln('%d test%s skipped' %
622
(skipped, skipped != 1 and "s" or ""))
623
if result.unsupported:
624
for feature, count in sorted(result.unsupported.items()):
625
self.stream.writeln("Missing feature '%s' skipped %d tests." %
631
def iter_suite_tests(suite):
632
"""Return all tests in a suite, recursing through nested suites"""
633
if isinstance(suite, unittest.TestCase):
635
elif isinstance(suite, unittest.TestSuite):
637
for r in iter_suite_tests(item):
640
raise Exception('unknown type %r for object %r'
641
% (type(suite), suite))
644
class TestSkipped(Exception):
645
"""Indicates that a test was intentionally skipped, rather than failing."""
648
class TestNotApplicable(TestSkipped):
649
"""A test is not applicable to the situation where it was run.
651
This is only normally raised by parameterized tests, if they find that
652
the instance they're constructed upon does not support one aspect
657
class KnownFailure(AssertionError):
658
"""Indicates that a test failed in a precisely expected manner.
660
Such failures dont block the whole test suite from passing because they are
661
indicators of partially completed code or of future work. We have an
662
explicit error for them so that we can ensure that they are always visible:
663
KnownFailures are always shown in the output of bzr selftest.
667
class UnavailableFeature(Exception):
668
"""A feature required for this test was not available.
670
The feature should be used to construct the exception.
674
class CommandFailed(Exception):
678
class StringIOWrapper(object):
679
"""A wrapper around cStringIO which just adds an encoding attribute.
681
Internally we can check sys.stdout to see what the output encoding
682
should be. However, cStringIO has no encoding attribute that we can
683
set. So we wrap it instead.
688
def __init__(self, s=None):
690
self.__dict__['_cstring'] = StringIO(s)
692
self.__dict__['_cstring'] = StringIO()
694
def __getattr__(self, name, getattr=getattr):
695
return getattr(self.__dict__['_cstring'], name)
697
def __setattr__(self, name, val):
698
if name == 'encoding':
699
self.__dict__['encoding'] = val
701
return setattr(self._cstring, name, val)
704
class TestUIFactory(ui.CLIUIFactory):
705
"""A UI Factory for testing.
707
Hide the progress bar but emit note()s.
709
Allows get_password to be tested without real tty attached.
712
def __init__(self, stdout=None, stderr=None, stdin=None):
713
if stdin is not None:
714
# We use a StringIOWrapper to be able to test various
715
# encodings, but the user is still responsible to
716
# encode the string and to set the encoding attribute
717
# of StringIOWrapper.
718
stdin = StringIOWrapper(stdin)
719
super(TestUIFactory, self).__init__(stdin, stdout, stderr)
722
"""See progress.ProgressBar.clear()."""
724
def clear_term(self):
725
"""See progress.ProgressBar.clear_term()."""
728
"""See progress.ProgressBar.finished()."""
730
def note(self, fmt_string, *args, **kwargs):
    """Write a formatted note followed by a newline to self.stdout.

    See progress.ProgressBar.note(). fmt_string is %-interpolated with the
    positional args; keyword arguments are accepted but not used.
    """
    line_template = fmt_string + "\n"
    self.stdout.write(line_template % args)
734
def progress_bar(self):
737
def nested_progress_bar(self):
740
def update(self, message, count=None, total=None):
741
"""See progress.ProgressBar.update()."""
743
def get_non_echoed_password(self):
744
"""Get password from stdin without trying to handle the echo mode"""
745
password = self.stdin.readline()
748
if password[-1] == '\n':
749
password = password[:-1]
753
class TestCase(unittest.TestCase):
754
"""Base class for bzr unit tests.
756
Tests that need access to disk resources should subclass
757
TestCaseInTempDir not TestCase.
759
Error and debug log messages are redirected from their usual
760
location into a temporary file, the contents of which can be
761
retrieved by _get_log(). We use a real OS file, not an in-memory object,
762
so that it can also capture file IO. When the test completes this file
763
is read into memory and removed from disk.
765
There are also convenience functions to invoke bzr's command-line
766
routine, and to build and check bzr trees.
768
In addition to the usual method of overriding tearDown(), this class also
769
allows subclasses to register functions into the _cleanups list, which is
770
run in order as the object is torn down. It's less likely this will be
771
accidentally overlooked.
774
_active_threads = None
775
_leaking_threads_tests = 0
776
_first_thread_leaker_id = None
777
_log_file_name = None
779
_keep_log_file = False
780
# record lsprof data when performing benchmark calls.
781
_gather_lsprof_in_benchmarks = False
782
attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
783
'_log_contents', '_log_file_name', '_benchtime',
784
'_TestCase__testMethodName')
786
def __init__(self, methodName='testMethod'):
787
super(TestCase, self).__init__(methodName)
789
self._bzr_test_setUp_run = False
790
self._bzr_test_tearDown_run = False
793
unittest.TestCase.setUp(self)
794
self._bzr_test_setUp_run = True
795
self._cleanEnvironment()
798
self._benchcalls = []
799
self._benchtime = None
801
self._clear_debug_flags()
802
TestCase._active_threads = threading.activeCount()
803
self.addCleanup(self._check_leaked_threads)
808
pdb.Pdb().set_trace(sys._getframe().f_back)
810
def _check_leaked_threads(self):
811
active = threading.activeCount()
812
leaked_threads = active - TestCase._active_threads
813
TestCase._active_threads = active
815
TestCase._leaking_threads_tests += 1
816
if TestCase._first_thread_leaker_id is None:
817
TestCase._first_thread_leaker_id = self.id()
819
def _clear_debug_flags(self):
    """Prevent externally set debug flags affecting tests.

    A copy of the current flags is kept in self._preserved_debug_flags and
    self._restore_debug_flags is registered as a cleanup. The live flag set
    is emptied unless 'allow_debug' is in selftest_debug_flags.

    Tests that want to use debug flags can just set them in the
    debug_flags set during setup/teardown.
    """
    active_flags = debug.debug_flags
    self._preserved_debug_flags = set(active_flags)
    if not ('allow_debug' in selftest_debug_flags):
        active_flags.clear()
    self.addCleanup(self._restore_debug_flags)
830
def _clear_hooks(self):
    """Swap every known hook point for a fresh instance for this test.

    The currently installed hooks are remembered in self._preserved_hooks
    and self._restoreHooks is registered as a cleanup before anything is
    replaced.
    """
    # prevent hooks affecting tests
    preserved = {}
    self._preserved_hooks = preserved
    for hook_key, _unused_factory in hooks.known_hooks.items():
        owner, attribute = hooks.known_hooks_key_to_parent_and_attribute(
            hook_key)
        installed = hooks.known_hooks_key_to_object(hook_key)
        preserved[owner] = (attribute, installed)
    self.addCleanup(self._restoreHooks)
    for hook_key, make_hooks in hooks.known_hooks.items():
        owner, attribute = hooks.known_hooks_key_to_parent_and_attribute(
            hook_key)
        setattr(owner, attribute, make_hooks())
    # this hook should always be installed
    request._install_hook()
844
def _silenceUI(self):
845
"""Turn off UI for duration of test"""
846
# by default the UI is off; tests can turn it on if they want it.
847
saved = ui.ui_factory
849
ui.ui_factory = saved
850
ui.ui_factory = ui.SilentUIFactory()
851
self.addCleanup(_restore)
853
def _ndiff_strings(self, a, b):
854
"""Return ndiff between two strings containing lines.
856
A trailing newline is added if missing to make the strings
858
if b and b[-1] != '\n':
860
if a and a[-1] != '\n':
862
difflines = difflib.ndiff(a.splitlines(True),
864
linejunk=lambda x: False,
865
charjunk=lambda x: False)
866
return ''.join(difflines)
868
def assertEqual(self, a, b, message=''):
872
except UnicodeError, e:
873
# If we can't compare without getting a UnicodeError, then
874
# obviously they are different
875
mutter('UnicodeError: %s', e)
878
raise AssertionError("%snot equal:\na = %s\nb = %s\n"
880
pformat(a), pformat(b)))
882
assertEquals = assertEqual
884
def assertEqualDiff(self, a, b, message=None):
885
"""Assert two texts are equal, if not raise an exception.
887
This is intended for use with multi-line strings where it can
888
be hard to find the differences by eye.
890
# TODO: perhaps override assertEquals to call this for strings?
894
message = "texts not equal:\n"
896
message = 'first string is missing a final newline.\n'
898
message = 'second string is missing a final newline.\n'
899
raise AssertionError(message +
900
self._ndiff_strings(a, b))
902
def assertEqualMode(self, mode, mode_test):
    """Assert that two modes are equal, reporting both in octal if not.

    :param mode: One of the two modes to compare.
    :param mode_test: The other mode.
    """
    mismatch_message = 'mode mismatch %o != %o' % (mode, mode_test)
    self.assertEqual(mode, mode_test, mismatch_message)
906
def assertEqualStat(self, expected, actual):
    """Assert that expected and actual are the same stat result.

    The fields compared, in order, are st_size, st_mtime, st_ctime, st_dev,
    st_ino and st_mode; st_atime is not compared.

    :param expected: A stat result.
    :param actual: A stat result.
    :raises AssertionError: If any compared field differs.
    """
    compared_fields = ('st_size', 'st_mtime', 'st_ctime',
                       'st_dev', 'st_ino', 'st_mode')
    for field in compared_fields:
        self.assertEqual(getattr(expected, field), getattr(actual, field))
921
def assertLength(self, length, obj_with_len):
    """Assert that obj_with_len is of length length.

    :param length: The expected length.
    :param obj_with_len: Any object accepted by len().
    """
    actual_length = len(obj_with_len)
    if actual_length == length:
        return
    self.fail("Incorrect length: wanted %d, got %d for %r" % (
        length, actual_length, obj_with_len))
927
def assertPositive(self, val):
    """Assert that val is greater than 0."""
    is_positive = val > 0
    failure_message = 'expected a positive value, but got %s' % val
    self.assertTrue(is_positive, failure_message)
931
def assertNegative(self, val):
    """Assert that val is less than 0."""
    is_negative = val < 0
    failure_message = 'expected a negative value, but got %s' % val
    self.assertTrue(is_negative, failure_message)
935
def assertStartsWith(self, s, prefix):
    """Assert that s starts with prefix.

    :raises AssertionError: If s does not start with prefix.
    """
    if s.startswith(prefix):
        return
    raise AssertionError('string %r does not start with %r' % (s, prefix))
939
def assertEndsWith(self, s, suffix):
    """Assert that s ends with suffix.

    :raises AssertionError: If s does not end with suffix.
    """
    if s.endswith(suffix):
        return
    raise AssertionError('string %r does not end with %r' % (s, suffix))
944
def assertContainsRe(self, haystack, needle_re, flags=0):
    """Assert that haystack contains something matching needle_re.

    :param haystack: The string to search.
    :param needle_re: The regular expression to search for.
    :param flags: Flags passed through to re.search().
    :raises AssertionError: If nothing in haystack matches.
    """
    if re.search(needle_re, haystack, flags):
        return
    if '\n' in haystack or len(haystack) > 60:
        # a long string, format it in a more readable way
        template = 'pattern "%s" not found in\n"""\\\n%s"""\n'
    else:
        template = 'pattern "%s" not found in "%s"'
    raise AssertionError(template % (needle_re, haystack))
956
def assertNotContainsRe(self, haystack, needle_re, flags=0):
    """Assert that nothing in haystack matches needle_re.

    :param haystack: The string to search.
    :param needle_re: The regular expression that must not be found.
    :param flags: Flags passed through to re.search().
    :raises AssertionError: If a match is found.
    """
    found = re.search(needle_re, haystack, flags)
    if found is None:
        return
    raise AssertionError('pattern "%s" found in "%s"'
                         % (needle_re, haystack))
962
def assertSubset(self, sublist, superlist):
963
"""Assert that every entry in sublist is present in superlist."""
964
missing = set(sublist) - set(superlist)
966
raise AssertionError("value(s) %r not present in container %r" %
967
(missing, superlist))
969
def assertListRaises(self, excClass, func, *args, **kwargs):
970
"""Fail unless excClass is raised when the iterator from func is used.
972
Many functions can return generators this makes sure
973
to wrap them in a list() call to make sure the whole generator
974
is run, and that the proper exception is raised.
977
list(func(*args, **kwargs))
981
if getattr(excClass,'__name__', None) is not None:
982
excName = excClass.__name__
984
excName = str(excClass)
985
raise self.failureException, "%s not raised" % excName
987
def assertRaises(self, excClass, callableObj, *args, **kwargs):
988
"""Assert that a callable raises a particular exception.
990
:param excClass: As for the except statement, this may be either an
991
exception class, or a tuple of classes.
992
:param callableObj: A callable, will be passed ``*args`` and
995
Returns the exception so that you can examine it.
998
callableObj(*args, **kwargs)
1002
if getattr(excClass,'__name__', None) is not None:
1003
excName = excClass.__name__
1006
excName = str(excClass)
1007
raise self.failureException, "%s not raised" % excName
1009
def assertIs(self, left, right, message=None):
    """Assert that left and right are the very same object.

    :param message: Optional text for the AssertionError; when None, a
        message showing the repr of both objects is used.
    :raises AssertionError: If left is not right.
    """
    if left is right:
        return
    if message is None:
        message = "%r is not %r." % (left, right)
    raise AssertionError(message)
1016
def assertIsNot(self, left, right, message=None):
1018
if message is not None:
1019
raise AssertionError(message)
1021
raise AssertionError("%r is %r." % (left, right))
1023
def assertTransportMode(self, transport, path, mode):
1024
"""Fail if a path does not have mode "mode".
1026
If modes are not supported on this transport, the assertion is ignored.
1028
if not transport._can_roundtrip_unix_modebits():
1030
path_stat = transport.stat(path)
1031
actual_mode = stat.S_IMODE(path_stat.st_mode)
1032
self.assertEqual(mode, actual_mode,
1033
'mode of %r incorrect (%s != %s)'
1034
% (path, oct(mode), oct(actual_mode)))
1036
def assertIsSameRealPath(self, path1, path2):
    """Fail if path1 and path2 point to different files.

    Both paths are passed through osutils.realpath() before being compared;
    the paths as given are included in the failure message.
    """
    real_path1 = osutils.realpath(path1)
    real_path2 = osutils.realpath(path2)
    apparent = "apparent paths:\na = %s\nb = %s\n," % (path1, path2)
    self.assertEqual(real_path1, real_path2, apparent)
1042
def assertIsInstance(self, obj, kls):
    """Fail if obj is not an instance of kls.

    :param obj: The object to check.
    :param kls: Anything accepted as the second argument of isinstance().
    """
    if isinstance(obj, kls):
        return
    self.fail("%r is an instance of %s rather than %s" % (
        obj, obj.__class__, kls))
1048
def expectFailure(self, reason, assertion, *args, **kwargs):
1049
"""Invoke a test, expecting it to fail for the given reason.
1051
This is for assertions that ought to succeed, but currently fail.
1052
(The failure is *expected* but not *wanted*.) Please be very precise
1053
about the failure you're expecting. If a new bug is introduced,
1054
AssertionError should be raised, not KnownFailure.
1056
Frequently, expectFailure should be followed by an opposite assertion.
1059
Intended to be used with a callable that raises AssertionError as the
1060
'assertion' parameter. args and kwargs are passed to the 'assertion'.
1062
Raises KnownFailure if the test fails. Raises AssertionError if the
1067
self.expectFailure('Math is broken', self.assertNotEqual, 54,
1069
self.assertEqual(42, dynamic_val)
1071
This means that a dynamic_val of 54 will cause the test to raise
1072
a KnownFailure. Once math is fixed and the expectFailure is removed,
1073
only a dynamic_val of 42 will allow the test to pass. Anything other
1074
than 54 or 42 will cause an AssertionError.
1077
assertion(*args, **kwargs)
1078
except AssertionError:
1079
raise KnownFailure(reason)
1081
self.fail('Unexpected success. Should have failed: %s' % reason)
1083
def assertFileEqual(self, content, path):
1084
"""Fail if path does not contain 'content'."""
1085
self.failUnlessExists(path)
1086
f = file(path, 'rb')
1091
self.assertEqualDiff(content, s)
1093
def failUnlessExists(self, path):
1094
"""Fail unless path or paths, which may be abs or relative, exist."""
1095
if not isinstance(path, basestring):
1097
self.failUnlessExists(p)
1099
self.failUnless(osutils.lexists(path),path+" does not exist")
1101
def failIfExists(self, path):
1102
"""Fail if path or paths, which may be abs or relative, exist."""
1103
if not isinstance(path, basestring):
1105
self.failIfExists(p)
1107
self.failIf(osutils.lexists(path),path+" exists")
1109
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1110
"""A helper for callDeprecated and applyDeprecated.
1112
:param a_callable: A callable to call.
1113
:param args: The positional arguments for the callable
1114
:param kwargs: The keyword arguments for the callable
1115
:return: A tuple (warnings, result). result is the result of calling
1116
a_callable(``*args``, ``**kwargs``).
1119
def capture_warnings(msg, cls=None, stacklevel=None):
1120
# we've hooked into a deprecation specific callpath,
1121
# only deprecations should getting sent via it.
1122
self.assertEqual(cls, DeprecationWarning)
1123
local_warnings.append(msg)
1124
original_warning_method = symbol_versioning.warn
1125
symbol_versioning.set_warning_method(capture_warnings)
1127
result = a_callable(*args, **kwargs)
1129
symbol_versioning.set_warning_method(original_warning_method)
1130
return (local_warnings, result)
1132
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
    """Call a deprecated callable without warning the user.

    Note that this only captures warnings raised by symbol_versioning.warn,
    not other callers that go direct to the warning module.

    To test that a deprecated method raises an error, wrap the call to
    applyDeprecated in assertRaises, passing the expected exception, then
    self.applyDeprecated, the deprecation format, the callable and its
    arguments.

    :param deprecation_format: The deprecation format that the callable
        should have been deprecated with. This is the same type as the
        parameter to deprecated_method/deprecated_function. If the
        callable is not deprecated with this format, an assertion error
        will be raised.
    :param a_callable: A callable to call. This may be a bound method or
        a regular function. It will be called with ``*args`` and
        ``**kwargs``.
    :param args: The positional arguments for the callable
    :param kwargs: The keyword arguments for the callable
    :return: The result of a_callable(``*args``, ``**kwargs``)
    """
    seen_warnings, outcome = self._capture_deprecation_warnings(
        a_callable, *args, **kwargs)
    wanted = symbol_versioning.deprecation_string(
        a_callable, deprecation_format)
    if not seen_warnings:
        self.fail("No deprecation warning generated by call to %s" %
            a_callable)
    # Only the first warning is compared; later ones are not inspected.
    self.assertEqual(wanted, seen_warnings[0])
    return outcome
1169
def callCatchWarnings(self, fn, *args, **kw):
    """Call a callable that raises python warnings.

    The caller's responsible for examining the returned warnings.

    If the callable raises an exception, the exception is not
    caught and propagates up to the caller.  In that case, the list
    of warnings is not available.

    :returns: ([warning_object, ...], fn_result)
    """
    # XXX: This is not perfect, because it completely overrides the
    # warnings filters, and some code may depend on suppressing particular
    # warnings.  It's the easiest way to insulate ourselves from -Werror,
    # though.  -- Andrew, 20071062
    collected = []

    def _catcher(message, category, filename, lineno, file=None, line=None):
        # despite the name, 'message' is normally(?) a Warning subclass
        # instance
        collected.append(message)

    # Remember the module-level hooks so they can be put back afterwards.
    original_showwarning = warnings.showwarning
    original_filters = warnings.filters
    try:
        warnings.showwarning = _catcher
        warnings.filters = []
        outcome = fn(*args, **kw)
    finally:
        warnings.showwarning = original_showwarning
        warnings.filters = original_filters
    return collected, outcome
1200
def callDeprecated(self, expected, callable, *args, **kwargs):
    """Assert that a callable is deprecated in a particular way.

    This is a very precise test for unusual requirements. The
    applyDeprecated helper function is probably more suited for most tests
    as it allows you to simply specify the deprecation format being used
    and will ensure that that is issued for the function being called.

    Note that this only captures warnings raised by symbol_versioning.warn,
    not other callers that go direct to the warning module.  To catch
    general warnings, use callCatchWarnings.

    :param expected: a list of the deprecation warnings expected, in order
    :param callable: The callable to call
    :param args: The positional arguments for the callable
    :param kwargs: The keyword arguments for the callable
    :return: The result of calling the callable.
    """
    captured = self._capture_deprecation_warnings(callable, *args, **kwargs)
    seen_warnings, outcome = captured
    # The complete list must match, order included.
    self.assertEqual(expected, seen_warnings)
    return outcome
1222
def _startLogFile(self):
    """Send bzr and test log messages to a temporary file.

    The file is removed as the test is torn down.
    """
    handle, log_path = tempfile.mkstemp(suffix='.log', prefix='testbzr')
    log_stream = os.fdopen(handle, 'w+')
    self._log_file = log_stream
    # The memento is what _finishLogFile later hands to pop_log_file.
    self._log_memento = bzrlib.trace.push_log_file(log_stream)
    self._log_file_name = log_path
    self.addCleanup(self._finishLogFile)
1233
def _finishLogFile(self):
    """Finished with the log file.

    Close the file and delete it, unless setKeepLogfile was called.
    Does nothing if the log file has already been finished.
    """
    log_stream = self._log_file
    if log_stream is None:
        return
    bzrlib.trace.pop_log_file(self._log_memento)
    log_stream.close()
    self._log_file = None
    if self._keep_log_file:
        # The file stays on disk and its name stays recorded.
        return
    os.remove(self._log_file_name)
    self._log_file_name = None
1247
def setKeepLogfile(self):
    """Ask for the log file to be left on disk.

    Sets the flag that _finishLogFile checks before deleting the file.
    """
    self._keep_log_file = True
1251
def addCleanup(self, callable, *args, **kwargs):
    """Arrange to run a callable when this case is torn down.

    Callables are run in the reverse of the order they are registered,
    ie last-in first-out.

    :param callable: The cleanup to run.
    :param args: Positional arguments to pass to it.
    :param kwargs: Keyword arguments to pass to it.
    """
    entry = (callable, args, kwargs)
    self._cleanups.append(entry)
1259
def _cleanEnvironment(self):
    """Put the process environment into a known state for the test.

    Each listed variable is set to the given value, or unset when the
    value is None. The previous values are restored by a cleanup.
    """
    wanted_env = {
        'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
        'HOME': os.getcwd(),
        # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
        # tests do check our impls match APPDATA
        'BZR_EDITOR': None, # test_msgeditor manipulates this variable
        'VISUAL': None,
        'EDITOR': None,
        'BZR_EMAIL': None,
        'BZREMAIL': None, # may still be present in the environment
        'EMAIL': None,
        'BZR_PROGRESS_BAR': None,
        'BZR_LOG': None,
        'BZR_PLUGIN_PATH': None,
        # SSH Agent
        'SSH_AUTH_SOCK': None,
        # Proxies
        'http_proxy': None,
        'HTTP_PROXY': None,
        'https_proxy': None,
        'HTTPS_PROXY': None,
        'no_proxy': None,
        'NO_PROXY': None,
        'all_proxy': None,
        'ALL_PROXY': None,
        # Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
        # least. If you do (care), please update this comment
        # -- vila 20080401
        'ftp_proxy': None,
        'FTP_PROXY': None,
        'BZR_REMOTE_PATH': None,
    }
    self.__old_env = {}
    # Register the restore first so it runs even if a later step fails.
    self.addCleanup(self._restoreEnvironment)
    for var_name, var_value in wanted_env.iteritems():
        self._captureVar(var_name, var_value)
1297
def _captureVar(self, name, newvalue):
    """Set an environment variable, and reset it when finished.

    The value returned by osutils.set_or_unset_env is remembered under
    the variable's name so that _restoreEnvironment can put it back.
    """
    previous = osutils.set_or_unset_env(name, newvalue)
    self.__old_env[name] = previous
1301
def _restore_debug_flags(self):
    """Reset debug.debug_flags to the flags preserved on this test."""
    flags = debug.debug_flags
    # Mutate the existing set in place rather than rebinding it.
    flags.clear()
    flags.update(self._preserved_debug_flags)
1305
def _restoreEnvironment(self):
    """Put back every environment variable recorded by _captureVar."""
    for var_name, old_value in self.__old_env.iteritems():
        osutils.set_or_unset_env(var_name, old_value)
1309
def _restoreHooks(self):
1310
for klass, (name, hooks) in self._preserved_hooks.items():
1311
setattr(klass, name, hooks)
1313
def knownFailure(self, reason):
    """This test has failed for some known reason.

    :param reason: Description of the known failure.
    :raises KnownFailure: Always.
    """
    raise KnownFailure(reason)
1317
def _do_skip(self, result, reason):
1318
addSkip = getattr(result, 'addSkip', None)
1319
if not callable(addSkip):
1320
result.addError(self, sys.exc_info())
1322
addSkip(self, reason)
1324
def run(self, result=None):
1325
if result is None: result = self.defaultTestResult()
1326
for feature in getattr(self, '_test_needs_features', []):
1327
if not feature.available():
1328
result.startTest(self)
1329
if getattr(result, 'addNotSupported', None):
1330
result.addNotSupported(self, feature)
1332
result.addSuccess(self)
1333
result.stopTest(self)
1337
result.startTest(self)
1338
absent_attr = object()
1340
method_name = getattr(self, '_testMethodName', absent_attr)
1341
if method_name is absent_attr:
1343
method_name = getattr(self, '_TestCase__testMethodName')
1344
testMethod = getattr(self, method_name)
1348
if not self._bzr_test_setUp_run:
1350
"test setUp did not invoke "
1351
"bzrlib.tests.TestCase's setUp")
1352
except KeyboardInterrupt:
1354
except TestSkipped, e:
1355
self._do_skip(result, e.args[0])
1359
result.addError(self, sys.exc_info())
1366
except self.failureException:
1367
result.addFailure(self, sys.exc_info())
1368
except TestSkipped, e:
1370
reason = "No reason given."
1373
self._do_skip(result, reason)
1374
except KeyboardInterrupt:
1377
result.addError(self, sys.exc_info())
1381
if not self._bzr_test_tearDown_run:
1383
"test tearDown did not invoke "
1384
"bzrlib.tests.TestCase's tearDown")
1385
except KeyboardInterrupt:
1388
result.addError(self, sys.exc_info())
1390
if ok: result.addSuccess(self)
1392
result.stopTest(self)
1394
except TestNotApplicable:
1395
# Not moved from the result [yet].
1397
except KeyboardInterrupt:
1401
absent_attr = object()
1402
for attr_name in self.attrs_to_keep:
1403
attr = getattr(self, attr_name, absent_attr)
1404
if attr is not absent_attr:
1405
saved_attrs[attr_name] = attr
1406
self.__dict__ = saved_attrs
1409
def tearDown(self):
    """Run the registered cleanups and the base class tearDown.

    Also records that this tearDown was invoked, which run() checks.
    """
    self._bzr_test_tearDown_run = True
    self._runCleanups()
    self._log_contents = ''
    unittest.TestCase.tearDown(self)
1414
def time(self, callable, *args, **kwargs):
    """Run callable and accrue the time it takes to the benchmark time.

    If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
    this will cause lsprofile statistics to be gathered and stored in
    self._benchcalls.

    :return: Whatever the callable returns.
    """
    if self._benchtime is None:
        self._benchtime = 0
    started = time.time()
    try:
        if self._gather_lsprof_in_benchmarks:
            # record this benchmark
            ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
            stats.sort()
            self._benchcalls.append(((callable, args, kwargs), stats))
            return ret
        return callable(*args, **kwargs)
    finally:
        # Elapsed time is accumulated even when the callable raises.
        self._benchtime += time.time() - started
1436
def _runCleanups(self):
1437
"""Run registered cleanup functions.
1439
This should only be called from TestCase.tearDown.
1441
# TODO: Perhaps this should keep running cleanups even if
1442
# one of them fails?
1444
# Actually pop the cleanups from the list so tearDown running
1445
# twice is safe (this happens for skipped tests).
1446
while self._cleanups:
1447
cleanup, args, kwargs = self._cleanups.pop()
1448
cleanup(*args, **kwargs)
1450
def log(self, *args):
    """Pass args straight through to mutter()."""
    mutter(*args)
1453
def _get_log(self, keep_log_file=False):
1454
"""Get the log from bzrlib.trace calls from this test.
1456
:param keep_log_file: When True, if the log is still a file on disk
1457
leave it as a file on disk. When False, if the log is still a file
1458
on disk, the log file is deleted and the log preserved as
1460
:return: A string containing the log.
1462
# flush the log file, to get all content
1464
if bzrlib.trace._trace_file:
1465
bzrlib.trace._trace_file.flush()
1466
if self._log_contents:
1467
# XXX: this can hardly contain the content flushed above --vila
1469
return self._log_contents
1470
if self._log_file_name is not None:
1471
logfile = open(self._log_file_name)
1473
log_contents = logfile.read()
1476
if not keep_log_file:
1477
self._log_contents = log_contents
1479
os.remove(self._log_file_name)
1481
if sys.platform == 'win32' and e.errno == errno.EACCES:
1482
sys.stderr.write(('Unable to delete log file '
1483
' %r\n' % self._log_file_name))
1488
return "DELETED log file to reduce memory footprint"
1490
def requireFeature(self, feature):
    """This test requires a specific feature is available.

    :param feature: An object whose available() method says whether the
        feature can be used.
    :raises UnavailableFeature: When feature is not available.
    """
    if feature.available():
        return
    raise UnavailableFeature(feature)
1498
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
        working_dir):
    """Run bazaar command line, splitting up a string command line.

    A string is split with shlex; anything else is passed on unchanged.

    :return: Whatever _run_bzr_core returns.
    """
    if isinstance(args, basestring):
        # shlex don't understand unicode strings,
        # so args should be plain string (bialix 20070906)
        plain = str(args)
        args = list(shlex.split(plain))
    return self._run_bzr_core(
        args,
        retcode=retcode,
        encoding=encoding,
        stdin=stdin,
        working_dir=working_dir,
        )
1509
def _run_bzr_core(self, args, retcode, encoding, stdin,
        working_dir):
    """Run a bzr command in-process and capture its output.

    A logging handler and a TestUIFactory are installed for the duration
    of the command and removed afterwards, and the working directory is
    restored if it was changed.

    :param args: List of command-line arguments, without the program name.
    :param retcode: Expected return code; None disables the check.
    :param encoding: Encoding set on the captured streams; the user
        encoding is used when None.
    :param stdin: Passed to TestUIFactory as the command's stdin.
    :param working_dir: Directory to run the command in, or None to stay
        in the current one.
    :return: (out, err) as captured from the command.
    """
    if encoding is None:
        encoding = osutils.get_user_encoding()
    stdout = StringIOWrapper()
    stderr = StringIOWrapper()
    stdout.encoding = encoding
    stderr.encoding = encoding

    self.log('run bzr: %r', args)
    # FIXME: don't call into logging here
    handler = logging.StreamHandler(stderr)
    handler.setLevel(logging.INFO)
    logger = logging.getLogger('')
    old_ui_factory = ui.ui_factory
    cwd = None
    logger.addHandler(handler)

    # Everything after the handler is attached runs inside the try, so a
    # failure while building the UI factory or changing directory still
    # detaches the handler and restores the previous state.
    try:
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
        if working_dir is not None:
            cwd = osutils.getcwd()
            os.chdir(working_dir)
        result = self.apply_redirected(ui.ui_factory.stdin,
            stdout, stderr,
            bzrlib.commands.run_bzr_catch_user_errors,
            args)
    finally:
        logger.removeHandler(handler)
        ui.ui_factory = old_ui_factory
        if cwd is not None:
            os.chdir(cwd)

    out = stdout.getvalue()
    err = stderr.getvalue()
    if out:
        self.log('output:\n%r', out)
    if err:
        self.log('errors:\n%r', err)
    if retcode is not None:
        self.assertEquals(retcode, result,
                          message='Unexpected return code')
    return out, err
1554
def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
            working_dir=None, error_regexes=(), output_encoding=None):
    """Invoke bzr, as if it were run from the command line.

    The argument list should not include the bzr program name - the
    first argument is normally the bzr command.  Arguments may be
    passed in two ways:

    1- A list of strings, eg ["commit", "a"].  This is recommended
    when the command contains whitespace or metacharacters, or
    is built up at run time.

    2- A single string, eg "add a".  This is the most convenient
    for hardcoded commands.

    This runs bzr through the interface that catches and reports
    errors, and with logging set to something approximating the
    default, so that error reporting can be checked.

    This should be the main method for tests that want to exercise the
    overall behavior of the bzr application (rather than a unit test
    or a functional test of the library.)

    This sends the stdout/stderr results into the test's log,
    where it may be useful for debugging.

    :keyword stdin: A string to be used as stdin for the command.
    :keyword retcode: The status code the command should return;
        default 0.
    :keyword working_dir: The directory to run the command in
    :keyword error_regexes: A sequence of expected error messages.  If
        specified they must be seen in the error output of the command.
    :keyword output_encoding: Accepted, but not used by this method.
    :return: (out, err) as captured from the command.
    """
    # The default for error_regexes is an immutable tuple so that no
    # state can be shared between calls through the default value.
    out, err = self._run_bzr_autosplit(
        args=args,
        retcode=retcode,
        encoding=encoding,
        stdin=stdin,
        working_dir=working_dir,
        )
    for regex in error_regexes:
        self.assertContainsRe(err, regex)
    return out, err
1598
def run_bzr_error(self, error_regexes, *args, **kwargs):
    """Run bzr, and check that stderr contains the supplied regexes

    :param error_regexes: Sequence of regular expressions which
        must each be found in the error output. The relative ordering
        is not enforced.
    :param args: command-line arguments for bzr
    :param kwargs: Keyword arguments which are interpreted by run_bzr
        This function changes the default value of retcode to be 3,
        since in most cases this is run when you expect bzr to fail.

    :return: (out, err) The actual output of running the command (in case
        you want to do more inspection)

    Examples of use::

        # Make sure that commit is failing because there is nothing to do
        self.run_bzr_error(['no changes to commit'],
                           ['commit', '-m', 'my commit comment'])
        # Make sure --strict is handling an unknown file, rather than
        # giving us the 'nothing to do' error
        self.build_tree(['unknown'])
        self.run_bzr_error(['Commit refused because there are unknown files'],
                           ['commit', '--strict', '-m', 'my commit comment'])
    """
    if 'retcode' not in kwargs:
        kwargs['retcode'] = 3
    # Any error_regexes keyword given by the caller is overridden.
    kwargs['error_regexes'] = error_regexes
    out, err = self.run_bzr(*args, **kwargs)
    return out, err
1628
def run_bzr_subprocess(self, *args, **kwargs):
    """Run bzr in a subprocess for testing.

    This starts a new Python interpreter and runs bzr in there.
    This should only be used for tests that have a justifiable need for
    this isolation: e.g. they are testing startup time, or signal
    handling, or early startup code, etc.  Subprocess code can't be
    profiled or debugged so easily.

    Exactly one positional argument is accepted: either a list of
    arguments, or a string that is split with shlex.

    :keyword retcode: The status code that is expected.  Defaults to 0.  If
        None is supplied, the status code is not checked.
    :keyword env_changes: A dictionary which lists changes to environment
        variables. A value of None will unset the env variable.
        The values must be strings. The change will only occur in the
        child, so you don't need to fix the environment after running.
    :keyword universal_newlines: Convert CRLF => LF
    :keyword allow_plugins: By default the subprocess is run with
        --no-plugins to ensure test reproducibility. Also, it is possible
        for system-wide plugins to create unexpected output on stderr,
        which can cause unnecessary test failures.
    :raises ValueError: If the number of positional arguments is not one.
    """
    env_changes = kwargs.get('env_changes', {})
    working_dir = kwargs.get('working_dir', None)
    allow_plugins = kwargs.get('allow_plugins', False)
    if len(args) != 1:
        raise ValueError("passing varargs to run_bzr_subprocess")
    command = args[0]
    if isinstance(command, list):
        args = command
    elif isinstance(command, basestring):
        args = list(shlex.split(command))
    process = self.start_bzr_subprocess(args, env_changes=env_changes,
                                        working_dir=working_dir,
                                        allow_plugins=allow_plugins)
    # We distinguish between retcode=None and retcode not passed.
    supplied_retcode = kwargs.get('retcode', 0)
    convert_newlines = kwargs.get('universal_newlines', False)
    return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
        universal_newlines=convert_newlines,
        process_args=args)
1668
def start_bzr_subprocess(self, process_args, env_changes=None,
                         skip_if_plan_to_signal=False,
                         working_dir=None,
                         allow_plugins=False):
    """Start bzr in a subprocess for testing.

    This starts a new Python interpreter and runs bzr in there.
    This should only be used for tests that have a justifiable need for
    this isolation: e.g. they are testing startup time, or signal
    handling, or early startup code, etc.  Subprocess code can't be
    profiled or debugged so easily.

    :param process_args: a list of arguments to pass to the bzr executable,
        for example ``['--version']``.
    :param env_changes: A dictionary which lists changes to environment
        variables. A value of None will unset the env variable.
        The values must be strings. The change will only occur in the
        child, so you don't need to fix the environment after running.
    :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
        is not available.
    :param working_dir: Directory to start the subprocess in, or None to
        use the current one.
    :param allow_plugins: If False (default) pass --no-plugins to bzr.

    :returns: Popen object for the started process.
    """
    if skip_if_plan_to_signal and not getattr(os, 'kill', None):
        raise TestSkipped("os.kill not available.")

    if env_changes is None:
        env_changes = {}
    saved_env = {}

    def cleanup_environment():
        for env_var, value in env_changes.iteritems():
            saved_env[env_var] = osutils.set_or_unset_env(env_var, value)

    def restore_environment():
        for env_var, value in saved_env.iteritems():
            osutils.set_or_unset_env(env_var, value)

    bzr_path = self.get_bzr_path()

    original_cwd = None
    if working_dir is not None:
        original_cwd = osutils.getcwd()
        os.chdir(working_dir)

    try:
        # win32 subprocess doesn't support preexec_fn
        # so we will avoid using it on all platforms, just to
        # make sure the code path is used, and we don't break on win32
        cleanup_environment()
        command = [sys.executable]
        # frozen executables don't need the path to bzr
        if getattr(sys, "frozen", None) is None:
            command.append(bzr_path)
        if not allow_plugins:
            command.append('--no-plugins')
        command.extend(process_args)
        process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
    finally:
        # The parent's environment and directory are put back as soon as
        # the child has been started.
        restore_environment()
        if original_cwd is not None:
            os.chdir(original_cwd)

    return process
1735
def _popen(self, *args, **kwargs):
1736
"""Place a call to Popen.
1738
Allows tests to override this method to intercept the calls made to
1739
Popen for introspection.
1741
return Popen(*args, **kwargs)
1743
def get_bzr_path(self):
    """Return the path of the 'bzr' executable for this test suite."""
    source_root = os.path.dirname(os.path.dirname(bzrlib.__file__))
    candidate = source_root+'/bzr'
    if os.path.isfile(candidate):
        return candidate
    # We are probably installed. Assume sys.argv is the right file
    return sys.argv[0]
1751
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
                          universal_newlines=False, process_args=None):
    """Finish the execution of process.

    :param process: the Popen object returned from start_bzr_subprocess.
    :param retcode: The status code that is expected.  Defaults to 0.  If
        None is supplied, the status code is not checked.
    :param send_signal: an optional signal to send to the process.
    :param universal_newlines: Convert CRLF => LF
    :param process_args: The arguments the process was started with; only
        used in the messages produced on an unexpected return code.
    :returns: A two-element list, [stdout, stderr].
    """
    if send_signal is not None:
        os.kill(process.pid, send_signal)
    out, err = process.communicate()

    if universal_newlines:
        out, err = [text.replace('\r\n', '\n') for text in (out, err)]

    unexpected = retcode is not None and retcode != process.returncode
    if unexpected:
        if process_args is None:
            process_args = "(unknown args)"
        # Record the captured output before failing.
        mutter('Output of bzr %s:\n%s', process_args, out)
        mutter('Error for bzr %s:\n%s', process_args, err)
        self.fail('Command bzr %s failed with retcode %s != %s'
                  % (process_args, retcode, process.returncode))
    return [out, err]
1779
def check_inventory_shape(self, inv, shape):
    """Compare an inventory to a list of expected names.

    Fail if they are not precisely equal.

    :param inv: Object whose entries() yields (path, entry) pairs.
    :param shape: Iterable of expected names; directories end with '/'.
    """
    unexpected = []
    remaining = list(shape)             # copy
    for path, ie in inv.entries():
        name = path.replace('\\', '/')
        if ie.kind == 'directory':
            name += '/'
        if name not in remaining:
            unexpected.append(name)
        else:
            remaining.remove(name)
    if remaining:
        self.fail("expected paths not found in inventory: %r" % remaining)
    if unexpected:
        self.fail("unexpected paths found in inventory: %r" % unexpected)
1799
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
                     a_callable=None, *args, **kwargs):
    """Call callable with redirected std io pipes.

    sys.stdin, sys.stdout and sys.stderr are replaced for the duration of
    the call and always restored afterwards.

    :param stdin: Stream to install as sys.stdin; an empty StringIO when
        None.
    :param stdout: Stream to install as sys.stdout. When None, the test's
        log file is used if there is one, otherwise a fresh StringIO.
    :param stderr: Stream to install as sys.stderr, defaulted in the same
        way as stdout.
    :param a_callable: The callable to run; called with ``*args`` and
        ``**kwargs``.
    :raises ValueError: If a_callable is not callable.

    Returns the return code."""
    if not callable(a_callable):
        raise ValueError("a_callable must be callable.")
    if stdin is None:
        stdin = StringIO("")
    if stdout is None:
        if getattr(self, "_log_file", None) is not None:
            stdout = self._log_file
        else:
            stdout = StringIO()
    if stderr is None:
        # The 'is not None' test belongs outside the getattr call, exactly
        # as for stdout; inside it, it only changed the getattr default.
        if getattr(self, "_log_file", None) is not None:
            stderr = self._log_file
        else:
            stderr = StringIO()
    real_stdin = sys.stdin
    real_stdout = sys.stdout
    real_stderr = sys.stderr
    try:
        sys.stdout = stdout
        sys.stderr = stderr
        sys.stdin = stdin
        return a_callable(*args, **kwargs)
    finally:
        sys.stdout = real_stdout
        sys.stderr = real_stderr
        sys.stdin = real_stdin
1831
def reduceLockdirTimeout(self):
    """Reduce the default lock timeout for the duration of the test, so that
    if LockContention occurs during a test, it does so quickly.

    Tests that expect to provoke LockContention errors should call this.
    """
    previous_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS

    def resetTimeout():
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = previous_timeout

    # Register the restore before changing the value.
    self.addCleanup(resetTimeout)
    bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
1843
def make_utf8_encoded_stringio(self, encoding_type=None):
    """Return a StringIOWrapper instance, that will encode Unicode
    input to UTF-8.

    :param encoding_type: The codec error handling scheme; 'strict' when
        None.
    """
    if encoding_type is None:
        encoding_type = 'strict'
    output_encoding = 'utf-8'
    writer_factory = codecs.getwriter(output_encoding)
    sio = writer_factory(StringIO(), errors=encoding_type)
    # Record the encoding on the writer as an 'encoding' attribute.
    sio.encoding = output_encoding
    return sio
1856
class CapturedCall(object):
    """A helper for capturing smart server calls for easy debug analysis."""

    def __init__(self, params, prefix_length):
        """Capture the call with params and skip prefix_length stack frames.

        :param params: The call parameters; their 'method' attribute is
            used as the string form of this object.
        :param prefix_length: Number of outermost stack frames to leave out
            of the recorded stack.
        """
        import traceback
        self.call = params
        # The last 5 frames are the __init__, the hook frame, and 3 smart
        # client frames. Beyond this we could get more clever, but this is good
        # enough for now.
        frames = traceback.extract_stack()[prefix_length:-5]
        self.stack = ''.join(traceback.format_list(frames))

    def __str__(self):
        return self.call.method

    def __repr__(self):
        return self.call.method
1879
class TestCaseWithMemoryTransport(TestCase):
1880
"""Common test class for tests that do not need disk resources.
1882
Tests that need disk resources should derive from TestCaseWithTransport.
1884
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1886
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1887
a directory which does not exist. This serves to help ensure test isolation
1888
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1889
must exist. However, TestCaseWithMemoryTransport does not offer local
1890
file defaults for the transport in tests, nor does it obey the command line
1891
override, so tests that accidentally write to the common directory should
1894
:cvar TEST_ROOT: Directory containing all temporary directories, plus
1895
a .bzr directory that stops us ascending higher into the filesystem.
1901
def __init__(self, methodName='runTest'):
    """Set the transport defaults that parameterization may override."""
    # allow test parameterization after test construction and before test
    # execution. Variables that the parameterizer sets need to be
    # ones that are not set by setUp, or setUp will trash them.
    super(TestCaseWithMemoryTransport, self).__init__(methodName)
    self.vfs_transport_factory = default_transport
    self.transport_server = None
    self.transport_readonly_server = None
    # Created on first use by get_vfs_only_server.
    self.__vfs_server = None
1911
def get_transport(self, relpath=None):
    """Return a writeable transport.

    This transport is for the test scratch space relative to
    "self._test_root"

    :param relpath: a path relative to the base url.
    """
    url = self.get_url(relpath)
    t = get_transport(url)
    self.assertFalse(t.is_readonly())
    return t
1923
def get_readonly_transport(self, relpath=None):
    """Return a readonly transport for the test scratch space

    This can be used to test that operations which should only need
    readonly access in fact do not try to write.

    :param relpath: a path relative to the base url.
    """
    url = self.get_readonly_url(relpath)
    t = get_transport(url)
    self.assertTrue(t.is_readonly())
    return t
1935
def create_transport_readonly_server(self):
    """Create a transport server from class defined at init.

    This is mostly a hook for daughter classes.

    :return: The result of calling self.transport_readonly_server.
    """
    factory = self.transport_readonly_server
    return factory()
1942
def get_readonly_server(self):
    """Get the server instance for the readonly transport

    This is useful for some tests with specific servers to do diagnostics.
    The server is created and set up on first use, and torn down by a
    cleanup.
    """
    if self.__readonly_server is not None:
        return self.__readonly_server
    if self.transport_readonly_server is None:
        # readonly decorator requested
        # bring up the server
        self.__readonly_server = ReadonlyServer()
    else:
        self.__readonly_server = self.create_transport_readonly_server()
    # Either kind of server is set up on top of the vfs only server.
    self.__readonly_server.setUp(self.get_vfs_only_server())
    self.addCleanup(self.__readonly_server.tearDown)
    return self.__readonly_server
1959
def get_readonly_url(self, relpath=None):
    """Get a URL for the readonly transport.

    This will either be backed by '.' or a decorator to the transport
    used by self.get_url()
    relpath provides for clients to get a path relative to the base url.
    These should only be downwards relative, not upwards.
    """
    readonly_server = self.get_readonly_server()
    return self._adjust_url(readonly_server.get_url(), relpath)
1970
def get_vfs_only_server(self):
    """Get the vfs only read/write server instance.

    This is useful for some tests with specific servers that need
    diagnostics.

    For TestCaseWithMemoryTransport this is always a MemoryServer, and there
    is no means to override it.
    """
    if self.__vfs_server is not None:
        return self.__vfs_server
    # First use: bring the server up and arrange for it to be torn down.
    self.__vfs_server = MemoryServer()
    self.__vfs_server.setUp()
    self.addCleanup(self.__vfs_server.tearDown)
    return self.__vfs_server
1985
def get_server(self):
1986
"""Get the read/write server instance.
1988
This is useful for some tests with specific servers that need
1991
This is built from the self.transport_server factory. If that is None,
1992
then the self.get_vfs_server is returned.
1994
if self.__server is None:
1995
if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
1996
return self.get_vfs_only_server()
1998
# bring up a decorated means of access to the vfs only server.
1999
self.__server = self.transport_server()
2001
self.__server.setUp(self.get_vfs_only_server())
2002
except TypeError, e:
2003
# This should never happen; the try:Except here is to assist
2004
# developers having to update code rather than seeing an
2005
# uninformative TypeError.
2006
raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
2007
self.addCleanup(self.__server.tearDown)
2008
return self.__server
2010
def _adjust_url(self, base, relpath):
2011
"""Get a URL (or maybe a path) for the readwrite transport.
2013
This will either be backed by '.' or to an equivalent non-file based
2015
relpath provides for clients to get a path relative to the base url.
2016
These should only be downwards relative, not upwards.
2018
if relpath is not None and relpath != '.':
2019
if not base.endswith('/'):
2021
# XXX: Really base should be a url; we did after all call
2022
# get_url()! But sometimes it's just a path (from
2023
# LocalAbspathServer), and it'd be wrong to append urlescaped data
2024
# to a non-escaped local path.
2025
if base.startswith('./') or base.startswith('/'):
2028
base += urlutils.escape(relpath)
2031
def get_url(self, relpath=None):
    """Get a URL (or maybe a path) for the readwrite transport.

    This will either be backed by '.' or to an equivalent non-file based
    facility.
    relpath provides for clients to get a path relative to the base url.
    These should only be downwards relative, not upwards.
    """
    readwrite_server = self.get_server()
    return self._adjust_url(readwrite_server.get_url(), relpath)
2042
def get_vfs_only_url(self, relpath=None):
    """Get a URL (or maybe a path) for the plain old vfs transport.

    This will never be a smart protocol.  It always has all the
    capabilities of the local filesystem, but it might actually be a
    MemoryTransport or some other similar virtual filesystem.

    This is the backing transport (if any) of the server returned by
    get_url and get_readonly_url.

    :param relpath: provides for clients to get a path relative to the base
        url.  These should only be downwards relative, not upwards.
    :return: A URL
    """
    vfs_server = self.get_vfs_only_server()
    return self._adjust_url(vfs_server.get_url(), relpath)
2059
def _create_safety_net(self):
    """Make a fake bzr directory.

    This prevents any tests propagating up onto the TEST_ROOT directory's
    real branch.
    """
    test_root = TestCaseWithMemoryTransport.TEST_ROOT
    bzrdir.BzrDir.create_standalone_workingtree(test_root)
2068
def _check_safety_net(self):
    """Check that the safety .bzr directory has not been touched.

    _make_test_root has created a .bzr directory to prevent tests from
    propagating. This method ensures that a test did not leak.

    :raises AssertionError: If the working tree at TEST_ROOT has a last
        revision other than 'null:'.
    """
    root = TestCaseWithMemoryTransport.TEST_ROOT
    wt = workingtree.WorkingTree.open(root)
    if wt.last_revision() == 'null:':
        return
    # The current test has modified the /bzr directory, we need to
    # recreate a new one or all the following tests will fail.
    # If you need to inspect its content uncomment the following line
    # import pdb; pdb.set_trace()
    _rmtree_temp_dir(root + '/.bzr')
    self._create_safety_net()
    raise AssertionError('%s/.bzr should not be modified' % root)
2086
def _make_test_root(self):
    """Create the shared TEST_ROOT directory if it does not exist yet.

    A check of the safety net is registered as a cleanup on every call.
    """
    klass = TestCaseWithMemoryTransport
    if klass.TEST_ROOT is None:
        root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
        klass.TEST_ROOT = root

        self._create_safety_net()

        # The same directory is used by all tests, and we're not
        # specifically told when all tests are finished.  This will do.
        atexit.register(_rmtree_temp_dir, root)

    self.addCleanup(self._check_safety_net)
2099
def makeAndChdirToTestDir(self):
    """Create a temporary directories for this one test.

    This must set self.test_home_dir and self.test_dir and chdir to
    self.test_dir.

    For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
    """
    test_root = TestCaseWithMemoryTransport.TEST_ROOT
    os.chdir(test_root)
    self.test_dir = test_root
    self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2111
def make_branch(self, relpath, format=None):
    """Create a branch on the transport at relpath.

    A repository is made first; the branch is created in its bzrdir.
    """
    repository = self.make_repository(relpath, format=format)
    control_dir = repository.bzrdir
    return control_dir.create_branch()
2116
def make_bzrdir(self, relpath, format=None):
    """Create a control directory at relpath.

    :param relpath: Where to create it, relative to the test's base url.
    :param format: A format object, or a format name looked up in
        bzrdir.format_registry; 'default' is used when None.
    :raises TestSkipped: If the format cannot be initialized.
    """
    try:
        # might be a relative or absolute path
        maybe_a_url = self.get_url(relpath)
        segments = maybe_a_url.rsplit('/', 1)
        t = get_transport(maybe_a_url)
        has_leaf = len(segments) > 1 and segments[-1] not in ('', '.')
        if has_leaf:
            t.ensure_base()
        if format is None:
            format = 'default'
        if isinstance(format, basestring):
            format = bzrdir.format_registry.make_bzrdir(format)
        return format.initialize_on_transport(t)
    except errors.UninitializableFormat:
        raise TestSkipped("Format %s is not initializable." % format)
2132
def make_repository(self, relpath, shared=False, format=None):
    """Create a repository on our default transport at relpath.

    Note that relpath must be a relative path, not a full url.

    :param shared: Passed through to create_repository().
    :param format: Passed through to make_bzrdir().
    :return: The result of create_repository() on the new control dir.
    """
    # FIXME: If you create a remoterepository this returns the underlying
    # real format, which is incorrect.  Actually we should make sure that
    # RemoteBzrDir returns a RemoteRepository.
    # maybe  mbp 20070410
    made_control = self.make_bzrdir(relpath, format=format)
    return made_control.create_repository(shared=shared)
2144
def make_smart_server(self, path):
    """Start a smart TCP server and return a transport onto it.

    :param path: Path to clone the server's root transport to.
    :return: A transport for smart_server.get_url(), cloned to path.
    """
    smart_server = server.SmartTCPServer_for_testing()
    smart_server.setUp(self.get_server())
    # Register the teardown as soon as the server is up, so that a failure
    # while building the client transport below cannot leak the server.
    self.addCleanup(smart_server.tearDown)
    remote_transport = get_transport(smart_server.get_url()).clone(path)
    return remote_transport
2151
def make_branch_and_memory_tree(self, relpath, format=None):
    """Create a branch on the default transport and a MemoryTree for it.

    :param relpath: Passed through to make_branch().
    :param format: Passed through to make_branch().
    :return: The result of memorytree.MemoryTree.create_on_branch().
    """
    b = self.make_branch(relpath, format=format)
    return memorytree.MemoryTree.create_on_branch(b)
2156
def make_branch_builder(self, relpath, format=None):
    """Create a branch at relpath and return a BranchBuilder wrapping it.

    :param relpath: Passed through to make_branch().
    :param format: Passed through to make_branch().
    """
    branch = self.make_branch(relpath, format=format)
    return branchbuilder.BranchBuilder(branch=branch)
2160
def overrideEnvironmentForTesting(self):
    """Point HOME and BZR_HOME at this test's home directory."""
    os.environ['HOME'] = self.test_home_dir
    os.environ['BZR_HOME'] = self.test_home_dir
2165
def setUp(self):
    """Prepare the test root, working directory and environment.

    The current directory is remembered and restored by a cleanup, then
    the test directory is created and entered, HOME/BZR_HOME are
    overridden and the lazily-created servers are reset.
    """
    super(TestCaseWithMemoryTransport, self).setUp()
    self._make_test_root()
    _currentdir = os.getcwdu()
    def _leaveDirectory():
        os.chdir(_currentdir)
    self.addCleanup(_leaveDirectory)
    self.makeAndChdirToTestDir()
    self.overrideEnvironmentForTesting()
    self.__readonly_server = None
    self.__server = None
    self.reduceLockdirTimeout()
2177
def setup_smart_server_with_call_log(self):
    """Sets up a smart server as the transport server with a call log.

    Every call reported by the smart client's 'call' hook is appended to
    self.hpss_calls as a CapturedCall.
    """
    self.transport_server = server.SmartTCPServer_for_testing
    self.hpss_calls = []
    import traceback
    # Skip the current stack down to the caller of
    # setup_smart_server_with_call_log
    prefix_length = len(traceback.extract_stack()) - 2
    def capture_hpss_call(params):
        self.hpss_calls.append(
            CapturedCall(params, prefix_length))
    client._SmartClient.hooks.install_named_hook(
        'call', capture_hpss_call, None)
2191
def reset_smart_call_log(self):
    """Discard the recorded smart-server calls, starting a fresh log."""
    self.hpss_calls = []
2195
class TestCaseInTempDir(TestCaseWithMemoryTransport):
    """Derived class that runs a test within a temporary directory.

    This is useful for tests that need to create a branch, etc.

    The directory is created in a slightly complex way: for each
    Python invocation, a new temporary top-level directory is created.
    All test cases create their own directory within that.  If the
    tests complete successfully, the directory is removed.

    :ivar test_base_dir: The path of the top-level directory for this
        test, which contains a home directory and a work directory.

    :ivar test_home_dir: An initially empty directory under test_base_dir
        which is used as $HOME for this test.

    :ivar test_dir: A directory under test_base_dir used as the current
        directory when the test proper is run.
    """

    OVERRIDE_PYTHON = 'python'

    def check_file_contents(self, filename, expect):
        """Fail the test unless filename contains exactly expect."""
        self.log("check contents of file %s" % filename)
        # Close the handle explicitly rather than relying on refcounting.
        f = file(filename, 'r')
        try:
            contents = f.read()
        finally:
            f.close()
        if contents != expect:
            self.log("expected: %r" % expect)
            self.log("actually: %r" % contents)
            self.fail("contents of %s not as expected" % filename)

    def _getTestDirPrefix(self):
        """Return a directory name derived from this test's id."""
        # create a directory within the top level test directory
        if sys.platform == 'win32':
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
            # windows is likely to have path-length limits so use a short name
            name_prefix = name_prefix[-30:]
        else:
            name_prefix = re.sub('[/]', '_', self.id())
        return name_prefix

    def makeAndChdirToTestDir(self):
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().

        For TestCaseInTempDir we create a temporary directory based on the test
        name and then create two subdirs - test and home under it.
        """
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
                                       self._getTestDirPrefix())
        name = name_prefix
        # Probe for an unused name, appending a numeric suffix on collision.
        for i in range(100):
            if os.path.exists(name):
                name = name_prefix + '_' + str(i)
            else:
                os.mkdir(name)
                break
        # now create test and home directories within this dir
        self.test_base_dir = name
        self.test_home_dir = self.test_base_dir + '/home'
        os.mkdir(self.test_home_dir)
        self.test_dir = self.test_base_dir + '/work'
        os.mkdir(self.test_dir)
        os.chdir(self.test_dir)
        # put name of test inside
        f = file(self.test_base_dir + '/name', 'w')
        try:
            f.write(self.id())
        finally:
            f.close()
        self.addCleanup(self.deleteTestDir)

    def deleteTestDir(self):
        """Leave and remove this test's base directory."""
        # chdir out first so the tree being removed is not the cwd.
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
        _rmtree_temp_dir(self.test_base_dir)

    def build_tree(self, shape, line_endings='binary', transport=None):
        """Build a test tree according to a pattern.

        shape is a sequence of file specifications.  If the final
        character is '/', a directory is created.

        This assumes that all the elements in the tree being built are new.

        This doesn't add anything to a branch.

        :type shape:    list or tuple.
        :param line_endings: Either 'binary' or 'native'
            in binary mode, exact contents are written in native mode, the
            line endings match the default platform endings.
        :param transport: A transport to write to, for building trees on VFS's.
            If the transport is readonly or None, "." is opened automatically.
        :return: None
        """
        if type(shape) not in (list, tuple):
            raise AssertionError("Parameter 'shape' should be "
                "a list or a tuple. Got %r instead" % (shape,))
        # It's OK to just create them using forward slashes on windows.
        if transport is None or transport.is_readonly():
            transport = get_transport(".")
        for name in shape:
            self.assertIsInstance(name, basestring)
            if name[-1] == '/':
                transport.mkdir(urlutils.escape(name[:-1]))
            else:
                if line_endings == 'binary':
                    end = '\n'
                elif line_endings == 'native':
                    end = os.linesep
                else:
                    raise errors.BzrError(
                        'Invalid line ending request %r' % line_endings)
                content = "contents of %s%s" % (name.encode('utf-8'), end)
                transport.put_bytes_non_atomic(urlutils.escape(name), content)

    def build_tree_contents(self, shape):
        """Delegate to the module-level build_tree_contents()."""
        build_tree_contents(shape)

    def assertInWorkingTree(self, path, root_path='.', tree=None):
        """Assert whether path or paths are in the WorkingTree"""
        if tree is None:
            tree = workingtree.WorkingTree.open(root_path)
        if not isinstance(path, basestring):
            # A collection of paths: check each against the same tree.
            for p in path:
                self.assertInWorkingTree(p, tree=tree)
        else:
            self.assertIsNot(tree.path2id(path), None,
                path+' not in working tree.')

    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
        """Assert whether path or paths are not in the WorkingTree"""
        if tree is None:
            tree = workingtree.WorkingTree.open(root_path)
        if not isinstance(path, basestring):
            # A collection of paths: check each against the same tree.
            for p in path:
                self.assertNotInWorkingTree(p,tree=tree)
        else:
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
2333
class TestCaseWithTransport(TestCaseInTempDir):
    """A test case that provides get_url and get_readonly_url facilities.

    These back onto two transport servers, one for readonly access and one for
    read write access.

    If no explicit class is provided for readonly access, a
    ReadonlyTransportDecorator is used instead which allows the use of non disk
    based read write transports.

    If an explicit class is provided for readonly access, that server and the
    readwrite one must both define get_url() as resolving to os.getcwd().
    """

    def get_vfs_only_server(self):
        """See TestCaseWithMemoryTransport.

        This is useful for some tests with specific servers that need
        diagnostics.
        """
        # The server is created lazily, once per test, and torn down by a
        # registered cleanup.
        if self.__vfs_server is None:
            self.__vfs_server = self.vfs_transport_factory()
            self.__vfs_server.setUp()
            self.addCleanup(self.__vfs_server.tearDown)
        return self.__vfs_server

    def make_branch_and_tree(self, relpath, format=None):
        """Create a branch on the transport and a tree locally.

        If the transport is not a LocalTransport, the Tree can't be created on
        the transport.  In that case if the vfs_transport_factory is
        LocalURLServer the working tree is created in the local
        directory backing the transport, and the returned tree's branch and
        repository will also be accessed locally. Otherwise a lightweight
        checkout is created and returned.

        :param format: The BzrDirFormat.
        :returns: the WorkingTree.
        """
        # TODO: always use the local disk path for the working tree,
        # this obviously requires a format that supports branch references
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
        # if appropriate.
        b = self.make_branch(relpath, format=format)
        try:
            return b.bzrdir.create_workingtree()
        except errors.NotLocalUrl:
            # We can only make working trees locally at the moment.  If the
            # transport can't support them, then we keep the non-disk-backed
            # branch and create a local checkout.
            if self.vfs_transport_factory is LocalURLServer:
                # the branch is colocated on disk, we cannot create a checkout.
                # hopefully callers will expect this.
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
                wt = local_controldir.create_workingtree()
                if wt.branch._format != b._format:
                    wt._branch = b
                    # Make sure that assigning to wt._branch fixes wt.branch,
                    # in case the implementation details of workingtree objects
                    # change.
                    self.assertIs(b, wt.branch)
                return wt
            else:
                return b.create_checkout(relpath, lightweight=True)

    def assertIsDirectory(self, relpath, transport):
        """Assert that relpath within transport is a directory.

        This may not be possible on all transports; in that case it propagates
        a TransportNotPossible.
        """
        try:
            mode = transport.stat(relpath).st_mode
        except errors.NoSuchFile:
            self.fail("path %s is not a directory; no such file"
                      % (relpath))
        if not stat.S_ISDIR(mode):
            self.fail("path %s is not a directory; has mode %#o"
                      % (relpath, mode))

    def assertTreesEqual(self, left, right):
        """Check that left and right have the same content and properties."""
        # we use a tree delta to check for equality of the content, and we
        # manually check for equality of other things such as the parents list.
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
        differences = left.changes_from(right)
        self.assertFalse(differences.has_changed(),
            "Trees %r and %r are different: %r" % (left, right, differences))

    def setUp(self):
        """Run the base setUp and reset the lazily-created vfs server."""
        super(TestCaseWithTransport, self).setUp()
        self.__vfs_server = None
2427
class ChrootedTestCase(TestCaseWithTransport):
    """A support class that provides readonly urls outside the local namespace.

    This is done by checking if self.transport_server is a MemoryServer. if it
    is then we are chrooted already, if it is not then an HttpServer is used
    for readonly urls.

    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
                       be used without needed to redo it when a different
                       subclass is in use ?
    """

    def setUp(self):
        """Select HttpServer for readonly access unless memory-backed."""
        super(ChrootedTestCase, self).setUp()
        if not self.vfs_transport_factory == MemoryServer:
            self.transport_readonly_server = HttpServer
2445
def condition_id_re(pattern):
    """Create a condition filter which performs a re check on a test's id.

    :param pattern: A regular expression string.
    :return: A callable that returns True if the re matches.
    """
    filter_re = osutils.re_compile_checked(pattern, 0,
        'test filter')
    def condition(test):
        test_id = test.id()
        # search(), not match(): the pattern may hit anywhere in the id.
        return filter_re.search(test_id)
    return condition
2459
def condition_isinstance(klass_or_klass_list):
    """Create a condition filter which returns isinstance(param, klass).

    :param klass_or_klass_list: Whatever isinstance() accepts as its second
        argument.
    :return: A callable which when called with one parameter obj return the
        result of isinstance(obj, klass_or_klass_list).
    """
    def condition(obj):
        return isinstance(obj, klass_or_klass_list)
    return condition
2470
def condition_id_in_list(id_list):
    """Create a condition filter which verify that test's id in a list.

    :param id_list: A TestIdList object.
    :return: A callable that returns True if the test's id appears in the list.
    """
    def condition(test):
        return id_list.includes(test.id())
    return condition
2481
def condition_id_startswith(starts):
    """Create a condition filter verifying that test's id starts with a string.

    :param starts: A list of string.
    :return: A callable that returns True if the test's id starts with one of
        the given strings.
    """
    def condition(test):
        # The id does not change between prefixes, so compute it once.
        test_id = test.id()
        for start in starts:
            if test_id.startswith(start):
                return True
        return False
    return condition
2496
def exclude_tests_by_condition(suite, condition):
    """Create a test suite which excludes some tests from suite.

    :param suite: The suite to get tests from.
    :param condition: A callable whose result evaluates True when called with a
        test case which should be excluded from the result.
    :return: A suite which contains the tests found in suite that fail
        condition.
    """
    result = []
    for test in iter_suite_tests(suite):
        if not condition(test):
            result.append(test)
    return TestUtil.TestSuite(result)
2512
def filter_suite_by_condition(suite, condition):
    """Create a test suite by filtering another one.

    :param suite: The source suite.
    :param condition: A callable whose result evaluates True when called with a
        test case which should be included in the result.
    :return: A suite which contains the tests found in suite that pass
        condition.
    """
    result = []
    for test in iter_suite_tests(suite):
        if condition(test):
            result.append(test)
    return TestUtil.TestSuite(result)
2528
def filter_suite_by_re(suite, pattern):
    """Create a test suite by filtering another one.

    :param suite:           the source suite
    :param pattern:         pattern that names must match
    :returns: the newly created suite
    """
    condition = condition_id_re(pattern)
    result_suite = filter_suite_by_condition(suite, condition)
    return result_suite
2540
def filter_suite_by_id_list(suite, test_id_list):
    """Create a test suite by filtering another one.

    :param suite: The source suite.
    :param test_id_list: A list of the test ids to keep as strings.
    :returns: the newly created suite
    """
    condition = condition_id_in_list(test_id_list)
    result_suite = filter_suite_by_condition(suite, condition)
    return result_suite
2552
def filter_suite_by_id_startswith(suite, start):
    """Create a test suite by filtering another one.

    :param suite: The source suite.
    :param start: A list of string the test id must start with one of.
    :returns: the newly created suite
    """
    condition = condition_id_startswith(start)
    result_suite = filter_suite_by_condition(suite, condition)
    return result_suite
2564
def exclude_tests_by_re(suite, pattern):
    """Create a test suite which excludes some tests from suite.

    :param suite: The suite to get tests from.
    :param pattern: A regular expression string. Test ids that match this
        pattern will be excluded from the result.
    :return: A TestSuite that contains all the tests from suite without the
        tests that matched pattern. The order of tests is the same as it was in
        suite.
    """
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
2577
def preserve_input(something):
    """A helper for performing test suite transformation chains.

    :param something: Anything you want to preserve.
    :return: Something.
    """
    return something
2586
def randomize_suite(suite):
    """Return a new TestSuite with suite's tests in random order.

    The tests in the input suite are flattened into a single suite in order to
    accomplish this. Any nested TestSuites are removed to provide global
    randomness.
    """
    tests = list(iter_suite_tests(suite))
    # Uses the module-level random state; seed it beforehand for a
    # reproducible order.
    random.shuffle(tests)
    return TestUtil.TestSuite(tests)
2598
def split_suite_by_condition(suite, condition):
    """Split a test suite into two by a condition.

    :param suite: The suite to split.
    :param condition: The condition to match on. Tests that match this
        condition are returned in the first test suite, ones that do not match
        are in the second suite.
    :return: A tuple of two test suites, where the first contains tests from
        suite matching the condition, and the second contains the remainder
        from suite. The order within each output suite is the same as it was in
        suite.
    """
    matched = []
    did_not_match = []
    for test in iter_suite_tests(suite):
        if condition(test):
            matched.append(test)
        else:
            did_not_match.append(test)
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2620
def split_suite_by_re(suite, pattern):
    """Split a test suite into two by a regular expression.

    :param suite: The suite to split.
    :param pattern: A regular expression string. Test ids that match this
        pattern will be in the first test suite returned, and the others in the
        second test suite returned.
    :return: A tuple of two test suites, where the first contains tests from
        suite matching pattern, and the second contains the remainder from
        suite. The order within each output suite is the same as it was in
        suite.
    """
    return split_suite_by_condition(suite, condition_id_re(pattern))
2635
# NOTE(review): this block appears to be extraction-damaged: the bare numbers
# interleaved below look like source line numbers, the indentation is gone,
# and the gaps in the numbering suggest dropped lines (part of the parameter
# list, the docstring terminator, and the lines defining `verbosity`,
# `stream` and `decorators`) -- TODO restore from the canonical file.
#
# What the visible lines show: run_suite() stores lsprof_timed on
# TestCase._gather_lsprof_in_benchmarks, defaults runner_class to
# TextTestRunner, builds the runner, sets runner.stop_on_failure, applies a
# chain of suite decorators (random_order, exclude_tests, then tests_first or
# filter_tests, then any caller-supplied suite_decorators) and runs the
# decorated suite.  It returns result.wasStrictlySuccessful() or
# result.wasSuccessful(); the condition choosing between the two is not
# visible here.
def run_suite(suite, name='test', verbose=False, pattern=".*",
2636
stop_on_failure=False,
2637
transport=None, lsprof_timed=None, bench_history=None,
2638
matching_tests_first=None,
2641
exclude_pattern=None,
2644
suite_decorators=None,
2646
"""Run a test suite for bzr selftest.
2648
:param runner_class: The class of runner to use. Must support the
2649
constructor arguments passed by run_suite which are more than standard
2651
:return: A boolean indicating success.
2653
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
2658
if runner_class is None:
2659
runner_class = TextTestRunner
2662
runner = runner_class(stream=stream,
2664
verbosity=verbosity,
2665
bench_history=bench_history,
2666
list_only=list_only,
2669
runner.stop_on_failure=stop_on_failure
2670
# built in decorator factories:
2672
random_order(random_seed, runner),
2673
exclude_tests(exclude_pattern),
2675
if matching_tests_first:
2676
decorators.append(tests_first(pattern))
2678
decorators.append(filter_tests(pattern))
2679
if suite_decorators:
2680
decorators.extend(suite_decorators)
2681
for decorator in decorators:
2682
suite = decorator(suite)
2683
result = runner.run(suite)
2688
return result.wasStrictlySuccessful()
2690
return result.wasSuccessful()
2693
# A registry where get() returns a suite decorator.
parallel_registry = registry.Registry()


def fork_decorator(suite):
    """Wrap suite so that its tests are run by fork_for_tests.

    :return: suite unchanged when local_concurrency() is 1, otherwise a
        testtools ConcurrentTestSuite driven by fork_for_tests.
    """
    concurrency = local_concurrency()
    if concurrency == 1:
        return suite
    # Imported lazily: testtools is only needed for parallel runs.
    from testtools import ConcurrentTestSuite
    return ConcurrentTestSuite(suite, fork_for_tests)
parallel_registry.register('fork', fork_decorator)
2706
def subprocess_decorator(suite):
    """Wrap suite so that its tests are run by reinvoke_for_tests.

    :return: suite unchanged when local_concurrency() is 1, otherwise a
        testtools ConcurrentTestSuite driven by reinvoke_for_tests.
    """
    concurrency = local_concurrency()
    if concurrency == 1:
        return suite
    # Imported lazily: testtools is only needed for parallel runs.
    from testtools import ConcurrentTestSuite
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
parallel_registry.register('subprocess', subprocess_decorator)
2715
def exclude_tests(exclude_pattern):
    """Return a test suite decorator that excludes tests.

    :param exclude_pattern: A regular expression string, or None to exclude
        nothing (identity_decorator is returned in that case).
    """
    if exclude_pattern is None:
        return identity_decorator
    def decorator(suite):
        return ExcludeDecorator(suite, exclude_pattern)
    return decorator
2724
def filter_tests(pattern):
    """Return a test suite decorator that keeps tests matching pattern.

    :param pattern: A regular expression string.  The match-everything
        pattern '.*' needs no filtering, so identity_decorator is returned.
    """
    if pattern == '.*':
        return identity_decorator
    def decorator(suite):
        return FilterTestsDecorator(suite, pattern)
    return decorator
2732
def random_order(random_seed, runner):
    """Return a test suite decorator factory for randomising tests order.

    :param random_seed: now, a string which casts to a long, or a long.
    :param runner: A test runner with a stream attribute to report on.
    """
    if random_seed is None:
        return identity_decorator
    def decorator(suite):
        # runner.stream is read when the decorator is applied, not here.
        return RandomDecorator(suite, random_seed, runner.stream)
    return decorator
2745
def tests_first(pattern):
2747
return identity_decorator
2748
def decorator(suite):
2749
return TestFirstDecorator(suite, pattern)
2753
def identity_decorator(suite):
    """Return suite unchanged; the no-op suite decorator."""
    return suite
2758
class TestDecorator(TestSuite):
    """A decorator for TestCase/TestSuite objects.

    Usually, subclasses should override __iter__(used when flattening test
    suites), which we do to filter, reorder, parallelise and so on, run() and
    debug().
    """

    def __init__(self, suite):
        """Wrap suite, holding it as this suite's only member."""
        TestSuite.__init__(self)
        self.addTest(suite)

    def countTestCases(self):
        """Return the number of test cases reachable by iterating self."""
        cases = 0
        for test in self:
            cases += test.countTestCases()
        return cases

    def debug(self):
        """Run every test reachable by iterating self in debug mode."""
        for test in self:
            test.debug()

    def run(self, result):
        """Run the tests against result, honouring result.shouldStop.

        :return: result.
        """
        # Use iteration on self, not self._tests, to allow subclasses to hook
        # into __iter__.
        for test in self:
            if result.shouldStop:
                break
            test.run(result)
        return result
2790
class ExcludeDecorator(TestDecorator):
    """A decorator which excludes test matching an exclude pattern."""

    def __init__(self, suite, exclude_pattern):
        TestDecorator.__init__(self, suite)
        self.exclude_pattern = exclude_pattern
        # Set once the exclusion has been applied to self._tests.
        self.excluded = False

    def __iter__(self):
        """Iterate the tests, applying the exclusion on first use."""
        if self.excluded:
            return iter(self._tests)
        # Flag first: exclude_tests_by_re() iterates self, and that nested
        # iteration must see the unfiltered tests rather than recurse.
        self.excluded = True
        suite = exclude_tests_by_re(self, self.exclude_pattern)
        # Replace the wrapped tests with the filtered ones.
        del self._tests[:]
        self.addTests(suite)
        return iter(self._tests)
2808
class FilterTestsDecorator(TestDecorator):
    """A decorator which filters tests to those matching a pattern."""

    def __init__(self, suite, pattern):
        TestDecorator.__init__(self, suite)
        self.pattern = pattern
        # Set once the filter has been applied to self._tests.
        self.filtered = False

    def __iter__(self):
        """Iterate the tests, applying the filter on first use."""
        if self.filtered:
            return iter(self._tests)
        # Flag first: filter_suite_by_re() iterates self, and that nested
        # iteration must see the unfiltered tests rather than recurse.
        self.filtered = True
        suite = filter_suite_by_re(self, self.pattern)
        # Replace the wrapped tests with the filtered ones.
        del self._tests[:]
        self.addTests(suite)
        return iter(self._tests)
2826
class RandomDecorator(TestDecorator):
    """A decorator which randomises the order of its tests."""

    def __init__(self, suite, random_seed, stream):
        TestDecorator.__init__(self, suite)
        self.random_seed = random_seed
        # Set once the tests have been shuffled.
        self.randomised = False
        self.stream = stream

    def __iter__(self):
        """Iterate the tests, shuffling them on first use."""
        if self.randomised:
            return iter(self._tests)
        # Flag first: randomize_suite() iterates self, and that nested
        # iteration must see the original tests rather than recurse.
        self.randomised = True
        self.stream.writeln("Randomizing test order using seed %s\n" %
            (self.actual_seed()))
        # Initialise the random number generator.
        random.seed(self.actual_seed())
        suite = randomize_suite(self)
        # Replace the wrapped tests with the shuffled ones.
        del self._tests[:]
        self.addTests(suite)
        return iter(self._tests)

    def actual_seed(self):
        """Return the seed in use, resolving "now" to the current time."""
        if self.random_seed == "now":
            # We convert the seed to a long to make it reuseable across
            # invocations (because the user can reenter it).
            self.random_seed = long(time.time())
        else:
            # Convert the seed to a long if we can
            try:
                self.random_seed = long(self.random_seed)
            except (TypeError, ValueError):
                # Not numeric: keep the seed as given.
                pass
        return self.random_seed
2862
class TestFirstDecorator(TestDecorator):
    """A decorator which moves named tests to the front."""

    def __init__(self, suite, pattern):
        TestDecorator.__init__(self, suite)
        self.pattern = pattern
        # Set once the tests have been reordered.
        self.filtered = False

    def __iter__(self):
        """Iterate the tests, moving matching ones first on first use."""
        if self.filtered:
            return iter(self._tests)
        # Flag first: split_suite_by_re() iterates self, and that nested
        # iteration must see the original tests rather than recurse.
        self.filtered = True
        suites = split_suite_by_re(self, self.pattern)
        # Replace the wrapped tests with (matching suite, remainder suite).
        del self._tests[:]
        self.addTests(suites)
        return iter(self._tests)
2880
def partition_tests(suite, count):
    """Partition suite into count lists of tests.

    :param suite: The suite to flatten and split.
    :param count: Number of partitions to produce.
    :return: A list of count lists.  Tests are dealt out in contiguous
        blocks of ceil(len(tests) / count), so trailing lists may be shorter
        or empty.
    """
    result = []
    tests = list(iter_suite_tests(suite))
    tests_per_process = int(math.ceil(float(len(tests)) / count))
    for block in range(count):
        low_test = block * tests_per_process
        high_test = low_test + tests_per_process
        process_tests = tests[low_test:high_test]
        result.append(process_tests)
    return result
2893
# NOTE(review): this block appears to be extraction-damaged: the bare numbers
# interleaved below look like source line numbers, the indentation is gone,
# and the gaps in the numbering suggest dropped lines (the docstring
# terminator, the result list, and the lines that create the child process
# and branch on it) -- TODO restore from the canonical file.
#
# What the visible lines show: the suite is split with partition_tests() into
# local_concurrency() blocks; for each block a pipe is created, one side
# runs the block's TestSuite against a subunit TestProtocolClient writing to
# the pipe, and the other side wraps the read end in a ProtocolTestCase
# subclass that also calls os.waitpid(pid, os.WNOHANG) around run().
def fork_for_tests(suite):
2894
"""Take suite and start up one runner per CPU by forking()
2896
:return: An iterable of TestCase-like objects which can each have
2897
run(result) called on them to feed tests to result.
2899
concurrency = local_concurrency()
2901
from subunit import TestProtocolClient, ProtocolTestCase
2902
class TestInOtherProcess(ProtocolTestCase):
2903
# Should be in subunit, I think. RBC.
2904
def __init__(self, stream, pid):
2905
ProtocolTestCase.__init__(self, stream)
2908
def run(self, result):
2910
ProtocolTestCase.run(self, result)
2912
os.waitpid(self.pid, os.WNOHANG)
2914
test_blocks = partition_tests(suite, concurrency)
2915
for process_tests in test_blocks:
2916
process_suite = TestSuite()
2917
process_suite.addTests(process_tests)
2918
c2pread, c2pwrite = os.pipe()
2923
# Leave stderr and stdout open so we can see test noise
2924
# Close stdin so that the child goes away if it decides to
2925
# read from stdin (otherwise its a roulette to see what
2926
# child actually gets keystrokes for pdb etc).
2929
stream = os.fdopen(c2pwrite, 'wb', 1)
2930
subunit_result = TestProtocolClient(stream)
2931
process_suite.run(subunit_result)
2936
stream = os.fdopen(c2pread, 'rb', 1)
2937
test = TestInOtherProcess(stream, pid)
2942
# NOTE(review): this block appears to be extraction-damaged: the bare numbers
# interleaved below look like source line numbers, the indentation is gone,
# and the gaps in the numbering suggest dropped lines (the docstring
# terminator, the result list, the tail of the argv list and of the Popen
# call, and the try/except/finally framing) -- TODO restore from the
# canonical file.
#
# What the visible lines show: the suite is split with partition_tests() into
# local_concurrency() blocks; for each block the test ids are written to a
# temporary file and a `bzr selftest --load-list <file>` child is started with
# Popen ('--no-plugins' is forwarded when present in sys.argv).  The child's
# stdout is wrapped in a ProtocolTestCase subclass whose run() unlinks the
# list file; a further os.unlink(test_list_file_name) is visible near the
# end, presumably an error path -- verify.
def reinvoke_for_tests(suite):
2943
"""Take suite and start up one runner per CPU using subprocess().
2945
:return: An iterable of TestCase-like objects which can each have
2946
run(result) called on them to feed tests to result.
2948
concurrency = local_concurrency()
2950
from subunit import TestProtocolClient, ProtocolTestCase
2951
class TestInSubprocess(ProtocolTestCase):
2952
def __init__(self, process, name):
2953
ProtocolTestCase.__init__(self, process.stdout)
2954
self.process = process
2955
self.process.stdin.close()
2958
def run(self, result):
2960
ProtocolTestCase.run(self, result)
2963
os.unlink(self.name)
2964
# print "pid %d finished" % finished_process
2965
test_blocks = partition_tests(suite, concurrency)
2966
for process_tests in test_blocks:
2967
# ugly; currently reimplement rather than reuses TestCase methods.
2968
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
2969
if not os.path.isfile(bzr_path):
2970
# We are probably installed. Assume sys.argv is the right file
2971
bzr_path = sys.argv[0]
2972
fd, test_list_file_name = tempfile.mkstemp()
2973
test_list_file = os.fdopen(fd, 'wb', 1)
2974
for test in process_tests:
2975
test_list_file.write(test.id() + '\n')
2976
test_list_file.close()
2978
argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
2980
if '--no-plugins' in sys.argv:
2981
argv.append('--no-plugins')
2982
# stderr=STDOUT would be ideal, but until we prevent noise on
2983
# stderr it can interrupt the subunit protocol.
2984
process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
2986
test = TestInSubprocess(process, test_list_file_name)
2989
os.unlink(test_list_file_name)
2994
def cpucount(content):
    """Return the CPU count described by /proc/cpuinfo-style content.

    :param content: Text in which each CPU has a line of the form
        'processor : <index>'.
    :return: The index on the last 'processor' line, plus one.
    :raises ValueError: If content has no 'processor' line (or the index is
        not an integer).
    """
    prefix = 'processor'
    concurrency = None
    for line in content.splitlines():
        if line.startswith(prefix):
            # Indices are zero-based, so the count is the index plus one.
            concurrency = int(line[line.find(':')+1:]) + 1
    if concurrency is None:
        # Fail with a clear message instead of an accidental
        # UnboundLocalError on the return below.
        raise ValueError('no %r entries found in cpuinfo content' % prefix)
    return concurrency
3003
def local_concurrency():
    """Return how many test processes to run in parallel on this machine.

    The count is read from /proc/cpuinfo via cpucount().  Detection is
    best-effort: any failure (no such file, unparseable content, ...) falls
    back to 1.
    """
    try:
        cpuinfo = open('/proc/cpuinfo', 'rb')
        try:
            content = cpuinfo.read()
        finally:
            # Close explicitly instead of leaking the handle.
            cpuinfo.close()
        concurrency = cpucount(content)
    except Exception:
        # Deliberately broad: a wrong guess must never break the test run.
        concurrency = 1
    return concurrency
3012
class BZRTransformingResult(unittest.TestResult):
    """A TestResult that forwards to target, translating bzr outcomes.

    Errors whose remote traceback ends in 'UnavailableFeature: ...' are
    reported to the target as not-supported, and failures ending in
    'KnownFailure: ...' as known failures; everything else is forwarded
    unchanged.
    """

    def __init__(self, target):
        unittest.TestResult.__init__(self)
        self.result = target

    def startTest(self, test):
        self.result.startTest(test)

    def stopTest(self, test):
        self.result.stopTest(test)

    def addError(self, test, err):
        feature = self._error_looks_like('UnavailableFeature: ', err)
        if feature is not None:
            self.result.addNotSupported(test, feature)
        else:
            self.result.addError(test, err)

    def addFailure(self, test, err):
        known = self._error_looks_like('KnownFailure: ', err)
        if known is not None:
            # Rebuild an exc_info-like triple for the known failure.
            self.result._addKnownFailure(test, [KnownFailure,
                                                KnownFailure(known), None])
        else:
            self.result.addFailure(test, err)

    def addSkip(self, test, reason):
        self.result.addSkip(test, reason)

    def addSuccess(self, test):
        self.result.addSuccess(test)

    def _error_looks_like(self, prefix, err):
        """Deserialize exception and returns the stringify value.

        :param prefix: Text the last non-empty line of the remote traceback
            must start with.
        :param err: An exc_info-style (type, value, traceback) triple.
        :return: The remainder of that last line after prefix, or None when
            err is not a subunit RemoteException or does not match.
        """
        import subunit
        value = None
        typ, exc, _ = err
        if isinstance(exc, subunit.RemoteException):
            # stringify the exception gives access to the remote traceback
            # We search the last line for 'prefix'
            lines = str(exc).split('\n')
            while lines and not lines[-1]:
                lines.pop(-1)
            if lines:
                if lines[-1].startswith(prefix):
                    value = lines[-1][len(prefix):]
        return value
3062
# NOTE(review): this block appears to be extraction-damaged: the bare numbers
# interleaved below look like source line numbers, the indentation is gone,
# and the gaps in the numbering suggest dropped lines (most of selftest()'s
# parameter list, and what is presumably the try/finally and else framing)
# -- TODO restore from the canonical file.
#
# What the visible lines show: selftest() marks the repository deprecation
# warning as already done, temporarily replaces the module globals
# default_transport and selftest_debug_flags, builds the suite (from
# test_suite_factory when given, otherwise test_suite(keep_only,
# starting_with), with keep_only loaded through load_test_id_list), returns
# the result of run_suite(), and assigns the saved globals back at the end.
# Controlled by "bzr selftest -E=..." option
3063
selftest_debug_flags = set()
3066
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
3068
test_suite_factory=None,
3071
matching_tests_first=None,
3074
exclude_pattern=None,
3080
suite_decorators=None,
3082
"""Run the whole test suite under the enhanced runner"""
3083
# XXX: Very ugly way to do this...
3084
# Disable warning about old formats because we don't want it to disturb
3085
# any blackbox tests.
3086
from bzrlib import repository
3087
repository._deprecation_warning_done = True
3089
global default_transport
3090
if transport is None:
3091
transport = default_transport
3092
old_transport = default_transport
3093
default_transport = transport
3094
global selftest_debug_flags
3095
old_debug_flags = selftest_debug_flags
3096
if debug_flags is not None:
3097
selftest_debug_flags = set(debug_flags)
3099
if load_list is None:
3102
keep_only = load_test_id_list(load_list)
3103
if test_suite_factory is None:
3104
suite = test_suite(keep_only, starting_with)
3106
suite = test_suite_factory()
3107
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
3108
stop_on_failure=stop_on_failure,
3109
transport=transport,
3110
lsprof_timed=lsprof_timed,
3111
bench_history=bench_history,
3112
matching_tests_first=matching_tests_first,
3113
list_only=list_only,
3114
random_seed=random_seed,
3115
exclude_pattern=exclude_pattern,
3117
runner_class=runner_class,
3118
suite_decorators=suite_decorators,
3121
default_transport = old_transport
3122
selftest_debug_flags = old_debug_flags
3125
def load_test_id_list(file_name):
    """Load a test id list from a text file.

    The format is one test id by line.  No special care is taken to impose
    strict rules, these test ids are used to filter the test suite so a test id
    that do not match an existing test will do no harm. This allows user to add
    comments, leave blank lines, etc.

    :param file_name: Path of the file to read.
    :return: A list with one whitespace-stripped entry per line of the file.
    :raises errors.NoSuchFile: If file_name does not exist.  Any other
        IOError is propagated unchanged.
    """
    import sys
    test_list = []
    try:
        ftest = open(file_name, 'rt')
    except IOError:
        # sys.exc_info() rather than 'except IOError, e': this spelling is
        # accepted by every Python version.
        e = sys.exc_info()[1]
        if e.errno != errno.ENOENT:
            raise
        else:
            raise errors.NoSuchFile(file_name)

    try:
        for test_name in ftest.readlines():
            test_list.append(test_name.strip())
    finally:
        # Close the file even when reading fails.
        ftest.close()
    return test_list
3148
def suite_matches_id_list(test_suite, id_list):
    """Warns about tests not appearing or appearing more than once.

    :param test_suite: A TestSuite object.
    :param id_list: The list of test ids that should be found in
         test_suite.

    :return: (absents, duplicates) absents is a list containing the test found
        in id_list but not in test_suite, duplicates is a list containing the
        test found multiple times in test_suite.

    When using a prefined test id list, it may occurs that some tests do not
    exist anymore or that some tests use the same id. This function warns the
    tester about potential problems in his workflow (test lists are volatile)
    or in the test suite itself (using the same id for several tests does not
    help to localize defects).
    """
    # Build a dict counting id occurrences
    tests = dict()
    for test in iter_suite_tests(test_suite):
        test_id = test.id()
        tests[test_id] = tests.get(test_id, 0) + 1

    not_found = []
    duplicates = []
    for test_id in id_list:
        occurs = tests.get(test_id, 0)
        if not occurs:
            not_found.append(test_id)
        elif occurs > 1:
            duplicates.append(test_id)

    return not_found, duplicates
3183
class TestIdList(object):
    """Test id list to filter a test suite.

    Relying on the assumption that test ids are built as:
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
    notation, this class offers methods to :
    - avoid building a test suite for modules not refered to in the test list,
    - keep only the tests listed from the module test suite.
    """

    def __init__(self, test_id_list):
        # When a test suite needs to be filtered against us we compare test ids
        # for equality, so a simple dict offers a quick and simple solution.
        self.tests = dict().fromkeys(test_id_list, True)

        # While unittest.TestCase have ids like:
        # <module>.<class>.<method>[(<param+)],
        # doctest.DocTestCase can have ids like:
        # <module>
        # <module>.<class>
        # <module>.<function>
        # <module>.<class>.<method>

        # Since we can't predict a test class from its name only, we settle on
        # a simple constraint: a test id always begins with its module name.

        modules = {}
        for test_id in test_id_list:
            parts = test_id.split('.')
            mod_name = parts.pop(0)
            modules[mod_name] = True
            # Record every dotted prefix of the id, since any of them may be
            # the module that defines the test.
            for part in parts:
                mod_name += '.' + part
                modules[mod_name] = True
        self.modules = modules

    def refers_to(self, module_name):
        """Is there tests for the module or one of its sub modules."""
        return module_name in self.modules

    def includes(self, test_id):
        """Is test_id one of the ids this list was built from."""
        return test_id in self.tests
3227
class TestPrefixAliasRegistry(registry.Registry):
    """A registry for test prefix aliases.

    This helps implement shortcuts for the --starting-with selftest
    option. Overriding existing prefixes is not allowed but not fatal (a
    warning will be emitted).
    """

    def register(self, key, obj, help=None, info=None,
                 override_existing=False):
        """See Registry.register.

        Trying to override an existing alias causes a warning to be emitted,
        not a fatal exception.  The override_existing argument is accepted
        for signature compatibility only: False is always passed on to the
        base class.
        """
        base = super(TestPrefixAliasRegistry, self)
        try:
            base.register(key, obj, help=help, info=info,
                          override_existing=False)
        except KeyError:
            # The alias is already taken: keep the first registration and
            # just tell the user about the one being dropped.
            actual = self.get(key)
            note('Test prefix alias %s is already used for %s, ignoring %s'
                 % (key, actual, obj))

    def resolve_alias(self, id_start):
        """Replace the alias by the prefix in the given string.

        Only the part before the first dot is treated as an alias; the rest
        of the string is kept as is.

        Using an unknown prefix is an error to help catching typos.
        """
        pieces = id_start.split('.')
        alias = pieces[0]
        try:
            pieces[0] = self.get(alias)
        except KeyError:
            raise errors.BzrCommandError(
                '%s is not a known test prefix alias' % alias)
        return '.'.join(pieces)
3264
test_prefix_alias_registry = TestPrefixAliasRegistry()
3265
"""Registry of test prefix aliases."""
3268
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
3269
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3270
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3272
# Obvious highest-level prefixes, feel free to add your own via a plugin
3273
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3274
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3275
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3276
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
3277
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3280
def test_suite(keep_only=None, starting_with=None):
3281
"""Build and return TestSuite for the whole of bzrlib.
3283
:param keep_only: A list of test ids limiting the suite returned.
3285
:param starting_with: An id limiting the suite returned to the tests
3288
This function can be replaced if you need to change the default test
3289
suite on a global basis, but it is not encouraged.
3293
'bzrlib.tests.blackbox',
3294
'bzrlib.tests.branch_implementations',
3295
'bzrlib.tests.bzrdir_implementations',
3296
'bzrlib.tests.commands',
3297
'bzrlib.tests.interrepository_implementations',
3298
'bzrlib.tests.intertree_implementations',
3299
'bzrlib.tests.inventory_implementations',
3300
'bzrlib.tests.per_interbranch',
3301
'bzrlib.tests.per_lock',
3302
'bzrlib.tests.per_repository',
3303
'bzrlib.tests.per_repository_chk',
3304
'bzrlib.tests.per_repository_reference',
3305
'bzrlib.tests.test__chk_map',
3306
'bzrlib.tests.test__dirstate_helpers',
3307
'bzrlib.tests.test__groupcompress',
3308
'bzrlib.tests.test__walkdirs_win32',
3309
'bzrlib.tests.test_ancestry',
3310
'bzrlib.tests.test_annotate',
3311
'bzrlib.tests.test_api',
3312
'bzrlib.tests.test_atomicfile',
3313
'bzrlib.tests.test_bad_files',
3314
'bzrlib.tests.test_bisect_multi',
3315
'bzrlib.tests.test_branch',
3316
'bzrlib.tests.test_branchbuilder',
3317
'bzrlib.tests.test_btree_index',
3318
'bzrlib.tests.test_bugtracker',
3319
'bzrlib.tests.test_bundle',
3320
'bzrlib.tests.test_bzrdir',
3321
'bzrlib.tests.test__chunks_to_lines',
3322
'bzrlib.tests.test_cache_utf8',
3323
'bzrlib.tests.test_chk_map',
3324
'bzrlib.tests.test_chunk_writer',
3325
'bzrlib.tests.test_clean_tree',
3326
'bzrlib.tests.test_commands',
3327
'bzrlib.tests.test_commit',
3328
'bzrlib.tests.test_commit_merge',
3329
'bzrlib.tests.test_config',
3330
'bzrlib.tests.test_conflicts',
3331
'bzrlib.tests.test_counted_lock',
3332
'bzrlib.tests.test_decorators',
3333
'bzrlib.tests.test_delta',
3334
'bzrlib.tests.test_debug',
3335
'bzrlib.tests.test_deprecated_graph',
3336
'bzrlib.tests.test_diff',
3337
'bzrlib.tests.test_directory_service',
3338
'bzrlib.tests.test_dirstate',
3339
'bzrlib.tests.test_email_message',
3340
'bzrlib.tests.test_eol_filters',
3341
'bzrlib.tests.test_errors',
3342
'bzrlib.tests.test_export',
3343
'bzrlib.tests.test_extract',
3344
'bzrlib.tests.test_fetch',
3345
'bzrlib.tests.test_fifo_cache',
3346
'bzrlib.tests.test_filters',
3347
'bzrlib.tests.test_ftp_transport',
3348
'bzrlib.tests.test_foreign',
3349
'bzrlib.tests.test_generate_docs',
3350
'bzrlib.tests.test_generate_ids',
3351
'bzrlib.tests.test_globbing',
3352
'bzrlib.tests.test_gpg',
3353
'bzrlib.tests.test_graph',
3354
'bzrlib.tests.test_groupcompress',
3355
'bzrlib.tests.test_hashcache',
3356
'bzrlib.tests.test_help',
3357
'bzrlib.tests.test_hooks',
3358
'bzrlib.tests.test_http',
3359
'bzrlib.tests.test_http_implementations',
3360
'bzrlib.tests.test_http_response',
3361
'bzrlib.tests.test_https_ca_bundle',
3362
'bzrlib.tests.test_identitymap',
3363
'bzrlib.tests.test_ignores',
3364
'bzrlib.tests.test_index',
3365
'bzrlib.tests.test_info',
3366
'bzrlib.tests.test_inv',
3367
'bzrlib.tests.test_inventory_delta',
3368
'bzrlib.tests.test_knit',
3369
'bzrlib.tests.test_lazy_import',
3370
'bzrlib.tests.test_lazy_regex',
3371
'bzrlib.tests.test_lockable_files',
3372
'bzrlib.tests.test_lockdir',
3373
'bzrlib.tests.test_log',
3374
'bzrlib.tests.test_lru_cache',
3375
'bzrlib.tests.test_lsprof',
3376
'bzrlib.tests.test_mail_client',
3377
'bzrlib.tests.test_memorytree',
3378
'bzrlib.tests.test_merge',
3379
'bzrlib.tests.test_merge3',
3380
'bzrlib.tests.test_merge_core',
3381
'bzrlib.tests.test_merge_directive',
3382
'bzrlib.tests.test_missing',
3383
'bzrlib.tests.test_msgeditor',
3384
'bzrlib.tests.test_multiparent',
3385
'bzrlib.tests.test_mutabletree',
3386
'bzrlib.tests.test_nonascii',
3387
'bzrlib.tests.test_options',
3388
'bzrlib.tests.test_osutils',
3389
'bzrlib.tests.test_osutils_encodings',
3390
'bzrlib.tests.test_pack',
3391
'bzrlib.tests.test_pack_repository',
3392
'bzrlib.tests.test_patch',
3393
'bzrlib.tests.test_patches',
3394
'bzrlib.tests.test_permissions',
3395
'bzrlib.tests.test_plugins',
3396
'bzrlib.tests.test_progress',
3397
'bzrlib.tests.test_read_bundle',
3398
'bzrlib.tests.test_reconcile',
3399
'bzrlib.tests.test_reconfigure',
3400
'bzrlib.tests.test_registry',
3401
'bzrlib.tests.test_remote',
3402
'bzrlib.tests.test_rename_map',
3403
'bzrlib.tests.test_repository',
3404
'bzrlib.tests.test_revert',
3405
'bzrlib.tests.test_revision',
3406
'bzrlib.tests.test_revisionspec',
3407
'bzrlib.tests.test_revisiontree',
3408
'bzrlib.tests.test_rio',
3409
'bzrlib.tests.test_rules',
3410
'bzrlib.tests.test_sampler',
3411
'bzrlib.tests.test_selftest',
3412
'bzrlib.tests.test_serializer',
3413
'bzrlib.tests.test_setup',
3414
'bzrlib.tests.test_sftp_transport',
3415
'bzrlib.tests.test_shelf',
3416
'bzrlib.tests.test_shelf_ui',
3417
'bzrlib.tests.test_smart',
3418
'bzrlib.tests.test_smart_add',
3419
'bzrlib.tests.test_smart_request',
3420
'bzrlib.tests.test_smart_transport',
3421
'bzrlib.tests.test_smtp_connection',
3422
'bzrlib.tests.test_source',
3423
'bzrlib.tests.test_ssh_transport',
3424
'bzrlib.tests.test_status',
3425
'bzrlib.tests.test_store',
3426
'bzrlib.tests.test_strace',
3427
'bzrlib.tests.test_subsume',
3428
'bzrlib.tests.test_switch',
3429
'bzrlib.tests.test_symbol_versioning',
3430
'bzrlib.tests.test_tag',
3431
'bzrlib.tests.test_testament',
3432
'bzrlib.tests.test_textfile',
3433
'bzrlib.tests.test_textmerge',
3434
'bzrlib.tests.test_timestamp',
3435
'bzrlib.tests.test_trace',
3436
'bzrlib.tests.test_transactions',
3437
'bzrlib.tests.test_transform',
3438
'bzrlib.tests.test_transport',
3439
'bzrlib.tests.test_transport_implementations',
3440
'bzrlib.tests.test_transport_log',
3441
'bzrlib.tests.test_tree',
3442
'bzrlib.tests.test_treebuilder',
3443
'bzrlib.tests.test_tsort',
3444
'bzrlib.tests.test_tuned_gzip',
3445
'bzrlib.tests.test_ui',
3446
'bzrlib.tests.test_uncommit',
3447
'bzrlib.tests.test_upgrade',
3448
'bzrlib.tests.test_upgrade_stacked',
3449
'bzrlib.tests.test_urlutils',
3450
'bzrlib.tests.test_version',
3451
'bzrlib.tests.test_version_info',
3452
'bzrlib.tests.test_versionedfile',
3453
'bzrlib.tests.test_weave',
3454
'bzrlib.tests.test_whitebox',
3455
'bzrlib.tests.test_win32utils',
3456
'bzrlib.tests.test_workingtree',
3457
'bzrlib.tests.test_workingtree_4',
3458
'bzrlib.tests.test_wsgi',
3459
'bzrlib.tests.test_xml',
3460
'bzrlib.tests.tree_implementations',
3461
'bzrlib.tests.workingtree_implementations',
3462
'bzrlib.util.tests.test_bencode',
3465
loader = TestUtil.TestLoader()
3468
starting_with = [test_prefix_alias_registry.resolve_alias(start)
3469
for start in starting_with]
3470
# We take precedence over keep_only because *at loading time* using
3471
# both options means we will load fewer tests for the same final result.
3472
def interesting_module(name):
3473
for start in starting_with:
3475
# Either the module name starts with the specified string
3476
name.startswith(start)
3477
# or it may contain tests starting with the specified string
3478
or start.startswith(name)
3482
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
3484
elif keep_only is not None:
3485
id_filter = TestIdList(keep_only)
3486
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
3487
def interesting_module(name):
3488
return id_filter.refers_to(name)
3491
loader = TestUtil.TestLoader()
3492
def interesting_module(name):
3493
# No filtering, all modules are interesting
3496
suite = loader.suiteClass()
3498
# modules building their suite with loadTestsFromModuleNames
3499
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
3501
modules_to_doctest = [
3503
'bzrlib.branchbuilder',
3506
'bzrlib.iterablefile',
3510
'bzrlib.symbol_versioning',
3513
'bzrlib.version_info_formats.format_custom',
3516
for mod in modules_to_doctest:
3517
if not interesting_module(mod):
3518
# No tests to keep here, move along
3521
# note that this really does mean "report only" -- doctest
3522
# still runs the rest of the examples
3523
doc_suite = doctest.DocTestSuite(mod,
3524
optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
3525
except ValueError, e:
3526
print '**failed to get doctest for: %s\n%s' % (mod, e)
3528
if len(doc_suite._tests) == 0:
3529
raise errors.BzrError("no doctests found in %s" % (mod,))
3530
suite.addTest(doc_suite)
3532
default_encoding = sys.getdefaultencoding()
3533
for name, plugin in bzrlib.plugin.plugins().items():
3534
if not interesting_module(plugin.module.__name__):
3536
plugin_suite = plugin.test_suite()
3537
# We used to catch ImportError here and turn it into just a warning,
3538
# but really if you don't have --no-plugins this should be a failure.
3539
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
3540
if plugin_suite is None:
3541
plugin_suite = plugin.load_plugin_tests(loader)
3542
if plugin_suite is not None:
3543
suite.addTest(plugin_suite)
3544
if default_encoding != sys.getdefaultencoding():
3545
bzrlib.trace.warning(
3546
'Plugin "%s" tried to reset default encoding to: %s', name,
3547
sys.getdefaultencoding())
3549
sys.setdefaultencoding(default_encoding)
3552
suite = filter_suite_by_id_startswith(suite, starting_with)
3554
if keep_only is not None:
3555
# Now that the referred modules have loaded their tests, keep only the
3557
suite = filter_suite_by_id_list(suite, id_filter)
3558
# Do some sanity checks on the id_list filtering
3559
not_found, duplicates = suite_matches_id_list(suite, keep_only)
3561
# The tester has used both keep_only and starting_with, so he is
3562
# already aware that some tests are excluded from the list, there
3563
# is no need to tell him which.
3566
# Some tests mentioned in the list are not in the test suite. The
3567
# list may be out of date, report to the tester.
3568
for id in not_found:
3569
bzrlib.trace.warning('"%s" not found in the test suite', id)
3570
for id in duplicates:
3571
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
3576
def multiply_scenarios(scenarios_left, scenarios_right):
    """Multiply two sets of scenarios.

    :param scenarios_left: An iterable of (name, param_dict) scenarios.
    :param scenarios_right: An iterable of (name, param_dict) scenarios.
    :returns: the cartesian product of the two sets of scenarios, that is
        a scenario for every possible combination of a left scenario and a
        right scenario.  The names are joined with a comma and, when both
        dicts define the same key, the right-hand value wins.
    """
    # The right-hand side is walked once per left scenario, so materialise it
    # in case the caller handed us a one-shot iterator.
    scenarios_right = list(scenarios_right)
    product = []
    for left_name, left_dict in scenarios_left:
        for right_name, right_dict in scenarios_right:
            # Copy then update rather than concatenating the items() of both
            # dicts: same result, no throw-away lists, and it does not depend
            # on items() returning a list.
            params = dict(left_dict)
            params.update(right_dict)
            product.append(('%s,%s' % (left_name, right_name), params))
    return product
3590
def multiply_tests(tests, scenarios, result):
3591
"""Multiply tests_list by scenarios into result.
3593
This is the core workhorse for test parameterisation.
3595
Typically the load_tests() method for a per-implementation test suite will
3596
call multiply_tests and return the result.
3598
:param tests: The tests to parameterise.
3599
:param scenarios: The scenarios to apply: pairs of (scenario_name,
3600
scenario_param_dict).
3601
:param result: A TestSuite to add created tests to.
3603
This returns the passed in result TestSuite with the cross product of all
3604
the tests repeated once for each scenario. Each test is adapted by adding
3605
the scenario name at the end of its id(), and updating the test object's
3606
__dict__ with the scenario_param_dict.
3608
>>> import bzrlib.tests.test_sampler
3609
>>> r = multiply_tests(
3610
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
3611
... [('one', dict(param=1)),
3612
... ('two', dict(param=2))],
3614
>>> tests = list(iter_suite_tests(r))
3618
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
3624
for test in iter_suite_tests(tests):
3625
apply_scenarios(test, scenarios, result)
3629
def apply_scenarios(test, scenarios, result):
    """Apply the scenarios in scenarios to test and add to result.

    :param test: The test to apply scenarios to.
    :param scenarios: An iterable of scenarios to apply to test.
    :param result: The suite (anything with addTest) receiving one adapted
        copy of test per scenario.
    :return: result
    :seealso: apply_scenario
    """
    for one_scenario in scenarios:
        adapted = apply_scenario(test, one_scenario)
        result.addTest(adapted)
    return result
3642
def apply_scenario(test, scenario):
    """Copy test and apply scenario to it.

    :param test: A test to adapt.
    :param scenario: A tuple describing the scenario.
        The first element of the tuple is the scenario name; it is appended,
        in parentheses, to the id of the original test to build the new id.
        The second element is a dict containing attributes to set on the
        test.
    :return: The adapted test.
    """
    adapted_id = "%s(%s)" % (test.id(), scenario[0])
    adapted = clone_test(test, adapted_id)
    for attr_name, attr_value in scenario[1].items():
        setattr(adapted, attr_name, attr_value)
    return adapted
3659
def clone_test(test, new_id):
    """Clone a test giving it a new id.

    The clone is a deep copy, so the original test is left untouched.

    :param test: The test to clone.
    :param new_id: The id to assign to it.
    :return: The new test.
    """
    import copy

    duplicate = copy.deepcopy(test)
    # Shadow the id() method on the instance only, so the class and the
    # original test keep reporting their own id.
    duplicate.id = lambda: new_id
    return duplicate
3672
def _rmtree_temp_dir(dirname):
3673
# If LANG=C we probably have created some bogus paths
3674
# which rmtree(unicode) will fail to delete
3675
# so make sure we are using rmtree(str) to delete everything
3676
# except on win32, where rmtree(str) will fail
3677
# since it doesn't have the property of byte-stream paths
3678
# (they are either ascii or mbcs)
3679
if sys.platform == 'win32':
3680
# make sure we are using the unicode win32 api
3681
dirname = unicode(dirname)
3683
dirname = dirname.encode(sys.getfilesystemencoding())
3685
osutils.rmtree(dirname)
3687
if sys.platform == 'win32' and e.errno == errno.EACCES:
3688
sys.stderr.write('Permission denied: '
3689
'unable to remove testing dir '
3691
% (os.path.basename(dirname), e))
3696
class Feature(object):
    """An operating system Feature."""

    def __init__(self):
        # None means "not probed yet"; any other value is the cached result
        # of _probe().
        self._available = None

    def available(self):
        """Is the feature available?

        The probe is run on first use only; its result is cached.

        :return: True if the feature is available.
        """
        cached = self._available
        if cached is None:
            cached = self._available = self._probe()
        return cached

    def _probe(self):
        """Implement this method in concrete features.

        :return: True if the feature is available.
        """
        raise NotImplementedError

    def __str__(self):
        # Prefer the human readable name when the concrete feature offers
        # one, fall back on the class name otherwise.
        if not getattr(self, 'feature_name', None):
            return self.__class__.__name__
        return self.feature_name()
3724
class _SymlinkFeature(Feature):
3727
return osutils.has_symlinks()
3729
def feature_name(self):
3732
SymlinkFeature = _SymlinkFeature()
3735
class _HardlinkFeature(Feature):
3738
return osutils.has_hardlinks()
3740
def feature_name(self):
3743
HardlinkFeature = _HardlinkFeature()
3746
class _OsFifoFeature(Feature):
    """Does the os module provide mkfifo?"""

    def _probe(self):
        # The attribute itself (or None when absent) is handed back, so the
        # result is truthy/falsy rather than a strict boolean.
        mkfifo = getattr(os, 'mkfifo', None)
        return mkfifo

    def feature_name(self):
        return 'filesystem fifos'
3754
OsFifoFeature = _OsFifoFeature()
3757
class _UnicodeFilenameFeature(Feature):
    """Does the filesystem support Unicode filenames?"""

    def _probe(self):
        # Check for character combinations unlikely to be covered by any
        # single non-unicode encoding. We use the characters
        # - greek small letter alpha (U+03B1) and
        # - braille pattern dots-123456 (U+283F).
        try:
            os.stat(u'\u03b1\u283f')
        except UnicodeEncodeError:
            # The name could not even be encoded for the filesystem.
            return False
        except (IOError, OSError):
            # The filesystem allows the Unicode filename but the file doesn't
            # exist.
            pass
        # Either the stat failed for a non-encoding reason or, for some
        # reason, the file really exists: the name itself was accepted.
        return True
3778
UnicodeFilenameFeature = _UnicodeFilenameFeature()
3781
def probe_unicode_in_user_encoding():
    """Try to encode several unicode strings to use in unicode-aware tests.

    Return the first successful match.

    :return: (unicode value, encoded plain string value) or (None, None)
    """
    for candidate in [u'm\xb5', u'\xe1', u'\u0410']:
        try:
            encoded = candidate.encode(osutils.get_user_encoding())
        except UnicodeEncodeError:
            # Try a different character
            continue
        return candidate, encoded
    return None, None
3799
def probe_bad_non_ascii(encoding):
    """Try to find [bad] character with code [128..255]
    that cannot be decoded to unicode in some encoding.

    :param encoding: Name of the encoding to probe.
    :return: The first single-byte string, in increasing code order, that
        fails to decode on its own, or None if all non-ascii characters are
        valid for the given encoding.
    """
    for code in range(128, 256):
        # Going through bytearray yields a one-byte (byte) string whichever
        # string type the interpreter uses by default; it equals chr(code)
        # where str is a byte string.
        char = bytes(bytearray([code]))
        try:
            char.decode(encoding)
        except UnicodeDecodeError:
            return char
    return None
3814
class _HTTPSServerFeature(Feature):
3815
"""Some tests want an https Server, check if one is available.
3817
Right now, the only way this is available is under python2.6 which provides
3828
def feature_name(self):
3829
return 'HTTPSServer'
3832
HTTPSServerFeature = _HTTPSServerFeature()
3835
class _UnicodeFilename(Feature):
3836
"""Does the filesystem support Unicode filenames?"""
3841
except UnicodeEncodeError:
3843
except (IOError, OSError):
3844
# The filesystem allows the Unicode filename but the file doesn't
3848
# The filesystem allows the Unicode filename and the file exists,
3852
UnicodeFilename = _UnicodeFilename()
3855
class _UTF8Filesystem(Feature):
    """Is the filesystem UTF-8?"""

    def _probe(self):
        # Both spellings of the encoding name are accepted, whatever their
        # case.
        fs_encoding = osutils._fs_enc.upper()
        return fs_encoding in ('UTF-8', 'UTF8')
3863
UTF8Filesystem = _UTF8Filesystem()
3866
class _CaseInsCasePresFilenameFeature(Feature):
3867
"""Is the file-system case insensitive, but case-preserving?"""
3870
fileno, name = tempfile.mkstemp(prefix='MixedCase')
3872
# first check truly case-preserving for created files, then check
3873
# case insensitive when opening existing files.
3874
name = osutils.normpath(name)
3875
base, rel = osutils.split(name)
3876
found_rel = osutils.canonical_relpath(base, name)
3877
return (found_rel == rel
3878
and os.path.isfile(name.upper())
3879
and os.path.isfile(name.lower()))
3884
def feature_name(self):
3885
return "case-insensitive case-preserving filesystem"
3887
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
3890
class _CaseInsensitiveFilesystemFeature(Feature):
3891
"""Check if underlying filesystem is case-insensitive but *not* case
3894
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
3895
# more likely to be case preserving, so this case is rare.
3898
if CaseInsCasePresFilenameFeature.available():
3901
if TestCaseWithMemoryTransport.TEST_ROOT is None:
3902
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
3903
TestCaseWithMemoryTransport.TEST_ROOT = root
3905
root = TestCaseWithMemoryTransport.TEST_ROOT
3906
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
3908
name_a = osutils.pathjoin(tdir, 'a')
3909
name_A = osutils.pathjoin(tdir, 'A')
3911
result = osutils.isdir(name_A)
3912
_rmtree_temp_dir(tdir)
3915
def feature_name(self):
3916
return 'case-insensitive filesystem'
3918
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
3921
class _SubUnitFeature(Feature):
3922
"""Check if subunit is available."""
3931
def feature_name(self):
3934
SubUnitFeature = _SubUnitFeature()
3935
# Only define SubUnitBzrRunner if subunit is available.
3937
from subunit import TestProtocolClient
3938
class SubUnitBzrRunner(TextTestRunner):
3939
def run(self, test):
3940
result = TestProtocolClient(self.stream)