/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-08-27 00:53:27 UTC
  • mfrom: (4436.2.1 386180-check)
  • Revision ID: pqm@pqm.ubuntu.com-20090827005327-iky1i2fzwi75h4ie
(mbp) make check no longer repeats the test run in LANG=C

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import atexit
 
30
import codecs
 
31
from copy import copy
 
32
from cStringIO import StringIO
 
33
import difflib
 
34
import doctest
 
35
import errno
 
36
import logging
 
37
import math
 
38
import os
 
39
from pprint import pformat
 
40
import random
 
41
import re
 
42
import shlex
 
43
import stat
 
44
from subprocess import Popen, PIPE, STDOUT
 
45
import sys
 
46
import tempfile
 
47
import threading
 
48
import time
 
49
import unittest
 
50
import warnings
 
51
 
 
52
 
 
53
from bzrlib import (
 
54
    branchbuilder,
 
55
    bzrdir,
 
56
    debug,
 
57
    errors,
 
58
    hooks,
 
59
    lock as _mod_lock,
 
60
    memorytree,
 
61
    osutils,
 
62
    progress,
 
63
    ui,
 
64
    urlutils,
 
65
    registry,
 
66
    workingtree,
 
67
    )
 
68
import bzrlib.branch
 
69
import bzrlib.commands
 
70
import bzrlib.timestamp
 
71
import bzrlib.export
 
72
import bzrlib.inventory
 
73
import bzrlib.iterablefile
 
74
import bzrlib.lockdir
 
75
try:
 
76
    import bzrlib.lsprof
 
77
except ImportError:
 
78
    # lsprof not available
 
79
    pass
 
80
from bzrlib.merge import merge_inner
 
81
import bzrlib.merge3
 
82
import bzrlib.plugin
 
83
from bzrlib.smart import client, request, server
 
84
import bzrlib.store
 
85
from bzrlib import symbol_versioning
 
86
from bzrlib.symbol_versioning import (
 
87
    DEPRECATED_PARAMETER,
 
88
    deprecated_function,
 
89
    deprecated_method,
 
90
    deprecated_passed,
 
91
    )
 
92
import bzrlib.trace
 
93
from bzrlib.transport import get_transport
 
94
import bzrlib.transport
 
95
from bzrlib.transport.local import LocalURLServer
 
96
from bzrlib.transport.memory import MemoryServer
 
97
from bzrlib.transport.readonly import ReadonlyServer
 
98
from bzrlib.trace import mutter, note
 
99
from bzrlib.tests import TestUtil
 
100
from bzrlib.tests.http_server import HttpServer
 
101
from bzrlib.tests.TestUtil import (
 
102
                          TestSuite,
 
103
                          TestLoader,
 
104
                          )
 
105
from bzrlib.tests.treeshape import build_tree_contents
 
106
from bzrlib.ui import NullProgressView
 
107
from bzrlib.ui.text import TextUIFactory
 
108
import bzrlib.version_info_formats.format_custom
 
109
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
110
 
 
111
# Mark this python module as being part of the implementation
 
112
# of unittest: this gives us better tracebacks where the last
 
113
# shown frame is the test code, not our assertXYZ.
 
114
__unittest = 1
 
115
 
 
116
default_transport = LocalURLServer
 
117
 
 
118
# Subunit result codes, defined here to prevent a hard dependency on subunit.
 
119
SUBUNIT_SEEK_SET = 0
 
120
SUBUNIT_SEEK_CUR = 1
 
121
 
 
122
 
 
123
class ExtendedTestResult(unittest._TextTestResult):
 
124
    """Accepts, reports and accumulates the results of running tests.
 
125
 
 
126
    Compared to the unittest version this class adds support for
 
127
    profiling, benchmarking, stopping as soon as a test fails,  and
 
128
    skipping tests.  There are further-specialized subclasses for
 
129
    different types of display.
 
130
 
 
131
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
132
    addFailure or addError classes.  These in turn may redirect to a more
 
133
    specific case for the special test results supported by our extended
 
134
    tests.
 
135
 
 
136
    Note that just one of these objects is fed the results from many tests.
 
137
    """
 
138
 
 
139
    stop_early = False
 
140
 
 
141
    def __init__(self, stream, descriptions, verbosity,
 
142
                 bench_history=None,
 
143
                 strict=False,
 
144
                 ):
 
145
        """Construct new TestResult.
 
146
 
 
147
        :param bench_history: Optionally, a writable file object to accumulate
 
148
            benchmark results.
 
149
        """
 
150
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
151
        if bench_history is not None:
 
152
            from bzrlib.version import _get_bzr_source_tree
 
153
            src_tree = _get_bzr_source_tree()
 
154
            if src_tree:
 
155
                try:
 
156
                    revision_id = src_tree.get_parent_ids()[0]
 
157
                except IndexError:
 
158
                    # XXX: if this is a brand new tree, do the same as if there
 
159
                    # is no branch.
 
160
                    revision_id = ''
 
161
            else:
 
162
                # XXX: If there's no branch, what should we do?
 
163
                revision_id = ''
 
164
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
165
        self._bench_history = bench_history
 
166
        self.ui = ui.ui_factory
 
167
        self.num_tests = 0
 
168
        self.error_count = 0
 
169
        self.failure_count = 0
 
170
        self.known_failure_count = 0
 
171
        self.skip_count = 0
 
172
        self.not_applicable_count = 0
 
173
        self.unsupported = {}
 
174
        self.count = 0
 
175
        self._overall_start_time = time.time()
 
176
        self._strict = strict
 
177
 
 
178
    def done(self):
 
179
        # nb: called stopTestRun in the version of this that Python merged
 
180
        # upstream, according to lifeless 20090803
 
181
        if self._strict:
 
182
            ok = self.wasStrictlySuccessful()
 
183
        else:
 
184
            ok = self.wasSuccessful()
 
185
        if ok:
 
186
            self.stream.write('tests passed\n')
 
187
        else:
 
188
            self.stream.write('tests failed\n')
 
189
        if TestCase._first_thread_leaker_id:
 
190
            self.stream.write(
 
191
                '%s is leaking threads among %d leaking tests.\n' % (
 
192
                TestCase._first_thread_leaker_id,
 
193
                TestCase._leaking_threads_tests))
 
194
 
 
195
    def _extractBenchmarkTime(self, testCase):
 
196
        """Add a benchmark time for the current test case."""
 
197
        return getattr(testCase, "_benchtime", None)
 
198
 
 
199
    def _elapsedTestTimeString(self):
 
200
        """Return a time string for the overall time the current test has taken."""
 
201
        return self._formatTime(time.time() - self._start_time)
 
202
 
 
203
    def _testTimeString(self, testCase):
 
204
        benchmark_time = self._extractBenchmarkTime(testCase)
 
205
        if benchmark_time is not None:
 
206
            return self._formatTime(benchmark_time) + "*"
 
207
        else:
 
208
            return self._elapsedTestTimeString()
 
209
 
 
210
    def _formatTime(self, seconds):
 
211
        """Format seconds as milliseconds with leading spaces."""
 
212
        # some benchmarks can take thousands of seconds to run, so we need 8
 
213
        # places
 
214
        return "%8dms" % (1000 * seconds)
 
215
 
 
216
    def _shortened_test_description(self, test):
 
217
        what = test.id()
 
218
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
219
        return what
 
220
 
 
221
    def startTest(self, test):
 
222
        unittest.TestResult.startTest(self, test)
 
223
        if self.count == 0:
 
224
            self.startTests()
 
225
        self.report_test_start(test)
 
226
        test.number = self.count
 
227
        self._recordTestStartTime()
 
228
 
 
229
    def startTests(self):
 
230
        import platform
 
231
        if getattr(sys, 'frozen', None) is None:
 
232
            bzr_path = osutils.realpath(sys.argv[0])
 
233
        else:
 
234
            bzr_path = sys.executable
 
235
        self.stream.write(
 
236
            'testing: %s\n' % (bzr_path,))
 
237
        self.stream.write(
 
238
            '   %s\n' % (
 
239
                    bzrlib.__path__[0],))
 
240
        self.stream.write(
 
241
            '   bzr-%s python-%s %s\n' % (
 
242
                    bzrlib.version_string,
 
243
                    bzrlib._format_version_tuple(sys.version_info),
 
244
                    platform.platform(aliased=1),
 
245
                    ))
 
246
        self.stream.write('\n')
 
247
 
 
248
    def _recordTestStartTime(self):
 
249
        """Record that a test has started."""
 
250
        self._start_time = time.time()
 
251
 
 
252
    def _cleanupLogFile(self, test):
 
253
        # We can only do this if we have one of our TestCases, not if
 
254
        # we have a doctest.
 
255
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
256
        if setKeepLogfile is not None:
 
257
            setKeepLogfile()
 
258
 
 
259
    def addError(self, test, err):
 
260
        """Tell result that test finished with an error.
 
261
 
 
262
        Called from the TestCase run() method when the test
 
263
        fails with an unexpected error.
 
264
        """
 
265
        self._testConcluded(test)
 
266
        if isinstance(err[1], TestNotApplicable):
 
267
            return self._addNotApplicable(test, err)
 
268
        elif isinstance(err[1], UnavailableFeature):
 
269
            return self.addNotSupported(test, err[1].args[0])
 
270
        else:
 
271
            unittest.TestResult.addError(self, test, err)
 
272
            self.error_count += 1
 
273
            self.report_error(test, err)
 
274
            if self.stop_early:
 
275
                self.stop()
 
276
            self._cleanupLogFile(test)
 
277
 
 
278
    def addFailure(self, test, err):
 
279
        """Tell result that test failed.
 
280
 
 
281
        Called from the TestCase run() method when the test
 
282
        fails because e.g. an assert() method failed.
 
283
        """
 
284
        self._testConcluded(test)
 
285
        if isinstance(err[1], KnownFailure):
 
286
            return self._addKnownFailure(test, err)
 
287
        else:
 
288
            unittest.TestResult.addFailure(self, test, err)
 
289
            self.failure_count += 1
 
290
            self.report_failure(test, err)
 
291
            if self.stop_early:
 
292
                self.stop()
 
293
            self._cleanupLogFile(test)
 
294
 
 
295
    def addSuccess(self, test):
 
296
        """Tell result that test completed successfully.
 
297
 
 
298
        Called from the TestCase run()
 
299
        """
 
300
        self._testConcluded(test)
 
301
        if self._bench_history is not None:
 
302
            benchmark_time = self._extractBenchmarkTime(test)
 
303
            if benchmark_time is not None:
 
304
                self._bench_history.write("%s %s\n" % (
 
305
                    self._formatTime(benchmark_time),
 
306
                    test.id()))
 
307
        self.report_success(test)
 
308
        self._cleanupLogFile(test)
 
309
        unittest.TestResult.addSuccess(self, test)
 
310
        test._log_contents = ''
 
311
 
 
312
    def _testConcluded(self, test):
 
313
        """Common code when a test has finished.
 
314
 
 
315
        Called regardless of whether it succeded, failed, etc.
 
316
        """
 
317
        pass
 
318
 
 
319
    def _addKnownFailure(self, test, err):
 
320
        self.known_failure_count += 1
 
321
        self.report_known_failure(test, err)
 
322
 
 
323
    def addNotSupported(self, test, feature):
 
324
        """The test will not be run because of a missing feature.
 
325
        """
 
326
        # this can be called in two different ways: it may be that the
 
327
        # test started running, and then raised (through addError)
 
328
        # UnavailableFeature.  Alternatively this method can be called
 
329
        # while probing for features before running the tests; in that
 
330
        # case we will see startTest and stopTest, but the test will never
 
331
        # actually run.
 
332
        self.unsupported.setdefault(str(feature), 0)
 
333
        self.unsupported[str(feature)] += 1
 
334
        self.report_unsupported(test, feature)
 
335
 
 
336
    def addSkip(self, test, reason):
 
337
        """A test has not run for 'reason'."""
 
338
        self.skip_count += 1
 
339
        self.report_skip(test, reason)
 
340
 
 
341
    def _addNotApplicable(self, test, skip_excinfo):
 
342
        if isinstance(skip_excinfo[1], TestNotApplicable):
 
343
            self.not_applicable_count += 1
 
344
            self.report_not_applicable(test, skip_excinfo)
 
345
        try:
 
346
            test.tearDown()
 
347
        except KeyboardInterrupt:
 
348
            raise
 
349
        except:
 
350
            self.addError(test, test.exc_info())
 
351
        else:
 
352
            # seems best to treat this as success from point-of-view of unittest
 
353
            # -- it actually does nothing so it barely matters :)
 
354
            unittest.TestResult.addSuccess(self, test)
 
355
            test._log_contents = ''
 
356
 
 
357
    def printErrorList(self, flavour, errors):
 
358
        for test, err in errors:
 
359
            self.stream.writeln(self.separator1)
 
360
            self.stream.write("%s: " % flavour)
 
361
            self.stream.writeln(self.getDescription(test))
 
362
            if getattr(test, '_get_log', None) is not None:
 
363
                log_contents = test._get_log()
 
364
                if log_contents:
 
365
                    self.stream.write('\n')
 
366
                    self.stream.write(
 
367
                            ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
 
368
                    self.stream.write('\n')
 
369
                    self.stream.write(log_contents)
 
370
                    self.stream.write('\n')
 
371
                    self.stream.write(
 
372
                            ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
 
373
                    self.stream.write('\n')
 
374
            self.stream.writeln(self.separator2)
 
375
            self.stream.writeln("%s" % err)
 
376
 
 
377
    def progress(self, offset, whence):
 
378
        """The test is adjusting the count of tests to run."""
 
379
        if whence == SUBUNIT_SEEK_SET:
 
380
            self.num_tests = offset
 
381
        elif whence == SUBUNIT_SEEK_CUR:
 
382
            self.num_tests += offset
 
383
        else:
 
384
            raise errors.BzrError("Unknown whence %r" % whence)
 
385
 
 
386
    def finished(self):
 
387
        pass
 
388
 
 
389
    def report_cleaning_up(self):
 
390
        pass
 
391
 
 
392
    def report_success(self, test):
 
393
        pass
 
394
 
 
395
    def wasStrictlySuccessful(self):
 
396
        if self.unsupported or self.known_failure_count:
 
397
            return False
 
398
        return self.wasSuccessful()
 
399
 
 
400
 
 
401
class TextTestResult(ExtendedTestResult):
 
402
    """Displays progress and results of tests in text form"""
 
403
 
 
404
    def __init__(self, stream, descriptions, verbosity,
 
405
                 bench_history=None,
 
406
                 pb=None,
 
407
                 strict=None,
 
408
                 ):
 
409
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
410
            bench_history, strict)
 
411
        # We no longer pass them around, but just rely on the UIFactory stack
 
412
        # for state
 
413
        if pb is not None:
 
414
            warnings.warn("Passing pb to TextTestResult is deprecated")
 
415
        self.pb = self.ui.nested_progress_bar()
 
416
        self.pb.show_pct = False
 
417
        self.pb.show_spinner = False
 
418
        self.pb.show_eta = False,
 
419
        self.pb.show_count = False
 
420
        self.pb.show_bar = False
 
421
        self.pb.update_latency = 0
 
422
        self.pb.show_transport_activity = False
 
423
 
 
424
    def done(self):
 
425
        # called when the tests that are going to run have run
 
426
        self.pb.clear()
 
427
        super(TextTestResult, self).done()
 
428
 
 
429
    def finished(self):
 
430
        self.pb.finished()
 
431
 
 
432
    def report_starting(self):
 
433
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
 
434
 
 
435
    def printErrors(self):
 
436
        # clear the pb to make room for the error listing
 
437
        self.pb.clear()
 
438
        super(TextTestResult, self).printErrors()
 
439
 
 
440
    def _progress_prefix_text(self):
 
441
        # the longer this text, the less space we have to show the test
 
442
        # name...
 
443
        a = '[%d' % self.count              # total that have been run
 
444
        # tests skipped as known not to be relevant are not important enough
 
445
        # to show here
 
446
        ## if self.skip_count:
 
447
        ##     a += ', %d skip' % self.skip_count
 
448
        ## if self.known_failure_count:
 
449
        ##     a += '+%dX' % self.known_failure_count
 
450
        if self.num_tests:
 
451
            a +='/%d' % self.num_tests
 
452
        a += ' in '
 
453
        runtime = time.time() - self._overall_start_time
 
454
        if runtime >= 60:
 
455
            a += '%dm%ds' % (runtime / 60, runtime % 60)
 
456
        else:
 
457
            a += '%ds' % runtime
 
458
        if self.error_count:
 
459
            a += ', %d err' % self.error_count
 
460
        if self.failure_count:
 
461
            a += ', %d fail' % self.failure_count
 
462
        if self.unsupported:
 
463
            a += ', %d missing' % len(self.unsupported)
 
464
        a += ']'
 
465
        return a
 
466
 
 
467
    def report_test_start(self, test):
 
468
        self.count += 1
 
469
        self.pb.update(
 
470
                self._progress_prefix_text()
 
471
                + ' '
 
472
                + self._shortened_test_description(test))
 
473
 
 
474
    def _test_description(self, test):
 
475
        return self._shortened_test_description(test)
 
476
 
 
477
    def report_error(self, test, err):
 
478
        self.pb.note('ERROR: %s\n    %s\n',
 
479
            self._test_description(test),
 
480
            err[1],
 
481
            )
 
482
 
 
483
    def report_failure(self, test, err):
 
484
        self.pb.note('FAIL: %s\n    %s\n',
 
485
            self._test_description(test),
 
486
            err[1],
 
487
            )
 
488
 
 
489
    def report_known_failure(self, test, err):
 
490
        self.pb.note('XFAIL: %s\n%s\n',
 
491
            self._test_description(test), err[1])
 
492
 
 
493
    def report_skip(self, test, reason):
 
494
        pass
 
495
 
 
496
    def report_not_applicable(self, test, skip_excinfo):
 
497
        pass
 
498
 
 
499
    def report_unsupported(self, test, feature):
 
500
        """test cannot be run because feature is missing."""
 
501
 
 
502
    def report_cleaning_up(self):
 
503
        self.pb.update('Cleaning up')
 
504
 
 
505
 
 
506
class VerboseTestResult(ExtendedTestResult):
 
507
    """Produce long output, with one line per test run plus times"""
 
508
 
 
509
    def _ellipsize_to_right(self, a_string, final_width):
 
510
        """Truncate and pad a string, keeping the right hand side"""
 
511
        if len(a_string) > final_width:
 
512
            result = '...' + a_string[3-final_width:]
 
513
        else:
 
514
            result = a_string
 
515
        return result.ljust(final_width)
 
516
 
 
517
    def report_starting(self):
 
518
        self.stream.write('running %d tests...\n' % self.num_tests)
 
519
 
 
520
    def report_test_start(self, test):
 
521
        self.count += 1
 
522
        name = self._shortened_test_description(test)
 
523
        # width needs space for 6 char status, plus 1 for slash, plus an
 
524
        # 11-char time string, plus a trailing blank
 
525
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
526
        self.stream.write(self._ellipsize_to_right(name,
 
527
                          osutils.terminal_width()-18))
 
528
        self.stream.flush()
 
529
 
 
530
    def _error_summary(self, err):
 
531
        indent = ' ' * 4
 
532
        return '%s%s' % (indent, err[1])
 
533
 
 
534
    def report_error(self, test, err):
 
535
        self.stream.writeln('ERROR %s\n%s'
 
536
                % (self._testTimeString(test),
 
537
                   self._error_summary(err)))
 
538
 
 
539
    def report_failure(self, test, err):
 
540
        self.stream.writeln(' FAIL %s\n%s'
 
541
                % (self._testTimeString(test),
 
542
                   self._error_summary(err)))
 
543
 
 
544
    def report_known_failure(self, test, err):
 
545
        self.stream.writeln('XFAIL %s\n%s'
 
546
                % (self._testTimeString(test),
 
547
                   self._error_summary(err)))
 
548
 
 
549
    def report_success(self, test):
 
550
        self.stream.writeln('   OK %s' % self._testTimeString(test))
 
551
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
552
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
553
            stats.pprint(file=self.stream)
 
554
        # flush the stream so that we get smooth output. This verbose mode is
 
555
        # used to show the output in PQM.
 
556
        self.stream.flush()
 
557
 
 
558
    def report_skip(self, test, reason):
 
559
        self.stream.writeln(' SKIP %s\n%s'
 
560
                % (self._testTimeString(test), reason))
 
561
 
 
562
    def report_not_applicable(self, test, skip_excinfo):
 
563
        self.stream.writeln('  N/A %s\n%s'
 
564
                % (self._testTimeString(test),
 
565
                   self._error_summary(skip_excinfo)))
 
566
 
 
567
    def report_unsupported(self, test, feature):
 
568
        """test cannot be run because feature is missing."""
 
569
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
570
                %(self._testTimeString(test), feature))
 
571
 
 
572
 
 
573
class TextTestRunner(object):
 
574
    stop_on_failure = False
 
575
 
 
576
    def __init__(self,
 
577
                 stream=sys.stderr,
 
578
                 descriptions=0,
 
579
                 verbosity=1,
 
580
                 bench_history=None,
 
581
                 list_only=False,
 
582
                 strict=False,
 
583
                 result_decorators=None,
 
584
                 ):
 
585
        """Create a TextTestRunner.
 
586
 
 
587
        :param result_decorators: An optional list of decorators to apply
 
588
            to the result object being used by the runner. Decorators are
 
589
            applied left to right - the first element in the list is the 
 
590
            innermost decorator.
 
591
        """
 
592
        self.stream = unittest._WritelnDecorator(stream)
 
593
        self.descriptions = descriptions
 
594
        self.verbosity = verbosity
 
595
        self._bench_history = bench_history
 
596
        self.list_only = list_only
 
597
        self._strict = strict
 
598
        self._result_decorators = result_decorators or []
 
599
 
 
600
    def run(self, test):
 
601
        "Run the given test case or test suite."
 
602
        startTime = time.time()
 
603
        if self.verbosity == 1:
 
604
            result_class = TextTestResult
 
605
        elif self.verbosity >= 2:
 
606
            result_class = VerboseTestResult
 
607
        result = result_class(self.stream,
 
608
                              self.descriptions,
 
609
                              self.verbosity,
 
610
                              bench_history=self._bench_history,
 
611
                              strict=self._strict,
 
612
                              )
 
613
        run_result = result
 
614
        for decorator in self._result_decorators:
 
615
            run_result = decorator(run_result)
 
616
        result.stop_early = self.stop_on_failure
 
617
        result.report_starting()
 
618
        if self.list_only:
 
619
            if self.verbosity >= 2:
 
620
                self.stream.writeln("Listing tests only ...\n")
 
621
            run = 0
 
622
            for t in iter_suite_tests(test):
 
623
                self.stream.writeln("%s" % (t.id()))
 
624
                run += 1
 
625
            return None
 
626
        else:
 
627
            try:
 
628
                import testtools
 
629
            except ImportError:
 
630
                test.run(run_result)
 
631
            else:
 
632
                if isinstance(test, testtools.ConcurrentTestSuite):
 
633
                    # We need to catch bzr specific behaviors
 
634
                    test.run(BZRTransformingResult(run_result))
 
635
                else:
 
636
                    test.run(run_result)
 
637
            run = result.testsRun
 
638
            actionTaken = "Ran"
 
639
        stopTime = time.time()
 
640
        timeTaken = stopTime - startTime
 
641
        result.printErrors()
 
642
        self.stream.writeln(result.separator2)
 
643
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
644
                            run, run != 1 and "s" or "", timeTaken))
 
645
        self.stream.writeln()
 
646
        if not result.wasSuccessful():
 
647
            self.stream.write("FAILED (")
 
648
            failed, errored = map(len, (result.failures, result.errors))
 
649
            if failed:
 
650
                self.stream.write("failures=%d" % failed)
 
651
            if errored:
 
652
                if failed: self.stream.write(", ")
 
653
                self.stream.write("errors=%d" % errored)
 
654
            if result.known_failure_count:
 
655
                if failed or errored: self.stream.write(", ")
 
656
                self.stream.write("known_failure_count=%d" %
 
657
                    result.known_failure_count)
 
658
            self.stream.writeln(")")
 
659
        else:
 
660
            if result.known_failure_count:
 
661
                self.stream.writeln("OK (known_failures=%d)" %
 
662
                    result.known_failure_count)
 
663
            else:
 
664
                self.stream.writeln("OK")
 
665
        if result.skip_count > 0:
 
666
            skipped = result.skip_count
 
667
            self.stream.writeln('%d test%s skipped' %
 
668
                                (skipped, skipped != 1 and "s" or ""))
 
669
        if result.unsupported:
 
670
            for feature, count in sorted(result.unsupported.items()):
 
671
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
672
                    (feature, count))
 
673
        result.finished()
 
674
        return result
 
675
 
 
676
 
 
677
def iter_suite_tests(suite):
 
678
    """Return all tests in a suite, recursing through nested suites"""
 
679
    if isinstance(suite, unittest.TestCase):
 
680
        yield suite
 
681
    elif isinstance(suite, unittest.TestSuite):
 
682
        for item in suite:
 
683
            for r in iter_suite_tests(item):
 
684
                yield r
 
685
    else:
 
686
        raise Exception('unknown type %r for object %r'
 
687
                        % (type(suite), suite))
 
688
 
 
689
 
 
690
class TestSkipped(Exception):
 
691
    """Indicates that a test was intentionally skipped, rather than failing."""
 
692
 
 
693
 
 
694
class TestNotApplicable(TestSkipped):
 
695
    """A test is not applicable to the situation where it was run.
 
696
 
 
697
    This is only normally raised by parameterized tests, if they find that
 
698
    the instance they're constructed upon does not support one aspect
 
699
    of its interface.
 
700
    """
 
701
 
 
702
 
 
703
class KnownFailure(AssertionError):
 
704
    """Indicates that a test failed in a precisely expected manner.
 
705
 
 
706
    Such failures dont block the whole test suite from passing because they are
 
707
    indicators of partially completed code or of future work. We have an
 
708
    explicit error for them so that we can ensure that they are always visible:
 
709
    KnownFailures are always shown in the output of bzr selftest.
 
710
    """
 
711
 
 
712
 
 
713
class UnavailableFeature(Exception):
 
714
    """A feature required for this test was not available.
 
715
 
 
716
    The feature should be used to construct the exception.
 
717
    """
 
718
 
 
719
 
 
720
class CommandFailed(Exception):
 
721
    pass
 
722
 
 
723
 
 
724
class StringIOWrapper(object):
 
725
    """A wrapper around cStringIO which just adds an encoding attribute.
 
726
 
 
727
    Internally we can check sys.stdout to see what the output encoding
 
728
    should be. However, cStringIO has no encoding attribute that we can
 
729
    set. So we wrap it instead.
 
730
    """
 
731
    encoding='ascii'
 
732
    _cstring = None
 
733
 
 
734
    def __init__(self, s=None):
 
735
        if s is not None:
 
736
            self.__dict__['_cstring'] = StringIO(s)
 
737
        else:
 
738
            self.__dict__['_cstring'] = StringIO()
 
739
 
 
740
    def __getattr__(self, name, getattr=getattr):
 
741
        return getattr(self.__dict__['_cstring'], name)
 
742
 
 
743
    def __setattr__(self, name, val):
 
744
        if name == 'encoding':
 
745
            self.__dict__['encoding'] = val
 
746
        else:
 
747
            return setattr(self._cstring, name, val)
 
748
 
 
749
 
 
750
class TestUIFactory(TextUIFactory):
 
751
    """A UI Factory for testing.
 
752
 
 
753
    Hide the progress bar but emit note()s.
 
754
    Redirect stdin.
 
755
    Allows get_password to be tested without real tty attached.
 
756
 
 
757
    See also CannedInputUIFactory which lets you provide programmatic input in
 
758
    a structured way.
 
759
    """
 
760
    # TODO: Capture progress events at the model level and allow them to be
 
761
    # observed by tests that care.
 
762
    #
 
763
    # XXX: Should probably unify more with CannedInputUIFactory or a
 
764
    # particular configuration of TextUIFactory, or otherwise have a clearer
 
765
    # idea of how they're supposed to be different.
 
766
    # See https://bugs.edge.launchpad.net/bzr/+bug/408213
 
767
 
 
768
    def __init__(self, stdout=None, stderr=None, stdin=None):
 
769
        if stdin is not None:
 
770
            # We use a StringIOWrapper to be able to test various
 
771
            # encodings, but the user is still responsible to
 
772
            # encode the string and to set the encoding attribute
 
773
            # of StringIOWrapper.
 
774
            stdin = StringIOWrapper(stdin)
 
775
        super(TestUIFactory, self).__init__(stdin, stdout, stderr)
 
776
 
 
777
    def get_non_echoed_password(self):
 
778
        """Get password from stdin without trying to handle the echo mode"""
 
779
        password = self.stdin.readline()
 
780
        if not password:
 
781
            raise EOFError
 
782
        if password[-1] == '\n':
 
783
            password = password[:-1]
 
784
        return password
 
785
 
 
786
    def make_progress_view(self):
 
787
        return NullProgressView()
 
788
 
 
789
 
 
790
class TestCase(unittest.TestCase):
 
791
    """Base class for bzr unit tests.
 
792
 
 
793
    Tests that need access to disk resources should subclass
 
794
    TestCaseInTempDir not TestCase.
 
795
 
 
796
    Error and debug log messages are redirected from their usual
 
797
    location into a temporary file, the contents of which can be
 
798
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
799
    so that it can also capture file IO.  When the test completes this file
 
800
    is read into memory and removed from disk.
 
801
 
 
802
    There are also convenience functions to invoke bzr's command-line
 
803
    routine, and to build and check bzr trees.
 
804
 
 
805
    In addition to the usual method of overriding tearDown(), this class also
 
806
    allows subclasses to register functions into the _cleanups list, which is
 
807
    run in order as the object is torn down.  It's less likely this will be
 
808
    accidentally overlooked.
 
809
    """
 
810
 
 
811
    _active_threads = None
 
812
    _leaking_threads_tests = 0
 
813
    _first_thread_leaker_id = None
 
814
    _log_file_name = None
 
815
    _log_contents = ''
 
816
    _keep_log_file = False
 
817
    # record lsprof data when performing benchmark calls.
 
818
    _gather_lsprof_in_benchmarks = False
 
819
    attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
 
820
                     '_log_contents', '_log_file_name', '_benchtime',
 
821
                     '_TestCase__testMethodName', '_TestCase__testMethodDoc',)
 
822
 
 
823
    def __init__(self, methodName='testMethod'):
 
824
        super(TestCase, self).__init__(methodName)
 
825
        self._cleanups = []
 
826
        self._bzr_test_setUp_run = False
 
827
        self._bzr_test_tearDown_run = False
 
828
 
 
829
    def setUp(self):
 
830
        unittest.TestCase.setUp(self)
 
831
        self._bzr_test_setUp_run = True
 
832
        self._cleanEnvironment()
 
833
        self._silenceUI()
 
834
        self._startLogFile()
 
835
        self._benchcalls = []
 
836
        self._benchtime = None
 
837
        self._clear_hooks()
 
838
        self._track_locks()
 
839
        self._clear_debug_flags()
 
840
        TestCase._active_threads = threading.activeCount()
 
841
        self.addCleanup(self._check_leaked_threads)
 
842
 
 
843
    def debug(self):
 
844
        # debug a frame up.
 
845
        import pdb
 
846
        pdb.Pdb().set_trace(sys._getframe().f_back)
 
847
 
 
848
    def _check_leaked_threads(self):
 
849
        active = threading.activeCount()
 
850
        leaked_threads = active - TestCase._active_threads
 
851
        TestCase._active_threads = active
 
852
        if leaked_threads:
 
853
            TestCase._leaking_threads_tests += 1
 
854
            if TestCase._first_thread_leaker_id is None:
 
855
                TestCase._first_thread_leaker_id = self.id()
 
856
 
 
857
    def _clear_debug_flags(self):
 
858
        """Prevent externally set debug flags affecting tests.
 
859
 
 
860
        Tests that want to use debug flags can just set them in the
 
861
        debug_flags set during setup/teardown.
 
862
        """
 
863
        self._preserved_debug_flags = set(debug.debug_flags)
 
864
        if 'allow_debug' not in selftest_debug_flags:
 
865
            debug.debug_flags.clear()
 
866
        if 'disable_lock_checks' not in selftest_debug_flags:
 
867
            debug.debug_flags.add('strict_locks')
 
868
        self.addCleanup(self._restore_debug_flags)
 
869
 
 
870
    def _clear_hooks(self):
 
871
        # prevent hooks affecting tests
 
872
        self._preserved_hooks = {}
 
873
        for key, factory in hooks.known_hooks.items():
 
874
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
875
            current_hooks = hooks.known_hooks_key_to_object(key)
 
876
            self._preserved_hooks[parent] = (name, current_hooks)
 
877
        self.addCleanup(self._restoreHooks)
 
878
        for key, factory in hooks.known_hooks.items():
 
879
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
880
            setattr(parent, name, factory())
 
881
        # this hook should always be installed
 
882
        request._install_hook()
 
883
 
 
884
    def _silenceUI(self):
 
885
        """Turn off UI for duration of test"""
 
886
        # by default the UI is off; tests can turn it on if they want it.
 
887
        saved = ui.ui_factory
 
888
        def _restore():
 
889
            ui.ui_factory = saved
 
890
        ui.ui_factory = ui.SilentUIFactory()
 
891
        self.addCleanup(_restore)
 
892
 
 
893
    def _check_locks(self):
 
894
        """Check that all lock take/release actions have been paired."""
 
895
        # We always check for mismatched locks. If a mismatch is found, we
 
896
        # fail unless -Edisable_lock_checks is supplied to selftest, in which
 
897
        # case we just print a warning.
 
898
        # unhook:
 
899
        acquired_locks = [lock for action, lock in self._lock_actions
 
900
                          if action == 'acquired']
 
901
        released_locks = [lock for action, lock in self._lock_actions
 
902
                          if action == 'released']
 
903
        broken_locks = [lock for action, lock in self._lock_actions
 
904
                        if action == 'broken']
 
905
        # trivially, given the tests for lock acquistion and release, if we
 
906
        # have as many in each list, it should be ok. Some lock tests also
 
907
        # break some locks on purpose and should be taken into account by
 
908
        # considering that breaking a lock is just a dirty way of releasing it.
 
909
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
 
910
            message = ('Different number of acquired and '
 
911
                       'released or broken locks. (%s, %s + %s)' %
 
912
                       (acquired_locks, released_locks, broken_locks))
 
913
            if not self._lock_check_thorough:
 
914
                # Rather than fail, just warn
 
915
                print "Broken test %s: %s" % (self, message)
 
916
                return
 
917
            self.fail(message)
 
918
 
 
919
    def _track_locks(self):
 
920
        """Track lock activity during tests."""
 
921
        self._lock_actions = []
 
922
        if 'disable_lock_checks' in selftest_debug_flags:
 
923
            self._lock_check_thorough = False
 
924
        else:
 
925
            self._lock_check_thorough = True
 
926
            
 
927
        self.addCleanup(self._check_locks)
 
928
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
 
929
                                                self._lock_acquired, None)
 
930
        _mod_lock.Lock.hooks.install_named_hook('lock_released',
 
931
                                                self._lock_released, None)
 
932
        _mod_lock.Lock.hooks.install_named_hook('lock_broken',
 
933
                                                self._lock_broken, None)
 
934
 
 
935
    def _lock_acquired(self, result):
 
936
        self._lock_actions.append(('acquired', result))
 
937
 
 
938
    def _lock_released(self, result):
 
939
        self._lock_actions.append(('released', result))
 
940
 
 
941
    def _lock_broken(self, result):
 
942
        self._lock_actions.append(('broken', result))
 
943
 
 
944
    def _ndiff_strings(self, a, b):
 
945
        """Return ndiff between two strings containing lines.
 
946
 
 
947
        A trailing newline is added if missing to make the strings
 
948
        print properly."""
 
949
        if b and b[-1] != '\n':
 
950
            b += '\n'
 
951
        if a and a[-1] != '\n':
 
952
            a += '\n'
 
953
        difflines = difflib.ndiff(a.splitlines(True),
 
954
                                  b.splitlines(True),
 
955
                                  linejunk=lambda x: False,
 
956
                                  charjunk=lambda x: False)
 
957
        return ''.join(difflines)
 
958
 
 
959
    def assertEqual(self, a, b, message=''):
 
960
        try:
 
961
            if a == b:
 
962
                return
 
963
        except UnicodeError, e:
 
964
            # If we can't compare without getting a UnicodeError, then
 
965
            # obviously they are different
 
966
            mutter('UnicodeError: %s', e)
 
967
        if message:
 
968
            message += '\n'
 
969
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
970
            % (message,
 
971
               pformat(a), pformat(b)))
 
972
 
 
973
    assertEquals = assertEqual
 
974
 
 
975
    def assertEqualDiff(self, a, b, message=None):
 
976
        """Assert two texts are equal, if not raise an exception.
 
977
 
 
978
        This is intended for use with multi-line strings where it can
 
979
        be hard to find the differences by eye.
 
980
        """
 
981
        # TODO: perhaps override assertEquals to call this for strings?
 
982
        if a == b:
 
983
            return
 
984
        if message is None:
 
985
            message = "texts not equal:\n"
 
986
        if a == b + '\n':
 
987
            message = 'first string is missing a final newline.\n'
 
988
        if a + '\n' == b:
 
989
            message = 'second string is missing a final newline.\n'
 
990
        raise AssertionError(message +
 
991
                             self._ndiff_strings(a, b))
 
992
 
 
993
    def assertEqualMode(self, mode, mode_test):
 
994
        self.assertEqual(mode, mode_test,
 
995
                         'mode mismatch %o != %o' % (mode, mode_test))
 
996
 
 
997
    def assertEqualStat(self, expected, actual):
 
998
        """assert that expected and actual are the same stat result.
 
999
 
 
1000
        :param expected: A stat result.
 
1001
        :param actual: A stat result.
 
1002
        :raises AssertionError: If the expected and actual stat values differ
 
1003
            other than by atime.
 
1004
        """
 
1005
        self.assertEqual(expected.st_size, actual.st_size)
 
1006
        self.assertEqual(expected.st_mtime, actual.st_mtime)
 
1007
        self.assertEqual(expected.st_ctime, actual.st_ctime)
 
1008
        self.assertEqual(expected.st_dev, actual.st_dev)
 
1009
        self.assertEqual(expected.st_ino, actual.st_ino)
 
1010
        self.assertEqual(expected.st_mode, actual.st_mode)
 
1011
 
 
1012
    def assertLength(self, length, obj_with_len):
 
1013
        """Assert that obj_with_len is of length length."""
 
1014
        if len(obj_with_len) != length:
 
1015
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
 
1016
                length, len(obj_with_len), obj_with_len))
 
1017
 
 
1018
    def assertPositive(self, val):
 
1019
        """Assert that val is greater than 0."""
 
1020
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
1021
 
 
1022
    def assertNegative(self, val):
 
1023
        """Assert that val is less than 0."""
 
1024
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
1025
 
 
1026
    def assertStartsWith(self, s, prefix):
 
1027
        if not s.startswith(prefix):
 
1028
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
1029
 
 
1030
    def assertEndsWith(self, s, suffix):
 
1031
        """Asserts that s ends with suffix."""
 
1032
        if not s.endswith(suffix):
 
1033
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
1034
 
 
1035
    def assertContainsRe(self, haystack, needle_re, flags=0):
 
1036
        """Assert that a contains something matching a regular expression."""
 
1037
        if not re.search(needle_re, haystack, flags):
 
1038
            if '\n' in haystack or len(haystack) > 60:
 
1039
                # a long string, format it in a more readable way
 
1040
                raise AssertionError(
 
1041
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
1042
                        % (needle_re, haystack))
 
1043
            else:
 
1044
                raise AssertionError('pattern "%s" not found in "%s"'
 
1045
                        % (needle_re, haystack))
 
1046
 
 
1047
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
 
1048
        """Assert that a does not match a regular expression"""
 
1049
        if re.search(needle_re, haystack, flags):
 
1050
            raise AssertionError('pattern "%s" found in "%s"'
 
1051
                    % (needle_re, haystack))
 
1052
 
 
1053
    def assertSubset(self, sublist, superlist):
 
1054
        """Assert that every entry in sublist is present in superlist."""
 
1055
        missing = set(sublist) - set(superlist)
 
1056
        if len(missing) > 0:
 
1057
            raise AssertionError("value(s) %r not present in container %r" %
 
1058
                                 (missing, superlist))
 
1059
 
 
1060
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
1061
        """Fail unless excClass is raised when the iterator from func is used.
 
1062
 
 
1063
        Many functions can return generators this makes sure
 
1064
        to wrap them in a list() call to make sure the whole generator
 
1065
        is run, and that the proper exception is raised.
 
1066
        """
 
1067
        try:
 
1068
            list(func(*args, **kwargs))
 
1069
        except excClass, e:
 
1070
            return e
 
1071
        else:
 
1072
            if getattr(excClass,'__name__', None) is not None:
 
1073
                excName = excClass.__name__
 
1074
            else:
 
1075
                excName = str(excClass)
 
1076
            raise self.failureException, "%s not raised" % excName
 
1077
 
 
1078
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
1079
        """Assert that a callable raises a particular exception.
 
1080
 
 
1081
        :param excClass: As for the except statement, this may be either an
 
1082
            exception class, or a tuple of classes.
 
1083
        :param callableObj: A callable, will be passed ``*args`` and
 
1084
            ``**kwargs``.
 
1085
 
 
1086
        Returns the exception so that you can examine it.
 
1087
        """
 
1088
        try:
 
1089
            callableObj(*args, **kwargs)
 
1090
        except excClass, e:
 
1091
            return e
 
1092
        else:
 
1093
            if getattr(excClass,'__name__', None) is not None:
 
1094
                excName = excClass.__name__
 
1095
            else:
 
1096
                # probably a tuple
 
1097
                excName = str(excClass)
 
1098
            raise self.failureException, "%s not raised" % excName
 
1099
 
 
1100
    def assertIs(self, left, right, message=None):
 
1101
        if not (left is right):
 
1102
            if message is not None:
 
1103
                raise AssertionError(message)
 
1104
            else:
 
1105
                raise AssertionError("%r is not %r." % (left, right))
 
1106
 
 
1107
    def assertIsNot(self, left, right, message=None):
 
1108
        if (left is right):
 
1109
            if message is not None:
 
1110
                raise AssertionError(message)
 
1111
            else:
 
1112
                raise AssertionError("%r is %r." % (left, right))
 
1113
 
 
1114
    def assertTransportMode(self, transport, path, mode):
 
1115
        """Fail if a path does not have mode "mode".
 
1116
 
 
1117
        If modes are not supported on this transport, the assertion is ignored.
 
1118
        """
 
1119
        if not transport._can_roundtrip_unix_modebits():
 
1120
            return
 
1121
        path_stat = transport.stat(path)
 
1122
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
1123
        self.assertEqual(mode, actual_mode,
 
1124
                         'mode of %r incorrect (%s != %s)'
 
1125
                         % (path, oct(mode), oct(actual_mode)))
 
1126
 
 
1127
    def assertIsSameRealPath(self, path1, path2):
 
1128
        """Fail if path1 and path2 points to different files"""
 
1129
        self.assertEqual(osutils.realpath(path1),
 
1130
                         osutils.realpath(path2),
 
1131
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
1132
 
 
1133
    def assertIsInstance(self, obj, kls, msg=None):
 
1134
        """Fail if obj is not an instance of kls
 
1135
        
 
1136
        :param msg: Supplementary message to show if the assertion fails.
 
1137
        """
 
1138
        if not isinstance(obj, kls):
 
1139
            m = "%r is an instance of %s rather than %s" % (
 
1140
                obj, obj.__class__, kls)
 
1141
            if msg:
 
1142
                m += ": " + msg
 
1143
            self.fail(m)
 
1144
 
 
1145
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
1146
        """Invoke a test, expecting it to fail for the given reason.
 
1147
 
 
1148
        This is for assertions that ought to succeed, but currently fail.
 
1149
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
1150
        about the failure you're expecting.  If a new bug is introduced,
 
1151
        AssertionError should be raised, not KnownFailure.
 
1152
 
 
1153
        Frequently, expectFailure should be followed by an opposite assertion.
 
1154
        See example below.
 
1155
 
 
1156
        Intended to be used with a callable that raises AssertionError as the
 
1157
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
1158
 
 
1159
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
1160
        test succeeds.
 
1161
 
 
1162
        example usage::
 
1163
 
 
1164
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
1165
                             dynamic_val)
 
1166
          self.assertEqual(42, dynamic_val)
 
1167
 
 
1168
          This means that a dynamic_val of 54 will cause the test to raise
 
1169
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
1170
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
1171
          than 54 or 42 will cause an AssertionError.
 
1172
        """
 
1173
        try:
 
1174
            assertion(*args, **kwargs)
 
1175
        except AssertionError:
 
1176
            raise KnownFailure(reason)
 
1177
        else:
 
1178
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
1179
 
 
1180
    def assertFileEqual(self, content, path):
 
1181
        """Fail if path does not contain 'content'."""
 
1182
        self.failUnlessExists(path)
 
1183
        f = file(path, 'rb')
 
1184
        try:
 
1185
            s = f.read()
 
1186
        finally:
 
1187
            f.close()
 
1188
        self.assertEqualDiff(content, s)
 
1189
 
 
1190
    def failUnlessExists(self, path):
 
1191
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1192
        if not isinstance(path, basestring):
 
1193
            for p in path:
 
1194
                self.failUnlessExists(p)
 
1195
        else:
 
1196
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
1197
 
 
1198
    def failIfExists(self, path):
 
1199
        """Fail if path or paths, which may be abs or relative, exist."""
 
1200
        if not isinstance(path, basestring):
 
1201
            for p in path:
 
1202
                self.failIfExists(p)
 
1203
        else:
 
1204
            self.failIf(osutils.lexists(path),path+" exists")
 
1205
 
 
1206
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1207
        """A helper for callDeprecated and applyDeprecated.
 
1208
 
 
1209
        :param a_callable: A callable to call.
 
1210
        :param args: The positional arguments for the callable
 
1211
        :param kwargs: The keyword arguments for the callable
 
1212
        :return: A tuple (warnings, result). result is the result of calling
 
1213
            a_callable(``*args``, ``**kwargs``).
 
1214
        """
 
1215
        local_warnings = []
 
1216
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1217
            # we've hooked into a deprecation specific callpath,
 
1218
            # only deprecations should getting sent via it.
 
1219
            self.assertEqual(cls, DeprecationWarning)
 
1220
            local_warnings.append(msg)
 
1221
        original_warning_method = symbol_versioning.warn
 
1222
        symbol_versioning.set_warning_method(capture_warnings)
 
1223
        try:
 
1224
            result = a_callable(*args, **kwargs)
 
1225
        finally:
 
1226
            symbol_versioning.set_warning_method(original_warning_method)
 
1227
        return (local_warnings, result)
 
1228
 
 
1229
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1230
        """Call a deprecated callable without warning the user.
 
1231
 
 
1232
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1233
        not other callers that go direct to the warning module.
 
1234
 
 
1235
        To test that a deprecated method raises an error, do something like
 
1236
        this::
 
1237
 
 
1238
            self.assertRaises(errors.ReservedId,
 
1239
                self.applyDeprecated,
 
1240
                deprecated_in((1, 5, 0)),
 
1241
                br.append_revision,
 
1242
                'current:')
 
1243
 
 
1244
        :param deprecation_format: The deprecation format that the callable
 
1245
            should have been deprecated with. This is the same type as the
 
1246
            parameter to deprecated_method/deprecated_function. If the
 
1247
            callable is not deprecated with this format, an assertion error
 
1248
            will be raised.
 
1249
        :param a_callable: A callable to call. This may be a bound method or
 
1250
            a regular function. It will be called with ``*args`` and
 
1251
            ``**kwargs``.
 
1252
        :param args: The positional arguments for the callable
 
1253
        :param kwargs: The keyword arguments for the callable
 
1254
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1255
        """
 
1256
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1257
            *args, **kwargs)
 
1258
        expected_first_warning = symbol_versioning.deprecation_string(
 
1259
            a_callable, deprecation_format)
 
1260
        if len(call_warnings) == 0:
 
1261
            self.fail("No deprecation warning generated by call to %s" %
 
1262
                a_callable)
 
1263
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1264
        return result
 
1265
 
 
1266
    def callCatchWarnings(self, fn, *args, **kw):
 
1267
        """Call a callable that raises python warnings.
 
1268
 
 
1269
        The caller's responsible for examining the returned warnings.
 
1270
 
 
1271
        If the callable raises an exception, the exception is not
 
1272
        caught and propagates up to the caller.  In that case, the list
 
1273
        of warnings is not available.
 
1274
 
 
1275
        :returns: ([warning_object, ...], fn_result)
 
1276
        """
 
1277
        # XXX: This is not perfect, because it completely overrides the
 
1278
        # warnings filters, and some code may depend on suppressing particular
 
1279
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1280
        # though.  -- Andrew, 20071062
 
1281
        wlist = []
 
1282
        def _catcher(message, category, filename, lineno, file=None, line=None):
 
1283
            # despite the name, 'message' is normally(?) a Warning subclass
 
1284
            # instance
 
1285
            wlist.append(message)
 
1286
        saved_showwarning = warnings.showwarning
 
1287
        saved_filters = warnings.filters
 
1288
        try:
 
1289
            warnings.showwarning = _catcher
 
1290
            warnings.filters = []
 
1291
            result = fn(*args, **kw)
 
1292
        finally:
 
1293
            warnings.showwarning = saved_showwarning
 
1294
            warnings.filters = saved_filters
 
1295
        return wlist, result
 
1296
 
 
1297
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1298
        """Assert that a callable is deprecated in a particular way.
 
1299
 
 
1300
        This is a very precise test for unusual requirements. The
 
1301
        applyDeprecated helper function is probably more suited for most tests
 
1302
        as it allows you to simply specify the deprecation format being used
 
1303
        and will ensure that that is issued for the function being called.
 
1304
 
 
1305
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1306
        not other callers that go direct to the warning module.  To catch
 
1307
        general warnings, use callCatchWarnings.
 
1308
 
 
1309
        :param expected: a list of the deprecation warnings expected, in order
 
1310
        :param callable: The callable to call
 
1311
        :param args: The positional arguments for the callable
 
1312
        :param kwargs: The keyword arguments for the callable
 
1313
        """
 
1314
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1315
            *args, **kwargs)
 
1316
        self.assertEqual(expected, call_warnings)
 
1317
        return result
 
1318
 
 
1319
    def _startLogFile(self):
 
1320
        """Send bzr and test log messages to a temporary file.
 
1321
 
 
1322
        The file is removed as the test is torn down.
 
1323
        """
 
1324
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1325
        self._log_file = os.fdopen(fileno, 'w+')
 
1326
        self._log_memento = bzrlib.trace.push_log_file(self._log_file)
 
1327
        self._log_file_name = name
 
1328
        self.addCleanup(self._finishLogFile)
 
1329
 
 
1330
    def _finishLogFile(self):
 
1331
        """Finished with the log file.
 
1332
 
 
1333
        Close the file and delete it, unless setKeepLogfile was called.
 
1334
        """
 
1335
        if self._log_file is None:
 
1336
            return
 
1337
        bzrlib.trace.pop_log_file(self._log_memento)
 
1338
        self._log_file.close()
 
1339
        self._log_file = None
 
1340
        if not self._keep_log_file:
 
1341
            os.remove(self._log_file_name)
 
1342
            self._log_file_name = None
 
1343
 
 
1344
    def setKeepLogfile(self):
 
1345
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1346
        self._keep_log_file = True
 
1347
 
 
1348
    def thisFailsStrictLockCheck(self):
 
1349
        """It is known that this test would fail with -Dstrict_locks.
 
1350
 
 
1351
        By default, all tests are run with strict lock checking unless
 
1352
        -Edisable_lock_checks is supplied. However there are some tests which
 
1353
        we know fail strict locks at this point that have not been fixed.
 
1354
        They should call this function to disable the strict checking.
 
1355
 
 
1356
        This should be used sparingly, it is much better to fix the locking
 
1357
        issues rather than papering over the problem by calling this function.
 
1358
        """
 
1359
        debug.debug_flags.discard('strict_locks')
 
1360
 
 
1361
    def addCleanup(self, callable, *args, **kwargs):
 
1362
        """Arrange to run a callable when this case is torn down.
 
1363
 
 
1364
        Callables are run in the reverse of the order they are registered,
 
1365
        ie last-in first-out.
 
1366
        """
 
1367
        self._cleanups.append((callable, args, kwargs))
 
1368
 
 
1369
    def _cleanEnvironment(self):
 
1370
        new_env = {
 
1371
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1372
            'HOME': os.getcwd(),
 
1373
            # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
 
1374
            # tests do check our impls match APPDATA
 
1375
            'BZR_EDITOR': None, # test_msgeditor manipulates this variable
 
1376
            'VISUAL': None,
 
1377
            'EDITOR': None,
 
1378
            'BZR_EMAIL': None,
 
1379
            'BZREMAIL': None, # may still be present in the environment
 
1380
            'EMAIL': None,
 
1381
            'BZR_PROGRESS_BAR': None,
 
1382
            'BZR_LOG': None,
 
1383
            'BZR_PLUGIN_PATH': None,
 
1384
            # Make sure that any text ui tests are consistent regardless of
 
1385
            # the environment the test case is run in; you may want tests that
 
1386
            # test other combinations.  'dumb' is a reasonable guess for tests
 
1387
            # going to a pipe or a StringIO.
 
1388
            'TERM': 'dumb',
 
1389
            'LINES': '25',
 
1390
            'COLUMNS': '80',
 
1391
            # SSH Agent
 
1392
            'SSH_AUTH_SOCK': None,
 
1393
            # Proxies
 
1394
            'http_proxy': None,
 
1395
            'HTTP_PROXY': None,
 
1396
            'https_proxy': None,
 
1397
            'HTTPS_PROXY': None,
 
1398
            'no_proxy': None,
 
1399
            'NO_PROXY': None,
 
1400
            'all_proxy': None,
 
1401
            'ALL_PROXY': None,
 
1402
            # Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
 
1403
            # least. If you do (care), please update this comment
 
1404
            # -- vila 20080401
 
1405
            'ftp_proxy': None,
 
1406
            'FTP_PROXY': None,
 
1407
            'BZR_REMOTE_PATH': None,
 
1408
        }
 
1409
        self.__old_env = {}
 
1410
        self.addCleanup(self._restoreEnvironment)
 
1411
        for name, value in new_env.iteritems():
 
1412
            self._captureVar(name, value)
 
1413
 
 
1414
    def _captureVar(self, name, newvalue):
 
1415
        """Set an environment variable, and reset it when finished."""
 
1416
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1417
 
 
1418
    def _restore_debug_flags(self):
 
1419
        debug.debug_flags.clear()
 
1420
        debug.debug_flags.update(self._preserved_debug_flags)
 
1421
 
 
1422
    def _restoreEnvironment(self):
 
1423
        for name, value in self.__old_env.iteritems():
 
1424
            osutils.set_or_unset_env(name, value)
 
1425
 
 
1426
    def _restoreHooks(self):
 
1427
        for klass, (name, hooks) in self._preserved_hooks.items():
 
1428
            setattr(klass, name, hooks)
 
1429
 
 
1430
    def knownFailure(self, reason):
 
1431
        """This test has failed for some known reason."""
 
1432
        raise KnownFailure(reason)
 
1433
 
 
1434
    def _do_skip(self, result, reason):
 
1435
        addSkip = getattr(result, 'addSkip', None)
 
1436
        if not callable(addSkip):
 
1437
            result.addError(self, sys.exc_info())
 
1438
        else:
 
1439
            addSkip(self, reason)
 
1440
 
 
1441
    def run(self, result=None):
 
1442
        if result is None: result = self.defaultTestResult()
 
1443
        for feature in getattr(self, '_test_needs_features', []):
 
1444
            if not feature.available():
 
1445
                result.startTest(self)
 
1446
                if getattr(result, 'addNotSupported', None):
 
1447
                    result.addNotSupported(self, feature)
 
1448
                else:
 
1449
                    result.addSuccess(self)
 
1450
                result.stopTest(self)
 
1451
                return result
 
1452
        try:
 
1453
            try:
 
1454
                result.startTest(self)
 
1455
                absent_attr = object()
 
1456
                # Python 2.5
 
1457
                method_name = getattr(self, '_testMethodName', absent_attr)
 
1458
                if method_name is absent_attr:
 
1459
                    # Python 2.4
 
1460
                    method_name = getattr(self, '_TestCase__testMethodName')
 
1461
                testMethod = getattr(self, method_name)
 
1462
                try:
 
1463
                    try:
 
1464
                        self.setUp()
 
1465
                        if not self._bzr_test_setUp_run:
 
1466
                            self.fail(
 
1467
                                "test setUp did not invoke "
 
1468
                                "bzrlib.tests.TestCase's setUp")
 
1469
                    except KeyboardInterrupt:
 
1470
                        self._runCleanups()
 
1471
                        raise
 
1472
                    except TestSkipped, e:
 
1473
                        self._do_skip(result, e.args[0])
 
1474
                        self.tearDown()
 
1475
                        return result
 
1476
                    except:
 
1477
                        result.addError(self, sys.exc_info())
 
1478
                        self._runCleanups()
 
1479
                        return result
 
1480
 
 
1481
                    ok = False
 
1482
                    try:
 
1483
                        testMethod()
 
1484
                        ok = True
 
1485
                    except self.failureException:
 
1486
                        result.addFailure(self, sys.exc_info())
 
1487
                    except TestSkipped, e:
 
1488
                        if not e.args:
 
1489
                            reason = "No reason given."
 
1490
                        else:
 
1491
                            reason = e.args[0]
 
1492
                        self._do_skip(result, reason)
 
1493
                    except KeyboardInterrupt:
 
1494
                        self._runCleanups()
 
1495
                        raise
 
1496
                    except:
 
1497
                        result.addError(self, sys.exc_info())
 
1498
 
 
1499
                    try:
 
1500
                        self.tearDown()
 
1501
                        if not self._bzr_test_tearDown_run:
 
1502
                            self.fail(
 
1503
                                "test tearDown did not invoke "
 
1504
                                "bzrlib.tests.TestCase's tearDown")
 
1505
                    except KeyboardInterrupt:
 
1506
                        self._runCleanups()
 
1507
                        raise
 
1508
                    except:
 
1509
                        result.addError(self, sys.exc_info())
 
1510
                        self._runCleanups()
 
1511
                        ok = False
 
1512
                    if ok: result.addSuccess(self)
 
1513
                finally:
 
1514
                    result.stopTest(self)
 
1515
                return result
 
1516
            except TestNotApplicable:
 
1517
                # Not moved from the result [yet].
 
1518
                self._runCleanups()
 
1519
                raise
 
1520
            except KeyboardInterrupt:
 
1521
                self._runCleanups()
 
1522
                raise
 
1523
        finally:
 
1524
            saved_attrs = {}
 
1525
            for attr_name in self.attrs_to_keep:
 
1526
                if attr_name in self.__dict__:
 
1527
                    saved_attrs[attr_name] = self.__dict__[attr_name]
 
1528
            self.__dict__ = saved_attrs
 
1529
 
 
1530
    def tearDown(self):
 
1531
        self._runCleanups()
 
1532
        self._log_contents = ''
 
1533
        self._bzr_test_tearDown_run = True
 
1534
        unittest.TestCase.tearDown(self)
 
1535
 
 
1536
    def time(self, callable, *args, **kwargs):
 
1537
        """Run callable and accrue the time it takes to the benchmark time.
 
1538
 
 
1539
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1540
        this will cause lsprofile statistics to be gathered and stored in
 
1541
        self._benchcalls.
 
1542
        """
 
1543
        if self._benchtime is None:
 
1544
            self._benchtime = 0
 
1545
        start = time.time()
 
1546
        try:
 
1547
            if not self._gather_lsprof_in_benchmarks:
 
1548
                return callable(*args, **kwargs)
 
1549
            else:
 
1550
                # record this benchmark
 
1551
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1552
                stats.sort()
 
1553
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1554
                return ret
 
1555
        finally:
 
1556
            self._benchtime += time.time() - start
 
1557
 
 
1558
    def _runCleanups(self):
 
1559
        """Run registered cleanup functions.
 
1560
 
 
1561
        This should only be called from TestCase.tearDown.
 
1562
        """
 
1563
        # TODO: Perhaps this should keep running cleanups even if
 
1564
        # one of them fails?
 
1565
 
 
1566
        # Actually pop the cleanups from the list so tearDown running
 
1567
        # twice is safe (this happens for skipped tests).
 
1568
        while self._cleanups:
 
1569
            cleanup, args, kwargs = self._cleanups.pop()
 
1570
            cleanup(*args, **kwargs)
 
1571
 
 
1572
    def log(self, *args):
 
1573
        mutter(*args)
 
1574
 
 
1575
    def _get_log(self, keep_log_file=False):
 
1576
        """Get the log from bzrlib.trace calls from this test.
 
1577
 
 
1578
        :param keep_log_file: When True, if the log is still a file on disk
 
1579
            leave it as a file on disk. When False, if the log is still a file
 
1580
            on disk, the log file is deleted and the log preserved as
 
1581
            self._log_contents.
 
1582
        :return: A string containing the log.
 
1583
        """
 
1584
        # flush the log file, to get all content
 
1585
        import bzrlib.trace
 
1586
        if bzrlib.trace._trace_file:
 
1587
            bzrlib.trace._trace_file.flush()
 
1588
        if self._log_contents:
 
1589
            # XXX: this can hardly contain the content flushed above --vila
 
1590
            # 20080128
 
1591
            return self._log_contents
 
1592
        if self._log_file_name is not None:
 
1593
            logfile = open(self._log_file_name)
 
1594
            try:
 
1595
                log_contents = logfile.read()
 
1596
            finally:
 
1597
                logfile.close()
 
1598
            if not keep_log_file:
 
1599
                self._log_contents = log_contents
 
1600
                try:
 
1601
                    os.remove(self._log_file_name)
 
1602
                except OSError, e:
 
1603
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1604
                        sys.stderr.write(('Unable to delete log file '
 
1605
                                             ' %r\n' % self._log_file_name))
 
1606
                    else:
 
1607
                        raise
 
1608
            return log_contents
 
1609
        else:
 
1610
            return "DELETED log file to reduce memory footprint"
 
1611
 
 
1612
    def requireFeature(self, feature):
 
1613
        """This test requires a specific feature is available.
 
1614
 
 
1615
        :raises UnavailableFeature: When feature is not available.
 
1616
        """
 
1617
        if not feature.available():
 
1618
            raise UnavailableFeature(feature)
 
1619
 
 
1620
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1621
            working_dir):
 
1622
        """Run bazaar command line, splitting up a string command line."""
 
1623
        if isinstance(args, basestring):
 
1624
            # shlex don't understand unicode strings,
 
1625
            # so args should be plain string (bialix 20070906)
 
1626
            args = list(shlex.split(str(args)))
 
1627
        return self._run_bzr_core(args, retcode=retcode,
 
1628
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1629
                )
 
1630
 
 
1631
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1632
            working_dir):
 
1633
        if encoding is None:
 
1634
            encoding = osutils.get_user_encoding()
 
1635
        stdout = StringIOWrapper()
 
1636
        stderr = StringIOWrapper()
 
1637
        stdout.encoding = encoding
 
1638
        stderr.encoding = encoding
 
1639
 
 
1640
        self.log('run bzr: %r', args)
 
1641
        # FIXME: don't call into logging here
 
1642
        handler = logging.StreamHandler(stderr)
 
1643
        handler.setLevel(logging.INFO)
 
1644
        logger = logging.getLogger('')
 
1645
        logger.addHandler(handler)
 
1646
        old_ui_factory = ui.ui_factory
 
1647
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1648
 
 
1649
        cwd = None
 
1650
        if working_dir is not None:
 
1651
            cwd = osutils.getcwd()
 
1652
            os.chdir(working_dir)
 
1653
 
 
1654
        try:
 
1655
            result = self.apply_redirected(ui.ui_factory.stdin,
 
1656
                stdout, stderr,
 
1657
                bzrlib.commands.run_bzr_catch_user_errors,
 
1658
                args)
 
1659
        finally:
 
1660
            logger.removeHandler(handler)
 
1661
            ui.ui_factory = old_ui_factory
 
1662
            if cwd is not None:
 
1663
                os.chdir(cwd)
 
1664
 
 
1665
        out = stdout.getvalue()
 
1666
        err = stderr.getvalue()
 
1667
        if out:
 
1668
            self.log('output:\n%r', out)
 
1669
        if err:
 
1670
            self.log('errors:\n%r', err)
 
1671
        if retcode is not None:
 
1672
            self.assertEquals(retcode, result,
 
1673
                              message='Unexpected return code')
 
1674
        return out, err
 
1675
 
 
1676
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
 
1677
                working_dir=None, error_regexes=[], output_encoding=None):
 
1678
        """Invoke bzr, as if it were run from the command line.
 
1679
 
 
1680
        The argument list should not include the bzr program name - the
 
1681
        first argument is normally the bzr command.  Arguments may be
 
1682
        passed in three ways:
 
1683
 
 
1684
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1685
        when the command contains whitespace or metacharacters, or
 
1686
        is built up at run time.
 
1687
 
 
1688
        2- A single string, eg "add a".  This is the most convenient
 
1689
        for hardcoded commands.
 
1690
 
 
1691
        This runs bzr through the interface that catches and reports
 
1692
        errors, and with logging set to something approximating the
 
1693
        default, so that error reporting can be checked.
 
1694
 
 
1695
        This should be the main method for tests that want to exercise the
 
1696
        overall behavior of the bzr application (rather than a unit test
 
1697
        or a functional test of the library.)
 
1698
 
 
1699
        This sends the stdout/stderr results into the test's log,
 
1700
        where it may be useful for debugging.  See also run_captured.
 
1701
 
 
1702
        :keyword stdin: A string to be used as stdin for the command.
 
1703
        :keyword retcode: The status code the command should return;
 
1704
            default 0.
 
1705
        :keyword working_dir: The directory to run the command in
 
1706
        :keyword error_regexes: A list of expected error messages.  If
 
1707
            specified they must be seen in the error output of the command.
 
1708
        """
 
1709
        out, err = self._run_bzr_autosplit(
 
1710
            args=args,
 
1711
            retcode=retcode,
 
1712
            encoding=encoding,
 
1713
            stdin=stdin,
 
1714
            working_dir=working_dir,
 
1715
            )
 
1716
        self.assertIsInstance(error_regexes, (list, tuple))
 
1717
        for regex in error_regexes:
 
1718
            self.assertContainsRe(err, regex)
 
1719
        return out, err
 
1720
 
 
1721
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1722
        """Run bzr, and check that stderr contains the supplied regexes
 
1723
 
 
1724
        :param error_regexes: Sequence of regular expressions which
 
1725
            must each be found in the error output. The relative ordering
 
1726
            is not enforced.
 
1727
        :param args: command-line arguments for bzr
 
1728
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1729
            This function changes the default value of retcode to be 3,
 
1730
            since in most cases this is run when you expect bzr to fail.
 
1731
 
 
1732
        :return: (out, err) The actual output of running the command (in case
 
1733
            you want to do more inspection)
 
1734
 
 
1735
        Examples of use::
 
1736
 
 
1737
            # Make sure that commit is failing because there is nothing to do
 
1738
            self.run_bzr_error(['no changes to commit'],
 
1739
                               ['commit', '-m', 'my commit comment'])
 
1740
            # Make sure --strict is handling an unknown file, rather than
 
1741
            # giving us the 'nothing to do' error
 
1742
            self.build_tree(['unknown'])
 
1743
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1744
                               ['commit', --strict', '-m', 'my commit comment'])
 
1745
        """
 
1746
        kwargs.setdefault('retcode', 3)
 
1747
        kwargs['error_regexes'] = error_regexes
 
1748
        out, err = self.run_bzr(*args, **kwargs)
 
1749
        return out, err
 
1750
 
 
1751
    def run_bzr_subprocess(self, *args, **kwargs):
 
1752
        """Run bzr in a subprocess for testing.
 
1753
 
 
1754
        This starts a new Python interpreter and runs bzr in there.
 
1755
        This should only be used for tests that have a justifiable need for
 
1756
        this isolation: e.g. they are testing startup time, or signal
 
1757
        handling, or early startup code, etc.  Subprocess code can't be
 
1758
        profiled or debugged so easily.
 
1759
 
 
1760
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
1761
            None is supplied, the status code is not checked.
 
1762
        :keyword env_changes: A dictionary which lists changes to environment
 
1763
            variables. A value of None will unset the env variable.
 
1764
            The values must be strings. The change will only occur in the
 
1765
            child, so you don't need to fix the environment after running.
 
1766
        :keyword universal_newlines: Convert CRLF => LF
 
1767
        :keyword allow_plugins: By default the subprocess is run with
 
1768
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1769
            for system-wide plugins to create unexpected output on stderr,
 
1770
            which can cause unnecessary test failures.
 
1771
        """
 
1772
        env_changes = kwargs.get('env_changes', {})
 
1773
        working_dir = kwargs.get('working_dir', None)
 
1774
        allow_plugins = kwargs.get('allow_plugins', False)
 
1775
        if len(args) == 1:
 
1776
            if isinstance(args[0], list):
 
1777
                args = args[0]
 
1778
            elif isinstance(args[0], basestring):
 
1779
                args = list(shlex.split(args[0]))
 
1780
        else:
 
1781
            raise ValueError("passing varargs to run_bzr_subprocess")
 
1782
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1783
                                            working_dir=working_dir,
 
1784
                                            allow_plugins=allow_plugins)
 
1785
        # We distinguish between retcode=None and retcode not passed.
 
1786
        supplied_retcode = kwargs.get('retcode', 0)
 
1787
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1788
            universal_newlines=kwargs.get('universal_newlines', False),
 
1789
            process_args=args)
 
1790
 
 
1791
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1792
                             skip_if_plan_to_signal=False,
 
1793
                             working_dir=None,
 
1794
                             allow_plugins=False):
 
1795
        """Start bzr in a subprocess for testing.
 
1796
 
 
1797
        This starts a new Python interpreter and runs bzr in there.
 
1798
        This should only be used for tests that have a justifiable need for
 
1799
        this isolation: e.g. they are testing startup time, or signal
 
1800
        handling, or early startup code, etc.  Subprocess code can't be
 
1801
        profiled or debugged so easily.
 
1802
 
 
1803
        :param process_args: a list of arguments to pass to the bzr executable,
 
1804
            for example ``['--version']``.
 
1805
        :param env_changes: A dictionary which lists changes to environment
 
1806
            variables. A value of None will unset the env variable.
 
1807
            The values must be strings. The change will only occur in the
 
1808
            child, so you don't need to fix the environment after running.
 
1809
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1810
            is not available.
 
1811
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1812
 
 
1813
        :returns: Popen object for the started process.
 
1814
        """
 
1815
        if skip_if_plan_to_signal:
 
1816
            if not getattr(os, 'kill', None):
 
1817
                raise TestSkipped("os.kill not available.")
 
1818
 
 
1819
        if env_changes is None:
 
1820
            env_changes = {}
 
1821
        old_env = {}
 
1822
 
 
1823
        def cleanup_environment():
 
1824
            for env_var, value in env_changes.iteritems():
 
1825
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1826
 
 
1827
        def restore_environment():
 
1828
            for env_var, value in old_env.iteritems():
 
1829
                osutils.set_or_unset_env(env_var, value)
 
1830
 
 
1831
        bzr_path = self.get_bzr_path()
 
1832
 
 
1833
        cwd = None
 
1834
        if working_dir is not None:
 
1835
            cwd = osutils.getcwd()
 
1836
            os.chdir(working_dir)
 
1837
 
 
1838
        try:
 
1839
            # win32 subprocess doesn't support preexec_fn
 
1840
            # so we will avoid using it on all platforms, just to
 
1841
            # make sure the code path is used, and we don't break on win32
 
1842
            cleanup_environment()
 
1843
            command = [sys.executable]
 
1844
            # frozen executables don't need the path to bzr
 
1845
            if getattr(sys, "frozen", None) is None:
 
1846
                command.append(bzr_path)
 
1847
            if not allow_plugins:
 
1848
                command.append('--no-plugins')
 
1849
            command.extend(process_args)
 
1850
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1851
        finally:
 
1852
            restore_environment()
 
1853
            if cwd is not None:
 
1854
                os.chdir(cwd)
 
1855
 
 
1856
        return process
 
1857
 
 
1858
    def _popen(self, *args, **kwargs):
 
1859
        """Place a call to Popen.
 
1860
 
 
1861
        Allows tests to override this method to intercept the calls made to
 
1862
        Popen for introspection.
 
1863
        """
 
1864
        return Popen(*args, **kwargs)
 
1865
 
 
1866
    def get_bzr_path(self):
 
1867
        """Return the path of the 'bzr' executable for this test suite."""
 
1868
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1869
        if not os.path.isfile(bzr_path):
 
1870
            # We are probably installed. Assume sys.argv is the right file
 
1871
            bzr_path = sys.argv[0]
 
1872
        return bzr_path
 
1873
 
 
1874
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1875
                              universal_newlines=False, process_args=None):
 
1876
        """Finish the execution of process.
 
1877
 
 
1878
        :param process: the Popen object returned from start_bzr_subprocess.
 
1879
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1880
            None is supplied, the status code is not checked.
 
1881
        :param send_signal: an optional signal to send to the process.
 
1882
        :param universal_newlines: Convert CRLF => LF
 
1883
        :returns: (stdout, stderr)
 
1884
        """
 
1885
        if send_signal is not None:
 
1886
            os.kill(process.pid, send_signal)
 
1887
        out, err = process.communicate()
 
1888
 
 
1889
        if universal_newlines:
 
1890
            out = out.replace('\r\n', '\n')
 
1891
            err = err.replace('\r\n', '\n')
 
1892
 
 
1893
        if retcode is not None and retcode != process.returncode:
 
1894
            if process_args is None:
 
1895
                process_args = "(unknown args)"
 
1896
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1897
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1898
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1899
                      % (process_args, retcode, process.returncode))
 
1900
        return [out, err]
 
1901
 
 
1902
    def check_inventory_shape(self, inv, shape):
 
1903
        """Compare an inventory to a list of expected names.
 
1904
 
 
1905
        Fail if they are not precisely equal.
 
1906
        """
 
1907
        extras = []
 
1908
        shape = list(shape)             # copy
 
1909
        for path, ie in inv.entries():
 
1910
            name = path.replace('\\', '/')
 
1911
            if ie.kind == 'directory':
 
1912
                name = name + '/'
 
1913
            if name in shape:
 
1914
                shape.remove(name)
 
1915
            else:
 
1916
                extras.append(name)
 
1917
        if shape:
 
1918
            self.fail("expected paths not found in inventory: %r" % shape)
 
1919
        if extras:
 
1920
            self.fail("unexpected paths found in inventory: %r" % extras)
 
1921
 
 
1922
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1923
                         a_callable=None, *args, **kwargs):
 
1924
        """Call callable with redirected std io pipes.
 
1925
 
 
1926
        Returns the return code."""
 
1927
        if not callable(a_callable):
 
1928
            raise ValueError("a_callable must be callable.")
 
1929
        if stdin is None:
 
1930
            stdin = StringIO("")
 
1931
        if stdout is None:
 
1932
            if getattr(self, "_log_file", None) is not None:
 
1933
                stdout = self._log_file
 
1934
            else:
 
1935
                stdout = StringIO()
 
1936
        if stderr is None:
 
1937
            if getattr(self, "_log_file", None is not None):
 
1938
                stderr = self._log_file
 
1939
            else:
 
1940
                stderr = StringIO()
 
1941
        real_stdin = sys.stdin
 
1942
        real_stdout = sys.stdout
 
1943
        real_stderr = sys.stderr
 
1944
        try:
 
1945
            sys.stdout = stdout
 
1946
            sys.stderr = stderr
 
1947
            sys.stdin = stdin
 
1948
            return a_callable(*args, **kwargs)
 
1949
        finally:
 
1950
            sys.stdout = real_stdout
 
1951
            sys.stderr = real_stderr
 
1952
            sys.stdin = real_stdin
 
1953
 
 
1954
    def reduceLockdirTimeout(self):
 
1955
        """Reduce the default lock timeout for the duration of the test, so that
 
1956
        if LockContention occurs during a test, it does so quickly.
 
1957
 
 
1958
        Tests that expect to provoke LockContention errors should call this.
 
1959
        """
 
1960
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
1961
        def resetTimeout():
 
1962
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
1963
        self.addCleanup(resetTimeout)
 
1964
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
1965
 
 
1966
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
1967
        """Return a StringIOWrapper instance, that will encode Unicode
 
1968
        input to UTF-8.
 
1969
        """
 
1970
        if encoding_type is None:
 
1971
            encoding_type = 'strict'
 
1972
        sio = StringIO()
 
1973
        output_encoding = 'utf-8'
 
1974
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
 
1975
        sio.encoding = output_encoding
 
1976
        return sio
 
1977
 
 
1978
    def disable_verb(self, verb):
 
1979
        """Disable a smart server verb for one test."""
 
1980
        from bzrlib.smart import request
 
1981
        request_handlers = request.request_handlers
 
1982
        orig_method = request_handlers.get(verb)
 
1983
        request_handlers.remove(verb)
 
1984
        def restoreVerb():
 
1985
            request_handlers.register(verb, orig_method)
 
1986
        self.addCleanup(restoreVerb)
 
1987
 
 
1988
 
 
1989
class CapturedCall(object):
 
1990
    """A helper for capturing smart server calls for easy debug analysis."""
 
1991
 
 
1992
    def __init__(self, params, prefix_length):
 
1993
        """Capture the call with params and skip prefix_length stack frames."""
 
1994
        self.call = params
 
1995
        import traceback
 
1996
        # The last 5 frames are the __init__, the hook frame, and 3 smart
 
1997
        # client frames. Beyond this we could get more clever, but this is good
 
1998
        # enough for now.
 
1999
        stack = traceback.extract_stack()[prefix_length:-5]
 
2000
        self.stack = ''.join(traceback.format_list(stack))
 
2001
 
 
2002
    def __str__(self):
 
2003
        return self.call.method
 
2004
 
 
2005
    def __repr__(self):
 
2006
        return self.call.method
 
2007
 
 
2008
    def stack(self):
 
2009
        return self.stack
 
2010
 
 
2011
 
 
2012
class TestCaseWithMemoryTransport(TestCase):
 
2013
    """Common test class for tests that do not need disk resources.
 
2014
 
 
2015
    Tests that need disk resources should derive from TestCaseWithTransport.
 
2016
 
 
2017
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
2018
 
 
2019
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
2020
    a directory which does not exist. This serves to help ensure test isolation
 
2021
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
2022
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
2023
    file defaults for the transport in tests, nor does it obey the command line
 
2024
    override, so tests that accidentally write to the common directory should
 
2025
    be rare.
 
2026
 
 
2027
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
2028
    a .bzr directory that stops us ascending higher into the filesystem.
 
2029
    """
 
2030
 
 
2031
    TEST_ROOT = None
 
2032
    _TEST_NAME = 'test'
 
2033
 
 
2034
    def __init__(self, methodName='runTest'):
 
2035
        # allow test parameterization after test construction and before test
 
2036
        # execution. Variables that the parameterizer sets need to be
 
2037
        # ones that are not set by setUp, or setUp will trash them.
 
2038
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
2039
        self.vfs_transport_factory = default_transport
 
2040
        self.transport_server = None
 
2041
        self.transport_readonly_server = None
 
2042
        self.__vfs_server = None
 
2043
 
 
2044
    def get_transport(self, relpath=None):
 
2045
        """Return a writeable transport.
 
2046
 
 
2047
        This transport is for the test scratch space relative to
 
2048
        "self._test_root"
 
2049
 
 
2050
        :param relpath: a path relative to the base url.
 
2051
        """
 
2052
        t = get_transport(self.get_url(relpath))
 
2053
        self.assertFalse(t.is_readonly())
 
2054
        return t
 
2055
 
 
2056
    def get_readonly_transport(self, relpath=None):
 
2057
        """Return a readonly transport for the test scratch space
 
2058
 
 
2059
        This can be used to test that operations which should only need
 
2060
        readonly access in fact do not try to write.
 
2061
 
 
2062
        :param relpath: a path relative to the base url.
 
2063
        """
 
2064
        t = get_transport(self.get_readonly_url(relpath))
 
2065
        self.assertTrue(t.is_readonly())
 
2066
        return t
 
2067
 
 
2068
    def create_transport_readonly_server(self):
 
2069
        """Create a transport server from class defined at init.
 
2070
 
 
2071
        This is mostly a hook for daughter classes.
 
2072
        """
 
2073
        return self.transport_readonly_server()
 
2074
 
 
2075
    def get_readonly_server(self):
 
2076
        """Get the server instance for the readonly transport
 
2077
 
 
2078
        This is useful for some tests with specific servers to do diagnostics.
 
2079
        """
 
2080
        if self.__readonly_server is None:
 
2081
            if self.transport_readonly_server is None:
 
2082
                # readonly decorator requested
 
2083
                # bring up the server
 
2084
                self.__readonly_server = ReadonlyServer()
 
2085
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
2086
            else:
 
2087
                self.__readonly_server = self.create_transport_readonly_server()
 
2088
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
2089
            self.addCleanup(self.__readonly_server.tearDown)
 
2090
        return self.__readonly_server
 
2091
 
 
2092
    def get_readonly_url(self, relpath=None):
 
2093
        """Get a URL for the readonly transport.
 
2094
 
 
2095
        This will either be backed by '.' or a decorator to the transport
 
2096
        used by self.get_url()
 
2097
        relpath provides for clients to get a path relative to the base url.
 
2098
        These should only be downwards relative, not upwards.
 
2099
        """
 
2100
        base = self.get_readonly_server().get_url()
 
2101
        return self._adjust_url(base, relpath)
 
2102
 
 
2103
    def get_vfs_only_server(self):
 
2104
        """Get the vfs only read/write server instance.
 
2105
 
 
2106
        This is useful for some tests with specific servers that need
 
2107
        diagnostics.
 
2108
 
 
2109
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
2110
        is no means to override it.
 
2111
        """
 
2112
        if self.__vfs_server is None:
 
2113
            self.__vfs_server = MemoryServer()
 
2114
            self.__vfs_server.setUp()
 
2115
            self.addCleanup(self.__vfs_server.tearDown)
 
2116
        return self.__vfs_server
 
2117
 
 
2118
    def get_server(self):
 
2119
        """Get the read/write server instance.
 
2120
 
 
2121
        This is useful for some tests with specific servers that need
 
2122
        diagnostics.
 
2123
 
 
2124
        This is built from the self.transport_server factory. If that is None,
 
2125
        then the self.get_vfs_server is returned.
 
2126
        """
 
2127
        if self.__server is None:
 
2128
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
 
2129
                return self.get_vfs_only_server()
 
2130
            else:
 
2131
                # bring up a decorated means of access to the vfs only server.
 
2132
                self.__server = self.transport_server()
 
2133
                try:
 
2134
                    self.__server.setUp(self.get_vfs_only_server())
 
2135
                except TypeError, e:
 
2136
                    # This should never happen; the try:Except here is to assist
 
2137
                    # developers having to update code rather than seeing an
 
2138
                    # uninformative TypeError.
 
2139
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
 
2140
            self.addCleanup(self.__server.tearDown)
 
2141
        return self.__server
 
2142
 
 
2143
    def _adjust_url(self, base, relpath):
 
2144
        """Get a URL (or maybe a path) for the readwrite transport.
 
2145
 
 
2146
        This will either be backed by '.' or to an equivalent non-file based
 
2147
        facility.
 
2148
        relpath provides for clients to get a path relative to the base url.
 
2149
        These should only be downwards relative, not upwards.
 
2150
        """
 
2151
        if relpath is not None and relpath != '.':
 
2152
            if not base.endswith('/'):
 
2153
                base = base + '/'
 
2154
            # XXX: Really base should be a url; we did after all call
 
2155
            # get_url()!  But sometimes it's just a path (from
 
2156
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
2157
            # to a non-escaped local path.
 
2158
            if base.startswith('./') or base.startswith('/'):
 
2159
                base += relpath
 
2160
            else:
 
2161
                base += urlutils.escape(relpath)
 
2162
        return base
 
2163
 
 
2164
    def get_url(self, relpath=None):
 
2165
        """Get a URL (or maybe a path) for the readwrite transport.
 
2166
 
 
2167
        This will either be backed by '.' or to an equivalent non-file based
 
2168
        facility.
 
2169
        relpath provides for clients to get a path relative to the base url.
 
2170
        These should only be downwards relative, not upwards.
 
2171
        """
 
2172
        base = self.get_server().get_url()
 
2173
        return self._adjust_url(base, relpath)
 
2174
 
 
2175
    def get_vfs_only_url(self, relpath=None):
 
2176
        """Get a URL (or maybe a path for the plain old vfs transport.
 
2177
 
 
2178
        This will never be a smart protocol.  It always has all the
 
2179
        capabilities of the local filesystem, but it might actually be a
 
2180
        MemoryTransport or some other similar virtual filesystem.
 
2181
 
 
2182
        This is the backing transport (if any) of the server returned by
 
2183
        get_url and get_readonly_url.
 
2184
 
 
2185
        :param relpath: provides for clients to get a path relative to the base
 
2186
            url.  These should only be downwards relative, not upwards.
 
2187
        :return: A URL
 
2188
        """
 
2189
        base = self.get_vfs_only_server().get_url()
 
2190
        return self._adjust_url(base, relpath)
 
2191
 
 
2192
    def _create_safety_net(self):
 
2193
        """Make a fake bzr directory.
 
2194
 
 
2195
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
2196
        real branch.
 
2197
        """
 
2198
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2199
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
2200
 
 
2201
    def _check_safety_net(self):
 
2202
        """Check that the safety .bzr directory have not been touched.
 
2203
 
 
2204
        _make_test_root have created a .bzr directory to prevent tests from
 
2205
        propagating. This method ensures than a test did not leaked.
 
2206
        """
 
2207
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2208
        wt = workingtree.WorkingTree.open(root)
 
2209
        last_rev = wt.last_revision()
 
2210
        if last_rev != 'null:':
 
2211
            # The current test have modified the /bzr directory, we need to
 
2212
            # recreate a new one or all the followng tests will fail.
 
2213
            # If you need to inspect its content uncomment the following line
 
2214
            # import pdb; pdb.set_trace()
 
2215
            _rmtree_temp_dir(root + '/.bzr')
 
2216
            self._create_safety_net()
 
2217
            raise AssertionError('%s/.bzr should not be modified' % root)
 
2218
 
 
2219
    def _make_test_root(self):
 
2220
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
2221
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
2222
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
2223
 
 
2224
            self._create_safety_net()
 
2225
 
 
2226
            # The same directory is used by all tests, and we're not
 
2227
            # specifically told when all tests are finished.  This will do.
 
2228
            atexit.register(_rmtree_temp_dir, root)
 
2229
 
 
2230
        self.addCleanup(self._check_safety_net)
 
2231
 
 
2232
    def makeAndChdirToTestDir(self):
 
2233
        """Create a temporary directories for this one test.
 
2234
 
 
2235
        This must set self.test_home_dir and self.test_dir and chdir to
 
2236
        self.test_dir.
 
2237
 
 
2238
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
2239
        """
 
2240
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2241
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
2242
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
2243
 
 
2244
    def make_branch(self, relpath, format=None):
 
2245
        """Create a branch on the transport at relpath."""
 
2246
        repo = self.make_repository(relpath, format=format)
 
2247
        return repo.bzrdir.create_branch()
 
2248
 
 
2249
    def make_bzrdir(self, relpath, format=None):
 
2250
        try:
 
2251
            # might be a relative or absolute path
 
2252
            maybe_a_url = self.get_url(relpath)
 
2253
            segments = maybe_a_url.rsplit('/', 1)
 
2254
            t = get_transport(maybe_a_url)
 
2255
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
2256
                t.ensure_base()
 
2257
            if format is None:
 
2258
                format = 'default'
 
2259
            if isinstance(format, basestring):
 
2260
                format = bzrdir.format_registry.make_bzrdir(format)
 
2261
            return format.initialize_on_transport(t)
 
2262
        except errors.UninitializableFormat:
 
2263
            raise TestSkipped("Format %s is not initializable." % format)
 
2264
 
 
2265
    def make_repository(self, relpath, shared=False, format=None):
 
2266
        """Create a repository on our default transport at relpath.
 
2267
 
 
2268
        Note that relpath must be a relative path, not a full url.
 
2269
        """
 
2270
        # FIXME: If you create a remoterepository this returns the underlying
 
2271
        # real format, which is incorrect.  Actually we should make sure that
 
2272
        # RemoteBzrDir returns a RemoteRepository.
 
2273
        # maybe  mbp 20070410
 
2274
        made_control = self.make_bzrdir(relpath, format=format)
 
2275
        return made_control.create_repository(shared=shared)
 
2276
 
 
2277
    def make_smart_server(self, path):
 
2278
        smart_server = server.SmartTCPServer_for_testing()
 
2279
        smart_server.setUp(self.get_server())
 
2280
        remote_transport = get_transport(smart_server.get_url()).clone(path)
 
2281
        self.addCleanup(smart_server.tearDown)
 
2282
        return remote_transport
 
2283
 
 
2284
    def make_branch_and_memory_tree(self, relpath, format=None):
 
2285
        """Create a branch on the default transport and a MemoryTree for it."""
 
2286
        b = self.make_branch(relpath, format=format)
 
2287
        return memorytree.MemoryTree.create_on_branch(b)
 
2288
 
 
2289
    def make_branch_builder(self, relpath, format=None):
 
2290
        branch = self.make_branch(relpath, format=format)
 
2291
        return branchbuilder.BranchBuilder(branch=branch)
 
2292
 
 
2293
    def overrideEnvironmentForTesting(self):
 
2294
        os.environ['HOME'] = self.test_home_dir
 
2295
        os.environ['BZR_HOME'] = self.test_home_dir
 
2296
 
 
2297
    def setUp(self):
 
2298
        super(TestCaseWithMemoryTransport, self).setUp()
 
2299
        self._make_test_root()
 
2300
        _currentdir = os.getcwdu()
 
2301
        def _leaveDirectory():
 
2302
            os.chdir(_currentdir)
 
2303
        self.addCleanup(_leaveDirectory)
 
2304
        self.makeAndChdirToTestDir()
 
2305
        self.overrideEnvironmentForTesting()
 
2306
        self.__readonly_server = None
 
2307
        self.__server = None
 
2308
        self.reduceLockdirTimeout()
 
2309
 
 
2310
    def setup_smart_server_with_call_log(self):
 
2311
        """Sets up a smart server as the transport server with a call log."""
 
2312
        self.transport_server = server.SmartTCPServer_for_testing
 
2313
        self.hpss_calls = []
 
2314
        import traceback
 
2315
        # Skip the current stack down to the caller of
 
2316
        # setup_smart_server_with_call_log
 
2317
        prefix_length = len(traceback.extract_stack()) - 2
 
2318
        def capture_hpss_call(params):
 
2319
            self.hpss_calls.append(
 
2320
                CapturedCall(params, prefix_length))
 
2321
        client._SmartClient.hooks.install_named_hook(
 
2322
            'call', capture_hpss_call, None)
 
2323
 
 
2324
    def reset_smart_call_log(self):
 
2325
        self.hpss_calls = []
 
2326
 
 
2327
 
 
2328
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
2329
    """Derived class that runs a test within a temporary directory.
 
2330
 
 
2331
    This is useful for tests that need to create a branch, etc.
 
2332
 
 
2333
    The directory is created in a slightly complex way: for each
 
2334
    Python invocation, a new temporary top-level directory is created.
 
2335
    All test cases create their own directory within that.  If the
 
2336
    tests complete successfully, the directory is removed.
 
2337
 
 
2338
    :ivar test_base_dir: The path of the top-level directory for this
 
2339
    test, which contains a home directory and a work directory.
 
2340
 
 
2341
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
2342
    which is used as $HOME for this test.
 
2343
 
 
2344
    :ivar test_dir: A directory under test_base_dir used as the current
 
2345
    directory when the test proper is run.
 
2346
    """
 
2347
 
 
2348
    OVERRIDE_PYTHON = 'python'
 
2349
 
 
2350
    def check_file_contents(self, filename, expect):
 
2351
        self.log("check contents of file %s" % filename)
 
2352
        contents = file(filename, 'r').read()
 
2353
        if contents != expect:
 
2354
            self.log("expected: %r" % expect)
 
2355
            self.log("actually: %r" % contents)
 
2356
            self.fail("contents of %s not as expected" % filename)
 
2357
 
 
2358
    def _getTestDirPrefix(self):
 
2359
        # create a directory within the top level test directory
 
2360
        if sys.platform in ('win32', 'cygwin'):
 
2361
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
 
2362
            # windows is likely to have path-length limits so use a short name
 
2363
            name_prefix = name_prefix[-30:]
 
2364
        else:
 
2365
            name_prefix = re.sub('[/]', '_', self.id())
 
2366
        return name_prefix
 
2367
 
 
2368
    def makeAndChdirToTestDir(self):
 
2369
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2370
 
 
2371
        For TestCaseInTempDir we create a temporary directory based on the test
 
2372
        name and then create two subdirs - test and home under it.
 
2373
        """
 
2374
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
 
2375
            self._getTestDirPrefix())
 
2376
        name = name_prefix
 
2377
        for i in range(100):
 
2378
            if os.path.exists(name):
 
2379
                name = name_prefix + '_' + str(i)
 
2380
            else:
 
2381
                os.mkdir(name)
 
2382
                break
 
2383
        # now create test and home directories within this dir
 
2384
        self.test_base_dir = name
 
2385
        self.test_home_dir = self.test_base_dir + '/home'
 
2386
        os.mkdir(self.test_home_dir)
 
2387
        self.test_dir = self.test_base_dir + '/work'
 
2388
        os.mkdir(self.test_dir)
 
2389
        os.chdir(self.test_dir)
 
2390
        # put name of test inside
 
2391
        f = file(self.test_base_dir + '/name', 'w')
 
2392
        try:
 
2393
            f.write(self.id())
 
2394
        finally:
 
2395
            f.close()
 
2396
        self.addCleanup(self.deleteTestDir)
 
2397
 
 
2398
    def deleteTestDir(self):
 
2399
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2400
        _rmtree_temp_dir(self.test_base_dir)
 
2401
 
 
2402
    def build_tree(self, shape, line_endings='binary', transport=None):
 
2403
        """Build a test tree according to a pattern.
 
2404
 
 
2405
        shape is a sequence of file specifications.  If the final
 
2406
        character is '/', a directory is created.
 
2407
 
 
2408
        This assumes that all the elements in the tree being built are new.
 
2409
 
 
2410
        This doesn't add anything to a branch.
 
2411
 
 
2412
        :type shape:    list or tuple.
 
2413
        :param line_endings: Either 'binary' or 'native'
 
2414
            in binary mode, exact contents are written in native mode, the
 
2415
            line endings match the default platform endings.
 
2416
        :param transport: A transport to write to, for building trees on VFS's.
 
2417
            If the transport is readonly or None, "." is opened automatically.
 
2418
        :return: None
 
2419
        """
 
2420
        if type(shape) not in (list, tuple):
 
2421
            raise AssertionError("Parameter 'shape' should be "
 
2422
                "a list or a tuple. Got %r instead" % (shape,))
 
2423
        # It's OK to just create them using forward slashes on windows.
 
2424
        if transport is None or transport.is_readonly():
 
2425
            transport = get_transport(".")
 
2426
        for name in shape:
 
2427
            self.assertIsInstance(name, basestring)
 
2428
            if name[-1] == '/':
 
2429
                transport.mkdir(urlutils.escape(name[:-1]))
 
2430
            else:
 
2431
                if line_endings == 'binary':
 
2432
                    end = '\n'
 
2433
                elif line_endings == 'native':
 
2434
                    end = os.linesep
 
2435
                else:
 
2436
                    raise errors.BzrError(
 
2437
                        'Invalid line ending request %r' % line_endings)
 
2438
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
2439
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2440
 
 
2441
    def build_tree_contents(self, shape):
 
2442
        build_tree_contents(shape)
 
2443
 
 
2444
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2445
        """Assert whether path or paths are in the WorkingTree"""
 
2446
        if tree is None:
 
2447
            tree = workingtree.WorkingTree.open(root_path)
 
2448
        if not isinstance(path, basestring):
 
2449
            for p in path:
 
2450
                self.assertInWorkingTree(p, tree=tree)
 
2451
        else:
 
2452
            self.assertIsNot(tree.path2id(path), None,
 
2453
                path+' not in working tree.')
 
2454
 
 
2455
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2456
        """Assert whether path or paths are not in the WorkingTree"""
 
2457
        if tree is None:
 
2458
            tree = workingtree.WorkingTree.open(root_path)
 
2459
        if not isinstance(path, basestring):
 
2460
            for p in path:
 
2461
                self.assertNotInWorkingTree(p,tree=tree)
 
2462
        else:
 
2463
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2464
 
 
2465
 
 
2466
class TestCaseWithTransport(TestCaseInTempDir):
 
2467
    """A test case that provides get_url and get_readonly_url facilities.
 
2468
 
 
2469
    These back onto two transport servers, one for readonly access and one for
 
2470
    read write access.
 
2471
 
 
2472
    If no explicit class is provided for readonly access, a
 
2473
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2474
    based read write transports.
 
2475
 
 
2476
    If an explicit class is provided for readonly access, that server and the
 
2477
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2478
    """
 
2479
 
 
2480
    def get_vfs_only_server(self):
 
2481
        """See TestCaseWithMemoryTransport.
 
2482
 
 
2483
        This is useful for some tests with specific servers that need
 
2484
        diagnostics.
 
2485
        """
 
2486
        if self.__vfs_server is None:
 
2487
            self.__vfs_server = self.vfs_transport_factory()
 
2488
            self.__vfs_server.setUp()
 
2489
            self.addCleanup(self.__vfs_server.tearDown)
 
2490
        return self.__vfs_server
 
2491
 
 
2492
    def make_branch_and_tree(self, relpath, format=None):
 
2493
        """Create a branch on the transport and a tree locally.
 
2494
 
 
2495
        If the transport is not a LocalTransport, the Tree can't be created on
 
2496
        the transport.  In that case if the vfs_transport_factory is
 
2497
        LocalURLServer the working tree is created in the local
 
2498
        directory backing the transport, and the returned tree's branch and
 
2499
        repository will also be accessed locally. Otherwise a lightweight
 
2500
        checkout is created and returned.
 
2501
 
 
2502
        We do this because we can't physically create a tree in the local
 
2503
        path, with a branch reference to the transport_factory url, and
 
2504
        a branch + repository in the vfs_transport, unless the vfs_transport
 
2505
        namespace is distinct from the local disk - the two branch objects
 
2506
        would collide. While we could construct a tree with its branch object
 
2507
        pointing at the transport_factory transport in memory, reopening it
 
2508
        would behaving unexpectedly, and has in the past caused testing bugs
 
2509
        when we tried to do it that way.
 
2510
 
 
2511
        :param format: The BzrDirFormat.
 
2512
        :returns: the WorkingTree.
 
2513
        """
 
2514
        # TODO: always use the local disk path for the working tree,
 
2515
        # this obviously requires a format that supports branch references
 
2516
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2517
        # RBC 20060208
 
2518
        b = self.make_branch(relpath, format=format)
 
2519
        try:
 
2520
            return b.bzrdir.create_workingtree()
 
2521
        except errors.NotLocalUrl:
 
2522
            # We can only make working trees locally at the moment.  If the
 
2523
            # transport can't support them, then we keep the non-disk-backed
 
2524
            # branch and create a local checkout.
 
2525
            if self.vfs_transport_factory is LocalURLServer:
 
2526
                # the branch is colocated on disk, we cannot create a checkout.
 
2527
                # hopefully callers will expect this.
 
2528
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2529
                wt = local_controldir.create_workingtree()
 
2530
                if wt.branch._format != b._format:
 
2531
                    wt._branch = b
 
2532
                    # Make sure that assigning to wt._branch fixes wt.branch,
 
2533
                    # in case the implementation details of workingtree objects
 
2534
                    # change.
 
2535
                    self.assertIs(b, wt.branch)
 
2536
                return wt
 
2537
            else:
 
2538
                return b.create_checkout(relpath, lightweight=True)
 
2539
 
 
2540
    def assertIsDirectory(self, relpath, transport):
 
2541
        """Assert that relpath within transport is a directory.
 
2542
 
 
2543
        This may not be possible on all transports; in that case it propagates
 
2544
        a TransportNotPossible.
 
2545
        """
 
2546
        try:
 
2547
            mode = transport.stat(relpath).st_mode
 
2548
        except errors.NoSuchFile:
 
2549
            self.fail("path %s is not a directory; no such file"
 
2550
                      % (relpath))
 
2551
        if not stat.S_ISDIR(mode):
 
2552
            self.fail("path %s is not a directory; has mode %#o"
 
2553
                      % (relpath, mode))
 
2554
 
 
2555
    def assertTreesEqual(self, left, right):
 
2556
        """Check that left and right have the same content and properties."""
 
2557
        # we use a tree delta to check for equality of the content, and we
 
2558
        # manually check for equality of other things such as the parents list.
 
2559
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2560
        differences = left.changes_from(right)
 
2561
        self.assertFalse(differences.has_changed(),
 
2562
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2563
 
 
2564
    def setUp(self):
 
2565
        super(TestCaseWithTransport, self).setUp()
 
2566
        self.__vfs_server = None
 
2567
 
 
2568
 
 
2569
class ChrootedTestCase(TestCaseWithTransport):
 
2570
    """A support class that provides readonly urls outside the local namespace.
 
2571
 
 
2572
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2573
    is then we are chrooted already, if it is not then an HttpServer is used
 
2574
    for readonly urls.
 
2575
 
 
2576
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2577
                       be used without needed to redo it when a different
 
2578
                       subclass is in use ?
 
2579
    """
 
2580
 
 
2581
    def setUp(self):
 
2582
        super(ChrootedTestCase, self).setUp()
 
2583
        if not self.vfs_transport_factory == MemoryServer:
 
2584
            self.transport_readonly_server = HttpServer
 
2585
 
 
2586
 
 
2587
def condition_id_re(pattern):
 
2588
    """Create a condition filter which performs a re check on a test's id.
 
2589
 
 
2590
    :param pattern: A regular expression string.
 
2591
    :return: A callable that returns True if the re matches.
 
2592
    """
 
2593
    filter_re = osutils.re_compile_checked(pattern, 0,
 
2594
        'test filter')
 
2595
    def condition(test):
 
2596
        test_id = test.id()
 
2597
        return filter_re.search(test_id)
 
2598
    return condition
 
2599
 
 
2600
 
 
2601
def condition_isinstance(klass_or_klass_list):
 
2602
    """Create a condition filter which returns isinstance(param, klass).
 
2603
 
 
2604
    :return: A callable which when called with one parameter obj return the
 
2605
        result of isinstance(obj, klass_or_klass_list).
 
2606
    """
 
2607
    def condition(obj):
 
2608
        return isinstance(obj, klass_or_klass_list)
 
2609
    return condition
 
2610
 
 
2611
 
 
2612
def condition_id_in_list(id_list):
 
2613
    """Create a condition filter which verify that test's id in a list.
 
2614
 
 
2615
    :param id_list: A TestIdList object.
 
2616
    :return: A callable that returns True if the test's id appears in the list.
 
2617
    """
 
2618
    def condition(test):
 
2619
        return id_list.includes(test.id())
 
2620
    return condition
 
2621
 
 
2622
 
 
2623
def condition_id_startswith(starts):
 
2624
    """Create a condition filter verifying that test's id starts with a string.
 
2625
 
 
2626
    :param starts: A list of string.
 
2627
    :return: A callable that returns True if the test's id starts with one of
 
2628
        the given strings.
 
2629
    """
 
2630
    def condition(test):
 
2631
        for start in starts:
 
2632
            if test.id().startswith(start):
 
2633
                return True
 
2634
        return False
 
2635
    return condition
 
2636
 
 
2637
 
 
2638
def exclude_tests_by_condition(suite, condition):
 
2639
    """Create a test suite which excludes some tests from suite.
 
2640
 
 
2641
    :param suite: The suite to get tests from.
 
2642
    :param condition: A callable whose result evaluates True when called with a
 
2643
        test case which should be excluded from the result.
 
2644
    :return: A suite which contains the tests found in suite that fail
 
2645
        condition.
 
2646
    """
 
2647
    result = []
 
2648
    for test in iter_suite_tests(suite):
 
2649
        if not condition(test):
 
2650
            result.append(test)
 
2651
    return TestUtil.TestSuite(result)
 
2652
 
 
2653
 
 
2654
def filter_suite_by_condition(suite, condition):
 
2655
    """Create a test suite by filtering another one.
 
2656
 
 
2657
    :param suite: The source suite.
 
2658
    :param condition: A callable whose result evaluates True when called with a
 
2659
        test case which should be included in the result.
 
2660
    :return: A suite which contains the tests found in suite that pass
 
2661
        condition.
 
2662
    """
 
2663
    result = []
 
2664
    for test in iter_suite_tests(suite):
 
2665
        if condition(test):
 
2666
            result.append(test)
 
2667
    return TestUtil.TestSuite(result)
 
2668
 
 
2669
 
 
2670
def filter_suite_by_re(suite, pattern):
 
2671
    """Create a test suite by filtering another one.
 
2672
 
 
2673
    :param suite:           the source suite
 
2674
    :param pattern:         pattern that names must match
 
2675
    :returns: the newly created suite
 
2676
    """
 
2677
    condition = condition_id_re(pattern)
 
2678
    result_suite = filter_suite_by_condition(suite, condition)
 
2679
    return result_suite
 
2680
 
 
2681
 
 
2682
def filter_suite_by_id_list(suite, test_id_list):
 
2683
    """Create a test suite by filtering another one.
 
2684
 
 
2685
    :param suite: The source suite.
 
2686
    :param test_id_list: A list of the test ids to keep as strings.
 
2687
    :returns: the newly created suite
 
2688
    """
 
2689
    condition = condition_id_in_list(test_id_list)
 
2690
    result_suite = filter_suite_by_condition(suite, condition)
 
2691
    return result_suite
 
2692
 
 
2693
 
 
2694
def filter_suite_by_id_startswith(suite, start):
 
2695
    """Create a test suite by filtering another one.
 
2696
 
 
2697
    :param suite: The source suite.
 
2698
    :param start: A list of string the test id must start with one of.
 
2699
    :returns: the newly created suite
 
2700
    """
 
2701
    condition = condition_id_startswith(start)
 
2702
    result_suite = filter_suite_by_condition(suite, condition)
 
2703
    return result_suite
 
2704
 
 
2705
 
 
2706
def exclude_tests_by_re(suite, pattern):
 
2707
    """Create a test suite which excludes some tests from suite.
 
2708
 
 
2709
    :param suite: The suite to get tests from.
 
2710
    :param pattern: A regular expression string. Test ids that match this
 
2711
        pattern will be excluded from the result.
 
2712
    :return: A TestSuite that contains all the tests from suite without the
 
2713
        tests that matched pattern. The order of tests is the same as it was in
 
2714
        suite.
 
2715
    """
 
2716
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
2717
 
 
2718
 
 
2719
def preserve_input(something):
 
2720
    """A helper for performing test suite transformation chains.
 
2721
 
 
2722
    :param something: Anything you want to preserve.
 
2723
    :return: Something.
 
2724
    """
 
2725
    return something
 
2726
 
 
2727
 
 
2728
def randomize_suite(suite):
 
2729
    """Return a new TestSuite with suite's tests in random order.
 
2730
 
 
2731
    The tests in the input suite are flattened into a single suite in order to
 
2732
    accomplish this. Any nested TestSuites are removed to provide global
 
2733
    randomness.
 
2734
    """
 
2735
    tests = list(iter_suite_tests(suite))
 
2736
    random.shuffle(tests)
 
2737
    return TestUtil.TestSuite(tests)
 
2738
 
 
2739
 
 
2740
def split_suite_by_condition(suite, condition):
 
2741
    """Split a test suite into two by a condition.
 
2742
 
 
2743
    :param suite: The suite to split.
 
2744
    :param condition: The condition to match on. Tests that match this
 
2745
        condition are returned in the first test suite, ones that do not match
 
2746
        are in the second suite.
 
2747
    :return: A tuple of two test suites, where the first contains tests from
 
2748
        suite matching the condition, and the second contains the remainder
 
2749
        from suite. The order within each output suite is the same as it was in
 
2750
        suite.
 
2751
    """
 
2752
    matched = []
 
2753
    did_not_match = []
 
2754
    for test in iter_suite_tests(suite):
 
2755
        if condition(test):
 
2756
            matched.append(test)
 
2757
        else:
 
2758
            did_not_match.append(test)
 
2759
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
2760
 
 
2761
 
 
2762
def split_suite_by_re(suite, pattern):
 
2763
    """Split a test suite into two by a regular expression.
 
2764
 
 
2765
    :param suite: The suite to split.
 
2766
    :param pattern: A regular expression string. Test ids that match this
 
2767
        pattern will be in the first test suite returned, and the others in the
 
2768
        second test suite returned.
 
2769
    :return: A tuple of two test suites, where the first contains tests from
 
2770
        suite matching pattern, and the second contains the remainder from
 
2771
        suite. The order within each output suite is the same as it was in
 
2772
        suite.
 
2773
    """
 
2774
    return split_suite_by_condition(suite, condition_id_re(pattern))
 
2775
 
 
2776
 
 
2777
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2778
              stop_on_failure=False,
 
2779
              transport=None, lsprof_timed=None, bench_history=None,
 
2780
              matching_tests_first=None,
 
2781
              list_only=False,
 
2782
              random_seed=None,
 
2783
              exclude_pattern=None,
 
2784
              strict=False,
 
2785
              runner_class=None,
 
2786
              suite_decorators=None,
 
2787
              stream=None,
 
2788
              result_decorators=None,
 
2789
              ):
 
2790
    """Run a test suite for bzr selftest.
 
2791
 
 
2792
    :param runner_class: The class of runner to use. Must support the
 
2793
        constructor arguments passed by run_suite which are more than standard
 
2794
        python uses.
 
2795
    :return: A boolean indicating success.
 
2796
    """
 
2797
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2798
    if verbose:
 
2799
        verbosity = 2
 
2800
    else:
 
2801
        verbosity = 1
 
2802
    if runner_class is None:
 
2803
        runner_class = TextTestRunner
 
2804
    if stream is None:
 
2805
        stream = sys.stdout
 
2806
    runner = runner_class(stream=stream,
 
2807
                            descriptions=0,
 
2808
                            verbosity=verbosity,
 
2809
                            bench_history=bench_history,
 
2810
                            list_only=list_only,
 
2811
                            strict=strict,
 
2812
                            result_decorators=result_decorators,
 
2813
                            )
 
2814
    runner.stop_on_failure=stop_on_failure
 
2815
    # built in decorator factories:
 
2816
    decorators = [
 
2817
        random_order(random_seed, runner),
 
2818
        exclude_tests(exclude_pattern),
 
2819
        ]
 
2820
    if matching_tests_first:
 
2821
        decorators.append(tests_first(pattern))
 
2822
    else:
 
2823
        decorators.append(filter_tests(pattern))
 
2824
    if suite_decorators:
 
2825
        decorators.extend(suite_decorators)
 
2826
    # tell the result object how many tests will be running: (except if
 
2827
    # --parallel=fork is being used. Robert said he will provide a better
 
2828
    # progress design later -- vila 20090817)
 
2829
    if fork_decorator not in decorators:
 
2830
        decorators.append(CountingDecorator)
 
2831
    for decorator in decorators:
 
2832
        suite = decorator(suite)
 
2833
    result = runner.run(suite)
 
2834
    if list_only:
 
2835
        return True
 
2836
    result.done()
 
2837
    if strict:
 
2838
        return result.wasStrictlySuccessful()
 
2839
    else:
 
2840
        return result.wasSuccessful()
 
2841
 
 
2842
 
 
2843
# A registry where get() returns a suite decorator.
 
2844
parallel_registry = registry.Registry()
 
2845
 
 
2846
 
 
2847
def fork_decorator(suite):
 
2848
    concurrency = osutils.local_concurrency()
 
2849
    if concurrency == 1:
 
2850
        return suite
 
2851
    from testtools import ConcurrentTestSuite
 
2852
    return ConcurrentTestSuite(suite, fork_for_tests)
 
2853
parallel_registry.register('fork', fork_decorator)
 
2854
 
 
2855
 
 
2856
def subprocess_decorator(suite):
 
2857
    concurrency = osutils.local_concurrency()
 
2858
    if concurrency == 1:
 
2859
        return suite
 
2860
    from testtools import ConcurrentTestSuite
 
2861
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
 
2862
parallel_registry.register('subprocess', subprocess_decorator)
 
2863
 
 
2864
 
 
2865
def exclude_tests(exclude_pattern):
 
2866
    """Return a test suite decorator that excludes tests."""
 
2867
    if exclude_pattern is None:
 
2868
        return identity_decorator
 
2869
    def decorator(suite):
 
2870
        return ExcludeDecorator(suite, exclude_pattern)
 
2871
    return decorator
 
2872
 
 
2873
 
 
2874
def filter_tests(pattern):
 
2875
    if pattern == '.*':
 
2876
        return identity_decorator
 
2877
    def decorator(suite):
 
2878
        return FilterTestsDecorator(suite, pattern)
 
2879
    return decorator
 
2880
 
 
2881
 
 
2882
def random_order(random_seed, runner):
 
2883
    """Return a test suite decorator factory for randomising tests order.
 
2884
    
 
2885
    :param random_seed: now, a string which casts to a long, or a long.
 
2886
    :param runner: A test runner with a stream attribute to report on.
 
2887
    """
 
2888
    if random_seed is None:
 
2889
        return identity_decorator
 
2890
    def decorator(suite):
 
2891
        return RandomDecorator(suite, random_seed, runner.stream)
 
2892
    return decorator
 
2893
 
 
2894
 
 
2895
def tests_first(pattern):
 
2896
    if pattern == '.*':
 
2897
        return identity_decorator
 
2898
    def decorator(suite):
 
2899
        return TestFirstDecorator(suite, pattern)
 
2900
    return decorator
 
2901
 
 
2902
 
 
2903
def identity_decorator(suite):
 
2904
    """Return suite."""
 
2905
    return suite
 
2906
 
 
2907
 
 
2908
class TestDecorator(TestSuite):
 
2909
    """A decorator for TestCase/TestSuite objects.
 
2910
    
 
2911
    Usually, subclasses should override __iter__(used when flattening test
 
2912
    suites), which we do to filter, reorder, parallelise and so on, run() and
 
2913
    debug().
 
2914
    """
 
2915
 
 
2916
    def __init__(self, suite):
 
2917
        TestSuite.__init__(self)
 
2918
        self.addTest(suite)
 
2919
 
 
2920
    def countTestCases(self):
 
2921
        cases = 0
 
2922
        for test in self:
 
2923
            cases += test.countTestCases()
 
2924
        return cases
 
2925
 
 
2926
    def debug(self):
 
2927
        for test in self:
 
2928
            test.debug()
 
2929
 
 
2930
    def run(self, result):
 
2931
        # Use iteration on self, not self._tests, to allow subclasses to hook
 
2932
        # into __iter__.
 
2933
        for test in self:
 
2934
            if result.shouldStop:
 
2935
                break
 
2936
            test.run(result)
 
2937
        return result
 
2938
 
 
2939
 
 
2940
class CountingDecorator(TestDecorator):
 
2941
    """A decorator which calls result.progress(self.countTestCases)."""
 
2942
 
 
2943
    def run(self, result):
 
2944
        progress_method = getattr(result, 'progress', None)
 
2945
        if callable(progress_method):
 
2946
            progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
 
2947
        return super(CountingDecorator, self).run(result)
 
2948
 
 
2949
 
 
2950
class ExcludeDecorator(TestDecorator):
 
2951
    """A decorator which excludes test matching an exclude pattern."""
 
2952
 
 
2953
    def __init__(self, suite, exclude_pattern):
 
2954
        TestDecorator.__init__(self, suite)
 
2955
        self.exclude_pattern = exclude_pattern
 
2956
        self.excluded = False
 
2957
 
 
2958
    def __iter__(self):
 
2959
        if self.excluded:
 
2960
            return iter(self._tests)
 
2961
        self.excluded = True
 
2962
        suite = exclude_tests_by_re(self, self.exclude_pattern)
 
2963
        del self._tests[:]
 
2964
        self.addTests(suite)
 
2965
        return iter(self._tests)
 
2966
 
 
2967
 
 
2968
class FilterTestsDecorator(TestDecorator):
 
2969
    """A decorator which filters tests to those matching a pattern."""
 
2970
 
 
2971
    def __init__(self, suite, pattern):
 
2972
        TestDecorator.__init__(self, suite)
 
2973
        self.pattern = pattern
 
2974
        self.filtered = False
 
2975
 
 
2976
    def __iter__(self):
 
2977
        if self.filtered:
 
2978
            return iter(self._tests)
 
2979
        self.filtered = True
 
2980
        suite = filter_suite_by_re(self, self.pattern)
 
2981
        del self._tests[:]
 
2982
        self.addTests(suite)
 
2983
        return iter(self._tests)
 
2984
 
 
2985
 
 
2986
class RandomDecorator(TestDecorator):
 
2987
    """A decorator which randomises the order of its tests."""
 
2988
 
 
2989
    def __init__(self, suite, random_seed, stream):
 
2990
        TestDecorator.__init__(self, suite)
 
2991
        self.random_seed = random_seed
 
2992
        self.randomised = False
 
2993
        self.stream = stream
 
2994
 
 
2995
    def __iter__(self):
 
2996
        if self.randomised:
 
2997
            return iter(self._tests)
 
2998
        self.randomised = True
 
2999
        self.stream.writeln("Randomizing test order using seed %s\n" %
 
3000
            (self.actual_seed()))
 
3001
        # Initialise the random number generator.
 
3002
        random.seed(self.actual_seed())
 
3003
        suite = randomize_suite(self)
 
3004
        del self._tests[:]
 
3005
        self.addTests(suite)
 
3006
        return iter(self._tests)
 
3007
 
 
3008
    def actual_seed(self):
 
3009
        if self.random_seed == "now":
 
3010
            # We convert the seed to a long to make it reuseable across
 
3011
            # invocations (because the user can reenter it).
 
3012
            self.random_seed = long(time.time())
 
3013
        else:
 
3014
            # Convert the seed to a long if we can
 
3015
            try:
 
3016
                self.random_seed = long(self.random_seed)
 
3017
            except:
 
3018
                pass
 
3019
        return self.random_seed
 
3020
 
 
3021
 
 
3022
class TestFirstDecorator(TestDecorator):
 
3023
    """A decorator which moves named tests to the front."""
 
3024
 
 
3025
    def __init__(self, suite, pattern):
 
3026
        TestDecorator.__init__(self, suite)
 
3027
        self.pattern = pattern
 
3028
        self.filtered = False
 
3029
 
 
3030
    def __iter__(self):
 
3031
        if self.filtered:
 
3032
            return iter(self._tests)
 
3033
        self.filtered = True
 
3034
        suites = split_suite_by_re(self, self.pattern)
 
3035
        del self._tests[:]
 
3036
        self.addTests(suites)
 
3037
        return iter(self._tests)
 
3038
 
 
3039
 
 
3040
def partition_tests(suite, count):
 
3041
    """Partition suite into count lists of tests."""
 
3042
    result = []
 
3043
    tests = list(iter_suite_tests(suite))
 
3044
    tests_per_process = int(math.ceil(float(len(tests)) / count))
 
3045
    for block in range(count):
 
3046
        low_test = block * tests_per_process
 
3047
        high_test = low_test + tests_per_process
 
3048
        process_tests = tests[low_test:high_test]
 
3049
        result.append(process_tests)
 
3050
    return result
 
3051
 
 
3052
 
 
3053
def fork_for_tests(suite):
 
3054
    """Take suite and start up one runner per CPU by forking()
 
3055
 
 
3056
    :return: An iterable of TestCase-like objects which can each have
 
3057
        run(result) called on them to feed tests to result.
 
3058
    """
 
3059
    concurrency = osutils.local_concurrency()
 
3060
    result = []
 
3061
    from subunit import TestProtocolClient, ProtocolTestCase
 
3062
    try:
 
3063
        from subunit.test_results import AutoTimingTestResultDecorator
 
3064
    except ImportError:
 
3065
        AutoTimingTestResultDecorator = lambda x:x
 
3066
    class TestInOtherProcess(ProtocolTestCase):
 
3067
        # Should be in subunit, I think. RBC.
 
3068
        def __init__(self, stream, pid):
 
3069
            ProtocolTestCase.__init__(self, stream)
 
3070
            self.pid = pid
 
3071
 
 
3072
        def run(self, result):
 
3073
            try:
 
3074
                ProtocolTestCase.run(self, result)
 
3075
            finally:
 
3076
                os.waitpid(self.pid, os.WNOHANG)
 
3077
 
 
3078
    test_blocks = partition_tests(suite, concurrency)
 
3079
    for process_tests in test_blocks:
 
3080
        process_suite = TestSuite()
 
3081
        process_suite.addTests(process_tests)
 
3082
        c2pread, c2pwrite = os.pipe()
 
3083
        pid = os.fork()
 
3084
        if pid == 0:
 
3085
            try:
 
3086
                os.close(c2pread)
 
3087
                # Leave stderr and stdout open so we can see test noise
 
3088
                # Close stdin so that the child goes away if it decides to
 
3089
                # read from stdin (otherwise its a roulette to see what
 
3090
                # child actually gets keystrokes for pdb etc).
 
3091
                sys.stdin.close()
 
3092
                sys.stdin = None
 
3093
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3094
                subunit_result = AutoTimingTestResultDecorator(
 
3095
                    TestProtocolClient(stream))
 
3096
                process_suite.run(subunit_result)
 
3097
            finally:
 
3098
                os._exit(0)
 
3099
        else:
 
3100
            os.close(c2pwrite)
 
3101
            stream = os.fdopen(c2pread, 'rb', 1)
 
3102
            test = TestInOtherProcess(stream, pid)
 
3103
            result.append(test)
 
3104
    return result
 
3105
 
 
3106
 
 
3107
def reinvoke_for_tests(suite):
 
3108
    """Take suite and start up one runner per CPU using subprocess().
 
3109
 
 
3110
    :return: An iterable of TestCase-like objects which can each have
 
3111
        run(result) called on them to feed tests to result.
 
3112
    """
 
3113
    concurrency = osutils.local_concurrency()
 
3114
    result = []
 
3115
    from subunit import ProtocolTestCase
 
3116
    class TestInSubprocess(ProtocolTestCase):
 
3117
        def __init__(self, process, name):
 
3118
            ProtocolTestCase.__init__(self, process.stdout)
 
3119
            self.process = process
 
3120
            self.process.stdin.close()
 
3121
            self.name = name
 
3122
 
 
3123
        def run(self, result):
 
3124
            try:
 
3125
                ProtocolTestCase.run(self, result)
 
3126
            finally:
 
3127
                self.process.wait()
 
3128
                os.unlink(self.name)
 
3129
            # print "pid %d finished" % finished_process
 
3130
    test_blocks = partition_tests(suite, concurrency)
 
3131
    for process_tests in test_blocks:
 
3132
        # ugly; currently reimplement rather than reuses TestCase methods.
 
3133
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
3134
        if not os.path.isfile(bzr_path):
 
3135
            # We are probably installed. Assume sys.argv is the right file
 
3136
            bzr_path = sys.argv[0]
 
3137
        fd, test_list_file_name = tempfile.mkstemp()
 
3138
        test_list_file = os.fdopen(fd, 'wb', 1)
 
3139
        for test in process_tests:
 
3140
            test_list_file.write(test.id() + '\n')
 
3141
        test_list_file.close()
 
3142
        try:
 
3143
            argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
 
3144
                '--subunit']
 
3145
            if '--no-plugins' in sys.argv:
 
3146
                argv.append('--no-plugins')
 
3147
            # stderr=STDOUT would be ideal, but until we prevent noise on
 
3148
            # stderr it can interrupt the subunit protocol.
 
3149
            process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
 
3150
                bufsize=1)
 
3151
            test = TestInSubprocess(process, test_list_file_name)
 
3152
            result.append(test)
 
3153
        except:
 
3154
            os.unlink(test_list_file_name)
 
3155
            raise
 
3156
    return result
 
3157
 
 
3158
 
 
3159
class ForwardingResult(unittest.TestResult):
 
3160
 
 
3161
    def __init__(self, target):
 
3162
        unittest.TestResult.__init__(self)
 
3163
        self.result = target
 
3164
 
 
3165
    def startTest(self, test):
 
3166
        self.result.startTest(test)
 
3167
 
 
3168
    def stopTest(self, test):
 
3169
        self.result.stopTest(test)
 
3170
 
 
3171
    def addSkip(self, test, reason):
 
3172
        self.result.addSkip(test, reason)
 
3173
 
 
3174
    def addSuccess(self, test):
 
3175
        self.result.addSuccess(test)
 
3176
 
 
3177
    def addError(self, test, err):
 
3178
        self.result.addError(test, err)
 
3179
 
 
3180
    def addFailure(self, test, err):
 
3181
        self.result.addFailure(test, err)
 
3182
 
 
3183
 
 
3184
class BZRTransformingResult(ForwardingResult):
 
3185
 
 
3186
    def addError(self, test, err):
 
3187
        feature = self._error_looks_like('UnavailableFeature: ', err)
 
3188
        if feature is not None:
 
3189
            self.result.addNotSupported(test, feature)
 
3190
        else:
 
3191
            self.result.addError(test, err)
 
3192
 
 
3193
    def addFailure(self, test, err):
 
3194
        known = self._error_looks_like('KnownFailure: ', err)
 
3195
        if known is not None:
 
3196
            self.result._addKnownFailure(test, [KnownFailure,
 
3197
                                                KnownFailure(known), None])
 
3198
        else:
 
3199
            self.result.addFailure(test, err)
 
3200
 
 
3201
    def _error_looks_like(self, prefix, err):
 
3202
        """Deserialize exception and returns the stringify value."""
 
3203
        import subunit
 
3204
        value = None
 
3205
        typ, exc, _ = err
 
3206
        if isinstance(exc, subunit.RemoteException):
 
3207
            # stringify the exception gives access to the remote traceback
 
3208
            # We search the last line for 'prefix'
 
3209
            lines = str(exc).split('\n')
 
3210
            while lines and not lines[-1]:
 
3211
                lines.pop(-1)
 
3212
            if lines:
 
3213
                if lines[-1].startswith(prefix):
 
3214
                    value = lines[-1][len(prefix):]
 
3215
        return value
 
3216
 
 
3217
 
 
3218
class ProfileResult(ForwardingResult):
 
3219
    """Generate profiling data for all activity between start and success.
 
3220
    
 
3221
    The profile data is appended to the test's _benchcalls attribute and can
 
3222
    be accessed by the forwarded-to TestResult.
 
3223
 
 
3224
    While it might be cleaner do accumulate this in stopTest, addSuccess is
 
3225
    where our existing output support for lsprof is, and this class aims to
 
3226
    fit in with that: while it could be moved it's not necessary to accomplish
 
3227
    test profiling, nor would it be dramatically cleaner.
 
3228
    """
 
3229
 
 
3230
    def startTest(self, test):
 
3231
        self.profiler = bzrlib.lsprof.BzrProfiler()
 
3232
        self.profiler.start()
 
3233
        ForwardingResult.startTest(self, test)
 
3234
 
 
3235
    def addSuccess(self, test):
 
3236
        stats = self.profiler.stop()
 
3237
        try:
 
3238
            calls = test._benchcalls
 
3239
        except AttributeError:
 
3240
            test._benchcalls = []
 
3241
            calls = test._benchcalls
 
3242
        calls.append(((test.id(), "", ""), stats))
 
3243
        ForwardingResult.addSuccess(self, test)
 
3244
 
 
3245
    def stopTest(self, test):
 
3246
        ForwardingResult.stopTest(self, test)
 
3247
        self.profiler = None
 
3248
 
 
3249
 
 
3250
# Controlled by "bzr selftest -E=..." option
 
3251
# Currently supported:
 
3252
#   -Eallow_debug           Will no longer clear debug.debug_flags() so it
 
3253
#                           preserves any flags supplied at the command line.
 
3254
#   -Edisable_lock_checks   Turns errors in mismatched locks into simple prints
 
3255
#                           rather than failing tests. And no longer raise
 
3256
#                           LockContention when fctnl locks are not being used
 
3257
#                           with proper exclusion rules.
 
3258
selftest_debug_flags = set()
 
3259
 
 
3260
 
 
3261
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
3262
             transport=None,
 
3263
             test_suite_factory=None,
 
3264
             lsprof_timed=None,
 
3265
             bench_history=None,
 
3266
             matching_tests_first=None,
 
3267
             list_only=False,
 
3268
             random_seed=None,
 
3269
             exclude_pattern=None,
 
3270
             strict=False,
 
3271
             load_list=None,
 
3272
             debug_flags=None,
 
3273
             starting_with=None,
 
3274
             runner_class=None,
 
3275
             suite_decorators=None,
 
3276
             stream=None,
 
3277
             lsprof_tests=False,
 
3278
             ):
 
3279
    """Run the whole test suite under the enhanced runner"""
 
3280
    # XXX: Very ugly way to do this...
 
3281
    # Disable warning about old formats because we don't want it to disturb
 
3282
    # any blackbox tests.
 
3283
    from bzrlib import repository
 
3284
    repository._deprecation_warning_done = True
 
3285
 
 
3286
    global default_transport
 
3287
    if transport is None:
 
3288
        transport = default_transport
 
3289
    old_transport = default_transport
 
3290
    default_transport = transport
 
3291
    global selftest_debug_flags
 
3292
    old_debug_flags = selftest_debug_flags
 
3293
    if debug_flags is not None:
 
3294
        selftest_debug_flags = set(debug_flags)
 
3295
    try:
 
3296
        if load_list is None:
 
3297
            keep_only = None
 
3298
        else:
 
3299
            keep_only = load_test_id_list(load_list)
 
3300
        if starting_with:
 
3301
            starting_with = [test_prefix_alias_registry.resolve_alias(start)
 
3302
                             for start in starting_with]
 
3303
        if test_suite_factory is None:
 
3304
            # Reduce loading time by loading modules based on the starting_with
 
3305
            # patterns.
 
3306
            suite = test_suite(keep_only, starting_with)
 
3307
        else:
 
3308
            suite = test_suite_factory()
 
3309
        if starting_with:
 
3310
            # But always filter as requested.
 
3311
            suite = filter_suite_by_id_startswith(suite, starting_with)
 
3312
        result_decorators = []
 
3313
        if lsprof_tests:
 
3314
            result_decorators.append(ProfileResult)
 
3315
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
3316
                     stop_on_failure=stop_on_failure,
 
3317
                     transport=transport,
 
3318
                     lsprof_timed=lsprof_timed,
 
3319
                     bench_history=bench_history,
 
3320
                     matching_tests_first=matching_tests_first,
 
3321
                     list_only=list_only,
 
3322
                     random_seed=random_seed,
 
3323
                     exclude_pattern=exclude_pattern,
 
3324
                     strict=strict,
 
3325
                     runner_class=runner_class,
 
3326
                     suite_decorators=suite_decorators,
 
3327
                     stream=stream,
 
3328
                     result_decorators=result_decorators,
 
3329
                     )
 
3330
    finally:
 
3331
        default_transport = old_transport
 
3332
        selftest_debug_flags = old_debug_flags
 
3333
 
 
3334
 
 
3335
def load_test_id_list(file_name):
 
3336
    """Load a test id list from a text file.
 
3337
 
 
3338
    The format is one test id by line.  No special care is taken to impose
 
3339
    strict rules, these test ids are used to filter the test suite so a test id
 
3340
    that do not match an existing test will do no harm. This allows user to add
 
3341
    comments, leave blank lines, etc.
 
3342
    """
 
3343
    test_list = []
 
3344
    try:
 
3345
        ftest = open(file_name, 'rt')
 
3346
    except IOError, e:
 
3347
        if e.errno != errno.ENOENT:
 
3348
            raise
 
3349
        else:
 
3350
            raise errors.NoSuchFile(file_name)
 
3351
 
 
3352
    for test_name in ftest.readlines():
 
3353
        test_list.append(test_name.strip())
 
3354
    ftest.close()
 
3355
    return test_list
 
3356
 
 
3357
 
 
3358
def suite_matches_id_list(test_suite, id_list):
 
3359
    """Warns about tests not appearing or appearing more than once.
 
3360
 
 
3361
    :param test_suite: A TestSuite object.
 
3362
    :param test_id_list: The list of test ids that should be found in
 
3363
         test_suite.
 
3364
 
 
3365
    :return: (absents, duplicates) absents is a list containing the test found
 
3366
        in id_list but not in test_suite, duplicates is a list containing the
 
3367
        test found multiple times in test_suite.
 
3368
 
 
3369
    When using a prefined test id list, it may occurs that some tests do not
 
3370
    exist anymore or that some tests use the same id. This function warns the
 
3371
    tester about potential problems in his workflow (test lists are volatile)
 
3372
    or in the test suite itself (using the same id for several tests does not
 
3373
    help to localize defects).
 
3374
    """
 
3375
    # Build a dict counting id occurrences
 
3376
    tests = dict()
 
3377
    for test in iter_suite_tests(test_suite):
 
3378
        id = test.id()
 
3379
        tests[id] = tests.get(id, 0) + 1
 
3380
 
 
3381
    not_found = []
 
3382
    duplicates = []
 
3383
    for id in id_list:
 
3384
        occurs = tests.get(id, 0)
 
3385
        if not occurs:
 
3386
            not_found.append(id)
 
3387
        elif occurs > 1:
 
3388
            duplicates.append(id)
 
3389
 
 
3390
    return not_found, duplicates
 
3391
 
 
3392
 
 
3393
class TestIdList(object):
 
3394
    """Test id list to filter a test suite.
 
3395
 
 
3396
    Relying on the assumption that test ids are built as:
 
3397
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
3398
    notation, this class offers methods to :
 
3399
    - avoid building a test suite for modules not refered to in the test list,
 
3400
    - keep only the tests listed from the module test suite.
 
3401
    """
 
3402
 
 
3403
    def __init__(self, test_id_list):
 
3404
        # When a test suite needs to be filtered against us we compare test ids
 
3405
        # for equality, so a simple dict offers a quick and simple solution.
 
3406
        self.tests = dict().fromkeys(test_id_list, True)
 
3407
 
 
3408
        # While unittest.TestCase have ids like:
 
3409
        # <module>.<class>.<method>[(<param+)],
 
3410
        # doctest.DocTestCase can have ids like:
 
3411
        # <module>
 
3412
        # <module>.<class>
 
3413
        # <module>.<function>
 
3414
        # <module>.<class>.<method>
 
3415
 
 
3416
        # Since we can't predict a test class from its name only, we settle on
 
3417
        # a simple constraint: a test id always begins with its module name.
 
3418
 
 
3419
        modules = {}
 
3420
        for test_id in test_id_list:
 
3421
            parts = test_id.split('.')
 
3422
            mod_name = parts.pop(0)
 
3423
            modules[mod_name] = True
 
3424
            for part in parts:
 
3425
                mod_name += '.' + part
 
3426
                modules[mod_name] = True
 
3427
        self.modules = modules
 
3428
 
 
3429
    def refers_to(self, module_name):
 
3430
        """Is there tests for the module or one of its sub modules."""
 
3431
        return self.modules.has_key(module_name)
 
3432
 
 
3433
    def includes(self, test_id):
 
3434
        return self.tests.has_key(test_id)
 
3435
 
 
3436
 
 
3437
class TestPrefixAliasRegistry(registry.Registry):
 
3438
    """A registry for test prefix aliases.
 
3439
 
 
3440
    This helps implement shorcuts for the --starting-with selftest
 
3441
    option. Overriding existing prefixes is not allowed but not fatal (a
 
3442
    warning will be emitted).
 
3443
    """
 
3444
 
 
3445
    def register(self, key, obj, help=None, info=None,
 
3446
                 override_existing=False):
 
3447
        """See Registry.register.
 
3448
 
 
3449
        Trying to override an existing alias causes a warning to be emitted,
 
3450
        not a fatal execption.
 
3451
        """
 
3452
        try:
 
3453
            super(TestPrefixAliasRegistry, self).register(
 
3454
                key, obj, help=help, info=info, override_existing=False)
 
3455
        except KeyError:
 
3456
            actual = self.get(key)
 
3457
            note('Test prefix alias %s is already used for %s, ignoring %s'
 
3458
                 % (key, actual, obj))
 
3459
 
 
3460
    def resolve_alias(self, id_start):
 
3461
        """Replace the alias by the prefix in the given string.
 
3462
 
 
3463
        Using an unknown prefix is an error to help catching typos.
 
3464
        """
 
3465
        parts = id_start.split('.')
 
3466
        try:
 
3467
            parts[0] = self.get(parts[0])
 
3468
        except KeyError:
 
3469
            raise errors.BzrCommandError(
 
3470
                '%s is not a known test prefix alias' % parts[0])
 
3471
        return '.'.join(parts)
 
3472
 
 
3473
 
 
3474
test_prefix_alias_registry = TestPrefixAliasRegistry()
 
3475
"""Registry of test prefix aliases."""
 
3476
 
 
3477
 
 
3478
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
 
3479
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
 
3480
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
 
3481
 
 
3482
# Obvious higest levels prefixes, feel free to add your own via a plugin
 
3483
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
 
3484
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
 
3485
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
 
3486
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
 
3487
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
 
3488
 
 
3489
 
 
3490
def _test_suite_testmod_names():
 
3491
    """Return the standard list of test module names to test."""
 
3492
    return [
 
3493
        'bzrlib.doc',
 
3494
        'bzrlib.tests.blackbox',
 
3495
        'bzrlib.tests.commands',
 
3496
        'bzrlib.tests.per_branch',
 
3497
        'bzrlib.tests.per_bzrdir',
 
3498
        'bzrlib.tests.per_interrepository',
 
3499
        'bzrlib.tests.per_intertree',
 
3500
        'bzrlib.tests.per_inventory',
 
3501
        'bzrlib.tests.per_interbranch',
 
3502
        'bzrlib.tests.per_lock',
 
3503
        'bzrlib.tests.per_transport',
 
3504
        'bzrlib.tests.per_tree',
 
3505
        'bzrlib.tests.per_pack_repository',
 
3506
        'bzrlib.tests.per_repository',
 
3507
        'bzrlib.tests.per_repository_chk',
 
3508
        'bzrlib.tests.per_repository_reference',
 
3509
        'bzrlib.tests.per_versionedfile',
 
3510
        'bzrlib.tests.per_workingtree',
 
3511
        'bzrlib.tests.test__annotator',
 
3512
        'bzrlib.tests.test__chk_map',
 
3513
        'bzrlib.tests.test__dirstate_helpers',
 
3514
        'bzrlib.tests.test__groupcompress',
 
3515
        'bzrlib.tests.test__known_graph',
 
3516
        'bzrlib.tests.test__rio',
 
3517
        'bzrlib.tests.test__walkdirs_win32',
 
3518
        'bzrlib.tests.test_ancestry',
 
3519
        'bzrlib.tests.test_annotate',
 
3520
        'bzrlib.tests.test_api',
 
3521
        'bzrlib.tests.test_atomicfile',
 
3522
        'bzrlib.tests.test_bad_files',
 
3523
        'bzrlib.tests.test_bencode',
 
3524
        'bzrlib.tests.test_bisect_multi',
 
3525
        'bzrlib.tests.test_branch',
 
3526
        'bzrlib.tests.test_branchbuilder',
 
3527
        'bzrlib.tests.test_btree_index',
 
3528
        'bzrlib.tests.test_bugtracker',
 
3529
        'bzrlib.tests.test_bundle',
 
3530
        'bzrlib.tests.test_bzrdir',
 
3531
        'bzrlib.tests.test__chunks_to_lines',
 
3532
        'bzrlib.tests.test_cache_utf8',
 
3533
        'bzrlib.tests.test_chk_map',
 
3534
        'bzrlib.tests.test_chk_serializer',
 
3535
        'bzrlib.tests.test_chunk_writer',
 
3536
        'bzrlib.tests.test_clean_tree',
 
3537
        'bzrlib.tests.test_commands',
 
3538
        'bzrlib.tests.test_commit',
 
3539
        'bzrlib.tests.test_commit_merge',
 
3540
        'bzrlib.tests.test_config',
 
3541
        'bzrlib.tests.test_conflicts',
 
3542
        'bzrlib.tests.test_counted_lock',
 
3543
        'bzrlib.tests.test_crash',
 
3544
        'bzrlib.tests.test_decorators',
 
3545
        'bzrlib.tests.test_delta',
 
3546
        'bzrlib.tests.test_debug',
 
3547
        'bzrlib.tests.test_deprecated_graph',
 
3548
        'bzrlib.tests.test_diff',
 
3549
        'bzrlib.tests.test_directory_service',
 
3550
        'bzrlib.tests.test_dirstate',
 
3551
        'bzrlib.tests.test_email_message',
 
3552
        'bzrlib.tests.test_eol_filters',
 
3553
        'bzrlib.tests.test_errors',
 
3554
        'bzrlib.tests.test_export',
 
3555
        'bzrlib.tests.test_extract',
 
3556
        'bzrlib.tests.test_fetch',
 
3557
        'bzrlib.tests.test_fifo_cache',
 
3558
        'bzrlib.tests.test_filters',
 
3559
        'bzrlib.tests.test_ftp_transport',
 
3560
        'bzrlib.tests.test_foreign',
 
3561
        'bzrlib.tests.test_generate_docs',
 
3562
        'bzrlib.tests.test_generate_ids',
 
3563
        'bzrlib.tests.test_globbing',
 
3564
        'bzrlib.tests.test_gpg',
 
3565
        'bzrlib.tests.test_graph',
 
3566
        'bzrlib.tests.test_groupcompress',
 
3567
        'bzrlib.tests.test_hashcache',
 
3568
        'bzrlib.tests.test_help',
 
3569
        'bzrlib.tests.test_hooks',
 
3570
        'bzrlib.tests.test_http',
 
3571
        'bzrlib.tests.test_http_response',
 
3572
        'bzrlib.tests.test_https_ca_bundle',
 
3573
        'bzrlib.tests.test_identitymap',
 
3574
        'bzrlib.tests.test_ignores',
 
3575
        'bzrlib.tests.test_index',
 
3576
        'bzrlib.tests.test_info',
 
3577
        'bzrlib.tests.test_inv',
 
3578
        'bzrlib.tests.test_inventory_delta',
 
3579
        'bzrlib.tests.test_knit',
 
3580
        'bzrlib.tests.test_lazy_import',
 
3581
        'bzrlib.tests.test_lazy_regex',
 
3582
        'bzrlib.tests.test_lock',
 
3583
        'bzrlib.tests.test_lockable_files',
 
3584
        'bzrlib.tests.test_lockdir',
 
3585
        'bzrlib.tests.test_log',
 
3586
        'bzrlib.tests.test_lru_cache',
 
3587
        'bzrlib.tests.test_lsprof',
 
3588
        'bzrlib.tests.test_mail_client',
 
3589
        'bzrlib.tests.test_memorytree',
 
3590
        'bzrlib.tests.test_merge',
 
3591
        'bzrlib.tests.test_merge3',
 
3592
        'bzrlib.tests.test_merge_core',
 
3593
        'bzrlib.tests.test_merge_directive',
 
3594
        'bzrlib.tests.test_missing',
 
3595
        'bzrlib.tests.test_msgeditor',
 
3596
        'bzrlib.tests.test_multiparent',
 
3597
        'bzrlib.tests.test_mutabletree',
 
3598
        'bzrlib.tests.test_nonascii',
 
3599
        'bzrlib.tests.test_options',
 
3600
        'bzrlib.tests.test_osutils',
 
3601
        'bzrlib.tests.test_osutils_encodings',
 
3602
        'bzrlib.tests.test_pack',
 
3603
        'bzrlib.tests.test_patch',
 
3604
        'bzrlib.tests.test_patches',
 
3605
        'bzrlib.tests.test_permissions',
 
3606
        'bzrlib.tests.test_plugins',
 
3607
        'bzrlib.tests.test_progress',
 
3608
        'bzrlib.tests.test_read_bundle',
 
3609
        'bzrlib.tests.test_reconcile',
 
3610
        'bzrlib.tests.test_reconfigure',
 
3611
        'bzrlib.tests.test_registry',
 
3612
        'bzrlib.tests.test_remote',
 
3613
        'bzrlib.tests.test_rename_map',
 
3614
        'bzrlib.tests.test_repository',
 
3615
        'bzrlib.tests.test_revert',
 
3616
        'bzrlib.tests.test_revision',
 
3617
        'bzrlib.tests.test_revisionspec',
 
3618
        'bzrlib.tests.test_revisiontree',
 
3619
        'bzrlib.tests.test_rio',
 
3620
        'bzrlib.tests.test_rules',
 
3621
        'bzrlib.tests.test_sampler',
 
3622
        'bzrlib.tests.test_selftest',
 
3623
        'bzrlib.tests.test_serializer',
 
3624
        'bzrlib.tests.test_setup',
 
3625
        'bzrlib.tests.test_sftp_transport',
 
3626
        'bzrlib.tests.test_shelf',
 
3627
        'bzrlib.tests.test_shelf_ui',
 
3628
        'bzrlib.tests.test_smart',
 
3629
        'bzrlib.tests.test_smart_add',
 
3630
        'bzrlib.tests.test_smart_request',
 
3631
        'bzrlib.tests.test_smart_transport',
 
3632
        'bzrlib.tests.test_smtp_connection',
 
3633
        'bzrlib.tests.test_source',
 
3634
        'bzrlib.tests.test_ssh_transport',
 
3635
        'bzrlib.tests.test_status',
 
3636
        'bzrlib.tests.test_store',
 
3637
        'bzrlib.tests.test_strace',
 
3638
        'bzrlib.tests.test_subsume',
 
3639
        'bzrlib.tests.test_switch',
 
3640
        'bzrlib.tests.test_symbol_versioning',
 
3641
        'bzrlib.tests.test_tag',
 
3642
        'bzrlib.tests.test_testament',
 
3643
        'bzrlib.tests.test_textfile',
 
3644
        'bzrlib.tests.test_textmerge',
 
3645
        'bzrlib.tests.test_timestamp',
 
3646
        'bzrlib.tests.test_trace',
 
3647
        'bzrlib.tests.test_transactions',
 
3648
        'bzrlib.tests.test_transform',
 
3649
        'bzrlib.tests.test_transport',
 
3650
        'bzrlib.tests.test_transport_log',
 
3651
        'bzrlib.tests.test_tree',
 
3652
        'bzrlib.tests.test_treebuilder',
 
3653
        'bzrlib.tests.test_tsort',
 
3654
        'bzrlib.tests.test_tuned_gzip',
 
3655
        'bzrlib.tests.test_ui',
 
3656
        'bzrlib.tests.test_uncommit',
 
3657
        'bzrlib.tests.test_upgrade',
 
3658
        'bzrlib.tests.test_upgrade_stacked',
 
3659
        'bzrlib.tests.test_urlutils',
 
3660
        'bzrlib.tests.test_version',
 
3661
        'bzrlib.tests.test_version_info',
 
3662
        'bzrlib.tests.test_weave',
 
3663
        'bzrlib.tests.test_whitebox',
 
3664
        'bzrlib.tests.test_win32utils',
 
3665
        'bzrlib.tests.test_workingtree',
 
3666
        'bzrlib.tests.test_workingtree_4',
 
3667
        'bzrlib.tests.test_wsgi',
 
3668
        'bzrlib.tests.test_xml',
 
3669
        ]
 
3670
 
 
3671
 
 
3672
def _test_suite_modules_to_doctest():
 
3673
    """Return the list of modules to doctest."""   
 
3674
    return [
 
3675
        'bzrlib',
 
3676
        'bzrlib.branchbuilder',
 
3677
        'bzrlib.export',
 
3678
        'bzrlib.inventory',
 
3679
        'bzrlib.iterablefile',
 
3680
        'bzrlib.lockdir',
 
3681
        'bzrlib.merge3',
 
3682
        'bzrlib.option',
 
3683
        'bzrlib.symbol_versioning',
 
3684
        'bzrlib.tests',
 
3685
        'bzrlib.timestamp',
 
3686
        'bzrlib.version_info_formats.format_custom',
 
3687
        ]
 
3688
 
 
3689
 
 
3690
def test_suite(keep_only=None, starting_with=None):
 
3691
    """Build and return TestSuite for the whole of bzrlib.
 
3692
 
 
3693
    :param keep_only: A list of test ids limiting the suite returned.
 
3694
 
 
3695
    :param starting_with: An id limiting the suite returned to the tests
 
3696
         starting with it.
 
3697
 
 
3698
    This function can be replaced if you need to change the default test
 
3699
    suite on a global basis, but it is not encouraged.
 
3700
    """
 
3701
 
 
3702
    loader = TestUtil.TestLoader()
 
3703
 
 
3704
    if keep_only is not None:
 
3705
        id_filter = TestIdList(keep_only)
 
3706
    if starting_with:
 
3707
        # We take precedence over keep_only because *at loading time* using
 
3708
        # both options means we will load less tests for the same final result.
 
3709
        def interesting_module(name):
 
3710
            for start in starting_with:
 
3711
                if (
 
3712
                    # Either the module name starts with the specified string
 
3713
                    name.startswith(start)
 
3714
                    # or it may contain tests starting with the specified string
 
3715
                    or start.startswith(name)
 
3716
                    ):
 
3717
                    return True
 
3718
            return False
 
3719
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
 
3720
 
 
3721
    elif keep_only is not None:
 
3722
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
3723
        def interesting_module(name):
 
3724
            return id_filter.refers_to(name)
 
3725
 
 
3726
    else:
 
3727
        loader = TestUtil.TestLoader()
 
3728
        def interesting_module(name):
 
3729
            # No filtering, all modules are interesting
 
3730
            return True
 
3731
 
 
3732
    suite = loader.suiteClass()
 
3733
 
 
3734
    # modules building their suite with loadTestsFromModuleNames
 
3735
    suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
 
3736
 
 
3737
    for mod in _test_suite_modules_to_doctest():
 
3738
        if not interesting_module(mod):
 
3739
            # No tests to keep here, move along
 
3740
            continue
 
3741
        try:
 
3742
            # note that this really does mean "report only" -- doctest
 
3743
            # still runs the rest of the examples
 
3744
            doc_suite = doctest.DocTestSuite(mod,
 
3745
                optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
 
3746
        except ValueError, e:
 
3747
            print '**failed to get doctest for: %s\n%s' % (mod, e)
 
3748
            raise
 
3749
        if len(doc_suite._tests) == 0:
 
3750
            raise errors.BzrError("no doctests found in %s" % (mod,))
 
3751
        suite.addTest(doc_suite)
 
3752
 
 
3753
    default_encoding = sys.getdefaultencoding()
 
3754
    for name, plugin in bzrlib.plugin.plugins().items():
 
3755
        if not interesting_module(plugin.module.__name__):
 
3756
            continue
 
3757
        plugin_suite = plugin.test_suite()
 
3758
        # We used to catch ImportError here and turn it into just a warning,
 
3759
        # but really if you don't have --no-plugins this should be a failure.
 
3760
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
3761
        if plugin_suite is None:
 
3762
            plugin_suite = plugin.load_plugin_tests(loader)
 
3763
        if plugin_suite is not None:
 
3764
            suite.addTest(plugin_suite)
 
3765
        if default_encoding != sys.getdefaultencoding():
 
3766
            bzrlib.trace.warning(
 
3767
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
3768
                sys.getdefaultencoding())
 
3769
            reload(sys)
 
3770
            sys.setdefaultencoding(default_encoding)
 
3771
 
 
3772
    if keep_only is not None:
 
3773
        # Now that the referred modules have loaded their tests, keep only the
 
3774
        # requested ones.
 
3775
        suite = filter_suite_by_id_list(suite, id_filter)
 
3776
        # Do some sanity checks on the id_list filtering
 
3777
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
3778
        if starting_with:
 
3779
            # The tester has used both keep_only and starting_with, so he is
 
3780
            # already aware that some tests are excluded from the list, there
 
3781
            # is no need to tell him which.
 
3782
            pass
 
3783
        else:
 
3784
            # Some tests mentioned in the list are not in the test suite. The
 
3785
            # list may be out of date, report to the tester.
 
3786
            for id in not_found:
 
3787
                bzrlib.trace.warning('"%s" not found in the test suite', id)
 
3788
        for id in duplicates:
 
3789
            bzrlib.trace.warning('"%s" is used as an id by several tests', id)
 
3790
 
 
3791
    return suite
 
3792
 
 
3793
 
 
3794
def multiply_scenarios(scenarios_left, scenarios_right):
 
3795
    """Multiply two sets of scenarios.
 
3796
 
 
3797
    :returns: the cartesian product of the two sets of scenarios, that is
 
3798
        a scenario for every possible combination of a left scenario and a
 
3799
        right scenario.
 
3800
    """
 
3801
    return [
 
3802
        ('%s,%s' % (left_name, right_name),
 
3803
         dict(left_dict.items() + right_dict.items()))
 
3804
        for left_name, left_dict in scenarios_left
 
3805
        for right_name, right_dict in scenarios_right]
 
3806
 
 
3807
 
 
3808
def multiply_tests(tests, scenarios, result):
 
3809
    """Multiply tests_list by scenarios into result.
 
3810
 
 
3811
    This is the core workhorse for test parameterisation.
 
3812
 
 
3813
    Typically the load_tests() method for a per-implementation test suite will
 
3814
    call multiply_tests and return the result.
 
3815
 
 
3816
    :param tests: The tests to parameterise.
 
3817
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
 
3818
        scenario_param_dict).
 
3819
    :param result: A TestSuite to add created tests to.
 
3820
 
 
3821
    This returns the passed in result TestSuite with the cross product of all
 
3822
    the tests repeated once for each scenario.  Each test is adapted by adding
 
3823
    the scenario name at the end of its id(), and updating the test object's
 
3824
    __dict__ with the scenario_param_dict.
 
3825
 
 
3826
    >>> import bzrlib.tests.test_sampler
 
3827
    >>> r = multiply_tests(
 
3828
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
 
3829
    ...     [('one', dict(param=1)),
 
3830
    ...      ('two', dict(param=2))],
 
3831
    ...     TestSuite())
 
3832
    >>> tests = list(iter_suite_tests(r))
 
3833
    >>> len(tests)
 
3834
    2
 
3835
    >>> tests[0].id()
 
3836
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
 
3837
    >>> tests[0].param
 
3838
    1
 
3839
    >>> tests[1].param
 
3840
    2
 
3841
    """
 
3842
    for test in iter_suite_tests(tests):
 
3843
        apply_scenarios(test, scenarios, result)
 
3844
    return result
 
3845
 
 
3846
 
 
3847
def apply_scenarios(test, scenarios, result):
 
3848
    """Apply the scenarios in scenarios to test and add to result.
 
3849
 
 
3850
    :param test: The test to apply scenarios to.
 
3851
    :param scenarios: An iterable of scenarios to apply to test.
 
3852
    :return: result
 
3853
    :seealso: apply_scenario
 
3854
    """
 
3855
    for scenario in scenarios:
 
3856
        result.addTest(apply_scenario(test, scenario))
 
3857
    return result
 
3858
 
 
3859
 
 
3860
def apply_scenario(test, scenario):
 
3861
    """Copy test and apply scenario to it.
 
3862
 
 
3863
    :param test: A test to adapt.
 
3864
    :param scenario: A tuple describing the scenarion.
 
3865
        The first element of the tuple is the new test id.
 
3866
        The second element is a dict containing attributes to set on the
 
3867
        test.
 
3868
    :return: The adapted test.
 
3869
    """
 
3870
    new_id = "%s(%s)" % (test.id(), scenario[0])
 
3871
    new_test = clone_test(test, new_id)
 
3872
    for name, value in scenario[1].items():
 
3873
        setattr(new_test, name, value)
 
3874
    return new_test
 
3875
 
 
3876
 
 
3877
def clone_test(test, new_id):
 
3878
    """Clone a test giving it a new id.
 
3879
 
 
3880
    :param test: The test to clone.
 
3881
    :param new_id: The id to assign to it.
 
3882
    :return: The new test.
 
3883
    """
 
3884
    new_test = copy(test)
 
3885
    new_test.id = lambda: new_id
 
3886
    return new_test
 
3887
 
 
3888
 
 
3889
def _rmtree_temp_dir(dirname):
 
3890
    # If LANG=C we probably have created some bogus paths
 
3891
    # which rmtree(unicode) will fail to delete
 
3892
    # so make sure we are using rmtree(str) to delete everything
 
3893
    # except on win32, where rmtree(str) will fail
 
3894
    # since it doesn't have the property of byte-stream paths
 
3895
    # (they are either ascii or mbcs)
 
3896
    if sys.platform == 'win32':
 
3897
        # make sure we are using the unicode win32 api
 
3898
        dirname = unicode(dirname)
 
3899
    else:
 
3900
        dirname = dirname.encode(sys.getfilesystemencoding())
 
3901
    try:
 
3902
        osutils.rmtree(dirname)
 
3903
    except OSError, e:
 
3904
        # We don't want to fail here because some useful display will be lost
 
3905
        # otherwise. Polluting the tmp dir is bad, but not giving all the
 
3906
        # possible info to the test runner is even worse.
 
3907
        sys.stderr.write('Unable to remove testing dir %s\n%s'
 
3908
                         % (os.path.basename(dirname), e))
 
3909
 
 
3910
 
 
3911
class Feature(object):
 
3912
    """An operating system Feature."""
 
3913
 
 
3914
    def __init__(self):
 
3915
        self._available = None
 
3916
 
 
3917
    def available(self):
 
3918
        """Is the feature available?
 
3919
 
 
3920
        :return: True if the feature is available.
 
3921
        """
 
3922
        if self._available is None:
 
3923
            self._available = self._probe()
 
3924
        return self._available
 
3925
 
 
3926
    def _probe(self):
 
3927
        """Implement this method in concrete features.
 
3928
 
 
3929
        :return: True if the feature is available.
 
3930
        """
 
3931
        raise NotImplementedError
 
3932
 
 
3933
    def __str__(self):
 
3934
        if getattr(self, 'feature_name', None):
 
3935
            return self.feature_name()
 
3936
        return self.__class__.__name__
 
3937
 
 
3938
 
 
3939
class _SymlinkFeature(Feature):
 
3940
 
 
3941
    def _probe(self):
 
3942
        return osutils.has_symlinks()
 
3943
 
 
3944
    def feature_name(self):
 
3945
        return 'symlinks'
 
3946
 
 
3947
SymlinkFeature = _SymlinkFeature()
 
3948
 
 
3949
 
 
3950
class _HardlinkFeature(Feature):
 
3951
 
 
3952
    def _probe(self):
 
3953
        return osutils.has_hardlinks()
 
3954
 
 
3955
    def feature_name(self):
 
3956
        return 'hardlinks'
 
3957
 
 
3958
HardlinkFeature = _HardlinkFeature()
 
3959
 
 
3960
 
 
3961
class _OsFifoFeature(Feature):
 
3962
 
 
3963
    def _probe(self):
 
3964
        return getattr(os, 'mkfifo', None)
 
3965
 
 
3966
    def feature_name(self):
 
3967
        return 'filesystem fifos'
 
3968
 
 
3969
OsFifoFeature = _OsFifoFeature()
 
3970
 
 
3971
 
 
3972
class _UnicodeFilenameFeature(Feature):
 
3973
    """Does the filesystem support Unicode filenames?"""
 
3974
 
 
3975
    def _probe(self):
 
3976
        try:
 
3977
            # Check for character combinations unlikely to be covered by any
 
3978
            # single non-unicode encoding. We use the characters
 
3979
            # - greek small letter alpha (U+03B1) and
 
3980
            # - braille pattern dots-123456 (U+283F).
 
3981
            os.stat(u'\u03b1\u283f')
 
3982
        except UnicodeEncodeError:
 
3983
            return False
 
3984
        except (IOError, OSError):
 
3985
            # The filesystem allows the Unicode filename but the file doesn't
 
3986
            # exist.
 
3987
            return True
 
3988
        else:
 
3989
            # The filesystem allows the Unicode filename and the file exists,
 
3990
            # for some reason.
 
3991
            return True
 
3992
 
 
3993
UnicodeFilenameFeature = _UnicodeFilenameFeature()
 
3994
 
 
3995
 
 
3996
def probe_unicode_in_user_encoding():
 
3997
    """Try to encode several unicode strings to use in unicode-aware tests.
 
3998
    Return first successfull match.
 
3999
 
 
4000
    :return:  (unicode value, encoded plain string value) or (None, None)
 
4001
    """
 
4002
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
4003
    for uni_val in possible_vals:
 
4004
        try:
 
4005
            str_val = uni_val.encode(osutils.get_user_encoding())
 
4006
        except UnicodeEncodeError:
 
4007
            # Try a different character
 
4008
            pass
 
4009
        else:
 
4010
            return uni_val, str_val
 
4011
    return None, None
 
4012
 
 
4013
 
 
4014
def probe_bad_non_ascii(encoding):
 
4015
    """Try to find [bad] character with code [128..255]
 
4016
    that cannot be decoded to unicode in some encoding.
 
4017
    Return None if all non-ascii characters is valid
 
4018
    for given encoding.
 
4019
    """
 
4020
    for i in xrange(128, 256):
 
4021
        char = chr(i)
 
4022
        try:
 
4023
            char.decode(encoding)
 
4024
        except UnicodeDecodeError:
 
4025
            return char
 
4026
    return None
 
4027
 
 
4028
 
 
4029
class _HTTPSServerFeature(Feature):
 
4030
    """Some tests want an https Server, check if one is available.
 
4031
 
 
4032
    Right now, the only way this is available is under python2.6 which provides
 
4033
    an ssl module.
 
4034
    """
 
4035
 
 
4036
    def _probe(self):
 
4037
        try:
 
4038
            import ssl
 
4039
            return True
 
4040
        except ImportError:
 
4041
            return False
 
4042
 
 
4043
    def feature_name(self):
 
4044
        return 'HTTPSServer'
 
4045
 
 
4046
 
 
4047
HTTPSServerFeature = _HTTPSServerFeature()
 
4048
 
 
4049
 
 
4050
class _UnicodeFilename(Feature):
 
4051
    """Does the filesystem support Unicode filenames?"""
 
4052
 
 
4053
    def _probe(self):
 
4054
        try:
 
4055
            os.stat(u'\u03b1')
 
4056
        except UnicodeEncodeError:
 
4057
            return False
 
4058
        except (IOError, OSError):
 
4059
            # The filesystem allows the Unicode filename but the file doesn't
 
4060
            # exist.
 
4061
            return True
 
4062
        else:
 
4063
            # The filesystem allows the Unicode filename and the file exists,
 
4064
            # for some reason.
 
4065
            return True
 
4066
 
 
4067
UnicodeFilename = _UnicodeFilename()
 
4068
 
 
4069
 
 
4070
class _UTF8Filesystem(Feature):
 
4071
    """Is the filesystem UTF-8?"""
 
4072
 
 
4073
    def _probe(self):
 
4074
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
 
4075
            return True
 
4076
        return False
 
4077
 
 
4078
UTF8Filesystem = _UTF8Filesystem()
 
4079
 
 
4080
 
 
4081
class _CaseInsCasePresFilenameFeature(Feature):
 
4082
    """Is the file-system case insensitive, but case-preserving?"""
 
4083
 
 
4084
    def _probe(self):
 
4085
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
 
4086
        try:
 
4087
            # first check truly case-preserving for created files, then check
 
4088
            # case insensitive when opening existing files.
 
4089
            name = osutils.normpath(name)
 
4090
            base, rel = osutils.split(name)
 
4091
            found_rel = osutils.canonical_relpath(base, name)
 
4092
            return (found_rel == rel
 
4093
                    and os.path.isfile(name.upper())
 
4094
                    and os.path.isfile(name.lower()))
 
4095
        finally:
 
4096
            os.close(fileno)
 
4097
            os.remove(name)
 
4098
 
 
4099
    def feature_name(self):
 
4100
        return "case-insensitive case-preserving filesystem"
 
4101
 
 
4102
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
 
4103
 
 
4104
 
 
4105
class _CaseInsensitiveFilesystemFeature(Feature):
 
4106
    """Check if underlying filesystem is case-insensitive but *not* case
 
4107
    preserving.
 
4108
    """
 
4109
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
 
4110
    # more likely to be case preserving, so this case is rare.
 
4111
 
 
4112
    def _probe(self):
 
4113
        if CaseInsCasePresFilenameFeature.available():
 
4114
            return False
 
4115
 
 
4116
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
4117
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
4118
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
4119
        else:
 
4120
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
4121
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
4122
            dir=root)
 
4123
        name_a = osutils.pathjoin(tdir, 'a')
 
4124
        name_A = osutils.pathjoin(tdir, 'A')
 
4125
        os.mkdir(name_a)
 
4126
        result = osutils.isdir(name_A)
 
4127
        _rmtree_temp_dir(tdir)
 
4128
        return result
 
4129
 
 
4130
    def feature_name(self):
 
4131
        return 'case-insensitive filesystem'
 
4132
 
 
4133
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
 
4134
 
 
4135
 
 
4136
class _SubUnitFeature(Feature):
 
4137
    """Check if subunit is available."""
 
4138
 
 
4139
    def _probe(self):
 
4140
        try:
 
4141
            import subunit
 
4142
            return True
 
4143
        except ImportError:
 
4144
            return False
 
4145
 
 
4146
    def feature_name(self):
 
4147
        return 'subunit'
 
4148
 
 
4149
SubUnitFeature = _SubUnitFeature()
 
4150
# Only define SubUnitBzrRunner if subunit is available.
 
4151
try:
 
4152
    from subunit import TestProtocolClient
 
4153
    try:
 
4154
        from subunit.test_results import AutoTimingTestResultDecorator
 
4155
    except ImportError:
 
4156
        AutoTimingTestResultDecorator = lambda x:x
 
4157
    class SubUnitBzrRunner(TextTestRunner):
 
4158
        def run(self, test):
 
4159
            result = AutoTimingTestResultDecorator(
 
4160
                TestProtocolClient(self.stream))
 
4161
            test.run(result)
 
4162
            return result
 
4163
except ImportError:
 
4164
    pass