1
# Copyright (C) 2005, 2006 by Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
# TODO: Perhaps there should be an API to find out if bzr is running under the
19
# test suite -- some plugins might want to avoid making intrusive changes if
20
# this is the case. However, we want behaviour under test to diverge as
21
# little as possible, so this should be used rarely if it's added at all.
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
# NOTE: Some classes in here use camelCaseNaming() rather than
25
# underscore_naming(). That's for consistency with unittest; it's not the
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
# new assertFoo() methods.
30
from cStringIO import StringIO
39
from subprocess import Popen, PIPE
47
import bzrlib.bzrdir as bzrdir
48
import bzrlib.commands
49
import bzrlib.bundle.serializer
50
import bzrlib.errors as errors
51
import bzrlib.inventory
52
import bzrlib.iterablefile
57
# lsprof not available
59
from bzrlib.merge import merge_inner
62
import bzrlib.osutils as osutils
64
import bzrlib.progress as progress
65
from bzrlib.revision import common_ancestor
68
from bzrlib.transport import get_transport
69
import bzrlib.transport
70
from bzrlib.transport.local import LocalRelpathServer
71
from bzrlib.transport.readonly import ReadonlyServer
72
from bzrlib.trace import mutter
73
from bzrlib.tests import TestUtil
74
from bzrlib.tests.TestUtil import (
78
from bzrlib.tests.treeshape import build_tree_contents
79
import bzrlib.urlutils as urlutils
80
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
82
default_transport = LocalRelpathServer
85
MODULES_TO_DOCTEST = [
87
bzrlib.bundle.serializer,
100
def packages_to_test():
101
"""Return a list of packages to test.
103
The packages are not globally imported so that import failures are
104
triggered when running selftest, not when importing the command.
107
import bzrlib.tests.blackbox
108
import bzrlib.tests.branch_implementations
109
import bzrlib.tests.bzrdir_implementations
110
import bzrlib.tests.interrepository_implementations
111
import bzrlib.tests.interversionedfile_implementations
112
import bzrlib.tests.repository_implementations
113
import bzrlib.tests.revisionstore_implementations
114
import bzrlib.tests.workingtree_implementations
117
bzrlib.tests.blackbox,
118
bzrlib.tests.branch_implementations,
119
bzrlib.tests.bzrdir_implementations,
120
bzrlib.tests.interrepository_implementations,
121
bzrlib.tests.interversionedfile_implementations,
122
bzrlib.tests.repository_implementations,
123
bzrlib.tests.revisionstore_implementations,
124
bzrlib.tests.workingtree_implementations,
128
class _MyResult(unittest._TextTestResult):
129
"""Custom TestResult.
131
Shows output in a different format, including displaying runtime for tests.
135
def __init__(self, stream, descriptions, verbosity, pb=None):
136
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
139
def extractBenchmarkTime(self, testCase):
    """Record the benchmark time carried by testCase, if it has one.

    The value of testCase._benchtime is copied to self._benchmarkTime;
    None is stored when the test case has no such attribute.
    """
    benchtime = getattr(testCase, "_benchtime", None)
    self._benchmarkTime = benchtime
143
def _elapsedTestTimeString(self):
    """Return the formatted time elapsed since self._start_time.

    The difference between now and self._start_time (in seconds) is
    handed to self._formatTime for rendering.
    """
    elapsed = time.time() - self._start_time
    return self._formatTime(elapsed)
147
def _testTimeString(self):
148
if self._benchmarkTime is not None:
150
self._formatTime(self._benchmarkTime),
151
self._elapsedTestTimeString())
153
return " %s" % self._elapsedTestTimeString()
155
def _formatTime(self, seconds):
    """Render a duration in seconds as whole milliseconds.

    The number is right-aligned in a field at least five characters wide
    and followed by the suffix 'ms'.
    """
    milliseconds = 1000 * seconds
    return "%5dms" % milliseconds
159
def _ellipsise_unimportant_words(self, a_string, final_width,
161
"""Add ellipses (sp?) for overly long strings.
163
:param keep_start: If true preserve the start of a_string rather
167
if len(a_string) > final_width:
168
result = a_string[:final_width-3] + '...'
172
if len(a_string) > final_width:
173
result = '...' + a_string[3-final_width:]
176
return result.ljust(final_width)
178
def startTest(self, test):
179
unittest.TestResult.startTest(self, test)
180
# In a short description, the important words are in
181
# the beginning, but in an id, the important words are
183
SHOW_DESCRIPTIONS = False
185
if not self.showAll and self.dots and self.pb is not None:
188
final_width = osutils.terminal_width()
189
final_width = final_width - 15 - 8
191
if SHOW_DESCRIPTIONS:
192
what = test.shortDescription()
194
what = self._ellipsise_unimportant_words(what, final_width, keep_start=True)
197
if what.startswith('bzrlib.tests.'):
199
what = self._ellipsise_unimportant_words(what, final_width)
201
self.stream.write(what)
202
elif self.dots and self.pb is not None:
203
self.pb.update(what, self.testsRun - 1, None)
205
self._recordTestStartTime()
207
def _recordTestStartTime(self):
    """Store the current wall-clock time in self._start_time."""
    now = time.time()
    self._start_time = now
211
def addError(self, test, err):
212
if isinstance(err[1], TestSkipped):
213
return self.addSkipped(test, err)
214
unittest.TestResult.addError(self, test, err)
215
self.extractBenchmarkTime(test)
217
self.stream.writeln("ERROR %s" % self._testTimeString())
218
elif self.dots and self.pb is None:
219
self.stream.write('E')
221
self.pb.update(self._ellipsise_unimportant_words('ERROR', 13), self.testsRun, None)
222
self.pb.note(self._ellipsise_unimportant_words(
223
test.id() + ': ERROR',
224
osutils.terminal_width()))
229
def addFailure(self, test, err):
230
unittest.TestResult.addFailure(self, test, err)
231
self.extractBenchmarkTime(test)
233
self.stream.writeln(" FAIL %s" % self._testTimeString())
234
elif self.dots and self.pb is None:
235
self.stream.write('F')
237
self.pb.update(self._ellipsise_unimportant_words('FAIL', 13), self.testsRun, None)
238
self.pb.note(self._ellipsise_unimportant_words(
239
test.id() + ': FAIL',
240
osutils.terminal_width()))
245
def addSuccess(self, test):
246
self.extractBenchmarkTime(test)
248
self.stream.writeln(' OK %s' % self._testTimeString())
249
for bench_called, stats in getattr(test, '_benchcalls', []):
250
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
251
stats.pprint(file=self.stream)
252
elif self.dots and self.pb is None:
253
self.stream.write('~')
255
self.pb.update(self._ellipsise_unimportant_words('OK', 13), self.testsRun, None)
257
unittest.TestResult.addSuccess(self, test)
259
def addSkipped(self, test, skip_excinfo):
260
self.extractBenchmarkTime(test)
262
print >>self.stream, ' SKIP %s' % self._testTimeString()
263
print >>self.stream, ' %s' % skip_excinfo[1]
264
elif self.dots and self.pb is None:
265
self.stream.write('S')
267
self.pb.update(self._ellipsise_unimportant_words('SKIP', 13), self.testsRun, None)
269
# seems best to treat this as success from point-of-view of unittest
270
# -- it actually does nothing so it barely matters :)
273
except KeyboardInterrupt:
276
self.addError(test, test.__exc_info())
278
unittest.TestResult.addSuccess(self, test)
280
def printErrorList(self, flavour, errors):
281
for test, err in errors:
282
self.stream.writeln(self.separator1)
283
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
284
if getattr(test, '_get_log', None) is not None:
286
print >>self.stream, \
287
('vvvv[log from %s]' % test.id()).ljust(78,'-')
288
print >>self.stream, test._get_log()
289
print >>self.stream, \
290
('^^^^[log from %s]' % test.id()).ljust(78,'-')
291
self.stream.writeln(self.separator2)
292
self.stream.writeln("%s" % err)
295
class TextTestRunner(object):
296
stop_on_failure = False
304
self.stream = unittest._WritelnDecorator(stream)
305
self.descriptions = descriptions
306
self.verbosity = verbosity
307
self.keep_output = keep_output
310
def _makeResult(self):
311
result = _MyResult(self.stream,
315
result.stop_early = self.stop_on_failure
319
"Run the given test case or test suite."
320
result = self._makeResult()
321
startTime = time.time()
322
if self.pb is not None:
323
self.pb.update('Running tests', 0, test.countTestCases())
325
stopTime = time.time()
326
timeTaken = stopTime - startTime
328
self.stream.writeln(result.separator2)
329
run = result.testsRun
330
self.stream.writeln("Ran %d test%s in %.3fs" %
331
(run, run != 1 and "s" or "", timeTaken))
332
self.stream.writeln()
333
if not result.wasSuccessful():
334
self.stream.write("FAILED (")
335
failed, errored = map(len, (result.failures, result.errors))
337
self.stream.write("failures=%d" % failed)
339
if failed: self.stream.write(", ")
340
self.stream.write("errors=%d" % errored)
341
self.stream.writeln(")")
343
self.stream.writeln("OK")
344
if self.pb is not None:
345
self.pb.update('Cleaning up', 0, 1)
346
# This is still a little bogus,
347
# but only a little. Folk not using our testrunner will
348
# have to delete their temp directories themselves.
349
test_root = TestCaseInTempDir.TEST_ROOT
350
if result.wasSuccessful() or not self.keep_output:
351
if test_root is not None:
352
# If LANG=C we probably have created some bogus paths
353
# which rmtree(unicode) will fail to delete
354
# so make sure we are using rmtree(str) to delete everything
355
# except on win32, where rmtree(str) will fail
356
# since it doesn't have the property of byte-stream paths
357
# (they are either ascii or mbcs)
358
if sys.platform == 'win32':
359
# make sure we are using the unicode win32 api
360
test_root = unicode(test_root)
362
test_root = test_root.encode(
363
sys.getfilesystemencoding())
364
osutils.rmtree(test_root)
366
if self.pb is not None:
367
self.pb.note("Failed tests working directories are in '%s'\n",
371
"Failed tests working directories are in '%s'\n" %
373
TestCaseInTempDir.TEST_ROOT = None
374
if self.pb is not None:
379
def iter_suite_tests(suite):
    """Yield every test case in suite, descending into nested suites.

    :param suite: an object whose _tests attribute lists TestCase and
        TestSuite instances.
    :raises Exception: if an entry is neither a unittest.TestCase nor a
        unittest.TestSuite.
    """
    for member in suite._tests:
        if isinstance(member, unittest.TestCase):
            yield member
            continue
        if not isinstance(member, unittest.TestSuite):
            raise Exception('unknown object %r inside test suite %r'
                            % (member, suite))
        # Nested suite: flatten it by recursing.
        for nested_test in iter_suite_tests(member):
            yield nested_test
392
class TestSkipped(Exception):
    """Raised by a test to signal that it was deliberately not run.

    This marks an intentional skip, as opposed to a failure.
    """
397
class CommandFailed(Exception):
401
class StringIOWrapper(object):
402
"""A wrapper around cStringIO which just adds an encoding attribute.
404
Internally we can check sys.stdout to see what the output encoding
405
should be. However, cStringIO has no encoding attribute that we can
406
set. So we wrap it instead.
411
def __init__(self, s=None):
413
self.__dict__['_cstring'] = StringIO(s)
415
self.__dict__['_cstring'] = StringIO()
417
def __getattr__(self, name, getattr=getattr):
418
return getattr(self.__dict__['_cstring'], name)
420
def __setattr__(self, name, val):
421
if name == 'encoding':
422
self.__dict__['encoding'] = val
424
return setattr(self._cstring, name, val)
427
class TestCase(unittest.TestCase):
428
"""Base class for bzr unit tests.
430
Tests that need access to disk resources should subclass
431
TestCaseInTempDir not TestCase.
433
Error and debug log messages are redirected from their usual
434
location into a temporary file, the contents of which can be
435
retrieved by _get_log(). We use a real OS file, not an in-memory object,
436
so that it can also capture file IO. When the test completes this file
437
is read into memory and removed from disk.
439
There are also convenience functions to invoke bzr's command-line
440
routine, and to build and check bzr trees.
442
In addition to the usual method of overriding tearDown(), this class also
443
allows subclasses to register functions into the _cleanups list, which is
444
run in order as the object is torn down. It's less likely this will be
445
accidentally overlooked.
448
_log_file_name = None
450
# record lsprof data when performing benchmark calls.
451
_gather_lsprof_in_benchmarks = False
453
def __init__(self, methodName='testMethod'):
454
super(TestCase, self).__init__(methodName)
458
unittest.TestCase.setUp(self)
459
self._cleanEnvironment()
460
bzrlib.trace.disable_default_logging()
462
self._benchcalls = []
463
self._benchtime = None
465
def _ndiff_strings(self, a, b):
    """Return the ndiff of two multi-line strings as a single string.

    A non-empty text that lacks a trailing newline has one appended
    before the comparison. Nothing is treated as junk, so every line and
    character takes part in the diff.
    """
    if b and not b.endswith('\n'):
        b += '\n'
    if a and not a.endswith('\n'):
        a += '\n'
    never_junk = lambda x: False
    delta = difflib.ndiff(a.splitlines(True),
                          b.splitlines(True),
                          linejunk=never_junk,
                          charjunk=never_junk)
    return ''.join(delta)
480
def assertEqualDiff(self, a, b, message=None):
    """Assert that two texts are equal, showing an ndiff when they differ.

    Intended for multi-line strings, where the differences can be hard
    to spot by eye.

    :param message: text placed before the diff in the failure message;
        defaults to "texts not equal:\\n".
    :raises AssertionError: if a != b.
    """
    # TODO: perhaps override assertEquals to call this for strings?
    if a != b:
        prefix = message
        if prefix is None:
            prefix = "texts not equal:\n"
        raise AssertionError(prefix + self._ndiff_strings(a, b))
494
def assertEqualMode(self, mode, mode_test):
    """Assert that two mode values are equal, reporting both in octal."""
    mismatch = 'mode mismatch %o != %o' % (mode, mode_test)
    self.assertEqual(mode, mode_test, mismatch)
498
def assertStartsWith(self, s, prefix):
    """Assert that the string s begins with prefix.

    :raises AssertionError: if s.startswith(prefix) is false.
    """
    if s.startswith(prefix):
        return
    raise AssertionError('string %r does not start with %r' % (s, prefix))
502
def assertEndsWith(self, s, suffix):
    """Assert that the string s finishes with suffix.

    :raises AssertionError: if s.endswith(suffix) is false.
    """
    if s.endswith(suffix):
        return
    raise AssertionError('string %r does not end with %r' % (s, suffix))
507
def assertContainsRe(self, haystack, needle_re):
    """Assert that haystack contains a match for the regex needle_re.

    The pattern is applied with re.search, so it may match anywhere in
    haystack.

    :raises AssertionError: if no match is found.
    """
    found = re.search(needle_re, haystack)
    if found:
        return
    raise AssertionError('pattern "%s" not found in "%s"'
                         % (needle_re, haystack))
513
def assertNotContainsRe(self, haystack, needle_re):
    """Assert that haystack contains no match for the regex needle_re.

    The pattern is applied with re.search, so a match anywhere in
    haystack counts.

    :raises AssertionError: if a match is found.
    """
    found = re.search(needle_re, haystack)
    if not found:
        return
    raise AssertionError('pattern "%s" found in "%s"'
                         % (needle_re, haystack))
519
def assertSubset(self, sublist, superlist):
    """Assert that every entry in sublist is present in superlist.

    :raises AssertionError: naming all absent entries, in sublist order.
    """
    missing = [entry for entry in sublist if entry not in superlist]
    if missing:
        raise AssertionError("value(s) %r not present in container %r" %
                             (missing, superlist))
529
def assertIs(self, left, right):
    """Assert that left and right are the very same object.

    :raises AssertionError: if left is not right.
    """
    if left is right:
        return
    raise AssertionError("%r is not %r." % (left, right))
533
def assertTransportMode(self, transport, path, mode):
    """Fail if path on transport does not have the permission bits mode.

    If the transport reports that it cannot round-trip unix mode bits,
    the assertion is skipped.
    """
    if transport._can_roundtrip_unix_modebits():
        stat_result = transport.stat(path)
        # Compare only the permission bits, not the file-type bits.
        actual_mode = stat.S_IMODE(stat_result.st_mode)
        explanation = ('mode of %r incorrect (%o != %o)'
                       % (path, mode, actual_mode))
        self.assertEqual(mode, actual_mode, explanation)
545
def assertIsInstance(self, obj, kls):
    """Fail, via self.fail, if obj is not an instance of kls."""
    if isinstance(obj, kls):
        return
    self.fail("%r is an instance of %s rather than %s" % (
        obj, obj.__class__, kls))
551
def _startLogFile(self):
    """Send bzr and test log messages to a temporary file.

    A temporary '.log' file is created and wrapped in a UTF-8 stream
    writer, then handed to bzrlib.trace.enable_test_log. The file is
    removed as the test is torn down, because _finishLogFile is
    registered as a cleanup.
    """
    log_fd, log_path = tempfile.mkstemp(suffix='.log', prefix='testbzr')
    utf8_writer = codecs.getwriter('UTF-8')
    self._log_file = utf8_writer(os.fdopen(log_fd, 'w+'))
    self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
    self._log_file_name = log_path
    self.addCleanup(self._finishLogFile)
563
def _finishLogFile(self):
    """Finished with the log file.

    Read contents into memory (self._log_contents), close, and delete.
    Does nothing when self._log_file is None.
    """
    log_file = self._log_file
    if log_file is None:
        return
    bzrlib.trace.disable_test_log(self._log_nonce)
    # Rewind so the whole log is read back.
    log_file.seek(0)
    self._log_contents = log_file.read()
    log_file.close()
    os.remove(self._log_file_name)
    self._log_file = None
    self._log_file_name = None
577
def addCleanup(self, callable):
    """Arrange to run a callable when this case is torn down.

    Callables are run in the reverse of the order they are registered,
    ie last-in first-out.

    :raises ValueError: if callable is already registered.
    """
    registered = self._cleanups
    if callable not in registered:
        registered.append(callable)
        return
    raise ValueError("cleanup function %r already registered on %s"
                     % (callable, self))
588
def _cleanEnvironment(self):
591
'APPDATA': os.getcwd(),
596
self.addCleanup(self._restoreEnvironment)
597
for name, value in new_env.iteritems():
598
self._captureVar(name, value)
601
def _captureVar(self, name, newvalue):
    """Set an environment variable, preparing it to be reset when finished.

    The current value (None when unset) is remembered in the old-env
    mapping first. A newvalue of None removes the variable from
    os.environ.
    """
    self.__old_env[name] = os.environ.get(name)
    if newvalue is not None:
        os.environ[name] = newvalue
    elif name in os.environ:
        del os.environ[name]
611
def _restoreVar(name, value):
    """Put an environment variable back to a previously saved value.

    A value of None means the variable was unset, so it is removed from
    os.environ if present.
    """
    if value is not None:
        os.environ[name] = value
    elif name in os.environ:
        del os.environ[name]
618
def _restoreEnvironment(self):
    """Restore every environment variable recorded by _captureVar."""
    saved_env = self.__old_env
    restore = self._restoreVar
    for var_name, old_value in saved_env.iteritems():
        restore(var_name, old_value)
624
unittest.TestCase.tearDown(self)
626
def time(self, callable, *args, **kwargs):
627
"""Run callable and accrue the time it takes to the benchmark time.
629
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
630
this will cause lsprofile statistics to be gathered and stored in
633
if self._benchtime is None:
637
if not self._gather_lsprof_in_benchmarks:
638
return callable(*args, **kwargs)
640
# record this benchmark
641
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
643
self._benchcalls.append(((callable, args, kwargs), stats))
646
self._benchtime += time.time() - start
648
def _runCleanups(self):
    """Run registered cleanup functions.

    This should only be called from TestCase.tearDown. Functions run in
    the reverse of their registration order.
    """
    # TODO: Perhaps this should keep running the remaining cleanups even
    # if one of them raises.
    for registered_cleanup in reversed(self._cleanups):
        registered_cleanup()
658
def log(self, *args):
662
"""Return as a string the log for this test"""
663
if self._log_file_name:
664
return open(self._log_file_name).read()
666
return self._log_contents
667
# TODO: Delete the log after it's been read in
669
def capture(self, cmd, retcode=0):
    """Shortcut that splits cmd into words, runs, and returns stdout.

    :param cmd: command line as one string, split on whitespace.
    :param retcode: passed through to run_bzr_captured.
    :return: the first element of run_bzr_captured's result.
    """
    argv = cmd.split()
    captured = self.run_bzr_captured(argv, retcode=retcode)
    return captured[0]
673
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None):
674
"""Invoke bzr and return (stdout, stderr).
676
Useful for code that wants to check the contents of the
677
output, the way error messages are presented, etc.
679
This should be the main method for tests that want to exercise the
680
overall behavior of the bzr application (rather than a unit test
681
or a functional test of the library.)
683
Much of the old code runs bzr by forking a new copy of Python, but
684
that is slower, harder to debug, and generally not necessary.
686
This runs bzr through the interface that catches and reports
687
errors, and with logging set to something approximating the
688
default, so that error reporting can be checked.
690
:param argv: arguments to invoke bzr
691
:param retcode: expected return code, or None for don't-care.
692
:param encoding: encoding for sys.stdout and sys.stderr
693
:param stdin: A string to be used as stdin for the command.
696
encoding = bzrlib.user_encoding
697
if stdin is not None:
698
stdin = StringIO(stdin)
699
stdout = StringIOWrapper()
700
stderr = StringIOWrapper()
701
stdout.encoding = encoding
702
stderr.encoding = encoding
704
self.log('run bzr: %r', argv)
705
# FIXME: don't call into logging here
706
handler = logging.StreamHandler(stderr)
707
handler.setLevel(logging.INFO)
708
logger = logging.getLogger('')
709
logger.addHandler(handler)
710
old_ui_factory = bzrlib.ui.ui_factory
711
bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
714
bzrlib.ui.ui_factory.stdin = stdin
716
result = self.apply_redirected(stdin, stdout, stderr,
717
bzrlib.commands.run_bzr_catch_errors,
720
logger.removeHandler(handler)
721
bzrlib.ui.ui_factory = old_ui_factory
723
out = stdout.getvalue()
724
err = stderr.getvalue()
726
self.log('output:\n%r', out)
728
self.log('errors:\n%r', err)
729
if retcode is not None:
730
self.assertEquals(retcode, result)
733
def run_bzr(self, *args, **kwargs):
    """Invoke bzr, as if it were run from the command line.

    This should be the main method for tests that want to exercise the
    overall behavior of the bzr application (rather than a unit test
    or a functional test of the library.)

    This sends the stdout/stderr results into the test's log,
    where it may be useful for debugging. See also run_bzr_captured.

    :param args: command-line words, passed on as argv.
    :param retcode: keyword; expected return code (default 0).
    :param encoding: keyword; passed through (default None).
    :param stdin: A string to be used as stdin for the command.
    :return: whatever run_bzr_captured returns.
    """
    # Only these three keywords are forwarded; any others are ignored.
    options = {
        'retcode': kwargs.pop('retcode', 0),
        'encoding': kwargs.pop('encoding', None),
        'stdin': kwargs.pop('stdin', None),
    }
    return self.run_bzr_captured(args, **options)
750
def run_bzr_decode(self, *args, **kwargs):
    """Run bzr via run_bzr and return its stdout decoded to unicode.

    Takes the same arguments as run_bzr, to which they are all
    forwarded unchanged.

    :param encoding: keyword; the encoding used to decode stdout. When
        absent or None, bzrlib.user_encoding is used.
    :return: the first element of run_bzr's result, decoded.
    """
    encoding = kwargs.get('encoding')
    if encoding is None:
        # run_bzr's own default for encoding is None, so an explicit
        # encoding=None must fall back too rather than reach decode().
        encoding = bzrlib.user_encoding
    return self.run_bzr(*args, **kwargs)[0].decode(encoding)
757
def run_bzr_error(self, error_regexes, *args, **kwargs):
    """Run bzr, and check that stderr contains the supplied regexes.

    :param error_regexes: Sequence of regular expressions which
        must each be found in the error output.
    :param args: command-line arguments for bzr
    :param kwargs: Keyword arguments which are interpreted by run_bzr
        This function changes the default value of retcode to be 3,
        since in most cases this is run when you expect bzr to fail.
    :return: (out, err) The actual output of running the command (in case
        you want to do more inspection)

    Examples of use:
        # Make sure that commit is failing because there is nothing to do
        self.run_bzr_error(['no changes to commit'],
                           'commit', '-m', 'my commit comment')
        # Make sure --strict is handling an unknown file, rather than
        # giving us the 'nothing to do' error
        self.build_tree(['unknown'])
        self.run_bzr_error(['Commit refused because there are unknown files'],
                           'commit', '--strict', '-m', 'my commit comment')
    """
    if 'retcode' not in kwargs:
        kwargs['retcode'] = 3
    result = self.run_bzr(*args, **kwargs)
    out, err = result
    for pattern in error_regexes:
        self.assertContainsRe(err, pattern)
    return out, err
786
def run_bzr_subprocess(self, *args, **kwargs):
787
"""Run bzr in a subprocess for testing.
789
This starts a new Python interpreter and runs bzr in there.
790
This should only be used for tests that have a justifiable need for
791
this isolation: e.g. they are testing startup time, or signal
792
handling, or early startup code, etc. Subprocess code can't be
793
profiled or debugged so easily.
795
:param retcode: The status code that is expected. Defaults to 0. If
796
None is supplied, the status code is not checked.
798
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
800
process = Popen([sys.executable, bzr_path]+args, stdout=PIPE,
802
out = process.stdout.read()
803
err = process.stderr.read()
804
retcode = process.wait()
805
supplied_retcode = kwargs.get('retcode', 0)
806
if supplied_retcode is not None:
807
assert supplied_retcode == retcode
810
def check_inventory_shape(self, inv, shape):
811
"""Compare an inventory to a list of expected names.
813
Fail if they are not precisely equal.
816
shape = list(shape) # copy
817
for path, ie in inv.entries():
818
name = path.replace('\\', '/')
826
self.fail("expected paths not found in inventory: %r" % shape)
828
self.fail("unexpected paths found in inventory: %r" % extras)
830
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
                     a_callable=None, *args, **kwargs):
    """Call a_callable with sys.stdin, sys.stdout and sys.stderr replaced.

    The real streams are put back afterwards, even if a_callable raises.

    :param stdin: file-like object to install as sys.stdin.
    :param stdout: file-like object to install as sys.stdout; when None,
        the test's log file is used if there is one.
    :param stderr: file-like object to install as sys.stderr; when None,
        the test's log file is used if there is one.
    :param a_callable: called as a_callable(*args, **kwargs).
    :return: whatever a_callable returns.
    :raises ValueError: if a_callable is not callable.
    """
    if not callable(a_callable):
        raise ValueError("a_callable must be callable.")
    # Look the log file up once so stdout and stderr apply the same test.
    log_file = getattr(self, "_log_file", None)
    # NOTE(review): the StringIO fallbacks used when a stream is None and
    # no log file exists are assumed -- confirm against the pristine file.
    if stdin is None:
        stdin = StringIO("")
    if stdout is None:
        if log_file is not None:
            stdout = log_file
        else:
            stdout = StringIO()
    if stderr is None:
        if log_file is not None:
            stderr = log_file
        else:
            stderr = StringIO()
    real_stdin = sys.stdin
    real_stdout = sys.stdout
    real_stderr = sys.stderr
    try:
        sys.stdout = stdout
        sys.stderr = stderr
        sys.stdin = stdin
        return a_callable(*args, **kwargs)
    finally:
        sys.stdout = real_stdout
        sys.stderr = real_stderr
        sys.stdin = real_stdin
862
def merge(self, branch_from, wt_to):
863
"""A helper for tests to do a ui-less merge.
865
This should move to the main library when someone has time to integrate
868
# minimal ui-less merge.
869
wt_to.branch.fetch(branch_from)
870
base_rev = common_ancestor(branch_from.last_revision(),
871
wt_to.branch.last_revision(),
872
wt_to.branch.repository)
873
merge_inner(wt_to.branch, branch_from.basis_tree(),
874
wt_to.branch.repository.revision_tree(base_rev),
876
wt_to.add_pending_merge(branch_from.last_revision())
879
BzrTestBase = TestCase
882
class TestCaseInTempDir(TestCase):
883
"""Derived class that runs a test within a temporary directory.
885
This is useful for tests that need to create a branch, etc.
887
The directory is created in a slightly complex way: for each
888
Python invocation, a new temporary top-level directory is created.
889
All test cases create their own directory within that. If the
890
tests complete successfully, the directory is removed.
892
InTempDir is an old alias for FunctionalTestCase.
897
OVERRIDE_PYTHON = 'python'
899
def check_file_contents(self, filename, expect):
    """Fail the test unless the file's contents equal expect.

    The file is opened in text mode ('r'). On a mismatch, both the
    expected and the actual contents are written to the test log before
    self.fail is called.
    """
    self.log("check contents of file %s" % filename)
    # Close the handle explicitly rather than leaving it to garbage
    # collection.
    f = open(filename, 'r')
    try:
        contents = f.read()
    finally:
        f.close()
    if contents != expect:
        self.log("expected: %r" % expect)
        self.log("actually: %r" % contents)
        self.fail("contents of %s not as expected" % filename)
907
def _make_test_root(self):
908
if TestCaseInTempDir.TEST_ROOT is not None:
912
root = u'test%04d.tmp' % i
916
if e.errno == errno.EEXIST:
921
# successfully created
922
TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
924
# make a fake bzr directory there to prevent any tests propagating
925
# up onto the source directory's real branch
926
bzrdir.BzrDir.create_standalone_workingtree(TestCaseInTempDir.TEST_ROOT)
929
super(TestCaseInTempDir, self).setUp()
930
self._make_test_root()
931
_currentdir = os.getcwdu()
932
# shorten the name, to avoid test failures due to path length
933
short_id = self.id().replace('bzrlib.tests.', '') \
934
.replace('__main__.', '')[-100:]
935
# it's possible the same test class is run several times for
936
# parameterized tests, so make sure the names don't collide.
940
candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
942
candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
943
if os.path.exists(candidate_dir):
947
self.test_dir = candidate_dir
948
os.mkdir(self.test_dir)
949
os.chdir(self.test_dir)
951
os.environ['HOME'] = self.test_dir
952
os.environ['APPDATA'] = self.test_dir
953
def _leaveDirectory():
954
os.chdir(_currentdir)
955
self.addCleanup(_leaveDirectory)
957
def build_tree(self, shape, line_endings='native', transport=None):
958
"""Build a test tree according to a pattern.
960
shape is a sequence of file specifications. If the final
961
character is '/', a directory is created.
963
This doesn't add anything to a branch.
964
:param line_endings: Either 'binary' or 'native'
965
in binary mode, exact contents are written
966
in native mode, the line endings match the
967
default platform endings.
969
:param transport: A transport to write to, for building trees on
970
VFS's. If the transport is readonly or None,
971
"." is opened automatically.
973
# XXX: It's OK to just create them using forward slashes on windows?
974
if transport is None or transport.is_readonly():
975
transport = get_transport(".")
977
self.assert_(isinstance(name, basestring))
979
transport.mkdir(urlutils.escape(name[:-1]))
981
if line_endings == 'binary':
983
elif line_endings == 'native':
986
raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
987
content = "contents of %s%s" % (name.encode('utf-8'), end)
988
transport.put(urlutils.escape(name), StringIO(content))
990
def build_tree_contents(self, shape):
    """Build the tree described by shape.

    Thin method wrapper: inside the method the bare name resolves to the
    module-level build_tree_contents imported from
    bzrlib.tests.treeshape, which does the work.
    """
    build_tree_contents(shape)
993
def failUnlessExists(self, path):
    """Fail unless path, which may be abs or relative, exists.

    Existence is checked with osutils.lexists.
    """
    path_exists = osutils.lexists(path)
    self.failUnless(path_exists)
997
def failIfExists(self, path):
    """Fail if path, which may be abs or relative, exists.

    Existence is checked with osutils.lexists.
    """
    path_exists = osutils.lexists(path)
    self.failIf(path_exists)
1001
def assertFileEqual(self, content, path):
    """Fail if the file at path does not contain exactly 'content'.

    The path must exist (checked with osutils.lexists); its text is then
    compared with content through assertEqualDiff.
    """
    self.failUnless(osutils.lexists(path))
    # TODO: jam 20060427 Shouldn't this be 'rb'?
    # Close the handle explicitly rather than leaving it to garbage
    # collection.
    f = open(path, 'r')
    try:
        actual = f.read()
    finally:
        f.close()
    self.assertEqualDiff(content, actual)
1008
class TestCaseWithTransport(TestCaseInTempDir):
1009
"""A test case that provides get_url and get_readonly_url facilities.
1011
These back onto two transport servers, one for readonly access and one for
1014
If no explicit class is provided for readonly access, a
1015
ReadonlyTransportDecorator is used instead which allows the use of non disk
1016
based read write transports.
1018
If an explicit class is provided for readonly access, that server and the
1019
readwrite one must both define get_url() as resolving to os.getcwd().
1022
def __init__(self, methodName='testMethod'):
    """Initialise the test case with no transport servers started.

    :param methodName: passed on to the parent class constructor.
    """
    super(TestCaseWithTransport, self).__init__(methodName)
    # Server instances start as None; get_server() creates the
    # read/write one on first use.
    self.__server = None
    self.__readonly_server = None
    # Server classes to instantiate.
    self.transport_readonly_server = None
    self.transport_server = default_transport
1029
def get_readonly_url(self, relpath=None):
    """Get a URL for the readonly transport.

    This will either be backed by '.' or a decorator to the transport
    used by self.get_url()
    relpath provides for clients to get a path relative to the base url.
    These should only be downwards relative, not upwards.

    :return: the readonly server's URL, with relpath appended after a
        '/' separator when relpath is given.
    """
    base = self.get_readonly_server().get_url()
    if relpath is None:
        return base
    if not base.endswith('/'):
        base = base + '/'
    return base + relpath
1044
def get_readonly_server(self):
1045
"""Get the server instance for the readonly transport
1047
This is useful for some tests with specific servers to do diagnostics.
1049
if self.__readonly_server is None:
1050
if self.transport_readonly_server is None:
1051
# readonly decorator requested
1052
# bring up the server
1054
self.__readonly_server = ReadonlyServer()
1055
self.__readonly_server.setUp(self.__server)
1057
self.__readonly_server = self.transport_readonly_server()
1058
self.__readonly_server.setUp()
1059
self.addCleanup(self.__readonly_server.tearDown)
1060
return self.__readonly_server
1062
def get_server(self):
    """Get the read/write server instance.

    On first use the server is created from self.transport_server, set
    up, and its tearDown is registered as a cleanup; later calls return
    the same instance.
    """
    if self.__server is not None:
        return self.__server
    server = self.transport_server()
    # Store the instance before setUp, so it stays recorded even if
    # setUp raises.
    self.__server = server
    server.setUp()
    self.addCleanup(server.tearDown)
    return server
1074
def get_url(self, relpath=None):
    """Get a URL for the readwrite transport.

    relpath provides for clients to get a path relative to the base url.
    These should only be downwards relative, not upwards.

    :return: the read/write server's URL; unless relpath is None or
        '.', the URL-escaped relpath is appended after a '/' separator.
    """
    base = self.get_server().get_url()
    if relpath is None or relpath == '.':
        return base
    if not base.endswith('/'):
        base = base + '/'
    return base + urlutils.escape(relpath)
1089
def get_transport(self):
    """Return a writeable transport for the test scratch space.

    The transport is opened on self.get_url() using the module-level
    get_transport function, and asserted not to be readonly.
    """
    writable = get_transport(self.get_url())
    self.assertFalse(writable.is_readonly())
    return writable
1095
def get_readonly_transport(self):
    """Return a readonly transport for the test scratch space.

    This can be used to test that operations which should only need
    readonly access in fact do not try to write. The transport is opened
    on self.get_readonly_url() and asserted to be readonly.
    """
    readonly = get_transport(self.get_readonly_url())
    self.assertTrue(readonly.is_readonly())
    return readonly
1105
def make_branch(self, relpath, format=None):
    """Create a branch on the transport at relpath.

    A repository is made first via make_repository(); the branch is then
    created in that repository's bzrdir.

    :param relpath: location of the branch, passed to make_repository.
    :param format: passed through to make_repository unchanged.
    :return: whatever the bzrdir's create_branch() returns.
    """
    repository = self.make_repository(relpath, format=format)
    control_dir = repository.bzrdir
    return control_dir.create_branch()
1110
def make_bzrdir(self, relpath, format=None):
    """Create a control directory for relpath and return it.

    The last segment of the read/write url for relpath is created with
    mkdir on the parent url's transport (an already existing directory
    is accepted), then the format is initialized on a transport opened
    for relpath.

    :param relpath: location of the new bzrdir.
    :param format: the bzrdir format to initialize; when None,
        BzrDirFormat.get_default_format() is used.
    :return: the result of format.initialize_on_transport().
    :raises TestSkipped: if UninitializableFormat is raised while doing
        any of the above.
    """
    try:
        url = self.get_url(relpath)
        mutter('relpath %r => url %r', relpath, url)
        segments = url.split('/')
        leaf = segments[-1]
        if leaf not in ('', '.'):
            # the url names a directory below its parent: make sure that
            # directory exists before initializing anything in it.
            parent = '/'.join(segments[:-1])
            parent_transport = get_transport(parent)
            try:
                parent_transport.mkdir(leaf)
            except errors.FileExists:
                pass
        if format is None:
            format = bzrlib.bzrdir.BzrDirFormat.get_default_format()
        # FIXME: make this use a single transport someday. RBC 20060418
        # NOTE(review): this opens relpath, not the url computed above --
        # presumably relies on the scratch space being the cwd; confirm
        # before switching to the url.
        return format.initialize_on_transport(get_transport(relpath))
    except errors.UninitializableFormat:
        raise TestSkipped("Format %s is not initializable." % format)
1129
def make_repository(self, relpath, shared=False, format=None):
    """Create a repository on our default transport at relpath.

    :param relpath: location of the repository, passed to make_bzrdir.
    :param shared: passed through to the bzrdir's create_repository().
    :param format: passed through to make_bzrdir unchanged.
    :return: whatever create_repository() returns.
    """
    control_dir = self.make_bzrdir(relpath, format=format)
    return control_dir.create_repository(shared=shared)
1134
def make_branch_and_tree(self, relpath, format=None):
    """Create a branch on the transport and a tree locally.

    :param relpath: location of the branch, passed to make_branch.
    :param format: passed through to make_branch unchanged.
    :return: the working tree.
    """
    # TODO: always use the local disk path for the working tree,
    # this obviously requires a format that supports branch references
    # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
    # RBC 20060208
    branch = self.make_branch(relpath, format=format)
    try:
        return branch.bzrdir.create_workingtree()
    except errors.NotLocalUrl:
        # The branch's bzrdir could not hold a tree; fall back to
        # initializing a WorkingTreeFormat2 tree in the bzrdir opened at
        # relpath.
        # new formats - catch No tree error and create
        # a branch reference and a checkout.
        # old formats at that point - raise TestSkipped.
        # TODO: rbc 20060208
        return WorkingTreeFormat2().initialize(bzrdir.BzrDir.open(relpath))
1153
def assertIsDirectory(self, relpath, transport):
    """Assert that relpath within transport is a directory.

    This may not be possible on all transports; in that case it propagates
    a TransportNotPossible.

    :param relpath: path to check, relative to transport.
    :param transport: transport whose stat() provides the mode to check.
    """
    try:
        mode = transport.stat(relpath).st_mode
    except errors.NoSuchFile:
        self.fail("path %s is not a directory; no such file"
                  % (relpath))
    if not stat.S_ISDIR(mode):
        self.fail("path %s is not a directory; has mode %#o"
                  % (relpath, mode))
1169
class ChrootedTestCase(TestCaseWithTransport):
    """A support class that provides readonly urls outside the local namespace.

    This is done by checking if self.transport_server is a MemoryServer. If it
    is then we are chrooted already, if it is not then an HttpServer is used
    for readonly urls.

    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
                       be used without needing to redo it when a different
                       subclass is in use ?
    """

    def setUp(self):
        """Run the base setUp, then choose the readonly server class.

        transport_readonly_server is set to HttpServer unless the
        read/write server class is MemoryServer.
        """
        super(ChrootedTestCase, self).setUp()
        if self.transport_server != bzrlib.transport.memory.MemoryServer:
            self.transport_readonly_server = bzrlib.transport.http.HttpServer
1187
def filter_suite_by_re(suite, pattern):
    """Build a new suite holding the tests whose id matches pattern.

    :param suite: the suite to filter; it is walked with
        iter_suite_tests().
    :param pattern: a regular expression; a test is kept when
        re.search finds it anywhere in test.id().
    :return: a new TestUtil.TestSuite containing the matching tests.
    """
    result = TestUtil.TestSuite()
    # compile once; the same pattern is applied to every test id
    id_matches = re.compile(pattern).search
    for test in iter_suite_tests(suite):
        if id_matches(test.id()):
            result.addTest(test)
    return result
1196
def run_suite(suite, name='test', verbose=False, pattern=".*",
              stop_on_failure=False, keep_output=False,
              transport=None, lsprof_timed=None):
    """Run suite with a TextTestRunner writing to sys.stdout.

    :param suite: the test suite to run.
    :param name: stored on TestCaseInTempDir._TEST_NAME before running.
    :param verbose: when true the runner gets verbosity 2 and no progress
        bar; otherwise verbosity 1 and a progress.ProgressBar.
    :param pattern: regular expression used to filter the suite by test
        id; the default '.*' skips filtering entirely.
    :param stop_on_failure: stored on the runner as stop_on_failure.
    :param keep_output: passed through to the runner.
    :param transport: accepted for interface compatibility; not used by
        this function.
    :param lsprof_timed: stored on TestCase._gather_lsprof_in_benchmarks.
    :return: result.wasSuccessful() for the run.
    """
    TestCaseInTempDir._TEST_NAME = name
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
    if verbose:
        verbosity, pb = 2, None
    else:
        verbosity, pb = 1, progress.ProgressBar()
    runner = TextTestRunner(stream=sys.stdout,
                            descriptions=0,
                            verbosity=verbosity,
                            keep_output=keep_output,
                            pb=pb)
    runner.stop_on_failure = stop_on_failure
    if pattern != '.*':
        suite = filter_suite_by_re(suite, pattern)
    result = runner.run(suite)
    return result.wasSuccessful()
1219
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
             keep_output=False,
             transport=None,
             test_suite_factory=None,
             lsprof_timed=None):
    """Run the whole test suite under the enhanced runner.

    The module-level default_transport is replaced by transport for the
    duration of the run and restored afterwards, even if the run raises.

    :param verbose: passed through to run_suite.
    :param pattern: passed through to run_suite.
    :param stop_on_failure: passed through to run_suite.
    :param keep_output: passed through to run_suite.
    :param transport: transport to use as default_transport while
        running; None keeps the current default_transport.
    :param test_suite_factory: callable returning the suite to run; None
        means use test_suite().
    :param lsprof_timed: passed through to run_suite.
    :return: the value returned by run_suite.
    """
    global default_transport
    if transport is None:
        transport = default_transport
    old_transport = default_transport
    default_transport = transport
    try:
        factory = test_suite_factory
        if factory is None:
            factory = test_suite
        suite = factory()
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
                         stop_on_failure=stop_on_failure,
                         keep_output=keep_output,
                         transport=transport,
                         lsprof_timed=lsprof_timed)
    finally:
        default_transport = old_transport
1244
def test_suite():
    """Build and return TestSuite for the whole of bzrlib.

    This function can be replaced if you need to change the default test
    suite on a global basis, but it is not encouraged.

    The suite is assembled from, in order: the named test modules, the
    transport-implementation modules adapted through
    TransportTestProviderAdapter, each package from packages_to_test(),
    MODULES_TO_TEST, doctests of MODULES_TO_DOCTEST, and the test_suite()
    of every loaded plugin that provides one.
    """
    testmod_names = [
        'bzrlib.tests.test_ancestry',
        'bzrlib.tests.test_api',
        'bzrlib.tests.test_bad_files',
        'bzrlib.tests.test_branch',
        'bzrlib.tests.test_bundle',
        'bzrlib.tests.test_bzrdir',
        'bzrlib.tests.test_command',
        'bzrlib.tests.test_commit',
        'bzrlib.tests.test_commit_merge',
        'bzrlib.tests.test_config',
        'bzrlib.tests.test_conflicts',
        'bzrlib.tests.test_decorators',
        'bzrlib.tests.test_diff',
        'bzrlib.tests.test_doc_generate',
        'bzrlib.tests.test_emptytree',
        'bzrlib.tests.test_errors',
        'bzrlib.tests.test_escaped_store',
        'bzrlib.tests.test_fetch',
        'bzrlib.tests.test_gpg',
        'bzrlib.tests.test_graph',
        'bzrlib.tests.test_hashcache',
        'bzrlib.tests.test_http',
        'bzrlib.tests.test_identitymap',
        'bzrlib.tests.test_inv',
        'bzrlib.tests.test_knit',
        'bzrlib.tests.test_lockdir',
        'bzrlib.tests.test_lockable_files',
        'bzrlib.tests.test_log',
        'bzrlib.tests.test_merge',
        'bzrlib.tests.test_merge3',
        'bzrlib.tests.test_merge_core',
        'bzrlib.tests.test_missing',
        'bzrlib.tests.test_msgeditor',
        'bzrlib.tests.test_nonascii',
        'bzrlib.tests.test_options',
        'bzrlib.tests.test_osutils',
        'bzrlib.tests.test_patch',
        'bzrlib.tests.test_patches',
        'bzrlib.tests.test_permissions',
        'bzrlib.tests.test_plugins',
        'bzrlib.tests.test_progress',
        'bzrlib.tests.test_reconcile',
        'bzrlib.tests.test_repository',
        'bzrlib.tests.test_revision',
        'bzrlib.tests.test_revisionnamespaces',
        'bzrlib.tests.test_revprops',
        'bzrlib.tests.test_revisiontree',
        'bzrlib.tests.test_rio',
        'bzrlib.tests.test_sampler',
        'bzrlib.tests.test_selftest',
        'bzrlib.tests.test_setup',
        'bzrlib.tests.test_sftp_transport',
        'bzrlib.tests.test_smart_add',
        'bzrlib.tests.test_source',
        'bzrlib.tests.test_status',
        'bzrlib.tests.test_store',
        'bzrlib.tests.test_symbol_versioning',
        'bzrlib.tests.test_testament',
        'bzrlib.tests.test_textfile',
        'bzrlib.tests.test_textmerge',
        'bzrlib.tests.test_trace',
        'bzrlib.tests.test_transactions',
        'bzrlib.tests.test_transform',
        'bzrlib.tests.test_transport',
        'bzrlib.tests.test_tsort',
        'bzrlib.tests.test_tuned_gzip',
        'bzrlib.tests.test_ui',
        'bzrlib.tests.test_upgrade',
        'bzrlib.tests.test_urlutils',
        'bzrlib.tests.test_versionedfile',
        'bzrlib.tests.test_weave',
        'bzrlib.tests.test_whitebox',
        'bzrlib.tests.test_workingtree',
        'bzrlib.tests.test_xml',
        ]
    # These modules are not loaded directly: every test in them is passed
    # through the transport adapter below.
    test_transport_implementations = [
        'bzrlib.tests.test_transport_implementations',
        'bzrlib.tests.test_read_bundle',
        ]
    suite = TestUtil.TestSuite()
    loader = TestUtil.TestLoader()
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
    from bzrlib.transport import TransportTestProviderAdapter
    adapter = TransportTestProviderAdapter()
    adapt_modules(test_transport_implementations, adapter, loader, suite)
    for package in packages_to_test():
        suite.addTest(package.test_suite())
    for module in MODULES_TO_TEST:
        suite.addTest(loader.loadTestsFromModule(module))
    for module in MODULES_TO_DOCTEST:
        suite.addTest(doctest.DocTestSuite(module))
    for plugin in bzrlib.plugin.all_plugins().values():
        # plugins are optional contributors: only those exposing a
        # test_suite attribute add tests.
        if getattr(plugin, 'test_suite', None) is not None:
            suite.addTest(plugin.test_suite())
    return suite
1347
def adapt_modules(mods_list, adapter, loader, suite):
    """Adapt the modules in mods_list using adapter and add to suite.

    :param mods_list: module names handed to
        loader.loadTestsFromModuleNames().
    :param adapter: object whose adapt(test) returns the tests to add for
        one loaded test.
    :param loader: the test loader used to load mods_list.
    :param suite: the suite that receives the adapted tests via
        addTests().
    """
    loaded = loader.loadTestsFromModuleNames(mods_list)
    for test in iter_suite_tests(loaded):
        suite.addTests(adapter.adapt(test))