60
60
_testtools_version = getattr(testtools, '__version__', ())
61
61
if _testtools_version < (0, 9, 5):
62
62
raise ImportError("need at least testtools 0.9.5: %s is %r"
63
% (testtools.__file__, _testtools_version))
63
% (testtools.__file__, _testtools_version))
64
64
from testtools import content
150
148
'XDG_CONFIG_HOME': None,
151
149
# brz now uses the Win32 API and doesn't rely on APPDATA, but the
152
150
# tests do check our impls match APPDATA
153
'BRZ_EDITOR': None, # test_msgeditor manipulates this variable
151
'BRZ_EDITOR': None, # test_msgeditor manipulates this variable
156
154
'BRZ_EMAIL': None,
157
'BZREMAIL': None, # may still be present in the environment
158
'EMAIL': 'jrandom@example.com', # set EMAIL as brz does not guess
155
'BZREMAIL': None, # may still be present in the environment
156
'EMAIL': 'jrandom@example.com', # set EMAIL as brz does not guess
159
157
'BRZ_PROGRESS_BAR': None,
160
158
# This should trap leaks to ~/.brz.log. This occurs when tests use TestCase
161
159
# as a base class instead of TestCaseInTempDir. Tests inheriting from
311
309
self._show_list('ERROR', self.errors)
312
310
self._show_list('FAIL', self.failures)
313
311
self.stream.write(self.sep2)
314
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
315
run, run != 1 and "s" or "", timeTaken))
312
self.stream.write("%s %d test%s in %.3fs\n\n" % (
313
actionTaken, run, run != 1 and "s" or "", timeTaken))
316
314
if not self.wasSuccessful():
317
315
self.stream.write("FAILED (")
318
316
failed, errored = map(len, (self.failures, self.errors))
320
318
self.stream.write("failures=%d" % failed)
322
if failed: self.stream.write(", ")
321
self.stream.write(", ")
323
322
self.stream.write("errors=%d" % errored)
324
323
if self.known_failure_count:
325
if failed or errored: self.stream.write(", ")
324
if failed or errored:
325
self.stream.write(", ")
326
326
self.stream.write("known_failure_count=%d" %
327
self.known_failure_count)
327
self.known_failure_count)
328
328
self.stream.write(")\n")
330
330
if self.known_failure_count:
331
331
self.stream.write("OK (known_failures=%d)\n" %
332
self.known_failure_count)
332
self.known_failure_count)
334
334
self.stream.write("OK\n")
335
335
if self.skip_count > 0:
336
336
skipped = self.skip_count
337
337
self.stream.write('%d test%s skipped\n' %
338
(skipped, skipped != 1 and "s" or ""))
338
(skipped, skipped != 1 and "s" or ""))
339
339
if self.unsupported:
340
340
for feature, count in sorted(self.unsupported.items()):
341
341
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
344
344
ok = self.wasStrictlySuccessful()
347
347
if self._first_thread_leaker_id:
348
348
self.stream.write(
349
349
'%s is leaking threads among %d leaking tests.\n' % (
350
self._first_thread_leaker_id,
351
self._tests_leaking_threads_count))
350
self._first_thread_leaker_id,
351
self._tests_leaking_threads_count))
352
352
# We don't report the main thread as an active one.
353
353
self.stream.write(
354
354
'%d non-main threads were left active in the end.\n'
369
369
# to decide whether to round/floor/ceiling. This was added when we
370
370
# had pyp3 test failures that suggest a floor was happening.
371
371
shift = 10 ** precision
372
return math.ceil((a_timedelta.days * 86400.0 + a_timedelta.seconds +
373
a_timedelta.microseconds / 1000000.0) * shift) / shift
373
(a_timedelta.days * 86400.0 + a_timedelta.seconds +
374
a_timedelta.microseconds / 1000000.0) * shift) / shift
375
376
def _elapsedTestTimeString(self):
376
"""Return a time string for the overall time the current test has taken."""
377
"""Return time string for overall time the current test has taken."""
377
378
return self._formatTime(self._delta_to_float(
378
379
self._now() - self._start_datetime, 3))
506
507
super(ExtendedTestResult, self).addFailure(test, details=details)
507
508
self.failure_count += 1
508
509
self.report_unexpected_success(test,
509
"".join(details["reason"].iter_text()))
510
"".join(details["reason"].iter_text()))
510
511
if self.stop_early:
561
562
'brz selftest: %s\n' % (bzr_path,))
562
563
self.stream.write(
564
breezy.__path__[0],))
565
breezy.__path__[0],))
565
566
self.stream.write(
566
567
' bzr-%s python-%s %s\n' % (
567
breezy.version_string,
568
breezy._format_version_tuple(sys.version_info),
569
platform.platform(aliased=1),
568
breezy.version_string,
569
breezy._format_version_tuple(sys.version_info),
570
platform.platform(aliased=1),
571
572
self.stream.write('\n')
573
574
def report_test_start(self, test):
581
582
# testtools details object would be fitting.
582
583
if 'threads' in selftest_debug_flags:
583
584
self.stream.write('%s is leaking, active is now %d\n' %
584
(test.id(), len(active_threads)))
585
(test.id(), len(active_threads)))
586
587
def startTestRun(self):
587
588
self.startTime = time.time()
605
606
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
606
bench_history, strict)
607
bench_history, strict)
607
608
self.pb = self.ui.nested_progress_bar()
608
609
self.pb.show_pct = False
609
610
self.pb.show_spinner = False
629
630
a = '[%d' % self.count # total that have been run
630
631
# tests skipped as known not to be relevant are not important enough
632
## if self.skip_count:
633
## a += ', %d skip' % self.skip_count
634
## if self.known_failure_count:
635
## a += '+%dX' % self.known_failure_count
633
# if self.skip_count:
634
# a += ', %d skip' % self.skip_count
635
# if self.known_failure_count:
636
# a += '+%dX' % self.known_failure_count
636
637
if self.num_tests:
637
a +='/%d' % self.num_tests
638
a += '/%d' % self.num_tests
639
640
runtime = time.time() - self._overall_start_time
640
641
if runtime >= 60:
652
653
def report_test_start(self, test):
654
self._progress_prefix_text()
656
+ self._shortened_test_description(test))
655
self._progress_prefix_text() +
657
self._shortened_test_description(test))
658
659
def _test_description(self, test):
659
660
return self._shortened_test_description(test)
696
697
def _ellipsize_to_right(self, a_string, final_width):
697
698
"""Truncate and pad a string, keeping the right hand side"""
698
699
if len(a_string) > final_width:
699
result = '...' + a_string[3-final_width:]
700
result = '...' + a_string[3 - final_width:]
701
702
result = a_string
702
703
return result.ljust(final_width)
713
714
# 11-char time string, plus a trailing blank
714
715
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
716
self.stream.write(self._ellipsize_to_right(name, width-18))
717
self.stream.write(self._ellipsize_to_right(name, width - 18))
718
719
self.stream.write(name)
719
720
self.stream.flush()
725
726
def report_error(self, test, err):
726
727
self.stream.write('ERROR %s\n%s\n'
727
% (self._testTimeString(test),
728
self._error_summary(err)))
728
% (self._testTimeString(test),
729
self._error_summary(err)))
730
731
def report_failure(self, test, err):
731
732
self.stream.write(' FAIL %s\n%s\n'
732
% (self._testTimeString(test),
733
self._error_summary(err)))
733
% (self._testTimeString(test),
734
self._error_summary(err)))
735
736
def report_known_failure(self, test, err):
736
737
self.stream.write('XFAIL %s\n%s\n'
737
% (self._testTimeString(test),
738
self._error_summary(err)))
738
% (self._testTimeString(test),
739
self._error_summary(err)))
740
741
def report_unexpected_success(self, test, reason):
741
742
self.stream.write(' FAIL %s\n%s: %s\n'
742
% (self._testTimeString(test),
743
"Unexpected success. Should have failed",
743
% (self._testTimeString(test),
744
"Unexpected success. Should have failed",
746
747
def report_success(self, test):
747
748
self.stream.write(' OK %s\n' % self._testTimeString(test))
755
756
def report_skip(self, test, reason):
756
757
self.stream.write(' SKIP %s\n%s\n'
757
% (self._testTimeString(test), reason))
758
% (self._testTimeString(test), reason))
759
760
def report_not_applicable(self, test, reason):
760
761
self.stream.write(' N/A %s\n %s\n'
761
% (self._testTimeString(test), reason))
762
% (self._testTimeString(test), reason))
763
764
def report_unsupported(self, test, feature):
764
765
"""test cannot be run because feature is missing."""
765
766
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
766
%(self._testTimeString(test), feature))
767
% (self._testTimeString(test), feature))
769
770
class TextTestRunner(object):
782
783
:param result_decorators: An optional list of decorators to apply
783
784
to the result object being used by the runner. Decorators are
784
applied left to right - the first element in the list is the
785
applied left to right - the first element in the list is the
785
786
innermost decorator.
787
788
# stream may know claim to know to write unicode strings, but in older
794
795
# GZ 2010-09-08: Really we don't want to be writing arbitrary bytes,
795
796
# so should swap to the plain codecs.StreamWriter
796
797
stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream,
798
799
stream.encoding = new_encoding
799
800
self.stream = stream
800
801
self.descriptions = descriptions
810
811
elif self.verbosity >= 2:
811
812
result_class = VerboseTestResult
812
813
original_result = result_class(self.stream,
815
bench_history=self._bench_history,
816
bench_history=self._bench_history,
818
819
# Signal to result objects that look at stop early policy to stop,
819
820
original_result.stop_early = self.stop_on_failure
820
821
result = original_result
862
863
def _clever_some_str(value):
864
865
return str(value)
866
except BaseException:
867
868
return repr(value).replace('\\n', '\n')
869
except BaseException:
869
870
return '<unprintable %s object>' % type(value).__name__
871
873
traceback._some_str = _clever_some_str
946
948
def __init__(self, methodName='testMethod'):
947
949
super(TestCase, self).__init__(methodName)
948
950
self._directory_isolation = True
949
self.exception_handlers.insert(0,
950
(UnavailableFeature, self._do_unsupported_or_skip))
951
self.exception_handlers.insert(0,
952
(TestNotApplicable, self._do_not_applicable))
951
self.exception_handlers.insert(
952
0, (UnavailableFeature, self._do_unsupported_or_skip))
953
self.exception_handlers.insert(
954
0, (TestNotApplicable, self._do_not_applicable))
955
957
super(TestCase, self).setUp()
1026
1028
:param counter_name: The counter identifier in ``_counters``, defaults
1029
_counters = self._counters # Avoid closing over self
1031
_counters = self._counters # Avoid closing over self
1030
1032
if counter_name is None:
1031
1033
counter_name = name
1032
1034
if counter_name in _counters:
1033
1035
raise AssertionError('%s is already used as a counter name'
1035
1037
_counters[counter_name] = 0
1036
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1038
self.addDetail(counter_name, content.Content(
1037
1040
lambda: [b'%d' % (_counters[counter_name],)]))
1038
1042
def increment_counter(*args, **kwargs):
1039
1043
_counters[counter_name] += 1
1040
1044
label = 'count %s calls' % (counter_name,)
1048
1052
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1049
1053
self.install_counter_hook(config.ConfigHooks, hook_name,
1050
'config.%s' % (hook_name,))
1054
'config.%s' % (hook_name,))
1052
1056
# The OldConfigHooks are private and need special handling to protect
1053
1057
# against recursive tests (tests that run other tests), so we just do
1216
1220
# This prevents all transports, including e.g. sftp ones backed on disk
1217
1221
# from working unless they are explicitly granted permission. We then
1218
# depend on the code that sets up test transports to check that they are
1219
# appropriately isolated and enable their use by calling
1222
# depend on the code that sets up test transports to check that they
1223
# are appropriately isolated and enable their use by calling
1220
1224
# self.permit_transport()
1221
1225
if not osutils.is_inside_any(self._bzr_selftest_roots, url):
1222
1226
raise errors.BzrError("Attempt to escape test isolation: %r %r"
1223
% (url, self._bzr_selftest_roots))
1227
% (url, self._bzr_selftest_roots))
1225
1229
def record_directory_isolation(self):
1226
1230
"""Gather accessed directories to permit later access.
1279
1283
# hook into brz dir opening. This leaves a small window of error for
1280
1284
# transport tests, but they are well known, and we can improve on this
1282
controldir.ControlDir.hooks.install_named_hook("pre_open",
1283
self._preopen_isolate_transport, "Check brz directories are safe.")
1286
controldir.ControlDir.hooks.install_named_hook(
1287
"pre_open", self._preopen_isolate_transport,
1288
"Check brz directories are safe.")
1285
1290
def _ndiff_strings(self, a, b):
1286
1291
"""Return ndiff between two strings containing lines.
1309
1314
message += '\n'
1310
1315
raise AssertionError("%snot equal:\na = %s\nb = %s\n"
1312
pprint.pformat(a), pprint.pformat(b)))
1317
pprint.pformat(a), pprint.pformat(b)))
1314
1319
# FIXME: This is deprecated in unittest2 but plugins may still use it so we
1315
1320
# need a deprecation period for them -- vila 2016-02-01
1330
1335
message = 'first string is missing a final newline.\n'
1331
1336
if a == b + ('\n' if isinstance(b, text_type) else b'\n'):
1332
1337
message = 'second string is missing a final newline.\n'
1333
raise AssertionError(message +
1334
self._ndiff_strings(a, b))
1338
raise AssertionError(message
1339
+ self._ndiff_strings(a, b))
1336
1341
def assertEqualMode(self, mode, mode_test):
1337
1342
self.assertEqual(mode, mode_test,
1402
1407
def assertStartsWith(self, s, prefix):
1403
1408
if not s.startswith(prefix):
1404
raise AssertionError('string %r does not start with %r' % (s, prefix))
1409
raise AssertionError(
1410
'string %r does not start with %r' % (s, prefix))
1406
1412
def assertEndsWith(self, s, suffix):
1407
1413
"""Asserts that s ends with suffix."""
1408
1414
if not s.endswith(suffix):
1409
raise AssertionError('string %r does not end with %r' % (s, suffix))
1415
raise AssertionError(
1416
'string %r does not end with %r' % (s, suffix))
1411
1418
def assertContainsRe(self, haystack, needle_re, flags=0):
1412
1419
"""Assert that a contains something matching a regular expression."""
1413
1420
if not re.search(needle_re, haystack, flags):
1414
if ('\n' if isinstance(haystack, str) else b'\n') in haystack or len(haystack) > 60:
1421
if (('\n' if isinstance(haystack, str) else b'\n') in haystack or
1422
len(haystack) > 60):
1415
1423
# a long string, format it in a more readable way
1416
1424
raise AssertionError(
1417
'pattern "%s" not found in\n"""\\\n%s"""\n'
1418
% (needle_re, haystack))
1425
'pattern "%s" not found in\n"""\\\n%s"""\n'
1426
% (needle_re, haystack))
1420
1428
raise AssertionError('pattern "%s" not found in "%s"'
1421
% (needle_re, haystack))
1429
% (needle_re, haystack))
1423
1431
def assertNotContainsRe(self, haystack, needle_re, flags=0):
1424
1432
"""Assert that a does not match a regular expression"""
1425
1433
if re.search(needle_re, haystack, flags):
1426
1434
raise AssertionError('pattern "%s" found in "%s"'
1427
% (needle_re, haystack))
1435
% (needle_re, haystack))
1429
1437
def assertContainsString(self, haystack, needle):
1430
1438
if haystack.find(needle) == -1:
1517
1525
def assertIsInstance(self, obj, kls, msg=None):
1518
1526
"""Fail if obj is not an instance of kls
1520
1528
:param msg: Supplementary message to show if the assertion fails.
1522
1530
if not isinstance(obj, kls):
1529
1537
def assertFileEqual(self, content, path):
1530
1538
"""Fail if path does not contain 'content'."""
1531
1539
self.assertPathExists(path)
1533
with open(path, 'r' + ('b' if isinstance(content, bytes) else '')) as f:
1541
mode = 'r' + ('b' if isinstance(content, bytes) else '')
1542
with open(path, mode) as f:
1535
1544
self.assertEqualDiff(content, s)
1550
1559
self.assertPathExists(p)
1552
1561
self.assertTrue(osutils.lexists(path),
1553
path + " does not exist")
1562
path + " does not exist")
1555
1564
def assertPathDoesNotExist(self, path):
1556
1565
"""Fail if path or paths, which may be abs or relative, exist."""
1559
1568
self.assertPathDoesNotExist(p)
1561
1570
self.assertFalse(osutils.lexists(path),
1564
1573
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1565
1574
"""A helper for callDeprecated and applyDeprecated.
1571
1580
a_callable(``*args``, ``**kwargs``).
1573
1582
local_warnings = []
1574
1584
def capture_warnings(msg, cls=None, stacklevel=None):
1575
1585
# we've hooked into a deprecation specific callpath,
1576
1586
# only deprecations should getting sent via it.
1612
1622
:param kwargs: The keyword arguments for the callable
1613
1623
:return: The result of a_callable(``*args``, ``**kwargs``)
1615
call_warnings, result = self._capture_deprecation_warnings(a_callable,
1625
call_warnings, result = self._capture_deprecation_warnings(
1626
a_callable, *args, **kwargs)
1617
1627
expected_first_warning = symbol_versioning.deprecation_string(
1618
1628
a_callable, deprecation_format)
1619
1629
if len(call_warnings) == 0:
1620
1630
self.fail("No deprecation warning generated by call to %s" %
1622
1632
self.assertEqual(expected_first_warning, call_warnings[0])
1638
1648
# warnings. It's the easiest way to insulate ourselves from -Werror,
1639
1649
# though. -- Andrew, 20071062
1641
1652
def _catcher(message, category, filename, lineno, file=None, line=None):
1642
1653
# despite the name, 'message' is normally(?) a Warning subclass
1670
1681
:param args: The positional arguments for the callable
1671
1682
:param kwargs: The keyword arguments for the callable
1673
call_warnings, result = self._capture_deprecation_warnings(callable,
1684
call_warnings, result = self._capture_deprecation_warnings(
1685
callable, *args, **kwargs)
1675
1686
self.assertEqual(expected, call_warnings)
1678
1689
def _startLogFile(self):
1679
1690
"""Setup a in-memory target for bzr and testcase log messages"""
1680
1691
pseudo_log_file = BytesIO()
1681
1693
def _get_log_contents_for_weird_testtools_api():
1682
1694
return [pseudo_log_file.getvalue().decode(
1683
1695
"utf-8", "replace").encode("utf-8")]
1684
self.addDetail("log", content.Content(content.ContentType("text",
1685
"plain", {"charset": "utf8"}),
1686
_get_log_contents_for_weird_testtools_api))
1697
"log", content.Content(
1698
content.ContentType("text", "plain", {"charset": "utf8"}),
1699
_get_log_contents_for_weird_testtools_api))
1687
1700
self._log_file = pseudo_log_file
1688
1701
self._log_memento = trace.push_log_file(self._log_file)
1689
1702
self.addCleanup(self._finishLogFile)
1744
1757
:param name: The environment variable name.
1746
:param new: The value to set the variable to. If None, the
1759
:param new: The value to set the variable to. If None, the
1747
1760
variable is deleted from the environment.
1749
1762
:returns: The actual variable value.
1760
1773
:param obj: The namespace holding the reference to be replaced;
1761
1774
typically a module, class, or object.
1762
:param attr_name: A string for the name of the attribute to
1775
:param attr_name: A string for the name of the attribute to patch.
1764
1776
:returns: A list that will be extended with one item every time the
1765
1777
function is called, with a tuple of (args, kwargs).
1838
1850
reason = 'No reason given'
1840
1852
reason = e.args[0]
1841
self._suppress_log ()
1853
self._suppress_log()
1842
1854
addNotApplicable = getattr(result, 'addNotApplicable', None)
1843
1855
if addNotApplicable is not None:
1844
1856
result.addNotApplicable(self, reason)
1883
1895
self._benchcalls.
1885
1897
if self._benchtime is None:
1886
self.addDetail('benchtime', content.Content(content.UTF8_TEXT,
1887
lambda:[str(self._benchtime).encode('utf-8')]))
1898
self.addDetail('benchtime', content.Content(
1900
lambda: [str(self._benchtime).encode('utf-8')]))
1888
1901
self._benchtime = 0
1889
1902
start = time.time()
1918
1931
raise UnavailableFeature(feature)
1920
1933
def _run_bzr_core(self, args, encoding, stdin, stdout, stderr,
1922
1935
# Clear chk_map page cache, because the contents are likely to mask
1923
1936
# locking errors.
1924
1937
chk_map.clear_cache()
1960
1973
def run_bzr_raw(self, args, retcode=0, stdin=None, encoding=None,
1961
working_dir=None, error_regexes=[]):
1974
working_dir=None, error_regexes=[]):
1962
1975
"""Invoke brz, as if it were run from the command line.
1964
1977
The argument list should not include the brz program name - the
2016
2029
logger.addHandler(handler)
2018
2031
result = self._run_bzr_core(
2019
args, encoding=encoding, stdin=stdin, stdout=wrapped_stdout,
2020
stderr=wrapped_stderr, working_dir=working_dir,
2032
args, encoding=encoding, stdin=stdin, stdout=wrapped_stdout,
2033
stderr=wrapped_stderr, working_dir=working_dir,
2023
2036
logger.removeHandler(handler)
2034
2047
self.log('errors:\n%r', err)
2035
2048
if retcode is not None:
2036
2049
self.assertEqual(retcode, result,
2037
message='Unexpected return code')
2050
message='Unexpected return code')
2038
2051
self.assertIsInstance(error_regexes, (list, tuple))
2039
2052
for regex in error_regexes:
2040
2053
self.assertContainsRe(err, regex)
2097
2110
logger.addHandler(handler)
2100
result = self._run_bzr_core(args,
2101
encoding=encoding, stdin=stdin, stdout=stdout,
2102
stderr=stderr, working_dir=working_dir,
2113
result = self._run_bzr_core(
2114
args, encoding=encoding, stdin=stdin, stdout=stdout,
2115
stderr=stderr, working_dir=working_dir)
2105
2117
logger.removeHandler(handler)
2112
2124
self.log('errors:\n%r', err)
2113
2125
if retcode is not None:
2114
2126
self.assertEqual(retcode, result,
2115
message='Unexpected return code')
2127
message='Unexpected return code')
2116
2128
self.assertIsInstance(error_regexes, (list, tuple))
2117
2129
for regex in error_regexes:
2118
2130
self.assertContainsRe(err, regex)
2140
2152
# Make sure --strict is handling an unknown file, rather than
2141
2153
# giving us the 'nothing to do' error
2142
2154
self.build_tree(['unknown'])
2143
self.run_bzr_error(['Commit refused because there are unknown files'],
2144
['commit', --strict', '-m', 'my commit comment'])
2156
['Commit refused because there are unknown files'],
2157
['commit', --strict', '-m', 'my commit comment'])
2146
2159
kwargs.setdefault('retcode', 3)
2147
2160
kwargs['error_regexes'] = error_regexes
2185
2198
# We distinguish between retcode=None and retcode not passed.
2186
2199
supplied_retcode = kwargs.get('retcode', 0)
2187
2200
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
2188
universal_newlines=kwargs.get('universal_newlines', False),
2201
universal_newlines=kwargs.get(
2202
'universal_newlines', False),
2191
2205
def start_bzr_subprocess(self, process_args, env_changes=None,
2192
2206
skip_if_plan_to_signal=False,
2286
2300
# the life of this function, which might interfere with e.g.
2287
2301
# cleaning tempdirs on Windows.
2288
2302
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2289
#detail_content = content.content_from_file(
2303
# detail_content = content.content_from_file(
2290
2304
# log_file_path, buffer_now=True)
2291
2305
with open(log_file_path, 'rb') as log_file:
2292
2306
log_file_bytes = log_file.read()
2293
detail_content = content.Content(content.ContentType("text",
2294
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2307
detail_content = content.Content(
2308
content.ContentType("text", "plain", {"charset": "utf8"}),
2309
lambda: [log_file_bytes])
2295
2310
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2298
2313
def _popen(self, *args, **kwargs):
2299
2314
"""Place a call to Popen.
2429
2444
orig_info = request_handlers.get_info(verb)
2430
2445
request_handlers.remove(verb)
2431
2446
self.addCleanup(request_handlers.register, verb, orig_method,
2434
2449
def __hash__(self):
2435
2450
return id(self)
2505
2520
self.addCleanup(transport.disconnect)
2507
_mod_transport.Transport.hooks.install_named_hook('post_connect',
2508
_add_disconnect_cleanup, None)
2522
_mod_transport.Transport.hooks.install_named_hook(
2523
'post_connect', _add_disconnect_cleanup, None)
2510
2525
self._make_test_root()
2511
2526
self.addCleanup(os.chdir, osutils.getcwd())
2562
2577
self.__readonly_server = test_server.ReadonlyServer()
2564
2579
# explicit readonly transport.
2565
self.__readonly_server = self.create_transport_readonly_server()
2580
self.__readonly_server = (
2581
self.create_transport_readonly_server())
2566
2582
self.start_server(self.__readonly_server,
2567
self.get_vfs_only_server())
2583
self.get_vfs_only_server())
2568
2584
return self.__readonly_server
2570
2586
def get_readonly_url(self, relpath=None):
2604
2620
if self.__server is None:
2605
2621
if (self.transport_server is None or self.transport_server is
2606
self.vfs_transport_factory):
2622
self.vfs_transport_factory):
2607
2623
self.__server = self.get_vfs_only_server()
2609
2625
# bring up a decorated means of access to the vfs only server.
2691
2707
root = TestCaseWithMemoryTransport.TEST_ROOT
2692
2708
t = _mod_transport.get_transport_from_path(root)
2693
2709
self.permit_url(t.base)
2694
if (t.get_bytes('.bzr/checkout/dirstate') !=
2710
if (t.get_bytes('.bzr/checkout/dirstate') !=
2695
2711
TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
2696
2712
# The current test have modified the /bzr directory, we need to
2697
2713
# recreate a new one or all the followng tests will fail.
2723
2739
This must set self.test_home_dir and self.test_dir and chdir to
2726
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
2742
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this
2728
2745
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2729
2746
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
2785
2802
smart_server = test_server.SmartTCPServer_for_testing()
2786
2803
self.start_server(smart_server, backing_server)
2787
2804
remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
2789
2806
return remote_transport
2791
2808
def make_branch_and_memory_tree(self, relpath, format=None):
2814
2831
# Skip the current stack down to the caller of
2815
2832
# setup_smart_server_with_call_log
2816
2833
prefix_length = len(traceback.extract_stack()) - 2
2817
2835
def capture_hpss_call(params):
2818
2836
self.hpss_calls.append(
2819
2837
CapturedCall(params, prefix_length))
2820
2839
def capture_connect(transport):
2821
2840
self.hpss_connections.append(transport)
2822
2841
client._SmartClient.hooks.install_named_hook(
2883
2902
name and then create two subdirs - test and home under it.
2885
2904
name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
2886
self._getTestDirPrefix())
2905
self._getTestDirPrefix())
2887
2906
name = name_prefix
2888
2907
for i in range(100):
2889
2908
if os.path.exists(name):
2935
2954
if type(shape) not in (list, tuple):
2936
2955
raise AssertionError("Parameter 'shape' should be "
2937
"a list or a tuple. Got %r instead" % (shape,))
2956
"a list or a tuple. Got %r instead" % (shape,))
2938
2957
# It's OK to just create them using forward slashes on windows.
2939
2958
if transport is None or transport.is_readonly():
2940
2959
transport = _mod_transport.get_transport_from_path(".")
2964
2983
self.assertInWorkingTree(p, tree=tree)
2966
2985
self.assertTrue(tree.is_versioned(path),
2967
path+' not in working tree.')
2986
path + ' not in working tree.')
2969
2988
def assertNotInWorkingTree(self, path, root_path='.', tree=None):
2970
2989
"""Assert whether path or paths are not in the WorkingTree"""
2975
2994
self.assertNotInWorkingTree(p, tree=tree)
2977
self.assertFalse(tree.is_versioned(path), path+' in working tree.')
2996
self.assertFalse(tree.is_versioned(
2997
path), path + ' in working tree.')
2980
3000
class TestCaseWithTransport(TestCaseInTempDir):
3035
3055
format = self.resolve_format(format=format)
3036
3056
if not format.supports_workingtrees:
3037
b = self.make_branch(relpath+'.branch', format=format)
3057
b = self.make_branch(relpath + '.branch', format=format)
3038
3058
return b.create_checkout(relpath, lightweight=True)
3039
3059
b = self.make_branch(relpath, format=format)
3081
3101
self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
3082
3102
differences = left.changes_from(right)
3083
3103
self.assertFalse(differences.has_changed(),
3084
"Trees %r and %r are different: %r" % (left, right, differences))
3104
"Trees %r and %r are different: %r" % (left, right, differences))
3086
3106
def disable_missing_extensions_warning(self):
3087
3107
"""Some tests expect a precise stderr content.
3331
3352
if stream is None:
3332
3353
stream = sys.stdout
3333
3354
runner = runner_class(stream=stream,
3335
verbosity=verbosity,
3336
bench_history=bench_history,
3338
result_decorators=result_decorators,
3340
runner.stop_on_failure=stop_on_failure
3356
verbosity=verbosity,
3357
bench_history=bench_history,
3359
result_decorators=result_decorators,
3361
runner.stop_on_failure = stop_on_failure
3341
3362
if isinstance(suite, unittest.TestSuite):
3342
3363
# Empty out _tests list of passed suite and populate new TestSuite
3343
3364
suite._tests[:], suite = [], TestSuite(suite)
3384
3405
def fork_decorator(suite):
3385
3406
if getattr(os, "fork", None) is None:
3386
3407
raise errors.BzrCommandError("platform does not support fork,"
3387
" try --parallel=subprocess instead.")
3408
" try --parallel=subprocess instead.")
3388
3409
concurrency = osutils.local_concurrency()
3389
3410
if concurrency == 1:
3391
3412
from testtools import ConcurrentTestSuite
3392
3413
return ConcurrentTestSuite(suite, fork_for_tests)
3393
3416
parallel_registry.register('fork', fork_decorator)
3406
3431
"""Return a test suite decorator that excludes tests."""
3407
3432
if exclude_pattern is None:
3408
3433
return identity_decorator
3409
3435
def decorator(suite):
3410
3436
return ExcludeDecorator(suite, exclude_pattern)
3411
3437
return decorator
3492
3521
def __init__(self, suite, random_seed, stream):
3493
3522
random_seed = self.actual_seed(random_seed)
3494
3523
stream.write("Randomizing test order using seed %s\n\n" %
3496
3525
# Initialise the random number generator.
3497
3526
random.seed(random_seed)
3498
3527
super(RandomDecorator, self).__init__(randomize_suite(suite))
3558
3587
from subunit import ProtocolTestCase
3559
3588
from subunit.test_results import AutoTimingTestResultDecorator
3560
3590
class TestInOtherProcess(ProtocolTestCase):
3561
3591
# Should be in subunit, I think. RBC.
3562
3592
def __init__(self, stream, pid):
3630
3660
concurrency = osutils.local_concurrency()
3632
3662
from subunit import ProtocolTestCase
3633
3664
class TestInSubprocess(ProtocolTestCase):
3634
3665
def __init__(self, process, name):
3635
3666
ProtocolTestCase.__init__(self, process.stdout)
3647
3678
test_blocks = partition_tests(suite, concurrency)
3648
3679
for process_tests in test_blocks:
3649
3680
# ugly; currently reimplement rather than reuses TestCase methods.
3650
bzr_path = os.path.dirname(os.path.dirname(breezy.__file__))+'/bzr'
3681
bzr_path = os.path.dirname(os.path.dirname(breezy.__file__)) + '/bzr'
3651
3682
if not os.path.isfile(bzr_path):
3652
3683
# We are probably installed. Assume sys.argv is the right file
3653
3684
bzr_path = sys.argv[0]
3662
3693
test_list_file.close()
3664
3695
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3666
3697
if '--no-plugins' in sys.argv:
3667
3698
argv.append('--no-plugins')
3668
3699
# stderr=subprocess.STDOUT would be ideal, but until we prevent
3669
3700
# noise on stderr it can interrupt the subunit protocol.
3670
3701
process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3671
stdout=subprocess.PIPE,
3672
stderr=subprocess.PIPE,
3702
stdout=subprocess.PIPE,
3703
stderr=subprocess.PIPE,
3674
3705
test = TestInSubprocess(process, test_list_file_name)
3675
3706
result.append(test)
3682
3713
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3683
3714
"""Generate profiling data for all activity between start and success.
3685
3716
The profile data is appended to the test's _benchcalls attribute and can
3686
3717
be accessed by the forwarded-to TestResult.
3788
3819
if lsprof_tests:
3789
3820
result_decorators.append(ProfileResult)
3790
3821
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
3791
stop_on_failure=stop_on_failure,
3792
transport=transport,
3793
lsprof_timed=lsprof_timed,
3794
bench_history=bench_history,
3795
matching_tests_first=matching_tests_first,
3796
list_only=list_only,
3797
random_seed=random_seed,
3798
exclude_pattern=exclude_pattern,
3800
runner_class=runner_class,
3801
suite_decorators=suite_decorators,
3803
result_decorators=result_decorators,
3822
stop_on_failure=stop_on_failure,
3823
transport=transport,
3824
lsprof_timed=lsprof_timed,
3825
bench_history=bench_history,
3826
matching_tests_first=matching_tests_first,
3827
list_only=list_only,
3828
random_seed=random_seed,
3829
exclude_pattern=exclude_pattern,
3831
runner_class=runner_class,
3832
suite_decorators=suite_decorators,
3834
result_decorators=result_decorators,
3806
3837
default_transport = old_transport
3807
3838
selftest_debug_flags = old_debug_flags
4246
4277
def interesting_module(name):
4247
4278
for start in starting_with:
4249
# Either the module name starts with the specified string
4250
name.startswith(start)
4251
# or it may contain tests starting with the specified string
4252
or start.startswith(name)
4280
# Either the module name starts with the specified string
4281
name.startswith(start)
4282
or # or it may contain tests starting with the specified string
4283
start.startswith(name)
4256
4287
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
4258
4289
elif keep_only is not None:
4259
4290
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
4260
4292
def interesting_module(name):
4261
4293
return id_filter.refers_to(name)
4264
4296
loader = TestUtil.TestLoader()
4265
4298
def interesting_module(name):
4266
4299
# No filtering, all modules are interesting
4595
4628
def __init__(self, stream=sys.stderr, descriptions=0, verbosity=1,
4596
4629
bench_history=None, strict=False, result_decorators=None):
4597
4630
TextTestRunner.__init__(
4598
self, stream=stream,
4599
descriptions=descriptions, verbosity=verbosity,
4600
bench_history=bench_history, strict=strict,
4601
result_decorators=result_decorators)
4631
self, stream=stream,
4632
descriptions=descriptions, verbosity=verbosity,
4633
bench_history=bench_history, strict=strict,
4634
result_decorators=result_decorators)
4602
4635
SubunitTestRunner.__init__(self, verbosity=verbosity,