55
50
# nb: check this before importing anything else from within it
56
51
_testtools_version = getattr(testtools, '__version__', ())
57
if _testtools_version < (0, 9, 2):
58
raise ImportError("need at least testtools 0.9.2: %s is %r"
52
if _testtools_version < (0, 9, 5):
53
raise ImportError("need at least testtools 0.9.5: %s is %r"
59
54
% (testtools.__file__, _testtools_version))
60
55
from testtools import content
62
58
from bzrlib import (
62
commands as _mod_commands,
72
plugin as _mod_plugin,
79
transport as _mod_transport,
80
import bzrlib.commands
81
import bzrlib.timestamp
83
import bzrlib.inventory
84
import bzrlib.iterablefile
87
83
import bzrlib.lsprof
88
84
except ImportError:
89
85
# lsprof not available
91
from bzrlib.merge import merge_inner
94
from bzrlib.smart import client, request, server
96
from bzrlib import symbol_versioning
87
from bzrlib.smart import client, request
88
from bzrlib.transport import (
97
92
from bzrlib.symbol_versioning import (
99
93
deprecated_function,
105
from bzrlib.transport import (
110
import bzrlib.transport
111
from bzrlib.trace import mutter, note
112
96
from bzrlib.tests import (
116
from bzrlib.tests.http_server import HttpServer
117
from bzrlib.tests.TestUtil import (
121
from bzrlib.tests.treeshape import build_tree_contents
122
101
from bzrlib.ui import NullProgressView
123
102
from bzrlib.ui.text import TextUIFactory
124
import bzrlib.version_info_formats.format_custom
125
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
127
104
# Mark this python module as being part of the implementation
128
105
# of unittest: this gives us better tracebacks where the last
140
117
SUBUNIT_SEEK_SET = 0
141
118
SUBUNIT_SEEK_CUR = 1
144
class ExtendedTestResult(unittest._TextTestResult):
120
# These are intentionally brought into this namespace. That way plugins, etc
121
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
122
TestSuite = TestUtil.TestSuite
123
TestLoader = TestUtil.TestLoader
125
# Tests should run in a clean and clearly defined environment. The goal is to
126
# keep them isolated from the running environment as mush as possible. The test
127
# framework ensures the variables defined below are set (or deleted if the
128
# value is None) before a test is run and reset to their original value after
129
# the test is run. Generally if some code depends on an environment variable,
130
# the tests should start without this variable in the environment. There are a
131
# few exceptions but you shouldn't violate this rule lightly.
135
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
136
# tests do check our impls match APPDATA
137
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
141
'BZREMAIL': None, # may still be present in the environment
142
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
143
'BZR_PROGRESS_BAR': None,
144
# This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
145
# as a base class instead of TestCaseInTempDir. Tests inheriting from
146
# TestCase should not use disk resources, BZR_LOG is one.
147
'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
148
'BZR_PLUGIN_PATH': None,
149
'BZR_DISABLE_PLUGINS': None,
150
'BZR_PLUGINS_AT': None,
151
'BZR_CONCURRENCY': None,
152
# Make sure that any text ui tests are consistent regardless of
153
# the environment the test case is run in; you may want tests that
154
# test other combinations. 'dumb' is a reasonable guess for tests
155
# going to a pipe or a StringIO.
161
'SSH_AUTH_SOCK': None,
171
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
172
# least. If you do (care), please update this comment
176
'BZR_REMOTE_PATH': None,
177
# Generally speaking, we don't want apport reporting on crashes in
178
# the test envirnoment unless we're specifically testing apport,
179
# so that it doesn't leak into the real system environment. We
180
# use an env var so it propagates to subprocesses.
181
'APPORT_DISABLE': '1',
185
def override_os_environ(test, env=None):
186
"""Modify os.environ keeping a copy.
188
:param test: A test instance
190
:param env: A dict containing variable definitions to be installed
193
env = isolated_environ
194
test._original_os_environ = dict([(var, value)
195
for var, value in os.environ.iteritems()])
196
for var, value in env.iteritems():
197
osutils.set_or_unset_env(var, value)
198
if var not in test._original_os_environ:
199
# The var is new, add it with a value of None, so
200
# restore_os_environ will delete it
201
test._original_os_environ[var] = None
204
def restore_os_environ(test):
205
"""Restore os.environ to its original state.
207
:param test: A test instance previously passed to override_os_environ.
209
for var, value in test._original_os_environ.iteritems():
210
# Restore the original value (or delete it if the value has been set to
211
# None in override_os_environ).
212
osutils.set_or_unset_env(var, value)
215
def _clear__type_equality_funcs(test):
216
"""Cleanup bound methods stored on TestCase instances
218
Clear the dict breaking a few (mostly) harmless cycles in the affected
219
unittests released with Python 2.6 and initial Python 2.7 versions.
221
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
222
shipped in Oneiric, an object with no clear method was used, hence the
223
extra complications, see bug 809048 for details.
225
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
226
if type_equality_funcs is not None:
227
tef_clear = getattr(type_equality_funcs, "clear", None)
228
if tef_clear is None:
229
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
230
if tef_instance_dict is not None:
231
tef_clear = tef_instance_dict.clear
232
if tef_clear is not None:
236
class ExtendedTestResult(testtools.TextTestResult):
145
237
"""Accepts, reports and accumulates the results of running tests.
147
239
Compared to the unittest version this class adds support for
196
288
self._overall_start_time = time.time()
197
289
self._strict = strict
290
self._first_thread_leaker_id = None
291
self._tests_leaking_threads_count = 0
292
self._traceback_from_test = None
199
294
def stopTestRun(self):
200
295
run = self.testsRun
201
296
actionTaken = "Ran"
202
297
stopTime = time.time()
203
298
timeTaken = stopTime - self.startTime
205
self.stream.writeln(self.separator2)
206
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
299
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
300
# the parent class method is similar have to duplicate
301
self._show_list('ERROR', self.errors)
302
self._show_list('FAIL', self.failures)
303
self.stream.write(self.sep2)
304
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
207
305
run, run != 1 and "s" or "", timeTaken))
208
self.stream.writeln()
209
306
if not self.wasSuccessful():
210
307
self.stream.write("FAILED (")
211
308
failed, errored = map(len, (self.failures, self.errors))
218
315
if failed or errored: self.stream.write(", ")
219
316
self.stream.write("known_failure_count=%d" %
220
317
self.known_failure_count)
221
self.stream.writeln(")")
318
self.stream.write(")\n")
223
320
if self.known_failure_count:
224
self.stream.writeln("OK (known_failures=%d)" %
321
self.stream.write("OK (known_failures=%d)\n" %
225
322
self.known_failure_count)
227
self.stream.writeln("OK")
324
self.stream.write("OK\n")
228
325
if self.skip_count > 0:
229
326
skipped = self.skip_count
230
self.stream.writeln('%d test%s skipped' %
327
self.stream.write('%d test%s skipped\n' %
231
328
(skipped, skipped != 1 and "s" or ""))
232
329
if self.unsupported:
233
330
for feature, count in sorted(self.unsupported.items()):
234
self.stream.writeln("Missing feature '%s' skipped %d tests." %
331
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
235
332
(feature, count))
237
334
ok = self.wasStrictlySuccessful()
239
336
ok = self.wasSuccessful()
240
if TestCase._first_thread_leaker_id:
337
if self._first_thread_leaker_id:
241
338
self.stream.write(
242
339
'%s is leaking threads among %d leaking tests.\n' % (
243
TestCase._first_thread_leaker_id,
244
TestCase._leaking_threads_tests))
340
self._first_thread_leaker_id,
341
self._tests_leaking_threads_count))
245
342
# We don't report the main thread as an active one.
246
343
self.stream.write(
247
344
'%d non-main threads were left active in the end.\n'
248
% (TestCase._active_threads - 1))
345
% (len(self._active_threads) - 1))
250
347
def getDescription(self, test):
276
374
def _shortened_test_description(self, test):
278
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
376
what = re.sub(r'^bzrlib\.tests\.', '', what)
379
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
380
# multiple times in a row, because the handler is added for
381
# each test but the container list is shared between cases.
382
# See lp:498869 lp:625574 and lp:637725 for background.
383
def _record_traceback_from_test(self, exc_info):
384
"""Store the traceback from passed exc_info tuple till"""
385
self._traceback_from_test = exc_info[2]
281
387
def startTest(self, test):
282
unittest.TestResult.startTest(self, test)
388
super(ExtendedTestResult, self).startTest(test)
283
389
if self.count == 0:
284
390
self.startTests()
285
392
self.report_test_start(test)
286
393
test.number = self.count
287
394
self._recordTestStartTime()
395
# Make testtools cases give us the real traceback on failure
396
addOnException = getattr(test, "addOnException", None)
397
if addOnException is not None:
398
addOnException(self._record_traceback_from_test)
399
# Only check for thread leaks on bzrlib derived test cases
400
if isinstance(test, TestCase):
401
test.addCleanup(self._check_leaked_threads, test)
403
def stopTest(self, test):
404
super(ExtendedTestResult, self).stopTest(test)
405
# Manually break cycles, means touching various private things but hey
406
getDetails = getattr(test, "getDetails", None)
407
if getDetails is not None:
409
_clear__type_equality_funcs(test)
410
self._traceback_from_test = None
289
412
def startTests(self):
291
if getattr(sys, 'frozen', None) is None:
292
bzr_path = osutils.realpath(sys.argv[0])
294
bzr_path = sys.executable
296
'bzr selftest: %s\n' % (bzr_path,))
299
bzrlib.__path__[0],))
301
' bzr-%s python-%s %s\n' % (
302
bzrlib.version_string,
303
bzrlib._format_version_tuple(sys.version_info),
304
platform.platform(aliased=1),
306
self.stream.write('\n')
413
self.report_tests_starting()
414
self._active_threads = threading.enumerate()
416
def _check_leaked_threads(self, test):
417
"""See if any threads have leaked since last call
419
A sample of live threads is stored in the _active_threads attribute,
420
when this method runs it compares the current live threads and any not
421
in the previous sample are treated as having leaked.
423
now_active_threads = set(threading.enumerate())
424
threads_leaked = now_active_threads.difference(self._active_threads)
426
self._report_thread_leak(test, threads_leaked, now_active_threads)
427
self._tests_leaking_threads_count += 1
428
if self._first_thread_leaker_id is None:
429
self._first_thread_leaker_id = test.id()
430
self._active_threads = now_active_threads
308
432
def _recordTestStartTime(self):
309
433
"""Record that a test has started."""
310
self._start_time = time.time()
312
def _cleanupLogFile(self, test):
313
# We can only do this if we have one of our TestCases, not if
315
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
316
if setKeepLogfile is not None:
434
self._start_datetime = self._now()
319
436
def addError(self, test, err):
320
437
"""Tell result that test finished with an error.
356
471
self._formatTime(benchmark_time),
358
473
self.report_success(test)
359
self._cleanupLogFile(test)
360
unittest.TestResult.addSuccess(self, test)
474
super(ExtendedTestResult, self).addSuccess(test)
361
475
test._log_contents = ''
363
477
def addExpectedFailure(self, test, err):
364
478
self.known_failure_count += 1
365
479
self.report_known_failure(test, err)
481
def addUnexpectedSuccess(self, test, details=None):
482
"""Tell result the test unexpectedly passed, counting as a failure
484
When the minimum version of testtools required becomes 0.9.8 this
485
can be updated to use the new handling there.
487
super(ExtendedTestResult, self).addFailure(test, details=details)
488
self.failure_count += 1
489
self.report_unexpected_success(test,
490
"".join(details["reason"].iter_text()))
367
494
def addNotSupported(self, test, feature):
368
495
"""The test will not be run because of a missing feature.
401
533
raise errors.BzrError("Unknown whence %r" % whence)
403
def report_cleaning_up(self):
535
def report_tests_starting(self):
536
"""Display information before the test run begins"""
537
if getattr(sys, 'frozen', None) is None:
538
bzr_path = osutils.realpath(sys.argv[0])
540
bzr_path = sys.executable
542
'bzr selftest: %s\n' % (bzr_path,))
545
bzrlib.__path__[0],))
547
' bzr-%s python-%s %s\n' % (
548
bzrlib.version_string,
549
bzrlib._format_version_tuple(sys.version_info),
550
platform.platform(aliased=1),
552
self.stream.write('\n')
554
def report_test_start(self, test):
555
"""Display information on the test just about to be run"""
557
def _report_thread_leak(self, test, leaked_threads, active_threads):
558
"""Display information on a test that leaked one or more threads"""
559
# GZ 2010-09-09: A leak summary reported separately from the general
560
# thread debugging would be nice. Tests under subunit
561
# need something not using stream, perhaps adding a
562
# testtools details object would be fitting.
563
if 'threads' in selftest_debug_flags:
564
self.stream.write('%s is leaking, active is now %d\n' %
565
(test.id(), len(active_threads)))
406
567
def startTestRun(self):
407
568
self.startTime = time.time()
551
709
return '%s%s' % (indent, err[1])
553
711
def report_error(self, test, err):
554
self.stream.writeln('ERROR %s\n%s'
712
self.stream.write('ERROR %s\n%s\n'
555
713
% (self._testTimeString(test),
556
714
self._error_summary(err)))
558
716
def report_failure(self, test, err):
559
self.stream.writeln(' FAIL %s\n%s'
717
self.stream.write(' FAIL %s\n%s\n'
560
718
% (self._testTimeString(test),
561
719
self._error_summary(err)))
563
721
def report_known_failure(self, test, err):
564
self.stream.writeln('XFAIL %s\n%s'
722
self.stream.write('XFAIL %s\n%s\n'
565
723
% (self._testTimeString(test),
566
724
self._error_summary(err)))
726
def report_unexpected_success(self, test, reason):
727
self.stream.write(' FAIL %s\n%s: %s\n'
728
% (self._testTimeString(test),
729
"Unexpected success. Should have failed",
568
732
def report_success(self, test):
569
self.stream.writeln(' OK %s' % self._testTimeString(test))
733
self.stream.write(' OK %s\n' % self._testTimeString(test))
570
734
for bench_called, stats in getattr(test, '_benchcalls', []):
571
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
735
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
572
736
stats.pprint(file=self.stream)
573
737
# flush the stream so that we get smooth output. This verbose mode is
574
738
# used to show the output in PQM.
575
739
self.stream.flush()
577
741
def report_skip(self, test, reason):
578
self.stream.writeln(' SKIP %s\n%s'
742
self.stream.write(' SKIP %s\n%s\n'
579
743
% (self._testTimeString(test), reason))
581
745
def report_not_applicable(self, test, reason):
582
self.stream.writeln(' N/A %s\n %s'
746
self.stream.write(' N/A %s\n %s\n'
583
747
% (self._testTimeString(test), reason))
585
749
def report_unsupported(self, test, feature):
586
750
"""test cannot be run because feature is missing."""
587
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
751
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
588
752
%(self._testTimeString(test), feature))
789
975
routine, and to build and check bzr trees.
791
977
In addition to the usual method of overriding tearDown(), this class also
792
allows subclasses to register functions into the _cleanups list, which is
978
allows subclasses to register cleanup functions via addCleanup, which are
793
979
run in order as the object is torn down. It's less likely this will be
794
980
accidentally overlooked.
797
_active_threads = None
798
_leaking_threads_tests = 0
799
_first_thread_leaker_id = None
800
_log_file_name = None
801
984
# record lsprof data when performing benchmark calls.
802
985
_gather_lsprof_in_benchmarks = False
804
987
def __init__(self, methodName='testMethod'):
805
988
super(TestCase, self).__init__(methodName)
807
989
self._directory_isolation = True
808
990
self.exception_handlers.insert(0,
809
991
(UnavailableFeature, self._do_unsupported_or_skip))
827
1007
self._track_transports()
828
1008
self._track_locks()
829
1009
self._clear_debug_flags()
830
TestCase._active_threads = threading.activeCount()
831
self.addCleanup(self._check_leaked_threads)
1010
# Isolate global verbosity level, to make sure it's reproducible
1011
# between tests. We should get rid of this altogether: bug 656694. --
1013
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
1014
# Isolate config option expansion until its default value for bzrlib is
1015
# settled on or a the FIXME associated with _get_expand_default_value
1016
# is addressed -- vila 20110219
1017
self.overrideAttr(config, '_expand_default_value', None)
1018
self._log_files = set()
1019
# Each key in the ``_counters`` dict holds a value for a different
1020
# counter. When the test ends, addDetail() should be used to output the
1021
# counter values. This happens in install_counter_hook().
1023
if 'config_stats' in selftest_debug_flags:
1024
self._install_config_stats_hooks()
1025
# Do not use i18n for tests (unless the test reverses this)
833
1028
def debug(self):
834
1029
# debug a frame up.
836
pdb.Pdb().set_trace(sys._getframe().f_back)
838
def _check_leaked_threads(self):
839
active = threading.activeCount()
840
leaked_threads = active - TestCase._active_threads
841
TestCase._active_threads = active
842
# If some tests make the number of threads *decrease*, we'll consider
843
# that they are just observing old threads dieing, not agressively kill
844
# random threads. So we don't report these tests as leaking. The risk
845
# is that we have false positives that way (the test see 2 threads
846
# going away but leak one) but it seems less likely than the actual
847
# false positives (the test see threads going away and does not leak).
848
if leaked_threads > 0:
849
TestCase._leaking_threads_tests += 1
850
if TestCase._first_thread_leaker_id is None:
851
TestCase._first_thread_leaker_id = self.id()
1031
# The sys preserved stdin/stdout should allow blackbox tests debugging
1032
pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1033
).set_trace(sys._getframe().f_back)
1035
def discardDetail(self, name):
1036
"""Extend the addDetail, getDetails api so we can remove a detail.
1038
eg. bzr always adds the 'log' detail at startup, but we don't want to
1039
include it for skipped, xfail, etc tests.
1041
It is safe to call this for a detail that doesn't exist, in case this
1042
gets called multiple times.
1044
# We cheat. details is stored in __details which means we shouldn't
1045
# touch it. but getDetails() returns the dict directly, so we can
1047
details = self.getDetails()
1051
def install_counter_hook(self, hooks, name, counter_name=None):
1052
"""Install a counting hook.
1054
Any hook can be counted as long as it doesn't need to return a value.
1056
:param hooks: Where the hook should be installed.
1058
:param name: The hook name that will be counted.
1060
:param counter_name: The counter identifier in ``_counters``, defaults
1063
_counters = self._counters # Avoid closing over self
1064
if counter_name is None:
1066
if _counters.has_key(counter_name):
1067
raise AssertionError('%s is already used as a counter name'
1069
_counters[counter_name] = 0
1070
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1071
lambda: ['%d' % (_counters[counter_name],)]))
1072
def increment_counter(*args, **kwargs):
1073
_counters[counter_name] += 1
1074
label = 'count %s calls' % (counter_name,)
1075
hooks.install_named_hook(name, increment_counter, label)
1076
self.addCleanup(hooks.uninstall_named_hook, name, label)
1078
def _install_config_stats_hooks(self):
1079
"""Install config hooks to count hook calls.
1082
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1083
self.install_counter_hook(config.ConfigHooks, hook_name,
1084
'config.%s' % (hook_name,))
1086
# The OldConfigHooks are private and need special handling to protect
1087
# against recursive tests (tests that run other tests), so we just do
1088
# manually what registering them into _builtin_known_hooks will provide
1090
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1091
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1092
self.install_counter_hook(config.OldConfigHooks, hook_name,
1093
'old_config.%s' % (hook_name,))
853
1095
def _clear_debug_flags(self):
854
1096
"""Prevent externally set debug flags affecting tests.
866
1108
def _clear_hooks(self):
867
1109
# prevent hooks affecting tests
1110
known_hooks = hooks.known_hooks
868
1111
self._preserved_hooks = {}
869
for key, factory in hooks.known_hooks.items():
870
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
871
current_hooks = hooks.known_hooks_key_to_object(key)
1112
for key, (parent, name) in known_hooks.iter_parent_objects():
1113
current_hooks = getattr(parent, name)
872
1114
self._preserved_hooks[parent] = (name, current_hooks)
1115
self._preserved_lazy_hooks = hooks._lazy_hooks
1116
hooks._lazy_hooks = {}
873
1117
self.addCleanup(self._restoreHooks)
874
for key, factory in hooks.known_hooks.items():
875
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
1118
for key, (parent, name) in known_hooks.iter_parent_objects():
1119
factory = known_hooks.get(key)
876
1120
setattr(parent, name, factory())
877
1121
# this hook should always be installed
878
1122
request._install_hook()
1135
1383
'st_mtime did not match')
1136
1384
self.assertEqual(expected.st_ctime, actual.st_ctime,
1137
1385
'st_ctime did not match')
1138
if sys.platform != 'win32':
1386
if sys.platform == 'win32':
1139
1387
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1140
1388
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1141
# odd. Regardless we shouldn't actually try to assert anything
1142
# about their values
1389
# odd. We just force it to always be 0 to avoid any problems.
1390
self.assertEqual(0, expected.st_dev)
1391
self.assertEqual(0, actual.st_dev)
1392
self.assertEqual(0, expected.st_ino)
1393
self.assertEqual(0, actual.st_ino)
1143
1395
self.assertEqual(expected.st_dev, actual.st_dev,
1144
1396
'st_dev did not match')
1145
1397
self.assertEqual(expected.st_ino, actual.st_ino,
1322
1577
self.assertEqual(expected_docstring, obj.__doc__)
1579
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1324
1580
def failUnlessExists(self, path):
1581
return self.assertPathExists(path)
1583
def assertPathExists(self, path):
1325
1584
"""Fail unless path or paths, which may be abs or relative, exist."""
1326
1585
if not isinstance(path, basestring):
1328
self.failUnlessExists(p)
1587
self.assertPathExists(p)
1330
self.failUnless(osutils.lexists(path),path+" does not exist")
1589
self.assertTrue(osutils.lexists(path),
1590
path + " does not exist")
1592
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1332
1593
def failIfExists(self, path):
1594
return self.assertPathDoesNotExist(path)
1596
def assertPathDoesNotExist(self, path):
1333
1597
"""Fail if path or paths, which may be abs or relative, exist."""
1334
1598
if not isinstance(path, basestring):
1336
self.failIfExists(p)
1600
self.assertPathDoesNotExist(p)
1338
self.failIf(osutils.lexists(path),path+" exists")
1602
self.assertFalse(osutils.lexists(path),
1340
1605
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1341
1606
"""A helper for callDeprecated and applyDeprecated.
1456
1722
The file is removed as the test is torn down.
1458
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1459
self._log_file = os.fdopen(fileno, 'w+')
1460
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1461
self._log_file_name = name
1724
pseudo_log_file = StringIO()
1725
def _get_log_contents_for_weird_testtools_api():
1726
return [pseudo_log_file.getvalue().decode(
1727
"utf-8", "replace").encode("utf-8")]
1728
self.addDetail("log", content.Content(content.ContentType("text",
1729
"plain", {"charset": "utf8"}),
1730
_get_log_contents_for_weird_testtools_api))
1731
self._log_file = pseudo_log_file
1732
self._log_memento = trace.push_log_file(self._log_file)
1462
1733
self.addCleanup(self._finishLogFile)
1464
1735
def _finishLogFile(self):
1465
1736
"""Finished with the log file.
1467
Close the file and delete it, unless setKeepLogfile was called.
1738
Close the file and delete it.
1469
if bzrlib.trace._trace_file:
1740
if trace._trace_file:
1470
1741
# flush the log file, to get all content
1471
bzrlib.trace._trace_file.flush()
1472
bzrlib.trace.pop_log_file(self._log_memento)
1473
# Cache the log result and delete the file on disk
1474
self._get_log(False)
1742
trace._trace_file.flush()
1743
trace.pop_log_file(self._log_memento)
1476
1745
def thisFailsStrictLockCheck(self):
1477
1746
"""It is known that this test would fail with -Dstrict_locks.
1513
1777
setattr(obj, attr_name, new)
1780
def overrideEnv(self, name, new):
1781
"""Set an environment variable, and reset it after the test.
1783
:param name: The environment variable name.
1785
:param new: The value to set the variable to. If None, the
1786
variable is deleted from the environment.
1788
:returns: The actual variable value.
1790
value = osutils.set_or_unset_env(name, new)
1791
self.addCleanup(osutils.set_or_unset_env, name, value)
1794
def recordCalls(self, obj, attr_name):
1795
"""Monkeypatch in a wrapper that will record calls.
1797
The monkeypatch is automatically removed when the test concludes.
1799
:param obj: The namespace holding the reference to be replaced;
1800
typically a module, class, or object.
1801
:param attr_name: A string for the name of the attribute to
1803
:returns: A list that will be extended with one item every time the
1804
function is called, with a tuple of (args, kwargs).
1808
def decorator(*args, **kwargs):
1809
calls.append((args, kwargs))
1810
return orig(*args, **kwargs)
1811
orig = self.overrideAttr(obj, attr_name, decorator)
1516
1814
def _cleanEnvironment(self):
1518
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1519
'HOME': os.getcwd(),
1520
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1521
# tests do check our impls match APPDATA
1522
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1526
'BZREMAIL': None, # may still be present in the environment
1528
'BZR_PROGRESS_BAR': None,
1530
'BZR_PLUGIN_PATH': None,
1531
'BZR_DISABLE_PLUGINS': None,
1532
'BZR_PLUGINS_AT': None,
1533
'BZR_CONCURRENCY': None,
1534
# Make sure that any text ui tests are consistent regardless of
1535
# the environment the test case is run in; you may want tests that
1536
# test other combinations. 'dumb' is a reasonable guess for tests
1537
# going to a pipe or a StringIO.
1541
'BZR_COLUMNS': '80',
1543
'SSH_AUTH_SOCK': None,
1547
'https_proxy': None,
1548
'HTTPS_PROXY': None,
1553
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1554
# least. If you do (care), please update this comment
1558
'BZR_REMOTE_PATH': None,
1559
# Generally speaking, we don't want apport reporting on crashes in
1560
# the test envirnoment unless we're specifically testing apport,
1561
# so that it doesn't leak into the real system environment. We
1562
# use an env var so it propagates to subprocesses.
1563
'APPORT_DISABLE': '1',
1566
self.addCleanup(self._restoreEnvironment)
1567
for name, value in new_env.iteritems():
1568
self._captureVar(name, value)
1570
def _captureVar(self, name, newvalue):
1571
"""Set an environment variable, and reset it when finished."""
1572
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1574
def _restoreEnvironment(self):
1575
for name, value in self._old_env.iteritems():
1576
osutils.set_or_unset_env(name, value)
1815
for name, value in isolated_environ.iteritems():
1816
self.overrideEnv(name, value)
1578
1818
def _restoreHooks(self):
1579
1819
for klass, (name, hooks) in self._preserved_hooks.items():
1580
1820
setattr(klass, name, hooks)
1821
self._preserved_hooks.clear()
1822
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1823
self._preserved_lazy_hooks.clear()
1582
1825
def knownFailure(self, reason):
1583
"""This test has failed for some known reason."""
1584
raise KnownFailure(reason)
1826
"""Declare that this test fails for a known reason
1828
Tests that are known to fail should generally be using expectedFailure
1829
with an appropriate reverse assertion if a change could cause the test
1830
to start passing. Conversely if the test has no immediate prospect of
1831
succeeding then using skip is more suitable.
1833
When this method is called while an exception is being handled, that
1834
traceback will be used, otherwise a new exception will be thrown to
1835
provide one but won't be reported.
1837
self._add_reason(reason)
1839
exc_info = sys.exc_info()
1840
if exc_info != (None, None, None):
1841
self._report_traceback(exc_info)
1844
raise self.failureException(reason)
1845
except self.failureException:
1846
exc_info = sys.exc_info()
1847
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1848
raise testtools.testcase._ExpectedFailure(exc_info)
1852
def _suppress_log(self):
1853
"""Remove the log info from details."""
1854
self.discardDetail('log')
1586
1856
def _do_skip(self, result, reason):
1857
self._suppress_log()
1587
1858
addSkip = getattr(result, 'addSkip', None)
1588
1859
if not callable(addSkip):
1589
1860
result.addSuccess(result)
1612
1885
self._do_skip(result, reason)
1888
def _report_skip(self, result, err):
1889
"""Override the default _report_skip.
1891
We want to strip the 'log' detail. If we waint until _do_skip, it has
1892
already been formatted into the 'reason' string, and we can't pull it
1895
self._suppress_log()
1896
super(TestCase, self)._report_skip(self, result, err)
1899
def _report_expected_failure(self, result, err):
1902
See _report_skip for motivation.
1904
self._suppress_log()
1905
super(TestCase, self)._report_expected_failure(self, result, err)
1615
1908
def _do_unsupported_or_skip(self, result, e):
1616
1909
reason = e.args[0]
1910
self._suppress_log()
1617
1911
addNotSupported = getattr(result, 'addNotSupported', None)
1618
1912
if addNotSupported is not None:
1619
1913
result.addNotSupported(self, reason)
1645
1939
self._benchtime += time.time() - start
1647
1941
def log(self, *args):
1650
def _get_log(self, keep_log_file=False):
1651
"""Internal helper to get the log from bzrlib.trace for this test.
1653
Please use self.getDetails, or self.get_log to access this in test case
1656
:param keep_log_file: When True, if the log is still a file on disk
1657
leave it as a file on disk. When False, if the log is still a file
1658
on disk, the log file is deleted and the log preserved as
1660
:return: A string containing the log.
1662
if self._log_contents is not None:
1664
self._log_contents.decode('utf8')
1665
except UnicodeDecodeError:
1666
unicodestr = self._log_contents.decode('utf8', 'replace')
1667
self._log_contents = unicodestr.encode('utf8')
1668
return self._log_contents
1670
if bzrlib.trace._trace_file:
1671
# flush the log file, to get all content
1672
bzrlib.trace._trace_file.flush()
1673
if self._log_file_name is not None:
1674
logfile = open(self._log_file_name)
1676
log_contents = logfile.read()
1680
log_contents.decode('utf8')
1681
except UnicodeDecodeError:
1682
unicodestr = log_contents.decode('utf8', 'replace')
1683
log_contents = unicodestr.encode('utf8')
1684
if not keep_log_file:
1686
max_close_attempts = 100
1687
first_close_error = None
1688
while close_attempts < max_close_attempts:
1691
self._log_file.close()
1692
except IOError, ioe:
1693
if ioe.errno is None:
1694
# No errno implies 'close() called during
1695
# concurrent operation on the same file object', so
1696
# retry. Probably a thread is trying to write to
1698
if first_close_error is None:
1699
first_close_error = ioe
1704
if close_attempts > 1:
1706
'Unable to close log file on first attempt, '
1707
'will retry: %s\n' % (first_close_error,))
1708
if close_attempts == max_close_attempts:
1710
'Unable to close log file after %d attempts.\n'
1711
% (max_close_attempts,))
1712
self._log_file = None
1713
# Permit multiple calls to get_log until we clean it up in
1715
self._log_contents = log_contents
1717
os.remove(self._log_file_name)
1719
if sys.platform == 'win32' and e.errno == errno.EACCES:
1720
sys.stderr.write(('Unable to delete log file '
1721
' %r\n' % self._log_file_name))
1724
self._log_file_name = None
1727
return "No log file content and no log file name."
1729
1944
def get_log(self):
1730
1945
"""Get a unicode string containing the log from bzrlib.trace.
2221
def _add_subprocess_log(self, log_file_path):
2222
if len(self._log_files) == 0:
2223
# Register an addCleanup func. We do this on the first call to
2224
# _add_subprocess_log rather than in TestCase.setUp so that this
2225
# addCleanup is registered after any cleanups for tempdirs that
2226
# subclasses might create, which will probably remove the log file
2228
self.addCleanup(self._subprocess_log_cleanup)
2229
# self._log_files is a set, so if a log file is reused we won't grab it
2231
self._log_files.add(log_file_path)
2233
def _subprocess_log_cleanup(self):
2234
for count, log_file_path in enumerate(self._log_files):
2235
# We use buffer_now=True to avoid holding the file open beyond
2236
# the life of this function, which might interfere with e.g.
2237
# cleaning tempdirs on Windows.
2238
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2239
#detail_content = content.content_from_file(
2240
# log_file_path, buffer_now=True)
2241
with open(log_file_path, 'rb') as log_file:
2242
log_file_bytes = log_file.read()
2243
detail_content = content.Content(content.ContentType("text",
2244
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2245
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
1997
2248
def _popen(self, *args, **kwargs):
1998
2249
"""Place a call to Popen.
2000
2251
Allows tests to override this method to intercept the calls made to
2001
2252
Popen for introspection.
2003
return Popen(*args, **kwargs)
2254
return subprocess.Popen(*args, **kwargs)
2005
2256
def get_source_path(self):
2006
2257
"""Return the path of the directory containing bzrlib."""
2036
2287
if retcode is not None and retcode != process.returncode:
2037
2288
if process_args is None:
2038
2289
process_args = "(unknown args)"
2039
mutter('Output of bzr %s:\n%s', process_args, out)
2040
mutter('Error for bzr %s:\n%s', process_args, err)
2290
trace.mutter('Output of bzr %s:\n%s', process_args, out)
2291
trace.mutter('Error for bzr %s:\n%s', process_args, err)
2041
2292
self.fail('Command bzr %s failed with retcode %s != %s'
2042
2293
% (process_args, retcode, process.returncode))
2043
2294
return [out, err]
2045
def check_inventory_shape(self, inv, shape):
2046
"""Compare an inventory to a list of expected names.
2296
def check_tree_shape(self, tree, shape):
2297
"""Compare a tree to a list of expected names.
2048
2299
Fail if they are not precisely equal.
2051
2302
shape = list(shape) # copy
2052
for path, ie in inv.entries():
2303
for path, ie in tree.iter_entries_by_dir():
2053
2304
name = path.replace('\\', '/')
2054
2305
if ie.kind == 'directory':
2055
2306
name = name + '/'
2308
pass # ignore root entry
2057
2310
shape.remove(name)
2059
2312
extras.append(name)
2149
2402
class TestCaseWithMemoryTransport(TestCase):
2150
2403
"""Common test class for tests that do not need disk resources.
2152
Tests that need disk resources should derive from TestCaseWithTransport.
2405
Tests that need disk resources should derive from TestCaseInTempDir
2406
orTestCaseWithTransport.
2154
2408
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2156
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2410
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2157
2411
a directory which does not exist. This serves to help ensure test isolation
2158
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2159
must exist. However, TestCaseWithMemoryTransport does not offer local
2160
file defaults for the transport in tests, nor does it obey the command line
2412
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2413
must exist. However, TestCaseWithMemoryTransport does not offer local file
2414
defaults for the transport in tests, nor does it obey the command line
2161
2415
override, so tests that accidentally write to the common directory should
2164
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2165
a .bzr directory that stops us ascending higher into the filesystem.
2418
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2419
``.bzr`` directory that stops us ascending higher into the filesystem.
2168
2422
TEST_ROOT = None
2378
2646
def make_branch(self, relpath, format=None):
2379
2647
"""Create a branch on the transport at relpath."""
2380
2648
repo = self.make_repository(relpath, format=format)
2381
return repo.bzrdir.create_branch()
2649
return repo.bzrdir.create_branch(append_revisions_only=False)
2651
def get_default_format(self):
2654
def resolve_format(self, format):
2655
"""Resolve an object to a ControlDir format object.
2657
The initial format object can either already be
2658
a ControlDirFormat, None (for the default format),
2659
or a string with the name of the control dir format.
2661
:param format: Object to resolve
2662
:return A ControlDirFormat instance
2665
format = self.get_default_format()
2666
if isinstance(format, basestring):
2667
format = bzrdir.format_registry.make_bzrdir(format)
2383
2670
def make_bzrdir(self, relpath, format=None):
2385
2672
# might be a relative or absolute path
2386
2673
maybe_a_url = self.get_url(relpath)
2387
2674
segments = maybe_a_url.rsplit('/', 1)
2388
t = get_transport(maybe_a_url)
2675
t = _mod_transport.get_transport(maybe_a_url)
2389
2676
if len(segments) > 1 and segments[-1] not in ('', '.'):
2390
2677
t.ensure_base()
2393
if isinstance(format, basestring):
2394
format = bzrdir.format_registry.make_bzrdir(format)
2678
format = self.resolve_format(format)
2395
2679
return format.initialize_on_transport(t)
2396
2680
except errors.UninitializableFormat:
2397
2681
raise TestSkipped("Format %s is not initializable." % format)
2399
def make_repository(self, relpath, shared=False, format=None):
2683
def make_repository(self, relpath, shared=None, format=None):
2400
2684
"""Create a repository on our default transport at relpath.
2402
2686
Note that relpath must be a relative path, not a full url.
2427
2714
test_home_dir = self.test_home_dir
2428
2715
if isinstance(test_home_dir, unicode):
2429
2716
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2430
os.environ['HOME'] = test_home_dir
2431
os.environ['BZR_HOME'] = test_home_dir
2717
self.overrideEnv('HOME', test_home_dir)
2718
self.overrideEnv('BZR_HOME', test_home_dir)
2433
2720
def setUp(self):
2434
2721
super(TestCaseWithMemoryTransport, self).setUp()
2722
# Ensure that ConnectedTransport doesn't leak sockets
2723
def get_transport_from_url_with_cleanup(*args, **kwargs):
2724
t = orig_get_transport_from_url(*args, **kwargs)
2725
if isinstance(t, _mod_transport.ConnectedTransport):
2726
self.addCleanup(t.disconnect)
2729
orig_get_transport_from_url = self.overrideAttr(
2730
_mod_transport, 'get_transport_from_url',
2731
get_transport_from_url_with_cleanup)
2435
2732
self._make_test_root()
2436
2733
self.addCleanup(os.chdir, os.getcwdu())
2437
2734
self.makeAndChdirToTestDir()
3059
class TestDecorator(TestSuite):
3372
class TestDecorator(TestUtil.TestSuite):
3060
3373
"""A decorator for TestCase/TestSuite objects.
3062
Usually, subclasses should override __iter__(used when flattening test
3063
suites), which we do to filter, reorder, parallelise and so on, run() and
3375
Contains rather than flattening suite passed on construction
3067
def __init__(self, suite):
3068
TestSuite.__init__(self)
3071
def countTestCases(self):
3074
cases += test.countTestCases()
3081
def run(self, result):
3082
# Use iteration on self, not self._tests, to allow subclasses to hook
3085
if result.shouldStop:
3378
def __init__(self, suite=None):
3379
super(TestDecorator, self).__init__()
3380
if suite is not None:
3383
# Don't need subclass run method with suite emptying
3384
run = unittest.TestSuite.run
3091
3387
class CountingDecorator(TestDecorator):
3102
3398
"""A decorator which excludes test matching an exclude pattern."""
3104
3400
def __init__(self, suite, exclude_pattern):
3105
TestDecorator.__init__(self, suite)
3106
self.exclude_pattern = exclude_pattern
3107
self.excluded = False
3111
return iter(self._tests)
3112
self.excluded = True
3113
suite = exclude_tests_by_re(self, self.exclude_pattern)
3115
self.addTests(suite)
3116
return iter(self._tests)
3401
super(ExcludeDecorator, self).__init__(
3402
exclude_tests_by_re(suite, exclude_pattern))
3119
3405
class FilterTestsDecorator(TestDecorator):
3120
3406
"""A decorator which filters tests to those matching a pattern."""
3122
3408
def __init__(self, suite, pattern):
3123
TestDecorator.__init__(self, suite)
3124
self.pattern = pattern
3125
self.filtered = False
3129
return iter(self._tests)
3130
self.filtered = True
3131
suite = filter_suite_by_re(self, self.pattern)
3133
self.addTests(suite)
3134
return iter(self._tests)
3409
super(FilterTestsDecorator, self).__init__(
3410
filter_suite_by_re(suite, pattern))
3137
3413
class RandomDecorator(TestDecorator):
3138
3414
"""A decorator which randomises the order of its tests."""
3140
3416
def __init__(self, suite, random_seed, stream):
3141
TestDecorator.__init__(self, suite)
3142
self.random_seed = random_seed
3143
self.randomised = False
3144
self.stream = stream
3148
return iter(self._tests)
3149
self.randomised = True
3150
self.stream.write("Randomizing test order using seed %s\n\n" %
3151
(self.actual_seed()))
3417
random_seed = self.actual_seed(random_seed)
3418
stream.write("Randomizing test order using seed %s\n\n" %
3152
3420
# Initialise the random number generator.
3153
random.seed(self.actual_seed())
3154
suite = randomize_suite(self)
3156
self.addTests(suite)
3157
return iter(self._tests)
3421
random.seed(random_seed)
3422
super(RandomDecorator, self).__init__(randomize_suite(suite))
3159
def actual_seed(self):
3160
if self.random_seed == "now":
3425
def actual_seed(seed):
3161
3427
# We convert the seed to a long to make it reuseable across
3162
3428
# invocations (because the user can reenter it).
3163
self.random_seed = long(time.time())
3429
return long(time.time())
3165
3431
# Convert the seed to a long if we can
3167
self.random_seed = long(self.random_seed)
3434
except (TypeError, ValueError):
3170
return self.random_seed
3173
3439
class TestFirstDecorator(TestDecorator):
3174
3440
"""A decorator which moves named tests to the front."""
3176
3442
def __init__(self, suite, pattern):
3177
TestDecorator.__init__(self, suite)
3178
self.pattern = pattern
3179
self.filtered = False
3183
return iter(self._tests)
3184
self.filtered = True
3185
suites = split_suite_by_re(self, self.pattern)
3187
self.addTests(suites)
3188
return iter(self._tests)
3443
super(TestFirstDecorator, self).__init__()
3444
self.addTests(split_suite_by_re(suite, pattern))
3191
3447
def partition_tests(suite, count):
3192
3448
"""Partition suite into count lists of tests."""
3194
tests = list(iter_suite_tests(suite))
3195
tests_per_process = int(math.ceil(float(len(tests)) / count))
3196
for block in range(count):
3197
low_test = block * tests_per_process
3198
high_test = low_test + tests_per_process
3199
process_tests = tests[low_test:high_test]
3200
result.append(process_tests)
3449
# This just assigns tests in a round-robin fashion. On one hand this
3450
# splits up blocks of related tests that might run faster if they shared
3451
# resources, but on the other it avoids assigning blocks of slow tests to
3452
# just one partition. So the slowest partition shouldn't be much slower
3454
partitions = [list() for i in range(count)]
3455
tests = iter_suite_tests(suite)
3456
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3457
partition.append(test)
3204
3461
def workaround_zealous_crypto_random():
3325
class ForwardingResult(unittest.TestResult):
3327
def __init__(self, target):
3328
unittest.TestResult.__init__(self)
3329
self.result = target
3331
def startTest(self, test):
3332
self.result.startTest(test)
3334
def stopTest(self, test):
3335
self.result.stopTest(test)
3337
def startTestRun(self):
3338
self.result.startTestRun()
3340
def stopTestRun(self):
3341
self.result.stopTestRun()
3343
def addSkip(self, test, reason):
3344
self.result.addSkip(test, reason)
3346
def addSuccess(self, test):
3347
self.result.addSuccess(test)
3349
def addError(self, test, err):
3350
self.result.addError(test, err)
3352
def addFailure(self, test, err):
3353
self.result.addFailure(test, err)
3354
ForwardingResult = testtools.ExtendedToOriginalDecorator
3357
class ProfileResult(ForwardingResult):
3590
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3358
3591
"""Generate profiling data for all activity between start and success.
3360
3593
The profile data is appended to the test's _benchcalls attribute and can
3648
3892
'bzrlib.tests.per_repository',
3649
3893
'bzrlib.tests.per_repository_chk',
3650
3894
'bzrlib.tests.per_repository_reference',
3895
'bzrlib.tests.per_repository_vf',
3651
3896
'bzrlib.tests.per_uifactory',
3652
3897
'bzrlib.tests.per_versionedfile',
3653
3898
'bzrlib.tests.per_workingtree',
3654
3899
'bzrlib.tests.test__annotator',
3655
3900
'bzrlib.tests.test__bencode',
3901
'bzrlib.tests.test__btree_serializer',
3656
3902
'bzrlib.tests.test__chk_map',
3657
3903
'bzrlib.tests.test__dirstate_helpers',
3658
3904
'bzrlib.tests.test__groupcompress',
3686
3932
'bzrlib.tests.test_commit_merge',
3687
3933
'bzrlib.tests.test_config',
3688
3934
'bzrlib.tests.test_conflicts',
3935
'bzrlib.tests.test_controldir',
3689
3936
'bzrlib.tests.test_counted_lock',
3690
3937
'bzrlib.tests.test_crash',
3691
3938
'bzrlib.tests.test_decorators',
3692
3939
'bzrlib.tests.test_delta',
3693
3940
'bzrlib.tests.test_debug',
3694
'bzrlib.tests.test_deprecated_graph',
3695
3941
'bzrlib.tests.test_diff',
3696
3942
'bzrlib.tests.test_directory_service',
3697
3943
'bzrlib.tests.test_dirstate',
3698
3944
'bzrlib.tests.test_email_message',
3699
3945
'bzrlib.tests.test_eol_filters',
3700
3946
'bzrlib.tests.test_errors',
3947
'bzrlib.tests.test_estimate_compressed_size',
3701
3948
'bzrlib.tests.test_export',
3949
'bzrlib.tests.test_export_pot',
3702
3950
'bzrlib.tests.test_extract',
3951
'bzrlib.tests.test_features',
3703
3952
'bzrlib.tests.test_fetch',
3953
'bzrlib.tests.test_fixtures',
3704
3954
'bzrlib.tests.test_fifo_cache',
3705
3955
'bzrlib.tests.test_filters',
3956
'bzrlib.tests.test_filter_tree',
3706
3957
'bzrlib.tests.test_ftp_transport',
3707
3958
'bzrlib.tests.test_foreign',
3708
3959
'bzrlib.tests.test_generate_docs',
3938
4202
# Some tests mentioned in the list are not in the test suite. The
3939
4203
# list may be out of date, report to the tester.
3940
4204
for id in not_found:
3941
bzrlib.trace.warning('"%s" not found in the test suite', id)
4205
trace.warning('"%s" not found in the test suite', id)
3942
4206
for id in duplicates:
3943
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
4207
trace.warning('"%s" is used as an id by several tests', id)
3948
def multiply_scenarios(scenarios_left, scenarios_right):
4212
def multiply_scenarios(*scenarios):
4213
"""Multiply two or more iterables of scenarios.
4215
It is safe to pass scenario generators or iterators.
4217
:returns: A list of compound scenarios: the cross-product of all
4218
scenarios, with the names concatenated and the parameters
4221
return reduce(_multiply_two_scenarios, map(list, scenarios))
4224
def _multiply_two_scenarios(scenarios_left, scenarios_right):
3949
4225
"""Multiply two sets of scenarios.
3951
4227
:returns: the cartesian product of the two sets of scenarios, that is
4102
4391
if test_id != None:
4103
4392
ui.ui_factory.clear_term()
4104
4393
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4394
# Ugly, but the last thing we want here is fail, so bear with it.
4395
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4396
).encode('ascii', 'replace')
4105
4397
sys.stderr.write('Unable to remove testing dir %s\n%s'
4106
% (os.path.basename(dirname), e))
4109
class Feature(object):
4110
"""An operating system Feature."""
4113
self._available = None
4115
def available(self):
4116
"""Is the feature available?
4118
:return: True if the feature is available.
4120
if self._available is None:
4121
self._available = self._probe()
4122
return self._available
4125
"""Implement this method in concrete features.
4127
:return: True if the feature is available.
4129
raise NotImplementedError
4132
if getattr(self, 'feature_name', None):
4133
return self.feature_name()
4134
return self.__class__.__name__
4137
class _SymlinkFeature(Feature):
4140
return osutils.has_symlinks()
4142
def feature_name(self):
4145
SymlinkFeature = _SymlinkFeature()
4148
class _HardlinkFeature(Feature):
4151
return osutils.has_hardlinks()
4153
def feature_name(self):
4156
HardlinkFeature = _HardlinkFeature()
4159
class _OsFifoFeature(Feature):
4162
return getattr(os, 'mkfifo', None)
4164
def feature_name(self):
4165
return 'filesystem fifos'
4167
OsFifoFeature = _OsFifoFeature()
4170
class _UnicodeFilenameFeature(Feature):
4171
"""Does the filesystem support Unicode filenames?"""
4175
# Check for character combinations unlikely to be covered by any
4176
# single non-unicode encoding. We use the characters
4177
# - greek small letter alpha (U+03B1) and
4178
# - braille pattern dots-123456 (U+283F).
4179
os.stat(u'\u03b1\u283f')
4180
except UnicodeEncodeError:
4182
except (IOError, OSError):
4183
# The filesystem allows the Unicode filename but the file doesn't
4187
# The filesystem allows the Unicode filename and the file exists,
4191
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4194
class _CompatabilityThunkFeature(Feature):
4195
"""This feature is just a thunk to another feature.
4197
It issues a deprecation warning if it is accessed, to let you know that you
4198
should really use a different feature.
4201
def __init__(self, dep_version, module, name,
4202
replacement_name, replacement_module=None):
4203
super(_CompatabilityThunkFeature, self).__init__()
4204
self._module = module
4205
if replacement_module is None:
4206
replacement_module = module
4207
self._replacement_module = replacement_module
4209
self._replacement_name = replacement_name
4210
self._dep_version = dep_version
4211
self._feature = None
4214
if self._feature is None:
4215
depr_msg = self._dep_version % ('%s.%s'
4216
% (self._module, self._name))
4217
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4218
self._replacement_name)
4219
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4220
# Import the new feature and use it as a replacement for the
4222
mod = __import__(self._replacement_module, {}, {},
4223
[self._replacement_name])
4224
self._feature = getattr(mod, self._replacement_name)
4228
return self._feature._probe()
4231
class ModuleAvailableFeature(Feature):
4232
"""This is a feature than describes a module we want to be available.
4234
Declare the name of the module in __init__(), and then after probing, the
4235
module will be available as 'self.module'.
4237
:ivar module: The module if it is available, else None.
4240
def __init__(self, module_name):
4241
super(ModuleAvailableFeature, self).__init__()
4242
self.module_name = module_name
4246
self._module = __import__(self.module_name, {}, {}, [''])
4253
if self.available(): # Make sure the probe has been done
4257
def feature_name(self):
4258
return self.module_name
4261
# This is kept here for compatibility, it is recommended to use
4262
# 'bzrlib.tests.feature.paramiko' instead
4263
ParamikoFeature = _CompatabilityThunkFeature(
4264
deprecated_in((2,1,0)),
4265
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4398
% (os.path.basename(dirname), printable_e))
4268
4401
def probe_unicode_in_user_encoding():
4301
class _HTTPSServerFeature(Feature):
4302
"""Some tests want an https Server, check if one is available.
4304
Right now, the only way this is available is under python2.6 which provides
4315
def feature_name(self):
4316
return 'HTTPSServer'
4319
HTTPSServerFeature = _HTTPSServerFeature()
4322
class _UnicodeFilename(Feature):
4323
"""Does the filesystem support Unicode filenames?"""
4328
except UnicodeEncodeError:
4330
except (IOError, OSError):
4331
# The filesystem allows the Unicode filename but the file doesn't
4335
# The filesystem allows the Unicode filename and the file exists,
4339
UnicodeFilename = _UnicodeFilename()
4342
class _UTF8Filesystem(Feature):
4343
"""Is the filesystem UTF-8?"""
4346
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4350
UTF8Filesystem = _UTF8Filesystem()
4353
class _BreakinFeature(Feature):
4354
"""Does this platform support the breakin feature?"""
4357
from bzrlib import breakin
4358
if breakin.determine_signal() is None:
4360
if sys.platform == 'win32':
4361
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4362
# We trigger SIGBREAK via a Console api so we need ctypes to
4363
# access the function
4370
def feature_name(self):
4371
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4374
BreakinFeature = _BreakinFeature()
4377
class _CaseInsCasePresFilenameFeature(Feature):
4378
"""Is the file-system case insensitive, but case-preserving?"""
4381
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4383
# first check truly case-preserving for created files, then check
4384
# case insensitive when opening existing files.
4385
name = osutils.normpath(name)
4386
base, rel = osutils.split(name)
4387
found_rel = osutils.canonical_relpath(base, name)
4388
return (found_rel == rel
4389
and os.path.isfile(name.upper())
4390
and os.path.isfile(name.lower()))
4395
def feature_name(self):
4396
return "case-insensitive case-preserving filesystem"
4398
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4401
class _CaseInsensitiveFilesystemFeature(Feature):
4402
"""Check if underlying filesystem is case-insensitive but *not* case
4405
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4406
# more likely to be case preserving, so this case is rare.
4409
if CaseInsCasePresFilenameFeature.available():
4412
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4413
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4414
TestCaseWithMemoryTransport.TEST_ROOT = root
4416
root = TestCaseWithMemoryTransport.TEST_ROOT
4417
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4419
name_a = osutils.pathjoin(tdir, 'a')
4420
name_A = osutils.pathjoin(tdir, 'A')
4422
result = osutils.isdir(name_A)
4423
_rmtree_temp_dir(tdir)
4426
def feature_name(self):
4427
return 'case-insensitive filesystem'
4429
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4432
class _CaseSensitiveFilesystemFeature(Feature):
4435
if CaseInsCasePresFilenameFeature.available():
4437
elif CaseInsensitiveFilesystemFeature.available():
4442
def feature_name(self):
4443
return 'case-sensitive filesystem'
4445
# new coding style is for feature instances to be lowercase
4446
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4449
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4450
SubUnitFeature = _CompatabilityThunkFeature(
4451
deprecated_in((2,1,0)),
4452
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4453
4434
# Only define SubUnitBzrRunner if subunit is available.
4455
4436
from subunit import TestProtocolClient
4456
4437
from subunit.test_results import AutoTimingTestResultDecorator
4438
class SubUnitBzrProtocolClient(TestProtocolClient):
4440
def stopTest(self, test):
4441
super(SubUnitBzrProtocolClient, self).stopTest(test)
4442
_clear__type_equality_funcs(test)
4444
def addSuccess(self, test, details=None):
4445
# The subunit client always includes the details in the subunit
4446
# stream, but we don't want to include it in ours.
4447
if details is not None and 'log' in details:
4449
return super(SubUnitBzrProtocolClient, self).addSuccess(
4457
4452
class SubUnitBzrRunner(TextTestRunner):
4458
4453
def run(self, test):
4459
4454
result = AutoTimingTestResultDecorator(
4460
TestProtocolClient(self.stream))
4455
SubUnitBzrProtocolClient(self.stream))
4461
4456
test.run(result)
4463
4458
except ImportError:
4466
class _PosixPermissionsFeature(Feature):
4470
# create temporary file and check if specified perms are maintained.
4473
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4474
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4477
os.chmod(name, write_perms)
4479
read_perms = os.stat(name).st_mode & 0777
4481
return (write_perms == read_perms)
4483
return (os.name == 'posix') and has_perms()
4485
def feature_name(self):
4486
return 'POSIX permissions support'
4488
posix_permissions_feature = _PosixPermissionsFeature()
4462
@deprecated_function(deprecated_in((2, 5, 0)))
4463
def ModuleAvailableFeature(name):
4464
from bzrlib.tests import features
4465
return features.ModuleAvailableFeature(name)