/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Martin Pool
  • Date: 2010-10-08 04:38:25 UTC
  • mfrom: (5462 +trunk)
  • mto: This revision was merged to the branch mainline in revision 5478.
  • Revision ID: mbp@sourcefrog.net-20101008043825-b181r8bo5r3qwb6j
merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
 
17
"""Testing framework extensions"""
17
18
 
18
19
# TODO: Perhaps there should be an API to find out if bzr running under the
19
20
# test suite -- some plugins might want to avoid making intrusive changes if
28
29
 
29
30
import atexit
30
31
import codecs
31
 
from copy import copy
 
32
import copy
32
33
from cStringIO import StringIO
33
34
import difflib
34
35
import doctest
35
36
import errno
 
37
import itertools
36
38
import logging
37
 
import math
38
39
import os
39
 
from pprint import pformat
 
40
import platform
 
41
import pprint
40
42
import random
41
43
import re
42
44
import shlex
43
45
import stat
44
 
from subprocess import Popen, PIPE, STDOUT
 
46
import subprocess
45
47
import sys
46
48
import tempfile
47
49
import threading
48
50
import time
 
51
import traceback
49
52
import unittest
50
53
import warnings
51
54
 
 
55
import testtools
 
56
# nb: check this before importing anything else from within it
 
57
_testtools_version = getattr(testtools, '__version__', ())
 
58
if _testtools_version < (0, 9, 2):
 
59
    raise ImportError("need at least testtools 0.9.2: %s is %r"
 
60
        % (testtools.__file__, _testtools_version))
 
61
from testtools import content
52
62
 
53
63
from bzrlib import (
54
64
    branchbuilder,
61
71
    lock as _mod_lock,
62
72
    memorytree,
63
73
    osutils,
64
 
    progress,
65
74
    ui,
66
75
    urlutils,
67
76
    registry,
 
77
    transport as _mod_transport,
68
78
    workingtree,
69
79
    )
70
80
import bzrlib.branch
88
98
from bzrlib.symbol_versioning import (
89
99
    DEPRECATED_PARAMETER,
90
100
    deprecated_function,
 
101
    deprecated_in,
91
102
    deprecated_method,
92
103
    deprecated_passed,
93
104
    )
94
105
import bzrlib.trace
95
 
from bzrlib.transport import get_transport, pathfilter
96
 
import bzrlib.transport
97
 
from bzrlib.transport.local import LocalURLServer
98
 
from bzrlib.transport.memory import MemoryServer
99
 
from bzrlib.transport.readonly import ReadonlyServer
 
106
from bzrlib.transport import (
 
107
    memory,
 
108
    pathfilter,
 
109
    )
100
110
from bzrlib.trace import mutter, note
101
 
from bzrlib.tests import TestUtil
102
 
from bzrlib.tests.http_server import HttpServer
103
 
from bzrlib.tests.TestUtil import (
104
 
                          TestSuite,
105
 
                          TestLoader,
106
 
                          )
107
 
from bzrlib.tests.treeshape import build_tree_contents
 
111
from bzrlib.tests import (
 
112
    test_server,
 
113
    TestUtil,
 
114
    treeshape,
 
115
    )
108
116
from bzrlib.ui import NullProgressView
109
117
from bzrlib.ui.text import TextUIFactory
110
118
import bzrlib.version_info_formats.format_custom
115
123
# shown frame is the test code, not our assertXYZ.
116
124
__unittest = 1
117
125
 
118
 
default_transport = LocalURLServer
 
126
default_transport = test_server.LocalURLServer
 
127
 
 
128
 
 
129
_unitialized_attr = object()
 
130
"""A sentinel needed to act as a default value in a method signature."""
 
131
 
119
132
 
120
133
# Subunit result codes, defined here to prevent a hard dependency on subunit.
121
134
SUBUNIT_SEEK_SET = 0
122
135
SUBUNIT_SEEK_CUR = 1
123
136
 
 
137
# These are intentionally brought into this namespace. That way plugins, etc
 
138
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
 
139
TestSuite = TestUtil.TestSuite
 
140
TestLoader = TestUtil.TestLoader
124
141
 
125
 
class ExtendedTestResult(unittest._TextTestResult):
 
142
class ExtendedTestResult(testtools.TextTestResult):
126
143
    """Accepts, reports and accumulates the results of running tests.
127
144
 
128
145
    Compared to the unittest version this class adds support for
149
166
        :param bench_history: Optionally, a writable file object to accumulate
150
167
            benchmark results.
151
168
        """
152
 
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
169
        testtools.TextTestResult.__init__(self, stream)
153
170
        if bench_history is not None:
154
171
            from bzrlib.version import _get_bzr_source_tree
155
172
            src_tree = _get_bzr_source_tree()
176
193
        self.count = 0
177
194
        self._overall_start_time = time.time()
178
195
        self._strict = strict
 
196
        self._first_thread_leaker_id = None
 
197
        self._tests_leaking_threads_count = 0
179
198
 
180
199
    def stopTestRun(self):
181
200
        run = self.testsRun
182
201
        actionTaken = "Ran"
183
202
        stopTime = time.time()
184
203
        timeTaken = stopTime - self.startTime
185
 
        self.printErrors()
186
 
        self.stream.writeln(self.separator2)
187
 
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
204
        # GZ 2010-07-19: Seems testtools has no printErrors method, and though
 
205
        #                the parent class method is similar have to duplicate
 
206
        self._show_list('ERROR', self.errors)
 
207
        self._show_list('FAIL', self.failures)
 
208
        self.stream.write(self.sep2)
 
209
        self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
188
210
                            run, run != 1 and "s" or "", timeTaken))
189
 
        self.stream.writeln()
190
211
        if not self.wasSuccessful():
191
212
            self.stream.write("FAILED (")
192
213
            failed, errored = map(len, (self.failures, self.errors))
199
220
                if failed or errored: self.stream.write(", ")
200
221
                self.stream.write("known_failure_count=%d" %
201
222
                    self.known_failure_count)
202
 
            self.stream.writeln(")")
 
223
            self.stream.write(")\n")
203
224
        else:
204
225
            if self.known_failure_count:
205
 
                self.stream.writeln("OK (known_failures=%d)" %
 
226
                self.stream.write("OK (known_failures=%d)\n" %
206
227
                    self.known_failure_count)
207
228
            else:
208
 
                self.stream.writeln("OK")
 
229
                self.stream.write("OK\n")
209
230
        if self.skip_count > 0:
210
231
            skipped = self.skip_count
211
 
            self.stream.writeln('%d test%s skipped' %
 
232
            self.stream.write('%d test%s skipped\n' %
212
233
                                (skipped, skipped != 1 and "s" or ""))
213
234
        if self.unsupported:
214
235
            for feature, count in sorted(self.unsupported.items()):
215
 
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
236
                self.stream.write("Missing feature '%s' skipped %d tests.\n" %
216
237
                    (feature, count))
217
238
        if self._strict:
218
239
            ok = self.wasStrictlySuccessful()
219
240
        else:
220
241
            ok = self.wasSuccessful()
221
 
        if TestCase._first_thread_leaker_id:
 
242
        if self._first_thread_leaker_id:
222
243
            self.stream.write(
223
244
                '%s is leaking threads among %d leaking tests.\n' % (
224
 
                TestCase._first_thread_leaker_id,
225
 
                TestCase._leaking_threads_tests))
 
245
                self._first_thread_leaker_id,
 
246
                self._tests_leaking_threads_count))
226
247
            # We don't report the main thread as an active one.
227
248
            self.stream.write(
228
249
                '%d non-main threads were left active in the end.\n'
229
 
                % (TestCase._active_threads - 1))
230
 
 
231
 
    def _extractBenchmarkTime(self, testCase):
 
250
                % (len(self._active_threads) - 1))
 
251
 
 
252
    def getDescription(self, test):
 
253
        return test.id()
 
254
 
 
255
    def _extractBenchmarkTime(self, testCase, details=None):
232
256
        """Add a benchmark time for the current test case."""
 
257
        if details and 'benchtime' in details:
 
258
            return float(''.join(details['benchtime'].iter_bytes()))
233
259
        return getattr(testCase, "_benchtime", None)
234
260
 
235
261
    def _elapsedTestTimeString(self):
251
277
 
252
278
    def _shortened_test_description(self, test):
253
279
        what = test.id()
254
 
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
280
        what = re.sub(r'^bzrlib\.tests\.', '', what)
255
281
        return what
256
282
 
257
283
    def startTest(self, test):
258
 
        unittest.TestResult.startTest(self, test)
 
284
        super(ExtendedTestResult, self).startTest(test)
259
285
        if self.count == 0:
260
286
            self.startTests()
 
287
        self.count += 1
261
288
        self.report_test_start(test)
262
289
        test.number = self.count
263
290
        self._recordTestStartTime()
 
291
        # Only check for thread leaks if the test case supports cleanups
 
292
        addCleanup = getattr(test, "addCleanup", None)
 
293
        if addCleanup is not None:
 
294
            addCleanup(self._check_leaked_threads, test)
264
295
 
265
296
    def startTests(self):
266
 
        import platform
267
 
        if getattr(sys, 'frozen', None) is None:
268
 
            bzr_path = osutils.realpath(sys.argv[0])
269
 
        else:
270
 
            bzr_path = sys.executable
271
 
        self.stream.write(
272
 
            'testing: %s\n' % (bzr_path,))
273
 
        self.stream.write(
274
 
            '   %s\n' % (
275
 
                    bzrlib.__path__[0],))
276
 
        self.stream.write(
277
 
            '   bzr-%s python-%s %s\n' % (
278
 
                    bzrlib.version_string,
279
 
                    bzrlib._format_version_tuple(sys.version_info),
280
 
                    platform.platform(aliased=1),
281
 
                    ))
282
 
        self.stream.write('\n')
 
297
        self.report_tests_starting()
 
298
        self._active_threads = threading.enumerate()
 
299
 
 
300
    def _check_leaked_threads(self, test):
 
301
        """See if any threads have leaked since last call
 
302
 
 
303
        A sample of live threads is stored in the _active_threads attribute,
 
304
        when this method runs it compares the current live threads and any not
 
305
        in the previous sample are treated as having leaked.
 
306
        """
 
307
        now_active_threads = set(threading.enumerate())
 
308
        threads_leaked = now_active_threads.difference(self._active_threads)
 
309
        if threads_leaked:
 
310
            self._report_thread_leak(test, threads_leaked, now_active_threads)
 
311
            self._tests_leaking_threads_count += 1
 
312
            if self._first_thread_leaker_id is None:
 
313
                self._first_thread_leaker_id = test.id()
 
314
            self._active_threads = now_active_threads
283
315
 
284
316
    def _recordTestStartTime(self):
285
317
        """Record that a test has started."""
286
318
        self._start_time = time.time()
287
319
 
288
 
    def _cleanupLogFile(self, test):
289
 
        # We can only do this if we have one of our TestCases, not if
290
 
        # we have a doctest.
291
 
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
292
 
        if setKeepLogfile is not None:
293
 
            setKeepLogfile()
294
 
 
295
320
    def addError(self, test, err):
296
321
        """Tell result that test finished with an error.
297
322
 
299
324
        fails with an unexpected error.
300
325
        """
301
326
        self._post_mortem()
302
 
        unittest.TestResult.addError(self, test, err)
 
327
        super(ExtendedTestResult, self).addError(test, err)
303
328
        self.error_count += 1
304
329
        self.report_error(test, err)
305
330
        if self.stop_early:
306
331
            self.stop()
307
 
        self._cleanupLogFile(test)
308
332
 
309
333
    def addFailure(self, test, err):
310
334
        """Tell result that test failed.
313
337
        fails because e.g. an assert() method failed.
314
338
        """
315
339
        self._post_mortem()
316
 
        unittest.TestResult.addFailure(self, test, err)
 
340
        super(ExtendedTestResult, self).addFailure(test, err)
317
341
        self.failure_count += 1
318
342
        self.report_failure(test, err)
319
343
        if self.stop_early:
320
344
            self.stop()
321
 
        self._cleanupLogFile(test)
322
345
 
323
 
    def addSuccess(self, test):
 
346
    def addSuccess(self, test, details=None):
324
347
        """Tell result that test completed successfully.
325
348
 
326
349
        Called from the TestCase run()
327
350
        """
328
351
        if self._bench_history is not None:
329
 
            benchmark_time = self._extractBenchmarkTime(test)
 
352
            benchmark_time = self._extractBenchmarkTime(test, details)
330
353
            if benchmark_time is not None:
331
354
                self._bench_history.write("%s %s\n" % (
332
355
                    self._formatTime(benchmark_time),
333
356
                    test.id()))
334
357
        self.report_success(test)
335
 
        self._cleanupLogFile(test)
336
 
        unittest.TestResult.addSuccess(self, test)
 
358
        super(ExtendedTestResult, self).addSuccess(test)
337
359
        test._log_contents = ''
338
360
 
339
361
    def addExpectedFailure(self, test, err):
362
384
        self.not_applicable_count += 1
363
385
        self.report_not_applicable(test, reason)
364
386
 
365
 
    def printErrorList(self, flavour, errors):
366
 
        for test, err in errors:
367
 
            self.stream.writeln(self.separator1)
368
 
            self.stream.write("%s: " % flavour)
369
 
            self.stream.writeln(self.getDescription(test))
370
 
            if getattr(test, '_get_log', None) is not None:
371
 
                log_contents = test._get_log()
372
 
                if log_contents:
373
 
                    self.stream.write('\n')
374
 
                    self.stream.write(
375
 
                            ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
376
 
                    self.stream.write('\n')
377
 
                    self.stream.write(log_contents)
378
 
                    self.stream.write('\n')
379
 
                    self.stream.write(
380
 
                            ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
381
 
                    self.stream.write('\n')
382
 
            self.stream.writeln(self.separator2)
383
 
            self.stream.writeln("%s" % err)
384
 
 
385
387
    def _post_mortem(self):
386
388
        """Start a PDB post mortem session."""
387
389
        if os.environ.get('BZR_TEST_PDB', None):
396
398
        else:
397
399
            raise errors.BzrError("Unknown whence %r" % whence)
398
400
 
399
 
    def report_cleaning_up(self):
400
 
        pass
 
401
    def report_tests_starting(self):
 
402
        """Display information before the test run begins"""
 
403
        if getattr(sys, 'frozen', None) is None:
 
404
            bzr_path = osutils.realpath(sys.argv[0])
 
405
        else:
 
406
            bzr_path = sys.executable
 
407
        self.stream.write(
 
408
            'bzr selftest: %s\n' % (bzr_path,))
 
409
        self.stream.write(
 
410
            '   %s\n' % (
 
411
                    bzrlib.__path__[0],))
 
412
        self.stream.write(
 
413
            '   bzr-%s python-%s %s\n' % (
 
414
                    bzrlib.version_string,
 
415
                    bzrlib._format_version_tuple(sys.version_info),
 
416
                    platform.platform(aliased=1),
 
417
                    ))
 
418
        self.stream.write('\n')
 
419
 
 
420
    def report_test_start(self, test):
 
421
        """Display information on the test just about to be run"""
 
422
 
 
423
    def _report_thread_leak(self, test, leaked_threads, active_threads):
 
424
        """Display information on a test that leaked one or more threads"""
 
425
        # GZ 2010-09-09: A leak summary reported separately from the general
 
426
        #                thread debugging would be nice. Tests under subunit
 
427
        #                need something not using stream, perhaps adding a
 
428
        #                testtools details object would be fitting.
 
429
        if 'threads' in selftest_debug_flags:
 
430
            self.stream.write('%s is leaking, active is now %d\n' %
 
431
                (test.id(), len(active_threads)))
401
432
 
402
433
    def startTestRun(self):
403
434
        self.startTime = time.time()
440
471
        self.pb.finished()
441
472
        super(TextTestResult, self).stopTestRun()
442
473
 
443
 
    def startTestRun(self):
444
 
        super(TextTestResult, self).startTestRun()
 
474
    def report_tests_starting(self):
 
475
        super(TextTestResult, self).report_tests_starting()
445
476
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
446
477
 
447
 
    def printErrors(self):
448
 
        # clear the pb to make room for the error listing
449
 
        self.pb.clear()
450
 
        super(TextTestResult, self).printErrors()
451
 
 
452
478
    def _progress_prefix_text(self):
453
479
        # the longer this text, the less space we have to show the test
454
480
        # name...
467
493
            a += '%dm%ds' % (runtime / 60, runtime % 60)
468
494
        else:
469
495
            a += '%ds' % runtime
470
 
        if self.error_count:
471
 
            a += ', %d err' % self.error_count
472
 
        if self.failure_count:
473
 
            a += ', %d fail' % self.failure_count
 
496
        total_fail_count = self.error_count + self.failure_count
 
497
        if total_fail_count:
 
498
            a += ', %d failed' % total_fail_count
474
499
        # if self.unsupported:
475
500
        #     a += ', %d missing' % len(self.unsupported)
476
501
        a += ']'
477
502
        return a
478
503
 
479
504
    def report_test_start(self, test):
480
 
        self.count += 1
481
505
        self.pb.update(
482
506
                self._progress_prefix_text()
483
507
                + ' '
487
511
        return self._shortened_test_description(test)
488
512
 
489
513
    def report_error(self, test, err):
490
 
        ui.ui_factory.note('ERROR: %s\n    %s\n' % (
 
514
        self.stream.write('ERROR: %s\n    %s\n' % (
491
515
            self._test_description(test),
492
516
            err[1],
493
517
            ))
494
518
 
495
519
    def report_failure(self, test, err):
496
 
        ui.ui_factory.note('FAIL: %s\n    %s\n' % (
 
520
        self.stream.write('FAIL: %s\n    %s\n' % (
497
521
            self._test_description(test),
498
522
            err[1],
499
523
            ))
500
524
 
501
525
    def report_known_failure(self, test, err):
502
 
        ui.ui_factory.note('XFAIL: %s\n%s\n' % (
503
 
            self._test_description(test), err[1]))
 
526
        pass
504
527
 
505
528
    def report_skip(self, test, reason):
506
529
        pass
511
534
    def report_unsupported(self, test, feature):
512
535
        """test cannot be run because feature is missing."""
513
536
 
514
 
    def report_cleaning_up(self):
515
 
        self.pb.update('Cleaning up')
516
 
 
517
537
 
518
538
class VerboseTestResult(ExtendedTestResult):
519
539
    """Produce long output, with one line per test run plus times"""
526
546
            result = a_string
527
547
        return result.ljust(final_width)
528
548
 
529
 
    def startTestRun(self):
530
 
        super(VerboseTestResult, self).startTestRun()
 
549
    def report_tests_starting(self):
531
550
        self.stream.write('running %d tests...\n' % self.num_tests)
 
551
        super(VerboseTestResult, self).report_tests_starting()
532
552
 
533
553
    def report_test_start(self, test):
534
 
        self.count += 1
535
554
        name = self._shortened_test_description(test)
536
555
        width = osutils.terminal_width()
537
556
        if width is not None:
549
568
        return '%s%s' % (indent, err[1])
550
569
 
551
570
    def report_error(self, test, err):
552
 
        self.stream.writeln('ERROR %s\n%s'
 
571
        self.stream.write('ERROR %s\n%s\n'
553
572
                % (self._testTimeString(test),
554
573
                   self._error_summary(err)))
555
574
 
556
575
    def report_failure(self, test, err):
557
 
        self.stream.writeln(' FAIL %s\n%s'
 
576
        self.stream.write(' FAIL %s\n%s\n'
558
577
                % (self._testTimeString(test),
559
578
                   self._error_summary(err)))
560
579
 
561
580
    def report_known_failure(self, test, err):
562
 
        self.stream.writeln('XFAIL %s\n%s'
 
581
        self.stream.write('XFAIL %s\n%s\n'
563
582
                % (self._testTimeString(test),
564
583
                   self._error_summary(err)))
565
584
 
566
585
    def report_success(self, test):
567
 
        self.stream.writeln('   OK %s' % self._testTimeString(test))
 
586
        self.stream.write('   OK %s\n' % self._testTimeString(test))
568
587
        for bench_called, stats in getattr(test, '_benchcalls', []):
569
 
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
588
            self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
570
589
            stats.pprint(file=self.stream)
571
590
        # flush the stream so that we get smooth output. This verbose mode is
572
591
        # used to show the output in PQM.
573
592
        self.stream.flush()
574
593
 
575
594
    def report_skip(self, test, reason):
576
 
        self.stream.writeln(' SKIP %s\n%s'
 
595
        self.stream.write(' SKIP %s\n%s\n'
577
596
                % (self._testTimeString(test), reason))
578
597
 
579
598
    def report_not_applicable(self, test, reason):
580
 
        self.stream.writeln('  N/A %s\n    %s'
 
599
        self.stream.write('  N/A %s\n    %s\n'
581
600
                % (self._testTimeString(test), reason))
582
601
 
583
602
    def report_unsupported(self, test, feature):
584
603
        """test cannot be run because feature is missing."""
585
 
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
604
        self.stream.write("NODEP %s\n    The feature '%s' is not available.\n"
586
605
                %(self._testTimeString(test), feature))
587
606
 
588
607
 
604
623
            applied left to right - the first element in the list is the 
605
624
            innermost decorator.
606
625
        """
607
 
        self.stream = unittest._WritelnDecorator(stream)
 
626
        # stream may know claim to know to write unicode strings, but in older
 
627
        # pythons this goes sufficiently wrong that it is a bad idea. (
 
628
        # specifically a built in file with encoding 'UTF-8' will still try
 
629
        # to encode using ascii.
 
630
        new_encoding = osutils.get_terminal_encoding()
 
631
        codec = codecs.lookup(new_encoding)
 
632
        if type(codec) is tuple:
 
633
            # Python 2.4
 
634
            encode = codec[0]
 
635
        else:
 
636
            encode = codec.encode
 
637
        stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
 
638
        stream.encoding = new_encoding
 
639
        self.stream = stream
608
640
        self.descriptions = descriptions
609
641
        self.verbosity = verbosity
610
642
        self._bench_history = bench_history
629
661
        for decorator in self._result_decorators:
630
662
            result = decorator(result)
631
663
            result.stop_early = self.stop_on_failure
632
 
        try:
633
 
            import testtools
634
 
        except ImportError:
635
 
            pass
636
 
        else:
637
 
            if isinstance(test, testtools.ConcurrentTestSuite):
638
 
                # We need to catch bzr specific behaviors
639
 
                result = BZRTransformingResult(result)
640
664
        result.startTestRun()
641
665
        try:
642
666
            test.run(result)
660
684
                        % (type(suite), suite))
661
685
 
662
686
 
663
 
class TestSkipped(Exception):
664
 
    """Indicates that a test was intentionally skipped, rather than failing."""
 
687
TestSkipped = testtools.testcase.TestSkipped
665
688
 
666
689
 
667
690
class TestNotApplicable(TestSkipped):
673
696
    """
674
697
 
675
698
 
676
 
class KnownFailure(AssertionError):
677
 
    """Indicates that a test failed in a precisely expected manner.
678
 
 
679
 
    Such failures dont block the whole test suite from passing because they are
680
 
    indicators of partially completed code or of future work. We have an
681
 
    explicit error for them so that we can ensure that they are always visible:
682
 
    KnownFailures are always shown in the output of bzr selftest.
683
 
    """
 
699
# traceback._some_str fails to format exceptions that have the default
 
700
# __str__ which does an implicit ascii conversion. However, repr() on those
 
701
# objects works, for all that its not quite what the doctor may have ordered.
 
702
def _clever_some_str(value):
 
703
    try:
 
704
        return str(value)
 
705
    except:
 
706
        try:
 
707
            return repr(value).replace('\\n', '\n')
 
708
        except:
 
709
            return '<unprintable %s object>' % type(value).__name__
 
710
 
 
711
traceback._some_str = _clever_some_str
 
712
 
 
713
 
 
714
# deprecated - use self.knownFailure(), or self.expectFailure.
 
715
KnownFailure = testtools.testcase._ExpectedFailure
684
716
 
685
717
 
686
718
class UnavailableFeature(Exception):
692
724
    """
693
725
 
694
726
 
695
 
class CommandFailed(Exception):
696
 
    pass
697
 
 
698
 
 
699
727
class StringIOWrapper(object):
700
728
    """A wrapper around cStringIO which just adds an encoding attribute.
701
729
 
738
766
    # XXX: Should probably unify more with CannedInputUIFactory or a
739
767
    # particular configuration of TextUIFactory, or otherwise have a clearer
740
768
    # idea of how they're supposed to be different.
741
 
    # See https://bugs.edge.launchpad.net/bzr/+bug/408213
 
769
    # See https://bugs.launchpad.net/bzr/+bug/408213
742
770
 
743
771
    def __init__(self, stdout=None, stderr=None, stdin=None):
744
772
        if stdin is not None:
762
790
        return NullProgressView()
763
791
 
764
792
 
765
 
class TestCase(unittest.TestCase):
 
793
class TestCase(testtools.TestCase):
766
794
    """Base class for bzr unit tests.
767
795
 
768
796
    Tests that need access to disk resources should subclass
778
806
    routine, and to build and check bzr trees.
779
807
 
780
808
    In addition to the usual method of overriding tearDown(), this class also
781
 
    allows subclasses to register functions into the _cleanups list, which is
 
809
    allows subclasses to register cleanup functions via addCleanup, which are
782
810
    run in order as the object is torn down.  It's less likely this will be
783
811
    accidentally overlooked.
784
812
    """
785
813
 
786
 
    _active_threads = None
787
 
    _leaking_threads_tests = 0
788
 
    _first_thread_leaker_id = None
789
 
    _log_file_name = None
790
 
    _log_contents = ''
791
 
    _keep_log_file = False
 
814
    _log_file = None
792
815
    # record lsprof data when performing benchmark calls.
793
816
    _gather_lsprof_in_benchmarks = False
794
 
    attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
795
 
                     '_log_contents', '_log_file_name', '_benchtime',
796
 
                     '_TestCase__testMethodName', '_TestCase__testMethodDoc',)
797
817
 
798
818
    def __init__(self, methodName='testMethod'):
799
819
        super(TestCase, self).__init__(methodName)
800
 
        self._cleanups = []
801
 
        self._bzr_test_setUp_run = False
802
 
        self._bzr_test_tearDown_run = False
803
820
        self._directory_isolation = True
 
821
        self.exception_handlers.insert(0,
 
822
            (UnavailableFeature, self._do_unsupported_or_skip))
 
823
        self.exception_handlers.insert(0,
 
824
            (TestNotApplicable, self._do_not_applicable))
804
825
 
805
826
    def setUp(self):
806
 
        unittest.TestCase.setUp(self)
807
 
        self._bzr_test_setUp_run = True
 
827
        super(TestCase, self).setUp()
 
828
        for feature in getattr(self, '_test_needs_features', []):
 
829
            self.requireFeature(feature)
 
830
        self._log_contents = None
 
831
        self.addDetail("log", content.Content(content.ContentType("text",
 
832
            "plain", {"charset": "utf8"}),
 
833
            lambda:[self._get_log(keep_log_file=True)]))
808
834
        self._cleanEnvironment()
809
835
        self._silenceUI()
810
836
        self._startLogFile()
814
840
        self._track_transports()
815
841
        self._track_locks()
816
842
        self._clear_debug_flags()
817
 
        TestCase._active_threads = threading.activeCount()
818
 
        self.addCleanup(self._check_leaked_threads)
819
843
 
820
844
    def debug(self):
821
845
        # debug a frame up.
822
846
        import pdb
823
847
        pdb.Pdb().set_trace(sys._getframe().f_back)
824
848
 
825
 
    def _check_leaked_threads(self):
826
 
        active = threading.activeCount()
827
 
        leaked_threads = active - TestCase._active_threads
828
 
        TestCase._active_threads = active
829
 
        # If some tests make the number of threads *decrease*, we'll consider
830
 
        # that they are just observing old threads dieing, not agressively kill
831
 
        # random threads. So we don't report these tests as leaking. The risk
832
 
        # is that we have false positives that way (the test see 2 threads
833
 
        # going away but leak one) but it seems less likely than the actual
834
 
        # false positives (the test see threads going away and does not leak).
835
 
        if leaked_threads > 0:
836
 
            TestCase._leaking_threads_tests += 1
837
 
            if TestCase._first_thread_leaker_id is None:
838
 
                TestCase._first_thread_leaker_id = self.id()
 
849
    def discardDetail(self, name):
 
850
        """Extend the addDetail, getDetails api so we can remove a detail.
 
851
 
 
852
        eg. bzr always adds the 'log' detail at startup, but we don't want to
 
853
        include it for skipped, xfail, etc tests.
 
854
 
 
855
        It is safe to call this for a detail that doesn't exist, in case this
 
856
        gets called multiple times.
 
857
        """
 
858
        # We cheat. details is stored in __details which means we shouldn't
 
859
        # touch it. but getDetails() returns the dict directly, so we can
 
860
        # mutate it.
 
861
        details = self.getDetails()
 
862
        if name in details:
 
863
            del details[name]
839
864
 
840
865
    def _clear_debug_flags(self):
841
866
        """Prevent externally set debug flags affecting tests.
843
868
        Tests that want to use debug flags can just set them in the
844
869
        debug_flags set during setup/teardown.
845
870
        """
846
 
        self._preserved_debug_flags = set(debug.debug_flags)
 
871
        # Start with a copy of the current debug flags we can safely modify.
 
872
        self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
847
873
        if 'allow_debug' not in selftest_debug_flags:
848
874
            debug.debug_flags.clear()
849
875
        if 'disable_lock_checks' not in selftest_debug_flags:
850
876
            debug.debug_flags.add('strict_locks')
851
 
        self.addCleanup(self._restore_debug_flags)
852
877
 
853
878
    def _clear_hooks(self):
854
879
        # prevent hooks affecting tests
875
900
    def _silenceUI(self):
876
901
        """Turn off UI for duration of test"""
877
902
        # by default the UI is off; tests can turn it on if they want it.
878
 
        saved = ui.ui_factory
879
 
        def _restore():
880
 
            ui.ui_factory = saved
881
 
        ui.ui_factory = ui.SilentUIFactory()
882
 
        self.addCleanup(_restore)
 
903
        self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
883
904
 
884
905
    def _check_locks(self):
885
906
        """Check that all lock take/release actions have been paired."""
914
935
            self._lock_check_thorough = False
915
936
        else:
916
937
            self._lock_check_thorough = True
917
 
            
 
938
 
918
939
        self.addCleanup(self._check_locks)
919
940
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
920
941
                                                self._lock_acquired, None)
934
955
 
935
956
    def permit_dir(self, name):
936
957
        """Permit a directory to be used by this test. See permit_url."""
937
 
        name_transport = get_transport(name)
 
958
        name_transport = _mod_transport.get_transport(name)
938
959
        self.permit_url(name)
939
960
        self.permit_url(name_transport.base)
940
961
 
963
984
            try:
964
985
                workingtree.WorkingTree.open(path)
965
986
            except (errors.NotBranchError, errors.NoWorkingTree):
966
 
                return
 
987
                raise TestSkipped('Needs a working tree of bzr sources')
967
988
        finally:
968
989
            self.enable_directory_isolation()
969
990
 
1013
1034
        server's urls to be used.
1014
1035
        """
1015
1036
        if backing_server is None:
1016
 
            transport_server.setUp()
 
1037
            transport_server.start_server()
1017
1038
        else:
1018
 
            transport_server.setUp(backing_server)
1019
 
        self.addCleanup(transport_server.tearDown)
 
1039
            transport_server.start_server(backing_server)
 
1040
        self.addCleanup(transport_server.stop_server)
1020
1041
        # Obtain a real transport because if the server supplies a password, it
1021
1042
        # will be hidden from the base on the client side.
1022
 
        t = get_transport(transport_server.get_url())
 
1043
        t = _mod_transport.get_transport(transport_server.get_url())
1023
1044
        # Some transport servers effectively chroot the backing transport;
1024
1045
        # others like SFTPServer don't - users of the transport can walk up the
1025
1046
        # transport to read the entire backing transport. This wouldn't matter
1036
1057
        if t.base.endswith('/work/'):
1037
1058
            # we have safety net/test root/work
1038
1059
            t = t.clone('../..')
1039
 
        elif isinstance(transport_server, server.SmartTCPServer_for_testing):
 
1060
        elif isinstance(transport_server,
 
1061
                        test_server.SmartTCPServer_for_testing):
1040
1062
            # The smart server adds a path similar to work, which is traversed
1041
1063
            # up from by the client. But the server is chrooted - the actual
1042
1064
            # backing transport is not escaped from, and VFS requests to the
1085
1107
            message += '\n'
1086
1108
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
1087
1109
            % (message,
1088
 
               pformat(a), pformat(b)))
 
1110
               pprint.pformat(a), pprint.pformat(b)))
1089
1111
 
1090
1112
    assertEquals = assertEqual
1091
1113
 
1197
1219
            raise AssertionError('pattern "%s" found in "%s"'
1198
1220
                    % (needle_re, haystack))
1199
1221
 
 
1222
    def assertContainsString(self, haystack, needle):
 
1223
        if haystack.find(needle) == -1:
 
1224
            self.fail("string %r not found in '''%s'''" % (needle, haystack))
 
1225
 
1200
1226
    def assertSubset(self, sublist, superlist):
1201
1227
        """Assert that every entry in sublist is present in superlist."""
1202
1228
        missing = set(sublist) - set(superlist)
1289
1315
                m += ": " + msg
1290
1316
            self.fail(m)
1291
1317
 
1292
 
    def expectFailure(self, reason, assertion, *args, **kwargs):
1293
 
        """Invoke a test, expecting it to fail for the given reason.
1294
 
 
1295
 
        This is for assertions that ought to succeed, but currently fail.
1296
 
        (The failure is *expected* but not *wanted*.)  Please be very precise
1297
 
        about the failure you're expecting.  If a new bug is introduced,
1298
 
        AssertionError should be raised, not KnownFailure.
1299
 
 
1300
 
        Frequently, expectFailure should be followed by an opposite assertion.
1301
 
        See example below.
1302
 
 
1303
 
        Intended to be used with a callable that raises AssertionError as the
1304
 
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
1305
 
 
1306
 
        Raises KnownFailure if the test fails.  Raises AssertionError if the
1307
 
        test succeeds.
1308
 
 
1309
 
        example usage::
1310
 
 
1311
 
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
1312
 
                             dynamic_val)
1313
 
          self.assertEqual(42, dynamic_val)
1314
 
 
1315
 
          This means that a dynamic_val of 54 will cause the test to raise
1316
 
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
1317
 
          only a dynamic_val of 42 will allow the test to pass.  Anything other
1318
 
          than 54 or 42 will cause an AssertionError.
1319
 
        """
1320
 
        try:
1321
 
            assertion(*args, **kwargs)
1322
 
        except AssertionError:
1323
 
            raise KnownFailure(reason)
1324
 
        else:
1325
 
            self.fail('Unexpected success.  Should have failed: %s' % reason)
1326
 
 
1327
1318
    def assertFileEqual(self, content, path):
1328
1319
        """Fail if path does not contain 'content'."""
1329
1320
        self.failUnlessExists(path)
1334
1325
            f.close()
1335
1326
        self.assertEqualDiff(content, s)
1336
1327
 
 
1328
    def assertDocstring(self, expected_docstring, obj):
 
1329
        """Fail if obj does not have expected_docstring"""
 
1330
        if __doc__ is None:
 
1331
            # With -OO the docstring should be None instead
 
1332
            self.assertIs(obj.__doc__, None)
 
1333
        else:
 
1334
            self.assertEqual(expected_docstring, obj.__doc__)
 
1335
 
1337
1336
    def failUnlessExists(self, path):
1338
1337
        """Fail unless path or paths, which may be abs or relative, exist."""
1339
1338
        if not isinstance(path, basestring):
1468
1467
 
1469
1468
        The file is removed as the test is torn down.
1470
1469
        """
1471
 
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1472
 
        self._log_file = os.fdopen(fileno, 'w+')
 
1470
        self._log_file = StringIO()
1473
1471
        self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1474
 
        self._log_file_name = name
1475
1472
        self.addCleanup(self._finishLogFile)
1476
1473
 
1477
1474
    def _finishLogFile(self):
1479
1476
 
1480
1477
        Close the file and delete it, unless setKeepLogfile was called.
1481
1478
        """
1482
 
        if self._log_file is None:
1483
 
            return
 
1479
        if bzrlib.trace._trace_file:
 
1480
            # flush the log file, to get all content
 
1481
            bzrlib.trace._trace_file.flush()
1484
1482
        bzrlib.trace.pop_log_file(self._log_memento)
1485
 
        self._log_file.close()
1486
 
        self._log_file = None
1487
 
        if not self._keep_log_file:
1488
 
            os.remove(self._log_file_name)
1489
 
            self._log_file_name = None
1490
 
 
1491
 
    def setKeepLogfile(self):
1492
 
        """Make the logfile not be deleted when _finishLogFile is called."""
1493
 
        self._keep_log_file = True
 
1483
        # Cache the log result and delete the file on disk
 
1484
        self._get_log(False)
1494
1485
 
1495
1486
    def thisFailsStrictLockCheck(self):
1496
1487
        """It is known that this test would fail with -Dstrict_locks.
1505
1496
        """
1506
1497
        debug.debug_flags.discard('strict_locks')
1507
1498
 
1508
 
    def addCleanup(self, callable, *args, **kwargs):
1509
 
        """Arrange to run a callable when this case is torn down.
1510
 
 
1511
 
        Callables are run in the reverse of the order they are registered,
1512
 
        ie last-in first-out.
 
1499
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
 
1500
        """Overrides an object attribute restoring it after the test.
 
1501
 
 
1502
        :param obj: The object that will be mutated.
 
1503
 
 
1504
        :param attr_name: The attribute name we want to preserve/override in
 
1505
            the object.
 
1506
 
 
1507
        :param new: The optional value we want to set the attribute to.
 
1508
 
 
1509
        :returns: The actual attr value.
1513
1510
        """
1514
 
        self._cleanups.append((callable, args, kwargs))
 
1511
        value = getattr(obj, attr_name)
 
1512
        # The actual value is captured by the call below
 
1513
        self.addCleanup(setattr, obj, attr_name, value)
 
1514
        if new is not _unitialized_attr:
 
1515
            setattr(obj, attr_name, new)
 
1516
        return value
1515
1517
 
1516
1518
    def _cleanEnvironment(self):
1517
1519
        new_env = {
1524
1526
            'EDITOR': None,
1525
1527
            'BZR_EMAIL': None,
1526
1528
            'BZREMAIL': None, # may still be present in the environment
1527
 
            'EMAIL': None,
 
1529
            'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
1528
1530
            'BZR_PROGRESS_BAR': None,
1529
1531
            'BZR_LOG': None,
1530
1532
            'BZR_PLUGIN_PATH': None,
 
1533
            'BZR_DISABLE_PLUGINS': None,
 
1534
            'BZR_PLUGINS_AT': None,
1531
1535
            'BZR_CONCURRENCY': None,
1532
1536
            # Make sure that any text ui tests are consistent regardless of
1533
1537
            # the environment the test case is run in; you may want tests that
1554
1558
            'ftp_proxy': None,
1555
1559
            'FTP_PROXY': None,
1556
1560
            'BZR_REMOTE_PATH': None,
 
1561
            # Generally speaking, we don't want apport reporting on crashes in
 
1562
            # the test envirnoment unless we're specifically testing apport,
 
1563
            # so that it doesn't leak into the real system environment.  We
 
1564
            # use an env var so it propagates to subprocesses.
 
1565
            'APPORT_DISABLE': '1',
1557
1566
        }
1558
 
        self.__old_env = {}
 
1567
        self._old_env = {}
1559
1568
        self.addCleanup(self._restoreEnvironment)
1560
1569
        for name, value in new_env.iteritems():
1561
1570
            self._captureVar(name, value)
1562
1571
 
1563
1572
    def _captureVar(self, name, newvalue):
1564
1573
        """Set an environment variable, and reset it when finished."""
1565
 
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1566
 
 
1567
 
    def _restore_debug_flags(self):
1568
 
        debug.debug_flags.clear()
1569
 
        debug.debug_flags.update(self._preserved_debug_flags)
 
1574
        self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1570
1575
 
1571
1576
    def _restoreEnvironment(self):
1572
 
        for name, value in self.__old_env.iteritems():
 
1577
        for name, value in self._old_env.iteritems():
1573
1578
            osutils.set_or_unset_env(name, value)
1574
1579
 
1575
1580
    def _restoreHooks(self):
1580
1585
        """This test has failed for some known reason."""
1581
1586
        raise KnownFailure(reason)
1582
1587
 
 
1588
    def _suppress_log(self):
 
1589
        """Remove the log info from details."""
 
1590
        self.discardDetail('log')
 
1591
 
1583
1592
    def _do_skip(self, result, reason):
 
1593
        self._suppress_log()
1584
1594
        addSkip = getattr(result, 'addSkip', None)
1585
1595
        if not callable(addSkip):
1586
1596
            result.addSuccess(result)
1587
1597
        else:
1588
1598
            addSkip(self, reason)
1589
1599
 
1590
 
    def _do_known_failure(self, result):
 
1600
    @staticmethod
 
1601
    def _do_known_failure(self, result, e):
 
1602
        self._suppress_log()
1591
1603
        err = sys.exc_info()
1592
1604
        addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1593
1605
        if addExpectedFailure is not None:
1595
1607
        else:
1596
1608
            result.addSuccess(self)
1597
1609
 
 
1610
    @staticmethod
1598
1611
    def _do_not_applicable(self, result, e):
1599
1612
        if not e.args:
1600
1613
            reason = 'No reason given'
1601
1614
        else:
1602
1615
            reason = e.args[0]
 
1616
        self._suppress_log ()
1603
1617
        addNotApplicable = getattr(result, 'addNotApplicable', None)
1604
1618
        if addNotApplicable is not None:
1605
1619
            result.addNotApplicable(self, reason)
1606
1620
        else:
1607
1621
            self._do_skip(result, reason)
1608
1622
 
1609
 
    def _do_unsupported_or_skip(self, result, reason):
 
1623
    @staticmethod
 
1624
    def _report_skip(self, result, err):
 
1625
        """Override the default _report_skip.
 
1626
 
 
1627
        We want to strip the 'log' detail. If we waint until _do_skip, it has
 
1628
        already been formatted into the 'reason' string, and we can't pull it
 
1629
        out again.
 
1630
        """
 
1631
        self._suppress_log()
 
1632
        super(TestCase, self)._report_skip(self, result, err)
 
1633
 
 
1634
    @staticmethod
 
1635
    def _report_expected_failure(self, result, err):
 
1636
        """Strip the log.
 
1637
 
 
1638
        See _report_skip for motivation.
 
1639
        """
 
1640
        self._suppress_log()
 
1641
        super(TestCase, self)._report_expected_failure(self, result, err)
 
1642
 
 
1643
    @staticmethod
 
1644
    def _do_unsupported_or_skip(self, result, e):
 
1645
        reason = e.args[0]
 
1646
        self._suppress_log()
1610
1647
        addNotSupported = getattr(result, 'addNotSupported', None)
1611
1648
        if addNotSupported is not None:
1612
1649
            result.addNotSupported(self, reason)
1613
1650
        else:
1614
1651
            self._do_skip(result, reason)
1615
1652
 
1616
 
    def run(self, result=None):
1617
 
        if result is None: result = self.defaultTestResult()
1618
 
        result.startTest(self)
1619
 
        try:
1620
 
            self._run(result)
1621
 
            return result
1622
 
        finally:
1623
 
            result.stopTest(self)
1624
 
 
1625
 
    def _run(self, result):
1626
 
        for feature in getattr(self, '_test_needs_features', []):
1627
 
            if not feature.available():
1628
 
                return self._do_unsupported_or_skip(result, feature)
1629
 
        try:
1630
 
            absent_attr = object()
1631
 
            # Python 2.5
1632
 
            method_name = getattr(self, '_testMethodName', absent_attr)
1633
 
            if method_name is absent_attr:
1634
 
                # Python 2.4
1635
 
                method_name = getattr(self, '_TestCase__testMethodName')
1636
 
            testMethod = getattr(self, method_name)
1637
 
            try:
1638
 
                try:
1639
 
                    self.setUp()
1640
 
                    if not self._bzr_test_setUp_run:
1641
 
                        self.fail(
1642
 
                            "test setUp did not invoke "
1643
 
                            "bzrlib.tests.TestCase's setUp")
1644
 
                except KeyboardInterrupt:
1645
 
                    self._runCleanups()
1646
 
                    raise
1647
 
                except KnownFailure:
1648
 
                    self._do_known_failure(result)
1649
 
                    self.tearDown()
1650
 
                    return
1651
 
                except TestNotApplicable, e:
1652
 
                    self._do_not_applicable(result, e)
1653
 
                    self.tearDown()
1654
 
                    return
1655
 
                except TestSkipped, e:
1656
 
                    self._do_skip(result, e.args[0])
1657
 
                    self.tearDown()
1658
 
                    return result
1659
 
                except UnavailableFeature, e:
1660
 
                    self._do_unsupported_or_skip(result, e.args[0])
1661
 
                    self.tearDown()
1662
 
                    return
1663
 
                except:
1664
 
                    result.addError(self, sys.exc_info())
1665
 
                    self._runCleanups()
1666
 
                    return result
1667
 
 
1668
 
                ok = False
1669
 
                try:
1670
 
                    testMethod()
1671
 
                    ok = True
1672
 
                except KnownFailure:
1673
 
                    self._do_known_failure(result)
1674
 
                except self.failureException:
1675
 
                    result.addFailure(self, sys.exc_info())
1676
 
                except TestNotApplicable, e:
1677
 
                    self._do_not_applicable(result, e)
1678
 
                except TestSkipped, e:
1679
 
                    if not e.args:
1680
 
                        reason = "No reason given."
1681
 
                    else:
1682
 
                        reason = e.args[0]
1683
 
                    self._do_skip(result, reason)
1684
 
                except UnavailableFeature, e:
1685
 
                    self._do_unsupported_or_skip(result, e.args[0])
1686
 
                except KeyboardInterrupt:
1687
 
                    self._runCleanups()
1688
 
                    raise
1689
 
                except:
1690
 
                    result.addError(self, sys.exc_info())
1691
 
 
1692
 
                try:
1693
 
                    self.tearDown()
1694
 
                    if not self._bzr_test_tearDown_run:
1695
 
                        self.fail(
1696
 
                            "test tearDown did not invoke "
1697
 
                            "bzrlib.tests.TestCase's tearDown")
1698
 
                except KeyboardInterrupt:
1699
 
                    self._runCleanups()
1700
 
                    raise
1701
 
                except:
1702
 
                    result.addError(self, sys.exc_info())
1703
 
                    self._runCleanups()
1704
 
                    ok = False
1705
 
                if ok: result.addSuccess(self)
1706
 
                return result
1707
 
            except KeyboardInterrupt:
1708
 
                self._runCleanups()
1709
 
                raise
1710
 
        finally:
1711
 
            saved_attrs = {}
1712
 
            for attr_name in self.attrs_to_keep:
1713
 
                if attr_name in self.__dict__:
1714
 
                    saved_attrs[attr_name] = self.__dict__[attr_name]
1715
 
            self.__dict__ = saved_attrs
1716
 
 
1717
 
    def tearDown(self):
1718
 
        self._runCleanups()
1719
 
        self._log_contents = ''
1720
 
        self._bzr_test_tearDown_run = True
1721
 
        unittest.TestCase.tearDown(self)
1722
 
 
1723
1653
    def time(self, callable, *args, **kwargs):
1724
1654
        """Run callable and accrue the time it takes to the benchmark time.
1725
1655
 
1728
1658
        self._benchcalls.
1729
1659
        """
1730
1660
        if self._benchtime is None:
 
1661
            self.addDetail('benchtime', content.Content(content.ContentType(
 
1662
                "text", "plain"), lambda:[str(self._benchtime)]))
1731
1663
            self._benchtime = 0
1732
1664
        start = time.time()
1733
1665
        try:
1742
1674
        finally:
1743
1675
            self._benchtime += time.time() - start
1744
1676
 
1745
 
    def _runCleanups(self):
1746
 
        """Run registered cleanup functions.
1747
 
 
1748
 
        This should only be called from TestCase.tearDown.
1749
 
        """
1750
 
        # TODO: Perhaps this should keep running cleanups even if
1751
 
        # one of them fails?
1752
 
 
1753
 
        # Actually pop the cleanups from the list so tearDown running
1754
 
        # twice is safe (this happens for skipped tests).
1755
 
        while self._cleanups:
1756
 
            cleanup, args, kwargs = self._cleanups.pop()
1757
 
            cleanup(*args, **kwargs)
1758
 
 
1759
1677
    def log(self, *args):
1760
1678
        mutter(*args)
1761
1679
 
1762
1680
    def _get_log(self, keep_log_file=False):
1763
 
        """Get the log from bzrlib.trace calls from this test.
 
1681
        """Internal helper to get the log from bzrlib.trace for this test.
 
1682
 
 
1683
        Please use self.getDetails, or self.get_log to access this in test case
 
1684
        code.
1764
1685
 
1765
1686
        :param keep_log_file: When True, if the log is still a file on disk
1766
1687
            leave it as a file on disk. When False, if the log is still a file
1768
1689
            self._log_contents.
1769
1690
        :return: A string containing the log.
1770
1691
        """
1771
 
        # flush the log file, to get all content
1772
 
        import bzrlib.trace
1773
 
        if bzrlib.trace._trace_file:
1774
 
            bzrlib.trace._trace_file.flush()
1775
 
        if self._log_contents:
1776
 
            # XXX: this can hardly contain the content flushed above --vila
1777
 
            # 20080128
 
1692
        if self._log_contents is not None:
 
1693
            try:
 
1694
                self._log_contents.decode('utf8')
 
1695
            except UnicodeDecodeError:
 
1696
                unicodestr = self._log_contents.decode('utf8', 'replace')
 
1697
                self._log_contents = unicodestr.encode('utf8')
1778
1698
            return self._log_contents
1779
 
        if self._log_file_name is not None:
1780
 
            logfile = open(self._log_file_name)
 
1699
        if self._log_file is not None:
 
1700
            log_contents = self._log_file.getvalue()
1781
1701
            try:
1782
 
                log_contents = logfile.read()
1783
 
            finally:
1784
 
                logfile.close()
 
1702
                log_contents.decode('utf8')
 
1703
            except UnicodeDecodeError:
 
1704
                unicodestr = log_contents.decode('utf8', 'replace')
 
1705
                log_contents = unicodestr.encode('utf8')
1785
1706
            if not keep_log_file:
 
1707
                self._log_file = None
 
1708
                # Permit multiple calls to get_log until we clean it up in
 
1709
                # finishLogFile
1786
1710
                self._log_contents = log_contents
1787
 
                try:
1788
 
                    os.remove(self._log_file_name)
1789
 
                except OSError, e:
1790
 
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
1791
 
                        sys.stderr.write(('Unable to delete log file '
1792
 
                                             ' %r\n' % self._log_file_name))
1793
 
                    else:
1794
 
                        raise
1795
1711
            return log_contents
1796
1712
        else:
1797
 
            return "DELETED log file to reduce memory footprint"
 
1713
            return "No log file content."
 
1714
 
 
1715
    def get_log(self):
 
1716
        """Get a unicode string containing the log from bzrlib.trace.
 
1717
 
 
1718
        Undecodable characters are replaced.
 
1719
        """
 
1720
        return u"".join(self.getDetails()['log'].iter_text())
1798
1721
 
1799
1722
    def requireFeature(self, feature):
1800
1723
        """This test requires a specific feature is available.
2008
1931
            variables. A value of None will unset the env variable.
2009
1932
            The values must be strings. The change will only occur in the
2010
1933
            child, so you don't need to fix the environment after running.
2011
 
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
2012
 
            is not available.
 
1934
        :param skip_if_plan_to_signal: raise TestSkipped when true and system
 
1935
            doesn't support signalling subprocesses.
2013
1936
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
2014
1937
 
2015
1938
        :returns: Popen object for the started process.
2016
1939
        """
2017
1940
        if skip_if_plan_to_signal:
2018
 
            if not getattr(os, 'kill', None):
2019
 
                raise TestSkipped("os.kill not available.")
 
1941
            if os.name != "posix":
 
1942
                raise TestSkipped("Sending signals not supported")
2020
1943
 
2021
1944
        if env_changes is None:
2022
1945
            env_changes = {}
2049
1972
            if not allow_plugins:
2050
1973
                command.append('--no-plugins')
2051
1974
            command.extend(process_args)
2052
 
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1975
            process = self._popen(command, stdin=subprocess.PIPE,
 
1976
                                  stdout=subprocess.PIPE,
 
1977
                                  stderr=subprocess.PIPE)
2053
1978
        finally:
2054
1979
            restore_environment()
2055
1980
            if cwd is not None:
2063
1988
        Allows tests to override this method to intercept the calls made to
2064
1989
        Popen for introspection.
2065
1990
        """
2066
 
        return Popen(*args, **kwargs)
 
1991
        return subprocess.Popen(*args, **kwargs)
2067
1992
 
2068
1993
    def get_source_path(self):
2069
1994
        """Return the path of the directory containing bzrlib."""
2071
1996
 
2072
1997
    def get_bzr_path(self):
2073
1998
        """Return the path of the 'bzr' executable for this test suite."""
2074
 
        bzr_path = self.get_source_path()+'/bzr'
 
1999
        bzr_path = os.path.join(self.get_source_path(), "bzr")
2075
2000
        if not os.path.isfile(bzr_path):
2076
2001
            # We are probably installed. Assume sys.argv is the right file
2077
2002
            bzr_path = sys.argv[0]
2163
2088
 
2164
2089
        Tests that expect to provoke LockContention errors should call this.
2165
2090
        """
2166
 
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
2167
 
        def resetTimeout():
2168
 
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
2169
 
        self.addCleanup(resetTimeout)
2170
 
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
2091
        self.overrideAttr(bzrlib.lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
2171
2092
 
2172
2093
    def make_utf8_encoded_stringio(self, encoding_type=None):
2173
2094
        """Return a StringIOWrapper instance, that will encode Unicode
2187
2108
        request_handlers = request.request_handlers
2188
2109
        orig_method = request_handlers.get(verb)
2189
2110
        request_handlers.remove(verb)
2190
 
        def restoreVerb():
2191
 
            request_handlers.register(verb, orig_method)
2192
 
        self.addCleanup(restoreVerb)
 
2111
        self.addCleanup(request_handlers.register, verb, orig_method)
2193
2112
 
2194
2113
 
2195
2114
class CapturedCall(object):
2255
2174
 
2256
2175
        :param relpath: a path relative to the base url.
2257
2176
        """
2258
 
        t = get_transport(self.get_url(relpath))
 
2177
        t = _mod_transport.get_transport(self.get_url(relpath))
2259
2178
        self.assertFalse(t.is_readonly())
2260
2179
        return t
2261
2180
 
2267
2186
 
2268
2187
        :param relpath: a path relative to the base url.
2269
2188
        """
2270
 
        t = get_transport(self.get_readonly_url(relpath))
 
2189
        t = _mod_transport.get_transport(self.get_readonly_url(relpath))
2271
2190
        self.assertTrue(t.is_readonly())
2272
2191
        return t
2273
2192
 
2286
2205
        if self.__readonly_server is None:
2287
2206
            if self.transport_readonly_server is None:
2288
2207
                # readonly decorator requested
2289
 
                self.__readonly_server = ReadonlyServer()
 
2208
                self.__readonly_server = test_server.ReadonlyServer()
2290
2209
            else:
2291
2210
                # explicit readonly transport.
2292
2211
                self.__readonly_server = self.create_transport_readonly_server()
2315
2234
        is no means to override it.
2316
2235
        """
2317
2236
        if self.__vfs_server is None:
2318
 
            self.__vfs_server = MemoryServer()
 
2237
            self.__vfs_server = memory.MemoryServer()
2319
2238
            self.start_server(self.__vfs_server)
2320
2239
        return self.__vfs_server
2321
2240
 
2403
2322
        propagating. This method ensures than a test did not leaked.
2404
2323
        """
2405
2324
        root = TestCaseWithMemoryTransport.TEST_ROOT
2406
 
        self.permit_url(get_transport(root).base)
 
2325
        self.permit_url(_mod_transport.get_transport(root).base)
2407
2326
        wt = workingtree.WorkingTree.open(root)
2408
2327
        last_rev = wt.last_revision()
2409
2328
        if last_rev != 'null:':
2454
2373
            # might be a relative or absolute path
2455
2374
            maybe_a_url = self.get_url(relpath)
2456
2375
            segments = maybe_a_url.rsplit('/', 1)
2457
 
            t = get_transport(maybe_a_url)
 
2376
            t = _mod_transport.get_transport(maybe_a_url)
2458
2377
            if len(segments) > 1 and segments[-1] not in ('', '.'):
2459
2378
                t.ensure_base()
2460
2379
            if format is None:
2477
2396
        made_control = self.make_bzrdir(relpath, format=format)
2478
2397
        return made_control.create_repository(shared=shared)
2479
2398
 
2480
 
    def make_smart_server(self, path):
2481
 
        smart_server = server.SmartTCPServer_for_testing()
2482
 
        self.start_server(smart_server, self.get_server())
2483
 
        remote_transport = get_transport(smart_server.get_url()).clone(path)
 
2399
    def make_smart_server(self, path, backing_server=None):
 
2400
        if backing_server is None:
 
2401
            backing_server = self.get_server()
 
2402
        smart_server = test_server.SmartTCPServer_for_testing()
 
2403
        self.start_server(smart_server, backing_server)
 
2404
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
 
2405
                                                   ).clone(path)
2484
2406
        return remote_transport
2485
2407
 
2486
2408
    def make_branch_and_memory_tree(self, relpath, format=None):
2501
2423
 
2502
2424
    def setUp(self):
2503
2425
        super(TestCaseWithMemoryTransport, self).setUp()
 
2426
        # Ensure that ConnectedTransport doesn't leak sockets
 
2427
        def get_transport_with_cleanup(*args, **kwargs):
 
2428
            t = orig_get_transport(*args, **kwargs)
 
2429
            if isinstance(t, _mod_transport.ConnectedTransport):
 
2430
                self.addCleanup(t.disconnect)
 
2431
            return t
 
2432
 
 
2433
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
 
2434
                                               get_transport_with_cleanup)
2504
2435
        self._make_test_root()
2505
 
        _currentdir = os.getcwdu()
2506
 
        def _leaveDirectory():
2507
 
            os.chdir(_currentdir)
2508
 
        self.addCleanup(_leaveDirectory)
 
2436
        self.addCleanup(os.chdir, os.getcwdu())
2509
2437
        self.makeAndChdirToTestDir()
2510
2438
        self.overrideEnvironmentForTesting()
2511
2439
        self.__readonly_server = None
2514
2442
 
2515
2443
    def setup_smart_server_with_call_log(self):
2516
2444
        """Sets up a smart server as the transport server with a call log."""
2517
 
        self.transport_server = server.SmartTCPServer_for_testing
 
2445
        self.transport_server = test_server.SmartTCPServer_for_testing
2518
2446
        self.hpss_calls = []
2519
2447
        import traceback
2520
2448
        # Skip the current stack down to the caller of
2554
2482
 
2555
2483
    def check_file_contents(self, filename, expect):
2556
2484
        self.log("check contents of file %s" % filename)
2557
 
        contents = file(filename, 'r').read()
 
2485
        f = file(filename)
 
2486
        try:
 
2487
            contents = f.read()
 
2488
        finally:
 
2489
            f.close()
2558
2490
        if contents != expect:
2559
2491
            self.log("expected: %r" % expect)
2560
2492
            self.log("actually: %r" % contents)
2634
2566
                "a list or a tuple. Got %r instead" % (shape,))
2635
2567
        # It's OK to just create them using forward slashes on windows.
2636
2568
        if transport is None or transport.is_readonly():
2637
 
            transport = get_transport(".")
 
2569
            transport = _mod_transport.get_transport(".")
2638
2570
        for name in shape:
2639
2571
            self.assertIsInstance(name, basestring)
2640
2572
            if name[-1] == '/':
2650
2582
                content = "contents of %s%s" % (name.encode('utf-8'), end)
2651
2583
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
2652
2584
 
2653
 
    def build_tree_contents(self, shape):
2654
 
        build_tree_contents(shape)
 
2585
    build_tree_contents = staticmethod(treeshape.build_tree_contents)
2655
2586
 
2656
2587
    def assertInWorkingTree(self, path, root_path='.', tree=None):
2657
2588
        """Assert whether path or paths are in the WorkingTree"""
2733
2664
            # We can only make working trees locally at the moment.  If the
2734
2665
            # transport can't support them, then we keep the non-disk-backed
2735
2666
            # branch and create a local checkout.
2736
 
            if self.vfs_transport_factory is LocalURLServer:
 
2667
            if self.vfs_transport_factory is test_server.LocalURLServer:
2737
2668
                # the branch is colocated on disk, we cannot create a checkout.
2738
2669
                # hopefully callers will expect this.
2739
2670
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2798
2729
    """
2799
2730
 
2800
2731
    def setUp(self):
 
2732
        from bzrlib.tests import http_server
2801
2733
        super(ChrootedTestCase, self).setUp()
2802
 
        if not self.vfs_transport_factory == MemoryServer:
2803
 
            self.transport_readonly_server = HttpServer
 
2734
        if not self.vfs_transport_factory == memory.MemoryServer:
 
2735
            self.transport_readonly_server = http_server.HttpServer
2804
2736
 
2805
2737
 
2806
2738
def condition_id_re(pattern):
2809
2741
    :param pattern: A regular expression string.
2810
2742
    :return: A callable that returns True if the re matches.
2811
2743
    """
2812
 
    filter_re = osutils.re_compile_checked(pattern, 0,
2813
 
        'test filter')
 
2744
    filter_re = re.compile(pattern, 0)
2814
2745
    def condition(test):
2815
2746
        test_id = test.id()
2816
2747
        return filter_re.search(test_id)
3068
2999
 
3069
3000
 
3070
3001
def fork_decorator(suite):
 
3002
    if getattr(os, "fork", None) is None:
 
3003
        raise errors.BzrCommandError("platform does not support fork,"
 
3004
            " try --parallel=subprocess instead.")
3071
3005
    concurrency = osutils.local_concurrency()
3072
3006
    if concurrency == 1:
3073
3007
        return suite
3128
3062
    return suite
3129
3063
 
3130
3064
 
3131
 
class TestDecorator(TestSuite):
 
3065
class TestDecorator(TestUtil.TestSuite):
3132
3066
    """A decorator for TestCase/TestSuite objects.
3133
3067
    
3134
3068
    Usually, subclasses should override __iter__(used when flattening test
3137
3071
    """
3138
3072
 
3139
3073
    def __init__(self, suite):
3140
 
        TestSuite.__init__(self)
 
3074
        TestUtil.TestSuite.__init__(self)
3141
3075
        self.addTest(suite)
3142
3076
 
3143
3077
    def countTestCases(self):
3219
3153
        if self.randomised:
3220
3154
            return iter(self._tests)
3221
3155
        self.randomised = True
3222
 
        self.stream.writeln("Randomizing test order using seed %s\n" %
 
3156
        self.stream.write("Randomizing test order using seed %s\n\n" %
3223
3157
            (self.actual_seed()))
3224
3158
        # Initialise the random number generator.
3225
3159
        random.seed(self.actual_seed())
3262
3196
 
3263
3197
def partition_tests(suite, count):
3264
3198
    """Partition suite into count lists of tests."""
3265
 
    result = []
3266
 
    tests = list(iter_suite_tests(suite))
3267
 
    tests_per_process = int(math.ceil(float(len(tests)) / count))
3268
 
    for block in range(count):
3269
 
        low_test = block * tests_per_process
3270
 
        high_test = low_test + tests_per_process
3271
 
        process_tests = tests[low_test:high_test]
3272
 
        result.append(process_tests)
3273
 
    return result
 
3199
    # This just assigns tests in a round-robin fashion.  On one hand this
 
3200
    # splits up blocks of related tests that might run faster if they shared
 
3201
    # resources, but on the other it avoids assigning blocks of slow tests to
 
3202
    # just one partition.  So the slowest partition shouldn't be much slower
 
3203
    # than the fastest.
 
3204
    partitions = [list() for i in range(count)]
 
3205
    tests = iter_suite_tests(suite)
 
3206
    for partition, test in itertools.izip(itertools.cycle(partitions), tests):
 
3207
        partition.append(test)
 
3208
    return partitions
 
3209
 
 
3210
 
 
3211
def workaround_zealous_crypto_random():
 
3212
    """Crypto.Random want to help us being secure, but we don't care here.
 
3213
 
 
3214
    This workaround some test failure related to the sftp server. Once paramiko
 
3215
    stop using the controversial API in Crypto.Random, we may get rid of it.
 
3216
    """
 
3217
    try:
 
3218
        from Crypto.Random import atfork
 
3219
        atfork()
 
3220
    except ImportError:
 
3221
        pass
3274
3222
 
3275
3223
 
3276
3224
def fork_for_tests(suite):
3282
3230
    concurrency = osutils.local_concurrency()
3283
3231
    result = []
3284
3232
    from subunit import TestProtocolClient, ProtocolTestCase
 
3233
    from subunit.test_results import AutoTimingTestResultDecorator
3285
3234
    class TestInOtherProcess(ProtocolTestCase):
3286
3235
        # Should be in subunit, I think. RBC.
3287
3236
        def __init__(self, stream, pid):
3292
3241
            try:
3293
3242
                ProtocolTestCase.run(self, result)
3294
3243
            finally:
3295
 
                os.waitpid(self.pid, os.WNOHANG)
 
3244
                os.waitpid(self.pid, 0)
3296
3245
 
3297
3246
    test_blocks = partition_tests(suite, concurrency)
3298
3247
    for process_tests in test_blocks:
3299
 
        process_suite = TestSuite()
 
3248
        process_suite = TestUtil.TestSuite()
3300
3249
        process_suite.addTests(process_tests)
3301
3250
        c2pread, c2pwrite = os.pipe()
3302
3251
        pid = os.fork()
3303
3252
        if pid == 0:
 
3253
            workaround_zealous_crypto_random()
3304
3254
            try:
3305
3255
                os.close(c2pread)
3306
3256
                # Leave stderr and stdout open so we can see test noise
3310
3260
                sys.stdin.close()
3311
3261
                sys.stdin = None
3312
3262
                stream = os.fdopen(c2pwrite, 'wb', 1)
3313
 
                subunit_result = BzrAutoTimingTestResultDecorator(
 
3263
                subunit_result = AutoTimingTestResultDecorator(
3314
3264
                    TestProtocolClient(stream))
3315
3265
                process_suite.run(subunit_result)
3316
3266
            finally:
3367
3317
                '--subunit']
3368
3318
            if '--no-plugins' in sys.argv:
3369
3319
                argv.append('--no-plugins')
3370
 
            # stderr=STDOUT would be ideal, but until we prevent noise on
3371
 
            # stderr it can interrupt the subunit protocol.
3372
 
            process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
3373
 
                bufsize=1)
 
3320
            # stderr=subprocess.STDOUT would be ideal, but until we prevent
 
3321
            # noise on stderr it can interrupt the subunit protocol.
 
3322
            process = subprocess.Popen(argv, stdin=subprocess.PIPE,
 
3323
                                      stdout=subprocess.PIPE,
 
3324
                                      stderr=subprocess.PIPE,
 
3325
                                      bufsize=1)
3374
3326
            test = TestInSubprocess(process, test_list_file_name)
3375
3327
            result.append(test)
3376
3328
        except:
3408
3360
 
3409
3361
    def addFailure(self, test, err):
3410
3362
        self.result.addFailure(test, err)
3411
 
 
3412
 
 
3413
 
class BZRTransformingResult(ForwardingResult):
3414
 
 
3415
 
    def addError(self, test, err):
3416
 
        feature = self._error_looks_like('UnavailableFeature: ', err)
3417
 
        if feature is not None:
3418
 
            self.result.addNotSupported(test, feature)
3419
 
        else:
3420
 
            self.result.addError(test, err)
3421
 
 
3422
 
    def addFailure(self, test, err):
3423
 
        known = self._error_looks_like('KnownFailure: ', err)
3424
 
        if known is not None:
3425
 
            self.result.addExpectedFailure(test,
3426
 
                [KnownFailure, KnownFailure(known), None])
3427
 
        else:
3428
 
            self.result.addFailure(test, err)
3429
 
 
3430
 
    def _error_looks_like(self, prefix, err):
3431
 
        """Deserialize exception and returns the stringify value."""
3432
 
        import subunit
3433
 
        value = None
3434
 
        typ, exc, _ = err
3435
 
        if isinstance(exc, subunit.RemoteException):
3436
 
            # stringify the exception gives access to the remote traceback
3437
 
            # We search the last line for 'prefix'
3438
 
            lines = str(exc).split('\n')
3439
 
            while lines and not lines[-1]:
3440
 
                lines.pop(-1)
3441
 
            if lines:
3442
 
                if lines[-1].startswith(prefix):
3443
 
                    value = lines[-1][len(prefix):]
3444
 
        return value
3445
 
 
3446
 
 
3447
 
try:
3448
 
    from subunit.test_results import AutoTimingTestResultDecorator
3449
 
    # Expected failure should be seen as a success not a failure Once subunit
3450
 
    # provide native support for that, BZRTransformingResult and this class
3451
 
    # will become useless.
3452
 
    class BzrAutoTimingTestResultDecorator(AutoTimingTestResultDecorator):
3453
 
 
3454
 
        def addExpectedFailure(self, test, err):
3455
 
            self._before_event()
3456
 
            return self._call_maybe("addExpectedFailure", self._degrade_skip,
3457
 
                                    test, err)
3458
 
except ImportError:
3459
 
    # Let's just define a no-op decorator
3460
 
    BzrAutoTimingTestResultDecorator = lambda x:x
 
3363
ForwardingResult = testtools.ExtendedToOriginalDecorator
3461
3364
 
3462
3365
 
3463
3366
class ProfileResult(ForwardingResult):
3474
3377
 
3475
3378
    def startTest(self, test):
3476
3379
        self.profiler = bzrlib.lsprof.BzrProfiler()
 
3380
        # Prevent deadlocks in tests that use lsprof: those tests will
 
3381
        # unavoidably fail.
 
3382
        bzrlib.lsprof.BzrProfiler.profiler_block = 0
3477
3383
        self.profiler.start()
3478
3384
        ForwardingResult.startTest(self, test)
3479
3385
 
3500
3406
#                           rather than failing tests. And no longer raise
3501
3407
#                           LockContention when fctnl locks are not being used
3502
3408
#                           with proper exclusion rules.
 
3409
#   -Ethreads               Will display thread ident at creation/join time to
 
3410
#                           help track thread leaks
3503
3411
selftest_debug_flags = set()
3504
3412
 
3505
3413
 
3724
3632
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3725
3633
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3726
3634
 
3727
 
# Obvious higest levels prefixes, feel free to add your own via a plugin
 
3635
# Obvious highest levels prefixes, feel free to add your own via a plugin
3728
3636
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3729
3637
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3730
3638
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3738
3646
        'bzrlib.doc',
3739
3647
        'bzrlib.tests.blackbox',
3740
3648
        'bzrlib.tests.commands',
 
3649
        'bzrlib.tests.doc_generate',
3741
3650
        'bzrlib.tests.per_branch',
3742
3651
        'bzrlib.tests.per_bzrdir',
 
3652
        'bzrlib.tests.per_controldir',
 
3653
        'bzrlib.tests.per_controldir_colo',
3743
3654
        'bzrlib.tests.per_foreign_vcs',
3744
3655
        'bzrlib.tests.per_interrepository',
3745
3656
        'bzrlib.tests.per_intertree',
3757
3668
        'bzrlib.tests.per_versionedfile',
3758
3669
        'bzrlib.tests.per_workingtree',
3759
3670
        'bzrlib.tests.test__annotator',
 
3671
        'bzrlib.tests.test__bencode',
 
3672
        'bzrlib.tests.test__btree_serializer',
3760
3673
        'bzrlib.tests.test__chk_map',
3761
3674
        'bzrlib.tests.test__dirstate_helpers',
3762
3675
        'bzrlib.tests.test__groupcompress',
3770
3683
        'bzrlib.tests.test_api',
3771
3684
        'bzrlib.tests.test_atomicfile',
3772
3685
        'bzrlib.tests.test_bad_files',
3773
 
        'bzrlib.tests.test_bencode',
3774
3686
        'bzrlib.tests.test_bisect_multi',
3775
3687
        'bzrlib.tests.test_branch',
3776
3688
        'bzrlib.tests.test_branchbuilder',
3785
3697
        'bzrlib.tests.test_chunk_writer',
3786
3698
        'bzrlib.tests.test_clean_tree',
3787
3699
        'bzrlib.tests.test_cleanup',
 
3700
        'bzrlib.tests.test_cmdline',
3788
3701
        'bzrlib.tests.test_commands',
3789
3702
        'bzrlib.tests.test_commit',
3790
3703
        'bzrlib.tests.test_commit_merge',
3805
3718
        'bzrlib.tests.test_export',
3806
3719
        'bzrlib.tests.test_extract',
3807
3720
        'bzrlib.tests.test_fetch',
 
3721
        'bzrlib.tests.test_fixtures',
3808
3722
        'bzrlib.tests.test_fifo_cache',
3809
3723
        'bzrlib.tests.test_filters',
3810
3724
        'bzrlib.tests.test_ftp_transport',
3824
3738
        'bzrlib.tests.test_identitymap',
3825
3739
        'bzrlib.tests.test_ignores',
3826
3740
        'bzrlib.tests.test_index',
 
3741
        'bzrlib.tests.test_import_tariff',
3827
3742
        'bzrlib.tests.test_info',
3828
3743
        'bzrlib.tests.test_inv',
3829
3744
        'bzrlib.tests.test_inventory_delta',
3830
3745
        'bzrlib.tests.test_knit',
3831
3746
        'bzrlib.tests.test_lazy_import',
3832
3747
        'bzrlib.tests.test_lazy_regex',
 
3748
        'bzrlib.tests.test_library_state',
3833
3749
        'bzrlib.tests.test_lock',
3834
3750
        'bzrlib.tests.test_lockable_files',
3835
3751
        'bzrlib.tests.test_lockdir',
3837
3753
        'bzrlib.tests.test_lru_cache',
3838
3754
        'bzrlib.tests.test_lsprof',
3839
3755
        'bzrlib.tests.test_mail_client',
 
3756
        'bzrlib.tests.test_matchers',
3840
3757
        'bzrlib.tests.test_memorytree',
3841
3758
        'bzrlib.tests.test_merge',
3842
3759
        'bzrlib.tests.test_merge3',
3891
3808
        'bzrlib.tests.test_switch',
3892
3809
        'bzrlib.tests.test_symbol_versioning',
3893
3810
        'bzrlib.tests.test_tag',
 
3811
        'bzrlib.tests.test_test_server',
3894
3812
        'bzrlib.tests.test_testament',
3895
3813
        'bzrlib.tests.test_textfile',
3896
3814
        'bzrlib.tests.test_textmerge',
3902
3820
        'bzrlib.tests.test_transport_log',
3903
3821
        'bzrlib.tests.test_tree',
3904
3822
        'bzrlib.tests.test_treebuilder',
 
3823
        'bzrlib.tests.test_treeshape',
3905
3824
        'bzrlib.tests.test_tsort',
3906
3825
        'bzrlib.tests.test_tuned_gzip',
3907
3826
        'bzrlib.tests.test_ui',
3911
3830
        'bzrlib.tests.test_urlutils',
3912
3831
        'bzrlib.tests.test_version',
3913
3832
        'bzrlib.tests.test_version_info',
 
3833
        'bzrlib.tests.test_versionedfile',
3914
3834
        'bzrlib.tests.test_weave',
3915
3835
        'bzrlib.tests.test_whitebox',
3916
3836
        'bzrlib.tests.test_win32utils',
3922
3842
 
3923
3843
 
3924
3844
def _test_suite_modules_to_doctest():
3925
 
    """Return the list of modules to doctest."""   
 
3845
    """Return the list of modules to doctest."""
 
3846
    if __doc__ is None:
 
3847
        # GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
 
3848
        return []
3926
3849
    return [
3927
3850
        'bzrlib',
3928
3851
        'bzrlib.branchbuilder',
 
3852
        'bzrlib.decorators',
3929
3853
        'bzrlib.export',
3930
3854
        'bzrlib.inventory',
3931
3855
        'bzrlib.iterablefile',
3934
3858
        'bzrlib.option',
3935
3859
        'bzrlib.symbol_versioning',
3936
3860
        'bzrlib.tests',
 
3861
        'bzrlib.tests.fixtures',
3937
3862
        'bzrlib.timestamp',
3938
3863
        'bzrlib.transport.http',
3939
3864
        'bzrlib.version_info_formats.format_custom',
4081
4006
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
4082
4007
    ...     [('one', dict(param=1)),
4083
4008
    ...      ('two', dict(param=2))],
4084
 
    ...     TestSuite())
 
4009
    ...     TestUtil.TestSuite())
4085
4010
    >>> tests = list(iter_suite_tests(r))
4086
4011
    >>> len(tests)
4087
4012
    2
4134
4059
    :param new_id: The id to assign to it.
4135
4060
    :return: The new test.
4136
4061
    """
4137
 
    new_test = copy(test)
 
4062
    new_test = copy.copy(test)
4138
4063
    new_test.id = lambda: new_id
 
4064
    # XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
 
4065
    # causes cloned tests to share the 'details' dict.  This makes it hard to
 
4066
    # read the test output for parameterized tests, because tracebacks will be
 
4067
    # associated with irrelevant tests.
 
4068
    try:
 
4069
        details = new_test._TestCase__details
 
4070
    except AttributeError:
 
4071
        # must be a different version of testtools than expected.  Do nothing.
 
4072
        pass
 
4073
    else:
 
4074
        # Reset the '__details' dict.
 
4075
        new_test._TestCase__details = {}
4139
4076
    return new_test
4140
4077
 
4141
4078
 
 
4079
def permute_tests_for_extension(standard_tests, loader, py_module_name,
 
4080
                                ext_module_name):
 
4081
    """Helper for permutating tests against an extension module.
 
4082
 
 
4083
    This is meant to be used inside a modules 'load_tests()' function. It will
 
4084
    create 2 scenarios, and cause all tests in the 'standard_tests' to be run
 
4085
    against both implementations. Setting 'test.module' to the appropriate
 
4086
    module. See bzrlib.tests.test__chk_map.load_tests as an example.
 
4087
 
 
4088
    :param standard_tests: A test suite to permute
 
4089
    :param loader: A TestLoader
 
4090
    :param py_module_name: The python path to a python module that can always
 
4091
        be loaded, and will be considered the 'python' implementation. (eg
 
4092
        'bzrlib._chk_map_py')
 
4093
    :param ext_module_name: The python path to an extension module. If the
 
4094
        module cannot be loaded, a single test will be added, which notes that
 
4095
        the module is not available. If it can be loaded, all standard_tests
 
4096
        will be run against that module.
 
4097
    :return: (suite, feature) suite is a test-suite that has all the permuted
 
4098
        tests. feature is the Feature object that can be used to determine if
 
4099
        the module is available.
 
4100
    """
 
4101
 
 
4102
    py_module = __import__(py_module_name, {}, {}, ['NO_SUCH_ATTRIB'])
 
4103
    scenarios = [
 
4104
        ('python', {'module': py_module}),
 
4105
    ]
 
4106
    suite = loader.suiteClass()
 
4107
    feature = ModuleAvailableFeature(ext_module_name)
 
4108
    if feature.available():
 
4109
        scenarios.append(('C', {'module': feature.module}))
 
4110
    else:
 
4111
        # the compiled module isn't available, so we add a failing test
 
4112
        class FailWithoutFeature(TestCase):
 
4113
            def test_fail(self):
 
4114
                self.requireFeature(feature)
 
4115
        suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
 
4116
    result = multiply_tests(standard_tests, scenarios, suite)
 
4117
    return result, feature
 
4118
 
 
4119
 
4142
4120
def _rmtree_temp_dir(dirname, test_id=None):
4143
4121
    # If LANG=C we probably have created some bogus paths
4144
4122
    # which rmtree(unicode) will fail to delete
4160
4138
        if test_id != None:
4161
4139
            ui.ui_factory.clear_term()
4162
4140
            sys.stderr.write('\nWhile running: %s\n' % (test_id,))
 
4141
        # Ugly, but the last thing we want here is fail, so bear with it.
 
4142
        printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
 
4143
                                    ).encode('ascii', 'replace')
4163
4144
        sys.stderr.write('Unable to remove testing dir %s\n%s'
4164
 
                         % (os.path.basename(dirname), e))
 
4145
                         % (os.path.basename(dirname), printable_e))
4165
4146
 
4166
4147
 
4167
4148
class Feature(object):
4249
4230
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4250
4231
 
4251
4232
 
 
4233
class _CompatabilityThunkFeature(Feature):
 
4234
    """This feature is just a thunk to another feature.
 
4235
 
 
4236
    It issues a deprecation warning if it is accessed, to let you know that you
 
4237
    should really use a different feature.
 
4238
    """
 
4239
 
 
4240
    def __init__(self, dep_version, module, name,
 
4241
                 replacement_name, replacement_module=None):
 
4242
        super(_CompatabilityThunkFeature, self).__init__()
 
4243
        self._module = module
 
4244
        if replacement_module is None:
 
4245
            replacement_module = module
 
4246
        self._replacement_module = replacement_module
 
4247
        self._name = name
 
4248
        self._replacement_name = replacement_name
 
4249
        self._dep_version = dep_version
 
4250
        self._feature = None
 
4251
 
 
4252
    def _ensure(self):
 
4253
        if self._feature is None:
 
4254
            depr_msg = self._dep_version % ('%s.%s'
 
4255
                                            % (self._module, self._name))
 
4256
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
 
4257
                                               self._replacement_name)
 
4258
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
 
4259
            # Import the new feature and use it as a replacement for the
 
4260
            # deprecated one.
 
4261
            mod = __import__(self._replacement_module, {}, {},
 
4262
                             [self._replacement_name])
 
4263
            self._feature = getattr(mod, self._replacement_name)
 
4264
 
 
4265
    def _probe(self):
 
4266
        self._ensure()
 
4267
        return self._feature._probe()
 
4268
 
 
4269
 
4252
4270
class ModuleAvailableFeature(Feature):
4253
4271
    """This is a feature than describes a module we want to be available.
4254
4272
 
4274
4292
        if self.available(): # Make sure the probe has been done
4275
4293
            return self._module
4276
4294
        return None
4277
 
    
 
4295
 
4278
4296
    def feature_name(self):
4279
4297
        return self.module_name
4280
4298
 
4281
4299
 
 
4300
# This is kept here for compatibility, it is recommended to use
 
4301
# 'bzrlib.tests.feature.paramiko' instead
 
4302
ParamikoFeature = _CompatabilityThunkFeature(
 
4303
    deprecated_in((2,1,0)),
 
4304
    'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
 
4305
 
4282
4306
 
4283
4307
def probe_unicode_in_user_encoding():
4284
4308
    """Try to encode several unicode strings to use in unicode-aware tests.
4334
4358
HTTPSServerFeature = _HTTPSServerFeature()
4335
4359
 
4336
4360
 
4337
 
class _ParamikoFeature(Feature):
4338
 
    """Is paramiko available?"""
4339
 
 
4340
 
    def _probe(self):
4341
 
        try:
4342
 
            from bzrlib.transport.sftp import SFTPAbsoluteServer
4343
 
            return True
4344
 
        except errors.ParamikoNotPresent:
4345
 
            return False
4346
 
 
4347
 
    def feature_name(self):
4348
 
        return "Paramiko"
4349
 
 
4350
 
 
4351
 
ParamikoFeature = _ParamikoFeature()
4352
 
 
4353
 
 
4354
4361
class _UnicodeFilename(Feature):
4355
4362
    """Does the filesystem support Unicode filenames?"""
4356
4363
 
4371
4378
UnicodeFilename = _UnicodeFilename()
4372
4379
 
4373
4380
 
 
4381
class _ByteStringNamedFilesystem(Feature):
 
4382
    """Is the filesystem based on bytes?"""
 
4383
 
 
4384
    def _probe(self):
 
4385
        if os.name == "posix":
 
4386
            return True
 
4387
        return False
 
4388
 
 
4389
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
 
4390
 
 
4391
 
4374
4392
class _UTF8Filesystem(Feature):
4375
4393
    """Is the filesystem UTF-8?"""
4376
4394
 
4461
4479
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4462
4480
 
4463
4481
 
4464
 
class _SubUnitFeature(Feature):
4465
 
    """Check if subunit is available."""
 
4482
class _CaseSensitiveFilesystemFeature(Feature):
4466
4483
 
4467
4484
    def _probe(self):
4468
 
        try:
4469
 
            import subunit
 
4485
        if CaseInsCasePresFilenameFeature.available():
 
4486
            return False
 
4487
        elif CaseInsensitiveFilesystemFeature.available():
 
4488
            return False
 
4489
        else:
4470
4490
            return True
4471
 
        except ImportError:
4472
 
            return False
4473
4491
 
4474
4492
    def feature_name(self):
4475
 
        return 'subunit'
4476
 
 
4477
 
SubUnitFeature = _SubUnitFeature()
 
4493
        return 'case-sensitive filesystem'
 
4494
 
 
4495
# new coding style is for feature instances to be lowercase
 
4496
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
 
4497
 
 
4498
 
 
4499
# Kept for compatibility, use bzrlib.tests.features.subunit instead
 
4500
SubUnitFeature = _CompatabilityThunkFeature(
 
4501
    deprecated_in((2,1,0)),
 
4502
    'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4478
4503
# Only define SubUnitBzrRunner if subunit is available.
4479
4504
try:
4480
4505
    from subunit import TestProtocolClient
 
4506
    from subunit.test_results import AutoTimingTestResultDecorator
 
4507
    class SubUnitBzrProtocolClient(TestProtocolClient):
 
4508
 
 
4509
        def addSuccess(self, test, details=None):
 
4510
            # The subunit client always includes the details in the subunit
 
4511
            # stream, but we don't want to include it in ours.
 
4512
            if details is not None and 'log' in details:
 
4513
                del details['log']
 
4514
            return super(SubUnitBzrProtocolClient, self).addSuccess(
 
4515
                test, details)
 
4516
 
4481
4517
    class SubUnitBzrRunner(TextTestRunner):
4482
4518
        def run(self, test):
4483
 
            result = BzrAutoTimingTestResultDecorator(
4484
 
                TestProtocolClient(self.stream))
 
4519
            result = AutoTimingTestResultDecorator(
 
4520
                SubUnitBzrProtocolClient(self.stream))
4485
4521
            test.run(result)
4486
4522
            return result
4487
4523
except ImportError:
4488
4524
    pass
 
4525
 
 
4526
class _PosixPermissionsFeature(Feature):
 
4527
 
 
4528
    def _probe(self):
 
4529
        def has_perms():
 
4530
            # create temporary file and check if specified perms are maintained.
 
4531
            import tempfile
 
4532
 
 
4533
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
 
4534
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
 
4535
            fd, name = f
 
4536
            os.close(fd)
 
4537
            os.chmod(name, write_perms)
 
4538
 
 
4539
            read_perms = os.stat(name).st_mode & 0777
 
4540
            os.unlink(name)
 
4541
            return (write_perms == read_perms)
 
4542
 
 
4543
        return (os.name == 'posix') and has_perms()
 
4544
 
 
4545
    def feature_name(self):
 
4546
        return 'POSIX permissions support'
 
4547
 
 
4548
posix_permissions_feature = _PosixPermissionsFeature()