/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Martin Pool
  • Date: 2009-09-18 01:08:24 UTC
  • mto: This revision was merged to the branch mainline in revision 4712.
  • Revision ID: mbp@sourcefrog.net-20090918010824-o96afvwbbxhw4d5p
Unhandled smart-server exceptions are reported using generic report_exception

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import atexit
 
30
import codecs
 
31
from copy import copy
 
32
from cStringIO import StringIO
 
33
import difflib
 
34
import doctest
 
35
import errno
 
36
import logging
 
37
import math
 
38
import os
 
39
from pprint import pformat
 
40
import random
 
41
import re
 
42
import shlex
 
43
import stat
 
44
from subprocess import Popen, PIPE, STDOUT
 
45
import sys
 
46
import tempfile
 
47
import threading
 
48
import time
 
49
import unittest
 
50
import warnings
 
51
 
 
52
 
 
53
from bzrlib import (
 
54
    branchbuilder,
 
55
    bzrdir,
 
56
    debug,
 
57
    errors,
 
58
    hooks,
 
59
    lock as _mod_lock,
 
60
    memorytree,
 
61
    osutils,
 
62
    progress,
 
63
    ui,
 
64
    urlutils,
 
65
    registry,
 
66
    workingtree,
 
67
    )
 
68
import bzrlib.branch
 
69
import bzrlib.commands
 
70
import bzrlib.timestamp
 
71
import bzrlib.export
 
72
import bzrlib.inventory
 
73
import bzrlib.iterablefile
 
74
import bzrlib.lockdir
 
75
try:
 
76
    import bzrlib.lsprof
 
77
except ImportError:
 
78
    # lsprof not available
 
79
    pass
 
80
from bzrlib.merge import merge_inner
 
81
import bzrlib.merge3
 
82
import bzrlib.plugin
 
83
from bzrlib.smart import client, request, server
 
84
import bzrlib.store
 
85
from bzrlib import symbol_versioning
 
86
from bzrlib.symbol_versioning import (
 
87
    DEPRECATED_PARAMETER,
 
88
    deprecated_function,
 
89
    deprecated_method,
 
90
    deprecated_passed,
 
91
    )
 
92
import bzrlib.trace
 
93
from bzrlib.transport import get_transport
 
94
import bzrlib.transport
 
95
from bzrlib.transport.local import LocalURLServer
 
96
from bzrlib.transport.memory import MemoryServer
 
97
from bzrlib.transport.readonly import ReadonlyServer
 
98
from bzrlib.trace import mutter, note
 
99
from bzrlib.tests import TestUtil
 
100
from bzrlib.tests.http_server import HttpServer
 
101
from bzrlib.tests.TestUtil import (
 
102
                          TestSuite,
 
103
                          TestLoader,
 
104
                          )
 
105
from bzrlib.tests.treeshape import build_tree_contents
 
106
from bzrlib.ui import NullProgressView
 
107
from bzrlib.ui.text import TextUIFactory
 
108
import bzrlib.version_info_formats.format_custom
 
109
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
110
 
 
111
# Mark this python module as being part of the implementation
 
112
# of unittest: this gives us better tracebacks where the last
 
113
# shown frame is the test code, not our assertXYZ.
 
114
__unittest = 1
 
115
 
 
116
default_transport = LocalURLServer
 
117
 
 
118
# Subunit result codes, defined here to prevent a hard dependency on subunit.
 
119
SUBUNIT_SEEK_SET = 0
 
120
SUBUNIT_SEEK_CUR = 1
 
121
 
 
122
 
 
123
class ExtendedTestResult(unittest._TextTestResult):
 
124
    """Accepts, reports and accumulates the results of running tests.
 
125
 
 
126
    Compared to the unittest version this class adds support for
 
127
    profiling, benchmarking, stopping as soon as a test fails,  and
 
128
    skipping tests.  There are further-specialized subclasses for
 
129
    different types of display.
 
130
 
 
131
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
132
    addFailure or addError classes.  These in turn may redirect to a more
 
133
    specific case for the special test results supported by our extended
 
134
    tests.
 
135
 
 
136
    Note that just one of these objects is fed the results from many tests.
 
137
    """
 
138
 
 
139
    stop_early = False
 
140
 
 
141
    def __init__(self, stream, descriptions, verbosity,
 
142
                 bench_history=None,
 
143
                 strict=False,
 
144
                 ):
 
145
        """Construct new TestResult.
 
146
 
 
147
        :param bench_history: Optionally, a writable file object to accumulate
 
148
            benchmark results.
 
149
        """
 
150
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
151
        if bench_history is not None:
 
152
            from bzrlib.version import _get_bzr_source_tree
 
153
            src_tree = _get_bzr_source_tree()
 
154
            if src_tree:
 
155
                try:
 
156
                    revision_id = src_tree.get_parent_ids()[0]
 
157
                except IndexError:
 
158
                    # XXX: if this is a brand new tree, do the same as if there
 
159
                    # is no branch.
 
160
                    revision_id = ''
 
161
            else:
 
162
                # XXX: If there's no branch, what should we do?
 
163
                revision_id = ''
 
164
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
165
        self._bench_history = bench_history
 
166
        self.ui = ui.ui_factory
 
167
        self.num_tests = 0
 
168
        self.error_count = 0
 
169
        self.failure_count = 0
 
170
        self.known_failure_count = 0
 
171
        self.skip_count = 0
 
172
        self.not_applicable_count = 0
 
173
        self.unsupported = {}
 
174
        self.count = 0
 
175
        self._overall_start_time = time.time()
 
176
        self._strict = strict
 
177
 
 
178
    def stopTestRun(self):
 
179
        run = self.testsRun
 
180
        actionTaken = "Ran"
 
181
        stopTime = time.time()
 
182
        timeTaken = stopTime - self.startTime
 
183
        self.printErrors()
 
184
        self.stream.writeln(self.separator2)
 
185
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
186
                            run, run != 1 and "s" or "", timeTaken))
 
187
        self.stream.writeln()
 
188
        if not self.wasSuccessful():
 
189
            self.stream.write("FAILED (")
 
190
            failed, errored = map(len, (self.failures, self.errors))
 
191
            if failed:
 
192
                self.stream.write("failures=%d" % failed)
 
193
            if errored:
 
194
                if failed: self.stream.write(", ")
 
195
                self.stream.write("errors=%d" % errored)
 
196
            if self.known_failure_count:
 
197
                if failed or errored: self.stream.write(", ")
 
198
                self.stream.write("known_failure_count=%d" %
 
199
                    self.known_failure_count)
 
200
            self.stream.writeln(")")
 
201
        else:
 
202
            if self.known_failure_count:
 
203
                self.stream.writeln("OK (known_failures=%d)" %
 
204
                    self.known_failure_count)
 
205
            else:
 
206
                self.stream.writeln("OK")
 
207
        if self.skip_count > 0:
 
208
            skipped = self.skip_count
 
209
            self.stream.writeln('%d test%s skipped' %
 
210
                                (skipped, skipped != 1 and "s" or ""))
 
211
        if self.unsupported:
 
212
            for feature, count in sorted(self.unsupported.items()):
 
213
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
214
                    (feature, count))
 
215
        if self._strict:
 
216
            ok = self.wasStrictlySuccessful()
 
217
        else:
 
218
            ok = self.wasSuccessful()
 
219
        if TestCase._first_thread_leaker_id:
 
220
            self.stream.write(
 
221
                '%s is leaking threads among %d leaking tests.\n' % (
 
222
                TestCase._first_thread_leaker_id,
 
223
                TestCase._leaking_threads_tests))
 
224
 
 
225
    def _extractBenchmarkTime(self, testCase):
 
226
        """Add a benchmark time for the current test case."""
 
227
        return getattr(testCase, "_benchtime", None)
 
228
 
 
229
    def _elapsedTestTimeString(self):
 
230
        """Return a time string for the overall time the current test has taken."""
 
231
        return self._formatTime(time.time() - self._start_time)
 
232
 
 
233
    def _testTimeString(self, testCase):
 
234
        benchmark_time = self._extractBenchmarkTime(testCase)
 
235
        if benchmark_time is not None:
 
236
            return self._formatTime(benchmark_time) + "*"
 
237
        else:
 
238
            return self._elapsedTestTimeString()
 
239
 
 
240
    def _formatTime(self, seconds):
 
241
        """Format seconds as milliseconds with leading spaces."""
 
242
        # some benchmarks can take thousands of seconds to run, so we need 8
 
243
        # places
 
244
        return "%8dms" % (1000 * seconds)
 
245
 
 
246
    def _shortened_test_description(self, test):
 
247
        what = test.id()
 
248
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
249
        return what
 
250
 
 
251
    def startTest(self, test):
 
252
        unittest.TestResult.startTest(self, test)
 
253
        if self.count == 0:
 
254
            self.startTests()
 
255
        self.report_test_start(test)
 
256
        test.number = self.count
 
257
        self._recordTestStartTime()
 
258
 
 
259
    def startTests(self):
 
260
        import platform
 
261
        if getattr(sys, 'frozen', None) is None:
 
262
            bzr_path = osutils.realpath(sys.argv[0])
 
263
        else:
 
264
            bzr_path = sys.executable
 
265
        self.stream.write(
 
266
            'testing: %s\n' % (bzr_path,))
 
267
        self.stream.write(
 
268
            '   %s\n' % (
 
269
                    bzrlib.__path__[0],))
 
270
        self.stream.write(
 
271
            '   bzr-%s python-%s %s\n' % (
 
272
                    bzrlib.version_string,
 
273
                    bzrlib._format_version_tuple(sys.version_info),
 
274
                    platform.platform(aliased=1),
 
275
                    ))
 
276
        self.stream.write('\n')
 
277
 
 
278
    def _recordTestStartTime(self):
 
279
        """Record that a test has started."""
 
280
        self._start_time = time.time()
 
281
 
 
282
    def _cleanupLogFile(self, test):
 
283
        # We can only do this if we have one of our TestCases, not if
 
284
        # we have a doctest.
 
285
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
286
        if setKeepLogfile is not None:
 
287
            setKeepLogfile()
 
288
 
 
289
    def addError(self, test, err):
 
290
        """Tell result that test finished with an error.
 
291
 
 
292
        Called from the TestCase run() method when the test
 
293
        fails with an unexpected error.
 
294
        """
 
295
        self._testConcluded(test)
 
296
        if isinstance(err[1], TestNotApplicable):
 
297
            return self._addNotApplicable(test, err)
 
298
        elif isinstance(err[1], UnavailableFeature):
 
299
            return self.addNotSupported(test, err[1].args[0])
 
300
        else:
 
301
            self._post_mortem()
 
302
            unittest.TestResult.addError(self, test, err)
 
303
            self.error_count += 1
 
304
            self.report_error(test, err)
 
305
            if self.stop_early:
 
306
                self.stop()
 
307
            self._cleanupLogFile(test)
 
308
 
 
309
    def addFailure(self, test, err):
 
310
        """Tell result that test failed.
 
311
 
 
312
        Called from the TestCase run() method when the test
 
313
        fails because e.g. an assert() method failed.
 
314
        """
 
315
        self._testConcluded(test)
 
316
        if isinstance(err[1], KnownFailure):
 
317
            return self._addKnownFailure(test, err)
 
318
        else:
 
319
            self._post_mortem()
 
320
            unittest.TestResult.addFailure(self, test, err)
 
321
            self.failure_count += 1
 
322
            self.report_failure(test, err)
 
323
            if self.stop_early:
 
324
                self.stop()
 
325
            self._cleanupLogFile(test)
 
326
 
 
327
    def addSuccess(self, test):
 
328
        """Tell result that test completed successfully.
 
329
 
 
330
        Called from the TestCase run()
 
331
        """
 
332
        self._testConcluded(test)
 
333
        if self._bench_history is not None:
 
334
            benchmark_time = self._extractBenchmarkTime(test)
 
335
            if benchmark_time is not None:
 
336
                self._bench_history.write("%s %s\n" % (
 
337
                    self._formatTime(benchmark_time),
 
338
                    test.id()))
 
339
        self.report_success(test)
 
340
        self._cleanupLogFile(test)
 
341
        unittest.TestResult.addSuccess(self, test)
 
342
        test._log_contents = ''
 
343
 
 
344
    def _testConcluded(self, test):
 
345
        """Common code when a test has finished.
 
346
 
 
347
        Called regardless of whether it succeded, failed, etc.
 
348
        """
 
349
        pass
 
350
 
 
351
    def _addKnownFailure(self, test, err):
 
352
        self.known_failure_count += 1
 
353
        self.report_known_failure(test, err)
 
354
 
 
355
    def addNotSupported(self, test, feature):
 
356
        """The test will not be run because of a missing feature.
 
357
        """
 
358
        # this can be called in two different ways: it may be that the
 
359
        # test started running, and then raised (through addError)
 
360
        # UnavailableFeature.  Alternatively this method can be called
 
361
        # while probing for features before running the tests; in that
 
362
        # case we will see startTest and stopTest, but the test will never
 
363
        # actually run.
 
364
        self.unsupported.setdefault(str(feature), 0)
 
365
        self.unsupported[str(feature)] += 1
 
366
        self.report_unsupported(test, feature)
 
367
 
 
368
    def addSkip(self, test, reason):
 
369
        """A test has not run for 'reason'."""
 
370
        self.skip_count += 1
 
371
        self.report_skip(test, reason)
 
372
 
 
373
    def _addNotApplicable(self, test, skip_excinfo):
 
374
        if isinstance(skip_excinfo[1], TestNotApplicable):
 
375
            self.not_applicable_count += 1
 
376
            self.report_not_applicable(test, skip_excinfo)
 
377
        try:
 
378
            test.tearDown()
 
379
        except KeyboardInterrupt:
 
380
            raise
 
381
        except:
 
382
            self.addError(test, test.exc_info())
 
383
        else:
 
384
            # seems best to treat this as success from point-of-view of unittest
 
385
            # -- it actually does nothing so it barely matters :)
 
386
            unittest.TestResult.addSuccess(self, test)
 
387
            test._log_contents = ''
 
388
 
 
389
    def printErrorList(self, flavour, errors):
 
390
        for test, err in errors:
 
391
            self.stream.writeln(self.separator1)
 
392
            self.stream.write("%s: " % flavour)
 
393
            self.stream.writeln(self.getDescription(test))
 
394
            if getattr(test, '_get_log', None) is not None:
 
395
                log_contents = test._get_log()
 
396
                if log_contents:
 
397
                    self.stream.write('\n')
 
398
                    self.stream.write(
 
399
                            ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
 
400
                    self.stream.write('\n')
 
401
                    self.stream.write(log_contents)
 
402
                    self.stream.write('\n')
 
403
                    self.stream.write(
 
404
                            ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
 
405
                    self.stream.write('\n')
 
406
            self.stream.writeln(self.separator2)
 
407
            self.stream.writeln("%s" % err)
 
408
 
 
409
    def _post_mortem(self):
 
410
        """Start a PDB post mortem session."""
 
411
        if os.environ.get('BZR_TEST_PDB', None):
 
412
            import pdb;pdb.post_mortem()
 
413
 
 
414
    def progress(self, offset, whence):
 
415
        """The test is adjusting the count of tests to run."""
 
416
        if whence == SUBUNIT_SEEK_SET:
 
417
            self.num_tests = offset
 
418
        elif whence == SUBUNIT_SEEK_CUR:
 
419
            self.num_tests += offset
 
420
        else:
 
421
            raise errors.BzrError("Unknown whence %r" % whence)
 
422
 
 
423
    def report_cleaning_up(self):
 
424
        pass
 
425
 
 
426
    def startTestRun(self):
 
427
        self.startTime = time.time()
 
428
 
 
429
    def report_success(self, test):
 
430
        pass
 
431
 
 
432
    def wasStrictlySuccessful(self):
 
433
        if self.unsupported or self.known_failure_count:
 
434
            return False
 
435
        return self.wasSuccessful()
 
436
 
 
437
 
 
438
class TextTestResult(ExtendedTestResult):
 
439
    """Displays progress and results of tests in text form"""
 
440
 
 
441
    def __init__(self, stream, descriptions, verbosity,
 
442
                 bench_history=None,
 
443
                 pb=None,
 
444
                 strict=None,
 
445
                 ):
 
446
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
447
            bench_history, strict)
 
448
        # We no longer pass them around, but just rely on the UIFactory stack
 
449
        # for state
 
450
        if pb is not None:
 
451
            warnings.warn("Passing pb to TextTestResult is deprecated")
 
452
        self.pb = self.ui.nested_progress_bar()
 
453
        self.pb.show_pct = False
 
454
        self.pb.show_spinner = False
 
455
        self.pb.show_eta = False,
 
456
        self.pb.show_count = False
 
457
        self.pb.show_bar = False
 
458
        self.pb.update_latency = 0
 
459
        self.pb.show_transport_activity = False
 
460
 
 
461
    def stopTestRun(self):
 
462
        # called when the tests that are going to run have run
 
463
        self.pb.clear()
 
464
        self.pb.finished()
 
465
        super(TextTestResult, self).stopTestRun()
 
466
 
 
467
    def startTestRun(self):
 
468
        super(TextTestResult, self).startTestRun()
 
469
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
 
470
 
 
471
    def printErrors(self):
 
472
        # clear the pb to make room for the error listing
 
473
        self.pb.clear()
 
474
        super(TextTestResult, self).printErrors()
 
475
 
 
476
    def _progress_prefix_text(self):
 
477
        # the longer this text, the less space we have to show the test
 
478
        # name...
 
479
        a = '[%d' % self.count              # total that have been run
 
480
        # tests skipped as known not to be relevant are not important enough
 
481
        # to show here
 
482
        ## if self.skip_count:
 
483
        ##     a += ', %d skip' % self.skip_count
 
484
        ## if self.known_failure_count:
 
485
        ##     a += '+%dX' % self.known_failure_count
 
486
        if self.num_tests:
 
487
            a +='/%d' % self.num_tests
 
488
        a += ' in '
 
489
        runtime = time.time() - self._overall_start_time
 
490
        if runtime >= 60:
 
491
            a += '%dm%ds' % (runtime / 60, runtime % 60)
 
492
        else:
 
493
            a += '%ds' % runtime
 
494
        if self.error_count:
 
495
            a += ', %d err' % self.error_count
 
496
        if self.failure_count:
 
497
            a += ', %d fail' % self.failure_count
 
498
        if self.unsupported:
 
499
            a += ', %d missing' % len(self.unsupported)
 
500
        a += ']'
 
501
        return a
 
502
 
 
503
    def report_test_start(self, test):
 
504
        self.count += 1
 
505
        self.pb.update(
 
506
                self._progress_prefix_text()
 
507
                + ' '
 
508
                + self._shortened_test_description(test))
 
509
 
 
510
    def _test_description(self, test):
 
511
        return self._shortened_test_description(test)
 
512
 
 
513
    def report_error(self, test, err):
 
514
        self.pb.note('ERROR: %s\n    %s\n',
 
515
            self._test_description(test),
 
516
            err[1],
 
517
            )
 
518
 
 
519
    def report_failure(self, test, err):
 
520
        self.pb.note('FAIL: %s\n    %s\n',
 
521
            self._test_description(test),
 
522
            err[1],
 
523
            )
 
524
 
 
525
    def report_known_failure(self, test, err):
 
526
        self.pb.note('XFAIL: %s\n%s\n',
 
527
            self._test_description(test), err[1])
 
528
 
 
529
    def report_skip(self, test, reason):
 
530
        pass
 
531
 
 
532
    def report_not_applicable(self, test, skip_excinfo):
 
533
        pass
 
534
 
 
535
    def report_unsupported(self, test, feature):
 
536
        """test cannot be run because feature is missing."""
 
537
 
 
538
    def report_cleaning_up(self):
 
539
        self.pb.update('Cleaning up')
 
540
 
 
541
 
 
542
class VerboseTestResult(ExtendedTestResult):
 
543
    """Produce long output, with one line per test run plus times"""
 
544
 
 
545
    def _ellipsize_to_right(self, a_string, final_width):
 
546
        """Truncate and pad a string, keeping the right hand side"""
 
547
        if len(a_string) > final_width:
 
548
            result = '...' + a_string[3-final_width:]
 
549
        else:
 
550
            result = a_string
 
551
        return result.ljust(final_width)
 
552
 
 
553
    def startTestRun(self):
 
554
        super(VerboseTestResult, self).startTestRun()
 
555
        self.stream.write('running %d tests...\n' % self.num_tests)
 
556
 
 
557
    def report_test_start(self, test):
 
558
        self.count += 1
 
559
        name = self._shortened_test_description(test)
 
560
        # width needs space for 6 char status, plus 1 for slash, plus an
 
561
        # 11-char time string, plus a trailing blank
 
562
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
563
        self.stream.write(self._ellipsize_to_right(name,
 
564
                          osutils.terminal_width()-18))
 
565
        self.stream.flush()
 
566
 
 
567
    def _error_summary(self, err):
 
568
        indent = ' ' * 4
 
569
        return '%s%s' % (indent, err[1])
 
570
 
 
571
    def report_error(self, test, err):
 
572
        self.stream.writeln('ERROR %s\n%s'
 
573
                % (self._testTimeString(test),
 
574
                   self._error_summary(err)))
 
575
 
 
576
    def report_failure(self, test, err):
 
577
        self.stream.writeln(' FAIL %s\n%s'
 
578
                % (self._testTimeString(test),
 
579
                   self._error_summary(err)))
 
580
 
 
581
    def report_known_failure(self, test, err):
 
582
        self.stream.writeln('XFAIL %s\n%s'
 
583
                % (self._testTimeString(test),
 
584
                   self._error_summary(err)))
 
585
 
 
586
    def report_success(self, test):
 
587
        self.stream.writeln('   OK %s' % self._testTimeString(test))
 
588
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
589
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
590
            stats.pprint(file=self.stream)
 
591
        # flush the stream so that we get smooth output. This verbose mode is
 
592
        # used to show the output in PQM.
 
593
        self.stream.flush()
 
594
 
 
595
    def report_skip(self, test, reason):
 
596
        self.stream.writeln(' SKIP %s\n%s'
 
597
                % (self._testTimeString(test), reason))
 
598
 
 
599
    def report_not_applicable(self, test, skip_excinfo):
 
600
        self.stream.writeln('  N/A %s\n%s'
 
601
                % (self._testTimeString(test),
 
602
                   self._error_summary(skip_excinfo)))
 
603
 
 
604
    def report_unsupported(self, test, feature):
 
605
        """test cannot be run because feature is missing."""
 
606
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
607
                %(self._testTimeString(test), feature))
 
608
 
 
609
 
 
610
class TextTestRunner(object):
 
611
    stop_on_failure = False
 
612
 
 
613
    def __init__(self,
 
614
                 stream=sys.stderr,
 
615
                 descriptions=0,
 
616
                 verbosity=1,
 
617
                 bench_history=None,
 
618
                 strict=False,
 
619
                 result_decorators=None,
 
620
                 ):
 
621
        """Create a TextTestRunner.
 
622
 
 
623
        :param result_decorators: An optional list of decorators to apply
 
624
            to the result object being used by the runner. Decorators are
 
625
            applied left to right - the first element in the list is the 
 
626
            innermost decorator.
 
627
        """
 
628
        self.stream = unittest._WritelnDecorator(stream)
 
629
        self.descriptions = descriptions
 
630
        self.verbosity = verbosity
 
631
        self._bench_history = bench_history
 
632
        self._strict = strict
 
633
        self._result_decorators = result_decorators or []
 
634
 
 
635
    def run(self, test):
 
636
        "Run the given test case or test suite."
 
637
        if self.verbosity == 1:
 
638
            result_class = TextTestResult
 
639
        elif self.verbosity >= 2:
 
640
            result_class = VerboseTestResult
 
641
        original_result = result_class(self.stream,
 
642
                              self.descriptions,
 
643
                              self.verbosity,
 
644
                              bench_history=self._bench_history,
 
645
                              strict=self._strict,
 
646
                              )
 
647
        # Signal to result objects that look at stop early policy to stop,
 
648
        original_result.stop_early = self.stop_on_failure
 
649
        result = original_result
 
650
        for decorator in self._result_decorators:
 
651
            result = decorator(result)
 
652
            result.stop_early = self.stop_on_failure
 
653
        try:
 
654
            import testtools
 
655
        except ImportError:
 
656
            pass
 
657
        else:
 
658
            if isinstance(test, testtools.ConcurrentTestSuite):
 
659
                # We need to catch bzr specific behaviors
 
660
                result = BZRTransformingResult(result)
 
661
        result.startTestRun()
 
662
        try:
 
663
            test.run(result)
 
664
        finally:
 
665
            result.stopTestRun()
 
666
        # higher level code uses our extended protocol to determine
 
667
        # what exit code to give.
 
668
        return original_result
 
669
 
 
670
 
 
671
def iter_suite_tests(suite):
 
672
    """Return all tests in a suite, recursing through nested suites"""
 
673
    if isinstance(suite, unittest.TestCase):
 
674
        yield suite
 
675
    elif isinstance(suite, unittest.TestSuite):
 
676
        for item in suite:
 
677
            for r in iter_suite_tests(item):
 
678
                yield r
 
679
    else:
 
680
        raise Exception('unknown type %r for object %r'
 
681
                        % (type(suite), suite))
 
682
 
 
683
 
 
684
class TestSkipped(Exception):
 
685
    """Indicates that a test was intentionally skipped, rather than failing."""
 
686
 
 
687
 
 
688
class TestNotApplicable(TestSkipped):
 
689
    """A test is not applicable to the situation where it was run.
 
690
 
 
691
    This is only normally raised by parameterized tests, if they find that
 
692
    the instance they're constructed upon does not support one aspect
 
693
    of its interface.
 
694
    """
 
695
 
 
696
 
 
697
class KnownFailure(AssertionError):
 
698
    """Indicates that a test failed in a precisely expected manner.
 
699
 
 
700
    Such failures dont block the whole test suite from passing because they are
 
701
    indicators of partially completed code or of future work. We have an
 
702
    explicit error for them so that we can ensure that they are always visible:
 
703
    KnownFailures are always shown in the output of bzr selftest.
 
704
    """
 
705
 
 
706
 
 
707
class UnavailableFeature(Exception):
 
708
    """A feature required for this test was not available.
 
709
 
 
710
    The feature should be used to construct the exception.
 
711
    """
 
712
 
 
713
 
 
714
class CommandFailed(Exception):
 
715
    pass
 
716
 
 
717
 
 
718
class StringIOWrapper(object):
 
719
    """A wrapper around cStringIO which just adds an encoding attribute.
 
720
 
 
721
    Internally we can check sys.stdout to see what the output encoding
 
722
    should be. However, cStringIO has no encoding attribute that we can
 
723
    set. So we wrap it instead.
 
724
    """
 
725
    encoding='ascii'
 
726
    _cstring = None
 
727
 
 
728
    def __init__(self, s=None):
 
729
        if s is not None:
 
730
            self.__dict__['_cstring'] = StringIO(s)
 
731
        else:
 
732
            self.__dict__['_cstring'] = StringIO()
 
733
 
 
734
    def __getattr__(self, name, getattr=getattr):
 
735
        return getattr(self.__dict__['_cstring'], name)
 
736
 
 
737
    def __setattr__(self, name, val):
 
738
        if name == 'encoding':
 
739
            self.__dict__['encoding'] = val
 
740
        else:
 
741
            return setattr(self._cstring, name, val)
 
742
 
 
743
 
 
744
class TestUIFactory(TextUIFactory):
 
745
    """A UI Factory for testing.
 
746
 
 
747
    Hide the progress bar but emit note()s.
 
748
    Redirect stdin.
 
749
    Allows get_password to be tested without real tty attached.
 
750
 
 
751
    See also CannedInputUIFactory which lets you provide programmatic input in
 
752
    a structured way.
 
753
    """
 
754
    # TODO: Capture progress events at the model level and allow them to be
 
755
    # observed by tests that care.
 
756
    #
 
757
    # XXX: Should probably unify more with CannedInputUIFactory or a
 
758
    # particular configuration of TextUIFactory, or otherwise have a clearer
 
759
    # idea of how they're supposed to be different.
 
760
    # See https://bugs.edge.launchpad.net/bzr/+bug/408213
 
761
 
 
762
    def __init__(self, stdout=None, stderr=None, stdin=None):
 
763
        if stdin is not None:
 
764
            # We use a StringIOWrapper to be able to test various
 
765
            # encodings, but the user is still responsible to
 
766
            # encode the string and to set the encoding attribute
 
767
            # of StringIOWrapper.
 
768
            stdin = StringIOWrapper(stdin)
 
769
        super(TestUIFactory, self).__init__(stdin, stdout, stderr)
 
770
 
 
771
    def get_non_echoed_password(self):
 
772
        """Get password from stdin without trying to handle the echo mode"""
 
773
        password = self.stdin.readline()
 
774
        if not password:
 
775
            raise EOFError
 
776
        if password[-1] == '\n':
 
777
            password = password[:-1]
 
778
        return password
 
779
 
 
780
    def make_progress_view(self):
 
781
        return NullProgressView()
 
782
 
 
783
 
 
784
class TestCase(unittest.TestCase):
 
785
    """Base class for bzr unit tests.
 
786
 
 
787
    Tests that need access to disk resources should subclass
 
788
    TestCaseInTempDir not TestCase.
 
789
 
 
790
    Error and debug log messages are redirected from their usual
 
791
    location into a temporary file, the contents of which can be
 
792
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
793
    so that it can also capture file IO.  When the test completes this file
 
794
    is read into memory and removed from disk.
 
795
 
 
796
    There are also convenience functions to invoke bzr's command-line
 
797
    routine, and to build and check bzr trees.
 
798
 
 
799
    In addition to the usual method of overriding tearDown(), this class also
 
800
    allows subclasses to register functions into the _cleanups list, which is
 
801
    run in order as the object is torn down.  It's less likely this will be
 
802
    accidentally overlooked.
 
803
    """
 
804
 
 
805
    _active_threads = None
 
806
    _leaking_threads_tests = 0
 
807
    _first_thread_leaker_id = None
 
808
    _log_file_name = None
 
809
    _log_contents = ''
 
810
    _keep_log_file = False
 
811
    # record lsprof data when performing benchmark calls.
 
812
    _gather_lsprof_in_benchmarks = False
 
813
    attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
 
814
                     '_log_contents', '_log_file_name', '_benchtime',
 
815
                     '_TestCase__testMethodName', '_TestCase__testMethodDoc',)
 
816
 
 
817
    def __init__(self, methodName='testMethod'):
 
818
        super(TestCase, self).__init__(methodName)
 
819
        self._cleanups = []
 
820
        self._bzr_test_setUp_run = False
 
821
        self._bzr_test_tearDown_run = False
 
822
 
 
823
    def setUp(self):
 
824
        unittest.TestCase.setUp(self)
 
825
        self._bzr_test_setUp_run = True
 
826
        self._cleanEnvironment()
 
827
        self._silenceUI()
 
828
        self._startLogFile()
 
829
        self._benchcalls = []
 
830
        self._benchtime = None
 
831
        self._clear_hooks()
 
832
        self._track_locks()
 
833
        self._clear_debug_flags()
 
834
        TestCase._active_threads = threading.activeCount()
 
835
        self.addCleanup(self._check_leaked_threads)
 
836
 
 
837
    def debug(self):
 
838
        # debug a frame up.
 
839
        import pdb
 
840
        pdb.Pdb().set_trace(sys._getframe().f_back)
 
841
 
 
842
    def _check_leaked_threads(self):
 
843
        active = threading.activeCount()
 
844
        leaked_threads = active - TestCase._active_threads
 
845
        TestCase._active_threads = active
 
846
        if leaked_threads:
 
847
            TestCase._leaking_threads_tests += 1
 
848
            if TestCase._first_thread_leaker_id is None:
 
849
                TestCase._first_thread_leaker_id = self.id()
 
850
 
 
851
    def _clear_debug_flags(self):
 
852
        """Prevent externally set debug flags affecting tests.
 
853
 
 
854
        Tests that want to use debug flags can just set them in the
 
855
        debug_flags set during setup/teardown.
 
856
        """
 
857
        self._preserved_debug_flags = set(debug.debug_flags)
 
858
        if 'allow_debug' not in selftest_debug_flags:
 
859
            debug.debug_flags.clear()
 
860
        if 'disable_lock_checks' not in selftest_debug_flags:
 
861
            debug.debug_flags.add('strict_locks')
 
862
        self.addCleanup(self._restore_debug_flags)
 
863
 
 
864
    def _clear_hooks(self):
 
865
        # prevent hooks affecting tests
 
866
        self._preserved_hooks = {}
 
867
        for key, factory in hooks.known_hooks.items():
 
868
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
869
            current_hooks = hooks.known_hooks_key_to_object(key)
 
870
            self._preserved_hooks[parent] = (name, current_hooks)
 
871
        self.addCleanup(self._restoreHooks)
 
872
        for key, factory in hooks.known_hooks.items():
 
873
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
874
            setattr(parent, name, factory())
 
875
        # this hook should always be installed
 
876
        request._install_hook()
 
877
 
 
878
    def _silenceUI(self):
 
879
        """Turn off UI for duration of test"""
 
880
        # by default the UI is off; tests can turn it on if they want it.
 
881
        saved = ui.ui_factory
 
882
        def _restore():
 
883
            ui.ui_factory = saved
 
884
        ui.ui_factory = ui.SilentUIFactory()
 
885
        self.addCleanup(_restore)
 
886
 
 
887
    def _check_locks(self):
 
888
        """Check that all lock take/release actions have been paired."""
 
889
        # We always check for mismatched locks. If a mismatch is found, we
 
890
        # fail unless -Edisable_lock_checks is supplied to selftest, in which
 
891
        # case we just print a warning.
 
892
        # unhook:
 
893
        acquired_locks = [lock for action, lock in self._lock_actions
 
894
                          if action == 'acquired']
 
895
        released_locks = [lock for action, lock in self._lock_actions
 
896
                          if action == 'released']
 
897
        broken_locks = [lock for action, lock in self._lock_actions
 
898
                        if action == 'broken']
 
899
        # trivially, given the tests for lock acquistion and release, if we
 
900
        # have as many in each list, it should be ok. Some lock tests also
 
901
        # break some locks on purpose and should be taken into account by
 
902
        # considering that breaking a lock is just a dirty way of releasing it.
 
903
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
 
904
            message = ('Different number of acquired and '
 
905
                       'released or broken locks. (%s, %s + %s)' %
 
906
                       (acquired_locks, released_locks, broken_locks))
 
907
            if not self._lock_check_thorough:
 
908
                # Rather than fail, just warn
 
909
                print "Broken test %s: %s" % (self, message)
 
910
                return
 
911
            self.fail(message)
 
912
 
 
913
    def _track_locks(self):
 
914
        """Track lock activity during tests."""
 
915
        self._lock_actions = []
 
916
        if 'disable_lock_checks' in selftest_debug_flags:
 
917
            self._lock_check_thorough = False
 
918
        else:
 
919
            self._lock_check_thorough = True
 
920
            
 
921
        self.addCleanup(self._check_locks)
 
922
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
 
923
                                                self._lock_acquired, None)
 
924
        _mod_lock.Lock.hooks.install_named_hook('lock_released',
 
925
                                                self._lock_released, None)
 
926
        _mod_lock.Lock.hooks.install_named_hook('lock_broken',
 
927
                                                self._lock_broken, None)
 
928
 
 
929
    def _lock_acquired(self, result):
 
930
        self._lock_actions.append(('acquired', result))
 
931
 
 
932
    def _lock_released(self, result):
 
933
        self._lock_actions.append(('released', result))
 
934
 
 
935
    def _lock_broken(self, result):
 
936
        self._lock_actions.append(('broken', result))
 
937
 
 
938
    def start_server(self, transport_server, backing_server=None):
 
939
        """Start transport_server for this test.
 
940
 
 
941
        This starts the server, registers a cleanup for it and permits the
 
942
        server's urls to be used.
 
943
        """
 
944
        if backing_server is None:
 
945
            transport_server.setUp()
 
946
        else:
 
947
            transport_server.setUp(backing_server)
 
948
        self.addCleanup(transport_server.tearDown)
 
949
 
 
950
    def _ndiff_strings(self, a, b):
 
951
        """Return ndiff between two strings containing lines.
 
952
 
 
953
        A trailing newline is added if missing to make the strings
 
954
        print properly."""
 
955
        if b and b[-1] != '\n':
 
956
            b += '\n'
 
957
        if a and a[-1] != '\n':
 
958
            a += '\n'
 
959
        difflines = difflib.ndiff(a.splitlines(True),
 
960
                                  b.splitlines(True),
 
961
                                  linejunk=lambda x: False,
 
962
                                  charjunk=lambda x: False)
 
963
        return ''.join(difflines)
 
964
 
 
965
    def assertEqual(self, a, b, message=''):
 
966
        try:
 
967
            if a == b:
 
968
                return
 
969
        except UnicodeError, e:
 
970
            # If we can't compare without getting a UnicodeError, then
 
971
            # obviously they are different
 
972
            mutter('UnicodeError: %s', e)
 
973
        if message:
 
974
            message += '\n'
 
975
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
976
            % (message,
 
977
               pformat(a), pformat(b)))
 
978
 
 
979
    assertEquals = assertEqual
 
980
 
 
981
    def assertEqualDiff(self, a, b, message=None):
 
982
        """Assert two texts are equal, if not raise an exception.
 
983
 
 
984
        This is intended for use with multi-line strings where it can
 
985
        be hard to find the differences by eye.
 
986
        """
 
987
        # TODO: perhaps override assertEquals to call this for strings?
 
988
        if a == b:
 
989
            return
 
990
        if message is None:
 
991
            message = "texts not equal:\n"
 
992
        if a + '\n' == b:
 
993
            message = 'first string is missing a final newline.\n'
 
994
        if a == b + '\n':
 
995
            message = 'second string is missing a final newline.\n'
 
996
        raise AssertionError(message +
 
997
                             self._ndiff_strings(a, b))
 
998
 
 
999
    def assertEqualMode(self, mode, mode_test):
 
1000
        self.assertEqual(mode, mode_test,
 
1001
                         'mode mismatch %o != %o' % (mode, mode_test))
 
1002
 
 
1003
    def assertEqualStat(self, expected, actual):
 
1004
        """assert that expected and actual are the same stat result.
 
1005
 
 
1006
        :param expected: A stat result.
 
1007
        :param actual: A stat result.
 
1008
        :raises AssertionError: If the expected and actual stat values differ
 
1009
            other than by atime.
 
1010
        """
 
1011
        self.assertEqual(expected.st_size, actual.st_size)
 
1012
        self.assertEqual(expected.st_mtime, actual.st_mtime)
 
1013
        self.assertEqual(expected.st_ctime, actual.st_ctime)
 
1014
        self.assertEqual(expected.st_dev, actual.st_dev)
 
1015
        self.assertEqual(expected.st_ino, actual.st_ino)
 
1016
        self.assertEqual(expected.st_mode, actual.st_mode)
 
1017
 
 
1018
    def assertLength(self, length, obj_with_len):
 
1019
        """Assert that obj_with_len is of length length."""
 
1020
        if len(obj_with_len) != length:
 
1021
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
 
1022
                length, len(obj_with_len), obj_with_len))
 
1023
 
 
1024
    def assertPositive(self, val):
 
1025
        """Assert that val is greater than 0."""
 
1026
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
1027
 
 
1028
    def assertNegative(self, val):
 
1029
        """Assert that val is less than 0."""
 
1030
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
1031
 
 
1032
    def assertStartsWith(self, s, prefix):
 
1033
        if not s.startswith(prefix):
 
1034
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
1035
 
 
1036
    def assertEndsWith(self, s, suffix):
 
1037
        """Asserts that s ends with suffix."""
 
1038
        if not s.endswith(suffix):
 
1039
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
1040
 
 
1041
    def assertContainsRe(self, haystack, needle_re, flags=0):
 
1042
        """Assert that a contains something matching a regular expression."""
 
1043
        if not re.search(needle_re, haystack, flags):
 
1044
            if '\n' in haystack or len(haystack) > 60:
 
1045
                # a long string, format it in a more readable way
 
1046
                raise AssertionError(
 
1047
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
1048
                        % (needle_re, haystack))
 
1049
            else:
 
1050
                raise AssertionError('pattern "%s" not found in "%s"'
 
1051
                        % (needle_re, haystack))
 
1052
 
 
1053
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
 
1054
        """Assert that a does not match a regular expression"""
 
1055
        if re.search(needle_re, haystack, flags):
 
1056
            raise AssertionError('pattern "%s" found in "%s"'
 
1057
                    % (needle_re, haystack))
 
1058
 
 
1059
    def assertSubset(self, sublist, superlist):
 
1060
        """Assert that every entry in sublist is present in superlist."""
 
1061
        missing = set(sublist) - set(superlist)
 
1062
        if len(missing) > 0:
 
1063
            raise AssertionError("value(s) %r not present in container %r" %
 
1064
                                 (missing, superlist))
 
1065
 
 
1066
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
1067
        """Fail unless excClass is raised when the iterator from func is used.
 
1068
 
 
1069
        Many functions can return generators this makes sure
 
1070
        to wrap them in a list() call to make sure the whole generator
 
1071
        is run, and that the proper exception is raised.
 
1072
        """
 
1073
        try:
 
1074
            list(func(*args, **kwargs))
 
1075
        except excClass, e:
 
1076
            return e
 
1077
        else:
 
1078
            if getattr(excClass,'__name__', None) is not None:
 
1079
                excName = excClass.__name__
 
1080
            else:
 
1081
                excName = str(excClass)
 
1082
            raise self.failureException, "%s not raised" % excName
 
1083
 
 
1084
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
1085
        """Assert that a callable raises a particular exception.
 
1086
 
 
1087
        :param excClass: As for the except statement, this may be either an
 
1088
            exception class, or a tuple of classes.
 
1089
        :param callableObj: A callable, will be passed ``*args`` and
 
1090
            ``**kwargs``.
 
1091
 
 
1092
        Returns the exception so that you can examine it.
 
1093
        """
 
1094
        try:
 
1095
            callableObj(*args, **kwargs)
 
1096
        except excClass, e:
 
1097
            return e
 
1098
        else:
 
1099
            if getattr(excClass,'__name__', None) is not None:
 
1100
                excName = excClass.__name__
 
1101
            else:
 
1102
                # probably a tuple
 
1103
                excName = str(excClass)
 
1104
            raise self.failureException, "%s not raised" % excName
 
1105
 
 
1106
    def assertIs(self, left, right, message=None):
 
1107
        if not (left is right):
 
1108
            if message is not None:
 
1109
                raise AssertionError(message)
 
1110
            else:
 
1111
                raise AssertionError("%r is not %r." % (left, right))
 
1112
 
 
1113
    def assertIsNot(self, left, right, message=None):
 
1114
        if (left is right):
 
1115
            if message is not None:
 
1116
                raise AssertionError(message)
 
1117
            else:
 
1118
                raise AssertionError("%r is %r." % (left, right))
 
1119
 
 
1120
    def assertTransportMode(self, transport, path, mode):
 
1121
        """Fail if a path does not have mode "mode".
 
1122
 
 
1123
        If modes are not supported on this transport, the assertion is ignored.
 
1124
        """
 
1125
        if not transport._can_roundtrip_unix_modebits():
 
1126
            return
 
1127
        path_stat = transport.stat(path)
 
1128
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
1129
        self.assertEqual(mode, actual_mode,
 
1130
                         'mode of %r incorrect (%s != %s)'
 
1131
                         % (path, oct(mode), oct(actual_mode)))
 
1132
 
 
1133
    def assertIsSameRealPath(self, path1, path2):
 
1134
        """Fail if path1 and path2 points to different files"""
 
1135
        self.assertEqual(osutils.realpath(path1),
 
1136
                         osutils.realpath(path2),
 
1137
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
1138
 
 
1139
    def assertIsInstance(self, obj, kls, msg=None):
 
1140
        """Fail if obj is not an instance of kls
 
1141
        
 
1142
        :param msg: Supplementary message to show if the assertion fails.
 
1143
        """
 
1144
        if not isinstance(obj, kls):
 
1145
            m = "%r is an instance of %s rather than %s" % (
 
1146
                obj, obj.__class__, kls)
 
1147
            if msg:
 
1148
                m += ": " + msg
 
1149
            self.fail(m)
 
1150
 
 
1151
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
1152
        """Invoke a test, expecting it to fail for the given reason.
 
1153
 
 
1154
        This is for assertions that ought to succeed, but currently fail.
 
1155
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
1156
        about the failure you're expecting.  If a new bug is introduced,
 
1157
        AssertionError should be raised, not KnownFailure.
 
1158
 
 
1159
        Frequently, expectFailure should be followed by an opposite assertion.
 
1160
        See example below.
 
1161
 
 
1162
        Intended to be used with a callable that raises AssertionError as the
 
1163
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
1164
 
 
1165
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
1166
        test succeeds.
 
1167
 
 
1168
        example usage::
 
1169
 
 
1170
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
1171
                             dynamic_val)
 
1172
          self.assertEqual(42, dynamic_val)
 
1173
 
 
1174
          This means that a dynamic_val of 54 will cause the test to raise
 
1175
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
1176
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
1177
          than 54 or 42 will cause an AssertionError.
 
1178
        """
 
1179
        try:
 
1180
            assertion(*args, **kwargs)
 
1181
        except AssertionError:
 
1182
            raise KnownFailure(reason)
 
1183
        else:
 
1184
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
1185
 
 
1186
    def assertFileEqual(self, content, path):
 
1187
        """Fail if path does not contain 'content'."""
 
1188
        self.failUnlessExists(path)
 
1189
        f = file(path, 'rb')
 
1190
        try:
 
1191
            s = f.read()
 
1192
        finally:
 
1193
            f.close()
 
1194
        self.assertEqualDiff(content, s)
 
1195
 
 
1196
    def failUnlessExists(self, path):
 
1197
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1198
        if not isinstance(path, basestring):
 
1199
            for p in path:
 
1200
                self.failUnlessExists(p)
 
1201
        else:
 
1202
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
1203
 
 
1204
    def failIfExists(self, path):
 
1205
        """Fail if path or paths, which may be abs or relative, exist."""
 
1206
        if not isinstance(path, basestring):
 
1207
            for p in path:
 
1208
                self.failIfExists(p)
 
1209
        else:
 
1210
            self.failIf(osutils.lexists(path),path+" exists")
 
1211
 
 
1212
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1213
        """A helper for callDeprecated and applyDeprecated.
 
1214
 
 
1215
        :param a_callable: A callable to call.
 
1216
        :param args: The positional arguments for the callable
 
1217
        :param kwargs: The keyword arguments for the callable
 
1218
        :return: A tuple (warnings, result). result is the result of calling
 
1219
            a_callable(``*args``, ``**kwargs``).
 
1220
        """
 
1221
        local_warnings = []
 
1222
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1223
            # we've hooked into a deprecation specific callpath,
 
1224
            # only deprecations should getting sent via it.
 
1225
            self.assertEqual(cls, DeprecationWarning)
 
1226
            local_warnings.append(msg)
 
1227
        original_warning_method = symbol_versioning.warn
 
1228
        symbol_versioning.set_warning_method(capture_warnings)
 
1229
        try:
 
1230
            result = a_callable(*args, **kwargs)
 
1231
        finally:
 
1232
            symbol_versioning.set_warning_method(original_warning_method)
 
1233
        return (local_warnings, result)
 
1234
 
 
1235
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1236
        """Call a deprecated callable without warning the user.
 
1237
 
 
1238
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1239
        not other callers that go direct to the warning module.
 
1240
 
 
1241
        To test that a deprecated method raises an error, do something like
 
1242
        this::
 
1243
 
 
1244
            self.assertRaises(errors.ReservedId,
 
1245
                self.applyDeprecated,
 
1246
                deprecated_in((1, 5, 0)),
 
1247
                br.append_revision,
 
1248
                'current:')
 
1249
 
 
1250
        :param deprecation_format: The deprecation format that the callable
 
1251
            should have been deprecated with. This is the same type as the
 
1252
            parameter to deprecated_method/deprecated_function. If the
 
1253
            callable is not deprecated with this format, an assertion error
 
1254
            will be raised.
 
1255
        :param a_callable: A callable to call. This may be a bound method or
 
1256
            a regular function. It will be called with ``*args`` and
 
1257
            ``**kwargs``.
 
1258
        :param args: The positional arguments for the callable
 
1259
        :param kwargs: The keyword arguments for the callable
 
1260
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1261
        """
 
1262
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1263
            *args, **kwargs)
 
1264
        expected_first_warning = symbol_versioning.deprecation_string(
 
1265
            a_callable, deprecation_format)
 
1266
        if len(call_warnings) == 0:
 
1267
            self.fail("No deprecation warning generated by call to %s" %
 
1268
                a_callable)
 
1269
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1270
        return result
 
1271
 
 
1272
    def callCatchWarnings(self, fn, *args, **kw):
 
1273
        """Call a callable that raises python warnings.
 
1274
 
 
1275
        The caller's responsible for examining the returned warnings.
 
1276
 
 
1277
        If the callable raises an exception, the exception is not
 
1278
        caught and propagates up to the caller.  In that case, the list
 
1279
        of warnings is not available.
 
1280
 
 
1281
        :returns: ([warning_object, ...], fn_result)
 
1282
        """
 
1283
        # XXX: This is not perfect, because it completely overrides the
 
1284
        # warnings filters, and some code may depend on suppressing particular
 
1285
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1286
        # though.  -- Andrew, 20071062
 
1287
        wlist = []
 
1288
        def _catcher(message, category, filename, lineno, file=None, line=None):
 
1289
            # despite the name, 'message' is normally(?) a Warning subclass
 
1290
            # instance
 
1291
            wlist.append(message)
 
1292
        saved_showwarning = warnings.showwarning
 
1293
        saved_filters = warnings.filters
 
1294
        try:
 
1295
            warnings.showwarning = _catcher
 
1296
            warnings.filters = []
 
1297
            result = fn(*args, **kw)
 
1298
        finally:
 
1299
            warnings.showwarning = saved_showwarning
 
1300
            warnings.filters = saved_filters
 
1301
        return wlist, result
 
1302
 
 
1303
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1304
        """Assert that a callable is deprecated in a particular way.
 
1305
 
 
1306
        This is a very precise test for unusual requirements. The
 
1307
        applyDeprecated helper function is probably more suited for most tests
 
1308
        as it allows you to simply specify the deprecation format being used
 
1309
        and will ensure that that is issued for the function being called.
 
1310
 
 
1311
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1312
        not other callers that go direct to the warning module.  To catch
 
1313
        general warnings, use callCatchWarnings.
 
1314
 
 
1315
        :param expected: a list of the deprecation warnings expected, in order
 
1316
        :param callable: The callable to call
 
1317
        :param args: The positional arguments for the callable
 
1318
        :param kwargs: The keyword arguments for the callable
 
1319
        """
 
1320
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1321
            *args, **kwargs)
 
1322
        self.assertEqual(expected, call_warnings)
 
1323
        return result
 
1324
 
 
1325
    def _startLogFile(self):
 
1326
        """Send bzr and test log messages to a temporary file.
 
1327
 
 
1328
        The file is removed as the test is torn down.
 
1329
        """
 
1330
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1331
        self._log_file = os.fdopen(fileno, 'w+')
 
1332
        self._log_memento = bzrlib.trace.push_log_file(self._log_file)
 
1333
        self._log_file_name = name
 
1334
        self.addCleanup(self._finishLogFile)
 
1335
 
 
1336
    def _finishLogFile(self):
 
1337
        """Finished with the log file.
 
1338
 
 
1339
        Close the file and delete it, unless setKeepLogfile was called.
 
1340
        """
 
1341
        if self._log_file is None:
 
1342
            return
 
1343
        bzrlib.trace.pop_log_file(self._log_memento)
 
1344
        self._log_file.close()
 
1345
        self._log_file = None
 
1346
        if not self._keep_log_file:
 
1347
            os.remove(self._log_file_name)
 
1348
            self._log_file_name = None
 
1349
 
 
1350
    def setKeepLogfile(self):
 
1351
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1352
        self._keep_log_file = True
 
1353
 
 
1354
    def thisFailsStrictLockCheck(self):
 
1355
        """It is known that this test would fail with -Dstrict_locks.
 
1356
 
 
1357
        By default, all tests are run with strict lock checking unless
 
1358
        -Edisable_lock_checks is supplied. However there are some tests which
 
1359
        we know fail strict locks at this point that have not been fixed.
 
1360
        They should call this function to disable the strict checking.
 
1361
 
 
1362
        This should be used sparingly, it is much better to fix the locking
 
1363
        issues rather than papering over the problem by calling this function.
 
1364
        """
 
1365
        debug.debug_flags.discard('strict_locks')
 
1366
 
 
1367
    def addCleanup(self, callable, *args, **kwargs):
 
1368
        """Arrange to run a callable when this case is torn down.
 
1369
 
 
1370
        Callables are run in the reverse of the order they are registered,
 
1371
        ie last-in first-out.
 
1372
        """
 
1373
        self._cleanups.append((callable, args, kwargs))
 
1374
 
 
1375
    def _cleanEnvironment(self):
 
1376
        new_env = {
 
1377
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1378
            'HOME': os.getcwd(),
 
1379
            # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
 
1380
            # tests do check our impls match APPDATA
 
1381
            'BZR_EDITOR': None, # test_msgeditor manipulates this variable
 
1382
            'VISUAL': None,
 
1383
            'EDITOR': None,
 
1384
            'BZR_EMAIL': None,
 
1385
            'BZREMAIL': None, # may still be present in the environment
 
1386
            'EMAIL': None,
 
1387
            'BZR_PROGRESS_BAR': None,
 
1388
            'BZR_LOG': None,
 
1389
            'BZR_PLUGIN_PATH': None,
 
1390
            # Make sure that any text ui tests are consistent regardless of
 
1391
            # the environment the test case is run in; you may want tests that
 
1392
            # test other combinations.  'dumb' is a reasonable guess for tests
 
1393
            # going to a pipe or a StringIO.
 
1394
            'TERM': 'dumb',
 
1395
            'LINES': '25',
 
1396
            'COLUMNS': '80',
 
1397
            # SSH Agent
 
1398
            'SSH_AUTH_SOCK': None,
 
1399
            # Proxies
 
1400
            'http_proxy': None,
 
1401
            'HTTP_PROXY': None,
 
1402
            'https_proxy': None,
 
1403
            'HTTPS_PROXY': None,
 
1404
            'no_proxy': None,
 
1405
            'NO_PROXY': None,
 
1406
            'all_proxy': None,
 
1407
            'ALL_PROXY': None,
 
1408
            # Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
 
1409
            # least. If you do (care), please update this comment
 
1410
            # -- vila 20080401
 
1411
            'ftp_proxy': None,
 
1412
            'FTP_PROXY': None,
 
1413
            'BZR_REMOTE_PATH': None,
 
1414
        }
 
1415
        self.__old_env = {}
 
1416
        self.addCleanup(self._restoreEnvironment)
 
1417
        for name, value in new_env.iteritems():
 
1418
            self._captureVar(name, value)
 
1419
 
 
1420
    def _captureVar(self, name, newvalue):
 
1421
        """Set an environment variable, and reset it when finished."""
 
1422
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1423
 
 
1424
    def _restore_debug_flags(self):
 
1425
        debug.debug_flags.clear()
 
1426
        debug.debug_flags.update(self._preserved_debug_flags)
 
1427
 
 
1428
    def _restoreEnvironment(self):
 
1429
        for name, value in self.__old_env.iteritems():
 
1430
            osutils.set_or_unset_env(name, value)
 
1431
 
 
1432
    def _restoreHooks(self):
 
1433
        for klass, (name, hooks) in self._preserved_hooks.items():
 
1434
            setattr(klass, name, hooks)
 
1435
 
 
1436
    def knownFailure(self, reason):
 
1437
        """This test has failed for some known reason."""
 
1438
        raise KnownFailure(reason)
 
1439
 
 
1440
    def _do_skip(self, result, reason):
 
1441
        addSkip = getattr(result, 'addSkip', None)
 
1442
        if not callable(addSkip):
 
1443
            result.addError(self, sys.exc_info())
 
1444
        else:
 
1445
            addSkip(self, reason)
 
1446
 
 
1447
    def run(self, result=None):
 
1448
        if result is None: result = self.defaultTestResult()
 
1449
        for feature in getattr(self, '_test_needs_features', []):
 
1450
            if not feature.available():
 
1451
                result.startTest(self)
 
1452
                if getattr(result, 'addNotSupported', None):
 
1453
                    result.addNotSupported(self, feature)
 
1454
                else:
 
1455
                    result.addSuccess(self)
 
1456
                result.stopTest(self)
 
1457
                return result
 
1458
        try:
 
1459
            try:
 
1460
                result.startTest(self)
 
1461
                absent_attr = object()
 
1462
                # Python 2.5
 
1463
                method_name = getattr(self, '_testMethodName', absent_attr)
 
1464
                if method_name is absent_attr:
 
1465
                    # Python 2.4
 
1466
                    method_name = getattr(self, '_TestCase__testMethodName')
 
1467
                testMethod = getattr(self, method_name)
 
1468
                try:
 
1469
                    try:
 
1470
                        self.setUp()
 
1471
                        if not self._bzr_test_setUp_run:
 
1472
                            self.fail(
 
1473
                                "test setUp did not invoke "
 
1474
                                "bzrlib.tests.TestCase's setUp")
 
1475
                    except KeyboardInterrupt:
 
1476
                        self._runCleanups()
 
1477
                        raise
 
1478
                    except TestSkipped, e:
 
1479
                        self._do_skip(result, e.args[0])
 
1480
                        self.tearDown()
 
1481
                        return result
 
1482
                    except:
 
1483
                        result.addError(self, sys.exc_info())
 
1484
                        self._runCleanups()
 
1485
                        return result
 
1486
 
 
1487
                    ok = False
 
1488
                    try:
 
1489
                        testMethod()
 
1490
                        ok = True
 
1491
                    except self.failureException:
 
1492
                        result.addFailure(self, sys.exc_info())
 
1493
                    except TestSkipped, e:
 
1494
                        if not e.args:
 
1495
                            reason = "No reason given."
 
1496
                        else:
 
1497
                            reason = e.args[0]
 
1498
                        self._do_skip(result, reason)
 
1499
                    except KeyboardInterrupt:
 
1500
                        self._runCleanups()
 
1501
                        raise
 
1502
                    except:
 
1503
                        result.addError(self, sys.exc_info())
 
1504
 
 
1505
                    try:
 
1506
                        self.tearDown()
 
1507
                        if not self._bzr_test_tearDown_run:
 
1508
                            self.fail(
 
1509
                                "test tearDown did not invoke "
 
1510
                                "bzrlib.tests.TestCase's tearDown")
 
1511
                    except KeyboardInterrupt:
 
1512
                        self._runCleanups()
 
1513
                        raise
 
1514
                    except:
 
1515
                        result.addError(self, sys.exc_info())
 
1516
                        self._runCleanups()
 
1517
                        ok = False
 
1518
                    if ok: result.addSuccess(self)
 
1519
                finally:
 
1520
                    result.stopTest(self)
 
1521
                return result
 
1522
            except TestNotApplicable:
 
1523
                # Not moved from the result [yet].
 
1524
                self._runCleanups()
 
1525
                raise
 
1526
            except KeyboardInterrupt:
 
1527
                self._runCleanups()
 
1528
                raise
 
1529
        finally:
 
1530
            saved_attrs = {}
 
1531
            for attr_name in self.attrs_to_keep:
 
1532
                if attr_name in self.__dict__:
 
1533
                    saved_attrs[attr_name] = self.__dict__[attr_name]
 
1534
            self.__dict__ = saved_attrs
 
1535
 
 
1536
    def tearDown(self):
 
1537
        self._runCleanups()
 
1538
        self._log_contents = ''
 
1539
        self._bzr_test_tearDown_run = True
 
1540
        unittest.TestCase.tearDown(self)
 
1541
 
 
1542
    def time(self, callable, *args, **kwargs):
 
1543
        """Run callable and accrue the time it takes to the benchmark time.
 
1544
 
 
1545
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1546
        this will cause lsprofile statistics to be gathered and stored in
 
1547
        self._benchcalls.
 
1548
        """
 
1549
        if self._benchtime is None:
 
1550
            self._benchtime = 0
 
1551
        start = time.time()
 
1552
        try:
 
1553
            if not self._gather_lsprof_in_benchmarks:
 
1554
                return callable(*args, **kwargs)
 
1555
            else:
 
1556
                # record this benchmark
 
1557
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1558
                stats.sort()
 
1559
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1560
                return ret
 
1561
        finally:
 
1562
            self._benchtime += time.time() - start
 
1563
 
 
1564
    def _runCleanups(self):
 
1565
        """Run registered cleanup functions.
 
1566
 
 
1567
        This should only be called from TestCase.tearDown.
 
1568
        """
 
1569
        # TODO: Perhaps this should keep running cleanups even if
 
1570
        # one of them fails?
 
1571
 
 
1572
        # Actually pop the cleanups from the list so tearDown running
 
1573
        # twice is safe (this happens for skipped tests).
 
1574
        while self._cleanups:
 
1575
            cleanup, args, kwargs = self._cleanups.pop()
 
1576
            cleanup(*args, **kwargs)
 
1577
 
 
1578
    def log(self, *args):
 
1579
        mutter(*args)
 
1580
 
 
1581
    def _get_log(self, keep_log_file=False):
 
1582
        """Get the log from bzrlib.trace calls from this test.
 
1583
 
 
1584
        :param keep_log_file: When True, if the log is still a file on disk
 
1585
            leave it as a file on disk. When False, if the log is still a file
 
1586
            on disk, the log file is deleted and the log preserved as
 
1587
            self._log_contents.
 
1588
        :return: A string containing the log.
 
1589
        """
 
1590
        # flush the log file, to get all content
 
1591
        import bzrlib.trace
 
1592
        if bzrlib.trace._trace_file:
 
1593
            bzrlib.trace._trace_file.flush()
 
1594
        if self._log_contents:
 
1595
            # XXX: this can hardly contain the content flushed above --vila
 
1596
            # 20080128
 
1597
            return self._log_contents
 
1598
        if self._log_file_name is not None:
 
1599
            logfile = open(self._log_file_name)
 
1600
            try:
 
1601
                log_contents = logfile.read()
 
1602
            finally:
 
1603
                logfile.close()
 
1604
            if not keep_log_file:
 
1605
                self._log_contents = log_contents
 
1606
                try:
 
1607
                    os.remove(self._log_file_name)
 
1608
                except OSError, e:
 
1609
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1610
                        sys.stderr.write(('Unable to delete log file '
 
1611
                                             ' %r\n' % self._log_file_name))
 
1612
                    else:
 
1613
                        raise
 
1614
            return log_contents
 
1615
        else:
 
1616
            return "DELETED log file to reduce memory footprint"
 
1617
 
 
1618
    def requireFeature(self, feature):
 
1619
        """This test requires a specific feature is available.
 
1620
 
 
1621
        :raises UnavailableFeature: When feature is not available.
 
1622
        """
 
1623
        if not feature.available():
 
1624
            raise UnavailableFeature(feature)
 
1625
 
 
1626
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1627
            working_dir):
 
1628
        """Run bazaar command line, splitting up a string command line."""
 
1629
        if isinstance(args, basestring):
 
1630
            # shlex don't understand unicode strings,
 
1631
            # so args should be plain string (bialix 20070906)
 
1632
            args = list(shlex.split(str(args)))
 
1633
        return self._run_bzr_core(args, retcode=retcode,
 
1634
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1635
                )
 
1636
 
 
1637
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1638
            working_dir):
 
1639
        if encoding is None:
 
1640
            encoding = osutils.get_user_encoding()
 
1641
        stdout = StringIOWrapper()
 
1642
        stderr = StringIOWrapper()
 
1643
        stdout.encoding = encoding
 
1644
        stderr.encoding = encoding
 
1645
 
 
1646
        self.log('run bzr: %r', args)
 
1647
        # FIXME: don't call into logging here
 
1648
        handler = logging.StreamHandler(stderr)
 
1649
        handler.setLevel(logging.INFO)
 
1650
        logger = logging.getLogger('')
 
1651
        logger.addHandler(handler)
 
1652
        old_ui_factory = ui.ui_factory
 
1653
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1654
 
 
1655
        cwd = None
 
1656
        if working_dir is not None:
 
1657
            cwd = osutils.getcwd()
 
1658
            os.chdir(working_dir)
 
1659
 
 
1660
        try:
 
1661
            result = self.apply_redirected(ui.ui_factory.stdin,
 
1662
                stdout, stderr,
 
1663
                bzrlib.commands.run_bzr_catch_user_errors,
 
1664
                args)
 
1665
        finally:
 
1666
            logger.removeHandler(handler)
 
1667
            ui.ui_factory = old_ui_factory
 
1668
            if cwd is not None:
 
1669
                os.chdir(cwd)
 
1670
 
 
1671
        out = stdout.getvalue()
 
1672
        err = stderr.getvalue()
 
1673
        if out:
 
1674
            self.log('output:\n%r', out)
 
1675
        if err:
 
1676
            self.log('errors:\n%r', err)
 
1677
        if retcode is not None:
 
1678
            self.assertEquals(retcode, result,
 
1679
                              message='Unexpected return code')
 
1680
        return out, err
 
1681
 
 
1682
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
 
1683
                working_dir=None, error_regexes=[], output_encoding=None):
 
1684
        """Invoke bzr, as if it were run from the command line.
 
1685
 
 
1686
        The argument list should not include the bzr program name - the
 
1687
        first argument is normally the bzr command.  Arguments may be
 
1688
        passed in three ways:
 
1689
 
 
1690
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1691
        when the command contains whitespace or metacharacters, or
 
1692
        is built up at run time.
 
1693
 
 
1694
        2- A single string, eg "add a".  This is the most convenient
 
1695
        for hardcoded commands.
 
1696
 
 
1697
        This runs bzr through the interface that catches and reports
 
1698
        errors, and with logging set to something approximating the
 
1699
        default, so that error reporting can be checked.
 
1700
 
 
1701
        This should be the main method for tests that want to exercise the
 
1702
        overall behavior of the bzr application (rather than a unit test
 
1703
        or a functional test of the library.)
 
1704
 
 
1705
        This sends the stdout/stderr results into the test's log,
 
1706
        where it may be useful for debugging.  See also run_captured.
 
1707
 
 
1708
        :keyword stdin: A string to be used as stdin for the command.
 
1709
        :keyword retcode: The status code the command should return;
 
1710
            default 0.
 
1711
        :keyword working_dir: The directory to run the command in
 
1712
        :keyword error_regexes: A list of expected error messages.  If
 
1713
            specified they must be seen in the error output of the command.
 
1714
        """
 
1715
        out, err = self._run_bzr_autosplit(
 
1716
            args=args,
 
1717
            retcode=retcode,
 
1718
            encoding=encoding,
 
1719
            stdin=stdin,
 
1720
            working_dir=working_dir,
 
1721
            )
 
1722
        self.assertIsInstance(error_regexes, (list, tuple))
 
1723
        for regex in error_regexes:
 
1724
            self.assertContainsRe(err, regex)
 
1725
        return out, err
 
1726
 
 
1727
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1728
        """Run bzr, and check that stderr contains the supplied regexes
 
1729
 
 
1730
        :param error_regexes: Sequence of regular expressions which
 
1731
            must each be found in the error output. The relative ordering
 
1732
            is not enforced.
 
1733
        :param args: command-line arguments for bzr
 
1734
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1735
            This function changes the default value of retcode to be 3,
 
1736
            since in most cases this is run when you expect bzr to fail.
 
1737
 
 
1738
        :return: (out, err) The actual output of running the command (in case
 
1739
            you want to do more inspection)
 
1740
 
 
1741
        Examples of use::
 
1742
 
 
1743
            # Make sure that commit is failing because there is nothing to do
 
1744
            self.run_bzr_error(['no changes to commit'],
 
1745
                               ['commit', '-m', 'my commit comment'])
 
1746
            # Make sure --strict is handling an unknown file, rather than
 
1747
            # giving us the 'nothing to do' error
 
1748
            self.build_tree(['unknown'])
 
1749
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1750
                               ['commit', --strict', '-m', 'my commit comment'])
 
1751
        """
 
1752
        kwargs.setdefault('retcode', 3)
 
1753
        kwargs['error_regexes'] = error_regexes
 
1754
        out, err = self.run_bzr(*args, **kwargs)
 
1755
        return out, err
 
1756
 
 
1757
    def run_bzr_subprocess(self, *args, **kwargs):
 
1758
        """Run bzr in a subprocess for testing.
 
1759
 
 
1760
        This starts a new Python interpreter and runs bzr in there.
 
1761
        This should only be used for tests that have a justifiable need for
 
1762
        this isolation: e.g. they are testing startup time, or signal
 
1763
        handling, or early startup code, etc.  Subprocess code can't be
 
1764
        profiled or debugged so easily.
 
1765
 
 
1766
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
1767
            None is supplied, the status code is not checked.
 
1768
        :keyword env_changes: A dictionary which lists changes to environment
 
1769
            variables. A value of None will unset the env variable.
 
1770
            The values must be strings. The change will only occur in the
 
1771
            child, so you don't need to fix the environment after running.
 
1772
        :keyword universal_newlines: Convert CRLF => LF
 
1773
        :keyword allow_plugins: By default the subprocess is run with
 
1774
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1775
            for system-wide plugins to create unexpected output on stderr,
 
1776
            which can cause unnecessary test failures.
 
1777
        """
 
1778
        env_changes = kwargs.get('env_changes', {})
 
1779
        working_dir = kwargs.get('working_dir', None)
 
1780
        allow_plugins = kwargs.get('allow_plugins', False)
 
1781
        if len(args) == 1:
 
1782
            if isinstance(args[0], list):
 
1783
                args = args[0]
 
1784
            elif isinstance(args[0], basestring):
 
1785
                args = list(shlex.split(args[0]))
 
1786
        else:
 
1787
            raise ValueError("passing varargs to run_bzr_subprocess")
 
1788
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1789
                                            working_dir=working_dir,
 
1790
                                            allow_plugins=allow_plugins)
 
1791
        # We distinguish between retcode=None and retcode not passed.
 
1792
        supplied_retcode = kwargs.get('retcode', 0)
 
1793
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1794
            universal_newlines=kwargs.get('universal_newlines', False),
 
1795
            process_args=args)
 
1796
 
 
1797
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1798
                             skip_if_plan_to_signal=False,
 
1799
                             working_dir=None,
 
1800
                             allow_plugins=False):
 
1801
        """Start bzr in a subprocess for testing.
 
1802
 
 
1803
        This starts a new Python interpreter and runs bzr in there.
 
1804
        This should only be used for tests that have a justifiable need for
 
1805
        this isolation: e.g. they are testing startup time, or signal
 
1806
        handling, or early startup code, etc.  Subprocess code can't be
 
1807
        profiled or debugged so easily.
 
1808
 
 
1809
        :param process_args: a list of arguments to pass to the bzr executable,
 
1810
            for example ``['--version']``.
 
1811
        :param env_changes: A dictionary which lists changes to environment
 
1812
            variables. A value of None will unset the env variable.
 
1813
            The values must be strings. The change will only occur in the
 
1814
            child, so you don't need to fix the environment after running.
 
1815
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1816
            is not available.
 
1817
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1818
 
 
1819
        :returns: Popen object for the started process.
 
1820
        """
 
1821
        if skip_if_plan_to_signal:
 
1822
            if not getattr(os, 'kill', None):
 
1823
                raise TestSkipped("os.kill not available.")
 
1824
 
 
1825
        if env_changes is None:
 
1826
            env_changes = {}
 
1827
        old_env = {}
 
1828
 
 
1829
        def cleanup_environment():
 
1830
            for env_var, value in env_changes.iteritems():
 
1831
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1832
 
 
1833
        def restore_environment():
 
1834
            for env_var, value in old_env.iteritems():
 
1835
                osutils.set_or_unset_env(env_var, value)
 
1836
 
 
1837
        bzr_path = self.get_bzr_path()
 
1838
 
 
1839
        cwd = None
 
1840
        if working_dir is not None:
 
1841
            cwd = osutils.getcwd()
 
1842
            os.chdir(working_dir)
 
1843
 
 
1844
        try:
 
1845
            # win32 subprocess doesn't support preexec_fn
 
1846
            # so we will avoid using it on all platforms, just to
 
1847
            # make sure the code path is used, and we don't break on win32
 
1848
            cleanup_environment()
 
1849
            command = [sys.executable]
 
1850
            # frozen executables don't need the path to bzr
 
1851
            if getattr(sys, "frozen", None) is None:
 
1852
                command.append(bzr_path)
 
1853
            if not allow_plugins:
 
1854
                command.append('--no-plugins')
 
1855
            command.extend(process_args)
 
1856
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1857
        finally:
 
1858
            restore_environment()
 
1859
            if cwd is not None:
 
1860
                os.chdir(cwd)
 
1861
 
 
1862
        return process
 
1863
 
 
1864
    def _popen(self, *args, **kwargs):
 
1865
        """Place a call to Popen.
 
1866
 
 
1867
        Allows tests to override this method to intercept the calls made to
 
1868
        Popen for introspection.
 
1869
        """
 
1870
        return Popen(*args, **kwargs)
 
1871
 
 
1872
    def get_bzr_path(self):
 
1873
        """Return the path of the 'bzr' executable for this test suite."""
 
1874
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1875
        if not os.path.isfile(bzr_path):
 
1876
            # We are probably installed. Assume sys.argv is the right file
 
1877
            bzr_path = sys.argv[0]
 
1878
        return bzr_path
 
1879
 
 
1880
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1881
                              universal_newlines=False, process_args=None):
 
1882
        """Finish the execution of process.
 
1883
 
 
1884
        :param process: the Popen object returned from start_bzr_subprocess.
 
1885
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1886
            None is supplied, the status code is not checked.
 
1887
        :param send_signal: an optional signal to send to the process.
 
1888
        :param universal_newlines: Convert CRLF => LF
 
1889
        :returns: (stdout, stderr)
 
1890
        """
 
1891
        if send_signal is not None:
 
1892
            os.kill(process.pid, send_signal)
 
1893
        out, err = process.communicate()
 
1894
 
 
1895
        if universal_newlines:
 
1896
            out = out.replace('\r\n', '\n')
 
1897
            err = err.replace('\r\n', '\n')
 
1898
 
 
1899
        if retcode is not None and retcode != process.returncode:
 
1900
            if process_args is None:
 
1901
                process_args = "(unknown args)"
 
1902
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1903
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1904
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1905
                      % (process_args, retcode, process.returncode))
 
1906
        return [out, err]
 
1907
 
 
1908
    def check_inventory_shape(self, inv, shape):
 
1909
        """Compare an inventory to a list of expected names.
 
1910
 
 
1911
        Fail if they are not precisely equal.
 
1912
        """
 
1913
        extras = []
 
1914
        shape = list(shape)             # copy
 
1915
        for path, ie in inv.entries():
 
1916
            name = path.replace('\\', '/')
 
1917
            if ie.kind == 'directory':
 
1918
                name = name + '/'
 
1919
            if name in shape:
 
1920
                shape.remove(name)
 
1921
            else:
 
1922
                extras.append(name)
 
1923
        if shape:
 
1924
            self.fail("expected paths not found in inventory: %r" % shape)
 
1925
        if extras:
 
1926
            self.fail("unexpected paths found in inventory: %r" % extras)
 
1927
 
 
1928
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1929
                         a_callable=None, *args, **kwargs):
 
1930
        """Call callable with redirected std io pipes.
 
1931
 
 
1932
        Returns the return code."""
 
1933
        if not callable(a_callable):
 
1934
            raise ValueError("a_callable must be callable.")
 
1935
        if stdin is None:
 
1936
            stdin = StringIO("")
 
1937
        if stdout is None:
 
1938
            if getattr(self, "_log_file", None) is not None:
 
1939
                stdout = self._log_file
 
1940
            else:
 
1941
                stdout = StringIO()
 
1942
        if stderr is None:
 
1943
            if getattr(self, "_log_file", None is not None):
 
1944
                stderr = self._log_file
 
1945
            else:
 
1946
                stderr = StringIO()
 
1947
        real_stdin = sys.stdin
 
1948
        real_stdout = sys.stdout
 
1949
        real_stderr = sys.stderr
 
1950
        try:
 
1951
            sys.stdout = stdout
 
1952
            sys.stderr = stderr
 
1953
            sys.stdin = stdin
 
1954
            return a_callable(*args, **kwargs)
 
1955
        finally:
 
1956
            sys.stdout = real_stdout
 
1957
            sys.stderr = real_stderr
 
1958
            sys.stdin = real_stdin
 
1959
 
 
1960
    def reduceLockdirTimeout(self):
 
1961
        """Reduce the default lock timeout for the duration of the test, so that
 
1962
        if LockContention occurs during a test, it does so quickly.
 
1963
 
 
1964
        Tests that expect to provoke LockContention errors should call this.
 
1965
        """
 
1966
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
1967
        def resetTimeout():
 
1968
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
1969
        self.addCleanup(resetTimeout)
 
1970
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
1971
 
 
1972
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
1973
        """Return a StringIOWrapper instance, that will encode Unicode
 
1974
        input to UTF-8.
 
1975
        """
 
1976
        if encoding_type is None:
 
1977
            encoding_type = 'strict'
 
1978
        sio = StringIO()
 
1979
        output_encoding = 'utf-8'
 
1980
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
 
1981
        sio.encoding = output_encoding
 
1982
        return sio
 
1983
 
 
1984
    def disable_verb(self, verb):
 
1985
        """Disable a smart server verb for one test."""
 
1986
        from bzrlib.smart import request
 
1987
        request_handlers = request.request_handlers
 
1988
        orig_method = request_handlers.get(verb)
 
1989
        request_handlers.remove(verb)
 
1990
        def restoreVerb():
 
1991
            request_handlers.register(verb, orig_method)
 
1992
        self.addCleanup(restoreVerb)
 
1993
 
 
1994
 
 
1995
class CapturedCall(object):
 
1996
    """A helper for capturing smart server calls for easy debug analysis."""
 
1997
 
 
1998
    def __init__(self, params, prefix_length):
 
1999
        """Capture the call with params and skip prefix_length stack frames."""
 
2000
        self.call = params
 
2001
        import traceback
 
2002
        # The last 5 frames are the __init__, the hook frame, and 3 smart
 
2003
        # client frames. Beyond this we could get more clever, but this is good
 
2004
        # enough for now.
 
2005
        stack = traceback.extract_stack()[prefix_length:-5]
 
2006
        self.stack = ''.join(traceback.format_list(stack))
 
2007
 
 
2008
    def __str__(self):
 
2009
        return self.call.method
 
2010
 
 
2011
    def __repr__(self):
 
2012
        return self.call.method
 
2013
 
 
2014
    def stack(self):
 
2015
        return self.stack
 
2016
 
 
2017
 
 
2018
class TestCaseWithMemoryTransport(TestCase):
 
2019
    """Common test class for tests that do not need disk resources.
 
2020
 
 
2021
    Tests that need disk resources should derive from TestCaseWithTransport.
 
2022
 
 
2023
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
2024
 
 
2025
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
2026
    a directory which does not exist. This serves to help ensure test isolation
 
2027
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
2028
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
2029
    file defaults for the transport in tests, nor does it obey the command line
 
2030
    override, so tests that accidentally write to the common directory should
 
2031
    be rare.
 
2032
 
 
2033
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
2034
    a .bzr directory that stops us ascending higher into the filesystem.
 
2035
    """
 
2036
 
 
2037
    TEST_ROOT = None
 
2038
    _TEST_NAME = 'test'
 
2039
 
 
2040
    def __init__(self, methodName='runTest'):
 
2041
        # allow test parameterization after test construction and before test
 
2042
        # execution. Variables that the parameterizer sets need to be
 
2043
        # ones that are not set by setUp, or setUp will trash them.
 
2044
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
2045
        self.vfs_transport_factory = default_transport
 
2046
        self.transport_server = None
 
2047
        self.transport_readonly_server = None
 
2048
        self.__vfs_server = None
 
2049
 
 
2050
    def get_transport(self, relpath=None):
 
2051
        """Return a writeable transport.
 
2052
 
 
2053
        This transport is for the test scratch space relative to
 
2054
        "self._test_root"
 
2055
 
 
2056
        :param relpath: a path relative to the base url.
 
2057
        """
 
2058
        t = get_transport(self.get_url(relpath))
 
2059
        self.assertFalse(t.is_readonly())
 
2060
        return t
 
2061
 
 
2062
    def get_readonly_transport(self, relpath=None):
 
2063
        """Return a readonly transport for the test scratch space
 
2064
 
 
2065
        This can be used to test that operations which should only need
 
2066
        readonly access in fact do not try to write.
 
2067
 
 
2068
        :param relpath: a path relative to the base url.
 
2069
        """
 
2070
        t = get_transport(self.get_readonly_url(relpath))
 
2071
        self.assertTrue(t.is_readonly())
 
2072
        return t
 
2073
 
 
2074
    def create_transport_readonly_server(self):
 
2075
        """Create a transport server from class defined at init.
 
2076
 
 
2077
        This is mostly a hook for daughter classes.
 
2078
        """
 
2079
        return self.transport_readonly_server()
 
2080
 
 
2081
    def get_readonly_server(self):
 
2082
        """Get the server instance for the readonly transport
 
2083
 
 
2084
        This is useful for some tests with specific servers to do diagnostics.
 
2085
        """
 
2086
        if self.__readonly_server is None:
 
2087
            if self.transport_readonly_server is None:
 
2088
                # readonly decorator requested
 
2089
                self.__readonly_server = ReadonlyServer()
 
2090
            else:
 
2091
                # explicit readonly transport.
 
2092
                self.__readonly_server = self.create_transport_readonly_server()
 
2093
            self.start_server(self.__readonly_server,
 
2094
                self.get_vfs_only_server())
 
2095
        return self.__readonly_server
 
2096
 
 
2097
    def get_readonly_url(self, relpath=None):
 
2098
        """Get a URL for the readonly transport.
 
2099
 
 
2100
        This will either be backed by '.' or a decorator to the transport
 
2101
        used by self.get_url()
 
2102
        relpath provides for clients to get a path relative to the base url.
 
2103
        These should only be downwards relative, not upwards.
 
2104
        """
 
2105
        base = self.get_readonly_server().get_url()
 
2106
        return self._adjust_url(base, relpath)
 
2107
 
 
2108
    def get_vfs_only_server(self):
 
2109
        """Get the vfs only read/write server instance.
 
2110
 
 
2111
        This is useful for some tests with specific servers that need
 
2112
        diagnostics.
 
2113
 
 
2114
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
2115
        is no means to override it.
 
2116
        """
 
2117
        if self.__vfs_server is None:
 
2118
            self.__vfs_server = MemoryServer()
 
2119
            self.start_server(self.__vfs_server)
 
2120
        return self.__vfs_server
 
2121
 
 
2122
    def get_server(self):
 
2123
        """Get the read/write server instance.
 
2124
 
 
2125
        This is useful for some tests with specific servers that need
 
2126
        diagnostics.
 
2127
 
 
2128
        This is built from the self.transport_server factory. If that is None,
 
2129
        then the self.get_vfs_server is returned.
 
2130
        """
 
2131
        if self.__server is None:
 
2132
            if (self.transport_server is None or self.transport_server is
 
2133
                self.vfs_transport_factory):
 
2134
                self.__server = self.get_vfs_only_server()
 
2135
            else:
 
2136
                # bring up a decorated means of access to the vfs only server.
 
2137
                self.__server = self.transport_server()
 
2138
                self.start_server(self.__server, self.get_vfs_only_server())
 
2139
        return self.__server
 
2140
 
 
2141
    def _adjust_url(self, base, relpath):
 
2142
        """Get a URL (or maybe a path) for the readwrite transport.
 
2143
 
 
2144
        This will either be backed by '.' or to an equivalent non-file based
 
2145
        facility.
 
2146
        relpath provides for clients to get a path relative to the base url.
 
2147
        These should only be downwards relative, not upwards.
 
2148
        """
 
2149
        if relpath is not None and relpath != '.':
 
2150
            if not base.endswith('/'):
 
2151
                base = base + '/'
 
2152
            # XXX: Really base should be a url; we did after all call
 
2153
            # get_url()!  But sometimes it's just a path (from
 
2154
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
2155
            # to a non-escaped local path.
 
2156
            if base.startswith('./') or base.startswith('/'):
 
2157
                base += relpath
 
2158
            else:
 
2159
                base += urlutils.escape(relpath)
 
2160
        return base
 
2161
 
 
2162
    def get_url(self, relpath=None):
 
2163
        """Get a URL (or maybe a path) for the readwrite transport.
 
2164
 
 
2165
        This will either be backed by '.' or to an equivalent non-file based
 
2166
        facility.
 
2167
        relpath provides for clients to get a path relative to the base url.
 
2168
        These should only be downwards relative, not upwards.
 
2169
        """
 
2170
        base = self.get_server().get_url()
 
2171
        return self._adjust_url(base, relpath)
 
2172
 
 
2173
    def get_vfs_only_url(self, relpath=None):
 
2174
        """Get a URL (or maybe a path for the plain old vfs transport.
 
2175
 
 
2176
        This will never be a smart protocol.  It always has all the
 
2177
        capabilities of the local filesystem, but it might actually be a
 
2178
        MemoryTransport or some other similar virtual filesystem.
 
2179
 
 
2180
        This is the backing transport (if any) of the server returned by
 
2181
        get_url and get_readonly_url.
 
2182
 
 
2183
        :param relpath: provides for clients to get a path relative to the base
 
2184
            url.  These should only be downwards relative, not upwards.
 
2185
        :return: A URL
 
2186
        """
 
2187
        base = self.get_vfs_only_server().get_url()
 
2188
        return self._adjust_url(base, relpath)
 
2189
 
 
2190
    def _create_safety_net(self):
 
2191
        """Make a fake bzr directory.
 
2192
 
 
2193
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
2194
        real branch.
 
2195
        """
 
2196
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2197
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
2198
 
 
2199
    def _check_safety_net(self):
 
2200
        """Check that the safety .bzr directory have not been touched.
 
2201
 
 
2202
        _make_test_root have created a .bzr directory to prevent tests from
 
2203
        propagating. This method ensures than a test did not leaked.
 
2204
        """
 
2205
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2206
        wt = workingtree.WorkingTree.open(root)
 
2207
        last_rev = wt.last_revision()
 
2208
        if last_rev != 'null:':
 
2209
            # The current test have modified the /bzr directory, we need to
 
2210
            # recreate a new one or all the followng tests will fail.
 
2211
            # If you need to inspect its content uncomment the following line
 
2212
            # import pdb; pdb.set_trace()
 
2213
            _rmtree_temp_dir(root + '/.bzr')
 
2214
            self._create_safety_net()
 
2215
            raise AssertionError('%s/.bzr should not be modified' % root)
 
2216
 
 
2217
    def _make_test_root(self):
 
2218
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
2219
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
2220
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
2221
 
 
2222
            self._create_safety_net()
 
2223
 
 
2224
            # The same directory is used by all tests, and we're not
 
2225
            # specifically told when all tests are finished.  This will do.
 
2226
            atexit.register(_rmtree_temp_dir, root)
 
2227
 
 
2228
        self.addCleanup(self._check_safety_net)
 
2229
 
 
2230
    def makeAndChdirToTestDir(self):
 
2231
        """Create a temporary directories for this one test.
 
2232
 
 
2233
        This must set self.test_home_dir and self.test_dir and chdir to
 
2234
        self.test_dir.
 
2235
 
 
2236
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
2237
        """
 
2238
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2239
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
2240
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
2241
 
 
2242
    def make_branch(self, relpath, format=None):
 
2243
        """Create a branch on the transport at relpath."""
 
2244
        repo = self.make_repository(relpath, format=format)
 
2245
        return repo.bzrdir.create_branch()
 
2246
 
 
2247
    def make_bzrdir(self, relpath, format=None):
 
2248
        try:
 
2249
            # might be a relative or absolute path
 
2250
            maybe_a_url = self.get_url(relpath)
 
2251
            segments = maybe_a_url.rsplit('/', 1)
 
2252
            t = get_transport(maybe_a_url)
 
2253
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
2254
                t.ensure_base()
 
2255
            if format is None:
 
2256
                format = 'default'
 
2257
            if isinstance(format, basestring):
 
2258
                format = bzrdir.format_registry.make_bzrdir(format)
 
2259
            return format.initialize_on_transport(t)
 
2260
        except errors.UninitializableFormat:
 
2261
            raise TestSkipped("Format %s is not initializable." % format)
 
2262
 
 
2263
    def make_repository(self, relpath, shared=False, format=None):
 
2264
        """Create a repository on our default transport at relpath.
 
2265
 
 
2266
        Note that relpath must be a relative path, not a full url.
 
2267
        """
 
2268
        # FIXME: If you create a remoterepository this returns the underlying
 
2269
        # real format, which is incorrect.  Actually we should make sure that
 
2270
        # RemoteBzrDir returns a RemoteRepository.
 
2271
        # maybe  mbp 20070410
 
2272
        made_control = self.make_bzrdir(relpath, format=format)
 
2273
        return made_control.create_repository(shared=shared)
 
2274
 
 
2275
    def make_smart_server(self, path):
 
2276
        smart_server = server.SmartTCPServer_for_testing()
 
2277
        self.start_server(smart_server, self.get_server())
 
2278
        remote_transport = get_transport(smart_server.get_url()).clone(path)
 
2279
        return remote_transport
 
2280
 
 
2281
    def make_branch_and_memory_tree(self, relpath, format=None):
 
2282
        """Create a branch on the default transport and a MemoryTree for it."""
 
2283
        b = self.make_branch(relpath, format=format)
 
2284
        return memorytree.MemoryTree.create_on_branch(b)
 
2285
 
 
2286
    def make_branch_builder(self, relpath, format=None):
 
2287
        branch = self.make_branch(relpath, format=format)
 
2288
        return branchbuilder.BranchBuilder(branch=branch)
 
2289
 
 
2290
    def overrideEnvironmentForTesting(self):
 
2291
        os.environ['HOME'] = self.test_home_dir
 
2292
        os.environ['BZR_HOME'] = self.test_home_dir
 
2293
 
 
2294
    def setUp(self):
 
2295
        super(TestCaseWithMemoryTransport, self).setUp()
 
2296
        self._make_test_root()
 
2297
        _currentdir = os.getcwdu()
 
2298
        def _leaveDirectory():
 
2299
            os.chdir(_currentdir)
 
2300
        self.addCleanup(_leaveDirectory)
 
2301
        self.makeAndChdirToTestDir()
 
2302
        self.overrideEnvironmentForTesting()
 
2303
        self.__readonly_server = None
 
2304
        self.__server = None
 
2305
        self.reduceLockdirTimeout()
 
2306
 
 
2307
    def setup_smart_server_with_call_log(self):
 
2308
        """Sets up a smart server as the transport server with a call log."""
 
2309
        self.transport_server = server.SmartTCPServer_for_testing
 
2310
        self.hpss_calls = []
 
2311
        import traceback
 
2312
        # Skip the current stack down to the caller of
 
2313
        # setup_smart_server_with_call_log
 
2314
        prefix_length = len(traceback.extract_stack()) - 2
 
2315
        def capture_hpss_call(params):
 
2316
            self.hpss_calls.append(
 
2317
                CapturedCall(params, prefix_length))
 
2318
        client._SmartClient.hooks.install_named_hook(
 
2319
            'call', capture_hpss_call, None)
 
2320
 
 
2321
    def reset_smart_call_log(self):
 
2322
        self.hpss_calls = []
 
2323
 
 
2324
 
 
2325
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
2326
    """Derived class that runs a test within a temporary directory.
 
2327
 
 
2328
    This is useful for tests that need to create a branch, etc.
 
2329
 
 
2330
    The directory is created in a slightly complex way: for each
 
2331
    Python invocation, a new temporary top-level directory is created.
 
2332
    All test cases create their own directory within that.  If the
 
2333
    tests complete successfully, the directory is removed.
 
2334
 
 
2335
    :ivar test_base_dir: The path of the top-level directory for this
 
2336
    test, which contains a home directory and a work directory.
 
2337
 
 
2338
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
2339
    which is used as $HOME for this test.
 
2340
 
 
2341
    :ivar test_dir: A directory under test_base_dir used as the current
 
2342
    directory when the test proper is run.
 
2343
    """
 
2344
 
 
2345
    OVERRIDE_PYTHON = 'python'
 
2346
 
 
2347
    def check_file_contents(self, filename, expect):
 
2348
        self.log("check contents of file %s" % filename)
 
2349
        contents = file(filename, 'r').read()
 
2350
        if contents != expect:
 
2351
            self.log("expected: %r" % expect)
 
2352
            self.log("actually: %r" % contents)
 
2353
            self.fail("contents of %s not as expected" % filename)
 
2354
 
 
2355
    def _getTestDirPrefix(self):
 
2356
        # create a directory within the top level test directory
 
2357
        if sys.platform in ('win32', 'cygwin'):
 
2358
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
 
2359
            # windows is likely to have path-length limits so use a short name
 
2360
            name_prefix = name_prefix[-30:]
 
2361
        else:
 
2362
            name_prefix = re.sub('[/]', '_', self.id())
 
2363
        return name_prefix
 
2364
 
 
2365
    def makeAndChdirToTestDir(self):
 
2366
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2367
 
 
2368
        For TestCaseInTempDir we create a temporary directory based on the test
 
2369
        name and then create two subdirs - test and home under it.
 
2370
        """
 
2371
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
 
2372
            self._getTestDirPrefix())
 
2373
        name = name_prefix
 
2374
        for i in range(100):
 
2375
            if os.path.exists(name):
 
2376
                name = name_prefix + '_' + str(i)
 
2377
            else:
 
2378
                os.mkdir(name)
 
2379
                break
 
2380
        # now create test and home directories within this dir
 
2381
        self.test_base_dir = name
 
2382
        self.test_home_dir = self.test_base_dir + '/home'
 
2383
        os.mkdir(self.test_home_dir)
 
2384
        self.test_dir = self.test_base_dir + '/work'
 
2385
        os.mkdir(self.test_dir)
 
2386
        os.chdir(self.test_dir)
 
2387
        # put name of test inside
 
2388
        f = file(self.test_base_dir + '/name', 'w')
 
2389
        try:
 
2390
            f.write(self.id())
 
2391
        finally:
 
2392
            f.close()
 
2393
        self.addCleanup(self.deleteTestDir)
 
2394
 
 
2395
    def deleteTestDir(self):
 
2396
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2397
        _rmtree_temp_dir(self.test_base_dir)
 
2398
 
 
2399
    def build_tree(self, shape, line_endings='binary', transport=None):
 
2400
        """Build a test tree according to a pattern.
 
2401
 
 
2402
        shape is a sequence of file specifications.  If the final
 
2403
        character is '/', a directory is created.
 
2404
 
 
2405
        This assumes that all the elements in the tree being built are new.
 
2406
 
 
2407
        This doesn't add anything to a branch.
 
2408
 
 
2409
        :type shape:    list or tuple.
 
2410
        :param line_endings: Either 'binary' or 'native'
 
2411
            in binary mode, exact contents are written in native mode, the
 
2412
            line endings match the default platform endings.
 
2413
        :param transport: A transport to write to, for building trees on VFS's.
 
2414
            If the transport is readonly or None, "." is opened automatically.
 
2415
        :return: None
 
2416
        """
 
2417
        if type(shape) not in (list, tuple):
 
2418
            raise AssertionError("Parameter 'shape' should be "
 
2419
                "a list or a tuple. Got %r instead" % (shape,))
 
2420
        # It's OK to just create them using forward slashes on windows.
 
2421
        if transport is None or transport.is_readonly():
 
2422
            transport = get_transport(".")
 
2423
        for name in shape:
 
2424
            self.assertIsInstance(name, basestring)
 
2425
            if name[-1] == '/':
 
2426
                transport.mkdir(urlutils.escape(name[:-1]))
 
2427
            else:
 
2428
                if line_endings == 'binary':
 
2429
                    end = '\n'
 
2430
                elif line_endings == 'native':
 
2431
                    end = os.linesep
 
2432
                else:
 
2433
                    raise errors.BzrError(
 
2434
                        'Invalid line ending request %r' % line_endings)
 
2435
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
2436
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2437
 
 
2438
    def build_tree_contents(self, shape):
 
2439
        build_tree_contents(shape)
 
2440
 
 
2441
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2442
        """Assert whether path or paths are in the WorkingTree"""
 
2443
        if tree is None:
 
2444
            tree = workingtree.WorkingTree.open(root_path)
 
2445
        if not isinstance(path, basestring):
 
2446
            for p in path:
 
2447
                self.assertInWorkingTree(p, tree=tree)
 
2448
        else:
 
2449
            self.assertIsNot(tree.path2id(path), None,
 
2450
                path+' not in working tree.')
 
2451
 
 
2452
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2453
        """Assert whether path or paths are not in the WorkingTree"""
 
2454
        if tree is None:
 
2455
            tree = workingtree.WorkingTree.open(root_path)
 
2456
        if not isinstance(path, basestring):
 
2457
            for p in path:
 
2458
                self.assertNotInWorkingTree(p,tree=tree)
 
2459
        else:
 
2460
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2461
 
 
2462
 
 
2463
class TestCaseWithTransport(TestCaseInTempDir):
 
2464
    """A test case that provides get_url and get_readonly_url facilities.
 
2465
 
 
2466
    These back onto two transport servers, one for readonly access and one for
 
2467
    read write access.
 
2468
 
 
2469
    If no explicit class is provided for readonly access, a
 
2470
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2471
    based read write transports.
 
2472
 
 
2473
    If an explicit class is provided for readonly access, that server and the
 
2474
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2475
    """
 
2476
 
 
2477
    def get_vfs_only_server(self):
 
2478
        """See TestCaseWithMemoryTransport.
 
2479
 
 
2480
        This is useful for some tests with specific servers that need
 
2481
        diagnostics.
 
2482
        """
 
2483
        if self.__vfs_server is None:
 
2484
            self.__vfs_server = self.vfs_transport_factory()
 
2485
            self.start_server(self.__vfs_server)
 
2486
        return self.__vfs_server
 
2487
 
 
2488
    def make_branch_and_tree(self, relpath, format=None):
 
2489
        """Create a branch on the transport and a tree locally.
 
2490
 
 
2491
        If the transport is not a LocalTransport, the Tree can't be created on
 
2492
        the transport.  In that case if the vfs_transport_factory is
 
2493
        LocalURLServer the working tree is created in the local
 
2494
        directory backing the transport, and the returned tree's branch and
 
2495
        repository will also be accessed locally. Otherwise a lightweight
 
2496
        checkout is created and returned.
 
2497
 
 
2498
        We do this because we can't physically create a tree in the local
 
2499
        path, with a branch reference to the transport_factory url, and
 
2500
        a branch + repository in the vfs_transport, unless the vfs_transport
 
2501
        namespace is distinct from the local disk - the two branch objects
 
2502
        would collide. While we could construct a tree with its branch object
 
2503
        pointing at the transport_factory transport in memory, reopening it
 
2504
        would behaving unexpectedly, and has in the past caused testing bugs
 
2505
        when we tried to do it that way.
 
2506
 
 
2507
        :param format: The BzrDirFormat.
 
2508
        :returns: the WorkingTree.
 
2509
        """
 
2510
        # TODO: always use the local disk path for the working tree,
 
2511
        # this obviously requires a format that supports branch references
 
2512
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2513
        # RBC 20060208
 
2514
        b = self.make_branch(relpath, format=format)
 
2515
        try:
 
2516
            return b.bzrdir.create_workingtree()
 
2517
        except errors.NotLocalUrl:
 
2518
            # We can only make working trees locally at the moment.  If the
 
2519
            # transport can't support them, then we keep the non-disk-backed
 
2520
            # branch and create a local checkout.
 
2521
            if self.vfs_transport_factory is LocalURLServer:
 
2522
                # the branch is colocated on disk, we cannot create a checkout.
 
2523
                # hopefully callers will expect this.
 
2524
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2525
                wt = local_controldir.create_workingtree()
 
2526
                if wt.branch._format != b._format:
 
2527
                    wt._branch = b
 
2528
                    # Make sure that assigning to wt._branch fixes wt.branch,
 
2529
                    # in case the implementation details of workingtree objects
 
2530
                    # change.
 
2531
                    self.assertIs(b, wt.branch)
 
2532
                return wt
 
2533
            else:
 
2534
                return b.create_checkout(relpath, lightweight=True)
 
2535
 
 
2536
    def assertIsDirectory(self, relpath, transport):
 
2537
        """Assert that relpath within transport is a directory.
 
2538
 
 
2539
        This may not be possible on all transports; in that case it propagates
 
2540
        a TransportNotPossible.
 
2541
        """
 
2542
        try:
 
2543
            mode = transport.stat(relpath).st_mode
 
2544
        except errors.NoSuchFile:
 
2545
            self.fail("path %s is not a directory; no such file"
 
2546
                      % (relpath))
 
2547
        if not stat.S_ISDIR(mode):
 
2548
            self.fail("path %s is not a directory; has mode %#o"
 
2549
                      % (relpath, mode))
 
2550
 
 
2551
    def assertTreesEqual(self, left, right):
 
2552
        """Check that left and right have the same content and properties."""
 
2553
        # we use a tree delta to check for equality of the content, and we
 
2554
        # manually check for equality of other things such as the parents list.
 
2555
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2556
        differences = left.changes_from(right)
 
2557
        self.assertFalse(differences.has_changed(),
 
2558
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2559
 
 
2560
    def setUp(self):
 
2561
        super(TestCaseWithTransport, self).setUp()
 
2562
        self.__vfs_server = None
 
2563
 
 
2564
 
 
2565
class ChrootedTestCase(TestCaseWithTransport):
 
2566
    """A support class that provides readonly urls outside the local namespace.
 
2567
 
 
2568
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2569
    is then we are chrooted already, if it is not then an HttpServer is used
 
2570
    for readonly urls.
 
2571
 
 
2572
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2573
                       be used without needed to redo it when a different
 
2574
                       subclass is in use ?
 
2575
    """
 
2576
 
 
2577
    def setUp(self):
 
2578
        super(ChrootedTestCase, self).setUp()
 
2579
        if not self.vfs_transport_factory == MemoryServer:
 
2580
            self.transport_readonly_server = HttpServer
 
2581
 
 
2582
 
 
2583
def condition_id_re(pattern):
 
2584
    """Create a condition filter which performs a re check on a test's id.
 
2585
 
 
2586
    :param pattern: A regular expression string.
 
2587
    :return: A callable that returns True if the re matches.
 
2588
    """
 
2589
    filter_re = osutils.re_compile_checked(pattern, 0,
 
2590
        'test filter')
 
2591
    def condition(test):
 
2592
        test_id = test.id()
 
2593
        return filter_re.search(test_id)
 
2594
    return condition
 
2595
 
 
2596
 
 
2597
def condition_isinstance(klass_or_klass_list):
 
2598
    """Create a condition filter which returns isinstance(param, klass).
 
2599
 
 
2600
    :return: A callable which when called with one parameter obj return the
 
2601
        result of isinstance(obj, klass_or_klass_list).
 
2602
    """
 
2603
    def condition(obj):
 
2604
        return isinstance(obj, klass_or_klass_list)
 
2605
    return condition
 
2606
 
 
2607
 
 
2608
def condition_id_in_list(id_list):
 
2609
    """Create a condition filter which verify that test's id in a list.
 
2610
 
 
2611
    :param id_list: A TestIdList object.
 
2612
    :return: A callable that returns True if the test's id appears in the list.
 
2613
    """
 
2614
    def condition(test):
 
2615
        return id_list.includes(test.id())
 
2616
    return condition
 
2617
 
 
2618
 
 
2619
def condition_id_startswith(starts):
 
2620
    """Create a condition filter verifying that test's id starts with a string.
 
2621
 
 
2622
    :param starts: A list of string.
 
2623
    :return: A callable that returns True if the test's id starts with one of
 
2624
        the given strings.
 
2625
    """
 
2626
    def condition(test):
 
2627
        for start in starts:
 
2628
            if test.id().startswith(start):
 
2629
                return True
 
2630
        return False
 
2631
    return condition
 
2632
 
 
2633
 
 
2634
def exclude_tests_by_condition(suite, condition):
 
2635
    """Create a test suite which excludes some tests from suite.
 
2636
 
 
2637
    :param suite: The suite to get tests from.
 
2638
    :param condition: A callable whose result evaluates True when called with a
 
2639
        test case which should be excluded from the result.
 
2640
    :return: A suite which contains the tests found in suite that fail
 
2641
        condition.
 
2642
    """
 
2643
    result = []
 
2644
    for test in iter_suite_tests(suite):
 
2645
        if not condition(test):
 
2646
            result.append(test)
 
2647
    return TestUtil.TestSuite(result)
 
2648
 
 
2649
 
 
2650
def filter_suite_by_condition(suite, condition):
 
2651
    """Create a test suite by filtering another one.
 
2652
 
 
2653
    :param suite: The source suite.
 
2654
    :param condition: A callable whose result evaluates True when called with a
 
2655
        test case which should be included in the result.
 
2656
    :return: A suite which contains the tests found in suite that pass
 
2657
        condition.
 
2658
    """
 
2659
    result = []
 
2660
    for test in iter_suite_tests(suite):
 
2661
        if condition(test):
 
2662
            result.append(test)
 
2663
    return TestUtil.TestSuite(result)
 
2664
 
 
2665
 
 
2666
def filter_suite_by_re(suite, pattern):
 
2667
    """Create a test suite by filtering another one.
 
2668
 
 
2669
    :param suite:           the source suite
 
2670
    :param pattern:         pattern that names must match
 
2671
    :returns: the newly created suite
 
2672
    """
 
2673
    condition = condition_id_re(pattern)
 
2674
    result_suite = filter_suite_by_condition(suite, condition)
 
2675
    return result_suite
 
2676
 
 
2677
 
 
2678
def filter_suite_by_id_list(suite, test_id_list):
 
2679
    """Create a test suite by filtering another one.
 
2680
 
 
2681
    :param suite: The source suite.
 
2682
    :param test_id_list: A list of the test ids to keep as strings.
 
2683
    :returns: the newly created suite
 
2684
    """
 
2685
    condition = condition_id_in_list(test_id_list)
 
2686
    result_suite = filter_suite_by_condition(suite, condition)
 
2687
    return result_suite
 
2688
 
 
2689
 
 
2690
def filter_suite_by_id_startswith(suite, start):
 
2691
    """Create a test suite by filtering another one.
 
2692
 
 
2693
    :param suite: The source suite.
 
2694
    :param start: A list of string the test id must start with one of.
 
2695
    :returns: the newly created suite
 
2696
    """
 
2697
    condition = condition_id_startswith(start)
 
2698
    result_suite = filter_suite_by_condition(suite, condition)
 
2699
    return result_suite
 
2700
 
 
2701
 
 
2702
def exclude_tests_by_re(suite, pattern):
 
2703
    """Create a test suite which excludes some tests from suite.
 
2704
 
 
2705
    :param suite: The suite to get tests from.
 
2706
    :param pattern: A regular expression string. Test ids that match this
 
2707
        pattern will be excluded from the result.
 
2708
    :return: A TestSuite that contains all the tests from suite without the
 
2709
        tests that matched pattern. The order of tests is the same as it was in
 
2710
        suite.
 
2711
    """
 
2712
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
2713
 
 
2714
 
 
2715
def preserve_input(something):
 
2716
    """A helper for performing test suite transformation chains.
 
2717
 
 
2718
    :param something: Anything you want to preserve.
 
2719
    :return: Something.
 
2720
    """
 
2721
    return something
 
2722
 
 
2723
 
 
2724
def randomize_suite(suite):
 
2725
    """Return a new TestSuite with suite's tests in random order.
 
2726
 
 
2727
    The tests in the input suite are flattened into a single suite in order to
 
2728
    accomplish this. Any nested TestSuites are removed to provide global
 
2729
    randomness.
 
2730
    """
 
2731
    tests = list(iter_suite_tests(suite))
 
2732
    random.shuffle(tests)
 
2733
    return TestUtil.TestSuite(tests)
 
2734
 
 
2735
 
 
2736
def split_suite_by_condition(suite, condition):
 
2737
    """Split a test suite into two by a condition.
 
2738
 
 
2739
    :param suite: The suite to split.
 
2740
    :param condition: The condition to match on. Tests that match this
 
2741
        condition are returned in the first test suite, ones that do not match
 
2742
        are in the second suite.
 
2743
    :return: A tuple of two test suites, where the first contains tests from
 
2744
        suite matching the condition, and the second contains the remainder
 
2745
        from suite. The order within each output suite is the same as it was in
 
2746
        suite.
 
2747
    """
 
2748
    matched = []
 
2749
    did_not_match = []
 
2750
    for test in iter_suite_tests(suite):
 
2751
        if condition(test):
 
2752
            matched.append(test)
 
2753
        else:
 
2754
            did_not_match.append(test)
 
2755
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
2756
 
 
2757
 
 
2758
def split_suite_by_re(suite, pattern):
 
2759
    """Split a test suite into two by a regular expression.
 
2760
 
 
2761
    :param suite: The suite to split.
 
2762
    :param pattern: A regular expression string. Test ids that match this
 
2763
        pattern will be in the first test suite returned, and the others in the
 
2764
        second test suite returned.
 
2765
    :return: A tuple of two test suites, where the first contains tests from
 
2766
        suite matching pattern, and the second contains the remainder from
 
2767
        suite. The order within each output suite is the same as it was in
 
2768
        suite.
 
2769
    """
 
2770
    return split_suite_by_condition(suite, condition_id_re(pattern))
 
2771
 
 
2772
 
 
2773
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2774
              stop_on_failure=False,
 
2775
              transport=None, lsprof_timed=None, bench_history=None,
 
2776
              matching_tests_first=None,
 
2777
              list_only=False,
 
2778
              random_seed=None,
 
2779
              exclude_pattern=None,
 
2780
              strict=False,
 
2781
              runner_class=None,
 
2782
              suite_decorators=None,
 
2783
              stream=None,
 
2784
              result_decorators=None,
 
2785
              ):
 
2786
    """Run a test suite for bzr selftest.
 
2787
 
 
2788
    :param runner_class: The class of runner to use. Must support the
 
2789
        constructor arguments passed by run_suite which are more than standard
 
2790
        python uses.
 
2791
    :return: A boolean indicating success.
 
2792
    """
 
2793
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2794
    if verbose:
 
2795
        verbosity = 2
 
2796
    else:
 
2797
        verbosity = 1
 
2798
    if runner_class is None:
 
2799
        runner_class = TextTestRunner
 
2800
    if stream is None:
 
2801
        stream = sys.stdout
 
2802
    runner = runner_class(stream=stream,
 
2803
                            descriptions=0,
 
2804
                            verbosity=verbosity,
 
2805
                            bench_history=bench_history,
 
2806
                            strict=strict,
 
2807
                            result_decorators=result_decorators,
 
2808
                            )
 
2809
    runner.stop_on_failure=stop_on_failure
 
2810
    # built in decorator factories:
 
2811
    decorators = [
 
2812
        random_order(random_seed, runner),
 
2813
        exclude_tests(exclude_pattern),
 
2814
        ]
 
2815
    if matching_tests_first:
 
2816
        decorators.append(tests_first(pattern))
 
2817
    else:
 
2818
        decorators.append(filter_tests(pattern))
 
2819
    if suite_decorators:
 
2820
        decorators.extend(suite_decorators)
 
2821
    # tell the result object how many tests will be running: (except if
 
2822
    # --parallel=fork is being used. Robert said he will provide a better
 
2823
    # progress design later -- vila 20090817)
 
2824
    if fork_decorator not in decorators:
 
2825
        decorators.append(CountingDecorator)
 
2826
    for decorator in decorators:
 
2827
        suite = decorator(suite)
 
2828
    if list_only:
 
2829
        # Done after test suite decoration to allow randomisation etc
 
2830
        # to take effect, though that is of marginal benefit.
 
2831
        if verbosity >= 2:
 
2832
            stream.write("Listing tests only ...\n")
 
2833
        for t in iter_suite_tests(suite):
 
2834
            stream.write("%s\n" % (t.id()))
 
2835
        return True
 
2836
    result = runner.run(suite)
 
2837
    if strict:
 
2838
        return result.wasStrictlySuccessful()
 
2839
    else:
 
2840
        return result.wasSuccessful()
 
2841
 
 
2842
 
 
2843
# A registry where get() returns a suite decorator.
 
2844
parallel_registry = registry.Registry()
 
2845
 
 
2846
 
 
2847
def fork_decorator(suite):
 
2848
    concurrency = osutils.local_concurrency()
 
2849
    if concurrency == 1:
 
2850
        return suite
 
2851
    from testtools import ConcurrentTestSuite
 
2852
    return ConcurrentTestSuite(suite, fork_for_tests)
 
2853
parallel_registry.register('fork', fork_decorator)
 
2854
 
 
2855
 
 
2856
def subprocess_decorator(suite):
 
2857
    concurrency = osutils.local_concurrency()
 
2858
    if concurrency == 1:
 
2859
        return suite
 
2860
    from testtools import ConcurrentTestSuite
 
2861
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
 
2862
parallel_registry.register('subprocess', subprocess_decorator)
 
2863
 
 
2864
 
 
2865
def exclude_tests(exclude_pattern):
 
2866
    """Return a test suite decorator that excludes tests."""
 
2867
    if exclude_pattern is None:
 
2868
        return identity_decorator
 
2869
    def decorator(suite):
 
2870
        return ExcludeDecorator(suite, exclude_pattern)
 
2871
    return decorator
 
2872
 
 
2873
 
 
2874
def filter_tests(pattern):
 
2875
    if pattern == '.*':
 
2876
        return identity_decorator
 
2877
    def decorator(suite):
 
2878
        return FilterTestsDecorator(suite, pattern)
 
2879
    return decorator
 
2880
 
 
2881
 
 
2882
def random_order(random_seed, runner):
 
2883
    """Return a test suite decorator factory for randomising tests order.
 
2884
    
 
2885
    :param random_seed: now, a string which casts to a long, or a long.
 
2886
    :param runner: A test runner with a stream attribute to report on.
 
2887
    """
 
2888
    if random_seed is None:
 
2889
        return identity_decorator
 
2890
    def decorator(suite):
 
2891
        return RandomDecorator(suite, random_seed, runner.stream)
 
2892
    return decorator
 
2893
 
 
2894
 
 
2895
def tests_first(pattern):
 
2896
    if pattern == '.*':
 
2897
        return identity_decorator
 
2898
    def decorator(suite):
 
2899
        return TestFirstDecorator(suite, pattern)
 
2900
    return decorator
 
2901
 
 
2902
 
 
2903
def identity_decorator(suite):
 
2904
    """Return suite."""
 
2905
    return suite
 
2906
 
 
2907
 
 
2908
class TestDecorator(TestSuite):
 
2909
    """A decorator for TestCase/TestSuite objects.
 
2910
    
 
2911
    Usually, subclasses should override __iter__(used when flattening test
 
2912
    suites), which we do to filter, reorder, parallelise and so on, run() and
 
2913
    debug().
 
2914
    """
 
2915
 
 
2916
    def __init__(self, suite):
 
2917
        TestSuite.__init__(self)
 
2918
        self.addTest(suite)
 
2919
 
 
2920
    def countTestCases(self):
 
2921
        cases = 0
 
2922
        for test in self:
 
2923
            cases += test.countTestCases()
 
2924
        return cases
 
2925
 
 
2926
    def debug(self):
 
2927
        for test in self:
 
2928
            test.debug()
 
2929
 
 
2930
    def run(self, result):
 
2931
        # Use iteration on self, not self._tests, to allow subclasses to hook
 
2932
        # into __iter__.
 
2933
        for test in self:
 
2934
            if result.shouldStop:
 
2935
                break
 
2936
            test.run(result)
 
2937
        return result
 
2938
 
 
2939
 
 
2940
class CountingDecorator(TestDecorator):
 
2941
    """A decorator which calls result.progress(self.countTestCases)."""
 
2942
 
 
2943
    def run(self, result):
 
2944
        progress_method = getattr(result, 'progress', None)
 
2945
        if callable(progress_method):
 
2946
            progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
 
2947
        return super(CountingDecorator, self).run(result)
 
2948
 
 
2949
 
 
2950
class ExcludeDecorator(TestDecorator):
 
2951
    """A decorator which excludes test matching an exclude pattern."""
 
2952
 
 
2953
    def __init__(self, suite, exclude_pattern):
 
2954
        TestDecorator.__init__(self, suite)
 
2955
        self.exclude_pattern = exclude_pattern
 
2956
        self.excluded = False
 
2957
 
 
2958
    def __iter__(self):
 
2959
        if self.excluded:
 
2960
            return iter(self._tests)
 
2961
        self.excluded = True
 
2962
        suite = exclude_tests_by_re(self, self.exclude_pattern)
 
2963
        del self._tests[:]
 
2964
        self.addTests(suite)
 
2965
        return iter(self._tests)
 
2966
 
 
2967
 
 
2968
class FilterTestsDecorator(TestDecorator):
 
2969
    """A decorator which filters tests to those matching a pattern."""
 
2970
 
 
2971
    def __init__(self, suite, pattern):
 
2972
        TestDecorator.__init__(self, suite)
 
2973
        self.pattern = pattern
 
2974
        self.filtered = False
 
2975
 
 
2976
    def __iter__(self):
 
2977
        if self.filtered:
 
2978
            return iter(self._tests)
 
2979
        self.filtered = True
 
2980
        suite = filter_suite_by_re(self, self.pattern)
 
2981
        del self._tests[:]
 
2982
        self.addTests(suite)
 
2983
        return iter(self._tests)
 
2984
 
 
2985
 
 
2986
class RandomDecorator(TestDecorator):
 
2987
    """A decorator which randomises the order of its tests."""
 
2988
 
 
2989
    def __init__(self, suite, random_seed, stream):
 
2990
        TestDecorator.__init__(self, suite)
 
2991
        self.random_seed = random_seed
 
2992
        self.randomised = False
 
2993
        self.stream = stream
 
2994
 
 
2995
    def __iter__(self):
 
2996
        if self.randomised:
 
2997
            return iter(self._tests)
 
2998
        self.randomised = True
 
2999
        self.stream.writeln("Randomizing test order using seed %s\n" %
 
3000
            (self.actual_seed()))
 
3001
        # Initialise the random number generator.
 
3002
        random.seed(self.actual_seed())
 
3003
        suite = randomize_suite(self)
 
3004
        del self._tests[:]
 
3005
        self.addTests(suite)
 
3006
        return iter(self._tests)
 
3007
 
 
3008
    def actual_seed(self):
 
3009
        if self.random_seed == "now":
 
3010
            # We convert the seed to a long to make it reuseable across
 
3011
            # invocations (because the user can reenter it).
 
3012
            self.random_seed = long(time.time())
 
3013
        else:
 
3014
            # Convert the seed to a long if we can
 
3015
            try:
 
3016
                self.random_seed = long(self.random_seed)
 
3017
            except:
 
3018
                pass
 
3019
        return self.random_seed
 
3020
 
 
3021
 
 
3022
class TestFirstDecorator(TestDecorator):
 
3023
    """A decorator which moves named tests to the front."""
 
3024
 
 
3025
    def __init__(self, suite, pattern):
 
3026
        TestDecorator.__init__(self, suite)
 
3027
        self.pattern = pattern
 
3028
        self.filtered = False
 
3029
 
 
3030
    def __iter__(self):
 
3031
        if self.filtered:
 
3032
            return iter(self._tests)
 
3033
        self.filtered = True
 
3034
        suites = split_suite_by_re(self, self.pattern)
 
3035
        del self._tests[:]
 
3036
        self.addTests(suites)
 
3037
        return iter(self._tests)
 
3038
 
 
3039
 
 
3040
def partition_tests(suite, count):
 
3041
    """Partition suite into count lists of tests."""
 
3042
    result = []
 
3043
    tests = list(iter_suite_tests(suite))
 
3044
    tests_per_process = int(math.ceil(float(len(tests)) / count))
 
3045
    for block in range(count):
 
3046
        low_test = block * tests_per_process
 
3047
        high_test = low_test + tests_per_process
 
3048
        process_tests = tests[low_test:high_test]
 
3049
        result.append(process_tests)
 
3050
    return result
 
3051
 
 
3052
 
 
3053
def fork_for_tests(suite):
 
3054
    """Take suite and start up one runner per CPU by forking()
 
3055
 
 
3056
    :return: An iterable of TestCase-like objects which can each have
 
3057
        run(result) called on them to feed tests to result.
 
3058
    """
 
3059
    concurrency = osutils.local_concurrency()
 
3060
    result = []
 
3061
    from subunit import TestProtocolClient, ProtocolTestCase
 
3062
    try:
 
3063
        from subunit.test_results import AutoTimingTestResultDecorator
 
3064
    except ImportError:
 
3065
        AutoTimingTestResultDecorator = lambda x:x
 
3066
    class TestInOtherProcess(ProtocolTestCase):
 
3067
        # Should be in subunit, I think. RBC.
 
3068
        def __init__(self, stream, pid):
 
3069
            ProtocolTestCase.__init__(self, stream)
 
3070
            self.pid = pid
 
3071
 
 
3072
        def run(self, result):
 
3073
            try:
 
3074
                ProtocolTestCase.run(self, result)
 
3075
            finally:
 
3076
                os.waitpid(self.pid, os.WNOHANG)
 
3077
 
 
3078
    test_blocks = partition_tests(suite, concurrency)
 
3079
    for process_tests in test_blocks:
 
3080
        process_suite = TestSuite()
 
3081
        process_suite.addTests(process_tests)
 
3082
        c2pread, c2pwrite = os.pipe()
 
3083
        pid = os.fork()
 
3084
        if pid == 0:
 
3085
            try:
 
3086
                os.close(c2pread)
 
3087
                # Leave stderr and stdout open so we can see test noise
 
3088
                # Close stdin so that the child goes away if it decides to
 
3089
                # read from stdin (otherwise its a roulette to see what
 
3090
                # child actually gets keystrokes for pdb etc).
 
3091
                sys.stdin.close()
 
3092
                sys.stdin = None
 
3093
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3094
                subunit_result = AutoTimingTestResultDecorator(
 
3095
                    TestProtocolClient(stream))
 
3096
                process_suite.run(subunit_result)
 
3097
            finally:
 
3098
                os._exit(0)
 
3099
        else:
 
3100
            os.close(c2pwrite)
 
3101
            stream = os.fdopen(c2pread, 'rb', 1)
 
3102
            test = TestInOtherProcess(stream, pid)
 
3103
            result.append(test)
 
3104
    return result
 
3105
 
 
3106
 
 
3107
def reinvoke_for_tests(suite):
 
3108
    """Take suite and start up one runner per CPU using subprocess().
 
3109
 
 
3110
    :return: An iterable of TestCase-like objects which can each have
 
3111
        run(result) called on them to feed tests to result.
 
3112
    """
 
3113
    concurrency = osutils.local_concurrency()
 
3114
    result = []
 
3115
    from subunit import ProtocolTestCase
 
3116
    class TestInSubprocess(ProtocolTestCase):
 
3117
        def __init__(self, process, name):
 
3118
            ProtocolTestCase.__init__(self, process.stdout)
 
3119
            self.process = process
 
3120
            self.process.stdin.close()
 
3121
            self.name = name
 
3122
 
 
3123
        def run(self, result):
 
3124
            try:
 
3125
                ProtocolTestCase.run(self, result)
 
3126
            finally:
 
3127
                self.process.wait()
 
3128
                os.unlink(self.name)
 
3129
            # print "pid %d finished" % finished_process
 
3130
    test_blocks = partition_tests(suite, concurrency)
 
3131
    for process_tests in test_blocks:
 
3132
        # ugly; currently reimplement rather than reuses TestCase methods.
 
3133
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
3134
        if not os.path.isfile(bzr_path):
 
3135
            # We are probably installed. Assume sys.argv is the right file
 
3136
            bzr_path = sys.argv[0]
 
3137
        fd, test_list_file_name = tempfile.mkstemp()
 
3138
        test_list_file = os.fdopen(fd, 'wb', 1)
 
3139
        for test in process_tests:
 
3140
            test_list_file.write(test.id() + '\n')
 
3141
        test_list_file.close()
 
3142
        try:
 
3143
            argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
 
3144
                '--subunit']
 
3145
            if '--no-plugins' in sys.argv:
 
3146
                argv.append('--no-plugins')
 
3147
            # stderr=STDOUT would be ideal, but until we prevent noise on
 
3148
            # stderr it can interrupt the subunit protocol.
 
3149
            process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
 
3150
                bufsize=1)
 
3151
            test = TestInSubprocess(process, test_list_file_name)
 
3152
            result.append(test)
 
3153
        except:
 
3154
            os.unlink(test_list_file_name)
 
3155
            raise
 
3156
    return result
 
3157
 
 
3158
 
 
3159
class ForwardingResult(unittest.TestResult):
 
3160
 
 
3161
    def __init__(self, target):
 
3162
        unittest.TestResult.__init__(self)
 
3163
        self.result = target
 
3164
 
 
3165
    def startTest(self, test):
 
3166
        self.result.startTest(test)
 
3167
 
 
3168
    def stopTest(self, test):
 
3169
        self.result.stopTest(test)
 
3170
 
 
3171
    def startTestRun(self):
 
3172
        self.result.startTestRun()
 
3173
 
 
3174
    def stopTestRun(self):
 
3175
        self.result.stopTestRun()
 
3176
 
 
3177
    def addSkip(self, test, reason):
 
3178
        self.result.addSkip(test, reason)
 
3179
 
 
3180
    def addSuccess(self, test):
 
3181
        self.result.addSuccess(test)
 
3182
 
 
3183
    def addError(self, test, err):
 
3184
        self.result.addError(test, err)
 
3185
 
 
3186
    def addFailure(self, test, err):
 
3187
        self.result.addFailure(test, err)
 
3188
 
 
3189
 
 
3190
class BZRTransformingResult(ForwardingResult):
 
3191
 
 
3192
    def addError(self, test, err):
 
3193
        feature = self._error_looks_like('UnavailableFeature: ', err)
 
3194
        if feature is not None:
 
3195
            self.result.addNotSupported(test, feature)
 
3196
        else:
 
3197
            self.result.addError(test, err)
 
3198
 
 
3199
    def addFailure(self, test, err):
 
3200
        known = self._error_looks_like('KnownFailure: ', err)
 
3201
        if known is not None:
 
3202
            self.result._addKnownFailure(test, [KnownFailure,
 
3203
                                                KnownFailure(known), None])
 
3204
        else:
 
3205
            self.result.addFailure(test, err)
 
3206
 
 
3207
    def _error_looks_like(self, prefix, err):
 
3208
        """Deserialize exception and returns the stringify value."""
 
3209
        import subunit
 
3210
        value = None
 
3211
        typ, exc, _ = err
 
3212
        if isinstance(exc, subunit.RemoteException):
 
3213
            # stringify the exception gives access to the remote traceback
 
3214
            # We search the last line for 'prefix'
 
3215
            lines = str(exc).split('\n')
 
3216
            while lines and not lines[-1]:
 
3217
                lines.pop(-1)
 
3218
            if lines:
 
3219
                if lines[-1].startswith(prefix):
 
3220
                    value = lines[-1][len(prefix):]
 
3221
        return value
 
3222
 
 
3223
 
 
3224
class ProfileResult(ForwardingResult):
 
3225
    """Generate profiling data for all activity between start and success.
 
3226
    
 
3227
    The profile data is appended to the test's _benchcalls attribute and can
 
3228
    be accessed by the forwarded-to TestResult.
 
3229
 
 
3230
    While it might be cleaner do accumulate this in stopTest, addSuccess is
 
3231
    where our existing output support for lsprof is, and this class aims to
 
3232
    fit in with that: while it could be moved it's not necessary to accomplish
 
3233
    test profiling, nor would it be dramatically cleaner.
 
3234
    """
 
3235
 
 
3236
    def startTest(self, test):
 
3237
        self.profiler = bzrlib.lsprof.BzrProfiler()
 
3238
        self.profiler.start()
 
3239
        ForwardingResult.startTest(self, test)
 
3240
 
 
3241
    def addSuccess(self, test):
 
3242
        stats = self.profiler.stop()
 
3243
        try:
 
3244
            calls = test._benchcalls
 
3245
        except AttributeError:
 
3246
            test._benchcalls = []
 
3247
            calls = test._benchcalls
 
3248
        calls.append(((test.id(), "", ""), stats))
 
3249
        ForwardingResult.addSuccess(self, test)
 
3250
 
 
3251
    def stopTest(self, test):
 
3252
        ForwardingResult.stopTest(self, test)
 
3253
        self.profiler = None
 
3254
 
 
3255
 
 
3256
# Controlled by "bzr selftest -E=..." option
 
3257
# Currently supported:
 
3258
#   -Eallow_debug           Will no longer clear debug.debug_flags() so it
 
3259
#                           preserves any flags supplied at the command line.
 
3260
#   -Edisable_lock_checks   Turns errors in mismatched locks into simple prints
 
3261
#                           rather than failing tests. And no longer raise
 
3262
#                           LockContention when fctnl locks are not being used
 
3263
#                           with proper exclusion rules.
 
3264
selftest_debug_flags = set()
 
3265
 
 
3266
 
 
3267
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
3268
             transport=None,
 
3269
             test_suite_factory=None,
 
3270
             lsprof_timed=None,
 
3271
             bench_history=None,
 
3272
             matching_tests_first=None,
 
3273
             list_only=False,
 
3274
             random_seed=None,
 
3275
             exclude_pattern=None,
 
3276
             strict=False,
 
3277
             load_list=None,
 
3278
             debug_flags=None,
 
3279
             starting_with=None,
 
3280
             runner_class=None,
 
3281
             suite_decorators=None,
 
3282
             stream=None,
 
3283
             lsprof_tests=False,
 
3284
             ):
 
3285
    """Run the whole test suite under the enhanced runner"""
 
3286
    # XXX: Very ugly way to do this...
 
3287
    # Disable warning about old formats because we don't want it to disturb
 
3288
    # any blackbox tests.
 
3289
    from bzrlib import repository
 
3290
    repository._deprecation_warning_done = True
 
3291
 
 
3292
    global default_transport
 
3293
    if transport is None:
 
3294
        transport = default_transport
 
3295
    old_transport = default_transport
 
3296
    default_transport = transport
 
3297
    global selftest_debug_flags
 
3298
    old_debug_flags = selftest_debug_flags
 
3299
    if debug_flags is not None:
 
3300
        selftest_debug_flags = set(debug_flags)
 
3301
    try:
 
3302
        if load_list is None:
 
3303
            keep_only = None
 
3304
        else:
 
3305
            keep_only = load_test_id_list(load_list)
 
3306
        if starting_with:
 
3307
            starting_with = [test_prefix_alias_registry.resolve_alias(start)
 
3308
                             for start in starting_with]
 
3309
        if test_suite_factory is None:
 
3310
            # Reduce loading time by loading modules based on the starting_with
 
3311
            # patterns.
 
3312
            suite = test_suite(keep_only, starting_with)
 
3313
        else:
 
3314
            suite = test_suite_factory()
 
3315
        if starting_with:
 
3316
            # But always filter as requested.
 
3317
            suite = filter_suite_by_id_startswith(suite, starting_with)
 
3318
        result_decorators = []
 
3319
        if lsprof_tests:
 
3320
            result_decorators.append(ProfileResult)
 
3321
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
3322
                     stop_on_failure=stop_on_failure,
 
3323
                     transport=transport,
 
3324
                     lsprof_timed=lsprof_timed,
 
3325
                     bench_history=bench_history,
 
3326
                     matching_tests_first=matching_tests_first,
 
3327
                     list_only=list_only,
 
3328
                     random_seed=random_seed,
 
3329
                     exclude_pattern=exclude_pattern,
 
3330
                     strict=strict,
 
3331
                     runner_class=runner_class,
 
3332
                     suite_decorators=suite_decorators,
 
3333
                     stream=stream,
 
3334
                     result_decorators=result_decorators,
 
3335
                     )
 
3336
    finally:
 
3337
        default_transport = old_transport
 
3338
        selftest_debug_flags = old_debug_flags
 
3339
 
 
3340
 
 
3341
def load_test_id_list(file_name):
 
3342
    """Load a test id list from a text file.
 
3343
 
 
3344
    The format is one test id by line.  No special care is taken to impose
 
3345
    strict rules, these test ids are used to filter the test suite so a test id
 
3346
    that do not match an existing test will do no harm. This allows user to add
 
3347
    comments, leave blank lines, etc.
 
3348
    """
 
3349
    test_list = []
 
3350
    try:
 
3351
        ftest = open(file_name, 'rt')
 
3352
    except IOError, e:
 
3353
        if e.errno != errno.ENOENT:
 
3354
            raise
 
3355
        else:
 
3356
            raise errors.NoSuchFile(file_name)
 
3357
 
 
3358
    for test_name in ftest.readlines():
 
3359
        test_list.append(test_name.strip())
 
3360
    ftest.close()
 
3361
    return test_list
 
3362
 
 
3363
 
 
3364
def suite_matches_id_list(test_suite, id_list):
 
3365
    """Warns about tests not appearing or appearing more than once.
 
3366
 
 
3367
    :param test_suite: A TestSuite object.
 
3368
    :param test_id_list: The list of test ids that should be found in
 
3369
         test_suite.
 
3370
 
 
3371
    :return: (absents, duplicates) absents is a list containing the test found
 
3372
        in id_list but not in test_suite, duplicates is a list containing the
 
3373
        test found multiple times in test_suite.
 
3374
 
 
3375
    When using a prefined test id list, it may occurs that some tests do not
 
3376
    exist anymore or that some tests use the same id. This function warns the
 
3377
    tester about potential problems in his workflow (test lists are volatile)
 
3378
    or in the test suite itself (using the same id for several tests does not
 
3379
    help to localize defects).
 
3380
    """
 
3381
    # Build a dict counting id occurrences
 
3382
    tests = dict()
 
3383
    for test in iter_suite_tests(test_suite):
 
3384
        id = test.id()
 
3385
        tests[id] = tests.get(id, 0) + 1
 
3386
 
 
3387
    not_found = []
 
3388
    duplicates = []
 
3389
    for id in id_list:
 
3390
        occurs = tests.get(id, 0)
 
3391
        if not occurs:
 
3392
            not_found.append(id)
 
3393
        elif occurs > 1:
 
3394
            duplicates.append(id)
 
3395
 
 
3396
    return not_found, duplicates
 
3397
 
 
3398
 
 
3399
class TestIdList(object):
 
3400
    """Test id list to filter a test suite.
 
3401
 
 
3402
    Relying on the assumption that test ids are built as:
 
3403
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
3404
    notation, this class offers methods to :
 
3405
    - avoid building a test suite for modules not refered to in the test list,
 
3406
    - keep only the tests listed from the module test suite.
 
3407
    """
 
3408
 
 
3409
    def __init__(self, test_id_list):
 
3410
        # When a test suite needs to be filtered against us we compare test ids
 
3411
        # for equality, so a simple dict offers a quick and simple solution.
 
3412
        self.tests = dict().fromkeys(test_id_list, True)
 
3413
 
 
3414
        # While unittest.TestCase have ids like:
 
3415
        # <module>.<class>.<method>[(<param+)],
 
3416
        # doctest.DocTestCase can have ids like:
 
3417
        # <module>
 
3418
        # <module>.<class>
 
3419
        # <module>.<function>
 
3420
        # <module>.<class>.<method>
 
3421
 
 
3422
        # Since we can't predict a test class from its name only, we settle on
 
3423
        # a simple constraint: a test id always begins with its module name.
 
3424
 
 
3425
        modules = {}
 
3426
        for test_id in test_id_list:
 
3427
            parts = test_id.split('.')
 
3428
            mod_name = parts.pop(0)
 
3429
            modules[mod_name] = True
 
3430
            for part in parts:
 
3431
                mod_name += '.' + part
 
3432
                modules[mod_name] = True
 
3433
        self.modules = modules
 
3434
 
 
3435
    def refers_to(self, module_name):
 
3436
        """Is there tests for the module or one of its sub modules."""
 
3437
        return self.modules.has_key(module_name)
 
3438
 
 
3439
    def includes(self, test_id):
 
3440
        return self.tests.has_key(test_id)
 
3441
 
 
3442
 
 
3443
class TestPrefixAliasRegistry(registry.Registry):
 
3444
    """A registry for test prefix aliases.
 
3445
 
 
3446
    This helps implement shorcuts for the --starting-with selftest
 
3447
    option. Overriding existing prefixes is not allowed but not fatal (a
 
3448
    warning will be emitted).
 
3449
    """
 
3450
 
 
3451
    def register(self, key, obj, help=None, info=None,
 
3452
                 override_existing=False):
 
3453
        """See Registry.register.
 
3454
 
 
3455
        Trying to override an existing alias causes a warning to be emitted,
 
3456
        not a fatal execption.
 
3457
        """
 
3458
        try:
 
3459
            super(TestPrefixAliasRegistry, self).register(
 
3460
                key, obj, help=help, info=info, override_existing=False)
 
3461
        except KeyError:
 
3462
            actual = self.get(key)
 
3463
            note('Test prefix alias %s is already used for %s, ignoring %s'
 
3464
                 % (key, actual, obj))
 
3465
 
 
3466
    def resolve_alias(self, id_start):
 
3467
        """Replace the alias by the prefix in the given string.
 
3468
 
 
3469
        Using an unknown prefix is an error to help catching typos.
 
3470
        """
 
3471
        parts = id_start.split('.')
 
3472
        try:
 
3473
            parts[0] = self.get(parts[0])
 
3474
        except KeyError:
 
3475
            raise errors.BzrCommandError(
 
3476
                '%s is not a known test prefix alias' % parts[0])
 
3477
        return '.'.join(parts)
 
3478
 
 
3479
 
 
3480
test_prefix_alias_registry = TestPrefixAliasRegistry()
 
3481
"""Registry of test prefix aliases."""
 
3482
 
 
3483
 
 
3484
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
 
3485
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
 
3486
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
 
3487
 
 
3488
# Obvious higest levels prefixes, feel free to add your own via a plugin
 
3489
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
 
3490
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
 
3491
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
 
3492
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
 
3493
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
 
3494
 
 
3495
 
 
3496
def _test_suite_testmod_names():
 
3497
    """Return the standard list of test module names to test."""
 
3498
    return [
 
3499
        'bzrlib.doc',
 
3500
        'bzrlib.tests.blackbox',
 
3501
        'bzrlib.tests.commands',
 
3502
        'bzrlib.tests.per_branch',
 
3503
        'bzrlib.tests.per_bzrdir',
 
3504
        'bzrlib.tests.per_interrepository',
 
3505
        'bzrlib.tests.per_intertree',
 
3506
        'bzrlib.tests.per_inventory',
 
3507
        'bzrlib.tests.per_interbranch',
 
3508
        'bzrlib.tests.per_lock',
 
3509
        'bzrlib.tests.per_transport',
 
3510
        'bzrlib.tests.per_tree',
 
3511
        'bzrlib.tests.per_pack_repository',
 
3512
        'bzrlib.tests.per_repository',
 
3513
        'bzrlib.tests.per_repository_chk',
 
3514
        'bzrlib.tests.per_repository_reference',
 
3515
        'bzrlib.tests.per_versionedfile',
 
3516
        'bzrlib.tests.per_workingtree',
 
3517
        'bzrlib.tests.test__annotator',
 
3518
        'bzrlib.tests.test__chk_map',
 
3519
        'bzrlib.tests.test__dirstate_helpers',
 
3520
        'bzrlib.tests.test__groupcompress',
 
3521
        'bzrlib.tests.test__known_graph',
 
3522
        'bzrlib.tests.test__rio',
 
3523
        'bzrlib.tests.test__walkdirs_win32',
 
3524
        'bzrlib.tests.test_ancestry',
 
3525
        'bzrlib.tests.test_annotate',
 
3526
        'bzrlib.tests.test_api',
 
3527
        'bzrlib.tests.test_atomicfile',
 
3528
        'bzrlib.tests.test_bad_files',
 
3529
        'bzrlib.tests.test_bencode',
 
3530
        'bzrlib.tests.test_bisect_multi',
 
3531
        'bzrlib.tests.test_branch',
 
3532
        'bzrlib.tests.test_branchbuilder',
 
3533
        'bzrlib.tests.test_btree_index',
 
3534
        'bzrlib.tests.test_bugtracker',
 
3535
        'bzrlib.tests.test_bundle',
 
3536
        'bzrlib.tests.test_bzrdir',
 
3537
        'bzrlib.tests.test__chunks_to_lines',
 
3538
        'bzrlib.tests.test_cache_utf8',
 
3539
        'bzrlib.tests.test_chk_map',
 
3540
        'bzrlib.tests.test_chk_serializer',
 
3541
        'bzrlib.tests.test_chunk_writer',
 
3542
        'bzrlib.tests.test_clean_tree',
 
3543
        'bzrlib.tests.test_commands',
 
3544
        'bzrlib.tests.test_commit',
 
3545
        'bzrlib.tests.test_commit_merge',
 
3546
        'bzrlib.tests.test_config',
 
3547
        'bzrlib.tests.test_conflicts',
 
3548
        'bzrlib.tests.test_counted_lock',
 
3549
        'bzrlib.tests.test_crash',
 
3550
        'bzrlib.tests.test_decorators',
 
3551
        'bzrlib.tests.test_delta',
 
3552
        'bzrlib.tests.test_debug',
 
3553
        'bzrlib.tests.test_deprecated_graph',
 
3554
        'bzrlib.tests.test_diff',
 
3555
        'bzrlib.tests.test_directory_service',
 
3556
        'bzrlib.tests.test_dirstate',
 
3557
        'bzrlib.tests.test_email_message',
 
3558
        'bzrlib.tests.test_eol_filters',
 
3559
        'bzrlib.tests.test_errors',
 
3560
        'bzrlib.tests.test_export',
 
3561
        'bzrlib.tests.test_extract',
 
3562
        'bzrlib.tests.test_fetch',
 
3563
        'bzrlib.tests.test_fifo_cache',
 
3564
        'bzrlib.tests.test_filters',
 
3565
        'bzrlib.tests.test_ftp_transport',
 
3566
        'bzrlib.tests.test_foreign',
 
3567
        'bzrlib.tests.test_generate_docs',
 
3568
        'bzrlib.tests.test_generate_ids',
 
3569
        'bzrlib.tests.test_globbing',
 
3570
        'bzrlib.tests.test_gpg',
 
3571
        'bzrlib.tests.test_graph',
 
3572
        'bzrlib.tests.test_groupcompress',
 
3573
        'bzrlib.tests.test_hashcache',
 
3574
        'bzrlib.tests.test_help',
 
3575
        'bzrlib.tests.test_hooks',
 
3576
        'bzrlib.tests.test_http',
 
3577
        'bzrlib.tests.test_http_response',
 
3578
        'bzrlib.tests.test_https_ca_bundle',
 
3579
        'bzrlib.tests.test_identitymap',
 
3580
        'bzrlib.tests.test_ignores',
 
3581
        'bzrlib.tests.test_index',
 
3582
        'bzrlib.tests.test_info',
 
3583
        'bzrlib.tests.test_inv',
 
3584
        'bzrlib.tests.test_inventory_delta',
 
3585
        'bzrlib.tests.test_knit',
 
3586
        'bzrlib.tests.test_lazy_import',
 
3587
        'bzrlib.tests.test_lazy_regex',
 
3588
        'bzrlib.tests.test_lock',
 
3589
        'bzrlib.tests.test_lockable_files',
 
3590
        'bzrlib.tests.test_lockdir',
 
3591
        'bzrlib.tests.test_log',
 
3592
        'bzrlib.tests.test_lru_cache',
 
3593
        'bzrlib.tests.test_lsprof',
 
3594
        'bzrlib.tests.test_mail_client',
 
3595
        'bzrlib.tests.test_memorytree',
 
3596
        'bzrlib.tests.test_merge',
 
3597
        'bzrlib.tests.test_merge3',
 
3598
        'bzrlib.tests.test_merge_core',
 
3599
        'bzrlib.tests.test_merge_directive',
 
3600
        'bzrlib.tests.test_missing',
 
3601
        'bzrlib.tests.test_msgeditor',
 
3602
        'bzrlib.tests.test_multiparent',
 
3603
        'bzrlib.tests.test_mutabletree',
 
3604
        'bzrlib.tests.test_nonascii',
 
3605
        'bzrlib.tests.test_options',
 
3606
        'bzrlib.tests.test_osutils',
 
3607
        'bzrlib.tests.test_osutils_encodings',
 
3608
        'bzrlib.tests.test_pack',
 
3609
        'bzrlib.tests.test_patch',
 
3610
        'bzrlib.tests.test_patches',
 
3611
        'bzrlib.tests.test_permissions',
 
3612
        'bzrlib.tests.test_plugins',
 
3613
        'bzrlib.tests.test_progress',
 
3614
        'bzrlib.tests.test_read_bundle',
 
3615
        'bzrlib.tests.test_reconcile',
 
3616
        'bzrlib.tests.test_reconfigure',
 
3617
        'bzrlib.tests.test_registry',
 
3618
        'bzrlib.tests.test_remote',
 
3619
        'bzrlib.tests.test_rename_map',
 
3620
        'bzrlib.tests.test_repository',
 
3621
        'bzrlib.tests.test_revert',
 
3622
        'bzrlib.tests.test_revision',
 
3623
        'bzrlib.tests.test_revisionspec',
 
3624
        'bzrlib.tests.test_revisiontree',
 
3625
        'bzrlib.tests.test_rio',
 
3626
        'bzrlib.tests.test_rules',
 
3627
        'bzrlib.tests.test_sampler',
 
3628
        'bzrlib.tests.test_selftest',
 
3629
        'bzrlib.tests.test_serializer',
 
3630
        'bzrlib.tests.test_setup',
 
3631
        'bzrlib.tests.test_sftp_transport',
 
3632
        'bzrlib.tests.test_shelf',
 
3633
        'bzrlib.tests.test_shelf_ui',
 
3634
        'bzrlib.tests.test_smart',
 
3635
        'bzrlib.tests.test_smart_add',
 
3636
        'bzrlib.tests.test_smart_request',
 
3637
        'bzrlib.tests.test_smart_transport',
 
3638
        'bzrlib.tests.test_smtp_connection',
 
3639
        'bzrlib.tests.test_source',
 
3640
        'bzrlib.tests.test_ssh_transport',
 
3641
        'bzrlib.tests.test_status',
 
3642
        'bzrlib.tests.test_store',
 
3643
        'bzrlib.tests.test_strace',
 
3644
        'bzrlib.tests.test_subsume',
 
3645
        'bzrlib.tests.test_switch',
 
3646
        'bzrlib.tests.test_symbol_versioning',
 
3647
        'bzrlib.tests.test_tag',
 
3648
        'bzrlib.tests.test_testament',
 
3649
        'bzrlib.tests.test_textfile',
 
3650
        'bzrlib.tests.test_textmerge',
 
3651
        'bzrlib.tests.test_timestamp',
 
3652
        'bzrlib.tests.test_trace',
 
3653
        'bzrlib.tests.test_transactions',
 
3654
        'bzrlib.tests.test_transform',
 
3655
        'bzrlib.tests.test_transport',
 
3656
        'bzrlib.tests.test_transport_log',
 
3657
        'bzrlib.tests.test_tree',
 
3658
        'bzrlib.tests.test_treebuilder',
 
3659
        'bzrlib.tests.test_tsort',
 
3660
        'bzrlib.tests.test_tuned_gzip',
 
3661
        'bzrlib.tests.test_ui',
 
3662
        'bzrlib.tests.test_uncommit',
 
3663
        'bzrlib.tests.test_upgrade',
 
3664
        'bzrlib.tests.test_upgrade_stacked',
 
3665
        'bzrlib.tests.test_urlutils',
 
3666
        'bzrlib.tests.test_version',
 
3667
        'bzrlib.tests.test_version_info',
 
3668
        'bzrlib.tests.test_weave',
 
3669
        'bzrlib.tests.test_whitebox',
 
3670
        'bzrlib.tests.test_win32utils',
 
3671
        'bzrlib.tests.test_workingtree',
 
3672
        'bzrlib.tests.test_workingtree_4',
 
3673
        'bzrlib.tests.test_wsgi',
 
3674
        'bzrlib.tests.test_xml',
 
3675
        ]
 
3676
 
 
3677
 
 
3678
def _test_suite_modules_to_doctest():
 
3679
    """Return the list of modules to doctest."""   
 
3680
    return [
 
3681
        'bzrlib',
 
3682
        'bzrlib.branchbuilder',
 
3683
        'bzrlib.export',
 
3684
        'bzrlib.inventory',
 
3685
        'bzrlib.iterablefile',
 
3686
        'bzrlib.lockdir',
 
3687
        'bzrlib.merge3',
 
3688
        'bzrlib.option',
 
3689
        'bzrlib.symbol_versioning',
 
3690
        'bzrlib.tests',
 
3691
        'bzrlib.timestamp',
 
3692
        'bzrlib.version_info_formats.format_custom',
 
3693
        ]
 
3694
 
 
3695
 
 
3696
def test_suite(keep_only=None, starting_with=None):
 
3697
    """Build and return TestSuite for the whole of bzrlib.
 
3698
 
 
3699
    :param keep_only: A list of test ids limiting the suite returned.
 
3700
 
 
3701
    :param starting_with: An id limiting the suite returned to the tests
 
3702
         starting with it.
 
3703
 
 
3704
    This function can be replaced if you need to change the default test
 
3705
    suite on a global basis, but it is not encouraged.
 
3706
    """
 
3707
 
 
3708
    loader = TestUtil.TestLoader()
 
3709
 
 
3710
    if keep_only is not None:
 
3711
        id_filter = TestIdList(keep_only)
 
3712
    if starting_with:
 
3713
        # We take precedence over keep_only because *at loading time* using
 
3714
        # both options means we will load less tests for the same final result.
 
3715
        def interesting_module(name):
 
3716
            for start in starting_with:
 
3717
                if (
 
3718
                    # Either the module name starts with the specified string
 
3719
                    name.startswith(start)
 
3720
                    # or it may contain tests starting with the specified string
 
3721
                    or start.startswith(name)
 
3722
                    ):
 
3723
                    return True
 
3724
            return False
 
3725
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
 
3726
 
 
3727
    elif keep_only is not None:
 
3728
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
3729
        def interesting_module(name):
 
3730
            return id_filter.refers_to(name)
 
3731
 
 
3732
    else:
 
3733
        loader = TestUtil.TestLoader()
 
3734
        def interesting_module(name):
 
3735
            # No filtering, all modules are interesting
 
3736
            return True
 
3737
 
 
3738
    suite = loader.suiteClass()
 
3739
 
 
3740
    # modules building their suite with loadTestsFromModuleNames
 
3741
    suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
 
3742
 
 
3743
    for mod in _test_suite_modules_to_doctest():
 
3744
        if not interesting_module(mod):
 
3745
            # No tests to keep here, move along
 
3746
            continue
 
3747
        try:
 
3748
            # note that this really does mean "report only" -- doctest
 
3749
            # still runs the rest of the examples
 
3750
            doc_suite = doctest.DocTestSuite(mod,
 
3751
                optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
 
3752
        except ValueError, e:
 
3753
            print '**failed to get doctest for: %s\n%s' % (mod, e)
 
3754
            raise
 
3755
        if len(doc_suite._tests) == 0:
 
3756
            raise errors.BzrError("no doctests found in %s" % (mod,))
 
3757
        suite.addTest(doc_suite)
 
3758
 
 
3759
    default_encoding = sys.getdefaultencoding()
 
3760
    for name, plugin in bzrlib.plugin.plugins().items():
 
3761
        if not interesting_module(plugin.module.__name__):
 
3762
            continue
 
3763
        plugin_suite = plugin.test_suite()
 
3764
        # We used to catch ImportError here and turn it into just a warning,
 
3765
        # but really if you don't have --no-plugins this should be a failure.
 
3766
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
3767
        if plugin_suite is None:
 
3768
            plugin_suite = plugin.load_plugin_tests(loader)
 
3769
        if plugin_suite is not None:
 
3770
            suite.addTest(plugin_suite)
 
3771
        if default_encoding != sys.getdefaultencoding():
 
3772
            bzrlib.trace.warning(
 
3773
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
3774
                sys.getdefaultencoding())
 
3775
            reload(sys)
 
3776
            sys.setdefaultencoding(default_encoding)
 
3777
 
 
3778
    if keep_only is not None:
 
3779
        # Now that the referred modules have loaded their tests, keep only the
 
3780
        # requested ones.
 
3781
        suite = filter_suite_by_id_list(suite, id_filter)
 
3782
        # Do some sanity checks on the id_list filtering
 
3783
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
3784
        if starting_with:
 
3785
            # The tester has used both keep_only and starting_with, so he is
 
3786
            # already aware that some tests are excluded from the list, there
 
3787
            # is no need to tell him which.
 
3788
            pass
 
3789
        else:
 
3790
            # Some tests mentioned in the list are not in the test suite. The
 
3791
            # list may be out of date, report to the tester.
 
3792
            for id in not_found:
 
3793
                bzrlib.trace.warning('"%s" not found in the test suite', id)
 
3794
        for id in duplicates:
 
3795
            bzrlib.trace.warning('"%s" is used as an id by several tests', id)
 
3796
 
 
3797
    return suite
 
3798
 
 
3799
 
 
3800
def multiply_scenarios(scenarios_left, scenarios_right):
 
3801
    """Multiply two sets of scenarios.
 
3802
 
 
3803
    :returns: the cartesian product of the two sets of scenarios, that is
 
3804
        a scenario for every possible combination of a left scenario and a
 
3805
        right scenario.
 
3806
    """
 
3807
    return [
 
3808
        ('%s,%s' % (left_name, right_name),
 
3809
         dict(left_dict.items() + right_dict.items()))
 
3810
        for left_name, left_dict in scenarios_left
 
3811
        for right_name, right_dict in scenarios_right]
 
3812
 
 
3813
 
 
3814
def multiply_tests(tests, scenarios, result):
 
3815
    """Multiply tests_list by scenarios into result.
 
3816
 
 
3817
    This is the core workhorse for test parameterisation.
 
3818
 
 
3819
    Typically the load_tests() method for a per-implementation test suite will
 
3820
    call multiply_tests and return the result.
 
3821
 
 
3822
    :param tests: The tests to parameterise.
 
3823
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
 
3824
        scenario_param_dict).
 
3825
    :param result: A TestSuite to add created tests to.
 
3826
 
 
3827
    This returns the passed in result TestSuite with the cross product of all
 
3828
    the tests repeated once for each scenario.  Each test is adapted by adding
 
3829
    the scenario name at the end of its id(), and updating the test object's
 
3830
    __dict__ with the scenario_param_dict.
 
3831
 
 
3832
    >>> import bzrlib.tests.test_sampler
 
3833
    >>> r = multiply_tests(
 
3834
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
 
3835
    ...     [('one', dict(param=1)),
 
3836
    ...      ('two', dict(param=2))],
 
3837
    ...     TestSuite())
 
3838
    >>> tests = list(iter_suite_tests(r))
 
3839
    >>> len(tests)
 
3840
    2
 
3841
    >>> tests[0].id()
 
3842
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
 
3843
    >>> tests[0].param
 
3844
    1
 
3845
    >>> tests[1].param
 
3846
    2
 
3847
    """
 
3848
    for test in iter_suite_tests(tests):
 
3849
        apply_scenarios(test, scenarios, result)
 
3850
    return result
 
3851
 
 
3852
 
 
3853
def apply_scenarios(test, scenarios, result):
 
3854
    """Apply the scenarios in scenarios to test and add to result.
 
3855
 
 
3856
    :param test: The test to apply scenarios to.
 
3857
    :param scenarios: An iterable of scenarios to apply to test.
 
3858
    :return: result
 
3859
    :seealso: apply_scenario
 
3860
    """
 
3861
    for scenario in scenarios:
 
3862
        result.addTest(apply_scenario(test, scenario))
 
3863
    return result
 
3864
 
 
3865
 
 
3866
def apply_scenario(test, scenario):
 
3867
    """Copy test and apply scenario to it.
 
3868
 
 
3869
    :param test: A test to adapt.
 
3870
    :param scenario: A tuple describing the scenarion.
 
3871
        The first element of the tuple is the new test id.
 
3872
        The second element is a dict containing attributes to set on the
 
3873
        test.
 
3874
    :return: The adapted test.
 
3875
    """
 
3876
    new_id = "%s(%s)" % (test.id(), scenario[0])
 
3877
    new_test = clone_test(test, new_id)
 
3878
    for name, value in scenario[1].items():
 
3879
        setattr(new_test, name, value)
 
3880
    return new_test
 
3881
 
 
3882
 
 
3883
def clone_test(test, new_id):
 
3884
    """Clone a test giving it a new id.
 
3885
 
 
3886
    :param test: The test to clone.
 
3887
    :param new_id: The id to assign to it.
 
3888
    :return: The new test.
 
3889
    """
 
3890
    new_test = copy(test)
 
3891
    new_test.id = lambda: new_id
 
3892
    return new_test
 
3893
 
 
3894
 
 
3895
def _rmtree_temp_dir(dirname):
 
3896
    # If LANG=C we probably have created some bogus paths
 
3897
    # which rmtree(unicode) will fail to delete
 
3898
    # so make sure we are using rmtree(str) to delete everything
 
3899
    # except on win32, where rmtree(str) will fail
 
3900
    # since it doesn't have the property of byte-stream paths
 
3901
    # (they are either ascii or mbcs)
 
3902
    if sys.platform == 'win32':
 
3903
        # make sure we are using the unicode win32 api
 
3904
        dirname = unicode(dirname)
 
3905
    else:
 
3906
        dirname = dirname.encode(sys.getfilesystemencoding())
 
3907
    try:
 
3908
        osutils.rmtree(dirname)
 
3909
    except OSError, e:
 
3910
        # We don't want to fail here because some useful display will be lost
 
3911
        # otherwise. Polluting the tmp dir is bad, but not giving all the
 
3912
        # possible info to the test runner is even worse.
 
3913
        sys.stderr.write('Unable to remove testing dir %s\n%s'
 
3914
                         % (os.path.basename(dirname), e))
 
3915
 
 
3916
 
 
3917
class Feature(object):
 
3918
    """An operating system Feature."""
 
3919
 
 
3920
    def __init__(self):
 
3921
        self._available = None
 
3922
 
 
3923
    def available(self):
 
3924
        """Is the feature available?
 
3925
 
 
3926
        :return: True if the feature is available.
 
3927
        """
 
3928
        if self._available is None:
 
3929
            self._available = self._probe()
 
3930
        return self._available
 
3931
 
 
3932
    def _probe(self):
 
3933
        """Implement this method in concrete features.
 
3934
 
 
3935
        :return: True if the feature is available.
 
3936
        """
 
3937
        raise NotImplementedError
 
3938
 
 
3939
    def __str__(self):
 
3940
        if getattr(self, 'feature_name', None):
 
3941
            return self.feature_name()
 
3942
        return self.__class__.__name__
 
3943
 
 
3944
 
 
3945
class _SymlinkFeature(Feature):
 
3946
 
 
3947
    def _probe(self):
 
3948
        return osutils.has_symlinks()
 
3949
 
 
3950
    def feature_name(self):
 
3951
        return 'symlinks'
 
3952
 
 
3953
SymlinkFeature = _SymlinkFeature()
 
3954
 
 
3955
 
 
3956
class _HardlinkFeature(Feature):
 
3957
 
 
3958
    def _probe(self):
 
3959
        return osutils.has_hardlinks()
 
3960
 
 
3961
    def feature_name(self):
 
3962
        return 'hardlinks'
 
3963
 
 
3964
HardlinkFeature = _HardlinkFeature()
 
3965
 
 
3966
 
 
3967
class _OsFifoFeature(Feature):
 
3968
 
 
3969
    def _probe(self):
 
3970
        return getattr(os, 'mkfifo', None)
 
3971
 
 
3972
    def feature_name(self):
 
3973
        return 'filesystem fifos'
 
3974
 
 
3975
OsFifoFeature = _OsFifoFeature()
 
3976
 
 
3977
 
 
3978
class _UnicodeFilenameFeature(Feature):
 
3979
    """Does the filesystem support Unicode filenames?"""
 
3980
 
 
3981
    def _probe(self):
 
3982
        try:
 
3983
            # Check for character combinations unlikely to be covered by any
 
3984
            # single non-unicode encoding. We use the characters
 
3985
            # - greek small letter alpha (U+03B1) and
 
3986
            # - braille pattern dots-123456 (U+283F).
 
3987
            os.stat(u'\u03b1\u283f')
 
3988
        except UnicodeEncodeError:
 
3989
            return False
 
3990
        except (IOError, OSError):
 
3991
            # The filesystem allows the Unicode filename but the file doesn't
 
3992
            # exist.
 
3993
            return True
 
3994
        else:
 
3995
            # The filesystem allows the Unicode filename and the file exists,
 
3996
            # for some reason.
 
3997
            return True
 
3998
 
 
3999
UnicodeFilenameFeature = _UnicodeFilenameFeature()
 
4000
 
 
4001
 
 
4002
def probe_unicode_in_user_encoding():
 
4003
    """Try to encode several unicode strings to use in unicode-aware tests.
 
4004
    Return first successfull match.
 
4005
 
 
4006
    :return:  (unicode value, encoded plain string value) or (None, None)
 
4007
    """
 
4008
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
4009
    for uni_val in possible_vals:
 
4010
        try:
 
4011
            str_val = uni_val.encode(osutils.get_user_encoding())
 
4012
        except UnicodeEncodeError:
 
4013
            # Try a different character
 
4014
            pass
 
4015
        else:
 
4016
            return uni_val, str_val
 
4017
    return None, None
 
4018
 
 
4019
 
 
4020
def probe_bad_non_ascii(encoding):
 
4021
    """Try to find [bad] character with code [128..255]
 
4022
    that cannot be decoded to unicode in some encoding.
 
4023
    Return None if all non-ascii characters is valid
 
4024
    for given encoding.
 
4025
    """
 
4026
    for i in xrange(128, 256):
 
4027
        char = chr(i)
 
4028
        try:
 
4029
            char.decode(encoding)
 
4030
        except UnicodeDecodeError:
 
4031
            return char
 
4032
    return None
 
4033
 
 
4034
 
 
4035
class _HTTPSServerFeature(Feature):
 
4036
    """Some tests want an https Server, check if one is available.
 
4037
 
 
4038
    Right now, the only way this is available is under python2.6 which provides
 
4039
    an ssl module.
 
4040
    """
 
4041
 
 
4042
    def _probe(self):
 
4043
        try:
 
4044
            import ssl
 
4045
            return True
 
4046
        except ImportError:
 
4047
            return False
 
4048
 
 
4049
    def feature_name(self):
 
4050
        return 'HTTPSServer'
 
4051
 
 
4052
 
 
4053
HTTPSServerFeature = _HTTPSServerFeature()
 
4054
 
 
4055
 
 
4056
class _UnicodeFilename(Feature):
 
4057
    """Does the filesystem support Unicode filenames?"""
 
4058
 
 
4059
    def _probe(self):
 
4060
        try:
 
4061
            os.stat(u'\u03b1')
 
4062
        except UnicodeEncodeError:
 
4063
            return False
 
4064
        except (IOError, OSError):
 
4065
            # The filesystem allows the Unicode filename but the file doesn't
 
4066
            # exist.
 
4067
            return True
 
4068
        else:
 
4069
            # The filesystem allows the Unicode filename and the file exists,
 
4070
            # for some reason.
 
4071
            return True
 
4072
 
 
4073
UnicodeFilename = _UnicodeFilename()
 
4074
 
 
4075
 
 
4076
class _UTF8Filesystem(Feature):
 
4077
    """Is the filesystem UTF-8?"""
 
4078
 
 
4079
    def _probe(self):
 
4080
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
 
4081
            return True
 
4082
        return False
 
4083
 
 
4084
UTF8Filesystem = _UTF8Filesystem()
 
4085
 
 
4086
 
 
4087
class _CaseInsCasePresFilenameFeature(Feature):
 
4088
    """Is the file-system case insensitive, but case-preserving?"""
 
4089
 
 
4090
    def _probe(self):
 
4091
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
 
4092
        try:
 
4093
            # first check truly case-preserving for created files, then check
 
4094
            # case insensitive when opening existing files.
 
4095
            name = osutils.normpath(name)
 
4096
            base, rel = osutils.split(name)
 
4097
            found_rel = osutils.canonical_relpath(base, name)
 
4098
            return (found_rel == rel
 
4099
                    and os.path.isfile(name.upper())
 
4100
                    and os.path.isfile(name.lower()))
 
4101
        finally:
 
4102
            os.close(fileno)
 
4103
            os.remove(name)
 
4104
 
 
4105
    def feature_name(self):
 
4106
        return "case-insensitive case-preserving filesystem"
 
4107
 
 
4108
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
 
4109
 
 
4110
 
 
4111
class _CaseInsensitiveFilesystemFeature(Feature):
 
4112
    """Check if underlying filesystem is case-insensitive but *not* case
 
4113
    preserving.
 
4114
    """
 
4115
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
 
4116
    # more likely to be case preserving, so this case is rare.
 
4117
 
 
4118
    def _probe(self):
 
4119
        if CaseInsCasePresFilenameFeature.available():
 
4120
            return False
 
4121
 
 
4122
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
4123
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
4124
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
4125
        else:
 
4126
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
4127
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
4128
            dir=root)
 
4129
        name_a = osutils.pathjoin(tdir, 'a')
 
4130
        name_A = osutils.pathjoin(tdir, 'A')
 
4131
        os.mkdir(name_a)
 
4132
        result = osutils.isdir(name_A)
 
4133
        _rmtree_temp_dir(tdir)
 
4134
        return result
 
4135
 
 
4136
    def feature_name(self):
 
4137
        return 'case-insensitive filesystem'
 
4138
 
 
4139
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
 
4140
 
 
4141
 
 
4142
class _SubUnitFeature(Feature):
 
4143
    """Check if subunit is available."""
 
4144
 
 
4145
    def _probe(self):
 
4146
        try:
 
4147
            import subunit
 
4148
            return True
 
4149
        except ImportError:
 
4150
            return False
 
4151
 
 
4152
    def feature_name(self):
 
4153
        return 'subunit'
 
4154
 
 
4155
SubUnitFeature = _SubUnitFeature()
 
4156
# Only define SubUnitBzrRunner if subunit is available.
 
4157
try:
 
4158
    from subunit import TestProtocolClient
 
4159
    try:
 
4160
        from subunit.test_results import AutoTimingTestResultDecorator
 
4161
    except ImportError:
 
4162
        AutoTimingTestResultDecorator = lambda x:x
 
4163
    class SubUnitBzrRunner(TextTestRunner):
 
4164
        def run(self, test):
 
4165
            result = AutoTimingTestResultDecorator(
 
4166
                TestProtocolClient(self.stream))
 
4167
            test.run(result)
 
4168
            return result
 
4169
except ImportError:
 
4170
    pass