/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: John Arbash Meinel
  • Date: 2006-12-01 19:41:16 UTC
  • mfrom: (2158 +trunk)
  • mto: This revision was merged to the branch mainline in revision 2159.
  • Revision ID: john@arbash-meinel.com-20061201194116-nvn5qhfxux5284jc
[merge] bzr.dev 2158

Show diffs side-by-side

added added

removed removed

Lines of Context:
43
43
import time
44
44
 
45
45
 
46
 
from bzrlib import memorytree
 
46
from bzrlib import (
 
47
    bzrdir,
 
48
    debug,
 
49
    errors,
 
50
    memorytree,
 
51
    osutils,
 
52
    progress,
 
53
    urlutils,
 
54
    )
47
55
import bzrlib.branch
48
 
import bzrlib.bzrdir as bzrdir
49
56
import bzrlib.commands
50
57
import bzrlib.bundle.serializer
51
 
import bzrlib.errors as errors
52
58
import bzrlib.export
53
59
import bzrlib.inventory
54
60
import bzrlib.iterablefile
61
67
from bzrlib.merge import merge_inner
62
68
import bzrlib.merge3
63
69
import bzrlib.osutils
64
 
import bzrlib.osutils as osutils
65
70
import bzrlib.plugin
66
 
import bzrlib.progress as progress
67
71
from bzrlib.revision import common_ancestor
68
72
import bzrlib.store
69
73
from bzrlib import symbol_versioning
73
77
from bzrlib.transport.local import LocalURLServer
74
78
from bzrlib.transport.memory import MemoryServer
75
79
from bzrlib.transport.readonly import ReadonlyServer
76
 
from bzrlib.trace import mutter
 
80
from bzrlib.trace import mutter, note
77
81
from bzrlib.tests import TestUtil
 
82
from bzrlib.tests.HttpServer import HttpServer
78
83
from bzrlib.tests.TestUtil import (
79
84
                          TestSuite,
80
85
                          TestLoader,
81
86
                          )
82
87
from bzrlib.tests.treeshape import build_tree_contents
83
 
import bzrlib.urlutils as urlutils
84
88
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
85
89
 
86
90
default_transport = LocalURLServer
131
135
            ]
132
136
 
133
137
 
134
 
class _MyResult(unittest._TextTestResult):
135
 
    """Custom TestResult.
 
138
class ExtendedTestResult(unittest._TextTestResult):
 
139
    """Accepts, reports and accumulates the results of running tests.
136
140
 
137
 
    Shows output in a different format, including displaying runtime for tests.
 
141
    Compared to the unittest version, this class adds support for profiling,
 
142
    benchmarking, stopping as soon as a test fails, and skipping tests.
 
143
    There are further-specialized subclasses for different types of display.
138
144
    """
 
145
 
139
146
    stop_early = False
140
147
    
141
 
    def __init__(self, stream, descriptions, verbosity, pb=None,
142
 
                 bench_history=None):
 
148
    def __init__(self, stream, descriptions, verbosity,
 
149
                 bench_history=None,
 
150
                 num_tests=None,
 
151
                 ):
143
152
        """Construct new TestResult.
144
153
 
145
154
        :param bench_history: Optionally, a writable file object to accumulate
146
155
            benchmark results.
147
156
        """
148
157
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
149
 
        self.pb = pb
150
158
        if bench_history is not None:
151
159
            from bzrlib.version import _get_bzr_source_tree
152
160
            src_tree = _get_bzr_source_tree()
162
170
                revision_id = ''
163
171
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
164
172
        self._bench_history = bench_history
 
173
        self.ui = bzrlib.ui.ui_factory
 
174
        self.num_tests = num_tests
 
175
        self.error_count = 0
 
176
        self.failure_count = 0
 
177
        self.skip_count = 0
 
178
        self.count = 0
 
179
        self._overall_start_time = time.time()
165
180
    
166
181
    def extractBenchmarkTime(self, testCase):
167
182
        """Add a benchmark time for the current test case."""
183
198
        """Format seconds as milliseconds with leading spaces."""
184
199
        return "%5dms" % (1000 * seconds)
185
200
 
186
 
    def _ellipsise_unimportant_words(self, a_string, final_width,
187
 
                                   keep_start=False):
188
 
        """Add ellipses (sp?) for overly long strings.
189
 
        
190
 
        :param keep_start: If true preserve the start of a_string rather
191
 
                           than the end of it.
192
 
        """
193
 
        if keep_start:
194
 
            if len(a_string) > final_width:
195
 
                result = a_string[:final_width-3] + '...'
196
 
            else:
197
 
                result = a_string
198
 
        else:
199
 
            if len(a_string) > final_width:
200
 
                result = '...' + a_string[3-final_width:]
201
 
            else:
202
 
                result = a_string
203
 
        return result.ljust(final_width)
 
201
    def _shortened_test_description(self, test):
 
202
        what = test.id()
 
203
        what = re.sub(r'^bzrlib\.(tests|benchmark)\.', '', what)
 
204
        return what
204
205
 
205
206
    def startTest(self, test):
206
207
        unittest.TestResult.startTest(self, test)
207
 
        # In a short description, the important words are in
208
 
        # the beginning, but in an id, the important words are
209
 
        # at the end
210
 
        SHOW_DESCRIPTIONS = False
211
 
 
212
 
        if not self.showAll and self.dots and self.pb is not None:
213
 
            final_width = 13
214
 
        else:
215
 
            final_width = osutils.terminal_width()
216
 
            final_width = final_width - 15 - 8
217
 
        what = None
218
 
        if SHOW_DESCRIPTIONS:
219
 
            what = test.shortDescription()
220
 
            if what:
221
 
                what = self._ellipsise_unimportant_words(what, final_width, keep_start=True)
222
 
        if what is None:
223
 
            what = test.id()
224
 
            if what.startswith('bzrlib.tests.'):
225
 
                what = what[13:]
226
 
            what = self._ellipsise_unimportant_words(what, final_width)
227
 
        if self.showAll:
228
 
            self.stream.write(what)
229
 
        elif self.dots and self.pb is not None:
230
 
            self.pb.update(what, self.testsRun - 1, None)
231
 
        self.stream.flush()
 
208
        self.report_test_start(test)
232
209
        self._recordTestStartTime()
233
210
 
234
211
    def _recordTestStartTime(self):
245
222
        if setKeepLogfile is not None:
246
223
            setKeepLogfile()
247
224
        self.extractBenchmarkTime(test)
248
 
        if self.showAll:
249
 
            self.stream.writeln("ERROR %s" % self._testTimeString())
250
 
        elif self.dots and self.pb is None:
251
 
            self.stream.write('E')
252
 
        elif self.dots:
253
 
            self.pb.update(self._ellipsise_unimportant_words('ERROR', 13), self.testsRun, None)
254
 
            self.pb.note(self._ellipsise_unimportant_words(
255
 
                            test.id() + ': ERROR',
256
 
                            osutils.terminal_width()))
257
 
        self.stream.flush()
 
225
        self.report_error(test, err)
258
226
        if self.stop_early:
259
227
            self.stop()
260
228
 
266
234
        if setKeepLogfile is not None:
267
235
            setKeepLogfile()
268
236
        self.extractBenchmarkTime(test)
269
 
        if self.showAll:
270
 
            self.stream.writeln(" FAIL %s" % self._testTimeString())
271
 
        elif self.dots and self.pb is None:
272
 
            self.stream.write('F')
273
 
        elif self.dots:
274
 
            self.pb.update(self._ellipsise_unimportant_words('FAIL', 13), self.testsRun, None)
275
 
            self.pb.note(self._ellipsise_unimportant_words(
276
 
                            test.id() + ': FAIL',
277
 
                            osutils.terminal_width()))
278
 
        self.stream.flush()
 
237
        self.report_failure(test, err)
279
238
        if self.stop_early:
280
239
            self.stop()
281
240
 
286
245
                self._bench_history.write("%s %s\n" % (
287
246
                    self._formatTime(self._benchmarkTime),
288
247
                    test.id()))
289
 
        if self.showAll:
290
 
            self.stream.writeln('   OK %s' % self._testTimeString())
291
 
            for bench_called, stats in getattr(test, '_benchcalls', []):
292
 
                self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
293
 
                stats.pprint(file=self.stream)
294
 
        elif self.dots and self.pb is None:
295
 
            self.stream.write('~')
296
 
        elif self.dots:
297
 
            self.pb.update(self._ellipsise_unimportant_words('OK', 13), self.testsRun, None)
298
 
        self.stream.flush()
 
248
        self.report_success(test)
299
249
        unittest.TestResult.addSuccess(self, test)
300
250
 
301
251
    def addSkipped(self, test, skip_excinfo):
302
252
        self.extractBenchmarkTime(test)
303
 
        if self.showAll:
304
 
            print >>self.stream, ' SKIP %s' % self._testTimeString()
305
 
            print >>self.stream, '     %s' % skip_excinfo[1]
306
 
        elif self.dots and self.pb is None:
307
 
            self.stream.write('S')
308
 
        elif self.dots:
309
 
            self.pb.update(self._ellipsise_unimportant_words('SKIP', 13), self.testsRun, None)
310
 
        self.stream.flush()
 
253
        self.report_skip(test, skip_excinfo)
311
254
        # seems best to treat this as success from point-of-view of unittest
312
255
        # -- it actually does nothing so it barely matters :)
313
256
        try:
333
276
            self.stream.writeln(self.separator2)
334
277
            self.stream.writeln("%s" % err)
335
278
 
 
279
    def finished(self):
 
280
        pass
 
281
 
 
282
    def report_cleaning_up(self):
 
283
        pass
 
284
 
 
285
    def report_success(self, test):
 
286
        pass
 
287
 
 
288
 
 
289
class TextTestResult(ExtendedTestResult):
 
290
    """Displays progress and results of tests in text form"""
 
291
 
 
292
    def __init__(self, *args, **kw):
 
293
        ExtendedTestResult.__init__(self, *args, **kw)
 
294
        self.pb = self.ui.nested_progress_bar()
 
295
        self.pb.show_pct = False
 
296
        self.pb.show_spinner = False
 
297
        self.pb.show_eta = False, 
 
298
        self.pb.show_count = False
 
299
        self.pb.show_bar = False
 
300
 
 
301
    def report_starting(self):
 
302
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
303
 
 
304
    def _progress_prefix_text(self):
 
305
        a = '[%d' % self.count
 
306
        if self.num_tests is not None:
 
307
            a +='/%d' % self.num_tests
 
308
        a += ' in %ds' % (time.time() - self._overall_start_time)
 
309
        if self.error_count:
 
310
            a += ', %d errors' % self.error_count
 
311
        if self.failure_count:
 
312
            a += ', %d failed' % self.failure_count
 
313
        if self.skip_count:
 
314
            a += ', %d skipped' % self.skip_count
 
315
        a += ']'
 
316
        return a
 
317
 
 
318
    def report_test_start(self, test):
 
319
        self.count += 1
 
320
        self.pb.update(
 
321
                self._progress_prefix_text()
 
322
                + ' ' 
 
323
                + self._shortened_test_description(test))
 
324
 
 
325
    def report_error(self, test, err):
 
326
        self.error_count += 1
 
327
        self.pb.note('ERROR: %s\n    %s\n', 
 
328
            self._shortened_test_description(test),
 
329
            err[1],
 
330
            )
 
331
 
 
332
    def report_failure(self, test, err):
 
333
        self.failure_count += 1
 
334
        self.pb.note('FAIL: %s\n    %s\n', 
 
335
            self._shortened_test_description(test),
 
336
            err[1],
 
337
            )
 
338
 
 
339
    def report_skip(self, test, skip_excinfo):
 
340
        self.skip_count += 1
 
341
        if False:
 
342
            # at the moment these are mostly not things we can fix
 
343
            # and so they just produce stipple; use the verbose reporter
 
344
            # to see them.
 
345
            if False:
 
346
                # show test and reason for skip
 
347
                self.pb.note('SKIP: %s\n    %s\n', 
 
348
                    self._shortened_test_description(test),
 
349
                    skip_excinfo[1])
 
350
            else:
 
351
                # since the class name was left behind in the still-visible
 
352
                # progress bar...
 
353
                self.pb.note('SKIP: %s', skip_excinfo[1])
 
354
 
 
355
    def report_cleaning_up(self):
 
356
        self.pb.update('cleaning up...')
 
357
 
 
358
    def finished(self):
 
359
        self.pb.finished()
 
360
 
 
361
 
 
362
class VerboseTestResult(ExtendedTestResult):
 
363
    """Produce long output, with one line per test run plus times"""
 
364
 
 
365
    def _ellipsize_to_right(self, a_string, final_width):
 
366
        """Truncate and pad a string, keeping the right hand side"""
 
367
        if len(a_string) > final_width:
 
368
            result = '...' + a_string[3-final_width:]
 
369
        else:
 
370
            result = a_string
 
371
        return result.ljust(final_width)
 
372
 
 
373
    def report_starting(self):
 
374
        self.stream.write('running %d tests...\n' % self.num_tests)
 
375
 
 
376
    def report_test_start(self, test):
 
377
        self.count += 1
 
378
        name = self._shortened_test_description(test)
 
379
        self.stream.write(self._ellipsize_to_right(name, 60))
 
380
        self.stream.flush()
 
381
 
 
382
    def report_error(self, test, err):
 
383
        self.error_count += 1
 
384
        self.stream.writeln('ERROR %s\n    %s' 
 
385
                % (self._testTimeString(), err[1]))
 
386
 
 
387
    def report_failure(self, test, err):
 
388
        self.failure_count += 1
 
389
        self.stream.writeln('FAIL %s\n    %s'
 
390
                % (self._testTimeString(), err[1]))
 
391
 
 
392
    def report_success(self, test):
 
393
        self.stream.writeln('   OK %s' % self._testTimeString())
 
394
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
395
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
396
            stats.pprint(file=self.stream)
 
397
        self.stream.flush()
 
398
 
 
399
    def report_skip(self, test, skip_excinfo):
 
400
        print >>self.stream, ' SKIP %s' % self._testTimeString()
 
401
        print >>self.stream, '     %s' % skip_excinfo[1]
 
402
 
336
403
 
337
404
class TextTestRunner(object):
338
405
    stop_on_failure = False
342
409
                 descriptions=0,
343
410
                 verbosity=1,
344
411
                 keep_output=False,
345
 
                 pb=None,
346
412
                 bench_history=None):
347
413
        self.stream = unittest._WritelnDecorator(stream)
348
414
        self.descriptions = descriptions
349
415
        self.verbosity = verbosity
350
416
        self.keep_output = keep_output
351
 
        self.pb = pb
352
417
        self._bench_history = bench_history
353
418
 
354
 
    def _makeResult(self):
355
 
        result = _MyResult(self.stream,
356
 
                           self.descriptions,
357
 
                           self.verbosity,
358
 
                           pb=self.pb,
359
 
                           bench_history=self._bench_history)
360
 
        result.stop_early = self.stop_on_failure
361
 
        return result
362
 
 
363
419
    def run(self, test):
364
420
        "Run the given test case or test suite."
365
 
        result = self._makeResult()
366
421
        startTime = time.time()
367
 
        if self.pb is not None:
368
 
            self.pb.update('Running tests', 0, test.countTestCases())
 
422
        if self.verbosity == 1:
 
423
            result_class = TextTestResult
 
424
        elif self.verbosity >= 2:
 
425
            result_class = VerboseTestResult
 
426
        result = result_class(self.stream,
 
427
                              self.descriptions,
 
428
                              self.verbosity,
 
429
                              bench_history=self._bench_history,
 
430
                              num_tests=test.countTestCases(),
 
431
                              )
 
432
        result.stop_early = self.stop_on_failure
 
433
        result.report_starting()
369
434
        test.run(result)
370
435
        stopTime = time.time()
371
436
        timeTaken = stopTime - startTime
386
451
            self.stream.writeln(")")
387
452
        else:
388
453
            self.stream.writeln("OK")
389
 
        if self.pb is not None:
390
 
            self.pb.update('Cleaning up', 0, 1)
 
454
        result.report_cleaning_up()
391
455
        # This is still a little bogus, 
392
456
        # but only a little. Folk not using our testrunner will
393
457
        # have to delete their temp directories themselves.
408
472
                        sys.getfilesystemencoding())
409
473
                osutils.rmtree(test_root)
410
474
        else:
411
 
            if self.pb is not None:
412
 
                self.pb.note("Failed tests working directories are in '%s'\n",
413
 
                             test_root)
414
 
            else:
415
 
                self.stream.writeln(
416
 
                    "Failed tests working directories are in '%s'\n" %
417
 
                    test_root)
 
475
            note("Failed tests working directories are in '%s'\n", test_root)
418
476
        TestCaseWithMemoryTransport.TEST_ROOT = None
419
 
        if self.pb is not None:
420
 
            self.pb.clear()
 
477
        result.finished()
421
478
        return result
422
479
 
423
480
 
503
560
        unittest.TestCase.setUp(self)
504
561
        self._cleanEnvironment()
505
562
        bzrlib.trace.disable_default_logging()
 
563
        self._silenceUI()
506
564
        self._startLogFile()
507
565
        self._benchcalls = []
508
566
        self._benchtime = None
509
567
 
 
568
    def _silenceUI(self):
 
569
        """Turn off UI for duration of test"""
 
570
        # by default the UI is off; tests can turn it on if they want it.
 
571
        saved = bzrlib.ui.ui_factory
 
572
        def _restore():
 
573
            bzrlib.ui.ui_factory = saved
 
574
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
 
575
        self.addCleanup(_restore)
 
576
 
510
577
    def _ndiff_strings(self, a, b):
511
578
        """Return ndiff between two strings containing lines.
512
579
        
571
638
            raise AssertionError("value(s) %r not present in container %r" % 
572
639
                                 (missing, superlist))
573
640
 
 
641
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
642
        """Fail unless excClass is raised when the iterator from func is used.
 
643
        
 
644
        Many functions can return generators; this makes sure
 
645
        to wrap them in a list() call to make sure the whole generator
 
646
        is run, and that the proper exception is raised.
 
647
        """
 
648
        try:
 
649
            list(func(*args, **kwargs))
 
650
        except excClass:
 
651
            return
 
652
        else:
 
653
            if getattr(excClass,'__name__', None) is not None:
 
654
                excName = excClass.__name__
 
655
            else:
 
656
                excName = str(excClass)
 
657
            raise self.failureException, "%s not raised" % excName
 
658
 
574
659
    def assertIs(self, left, right):
575
660
        if not (left is right):
576
661
            raise AssertionError("%r is not %r." % (left, right))
603
688
            a_callable(*args, **kwargs).
604
689
        """
605
690
        local_warnings = []
606
 
        def capture_warnings(msg, cls, stacklevel=None):
 
691
        def capture_warnings(msg, cls=None, stacklevel=None):
607
692
            # we've hooked into a deprecation-specific callpath,
608
693
            # only deprecations should be getting sent via it.
609
694
            self.assertEqual(cls, DeprecationWarning)
837
922
            os.chdir(working_dir)
838
923
 
839
924
        try:
840
 
            result = self.apply_redirected(stdin, stdout, stderr,
841
 
                                           bzrlib.commands.run_bzr_catch_errors,
842
 
                                           argv)
 
925
            saved_debug_flags = frozenset(debug.debug_flags)
 
926
            debug.debug_flags.clear()
 
927
            try:
 
928
                result = self.apply_redirected(stdin, stdout, stderr,
 
929
                                               bzrlib.commands.run_bzr_catch_errors,
 
930
                                               argv)
 
931
            finally:
 
932
                debug.debug_flags.update(saved_debug_flags)
843
933
        finally:
844
934
            logger.removeHandler(handler)
845
935
            bzrlib.ui.ui_factory = old_ui_factory
1177
1267
        self.assertTrue(t.is_readonly())
1178
1268
        return t
1179
1269
 
 
1270
    def create_transport_readonly_server(self):
 
1271
        """Create a transport server from class defined at init.
 
1272
 
 
1273
        This is mostly a hook for subclasses.
 
1274
        """
 
1275
        return self.transport_readonly_server()
 
1276
 
1180
1277
    def get_readonly_server(self):
1181
1278
        """Get the server instance for the readonly transport
1182
1279
 
1190
1287
                self.__readonly_server = ReadonlyServer()
1191
1288
                self.__readonly_server.setUp(self.__server)
1192
1289
            else:
1193
 
                self.__readonly_server = self.transport_readonly_server()
 
1290
                self.__readonly_server = self.create_transport_readonly_server()
1194
1291
                self.__readonly_server.setUp()
1195
1292
            self.addCleanup(self.__readonly_server.tearDown)
1196
1293
        return self.__readonly_server
1449
1546
    readwrite one must both define get_url() as resolving to os.getcwd().
1450
1547
    """
1451
1548
 
 
1549
    def create_transport_server(self):
 
1550
        """Create a transport server from class defined at init.
 
1551
 
 
1552
        This is mostly a hook for subclasses.
 
1553
        """
 
1554
        return self.transport_server()
 
1555
 
1452
1556
    def get_server(self):
1453
1557
        """See TestCaseWithMemoryTransport.
1454
1558
 
1456
1560
        diagnostics.
1457
1561
        """
1458
1562
        if self.__server is None:
1459
 
            self.__server = self.transport_server()
 
1563
            self.__server = self.create_transport_server()
1460
1564
            self.__server.setUp()
1461
1565
            self.addCleanup(self.__server.tearDown)
1462
1566
        return self.__server
1527
1631
 
1528
1632
    def setUp(self):
1529
1633
        super(ChrootedTestCase, self).setUp()
1530
 
        if not self.transport_server == bzrlib.transport.memory.MemoryServer:
1531
 
            self.transport_readonly_server = bzrlib.transport.http.HttpServer
 
1634
        if not self.transport_server == MemoryServer:
 
1635
            self.transport_readonly_server = HttpServer
1532
1636
 
1533
1637
 
1534
1638
def filter_suite_by_re(suite, pattern):
1546
1650
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1547
1651
    if verbose:
1548
1652
        verbosity = 2
1549
 
        pb = None
1550
1653
    else:
1551
1654
        verbosity = 1
1552
 
        pb = progress.ProgressBar()
1553
1655
    runner = TextTestRunner(stream=sys.stdout,
1554
1656
                            descriptions=0,
1555
1657
                            verbosity=verbosity,
1556
1658
                            keep_output=keep_output,
1557
 
                            pb=pb,
1558
1659
                            bench_history=bench_history)
1559
1660
    runner.stop_on_failure=stop_on_failure
1560
1661
    if pattern != '.*':
1622
1723
                   'bzrlib.tests.test_escaped_store',
1623
1724
                   'bzrlib.tests.test_fetch',
1624
1725
                   'bzrlib.tests.test_ftp_transport',
 
1726
                   'bzrlib.tests.test_generate_ids',
1625
1727
                   'bzrlib.tests.test_gpg',
1626
1728
                   'bzrlib.tests.test_graph',
1627
1729
                   'bzrlib.tests.test_hashcache',
1688
1790
                   'bzrlib.tests.test_weave',
1689
1791
                   'bzrlib.tests.test_whitebox',
1690
1792
                   'bzrlib.tests.test_workingtree',
 
1793
                   'bzrlib.tests.test_wsgi',
1691
1794
                   'bzrlib.tests.test_xml',
1692
1795
                   ]
1693
1796
    test_transport_implementations = [