/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/__init__.py

  • Committer: Breezy landing bot
  • Author(s): Jelmer Vernooij
  • Date: 2018-11-16 18:59:44 UTC
  • mfrom: (7143.15.15 more-cleanups)
  • Revision ID: breezy.the.bot@gmail.com-20181116185944-biefv1sub37qfybm
Sprinkle some PEP8iness.

Merged from https://code.launchpad.net/~jelmer/brz/more-cleanups/+merge/358611

Show diffs side-by-side

added added

removed removed

Lines of Context:
60
60
_testtools_version = getattr(testtools, '__version__', ())
61
61
if _testtools_version < (0, 9, 5):
62
62
    raise ImportError("need at least testtools 0.9.5: %s is %r"
63
 
        % (testtools.__file__, _testtools_version))
 
63
                      % (testtools.__file__, _testtools_version))
64
64
from testtools import content
65
65
 
66
66
import breezy
75
75
    hooks,
76
76
    lock as _mod_lock,
77
77
    lockdir,
78
 
    memorytree,
79
78
    osutils,
80
79
    plugin as _mod_plugin,
81
80
    pyutils,
113
112
    treeshape,
114
113
    ui_testing,
115
114
    )
116
 
from ..tests.features import _CompatabilityThunkFeature
117
115
 
118
116
# Mark this python module as being part of the implementation
119
117
# of unittest: this gives us better tracebacks where the last
150
148
    'XDG_CONFIG_HOME': None,
151
149
    # brz now uses the Win32 API and doesn't rely on APPDATA, but the
152
150
    # tests do check our impls match APPDATA
153
 
    'BRZ_EDITOR': None, # test_msgeditor manipulates this variable
 
151
    'BRZ_EDITOR': None,  # test_msgeditor manipulates this variable
154
152
    'VISUAL': None,
155
153
    'EDITOR': None,
156
154
    'BRZ_EMAIL': None,
157
 
    'BZREMAIL': None, # may still be present in the environment
158
 
    'EMAIL': 'jrandom@example.com', # set EMAIL as brz does not guess
 
155
    'BZREMAIL': None,  # may still be present in the environment
 
156
    'EMAIL': 'jrandom@example.com',  # set EMAIL as brz does not guess
159
157
    'BRZ_PROGRESS_BAR': None,
160
158
    # This should trap leaks to ~/.brz.log. This occurs when tests use TestCase
161
159
    # as a base class instead of TestCaseInTempDir. Tests inheriting from
311
309
        self._show_list('ERROR', self.errors)
312
310
        self._show_list('FAIL', self.failures)
313
311
        self.stream.write(self.sep2)
314
 
        self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
315
 
                            run, run != 1 and "s" or "", timeTaken))
 
312
        self.stream.write("%s %d test%s in %.3fs\n\n" % (
 
313
            actionTaken, run, run != 1 and "s" or "", timeTaken))
316
314
        if not self.wasSuccessful():
317
315
            self.stream.write("FAILED (")
318
316
            failed, errored = map(len, (self.failures, self.errors))
319
317
            if failed:
320
318
                self.stream.write("failures=%d" % failed)
321
319
            if errored:
322
 
                if failed: self.stream.write(", ")
 
320
                if failed:
 
321
                    self.stream.write(", ")
323
322
                self.stream.write("errors=%d" % errored)
324
323
            if self.known_failure_count:
325
 
                if failed or errored: self.stream.write(", ")
 
324
                if failed or errored:
 
325
                    self.stream.write(", ")
326
326
                self.stream.write("known_failure_count=%d" %
327
 
                    self.known_failure_count)
 
327
                                  self.known_failure_count)
328
328
            self.stream.write(")\n")
329
329
        else:
330
330
            if self.known_failure_count:
331
331
                self.stream.write("OK (known_failures=%d)\n" %
332
 
                    self.known_failure_count)
 
332
                                  self.known_failure_count)
333
333
            else:
334
334
                self.stream.write("OK\n")
335
335
        if self.skip_count > 0:
336
336
            skipped = self.skip_count
337
337
            self.stream.write('%d test%s skipped\n' %
338
 
                                (skipped, skipped != 1 and "s" or ""))
 
338
                              (skipped, skipped != 1 and "s" or ""))
339
339
        if self.unsupported:
340
340
            for feature, count in sorted(self.unsupported.items()):
341
341
                self.stream.write("Missing feature '%s' skipped %d tests.\n" %
342
 
                    (feature, count))
 
342
                                  (feature, count))
343
343
        if self._strict:
344
344
            ok = self.wasStrictlySuccessful()
345
345
        else:
347
347
        if self._first_thread_leaker_id:
348
348
            self.stream.write(
349
349
                '%s is leaking threads among %d leaking tests.\n' % (
350
 
                self._first_thread_leaker_id,
351
 
                self._tests_leaking_threads_count))
 
350
                    self._first_thread_leaker_id,
 
351
                    self._tests_leaking_threads_count))
352
352
            # We don't report the main thread as an active one.
353
353
            self.stream.write(
354
354
                '%d non-main threads were left active in the end.\n'
369
369
        # to decide whether to round/floor/ceiling. This was added when we
370
370
        # had pyp3 test failures that suggest a floor was happening.
371
371
        shift = 10 ** precision
372
 
        return math.ceil((a_timedelta.days * 86400.0 + a_timedelta.seconds +
373
 
            a_timedelta.microseconds / 1000000.0) * shift) / shift
 
372
        return math.ceil(
 
373
            (a_timedelta.days * 86400.0 + a_timedelta.seconds +
 
374
             a_timedelta.microseconds / 1000000.0) * shift) / shift
374
375
 
375
376
    def _elapsedTestTimeString(self):
376
 
        """Return a time string for the overall time the current test has taken."""
 
377
        """Return time string for overall time the current test has taken."""
377
378
        return self._formatTime(self._delta_to_float(
378
379
            self._now() - self._start_datetime, 3))
379
380
 
506
507
        super(ExtendedTestResult, self).addFailure(test, details=details)
507
508
        self.failure_count += 1
508
509
        self.report_unexpected_success(test,
509
 
            "".join(details["reason"].iter_text()))
 
510
                                       "".join(details["reason"].iter_text()))
510
511
        if self.stop_early:
511
512
            self.stop()
512
513
 
561
562
            'brz selftest: %s\n' % (bzr_path,))
562
563
        self.stream.write(
563
564
            '   %s\n' % (
564
 
                    breezy.__path__[0],))
 
565
                breezy.__path__[0],))
565
566
        self.stream.write(
566
567
            '   bzr-%s python-%s %s\n' % (
567
 
                    breezy.version_string,
568
 
                    breezy._format_version_tuple(sys.version_info),
569
 
                    platform.platform(aliased=1),
570
 
                    ))
 
568
                breezy.version_string,
 
569
                breezy._format_version_tuple(sys.version_info),
 
570
                platform.platform(aliased=1),
 
571
                ))
571
572
        self.stream.write('\n')
572
573
 
573
574
    def report_test_start(self, test):
581
582
        #                testtools details object would be fitting.
582
583
        if 'threads' in selftest_debug_flags:
583
584
            self.stream.write('%s is leaking, active is now %d\n' %
584
 
                (test.id(), len(active_threads)))
 
585
                              (test.id(), len(active_threads)))
585
586
 
586
587
    def startTestRun(self):
587
588
        self.startTime = time.time()
603
604
                 strict=None,
604
605
                 ):
605
606
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
606
 
            bench_history, strict)
 
607
                                    bench_history, strict)
607
608
        self.pb = self.ui.nested_progress_bar()
608
609
        self.pb.show_pct = False
609
610
        self.pb.show_spinner = False
629
630
        a = '[%d' % self.count              # total that have been run
630
631
        # tests skipped as known not to be relevant are not important enough
631
632
        # to show here
632
 
        ## if self.skip_count:
633
 
        ##     a += ', %d skip' % self.skip_count
634
 
        ## if self.known_failure_count:
635
 
        ##     a += '+%dX' % self.known_failure_count
 
633
        # if self.skip_count:
 
634
        #     a += ', %d skip' % self.skip_count
 
635
        # if self.known_failure_count:
 
636
        #     a += '+%dX' % self.known_failure_count
636
637
        if self.num_tests:
637
 
            a +='/%d' % self.num_tests
 
638
            a += '/%d' % self.num_tests
638
639
        a += ' in '
639
640
        runtime = time.time() - self._overall_start_time
640
641
        if runtime >= 60:
651
652
 
652
653
    def report_test_start(self, test):
653
654
        self.pb.update(
654
 
                self._progress_prefix_text()
655
 
                + ' '
656
 
                + self._shortened_test_description(test))
 
655
            self._progress_prefix_text() +
 
656
            ' ' +
 
657
            self._shortened_test_description(test))
657
658
 
658
659
    def _test_description(self, test):
659
660
        return self._shortened_test_description(test)
696
697
    def _ellipsize_to_right(self, a_string, final_width):
697
698
        """Truncate and pad a string, keeping the right hand side"""
698
699
        if len(a_string) > final_width:
699
 
            result = '...' + a_string[3-final_width:]
 
700
            result = '...' + a_string[3 - final_width:]
700
701
        else:
701
702
            result = a_string
702
703
        return result.ljust(final_width)
713
714
            # 11-char time string, plus a trailing blank
714
715
            # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
715
716
            # space
716
 
            self.stream.write(self._ellipsize_to_right(name, width-18))
 
717
            self.stream.write(self._ellipsize_to_right(name, width - 18))
717
718
        else:
718
719
            self.stream.write(name)
719
720
        self.stream.flush()
724
725
 
725
726
    def report_error(self, test, err):
726
727
        self.stream.write('ERROR %s\n%s\n'
727
 
                % (self._testTimeString(test),
728
 
                   self._error_summary(err)))
 
728
                          % (self._testTimeString(test),
 
729
                             self._error_summary(err)))
729
730
 
730
731
    def report_failure(self, test, err):
731
732
        self.stream.write(' FAIL %s\n%s\n'
732
 
                % (self._testTimeString(test),
733
 
                   self._error_summary(err)))
 
733
                          % (self._testTimeString(test),
 
734
                             self._error_summary(err)))
734
735
 
735
736
    def report_known_failure(self, test, err):
736
737
        self.stream.write('XFAIL %s\n%s\n'
737
 
                % (self._testTimeString(test),
738
 
                   self._error_summary(err)))
 
738
                          % (self._testTimeString(test),
 
739
                             self._error_summary(err)))
739
740
 
740
741
    def report_unexpected_success(self, test, reason):
741
742
        self.stream.write(' FAIL %s\n%s: %s\n'
742
 
                % (self._testTimeString(test),
743
 
                   "Unexpected success. Should have failed",
744
 
                   reason))
 
743
                          % (self._testTimeString(test),
 
744
                             "Unexpected success. Should have failed",
 
745
                             reason))
745
746
 
746
747
    def report_success(self, test):
747
748
        self.stream.write('   OK %s\n' % self._testTimeString(test))
754
755
 
755
756
    def report_skip(self, test, reason):
756
757
        self.stream.write(' SKIP %s\n%s\n'
757
 
                % (self._testTimeString(test), reason))
 
758
                          % (self._testTimeString(test), reason))
758
759
 
759
760
    def report_not_applicable(self, test, reason):
760
761
        self.stream.write('  N/A %s\n    %s\n'
761
 
                % (self._testTimeString(test), reason))
 
762
                          % (self._testTimeString(test), reason))
762
763
 
763
764
    def report_unsupported(self, test, feature):
764
765
        """test cannot be run because feature is missing."""
765
766
        self.stream.write("NODEP %s\n    The feature '%s' is not available.\n"
766
 
                %(self._testTimeString(test), feature))
 
767
                          % (self._testTimeString(test), feature))
767
768
 
768
769
 
769
770
class TextTestRunner(object):
781
782
 
782
783
        :param result_decorators: An optional list of decorators to apply
783
784
            to the result object being used by the runner. Decorators are
784
 
            applied left to right - the first element in the list is the 
 
785
            applied left to right - the first element in the list is the
785
786
            innermost decorator.
786
787
        """
787
788
        # stream may claim to know how to write unicode strings, but in older
794
795
        # GZ 2010-09-08: Really we don't want to be writing arbitrary bytes,
795
796
        #                so should swap to the plain codecs.StreamWriter
796
797
        stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream,
797
 
            "backslashreplace")
 
798
                                                     "backslashreplace")
798
799
        stream.encoding = new_encoding
799
800
        self.stream = stream
800
801
        self.descriptions = descriptions
810
811
        elif self.verbosity >= 2:
811
812
            result_class = VerboseTestResult
812
813
        original_result = result_class(self.stream,
813
 
                              self.descriptions,
814
 
                              self.verbosity,
815
 
                              bench_history=self._bench_history,
816
 
                              strict=self._strict,
817
 
                              )
 
814
                                       self.descriptions,
 
815
                                       self.verbosity,
 
816
                                       bench_history=self._bench_history,
 
817
                                       strict=self._strict,
 
818
                                       )
818
819
        # Signal to result objects that look at stop early policy to stop,
819
820
        original_result.stop_early = self.stop_on_failure
820
821
        result = original_result
862
863
def _clever_some_str(value):
863
864
    try:
864
865
        return str(value)
865
 
    except:
 
866
    except BaseException:
866
867
        try:
867
868
            return repr(value).replace('\\n', '\n')
868
 
        except:
 
869
        except BaseException:
869
870
            return '<unprintable %s object>' % type(value).__name__
870
871
 
 
872
 
871
873
traceback._some_str = _clever_some_str
872
874
 
873
875
 
946
948
    def __init__(self, methodName='testMethod'):
947
949
        super(TestCase, self).__init__(methodName)
948
950
        self._directory_isolation = True
949
 
        self.exception_handlers.insert(0,
950
 
            (UnavailableFeature, self._do_unsupported_or_skip))
951
 
        self.exception_handlers.insert(0,
952
 
            (TestNotApplicable, self._do_not_applicable))
 
951
        self.exception_handlers.insert(
 
952
            0, (UnavailableFeature, self._do_unsupported_or_skip))
 
953
        self.exception_handlers.insert(
 
954
            0, (TestNotApplicable, self._do_not_applicable))
953
955
 
954
956
    def setUp(self):
955
957
        super(TestCase, self).setUp()
1026
1028
        :param counter_name: The counter identifier in ``_counters``, defaults
1027
1029
            to ``name``.
1028
1030
        """
1029
 
        _counters = self._counters # Avoid closing over self
 
1031
        _counters = self._counters  # Avoid closing over self
1030
1032
        if counter_name is None:
1031
1033
            counter_name = name
1032
1034
        if counter_name in _counters:
1033
1035
            raise AssertionError('%s is already used as a counter name'
1034
 
                                  % (counter_name,))
 
1036
                                 % (counter_name,))
1035
1037
        _counters[counter_name] = 0
1036
 
        self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
 
1038
        self.addDetail(counter_name, content.Content(
 
1039
            content.UTF8_TEXT,
1037
1040
            lambda: [b'%d' % (_counters[counter_name],)]))
 
1041
 
1038
1042
        def increment_counter(*args, **kwargs):
1039
1043
            _counters[counter_name] += 1
1040
1044
        label = 'count %s calls' % (counter_name,)
1047
1051
        """
1048
1052
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1049
1053
            self.install_counter_hook(config.ConfigHooks, hook_name,
1050
 
                                       'config.%s' % (hook_name,))
 
1054
                                      'config.%s' % (hook_name,))
1051
1055
 
1052
1056
        # The OldConfigHooks are private and need special handling to protect
1053
1057
        # against recursive tests (tests that run other tests), so we just do
1215
1219
            return
1216
1220
        # This prevents all transports, including e.g. sftp ones backed on disk
1217
1221
        # from working unless they are explicitly granted permission. We then
1218
 
        # depend on the code that sets up test transports to check that they are
1219
 
        # appropriately isolated and enable their use by calling
 
1222
        # depend on the code that sets up test transports to check that they
 
1223
        # are appropriately isolated and enable their use by calling
1220
1224
        # self.permit_transport()
1221
1225
        if not osutils.is_inside_any(self._bzr_selftest_roots, url):
1222
1226
            raise errors.BzrError("Attempt to escape test isolation: %r %r"
1223
 
                % (url, self._bzr_selftest_roots))
 
1227
                                  % (url, self._bzr_selftest_roots))
1224
1228
 
1225
1229
    def record_directory_isolation(self):
1226
1230
        """Gather accessed directories to permit later access.
1279
1283
        # hook into brz dir opening. This leaves a small window of error for
1280
1284
        # transport tests, but they are well known, and we can improve on this
1281
1285
        # step.
1282
 
        controldir.ControlDir.hooks.install_named_hook("pre_open",
1283
 
            self._preopen_isolate_transport, "Check brz directories are safe.")
 
1286
        controldir.ControlDir.hooks.install_named_hook(
 
1287
            "pre_open", self._preopen_isolate_transport,
 
1288
            "Check brz directories are safe.")
1284
1289
 
1285
1290
    def _ndiff_strings(self, a, b):
1286
1291
        """Return ndiff between two strings containing lines.
1308
1313
        if message:
1309
1314
            message += '\n'
1310
1315
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
1311
 
            % (message,
1312
 
               pprint.pformat(a), pprint.pformat(b)))
 
1316
                             % (message,
 
1317
                                pprint.pformat(a), pprint.pformat(b)))
1313
1318
 
1314
1319
    # FIXME: This is deprecated in unittest2 but plugins may still use it so we
1315
1320
    # need a deprecation period for them -- vila 2016-02-01
1330
1335
            message = 'first string is missing a final newline.\n'
1331
1336
        if a == b + ('\n' if isinstance(b, text_type) else b'\n'):
1332
1337
            message = 'second string is missing a final newline.\n'
1333
 
        raise AssertionError(message +
1334
 
                             self._ndiff_strings(a, b))
 
1338
        raise AssertionError(message
 
1339
                             + self._ndiff_strings(a, b))
1335
1340
 
1336
1341
    def assertEqualMode(self, mode, mode_test):
1337
1342
        self.assertEqual(mode, mode_test,
1401
1406
 
1402
1407
    def assertStartsWith(self, s, prefix):
1403
1408
        if not s.startswith(prefix):
1404
 
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
1409
            raise AssertionError(
 
1410
                'string %r does not start with %r' % (s, prefix))
1405
1411
 
1406
1412
    def assertEndsWith(self, s, suffix):
1407
1413
        """Asserts that s ends with suffix."""
1408
1414
        if not s.endswith(suffix):
1409
 
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
1415
            raise AssertionError(
 
1416
                'string %r does not end with %r' % (s, suffix))
1410
1417
 
1411
1418
    def assertContainsRe(self, haystack, needle_re, flags=0):
1412
1419
        """Assert that a contains something matching a regular expression."""
1413
1420
        if not re.search(needle_re, haystack, flags):
1414
 
            if ('\n' if isinstance(haystack, str) else b'\n') in haystack or len(haystack) > 60:
 
1421
            if (('\n' if isinstance(haystack, str) else b'\n') in haystack or
 
1422
                    len(haystack) > 60):
1415
1423
                # a long string, format it in a more readable way
1416
1424
                raise AssertionError(
1417
 
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
1418
 
                        % (needle_re, haystack))
 
1425
                    'pattern "%s" not found in\n"""\\\n%s"""\n'
 
1426
                    % (needle_re, haystack))
1419
1427
            else:
1420
1428
                raise AssertionError('pattern "%s" not found in "%s"'
1421
 
                        % (needle_re, haystack))
 
1429
                                     % (needle_re, haystack))
1422
1430
 
1423
1431
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
1424
1432
        """Assert that a does not match a regular expression"""
1425
1433
        if re.search(needle_re, haystack, flags):
1426
1434
            raise AssertionError('pattern "%s" found in "%s"'
1427
 
                    % (needle_re, haystack))
 
1435
                                 % (needle_re, haystack))
1428
1436
 
1429
1437
    def assertContainsString(self, haystack, needle):
1430
1438
        if haystack.find(needle) == -1:
1516
1524
 
1517
1525
    def assertIsInstance(self, obj, kls, msg=None):
1518
1526
        """Fail if obj is not an instance of kls
1519
 
        
 
1527
 
1520
1528
        :param msg: Supplementary message to show if the assertion fails.
1521
1529
        """
1522
1530
        if not isinstance(obj, kls):
1529
1537
    def assertFileEqual(self, content, path):
1530
1538
        """Fail if path does not contain 'content'."""
1531
1539
        self.assertPathExists(path)
1532
 
        
1533
 
        with open(path, 'r' + ('b' if isinstance(content, bytes) else '')) as f:
 
1540
 
 
1541
        mode = 'r' + ('b' if isinstance(content, bytes) else '')
 
1542
        with open(path, mode) as f:
1534
1543
            s = f.read()
1535
1544
        self.assertEqualDiff(content, s)
1536
1545
 
1550
1559
                self.assertPathExists(p)
1551
1560
        else:
1552
1561
            self.assertTrue(osutils.lexists(path),
1553
 
                path + " does not exist")
 
1562
                            path + " does not exist")
1554
1563
 
1555
1564
    def assertPathDoesNotExist(self, path):
1556
1565
        """Fail if path or paths, which may be abs or relative, exist."""
1559
1568
                self.assertPathDoesNotExist(p)
1560
1569
        else:
1561
1570
            self.assertFalse(osutils.lexists(path),
1562
 
                path + " exists")
 
1571
                             path + " exists")
1563
1572
 
1564
1573
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1565
1574
        """A helper for callDeprecated and applyDeprecated.
1571
1580
            a_callable(``*args``, ``**kwargs``).
1572
1581
        """
1573
1582
        local_warnings = []
 
1583
 
1574
1584
        def capture_warnings(msg, cls=None, stacklevel=None):
1575
1585
            # we've hooked into a deprecation specific callpath,
1576
1586
            # only deprecations should be getting sent via it.
1612
1622
        :param kwargs: The keyword arguments for the callable
1613
1623
        :return: The result of a_callable(``*args``, ``**kwargs``)
1614
1624
        """
1615
 
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
1616
 
            *args, **kwargs)
 
1625
        call_warnings, result = self._capture_deprecation_warnings(
 
1626
            a_callable, *args, **kwargs)
1617
1627
        expected_first_warning = symbol_versioning.deprecation_string(
1618
1628
            a_callable, deprecation_format)
1619
1629
        if len(call_warnings) == 0:
1620
1630
            self.fail("No deprecation warning generated by call to %s" %
1621
 
                a_callable)
 
1631
                      a_callable)
1622
1632
        self.assertEqual(expected_first_warning, call_warnings[0])
1623
1633
        return result
1624
1634
 
1638
1648
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
1639
1649
        # though.  -- Andrew, 20071062
1640
1650
        wlist = []
 
1651
 
1641
1652
        def _catcher(message, category, filename, lineno, file=None, line=None):
1642
1653
            # despite the name, 'message' is normally(?) a Warning subclass
1643
1654
            # instance
1670
1681
        :param args: The positional arguments for the callable
1671
1682
        :param kwargs: The keyword arguments for the callable
1672
1683
        """
1673
 
        call_warnings, result = self._capture_deprecation_warnings(callable,
1674
 
            *args, **kwargs)
 
1684
        call_warnings, result = self._capture_deprecation_warnings(
 
1685
            callable, *args, **kwargs)
1675
1686
        self.assertEqual(expected, call_warnings)
1676
1687
        return result
1677
1688
 
1678
1689
    def _startLogFile(self):
1679
1690
        """Set up an in-memory target for bzr and testcase log messages"""
1680
1691
        pseudo_log_file = BytesIO()
 
1692
 
1681
1693
        def _get_log_contents_for_weird_testtools_api():
1682
1694
            return [pseudo_log_file.getvalue().decode(
1683
1695
                "utf-8", "replace").encode("utf-8")]
1684
 
        self.addDetail("log", content.Content(content.ContentType("text",
1685
 
            "plain", {"charset": "utf8"}),
1686
 
            _get_log_contents_for_weird_testtools_api))
 
1696
        self.addDetail(
 
1697
            "log", content.Content(
 
1698
                content.ContentType("text", "plain", {"charset": "utf8"}),
 
1699
                _get_log_contents_for_weird_testtools_api))
1687
1700
        self._log_file = pseudo_log_file
1688
1701
        self._log_memento = trace.push_log_file(self._log_file)
1689
1702
        self.addCleanup(self._finishLogFile)
1743
1756
 
1744
1757
        :param name: The environment variable name.
1745
1758
 
1746
 
        :param new: The value to set the variable to. If None, the 
 
1759
        :param new: The value to set the variable to. If None, the
1747
1760
            variable is deleted from the environment.
1748
1761
 
1749
1762
        :returns: The actual variable value.
1759
1772
 
1760
1773
        :param obj: The namespace holding the reference to be replaced;
1761
1774
            typically a module, class, or object.
1762
 
        :param attr_name: A string for the name of the attribute to 
1763
 
            patch.
 
1775
        :param attr_name: A string for the name of the attribute to patch.
1764
1776
        :returns: A list that will be extended with one item every time the
1765
1777
            function is called, with a tuple of (args, kwargs).
1766
1778
        """
1838
1850
            reason = 'No reason given'
1839
1851
        else:
1840
1852
            reason = e.args[0]
1841
 
        self._suppress_log ()
 
1853
        self._suppress_log()
1842
1854
        addNotApplicable = getattr(result, 'addNotApplicable', None)
1843
1855
        if addNotApplicable is not None:
1844
1856
            result.addNotApplicable(self, reason)
1883
1895
        self._benchcalls.
1884
1896
        """
1885
1897
        if self._benchtime is None:
1886
 
            self.addDetail('benchtime', content.Content(content.UTF8_TEXT,
1887
 
                lambda:[str(self._benchtime).encode('utf-8')]))
 
1898
            self.addDetail('benchtime', content.Content(
 
1899
                content.UTF8_TEXT,
 
1900
                lambda: [str(self._benchtime).encode('utf-8')]))
1888
1901
            self._benchtime = 0
1889
1902
        start = time.time()
1890
1903
        try:
1918
1931
            raise UnavailableFeature(feature)
1919
1932
 
1920
1933
    def _run_bzr_core(self, args, encoding, stdin, stdout, stderr,
1921
 
            working_dir):
 
1934
                      working_dir):
1922
1935
        # Clear chk_map page cache, because the contents are likely to mask
1923
1936
        # locking errors.
1924
1937
        chk_map.clear_cache()
1958
1971
        return result
1959
1972
 
1960
1973
    def run_bzr_raw(self, args, retcode=0, stdin=None, encoding=None,
1961
 
                working_dir=None, error_regexes=[]):
 
1974
                    working_dir=None, error_regexes=[]):
1962
1975
        """Invoke brz, as if it were run from the command line.
1963
1976
 
1964
1977
        The argument list should not include the brz program name - the
2016
2029
        logger.addHandler(handler)
2017
2030
        try:
2018
2031
            result = self._run_bzr_core(
2019
 
                    args, encoding=encoding, stdin=stdin, stdout=wrapped_stdout,
2020
 
                    stderr=wrapped_stderr, working_dir=working_dir,
2021
 
                    )
 
2032
                args, encoding=encoding, stdin=stdin, stdout=wrapped_stdout,
 
2033
                stderr=wrapped_stderr, working_dir=working_dir,
 
2034
                )
2022
2035
        finally:
2023
2036
            logger.removeHandler(handler)
2024
2037
 
2034
2047
            self.log('errors:\n%r', err)
2035
2048
        if retcode is not None:
2036
2049
            self.assertEqual(retcode, result,
2037
 
                              message='Unexpected return code')
 
2050
                             message='Unexpected return code')
2038
2051
        self.assertIsInstance(error_regexes, (list, tuple))
2039
2052
        for regex in error_regexes:
2040
2053
            self.assertContainsRe(err, regex)
2097
2110
        logger.addHandler(handler)
2098
2111
 
2099
2112
        try:
2100
 
            result = self._run_bzr_core(args,
2101
 
                    encoding=encoding, stdin=stdin, stdout=stdout,
2102
 
                    stderr=stderr, working_dir=working_dir,
2103
 
                    )
 
2113
            result = self._run_bzr_core(
 
2114
                args, encoding=encoding, stdin=stdin, stdout=stdout,
 
2115
                stderr=stderr, working_dir=working_dir)
2104
2116
        finally:
2105
2117
            logger.removeHandler(handler)
2106
2118
 
2112
2124
            self.log('errors:\n%r', err)
2113
2125
        if retcode is not None:
2114
2126
            self.assertEqual(retcode, result,
2115
 
                              message='Unexpected return code')
 
2127
                             message='Unexpected return code')
2116
2128
        self.assertIsInstance(error_regexes, (list, tuple))
2117
2129
        for regex in error_regexes:
2118
2130
            self.assertContainsRe(err, regex)
2140
2152
            # Make sure --strict is handling an unknown file, rather than
2141
2153
            # giving us the 'nothing to do' error
2142
2154
            self.build_tree(['unknown'])
2143
 
            self.run_bzr_error(['Commit refused because there are unknown files'],
2144
 
                               ['commit', '--strict', '-m', 'my commit comment'])
 
2155
            self.run_bzr_error(
 
2156
                ['Commit refused because there are unknown files'],
 
2157
                ['commit', '--strict', '-m', 'my commit comment'])
2145
2158
        """
2146
2159
        kwargs.setdefault('retcode', 3)
2147
2160
        kwargs['error_regexes'] = error_regexes
2185
2198
        # We distinguish between retcode=None and retcode not passed.
2186
2199
        supplied_retcode = kwargs.get('retcode', 0)
2187
2200
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
2188
 
            universal_newlines=kwargs.get('universal_newlines', False),
2189
 
            process_args=args)
 
2201
                                          universal_newlines=kwargs.get(
 
2202
                                              'universal_newlines', False),
 
2203
                                          process_args=args)
2190
2204
 
2191
2205
    def start_bzr_subprocess(self, process_args, env_changes=None,
2192
2206
                             skip_if_plan_to_signal=False,
2286
2300
            # the life of this function, which might interfere with e.g.
2287
2301
            # cleaning tempdirs on Windows.
2288
2302
            # XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2289
 
            #detail_content = content.content_from_file(
 
2303
            # detail_content = content.content_from_file(
2290
2304
            #    log_file_path, buffer_now=True)
2291
2305
            with open(log_file_path, 'rb') as log_file:
2292
2306
                log_file_bytes = log_file.read()
2293
 
            detail_content = content.Content(content.ContentType("text",
2294
 
                "plain", {"charset": "utf8"}), lambda: [log_file_bytes])
 
2307
            detail_content = content.Content(
 
2308
                content.ContentType("text", "plain", {"charset": "utf8"}),
 
2309
                lambda: [log_file_bytes])
2295
2310
            self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2296
 
                detail_content)
 
2311
                           detail_content)
2297
2312
 
2298
2313
    def _popen(self, *args, **kwargs):
2299
2314
        """Place a call to Popen.
2355
2370
            if ie.kind == 'directory':
2356
2371
                name = name + '/'
2357
2372
            if name == "/":
2358
 
                pass # ignore root entry
 
2373
                pass  # ignore root entry
2359
2374
            elif name in shape:
2360
2375
                shape.remove(name)
2361
2376
            else:
2429
2444
        orig_info = request_handlers.get_info(verb)
2430
2445
        request_handlers.remove(verb)
2431
2446
        self.addCleanup(request_handlers.register, verb, orig_method,
2432
 
            info=orig_info)
 
2447
                        info=orig_info)
2433
2448
 
2434
2449
    def __hash__(self):
2435
2450
        return id(self)
2504
2519
            """
2505
2520
            self.addCleanup(transport.disconnect)
2506
2521
 
2507
 
        _mod_transport.Transport.hooks.install_named_hook('post_connect',
2508
 
            _add_disconnect_cleanup, None)
 
2522
        _mod_transport.Transport.hooks.install_named_hook(
 
2523
            'post_connect', _add_disconnect_cleanup, None)
2509
2524
 
2510
2525
        self._make_test_root()
2511
2526
        self.addCleanup(os.chdir, osutils.getcwd())
2562
2577
                self.__readonly_server = test_server.ReadonlyServer()
2563
2578
            else:
2564
2579
                # explicit readonly transport.
2565
 
                self.__readonly_server = self.create_transport_readonly_server()
 
2580
                self.__readonly_server = (
 
2581
                    self.create_transport_readonly_server())
2566
2582
            self.start_server(self.__readonly_server,
2567
 
                self.get_vfs_only_server())
 
2583
                              self.get_vfs_only_server())
2568
2584
        return self.__readonly_server
2569
2585
 
2570
2586
    def get_readonly_url(self, relpath=None):
2603
2619
        """
2604
2620
        if self.__server is None:
2605
2621
            if (self.transport_server is None or self.transport_server is
2606
 
                self.vfs_transport_factory):
 
2622
                    self.vfs_transport_factory):
2607
2623
                self.__server = self.get_vfs_only_server()
2608
2624
            else:
2609
2625
                # bring up a decorated means of access to the vfs only server.
2691
2707
        root = TestCaseWithMemoryTransport.TEST_ROOT
2692
2708
        t = _mod_transport.get_transport_from_path(root)
2693
2709
        self.permit_url(t.base)
2694
 
        if (t.get_bytes('.bzr/checkout/dirstate') != 
 
2710
        if (t.get_bytes('.bzr/checkout/dirstate') !=
2695
2711
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
2696
2712
            # The current test has modified the .bzr directory, we need to
2697
2713
            # recreate a new one or all the following tests will fail.
2723
2739
        This must set self.test_home_dir and self.test_dir and chdir to
2724
2740
        self.test_dir.
2725
2741
 
2726
 
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
2742
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this
 
2743
        test.
2727
2744
        """
2728
2745
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2729
2746
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
2785
2802
        smart_server = test_server.SmartTCPServer_for_testing()
2786
2803
        self.start_server(smart_server, backing_server)
2787
2804
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
2788
 
                                                   ).clone(path)
 
2805
                                                                 ).clone(path)
2789
2806
        return remote_transport
2790
2807
 
2791
2808
    def make_branch_and_memory_tree(self, relpath, format=None):
2814
2831
        # Skip the current stack down to the caller of
2815
2832
        # setup_smart_server_with_call_log
2816
2833
        prefix_length = len(traceback.extract_stack()) - 2
 
2834
 
2817
2835
        def capture_hpss_call(params):
2818
2836
            self.hpss_calls.append(
2819
2837
                CapturedCall(params, prefix_length))
 
2838
 
2820
2839
        def capture_connect(transport):
2821
2840
            self.hpss_connections.append(transport)
2822
2841
        client._SmartClient.hooks.install_named_hook(
2883
2902
        name and then create two subdirs - test and home under it.
2884
2903
        """
2885
2904
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
2886
 
            self._getTestDirPrefix())
 
2905
                                       self._getTestDirPrefix())
2887
2906
        name = name_prefix
2888
2907
        for i in range(100):
2889
2908
            if os.path.exists(name):
2934
2953
        """
2935
2954
        if type(shape) not in (list, tuple):
2936
2955
            raise AssertionError("Parameter 'shape' should be "
2937
 
                "a list or a tuple. Got %r instead" % (shape,))
 
2956
                                 "a list or a tuple. Got %r instead" % (shape,))
2938
2957
        # It's OK to just create them using forward slashes on windows.
2939
2958
        if transport is None or transport.is_readonly():
2940
2959
            transport = _mod_transport.get_transport_from_path(".")
2964
2983
                self.assertInWorkingTree(p, tree=tree)
2965
2984
        else:
2966
2985
            self.assertTrue(tree.is_versioned(path),
2967
 
                path+' not in working tree.')
 
2986
                            path + ' not in working tree.')
2968
2987
 
2969
2988
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
2970
2989
        """Assert whether path or paths are not in the WorkingTree"""
2974
2993
            for p in path:
2975
2994
                self.assertNotInWorkingTree(p, tree=tree)
2976
2995
        else:
2977
 
            self.assertFalse(tree.is_versioned(path), path+' in working tree.')
 
2996
            self.assertFalse(tree.is_versioned(
 
2997
                path), path + ' in working tree.')
2978
2998
 
2979
2999
 
2980
3000
class TestCaseWithTransport(TestCaseInTempDir):
3034
3054
        # RBC 20060208
3035
3055
        format = self.resolve_format(format=format)
3036
3056
        if not format.supports_workingtrees:
3037
 
            b = self.make_branch(relpath+'.branch', format=format)
 
3057
            b = self.make_branch(relpath + '.branch', format=format)
3038
3058
            return b.create_checkout(relpath, lightweight=True)
3039
3059
        b = self.make_branch(relpath, format=format)
3040
3060
        try:
3081
3101
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
3082
3102
        differences = left.changes_from(right)
3083
3103
        self.assertFalse(differences.has_changed(),
3084
 
            "Trees %r and %r are different: %r" % (left, right, differences))
 
3104
                         "Trees %r and %r are different: %r" % (left, right, differences))
3085
3105
 
3086
3106
    def disable_missing_extensions_warning(self):
3087
3107
        """Some tests expect a precise stderr content.
3119
3139
    :return: A callable that returns True if the re matches.
3120
3140
    """
3121
3141
    filter_re = re.compile(pattern, 0)
 
3142
 
3122
3143
    def condition(test):
3123
3144
        test_id = test.id()
3124
3145
        return filter_re.search(test_id)
3331
3352
    if stream is None:
3332
3353
        stream = sys.stdout
3333
3354
    runner = runner_class(stream=stream,
3334
 
                            descriptions=0,
3335
 
                            verbosity=verbosity,
3336
 
                            bench_history=bench_history,
3337
 
                            strict=strict,
3338
 
                            result_decorators=result_decorators,
3339
 
                            )
3340
 
    runner.stop_on_failure=stop_on_failure
 
3355
                          descriptions=0,
 
3356
                          verbosity=verbosity,
 
3357
                          bench_history=bench_history,
 
3358
                          strict=strict,
 
3359
                          result_decorators=result_decorators,
 
3360
                          )
 
3361
    runner.stop_on_failure = stop_on_failure
3341
3362
    if isinstance(suite, unittest.TestSuite):
3342
3363
        # Empty out _tests list of passed suite and populate new TestSuite
3343
3364
        suite._tests[:], suite = [], TestSuite(suite)
3384
3405
def fork_decorator(suite):
3385
3406
    if getattr(os, "fork", None) is None:
3386
3407
        raise errors.BzrCommandError("platform does not support fork,"
3387
 
            " try --parallel=subprocess instead.")
 
3408
                                     " try --parallel=subprocess instead.")
3388
3409
    concurrency = osutils.local_concurrency()
3389
3410
    if concurrency == 1:
3390
3411
        return suite
3391
3412
    from testtools import ConcurrentTestSuite
3392
3413
    return ConcurrentTestSuite(suite, fork_for_tests)
 
3414
 
 
3415
 
3393
3416
parallel_registry.register('fork', fork_decorator)
3394
3417
 
3395
3418
 
3399
3422
        return suite
3400
3423
    from testtools import ConcurrentTestSuite
3401
3424
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
 
3425
 
 
3426
 
3402
3427
parallel_registry.register('subprocess', subprocess_decorator)
3403
3428
 
3404
3429
 
3406
3431
    """Return a test suite decorator that excludes tests."""
3407
3432
    if exclude_pattern is None:
3408
3433
        return identity_decorator
 
3434
 
3409
3435
    def decorator(suite):
3410
3436
        return ExcludeDecorator(suite, exclude_pattern)
3411
3437
    return decorator
3414
3440
def filter_tests(pattern):
3415
3441
    if pattern == '.*':
3416
3442
        return identity_decorator
 
3443
 
3417
3444
    def decorator(suite):
3418
3445
        return FilterTestsDecorator(suite, pattern)
3419
3446
    return decorator
3427
3454
    """
3428
3455
    if random_seed is None:
3429
3456
        return identity_decorator
 
3457
 
3430
3458
    def decorator(suite):
3431
3459
        return RandomDecorator(suite, random_seed, runner.stream)
3432
3460
    return decorator
3435
3463
def tests_first(pattern):
3436
3464
    if pattern == '.*':
3437
3465
        return identity_decorator
 
3466
 
3438
3467
    def decorator(suite):
3439
3468
        return TestFirstDecorator(suite, pattern)
3440
3469
    return decorator
3492
3521
    def __init__(self, suite, random_seed, stream):
3493
3522
        random_seed = self.actual_seed(random_seed)
3494
3523
        stream.write("Randomizing test order using seed %s\n\n" %
3495
 
            (random_seed,))
 
3524
                     (random_seed,))
3496
3525
        # Initialise the random number generator.
3497
3526
        random.seed(random_seed)
3498
3527
        super(RandomDecorator, self).__init__(randomize_suite(suite))
3557
3586
    result = []
3558
3587
    from subunit import ProtocolTestCase
3559
3588
    from subunit.test_results import AutoTimingTestResultDecorator
 
3589
 
3560
3590
    class TestInOtherProcess(ProtocolTestCase):
3561
3591
        # Should be in subunit, I think. RBC.
3562
3592
        def __init__(self, stream, pid):
3630
3660
    concurrency = osutils.local_concurrency()
3631
3661
    result = []
3632
3662
    from subunit import ProtocolTestCase
 
3663
 
3633
3664
    class TestInSubprocess(ProtocolTestCase):
3634
3665
        def __init__(self, process, name):
3635
3666
            ProtocolTestCase.__init__(self, process.stdout)
3647
3678
    test_blocks = partition_tests(suite, concurrency)
3648
3679
    for process_tests in test_blocks:
3649
3680
        # ugly; currently reimplements rather than reuses TestCase methods.
3650
 
        bzr_path = os.path.dirname(os.path.dirname(breezy.__file__))+'/bzr'
 
3681
        bzr_path = os.path.dirname(os.path.dirname(breezy.__file__)) + '/bzr'
3651
3682
        if not os.path.isfile(bzr_path):
3652
3683
            # We are probably installed. Assume sys.argv is the right file
3653
3684
            bzr_path = sys.argv[0]
3662
3693
        test_list_file.close()
3663
3694
        try:
3664
3695
            argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3665
 
                '--subunit']
 
3696
                               '--subunit']
3666
3697
            if '--no-plugins' in sys.argv:
3667
3698
                argv.append('--no-plugins')
3668
3699
            # stderr=subprocess.STDOUT would be ideal, but until we prevent
3669
3700
            # noise on stderr it can interrupt the subunit protocol.
3670
3701
            process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3671
 
                                      stdout=subprocess.PIPE,
3672
 
                                      stderr=subprocess.PIPE,
3673
 
                                      bufsize=1)
 
3702
                                       stdout=subprocess.PIPE,
 
3703
                                       stderr=subprocess.PIPE,
 
3704
                                       bufsize=1)
3674
3705
            test = TestInSubprocess(process, test_list_file_name)
3675
3706
            result.append(test)
3676
3707
        except:
3681
3712
 
3682
3713
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3683
3714
    """Generate profiling data for all activity between start and success.
3684
 
    
 
3715
 
3685
3716
    The profile data is appended to the test's _benchcalls attribute and can
3686
3717
    be accessed by the forwarded-to TestResult.
3687
3718
 
3788
3819
        if lsprof_tests:
3789
3820
            result_decorators.append(ProfileResult)
3790
3821
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
3791
 
                     stop_on_failure=stop_on_failure,
3792
 
                     transport=transport,
3793
 
                     lsprof_timed=lsprof_timed,
3794
 
                     bench_history=bench_history,
3795
 
                     matching_tests_first=matching_tests_first,
3796
 
                     list_only=list_only,
3797
 
                     random_seed=random_seed,
3798
 
                     exclude_pattern=exclude_pattern,
3799
 
                     strict=strict,
3800
 
                     runner_class=runner_class,
3801
 
                     suite_decorators=suite_decorators,
3802
 
                     stream=stream,
3803
 
                     result_decorators=result_decorators,
3804
 
                     )
 
3822
                         stop_on_failure=stop_on_failure,
 
3823
                         transport=transport,
 
3824
                         lsprof_timed=lsprof_timed,
 
3825
                         bench_history=bench_history,
 
3826
                         matching_tests_first=matching_tests_first,
 
3827
                         list_only=list_only,
 
3828
                         random_seed=random_seed,
 
3829
                         exclude_pattern=exclude_pattern,
 
3830
                         strict=strict,
 
3831
                         runner_class=runner_class,
 
3832
                         suite_decorators=suite_decorators,
 
3833
                         stream=stream,
 
3834
                         result_decorators=result_decorators,
 
3835
                         )
3805
3836
    finally:
3806
3837
        default_transport = old_transport
3807
3838
        selftest_debug_flags = old_debug_flags
4246
4277
        def interesting_module(name):
4247
4278
            for start in starting_with:
4248
4279
                if (
4249
 
                    # Either the module name starts with the specified string
4250
 
                    name.startswith(start)
4251
 
                    # or it may contain tests starting with the specified string
4252
 
                    or start.startswith(name)
4253
 
                    ):
 
4280
                        # Either the module name starts with the specified string
 
4281
                        name.startswith(start)
 
4282
                    or                     # or it may contain tests starting with the specified string
 
4283
                        start.startswith(name)
 
4284
                        ):
4254
4285
                    return True
4255
4286
            return False
4256
4287
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
4257
4288
 
4258
4289
    elif keep_only is not None:
4259
4290
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
4291
 
4260
4292
        def interesting_module(name):
4261
4293
            return id_filter.refers_to(name)
4262
4294
 
4263
4295
    else:
4264
4296
        loader = TestUtil.TestLoader()
 
4297
 
4265
4298
        def interesting_module(name):
4266
4299
            # No filtering, all modules are interesting
4267
4300
            return True
4595
4628
        def __init__(self, stream=sys.stderr, descriptions=0, verbosity=1,
4596
4629
                     bench_history=None, strict=False, result_decorators=None):
4597
4630
            TextTestRunner.__init__(
4598
 
                    self, stream=stream,
4599
 
                    descriptions=descriptions, verbosity=verbosity,
4600
 
                    bench_history=bench_history, strict=strict,
4601
 
                    result_decorators=result_decorators)
 
4631
                self, stream=stream,
 
4632
                descriptions=descriptions, verbosity=verbosity,
 
4633
                bench_history=bench_history, strict=strict,
 
4634
                result_decorators=result_decorators)
4602
4635
            SubunitTestRunner.__init__(self, verbosity=verbosity,
4603
4636
                                       stream=stream)
4604
4637