1
# Copyright (C) 2005 by Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
# TODO: Perhaps there should be an API to find out if bzr running under the
19
# test suite -- some plugins might want to avoid making intrusive changes if
20
# this is the case. However, we want behaviour under to test to diverge as
21
# little as possible, so this should be used rarely if it's added at all.
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
from cStringIO import StringIO
39
import bzrlib.commands
40
from bzrlib.errors import BzrError
41
import bzrlib.inventory
44
import bzrlib.osutils as osutils
48
from bzrlib.transport import urlescape
49
import bzrlib.transport
50
from bzrlib.trace import mutter
51
from bzrlib.tests.TestUtil import TestLoader, TestSuite
52
from bzrlib.tests.treeshape import build_tree_contents
55
MODULES_TO_DOCTEST = [
64
def packages_to_test():
65
import bzrlib.tests.blackbox
71
class EarlyStoppingTestResultAdapter(object):
72
"""An adapter for TestResult to stop at the first first failure or error"""
74
def __init__(self, result):
77
def addError(self, test, err):
78
if (isinstance(err[1], TestSkipped) and
79
getattr(self, "addSkipped", None) is not None):
80
return self.addSkipped(test, err)
81
self._result.addError(test, err)
84
def addFailure(self, test, err):
85
self._result.addFailure(test, err)
88
def __getattr__(self, name):
89
return getattr(self._result, name)
91
def __setattr__(self, name, value):
93
object.__setattr__(self, name, value)
94
return setattr(self._result, name, value)
97
class _MyResult(unittest._TextTestResult):
100
Shows output in a different format, including displaying runtime for tests.
103
def _elapsedTime(self):
104
return "%5dms" % (1000 * (time.time() - self._start_time))
106
def startTest(self, test):
107
unittest.TestResult.startTest(self, test)
108
# In a short description, the important words are in
109
# the beginning, but in an id, the important words are
111
SHOW_DESCRIPTIONS = False
113
width = osutils.terminal_width()
114
name_width = width - 15
116
if SHOW_DESCRIPTIONS:
117
what = test.shortDescription()
119
if len(what) > name_width:
120
what = what[:name_width-3] + '...'
123
if what.startswith('bzrlib.tests.'):
125
if len(what) > name_width:
126
what = '...' + what[3-name_width:]
127
what = what.ljust(name_width)
128
self.stream.write(what)
130
self._start_time = time.time()
132
def addError(self, test, err):
133
if isinstance(err[1], TestSkipped):
134
return self.addSkipped(test, err)
135
unittest.TestResult.addError(self, test, err)
137
self.stream.writeln("ERROR %s" % self._elapsedTime())
139
self.stream.write('E')
142
def addFailure(self, test, err):
143
unittest.TestResult.addFailure(self, test, err)
145
self.stream.writeln(" FAIL %s" % self._elapsedTime())
147
self.stream.write('F')
150
def addSuccess(self, test):
152
self.stream.writeln(' OK %s' % self._elapsedTime())
154
self.stream.write('~')
156
unittest.TestResult.addSuccess(self, test)
158
def addSkipped(self, test, skip_excinfo):
160
print >>self.stream, ' SKIP %s' % self._elapsedTime()
161
print >>self.stream, ' %s' % skip_excinfo[1]
163
self.stream.write('S')
165
# seems best to treat this as success from point-of-view of unittest
166
# -- it actually does nothing so it barely matters :)
167
unittest.TestResult.addSuccess(self, test)
169
def printErrorList(self, flavour, errors):
170
for test, err in errors:
171
self.stream.writeln(self.separator1)
172
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
173
if getattr(test, '_get_log', None) is not None:
175
print >>self.stream, \
176
('vvvv[log from %s]' % test.id()).ljust(78,'-')
177
print >>self.stream, test._get_log()
178
print >>self.stream, \
179
('^^^^[log from %s]' % test.id()).ljust(78,'-')
180
self.stream.writeln(self.separator2)
181
self.stream.writeln("%s" % err)
184
class TextTestRunner(unittest.TextTestRunner):
185
stop_on_failure = False
187
def _makeResult(self):
188
result = _MyResult(self.stream, self.descriptions, self.verbosity)
189
if self.stop_on_failure:
190
result = EarlyStoppingTestResultAdapter(result)
194
def iter_suite_tests(suite):
195
"""Return all tests in a suite, recursing through nested suites"""
196
for item in suite._tests:
197
if isinstance(item, unittest.TestCase):
199
elif isinstance(item, unittest.TestSuite):
200
for r in iter_suite_tests(item):
203
raise Exception('unknown object %r inside test suite %r'
207
class TestSkipped(Exception):
208
"""Indicates that a test was intentionally skipped, rather than failing."""
212
class CommandFailed(Exception):
215
class TestCase(unittest.TestCase):
216
"""Base class for bzr unit tests.
218
Tests that need access to disk resources should subclass
219
TestCaseInTempDir not TestCase.
221
Error and debug log messages are redirected from their usual
222
location into a temporary file, the contents of which can be
223
retrieved by _get_log(). We use a real OS file, not an in-memory object,
224
so that it can also capture file IO. When the test completes this file
225
is read into memory and removed from disk.
227
There are also convenience functions to invoke bzr's command-line
228
routine, and to build and check bzr trees.
230
In addition to the usual method of overriding tearDown(), this class also
231
allows subclasses to register functions into the _cleanups list, which is
232
run in order as the object is torn down. It's less likely this will be
233
accidentally overlooked.
237
_log_file_name = None
241
unittest.TestCase.setUp(self)
243
self._cleanEnvironment()
244
bzrlib.trace.disable_default_logging()
247
def _ndiff_strings(self, a, b):
248
"""Return ndiff between two strings containing lines.
250
A trailing newline is added if missing to make the strings
252
if b and b[-1] != '\n':
254
if a and a[-1] != '\n':
256
difflines = difflib.ndiff(a.splitlines(True),
258
linejunk=lambda x: False,
259
charjunk=lambda x: False)
260
return ''.join(difflines)
262
def assertEqualDiff(self, a, b):
263
"""Assert two texts are equal, if not raise an exception.
265
This is intended for use with multi-line strings where it can
266
be hard to find the differences by eye.
268
# TODO: perhaps override assertEquals to call this for strings?
271
raise AssertionError("texts not equal:\n" +
272
self._ndiff_strings(a, b))
274
def assertStartsWith(self, s, prefix):
275
if not s.startswith(prefix):
276
raise AssertionError('string %r does not start with %r' % (s, prefix))
278
def assertEndsWith(self, s, suffix):
279
if not s.endswith(prefix):
280
raise AssertionError('string %r does not end with %r' % (s, suffix))
282
def assertContainsRe(self, haystack, needle_re):
283
"""Assert that a contains something matching a regular expression."""
284
if not re.search(needle_re, haystack):
285
raise AssertionError('pattern "%s" not found in "%s"'
286
% (needle_re, haystack))
288
def AssertSubset(self, sublist, superlist):
289
"""Assert that every entry in sublist is present in superlist."""
291
for entry in sublist:
292
if entry not in superlist:
293
missing.append(entry)
295
raise AssertionError("value(s) %r not present in container %r" %
296
(missing, superlist))
298
def assertTransportMode(self, transport, path, mode):
299
"""Fail if a path does not have mode mode.
301
If modes are not supported on this platform, the test is skipped.
303
if sys.platform == 'win32':
305
path_stat = transport.stat(path)
306
actual_mode = stat.S_IMODE(path_stat.st_mode)
307
self.assertEqual(mode, actual_mode,
308
'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
310
def _startLogFile(self):
311
"""Send bzr and test log messages to a temporary file.
313
The file is removed as the test is torn down.
315
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
316
encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
317
self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
318
bzrlib.trace.enable_test_log(self._log_file)
319
self._log_file_name = name
320
self.addCleanup(self._finishLogFile)
322
def _finishLogFile(self):
323
"""Finished with the log file.
325
Read contents into memory, close, and delete.
327
bzrlib.trace.disable_test_log()
328
self._log_file.seek(0)
329
self._log_contents = self._log_file.read()
330
self._log_file.close()
331
os.remove(self._log_file_name)
332
self._log_file = self._log_file_name = None
334
def addCleanup(self, callable):
335
"""Arrange to run a callable when this case is torn down.
337
Callables are run in the reverse of the order they are registered,
338
ie last-in first-out.
340
if callable in self._cleanups:
341
raise ValueError("cleanup function %r already registered on %s"
343
self._cleanups.append(callable)
345
def _cleanEnvironment(self):
348
'APPDATA': os.getcwd(),
353
self.addCleanup(self._restoreEnvironment)
354
for name, value in new_env.iteritems():
355
self._captureVar(name, value)
358
def _captureVar(self, name, newvalue):
359
"""Set an environment variable, preparing it to be reset when finished."""
360
self.__old_env[name] = os.environ.get(name, None)
362
if name in os.environ:
365
os.environ[name] = newvalue
368
def _restoreVar(name, value):
370
if name in os.environ:
373
os.environ[name] = value
375
def _restoreEnvironment(self):
376
for name, value in self.__old_env.iteritems():
377
self._restoreVar(name, value)
381
unittest.TestCase.tearDown(self)
383
def _runCleanups(self):
384
"""Run registered cleanup functions.
386
This should only be called from TestCase.tearDown.
388
for cleanup_fn in reversed(self._cleanups):
391
def log(self, *args):
395
"""Return as a string the log for this test"""
396
if self._log_file_name:
397
return open(self._log_file_name).read()
399
return self._log_contents
400
# TODO: Delete the log after it's been read in
402
def capture(self, cmd, retcode=0):
403
"""Shortcut that splits cmd into words, runs, and returns stdout"""
404
return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
406
def run_bzr_captured(self, argv, retcode=0):
407
"""Invoke bzr and return (stdout, stderr).
409
Useful for code that wants to check the contents of the
410
output, the way error messages are presented, etc.
412
This should be the main method for tests that want to exercise the
413
overall behavior of the bzr application (rather than a unit test
414
or a functional test of the library.)
416
Much of the old code runs bzr by forking a new copy of Python, but
417
that is slower, harder to debug, and generally not necessary.
419
This runs bzr through the interface that catches and reports
420
errors, and with logging set to something approximating the
421
default, so that error reporting can be checked.
423
argv -- arguments to invoke bzr
424
retcode -- expected return code, or None for don't-care.
428
self.log('run bzr: %s', ' '.join(argv))
429
# FIXME: don't call into logging here
430
handler = logging.StreamHandler(stderr)
431
handler.setFormatter(bzrlib.trace.QuietFormatter())
432
handler.setLevel(logging.INFO)
433
logger = logging.getLogger('')
434
logger.addHandler(handler)
436
result = self.apply_redirected(None, stdout, stderr,
437
bzrlib.commands.run_bzr_catch_errors,
440
logger.removeHandler(handler)
441
out = stdout.getvalue()
442
err = stderr.getvalue()
444
self.log('output:\n%s', out)
446
self.log('errors:\n%s', err)
447
if retcode is not None:
448
self.assertEquals(result, retcode)
451
def run_bzr(self, *args, **kwargs):
452
"""Invoke bzr, as if it were run from the command line.
454
This should be the main method for tests that want to exercise the
455
overall behavior of the bzr application (rather than a unit test
456
or a functional test of the library.)
458
This sends the stdout/stderr results into the test's log,
459
where it may be useful for debugging. See also run_captured.
461
retcode = kwargs.pop('retcode', 0)
462
return self.run_bzr_captured(args, retcode)
464
def check_inventory_shape(self, inv, shape):
465
"""Compare an inventory to a list of expected names.
467
Fail if they are not precisely equal.
470
shape = list(shape) # copy
471
for path, ie in inv.entries():
472
name = path.replace('\\', '/')
480
self.fail("expected paths not found in inventory: %r" % shape)
482
self.fail("unexpected paths found in inventory: %r" % extras)
484
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
485
a_callable=None, *args, **kwargs):
486
"""Call callable with redirected std io pipes.
488
Returns the return code."""
489
if not callable(a_callable):
490
raise ValueError("a_callable must be callable.")
494
if getattr(self, "_log_file", None) is not None:
495
stdout = self._log_file
499
if getattr(self, "_log_file", None is not None):
500
stderr = self._log_file
503
real_stdin = sys.stdin
504
real_stdout = sys.stdout
505
real_stderr = sys.stderr
510
return a_callable(*args, **kwargs)
512
sys.stdout = real_stdout
513
sys.stderr = real_stderr
514
sys.stdin = real_stdin
517
BzrTestBase = TestCase
520
class TestCaseInTempDir(TestCase):
521
"""Derived class that runs a test within a temporary directory.
523
This is useful for tests that need to create a branch, etc.
525
The directory is created in a slightly complex way: for each
526
Python invocation, a new temporary top-level directory is created.
527
All test cases create their own directory within that. If the
528
tests complete successfully, the directory is removed.
530
InTempDir is an old alias for FunctionalTestCase.
535
OVERRIDE_PYTHON = 'python'
537
def check_file_contents(self, filename, expect):
538
self.log("check contents of file %s" % filename)
539
contents = file(filename, 'r').read()
540
if contents != expect:
541
self.log("expected: %r" % expect)
542
self.log("actually: %r" % contents)
543
self.fail("contents of %s not as expected" % filename)
545
def _make_test_root(self):
546
if TestCaseInTempDir.TEST_ROOT is not None:
550
root = u'test%04d.tmp' % i
554
if e.errno == errno.EEXIST:
559
# successfully created
560
TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
562
# make a fake bzr directory there to prevent any tests propagating
563
# up onto the source directory's real branch
564
os.mkdir(osutils.pathjoin(TestCaseInTempDir.TEST_ROOT, '.bzr'))
567
super(TestCaseInTempDir, self).setUp()
568
self._make_test_root()
569
_currentdir = os.getcwdu()
570
short_id = self.id().replace('bzrlib.tests.', '') \
571
.replace('__main__.', '')
572
self.test_dir = osutils.pathjoin(self.TEST_ROOT, short_id)
573
os.mkdir(self.test_dir)
574
os.chdir(self.test_dir)
575
os.environ['HOME'] = self.test_dir
576
os.environ['APPDATA'] = self.test_dir
577
def _leaveDirectory():
578
os.chdir(_currentdir)
579
self.addCleanup(_leaveDirectory)
581
def build_tree(self, shape, line_endings='native', transport=None):
582
"""Build a test tree according to a pattern.
584
shape is a sequence of file specifications. If the final
585
character is '/', a directory is created.
587
This doesn't add anything to a branch.
588
:param line_endings: Either 'binary' or 'native'
589
in binary mode, exact contents are written
590
in native mode, the line endings match the
591
default platform endings.
593
:param transport: A transport to write to, for building trees on
594
VFS's. If the transport is readonly or None,
595
"." is opened automatically.
597
# XXX: It's OK to just create them using forward slashes on windows?
598
if transport is None or transport.is_readonly():
599
transport = bzrlib.transport.get_transport(".")
601
self.assert_(isinstance(name, basestring))
603
transport.mkdir(urlescape(name[:-1]))
605
if line_endings == 'binary':
607
elif line_endings == 'native':
610
raise BzrError('Invalid line ending request %r' % (line_endings,))
611
content = "contents of %s%s" % (name, end)
612
transport.put(urlescape(name), StringIO(content))
614
def build_tree_contents(self, shape):
615
build_tree_contents(shape)
617
def failUnlessExists(self, path):
618
"""Fail unless path, which may be abs or relative, exists."""
619
self.failUnless(osutils.lexists(path))
621
def failIfExists(self, path):
622
"""Fail if path, which may be abs or relative, exists."""
623
self.failIf(osutils.lexists(path))
625
def assertFileEqual(self, content, path):
626
"""Fail if path does not contain 'content'."""
627
self.failUnless(osutils.lexists(path))
628
self.assertEqualDiff(content, open(path, 'r').read())
631
def filter_suite_by_re(suite, pattern):
633
filter_re = re.compile(pattern)
634
for test in iter_suite_tests(suite):
635
if filter_re.search(test.id()):
640
def run_suite(suite, name='test', verbose=False, pattern=".*",
641
stop_on_failure=False, keep_output=False):
642
TestCaseInTempDir._TEST_NAME = name
647
runner = TextTestRunner(stream=sys.stdout,
650
runner.stop_on_failure=stop_on_failure
652
suite = filter_suite_by_re(suite, pattern)
653
result = runner.run(suite)
654
# This is still a little bogus,
655
# but only a little. Folk not using our testrunner will
656
# have to delete their temp directories themselves.
657
if result.wasSuccessful() or not keep_output:
658
if TestCaseInTempDir.TEST_ROOT is not None:
659
shutil.rmtree(TestCaseInTempDir.TEST_ROOT)
661
print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
662
return result.wasSuccessful()
665
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
667
"""Run the whole test suite under the enhanced runner"""
668
return run_suite(test_suite(), 'testbzr', verbose=verbose, pattern=pattern,
669
stop_on_failure=stop_on_failure, keep_output=keep_output)
673
"""Build and return TestSuite for the whole program."""
674
from doctest import DocTestSuite
676
global MODULES_TO_DOCTEST
679
'bzrlib.tests.test_ancestry',
680
'bzrlib.tests.test_annotate',
681
'bzrlib.tests.test_api',
682
'bzrlib.tests.test_bad_files',
683
'bzrlib.tests.test_basis_inventory',
684
'bzrlib.tests.test_branch',
685
'bzrlib.tests.test_command',
686
'bzrlib.tests.test_commit',
687
'bzrlib.tests.test_commit_merge',
688
'bzrlib.tests.test_config',
689
'bzrlib.tests.test_conflicts',
690
'bzrlib.tests.test_diff',
691
'bzrlib.tests.test_fetch',
692
'bzrlib.tests.test_gpg',
693
'bzrlib.tests.test_graph',
694
'bzrlib.tests.test_hashcache',
695
'bzrlib.tests.test_http',
696
'bzrlib.tests.test_identitymap',
697
'bzrlib.tests.test_inv',
698
'bzrlib.tests.test_log',
699
'bzrlib.tests.test_merge',
700
'bzrlib.tests.test_merge3',
701
'bzrlib.tests.test_merge_core',
702
'bzrlib.tests.test_missing',
703
'bzrlib.tests.test_msgeditor',
704
'bzrlib.tests.test_nonascii',
705
'bzrlib.tests.test_options',
706
'bzrlib.tests.test_osutils',
707
'bzrlib.tests.test_parent',
708
'bzrlib.tests.test_permissions',
709
'bzrlib.tests.test_plugins',
710
'bzrlib.tests.test_remove',
711
'bzrlib.tests.test_revision',
712
'bzrlib.tests.test_revisionnamespaces',
713
'bzrlib.tests.test_revprops',
714
'bzrlib.tests.test_reweave',
715
'bzrlib.tests.test_rio',
716
'bzrlib.tests.test_sampler',
717
'bzrlib.tests.test_selftest',
718
'bzrlib.tests.test_setup',
719
'bzrlib.tests.test_sftp_transport',
720
'bzrlib.tests.test_smart_add',
721
'bzrlib.tests.test_source',
722
'bzrlib.tests.test_status',
723
'bzrlib.tests.test_store',
724
'bzrlib.tests.test_symbol_versioning',
725
'bzrlib.tests.test_testament',
726
'bzrlib.tests.test_trace',
727
'bzrlib.tests.test_transactions',
728
'bzrlib.tests.test_transport',
729
'bzrlib.tests.test_tsort',
730
'bzrlib.tests.test_ui',
731
'bzrlib.tests.test_uncommit',
732
'bzrlib.tests.test_upgrade',
733
'bzrlib.tests.test_weave',
734
'bzrlib.tests.test_whitebox',
735
'bzrlib.tests.test_workingtree',
736
'bzrlib.tests.test_xml',
738
test_branch_implementations = [
739
'bzrlib.tests.test_branch_implementations']
740
test_transport_implementations = [
741
'bzrlib.tests.test_transport_implementations']
743
TestCase.BZRPATH = osutils.pathjoin(
744
osutils.realpath(osutils.dirname(bzrlib.__path__[0])), 'bzr')
745
print '%10s: %s' % ('bzr', osutils.realpath(sys.argv[0]))
746
print '%10s: %s' % ('bzrlib', bzrlib.__path__[0])
749
# python2.4's TestLoader.loadTestsFromNames gives very poor
750
# errors if it fails to load a named module - no indication of what's
751
# actually wrong, just "no such module". We should probably override that
752
# class, but for the moment just load them ourselves. (mbp 20051202)
753
loader = TestLoader()
754
from bzrlib.transport import TransportTestProviderAdapter
755
adapter = TransportTestProviderAdapter()
756
for mod_name in test_transport_implementations:
757
mod = _load_module_by_name(mod_name)
758
for test in iter_suite_tests(loader.loadTestsFromModule(mod)):
759
suite.addTests(adapter.adapt(test))
760
from bzrlib.branch import BranchTestProviderAdapter
761
adapter = BranchTestProviderAdapter(
762
bzrlib.transport.local.LocalRelpathServer,
763
bzrlib.transport.local.LocalRelpathServer,
764
bzrlib.branch.BzrBranchFormat._formats.values())
765
for mod_name in test_branch_implementations:
766
mod = _load_module_by_name(mod_name)
767
for test in iter_suite_tests(loader.loadTestsFromModule(mod)):
768
suite.addTests(adapter.adapt(test))
769
for mod_name in testmod_names:
770
mod = _load_module_by_name(mod_name)
771
suite.addTest(loader.loadTestsFromModule(mod))
772
for package in packages_to_test():
773
suite.addTest(package.test_suite())
774
for m in MODULES_TO_TEST:
775
suite.addTest(loader.loadTestsFromModule(m))
776
for m in (MODULES_TO_DOCTEST):
777
suite.addTest(DocTestSuite(m))
778
for name, plugin in bzrlib.plugin.all_plugins().items():
779
if getattr(plugin, 'test_suite', None) is not None:
780
suite.addTest(plugin.test_suite())
784
def _load_module_by_name(mod_name):
785
parts = mod_name.split('.')
786
module = __import__(mod_name)
788
# for historical reasons python returns the top-level module even though
789
# it loads the submodule; we need to walk down to get the one we want.
791
module = getattr(module, parts.pop(0))