1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
# TODO: Perhaps there should be an API to find out if bzr running under the
19
# test suite -- some plugins might want to avoid making intrusive changes if
20
# this is the case. However, we want behaviour under to test to diverge as
21
# little as possible, so this should be used rarely if it's added at all.
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
# NOTE: Some classes in here use camelCaseNaming() rather than
25
# underscore_naming(). That's for consistency with unittest; it's not the
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
# new assertFoo() methods.
30
from cStringIO import StringIO
36
from pprint import pformat
40
from subprocess import Popen, PIPE
58
import bzrlib.commands
59
import bzrlib.timestamp
61
import bzrlib.inventory
62
import bzrlib.iterablefile
67
# lsprof not available
69
from bzrlib.merge import merge_inner
73
from bzrlib.revision import common_ancestor
75
from bzrlib import symbol_versioning
77
from bzrlib.transport import get_transport
78
import bzrlib.transport
79
from bzrlib.transport.local import LocalURLServer
80
from bzrlib.transport.memory import MemoryServer
81
from bzrlib.transport.readonly import ReadonlyServer
82
from bzrlib.trace import mutter, note
83
from bzrlib.tests import TestUtil
84
from bzrlib.tests.HttpServer import HttpServer
85
from bzrlib.tests.TestUtil import (
89
from bzrlib.tests.treeshape import build_tree_contents
90
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
92
default_transport = LocalURLServer
95
MODULES_TO_DOCTEST = [
107
NUMBERED_DIRS = False # dirs kind for TestCaseInTempDir (numbered or named)
110
def packages_to_test():
111
"""Return a list of packages to test.
113
The packages are not globally imported so that import failures are
114
triggered when running selftest, not when importing the command.
117
import bzrlib.tests.blackbox
118
import bzrlib.tests.branch_implementations
119
import bzrlib.tests.bzrdir_implementations
120
import bzrlib.tests.interrepository_implementations
121
import bzrlib.tests.interversionedfile_implementations
122
import bzrlib.tests.intertree_implementations
123
import bzrlib.tests.repository_implementations
124
import bzrlib.tests.revisionstore_implementations
125
import bzrlib.tests.tree_implementations
126
import bzrlib.tests.workingtree_implementations
129
bzrlib.tests.blackbox,
130
bzrlib.tests.branch_implementations,
131
bzrlib.tests.bzrdir_implementations,
132
bzrlib.tests.interrepository_implementations,
133
bzrlib.tests.interversionedfile_implementations,
134
bzrlib.tests.intertree_implementations,
135
bzrlib.tests.repository_implementations,
136
bzrlib.tests.revisionstore_implementations,
137
bzrlib.tests.tree_implementations,
138
bzrlib.tests.workingtree_implementations,
142
class ExtendedTestResult(unittest._TextTestResult):
143
"""Accepts, reports and accumulates the results of running tests.
145
Compared to this unittest version this class adds support for profiling,
146
benchmarking, stopping as soon as a test fails, and skipping tests.
147
There are further-specialized subclasses for different types of display.
152
def __init__(self, stream, descriptions, verbosity,
156
"""Construct new TestResult.
158
:param bench_history: Optionally, a writable file object to accumulate
161
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
162
if bench_history is not None:
163
from bzrlib.version import _get_bzr_source_tree
164
src_tree = _get_bzr_source_tree()
167
revision_id = src_tree.get_parent_ids()[0]
169
# XXX: if this is a brand new tree, do the same as if there
173
# XXX: If there's no branch, what should we do?
175
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
176
self._bench_history = bench_history
177
self.ui = ui.ui_factory
178
self.num_tests = num_tests
180
self.failure_count = 0
183
self._overall_start_time = time.time()
185
def extractBenchmarkTime(self, testCase):
186
"""Add a benchmark time for the current test case."""
187
self._benchmarkTime = getattr(testCase, "_benchtime", None)
189
def _elapsedTestTimeString(self):
190
"""Return a time string for the overall time the current test has taken."""
191
return self._formatTime(time.time() - self._start_time)
193
def _testTimeString(self):
194
if self._benchmarkTime is not None:
196
self._formatTime(self._benchmarkTime),
197
self._elapsedTestTimeString())
199
return " %s" % self._elapsedTestTimeString()
201
def _formatTime(self, seconds):
202
"""Format seconds as milliseconds with leading spaces."""
203
# some benchmarks can take thousands of seconds to run, so we need 8
205
return "%8dms" % (1000 * seconds)
207
def _shortened_test_description(self, test):
209
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
212
def startTest(self, test):
213
unittest.TestResult.startTest(self, test)
214
self.report_test_start(test)
215
test.number = self.count
216
self._recordTestStartTime()
218
def _recordTestStartTime(self):
219
"""Record that a test has started."""
220
self._start_time = time.time()
222
def addError(self, test, err):
223
if isinstance(err[1], TestSkipped):
224
return self.addSkipped(test, err)
225
unittest.TestResult.addError(self, test, err)
226
# We can only do this if we have one of our TestCases, not if
228
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
229
if setKeepLogfile is not None:
231
self.extractBenchmarkTime(test)
232
self.report_error(test, err)
236
def addFailure(self, test, err):
237
unittest.TestResult.addFailure(self, test, err)
238
# We can only do this if we have one of our TestCases, not if
240
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
241
if setKeepLogfile is not None:
243
self.extractBenchmarkTime(test)
244
self.report_failure(test, err)
248
def addSuccess(self, test):
249
self.extractBenchmarkTime(test)
250
if self._bench_history is not None:
251
if self._benchmarkTime is not None:
252
self._bench_history.write("%s %s\n" % (
253
self._formatTime(self._benchmarkTime),
255
self.report_success(test)
256
unittest.TestResult.addSuccess(self, test)
258
def addSkipped(self, test, skip_excinfo):
259
self.extractBenchmarkTime(test)
260
self.report_skip(test, skip_excinfo)
261
# seems best to treat this as success from point-of-view of unittest
262
# -- it actually does nothing so it barely matters :)
265
except KeyboardInterrupt:
268
self.addError(test, test.__exc_info())
270
unittest.TestResult.addSuccess(self, test)
272
def printErrorList(self, flavour, errors):
273
for test, err in errors:
274
self.stream.writeln(self.separator1)
275
self.stream.write("%s: " % flavour)
277
self.stream.write('#%d ' % test.number)
278
self.stream.writeln(self.getDescription(test))
279
if getattr(test, '_get_log', None) is not None:
281
print >>self.stream, \
282
('vvvv[log from %s]' % test.id()).ljust(78,'-')
283
print >>self.stream, test._get_log()
284
print >>self.stream, \
285
('^^^^[log from %s]' % test.id()).ljust(78,'-')
286
self.stream.writeln(self.separator2)
287
self.stream.writeln("%s" % err)
292
def report_cleaning_up(self):
295
def report_success(self, test):
299
class TextTestResult(ExtendedTestResult):
300
"""Displays progress and results of tests in text form"""
302
def __init__(self, *args, **kw):
303
ExtendedTestResult.__init__(self, *args, **kw)
304
self.pb = self.ui.nested_progress_bar()
305
self.pb.show_pct = False
306
self.pb.show_spinner = False
307
self.pb.show_eta = False,
308
self.pb.show_count = False
309
self.pb.show_bar = False
311
def report_starting(self):
312
self.pb.update('[test 0/%d] starting...' % (self.num_tests))
314
def _progress_prefix_text(self):
315
a = '[%d' % self.count
316
if self.num_tests is not None:
317
a +='/%d' % self.num_tests
318
a += ' in %ds' % (time.time() - self._overall_start_time)
320
a += ', %d errors' % self.error_count
321
if self.failure_count:
322
a += ', %d failed' % self.failure_count
324
a += ', %d skipped' % self.skip_count
328
def report_test_start(self, test):
331
self._progress_prefix_text()
333
+ self._shortened_test_description(test))
335
def _test_description(self, test):
337
return '#%d %s' % (self.count,
338
self._shortened_test_description(test))
340
return self._shortened_test_description(test)
342
def report_error(self, test, err):
343
self.error_count += 1
344
self.pb.note('ERROR: %s\n %s\n',
345
self._test_description(test),
349
def report_failure(self, test, err):
350
self.failure_count += 1
351
self.pb.note('FAIL: %s\n %s\n',
352
self._test_description(test),
356
def report_skip(self, test, skip_excinfo):
359
# at the moment these are mostly not things we can fix
360
# and so they just produce stipple; use the verbose reporter
363
# show test and reason for skip
364
self.pb.note('SKIP: %s\n %s\n',
365
self._shortened_test_description(test),
368
# since the class name was left behind in the still-visible
370
self.pb.note('SKIP: %s', skip_excinfo[1])
372
def report_cleaning_up(self):
373
self.pb.update('cleaning up...')
379
class VerboseTestResult(ExtendedTestResult):
380
"""Produce long output, with one line per test run plus times"""
382
def _ellipsize_to_right(self, a_string, final_width):
383
"""Truncate and pad a string, keeping the right hand side"""
384
if len(a_string) > final_width:
385
result = '...' + a_string[3-final_width:]
388
return result.ljust(final_width)
390
def report_starting(self):
391
self.stream.write('running %d tests...\n' % self.num_tests)
393
def report_test_start(self, test):
395
name = self._shortened_test_description(test)
396
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
397
# numbers, plus a trailing blank
398
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
400
self.stream.write('%5d ' % self.count)
401
self.stream.write(self._ellipsize_to_right(name,
402
osutils.terminal_width()-36))
404
self.stream.write(self._ellipsize_to_right(name,
405
osutils.terminal_width()-30))
408
def _error_summary(self, err):
412
return '%s%s' % (indent, err[1])
414
def report_error(self, test, err):
415
self.error_count += 1
416
self.stream.writeln('ERROR %s\n%s'
417
% (self._testTimeString(),
418
self._error_summary(err)))
420
def report_failure(self, test, err):
421
self.failure_count += 1
422
self.stream.writeln(' FAIL %s\n%s'
423
% (self._testTimeString(),
424
self._error_summary(err)))
426
def report_success(self, test):
427
self.stream.writeln(' OK %s' % self._testTimeString())
428
for bench_called, stats in getattr(test, '_benchcalls', []):
429
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
430
stats.pprint(file=self.stream)
433
def report_skip(self, test, skip_excinfo):
435
self.stream.writeln(' SKIP %s\n%s'
436
% (self._testTimeString(),
437
self._error_summary(skip_excinfo)))
440
class TextTestRunner(object):
441
stop_on_failure = False
449
self.stream = unittest._WritelnDecorator(stream)
450
self.descriptions = descriptions
451
self.verbosity = verbosity
452
self.keep_output = keep_output
453
self._bench_history = bench_history
456
"Run the given test case or test suite."
457
startTime = time.time()
458
if self.verbosity == 1:
459
result_class = TextTestResult
460
elif self.verbosity >= 2:
461
result_class = VerboseTestResult
462
result = result_class(self.stream,
465
bench_history=self._bench_history,
466
num_tests=test.countTestCases(),
468
result.stop_early = self.stop_on_failure
469
result.report_starting()
471
stopTime = time.time()
472
timeTaken = stopTime - startTime
474
self.stream.writeln(result.separator2)
475
run = result.testsRun
476
self.stream.writeln("Ran %d test%s in %.3fs" %
477
(run, run != 1 and "s" or "", timeTaken))
478
self.stream.writeln()
479
if not result.wasSuccessful():
480
self.stream.write("FAILED (")
481
failed, errored = map(len, (result.failures, result.errors))
483
self.stream.write("failures=%d" % failed)
485
if failed: self.stream.write(", ")
486
self.stream.write("errors=%d" % errored)
487
self.stream.writeln(")")
489
self.stream.writeln("OK")
490
if result.skip_count > 0:
491
skipped = result.skip_count
492
self.stream.writeln('%d test%s skipped' %
493
(skipped, skipped != 1 and "s" or ""))
494
result.report_cleaning_up()
495
# This is still a little bogus,
496
# but only a little. Folk not using our testrunner will
497
# have to delete their temp directories themselves.
498
test_root = TestCaseWithMemoryTransport.TEST_ROOT
499
if result.wasSuccessful() or not self.keep_output:
500
if test_root is not None:
501
# If LANG=C we probably have created some bogus paths
502
# which rmtree(unicode) will fail to delete
503
# so make sure we are using rmtree(str) to delete everything
504
# except on win32, where rmtree(str) will fail
505
# since it doesn't have the property of byte-stream paths
506
# (they are either ascii or mbcs)
507
if sys.platform == 'win32':
508
# make sure we are using the unicode win32 api
509
test_root = unicode(test_root)
511
test_root = test_root.encode(
512
sys.getfilesystemencoding())
514
osutils.rmtree(test_root)
516
if sys.platform == 'win32' and e.errno == errno.EACCES:
517
print >>sys.stderr, ('Permission denied: '
518
'unable to remove testing dir '
519
'%s' % os.path.basename(test_root))
523
note("Failed tests working directories are in '%s'\n", test_root)
524
TestCaseWithMemoryTransport.TEST_ROOT = None
529
def iter_suite_tests(suite):
530
"""Return all tests in a suite, recursing through nested suites"""
531
for item in suite._tests:
532
if isinstance(item, unittest.TestCase):
534
elif isinstance(item, unittest.TestSuite):
535
for r in iter_suite_tests(item):
538
raise Exception('unknown object %r inside test suite %r'
542
class TestSkipped(Exception):
543
"""Indicates that a test was intentionally skipped, rather than failing."""
546
class CommandFailed(Exception):
550
class StringIOWrapper(object):
551
"""A wrapper around cStringIO which just adds an encoding attribute.
553
Internally we can check sys.stdout to see what the output encoding
554
should be. However, cStringIO has no encoding attribute that we can
555
set. So we wrap it instead.
560
def __init__(self, s=None):
562
self.__dict__['_cstring'] = StringIO(s)
564
self.__dict__['_cstring'] = StringIO()
566
def __getattr__(self, name, getattr=getattr):
567
return getattr(self.__dict__['_cstring'], name)
569
def __setattr__(self, name, val):
570
if name == 'encoding':
571
self.__dict__['encoding'] = val
573
return setattr(self._cstring, name, val)
576
class TestUIFactory(ui.CLIUIFactory):
577
"""A UI Factory for testing.
579
Hide the progress bar but emit note()s.
581
Allows get_password to be tested without real tty attached.
588
super(TestUIFactory, self).__init__()
589
if stdin is not None:
590
# We use a StringIOWrapper to be able to test various
591
# encodings, but the user is still responsible to
592
# encode the string and to set the encoding attribute
593
# of StringIOWrapper.
594
self.stdin = StringIOWrapper(stdin)
596
self.stdout = sys.stdout
600
self.stderr = sys.stderr
605
"""See progress.ProgressBar.clear()."""
607
def clear_term(self):
608
"""See progress.ProgressBar.clear_term()."""
610
def clear_term(self):
611
"""See progress.ProgressBar.clear_term()."""
614
"""See progress.ProgressBar.finished()."""
616
def note(self, fmt_string, *args, **kwargs):
617
"""See progress.ProgressBar.note()."""
618
self.stdout.write((fmt_string + "\n") % args)
620
def progress_bar(self):
623
def nested_progress_bar(self):
626
def update(self, message, count=None, total=None):
627
"""See progress.ProgressBar.update()."""
629
def get_non_echoed_password(self, prompt):
630
"""Get password from stdin without trying to handle the echo mode"""
632
self.stdout.write(prompt)
633
password = self.stdin.readline()
636
if password[-1] == '\n':
637
password = password[:-1]
641
class TestCase(unittest.TestCase):
642
"""Base class for bzr unit tests.
644
Tests that need access to disk resources should subclass
645
TestCaseInTempDir not TestCase.
647
Error and debug log messages are redirected from their usual
648
location into a temporary file, the contents of which can be
649
retrieved by _get_log(). We use a real OS file, not an in-memory object,
650
so that it can also capture file IO. When the test completes this file
651
is read into memory and removed from disk.
653
There are also convenience functions to invoke bzr's command-line
654
routine, and to build and check bzr trees.
656
In addition to the usual method of overriding tearDown(), this class also
657
allows subclasses to register functions into the _cleanups list, which is
658
run in order as the object is torn down. It's less likely this will be
659
accidentally overlooked.
662
_log_file_name = None
664
_keep_log_file = False
665
# record lsprof data when performing benchmark calls.
666
_gather_lsprof_in_benchmarks = False
668
def __init__(self, methodName='testMethod'):
669
super(TestCase, self).__init__(methodName)
673
unittest.TestCase.setUp(self)
674
self._cleanEnvironment()
675
bzrlib.trace.disable_default_logging()
678
self._benchcalls = []
679
self._benchtime = None
680
# prevent hooks affecting tests
681
self._preserved_hooks = bzrlib.branch.Branch.hooks
682
self.addCleanup(self._restoreHooks)
683
# this list of hooks must be kept in sync with the defaults
685
bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
687
def _silenceUI(self):
688
"""Turn off UI for duration of test"""
689
# by default the UI is off; tests can turn it on if they want it.
690
saved = ui.ui_factory
692
ui.ui_factory = saved
693
ui.ui_factory = ui.SilentUIFactory()
694
self.addCleanup(_restore)
696
def _ndiff_strings(self, a, b):
697
"""Return ndiff between two strings containing lines.
699
A trailing newline is added if missing to make the strings
701
if b and b[-1] != '\n':
703
if a and a[-1] != '\n':
705
difflines = difflib.ndiff(a.splitlines(True),
707
linejunk=lambda x: False,
708
charjunk=lambda x: False)
709
return ''.join(difflines)
711
def assertEqual(self, a, b, message=''):
716
raise AssertionError("%snot equal:\na = %s\nb = %s\n"
718
pformat(a, indent=4), pformat(b, indent=4)))
720
assertEquals = assertEqual
722
def assertEqualDiff(self, a, b, message=None):
723
"""Assert two texts are equal, if not raise an exception.
725
This is intended for use with multi-line strings where it can
726
be hard to find the differences by eye.
728
# TODO: perhaps override assertEquals to call this for strings?
732
message = "texts not equal:\n"
733
raise AssertionError(message +
734
self._ndiff_strings(a, b))
736
def assertEqualMode(self, mode, mode_test):
737
self.assertEqual(mode, mode_test,
738
'mode mismatch %o != %o' % (mode, mode_test))
740
def assertStartsWith(self, s, prefix):
741
if not s.startswith(prefix):
742
raise AssertionError('string %r does not start with %r' % (s, prefix))
744
def assertEndsWith(self, s, suffix):
745
"""Asserts that s ends with suffix."""
746
if not s.endswith(suffix):
747
raise AssertionError('string %r does not end with %r' % (s, suffix))
749
def assertContainsRe(self, haystack, needle_re):
750
"""Assert that a contains something matching a regular expression."""
751
if not re.search(needle_re, haystack):
752
raise AssertionError('pattern "%r" not found in "%r"'
753
% (needle_re, haystack))
755
def assertNotContainsRe(self, haystack, needle_re):
756
"""Assert that a does not match a regular expression"""
757
if re.search(needle_re, haystack):
758
raise AssertionError('pattern "%s" found in "%s"'
759
% (needle_re, haystack))
761
def assertSubset(self, sublist, superlist):
762
"""Assert that every entry in sublist is present in superlist."""
764
for entry in sublist:
765
if entry not in superlist:
766
missing.append(entry)
768
raise AssertionError("value(s) %r not present in container %r" %
769
(missing, superlist))
771
def assertListRaises(self, excClass, func, *args, **kwargs):
772
"""Fail unless excClass is raised when the iterator from func is used.
774
Many functions can return generators this makes sure
775
to wrap them in a list() call to make sure the whole generator
776
is run, and that the proper exception is raised.
779
list(func(*args, **kwargs))
783
if getattr(excClass,'__name__', None) is not None:
784
excName = excClass.__name__
786
excName = str(excClass)
787
raise self.failureException, "%s not raised" % excName
789
def assertIs(self, left, right, message=None):
790
if not (left is right):
791
if message is not None:
792
raise AssertionError(message)
794
raise AssertionError("%r is not %r." % (left, right))
796
def assertIsNot(self, left, right, message=None):
798
if message is not None:
799
raise AssertionError(message)
801
raise AssertionError("%r is %r." % (left, right))
803
def assertTransportMode(self, transport, path, mode):
804
"""Fail if a path does not have mode mode.
806
If modes are not supported on this transport, the assertion is ignored.
808
if not transport._can_roundtrip_unix_modebits():
810
path_stat = transport.stat(path)
811
actual_mode = stat.S_IMODE(path_stat.st_mode)
812
self.assertEqual(mode, actual_mode,
813
'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
815
def assertIsInstance(self, obj, kls):
816
"""Fail if obj is not an instance of kls"""
817
if not isinstance(obj, kls):
818
self.fail("%r is an instance of %s rather than %s" % (
819
obj, obj.__class__, kls))
821
def _capture_warnings(self, a_callable, *args, **kwargs):
822
"""A helper for callDeprecated and applyDeprecated.
824
:param a_callable: A callable to call.
825
:param args: The positional arguments for the callable
826
:param kwargs: The keyword arguments for the callable
827
:return: A tuple (warnings, result). result is the result of calling
828
a_callable(*args, **kwargs).
831
def capture_warnings(msg, cls=None, stacklevel=None):
832
# we've hooked into a deprecation specific callpath,
833
# only deprecations should getting sent via it.
834
self.assertEqual(cls, DeprecationWarning)
835
local_warnings.append(msg)
836
original_warning_method = symbol_versioning.warn
837
symbol_versioning.set_warning_method(capture_warnings)
839
result = a_callable(*args, **kwargs)
841
symbol_versioning.set_warning_method(original_warning_method)
842
return (local_warnings, result)
844
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
845
"""Call a deprecated callable without warning the user.
847
:param deprecation_format: The deprecation format that the callable
848
should have been deprecated with. This is the same type as the
849
parameter to deprecated_method/deprecated_function. If the
850
callable is not deprecated with this format, an assertion error
852
:param a_callable: A callable to call. This may be a bound method or
853
a regular function. It will be called with *args and **kwargs.
854
:param args: The positional arguments for the callable
855
:param kwargs: The keyword arguments for the callable
856
:return: The result of a_callable(*args, **kwargs)
858
call_warnings, result = self._capture_warnings(a_callable,
860
expected_first_warning = symbol_versioning.deprecation_string(
861
a_callable, deprecation_format)
862
if len(call_warnings) == 0:
863
self.fail("No deprecation warning generated by call to %s" %
865
self.assertEqual(expected_first_warning, call_warnings[0])
868
def callDeprecated(self, expected, callable, *args, **kwargs):
869
"""Assert that a callable is deprecated in a particular way.
871
This is a very precise test for unusual requirements. The
872
applyDeprecated helper function is probably more suited for most tests
873
as it allows you to simply specify the deprecation format being used
874
and will ensure that that is issued for the function being called.
876
:param expected: a list of the deprecation warnings expected, in order
877
:param callable: The callable to call
878
:param args: The positional arguments for the callable
879
:param kwargs: The keyword arguments for the callable
881
call_warnings, result = self._capture_warnings(callable,
883
self.assertEqual(expected, call_warnings)
886
def _startLogFile(self):
887
"""Send bzr and test log messages to a temporary file.
889
The file is removed as the test is torn down.
891
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
892
self._log_file = os.fdopen(fileno, 'w+')
893
self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
894
self._log_file_name = name
895
self.addCleanup(self._finishLogFile)
897
def _finishLogFile(self):
898
"""Finished with the log file.
900
Close the file and delete it, unless setKeepLogfile was called.
902
if self._log_file is None:
904
bzrlib.trace.disable_test_log(self._log_nonce)
905
self._log_file.close()
906
self._log_file = None
907
if not self._keep_log_file:
908
os.remove(self._log_file_name)
909
self._log_file_name = None
911
def setKeepLogfile(self):
912
"""Make the logfile not be deleted when _finishLogFile is called."""
913
self._keep_log_file = True
915
def addCleanup(self, callable):
916
"""Arrange to run a callable when this case is torn down.
918
Callables are run in the reverse of the order they are registered,
919
ie last-in first-out.
921
if callable in self._cleanups:
922
raise ValueError("cleanup function %r already registered on %s"
924
self._cleanups.append(callable)
926
def _cleanEnvironment(self):
928
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
930
'APPDATA': None, # bzr now use Win32 API and don't rely on APPDATA
932
'BZREMAIL': None, # may still be present in the environment
934
'BZR_PROGRESS_BAR': None,
944
# Nobody cares about these ones AFAIK. So far at
945
# least. If you do (care), please update this comment
951
self.addCleanup(self._restoreEnvironment)
952
for name, value in new_env.iteritems():
953
self._captureVar(name, value)
955
def _captureVar(self, name, newvalue):
956
"""Set an environment variable, and reset it when finished."""
957
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
959
def _restoreEnvironment(self):
960
for name, value in self.__old_env.iteritems():
961
osutils.set_or_unset_env(name, value)
963
def _restoreHooks(self):
964
bzrlib.branch.Branch.hooks = self._preserved_hooks
968
unittest.TestCase.tearDown(self)
970
def time(self, callable, *args, **kwargs):
971
"""Run callable and accrue the time it takes to the benchmark time.
973
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
974
this will cause lsprofile statistics to be gathered and stored in
977
if self._benchtime is None:
981
if not self._gather_lsprof_in_benchmarks:
982
return callable(*args, **kwargs)
984
# record this benchmark
985
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
987
self._benchcalls.append(((callable, args, kwargs), stats))
990
self._benchtime += time.time() - start
992
def _runCleanups(self):
993
"""Run registered cleanup functions.
995
This should only be called from TestCase.tearDown.
997
# TODO: Perhaps this should keep running cleanups even if
1000
# Actually pop the cleanups from the list so tearDown running
1001
# twice is safe (this happens for skipped tests).
1002
while self._cleanups:
1003
self._cleanups.pop()()
1005
def log(self, *args):
1008
def _get_log(self, keep_log_file=False):
1009
"""Return as a string the log for this test. If the file is still
1010
on disk and keep_log_file=False, delete the log file and store the
1011
content in self._log_contents."""
1012
# flush the log file, to get all content
1014
bzrlib.trace._trace_file.flush()
1015
if self._log_contents:
1016
return self._log_contents
1017
if self._log_file_name is not None:
1018
logfile = open(self._log_file_name)
1020
log_contents = logfile.read()
1023
if not keep_log_file:
1024
self._log_contents = log_contents
1026
os.remove(self._log_file_name)
1028
if sys.platform == 'win32' and e.errno == errno.EACCES:
1029
print >>sys.stderr, ('Unable to delete log file '
1030
' %r' % self._log_file_name)
1035
return "DELETED log file to reduce memory footprint"
1037
def capture(self, cmd, retcode=0):
1038
"""Shortcut that splits cmd into words, runs, and returns stdout"""
1039
return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
1041
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
1043
"""Invoke bzr and return (stdout, stderr).
1045
Useful for code that wants to check the contents of the
1046
output, the way error messages are presented, etc.
1048
This should be the main method for tests that want to exercise the
1049
overall behavior of the bzr application (rather than a unit test
1050
or a functional test of the library.)
1052
Much of the old code runs bzr by forking a new copy of Python, but
1053
that is slower, harder to debug, and generally not necessary.
1055
This runs bzr through the interface that catches and reports
1056
errors, and with logging set to something approximating the
1057
default, so that error reporting can be checked.
1059
:param argv: arguments to invoke bzr
1060
:param retcode: expected return code, or None for don't-care.
1061
:param encoding: encoding for sys.stdout and sys.stderr
1062
:param stdin: A string to be used as stdin for the command.
1063
:param working_dir: Change to this directory before running
1065
if encoding is None:
1066
encoding = bzrlib.user_encoding
1067
stdout = StringIOWrapper()
1068
stderr = StringIOWrapper()
1069
stdout.encoding = encoding
1070
stderr.encoding = encoding
1072
self.log('run bzr: %r', argv)
1073
# FIXME: don't call into logging here
1074
handler = logging.StreamHandler(stderr)
1075
handler.setLevel(logging.INFO)
1076
logger = logging.getLogger('')
1077
logger.addHandler(handler)
1078
old_ui_factory = ui.ui_factory
1079
ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
1082
if working_dir is not None:
1083
cwd = osutils.getcwd()
1084
os.chdir(working_dir)
1087
saved_debug_flags = frozenset(debug.debug_flags)
1088
debug.debug_flags.clear()
1090
result = self.apply_redirected(ui.ui_factory.stdin,
1092
bzrlib.commands.run_bzr_catch_errors,
1095
debug.debug_flags.update(saved_debug_flags)
1097
logger.removeHandler(handler)
1098
ui.ui_factory = old_ui_factory
1102
out = stdout.getvalue()
1103
err = stderr.getvalue()
1105
self.log('output:\n%r', out)
1107
self.log('errors:\n%r', err)
1108
if retcode is not None:
1109
self.assertEquals(retcode, result)
1112
def run_bzr(self, *args, **kwargs):
1113
"""Invoke bzr, as if it were run from the command line.
1115
This should be the main method for tests that want to exercise the
1116
overall behavior of the bzr application (rather than a unit test
1117
or a functional test of the library.)
1119
This sends the stdout/stderr results into the test's log,
1120
where it may be useful for debugging. See also run_captured.
1122
:param stdin: A string to be used as stdin for the command.
1123
:param retcode: The status code the command should return
1124
:param working_dir: The directory to run the command in
1126
retcode = kwargs.pop('retcode', 0)
1127
encoding = kwargs.pop('encoding', None)
1128
stdin = kwargs.pop('stdin', None)
1129
working_dir = kwargs.pop('working_dir', None)
1130
return self.run_bzr_captured(args, retcode=retcode, encoding=encoding,
1131
stdin=stdin, working_dir=working_dir)
1133
def run_bzr_decode(self, *args, **kwargs):
1134
if 'encoding' in kwargs:
1135
encoding = kwargs['encoding']
1137
encoding = bzrlib.user_encoding
1138
return self.run_bzr(*args, **kwargs)[0].decode(encoding)
1140
def run_bzr_error(self, error_regexes, *args, **kwargs):
1141
"""Run bzr, and check that stderr contains the supplied regexes
1143
:param error_regexes: Sequence of regular expressions which
1144
must each be found in the error output. The relative ordering
1146
:param args: command-line arguments for bzr
1147
:param kwargs: Keyword arguments which are interpreted by run_bzr
1148
This function changes the default value of retcode to be 3,
1149
since in most cases this is run when you expect bzr to fail.
1150
:return: (out, err) The actual output of running the command (in case you
1151
want to do more inspection)
1154
# Make sure that commit is failing because there is nothing to do
1155
self.run_bzr_error(['no changes to commit'],
1156
'commit', '-m', 'my commit comment')
1157
# Make sure --strict is handling an unknown file, rather than
1158
# giving us the 'nothing to do' error
1159
self.build_tree(['unknown'])
1160
self.run_bzr_error(['Commit refused because there are unknown files'],
1161
'commit', '--strict', '-m', 'my commit comment')
1163
kwargs.setdefault('retcode', 3)
1164
out, err = self.run_bzr(*args, **kwargs)
1165
for regex in error_regexes:
1166
self.assertContainsRe(err, regex)
1169
def run_bzr_subprocess(self, *args, **kwargs):
1170
"""Run bzr in a subprocess for testing.
1172
This starts a new Python interpreter and runs bzr in there.
1173
This should only be used for tests that have a justifiable need for
1174
this isolation: e.g. they are testing startup time, or signal
1175
handling, or early startup code, etc. Subprocess code can't be
1176
profiled or debugged so easily.
1178
:param retcode: The status code that is expected. Defaults to 0. If
1179
None is supplied, the status code is not checked.
1180
:param env_changes: A dictionary which lists changes to environment
1181
variables. A value of None will unset the env variable.
1182
The values must be strings. The change will only occur in the
1183
child, so you don't need to fix the environment after running.
1184
:param universal_newlines: Convert CRLF => LF
1185
:param allow_plugins: By default the subprocess is run with
1186
--no-plugins to ensure test reproducibility. Also, it is possible
1187
for system-wide plugins to create unexpected output on stderr,
1188
which can cause unnecessary test failures.
1190
env_changes = kwargs.get('env_changes', {})
1191
working_dir = kwargs.get('working_dir', None)
1192
allow_plugins = kwargs.get('allow_plugins', False)
1193
process = self.start_bzr_subprocess(args, env_changes=env_changes,
1194
working_dir=working_dir,
1195
allow_plugins=allow_plugins)
1196
# We distinguish between retcode=None and retcode not passed.
1197
supplied_retcode = kwargs.get('retcode', 0)
1198
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
1199
universal_newlines=kwargs.get('universal_newlines', False),
1202
def start_bzr_subprocess(self, process_args, env_changes=None,
1203
skip_if_plan_to_signal=False,
1205
allow_plugins=False):
1206
"""Start bzr in a subprocess for testing.
1208
This starts a new Python interpreter and runs bzr in there.
1209
This should only be used for tests that have a justifiable need for
1210
this isolation: e.g. they are testing startup time, or signal
1211
handling, or early startup code, etc. Subprocess code can't be
1212
profiled or debugged so easily.
1214
:param process_args: a list of arguments to pass to the bzr executable,
1215
for example `['--version']`.
1216
:param env_changes: A dictionary which lists changes to environment
1217
variables. A value of None will unset the env variable.
1218
The values must be strings. The change will only occur in the
1219
child, so you don't need to fix the environment after running.
1220
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1222
:param allow_plugins: If False (default) pass --no-plugins to bzr.
1224
:returns: Popen object for the started process.
1226
if skip_if_plan_to_signal:
1227
if not getattr(os, 'kill', None):
1228
raise TestSkipped("os.kill not available.")
1230
if env_changes is None:
1234
def cleanup_environment():
1235
for env_var, value in env_changes.iteritems():
1236
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
1238
def restore_environment():
1239
for env_var, value in old_env.iteritems():
1240
osutils.set_or_unset_env(env_var, value)
1242
bzr_path = self.get_bzr_path()
1245
if working_dir is not None:
1246
cwd = osutils.getcwd()
1247
os.chdir(working_dir)
1250
# win32 subprocess doesn't support preexec_fn
1251
# so we will avoid using it on all platforms, just to
1252
# make sure the code path is used, and we don't break on win32
1253
cleanup_environment()
1254
command = [sys.executable, bzr_path]
1255
if not allow_plugins:
1256
command.append('--no-plugins')
1257
command.extend(process_args)
1258
process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1260
restore_environment()
1266
def _popen(self, *args, **kwargs):
1267
"""Place a call to Popen.
1269
Allows tests to override this method to intercept the calls made to
1270
Popen for introspection.
1272
return Popen(*args, **kwargs)
1274
def get_bzr_path(self):
1275
"""Return the path of the 'bzr' executable for this test suite."""
1276
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
1277
if not os.path.isfile(bzr_path):
1278
# We are probably installed. Assume sys.argv is the right file
1279
bzr_path = sys.argv[0]
1282
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
1283
universal_newlines=False, process_args=None):
1284
"""Finish the execution of process.
1286
:param process: the Popen object returned from start_bzr_subprocess.
1287
:param retcode: The status code that is expected. Defaults to 0. If
1288
None is supplied, the status code is not checked.
1289
:param send_signal: an optional signal to send to the process.
1290
:param universal_newlines: Convert CRLF => LF
1291
:returns: (stdout, stderr)
1293
if send_signal is not None:
1294
os.kill(process.pid, send_signal)
1295
out, err = process.communicate()
1297
if universal_newlines:
1298
out = out.replace('\r\n', '\n')
1299
err = err.replace('\r\n', '\n')
1301
if retcode is not None and retcode != process.returncode:
1302
if process_args is None:
1303
process_args = "(unknown args)"
1304
mutter('Output of bzr %s:\n%s', process_args, out)
1305
mutter('Error for bzr %s:\n%s', process_args, err)
1306
self.fail('Command bzr %s failed with retcode %s != %s'
1307
% (process_args, retcode, process.returncode))
1310
def check_inventory_shape(self, inv, shape):
1311
"""Compare an inventory to a list of expected names.
1313
Fail if they are not precisely equal.
1316
shape = list(shape) # copy
1317
for path, ie in inv.entries():
1318
name = path.replace('\\', '/')
1319
if ie.kind == 'dir':
1326
self.fail("expected paths not found in inventory: %r" % shape)
1328
self.fail("unexpected paths found in inventory: %r" % extras)
1330
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
1331
a_callable=None, *args, **kwargs):
1332
"""Call callable with redirected std io pipes.
1334
Returns the return code."""
1335
if not callable(a_callable):
1336
raise ValueError("a_callable must be callable.")
1338
stdin = StringIO("")
1340
if getattr(self, "_log_file", None) is not None:
1341
stdout = self._log_file
1345
if getattr(self, "_log_file", None is not None):
1346
stderr = self._log_file
1349
real_stdin = sys.stdin
1350
real_stdout = sys.stdout
1351
real_stderr = sys.stderr
1356
return a_callable(*args, **kwargs)
1358
sys.stdout = real_stdout
1359
sys.stderr = real_stderr
1360
sys.stdin = real_stdin
1362
@symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
1363
def merge(self, branch_from, wt_to):
1364
"""A helper for tests to do a ui-less merge.
1366
This should move to the main library when someone has time to integrate
1369
# minimal ui-less merge.
1370
wt_to.branch.fetch(branch_from)
1371
base_rev = common_ancestor(branch_from.last_revision(),
1372
wt_to.branch.last_revision(),
1373
wt_to.branch.repository)
1374
merge_inner(wt_to.branch, branch_from.basis_tree(),
1375
wt_to.branch.repository.revision_tree(base_rev),
1377
wt_to.add_parent_tree_id(branch_from.last_revision())
1380
BzrTestBase = TestCase
1383
class TestCaseWithMemoryTransport(TestCase):
1384
"""Common test class for tests that do not need disk resources.
1386
Tests that need disk resources should derive from TestCaseWithTransport.
1388
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1390
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1391
a directory which does not exist. This serves to help ensure test isolation
1392
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1393
must exist. However, TestCaseWithMemoryTransport does not offer local
1394
file defaults for the transport in tests, nor does it obey the command line
1395
override, so tests that accidentally write to the common directory should
1403
def __init__(self, methodName='runTest'):
1404
# allow test parameterisation after test construction and before test
1405
# execution. Variables that the parameteriser sets need to be
1406
# ones that are not set by setUp, or setUp will trash them.
1407
super(TestCaseWithMemoryTransport, self).__init__(methodName)
1408
self.transport_server = default_transport
1409
self.transport_readonly_server = None
1411
def get_transport(self):
1412
"""Return a writeable transport for the test scratch space"""
1413
t = get_transport(self.get_url())
1414
self.assertFalse(t.is_readonly())
1417
def get_readonly_transport(self):
1418
"""Return a readonly transport for the test scratch space
1420
This can be used to test that operations which should only need
1421
readonly access in fact do not try to write.
1423
t = get_transport(self.get_readonly_url())
1424
self.assertTrue(t.is_readonly())
1427
def create_transport_readonly_server(self):
1428
"""Create a transport server from class defined at init.
1430
This is mostly a hook for daughter classes.
1432
return self.transport_readonly_server()
1434
def get_readonly_server(self):
1435
"""Get the server instance for the readonly transport
1437
This is useful for some tests with specific servers to do diagnostics.
1439
if self.__readonly_server is None:
1440
if self.transport_readonly_server is None:
1441
# readonly decorator requested
1442
# bring up the server
1444
self.__readonly_server = ReadonlyServer()
1445
self.__readonly_server.setUp(self.__server)
1447
self.__readonly_server = self.create_transport_readonly_server()
1448
self.__readonly_server.setUp()
1449
self.addCleanup(self.__readonly_server.tearDown)
1450
return self.__readonly_server
1452
def get_readonly_url(self, relpath=None):
1453
"""Get a URL for the readonly transport.
1455
This will either be backed by '.' or a decorator to the transport
1456
used by self.get_url()
1457
relpath provides for clients to get a path relative to the base url.
1458
These should only be downwards relative, not upwards.
1460
base = self.get_readonly_server().get_url()
1461
if relpath is not None:
1462
if not base.endswith('/'):
1464
base = base + relpath
1467
def get_server(self):
1468
"""Get the read/write server instance.
1470
This is useful for some tests with specific servers that need
1473
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
1474
is no means to override it.
1476
if self.__server is None:
1477
self.__server = MemoryServer()
1478
self.__server.setUp()
1479
self.addCleanup(self.__server.tearDown)
1480
return self.__server
1482
def get_url(self, relpath=None):
1483
"""Get a URL (or maybe a path) for the readwrite transport.
1485
This will either be backed by '.' or to an equivalent non-file based
1487
relpath provides for clients to get a path relative to the base url.
1488
These should only be downwards relative, not upwards.
1490
base = self.get_server().get_url()
1491
if relpath is not None and relpath != '.':
1492
if not base.endswith('/'):
1494
# XXX: Really base should be a url; we did after all call
1495
# get_url()! But sometimes it's just a path (from
1496
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1497
# to a non-escaped local path.
1498
if base.startswith('./') or base.startswith('/'):
1501
base += urlutils.escape(relpath)
1504
def _make_test_root(self):
1505
if TestCaseWithMemoryTransport.TEST_ROOT is not None:
1509
root = u'test%04d.tmp' % i
1513
if e.errno == errno.EEXIST:
1518
# successfully created
1519
TestCaseWithMemoryTransport.TEST_ROOT = osutils.abspath(root)
1521
# make a fake bzr directory there to prevent any tests propagating
1522
# up onto the source directory's real branch
1523
bzrdir.BzrDir.create_standalone_workingtree(
1524
TestCaseWithMemoryTransport.TEST_ROOT)
1526
def makeAndChdirToTestDir(self):
1527
"""Create a temporary directories for this one test.
1529
This must set self.test_home_dir and self.test_dir and chdir to
1532
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1534
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1535
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1536
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1538
def make_branch(self, relpath, format=None):
1539
"""Create a branch on the transport at relpath."""
1540
repo = self.make_repository(relpath, format=format)
1541
return repo.bzrdir.create_branch()
1543
def make_bzrdir(self, relpath, format=None):
1545
# might be a relative or absolute path
1546
maybe_a_url = self.get_url(relpath)
1547
segments = maybe_a_url.rsplit('/', 1)
1548
t = get_transport(maybe_a_url)
1549
if len(segments) > 1 and segments[-1] not in ('', '.'):
1552
except errors.FileExists:
1556
if isinstance(format, basestring):
1557
format = bzrdir.format_registry.make_bzrdir(format)
1558
return format.initialize_on_transport(t)
1559
except errors.UninitializableFormat:
1560
raise TestSkipped("Format %s is not initializable." % format)
1562
def make_repository(self, relpath, shared=False, format=None):
1563
"""Create a repository on our default transport at relpath."""
1564
made_control = self.make_bzrdir(relpath, format=format)
1565
return made_control.create_repository(shared=shared)
1567
def make_branch_and_memory_tree(self, relpath, format=None):
1568
"""Create a branch on the default transport and a MemoryTree for it."""
1569
b = self.make_branch(relpath, format=format)
1570
return memorytree.MemoryTree.create_on_branch(b)
1572
def overrideEnvironmentForTesting(self):
1573
os.environ['HOME'] = self.test_home_dir
1574
os.environ['BZR_HOME'] = self.test_home_dir
1577
super(TestCaseWithMemoryTransport, self).setUp()
1578
self._make_test_root()
1579
_currentdir = os.getcwdu()
1580
def _leaveDirectory():
1581
os.chdir(_currentdir)
1582
self.addCleanup(_leaveDirectory)
1583
self.makeAndChdirToTestDir()
1584
self.overrideEnvironmentForTesting()
1585
self.__readonly_server = None
1586
self.__server = None
1589
class TestCaseInTempDir(TestCaseWithMemoryTransport):
1590
"""Derived class that runs a test within a temporary directory.
1592
This is useful for tests that need to create a branch, etc.
1594
The directory is created in a slightly complex way: for each
1595
Python invocation, a new temporary top-level directory is created.
1596
All test cases create their own directory within that. If the
1597
tests complete successfully, the directory is removed.
1599
InTempDir is an old alias for FunctionalTestCase.
1602
OVERRIDE_PYTHON = 'python'
1604
def check_file_contents(self, filename, expect):
1605
self.log("check contents of file %s" % filename)
1606
contents = file(filename, 'r').read()
1607
if contents != expect:
1608
self.log("expected: %r" % expect)
1609
self.log("actually: %r" % contents)
1610
self.fail("contents of %s not as expected" % filename)
1612
def makeAndChdirToTestDir(self):
1613
"""See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
1615
For TestCaseInTempDir we create a temporary directory based on the test
1616
name and then create two subdirs - test and home under it.
1618
if NUMBERED_DIRS: # strongly recommended on Windows
1619
# due the path length limitation (260 chars)
1620
candidate_dir = '%s/%dK/%05d' % (self.TEST_ROOT,
1621
int(self.number/1000),
1623
os.makedirs(candidate_dir)
1624
self.test_home_dir = candidate_dir + '/home'
1625
os.mkdir(self.test_home_dir)
1626
self.test_dir = candidate_dir + '/work'
1627
os.mkdir(self.test_dir)
1628
os.chdir(self.test_dir)
1629
# put name of test inside
1630
f = file(candidate_dir + '/name', 'w')
1635
# shorten the name, to avoid test failures due to path length
1636
short_id = self.id().replace('bzrlib.tests.', '') \
1637
.replace('__main__.', '')[-100:]
1638
# it's possible the same test class is run several times for
1639
# parameterized tests, so make sure the names don't collide.
1643
candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
1645
candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
1646
if os.path.exists(candidate_dir):
1650
os.mkdir(candidate_dir)
1651
self.test_home_dir = candidate_dir + '/home'
1652
os.mkdir(self.test_home_dir)
1653
self.test_dir = candidate_dir + '/work'
1654
os.mkdir(self.test_dir)
1655
os.chdir(self.test_dir)
1658
def build_tree(self, shape, line_endings='binary', transport=None):
1659
"""Build a test tree according to a pattern.
1661
shape is a sequence of file specifications. If the final
1662
character is '/', a directory is created.
1664
This assumes that all the elements in the tree being built are new.
1666
This doesn't add anything to a branch.
1667
:param line_endings: Either 'binary' or 'native'
1668
in binary mode, exact contents are written
1669
in native mode, the line endings match the
1670
default platform endings.
1672
:param transport: A transport to write to, for building trees on
1673
VFS's. If the transport is readonly or None,
1674
"." is opened automatically.
1676
# It's OK to just create them using forward slashes on windows.
1677
if transport is None or transport.is_readonly():
1678
transport = get_transport(".")
1680
self.assert_(isinstance(name, basestring))
1682
transport.mkdir(urlutils.escape(name[:-1]))
1684
if line_endings == 'binary':
1686
elif line_endings == 'native':
1689
raise errors.BzrError(
1690
'Invalid line ending request %r' % line_endings)
1691
content = "contents of %s%s" % (name.encode('utf-8'), end)
1692
transport.put_bytes_non_atomic(urlutils.escape(name), content)
1694
def build_tree_contents(self, shape):
1695
build_tree_contents(shape)
1697
def assertFileEqual(self, content, path):
1698
"""Fail if path does not contain 'content'."""
1699
self.failUnlessExists(path)
1700
# TODO: jam 20060427 Shouldn't this be 'rb'?
1706
self.assertEqualDiff(content, s)
1708
def failUnlessExists(self, path):
1709
"""Fail unless path, which may be abs or relative, exists."""
1710
self.failUnless(osutils.lexists(path),path+" does not exist")
1712
def failIfExists(self, path):
1713
"""Fail if path, which may be abs or relative, exists."""
1714
self.failIf(osutils.lexists(path),path+" exists")
1717
class TestCaseWithTransport(TestCaseInTempDir):
1718
"""A test case that provides get_url and get_readonly_url facilities.
1720
These back onto two transport servers, one for readonly access and one for
1723
If no explicit class is provided for readonly access, a
1724
ReadonlyTransportDecorator is used instead which allows the use of non disk
1725
based read write transports.
1727
If an explicit class is provided for readonly access, that server and the
1728
readwrite one must both define get_url() as resolving to os.getcwd().
1731
def create_transport_server(self):
1732
"""Create a transport server from class defined at init.
1734
This is mostly a hook for daughter classes.
1736
return self.transport_server()
1738
def get_server(self):
1739
"""See TestCaseWithMemoryTransport.
1741
This is useful for some tests with specific servers that need
1744
if self.__server is None:
1745
self.__server = self.create_transport_server()
1746
self.__server.setUp()
1747
self.addCleanup(self.__server.tearDown)
1748
return self.__server
1750
def make_branch_and_tree(self, relpath, format=None):
1751
"""Create a branch on the transport and a tree locally.
1753
If the transport is not a LocalTransport, the Tree can't be created on
1754
the transport. In that case the working tree is created in the local
1755
directory, and the returned tree's branch and repository will also be
1758
This will fail if the original default transport for this test
1759
case wasn't backed by the working directory, as the branch won't
1760
be on disk for us to open it.
1762
:param format: The BzrDirFormat.
1763
:returns: the WorkingTree.
1765
# TODO: always use the local disk path for the working tree,
1766
# this obviously requires a format that supports branch references
1767
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
1769
b = self.make_branch(relpath, format=format)
1771
return b.bzrdir.create_workingtree()
1772
except errors.NotLocalUrl:
1773
# We can only make working trees locally at the moment. If the
1774
# transport can't support them, then reopen the branch on a local
1775
# transport, and create the working tree there.
1777
# Possibly we should instead keep
1778
# the non-disk-backed branch and create a local checkout?
1779
bd = bzrdir.BzrDir.open(relpath)
1780
return bd.create_workingtree()
1782
def assertIsDirectory(self, relpath, transport):
1783
"""Assert that relpath within transport is a directory.
1785
This may not be possible on all transports; in that case it propagates
1786
a TransportNotPossible.
1789
mode = transport.stat(relpath).st_mode
1790
except errors.NoSuchFile:
1791
self.fail("path %s is not a directory; no such file"
1793
if not stat.S_ISDIR(mode):
1794
self.fail("path %s is not a directory; has mode %#o"
1797
def assertTreesEqual(self, left, right):
1798
"""Check that left and right have the same content and properties."""
1799
# we use a tree delta to check for equality of the content, and we
1800
# manually check for equality of other things such as the parents list.
1801
self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
1802
differences = left.changes_from(right)
1803
self.assertFalse(differences.has_changed(),
1804
"Trees %r and %r are different: %r" % (left, right, differences))
1807
super(TestCaseWithTransport, self).setUp()
1808
self.__server = None
1811
class ChrootedTestCase(TestCaseWithTransport):
1812
"""A support class that provides readonly urls outside the local namespace.
1814
This is done by checking if self.transport_server is a MemoryServer. if it
1815
is then we are chrooted already, if it is not then an HttpServer is used
1818
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
1819
be used without needed to redo it when a different
1820
subclass is in use ?
1824
super(ChrootedTestCase, self).setUp()
1825
if not self.transport_server == MemoryServer:
1826
self.transport_readonly_server = HttpServer
1829
def filter_suite_by_re(suite, pattern):
1830
result = TestUtil.TestSuite()
1831
filter_re = re.compile(pattern)
1832
for test in iter_suite_tests(suite):
1833
if filter_re.search(test.id()):
1834
result.addTest(test)
1838
def sort_suite_by_re(suite, pattern):
1841
filter_re = re.compile(pattern)
1842
for test in iter_suite_tests(suite):
1843
if filter_re.search(test.id()):
1847
return TestUtil.TestSuite(first + second)
1850
def run_suite(suite, name='test', verbose=False, pattern=".*",
1851
stop_on_failure=False, keep_output=False,
1852
transport=None, lsprof_timed=None, bench_history=None,
1853
matching_tests_first=None,
1854
numbered_dirs=None):
1855
global NUMBERED_DIRS
1856
if numbered_dirs is not None:
1857
NUMBERED_DIRS = bool(numbered_dirs)
1859
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1864
runner = TextTestRunner(stream=sys.stdout,
1866
verbosity=verbosity,
1867
keep_output=keep_output,
1868
bench_history=bench_history)
1869
runner.stop_on_failure=stop_on_failure
1871
if matching_tests_first:
1872
suite = sort_suite_by_re(suite, pattern)
1874
suite = filter_suite_by_re(suite, pattern)
1875
result = runner.run(suite)
1876
return result.wasSuccessful()
1879
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
1882
test_suite_factory=None,
1885
matching_tests_first=None,
1886
numbered_dirs=None):
1887
"""Run the whole test suite under the enhanced runner"""
1888
# XXX: Very ugly way to do this...
1889
# Disable warning about old formats because we don't want it to disturb
1890
# any blackbox tests.
1891
from bzrlib import repository
1892
repository._deprecation_warning_done = True
1894
global default_transport
1895
if transport is None:
1896
transport = default_transport
1897
old_transport = default_transport
1898
default_transport = transport
1900
if test_suite_factory is None:
1901
suite = test_suite()
1903
suite = test_suite_factory()
1904
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
1905
stop_on_failure=stop_on_failure, keep_output=keep_output,
1906
transport=transport,
1907
lsprof_timed=lsprof_timed,
1908
bench_history=bench_history,
1909
matching_tests_first=matching_tests_first,
1910
numbered_dirs=numbered_dirs)
1912
default_transport = old_transport
1916
"""Build and return TestSuite for the whole of bzrlib.
1918
This function can be replaced if you need to change the default test
1919
suite on a global basis, but it is not encouraged.
1922
'bzrlib.tests.test_ancestry',
1923
'bzrlib.tests.test_annotate',
1924
'bzrlib.tests.test_api',
1925
'bzrlib.tests.test_atomicfile',
1926
'bzrlib.tests.test_bad_files',
1927
'bzrlib.tests.test_branch',
1928
'bzrlib.tests.test_bundle',
1929
'bzrlib.tests.test_bzrdir',
1930
'bzrlib.tests.test_cache_utf8',
1931
'bzrlib.tests.test_commands',
1932
'bzrlib.tests.test_commit',
1933
'bzrlib.tests.test_commit_merge',
1934
'bzrlib.tests.test_config',
1935
'bzrlib.tests.test_conflicts',
1936
'bzrlib.tests.test_decorators',
1937
'bzrlib.tests.test_delta',
1938
'bzrlib.tests.test_diff',
1939
'bzrlib.tests.test_dirstate',
1940
'bzrlib.tests.test_doc_generate',
1941
'bzrlib.tests.test_errors',
1942
'bzrlib.tests.test_escaped_store',
1943
'bzrlib.tests.test_extract',
1944
'bzrlib.tests.test_fetch',
1945
'bzrlib.tests.test_ftp_transport',
1946
'bzrlib.tests.test_generate_docs',
1947
'bzrlib.tests.test_generate_ids',
1948
'bzrlib.tests.test_globbing',
1949
'bzrlib.tests.test_gpg',
1950
'bzrlib.tests.test_graph',
1951
'bzrlib.tests.test_hashcache',
1952
'bzrlib.tests.test_http',
1953
'bzrlib.tests.test_http_response',
1954
'bzrlib.tests.test_https_ca_bundle',
1955
'bzrlib.tests.test_identitymap',
1956
'bzrlib.tests.test_ignores',
1957
'bzrlib.tests.test_inv',
1958
'bzrlib.tests.test_knit',
1959
'bzrlib.tests.test_lazy_import',
1960
'bzrlib.tests.test_lazy_regex',
1961
'bzrlib.tests.test_lockdir',
1962
'bzrlib.tests.test_lockable_files',
1963
'bzrlib.tests.test_log',
1964
'bzrlib.tests.test_memorytree',
1965
'bzrlib.tests.test_merge',
1966
'bzrlib.tests.test_merge3',
1967
'bzrlib.tests.test_merge_core',
1968
'bzrlib.tests.test_merge_directive',
1969
'bzrlib.tests.test_missing',
1970
'bzrlib.tests.test_msgeditor',
1971
'bzrlib.tests.test_nonascii',
1972
'bzrlib.tests.test_options',
1973
'bzrlib.tests.test_osutils',
1974
'bzrlib.tests.test_osutils_encodings',
1975
'bzrlib.tests.test_patch',
1976
'bzrlib.tests.test_patches',
1977
'bzrlib.tests.test_permissions',
1978
'bzrlib.tests.test_plugins',
1979
'bzrlib.tests.test_progress',
1980
'bzrlib.tests.test_reconcile',
1981
'bzrlib.tests.test_registry',
1982
'bzrlib.tests.test_repository',
1983
'bzrlib.tests.test_revert',
1984
'bzrlib.tests.test_revision',
1985
'bzrlib.tests.test_revisionnamespaces',
1986
'bzrlib.tests.test_revisiontree',
1987
'bzrlib.tests.test_rio',
1988
'bzrlib.tests.test_sampler',
1989
'bzrlib.tests.test_selftest',
1990
'bzrlib.tests.test_setup',
1991
'bzrlib.tests.test_sftp_transport',
1992
'bzrlib.tests.test_smart_add',
1993
'bzrlib.tests.test_smart_transport',
1994
'bzrlib.tests.test_source',
1995
'bzrlib.tests.test_ssh_transport',
1996
'bzrlib.tests.test_status',
1997
'bzrlib.tests.test_store',
1998
'bzrlib.tests.test_subsume',
1999
'bzrlib.tests.test_symbol_versioning',
2000
'bzrlib.tests.test_tag',
2001
'bzrlib.tests.test_testament',
2002
'bzrlib.tests.test_textfile',
2003
'bzrlib.tests.test_textmerge',
2004
'bzrlib.tests.test_timestamp',
2005
'bzrlib.tests.test_trace',
2006
'bzrlib.tests.test_transactions',
2007
'bzrlib.tests.test_transform',
2008
'bzrlib.tests.test_transport',
2009
'bzrlib.tests.test_tree',
2010
'bzrlib.tests.test_treebuilder',
2011
'bzrlib.tests.test_tsort',
2012
'bzrlib.tests.test_tuned_gzip',
2013
'bzrlib.tests.test_ui',
2014
'bzrlib.tests.test_upgrade',
2015
'bzrlib.tests.test_urlutils',
2016
'bzrlib.tests.test_versionedfile',
2017
'bzrlib.tests.test_version',
2018
'bzrlib.tests.test_version_info',
2019
'bzrlib.tests.test_weave',
2020
'bzrlib.tests.test_whitebox',
2021
'bzrlib.tests.test_workingtree',
2022
'bzrlib.tests.test_workingtree_4',
2023
'bzrlib.tests.test_wsgi',
2024
'bzrlib.tests.test_xml',
2026
test_transport_implementations = [
2027
'bzrlib.tests.test_transport_implementations',
2028
'bzrlib.tests.test_read_bundle',
2030
suite = TestUtil.TestSuite()
2031
loader = TestUtil.TestLoader()
2032
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
2033
from bzrlib.transport import TransportTestProviderAdapter
2034
adapter = TransportTestProviderAdapter()
2035
adapt_modules(test_transport_implementations, adapter, loader, suite)
2036
for package in packages_to_test():
2037
suite.addTest(package.test_suite())
2038
for m in MODULES_TO_TEST:
2039
suite.addTest(loader.loadTestsFromModule(m))
2040
for m in MODULES_TO_DOCTEST:
2042
suite.addTest(doctest.DocTestSuite(m))
2043
except ValueError, e:
2044
print '**failed to get doctest for: %s\n%s' %(m,e)
2046
for name, plugin in bzrlib.plugin.all_plugins().items():
2047
if getattr(plugin, 'test_suite', None) is not None:
2048
default_encoding = sys.getdefaultencoding()
2050
plugin_suite = plugin.test_suite()
2051
except ImportError, e:
2052
bzrlib.trace.warning(
2053
'Unable to test plugin "%s": %s', name, e)
2055
suite.addTest(plugin_suite)
2056
if default_encoding != sys.getdefaultencoding():
2057
bzrlib.trace.warning(
2058
'Plugin "%s" tried to reset default encoding to: %s', name,
2059
sys.getdefaultencoding())
2061
sys.setdefaultencoding(default_encoding)
2065
def adapt_modules(mods_list, adapter, loader, suite):
2066
"""Adapt the modules in mods_list using adapter and add to suite."""
2067
for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
2068
suite.addTests(adapter.adapt(test))
2071
def clean_selftest_output(root=None, quiet=False):
2072
"""Remove all selftest output directories from root directory.
2074
:param root: root directory for clean
2075
(if ommitted or None then clean current directory).
2076
:param quiet: suppress report about deleting directories
2081
re_dir = re.compile(r'''test\d\d\d\d\.tmp''')
2084
for i in os.listdir(root):
2085
if os.path.isdir(i) and re_dir.match(i):
2087
print 'delete directory:', i