/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Martin
  • Date: 2010-05-25 17:27:52 UTC
  • mfrom: (5254 +trunk)
  • mto: This revision was merged to the branch mainline in revision 5257.
  • Revision ID: gzlist@googlemail.com-20100525172752-amm089xcikv968sw
Merge bzr.dev to unite with similar changes already made

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
 
17
"""Testing framework extensions"""
17
18
 
18
19
# TODO: Perhaps there should be an API to find out if bzr running under the
19
20
# test suite -- some plugins might want to avoid making intrusive changes if
46
47
import tempfile
47
48
import threading
48
49
import time
 
50
import traceback
49
51
import unittest
50
52
import warnings
51
53
 
 
54
import testtools
 
55
# nb: check this before importing anything else from within it
 
56
_testtools_version = getattr(testtools, '__version__', ())
 
57
if _testtools_version < (0, 9, 2):
 
58
    raise ImportError("need at least testtools 0.9.2: %s is %r"
 
59
        % (testtools.__file__, _testtools_version))
 
60
from testtools import content
52
61
 
53
62
from bzrlib import (
54
63
    branchbuilder,
55
64
    bzrdir,
 
65
    chk_map,
56
66
    config,
57
67
    debug,
58
68
    errors,
87
97
from bzrlib.symbol_versioning import (
88
98
    DEPRECATED_PARAMETER,
89
99
    deprecated_function,
 
100
    deprecated_in,
90
101
    deprecated_method,
91
102
    deprecated_passed,
92
103
    )
93
104
import bzrlib.trace
94
 
from bzrlib.transport import get_transport, pathfilter
 
105
from bzrlib.transport import (
 
106
    get_transport,
 
107
    memory,
 
108
    pathfilter,
 
109
    )
95
110
import bzrlib.transport
96
 
from bzrlib.transport.local import LocalURLServer
97
 
from bzrlib.transport.memory import MemoryServer
98
 
from bzrlib.transport.readonly import ReadonlyServer
99
111
from bzrlib.trace import mutter, note
100
 
from bzrlib.tests import TestUtil
 
112
from bzrlib.tests import (
 
113
    test_server,
 
114
    TestUtil,
 
115
    treeshape,
 
116
    )
101
117
from bzrlib.tests.http_server import HttpServer
102
118
from bzrlib.tests.TestUtil import (
103
119
                          TestSuite,
104
120
                          TestLoader,
105
121
                          )
106
 
from bzrlib.tests.treeshape import build_tree_contents
107
122
from bzrlib.ui import NullProgressView
108
123
from bzrlib.ui.text import TextUIFactory
109
124
import bzrlib.version_info_formats.format_custom
114
129
# shown frame is the test code, not our assertXYZ.
115
130
__unittest = 1
116
131
 
117
 
default_transport = LocalURLServer
 
132
default_transport = test_server.LocalURLServer
 
133
 
 
134
 
 
135
_unitialized_attr = object()
 
136
"""A sentinel needed to act as a default value in a method signature."""
 
137
 
118
138
 
119
139
# Subunit result codes, defined here to prevent a hard dependency on subunit.
120
140
SUBUNIT_SEEK_SET = 0
222
242
                '%s is leaking threads among %d leaking tests.\n' % (
223
243
                TestCase._first_thread_leaker_id,
224
244
                TestCase._leaking_threads_tests))
225
 
 
226
 
    def _extractBenchmarkTime(self, testCase):
 
245
            # We don't report the main thread as an active one.
 
246
            self.stream.write(
 
247
                '%d non-main threads were left active in the end.\n'
 
248
                % (TestCase._active_threads - 1))
 
249
 
 
250
    def getDescription(self, test):
 
251
        return test.id()
 
252
 
 
253
    def _extractBenchmarkTime(self, testCase, details=None):
227
254
        """Add a benchmark time for the current test case."""
 
255
        if details and 'benchtime' in details:
 
256
            return float(''.join(details['benchtime'].iter_bytes()))
228
257
        return getattr(testCase, "_benchtime", None)
229
258
 
230
259
    def _elapsedTestTimeString(self):
264
293
        else:
265
294
            bzr_path = sys.executable
266
295
        self.stream.write(
267
 
            'testing: %s\n' % (bzr_path,))
 
296
            'bzr selftest: %s\n' % (bzr_path,))
268
297
        self.stream.write(
269
298
            '   %s\n' % (
270
299
                    bzrlib.__path__[0],))
293
322
        Called from the TestCase run() method when the test
294
323
        fails with an unexpected error.
295
324
        """
296
 
        self._testConcluded(test)
297
 
        if isinstance(err[1], TestNotApplicable):
298
 
            return self._addNotApplicable(test, err)
299
 
        elif isinstance(err[1], UnavailableFeature):
300
 
            return self.addNotSupported(test, err[1].args[0])
301
 
        else:
302
 
            self._post_mortem()
303
 
            unittest.TestResult.addError(self, test, err)
304
 
            self.error_count += 1
305
 
            self.report_error(test, err)
306
 
            if self.stop_early:
307
 
                self.stop()
308
 
            self._cleanupLogFile(test)
 
325
        self._post_mortem()
 
326
        unittest.TestResult.addError(self, test, err)
 
327
        self.error_count += 1
 
328
        self.report_error(test, err)
 
329
        if self.stop_early:
 
330
            self.stop()
 
331
        self._cleanupLogFile(test)
309
332
 
310
333
    def addFailure(self, test, err):
311
334
        """Tell result that test failed.
313
336
        Called from the TestCase run() method when the test
314
337
        fails because e.g. an assert() method failed.
315
338
        """
316
 
        self._testConcluded(test)
317
 
        if isinstance(err[1], KnownFailure):
318
 
            return self._addKnownFailure(test, err)
319
 
        else:
320
 
            self._post_mortem()
321
 
            unittest.TestResult.addFailure(self, test, err)
322
 
            self.failure_count += 1
323
 
            self.report_failure(test, err)
324
 
            if self.stop_early:
325
 
                self.stop()
326
 
            self._cleanupLogFile(test)
 
339
        self._post_mortem()
 
340
        unittest.TestResult.addFailure(self, test, err)
 
341
        self.failure_count += 1
 
342
        self.report_failure(test, err)
 
343
        if self.stop_early:
 
344
            self.stop()
 
345
        self._cleanupLogFile(test)
327
346
 
328
 
    def addSuccess(self, test):
 
347
    def addSuccess(self, test, details=None):
329
348
        """Tell result that test completed successfully.
330
349
 
331
350
        Called from the TestCase run()
332
351
        """
333
 
        self._testConcluded(test)
334
352
        if self._bench_history is not None:
335
 
            benchmark_time = self._extractBenchmarkTime(test)
 
353
            benchmark_time = self._extractBenchmarkTime(test, details)
336
354
            if benchmark_time is not None:
337
355
                self._bench_history.write("%s %s\n" % (
338
356
                    self._formatTime(benchmark_time),
342
360
        unittest.TestResult.addSuccess(self, test)
343
361
        test._log_contents = ''
344
362
 
345
 
    def _testConcluded(self, test):
346
 
        """Common code when a test has finished.
347
 
 
348
 
        Called regardless of whether it succeded, failed, etc.
349
 
        """
350
 
        pass
351
 
 
352
 
    def _addKnownFailure(self, test, err):
 
363
    def addExpectedFailure(self, test, err):
353
364
        self.known_failure_count += 1
354
365
        self.report_known_failure(test, err)
355
366
 
357
368
        """The test will not be run because of a missing feature.
358
369
        """
359
370
        # this can be called in two different ways: it may be that the
360
 
        # test started running, and then raised (through addError)
 
371
        # test started running, and then raised (through requireFeature)
361
372
        # UnavailableFeature.  Alternatively this method can be called
362
 
        # while probing for features before running the tests; in that
363
 
        # case we will see startTest and stopTest, but the test will never
364
 
        # actually run.
 
373
        # while probing for features before running the test code proper; in
 
374
        # that case we will see startTest and stopTest, but the test will
 
375
        # never actually run.
365
376
        self.unsupported.setdefault(str(feature), 0)
366
377
        self.unsupported[str(feature)] += 1
367
378
        self.report_unsupported(test, feature)
371
382
        self.skip_count += 1
372
383
        self.report_skip(test, reason)
373
384
 
374
 
    def _addNotApplicable(self, test, skip_excinfo):
375
 
        if isinstance(skip_excinfo[1], TestNotApplicable):
376
 
            self.not_applicable_count += 1
377
 
            self.report_not_applicable(test, skip_excinfo)
378
 
        try:
379
 
            test.tearDown()
380
 
        except KeyboardInterrupt:
381
 
            raise
382
 
        except:
383
 
            self.addError(test, test.exc_info())
384
 
        else:
385
 
            # seems best to treat this as success from point-of-view of unittest
386
 
            # -- it actually does nothing so it barely matters :)
387
 
            unittest.TestResult.addSuccess(self, test)
388
 
            test._log_contents = ''
389
 
 
390
 
    def printErrorList(self, flavour, errors):
391
 
        for test, err in errors:
392
 
            self.stream.writeln(self.separator1)
393
 
            self.stream.write("%s: " % flavour)
394
 
            self.stream.writeln(self.getDescription(test))
395
 
            if getattr(test, '_get_log', None) is not None:
396
 
                log_contents = test._get_log()
397
 
                if log_contents:
398
 
                    self.stream.write('\n')
399
 
                    self.stream.write(
400
 
                            ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
401
 
                    self.stream.write('\n')
402
 
                    self.stream.write(log_contents)
403
 
                    self.stream.write('\n')
404
 
                    self.stream.write(
405
 
                            ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
406
 
                    self.stream.write('\n')
407
 
            self.stream.writeln(self.separator2)
408
 
            self.stream.writeln("%s" % err)
 
385
    def addNotApplicable(self, test, reason):
 
386
        self.not_applicable_count += 1
 
387
        self.report_not_applicable(test, reason)
409
388
 
410
389
    def _post_mortem(self):
411
390
        """Start a PDB post mortem session."""
492
471
            a += '%dm%ds' % (runtime / 60, runtime % 60)
493
472
        else:
494
473
            a += '%ds' % runtime
495
 
        if self.error_count:
496
 
            a += ', %d err' % self.error_count
497
 
        if self.failure_count:
498
 
            a += ', %d fail' % self.failure_count
499
 
        if self.unsupported:
500
 
            a += ', %d missing' % len(self.unsupported)
 
474
        total_fail_count = self.error_count + self.failure_count
 
475
        if total_fail_count:
 
476
            a += ', %d failed' % total_fail_count
 
477
        # if self.unsupported:
 
478
        #     a += ', %d missing' % len(self.unsupported)
501
479
        a += ']'
502
480
        return a
503
481
 
512
490
        return self._shortened_test_description(test)
513
491
 
514
492
    def report_error(self, test, err):
515
 
        self.pb.note('ERROR: %s\n    %s\n',
 
493
        self.stream.write('ERROR: %s\n    %s\n' % (
516
494
            self._test_description(test),
517
495
            err[1],
518
 
            )
 
496
            ))
519
497
 
520
498
    def report_failure(self, test, err):
521
 
        self.pb.note('FAIL: %s\n    %s\n',
 
499
        self.stream.write('FAIL: %s\n    %s\n' % (
522
500
            self._test_description(test),
523
501
            err[1],
524
 
            )
 
502
            ))
525
503
 
526
504
    def report_known_failure(self, test, err):
527
 
        self.pb.note('XFAIL: %s\n%s\n',
528
 
            self._test_description(test), err[1])
 
505
        pass
529
506
 
530
507
    def report_skip(self, test, reason):
531
508
        pass
532
509
 
533
 
    def report_not_applicable(self, test, skip_excinfo):
 
510
    def report_not_applicable(self, test, reason):
534
511
        pass
535
512
 
536
513
    def report_unsupported(self, test, feature):
558
535
    def report_test_start(self, test):
559
536
        self.count += 1
560
537
        name = self._shortened_test_description(test)
561
 
        # width needs space for 6 char status, plus 1 for slash, plus an
562
 
        # 11-char time string, plus a trailing blank
563
 
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
564
 
        self.stream.write(self._ellipsize_to_right(name,
565
 
                          osutils.terminal_width()-18))
 
538
        width = osutils.terminal_width()
 
539
        if width is not None:
 
540
            # width needs space for 6 char status, plus 1 for slash, plus an
 
541
            # 11-char time string, plus a trailing blank
 
542
            # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
 
543
            # space
 
544
            self.stream.write(self._ellipsize_to_right(name, width-18))
 
545
        else:
 
546
            self.stream.write(name)
566
547
        self.stream.flush()
567
548
 
568
549
    def _error_summary(self, err):
597
578
        self.stream.writeln(' SKIP %s\n%s'
598
579
                % (self._testTimeString(test), reason))
599
580
 
600
 
    def report_not_applicable(self, test, skip_excinfo):
601
 
        self.stream.writeln('  N/A %s\n%s'
602
 
                % (self._testTimeString(test),
603
 
                   self._error_summary(skip_excinfo)))
 
581
    def report_not_applicable(self, test, reason):
 
582
        self.stream.writeln('  N/A %s\n    %s'
 
583
                % (self._testTimeString(test), reason))
604
584
 
605
585
    def report_unsupported(self, test, feature):
606
586
        """test cannot be run because feature is missing."""
626
606
            applied left to right - the first element in the list is the 
627
607
            innermost decorator.
628
608
        """
 
609
        # stream may know claim to know to write unicode strings, but in older
 
610
        # pythons this goes sufficiently wrong that it is a bad idea. (
 
611
        # specifically a built in file with encoding 'UTF-8' will still try
 
612
        # to encode using ascii.
 
613
        new_encoding = osutils.get_terminal_encoding()
 
614
        codec = codecs.lookup(new_encoding)
 
615
        if type(codec) is tuple:
 
616
            # Python 2.4
 
617
            encode = codec[0]
 
618
        else:
 
619
            encode = codec.encode
 
620
        stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
 
621
        stream.encoding = new_encoding
629
622
        self.stream = unittest._WritelnDecorator(stream)
630
623
        self.descriptions = descriptions
631
624
        self.verbosity = verbosity
651
644
        for decorator in self._result_decorators:
652
645
            result = decorator(result)
653
646
            result.stop_early = self.stop_on_failure
654
 
        try:
655
 
            import testtools
656
 
        except ImportError:
657
 
            pass
658
 
        else:
659
 
            if isinstance(test, testtools.ConcurrentTestSuite):
660
 
                # We need to catch bzr specific behaviors
661
 
                result = BZRTransformingResult(result)
662
647
        result.startTestRun()
663
648
        try:
664
649
            test.run(result)
682
667
                        % (type(suite), suite))
683
668
 
684
669
 
685
 
class TestSkipped(Exception):
686
 
    """Indicates that a test was intentionally skipped, rather than failing."""
 
670
TestSkipped = testtools.testcase.TestSkipped
687
671
 
688
672
 
689
673
class TestNotApplicable(TestSkipped):
695
679
    """
696
680
 
697
681
 
698
 
class KnownFailure(AssertionError):
699
 
    """Indicates that a test failed in a precisely expected manner.
700
 
 
701
 
    Such failures dont block the whole test suite from passing because they are
702
 
    indicators of partially completed code or of future work. We have an
703
 
    explicit error for them so that we can ensure that they are always visible:
704
 
    KnownFailures are always shown in the output of bzr selftest.
705
 
    """
 
682
# traceback._some_str fails to format exceptions that have the default
 
683
# __str__ which does an implicit ascii conversion. However, repr() on those
 
684
# objects works, for all that its not quite what the doctor may have ordered.
 
685
def _clever_some_str(value):
 
686
    try:
 
687
        return str(value)
 
688
    except:
 
689
        try:
 
690
            return repr(value).replace('\\n', '\n')
 
691
        except:
 
692
            return '<unprintable %s object>' % type(value).__name__
 
693
 
 
694
traceback._some_str = _clever_some_str
 
695
 
 
696
 
 
697
# deprecated - use self.knownFailure(), or self.expectFailure.
 
698
KnownFailure = testtools.testcase._ExpectedFailure
706
699
 
707
700
 
708
701
class UnavailableFeature(Exception):
709
702
    """A feature required for this test was not available.
710
703
 
 
704
    This can be considered a specialised form of SkippedTest.
 
705
 
711
706
    The feature should be used to construct the exception.
712
707
    """
713
708
 
714
709
 
715
 
class CommandFailed(Exception):
716
 
    pass
717
 
 
718
 
 
719
710
class StringIOWrapper(object):
720
711
    """A wrapper around cStringIO which just adds an encoding attribute.
721
712
 
758
749
    # XXX: Should probably unify more with CannedInputUIFactory or a
759
750
    # particular configuration of TextUIFactory, or otherwise have a clearer
760
751
    # idea of how they're supposed to be different.
761
 
    # See https://bugs.edge.launchpad.net/bzr/+bug/408213
 
752
    # See https://bugs.launchpad.net/bzr/+bug/408213
762
753
 
763
754
    def __init__(self, stdout=None, stderr=None, stdin=None):
764
755
        if stdin is not None:
782
773
        return NullProgressView()
783
774
 
784
775
 
785
 
class TestCase(unittest.TestCase):
 
776
class TestCase(testtools.TestCase):
786
777
    """Base class for bzr unit tests.
787
778
 
788
779
    Tests that need access to disk resources should subclass
807
798
    _leaking_threads_tests = 0
808
799
    _first_thread_leaker_id = None
809
800
    _log_file_name = None
810
 
    _log_contents = ''
811
 
    _keep_log_file = False
812
801
    # record lsprof data when performing benchmark calls.
813
802
    _gather_lsprof_in_benchmarks = False
814
 
    attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
815
 
                     '_log_contents', '_log_file_name', '_benchtime',
816
 
                     '_TestCase__testMethodName', '_TestCase__testMethodDoc',)
817
803
 
818
804
    def __init__(self, methodName='testMethod'):
819
805
        super(TestCase, self).__init__(methodName)
820
806
        self._cleanups = []
821
 
        self._bzr_test_setUp_run = False
822
 
        self._bzr_test_tearDown_run = False
823
807
        self._directory_isolation = True
 
808
        self.exception_handlers.insert(0,
 
809
            (UnavailableFeature, self._do_unsupported_or_skip))
 
810
        self.exception_handlers.insert(0,
 
811
            (TestNotApplicable, self._do_not_applicable))
824
812
 
825
813
    def setUp(self):
826
 
        unittest.TestCase.setUp(self)
827
 
        self._bzr_test_setUp_run = True
 
814
        super(TestCase, self).setUp()
 
815
        for feature in getattr(self, '_test_needs_features', []):
 
816
            self.requireFeature(feature)
 
817
        self._log_contents = None
 
818
        self.addDetail("log", content.Content(content.ContentType("text",
 
819
            "plain", {"charset": "utf8"}),
 
820
            lambda:[self._get_log(keep_log_file=True)]))
828
821
        self._cleanEnvironment()
829
822
        self._silenceUI()
830
823
        self._startLogFile()
846
839
        active = threading.activeCount()
847
840
        leaked_threads = active - TestCase._active_threads
848
841
        TestCase._active_threads = active
849
 
        if leaked_threads:
 
842
        # If some tests make the number of threads *decrease*, we'll consider
 
843
        # that they are just observing old threads dieing, not agressively kill
 
844
        # random threads. So we don't report these tests as leaking. The risk
 
845
        # is that we have false positives that way (the test see 2 threads
 
846
        # going away but leak one) but it seems less likely than the actual
 
847
        # false positives (the test see threads going away and does not leak).
 
848
        if leaked_threads > 0:
850
849
            TestCase._leaking_threads_tests += 1
851
850
            if TestCase._first_thread_leaker_id is None:
852
851
                TestCase._first_thread_leaker_id = self.id()
857
856
        Tests that want to use debug flags can just set them in the
858
857
        debug_flags set during setup/teardown.
859
858
        """
860
 
        self._preserved_debug_flags = set(debug.debug_flags)
 
859
        # Start with a copy of the current debug flags we can safely modify.
 
860
        self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
861
861
        if 'allow_debug' not in selftest_debug_flags:
862
862
            debug.debug_flags.clear()
863
863
        if 'disable_lock_checks' not in selftest_debug_flags:
864
864
            debug.debug_flags.add('strict_locks')
865
 
        self.addCleanup(self._restore_debug_flags)
866
865
 
867
866
    def _clear_hooks(self):
868
867
        # prevent hooks affecting tests
889
888
    def _silenceUI(self):
890
889
        """Turn off UI for duration of test"""
891
890
        # by default the UI is off; tests can turn it on if they want it.
892
 
        saved = ui.ui_factory
893
 
        def _restore():
894
 
            ui.ui_factory = saved
895
 
        ui.ui_factory = ui.SilentUIFactory()
896
 
        self.addCleanup(_restore)
 
891
        self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
897
892
 
898
893
    def _check_locks(self):
899
894
        """Check that all lock take/release actions have been paired."""
928
923
            self._lock_check_thorough = False
929
924
        else:
930
925
            self._lock_check_thorough = True
931
 
            
 
926
 
932
927
        self.addCleanup(self._check_locks)
933
928
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
934
929
                                                self._lock_acquired, None)
1027
1022
        server's urls to be used.
1028
1023
        """
1029
1024
        if backing_server is None:
1030
 
            transport_server.setUp()
 
1025
            transport_server.start_server()
1031
1026
        else:
1032
 
            transport_server.setUp(backing_server)
1033
 
        self.addCleanup(transport_server.tearDown)
 
1027
            transport_server.start_server(backing_server)
 
1028
        self.addCleanup(transport_server.stop_server)
1034
1029
        # Obtain a real transport because if the server supplies a password, it
1035
1030
        # will be hidden from the base on the client side.
1036
1031
        t = get_transport(transport_server.get_url())
1050
1045
        if t.base.endswith('/work/'):
1051
1046
            # we have safety net/test root/work
1052
1047
            t = t.clone('../..')
1053
 
        elif isinstance(transport_server, server.SmartTCPServer_for_testing):
 
1048
        elif isinstance(transport_server,
 
1049
                        test_server.SmartTCPServer_for_testing):
1054
1050
            # The smart server adds a path similar to work, which is traversed
1055
1051
            # up from by the client. But the server is chrooted - the actual
1056
1052
            # backing transport is not escaped from, and VFS requests to the
1133
1129
        :raises AssertionError: If the expected and actual stat values differ
1134
1130
            other than by atime.
1135
1131
        """
1136
 
        self.assertEqual(expected.st_size, actual.st_size)
1137
 
        self.assertEqual(expected.st_mtime, actual.st_mtime)
1138
 
        self.assertEqual(expected.st_ctime, actual.st_ctime)
1139
 
        self.assertEqual(expected.st_dev, actual.st_dev)
1140
 
        self.assertEqual(expected.st_ino, actual.st_ino)
1141
 
        self.assertEqual(expected.st_mode, actual.st_mode)
 
1132
        self.assertEqual(expected.st_size, actual.st_size,
 
1133
                         'st_size did not match')
 
1134
        self.assertEqual(expected.st_mtime, actual.st_mtime,
 
1135
                         'st_mtime did not match')
 
1136
        self.assertEqual(expected.st_ctime, actual.st_ctime,
 
1137
                         'st_ctime did not match')
 
1138
        if sys.platform != 'win32':
 
1139
            # On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
 
1140
            # is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
 
1141
            # odd. Regardless we shouldn't actually try to assert anything
 
1142
            # about their values
 
1143
            self.assertEqual(expected.st_dev, actual.st_dev,
 
1144
                             'st_dev did not match')
 
1145
            self.assertEqual(expected.st_ino, actual.st_ino,
 
1146
                             'st_ino did not match')
 
1147
        self.assertEqual(expected.st_mode, actual.st_mode,
 
1148
                         'st_mode did not match')
1142
1149
 
1143
1150
    def assertLength(self, length, obj_with_len):
1144
1151
        """Assert that obj_with_len is of length length."""
1146
1153
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
1147
1154
                length, len(obj_with_len), obj_with_len))
1148
1155
 
 
1156
    def assertLogsError(self, exception_class, func, *args, **kwargs):
 
1157
        """Assert that func(*args, **kwargs) quietly logs a specific exception.
 
1158
        """
 
1159
        from bzrlib import trace
 
1160
        captured = []
 
1161
        orig_log_exception_quietly = trace.log_exception_quietly
 
1162
        try:
 
1163
            def capture():
 
1164
                orig_log_exception_quietly()
 
1165
                captured.append(sys.exc_info())
 
1166
            trace.log_exception_quietly = capture
 
1167
            func(*args, **kwargs)
 
1168
        finally:
 
1169
            trace.log_exception_quietly = orig_log_exception_quietly
 
1170
        self.assertLength(1, captured)
 
1171
        err = captured[0][1]
 
1172
        self.assertIsInstance(err, exception_class)
 
1173
        return err
 
1174
 
1149
1175
    def assertPositive(self, val):
1150
1176
        """Assert that val is greater than 0."""
1151
1177
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
1181
1207
            raise AssertionError('pattern "%s" found in "%s"'
1182
1208
                    % (needle_re, haystack))
1183
1209
 
 
1210
    def assertContainsString(self, haystack, needle):
 
1211
        if haystack.find(needle) == -1:
 
1212
            self.fail("string %r not found in '''%s'''" % (needle, haystack))
 
1213
 
1184
1214
    def assertSubset(self, sublist, superlist):
1185
1215
        """Assert that every entry in sublist is present in superlist."""
1186
1216
        missing = set(sublist) - set(superlist)
1273
1303
                m += ": " + msg
1274
1304
            self.fail(m)
1275
1305
 
1276
 
    def expectFailure(self, reason, assertion, *args, **kwargs):
1277
 
        """Invoke a test, expecting it to fail for the given reason.
1278
 
 
1279
 
        This is for assertions that ought to succeed, but currently fail.
1280
 
        (The failure is *expected* but not *wanted*.)  Please be very precise
1281
 
        about the failure you're expecting.  If a new bug is introduced,
1282
 
        AssertionError should be raised, not KnownFailure.
1283
 
 
1284
 
        Frequently, expectFailure should be followed by an opposite assertion.
1285
 
        See example below.
1286
 
 
1287
 
        Intended to be used with a callable that raises AssertionError as the
1288
 
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
1289
 
 
1290
 
        Raises KnownFailure if the test fails.  Raises AssertionError if the
1291
 
        test succeeds.
1292
 
 
1293
 
        example usage::
1294
 
 
1295
 
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
1296
 
                             dynamic_val)
1297
 
          self.assertEqual(42, dynamic_val)
1298
 
 
1299
 
          This means that a dynamic_val of 54 will cause the test to raise
1300
 
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
1301
 
          only a dynamic_val of 42 will allow the test to pass.  Anything other
1302
 
          than 54 or 42 will cause an AssertionError.
1303
 
        """
1304
 
        try:
1305
 
            assertion(*args, **kwargs)
1306
 
        except AssertionError:
1307
 
            raise KnownFailure(reason)
1308
 
        else:
1309
 
            self.fail('Unexpected success.  Should have failed: %s' % reason)
1310
 
 
1311
1306
    def assertFileEqual(self, content, path):
1312
1307
        """Fail if path does not contain 'content'."""
1313
1308
        self.failUnlessExists(path)
1318
1313
            f.close()
1319
1314
        self.assertEqualDiff(content, s)
1320
1315
 
 
1316
    def assertDocstring(self, expected_docstring, obj):
 
1317
        """Fail if obj does not have expected_docstring"""
 
1318
        if __doc__ is None:
 
1319
            # With -OO the docstring should be None instead
 
1320
            self.assertIs(obj.__doc__, None)
 
1321
        else:
 
1322
            self.assertEqual(expected_docstring, obj.__doc__)
 
1323
 
1321
1324
    def failUnlessExists(self, path):
1322
1325
        """Fail unless path or paths, which may be abs or relative, exist."""
1323
1326
        if not isinstance(path, basestring):
1463
1466
 
1464
1467
        Close the file and delete it, unless setKeepLogfile was called.
1465
1468
        """
1466
 
        if self._log_file is None:
1467
 
            return
 
1469
        if bzrlib.trace._trace_file:
 
1470
            # flush the log file, to get all content
 
1471
            bzrlib.trace._trace_file.flush()
1468
1472
        bzrlib.trace.pop_log_file(self._log_memento)
1469
 
        self._log_file.close()
1470
 
        self._log_file = None
1471
 
        if not self._keep_log_file:
1472
 
            os.remove(self._log_file_name)
1473
 
            self._log_file_name = None
1474
 
 
1475
 
    def setKeepLogfile(self):
1476
 
        """Make the logfile not be deleted when _finishLogFile is called."""
1477
 
        self._keep_log_file = True
 
1473
        # Cache the log result and delete the file on disk
 
1474
        self._get_log(False)
1478
1475
 
1479
1476
    def thisFailsStrictLockCheck(self):
1480
1477
        """It is known that this test would fail with -Dstrict_locks.
1497
1494
        """
1498
1495
        self._cleanups.append((callable, args, kwargs))
1499
1496
 
 
1497
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
 
1498
        """Overrides an object attribute restoring it after the test.
 
1499
 
 
1500
        :param obj: The object that will be mutated.
 
1501
 
 
1502
        :param attr_name: The attribute name we want to preserve/override in
 
1503
            the object.
 
1504
 
 
1505
        :param new: The optional value we want to set the attribute to.
 
1506
 
 
1507
        :returns: The actual attr value.
 
1508
        """
 
1509
        value = getattr(obj, attr_name)
 
1510
        # The actual value is captured by the call below
 
1511
        self.addCleanup(setattr, obj, attr_name, value)
 
1512
        if new is not _unitialized_attr:
 
1513
            setattr(obj, attr_name, new)
 
1514
        return value
 
1515
 
1500
1516
    def _cleanEnvironment(self):
1501
1517
        new_env = {
1502
1518
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1508
1524
            'EDITOR': None,
1509
1525
            'BZR_EMAIL': None,
1510
1526
            'BZREMAIL': None, # may still be present in the environment
1511
 
            'EMAIL': None,
 
1527
            'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
1512
1528
            'BZR_PROGRESS_BAR': None,
1513
1529
            'BZR_LOG': None,
1514
1530
            'BZR_PLUGIN_PATH': None,
 
1531
            'BZR_DISABLE_PLUGINS': None,
 
1532
            'BZR_PLUGINS_AT': None,
 
1533
            'BZR_CONCURRENCY': None,
1515
1534
            # Make sure that any text ui tests are consistent regardless of
1516
1535
            # the environment the test case is run in; you may want tests that
1517
1536
            # test other combinations.  'dumb' is a reasonable guess for tests
1519
1538
            'TERM': 'dumb',
1520
1539
            'LINES': '25',
1521
1540
            'COLUMNS': '80',
 
1541
            'BZR_COLUMNS': '80',
1522
1542
            # SSH Agent
1523
1543
            'SSH_AUTH_SOCK': None,
1524
1544
            # Proxies
1536
1556
            'ftp_proxy': None,
1537
1557
            'FTP_PROXY': None,
1538
1558
            'BZR_REMOTE_PATH': None,
 
1559
            # Generally speaking, we don't want apport reporting on crashes in
 
1560
            # the test envirnoment unless we're specifically testing apport,
 
1561
            # so that it doesn't leak into the real system environment.  We
 
1562
            # use an env var so it propagates to subprocesses.
 
1563
            'APPORT_DISABLE': '1',
1539
1564
        }
1540
 
        self.__old_env = {}
 
1565
        self._old_env = {}
1541
1566
        self.addCleanup(self._restoreEnvironment)
1542
1567
        for name, value in new_env.iteritems():
1543
1568
            self._captureVar(name, value)
1544
1569
 
1545
1570
    def _captureVar(self, name, newvalue):
1546
1571
        """Set an environment variable, and reset it when finished."""
1547
 
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1548
 
 
1549
 
    def _restore_debug_flags(self):
1550
 
        debug.debug_flags.clear()
1551
 
        debug.debug_flags.update(self._preserved_debug_flags)
 
1572
        self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1552
1573
 
1553
1574
    def _restoreEnvironment(self):
1554
 
        for name, value in self.__old_env.iteritems():
 
1575
        for name, value in self._old_env.iteritems():
1555
1576
            osutils.set_or_unset_env(name, value)
1556
1577
 
1557
1578
    def _restoreHooks(self):
1565
1586
    def _do_skip(self, result, reason):
1566
1587
        addSkip = getattr(result, 'addSkip', None)
1567
1588
        if not callable(addSkip):
1568
 
            result.addError(self, sys.exc_info())
 
1589
            result.addSuccess(result)
1569
1590
        else:
1570
1591
            addSkip(self, reason)
1571
1592
 
1572
 
    def run(self, result=None):
1573
 
        if result is None: result = self.defaultTestResult()
1574
 
        for feature in getattr(self, '_test_needs_features', []):
1575
 
            if not feature.available():
1576
 
                result.startTest(self)
1577
 
                if getattr(result, 'addNotSupported', None):
1578
 
                    result.addNotSupported(self, feature)
1579
 
                else:
1580
 
                    result.addSuccess(self)
1581
 
                result.stopTest(self)
1582
 
                return result
1583
 
        try:
1584
 
            try:
1585
 
                result.startTest(self)
1586
 
                absent_attr = object()
1587
 
                # Python 2.5
1588
 
                method_name = getattr(self, '_testMethodName', absent_attr)
1589
 
                if method_name is absent_attr:
1590
 
                    # Python 2.4
1591
 
                    method_name = getattr(self, '_TestCase__testMethodName')
1592
 
                testMethod = getattr(self, method_name)
1593
 
                try:
1594
 
                    try:
1595
 
                        self.setUp()
1596
 
                        if not self._bzr_test_setUp_run:
1597
 
                            self.fail(
1598
 
                                "test setUp did not invoke "
1599
 
                                "bzrlib.tests.TestCase's setUp")
1600
 
                    except KeyboardInterrupt:
1601
 
                        self._runCleanups()
1602
 
                        raise
1603
 
                    except TestSkipped, e:
1604
 
                        self._do_skip(result, e.args[0])
1605
 
                        self.tearDown()
1606
 
                        return result
1607
 
                    except:
1608
 
                        result.addError(self, sys.exc_info())
1609
 
                        self._runCleanups()
1610
 
                        return result
1611
 
 
1612
 
                    ok = False
1613
 
                    try:
1614
 
                        testMethod()
1615
 
                        ok = True
1616
 
                    except self.failureException:
1617
 
                        result.addFailure(self, sys.exc_info())
1618
 
                    except TestSkipped, e:
1619
 
                        if not e.args:
1620
 
                            reason = "No reason given."
1621
 
                        else:
1622
 
                            reason = e.args[0]
1623
 
                        self._do_skip(result, reason)
1624
 
                    except KeyboardInterrupt:
1625
 
                        self._runCleanups()
1626
 
                        raise
1627
 
                    except:
1628
 
                        result.addError(self, sys.exc_info())
1629
 
 
1630
 
                    try:
1631
 
                        self.tearDown()
1632
 
                        if not self._bzr_test_tearDown_run:
1633
 
                            self.fail(
1634
 
                                "test tearDown did not invoke "
1635
 
                                "bzrlib.tests.TestCase's tearDown")
1636
 
                    except KeyboardInterrupt:
1637
 
                        self._runCleanups()
1638
 
                        raise
1639
 
                    except:
1640
 
                        result.addError(self, sys.exc_info())
1641
 
                        self._runCleanups()
1642
 
                        ok = False
1643
 
                    if ok: result.addSuccess(self)
1644
 
                finally:
1645
 
                    result.stopTest(self)
1646
 
                return result
1647
 
            except TestNotApplicable:
1648
 
                # Not moved from the result [yet].
1649
 
                self._runCleanups()
1650
 
                raise
1651
 
            except KeyboardInterrupt:
1652
 
                self._runCleanups()
1653
 
                raise
1654
 
        finally:
1655
 
            saved_attrs = {}
1656
 
            for attr_name in self.attrs_to_keep:
1657
 
                if attr_name in self.__dict__:
1658
 
                    saved_attrs[attr_name] = self.__dict__[attr_name]
1659
 
            self.__dict__ = saved_attrs
1660
 
 
1661
 
    def tearDown(self):
1662
 
        self._runCleanups()
1663
 
        self._log_contents = ''
1664
 
        self._bzr_test_tearDown_run = True
1665
 
        unittest.TestCase.tearDown(self)
 
1593
    @staticmethod
 
1594
    def _do_known_failure(self, result, e):
 
1595
        err = sys.exc_info()
 
1596
        addExpectedFailure = getattr(result, 'addExpectedFailure', None)
 
1597
        if addExpectedFailure is not None:
 
1598
            addExpectedFailure(self, err)
 
1599
        else:
 
1600
            result.addSuccess(self)
 
1601
 
 
1602
    @staticmethod
 
1603
    def _do_not_applicable(self, result, e):
 
1604
        if not e.args:
 
1605
            reason = 'No reason given'
 
1606
        else:
 
1607
            reason = e.args[0]
 
1608
        addNotApplicable = getattr(result, 'addNotApplicable', None)
 
1609
        if addNotApplicable is not None:
 
1610
            result.addNotApplicable(self, reason)
 
1611
        else:
 
1612
            self._do_skip(result, reason)
 
1613
 
 
1614
    @staticmethod
 
1615
    def _do_unsupported_or_skip(self, result, e):
 
1616
        reason = e.args[0]
 
1617
        addNotSupported = getattr(result, 'addNotSupported', None)
 
1618
        if addNotSupported is not None:
 
1619
            result.addNotSupported(self, reason)
 
1620
        else:
 
1621
            self._do_skip(result, reason)
1666
1622
 
1667
1623
    def time(self, callable, *args, **kwargs):
1668
1624
        """Run callable and accrue the time it takes to the benchmark time.
1672
1628
        self._benchcalls.
1673
1629
        """
1674
1630
        if self._benchtime is None:
 
1631
            self.addDetail('benchtime', content.Content(content.ContentType(
 
1632
                "text", "plain"), lambda:[str(self._benchtime)]))
1675
1633
            self._benchtime = 0
1676
1634
        start = time.time()
1677
1635
        try:
1686
1644
        finally:
1687
1645
            self._benchtime += time.time() - start
1688
1646
 
1689
 
    def _runCleanups(self):
1690
 
        """Run registered cleanup functions.
1691
 
 
1692
 
        This should only be called from TestCase.tearDown.
1693
 
        """
1694
 
        # TODO: Perhaps this should keep running cleanups even if
1695
 
        # one of them fails?
1696
 
 
1697
 
        # Actually pop the cleanups from the list so tearDown running
1698
 
        # twice is safe (this happens for skipped tests).
1699
 
        while self._cleanups:
1700
 
            cleanup, args, kwargs = self._cleanups.pop()
1701
 
            cleanup(*args, **kwargs)
1702
 
 
1703
1647
    def log(self, *args):
1704
1648
        mutter(*args)
1705
1649
 
1706
1650
    def _get_log(self, keep_log_file=False):
1707
 
        """Get the log from bzrlib.trace calls from this test.
 
1651
        """Internal helper to get the log from bzrlib.trace for this test.
 
1652
 
 
1653
        Please use self.getDetails, or self.get_log to access this in test case
 
1654
        code.
1708
1655
 
1709
1656
        :param keep_log_file: When True, if the log is still a file on disk
1710
1657
            leave it as a file on disk. When False, if the log is still a file
1712
1659
            self._log_contents.
1713
1660
        :return: A string containing the log.
1714
1661
        """
1715
 
        # flush the log file, to get all content
 
1662
        if self._log_contents is not None:
 
1663
            try:
 
1664
                self._log_contents.decode('utf8')
 
1665
            except UnicodeDecodeError:
 
1666
                unicodestr = self._log_contents.decode('utf8', 'replace')
 
1667
                self._log_contents = unicodestr.encode('utf8')
 
1668
            return self._log_contents
1716
1669
        import bzrlib.trace
1717
1670
        if bzrlib.trace._trace_file:
 
1671
            # flush the log file, to get all content
1718
1672
            bzrlib.trace._trace_file.flush()
1719
 
        if self._log_contents:
1720
 
            # XXX: this can hardly contain the content flushed above --vila
1721
 
            # 20080128
1722
 
            return self._log_contents
1723
1673
        if self._log_file_name is not None:
1724
1674
            logfile = open(self._log_file_name)
1725
1675
            try:
1726
1676
                log_contents = logfile.read()
1727
1677
            finally:
1728
1678
                logfile.close()
 
1679
            try:
 
1680
                log_contents.decode('utf8')
 
1681
            except UnicodeDecodeError:
 
1682
                unicodestr = log_contents.decode('utf8', 'replace')
 
1683
                log_contents = unicodestr.encode('utf8')
1729
1684
            if not keep_log_file:
 
1685
                close_attempts = 0
 
1686
                max_close_attempts = 100
 
1687
                first_close_error = None
 
1688
                while close_attempts < max_close_attempts:
 
1689
                    close_attempts += 1
 
1690
                    try:
 
1691
                        self._log_file.close()
 
1692
                    except IOError, ioe:
 
1693
                        if ioe.errno is None:
 
1694
                            # No errno implies 'close() called during
 
1695
                            # concurrent operation on the same file object', so
 
1696
                            # retry.  Probably a thread is trying to write to
 
1697
                            # the log file.
 
1698
                            if first_close_error is None:
 
1699
                                first_close_error = ioe
 
1700
                            continue
 
1701
                        raise
 
1702
                    else:
 
1703
                        break
 
1704
                if close_attempts > 1:
 
1705
                    sys.stderr.write(
 
1706
                        'Unable to close log file on first attempt, '
 
1707
                        'will retry: %s\n' % (first_close_error,))
 
1708
                    if close_attempts == max_close_attempts:
 
1709
                        sys.stderr.write(
 
1710
                            'Unable to close log file after %d attempts.\n'
 
1711
                            % (max_close_attempts,))
 
1712
                self._log_file = None
 
1713
                # Permit multiple calls to get_log until we clean it up in
 
1714
                # finishLogFile
1730
1715
                self._log_contents = log_contents
1731
1716
                try:
1732
1717
                    os.remove(self._log_file_name)
1736
1721
                                             ' %r\n' % self._log_file_name))
1737
1722
                    else:
1738
1723
                        raise
 
1724
                self._log_file_name = None
1739
1725
            return log_contents
1740
1726
        else:
1741
 
            return "DELETED log file to reduce memory footprint"
 
1727
            return "No log file content and no log file name."
 
1728
 
 
1729
    def get_log(self):
 
1730
        """Get a unicode string containing the log from bzrlib.trace.
 
1731
 
 
1732
        Undecodable characters are replaced.
 
1733
        """
 
1734
        return u"".join(self.getDetails()['log'].iter_text())
1742
1735
 
1743
1736
    def requireFeature(self, feature):
1744
1737
        """This test requires a specific feature is available.
1761
1754
 
1762
1755
    def _run_bzr_core(self, args, retcode, encoding, stdin,
1763
1756
            working_dir):
 
1757
        # Clear chk_map page cache, because the contents are likely to mask
 
1758
        # locking errors.
 
1759
        chk_map.clear_cache()
1764
1760
        if encoding is None:
1765
1761
            encoding = osutils.get_user_encoding()
1766
1762
        stdout = StringIOWrapper()
1783
1779
            os.chdir(working_dir)
1784
1780
 
1785
1781
        try:
1786
 
            result = self.apply_redirected(ui.ui_factory.stdin,
1787
 
                stdout, stderr,
1788
 
                bzrlib.commands.run_bzr_catch_user_errors,
1789
 
                args)
 
1782
            try:
 
1783
                result = self.apply_redirected(ui.ui_factory.stdin,
 
1784
                    stdout, stderr,
 
1785
                    bzrlib.commands.run_bzr_catch_user_errors,
 
1786
                    args)
 
1787
            except KeyboardInterrupt:
 
1788
                # Reraise KeyboardInterrupt with contents of redirected stdout
 
1789
                # and stderr as arguments, for tests which are interested in
 
1790
                # stdout and stderr and are expecting the exception.
 
1791
                out = stdout.getvalue()
 
1792
                err = stderr.getvalue()
 
1793
                if out:
 
1794
                    self.log('output:\n%r', out)
 
1795
                if err:
 
1796
                    self.log('errors:\n%r', err)
 
1797
                raise KeyboardInterrupt(out, err)
1790
1798
        finally:
1791
1799
            logger.removeHandler(handler)
1792
1800
            ui.ui_factory = old_ui_factory
2092
2100
 
2093
2101
        Tests that expect to provoke LockContention errors should call this.
2094
2102
        """
2095
 
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
2096
 
        def resetTimeout():
2097
 
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
2098
 
        self.addCleanup(resetTimeout)
2099
 
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
2103
        self.overrideAttr(bzrlib.lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
2100
2104
 
2101
2105
    def make_utf8_encoded_stringio(self, encoding_type=None):
2102
2106
        """Return a StringIOWrapper instance, that will encode Unicode
2116
2120
        request_handlers = request.request_handlers
2117
2121
        orig_method = request_handlers.get(verb)
2118
2122
        request_handlers.remove(verb)
2119
 
        def restoreVerb():
2120
 
            request_handlers.register(verb, orig_method)
2121
 
        self.addCleanup(restoreVerb)
 
2123
        self.addCleanup(request_handlers.register, verb, orig_method)
2122
2124
 
2123
2125
 
2124
2126
class CapturedCall(object):
2215
2217
        if self.__readonly_server is None:
2216
2218
            if self.transport_readonly_server is None:
2217
2219
                # readonly decorator requested
2218
 
                self.__readonly_server = ReadonlyServer()
 
2220
                self.__readonly_server = test_server.ReadonlyServer()
2219
2221
            else:
2220
2222
                # explicit readonly transport.
2221
2223
                self.__readonly_server = self.create_transport_readonly_server()
2244
2246
        is no means to override it.
2245
2247
        """
2246
2248
        if self.__vfs_server is None:
2247
 
            self.__vfs_server = MemoryServer()
 
2249
            self.__vfs_server = memory.MemoryServer()
2248
2250
            self.start_server(self.__vfs_server)
2249
2251
        return self.__vfs_server
2250
2252
 
2340
2342
            # recreate a new one or all the followng tests will fail.
2341
2343
            # If you need to inspect its content uncomment the following line
2342
2344
            # import pdb; pdb.set_trace()
2343
 
            _rmtree_temp_dir(root + '/.bzr')
 
2345
            _rmtree_temp_dir(root + '/.bzr', test_id=self.id())
2344
2346
            self._create_safety_net()
2345
2347
            raise AssertionError('%s/.bzr should not be modified' % root)
2346
2348
 
2407
2409
        return made_control.create_repository(shared=shared)
2408
2410
 
2409
2411
    def make_smart_server(self, path):
2410
 
        smart_server = server.SmartTCPServer_for_testing()
 
2412
        smart_server = test_server.SmartTCPServer_for_testing()
2411
2413
        self.start_server(smart_server, self.get_server())
2412
2414
        remote_transport = get_transport(smart_server.get_url()).clone(path)
2413
2415
        return remote_transport
2422
2424
        return branchbuilder.BranchBuilder(branch=branch)
2423
2425
 
2424
2426
    def overrideEnvironmentForTesting(self):
2425
 
        os.environ['HOME'] = self.test_home_dir
2426
 
        os.environ['BZR_HOME'] = self.test_home_dir
 
2427
        test_home_dir = self.test_home_dir
 
2428
        if isinstance(test_home_dir, unicode):
 
2429
            test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
 
2430
        os.environ['HOME'] = test_home_dir
 
2431
        os.environ['BZR_HOME'] = test_home_dir
2427
2432
 
2428
2433
    def setUp(self):
2429
2434
        super(TestCaseWithMemoryTransport, self).setUp()
2430
2435
        self._make_test_root()
2431
 
        _currentdir = os.getcwdu()
2432
 
        def _leaveDirectory():
2433
 
            os.chdir(_currentdir)
2434
 
        self.addCleanup(_leaveDirectory)
 
2436
        self.addCleanup(os.chdir, os.getcwdu())
2435
2437
        self.makeAndChdirToTestDir()
2436
2438
        self.overrideEnvironmentForTesting()
2437
2439
        self.__readonly_server = None
2440
2442
 
2441
2443
    def setup_smart_server_with_call_log(self):
2442
2444
        """Sets up a smart server as the transport server with a call log."""
2443
 
        self.transport_server = server.SmartTCPServer_for_testing
 
2445
        self.transport_server = test_server.SmartTCPServer_for_testing
2444
2446
        self.hpss_calls = []
2445
2447
        import traceback
2446
2448
        # Skip the current stack down to the caller of
2539
2541
 
2540
2542
    def deleteTestDir(self):
2541
2543
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2542
 
        _rmtree_temp_dir(self.test_base_dir)
 
2544
        _rmtree_temp_dir(self.test_base_dir, test_id=self.id())
2543
2545
 
2544
2546
    def build_tree(self, shape, line_endings='binary', transport=None):
2545
2547
        """Build a test tree according to a pattern.
2580
2582
                content = "contents of %s%s" % (name.encode('utf-8'), end)
2581
2583
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
2582
2584
 
2583
 
    def build_tree_contents(self, shape):
2584
 
        build_tree_contents(shape)
 
2585
    build_tree_contents = staticmethod(treeshape.build_tree_contents)
2585
2586
 
2586
2587
    def assertInWorkingTree(self, path, root_path='.', tree=None):
2587
2588
        """Assert whether path or paths are in the WorkingTree"""
2663
2664
            # We can only make working trees locally at the moment.  If the
2664
2665
            # transport can't support them, then we keep the non-disk-backed
2665
2666
            # branch and create a local checkout.
2666
 
            if self.vfs_transport_factory is LocalURLServer:
 
2667
            if self.vfs_transport_factory is test_server.LocalURLServer:
2667
2668
                # the branch is colocated on disk, we cannot create a checkout.
2668
2669
                # hopefully callers will expect this.
2669
2670
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2729
2730
 
2730
2731
    def setUp(self):
2731
2732
        super(ChrootedTestCase, self).setUp()
2732
 
        if not self.vfs_transport_factory == MemoryServer:
 
2733
        if not self.vfs_transport_factory == memory.MemoryServer:
2733
2734
            self.transport_readonly_server = HttpServer
2734
2735
 
2735
2736
 
3149
3150
        if self.randomised:
3150
3151
            return iter(self._tests)
3151
3152
        self.randomised = True
3152
 
        self.stream.writeln("Randomizing test order using seed %s\n" %
 
3153
        self.stream.write("Randomizing test order using seed %s\n\n" %
3153
3154
            (self.actual_seed()))
3154
3155
        # Initialise the random number generator.
3155
3156
        random.seed(self.actual_seed())
3203
3204
    return result
3204
3205
 
3205
3206
 
 
3207
def workaround_zealous_crypto_random():
 
3208
    """Crypto.Random want to help us being secure, but we don't care here.
 
3209
 
 
3210
    This workaround some test failure related to the sftp server. Once paramiko
 
3211
    stop using the controversial API in Crypto.Random, we may get rid of it.
 
3212
    """
 
3213
    try:
 
3214
        from Crypto.Random import atfork
 
3215
        atfork()
 
3216
    except ImportError:
 
3217
        pass
 
3218
 
 
3219
 
3206
3220
def fork_for_tests(suite):
3207
3221
    """Take suite and start up one runner per CPU by forking()
3208
3222
 
3212
3226
    concurrency = osutils.local_concurrency()
3213
3227
    result = []
3214
3228
    from subunit import TestProtocolClient, ProtocolTestCase
3215
 
    try:
3216
 
        from subunit.test_results import AutoTimingTestResultDecorator
3217
 
    except ImportError:
3218
 
        AutoTimingTestResultDecorator = lambda x:x
 
3229
    from subunit.test_results import AutoTimingTestResultDecorator
3219
3230
    class TestInOtherProcess(ProtocolTestCase):
3220
3231
        # Should be in subunit, I think. RBC.
3221
3232
        def __init__(self, stream, pid):
3226
3237
            try:
3227
3238
                ProtocolTestCase.run(self, result)
3228
3239
            finally:
3229
 
                os.waitpid(self.pid, os.WNOHANG)
 
3240
                os.waitpid(self.pid, 0)
3230
3241
 
3231
3242
    test_blocks = partition_tests(suite, concurrency)
3232
3243
    for process_tests in test_blocks:
3235
3246
        c2pread, c2pwrite = os.pipe()
3236
3247
        pid = os.fork()
3237
3248
        if pid == 0:
 
3249
            workaround_zealous_crypto_random()
3238
3250
            try:
3239
3251
                os.close(c2pread)
3240
3252
                # Leave stderr and stdout open so we can see test noise
3287
3299
        if not os.path.isfile(bzr_path):
3288
3300
            # We are probably installed. Assume sys.argv is the right file
3289
3301
            bzr_path = sys.argv[0]
 
3302
        bzr_path = [bzr_path]
 
3303
        if sys.platform == "win32":
 
3304
            # if we're on windows, we can't execute the bzr script directly
 
3305
            bzr_path = [sys.executable] + bzr_path
3290
3306
        fd, test_list_file_name = tempfile.mkstemp()
3291
3307
        test_list_file = os.fdopen(fd, 'wb', 1)
3292
3308
        for test in process_tests:
3293
3309
            test_list_file.write(test.id() + '\n')
3294
3310
        test_list_file.close()
3295
3311
        try:
3296
 
            argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
 
3312
            argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3297
3313
                '--subunit']
3298
3314
            if '--no-plugins' in sys.argv:
3299
3315
                argv.append('--no-plugins')
3338
3354
 
3339
3355
    def addFailure(self, test, err):
3340
3356
        self.result.addFailure(test, err)
3341
 
 
3342
 
 
3343
 
class BZRTransformingResult(ForwardingResult):
3344
 
 
3345
 
    def addError(self, test, err):
3346
 
        feature = self._error_looks_like('UnavailableFeature: ', err)
3347
 
        if feature is not None:
3348
 
            self.result.addNotSupported(test, feature)
3349
 
        else:
3350
 
            self.result.addError(test, err)
3351
 
 
3352
 
    def addFailure(self, test, err):
3353
 
        known = self._error_looks_like('KnownFailure: ', err)
3354
 
        if known is not None:
3355
 
            self.result._addKnownFailure(test, [KnownFailure,
3356
 
                                                KnownFailure(known), None])
3357
 
        else:
3358
 
            self.result.addFailure(test, err)
3359
 
 
3360
 
    def _error_looks_like(self, prefix, err):
3361
 
        """Deserialize exception and returns the stringify value."""
3362
 
        import subunit
3363
 
        value = None
3364
 
        typ, exc, _ = err
3365
 
        if isinstance(exc, subunit.RemoteException):
3366
 
            # stringify the exception gives access to the remote traceback
3367
 
            # We search the last line for 'prefix'
3368
 
            lines = str(exc).split('\n')
3369
 
            while lines and not lines[-1]:
3370
 
                lines.pop(-1)
3371
 
            if lines:
3372
 
                if lines[-1].startswith(prefix):
3373
 
                    value = lines[-1][len(prefix):]
3374
 
        return value
 
3357
ForwardingResult = testtools.ExtendedToOriginalDecorator
3375
3358
 
3376
3359
 
3377
3360
class ProfileResult(ForwardingResult):
3638
3621
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3639
3622
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3640
3623
 
3641
 
# Obvious higest levels prefixes, feel free to add your own via a plugin
 
3624
# Obvious highest levels prefixes, feel free to add your own via a plugin
3642
3625
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3643
3626
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3644
3627
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3654
3637
        'bzrlib.tests.commands',
3655
3638
        'bzrlib.tests.per_branch',
3656
3639
        'bzrlib.tests.per_bzrdir',
 
3640
        'bzrlib.tests.per_bzrdir_colo',
 
3641
        'bzrlib.tests.per_foreign_vcs',
3657
3642
        'bzrlib.tests.per_interrepository',
3658
3643
        'bzrlib.tests.per_intertree',
3659
3644
        'bzrlib.tests.per_inventory',
3660
3645
        'bzrlib.tests.per_interbranch',
3661
3646
        'bzrlib.tests.per_lock',
 
3647
        'bzrlib.tests.per_merger',
3662
3648
        'bzrlib.tests.per_transport',
3663
3649
        'bzrlib.tests.per_tree',
3664
3650
        'bzrlib.tests.per_pack_repository',
3665
3651
        'bzrlib.tests.per_repository',
3666
3652
        'bzrlib.tests.per_repository_chk',
3667
3653
        'bzrlib.tests.per_repository_reference',
 
3654
        'bzrlib.tests.per_uifactory',
3668
3655
        'bzrlib.tests.per_versionedfile',
3669
3656
        'bzrlib.tests.per_workingtree',
3670
3657
        'bzrlib.tests.test__annotator',
 
3658
        'bzrlib.tests.test__bencode',
3671
3659
        'bzrlib.tests.test__chk_map',
3672
3660
        'bzrlib.tests.test__dirstate_helpers',
3673
3661
        'bzrlib.tests.test__groupcompress',
3674
3662
        'bzrlib.tests.test__known_graph',
3675
3663
        'bzrlib.tests.test__rio',
 
3664
        'bzrlib.tests.test__simple_set',
 
3665
        'bzrlib.tests.test__static_tuple',
3676
3666
        'bzrlib.tests.test__walkdirs_win32',
3677
3667
        'bzrlib.tests.test_ancestry',
3678
3668
        'bzrlib.tests.test_annotate',
3679
3669
        'bzrlib.tests.test_api',
3680
3670
        'bzrlib.tests.test_atomicfile',
3681
3671
        'bzrlib.tests.test_bad_files',
3682
 
        'bzrlib.tests.test_bencode',
3683
3672
        'bzrlib.tests.test_bisect_multi',
3684
3673
        'bzrlib.tests.test_branch',
3685
3674
        'bzrlib.tests.test_branchbuilder',
3693
3682
        'bzrlib.tests.test_chk_serializer',
3694
3683
        'bzrlib.tests.test_chunk_writer',
3695
3684
        'bzrlib.tests.test_clean_tree',
 
3685
        'bzrlib.tests.test_cleanup',
 
3686
        'bzrlib.tests.test_cmdline',
3696
3687
        'bzrlib.tests.test_commands',
3697
3688
        'bzrlib.tests.test_commit',
3698
3689
        'bzrlib.tests.test_commit_merge',
3732
3723
        'bzrlib.tests.test_identitymap',
3733
3724
        'bzrlib.tests.test_ignores',
3734
3725
        'bzrlib.tests.test_index',
 
3726
        'bzrlib.tests.test_import_tariff',
3735
3727
        'bzrlib.tests.test_info',
3736
3728
        'bzrlib.tests.test_inv',
3737
3729
        'bzrlib.tests.test_inventory_delta',
3745
3737
        'bzrlib.tests.test_lru_cache',
3746
3738
        'bzrlib.tests.test_lsprof',
3747
3739
        'bzrlib.tests.test_mail_client',
 
3740
        'bzrlib.tests.test_matchers',
3748
3741
        'bzrlib.tests.test_memorytree',
3749
3742
        'bzrlib.tests.test_merge',
3750
3743
        'bzrlib.tests.test_merge3',
3830
3823
 
3831
3824
 
3832
3825
def _test_suite_modules_to_doctest():
3833
 
    """Return the list of modules to doctest."""   
 
3826
    """Return the list of modules to doctest."""
 
3827
    if __doc__ is None:
 
3828
        # GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
 
3829
        return []
3834
3830
    return [
3835
3831
        'bzrlib',
3836
3832
        'bzrlib.branchbuilder',
 
3833
        'bzrlib.decorators',
3837
3834
        'bzrlib.export',
3838
3835
        'bzrlib.inventory',
3839
3836
        'bzrlib.iterablefile',
4046
4043
    return new_test
4047
4044
 
4048
4045
 
4049
 
def _rmtree_temp_dir(dirname):
 
4046
def permute_tests_for_extension(standard_tests, loader, py_module_name,
 
4047
                                ext_module_name):
 
4048
    """Helper for permutating tests against an extension module.
 
4049
 
 
4050
    This is meant to be used inside a modules 'load_tests()' function. It will
 
4051
    create 2 scenarios, and cause all tests in the 'standard_tests' to be run
 
4052
    against both implementations. Setting 'test.module' to the appropriate
 
4053
    module. See bzrlib.tests.test__chk_map.load_tests as an example.
 
4054
 
 
4055
    :param standard_tests: A test suite to permute
 
4056
    :param loader: A TestLoader
 
4057
    :param py_module_name: The python path to a python module that can always
 
4058
        be loaded, and will be considered the 'python' implementation. (eg
 
4059
        'bzrlib._chk_map_py')
 
4060
    :param ext_module_name: The python path to an extension module. If the
 
4061
        module cannot be loaded, a single test will be added, which notes that
 
4062
        the module is not available. If it can be loaded, all standard_tests
 
4063
        will be run against that module.
 
4064
    :return: (suite, feature) suite is a test-suite that has all the permuted
 
4065
        tests. feature is the Feature object that can be used to determine if
 
4066
        the module is available.
 
4067
    """
 
4068
 
 
4069
    py_module = __import__(py_module_name, {}, {}, ['NO_SUCH_ATTRIB'])
 
4070
    scenarios = [
 
4071
        ('python', {'module': py_module}),
 
4072
    ]
 
4073
    suite = loader.suiteClass()
 
4074
    feature = ModuleAvailableFeature(ext_module_name)
 
4075
    if feature.available():
 
4076
        scenarios.append(('C', {'module': feature.module}))
 
4077
    else:
 
4078
        # the compiled module isn't available, so we add a failing test
 
4079
        class FailWithoutFeature(TestCase):
 
4080
            def test_fail(self):
 
4081
                self.requireFeature(feature)
 
4082
        suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
 
4083
    result = multiply_tests(standard_tests, scenarios, suite)
 
4084
    return result, feature
 
4085
 
 
4086
 
 
4087
def _rmtree_temp_dir(dirname, test_id=None):
4050
4088
    # If LANG=C we probably have created some bogus paths
4051
4089
    # which rmtree(unicode) will fail to delete
4052
4090
    # so make sure we are using rmtree(str) to delete everything
4064
4102
        # We don't want to fail here because some useful display will be lost
4065
4103
        # otherwise. Polluting the tmp dir is bad, but not giving all the
4066
4104
        # possible info to the test runner is even worse.
 
4105
        if test_id != None:
 
4106
            ui.ui_factory.clear_term()
 
4107
            sys.stderr.write('\nWhile running: %s\n' % (test_id,))
 
4108
        # Ugly, but the last thing we want here is fail, so bear with it.
 
4109
        printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
 
4110
                                    ).encode('ascii', 'replace')
4067
4111
        sys.stderr.write('Unable to remove testing dir %s\n%s'
4068
 
                         % (os.path.basename(dirname), e))
 
4112
                         % (os.path.basename(dirname), printable_e))
4069
4113
 
4070
4114
 
4071
4115
class Feature(object):
4153
4197
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4154
4198
 
4155
4199
 
 
4200
class _CompatabilityThunkFeature(Feature):
 
4201
    """This feature is just a thunk to another feature.
 
4202
 
 
4203
    It issues a deprecation warning if it is accessed, to let you know that you
 
4204
    should really use a different feature.
 
4205
    """
 
4206
 
 
4207
    def __init__(self, dep_version, module, name,
 
4208
                 replacement_name, replacement_module=None):
 
4209
        super(_CompatabilityThunkFeature, self).__init__()
 
4210
        self._module = module
 
4211
        if replacement_module is None:
 
4212
            replacement_module = module
 
4213
        self._replacement_module = replacement_module
 
4214
        self._name = name
 
4215
        self._replacement_name = replacement_name
 
4216
        self._dep_version = dep_version
 
4217
        self._feature = None
 
4218
 
 
4219
    def _ensure(self):
 
4220
        if self._feature is None:
 
4221
            depr_msg = self._dep_version % ('%s.%s'
 
4222
                                            % (self._module, self._name))
 
4223
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
 
4224
                                               self._replacement_name)
 
4225
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
 
4226
            # Import the new feature and use it as a replacement for the
 
4227
            # deprecated one.
 
4228
            mod = __import__(self._replacement_module, {}, {},
 
4229
                             [self._replacement_name])
 
4230
            self._feature = getattr(mod, self._replacement_name)
 
4231
 
 
4232
    def _probe(self):
 
4233
        self._ensure()
 
4234
        return self._feature._probe()
 
4235
 
 
4236
 
 
4237
class ModuleAvailableFeature(Feature):
 
4238
    """This is a feature than describes a module we want to be available.
 
4239
 
 
4240
    Declare the name of the module in __init__(), and then after probing, the
 
4241
    module will be available as 'self.module'.
 
4242
 
 
4243
    :ivar module: The module if it is available, else None.
 
4244
    """
 
4245
 
 
4246
    def __init__(self, module_name):
 
4247
        super(ModuleAvailableFeature, self).__init__()
 
4248
        self.module_name = module_name
 
4249
 
 
4250
    def _probe(self):
 
4251
        try:
 
4252
            self._module = __import__(self.module_name, {}, {}, [''])
 
4253
            return True
 
4254
        except ImportError:
 
4255
            return False
 
4256
 
 
4257
    @property
 
4258
    def module(self):
 
4259
        if self.available(): # Make sure the probe has been done
 
4260
            return self._module
 
4261
        return None
 
4262
 
 
4263
    def feature_name(self):
 
4264
        return self.module_name
 
4265
 
 
4266
 
 
4267
# This is kept here for compatibility, it is recommended to use
 
4268
# 'bzrlib.tests.feature.paramiko' instead
 
4269
ParamikoFeature = _CompatabilityThunkFeature(
 
4270
    deprecated_in((2,1,0)),
 
4271
    'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
 
4272
 
 
4273
 
4156
4274
def probe_unicode_in_user_encoding():
4157
4275
    """Try to encode several unicode strings to use in unicode-aware tests.
4158
4276
    Return first successfull match.
4238
4356
UTF8Filesystem = _UTF8Filesystem()
4239
4357
 
4240
4358
 
 
4359
class _BreakinFeature(Feature):
 
4360
    """Does this platform support the breakin feature?"""
 
4361
 
 
4362
    def _probe(self):
 
4363
        from bzrlib import breakin
 
4364
        if breakin.determine_signal() is None:
 
4365
            return False
 
4366
        if sys.platform == 'win32':
 
4367
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
 
4368
            # We trigger SIGBREAK via a Console api so we need ctypes to
 
4369
            # access the function
 
4370
            try:
 
4371
                import ctypes
 
4372
            except OSError:
 
4373
                return False
 
4374
        return True
 
4375
 
 
4376
    def feature_name(self):
 
4377
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
 
4378
 
 
4379
 
 
4380
BreakinFeature = _BreakinFeature()
 
4381
 
 
4382
 
4241
4383
class _CaseInsCasePresFilenameFeature(Feature):
4242
4384
    """Is the file-system case insensitive, but case-preserving?"""
4243
4385
 
4293
4435
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4294
4436
 
4295
4437
 
4296
 
class _SubUnitFeature(Feature):
4297
 
    """Check if subunit is available."""
 
4438
class _CaseSensitiveFilesystemFeature(Feature):
4298
4439
 
4299
4440
    def _probe(self):
4300
 
        try:
4301
 
            import subunit
 
4441
        if CaseInsCasePresFilenameFeature.available():
 
4442
            return False
 
4443
        elif CaseInsensitiveFilesystemFeature.available():
 
4444
            return False
 
4445
        else:
4302
4446
            return True
4303
 
        except ImportError:
4304
 
            return False
4305
4447
 
4306
4448
    def feature_name(self):
4307
 
        return 'subunit'
4308
 
 
4309
 
SubUnitFeature = _SubUnitFeature()
 
4449
        return 'case-sensitive filesystem'
 
4450
 
 
4451
# new coding style is for feature instances to be lowercase
 
4452
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
 
4453
 
 
4454
 
 
4455
# Kept for compatibility, use bzrlib.tests.features.subunit instead
 
4456
SubUnitFeature = _CompatabilityThunkFeature(
 
4457
    deprecated_in((2,1,0)),
 
4458
    'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4310
4459
# Only define SubUnitBzrRunner if subunit is available.
4311
4460
try:
4312
4461
    from subunit import TestProtocolClient
4313
 
    try:
4314
 
        from subunit.test_results import AutoTimingTestResultDecorator
4315
 
    except ImportError:
4316
 
        AutoTimingTestResultDecorator = lambda x:x
 
4462
    from subunit.test_results import AutoTimingTestResultDecorator
4317
4463
    class SubUnitBzrRunner(TextTestRunner):
4318
4464
        def run(self, test):
4319
4465
            result = AutoTimingTestResultDecorator(
4322
4468
            return result
4323
4469
except ImportError:
4324
4470
    pass
 
4471
 
 
4472
class _PosixPermissionsFeature(Feature):
 
4473
 
 
4474
    def _probe(self):
 
4475
        def has_perms():
 
4476
            # create temporary file and check if specified perms are maintained.
 
4477
            import tempfile
 
4478
 
 
4479
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
 
4480
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
 
4481
            fd, name = f
 
4482
            os.close(fd)
 
4483
            os.chmod(name, write_perms)
 
4484
 
 
4485
            read_perms = os.stat(name).st_mode & 0777
 
4486
            os.unlink(name)
 
4487
            return (write_perms == read_perms)
 
4488
 
 
4489
        return (os.name == 'posix') and has_perms()
 
4490
 
 
4491
    def feature_name(self):
 
4492
        return 'POSIX permissions support'
 
4493
 
 
4494
posix_permissions_feature = _PosixPermissionsFeature()