1
# Copyright (C) 2005 by Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
from cStringIO import StringIO
30
import bzrlib.commands
32
import bzrlib.osutils as osutils
33
from bzrlib.trace import mutter
34
from bzrlib.selftest import TestUtil
35
from bzrlib.selftest.TestUtil import TestLoader, TestSuite
36
from bzrlib.selftest.treeshape import build_tree_contents
37
from bzrlib.errors import BzrError
40
MODULES_TO_DOCTEST = []
44
class EarlyStoppingTestResultAdapter(object):
45
"""An adapter for TestResult to stop at the first first failure or error"""
47
def __init__(self, result):
50
def addError(self, test, err):
51
self._result.addError(test, err)
54
def addFailure(self, test, err):
55
self._result.addFailure(test, err)
58
def __getattr__(self, name):
59
return getattr(self._result, name)
61
def __setattr__(self, name, value):
63
object.__setattr__(self, name, value)
64
return setattr(self._result, name, value)
67
class _MyResult(unittest._TextTestResult):
70
No special behaviour for now.
73
# assumes 80-column window, less 'ERROR 99999ms' = 13ch
74
def _elapsedTime(self):
    """Return the time elapsed since self._start_time, in milliseconds.

    The result is a string such as '   12ms': the whole number of
    milliseconds padded to at least five characters, followed by 'ms'.
    """
    return "%5dms" % (1000 * (time.time() - self._start_time))
77
def startTest(self, test):
78
unittest.TestResult.startTest(self, test)
79
# TODO: Maybe show test.shortDescription somewhere?
80
what = test.shortDescription() or test.id()
81
pref = 'bzrlib.selftest.'
82
if what.startswith(pref):
83
what = what[len(pref):]
85
self.stream.write('%-65.65s' % what)
87
self._start_time = time.time()
89
def addError(self, test, err):
90
unittest.TestResult.addError(self, test, err)
92
self.stream.writeln("ERROR %s" % self._elapsedTime())
94
self.stream.write('E')
97
def addFailure(self, test, err):
98
unittest.TestResult.addFailure(self, test, err)
100
self.stream.writeln(" FAIL %s" % self._elapsedTime())
102
self.stream.write('F')
105
def addSuccess(self, test):
107
self.stream.writeln(' OK %s' % self._elapsedTime())
109
self.stream.write('~')
111
unittest.TestResult.addSuccess(self, test)
113
def printErrorList(self, flavour, errors):
    """Write a report for every (test, err) pair in errors to self.stream.

    For each pair this writes separator1, a "<flavour>: <description>"
    heading, the test's captured log (only when the test object has a
    _get_log attribute), separator2 and finally the error text.

    flavour -- label placed in front of the test description
    errors -- iterable of (test, err) pairs
    """
    for test, err in errors:
        self.stream.writeln(self.separator1)
        self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
        if hasattr(test, '_get_log'):
            self.stream.writeln()
            self.stream.writeln('log from this test:')
            # Go through the stream's own writeln like every other line of
            # the report, rather than the print-chevron statement.
            self.stream.writeln("%s" % (test._get_log(),))
        self.stream.writeln(self.separator2)
        self.stream.writeln("%s" % err)
125
class TextTestRunner(unittest.TextTestRunner):
    """Text test runner that reports through _MyResult.

    When stop_on_failure is true the result object is wrapped in
    EarlyStoppingTestResultAdapter.
    """

    stop_on_failure = False

    def _makeResult(self):
        """Build and return the result object used for a test run."""
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
        if self.stop_on_failure:
            result = EarlyStoppingTestResultAdapter(result)
        # The base runner's run() uses whatever this method returns, so the
        # result must be handed back rather than dropped.
        return result
135
def iter_suite_tests(suite):
    """Return all tests in a suite, recursing through nested suites.

    This is a generator: it yields each unittest.TestCase found in
    suite._tests, descending into any unittest.TestSuite it meets.

    Raises Exception if the suite holds an object that is neither a
    TestCase nor a TestSuite.
    """
    for item in suite._tests:
        if isinstance(item, unittest.TestCase):
            yield item
        elif isinstance(item, unittest.TestSuite):
            for r in iter_suite_tests(item):
                yield r
        else:
            raise Exception('unknown object %r inside test suite %r'
                            % (item, suite))
148
class TestSkipped(Exception):
    """Indicates that a test was intentionally skipped, rather than failing."""
153
class CommandFailed(Exception):
    # NOTE(review): nothing visible in this file raises this; presumably it
    # signals that a bzr command run by a test failed -- confirm with callers.
    pass
156
class TestCase(unittest.TestCase):
157
"""Base class for bzr unit tests.
159
Tests that need access to disk resources should subclass
160
TestCaseInTempDir not TestCase.
162
Error and debug log messages are redirected from their usual
163
location into a temporary file, the contents of which can be
164
retrieved by _get_log(). We use a real OS file, not an in-memory object,
165
so that it can also capture file IO. When the test completes this file
166
is read into memory and removed from disk.
168
There are also convenience functions to invoke bzr's command-line
169
routine, and to build and check bzr trees.
171
In addition to the usual method of overriding tearDown(), this class also
172
allows subclasses to register functions into the _cleanups list, which is
173
run in order as the object is torn down. It's less likely this will be
174
accidentally overlooked.
178
_log_file_name = None
182
unittest.TestCase.setUp(self)
184
self._cleanEnvironment()
185
bzrlib.trace.disable_default_logging()
188
def _ndiff_strings(self, a, b):
189
"""Return ndiff between two strings containing lines.
191
A trailing newline is added if missing to make the strings
193
if b and b[-1] != '\n':
195
if a and a[-1] != '\n':
197
difflines = difflib.ndiff(a.splitlines(True),
199
linejunk=lambda x: False,
200
charjunk=lambda x: False)
201
return ''.join(difflines)
203
def assertEqualDiff(self, a, b):
204
"""Assert two texts are equal, if not raise an exception.
206
This is intended for use with multi-line strings where it can
207
be hard to find the differences by eye.
209
# TODO: perhaps override assertEquals to call this for strings?
212
raise AssertionError("texts not equal:\n" +
213
self._ndiff_strings(a, b))
215
def assertContainsRe(self, haystack, needle_re):
    """Assert that haystack contains something matching a regular expression.

    haystack -- the text to search
    needle_re -- regular expression, looked for anywhere in haystack
        with re.search

    Raises AssertionError, quoting both the pattern and the text, when
    there is no match.
    """
    if not re.search(needle_re, haystack):
        raise AssertionError('pattern "%s" not found in "%s"'
                % (needle_re, haystack))
221
def _startLogFile(self):
    """Send bzr and test log messages to a temporary file.

    The file is removed as the test is torn down.
    """
    fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
    self._log_file = os.fdopen(fileno, 'w+')
    bzrlib.trace.enable_test_log(self._log_file)
    self._log_file_name = name
    # _finishLogFile reads the log back into memory and deletes the file.
    self.addCleanup(self._finishLogFile)
232
def _finishLogFile(self):
    """Finished with the log file.

    Read contents into memory, close, and delete.
    """
    bzrlib.trace.disable_test_log()
    # Rewind first: the file was opened for writing and is positioned at
    # the end of what has been logged so far.
    self._log_file.seek(0)
    self._log_contents = self._log_file.read()
    self._log_file.close()
    os.remove(self._log_file_name)
    # Clear both so the on-disk log is no longer referenced; the text
    # stays available in self._log_contents.
    self._log_file = self._log_file_name = None
244
def addCleanup(self, callable):
245
"""Arrange to run a callable when this case is torn down.
247
Callables are run in the reverse of the order they are registered,
248
ie last-in first-out.
250
if callable in self._cleanups:
251
raise ValueError("cleanup function %r already registered on %s"
253
self._cleanups.append(callable)
255
def _cleanEnvironment(self):
258
'APPDATA': os.getcwd(),
263
self.addCleanup(self._restoreEnvironment)
264
for name, value in new_env.iteritems():
265
self._captureVar(name, value)
268
def _captureVar(self, name, newvalue):
269
"""Set an environment variable, preparing it to be reset when finished."""
270
self.__old_env[name] = os.environ.get(name, None)
272
if name in os.environ:
275
os.environ[name] = newvalue
278
def _restoreVar(name, value):
280
if name in os.environ:
283
os.environ[name] = value
285
def _restoreEnvironment(self):
286
for name, value in self.__old_env.iteritems():
287
self._restoreVar(name, value)
291
unittest.TestCase.tearDown(self)
293
def _runCleanups(self):
294
"""Run registered cleanup functions.
296
This should only be called from TestCase.tearDown.
298
for callable in reversed(self._cleanups):
301
def log(self, *args):
305
"""Return as a string the log for this test"""
306
if self._log_file_name:
307
return open(self._log_file_name).read()
309
return self._log_contents
310
# TODO: Delete the log after it's been read in
312
def capture(self, cmd, retcode=0):
    """Shortcut that splits cmd into words, runs, and returns stdout.

    cmd -- the command line as a single string; it is split on
        whitespace, so arguments containing spaces cannot be expressed
    retcode -- passed through to run_bzr_captured
    """
    return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
316
def run_bzr_captured(self, argv, retcode=0):
317
"""Invoke bzr and return (stdout, stderr).
319
Useful for code that wants to check the contents of the
320
output, the way error messages are presented, etc.
322
This should be the main method for tests that want to exercise the
323
overall behavior of the bzr application (rather than a unit test
324
or a functional test of the library.)
326
Much of the old code runs bzr by forking a new copy of Python, but
327
that is slower, harder to debug, and generally not necessary.
329
This runs bzr through the interface that catches and reports
330
errors, and with logging set to something approximating the
331
default, so that error reporting can be checked.
333
argv -- arguments to invoke bzr
334
retcode -- expected return code, or None for don't-care.
338
self.log('run bzr: %s', ' '.join(argv))
339
# FIXME: don't call into logging here
340
handler = logging.StreamHandler(stderr)
341
handler.setFormatter(bzrlib.trace.QuietFormatter())
342
handler.setLevel(logging.INFO)
343
logger = logging.getLogger('')
344
logger.addHandler(handler)
346
result = self.apply_redirected(None, stdout, stderr,
347
bzrlib.commands.run_bzr_catch_errors,
350
logger.removeHandler(handler)
351
out = stdout.getvalue()
352
err = stderr.getvalue()
354
self.log('output:\n%s', out)
356
self.log('errors:\n%s', err)
357
if retcode is not None:
358
self.assertEquals(result, retcode)
361
def run_bzr(self, *args, **kwargs):
    """Invoke bzr, as if it were run from the command line.

    This should be the main method for tests that want to exercise the
    overall behavior of the bzr application (rather than a unit test
    or a functional test of the library.)

    This sends the stdout/stderr results into the test's log,
    where it may be useful for debugging.  See also run_bzr_captured.

    args -- the command-line words, passed on as a tuple
    retcode -- keyword argument, default 0; passed on to
        run_bzr_captured
    """
    retcode = kwargs.pop('retcode', 0)
    return self.run_bzr_captured(args, retcode)
374
def check_inventory_shape(self, inv, shape):
375
"""Compare an inventory to a list of expected names.
377
Fail if they are not precisely equal.
380
shape = list(shape) # copy
381
for path, ie in inv.entries():
382
name = path.replace('\\', '/')
390
self.fail("expected paths not found in inventory: %r" % shape)
392
self.fail("unexpected paths found in inventory: %r" % extras)
394
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
395
a_callable=None, *args, **kwargs):
396
"""Call callable with redirected std io pipes.
398
Returns the return code."""
399
if not callable(a_callable):
400
raise ValueError("a_callable must be callable.")
404
if hasattr(self, "_log_file"):
405
stdout = self._log_file
409
if hasattr(self, "_log_file"):
410
stderr = self._log_file
413
real_stdin = sys.stdin
414
real_stdout = sys.stdout
415
real_stderr = sys.stderr
420
return a_callable(*args, **kwargs)
422
sys.stdout = real_stdout
423
sys.stderr = real_stderr
424
sys.stdin = real_stdin
427
BzrTestBase = TestCase
430
class TestCaseInTempDir(TestCase):
431
"""Derived class that runs a test within a temporary directory.
433
This is useful for tests that need to create a branch, etc.
435
The directory is created in a slightly complex way: for each
436
Python invocation, a new temporary top-level directory is created.
437
All test cases create their own directory within that. If the
438
tests complete successfully, the directory is removed.
440
InTempDir is an old alias for FunctionalTestCase.
445
OVERRIDE_PYTHON = 'python'
447
def check_file_contents(self, filename, expect):
    """Fail the test unless the file filename contains exactly expect.

    filename -- path of the file to read (opened in text mode)
    expect -- the full expected contents

    The check is noted in the test log; on a mismatch the expected and
    actual contents are logged as well before self.fail is called.
    """
    self.log("check contents of file %s" % filename)
    f = open(filename, 'r')
    try:
        contents = f.read()
    finally:
        # Close explicitly instead of leaving the handle to the garbage
        # collector.
        f.close()
    if contents != expect:
        self.log("expected: %r" % expect)
        self.log("actually: %r" % contents)
        self.fail("contents of %s not as expected" % filename)
455
def _make_test_root(self):
456
if TestCaseInTempDir.TEST_ROOT is not None:
460
root = u'test%04d.tmp' % i
464
if e.errno == errno.EEXIST:
469
# successfully created
470
TestCaseInTempDir.TEST_ROOT = os.path.abspath(root)
472
# make a fake bzr directory there to prevent any tests propagating
473
# up onto the source directory's real branch
474
os.mkdir(os.path.join(TestCaseInTempDir.TEST_ROOT, '.bzr'))
477
super(TestCaseInTempDir, self).setUp()
478
self._make_test_root()
479
_currentdir = os.getcwdu()
480
short_id = self.id().replace('bzrlib.selftest.', '') \
481
.replace('__main__.', '')
482
self.test_dir = os.path.join(self.TEST_ROOT, short_id)
483
os.mkdir(self.test_dir)
484
os.chdir(self.test_dir)
485
os.environ['HOME'] = self.test_dir
486
def _leaveDirectory():
487
os.chdir(_currentdir)
488
self.addCleanup(_leaveDirectory)
490
def build_tree(self, shape, line_endings='native'):
491
"""Build a test tree according to a pattern.
493
shape is a sequence of file specifications. If the final
494
character is '/', a directory is created.
496
This doesn't add anything to a branch.
497
:param line_endings: Either 'binary' or 'native'
498
in binary mode, exact contents are written
499
in native mode, the line endings match the
500
default platform endings.
502
# XXX: It's OK to just create them using forward slashes on windows?
504
self.assert_(isinstance(name, basestring))
508
if line_endings == 'binary':
510
elif line_endings == 'native':
513
raise BzrError('Invalid line ending request %r' % (line_endings,))
514
print >>f, "contents of", name
517
def build_tree_contents(self, shape):
    """Call bzrlib.selftest.build_tree_contents with shape.

    Nothing is returned.
    """
    bzrlib.selftest.build_tree_contents(shape)
520
def failUnlessExists(self, path):
    """Fail unless path, which may be abs or relative, exists.

    Existence is decided by osutils.lexists.
    """
    self.failUnless(osutils.lexists(path))
524
def assertFileEqual(self, content, path):
    """Fail if path does not contain 'content'.

    content -- the exact text the file is expected to hold
    path -- the file to read (opened in text mode)

    The test fails first if osutils.lexists(path) is false; otherwise
    the file text is compared with assertEqualDiff.
    """
    self.failUnless(osutils.lexists(path))
    f = open(path, 'r')
    try:
        actual = f.read()
    finally:
        # Close explicitly instead of leaving the handle to the garbage
        # collector.
        f.close()
    self.assertEqualDiff(content, actual)
530
class MetaTestLog(TestCase):
    # Checks TestCase's own log capture: a message written with log() must
    # be readable back through _get_log().

    def test_logging(self):
        """Test logs are captured when a test fails."""
        self.log('a test message')
        # Flush so the message is on disk before _get_log reads the file.
        self._log_file.flush()
        self.assertContainsRe(self._get_log(), 'a test message\n')
538
def filter_suite_by_re(suite, pattern):
    """Return a new suite holding the tests of suite whose id matches pattern.

    suite -- the suite to filter; its tests are visited with
        iter_suite_tests
    pattern -- regular expression; a test is kept when re.search finds
        it anywhere in test.id()

    The result is a new TestUtil.TestSuite to which the matching tests
    are added directly.
    """
    result = TestUtil.TestSuite()
    filter_re = re.compile(pattern)
    for test in iter_suite_tests(suite):
        if filter_re.search(test.id()):
            result.addTest(test)
    return result
547
def run_suite(suite, name='test', verbose=False, pattern=".*",
548
stop_on_failure=False, keep_output=False):
549
TestCaseInTempDir._TEST_NAME = name
554
runner = TextTestRunner(stream=sys.stdout,
557
runner.stop_on_failure=stop_on_failure
559
suite = filter_suite_by_re(suite, pattern)
560
result = runner.run(suite)
561
# This is still a little bogus,
562
# but only a little. Folk not using our testrunner will
563
# have to delete their temp directories themselves.
564
if result.wasSuccessful() or not keep_output:
565
if TestCaseInTempDir.TEST_ROOT is not None:
566
shutil.rmtree(TestCaseInTempDir.TEST_ROOT)
568
print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
569
return result.wasSuccessful()
572
# NOTE(review): the end of this signature was missing; keep_output=False is
# assumed to match run_suite's default -- confirm.
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
             keep_output=False):
    """Run the whole test suite under the enhanced runner.

    The suite built by test_suite() is handed to run_suite under the
    name 'testbzr'; all four arguments are passed through unchanged and
    run_suite's return value is returned.
    """
    return run_suite(test_suite(), 'testbzr', verbose=verbose, pattern=pattern,
                     stop_on_failure=stop_on_failure, keep_output=keep_output)
580
"""Build and return TestSuite for the whole program."""
581
import bzrlib.store, bzrlib.inventory, bzrlib.branch
582
import bzrlib.osutils, bzrlib.merge3, bzrlib.plugin
583
from doctest import DocTestSuite
585
global MODULES_TO_TEST, MODULES_TO_DOCTEST
587
# FIXME: If these fail to load, e.g. because of a syntax error, the
588
# exception is hidden by unittest. Sucks. Should either fix that or
589
# perhaps import them and pass them to unittest as modules.
591
['bzrlib.selftest.MetaTestLog',
592
'bzrlib.selftest.testapi',
593
'bzrlib.selftest.testgpg',
594
'bzrlib.selftest.testidentitymap',
595
'bzrlib.selftest.testinv',
596
'bzrlib.selftest.test_ancestry',
597
'bzrlib.selftest.test_commit',
598
'bzrlib.selftest.test_command',
599
'bzrlib.selftest.test_commit_merge',
600
'bzrlib.selftest.testconfig',
601
'bzrlib.selftest.versioning',
602
'bzrlib.selftest.testmerge3',
603
'bzrlib.selftest.testmerge',
604
'bzrlib.selftest.testhashcache',
605
'bzrlib.selftest.teststatus',
606
'bzrlib.selftest.testlog',
607
'bzrlib.selftest.testrevisionnamespaces',
608
'bzrlib.selftest.testbranch',
609
'bzrlib.selftest.testrevision',
610
'bzrlib.selftest.test_revision_info',
611
'bzrlib.selftest.test_merge_core',
612
'bzrlib.selftest.test_smart_add',
613
'bzrlib.selftest.test_bad_files',
614
'bzrlib.selftest.testdiff',
615
'bzrlib.selftest.test_parent',
616
'bzrlib.selftest.test_xml',
617
'bzrlib.selftest.test_weave',
618
'bzrlib.selftest.testfetch',
619
'bzrlib.selftest.whitebox',
620
'bzrlib.selftest.teststore',
621
'bzrlib.selftest.blackbox',
622
'bzrlib.selftest.testsampler',
623
'bzrlib.selftest.testtransactions',
624
'bzrlib.selftest.testtransport',
625
'bzrlib.selftest.testsftp',
626
'bzrlib.selftest.testgraph',
627
'bzrlib.selftest.testworkingtree',
628
'bzrlib.selftest.test_upgrade',
629
'bzrlib.selftest.test_conflicts',
630
'bzrlib.selftest.testtestament',
631
'bzrlib.selftest.testannotate',
632
'bzrlib.selftest.testrevprops',
633
'bzrlib.selftest.testoptions',
634
'bzrlib.selftest.testhttp',
635
'bzrlib.selftest.testnonascii',
636
'bzrlib.selftest.testreweave',
637
'bzrlib.selftest.testtsort',
638
'bzrlib.selftest.testtrace',
639
'bzrlib.selftest.testbasicio',
642
for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
643
bzrlib.osutils, bzrlib.commands, bzrlib.merge3,
646
if m not in MODULES_TO_DOCTEST:
647
MODULES_TO_DOCTEST.append(m)
649
TestCase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
650
print '%-30s %s' % ('bzr binary', TestCase.BZRPATH)
653
suite.addTest(TestLoader().loadTestsFromNames(testmod_names))
654
for m in MODULES_TO_TEST:
655
suite.addTest(TestLoader().loadTestsFromModule(m))
656
for m in (MODULES_TO_DOCTEST):
657
suite.addTest(DocTestSuite(m))
658
for p in bzrlib.plugin.all_plugins:
659
if hasattr(p, 'test_suite'):
660
suite.addTest(p.test_suite())