1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
# TODO: Perhaps there should be an API to find out if bzr is running under the
19
# test suite -- some plugins might want to avoid making intrusive changes if
20
# this is the case. However, we want behaviour under test to diverge as
21
# little as possible, so this should be used rarely if it's added at all.
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
# NOTE: Some classes in here use camelCaseNaming() rather than
25
# underscore_naming(). That's for consistency with unittest; it's not the
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
# new assertFoo() methods.
31
from cStringIO import StringIO
37
from pprint import pformat
42
from subprocess import Popen, PIPE
63
import bzrlib.commands
64
import bzrlib.timestamp
66
import bzrlib.inventory
67
import bzrlib.iterablefile
72
# lsprof not available
74
from bzrlib.merge import merge_inner
77
from bzrlib.revision import common_ancestor
79
from bzrlib import symbol_versioning
80
from bzrlib.symbol_versioning import (
86
from bzrlib.transport import get_transport
87
import bzrlib.transport
88
from bzrlib.transport.local import LocalURLServer
89
from bzrlib.transport.memory import MemoryServer
90
from bzrlib.transport.readonly import ReadonlyServer
91
from bzrlib.trace import mutter, note
92
from bzrlib.tests import TestUtil
93
from bzrlib.tests.HttpServer import HttpServer
94
from bzrlib.tests.TestUtil import (
98
from bzrlib.tests.EncodingAdapter import EncodingTestAdapter
99
from bzrlib.tests.treeshape import build_tree_contents
100
import bzrlib.version_info_formats.format_custom
101
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
103
# Mark this python module as being part of the implementation
104
# of unittest: this gives us better tracebacks where the last
105
# shown frame is the test code, not our assertXYZ.
108
default_transport = LocalURLServer
111
MODULES_TO_DOCTEST = [
121
bzrlib.version_info_formats.format_custom,
122
# quoted to avoid module-loading circularity
127
def packages_to_test():
128
"""Return a list of packages to test.
130
The packages are not globally imported so that import failures are
131
triggered when running selftest, not when importing the command.
134
import bzrlib.tests.blackbox
135
import bzrlib.tests.branch_implementations
136
import bzrlib.tests.bzrdir_implementations
137
import bzrlib.tests.commands
138
import bzrlib.tests.interrepository_implementations
139
import bzrlib.tests.interversionedfile_implementations
140
import bzrlib.tests.intertree_implementations
141
import bzrlib.tests.inventory_implementations
142
import bzrlib.tests.per_lock
143
import bzrlib.tests.repository_implementations
144
import bzrlib.tests.revisionstore_implementations
145
import bzrlib.tests.tree_implementations
146
import bzrlib.tests.workingtree_implementations
149
bzrlib.tests.blackbox,
150
bzrlib.tests.branch_implementations,
151
bzrlib.tests.bzrdir_implementations,
152
bzrlib.tests.commands,
153
bzrlib.tests.interrepository_implementations,
154
bzrlib.tests.interversionedfile_implementations,
155
bzrlib.tests.intertree_implementations,
156
bzrlib.tests.inventory_implementations,
157
bzrlib.tests.per_lock,
158
bzrlib.tests.repository_implementations,
159
bzrlib.tests.revisionstore_implementations,
160
bzrlib.tests.tree_implementations,
161
bzrlib.tests.workingtree_implementations,
165
class ExtendedTestResult(unittest._TextTestResult):
166
"""Accepts, reports and accumulates the results of running tests.
168
Compared to the unittest version this class adds support for
169
profiling, benchmarking, stopping as soon as a test fails, and
170
skipping tests. There are further-specialized subclasses for
171
different types of display.
173
When a test finishes, in whatever way, it calls one of the addSuccess,
174
addFailure or addError classes. These in turn may redirect to a more
175
specific case for the special test results supported by our extended
178
Note that just one of these objects is fed the results from many tests.
183
def __init__(self, stream, descriptions, verbosity,
187
"""Construct new TestResult.
189
:param bench_history: Optionally, a writable file object to accumulate
192
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
193
if bench_history is not None:
194
from bzrlib.version import _get_bzr_source_tree
195
src_tree = _get_bzr_source_tree()
198
revision_id = src_tree.get_parent_ids()[0]
200
# XXX: if this is a brand new tree, do the same as if there
204
# XXX: If there's no branch, what should we do?
206
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
207
self._bench_history = bench_history
208
self.ui = ui.ui_factory
209
self.num_tests = num_tests
211
self.failure_count = 0
212
self.known_failure_count = 0
214
self.not_applicable_count = 0
215
self.unsupported = {}
217
self._overall_start_time = time.time()
219
def _extractBenchmarkTime(self, testCase):
220
"""Add a benchmark time for the current test case."""
221
return getattr(testCase, "_benchtime", None)
223
def _elapsedTestTimeString(self):
224
"""Return a time string for the overall time the current test has taken."""
225
return self._formatTime(time.time() - self._start_time)
227
def _testTimeString(self, testCase):
228
benchmark_time = self._extractBenchmarkTime(testCase)
229
if benchmark_time is not None:
231
self._formatTime(benchmark_time),
232
self._elapsedTestTimeString())
234
return " %s" % self._elapsedTestTimeString()
236
def _formatTime(self, seconds):
237
"""Format seconds as milliseconds with leading spaces."""
238
# some benchmarks can take thousands of seconds to run, so we need 8
240
return "%8dms" % (1000 * seconds)
242
def _shortened_test_description(self, test):
244
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
247
def startTest(self, test):
    """Note that a test is about to run.

    :param test: The test that is starting. Its ``number`` attribute is
        set to the current value of ``self.count``.
    """
    # Let unittest do its own bookkeeping before anything is reported.
    unittest.TestResult.startTest(self, test)
    self.report_test_start(test)
    # The number is assigned after reporting, so it reflects any change
    # report_test_start made to self.count.
    test.number = self.count
    # Start the clock last so reporting time is not charged to the test.
    self._recordTestStartTime()
253
def _recordTestStartTime(self):
254
"""Record that a test has started."""
255
self._start_time = time.time()
257
def _cleanupLogFile(self, test):
258
# We can only do this if we have one of our TestCases, not if
260
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
261
if setKeepLogfile is not None:
264
def addError(self, test, err):
265
"""Tell result that test finished with an error.
267
Called from the TestCase run() method when the test
268
fails with an unexpected error.
270
self._testConcluded(test)
271
if isinstance(err[1], TestSkipped):
272
return self._addSkipped(test, err)
273
elif isinstance(err[1], UnavailableFeature):
274
return self.addNotSupported(test, err[1].args[0])
276
unittest.TestResult.addError(self, test, err)
277
self.error_count += 1
278
self.report_error(test, err)
282
def addFailure(self, test, err):
283
"""Tell result that test failed.
285
Called from the TestCase run() method when the test
286
fails because e.g. an assert() method failed.
288
self._testConcluded(test)
289
if isinstance(err[1], KnownFailure):
290
return self._addKnownFailure(test, err)
292
unittest.TestResult.addFailure(self, test, err)
293
self.failure_count += 1
294
self.report_failure(test, err)
298
def addSuccess(self, test):
299
"""Tell result that test completed successfully.
301
Called from the TestCase run()
303
self._testConcluded(test)
304
if self._bench_history is not None:
305
benchmark_time = self._extractBenchmarkTime(test)
306
if benchmark_time is not None:
307
self._bench_history.write("%s %s\n" % (
308
self._formatTime(benchmark_time),
310
self.report_success(test)
311
unittest.TestResult.addSuccess(self, test)
313
def _testConcluded(self, test):
314
"""Common code when a test has finished.
316
Called regardless of whether it succeded, failed, etc.
318
self._cleanupLogFile(test)
320
def _addKnownFailure(self, test, err):
321
self.known_failure_count += 1
322
self.report_known_failure(test, err)
324
def addNotSupported(self, test, feature):
    """Record that a test is not run because a feature is missing.

    :param test: The test that needs the feature.
    :param feature: The missing feature; ``str(feature)`` is the key
        under which it is counted in ``self.unsupported``.
    """
    # There are two routes into this method. The test may have started
    # running and then raised UnavailableFeature, which arrives here by
    # way of addError. Or the method may be called while probing for
    # features before the tests run; then startTest and stopTest are
    # seen, but the test itself is never run.
    feature_name = str(feature)
    self.unsupported[feature_name] = (
        self.unsupported.get(feature_name, 0) + 1)
    self.report_unsupported(test, feature)
337
def _addSkipped(self, test, skip_excinfo):
338
if isinstance(skip_excinfo[1], TestNotApplicable):
339
self.not_applicable_count += 1
340
self.report_not_applicable(test, skip_excinfo)
343
self.report_skip(test, skip_excinfo)
346
except KeyboardInterrupt:
349
self.addError(test, test._exc_info())
351
# seems best to treat this as success from point-of-view of unittest
352
# -- it actually does nothing so it barely matters :)
353
unittest.TestResult.addSuccess(self, test)
355
def printErrorList(self, flavour, errors):
356
for test, err in errors:
357
self.stream.writeln(self.separator1)
358
self.stream.write("%s: " % flavour)
359
self.stream.writeln(self.getDescription(test))
360
if getattr(test, '_get_log', None) is not None:
361
self.stream.write('\n')
363
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
364
self.stream.write('\n')
365
self.stream.write(test._get_log())
366
self.stream.write('\n')
368
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
369
self.stream.write('\n')
370
self.stream.writeln(self.separator2)
371
self.stream.writeln("%s" % err)
376
def report_cleaning_up(self):
379
def report_success(self, test):
382
def wasStrictlySuccessful(self):
383
if self.unsupported or self.known_failure_count:
385
return self.wasSuccessful()
388
class TextTestResult(ExtendedTestResult):
389
"""Displays progress and results of tests in text form"""
391
def __init__(self, stream, descriptions, verbosity,
396
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
397
bench_history, num_tests)
399
self.pb = self.ui.nested_progress_bar()
400
self._supplied_pb = False
403
self._supplied_pb = True
404
self.pb.show_pct = False
405
self.pb.show_spinner = False
406
# Assign a plain boolean; with a trailing comma this would be a truthy tuple.
self.pb.show_eta = False
407
self.pb.show_count = False
408
self.pb.show_bar = False
410
def report_starting(self):
    """Show an initial progress message before any test has run.

    When ``self.num_tests`` is None the total is left out of the message,
    matching how _progress_prefix_text treats an unknown test count.
    """
    if self.num_tests is None:
        total = ''
    else:
        total = '/%d' % self.num_tests
    self.pb.update('[test 0%s] starting...' % total)
413
def _progress_prefix_text(self):
414
a = '[%d' % self.count
415
if self.num_tests is not None:
416
a +='/%d' % self.num_tests
417
a += ' in %ds' % (time.time() - self._overall_start_time)
419
a += ', %d errors' % self.error_count
420
if self.failure_count:
421
a += ', %d failed' % self.failure_count
422
if self.known_failure_count:
423
a += ', %d known failures' % self.known_failure_count
425
a += ', %d skipped' % self.skip_count
427
a += ', %d missing features' % len(self.unsupported)
431
def report_test_start(self, test):
434
self._progress_prefix_text()
436
+ self._shortened_test_description(test))
438
def _test_description(self, test):
439
return self._shortened_test_description(test)
441
def report_error(self, test, err):
442
self.pb.note('ERROR: %s\n %s\n',
443
self._test_description(test),
447
def report_failure(self, test, err):
448
self.pb.note('FAIL: %s\n %s\n',
449
self._test_description(test),
453
def report_known_failure(self, test, err):
    """Emit an XFAIL note on the progress bar for an expected failure.

    :param test: The test that failed in the expected way.
    :param err: The error information; ``err[1]`` is shown as the reason.
    """
    description = self._test_description(test)
    reason = err[1]
    self.pb.note('XFAIL: %s\n%s\n', description, reason)
457
def report_skip(self, test, skip_excinfo):
460
def report_not_applicable(self, test, skip_excinfo):
463
def report_unsupported(self, test, feature):
    """Report a test that cannot run because a feature is missing.

    This display prints nothing for the individual test.
    """
466
def report_cleaning_up(self):
    """Tell the user, through the progress bar, that cleanup is running."""
    progress = self.pb
    progress.update('cleaning up...')
470
if not self._supplied_pb:
474
class VerboseTestResult(ExtendedTestResult):
475
"""Produce long output, with one line per test run plus times"""
477
def _ellipsize_to_right(self, a_string, final_width):
478
"""Truncate and pad a string, keeping the right hand side"""
479
if len(a_string) > final_width:
480
result = '...' + a_string[3-final_width:]
483
return result.ljust(final_width)
485
def report_starting(self):
    """Write a header line announcing how many tests will be run.

    When ``self.num_tests`` is None the count is left out instead of
    failing on the ``%d`` conversion.
    """
    if self.num_tests is None:
        self.stream.write('running tests...\n')
    else:
        self.stream.write('running %d tests...\n' % self.num_tests)
488
def report_test_start(self, test):
490
name = self._shortened_test_description(test)
491
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
492
# numbers, plus a trailing blank
493
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
494
self.stream.write(self._ellipsize_to_right(name,
495
osutils.terminal_width()-30))
498
def _error_summary(self, err):
500
return '%s%s' % (indent, err[1])
502
def report_error(self, test, err):
    """Write an ERROR line with the test's time and an error summary.

    :param test: The test that ended with an unexpected error.
    :param err: The error information, summarised by _error_summary.
    """
    elapsed = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln('ERROR %s\n%s' % (elapsed, summary))
507
def report_failure(self, test, err):
    """Write a FAIL line with the test's time and a failure summary.

    :param test: The test that failed.
    :param err: The error information, summarised by _error_summary.
    """
    elapsed = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln(' FAIL %s\n%s' % (elapsed, summary))
512
def report_known_failure(self, test, err):
    """Write an XFAIL line with the test's time and a failure summary.

    :param test: The test that failed in the expected way.
    :param err: The error information, summarised by _error_summary.
    """
    elapsed = self._testTimeString(test)
    summary = self._error_summary(err)
    self.stream.writeln('XFAIL %s\n%s' % (elapsed, summary))
517
def report_success(self, test):
518
self.stream.writeln(' OK %s' % self._testTimeString(test))
519
for bench_called, stats in getattr(test, '_benchcalls', []):
520
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
521
stats.pprint(file=self.stream)
522
# flush the stream so that we get smooth output. This verbose mode is
523
# used to show the output in PQM.
526
def report_skip(self, test, skip_excinfo):
    """Write a SKIP line with the test's time and the skip summary.

    :param test: The test that was skipped.
    :param skip_excinfo: The error information for the skip, summarised
        by _error_summary.
    """
    elapsed = self._testTimeString(test)
    summary = self._error_summary(skip_excinfo)
    self.stream.writeln(' SKIP %s\n%s' % (elapsed, summary))
531
def report_not_applicable(self, test, skip_excinfo):
    """Write an N/A line with the test's time and the skip summary.

    :param test: The test that did not apply.
    :param skip_excinfo: The error information for the skip, summarised
        by _error_summary.
    """
    elapsed = self._testTimeString(test)
    summary = self._error_summary(skip_excinfo)
    self.stream.writeln(' N/A %s\n%s' % (elapsed, summary))
536
def report_unsupported(self, test, feature):
    """Write a NODEP line naming the feature the test could not get.

    :param test: The test that cannot be run.
    :param feature: The missing feature, shown with ``%s`` formatting.
    """
    elapsed = self._testTimeString(test)
    line = "NODEP %s\n The feature '%s' is not available." % (
        elapsed, feature)
    self.stream.writeln(line)
542
class TextTestRunner(object):
543
stop_on_failure = False
552
self.stream = unittest._WritelnDecorator(stream)
553
self.descriptions = descriptions
554
self.verbosity = verbosity
555
self._bench_history = bench_history
556
self.list_only = list_only
559
"Run the given test case or test suite."
560
startTime = time.time()
561
if self.verbosity == 1:
562
result_class = TextTestResult
563
elif self.verbosity >= 2:
564
result_class = VerboseTestResult
565
result = result_class(self.stream,
568
bench_history=self._bench_history,
569
num_tests=test.countTestCases(),
571
result.stop_early = self.stop_on_failure
572
result.report_starting()
574
if self.verbosity >= 2:
575
self.stream.writeln("Listing tests only ...\n")
577
for t in iter_suite_tests(test):
578
self.stream.writeln("%s" % (t.id()))
580
actionTaken = "Listed"
583
run = result.testsRun
585
stopTime = time.time()
586
timeTaken = stopTime - startTime
588
self.stream.writeln(result.separator2)
589
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
590
run, run != 1 and "s" or "", timeTaken))
591
self.stream.writeln()
592
if not result.wasSuccessful():
593
self.stream.write("FAILED (")
594
failed, errored = map(len, (result.failures, result.errors))
596
self.stream.write("failures=%d" % failed)
598
if failed: self.stream.write(", ")
599
self.stream.write("errors=%d" % errored)
600
if result.known_failure_count:
601
if failed or errored: self.stream.write(", ")
602
self.stream.write("known_failure_count=%d" %
603
result.known_failure_count)
604
self.stream.writeln(")")
606
if result.known_failure_count:
607
self.stream.writeln("OK (known_failures=%d)" %
608
result.known_failure_count)
610
self.stream.writeln("OK")
611
if result.skip_count > 0:
612
skipped = result.skip_count
613
self.stream.writeln('%d test%s skipped' %
614
(skipped, skipped != 1 and "s" or ""))
615
if result.unsupported:
616
for feature, count in sorted(result.unsupported.items()):
617
self.stream.writeln("Missing feature '%s' skipped %d tests." %
623
def iter_suite_tests(suite):
624
"""Return all tests in a suite, recursing through nested suites"""
625
for item in suite._tests:
626
if isinstance(item, unittest.TestCase):
628
elif isinstance(item, unittest.TestSuite):
629
for r in iter_suite_tests(item):
632
raise Exception('unknown object %r inside test suite %r'
636
class TestSkipped(Exception):
    """Raised when a test is deliberately skipped instead of failing."""
640
class TestNotApplicable(TestSkipped):
641
"""A test is not applicable to the situation where it was run.
643
This is only normally raised by parameterized tests, if they find that
644
the instance they're constructed upon does not support one aspect
649
class KnownFailure(AssertionError):
650
"""Indicates that a test failed in a precisely expected manner.
652
Such failures dont block the whole test suite from passing because they are
653
indicators of partially completed code or of future work. We have an
654
explicit error for them so that we can ensure that they are always visible:
655
KnownFailures are always shown in the output of bzr selftest.
659
class UnavailableFeature(Exception):
660
"""A feature required for this test was not available.
662
The feature should be used to construct the exception.
666
class CommandFailed(Exception):
670
class StringIOWrapper(object):
671
"""A wrapper around cStringIO which just adds an encoding attribute.
673
Internally we can check sys.stdout to see what the output encoding
674
should be. However, cStringIO has no encoding attribute that we can
675
set. So we wrap it instead.
680
def __init__(self, s=None):
682
self.__dict__['_cstring'] = StringIO(s)
684
self.__dict__['_cstring'] = StringIO()
686
def __getattr__(self, name, getattr=getattr):
687
return getattr(self.__dict__['_cstring'], name)
689
def __setattr__(self, name, val):
690
if name == 'encoding':
691
self.__dict__['encoding'] = val
693
return setattr(self._cstring, name, val)
696
class TestUIFactory(ui.CLIUIFactory):
697
"""A UI Factory for testing.
699
Hide the progress bar but emit note()s.
701
Allows get_password to be tested without real tty attached.
708
super(TestUIFactory, self).__init__()
709
if stdin is not None:
710
# We use a StringIOWrapper to be able to test various
711
# encodings, but the user is still responsible to
712
# encode the string and to set the encoding attribute
713
# of StringIOWrapper.
714
self.stdin = StringIOWrapper(stdin)
716
self.stdout = sys.stdout
720
self.stderr = sys.stderr
725
"""See progress.ProgressBar.clear()."""
727
def clear_term(self):
    """See progress.ProgressBar.clear_term().

    This implementation does nothing. It was previously defined twice in
    a row with identical bodies; a single definition is kept.
    """
734
"""See progress.ProgressBar.finished()."""
736
def note(self, fmt_string, *args, **kwargs):
    """See progress.ProgressBar.note().

    :param fmt_string: A ``%``-style format string.
    :param args: Values interpolated into fmt_string.
    :param kwargs: Accepted but not used.
    """
    line = (fmt_string + "\n") % args
    self.stdout.write(line)
740
def progress_bar(self):
743
def nested_progress_bar(self):
746
def update(self, message, count=None, total=None):
    """See progress.ProgressBar.update().

    This implementation does nothing with the progress information.
    """
749
def get_non_echoed_password(self, prompt):
750
"""Get password from stdin without trying to handle the echo mode"""
752
self.stdout.write(prompt.encode(self.stdout.encoding, 'replace'))
753
password = self.stdin.readline()
756
if password[-1] == '\n':
757
password = password[:-1]
761
class TestCase(unittest.TestCase):
762
"""Base class for bzr unit tests.
764
Tests that need access to disk resources should subclass
765
TestCaseInTempDir not TestCase.
767
Error and debug log messages are redirected from their usual
768
location into a temporary file, the contents of which can be
769
retrieved by _get_log(). We use a real OS file, not an in-memory object,
770
so that it can also capture file IO. When the test completes this file
771
is read into memory and removed from disk.
773
There are also convenience functions to invoke bzr's command-line
774
routine, and to build and check bzr trees.
776
In addition to the usual method of overriding tearDown(), this class also
777
allows subclasses to register functions into the _cleanups list, which is
778
run in order as the object is torn down. It's less likely this will be
779
accidentally overlooked.
782
_log_file_name = None
784
_keep_log_file = False
785
# record lsprof data when performing benchmark calls.
786
_gather_lsprof_in_benchmarks = False
788
def __init__(self, methodName='testMethod'):
789
super(TestCase, self).__init__(methodName)
793
unittest.TestCase.setUp(self)
794
self._cleanEnvironment()
795
bzrlib.trace.disable_default_logging()
798
self._benchcalls = []
799
self._benchtime = None
801
self._clear_debug_flags()
803
def _clear_debug_flags(self):
    """Stop debug flags set outside the test from influencing it.

    A copy of the current flags is kept in ``_preserved_debug_flags``,
    the live flag set is emptied, and ``_restore_debug_flags`` is
    registered as a cleanup. A test that wants debug flags can set them
    in the debug_flags set during setup/teardown.
    """
    saved_flags = set(debug.debug_flags)
    self._preserved_debug_flags = saved_flags
    debug.debug_flags.clear()
    self.addCleanup(self._restore_debug_flags)
813
def _clear_hooks(self):
814
# prevent hooks affecting tests
816
import bzrlib.smart.server
817
self._preserved_hooks = {
818
bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
819
bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
821
self.addCleanup(self._restoreHooks)
822
# reset all hooks to an empty instance of the appropriate type
823
bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
824
bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
826
def _silenceUI(self):
827
"""Turn off UI for duration of test"""
828
# by default the UI is off; tests can turn it on if they want it.
829
saved = ui.ui_factory
831
ui.ui_factory = saved
832
ui.ui_factory = ui.SilentUIFactory()
833
self.addCleanup(_restore)
835
def _ndiff_strings(self, a, b):
836
"""Return ndiff between two strings containing lines.
838
A trailing newline is added if missing to make the strings
840
if b and b[-1] != '\n':
842
if a and a[-1] != '\n':
844
difflines = difflib.ndiff(a.splitlines(True),
846
linejunk=lambda x: False,
847
charjunk=lambda x: False)
848
return ''.join(difflines)
850
def assertEqual(self, a, b, message=''):
854
except UnicodeError, e:
855
# If we can't compare without getting a UnicodeError, then
856
# obviously they are different
857
mutter('UnicodeError: %s', e)
860
raise AssertionError("%snot equal:\na = %s\nb = %s\n"
862
pformat(a), pformat(b)))
864
assertEquals = assertEqual
866
def assertEqualDiff(self, a, b, message=None):
867
"""Assert two texts are equal, if not raise an exception.
869
This is intended for use with multi-line strings where it can
870
be hard to find the differences by eye.
872
# TODO: perhaps override assertEquals to call this for strings?
876
message = "texts not equal:\n"
877
raise AssertionError(message +
878
self._ndiff_strings(a, b))
880
def assertEqualMode(self, mode, mode_test):
    """Assert that two file modes are equal, reporting both in octal.

    :param mode: The first mode, as an integer.
    :param mode_test: The second mode, as an integer.
    """
    complaint = 'mode mismatch %o != %o' % (mode, mode_test)
    self.assertEqual(mode, mode_test, complaint)
884
def assertPositive(self, val):
    """Fail unless val is strictly greater than zero."""
    is_positive = val > 0
    complaint = 'expected a positive value, but got %s' % val
    self.assertTrue(is_positive, complaint)
888
def assertNegative(self, val):
    """Fail unless val is strictly less than zero."""
    is_negative = val < 0
    complaint = 'expected a negative value, but got %s' % val
    self.assertTrue(is_negative, complaint)
892
def assertStartsWith(self, s, prefix):
    """Assert that s starts with prefix.

    :raises AssertionError: if ``s.startswith(prefix)`` is false.
    """
    if s.startswith(prefix):
        return
    raise AssertionError('string %r does not start with %r' % (s, prefix))
896
def assertEndsWith(self, s, suffix):
    """Assert that s ends with suffix.

    :raises AssertionError: if ``s.endswith(suffix)`` is false.
    """
    if s.endswith(suffix):
        return
    raise AssertionError('string %r does not end with %r' % (s, suffix))
901
def assertContainsRe(self, haystack, needle_re):
    """Assert that haystack contains a match for a regular expression.

    :param haystack: The string to search.
    :param needle_re: The pattern that must be found with ``re.search``.
    :raises AssertionError: if the pattern is not found.
    """
    if re.search(needle_re, haystack):
        return
    if '\n' in haystack or len(haystack) > 60:
        # a long or multi-line string reads better in a quoted block
        template = 'pattern "%s" not found in\n"""\\\n%s"""\n'
    else:
        template = 'pattern "%s" not found in "%s"'
    raise AssertionError(template % (needle_re, haystack))
913
def assertNotContainsRe(self, haystack, needle_re):
    """Assert that haystack has no match for a regular expression.

    :param haystack: The string to search.
    :param needle_re: The pattern that must not be found by ``re.search``.
    :raises AssertionError: if the pattern is found.
    """
    if not re.search(needle_re, haystack):
        return
    raise AssertionError('pattern "%s" found in "%s"'
                         % (needle_re, haystack))
919
def assertSubset(self, sublist, superlist):
    """Assert that every entry in sublist is present in superlist.

    :param sublist: An iterable of hashable entries that must be present.
    :param superlist: The iterable that must contain them all.
    :raises AssertionError: naming the entries missing from superlist.
    """
    absent = set(sublist).difference(set(superlist))
    if not absent:
        return
    raise AssertionError("value(s) %r not present in container %r" %
                         (absent, superlist))
926
def assertListRaises(self, excClass, func, *args, **kwargs):
927
"""Fail unless excClass is raised when the iterator from func is used.
929
Many functions can return generators this makes sure
930
to wrap them in a list() call to make sure the whole generator
931
is run, and that the proper exception is raised.
934
list(func(*args, **kwargs))
938
if getattr(excClass,'__name__', None) is not None:
939
excName = excClass.__name__
941
excName = str(excClass)
942
raise self.failureException, "%s not raised" % excName
944
def assertRaises(self, excClass, callableObj, *args, **kwargs):
945
"""Assert that a callable raises a particular exception.
947
:param excClass: As for the except statement, this may be either an
948
exception class, or a tuple of classes.
949
:param callableObj: A callable, will be passed ``*args`` and
952
Returns the exception so that you can examine it.
955
callableObj(*args, **kwargs)
959
if getattr(excClass,'__name__', None) is not None:
960
excName = excClass.__name__
963
excName = str(excClass)
964
raise self.failureException, "%s not raised" % excName
966
def assertIs(self, left, right, message=None):
    """Assert that left and right are the same object.

    :param message: Optional text for the failure; when None a message
        showing both values with ``%r`` is used instead.
    :raises AssertionError: if ``left is right`` is false.
    """
    if left is right:
        return
    if message is None:
        message = "%r is not %r." % (left, right)
    raise AssertionError(message)
973
def assertIsNot(self, left, right, message=None):
975
if message is not None:
976
raise AssertionError(message)
978
raise AssertionError("%r is %r." % (left, right))
980
def assertTransportMode(self, transport, path, mode):
981
"""Fail if a path does not have mode mode.
983
If modes are not supported on this transport, the assertion is ignored.
985
if not transport._can_roundtrip_unix_modebits():
987
path_stat = transport.stat(path)
988
actual_mode = stat.S_IMODE(path_stat.st_mode)
989
self.assertEqual(mode, actual_mode,
990
'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
992
def assertIsSameRealPath(self, path1, path2):
    """Fail if path1 and path2 point to different files.

    Both paths are passed through ``osutils.realpath`` before they are
    compared; the paths as given are included in the failure message.
    """
    real1 = osutils.realpath(path1)
    real2 = osutils.realpath(path2)
    complaint = "apparent paths:\na = %s\nb = %s\n," % (path1, path2)
    self.assertEqual(real1, real2, complaint)
998
def assertIsInstance(self, obj, kls):
    """Fail if obj is not an instance of kls.

    :param obj: The object to check.
    :param kls: A class, or anything else ``isinstance`` accepts.
    """
    if isinstance(obj, kls):
        return
    actual = obj.__class__
    self.fail("%r is an instance of %s rather than %s" % (
        obj, actual, kls))
1004
def expectFailure(self, reason, assertion, *args, **kwargs):
1005
"""Invoke a test, expecting it to fail for the given reason.
1007
This is for assertions that ought to succeed, but currently fail.
1008
(The failure is *expected* but not *wanted*.) Please be very precise
1009
about the failure you're expecting. If a new bug is introduced,
1010
AssertionError should be raised, not KnownFailure.
1012
Frequently, expectFailure should be followed by an opposite assertion.
1015
Intended to be used with a callable that raises AssertionError as the
1016
'assertion' parameter. args and kwargs are passed to the 'assertion'.
1018
Raises KnownFailure if the test fails. Raises AssertionError if the
1023
self.expectFailure('Math is broken', self.assertNotEqual, 54,
1025
self.assertEqual(42, dynamic_val)
1027
This means that a dynamic_val of 54 will cause the test to raise
1028
a KnownFailure. Once math is fixed and the expectFailure is removed,
1029
only a dynamic_val of 42 will allow the test to pass. Anything other
1030
than 54 or 42 will cause an AssertionError.
1033
assertion(*args, **kwargs)
1034
except AssertionError:
1035
raise KnownFailure(reason)
1037
self.fail('Unexpected success. Should have failed: %s' % reason)
1039
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1040
"""A helper for callDeprecated and applyDeprecated.
1042
:param a_callable: A callable to call.
1043
:param args: The positional arguments for the callable
1044
:param kwargs: The keyword arguments for the callable
1045
:return: A tuple (warnings, result). result is the result of calling
1046
a_callable(``*args``, ``**kwargs``).
1049
def capture_warnings(msg, cls=None, stacklevel=None):
1050
# we've hooked into a deprecation specific callpath,
1051
# only deprecations should be getting sent via it.
1052
self.assertEqual(cls, DeprecationWarning)
1053
local_warnings.append(msg)
1054
original_warning_method = symbol_versioning.warn
1055
symbol_versioning.set_warning_method(capture_warnings)
1057
result = a_callable(*args, **kwargs)
1059
symbol_versioning.set_warning_method(original_warning_method)
1060
return (local_warnings, result)
1062
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
1063
"""Call a deprecated callable without warning the user.
1065
Note that this only captures warnings raised by symbol_versioning.warn,
1066
not other callers that go direct to the warning module.
1068
To test that a deprecated method raises an error, do something like
1071
self.assertRaises(errors.ReservedId,
1072
self.applyDeprecated, zero_ninetyone,
1073
br.append_revision, 'current:')
1075
:param deprecation_format: The deprecation format that the callable
1076
should have been deprecated with. This is the same type as the
1077
parameter to deprecated_method/deprecated_function. If the
1078
callable is not deprecated with this format, an assertion error
1080
:param a_callable: A callable to call. This may be a bound method or
1081
a regular function. It will be called with ``*args`` and
1083
:param args: The positional arguments for the callable
1084
:param kwargs: The keyword arguments for the callable
1085
:return: The result of a_callable(``*args``, ``**kwargs``)
1087
call_warnings, result = self._capture_deprecation_warnings(a_callable,
1089
expected_first_warning = symbol_versioning.deprecation_string(
1090
a_callable, deprecation_format)
1091
if len(call_warnings) == 0:
1092
self.fail("No deprecation warning generated by call to %s" %
1094
self.assertEqual(expected_first_warning, call_warnings[0])
1097
def callCatchWarnings(self, fn, *args, **kw):
    """Call a callable that raises python warnings.

    The caller's responsible for examining the returned warnings.

    If the callable raises an exception, the exception is not
    caught and propagates up to the caller.  In that case, the list
    of warnings is not available.

    :returns: ([warning_object, ...], fn_result)
    """
    # XXX: This is not perfect, because it completely overrides the
    # warnings filters, and some code may depend on suppressing particular
    # warnings.  It's the easiest way to insulate ourselves from -Werror,
    # though.  -- Andrew, 20071062
    wlist = []

    def _catcher(message, category, filename, lineno, file=None, line=None):
        # despite the name, 'message' is normally(?) a Warning subclass
        # instance.  'line' is accepted because newer Pythons pass it to
        # showwarning; without it the hook would raise TypeError.
        wlist.append(message)

    saved_showwarning = warnings.showwarning
    saved_filters = warnings.filters
    try:
        warnings.showwarning = _catcher
        warnings.filters = []
        result = fn(*args, **kw)
    finally:
        # Always put the global warning machinery back, even if fn raised.
        warnings.showwarning = saved_showwarning
        warnings.filters = saved_filters
    return wlist, result
1128
def callDeprecated(self, expected, callable, *args, **kwargs):
    """Assert that a callable is deprecated in a particular way.

    This is a very precise test for unusual requirements. The
    applyDeprecated helper function is probably more suited for most tests
    as it allows you to simply specify the deprecation format being used
    and will ensure that that is issued for the function being called.

    Note that this only captures warnings raised by symbol_versioning.warn,
    not other callers that go direct to the warning module.  To catch
    general warnings, use callCatchWarnings.

    :param expected: a list of the deprecation warnings expected, in order
    :param callable: The callable to call
    :param args: The positional arguments for the callable
    :param kwargs: The keyword arguments for the callable
    :return: The result of callable(``*args``, ``**kwargs``)
    """
    call_warnings, result = self._capture_deprecation_warnings(
        callable, *args, **kwargs)
    # The whole list must match, including order and count.
    self.assertEqual(expected, call_warnings)
    return result
1150
def _startLogFile(self):
    """Send bzr and test log messages to a temporary file.

    The file is removed as the test is torn down.
    """
    fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
    # Wrap the descriptor returned by mkstemp so it is not leaked.
    self._log_file = os.fdopen(fileno, 'w+')
    self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
    self._log_file_name = name
    self.addCleanup(self._finishLogFile)
1161
def _finishLogFile(self):
    """Finished with the log file.

    Close the file and delete it, unless setKeepLogfile was called.
    """
    if self._log_file is None:
        # Already finished (or never started): nothing to do.
        return
    bzrlib.trace.disable_test_log(self._log_nonce)
    self._log_file.close()
    self._log_file = None
    if not self._keep_log_file:
        os.remove(self._log_file_name)
        self._log_file_name = None
1175
def setKeepLogfile(self):
    """Make the logfile not be deleted when _finishLogFile is called."""
    self._keep_log_file = True
1179
def addCleanup(self, callable):
    """Arrange to run a callable when this case is torn down.

    Callables are run in the reverse of the order they are registered,
    ie last-in first-out.

    :param callable: The zero-argument callable to register.
    :raises ValueError: If the same callable is already registered.
    """
    if callable in self._cleanups:
        raise ValueError("cleanup function %r already registered on %s"
                % (callable, self))
    self._cleanups.append(callable)
1190
def _cleanEnvironment(self):
    """Reset environment variables that could influence the tests.

    Every variable listed is set (or unset when the value is None) via
    _captureVar, and _restoreEnvironment is registered as a cleanup to
    put the previous values back.
    """
    # NOTE(review): the extracted text had dropped several lines of this
    # dict (BZR_EMAIL, EMAIL, the http/no/all/ftp proxy entries) and the
    # __old_env initialisation; they are restored here -- confirm against
    # upstream.
    new_env = {
        'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
        'HOME': os.getcwd(),
        'APPDATA': None,  # bzr now use Win32 API and don't rely on APPDATA
        'BZR_EDITOR': None, # test_msgeditor manipulates this variable
        'BZR_EMAIL': None,
        'BZREMAIL': None, # may still be present in the environment
        'EMAIL': None,
        'BZR_PROGRESS_BAR': None,
        # SSH Agent
        'SSH_AUTH_SOCK': None,
        # Proxies
        'http_proxy': None,
        'HTTP_PROXY': None,
        'https_proxy': None,
        'HTTPS_PROXY': None,
        'no_proxy': None,
        'NO_PROXY': None,
        'all_proxy': None,
        'ALL_PROXY': None,
        # Nobody cares about these ones AFAIK. So far at
        # least. If you do (care), please update this comment
        # -- vila 20061212
        'ftp_proxy': None,
        'FTP_PROXY': None,
        'BZR_REMOTE_PATH': None,
    }
    self.__old_env = {}
    # Register the restore before changing anything so a failure half-way
    # through still gets undone.
    self.addCleanup(self._restoreEnvironment)
    for name, value in new_env.iteritems():
        self._captureVar(name, value)
1223
def _captureVar(self, name, newvalue):
    """Set an environment variable, and reset it when finished."""
    # set_or_unset_env returns the previous value, which is remembered so
    # _restoreEnvironment can put it back.
    self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1227
def _restore_debug_flags(self):
    """Reset debug.debug_flags to the flags saved in _preserved_debug_flags."""
    # Mutate the existing set in place rather than rebinding it.
    debug.debug_flags.clear()
    debug.debug_flags.update(self._preserved_debug_flags)
1231
def _restoreEnvironment(self):
    """Put back every environment variable recorded by _captureVar."""
    for name, value in self.__old_env.iteritems():
        osutils.set_or_unset_env(name, value)
1235
def _restoreHooks(self):
    """Reinstall the ``hooks`` attribute saved for each class.

    self._preserved_hooks maps a class to the hooks object to restore.
    """
    for klass, hooks in self._preserved_hooks.items():
        setattr(klass, 'hooks', hooks)
1239
def knownFailure(self, reason):
    """This test has failed for some known reason.

    :param reason: Passed to the KnownFailure exception.
    :raises KnownFailure: Always.
    """
    raise KnownFailure(reason)
1243
def run(self, result=None):
    """Run the test, reporting it as unsupported if a feature is missing.

    Each feature in the optional ``_test_needs_features`` attribute is
    checked with ``available()``.  On the first unavailable feature the
    test is not executed: it is reported through
    ``result.addNotSupported`` when the result object has that method,
    otherwise as a success.

    :param result: The test result object; when None,
        ``self.defaultTestResult()`` is used.
    :return: None when skipped for a missing feature, otherwise whatever
        ``unittest.TestCase.run`` returns.
    """
    if result is None:
        result = self.defaultTestResult()
    for feature in getattr(self, '_test_needs_features', []):
        if not feature.available():
            result.startTest(self)
            if getattr(result, 'addNotSupported', None):
                result.addNotSupported(self, feature)
            else:
                # Plain unittest results do not know about unsupported
                # tests, so count it as a pass.
                result.addSuccess(self)
            result.stopTest(self)
            return
    return unittest.TestCase.run(self, result)
1258
def tearDown(self):
    """Run the registered cleanups, then the base class tearDown."""
    # NOTE(review): the 'def' line and the _runCleanups() call were missing
    # from the extracted text and are restored here -- confirm against
    # upstream.
    self._runCleanups()
    unittest.TestCase.tearDown(self)
1260
def time(self, callable, *args, **kwargs):
    """Run callable and accrue the time it takes to the benchmark time.

    If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
    this will cause lsprofile statistics to be gathered and stored in
    self._benchcalls.

    :return: The result of callable(``*args``, ``**kwargs``)
    """
    if self._benchtime is None:
        self._benchtime = 0
    start = time.time()
    try:
        if not self._gather_lsprof_in_benchmarks:
            return callable(*args, **kwargs)
        else:
            # record this benchmark
            ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
            stats.sort()
            self._benchcalls.append(((callable, args, kwargs), stats))
            return ret
    finally:
        # Accrue the elapsed time even when the callable raises.
        self._benchtime += time.time() - start
1282
def _runCleanups(self):
    """Run registered cleanup functions.

    This should only be called from TestCase.tearDown.
    """
    # TODO: Perhaps this should keep running cleanups even if
    # one of them fails?

    # Actually pop the cleanups from the list so tearDown running
    # twice is safe (this happens for skipped tests).
    while self._cleanups:
        cleanup = self._cleanups.pop()
        cleanup()
1295
def log(self, *args):
    """Write a message to the bzr trace log.

    :param args: Passed straight through to mutter().
    """
    # NOTE(review): the body was missing from the extracted text; restored
    # as a pass-through to mutter -- confirm against upstream.
    mutter(*args)
1298
def _get_log(self, keep_log_file=False):
1299
"""Get the log from bzrlib.trace calls from this test.
1301
:param keep_log_file: When True, if the log is still a file on disk
1302
leave it as a file on disk. When False, if the log is still a file
1303
on disk, the log file is deleted and the log preserved as
1305
:return: A string containing the log.
1307
# flush the log file, to get all content
1309
bzrlib.trace._trace_file.flush()
1310
if self._log_contents:
1311
return self._log_contents
1312
if self._log_file_name is not None:
1313
logfile = open(self._log_file_name)
1315
log_contents = logfile.read()
1318
if not keep_log_file:
1319
self._log_contents = log_contents
1321
os.remove(self._log_file_name)
1323
if sys.platform == 'win32' and e.errno == errno.EACCES:
1324
sys.stderr.write(('Unable to delete log file '
1325
' %r\n' % self._log_file_name))
1330
return "DELETED log file to reduce memory footprint"
1332
def requireFeature(self, feature):
    """This test requires a specific feature is available.

    :param feature: An object whose ``available()`` method is consulted.
    :raises UnavailableFeature: When feature is not available.
    """
    if not feature.available():
        raise UnavailableFeature(feature)
1340
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
        working_dir):
    """Run bazaar command line, splitting up a string command line.

    A string is split with shlex; any other value is passed through
    unchanged.  The remaining arguments are forwarded to _run_bzr_core,
    whose result is returned.
    """
    if isinstance(args, basestring):
        # shlex don't understand unicode strings,
        # so args should be plain string (bialix 20070906)
        args = list(shlex.split(str(args)))
    return self._run_bzr_core(args, retcode=retcode,
            encoding=encoding, stdin=stdin, working_dir=working_dir,
            )
1351
def _run_bzr_core(self, args, retcode, encoding, stdin,
        working_dir):
    """Run a bzr command in-process, capturing its stdout and stderr.

    :param args: Command-line arguments, as a list.
    :param retcode: Expected return code, or None to skip the check.
    :param encoding: Encoding set on the captured streams; when None,
        bzrlib.user_encoding is used.
    :param stdin: Passed to the TestUIFactory used for the run.
    :param working_dir: Directory to chdir into for the run, or None.
    :return: (out, err) captured from the command.
    """
    if encoding is None:
        encoding = bzrlib.user_encoding
    stdout = StringIOWrapper()
    stderr = StringIOWrapper()
    stdout.encoding = encoding
    stderr.encoding = encoding

    self.log('run bzr: %r', args)
    # FIXME: don't call into logging here
    handler = logging.StreamHandler(stderr)
    handler.setLevel(logging.INFO)
    logger = logging.getLogger('')
    logger.addHandler(handler)
    old_ui_factory = ui.ui_factory
    ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)

    cwd = None
    try:
        # chdir inside the try so that a bad working_dir still removes the
        # log handler and restores the UI factory.
        if working_dir is not None:
            cwd = osutils.getcwd()
            os.chdir(working_dir)
        result = self.apply_redirected(ui.ui_factory.stdin,
            stdout, stderr,
            bzrlib.commands.run_bzr_catch_user_errors,
            args)
    finally:
        logger.removeHandler(handler)
        ui.ui_factory = old_ui_factory
        if cwd is not None:
            os.chdir(cwd)

    out = stdout.getvalue()
    err = stderr.getvalue()
    if out:
        self.log('output:\n%r', out)
    if err:
        self.log('errors:\n%r', err)
    if retcode is not None:
        self.assertEquals(retcode, result,
                          message='Unexpected return code')
    return out, err
1396
def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
            working_dir=None, error_regexes=[], output_encoding=None):
    """Invoke bzr, as if it were run from the command line.

    The argument list should not include the bzr program name - the
    first argument is normally the bzr command.  Arguments may be
    passed in two ways:

    1- A list of strings, eg ["commit", "a"].  This is recommended
    when the command contains whitespace or metacharacters, or
    is built up at run time.

    2- A single string, eg "add a".  This is the most convenient
    for hardcoded commands.

    This runs bzr through the interface that catches and reports
    errors, and with logging set to something approximating the
    default, so that error reporting can be checked.

    This should be the main method for tests that want to exercise the
    overall behavior of the bzr application (rather than a unit test
    or a functional test of the library.)

    This sends the stdout/stderr results into the test's log,
    where it may be useful for debugging.  See also run_captured.

    :keyword stdin: A string to be used as stdin for the command.
    :keyword retcode: The status code the command should return;
        default 0.
    :keyword working_dir: The directory to run the command in
    :keyword error_regexes: A list of expected error messages.  If
        specified they must be seen in the error output of the command.
    :keyword output_encoding: Accepted but not used by this method.
    :return: (out, err) from the command.
    """
    # error_regexes is only iterated, never mutated, so the shared list
    # default is harmless here.
    out, err = self._run_bzr_autosplit(
        args=args,
        retcode=retcode,
        encoding=encoding,
        stdin=stdin,
        working_dir=working_dir,
        )
    for regex in error_regexes:
        self.assertContainsRe(err, regex)
    return out, err
1440
def run_bzr_error(self, error_regexes, *args, **kwargs):
    """Run bzr, and check that stderr contains the supplied regexes

    :param error_regexes: Sequence of regular expressions which
        must each be found in the error output. The relative ordering
        is not enforced.
    :param args: command-line arguments for bzr
    :param kwargs: Keyword arguments which are interpreted by run_bzr
        This function changes the default value of retcode to be 3,
        since in most cases this is run when you expect bzr to fail.

    :return: (out, err) The actual output of running the command (in case
        you want to do more inspection)

    Examples of use::

        # Make sure that commit is failing because there is nothing to do
        self.run_bzr_error(['no changes to commit'],
                           ['commit', '-m', 'my commit comment'])
        # Make sure --strict is handling an unknown file, rather than
        # giving us the 'nothing to do' error
        self.build_tree(['unknown'])
        self.run_bzr_error(['Commit refused because there are unknown files'],
                           ['commit', '--strict', '-m', 'my commit comment'])
    """
    # Only supply the failing retcode when the caller did not pick one.
    kwargs.setdefault('retcode', 3)
    kwargs['error_regexes'] = error_regexes
    out, err = self.run_bzr(*args, **kwargs)
    return out, err
1470
def run_bzr_subprocess(self, *args, **kwargs):
    """Run bzr in a subprocess for testing.

    This starts a new Python interpreter and runs bzr in there.
    This should only be used for tests that have a justifiable need for
    this isolation: e.g. they are testing startup time, or signal
    handling, or early startup code, etc.  Subprocess code can't be
    profiled or debugged so easily.

    :keyword retcode: The status code that is expected.  Defaults to 0.  If
        None is supplied, the status code is not checked.
    :keyword env_changes: A dictionary which lists changes to environment
        variables. A value of None will unset the env variable.
        The values must be strings. The change will only occur in the
        child, so you don't need to fix the environment after running.
    :keyword universal_newlines: Convert CRLF => LF
    :keyword allow_plugins: By default the subprocess is run with
        --no-plugins to ensure test reproducibility. Also, it is possible
        for system-wide plugins to create unexpected output on stderr,
        which can cause unnecessary test failures.
    :return: Whatever finish_bzr_subprocess returns.
    """
    env_changes = kwargs.get('env_changes', {})
    working_dir = kwargs.get('working_dir', None)
    allow_plugins = kwargs.get('allow_plugins', False)
    if len(args) == 1:
        # A single positional argument is either a ready-made argument
        # list or a command string to split.
        if isinstance(args[0], list):
            args = args[0]
        elif isinstance(args[0], basestring):
            args = list(shlex.split(args[0]))
    else:
        symbol_versioning.warn(zero_ninetyone %
                               "passing varargs to run_bzr_subprocess",
                               DeprecationWarning, stacklevel=3)
    process = self.start_bzr_subprocess(args, env_changes=env_changes,
                                        working_dir=working_dir,
                                        allow_plugins=allow_plugins)
    # We distinguish between retcode=None and retcode not passed.
    supplied_retcode = kwargs.get('retcode', 0)
    return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
        universal_newlines=kwargs.get('universal_newlines', False),
        process_args=args)
1512
def start_bzr_subprocess(self, process_args, env_changes=None,
                         skip_if_plan_to_signal=False,
                         working_dir=None,
                         allow_plugins=False):
    """Start bzr in a subprocess for testing.

    This starts a new Python interpreter and runs bzr in there.
    This should only be used for tests that have a justifiable need for
    this isolation: e.g. they are testing startup time, or signal
    handling, or early startup code, etc.  Subprocess code can't be
    profiled or debugged so easily.

    :param process_args: a list of arguments to pass to the bzr executable,
        for example ``['--version']``.
    :param env_changes: A dictionary which lists changes to environment
        variables. A value of None will unset the env variable.
        The values must be strings. The change will only occur in the
        child, so you don't need to fix the environment after running.
    :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
        is not available.
    :param working_dir: Directory to start the process in, or None.
    :param allow_plugins: If False (default) pass --no-plugins to bzr.

    :returns: Popen object for the started process.
    """
    if skip_if_plan_to_signal:
        if not getattr(os, 'kill', None):
            raise TestSkipped("os.kill not available.")

    if env_changes is None:
        env_changes = {}
    old_env = {}

    def cleanup_environment():
        for env_var, value in env_changes.iteritems():
            old_env[env_var] = osutils.set_or_unset_env(env_var, value)

    def restore_environment():
        for env_var, value in old_env.iteritems():
            osutils.set_or_unset_env(env_var, value)

    bzr_path = self.get_bzr_path()

    cwd = None
    if working_dir is not None:
        cwd = osutils.getcwd()
        os.chdir(working_dir)

    try:
        # win32 subprocess doesn't support preexec_fn
        # so we will avoid using it on all platforms, just to
        # make sure the code path is used, and we don't break on win32
        cleanup_environment()
        command = [sys.executable, bzr_path]
        if not allow_plugins:
            command.append('--no-plugins')
        command.extend(process_args)
        process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
    finally:
        # The child has inherited the modified environment and directory
        # by now; put the parent's back.
        restore_environment()
        if cwd is not None:
            os.chdir(cwd)

    return process
1576
def _popen(self, *args, **kwargs):
    """Place a call to Popen.

    Allows tests to override this method to intercept the calls made to
    Popen for introspection.

    :return: The Popen object created from the given arguments.
    """
    return Popen(*args, **kwargs)
1584
def get_bzr_path(self):
    """Return the path of the 'bzr' executable for this test suite."""
    # Look for a 'bzr' script next to the bzrlib package first.
    bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
    if not os.path.isfile(bzr_path):
        # We are probably installed. Assume sys.argv is the right file
        bzr_path = sys.argv[0]
    return bzr_path
1592
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
                          universal_newlines=False, process_args=None):
    """Finish the execution of process.

    :param process: the Popen object returned from start_bzr_subprocess.
    :param retcode: The status code that is expected.  Defaults to 0.  If
        None is supplied, the status code is not checked.
    :param send_signal: an optional signal to send to the process.
    :param universal_newlines: Convert CRLF => LF
    :param process_args: The arguments the process was started with; only
        used in the messages produced on a retcode mismatch.
    :returns: (stdout, stderr)
    """
    if send_signal is not None:
        os.kill(process.pid, send_signal)
    out, err = process.communicate()

    if universal_newlines:
        out = out.replace('\r\n', '\n')
        err = err.replace('\r\n', '\n')

    if retcode is not None and retcode != process.returncode:
        if process_args is None:
            process_args = "(unknown args)"
        mutter('Output of bzr %s:\n%s', process_args, out)
        mutter('Error for bzr %s:\n%s', process_args, err)
        self.fail('Command bzr %s failed with retcode %s != %s'
                  % (process_args, retcode, process.returncode))
    # NOTE(review): the return statement was missing from the extracted
    # text; restored as a two-item list -- confirm against upstream.
    return [out, err]
1620
def check_inventory_shape(self, inv, shape):
    """Compare an inventory to a list of expected names.

    Fail if they are not precisely equal.

    :param inv: An object whose ``entries()`` yields (path, entry) pairs.
    :param shape: Iterable of expected paths; directories end with '/'.
    """
    extras = []
    shape = list(shape)             # copy
    for path, ie in inv.entries():
        # Normalise to forward slashes, with a trailing one on directories.
        name = path.replace('\\', '/')
        if ie.kind == 'directory':
            name = name + '/'
        if name in shape:
            shape.remove(name)
        else:
            extras.append(name)
    if shape:
        self.fail("expected paths not found in inventory: %r" % shape)
    if extras:
        self.fail("unexpected paths found in inventory: %r" % extras)
1640
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
                     a_callable=None, *args, **kwargs):
    """Call callable with redirected std io pipes.

    sys.stdin, sys.stdout and sys.stderr are replaced for the duration of
    the call and always restored afterwards.

    :param stdin: Stream to use as sys.stdin; an empty StringIO when None.
    :param stdout: Stream to use as sys.stdout; when None, the test log
        file if there is one, else a fresh StringIO.
    :param stderr: As stdout, for sys.stderr.
    :param a_callable: The callable to run with ``*args``/``**kwargs``.
    :raises ValueError: If a_callable is not callable.

    Returns the return code."""
    if not callable(a_callable):
        raise ValueError("a_callable must be callable.")
    if stdin is None:
        stdin = StringIO("")
    if stdout is None:
        if getattr(self, "_log_file", None) is not None:
            stdout = self._log_file
        else:
            stdout = StringIO()
    if stderr is None:
        # The closing parenthesis used to sit after 'is not None', which
        # made True the getattr default and raised AttributeError when
        # the test had no _log_file attribute.
        if getattr(self, "_log_file", None) is not None:
            stderr = self._log_file
        else:
            stderr = StringIO()
    real_stdin = sys.stdin
    real_stdout = sys.stdout
    real_stderr = sys.stderr
    try:
        sys.stdout = stdout
        sys.stderr = stderr
        sys.stdin = stdin
        return a_callable(*args, **kwargs)
    finally:
        sys.stdout = real_stdout
        sys.stderr = real_stderr
        sys.stdin = real_stdin
1672
def reduceLockdirTimeout(self):
    """Reduce the default lock timeout for the duration of the test, so that
    if LockContention occurs during a test, it does so quickly.

    Tests that expect to provoke LockContention errors should call this.
    """
    orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
    def resetTimeout():
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
    # Register the restore first, then shorten the module-level timeout.
    self.addCleanup(resetTimeout)
    bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
1684
def make_utf8_encoded_stringio(self, encoding_type=None):
    """Return a StringIOWrapper instance, that will encode Unicode
    input to UTF-8.

    :param encoding_type: The codec error handling scheme; 'strict' when
        None.
    :return: A codecs writer around a new StringIO, with an ``encoding``
        attribute of 'utf-8'.
    """
    if encoding_type is None:
        encoding_type = 'strict'
    sio = StringIO()
    output_encoding = 'utf-8'
    sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
    sio.encoding = output_encoding
    return sio
1697
class TestCaseWithMemoryTransport(TestCase):
1698
"""Common test class for tests that do not need disk resources.
1700
Tests that need disk resources should derive from TestCaseWithTransport.
1702
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1704
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1705
a directory which does not exist. This serves to help ensure test isolation
1706
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1707
must exist. However, TestCaseWithMemoryTransport does not offer local
1708
file defaults for the transport in tests, nor does it obey the command line
1709
override, so tests that accidentally write to the common directory should
1712
:cvar TEST_ROOT: Directory containing all temporary directories, plus
1713
a .bzr directory that stops us ascending higher into the filesystem.
1719
def __init__(self, methodName='runTest'):
1720
# allow test parameterisation after test construction and before test
1721
# execution. Variables that the parameteriser sets need to be
1722
# ones that are not set by setUp, or setUp will trash them.
1723
super(TestCaseWithMemoryTransport, self).__init__(methodName)
1724
self.vfs_transport_factory = default_transport
1725
self.transport_server = None
1726
self.transport_readonly_server = None
1727
self.__vfs_server = None
1729
def get_transport(self, relpath=None):
1730
"""Return a writeable transport.
1732
This transport is for the test scratch space relative to
1735
:param relpath: a path relative to the base url.
1737
t = get_transport(self.get_url(relpath))
1738
self.assertFalse(t.is_readonly())
1741
def get_readonly_transport(self, relpath=None):
1742
"""Return a readonly transport for the test scratch space
1744
This can be used to test that operations which should only need
1745
readonly access in fact do not try to write.
1747
:param relpath: a path relative to the base url.
1749
t = get_transport(self.get_readonly_url(relpath))
1750
self.assertTrue(t.is_readonly())
1753
def create_transport_readonly_server(self):
1754
"""Create a transport server from class defined at init.
1756
This is mostly a hook for daughter classes.
1758
return self.transport_readonly_server()
1760
def get_readonly_server(self):
1761
"""Get the server instance for the readonly transport
1763
This is useful for some tests with specific servers to do diagnostics.
1765
if self.__readonly_server is None:
1766
if self.transport_readonly_server is None:
1767
# readonly decorator requested
1768
# bring up the server
1769
self.__readonly_server = ReadonlyServer()
1770
self.__readonly_server.setUp(self.get_vfs_only_server())
1772
self.__readonly_server = self.create_transport_readonly_server()
1773
self.__readonly_server.setUp(self.get_vfs_only_server())
1774
self.addCleanup(self.__readonly_server.tearDown)
1775
return self.__readonly_server
1777
def get_readonly_url(self, relpath=None):
1778
"""Get a URL for the readonly transport.
1780
This will either be backed by '.' or a decorator to the transport
1781
used by self.get_url()
1782
relpath provides for clients to get a path relative to the base url.
1783
These should only be downwards relative, not upwards.
1785
base = self.get_readonly_server().get_url()
1786
return self._adjust_url(base, relpath)
1788
def get_vfs_only_server(self):
1789
"""Get the vfs only read/write server instance.
1791
This is useful for some tests with specific servers that need
1794
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
1795
is no means to override it.
1797
if self.__vfs_server is None:
1798
self.__vfs_server = MemoryServer()
1799
self.__vfs_server.setUp()
1800
self.addCleanup(self.__vfs_server.tearDown)
1801
return self.__vfs_server
1803
def get_server(self):
1804
"""Get the read/write server instance.
1806
This is useful for some tests with specific servers that need
1809
This is built from the self.transport_server factory. If that is None,
1810
then the self.get_vfs_server is returned.
1812
if self.__server is None:
1813
if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
1814
return self.get_vfs_only_server()
1816
# bring up a decorated means of access to the vfs only server.
1817
self.__server = self.transport_server()
1819
self.__server.setUp(self.get_vfs_only_server())
1820
except TypeError, e:
1821
# This should never happen; the try:Except here is to assist
1822
# developers having to update code rather than seeing an
1823
# uninformative TypeError.
1824
raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
1825
self.addCleanup(self.__server.tearDown)
1826
return self.__server
1828
def _adjust_url(self, base, relpath):
1829
"""Get a URL (or maybe a path) for the readwrite transport.
1831
This will either be backed by '.' or to an equivalent non-file based
1833
relpath provides for clients to get a path relative to the base url.
1834
These should only be downwards relative, not upwards.
1836
if relpath is not None and relpath != '.':
1837
if not base.endswith('/'):
1839
# XXX: Really base should be a url; we did after all call
1840
# get_url()! But sometimes it's just a path (from
1841
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1842
# to a non-escaped local path.
1843
if base.startswith('./') or base.startswith('/'):
1846
base += urlutils.escape(relpath)
1849
def get_url(self, relpath=None):
1850
"""Get a URL (or maybe a path) for the readwrite transport.
1852
This will either be backed by '.' or to an equivalent non-file based
1854
relpath provides for clients to get a path relative to the base url.
1855
These should only be downwards relative, not upwards.
1857
base = self.get_server().get_url()
1858
return self._adjust_url(base, relpath)
1860
def get_vfs_only_url(self, relpath=None):
1861
"""Get a URL (or maybe a path for the plain old vfs transport.
1863
This will never be a smart protocol. It always has all the
1864
capabilities of the local filesystem, but it might actually be a
1865
MemoryTransport or some other similar virtual filesystem.
1867
This is the backing transport (if any) of the server returned by
1868
get_url and get_readonly_url.
1870
:param relpath: provides for clients to get a path relative to the base
1871
url. These should only be downwards relative, not upwards.
1874
base = self.get_vfs_only_server().get_url()
1875
return self._adjust_url(base, relpath)
1877
def _create_safety_net(self):
1878
"""Make a fake bzr directory.
1880
This prevents any tests propagating up onto the TEST_ROOT directory's
1883
root = TestCaseWithMemoryTransport.TEST_ROOT
1884
bzrdir.BzrDir.create_standalone_workingtree(root)
1886
def _check_safety_net(self):
1887
"""Check that the safety .bzr directory have not been touched.
1889
_make_test_root have created a .bzr directory to prevent tests from
1890
propagating. This method ensures than a test did not leaked.
1892
root = TestCaseWithMemoryTransport.TEST_ROOT
1893
wt = workingtree.WorkingTree.open(root)
1894
last_rev = wt.last_revision()
1895
if last_rev != 'null:':
1896
# The current test have modified the /bzr directory, we need to
1897
# recreate a new one or all the followng tests will fail.
1898
# If you need to inspect its content uncomment the following line
1899
# import pdb; pdb.set_trace()
1900
_rmtree_temp_dir(root + '/.bzr')
1901
self._create_safety_net()
1902
raise AssertionError('%s/.bzr should not be modified' % root)
1904
def _make_test_root(self):
1905
if TestCaseWithMemoryTransport.TEST_ROOT is None:
1906
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
1907
TestCaseWithMemoryTransport.TEST_ROOT = root
1909
self._create_safety_net()
1911
# The same directory is used by all tests, and we're not
1912
# specifically told when all tests are finished. This will do.
1913
atexit.register(_rmtree_temp_dir, root)
1915
self.addCleanup(self._check_safety_net)
1917
def makeAndChdirToTestDir(self):
1918
"""Create a temporary directories for this one test.
1920
This must set self.test_home_dir and self.test_dir and chdir to
1923
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1925
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1926
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1927
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1929
def make_branch(self, relpath, format=None):
1930
"""Create a branch on the transport at relpath."""
1931
repo = self.make_repository(relpath, format=format)
1932
return repo.bzrdir.create_branch()
1934
def make_bzrdir(self, relpath, format=None):
1936
# might be a relative or absolute path
1937
maybe_a_url = self.get_url(relpath)
1938
segments = maybe_a_url.rsplit('/', 1)
1939
t = get_transport(maybe_a_url)
1940
if len(segments) > 1 and segments[-1] not in ('', '.'):
1944
if isinstance(format, basestring):
1945
format = bzrdir.format_registry.make_bzrdir(format)
1946
return format.initialize_on_transport(t)
1947
except errors.UninitializableFormat:
1948
raise TestSkipped("Format %s is not initializable." % format)
1950
def make_repository(self, relpath, shared=False, format=None):
1951
"""Create a repository on our default transport at relpath.
1953
Note that relpath must be a relative path, not a full url.
1955
# FIXME: If you create a remoterepository this returns the underlying
1956
# real format, which is incorrect. Actually we should make sure that
1957
# RemoteBzrDir returns a RemoteRepository.
1958
# maybe mbp 20070410
1959
made_control = self.make_bzrdir(relpath, format=format)
1960
return made_control.create_repository(shared=shared)
1962
def make_branch_and_memory_tree(self, relpath, format=None):
1963
"""Create a branch on the default transport and a MemoryTree for it."""
1964
b = self.make_branch(relpath, format=format)
1965
return memorytree.MemoryTree.create_on_branch(b)
1967
def overrideEnvironmentForTesting(self):
1968
os.environ['HOME'] = self.test_home_dir
1969
os.environ['BZR_HOME'] = self.test_home_dir
1972
super(TestCaseWithMemoryTransport, self).setUp()
1973
self._make_test_root()
1974
_currentdir = os.getcwdu()
1975
def _leaveDirectory():
1976
os.chdir(_currentdir)
1977
self.addCleanup(_leaveDirectory)
1978
self.makeAndChdirToTestDir()
1979
self.overrideEnvironmentForTesting()
1980
self.__readonly_server = None
1981
self.__server = None
1982
self.reduceLockdirTimeout()
1985
class TestCaseInTempDir(TestCaseWithMemoryTransport):
1986
"""Derived class that runs a test within a temporary directory.
1988
This is useful for tests that need to create a branch, etc.
1990
The directory is created in a slightly complex way: for each
1991
Python invocation, a new temporary top-level directory is created.
1992
All test cases create their own directory within that. If the
1993
tests complete successfully, the directory is removed.
1995
:ivar test_base_dir: The path of the top-level directory for this
1996
test, which contains a home directory and a work directory.
1998
:ivar test_home_dir: An initially empty directory under test_base_dir
1999
which is used as $HOME for this test.
2001
:ivar test_dir: A directory under test_base_dir used as the current
2002
directory when the test proper is run.
2005
OVERRIDE_PYTHON = 'python'
2007
def check_file_contents(self, filename, expect):
2008
self.log("check contents of file %s" % filename)
2009
contents = file(filename, 'r').read()
2010
if contents != expect:
2011
self.log("expected: %r" % expect)
2012
self.log("actually: %r" % contents)
2013
self.fail("contents of %s not as expected" % filename)
2015
def makeAndChdirToTestDir(self):
2016
"""See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
2018
For TestCaseInTempDir we create a temporary directory based on the test
2019
name and then create two subdirs - test and home under it.
2021
# create a directory within the top level test directory
2022
candidate_dir = osutils.mkdtemp(dir=self.TEST_ROOT)
2023
# now create test and home directories within this dir
2024
self.test_base_dir = candidate_dir
2025
self.test_home_dir = self.test_base_dir + '/home'
2026
os.mkdir(self.test_home_dir)
2027
self.test_dir = self.test_base_dir + '/work'
2028
os.mkdir(self.test_dir)
2029
os.chdir(self.test_dir)
2030
# put name of test inside
2031
f = file(self.test_base_dir + '/name', 'w')
2036
self.addCleanup(self.deleteTestDir)
2038
def deleteTestDir(self):
2039
os.chdir(self.TEST_ROOT)
2040
_rmtree_temp_dir(self.test_base_dir)
2042
def build_tree(self, shape, line_endings='binary', transport=None):
2043
"""Build a test tree according to a pattern.
2045
shape is a sequence of file specifications. If the final
2046
character is '/', a directory is created.
2048
This assumes that all the elements in the tree being built are new.
2050
This doesn't add anything to a branch.
2052
:type shape: list or tuple.
2053
:param line_endings: Either 'binary' or 'native'
2054
in binary mode, exact contents are written in native mode, the
2055
line endings match the default platform endings.
2056
:param transport: A transport to write to, for building trees on VFS's.
2057
If the transport is readonly or None, "." is opened automatically.
2060
if type(shape) not in (list, tuple):
2061
raise AssertionError("Parameter 'shape' should be "
2062
"a list or a tuple. Got %r instead" % (shape,))
2063
# It's OK to just create them using forward slashes on windows.
2064
if transport is None or transport.is_readonly():
2065
transport = get_transport(".")
2067
self.assert_(isinstance(name, basestring))
2069
transport.mkdir(urlutils.escape(name[:-1]))
2071
if line_endings == 'binary':
2073
elif line_endings == 'native':
2076
raise errors.BzrError(
2077
'Invalid line ending request %r' % line_endings)
2078
content = "contents of %s%s" % (name.encode('utf-8'), end)
2079
transport.put_bytes_non_atomic(urlutils.escape(name), content)
2081
def build_tree_contents(self, shape):
    """Call the module-level build_tree_contents() with shape."""
    build_tree_contents(shape)
2084
def assertFileEqual(self, content, path):
    """Fail if path does not contain 'content'.

    The file is read in binary mode and compared with assertEqualDiff.
    """
    self.failUnlessExists(path)
    f = file(path, 'rb')
    try:
        s = f.read()
    finally:
        # always release the handle, even if the read fails
        f.close()
    self.assertEqualDiff(content, s)
2094
def failUnlessExists(self, path):
    """Fail unless path or paths, which may be abs or relative, exist.

    :param path: a single path string, or an iterable of paths, each of
        which is checked in turn.
    """
    if not isinstance(path, basestring):
        for p in path:
            self.failUnlessExists(p)
    else:
        self.failUnless(osutils.lexists(path),path+" does not exist")
2102
def failIfExists(self, path):
    """Fail if path or paths, which may be abs or relative, exist.

    :param path: a single path string, or an iterable of paths, each of
        which is checked in turn.
    """
    if not isinstance(path, basestring):
        for p in path:
            self.failIfExists(p)
    else:
        self.failIf(osutils.lexists(path),path+" exists")
2110
def assertInWorkingTree(self, path, root_path='.', tree=None):
    """Assert whether path or paths are in the WorkingTree

    :param path: a single path string, or an iterable of paths.
    :param root_path: location of the working tree to open when no tree
        is supplied.
    :param tree: an already opened tree to check against; when None the
        tree at root_path is opened.
    """
    if tree is None:
        tree = workingtree.WorkingTree.open(root_path)
    if not isinstance(path, basestring):
        # reuse the opened tree for every element
        for p in path:
            self.assertInWorkingTree(p,tree=tree)
    else:
        self.assertIsNot(tree.path2id(path), None,
            path+' not in working tree.')
2121
def assertNotInWorkingTree(self, path, root_path='.', tree=None):
    """Assert whether path or paths are not in the WorkingTree

    :param path: a single path string, or an iterable of paths.
    :param root_path: location of the working tree to open when no tree
        is supplied.
    :param tree: an already opened tree to check against; when None the
        tree at root_path is opened.
    """
    if tree is None:
        tree = workingtree.WorkingTree.open(root_path)
    if not isinstance(path, basestring):
        # reuse the opened tree for every element
        for p in path:
            self.assertNotInWorkingTree(p,tree=tree)
    else:
        self.assertIs(tree.path2id(path), None, path+' in working tree.')
2132
class TestCaseWithTransport(TestCaseInTempDir):
    """A test case that provides get_url and get_readonly_url facilities.

    These back onto two transport servers, one for readonly access and one for
    read write access.

    If no explicit class is provided for readonly access, a
    ReadonlyTransportDecorator is used instead which allows the use of non disk
    based read write transports.

    If an explicit class is provided for readonly access, that server and the
    readwrite one must both define get_url() as resolving to os.getcwd().
    """

    def get_vfs_only_server(self):
        """See TestCaseWithMemoryTransport.

        This is useful for some tests with specific servers that need
        diagnostics.

        The server is created from vfs_transport_factory on first use, set
        up, registered for tearDown as a cleanup, and then reused.
        """
        if self.__vfs_server is None:
            self.__vfs_server = self.vfs_transport_factory()
            self.__vfs_server.setUp()
            self.addCleanup(self.__vfs_server.tearDown)
        return self.__vfs_server

    def make_branch_and_tree(self, relpath, format=None):
        """Create a branch on the transport and a tree locally.

        If the transport is not a LocalTransport, the Tree can't be created on
        the transport.  In that case if the vfs_transport_factory is
        LocalURLServer the working tree is created in the local
        directory backing the transport, and the returned tree's branch and
        repository will also be accessed locally. Otherwise a lightweight
        checkout is created and returned.

        :param format: The BzrDirFormat.
        :returns: the WorkingTree.
        """
        # TODO: always use the local disk path for the working tree,
        # this obviously requires a format that supports branch references
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
        b = self.make_branch(relpath, format=format)
        try:
            return b.bzrdir.create_workingtree()
        except errors.NotLocalUrl:
            # We can only make working trees locally at the moment.  If the
            # transport can't support them, then we keep the non-disk-backed
            # branch and create a local checkout.
            if self.vfs_transport_factory is LocalURLServer:
                # the branch is colocated on disk, we cannot create a checkout.
                # hopefully callers will expect this.
                local_controldir = bzrdir.BzrDir.open(
                    self.get_vfs_only_url(relpath))
                return local_controldir.create_workingtree()
            else:
                return b.create_checkout(relpath, lightweight=True)

    def assertIsDirectory(self, relpath, transport):
        """Assert that relpath within transport is a directory.

        This may not be possible on all transports; in that case it propagates
        a TransportNotPossible.
        """
        try:
            mode = transport.stat(relpath).st_mode
        except errors.NoSuchFile:
            self.fail("path %s is not a directory; no such file"
                      % (relpath))
        if not stat.S_ISDIR(mode):
            self.fail("path %s is not a directory; has mode %#o"
                      % (relpath, mode))

    def assertTreesEqual(self, left, right):
        """Check that left and right have the same content and properties."""
        # we use a tree delta to check for equality of the content, and we
        # manually check for equality of other things such as the parents list.
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
        differences = left.changes_from(right)
        self.assertFalse(differences.has_changed(),
            "Trees %r and %r are different: %r" % (left, right, differences))

    def setUp(self):
        """Run the base setUp and start with no vfs server created."""
        super(TestCaseWithTransport, self).setUp()
        self.__vfs_server = None
2219
class ChrootedTestCase(TestCaseWithTransport):
    """A support class that provides readonly urls outside the local namespace.

    This is done by checking if self.vfs_transport_factory is MemoryServer.
    If it is then we are chrooted already, if it is not then an HttpServer
    is used for readonly urls.

    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
                       be used without needed to redo it when a different
                       subclass is in use ?
    """

    def setUp(self):
        """Run the base setUp, then pick HttpServer for readonly access
        unless the vfs transport factory is MemoryServer.
        """
        super(ChrootedTestCase, self).setUp()
        if not self.vfs_transport_factory == MemoryServer:
            self.transport_readonly_server = HttpServer
2237
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
                       random_order=False):
    """Create a test suite by filtering another one.

    This is sort_suite_by_re() with append_rest=False, so tests that do not
    match pattern are dropped rather than appended.

    :param suite:           the source suite
    :param pattern:         pattern that names must match
    :param exclude_pattern: pattern that names must not match, if any
    :param random_order:    if True, tests in the new suite will be put in
                            random order
    :returns: the newly created suite
    """
    return sort_suite_by_re(suite, pattern, exclude_pattern,
        random_order, False)
2252
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
                     random_order=False, append_rest=True):
    """Create a test suite by sorting another one.

    :param suite:           the source suite
    :param pattern:         pattern that names must match in order to go
                            first in the new suite
    :param exclude_pattern: pattern that names must not match, if any
    :param random_order:    if True, tests in the new suite will be put in
                            random order
    :param append_rest:     if False, pattern is a strict filter and not
                            just an ordering directive
    :returns: the newly created suite
    """
    first = []
    second = []
    filter_re = re.compile(pattern)
    if exclude_pattern is not None:
        exclude_re = re.compile(exclude_pattern)
    for test in iter_suite_tests(suite):
        test_id = test.id()
        # excluded tests are dropped entirely, whatever append_rest says
        if exclude_pattern is None or not exclude_re.search(test_id):
            if filter_re.search(test_id):
                first.append(test)
            elif append_rest:
                second.append(test)
    if random_order:
        # shuffle each group separately so matching tests still run first
        random.shuffle(first)
        random.shuffle(second)
    return TestUtil.TestSuite(first + second)
2284
def run_suite(suite, name='test', verbose=False, pattern=".*",
              stop_on_failure=False,
              transport=None, lsprof_timed=None, bench_history=None,
              matching_tests_first=None,
              list_only=False,
              random_seed=None,
              exclude_pattern=None,
              strict=False,
              coverage_dir=None,
              ):
    """Run suite with TextTestRunner and report whether it succeeded.

    :param suite: the test suite to run.
    :param name: accepted for callers; not referenced in this function.
    :param verbose: if true the runner gets verbosity 2, otherwise 1.
    :param pattern: regular expression selecting tests by id.
    :param stop_on_failure: stored on the runner as stop_on_failure.
    :param transport: accepted for callers; not referenced in this function.
    :param lsprof_timed: stored on TestCase._gather_lsprof_in_benchmarks.
    :param bench_history: passed through to the runner.
    :param matching_tests_first: if true, matching tests are sorted to the
        front (sort_suite_by_re); otherwise non-matching tests are removed
        (filter_suite_by_re).
    :param list_only: passed through to the runner.
    :param random_seed: if not None, test order is randomised.  "now" seeds
        from the current time; other values are converted to a long when
        possible.
    :param exclude_pattern: regular expression of test ids to leave out.
    :param strict: if true, return result.wasStrictlySuccessful() instead
        of result.wasSuccessful().
    :param coverage_dir: if not None, execution is traced and the coverage
        results are written into this directory.
    :return: the success value reported by the test result.
    """
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
    if verbose:
        verbosity = 2
    else:
        verbosity = 1
    runner = TextTestRunner(stream=sys.stdout,
                            descriptions=0,
                            verbosity=verbosity,
                            bench_history=bench_history,
                            list_only=list_only,
                            )
    runner.stop_on_failure = stop_on_failure
    # Initialise the random number generator and display the seed used.
    # We convert the seed to a long to make it reuseable across invocations.
    random_order = False
    if random_seed is not None:
        random_order = True
        if random_seed == "now":
            random_seed = long(time.time())
        else:
            # Convert the seed to a long if we can
            try:
                random_seed = long(random_seed)
            except (TypeError, ValueError):
                # a seed that is not numeric is handed to random.seed()
                # unchanged
                pass
        runner.stream.writeln("Randomizing test order using seed %s\n" %
            (random_seed))
        random.seed(random_seed)
    # Customise the list of tests if requested
    if pattern != '.*' or exclude_pattern is not None or random_order:
        if matching_tests_first:
            suite = sort_suite_by_re(suite, pattern, exclude_pattern,
                random_order)
        else:
            suite = filter_suite_by_re(suite, pattern, exclude_pattern,
                random_order)

    if coverage_dir is not None:
        tracer = trace.Trace(count=1, trace=0)
        sys.settrace(tracer.globaltrace)

    try:
        result = runner.run(suite)
    finally:
        # never leave the trace hook installed, even if the run blows up
        if coverage_dir is not None:
            sys.settrace(None)

    if coverage_dir is not None:
        results = tracer.results()
        results.write_results(show_missing=1, summary=False,
                              coverdir=coverage_dir)

    if strict:
        return result.wasStrictlySuccessful()

    return result.wasSuccessful()
2349
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
             transport=None,
             test_suite_factory=None,
             lsprof_timed=None,
             bench_history=None,
             matching_tests_first=None,
             list_only=False,
             random_seed=None,
             exclude_pattern=None,
             strict=False,
             coverage_dir=None,
             ):
    """Run the whole test suite under the enhanced runner

    :param transport: value installed as the module-level default_transport
        for the duration of the run; when None the current default is kept.
    :param test_suite_factory: callable returning the suite to run; when
        None, test_suite() is used.

    All other parameters are passed through to run_suite().  The previous
    default_transport is restored when the run finishes or raises.
    """
    # XXX: Very ugly way to do this...
    # Disable warning about old formats because we don't want it to disturb
    # any blackbox tests.
    from bzrlib import repository
    repository._deprecation_warning_done = True

    global default_transport
    if transport is None:
        transport = default_transport
    old_transport = default_transport
    default_transport = transport
    try:
        if test_suite_factory is None:
            suite = test_suite()
        else:
            suite = test_suite_factory()
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
                     stop_on_failure=stop_on_failure,
                     transport=transport,
                     lsprof_timed=lsprof_timed,
                     bench_history=bench_history,
                     matching_tests_first=matching_tests_first,
                     list_only=list_only,
                     random_seed=random_seed,
                     exclude_pattern=exclude_pattern,
                     strict=strict,
                     coverage_dir=coverage_dir)
    finally:
        default_transport = old_transport
2394
"""Build and return TestSuite for the whole of bzrlib.
2396
This function can be replaced if you need to change the default test
2397
suite on a global basis, but it is not encouraged.
2400
'bzrlib.util.tests.test_bencode',
2401
'bzrlib.tests.test__dirstate_helpers',
2402
'bzrlib.tests.test_ancestry',
2403
'bzrlib.tests.test_annotate',
2404
'bzrlib.tests.test_api',
2405
'bzrlib.tests.test_atomicfile',
2406
'bzrlib.tests.test_bad_files',
2407
'bzrlib.tests.test_bisect_multi',
2408
'bzrlib.tests.test_branch',
2409
'bzrlib.tests.test_branchbuilder',
2410
'bzrlib.tests.test_bugtracker',
2411
'bzrlib.tests.test_bundle',
2412
'bzrlib.tests.test_bzrdir',
2413
'bzrlib.tests.test_cache_utf8',
2414
'bzrlib.tests.test_commands',
2415
'bzrlib.tests.test_commit',
2416
'bzrlib.tests.test_commit_merge',
2417
'bzrlib.tests.test_config',
2418
'bzrlib.tests.test_conflicts',
2419
'bzrlib.tests.test_counted_lock',
2420
'bzrlib.tests.test_decorators',
2421
'bzrlib.tests.test_delta',
2422
'bzrlib.tests.test_deprecated_graph',
2423
'bzrlib.tests.test_diff',
2424
'bzrlib.tests.test_dirstate',
2425
'bzrlib.tests.test_email_message',
2426
'bzrlib.tests.test_errors',
2427
'bzrlib.tests.test_escaped_store',
2428
'bzrlib.tests.test_extract',
2429
'bzrlib.tests.test_fetch',
2430
'bzrlib.tests.test_ftp_transport',
2431
'bzrlib.tests.test_generate_docs',
2432
'bzrlib.tests.test_generate_ids',
2433
'bzrlib.tests.test_globbing',
2434
'bzrlib.tests.test_gpg',
2435
'bzrlib.tests.test_graph',
2436
'bzrlib.tests.test_hashcache',
2437
'bzrlib.tests.test_help',
2438
'bzrlib.tests.test_hooks',
2439
'bzrlib.tests.test_http',
2440
'bzrlib.tests.test_http_response',
2441
'bzrlib.tests.test_https_ca_bundle',
2442
'bzrlib.tests.test_identitymap',
2443
'bzrlib.tests.test_ignores',
2444
'bzrlib.tests.test_index',
2445
'bzrlib.tests.test_info',
2446
'bzrlib.tests.test_inv',
2447
'bzrlib.tests.test_knit',
2448
'bzrlib.tests.test_lazy_import',
2449
'bzrlib.tests.test_lazy_regex',
2450
'bzrlib.tests.test_lockdir',
2451
'bzrlib.tests.test_lockable_files',
2452
'bzrlib.tests.test_log',
2453
'bzrlib.tests.test_lsprof',
2454
'bzrlib.tests.test_lru_cache',
2455
'bzrlib.tests.test_mail_client',
2456
'bzrlib.tests.test_memorytree',
2457
'bzrlib.tests.test_merge',
2458
'bzrlib.tests.test_merge3',
2459
'bzrlib.tests.test_merge_core',
2460
'bzrlib.tests.test_merge_directive',
2461
'bzrlib.tests.test_missing',
2462
'bzrlib.tests.test_msgeditor',
2463
'bzrlib.tests.test_multiparent',
2464
'bzrlib.tests.test_nonascii',
2465
'bzrlib.tests.test_options',
2466
'bzrlib.tests.test_osutils',
2467
'bzrlib.tests.test_osutils_encodings',
2468
'bzrlib.tests.test_pack',
2469
'bzrlib.tests.test_patch',
2470
'bzrlib.tests.test_patches',
2471
'bzrlib.tests.test_permissions',
2472
'bzrlib.tests.test_plugins',
2473
'bzrlib.tests.test_progress',
2474
'bzrlib.tests.test_reconfigure',
2475
'bzrlib.tests.test_reconcile',
2476
'bzrlib.tests.test_registry',
2477
'bzrlib.tests.test_remote',
2478
'bzrlib.tests.test_repository',
2479
'bzrlib.tests.test_revert',
2480
'bzrlib.tests.test_revision',
2481
'bzrlib.tests.test_revisionnamespaces',
2482
'bzrlib.tests.test_revisiontree',
2483
'bzrlib.tests.test_rio',
2484
'bzrlib.tests.test_sampler',
2485
'bzrlib.tests.test_selftest',
2486
'bzrlib.tests.test_setup',
2487
'bzrlib.tests.test_sftp_transport',
2488
'bzrlib.tests.test_smart',
2489
'bzrlib.tests.test_smart_add',
2490
'bzrlib.tests.test_smart_transport',
2491
'bzrlib.tests.test_smtp_connection',
2492
'bzrlib.tests.test_source',
2493
'bzrlib.tests.test_ssh_transport',
2494
'bzrlib.tests.test_status',
2495
'bzrlib.tests.test_store',
2496
'bzrlib.tests.test_strace',
2497
'bzrlib.tests.test_subsume',
2498
'bzrlib.tests.test_switch',
2499
'bzrlib.tests.test_symbol_versioning',
2500
'bzrlib.tests.test_tag',
2501
'bzrlib.tests.test_testament',
2502
'bzrlib.tests.test_textfile',
2503
'bzrlib.tests.test_textmerge',
2504
'bzrlib.tests.test_timestamp',
2505
'bzrlib.tests.test_trace',
2506
'bzrlib.tests.test_transactions',
2507
'bzrlib.tests.test_transform',
2508
'bzrlib.tests.test_transport',
2509
'bzrlib.tests.test_tree',
2510
'bzrlib.tests.test_treebuilder',
2511
'bzrlib.tests.test_tsort',
2512
'bzrlib.tests.test_tuned_gzip',
2513
'bzrlib.tests.test_ui',
2514
'bzrlib.tests.test_upgrade',
2515
'bzrlib.tests.test_urlutils',
2516
'bzrlib.tests.test_versionedfile',
2517
'bzrlib.tests.test_version',
2518
'bzrlib.tests.test_version_info',
2519
'bzrlib.tests.test_weave',
2520
'bzrlib.tests.test_whitebox',
2521
'bzrlib.tests.test_win32utils',
2522
'bzrlib.tests.test_workingtree',
2523
'bzrlib.tests.test_workingtree_4',
2524
'bzrlib.tests.test_wsgi',
2525
'bzrlib.tests.test_xml',
2527
test_transport_implementations = [
2528
'bzrlib.tests.test_transport_implementations',
2529
'bzrlib.tests.test_read_bundle',
2531
suite = TestUtil.TestSuite()
2532
loader = TestUtil.TestLoader()
2533
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
2534
from bzrlib.tests.test_transport_implementations import TransportTestProviderAdapter
2535
adapter = TransportTestProviderAdapter()
2536
adapt_modules(test_transport_implementations, adapter, loader, suite)
2538
["bzrlib.tests.test_msgeditor.MsgEditorTest."
2539
"test__create_temp_file_with_commit_template_in_unicode_dir"],
2540
EncodingTestAdapter(), loader, suite)
2541
for package in packages_to_test():
2542
suite.addTest(package.test_suite())
2543
for m in MODULES_TO_TEST:
2544
suite.addTest(loader.loadTestsFromModule(m))
2545
for m in MODULES_TO_DOCTEST:
2547
suite.addTest(doctest.DocTestSuite(m))
2548
except ValueError, e:
2549
print '**failed to get doctest for: %s\n%s' %(m,e)
2551
default_encoding = sys.getdefaultencoding()
2552
for name, plugin in bzrlib.plugin.plugins().items():
2554
plugin_suite = plugin.test_suite()
2555
except ImportError, e:
2556
bzrlib.trace.warning(
2557
'Unable to test plugin "%s": %s', name, e)
2559
if plugin_suite is not None:
2560
suite.addTest(plugin_suite)
2561
if default_encoding != sys.getdefaultencoding():
2562
bzrlib.trace.warning(
2563
'Plugin "%s" tried to reset default encoding to: %s', name,
2564
sys.getdefaultencoding())
2566
sys.setdefaultencoding(default_encoding)
2570
def multiply_tests_from_modules(module_name_list, scenario_iter):
    """Adapt all tests in some given modules to given scenarios.

    This is the recommended public interface for test parameterization.
    Typically the test_suite() method for a per-implementation test
    suite will call multiply_tests_from_modules and return the
    result.

    :param module_name_list: List of fully-qualified names of test
        modules.
    :param scenario_iter: Iterable of pairs of (scenario_name,
        scenario_param_dict).

    This returns a new TestSuite containing the cross product of
    all the tests in all the modules, each repeated for each scenario.
    Each test is adapted by adding the scenario name at the end
    of its name, and updating the test object's __dict__ with the
    scenario_param_dict.

    >>> r = multiply_tests_from_modules(
    ...     ['bzrlib.tests.test_sampler'],
    ...     [('one', dict(param=1)),
    ...      ('two', dict(param=2))])
    >>> tests = list(iter_suite_tests(r))
    >>> len(tests)
    2
    >>> tests[0].id()
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
    >>> tests[0].param
    1
    >>> tests[1].param
    2
    """
    loader = TestLoader()
    suite = TestSuite()
    adapter = TestScenarioApplier()
    # materialise the scenarios: they are walked once per loaded test
    adapter.scenarios = list(scenario_iter)
    adapt_modules(module_name_list, adapter, loader, suite)
    return suite
2611
def multiply_scenarios(scenarios_left, scenarios_right):
    """Multiply two sets of scenarios.

    :param scenarios_left: sequence of (name, param_dict) pairs.
    :param scenarios_right: sequence of (name, param_dict) pairs; it is
        walked once for every left scenario.
    :returns: the cartesian product of the two sets of scenarios, that is
        a scenario for every possible combination of a left scenario and a
        right scenario.  Names are joined as 'left,right'; when both dicts
        define the same key the right-hand value wins.
    """
    return [
        ('%s,%s' % (left_name, right_name),
         dict(list(left_dict.items()) + list(right_dict.items())))
        for left_name, left_dict in scenarios_left
        for right_name, right_dict in scenarios_right]
2626
def adapt_modules(mods_list, adapter, loader, suite):
    """Adapt the modules in mods_list using adapter and add to suite.

    :param mods_list: module names handed to loader.loadTestsFromModuleNames.
    :param adapter: object whose adapt(test) returns the tests to add.
    :param loader: test loader used to load the modules.
    :param suite: suite that receives the adapted tests.
    """
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
        suite.addTests(adapter.adapt(test))
2632
def adapt_tests(tests_list, adapter, loader, suite):
    """Adapt the tests in tests_list using adapter and add to suite.

    :param tests_list: test names handed one by one to
        loader.loadTestsFromName.
    :param adapter: object whose adapt() returns the tests to add.
    :param loader: test loader used to load each name.
    :param suite: suite that receives the adapted tests.
    """
    for test in tests_list:
        suite.addTests(adapter.adapt(loader.loadTestsFromName(test)))
2638
def _rmtree_temp_dir(dirname):
2639
# If LANG=C we probably have created some bogus paths
2640
# which rmtree(unicode) will fail to delete
2641
# so make sure we are using rmtree(str) to delete everything
2642
# except on win32, where rmtree(str) will fail
2643
# since it doesn't have the property of byte-stream paths
2644
# (they are either ascii or mbcs)
2645
if sys.platform == 'win32':
2646
# make sure we are using the unicode win32 api
2647
dirname = unicode(dirname)
2649
dirname = dirname.encode(sys.getfilesystemencoding())
2651
osutils.rmtree(dirname)
2653
if sys.platform == 'win32' and e.errno == errno.EACCES:
2654
sys.stderr.write(('Permission denied: '
2655
'unable to remove testing dir '
2656
'%s\n' % os.path.basename(dirname)))
2661
class Feature(object):
    """An operating system Feature."""

    def __init__(self):
        # None means "not probed yet"; any other value is the cached result
        self._available = None

    def available(self):
        """Is the feature available?

        The result of _probe() is remembered, so the probe is not repeated
        once it has returned something other than None.

        :return: True if the feature is available.
        """
        if self._available is None:
            self._available = self._probe()
        return self._available

    def _probe(self):
        """Implement this method in concrete features.

        :return: True if the feature is available.
        """
        raise NotImplementedError

    def __str__(self):
        """Return feature_name() when the subclass defines one, otherwise
        the class name.
        """
        if getattr(self, 'feature_name', None):
            return self.feature_name()
        return self.__class__.__name__
2689
class _SymlinkFeature(Feature):
    """Feature reporting whether osutils.has_symlinks() is true."""

    def _probe(self):
        return osutils.has_symlinks()

    def feature_name(self):
        return 'symlinks'

SymlinkFeature = _SymlinkFeature()
2700
class _OsFifoFeature(Feature):
    """Feature reporting whether the os module provides mkfifo."""

    def _probe(self):
        # returns the os.mkfifo function itself (truthy) or None
        return getattr(os, 'mkfifo', None)

    def feature_name(self):
        return 'filesystem fifos'

OsFifoFeature = _OsFifoFeature()
2711
class TestScenarioApplier(object):
    """A tool to apply scenarios to tests.

    The scenarios to apply are read from self.scenarios, which callers set
    to a sequence of (scenario_name, attribute_dict) pairs.
    """

    def adapt(self, test):
        """Return a TestSuite containing a copy of test for each scenario."""
        result = unittest.TestSuite()
        for scenario in self.scenarios:
            result.addTest(self.adapt_test_to_scenario(test, scenario))
        return result

    def adapt_test_to_scenario(self, test, scenario):
        """Copy test and apply scenario to it.

        :param test: A test to adapt.
        :param scenario: A tuple describing the scenario.
            The first element of the tuple is the new test id.
            The second element is a dict containing attributes to set on the
            test.
        :return: The adapted test.
        """
        from copy import deepcopy
        new_test = deepcopy(test)
        for name, value in scenario[1].items():
            setattr(new_test, name, value)
        # compute the id once, before id() is replaced on the copy
        new_id = "%s(%s)" % (new_test.id(), scenario[0])
        new_test.id = lambda: new_id
        return new_test
2740
def probe_unicode_in_user_encoding():
    """Try to encode several unicode strings to use in unicode-aware tests.
    Return first successful match.

    Candidates are encoded with bzrlib.user_encoding, in order.

    :return:  (unicode value, encoded plain string value) or (None, None)
    """
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
    for uni_val in possible_vals:
        try:
            str_val = uni_val.encode(bzrlib.user_encoding)
        except UnicodeEncodeError:
            # Try a different character
            pass
        else:
            return uni_val, str_val
    return None, None
2758
def probe_bad_non_ascii(encoding):
    """Try to find [bad] character with code [128..255]
    that cannot be decoded to unicode in some encoding.
    Return None if all non-ascii characters are valid
    for given encoding.

    :param encoding: name of the encoding to probe.
    :return: the first single-byte string in 128..255 that fails to decode,
        or None.
    """
    for i in xrange(128, 256):
        char = chr(i)
        try:
            char.decode(encoding)
        except UnicodeDecodeError:
            return char
    return None
2773
class _FTPServerFeature(Feature):
    """Some tests want an FTP Server, check if one is available.

    Right now, the only way this is available is if 'medusa' is installed.
    http://www.amk.ca/python/code/medusa.html
    """

    def _probe(self):
        # availability is decided purely by whether the helper module imports
        try:
            import bzrlib.tests.ftp_server
            return True
        except ImportError:
            return False

    def feature_name(self):
        return 'FTPServer'

FTPServerFeature = _FTPServerFeature()
2793
class _CaseInsensitiveFilesystemFeature(Feature):
    """Check if underlined filesystem is case-insensitive
    (e.g. on Windows, Cygwin, MacOS)
    """

    def _probe(self):
        """Create directory 'a' in a scratch dir and see whether 'A' is
        then reported as a directory too.
        """
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
            # no test root yet: create one and publish it for later tests
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
            TestCaseWithMemoryTransport.TEST_ROOT = root
        else:
            root = TestCaseWithMemoryTransport.TEST_ROOT
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
            dir=root)
        name_a = osutils.pathjoin(tdir, 'a')
        name_A = osutils.pathjoin(tdir, 'A')
        os.mkdir(name_a)
        result = osutils.isdir(name_A)
        _rmtree_temp_dir(tdir)
        return result

    def feature_name(self):
        return 'case-insensitive filesystem'

CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()