1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
# TODO: Perhaps there should be an API to find out if bzr running under the
19
# test suite -- some plugins might want to avoid making intrusive changes if
20
# this is the case. However, we want behaviour under to test to diverge as
21
# little as possible, so this should be used rarely if it's added at all.
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
# NOTE: Some classes in here use camelCaseNaming() rather than
25
# underscore_naming(). That's for consistency with unittest; it's not the
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
# new assertFoo() methods.
30
from cStringIO import StringIO
36
from pprint import pformat
40
from subprocess import Popen, PIPE
57
import bzrlib.commands
58
import bzrlib.bundle.serializer
60
import bzrlib.inventory
61
import bzrlib.iterablefile
66
# lsprof not available
68
from bzrlib.merge import merge_inner
72
from bzrlib.revision import common_ancestor
74
from bzrlib import symbol_versioning
76
from bzrlib.transport import get_transport
77
import bzrlib.transport
78
from bzrlib.transport.local import LocalURLServer
79
from bzrlib.transport.memory import MemoryServer
80
from bzrlib.transport.readonly import ReadonlyServer
81
from bzrlib.trace import mutter, note
82
from bzrlib.tests import TestUtil
83
from bzrlib.tests.HttpServer import HttpServer
84
from bzrlib.tests.TestUtil import (
88
from bzrlib.tests.treeshape import build_tree_contents
89
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
91
default_transport = LocalURLServer
94
MODULES_TO_DOCTEST = [
95
bzrlib.bundle.serializer,
106
NUMBERED_DIRS = False # dirs kind for TestCaseInTempDir (numbered or named)
109
def packages_to_test():
110
"""Return a list of packages to test.
112
The packages are not globally imported so that import failures are
113
triggered when running selftest, not when importing the command.
116
import bzrlib.tests.blackbox
117
import bzrlib.tests.branch_implementations
118
import bzrlib.tests.bzrdir_implementations
119
import bzrlib.tests.interrepository_implementations
120
import bzrlib.tests.interversionedfile_implementations
121
import bzrlib.tests.intertree_implementations
122
import bzrlib.tests.repository_implementations
123
import bzrlib.tests.revisionstore_implementations
124
import bzrlib.tests.tree_implementations
125
import bzrlib.tests.workingtree_implementations
128
bzrlib.tests.blackbox,
129
bzrlib.tests.branch_implementations,
130
bzrlib.tests.bzrdir_implementations,
131
bzrlib.tests.interrepository_implementations,
132
bzrlib.tests.interversionedfile_implementations,
133
bzrlib.tests.intertree_implementations,
134
bzrlib.tests.repository_implementations,
135
bzrlib.tests.revisionstore_implementations,
136
bzrlib.tests.tree_implementations,
137
bzrlib.tests.workingtree_implementations,
141
class ExtendedTestResult(unittest._TextTestResult):
142
"""Accepts, reports and accumulates the results of running tests.
144
Compared to this unittest version this class adds support for profiling,
145
benchmarking, stopping as soon as a test fails, and skipping tests.
146
There are further-specialized subclasses for different types of display.
151
def __init__(self, stream, descriptions, verbosity,
155
"""Construct new TestResult.
157
:param bench_history: Optionally, a writable file object to accumulate
160
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
161
if bench_history is not None:
162
from bzrlib.version import _get_bzr_source_tree
163
src_tree = _get_bzr_source_tree()
166
revision_id = src_tree.get_parent_ids()[0]
168
# XXX: if this is a brand new tree, do the same as if there
172
# XXX: If there's no branch, what should we do?
174
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
175
self._bench_history = bench_history
176
self.ui = bzrlib.ui.ui_factory
177
self.num_tests = num_tests
179
self.failure_count = 0
182
self._overall_start_time = time.time()
184
def extractBenchmarkTime(self, testCase):
185
"""Add a benchmark time for the current test case."""
186
self._benchmarkTime = getattr(testCase, "_benchtime", None)
188
def _elapsedTestTimeString(self):
189
"""Return a time string for the overall time the current test has taken."""
190
return self._formatTime(time.time() - self._start_time)
192
def _testTimeString(self):
193
if self._benchmarkTime is not None:
195
self._formatTime(self._benchmarkTime),
196
self._elapsedTestTimeString())
198
return " %s" % self._elapsedTestTimeString()
200
def _formatTime(self, seconds):
201
"""Format seconds as milliseconds with leading spaces."""
202
# some benchmarks can take thousands of seconds to run, so we need 8
204
return "%8dms" % (1000 * seconds)
206
def _shortened_test_description(self, test):
208
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
211
def startTest(self, test):
212
unittest.TestResult.startTest(self, test)
213
self.report_test_start(test)
214
test.number = self.count
215
self._recordTestStartTime()
217
def _recordTestStartTime(self):
218
"""Record that a test has started."""
219
self._start_time = time.time()
221
def addError(self, test, err):
222
if isinstance(err[1], TestSkipped):
223
return self.addSkipped(test, err)
224
unittest.TestResult.addError(self, test, err)
225
# We can only do this if we have one of our TestCases, not if
227
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
228
if setKeepLogfile is not None:
230
self.extractBenchmarkTime(test)
231
self.report_error(test, err)
235
def addFailure(self, test, err):
236
unittest.TestResult.addFailure(self, test, err)
237
# We can only do this if we have one of our TestCases, not if
239
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
240
if setKeepLogfile is not None:
242
self.extractBenchmarkTime(test)
243
self.report_failure(test, err)
247
def addSuccess(self, test):
248
self.extractBenchmarkTime(test)
249
if self._bench_history is not None:
250
if self._benchmarkTime is not None:
251
self._bench_history.write("%s %s\n" % (
252
self._formatTime(self._benchmarkTime),
254
self.report_success(test)
255
unittest.TestResult.addSuccess(self, test)
257
def addSkipped(self, test, skip_excinfo):
258
self.extractBenchmarkTime(test)
259
self.report_skip(test, skip_excinfo)
260
# seems best to treat this as success from point-of-view of unittest
261
# -- it actually does nothing so it barely matters :)
264
except KeyboardInterrupt:
267
self.addError(test, test.__exc_info())
269
unittest.TestResult.addSuccess(self, test)
271
def printErrorList(self, flavour, errors):
272
for test, err in errors:
273
self.stream.writeln(self.separator1)
274
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
275
if getattr(test, '_get_log', None) is not None:
277
print >>self.stream, \
278
('vvvv[log from %s]' % test.id()).ljust(78,'-')
279
print >>self.stream, test._get_log()
280
print >>self.stream, \
281
('^^^^[log from %s]' % test.id()).ljust(78,'-')
282
self.stream.writeln(self.separator2)
283
self.stream.writeln("%s" % err)
288
def report_cleaning_up(self):
291
def report_success(self, test):
295
class TextTestResult(ExtendedTestResult):
296
"""Displays progress and results of tests in text form"""
298
def __init__(self, *args, **kw):
299
ExtendedTestResult.__init__(self, *args, **kw)
300
self.pb = self.ui.nested_progress_bar()
301
self.pb.show_pct = False
302
self.pb.show_spinner = False
303
self.pb.show_eta = False,
304
self.pb.show_count = False
305
self.pb.show_bar = False
307
def report_starting(self):
308
self.pb.update('[test 0/%d] starting...' % (self.num_tests))
310
def _progress_prefix_text(self):
311
a = '[%d' % self.count
312
if self.num_tests is not None:
313
a +='/%d' % self.num_tests
314
a += ' in %ds' % (time.time() - self._overall_start_time)
316
a += ', %d errors' % self.error_count
317
if self.failure_count:
318
a += ', %d failed' % self.failure_count
320
a += ', %d skipped' % self.skip_count
324
def report_test_start(self, test):
327
self._progress_prefix_text()
329
+ self._shortened_test_description(test))
331
def _test_description(self, test):
333
return '#%d %s' % (self.count,
334
self._shortened_test_description(test))
336
return self._shortened_test_description(test)
338
def report_error(self, test, err):
339
self.error_count += 1
340
self.pb.note('ERROR: %s\n %s\n',
341
self._test_description(test),
345
def report_failure(self, test, err):
346
self.failure_count += 1
347
self.pb.note('FAIL: %s\n %s\n',
348
self._test_description(test),
352
def report_skip(self, test, skip_excinfo):
355
# at the moment these are mostly not things we can fix
356
# and so they just produce stipple; use the verbose reporter
359
# show test and reason for skip
360
self.pb.note('SKIP: %s\n %s\n',
361
self._shortened_test_description(test),
364
# since the class name was left behind in the still-visible
366
self.pb.note('SKIP: %s', skip_excinfo[1])
368
def report_cleaning_up(self):
369
self.pb.update('cleaning up...')
375
class VerboseTestResult(ExtendedTestResult):
376
"""Produce long output, with one line per test run plus times"""
378
def _ellipsize_to_right(self, a_string, final_width):
379
"""Truncate and pad a string, keeping the right hand side"""
380
if len(a_string) > final_width:
381
result = '...' + a_string[3-final_width:]
384
return result.ljust(final_width)
386
def report_starting(self):
387
self.stream.write('running %d tests...\n' % self.num_tests)
389
def report_test_start(self, test):
391
name = self._shortened_test_description(test)
392
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
393
# numbers, plus a trailing blank
394
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
396
self.stream.write('%5d ' % self.count)
397
self.stream.write(self._ellipsize_to_right(name,
398
osutils.terminal_width()-36))
400
self.stream.write(self._ellipsize_to_right(name,
401
osutils.terminal_width()-30))
404
def _error_summary(self, err):
408
return '%s%s' % (indent, err[1])
410
def report_error(self, test, err):
411
self.error_count += 1
412
self.stream.writeln('ERROR %s\n%s'
413
% (self._testTimeString(),
414
self._error_summary(err)))
416
def report_failure(self, test, err):
417
self.failure_count += 1
418
self.stream.writeln(' FAIL %s\n%s'
419
% (self._testTimeString(),
420
self._error_summary(err)))
422
def report_success(self, test):
423
self.stream.writeln(' OK %s' % self._testTimeString())
424
for bench_called, stats in getattr(test, '_benchcalls', []):
426
self.stream.write(' ' * 6)
427
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
428
stats.pprint(file=self.stream)
431
def report_skip(self, test, skip_excinfo):
433
self.stream.writeln(' SKIP %s\n%s'
434
% (self._testTimeString(),
435
self._error_summary(skip_excinfo)))
438
class TextTestRunner(object):
439
stop_on_failure = False
447
self.stream = unittest._WritelnDecorator(stream)
448
self.descriptions = descriptions
449
self.verbosity = verbosity
450
self.keep_output = keep_output
451
self._bench_history = bench_history
454
"Run the given test case or test suite."
455
startTime = time.time()
456
if self.verbosity == 1:
457
result_class = TextTestResult
458
elif self.verbosity >= 2:
459
result_class = VerboseTestResult
460
result = result_class(self.stream,
463
bench_history=self._bench_history,
464
num_tests=test.countTestCases(),
466
result.stop_early = self.stop_on_failure
467
result.report_starting()
469
stopTime = time.time()
470
timeTaken = stopTime - startTime
472
self.stream.writeln(result.separator2)
473
run = result.testsRun
474
self.stream.writeln("Ran %d test%s in %.3fs" %
475
(run, run != 1 and "s" or "", timeTaken))
476
self.stream.writeln()
477
if not result.wasSuccessful():
478
self.stream.write("FAILED (")
479
failed, errored = map(len, (result.failures, result.errors))
481
self.stream.write("failures=%d" % failed)
483
if failed: self.stream.write(", ")
484
self.stream.write("errors=%d" % errored)
485
self.stream.writeln(")")
487
self.stream.writeln("OK")
488
if result.skip_count > 0:
489
skipped = result.skip_count
490
self.stream.writeln('%d test%s skipped' %
491
(skipped, skipped != 1 and "s" or ""))
492
result.report_cleaning_up()
493
# This is still a little bogus,
494
# but only a little. Folk not using our testrunner will
495
# have to delete their temp directories themselves.
496
test_root = TestCaseWithMemoryTransport.TEST_ROOT
497
if result.wasSuccessful() or not self.keep_output:
498
if test_root is not None:
499
# If LANG=C we probably have created some bogus paths
500
# which rmtree(unicode) will fail to delete
501
# so make sure we are using rmtree(str) to delete everything
502
# except on win32, where rmtree(str) will fail
503
# since it doesn't have the property of byte-stream paths
504
# (they are either ascii or mbcs)
505
if sys.platform == 'win32':
506
# make sure we are using the unicode win32 api
507
test_root = unicode(test_root)
509
test_root = test_root.encode(
510
sys.getfilesystemencoding())
512
osutils.rmtree(test_root)
514
if sys.platform == 'win32' and e.errno == errno.EACCES:
515
print >>sys.stderr, ('Permission denied: '
516
'unable to remove testing dir '
517
'%s' % os.path.basename(test_root))
521
note("Failed tests working directories are in '%s'\n", test_root)
522
TestCaseWithMemoryTransport.TEST_ROOT = None
527
def iter_suite_tests(suite):
528
"""Return all tests in a suite, recursing through nested suites"""
529
for item in suite._tests:
530
if isinstance(item, unittest.TestCase):
532
elif isinstance(item, unittest.TestSuite):
533
for r in iter_suite_tests(item):
536
raise Exception('unknown object %r inside test suite %r'
540
class TestSkipped(Exception):
541
"""Indicates that a test was intentionally skipped, rather than failing."""
544
class CommandFailed(Exception):
548
class StringIOWrapper(object):
549
"""A wrapper around cStringIO which just adds an encoding attribute.
551
Internally we can check sys.stdout to see what the output encoding
552
should be. However, cStringIO has no encoding attribute that we can
553
set. So we wrap it instead.
558
def __init__(self, s=None):
560
self.__dict__['_cstring'] = StringIO(s)
562
self.__dict__['_cstring'] = StringIO()
564
def __getattr__(self, name, getattr=getattr):
565
return getattr(self.__dict__['_cstring'], name)
567
def __setattr__(self, name, val):
568
if name == 'encoding':
569
self.__dict__['encoding'] = val
571
return setattr(self._cstring, name, val)
574
class TestCase(unittest.TestCase):
575
"""Base class for bzr unit tests.
577
Tests that need access to disk resources should subclass
578
TestCaseInTempDir not TestCase.
580
Error and debug log messages are redirected from their usual
581
location into a temporary file, the contents of which can be
582
retrieved by _get_log(). We use a real OS file, not an in-memory object,
583
so that it can also capture file IO. When the test completes this file
584
is read into memory and removed from disk.
586
There are also convenience functions to invoke bzr's command-line
587
routine, and to build and check bzr trees.
589
In addition to the usual method of overriding tearDown(), this class also
590
allows subclasses to register functions into the _cleanups list, which is
591
run in order as the object is torn down. It's less likely this will be
592
accidentally overlooked.
595
_log_file_name = None
597
_keep_log_file = False
598
# record lsprof data when performing benchmark calls.
599
_gather_lsprof_in_benchmarks = False
601
def __init__(self, methodName='testMethod'):
602
super(TestCase, self).__init__(methodName)
606
unittest.TestCase.setUp(self)
607
self._cleanEnvironment()
608
bzrlib.trace.disable_default_logging()
611
self._benchcalls = []
612
self._benchtime = None
613
# prevent hooks affecting tests
614
self._preserved_hooks = bzrlib.branch.Branch.hooks
615
self.addCleanup(self._restoreHooks)
616
# this list of hooks must be kept in sync with the defaults
618
bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
620
def _silenceUI(self):
621
"""Turn off UI for duration of test"""
622
# by default the UI is off; tests can turn it on if they want it.
623
saved = bzrlib.ui.ui_factory
625
bzrlib.ui.ui_factory = saved
626
bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
627
self.addCleanup(_restore)
629
def _ndiff_strings(self, a, b):
630
"""Return ndiff between two strings containing lines.
632
A trailing newline is added if missing to make the strings
634
if b and b[-1] != '\n':
636
if a and a[-1] != '\n':
638
difflines = difflib.ndiff(a.splitlines(True),
640
linejunk=lambda x: False,
641
charjunk=lambda x: False)
642
return ''.join(difflines)
644
def assertEqual(self, a, b, message=''):
649
raise AssertionError("%snot equal:\na = %s\nb = %s\n"
651
pformat(a, indent=4), pformat(b, indent=4)))
653
assertEquals = assertEqual
655
def assertEqualDiff(self, a, b, message=None):
656
"""Assert two texts are equal, if not raise an exception.
658
This is intended for use with multi-line strings where it can
659
be hard to find the differences by eye.
661
# TODO: perhaps override assertEquals to call this for strings?
665
message = "texts not equal:\n"
666
raise AssertionError(message +
667
self._ndiff_strings(a, b))
669
def assertEqualMode(self, mode, mode_test):
670
self.assertEqual(mode, mode_test,
671
'mode mismatch %o != %o' % (mode, mode_test))
673
def assertStartsWith(self, s, prefix):
674
if not s.startswith(prefix):
675
raise AssertionError('string %r does not start with %r' % (s, prefix))
677
def assertEndsWith(self, s, suffix):
678
"""Asserts that s ends with suffix."""
679
if not s.endswith(suffix):
680
raise AssertionError('string %r does not end with %r' % (s, suffix))
682
def assertContainsRe(self, haystack, needle_re):
683
"""Assert that a contains something matching a regular expression."""
684
if not re.search(needle_re, haystack):
685
raise AssertionError('pattern "%r" not found in "%r"'
686
% (needle_re, haystack))
688
def assertNotContainsRe(self, haystack, needle_re):
689
"""Assert that a does not match a regular expression"""
690
if re.search(needle_re, haystack):
691
raise AssertionError('pattern "%s" found in "%s"'
692
% (needle_re, haystack))
694
def assertSubset(self, sublist, superlist):
695
"""Assert that every entry in sublist is present in superlist."""
697
for entry in sublist:
698
if entry not in superlist:
699
missing.append(entry)
701
raise AssertionError("value(s) %r not present in container %r" %
702
(missing, superlist))
704
def assertListRaises(self, excClass, func, *args, **kwargs):
705
"""Fail unless excClass is raised when the iterator from func is used.
707
Many functions can return generators this makes sure
708
to wrap them in a list() call to make sure the whole generator
709
is run, and that the proper exception is raised.
712
list(func(*args, **kwargs))
716
if getattr(excClass,'__name__', None) is not None:
717
excName = excClass.__name__
719
excName = str(excClass)
720
raise self.failureException, "%s not raised" % excName
722
def assertIs(self, left, right, message=None):
723
if not (left is right):
724
if message is not None:
725
raise AssertionError(message)
727
raise AssertionError("%r is not %r." % (left, right))
729
def assertIsNot(self, left, right, message=None):
731
if message is not None:
732
raise AssertionError(message)
734
raise AssertionError("%r is %r." % (left, right))
736
def assertTransportMode(self, transport, path, mode):
737
"""Fail if a path does not have mode mode.
739
If modes are not supported on this transport, the assertion is ignored.
741
if not transport._can_roundtrip_unix_modebits():
743
path_stat = transport.stat(path)
744
actual_mode = stat.S_IMODE(path_stat.st_mode)
745
self.assertEqual(mode, actual_mode,
746
'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
748
def assertIsInstance(self, obj, kls):
749
"""Fail if obj is not an instance of kls"""
750
if not isinstance(obj, kls):
751
self.fail("%r is an instance of %s rather than %s" % (
752
obj, obj.__class__, kls))
754
def _capture_warnings(self, a_callable, *args, **kwargs):
755
"""A helper for callDeprecated and applyDeprecated.
757
:param a_callable: A callable to call.
758
:param args: The positional arguments for the callable
759
:param kwargs: The keyword arguments for the callable
760
:return: A tuple (warnings, result). result is the result of calling
761
a_callable(*args, **kwargs).
764
def capture_warnings(msg, cls=None, stacklevel=None):
765
# we've hooked into a deprecation specific callpath,
766
# only deprecations should getting sent via it.
767
self.assertEqual(cls, DeprecationWarning)
768
local_warnings.append(msg)
769
original_warning_method = symbol_versioning.warn
770
symbol_versioning.set_warning_method(capture_warnings)
772
result = a_callable(*args, **kwargs)
774
symbol_versioning.set_warning_method(original_warning_method)
775
return (local_warnings, result)
777
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
778
"""Call a deprecated callable without warning the user.
780
:param deprecation_format: The deprecation format that the callable
781
should have been deprecated with. This is the same type as the
782
parameter to deprecated_method/deprecated_function. If the
783
callable is not deprecated with this format, an assertion error
785
:param a_callable: A callable to call. This may be a bound method or
786
a regular function. It will be called with *args and **kwargs.
787
:param args: The positional arguments for the callable
788
:param kwargs: The keyword arguments for the callable
789
:return: The result of a_callable(*args, **kwargs)
791
call_warnings, result = self._capture_warnings(a_callable,
793
expected_first_warning = symbol_versioning.deprecation_string(
794
a_callable, deprecation_format)
795
if len(call_warnings) == 0:
796
self.fail("No deprecation warning generated by call to %s" %
798
self.assertEqual(expected_first_warning, call_warnings[0])
801
def callDeprecated(self, expected, callable, *args, **kwargs):
802
"""Assert that a callable is deprecated in a particular way.
804
This is a very precise test for unusual requirements. The
805
applyDeprecated helper function is probably more suited for most tests
806
as it allows you to simply specify the deprecation format being used
807
and will ensure that that is issued for the function being called.
809
:param expected: a list of the deprecation warnings expected, in order
810
:param callable: The callable to call
811
:param args: The positional arguments for the callable
812
:param kwargs: The keyword arguments for the callable
814
call_warnings, result = self._capture_warnings(callable,
816
self.assertEqual(expected, call_warnings)
819
def _startLogFile(self):
820
"""Send bzr and test log messages to a temporary file.
822
The file is removed as the test is torn down.
824
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
825
self._log_file = os.fdopen(fileno, 'w+')
826
self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
827
self._log_file_name = name
828
self.addCleanup(self._finishLogFile)
830
def _finishLogFile(self):
831
"""Finished with the log file.
833
Close the file and delete it, unless setKeepLogfile was called.
835
if self._log_file is None:
837
bzrlib.trace.disable_test_log(self._log_nonce)
838
self._log_file.close()
839
self._log_file = None
840
if not self._keep_log_file:
841
os.remove(self._log_file_name)
842
self._log_file_name = None
844
def setKeepLogfile(self):
845
"""Make the logfile not be deleted when _finishLogFile is called."""
846
self._keep_log_file = True
848
def addCleanup(self, callable):
849
"""Arrange to run a callable when this case is torn down.
851
Callables are run in the reverse of the order they are registered,
852
ie last-in first-out.
854
if callable in self._cleanups:
855
raise ValueError("cleanup function %r already registered on %s"
857
self._cleanups.append(callable)
859
def _cleanEnvironment(self):
861
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
863
'APPDATA': None, # bzr now use Win32 API and don't rely on APPDATA
865
'BZREMAIL': None, # may still be present in the environment
867
'BZR_PROGRESS_BAR': None,
877
# Nobody cares about these ones AFAIK. So far at
878
# least. If you do (care), please update this comment
884
self.addCleanup(self._restoreEnvironment)
885
for name, value in new_env.iteritems():
886
self._captureVar(name, value)
888
def _captureVar(self, name, newvalue):
889
"""Set an environment variable, and reset it when finished."""
890
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
892
def _restoreEnvironment(self):
893
for name, value in self.__old_env.iteritems():
894
osutils.set_or_unset_env(name, value)
896
def _restoreHooks(self):
897
bzrlib.branch.Branch.hooks = self._preserved_hooks
901
unittest.TestCase.tearDown(self)
903
def time(self, callable, *args, **kwargs):
904
"""Run callable and accrue the time it takes to the benchmark time.
906
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
907
this will cause lsprofile statistics to be gathered and stored in
910
if self._benchtime is None:
914
if not self._gather_lsprof_in_benchmarks:
915
return callable(*args, **kwargs)
917
# record this benchmark
918
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
920
self._benchcalls.append(((callable, args, kwargs), stats))
923
self._benchtime += time.time() - start
925
def _runCleanups(self):
926
"""Run registered cleanup functions.
928
This should only be called from TestCase.tearDown.
930
# TODO: Perhaps this should keep running cleanups even if
932
for cleanup_fn in reversed(self._cleanups):
935
def log(self, *args):
938
def _get_log(self, keep_log_file=False):
939
"""Return as a string the log for this test. If the file is still
940
on disk and keep_log_file=False, delete the log file and store the
941
content in self._log_contents."""
942
# flush the log file, to get all content
944
bzrlib.trace._trace_file.flush()
945
if self._log_contents:
946
return self._log_contents
947
if self._log_file_name is not None:
948
logfile = open(self._log_file_name)
950
log_contents = logfile.read()
953
if not keep_log_file:
954
self._log_contents = log_contents
956
os.remove(self._log_file_name)
958
if sys.platform == 'win32' and e.errno == errno.EACCES:
959
print >>sys.stderr, ('Unable to delete log file '
960
' %r' % self._log_file_name)
965
return "DELETED log file to reduce memory footprint"
967
def capture(self, cmd, retcode=0):
968
"""Shortcut that splits cmd into words, runs, and returns stdout"""
969
return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
971
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
973
"""Invoke bzr and return (stdout, stderr).
975
Useful for code that wants to check the contents of the
976
output, the way error messages are presented, etc.
978
This should be the main method for tests that want to exercise the
979
overall behavior of the bzr application (rather than a unit test
980
or a functional test of the library.)
982
Much of the old code runs bzr by forking a new copy of Python, but
983
that is slower, harder to debug, and generally not necessary.
985
This runs bzr through the interface that catches and reports
986
errors, and with logging set to something approximating the
987
default, so that error reporting can be checked.
989
:param argv: arguments to invoke bzr
990
:param retcode: expected return code, or None for don't-care.
991
:param encoding: encoding for sys.stdout and sys.stderr
992
:param stdin: A string to be used as stdin for the command.
993
:param working_dir: Change to this directory before running
996
encoding = bzrlib.user_encoding
997
if stdin is not None:
998
stdin = StringIO(stdin)
999
stdout = StringIOWrapper()
1000
stderr = StringIOWrapper()
1001
stdout.encoding = encoding
1002
stderr.encoding = encoding
1004
self.log('run bzr: %r', argv)
1005
# FIXME: don't call into logging here
1006
handler = logging.StreamHandler(stderr)
1007
handler.setLevel(logging.INFO)
1008
logger = logging.getLogger('')
1009
logger.addHandler(handler)
1010
old_ui_factory = bzrlib.ui.ui_factory
1011
bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
1014
bzrlib.ui.ui_factory.stdin = stdin
1017
if working_dir is not None:
1018
cwd = osutils.getcwd()
1019
os.chdir(working_dir)
1022
saved_debug_flags = frozenset(debug.debug_flags)
1023
debug.debug_flags.clear()
1025
result = self.apply_redirected(stdin, stdout, stderr,
1026
bzrlib.commands.run_bzr_catch_errors,
1029
debug.debug_flags.update(saved_debug_flags)
1031
logger.removeHandler(handler)
1032
bzrlib.ui.ui_factory = old_ui_factory
1036
out = stdout.getvalue()
1037
err = stderr.getvalue()
1039
self.log('output:\n%r', out)
1041
self.log('errors:\n%r', err)
1042
if retcode is not None:
1043
self.assertEquals(retcode, result)
1046
def run_bzr(self, *args, **kwargs):
1047
"""Invoke bzr, as if it were run from the command line.
1049
This should be the main method for tests that want to exercise the
1050
overall behavior of the bzr application (rather than a unit test
1051
or a functional test of the library.)
1053
This sends the stdout/stderr results into the test's log,
1054
where it may be useful for debugging. See also run_captured.
1056
:param stdin: A string to be used as stdin for the command.
1057
:param retcode: The status code the command should return
1058
:param working_dir: The directory to run the command in
1060
retcode = kwargs.pop('retcode', 0)
1061
encoding = kwargs.pop('encoding', None)
1062
stdin = kwargs.pop('stdin', None)
1063
working_dir = kwargs.pop('working_dir', None)
1064
return self.run_bzr_captured(args, retcode=retcode, encoding=encoding,
1065
stdin=stdin, working_dir=working_dir)
1067
def run_bzr_decode(self, *args, **kwargs):
1068
if 'encoding' in kwargs:
1069
encoding = kwargs['encoding']
1071
encoding = bzrlib.user_encoding
1072
return self.run_bzr(*args, **kwargs)[0].decode(encoding)
1074
def run_bzr_error(self, error_regexes, *args, **kwargs):
1075
"""Run bzr, and check that stderr contains the supplied regexes
1077
:param error_regexes: Sequence of regular expressions which
1078
must each be found in the error output. The relative ordering
1080
:param args: command-line arguments for bzr
1081
:param kwargs: Keyword arguments which are interpreted by run_bzr
1082
This function changes the default value of retcode to be 3,
1083
since in most cases this is run when you expect bzr to fail.
1084
:return: (out, err) The actual output of running the command (in case you
1085
want to do more inspection)
1088
# Make sure that commit is failing because there is nothing to do
1089
self.run_bzr_error(['no changes to commit'],
1090
'commit', '-m', 'my commit comment')
1091
# Make sure --strict is handling an unknown file, rather than
1092
# giving us the 'nothing to do' error
1093
self.build_tree(['unknown'])
1094
self.run_bzr_error(['Commit refused because there are unknown files'],
1095
'commit', '--strict', '-m', 'my commit comment')
1097
kwargs.setdefault('retcode', 3)
1098
out, err = self.run_bzr(*args, **kwargs)
1099
for regex in error_regexes:
1100
self.assertContainsRe(err, regex)
1103
def run_bzr_subprocess(self, *args, **kwargs):
1104
"""Run bzr in a subprocess for testing.
1106
This starts a new Python interpreter and runs bzr in there.
1107
This should only be used for tests that have a justifiable need for
1108
this isolation: e.g. they are testing startup time, or signal
1109
handling, or early startup code, etc. Subprocess code can't be
1110
profiled or debugged so easily.
1112
:param retcode: The status code that is expected. Defaults to 0. If
1113
None is supplied, the status code is not checked.
1114
:param env_changes: A dictionary which lists changes to environment
1115
variables. A value of None will unset the env variable.
1116
The values must be strings. The change will only occur in the
1117
child, so you don't need to fix the environment after running.
1118
:param universal_newlines: Convert CRLF => LF
1119
:param allow_plugins: By default the subprocess is run with
1120
--no-plugins to ensure test reproducibility. Also, it is possible
1121
for system-wide plugins to create unexpected output on stderr,
1122
which can cause unnecessary test failures.
1124
env_changes = kwargs.get('env_changes', {})
1125
working_dir = kwargs.get('working_dir', None)
1126
allow_plugins = kwargs.get('allow_plugins', False)
1127
process = self.start_bzr_subprocess(args, env_changes=env_changes,
1128
working_dir=working_dir,
1129
allow_plugins=allow_plugins)
1130
# We distinguish between retcode=None and retcode not passed.
1131
supplied_retcode = kwargs.get('retcode', 0)
1132
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
1133
universal_newlines=kwargs.get('universal_newlines', False),
1136
def start_bzr_subprocess(self, process_args, env_changes=None,
1137
skip_if_plan_to_signal=False,
1139
allow_plugins=False):
1140
"""Start bzr in a subprocess for testing.
1142
This starts a new Python interpreter and runs bzr in there.
1143
This should only be used for tests that have a justifiable need for
1144
this isolation: e.g. they are testing startup time, or signal
1145
handling, or early startup code, etc. Subprocess code can't be
1146
profiled or debugged so easily.
1148
:param process_args: a list of arguments to pass to the bzr executable,
1149
for example `['--version']`.
1150
:param env_changes: A dictionary which lists changes to environment
1151
variables. A value of None will unset the env variable.
1152
The values must be strings. The change will only occur in the
1153
child, so you don't need to fix the environment after running.
1154
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1156
:param allow_plugins: If False (default) pass --no-plugins to bzr.
1158
:returns: Popen object for the started process.
1160
if skip_if_plan_to_signal:
1161
if not getattr(os, 'kill', None):
1162
raise TestSkipped("os.kill not available.")
1164
if env_changes is None:
1168
def cleanup_environment():
1169
for env_var, value in env_changes.iteritems():
1170
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
1172
def restore_environment():
1173
for env_var, value in old_env.iteritems():
1174
osutils.set_or_unset_env(env_var, value)
1176
bzr_path = self.get_bzr_path()
1179
if working_dir is not None:
1180
cwd = osutils.getcwd()
1181
os.chdir(working_dir)
1184
# win32 subprocess doesn't support preexec_fn
1185
# so we will avoid using it on all platforms, just to
1186
# make sure the code path is used, and we don't break on win32
1187
cleanup_environment()
1188
command = [sys.executable, bzr_path]
1189
if not allow_plugins:
1190
command.append('--no-plugins')
1191
command.extend(process_args)
1192
process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1194
restore_environment()
1200
def _popen(self, *args, **kwargs):
1201
"""Place a call to Popen.
1203
Allows tests to override this method to intercept the calls made to
1204
Popen for introspection.
1206
return Popen(*args, **kwargs)
1208
def get_bzr_path(self):
1209
"""Return the path of the 'bzr' executable for this test suite."""
1210
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
1211
if not os.path.isfile(bzr_path):
1212
# We are probably installed. Assume sys.argv is the right file
1213
bzr_path = sys.argv[0]
1216
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
1217
universal_newlines=False, process_args=None):
1218
"""Finish the execution of process.
1220
:param process: the Popen object returned from start_bzr_subprocess.
1221
:param retcode: The status code that is expected. Defaults to 0. If
1222
None is supplied, the status code is not checked.
1223
:param send_signal: an optional signal to send to the process.
1224
:param universal_newlines: Convert CRLF => LF
1225
:returns: (stdout, stderr)
1227
if send_signal is not None:
1228
os.kill(process.pid, send_signal)
1229
out, err = process.communicate()
1231
if universal_newlines:
1232
out = out.replace('\r\n', '\n')
1233
err = err.replace('\r\n', '\n')
1235
if retcode is not None and retcode != process.returncode:
1236
if process_args is None:
1237
process_args = "(unknown args)"
1238
mutter('Output of bzr %s:\n%s', process_args, out)
1239
mutter('Error for bzr %s:\n%s', process_args, err)
1240
self.fail('Command bzr %s failed with retcode %s != %s'
1241
% (process_args, retcode, process.returncode))
1244
def check_inventory_shape(self, inv, shape):
1245
"""Compare an inventory to a list of expected names.
1247
Fail if they are not precisely equal.
1250
shape = list(shape) # copy
1251
for path, ie in inv.entries():
1252
name = path.replace('\\', '/')
1253
if ie.kind == 'dir':
1260
self.fail("expected paths not found in inventory: %r" % shape)
1262
self.fail("unexpected paths found in inventory: %r" % extras)
1264
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
1265
a_callable=None, *args, **kwargs):
1266
"""Call callable with redirected std io pipes.
1268
Returns the return code."""
1269
if not callable(a_callable):
1270
raise ValueError("a_callable must be callable.")
1272
stdin = StringIO("")
1274
if getattr(self, "_log_file", None) is not None:
1275
stdout = self._log_file
1279
if getattr(self, "_log_file", None is not None):
1280
stderr = self._log_file
1283
real_stdin = sys.stdin
1284
real_stdout = sys.stdout
1285
real_stderr = sys.stderr
1290
return a_callable(*args, **kwargs)
1292
sys.stdout = real_stdout
1293
sys.stderr = real_stderr
1294
sys.stdin = real_stdin
1296
@symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
1297
def merge(self, branch_from, wt_to):
1298
"""A helper for tests to do a ui-less merge.
1300
This should move to the main library when someone has time to integrate
1303
# minimal ui-less merge.
1304
wt_to.branch.fetch(branch_from)
1305
base_rev = common_ancestor(branch_from.last_revision(),
1306
wt_to.branch.last_revision(),
1307
wt_to.branch.repository)
1308
merge_inner(wt_to.branch, branch_from.basis_tree(),
1309
wt_to.branch.repository.revision_tree(base_rev),
1311
wt_to.add_parent_tree_id(branch_from.last_revision())
1314
BzrTestBase = TestCase
1317
class TestCaseWithMemoryTransport(TestCase):
1318
"""Common test class for tests that do not need disk resources.
1320
Tests that need disk resources should derive from TestCaseWithTransport.
1322
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1324
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1325
a directory which does not exist. This serves to help ensure test isolation
1326
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1327
must exist. However, TestCaseWithMemoryTransport does not offer local
1328
file defaults for the transport in tests, nor does it obey the command line
1329
override, so tests that accidentally write to the common directory should
1337
def __init__(self, methodName='runTest'):
1338
# allow test parameterisation after test construction and before test
1339
# execution. Variables that the parameteriser sets need to be
1340
# ones that are not set by setUp, or setUp will trash them.
1341
super(TestCaseWithMemoryTransport, self).__init__(methodName)
1342
self.transport_server = default_transport
1343
self.transport_readonly_server = None
1345
def get_transport(self):
1346
"""Return a writeable transport for the test scratch space"""
1347
t = get_transport(self.get_url())
1348
self.assertFalse(t.is_readonly())
1351
def get_readonly_transport(self):
1352
"""Return a readonly transport for the test scratch space
1354
This can be used to test that operations which should only need
1355
readonly access in fact do not try to write.
1357
t = get_transport(self.get_readonly_url())
1358
self.assertTrue(t.is_readonly())
1361
def create_transport_readonly_server(self):
1362
"""Create a transport server from class defined at init.
1364
This is mostly a hook for daughter classes.
1366
return self.transport_readonly_server()
1368
def get_readonly_server(self):
1369
"""Get the server instance for the readonly transport
1371
This is useful for some tests with specific servers to do diagnostics.
1373
if self.__readonly_server is None:
1374
if self.transport_readonly_server is None:
1375
# readonly decorator requested
1376
# bring up the server
1378
self.__readonly_server = ReadonlyServer()
1379
self.__readonly_server.setUp(self.__server)
1381
self.__readonly_server = self.create_transport_readonly_server()
1382
self.__readonly_server.setUp()
1383
self.addCleanup(self.__readonly_server.tearDown)
1384
return self.__readonly_server
1386
def get_readonly_url(self, relpath=None):
1387
"""Get a URL for the readonly transport.
1389
This will either be backed by '.' or a decorator to the transport
1390
used by self.get_url()
1391
relpath provides for clients to get a path relative to the base url.
1392
These should only be downwards relative, not upwards.
1394
base = self.get_readonly_server().get_url()
1395
if relpath is not None:
1396
if not base.endswith('/'):
1398
base = base + relpath
1401
def get_server(self):
1402
"""Get the read/write server instance.
1404
This is useful for some tests with specific servers that need
1407
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
1408
is no means to override it.
1410
if self.__server is None:
1411
self.__server = MemoryServer()
1412
self.__server.setUp()
1413
self.addCleanup(self.__server.tearDown)
1414
return self.__server
1416
def get_url(self, relpath=None):
1417
"""Get a URL (or maybe a path) for the readwrite transport.
1419
This will either be backed by '.' or to an equivalent non-file based
1421
relpath provides for clients to get a path relative to the base url.
1422
These should only be downwards relative, not upwards.
1424
base = self.get_server().get_url()
1425
if relpath is not None and relpath != '.':
1426
if not base.endswith('/'):
1428
# XXX: Really base should be a url; we did after all call
1429
# get_url()! But sometimes it's just a path (from
1430
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1431
# to a non-escaped local path.
1432
if base.startswith('./') or base.startswith('/'):
1435
base += urlutils.escape(relpath)
1438
def _make_test_root(self):
1439
if TestCaseWithMemoryTransport.TEST_ROOT is not None:
1443
root = u'test%04d.tmp' % i
1447
if e.errno == errno.EEXIST:
1452
# successfully created
1453
TestCaseWithMemoryTransport.TEST_ROOT = osutils.abspath(root)
1455
# make a fake bzr directory there to prevent any tests propagating
1456
# up onto the source directory's real branch
1457
bzrdir.BzrDir.create_standalone_workingtree(
1458
TestCaseWithMemoryTransport.TEST_ROOT)
1460
def makeAndChdirToTestDir(self):
1461
"""Create a temporary directories for this one test.
1463
This must set self.test_home_dir and self.test_dir and chdir to
1466
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1468
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1469
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1470
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1472
def make_branch(self, relpath, format=None):
1473
"""Create a branch on the transport at relpath."""
1474
repo = self.make_repository(relpath, format=format)
1475
return repo.bzrdir.create_branch()
1477
def make_bzrdir(self, relpath, format=None):
1479
# might be a relative or absolute path
1480
maybe_a_url = self.get_url(relpath)
1481
segments = maybe_a_url.rsplit('/', 1)
1482
t = get_transport(maybe_a_url)
1483
if len(segments) > 1 and segments[-1] not in ('', '.'):
1486
except errors.FileExists:
1490
if isinstance(format, basestring):
1491
format = bzrdir.format_registry.make_bzrdir(format)
1492
return format.initialize_on_transport(t)
1493
except errors.UninitializableFormat:
1494
raise TestSkipped("Format %s is not initializable." % format)
1496
def make_repository(self, relpath, shared=False, format=None):
1497
"""Create a repository on our default transport at relpath."""
1498
made_control = self.make_bzrdir(relpath, format=format)
1499
return made_control.create_repository(shared=shared)
1501
def make_branch_and_memory_tree(self, relpath, format=None):
1502
"""Create a branch on the default transport and a MemoryTree for it."""
1503
b = self.make_branch(relpath, format=format)
1504
return memorytree.MemoryTree.create_on_branch(b)
1506
def overrideEnvironmentForTesting(self):
1507
os.environ['HOME'] = self.test_home_dir
1508
os.environ['BZR_HOME'] = self.test_home_dir
1511
super(TestCaseWithMemoryTransport, self).setUp()
1512
self._make_test_root()
1513
_currentdir = os.getcwdu()
1514
def _leaveDirectory():
1515
os.chdir(_currentdir)
1516
self.addCleanup(_leaveDirectory)
1517
self.makeAndChdirToTestDir()
1518
self.overrideEnvironmentForTesting()
1519
self.__readonly_server = None
1520
self.__server = None
1523
class TestCaseInTempDir(TestCaseWithMemoryTransport):
1524
"""Derived class that runs a test within a temporary directory.
1526
This is useful for tests that need to create a branch, etc.
1528
The directory is created in a slightly complex way: for each
1529
Python invocation, a new temporary top-level directory is created.
1530
All test cases create their own directory within that. If the
1531
tests complete successfully, the directory is removed.
1533
InTempDir is an old alias for FunctionalTestCase.
1536
OVERRIDE_PYTHON = 'python'
1538
def check_file_contents(self, filename, expect):
1539
self.log("check contents of file %s" % filename)
1540
contents = file(filename, 'r').read()
1541
if contents != expect:
1542
self.log("expected: %r" % expect)
1543
self.log("actually: %r" % contents)
1544
self.fail("contents of %s not as expected" % filename)
1546
def makeAndChdirToTestDir(self):
1547
"""See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
1549
For TestCaseInTempDir we create a temporary directory based on the test
1550
name and then create two subdirs - test and home under it.
1552
if NUMBERED_DIRS: # strongly recommended on Windows
1553
# due the path length limitation (260 chars)
1554
candidate_dir = '%s/%dK/%05d' % (self.TEST_ROOT,
1555
int(self.number/1000),
1557
os.makedirs(candidate_dir)
1558
self.test_home_dir = candidate_dir + '/home'
1559
os.mkdir(self.test_home_dir)
1560
self.test_dir = candidate_dir + '/work'
1561
os.mkdir(self.test_dir)
1562
os.chdir(self.test_dir)
1563
# put name of test inside
1564
f = file(candidate_dir + '/name', 'w')
1569
# shorten the name, to avoid test failures due to path length
1570
short_id = self.id().replace('bzrlib.tests.', '') \
1571
.replace('__main__.', '')[-100:]
1572
# it's possible the same test class is run several times for
1573
# parameterized tests, so make sure the names don't collide.
1577
candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
1579
candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
1580
if os.path.exists(candidate_dir):
1584
os.mkdir(candidate_dir)
1585
self.test_home_dir = candidate_dir + '/home'
1586
os.mkdir(self.test_home_dir)
1587
self.test_dir = candidate_dir + '/work'
1588
os.mkdir(self.test_dir)
1589
os.chdir(self.test_dir)
1592
def build_tree(self, shape, line_endings='binary', transport=None):
1593
"""Build a test tree according to a pattern.
1595
shape is a sequence of file specifications. If the final
1596
character is '/', a directory is created.
1598
This assumes that all the elements in the tree being built are new.
1600
This doesn't add anything to a branch.
1601
:param line_endings: Either 'binary' or 'native'
1602
in binary mode, exact contents are written
1603
in native mode, the line endings match the
1604
default platform endings.
1606
:param transport: A transport to write to, for building trees on
1607
VFS's. If the transport is readonly or None,
1608
"." is opened automatically.
1610
# It's OK to just create them using forward slashes on windows.
1611
if transport is None or transport.is_readonly():
1612
transport = get_transport(".")
1614
self.assert_(isinstance(name, basestring))
1616
transport.mkdir(urlutils.escape(name[:-1]))
1618
if line_endings == 'binary':
1620
elif line_endings == 'native':
1623
raise errors.BzrError(
1624
'Invalid line ending request %r' % line_endings)
1625
content = "contents of %s%s" % (name.encode('utf-8'), end)
1626
transport.put_bytes_non_atomic(urlutils.escape(name), content)
1628
def build_tree_contents(self, shape):
1629
build_tree_contents(shape)
1631
def assertFileEqual(self, content, path):
1632
"""Fail if path does not contain 'content'."""
1633
self.failUnlessExists(path)
1634
# TODO: jam 20060427 Shouldn't this be 'rb'?
1635
self.assertEqualDiff(content, open(path, 'r').read())
1637
def failUnlessExists(self, path):
1638
"""Fail unless path, which may be abs or relative, exists."""
1639
self.failUnless(osutils.lexists(path),path+" does not exist")
1641
def failIfExists(self, path):
1642
"""Fail if path, which may be abs or relative, exists."""
1643
self.failIf(osutils.lexists(path),path+" exists")
1646
class TestCaseWithTransport(TestCaseInTempDir):
1647
"""A test case that provides get_url and get_readonly_url facilities.
1649
These back onto two transport servers, one for readonly access and one for
1652
If no explicit class is provided for readonly access, a
1653
ReadonlyTransportDecorator is used instead which allows the use of non disk
1654
based read write transports.
1656
If an explicit class is provided for readonly access, that server and the
1657
readwrite one must both define get_url() as resolving to os.getcwd().
1660
def create_transport_server(self):
1661
"""Create a transport server from class defined at init.
1663
This is mostly a hook for daughter classes.
1665
return self.transport_server()
1667
def get_server(self):
1668
"""See TestCaseWithMemoryTransport.
1670
This is useful for some tests with specific servers that need
1673
if self.__server is None:
1674
self.__server = self.create_transport_server()
1675
self.__server.setUp()
1676
self.addCleanup(self.__server.tearDown)
1677
return self.__server
1679
def make_branch_and_tree(self, relpath, format=None):
1680
"""Create a branch on the transport and a tree locally.
1682
If the transport is not a LocalTransport, the Tree can't be created on
1683
the transport. In that case the working tree is created in the local
1684
directory, and the returned tree's branch and repository will also be
1687
This will fail if the original default transport for this test
1688
case wasn't backed by the working directory, as the branch won't
1689
be on disk for us to open it.
1691
:param format: The BzrDirFormat.
1692
:returns: the WorkingTree.
1694
# TODO: always use the local disk path for the working tree,
1695
# this obviously requires a format that supports branch references
1696
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
1698
b = self.make_branch(relpath, format=format)
1700
return b.bzrdir.create_workingtree()
1701
except errors.NotLocalUrl:
1702
# We can only make working trees locally at the moment. If the
1703
# transport can't support them, then reopen the branch on a local
1704
# transport, and create the working tree there.
1706
# Possibly we should instead keep
1707
# the non-disk-backed branch and create a local checkout?
1708
bd = bzrdir.BzrDir.open(relpath)
1709
return bd.create_workingtree()
1711
def assertIsDirectory(self, relpath, transport):
1712
"""Assert that relpath within transport is a directory.
1714
This may not be possible on all transports; in that case it propagates
1715
a TransportNotPossible.
1718
mode = transport.stat(relpath).st_mode
1719
except errors.NoSuchFile:
1720
self.fail("path %s is not a directory; no such file"
1722
if not stat.S_ISDIR(mode):
1723
self.fail("path %s is not a directory; has mode %#o"
1726
def assertTreesEqual(self, left, right):
1727
"""Check that left and right have the same content and properties."""
1728
# we use a tree delta to check for equality of the content, and we
1729
# manually check for equality of other things such as the parents list.
1730
self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
1731
differences = left.changes_from(right)
1732
self.assertFalse(differences.has_changed(),
1733
"Trees %r and %r are different: %r" % (left, right, differences))
1736
super(TestCaseWithTransport, self).setUp()
1737
self.__server = None
1740
class ChrootedTestCase(TestCaseWithTransport):
1741
"""A support class that provides readonly urls outside the local namespace.
1743
This is done by checking if self.transport_server is a MemoryServer. if it
1744
is then we are chrooted already, if it is not then an HttpServer is used
1747
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
1748
be used without needed to redo it when a different
1749
subclass is in use ?
1753
super(ChrootedTestCase, self).setUp()
1754
if not self.transport_server == MemoryServer:
1755
self.transport_readonly_server = HttpServer
1758
def filter_suite_by_re(suite, pattern):
1759
result = TestUtil.TestSuite()
1760
filter_re = re.compile(pattern)
1761
for test in iter_suite_tests(suite):
1762
if filter_re.search(test.id()):
1763
result.addTest(test)
1767
def sort_suite_by_re(suite, pattern):
1770
filter_re = re.compile(pattern)
1771
for test in iter_suite_tests(suite):
1772
if filter_re.search(test.id()):
1776
return TestUtil.TestSuite(first + second)
1779
def run_suite(suite, name='test', verbose=False, pattern=".*",
1780
stop_on_failure=False, keep_output=False,
1781
transport=None, lsprof_timed=None, bench_history=None,
1782
matching_tests_first=None,
1783
numbered_dirs=False):
1784
global NUMBERED_DIRS
1785
NUMBERED_DIRS = bool(numbered_dirs)
1787
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1792
runner = TextTestRunner(stream=sys.stdout,
1794
verbosity=verbosity,
1795
keep_output=keep_output,
1796
bench_history=bench_history)
1797
runner.stop_on_failure=stop_on_failure
1799
if matching_tests_first:
1800
suite = sort_suite_by_re(suite, pattern)
1802
suite = filter_suite_by_re(suite, pattern)
1803
result = runner.run(suite)
1804
return result.wasSuccessful()
1807
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
1810
test_suite_factory=None,
1813
matching_tests_first=None,
1814
numbered_dirs=False):
1815
"""Run the whole test suite under the enhanced runner"""
1816
# XXX: Very ugly way to do this...
1817
# Disable warning about old formats because we don't want it to disturb
1818
# any blackbox tests.
1819
from bzrlib import repository
1820
repository._deprecation_warning_done = True
1822
global default_transport
1823
if transport is None:
1824
transport = default_transport
1825
old_transport = default_transport
1826
default_transport = transport
1828
if test_suite_factory is None:
1829
suite = test_suite()
1831
suite = test_suite_factory()
1832
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
1833
stop_on_failure=stop_on_failure, keep_output=keep_output,
1834
transport=transport,
1835
lsprof_timed=lsprof_timed,
1836
bench_history=bench_history,
1837
matching_tests_first=matching_tests_first,
1838
numbered_dirs=numbered_dirs)
1840
default_transport = old_transport
1844
"""Build and return TestSuite for the whole of bzrlib.
1846
This function can be replaced if you need to change the default test
1847
suite on a global basis, but it is not encouraged.
1850
'bzrlib.tests.test_ancestry',
1851
'bzrlib.tests.test_annotate',
1852
'bzrlib.tests.test_api',
1853
'bzrlib.tests.test_atomicfile',
1854
'bzrlib.tests.test_bad_files',
1855
'bzrlib.tests.test_branch',
1856
'bzrlib.tests.test_bundle',
1857
'bzrlib.tests.test_bzrdir',
1858
'bzrlib.tests.test_cache_utf8',
1859
'bzrlib.tests.test_commands',
1860
'bzrlib.tests.test_commit',
1861
'bzrlib.tests.test_commit_merge',
1862
'bzrlib.tests.test_config',
1863
'bzrlib.tests.test_conflicts',
1864
'bzrlib.tests.test_decorators',
1865
'bzrlib.tests.test_delta',
1866
'bzrlib.tests.test_diff',
1867
'bzrlib.tests.test_dirstate',
1868
'bzrlib.tests.test_doc_generate',
1869
'bzrlib.tests.test_errors',
1870
'bzrlib.tests.test_escaped_store',
1871
'bzrlib.tests.test_extract',
1872
'bzrlib.tests.test_fetch',
1873
'bzrlib.tests.test_ftp_transport',
1874
'bzrlib.tests.test_generate_docs',
1875
'bzrlib.tests.test_generate_ids',
1876
'bzrlib.tests.test_globbing',
1877
'bzrlib.tests.test_gpg',
1878
'bzrlib.tests.test_graph',
1879
'bzrlib.tests.test_hashcache',
1880
'bzrlib.tests.test_http',
1881
'bzrlib.tests.test_http_response',
1882
'bzrlib.tests.test_https_ca_bundle',
1883
'bzrlib.tests.test_identitymap',
1884
'bzrlib.tests.test_ignores',
1885
'bzrlib.tests.test_inv',
1886
'bzrlib.tests.test_knit',
1887
'bzrlib.tests.test_lazy_import',
1888
'bzrlib.tests.test_lazy_regex',
1889
'bzrlib.tests.test_lockdir',
1890
'bzrlib.tests.test_lockable_files',
1891
'bzrlib.tests.test_log',
1892
'bzrlib.tests.test_memorytree',
1893
'bzrlib.tests.test_merge',
1894
'bzrlib.tests.test_merge3',
1895
'bzrlib.tests.test_merge_core',
1896
'bzrlib.tests.test_missing',
1897
'bzrlib.tests.test_msgeditor',
1898
'bzrlib.tests.test_nonascii',
1899
'bzrlib.tests.test_options',
1900
'bzrlib.tests.test_osutils',
1901
'bzrlib.tests.test_osutils_encodings',
1902
'bzrlib.tests.test_patch',
1903
'bzrlib.tests.test_patches',
1904
'bzrlib.tests.test_permissions',
1905
'bzrlib.tests.test_plugins',
1906
'bzrlib.tests.test_progress',
1907
'bzrlib.tests.test_reconcile',
1908
'bzrlib.tests.test_registry',
1909
'bzrlib.tests.test_repository',
1910
'bzrlib.tests.test_revert',
1911
'bzrlib.tests.test_revision',
1912
'bzrlib.tests.test_revisionnamespaces',
1913
'bzrlib.tests.test_revisiontree',
1914
'bzrlib.tests.test_rio',
1915
'bzrlib.tests.test_sampler',
1916
'bzrlib.tests.test_selftest',
1917
'bzrlib.tests.test_setup',
1918
'bzrlib.tests.test_sftp_transport',
1919
'bzrlib.tests.test_smart_add',
1920
'bzrlib.tests.test_smart_transport',
1921
'bzrlib.tests.test_source',
1922
'bzrlib.tests.test_status',
1923
'bzrlib.tests.test_store',
1924
'bzrlib.tests.test_subsume',
1925
'bzrlib.tests.test_symbol_versioning',
1926
'bzrlib.tests.test_tag',
1927
'bzrlib.tests.test_testament',
1928
'bzrlib.tests.test_textfile',
1929
'bzrlib.tests.test_textmerge',
1930
'bzrlib.tests.test_trace',
1931
'bzrlib.tests.test_transactions',
1932
'bzrlib.tests.test_transform',
1933
'bzrlib.tests.test_transport',
1934
'bzrlib.tests.test_tree',
1935
'bzrlib.tests.test_treebuilder',
1936
'bzrlib.tests.test_tsort',
1937
'bzrlib.tests.test_tuned_gzip',
1938
'bzrlib.tests.test_ui',
1939
'bzrlib.tests.test_upgrade',
1940
'bzrlib.tests.test_urlutils',
1941
'bzrlib.tests.test_versionedfile',
1942
'bzrlib.tests.test_version',
1943
'bzrlib.tests.test_version_info',
1944
'bzrlib.tests.test_weave',
1945
'bzrlib.tests.test_whitebox',
1946
'bzrlib.tests.test_workingtree',
1947
'bzrlib.tests.test_workingtree_4',
1948
'bzrlib.tests.test_wsgi',
1949
'bzrlib.tests.test_xml',
1951
test_transport_implementations = [
1952
'bzrlib.tests.test_transport_implementations',
1953
'bzrlib.tests.test_read_bundle',
1955
suite = TestUtil.TestSuite()
1956
loader = TestUtil.TestLoader()
1957
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
1958
from bzrlib.transport import TransportTestProviderAdapter
1959
adapter = TransportTestProviderAdapter()
1960
adapt_modules(test_transport_implementations, adapter, loader, suite)
1961
for package in packages_to_test():
1962
suite.addTest(package.test_suite())
1963
for m in MODULES_TO_TEST:
1964
suite.addTest(loader.loadTestsFromModule(m))
1965
for m in MODULES_TO_DOCTEST:
1967
suite.addTest(doctest.DocTestSuite(m))
1968
except ValueError, e:
1969
print '**failed to get doctest for: %s\n%s' %(m,e)
1971
for name, plugin in bzrlib.plugin.all_plugins().items():
1972
if getattr(plugin, 'test_suite', None) is not None:
1973
default_encoding = sys.getdefaultencoding()
1975
plugin_suite = plugin.test_suite()
1976
except ImportError, e:
1977
bzrlib.trace.warning(
1978
'Unable to test plugin "%s": %s', name, e)
1980
suite.addTest(plugin_suite)
1981
if default_encoding != sys.getdefaultencoding():
1982
bzrlib.trace.warning(
1983
'Plugin "%s" tried to reset default encoding to: %s', name,
1984
sys.getdefaultencoding())
1986
sys.setdefaultencoding(default_encoding)
1990
def adapt_modules(mods_list, adapter, loader, suite):
1991
"""Adapt the modules in mods_list using adapter and add to suite."""
1992
for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
1993
suite.addTests(adapter.adapt(test))
1996
def clean_selftest_output(root=None, quiet=False):
1997
"""Remove all selftest output directories from root directory.
1999
:param root: root directory for clean
2000
(if ommitted or None then clean current directory).
2001
:param quiet: suppress report about deleting directories
2006
re_dir = re.compile(r'''test\d\d\d\d\.tmp''')
2009
for i in os.listdir(root):
2010
if os.path.isdir(i) and re_dir.match(i):
2012
print 'delete directory:', i