1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
# TODO: Perhaps there should be an API to find out if bzr running under the
19
# test suite -- some plugins might want to avoid making intrusive changes if
20
# this is the case. However, we want behaviour under to test to diverge as
21
# little as possible, so this should be used rarely if it's added at all.
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
# NOTE: Some classes in here use camelCaseNaming() rather than
25
# underscore_naming(). That's for consistency with unittest; it's not the
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
# new assertFoo() methods.
30
from cStringIO import StringIO
39
from subprocess import Popen, PIPE
56
import bzrlib.commands
57
import bzrlib.bundle.serializer
59
import bzrlib.inventory
60
import bzrlib.iterablefile
65
# lsprof not available
67
from bzrlib.merge import merge_inner
71
from bzrlib.revision import common_ancestor
73
from bzrlib import symbol_versioning
75
from bzrlib.transport import get_transport
76
import bzrlib.transport
77
from bzrlib.transport.local import LocalURLServer
78
from bzrlib.transport.memory import MemoryServer
79
from bzrlib.transport.readonly import ReadonlyServer
80
from bzrlib.trace import mutter, note
81
from bzrlib.tests import TestUtil
82
from bzrlib.tests.HttpServer import HttpServer
83
from bzrlib.tests.TestUtil import (
87
from bzrlib.tests.treeshape import build_tree_contents
88
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
90
default_transport = LocalURLServer
93
MODULES_TO_DOCTEST = [
94
bzrlib.bundle.serializer,
105
NUMBERED_DIRS = False # dirs kind for TestCaseInTempDir (numbered or named)
108
def packages_to_test():
109
"""Return a list of packages to test.
111
The packages are not globally imported so that import failures are
112
triggered when running selftest, not when importing the command.
115
import bzrlib.tests.blackbox
116
import bzrlib.tests.branch_implementations
117
import bzrlib.tests.bzrdir_implementations
118
import bzrlib.tests.interrepository_implementations
119
import bzrlib.tests.interversionedfile_implementations
120
import bzrlib.tests.intertree_implementations
121
import bzrlib.tests.repository_implementations
122
import bzrlib.tests.revisionstore_implementations
123
import bzrlib.tests.tree_implementations
124
import bzrlib.tests.workingtree_implementations
127
bzrlib.tests.blackbox,
128
bzrlib.tests.branch_implementations,
129
bzrlib.tests.bzrdir_implementations,
130
bzrlib.tests.interrepository_implementations,
131
bzrlib.tests.interversionedfile_implementations,
132
bzrlib.tests.intertree_implementations,
133
bzrlib.tests.repository_implementations,
134
bzrlib.tests.revisionstore_implementations,
135
bzrlib.tests.tree_implementations,
136
bzrlib.tests.workingtree_implementations,
140
class ExtendedTestResult(unittest._TextTestResult):
141
"""Accepts, reports and accumulates the results of running tests.
143
Compared to this unittest version this class adds support for profiling,
144
benchmarking, stopping as soon as a test fails, and skipping tests.
145
There are further-specialized subclasses for different types of display.
150
def __init__(self, stream, descriptions, verbosity,
154
"""Construct new TestResult.
156
:param bench_history: Optionally, a writable file object to accumulate
159
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
160
if bench_history is not None:
161
from bzrlib.version import _get_bzr_source_tree
162
src_tree = _get_bzr_source_tree()
165
revision_id = src_tree.get_parent_ids()[0]
167
# XXX: if this is a brand new tree, do the same as if there
171
# XXX: If there's no branch, what should we do?
173
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
174
self._bench_history = bench_history
175
self.ui = bzrlib.ui.ui_factory
176
self.num_tests = num_tests
178
self.failure_count = 0
181
self._overall_start_time = time.time()
183
def extractBenchmarkTime(self, testCase):
184
"""Add a benchmark time for the current test case."""
185
self._benchmarkTime = getattr(testCase, "_benchtime", None)
187
def _elapsedTestTimeString(self):
188
"""Return a time string for the overall time the current test has taken."""
189
return self._formatTime(time.time() - self._start_time)
191
def _testTimeString(self):
192
if self._benchmarkTime is not None:
194
self._formatTime(self._benchmarkTime),
195
self._elapsedTestTimeString())
197
return " %s" % self._elapsedTestTimeString()
199
def _formatTime(self, seconds):
200
"""Format seconds as milliseconds with leading spaces."""
201
# some benchmarks can take thousands of seconds to run, so we need 8
203
return "%8dms" % (1000 * seconds)
205
def _shortened_test_description(self, test):
207
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
210
def startTest(self, test):
211
unittest.TestResult.startTest(self, test)
212
self.report_test_start(test)
213
test.number = self.count
214
self._recordTestStartTime()
216
def _recordTestStartTime(self):
217
"""Record that a test has started."""
218
self._start_time = time.time()
220
def addError(self, test, err):
221
if isinstance(err[1], TestSkipped):
222
return self.addSkipped(test, err)
223
unittest.TestResult.addError(self, test, err)
224
# We can only do this if we have one of our TestCases, not if
226
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
227
if setKeepLogfile is not None:
229
self.extractBenchmarkTime(test)
230
self.report_error(test, err)
234
def addFailure(self, test, err):
235
unittest.TestResult.addFailure(self, test, err)
236
# We can only do this if we have one of our TestCases, not if
238
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
239
if setKeepLogfile is not None:
241
self.extractBenchmarkTime(test)
242
self.report_failure(test, err)
246
def addSuccess(self, test):
247
self.extractBenchmarkTime(test)
248
if self._bench_history is not None:
249
if self._benchmarkTime is not None:
250
self._bench_history.write("%s %s\n" % (
251
self._formatTime(self._benchmarkTime),
253
self.report_success(test)
254
unittest.TestResult.addSuccess(self, test)
256
def addSkipped(self, test, skip_excinfo):
257
self.extractBenchmarkTime(test)
258
self.report_skip(test, skip_excinfo)
259
# seems best to treat this as success from point-of-view of unittest
260
# -- it actually does nothing so it barely matters :)
263
except KeyboardInterrupt:
266
self.addError(test, test.__exc_info())
268
unittest.TestResult.addSuccess(self, test)
270
def printErrorList(self, flavour, errors):
271
for test, err in errors:
272
self.stream.writeln(self.separator1)
273
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
274
if getattr(test, '_get_log', None) is not None:
276
print >>self.stream, \
277
('vvvv[log from %s]' % test.id()).ljust(78,'-')
278
print >>self.stream, test._get_log()
279
print >>self.stream, \
280
('^^^^[log from %s]' % test.id()).ljust(78,'-')
281
self.stream.writeln(self.separator2)
282
self.stream.writeln("%s" % err)
287
def report_cleaning_up(self):
290
def report_success(self, test):
294
class TextTestResult(ExtendedTestResult):
295
"""Displays progress and results of tests in text form"""
297
def __init__(self, *args, **kw):
298
ExtendedTestResult.__init__(self, *args, **kw)
299
self.pb = self.ui.nested_progress_bar()
300
self.pb.show_pct = False
301
self.pb.show_spinner = False
302
self.pb.show_eta = False,
303
self.pb.show_count = False
304
self.pb.show_bar = False
306
def report_starting(self):
307
self.pb.update('[test 0/%d] starting...' % (self.num_tests))
309
def _progress_prefix_text(self):
310
a = '[%d' % self.count
311
if self.num_tests is not None:
312
a +='/%d' % self.num_tests
313
a += ' in %ds' % (time.time() - self._overall_start_time)
315
a += ', %d errors' % self.error_count
316
if self.failure_count:
317
a += ', %d failed' % self.failure_count
319
a += ', %d skipped' % self.skip_count
323
def report_test_start(self, test):
326
self._progress_prefix_text()
328
+ self._shortened_test_description(test))
330
def _test_description(self, test):
332
return '#%d %s' % (self.count,
333
self._shortened_test_description(test))
335
return self._shortened_test_description(test)
337
def report_error(self, test, err):
338
self.error_count += 1
339
self.pb.note('ERROR: %s\n %s\n',
340
self._test_description(test),
344
def report_failure(self, test, err):
345
self.failure_count += 1
346
self.pb.note('FAIL: %s\n %s\n',
347
self._test_description(test),
351
def report_skip(self, test, skip_excinfo):
354
# at the moment these are mostly not things we can fix
355
# and so they just produce stipple; use the verbose reporter
358
# show test and reason for skip
359
self.pb.note('SKIP: %s\n %s\n',
360
self._shortened_test_description(test),
363
# since the class name was left behind in the still-visible
365
self.pb.note('SKIP: %s', skip_excinfo[1])
367
def report_cleaning_up(self):
368
self.pb.update('cleaning up...')
374
class VerboseTestResult(ExtendedTestResult):
375
"""Produce long output, with one line per test run plus times"""
377
def _ellipsize_to_right(self, a_string, final_width):
378
"""Truncate and pad a string, keeping the right hand side"""
379
if len(a_string) > final_width:
380
result = '...' + a_string[3-final_width:]
383
return result.ljust(final_width)
385
def report_starting(self):
386
self.stream.write('running %d tests...\n' % self.num_tests)
388
def report_test_start(self, test):
390
name = self._shortened_test_description(test)
391
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
392
# numbers, plus a trailing blank
393
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
395
self.stream.write('%5d ' % self.count)
396
self.stream.write(self._ellipsize_to_right(name,
397
osutils.terminal_width()-36))
399
self.stream.write(self._ellipsize_to_right(name,
400
osutils.terminal_width()-30))
403
def _error_summary(self, err):
407
return '%s%s' % (indent, err[1])
409
def report_error(self, test, err):
410
self.error_count += 1
411
self.stream.writeln('ERROR %s\n%s'
412
% (self._testTimeString(),
413
self._error_summary(err)))
415
def report_failure(self, test, err):
416
self.failure_count += 1
417
self.stream.writeln(' FAIL %s\n%s'
418
% (self._testTimeString(),
419
self._error_summary(err)))
421
def report_success(self, test):
422
self.stream.writeln(' OK %s' % self._testTimeString())
423
for bench_called, stats in getattr(test, '_benchcalls', []):
424
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
425
stats.pprint(file=self.stream)
428
def report_skip(self, test, skip_excinfo):
430
self.stream.writeln(' SKIP %s\n%s'
431
% (self._testTimeString(),
432
self._error_summary(skip_excinfo)))
435
class TextTestRunner(object):
436
stop_on_failure = False
444
self.stream = unittest._WritelnDecorator(stream)
445
self.descriptions = descriptions
446
self.verbosity = verbosity
447
self.keep_output = keep_output
448
self._bench_history = bench_history
451
"Run the given test case or test suite."
452
startTime = time.time()
453
if self.verbosity == 1:
454
result_class = TextTestResult
455
elif self.verbosity >= 2:
456
result_class = VerboseTestResult
457
result = result_class(self.stream,
460
bench_history=self._bench_history,
461
num_tests=test.countTestCases(),
463
result.stop_early = self.stop_on_failure
464
result.report_starting()
466
stopTime = time.time()
467
timeTaken = stopTime - startTime
469
self.stream.writeln(result.separator2)
470
run = result.testsRun
471
self.stream.writeln("Ran %d test%s in %.3fs" %
472
(run, run != 1 and "s" or "", timeTaken))
473
self.stream.writeln()
474
if not result.wasSuccessful():
475
self.stream.write("FAILED (")
476
failed, errored = map(len, (result.failures, result.errors))
478
self.stream.write("failures=%d" % failed)
480
if failed: self.stream.write(", ")
481
self.stream.write("errors=%d" % errored)
482
self.stream.writeln(")")
484
self.stream.writeln("OK")
485
if result.skip_count > 0:
486
skipped = result.skip_count
487
self.stream.writeln('%d test%s skipped' %
488
(skipped, skipped != 1 and "s" or ""))
489
result.report_cleaning_up()
490
# This is still a little bogus,
491
# but only a little. Folk not using our testrunner will
492
# have to delete their temp directories themselves.
493
test_root = TestCaseWithMemoryTransport.TEST_ROOT
494
if result.wasSuccessful() or not self.keep_output:
495
if test_root is not None:
496
# If LANG=C we probably have created some bogus paths
497
# which rmtree(unicode) will fail to delete
498
# so make sure we are using rmtree(str) to delete everything
499
# except on win32, where rmtree(str) will fail
500
# since it doesn't have the property of byte-stream paths
501
# (they are either ascii or mbcs)
502
if sys.platform == 'win32':
503
# make sure we are using the unicode win32 api
504
test_root = unicode(test_root)
506
test_root = test_root.encode(
507
sys.getfilesystemencoding())
509
osutils.rmtree(test_root)
511
if sys.platform == 'win32' and e.errno == errno.EACCES:
512
print >>sys.stderr, ('Permission denied: '
513
'unable to remove testing dir '
514
'%s' % os.path.basename(test_root))
518
note("Failed tests working directories are in '%s'\n", test_root)
519
TestCaseWithMemoryTransport.TEST_ROOT = None
524
def iter_suite_tests(suite):
525
"""Return all tests in a suite, recursing through nested suites"""
526
for item in suite._tests:
527
if isinstance(item, unittest.TestCase):
529
elif isinstance(item, unittest.TestSuite):
530
for r in iter_suite_tests(item):
533
raise Exception('unknown object %r inside test suite %r'
537
class TestSkipped(Exception):
538
"""Indicates that a test was intentionally skipped, rather than failing."""
541
class CommandFailed(Exception):
545
class StringIOWrapper(object):
546
"""A wrapper around cStringIO which just adds an encoding attribute.
548
Internally we can check sys.stdout to see what the output encoding
549
should be. However, cStringIO has no encoding attribute that we can
550
set. So we wrap it instead.
555
def __init__(self, s=None):
557
self.__dict__['_cstring'] = StringIO(s)
559
self.__dict__['_cstring'] = StringIO()
561
def __getattr__(self, name, getattr=getattr):
562
return getattr(self.__dict__['_cstring'], name)
564
def __setattr__(self, name, val):
565
if name == 'encoding':
566
self.__dict__['encoding'] = val
568
return setattr(self._cstring, name, val)
571
class TestCase(unittest.TestCase):
572
"""Base class for bzr unit tests.
574
Tests that need access to disk resources should subclass
575
TestCaseInTempDir not TestCase.
577
Error and debug log messages are redirected from their usual
578
location into a temporary file, the contents of which can be
579
retrieved by _get_log(). We use a real OS file, not an in-memory object,
580
so that it can also capture file IO. When the test completes this file
581
is read into memory and removed from disk.
583
There are also convenience functions to invoke bzr's command-line
584
routine, and to build and check bzr trees.
586
In addition to the usual method of overriding tearDown(), this class also
587
allows subclasses to register functions into the _cleanups list, which is
588
run in order as the object is torn down. It's less likely this will be
589
accidentally overlooked.
592
_log_file_name = None
594
_keep_log_file = False
595
# record lsprof data when performing benchmark calls.
596
_gather_lsprof_in_benchmarks = False
598
def __init__(self, methodName='testMethod'):
599
super(TestCase, self).__init__(methodName)
603
unittest.TestCase.setUp(self)
604
self._cleanEnvironment()
605
bzrlib.trace.disable_default_logging()
608
self._benchcalls = []
609
self._benchtime = None
610
# prevent hooks affecting tests
611
self._preserved_hooks = bzrlib.branch.Branch.hooks
612
self.addCleanup(self._restoreHooks)
613
# this list of hooks must be kept in sync with the defaults
615
bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
617
def _silenceUI(self):
618
"""Turn off UI for duration of test"""
619
# by default the UI is off; tests can turn it on if they want it.
620
saved = bzrlib.ui.ui_factory
622
bzrlib.ui.ui_factory = saved
623
bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
624
self.addCleanup(_restore)
626
def _ndiff_strings(self, a, b):
627
"""Return ndiff between two strings containing lines.
629
A trailing newline is added if missing to make the strings
631
if b and b[-1] != '\n':
633
if a and a[-1] != '\n':
635
difflines = difflib.ndiff(a.splitlines(True),
637
linejunk=lambda x: False,
638
charjunk=lambda x: False)
639
return ''.join(difflines)
641
def assertEqualDiff(self, a, b, message=None):
642
"""Assert two texts are equal, if not raise an exception.
644
This is intended for use with multi-line strings where it can
645
be hard to find the differences by eye.
647
# TODO: perhaps override assertEquals to call this for strings?
651
message = "texts not equal:\n"
652
raise AssertionError(message +
653
self._ndiff_strings(a, b))
655
def assertEqualMode(self, mode, mode_test):
656
self.assertEqual(mode, mode_test,
657
'mode mismatch %o != %o' % (mode, mode_test))
659
def assertStartsWith(self, s, prefix):
660
if not s.startswith(prefix):
661
raise AssertionError('string %r does not start with %r' % (s, prefix))
663
def assertEndsWith(self, s, suffix):
664
"""Asserts that s ends with suffix."""
665
if not s.endswith(suffix):
666
raise AssertionError('string %r does not end with %r' % (s, suffix))
668
def assertContainsRe(self, haystack, needle_re):
669
"""Assert that a contains something matching a regular expression."""
670
if not re.search(needle_re, haystack):
671
raise AssertionError('pattern "%s" not found in "%s"'
672
% (needle_re, haystack))
674
def assertNotContainsRe(self, haystack, needle_re):
675
"""Assert that a does not match a regular expression"""
676
if re.search(needle_re, haystack):
677
raise AssertionError('pattern "%s" found in "%s"'
678
% (needle_re, haystack))
680
def assertSubset(self, sublist, superlist):
681
"""Assert that every entry in sublist is present in superlist."""
683
for entry in sublist:
684
if entry not in superlist:
685
missing.append(entry)
687
raise AssertionError("value(s) %r not present in container %r" %
688
(missing, superlist))
690
def assertListRaises(self, excClass, func, *args, **kwargs):
691
"""Fail unless excClass is raised when the iterator from func is used.
693
Many functions can return generators this makes sure
694
to wrap them in a list() call to make sure the whole generator
695
is run, and that the proper exception is raised.
698
list(func(*args, **kwargs))
702
if getattr(excClass,'__name__', None) is not None:
703
excName = excClass.__name__
705
excName = str(excClass)
706
raise self.failureException, "%s not raised" % excName
708
def assertIs(self, left, right, message=None):
709
if not (left is right):
710
if message is not None:
711
raise AssertionError(message)
713
raise AssertionError("%r is not %r." % (left, right))
715
def assertIsNot(self, left, right, message=None):
717
if message is not None:
718
raise AssertionError(message)
720
raise AssertionError("%r is %r." % (left, right))
722
def assertTransportMode(self, transport, path, mode):
723
"""Fail if a path does not have mode mode.
725
If modes are not supported on this transport, the assertion is ignored.
727
if not transport._can_roundtrip_unix_modebits():
729
path_stat = transport.stat(path)
730
actual_mode = stat.S_IMODE(path_stat.st_mode)
731
self.assertEqual(mode, actual_mode,
732
'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
734
def assertIsInstance(self, obj, kls):
735
"""Fail if obj is not an instance of kls"""
736
if not isinstance(obj, kls):
737
self.fail("%r is an instance of %s rather than %s" % (
738
obj, obj.__class__, kls))
740
def _capture_warnings(self, a_callable, *args, **kwargs):
741
"""A helper for callDeprecated and applyDeprecated.
743
:param a_callable: A callable to call.
744
:param args: The positional arguments for the callable
745
:param kwargs: The keyword arguments for the callable
746
:return: A tuple (warnings, result). result is the result of calling
747
a_callable(*args, **kwargs).
750
def capture_warnings(msg, cls=None, stacklevel=None):
751
# we've hooked into a deprecation specific callpath,
752
# only deprecations should getting sent via it.
753
self.assertEqual(cls, DeprecationWarning)
754
local_warnings.append(msg)
755
original_warning_method = symbol_versioning.warn
756
symbol_versioning.set_warning_method(capture_warnings)
758
result = a_callable(*args, **kwargs)
760
symbol_versioning.set_warning_method(original_warning_method)
761
return (local_warnings, result)
763
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
764
"""Call a deprecated callable without warning the user.
766
:param deprecation_format: The deprecation format that the callable
767
should have been deprecated with. This is the same type as the
768
parameter to deprecated_method/deprecated_function. If the
769
callable is not deprecated with this format, an assertion error
771
:param a_callable: A callable to call. This may be a bound method or
772
a regular function. It will be called with *args and **kwargs.
773
:param args: The positional arguments for the callable
774
:param kwargs: The keyword arguments for the callable
775
:return: The result of a_callable(*args, **kwargs)
777
call_warnings, result = self._capture_warnings(a_callable,
779
expected_first_warning = symbol_versioning.deprecation_string(
780
a_callable, deprecation_format)
781
if len(call_warnings) == 0:
782
self.fail("No assertion generated by call to %s" %
784
self.assertEqual(expected_first_warning, call_warnings[0])
787
def callDeprecated(self, expected, callable, *args, **kwargs):
788
"""Assert that a callable is deprecated in a particular way.
790
This is a very precise test for unusual requirements. The
791
applyDeprecated helper function is probably more suited for most tests
792
as it allows you to simply specify the deprecation format being used
793
and will ensure that that is issued for the function being called.
795
:param expected: a list of the deprecation warnings expected, in order
796
:param callable: The callable to call
797
:param args: The positional arguments for the callable
798
:param kwargs: The keyword arguments for the callable
800
call_warnings, result = self._capture_warnings(callable,
802
self.assertEqual(expected, call_warnings)
805
def _startLogFile(self):
806
"""Send bzr and test log messages to a temporary file.
808
The file is removed as the test is torn down.
810
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
811
self._log_file = os.fdopen(fileno, 'w+')
812
self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
813
self._log_file_name = name
814
self.addCleanup(self._finishLogFile)
816
def _finishLogFile(self):
817
"""Finished with the log file.
819
Close the file and delete it, unless setKeepLogfile was called.
821
if self._log_file is None:
823
bzrlib.trace.disable_test_log(self._log_nonce)
824
self._log_file.close()
825
self._log_file = None
826
if not self._keep_log_file:
827
os.remove(self._log_file_name)
828
self._log_file_name = None
830
def setKeepLogfile(self):
831
"""Make the logfile not be deleted when _finishLogFile is called."""
832
self._keep_log_file = True
834
def addCleanup(self, callable):
835
"""Arrange to run a callable when this case is torn down.
837
Callables are run in the reverse of the order they are registered,
838
ie last-in first-out.
840
if callable in self._cleanups:
841
raise ValueError("cleanup function %r already registered on %s"
843
self._cleanups.append(callable)
845
def _cleanEnvironment(self):
847
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
849
'APPDATA': None, # bzr now use Win32 API and don't rely on APPDATA
851
'BZREMAIL': None, # may still be present in the environment
853
'BZR_PROGRESS_BAR': None,
863
# Nobody cares about these ones AFAIK. So far at
864
# least. If you do (care), please update this comment
870
self.addCleanup(self._restoreEnvironment)
871
for name, value in new_env.iteritems():
872
self._captureVar(name, value)
874
def _captureVar(self, name, newvalue):
875
"""Set an environment variable, and reset it when finished."""
876
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
878
def _restoreEnvironment(self):
879
for name, value in self.__old_env.iteritems():
880
osutils.set_or_unset_env(name, value)
882
def _restoreHooks(self):
883
bzrlib.branch.Branch.hooks = self._preserved_hooks
887
unittest.TestCase.tearDown(self)
889
def time(self, callable, *args, **kwargs):
890
"""Run callable and accrue the time it takes to the benchmark time.
892
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
893
this will cause lsprofile statistics to be gathered and stored in
896
if self._benchtime is None:
900
if not self._gather_lsprof_in_benchmarks:
901
return callable(*args, **kwargs)
903
# record this benchmark
904
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
906
self._benchcalls.append(((callable, args, kwargs), stats))
909
self._benchtime += time.time() - start
911
def _runCleanups(self):
912
"""Run registered cleanup functions.
914
This should only be called from TestCase.tearDown.
916
# TODO: Perhaps this should keep running cleanups even if
918
for cleanup_fn in reversed(self._cleanups):
921
def log(self, *args):
924
def _get_log(self, keep_log_file=False):
925
"""Return as a string the log for this test. If the file is still
926
on disk and keep_log_file=False, delete the log file and store the
927
content in self._log_contents."""
928
# flush the log file, to get all content
930
bzrlib.trace._trace_file.flush()
931
if self._log_contents:
932
return self._log_contents
933
if self._log_file_name is not None:
934
logfile = open(self._log_file_name)
936
log_contents = logfile.read()
939
if not keep_log_file:
940
self._log_contents = log_contents
942
os.remove(self._log_file_name)
944
if sys.platform == 'win32' and e.errno == errno.EACCES:
945
print >>sys.stderr, ('Unable to delete log file '
946
' %r' % self._log_file_name)
951
return "DELETED log file to reduce memory footprint"
953
def capture(self, cmd, retcode=0):
954
"""Shortcut that splits cmd into words, runs, and returns stdout"""
955
return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
957
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
959
"""Invoke bzr and return (stdout, stderr).
961
Useful for code that wants to check the contents of the
962
output, the way error messages are presented, etc.
964
This should be the main method for tests that want to exercise the
965
overall behavior of the bzr application (rather than a unit test
966
or a functional test of the library.)
968
Much of the old code runs bzr by forking a new copy of Python, but
969
that is slower, harder to debug, and generally not necessary.
971
This runs bzr through the interface that catches and reports
972
errors, and with logging set to something approximating the
973
default, so that error reporting can be checked.
975
:param argv: arguments to invoke bzr
976
:param retcode: expected return code, or None for don't-care.
977
:param encoding: encoding for sys.stdout and sys.stderr
978
:param stdin: A string to be used as stdin for the command.
979
:param working_dir: Change to this directory before running
982
encoding = bzrlib.user_encoding
983
if stdin is not None:
984
stdin = StringIO(stdin)
985
stdout = StringIOWrapper()
986
stderr = StringIOWrapper()
987
stdout.encoding = encoding
988
stderr.encoding = encoding
990
self.log('run bzr: %r', argv)
991
# FIXME: don't call into logging here
992
handler = logging.StreamHandler(stderr)
993
handler.setLevel(logging.INFO)
994
logger = logging.getLogger('')
995
logger.addHandler(handler)
996
old_ui_factory = bzrlib.ui.ui_factory
997
bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
1000
bzrlib.ui.ui_factory.stdin = stdin
1003
if working_dir is not None:
1004
cwd = osutils.getcwd()
1005
os.chdir(working_dir)
1008
saved_debug_flags = frozenset(debug.debug_flags)
1009
debug.debug_flags.clear()
1011
result = self.apply_redirected(stdin, stdout, stderr,
1012
bzrlib.commands.run_bzr_catch_errors,
1015
debug.debug_flags.update(saved_debug_flags)
1017
logger.removeHandler(handler)
1018
bzrlib.ui.ui_factory = old_ui_factory
1022
out = stdout.getvalue()
1023
err = stderr.getvalue()
1025
self.log('output:\n%r', out)
1027
self.log('errors:\n%r', err)
1028
if retcode is not None:
1029
self.assertEquals(retcode, result)
1032
def run_bzr(self, *args, **kwargs):
1033
"""Invoke bzr, as if it were run from the command line.
1035
This should be the main method for tests that want to exercise the
1036
overall behavior of the bzr application (rather than a unit test
1037
or a functional test of the library.)
1039
This sends the stdout/stderr results into the test's log,
1040
where it may be useful for debugging. See also run_captured.
1042
:param stdin: A string to be used as stdin for the command.
1044
retcode = kwargs.pop('retcode', 0)
1045
encoding = kwargs.pop('encoding', None)
1046
stdin = kwargs.pop('stdin', None)
1047
working_dir = kwargs.pop('working_dir', None)
1048
return self.run_bzr_captured(args, retcode=retcode, encoding=encoding,
1049
stdin=stdin, working_dir=working_dir)
1051
def run_bzr_decode(self, *args, **kwargs):
1052
if 'encoding' in kwargs:
1053
encoding = kwargs['encoding']
1055
encoding = bzrlib.user_encoding
1056
return self.run_bzr(*args, **kwargs)[0].decode(encoding)
1058
def run_bzr_error(self, error_regexes, *args, **kwargs):
1059
"""Run bzr, and check that stderr contains the supplied regexes
1061
:param error_regexes: Sequence of regular expressions which
1062
must each be found in the error output. The relative ordering
1064
:param args: command-line arguments for bzr
1065
:param kwargs: Keyword arguments which are interpreted by run_bzr
1066
This function changes the default value of retcode to be 3,
1067
since in most cases this is run when you expect bzr to fail.
1068
:return: (out, err) The actual output of running the command (in case you
1069
want to do more inspection)
1072
# Make sure that commit is failing because there is nothing to do
1073
self.run_bzr_error(['no changes to commit'],
1074
'commit', '-m', 'my commit comment')
1075
# Make sure --strict is handling an unknown file, rather than
1076
# giving us the 'nothing to do' error
1077
self.build_tree(['unknown'])
1078
self.run_bzr_error(['Commit refused because there are unknown files'],
1079
'commit', '--strict', '-m', 'my commit comment')
1081
kwargs.setdefault('retcode', 3)
1082
out, err = self.run_bzr(*args, **kwargs)
1083
for regex in error_regexes:
1084
self.assertContainsRe(err, regex)
1087
def run_bzr_subprocess(self, *args, **kwargs):
1088
"""Run bzr in a subprocess for testing.
1090
This starts a new Python interpreter and runs bzr in there.
1091
This should only be used for tests that have a justifiable need for
1092
this isolation: e.g. they are testing startup time, or signal
1093
handling, or early startup code, etc. Subprocess code can't be
1094
profiled or debugged so easily.
1096
:param retcode: The status code that is expected. Defaults to 0. If
1097
None is supplied, the status code is not checked.
1098
:param env_changes: A dictionary which lists changes to environment
1099
variables. A value of None will unset the env variable.
1100
The values must be strings. The change will only occur in the
1101
child, so you don't need to fix the environment after running.
1102
:param universal_newlines: Convert CRLF => LF
1103
:param allow_plugins: By default the subprocess is run with
1104
--no-plugins to ensure test reproducibility. Also, it is possible
1105
for system-wide plugins to create unexpected output on stderr,
1106
which can cause unnecessary test failures.
1108
env_changes = kwargs.get('env_changes', {})
1109
working_dir = kwargs.get('working_dir', None)
1110
allow_plugins = kwargs.get('allow_plugins', False)
1111
process = self.start_bzr_subprocess(args, env_changes=env_changes,
1112
working_dir=working_dir,
1113
allow_plugins=allow_plugins)
1114
# We distinguish between retcode=None and retcode not passed.
1115
supplied_retcode = kwargs.get('retcode', 0)
1116
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
1117
universal_newlines=kwargs.get('universal_newlines', False),
1120
def start_bzr_subprocess(self, process_args, env_changes=None,
1121
skip_if_plan_to_signal=False,
1123
allow_plugins=False):
1124
"""Start bzr in a subprocess for testing.
1126
This starts a new Python interpreter and runs bzr in there.
1127
This should only be used for tests that have a justifiable need for
1128
this isolation: e.g. they are testing startup time, or signal
1129
handling, or early startup code, etc. Subprocess code can't be
1130
profiled or debugged so easily.
1132
:param process_args: a list of arguments to pass to the bzr executable,
1133
for example `['--version']`.
1134
:param env_changes: A dictionary which lists changes to environment
1135
variables. A value of None will unset the env variable.
1136
The values must be strings. The change will only occur in the
1137
child, so you don't need to fix the environment after running.
1138
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1140
:param allow_plugins: If False (default) pass --no-plugins to bzr.
1142
:returns: Popen object for the started process.
1144
if skip_if_plan_to_signal:
1145
if not getattr(os, 'kill', None):
1146
raise TestSkipped("os.kill not available.")
1148
if env_changes is None:
1152
def cleanup_environment():
1153
for env_var, value in env_changes.iteritems():
1154
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
1156
def restore_environment():
1157
for env_var, value in old_env.iteritems():
1158
osutils.set_or_unset_env(env_var, value)
1160
bzr_path = self.get_bzr_path()
1163
if working_dir is not None:
1164
cwd = osutils.getcwd()
1165
os.chdir(working_dir)
1168
# win32 subprocess doesn't support preexec_fn
1169
# so we will avoid using it on all platforms, just to
1170
# make sure the code path is used, and we don't break on win32
1171
cleanup_environment()
1172
command = [sys.executable, bzr_path]
1173
if not allow_plugins:
1174
command.append('--no-plugins')
1175
command.extend(process_args)
1176
process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1178
restore_environment()
1184
def _popen(self, *args, **kwargs):
1185
"""Place a call to Popen.
1187
Allows tests to override this method to intercept the calls made to
1188
Popen for introspection.
1190
return Popen(*args, **kwargs)
1192
def get_bzr_path(self):
1193
"""Return the path of the 'bzr' executable for this test suite."""
1194
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
1195
if not os.path.isfile(bzr_path):
1196
# We are probably installed. Assume sys.argv is the right file
1197
bzr_path = sys.argv[0]
1200
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
1201
universal_newlines=False, process_args=None):
1202
"""Finish the execution of process.
1204
:param process: the Popen object returned from start_bzr_subprocess.
1205
:param retcode: The status code that is expected. Defaults to 0. If
1206
None is supplied, the status code is not checked.
1207
:param send_signal: an optional signal to send to the process.
1208
:param universal_newlines: Convert CRLF => LF
1209
:returns: (stdout, stderr)
1211
if send_signal is not None:
1212
os.kill(process.pid, send_signal)
1213
out, err = process.communicate()
1215
if universal_newlines:
1216
out = out.replace('\r\n', '\n')
1217
err = err.replace('\r\n', '\n')
1219
if retcode is not None and retcode != process.returncode:
1220
if process_args is None:
1221
process_args = "(unknown args)"
1222
mutter('Output of bzr %s:\n%s', process_args, out)
1223
mutter('Error for bzr %s:\n%s', process_args, err)
1224
self.fail('Command bzr %s failed with retcode %s != %s'
1225
% (process_args, retcode, process.returncode))
1228
def check_inventory_shape(self, inv, shape):
1229
"""Compare an inventory to a list of expected names.
1231
Fail if they are not precisely equal.
1234
shape = list(shape) # copy
1235
for path, ie in inv.entries():
1236
name = path.replace('\\', '/')
1237
if ie.kind == 'dir':
1244
self.fail("expected paths not found in inventory: %r" % shape)
1246
self.fail("unexpected paths found in inventory: %r" % extras)
1248
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
1249
a_callable=None, *args, **kwargs):
1250
"""Call callable with redirected std io pipes.
1252
Returns the return code."""
1253
if not callable(a_callable):
1254
raise ValueError("a_callable must be callable.")
1256
stdin = StringIO("")
1258
if getattr(self, "_log_file", None) is not None:
1259
stdout = self._log_file
1263
if getattr(self, "_log_file", None is not None):
1264
stderr = self._log_file
1267
real_stdin = sys.stdin
1268
real_stdout = sys.stdout
1269
real_stderr = sys.stderr
1274
return a_callable(*args, **kwargs)
1276
sys.stdout = real_stdout
1277
sys.stderr = real_stderr
1278
sys.stdin = real_stdin
1280
@symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
1281
def merge(self, branch_from, wt_to):
1282
"""A helper for tests to do a ui-less merge.
1284
This should move to the main library when someone has time to integrate
1287
# minimal ui-less merge.
1288
wt_to.branch.fetch(branch_from)
1289
base_rev = common_ancestor(branch_from.last_revision(),
1290
wt_to.branch.last_revision(),
1291
wt_to.branch.repository)
1292
merge_inner(wt_to.branch, branch_from.basis_tree(),
1293
wt_to.branch.repository.revision_tree(base_rev),
1295
wt_to.add_parent_tree_id(branch_from.last_revision())
1298
BzrTestBase = TestCase
1301
class TestCaseWithMemoryTransport(TestCase):
1302
"""Common test class for tests that do not need disk resources.
1304
Tests that need disk resources should derive from TestCaseWithTransport.
1306
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1308
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1309
a directory which does not exist. This serves to help ensure test isolation
1310
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1311
must exist. However, TestCaseWithMemoryTransport does not offer local
1312
file defaults for the transport in tests, nor does it obey the command line
1313
override, so tests that accidentally write to the common directory should
1321
def __init__(self, methodName='runTest'):
1322
# allow test parameterisation after test construction and before test
1323
# execution. Variables that the parameteriser sets need to be
1324
# ones that are not set by setUp, or setUp will trash them.
1325
super(TestCaseWithMemoryTransport, self).__init__(methodName)
1326
self.transport_server = default_transport
1327
self.transport_readonly_server = None
1329
def get_transport(self):
1330
"""Return a writeable transport for the test scratch space"""
1331
t = get_transport(self.get_url())
1332
self.assertFalse(t.is_readonly())
1335
def get_readonly_transport(self):
1336
"""Return a readonly transport for the test scratch space
1338
This can be used to test that operations which should only need
1339
readonly access in fact do not try to write.
1341
t = get_transport(self.get_readonly_url())
1342
self.assertTrue(t.is_readonly())
1345
def create_transport_readonly_server(self):
1346
"""Create a transport server from class defined at init.
1348
This is mostly a hook for daughter classes.
1350
return self.transport_readonly_server()
1352
def get_readonly_server(self):
1353
"""Get the server instance for the readonly transport
1355
This is useful for some tests with specific servers to do diagnostics.
1357
if self.__readonly_server is None:
1358
if self.transport_readonly_server is None:
1359
# readonly decorator requested
1360
# bring up the server
1362
self.__readonly_server = ReadonlyServer()
1363
self.__readonly_server.setUp(self.__server)
1365
self.__readonly_server = self.create_transport_readonly_server()
1366
self.__readonly_server.setUp()
1367
self.addCleanup(self.__readonly_server.tearDown)
1368
return self.__readonly_server
1370
def get_readonly_url(self, relpath=None):
1371
"""Get a URL for the readonly transport.
1373
This will either be backed by '.' or a decorator to the transport
1374
used by self.get_url()
1375
relpath provides for clients to get a path relative to the base url.
1376
These should only be downwards relative, not upwards.
1378
base = self.get_readonly_server().get_url()
1379
if relpath is not None:
1380
if not base.endswith('/'):
1382
base = base + relpath
1385
def get_server(self):
1386
"""Get the read/write server instance.
1388
This is useful for some tests with specific servers that need
1391
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
1392
is no means to override it.
1394
if self.__server is None:
1395
self.__server = MemoryServer()
1396
self.__server.setUp()
1397
self.addCleanup(self.__server.tearDown)
1398
return self.__server
1400
def get_url(self, relpath=None):
1401
"""Get a URL (or maybe a path) for the readwrite transport.
1403
This will either be backed by '.' or to an equivalent non-file based
1405
relpath provides for clients to get a path relative to the base url.
1406
These should only be downwards relative, not upwards.
1408
base = self.get_server().get_url()
1409
if relpath is not None and relpath != '.':
1410
if not base.endswith('/'):
1412
# XXX: Really base should be a url; we did after all call
1413
# get_url()! But sometimes it's just a path (from
1414
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1415
# to a non-escaped local path.
1416
if base.startswith('./') or base.startswith('/'):
1419
base += urlutils.escape(relpath)
1422
def _make_test_root(self):
1423
if TestCaseWithMemoryTransport.TEST_ROOT is not None:
1427
root = u'test%04d.tmp' % i
1431
if e.errno == errno.EEXIST:
1436
# successfully created
1437
TestCaseWithMemoryTransport.TEST_ROOT = osutils.abspath(root)
1439
# make a fake bzr directory there to prevent any tests propagating
1440
# up onto the source directory's real branch
1441
bzrdir.BzrDir.create_standalone_workingtree(
1442
TestCaseWithMemoryTransport.TEST_ROOT)
1444
def makeAndChdirToTestDir(self):
1445
"""Create a temporary directories for this one test.
1447
This must set self.test_home_dir and self.test_dir and chdir to
1450
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1452
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1453
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1454
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1456
def make_branch(self, relpath, format=None):
1457
"""Create a branch on the transport at relpath."""
1458
repo = self.make_repository(relpath, format=format)
1459
return repo.bzrdir.create_branch()
1461
def make_bzrdir(self, relpath, format=None):
1463
# might be a relative or absolute path
1464
maybe_a_url = self.get_url(relpath)
1465
segments = maybe_a_url.rsplit('/', 1)
1466
t = get_transport(maybe_a_url)
1467
if len(segments) > 1 and segments[-1] not in ('', '.'):
1470
except errors.FileExists:
1474
if isinstance(format, basestring):
1475
format = bzrdir.format_registry.make_bzrdir(format)
1476
return format.initialize_on_transport(t)
1477
except errors.UninitializableFormat:
1478
raise TestSkipped("Format %s is not initializable." % format)
1480
def make_repository(self, relpath, shared=False, format=None):
1481
"""Create a repository on our default transport at relpath."""
1482
made_control = self.make_bzrdir(relpath, format=format)
1483
return made_control.create_repository(shared=shared)
1485
def make_branch_and_memory_tree(self, relpath, format=None):
1486
"""Create a branch on the default transport and a MemoryTree for it."""
1487
b = self.make_branch(relpath, format=format)
1488
return memorytree.MemoryTree.create_on_branch(b)
1490
def overrideEnvironmentForTesting(self):
1491
os.environ['HOME'] = self.test_home_dir
1492
os.environ['BZR_HOME'] = self.test_home_dir
1495
super(TestCaseWithMemoryTransport, self).setUp()
1496
self._make_test_root()
1497
_currentdir = os.getcwdu()
1498
def _leaveDirectory():
1499
os.chdir(_currentdir)
1500
self.addCleanup(_leaveDirectory)
1501
self.makeAndChdirToTestDir()
1502
self.overrideEnvironmentForTesting()
1503
self.__readonly_server = None
1504
self.__server = None
1507
class TestCaseInTempDir(TestCaseWithMemoryTransport):
1508
"""Derived class that runs a test within a temporary directory.
1510
This is useful for tests that need to create a branch, etc.
1512
The directory is created in a slightly complex way: for each
1513
Python invocation, a new temporary top-level directory is created.
1514
All test cases create their own directory within that. If the
1515
tests complete successfully, the directory is removed.
1517
InTempDir is an old alias for FunctionalTestCase.
1520
OVERRIDE_PYTHON = 'python'
1522
def check_file_contents(self, filename, expect):
1523
self.log("check contents of file %s" % filename)
1524
contents = file(filename, 'r').read()
1525
if contents != expect:
1526
self.log("expected: %r" % expect)
1527
self.log("actually: %r" % contents)
1528
self.fail("contents of %s not as expected" % filename)
1530
def makeAndChdirToTestDir(self):
1531
"""See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
1533
For TestCaseInTempDir we create a temporary directory based on the test
1534
name and then create two subdirs - test and home under it.
1536
if NUMBERED_DIRS: # strongly recommended on Windows
1537
# due the path length limitation (260 chars)
1538
candidate_dir = '%s/%dK/%05d' % (self.TEST_ROOT,
1539
int(self.number/1000),
1541
os.makedirs(candidate_dir)
1542
self.test_home_dir = candidate_dir + '/home'
1543
os.mkdir(self.test_home_dir)
1544
self.test_dir = candidate_dir + '/work'
1545
os.mkdir(self.test_dir)
1546
os.chdir(self.test_dir)
1547
# put name of test inside
1548
f = file(candidate_dir + '/name', 'w')
1553
# shorten the name, to avoid test failures due to path length
1554
short_id = self.id().replace('bzrlib.tests.', '') \
1555
.replace('__main__.', '')[-100:]
1556
# it's possible the same test class is run several times for
1557
# parameterized tests, so make sure the names don't collide.
1561
candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
1563
candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
1564
if os.path.exists(candidate_dir):
1568
os.mkdir(candidate_dir)
1569
self.test_home_dir = candidate_dir + '/home'
1570
os.mkdir(self.test_home_dir)
1571
self.test_dir = candidate_dir + '/work'
1572
os.mkdir(self.test_dir)
1573
os.chdir(self.test_dir)
1576
def build_tree(self, shape, line_endings='binary', transport=None):
1577
"""Build a test tree according to a pattern.
1579
shape is a sequence of file specifications. If the final
1580
character is '/', a directory is created.
1582
This assumes that all the elements in the tree being built are new.
1584
This doesn't add anything to a branch.
1585
:param line_endings: Either 'binary' or 'native'
1586
in binary mode, exact contents are written
1587
in native mode, the line endings match the
1588
default platform endings.
1590
:param transport: A transport to write to, for building trees on
1591
VFS's. If the transport is readonly or None,
1592
"." is opened automatically.
1594
# It's OK to just create them using forward slashes on windows.
1595
if transport is None or transport.is_readonly():
1596
transport = get_transport(".")
1598
self.assert_(isinstance(name, basestring))
1600
transport.mkdir(urlutils.escape(name[:-1]))
1602
if line_endings == 'binary':
1604
elif line_endings == 'native':
1607
raise errors.BzrError(
1608
'Invalid line ending request %r' % line_endings)
1609
content = "contents of %s%s" % (name.encode('utf-8'), end)
1610
transport.put_bytes_non_atomic(urlutils.escape(name), content)
1612
def build_tree_contents(self, shape):
1613
build_tree_contents(shape)
1615
def assertFileEqual(self, content, path):
1616
"""Fail if path does not contain 'content'."""
1617
self.failUnlessExists(path)
1618
# TODO: jam 20060427 Shouldn't this be 'rb'?
1619
self.assertEqualDiff(content, open(path, 'r').read())
1621
def failUnlessExists(self, path):
1622
"""Fail unless path, which may be abs or relative, exists."""
1623
self.failUnless(osutils.lexists(path),path+" does not exist")
1625
def failIfExists(self, path):
1626
"""Fail if path, which may be abs or relative, exists."""
1627
self.failIf(osutils.lexists(path),path+" exists")
1630
class TestCaseWithTransport(TestCaseInTempDir):
1631
"""A test case that provides get_url and get_readonly_url facilities.
1633
These back onto two transport servers, one for readonly access and one for
1636
If no explicit class is provided for readonly access, a
1637
ReadonlyTransportDecorator is used instead which allows the use of non disk
1638
based read write transports.
1640
If an explicit class is provided for readonly access, that server and the
1641
readwrite one must both define get_url() as resolving to os.getcwd().
1644
def create_transport_server(self):
1645
"""Create a transport server from class defined at init.
1647
This is mostly a hook for daughter classes.
1649
return self.transport_server()
1651
def get_server(self):
1652
"""See TestCaseWithMemoryTransport.
1654
This is useful for some tests with specific servers that need
1657
if self.__server is None:
1658
self.__server = self.create_transport_server()
1659
self.__server.setUp()
1660
self.addCleanup(self.__server.tearDown)
1661
return self.__server
1663
def make_branch_and_tree(self, relpath, format=None):
1664
"""Create a branch on the transport and a tree locally.
1666
If the transport is not a LocalTransport, the Tree can't be created on
1667
the transport. In that case the working tree is created in the local
1668
directory, and the returned tree's branch and repository will also be
1671
This will fail if the original default transport for this test
1672
case wasn't backed by the working directory, as the branch won't
1673
be on disk for us to open it.
1675
:param format: The BzrDirFormat.
1676
:returns: the WorkingTree.
1678
# TODO: always use the local disk path for the working tree,
1679
# this obviously requires a format that supports branch references
1680
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
1682
b = self.make_branch(relpath, format=format)
1684
return b.bzrdir.create_workingtree()
1685
except errors.NotLocalUrl:
1686
# We can only make working trees locally at the moment. If the
1687
# transport can't support them, then reopen the branch on a local
1688
# transport, and create the working tree there.
1690
# Possibly we should instead keep
1691
# the non-disk-backed branch and create a local checkout?
1692
bd = bzrdir.BzrDir.open(relpath)
1693
return bd.create_workingtree()
1695
def assertIsDirectory(self, relpath, transport):
1696
"""Assert that relpath within transport is a directory.
1698
This may not be possible on all transports; in that case it propagates
1699
a TransportNotPossible.
1702
mode = transport.stat(relpath).st_mode
1703
except errors.NoSuchFile:
1704
self.fail("path %s is not a directory; no such file"
1706
if not stat.S_ISDIR(mode):
1707
self.fail("path %s is not a directory; has mode %#o"
1711
super(TestCaseWithTransport, self).setUp()
1712
self.__server = None
1715
class ChrootedTestCase(TestCaseWithTransport):
1716
"""A support class that provides readonly urls outside the local namespace.
1718
This is done by checking if self.transport_server is a MemoryServer. if it
1719
is then we are chrooted already, if it is not then an HttpServer is used
1722
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
1723
be used without needed to redo it when a different
1724
subclass is in use ?
1728
super(ChrootedTestCase, self).setUp()
1729
if not self.transport_server == MemoryServer:
1730
self.transport_readonly_server = HttpServer
1733
def filter_suite_by_re(suite, pattern):
1734
result = TestUtil.TestSuite()
1735
filter_re = re.compile(pattern)
1736
for test in iter_suite_tests(suite):
1737
if filter_re.search(test.id()):
1738
result.addTest(test)
1742
def sort_suite_by_re(suite, pattern):
1745
filter_re = re.compile(pattern)
1746
for test in iter_suite_tests(suite):
1747
if filter_re.search(test.id()):
1751
return TestUtil.TestSuite(first + second)
1754
def run_suite(suite, name='test', verbose=False, pattern=".*",
1755
stop_on_failure=False, keep_output=False,
1756
transport=None, lsprof_timed=None, bench_history=None,
1757
matching_tests_first=None,
1758
numbered_dirs=None):
1759
global NUMBERED_DIRS
1760
if numbered_dirs is not None:
1761
NUMBERED_DIRS = bool(numbered_dirs)
1763
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1768
runner = TextTestRunner(stream=sys.stdout,
1770
verbosity=verbosity,
1771
keep_output=keep_output,
1772
bench_history=bench_history)
1773
runner.stop_on_failure=stop_on_failure
1775
if matching_tests_first:
1776
suite = sort_suite_by_re(suite, pattern)
1778
suite = filter_suite_by_re(suite, pattern)
1779
result = runner.run(suite)
1780
return result.wasSuccessful()
1783
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
1786
test_suite_factory=None,
1789
matching_tests_first=None,
1790
numbered_dirs=None):
1791
"""Run the whole test suite under the enhanced runner"""
1792
# XXX: Very ugly way to do this...
1793
# Disable warning about old formats because we don't want it to disturb
1794
# any blackbox tests.
1795
from bzrlib import repository
1796
repository._deprecation_warning_done = True
1798
global default_transport
1799
if transport is None:
1800
transport = default_transport
1801
old_transport = default_transport
1802
default_transport = transport
1804
if test_suite_factory is None:
1805
suite = test_suite()
1807
suite = test_suite_factory()
1808
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
1809
stop_on_failure=stop_on_failure, keep_output=keep_output,
1810
transport=transport,
1811
lsprof_timed=lsprof_timed,
1812
bench_history=bench_history,
1813
matching_tests_first=matching_tests_first,
1814
numbered_dirs=numbered_dirs)
1816
default_transport = old_transport
1820
"""Build and return TestSuite for the whole of bzrlib.
1822
This function can be replaced if you need to change the default test
1823
suite on a global basis, but it is not encouraged.
1826
'bzrlib.tests.test_ancestry',
1827
'bzrlib.tests.test_annotate',
1828
'bzrlib.tests.test_api',
1829
'bzrlib.tests.test_atomicfile',
1830
'bzrlib.tests.test_bad_files',
1831
'bzrlib.tests.test_branch',
1832
'bzrlib.tests.test_bundle',
1833
'bzrlib.tests.test_bzrdir',
1834
'bzrlib.tests.test_cache_utf8',
1835
'bzrlib.tests.test_commands',
1836
'bzrlib.tests.test_commit',
1837
'bzrlib.tests.test_commit_merge',
1838
'bzrlib.tests.test_config',
1839
'bzrlib.tests.test_conflicts',
1840
'bzrlib.tests.test_decorators',
1841
'bzrlib.tests.test_delta',
1842
'bzrlib.tests.test_diff',
1843
'bzrlib.tests.test_doc_generate',
1844
'bzrlib.tests.test_errors',
1845
'bzrlib.tests.test_escaped_store',
1846
'bzrlib.tests.test_fetch',
1847
'bzrlib.tests.test_ftp_transport',
1848
'bzrlib.tests.test_generate_docs',
1849
'bzrlib.tests.test_generate_ids',
1850
'bzrlib.tests.test_globbing',
1851
'bzrlib.tests.test_gpg',
1852
'bzrlib.tests.test_graph',
1853
'bzrlib.tests.test_hashcache',
1854
'bzrlib.tests.test_http',
1855
'bzrlib.tests.test_http_response',
1856
'bzrlib.tests.test_https_ca_bundle',
1857
'bzrlib.tests.test_identitymap',
1858
'bzrlib.tests.test_ignores',
1859
'bzrlib.tests.test_inv',
1860
'bzrlib.tests.test_knit',
1861
'bzrlib.tests.test_lazy_import',
1862
'bzrlib.tests.test_lazy_regex',
1863
'bzrlib.tests.test_lockdir',
1864
'bzrlib.tests.test_lockable_files',
1865
'bzrlib.tests.test_log',
1866
'bzrlib.tests.test_memorytree',
1867
'bzrlib.tests.test_merge',
1868
'bzrlib.tests.test_merge3',
1869
'bzrlib.tests.test_merge_core',
1870
'bzrlib.tests.test_missing',
1871
'bzrlib.tests.test_msgeditor',
1872
'bzrlib.tests.test_nonascii',
1873
'bzrlib.tests.test_options',
1874
'bzrlib.tests.test_osutils',
1875
'bzrlib.tests.test_osutils_encodings',
1876
'bzrlib.tests.test_patch',
1877
'bzrlib.tests.test_patches',
1878
'bzrlib.tests.test_permissions',
1879
'bzrlib.tests.test_plugins',
1880
'bzrlib.tests.test_progress',
1881
'bzrlib.tests.test_reconcile',
1882
'bzrlib.tests.test_registry',
1883
'bzrlib.tests.test_repository',
1884
'bzrlib.tests.test_revert',
1885
'bzrlib.tests.test_revision',
1886
'bzrlib.tests.test_revisionnamespaces',
1887
'bzrlib.tests.test_revisiontree',
1888
'bzrlib.tests.test_rio',
1889
'bzrlib.tests.test_sampler',
1890
'bzrlib.tests.test_selftest',
1891
'bzrlib.tests.test_setup',
1892
'bzrlib.tests.test_sftp_transport',
1893
'bzrlib.tests.test_smart_add',
1894
'bzrlib.tests.test_smart_transport',
1895
'bzrlib.tests.test_source',
1896
'bzrlib.tests.test_status',
1897
'bzrlib.tests.test_store',
1898
'bzrlib.tests.test_symbol_versioning',
1899
'bzrlib.tests.test_tag',
1900
'bzrlib.tests.test_testament',
1901
'bzrlib.tests.test_textfile',
1902
'bzrlib.tests.test_textmerge',
1903
'bzrlib.tests.test_trace',
1904
'bzrlib.tests.test_transactions',
1905
'bzrlib.tests.test_transform',
1906
'bzrlib.tests.test_transport',
1907
'bzrlib.tests.test_tree',
1908
'bzrlib.tests.test_treebuilder',
1909
'bzrlib.tests.test_tsort',
1910
'bzrlib.tests.test_tuned_gzip',
1911
'bzrlib.tests.test_ui',
1912
'bzrlib.tests.test_upgrade',
1913
'bzrlib.tests.test_urlutils',
1914
'bzrlib.tests.test_versionedfile',
1915
'bzrlib.tests.test_version',
1916
'bzrlib.tests.test_version_info',
1917
'bzrlib.tests.test_weave',
1918
'bzrlib.tests.test_whitebox',
1919
'bzrlib.tests.test_workingtree',
1920
'bzrlib.tests.test_wsgi',
1921
'bzrlib.tests.test_xml',
1923
test_transport_implementations = [
1924
'bzrlib.tests.test_transport_implementations',
1925
'bzrlib.tests.test_read_bundle',
1927
suite = TestUtil.TestSuite()
1928
loader = TestUtil.TestLoader()
1929
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
1930
from bzrlib.transport import TransportTestProviderAdapter
1931
adapter = TransportTestProviderAdapter()
1932
adapt_modules(test_transport_implementations, adapter, loader, suite)
1933
for package in packages_to_test():
1934
suite.addTest(package.test_suite())
1935
for m in MODULES_TO_TEST:
1936
suite.addTest(loader.loadTestsFromModule(m))
1937
for m in MODULES_TO_DOCTEST:
1939
suite.addTest(doctest.DocTestSuite(m))
1940
except ValueError, e:
1941
print '**failed to get doctest for: %s\n%s' %(m,e)
1943
for name, plugin in bzrlib.plugin.all_plugins().items():
1944
if getattr(plugin, 'test_suite', None) is not None:
1945
default_encoding = sys.getdefaultencoding()
1947
plugin_suite = plugin.test_suite()
1948
except ImportError, e:
1949
bzrlib.trace.warning(
1950
'Unable to test plugin "%s": %s', name, e)
1952
suite.addTest(plugin_suite)
1953
if default_encoding != sys.getdefaultencoding():
1954
bzrlib.trace.warning(
1955
'Plugin "%s" tried to reset default encoding to: %s', name,
1956
sys.getdefaultencoding())
1958
sys.setdefaultencoding(default_encoding)
1962
def adapt_modules(mods_list, adapter, loader, suite):
1963
"""Adapt the modules in mods_list using adapter and add to suite."""
1964
for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
1965
suite.addTests(adapter.adapt(test))
1968
def clean_selftest_output(root=None, quiet=False):
1969
"""Remove all selftest output directories from root directory.
1971
:param root: root directory for clean
1972
(if ommitted or None then clean current directory).
1973
:param quiet: suppress report about deleting directories
1978
re_dir = re.compile(r'''test\d\d\d\d\.tmp''')
1981
for i in os.listdir(root):
1982
if os.path.isdir(i) and re_dir.match(i):
1984
print 'delete directory:', i