/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Robert Collins
  • Date: 2009-08-26 22:47:18 UTC
  • mto: This revision was merged to the branch mainline in revision 4659.
  • Revision ID: robertc@robertcollins.net-20090826224718-xjqbmhs14m7lmnjg
Fix interface skew between bzr selftest and python unittest - use stopTestRun not done to end test runs.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import atexit
 
30
import codecs
 
31
from copy import copy
 
32
from cStringIO import StringIO
 
33
import difflib
 
34
import doctest
 
35
import errno
 
36
import logging
 
37
import math
 
38
import os
 
39
from pprint import pformat
 
40
import random
 
41
import re
 
42
import shlex
 
43
import stat
 
44
from subprocess import Popen, PIPE, STDOUT
 
45
import sys
 
46
import tempfile
 
47
import threading
 
48
import time
 
49
import unittest
 
50
import warnings
 
51
 
 
52
 
 
53
from bzrlib import (
 
54
    branchbuilder,
 
55
    bzrdir,
 
56
    debug,
 
57
    errors,
 
58
    hooks,
 
59
    lock as _mod_lock,
 
60
    memorytree,
 
61
    osutils,
 
62
    progress,
 
63
    ui,
 
64
    urlutils,
 
65
    registry,
 
66
    workingtree,
 
67
    )
 
68
import bzrlib.branch
 
69
import bzrlib.commands
 
70
import bzrlib.timestamp
 
71
import bzrlib.export
 
72
import bzrlib.inventory
 
73
import bzrlib.iterablefile
 
74
import bzrlib.lockdir
 
75
try:
 
76
    import bzrlib.lsprof
 
77
except ImportError:
 
78
    # lsprof not available
 
79
    pass
 
80
from bzrlib.merge import merge_inner
 
81
import bzrlib.merge3
 
82
import bzrlib.plugin
 
83
from bzrlib.smart import client, request, server
 
84
import bzrlib.store
 
85
from bzrlib import symbol_versioning
 
86
from bzrlib.symbol_versioning import (
 
87
    DEPRECATED_PARAMETER,
 
88
    deprecated_function,
 
89
    deprecated_method,
 
90
    deprecated_passed,
 
91
    )
 
92
import bzrlib.trace
 
93
from bzrlib.transport import get_transport
 
94
import bzrlib.transport
 
95
from bzrlib.transport.local import LocalURLServer
 
96
from bzrlib.transport.memory import MemoryServer
 
97
from bzrlib.transport.readonly import ReadonlyServer
 
98
from bzrlib.trace import mutter, note
 
99
from bzrlib.tests import TestUtil
 
100
from bzrlib.tests.http_server import HttpServer
 
101
from bzrlib.tests.TestUtil import (
 
102
                          TestSuite,
 
103
                          TestLoader,
 
104
                          )
 
105
from bzrlib.tests.treeshape import build_tree_contents
 
106
from bzrlib.ui import NullProgressView
 
107
from bzrlib.ui.text import TextUIFactory
 
108
import bzrlib.version_info_formats.format_custom
 
109
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
110
 
 
111
# Mark this python module as being part of the implementation
 
112
# of unittest: this gives us better tracebacks where the last
 
113
# shown frame is the test code, not our assertXYZ.
 
114
__unittest = 1
 
115
 
 
116
default_transport = LocalURLServer
 
117
 
 
118
# Subunit result codes, defined here to prevent a hard dependency on subunit.
 
119
SUBUNIT_SEEK_SET = 0
 
120
SUBUNIT_SEEK_CUR = 1
 
121
 
 
122
 
 
123
class ExtendedTestResult(unittest._TextTestResult):
 
124
    """Accepts, reports and accumulates the results of running tests.
 
125
 
 
126
    Compared to the unittest version this class adds support for
 
127
    profiling, benchmarking, stopping as soon as a test fails,  and
 
128
    skipping tests.  There are further-specialized subclasses for
 
129
    different types of display.
 
130
 
 
131
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
132
    addFailure or addError classes.  These in turn may redirect to a more
 
133
    specific case for the special test results supported by our extended
 
134
    tests.
 
135
 
 
136
    Note that just one of these objects is fed the results from many tests.
 
137
    """
 
138
 
 
139
    stop_early = False
 
140
 
 
141
    def __init__(self, stream, descriptions, verbosity,
 
142
                 bench_history=None,
 
143
                 strict=False,
 
144
                 ):
 
145
        """Construct new TestResult.
 
146
 
 
147
        :param bench_history: Optionally, a writable file object to accumulate
 
148
            benchmark results.
 
149
        """
 
150
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
151
        if bench_history is not None:
 
152
            from bzrlib.version import _get_bzr_source_tree
 
153
            src_tree = _get_bzr_source_tree()
 
154
            if src_tree:
 
155
                try:
 
156
                    revision_id = src_tree.get_parent_ids()[0]
 
157
                except IndexError:
 
158
                    # XXX: if this is a brand new tree, do the same as if there
 
159
                    # is no branch.
 
160
                    revision_id = ''
 
161
            else:
 
162
                # XXX: If there's no branch, what should we do?
 
163
                revision_id = ''
 
164
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
165
        self._bench_history = bench_history
 
166
        self.ui = ui.ui_factory
 
167
        self.num_tests = 0
 
168
        self.error_count = 0
 
169
        self.failure_count = 0
 
170
        self.known_failure_count = 0
 
171
        self.skip_count = 0
 
172
        self.not_applicable_count = 0
 
173
        self.unsupported = {}
 
174
        self.count = 0
 
175
        self._overall_start_time = time.time()
 
176
        self._strict = strict
 
177
 
 
178
    def stopTestRun(self):
 
179
        if self._strict:
 
180
            ok = self.wasStrictlySuccessful()
 
181
        else:
 
182
            ok = self.wasSuccessful()
 
183
        if ok:
 
184
            self.stream.write('tests passed\n')
 
185
        else:
 
186
            self.stream.write('tests failed\n')
 
187
        if TestCase._first_thread_leaker_id:
 
188
            self.stream.write(
 
189
                '%s is leaking threads among %d leaking tests.\n' % (
 
190
                TestCase._first_thread_leaker_id,
 
191
                TestCase._leaking_threads_tests))
 
192
 
 
193
    def _extractBenchmarkTime(self, testCase):
 
194
        """Add a benchmark time for the current test case."""
 
195
        return getattr(testCase, "_benchtime", None)
 
196
 
 
197
    def _elapsedTestTimeString(self):
 
198
        """Return a time string for the overall time the current test has taken."""
 
199
        return self._formatTime(time.time() - self._start_time)
 
200
 
 
201
    def _testTimeString(self, testCase):
 
202
        benchmark_time = self._extractBenchmarkTime(testCase)
 
203
        if benchmark_time is not None:
 
204
            return self._formatTime(benchmark_time) + "*"
 
205
        else:
 
206
            return self._elapsedTestTimeString()
 
207
 
 
208
    def _formatTime(self, seconds):
 
209
        """Format seconds as milliseconds with leading spaces."""
 
210
        # some benchmarks can take thousands of seconds to run, so we need 8
 
211
        # places
 
212
        return "%8dms" % (1000 * seconds)
 
213
 
 
214
    def _shortened_test_description(self, test):
 
215
        what = test.id()
 
216
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
217
        return what
 
218
 
 
219
    def startTest(self, test):
 
220
        unittest.TestResult.startTest(self, test)
 
221
        if self.count == 0:
 
222
            self.startTests()
 
223
        self.report_test_start(test)
 
224
        test.number = self.count
 
225
        self._recordTestStartTime()
 
226
 
 
227
    def startTests(self):
 
228
        import platform
 
229
        if getattr(sys, 'frozen', None) is None:
 
230
            bzr_path = osutils.realpath(sys.argv[0])
 
231
        else:
 
232
            bzr_path = sys.executable
 
233
        self.stream.write(
 
234
            'testing: %s\n' % (bzr_path,))
 
235
        self.stream.write(
 
236
            '   %s\n' % (
 
237
                    bzrlib.__path__[0],))
 
238
        self.stream.write(
 
239
            '   bzr-%s python-%s %s\n' % (
 
240
                    bzrlib.version_string,
 
241
                    bzrlib._format_version_tuple(sys.version_info),
 
242
                    platform.platform(aliased=1),
 
243
                    ))
 
244
        self.stream.write('\n')
 
245
 
 
246
    def _recordTestStartTime(self):
 
247
        """Record that a test has started."""
 
248
        self._start_time = time.time()
 
249
 
 
250
    def _cleanupLogFile(self, test):
 
251
        # We can only do this if we have one of our TestCases, not if
 
252
        # we have a doctest.
 
253
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
254
        if setKeepLogfile is not None:
 
255
            setKeepLogfile()
 
256
 
 
257
    def addError(self, test, err):
 
258
        """Tell result that test finished with an error.
 
259
 
 
260
        Called from the TestCase run() method when the test
 
261
        fails with an unexpected error.
 
262
        """
 
263
        self._testConcluded(test)
 
264
        if isinstance(err[1], TestNotApplicable):
 
265
            return self._addNotApplicable(test, err)
 
266
        elif isinstance(err[1], UnavailableFeature):
 
267
            return self.addNotSupported(test, err[1].args[0])
 
268
        else:
 
269
            unittest.TestResult.addError(self, test, err)
 
270
            self.error_count += 1
 
271
            self.report_error(test, err)
 
272
            if self.stop_early:
 
273
                self.stop()
 
274
            self._cleanupLogFile(test)
 
275
 
 
276
    def addFailure(self, test, err):
 
277
        """Tell result that test failed.
 
278
 
 
279
        Called from the TestCase run() method when the test
 
280
        fails because e.g. an assert() method failed.
 
281
        """
 
282
        self._testConcluded(test)
 
283
        if isinstance(err[1], KnownFailure):
 
284
            return self._addKnownFailure(test, err)
 
285
        else:
 
286
            unittest.TestResult.addFailure(self, test, err)
 
287
            self.failure_count += 1
 
288
            self.report_failure(test, err)
 
289
            if self.stop_early:
 
290
                self.stop()
 
291
            self._cleanupLogFile(test)
 
292
 
 
293
    def addSuccess(self, test):
 
294
        """Tell result that test completed successfully.
 
295
 
 
296
        Called from the TestCase run()
 
297
        """
 
298
        self._testConcluded(test)
 
299
        if self._bench_history is not None:
 
300
            benchmark_time = self._extractBenchmarkTime(test)
 
301
            if benchmark_time is not None:
 
302
                self._bench_history.write("%s %s\n" % (
 
303
                    self._formatTime(benchmark_time),
 
304
                    test.id()))
 
305
        self.report_success(test)
 
306
        self._cleanupLogFile(test)
 
307
        unittest.TestResult.addSuccess(self, test)
 
308
        test._log_contents = ''
 
309
 
 
310
    def _testConcluded(self, test):
 
311
        """Common code when a test has finished.
 
312
 
 
313
        Called regardless of whether it succeded, failed, etc.
 
314
        """
 
315
        pass
 
316
 
 
317
    def _addKnownFailure(self, test, err):
 
318
        self.known_failure_count += 1
 
319
        self.report_known_failure(test, err)
 
320
 
 
321
    def addNotSupported(self, test, feature):
 
322
        """The test will not be run because of a missing feature.
 
323
        """
 
324
        # this can be called in two different ways: it may be that the
 
325
        # test started running, and then raised (through addError)
 
326
        # UnavailableFeature.  Alternatively this method can be called
 
327
        # while probing for features before running the tests; in that
 
328
        # case we will see startTest and stopTest, but the test will never
 
329
        # actually run.
 
330
        self.unsupported.setdefault(str(feature), 0)
 
331
        self.unsupported[str(feature)] += 1
 
332
        self.report_unsupported(test, feature)
 
333
 
 
334
    def addSkip(self, test, reason):
 
335
        """A test has not run for 'reason'."""
 
336
        self.skip_count += 1
 
337
        self.report_skip(test, reason)
 
338
 
 
339
    def _addNotApplicable(self, test, skip_excinfo):
 
340
        if isinstance(skip_excinfo[1], TestNotApplicable):
 
341
            self.not_applicable_count += 1
 
342
            self.report_not_applicable(test, skip_excinfo)
 
343
        try:
 
344
            test.tearDown()
 
345
        except KeyboardInterrupt:
 
346
            raise
 
347
        except:
 
348
            self.addError(test, test.exc_info())
 
349
        else:
 
350
            # seems best to treat this as success from point-of-view of unittest
 
351
            # -- it actually does nothing so it barely matters :)
 
352
            unittest.TestResult.addSuccess(self, test)
 
353
            test._log_contents = ''
 
354
 
 
355
    def printErrorList(self, flavour, errors):
 
356
        for test, err in errors:
 
357
            self.stream.writeln(self.separator1)
 
358
            self.stream.write("%s: " % flavour)
 
359
            self.stream.writeln(self.getDescription(test))
 
360
            if getattr(test, '_get_log', None) is not None:
 
361
                log_contents = test._get_log()
 
362
                if log_contents:
 
363
                    self.stream.write('\n')
 
364
                    self.stream.write(
 
365
                            ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
 
366
                    self.stream.write('\n')
 
367
                    self.stream.write(log_contents)
 
368
                    self.stream.write('\n')
 
369
                    self.stream.write(
 
370
                            ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
 
371
                    self.stream.write('\n')
 
372
            self.stream.writeln(self.separator2)
 
373
            self.stream.writeln("%s" % err)
 
374
 
 
375
    def progress(self, offset, whence):
 
376
        """The test is adjusting the count of tests to run."""
 
377
        if whence == SUBUNIT_SEEK_SET:
 
378
            self.num_tests = offset
 
379
        elif whence == SUBUNIT_SEEK_CUR:
 
380
            self.num_tests += offset
 
381
        else:
 
382
            raise errors.BzrError("Unknown whence %r" % whence)
 
383
 
 
384
    def finished(self):
 
385
        pass
 
386
 
 
387
    def report_cleaning_up(self):
 
388
        pass
 
389
 
 
390
    def report_success(self, test):
 
391
        pass
 
392
 
 
393
    def wasStrictlySuccessful(self):
 
394
        if self.unsupported or self.known_failure_count:
 
395
            return False
 
396
        return self.wasSuccessful()
 
397
 
 
398
 
 
399
class TextTestResult(ExtendedTestResult):
 
400
    """Displays progress and results of tests in text form"""
 
401
 
 
402
    def __init__(self, stream, descriptions, verbosity,
 
403
                 bench_history=None,
 
404
                 pb=None,
 
405
                 strict=None,
 
406
                 ):
 
407
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
408
            bench_history, strict)
 
409
        # We no longer pass them around, but just rely on the UIFactory stack
 
410
        # for state
 
411
        if pb is not None:
 
412
            warnings.warn("Passing pb to TextTestResult is deprecated")
 
413
        self.pb = self.ui.nested_progress_bar()
 
414
        self.pb.show_pct = False
 
415
        self.pb.show_spinner = False
 
416
        self.pb.show_eta = False,
 
417
        self.pb.show_count = False
 
418
        self.pb.show_bar = False
 
419
        self.pb.update_latency = 0
 
420
        self.pb.show_transport_activity = False
 
421
 
 
422
    def stopTestRun(self):
 
423
        # called when the tests that are going to run have run
 
424
        self.pb.clear()
 
425
        super(TextTestResult, self).stopTestRun()
 
426
 
 
427
    def finished(self):
 
428
        self.pb.finished()
 
429
 
 
430
    def report_starting(self):
 
431
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
 
432
 
 
433
    def printErrors(self):
 
434
        # clear the pb to make room for the error listing
 
435
        self.pb.clear()
 
436
        super(TextTestResult, self).printErrors()
 
437
 
 
438
    def _progress_prefix_text(self):
 
439
        # the longer this text, the less space we have to show the test
 
440
        # name...
 
441
        a = '[%d' % self.count              # total that have been run
 
442
        # tests skipped as known not to be relevant are not important enough
 
443
        # to show here
 
444
        ## if self.skip_count:
 
445
        ##     a += ', %d skip' % self.skip_count
 
446
        ## if self.known_failure_count:
 
447
        ##     a += '+%dX' % self.known_failure_count
 
448
        if self.num_tests:
 
449
            a +='/%d' % self.num_tests
 
450
        a += ' in '
 
451
        runtime = time.time() - self._overall_start_time
 
452
        if runtime >= 60:
 
453
            a += '%dm%ds' % (runtime / 60, runtime % 60)
 
454
        else:
 
455
            a += '%ds' % runtime
 
456
        if self.error_count:
 
457
            a += ', %d err' % self.error_count
 
458
        if self.failure_count:
 
459
            a += ', %d fail' % self.failure_count
 
460
        if self.unsupported:
 
461
            a += ', %d missing' % len(self.unsupported)
 
462
        a += ']'
 
463
        return a
 
464
 
 
465
    def report_test_start(self, test):
 
466
        self.count += 1
 
467
        self.pb.update(
 
468
                self._progress_prefix_text()
 
469
                + ' '
 
470
                + self._shortened_test_description(test))
 
471
 
 
472
    def _test_description(self, test):
 
473
        return self._shortened_test_description(test)
 
474
 
 
475
    def report_error(self, test, err):
 
476
        self.pb.note('ERROR: %s\n    %s\n',
 
477
            self._test_description(test),
 
478
            err[1],
 
479
            )
 
480
 
 
481
    def report_failure(self, test, err):
 
482
        self.pb.note('FAIL: %s\n    %s\n',
 
483
            self._test_description(test),
 
484
            err[1],
 
485
            )
 
486
 
 
487
    def report_known_failure(self, test, err):
 
488
        self.pb.note('XFAIL: %s\n%s\n',
 
489
            self._test_description(test), err[1])
 
490
 
 
491
    def report_skip(self, test, reason):
 
492
        pass
 
493
 
 
494
    def report_not_applicable(self, test, skip_excinfo):
 
495
        pass
 
496
 
 
497
    def report_unsupported(self, test, feature):
 
498
        """test cannot be run because feature is missing."""
 
499
 
 
500
    def report_cleaning_up(self):
 
501
        self.pb.update('Cleaning up')
 
502
 
 
503
 
 
504
class VerboseTestResult(ExtendedTestResult):
 
505
    """Produce long output, with one line per test run plus times"""
 
506
 
 
507
    def _ellipsize_to_right(self, a_string, final_width):
 
508
        """Truncate and pad a string, keeping the right hand side"""
 
509
        if len(a_string) > final_width:
 
510
            result = '...' + a_string[3-final_width:]
 
511
        else:
 
512
            result = a_string
 
513
        return result.ljust(final_width)
 
514
 
 
515
    def report_starting(self):
 
516
        self.stream.write('running %d tests...\n' % self.num_tests)
 
517
 
 
518
    def report_test_start(self, test):
 
519
        self.count += 1
 
520
        name = self._shortened_test_description(test)
 
521
        # width needs space for 6 char status, plus 1 for slash, plus an
 
522
        # 11-char time string, plus a trailing blank
 
523
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
524
        self.stream.write(self._ellipsize_to_right(name,
 
525
                          osutils.terminal_width()-18))
 
526
        self.stream.flush()
 
527
 
 
528
    def _error_summary(self, err):
 
529
        indent = ' ' * 4
 
530
        return '%s%s' % (indent, err[1])
 
531
 
 
532
    def report_error(self, test, err):
 
533
        self.stream.writeln('ERROR %s\n%s'
 
534
                % (self._testTimeString(test),
 
535
                   self._error_summary(err)))
 
536
 
 
537
    def report_failure(self, test, err):
 
538
        self.stream.writeln(' FAIL %s\n%s'
 
539
                % (self._testTimeString(test),
 
540
                   self._error_summary(err)))
 
541
 
 
542
    def report_known_failure(self, test, err):
 
543
        self.stream.writeln('XFAIL %s\n%s'
 
544
                % (self._testTimeString(test),
 
545
                   self._error_summary(err)))
 
546
 
 
547
    def report_success(self, test):
 
548
        self.stream.writeln('   OK %s' % self._testTimeString(test))
 
549
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
550
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
551
            stats.pprint(file=self.stream)
 
552
        # flush the stream so that we get smooth output. This verbose mode is
 
553
        # used to show the output in PQM.
 
554
        self.stream.flush()
 
555
 
 
556
    def report_skip(self, test, reason):
 
557
        self.stream.writeln(' SKIP %s\n%s'
 
558
                % (self._testTimeString(test), reason))
 
559
 
 
560
    def report_not_applicable(self, test, skip_excinfo):
 
561
        self.stream.writeln('  N/A %s\n%s'
 
562
                % (self._testTimeString(test),
 
563
                   self._error_summary(skip_excinfo)))
 
564
 
 
565
    def report_unsupported(self, test, feature):
 
566
        """test cannot be run because feature is missing."""
 
567
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
568
                %(self._testTimeString(test), feature))
 
569
 
 
570
 
 
571
class TextTestRunner(object):
 
572
    stop_on_failure = False
 
573
 
 
574
    def __init__(self,
 
575
                 stream=sys.stderr,
 
576
                 descriptions=0,
 
577
                 verbosity=1,
 
578
                 bench_history=None,
 
579
                 list_only=False,
 
580
                 strict=False,
 
581
                 result_decorators=None,
 
582
                 ):
 
583
        """Create a TextTestRunner.
 
584
 
 
585
        :param result_decorators: An optional list of decorators to apply
 
586
            to the result object being used by the runner. Decorators are
 
587
            applied left to right - the first element in the list is the 
 
588
            innermost decorator.
 
589
        """
 
590
        self.stream = unittest._WritelnDecorator(stream)
 
591
        self.descriptions = descriptions
 
592
        self.verbosity = verbosity
 
593
        self._bench_history = bench_history
 
594
        self.list_only = list_only
 
595
        self._strict = strict
 
596
        self._result_decorators = result_decorators or []
 
597
 
 
598
    def run(self, test):
 
599
        "Run the given test case or test suite."
 
600
        startTime = time.time()
 
601
        if self.verbosity == 1:
 
602
            result_class = TextTestResult
 
603
        elif self.verbosity >= 2:
 
604
            result_class = VerboseTestResult
 
605
        result = result_class(self.stream,
 
606
                              self.descriptions,
 
607
                              self.verbosity,
 
608
                              bench_history=self._bench_history,
 
609
                              strict=self._strict,
 
610
                              )
 
611
        run_result = result
 
612
        for decorator in self._result_decorators:
 
613
            run_result = decorator(run_result)
 
614
        result.stop_early = self.stop_on_failure
 
615
        result.report_starting()
 
616
        if self.list_only:
 
617
            if self.verbosity >= 2:
 
618
                self.stream.writeln("Listing tests only ...\n")
 
619
            run = 0
 
620
            for t in iter_suite_tests(test):
 
621
                self.stream.writeln("%s" % (t.id()))
 
622
                run += 1
 
623
            return None
 
624
        else:
 
625
            try:
 
626
                import testtools
 
627
            except ImportError:
 
628
                test.run(run_result)
 
629
            else:
 
630
                if isinstance(test, testtools.ConcurrentTestSuite):
 
631
                    # We need to catch bzr specific behaviors
 
632
                    test.run(BZRTransformingResult(run_result))
 
633
                else:
 
634
                    test.run(run_result)
 
635
            run = result.testsRun
 
636
            actionTaken = "Ran"
 
637
        stopTime = time.time()
 
638
        timeTaken = stopTime - startTime
 
639
        result.printErrors()
 
640
        self.stream.writeln(result.separator2)
 
641
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
642
                            run, run != 1 and "s" or "", timeTaken))
 
643
        self.stream.writeln()
 
644
        if not result.wasSuccessful():
 
645
            self.stream.write("FAILED (")
 
646
            failed, errored = map(len, (result.failures, result.errors))
 
647
            if failed:
 
648
                self.stream.write("failures=%d" % failed)
 
649
            if errored:
 
650
                if failed: self.stream.write(", ")
 
651
                self.stream.write("errors=%d" % errored)
 
652
            if result.known_failure_count:
 
653
                if failed or errored: self.stream.write(", ")
 
654
                self.stream.write("known_failure_count=%d" %
 
655
                    result.known_failure_count)
 
656
            self.stream.writeln(")")
 
657
        else:
 
658
            if result.known_failure_count:
 
659
                self.stream.writeln("OK (known_failures=%d)" %
 
660
                    result.known_failure_count)
 
661
            else:
 
662
                self.stream.writeln("OK")
 
663
        if result.skip_count > 0:
 
664
            skipped = result.skip_count
 
665
            self.stream.writeln('%d test%s skipped' %
 
666
                                (skipped, skipped != 1 and "s" or ""))
 
667
        if result.unsupported:
 
668
            for feature, count in sorted(result.unsupported.items()):
 
669
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
670
                    (feature, count))
 
671
        result.finished()
 
672
        return result
 
673
 
 
674
 
 
675
def iter_suite_tests(suite):
 
676
    """Return all tests in a suite, recursing through nested suites"""
 
677
    if isinstance(suite, unittest.TestCase):
 
678
        yield suite
 
679
    elif isinstance(suite, unittest.TestSuite):
 
680
        for item in suite:
 
681
            for r in iter_suite_tests(item):
 
682
                yield r
 
683
    else:
 
684
        raise Exception('unknown type %r for object %r'
 
685
                        % (type(suite), suite))
 
686
 
 
687
 
 
688
class TestSkipped(Exception):
 
689
    """Indicates that a test was intentionally skipped, rather than failing."""
 
690
 
 
691
 
 
692
class TestNotApplicable(TestSkipped):
 
693
    """A test is not applicable to the situation where it was run.
 
694
 
 
695
    This is only normally raised by parameterized tests, if they find that
 
696
    the instance they're constructed upon does not support one aspect
 
697
    of its interface.
 
698
    """
 
699
 
 
700
 
 
701
class KnownFailure(AssertionError):
 
702
    """Indicates that a test failed in a precisely expected manner.
 
703
 
 
704
    Such failures dont block the whole test suite from passing because they are
 
705
    indicators of partially completed code or of future work. We have an
 
706
    explicit error for them so that we can ensure that they are always visible:
 
707
    KnownFailures are always shown in the output of bzr selftest.
 
708
    """
 
709
 
 
710
 
 
711
class UnavailableFeature(Exception):
 
712
    """A feature required for this test was not available.
 
713
 
 
714
    The feature should be used to construct the exception.
 
715
    """
 
716
 
 
717
 
 
718
class CommandFailed(Exception):
 
719
    pass
 
720
 
 
721
 
 
722
class StringIOWrapper(object):
 
723
    """A wrapper around cStringIO which just adds an encoding attribute.
 
724
 
 
725
    Internally we can check sys.stdout to see what the output encoding
 
726
    should be. However, cStringIO has no encoding attribute that we can
 
727
    set. So we wrap it instead.
 
728
    """
 
729
    encoding='ascii'
 
730
    _cstring = None
 
731
 
 
732
    def __init__(self, s=None):
 
733
        if s is not None:
 
734
            self.__dict__['_cstring'] = StringIO(s)
 
735
        else:
 
736
            self.__dict__['_cstring'] = StringIO()
 
737
 
 
738
    def __getattr__(self, name, getattr=getattr):
 
739
        return getattr(self.__dict__['_cstring'], name)
 
740
 
 
741
    def __setattr__(self, name, val):
 
742
        if name == 'encoding':
 
743
            self.__dict__['encoding'] = val
 
744
        else:
 
745
            return setattr(self._cstring, name, val)
 
746
 
 
747
 
 
748
class TestUIFactory(TextUIFactory):
 
749
    """A UI Factory for testing.
 
750
 
 
751
    Hide the progress bar but emit note()s.
 
752
    Redirect stdin.
 
753
    Allows get_password to be tested without real tty attached.
 
754
 
 
755
    See also CannedInputUIFactory which lets you provide programmatic input in
 
756
    a structured way.
 
757
    """
 
758
    # TODO: Capture progress events at the model level and allow them to be
 
759
    # observed by tests that care.
 
760
    #
 
761
    # XXX: Should probably unify more with CannedInputUIFactory or a
 
762
    # particular configuration of TextUIFactory, or otherwise have a clearer
 
763
    # idea of how they're supposed to be different.
 
764
    # See https://bugs.edge.launchpad.net/bzr/+bug/408213
 
765
 
 
766
    def __init__(self, stdout=None, stderr=None, stdin=None):
 
767
        if stdin is not None:
 
768
            # We use a StringIOWrapper to be able to test various
 
769
            # encodings, but the user is still responsible to
 
770
            # encode the string and to set the encoding attribute
 
771
            # of StringIOWrapper.
 
772
            stdin = StringIOWrapper(stdin)
 
773
        super(TestUIFactory, self).__init__(stdin, stdout, stderr)
 
774
 
 
775
    def get_non_echoed_password(self):
 
776
        """Get password from stdin without trying to handle the echo mode"""
 
777
        password = self.stdin.readline()
 
778
        if not password:
 
779
            raise EOFError
 
780
        if password[-1] == '\n':
 
781
            password = password[:-1]
 
782
        return password
 
783
 
 
784
    def make_progress_view(self):
 
785
        return NullProgressView()
 
786
 
 
787
 
 
788
class TestCase(unittest.TestCase):
 
789
    """Base class for bzr unit tests.
 
790
 
 
791
    Tests that need access to disk resources should subclass
 
792
    TestCaseInTempDir not TestCase.
 
793
 
 
794
    Error and debug log messages are redirected from their usual
 
795
    location into a temporary file, the contents of which can be
 
796
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
797
    so that it can also capture file IO.  When the test completes this file
 
798
    is read into memory and removed from disk.
 
799
 
 
800
    There are also convenience functions to invoke bzr's command-line
 
801
    routine, and to build and check bzr trees.
 
802
 
 
803
    In addition to the usual method of overriding tearDown(), this class also
 
804
    allows subclasses to register functions into the _cleanups list, which is
 
805
    run in order as the object is torn down.  It's less likely this will be
 
806
    accidentally overlooked.
 
807
    """
 
808
 
 
809
    _active_threads = None
 
810
    _leaking_threads_tests = 0
 
811
    _first_thread_leaker_id = None
 
812
    _log_file_name = None
 
813
    _log_contents = ''
 
814
    _keep_log_file = False
 
815
    # record lsprof data when performing benchmark calls.
 
816
    _gather_lsprof_in_benchmarks = False
 
817
    attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
 
818
                     '_log_contents', '_log_file_name', '_benchtime',
 
819
                     '_TestCase__testMethodName', '_TestCase__testMethodDoc',)
 
820
 
 
821
    def __init__(self, methodName='testMethod'):
 
822
        super(TestCase, self).__init__(methodName)
 
823
        self._cleanups = []
 
824
        self._bzr_test_setUp_run = False
 
825
        self._bzr_test_tearDown_run = False
 
826
 
 
827
    def setUp(self):
 
828
        unittest.TestCase.setUp(self)
 
829
        self._bzr_test_setUp_run = True
 
830
        self._cleanEnvironment()
 
831
        self._silenceUI()
 
832
        self._startLogFile()
 
833
        self._benchcalls = []
 
834
        self._benchtime = None
 
835
        self._clear_hooks()
 
836
        self._track_locks()
 
837
        self._clear_debug_flags()
 
838
        TestCase._active_threads = threading.activeCount()
 
839
        self.addCleanup(self._check_leaked_threads)
 
840
 
 
841
    def debug(self):
 
842
        # debug a frame up.
 
843
        import pdb
 
844
        pdb.Pdb().set_trace(sys._getframe().f_back)
 
845
 
 
846
    def _check_leaked_threads(self):
 
847
        active = threading.activeCount()
 
848
        leaked_threads = active - TestCase._active_threads
 
849
        TestCase._active_threads = active
 
850
        if leaked_threads:
 
851
            TestCase._leaking_threads_tests += 1
 
852
            if TestCase._first_thread_leaker_id is None:
 
853
                TestCase._first_thread_leaker_id = self.id()
 
854
 
 
855
    def _clear_debug_flags(self):
 
856
        """Prevent externally set debug flags affecting tests.
 
857
 
 
858
        Tests that want to use debug flags can just set them in the
 
859
        debug_flags set during setup/teardown.
 
860
        """
 
861
        self._preserved_debug_flags = set(debug.debug_flags)
 
862
        if 'allow_debug' not in selftest_debug_flags:
 
863
            debug.debug_flags.clear()
 
864
        if 'disable_lock_checks' not in selftest_debug_flags:
 
865
            debug.debug_flags.add('strict_locks')
 
866
        self.addCleanup(self._restore_debug_flags)
 
867
 
 
868
    def _clear_hooks(self):
 
869
        # prevent hooks affecting tests
 
870
        self._preserved_hooks = {}
 
871
        for key, factory in hooks.known_hooks.items():
 
872
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
873
            current_hooks = hooks.known_hooks_key_to_object(key)
 
874
            self._preserved_hooks[parent] = (name, current_hooks)
 
875
        self.addCleanup(self._restoreHooks)
 
876
        for key, factory in hooks.known_hooks.items():
 
877
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
878
            setattr(parent, name, factory())
 
879
        # this hook should always be installed
 
880
        request._install_hook()
 
881
 
 
882
    def _silenceUI(self):
 
883
        """Turn off UI for duration of test"""
 
884
        # by default the UI is off; tests can turn it on if they want it.
 
885
        saved = ui.ui_factory
 
886
        def _restore():
 
887
            ui.ui_factory = saved
 
888
        ui.ui_factory = ui.SilentUIFactory()
 
889
        self.addCleanup(_restore)
 
890
 
 
891
    def _check_locks(self):
 
892
        """Check that all lock take/release actions have been paired."""
 
893
        # We always check for mismatched locks. If a mismatch is found, we
 
894
        # fail unless -Edisable_lock_checks is supplied to selftest, in which
 
895
        # case we just print a warning.
 
896
        # unhook:
 
897
        acquired_locks = [lock for action, lock in self._lock_actions
 
898
                          if action == 'acquired']
 
899
        released_locks = [lock for action, lock in self._lock_actions
 
900
                          if action == 'released']
 
901
        broken_locks = [lock for action, lock in self._lock_actions
 
902
                        if action == 'broken']
 
903
        # trivially, given the tests for lock acquistion and release, if we
 
904
        # have as many in each list, it should be ok. Some lock tests also
 
905
        # break some locks on purpose and should be taken into account by
 
906
        # considering that breaking a lock is just a dirty way of releasing it.
 
907
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
 
908
            message = ('Different number of acquired and '
 
909
                       'released or broken locks. (%s, %s + %s)' %
 
910
                       (acquired_locks, released_locks, broken_locks))
 
911
            if not self._lock_check_thorough:
 
912
                # Rather than fail, just warn
 
913
                print "Broken test %s: %s" % (self, message)
 
914
                return
 
915
            self.fail(message)
 
916
 
 
917
    def _track_locks(self):
 
918
        """Track lock activity during tests."""
 
919
        self._lock_actions = []
 
920
        if 'disable_lock_checks' in selftest_debug_flags:
 
921
            self._lock_check_thorough = False
 
922
        else:
 
923
            self._lock_check_thorough = True
 
924
            
 
925
        self.addCleanup(self._check_locks)
 
926
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
 
927
                                                self._lock_acquired, None)
 
928
        _mod_lock.Lock.hooks.install_named_hook('lock_released',
 
929
                                                self._lock_released, None)
 
930
        _mod_lock.Lock.hooks.install_named_hook('lock_broken',
 
931
                                                self._lock_broken, None)
 
932
 
 
933
    def _lock_acquired(self, result):
 
934
        self._lock_actions.append(('acquired', result))
 
935
 
 
936
    def _lock_released(self, result):
 
937
        self._lock_actions.append(('released', result))
 
938
 
 
939
    def _lock_broken(self, result):
 
940
        self._lock_actions.append(('broken', result))
 
941
 
 
942
    def _ndiff_strings(self, a, b):
 
943
        """Return ndiff between two strings containing lines.
 
944
 
 
945
        A trailing newline is added if missing to make the strings
 
946
        print properly."""
 
947
        if b and b[-1] != '\n':
 
948
            b += '\n'
 
949
        if a and a[-1] != '\n':
 
950
            a += '\n'
 
951
        difflines = difflib.ndiff(a.splitlines(True),
 
952
                                  b.splitlines(True),
 
953
                                  linejunk=lambda x: False,
 
954
                                  charjunk=lambda x: False)
 
955
        return ''.join(difflines)
 
956
 
 
957
    def assertEqual(self, a, b, message=''):
 
958
        try:
 
959
            if a == b:
 
960
                return
 
961
        except UnicodeError, e:
 
962
            # If we can't compare without getting a UnicodeError, then
 
963
            # obviously they are different
 
964
            mutter('UnicodeError: %s', e)
 
965
        if message:
 
966
            message += '\n'
 
967
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
968
            % (message,
 
969
               pformat(a), pformat(b)))
 
970
 
 
971
    assertEquals = assertEqual
 
972
 
 
973
    def assertEqualDiff(self, a, b, message=None):
 
974
        """Assert two texts are equal, if not raise an exception.
 
975
 
 
976
        This is intended for use with multi-line strings where it can
 
977
        be hard to find the differences by eye.
 
978
        """
 
979
        # TODO: perhaps override assertEquals to call this for strings?
 
980
        if a == b:
 
981
            return
 
982
        if message is None:
 
983
            message = "texts not equal:\n"
 
984
        if a == b + '\n':
 
985
            message = 'first string is missing a final newline.\n'
 
986
        if a + '\n' == b:
 
987
            message = 'second string is missing a final newline.\n'
 
988
        raise AssertionError(message +
 
989
                             self._ndiff_strings(a, b))
 
990
 
 
991
    def assertEqualMode(self, mode, mode_test):
 
992
        self.assertEqual(mode, mode_test,
 
993
                         'mode mismatch %o != %o' % (mode, mode_test))
 
994
 
 
995
    def assertEqualStat(self, expected, actual):
 
996
        """assert that expected and actual are the same stat result.
 
997
 
 
998
        :param expected: A stat result.
 
999
        :param actual: A stat result.
 
1000
        :raises AssertionError: If the expected and actual stat values differ
 
1001
            other than by atime.
 
1002
        """
 
1003
        self.assertEqual(expected.st_size, actual.st_size)
 
1004
        self.assertEqual(expected.st_mtime, actual.st_mtime)
 
1005
        self.assertEqual(expected.st_ctime, actual.st_ctime)
 
1006
        self.assertEqual(expected.st_dev, actual.st_dev)
 
1007
        self.assertEqual(expected.st_ino, actual.st_ino)
 
1008
        self.assertEqual(expected.st_mode, actual.st_mode)
 
1009
 
 
1010
    def assertLength(self, length, obj_with_len):
 
1011
        """Assert that obj_with_len is of length length."""
 
1012
        if len(obj_with_len) != length:
 
1013
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
 
1014
                length, len(obj_with_len), obj_with_len))
 
1015
 
 
1016
    def assertPositive(self, val):
 
1017
        """Assert that val is greater than 0."""
 
1018
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
1019
 
 
1020
    def assertNegative(self, val):
 
1021
        """Assert that val is less than 0."""
 
1022
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
1023
 
 
1024
    def assertStartsWith(self, s, prefix):
 
1025
        if not s.startswith(prefix):
 
1026
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
1027
 
 
1028
    def assertEndsWith(self, s, suffix):
 
1029
        """Asserts that s ends with suffix."""
 
1030
        if not s.endswith(suffix):
 
1031
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
1032
 
 
1033
    def assertContainsRe(self, haystack, needle_re, flags=0):
 
1034
        """Assert that a contains something matching a regular expression."""
 
1035
        if not re.search(needle_re, haystack, flags):
 
1036
            if '\n' in haystack or len(haystack) > 60:
 
1037
                # a long string, format it in a more readable way
 
1038
                raise AssertionError(
 
1039
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
1040
                        % (needle_re, haystack))
 
1041
            else:
 
1042
                raise AssertionError('pattern "%s" not found in "%s"'
 
1043
                        % (needle_re, haystack))
 
1044
 
 
1045
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
 
1046
        """Assert that a does not match a regular expression"""
 
1047
        if re.search(needle_re, haystack, flags):
 
1048
            raise AssertionError('pattern "%s" found in "%s"'
 
1049
                    % (needle_re, haystack))
 
1050
 
 
1051
    def assertSubset(self, sublist, superlist):
 
1052
        """Assert that every entry in sublist is present in superlist."""
 
1053
        missing = set(sublist) - set(superlist)
 
1054
        if len(missing) > 0:
 
1055
            raise AssertionError("value(s) %r not present in container %r" %
 
1056
                                 (missing, superlist))
 
1057
 
 
1058
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
1059
        """Fail unless excClass is raised when the iterator from func is used.
 
1060
 
 
1061
        Many functions can return generators this makes sure
 
1062
        to wrap them in a list() call to make sure the whole generator
 
1063
        is run, and that the proper exception is raised.
 
1064
        """
 
1065
        try:
 
1066
            list(func(*args, **kwargs))
 
1067
        except excClass, e:
 
1068
            return e
 
1069
        else:
 
1070
            if getattr(excClass,'__name__', None) is not None:
 
1071
                excName = excClass.__name__
 
1072
            else:
 
1073
                excName = str(excClass)
 
1074
            raise self.failureException, "%s not raised" % excName
 
1075
 
 
1076
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
1077
        """Assert that a callable raises a particular exception.
 
1078
 
 
1079
        :param excClass: As for the except statement, this may be either an
 
1080
            exception class, or a tuple of classes.
 
1081
        :param callableObj: A callable, will be passed ``*args`` and
 
1082
            ``**kwargs``.
 
1083
 
 
1084
        Returns the exception so that you can examine it.
 
1085
        """
 
1086
        try:
 
1087
            callableObj(*args, **kwargs)
 
1088
        except excClass, e:
 
1089
            return e
 
1090
        else:
 
1091
            if getattr(excClass,'__name__', None) is not None:
 
1092
                excName = excClass.__name__
 
1093
            else:
 
1094
                # probably a tuple
 
1095
                excName = str(excClass)
 
1096
            raise self.failureException, "%s not raised" % excName
 
1097
 
 
1098
    def assertIs(self, left, right, message=None):
 
1099
        if not (left is right):
 
1100
            if message is not None:
 
1101
                raise AssertionError(message)
 
1102
            else:
 
1103
                raise AssertionError("%r is not %r." % (left, right))
 
1104
 
 
1105
    def assertIsNot(self, left, right, message=None):
 
1106
        if (left is right):
 
1107
            if message is not None:
 
1108
                raise AssertionError(message)
 
1109
            else:
 
1110
                raise AssertionError("%r is %r." % (left, right))
 
1111
 
 
1112
    def assertTransportMode(self, transport, path, mode):
 
1113
        """Fail if a path does not have mode "mode".
 
1114
 
 
1115
        If modes are not supported on this transport, the assertion is ignored.
 
1116
        """
 
1117
        if not transport._can_roundtrip_unix_modebits():
 
1118
            return
 
1119
        path_stat = transport.stat(path)
 
1120
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
1121
        self.assertEqual(mode, actual_mode,
 
1122
                         'mode of %r incorrect (%s != %s)'
 
1123
                         % (path, oct(mode), oct(actual_mode)))
 
1124
 
 
1125
    def assertIsSameRealPath(self, path1, path2):
 
1126
        """Fail if path1 and path2 points to different files"""
 
1127
        self.assertEqual(osutils.realpath(path1),
 
1128
                         osutils.realpath(path2),
 
1129
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
1130
 
 
1131
    def assertIsInstance(self, obj, kls, msg=None):
 
1132
        """Fail if obj is not an instance of kls
 
1133
        
 
1134
        :param msg: Supplementary message to show if the assertion fails.
 
1135
        """
 
1136
        if not isinstance(obj, kls):
 
1137
            m = "%r is an instance of %s rather than %s" % (
 
1138
                obj, obj.__class__, kls)
 
1139
            if msg:
 
1140
                m += ": " + msg
 
1141
            self.fail(m)
 
1142
 
 
1143
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
1144
        """Invoke a test, expecting it to fail for the given reason.
 
1145
 
 
1146
        This is for assertions that ought to succeed, but currently fail.
 
1147
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
1148
        about the failure you're expecting.  If a new bug is introduced,
 
1149
        AssertionError should be raised, not KnownFailure.
 
1150
 
 
1151
        Frequently, expectFailure should be followed by an opposite assertion.
 
1152
        See example below.
 
1153
 
 
1154
        Intended to be used with a callable that raises AssertionError as the
 
1155
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
1156
 
 
1157
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
1158
        test succeeds.
 
1159
 
 
1160
        example usage::
 
1161
 
 
1162
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
1163
                             dynamic_val)
 
1164
          self.assertEqual(42, dynamic_val)
 
1165
 
 
1166
          This means that a dynamic_val of 54 will cause the test to raise
 
1167
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
1168
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
1169
          than 54 or 42 will cause an AssertionError.
 
1170
        """
 
1171
        try:
 
1172
            assertion(*args, **kwargs)
 
1173
        except AssertionError:
 
1174
            raise KnownFailure(reason)
 
1175
        else:
 
1176
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
1177
 
 
1178
    def assertFileEqual(self, content, path):
 
1179
        """Fail if path does not contain 'content'."""
 
1180
        self.failUnlessExists(path)
 
1181
        f = file(path, 'rb')
 
1182
        try:
 
1183
            s = f.read()
 
1184
        finally:
 
1185
            f.close()
 
1186
        self.assertEqualDiff(content, s)
 
1187
 
 
1188
    def failUnlessExists(self, path):
 
1189
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1190
        if not isinstance(path, basestring):
 
1191
            for p in path:
 
1192
                self.failUnlessExists(p)
 
1193
        else:
 
1194
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
1195
 
 
1196
    def failIfExists(self, path):
 
1197
        """Fail if path or paths, which may be abs or relative, exist."""
 
1198
        if not isinstance(path, basestring):
 
1199
            for p in path:
 
1200
                self.failIfExists(p)
 
1201
        else:
 
1202
            self.failIf(osutils.lexists(path),path+" exists")
 
1203
 
 
1204
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1205
        """A helper for callDeprecated and applyDeprecated.
 
1206
 
 
1207
        :param a_callable: A callable to call.
 
1208
        :param args: The positional arguments for the callable
 
1209
        :param kwargs: The keyword arguments for the callable
 
1210
        :return: A tuple (warnings, result). result is the result of calling
 
1211
            a_callable(``*args``, ``**kwargs``).
 
1212
        """
 
1213
        local_warnings = []
 
1214
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1215
            # we've hooked into a deprecation specific callpath,
 
1216
            # only deprecations should getting sent via it.
 
1217
            self.assertEqual(cls, DeprecationWarning)
 
1218
            local_warnings.append(msg)
 
1219
        original_warning_method = symbol_versioning.warn
 
1220
        symbol_versioning.set_warning_method(capture_warnings)
 
1221
        try:
 
1222
            result = a_callable(*args, **kwargs)
 
1223
        finally:
 
1224
            symbol_versioning.set_warning_method(original_warning_method)
 
1225
        return (local_warnings, result)
 
1226
 
 
1227
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1228
        """Call a deprecated callable without warning the user.
 
1229
 
 
1230
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1231
        not other callers that go direct to the warning module.
 
1232
 
 
1233
        To test that a deprecated method raises an error, do something like
 
1234
        this::
 
1235
 
 
1236
            self.assertRaises(errors.ReservedId,
 
1237
                self.applyDeprecated,
 
1238
                deprecated_in((1, 5, 0)),
 
1239
                br.append_revision,
 
1240
                'current:')
 
1241
 
 
1242
        :param deprecation_format: The deprecation format that the callable
 
1243
            should have been deprecated with. This is the same type as the
 
1244
            parameter to deprecated_method/deprecated_function. If the
 
1245
            callable is not deprecated with this format, an assertion error
 
1246
            will be raised.
 
1247
        :param a_callable: A callable to call. This may be a bound method or
 
1248
            a regular function. It will be called with ``*args`` and
 
1249
            ``**kwargs``.
 
1250
        :param args: The positional arguments for the callable
 
1251
        :param kwargs: The keyword arguments for the callable
 
1252
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1253
        """
 
1254
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1255
            *args, **kwargs)
 
1256
        expected_first_warning = symbol_versioning.deprecation_string(
 
1257
            a_callable, deprecation_format)
 
1258
        if len(call_warnings) == 0:
 
1259
            self.fail("No deprecation warning generated by call to %s" %
 
1260
                a_callable)
 
1261
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1262
        return result
 
1263
 
 
1264
    def callCatchWarnings(self, fn, *args, **kw):
 
1265
        """Call a callable that raises python warnings.
 
1266
 
 
1267
        The caller's responsible for examining the returned warnings.
 
1268
 
 
1269
        If the callable raises an exception, the exception is not
 
1270
        caught and propagates up to the caller.  In that case, the list
 
1271
        of warnings is not available.
 
1272
 
 
1273
        :returns: ([warning_object, ...], fn_result)
 
1274
        """
 
1275
        # XXX: This is not perfect, because it completely overrides the
 
1276
        # warnings filters, and some code may depend on suppressing particular
 
1277
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1278
        # though.  -- Andrew, 20071062
 
1279
        wlist = []
 
1280
        def _catcher(message, category, filename, lineno, file=None, line=None):
 
1281
            # despite the name, 'message' is normally(?) a Warning subclass
 
1282
            # instance
 
1283
            wlist.append(message)
 
1284
        saved_showwarning = warnings.showwarning
 
1285
        saved_filters = warnings.filters
 
1286
        try:
 
1287
            warnings.showwarning = _catcher
 
1288
            warnings.filters = []
 
1289
            result = fn(*args, **kw)
 
1290
        finally:
 
1291
            warnings.showwarning = saved_showwarning
 
1292
            warnings.filters = saved_filters
 
1293
        return wlist, result
 
1294
 
 
1295
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1296
        """Assert that a callable is deprecated in a particular way.
 
1297
 
 
1298
        This is a very precise test for unusual requirements. The
 
1299
        applyDeprecated helper function is probably more suited for most tests
 
1300
        as it allows you to simply specify the deprecation format being used
 
1301
        and will ensure that that is issued for the function being called.
 
1302
 
 
1303
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1304
        not other callers that go direct to the warning module.  To catch
 
1305
        general warnings, use callCatchWarnings.
 
1306
 
 
1307
        :param expected: a list of the deprecation warnings expected, in order
 
1308
        :param callable: The callable to call
 
1309
        :param args: The positional arguments for the callable
 
1310
        :param kwargs: The keyword arguments for the callable
 
1311
        """
 
1312
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1313
            *args, **kwargs)
 
1314
        self.assertEqual(expected, call_warnings)
 
1315
        return result
 
1316
 
 
1317
    def _startLogFile(self):
 
1318
        """Send bzr and test log messages to a temporary file.
 
1319
 
 
1320
        The file is removed as the test is torn down.
 
1321
        """
 
1322
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1323
        self._log_file = os.fdopen(fileno, 'w+')
 
1324
        self._log_memento = bzrlib.trace.push_log_file(self._log_file)
 
1325
        self._log_file_name = name
 
1326
        self.addCleanup(self._finishLogFile)
 
1327
 
 
1328
    def _finishLogFile(self):
 
1329
        """Finished with the log file.
 
1330
 
 
1331
        Close the file and delete it, unless setKeepLogfile was called.
 
1332
        """
 
1333
        if self._log_file is None:
 
1334
            return
 
1335
        bzrlib.trace.pop_log_file(self._log_memento)
 
1336
        self._log_file.close()
 
1337
        self._log_file = None
 
1338
        if not self._keep_log_file:
 
1339
            os.remove(self._log_file_name)
 
1340
            self._log_file_name = None
 
1341
 
 
1342
    def setKeepLogfile(self):
 
1343
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1344
        self._keep_log_file = True
 
1345
 
 
1346
    def thisFailsStrictLockCheck(self):
 
1347
        """It is known that this test would fail with -Dstrict_locks.
 
1348
 
 
1349
        By default, all tests are run with strict lock checking unless
 
1350
        -Edisable_lock_checks is supplied. However there are some tests which
 
1351
        we know fail strict locks at this point that have not been fixed.
 
1352
        They should call this function to disable the strict checking.
 
1353
 
 
1354
        This should be used sparingly, it is much better to fix the locking
 
1355
        issues rather than papering over the problem by calling this function.
 
1356
        """
 
1357
        debug.debug_flags.discard('strict_locks')
 
1358
 
 
1359
    def addCleanup(self, callable, *args, **kwargs):
 
1360
        """Arrange to run a callable when this case is torn down.
 
1361
 
 
1362
        Callables are run in the reverse of the order they are registered,
 
1363
        ie last-in first-out.
 
1364
        """
 
1365
        self._cleanups.append((callable, args, kwargs))
 
1366
 
 
1367
    def _cleanEnvironment(self):
 
1368
        new_env = {
 
1369
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1370
            'HOME': os.getcwd(),
 
1371
            # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
 
1372
            # tests do check our impls match APPDATA
 
1373
            'BZR_EDITOR': None, # test_msgeditor manipulates this variable
 
1374
            'VISUAL': None,
 
1375
            'EDITOR': None,
 
1376
            'BZR_EMAIL': None,
 
1377
            'BZREMAIL': None, # may still be present in the environment
 
1378
            'EMAIL': None,
 
1379
            'BZR_PROGRESS_BAR': None,
 
1380
            'BZR_LOG': None,
 
1381
            'BZR_PLUGIN_PATH': None,
 
1382
            # Make sure that any text ui tests are consistent regardless of
 
1383
            # the environment the test case is run in; you may want tests that
 
1384
            # test other combinations.  'dumb' is a reasonable guess for tests
 
1385
            # going to a pipe or a StringIO.
 
1386
            'TERM': 'dumb',
 
1387
            'LINES': '25',
 
1388
            'COLUMNS': '80',
 
1389
            # SSH Agent
 
1390
            'SSH_AUTH_SOCK': None,
 
1391
            # Proxies
 
1392
            'http_proxy': None,
 
1393
            'HTTP_PROXY': None,
 
1394
            'https_proxy': None,
 
1395
            'HTTPS_PROXY': None,
 
1396
            'no_proxy': None,
 
1397
            'NO_PROXY': None,
 
1398
            'all_proxy': None,
 
1399
            'ALL_PROXY': None,
 
1400
            # Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
 
1401
            # least. If you do (care), please update this comment
 
1402
            # -- vila 20080401
 
1403
            'ftp_proxy': None,
 
1404
            'FTP_PROXY': None,
 
1405
            'BZR_REMOTE_PATH': None,
 
1406
        }
 
1407
        self.__old_env = {}
 
1408
        self.addCleanup(self._restoreEnvironment)
 
1409
        for name, value in new_env.iteritems():
 
1410
            self._captureVar(name, value)
 
1411
 
 
1412
    def _captureVar(self, name, newvalue):
 
1413
        """Set an environment variable, and reset it when finished."""
 
1414
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1415
 
 
1416
    def _restore_debug_flags(self):
 
1417
        debug.debug_flags.clear()
 
1418
        debug.debug_flags.update(self._preserved_debug_flags)
 
1419
 
 
1420
    def _restoreEnvironment(self):
 
1421
        for name, value in self.__old_env.iteritems():
 
1422
            osutils.set_or_unset_env(name, value)
 
1423
 
 
1424
    def _restoreHooks(self):
 
1425
        for klass, (name, hooks) in self._preserved_hooks.items():
 
1426
            setattr(klass, name, hooks)
 
1427
 
 
1428
    def knownFailure(self, reason):
 
1429
        """This test has failed for some known reason."""
 
1430
        raise KnownFailure(reason)
 
1431
 
 
1432
    def _do_skip(self, result, reason):
 
1433
        addSkip = getattr(result, 'addSkip', None)
 
1434
        if not callable(addSkip):
 
1435
            result.addError(self, sys.exc_info())
 
1436
        else:
 
1437
            addSkip(self, reason)
 
1438
 
 
1439
    def run(self, result=None):
 
1440
        if result is None: result = self.defaultTestResult()
 
1441
        for feature in getattr(self, '_test_needs_features', []):
 
1442
            if not feature.available():
 
1443
                result.startTest(self)
 
1444
                if getattr(result, 'addNotSupported', None):
 
1445
                    result.addNotSupported(self, feature)
 
1446
                else:
 
1447
                    result.addSuccess(self)
 
1448
                result.stopTest(self)
 
1449
                return result
 
1450
        try:
 
1451
            try:
 
1452
                result.startTest(self)
 
1453
                absent_attr = object()
 
1454
                # Python 2.5
 
1455
                method_name = getattr(self, '_testMethodName', absent_attr)
 
1456
                if method_name is absent_attr:
 
1457
                    # Python 2.4
 
1458
                    method_name = getattr(self, '_TestCase__testMethodName')
 
1459
                testMethod = getattr(self, method_name)
 
1460
                try:
 
1461
                    try:
 
1462
                        self.setUp()
 
1463
                        if not self._bzr_test_setUp_run:
 
1464
                            self.fail(
 
1465
                                "test setUp did not invoke "
 
1466
                                "bzrlib.tests.TestCase's setUp")
 
1467
                    except KeyboardInterrupt:
 
1468
                        self._runCleanups()
 
1469
                        raise
 
1470
                    except TestSkipped, e:
 
1471
                        self._do_skip(result, e.args[0])
 
1472
                        self.tearDown()
 
1473
                        return result
 
1474
                    except:
 
1475
                        result.addError(self, sys.exc_info())
 
1476
                        self._runCleanups()
 
1477
                        return result
 
1478
 
 
1479
                    ok = False
 
1480
                    try:
 
1481
                        testMethod()
 
1482
                        ok = True
 
1483
                    except self.failureException:
 
1484
                        result.addFailure(self, sys.exc_info())
 
1485
                    except TestSkipped, e:
 
1486
                        if not e.args:
 
1487
                            reason = "No reason given."
 
1488
                        else:
 
1489
                            reason = e.args[0]
 
1490
                        self._do_skip(result, reason)
 
1491
                    except KeyboardInterrupt:
 
1492
                        self._runCleanups()
 
1493
                        raise
 
1494
                    except:
 
1495
                        result.addError(self, sys.exc_info())
 
1496
 
 
1497
                    try:
 
1498
                        self.tearDown()
 
1499
                        if not self._bzr_test_tearDown_run:
 
1500
                            self.fail(
 
1501
                                "test tearDown did not invoke "
 
1502
                                "bzrlib.tests.TestCase's tearDown")
 
1503
                    except KeyboardInterrupt:
 
1504
                        self._runCleanups()
 
1505
                        raise
 
1506
                    except:
 
1507
                        result.addError(self, sys.exc_info())
 
1508
                        self._runCleanups()
 
1509
                        ok = False
 
1510
                    if ok: result.addSuccess(self)
 
1511
                finally:
 
1512
                    result.stopTest(self)
 
1513
                return result
 
1514
            except TestNotApplicable:
 
1515
                # Not moved from the result [yet].
 
1516
                self._runCleanups()
 
1517
                raise
 
1518
            except KeyboardInterrupt:
 
1519
                self._runCleanups()
 
1520
                raise
 
1521
        finally:
 
1522
            saved_attrs = {}
 
1523
            for attr_name in self.attrs_to_keep:
 
1524
                if attr_name in self.__dict__:
 
1525
                    saved_attrs[attr_name] = self.__dict__[attr_name]
 
1526
            self.__dict__ = saved_attrs
 
1527
 
 
1528
    def tearDown(self):
 
1529
        self._runCleanups()
 
1530
        self._log_contents = ''
 
1531
        self._bzr_test_tearDown_run = True
 
1532
        unittest.TestCase.tearDown(self)
 
1533
 
 
1534
    def time(self, callable, *args, **kwargs):
 
1535
        """Run callable and accrue the time it takes to the benchmark time.
 
1536
 
 
1537
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1538
        this will cause lsprofile statistics to be gathered and stored in
 
1539
        self._benchcalls.
 
1540
        """
 
1541
        if self._benchtime is None:
 
1542
            self._benchtime = 0
 
1543
        start = time.time()
 
1544
        try:
 
1545
            if not self._gather_lsprof_in_benchmarks:
 
1546
                return callable(*args, **kwargs)
 
1547
            else:
 
1548
                # record this benchmark
 
1549
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1550
                stats.sort()
 
1551
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1552
                return ret
 
1553
        finally:
 
1554
            self._benchtime += time.time() - start
 
1555
 
 
1556
    def _runCleanups(self):
 
1557
        """Run registered cleanup functions.
 
1558
 
 
1559
        This should only be called from TestCase.tearDown.
 
1560
        """
 
1561
        # TODO: Perhaps this should keep running cleanups even if
 
1562
        # one of them fails?
 
1563
 
 
1564
        # Actually pop the cleanups from the list so tearDown running
 
1565
        # twice is safe (this happens for skipped tests).
 
1566
        while self._cleanups:
 
1567
            cleanup, args, kwargs = self._cleanups.pop()
 
1568
            cleanup(*args, **kwargs)
 
1569
 
 
1570
    def log(self, *args):
 
1571
        mutter(*args)
 
1572
 
 
1573
    def _get_log(self, keep_log_file=False):
 
1574
        """Get the log from bzrlib.trace calls from this test.
 
1575
 
 
1576
        :param keep_log_file: When True, if the log is still a file on disk
 
1577
            leave it as a file on disk. When False, if the log is still a file
 
1578
            on disk, the log file is deleted and the log preserved as
 
1579
            self._log_contents.
 
1580
        :return: A string containing the log.
 
1581
        """
 
1582
        # flush the log file, to get all content
 
1583
        import bzrlib.trace
 
1584
        if bzrlib.trace._trace_file:
 
1585
            bzrlib.trace._trace_file.flush()
 
1586
        if self._log_contents:
 
1587
            # XXX: this can hardly contain the content flushed above --vila
 
1588
            # 20080128
 
1589
            return self._log_contents
 
1590
        if self._log_file_name is not None:
 
1591
            logfile = open(self._log_file_name)
 
1592
            try:
 
1593
                log_contents = logfile.read()
 
1594
            finally:
 
1595
                logfile.close()
 
1596
            if not keep_log_file:
 
1597
                self._log_contents = log_contents
 
1598
                try:
 
1599
                    os.remove(self._log_file_name)
 
1600
                except OSError, e:
 
1601
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1602
                        sys.stderr.write(('Unable to delete log file '
 
1603
                                             ' %r\n' % self._log_file_name))
 
1604
                    else:
 
1605
                        raise
 
1606
            return log_contents
 
1607
        else:
 
1608
            return "DELETED log file to reduce memory footprint"
 
1609
 
 
1610
    def requireFeature(self, feature):
 
1611
        """This test requires a specific feature is available.
 
1612
 
 
1613
        :raises UnavailableFeature: When feature is not available.
 
1614
        """
 
1615
        if not feature.available():
 
1616
            raise UnavailableFeature(feature)
 
1617
 
 
1618
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1619
            working_dir):
 
1620
        """Run bazaar command line, splitting up a string command line."""
 
1621
        if isinstance(args, basestring):
 
1622
            # shlex don't understand unicode strings,
 
1623
            # so args should be plain string (bialix 20070906)
 
1624
            args = list(shlex.split(str(args)))
 
1625
        return self._run_bzr_core(args, retcode=retcode,
 
1626
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1627
                )
 
1628
 
 
1629
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1630
            working_dir):
 
1631
        if encoding is None:
 
1632
            encoding = osutils.get_user_encoding()
 
1633
        stdout = StringIOWrapper()
 
1634
        stderr = StringIOWrapper()
 
1635
        stdout.encoding = encoding
 
1636
        stderr.encoding = encoding
 
1637
 
 
1638
        self.log('run bzr: %r', args)
 
1639
        # FIXME: don't call into logging here
 
1640
        handler = logging.StreamHandler(stderr)
 
1641
        handler.setLevel(logging.INFO)
 
1642
        logger = logging.getLogger('')
 
1643
        logger.addHandler(handler)
 
1644
        old_ui_factory = ui.ui_factory
 
1645
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1646
 
 
1647
        cwd = None
 
1648
        if working_dir is not None:
 
1649
            cwd = osutils.getcwd()
 
1650
            os.chdir(working_dir)
 
1651
 
 
1652
        try:
 
1653
            result = self.apply_redirected(ui.ui_factory.stdin,
 
1654
                stdout, stderr,
 
1655
                bzrlib.commands.run_bzr_catch_user_errors,
 
1656
                args)
 
1657
        finally:
 
1658
            logger.removeHandler(handler)
 
1659
            ui.ui_factory = old_ui_factory
 
1660
            if cwd is not None:
 
1661
                os.chdir(cwd)
 
1662
 
 
1663
        out = stdout.getvalue()
 
1664
        err = stderr.getvalue()
 
1665
        if out:
 
1666
            self.log('output:\n%r', out)
 
1667
        if err:
 
1668
            self.log('errors:\n%r', err)
 
1669
        if retcode is not None:
 
1670
            self.assertEquals(retcode, result,
 
1671
                              message='Unexpected return code')
 
1672
        return out, err
 
1673
 
 
1674
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
 
1675
                working_dir=None, error_regexes=[], output_encoding=None):
 
1676
        """Invoke bzr, as if it were run from the command line.
 
1677
 
 
1678
        The argument list should not include the bzr program name - the
 
1679
        first argument is normally the bzr command.  Arguments may be
 
1680
        passed in three ways:
 
1681
 
 
1682
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1683
        when the command contains whitespace or metacharacters, or
 
1684
        is built up at run time.
 
1685
 
 
1686
        2- A single string, eg "add a".  This is the most convenient
 
1687
        for hardcoded commands.
 
1688
 
 
1689
        This runs bzr through the interface that catches and reports
 
1690
        errors, and with logging set to something approximating the
 
1691
        default, so that error reporting can be checked.
 
1692
 
 
1693
        This should be the main method for tests that want to exercise the
 
1694
        overall behavior of the bzr application (rather than a unit test
 
1695
        or a functional test of the library.)
 
1696
 
 
1697
        This sends the stdout/stderr results into the test's log,
 
1698
        where it may be useful for debugging.  See also run_captured.
 
1699
 
 
1700
        :keyword stdin: A string to be used as stdin for the command.
 
1701
        :keyword retcode: The status code the command should return;
 
1702
            default 0.
 
1703
        :keyword working_dir: The directory to run the command in
 
1704
        :keyword error_regexes: A list of expected error messages.  If
 
1705
            specified they must be seen in the error output of the command.
 
1706
        """
 
1707
        out, err = self._run_bzr_autosplit(
 
1708
            args=args,
 
1709
            retcode=retcode,
 
1710
            encoding=encoding,
 
1711
            stdin=stdin,
 
1712
            working_dir=working_dir,
 
1713
            )
 
1714
        self.assertIsInstance(error_regexes, (list, tuple))
 
1715
        for regex in error_regexes:
 
1716
            self.assertContainsRe(err, regex)
 
1717
        return out, err
 
1718
 
 
1719
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1720
        """Run bzr, and check that stderr contains the supplied regexes
 
1721
 
 
1722
        :param error_regexes: Sequence of regular expressions which
 
1723
            must each be found in the error output. The relative ordering
 
1724
            is not enforced.
 
1725
        :param args: command-line arguments for bzr
 
1726
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1727
            This function changes the default value of retcode to be 3,
 
1728
            since in most cases this is run when you expect bzr to fail.
 
1729
 
 
1730
        :return: (out, err) The actual output of running the command (in case
 
1731
            you want to do more inspection)
 
1732
 
 
1733
        Examples of use::
 
1734
 
 
1735
            # Make sure that commit is failing because there is nothing to do
 
1736
            self.run_bzr_error(['no changes to commit'],
 
1737
                               ['commit', '-m', 'my commit comment'])
 
1738
            # Make sure --strict is handling an unknown file, rather than
 
1739
            # giving us the 'nothing to do' error
 
1740
            self.build_tree(['unknown'])
 
1741
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1742
                               ['commit', --strict', '-m', 'my commit comment'])
 
1743
        """
 
1744
        kwargs.setdefault('retcode', 3)
 
1745
        kwargs['error_regexes'] = error_regexes
 
1746
        out, err = self.run_bzr(*args, **kwargs)
 
1747
        return out, err
 
1748
 
 
1749
    def run_bzr_subprocess(self, *args, **kwargs):
 
1750
        """Run bzr in a subprocess for testing.
 
1751
 
 
1752
        This starts a new Python interpreter and runs bzr in there.
 
1753
        This should only be used for tests that have a justifiable need for
 
1754
        this isolation: e.g. they are testing startup time, or signal
 
1755
        handling, or early startup code, etc.  Subprocess code can't be
 
1756
        profiled or debugged so easily.
 
1757
 
 
1758
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
1759
            None is supplied, the status code is not checked.
 
1760
        :keyword env_changes: A dictionary which lists changes to environment
 
1761
            variables. A value of None will unset the env variable.
 
1762
            The values must be strings. The change will only occur in the
 
1763
            child, so you don't need to fix the environment after running.
 
1764
        :keyword universal_newlines: Convert CRLF => LF
 
1765
        :keyword allow_plugins: By default the subprocess is run with
 
1766
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1767
            for system-wide plugins to create unexpected output on stderr,
 
1768
            which can cause unnecessary test failures.
 
1769
        """
 
1770
        env_changes = kwargs.get('env_changes', {})
 
1771
        working_dir = kwargs.get('working_dir', None)
 
1772
        allow_plugins = kwargs.get('allow_plugins', False)
 
1773
        if len(args) == 1:
 
1774
            if isinstance(args[0], list):
 
1775
                args = args[0]
 
1776
            elif isinstance(args[0], basestring):
 
1777
                args = list(shlex.split(args[0]))
 
1778
        else:
 
1779
            raise ValueError("passing varargs to run_bzr_subprocess")
 
1780
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1781
                                            working_dir=working_dir,
 
1782
                                            allow_plugins=allow_plugins)
 
1783
        # We distinguish between retcode=None and retcode not passed.
 
1784
        supplied_retcode = kwargs.get('retcode', 0)
 
1785
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1786
            universal_newlines=kwargs.get('universal_newlines', False),
 
1787
            process_args=args)
 
1788
 
 
1789
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1790
                             skip_if_plan_to_signal=False,
 
1791
                             working_dir=None,
 
1792
                             allow_plugins=False):
 
1793
        """Start bzr in a subprocess for testing.
 
1794
 
 
1795
        This starts a new Python interpreter and runs bzr in there.
 
1796
        This should only be used for tests that have a justifiable need for
 
1797
        this isolation: e.g. they are testing startup time, or signal
 
1798
        handling, or early startup code, etc.  Subprocess code can't be
 
1799
        profiled or debugged so easily.
 
1800
 
 
1801
        :param process_args: a list of arguments to pass to the bzr executable,
 
1802
            for example ``['--version']``.
 
1803
        :param env_changes: A dictionary which lists changes to environment
 
1804
            variables. A value of None will unset the env variable.
 
1805
            The values must be strings. The change will only occur in the
 
1806
            child, so you don't need to fix the environment after running.
 
1807
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1808
            is not available.
 
1809
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1810
 
 
1811
        :returns: Popen object for the started process.
 
1812
        """
 
1813
        if skip_if_plan_to_signal:
 
1814
            if not getattr(os, 'kill', None):
 
1815
                raise TestSkipped("os.kill not available.")
 
1816
 
 
1817
        if env_changes is None:
 
1818
            env_changes = {}
 
1819
        old_env = {}
 
1820
 
 
1821
        def cleanup_environment():
 
1822
            for env_var, value in env_changes.iteritems():
 
1823
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1824
 
 
1825
        def restore_environment():
 
1826
            for env_var, value in old_env.iteritems():
 
1827
                osutils.set_or_unset_env(env_var, value)
 
1828
 
 
1829
        bzr_path = self.get_bzr_path()
 
1830
 
 
1831
        cwd = None
 
1832
        if working_dir is not None:
 
1833
            cwd = osutils.getcwd()
 
1834
            os.chdir(working_dir)
 
1835
 
 
1836
        try:
 
1837
            # win32 subprocess doesn't support preexec_fn
 
1838
            # so we will avoid using it on all platforms, just to
 
1839
            # make sure the code path is used, and we don't break on win32
 
1840
            cleanup_environment()
 
1841
            command = [sys.executable]
 
1842
            # frozen executables don't need the path to bzr
 
1843
            if getattr(sys, "frozen", None) is None:
 
1844
                command.append(bzr_path)
 
1845
            if not allow_plugins:
 
1846
                command.append('--no-plugins')
 
1847
            command.extend(process_args)
 
1848
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1849
        finally:
 
1850
            restore_environment()
 
1851
            if cwd is not None:
 
1852
                os.chdir(cwd)
 
1853
 
 
1854
        return process
 
1855
 
 
1856
    def _popen(self, *args, **kwargs):
 
1857
        """Place a call to Popen.
 
1858
 
 
1859
        Allows tests to override this method to intercept the calls made to
 
1860
        Popen for introspection.
 
1861
        """
 
1862
        return Popen(*args, **kwargs)
 
1863
 
 
1864
    def get_bzr_path(self):
 
1865
        """Return the path of the 'bzr' executable for this test suite."""
 
1866
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1867
        if not os.path.isfile(bzr_path):
 
1868
            # We are probably installed. Assume sys.argv is the right file
 
1869
            bzr_path = sys.argv[0]
 
1870
        return bzr_path
 
1871
 
 
1872
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1873
                              universal_newlines=False, process_args=None):
 
1874
        """Finish the execution of process.
 
1875
 
 
1876
        :param process: the Popen object returned from start_bzr_subprocess.
 
1877
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1878
            None is supplied, the status code is not checked.
 
1879
        :param send_signal: an optional signal to send to the process.
 
1880
        :param universal_newlines: Convert CRLF => LF
 
1881
        :returns: (stdout, stderr)
 
1882
        """
 
1883
        if send_signal is not None:
 
1884
            os.kill(process.pid, send_signal)
 
1885
        out, err = process.communicate()
 
1886
 
 
1887
        if universal_newlines:
 
1888
            out = out.replace('\r\n', '\n')
 
1889
            err = err.replace('\r\n', '\n')
 
1890
 
 
1891
        if retcode is not None and retcode != process.returncode:
 
1892
            if process_args is None:
 
1893
                process_args = "(unknown args)"
 
1894
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1895
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1896
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1897
                      % (process_args, retcode, process.returncode))
 
1898
        return [out, err]
 
1899
 
 
1900
    def check_inventory_shape(self, inv, shape):
 
1901
        """Compare an inventory to a list of expected names.
 
1902
 
 
1903
        Fail if they are not precisely equal.
 
1904
        """
 
1905
        extras = []
 
1906
        shape = list(shape)             # copy
 
1907
        for path, ie in inv.entries():
 
1908
            name = path.replace('\\', '/')
 
1909
            if ie.kind == 'directory':
 
1910
                name = name + '/'
 
1911
            if name in shape:
 
1912
                shape.remove(name)
 
1913
            else:
 
1914
                extras.append(name)
 
1915
        if shape:
 
1916
            self.fail("expected paths not found in inventory: %r" % shape)
 
1917
        if extras:
 
1918
            self.fail("unexpected paths found in inventory: %r" % extras)
 
1919
 
 
1920
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1921
                         a_callable=None, *args, **kwargs):
 
1922
        """Call callable with redirected std io pipes.
 
1923
 
 
1924
        Returns the return code."""
 
1925
        if not callable(a_callable):
 
1926
            raise ValueError("a_callable must be callable.")
 
1927
        if stdin is None:
 
1928
            stdin = StringIO("")
 
1929
        if stdout is None:
 
1930
            if getattr(self, "_log_file", None) is not None:
 
1931
                stdout = self._log_file
 
1932
            else:
 
1933
                stdout = StringIO()
 
1934
        if stderr is None:
 
1935
            if getattr(self, "_log_file", None is not None):
 
1936
                stderr = self._log_file
 
1937
            else:
 
1938
                stderr = StringIO()
 
1939
        real_stdin = sys.stdin
 
1940
        real_stdout = sys.stdout
 
1941
        real_stderr = sys.stderr
 
1942
        try:
 
1943
            sys.stdout = stdout
 
1944
            sys.stderr = stderr
 
1945
            sys.stdin = stdin
 
1946
            return a_callable(*args, **kwargs)
 
1947
        finally:
 
1948
            sys.stdout = real_stdout
 
1949
            sys.stderr = real_stderr
 
1950
            sys.stdin = real_stdin
 
1951
 
 
1952
    def reduceLockdirTimeout(self):
 
1953
        """Reduce the default lock timeout for the duration of the test, so that
 
1954
        if LockContention occurs during a test, it does so quickly.
 
1955
 
 
1956
        Tests that expect to provoke LockContention errors should call this.
 
1957
        """
 
1958
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
1959
        def resetTimeout():
 
1960
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
1961
        self.addCleanup(resetTimeout)
 
1962
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
1963
 
 
1964
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
1965
        """Return a StringIOWrapper instance, that will encode Unicode
 
1966
        input to UTF-8.
 
1967
        """
 
1968
        if encoding_type is None:
 
1969
            encoding_type = 'strict'
 
1970
        sio = StringIO()
 
1971
        output_encoding = 'utf-8'
 
1972
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
 
1973
        sio.encoding = output_encoding
 
1974
        return sio
 
1975
 
 
1976
    def disable_verb(self, verb):
 
1977
        """Disable a smart server verb for one test."""
 
1978
        from bzrlib.smart import request
 
1979
        request_handlers = request.request_handlers
 
1980
        orig_method = request_handlers.get(verb)
 
1981
        request_handlers.remove(verb)
 
1982
        def restoreVerb():
 
1983
            request_handlers.register(verb, orig_method)
 
1984
        self.addCleanup(restoreVerb)
 
1985
 
 
1986
 
 
1987
class CapturedCall(object):
 
1988
    """A helper for capturing smart server calls for easy debug analysis."""
 
1989
 
 
1990
    def __init__(self, params, prefix_length):
 
1991
        """Capture the call with params and skip prefix_length stack frames."""
 
1992
        self.call = params
 
1993
        import traceback
 
1994
        # The last 5 frames are the __init__, the hook frame, and 3 smart
 
1995
        # client frames. Beyond this we could get more clever, but this is good
 
1996
        # enough for now.
 
1997
        stack = traceback.extract_stack()[prefix_length:-5]
 
1998
        self.stack = ''.join(traceback.format_list(stack))
 
1999
 
 
2000
    def __str__(self):
 
2001
        return self.call.method
 
2002
 
 
2003
    def __repr__(self):
 
2004
        return self.call.method
 
2005
 
 
2006
    def stack(self):
 
2007
        return self.stack
 
2008
 
 
2009
 
 
2010
class TestCaseWithMemoryTransport(TestCase):
 
2011
    """Common test class for tests that do not need disk resources.
 
2012
 
 
2013
    Tests that need disk resources should derive from TestCaseWithTransport.
 
2014
 
 
2015
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
2016
 
 
2017
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
2018
    a directory which does not exist. This serves to help ensure test isolation
 
2019
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
2020
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
2021
    file defaults for the transport in tests, nor does it obey the command line
 
2022
    override, so tests that accidentally write to the common directory should
 
2023
    be rare.
 
2024
 
 
2025
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
2026
    a .bzr directory that stops us ascending higher into the filesystem.
 
2027
    """
 
2028
 
 
2029
    TEST_ROOT = None
 
2030
    _TEST_NAME = 'test'
 
2031
 
 
2032
    def __init__(self, methodName='runTest'):
 
2033
        # allow test parameterization after test construction and before test
 
2034
        # execution. Variables that the parameterizer sets need to be
 
2035
        # ones that are not set by setUp, or setUp will trash them.
 
2036
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
2037
        self.vfs_transport_factory = default_transport
 
2038
        self.transport_server = None
 
2039
        self.transport_readonly_server = None
 
2040
        self.__vfs_server = None
 
2041
 
 
2042
    def get_transport(self, relpath=None):
 
2043
        """Return a writeable transport.
 
2044
 
 
2045
        This transport is for the test scratch space relative to
 
2046
        "self._test_root"
 
2047
 
 
2048
        :param relpath: a path relative to the base url.
 
2049
        """
 
2050
        t = get_transport(self.get_url(relpath))
 
2051
        self.assertFalse(t.is_readonly())
 
2052
        return t
 
2053
 
 
2054
    def get_readonly_transport(self, relpath=None):
 
2055
        """Return a readonly transport for the test scratch space
 
2056
 
 
2057
        This can be used to test that operations which should only need
 
2058
        readonly access in fact do not try to write.
 
2059
 
 
2060
        :param relpath: a path relative to the base url.
 
2061
        """
 
2062
        t = get_transport(self.get_readonly_url(relpath))
 
2063
        self.assertTrue(t.is_readonly())
 
2064
        return t
 
2065
 
 
2066
    def create_transport_readonly_server(self):
 
2067
        """Create a transport server from class defined at init.
 
2068
 
 
2069
        This is mostly a hook for daughter classes.
 
2070
        """
 
2071
        return self.transport_readonly_server()
 
2072
 
 
2073
    def get_readonly_server(self):
 
2074
        """Get the server instance for the readonly transport
 
2075
 
 
2076
        This is useful for some tests with specific servers to do diagnostics.
 
2077
        """
 
2078
        if self.__readonly_server is None:
 
2079
            if self.transport_readonly_server is None:
 
2080
                # readonly decorator requested
 
2081
                # bring up the server
 
2082
                self.__readonly_server = ReadonlyServer()
 
2083
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
2084
            else:
 
2085
                self.__readonly_server = self.create_transport_readonly_server()
 
2086
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
2087
            self.addCleanup(self.__readonly_server.tearDown)
 
2088
        return self.__readonly_server
 
2089
 
 
2090
    def get_readonly_url(self, relpath=None):
 
2091
        """Get a URL for the readonly transport.
 
2092
 
 
2093
        This will either be backed by '.' or a decorator to the transport
 
2094
        used by self.get_url()
 
2095
        relpath provides for clients to get a path relative to the base url.
 
2096
        These should only be downwards relative, not upwards.
 
2097
        """
 
2098
        base = self.get_readonly_server().get_url()
 
2099
        return self._adjust_url(base, relpath)
 
2100
 
 
2101
    def get_vfs_only_server(self):
 
2102
        """Get the vfs only read/write server instance.
 
2103
 
 
2104
        This is useful for some tests with specific servers that need
 
2105
        diagnostics.
 
2106
 
 
2107
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
2108
        is no means to override it.
 
2109
        """
 
2110
        if self.__vfs_server is None:
 
2111
            self.__vfs_server = MemoryServer()
 
2112
            self.__vfs_server.setUp()
 
2113
            self.addCleanup(self.__vfs_server.tearDown)
 
2114
        return self.__vfs_server
 
2115
 
 
2116
    def get_server(self):
 
2117
        """Get the read/write server instance.
 
2118
 
 
2119
        This is useful for some tests with specific servers that need
 
2120
        diagnostics.
 
2121
 
 
2122
        This is built from the self.transport_server factory. If that is None,
 
2123
        then the self.get_vfs_server is returned.
 
2124
        """
 
2125
        if self.__server is None:
 
2126
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
 
2127
                return self.get_vfs_only_server()
 
2128
            else:
 
2129
                # bring up a decorated means of access to the vfs only server.
 
2130
                self.__server = self.transport_server()
 
2131
                try:
 
2132
                    self.__server.setUp(self.get_vfs_only_server())
 
2133
                except TypeError, e:
 
2134
                    # This should never happen; the try:Except here is to assist
 
2135
                    # developers having to update code rather than seeing an
 
2136
                    # uninformative TypeError.
 
2137
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
 
2138
            self.addCleanup(self.__server.tearDown)
 
2139
        return self.__server
 
2140
 
 
2141
    def _adjust_url(self, base, relpath):
 
2142
        """Get a URL (or maybe a path) for the readwrite transport.
 
2143
 
 
2144
        This will either be backed by '.' or to an equivalent non-file based
 
2145
        facility.
 
2146
        relpath provides for clients to get a path relative to the base url.
 
2147
        These should only be downwards relative, not upwards.
 
2148
        """
 
2149
        if relpath is not None and relpath != '.':
 
2150
            if not base.endswith('/'):
 
2151
                base = base + '/'
 
2152
            # XXX: Really base should be a url; we did after all call
 
2153
            # get_url()!  But sometimes it's just a path (from
 
2154
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
2155
            # to a non-escaped local path.
 
2156
            if base.startswith('./') or base.startswith('/'):
 
2157
                base += relpath
 
2158
            else:
 
2159
                base += urlutils.escape(relpath)
 
2160
        return base
 
2161
 
 
2162
    def get_url(self, relpath=None):
 
2163
        """Get a URL (or maybe a path) for the readwrite transport.
 
2164
 
 
2165
        This will either be backed by '.' or to an equivalent non-file based
 
2166
        facility.
 
2167
        relpath provides for clients to get a path relative to the base url.
 
2168
        These should only be downwards relative, not upwards.
 
2169
        """
 
2170
        base = self.get_server().get_url()
 
2171
        return self._adjust_url(base, relpath)
 
2172
 
 
2173
    def get_vfs_only_url(self, relpath=None):
 
2174
        """Get a URL (or maybe a path for the plain old vfs transport.
 
2175
 
 
2176
        This will never be a smart protocol.  It always has all the
 
2177
        capabilities of the local filesystem, but it might actually be a
 
2178
        MemoryTransport or some other similar virtual filesystem.
 
2179
 
 
2180
        This is the backing transport (if any) of the server returned by
 
2181
        get_url and get_readonly_url.
 
2182
 
 
2183
        :param relpath: provides for clients to get a path relative to the base
 
2184
            url.  These should only be downwards relative, not upwards.
 
2185
        :return: A URL
 
2186
        """
 
2187
        base = self.get_vfs_only_server().get_url()
 
2188
        return self._adjust_url(base, relpath)
 
2189
 
 
2190
    def _create_safety_net(self):
 
2191
        """Make a fake bzr directory.
 
2192
 
 
2193
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
2194
        real branch.
 
2195
        """
 
2196
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2197
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
2198
 
 
2199
    def _check_safety_net(self):
 
2200
        """Check that the safety .bzr directory have not been touched.
 
2201
 
 
2202
        _make_test_root have created a .bzr directory to prevent tests from
 
2203
        propagating. This method ensures than a test did not leaked.
 
2204
        """
 
2205
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2206
        wt = workingtree.WorkingTree.open(root)
 
2207
        last_rev = wt.last_revision()
 
2208
        if last_rev != 'null:':
 
2209
            # The current test have modified the /bzr directory, we need to
 
2210
            # recreate a new one or all the followng tests will fail.
 
2211
            # If you need to inspect its content uncomment the following line
 
2212
            # import pdb; pdb.set_trace()
 
2213
            _rmtree_temp_dir(root + '/.bzr')
 
2214
            self._create_safety_net()
 
2215
            raise AssertionError('%s/.bzr should not be modified' % root)
 
2216
 
 
2217
    def _make_test_root(self):
 
2218
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
2219
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
2220
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
2221
 
 
2222
            self._create_safety_net()
 
2223
 
 
2224
            # The same directory is used by all tests, and we're not
 
2225
            # specifically told when all tests are finished.  This will do.
 
2226
            atexit.register(_rmtree_temp_dir, root)
 
2227
 
 
2228
        self.addCleanup(self._check_safety_net)
 
2229
 
 
2230
    def makeAndChdirToTestDir(self):
 
2231
        """Create a temporary directories for this one test.
 
2232
 
 
2233
        This must set self.test_home_dir and self.test_dir and chdir to
 
2234
        self.test_dir.
 
2235
 
 
2236
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
2237
        """
 
2238
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2239
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
2240
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
2241
 
 
2242
    def make_branch(self, relpath, format=None):
 
2243
        """Create a branch on the transport at relpath."""
 
2244
        repo = self.make_repository(relpath, format=format)
 
2245
        return repo.bzrdir.create_branch()
 
2246
 
 
2247
    def make_bzrdir(self, relpath, format=None):
 
2248
        try:
 
2249
            # might be a relative or absolute path
 
2250
            maybe_a_url = self.get_url(relpath)
 
2251
            segments = maybe_a_url.rsplit('/', 1)
 
2252
            t = get_transport(maybe_a_url)
 
2253
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
2254
                t.ensure_base()
 
2255
            if format is None:
 
2256
                format = 'default'
 
2257
            if isinstance(format, basestring):
 
2258
                format = bzrdir.format_registry.make_bzrdir(format)
 
2259
            return format.initialize_on_transport(t)
 
2260
        except errors.UninitializableFormat:
 
2261
            raise TestSkipped("Format %s is not initializable." % format)
 
2262
 
 
2263
    def make_repository(self, relpath, shared=False, format=None):
 
2264
        """Create a repository on our default transport at relpath.
 
2265
 
 
2266
        Note that relpath must be a relative path, not a full url.
 
2267
        """
 
2268
        # FIXME: If you create a remoterepository this returns the underlying
 
2269
        # real format, which is incorrect.  Actually we should make sure that
 
2270
        # RemoteBzrDir returns a RemoteRepository.
 
2271
        # maybe  mbp 20070410
 
2272
        made_control = self.make_bzrdir(relpath, format=format)
 
2273
        return made_control.create_repository(shared=shared)
 
2274
 
 
2275
    def make_smart_server(self, path):
 
2276
        smart_server = server.SmartTCPServer_for_testing()
 
2277
        smart_server.setUp(self.get_server())
 
2278
        remote_transport = get_transport(smart_server.get_url()).clone(path)
 
2279
        self.addCleanup(smart_server.tearDown)
 
2280
        return remote_transport
 
2281
 
 
2282
    def make_branch_and_memory_tree(self, relpath, format=None):
 
2283
        """Create a branch on the default transport and a MemoryTree for it."""
 
2284
        b = self.make_branch(relpath, format=format)
 
2285
        return memorytree.MemoryTree.create_on_branch(b)
 
2286
 
 
2287
    def make_branch_builder(self, relpath, format=None):
 
2288
        branch = self.make_branch(relpath, format=format)
 
2289
        return branchbuilder.BranchBuilder(branch=branch)
 
2290
 
 
2291
    def overrideEnvironmentForTesting(self):
 
2292
        os.environ['HOME'] = self.test_home_dir
 
2293
        os.environ['BZR_HOME'] = self.test_home_dir
 
2294
 
 
2295
    def setUp(self):
 
2296
        super(TestCaseWithMemoryTransport, self).setUp()
 
2297
        self._make_test_root()
 
2298
        _currentdir = os.getcwdu()
 
2299
        def _leaveDirectory():
 
2300
            os.chdir(_currentdir)
 
2301
        self.addCleanup(_leaveDirectory)
 
2302
        self.makeAndChdirToTestDir()
 
2303
        self.overrideEnvironmentForTesting()
 
2304
        self.__readonly_server = None
 
2305
        self.__server = None
 
2306
        self.reduceLockdirTimeout()
 
2307
 
 
2308
    def setup_smart_server_with_call_log(self):
 
2309
        """Sets up a smart server as the transport server with a call log."""
 
2310
        self.transport_server = server.SmartTCPServer_for_testing
 
2311
        self.hpss_calls = []
 
2312
        import traceback
 
2313
        # Skip the current stack down to the caller of
 
2314
        # setup_smart_server_with_call_log
 
2315
        prefix_length = len(traceback.extract_stack()) - 2
 
2316
        def capture_hpss_call(params):
 
2317
            self.hpss_calls.append(
 
2318
                CapturedCall(params, prefix_length))
 
2319
        client._SmartClient.hooks.install_named_hook(
 
2320
            'call', capture_hpss_call, None)
 
2321
 
 
2322
    def reset_smart_call_log(self):
 
2323
        self.hpss_calls = []
 
2324
 
 
2325
 
 
2326
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
2327
    """Derived class that runs a test within a temporary directory.
 
2328
 
 
2329
    This is useful for tests that need to create a branch, etc.
 
2330
 
 
2331
    The directory is created in a slightly complex way: for each
 
2332
    Python invocation, a new temporary top-level directory is created.
 
2333
    All test cases create their own directory within that.  If the
 
2334
    tests complete successfully, the directory is removed.
 
2335
 
 
2336
    :ivar test_base_dir: The path of the top-level directory for this
 
2337
    test, which contains a home directory and a work directory.
 
2338
 
 
2339
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
2340
    which is used as $HOME for this test.
 
2341
 
 
2342
    :ivar test_dir: A directory under test_base_dir used as the current
 
2343
    directory when the test proper is run.
 
2344
    """
 
2345
 
 
2346
    OVERRIDE_PYTHON = 'python'
 
2347
 
 
2348
    def check_file_contents(self, filename, expect):
 
2349
        self.log("check contents of file %s" % filename)
 
2350
        contents = file(filename, 'r').read()
 
2351
        if contents != expect:
 
2352
            self.log("expected: %r" % expect)
 
2353
            self.log("actually: %r" % contents)
 
2354
            self.fail("contents of %s not as expected" % filename)
 
2355
 
 
2356
    def _getTestDirPrefix(self):
 
2357
        # create a directory within the top level test directory
 
2358
        if sys.platform in ('win32', 'cygwin'):
 
2359
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
 
2360
            # windows is likely to have path-length limits so use a short name
 
2361
            name_prefix = name_prefix[-30:]
 
2362
        else:
 
2363
            name_prefix = re.sub('[/]', '_', self.id())
 
2364
        return name_prefix
 
2365
 
 
2366
    def makeAndChdirToTestDir(self):
 
2367
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2368
 
 
2369
        For TestCaseInTempDir we create a temporary directory based on the test
 
2370
        name and then create two subdirs - test and home under it.
 
2371
        """
 
2372
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
 
2373
            self._getTestDirPrefix())
 
2374
        name = name_prefix
 
2375
        for i in range(100):
 
2376
            if os.path.exists(name):
 
2377
                name = name_prefix + '_' + str(i)
 
2378
            else:
 
2379
                os.mkdir(name)
 
2380
                break
 
2381
        # now create test and home directories within this dir
 
2382
        self.test_base_dir = name
 
2383
        self.test_home_dir = self.test_base_dir + '/home'
 
2384
        os.mkdir(self.test_home_dir)
 
2385
        self.test_dir = self.test_base_dir + '/work'
 
2386
        os.mkdir(self.test_dir)
 
2387
        os.chdir(self.test_dir)
 
2388
        # put name of test inside
 
2389
        f = file(self.test_base_dir + '/name', 'w')
 
2390
        try:
 
2391
            f.write(self.id())
 
2392
        finally:
 
2393
            f.close()
 
2394
        self.addCleanup(self.deleteTestDir)
 
2395
 
 
2396
    def deleteTestDir(self):
 
2397
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2398
        _rmtree_temp_dir(self.test_base_dir)
 
2399
 
 
2400
    def build_tree(self, shape, line_endings='binary', transport=None):
 
2401
        """Build a test tree according to a pattern.
 
2402
 
 
2403
        shape is a sequence of file specifications.  If the final
 
2404
        character is '/', a directory is created.
 
2405
 
 
2406
        This assumes that all the elements in the tree being built are new.
 
2407
 
 
2408
        This doesn't add anything to a branch.
 
2409
 
 
2410
        :type shape:    list or tuple.
 
2411
        :param line_endings: Either 'binary' or 'native'
 
2412
            in binary mode, exact contents are written in native mode, the
 
2413
            line endings match the default platform endings.
 
2414
        :param transport: A transport to write to, for building trees on VFS's.
 
2415
            If the transport is readonly or None, "." is opened automatically.
 
2416
        :return: None
 
2417
        """
 
2418
        if type(shape) not in (list, tuple):
 
2419
            raise AssertionError("Parameter 'shape' should be "
 
2420
                "a list or a tuple. Got %r instead" % (shape,))
 
2421
        # It's OK to just create them using forward slashes on windows.
 
2422
        if transport is None or transport.is_readonly():
 
2423
            transport = get_transport(".")
 
2424
        for name in shape:
 
2425
            self.assertIsInstance(name, basestring)
 
2426
            if name[-1] == '/':
 
2427
                transport.mkdir(urlutils.escape(name[:-1]))
 
2428
            else:
 
2429
                if line_endings == 'binary':
 
2430
                    end = '\n'
 
2431
                elif line_endings == 'native':
 
2432
                    end = os.linesep
 
2433
                else:
 
2434
                    raise errors.BzrError(
 
2435
                        'Invalid line ending request %r' % line_endings)
 
2436
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
2437
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2438
 
 
2439
    def build_tree_contents(self, shape):
 
2440
        build_tree_contents(shape)
 
2441
 
 
2442
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2443
        """Assert whether path or paths are in the WorkingTree"""
 
2444
        if tree is None:
 
2445
            tree = workingtree.WorkingTree.open(root_path)
 
2446
        if not isinstance(path, basestring):
 
2447
            for p in path:
 
2448
                self.assertInWorkingTree(p, tree=tree)
 
2449
        else:
 
2450
            self.assertIsNot(tree.path2id(path), None,
 
2451
                path+' not in working tree.')
 
2452
 
 
2453
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2454
        """Assert whether path or paths are not in the WorkingTree"""
 
2455
        if tree is None:
 
2456
            tree = workingtree.WorkingTree.open(root_path)
 
2457
        if not isinstance(path, basestring):
 
2458
            for p in path:
 
2459
                self.assertNotInWorkingTree(p,tree=tree)
 
2460
        else:
 
2461
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2462
 
 
2463
 
 
2464
class TestCaseWithTransport(TestCaseInTempDir):
 
2465
    """A test case that provides get_url and get_readonly_url facilities.
 
2466
 
 
2467
    These back onto two transport servers, one for readonly access and one for
 
2468
    read write access.
 
2469
 
 
2470
    If no explicit class is provided for readonly access, a
 
2471
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2472
    based read write transports.
 
2473
 
 
2474
    If an explicit class is provided for readonly access, that server and the
 
2475
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2476
    """
 
2477
 
 
2478
    def get_vfs_only_server(self):
 
2479
        """See TestCaseWithMemoryTransport.
 
2480
 
 
2481
        This is useful for some tests with specific servers that need
 
2482
        diagnostics.
 
2483
        """
 
2484
        if self.__vfs_server is None:
 
2485
            self.__vfs_server = self.vfs_transport_factory()
 
2486
            self.__vfs_server.setUp()
 
2487
            self.addCleanup(self.__vfs_server.tearDown)
 
2488
        return self.__vfs_server
 
2489
 
 
2490
    def make_branch_and_tree(self, relpath, format=None):
 
2491
        """Create a branch on the transport and a tree locally.
 
2492
 
 
2493
        If the transport is not a LocalTransport, the Tree can't be created on
 
2494
        the transport.  In that case if the vfs_transport_factory is
 
2495
        LocalURLServer the working tree is created in the local
 
2496
        directory backing the transport, and the returned tree's branch and
 
2497
        repository will also be accessed locally. Otherwise a lightweight
 
2498
        checkout is created and returned.
 
2499
 
 
2500
        We do this because we can't physically create a tree in the local
 
2501
        path, with a branch reference to the transport_factory url, and
 
2502
        a branch + repository in the vfs_transport, unless the vfs_transport
 
2503
        namespace is distinct from the local disk - the two branch objects
 
2504
        would collide. While we could construct a tree with its branch object
 
2505
        pointing at the transport_factory transport in memory, reopening it
 
2506
        would behaving unexpectedly, and has in the past caused testing bugs
 
2507
        when we tried to do it that way.
 
2508
 
 
2509
        :param format: The BzrDirFormat.
 
2510
        :returns: the WorkingTree.
 
2511
        """
 
2512
        # TODO: always use the local disk path for the working tree,
 
2513
        # this obviously requires a format that supports branch references
 
2514
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2515
        # RBC 20060208
 
2516
        b = self.make_branch(relpath, format=format)
 
2517
        try:
 
2518
            return b.bzrdir.create_workingtree()
 
2519
        except errors.NotLocalUrl:
 
2520
            # We can only make working trees locally at the moment.  If the
 
2521
            # transport can't support them, then we keep the non-disk-backed
 
2522
            # branch and create a local checkout.
 
2523
            if self.vfs_transport_factory is LocalURLServer:
 
2524
                # the branch is colocated on disk, we cannot create a checkout.
 
2525
                # hopefully callers will expect this.
 
2526
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2527
                wt = local_controldir.create_workingtree()
 
2528
                if wt.branch._format != b._format:
 
2529
                    wt._branch = b
 
2530
                    # Make sure that assigning to wt._branch fixes wt.branch,
 
2531
                    # in case the implementation details of workingtree objects
 
2532
                    # change.
 
2533
                    self.assertIs(b, wt.branch)
 
2534
                return wt
 
2535
            else:
 
2536
                return b.create_checkout(relpath, lightweight=True)
 
2537
 
 
2538
    def assertIsDirectory(self, relpath, transport):
 
2539
        """Assert that relpath within transport is a directory.
 
2540
 
 
2541
        This may not be possible on all transports; in that case it propagates
 
2542
        a TransportNotPossible.
 
2543
        """
 
2544
        try:
 
2545
            mode = transport.stat(relpath).st_mode
 
2546
        except errors.NoSuchFile:
 
2547
            self.fail("path %s is not a directory; no such file"
 
2548
                      % (relpath))
 
2549
        if not stat.S_ISDIR(mode):
 
2550
            self.fail("path %s is not a directory; has mode %#o"
 
2551
                      % (relpath, mode))
 
2552
 
 
2553
    def assertTreesEqual(self, left, right):
 
2554
        """Check that left and right have the same content and properties."""
 
2555
        # we use a tree delta to check for equality of the content, and we
 
2556
        # manually check for equality of other things such as the parents list.
 
2557
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2558
        differences = left.changes_from(right)
 
2559
        self.assertFalse(differences.has_changed(),
 
2560
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2561
 
 
2562
    def setUp(self):
 
2563
        super(TestCaseWithTransport, self).setUp()
 
2564
        self.__vfs_server = None
 
2565
 
 
2566
 
 
2567
class ChrootedTestCase(TestCaseWithTransport):
 
2568
    """A support class that provides readonly urls outside the local namespace.
 
2569
 
 
2570
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2571
    is then we are chrooted already, if it is not then an HttpServer is used
 
2572
    for readonly urls.
 
2573
 
 
2574
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2575
                       be used without needed to redo it when a different
 
2576
                       subclass is in use ?
 
2577
    """
 
2578
 
 
2579
    def setUp(self):
 
2580
        super(ChrootedTestCase, self).setUp()
 
2581
        if not self.vfs_transport_factory == MemoryServer:
 
2582
            self.transport_readonly_server = HttpServer
 
2583
 
 
2584
 
 
2585
def condition_id_re(pattern):
 
2586
    """Create a condition filter which performs a re check on a test's id.
 
2587
 
 
2588
    :param pattern: A regular expression string.
 
2589
    :return: A callable that returns True if the re matches.
 
2590
    """
 
2591
    filter_re = osutils.re_compile_checked(pattern, 0,
 
2592
        'test filter')
 
2593
    def condition(test):
 
2594
        test_id = test.id()
 
2595
        return filter_re.search(test_id)
 
2596
    return condition
 
2597
 
 
2598
 
 
2599
def condition_isinstance(klass_or_klass_list):
 
2600
    """Create a condition filter which returns isinstance(param, klass).
 
2601
 
 
2602
    :return: A callable which when called with one parameter obj return the
 
2603
        result of isinstance(obj, klass_or_klass_list).
 
2604
    """
 
2605
    def condition(obj):
 
2606
        return isinstance(obj, klass_or_klass_list)
 
2607
    return condition
 
2608
 
 
2609
 
 
2610
def condition_id_in_list(id_list):
 
2611
    """Create a condition filter which verify that test's id in a list.
 
2612
 
 
2613
    :param id_list: A TestIdList object.
 
2614
    :return: A callable that returns True if the test's id appears in the list.
 
2615
    """
 
2616
    def condition(test):
 
2617
        return id_list.includes(test.id())
 
2618
    return condition
 
2619
 
 
2620
 
 
2621
def condition_id_startswith(starts):
 
2622
    """Create a condition filter verifying that test's id starts with a string.
 
2623
 
 
2624
    :param starts: A list of string.
 
2625
    :return: A callable that returns True if the test's id starts with one of
 
2626
        the given strings.
 
2627
    """
 
2628
    def condition(test):
 
2629
        for start in starts:
 
2630
            if test.id().startswith(start):
 
2631
                return True
 
2632
        return False
 
2633
    return condition
 
2634
 
 
2635
 
 
2636
def exclude_tests_by_condition(suite, condition):
 
2637
    """Create a test suite which excludes some tests from suite.
 
2638
 
 
2639
    :param suite: The suite to get tests from.
 
2640
    :param condition: A callable whose result evaluates True when called with a
 
2641
        test case which should be excluded from the result.
 
2642
    :return: A suite which contains the tests found in suite that fail
 
2643
        condition.
 
2644
    """
 
2645
    result = []
 
2646
    for test in iter_suite_tests(suite):
 
2647
        if not condition(test):
 
2648
            result.append(test)
 
2649
    return TestUtil.TestSuite(result)
 
2650
 
 
2651
 
 
2652
def filter_suite_by_condition(suite, condition):
 
2653
    """Create a test suite by filtering another one.
 
2654
 
 
2655
    :param suite: The source suite.
 
2656
    :param condition: A callable whose result evaluates True when called with a
 
2657
        test case which should be included in the result.
 
2658
    :return: A suite which contains the tests found in suite that pass
 
2659
        condition.
 
2660
    """
 
2661
    result = []
 
2662
    for test in iter_suite_tests(suite):
 
2663
        if condition(test):
 
2664
            result.append(test)
 
2665
    return TestUtil.TestSuite(result)
 
2666
 
 
2667
 
 
2668
def filter_suite_by_re(suite, pattern):
 
2669
    """Create a test suite by filtering another one.
 
2670
 
 
2671
    :param suite:           the source suite
 
2672
    :param pattern:         pattern that names must match
 
2673
    :returns: the newly created suite
 
2674
    """
 
2675
    condition = condition_id_re(pattern)
 
2676
    result_suite = filter_suite_by_condition(suite, condition)
 
2677
    return result_suite
 
2678
 
 
2679
 
 
2680
def filter_suite_by_id_list(suite, test_id_list):
 
2681
    """Create a test suite by filtering another one.
 
2682
 
 
2683
    :param suite: The source suite.
 
2684
    :param test_id_list: A list of the test ids to keep as strings.
 
2685
    :returns: the newly created suite
 
2686
    """
 
2687
    condition = condition_id_in_list(test_id_list)
 
2688
    result_suite = filter_suite_by_condition(suite, condition)
 
2689
    return result_suite
 
2690
 
 
2691
 
 
2692
def filter_suite_by_id_startswith(suite, start):
 
2693
    """Create a test suite by filtering another one.
 
2694
 
 
2695
    :param suite: The source suite.
 
2696
    :param start: A list of string the test id must start with one of.
 
2697
    :returns: the newly created suite
 
2698
    """
 
2699
    condition = condition_id_startswith(start)
 
2700
    result_suite = filter_suite_by_condition(suite, condition)
 
2701
    return result_suite
 
2702
 
 
2703
 
 
2704
def exclude_tests_by_re(suite, pattern):
 
2705
    """Create a test suite which excludes some tests from suite.
 
2706
 
 
2707
    :param suite: The suite to get tests from.
 
2708
    :param pattern: A regular expression string. Test ids that match this
 
2709
        pattern will be excluded from the result.
 
2710
    :return: A TestSuite that contains all the tests from suite without the
 
2711
        tests that matched pattern. The order of tests is the same as it was in
 
2712
        suite.
 
2713
    """
 
2714
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
2715
 
 
2716
 
 
2717
def preserve_input(something):
 
2718
    """A helper for performing test suite transformation chains.
 
2719
 
 
2720
    :param something: Anything you want to preserve.
 
2721
    :return: Something.
 
2722
    """
 
2723
    return something
 
2724
 
 
2725
 
 
2726
def randomize_suite(suite):
 
2727
    """Return a new TestSuite with suite's tests in random order.
 
2728
 
 
2729
    The tests in the input suite are flattened into a single suite in order to
 
2730
    accomplish this. Any nested TestSuites are removed to provide global
 
2731
    randomness.
 
2732
    """
 
2733
    tests = list(iter_suite_tests(suite))
 
2734
    random.shuffle(tests)
 
2735
    return TestUtil.TestSuite(tests)
 
2736
 
 
2737
 
 
2738
def split_suite_by_condition(suite, condition):
 
2739
    """Split a test suite into two by a condition.
 
2740
 
 
2741
    :param suite: The suite to split.
 
2742
    :param condition: The condition to match on. Tests that match this
 
2743
        condition are returned in the first test suite, ones that do not match
 
2744
        are in the second suite.
 
2745
    :return: A tuple of two test suites, where the first contains tests from
 
2746
        suite matching the condition, and the second contains the remainder
 
2747
        from suite. The order within each output suite is the same as it was in
 
2748
        suite.
 
2749
    """
 
2750
    matched = []
 
2751
    did_not_match = []
 
2752
    for test in iter_suite_tests(suite):
 
2753
        if condition(test):
 
2754
            matched.append(test)
 
2755
        else:
 
2756
            did_not_match.append(test)
 
2757
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
2758
 
 
2759
 
 
2760
def split_suite_by_re(suite, pattern):
 
2761
    """Split a test suite into two by a regular expression.
 
2762
 
 
2763
    :param suite: The suite to split.
 
2764
    :param pattern: A regular expression string. Test ids that match this
 
2765
        pattern will be in the first test suite returned, and the others in the
 
2766
        second test suite returned.
 
2767
    :return: A tuple of two test suites, where the first contains tests from
 
2768
        suite matching pattern, and the second contains the remainder from
 
2769
        suite. The order within each output suite is the same as it was in
 
2770
        suite.
 
2771
    """
 
2772
    return split_suite_by_condition(suite, condition_id_re(pattern))
 
2773
 
 
2774
 
 
2775
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2776
              stop_on_failure=False,
 
2777
              transport=None, lsprof_timed=None, bench_history=None,
 
2778
              matching_tests_first=None,
 
2779
              list_only=False,
 
2780
              random_seed=None,
 
2781
              exclude_pattern=None,
 
2782
              strict=False,
 
2783
              runner_class=None,
 
2784
              suite_decorators=None,
 
2785
              stream=None,
 
2786
              result_decorators=None,
 
2787
              ):
 
2788
    """Run a test suite for bzr selftest.
 
2789
 
 
2790
    :param runner_class: The class of runner to use. Must support the
 
2791
        constructor arguments passed by run_suite which are more than standard
 
2792
        python uses.
 
2793
    :return: A boolean indicating success.
 
2794
    """
 
2795
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2796
    if verbose:
 
2797
        verbosity = 2
 
2798
    else:
 
2799
        verbosity = 1
 
2800
    if runner_class is None:
 
2801
        runner_class = TextTestRunner
 
2802
    if stream is None:
 
2803
        stream = sys.stdout
 
2804
    runner = runner_class(stream=stream,
 
2805
                            descriptions=0,
 
2806
                            verbosity=verbosity,
 
2807
                            bench_history=bench_history,
 
2808
                            list_only=list_only,
 
2809
                            strict=strict,
 
2810
                            result_decorators=result_decorators,
 
2811
                            )
 
2812
    runner.stop_on_failure=stop_on_failure
 
2813
    # built in decorator factories:
 
2814
    decorators = [
 
2815
        random_order(random_seed, runner),
 
2816
        exclude_tests(exclude_pattern),
 
2817
        ]
 
2818
    if matching_tests_first:
 
2819
        decorators.append(tests_first(pattern))
 
2820
    else:
 
2821
        decorators.append(filter_tests(pattern))
 
2822
    if suite_decorators:
 
2823
        decorators.extend(suite_decorators)
 
2824
    # tell the result object how many tests will be running: (except if
 
2825
    # --parallel=fork is being used. Robert said he will provide a better
 
2826
    # progress design later -- vila 20090817)
 
2827
    if fork_decorator not in decorators:
 
2828
        decorators.append(CountingDecorator)
 
2829
    for decorator in decorators:
 
2830
        suite = decorator(suite)
 
2831
    result = runner.run(suite)
 
2832
    if list_only:
 
2833
        return True
 
2834
    result.stopTestRun()
 
2835
    if strict:
 
2836
        return result.wasStrictlySuccessful()
 
2837
    else:
 
2838
        return result.wasSuccessful()
 
2839
 
 
2840
 
 
2841
# A registry where get() returns a suite decorator.
 
2842
parallel_registry = registry.Registry()
 
2843
 
 
2844
 
 
2845
def fork_decorator(suite):
 
2846
    concurrency = osutils.local_concurrency()
 
2847
    if concurrency == 1:
 
2848
        return suite
 
2849
    from testtools import ConcurrentTestSuite
 
2850
    return ConcurrentTestSuite(suite, fork_for_tests)
 
2851
parallel_registry.register('fork', fork_decorator)
 
2852
 
 
2853
 
 
2854
def subprocess_decorator(suite):
 
2855
    concurrency = osutils.local_concurrency()
 
2856
    if concurrency == 1:
 
2857
        return suite
 
2858
    from testtools import ConcurrentTestSuite
 
2859
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
 
2860
parallel_registry.register('subprocess', subprocess_decorator)
 
2861
 
 
2862
 
 
2863
def exclude_tests(exclude_pattern):
 
2864
    """Return a test suite decorator that excludes tests."""
 
2865
    if exclude_pattern is None:
 
2866
        return identity_decorator
 
2867
    def decorator(suite):
 
2868
        return ExcludeDecorator(suite, exclude_pattern)
 
2869
    return decorator
 
2870
 
 
2871
 
 
2872
def filter_tests(pattern):
 
2873
    if pattern == '.*':
 
2874
        return identity_decorator
 
2875
    def decorator(suite):
 
2876
        return FilterTestsDecorator(suite, pattern)
 
2877
    return decorator
 
2878
 
 
2879
 
 
2880
def random_order(random_seed, runner):
 
2881
    """Return a test suite decorator factory for randomising tests order.
 
2882
    
 
2883
    :param random_seed: now, a string which casts to a long, or a long.
 
2884
    :param runner: A test runner with a stream attribute to report on.
 
2885
    """
 
2886
    if random_seed is None:
 
2887
        return identity_decorator
 
2888
    def decorator(suite):
 
2889
        return RandomDecorator(suite, random_seed, runner.stream)
 
2890
    return decorator
 
2891
 
 
2892
 
 
2893
def tests_first(pattern):
 
2894
    if pattern == '.*':
 
2895
        return identity_decorator
 
2896
    def decorator(suite):
 
2897
        return TestFirstDecorator(suite, pattern)
 
2898
    return decorator
 
2899
 
 
2900
 
 
2901
def identity_decorator(suite):
 
2902
    """Return suite."""
 
2903
    return suite
 
2904
 
 
2905
 
 
2906
class TestDecorator(TestSuite):
 
2907
    """A decorator for TestCase/TestSuite objects.
 
2908
    
 
2909
    Usually, subclasses should override __iter__(used when flattening test
 
2910
    suites), which we do to filter, reorder, parallelise and so on, run() and
 
2911
    debug().
 
2912
    """
 
2913
 
 
2914
    def __init__(self, suite):
 
2915
        TestSuite.__init__(self)
 
2916
        self.addTest(suite)
 
2917
 
 
2918
    def countTestCases(self):
 
2919
        cases = 0
 
2920
        for test in self:
 
2921
            cases += test.countTestCases()
 
2922
        return cases
 
2923
 
 
2924
    def debug(self):
 
2925
        for test in self:
 
2926
            test.debug()
 
2927
 
 
2928
    def run(self, result):
 
2929
        # Use iteration on self, not self._tests, to allow subclasses to hook
 
2930
        # into __iter__.
 
2931
        for test in self:
 
2932
            if result.shouldStop:
 
2933
                break
 
2934
            test.run(result)
 
2935
        return result
 
2936
 
 
2937
 
 
2938
class CountingDecorator(TestDecorator):
 
2939
    """A decorator which calls result.progress(self.countTestCases)."""
 
2940
 
 
2941
    def run(self, result):
 
2942
        progress_method = getattr(result, 'progress', None)
 
2943
        if callable(progress_method):
 
2944
            progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
 
2945
        return super(CountingDecorator, self).run(result)
 
2946
 
 
2947
 
 
2948
class ExcludeDecorator(TestDecorator):
 
2949
    """A decorator which excludes test matching an exclude pattern."""
 
2950
 
 
2951
    def __init__(self, suite, exclude_pattern):
 
2952
        TestDecorator.__init__(self, suite)
 
2953
        self.exclude_pattern = exclude_pattern
 
2954
        self.excluded = False
 
2955
 
 
2956
    def __iter__(self):
 
2957
        if self.excluded:
 
2958
            return iter(self._tests)
 
2959
        self.excluded = True
 
2960
        suite = exclude_tests_by_re(self, self.exclude_pattern)
 
2961
        del self._tests[:]
 
2962
        self.addTests(suite)
 
2963
        return iter(self._tests)
 
2964
 
 
2965
 
 
2966
class FilterTestsDecorator(TestDecorator):
 
2967
    """A decorator which filters tests to those matching a pattern."""
 
2968
 
 
2969
    def __init__(self, suite, pattern):
 
2970
        TestDecorator.__init__(self, suite)
 
2971
        self.pattern = pattern
 
2972
        self.filtered = False
 
2973
 
 
2974
    def __iter__(self):
 
2975
        if self.filtered:
 
2976
            return iter(self._tests)
 
2977
        self.filtered = True
 
2978
        suite = filter_suite_by_re(self, self.pattern)
 
2979
        del self._tests[:]
 
2980
        self.addTests(suite)
 
2981
        return iter(self._tests)
 
2982
 
 
2983
 
 
2984
class RandomDecorator(TestDecorator):
 
2985
    """A decorator which randomises the order of its tests."""
 
2986
 
 
2987
    def __init__(self, suite, random_seed, stream):
 
2988
        TestDecorator.__init__(self, suite)
 
2989
        self.random_seed = random_seed
 
2990
        self.randomised = False
 
2991
        self.stream = stream
 
2992
 
 
2993
    def __iter__(self):
 
2994
        if self.randomised:
 
2995
            return iter(self._tests)
 
2996
        self.randomised = True
 
2997
        self.stream.writeln("Randomizing test order using seed %s\n" %
 
2998
            (self.actual_seed()))
 
2999
        # Initialise the random number generator.
 
3000
        random.seed(self.actual_seed())
 
3001
        suite = randomize_suite(self)
 
3002
        del self._tests[:]
 
3003
        self.addTests(suite)
 
3004
        return iter(self._tests)
 
3005
 
 
3006
    def actual_seed(self):
 
3007
        if self.random_seed == "now":
 
3008
            # We convert the seed to a long to make it reuseable across
 
3009
            # invocations (because the user can reenter it).
 
3010
            self.random_seed = long(time.time())
 
3011
        else:
 
3012
            # Convert the seed to a long if we can
 
3013
            try:
 
3014
                self.random_seed = long(self.random_seed)
 
3015
            except:
 
3016
                pass
 
3017
        return self.random_seed
 
3018
 
 
3019
 
 
3020
class TestFirstDecorator(TestDecorator):
 
3021
    """A decorator which moves named tests to the front."""
 
3022
 
 
3023
    def __init__(self, suite, pattern):
 
3024
        TestDecorator.__init__(self, suite)
 
3025
        self.pattern = pattern
 
3026
        self.filtered = False
 
3027
 
 
3028
    def __iter__(self):
 
3029
        if self.filtered:
 
3030
            return iter(self._tests)
 
3031
        self.filtered = True
 
3032
        suites = split_suite_by_re(self, self.pattern)
 
3033
        del self._tests[:]
 
3034
        self.addTests(suites)
 
3035
        return iter(self._tests)
 
3036
 
 
3037
 
 
3038
def partition_tests(suite, count):
 
3039
    """Partition suite into count lists of tests."""
 
3040
    result = []
 
3041
    tests = list(iter_suite_tests(suite))
 
3042
    tests_per_process = int(math.ceil(float(len(tests)) / count))
 
3043
    for block in range(count):
 
3044
        low_test = block * tests_per_process
 
3045
        high_test = low_test + tests_per_process
 
3046
        process_tests = tests[low_test:high_test]
 
3047
        result.append(process_tests)
 
3048
    return result
 
3049
 
 
3050
 
 
3051
def fork_for_tests(suite):
 
3052
    """Take suite and start up one runner per CPU by forking()
 
3053
 
 
3054
    :return: An iterable of TestCase-like objects which can each have
 
3055
        run(result) called on them to feed tests to result.
 
3056
    """
 
3057
    concurrency = osutils.local_concurrency()
 
3058
    result = []
 
3059
    from subunit import TestProtocolClient, ProtocolTestCase
 
3060
    try:
 
3061
        from subunit.test_results import AutoTimingTestResultDecorator
 
3062
    except ImportError:
 
3063
        AutoTimingTestResultDecorator = lambda x:x
 
3064
    class TestInOtherProcess(ProtocolTestCase):
 
3065
        # Should be in subunit, I think. RBC.
 
3066
        def __init__(self, stream, pid):
 
3067
            ProtocolTestCase.__init__(self, stream)
 
3068
            self.pid = pid
 
3069
 
 
3070
        def run(self, result):
 
3071
            try:
 
3072
                ProtocolTestCase.run(self, result)
 
3073
            finally:
 
3074
                os.waitpid(self.pid, os.WNOHANG)
 
3075
 
 
3076
    test_blocks = partition_tests(suite, concurrency)
 
3077
    for process_tests in test_blocks:
 
3078
        process_suite = TestSuite()
 
3079
        process_suite.addTests(process_tests)
 
3080
        c2pread, c2pwrite = os.pipe()
 
3081
        pid = os.fork()
 
3082
        if pid == 0:
 
3083
            try:
 
3084
                os.close(c2pread)
 
3085
                # Leave stderr and stdout open so we can see test noise
 
3086
                # Close stdin so that the child goes away if it decides to
 
3087
                # read from stdin (otherwise its a roulette to see what
 
3088
                # child actually gets keystrokes for pdb etc).
 
3089
                sys.stdin.close()
 
3090
                sys.stdin = None
 
3091
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3092
                subunit_result = AutoTimingTestResultDecorator(
 
3093
                    TestProtocolClient(stream))
 
3094
                process_suite.run(subunit_result)
 
3095
            finally:
 
3096
                os._exit(0)
 
3097
        else:
 
3098
            os.close(c2pwrite)
 
3099
            stream = os.fdopen(c2pread, 'rb', 1)
 
3100
            test = TestInOtherProcess(stream, pid)
 
3101
            result.append(test)
 
3102
    return result
 
3103
 
 
3104
 
 
3105
def reinvoke_for_tests(suite):
 
3106
    """Take suite and start up one runner per CPU using subprocess().
 
3107
 
 
3108
    :return: An iterable of TestCase-like objects which can each have
 
3109
        run(result) called on them to feed tests to result.
 
3110
    """
 
3111
    concurrency = osutils.local_concurrency()
 
3112
    result = []
 
3113
    from subunit import ProtocolTestCase
 
3114
    class TestInSubprocess(ProtocolTestCase):
 
3115
        def __init__(self, process, name):
 
3116
            ProtocolTestCase.__init__(self, process.stdout)
 
3117
            self.process = process
 
3118
            self.process.stdin.close()
 
3119
            self.name = name
 
3120
 
 
3121
        def run(self, result):
 
3122
            try:
 
3123
                ProtocolTestCase.run(self, result)
 
3124
            finally:
 
3125
                self.process.wait()
 
3126
                os.unlink(self.name)
 
3127
            # print "pid %d finished" % finished_process
 
3128
    test_blocks = partition_tests(suite, concurrency)
 
3129
    for process_tests in test_blocks:
 
3130
        # ugly; currently reimplement rather than reuses TestCase methods.
 
3131
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
3132
        if not os.path.isfile(bzr_path):
 
3133
            # We are probably installed. Assume sys.argv is the right file
 
3134
            bzr_path = sys.argv[0]
 
3135
        fd, test_list_file_name = tempfile.mkstemp()
 
3136
        test_list_file = os.fdopen(fd, 'wb', 1)
 
3137
        for test in process_tests:
 
3138
            test_list_file.write(test.id() + '\n')
 
3139
        test_list_file.close()
 
3140
        try:
 
3141
            argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
 
3142
                '--subunit']
 
3143
            if '--no-plugins' in sys.argv:
 
3144
                argv.append('--no-plugins')
 
3145
            # stderr=STDOUT would be ideal, but until we prevent noise on
 
3146
            # stderr it can interrupt the subunit protocol.
 
3147
            process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
 
3148
                bufsize=1)
 
3149
            test = TestInSubprocess(process, test_list_file_name)
 
3150
            result.append(test)
 
3151
        except:
 
3152
            os.unlink(test_list_file_name)
 
3153
            raise
 
3154
    return result
 
3155
 
 
3156
 
 
3157
class ForwardingResult(unittest.TestResult):
 
3158
 
 
3159
    def __init__(self, target):
 
3160
        unittest.TestResult.__init__(self)
 
3161
        self.result = target
 
3162
 
 
3163
    def startTest(self, test):
 
3164
        self.result.startTest(test)
 
3165
 
 
3166
    def stopTest(self, test):
 
3167
        self.result.stopTest(test)
 
3168
 
 
3169
    def startTestRun(self):
 
3170
        self.result.startTestRun()
 
3171
 
 
3172
    def stopTestRun(self):
 
3173
        self.result.stopTest()
 
3174
 
 
3175
    def addSkip(self, test, reason):
 
3176
        self.result.addSkip(test, reason)
 
3177
 
 
3178
    def addSuccess(self, test):
 
3179
        self.result.addSuccess(test)
 
3180
 
 
3181
    def addError(self, test, err):
 
3182
        self.result.addError(test, err)
 
3183
 
 
3184
    def addFailure(self, test, err):
 
3185
        self.result.addFailure(test, err)
 
3186
 
 
3187
 
 
3188
class BZRTransformingResult(ForwardingResult):
 
3189
 
 
3190
    def addError(self, test, err):
 
3191
        feature = self._error_looks_like('UnavailableFeature: ', err)
 
3192
        if feature is not None:
 
3193
            self.result.addNotSupported(test, feature)
 
3194
        else:
 
3195
            self.result.addError(test, err)
 
3196
 
 
3197
    def addFailure(self, test, err):
 
3198
        known = self._error_looks_like('KnownFailure: ', err)
 
3199
        if known is not None:
 
3200
            self.result._addKnownFailure(test, [KnownFailure,
 
3201
                                                KnownFailure(known), None])
 
3202
        else:
 
3203
            self.result.addFailure(test, err)
 
3204
 
 
3205
    def _error_looks_like(self, prefix, err):
 
3206
        """Deserialize exception and returns the stringify value."""
 
3207
        import subunit
 
3208
        value = None
 
3209
        typ, exc, _ = err
 
3210
        if isinstance(exc, subunit.RemoteException):
 
3211
            # stringify the exception gives access to the remote traceback
 
3212
            # We search the last line for 'prefix'
 
3213
            lines = str(exc).split('\n')
 
3214
            while lines and not lines[-1]:
 
3215
                lines.pop(-1)
 
3216
            if lines:
 
3217
                if lines[-1].startswith(prefix):
 
3218
                    value = lines[-1][len(prefix):]
 
3219
        return value
 
3220
 
 
3221
 
 
3222
class ProfileResult(ForwardingResult):
 
3223
    """Generate profiling data for all activity between start and success.
 
3224
    
 
3225
    The profile data is appended to the test's _benchcalls attribute and can
 
3226
    be accessed by the forwarded-to TestResult.
 
3227
 
 
3228
    While it might be cleaner do accumulate this in stopTest, addSuccess is
 
3229
    where our existing output support for lsprof is, and this class aims to
 
3230
    fit in with that: while it could be moved it's not necessary to accomplish
 
3231
    test profiling, nor would it be dramatically cleaner.
 
3232
    """
 
3233
 
 
3234
    def startTest(self, test):
 
3235
        self.profiler = bzrlib.lsprof.BzrProfiler()
 
3236
        self.profiler.start()
 
3237
        ForwardingResult.startTest(self, test)
 
3238
 
 
3239
    def addSuccess(self, test):
 
3240
        stats = self.profiler.stop()
 
3241
        try:
 
3242
            calls = test._benchcalls
 
3243
        except AttributeError:
 
3244
            test._benchcalls = []
 
3245
            calls = test._benchcalls
 
3246
        calls.append(((test.id(), "", ""), stats))
 
3247
        ForwardingResult.addSuccess(self, test)
 
3248
 
 
3249
    def stopTest(self, test):
 
3250
        ForwardingResult.stopTest(self, test)
 
3251
        self.profiler = None
 
3252
 
 
3253
 
 
3254
# Controlled by "bzr selftest -E=..." option
 
3255
# Currently supported:
 
3256
#   -Eallow_debug           Will no longer clear debug.debug_flags() so it
 
3257
#                           preserves any flags supplied at the command line.
 
3258
#   -Edisable_lock_checks   Turns errors in mismatched locks into simple prints
 
3259
#                           rather than failing tests. And no longer raise
 
3260
#                           LockContention when fctnl locks are not being used
 
3261
#                           with proper exclusion rules.
 
3262
selftest_debug_flags = set()
 
3263
 
 
3264
 
 
3265
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
3266
             transport=None,
 
3267
             test_suite_factory=None,
 
3268
             lsprof_timed=None,
 
3269
             bench_history=None,
 
3270
             matching_tests_first=None,
 
3271
             list_only=False,
 
3272
             random_seed=None,
 
3273
             exclude_pattern=None,
 
3274
             strict=False,
 
3275
             load_list=None,
 
3276
             debug_flags=None,
 
3277
             starting_with=None,
 
3278
             runner_class=None,
 
3279
             suite_decorators=None,
 
3280
             stream=None,
 
3281
             lsprof_tests=False,
 
3282
             ):
 
3283
    """Run the whole test suite under the enhanced runner"""
 
3284
    # XXX: Very ugly way to do this...
 
3285
    # Disable warning about old formats because we don't want it to disturb
 
3286
    # any blackbox tests.
 
3287
    from bzrlib import repository
 
3288
    repository._deprecation_warning_done = True
 
3289
 
 
3290
    global default_transport
 
3291
    if transport is None:
 
3292
        transport = default_transport
 
3293
    old_transport = default_transport
 
3294
    default_transport = transport
 
3295
    global selftest_debug_flags
 
3296
    old_debug_flags = selftest_debug_flags
 
3297
    if debug_flags is not None:
 
3298
        selftest_debug_flags = set(debug_flags)
 
3299
    try:
 
3300
        if load_list is None:
 
3301
            keep_only = None
 
3302
        else:
 
3303
            keep_only = load_test_id_list(load_list)
 
3304
        if starting_with:
 
3305
            starting_with = [test_prefix_alias_registry.resolve_alias(start)
 
3306
                             for start in starting_with]
 
3307
        if test_suite_factory is None:
 
3308
            # Reduce loading time by loading modules based on the starting_with
 
3309
            # patterns.
 
3310
            suite = test_suite(keep_only, starting_with)
 
3311
        else:
 
3312
            suite = test_suite_factory()
 
3313
        if starting_with:
 
3314
            # But always filter as requested.
 
3315
            suite = filter_suite_by_id_startswith(suite, starting_with)
 
3316
        result_decorators = []
 
3317
        if lsprof_tests:
 
3318
            result_decorators.append(ProfileResult)
 
3319
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
3320
                     stop_on_failure=stop_on_failure,
 
3321
                     transport=transport,
 
3322
                     lsprof_timed=lsprof_timed,
 
3323
                     bench_history=bench_history,
 
3324
                     matching_tests_first=matching_tests_first,
 
3325
                     list_only=list_only,
 
3326
                     random_seed=random_seed,
 
3327
                     exclude_pattern=exclude_pattern,
 
3328
                     strict=strict,
 
3329
                     runner_class=runner_class,
 
3330
                     suite_decorators=suite_decorators,
 
3331
                     stream=stream,
 
3332
                     result_decorators=result_decorators,
 
3333
                     )
 
3334
    finally:
 
3335
        default_transport = old_transport
 
3336
        selftest_debug_flags = old_debug_flags
 
3337
 
 
3338
 
 
3339
def load_test_id_list(file_name):
 
3340
    """Load a test id list from a text file.
 
3341
 
 
3342
    The format is one test id by line.  No special care is taken to impose
 
3343
    strict rules, these test ids are used to filter the test suite so a test id
 
3344
    that do not match an existing test will do no harm. This allows user to add
 
3345
    comments, leave blank lines, etc.
 
3346
    """
 
3347
    test_list = []
 
3348
    try:
 
3349
        ftest = open(file_name, 'rt')
 
3350
    except IOError, e:
 
3351
        if e.errno != errno.ENOENT:
 
3352
            raise
 
3353
        else:
 
3354
            raise errors.NoSuchFile(file_name)
 
3355
 
 
3356
    for test_name in ftest.readlines():
 
3357
        test_list.append(test_name.strip())
 
3358
    ftest.close()
 
3359
    return test_list
 
3360
 
 
3361
 
 
3362
def suite_matches_id_list(test_suite, id_list):
 
3363
    """Warns about tests not appearing or appearing more than once.
 
3364
 
 
3365
    :param test_suite: A TestSuite object.
 
3366
    :param test_id_list: The list of test ids that should be found in
 
3367
         test_suite.
 
3368
 
 
3369
    :return: (absents, duplicates) absents is a list containing the test found
 
3370
        in id_list but not in test_suite, duplicates is a list containing the
 
3371
        test found multiple times in test_suite.
 
3372
 
 
3373
    When using a prefined test id list, it may occurs that some tests do not
 
3374
    exist anymore or that some tests use the same id. This function warns the
 
3375
    tester about potential problems in his workflow (test lists are volatile)
 
3376
    or in the test suite itself (using the same id for several tests does not
 
3377
    help to localize defects).
 
3378
    """
 
3379
    # Build a dict counting id occurrences
 
3380
    tests = dict()
 
3381
    for test in iter_suite_tests(test_suite):
 
3382
        id = test.id()
 
3383
        tests[id] = tests.get(id, 0) + 1
 
3384
 
 
3385
    not_found = []
 
3386
    duplicates = []
 
3387
    for id in id_list:
 
3388
        occurs = tests.get(id, 0)
 
3389
        if not occurs:
 
3390
            not_found.append(id)
 
3391
        elif occurs > 1:
 
3392
            duplicates.append(id)
 
3393
 
 
3394
    return not_found, duplicates
 
3395
 
 
3396
 
 
3397
class TestIdList(object):
 
3398
    """Test id list to filter a test suite.
 
3399
 
 
3400
    Relying on the assumption that test ids are built as:
 
3401
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
3402
    notation, this class offers methods to :
 
3403
    - avoid building a test suite for modules not refered to in the test list,
 
3404
    - keep only the tests listed from the module test suite.
 
3405
    """
 
3406
 
 
3407
    def __init__(self, test_id_list):
 
3408
        # When a test suite needs to be filtered against us we compare test ids
 
3409
        # for equality, so a simple dict offers a quick and simple solution.
 
3410
        self.tests = dict().fromkeys(test_id_list, True)
 
3411
 
 
3412
        # While unittest.TestCase have ids like:
 
3413
        # <module>.<class>.<method>[(<param+)],
 
3414
        # doctest.DocTestCase can have ids like:
 
3415
        # <module>
 
3416
        # <module>.<class>
 
3417
        # <module>.<function>
 
3418
        # <module>.<class>.<method>
 
3419
 
 
3420
        # Since we can't predict a test class from its name only, we settle on
 
3421
        # a simple constraint: a test id always begins with its module name.
 
3422
 
 
3423
        modules = {}
 
3424
        for test_id in test_id_list:
 
3425
            parts = test_id.split('.')
 
3426
            mod_name = parts.pop(0)
 
3427
            modules[mod_name] = True
 
3428
            for part in parts:
 
3429
                mod_name += '.' + part
 
3430
                modules[mod_name] = True
 
3431
        self.modules = modules
 
3432
 
 
3433
    def refers_to(self, module_name):
 
3434
        """Is there tests for the module or one of its sub modules."""
 
3435
        return self.modules.has_key(module_name)
 
3436
 
 
3437
    def includes(self, test_id):
 
3438
        return self.tests.has_key(test_id)
 
3439
 
 
3440
 
 
3441
class TestPrefixAliasRegistry(registry.Registry):
 
3442
    """A registry for test prefix aliases.
 
3443
 
 
3444
    This helps implement shorcuts for the --starting-with selftest
 
3445
    option. Overriding existing prefixes is not allowed but not fatal (a
 
3446
    warning will be emitted).
 
3447
    """
 
3448
 
 
3449
    def register(self, key, obj, help=None, info=None,
 
3450
                 override_existing=False):
 
3451
        """See Registry.register.
 
3452
 
 
3453
        Trying to override an existing alias causes a warning to be emitted,
 
3454
        not a fatal execption.
 
3455
        """
 
3456
        try:
 
3457
            super(TestPrefixAliasRegistry, self).register(
 
3458
                key, obj, help=help, info=info, override_existing=False)
 
3459
        except KeyError:
 
3460
            actual = self.get(key)
 
3461
            note('Test prefix alias %s is already used for %s, ignoring %s'
 
3462
                 % (key, actual, obj))
 
3463
 
 
3464
    def resolve_alias(self, id_start):
 
3465
        """Replace the alias by the prefix in the given string.
 
3466
 
 
3467
        Using an unknown prefix is an error to help catching typos.
 
3468
        """
 
3469
        parts = id_start.split('.')
 
3470
        try:
 
3471
            parts[0] = self.get(parts[0])
 
3472
        except KeyError:
 
3473
            raise errors.BzrCommandError(
 
3474
                '%s is not a known test prefix alias' % parts[0])
 
3475
        return '.'.join(parts)
 
3476
 
 
3477
 
 
3478
test_prefix_alias_registry = TestPrefixAliasRegistry()
 
3479
"""Registry of test prefix aliases."""
 
3480
 
 
3481
 
 
3482
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
 
3483
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
 
3484
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
 
3485
 
 
3486
# Obvious higest levels prefixes, feel free to add your own via a plugin
 
3487
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
 
3488
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
 
3489
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
 
3490
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
 
3491
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
 
3492
 
 
3493
 
 
3494
def _test_suite_testmod_names():
 
3495
    """Return the standard list of test module names to test."""
 
3496
    return [
 
3497
        'bzrlib.doc',
 
3498
        'bzrlib.tests.blackbox',
 
3499
        'bzrlib.tests.commands',
 
3500
        'bzrlib.tests.per_branch',
 
3501
        'bzrlib.tests.per_bzrdir',
 
3502
        'bzrlib.tests.per_interrepository',
 
3503
        'bzrlib.tests.per_intertree',
 
3504
        'bzrlib.tests.per_inventory',
 
3505
        'bzrlib.tests.per_interbranch',
 
3506
        'bzrlib.tests.per_lock',
 
3507
        'bzrlib.tests.per_transport',
 
3508
        'bzrlib.tests.per_tree',
 
3509
        'bzrlib.tests.per_pack_repository',
 
3510
        'bzrlib.tests.per_repository',
 
3511
        'bzrlib.tests.per_repository_chk',
 
3512
        'bzrlib.tests.per_repository_reference',
 
3513
        'bzrlib.tests.per_versionedfile',
 
3514
        'bzrlib.tests.per_workingtree',
 
3515
        'bzrlib.tests.test__annotator',
 
3516
        'bzrlib.tests.test__chk_map',
 
3517
        'bzrlib.tests.test__dirstate_helpers',
 
3518
        'bzrlib.tests.test__groupcompress',
 
3519
        'bzrlib.tests.test__known_graph',
 
3520
        'bzrlib.tests.test__rio',
 
3521
        'bzrlib.tests.test__walkdirs_win32',
 
3522
        'bzrlib.tests.test_ancestry',
 
3523
        'bzrlib.tests.test_annotate',
 
3524
        'bzrlib.tests.test_api',
 
3525
        'bzrlib.tests.test_atomicfile',
 
3526
        'bzrlib.tests.test_bad_files',
 
3527
        'bzrlib.tests.test_bencode',
 
3528
        'bzrlib.tests.test_bisect_multi',
 
3529
        'bzrlib.tests.test_branch',
 
3530
        'bzrlib.tests.test_branchbuilder',
 
3531
        'bzrlib.tests.test_btree_index',
 
3532
        'bzrlib.tests.test_bugtracker',
 
3533
        'bzrlib.tests.test_bundle',
 
3534
        'bzrlib.tests.test_bzrdir',
 
3535
        'bzrlib.tests.test__chunks_to_lines',
 
3536
        'bzrlib.tests.test_cache_utf8',
 
3537
        'bzrlib.tests.test_chk_map',
 
3538
        'bzrlib.tests.test_chk_serializer',
 
3539
        'bzrlib.tests.test_chunk_writer',
 
3540
        'bzrlib.tests.test_clean_tree',
 
3541
        'bzrlib.tests.test_commands',
 
3542
        'bzrlib.tests.test_commit',
 
3543
        'bzrlib.tests.test_commit_merge',
 
3544
        'bzrlib.tests.test_config',
 
3545
        'bzrlib.tests.test_conflicts',
 
3546
        'bzrlib.tests.test_counted_lock',
 
3547
        'bzrlib.tests.test_crash',
 
3548
        'bzrlib.tests.test_decorators',
 
3549
        'bzrlib.tests.test_delta',
 
3550
        'bzrlib.tests.test_debug',
 
3551
        'bzrlib.tests.test_deprecated_graph',
 
3552
        'bzrlib.tests.test_diff',
 
3553
        'bzrlib.tests.test_directory_service',
 
3554
        'bzrlib.tests.test_dirstate',
 
3555
        'bzrlib.tests.test_email_message',
 
3556
        'bzrlib.tests.test_eol_filters',
 
3557
        'bzrlib.tests.test_errors',
 
3558
        'bzrlib.tests.test_export',
 
3559
        'bzrlib.tests.test_extract',
 
3560
        'bzrlib.tests.test_fetch',
 
3561
        'bzrlib.tests.test_fifo_cache',
 
3562
        'bzrlib.tests.test_filters',
 
3563
        'bzrlib.tests.test_ftp_transport',
 
3564
        'bzrlib.tests.test_foreign',
 
3565
        'bzrlib.tests.test_generate_docs',
 
3566
        'bzrlib.tests.test_generate_ids',
 
3567
        'bzrlib.tests.test_globbing',
 
3568
        'bzrlib.tests.test_gpg',
 
3569
        'bzrlib.tests.test_graph',
 
3570
        'bzrlib.tests.test_groupcompress',
 
3571
        'bzrlib.tests.test_hashcache',
 
3572
        'bzrlib.tests.test_help',
 
3573
        'bzrlib.tests.test_hooks',
 
3574
        'bzrlib.tests.test_http',
 
3575
        'bzrlib.tests.test_http_response',
 
3576
        'bzrlib.tests.test_https_ca_bundle',
 
3577
        'bzrlib.tests.test_identitymap',
 
3578
        'bzrlib.tests.test_ignores',
 
3579
        'bzrlib.tests.test_index',
 
3580
        'bzrlib.tests.test_info',
 
3581
        'bzrlib.tests.test_inv',
 
3582
        'bzrlib.tests.test_inventory_delta',
 
3583
        'bzrlib.tests.test_knit',
 
3584
        'bzrlib.tests.test_lazy_import',
 
3585
        'bzrlib.tests.test_lazy_regex',
 
3586
        'bzrlib.tests.test_lock',
 
3587
        'bzrlib.tests.test_lockable_files',
 
3588
        'bzrlib.tests.test_lockdir',
 
3589
        'bzrlib.tests.test_log',
 
3590
        'bzrlib.tests.test_lru_cache',
 
3591
        'bzrlib.tests.test_lsprof',
 
3592
        'bzrlib.tests.test_mail_client',
 
3593
        'bzrlib.tests.test_memorytree',
 
3594
        'bzrlib.tests.test_merge',
 
3595
        'bzrlib.tests.test_merge3',
 
3596
        'bzrlib.tests.test_merge_core',
 
3597
        'bzrlib.tests.test_merge_directive',
 
3598
        'bzrlib.tests.test_missing',
 
3599
        'bzrlib.tests.test_msgeditor',
 
3600
        'bzrlib.tests.test_multiparent',
 
3601
        'bzrlib.tests.test_mutabletree',
 
3602
        'bzrlib.tests.test_nonascii',
 
3603
        'bzrlib.tests.test_options',
 
3604
        'bzrlib.tests.test_osutils',
 
3605
        'bzrlib.tests.test_osutils_encodings',
 
3606
        'bzrlib.tests.test_pack',
 
3607
        'bzrlib.tests.test_patch',
 
3608
        'bzrlib.tests.test_patches',
 
3609
        'bzrlib.tests.test_permissions',
 
3610
        'bzrlib.tests.test_plugins',
 
3611
        'bzrlib.tests.test_progress',
 
3612
        'bzrlib.tests.test_read_bundle',
 
3613
        'bzrlib.tests.test_reconcile',
 
3614
        'bzrlib.tests.test_reconfigure',
 
3615
        'bzrlib.tests.test_registry',
 
3616
        'bzrlib.tests.test_remote',
 
3617
        'bzrlib.tests.test_rename_map',
 
3618
        'bzrlib.tests.test_repository',
 
3619
        'bzrlib.tests.test_revert',
 
3620
        'bzrlib.tests.test_revision',
 
3621
        'bzrlib.tests.test_revisionspec',
 
3622
        'bzrlib.tests.test_revisiontree',
 
3623
        'bzrlib.tests.test_rio',
 
3624
        'bzrlib.tests.test_rules',
 
3625
        'bzrlib.tests.test_sampler',
 
3626
        'bzrlib.tests.test_selftest',
 
3627
        'bzrlib.tests.test_serializer',
 
3628
        'bzrlib.tests.test_setup',
 
3629
        'bzrlib.tests.test_sftp_transport',
 
3630
        'bzrlib.tests.test_shelf',
 
3631
        'bzrlib.tests.test_shelf_ui',
 
3632
        'bzrlib.tests.test_smart',
 
3633
        'bzrlib.tests.test_smart_add',
 
3634
        'bzrlib.tests.test_smart_request',
 
3635
        'bzrlib.tests.test_smart_transport',
 
3636
        'bzrlib.tests.test_smtp_connection',
 
3637
        'bzrlib.tests.test_source',
 
3638
        'bzrlib.tests.test_ssh_transport',
 
3639
        'bzrlib.tests.test_status',
 
3640
        'bzrlib.tests.test_store',
 
3641
        'bzrlib.tests.test_strace',
 
3642
        'bzrlib.tests.test_subsume',
 
3643
        'bzrlib.tests.test_switch',
 
3644
        'bzrlib.tests.test_symbol_versioning',
 
3645
        'bzrlib.tests.test_tag',
 
3646
        'bzrlib.tests.test_testament',
 
3647
        'bzrlib.tests.test_textfile',
 
3648
        'bzrlib.tests.test_textmerge',
 
3649
        'bzrlib.tests.test_timestamp',
 
3650
        'bzrlib.tests.test_trace',
 
3651
        'bzrlib.tests.test_transactions',
 
3652
        'bzrlib.tests.test_transform',
 
3653
        'bzrlib.tests.test_transport',
 
3654
        'bzrlib.tests.test_transport_log',
 
3655
        'bzrlib.tests.test_tree',
 
3656
        'bzrlib.tests.test_treebuilder',
 
3657
        'bzrlib.tests.test_tsort',
 
3658
        'bzrlib.tests.test_tuned_gzip',
 
3659
        'bzrlib.tests.test_ui',
 
3660
        'bzrlib.tests.test_uncommit',
 
3661
        'bzrlib.tests.test_upgrade',
 
3662
        'bzrlib.tests.test_upgrade_stacked',
 
3663
        'bzrlib.tests.test_urlutils',
 
3664
        'bzrlib.tests.test_version',
 
3665
        'bzrlib.tests.test_version_info',
 
3666
        'bzrlib.tests.test_weave',
 
3667
        'bzrlib.tests.test_whitebox',
 
3668
        'bzrlib.tests.test_win32utils',
 
3669
        'bzrlib.tests.test_workingtree',
 
3670
        'bzrlib.tests.test_workingtree_4',
 
3671
        'bzrlib.tests.test_wsgi',
 
3672
        'bzrlib.tests.test_xml',
 
3673
        ]
 
3674
 
 
3675
 
 
3676
def _test_suite_modules_to_doctest():
 
3677
    """Return the list of modules to doctest."""   
 
3678
    return [
 
3679
        'bzrlib',
 
3680
        'bzrlib.branchbuilder',
 
3681
        'bzrlib.export',
 
3682
        'bzrlib.inventory',
 
3683
        'bzrlib.iterablefile',
 
3684
        'bzrlib.lockdir',
 
3685
        'bzrlib.merge3',
 
3686
        'bzrlib.option',
 
3687
        'bzrlib.symbol_versioning',
 
3688
        'bzrlib.tests',
 
3689
        'bzrlib.timestamp',
 
3690
        'bzrlib.version_info_formats.format_custom',
 
3691
        ]
 
3692
 
 
3693
 
 
3694
def test_suite(keep_only=None, starting_with=None):
 
3695
    """Build and return TestSuite for the whole of bzrlib.
 
3696
 
 
3697
    :param keep_only: A list of test ids limiting the suite returned.
 
3698
 
 
3699
    :param starting_with: An id limiting the suite returned to the tests
 
3700
         starting with it.
 
3701
 
 
3702
    This function can be replaced if you need to change the default test
 
3703
    suite on a global basis, but it is not encouraged.
 
3704
    """
 
3705
 
 
3706
    loader = TestUtil.TestLoader()
 
3707
 
 
3708
    if keep_only is not None:
 
3709
        id_filter = TestIdList(keep_only)
 
3710
    if starting_with:
 
3711
        # We take precedence over keep_only because *at loading time* using
 
3712
        # both options means we will load less tests for the same final result.
 
3713
        def interesting_module(name):
 
3714
            for start in starting_with:
 
3715
                if (
 
3716
                    # Either the module name starts with the specified string
 
3717
                    name.startswith(start)
 
3718
                    # or it may contain tests starting with the specified string
 
3719
                    or start.startswith(name)
 
3720
                    ):
 
3721
                    return True
 
3722
            return False
 
3723
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
 
3724
 
 
3725
    elif keep_only is not None:
 
3726
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
3727
        def interesting_module(name):
 
3728
            return id_filter.refers_to(name)
 
3729
 
 
3730
    else:
 
3731
        loader = TestUtil.TestLoader()
 
3732
        def interesting_module(name):
 
3733
            # No filtering, all modules are interesting
 
3734
            return True
 
3735
 
 
3736
    suite = loader.suiteClass()
 
3737
 
 
3738
    # modules building their suite with loadTestsFromModuleNames
 
3739
    suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
 
3740
 
 
3741
    for mod in _test_suite_modules_to_doctest():
 
3742
        if not interesting_module(mod):
 
3743
            # No tests to keep here, move along
 
3744
            continue
 
3745
        try:
 
3746
            # note that this really does mean "report only" -- doctest
 
3747
            # still runs the rest of the examples
 
3748
            doc_suite = doctest.DocTestSuite(mod,
 
3749
                optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
 
3750
        except ValueError, e:
 
3751
            print '**failed to get doctest for: %s\n%s' % (mod, e)
 
3752
            raise
 
3753
        if len(doc_suite._tests) == 0:
 
3754
            raise errors.BzrError("no doctests found in %s" % (mod,))
 
3755
        suite.addTest(doc_suite)
 
3756
 
 
3757
    default_encoding = sys.getdefaultencoding()
 
3758
    for name, plugin in bzrlib.plugin.plugins().items():
 
3759
        if not interesting_module(plugin.module.__name__):
 
3760
            continue
 
3761
        plugin_suite = plugin.test_suite()
 
3762
        # We used to catch ImportError here and turn it into just a warning,
 
3763
        # but really if you don't have --no-plugins this should be a failure.
 
3764
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
3765
        if plugin_suite is None:
 
3766
            plugin_suite = plugin.load_plugin_tests(loader)
 
3767
        if plugin_suite is not None:
 
3768
            suite.addTest(plugin_suite)
 
3769
        if default_encoding != sys.getdefaultencoding():
 
3770
            bzrlib.trace.warning(
 
3771
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
3772
                sys.getdefaultencoding())
 
3773
            reload(sys)
 
3774
            sys.setdefaultencoding(default_encoding)
 
3775
 
 
3776
    if keep_only is not None:
 
3777
        # Now that the referred modules have loaded their tests, keep only the
 
3778
        # requested ones.
 
3779
        suite = filter_suite_by_id_list(suite, id_filter)
 
3780
        # Do some sanity checks on the id_list filtering
 
3781
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
3782
        if starting_with:
 
3783
            # The tester has used both keep_only and starting_with, so he is
 
3784
            # already aware that some tests are excluded from the list, there
 
3785
            # is no need to tell him which.
 
3786
            pass
 
3787
        else:
 
3788
            # Some tests mentioned in the list are not in the test suite. The
 
3789
            # list may be out of date, report to the tester.
 
3790
            for id in not_found:
 
3791
                bzrlib.trace.warning('"%s" not found in the test suite', id)
 
3792
        for id in duplicates:
 
3793
            bzrlib.trace.warning('"%s" is used as an id by several tests', id)
 
3794
 
 
3795
    return suite
 
3796
 
 
3797
 
 
3798
def multiply_scenarios(scenarios_left, scenarios_right):
 
3799
    """Multiply two sets of scenarios.
 
3800
 
 
3801
    :returns: the cartesian product of the two sets of scenarios, that is
 
3802
        a scenario for every possible combination of a left scenario and a
 
3803
        right scenario.
 
3804
    """
 
3805
    return [
 
3806
        ('%s,%s' % (left_name, right_name),
 
3807
         dict(left_dict.items() + right_dict.items()))
 
3808
        for left_name, left_dict in scenarios_left
 
3809
        for right_name, right_dict in scenarios_right]
 
3810
 
 
3811
 
 
3812
def multiply_tests(tests, scenarios, result):
 
3813
    """Multiply tests_list by scenarios into result.
 
3814
 
 
3815
    This is the core workhorse for test parameterisation.
 
3816
 
 
3817
    Typically the load_tests() method for a per-implementation test suite will
 
3818
    call multiply_tests and return the result.
 
3819
 
 
3820
    :param tests: The tests to parameterise.
 
3821
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
 
3822
        scenario_param_dict).
 
3823
    :param result: A TestSuite to add created tests to.
 
3824
 
 
3825
    This returns the passed in result TestSuite with the cross product of all
 
3826
    the tests repeated once for each scenario.  Each test is adapted by adding
 
3827
    the scenario name at the end of its id(), and updating the test object's
 
3828
    __dict__ with the scenario_param_dict.
 
3829
 
 
3830
    >>> import bzrlib.tests.test_sampler
 
3831
    >>> r = multiply_tests(
 
3832
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
 
3833
    ...     [('one', dict(param=1)),
 
3834
    ...      ('two', dict(param=2))],
 
3835
    ...     TestSuite())
 
3836
    >>> tests = list(iter_suite_tests(r))
 
3837
    >>> len(tests)
 
3838
    2
 
3839
    >>> tests[0].id()
 
3840
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
 
3841
    >>> tests[0].param
 
3842
    1
 
3843
    >>> tests[1].param
 
3844
    2
 
3845
    """
 
3846
    for test in iter_suite_tests(tests):
 
3847
        apply_scenarios(test, scenarios, result)
 
3848
    return result
 
3849
 
 
3850
 
 
3851
def apply_scenarios(test, scenarios, result):
 
3852
    """Apply the scenarios in scenarios to test and add to result.
 
3853
 
 
3854
    :param test: The test to apply scenarios to.
 
3855
    :param scenarios: An iterable of scenarios to apply to test.
 
3856
    :return: result
 
3857
    :seealso: apply_scenario
 
3858
    """
 
3859
    for scenario in scenarios:
 
3860
        result.addTest(apply_scenario(test, scenario))
 
3861
    return result
 
3862
 
 
3863
 
 
3864
def apply_scenario(test, scenario):
 
3865
    """Copy test and apply scenario to it.
 
3866
 
 
3867
    :param test: A test to adapt.
 
3868
    :param scenario: A tuple describing the scenarion.
 
3869
        The first element of the tuple is the new test id.
 
3870
        The second element is a dict containing attributes to set on the
 
3871
        test.
 
3872
    :return: The adapted test.
 
3873
    """
 
3874
    new_id = "%s(%s)" % (test.id(), scenario[0])
 
3875
    new_test = clone_test(test, new_id)
 
3876
    for name, value in scenario[1].items():
 
3877
        setattr(new_test, name, value)
 
3878
    return new_test
 
3879
 
 
3880
 
 
3881
def clone_test(test, new_id):
 
3882
    """Clone a test giving it a new id.
 
3883
 
 
3884
    :param test: The test to clone.
 
3885
    :param new_id: The id to assign to it.
 
3886
    :return: The new test.
 
3887
    """
 
3888
    new_test = copy(test)
 
3889
    new_test.id = lambda: new_id
 
3890
    return new_test
 
3891
 
 
3892
 
 
3893
def _rmtree_temp_dir(dirname):
 
3894
    # If LANG=C we probably have created some bogus paths
 
3895
    # which rmtree(unicode) will fail to delete
 
3896
    # so make sure we are using rmtree(str) to delete everything
 
3897
    # except on win32, where rmtree(str) will fail
 
3898
    # since it doesn't have the property of byte-stream paths
 
3899
    # (they are either ascii or mbcs)
 
3900
    if sys.platform == 'win32':
 
3901
        # make sure we are using the unicode win32 api
 
3902
        dirname = unicode(dirname)
 
3903
    else:
 
3904
        dirname = dirname.encode(sys.getfilesystemencoding())
 
3905
    try:
 
3906
        osutils.rmtree(dirname)
 
3907
    except OSError, e:
 
3908
        # We don't want to fail here because some useful display will be lost
 
3909
        # otherwise. Polluting the tmp dir is bad, but not giving all the
 
3910
        # possible info to the test runner is even worse.
 
3911
        sys.stderr.write('Unable to remove testing dir %s\n%s'
 
3912
                         % (os.path.basename(dirname), e))
 
3913
 
 
3914
 
 
3915
class Feature(object):
 
3916
    """An operating system Feature."""
 
3917
 
 
3918
    def __init__(self):
 
3919
        self._available = None
 
3920
 
 
3921
    def available(self):
 
3922
        """Is the feature available?
 
3923
 
 
3924
        :return: True if the feature is available.
 
3925
        """
 
3926
        if self._available is None:
 
3927
            self._available = self._probe()
 
3928
        return self._available
 
3929
 
 
3930
    def _probe(self):
 
3931
        """Implement this method in concrete features.
 
3932
 
 
3933
        :return: True if the feature is available.
 
3934
        """
 
3935
        raise NotImplementedError
 
3936
 
 
3937
    def __str__(self):
 
3938
        if getattr(self, 'feature_name', None):
 
3939
            return self.feature_name()
 
3940
        return self.__class__.__name__
 
3941
 
 
3942
 
 
3943
class _SymlinkFeature(Feature):
 
3944
 
 
3945
    def _probe(self):
 
3946
        return osutils.has_symlinks()
 
3947
 
 
3948
    def feature_name(self):
 
3949
        return 'symlinks'
 
3950
 
 
3951
SymlinkFeature = _SymlinkFeature()
 
3952
 
 
3953
 
 
3954
class _HardlinkFeature(Feature):
 
3955
 
 
3956
    def _probe(self):
 
3957
        return osutils.has_hardlinks()
 
3958
 
 
3959
    def feature_name(self):
 
3960
        return 'hardlinks'
 
3961
 
 
3962
HardlinkFeature = _HardlinkFeature()
 
3963
 
 
3964
 
 
3965
class _OsFifoFeature(Feature):
 
3966
 
 
3967
    def _probe(self):
 
3968
        return getattr(os, 'mkfifo', None)
 
3969
 
 
3970
    def feature_name(self):
 
3971
        return 'filesystem fifos'
 
3972
 
 
3973
OsFifoFeature = _OsFifoFeature()
 
3974
 
 
3975
 
 
3976
class _UnicodeFilenameFeature(Feature):
 
3977
    """Does the filesystem support Unicode filenames?"""
 
3978
 
 
3979
    def _probe(self):
 
3980
        try:
 
3981
            # Check for character combinations unlikely to be covered by any
 
3982
            # single non-unicode encoding. We use the characters
 
3983
            # - greek small letter alpha (U+03B1) and
 
3984
            # - braille pattern dots-123456 (U+283F).
 
3985
            os.stat(u'\u03b1\u283f')
 
3986
        except UnicodeEncodeError:
 
3987
            return False
 
3988
        except (IOError, OSError):
 
3989
            # The filesystem allows the Unicode filename but the file doesn't
 
3990
            # exist.
 
3991
            return True
 
3992
        else:
 
3993
            # The filesystem allows the Unicode filename and the file exists,
 
3994
            # for some reason.
 
3995
            return True
 
3996
 
 
3997
UnicodeFilenameFeature = _UnicodeFilenameFeature()
 
3998
 
 
3999
 
 
4000
def probe_unicode_in_user_encoding():
 
4001
    """Try to encode several unicode strings to use in unicode-aware tests.
 
4002
    Return first successfull match.
 
4003
 
 
4004
    :return:  (unicode value, encoded plain string value) or (None, None)
 
4005
    """
 
4006
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
4007
    for uni_val in possible_vals:
 
4008
        try:
 
4009
            str_val = uni_val.encode(osutils.get_user_encoding())
 
4010
        except UnicodeEncodeError:
 
4011
            # Try a different character
 
4012
            pass
 
4013
        else:
 
4014
            return uni_val, str_val
 
4015
    return None, None
 
4016
 
 
4017
 
 
4018
def probe_bad_non_ascii(encoding):
 
4019
    """Try to find [bad] character with code [128..255]
 
4020
    that cannot be decoded to unicode in some encoding.
 
4021
    Return None if all non-ascii characters is valid
 
4022
    for given encoding.
 
4023
    """
 
4024
    for i in xrange(128, 256):
 
4025
        char = chr(i)
 
4026
        try:
 
4027
            char.decode(encoding)
 
4028
        except UnicodeDecodeError:
 
4029
            return char
 
4030
    return None
 
4031
 
 
4032
 
 
4033
class _HTTPSServerFeature(Feature):
 
4034
    """Some tests want an https Server, check if one is available.
 
4035
 
 
4036
    Right now, the only way this is available is under python2.6 which provides
 
4037
    an ssl module.
 
4038
    """
 
4039
 
 
4040
    def _probe(self):
 
4041
        try:
 
4042
            import ssl
 
4043
            return True
 
4044
        except ImportError:
 
4045
            return False
 
4046
 
 
4047
    def feature_name(self):
 
4048
        return 'HTTPSServer'
 
4049
 
 
4050
 
 
4051
HTTPSServerFeature = _HTTPSServerFeature()
 
4052
 
 
4053
 
 
4054
class _UnicodeFilename(Feature):
 
4055
    """Does the filesystem support Unicode filenames?"""
 
4056
 
 
4057
    def _probe(self):
 
4058
        try:
 
4059
            os.stat(u'\u03b1')
 
4060
        except UnicodeEncodeError:
 
4061
            return False
 
4062
        except (IOError, OSError):
 
4063
            # The filesystem allows the Unicode filename but the file doesn't
 
4064
            # exist.
 
4065
            return True
 
4066
        else:
 
4067
            # The filesystem allows the Unicode filename and the file exists,
 
4068
            # for some reason.
 
4069
            return True
 
4070
 
 
4071
UnicodeFilename = _UnicodeFilename()
 
4072
 
 
4073
 
 
4074
class _UTF8Filesystem(Feature):
 
4075
    """Is the filesystem UTF-8?"""
 
4076
 
 
4077
    def _probe(self):
 
4078
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
 
4079
            return True
 
4080
        return False
 
4081
 
 
4082
UTF8Filesystem = _UTF8Filesystem()
 
4083
 
 
4084
 
 
4085
class _CaseInsCasePresFilenameFeature(Feature):
 
4086
    """Is the file-system case insensitive, but case-preserving?"""
 
4087
 
 
4088
    def _probe(self):
 
4089
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
 
4090
        try:
 
4091
            # first check truly case-preserving for created files, then check
 
4092
            # case insensitive when opening existing files.
 
4093
            name = osutils.normpath(name)
 
4094
            base, rel = osutils.split(name)
 
4095
            found_rel = osutils.canonical_relpath(base, name)
 
4096
            return (found_rel == rel
 
4097
                    and os.path.isfile(name.upper())
 
4098
                    and os.path.isfile(name.lower()))
 
4099
        finally:
 
4100
            os.close(fileno)
 
4101
            os.remove(name)
 
4102
 
 
4103
    def feature_name(self):
 
4104
        return "case-insensitive case-preserving filesystem"
 
4105
 
 
4106
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
 
4107
 
 
4108
 
 
4109
class _CaseInsensitiveFilesystemFeature(Feature):
 
4110
    """Check if underlying filesystem is case-insensitive but *not* case
 
4111
    preserving.
 
4112
    """
 
4113
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
 
4114
    # more likely to be case preserving, so this case is rare.
 
4115
 
 
4116
    def _probe(self):
 
4117
        if CaseInsCasePresFilenameFeature.available():
 
4118
            return False
 
4119
 
 
4120
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
4121
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
4122
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
4123
        else:
 
4124
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
4125
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
4126
            dir=root)
 
4127
        name_a = osutils.pathjoin(tdir, 'a')
 
4128
        name_A = osutils.pathjoin(tdir, 'A')
 
4129
        os.mkdir(name_a)
 
4130
        result = osutils.isdir(name_A)
 
4131
        _rmtree_temp_dir(tdir)
 
4132
        return result
 
4133
 
 
4134
    def feature_name(self):
 
4135
        return 'case-insensitive filesystem'
 
4136
 
 
4137
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
 
4138
 
 
4139
 
 
4140
class _SubUnitFeature(Feature):
 
4141
    """Check if subunit is available."""
 
4142
 
 
4143
    def _probe(self):
 
4144
        try:
 
4145
            import subunit
 
4146
            return True
 
4147
        except ImportError:
 
4148
            return False
 
4149
 
 
4150
    def feature_name(self):
 
4151
        return 'subunit'
 
4152
 
 
4153
SubUnitFeature = _SubUnitFeature()
 
4154
# Only define SubUnitBzrRunner if subunit is available.
 
4155
try:
 
4156
    from subunit import TestProtocolClient
 
4157
    try:
 
4158
        from subunit.test_results import AutoTimingTestResultDecorator
 
4159
    except ImportError:
 
4160
        AutoTimingTestResultDecorator = lambda x:x
 
4161
    class SubUnitBzrRunner(TextTestRunner):
 
4162
        def run(self, test):
 
4163
            result = AutoTimingTestResultDecorator(
 
4164
                TestProtocolClient(self.stream))
 
4165
            test.run(result)
 
4166
            return result
 
4167
except ImportError:
 
4168
    pass