55
51
# nb: check this before importing anything else from within it
56
52
_testtools_version = getattr(testtools, '__version__', ())
57
if _testtools_version < (0, 9, 2):
58
raise ImportError("need at least testtools 0.9.2: %s is %r"
53
if _testtools_version < (0, 9, 5):
54
raise ImportError("need at least testtools 0.9.5: %s is %r"
59
55
% (testtools.__file__, _testtools_version))
60
56
from testtools import content
62
59
from bzrlib import (
63
commands as _mod_commands,
73
plugin as _mod_plugin,
80
transport as _mod_transport,
80
import bzrlib.commands
81
import bzrlib.timestamp
83
import bzrlib.inventory
84
import bzrlib.iterablefile
87
84
import bzrlib.lsprof
88
85
except ImportError:
89
86
# lsprof not available
91
from bzrlib.merge import merge_inner
94
from bzrlib.smart import client, request, server
96
from bzrlib import symbol_versioning
88
from bzrlib.smart import client, request
89
from bzrlib.transport import (
97
93
from bzrlib.symbol_versioning import (
99
94
deprecated_function,
105
from bzrlib.transport import (
110
import bzrlib.transport
111
from bzrlib.trace import mutter, note
112
97
from bzrlib.tests import (
117
from bzrlib.tests.http_server import HttpServer
118
from bzrlib.tests.TestUtil import (
122
103
from bzrlib.ui import NullProgressView
123
104
from bzrlib.ui.text import TextUIFactory
124
import bzrlib.version_info_formats.format_custom
125
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
105
from bzrlib.tests.features import _CompatabilityThunkFeature
127
107
# Mark this python module as being part of the implementation
128
108
# of unittest: this gives us better tracebacks where the last
140
120
SUBUNIT_SEEK_SET = 0
141
121
SUBUNIT_SEEK_CUR = 1
144
class ExtendedTestResult(unittest._TextTestResult):
123
# These are intentionally brought into this namespace. That way plugins, etc
124
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
125
TestSuite = TestUtil.TestSuite
126
TestLoader = TestUtil.TestLoader
128
# Tests should run in a clean and clearly defined environment. The goal is to
129
# keep them isolated from the running environment as mush as possible. The test
130
# framework ensures the variables defined below are set (or deleted if the
131
# value is None) before a test is run and reset to their original value after
132
# the test is run. Generally if some code depends on an environment variable,
133
# the tests should start without this variable in the environment. There are a
134
# few exceptions but you shouldn't violate this rule lightly.
138
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
139
# tests do check our impls match APPDATA
140
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
144
'BZREMAIL': None, # may still be present in the environment
145
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
146
'BZR_PROGRESS_BAR': None,
147
# This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
148
# as a base class instead of TestCaseInTempDir. Tests inheriting from
149
# TestCase should not use disk resources, BZR_LOG is one.
150
'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
151
'BZR_PLUGIN_PATH': None,
152
'BZR_DISABLE_PLUGINS': None,
153
'BZR_PLUGINS_AT': None,
154
'BZR_CONCURRENCY': None,
155
# Make sure that any text ui tests are consistent regardless of
156
# the environment the test case is run in; you may want tests that
157
# test other combinations. 'dumb' is a reasonable guess for tests
158
# going to a pipe or a StringIO.
164
'SSH_AUTH_SOCK': None,
174
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
175
# least. If you do (care), please update this comment
179
'BZR_REMOTE_PATH': None,
180
# Generally speaking, we don't want apport reporting on crashes in
181
# the test envirnoment unless we're specifically testing apport,
182
# so that it doesn't leak into the real system environment. We
183
# use an env var so it propagates to subprocesses.
184
'APPORT_DISABLE': '1',
188
def override_os_environ(test, env=None):
189
"""Modify os.environ keeping a copy.
191
:param test: A test instance
193
:param env: A dict containing variable definitions to be installed
196
env = isolated_environ
197
test._original_os_environ = dict([(var, value)
198
for var, value in os.environ.iteritems()])
199
for var, value in env.iteritems():
200
osutils.set_or_unset_env(var, value)
201
if var not in test._original_os_environ:
202
# The var is new, add it with a value of None, so
203
# restore_os_environ will delete it
204
test._original_os_environ[var] = None
207
def restore_os_environ(test):
208
"""Restore os.environ to its original state.
210
:param test: A test instance previously passed to override_os_environ.
212
for var, value in test._original_os_environ.iteritems():
213
# Restore the original value (or delete it if the value has been set to
214
# None in override_os_environ).
215
osutils.set_or_unset_env(var, value)
218
def _clear__type_equality_funcs(test):
219
"""Cleanup bound methods stored on TestCase instances
221
Clear the dict breaking a few (mostly) harmless cycles in the affected
222
unittests released with Python 2.6 and initial Python 2.7 versions.
224
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
225
shipped in Oneiric, an object with no clear method was used, hence the
226
extra complications, see bug 809048 for details.
228
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
229
if type_equality_funcs is not None:
230
tef_clear = getattr(type_equality_funcs, "clear", None)
231
if tef_clear is None:
232
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
233
if tef_instance_dict is not None:
234
tef_clear = tef_instance_dict.clear
235
if tef_clear is not None:
239
class ExtendedTestResult(testtools.TextTestResult):
145
240
"""Accepts, reports and accumulates the results of running tests.
147
242
Compared to the unittest version this class adds support for
218
318
if failed or errored: self.stream.write(", ")
219
319
self.stream.write("known_failure_count=%d" %
220
320
self.known_failure_count)
221
self.stream.writeln(")")
321
self.stream.write(")\n")
223
323
if self.known_failure_count:
224
self.stream.writeln("OK (known_failures=%d)" %
324
self.stream.write("OK (known_failures=%d)\n" %
225
325
self.known_failure_count)
227
self.stream.writeln("OK")
327
self.stream.write("OK\n")
228
328
if self.skip_count > 0:
229
329
skipped = self.skip_count
230
self.stream.writeln('%d test%s skipped' %
330
self.stream.write('%d test%s skipped\n' %
231
331
(skipped, skipped != 1 and "s" or ""))
232
332
if self.unsupported:
233
333
for feature, count in sorted(self.unsupported.items()):
234
self.stream.writeln("Missing feature '%s' skipped %d tests." %
334
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
235
335
(feature, count))
237
337
ok = self.wasStrictlySuccessful()
239
339
ok = self.wasSuccessful()
240
if TestCase._first_thread_leaker_id:
340
if self._first_thread_leaker_id:
241
341
self.stream.write(
242
342
'%s is leaking threads among %d leaking tests.\n' % (
243
TestCase._first_thread_leaker_id,
244
TestCase._leaking_threads_tests))
343
self._first_thread_leaker_id,
344
self._tests_leaking_threads_count))
245
345
# We don't report the main thread as an active one.
246
346
self.stream.write(
247
347
'%d non-main threads were left active in the end.\n'
248
% (TestCase._active_threads - 1))
348
% (len(self._active_threads) - 1))
250
350
def getDescription(self, test):
276
377
def _shortened_test_description(self, test):
278
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
379
what = re.sub(r'^bzrlib\.tests\.', '', what)
382
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
383
# multiple times in a row, because the handler is added for
384
# each test but the container list is shared between cases.
385
# See lp:498869 lp:625574 and lp:637725 for background.
386
def _record_traceback_from_test(self, exc_info):
387
"""Store the traceback from passed exc_info tuple till"""
388
self._traceback_from_test = exc_info[2]
281
390
def startTest(self, test):
282
unittest.TestResult.startTest(self, test)
391
super(ExtendedTestResult, self).startTest(test)
283
392
if self.count == 0:
284
393
self.startTests()
285
395
self.report_test_start(test)
286
396
test.number = self.count
287
397
self._recordTestStartTime()
398
# Make testtools cases give us the real traceback on failure
399
addOnException = getattr(test, "addOnException", None)
400
if addOnException is not None:
401
addOnException(self._record_traceback_from_test)
402
# Only check for thread leaks on bzrlib derived test cases
403
if isinstance(test, TestCase):
404
test.addCleanup(self._check_leaked_threads, test)
406
def stopTest(self, test):
407
super(ExtendedTestResult, self).stopTest(test)
408
# Manually break cycles, means touching various private things but hey
409
getDetails = getattr(test, "getDetails", None)
410
if getDetails is not None:
412
_clear__type_equality_funcs(test)
413
self._traceback_from_test = None
289
415
def startTests(self):
291
if getattr(sys, 'frozen', None) is None:
292
bzr_path = osutils.realpath(sys.argv[0])
294
bzr_path = sys.executable
296
'bzr selftest: %s\n' % (bzr_path,))
299
bzrlib.__path__[0],))
301
' bzr-%s python-%s %s\n' % (
302
bzrlib.version_string,
303
bzrlib._format_version_tuple(sys.version_info),
304
platform.platform(aliased=1),
306
self.stream.write('\n')
416
self.report_tests_starting()
417
self._active_threads = threading.enumerate()
419
def _check_leaked_threads(self, test):
420
"""See if any threads have leaked since last call
422
A sample of live threads is stored in the _active_threads attribute,
423
when this method runs it compares the current live threads and any not
424
in the previous sample are treated as having leaked.
426
now_active_threads = set(threading.enumerate())
427
threads_leaked = now_active_threads.difference(self._active_threads)
429
self._report_thread_leak(test, threads_leaked, now_active_threads)
430
self._tests_leaking_threads_count += 1
431
if self._first_thread_leaker_id is None:
432
self._first_thread_leaker_id = test.id()
433
self._active_threads = now_active_threads
308
435
def _recordTestStartTime(self):
309
436
"""Record that a test has started."""
310
self._start_time = time.time()
312
def _cleanupLogFile(self, test):
313
# We can only do this if we have one of our TestCases, not if
315
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
316
if setKeepLogfile is not None:
437
self._start_datetime = self._now()
319
439
def addError(self, test, err):
320
440
"""Tell result that test finished with an error.
356
474
self._formatTime(benchmark_time),
358
476
self.report_success(test)
359
self._cleanupLogFile(test)
360
unittest.TestResult.addSuccess(self, test)
477
super(ExtendedTestResult, self).addSuccess(test)
361
478
test._log_contents = ''
363
480
def addExpectedFailure(self, test, err):
364
481
self.known_failure_count += 1
365
482
self.report_known_failure(test, err)
484
def addUnexpectedSuccess(self, test, details=None):
485
"""Tell result the test unexpectedly passed, counting as a failure
487
When the minimum version of testtools required becomes 0.9.8 this
488
can be updated to use the new handling there.
490
super(ExtendedTestResult, self).addFailure(test, details=details)
491
self.failure_count += 1
492
self.report_unexpected_success(test,
493
"".join(details["reason"].iter_text()))
367
497
def addNotSupported(self, test, feature):
368
498
"""The test will not be run because of a missing feature.
401
536
raise errors.BzrError("Unknown whence %r" % whence)
403
def report_cleaning_up(self):
538
def report_tests_starting(self):
539
"""Display information before the test run begins"""
540
if getattr(sys, 'frozen', None) is None:
541
bzr_path = osutils.realpath(sys.argv[0])
543
bzr_path = sys.executable
545
'bzr selftest: %s\n' % (bzr_path,))
548
bzrlib.__path__[0],))
550
' bzr-%s python-%s %s\n' % (
551
bzrlib.version_string,
552
bzrlib._format_version_tuple(sys.version_info),
553
platform.platform(aliased=1),
555
self.stream.write('\n')
557
def report_test_start(self, test):
558
"""Display information on the test just about to be run"""
560
def _report_thread_leak(self, test, leaked_threads, active_threads):
561
"""Display information on a test that leaked one or more threads"""
562
# GZ 2010-09-09: A leak summary reported separately from the general
563
# thread debugging would be nice. Tests under subunit
564
# need something not using stream, perhaps adding a
565
# testtools details object would be fitting.
566
if 'threads' in selftest_debug_flags:
567
self.stream.write('%s is leaking, active is now %d\n' %
568
(test.id(), len(active_threads)))
406
570
def startTestRun(self):
407
571
self.startTime = time.time()
551
712
return '%s%s' % (indent, err[1])
553
714
def report_error(self, test, err):
554
self.stream.writeln('ERROR %s\n%s'
715
self.stream.write('ERROR %s\n%s\n'
555
716
% (self._testTimeString(test),
556
717
self._error_summary(err)))
558
719
def report_failure(self, test, err):
559
self.stream.writeln(' FAIL %s\n%s'
720
self.stream.write(' FAIL %s\n%s\n'
560
721
% (self._testTimeString(test),
561
722
self._error_summary(err)))
563
724
def report_known_failure(self, test, err):
564
self.stream.writeln('XFAIL %s\n%s'
725
self.stream.write('XFAIL %s\n%s\n'
565
726
% (self._testTimeString(test),
566
727
self._error_summary(err)))
729
def report_unexpected_success(self, test, reason):
730
self.stream.write(' FAIL %s\n%s: %s\n'
731
% (self._testTimeString(test),
732
"Unexpected success. Should have failed",
568
735
def report_success(self, test):
569
self.stream.writeln(' OK %s' % self._testTimeString(test))
736
self.stream.write(' OK %s\n' % self._testTimeString(test))
570
737
for bench_called, stats in getattr(test, '_benchcalls', []):
571
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
738
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
572
739
stats.pprint(file=self.stream)
573
740
# flush the stream so that we get smooth output. This verbose mode is
574
741
# used to show the output in PQM.
575
742
self.stream.flush()
577
744
def report_skip(self, test, reason):
578
self.stream.writeln(' SKIP %s\n%s'
745
self.stream.write(' SKIP %s\n%s\n'
579
746
% (self._testTimeString(test), reason))
581
748
def report_not_applicable(self, test, reason):
582
self.stream.writeln(' N/A %s\n %s'
749
self.stream.write(' N/A %s\n %s\n'
583
750
% (self._testTimeString(test), reason))
585
752
def report_unsupported(self, test, feature):
586
753
"""test cannot be run because feature is missing."""
587
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
754
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
588
755
%(self._testTimeString(test), feature))
827
1020
self._track_transports()
828
1021
self._track_locks()
829
1022
self._clear_debug_flags()
830
TestCase._active_threads = threading.activeCount()
831
self.addCleanup(self._check_leaked_threads)
1023
# Isolate global verbosity level, to make sure it's reproducible
1024
# between tests. We should get rid of this altogether: bug 656694. --
1026
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
1027
# Isolate config option expansion until its default value for bzrlib is
1028
# settled on or a the FIXME associated with _get_expand_default_value
1029
# is addressed -- vila 20110219
1030
self.overrideAttr(config, '_expand_default_value', None)
1031
self._log_files = set()
1032
# Each key in the ``_counters`` dict holds a value for a different
1033
# counter. When the test ends, addDetail() should be used to output the
1034
# counter values. This happens in install_counter_hook().
1036
if 'config_stats' in selftest_debug_flags:
1037
self._install_config_stats_hooks()
1038
# Do not use i18n for tests (unless the test reverses this)
833
1041
def debug(self):
834
1042
# debug a frame up.
836
pdb.Pdb().set_trace(sys._getframe().f_back)
838
def _check_leaked_threads(self):
839
active = threading.activeCount()
840
leaked_threads = active - TestCase._active_threads
841
TestCase._active_threads = active
842
# If some tests make the number of threads *decrease*, we'll consider
843
# that they are just observing old threads dieing, not agressively kill
844
# random threads. So we don't report these tests as leaking. The risk
845
# is that we have false positives that way (the test see 2 threads
846
# going away but leak one) but it seems less likely than the actual
847
# false positives (the test see threads going away and does not leak).
848
if leaked_threads > 0:
849
TestCase._leaking_threads_tests += 1
850
if TestCase._first_thread_leaker_id is None:
851
TestCase._first_thread_leaker_id = self.id()
1044
# The sys preserved stdin/stdout should allow blackbox tests debugging
1045
pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1046
).set_trace(sys._getframe().f_back)
1048
def discardDetail(self, name):
1049
"""Extend the addDetail, getDetails api so we can remove a detail.
1051
eg. bzr always adds the 'log' detail at startup, but we don't want to
1052
include it for skipped, xfail, etc tests.
1054
It is safe to call this for a detail that doesn't exist, in case this
1055
gets called multiple times.
1057
# We cheat. details is stored in __details which means we shouldn't
1058
# touch it. but getDetails() returns the dict directly, so we can
1060
details = self.getDetails()
1064
def install_counter_hook(self, hooks, name, counter_name=None):
1065
"""Install a counting hook.
1067
Any hook can be counted as long as it doesn't need to return a value.
1069
:param hooks: Where the hook should be installed.
1071
:param name: The hook name that will be counted.
1073
:param counter_name: The counter identifier in ``_counters``, defaults
1076
_counters = self._counters # Avoid closing over self
1077
if counter_name is None:
1079
if _counters.has_key(counter_name):
1080
raise AssertionError('%s is already used as a counter name'
1082
_counters[counter_name] = 0
1083
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1084
lambda: ['%d' % (_counters[counter_name],)]))
1085
def increment_counter(*args, **kwargs):
1086
_counters[counter_name] += 1
1087
label = 'count %s calls' % (counter_name,)
1088
hooks.install_named_hook(name, increment_counter, label)
1089
self.addCleanup(hooks.uninstall_named_hook, name, label)
1091
def _install_config_stats_hooks(self):
1092
"""Install config hooks to count hook calls.
1095
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1096
self.install_counter_hook(config.ConfigHooks, hook_name,
1097
'config.%s' % (hook_name,))
1099
# The OldConfigHooks are private and need special handling to protect
1100
# against recursive tests (tests that run other tests), so we just do
1101
# manually what registering them into _builtin_known_hooks will provide
1103
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1104
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1105
self.install_counter_hook(config.OldConfigHooks, hook_name,
1106
'old_config.%s' % (hook_name,))
853
1108
def _clear_debug_flags(self):
854
1109
"""Prevent externally set debug flags affecting tests.
1322
1590
self.assertEqual(expected_docstring, obj.__doc__)
1592
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1324
1593
def failUnlessExists(self, path):
1594
return self.assertPathExists(path)
1596
def assertPathExists(self, path):
1325
1597
"""Fail unless path or paths, which may be abs or relative, exist."""
1326
1598
if not isinstance(path, basestring):
1328
self.failUnlessExists(p)
1600
self.assertPathExists(p)
1330
self.failUnless(osutils.lexists(path),path+" does not exist")
1602
self.assertTrue(osutils.lexists(path),
1603
path + " does not exist")
1605
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1332
1606
def failIfExists(self, path):
1607
return self.assertPathDoesNotExist(path)
1609
def assertPathDoesNotExist(self, path):
1333
1610
"""Fail if path or paths, which may be abs or relative, exist."""
1334
1611
if not isinstance(path, basestring):
1336
self.failIfExists(p)
1613
self.assertPathDoesNotExist(p)
1338
self.failIf(osutils.lexists(path),path+" exists")
1615
self.assertFalse(osutils.lexists(path),
1340
1618
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1341
1619
"""A helper for callDeprecated and applyDeprecated.
1453
1732
def _startLogFile(self):
1454
"""Send bzr and test log messages to a temporary file.
1456
The file is removed as the test is torn down.
1458
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1459
self._log_file = os.fdopen(fileno, 'w+')
1460
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1461
self._log_file_name = name
1733
"""Setup a in-memory target for bzr and testcase log messages"""
1734
pseudo_log_file = StringIO()
1735
def _get_log_contents_for_weird_testtools_api():
1736
return [pseudo_log_file.getvalue().decode(
1737
"utf-8", "replace").encode("utf-8")]
1738
self.addDetail("log", content.Content(content.ContentType("text",
1739
"plain", {"charset": "utf8"}),
1740
_get_log_contents_for_weird_testtools_api))
1741
self._log_file = pseudo_log_file
1742
self._log_memento = trace.push_log_file(self._log_file)
1462
1743
self.addCleanup(self._finishLogFile)
1464
1745
def _finishLogFile(self):
1465
"""Finished with the log file.
1467
Close the file and delete it, unless setKeepLogfile was called.
1469
if bzrlib.trace._trace_file:
1746
"""Flush and dereference the in-memory log for this testcase"""
1747
if trace._trace_file:
1470
1748
# flush the log file, to get all content
1471
bzrlib.trace._trace_file.flush()
1472
bzrlib.trace.pop_log_file(self._log_memento)
1473
# Cache the log result and delete the file on disk
1474
self._get_log(False)
1749
trace._trace_file.flush()
1750
trace.pop_log_file(self._log_memento)
1751
# The logging module now tracks references for cleanup so discard ours
1752
del self._log_memento
1476
1754
def thisFailsStrictLockCheck(self):
1477
1755
"""It is known that this test would fail with -Dstrict_locks.
1513
1786
setattr(obj, attr_name, new)
1789
def overrideEnv(self, name, new):
1790
"""Set an environment variable, and reset it after the test.
1792
:param name: The environment variable name.
1794
:param new: The value to set the variable to. If None, the
1795
variable is deleted from the environment.
1797
:returns: The actual variable value.
1799
value = osutils.set_or_unset_env(name, new)
1800
self.addCleanup(osutils.set_or_unset_env, name, value)
1803
def recordCalls(self, obj, attr_name):
1804
"""Monkeypatch in a wrapper that will record calls.
1806
The monkeypatch is automatically removed when the test concludes.
1808
:param obj: The namespace holding the reference to be replaced;
1809
typically a module, class, or object.
1810
:param attr_name: A string for the name of the attribute to
1812
:returns: A list that will be extended with one item every time the
1813
function is called, with a tuple of (args, kwargs).
1817
def decorator(*args, **kwargs):
1818
calls.append((args, kwargs))
1819
return orig(*args, **kwargs)
1820
orig = self.overrideAttr(obj, attr_name, decorator)
1516
1823
def _cleanEnvironment(self):
1518
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1519
'HOME': os.getcwd(),
1520
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1521
# tests do check our impls match APPDATA
1522
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1526
'BZREMAIL': None, # may still be present in the environment
1528
'BZR_PROGRESS_BAR': None,
1530
'BZR_PLUGIN_PATH': None,
1531
'BZR_DISABLE_PLUGINS': None,
1532
'BZR_PLUGINS_AT': None,
1533
'BZR_CONCURRENCY': None,
1534
# Make sure that any text ui tests are consistent regardless of
1535
# the environment the test case is run in; you may want tests that
1536
# test other combinations. 'dumb' is a reasonable guess for tests
1537
# going to a pipe or a StringIO.
1541
'BZR_COLUMNS': '80',
1543
'SSH_AUTH_SOCK': None,
1547
'https_proxy': None,
1548
'HTTPS_PROXY': None,
1553
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1554
# least. If you do (care), please update this comment
1558
'BZR_REMOTE_PATH': None,
1559
# Generally speaking, we don't want apport reporting on crashes in
1560
# the test envirnoment unless we're specifically testing apport,
1561
# so that it doesn't leak into the real system environment. We
1562
# use an env var so it propagates to subprocesses.
1563
'APPORT_DISABLE': '1',
1566
self.addCleanup(self._restoreEnvironment)
1567
for name, value in new_env.iteritems():
1568
self._captureVar(name, value)
1570
def _captureVar(self, name, newvalue):
1571
"""Set an environment variable, and reset it when finished."""
1572
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1574
def _restoreEnvironment(self):
1575
for name, value in self._old_env.iteritems():
1576
osutils.set_or_unset_env(name, value)
1824
for name, value in isolated_environ.iteritems():
1825
self.overrideEnv(name, value)
1578
1827
def _restoreHooks(self):
1579
1828
for klass, (name, hooks) in self._preserved_hooks.items():
1580
1829
setattr(klass, name, hooks)
1830
self._preserved_hooks.clear()
1831
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1832
self._preserved_lazy_hooks.clear()
1582
1834
def knownFailure(self, reason):
1583
"""This test has failed for some known reason."""
1584
raise KnownFailure(reason)
1835
"""Declare that this test fails for a known reason
1837
Tests that are known to fail should generally be using expectedFailure
1838
with an appropriate reverse assertion if a change could cause the test
1839
to start passing. Conversely if the test has no immediate prospect of
1840
succeeding then using skip is more suitable.
1842
When this method is called while an exception is being handled, that
1843
traceback will be used, otherwise a new exception will be thrown to
1844
provide one but won't be reported.
1846
self._add_reason(reason)
1848
exc_info = sys.exc_info()
1849
if exc_info != (None, None, None):
1850
self._report_traceback(exc_info)
1853
raise self.failureException(reason)
1854
except self.failureException:
1855
exc_info = sys.exc_info()
1856
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1857
raise testtools.testcase._ExpectedFailure(exc_info)
1861
def _suppress_log(self):
1862
"""Remove the log info from details."""
1863
self.discardDetail('log')
1586
1865
def _do_skip(self, result, reason):
1866
self._suppress_log()
1587
1867
addSkip = getattr(result, 'addSkip', None)
1588
1868
if not callable(addSkip):
1589
1869
result.addSuccess(result)
1645
1948
self._benchtime += time.time() - start
1647
1950
def log(self, *args):
1650
def _get_log(self, keep_log_file=False):
1651
"""Internal helper to get the log from bzrlib.trace for this test.
1653
Please use self.getDetails, or self.get_log to access this in test case
1656
:param keep_log_file: When True, if the log is still a file on disk
1657
leave it as a file on disk. When False, if the log is still a file
1658
on disk, the log file is deleted and the log preserved as
1660
:return: A string containing the log.
1662
if self._log_contents is not None:
1664
self._log_contents.decode('utf8')
1665
except UnicodeDecodeError:
1666
unicodestr = self._log_contents.decode('utf8', 'replace')
1667
self._log_contents = unicodestr.encode('utf8')
1668
return self._log_contents
1670
if bzrlib.trace._trace_file:
1671
# flush the log file, to get all content
1672
bzrlib.trace._trace_file.flush()
1673
if self._log_file_name is not None:
1674
logfile = open(self._log_file_name)
1676
log_contents = logfile.read()
1680
log_contents.decode('utf8')
1681
except UnicodeDecodeError:
1682
unicodestr = log_contents.decode('utf8', 'replace')
1683
log_contents = unicodestr.encode('utf8')
1684
if not keep_log_file:
1686
max_close_attempts = 100
1687
first_close_error = None
1688
while close_attempts < max_close_attempts:
1691
self._log_file.close()
1692
except IOError, ioe:
1693
if ioe.errno is None:
1694
# No errno implies 'close() called during
1695
# concurrent operation on the same file object', so
1696
# retry. Probably a thread is trying to write to
1698
if first_close_error is None:
1699
first_close_error = ioe
1704
if close_attempts > 1:
1706
'Unable to close log file on first attempt, '
1707
'will retry: %s\n' % (first_close_error,))
1708
if close_attempts == max_close_attempts:
1710
'Unable to close log file after %d attempts.\n'
1711
% (max_close_attempts,))
1712
self._log_file = None
1713
# Permit multiple calls to get_log until we clean it up in
1715
self._log_contents = log_contents
1717
os.remove(self._log_file_name)
1719
if sys.platform == 'win32' and e.errno == errno.EACCES:
1720
sys.stderr.write(('Unable to delete log file '
1721
' %r\n' % self._log_file_name))
1724
self._log_file_name = None
1727
return "No log file content and no log file name."
1729
1953
def get_log(self):
1730
1954
"""Get a unicode string containing the log from bzrlib.trace.
1945
2170
variables. A value of None will unset the env variable.
1946
2171
The values must be strings. The change will only occur in the
1947
2172
child, so you don't need to fix the environment after running.
1948
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
2173
:param skip_if_plan_to_signal: raise TestSkipped when true and system
2174
doesn't support signalling subprocesses.
1950
2175
:param allow_plugins: If False (default) pass --no-plugins to bzr.
2176
:param stderr: file to use for the subprocess's stderr. Valid values
2177
are those valid for the stderr argument of `subprocess.Popen`.
2178
Default value is ``subprocess.PIPE``.
1952
2180
:returns: Popen object for the started process.
1954
2182
if skip_if_plan_to_signal:
1955
if not getattr(os, 'kill', None):
1956
raise TestSkipped("os.kill not available.")
2183
if os.name != "posix":
2184
raise TestSkipped("Sending signals not supported")
1958
2186
if env_changes is None:
1959
2187
env_changes = {}
2188
# Because $HOME is set to a tempdir for the context of a test, modules
2189
# installed in the user dir will not be found unless $PYTHONUSERBASE
2190
# gets set to the computed directory of this parent process.
2191
if site.USER_BASE is not None:
2192
env_changes["PYTHONUSERBASE"] = site.USER_BASE
1962
2195
def cleanup_environment():
2235
def _add_subprocess_log(self, log_file_path):
2236
if len(self._log_files) == 0:
2237
# Register an addCleanup func. We do this on the first call to
2238
# _add_subprocess_log rather than in TestCase.setUp so that this
2239
# addCleanup is registered after any cleanups for tempdirs that
2240
# subclasses might create, which will probably remove the log file
2242
self.addCleanup(self._subprocess_log_cleanup)
2243
# self._log_files is a set, so if a log file is reused we won't grab it
2245
self._log_files.add(log_file_path)
2247
def _subprocess_log_cleanup(self):
2248
for count, log_file_path in enumerate(self._log_files):
2249
# We use buffer_now=True to avoid holding the file open beyond
2250
# the life of this function, which might interfere with e.g.
2251
# cleaning tempdirs on Windows.
2252
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2253
#detail_content = content.content_from_file(
2254
# log_file_path, buffer_now=True)
2255
with open(log_file_path, 'rb') as log_file:
2256
log_file_bytes = log_file.read()
2257
detail_content = content.Content(content.ContentType("text",
2258
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2259
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
1997
2262
def _popen(self, *args, **kwargs):
1998
2263
"""Place a call to Popen.
2000
2265
Allows tests to override this method to intercept the calls made to
2001
2266
Popen for introspection.
2003
return Popen(*args, **kwargs)
2268
return subprocess.Popen(*args, **kwargs)
2005
2270
def get_source_path(self):
2006
2271
"""Return the path of the directory containing bzrlib."""
2036
2301
if retcode is not None and retcode != process.returncode:
2037
2302
if process_args is None:
2038
2303
process_args = "(unknown args)"
2039
mutter('Output of bzr %s:\n%s', process_args, out)
2040
mutter('Error for bzr %s:\n%s', process_args, err)
2304
trace.mutter('Output of bzr %s:\n%s', process_args, out)
2305
trace.mutter('Error for bzr %s:\n%s', process_args, err)
2041
2306
self.fail('Command bzr %s failed with retcode %s != %s'
2042
2307
% (process_args, retcode, process.returncode))
2043
2308
return [out, err]
2045
def check_inventory_shape(self, inv, shape):
2046
"""Compare an inventory to a list of expected names.
2310
def check_tree_shape(self, tree, shape):
2311
"""Compare a tree to a list of expected names.
2048
2313
Fail if they are not precisely equal.
2051
2316
shape = list(shape) # copy
2052
for path, ie in inv.entries():
2317
for path, ie in tree.iter_entries_by_dir():
2053
2318
name = path.replace('\\', '/')
2054
2319
if ie.kind == 'directory':
2055
2320
name = name + '/'
2322
pass # ignore root entry
2057
2324
shape.remove(name)
2059
2326
extras.append(name)
2149
2418
class TestCaseWithMemoryTransport(TestCase):
2150
2419
"""Common test class for tests that do not need disk resources.
2152
Tests that need disk resources should derive from TestCaseWithTransport.
2421
Tests that need disk resources should derive from TestCaseInTempDir
2422
orTestCaseWithTransport.
2154
2424
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2156
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2426
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2157
2427
a directory which does not exist. This serves to help ensure test isolation
2158
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2159
must exist. However, TestCaseWithMemoryTransport does not offer local
2160
file defaults for the transport in tests, nor does it obey the command line
2428
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2429
must exist. However, TestCaseWithMemoryTransport does not offer local file
2430
defaults for the transport in tests, nor does it obey the command line
2161
2431
override, so tests that accidentally write to the common directory should
2164
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2165
a .bzr directory that stops us ascending higher into the filesystem.
2434
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2435
``.bzr`` directory that stops us ascending higher into the filesystem.
2168
2438
TEST_ROOT = None
2378
2659
def make_branch(self, relpath, format=None):
2379
2660
"""Create a branch on the transport at relpath."""
2380
2661
repo = self.make_repository(relpath, format=format)
2381
return repo.bzrdir.create_branch()
2662
return repo.bzrdir.create_branch(append_revisions_only=False)
2664
def get_default_format(self):
2667
def resolve_format(self, format):
2668
"""Resolve an object to a ControlDir format object.
2670
The initial format object can either already be
2671
a ControlDirFormat, None (for the default format),
2672
or a string with the name of the control dir format.
2674
:param format: Object to resolve
2675
:return A ControlDirFormat instance
2678
format = self.get_default_format()
2679
if isinstance(format, basestring):
2680
format = bzrdir.format_registry.make_bzrdir(format)
2383
2683
def make_bzrdir(self, relpath, format=None):
2385
2685
# might be a relative or absolute path
2386
2686
maybe_a_url = self.get_url(relpath)
2387
2687
segments = maybe_a_url.rsplit('/', 1)
2388
t = get_transport(maybe_a_url)
2688
t = _mod_transport.get_transport(maybe_a_url)
2389
2689
if len(segments) > 1 and segments[-1] not in ('', '.'):
2390
2690
t.ensure_base()
2393
if isinstance(format, basestring):
2394
format = bzrdir.format_registry.make_bzrdir(format)
2691
format = self.resolve_format(format)
2395
2692
return format.initialize_on_transport(t)
2396
2693
except errors.UninitializableFormat:
2397
2694
raise TestSkipped("Format %s is not initializable." % format)
2399
def make_repository(self, relpath, shared=False, format=None):
2696
def make_repository(self, relpath, shared=None, format=None):
2400
2697
"""Create a repository on our default transport at relpath.
2402
2699
Note that relpath must be a relative path, not a full url.
3101
3414
"""A decorator which excludes test matching an exclude pattern."""
3103
3416
def __init__(self, suite, exclude_pattern):
3104
TestDecorator.__init__(self, suite)
3105
self.exclude_pattern = exclude_pattern
3106
self.excluded = False
3110
return iter(self._tests)
3111
self.excluded = True
3112
suite = exclude_tests_by_re(self, self.exclude_pattern)
3114
self.addTests(suite)
3115
return iter(self._tests)
3417
super(ExcludeDecorator, self).__init__(
3418
exclude_tests_by_re(suite, exclude_pattern))
3118
3421
class FilterTestsDecorator(TestDecorator):
3119
3422
"""A decorator which filters tests to those matching a pattern."""
3121
3424
def __init__(self, suite, pattern):
3122
TestDecorator.__init__(self, suite)
3123
self.pattern = pattern
3124
self.filtered = False
3128
return iter(self._tests)
3129
self.filtered = True
3130
suite = filter_suite_by_re(self, self.pattern)
3132
self.addTests(suite)
3133
return iter(self._tests)
3425
super(FilterTestsDecorator, self).__init__(
3426
filter_suite_by_re(suite, pattern))
3136
3429
class RandomDecorator(TestDecorator):
3137
3430
"""A decorator which randomises the order of its tests."""
3139
3432
def __init__(self, suite, random_seed, stream):
3140
TestDecorator.__init__(self, suite)
3141
self.random_seed = random_seed
3142
self.randomised = False
3143
self.stream = stream
3147
return iter(self._tests)
3148
self.randomised = True
3149
self.stream.write("Randomizing test order using seed %s\n\n" %
3150
(self.actual_seed()))
3433
random_seed = self.actual_seed(random_seed)
3434
stream.write("Randomizing test order using seed %s\n\n" %
3151
3436
# Initialise the random number generator.
3152
random.seed(self.actual_seed())
3153
suite = randomize_suite(self)
3155
self.addTests(suite)
3156
return iter(self._tests)
3437
random.seed(random_seed)
3438
super(RandomDecorator, self).__init__(randomize_suite(suite))
3158
def actual_seed(self):
3159
if self.random_seed == "now":
3441
def actual_seed(seed):
3160
3443
# We convert the seed to a long to make it reuseable across
3161
3444
# invocations (because the user can reenter it).
3162
self.random_seed = long(time.time())
3445
return long(time.time())
3164
3447
# Convert the seed to a long if we can
3166
self.random_seed = long(self.random_seed)
3450
except (TypeError, ValueError):
3169
return self.random_seed
3172
3455
class TestFirstDecorator(TestDecorator):
3173
3456
"""A decorator which moves named tests to the front."""
3175
3458
def __init__(self, suite, pattern):
3176
TestDecorator.__init__(self, suite)
3177
self.pattern = pattern
3178
self.filtered = False
3182
return iter(self._tests)
3183
self.filtered = True
3184
suites = split_suite_by_re(self, self.pattern)
3186
self.addTests(suites)
3187
return iter(self._tests)
3459
super(TestFirstDecorator, self).__init__()
3460
self.addTests(split_suite_by_re(suite, pattern))
3190
3463
def partition_tests(suite, count):
3191
3464
"""Partition suite into count lists of tests."""
3193
tests = list(iter_suite_tests(suite))
3194
tests_per_process = int(math.ceil(float(len(tests)) / count))
3195
for block in range(count):
3196
low_test = block * tests_per_process
3197
high_test = low_test + tests_per_process
3198
process_tests = tests[low_test:high_test]
3199
result.append(process_tests)
3465
# This just assigns tests in a round-robin fashion. On one hand this
3466
# splits up blocks of related tests that might run faster if they shared
3467
# resources, but on the other it avoids assigning blocks of slow tests to
3468
# just one partition. So the slowest partition shouldn't be much slower
3470
partitions = [list() for i in range(count)]
3471
tests = iter_suite_tests(suite)
3472
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3473
partition.append(test)
3203
3477
def workaround_zealous_crypto_random():
3234
3508
ProtocolTestCase.run(self, result)
3236
os.waitpid(self.pid, 0)
3510
pid, status = os.waitpid(self.pid, 0)
3511
# GZ 2011-10-18: If status is nonzero, should report to the result
3512
# that something went wrong.
3238
3514
test_blocks = partition_tests(suite, concurrency)
3515
# Clear the tests from the original suite so it doesn't keep them alive
3516
suite._tests[:] = []
3239
3517
for process_tests in test_blocks:
3240
process_suite = TestSuite()
3241
process_suite.addTests(process_tests)
3518
process_suite = TestUtil.TestSuite(process_tests)
3519
# Also clear each split list so new suite has only reference
3520
process_tests[:] = []
3242
3521
c2pread, c2pwrite = os.pipe()
3243
3522
pid = os.fork()
3245
workaround_zealous_crypto_random()
3525
stream = os.fdopen(c2pwrite, 'wb', 1)
3526
workaround_zealous_crypto_random()
3247
3527
os.close(c2pread)
3248
3528
# Leave stderr and stdout open so we can see test noise
3249
3529
# Close stdin so that the child goes away if it decides to
3250
3530
# read from stdin (otherwise its a roulette to see what
3251
3531
# child actually gets keystrokes for pdb etc).
3252
3532
sys.stdin.close()
3254
stream = os.fdopen(c2pwrite, 'wb', 1)
3255
3533
subunit_result = AutoTimingTestResultDecorator(
3256
TestProtocolClient(stream))
3534
SubUnitBzrProtocolClient(stream))
3257
3535
process_suite.run(subunit_result)
3537
# Try and report traceback on stream, but exit with error even
3538
# if stream couldn't be created or something else goes wrong.
3539
# The traceback is formatted to a string and written in one go
3540
# to avoid interleaving lines from multiple failing children.
3542
stream.write(traceback.format_exc())
3261
3547
os.close(c2pwrite)
3262
3548
stream = os.fdopen(c2pread, 'rb', 1)
3324
class ForwardingResult(unittest.TestResult):
3326
def __init__(self, target):
3327
unittest.TestResult.__init__(self)
3328
self.result = target
3330
def startTest(self, test):
3331
self.result.startTest(test)
3333
def stopTest(self, test):
3334
self.result.stopTest(test)
3336
def startTestRun(self):
3337
self.result.startTestRun()
3339
def stopTestRun(self):
3340
self.result.stopTestRun()
3342
def addSkip(self, test, reason):
3343
self.result.addSkip(test, reason)
3345
def addSuccess(self, test):
3346
self.result.addSuccess(test)
3348
def addError(self, test, err):
3349
self.result.addError(test, err)
3351
def addFailure(self, test, err):
3352
self.result.addFailure(test, err)
3353
ForwardingResult = testtools.ExtendedToOriginalDecorator
3356
class ProfileResult(ForwardingResult):
3612
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3357
3613
"""Generate profiling data for all activity between start and success.
3359
3615
The profile data is appended to the test's _benchcalls attribute and can
3685
3954
'bzrlib.tests.test_commit_merge',
3686
3955
'bzrlib.tests.test_config',
3687
3956
'bzrlib.tests.test_conflicts',
3957
'bzrlib.tests.test_controldir',
3688
3958
'bzrlib.tests.test_counted_lock',
3689
3959
'bzrlib.tests.test_crash',
3690
3960
'bzrlib.tests.test_decorators',
3691
3961
'bzrlib.tests.test_delta',
3692
3962
'bzrlib.tests.test_debug',
3693
'bzrlib.tests.test_deprecated_graph',
3694
3963
'bzrlib.tests.test_diff',
3695
3964
'bzrlib.tests.test_directory_service',
3696
3965
'bzrlib.tests.test_dirstate',
3697
3966
'bzrlib.tests.test_email_message',
3698
3967
'bzrlib.tests.test_eol_filters',
3699
3968
'bzrlib.tests.test_errors',
3969
'bzrlib.tests.test_estimate_compressed_size',
3700
3970
'bzrlib.tests.test_export',
3971
'bzrlib.tests.test_export_pot',
3701
3972
'bzrlib.tests.test_extract',
3973
'bzrlib.tests.test_features',
3702
3974
'bzrlib.tests.test_fetch',
3975
'bzrlib.tests.test_fixtures',
3703
3976
'bzrlib.tests.test_fifo_cache',
3704
3977
'bzrlib.tests.test_filters',
3978
'bzrlib.tests.test_filter_tree',
3705
3979
'bzrlib.tests.test_ftp_transport',
3706
3980
'bzrlib.tests.test_foreign',
3707
3981
'bzrlib.tests.test_generate_docs',
4100
4414
if test_id != None:
4101
4415
ui.ui_factory.clear_term()
4102
4416
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4417
# Ugly, but the last thing we want here is fail, so bear with it.
4418
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4419
).encode('ascii', 'replace')
4103
4420
sys.stderr.write('Unable to remove testing dir %s\n%s'
4104
% (os.path.basename(dirname), e))
4107
class Feature(object):
4108
"""An operating system Feature."""
4111
self._available = None
4113
def available(self):
4114
"""Is the feature available?
4116
:return: True if the feature is available.
4118
if self._available is None:
4119
self._available = self._probe()
4120
return self._available
4123
"""Implement this method in concrete features.
4125
:return: True if the feature is available.
4127
raise NotImplementedError
4130
if getattr(self, 'feature_name', None):
4131
return self.feature_name()
4132
return self.__class__.__name__
4135
class _SymlinkFeature(Feature):
4138
return osutils.has_symlinks()
4140
def feature_name(self):
4143
SymlinkFeature = _SymlinkFeature()
4146
class _HardlinkFeature(Feature):
4149
return osutils.has_hardlinks()
4151
def feature_name(self):
4154
HardlinkFeature = _HardlinkFeature()
4157
class _OsFifoFeature(Feature):
4160
return getattr(os, 'mkfifo', None)
4162
def feature_name(self):
4163
return 'filesystem fifos'
4165
OsFifoFeature = _OsFifoFeature()
4168
class _UnicodeFilenameFeature(Feature):
4169
"""Does the filesystem support Unicode filenames?"""
4173
# Check for character combinations unlikely to be covered by any
4174
# single non-unicode encoding. We use the characters
4175
# - greek small letter alpha (U+03B1) and
4176
# - braille pattern dots-123456 (U+283F).
4177
os.stat(u'\u03b1\u283f')
4178
except UnicodeEncodeError:
4180
except (IOError, OSError):
4181
# The filesystem allows the Unicode filename but the file doesn't
4185
# The filesystem allows the Unicode filename and the file exists,
4189
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4192
class _CompatabilityThunkFeature(Feature):
4193
"""This feature is just a thunk to another feature.
4195
It issues a deprecation warning if it is accessed, to let you know that you
4196
should really use a different feature.
4199
def __init__(self, dep_version, module, name,
4200
replacement_name, replacement_module=None):
4201
super(_CompatabilityThunkFeature, self).__init__()
4202
self._module = module
4203
if replacement_module is None:
4204
replacement_module = module
4205
self._replacement_module = replacement_module
4207
self._replacement_name = replacement_name
4208
self._dep_version = dep_version
4209
self._feature = None
4212
if self._feature is None:
4213
depr_msg = self._dep_version % ('%s.%s'
4214
% (self._module, self._name))
4215
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4216
self._replacement_name)
4217
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4218
# Import the new feature and use it as a replacement for the
4220
mod = __import__(self._replacement_module, {}, {},
4221
[self._replacement_name])
4222
self._feature = getattr(mod, self._replacement_name)
4226
return self._feature._probe()
4229
class ModuleAvailableFeature(Feature):
4230
"""This is a feature than describes a module we want to be available.
4232
Declare the name of the module in __init__(), and then after probing, the
4233
module will be available as 'self.module'.
4235
:ivar module: The module if it is available, else None.
4238
def __init__(self, module_name):
4239
super(ModuleAvailableFeature, self).__init__()
4240
self.module_name = module_name
4244
self._module = __import__(self.module_name, {}, {}, [''])
4251
if self.available(): # Make sure the probe has been done
4255
def feature_name(self):
4256
return self.module_name
4259
# This is kept here for compatibility, it is recommended to use
4260
# 'bzrlib.tests.feature.paramiko' instead
4261
ParamikoFeature = _CompatabilityThunkFeature(
4262
deprecated_in((2,1,0)),
4263
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4421
% (os.path.basename(dirname), printable_e))
4266
4424
def probe_unicode_in_user_encoding():
4299
class _HTTPSServerFeature(Feature):
4300
"""Some tests want an https Server, check if one is available.
4302
Right now, the only way this is available is under python2.6 which provides
4313
def feature_name(self):
4314
return 'HTTPSServer'
4317
HTTPSServerFeature = _HTTPSServerFeature()
4320
class _UnicodeFilename(Feature):
4321
"""Does the filesystem support Unicode filenames?"""
4326
except UnicodeEncodeError:
4328
except (IOError, OSError):
4329
# The filesystem allows the Unicode filename but the file doesn't
4333
# The filesystem allows the Unicode filename and the file exists,
4337
UnicodeFilename = _UnicodeFilename()
4340
class _UTF8Filesystem(Feature):
4341
"""Is the filesystem UTF-8?"""
4344
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4348
UTF8Filesystem = _UTF8Filesystem()
4351
class _BreakinFeature(Feature):
4352
"""Does this platform support the breakin feature?"""
4355
from bzrlib import breakin
4356
if breakin.determine_signal() is None:
4358
if sys.platform == 'win32':
4359
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4360
# We trigger SIGBREAK via a Console api so we need ctypes to
4361
# access the function
4368
def feature_name(self):
4369
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4372
BreakinFeature = _BreakinFeature()
4375
class _CaseInsCasePresFilenameFeature(Feature):
4376
"""Is the file-system case insensitive, but case-preserving?"""
4379
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4381
# first check truly case-preserving for created files, then check
4382
# case insensitive when opening existing files.
4383
name = osutils.normpath(name)
4384
base, rel = osutils.split(name)
4385
found_rel = osutils.canonical_relpath(base, name)
4386
return (found_rel == rel
4387
and os.path.isfile(name.upper())
4388
and os.path.isfile(name.lower()))
4393
def feature_name(self):
4394
return "case-insensitive case-preserving filesystem"
4396
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4399
class _CaseInsensitiveFilesystemFeature(Feature):
4400
"""Check if underlying filesystem is case-insensitive but *not* case
4403
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4404
# more likely to be case preserving, so this case is rare.
4407
if CaseInsCasePresFilenameFeature.available():
4410
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4411
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4412
TestCaseWithMemoryTransport.TEST_ROOT = root
4414
root = TestCaseWithMemoryTransport.TEST_ROOT
4415
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4417
name_a = osutils.pathjoin(tdir, 'a')
4418
name_A = osutils.pathjoin(tdir, 'A')
4420
result = osutils.isdir(name_A)
4421
_rmtree_temp_dir(tdir)
4424
def feature_name(self):
4425
return 'case-insensitive filesystem'
4427
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4430
class _CaseSensitiveFilesystemFeature(Feature):
4433
if CaseInsCasePresFilenameFeature.available():
4435
elif CaseInsensitiveFilesystemFeature.available():
4440
def feature_name(self):
4441
return 'case-sensitive filesystem'
4443
# new coding style is for feature instances to be lowercase
4444
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4447
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4448
SubUnitFeature = _CompatabilityThunkFeature(
4449
deprecated_in((2,1,0)),
4450
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4451
4457
# Only define SubUnitBzrRunner if subunit is available.
4453
4459
from subunit import TestProtocolClient
4454
4460
from subunit.test_results import AutoTimingTestResultDecorator
4461
class SubUnitBzrProtocolClient(TestProtocolClient):
4463
def stopTest(self, test):
4464
super(SubUnitBzrProtocolClient, self).stopTest(test)
4465
_clear__type_equality_funcs(test)
4467
def addSuccess(self, test, details=None):
4468
# The subunit client always includes the details in the subunit
4469
# stream, but we don't want to include it in ours.
4470
if details is not None and 'log' in details:
4472
return super(SubUnitBzrProtocolClient, self).addSuccess(
4455
4475
class SubUnitBzrRunner(TextTestRunner):
4456
4476
def run(self, test):
4457
4477
result = AutoTimingTestResultDecorator(
4458
TestProtocolClient(self.stream))
4478
SubUnitBzrProtocolClient(self.stream))
4459
4479
test.run(result)
4461
4481
except ImportError:
4464
class _PosixPermissionsFeature(Feature):
4468
# create temporary file and check if specified perms are maintained.
4471
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4472
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4475
os.chmod(name, write_perms)
4477
read_perms = os.stat(name).st_mode & 0777
4479
return (write_perms == read_perms)
4481
return (os.name == 'posix') and has_perms()
4483
def feature_name(self):
4484
return 'POSIX permissions support'
4486
posix_permissions_feature = _PosixPermissionsFeature()
4485
# API compatibility for old plugins; see bug 892622.
4488
'HTTPServerFeature',
4489
'ModuleAvailableFeature',
4490
'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
4491
'OsFifoFeature', 'UnicodeFilenameFeature',
4492
'ByteStringNamedFilesystem', 'UTF8Filesystem',
4493
'BreakinFeature', 'CaseInsCasePresFilenameFeature',
4494
'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
4495
'posix_permissions_feature',
4497
globals()[name] = _CompatabilityThunkFeature(
4498
symbol_versioning.deprecated_in((2, 5, 0)),
4499
'bzrlib.tests', name,
4500
name, 'bzrlib.tests.features')
4503
for (old_name, new_name) in [
4504
('UnicodeFilename', 'UnicodeFilenameFeature'),
4506
globals()[name] = _CompatabilityThunkFeature(
4507
symbol_versioning.deprecated_in((2, 5, 0)),
4508
'bzrlib.tests', old_name,
4509
new_name, 'bzrlib.tests.features')