/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Martin Pool
  • Date: 2009-08-03 05:31:01 UTC
  • mto: This revision was merged to the branch mainline in revision 4584.
  • Revision ID: mbp@sourcefrog.net-20090803053101-x1yx8cbx2rw89a5y
selftest sets ProgressTask.show_transport_activity off

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import atexit
 
30
import codecs
 
31
from cStringIO import StringIO
 
32
import difflib
 
33
import doctest
 
34
import errno
 
35
import logging
 
36
import math
 
37
import os
 
38
from pprint import pformat
 
39
import random
 
40
import re
 
41
import shlex
 
42
import stat
 
43
from subprocess import Popen, PIPE, STDOUT
 
44
import sys
 
45
import tempfile
 
46
import threading
 
47
import time
 
48
import unittest
 
49
import warnings
 
50
 
 
51
 
 
52
from bzrlib import (
 
53
    branchbuilder,
 
54
    bzrdir,
 
55
    debug,
 
56
    errors,
 
57
    hooks,
 
58
    lock as _mod_lock,
 
59
    memorytree,
 
60
    osutils,
 
61
    progress,
 
62
    ui,
 
63
    urlutils,
 
64
    registry,
 
65
    workingtree,
 
66
    )
 
67
import bzrlib.branch
 
68
import bzrlib.commands
 
69
import bzrlib.timestamp
 
70
import bzrlib.export
 
71
import bzrlib.inventory
 
72
import bzrlib.iterablefile
 
73
import bzrlib.lockdir
 
74
try:
 
75
    import bzrlib.lsprof
 
76
except ImportError:
 
77
    # lsprof not available
 
78
    pass
 
79
from bzrlib.merge import merge_inner
 
80
import bzrlib.merge3
 
81
import bzrlib.plugin
 
82
from bzrlib.smart import client, request, server
 
83
import bzrlib.store
 
84
from bzrlib import symbol_versioning
 
85
from bzrlib.symbol_versioning import (
 
86
    DEPRECATED_PARAMETER,
 
87
    deprecated_function,
 
88
    deprecated_method,
 
89
    deprecated_passed,
 
90
    )
 
91
import bzrlib.trace
 
92
from bzrlib.transport import get_transport
 
93
import bzrlib.transport
 
94
from bzrlib.transport.local import LocalURLServer
 
95
from bzrlib.transport.memory import MemoryServer
 
96
from bzrlib.transport.readonly import ReadonlyServer
 
97
from bzrlib.trace import mutter, note
 
98
from bzrlib.tests import TestUtil
 
99
from bzrlib.tests.http_server import HttpServer
 
100
from bzrlib.tests.TestUtil import (
 
101
                          TestSuite,
 
102
                          TestLoader,
 
103
                          )
 
104
from bzrlib.tests.treeshape import build_tree_contents
 
105
from bzrlib.ui.text import TextUIFactory
 
106
import bzrlib.version_info_formats.format_custom
 
107
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
108
 
 
109
# Mark this python module as being part of the implementation
 
110
# of unittest: this gives us better tracebacks where the last
 
111
# shown frame is the test code, not our assertXYZ.
 
112
__unittest = 1
 
113
 
 
114
default_transport = LocalURLServer
 
115
 
 
116
 
 
117
class ExtendedTestResult(unittest._TextTestResult):
 
118
    """Accepts, reports and accumulates the results of running tests.
 
119
 
 
120
    Compared to the unittest version this class adds support for
 
121
    profiling, benchmarking, stopping as soon as a test fails,  and
 
122
    skipping tests.  There are further-specialized subclasses for
 
123
    different types of display.
 
124
 
 
125
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
126
    addFailure or addError classes.  These in turn may redirect to a more
 
127
    specific case for the special test results supported by our extended
 
128
    tests.
 
129
 
 
130
    Note that just one of these objects is fed the results from many tests.
 
131
    """
 
132
 
 
133
    stop_early = False
 
134
 
 
135
    def __init__(self, stream, descriptions, verbosity,
 
136
                 bench_history=None,
 
137
                 num_tests=None,
 
138
                 strict=False,
 
139
                 ):
 
140
        """Construct new TestResult.
 
141
 
 
142
        :param bench_history: Optionally, a writable file object to accumulate
 
143
            benchmark results.
 
144
        """
 
145
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
146
        if bench_history is not None:
 
147
            from bzrlib.version import _get_bzr_source_tree
 
148
            src_tree = _get_bzr_source_tree()
 
149
            if src_tree:
 
150
                try:
 
151
                    revision_id = src_tree.get_parent_ids()[0]
 
152
                except IndexError:
 
153
                    # XXX: if this is a brand new tree, do the same as if there
 
154
                    # is no branch.
 
155
                    revision_id = ''
 
156
            else:
 
157
                # XXX: If there's no branch, what should we do?
 
158
                revision_id = ''
 
159
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
160
        self._bench_history = bench_history
 
161
        self.ui = ui.ui_factory
 
162
        self.num_tests = num_tests
 
163
        self.error_count = 0
 
164
        self.failure_count = 0
 
165
        self.known_failure_count = 0
 
166
        self.skip_count = 0
 
167
        self.not_applicable_count = 0
 
168
        self.unsupported = {}
 
169
        self.count = 0
 
170
        self._overall_start_time = time.time()
 
171
        self._strict = strict
 
172
 
 
173
    def done(self):
 
174
        if self._strict:
 
175
            ok = self.wasStrictlySuccessful()
 
176
        else:
 
177
            ok = self.wasSuccessful()
 
178
        if ok:
 
179
            self.stream.write('tests passed\n')
 
180
        else:
 
181
            self.stream.write('tests failed\n')
 
182
        if TestCase._first_thread_leaker_id:
 
183
            self.stream.write(
 
184
                '%s is leaking threads among %d leaking tests.\n' % (
 
185
                TestCase._first_thread_leaker_id,
 
186
                TestCase._leaking_threads_tests))
 
187
 
 
188
    def _extractBenchmarkTime(self, testCase):
 
189
        """Add a benchmark time for the current test case."""
 
190
        return getattr(testCase, "_benchtime", None)
 
191
 
 
192
    def _elapsedTestTimeString(self):
 
193
        """Return a time string for the overall time the current test has taken."""
 
194
        return self._formatTime(time.time() - self._start_time)
 
195
 
 
196
    def _testTimeString(self, testCase):
 
197
        benchmark_time = self._extractBenchmarkTime(testCase)
 
198
        if benchmark_time is not None:
 
199
            return self._formatTime(benchmark_time) + "*"
 
200
        else:
 
201
            return self._elapsedTestTimeString()
 
202
 
 
203
    def _formatTime(self, seconds):
 
204
        """Format seconds as milliseconds with leading spaces."""
 
205
        # some benchmarks can take thousands of seconds to run, so we need 8
 
206
        # places
 
207
        return "%8dms" % (1000 * seconds)
 
208
 
 
209
    def _shortened_test_description(self, test):
 
210
        what = test.id()
 
211
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
212
        return what
 
213
 
 
214
    def startTest(self, test):
 
215
        unittest.TestResult.startTest(self, test)
 
216
        if self.count == 0:
 
217
            self.startTests()
 
218
        self.report_test_start(test)
 
219
        test.number = self.count
 
220
        self._recordTestStartTime()
 
221
 
 
222
    def startTests(self):
 
223
        self.stream.write(
 
224
            'testing: %s\n' % (osutils.realpath(sys.argv[0]),))
 
225
        self.stream.write(
 
226
            '   %s (%s python%s)\n' % (
 
227
                    bzrlib.__path__[0],
 
228
                    bzrlib.version_string,
 
229
                    bzrlib._format_version_tuple(sys.version_info),
 
230
                    ))
 
231
        self.stream.write('\n')
 
232
 
 
233
    def _recordTestStartTime(self):
 
234
        """Record that a test has started."""
 
235
        self._start_time = time.time()
 
236
 
 
237
    def _cleanupLogFile(self, test):
 
238
        # We can only do this if we have one of our TestCases, not if
 
239
        # we have a doctest.
 
240
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
241
        if setKeepLogfile is not None:
 
242
            setKeepLogfile()
 
243
 
 
244
    def addError(self, test, err):
 
245
        """Tell result that test finished with an error.
 
246
 
 
247
        Called from the TestCase run() method when the test
 
248
        fails with an unexpected error.
 
249
        """
 
250
        self._testConcluded(test)
 
251
        if isinstance(err[1], TestNotApplicable):
 
252
            return self._addNotApplicable(test, err)
 
253
        elif isinstance(err[1], UnavailableFeature):
 
254
            return self.addNotSupported(test, err[1].args[0])
 
255
        else:
 
256
            unittest.TestResult.addError(self, test, err)
 
257
            self.error_count += 1
 
258
            self.report_error(test, err)
 
259
            if self.stop_early:
 
260
                self.stop()
 
261
            self._cleanupLogFile(test)
 
262
 
 
263
    def addFailure(self, test, err):
 
264
        """Tell result that test failed.
 
265
 
 
266
        Called from the TestCase run() method when the test
 
267
        fails because e.g. an assert() method failed.
 
268
        """
 
269
        self._testConcluded(test)
 
270
        if isinstance(err[1], KnownFailure):
 
271
            return self._addKnownFailure(test, err)
 
272
        else:
 
273
            unittest.TestResult.addFailure(self, test, err)
 
274
            self.failure_count += 1
 
275
            self.report_failure(test, err)
 
276
            if self.stop_early:
 
277
                self.stop()
 
278
            self._cleanupLogFile(test)
 
279
 
 
280
    def addSuccess(self, test):
 
281
        """Tell result that test completed successfully.
 
282
 
 
283
        Called from the TestCase run()
 
284
        """
 
285
        self._testConcluded(test)
 
286
        if self._bench_history is not None:
 
287
            benchmark_time = self._extractBenchmarkTime(test)
 
288
            if benchmark_time is not None:
 
289
                self._bench_history.write("%s %s\n" % (
 
290
                    self._formatTime(benchmark_time),
 
291
                    test.id()))
 
292
        self.report_success(test)
 
293
        self._cleanupLogFile(test)
 
294
        unittest.TestResult.addSuccess(self, test)
 
295
        test._log_contents = ''
 
296
 
 
297
    def _testConcluded(self, test):
 
298
        """Common code when a test has finished.
 
299
 
 
300
        Called regardless of whether it succeded, failed, etc.
 
301
        """
 
302
        pass
 
303
 
 
304
    def _addKnownFailure(self, test, err):
 
305
        self.known_failure_count += 1
 
306
        self.report_known_failure(test, err)
 
307
 
 
308
    def addNotSupported(self, test, feature):
 
309
        """The test will not be run because of a missing feature.
 
310
        """
 
311
        # this can be called in two different ways: it may be that the
 
312
        # test started running, and then raised (through addError)
 
313
        # UnavailableFeature.  Alternatively this method can be called
 
314
        # while probing for features before running the tests; in that
 
315
        # case we will see startTest and stopTest, but the test will never
 
316
        # actually run.
 
317
        self.unsupported.setdefault(str(feature), 0)
 
318
        self.unsupported[str(feature)] += 1
 
319
        self.report_unsupported(test, feature)
 
320
 
 
321
    def addSkip(self, test, reason):
 
322
        """A test has not run for 'reason'."""
 
323
        self.skip_count += 1
 
324
        self.report_skip(test, reason)
 
325
 
 
326
    def _addNotApplicable(self, test, skip_excinfo):
 
327
        if isinstance(skip_excinfo[1], TestNotApplicable):
 
328
            self.not_applicable_count += 1
 
329
            self.report_not_applicable(test, skip_excinfo)
 
330
        try:
 
331
            test.tearDown()
 
332
        except KeyboardInterrupt:
 
333
            raise
 
334
        except:
 
335
            self.addError(test, test.exc_info())
 
336
        else:
 
337
            # seems best to treat this as success from point-of-view of unittest
 
338
            # -- it actually does nothing so it barely matters :)
 
339
            unittest.TestResult.addSuccess(self, test)
 
340
            test._log_contents = ''
 
341
 
 
342
    def printErrorList(self, flavour, errors):
 
343
        for test, err in errors:
 
344
            self.stream.writeln(self.separator1)
 
345
            self.stream.write("%s: " % flavour)
 
346
            self.stream.writeln(self.getDescription(test))
 
347
            if getattr(test, '_get_log', None) is not None:
 
348
                log_contents = test._get_log()
 
349
                if log_contents:
 
350
                    self.stream.write('\n')
 
351
                    self.stream.write(
 
352
                            ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
 
353
                    self.stream.write('\n')
 
354
                    self.stream.write(log_contents)
 
355
                    self.stream.write('\n')
 
356
                    self.stream.write(
 
357
                            ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
 
358
                    self.stream.write('\n')
 
359
            self.stream.writeln(self.separator2)
 
360
            self.stream.writeln("%s" % err)
 
361
 
 
362
    def finished(self):
 
363
        pass
 
364
 
 
365
    def report_cleaning_up(self):
 
366
        pass
 
367
 
 
368
    def report_success(self, test):
 
369
        pass
 
370
 
 
371
    def wasStrictlySuccessful(self):
 
372
        if self.unsupported or self.known_failure_count:
 
373
            return False
 
374
        return self.wasSuccessful()
 
375
 
 
376
 
 
377
class TextTestResult(ExtendedTestResult):
 
378
    """Displays progress and results of tests in text form"""
 
379
 
 
380
    def __init__(self, stream, descriptions, verbosity,
 
381
                 bench_history=None,
 
382
                 num_tests=None,
 
383
                 pb=None,
 
384
                 strict=None,
 
385
                 ):
 
386
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
387
            bench_history, num_tests, strict)
 
388
        # We no longer pass them around, but just rely on the UIFactory stack
 
389
        # for state
 
390
        if pb is not None:
 
391
            warnings.warn("Passing pb to TextTestResult is deprecated")
 
392
        self.pb = self.ui.nested_progress_bar()
 
393
        self.pb.show_pct = False
 
394
        self.pb.show_spinner = False
 
395
        self.pb.show_eta = False,
 
396
        self.pb.show_count = False
 
397
        self.pb.show_bar = False
 
398
        self.pb.update_latency = 0
 
399
        self.pb.show_transport_activity = False
 
400
 
 
401
    def report_starting(self):
 
402
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
 
403
 
 
404
    def _progress_prefix_text(self):
 
405
        # the longer this text, the less space we have to show the test
 
406
        # name...
 
407
        a = '[%d' % self.count              # total that have been run
 
408
        # tests skipped as known not to be relevant are not important enough
 
409
        # to show here
 
410
        ## if self.skip_count:
 
411
        ##     a += ', %d skip' % self.skip_count
 
412
        ## if self.known_failure_count:
 
413
        ##     a += '+%dX' % self.known_failure_count
 
414
        if self.num_tests is not None:
 
415
            a +='/%d' % self.num_tests
 
416
        a += ' in '
 
417
        runtime = time.time() - self._overall_start_time
 
418
        if runtime >= 60:
 
419
            a += '%dm%ds' % (runtime / 60, runtime % 60)
 
420
        else:
 
421
            a += '%ds' % runtime
 
422
        if self.error_count:
 
423
            a += ', %d err' % self.error_count
 
424
        if self.failure_count:
 
425
            a += ', %d fail' % self.failure_count
 
426
        if self.unsupported:
 
427
            a += ', %d missing' % len(self.unsupported)
 
428
        a += ']'
 
429
        return a
 
430
 
 
431
    def report_test_start(self, test):
 
432
        self.count += 1
 
433
        self.pb.update(
 
434
                self._progress_prefix_text()
 
435
                + ' '
 
436
                + self._shortened_test_description(test))
 
437
 
 
438
    def _test_description(self, test):
 
439
        return self._shortened_test_description(test)
 
440
 
 
441
    def report_error(self, test, err):
 
442
        self.pb.note('ERROR: %s\n    %s\n',
 
443
            self._test_description(test),
 
444
            err[1],
 
445
            )
 
446
 
 
447
    def report_failure(self, test, err):
 
448
        self.pb.note('FAIL: %s\n    %s\n',
 
449
            self._test_description(test),
 
450
            err[1],
 
451
            )
 
452
 
 
453
    def report_known_failure(self, test, err):
 
454
        self.pb.note('XFAIL: %s\n%s\n',
 
455
            self._test_description(test), err[1])
 
456
 
 
457
    def report_skip(self, test, reason):
 
458
        pass
 
459
 
 
460
    def report_not_applicable(self, test, skip_excinfo):
 
461
        pass
 
462
 
 
463
    def report_unsupported(self, test, feature):
 
464
        """test cannot be run because feature is missing."""
 
465
 
 
466
    def report_cleaning_up(self):
 
467
        self.pb.update('Cleaning up')
 
468
 
 
469
    def printErrors(self):
 
470
        # clear the pb to make room for the error listing
 
471
        self.pb.clear()
 
472
        super(TextTestResult, self).printErrors()
 
473
 
 
474
    def finished(self):
 
475
        self.pb.finished()
 
476
 
 
477
 
 
478
class VerboseTestResult(ExtendedTestResult):
 
479
    """Produce long output, with one line per test run plus times"""
 
480
 
 
481
    def _ellipsize_to_right(self, a_string, final_width):
 
482
        """Truncate and pad a string, keeping the right hand side"""
 
483
        if len(a_string) > final_width:
 
484
            result = '...' + a_string[3-final_width:]
 
485
        else:
 
486
            result = a_string
 
487
        return result.ljust(final_width)
 
488
 
 
489
    def report_starting(self):
 
490
        self.stream.write('running %d tests...\n' % self.num_tests)
 
491
 
 
492
    def report_test_start(self, test):
 
493
        self.count += 1
 
494
        name = self._shortened_test_description(test)
 
495
        # width needs space for 6 char status, plus 1 for slash, plus an
 
496
        # 11-char time string, plus a trailing blank
 
497
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
498
        self.stream.write(self._ellipsize_to_right(name,
 
499
                          osutils.terminal_width()-18))
 
500
        self.stream.flush()
 
501
 
 
502
    def _error_summary(self, err):
 
503
        indent = ' ' * 4
 
504
        return '%s%s' % (indent, err[1])
 
505
 
 
506
    def report_error(self, test, err):
 
507
        self.stream.writeln('ERROR %s\n%s'
 
508
                % (self._testTimeString(test),
 
509
                   self._error_summary(err)))
 
510
 
 
511
    def report_failure(self, test, err):
 
512
        self.stream.writeln(' FAIL %s\n%s'
 
513
                % (self._testTimeString(test),
 
514
                   self._error_summary(err)))
 
515
 
 
516
    def report_known_failure(self, test, err):
 
517
        self.stream.writeln('XFAIL %s\n%s'
 
518
                % (self._testTimeString(test),
 
519
                   self._error_summary(err)))
 
520
 
 
521
    def report_success(self, test):
 
522
        self.stream.writeln('   OK %s' % self._testTimeString(test))
 
523
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
524
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
525
            stats.pprint(file=self.stream)
 
526
        # flush the stream so that we get smooth output. This verbose mode is
 
527
        # used to show the output in PQM.
 
528
        self.stream.flush()
 
529
 
 
530
    def report_skip(self, test, reason):
 
531
        self.stream.writeln(' SKIP %s\n%s'
 
532
                % (self._testTimeString(test), reason))
 
533
 
 
534
    def report_not_applicable(self, test, skip_excinfo):
 
535
        self.stream.writeln('  N/A %s\n%s'
 
536
                % (self._testTimeString(test),
 
537
                   self._error_summary(skip_excinfo)))
 
538
 
 
539
    def report_unsupported(self, test, feature):
 
540
        """test cannot be run because feature is missing."""
 
541
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
542
                %(self._testTimeString(test), feature))
 
543
 
 
544
 
 
545
class TextTestRunner(object):
 
546
    stop_on_failure = False
 
547
 
 
548
    def __init__(self,
 
549
                 stream=sys.stderr,
 
550
                 descriptions=0,
 
551
                 verbosity=1,
 
552
                 bench_history=None,
 
553
                 list_only=False,
 
554
                 strict=False,
 
555
                 ):
 
556
        self.stream = unittest._WritelnDecorator(stream)
 
557
        self.descriptions = descriptions
 
558
        self.verbosity = verbosity
 
559
        self._bench_history = bench_history
 
560
        self.list_only = list_only
 
561
        self._strict = strict
 
562
 
 
563
    def run(self, test):
 
564
        "Run the given test case or test suite."
 
565
        startTime = time.time()
 
566
        if self.verbosity == 1:
 
567
            result_class = TextTestResult
 
568
        elif self.verbosity >= 2:
 
569
            result_class = VerboseTestResult
 
570
        result = result_class(self.stream,
 
571
                              self.descriptions,
 
572
                              self.verbosity,
 
573
                              bench_history=self._bench_history,
 
574
                              num_tests=test.countTestCases(),
 
575
                              strict=self._strict,
 
576
                              )
 
577
        result.stop_early = self.stop_on_failure
 
578
        result.report_starting()
 
579
        if self.list_only:
 
580
            if self.verbosity >= 2:
 
581
                self.stream.writeln("Listing tests only ...\n")
 
582
            run = 0
 
583
            for t in iter_suite_tests(test):
 
584
                self.stream.writeln("%s" % (t.id()))
 
585
                run += 1
 
586
            return None
 
587
        else:
 
588
            try:
 
589
                import testtools
 
590
            except ImportError:
 
591
                test.run(result)
 
592
            else:
 
593
                if isinstance(test, testtools.ConcurrentTestSuite):
 
594
                    # We need to catch bzr specific behaviors
 
595
                    test.run(BZRTransformingResult(result))
 
596
                else:
 
597
                    test.run(result)
 
598
            run = result.testsRun
 
599
            actionTaken = "Ran"
 
600
        stopTime = time.time()
 
601
        timeTaken = stopTime - startTime
 
602
        result.printErrors()
 
603
        self.stream.writeln(result.separator2)
 
604
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
605
                            run, run != 1 and "s" or "", timeTaken))
 
606
        self.stream.writeln()
 
607
        if not result.wasSuccessful():
 
608
            self.stream.write("FAILED (")
 
609
            failed, errored = map(len, (result.failures, result.errors))
 
610
            if failed:
 
611
                self.stream.write("failures=%d" % failed)
 
612
            if errored:
 
613
                if failed: self.stream.write(", ")
 
614
                self.stream.write("errors=%d" % errored)
 
615
            if result.known_failure_count:
 
616
                if failed or errored: self.stream.write(", ")
 
617
                self.stream.write("known_failure_count=%d" %
 
618
                    result.known_failure_count)
 
619
            self.stream.writeln(")")
 
620
        else:
 
621
            if result.known_failure_count:
 
622
                self.stream.writeln("OK (known_failures=%d)" %
 
623
                    result.known_failure_count)
 
624
            else:
 
625
                self.stream.writeln("OK")
 
626
        if result.skip_count > 0:
 
627
            skipped = result.skip_count
 
628
            self.stream.writeln('%d test%s skipped' %
 
629
                                (skipped, skipped != 1 and "s" or ""))
 
630
        if result.unsupported:
 
631
            for feature, count in sorted(result.unsupported.items()):
 
632
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
633
                    (feature, count))
 
634
        result.finished()
 
635
        return result
 
636
 
 
637
 
 
638
def iter_suite_tests(suite):
 
639
    """Return all tests in a suite, recursing through nested suites"""
 
640
    if isinstance(suite, unittest.TestCase):
 
641
        yield suite
 
642
    elif isinstance(suite, unittest.TestSuite):
 
643
        for item in suite:
 
644
            for r in iter_suite_tests(item):
 
645
                yield r
 
646
    else:
 
647
        raise Exception('unknown type %r for object %r'
 
648
                        % (type(suite), suite))
 
649
 
 
650
 
 
651
class TestSkipped(Exception):
 
652
    """Indicates that a test was intentionally skipped, rather than failing."""
 
653
 
 
654
 
 
655
class TestNotApplicable(TestSkipped):
 
656
    """A test is not applicable to the situation where it was run.
 
657
 
 
658
    This is only normally raised by parameterized tests, if they find that
 
659
    the instance they're constructed upon does not support one aspect
 
660
    of its interface.
 
661
    """
 
662
 
 
663
 
 
664
class KnownFailure(AssertionError):
 
665
    """Indicates that a test failed in a precisely expected manner.
 
666
 
 
667
    Such failures dont block the whole test suite from passing because they are
 
668
    indicators of partially completed code or of future work. We have an
 
669
    explicit error for them so that we can ensure that they are always visible:
 
670
    KnownFailures are always shown in the output of bzr selftest.
 
671
    """
 
672
 
 
673
 
 
674
class UnavailableFeature(Exception):
 
675
    """A feature required for this test was not available.
 
676
 
 
677
    The feature should be used to construct the exception.
 
678
    """
 
679
 
 
680
 
 
681
class CommandFailed(Exception):
 
682
    pass
 
683
 
 
684
 
 
685
class StringIOWrapper(object):
 
686
    """A wrapper around cStringIO which just adds an encoding attribute.
 
687
 
 
688
    Internally we can check sys.stdout to see what the output encoding
 
689
    should be. However, cStringIO has no encoding attribute that we can
 
690
    set. So we wrap it instead.
 
691
    """
 
692
    encoding='ascii'
 
693
    _cstring = None
 
694
 
 
695
    def __init__(self, s=None):
 
696
        if s is not None:
 
697
            self.__dict__['_cstring'] = StringIO(s)
 
698
        else:
 
699
            self.__dict__['_cstring'] = StringIO()
 
700
 
 
701
    def __getattr__(self, name, getattr=getattr):
 
702
        return getattr(self.__dict__['_cstring'], name)
 
703
 
 
704
    def __setattr__(self, name, val):
 
705
        if name == 'encoding':
 
706
            self.__dict__['encoding'] = val
 
707
        else:
 
708
            return setattr(self._cstring, name, val)
 
709
 
 
710
 
 
711
class TestUIFactory(TextUIFactory):
 
712
    """A UI Factory for testing.
 
713
 
 
714
    Hide the progress bar but emit note()s.
 
715
    Redirect stdin.
 
716
    Allows get_password to be tested without real tty attached.
 
717
    """
 
718
 
 
719
    def __init__(self, stdout=None, stderr=None, stdin=None):
 
720
        if stdin is not None:
 
721
            # We use a StringIOWrapper to be able to test various
 
722
            # encodings, but the user is still responsible to
 
723
            # encode the string and to set the encoding attribute
 
724
            # of StringIOWrapper.
 
725
            stdin = StringIOWrapper(stdin)
 
726
        super(TestUIFactory, self).__init__(stdin, stdout, stderr)
 
727
 
 
728
    def clear(self):
 
729
        """See progress.ProgressBar.clear()."""
 
730
 
 
731
    def clear_term(self):
 
732
        """See progress.ProgressBar.clear_term()."""
 
733
 
 
734
    def finished(self):
 
735
        """See progress.ProgressBar.finished()."""
 
736
 
 
737
    def note(self, fmt_string, *args):
 
738
        """See progress.ProgressBar.note()."""
 
739
        if args:
 
740
            fmt_string = fmt_string % args
 
741
        self.stdout.write(fmt_string + "\n")
 
742
 
 
743
    def progress_bar(self):
 
744
        return self
 
745
 
 
746
    def nested_progress_bar(self):
 
747
        return self
 
748
 
 
749
    def update(self, message, count=None, total=None):
 
750
        """See progress.ProgressBar.update()."""
 
751
 
 
752
    def get_non_echoed_password(self):
 
753
        """Get password from stdin without trying to handle the echo mode"""
 
754
        password = self.stdin.readline()
 
755
        if not password:
 
756
            raise EOFError
 
757
        if password[-1] == '\n':
 
758
            password = password[:-1]
 
759
        return password
 
760
 
 
761
 
 
762
class TestCase(unittest.TestCase):
 
763
    """Base class for bzr unit tests.
 
764
 
 
765
    Tests that need access to disk resources should subclass
 
766
    TestCaseInTempDir not TestCase.
 
767
 
 
768
    Error and debug log messages are redirected from their usual
 
769
    location into a temporary file, the contents of which can be
 
770
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
771
    so that it can also capture file IO.  When the test completes this file
 
772
    is read into memory and removed from disk.
 
773
 
 
774
    There are also convenience functions to invoke bzr's command-line
 
775
    routine, and to build and check bzr trees.
 
776
 
 
777
    In addition to the usual method of overriding tearDown(), this class also
 
778
    allows subclasses to register functions into the _cleanups list, which is
 
779
    run in order as the object is torn down.  It's less likely this will be
 
780
    accidentally overlooked.
 
781
    """
 
782
 
 
783
    _active_threads = None
 
784
    _leaking_threads_tests = 0
 
785
    _first_thread_leaker_id = None
 
786
    _log_file_name = None
 
787
    _log_contents = ''
 
788
    _keep_log_file = False
 
789
    # record lsprof data when performing benchmark calls.
 
790
    _gather_lsprof_in_benchmarks = False
 
791
    attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
 
792
                     '_log_contents', '_log_file_name', '_benchtime',
 
793
                     '_TestCase__testMethodName', '_TestCase__testMethodDoc',)
 
794
 
 
795
    def __init__(self, methodName='testMethod'):
 
796
        super(TestCase, self).__init__(methodName)
 
797
        self._cleanups = []
 
798
        self._bzr_test_setUp_run = False
 
799
        self._bzr_test_tearDown_run = False
 
800
 
 
801
    def setUp(self):
 
802
        unittest.TestCase.setUp(self)
 
803
        self._bzr_test_setUp_run = True
 
804
        self._cleanEnvironment()
 
805
        self._silenceUI()
 
806
        self._startLogFile()
 
807
        self._benchcalls = []
 
808
        self._benchtime = None
 
809
        self._clear_hooks()
 
810
        # Track locks - needs to be called before _clear_debug_flags.
 
811
        self._track_locks()
 
812
        self._clear_debug_flags()
 
813
        TestCase._active_threads = threading.activeCount()
 
814
        self.addCleanup(self._check_leaked_threads)
 
815
 
 
816
    def debug(self):
 
817
        # debug a frame up.
 
818
        import pdb
 
819
        pdb.Pdb().set_trace(sys._getframe().f_back)
 
820
 
 
821
    def _check_leaked_threads(self):
 
822
        active = threading.activeCount()
 
823
        leaked_threads = active - TestCase._active_threads
 
824
        TestCase._active_threads = active
 
825
        if leaked_threads:
 
826
            TestCase._leaking_threads_tests += 1
 
827
            if TestCase._first_thread_leaker_id is None:
 
828
                TestCase._first_thread_leaker_id = self.id()
 
829
 
 
830
    def _clear_debug_flags(self):
 
831
        """Prevent externally set debug flags affecting tests.
 
832
 
 
833
        Tests that want to use debug flags can just set them in the
 
834
        debug_flags set during setup/teardown.
 
835
        """
 
836
        self._preserved_debug_flags = set(debug.debug_flags)
 
837
        if 'allow_debug' not in selftest_debug_flags:
 
838
            debug.debug_flags.clear()
 
839
        self.addCleanup(self._restore_debug_flags)
 
840
 
 
841
    def _clear_hooks(self):
 
842
        # prevent hooks affecting tests
 
843
        self._preserved_hooks = {}
 
844
        for key, factory in hooks.known_hooks.items():
 
845
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
846
            current_hooks = hooks.known_hooks_key_to_object(key)
 
847
            self._preserved_hooks[parent] = (name, current_hooks)
 
848
        self.addCleanup(self._restoreHooks)
 
849
        for key, factory in hooks.known_hooks.items():
 
850
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
851
            setattr(parent, name, factory())
 
852
        # this hook should always be installed
 
853
        request._install_hook()
 
854
 
 
855
    def _silenceUI(self):
 
856
        """Turn off UI for duration of test"""
 
857
        # by default the UI is off; tests can turn it on if they want it.
 
858
        saved = ui.ui_factory
 
859
        def _restore():
 
860
            ui.ui_factory = saved
 
861
        ui.ui_factory = ui.SilentUIFactory()
 
862
        self.addCleanup(_restore)
 
863
 
 
864
    def _check_locks(self):
 
865
        """Check that all lock take/release actions have been paired."""
 
866
        # once we have fixed all the current lock problems, we can change the
 
867
        # following code to always check for mismatched locks, but only do
 
868
        # traceback showing with -Dlock (self._lock_check_thorough is True).
 
869
        # For now, because the test suite will fail, we only assert that lock
 
870
        # matching has occured with -Dlock.
 
871
        # unhook:
 
872
        acquired_locks = [lock for action, lock in self._lock_actions
 
873
                          if action == 'acquired']
 
874
        released_locks = [lock for action, lock in self._lock_actions
 
875
                          if action == 'released']
 
876
        broken_locks = [lock for action, lock in self._lock_actions
 
877
                        if action == 'broken']
 
878
        # trivially, given the tests for lock acquistion and release, if we
 
879
        # have as many in each list, it should be ok. Some lock tests also
 
880
        # break some locks on purpose and should be taken into account by
 
881
        # considering that breaking a lock is just a dirty way of releasing it.
 
882
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
 
883
            message = ('Different number of acquired and '
 
884
                       'released or broken locks. (%s, %s + %s)' %
 
885
                       (acquired_locks, released_locks, broken_locks))
 
886
            if not self._lock_check_thorough:
 
887
                # Rather than fail, just warn
 
888
                print "Broken test %s: %s" % (self, message)
 
889
                return
 
890
            self.fail(message)
 
891
 
 
892
    def _track_locks(self):
 
893
        """Track lock activity during tests."""
 
894
        self._lock_actions = []
 
895
        self._lock_check_thorough = 'lock' not in debug.debug_flags
 
896
        self.addCleanup(self._check_locks)
 
897
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
 
898
                                                self._lock_acquired, None)
 
899
        _mod_lock.Lock.hooks.install_named_hook('lock_released',
 
900
                                                self._lock_released, None)
 
901
        _mod_lock.Lock.hooks.install_named_hook('lock_broken',
 
902
                                                self._lock_broken, None)
 
903
 
 
904
    def _lock_acquired(self, result):
 
905
        self._lock_actions.append(('acquired', result))
 
906
 
 
907
    def _lock_released(self, result):
 
908
        self._lock_actions.append(('released', result))
 
909
 
 
910
    def _lock_broken(self, result):
 
911
        self._lock_actions.append(('broken', result))
 
912
 
 
913
    def _ndiff_strings(self, a, b):
 
914
        """Return ndiff between two strings containing lines.
 
915
 
 
916
        A trailing newline is added if missing to make the strings
 
917
        print properly."""
 
918
        if b and b[-1] != '\n':
 
919
            b += '\n'
 
920
        if a and a[-1] != '\n':
 
921
            a += '\n'
 
922
        difflines = difflib.ndiff(a.splitlines(True),
 
923
                                  b.splitlines(True),
 
924
                                  linejunk=lambda x: False,
 
925
                                  charjunk=lambda x: False)
 
926
        return ''.join(difflines)
 
927
 
 
928
    def assertEqual(self, a, b, message=''):
 
929
        try:
 
930
            if a == b:
 
931
                return
 
932
        except UnicodeError, e:
 
933
            # If we can't compare without getting a UnicodeError, then
 
934
            # obviously they are different
 
935
            mutter('UnicodeError: %s', e)
 
936
        if message:
 
937
            message += '\n'
 
938
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
939
            % (message,
 
940
               pformat(a), pformat(b)))
 
941
 
 
942
    assertEquals = assertEqual
 
943
 
 
944
    def assertEqualDiff(self, a, b, message=None):
 
945
        """Assert two texts are equal, if not raise an exception.
 
946
 
 
947
        This is intended for use with multi-line strings where it can
 
948
        be hard to find the differences by eye.
 
949
        """
 
950
        # TODO: perhaps override assertEquals to call this for strings?
 
951
        if a == b:
 
952
            return
 
953
        if message is None:
 
954
            message = "texts not equal:\n"
 
955
        if a == b + '\n':
 
956
            message = 'first string is missing a final newline.\n'
 
957
        if a + '\n' == b:
 
958
            message = 'second string is missing a final newline.\n'
 
959
        raise AssertionError(message +
 
960
                             self._ndiff_strings(a, b))
 
961
 
 
962
    def assertEqualMode(self, mode, mode_test):
 
963
        self.assertEqual(mode, mode_test,
 
964
                         'mode mismatch %o != %o' % (mode, mode_test))
 
965
 
 
966
    def assertEqualStat(self, expected, actual):
 
967
        """assert that expected and actual are the same stat result.
 
968
 
 
969
        :param expected: A stat result.
 
970
        :param actual: A stat result.
 
971
        :raises AssertionError: If the expected and actual stat values differ
 
972
            other than by atime.
 
973
        """
 
974
        self.assertEqual(expected.st_size, actual.st_size)
 
975
        self.assertEqual(expected.st_mtime, actual.st_mtime)
 
976
        self.assertEqual(expected.st_ctime, actual.st_ctime)
 
977
        self.assertEqual(expected.st_dev, actual.st_dev)
 
978
        self.assertEqual(expected.st_ino, actual.st_ino)
 
979
        self.assertEqual(expected.st_mode, actual.st_mode)
 
980
 
 
981
    def assertLength(self, length, obj_with_len):
 
982
        """Assert that obj_with_len is of length length."""
 
983
        if len(obj_with_len) != length:
 
984
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
 
985
                length, len(obj_with_len), obj_with_len))
 
986
 
 
987
    def assertPositive(self, val):
 
988
        """Assert that val is greater than 0."""
 
989
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
990
 
 
991
    def assertNegative(self, val):
 
992
        """Assert that val is less than 0."""
 
993
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
994
 
 
995
    def assertStartsWith(self, s, prefix):
 
996
        if not s.startswith(prefix):
 
997
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
998
 
 
999
    def assertEndsWith(self, s, suffix):
 
1000
        """Asserts that s ends with suffix."""
 
1001
        if not s.endswith(suffix):
 
1002
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
1003
 
 
1004
    def assertContainsRe(self, haystack, needle_re, flags=0):
 
1005
        """Assert that a contains something matching a regular expression."""
 
1006
        if not re.search(needle_re, haystack, flags):
 
1007
            if '\n' in haystack or len(haystack) > 60:
 
1008
                # a long string, format it in a more readable way
 
1009
                raise AssertionError(
 
1010
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
1011
                        % (needle_re, haystack))
 
1012
            else:
 
1013
                raise AssertionError('pattern "%s" not found in "%s"'
 
1014
                        % (needle_re, haystack))
 
1015
 
 
1016
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
 
1017
        """Assert that a does not match a regular expression"""
 
1018
        if re.search(needle_re, haystack, flags):
 
1019
            raise AssertionError('pattern "%s" found in "%s"'
 
1020
                    % (needle_re, haystack))
 
1021
 
 
1022
    def assertSubset(self, sublist, superlist):
 
1023
        """Assert that every entry in sublist is present in superlist."""
 
1024
        missing = set(sublist) - set(superlist)
 
1025
        if len(missing) > 0:
 
1026
            raise AssertionError("value(s) %r not present in container %r" %
 
1027
                                 (missing, superlist))
 
1028
 
 
1029
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
1030
        """Fail unless excClass is raised when the iterator from func is used.
 
1031
 
 
1032
        Many functions can return generators this makes sure
 
1033
        to wrap them in a list() call to make sure the whole generator
 
1034
        is run, and that the proper exception is raised.
 
1035
        """
 
1036
        try:
 
1037
            list(func(*args, **kwargs))
 
1038
        except excClass, e:
 
1039
            return e
 
1040
        else:
 
1041
            if getattr(excClass,'__name__', None) is not None:
 
1042
                excName = excClass.__name__
 
1043
            else:
 
1044
                excName = str(excClass)
 
1045
            raise self.failureException, "%s not raised" % excName
 
1046
 
 
1047
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
1048
        """Assert that a callable raises a particular exception.
 
1049
 
 
1050
        :param excClass: As for the except statement, this may be either an
 
1051
            exception class, or a tuple of classes.
 
1052
        :param callableObj: A callable, will be passed ``*args`` and
 
1053
            ``**kwargs``.
 
1054
 
 
1055
        Returns the exception so that you can examine it.
 
1056
        """
 
1057
        try:
 
1058
            callableObj(*args, **kwargs)
 
1059
        except excClass, e:
 
1060
            return e
 
1061
        else:
 
1062
            if getattr(excClass,'__name__', None) is not None:
 
1063
                excName = excClass.__name__
 
1064
            else:
 
1065
                # probably a tuple
 
1066
                excName = str(excClass)
 
1067
            raise self.failureException, "%s not raised" % excName
 
1068
 
 
1069
    def assertIs(self, left, right, message=None):
 
1070
        if not (left is right):
 
1071
            if message is not None:
 
1072
                raise AssertionError(message)
 
1073
            else:
 
1074
                raise AssertionError("%r is not %r." % (left, right))
 
1075
 
 
1076
    def assertIsNot(self, left, right, message=None):
 
1077
        if (left is right):
 
1078
            if message is not None:
 
1079
                raise AssertionError(message)
 
1080
            else:
 
1081
                raise AssertionError("%r is %r." % (left, right))
 
1082
 
 
1083
    def assertTransportMode(self, transport, path, mode):
 
1084
        """Fail if a path does not have mode "mode".
 
1085
 
 
1086
        If modes are not supported on this transport, the assertion is ignored.
 
1087
        """
 
1088
        if not transport._can_roundtrip_unix_modebits():
 
1089
            return
 
1090
        path_stat = transport.stat(path)
 
1091
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
1092
        self.assertEqual(mode, actual_mode,
 
1093
                         'mode of %r incorrect (%s != %s)'
 
1094
                         % (path, oct(mode), oct(actual_mode)))
 
1095
 
 
1096
    def assertIsSameRealPath(self, path1, path2):
 
1097
        """Fail if path1 and path2 points to different files"""
 
1098
        self.assertEqual(osutils.realpath(path1),
 
1099
                         osutils.realpath(path2),
 
1100
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
1101
 
 
1102
    def assertIsInstance(self, obj, kls, msg=None):
 
1103
        """Fail if obj is not an instance of kls
 
1104
        
 
1105
        :param msg: Supplementary message to show if the assertion fails.
 
1106
        """
 
1107
        if not isinstance(obj, kls):
 
1108
            m = "%r is an instance of %s rather than %s" % (
 
1109
                obj, obj.__class__, kls)
 
1110
            if msg:
 
1111
                m += ": " + msg
 
1112
            self.fail(m)
 
1113
 
 
1114
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
1115
        """Invoke a test, expecting it to fail for the given reason.
 
1116
 
 
1117
        This is for assertions that ought to succeed, but currently fail.
 
1118
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
1119
        about the failure you're expecting.  If a new bug is introduced,
 
1120
        AssertionError should be raised, not KnownFailure.
 
1121
 
 
1122
        Frequently, expectFailure should be followed by an opposite assertion.
 
1123
        See example below.
 
1124
 
 
1125
        Intended to be used with a callable that raises AssertionError as the
 
1126
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
1127
 
 
1128
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
1129
        test succeeds.
 
1130
 
 
1131
        example usage::
 
1132
 
 
1133
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
1134
                             dynamic_val)
 
1135
          self.assertEqual(42, dynamic_val)
 
1136
 
 
1137
          This means that a dynamic_val of 54 will cause the test to raise
 
1138
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
1139
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
1140
          than 54 or 42 will cause an AssertionError.
 
1141
        """
 
1142
        try:
 
1143
            assertion(*args, **kwargs)
 
1144
        except AssertionError:
 
1145
            raise KnownFailure(reason)
 
1146
        else:
 
1147
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
1148
 
 
1149
    def assertFileEqual(self, content, path):
 
1150
        """Fail if path does not contain 'content'."""
 
1151
        self.failUnlessExists(path)
 
1152
        f = file(path, 'rb')
 
1153
        try:
 
1154
            s = f.read()
 
1155
        finally:
 
1156
            f.close()
 
1157
        self.assertEqualDiff(content, s)
 
1158
 
 
1159
    def failUnlessExists(self, path):
 
1160
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1161
        if not isinstance(path, basestring):
 
1162
            for p in path:
 
1163
                self.failUnlessExists(p)
 
1164
        else:
 
1165
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
1166
 
 
1167
    def failIfExists(self, path):
 
1168
        """Fail if path or paths, which may be abs or relative, exist."""
 
1169
        if not isinstance(path, basestring):
 
1170
            for p in path:
 
1171
                self.failIfExists(p)
 
1172
        else:
 
1173
            self.failIf(osutils.lexists(path),path+" exists")
 
1174
 
 
1175
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1176
        """A helper for callDeprecated and applyDeprecated.
 
1177
 
 
1178
        :param a_callable: A callable to call.
 
1179
        :param args: The positional arguments for the callable
 
1180
        :param kwargs: The keyword arguments for the callable
 
1181
        :return: A tuple (warnings, result). result is the result of calling
 
1182
            a_callable(``*args``, ``**kwargs``).
 
1183
        """
 
1184
        local_warnings = []
 
1185
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1186
            # we've hooked into a deprecation specific callpath,
 
1187
            # only deprecations should getting sent via it.
 
1188
            self.assertEqual(cls, DeprecationWarning)
 
1189
            local_warnings.append(msg)
 
1190
        original_warning_method = symbol_versioning.warn
 
1191
        symbol_versioning.set_warning_method(capture_warnings)
 
1192
        try:
 
1193
            result = a_callable(*args, **kwargs)
 
1194
        finally:
 
1195
            symbol_versioning.set_warning_method(original_warning_method)
 
1196
        return (local_warnings, result)
 
1197
 
 
1198
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1199
        """Call a deprecated callable without warning the user.
 
1200
 
 
1201
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1202
        not other callers that go direct to the warning module.
 
1203
 
 
1204
        To test that a deprecated method raises an error, do something like
 
1205
        this::
 
1206
 
 
1207
            self.assertRaises(errors.ReservedId,
 
1208
                self.applyDeprecated,
 
1209
                deprecated_in((1, 5, 0)),
 
1210
                br.append_revision,
 
1211
                'current:')
 
1212
 
 
1213
        :param deprecation_format: The deprecation format that the callable
 
1214
            should have been deprecated with. This is the same type as the
 
1215
            parameter to deprecated_method/deprecated_function. If the
 
1216
            callable is not deprecated with this format, an assertion error
 
1217
            will be raised.
 
1218
        :param a_callable: A callable to call. This may be a bound method or
 
1219
            a regular function. It will be called with ``*args`` and
 
1220
            ``**kwargs``.
 
1221
        :param args: The positional arguments for the callable
 
1222
        :param kwargs: The keyword arguments for the callable
 
1223
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1224
        """
 
1225
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1226
            *args, **kwargs)
 
1227
        expected_first_warning = symbol_versioning.deprecation_string(
 
1228
            a_callable, deprecation_format)
 
1229
        if len(call_warnings) == 0:
 
1230
            self.fail("No deprecation warning generated by call to %s" %
 
1231
                a_callable)
 
1232
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1233
        return result
 
1234
 
 
1235
    def callCatchWarnings(self, fn, *args, **kw):
 
1236
        """Call a callable that raises python warnings.
 
1237
 
 
1238
        The caller's responsible for examining the returned warnings.
 
1239
 
 
1240
        If the callable raises an exception, the exception is not
 
1241
        caught and propagates up to the caller.  In that case, the list
 
1242
        of warnings is not available.
 
1243
 
 
1244
        :returns: ([warning_object, ...], fn_result)
 
1245
        """
 
1246
        # XXX: This is not perfect, because it completely overrides the
 
1247
        # warnings filters, and some code may depend on suppressing particular
 
1248
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1249
        # though.  -- Andrew, 20071062
 
1250
        wlist = []
 
1251
        def _catcher(message, category, filename, lineno, file=None, line=None):
 
1252
            # despite the name, 'message' is normally(?) a Warning subclass
 
1253
            # instance
 
1254
            wlist.append(message)
 
1255
        saved_showwarning = warnings.showwarning
 
1256
        saved_filters = warnings.filters
 
1257
        try:
 
1258
            warnings.showwarning = _catcher
 
1259
            warnings.filters = []
 
1260
            result = fn(*args, **kw)
 
1261
        finally:
 
1262
            warnings.showwarning = saved_showwarning
 
1263
            warnings.filters = saved_filters
 
1264
        return wlist, result
 
1265
 
 
1266
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1267
        """Assert that a callable is deprecated in a particular way.
 
1268
 
 
1269
        This is a very precise test for unusual requirements. The
 
1270
        applyDeprecated helper function is probably more suited for most tests
 
1271
        as it allows you to simply specify the deprecation format being used
 
1272
        and will ensure that that is issued for the function being called.
 
1273
 
 
1274
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1275
        not other callers that go direct to the warning module.  To catch
 
1276
        general warnings, use callCatchWarnings.
 
1277
 
 
1278
        :param expected: a list of the deprecation warnings expected, in order
 
1279
        :param callable: The callable to call
 
1280
        :param args: The positional arguments for the callable
 
1281
        :param kwargs: The keyword arguments for the callable
 
1282
        """
 
1283
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1284
            *args, **kwargs)
 
1285
        self.assertEqual(expected, call_warnings)
 
1286
        return result
 
1287
 
 
1288
    def _startLogFile(self):
 
1289
        """Send bzr and test log messages to a temporary file.
 
1290
 
 
1291
        The file is removed as the test is torn down.
 
1292
        """
 
1293
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1294
        self._log_file = os.fdopen(fileno, 'w+')
 
1295
        self._log_memento = bzrlib.trace.push_log_file(self._log_file)
 
1296
        self._log_file_name = name
 
1297
        self.addCleanup(self._finishLogFile)
 
1298
 
 
1299
    def _finishLogFile(self):
 
1300
        """Finished with the log file.
 
1301
 
 
1302
        Close the file and delete it, unless setKeepLogfile was called.
 
1303
        """
 
1304
        if self._log_file is None:
 
1305
            return
 
1306
        bzrlib.trace.pop_log_file(self._log_memento)
 
1307
        self._log_file.close()
 
1308
        self._log_file = None
 
1309
        if not self._keep_log_file:
 
1310
            os.remove(self._log_file_name)
 
1311
            self._log_file_name = None
 
1312
 
 
1313
    def setKeepLogfile(self):
 
1314
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1315
        self._keep_log_file = True
 
1316
 
 
1317
    def addCleanup(self, callable, *args, **kwargs):
 
1318
        """Arrange to run a callable when this case is torn down.
 
1319
 
 
1320
        Callables are run in the reverse of the order they are registered,
 
1321
        ie last-in first-out.
 
1322
        """
 
1323
        self._cleanups.append((callable, args, kwargs))
 
1324
 
 
1325
    def _cleanEnvironment(self):
 
1326
        new_env = {
 
1327
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1328
            'HOME': os.getcwd(),
 
1329
            # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
 
1330
            # tests do check our impls match APPDATA
 
1331
            'BZR_EDITOR': None, # test_msgeditor manipulates this variable
 
1332
            'VISUAL': None,
 
1333
            'EDITOR': None,
 
1334
            'BZR_EMAIL': None,
 
1335
            'BZREMAIL': None, # may still be present in the environment
 
1336
            'EMAIL': None,
 
1337
            'BZR_PROGRESS_BAR': None,
 
1338
            'BZR_LOG': None,
 
1339
            'BZR_PLUGIN_PATH': None,
 
1340
            # Make sure that any text ui tests are consistent regardless of
 
1341
            # the environment the test case is run in; you may want tests that
 
1342
            # test other combinations.  'dumb' is a reasonable guess for tests
 
1343
            # going to a pipe or a StringIO.
 
1344
            'TERM': 'dumb',
 
1345
            'LINES': '25',
 
1346
            'COLUMNS': '80',
 
1347
            # SSH Agent
 
1348
            'SSH_AUTH_SOCK': None,
 
1349
            # Proxies
 
1350
            'http_proxy': None,
 
1351
            'HTTP_PROXY': None,
 
1352
            'https_proxy': None,
 
1353
            'HTTPS_PROXY': None,
 
1354
            'no_proxy': None,
 
1355
            'NO_PROXY': None,
 
1356
            'all_proxy': None,
 
1357
            'ALL_PROXY': None,
 
1358
            # Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
 
1359
            # least. If you do (care), please update this comment
 
1360
            # -- vila 20080401
 
1361
            'ftp_proxy': None,
 
1362
            'FTP_PROXY': None,
 
1363
            'BZR_REMOTE_PATH': None,
 
1364
        }
 
1365
        self.__old_env = {}
 
1366
        self.addCleanup(self._restoreEnvironment)
 
1367
        for name, value in new_env.iteritems():
 
1368
            self._captureVar(name, value)
 
1369
 
 
1370
    def _captureVar(self, name, newvalue):
 
1371
        """Set an environment variable, and reset it when finished."""
 
1372
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1373
 
 
1374
    def _restore_debug_flags(self):
 
1375
        debug.debug_flags.clear()
 
1376
        debug.debug_flags.update(self._preserved_debug_flags)
 
1377
 
 
1378
    def _restoreEnvironment(self):
 
1379
        for name, value in self.__old_env.iteritems():
 
1380
            osutils.set_or_unset_env(name, value)
 
1381
 
 
1382
    def _restoreHooks(self):
 
1383
        for klass, (name, hooks) in self._preserved_hooks.items():
 
1384
            setattr(klass, name, hooks)
 
1385
 
 
1386
    def knownFailure(self, reason):
 
1387
        """This test has failed for some known reason."""
 
1388
        raise KnownFailure(reason)
 
1389
 
 
1390
    def _do_skip(self, result, reason):
 
1391
        addSkip = getattr(result, 'addSkip', None)
 
1392
        if not callable(addSkip):
 
1393
            result.addError(self, sys.exc_info())
 
1394
        else:
 
1395
            addSkip(self, reason)
 
1396
 
 
1397
    def run(self, result=None):
 
1398
        if result is None: result = self.defaultTestResult()
 
1399
        for feature in getattr(self, '_test_needs_features', []):
 
1400
            if not feature.available():
 
1401
                result.startTest(self)
 
1402
                if getattr(result, 'addNotSupported', None):
 
1403
                    result.addNotSupported(self, feature)
 
1404
                else:
 
1405
                    result.addSuccess(self)
 
1406
                result.stopTest(self)
 
1407
                return result
 
1408
        try:
 
1409
            try:
 
1410
                result.startTest(self)
 
1411
                absent_attr = object()
 
1412
                # Python 2.5
 
1413
                method_name = getattr(self, '_testMethodName', absent_attr)
 
1414
                if method_name is absent_attr:
 
1415
                    # Python 2.4
 
1416
                    method_name = getattr(self, '_TestCase__testMethodName')
 
1417
                testMethod = getattr(self, method_name)
 
1418
                try:
 
1419
                    try:
 
1420
                        self.setUp()
 
1421
                        if not self._bzr_test_setUp_run:
 
1422
                            self.fail(
 
1423
                                "test setUp did not invoke "
 
1424
                                "bzrlib.tests.TestCase's setUp")
 
1425
                    except KeyboardInterrupt:
 
1426
                        self._runCleanups()
 
1427
                        raise
 
1428
                    except TestSkipped, e:
 
1429
                        self._do_skip(result, e.args[0])
 
1430
                        self.tearDown()
 
1431
                        return result
 
1432
                    except:
 
1433
                        result.addError(self, sys.exc_info())
 
1434
                        self._runCleanups()
 
1435
                        return result
 
1436
 
 
1437
                    ok = False
 
1438
                    try:
 
1439
                        testMethod()
 
1440
                        ok = True
 
1441
                    except self.failureException:
 
1442
                        result.addFailure(self, sys.exc_info())
 
1443
                    except TestSkipped, e:
 
1444
                        if not e.args:
 
1445
                            reason = "No reason given."
 
1446
                        else:
 
1447
                            reason = e.args[0]
 
1448
                        self._do_skip(result, reason)
 
1449
                    except KeyboardInterrupt:
 
1450
                        self._runCleanups()
 
1451
                        raise
 
1452
                    except:
 
1453
                        result.addError(self, sys.exc_info())
 
1454
 
 
1455
                    try:
 
1456
                        self.tearDown()
 
1457
                        if not self._bzr_test_tearDown_run:
 
1458
                            self.fail(
 
1459
                                "test tearDown did not invoke "
 
1460
                                "bzrlib.tests.TestCase's tearDown")
 
1461
                    except KeyboardInterrupt:
 
1462
                        self._runCleanups()
 
1463
                        raise
 
1464
                    except:
 
1465
                        result.addError(self, sys.exc_info())
 
1466
                        self._runCleanups()
 
1467
                        ok = False
 
1468
                    if ok: result.addSuccess(self)
 
1469
                finally:
 
1470
                    result.stopTest(self)
 
1471
                return result
 
1472
            except TestNotApplicable:
 
1473
                # Not moved from the result [yet].
 
1474
                self._runCleanups()
 
1475
                raise
 
1476
            except KeyboardInterrupt:
 
1477
                self._runCleanups()
 
1478
                raise
 
1479
        finally:
 
1480
            saved_attrs = {}
 
1481
            for attr_name in self.attrs_to_keep:
 
1482
                if attr_name in self.__dict__:
 
1483
                    saved_attrs[attr_name] = self.__dict__[attr_name]
 
1484
            self.__dict__ = saved_attrs
 
1485
 
 
1486
    def tearDown(self):
 
1487
        self._runCleanups()
 
1488
        self._log_contents = ''
 
1489
        self._bzr_test_tearDown_run = True
 
1490
        unittest.TestCase.tearDown(self)
 
1491
 
 
1492
    def time(self, callable, *args, **kwargs):
 
1493
        """Run callable and accrue the time it takes to the benchmark time.
 
1494
 
 
1495
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1496
        this will cause lsprofile statistics to be gathered and stored in
 
1497
        self._benchcalls.
 
1498
        """
 
1499
        if self._benchtime is None:
 
1500
            self._benchtime = 0
 
1501
        start = time.time()
 
1502
        try:
 
1503
            if not self._gather_lsprof_in_benchmarks:
 
1504
                return callable(*args, **kwargs)
 
1505
            else:
 
1506
                # record this benchmark
 
1507
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1508
                stats.sort()
 
1509
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1510
                return ret
 
1511
        finally:
 
1512
            self._benchtime += time.time() - start
 
1513
 
 
1514
    def _runCleanups(self):
 
1515
        """Run registered cleanup functions.
 
1516
 
 
1517
        This should only be called from TestCase.tearDown.
 
1518
        """
 
1519
        # TODO: Perhaps this should keep running cleanups even if
 
1520
        # one of them fails?
 
1521
 
 
1522
        # Actually pop the cleanups from the list so tearDown running
 
1523
        # twice is safe (this happens for skipped tests).
 
1524
        while self._cleanups:
 
1525
            cleanup, args, kwargs = self._cleanups.pop()
 
1526
            cleanup(*args, **kwargs)
 
1527
 
 
1528
    def log(self, *args):
 
1529
        mutter(*args)
 
1530
 
 
1531
    def _get_log(self, keep_log_file=False):
 
1532
        """Get the log from bzrlib.trace calls from this test.
 
1533
 
 
1534
        :param keep_log_file: When True, if the log is still a file on disk
 
1535
            leave it as a file on disk. When False, if the log is still a file
 
1536
            on disk, the log file is deleted and the log preserved as
 
1537
            self._log_contents.
 
1538
        :return: A string containing the log.
 
1539
        """
 
1540
        # flush the log file, to get all content
 
1541
        import bzrlib.trace
 
1542
        if bzrlib.trace._trace_file:
 
1543
            bzrlib.trace._trace_file.flush()
 
1544
        if self._log_contents:
 
1545
            # XXX: this can hardly contain the content flushed above --vila
 
1546
            # 20080128
 
1547
            return self._log_contents
 
1548
        if self._log_file_name is not None:
 
1549
            logfile = open(self._log_file_name)
 
1550
            try:
 
1551
                log_contents = logfile.read()
 
1552
            finally:
 
1553
                logfile.close()
 
1554
            if not keep_log_file:
 
1555
                self._log_contents = log_contents
 
1556
                try:
 
1557
                    os.remove(self._log_file_name)
 
1558
                except OSError, e:
 
1559
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1560
                        sys.stderr.write(('Unable to delete log file '
 
1561
                                             ' %r\n' % self._log_file_name))
 
1562
                    else:
 
1563
                        raise
 
1564
            return log_contents
 
1565
        else:
 
1566
            return "DELETED log file to reduce memory footprint"
 
1567
 
 
1568
    def requireFeature(self, feature):
 
1569
        """This test requires a specific feature is available.
 
1570
 
 
1571
        :raises UnavailableFeature: When feature is not available.
 
1572
        """
 
1573
        if not feature.available():
 
1574
            raise UnavailableFeature(feature)
 
1575
 
 
1576
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1577
            working_dir):
 
1578
        """Run bazaar command line, splitting up a string command line."""
 
1579
        if isinstance(args, basestring):
 
1580
            # shlex don't understand unicode strings,
 
1581
            # so args should be plain string (bialix 20070906)
 
1582
            args = list(shlex.split(str(args)))
 
1583
        return self._run_bzr_core(args, retcode=retcode,
 
1584
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1585
                )
 
1586
 
 
1587
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1588
            working_dir):
 
1589
        if encoding is None:
 
1590
            encoding = osutils.get_user_encoding()
 
1591
        stdout = StringIOWrapper()
 
1592
        stderr = StringIOWrapper()
 
1593
        stdout.encoding = encoding
 
1594
        stderr.encoding = encoding
 
1595
 
 
1596
        self.log('run bzr: %r', args)
 
1597
        # FIXME: don't call into logging here
 
1598
        handler = logging.StreamHandler(stderr)
 
1599
        handler.setLevel(logging.INFO)
 
1600
        logger = logging.getLogger('')
 
1601
        logger.addHandler(handler)
 
1602
        old_ui_factory = ui.ui_factory
 
1603
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1604
 
 
1605
        cwd = None
 
1606
        if working_dir is not None:
 
1607
            cwd = osutils.getcwd()
 
1608
            os.chdir(working_dir)
 
1609
 
 
1610
        try:
 
1611
            result = self.apply_redirected(ui.ui_factory.stdin,
 
1612
                stdout, stderr,
 
1613
                bzrlib.commands.run_bzr_catch_user_errors,
 
1614
                args)
 
1615
        finally:
 
1616
            logger.removeHandler(handler)
 
1617
            ui.ui_factory = old_ui_factory
 
1618
            if cwd is not None:
 
1619
                os.chdir(cwd)
 
1620
 
 
1621
        out = stdout.getvalue()
 
1622
        err = stderr.getvalue()
 
1623
        if out:
 
1624
            self.log('output:\n%r', out)
 
1625
        if err:
 
1626
            self.log('errors:\n%r', err)
 
1627
        if retcode is not None:
 
1628
            self.assertEquals(retcode, result,
 
1629
                              message='Unexpected return code')
 
1630
        return out, err
 
1631
 
 
1632
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
 
1633
                working_dir=None, error_regexes=[], output_encoding=None):
 
1634
        """Invoke bzr, as if it were run from the command line.
 
1635
 
 
1636
        The argument list should not include the bzr program name - the
 
1637
        first argument is normally the bzr command.  Arguments may be
 
1638
        passed in three ways:
 
1639
 
 
1640
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1641
        when the command contains whitespace or metacharacters, or
 
1642
        is built up at run time.
 
1643
 
 
1644
        2- A single string, eg "add a".  This is the most convenient
 
1645
        for hardcoded commands.
 
1646
 
 
1647
        This runs bzr through the interface that catches and reports
 
1648
        errors, and with logging set to something approximating the
 
1649
        default, so that error reporting can be checked.
 
1650
 
 
1651
        This should be the main method for tests that want to exercise the
 
1652
        overall behavior of the bzr application (rather than a unit test
 
1653
        or a functional test of the library.)
 
1654
 
 
1655
        This sends the stdout/stderr results into the test's log,
 
1656
        where it may be useful for debugging.  See also run_captured.
 
1657
 
 
1658
        :keyword stdin: A string to be used as stdin for the command.
 
1659
        :keyword retcode: The status code the command should return;
 
1660
            default 0.
 
1661
        :keyword working_dir: The directory to run the command in
 
1662
        :keyword error_regexes: A list of expected error messages.  If
 
1663
            specified they must be seen in the error output of the command.
 
1664
        """
 
1665
        out, err = self._run_bzr_autosplit(
 
1666
            args=args,
 
1667
            retcode=retcode,
 
1668
            encoding=encoding,
 
1669
            stdin=stdin,
 
1670
            working_dir=working_dir,
 
1671
            )
 
1672
        self.assertIsInstance(error_regexes, (list, tuple))
 
1673
        for regex in error_regexes:
 
1674
            self.assertContainsRe(err, regex)
 
1675
        return out, err
 
1676
 
 
1677
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1678
        """Run bzr, and check that stderr contains the supplied regexes
 
1679
 
 
1680
        :param error_regexes: Sequence of regular expressions which
 
1681
            must each be found in the error output. The relative ordering
 
1682
            is not enforced.
 
1683
        :param args: command-line arguments for bzr
 
1684
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1685
            This function changes the default value of retcode to be 3,
 
1686
            since in most cases this is run when you expect bzr to fail.
 
1687
 
 
1688
        :return: (out, err) The actual output of running the command (in case
 
1689
            you want to do more inspection)
 
1690
 
 
1691
        Examples of use::
 
1692
 
 
1693
            # Make sure that commit is failing because there is nothing to do
 
1694
            self.run_bzr_error(['no changes to commit'],
 
1695
                               ['commit', '-m', 'my commit comment'])
 
1696
            # Make sure --strict is handling an unknown file, rather than
 
1697
            # giving us the 'nothing to do' error
 
1698
            self.build_tree(['unknown'])
 
1699
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1700
                               ['commit', --strict', '-m', 'my commit comment'])
 
1701
        """
 
1702
        kwargs.setdefault('retcode', 3)
 
1703
        kwargs['error_regexes'] = error_regexes
 
1704
        out, err = self.run_bzr(*args, **kwargs)
 
1705
        return out, err
 
1706
 
 
1707
    def run_bzr_subprocess(self, *args, **kwargs):
 
1708
        """Run bzr in a subprocess for testing.
 
1709
 
 
1710
        This starts a new Python interpreter and runs bzr in there.
 
1711
        This should only be used for tests that have a justifiable need for
 
1712
        this isolation: e.g. they are testing startup time, or signal
 
1713
        handling, or early startup code, etc.  Subprocess code can't be
 
1714
        profiled or debugged so easily.
 
1715
 
 
1716
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
1717
            None is supplied, the status code is not checked.
 
1718
        :keyword env_changes: A dictionary which lists changes to environment
 
1719
            variables. A value of None will unset the env variable.
 
1720
            The values must be strings. The change will only occur in the
 
1721
            child, so you don't need to fix the environment after running.
 
1722
        :keyword universal_newlines: Convert CRLF => LF
 
1723
        :keyword allow_plugins: By default the subprocess is run with
 
1724
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1725
            for system-wide plugins to create unexpected output on stderr,
 
1726
            which can cause unnecessary test failures.
 
1727
        """
 
1728
        env_changes = kwargs.get('env_changes', {})
 
1729
        working_dir = kwargs.get('working_dir', None)
 
1730
        allow_plugins = kwargs.get('allow_plugins', False)
 
1731
        if len(args) == 1:
 
1732
            if isinstance(args[0], list):
 
1733
                args = args[0]
 
1734
            elif isinstance(args[0], basestring):
 
1735
                args = list(shlex.split(args[0]))
 
1736
        else:
 
1737
            raise ValueError("passing varargs to run_bzr_subprocess")
 
1738
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1739
                                            working_dir=working_dir,
 
1740
                                            allow_plugins=allow_plugins)
 
1741
        # We distinguish between retcode=None and retcode not passed.
 
1742
        supplied_retcode = kwargs.get('retcode', 0)
 
1743
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1744
            universal_newlines=kwargs.get('universal_newlines', False),
 
1745
            process_args=args)
 
1746
 
 
1747
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1748
                             skip_if_plan_to_signal=False,
 
1749
                             working_dir=None,
 
1750
                             allow_plugins=False):
 
1751
        """Start bzr in a subprocess for testing.
 
1752
 
 
1753
        This starts a new Python interpreter and runs bzr in there.
 
1754
        This should only be used for tests that have a justifiable need for
 
1755
        this isolation: e.g. they are testing startup time, or signal
 
1756
        handling, or early startup code, etc.  Subprocess code can't be
 
1757
        profiled or debugged so easily.
 
1758
 
 
1759
        :param process_args: a list of arguments to pass to the bzr executable,
 
1760
            for example ``['--version']``.
 
1761
        :param env_changes: A dictionary which lists changes to environment
 
1762
            variables. A value of None will unset the env variable.
 
1763
            The values must be strings. The change will only occur in the
 
1764
            child, so you don't need to fix the environment after running.
 
1765
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1766
            is not available.
 
1767
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1768
 
 
1769
        :returns: Popen object for the started process.
 
1770
        """
 
1771
        if skip_if_plan_to_signal:
 
1772
            if not getattr(os, 'kill', None):
 
1773
                raise TestSkipped("os.kill not available.")
 
1774
 
 
1775
        if env_changes is None:
 
1776
            env_changes = {}
 
1777
        old_env = {}
 
1778
 
 
1779
        def cleanup_environment():
 
1780
            for env_var, value in env_changes.iteritems():
 
1781
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1782
 
 
1783
        def restore_environment():
 
1784
            for env_var, value in old_env.iteritems():
 
1785
                osutils.set_or_unset_env(env_var, value)
 
1786
 
 
1787
        bzr_path = self.get_bzr_path()
 
1788
 
 
1789
        cwd = None
 
1790
        if working_dir is not None:
 
1791
            cwd = osutils.getcwd()
 
1792
            os.chdir(working_dir)
 
1793
 
 
1794
        try:
 
1795
            # win32 subprocess doesn't support preexec_fn
 
1796
            # so we will avoid using it on all platforms, just to
 
1797
            # make sure the code path is used, and we don't break on win32
 
1798
            cleanup_environment()
 
1799
            command = [sys.executable]
 
1800
            # frozen executables don't need the path to bzr
 
1801
            if getattr(sys, "frozen", None) is None:
 
1802
                command.append(bzr_path)
 
1803
            if not allow_plugins:
 
1804
                command.append('--no-plugins')
 
1805
            command.extend(process_args)
 
1806
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1807
        finally:
 
1808
            restore_environment()
 
1809
            if cwd is not None:
 
1810
                os.chdir(cwd)
 
1811
 
 
1812
        return process
 
1813
 
 
1814
    def _popen(self, *args, **kwargs):
 
1815
        """Place a call to Popen.
 
1816
 
 
1817
        Allows tests to override this method to intercept the calls made to
 
1818
        Popen for introspection.
 
1819
        """
 
1820
        return Popen(*args, **kwargs)
 
1821
 
 
1822
    def get_bzr_path(self):
 
1823
        """Return the path of the 'bzr' executable for this test suite."""
 
1824
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1825
        if not os.path.isfile(bzr_path):
 
1826
            # We are probably installed. Assume sys.argv is the right file
 
1827
            bzr_path = sys.argv[0]
 
1828
        return bzr_path
 
1829
 
 
1830
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1831
                              universal_newlines=False, process_args=None):
 
1832
        """Finish the execution of process.
 
1833
 
 
1834
        :param process: the Popen object returned from start_bzr_subprocess.
 
1835
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1836
            None is supplied, the status code is not checked.
 
1837
        :param send_signal: an optional signal to send to the process.
 
1838
        :param universal_newlines: Convert CRLF => LF
 
1839
        :returns: (stdout, stderr)
 
1840
        """
 
1841
        if send_signal is not None:
 
1842
            os.kill(process.pid, send_signal)
 
1843
        out, err = process.communicate()
 
1844
 
 
1845
        if universal_newlines:
 
1846
            out = out.replace('\r\n', '\n')
 
1847
            err = err.replace('\r\n', '\n')
 
1848
 
 
1849
        if retcode is not None and retcode != process.returncode:
 
1850
            if process_args is None:
 
1851
                process_args = "(unknown args)"
 
1852
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1853
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1854
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1855
                      % (process_args, retcode, process.returncode))
 
1856
        return [out, err]
 
1857
 
 
1858
    def check_inventory_shape(self, inv, shape):
 
1859
        """Compare an inventory to a list of expected names.
 
1860
 
 
1861
        Fail if they are not precisely equal.
 
1862
        """
 
1863
        extras = []
 
1864
        shape = list(shape)             # copy
 
1865
        for path, ie in inv.entries():
 
1866
            name = path.replace('\\', '/')
 
1867
            if ie.kind == 'directory':
 
1868
                name = name + '/'
 
1869
            if name in shape:
 
1870
                shape.remove(name)
 
1871
            else:
 
1872
                extras.append(name)
 
1873
        if shape:
 
1874
            self.fail("expected paths not found in inventory: %r" % shape)
 
1875
        if extras:
 
1876
            self.fail("unexpected paths found in inventory: %r" % extras)
 
1877
 
 
1878
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1879
                         a_callable=None, *args, **kwargs):
 
1880
        """Call callable with redirected std io pipes.
 
1881
 
 
1882
        Returns the return code."""
 
1883
        if not callable(a_callable):
 
1884
            raise ValueError("a_callable must be callable.")
 
1885
        if stdin is None:
 
1886
            stdin = StringIO("")
 
1887
        if stdout is None:
 
1888
            if getattr(self, "_log_file", None) is not None:
 
1889
                stdout = self._log_file
 
1890
            else:
 
1891
                stdout = StringIO()
 
1892
        if stderr is None:
 
1893
            if getattr(self, "_log_file", None is not None):
 
1894
                stderr = self._log_file
 
1895
            else:
 
1896
                stderr = StringIO()
 
1897
        real_stdin = sys.stdin
 
1898
        real_stdout = sys.stdout
 
1899
        real_stderr = sys.stderr
 
1900
        try:
 
1901
            sys.stdout = stdout
 
1902
            sys.stderr = stderr
 
1903
            sys.stdin = stdin
 
1904
            return a_callable(*args, **kwargs)
 
1905
        finally:
 
1906
            sys.stdout = real_stdout
 
1907
            sys.stderr = real_stderr
 
1908
            sys.stdin = real_stdin
 
1909
 
 
1910
    def reduceLockdirTimeout(self):
 
1911
        """Reduce the default lock timeout for the duration of the test, so that
 
1912
        if LockContention occurs during a test, it does so quickly.
 
1913
 
 
1914
        Tests that expect to provoke LockContention errors should call this.
 
1915
        """
 
1916
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
1917
        def resetTimeout():
 
1918
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
1919
        self.addCleanup(resetTimeout)
 
1920
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
1921
 
 
1922
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
1923
        """Return a StringIOWrapper instance, that will encode Unicode
 
1924
        input to UTF-8.
 
1925
        """
 
1926
        if encoding_type is None:
 
1927
            encoding_type = 'strict'
 
1928
        sio = StringIO()
 
1929
        output_encoding = 'utf-8'
 
1930
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
 
1931
        sio.encoding = output_encoding
 
1932
        return sio
 
1933
 
 
1934
 
 
1935
class CapturedCall(object):
 
1936
    """A helper for capturing smart server calls for easy debug analysis."""
 
1937
 
 
1938
    def __init__(self, params, prefix_length):
 
1939
        """Capture the call with params and skip prefix_length stack frames."""
 
1940
        self.call = params
 
1941
        import traceback
 
1942
        # The last 5 frames are the __init__, the hook frame, and 3 smart
 
1943
        # client frames. Beyond this we could get more clever, but this is good
 
1944
        # enough for now.
 
1945
        stack = traceback.extract_stack()[prefix_length:-5]
 
1946
        self.stack = ''.join(traceback.format_list(stack))
 
1947
 
 
1948
    def __str__(self):
 
1949
        return self.call.method
 
1950
 
 
1951
    def __repr__(self):
 
1952
        return self.call.method
 
1953
 
 
1954
    def stack(self):
 
1955
        return self.stack
 
1956
 
 
1957
 
 
1958
class TestCaseWithMemoryTransport(TestCase):
 
1959
    """Common test class for tests that do not need disk resources.
 
1960
 
 
1961
    Tests that need disk resources should derive from TestCaseWithTransport.
 
1962
 
 
1963
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
1964
 
 
1965
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
1966
    a directory which does not exist. This serves to help ensure test isolation
 
1967
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
1968
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
1969
    file defaults for the transport in tests, nor does it obey the command line
 
1970
    override, so tests that accidentally write to the common directory should
 
1971
    be rare.
 
1972
 
 
1973
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
1974
    a .bzr directory that stops us ascending higher into the filesystem.
 
1975
    """
 
1976
 
 
1977
    TEST_ROOT = None
 
1978
    _TEST_NAME = 'test'
 
1979
 
 
1980
    def __init__(self, methodName='runTest'):
 
1981
        # allow test parameterization after test construction and before test
 
1982
        # execution. Variables that the parameterizer sets need to be
 
1983
        # ones that are not set by setUp, or setUp will trash them.
 
1984
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
1985
        self.vfs_transport_factory = default_transport
 
1986
        self.transport_server = None
 
1987
        self.transport_readonly_server = None
 
1988
        self.__vfs_server = None
 
1989
 
 
1990
    def get_transport(self, relpath=None):
 
1991
        """Return a writeable transport.
 
1992
 
 
1993
        This transport is for the test scratch space relative to
 
1994
        "self._test_root"
 
1995
 
 
1996
        :param relpath: a path relative to the base url.
 
1997
        """
 
1998
        t = get_transport(self.get_url(relpath))
 
1999
        self.assertFalse(t.is_readonly())
 
2000
        return t
 
2001
 
 
2002
    def get_readonly_transport(self, relpath=None):
 
2003
        """Return a readonly transport for the test scratch space
 
2004
 
 
2005
        This can be used to test that operations which should only need
 
2006
        readonly access in fact do not try to write.
 
2007
 
 
2008
        :param relpath: a path relative to the base url.
 
2009
        """
 
2010
        t = get_transport(self.get_readonly_url(relpath))
 
2011
        self.assertTrue(t.is_readonly())
 
2012
        return t
 
2013
 
 
2014
    def create_transport_readonly_server(self):
 
2015
        """Create a transport server from class defined at init.
 
2016
 
 
2017
        This is mostly a hook for daughter classes.
 
2018
        """
 
2019
        return self.transport_readonly_server()
 
2020
 
 
2021
    def get_readonly_server(self):
 
2022
        """Get the server instance for the readonly transport
 
2023
 
 
2024
        This is useful for some tests with specific servers to do diagnostics.
 
2025
        """
 
2026
        if self.__readonly_server is None:
 
2027
            if self.transport_readonly_server is None:
 
2028
                # readonly decorator requested
 
2029
                # bring up the server
 
2030
                self.__readonly_server = ReadonlyServer()
 
2031
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
2032
            else:
 
2033
                self.__readonly_server = self.create_transport_readonly_server()
 
2034
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
2035
            self.addCleanup(self.__readonly_server.tearDown)
 
2036
        return self.__readonly_server
 
2037
 
 
2038
    def get_readonly_url(self, relpath=None):
 
2039
        """Get a URL for the readonly transport.
 
2040
 
 
2041
        This will either be backed by '.' or a decorator to the transport
 
2042
        used by self.get_url()
 
2043
        relpath provides for clients to get a path relative to the base url.
 
2044
        These should only be downwards relative, not upwards.
 
2045
        """
 
2046
        base = self.get_readonly_server().get_url()
 
2047
        return self._adjust_url(base, relpath)
 
2048
 
 
2049
    def get_vfs_only_server(self):
 
2050
        """Get the vfs only read/write server instance.
 
2051
 
 
2052
        This is useful for some tests with specific servers that need
 
2053
        diagnostics.
 
2054
 
 
2055
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
2056
        is no means to override it.
 
2057
        """
 
2058
        if self.__vfs_server is None:
 
2059
            self.__vfs_server = MemoryServer()
 
2060
            self.__vfs_server.setUp()
 
2061
            self.addCleanup(self.__vfs_server.tearDown)
 
2062
        return self.__vfs_server
 
2063
 
 
2064
    def get_server(self):
 
2065
        """Get the read/write server instance.
 
2066
 
 
2067
        This is useful for some tests with specific servers that need
 
2068
        diagnostics.
 
2069
 
 
2070
        This is built from the self.transport_server factory. If that is None,
 
2071
        then the self.get_vfs_server is returned.
 
2072
        """
 
2073
        if self.__server is None:
 
2074
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
 
2075
                return self.get_vfs_only_server()
 
2076
            else:
 
2077
                # bring up a decorated means of access to the vfs only server.
 
2078
                self.__server = self.transport_server()
 
2079
                try:
 
2080
                    self.__server.setUp(self.get_vfs_only_server())
 
2081
                except TypeError, e:
 
2082
                    # This should never happen; the try:Except here is to assist
 
2083
                    # developers having to update code rather than seeing an
 
2084
                    # uninformative TypeError.
 
2085
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
 
2086
            self.addCleanup(self.__server.tearDown)
 
2087
        return self.__server
 
2088
 
 
2089
    def _adjust_url(self, base, relpath):
 
2090
        """Get a URL (or maybe a path) for the readwrite transport.
 
2091
 
 
2092
        This will either be backed by '.' or to an equivalent non-file based
 
2093
        facility.
 
2094
        relpath provides for clients to get a path relative to the base url.
 
2095
        These should only be downwards relative, not upwards.
 
2096
        """
 
2097
        if relpath is not None and relpath != '.':
 
2098
            if not base.endswith('/'):
 
2099
                base = base + '/'
 
2100
            # XXX: Really base should be a url; we did after all call
 
2101
            # get_url()!  But sometimes it's just a path (from
 
2102
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
2103
            # to a non-escaped local path.
 
2104
            if base.startswith('./') or base.startswith('/'):
 
2105
                base += relpath
 
2106
            else:
 
2107
                base += urlutils.escape(relpath)
 
2108
        return base
 
2109
 
 
2110
    def get_url(self, relpath=None):
 
2111
        """Get a URL (or maybe a path) for the readwrite transport.
 
2112
 
 
2113
        This will either be backed by '.' or to an equivalent non-file based
 
2114
        facility.
 
2115
        relpath provides for clients to get a path relative to the base url.
 
2116
        These should only be downwards relative, not upwards.
 
2117
        """
 
2118
        base = self.get_server().get_url()
 
2119
        return self._adjust_url(base, relpath)
 
2120
 
 
2121
    def get_vfs_only_url(self, relpath=None):
 
2122
        """Get a URL (or maybe a path for the plain old vfs transport.
 
2123
 
 
2124
        This will never be a smart protocol.  It always has all the
 
2125
        capabilities of the local filesystem, but it might actually be a
 
2126
        MemoryTransport or some other similar virtual filesystem.
 
2127
 
 
2128
        This is the backing transport (if any) of the server returned by
 
2129
        get_url and get_readonly_url.
 
2130
 
 
2131
        :param relpath: provides for clients to get a path relative to the base
 
2132
            url.  These should only be downwards relative, not upwards.
 
2133
        :return: A URL
 
2134
        """
 
2135
        base = self.get_vfs_only_server().get_url()
 
2136
        return self._adjust_url(base, relpath)
 
2137
 
 
2138
    def _create_safety_net(self):
 
2139
        """Make a fake bzr directory.
 
2140
 
 
2141
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
2142
        real branch.
 
2143
        """
 
2144
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2145
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
2146
 
 
2147
    def _check_safety_net(self):
 
2148
        """Check that the safety .bzr directory have not been touched.
 
2149
 
 
2150
        _make_test_root have created a .bzr directory to prevent tests from
 
2151
        propagating. This method ensures than a test did not leaked.
 
2152
        """
 
2153
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2154
        wt = workingtree.WorkingTree.open(root)
 
2155
        last_rev = wt.last_revision()
 
2156
        if last_rev != 'null:':
 
2157
            # The current test have modified the /bzr directory, we need to
 
2158
            # recreate a new one or all the followng tests will fail.
 
2159
            # If you need to inspect its content uncomment the following line
 
2160
            # import pdb; pdb.set_trace()
 
2161
            _rmtree_temp_dir(root + '/.bzr')
 
2162
            self._create_safety_net()
 
2163
            raise AssertionError('%s/.bzr should not be modified' % root)
 
2164
 
 
2165
    def _make_test_root(self):
 
2166
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
2167
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
2168
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
2169
 
 
2170
            self._create_safety_net()
 
2171
 
 
2172
            # The same directory is used by all tests, and we're not
 
2173
            # specifically told when all tests are finished.  This will do.
 
2174
            atexit.register(_rmtree_temp_dir, root)
 
2175
 
 
2176
        self.addCleanup(self._check_safety_net)
 
2177
 
 
2178
    def makeAndChdirToTestDir(self):
 
2179
        """Create a temporary directories for this one test.
 
2180
 
 
2181
        This must set self.test_home_dir and self.test_dir and chdir to
 
2182
        self.test_dir.
 
2183
 
 
2184
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
2185
        """
 
2186
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2187
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
2188
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
2189
 
 
2190
    def make_branch(self, relpath, format=None):
 
2191
        """Create a branch on the transport at relpath."""
 
2192
        repo = self.make_repository(relpath, format=format)
 
2193
        return repo.bzrdir.create_branch()
 
2194
 
 
2195
    def make_bzrdir(self, relpath, format=None):
 
2196
        try:
 
2197
            # might be a relative or absolute path
 
2198
            maybe_a_url = self.get_url(relpath)
 
2199
            segments = maybe_a_url.rsplit('/', 1)
 
2200
            t = get_transport(maybe_a_url)
 
2201
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
2202
                t.ensure_base()
 
2203
            if format is None:
 
2204
                format = 'default'
 
2205
            if isinstance(format, basestring):
 
2206
                format = bzrdir.format_registry.make_bzrdir(format)
 
2207
            return format.initialize_on_transport(t)
 
2208
        except errors.UninitializableFormat:
 
2209
            raise TestSkipped("Format %s is not initializable." % format)
 
2210
 
 
2211
    def make_repository(self, relpath, shared=False, format=None):
 
2212
        """Create a repository on our default transport at relpath.
 
2213
 
 
2214
        Note that relpath must be a relative path, not a full url.
 
2215
        """
 
2216
        # FIXME: If you create a remoterepository this returns the underlying
 
2217
        # real format, which is incorrect.  Actually we should make sure that
 
2218
        # RemoteBzrDir returns a RemoteRepository.
 
2219
        # maybe  mbp 20070410
 
2220
        made_control = self.make_bzrdir(relpath, format=format)
 
2221
        return made_control.create_repository(shared=shared)
 
2222
 
 
2223
    def make_smart_server(self, path):
 
2224
        smart_server = server.SmartTCPServer_for_testing()
 
2225
        smart_server.setUp(self.get_server())
 
2226
        remote_transport = get_transport(smart_server.get_url()).clone(path)
 
2227
        self.addCleanup(smart_server.tearDown)
 
2228
        return remote_transport
 
2229
 
 
2230
    def make_branch_and_memory_tree(self, relpath, format=None):
 
2231
        """Create a branch on the default transport and a MemoryTree for it."""
 
2232
        b = self.make_branch(relpath, format=format)
 
2233
        return memorytree.MemoryTree.create_on_branch(b)
 
2234
 
 
2235
    def make_branch_builder(self, relpath, format=None):
 
2236
        branch = self.make_branch(relpath, format=format)
 
2237
        return branchbuilder.BranchBuilder(branch=branch)
 
2238
 
 
2239
    def overrideEnvironmentForTesting(self):
 
2240
        os.environ['HOME'] = self.test_home_dir
 
2241
        os.environ['BZR_HOME'] = self.test_home_dir
 
2242
 
 
2243
    def setUp(self):
 
2244
        super(TestCaseWithMemoryTransport, self).setUp()
 
2245
        self._make_test_root()
 
2246
        _currentdir = os.getcwdu()
 
2247
        def _leaveDirectory():
 
2248
            os.chdir(_currentdir)
 
2249
        self.addCleanup(_leaveDirectory)
 
2250
        self.makeAndChdirToTestDir()
 
2251
        self.overrideEnvironmentForTesting()
 
2252
        self.__readonly_server = None
 
2253
        self.__server = None
 
2254
        self.reduceLockdirTimeout()
 
2255
 
 
2256
    def setup_smart_server_with_call_log(self):
 
2257
        """Sets up a smart server as the transport server with a call log."""
 
2258
        self.transport_server = server.SmartTCPServer_for_testing
 
2259
        self.hpss_calls = []
 
2260
        import traceback
 
2261
        # Skip the current stack down to the caller of
 
2262
        # setup_smart_server_with_call_log
 
2263
        prefix_length = len(traceback.extract_stack()) - 2
 
2264
        def capture_hpss_call(params):
 
2265
            self.hpss_calls.append(
 
2266
                CapturedCall(params, prefix_length))
 
2267
        client._SmartClient.hooks.install_named_hook(
 
2268
            'call', capture_hpss_call, None)
 
2269
 
 
2270
    def reset_smart_call_log(self):
 
2271
        self.hpss_calls = []
 
2272
 
 
2273
 
 
2274
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
2275
    """Derived class that runs a test within a temporary directory.
 
2276
 
 
2277
    This is useful for tests that need to create a branch, etc.
 
2278
 
 
2279
    The directory is created in a slightly complex way: for each
 
2280
    Python invocation, a new temporary top-level directory is created.
 
2281
    All test cases create their own directory within that.  If the
 
2282
    tests complete successfully, the directory is removed.
 
2283
 
 
2284
    :ivar test_base_dir: The path of the top-level directory for this
 
2285
    test, which contains a home directory and a work directory.
 
2286
 
 
2287
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
2288
    which is used as $HOME for this test.
 
2289
 
 
2290
    :ivar test_dir: A directory under test_base_dir used as the current
 
2291
    directory when the test proper is run.
 
2292
    """
 
2293
 
 
2294
    OVERRIDE_PYTHON = 'python'
 
2295
 
 
2296
    def check_file_contents(self, filename, expect):
 
2297
        self.log("check contents of file %s" % filename)
 
2298
        contents = file(filename, 'r').read()
 
2299
        if contents != expect:
 
2300
            self.log("expected: %r" % expect)
 
2301
            self.log("actually: %r" % contents)
 
2302
            self.fail("contents of %s not as expected" % filename)
 
2303
 
 
2304
    def _getTestDirPrefix(self):
 
2305
        # create a directory within the top level test directory
 
2306
        if sys.platform == 'win32':
 
2307
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
 
2308
            # windows is likely to have path-length limits so use a short name
 
2309
            name_prefix = name_prefix[-30:]
 
2310
        else:
 
2311
            name_prefix = re.sub('[/]', '_', self.id())
 
2312
        return name_prefix
 
2313
 
 
2314
    def makeAndChdirToTestDir(self):
 
2315
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2316
 
 
2317
        For TestCaseInTempDir we create a temporary directory based on the test
 
2318
        name and then create two subdirs - test and home under it.
 
2319
        """
 
2320
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
 
2321
            self._getTestDirPrefix())
 
2322
        name = name_prefix
 
2323
        for i in range(100):
 
2324
            if os.path.exists(name):
 
2325
                name = name_prefix + '_' + str(i)
 
2326
            else:
 
2327
                os.mkdir(name)
 
2328
                break
 
2329
        # now create test and home directories within this dir
 
2330
        self.test_base_dir = name
 
2331
        self.test_home_dir = self.test_base_dir + '/home'
 
2332
        os.mkdir(self.test_home_dir)
 
2333
        self.test_dir = self.test_base_dir + '/work'
 
2334
        os.mkdir(self.test_dir)
 
2335
        os.chdir(self.test_dir)
 
2336
        # put name of test inside
 
2337
        f = file(self.test_base_dir + '/name', 'w')
 
2338
        try:
 
2339
            f.write(self.id())
 
2340
        finally:
 
2341
            f.close()
 
2342
        self.addCleanup(self.deleteTestDir)
 
2343
 
 
2344
    def deleteTestDir(self):
 
2345
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2346
        _rmtree_temp_dir(self.test_base_dir)
 
2347
 
 
2348
    def build_tree(self, shape, line_endings='binary', transport=None):
 
2349
        """Build a test tree according to a pattern.
 
2350
 
 
2351
        shape is a sequence of file specifications.  If the final
 
2352
        character is '/', a directory is created.
 
2353
 
 
2354
        This assumes that all the elements in the tree being built are new.
 
2355
 
 
2356
        This doesn't add anything to a branch.
 
2357
 
 
2358
        :type shape:    list or tuple.
 
2359
        :param line_endings: Either 'binary' or 'native'
 
2360
            in binary mode, exact contents are written in native mode, the
 
2361
            line endings match the default platform endings.
 
2362
        :param transport: A transport to write to, for building trees on VFS's.
 
2363
            If the transport is readonly or None, "." is opened automatically.
 
2364
        :return: None
 
2365
        """
 
2366
        if type(shape) not in (list, tuple):
 
2367
            raise AssertionError("Parameter 'shape' should be "
 
2368
                "a list or a tuple. Got %r instead" % (shape,))
 
2369
        # It's OK to just create them using forward slashes on windows.
 
2370
        if transport is None or transport.is_readonly():
 
2371
            transport = get_transport(".")
 
2372
        for name in shape:
 
2373
            self.assertIsInstance(name, basestring)
 
2374
            if name[-1] == '/':
 
2375
                transport.mkdir(urlutils.escape(name[:-1]))
 
2376
            else:
 
2377
                if line_endings == 'binary':
 
2378
                    end = '\n'
 
2379
                elif line_endings == 'native':
 
2380
                    end = os.linesep
 
2381
                else:
 
2382
                    raise errors.BzrError(
 
2383
                        'Invalid line ending request %r' % line_endings)
 
2384
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
2385
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2386
 
 
2387
    def build_tree_contents(self, shape):
 
2388
        build_tree_contents(shape)
 
2389
 
 
2390
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2391
        """Assert whether path or paths are in the WorkingTree"""
 
2392
        if tree is None:
 
2393
            tree = workingtree.WorkingTree.open(root_path)
 
2394
        if not isinstance(path, basestring):
 
2395
            for p in path:
 
2396
                self.assertInWorkingTree(p, tree=tree)
 
2397
        else:
 
2398
            self.assertIsNot(tree.path2id(path), None,
 
2399
                path+' not in working tree.')
 
2400
 
 
2401
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2402
        """Assert whether path or paths are not in the WorkingTree"""
 
2403
        if tree is None:
 
2404
            tree = workingtree.WorkingTree.open(root_path)
 
2405
        if not isinstance(path, basestring):
 
2406
            for p in path:
 
2407
                self.assertNotInWorkingTree(p,tree=tree)
 
2408
        else:
 
2409
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2410
 
 
2411
 
 
2412
class TestCaseWithTransport(TestCaseInTempDir):
 
2413
    """A test case that provides get_url and get_readonly_url facilities.
 
2414
 
 
2415
    These back onto two transport servers, one for readonly access and one for
 
2416
    read write access.
 
2417
 
 
2418
    If no explicit class is provided for readonly access, a
 
2419
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2420
    based read write transports.
 
2421
 
 
2422
    If an explicit class is provided for readonly access, that server and the
 
2423
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2424
    """
 
2425
 
 
2426
    def get_vfs_only_server(self):
 
2427
        """See TestCaseWithMemoryTransport.
 
2428
 
 
2429
        This is useful for some tests with specific servers that need
 
2430
        diagnostics.
 
2431
        """
 
2432
        if self.__vfs_server is None:
 
2433
            self.__vfs_server = self.vfs_transport_factory()
 
2434
            self.__vfs_server.setUp()
 
2435
            self.addCleanup(self.__vfs_server.tearDown)
 
2436
        return self.__vfs_server
 
2437
 
 
2438
    def make_branch_and_tree(self, relpath, format=None):
 
2439
        """Create a branch on the transport and a tree locally.
 
2440
 
 
2441
        If the transport is not a LocalTransport, the Tree can't be created on
 
2442
        the transport.  In that case if the vfs_transport_factory is
 
2443
        LocalURLServer the working tree is created in the local
 
2444
        directory backing the transport, and the returned tree's branch and
 
2445
        repository will also be accessed locally. Otherwise a lightweight
 
2446
        checkout is created and returned.
 
2447
 
 
2448
        :param format: The BzrDirFormat.
 
2449
        :returns: the WorkingTree.
 
2450
        """
 
2451
        # TODO: always use the local disk path for the working tree,
 
2452
        # this obviously requires a format that supports branch references
 
2453
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2454
        # RBC 20060208
 
2455
        b = self.make_branch(relpath, format=format)
 
2456
        try:
 
2457
            return b.bzrdir.create_workingtree()
 
2458
        except errors.NotLocalUrl:
 
2459
            # We can only make working trees locally at the moment.  If the
 
2460
            # transport can't support them, then we keep the non-disk-backed
 
2461
            # branch and create a local checkout.
 
2462
            if self.vfs_transport_factory is LocalURLServer:
 
2463
                # the branch is colocated on disk, we cannot create a checkout.
 
2464
                # hopefully callers will expect this.
 
2465
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2466
                wt = local_controldir.create_workingtree()
 
2467
                if wt.branch._format != b._format:
 
2468
                    wt._branch = b
 
2469
                    # Make sure that assigning to wt._branch fixes wt.branch,
 
2470
                    # in case the implementation details of workingtree objects
 
2471
                    # change.
 
2472
                    self.assertIs(b, wt.branch)
 
2473
                return wt
 
2474
            else:
 
2475
                return b.create_checkout(relpath, lightweight=True)
 
2476
 
 
2477
    def assertIsDirectory(self, relpath, transport):
 
2478
        """Assert that relpath within transport is a directory.
 
2479
 
 
2480
        This may not be possible on all transports; in that case it propagates
 
2481
        a TransportNotPossible.
 
2482
        """
 
2483
        try:
 
2484
            mode = transport.stat(relpath).st_mode
 
2485
        except errors.NoSuchFile:
 
2486
            self.fail("path %s is not a directory; no such file"
 
2487
                      % (relpath))
 
2488
        if not stat.S_ISDIR(mode):
 
2489
            self.fail("path %s is not a directory; has mode %#o"
 
2490
                      % (relpath, mode))
 
2491
 
 
2492
    def assertTreesEqual(self, left, right):
 
2493
        """Check that left and right have the same content and properties."""
 
2494
        # we use a tree delta to check for equality of the content, and we
 
2495
        # manually check for equality of other things such as the parents list.
 
2496
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2497
        differences = left.changes_from(right)
 
2498
        self.assertFalse(differences.has_changed(),
 
2499
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2500
 
 
2501
    def setUp(self):
 
2502
        super(TestCaseWithTransport, self).setUp()
 
2503
        self.__vfs_server = None
 
2504
 
 
2505
 
 
2506
class ChrootedTestCase(TestCaseWithTransport):
 
2507
    """A support class that provides readonly urls outside the local namespace.
 
2508
 
 
2509
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2510
    is then we are chrooted already, if it is not then an HttpServer is used
 
2511
    for readonly urls.
 
2512
 
 
2513
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2514
                       be used without needed to redo it when a different
 
2515
                       subclass is in use ?
 
2516
    """
 
2517
 
 
2518
    def setUp(self):
 
2519
        super(ChrootedTestCase, self).setUp()
 
2520
        if not self.vfs_transport_factory == MemoryServer:
 
2521
            self.transport_readonly_server = HttpServer
 
2522
 
 
2523
 
 
2524
def condition_id_re(pattern):
 
2525
    """Create a condition filter which performs a re check on a test's id.
 
2526
 
 
2527
    :param pattern: A regular expression string.
 
2528
    :return: A callable that returns True if the re matches.
 
2529
    """
 
2530
    filter_re = osutils.re_compile_checked(pattern, 0,
 
2531
        'test filter')
 
2532
    def condition(test):
 
2533
        test_id = test.id()
 
2534
        return filter_re.search(test_id)
 
2535
    return condition
 
2536
 
 
2537
 
 
2538
def condition_isinstance(klass_or_klass_list):
 
2539
    """Create a condition filter which returns isinstance(param, klass).
 
2540
 
 
2541
    :return: A callable which when called with one parameter obj return the
 
2542
        result of isinstance(obj, klass_or_klass_list).
 
2543
    """
 
2544
    def condition(obj):
 
2545
        return isinstance(obj, klass_or_klass_list)
 
2546
    return condition
 
2547
 
 
2548
 
 
2549
def condition_id_in_list(id_list):
 
2550
    """Create a condition filter which verify that test's id in a list.
 
2551
 
 
2552
    :param id_list: A TestIdList object.
 
2553
    :return: A callable that returns True if the test's id appears in the list.
 
2554
    """
 
2555
    def condition(test):
 
2556
        return id_list.includes(test.id())
 
2557
    return condition
 
2558
 
 
2559
 
 
2560
def condition_id_startswith(starts):
 
2561
    """Create a condition filter verifying that test's id starts with a string.
 
2562
 
 
2563
    :param starts: A list of string.
 
2564
    :return: A callable that returns True if the test's id starts with one of
 
2565
        the given strings.
 
2566
    """
 
2567
    def condition(test):
 
2568
        for start in starts:
 
2569
            if test.id().startswith(start):
 
2570
                return True
 
2571
        return False
 
2572
    return condition
 
2573
 
 
2574
 
 
2575
def exclude_tests_by_condition(suite, condition):
 
2576
    """Create a test suite which excludes some tests from suite.
 
2577
 
 
2578
    :param suite: The suite to get tests from.
 
2579
    :param condition: A callable whose result evaluates True when called with a
 
2580
        test case which should be excluded from the result.
 
2581
    :return: A suite which contains the tests found in suite that fail
 
2582
        condition.
 
2583
    """
 
2584
    result = []
 
2585
    for test in iter_suite_tests(suite):
 
2586
        if not condition(test):
 
2587
            result.append(test)
 
2588
    return TestUtil.TestSuite(result)
 
2589
 
 
2590
 
 
2591
def filter_suite_by_condition(suite, condition):
 
2592
    """Create a test suite by filtering another one.
 
2593
 
 
2594
    :param suite: The source suite.
 
2595
    :param condition: A callable whose result evaluates True when called with a
 
2596
        test case which should be included in the result.
 
2597
    :return: A suite which contains the tests found in suite that pass
 
2598
        condition.
 
2599
    """
 
2600
    result = []
 
2601
    for test in iter_suite_tests(suite):
 
2602
        if condition(test):
 
2603
            result.append(test)
 
2604
    return TestUtil.TestSuite(result)
 
2605
 
 
2606
 
 
2607
def filter_suite_by_re(suite, pattern):
 
2608
    """Create a test suite by filtering another one.
 
2609
 
 
2610
    :param suite:           the source suite
 
2611
    :param pattern:         pattern that names must match
 
2612
    :returns: the newly created suite
 
2613
    """
 
2614
    condition = condition_id_re(pattern)
 
2615
    result_suite = filter_suite_by_condition(suite, condition)
 
2616
    return result_suite
 
2617
 
 
2618
 
 
2619
def filter_suite_by_id_list(suite, test_id_list):
 
2620
    """Create a test suite by filtering another one.
 
2621
 
 
2622
    :param suite: The source suite.
 
2623
    :param test_id_list: A list of the test ids to keep as strings.
 
2624
    :returns: the newly created suite
 
2625
    """
 
2626
    condition = condition_id_in_list(test_id_list)
 
2627
    result_suite = filter_suite_by_condition(suite, condition)
 
2628
    return result_suite
 
2629
 
 
2630
 
 
2631
def filter_suite_by_id_startswith(suite, start):
 
2632
    """Create a test suite by filtering another one.
 
2633
 
 
2634
    :param suite: The source suite.
 
2635
    :param start: A list of string the test id must start with one of.
 
2636
    :returns: the newly created suite
 
2637
    """
 
2638
    condition = condition_id_startswith(start)
 
2639
    result_suite = filter_suite_by_condition(suite, condition)
 
2640
    return result_suite
 
2641
 
 
2642
 
 
2643
def exclude_tests_by_re(suite, pattern):
 
2644
    """Create a test suite which excludes some tests from suite.
 
2645
 
 
2646
    :param suite: The suite to get tests from.
 
2647
    :param pattern: A regular expression string. Test ids that match this
 
2648
        pattern will be excluded from the result.
 
2649
    :return: A TestSuite that contains all the tests from suite without the
 
2650
        tests that matched pattern. The order of tests is the same as it was in
 
2651
        suite.
 
2652
    """
 
2653
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
2654
 
 
2655
 
 
2656
def preserve_input(something):
 
2657
    """A helper for performing test suite transformation chains.
 
2658
 
 
2659
    :param something: Anything you want to preserve.
 
2660
    :return: Something.
 
2661
    """
 
2662
    return something
 
2663
 
 
2664
 
 
2665
def randomize_suite(suite):
 
2666
    """Return a new TestSuite with suite's tests in random order.
 
2667
 
 
2668
    The tests in the input suite are flattened into a single suite in order to
 
2669
    accomplish this. Any nested TestSuites are removed to provide global
 
2670
    randomness.
 
2671
    """
 
2672
    tests = list(iter_suite_tests(suite))
 
2673
    random.shuffle(tests)
 
2674
    return TestUtil.TestSuite(tests)
 
2675
 
 
2676
 
 
2677
def split_suite_by_condition(suite, condition):
 
2678
    """Split a test suite into two by a condition.
 
2679
 
 
2680
    :param suite: The suite to split.
 
2681
    :param condition: The condition to match on. Tests that match this
 
2682
        condition are returned in the first test suite, ones that do not match
 
2683
        are in the second suite.
 
2684
    :return: A tuple of two test suites, where the first contains tests from
 
2685
        suite matching the condition, and the second contains the remainder
 
2686
        from suite. The order within each output suite is the same as it was in
 
2687
        suite.
 
2688
    """
 
2689
    matched = []
 
2690
    did_not_match = []
 
2691
    for test in iter_suite_tests(suite):
 
2692
        if condition(test):
 
2693
            matched.append(test)
 
2694
        else:
 
2695
            did_not_match.append(test)
 
2696
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
2697
 
 
2698
 
 
2699
def split_suite_by_re(suite, pattern):
 
2700
    """Split a test suite into two by a regular expression.
 
2701
 
 
2702
    :param suite: The suite to split.
 
2703
    :param pattern: A regular expression string. Test ids that match this
 
2704
        pattern will be in the first test suite returned, and the others in the
 
2705
        second test suite returned.
 
2706
    :return: A tuple of two test suites, where the first contains tests from
 
2707
        suite matching pattern, and the second contains the remainder from
 
2708
        suite. The order within each output suite is the same as it was in
 
2709
        suite.
 
2710
    """
 
2711
    return split_suite_by_condition(suite, condition_id_re(pattern))
 
2712
 
 
2713
 
 
2714
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2715
              stop_on_failure=False,
 
2716
              transport=None, lsprof_timed=None, bench_history=None,
 
2717
              matching_tests_first=None,
 
2718
              list_only=False,
 
2719
              random_seed=None,
 
2720
              exclude_pattern=None,
 
2721
              strict=False,
 
2722
              runner_class=None,
 
2723
              suite_decorators=None,
 
2724
              stream=None):
 
2725
    """Run a test suite for bzr selftest.
 
2726
 
 
2727
    :param runner_class: The class of runner to use. Must support the
 
2728
        constructor arguments passed by run_suite which are more than standard
 
2729
        python uses.
 
2730
    :return: A boolean indicating success.
 
2731
    """
 
2732
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2733
    if verbose:
 
2734
        verbosity = 2
 
2735
    else:
 
2736
        verbosity = 1
 
2737
    if runner_class is None:
 
2738
        runner_class = TextTestRunner
 
2739
    if stream is None:
 
2740
        stream = sys.stdout
 
2741
    runner = runner_class(stream=stream,
 
2742
                            descriptions=0,
 
2743
                            verbosity=verbosity,
 
2744
                            bench_history=bench_history,
 
2745
                            list_only=list_only,
 
2746
                            strict=strict,
 
2747
                            )
 
2748
    runner.stop_on_failure=stop_on_failure
 
2749
    # built in decorator factories:
 
2750
    decorators = [
 
2751
        random_order(random_seed, runner),
 
2752
        exclude_tests(exclude_pattern),
 
2753
        ]
 
2754
    if matching_tests_first:
 
2755
        decorators.append(tests_first(pattern))
 
2756
    else:
 
2757
        decorators.append(filter_tests(pattern))
 
2758
    if suite_decorators:
 
2759
        decorators.extend(suite_decorators)
 
2760
    for decorator in decorators:
 
2761
        suite = decorator(suite)
 
2762
    result = runner.run(suite)
 
2763
    if list_only:
 
2764
        return True
 
2765
    result.done()
 
2766
    if strict:
 
2767
        return result.wasStrictlySuccessful()
 
2768
    else:
 
2769
        return result.wasSuccessful()
 
2770
 
 
2771
 
 
2772
# A registry where get() returns a suite decorator.
 
2773
parallel_registry = registry.Registry()
 
2774
 
 
2775
 
 
2776
def fork_decorator(suite):
 
2777
    concurrency = osutils.local_concurrency()
 
2778
    if concurrency == 1:
 
2779
        return suite
 
2780
    from testtools import ConcurrentTestSuite
 
2781
    return ConcurrentTestSuite(suite, fork_for_tests)
 
2782
parallel_registry.register('fork', fork_decorator)
 
2783
 
 
2784
 
 
2785
def subprocess_decorator(suite):
 
2786
    concurrency = osutils.local_concurrency()
 
2787
    if concurrency == 1:
 
2788
        return suite
 
2789
    from testtools import ConcurrentTestSuite
 
2790
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
 
2791
parallel_registry.register('subprocess', subprocess_decorator)
 
2792
 
 
2793
 
 
2794
def exclude_tests(exclude_pattern):
 
2795
    """Return a test suite decorator that excludes tests."""
 
2796
    if exclude_pattern is None:
 
2797
        return identity_decorator
 
2798
    def decorator(suite):
 
2799
        return ExcludeDecorator(suite, exclude_pattern)
 
2800
    return decorator
 
2801
 
 
2802
 
 
2803
def filter_tests(pattern):
 
2804
    if pattern == '.*':
 
2805
        return identity_decorator
 
2806
    def decorator(suite):
 
2807
        return FilterTestsDecorator(suite, pattern)
 
2808
    return decorator
 
2809
 
 
2810
 
 
2811
def random_order(random_seed, runner):
 
2812
    """Return a test suite decorator factory for randomising tests order.
 
2813
    
 
2814
    :param random_seed: now, a string which casts to a long, or a long.
 
2815
    :param runner: A test runner with a stream attribute to report on.
 
2816
    """
 
2817
    if random_seed is None:
 
2818
        return identity_decorator
 
2819
    def decorator(suite):
 
2820
        return RandomDecorator(suite, random_seed, runner.stream)
 
2821
    return decorator
 
2822
 
 
2823
 
 
2824
def tests_first(pattern):
 
2825
    if pattern == '.*':
 
2826
        return identity_decorator
 
2827
    def decorator(suite):
 
2828
        return TestFirstDecorator(suite, pattern)
 
2829
    return decorator
 
2830
 
 
2831
 
 
2832
def identity_decorator(suite):
 
2833
    """Return suite."""
 
2834
    return suite
 
2835
 
 
2836
 
 
2837
class TestDecorator(TestSuite):
 
2838
    """A decorator for TestCase/TestSuite objects.
 
2839
    
 
2840
    Usually, subclasses should override __iter__(used when flattening test
 
2841
    suites), which we do to filter, reorder, parallelise and so on, run() and
 
2842
    debug().
 
2843
    """
 
2844
 
 
2845
    def __init__(self, suite):
 
2846
        TestSuite.__init__(self)
 
2847
        self.addTest(suite)
 
2848
 
 
2849
    def countTestCases(self):
 
2850
        cases = 0
 
2851
        for test in self:
 
2852
            cases += test.countTestCases()
 
2853
        return cases
 
2854
 
 
2855
    def debug(self):
 
2856
        for test in self:
 
2857
            test.debug()
 
2858
 
 
2859
    def run(self, result):
 
2860
        # Use iteration on self, not self._tests, to allow subclasses to hook
 
2861
        # into __iter__.
 
2862
        for test in self:
 
2863
            if result.shouldStop:
 
2864
                break
 
2865
            test.run(result)
 
2866
        return result
 
2867
 
 
2868
 
 
2869
class ExcludeDecorator(TestDecorator):
 
2870
    """A decorator which excludes test matching an exclude pattern."""
 
2871
 
 
2872
    def __init__(self, suite, exclude_pattern):
 
2873
        TestDecorator.__init__(self, suite)
 
2874
        self.exclude_pattern = exclude_pattern
 
2875
        self.excluded = False
 
2876
 
 
2877
    def __iter__(self):
 
2878
        if self.excluded:
 
2879
            return iter(self._tests)
 
2880
        self.excluded = True
 
2881
        suite = exclude_tests_by_re(self, self.exclude_pattern)
 
2882
        del self._tests[:]
 
2883
        self.addTests(suite)
 
2884
        return iter(self._tests)
 
2885
 
 
2886
 
 
2887
class FilterTestsDecorator(TestDecorator):
 
2888
    """A decorator which filters tests to those matching a pattern."""
 
2889
 
 
2890
    def __init__(self, suite, pattern):
 
2891
        TestDecorator.__init__(self, suite)
 
2892
        self.pattern = pattern
 
2893
        self.filtered = False
 
2894
 
 
2895
    def __iter__(self):
 
2896
        if self.filtered:
 
2897
            return iter(self._tests)
 
2898
        self.filtered = True
 
2899
        suite = filter_suite_by_re(self, self.pattern)
 
2900
        del self._tests[:]
 
2901
        self.addTests(suite)
 
2902
        return iter(self._tests)
 
2903
 
 
2904
 
 
2905
class RandomDecorator(TestDecorator):
 
2906
    """A decorator which randomises the order of its tests."""
 
2907
 
 
2908
    def __init__(self, suite, random_seed, stream):
 
2909
        TestDecorator.__init__(self, suite)
 
2910
        self.random_seed = random_seed
 
2911
        self.randomised = False
 
2912
        self.stream = stream
 
2913
 
 
2914
    def __iter__(self):
 
2915
        if self.randomised:
 
2916
            return iter(self._tests)
 
2917
        self.randomised = True
 
2918
        self.stream.writeln("Randomizing test order using seed %s\n" %
 
2919
            (self.actual_seed()))
 
2920
        # Initialise the random number generator.
 
2921
        random.seed(self.actual_seed())
 
2922
        suite = randomize_suite(self)
 
2923
        del self._tests[:]
 
2924
        self.addTests(suite)
 
2925
        return iter(self._tests)
 
2926
 
 
2927
    def actual_seed(self):
 
2928
        if self.random_seed == "now":
 
2929
            # We convert the seed to a long to make it reuseable across
 
2930
            # invocations (because the user can reenter it).
 
2931
            self.random_seed = long(time.time())
 
2932
        else:
 
2933
            # Convert the seed to a long if we can
 
2934
            try:
 
2935
                self.random_seed = long(self.random_seed)
 
2936
            except:
 
2937
                pass
 
2938
        return self.random_seed
 
2939
 
 
2940
 
 
2941
class TestFirstDecorator(TestDecorator):
 
2942
    """A decorator which moves named tests to the front."""
 
2943
 
 
2944
    def __init__(self, suite, pattern):
 
2945
        TestDecorator.__init__(self, suite)
 
2946
        self.pattern = pattern
 
2947
        self.filtered = False
 
2948
 
 
2949
    def __iter__(self):
 
2950
        if self.filtered:
 
2951
            return iter(self._tests)
 
2952
        self.filtered = True
 
2953
        suites = split_suite_by_re(self, self.pattern)
 
2954
        del self._tests[:]
 
2955
        self.addTests(suites)
 
2956
        return iter(self._tests)
 
2957
 
 
2958
 
 
2959
def partition_tests(suite, count):
 
2960
    """Partition suite into count lists of tests."""
 
2961
    result = []
 
2962
    tests = list(iter_suite_tests(suite))
 
2963
    tests_per_process = int(math.ceil(float(len(tests)) / count))
 
2964
    for block in range(count):
 
2965
        low_test = block * tests_per_process
 
2966
        high_test = low_test + tests_per_process
 
2967
        process_tests = tests[low_test:high_test]
 
2968
        result.append(process_tests)
 
2969
    return result
 
2970
 
 
2971
 
 
2972
def fork_for_tests(suite):
 
2973
    """Take suite and start up one runner per CPU by forking()
 
2974
 
 
2975
    :return: An iterable of TestCase-like objects which can each have
 
2976
        run(result) called on them to feed tests to result.
 
2977
    """
 
2978
    concurrency = osutils.local_concurrency()
 
2979
    result = []
 
2980
    from subunit import TestProtocolClient, ProtocolTestCase
 
2981
    try:
 
2982
        from subunit.test_results import AutoTimingTestResultDecorator
 
2983
    except ImportError:
 
2984
        AutoTimingTestResultDecorator = lambda x:x
 
2985
    class TestInOtherProcess(ProtocolTestCase):
 
2986
        # Should be in subunit, I think. RBC.
 
2987
        def __init__(self, stream, pid):
 
2988
            ProtocolTestCase.__init__(self, stream)
 
2989
            self.pid = pid
 
2990
 
 
2991
        def run(self, result):
 
2992
            try:
 
2993
                ProtocolTestCase.run(self, result)
 
2994
            finally:
 
2995
                os.waitpid(self.pid, os.WNOHANG)
 
2996
 
 
2997
    test_blocks = partition_tests(suite, concurrency)
 
2998
    for process_tests in test_blocks:
 
2999
        process_suite = TestSuite()
 
3000
        process_suite.addTests(process_tests)
 
3001
        c2pread, c2pwrite = os.pipe()
 
3002
        pid = os.fork()
 
3003
        if pid == 0:
 
3004
            try:
 
3005
                os.close(c2pread)
 
3006
                # Leave stderr and stdout open so we can see test noise
 
3007
                # Close stdin so that the child goes away if it decides to
 
3008
                # read from stdin (otherwise its a roulette to see what
 
3009
                # child actually gets keystrokes for pdb etc).
 
3010
                sys.stdin.close()
 
3011
                sys.stdin = None
 
3012
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3013
                subunit_result = AutoTimingTestResultDecorator(
 
3014
                    TestProtocolClient(stream))
 
3015
                process_suite.run(subunit_result)
 
3016
            finally:
 
3017
                os._exit(0)
 
3018
        else:
 
3019
            os.close(c2pwrite)
 
3020
            stream = os.fdopen(c2pread, 'rb', 1)
 
3021
            test = TestInOtherProcess(stream, pid)
 
3022
            result.append(test)
 
3023
    return result
 
3024
 
 
3025
 
 
3026
def reinvoke_for_tests(suite):
 
3027
    """Take suite and start up one runner per CPU using subprocess().
 
3028
 
 
3029
    :return: An iterable of TestCase-like objects which can each have
 
3030
        run(result) called on them to feed tests to result.
 
3031
    """
 
3032
    concurrency = osutils.local_concurrency()
 
3033
    result = []
 
3034
    from subunit import ProtocolTestCase
 
3035
    class TestInSubprocess(ProtocolTestCase):
 
3036
        def __init__(self, process, name):
 
3037
            ProtocolTestCase.__init__(self, process.stdout)
 
3038
            self.process = process
 
3039
            self.process.stdin.close()
 
3040
            self.name = name
 
3041
 
 
3042
        def run(self, result):
 
3043
            try:
 
3044
                ProtocolTestCase.run(self, result)
 
3045
            finally:
 
3046
                self.process.wait()
 
3047
                os.unlink(self.name)
 
3048
            # print "pid %d finished" % finished_process
 
3049
    test_blocks = partition_tests(suite, concurrency)
 
3050
    for process_tests in test_blocks:
 
3051
        # ugly; currently reimplement rather than reuses TestCase methods.
 
3052
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
3053
        if not os.path.isfile(bzr_path):
 
3054
            # We are probably installed. Assume sys.argv is the right file
 
3055
            bzr_path = sys.argv[0]
 
3056
        fd, test_list_file_name = tempfile.mkstemp()
 
3057
        test_list_file = os.fdopen(fd, 'wb', 1)
 
3058
        for test in process_tests:
 
3059
            test_list_file.write(test.id() + '\n')
 
3060
        test_list_file.close()
 
3061
        try:
 
3062
            argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
 
3063
                '--subunit']
 
3064
            if '--no-plugins' in sys.argv:
 
3065
                argv.append('--no-plugins')
 
3066
            # stderr=STDOUT would be ideal, but until we prevent noise on
 
3067
            # stderr it can interrupt the subunit protocol.
 
3068
            process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
 
3069
                bufsize=1)
 
3070
            test = TestInSubprocess(process, test_list_file_name)
 
3071
            result.append(test)
 
3072
        except:
 
3073
            os.unlink(test_list_file_name)
 
3074
            raise
 
3075
    return result
 
3076
 
 
3077
 
 
3078
class BZRTransformingResult(unittest.TestResult):
 
3079
 
 
3080
    def __init__(self, target):
 
3081
        unittest.TestResult.__init__(self)
 
3082
        self.result = target
 
3083
 
 
3084
    def startTest(self, test):
 
3085
        self.result.startTest(test)
 
3086
 
 
3087
    def stopTest(self, test):
 
3088
        self.result.stopTest(test)
 
3089
 
 
3090
    def addError(self, test, err):
 
3091
        feature = self._error_looks_like('UnavailableFeature: ', err)
 
3092
        if feature is not None:
 
3093
            self.result.addNotSupported(test, feature)
 
3094
        else:
 
3095
            self.result.addError(test, err)
 
3096
 
 
3097
    def addFailure(self, test, err):
 
3098
        known = self._error_looks_like('KnownFailure: ', err)
 
3099
        if known is not None:
 
3100
            self.result._addKnownFailure(test, [KnownFailure,
 
3101
                                                KnownFailure(known), None])
 
3102
        else:
 
3103
            self.result.addFailure(test, err)
 
3104
 
 
3105
    def addSkip(self, test, reason):
 
3106
        self.result.addSkip(test, reason)
 
3107
 
 
3108
    def addSuccess(self, test):
 
3109
        self.result.addSuccess(test)
 
3110
 
 
3111
    def _error_looks_like(self, prefix, err):
 
3112
        """Deserialize exception and returns the stringify value."""
 
3113
        import subunit
 
3114
        value = None
 
3115
        typ, exc, _ = err
 
3116
        if isinstance(exc, subunit.RemoteException):
 
3117
            # stringify the exception gives access to the remote traceback
 
3118
            # We search the last line for 'prefix'
 
3119
            lines = str(exc).split('\n')
 
3120
            while lines and not lines[-1]:
 
3121
                lines.pop(-1)
 
3122
            if lines:
 
3123
                if lines[-1].startswith(prefix):
 
3124
                    value = lines[-1][len(prefix):]
 
3125
        return value
 
3126
 
 
3127
 
 
3128
# Controlled by "bzr selftest -E=..." option
 
3129
selftest_debug_flags = set()
 
3130
 
 
3131
 
 
3132
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
3133
             transport=None,
 
3134
             test_suite_factory=None,
 
3135
             lsprof_timed=None,
 
3136
             bench_history=None,
 
3137
             matching_tests_first=None,
 
3138
             list_only=False,
 
3139
             random_seed=None,
 
3140
             exclude_pattern=None,
 
3141
             strict=False,
 
3142
             load_list=None,
 
3143
             debug_flags=None,
 
3144
             starting_with=None,
 
3145
             runner_class=None,
 
3146
             suite_decorators=None,
 
3147
             ):
 
3148
    """Run the whole test suite under the enhanced runner"""
 
3149
    # XXX: Very ugly way to do this...
 
3150
    # Disable warning about old formats because we don't want it to disturb
 
3151
    # any blackbox tests.
 
3152
    from bzrlib import repository
 
3153
    repository._deprecation_warning_done = True
 
3154
 
 
3155
    global default_transport
 
3156
    if transport is None:
 
3157
        transport = default_transport
 
3158
    old_transport = default_transport
 
3159
    default_transport = transport
 
3160
    global selftest_debug_flags
 
3161
    old_debug_flags = selftest_debug_flags
 
3162
    if debug_flags is not None:
 
3163
        selftest_debug_flags = set(debug_flags)
 
3164
    try:
 
3165
        if load_list is None:
 
3166
            keep_only = None
 
3167
        else:
 
3168
            keep_only = load_test_id_list(load_list)
 
3169
        if test_suite_factory is None:
 
3170
            suite = test_suite(keep_only, starting_with)
 
3171
        else:
 
3172
            suite = test_suite_factory()
 
3173
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
3174
                     stop_on_failure=stop_on_failure,
 
3175
                     transport=transport,
 
3176
                     lsprof_timed=lsprof_timed,
 
3177
                     bench_history=bench_history,
 
3178
                     matching_tests_first=matching_tests_first,
 
3179
                     list_only=list_only,
 
3180
                     random_seed=random_seed,
 
3181
                     exclude_pattern=exclude_pattern,
 
3182
                     strict=strict,
 
3183
                     runner_class=runner_class,
 
3184
                     suite_decorators=suite_decorators,
 
3185
                     )
 
3186
    finally:
 
3187
        default_transport = old_transport
 
3188
        selftest_debug_flags = old_debug_flags
 
3189
 
 
3190
 
 
3191
def load_test_id_list(file_name):
 
3192
    """Load a test id list from a text file.
 
3193
 
 
3194
    The format is one test id by line.  No special care is taken to impose
 
3195
    strict rules, these test ids are used to filter the test suite so a test id
 
3196
    that do not match an existing test will do no harm. This allows user to add
 
3197
    comments, leave blank lines, etc.
 
3198
    """
 
3199
    test_list = []
 
3200
    try:
 
3201
        ftest = open(file_name, 'rt')
 
3202
    except IOError, e:
 
3203
        if e.errno != errno.ENOENT:
 
3204
            raise
 
3205
        else:
 
3206
            raise errors.NoSuchFile(file_name)
 
3207
 
 
3208
    for test_name in ftest.readlines():
 
3209
        test_list.append(test_name.strip())
 
3210
    ftest.close()
 
3211
    return test_list
 
3212
 
 
3213
 
 
3214
def suite_matches_id_list(test_suite, id_list):
 
3215
    """Warns about tests not appearing or appearing more than once.
 
3216
 
 
3217
    :param test_suite: A TestSuite object.
 
3218
    :param test_id_list: The list of test ids that should be found in
 
3219
         test_suite.
 
3220
 
 
3221
    :return: (absents, duplicates) absents is a list containing the test found
 
3222
        in id_list but not in test_suite, duplicates is a list containing the
 
3223
        test found multiple times in test_suite.
 
3224
 
 
3225
    When using a prefined test id list, it may occurs that some tests do not
 
3226
    exist anymore or that some tests use the same id. This function warns the
 
3227
    tester about potential problems in his workflow (test lists are volatile)
 
3228
    or in the test suite itself (using the same id for several tests does not
 
3229
    help to localize defects).
 
3230
    """
 
3231
    # Build a dict counting id occurrences
 
3232
    tests = dict()
 
3233
    for test in iter_suite_tests(test_suite):
 
3234
        id = test.id()
 
3235
        tests[id] = tests.get(id, 0) + 1
 
3236
 
 
3237
    not_found = []
 
3238
    duplicates = []
 
3239
    for id in id_list:
 
3240
        occurs = tests.get(id, 0)
 
3241
        if not occurs:
 
3242
            not_found.append(id)
 
3243
        elif occurs > 1:
 
3244
            duplicates.append(id)
 
3245
 
 
3246
    return not_found, duplicates
 
3247
 
 
3248
 
 
3249
class TestIdList(object):
 
3250
    """Test id list to filter a test suite.
 
3251
 
 
3252
    Relying on the assumption that test ids are built as:
 
3253
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
3254
    notation, this class offers methods to :
 
3255
    - avoid building a test suite for modules not refered to in the test list,
 
3256
    - keep only the tests listed from the module test suite.
 
3257
    """
 
3258
 
 
3259
    def __init__(self, test_id_list):
 
3260
        # When a test suite needs to be filtered against us we compare test ids
 
3261
        # for equality, so a simple dict offers a quick and simple solution.
 
3262
        self.tests = dict().fromkeys(test_id_list, True)
 
3263
 
 
3264
        # While unittest.TestCase have ids like:
 
3265
        # <module>.<class>.<method>[(<param+)],
 
3266
        # doctest.DocTestCase can have ids like:
 
3267
        # <module>
 
3268
        # <module>.<class>
 
3269
        # <module>.<function>
 
3270
        # <module>.<class>.<method>
 
3271
 
 
3272
        # Since we can't predict a test class from its name only, we settle on
 
3273
        # a simple constraint: a test id always begins with its module name.
 
3274
 
 
3275
        modules = {}
 
3276
        for test_id in test_id_list:
 
3277
            parts = test_id.split('.')
 
3278
            mod_name = parts.pop(0)
 
3279
            modules[mod_name] = True
 
3280
            for part in parts:
 
3281
                mod_name += '.' + part
 
3282
                modules[mod_name] = True
 
3283
        self.modules = modules
 
3284
 
 
3285
    def refers_to(self, module_name):
 
3286
        """Is there tests for the module or one of its sub modules."""
 
3287
        return self.modules.has_key(module_name)
 
3288
 
 
3289
    def includes(self, test_id):
 
3290
        return self.tests.has_key(test_id)
 
3291
 
 
3292
 
 
3293
class TestPrefixAliasRegistry(registry.Registry):
 
3294
    """A registry for test prefix aliases.
 
3295
 
 
3296
    This helps implement shorcuts for the --starting-with selftest
 
3297
    option. Overriding existing prefixes is not allowed but not fatal (a
 
3298
    warning will be emitted).
 
3299
    """
 
3300
 
 
3301
    def register(self, key, obj, help=None, info=None,
 
3302
                 override_existing=False):
 
3303
        """See Registry.register.
 
3304
 
 
3305
        Trying to override an existing alias causes a warning to be emitted,
 
3306
        not a fatal execption.
 
3307
        """
 
3308
        try:
 
3309
            super(TestPrefixAliasRegistry, self).register(
 
3310
                key, obj, help=help, info=info, override_existing=False)
 
3311
        except KeyError:
 
3312
            actual = self.get(key)
 
3313
            note('Test prefix alias %s is already used for %s, ignoring %s'
 
3314
                 % (key, actual, obj))
 
3315
 
 
3316
    def resolve_alias(self, id_start):
 
3317
        """Replace the alias by the prefix in the given string.
 
3318
 
 
3319
        Using an unknown prefix is an error to help catching typos.
 
3320
        """
 
3321
        parts = id_start.split('.')
 
3322
        try:
 
3323
            parts[0] = self.get(parts[0])
 
3324
        except KeyError:
 
3325
            raise errors.BzrCommandError(
 
3326
                '%s is not a known test prefix alias' % parts[0])
 
3327
        return '.'.join(parts)
 
3328
 
 
3329
 
 
3330
test_prefix_alias_registry = TestPrefixAliasRegistry()
 
3331
"""Registry of test prefix aliases."""
 
3332
 
 
3333
 
 
3334
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
 
3335
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
 
3336
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
 
3337
 
 
3338
# Obvious higest levels prefixes, feel free to add your own via a plugin
 
3339
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
 
3340
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
 
3341
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
 
3342
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
 
3343
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
 
3344
 
 
3345
 
 
3346
def test_suite(keep_only=None, starting_with=None):
 
3347
    """Build and return TestSuite for the whole of bzrlib.
 
3348
 
 
3349
    :param keep_only: A list of test ids limiting the suite returned.
 
3350
 
 
3351
    :param starting_with: An id limiting the suite returned to the tests
 
3352
         starting with it.
 
3353
 
 
3354
    This function can be replaced if you need to change the default test
 
3355
    suite on a global basis, but it is not encouraged.
 
3356
    """
 
3357
    testmod_names = [
 
3358
                   'bzrlib.doc',
 
3359
                   'bzrlib.tests.blackbox',
 
3360
                   'bzrlib.tests.commands',
 
3361
                   'bzrlib.tests.per_branch',
 
3362
                   'bzrlib.tests.per_bzrdir',
 
3363
                   'bzrlib.tests.per_interrepository',
 
3364
                   'bzrlib.tests.per_intertree',
 
3365
                   'bzrlib.tests.per_inventory',
 
3366
                   'bzrlib.tests.per_interbranch',
 
3367
                   'bzrlib.tests.per_lock',
 
3368
                   'bzrlib.tests.per_transport',
 
3369
                   'bzrlib.tests.per_tree',
 
3370
                   'bzrlib.tests.per_repository',
 
3371
                   'bzrlib.tests.per_repository_chk',
 
3372
                   'bzrlib.tests.per_repository_reference',
 
3373
                   'bzrlib.tests.per_workingtree',
 
3374
                   'bzrlib.tests.test__annotator',
 
3375
                   'bzrlib.tests.test__chk_map',
 
3376
                   'bzrlib.tests.test__dirstate_helpers',
 
3377
                   'bzrlib.tests.test__groupcompress',
 
3378
                   'bzrlib.tests.test__known_graph',
 
3379
                   'bzrlib.tests.test__rio',
 
3380
                   'bzrlib.tests.test__walkdirs_win32',
 
3381
                   'bzrlib.tests.test_ancestry',
 
3382
                   'bzrlib.tests.test_annotate',
 
3383
                   'bzrlib.tests.test_api',
 
3384
                   'bzrlib.tests.test_atomicfile',
 
3385
                   'bzrlib.tests.test_bad_files',
 
3386
                   'bzrlib.tests.test_bencode',
 
3387
                   'bzrlib.tests.test_bisect_multi',
 
3388
                   'bzrlib.tests.test_branch',
 
3389
                   'bzrlib.tests.test_branchbuilder',
 
3390
                   'bzrlib.tests.test_btree_index',
 
3391
                   'bzrlib.tests.test_bugtracker',
 
3392
                   'bzrlib.tests.test_bundle',
 
3393
                   'bzrlib.tests.test_bzrdir',
 
3394
                   'bzrlib.tests.test__chunks_to_lines',
 
3395
                   'bzrlib.tests.test_cache_utf8',
 
3396
                   'bzrlib.tests.test_chk_map',
 
3397
                   'bzrlib.tests.test_chk_serializer',
 
3398
                   'bzrlib.tests.test_chunk_writer',
 
3399
                   'bzrlib.tests.test_clean_tree',
 
3400
                   'bzrlib.tests.test_commands',
 
3401
                   'bzrlib.tests.test_commit',
 
3402
                   'bzrlib.tests.test_commit_merge',
 
3403
                   'bzrlib.tests.test_config',
 
3404
                   'bzrlib.tests.test_conflicts',
 
3405
                   'bzrlib.tests.test_counted_lock',
 
3406
                   'bzrlib.tests.test_decorators',
 
3407
                   'bzrlib.tests.test_delta',
 
3408
                   'bzrlib.tests.test_debug',
 
3409
                   'bzrlib.tests.test_deprecated_graph',
 
3410
                   'bzrlib.tests.test_diff',
 
3411
                   'bzrlib.tests.test_directory_service',
 
3412
                   'bzrlib.tests.test_dirstate',
 
3413
                   'bzrlib.tests.test_email_message',
 
3414
                   'bzrlib.tests.test_eol_filters',
 
3415
                   'bzrlib.tests.test_errors',
 
3416
                   'bzrlib.tests.test_export',
 
3417
                   'bzrlib.tests.test_extract',
 
3418
                   'bzrlib.tests.test_fetch',
 
3419
                   'bzrlib.tests.test_fifo_cache',
 
3420
                   'bzrlib.tests.test_filters',
 
3421
                   'bzrlib.tests.test_ftp_transport',
 
3422
                   'bzrlib.tests.test_foreign',
 
3423
                   'bzrlib.tests.test_generate_docs',
 
3424
                   'bzrlib.tests.test_generate_ids',
 
3425
                   'bzrlib.tests.test_globbing',
 
3426
                   'bzrlib.tests.test_gpg',
 
3427
                   'bzrlib.tests.test_graph',
 
3428
                   'bzrlib.tests.test_groupcompress',
 
3429
                   'bzrlib.tests.test_hashcache',
 
3430
                   'bzrlib.tests.test_help',
 
3431
                   'bzrlib.tests.test_hooks',
 
3432
                   'bzrlib.tests.test_http',
 
3433
                   'bzrlib.tests.test_http_response',
 
3434
                   'bzrlib.tests.test_https_ca_bundle',
 
3435
                   'bzrlib.tests.test_identitymap',
 
3436
                   'bzrlib.tests.test_ignores',
 
3437
                   'bzrlib.tests.test_index',
 
3438
                   'bzrlib.tests.test_info',
 
3439
                   'bzrlib.tests.test_inv',
 
3440
                   'bzrlib.tests.test_inventory_delta',
 
3441
                   'bzrlib.tests.test_knit',
 
3442
                   'bzrlib.tests.test_lazy_import',
 
3443
                   'bzrlib.tests.test_lazy_regex',
 
3444
                   'bzrlib.tests.test_lockable_files',
 
3445
                   'bzrlib.tests.test_lockdir',
 
3446
                   'bzrlib.tests.test_log',
 
3447
                   'bzrlib.tests.test_lru_cache',
 
3448
                   'bzrlib.tests.test_lsprof',
 
3449
                   'bzrlib.tests.test_mail_client',
 
3450
                   'bzrlib.tests.test_memorytree',
 
3451
                   'bzrlib.tests.test_merge',
 
3452
                   'bzrlib.tests.test_merge3',
 
3453
                   'bzrlib.tests.test_merge_core',
 
3454
                   'bzrlib.tests.test_merge_directive',
 
3455
                   'bzrlib.tests.test_missing',
 
3456
                   'bzrlib.tests.test_msgeditor',
 
3457
                   'bzrlib.tests.test_multiparent',
 
3458
                   'bzrlib.tests.test_mutabletree',
 
3459
                   'bzrlib.tests.test_nonascii',
 
3460
                   'bzrlib.tests.test_options',
 
3461
                   'bzrlib.tests.test_osutils',
 
3462
                   'bzrlib.tests.test_osutils_encodings',
 
3463
                   'bzrlib.tests.test_pack',
 
3464
                   'bzrlib.tests.test_pack_repository',
 
3465
                   'bzrlib.tests.test_patch',
 
3466
                   'bzrlib.tests.test_patches',
 
3467
                   'bzrlib.tests.test_permissions',
 
3468
                   'bzrlib.tests.test_plugins',
 
3469
                   'bzrlib.tests.test_progress',
 
3470
                   'bzrlib.tests.test_read_bundle',
 
3471
                   'bzrlib.tests.test_reconcile',
 
3472
                   'bzrlib.tests.test_reconfigure',
 
3473
                   'bzrlib.tests.test_registry',
 
3474
                   'bzrlib.tests.test_remote',
 
3475
                   'bzrlib.tests.test_rename_map',
 
3476
                   'bzrlib.tests.test_repository',
 
3477
                   'bzrlib.tests.test_revert',
 
3478
                   'bzrlib.tests.test_revision',
 
3479
                   'bzrlib.tests.test_revisionspec',
 
3480
                   'bzrlib.tests.test_revisiontree',
 
3481
                   'bzrlib.tests.test_rio',
 
3482
                   'bzrlib.tests.test_rules',
 
3483
                   'bzrlib.tests.test_sampler',
 
3484
                   'bzrlib.tests.test_selftest',
 
3485
                   'bzrlib.tests.test_serializer',
 
3486
                   'bzrlib.tests.test_setup',
 
3487
                   'bzrlib.tests.test_sftp_transport',
 
3488
                   'bzrlib.tests.test_shelf',
 
3489
                   'bzrlib.tests.test_shelf_ui',
 
3490
                   'bzrlib.tests.test_smart',
 
3491
                   'bzrlib.tests.test_smart_add',
 
3492
                   'bzrlib.tests.test_smart_request',
 
3493
                   'bzrlib.tests.test_smart_transport',
 
3494
                   'bzrlib.tests.test_smtp_connection',
 
3495
                   'bzrlib.tests.test_source',
 
3496
                   'bzrlib.tests.test_ssh_transport',
 
3497
                   'bzrlib.tests.test_status',
 
3498
                   'bzrlib.tests.test_store',
 
3499
                   'bzrlib.tests.test_strace',
 
3500
                   'bzrlib.tests.test_subsume',
 
3501
                   'bzrlib.tests.test_switch',
 
3502
                   'bzrlib.tests.test_symbol_versioning',
 
3503
                   'bzrlib.tests.test_tag',
 
3504
                   'bzrlib.tests.test_testament',
 
3505
                   'bzrlib.tests.test_textfile',
 
3506
                   'bzrlib.tests.test_textmerge',
 
3507
                   'bzrlib.tests.test_timestamp',
 
3508
                   'bzrlib.tests.test_trace',
 
3509
                   'bzrlib.tests.test_transactions',
 
3510
                   'bzrlib.tests.test_transform',
 
3511
                   'bzrlib.tests.test_transport',
 
3512
                   'bzrlib.tests.test_transport_log',
 
3513
                   'bzrlib.tests.test_tree',
 
3514
                   'bzrlib.tests.test_treebuilder',
 
3515
                   'bzrlib.tests.test_tsort',
 
3516
                   'bzrlib.tests.test_tuned_gzip',
 
3517
                   'bzrlib.tests.test_ui',
 
3518
                   'bzrlib.tests.test_uncommit',
 
3519
                   'bzrlib.tests.test_upgrade',
 
3520
                   'bzrlib.tests.test_upgrade_stacked',
 
3521
                   'bzrlib.tests.test_urlutils',
 
3522
                   'bzrlib.tests.test_version',
 
3523
                   'bzrlib.tests.test_version_info',
 
3524
                   'bzrlib.tests.test_versionedfile',
 
3525
                   'bzrlib.tests.test_weave',
 
3526
                   'bzrlib.tests.test_whitebox',
 
3527
                   'bzrlib.tests.test_win32utils',
 
3528
                   'bzrlib.tests.test_workingtree',
 
3529
                   'bzrlib.tests.test_workingtree_4',
 
3530
                   'bzrlib.tests.test_wsgi',
 
3531
                   'bzrlib.tests.test_xml',
 
3532
                   ]
 
3533
 
 
3534
    loader = TestUtil.TestLoader()
 
3535
 
 
3536
    if keep_only is not None:
 
3537
        id_filter = TestIdList(keep_only)
 
3538
    if starting_with:
 
3539
        starting_with = [test_prefix_alias_registry.resolve_alias(start)
 
3540
                         for start in starting_with]
 
3541
        # We take precedence over keep_only because *at loading time* using
 
3542
        # both options means we will load less tests for the same final result.
 
3543
        def interesting_module(name):
 
3544
            for start in starting_with:
 
3545
                if (
 
3546
                    # Either the module name starts with the specified string
 
3547
                    name.startswith(start)
 
3548
                    # or it may contain tests starting with the specified string
 
3549
                    or start.startswith(name)
 
3550
                    ):
 
3551
                    return True
 
3552
            return False
 
3553
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
 
3554
 
 
3555
    elif keep_only is not None:
 
3556
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
3557
        def interesting_module(name):
 
3558
            return id_filter.refers_to(name)
 
3559
 
 
3560
    else:
 
3561
        loader = TestUtil.TestLoader()
 
3562
        def interesting_module(name):
 
3563
            # No filtering, all modules are interesting
 
3564
            return True
 
3565
 
 
3566
    suite = loader.suiteClass()
 
3567
 
 
3568
    # modules building their suite with loadTestsFromModuleNames
 
3569
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
3570
 
 
3571
    modules_to_doctest = [
 
3572
        'bzrlib',
 
3573
        'bzrlib.branchbuilder',
 
3574
        'bzrlib.export',
 
3575
        'bzrlib.inventory',
 
3576
        'bzrlib.iterablefile',
 
3577
        'bzrlib.lockdir',
 
3578
        'bzrlib.merge3',
 
3579
        'bzrlib.option',
 
3580
        'bzrlib.symbol_versioning',
 
3581
        'bzrlib.tests',
 
3582
        'bzrlib.timestamp',
 
3583
        'bzrlib.version_info_formats.format_custom',
 
3584
        ]
 
3585
 
 
3586
    for mod in modules_to_doctest:
 
3587
        if not interesting_module(mod):
 
3588
            # No tests to keep here, move along
 
3589
            continue
 
3590
        try:
 
3591
            # note that this really does mean "report only" -- doctest
 
3592
            # still runs the rest of the examples
 
3593
            doc_suite = doctest.DocTestSuite(mod,
 
3594
                optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
 
3595
        except ValueError, e:
 
3596
            print '**failed to get doctest for: %s\n%s' % (mod, e)
 
3597
            raise
 
3598
        if len(doc_suite._tests) == 0:
 
3599
            raise errors.BzrError("no doctests found in %s" % (mod,))
 
3600
        suite.addTest(doc_suite)
 
3601
 
 
3602
    default_encoding = sys.getdefaultencoding()
 
3603
    for name, plugin in bzrlib.plugin.plugins().items():
 
3604
        if not interesting_module(plugin.module.__name__):
 
3605
            continue
 
3606
        plugin_suite = plugin.test_suite()
 
3607
        # We used to catch ImportError here and turn it into just a warning,
 
3608
        # but really if you don't have --no-plugins this should be a failure.
 
3609
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
3610
        if plugin_suite is None:
 
3611
            plugin_suite = plugin.load_plugin_tests(loader)
 
3612
        if plugin_suite is not None:
 
3613
            suite.addTest(plugin_suite)
 
3614
        if default_encoding != sys.getdefaultencoding():
 
3615
            bzrlib.trace.warning(
 
3616
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
3617
                sys.getdefaultencoding())
 
3618
            reload(sys)
 
3619
            sys.setdefaultencoding(default_encoding)
 
3620
 
 
3621
    if starting_with:
 
3622
        suite = filter_suite_by_id_startswith(suite, starting_with)
 
3623
 
 
3624
    if keep_only is not None:
 
3625
        # Now that the referred modules have loaded their tests, keep only the
 
3626
        # requested ones.
 
3627
        suite = filter_suite_by_id_list(suite, id_filter)
 
3628
        # Do some sanity checks on the id_list filtering
 
3629
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
3630
        if starting_with:
 
3631
            # The tester has used both keep_only and starting_with, so he is
 
3632
            # already aware that some tests are excluded from the list, there
 
3633
            # is no need to tell him which.
 
3634
            pass
 
3635
        else:
 
3636
            # Some tests mentioned in the list are not in the test suite. The
 
3637
            # list may be out of date, report to the tester.
 
3638
            for id in not_found:
 
3639
                bzrlib.trace.warning('"%s" not found in the test suite', id)
 
3640
        for id in duplicates:
 
3641
            bzrlib.trace.warning('"%s" is used as an id by several tests', id)
 
3642
 
 
3643
    return suite
 
3644
 
 
3645
 
 
3646
def multiply_scenarios(scenarios_left, scenarios_right):
 
3647
    """Multiply two sets of scenarios.
 
3648
 
 
3649
    :returns: the cartesian product of the two sets of scenarios, that is
 
3650
        a scenario for every possible combination of a left scenario and a
 
3651
        right scenario.
 
3652
    """
 
3653
    return [
 
3654
        ('%s,%s' % (left_name, right_name),
 
3655
         dict(left_dict.items() + right_dict.items()))
 
3656
        for left_name, left_dict in scenarios_left
 
3657
        for right_name, right_dict in scenarios_right]
 
3658
 
 
3659
 
 
3660
def multiply_tests(tests, scenarios, result):
 
3661
    """Multiply tests_list by scenarios into result.
 
3662
 
 
3663
    This is the core workhorse for test parameterisation.
 
3664
 
 
3665
    Typically the load_tests() method for a per-implementation test suite will
 
3666
    call multiply_tests and return the result.
 
3667
 
 
3668
    :param tests: The tests to parameterise.
 
3669
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
 
3670
        scenario_param_dict).
 
3671
    :param result: A TestSuite to add created tests to.
 
3672
 
 
3673
    This returns the passed in result TestSuite with the cross product of all
 
3674
    the tests repeated once for each scenario.  Each test is adapted by adding
 
3675
    the scenario name at the end of its id(), and updating the test object's
 
3676
    __dict__ with the scenario_param_dict.
 
3677
 
 
3678
    >>> import bzrlib.tests.test_sampler
 
3679
    >>> r = multiply_tests(
 
3680
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
 
3681
    ...     [('one', dict(param=1)),
 
3682
    ...      ('two', dict(param=2))],
 
3683
    ...     TestSuite())
 
3684
    >>> tests = list(iter_suite_tests(r))
 
3685
    >>> len(tests)
 
3686
    2
 
3687
    >>> tests[0].id()
 
3688
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
 
3689
    >>> tests[0].param
 
3690
    1
 
3691
    >>> tests[1].param
 
3692
    2
 
3693
    """
 
3694
    for test in iter_suite_tests(tests):
 
3695
        apply_scenarios(test, scenarios, result)
 
3696
    return result
 
3697
 
 
3698
 
 
3699
def apply_scenarios(test, scenarios, result):
 
3700
    """Apply the scenarios in scenarios to test and add to result.
 
3701
 
 
3702
    :param test: The test to apply scenarios to.
 
3703
    :param scenarios: An iterable of scenarios to apply to test.
 
3704
    :return: result
 
3705
    :seealso: apply_scenario
 
3706
    """
 
3707
    for scenario in scenarios:
 
3708
        result.addTest(apply_scenario(test, scenario))
 
3709
    return result
 
3710
 
 
3711
 
 
3712
def apply_scenario(test, scenario):
 
3713
    """Copy test and apply scenario to it.
 
3714
 
 
3715
    :param test: A test to adapt.
 
3716
    :param scenario: A tuple describing the scenarion.
 
3717
        The first element of the tuple is the new test id.
 
3718
        The second element is a dict containing attributes to set on the
 
3719
        test.
 
3720
    :return: The adapted test.
 
3721
    """
 
3722
    new_id = "%s(%s)" % (test.id(), scenario[0])
 
3723
    new_test = clone_test(test, new_id)
 
3724
    for name, value in scenario[1].items():
 
3725
        setattr(new_test, name, value)
 
3726
    return new_test
 
3727
 
 
3728
 
 
3729
def clone_test(test, new_id):
 
3730
    """Clone a test giving it a new id.
 
3731
 
 
3732
    :param test: The test to clone.
 
3733
    :param new_id: The id to assign to it.
 
3734
    :return: The new test.
 
3735
    """
 
3736
    from copy import deepcopy
 
3737
    new_test = deepcopy(test)
 
3738
    new_test.id = lambda: new_id
 
3739
    return new_test
 
3740
 
 
3741
 
 
3742
def _rmtree_temp_dir(dirname):
 
3743
    # If LANG=C we probably have created some bogus paths
 
3744
    # which rmtree(unicode) will fail to delete
 
3745
    # so make sure we are using rmtree(str) to delete everything
 
3746
    # except on win32, where rmtree(str) will fail
 
3747
    # since it doesn't have the property of byte-stream paths
 
3748
    # (they are either ascii or mbcs)
 
3749
    if sys.platform == 'win32':
 
3750
        # make sure we are using the unicode win32 api
 
3751
        dirname = unicode(dirname)
 
3752
    else:
 
3753
        dirname = dirname.encode(sys.getfilesystemencoding())
 
3754
    try:
 
3755
        osutils.rmtree(dirname)
 
3756
    except OSError, e:
 
3757
        if sys.platform == 'win32' and e.errno == errno.EACCES:
 
3758
            sys.stderr.write('Permission denied: '
 
3759
                             'unable to remove testing dir '
 
3760
                             '%s\n%s'
 
3761
                             % (os.path.basename(dirname), e))
 
3762
        else:
 
3763
            raise
 
3764
 
 
3765
 
 
3766
class Feature(object):
 
3767
    """An operating system Feature."""
 
3768
 
 
3769
    def __init__(self):
 
3770
        self._available = None
 
3771
 
 
3772
    def available(self):
 
3773
        """Is the feature available?
 
3774
 
 
3775
        :return: True if the feature is available.
 
3776
        """
 
3777
        if self._available is None:
 
3778
            self._available = self._probe()
 
3779
        return self._available
 
3780
 
 
3781
    def _probe(self):
 
3782
        """Implement this method in concrete features.
 
3783
 
 
3784
        :return: True if the feature is available.
 
3785
        """
 
3786
        raise NotImplementedError
 
3787
 
 
3788
    def __str__(self):
 
3789
        if getattr(self, 'feature_name', None):
 
3790
            return self.feature_name()
 
3791
        return self.__class__.__name__
 
3792
 
 
3793
 
 
3794
class _SymlinkFeature(Feature):
 
3795
 
 
3796
    def _probe(self):
 
3797
        return osutils.has_symlinks()
 
3798
 
 
3799
    def feature_name(self):
 
3800
        return 'symlinks'
 
3801
 
 
3802
SymlinkFeature = _SymlinkFeature()
 
3803
 
 
3804
 
 
3805
class _HardlinkFeature(Feature):
 
3806
 
 
3807
    def _probe(self):
 
3808
        return osutils.has_hardlinks()
 
3809
 
 
3810
    def feature_name(self):
 
3811
        return 'hardlinks'
 
3812
 
 
3813
HardlinkFeature = _HardlinkFeature()
 
3814
 
 
3815
 
 
3816
class _OsFifoFeature(Feature):
 
3817
 
 
3818
    def _probe(self):
 
3819
        return getattr(os, 'mkfifo', None)
 
3820
 
 
3821
    def feature_name(self):
 
3822
        return 'filesystem fifos'
 
3823
 
 
3824
OsFifoFeature = _OsFifoFeature()
 
3825
 
 
3826
 
 
3827
class _UnicodeFilenameFeature(Feature):
 
3828
    """Does the filesystem support Unicode filenames?"""
 
3829
 
 
3830
    def _probe(self):
 
3831
        try:
 
3832
            # Check for character combinations unlikely to be covered by any
 
3833
            # single non-unicode encoding. We use the characters
 
3834
            # - greek small letter alpha (U+03B1) and
 
3835
            # - braille pattern dots-123456 (U+283F).
 
3836
            os.stat(u'\u03b1\u283f')
 
3837
        except UnicodeEncodeError:
 
3838
            return False
 
3839
        except (IOError, OSError):
 
3840
            # The filesystem allows the Unicode filename but the file doesn't
 
3841
            # exist.
 
3842
            return True
 
3843
        else:
 
3844
            # The filesystem allows the Unicode filename and the file exists,
 
3845
            # for some reason.
 
3846
            return True
 
3847
 
 
3848
UnicodeFilenameFeature = _UnicodeFilenameFeature()
 
3849
 
 
3850
 
 
3851
def probe_unicode_in_user_encoding():
 
3852
    """Try to encode several unicode strings to use in unicode-aware tests.
 
3853
    Return first successfull match.
 
3854
 
 
3855
    :return:  (unicode value, encoded plain string value) or (None, None)
 
3856
    """
 
3857
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
3858
    for uni_val in possible_vals:
 
3859
        try:
 
3860
            str_val = uni_val.encode(osutils.get_user_encoding())
 
3861
        except UnicodeEncodeError:
 
3862
            # Try a different character
 
3863
            pass
 
3864
        else:
 
3865
            return uni_val, str_val
 
3866
    return None, None
 
3867
 
 
3868
 
 
3869
def probe_bad_non_ascii(encoding):
 
3870
    """Try to find [bad] character with code [128..255]
 
3871
    that cannot be decoded to unicode in some encoding.
 
3872
    Return None if all non-ascii characters is valid
 
3873
    for given encoding.
 
3874
    """
 
3875
    for i in xrange(128, 256):
 
3876
        char = chr(i)
 
3877
        try:
 
3878
            char.decode(encoding)
 
3879
        except UnicodeDecodeError:
 
3880
            return char
 
3881
    return None
 
3882
 
 
3883
 
 
3884
class _HTTPSServerFeature(Feature):
 
3885
    """Some tests want an https Server, check if one is available.
 
3886
 
 
3887
    Right now, the only way this is available is under python2.6 which provides
 
3888
    an ssl module.
 
3889
    """
 
3890
 
 
3891
    def _probe(self):
 
3892
        try:
 
3893
            import ssl
 
3894
            return True
 
3895
        except ImportError:
 
3896
            return False
 
3897
 
 
3898
    def feature_name(self):
 
3899
        return 'HTTPSServer'
 
3900
 
 
3901
 
 
3902
HTTPSServerFeature = _HTTPSServerFeature()
 
3903
 
 
3904
 
 
3905
class _UnicodeFilename(Feature):
 
3906
    """Does the filesystem support Unicode filenames?"""
 
3907
 
 
3908
    def _probe(self):
 
3909
        try:
 
3910
            os.stat(u'\u03b1')
 
3911
        except UnicodeEncodeError:
 
3912
            return False
 
3913
        except (IOError, OSError):
 
3914
            # The filesystem allows the Unicode filename but the file doesn't
 
3915
            # exist.
 
3916
            return True
 
3917
        else:
 
3918
            # The filesystem allows the Unicode filename and the file exists,
 
3919
            # for some reason.
 
3920
            return True
 
3921
 
 
3922
UnicodeFilename = _UnicodeFilename()
 
3923
 
 
3924
 
 
3925
class _UTF8Filesystem(Feature):
 
3926
    """Is the filesystem UTF-8?"""
 
3927
 
 
3928
    def _probe(self):
 
3929
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
 
3930
            return True
 
3931
        return False
 
3932
 
 
3933
UTF8Filesystem = _UTF8Filesystem()
 
3934
 
 
3935
 
 
3936
class _CaseInsCasePresFilenameFeature(Feature):
 
3937
    """Is the file-system case insensitive, but case-preserving?"""
 
3938
 
 
3939
    def _probe(self):
 
3940
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
 
3941
        try:
 
3942
            # first check truly case-preserving for created files, then check
 
3943
            # case insensitive when opening existing files.
 
3944
            name = osutils.normpath(name)
 
3945
            base, rel = osutils.split(name)
 
3946
            found_rel = osutils.canonical_relpath(base, name)
 
3947
            return (found_rel == rel
 
3948
                    and os.path.isfile(name.upper())
 
3949
                    and os.path.isfile(name.lower()))
 
3950
        finally:
 
3951
            os.close(fileno)
 
3952
            os.remove(name)
 
3953
 
 
3954
    def feature_name(self):
 
3955
        return "case-insensitive case-preserving filesystem"
 
3956
 
 
3957
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
 
3958
 
 
3959
 
 
3960
class _CaseInsensitiveFilesystemFeature(Feature):
 
3961
    """Check if underlying filesystem is case-insensitive but *not* case
 
3962
    preserving.
 
3963
    """
 
3964
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
 
3965
    # more likely to be case preserving, so this case is rare.
 
3966
 
 
3967
    def _probe(self):
 
3968
        if CaseInsCasePresFilenameFeature.available():
 
3969
            return False
 
3970
 
 
3971
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
3972
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
3973
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
3974
        else:
 
3975
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
3976
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
3977
            dir=root)
 
3978
        name_a = osutils.pathjoin(tdir, 'a')
 
3979
        name_A = osutils.pathjoin(tdir, 'A')
 
3980
        os.mkdir(name_a)
 
3981
        result = osutils.isdir(name_A)
 
3982
        _rmtree_temp_dir(tdir)
 
3983
        return result
 
3984
 
 
3985
    def feature_name(self):
 
3986
        return 'case-insensitive filesystem'
 
3987
 
 
3988
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
 
3989
 
 
3990
 
 
3991
class _SubUnitFeature(Feature):
 
3992
    """Check if subunit is available."""
 
3993
 
 
3994
    def _probe(self):
 
3995
        try:
 
3996
            import subunit
 
3997
            return True
 
3998
        except ImportError:
 
3999
            return False
 
4000
 
 
4001
    def feature_name(self):
 
4002
        return 'subunit'
 
4003
 
 
4004
SubUnitFeature = _SubUnitFeature()
 
4005
# Only define SubUnitBzrRunner if subunit is available.
 
4006
try:
 
4007
    from subunit import TestProtocolClient
 
4008
    try:
 
4009
        from subunit.test_results import AutoTimingTestResultDecorator
 
4010
    except ImportError:
 
4011
        AutoTimingTestResultDecorator = lambda x:x
 
4012
    class SubUnitBzrRunner(TextTestRunner):
 
4013
        def run(self, test):
 
4014
            result = AutoTimingTestResultDecorator(
 
4015
                TestProtocolClient(self.stream))
 
4016
            test.run(result)
 
4017
            return result
 
4018
except ImportError:
 
4019
    pass