55
58
_testtools_version = getattr(testtools, '__version__', ())
56
59
if _testtools_version < (0, 9, 5):
57
60
raise ImportError("need at least testtools 0.9.5: %s is %r"
58
% (testtools.__file__, _testtools_version))
61
% (testtools.__file__, _testtools_version))
59
62
from testtools import content
145
140
'XDG_CONFIG_HOME': None,
146
141
# brz now uses the Win32 API and doesn't rely on APPDATA, but the
147
142
# tests do check our impls match APPDATA
148
'BRZ_EDITOR': None, # test_msgeditor manipulates this variable
143
'BRZ_EDITOR': None, # test_msgeditor manipulates this variable
151
146
'BRZ_EMAIL': None,
152
'BZREMAIL': None, # may still be present in the environment
153
'EMAIL': 'jrandom@example.com', # set EMAIL as brz does not guess
147
'BZREMAIL': None, # may still be present in the environment
148
'EMAIL': 'jrandom@example.com', # set EMAIL as brz does not guess
154
149
'BRZ_PROGRESS_BAR': None,
155
# This should trap leaks to ~/.brz.log. This occurs when tests use TestCase
156
# as a base class instead of TestCaseInTempDir. Tests inheriting from
157
# TestCase should not use disk resources, BRZ_LOG is one.
150
# Trap leaks to $XDG_CACHE_HOME/breezy/brz.log. This occurs when tests use
151
# TestCase as a base class instead of TestCaseInTempDir. Tests inheriting
152
# from TestCase should not use disk resources, BRZ_LOG is one.
158
153
'BRZ_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
159
154
'BRZ_PLUGIN_PATH': '-site',
160
155
'BRZ_DISABLE_PLUGINS': None,
311
301
self._show_list('ERROR', self.errors)
312
302
self._show_list('FAIL', self.failures)
313
303
self.stream.write(self.sep2)
314
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
315
run, run != 1 and "s" or "", timeTaken))
304
self.stream.write("%s %d test%s in %.3fs\n\n" % (
305
actionTaken, run, run != 1 and "s" or "", timeTaken))
316
306
if not self.wasSuccessful():
317
307
self.stream.write("FAILED (")
318
308
failed, errored = map(len, (self.failures, self.errors))
320
310
self.stream.write("failures=%d" % failed)
322
if failed: self.stream.write(", ")
313
self.stream.write(", ")
323
314
self.stream.write("errors=%d" % errored)
324
315
if self.known_failure_count:
325
if failed or errored: self.stream.write(", ")
316
if failed or errored:
317
self.stream.write(", ")
326
318
self.stream.write("known_failure_count=%d" %
327
self.known_failure_count)
319
self.known_failure_count)
328
320
self.stream.write(")\n")
330
322
if self.known_failure_count:
331
323
self.stream.write("OK (known_failures=%d)\n" %
332
self.known_failure_count)
324
self.known_failure_count)
334
326
self.stream.write("OK\n")
335
327
if self.skip_count > 0:
336
328
skipped = self.skip_count
337
329
self.stream.write('%d test%s skipped\n' %
338
(skipped, skipped != 1 and "s" or ""))
330
(skipped, skipped != 1 and "s" or ""))
339
331
if self.unsupported:
340
332
for feature, count in sorted(self.unsupported.items()):
341
333
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
344
336
ok = self.wasStrictlySuccessful()
347
339
if self._first_thread_leaker_id:
348
340
self.stream.write(
349
341
'%s is leaking threads among %d leaking tests.\n' % (
350
self._first_thread_leaker_id,
351
self._tests_leaking_threads_count))
342
self._first_thread_leaker_id,
343
self._tests_leaking_threads_count))
352
344
# We don't report the main thread as an active one.
353
345
self.stream.write(
354
346
'%d non-main threads were left active in the end.\n'
369
361
# to decide whether to round/floor/ceiling. This was added when we
370
362
# had pyp3 test failures that suggest a floor was happening.
371
363
shift = 10 ** precision
372
return math.ceil((a_timedelta.days * 86400.0 + a_timedelta.seconds +
373
a_timedelta.microseconds / 1000000.0) * shift) / shift
365
(a_timedelta.days * 86400.0 + a_timedelta.seconds +
366
a_timedelta.microseconds / 1000000.0) * shift) / shift
375
368
def _elapsedTestTimeString(self):
376
"""Return a time string for the overall time the current test has taken."""
369
"""Return time string for overall time the current test has taken."""
377
370
return self._formatTime(self._delta_to_float(
378
371
self._now() - self._start_datetime, 3))
458
451
Called from the TestCase run() method when the test
459
452
fails with an unexpected error.
461
self._post_mortem(self._traceback_from_test)
454
self._post_mortem(self._traceback_from_test or err[2])
462
455
super(ExtendedTestResult, self).addError(test, err)
463
456
self.error_count += 1
464
457
self.report_error(test, err)
471
464
Called from the TestCase run() method when the test
472
465
fails because e.g. an assert() method failed.
474
self._post_mortem(self._traceback_from_test)
467
self._post_mortem(self._traceback_from_test or err[2])
475
468
super(ExtendedTestResult, self).addFailure(test, err)
476
469
self.failure_count += 1
477
470
self.report_failure(test, err)
561
554
'brz selftest: %s\n' % (bzr_path,))
562
555
self.stream.write(
564
breezy.__path__[0],))
557
breezy.__path__[0],))
565
558
self.stream.write(
566
559
' bzr-%s python-%s %s\n' % (
567
breezy.version_string,
568
breezy._format_version_tuple(sys.version_info),
569
platform.platform(aliased=1),
560
breezy.version_string,
561
breezy._format_version_tuple(sys.version_info),
562
platform.platform(aliased=1),
571
564
self.stream.write('\n')
573
566
def report_test_start(self, test):
581
574
# testtools details object would be fitting.
582
575
if 'threads' in selftest_debug_flags:
583
576
self.stream.write('%s is leaking, active is now %d\n' %
584
(test.id(), len(active_threads)))
577
(test.id(), len(active_threads)))
586
579
def startTestRun(self):
587
580
self.startTime = time.time()
629
622
a = '[%d' % self.count # total that have been run
630
623
# tests skipped as known not to be relevant are not important enough
632
## if self.skip_count:
633
## a += ', %d skip' % self.skip_count
634
## if self.known_failure_count:
635
## a += '+%dX' % self.known_failure_count
625
# if self.skip_count:
626
# a += ', %d skip' % self.skip_count
627
# if self.known_failure_count:
628
# a += '+%dX' % self.known_failure_count
636
629
if self.num_tests:
637
a +='/%d' % self.num_tests
630
a += '/%d' % self.num_tests
639
632
runtime = time.time() - self._overall_start_time
640
633
if runtime >= 60:
652
645
def report_test_start(self, test):
654
self._progress_prefix_text()
656
+ self._shortened_test_description(test))
647
self._progress_prefix_text() +
649
self._shortened_test_description(test))
658
651
def _test_description(self, test):
659
652
return self._shortened_test_description(test)
696
689
def _ellipsize_to_right(self, a_string, final_width):
697
690
"""Truncate and pad a string, keeping the right hand side"""
698
691
if len(a_string) > final_width:
699
result = '...' + a_string[3-final_width:]
692
result = '...' + a_string[3 - final_width:]
701
694
result = a_string
702
695
return result.ljust(final_width)
725
718
def report_error(self, test, err):
726
719
self.stream.write('ERROR %s\n%s\n'
727
% (self._testTimeString(test),
728
self._error_summary(err)))
720
% (self._testTimeString(test),
721
self._error_summary(err)))
730
723
def report_failure(self, test, err):
731
724
self.stream.write(' FAIL %s\n%s\n'
732
% (self._testTimeString(test),
733
self._error_summary(err)))
725
% (self._testTimeString(test),
726
self._error_summary(err)))
735
728
def report_known_failure(self, test, err):
736
729
self.stream.write('XFAIL %s\n%s\n'
737
% (self._testTimeString(test),
738
self._error_summary(err)))
730
% (self._testTimeString(test),
731
self._error_summary(err)))
740
733
def report_unexpected_success(self, test, reason):
741
734
self.stream.write(' FAIL %s\n%s: %s\n'
742
% (self._testTimeString(test),
743
"Unexpected success. Should have failed",
735
% (self._testTimeString(test),
736
"Unexpected success. Should have failed",
746
739
def report_success(self, test):
747
740
self.stream.write(' OK %s\n' % self._testTimeString(test))
755
748
def report_skip(self, test, reason):
756
749
self.stream.write(' SKIP %s\n%s\n'
757
% (self._testTimeString(test), reason))
750
% (self._testTimeString(test), reason))
759
752
def report_not_applicable(self, test, reason):
760
753
self.stream.write(' N/A %s\n %s\n'
761
% (self._testTimeString(test), reason))
754
% (self._testTimeString(test), reason))
763
756
def report_unsupported(self, test, feature):
764
757
"""test cannot be run because feature is missing."""
765
758
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
766
%(self._testTimeString(test), feature))
759
% (self._testTimeString(test), feature))
769
762
class TextTestRunner(object):
810
803
elif self.verbosity >= 2:
811
804
result_class = VerboseTestResult
812
805
original_result = result_class(self.stream,
815
bench_history=self._bench_history,
808
bench_history=self._bench_history,
818
811
# Signal to result objects that look at stop early policy to stop,
819
812
original_result.stop_early = self.stop_on_failure
820
813
result = original_result
946
940
def __init__(self, methodName='testMethod'):
947
941
super(TestCase, self).__init__(methodName)
948
942
self._directory_isolation = True
949
self.exception_handlers.insert(0,
950
(UnavailableFeature, self._do_unsupported_or_skip))
951
self.exception_handlers.insert(0,
952
(TestNotApplicable, self._do_not_applicable))
943
self.exception_handlers.insert(
944
0, (UnavailableFeature, self._do_unsupported_or_skip))
945
self.exception_handlers.insert(
946
0, (TestNotApplicable, self._do_not_applicable))
955
949
super(TestCase, self).setUp()
1026
1020
:param counter_name: The counter identifier in ``_counters``, defaults
1029
_counters = self._counters # Avoid closing over self
1023
_counters = self._counters # Avoid closing over self
1030
1024
if counter_name is None:
1031
1025
counter_name = name
1032
1026
if counter_name in _counters:
1033
1027
raise AssertionError('%s is already used as a counter name'
1035
1029
_counters[counter_name] = 0
1036
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1030
self.addDetail(counter_name, content.Content(
1037
1032
lambda: [b'%d' % (_counters[counter_name],)]))
1038
1034
def increment_counter(*args, **kwargs):
1039
1035
_counters[counter_name] += 1
1040
1036
label = 'count %s calls' % (counter_name,)
1048
1044
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1049
1045
self.install_counter_hook(config.ConfigHooks, hook_name,
1050
'config.%s' % (hook_name,))
1046
'config.%s' % (hook_name,))
1052
1048
# The OldConfigHooks are private and need special handling to protect
1053
1049
# against recursive tests (tests that run other tests), so we just do
1216
1212
# This prevents all transports, including e.g. sftp ones backed on disk
1217
1213
# from working unless they are explicitly granted permission. We then
1218
# depend on the code that sets up test transports to check that they are
1219
# appropriately isolated and enable their use by calling
1214
# depend on the code that sets up test transports to check that they
1215
# are appropriately isolated and enable their use by calling
1220
1216
# self.permit_transport()
1221
1217
if not osutils.is_inside_any(self._bzr_selftest_roots, url):
1222
1218
raise errors.BzrError("Attempt to escape test isolation: %r %r"
1223
% (url, self._bzr_selftest_roots))
1219
% (url, self._bzr_selftest_roots))
1225
1221
def record_directory_isolation(self):
1226
1222
"""Gather accessed directories to permit later access.
1279
1275
# hook into brz dir opening. This leaves a small window of error for
1280
1276
# transport tests, but they are well known, and we can improve on this
1282
controldir.ControlDir.hooks.install_named_hook("pre_open",
1283
self._preopen_isolate_transport, "Check brz directories are safe.")
1278
controldir.ControlDir.hooks.install_named_hook(
1279
"pre_open", self._preopen_isolate_transport,
1280
"Check brz directories are safe.")
1285
1282
def _ndiff_strings(self, a, b):
1286
1283
"""Return ndiff between two strings containing lines.
1288
1285
A trailing newline is added if missing to make the strings
1289
1286
print properly."""
1290
if b and b[-1] != '\n':
1287
if b and not b.endswith('\n'):
1292
if a and a[-1] != '\n':
1289
if a and not a.endswith('\n'):
1294
1291
difflines = difflib.ndiff(a.splitlines(True),
1295
1292
b.splitlines(True),
1327
1324
if message is None:
1328
1325
message = "texts not equal:\n"
1326
if a + ('\n' if isinstance(a, str) else b'\n') == b:
1330
1327
message = 'first string is missing a final newline.\n'
1328
if a == b + ('\n' if isinstance(b, str) else b'\n'):
1332
1329
message = 'second string is missing a final newline.\n'
1333
raise AssertionError(message +
1334
self._ndiff_strings(a, b))
1330
raise AssertionError(message
1331
+ self._ndiff_strings(
1332
a if isinstance(a, str) else a.decode(),
1333
b if isinstance(b, str) else b.decode()))
1336
1335
def assertEqualMode(self, mode, mode_test):
1337
1336
self.assertEqual(mode, mode_test,
1402
1401
def assertStartsWith(self, s, prefix):
1403
1402
if not s.startswith(prefix):
1404
raise AssertionError('string %r does not start with %r' % (s, prefix))
1403
raise AssertionError(
1404
'string %r does not start with %r' % (s, prefix))
1406
1406
def assertEndsWith(self, s, suffix):
1407
1407
"""Asserts that s ends with suffix."""
1408
1408
if not s.endswith(suffix):
1409
raise AssertionError('string %r does not end with %r' % (s, suffix))
1409
raise AssertionError(
1410
'string %r does not end with %r' % (s, suffix))
1411
1412
def assertContainsRe(self, haystack, needle_re, flags=0):
1412
1413
"""Assert that a contains something matching a regular expression."""
1413
1414
if not re.search(needle_re, haystack, flags):
1414
if '\n' in haystack or len(haystack) > 60:
1415
if (('\n' if isinstance(haystack, str) else b'\n') in haystack or
1416
len(haystack) > 60):
1415
1417
# a long string, format it in a more readable way
1416
1418
raise AssertionError(
1417
'pattern "%s" not found in\n"""\\\n%s"""\n'
1418
% (needle_re, haystack))
1419
'pattern "%s" not found in\n"""\\\n%s"""\n'
1420
% (needle_re, haystack))
1420
1422
raise AssertionError('pattern "%s" not found in "%s"'
1421
% (needle_re, haystack))
1423
% (needle_re, haystack))
1423
1425
def assertNotContainsRe(self, haystack, needle_re, flags=0):
1424
1426
"""Assert that a does not match a regular expression"""
1425
1427
if re.search(needle_re, haystack, flags):
1426
1428
raise AssertionError('pattern "%s" found in "%s"'
1427
% (needle_re, haystack))
1429
% (needle_re, haystack))
1429
1431
def assertContainsString(self, haystack, needle):
1430
1432
if haystack.find(needle) == -1:
1529
1531
def assertFileEqual(self, content, path):
1530
1532
"""Fail if path does not contain 'content'."""
1531
1533
self.assertPathExists(path)
1532
with open(path, 'rb') as f:
1535
mode = 'r' + ('b' if isinstance(content, bytes) else '')
1536
with open(path, mode) as f:
1534
1538
self.assertEqualDiff(content, s)
1544
1548
def assertPathExists(self, path):
1545
1549
"""Fail unless path or paths, which may be abs or relative, exist."""
1546
if not isinstance(path, (str, text_type)):
1550
# TODO(jelmer): Clean this up for pad.lv/1696545
1551
if not isinstance(path, (bytes, str)):
1548
1553
self.assertPathExists(p)
1550
1555
self.assertTrue(osutils.lexists(path),
1551
path + " does not exist")
1556
path + " does not exist")
1553
1558
def assertPathDoesNotExist(self, path):
1554
1559
"""Fail if path or paths, which may be abs or relative, exist."""
1555
if not isinstance(path, (str, text_type)):
1560
if not isinstance(path, (str, str)):
1557
1562
self.assertPathDoesNotExist(p)
1559
1564
self.assertFalse(osutils.lexists(path),
1562
1567
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1563
1568
"""A helper for callDeprecated and applyDeprecated.
1610
1616
:param kwargs: The keyword arguments for the callable
1611
1617
:return: The result of a_callable(``*args``, ``**kwargs``)
1613
call_warnings, result = self._capture_deprecation_warnings(a_callable,
1619
call_warnings, result = self._capture_deprecation_warnings(
1620
a_callable, *args, **kwargs)
1615
1621
expected_first_warning = symbol_versioning.deprecation_string(
1616
1622
a_callable, deprecation_format)
1617
1623
if len(call_warnings) == 0:
1618
1624
self.fail("No deprecation warning generated by call to %s" %
1620
1626
self.assertEqual(expected_first_warning, call_warnings[0])
1636
1642
# warnings. It's the easiest way to insulate ourselves from -Werror,
1637
1643
# though. -- Andrew, 20071062
1639
1646
def _catcher(message, category, filename, lineno, file=None, line=None):
1640
1647
# despite the name, 'message' is normally(?) a Warning subclass
1668
1675
:param args: The positional arguments for the callable
1669
1676
:param kwargs: The keyword arguments for the callable
1671
call_warnings, result = self._capture_deprecation_warnings(callable,
1678
call_warnings, result = self._capture_deprecation_warnings(
1679
callable, *args, **kwargs)
1673
1680
self.assertEqual(expected, call_warnings)
1676
1683
def _startLogFile(self):
1677
1684
"""Setup a in-memory target for bzr and testcase log messages"""
1678
1685
pseudo_log_file = BytesIO()
1679
1687
def _get_log_contents_for_weird_testtools_api():
1680
1688
return [pseudo_log_file.getvalue().decode(
1681
1689
"utf-8", "replace").encode("utf-8")]
1682
self.addDetail("log", content.Content(content.ContentType("text",
1683
"plain", {"charset": "utf8"}),
1684
_get_log_contents_for_weird_testtools_api))
1691
"log", content.Content(
1692
content.ContentType("text", "plain", {"charset": "utf8"}),
1693
_get_log_contents_for_weird_testtools_api))
1685
1694
self._log_file = pseudo_log_file
1686
1695
self._log_memento = trace.push_log_file(self._log_file)
1687
1696
self.addCleanup(self._finishLogFile)
1758
1767
:param obj: The namespace holding the reference to be replaced;
1759
1768
typically a module, class, or object.
1760
:param attr_name: A string for the name of the attribute to
1769
:param attr_name: A string for the name of the attribute to patch.
1762
1770
:returns: A list that will be extended with one item every time the
1763
1771
function is called, with a tuple of (args, kwargs).
1915
1924
if not feature.available():
1916
1925
raise UnavailableFeature(feature)
1918
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1920
"""Run bazaar command line, splitting up a string command line."""
1921
if isinstance(args, string_types):
1922
args = shlex.split(args)
1923
return self._run_bzr_core(args, retcode=retcode,
1924
encoding=encoding, stdin=stdin, working_dir=working_dir,
1927
def _run_bzr_core(self, args, retcode, encoding, stdin,
1927
def _run_bzr_core(self, args, encoding, stdin, stdout, stderr,
1929
1929
# Clear chk_map page cache, because the contents are likely to mask
1930
1930
# locking errors.
1931
1931
chk_map.clear_cache()
1932
if encoding is None:
1933
encoding = osutils.get_user_encoding()
1935
1933
self.log('run brz: %r', args)
1937
stdout = ui_testing.BytesIOWithEncoding()
1938
stderr = ui_testing.BytesIOWithEncoding()
1939
stdout.encoding = stderr.encoding = encoding
1941
# FIXME: don't call into logging here
1942
handler = trace.EncodedStreamHandler(
1943
stderr, errors="replace", level=logging.INFO)
1944
logger = logging.getLogger('')
1945
logger.addHandler(handler)
1947
self._last_cmd_stdout = codecs.getwriter(encoding)(stdout)
1948
self._last_cmd_stderr = codecs.getwriter(encoding)(stderr)
1935
self._last_cmd_stdout = stdout
1936
self._last_cmd_stderr = stderr
1950
1938
old_ui_factory = ui.ui_factory
1951
1939
ui.ui_factory = ui_testing.TestUIFactory(
1959
1947
os.chdir(working_dir)
1962
result = self.apply_redirected(
1963
ui.ui_factory.stdin,
1965
_mod_commands.run_bzr_catch_user_errors,
1951
result = self.apply_redirected(
1952
ui.ui_factory.stdin,
1954
_mod_commands.run_bzr_catch_user_errors,
1968
logger.removeHandler(handler)
1969
1957
ui.ui_factory = old_ui_factory
1970
1958
if cwd is not None:
1963
def run_bzr_raw(self, args, retcode=0, stdin=None, encoding=None,
1964
working_dir=None, error_regexes=[]):
1965
"""Invoke brz, as if it were run from the command line.
1967
The argument list should not include the brz program name - the
1968
first argument is normally the brz command. Arguments may be
1969
passed in three ways:
1971
1- A list of strings, eg ["commit", "a"]. This is recommended
1972
when the command contains whitespace or metacharacters, or
1973
is built up at run time.
1975
2- A single string, eg "add a". This is the most convenient
1976
for hardcoded commands.
1978
This runs brz through the interface that catches and reports
1979
errors, and with logging set to something approximating the
1980
default, so that error reporting can be checked.
1982
This should be the main method for tests that want to exercise the
1983
overall behavior of the brz application (rather than a unit test
1984
or a functional test of the library.)
1986
This sends the stdout/stderr results into the test's log,
1987
where it may be useful for debugging. See also run_captured.
1989
:keyword stdin: A string to be used as stdin for the command.
1990
:keyword retcode: The status code the command should return;
1992
:keyword working_dir: The directory to run the command in
1993
:keyword error_regexes: A list of expected error messages. If
1994
specified they must be seen in the error output of the command.
1996
if isinstance(args, str):
1997
args = shlex.split(args)
1999
if encoding is None:
2000
encoding = osutils.get_user_encoding()
2004
wrapped_stdout = TextIOWrapper(stdout, encoding)
2005
wrapped_stderr = TextIOWrapper(stderr, encoding)
2006
handler = logging.StreamHandler(wrapped_stderr)
2007
handler.setLevel(logging.INFO)
2009
logger = logging.getLogger('')
2010
logger.addHandler(handler)
2012
result = self._run_bzr_core(
2013
args, encoding=encoding, stdin=stdin, stdout=wrapped_stdout,
2014
stderr=wrapped_stderr, working_dir=working_dir,
2017
logger.removeHandler(handler)
2019
wrapped_stdout.flush()
2020
wrapped_stderr.flush()
1973
2022
out = stdout.getvalue()
1974
2023
err = stderr.getvalue()
1978
2027
self.log('errors:\n%r', err)
1979
2028
if retcode is not None:
1980
2029
self.assertEqual(retcode, result,
1981
message='Unexpected return code')
1982
return result, out, err
2030
message='Unexpected return code')
2031
self.assertIsInstance(error_regexes, (list, tuple))
2032
for regex in error_regexes:
2033
self.assertContainsRe(err, regex)
1984
2036
def run_bzr(self, args, retcode=0, stdin=None, encoding=None,
1985
2037
working_dir=None, error_regexes=[]):
2014
2066
:keyword error_regexes: A list of expected error messages. If
2015
2067
specified they must be seen in the error output of the command.
2017
retcode, out, err = self._run_bzr_autosplit(
2022
working_dir=working_dir,
2069
if isinstance(args, str):
2070
args = shlex.split(args)
2072
if encoding is None:
2073
encoding = osutils.get_user_encoding()
2075
stdout = ui_testing.StringIOWithEncoding()
2076
stderr = ui_testing.StringIOWithEncoding()
2077
stdout.encoding = stderr.encoding = encoding
2078
handler = logging.StreamHandler(stream=stderr)
2079
handler.setLevel(logging.INFO)
2081
logger = logging.getLogger('')
2082
logger.addHandler(handler)
2085
result = self._run_bzr_core(
2086
args, encoding=encoding, stdin=stdin, stdout=stdout,
2087
stderr=stderr, working_dir=working_dir)
2089
logger.removeHandler(handler)
2091
out = stdout.getvalue()
2092
err = stderr.getvalue()
2094
self.log('output:\n%r', out)
2096
self.log('errors:\n%r', err)
2097
if retcode is not None:
2098
self.assertEqual(retcode, result,
2099
message='Unexpected return code')
2024
2100
self.assertIsInstance(error_regexes, (list, tuple))
2025
2101
for regex in error_regexes:
2026
2102
self.assertContainsRe(err, regex)
2048
2124
# Make sure --strict is handling an unknown file, rather than
2049
2125
# giving us the 'nothing to do' error
2050
2126
self.build_tree(['unknown'])
2051
self.run_bzr_error(['Commit refused because there are unknown files'],
2052
['commit', --strict', '-m', 'my commit comment'])
2128
['Commit refused because there are unknown files'],
2129
['commit', --strict', '-m', 'my commit comment'])
2054
2131
kwargs.setdefault('retcode', 3)
2055
2132
kwargs['error_regexes'] = error_regexes
2093
2170
# We distinguish between retcode=None and retcode not passed.
2094
2171
supplied_retcode = kwargs.get('retcode', 0)
2095
2172
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
2096
universal_newlines=kwargs.get('universal_newlines', False),
2173
universal_newlines=kwargs.get(
2174
'universal_newlines', False),
2099
2177
def start_bzr_subprocess(self, process_args, env_changes=None,
2100
2178
skip_if_plan_to_signal=False,
2194
2272
# the life of this function, which might interfere with e.g.
2195
2273
# cleaning tempdirs on Windows.
2196
2274
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2197
#detail_content = content.content_from_file(
2275
# detail_content = content.content_from_file(
2198
2276
# log_file_path, buffer_now=True)
2199
2277
with open(log_file_path, 'rb') as log_file:
2200
2278
log_file_bytes = log_file.read()
2201
detail_content = content.Content(content.ContentType("text",
2202
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2279
detail_content = content.Content(
2280
content.ContentType("text", "plain", {"charset": "utf8"}),
2281
lambda: [log_file_bytes])
2203
2282
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2206
2285
def _popen(self, *args, **kwargs):
2207
2286
"""Place a call to Popen.
2239
2318
out, err = process.communicate()
2241
2320
if universal_newlines:
2242
out = out.replace('\r\n', '\n')
2243
err = err.replace('\r\n', '\n')
2321
out = out.replace(b'\r\n', b'\n')
2322
err = err.replace(b'\r\n', b'\n')
2245
2324
if retcode is not None and retcode != process.returncode:
2246
2325
if process_args is None:
2407
2486
self.addCleanup(transport.disconnect)
2409
_mod_transport.Transport.hooks.install_named_hook('post_connect',
2410
_add_disconnect_cleanup, None)
2488
_mod_transport.Transport.hooks.install_named_hook(
2489
'post_connect', _add_disconnect_cleanup, None)
2412
2491
self._make_test_root()
2413
2492
self.addCleanup(os.chdir, osutils.getcwd())
2464
2543
self.__readonly_server = test_server.ReadonlyServer()
2466
2545
# explicit readonly transport.
2467
self.__readonly_server = self.create_transport_readonly_server()
2546
self.__readonly_server = (
2547
self.create_transport_readonly_server())
2468
2548
self.start_server(self.__readonly_server,
2469
self.get_vfs_only_server())
2549
self.get_vfs_only_server())
2470
2550
return self.__readonly_server
2472
2552
def get_readonly_url(self, relpath=None):
2593
2673
root = TestCaseWithMemoryTransport.TEST_ROOT
2594
2674
t = _mod_transport.get_transport_from_path(root)
2595
2675
self.permit_url(t.base)
2596
if (t.get_bytes('.bzr/checkout/dirstate') !=
2676
if (t.get_bytes('.bzr/checkout/dirstate') !=
2597
2677
TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
2598
2678
# The current test have modified the /bzr directory, we need to
2599
2679
# recreate a new one or all the followng tests will fail.
2687
2768
smart_server = test_server.SmartTCPServer_for_testing()
2688
2769
self.start_server(smart_server, backing_server)
2689
2770
remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
2691
2772
return remote_transport
2693
2774
def make_branch_and_memory_tree(self, relpath, format=None):
2694
2775
"""Create a branch on the default transport and a MemoryTree for it."""
2695
2776
b = self.make_branch(relpath, format=format)
2696
return memorytree.MemoryTree.create_on_branch(b)
2777
return b.create_memorytree()
2698
2779
def make_branch_builder(self, relpath, format=None):
2699
2780
branch = self.make_branch(relpath, format=format)
2702
2783
def overrideEnvironmentForTesting(self):
2703
2784
test_home_dir = self.test_home_dir
2704
if not PY3 and isinstance(test_home_dir, text_type):
2705
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2706
2785
self.overrideEnv('HOME', test_home_dir)
2707
2786
self.overrideEnv('BRZ_HOME', test_home_dir)
2708
2787
self.overrideEnv('GNUPGHOME', os.path.join(test_home_dir, '.gnupg'))
2716
2795
# Skip the current stack down to the caller of
2717
2796
# setup_smart_server_with_call_log
2718
2797
prefix_length = len(traceback.extract_stack()) - 2
2719
2799
def capture_hpss_call(params):
2720
2800
self.hpss_calls.append(
2721
2801
CapturedCall(params, prefix_length))
2722
2803
def capture_connect(transport):
2723
2804
self.hpss_connections.append(transport)
2724
2805
client._SmartClient.hooks.install_named_hook(
2837
2918
if type(shape) not in (list, tuple):
2838
2919
raise AssertionError("Parameter 'shape' should be "
2839
"a list or a tuple. Got %r instead" % (shape,))
2920
"a list or a tuple. Got %r instead" % (shape,))
2840
2921
# It's OK to just create them using forward slashes on windows.
2841
2922
if transport is None or transport.is_readonly():
2842
2923
transport = _mod_transport.get_transport_from_path(".")
2843
2924
for name in shape:
2844
self.assertIsInstance(name, (str, text_type))
2925
self.assertIsInstance(name, str)
2845
2926
if name[-1] == '/':
2846
2927
transport.mkdir(urlutils.escape(name[:-1]))
2861
2942
"""Assert whether path or paths are in the WorkingTree"""
2862
2943
if tree is None:
2863
2944
tree = workingtree.WorkingTree.open(root_path)
2864
if not isinstance(path, (str, text_type)):
2945
if not isinstance(path, str):
2866
2947
self.assertInWorkingTree(p, tree=tree)
2868
2949
self.assertTrue(tree.is_versioned(path),
2869
path+' not in working tree.')
2950
path + ' not in working tree.')
2871
2952
def assertNotInWorkingTree(self, path, root_path='.', tree=None):
2872
2953
"""Assert whether path or paths are not in the WorkingTree"""
2873
2954
if tree is None:
2874
2955
tree = workingtree.WorkingTree.open(root_path)
2875
if not isinstance(path, (str, text_type)):
2956
if not isinstance(path, str):
2877
2958
self.assertNotInWorkingTree(p, tree=tree)
2879
self.assertFalse(tree.is_versioned(path), path+' in working tree.')
2960
self.assertFalse(tree.is_versioned(
2961
path), path + ' in working tree.')
2882
2964
class TestCaseWithTransport(TestCaseInTempDir):
2937
3019
format = self.resolve_format(format=format)
2938
3020
if not format.supports_workingtrees:
2939
b = self.make_branch(relpath+'.branch', format=format)
3021
b = self.make_branch(relpath + '.branch', format=format)
2940
3022
return b.create_checkout(relpath, lightweight=True)
2941
3023
b = self.make_branch(relpath, format=format)
2983
3065
self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
2984
3066
differences = left.changes_from(right)
2985
3067
self.assertFalse(differences.has_changed(),
2986
"Trees %r and %r are different: %r" % (left, right, differences))
3068
"Trees %r and %r are different: %r" % (left, right, differences))
2988
3070
def disable_missing_extensions_warning(self):
2989
3071
"""Some tests expect a precise stderr content.
3233
3316
if stream is None:
3234
3317
stream = sys.stdout
3235
3318
runner = runner_class(stream=stream,
3237
verbosity=verbosity,
3238
bench_history=bench_history,
3240
result_decorators=result_decorators,
3242
runner.stop_on_failure=stop_on_failure
3320
verbosity=verbosity,
3321
bench_history=bench_history,
3323
result_decorators=result_decorators,
3325
runner.stop_on_failure = stop_on_failure
3243
3326
if isinstance(suite, unittest.TestSuite):
3244
3327
# Empty out _tests list of passed suite and populate new TestSuite
3245
3328
suite._tests[:], suite = [], TestSuite(suite)
3286
3369
def fork_decorator(suite):
3287
3370
if getattr(os, "fork", None) is None:
3288
3371
raise errors.BzrCommandError("platform does not support fork,"
3289
" try --parallel=subprocess instead.")
3372
" try --parallel=subprocess instead.")
3290
3373
concurrency = osutils.local_concurrency()
3291
3374
if concurrency == 1:
3293
3376
from testtools import ConcurrentTestSuite
3294
3377
return ConcurrentTestSuite(suite, fork_for_tests)
3295
3380
parallel_registry.register('fork', fork_decorator)
3394
3485
def __init__(self, suite, random_seed, stream):
3395
3486
random_seed = self.actual_seed(random_seed)
3396
3487
stream.write("Randomizing test order using seed %s\n\n" %
3398
3489
# Initialise the random number generator.
3399
3490
random.seed(random_seed)
3400
3491
super(RandomDecorator, self).__init__(randomize_suite(suite))
3484
3576
pid = os.fork()
3487
stream = os.fdopen(c2pwrite, 'wb', 1)
3579
stream = os.fdopen(c2pwrite, 'wb', 0)
3488
3580
workaround_zealous_crypto_random()
3586
coverage.process_startup()
3489
3587
os.close(c2pread)
3490
3588
# Leave stderr and stdout open so we can see test noise
3491
3589
# Close stdin so that the child goes away if it decides to
3500
3598
# if stream couldn't be created or something else goes wrong.
3501
3599
# The traceback is formatted to a string and written in one go
3502
3600
# to avoid interleaving lines from multiple failing children.
3601
tb = traceback.format_exc()
3602
if isinstance(tb, str):
3603
tb = tb.encode('utf-8')
3504
stream.write(traceback.format_exc())
3509
3611
os.close(c2pwrite)
3510
stream = os.fdopen(c2pread, 'rb', 1)
3612
stream = os.fdopen(c2pread, 'rb', 0)
3511
3613
test = TestInOtherProcess(stream, pid)
3512
3614
result.append(test)
3539
3642
test_blocks = partition_tests(suite, concurrency)
3540
3643
for process_tests in test_blocks:
3541
3644
# ugly; currently reimplement rather than reuses TestCase methods.
3542
bzr_path = os.path.dirname(os.path.dirname(breezy.__file__))+'/bzr'
3645
bzr_path = os.path.dirname(os.path.dirname(breezy.__file__)) + '/bzr'
3543
3646
if not os.path.isfile(bzr_path):
3544
3647
# We are probably installed. Assume sys.argv is the right file
3545
3648
bzr_path = sys.argv[0]
3554
3657
test_list_file.close()
3556
3659
argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3558
3661
if '--no-plugins' in sys.argv:
3559
3662
argv.append('--no-plugins')
3560
3663
# stderr=subprocess.STDOUT would be ideal, but until we prevent
3561
3664
# noise on stderr it can interrupt the subunit protocol.
3562
3665
process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3563
stdout=subprocess.PIPE,
3564
stderr=subprocess.PIPE,
3666
stdout=subprocess.PIPE,
3667
stderr=subprocess.PIPE,
3566
3669
test = TestInSubprocess(process, test_list_file_name)
3567
3670
result.append(test)
3680
3783
if lsprof_tests:
3681
3784
result_decorators.append(ProfileResult)
3682
3785
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
3683
stop_on_failure=stop_on_failure,
3684
transport=transport,
3685
lsprof_timed=lsprof_timed,
3686
bench_history=bench_history,
3687
matching_tests_first=matching_tests_first,
3688
list_only=list_only,
3689
random_seed=random_seed,
3690
exclude_pattern=exclude_pattern,
3692
runner_class=runner_class,
3693
suite_decorators=suite_decorators,
3695
result_decorators=result_decorators,
3786
stop_on_failure=stop_on_failure,
3787
transport=transport,
3788
lsprof_timed=lsprof_timed,
3789
bench_history=bench_history,
3790
matching_tests_first=matching_tests_first,
3791
list_only=list_only,
3792
random_seed=random_seed,
3793
exclude_pattern=exclude_pattern,
3795
runner_class=runner_class,
3796
suite_decorators=suite_decorators,
3798
result_decorators=result_decorators,
3698
3801
default_transport = old_transport
3699
3802
selftest_debug_flags = old_debug_flags
3851
3954
test_prefix_alias_registry.register('bd', 'breezy.doc')
3852
3955
test_prefix_alias_registry.register('bu', 'breezy.utils')
3853
3956
test_prefix_alias_registry.register('bt', 'breezy.tests')
3957
test_prefix_alias_registry.register('bgt', 'breezy.git.tests')
3854
3958
test_prefix_alias_registry.register('bb', 'breezy.tests.blackbox')
3855
3959
test_prefix_alias_registry.register('bp', 'breezy.plugins')
3858
3962
def _test_suite_testmod_names():
3859
3963
"""Return the standard list of test module names to test."""
3965
'breezy.git.tests.test_blackbox',
3966
'breezy.git.tests.test_builder',
3967
'breezy.git.tests.test_branch',
3968
'breezy.git.tests.test_cache',
3969
'breezy.git.tests.test_dir',
3970
'breezy.git.tests.test_fetch',
3971
'breezy.git.tests.test_git_remote_helper',
3972
'breezy.git.tests.test_mapping',
3973
'breezy.git.tests.test_memorytree',
3974
'breezy.git.tests.test_object_store',
3975
'breezy.git.tests.test_pristine_tar',
3976
'breezy.git.tests.test_push',
3977
'breezy.git.tests.test_remote',
3978
'breezy.git.tests.test_repository',
3979
'breezy.git.tests.test_refs',
3980
'breezy.git.tests.test_revspec',
3981
'breezy.git.tests.test_roundtrip',
3982
'breezy.git.tests.test_server',
3983
'breezy.git.tests.test_transportgit',
3984
'breezy.git.tests.test_unpeel_map',
3985
'breezy.git.tests.test_urls',
3986
'breezy.git.tests.test_workingtree',
3862
3987
'breezy.tests.blackbox',
3863
3988
'breezy.tests.commands',
3864
3989
'breezy.tests.per_branch',
3911
4036
'breezy.tests.test_chk_serializer',
3912
4037
'breezy.tests.test_chunk_writer',
3913
4038
'breezy.tests.test_clean_tree',
3914
'breezy.tests.test_cleanup',
3915
4039
'breezy.tests.test_cmdline',
3916
4040
'breezy.tests.test_commands',
3917
4041
'breezy.tests.test_commit',
3918
4042
'breezy.tests.test_commit_merge',
3919
4043
'breezy.tests.test_config',
4044
'breezy.tests.test_bedding',
3920
4045
'breezy.tests.test_conflicts',
3921
4046
'breezy.tests.test_controldir',
3922
4047
'breezy.tests.test_counted_lock',
3941
4066
'breezy.tests.test_fifo_cache',
3942
4067
'breezy.tests.test_filters',
3943
4068
'breezy.tests.test_filter_tree',
3944
'breezy.tests.test_ftp_transport',
3945
4069
'breezy.tests.test_foreign',
3946
4070
'breezy.tests.test_generate_docs',
3947
4071
'breezy.tests.test_generate_ids',
3948
4072
'breezy.tests.test_globbing',
3949
4073
'breezy.tests.test_gpg',
3950
4074
'breezy.tests.test_graph',
4075
'breezy.tests.test_grep',
3951
4076
'breezy.tests.test_groupcompress',
3952
4077
'breezy.tests.test_hashcache',
3953
4078
'breezy.tests.test_help',
3968
4093
'breezy.tests.test_lazy_import',
3969
4094
'breezy.tests.test_lazy_regex',
3970
4095
'breezy.tests.test_library_state',
4096
'breezy.tests.test_location',
3971
4097
'breezy.tests.test_lock',
3972
4098
'breezy.tests.test_lockable_files',
3973
4099
'breezy.tests.test_lockdir',
3979
4105
'breezy.tests.test_memorytree',
3980
4106
'breezy.tests.test_merge',
3981
4107
'breezy.tests.test_merge3',
4108
'breezy.tests.test_mergeable',
3982
4109
'breezy.tests.test_merge_core',
3983
4110
'breezy.tests.test_merge_directive',
3984
4111
'breezy.tests.test_mergetools',
3985
4112
'breezy.tests.test_missing',
3986
4113
'breezy.tests.test_msgeditor',
3987
4114
'breezy.tests.test_multiparent',
4115
'breezy.tests.test_multiwalker',
3988
4116
'breezy.tests.test_mutabletree',
3989
4117
'breezy.tests.test_nonascii',
3990
4118
'breezy.tests.test_options',
3996
4124
'breezy.tests.test_permissions',
3997
4125
'breezy.tests.test_plugins',
3998
4126
'breezy.tests.test_progress',
4127
'breezy.tests.test_propose',
3999
4128
'breezy.tests.test_pyutils',
4000
4129
'breezy.tests.test_read_bundle',
4001
4130
'breezy.tests.test_reconcile',
4117
4246
# both options means we will load less tests for the same final result.
4118
4247
def interesting_module(name):
4119
4248
for start in starting_with:
4121
# Either the module name starts with the specified string
4122
name.startswith(start)
4123
# or it may contain tests starting with the specified string
4124
or start.startswith(name)
4249
# Either the module name starts with the specified string
4250
# or it may contain tests starting with the specified string
4251
if name.startswith(start) or start.startswith(name):
4128
4254
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
4130
4256
elif keep_only is not None:
4131
4257
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
4132
4259
def interesting_module(name):
4133
4260
return id_filter.refers_to(name)
4136
4263
loader = TestUtil.TestLoader()
4137
4265
def interesting_module(name):
4138
4266
# No filtering, all modules are interesting
4352
4483
if feature.available():
4353
4484
scenarios.append(('C', {'module': feature.module}))
4355
# the compiled module isn't available, so we add a failing test
4356
4486
class FailWithoutFeature(TestCase):
4488
return ext_module_name + '.' + super(FailWithoutFeature, self).id()
4357
4489
def test_fail(self):
4358
4490
self.requireFeature(feature)
4491
# the compiled module isn't available, so we add a failing test
4359
4492
suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4360
4493
result = multiply_tests(standard_tests, scenarios, suite)
4361
4494
return result, feature
4379
4512
# We don't want to fail here because some useful display will be lost
4380
4513
# otherwise. Polluting the tmp dir is bad, but not giving all the
4381
4514
# possible info to the test runner is even worse.
4515
if test_id is not None:
4383
4516
ui.ui_factory.clear_term()
4384
4517
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4385
4518
# Ugly, but the last thing we want here is fail, so bear with it.
4460
4593
def __init__(self, stream=sys.stderr, descriptions=0, verbosity=1,
4461
4594
bench_history=None, strict=False, result_decorators=None):
4462
4595
TextTestRunner.__init__(
4463
self, stream=stream,
4464
descriptions=descriptions, verbosity=verbosity,
4465
bench_history=bench_history, strict=strict,
4466
result_decorators=result_decorators)
4596
self, stream=stream,
4597
descriptions=descriptions, verbosity=verbosity,
4598
bench_history=bench_history, strict=strict,
4599
result_decorators=result_decorators)
4467
4600
SubunitTestRunner.__init__(self, verbosity=verbosity,