/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar


Viewing changes to bzrlib/tests/__init__.py

  • Committer: Wouter van Heyst
  • Date: 2006-06-06 12:06:20 UTC
  • mfrom: (1740 +trunk)
  • mto: This revision was merged to the branch mainline in revision 1752.
  • Revision ID: larstiq@larstiq.dyndns.org-20060606120620-50066b0951e4ef7c
  • Commit message: merge bzr.dev 1740

In the diff below, lines prefixed with '+' were added in this revision, lines prefixed with '-' were removed, and unprefixed lines are unchanged context. Each '@@ old N / new M @@' marker gives the line numbers at which the following chunk starts in the old and new versions of the file.

@@ old 43 / new 43 @@
 import bzrlib.branch
 import bzrlib.bzrdir as bzrdir
 import bzrlib.commands
+import bzrlib.bundle.serializer
 import bzrlib.errors as errors
 import bzrlib.inventory
 import bzrlib.iterablefile
 import bzrlib.lockdir
+try:
+    import bzrlib.lsprof
+except ImportError:
+    # lsprof not available
+    pass
 from bzrlib.merge import merge_inner
 import bzrlib.merge3
 import bzrlib.osutils
 import bzrlib.osutils as osutils
 import bzrlib.plugin
+import bzrlib.progress as progress
 from bzrlib.revision import common_ancestor
 import bzrlib.store
 import bzrlib.trace
@@ old 70 / new 77 @@
 MODULES_TO_TEST = []
 MODULES_TO_DOCTEST = [
                       bzrlib.branch,
+                      bzrlib.bundle.serializer,
                       bzrlib.commands,
                       bzrlib.errors,
                       bzrlib.inventory,
@@ old 80 / new 88 @@
                       bzrlib.osutils,
                       bzrlib.store
                       ]
+
+
 def packages_to_test():
     """Return a list of packages to test.
 
@@ old 114 / new 124 @@
     Shows output in a different format, including displaying runtime for tests.
     """
     stop_early = False
-
-    def _elapsedTime(self):
-        return "%5dms" % (1000 * (time.time() - self._start_time))
+
+    def __init__(self, stream, descriptions, verbosity, pb=None):
+        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
+        self.pb = pb
+
+    def extractBenchmarkTime(self, testCase):
+        """Add a benchmark time for the current test case."""
+        self._benchmarkTime = getattr(testCase, "_benchtime", None)
+
+    def _elapsedTestTimeString(self):
+        """Return a time string for the overall time the current test has taken."""
+        return self._formatTime(time.time() - self._start_time)
+
+    def _testTimeString(self):
+        if self._benchmarkTime is not None:
+            return "%s/%s" % (
+                self._formatTime(self._benchmarkTime),
+                self._elapsedTestTimeString())
+        else:
+            return "      %s" % self._elapsedTestTimeString()
+
+    def _formatTime(self, seconds):
+        """Format seconds as milliseconds with leading spaces."""
+        return "%5dms" % (1000 * seconds)
+
+    def _ellipsise_unimportant_words(self, a_string, final_width,
+                                   keep_start=False):
+        """Add ellipses (sp?) for overly long strings.
+
+        :param keep_start: If true preserve the start of a_string rather
+                           than the end of it.
+        """
+        if keep_start:
+            if len(a_string) > final_width:
+                result = a_string[:final_width-3] + '...'
+            else:
+                result = a_string
+        else:
+            if len(a_string) > final_width:
+                result = '...' + a_string[3-final_width:]
+            else:
+                result = a_string
+        return result.ljust(final_width)
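
The truncation helper added above is easier to follow with a tiny worked example. The sketch below is illustrative only and not part of this revision; it repeats the same rule outside the class to show which end of the string survives for each value of keep_start:

    def ellipsise(a_string, final_width, keep_start=False):
        # Same rule as _MyResult._ellipsise_unimportant_words: keep_start
        # trims the tail, otherwise the head is replaced by '...'.
        if len(a_string) <= final_width:
            return a_string.ljust(final_width)
        if keep_start:
            return a_string[:final_width - 3] + '...'
        return '...' + a_string[3 - final_width:]

    assert ellipsise('abcdefghij', 8, keep_start=True) == 'abcde...'
    assert ellipsise('abcdefghij', 8) == '...fghij'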
@@ old 120 / new 170 @@
 
     def startTest(self, test):
         unittest.TestResult.startTest(self, test)
@@ old 124 / new 174 @@
         # the beginning, but in an id, the important words are
         # at the end
         SHOW_DESCRIPTIONS = False
+
+        if not self.showAll and self.dots and self.pb is not None:
+            final_width = 13
+        else:
+            final_width = osutils.terminal_width()
+            final_width = final_width - 15 - 8
+        what = None
+        if SHOW_DESCRIPTIONS:
+            what = test.shortDescription()
+            if what:
+                what = self._ellipsise_unimportant_words(what, final_width, keep_start=True)
+        if what is None:
+            what = test.id()
+            if what.startswith('bzrlib.tests.'):
+                what = what[13:]
+            what = self._ellipsise_unimportant_words(what, final_width)
         if self.showAll:
-            width = osutils.terminal_width()
-            name_width = width - 15
-            what = None
-            if SHOW_DESCRIPTIONS:
-                what = test.shortDescription()
-                if what:
-                    if len(what) > name_width:
-                        what = what[:name_width-3] + '...'
-            if what is None:
-                what = test.id()
-                if what.startswith('bzrlib.tests.'):
-                    what = what[13:]
-                if len(what) > name_width:
-                    what = '...' + what[3-name_width:]
-            what = what.ljust(name_width)
             self.stream.write(what)
+        elif self.dots and self.pb is not None:
+            self.pb.update(what, self.testsRun - 1, None)
         self.stream.flush()
+        self._recordTestStartTime()
+
+    def _recordTestStartTime(self):
+        """Record that a test has started."""
         self._start_time = time.time()
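
A small note on the magic index above: what[13:] simply drops the 'bzrlib.tests.' prefix (13 characters) from a test id before it is ellipsised, for example:

    prefix = 'bzrlib.tests.'
    assert len(prefix) == 13
    test_id = 'bzrlib.tests.test_osutils.SomeCase.test_something'   # illustrative id
    assert test_id[13:] == 'test_osutils.SomeCase.test_something'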
@@ old 146 / new 203 @@
 
     def addError(self, test, err):
         if isinstance(err[1], TestSkipped):
             return self.addSkipped(test, err)
         unittest.TestResult.addError(self, test, err)
+        self.extractBenchmarkTime(test)
         if self.showAll:
-            self.stream.writeln("ERROR %s" % self._elapsedTime())
-        elif self.dots:
+            self.stream.writeln("ERROR %s" % self._testTimeString())
+        elif self.dots and self.pb is None:
             self.stream.write('E')
+        elif self.dots:
+            self.pb.update(self._ellipsise_unimportant_words('ERROR', 13), self.testsRun, None)
         self.stream.flush()
         if self.stop_early:
             self.stop()
 
     def addFailure(self, test, err):
         unittest.TestResult.addFailure(self, test, err)
+        self.extractBenchmarkTime(test)
         if self.showAll:
-            self.stream.writeln(" FAIL %s" % self._elapsedTime())
-        elif self.dots:
+            self.stream.writeln(" FAIL %s" % self._testTimeString())
+        elif self.dots and self.pb is None:
             self.stream.write('F')
+        elif self.dots:
+            self.pb.update(self._ellipsise_unimportant_words('FAIL', 13), self.testsRun, None)
         self.stream.flush()
         if self.stop_early:
             self.stop()
 
     def addSuccess(self, test):
+        self.extractBenchmarkTime(test)
         if self.showAll:
-            self.stream.writeln('   OK %s' % self._elapsedTime())
-        elif self.dots:
+            self.stream.writeln('   OK %s' % self._testTimeString())
+            for bench_called, stats in getattr(test, '_benchcalls', []):
+                self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
+                stats.pprint(file=self.stream)
+        elif self.dots and self.pb is None:
             self.stream.write('~')
+        elif self.dots:
+            self.pb.update(self._ellipsise_unimportant_words('OK', 13), self.testsRun, None)
         self.stream.flush()
         unittest.TestResult.addSuccess(self, test)
 
     def addSkipped(self, test, skip_excinfo):
+        self.extractBenchmarkTime(test)
         if self.showAll:
-            print >>self.stream, ' SKIP %s' % self._elapsedTime()
+            print >>self.stream, ' SKIP %s' % self._testTimeString()
             print >>self.stream, '     %s' % skip_excinfo[1]
-        elif self.dots:
+        elif self.dots and self.pb is None:
             self.stream.write('S')
+        elif self.dots:
+            self.pb.update(self._ellipsise_unimportant_words('SKIP', 13), self.testsRun, None)
         self.stream.flush()
         # seems best to treat this as success from point-of-view of unittest
         # -- it actually does nothing so it barely matters :)
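
In verbose mode each result line now shows the accrued benchmark time (when the test used self.time()) next to the wall-clock time for the test. A quick illustration of the formatting produced by the helpers above, with made-up values:

    # "%5dms" pads the millisecond count to five digits.
    assert "%5dms" % (1000 * 0.102) == "  102ms"
    # With a benchmark time of 0.102s and an elapsed time of 0.350s,
    # _testTimeString() renders '  102ms/  350ms'; without a benchmark
    # time it prepends six spaces to the elapsed string so columns align.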
@@ old 200 / new 272 @@
             self.stream.writeln("%s" % err)
 
 
-class TextTestRunner(unittest.TextTestRunner):
+class TextTestRunner(object):
     stop_on_failure = False
 
+    def __init__(self,
+                 stream=sys.stderr,
+                 descriptions=0,
+                 verbosity=1,
+                 keep_output=False,
+                 pb=None):
+        self.stream = unittest._WritelnDecorator(stream)
+        self.descriptions = descriptions
+        self.verbosity = verbosity
+        self.keep_output = keep_output
+        self.pb = pb
+
     def _makeResult(self):
-        result = _MyResult(self.stream, self.descriptions, self.verbosity)
+        result = _MyResult(self.stream,
+                           self.descriptions,
+                           self.verbosity,
+                           pb=self.pb)
         result.stop_early = self.stop_on_failure
         return result
 
+    def run(self, test):
+        "Run the given test case or test suite."
+        result = self._makeResult()
+        startTime = time.time()
+        if self.pb is not None:
+            self.pb.update('Running tests', 0, test.countTestCases())
+        test.run(result)
+        stopTime = time.time()
+        timeTaken = stopTime - startTime
+        result.printErrors()
+        self.stream.writeln(result.separator2)
+        run = result.testsRun
+        self.stream.writeln("Ran %d test%s in %.3fs" %
+                            (run, run != 1 and "s" or "", timeTaken))
+        self.stream.writeln()
+        if not result.wasSuccessful():
+            self.stream.write("FAILED (")
+            failed, errored = map(len, (result.failures, result.errors))
+            if failed:
+                self.stream.write("failures=%d" % failed)
+            if errored:
+                if failed: self.stream.write(", ")
+                self.stream.write("errors=%d" % errored)
+            self.stream.writeln(")")
+        else:
+            self.stream.writeln("OK")
+        if self.pb is not None:
+            self.pb.update('Cleaning up', 0, 1)
+        # This is still a little bogus,
+        # but only a little. Folk not using our testrunner will
+        # have to delete their temp directories themselves.
+        test_root = TestCaseInTempDir.TEST_ROOT
+        if result.wasSuccessful() or not self.keep_output:
+            if test_root is not None:
+                    osutils.rmtree(test_root)
+        else:
+            if self.pb is not None:
+                self.pb.note("Failed tests working directories are in '%s'\n",
+                             test_root)
+            else:
+                self.stream.writeln(
+                    "Failed tests working directories are in '%s'\n" %
+                    test_root)
+        TestCaseInTempDir.TEST_ROOT = None
+        if self.pb is not None:
+            self.pb.clear()
+        return result
+
 
 def iter_suite_tests(suite):
     """Return all tests in a suite, recursing through nested suites"""
@@ old 278 / new 413 @@
     accidentally overlooked.
     """
 
-    BZRPATH = 'bzr'
     _log_file_name = None
     _log_contents = ''
+    # record lsprof data when performing benchmark calls.
+    _gather_lsprof_in_benchmarks = False
 
     def __init__(self, methodName='testMethod'):
         super(TestCase, self).__init__(methodName)
@@ old 291 / new 427 @@
         self._cleanEnvironment()
         bzrlib.trace.disable_default_logging()
         self._startLogFile()
+        self._benchcalls = []
+        self._benchtime = None
 
     def _ndiff_strings(self, a, b):
         """Return ndiff between two strings containing lines.
@@ old 445 / new 583 @@
         self._runCleanups()
         unittest.TestCase.tearDown(self)
 
+    def time(self, callable, *args, **kwargs):
+        """Run callable and accrue the time it takes to the benchmark time.
+
+        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
+        this will cause lsprofile statistics to be gathered and stored in
+        self._benchcalls.
+        """
+        if self._benchtime is None:
+            self._benchtime = 0
+        start = time.time()
+        try:
+            if not self._gather_lsprof_in_benchmarks:
+                return callable(*args, **kwargs)
+            else:
+                # record this benchmark
+                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
+                stats.sort()
+                self._benchcalls.append(((callable, args, kwargs), stats))
+                return ret
+        finally:
+            self._benchtime += time.time() - start
+
     def _runCleanups(self):
         """Run registered cleanup functions.
 
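
As a usage sketch (a hypothetical test, not part of this revision): a benchmark test wraps only the interesting call in self.time(), so setup cost stays out of the reported benchmark figure, and with --lsprof-time the same call also records profiler stats into _benchcalls:

    from bzrlib.tests import TestCase

    class BenchmarkSketch(TestCase):

        def test_timed_section(self):
            data = range(1000)            # setup: not counted
            self.time(sorted, data)       # only this call accrues to _benchtime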
@@ old 508 / new 668 @@
         handler.setLevel(logging.INFO)
         logger = logging.getLogger('')
         logger.addHandler(handler)
-        old_stdin = getattr(bzrlib.ui.ui_factory, "stdin", None)
+        old_ui_factory = bzrlib.ui.ui_factory
+        bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
+            stdout=stdout,
+            stderr=stderr)
         bzrlib.ui.ui_factory.stdin = stdin
         try:
             result = self.apply_redirected(stdin, stdout, stderr,
@@ old 516 / new 679 @@
                                            argv)
         finally:
             logger.removeHandler(handler)
-            bzrlib.ui.ui_factory.stdin = old_stdin
-        # TODO: jam 20060105 Because we theoretically know the encoding
-        #       of stdout and stderr, we could decode them at this time
-        #       but for now, we will assume that the output of all
-        #       functions
+            bzrlib.ui.ui_factory = old_ui_factory
+
         out = stdout.getvalue()
         err = stderr.getvalue()
         if out:
@@ old 943 / new 1103 @@
 
 def run_suite(suite, name='test', verbose=False, pattern=".*",
               stop_on_failure=False, keep_output=False,
-              transport=None):
+              transport=None, lsprof_timed=None):
     TestCaseInTempDir._TEST_NAME = name
+    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
     if verbose:
         verbosity = 2
+        pb = None
     else:
         verbosity = 1
+        pb = progress.ProgressBar()
     runner = TextTestRunner(stream=sys.stdout,
                             descriptions=0,
-                            verbosity=verbosity)
+                            verbosity=verbosity,
+                            keep_output=keep_output,
+                            pb=pb)
     runner.stop_on_failure=stop_on_failure
     if pattern != '.*':
         suite = filter_suite_by_re(suite, pattern)
     result = runner.run(suite)
-    # This is still a little bogus,
-    # but only a little. Folk not using our testrunner will
-    # have to delete their temp directories themselves.
-    test_root = TestCaseInTempDir.TEST_ROOT
-    if result.wasSuccessful() or not keep_output:
-        if test_root is not None:
-            print 'Deleting test root %s...' % test_root
-            try:
-                osutils.rmtree(test_root)
-            finally:
-                print
-    else:
-        print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
     return result.wasSuccessful()
 
 
 def selftest(verbose=False, pattern=".*", stop_on_failure=True,
              keep_output=False,
-             transport=None):
+             transport=None,
+             test_suite_factory=None,
+             lsprof_timed=None):
     """Run the whole test suite under the enhanced runner"""
     global default_transport
     if transport is None:
         transport = default_transport
     old_transport = default_transport
     default_transport = transport
-    suite = test_suite()
     try:
+        if test_suite_factory is None:
+            suite = test_suite()
+        else:
+            suite = test_suite_factory()
         return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
                      stop_on_failure=stop_on_failure, keep_output=keep_output,
-                     transport=transport)
+                     transport=transport,
+                     lsprof_timed=lsprof_timed)
     finally:
         default_transport = old_transport
 
 
-
 def test_suite():
-    """Build and return TestSuite for the whole program."""
+    """Build and return TestSuite for the whole of bzrlib.
+
+    This function can be replaced if you need to change the default test
+    suite on a global basis, but it is not encouraged.
+    """
     from doctest import DocTestSuite
 
     global MODULES_TO_DOCTEST
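
With the new arguments, callers can substitute a smaller suite and switch on lsprof timing. A rough sketch (the factory below is hypothetical, and lsprof_timed=True needs bzrlib.lsprof to be importable):

    import bzrlib.tests
    from bzrlib.tests import TestUtil

    def small_suite():
        # hypothetical factory: load just one test module
        loader = TestUtil.TestLoader()
        return loader.loadTestsFromModuleNames(['bzrlib.tests.test_osutils'])

    bzrlib.tests.selftest(verbose=True,
                          test_suite_factory=small_suite,
                          lsprof_timed=True)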
@@ old 1002 / new 1163 @@
                    'bzrlib.tests.test_api',
                    'bzrlib.tests.test_bad_files',
                    'bzrlib.tests.test_branch',
+                   'bzrlib.tests.test_bundle',
                    'bzrlib.tests.test_bzrdir',
                    'bzrlib.tests.test_command',
                    'bzrlib.tests.test_commit',
@@ old 1033 / new 1195 @@
                    'bzrlib.tests.test_options',
                    'bzrlib.tests.test_osutils',
                    'bzrlib.tests.test_patch',
+                   'bzrlib.tests.test_patches',
                    'bzrlib.tests.test_permissions',
                    'bzrlib.tests.test_plugins',
                    'bzrlib.tests.test_progress',
@@ old 1048 / new 1211 @@
                    'bzrlib.tests.test_sftp_transport',
                    'bzrlib.tests.test_smart_add',
                    'bzrlib.tests.test_source',
+                   'bzrlib.tests.test_status',
                    'bzrlib.tests.test_store',
                    'bzrlib.tests.test_symbol_versioning',
                    'bzrlib.tests.test_testament',
@@ old 1071 / new 1235 @@
     test_transport_implementations = [
         'bzrlib.tests.test_transport_implementations']
 
-    TestCase.BZRPATH = osutils.pathjoin(
-            osutils.realpath(osutils.dirname(bzrlib.__path__[0])), 'bzr')
-    print '%10s: %s' % ('bzr', osutils.realpath(sys.argv[0]))
-    print '%10s: %s' % ('bzrlib', bzrlib.__path__[0])
-    print
     suite = TestSuite()
-    # python2.4's TestLoader.loadTestsFromNames gives very poor
-    # errors if it fails to load a named module - no indication of what's
-    # actually wrong, just "no such module".  We should probably override that
-    # class, but for the moment just load them ourselves. (mbp 20051202)
-    loader = TestLoader()
+    loader = TestUtil.TestLoader()
     from bzrlib.transport import TransportTestProviderAdapter
     adapter = TransportTestProviderAdapter()
     adapt_modules(test_transport_implementations, adapter, loader, suite)
-    for mod_name in testmod_names:
-        mod = _load_module_by_name(mod_name)
-        suite.addTest(loader.loadTestsFromModule(mod))
+    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
     for package in packages_to_test():
         suite.addTest(package.test_suite())
     for m in MODULES_TO_TEST:
@@ old 1102 / new 1255 @@
 
 def adapt_modules(mods_list, adapter, loader, suite):
     """Adapt the modules in mods_list using adapter and add to suite."""
-    for mod_name in mods_list:
-        mod = _load_module_by_name(mod_name)
-        for test in iter_suite_tests(loader.loadTestsFromModule(mod)):
-            suite.addTests(adapter.adapt(test))
-
-
-def _load_module_by_name(mod_name):
-    parts = mod_name.split('.')
-    module = __import__(mod_name)
-    del parts[0]
-    # for historical reasons python returns the top-level module even though
-    # it loads the submodule; we need to walk down to get the one we want.
-    while parts:
-        module = getattr(module, parts.pop(0))
-    return module
+    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
+        suite.addTests(adapter.adapt(test))
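
For reference, adapt_modules() is driven exactly as test_suite() does above; a condensed sketch of that call pattern, using the TestUtil loader this revision switches to:

    from bzrlib.tests import TestSuite, TestUtil, adapt_modules
    from bzrlib.transport import TransportTestProviderAdapter

    suite = TestSuite()
    loader = TestUtil.TestLoader()
    adapter = TransportTestProviderAdapter()
    adapt_modules(['bzrlib.tests.test_transport_implementations'],
                  adapter, loader, suite)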