1
# Copyright (C) 2005, 2006 by Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
# TODO: Perhaps there should be an API to find out if bzr is running under the
19
# test suite -- some plugins might want to avoid making intrusive changes if
20
# this is the case. However, we want behaviour under test to diverge as
21
# little as possible, so this should be used rarely if it's added at all.
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
# NOTE: Some classes in here use camelCaseNaming() rather than
25
# underscore_naming(). That's for consistency with unittest; it's not the
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
# new assertFoo() methods.
30
from cStringIO import StringIO
39
from subprocess import Popen, PIPE
47
import bzrlib.bzrdir as bzrdir
48
import bzrlib.commands
49
import bzrlib.bundle.serializer
50
import bzrlib.errors as errors
51
import bzrlib.inventory
52
import bzrlib.iterablefile
57
# lsprof not available
59
from bzrlib.merge import merge_inner
62
import bzrlib.osutils as osutils
64
import bzrlib.progress as progress
65
from bzrlib.revision import common_ancestor
67
from bzrlib import symbol_versioning
69
from bzrlib.transport import get_transport
70
import bzrlib.transport
71
from bzrlib.transport.local import LocalRelpathServer
72
from bzrlib.transport.readonly import ReadonlyServer
73
from bzrlib.trace import mutter
74
from bzrlib.tests import TestUtil
75
from bzrlib.tests.TestUtil import (
79
from bzrlib.tests.treeshape import build_tree_contents
80
import bzrlib.urlutils as urlutils
81
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
83
default_transport = LocalRelpathServer
86
MODULES_TO_DOCTEST = [
87
bzrlib.bundle.serializer,
99
def packages_to_test():
100
"""Return a list of packages to test.
102
The packages are not globally imported so that import failures are
103
triggered when running selftest, not when importing the command.
106
import bzrlib.tests.blackbox
107
import bzrlib.tests.branch_implementations
108
import bzrlib.tests.bzrdir_implementations
109
import bzrlib.tests.interrepository_implementations
110
import bzrlib.tests.interversionedfile_implementations
111
import bzrlib.tests.intertree_implementations
112
import bzrlib.tests.repository_implementations
113
import bzrlib.tests.revisionstore_implementations
114
import bzrlib.tests.tree_implementations
115
import bzrlib.tests.workingtree_implementations
118
bzrlib.tests.blackbox,
119
bzrlib.tests.branch_implementations,
120
bzrlib.tests.bzrdir_implementations,
121
bzrlib.tests.interrepository_implementations,
122
bzrlib.tests.interversionedfile_implementations,
123
bzrlib.tests.intertree_implementations,
124
bzrlib.tests.repository_implementations,
125
bzrlib.tests.revisionstore_implementations,
126
bzrlib.tests.tree_implementations,
127
bzrlib.tests.workingtree_implementations,
131
class _MyResult(unittest._TextTestResult):
132
"""Custom TestResult.
134
Shows output in a different format, including displaying runtime for tests.
138
def __init__(self, stream, descriptions, verbosity, pb=None,
140
"""Construct new TestResult.
142
:param bench_history: Optionally, a writable file object to accumulate
145
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
147
if bench_history is not None:
148
from bzrlib.version import _get_bzr_source_tree
149
src_tree = _get_bzr_source_tree()
152
revision_id = src_tree.get_parent_ids()[0]
154
# XXX: if this is a brand new tree, do the same as if there
158
# XXX: If there's no branch, what should we do?
160
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
161
self._bench_history = bench_history
163
def extractBenchmarkTime(self, testCase):
    """Remember the benchmark time recorded on testCase, if any.

    Copies testCase._benchtime into self._benchmarkTime; when the test
    case has no such attribute, None is stored instead.
    """
    recorded = getattr(testCase, "_benchtime", None)
    self._benchmarkTime = recorded
167
def _elapsedTestTimeString(self):
    """Return the formatted wall-clock time since the current test started.

    The elapsed time is measured from self._start_time and rendered with
    self._formatTime.
    """
    elapsed = time.time() - self._start_time
    return self._formatTime(elapsed)
171
def _testTimeString(self):
172
if self._benchmarkTime is not None:
174
self._formatTime(self._benchmarkTime),
175
self._elapsedTestTimeString())
177
return " %s" % self._elapsedTestTimeString()
179
def _formatTime(self, seconds):
    """Render seconds as whole milliseconds, right-aligned in 5 columns.

    :param seconds: A duration in seconds (int or float).
    :return: A string such as '  500ms'.
    """
    milliseconds = 1000 * seconds
    return "%5dms" % milliseconds
183
def _ellipsise_unimportant_words(self, a_string, final_width,
185
"""Add ellipses (sp?) for overly long strings.
187
:param keep_start: If true preserve the start of a_string rather
191
if len(a_string) > final_width:
192
result = a_string[:final_width-3] + '...'
196
if len(a_string) > final_width:
197
result = '...' + a_string[3-final_width:]
200
return result.ljust(final_width)
202
def startTest(self, test):
203
unittest.TestResult.startTest(self, test)
204
# In a short description, the important words are in
205
# the beginning, but in an id, the important words are
207
SHOW_DESCRIPTIONS = False
209
if not self.showAll and self.dots and self.pb is not None:
212
final_width = osutils.terminal_width()
213
final_width = final_width - 15 - 8
215
if SHOW_DESCRIPTIONS:
216
what = test.shortDescription()
218
what = self._ellipsise_unimportant_words(what, final_width, keep_start=True)
221
if what.startswith('bzrlib.tests.'):
223
what = self._ellipsise_unimportant_words(what, final_width)
225
self.stream.write(what)
226
elif self.dots and self.pb is not None:
227
self.pb.update(what, self.testsRun - 1, None)
229
self._recordTestStartTime()
231
def _recordTestStartTime(self):
    """Stamp self._start_time with the current time.time() value."""
    now = time.time()
    self._start_time = now
235
def addError(self, test, err):
236
if isinstance(err[1], TestSkipped):
237
return self.addSkipped(test, err)
238
unittest.TestResult.addError(self, test, err)
239
self.extractBenchmarkTime(test)
241
self.stream.writeln("ERROR %s" % self._testTimeString())
242
elif self.dots and self.pb is None:
243
self.stream.write('E')
245
self.pb.update(self._ellipsise_unimportant_words('ERROR', 13), self.testsRun, None)
246
self.pb.note(self._ellipsise_unimportant_words(
247
test.id() + ': ERROR',
248
osutils.terminal_width()))
253
def addFailure(self, test, err):
254
unittest.TestResult.addFailure(self, test, err)
255
self.extractBenchmarkTime(test)
257
self.stream.writeln(" FAIL %s" % self._testTimeString())
258
elif self.dots and self.pb is None:
259
self.stream.write('F')
261
self.pb.update(self._ellipsise_unimportant_words('FAIL', 13), self.testsRun, None)
262
self.pb.note(self._ellipsise_unimportant_words(
263
test.id() + ': FAIL',
264
osutils.terminal_width()))
269
def addSuccess(self, test):
270
self.extractBenchmarkTime(test)
271
if self._bench_history is not None:
272
if self._benchmarkTime is not None:
273
self._bench_history.write("%s %s\n" % (
274
self._formatTime(self._benchmarkTime),
277
self.stream.writeln(' OK %s' % self._testTimeString())
278
for bench_called, stats in getattr(test, '_benchcalls', []):
279
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
280
stats.pprint(file=self.stream)
281
elif self.dots and self.pb is None:
282
self.stream.write('~')
284
self.pb.update(self._ellipsise_unimportant_words('OK', 13), self.testsRun, None)
286
unittest.TestResult.addSuccess(self, test)
288
def addSkipped(self, test, skip_excinfo):
289
self.extractBenchmarkTime(test)
291
print >>self.stream, ' SKIP %s' % self._testTimeString()
292
print >>self.stream, ' %s' % skip_excinfo[1]
293
elif self.dots and self.pb is None:
294
self.stream.write('S')
296
self.pb.update(self._ellipsise_unimportant_words('SKIP', 13), self.testsRun, None)
298
# seems best to treat this as success from point-of-view of unittest
299
# -- it actually does nothing so it barely matters :)
302
except KeyboardInterrupt:
305
self.addError(test, test.__exc_info())
307
unittest.TestResult.addSuccess(self, test)
309
def printErrorList(self, flavour, errors):
310
for test, err in errors:
311
self.stream.writeln(self.separator1)
312
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
313
if getattr(test, '_get_log', None) is not None:
315
print >>self.stream, \
316
('vvvv[log from %s]' % test.id()).ljust(78,'-')
317
print >>self.stream, test._get_log()
318
print >>self.stream, \
319
('^^^^[log from %s]' % test.id()).ljust(78,'-')
320
self.stream.writeln(self.separator2)
321
self.stream.writeln("%s" % err)
324
class TextTestRunner(object):
325
stop_on_failure = False
334
self.stream = unittest._WritelnDecorator(stream)
335
self.descriptions = descriptions
336
self.verbosity = verbosity
337
self.keep_output = keep_output
339
self._bench_history = bench_history
341
def _makeResult(self):
342
result = _MyResult(self.stream,
346
bench_history=self._bench_history)
347
result.stop_early = self.stop_on_failure
351
"Run the given test case or test suite."
352
result = self._makeResult()
353
startTime = time.time()
354
if self.pb is not None:
355
self.pb.update('Running tests', 0, test.countTestCases())
357
stopTime = time.time()
358
timeTaken = stopTime - startTime
360
self.stream.writeln(result.separator2)
361
run = result.testsRun
362
self.stream.writeln("Ran %d test%s in %.3fs" %
363
(run, run != 1 and "s" or "", timeTaken))
364
self.stream.writeln()
365
if not result.wasSuccessful():
366
self.stream.write("FAILED (")
367
failed, errored = map(len, (result.failures, result.errors))
369
self.stream.write("failures=%d" % failed)
371
if failed: self.stream.write(", ")
372
self.stream.write("errors=%d" % errored)
373
self.stream.writeln(")")
375
self.stream.writeln("OK")
376
if self.pb is not None:
377
self.pb.update('Cleaning up', 0, 1)
378
# This is still a little bogus,
379
# but only a little. Folk not using our testrunner will
380
# have to delete their temp directories themselves.
381
test_root = TestCaseInTempDir.TEST_ROOT
382
if result.wasSuccessful() or not self.keep_output:
383
if test_root is not None:
384
# If LANG=C we probably have created some bogus paths
385
# which rmtree(unicode) will fail to delete
386
# so make sure we are using rmtree(str) to delete everything
387
# except on win32, where rmtree(str) will fail
388
# since it doesn't have the property of byte-stream paths
389
# (they are either ascii or mbcs)
390
if sys.platform == 'win32':
391
# make sure we are using the unicode win32 api
392
test_root = unicode(test_root)
394
test_root = test_root.encode(
395
sys.getfilesystemencoding())
396
osutils.rmtree(test_root)
398
if self.pb is not None:
399
self.pb.note("Failed tests working directories are in '%s'\n",
403
"Failed tests working directories are in '%s'\n" %
405
TestCaseInTempDir.TEST_ROOT = None
406
if self.pb is not None:
411
def iter_suite_tests(suite):
    """Yield every test case held by suite, descending into nested suites.

    :param suite: A unittest.TestSuite; its private ``_tests`` list is read.
    :raises Exception: if the suite holds an object that is neither a
        unittest.TestCase nor a unittest.TestSuite.
    """
    for member in suite._tests:
        if isinstance(member, unittest.TestCase):
            yield member
        elif isinstance(member, unittest.TestSuite):
            # Flatten nested suites depth-first, preserving their order.
            for nested in iter_suite_tests(member):
                yield nested
        else:
            raise Exception('unknown object %r inside test suite %r'
                            % (member, suite))
424
class TestSkipped(Exception):
    """Raised by a test to signal a deliberate skip rather than a failure."""
428
class CommandFailed(Exception):
432
class StringIOWrapper(object):
433
"""A wrapper around cStringIO which just adds an encoding attribute.
435
Internally we can check sys.stdout to see what the output encoding
436
should be. However, cStringIO has no encoding attribute that we can
437
set. So we wrap it instead.
442
def __init__(self, s=None):
444
self.__dict__['_cstring'] = StringIO(s)
446
self.__dict__['_cstring'] = StringIO()
448
def __getattr__(self, name, getattr=getattr):
449
return getattr(self.__dict__['_cstring'], name)
451
def __setattr__(self, name, val):
452
if name == 'encoding':
453
self.__dict__['encoding'] = val
455
return setattr(self._cstring, name, val)
458
class TestCase(unittest.TestCase):
459
"""Base class for bzr unit tests.
461
Tests that need access to disk resources should subclass
462
TestCaseInTempDir not TestCase.
464
Error and debug log messages are redirected from their usual
465
location into a temporary file, the contents of which can be
466
retrieved by _get_log(). We use a real OS file, not an in-memory object,
467
so that it can also capture file IO. When the test completes this file
468
is read into memory and removed from disk.
470
There are also convenience functions to invoke bzr's command-line
471
routine, and to build and check bzr trees.
473
In addition to the usual method of overriding tearDown(), this class also
474
allows subclasses to register functions into the _cleanups list, which is
475
run in order as the object is torn down. It's less likely this will be
476
accidentally overlooked.
479
_log_file_name = None
481
# record lsprof data when performing benchmark calls.
482
_gather_lsprof_in_benchmarks = False
484
def __init__(self, methodName='testMethod'):
485
super(TestCase, self).__init__(methodName)
489
unittest.TestCase.setUp(self)
490
self._cleanEnvironment()
491
bzrlib.trace.disable_default_logging()
493
self._benchcalls = []
494
self._benchtime = None
496
def _ndiff_strings(self, a, b):
    """Build an ndiff of two strings containing lines.

    A trailing newline is appended to each non-empty string that lacks
    one, so that the final lines of both texts are comparable.

    :return: The difflib.ndiff output joined into one string.
    """
    def _terminated(text):
        # Leave empty strings alone; only complete an unterminated last line.
        if text and text[-1] != '\n':
            return text + '\n'
        return text

    # Nothing is treated as junk: every line and character takes part.
    never_junk = lambda x: False
    a_lines = _terminated(a).splitlines(True)
    b_lines = _terminated(b).splitlines(True)
    return ''.join(difflib.ndiff(a_lines, b_lines,
                                 linejunk=never_junk,
                                 charjunk=never_junk))
511
def assertEqualDiff(self, a, b, message=None):
    """Assert two texts are equal, raising with an ndiff when they differ.

    Intended for multi-line strings, where differences are hard to spot
    by eye.

    :param message: Optional prefix for the failure text; defaults to
        "texts not equal:\\n".
    :raises AssertionError: if a != b; the text is the prefix followed by
        self._ndiff_strings(a, b).
    """
    # TODO: perhaps override assertEquals to call this for strings?
    if a == b:
        return
    prefix = message
    if prefix is None:
        prefix = "texts not equal:\n"
    raise AssertionError(prefix + self._ndiff_strings(a, b))
525
def assertEqualMode(self, mode, mode_test):
    """Assert that two file modes are equal, reporting both in octal."""
    failure_text = 'mode mismatch %o != %o' % (mode, mode_test)
    self.assertEqual(mode, mode_test, failure_text)
529
def assertStartsWith(self, s, prefix):
    """Asserts that s starts with prefix.

    :raises AssertionError: naming both strings when it does not.
    """
    if s.startswith(prefix):
        return
    raise AssertionError('string %r does not start with %r' % (s, prefix))
533
def assertEndsWith(self, s, suffix):
    """Asserts that s ends with suffix.

    :raises AssertionError: naming both strings when it does not.
    """
    if s.endswith(suffix):
        return
    raise AssertionError('string %r does not end with %r' % (s, suffix))
538
def assertContainsRe(self, haystack, needle_re):
    """Assert that haystack contains something matching needle_re.

    :param haystack: The text to search.
    :param needle_re: A regular expression, applied with re.search.
    :raises AssertionError: if there is no match anywhere in haystack.
    """
    if re.search(needle_re, haystack) is None:
        raise AssertionError('pattern "%s" not found in "%s"'
                             % (needle_re, haystack))
544
def assertNotContainsRe(self, haystack, needle_re):
    """Assert that nothing in haystack matches needle_re.

    :param haystack: The text to search.
    :param needle_re: A regular expression, applied with re.search.
    :raises AssertionError: if a match is found anywhere in haystack.
    """
    if re.search(needle_re, haystack) is not None:
        raise AssertionError('pattern "%s" found in "%s"'
                             % (needle_re, haystack))
550
def assertSubset(self, sublist, superlist):
    """Assert that every entry in sublist is present in superlist.

    :raises AssertionError: listing, in sublist order, every entry that
        is absent from superlist.
    """
    absent = [entry for entry in sublist if entry not in superlist]
    if absent:
        raise AssertionError("value(s) %r not present in container %r" %
                             (absent, superlist))
560
def assertIs(self, left, right):
    """Fail unless left and right are the very same object.

    :raises AssertionError: showing the repr of both objects otherwise.
    """
    if left is right:
        return
    raise AssertionError("%r is not %r." % (left, right))
564
def assertTransportMode(self, transport, path, mode):
    """Fail if a path does not have mode mode.

    If modes are not supported on this transport (its
    _can_roundtrip_unix_modebits() is false), the assertion is ignored
    and the path is not even stat'ed.
    """
    if transport._can_roundtrip_unix_modebits():
        # Only the permission bits are compared, not the file-type bits.
        actual_mode = stat.S_IMODE(transport.stat(path).st_mode)
        self.assertEqual(mode, actual_mode,
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
576
def assertIsInstance(self, obj, kls):
    """Fail if obj is not an instance of kls.

    The failure is reported through self.fail and names the object, its
    actual class and the expected class.
    """
    if isinstance(obj, kls):
        return
    self.fail("%r is an instance of %s rather than %s" % (
        obj, obj.__class__, kls))
582
def _capture_warnings(self, a_callable, *args, **kwargs):
583
"""A helper for callDeprecated and applyDeprecated.
585
:param a_callable: A callable to call.
586
:param args: The positional arguments for the callable
587
:param kwargs: The keyword arguments for the callable
588
:return: A tuple (warnings, result). result is the result of calling
589
a_callable(*args, **kwargs).
592
def capture_warnings(msg, cls, stacklevel=None):
593
# we've hooked into a deprecation specific callpath,
594
# only deprecations should getting sent via it.
595
self.assertEqual(cls, DeprecationWarning)
596
local_warnings.append(msg)
597
original_warning_method = symbol_versioning.warn
598
symbol_versioning.set_warning_method(capture_warnings)
600
result = a_callable(*args, **kwargs)
602
symbol_versioning.set_warning_method(original_warning_method)
603
return (local_warnings, result)
605
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
606
"""Call a deprecated callable without warning the user.
608
:param deprecation_format: The deprecation format that the callable
609
should have been deprecated with. This is the same type as the
610
parameter to deprecated_method/deprecated_function. If the
611
callable is not deprecated with this format, an assertion error
613
:param a_callable: A callable to call. This may be a bound method or
614
a regular function. It will be called with *args and **kwargs.
615
:param args: The positional arguments for the callable
616
:param kwargs: The keyword arguments for the callable
617
:return: The result of a_callable(*args, **kwargs)
619
call_warnings, result = self._capture_warnings(a_callable,
621
expected_first_warning = symbol_versioning.deprecation_string(
622
a_callable, deprecation_format)
623
if len(call_warnings) == 0:
624
self.fail("No assertion generated by call to %s" %
626
self.assertEqual(expected_first_warning, call_warnings[0])
629
def callDeprecated(self, expected, callable, *args, **kwargs):
630
"""Assert that a callable is deprecated in a particular way.
632
This is a very precise test for unusual requirements. The
633
applyDeprecated helper function is probably more suited for most tests
634
as it allows you to simply specify the deprecation format being used
635
and will ensure that that is issued for the function being called.
637
:param expected: a list of the deprecation warnings expected, in order
638
:param callable: The callable to call
639
:param args: The positional arguments for the callable
640
:param kwargs: The keyword arguments for the callable
642
call_warnings, result = self._capture_warnings(callable,
644
self.assertEqual(expected, call_warnings)
647
def _startLogFile(self):
    """Send bzr and test log messages to a temporary file.

    A fresh 'testbzr*.log' temporary file is opened read/write and handed
    to bzrlib.trace.enable_test_log. self._finishLogFile is registered as
    a cleanup, so the file is removed as the test is torn down.
    """
    log_fd, log_path = tempfile.mkstemp(suffix='.log', prefix='testbzr')
    self._log_file = os.fdopen(log_fd, 'w+')
    self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
    self._log_file_name = log_path
    self.addCleanup(self._finishLogFile)
658
def _finishLogFile(self):
    """Finished with the log file.

    Read contents into memory (self._log_contents), close, and delete.
    Does nothing when self._log_file is already None.
    """
    log_file = self._log_file
    if log_file is None:
        return
    bzrlib.trace.disable_test_log(self._log_nonce)
    # Rewind first: the handle is positioned wherever logging left it.
    log_file.seek(0)
    self._log_contents = log_file.read()
    log_file.close()
    os.remove(self._log_file_name)
    self._log_file = self._log_file_name = None
672
def addCleanup(self, callable):
    """Arrange to run a callable when this case is torn down.

    Callables are run in the reverse of the order they are registered,
    ie last-in first-out.

    :raises ValueError: if the same callable is registered twice.
    """
    registered = self._cleanups
    if callable in registered:
        raise ValueError("cleanup function %r already registered on %s"
                         % (callable, self))
    registered.append(callable)
683
def _cleanEnvironment(self):
686
'APPDATA': os.getcwd(),
688
'BZREMAIL': None, # may still be present in the environment
690
'BZR_PROGRESS_BAR': None,
693
self.addCleanup(self._restoreEnvironment)
694
for name, value in new_env.iteritems():
695
self._captureVar(name, value)
697
def _captureVar(self, name, newvalue):
    """Set an environment variable, and reset it when finished.

    Whatever osutils.set_or_unset_env returns (presumably the previous
    value -- confirm against osutils) is stored in self.__old_env under
    name; _restoreEnvironment later passes it back.
    """
    previous = osutils.set_or_unset_env(name, newvalue)
    self.__old_env[name] = previous
701
def _restoreEnvironment(self):
    """Pass every value saved in self.__old_env back to set_or_unset_env."""
    saved = self.__old_env
    for name in saved:
        osutils.set_or_unset_env(name, saved[name])
707
unittest.TestCase.tearDown(self)
709
def time(self, callable, *args, **kwargs):
710
"""Run callable and accrue the time it takes to the benchmark time.
712
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
713
this will cause lsprofile statistics to be gathered and stored in
716
if self._benchtime is None:
720
if not self._gather_lsprof_in_benchmarks:
721
return callable(*args, **kwargs)
723
# record this benchmark
724
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
726
self._benchcalls.append(((callable, args, kwargs), stats))
729
self._benchtime += time.time() - start
731
def _runCleanups(self):
732
"""Run registered cleanup functions.
734
This should only be called from TestCase.tearDown.
736
# TODO: Perhaps this should keep running cleanups even if
738
for cleanup_fn in reversed(self._cleanups):
741
def log(self, *args):
745
"""Return as a string the log for this test"""
746
if self._log_file_name:
747
return open(self._log_file_name).read()
749
return self._log_contents
750
# TODO: Delete the log after it's been read in
752
def capture(self, cmd, retcode=0):
    """Shortcut that splits cmd into words, runs, and returns stdout.

    :param cmd: A command line as a single string; it is split on
        whitespace, so arguments cannot themselves contain spaces.
    :param retcode: Passed through to run_bzr_captured.
    """
    argv = cmd.split()
    captured = self.run_bzr_captured(argv, retcode=retcode)
    return captured[0]
756
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None):
757
"""Invoke bzr and return (stdout, stderr).
759
Useful for code that wants to check the contents of the
760
output, the way error messages are presented, etc.
762
This should be the main method for tests that want to exercise the
763
overall behavior of the bzr application (rather than a unit test
764
or a functional test of the library.)
766
Much of the old code runs bzr by forking a new copy of Python, but
767
that is slower, harder to debug, and generally not necessary.
769
This runs bzr through the interface that catches and reports
770
errors, and with logging set to something approximating the
771
default, so that error reporting can be checked.
773
:param argv: arguments to invoke bzr
774
:param retcode: expected return code, or None for don't-care.
775
:param encoding: encoding for sys.stdout and sys.stderr
776
:param stdin: A string to be used as stdin for the command.
779
encoding = bzrlib.user_encoding
780
if stdin is not None:
781
stdin = StringIO(stdin)
782
stdout = StringIOWrapper()
783
stderr = StringIOWrapper()
784
stdout.encoding = encoding
785
stderr.encoding = encoding
787
self.log('run bzr: %r', argv)
788
# FIXME: don't call into logging here
789
handler = logging.StreamHandler(stderr)
790
handler.setLevel(logging.INFO)
791
logger = logging.getLogger('')
792
logger.addHandler(handler)
793
old_ui_factory = bzrlib.ui.ui_factory
794
bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
797
bzrlib.ui.ui_factory.stdin = stdin
799
result = self.apply_redirected(stdin, stdout, stderr,
800
bzrlib.commands.run_bzr_catch_errors,
803
logger.removeHandler(handler)
804
bzrlib.ui.ui_factory = old_ui_factory
806
out = stdout.getvalue()
807
err = stderr.getvalue()
809
self.log('output:\n%r', out)
811
self.log('errors:\n%r', err)
812
if retcode is not None:
813
self.assertEquals(retcode, result)
816
def run_bzr(self, *args, **kwargs):
    """Invoke bzr, as if it were run from the command line.

    This should be the main method for tests that want to exercise the
    overall behavior of the bzr application (rather than a unit test
    or a functional test of the library.)

    The positional arguments become the argument tuple handed to
    run_bzr_captured, whose result is returned unchanged.

    :param retcode: Expected return code; defaults to 0.
    :param encoding: Passed through to run_bzr_captured; defaults to None.
    :param stdin: A string to be used as stdin for the command.
    """
    # Any other keyword arguments are silently ignored.
    captured_kwargs = {
        'retcode': kwargs.pop('retcode', 0),
        'encoding': kwargs.pop('encoding', None),
        'stdin': kwargs.pop('stdin', None),
    }
    return self.run_bzr_captured(args, **captured_kwargs)
833
def run_bzr_decode(self, *args, **kwargs):
    """Run bzr through run_bzr and return its decoded stdout.

    :param encoding: Optional keyword; the encoding used to decode the
        output. It is also forwarded to run_bzr with the other keyword
        arguments. Defaults to bzrlib.user_encoding for decoding.
    :return: The first element of run_bzr's result, decoded.
    """
    if 'encoding' not in kwargs:
        encoding = bzrlib.user_encoding
    else:
        encoding = kwargs['encoding']
    out = self.run_bzr(*args, **kwargs)[0]
    return out.decode(encoding)
840
def run_bzr_error(self, error_regexes, *args, **kwargs):
    """Run bzr, and check that stderr contains the supplied regexes

    :param error_regexes: Sequence of regular expressions which
        must each be found in the error output. Each one is checked
        independently with assertContainsRe.
    :param args: command-line arguments for bzr
    :param kwargs: Keyword arguments which are interpreted by run_bzr
        This function changes the default value of retcode to be 3,
        since in most cases this is run when you expect bzr to fail.
    :return: (out, err) The actual output of running the command (in case you
        want to do more inspection)

    Examples of use:
        # Make sure that commit is failing because there is nothing to do
        self.run_bzr_error(['no changes to commit'],
                           'commit', '-m', 'my commit comment')
        # Make sure --strict is handling an unknown file, rather than
        # giving us the 'nothing to do' error
        self.build_tree(['unknown'])
        self.run_bzr_error(['Commit refused because there are unknown files'],
                           'commit', '--strict', '-m', 'my commit comment')
    """
    if 'retcode' not in kwargs:
        kwargs['retcode'] = 3
    out, err = self.run_bzr(*args, **kwargs)
    for pattern in error_regexes:
        self.assertContainsRe(err, pattern)
    return out, err
869
def run_bzr_subprocess(self, *args, **kwargs):
870
"""Run bzr in a subprocess for testing.
872
This starts a new Python interpreter and runs bzr in there.
873
This should only be used for tests that have a justifiable need for
874
this isolation: e.g. they are testing startup time, or signal
875
handling, or early startup code, etc. Subprocess code can't be
876
profiled or debugged so easily.
878
:param retcode: The status code that is expected. Defaults to 0. If
879
None is supplied, the status code is not checked.
880
:param env_changes: A dictionary which lists changes to environment
881
variables. A value of None will unset the env variable.
882
The values must be strings. The change will only occur in the
883
child, so you don't need to fix the environment after running.
884
:param universal_newlines: Convert CRLF => LF
886
env_changes = kwargs.get('env_changes', {})
890
def cleanup_environment():
891
for env_var, value in env_changes.iteritems():
892
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
894
def restore_environment():
895
for env_var, value in old_env.iteritems():
896
osutils.set_or_unset_env(env_var, value)
898
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
902
# win32 subprocess doesn't support preexec_fn
903
# so we will avoid using it on all platforms, just to
904
# make sure the code path is used, and we don't break on win32
905
cleanup_environment()
906
process = Popen([sys.executable, bzr_path]+args,
907
stdout=PIPE, stderr=PIPE)
909
restore_environment()
911
out = process.stdout.read()
912
err = process.stderr.read()
914
if kwargs.get('universal_newlines', False):
915
out = out.replace('\r\n', '\n')
916
err = err.replace('\r\n', '\n')
918
retcode = process.wait()
919
supplied_retcode = kwargs.get('retcode', 0)
920
if supplied_retcode is not None:
921
assert supplied_retcode == retcode
924
def check_inventory_shape(self, inv, shape):
925
"""Compare an inventory to a list of expected names.
927
Fail if they are not precisely equal.
930
shape = list(shape) # copy
931
for path, ie in inv.entries():
932
name = path.replace('\\', '/')
940
self.fail("expected paths not found in inventory: %r" % shape)
942
self.fail("unexpected paths found in inventory: %r" % extras)
944
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
                     a_callable=None, *args, **kwargs):
    """Call callable with redirected std io pipes.

    sys.stdin, sys.stdout and sys.stderr are replaced for the duration of
    the call and always restored afterwards, even if a_callable raises.

    :param stdin: File-like object to install as sys.stdin; defaults to
        an empty StringIO.
    :param stdout: File-like object to install as sys.stdout; defaults to
        the test's log file when one is open, else a fresh StringIO.
    :param stderr: File-like object to install as sys.stderr; same
        default as stdout.
    :param a_callable: The callable to invoke with *args and **kwargs.
    :return: Whatever a_callable returns (the return code).
    :raises ValueError: if a_callable is not callable.
    """
    if not callable(a_callable):
        raise ValueError("a_callable must be callable.")
    log_file = getattr(self, "_log_file", None)
    if stdin is None:
        stdin = StringIO("")
    if stdout is None:
        if log_file is not None:
            stdout = log_file
        else:
            stdout = StringIO()
    if stderr is None:
        # This used to read getattr(self, "_log_file", None is not None):
        # the misplaced parenthesis turned the None check into a
        # truthiness test. Use the same check as the stdout branch.
        if log_file is not None:
            stderr = log_file
        else:
            stderr = StringIO()
    real_stdin = sys.stdin
    real_stdout = sys.stdout
    real_stderr = sys.stderr
    try:
        sys.stdout = stdout
        sys.stderr = stderr
        sys.stdin = stdin
        return a_callable(*args, **kwargs)
    finally:
        sys.stdout = real_stdout
        sys.stderr = real_stderr
        sys.stdin = real_stdin
976
@symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
977
def merge(self, branch_from, wt_to):
978
"""A helper for tests to do a ui-less merge.
980
This should move to the main library when someone has time to integrate
983
# minimal ui-less merge.
984
wt_to.branch.fetch(branch_from)
985
base_rev = common_ancestor(branch_from.last_revision(),
986
wt_to.branch.last_revision(),
987
wt_to.branch.repository)
988
merge_inner(wt_to.branch, branch_from.basis_tree(),
989
wt_to.branch.repository.revision_tree(base_rev),
991
wt_to.add_parent_tree_id(branch_from.last_revision())
994
BzrTestBase = TestCase
997
class TestCaseInTempDir(TestCase):
998
"""Derived class that runs a test within a temporary directory.
1000
This is useful for tests that need to create a branch, etc.
1002
The directory is created in a slightly complex way: for each
1003
Python invocation, a new temporary top-level directory is created.
1004
All test cases create their own directory within that. If the
1005
tests complete successfully, the directory is removed.
1007
InTempDir is an old alias for FunctionalTestCase.
1012
OVERRIDE_PYTHON = 'python'
1014
def check_file_contents(self, filename, expect):
    """Fail the test unless the file at filename contains exactly expect.

    The check, and on mismatch both the expected and the actual contents,
    are written to the test log.

    :param filename: Path of the file to read (opened in text mode).
    :param expect: The exact contents the file should have.
    """
    self.log("check contents of file %s" % filename)
    # Close the handle explicitly instead of relying on garbage
    # collection, so the file is never left open after the check.
    f = open(filename, 'r')
    try:
        contents = f.read()
    finally:
        f.close()
    if contents != expect:
        self.log("expected: %r" % expect)
        self.log("actually: %r" % contents)
        self.fail("contents of %s not as expected" % filename)
1022
def _make_test_root(self):
1023
if TestCaseInTempDir.TEST_ROOT is not None:
1027
root = u'test%04d.tmp' % i
1031
if e.errno == errno.EEXIST:
1036
# successfully created
1037
TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
1039
# make a fake bzr directory there to prevent any tests propagating
1040
# up onto the source directory's real branch
1041
bzrdir.BzrDir.create_standalone_workingtree(TestCaseInTempDir.TEST_ROOT)
1044
super(TestCaseInTempDir, self).setUp()
1045
self._make_test_root()
1046
_currentdir = os.getcwdu()
1047
# shorten the name, to avoid test failures due to path length
1048
short_id = self.id().replace('bzrlib.tests.', '') \
1049
.replace('__main__.', '')[-100:]
1050
# it's possible the same test class is run several times for
1051
# parameterized tests, so make sure the names don't collide.
1055
candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
1057
candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
1058
if os.path.exists(candidate_dir):
1062
os.mkdir(candidate_dir)
1063
self.test_home_dir = candidate_dir + '/home'
1064
os.mkdir(self.test_home_dir)
1065
self.test_dir = candidate_dir + '/work'
1066
os.mkdir(self.test_dir)
1067
os.chdir(self.test_dir)
1069
os.environ['HOME'] = self.test_home_dir
1070
os.environ['APPDATA'] = self.test_home_dir
1071
def _leaveDirectory():
1072
os.chdir(_currentdir)
1073
self.addCleanup(_leaveDirectory)
1075
def build_tree(self, shape, line_endings='native', transport=None):
1076
"""Build a test tree according to a pattern.
1078
shape is a sequence of file specifications. If the final
1079
character is '/', a directory is created.
1081
This assumes that all the elements in the tree being built are new.
1083
This doesn't add anything to a branch.
1084
:param line_endings: Either 'binary' or 'native'
1085
in binary mode, exact contents are written
1086
in native mode, the line endings match the
1087
default platform endings.
1089
:param transport: A transport to write to, for building trees on
1090
VFS's. If the transport is readonly or None,
1091
"." is opened automatically.
1093
# It's OK to just create them using forward slashes on windows.
1094
if transport is None or transport.is_readonly():
1095
transport = get_transport(".")
1097
self.assert_(isinstance(name, basestring))
1099
transport.mkdir(urlutils.escape(name[:-1]))
1101
if line_endings == 'binary':
1103
elif line_endings == 'native':
1106
raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
1107
content = "contents of %s%s" % (name.encode('utf-8'), end)
1108
# Technically 'put()' is the right command. However, put
1109
# uses an AtomicFile, which requires an extra rename into place
1110
# As long as the files didn't exist in the past, append() will
1111
# do the same thing as put()
1112
# On jam's machine, make_kernel_like_tree is:
1113
# put: 4.5-7.5s (averaging 6s)
1115
# put_non_atomic: 2.9-4.5s
1116
transport.put_bytes_non_atomic(urlutils.escape(name), content)
1118
def build_tree_contents(self, shape):
    """Convenience method forwarding shape to build_tree_contents().

    The bare name in the body is looked up outside the class, so this
    presumably reaches the module-level build_tree_contents() function
    rather than this method -- confirm against the rest of the file.

    :param shape: Passed unchanged to build_tree_contents().
    """
    build_tree_contents(shape)
1121
def failUnlessExists(self, path):
    """Fail unless path, which may be abs or relative, exists.

    Existence is checked with osutils.lexists().

    :param path: Absolute or relative path that must exist.
    """
    # Name the offending path in the failure; without a message the test
    # report only shows a bare assertion failure.
    self.failUnless(osutils.lexists(path), "%s does not exist" % (path,))
1125
def failIfExists(self, path):
    """Fail if path, which may be abs or relative, exists.

    Existence is checked with osutils.lexists().

    :param path: Absolute or relative path that must not exist.
    """
    # Name the offending path in the failure; without a message the test
    # report only shows a bare assertion failure.
    self.failIf(osutils.lexists(path), "%s exists" % (path,))
1129
def assertFileEqual(self, content, path):
    """Fail if path does not contain 'content'.

    The path must exist (checked with osutils.lexists()); its contents are
    then read in text mode and compared with assertEqualDiff().

    :param content: The expected contents of the file.
    :param path: Path of the file to read.
    """
    self.failUnless(osutils.lexists(path), "%s does not exist" % (path,))
    # TODO: jam 20060427 Shouldn't this be 'rb'?
    # Close the handle explicitly instead of leaking it until garbage
    # collection, as the previous open(...).read() one-liner did.
    f = open(path, 'r')
    try:
        actual = f.read()
    finally:
        f.close()
    self.assertEqualDiff(content, actual)
1136
class TestCaseWithTransport(TestCaseInTempDir):
1137
"""A test case that provides get_url and get_readonly_url facilities.
1139
These back onto two transport servers, one for readonly access and one for
1142
If no explicit class is provided for readonly access, a
1143
ReadonlyTransportDecorator is used instead which allows the use of non disk
1144
based read write transports.
1146
If an explicit class is provided for readonly access, that server and the
1147
readwrite one must both define get_url() as resolving to os.getcwd().
1150
def __init__(self, methodName='testMethod'):
    """Set up the test case with no transport servers created yet.

    :param methodName: Name of the test method to run; handed on to the
        parent class constructor.
    """
    super(TestCaseWithTransport, self).__init__(methodName)
    # Server classes: read/write defaults to the module-level
    # default_transport, and no explicit readonly class is selected.
    self.transport_readonly_server = None
    self.transport_server = default_transport
    # Server instances: none exist at construction time.
    self.__server = None
    self.__readonly_server = None
1157
def get_readonly_url(self, relpath=None):
1158
"""Get a URL for the readonly transport.
1160
This will either be backed by '.' or a decorator to the transport
1161
used by self.get_url()
1162
relpath provides for clients to get a path relative to the base url.
1163
These should only be downwards relative, not upwards.
1165
base = self.get_readonly_server().get_url()
1166
if relpath is not None:
1167
if not base.endswith('/'):
1169
base = base + relpath
1172
def get_readonly_server(self):
1173
"""Get the server instance for the readonly transport
1175
This is useful for some tests with specific servers to do diagnostics.
1177
if self.__readonly_server is None:
1178
if self.transport_readonly_server is None:
1179
# readonly decorator requested
1180
# bring up the server
1182
self.__readonly_server = ReadonlyServer()
1183
self.__readonly_server.setUp(self.__server)
1185
self.__readonly_server = self.transport_readonly_server()
1186
self.__readonly_server.setUp()
1187
self.addCleanup(self.__readonly_server.tearDown)
1188
return self.__readonly_server
1190
def get_server(self):
    """Return the read/write server instance, starting it on first use.

    On the first call the server is created from self.transport_server,
    set up, and its tearDown is registered as a cleanup.  Later calls
    return the same instance.
    """
    if self.__server is not None:
        return self.__server
    started = self.transport_server()
    # Remember the instance before setUp() runs, as the attribute is
    # assigned first and only then set up.
    self.__server = started
    started.setUp()
    self.addCleanup(started.tearDown)
    return started
1202
def get_url(self, relpath=None):
1203
"""Get a URL for the readwrite transport.
1205
This will either be backed by '.' or to an equivalent non-file based
1207
relpath provides for clients to get a path relative to the base url.
1208
These should only be downwards relative, not upwards.
1210
base = self.get_server().get_url()
1211
if relpath is not None and relpath != '.':
1212
if not base.endswith('/'):
1214
base = base + urlutils.escape(relpath)
1217
def get_transport(self):
1218
"""Return a writeable transport for the test scratch space"""
1219
t = get_transport(self.get_url())
1220
self.assertFalse(t.is_readonly())
1223
def get_readonly_transport(self):
1224
"""Return a readonly transport for the test scratch space
1226
This can be used to test that operations which should only need
1227
readonly access in fact do not try to write.
1229
t = get_transport(self.get_readonly_url())
1230
self.assertTrue(t.is_readonly())
1233
def make_branch(self, relpath, format=None):
    """Make a repository at relpath and create a branch in its bzrdir.

    :param relpath: Location handed to make_repository().
    :param format: Passed through to make_repository() as format.
    :return: The result of create_branch() on the repository's bzrdir.
    """
    control = self.make_repository(relpath, format=format).bzrdir
    return control.create_branch()
1238
def make_bzrdir(self, relpath, format=None):
1240
url = self.get_url(relpath)
1241
mutter('relpath %r => url %r', relpath, url)
1242
segments = url.split('/')
1243
if segments and segments[-1] not in ('', '.'):
1244
parent = '/'.join(segments[:-1])
1245
t = get_transport(parent)
1247
t.mkdir(segments[-1])
1248
except errors.FileExists:
1251
format=bzrlib.bzrdir.BzrDirFormat.get_default_format()
1252
# FIXME: make this use a single transport someday. RBC 20060418
1253
return format.initialize_on_transport(get_transport(relpath))
1254
except errors.UninitializableFormat:
1255
raise TestSkipped("Format %s is not initializable." % format)
1257
def make_repository(self, relpath, shared=False, format=None):
    """Make a bzrdir at relpath and create a repository inside it.

    :param relpath: Location handed to make_bzrdir().
    :param shared: Passed to create_repository() as shared.
    :param format: Passed through to make_bzrdir() as format.
    :return: The result of create_repository() on the new bzrdir.
    """
    return self.make_bzrdir(relpath, format=format).create_repository(
        shared=shared)
1262
def make_branch_and_tree(self, relpath, format=None):
1263
"""Create a branch on the transport and a tree locally.
1267
# TODO: always use the local disk path for the working tree,
1268
# this obviously requires a format that supports branch references
1269
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
1271
b = self.make_branch(relpath, format=format)
1273
return b.bzrdir.create_workingtree()
1274
except errors.NotLocalUrl:
1275
# new formats - catch No tree error and create
1276
# a branch reference and a checkout.
1277
# old formats at that point - raise TestSkipped.
1278
# TODO: rbc 20060208
1279
return WorkingTreeFormat2().initialize(bzrdir.BzrDir.open(relpath))
1281
def assertIsDirectory(self, relpath, transport):
1282
"""Assert that relpath within transport is a directory.
1284
This may not be possible on all transports; in that case it propagates
1285
a TransportNotPossible.
1288
mode = transport.stat(relpath).st_mode
1289
except errors.NoSuchFile:
1290
self.fail("path %s is not a directory; no such file"
1292
if not stat.S_ISDIR(mode):
1293
self.fail("path %s is not a directory; has mode %#o"
1297
class ChrootedTestCase(TestCaseWithTransport):
1298
"""A support class that provides readonly urls outside the local namespace.
1300
This is done by checking if self.transport_server is a MemoryServer. If it
1301
is then we are chrooted already, if it is not then an HttpServer is used
1304
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
1305
be used without needing to redo it when a different
1306
subclass is in use ?
1310
super(ChrootedTestCase, self).setUp()
1311
if not self.transport_server == bzrlib.transport.memory.MemoryServer:
1312
self.transport_readonly_server = bzrlib.transport.http.HttpServer
1315
def filter_suite_by_re(suite, pattern):
1316
result = TestUtil.TestSuite()
1317
filter_re = re.compile(pattern)
1318
for test in iter_suite_tests(suite):
1319
if filter_re.search(test.id()):
1320
result.addTest(test)
1324
def run_suite(suite, name='test', verbose=False, pattern=".*",
1325
stop_on_failure=False, keep_output=False,
1326
transport=None, lsprof_timed=None, bench_history=None):
1327
TestCaseInTempDir._TEST_NAME = name
1328
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1334
pb = progress.ProgressBar()
1335
runner = TextTestRunner(stream=sys.stdout,
1337
verbosity=verbosity,
1338
keep_output=keep_output,
1340
bench_history=bench_history)
1341
runner.stop_on_failure=stop_on_failure
1343
suite = filter_suite_by_re(suite, pattern)
1344
result = runner.run(suite)
1345
return result.wasSuccessful()
1348
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
1351
test_suite_factory=None,
1353
bench_history=None):
1354
"""Run the whole test suite under the enhanced runner"""
1355
# XXX: Very ugly way to do this...
1356
# Disable warning about old formats because we don't want it to disturb
1357
# any blackbox tests.
1358
from bzrlib import repository
1359
repository._deprecation_warning_done = True
1361
global default_transport
1362
if transport is None:
1363
transport = default_transport
1364
old_transport = default_transport
1365
default_transport = transport
1367
if test_suite_factory is None:
1368
suite = test_suite()
1370
suite = test_suite_factory()
1371
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
1372
stop_on_failure=stop_on_failure, keep_output=keep_output,
1373
transport=transport,
1374
lsprof_timed=lsprof_timed,
1375
bench_history=bench_history)
1377
default_transport = old_transport
1381
"""Build and return TestSuite for the whole of bzrlib.
1383
This function can be replaced if you need to change the default test
1384
suite on a global basis, but it is not encouraged.
1387
'bzrlib.tests.test_ancestry',
1388
'bzrlib.tests.test_api',
1389
'bzrlib.tests.test_atomicfile',
1390
'bzrlib.tests.test_bad_files',
1391
'bzrlib.tests.test_branch',
1392
'bzrlib.tests.test_bundle',
1393
'bzrlib.tests.test_bzrdir',
1394
'bzrlib.tests.test_cache_utf8',
1395
'bzrlib.tests.test_command',
1396
'bzrlib.tests.test_commit',
1397
'bzrlib.tests.test_commit_merge',
1398
'bzrlib.tests.test_config',
1399
'bzrlib.tests.test_conflicts',
1400
'bzrlib.tests.test_decorators',
1401
'bzrlib.tests.test_diff',
1402
'bzrlib.tests.test_doc_generate',
1403
'bzrlib.tests.test_errors',
1404
'bzrlib.tests.test_escaped_store',
1405
'bzrlib.tests.test_fetch',
1406
'bzrlib.tests.test_gpg',
1407
'bzrlib.tests.test_graph',
1408
'bzrlib.tests.test_hashcache',
1409
'bzrlib.tests.test_http',
1410
'bzrlib.tests.test_http_response',
1411
'bzrlib.tests.test_identitymap',
1412
'bzrlib.tests.test_ignores',
1413
'bzrlib.tests.test_inv',
1414
'bzrlib.tests.test_knit',
1415
'bzrlib.tests.test_lazy_import',
1416
'bzrlib.tests.test_lockdir',
1417
'bzrlib.tests.test_lockable_files',
1418
'bzrlib.tests.test_log',
1419
'bzrlib.tests.test_merge',
1420
'bzrlib.tests.test_merge3',
1421
'bzrlib.tests.test_merge_core',
1422
'bzrlib.tests.test_missing',
1423
'bzrlib.tests.test_msgeditor',
1424
'bzrlib.tests.test_nonascii',
1425
'bzrlib.tests.test_options',
1426
'bzrlib.tests.test_osutils',
1427
'bzrlib.tests.test_patch',
1428
'bzrlib.tests.test_patches',
1429
'bzrlib.tests.test_permissions',
1430
'bzrlib.tests.test_plugins',
1431
'bzrlib.tests.test_progress',
1432
'bzrlib.tests.test_reconcile',
1433
'bzrlib.tests.test_repository',
1434
'bzrlib.tests.test_revert',
1435
'bzrlib.tests.test_revision',
1436
'bzrlib.tests.test_revisionnamespaces',
1437
'bzrlib.tests.test_revisiontree',
1438
'bzrlib.tests.test_rio',
1439
'bzrlib.tests.test_sampler',
1440
'bzrlib.tests.test_selftest',
1441
'bzrlib.tests.test_setup',
1442
'bzrlib.tests.test_sftp_transport',
1443
'bzrlib.tests.test_ftp_transport',
1444
'bzrlib.tests.test_smart_add',
1445
'bzrlib.tests.test_source',
1446
'bzrlib.tests.test_status',
1447
'bzrlib.tests.test_store',
1448
'bzrlib.tests.test_symbol_versioning',
1449
'bzrlib.tests.test_testament',
1450
'bzrlib.tests.test_textfile',
1451
'bzrlib.tests.test_textmerge',
1452
'bzrlib.tests.test_trace',
1453
'bzrlib.tests.test_transactions',
1454
'bzrlib.tests.test_transform',
1455
'bzrlib.tests.test_transport',
1456
'bzrlib.tests.test_tree',
1457
'bzrlib.tests.test_tsort',
1458
'bzrlib.tests.test_tuned_gzip',
1459
'bzrlib.tests.test_ui',
1460
'bzrlib.tests.test_upgrade',
1461
'bzrlib.tests.test_urlutils',
1462
'bzrlib.tests.test_versionedfile',
1463
'bzrlib.tests.test_version',
1464
'bzrlib.tests.test_weave',
1465
'bzrlib.tests.test_whitebox',
1466
'bzrlib.tests.test_workingtree',
1467
'bzrlib.tests.test_xml',
1469
test_transport_implementations = [
1470
'bzrlib.tests.test_transport_implementations',
1471
'bzrlib.tests.test_read_bundle',
1473
suite = TestUtil.TestSuite()
1474
loader = TestUtil.TestLoader()
1475
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
1476
from bzrlib.transport import TransportTestProviderAdapter
1477
adapter = TransportTestProviderAdapter()
1478
adapt_modules(test_transport_implementations, adapter, loader, suite)
1479
for package in packages_to_test():
1480
suite.addTest(package.test_suite())
1481
for m in MODULES_TO_TEST:
1482
suite.addTest(loader.loadTestsFromModule(m))
1483
for m in MODULES_TO_DOCTEST:
1485
suite.addTest(doctest.DocTestSuite(m))
1486
except ValueError, e:
1487
print '**failed to get doctest for: %s\n%s' %(m,e)
1489
for name, plugin in bzrlib.plugin.all_plugins().items():
1490
if getattr(plugin, 'test_suite', None) is not None:
1491
suite.addTest(plugin.test_suite())
1495
def adapt_modules(mods_list, adapter, loader, suite):
    """Load the tests from mods_list, adapt each one and add it to suite.

    :param mods_list: Module names given to
        loader.loadTestsFromModuleNames().
    :param adapter: Object whose adapt(test) result is passed to
        suite.addTests().
    :param loader: Test loader providing loadTestsFromModuleNames().
    :param suite: Suite that receives the adapted tests.
    """
    loaded = loader.loadTestsFromModuleNames(mods_list)
    for case in iter_suite_tests(loaded):
        adapted = adapter.adapt(case)
        suite.addTests(adapted)