/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Jelmer Vernooij
  • Date: 2009-08-03 11:37:53 UTC
  • mto: This revision was merged to the branch mainline in revision 4590.
  • Revision ID: jelmer@samba.org-20090803113753-h2neg2rfmpj6qi34
Add 'bzr_transports' property to the plugin API.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
 
18
 
from testsweet import TestBase, run_suite, InTempDir
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import atexit
 
30
import codecs
 
31
from cStringIO import StringIO
 
32
import difflib
 
33
import doctest
 
34
import errno
 
35
import logging
 
36
import math
 
37
import os
 
38
from pprint import pformat
 
39
import random
 
40
import re
 
41
import shlex
 
42
import stat
 
43
from subprocess import Popen, PIPE, STDOUT
 
44
import sys
 
45
import tempfile
 
46
import threading
 
47
import time
 
48
import unittest
 
49
import warnings
 
50
 
 
51
 
 
52
from bzrlib import (
 
53
    branchbuilder,
 
54
    bzrdir,
 
55
    debug,
 
56
    errors,
 
57
    hooks,
 
58
    lock as _mod_lock,
 
59
    memorytree,
 
60
    osutils,
 
61
    progress,
 
62
    ui,
 
63
    urlutils,
 
64
    registry,
 
65
    workingtree,
 
66
    )
 
67
import bzrlib.branch
19
68
import bzrlib.commands
20
 
 
21
 
MODULES_TO_TEST = []
22
 
MODULES_TO_DOCTEST = []
23
 
 
24
 
 
25
 
class BzrTestBase(InTempDir):
26
 
    """bzr-specific test base class"""
27
 
    def run_bzr(self, *args, **kwargs):
28
 
        retcode = kwargs.get('retcode', 0)
29
 
        self.assertEquals(bzrlib.commands.run_bzr(args), retcode)
 
69
import bzrlib.timestamp
 
70
import bzrlib.export
 
71
import bzrlib.inventory
 
72
import bzrlib.iterablefile
 
73
import bzrlib.lockdir
 
74
try:
 
75
    import bzrlib.lsprof
 
76
except ImportError:
 
77
    # lsprof not available
 
78
    pass
 
79
from bzrlib.merge import merge_inner
 
80
import bzrlib.merge3
 
81
import bzrlib.plugin
 
82
from bzrlib.smart import client, request, server
 
83
import bzrlib.store
 
84
from bzrlib import symbol_versioning
 
85
from bzrlib.symbol_versioning import (
 
86
    DEPRECATED_PARAMETER,
 
87
    deprecated_function,
 
88
    deprecated_method,
 
89
    deprecated_passed,
 
90
    )
 
91
import bzrlib.trace
 
92
from bzrlib.transport import get_transport
 
93
import bzrlib.transport
 
94
from bzrlib.transport.local import LocalURLServer
 
95
from bzrlib.transport.memory import MemoryServer
 
96
from bzrlib.transport.readonly import ReadonlyServer
 
97
from bzrlib.trace import mutter, note
 
98
from bzrlib.tests import TestUtil
 
99
from bzrlib.tests.http_server import HttpServer
 
100
from bzrlib.tests.TestUtil import (
 
101
                          TestSuite,
 
102
                          TestLoader,
 
103
                          )
 
104
from bzrlib.tests.treeshape import build_tree_contents
 
105
from bzrlib.ui.text import TextUIFactory
 
106
import bzrlib.version_info_formats.format_custom
 
107
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
108
 
 
109
# Mark this python module as being part of the implementation
 
110
# of unittest: this gives us better tracebacks where the last
 
111
# shown frame is the test code, not our assertXYZ.
 
112
__unittest = 1
 
113
 
 
114
default_transport = LocalURLServer
 
115
 
 
116
# Subunit result codes, defined here to prevent a hard dependency on subunit.
 
117
SUBUNIT_SEEK_SET = 0
 
118
SUBUNIT_SEEK_CUR = 1
 
119
 
 
120
 
 
121
class ExtendedTestResult(unittest._TextTestResult):
 
122
    """Accepts, reports and accumulates the results of running tests.
 
123
 
 
124
    Compared to the unittest version this class adds support for
 
125
    profiling, benchmarking, stopping as soon as a test fails,  and
 
126
    skipping tests.  There are further-specialized subclasses for
 
127
    different types of display.
 
128
 
 
129
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
130
    addFailure or addError classes.  These in turn may redirect to a more
 
131
    specific case for the special test results supported by our extended
 
132
    tests.
 
133
 
 
134
    Note that just one of these objects is fed the results from many tests.
 
135
    """
 
136
 
 
137
    stop_early = False
 
138
 
 
139
    def __init__(self, stream, descriptions, verbosity,
 
140
                 bench_history=None,
 
141
                 strict=False,
 
142
                 ):
 
143
        """Construct new TestResult.
 
144
 
 
145
        :param bench_history: Optionally, a writable file object to accumulate
 
146
            benchmark results.
 
147
        """
 
148
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
149
        if bench_history is not None:
 
150
            from bzrlib.version import _get_bzr_source_tree
 
151
            src_tree = _get_bzr_source_tree()
 
152
            if src_tree:
 
153
                try:
 
154
                    revision_id = src_tree.get_parent_ids()[0]
 
155
                except IndexError:
 
156
                    # XXX: if this is a brand new tree, do the same as if there
 
157
                    # is no branch.
 
158
                    revision_id = ''
 
159
            else:
 
160
                # XXX: If there's no branch, what should we do?
 
161
                revision_id = ''
 
162
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
163
        self._bench_history = bench_history
 
164
        self.ui = ui.ui_factory
 
165
        self.num_tests = 0
 
166
        self.error_count = 0
 
167
        self.failure_count = 0
 
168
        self.known_failure_count = 0
 
169
        self.skip_count = 0
 
170
        self.not_applicable_count = 0
 
171
        self.unsupported = {}
 
172
        self.count = 0
 
173
        self._overall_start_time = time.time()
 
174
        self._strict = strict
 
175
 
 
176
    def done(self):
 
177
        if self._strict:
 
178
            ok = self.wasStrictlySuccessful()
 
179
        else:
 
180
            ok = self.wasSuccessful()
 
181
        if ok:
 
182
            self.stream.write('tests passed\n')
 
183
        else:
 
184
            self.stream.write('tests failed\n')
 
185
        if TestCase._first_thread_leaker_id:
 
186
            self.stream.write(
 
187
                '%s is leaking threads among %d leaking tests.\n' % (
 
188
                TestCase._first_thread_leaker_id,
 
189
                TestCase._leaking_threads_tests))
 
190
 
 
191
    def _extractBenchmarkTime(self, testCase):
 
192
        """Add a benchmark time for the current test case."""
 
193
        return getattr(testCase, "_benchtime", None)
 
194
 
 
195
    def _elapsedTestTimeString(self):
 
196
        """Return a time string for the overall time the current test has taken."""
 
197
        return self._formatTime(time.time() - self._start_time)
 
198
 
 
199
    def _testTimeString(self, testCase):
 
200
        benchmark_time = self._extractBenchmarkTime(testCase)
 
201
        if benchmark_time is not None:
 
202
            return self._formatTime(benchmark_time) + "*"
 
203
        else:
 
204
            return self._elapsedTestTimeString()
 
205
 
 
206
    def _formatTime(self, seconds):
 
207
        """Format seconds as milliseconds with leading spaces."""
 
208
        # some benchmarks can take thousands of seconds to run, so we need 8
 
209
        # places
 
210
        return "%8dms" % (1000 * seconds)
 
211
 
 
212
    def _shortened_test_description(self, test):
 
213
        what = test.id()
 
214
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
215
        return what
 
216
 
 
217
    def startTest(self, test):
 
218
        unittest.TestResult.startTest(self, test)
 
219
        if self.count == 0:
 
220
            self.startTests()
 
221
        self.report_test_start(test)
 
222
        test.number = self.count
 
223
        self._recordTestStartTime()
 
224
 
 
225
    def startTests(self):
 
226
        self.stream.write(
 
227
            'testing: %s\n' % (osutils.realpath(sys.argv[0]),))
 
228
        self.stream.write(
 
229
            '   %s (%s python%s)\n' % (
 
230
                    bzrlib.__path__[0],
 
231
                    bzrlib.version_string,
 
232
                    bzrlib._format_version_tuple(sys.version_info),
 
233
                    ))
 
234
        self.stream.write('\n')
 
235
 
 
236
    def _recordTestStartTime(self):
 
237
        """Record that a test has started."""
 
238
        self._start_time = time.time()
 
239
 
 
240
    def _cleanupLogFile(self, test):
 
241
        # We can only do this if we have one of our TestCases, not if
 
242
        # we have a doctest.
 
243
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
244
        if setKeepLogfile is not None:
 
245
            setKeepLogfile()
 
246
 
 
247
    def addError(self, test, err):
 
248
        """Tell result that test finished with an error.
 
249
 
 
250
        Called from the TestCase run() method when the test
 
251
        fails with an unexpected error.
 
252
        """
 
253
        self._testConcluded(test)
 
254
        if isinstance(err[1], TestNotApplicable):
 
255
            return self._addNotApplicable(test, err)
 
256
        elif isinstance(err[1], UnavailableFeature):
 
257
            return self.addNotSupported(test, err[1].args[0])
 
258
        else:
 
259
            unittest.TestResult.addError(self, test, err)
 
260
            self.error_count += 1
 
261
            self.report_error(test, err)
 
262
            if self.stop_early:
 
263
                self.stop()
 
264
            self._cleanupLogFile(test)
 
265
 
 
266
    def addFailure(self, test, err):
 
267
        """Tell result that test failed.
 
268
 
 
269
        Called from the TestCase run() method when the test
 
270
        fails because e.g. an assert() method failed.
 
271
        """
 
272
        self._testConcluded(test)
 
273
        if isinstance(err[1], KnownFailure):
 
274
            return self._addKnownFailure(test, err)
 
275
        else:
 
276
            unittest.TestResult.addFailure(self, test, err)
 
277
            self.failure_count += 1
 
278
            self.report_failure(test, err)
 
279
            if self.stop_early:
 
280
                self.stop()
 
281
            self._cleanupLogFile(test)
 
282
 
 
283
    def addSuccess(self, test):
 
284
        """Tell result that test completed successfully.
 
285
 
 
286
        Called from the TestCase run()
 
287
        """
 
288
        self._testConcluded(test)
 
289
        if self._bench_history is not None:
 
290
            benchmark_time = self._extractBenchmarkTime(test)
 
291
            if benchmark_time is not None:
 
292
                self._bench_history.write("%s %s\n" % (
 
293
                    self._formatTime(benchmark_time),
 
294
                    test.id()))
 
295
        self.report_success(test)
 
296
        self._cleanupLogFile(test)
 
297
        unittest.TestResult.addSuccess(self, test)
 
298
        test._log_contents = ''
 
299
 
 
300
    def _testConcluded(self, test):
 
301
        """Common code when a test has finished.
 
302
 
 
303
        Called regardless of whether it succeded, failed, etc.
 
304
        """
 
305
        pass
 
306
 
 
307
    def _addKnownFailure(self, test, err):
 
308
        self.known_failure_count += 1
 
309
        self.report_known_failure(test, err)
 
310
 
 
311
    def addNotSupported(self, test, feature):
 
312
        """The test will not be run because of a missing feature.
 
313
        """
 
314
        # this can be called in two different ways: it may be that the
 
315
        # test started running, and then raised (through addError)
 
316
        # UnavailableFeature.  Alternatively this method can be called
 
317
        # while probing for features before running the tests; in that
 
318
        # case we will see startTest and stopTest, but the test will never
 
319
        # actually run.
 
320
        self.unsupported.setdefault(str(feature), 0)
 
321
        self.unsupported[str(feature)] += 1
 
322
        self.report_unsupported(test, feature)
 
323
 
 
324
    def addSkip(self, test, reason):
 
325
        """A test has not run for 'reason'."""
 
326
        self.skip_count += 1
 
327
        self.report_skip(test, reason)
 
328
 
 
329
    def _addNotApplicable(self, test, skip_excinfo):
 
330
        if isinstance(skip_excinfo[1], TestNotApplicable):
 
331
            self.not_applicable_count += 1
 
332
            self.report_not_applicable(test, skip_excinfo)
 
333
        try:
 
334
            test.tearDown()
 
335
        except KeyboardInterrupt:
 
336
            raise
 
337
        except:
 
338
            self.addError(test, test.exc_info())
 
339
        else:
 
340
            # seems best to treat this as success from point-of-view of unittest
 
341
            # -- it actually does nothing so it barely matters :)
 
342
            unittest.TestResult.addSuccess(self, test)
 
343
            test._log_contents = ''
 
344
 
 
345
    def printErrorList(self, flavour, errors):
 
346
        for test, err in errors:
 
347
            self.stream.writeln(self.separator1)
 
348
            self.stream.write("%s: " % flavour)
 
349
            self.stream.writeln(self.getDescription(test))
 
350
            if getattr(test, '_get_log', None) is not None:
 
351
                log_contents = test._get_log()
 
352
                if log_contents:
 
353
                    self.stream.write('\n')
 
354
                    self.stream.write(
 
355
                            ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
 
356
                    self.stream.write('\n')
 
357
                    self.stream.write(log_contents)
 
358
                    self.stream.write('\n')
 
359
                    self.stream.write(
 
360
                            ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
 
361
                    self.stream.write('\n')
 
362
            self.stream.writeln(self.separator2)
 
363
            self.stream.writeln("%s" % err)
 
364
 
 
365
    def progress(self, offset, whence):
 
366
        """The test is adjusting the count of tests to run."""
 
367
        if whence == SUBUNIT_SEEK_SET:
 
368
            self.num_tests = offset
 
369
        elif whence == SUBUNIT_SEEK_CUR:
 
370
            self.num_tests += offset
 
371
        else:
 
372
            raise errors.BzrError("Unknown whence %r" % whence)
 
373
 
 
374
    def finished(self):
 
375
        pass
 
376
 
 
377
    def report_cleaning_up(self):
 
378
        pass
 
379
 
 
380
    def report_success(self, test):
 
381
        pass
 
382
 
 
383
    def wasStrictlySuccessful(self):
 
384
        if self.unsupported or self.known_failure_count:
 
385
            return False
 
386
        return self.wasSuccessful()
 
387
 
 
388
 
 
389
class TextTestResult(ExtendedTestResult):
 
390
    """Displays progress and results of tests in text form"""
 
391
 
 
392
    def __init__(self, stream, descriptions, verbosity,
 
393
                 bench_history=None,
 
394
                 pb=None,
 
395
                 strict=None,
 
396
                 ):
 
397
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
398
            bench_history, strict)
 
399
        if pb is None:
 
400
            self.pb = self.ui.nested_progress_bar()
 
401
            self._supplied_pb = False
 
402
        else:
 
403
            self.pb = pb
 
404
            self._supplied_pb = True
 
405
        self.pb.show_pct = False
 
406
        self.pb.show_spinner = False
 
407
        self.pb.show_eta = False,
 
408
        self.pb.show_count = False
 
409
        self.pb.show_bar = False
 
410
 
 
411
    def report_starting(self):
 
412
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
 
413
 
 
414
    def _progress_prefix_text(self):
 
415
        # the longer this text, the less space we have to show the test
 
416
        # name...
 
417
        a = '[%d' % self.count              # total that have been run
 
418
        # tests skipped as known not to be relevant are not important enough
 
419
        # to show here
 
420
        ## if self.skip_count:
 
421
        ##     a += ', %d skip' % self.skip_count
 
422
        ## if self.known_failure_count:
 
423
        ##     a += '+%dX' % self.known_failure_count
 
424
        if self.num_tests:
 
425
            a +='/%d' % self.num_tests
 
426
        a += ' in '
 
427
        runtime = time.time() - self._overall_start_time
 
428
        if runtime >= 60:
 
429
            a += '%dm%ds' % (runtime / 60, runtime % 60)
 
430
        else:
 
431
            a += '%ds' % runtime
 
432
        if self.error_count:
 
433
            a += ', %d err' % self.error_count
 
434
        if self.failure_count:
 
435
            a += ', %d fail' % self.failure_count
 
436
        if self.unsupported:
 
437
            a += ', %d missing' % len(self.unsupported)
 
438
        a += ']'
 
439
        return a
 
440
 
 
441
    def report_test_start(self, test):
 
442
        self.count += 1
 
443
        self.pb.update(
 
444
                self._progress_prefix_text()
 
445
                + ' '
 
446
                + self._shortened_test_description(test))
 
447
 
 
448
    def _test_description(self, test):
 
449
        return self._shortened_test_description(test)
 
450
 
 
451
    def report_error(self, test, err):
 
452
        self.pb.note('ERROR: %s\n    %s\n',
 
453
            self._test_description(test),
 
454
            err[1],
 
455
            )
 
456
 
 
457
    def report_failure(self, test, err):
 
458
        self.pb.note('FAIL: %s\n    %s\n',
 
459
            self._test_description(test),
 
460
            err[1],
 
461
            )
 
462
 
 
463
    def report_known_failure(self, test, err):
 
464
        self.pb.note('XFAIL: %s\n%s\n',
 
465
            self._test_description(test), err[1])
 
466
 
 
467
    def report_skip(self, test, reason):
 
468
        pass
 
469
 
 
470
    def report_not_applicable(self, test, skip_excinfo):
 
471
        pass
 
472
 
 
473
    def report_unsupported(self, test, feature):
 
474
        """test cannot be run because feature is missing."""
 
475
 
 
476
    def report_cleaning_up(self):
 
477
        self.pb.update('Cleaning up')
 
478
 
 
479
    def finished(self):
 
480
        if not self._supplied_pb:
 
481
            self.pb.finished()
 
482
 
 
483
 
 
484
class VerboseTestResult(ExtendedTestResult):
 
485
    """Produce long output, with one line per test run plus times"""
 
486
 
 
487
    def _ellipsize_to_right(self, a_string, final_width):
 
488
        """Truncate and pad a string, keeping the right hand side"""
 
489
        if len(a_string) > final_width:
 
490
            result = '...' + a_string[3-final_width:]
 
491
        else:
 
492
            result = a_string
 
493
        return result.ljust(final_width)
 
494
 
 
495
    def report_starting(self):
 
496
        self.stream.write('running %d tests...\n' % self.num_tests)
 
497
 
 
498
    def report_test_start(self, test):
 
499
        self.count += 1
 
500
        name = self._shortened_test_description(test)
 
501
        # width needs space for 6 char status, plus 1 for slash, plus an
 
502
        # 11-char time string, plus a trailing blank
 
503
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
504
        self.stream.write(self._ellipsize_to_right(name,
 
505
                          osutils.terminal_width()-18))
 
506
        self.stream.flush()
 
507
 
 
508
    def _error_summary(self, err):
 
509
        indent = ' ' * 4
 
510
        return '%s%s' % (indent, err[1])
 
511
 
 
512
    def report_error(self, test, err):
 
513
        self.stream.writeln('ERROR %s\n%s'
 
514
                % (self._testTimeString(test),
 
515
                   self._error_summary(err)))
 
516
 
 
517
    def report_failure(self, test, err):
 
518
        self.stream.writeln(' FAIL %s\n%s'
 
519
                % (self._testTimeString(test),
 
520
                   self._error_summary(err)))
 
521
 
 
522
    def report_known_failure(self, test, err):
 
523
        self.stream.writeln('XFAIL %s\n%s'
 
524
                % (self._testTimeString(test),
 
525
                   self._error_summary(err)))
 
526
 
 
527
    def report_success(self, test):
 
528
        self.stream.writeln('   OK %s' % self._testTimeString(test))
 
529
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
530
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
531
            stats.pprint(file=self.stream)
 
532
        # flush the stream so that we get smooth output. This verbose mode is
 
533
        # used to show the output in PQM.
 
534
        self.stream.flush()
 
535
 
 
536
    def report_skip(self, test, reason):
 
537
        self.stream.writeln(' SKIP %s\n%s'
 
538
                % (self._testTimeString(test), reason))
 
539
 
 
540
    def report_not_applicable(self, test, skip_excinfo):
 
541
        self.stream.writeln('  N/A %s\n%s'
 
542
                % (self._testTimeString(test),
 
543
                   self._error_summary(skip_excinfo)))
 
544
 
 
545
    def report_unsupported(self, test, feature):
 
546
        """test cannot be run because feature is missing."""
 
547
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
548
                %(self._testTimeString(test), feature))
 
549
 
 
550
 
 
551
class TextTestRunner(object):
 
552
    stop_on_failure = False
 
553
 
 
554
    def __init__(self,
 
555
                 stream=sys.stderr,
 
556
                 descriptions=0,
 
557
                 verbosity=1,
 
558
                 bench_history=None,
 
559
                 list_only=False,
 
560
                 strict=False,
 
561
                 ):
 
562
        self.stream = unittest._WritelnDecorator(stream)
 
563
        self.descriptions = descriptions
 
564
        self.verbosity = verbosity
 
565
        self._bench_history = bench_history
 
566
        self.list_only = list_only
 
567
        self._strict = strict
 
568
 
 
569
    def run(self, test):
 
570
        "Run the given test case or test suite."
 
571
        startTime = time.time()
 
572
        if self.verbosity == 1:
 
573
            result_class = TextTestResult
 
574
        elif self.verbosity >= 2:
 
575
            result_class = VerboseTestResult
 
576
        result = result_class(self.stream,
 
577
                              self.descriptions,
 
578
                              self.verbosity,
 
579
                              bench_history=self._bench_history,
 
580
                              strict=self._strict,
 
581
                              )
 
582
        result.stop_early = self.stop_on_failure
 
583
        result.report_starting()
 
584
        if self.list_only:
 
585
            if self.verbosity >= 2:
 
586
                self.stream.writeln("Listing tests only ...\n")
 
587
            run = 0
 
588
            for t in iter_suite_tests(test):
 
589
                self.stream.writeln("%s" % (t.id()))
 
590
                run += 1
 
591
            return None
 
592
        else:
 
593
            try:
 
594
                import testtools
 
595
            except ImportError:
 
596
                test.run(result)
 
597
            else:
 
598
                if isinstance(test, testtools.ConcurrentTestSuite):
 
599
                    # We need to catch bzr specific behaviors
 
600
                    test.run(BZRTransformingResult(result))
 
601
                else:
 
602
                    test.run(result)
 
603
            run = result.testsRun
 
604
            actionTaken = "Ran"
 
605
        stopTime = time.time()
 
606
        timeTaken = stopTime - startTime
 
607
        result.printErrors()
 
608
        self.stream.writeln(result.separator2)
 
609
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
610
                            run, run != 1 and "s" or "", timeTaken))
 
611
        self.stream.writeln()
 
612
        if not result.wasSuccessful():
 
613
            self.stream.write("FAILED (")
 
614
            failed, errored = map(len, (result.failures, result.errors))
 
615
            if failed:
 
616
                self.stream.write("failures=%d" % failed)
 
617
            if errored:
 
618
                if failed: self.stream.write(", ")
 
619
                self.stream.write("errors=%d" % errored)
 
620
            if result.known_failure_count:
 
621
                if failed or errored: self.stream.write(", ")
 
622
                self.stream.write("known_failure_count=%d" %
 
623
                    result.known_failure_count)
 
624
            self.stream.writeln(")")
 
625
        else:
 
626
            if result.known_failure_count:
 
627
                self.stream.writeln("OK (known_failures=%d)" %
 
628
                    result.known_failure_count)
 
629
            else:
 
630
                self.stream.writeln("OK")
 
631
        if result.skip_count > 0:
 
632
            skipped = result.skip_count
 
633
            self.stream.writeln('%d test%s skipped' %
 
634
                                (skipped, skipped != 1 and "s" or ""))
 
635
        if result.unsupported:
 
636
            for feature, count in sorted(result.unsupported.items()):
 
637
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
638
                    (feature, count))
 
639
        result.finished()
 
640
        return result
 
641
 
 
642
 
 
643
def iter_suite_tests(suite):
 
644
    """Return all tests in a suite, recursing through nested suites"""
 
645
    if isinstance(suite, unittest.TestCase):
 
646
        yield suite
 
647
    elif isinstance(suite, unittest.TestSuite):
 
648
        for item in suite:
 
649
            for r in iter_suite_tests(item):
 
650
                yield r
 
651
    else:
 
652
        raise Exception('unknown type %r for object %r'
 
653
                        % (type(suite), suite))
 
654
 
 
655
 
 
656
class TestSkipped(Exception):
 
657
    """Indicates that a test was intentionally skipped, rather than failing."""
 
658
 
 
659
 
 
660
class TestNotApplicable(TestSkipped):
 
661
    """A test is not applicable to the situation where it was run.
 
662
 
 
663
    This is only normally raised by parameterized tests, if they find that
 
664
    the instance they're constructed upon does not support one aspect
 
665
    of its interface.
 
666
    """
 
667
 
 
668
 
 
669
class KnownFailure(AssertionError):
 
670
    """Indicates that a test failed in a precisely expected manner.
 
671
 
 
672
    Such failures dont block the whole test suite from passing because they are
 
673
    indicators of partially completed code or of future work. We have an
 
674
    explicit error for them so that we can ensure that they are always visible:
 
675
    KnownFailures are always shown in the output of bzr selftest.
 
676
    """
 
677
 
 
678
 
 
679
class UnavailableFeature(Exception):
 
680
    """A feature required for this test was not available.
 
681
 
 
682
    The feature should be used to construct the exception.
 
683
    """
 
684
 
 
685
 
 
686
class CommandFailed(Exception):
 
687
    pass
 
688
 
 
689
 
 
690
class StringIOWrapper(object):
 
691
    """A wrapper around cStringIO which just adds an encoding attribute.
 
692
 
 
693
    Internally we can check sys.stdout to see what the output encoding
 
694
    should be. However, cStringIO has no encoding attribute that we can
 
695
    set. So we wrap it instead.
 
696
    """
 
697
    encoding='ascii'
 
698
    _cstring = None
 
699
 
 
700
    def __init__(self, s=None):
 
701
        if s is not None:
 
702
            self.__dict__['_cstring'] = StringIO(s)
 
703
        else:
 
704
            self.__dict__['_cstring'] = StringIO()
 
705
 
 
706
    def __getattr__(self, name, getattr=getattr):
 
707
        return getattr(self.__dict__['_cstring'], name)
 
708
 
 
709
    def __setattr__(self, name, val):
 
710
        if name == 'encoding':
 
711
            self.__dict__['encoding'] = val
 
712
        else:
 
713
            return setattr(self._cstring, name, val)
 
714
 
 
715
 
 
716
class TestUIFactory(TextUIFactory):
 
717
    """A UI Factory for testing.
 
718
 
 
719
    Hide the progress bar but emit note()s.
 
720
    Redirect stdin.
 
721
    Allows get_password to be tested without real tty attached.
 
722
    """
 
723
 
 
724
    def __init__(self, stdout=None, stderr=None, stdin=None):
 
725
        if stdin is not None:
 
726
            # We use a StringIOWrapper to be able to test various
 
727
            # encodings, but the user is still responsible to
 
728
            # encode the string and to set the encoding attribute
 
729
            # of StringIOWrapper.
 
730
            stdin = StringIOWrapper(stdin)
 
731
        super(TestUIFactory, self).__init__(stdin, stdout, stderr)
 
732
 
 
733
    def clear(self):
 
734
        """See progress.ProgressBar.clear()."""
 
735
 
 
736
    def clear_term(self):
 
737
        """See progress.ProgressBar.clear_term()."""
 
738
 
 
739
    def finished(self):
 
740
        """See progress.ProgressBar.finished()."""
 
741
 
 
742
    def note(self, fmt_string, *args):
 
743
        """See progress.ProgressBar.note()."""
 
744
        if args:
 
745
            fmt_string = fmt_string % args
 
746
        self.stdout.write(fmt_string + "\n")
 
747
 
 
748
    def progress_bar(self):
 
749
        return self
 
750
 
 
751
    def nested_progress_bar(self):
 
752
        return self
 
753
 
 
754
    def update(self, message, count=None, total=None):
 
755
        """See progress.ProgressBar.update()."""
 
756
 
 
757
    def get_non_echoed_password(self):
 
758
        """Get password from stdin without trying to handle the echo mode"""
 
759
        password = self.stdin.readline()
 
760
        if not password:
 
761
            raise EOFError
 
762
        if password[-1] == '\n':
 
763
            password = password[:-1]
 
764
        return password
 
765
 
 
766
 
 
767
class TestCase(unittest.TestCase):
 
768
    """Base class for bzr unit tests.
 
769
 
 
770
    Tests that need access to disk resources should subclass
 
771
    TestCaseInTempDir not TestCase.
 
772
 
 
773
    Error and debug log messages are redirected from their usual
 
774
    location into a temporary file, the contents of which can be
 
775
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
776
    so that it can also capture file IO.  When the test completes this file
 
777
    is read into memory and removed from disk.
 
778
 
 
779
    There are also convenience functions to invoke bzr's command-line
 
780
    routine, and to build and check bzr trees.
 
781
 
 
782
    In addition to the usual method of overriding tearDown(), this class also
 
783
    allows subclasses to register functions into the _cleanups list, which is
 
784
    run in order as the object is torn down.  It's less likely this will be
 
785
    accidentally overlooked.
 
786
    """
 
787
 
 
788
    _active_threads = None
 
789
    _leaking_threads_tests = 0
 
790
    _first_thread_leaker_id = None
 
791
    _log_file_name = None
 
792
    _log_contents = ''
 
793
    _keep_log_file = False
 
794
    # record lsprof data when performing benchmark calls.
 
795
    _gather_lsprof_in_benchmarks = False
 
796
    attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
 
797
                     '_log_contents', '_log_file_name', '_benchtime',
 
798
                     '_TestCase__testMethodName', '_TestCase__testMethodDoc',)
 
799
 
 
800
    def __init__(self, methodName='testMethod'):
 
801
        super(TestCase, self).__init__(methodName)
 
802
        self._cleanups = []
 
803
        self._bzr_test_setUp_run = False
 
804
        self._bzr_test_tearDown_run = False
 
805
 
 
806
    def setUp(self):
 
807
        unittest.TestCase.setUp(self)
 
808
        self._bzr_test_setUp_run = True
 
809
        self._cleanEnvironment()
 
810
        self._silenceUI()
 
811
        self._startLogFile()
 
812
        self._benchcalls = []
 
813
        self._benchtime = None
 
814
        self._clear_hooks()
 
815
        # Track locks - needs to be called before _clear_debug_flags.
 
816
        self._track_locks()
 
817
        self._clear_debug_flags()
 
818
        TestCase._active_threads = threading.activeCount()
 
819
        self.addCleanup(self._check_leaked_threads)
 
820
 
 
821
    def debug(self):
 
822
        # debug a frame up.
 
823
        import pdb
 
824
        pdb.Pdb().set_trace(sys._getframe().f_back)
 
825
 
 
826
    def _check_leaked_threads(self):
 
827
        active = threading.activeCount()
 
828
        leaked_threads = active - TestCase._active_threads
 
829
        TestCase._active_threads = active
 
830
        if leaked_threads:
 
831
            TestCase._leaking_threads_tests += 1
 
832
            if TestCase._first_thread_leaker_id is None:
 
833
                TestCase._first_thread_leaker_id = self.id()
 
834
 
 
835
    def _clear_debug_flags(self):
 
836
        """Prevent externally set debug flags affecting tests.
 
837
 
 
838
        Tests that want to use debug flags can just set them in the
 
839
        debug_flags set during setup/teardown.
 
840
        """
 
841
        self._preserved_debug_flags = set(debug.debug_flags)
 
842
        if 'allow_debug' not in selftest_debug_flags:
 
843
            debug.debug_flags.clear()
 
844
        self.addCleanup(self._restore_debug_flags)
 
845
 
 
846
    def _clear_hooks(self):
 
847
        # prevent hooks affecting tests
 
848
        self._preserved_hooks = {}
 
849
        for key, factory in hooks.known_hooks.items():
 
850
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
851
            current_hooks = hooks.known_hooks_key_to_object(key)
 
852
            self._preserved_hooks[parent] = (name, current_hooks)
 
853
        self.addCleanup(self._restoreHooks)
 
854
        for key, factory in hooks.known_hooks.items():
 
855
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
856
            setattr(parent, name, factory())
 
857
        # this hook should always be installed
 
858
        request._install_hook()
 
859
 
 
860
    def _silenceUI(self):
 
861
        """Turn off UI for duration of test"""
 
862
        # by default the UI is off; tests can turn it on if they want it.
 
863
        saved = ui.ui_factory
 
864
        def _restore():
 
865
            ui.ui_factory = saved
 
866
        ui.ui_factory = ui.SilentUIFactory()
 
867
        self.addCleanup(_restore)
 
868
 
 
869
    def _check_locks(self):
 
870
        """Check that all lock take/release actions have been paired."""
 
871
        # once we have fixed all the current lock problems, we can change the
 
872
        # following code to always check for mismatched locks, but only do
 
873
        # traceback showing with -Dlock (self._lock_check_thorough is True).
 
874
        # For now, because the test suite will fail, we only assert that lock
 
875
        # matching has occured with -Dlock.
 
876
        # unhook:
 
877
        acquired_locks = [lock for action, lock in self._lock_actions
 
878
                          if action == 'acquired']
 
879
        released_locks = [lock for action, lock in self._lock_actions
 
880
                          if action == 'released']
 
881
        broken_locks = [lock for action, lock in self._lock_actions
 
882
                        if action == 'broken']
 
883
        # trivially, given the tests for lock acquistion and release, if we
 
884
        # have as many in each list, it should be ok. Some lock tests also
 
885
        # break some locks on purpose and should be taken into account by
 
886
        # considering that breaking a lock is just a dirty way of releasing it.
 
887
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
 
888
            message = ('Different number of acquired and '
 
889
                       'released or broken locks. (%s, %s + %s)' %
 
890
                       (acquired_locks, released_locks, broken_locks))
 
891
            if not self._lock_check_thorough:
 
892
                # Rather than fail, just warn
 
893
                print "Broken test %s: %s" % (self, message)
 
894
                return
 
895
            self.fail(message)
 
896
 
 
897
    def _track_locks(self):
 
898
        """Track lock activity during tests."""
 
899
        self._lock_actions = []
 
900
        self._lock_check_thorough = 'lock' not in debug.debug_flags
 
901
        self.addCleanup(self._check_locks)
 
902
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
 
903
                                                self._lock_acquired, None)
 
904
        _mod_lock.Lock.hooks.install_named_hook('lock_released',
 
905
                                                self._lock_released, None)
 
906
        _mod_lock.Lock.hooks.install_named_hook('lock_broken',
 
907
                                                self._lock_broken, None)
 
908
 
 
909
    def _lock_acquired(self, result):
 
910
        self._lock_actions.append(('acquired', result))
 
911
 
 
912
    def _lock_released(self, result):
 
913
        self._lock_actions.append(('released', result))
 
914
 
 
915
    def _lock_broken(self, result):
 
916
        self._lock_actions.append(('broken', result))
 
917
 
 
918
    def _ndiff_strings(self, a, b):
 
919
        """Return ndiff between two strings containing lines.
 
920
 
 
921
        A trailing newline is added if missing to make the strings
 
922
        print properly."""
 
923
        if b and b[-1] != '\n':
 
924
            b += '\n'
 
925
        if a and a[-1] != '\n':
 
926
            a += '\n'
 
927
        difflines = difflib.ndiff(a.splitlines(True),
 
928
                                  b.splitlines(True),
 
929
                                  linejunk=lambda x: False,
 
930
                                  charjunk=lambda x: False)
 
931
        return ''.join(difflines)
 
932
 
 
933
    def assertEqual(self, a, b, message=''):
 
934
        try:
 
935
            if a == b:
 
936
                return
 
937
        except UnicodeError, e:
 
938
            # If we can't compare without getting a UnicodeError, then
 
939
            # obviously they are different
 
940
            mutter('UnicodeError: %s', e)
 
941
        if message:
 
942
            message += '\n'
 
943
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
944
            % (message,
 
945
               pformat(a), pformat(b)))
 
946
 
 
947
    assertEquals = assertEqual
 
948
 
 
949
    def assertEqualDiff(self, a, b, message=None):
 
950
        """Assert two texts are equal, if not raise an exception.
 
951
 
 
952
        This is intended for use with multi-line strings where it can
 
953
        be hard to find the differences by eye.
 
954
        """
 
955
        # TODO: perhaps override assertEquals to call this for strings?
 
956
        if a == b:
 
957
            return
 
958
        if message is None:
 
959
            message = "texts not equal:\n"
 
960
        if a == b + '\n':
 
961
            message = 'first string is missing a final newline.\n'
 
962
        if a + '\n' == b:
 
963
            message = 'second string is missing a final newline.\n'
 
964
        raise AssertionError(message +
 
965
                             self._ndiff_strings(a, b))
 
966
 
 
967
    def assertEqualMode(self, mode, mode_test):
 
968
        self.assertEqual(mode, mode_test,
 
969
                         'mode mismatch %o != %o' % (mode, mode_test))
 
970
 
 
971
    def assertEqualStat(self, expected, actual):
 
972
        """assert that expected and actual are the same stat result.
 
973
 
 
974
        :param expected: A stat result.
 
975
        :param actual: A stat result.
 
976
        :raises AssertionError: If the expected and actual stat values differ
 
977
            other than by atime.
 
978
        """
 
979
        self.assertEqual(expected.st_size, actual.st_size)
 
980
        self.assertEqual(expected.st_mtime, actual.st_mtime)
 
981
        self.assertEqual(expected.st_ctime, actual.st_ctime)
 
982
        self.assertEqual(expected.st_dev, actual.st_dev)
 
983
        self.assertEqual(expected.st_ino, actual.st_ino)
 
984
        self.assertEqual(expected.st_mode, actual.st_mode)
 
985
 
 
986
    def assertLength(self, length, obj_with_len):
 
987
        """Assert that obj_with_len is of length length."""
 
988
        if len(obj_with_len) != length:
 
989
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
 
990
                length, len(obj_with_len), obj_with_len))
 
991
 
 
992
    def assertPositive(self, val):
 
993
        """Assert that val is greater than 0."""
 
994
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
995
 
 
996
    def assertNegative(self, val):
 
997
        """Assert that val is less than 0."""
 
998
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
999
 
 
1000
    def assertStartsWith(self, s, prefix):
 
1001
        if not s.startswith(prefix):
 
1002
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
1003
 
 
1004
    def assertEndsWith(self, s, suffix):
 
1005
        """Asserts that s ends with suffix."""
 
1006
        if not s.endswith(suffix):
 
1007
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
1008
 
 
1009
    def assertContainsRe(self, haystack, needle_re, flags=0):
 
1010
        """Assert that a contains something matching a regular expression."""
 
1011
        if not re.search(needle_re, haystack, flags):
 
1012
            if '\n' in haystack or len(haystack) > 60:
 
1013
                # a long string, format it in a more readable way
 
1014
                raise AssertionError(
 
1015
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
1016
                        % (needle_re, haystack))
 
1017
            else:
 
1018
                raise AssertionError('pattern "%s" not found in "%s"'
 
1019
                        % (needle_re, haystack))
 
1020
 
 
1021
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
 
1022
        """Assert that a does not match a regular expression"""
 
1023
        if re.search(needle_re, haystack, flags):
 
1024
            raise AssertionError('pattern "%s" found in "%s"'
 
1025
                    % (needle_re, haystack))
 
1026
 
 
1027
    def assertSubset(self, sublist, superlist):
 
1028
        """Assert that every entry in sublist is present in superlist."""
 
1029
        missing = set(sublist) - set(superlist)
 
1030
        if len(missing) > 0:
 
1031
            raise AssertionError("value(s) %r not present in container %r" %
 
1032
                                 (missing, superlist))
 
1033
 
 
1034
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
1035
        """Fail unless excClass is raised when the iterator from func is used.
 
1036
 
 
1037
        Many functions can return generators this makes sure
 
1038
        to wrap them in a list() call to make sure the whole generator
 
1039
        is run, and that the proper exception is raised.
 
1040
        """
 
1041
        try:
 
1042
            list(func(*args, **kwargs))
 
1043
        except excClass, e:
 
1044
            return e
 
1045
        else:
 
1046
            if getattr(excClass,'__name__', None) is not None:
 
1047
                excName = excClass.__name__
 
1048
            else:
 
1049
                excName = str(excClass)
 
1050
            raise self.failureException, "%s not raised" % excName
 
1051
 
 
1052
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
1053
        """Assert that a callable raises a particular exception.
 
1054
 
 
1055
        :param excClass: As for the except statement, this may be either an
 
1056
            exception class, or a tuple of classes.
 
1057
        :param callableObj: A callable, will be passed ``*args`` and
 
1058
            ``**kwargs``.
 
1059
 
 
1060
        Returns the exception so that you can examine it.
 
1061
        """
 
1062
        try:
 
1063
            callableObj(*args, **kwargs)
 
1064
        except excClass, e:
 
1065
            return e
 
1066
        else:
 
1067
            if getattr(excClass,'__name__', None) is not None:
 
1068
                excName = excClass.__name__
 
1069
            else:
 
1070
                # probably a tuple
 
1071
                excName = str(excClass)
 
1072
            raise self.failureException, "%s not raised" % excName
 
1073
 
 
1074
    def assertIs(self, left, right, message=None):
 
1075
        if not (left is right):
 
1076
            if message is not None:
 
1077
                raise AssertionError(message)
 
1078
            else:
 
1079
                raise AssertionError("%r is not %r." % (left, right))
 
1080
 
 
1081
    def assertIsNot(self, left, right, message=None):
 
1082
        if (left is right):
 
1083
            if message is not None:
 
1084
                raise AssertionError(message)
 
1085
            else:
 
1086
                raise AssertionError("%r is %r." % (left, right))
 
1087
 
 
1088
    def assertTransportMode(self, transport, path, mode):
 
1089
        """Fail if a path does not have mode "mode".
 
1090
 
 
1091
        If modes are not supported on this transport, the assertion is ignored.
 
1092
        """
 
1093
        if not transport._can_roundtrip_unix_modebits():
 
1094
            return
 
1095
        path_stat = transport.stat(path)
 
1096
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
1097
        self.assertEqual(mode, actual_mode,
 
1098
                         'mode of %r incorrect (%s != %s)'
 
1099
                         % (path, oct(mode), oct(actual_mode)))
 
1100
 
 
1101
    def assertIsSameRealPath(self, path1, path2):
 
1102
        """Fail if path1 and path2 points to different files"""
 
1103
        self.assertEqual(osutils.realpath(path1),
 
1104
                         osutils.realpath(path2),
 
1105
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
1106
 
 
1107
    def assertIsInstance(self, obj, kls, msg=None):
 
1108
        """Fail if obj is not an instance of kls
30
1109
        
31
 
 
32
 
def selftest(verbose=False):
33
 
    from unittest import TestLoader, TestSuite
34
 
    import bzrlib, bzrlib.store, bzrlib.inventory, bzrlib.branch
35
 
    import bzrlib.osutils, bzrlib.commands, bzrlib.merge3, bzrlib.plugin
36
 
    from doctest import DocTestSuite
37
 
    import os
38
 
    import shutil
39
 
    import time
40
 
    import sys
41
 
    import unittest
42
 
 
43
 
    global MODULES_TO_TEST, MODULES_TO_DOCTEST
44
 
 
45
 
    testmod_names = \
46
 
                  ['bzrlib.selftest.whitebox',
47
 
                   'bzrlib.selftest.versioning',
48
 
                   'bzrlib.selftest.testinv',
49
 
                   'bzrlib.selftest.testmerge3',
50
 
                   'bzrlib.selftest.testhashcache',
51
 
                   'bzrlib.selftest.teststatus',
52
 
                   'bzrlib.selftest.testlog',
53
 
                   'bzrlib.selftest.blackbox',
54
 
                   'bzrlib.selftest.testrevisionnamespaces',
55
 
                   'bzrlib.selftest.testbranch',
56
 
                   'bzrlib.selftest.testrevision',
57
 
                   'bzrlib.merge_core',
58
 
                   'bzrlib.selftest.testdiff',
 
1110
        :param msg: Supplementary message to show if the assertion fails.
 
1111
        """
 
1112
        if not isinstance(obj, kls):
 
1113
            m = "%r is an instance of %s rather than %s" % (
 
1114
                obj, obj.__class__, kls)
 
1115
            if msg:
 
1116
                m += ": " + msg
 
1117
            self.fail(m)
 
1118
 
 
1119
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
1120
        """Invoke a test, expecting it to fail for the given reason.
 
1121
 
 
1122
        This is for assertions that ought to succeed, but currently fail.
 
1123
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
1124
        about the failure you're expecting.  If a new bug is introduced,
 
1125
        AssertionError should be raised, not KnownFailure.
 
1126
 
 
1127
        Frequently, expectFailure should be followed by an opposite assertion.
 
1128
        See example below.
 
1129
 
 
1130
        Intended to be used with a callable that raises AssertionError as the
 
1131
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
1132
 
 
1133
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
1134
        test succeeds.
 
1135
 
 
1136
        example usage::
 
1137
 
 
1138
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
1139
                             dynamic_val)
 
1140
          self.assertEqual(42, dynamic_val)
 
1141
 
 
1142
          This means that a dynamic_val of 54 will cause the test to raise
 
1143
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
1144
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
1145
          than 54 or 42 will cause an AssertionError.
 
1146
        """
 
1147
        try:
 
1148
            assertion(*args, **kwargs)
 
1149
        except AssertionError:
 
1150
            raise KnownFailure(reason)
 
1151
        else:
 
1152
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
1153
 
 
1154
    def assertFileEqual(self, content, path):
 
1155
        """Fail if path does not contain 'content'."""
 
1156
        self.failUnlessExists(path)
 
1157
        f = file(path, 'rb')
 
1158
        try:
 
1159
            s = f.read()
 
1160
        finally:
 
1161
            f.close()
 
1162
        self.assertEqualDiff(content, s)
 
1163
 
 
1164
    def failUnlessExists(self, path):
 
1165
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1166
        if not isinstance(path, basestring):
 
1167
            for p in path:
 
1168
                self.failUnlessExists(p)
 
1169
        else:
 
1170
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
1171
 
 
1172
    def failIfExists(self, path):
 
1173
        """Fail if path or paths, which may be abs or relative, exist."""
 
1174
        if not isinstance(path, basestring):
 
1175
            for p in path:
 
1176
                self.failIfExists(p)
 
1177
        else:
 
1178
            self.failIf(osutils.lexists(path),path+" exists")
 
1179
 
 
1180
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1181
        """A helper for callDeprecated and applyDeprecated.
 
1182
 
 
1183
        :param a_callable: A callable to call.
 
1184
        :param args: The positional arguments for the callable
 
1185
        :param kwargs: The keyword arguments for the callable
 
1186
        :return: A tuple (warnings, result). result is the result of calling
 
1187
            a_callable(``*args``, ``**kwargs``).
 
1188
        """
 
1189
        local_warnings = []
 
1190
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1191
            # we've hooked into a deprecation specific callpath,
 
1192
            # only deprecations should getting sent via it.
 
1193
            self.assertEqual(cls, DeprecationWarning)
 
1194
            local_warnings.append(msg)
 
1195
        original_warning_method = symbol_versioning.warn
 
1196
        symbol_versioning.set_warning_method(capture_warnings)
 
1197
        try:
 
1198
            result = a_callable(*args, **kwargs)
 
1199
        finally:
 
1200
            symbol_versioning.set_warning_method(original_warning_method)
 
1201
        return (local_warnings, result)
 
1202
 
 
1203
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1204
        """Call a deprecated callable without warning the user.
 
1205
 
 
1206
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1207
        not other callers that go direct to the warning module.
 
1208
 
 
1209
        To test that a deprecated method raises an error, do something like
 
1210
        this::
 
1211
 
 
1212
            self.assertRaises(errors.ReservedId,
 
1213
                self.applyDeprecated,
 
1214
                deprecated_in((1, 5, 0)),
 
1215
                br.append_revision,
 
1216
                'current:')
 
1217
 
 
1218
        :param deprecation_format: The deprecation format that the callable
 
1219
            should have been deprecated with. This is the same type as the
 
1220
            parameter to deprecated_method/deprecated_function. If the
 
1221
            callable is not deprecated with this format, an assertion error
 
1222
            will be raised.
 
1223
        :param a_callable: A callable to call. This may be a bound method or
 
1224
            a regular function. It will be called with ``*args`` and
 
1225
            ``**kwargs``.
 
1226
        :param args: The positional arguments for the callable
 
1227
        :param kwargs: The keyword arguments for the callable
 
1228
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1229
        """
 
1230
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1231
            *args, **kwargs)
 
1232
        expected_first_warning = symbol_versioning.deprecation_string(
 
1233
            a_callable, deprecation_format)
 
1234
        if len(call_warnings) == 0:
 
1235
            self.fail("No deprecation warning generated by call to %s" %
 
1236
                a_callable)
 
1237
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1238
        return result
 
1239
 
 
1240
    def callCatchWarnings(self, fn, *args, **kw):
 
1241
        """Call a callable that raises python warnings.
 
1242
 
 
1243
        The caller's responsible for examining the returned warnings.
 
1244
 
 
1245
        If the callable raises an exception, the exception is not
 
1246
        caught and propagates up to the caller.  In that case, the list
 
1247
        of warnings is not available.
 
1248
 
 
1249
        :returns: ([warning_object, ...], fn_result)
 
1250
        """
 
1251
        # XXX: This is not perfect, because it completely overrides the
 
1252
        # warnings filters, and some code may depend on suppressing particular
 
1253
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1254
        # though.  -- Andrew, 20071062
 
1255
        wlist = []
 
1256
        def _catcher(message, category, filename, lineno, file=None, line=None):
 
1257
            # despite the name, 'message' is normally(?) a Warning subclass
 
1258
            # instance
 
1259
            wlist.append(message)
 
1260
        saved_showwarning = warnings.showwarning
 
1261
        saved_filters = warnings.filters
 
1262
        try:
 
1263
            warnings.showwarning = _catcher
 
1264
            warnings.filters = []
 
1265
            result = fn(*args, **kw)
 
1266
        finally:
 
1267
            warnings.showwarning = saved_showwarning
 
1268
            warnings.filters = saved_filters
 
1269
        return wlist, result
 
1270
 
 
1271
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1272
        """Assert that a callable is deprecated in a particular way.
 
1273
 
 
1274
        This is a very precise test for unusual requirements. The
 
1275
        applyDeprecated helper function is probably more suited for most tests
 
1276
        as it allows you to simply specify the deprecation format being used
 
1277
        and will ensure that that is issued for the function being called.
 
1278
 
 
1279
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1280
        not other callers that go direct to the warning module.  To catch
 
1281
        general warnings, use callCatchWarnings.
 
1282
 
 
1283
        :param expected: a list of the deprecation warnings expected, in order
 
1284
        :param callable: The callable to call
 
1285
        :param args: The positional arguments for the callable
 
1286
        :param kwargs: The keyword arguments for the callable
 
1287
        """
 
1288
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1289
            *args, **kwargs)
 
1290
        self.assertEqual(expected, call_warnings)
 
1291
        return result
 
1292
 
 
1293
    def _startLogFile(self):
 
1294
        """Send bzr and test log messages to a temporary file.
 
1295
 
 
1296
        The file is removed as the test is torn down.
 
1297
        """
 
1298
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1299
        self._log_file = os.fdopen(fileno, 'w+')
 
1300
        self._log_memento = bzrlib.trace.push_log_file(self._log_file)
 
1301
        self._log_file_name = name
 
1302
        self.addCleanup(self._finishLogFile)
 
1303
 
 
1304
    def _finishLogFile(self):
 
1305
        """Finished with the log file.
 
1306
 
 
1307
        Close the file and delete it, unless setKeepLogfile was called.
 
1308
        """
 
1309
        if self._log_file is None:
 
1310
            return
 
1311
        bzrlib.trace.pop_log_file(self._log_memento)
 
1312
        self._log_file.close()
 
1313
        self._log_file = None
 
1314
        if not self._keep_log_file:
 
1315
            os.remove(self._log_file_name)
 
1316
            self._log_file_name = None
 
1317
 
 
1318
    def setKeepLogfile(self):
 
1319
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1320
        self._keep_log_file = True
 
1321
 
 
1322
    def addCleanup(self, callable, *args, **kwargs):
 
1323
        """Arrange to run a callable when this case is torn down.
 
1324
 
 
1325
        Callables are run in the reverse of the order they are registered,
 
1326
        ie last-in first-out.
 
1327
        """
 
1328
        self._cleanups.append((callable, args, kwargs))
 
1329
 
 
1330
    def _cleanEnvironment(self):
 
1331
        new_env = {
 
1332
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1333
            'HOME': os.getcwd(),
 
1334
            # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
 
1335
            # tests do check our impls match APPDATA
 
1336
            'BZR_EDITOR': None, # test_msgeditor manipulates this variable
 
1337
            'VISUAL': None,
 
1338
            'EDITOR': None,
 
1339
            'BZR_EMAIL': None,
 
1340
            'BZREMAIL': None, # may still be present in the environment
 
1341
            'EMAIL': None,
 
1342
            'BZR_PROGRESS_BAR': None,
 
1343
            'BZR_LOG': None,
 
1344
            'BZR_PLUGIN_PATH': None,
 
1345
            # Make sure that any text ui tests are consistent regardless of
 
1346
            # the environment the test case is run in; you may want tests that
 
1347
            # test other combinations.  'dumb' is a reasonable guess for tests
 
1348
            # going to a pipe or a StringIO.
 
1349
            'TERM': 'dumb',
 
1350
            'LINES': '25',
 
1351
            'COLUMNS': '80',
 
1352
            # SSH Agent
 
1353
            'SSH_AUTH_SOCK': None,
 
1354
            # Proxies
 
1355
            'http_proxy': None,
 
1356
            'HTTP_PROXY': None,
 
1357
            'https_proxy': None,
 
1358
            'HTTPS_PROXY': None,
 
1359
            'no_proxy': None,
 
1360
            'NO_PROXY': None,
 
1361
            'all_proxy': None,
 
1362
            'ALL_PROXY': None,
 
1363
            # Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
 
1364
            # least. If you do (care), please update this comment
 
1365
            # -- vila 20080401
 
1366
            'ftp_proxy': None,
 
1367
            'FTP_PROXY': None,
 
1368
            'BZR_REMOTE_PATH': None,
 
1369
        }
 
1370
        self.__old_env = {}
 
1371
        self.addCleanup(self._restoreEnvironment)
 
1372
        for name, value in new_env.iteritems():
 
1373
            self._captureVar(name, value)
 
1374
 
 
1375
    def _captureVar(self, name, newvalue):
 
1376
        """Set an environment variable, and reset it when finished."""
 
1377
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1378
 
 
1379
    def _restore_debug_flags(self):
 
1380
        debug.debug_flags.clear()
 
1381
        debug.debug_flags.update(self._preserved_debug_flags)
 
1382
 
 
1383
    def _restoreEnvironment(self):
 
1384
        for name, value in self.__old_env.iteritems():
 
1385
            osutils.set_or_unset_env(name, value)
 
1386
 
 
1387
    def _restoreHooks(self):
 
1388
        for klass, (name, hooks) in self._preserved_hooks.items():
 
1389
            setattr(klass, name, hooks)
 
1390
 
 
1391
    def knownFailure(self, reason):
 
1392
        """This test has failed for some known reason."""
 
1393
        raise KnownFailure(reason)
 
1394
 
 
1395
    def _do_skip(self, result, reason):
 
1396
        addSkip = getattr(result, 'addSkip', None)
 
1397
        if not callable(addSkip):
 
1398
            result.addError(self, sys.exc_info())
 
1399
        else:
 
1400
            addSkip(self, reason)
 
1401
 
 
1402
    def run(self, result=None):
 
1403
        if result is None: result = self.defaultTestResult()
 
1404
        for feature in getattr(self, '_test_needs_features', []):
 
1405
            if not feature.available():
 
1406
                result.startTest(self)
 
1407
                if getattr(result, 'addNotSupported', None):
 
1408
                    result.addNotSupported(self, feature)
 
1409
                else:
 
1410
                    result.addSuccess(self)
 
1411
                result.stopTest(self)
 
1412
                return result
 
1413
        try:
 
1414
            try:
 
1415
                result.startTest(self)
 
1416
                absent_attr = object()
 
1417
                # Python 2.5
 
1418
                method_name = getattr(self, '_testMethodName', absent_attr)
 
1419
                if method_name is absent_attr:
 
1420
                    # Python 2.4
 
1421
                    method_name = getattr(self, '_TestCase__testMethodName')
 
1422
                testMethod = getattr(self, method_name)
 
1423
                try:
 
1424
                    try:
 
1425
                        self.setUp()
 
1426
                        if not self._bzr_test_setUp_run:
 
1427
                            self.fail(
 
1428
                                "test setUp did not invoke "
 
1429
                                "bzrlib.tests.TestCase's setUp")
 
1430
                    except KeyboardInterrupt:
 
1431
                        self._runCleanups()
 
1432
                        raise
 
1433
                    except TestSkipped, e:
 
1434
                        self._do_skip(result, e.args[0])
 
1435
                        self.tearDown()
 
1436
                        return result
 
1437
                    except:
 
1438
                        result.addError(self, sys.exc_info())
 
1439
                        self._runCleanups()
 
1440
                        return result
 
1441
 
 
1442
                    ok = False
 
1443
                    try:
 
1444
                        testMethod()
 
1445
                        ok = True
 
1446
                    except self.failureException:
 
1447
                        result.addFailure(self, sys.exc_info())
 
1448
                    except TestSkipped, e:
 
1449
                        if not e.args:
 
1450
                            reason = "No reason given."
 
1451
                        else:
 
1452
                            reason = e.args[0]
 
1453
                        self._do_skip(result, reason)
 
1454
                    except KeyboardInterrupt:
 
1455
                        self._runCleanups()
 
1456
                        raise
 
1457
                    except:
 
1458
                        result.addError(self, sys.exc_info())
 
1459
 
 
1460
                    try:
 
1461
                        self.tearDown()
 
1462
                        if not self._bzr_test_tearDown_run:
 
1463
                            self.fail(
 
1464
                                "test tearDown did not invoke "
 
1465
                                "bzrlib.tests.TestCase's tearDown")
 
1466
                    except KeyboardInterrupt:
 
1467
                        self._runCleanups()
 
1468
                        raise
 
1469
                    except:
 
1470
                        result.addError(self, sys.exc_info())
 
1471
                        self._runCleanups()
 
1472
                        ok = False
 
1473
                    if ok: result.addSuccess(self)
 
1474
                finally:
 
1475
                    result.stopTest(self)
 
1476
                return result
 
1477
            except TestNotApplicable:
 
1478
                # Not moved from the result [yet].
 
1479
                self._runCleanups()
 
1480
                raise
 
1481
            except KeyboardInterrupt:
 
1482
                self._runCleanups()
 
1483
                raise
 
1484
        finally:
 
1485
            saved_attrs = {}
 
1486
            for attr_name in self.attrs_to_keep:
 
1487
                if attr_name in self.__dict__:
 
1488
                    saved_attrs[attr_name] = self.__dict__[attr_name]
 
1489
            self.__dict__ = saved_attrs
 
1490
 
 
1491
    def tearDown(self):
 
1492
        self._runCleanups()
 
1493
        self._log_contents = ''
 
1494
        self._bzr_test_tearDown_run = True
 
1495
        unittest.TestCase.tearDown(self)
 
1496
 
 
1497
    def time(self, callable, *args, **kwargs):
 
1498
        """Run callable and accrue the time it takes to the benchmark time.
 
1499
 
 
1500
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1501
        this will cause lsprofile statistics to be gathered and stored in
 
1502
        self._benchcalls.
 
1503
        """
 
1504
        if self._benchtime is None:
 
1505
            self._benchtime = 0
 
1506
        start = time.time()
 
1507
        try:
 
1508
            if not self._gather_lsprof_in_benchmarks:
 
1509
                return callable(*args, **kwargs)
 
1510
            else:
 
1511
                # record this benchmark
 
1512
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1513
                stats.sort()
 
1514
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1515
                return ret
 
1516
        finally:
 
1517
            self._benchtime += time.time() - start
 
1518
 
 
1519
    def _runCleanups(self):
 
1520
        """Run registered cleanup functions.
 
1521
 
 
1522
        This should only be called from TestCase.tearDown.
 
1523
        """
 
1524
        # TODO: Perhaps this should keep running cleanups even if
 
1525
        # one of them fails?
 
1526
 
 
1527
        # Actually pop the cleanups from the list so tearDown running
 
1528
        # twice is safe (this happens for skipped tests).
 
1529
        while self._cleanups:
 
1530
            cleanup, args, kwargs = self._cleanups.pop()
 
1531
            cleanup(*args, **kwargs)
 
1532
 
 
1533
    def log(self, *args):
 
1534
        mutter(*args)
 
1535
 
 
1536
    def _get_log(self, keep_log_file=False):
 
1537
        """Get the log from bzrlib.trace calls from this test.
 
1538
 
 
1539
        :param keep_log_file: When True, if the log is still a file on disk
 
1540
            leave it as a file on disk. When False, if the log is still a file
 
1541
            on disk, the log file is deleted and the log preserved as
 
1542
            self._log_contents.
 
1543
        :return: A string containing the log.
 
1544
        """
 
1545
        # flush the log file, to get all content
 
1546
        import bzrlib.trace
 
1547
        if bzrlib.trace._trace_file:
 
1548
            bzrlib.trace._trace_file.flush()
 
1549
        if self._log_contents:
 
1550
            # XXX: this can hardly contain the content flushed above --vila
 
1551
            # 20080128
 
1552
            return self._log_contents
 
1553
        if self._log_file_name is not None:
 
1554
            logfile = open(self._log_file_name)
 
1555
            try:
 
1556
                log_contents = logfile.read()
 
1557
            finally:
 
1558
                logfile.close()
 
1559
            if not keep_log_file:
 
1560
                self._log_contents = log_contents
 
1561
                try:
 
1562
                    os.remove(self._log_file_name)
 
1563
                except OSError, e:
 
1564
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1565
                        sys.stderr.write(('Unable to delete log file '
 
1566
                                             ' %r\n' % self._log_file_name))
 
1567
                    else:
 
1568
                        raise
 
1569
            return log_contents
 
1570
        else:
 
1571
            return "DELETED log file to reduce memory footprint"
 
1572
 
 
1573
    def requireFeature(self, feature):
 
1574
        """This test requires a specific feature is available.
 
1575
 
 
1576
        :raises UnavailableFeature: When feature is not available.
 
1577
        """
 
1578
        if not feature.available():
 
1579
            raise UnavailableFeature(feature)
 
1580
 
 
1581
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1582
            working_dir):
 
1583
        """Run bazaar command line, splitting up a string command line."""
 
1584
        if isinstance(args, basestring):
 
1585
            # shlex don't understand unicode strings,
 
1586
            # so args should be plain string (bialix 20070906)
 
1587
            args = list(shlex.split(str(args)))
 
1588
        return self._run_bzr_core(args, retcode=retcode,
 
1589
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1590
                )
 
1591
 
 
1592
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1593
            working_dir):
 
1594
        if encoding is None:
 
1595
            encoding = osutils.get_user_encoding()
 
1596
        stdout = StringIOWrapper()
 
1597
        stderr = StringIOWrapper()
 
1598
        stdout.encoding = encoding
 
1599
        stderr.encoding = encoding
 
1600
 
 
1601
        self.log('run bzr: %r', args)
 
1602
        # FIXME: don't call into logging here
 
1603
        handler = logging.StreamHandler(stderr)
 
1604
        handler.setLevel(logging.INFO)
 
1605
        logger = logging.getLogger('')
 
1606
        logger.addHandler(handler)
 
1607
        old_ui_factory = ui.ui_factory
 
1608
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1609
 
 
1610
        cwd = None
 
1611
        if working_dir is not None:
 
1612
            cwd = osutils.getcwd()
 
1613
            os.chdir(working_dir)
 
1614
 
 
1615
        try:
 
1616
            result = self.apply_redirected(ui.ui_factory.stdin,
 
1617
                stdout, stderr,
 
1618
                bzrlib.commands.run_bzr_catch_user_errors,
 
1619
                args)
 
1620
        finally:
 
1621
            logger.removeHandler(handler)
 
1622
            ui.ui_factory = old_ui_factory
 
1623
            if cwd is not None:
 
1624
                os.chdir(cwd)
 
1625
 
 
1626
        out = stdout.getvalue()
 
1627
        err = stderr.getvalue()
 
1628
        if out:
 
1629
            self.log('output:\n%r', out)
 
1630
        if err:
 
1631
            self.log('errors:\n%r', err)
 
1632
        if retcode is not None:
 
1633
            self.assertEquals(retcode, result,
 
1634
                              message='Unexpected return code')
 
1635
        return out, err
 
1636
 
 
1637
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
 
1638
                working_dir=None, error_regexes=[], output_encoding=None):
 
1639
        """Invoke bzr, as if it were run from the command line.
 
1640
 
 
1641
        The argument list should not include the bzr program name - the
 
1642
        first argument is normally the bzr command.  Arguments may be
 
1643
        passed in three ways:
 
1644
 
 
1645
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1646
        when the command contains whitespace or metacharacters, or
 
1647
        is built up at run time.
 
1648
 
 
1649
        2- A single string, eg "add a".  This is the most convenient
 
1650
        for hardcoded commands.
 
1651
 
 
1652
        This runs bzr through the interface that catches and reports
 
1653
        errors, and with logging set to something approximating the
 
1654
        default, so that error reporting can be checked.
 
1655
 
 
1656
        This should be the main method for tests that want to exercise the
 
1657
        overall behavior of the bzr application (rather than a unit test
 
1658
        or a functional test of the library.)
 
1659
 
 
1660
        This sends the stdout/stderr results into the test's log,
 
1661
        where it may be useful for debugging.  See also run_captured.
 
1662
 
 
1663
        :keyword stdin: A string to be used as stdin for the command.
 
1664
        :keyword retcode: The status code the command should return;
 
1665
            default 0.
 
1666
        :keyword working_dir: The directory to run the command in
 
1667
        :keyword error_regexes: A list of expected error messages.  If
 
1668
            specified they must be seen in the error output of the command.
 
1669
        """
 
1670
        out, err = self._run_bzr_autosplit(
 
1671
            args=args,
 
1672
            retcode=retcode,
 
1673
            encoding=encoding,
 
1674
            stdin=stdin,
 
1675
            working_dir=working_dir,
 
1676
            )
 
1677
        self.assertIsInstance(error_regexes, (list, tuple))
 
1678
        for regex in error_regexes:
 
1679
            self.assertContainsRe(err, regex)
 
1680
        return out, err
 
1681
 
 
1682
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1683
        """Run bzr, and check that stderr contains the supplied regexes
 
1684
 
 
1685
        :param error_regexes: Sequence of regular expressions which
 
1686
            must each be found in the error output. The relative ordering
 
1687
            is not enforced.
 
1688
        :param args: command-line arguments for bzr
 
1689
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1690
            This function changes the default value of retcode to be 3,
 
1691
            since in most cases this is run when you expect bzr to fail.
 
1692
 
 
1693
        :return: (out, err) The actual output of running the command (in case
 
1694
            you want to do more inspection)
 
1695
 
 
1696
        Examples of use::
 
1697
 
 
1698
            # Make sure that commit is failing because there is nothing to do
 
1699
            self.run_bzr_error(['no changes to commit'],
 
1700
                               ['commit', '-m', 'my commit comment'])
 
1701
            # Make sure --strict is handling an unknown file, rather than
 
1702
            # giving us the 'nothing to do' error
 
1703
            self.build_tree(['unknown'])
 
1704
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1705
                               ['commit', --strict', '-m', 'my commit comment'])
 
1706
        """
 
1707
        kwargs.setdefault('retcode', 3)
 
1708
        kwargs['error_regexes'] = error_regexes
 
1709
        out, err = self.run_bzr(*args, **kwargs)
 
1710
        return out, err
 
1711
 
 
1712
    def run_bzr_subprocess(self, *args, **kwargs):
 
1713
        """Run bzr in a subprocess for testing.
 
1714
 
 
1715
        This starts a new Python interpreter and runs bzr in there.
 
1716
        This should only be used for tests that have a justifiable need for
 
1717
        this isolation: e.g. they are testing startup time, or signal
 
1718
        handling, or early startup code, etc.  Subprocess code can't be
 
1719
        profiled or debugged so easily.
 
1720
 
 
1721
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
1722
            None is supplied, the status code is not checked.
 
1723
        :keyword env_changes: A dictionary which lists changes to environment
 
1724
            variables. A value of None will unset the env variable.
 
1725
            The values must be strings. The change will only occur in the
 
1726
            child, so you don't need to fix the environment after running.
 
1727
        :keyword universal_newlines: Convert CRLF => LF
 
1728
        :keyword allow_plugins: By default the subprocess is run with
 
1729
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1730
            for system-wide plugins to create unexpected output on stderr,
 
1731
            which can cause unnecessary test failures.
 
1732
        """
 
1733
        env_changes = kwargs.get('env_changes', {})
 
1734
        working_dir = kwargs.get('working_dir', None)
 
1735
        allow_plugins = kwargs.get('allow_plugins', False)
 
1736
        if len(args) == 1:
 
1737
            if isinstance(args[0], list):
 
1738
                args = args[0]
 
1739
            elif isinstance(args[0], basestring):
 
1740
                args = list(shlex.split(args[0]))
 
1741
        else:
 
1742
            raise ValueError("passing varargs to run_bzr_subprocess")
 
1743
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1744
                                            working_dir=working_dir,
 
1745
                                            allow_plugins=allow_plugins)
 
1746
        # We distinguish between retcode=None and retcode not passed.
 
1747
        supplied_retcode = kwargs.get('retcode', 0)
 
1748
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1749
            universal_newlines=kwargs.get('universal_newlines', False),
 
1750
            process_args=args)
 
1751
 
 
1752
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1753
                             skip_if_plan_to_signal=False,
 
1754
                             working_dir=None,
 
1755
                             allow_plugins=False):
 
1756
        """Start bzr in a subprocess for testing.
 
1757
 
 
1758
        This starts a new Python interpreter and runs bzr in there.
 
1759
        This should only be used for tests that have a justifiable need for
 
1760
        this isolation: e.g. they are testing startup time, or signal
 
1761
        handling, or early startup code, etc.  Subprocess code can't be
 
1762
        profiled or debugged so easily.
 
1763
 
 
1764
        :param process_args: a list of arguments to pass to the bzr executable,
 
1765
            for example ``['--version']``.
 
1766
        :param env_changes: A dictionary which lists changes to environment
 
1767
            variables. A value of None will unset the env variable.
 
1768
            The values must be strings. The change will only occur in the
 
1769
            child, so you don't need to fix the environment after running.
 
1770
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1771
            is not available.
 
1772
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1773
 
 
1774
        :returns: Popen object for the started process.
 
1775
        """
 
1776
        if skip_if_plan_to_signal:
 
1777
            if not getattr(os, 'kill', None):
 
1778
                raise TestSkipped("os.kill not available.")
 
1779
 
 
1780
        if env_changes is None:
 
1781
            env_changes = {}
 
1782
        old_env = {}
 
1783
 
 
1784
        def cleanup_environment():
 
1785
            for env_var, value in env_changes.iteritems():
 
1786
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1787
 
 
1788
        def restore_environment():
 
1789
            for env_var, value in old_env.iteritems():
 
1790
                osutils.set_or_unset_env(env_var, value)
 
1791
 
 
1792
        bzr_path = self.get_bzr_path()
 
1793
 
 
1794
        cwd = None
 
1795
        if working_dir is not None:
 
1796
            cwd = osutils.getcwd()
 
1797
            os.chdir(working_dir)
 
1798
 
 
1799
        try:
 
1800
            # win32 subprocess doesn't support preexec_fn
 
1801
            # so we will avoid using it on all platforms, just to
 
1802
            # make sure the code path is used, and we don't break on win32
 
1803
            cleanup_environment()
 
1804
            command = [sys.executable]
 
1805
            # frozen executables don't need the path to bzr
 
1806
            if getattr(sys, "frozen", None) is None:
 
1807
                command.append(bzr_path)
 
1808
            if not allow_plugins:
 
1809
                command.append('--no-plugins')
 
1810
            command.extend(process_args)
 
1811
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1812
        finally:
 
1813
            restore_environment()
 
1814
            if cwd is not None:
 
1815
                os.chdir(cwd)
 
1816
 
 
1817
        return process
 
1818
 
 
1819
    def _popen(self, *args, **kwargs):
 
1820
        """Place a call to Popen.
 
1821
 
 
1822
        Allows tests to override this method to intercept the calls made to
 
1823
        Popen for introspection.
 
1824
        """
 
1825
        return Popen(*args, **kwargs)
 
1826
 
 
1827
    def get_bzr_path(self):
 
1828
        """Return the path of the 'bzr' executable for this test suite."""
 
1829
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1830
        if not os.path.isfile(bzr_path):
 
1831
            # We are probably installed. Assume sys.argv is the right file
 
1832
            bzr_path = sys.argv[0]
 
1833
        return bzr_path
 
1834
 
 
1835
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1836
                              universal_newlines=False, process_args=None):
 
1837
        """Finish the execution of process.
 
1838
 
 
1839
        :param process: the Popen object returned from start_bzr_subprocess.
 
1840
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1841
            None is supplied, the status code is not checked.
 
1842
        :param send_signal: an optional signal to send to the process.
 
1843
        :param universal_newlines: Convert CRLF => LF
 
1844
        :returns: (stdout, stderr)
 
1845
        """
 
1846
        if send_signal is not None:
 
1847
            os.kill(process.pid, send_signal)
 
1848
        out, err = process.communicate()
 
1849
 
 
1850
        if universal_newlines:
 
1851
            out = out.replace('\r\n', '\n')
 
1852
            err = err.replace('\r\n', '\n')
 
1853
 
 
1854
        if retcode is not None and retcode != process.returncode:
 
1855
            if process_args is None:
 
1856
                process_args = "(unknown args)"
 
1857
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1858
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1859
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1860
                      % (process_args, retcode, process.returncode))
 
1861
        return [out, err]
 
1862
 
 
1863
    def check_inventory_shape(self, inv, shape):
 
1864
        """Compare an inventory to a list of expected names.
 
1865
 
 
1866
        Fail if they are not precisely equal.
 
1867
        """
 
1868
        extras = []
 
1869
        shape = list(shape)             # copy
 
1870
        for path, ie in inv.entries():
 
1871
            name = path.replace('\\', '/')
 
1872
            if ie.kind == 'directory':
 
1873
                name = name + '/'
 
1874
            if name in shape:
 
1875
                shape.remove(name)
 
1876
            else:
 
1877
                extras.append(name)
 
1878
        if shape:
 
1879
            self.fail("expected paths not found in inventory: %r" % shape)
 
1880
        if extras:
 
1881
            self.fail("unexpected paths found in inventory: %r" % extras)
 
1882
 
 
1883
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1884
                         a_callable=None, *args, **kwargs):
 
1885
        """Call callable with redirected std io pipes.
 
1886
 
 
1887
        Returns the return code."""
 
1888
        if not callable(a_callable):
 
1889
            raise ValueError("a_callable must be callable.")
 
1890
        if stdin is None:
 
1891
            stdin = StringIO("")
 
1892
        if stdout is None:
 
1893
            if getattr(self, "_log_file", None) is not None:
 
1894
                stdout = self._log_file
 
1895
            else:
 
1896
                stdout = StringIO()
 
1897
        if stderr is None:
 
1898
            if getattr(self, "_log_file", None is not None):
 
1899
                stderr = self._log_file
 
1900
            else:
 
1901
                stderr = StringIO()
 
1902
        real_stdin = sys.stdin
 
1903
        real_stdout = sys.stdout
 
1904
        real_stderr = sys.stderr
 
1905
        try:
 
1906
            sys.stdout = stdout
 
1907
            sys.stderr = stderr
 
1908
            sys.stdin = stdin
 
1909
            return a_callable(*args, **kwargs)
 
1910
        finally:
 
1911
            sys.stdout = real_stdout
 
1912
            sys.stderr = real_stderr
 
1913
            sys.stdin = real_stdin
 
1914
 
 
1915
    def reduceLockdirTimeout(self):
 
1916
        """Reduce the default lock timeout for the duration of the test, so that
 
1917
        if LockContention occurs during a test, it does so quickly.
 
1918
 
 
1919
        Tests that expect to provoke LockContention errors should call this.
 
1920
        """
 
1921
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
1922
        def resetTimeout():
 
1923
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
1924
        self.addCleanup(resetTimeout)
 
1925
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
1926
 
 
1927
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
1928
        """Return a StringIOWrapper instance, that will encode Unicode
 
1929
        input to UTF-8.
 
1930
        """
 
1931
        if encoding_type is None:
 
1932
            encoding_type = 'strict'
 
1933
        sio = StringIO()
 
1934
        output_encoding = 'utf-8'
 
1935
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
 
1936
        sio.encoding = output_encoding
 
1937
        return sio
 
1938
 
 
1939
 
 
1940
class CapturedCall(object):
 
1941
    """A helper for capturing smart server calls for easy debug analysis."""
 
1942
 
 
1943
    def __init__(self, params, prefix_length):
 
1944
        """Capture the call with params and skip prefix_length stack frames."""
 
1945
        self.call = params
 
1946
        import traceback
 
1947
        # The last 5 frames are the __init__, the hook frame, and 3 smart
 
1948
        # client frames. Beyond this we could get more clever, but this is good
 
1949
        # enough for now.
 
1950
        stack = traceback.extract_stack()[prefix_length:-5]
 
1951
        self.stack = ''.join(traceback.format_list(stack))
 
1952
 
 
1953
    def __str__(self):
 
1954
        return self.call.method
 
1955
 
 
1956
    def __repr__(self):
 
1957
        return self.call.method
 
1958
 
 
1959
    def stack(self):
 
1960
        return self.stack
 
1961
 
 
1962
 
 
1963
class TestCaseWithMemoryTransport(TestCase):
 
1964
    """Common test class for tests that do not need disk resources.
 
1965
 
 
1966
    Tests that need disk resources should derive from TestCaseWithTransport.
 
1967
 
 
1968
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
1969
 
 
1970
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
1971
    a directory which does not exist. This serves to help ensure test isolation
 
1972
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
1973
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
1974
    file defaults for the transport in tests, nor does it obey the command line
 
1975
    override, so tests that accidentally write to the common directory should
 
1976
    be rare.
 
1977
 
 
1978
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
1979
    a .bzr directory that stops us ascending higher into the filesystem.
 
1980
    """
 
1981
 
 
1982
    TEST_ROOT = None
 
1983
    _TEST_NAME = 'test'
 
1984
 
 
1985
    def __init__(self, methodName='runTest'):
 
1986
        # allow test parameterization after test construction and before test
 
1987
        # execution. Variables that the parameterizer sets need to be
 
1988
        # ones that are not set by setUp, or setUp will trash them.
 
1989
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
1990
        self.vfs_transport_factory = default_transport
 
1991
        self.transport_server = None
 
1992
        self.transport_readonly_server = None
 
1993
        self.__vfs_server = None
 
1994
 
 
1995
    def get_transport(self, relpath=None):
 
1996
        """Return a writeable transport.
 
1997
 
 
1998
        This transport is for the test scratch space relative to
 
1999
        "self._test_root"
 
2000
 
 
2001
        :param relpath: a path relative to the base url.
 
2002
        """
 
2003
        t = get_transport(self.get_url(relpath))
 
2004
        self.assertFalse(t.is_readonly())
 
2005
        return t
 
2006
 
 
2007
    def get_readonly_transport(self, relpath=None):
 
2008
        """Return a readonly transport for the test scratch space
 
2009
 
 
2010
        This can be used to test that operations which should only need
 
2011
        readonly access in fact do not try to write.
 
2012
 
 
2013
        :param relpath: a path relative to the base url.
 
2014
        """
 
2015
        t = get_transport(self.get_readonly_url(relpath))
 
2016
        self.assertTrue(t.is_readonly())
 
2017
        return t
 
2018
 
 
2019
    def create_transport_readonly_server(self):
 
2020
        """Create a transport server from class defined at init.
 
2021
 
 
2022
        This is mostly a hook for daughter classes.
 
2023
        """
 
2024
        return self.transport_readonly_server()
 
2025
 
 
2026
    def get_readonly_server(self):
 
2027
        """Get the server instance for the readonly transport
 
2028
 
 
2029
        This is useful for some tests with specific servers to do diagnostics.
 
2030
        """
 
2031
        if self.__readonly_server is None:
 
2032
            if self.transport_readonly_server is None:
 
2033
                # readonly decorator requested
 
2034
                # bring up the server
 
2035
                self.__readonly_server = ReadonlyServer()
 
2036
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
2037
            else:
 
2038
                self.__readonly_server = self.create_transport_readonly_server()
 
2039
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
2040
            self.addCleanup(self.__readonly_server.tearDown)
 
2041
        return self.__readonly_server
 
2042
 
 
2043
    def get_readonly_url(self, relpath=None):
 
2044
        """Get a URL for the readonly transport.
 
2045
 
 
2046
        This will either be backed by '.' or a decorator to the transport
 
2047
        used by self.get_url()
 
2048
        relpath provides for clients to get a path relative to the base url.
 
2049
        These should only be downwards relative, not upwards.
 
2050
        """
 
2051
        base = self.get_readonly_server().get_url()
 
2052
        return self._adjust_url(base, relpath)
 
2053
 
 
2054
    def get_vfs_only_server(self):
 
2055
        """Get the vfs only read/write server instance.
 
2056
 
 
2057
        This is useful for some tests with specific servers that need
 
2058
        diagnostics.
 
2059
 
 
2060
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
2061
        is no means to override it.
 
2062
        """
 
2063
        if self.__vfs_server is None:
 
2064
            self.__vfs_server = MemoryServer()
 
2065
            self.__vfs_server.setUp()
 
2066
            self.addCleanup(self.__vfs_server.tearDown)
 
2067
        return self.__vfs_server
 
2068
 
 
2069
    def get_server(self):
 
2070
        """Get the read/write server instance.
 
2071
 
 
2072
        This is useful for some tests with specific servers that need
 
2073
        diagnostics.
 
2074
 
 
2075
        This is built from the self.transport_server factory. If that is None,
 
2076
        then the self.get_vfs_server is returned.
 
2077
        """
 
2078
        if self.__server is None:
 
2079
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
 
2080
                return self.get_vfs_only_server()
 
2081
            else:
 
2082
                # bring up a decorated means of access to the vfs only server.
 
2083
                self.__server = self.transport_server()
 
2084
                try:
 
2085
                    self.__server.setUp(self.get_vfs_only_server())
 
2086
                except TypeError, e:
 
2087
                    # This should never happen; the try:Except here is to assist
 
2088
                    # developers having to update code rather than seeing an
 
2089
                    # uninformative TypeError.
 
2090
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
 
2091
            self.addCleanup(self.__server.tearDown)
 
2092
        return self.__server
 
2093
 
 
2094
    def _adjust_url(self, base, relpath):
 
2095
        """Get a URL (or maybe a path) for the readwrite transport.
 
2096
 
 
2097
        This will either be backed by '.' or to an equivalent non-file based
 
2098
        facility.
 
2099
        relpath provides for clients to get a path relative to the base url.
 
2100
        These should only be downwards relative, not upwards.
 
2101
        """
 
2102
        if relpath is not None and relpath != '.':
 
2103
            if not base.endswith('/'):
 
2104
                base = base + '/'
 
2105
            # XXX: Really base should be a url; we did after all call
 
2106
            # get_url()!  But sometimes it's just a path (from
 
2107
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
2108
            # to a non-escaped local path.
 
2109
            if base.startswith('./') or base.startswith('/'):
 
2110
                base += relpath
 
2111
            else:
 
2112
                base += urlutils.escape(relpath)
 
2113
        return base
 
2114
 
 
2115
    def get_url(self, relpath=None):
 
2116
        """Get a URL (or maybe a path) for the readwrite transport.
 
2117
 
 
2118
        This will either be backed by '.' or to an equivalent non-file based
 
2119
        facility.
 
2120
        relpath provides for clients to get a path relative to the base url.
 
2121
        These should only be downwards relative, not upwards.
 
2122
        """
 
2123
        base = self.get_server().get_url()
 
2124
        return self._adjust_url(base, relpath)
 
2125
 
 
2126
    def get_vfs_only_url(self, relpath=None):
 
2127
        """Get a URL (or maybe a path for the plain old vfs transport.
 
2128
 
 
2129
        This will never be a smart protocol.  It always has all the
 
2130
        capabilities of the local filesystem, but it might actually be a
 
2131
        MemoryTransport or some other similar virtual filesystem.
 
2132
 
 
2133
        This is the backing transport (if any) of the server returned by
 
2134
        get_url and get_readonly_url.
 
2135
 
 
2136
        :param relpath: provides for clients to get a path relative to the base
 
2137
            url.  These should only be downwards relative, not upwards.
 
2138
        :return: A URL
 
2139
        """
 
2140
        base = self.get_vfs_only_server().get_url()
 
2141
        return self._adjust_url(base, relpath)
 
2142
 
 
2143
    def _create_safety_net(self):
 
2144
        """Make a fake bzr directory.
 
2145
 
 
2146
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
2147
        real branch.
 
2148
        """
 
2149
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2150
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
2151
 
 
2152
    def _check_safety_net(self):
 
2153
        """Check that the safety .bzr directory have not been touched.
 
2154
 
 
2155
        _make_test_root have created a .bzr directory to prevent tests from
 
2156
        propagating. This method ensures than a test did not leaked.
 
2157
        """
 
2158
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2159
        wt = workingtree.WorkingTree.open(root)
 
2160
        last_rev = wt.last_revision()
 
2161
        if last_rev != 'null:':
 
2162
            # The current test have modified the /bzr directory, we need to
 
2163
            # recreate a new one or all the followng tests will fail.
 
2164
            # If you need to inspect its content uncomment the following line
 
2165
            # import pdb; pdb.set_trace()
 
2166
            _rmtree_temp_dir(root + '/.bzr')
 
2167
            self._create_safety_net()
 
2168
            raise AssertionError('%s/.bzr should not be modified' % root)
 
2169
 
 
2170
    def _make_test_root(self):
 
2171
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
2172
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
2173
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
2174
 
 
2175
            self._create_safety_net()
 
2176
 
 
2177
            # The same directory is used by all tests, and we're not
 
2178
            # specifically told when all tests are finished.  This will do.
 
2179
            atexit.register(_rmtree_temp_dir, root)
 
2180
 
 
2181
        self.addCleanup(self._check_safety_net)
 
2182
 
 
2183
    def makeAndChdirToTestDir(self):
 
2184
        """Create a temporary directories for this one test.
 
2185
 
 
2186
        This must set self.test_home_dir and self.test_dir and chdir to
 
2187
        self.test_dir.
 
2188
 
 
2189
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
2190
        """
 
2191
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2192
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
2193
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
2194
 
 
2195
    def make_branch(self, relpath, format=None):
 
2196
        """Create a branch on the transport at relpath."""
 
2197
        repo = self.make_repository(relpath, format=format)
 
2198
        return repo.bzrdir.create_branch()
 
2199
 
 
2200
    def make_bzrdir(self, relpath, format=None):
 
2201
        try:
 
2202
            # might be a relative or absolute path
 
2203
            maybe_a_url = self.get_url(relpath)
 
2204
            segments = maybe_a_url.rsplit('/', 1)
 
2205
            t = get_transport(maybe_a_url)
 
2206
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
2207
                t.ensure_base()
 
2208
            if format is None:
 
2209
                format = 'default'
 
2210
            if isinstance(format, basestring):
 
2211
                format = bzrdir.format_registry.make_bzrdir(format)
 
2212
            return format.initialize_on_transport(t)
 
2213
        except errors.UninitializableFormat:
 
2214
            raise TestSkipped("Format %s is not initializable." % format)
 
2215
 
 
2216
    def make_repository(self, relpath, shared=False, format=None):
 
2217
        """Create a repository on our default transport at relpath.
 
2218
 
 
2219
        Note that relpath must be a relative path, not a full url.
 
2220
        """
 
2221
        # FIXME: If you create a remoterepository this returns the underlying
 
2222
        # real format, which is incorrect.  Actually we should make sure that
 
2223
        # RemoteBzrDir returns a RemoteRepository.
 
2224
        # maybe  mbp 20070410
 
2225
        made_control = self.make_bzrdir(relpath, format=format)
 
2226
        return made_control.create_repository(shared=shared)
 
2227
 
 
2228
    def make_smart_server(self, path):
 
2229
        smart_server = server.SmartTCPServer_for_testing()
 
2230
        smart_server.setUp(self.get_server())
 
2231
        remote_transport = get_transport(smart_server.get_url()).clone(path)
 
2232
        self.addCleanup(smart_server.tearDown)
 
2233
        return remote_transport
 
2234
 
 
2235
    def make_branch_and_memory_tree(self, relpath, format=None):
 
2236
        """Create a branch on the default transport and a MemoryTree for it."""
 
2237
        b = self.make_branch(relpath, format=format)
 
2238
        return memorytree.MemoryTree.create_on_branch(b)
 
2239
 
 
2240
    def make_branch_builder(self, relpath, format=None):
 
2241
        branch = self.make_branch(relpath, format=format)
 
2242
        return branchbuilder.BranchBuilder(branch=branch)
 
2243
 
 
2244
    def overrideEnvironmentForTesting(self):
 
2245
        os.environ['HOME'] = self.test_home_dir
 
2246
        os.environ['BZR_HOME'] = self.test_home_dir
 
2247
 
 
2248
    def setUp(self):
 
2249
        super(TestCaseWithMemoryTransport, self).setUp()
 
2250
        self._make_test_root()
 
2251
        _currentdir = os.getcwdu()
 
2252
        def _leaveDirectory():
 
2253
            os.chdir(_currentdir)
 
2254
        self.addCleanup(_leaveDirectory)
 
2255
        self.makeAndChdirToTestDir()
 
2256
        self.overrideEnvironmentForTesting()
 
2257
        self.__readonly_server = None
 
2258
        self.__server = None
 
2259
        self.reduceLockdirTimeout()
 
2260
 
 
2261
    def setup_smart_server_with_call_log(self):
 
2262
        """Sets up a smart server as the transport server with a call log."""
 
2263
        self.transport_server = server.SmartTCPServer_for_testing
 
2264
        self.hpss_calls = []
 
2265
        import traceback
 
2266
        # Skip the current stack down to the caller of
 
2267
        # setup_smart_server_with_call_log
 
2268
        prefix_length = len(traceback.extract_stack()) - 2
 
2269
        def capture_hpss_call(params):
 
2270
            self.hpss_calls.append(
 
2271
                CapturedCall(params, prefix_length))
 
2272
        client._SmartClient.hooks.install_named_hook(
 
2273
            'call', capture_hpss_call, None)
 
2274
 
 
2275
    def reset_smart_call_log(self):
 
2276
        self.hpss_calls = []
 
2277
 
 
2278
 
 
2279
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
2280
    """Derived class that runs a test within a temporary directory.
 
2281
 
 
2282
    This is useful for tests that need to create a branch, etc.
 
2283
 
 
2284
    The directory is created in a slightly complex way: for each
 
2285
    Python invocation, a new temporary top-level directory is created.
 
2286
    All test cases create their own directory within that.  If the
 
2287
    tests complete successfully, the directory is removed.
 
2288
 
 
2289
    :ivar test_base_dir: The path of the top-level directory for this
 
2290
    test, which contains a home directory and a work directory.
 
2291
 
 
2292
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
2293
    which is used as $HOME for this test.
 
2294
 
 
2295
    :ivar test_dir: A directory under test_base_dir used as the current
 
2296
    directory when the test proper is run.
 
2297
    """
 
2298
 
 
2299
    OVERRIDE_PYTHON = 'python'
 
2300
 
 
2301
    def check_file_contents(self, filename, expect):
 
2302
        self.log("check contents of file %s" % filename)
 
2303
        contents = file(filename, 'r').read()
 
2304
        if contents != expect:
 
2305
            self.log("expected: %r" % expect)
 
2306
            self.log("actually: %r" % contents)
 
2307
            self.fail("contents of %s not as expected" % filename)
 
2308
 
 
2309
    def _getTestDirPrefix(self):
 
2310
        # create a directory within the top level test directory
 
2311
        if sys.platform == 'win32':
 
2312
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
 
2313
            # windows is likely to have path-length limits so use a short name
 
2314
            name_prefix = name_prefix[-30:]
 
2315
        else:
 
2316
            name_prefix = re.sub('[/]', '_', self.id())
 
2317
        return name_prefix
 
2318
 
 
2319
    def makeAndChdirToTestDir(self):
 
2320
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2321
 
 
2322
        For TestCaseInTempDir we create a temporary directory based on the test
 
2323
        name and then create two subdirs - test and home under it.
 
2324
        """
 
2325
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
 
2326
            self._getTestDirPrefix())
 
2327
        name = name_prefix
 
2328
        for i in range(100):
 
2329
            if os.path.exists(name):
 
2330
                name = name_prefix + '_' + str(i)
 
2331
            else:
 
2332
                os.mkdir(name)
 
2333
                break
 
2334
        # now create test and home directories within this dir
 
2335
        self.test_base_dir = name
 
2336
        self.test_home_dir = self.test_base_dir + '/home'
 
2337
        os.mkdir(self.test_home_dir)
 
2338
        self.test_dir = self.test_base_dir + '/work'
 
2339
        os.mkdir(self.test_dir)
 
2340
        os.chdir(self.test_dir)
 
2341
        # put name of test inside
 
2342
        f = file(self.test_base_dir + '/name', 'w')
 
2343
        try:
 
2344
            f.write(self.id())
 
2345
        finally:
 
2346
            f.close()
 
2347
        self.addCleanup(self.deleteTestDir)
 
2348
 
 
2349
    def deleteTestDir(self):
 
2350
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2351
        _rmtree_temp_dir(self.test_base_dir)
 
2352
 
 
2353
    def build_tree(self, shape, line_endings='binary', transport=None):
 
2354
        """Build a test tree according to a pattern.
 
2355
 
 
2356
        shape is a sequence of file specifications.  If the final
 
2357
        character is '/', a directory is created.
 
2358
 
 
2359
        This assumes that all the elements in the tree being built are new.
 
2360
 
 
2361
        This doesn't add anything to a branch.
 
2362
 
 
2363
        :type shape:    list or tuple.
 
2364
        :param line_endings: Either 'binary' or 'native'
 
2365
            in binary mode, exact contents are written in native mode, the
 
2366
            line endings match the default platform endings.
 
2367
        :param transport: A transport to write to, for building trees on VFS's.
 
2368
            If the transport is readonly or None, "." is opened automatically.
 
2369
        :return: None
 
2370
        """
 
2371
        if type(shape) not in (list, tuple):
 
2372
            raise AssertionError("Parameter 'shape' should be "
 
2373
                "a list or a tuple. Got %r instead" % (shape,))
 
2374
        # It's OK to just create them using forward slashes on windows.
 
2375
        if transport is None or transport.is_readonly():
 
2376
            transport = get_transport(".")
 
2377
        for name in shape:
 
2378
            self.assertIsInstance(name, basestring)
 
2379
            if name[-1] == '/':
 
2380
                transport.mkdir(urlutils.escape(name[:-1]))
 
2381
            else:
 
2382
                if line_endings == 'binary':
 
2383
                    end = '\n'
 
2384
                elif line_endings == 'native':
 
2385
                    end = os.linesep
 
2386
                else:
 
2387
                    raise errors.BzrError(
 
2388
                        'Invalid line ending request %r' % line_endings)
 
2389
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
2390
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2391
 
 
2392
    def build_tree_contents(self, shape):
 
2393
        build_tree_contents(shape)
 
2394
 
 
2395
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2396
        """Assert whether path or paths are in the WorkingTree"""
 
2397
        if tree is None:
 
2398
            tree = workingtree.WorkingTree.open(root_path)
 
2399
        if not isinstance(path, basestring):
 
2400
            for p in path:
 
2401
                self.assertInWorkingTree(p, tree=tree)
 
2402
        else:
 
2403
            self.assertIsNot(tree.path2id(path), None,
 
2404
                path+' not in working tree.')
 
2405
 
 
2406
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2407
        """Assert whether path or paths are not in the WorkingTree"""
 
2408
        if tree is None:
 
2409
            tree = workingtree.WorkingTree.open(root_path)
 
2410
        if not isinstance(path, basestring):
 
2411
            for p in path:
 
2412
                self.assertNotInWorkingTree(p,tree=tree)
 
2413
        else:
 
2414
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2415
 
 
2416
 
 
2417
class TestCaseWithTransport(TestCaseInTempDir):
 
2418
    """A test case that provides get_url and get_readonly_url facilities.
 
2419
 
 
2420
    These back onto two transport servers, one for readonly access and one for
 
2421
    read write access.
 
2422
 
 
2423
    If no explicit class is provided for readonly access, a
 
2424
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2425
    based read write transports.
 
2426
 
 
2427
    If an explicit class is provided for readonly access, that server and the
 
2428
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2429
    """
 
2430
 
 
2431
    def get_vfs_only_server(self):
 
2432
        """See TestCaseWithMemoryTransport.
 
2433
 
 
2434
        This is useful for some tests with specific servers that need
 
2435
        diagnostics.
 
2436
        """
 
2437
        if self.__vfs_server is None:
 
2438
            self.__vfs_server = self.vfs_transport_factory()
 
2439
            self.__vfs_server.setUp()
 
2440
            self.addCleanup(self.__vfs_server.tearDown)
 
2441
        return self.__vfs_server
 
2442
 
 
2443
    def make_branch_and_tree(self, relpath, format=None):
 
2444
        """Create a branch on the transport and a tree locally.
 
2445
 
 
2446
        If the transport is not a LocalTransport, the Tree can't be created on
 
2447
        the transport.  In that case if the vfs_transport_factory is
 
2448
        LocalURLServer the working tree is created in the local
 
2449
        directory backing the transport, and the returned tree's branch and
 
2450
        repository will also be accessed locally. Otherwise a lightweight
 
2451
        checkout is created and returned.
 
2452
 
 
2453
        :param format: The BzrDirFormat.
 
2454
        :returns: the WorkingTree.
 
2455
        """
 
2456
        # TODO: always use the local disk path for the working tree,
 
2457
        # this obviously requires a format that supports branch references
 
2458
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2459
        # RBC 20060208
 
2460
        b = self.make_branch(relpath, format=format)
 
2461
        try:
 
2462
            return b.bzrdir.create_workingtree()
 
2463
        except errors.NotLocalUrl:
 
2464
            # We can only make working trees locally at the moment.  If the
 
2465
            # transport can't support them, then we keep the non-disk-backed
 
2466
            # branch and create a local checkout.
 
2467
            if self.vfs_transport_factory is LocalURLServer:
 
2468
                # the branch is colocated on disk, we cannot create a checkout.
 
2469
                # hopefully callers will expect this.
 
2470
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2471
                wt = local_controldir.create_workingtree()
 
2472
                if wt.branch._format != b._format:
 
2473
                    wt._branch = b
 
2474
                    # Make sure that assigning to wt._branch fixes wt.branch,
 
2475
                    # in case the implementation details of workingtree objects
 
2476
                    # change.
 
2477
                    self.assertIs(b, wt.branch)
 
2478
                return wt
 
2479
            else:
 
2480
                return b.create_checkout(relpath, lightweight=True)
 
2481
 
 
2482
    def assertIsDirectory(self, relpath, transport):
 
2483
        """Assert that relpath within transport is a directory.
 
2484
 
 
2485
        This may not be possible on all transports; in that case it propagates
 
2486
        a TransportNotPossible.
 
2487
        """
 
2488
        try:
 
2489
            mode = transport.stat(relpath).st_mode
 
2490
        except errors.NoSuchFile:
 
2491
            self.fail("path %s is not a directory; no such file"
 
2492
                      % (relpath))
 
2493
        if not stat.S_ISDIR(mode):
 
2494
            self.fail("path %s is not a directory; has mode %#o"
 
2495
                      % (relpath, mode))
 
2496
 
 
2497
    def assertTreesEqual(self, left, right):
 
2498
        """Check that left and right have the same content and properties."""
 
2499
        # we use a tree delta to check for equality of the content, and we
 
2500
        # manually check for equality of other things such as the parents list.
 
2501
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2502
        differences = left.changes_from(right)
 
2503
        self.assertFalse(differences.has_changed(),
 
2504
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2505
 
 
2506
    def setUp(self):
 
2507
        super(TestCaseWithTransport, self).setUp()
 
2508
        self.__vfs_server = None
 
2509
 
 
2510
 
 
2511
class ChrootedTestCase(TestCaseWithTransport):
 
2512
    """A support class that provides readonly urls outside the local namespace.
 
2513
 
 
2514
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2515
    is then we are chrooted already, if it is not then an HttpServer is used
 
2516
    for readonly urls.
 
2517
 
 
2518
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2519
                       be used without needed to redo it when a different
 
2520
                       subclass is in use ?
 
2521
    """
 
2522
 
 
2523
    def setUp(self):
 
2524
        super(ChrootedTestCase, self).setUp()
 
2525
        if not self.vfs_transport_factory == MemoryServer:
 
2526
            self.transport_readonly_server = HttpServer
 
2527
 
 
2528
 
 
2529
def condition_id_re(pattern):
 
2530
    """Create a condition filter which performs a re check on a test's id.
 
2531
 
 
2532
    :param pattern: A regular expression string.
 
2533
    :return: A callable that returns True if the re matches.
 
2534
    """
 
2535
    filter_re = osutils.re_compile_checked(pattern, 0,
 
2536
        'test filter')
 
2537
    def condition(test):
 
2538
        test_id = test.id()
 
2539
        return filter_re.search(test_id)
 
2540
    return condition
 
2541
 
 
2542
 
 
2543
def condition_isinstance(klass_or_klass_list):
 
2544
    """Create a condition filter which returns isinstance(param, klass).
 
2545
 
 
2546
    :return: A callable which when called with one parameter obj return the
 
2547
        result of isinstance(obj, klass_or_klass_list).
 
2548
    """
 
2549
    def condition(obj):
 
2550
        return isinstance(obj, klass_or_klass_list)
 
2551
    return condition
 
2552
 
 
2553
 
 
2554
def condition_id_in_list(id_list):
 
2555
    """Create a condition filter which verify that test's id in a list.
 
2556
 
 
2557
    :param id_list: A TestIdList object.
 
2558
    :return: A callable that returns True if the test's id appears in the list.
 
2559
    """
 
2560
    def condition(test):
 
2561
        return id_list.includes(test.id())
 
2562
    return condition
 
2563
 
 
2564
 
 
2565
def condition_id_startswith(starts):
 
2566
    """Create a condition filter verifying that test's id starts with a string.
 
2567
 
 
2568
    :param starts: A list of string.
 
2569
    :return: A callable that returns True if the test's id starts with one of
 
2570
        the given strings.
 
2571
    """
 
2572
    def condition(test):
 
2573
        for start in starts:
 
2574
            if test.id().startswith(start):
 
2575
                return True
 
2576
        return False
 
2577
    return condition
 
2578
 
 
2579
 
 
2580
def exclude_tests_by_condition(suite, condition):
 
2581
    """Create a test suite which excludes some tests from suite.
 
2582
 
 
2583
    :param suite: The suite to get tests from.
 
2584
    :param condition: A callable whose result evaluates True when called with a
 
2585
        test case which should be excluded from the result.
 
2586
    :return: A suite which contains the tests found in suite that fail
 
2587
        condition.
 
2588
    """
 
2589
    result = []
 
2590
    for test in iter_suite_tests(suite):
 
2591
        if not condition(test):
 
2592
            result.append(test)
 
2593
    return TestUtil.TestSuite(result)
 
2594
 
 
2595
 
 
2596
def filter_suite_by_condition(suite, condition):
 
2597
    """Create a test suite by filtering another one.
 
2598
 
 
2599
    :param suite: The source suite.
 
2600
    :param condition: A callable whose result evaluates True when called with a
 
2601
        test case which should be included in the result.
 
2602
    :return: A suite which contains the tests found in suite that pass
 
2603
        condition.
 
2604
    """
 
2605
    result = []
 
2606
    for test in iter_suite_tests(suite):
 
2607
        if condition(test):
 
2608
            result.append(test)
 
2609
    return TestUtil.TestSuite(result)
 
2610
 
 
2611
 
 
2612
def filter_suite_by_re(suite, pattern):
 
2613
    """Create a test suite by filtering another one.
 
2614
 
 
2615
    :param suite:           the source suite
 
2616
    :param pattern:         pattern that names must match
 
2617
    :returns: the newly created suite
 
2618
    """
 
2619
    condition = condition_id_re(pattern)
 
2620
    result_suite = filter_suite_by_condition(suite, condition)
 
2621
    return result_suite
 
2622
 
 
2623
 
 
2624
def filter_suite_by_id_list(suite, test_id_list):
 
2625
    """Create a test suite by filtering another one.
 
2626
 
 
2627
    :param suite: The source suite.
 
2628
    :param test_id_list: A list of the test ids to keep as strings.
 
2629
    :returns: the newly created suite
 
2630
    """
 
2631
    condition = condition_id_in_list(test_id_list)
 
2632
    result_suite = filter_suite_by_condition(suite, condition)
 
2633
    return result_suite
 
2634
 
 
2635
 
 
2636
def filter_suite_by_id_startswith(suite, start):
 
2637
    """Create a test suite by filtering another one.
 
2638
 
 
2639
    :param suite: The source suite.
 
2640
    :param start: A list of string the test id must start with one of.
 
2641
    :returns: the newly created suite
 
2642
    """
 
2643
    condition = condition_id_startswith(start)
 
2644
    result_suite = filter_suite_by_condition(suite, condition)
 
2645
    return result_suite
 
2646
 
 
2647
 
 
2648
def exclude_tests_by_re(suite, pattern):
 
2649
    """Create a test suite which excludes some tests from suite.
 
2650
 
 
2651
    :param suite: The suite to get tests from.
 
2652
    :param pattern: A regular expression string. Test ids that match this
 
2653
        pattern will be excluded from the result.
 
2654
    :return: A TestSuite that contains all the tests from suite without the
 
2655
        tests that matched pattern. The order of tests is the same as it was in
 
2656
        suite.
 
2657
    """
 
2658
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
2659
 
 
2660
 
 
2661
def preserve_input(something):
 
2662
    """A helper for performing test suite transformation chains.
 
2663
 
 
2664
    :param something: Anything you want to preserve.
 
2665
    :return: Something.
 
2666
    """
 
2667
    return something
 
2668
 
 
2669
 
 
2670
def randomize_suite(suite):
 
2671
    """Return a new TestSuite with suite's tests in random order.
 
2672
 
 
2673
    The tests in the input suite are flattened into a single suite in order to
 
2674
    accomplish this. Any nested TestSuites are removed to provide global
 
2675
    randomness.
 
2676
    """
 
2677
    tests = list(iter_suite_tests(suite))
 
2678
    random.shuffle(tests)
 
2679
    return TestUtil.TestSuite(tests)
 
2680
 
 
2681
 
 
2682
def split_suite_by_condition(suite, condition):
 
2683
    """Split a test suite into two by a condition.
 
2684
 
 
2685
    :param suite: The suite to split.
 
2686
    :param condition: The condition to match on. Tests that match this
 
2687
        condition are returned in the first test suite, ones that do not match
 
2688
        are in the second suite.
 
2689
    :return: A tuple of two test suites, where the first contains tests from
 
2690
        suite matching the condition, and the second contains the remainder
 
2691
        from suite. The order within each output suite is the same as it was in
 
2692
        suite.
 
2693
    """
 
2694
    matched = []
 
2695
    did_not_match = []
 
2696
    for test in iter_suite_tests(suite):
 
2697
        if condition(test):
 
2698
            matched.append(test)
 
2699
        else:
 
2700
            did_not_match.append(test)
 
2701
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
2702
 
 
2703
 
 
2704
def split_suite_by_re(suite, pattern):
 
2705
    """Split a test suite into two by a regular expression.
 
2706
 
 
2707
    :param suite: The suite to split.
 
2708
    :param pattern: A regular expression string. Test ids that match this
 
2709
        pattern will be in the first test suite returned, and the others in the
 
2710
        second test suite returned.
 
2711
    :return: A tuple of two test suites, where the first contains tests from
 
2712
        suite matching pattern, and the second contains the remainder from
 
2713
        suite. The order within each output suite is the same as it was in
 
2714
        suite.
 
2715
    """
 
2716
    return split_suite_by_condition(suite, condition_id_re(pattern))
 
2717
 
 
2718
 
 
2719
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2720
              stop_on_failure=False,
 
2721
              transport=None, lsprof_timed=None, bench_history=None,
 
2722
              matching_tests_first=None,
 
2723
              list_only=False,
 
2724
              random_seed=None,
 
2725
              exclude_pattern=None,
 
2726
              strict=False,
 
2727
              runner_class=None,
 
2728
              suite_decorators=None,
 
2729
              stream=None):
 
2730
    """Run a test suite for bzr selftest.
 
2731
 
 
2732
    :param runner_class: The class of runner to use. Must support the
 
2733
        constructor arguments passed by run_suite which are more than standard
 
2734
        python uses.
 
2735
    :return: A boolean indicating success.
 
2736
    """
 
2737
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2738
    if verbose:
 
2739
        verbosity = 2
 
2740
    else:
 
2741
        verbosity = 1
 
2742
    if runner_class is None:
 
2743
        runner_class = TextTestRunner
 
2744
    if stream is None:
 
2745
        stream = sys.stdout
 
2746
    runner = runner_class(stream=stream,
 
2747
                            descriptions=0,
 
2748
                            verbosity=verbosity,
 
2749
                            bench_history=bench_history,
 
2750
                            list_only=list_only,
 
2751
                            strict=strict,
 
2752
                            )
 
2753
    runner.stop_on_failure=stop_on_failure
 
2754
    # built in decorator factories:
 
2755
    decorators = [
 
2756
        random_order(random_seed, runner),
 
2757
        exclude_tests(exclude_pattern),
 
2758
        ]
 
2759
    if matching_tests_first:
 
2760
        decorators.append(tests_first(pattern))
 
2761
    else:
 
2762
        decorators.append(filter_tests(pattern))
 
2763
    if suite_decorators:
 
2764
        decorators.extend(suite_decorators)
 
2765
    # tell the result object how many tests will be running:
 
2766
    decorators.append(CountingDecorator)
 
2767
    for decorator in decorators:
 
2768
        suite = decorator(suite)
 
2769
    result = runner.run(suite)
 
2770
    if list_only:
 
2771
        return True
 
2772
    result.done()
 
2773
    if strict:
 
2774
        return result.wasStrictlySuccessful()
 
2775
    else:
 
2776
        return result.wasSuccessful()
 
2777
 
 
2778
 
 
2779
# A registry where get() returns a suite decorator.
 
2780
parallel_registry = registry.Registry()
 
2781
 
 
2782
 
 
2783
def fork_decorator(suite):
 
2784
    concurrency = osutils.local_concurrency()
 
2785
    if concurrency == 1:
 
2786
        return suite
 
2787
    from testtools import ConcurrentTestSuite
 
2788
    return ConcurrentTestSuite(suite, fork_for_tests)
 
2789
parallel_registry.register('fork', fork_decorator)
 
2790
 
 
2791
 
 
2792
def subprocess_decorator(suite):
 
2793
    concurrency = osutils.local_concurrency()
 
2794
    if concurrency == 1:
 
2795
        return suite
 
2796
    from testtools import ConcurrentTestSuite
 
2797
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
 
2798
parallel_registry.register('subprocess', subprocess_decorator)
 
2799
 
 
2800
 
 
2801
def exclude_tests(exclude_pattern):
 
2802
    """Return a test suite decorator that excludes tests."""
 
2803
    if exclude_pattern is None:
 
2804
        return identity_decorator
 
2805
    def decorator(suite):
 
2806
        return ExcludeDecorator(suite, exclude_pattern)
 
2807
    return decorator
 
2808
 
 
2809
 
 
2810
def filter_tests(pattern):
 
2811
    if pattern == '.*':
 
2812
        return identity_decorator
 
2813
    def decorator(suite):
 
2814
        return FilterTestsDecorator(suite, pattern)
 
2815
    return decorator
 
2816
 
 
2817
 
 
2818
def random_order(random_seed, runner):
 
2819
    """Return a test suite decorator factory for randomising tests order.
 
2820
    
 
2821
    :param random_seed: now, a string which casts to a long, or a long.
 
2822
    :param runner: A test runner with a stream attribute to report on.
 
2823
    """
 
2824
    if random_seed is None:
 
2825
        return identity_decorator
 
2826
    def decorator(suite):
 
2827
        return RandomDecorator(suite, random_seed, runner.stream)
 
2828
    return decorator
 
2829
 
 
2830
 
 
2831
def tests_first(pattern):
 
2832
    if pattern == '.*':
 
2833
        return identity_decorator
 
2834
    def decorator(suite):
 
2835
        return TestFirstDecorator(suite, pattern)
 
2836
    return decorator
 
2837
 
 
2838
 
 
2839
def identity_decorator(suite):
 
2840
    """Return suite."""
 
2841
    return suite
 
2842
 
 
2843
 
 
2844
class TestDecorator(TestSuite):
 
2845
    """A decorator for TestCase/TestSuite objects.
 
2846
    
 
2847
    Usually, subclasses should override __iter__(used when flattening test
 
2848
    suites), which we do to filter, reorder, parallelise and so on, run() and
 
2849
    debug().
 
2850
    """
 
2851
 
 
2852
    def __init__(self, suite):
 
2853
        TestSuite.__init__(self)
 
2854
        self.addTest(suite)
 
2855
 
 
2856
    def countTestCases(self):
 
2857
        cases = 0
 
2858
        for test in self:
 
2859
            cases += test.countTestCases()
 
2860
        return cases
 
2861
 
 
2862
    def debug(self):
 
2863
        for test in self:
 
2864
            test.debug()
 
2865
 
 
2866
    def run(self, result):
 
2867
        # Use iteration on self, not self._tests, to allow subclasses to hook
 
2868
        # into __iter__.
 
2869
        for test in self:
 
2870
            if result.shouldStop:
 
2871
                break
 
2872
            test.run(result)
 
2873
        return result
 
2874
 
 
2875
 
 
2876
class CountingDecorator(TestDecorator):
 
2877
    """A decorator which calls result.progress(self.countTestCases)."""
 
2878
 
 
2879
    def run(self, result):
 
2880
        progress_method = getattr(result, 'progress', None)
 
2881
        if callable(progress_method):
 
2882
            progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
 
2883
        return super(CountingDecorator, self).run(result)
 
2884
 
 
2885
 
 
2886
class ExcludeDecorator(TestDecorator):
 
2887
    """A decorator which excludes test matching an exclude pattern."""
 
2888
 
 
2889
    def __init__(self, suite, exclude_pattern):
 
2890
        TestDecorator.__init__(self, suite)
 
2891
        self.exclude_pattern = exclude_pattern
 
2892
        self.excluded = False
 
2893
 
 
2894
    def __iter__(self):
 
2895
        if self.excluded:
 
2896
            return iter(self._tests)
 
2897
        self.excluded = True
 
2898
        suite = exclude_tests_by_re(self, self.exclude_pattern)
 
2899
        del self._tests[:]
 
2900
        self.addTests(suite)
 
2901
        return iter(self._tests)
 
2902
 
 
2903
 
 
2904
class FilterTestsDecorator(TestDecorator):
 
2905
    """A decorator which filters tests to those matching a pattern."""
 
2906
 
 
2907
    def __init__(self, suite, pattern):
 
2908
        TestDecorator.__init__(self, suite)
 
2909
        self.pattern = pattern
 
2910
        self.filtered = False
 
2911
 
 
2912
    def __iter__(self):
 
2913
        if self.filtered:
 
2914
            return iter(self._tests)
 
2915
        self.filtered = True
 
2916
        suite = filter_suite_by_re(self, self.pattern)
 
2917
        del self._tests[:]
 
2918
        self.addTests(suite)
 
2919
        return iter(self._tests)
 
2920
 
 
2921
 
 
2922
class RandomDecorator(TestDecorator):
 
2923
    """A decorator which randomises the order of its tests."""
 
2924
 
 
2925
    def __init__(self, suite, random_seed, stream):
 
2926
        TestDecorator.__init__(self, suite)
 
2927
        self.random_seed = random_seed
 
2928
        self.randomised = False
 
2929
        self.stream = stream
 
2930
 
 
2931
    def __iter__(self):
 
2932
        if self.randomised:
 
2933
            return iter(self._tests)
 
2934
        self.randomised = True
 
2935
        self.stream.writeln("Randomizing test order using seed %s\n" %
 
2936
            (self.actual_seed()))
 
2937
        # Initialise the random number generator.
 
2938
        random.seed(self.actual_seed())
 
2939
        suite = randomize_suite(self)
 
2940
        del self._tests[:]
 
2941
        self.addTests(suite)
 
2942
        return iter(self._tests)
 
2943
 
 
2944
    def actual_seed(self):
 
2945
        if self.random_seed == "now":
 
2946
            # We convert the seed to a long to make it reuseable across
 
2947
            # invocations (because the user can reenter it).
 
2948
            self.random_seed = long(time.time())
 
2949
        else:
 
2950
            # Convert the seed to a long if we can
 
2951
            try:
 
2952
                self.random_seed = long(self.random_seed)
 
2953
            except:
 
2954
                pass
 
2955
        return self.random_seed
 
2956
 
 
2957
 
 
2958
class TestFirstDecorator(TestDecorator):
 
2959
    """A decorator which moves named tests to the front."""
 
2960
 
 
2961
    def __init__(self, suite, pattern):
 
2962
        TestDecorator.__init__(self, suite)
 
2963
        self.pattern = pattern
 
2964
        self.filtered = False
 
2965
 
 
2966
    def __iter__(self):
 
2967
        if self.filtered:
 
2968
            return iter(self._tests)
 
2969
        self.filtered = True
 
2970
        suites = split_suite_by_re(self, self.pattern)
 
2971
        del self._tests[:]
 
2972
        self.addTests(suites)
 
2973
        return iter(self._tests)
 
2974
 
 
2975
 
 
2976
def partition_tests(suite, count):
 
2977
    """Partition suite into count lists of tests."""
 
2978
    result = []
 
2979
    tests = list(iter_suite_tests(suite))
 
2980
    tests_per_process = int(math.ceil(float(len(tests)) / count))
 
2981
    for block in range(count):
 
2982
        low_test = block * tests_per_process
 
2983
        high_test = low_test + tests_per_process
 
2984
        process_tests = tests[low_test:high_test]
 
2985
        result.append(process_tests)
 
2986
    return result
 
2987
 
 
2988
 
 
2989
def fork_for_tests(suite):
 
2990
    """Take suite and start up one runner per CPU by forking()
 
2991
 
 
2992
    :return: An iterable of TestCase-like objects which can each have
 
2993
        run(result) called on them to feed tests to result.
 
2994
    """
 
2995
    concurrency = osutils.local_concurrency()
 
2996
    result = []
 
2997
    from subunit import TestProtocolClient, ProtocolTestCase
 
2998
    try:
 
2999
        from subunit.test_results import AutoTimingTestResultDecorator
 
3000
    except ImportError:
 
3001
        AutoTimingTestResultDecorator = lambda x:x
 
3002
    class TestInOtherProcess(ProtocolTestCase):
 
3003
        # Should be in subunit, I think. RBC.
 
3004
        def __init__(self, stream, pid):
 
3005
            ProtocolTestCase.__init__(self, stream)
 
3006
            self.pid = pid
 
3007
 
 
3008
        def run(self, result):
 
3009
            try:
 
3010
                ProtocolTestCase.run(self, result)
 
3011
            finally:
 
3012
                os.waitpid(self.pid, os.WNOHANG)
 
3013
 
 
3014
    test_blocks = partition_tests(suite, concurrency)
 
3015
    for process_tests in test_blocks:
 
3016
        process_suite = TestSuite()
 
3017
        process_suite.addTests(process_tests)
 
3018
        c2pread, c2pwrite = os.pipe()
 
3019
        pid = os.fork()
 
3020
        if pid == 0:
 
3021
            try:
 
3022
                os.close(c2pread)
 
3023
                # Leave stderr and stdout open so we can see test noise
 
3024
                # Close stdin so that the child goes away if it decides to
 
3025
                # read from stdin (otherwise its a roulette to see what
 
3026
                # child actually gets keystrokes for pdb etc).
 
3027
                sys.stdin.close()
 
3028
                sys.stdin = None
 
3029
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3030
                subunit_result = AutoTimingTestResultDecorator(
 
3031
                    TestProtocolClient(stream))
 
3032
                process_suite.run(subunit_result)
 
3033
            finally:
 
3034
                os._exit(0)
 
3035
        else:
 
3036
            os.close(c2pwrite)
 
3037
            stream = os.fdopen(c2pread, 'rb', 1)
 
3038
            test = TestInOtherProcess(stream, pid)
 
3039
            result.append(test)
 
3040
    return result
 
3041
 
 
3042
 
 
3043
def reinvoke_for_tests(suite):
 
3044
    """Take suite and start up one runner per CPU using subprocess().
 
3045
 
 
3046
    :return: An iterable of TestCase-like objects which can each have
 
3047
        run(result) called on them to feed tests to result.
 
3048
    """
 
3049
    concurrency = osutils.local_concurrency()
 
3050
    result = []
 
3051
    from subunit import ProtocolTestCase
 
3052
    class TestInSubprocess(ProtocolTestCase):
 
3053
        def __init__(self, process, name):
 
3054
            ProtocolTestCase.__init__(self, process.stdout)
 
3055
            self.process = process
 
3056
            self.process.stdin.close()
 
3057
            self.name = name
 
3058
 
 
3059
        def run(self, result):
 
3060
            try:
 
3061
                ProtocolTestCase.run(self, result)
 
3062
            finally:
 
3063
                self.process.wait()
 
3064
                os.unlink(self.name)
 
3065
            # print "pid %d finished" % finished_process
 
3066
    test_blocks = partition_tests(suite, concurrency)
 
3067
    for process_tests in test_blocks:
 
3068
        # ugly; currently reimplement rather than reuses TestCase methods.
 
3069
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
3070
        if not os.path.isfile(bzr_path):
 
3071
            # We are probably installed. Assume sys.argv is the right file
 
3072
            bzr_path = sys.argv[0]
 
3073
        fd, test_list_file_name = tempfile.mkstemp()
 
3074
        test_list_file = os.fdopen(fd, 'wb', 1)
 
3075
        for test in process_tests:
 
3076
            test_list_file.write(test.id() + '\n')
 
3077
        test_list_file.close()
 
3078
        try:
 
3079
            argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
 
3080
                '--subunit']
 
3081
            if '--no-plugins' in sys.argv:
 
3082
                argv.append('--no-plugins')
 
3083
            # stderr=STDOUT would be ideal, but until we prevent noise on
 
3084
            # stderr it can interrupt the subunit protocol.
 
3085
            process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
 
3086
                bufsize=1)
 
3087
            test = TestInSubprocess(process, test_list_file_name)
 
3088
            result.append(test)
 
3089
        except:
 
3090
            os.unlink(test_list_file_name)
 
3091
            raise
 
3092
    return result
 
3093
 
 
3094
 
 
3095
class BZRTransformingResult(unittest.TestResult):
 
3096
 
 
3097
    def __init__(self, target):
 
3098
        unittest.TestResult.__init__(self)
 
3099
        self.result = target
 
3100
 
 
3101
    def startTest(self, test):
 
3102
        self.result.startTest(test)
 
3103
 
 
3104
    def stopTest(self, test):
 
3105
        self.result.stopTest(test)
 
3106
 
 
3107
    def addError(self, test, err):
 
3108
        feature = self._error_looks_like('UnavailableFeature: ', err)
 
3109
        if feature is not None:
 
3110
            self.result.addNotSupported(test, feature)
 
3111
        else:
 
3112
            self.result.addError(test, err)
 
3113
 
 
3114
    def addFailure(self, test, err):
 
3115
        known = self._error_looks_like('KnownFailure: ', err)
 
3116
        if known is not None:
 
3117
            self.result._addKnownFailure(test, [KnownFailure,
 
3118
                                                KnownFailure(known), None])
 
3119
        else:
 
3120
            self.result.addFailure(test, err)
 
3121
 
 
3122
    def addSkip(self, test, reason):
 
3123
        self.result.addSkip(test, reason)
 
3124
 
 
3125
    def addSuccess(self, test):
 
3126
        self.result.addSuccess(test)
 
3127
 
 
3128
    def _error_looks_like(self, prefix, err):
 
3129
        """Deserialize exception and returns the stringify value."""
 
3130
        import subunit
 
3131
        value = None
 
3132
        typ, exc, _ = err
 
3133
        if isinstance(exc, subunit.RemoteException):
 
3134
            # stringify the exception gives access to the remote traceback
 
3135
            # We search the last line for 'prefix'
 
3136
            lines = str(exc).split('\n')
 
3137
            while lines and not lines[-1]:
 
3138
                lines.pop(-1)
 
3139
            if lines:
 
3140
                if lines[-1].startswith(prefix):
 
3141
                    value = lines[-1][len(prefix):]
 
3142
        return value
 
3143
 
 
3144
 
 
3145
# Controlled by "bzr selftest -E=..." option
 
3146
selftest_debug_flags = set()
 
3147
 
 
3148
 
 
3149
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
3150
             transport=None,
 
3151
             test_suite_factory=None,
 
3152
             lsprof_timed=None,
 
3153
             bench_history=None,
 
3154
             matching_tests_first=None,
 
3155
             list_only=False,
 
3156
             random_seed=None,
 
3157
             exclude_pattern=None,
 
3158
             strict=False,
 
3159
             load_list=None,
 
3160
             debug_flags=None,
 
3161
             starting_with=None,
 
3162
             runner_class=None,
 
3163
             suite_decorators=None,
 
3164
             ):
 
3165
    """Run the whole test suite under the enhanced runner"""
 
3166
    # XXX: Very ugly way to do this...
 
3167
    # Disable warning about old formats because we don't want it to disturb
 
3168
    # any blackbox tests.
 
3169
    from bzrlib import repository
 
3170
    repository._deprecation_warning_done = True
 
3171
 
 
3172
    global default_transport
 
3173
    if transport is None:
 
3174
        transport = default_transport
 
3175
    old_transport = default_transport
 
3176
    default_transport = transport
 
3177
    global selftest_debug_flags
 
3178
    old_debug_flags = selftest_debug_flags
 
3179
    if debug_flags is not None:
 
3180
        selftest_debug_flags = set(debug_flags)
 
3181
    try:
 
3182
        if load_list is None:
 
3183
            keep_only = None
 
3184
        else:
 
3185
            keep_only = load_test_id_list(load_list)
 
3186
        if test_suite_factory is None:
 
3187
            suite = test_suite(keep_only, starting_with)
 
3188
        else:
 
3189
            suite = test_suite_factory()
 
3190
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
3191
                     stop_on_failure=stop_on_failure,
 
3192
                     transport=transport,
 
3193
                     lsprof_timed=lsprof_timed,
 
3194
                     bench_history=bench_history,
 
3195
                     matching_tests_first=matching_tests_first,
 
3196
                     list_only=list_only,
 
3197
                     random_seed=random_seed,
 
3198
                     exclude_pattern=exclude_pattern,
 
3199
                     strict=strict,
 
3200
                     runner_class=runner_class,
 
3201
                     suite_decorators=suite_decorators,
 
3202
                     )
 
3203
    finally:
 
3204
        default_transport = old_transport
 
3205
        selftest_debug_flags = old_debug_flags
 
3206
 
 
3207
 
 
3208
def load_test_id_list(file_name):
 
3209
    """Load a test id list from a text file.
 
3210
 
 
3211
    The format is one test id by line.  No special care is taken to impose
 
3212
    strict rules, these test ids are used to filter the test suite so a test id
 
3213
    that do not match an existing test will do no harm. This allows user to add
 
3214
    comments, leave blank lines, etc.
 
3215
    """
 
3216
    test_list = []
 
3217
    try:
 
3218
        ftest = open(file_name, 'rt')
 
3219
    except IOError, e:
 
3220
        if e.errno != errno.ENOENT:
 
3221
            raise
 
3222
        else:
 
3223
            raise errors.NoSuchFile(file_name)
 
3224
 
 
3225
    for test_name in ftest.readlines():
 
3226
        test_list.append(test_name.strip())
 
3227
    ftest.close()
 
3228
    return test_list
 
3229
 
 
3230
 
 
3231
def suite_matches_id_list(test_suite, id_list):
 
3232
    """Warns about tests not appearing or appearing more than once.
 
3233
 
 
3234
    :param test_suite: A TestSuite object.
 
3235
    :param test_id_list: The list of test ids that should be found in
 
3236
         test_suite.
 
3237
 
 
3238
    :return: (absents, duplicates) absents is a list containing the test found
 
3239
        in id_list but not in test_suite, duplicates is a list containing the
 
3240
        test found multiple times in test_suite.
 
3241
 
 
3242
    When using a prefined test id list, it may occurs that some tests do not
 
3243
    exist anymore or that some tests use the same id. This function warns the
 
3244
    tester about potential problems in his workflow (test lists are volatile)
 
3245
    or in the test suite itself (using the same id for several tests does not
 
3246
    help to localize defects).
 
3247
    """
 
3248
    # Build a dict counting id occurrences
 
3249
    tests = dict()
 
3250
    for test in iter_suite_tests(test_suite):
 
3251
        id = test.id()
 
3252
        tests[id] = tests.get(id, 0) + 1
 
3253
 
 
3254
    not_found = []
 
3255
    duplicates = []
 
3256
    for id in id_list:
 
3257
        occurs = tests.get(id, 0)
 
3258
        if not occurs:
 
3259
            not_found.append(id)
 
3260
        elif occurs > 1:
 
3261
            duplicates.append(id)
 
3262
 
 
3263
    return not_found, duplicates
 
3264
 
 
3265
 
 
3266
class TestIdList(object):
 
3267
    """Test id list to filter a test suite.
 
3268
 
 
3269
    Relying on the assumption that test ids are built as:
 
3270
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
3271
    notation, this class offers methods to :
 
3272
    - avoid building a test suite for modules not refered to in the test list,
 
3273
    - keep only the tests listed from the module test suite.
 
3274
    """
 
3275
 
 
3276
    def __init__(self, test_id_list):
 
3277
        # When a test suite needs to be filtered against us we compare test ids
 
3278
        # for equality, so a simple dict offers a quick and simple solution.
 
3279
        self.tests = dict().fromkeys(test_id_list, True)
 
3280
 
 
3281
        # While unittest.TestCase have ids like:
 
3282
        # <module>.<class>.<method>[(<param+)],
 
3283
        # doctest.DocTestCase can have ids like:
 
3284
        # <module>
 
3285
        # <module>.<class>
 
3286
        # <module>.<function>
 
3287
        # <module>.<class>.<method>
 
3288
 
 
3289
        # Since we can't predict a test class from its name only, we settle on
 
3290
        # a simple constraint: a test id always begins with its module name.
 
3291
 
 
3292
        modules = {}
 
3293
        for test_id in test_id_list:
 
3294
            parts = test_id.split('.')
 
3295
            mod_name = parts.pop(0)
 
3296
            modules[mod_name] = True
 
3297
            for part in parts:
 
3298
                mod_name += '.' + part
 
3299
                modules[mod_name] = True
 
3300
        self.modules = modules
 
3301
 
 
3302
    def refers_to(self, module_name):
 
3303
        """Is there tests for the module or one of its sub modules."""
 
3304
        return self.modules.has_key(module_name)
 
3305
 
 
3306
    def includes(self, test_id):
 
3307
        return self.tests.has_key(test_id)
 
3308
 
 
3309
 
 
3310
class TestPrefixAliasRegistry(registry.Registry):
 
3311
    """A registry for test prefix aliases.
 
3312
 
 
3313
    This helps implement shorcuts for the --starting-with selftest
 
3314
    option. Overriding existing prefixes is not allowed but not fatal (a
 
3315
    warning will be emitted).
 
3316
    """
 
3317
 
 
3318
    def register(self, key, obj, help=None, info=None,
 
3319
                 override_existing=False):
 
3320
        """See Registry.register.
 
3321
 
 
3322
        Trying to override an existing alias causes a warning to be emitted,
 
3323
        not a fatal execption.
 
3324
        """
 
3325
        try:
 
3326
            super(TestPrefixAliasRegistry, self).register(
 
3327
                key, obj, help=help, info=info, override_existing=False)
 
3328
        except KeyError:
 
3329
            actual = self.get(key)
 
3330
            note('Test prefix alias %s is already used for %s, ignoring %s'
 
3331
                 % (key, actual, obj))
 
3332
 
 
3333
    def resolve_alias(self, id_start):
 
3334
        """Replace the alias by the prefix in the given string.
 
3335
 
 
3336
        Using an unknown prefix is an error to help catching typos.
 
3337
        """
 
3338
        parts = id_start.split('.')
 
3339
        try:
 
3340
            parts[0] = self.get(parts[0])
 
3341
        except KeyError:
 
3342
            raise errors.BzrCommandError(
 
3343
                '%s is not a known test prefix alias' % parts[0])
 
3344
        return '.'.join(parts)
 
3345
 
 
3346
 
 
3347
test_prefix_alias_registry = TestPrefixAliasRegistry()
 
3348
"""Registry of test prefix aliases."""
 
3349
 
 
3350
 
 
3351
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
 
3352
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
 
3353
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
 
3354
 
 
3355
# Obvious higest levels prefixes, feel free to add your own via a plugin
 
3356
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
 
3357
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
 
3358
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
 
3359
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
 
3360
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
 
3361
 
 
3362
 
 
3363
def test_suite(keep_only=None, starting_with=None):
 
3364
    """Build and return TestSuite for the whole of bzrlib.
 
3365
 
 
3366
    :param keep_only: A list of test ids limiting the suite returned.
 
3367
 
 
3368
    :param starting_with: An id limiting the suite returned to the tests
 
3369
         starting with it.
 
3370
 
 
3371
    This function can be replaced if you need to change the default test
 
3372
    suite on a global basis, but it is not encouraged.
 
3373
    """
 
3374
    testmod_names = [
 
3375
                   'bzrlib.doc',
 
3376
                   'bzrlib.tests.blackbox',
 
3377
                   'bzrlib.tests.commands',
 
3378
                   'bzrlib.tests.per_branch',
 
3379
                   'bzrlib.tests.per_bzrdir',
 
3380
                   'bzrlib.tests.per_interrepository',
 
3381
                   'bzrlib.tests.per_intertree',
 
3382
                   'bzrlib.tests.per_inventory',
 
3383
                   'bzrlib.tests.per_interbranch',
 
3384
                   'bzrlib.tests.per_lock',
 
3385
                   'bzrlib.tests.per_transport',
 
3386
                   'bzrlib.tests.per_tree',
 
3387
                   'bzrlib.tests.per_repository',
 
3388
                   'bzrlib.tests.per_repository_chk',
 
3389
                   'bzrlib.tests.per_repository_reference',
 
3390
                   'bzrlib.tests.per_workingtree',
 
3391
                   'bzrlib.tests.test__annotator',
 
3392
                   'bzrlib.tests.test__chk_map',
 
3393
                   'bzrlib.tests.test__dirstate_helpers',
 
3394
                   'bzrlib.tests.test__groupcompress',
 
3395
                   'bzrlib.tests.test__known_graph',
 
3396
                   'bzrlib.tests.test__rio',
 
3397
                   'bzrlib.tests.test__walkdirs_win32',
 
3398
                   'bzrlib.tests.test_ancestry',
 
3399
                   'bzrlib.tests.test_annotate',
 
3400
                   'bzrlib.tests.test_api',
 
3401
                   'bzrlib.tests.test_atomicfile',
 
3402
                   'bzrlib.tests.test_bad_files',
 
3403
                   'bzrlib.tests.test_bencode',
 
3404
                   'bzrlib.tests.test_bisect_multi',
 
3405
                   'bzrlib.tests.test_branch',
 
3406
                   'bzrlib.tests.test_branchbuilder',
 
3407
                   'bzrlib.tests.test_btree_index',
 
3408
                   'bzrlib.tests.test_bugtracker',
 
3409
                   'bzrlib.tests.test_bundle',
 
3410
                   'bzrlib.tests.test_bzrdir',
 
3411
                   'bzrlib.tests.test__chunks_to_lines',
 
3412
                   'bzrlib.tests.test_cache_utf8',
 
3413
                   'bzrlib.tests.test_chk_map',
 
3414
                   'bzrlib.tests.test_chk_serializer',
 
3415
                   'bzrlib.tests.test_chunk_writer',
 
3416
                   'bzrlib.tests.test_clean_tree',
 
3417
                   'bzrlib.tests.test_commands',
 
3418
                   'bzrlib.tests.test_commit',
 
3419
                   'bzrlib.tests.test_commit_merge',
 
3420
                   'bzrlib.tests.test_config',
 
3421
                   'bzrlib.tests.test_conflicts',
 
3422
                   'bzrlib.tests.test_counted_lock',
 
3423
                   'bzrlib.tests.test_decorators',
 
3424
                   'bzrlib.tests.test_delta',
 
3425
                   'bzrlib.tests.test_debug',
 
3426
                   'bzrlib.tests.test_deprecated_graph',
 
3427
                   'bzrlib.tests.test_diff',
 
3428
                   'bzrlib.tests.test_directory_service',
 
3429
                   'bzrlib.tests.test_dirstate',
 
3430
                   'bzrlib.tests.test_email_message',
 
3431
                   'bzrlib.tests.test_eol_filters',
 
3432
                   'bzrlib.tests.test_errors',
 
3433
                   'bzrlib.tests.test_export',
 
3434
                   'bzrlib.tests.test_extract',
 
3435
                   'bzrlib.tests.test_fetch',
 
3436
                   'bzrlib.tests.test_fifo_cache',
 
3437
                   'bzrlib.tests.test_filters',
 
3438
                   'bzrlib.tests.test_ftp_transport',
 
3439
                   'bzrlib.tests.test_foreign',
 
3440
                   'bzrlib.tests.test_generate_docs',
 
3441
                   'bzrlib.tests.test_generate_ids',
 
3442
                   'bzrlib.tests.test_globbing',
 
3443
                   'bzrlib.tests.test_gpg',
 
3444
                   'bzrlib.tests.test_graph',
 
3445
                   'bzrlib.tests.test_groupcompress',
 
3446
                   'bzrlib.tests.test_hashcache',
 
3447
                   'bzrlib.tests.test_help',
 
3448
                   'bzrlib.tests.test_hooks',
 
3449
                   'bzrlib.tests.test_http',
 
3450
                   'bzrlib.tests.test_http_response',
 
3451
                   'bzrlib.tests.test_https_ca_bundle',
 
3452
                   'bzrlib.tests.test_identitymap',
 
3453
                   'bzrlib.tests.test_ignores',
 
3454
                   'bzrlib.tests.test_index',
 
3455
                   'bzrlib.tests.test_info',
 
3456
                   'bzrlib.tests.test_inv',
 
3457
                   'bzrlib.tests.test_inventory_delta',
 
3458
                   'bzrlib.tests.test_knit',
 
3459
                   'bzrlib.tests.test_lazy_import',
 
3460
                   'bzrlib.tests.test_lazy_regex',
 
3461
                   'bzrlib.tests.test_lockable_files',
 
3462
                   'bzrlib.tests.test_lockdir',
 
3463
                   'bzrlib.tests.test_log',
 
3464
                   'bzrlib.tests.test_lru_cache',
 
3465
                   'bzrlib.tests.test_lsprof',
 
3466
                   'bzrlib.tests.test_mail_client',
 
3467
                   'bzrlib.tests.test_memorytree',
 
3468
                   'bzrlib.tests.test_merge',
 
3469
                   'bzrlib.tests.test_merge3',
 
3470
                   'bzrlib.tests.test_merge_core',
 
3471
                   'bzrlib.tests.test_merge_directive',
 
3472
                   'bzrlib.tests.test_missing',
 
3473
                   'bzrlib.tests.test_msgeditor',
 
3474
                   'bzrlib.tests.test_multiparent',
 
3475
                   'bzrlib.tests.test_mutabletree',
 
3476
                   'bzrlib.tests.test_nonascii',
 
3477
                   'bzrlib.tests.test_options',
 
3478
                   'bzrlib.tests.test_osutils',
 
3479
                   'bzrlib.tests.test_osutils_encodings',
 
3480
                   'bzrlib.tests.test_pack',
 
3481
                   'bzrlib.tests.test_pack_repository',
 
3482
                   'bzrlib.tests.test_patch',
 
3483
                   'bzrlib.tests.test_patches',
 
3484
                   'bzrlib.tests.test_permissions',
 
3485
                   'bzrlib.tests.test_plugins',
 
3486
                   'bzrlib.tests.test_progress',
 
3487
                   'bzrlib.tests.test_read_bundle',
 
3488
                   'bzrlib.tests.test_reconcile',
 
3489
                   'bzrlib.tests.test_reconfigure',
 
3490
                   'bzrlib.tests.test_registry',
 
3491
                   'bzrlib.tests.test_remote',
 
3492
                   'bzrlib.tests.test_rename_map',
 
3493
                   'bzrlib.tests.test_repository',
 
3494
                   'bzrlib.tests.test_revert',
 
3495
                   'bzrlib.tests.test_revision',
 
3496
                   'bzrlib.tests.test_revisionspec',
 
3497
                   'bzrlib.tests.test_revisiontree',
 
3498
                   'bzrlib.tests.test_rio',
 
3499
                   'bzrlib.tests.test_rules',
 
3500
                   'bzrlib.tests.test_sampler',
 
3501
                   'bzrlib.tests.test_selftest',
 
3502
                   'bzrlib.tests.test_serializer',
 
3503
                   'bzrlib.tests.test_setup',
 
3504
                   'bzrlib.tests.test_sftp_transport',
 
3505
                   'bzrlib.tests.test_shelf',
 
3506
                   'bzrlib.tests.test_shelf_ui',
 
3507
                   'bzrlib.tests.test_smart',
 
3508
                   'bzrlib.tests.test_smart_add',
 
3509
                   'bzrlib.tests.test_smart_request',
 
3510
                   'bzrlib.tests.test_smart_transport',
 
3511
                   'bzrlib.tests.test_smtp_connection',
 
3512
                   'bzrlib.tests.test_source',
 
3513
                   'bzrlib.tests.test_ssh_transport',
 
3514
                   'bzrlib.tests.test_status',
 
3515
                   'bzrlib.tests.test_store',
 
3516
                   'bzrlib.tests.test_strace',
 
3517
                   'bzrlib.tests.test_subsume',
 
3518
                   'bzrlib.tests.test_switch',
 
3519
                   'bzrlib.tests.test_symbol_versioning',
 
3520
                   'bzrlib.tests.test_tag',
 
3521
                   'bzrlib.tests.test_testament',
 
3522
                   'bzrlib.tests.test_textfile',
 
3523
                   'bzrlib.tests.test_textmerge',
 
3524
                   'bzrlib.tests.test_timestamp',
 
3525
                   'bzrlib.tests.test_trace',
 
3526
                   'bzrlib.tests.test_transactions',
 
3527
                   'bzrlib.tests.test_transform',
 
3528
                   'bzrlib.tests.test_transport',
 
3529
                   'bzrlib.tests.test_transport_log',
 
3530
                   'bzrlib.tests.test_tree',
 
3531
                   'bzrlib.tests.test_treebuilder',
 
3532
                   'bzrlib.tests.test_tsort',
 
3533
                   'bzrlib.tests.test_tuned_gzip',
 
3534
                   'bzrlib.tests.test_ui',
 
3535
                   'bzrlib.tests.test_uncommit',
 
3536
                   'bzrlib.tests.test_upgrade',
 
3537
                   'bzrlib.tests.test_upgrade_stacked',
 
3538
                   'bzrlib.tests.test_urlutils',
 
3539
                   'bzrlib.tests.test_version',
 
3540
                   'bzrlib.tests.test_version_info',
 
3541
                   'bzrlib.tests.test_versionedfile',
 
3542
                   'bzrlib.tests.test_weave',
 
3543
                   'bzrlib.tests.test_whitebox',
 
3544
                   'bzrlib.tests.test_win32utils',
 
3545
                   'bzrlib.tests.test_workingtree',
 
3546
                   'bzrlib.tests.test_workingtree_4',
 
3547
                   'bzrlib.tests.test_wsgi',
 
3548
                   'bzrlib.tests.test_xml',
59
3549
                   ]
60
3550
 
61
 
    # XXX: should also test bzrlib.merge_core, but they seem to be out
62
 
    # of date with the code.
63
 
 
64
 
    for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
65
 
              bzrlib.osutils, bzrlib.commands, bzrlib.merge3):
66
 
        if m not in MODULES_TO_DOCTEST:
67
 
            MODULES_TO_DOCTEST.append(m)
68
 
 
69
 
    
70
 
    TestBase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
71
 
    print '%-30s %s' % ('bzr binary', TestBase.BZRPATH)
72
 
 
73
 
    print
74
 
 
75
 
    suite = TestSuite()
76
 
 
77
 
    suite.addTest(TestLoader().loadTestsFromNames(testmod_names))
78
 
 
79
 
    for m in MODULES_TO_TEST:
80
 
         suite.addTest(TestLoader().loadTestsFromModule(m))
81
 
 
82
 
    for m in (MODULES_TO_DOCTEST):
83
 
        suite.addTest(DocTestSuite(m))
84
 
 
85
 
    for p in bzrlib.plugin.all_plugins:
86
 
        if hasattr(p, 'test_suite'):
87
 
            suite.addTest(p.test_suite())
88
 
 
89
 
    import bzrlib.merge_core
90
 
    suite.addTest(unittest.makeSuite(bzrlib.merge_core.MergeTest, 'test_'))
91
 
 
92
 
    return run_suite(suite, 'testbzr', verbose=verbose)
93
 
 
94
 
 
95
 
 
 
3551
    loader = TestUtil.TestLoader()
 
3552
 
 
3553
    if keep_only is not None:
 
3554
        id_filter = TestIdList(keep_only)
 
3555
    if starting_with:
 
3556
        starting_with = [test_prefix_alias_registry.resolve_alias(start)
 
3557
                         for start in starting_with]
 
3558
        # We take precedence over keep_only because *at loading time* using
 
3559
        # both options means we will load less tests for the same final result.
 
3560
        def interesting_module(name):
 
3561
            for start in starting_with:
 
3562
                if (
 
3563
                    # Either the module name starts with the specified string
 
3564
                    name.startswith(start)
 
3565
                    # or it may contain tests starting with the specified string
 
3566
                    or start.startswith(name)
 
3567
                    ):
 
3568
                    return True
 
3569
            return False
 
3570
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
 
3571
 
 
3572
    elif keep_only is not None:
 
3573
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
3574
        def interesting_module(name):
 
3575
            return id_filter.refers_to(name)
 
3576
 
 
3577
    else:
 
3578
        loader = TestUtil.TestLoader()
 
3579
        def interesting_module(name):
 
3580
            # No filtering, all modules are interesting
 
3581
            return True
 
3582
 
 
3583
    suite = loader.suiteClass()
 
3584
 
 
3585
    # modules building their suite with loadTestsFromModuleNames
 
3586
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
3587
 
 
3588
    modules_to_doctest = [
 
3589
        'bzrlib',
 
3590
        'bzrlib.branchbuilder',
 
3591
        'bzrlib.export',
 
3592
        'bzrlib.inventory',
 
3593
        'bzrlib.iterablefile',
 
3594
        'bzrlib.lockdir',
 
3595
        'bzrlib.merge3',
 
3596
        'bzrlib.option',
 
3597
        'bzrlib.symbol_versioning',
 
3598
        'bzrlib.tests',
 
3599
        'bzrlib.timestamp',
 
3600
        'bzrlib.version_info_formats.format_custom',
 
3601
        ]
 
3602
 
 
3603
    for mod in modules_to_doctest:
 
3604
        if not interesting_module(mod):
 
3605
            # No tests to keep here, move along
 
3606
            continue
 
3607
        try:
 
3608
            # note that this really does mean "report only" -- doctest
 
3609
            # still runs the rest of the examples
 
3610
            doc_suite = doctest.DocTestSuite(mod,
 
3611
                optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
 
3612
        except ValueError, e:
 
3613
            print '**failed to get doctest for: %s\n%s' % (mod, e)
 
3614
            raise
 
3615
        if len(doc_suite._tests) == 0:
 
3616
            raise errors.BzrError("no doctests found in %s" % (mod,))
 
3617
        suite.addTest(doc_suite)
 
3618
 
 
3619
    default_encoding = sys.getdefaultencoding()
 
3620
    for name, plugin in bzrlib.plugin.plugins().items():
 
3621
        if not interesting_module(plugin.module.__name__):
 
3622
            continue
 
3623
        plugin_suite = plugin.test_suite()
 
3624
        # We used to catch ImportError here and turn it into just a warning,
 
3625
        # but really if you don't have --no-plugins this should be a failure.
 
3626
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
3627
        if plugin_suite is None:
 
3628
            plugin_suite = plugin.load_plugin_tests(loader)
 
3629
        if plugin_suite is not None:
 
3630
            suite.addTest(plugin_suite)
 
3631
        if default_encoding != sys.getdefaultencoding():
 
3632
            bzrlib.trace.warning(
 
3633
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
3634
                sys.getdefaultencoding())
 
3635
            reload(sys)
 
3636
            sys.setdefaultencoding(default_encoding)
 
3637
 
 
3638
    if starting_with:
 
3639
        suite = filter_suite_by_id_startswith(suite, starting_with)
 
3640
 
 
3641
    if keep_only is not None:
 
3642
        # Now that the referred modules have loaded their tests, keep only the
 
3643
        # requested ones.
 
3644
        suite = filter_suite_by_id_list(suite, id_filter)
 
3645
        # Do some sanity checks on the id_list filtering
 
3646
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
3647
        if starting_with:
 
3648
            # The tester has used both keep_only and starting_with, so he is
 
3649
            # already aware that some tests are excluded from the list, there
 
3650
            # is no need to tell him which.
 
3651
            pass
 
3652
        else:
 
3653
            # Some tests mentioned in the list are not in the test suite. The
 
3654
            # list may be out of date, report to the tester.
 
3655
            for id in not_found:
 
3656
                bzrlib.trace.warning('"%s" not found in the test suite', id)
 
3657
        for id in duplicates:
 
3658
            bzrlib.trace.warning('"%s" is used as an id by several tests', id)
 
3659
 
 
3660
    return suite
 
3661
 
 
3662
 
 
3663
def multiply_scenarios(scenarios_left, scenarios_right):
 
3664
    """Multiply two sets of scenarios.
 
3665
 
 
3666
    :returns: the cartesian product of the two sets of scenarios, that is
 
3667
        a scenario for every possible combination of a left scenario and a
 
3668
        right scenario.
 
3669
    """
 
3670
    return [
 
3671
        ('%s,%s' % (left_name, right_name),
 
3672
         dict(left_dict.items() + right_dict.items()))
 
3673
        for left_name, left_dict in scenarios_left
 
3674
        for right_name, right_dict in scenarios_right]
 
3675
 
 
3676
 
 
3677
def multiply_tests(tests, scenarios, result):
 
3678
    """Multiply tests_list by scenarios into result.
 
3679
 
 
3680
    This is the core workhorse for test parameterisation.
 
3681
 
 
3682
    Typically the load_tests() method for a per-implementation test suite will
 
3683
    call multiply_tests and return the result.
 
3684
 
 
3685
    :param tests: The tests to parameterise.
 
3686
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
 
3687
        scenario_param_dict).
 
3688
    :param result: A TestSuite to add created tests to.
 
3689
 
 
3690
    This returns the passed in result TestSuite with the cross product of all
 
3691
    the tests repeated once for each scenario.  Each test is adapted by adding
 
3692
    the scenario name at the end of its id(), and updating the test object's
 
3693
    __dict__ with the scenario_param_dict.
 
3694
 
 
3695
    >>> import bzrlib.tests.test_sampler
 
3696
    >>> r = multiply_tests(
 
3697
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
 
3698
    ...     [('one', dict(param=1)),
 
3699
    ...      ('two', dict(param=2))],
 
3700
    ...     TestSuite())
 
3701
    >>> tests = list(iter_suite_tests(r))
 
3702
    >>> len(tests)
 
3703
    2
 
3704
    >>> tests[0].id()
 
3705
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
 
3706
    >>> tests[0].param
 
3707
    1
 
3708
    >>> tests[1].param
 
3709
    2
 
3710
    """
 
3711
    for test in iter_suite_tests(tests):
 
3712
        apply_scenarios(test, scenarios, result)
 
3713
    return result
 
3714
 
 
3715
 
 
3716
def apply_scenarios(test, scenarios, result):
 
3717
    """Apply the scenarios in scenarios to test and add to result.
 
3718
 
 
3719
    :param test: The test to apply scenarios to.
 
3720
    :param scenarios: An iterable of scenarios to apply to test.
 
3721
    :return: result
 
3722
    :seealso: apply_scenario
 
3723
    """
 
3724
    for scenario in scenarios:
 
3725
        result.addTest(apply_scenario(test, scenario))
 
3726
    return result
 
3727
 
 
3728
 
 
3729
def apply_scenario(test, scenario):
 
3730
    """Copy test and apply scenario to it.
 
3731
 
 
3732
    :param test: A test to adapt.
 
3733
    :param scenario: A tuple describing the scenarion.
 
3734
        The first element of the tuple is the new test id.
 
3735
        The second element is a dict containing attributes to set on the
 
3736
        test.
 
3737
    :return: The adapted test.
 
3738
    """
 
3739
    new_id = "%s(%s)" % (test.id(), scenario[0])
 
3740
    new_test = clone_test(test, new_id)
 
3741
    for name, value in scenario[1].items():
 
3742
        setattr(new_test, name, value)
 
3743
    return new_test
 
3744
 
 
3745
 
 
3746
def clone_test(test, new_id):
 
3747
    """Clone a test giving it a new id.
 
3748
 
 
3749
    :param test: The test to clone.
 
3750
    :param new_id: The id to assign to it.
 
3751
    :return: The new test.
 
3752
    """
 
3753
    from copy import deepcopy
 
3754
    new_test = deepcopy(test)
 
3755
    new_test.id = lambda: new_id
 
3756
    return new_test
 
3757
 
 
3758
 
 
3759
def _rmtree_temp_dir(dirname):
 
3760
    # If LANG=C we probably have created some bogus paths
 
3761
    # which rmtree(unicode) will fail to delete
 
3762
    # so make sure we are using rmtree(str) to delete everything
 
3763
    # except on win32, where rmtree(str) will fail
 
3764
    # since it doesn't have the property of byte-stream paths
 
3765
    # (they are either ascii or mbcs)
 
3766
    if sys.platform == 'win32':
 
3767
        # make sure we are using the unicode win32 api
 
3768
        dirname = unicode(dirname)
 
3769
    else:
 
3770
        dirname = dirname.encode(sys.getfilesystemencoding())
 
3771
    try:
 
3772
        osutils.rmtree(dirname)
 
3773
    except OSError, e:
 
3774
        if sys.platform == 'win32' and e.errno == errno.EACCES:
 
3775
            sys.stderr.write('Permission denied: '
 
3776
                             'unable to remove testing dir '
 
3777
                             '%s\n%s'
 
3778
                             % (os.path.basename(dirname), e))
 
3779
        else:
 
3780
            raise
 
3781
 
 
3782
 
 
3783
class Feature(object):
 
3784
    """An operating system Feature."""
 
3785
 
 
3786
    def __init__(self):
 
3787
        self._available = None
 
3788
 
 
3789
    def available(self):
 
3790
        """Is the feature available?
 
3791
 
 
3792
        :return: True if the feature is available.
 
3793
        """
 
3794
        if self._available is None:
 
3795
            self._available = self._probe()
 
3796
        return self._available
 
3797
 
 
3798
    def _probe(self):
 
3799
        """Implement this method in concrete features.
 
3800
 
 
3801
        :return: True if the feature is available.
 
3802
        """
 
3803
        raise NotImplementedError
 
3804
 
 
3805
    def __str__(self):
 
3806
        if getattr(self, 'feature_name', None):
 
3807
            return self.feature_name()
 
3808
        return self.__class__.__name__
 
3809
 
 
3810
 
 
3811
class _SymlinkFeature(Feature):
 
3812
 
 
3813
    def _probe(self):
 
3814
        return osutils.has_symlinks()
 
3815
 
 
3816
    def feature_name(self):
 
3817
        return 'symlinks'
 
3818
 
 
3819
SymlinkFeature = _SymlinkFeature()
 
3820
 
 
3821
 
 
3822
class _HardlinkFeature(Feature):
 
3823
 
 
3824
    def _probe(self):
 
3825
        return osutils.has_hardlinks()
 
3826
 
 
3827
    def feature_name(self):
 
3828
        return 'hardlinks'
 
3829
 
 
3830
HardlinkFeature = _HardlinkFeature()
 
3831
 
 
3832
 
 
3833
class _OsFifoFeature(Feature):
 
3834
 
 
3835
    def _probe(self):
 
3836
        return getattr(os, 'mkfifo', None)
 
3837
 
 
3838
    def feature_name(self):
 
3839
        return 'filesystem fifos'
 
3840
 
 
3841
OsFifoFeature = _OsFifoFeature()
 
3842
 
 
3843
 
 
3844
class _UnicodeFilenameFeature(Feature):
 
3845
    """Does the filesystem support Unicode filenames?"""
 
3846
 
 
3847
    def _probe(self):
 
3848
        try:
 
3849
            # Check for character combinations unlikely to be covered by any
 
3850
            # single non-unicode encoding. We use the characters
 
3851
            # - greek small letter alpha (U+03B1) and
 
3852
            # - braille pattern dots-123456 (U+283F).
 
3853
            os.stat(u'\u03b1\u283f')
 
3854
        except UnicodeEncodeError:
 
3855
            return False
 
3856
        except (IOError, OSError):
 
3857
            # The filesystem allows the Unicode filename but the file doesn't
 
3858
            # exist.
 
3859
            return True
 
3860
        else:
 
3861
            # The filesystem allows the Unicode filename and the file exists,
 
3862
            # for some reason.
 
3863
            return True
 
3864
 
 
3865
UnicodeFilenameFeature = _UnicodeFilenameFeature()
 
3866
 
 
3867
 
 
3868
def probe_unicode_in_user_encoding():
 
3869
    """Try to encode several unicode strings to use in unicode-aware tests.
 
3870
    Return first successfull match.
 
3871
 
 
3872
    :return:  (unicode value, encoded plain string value) or (None, None)
 
3873
    """
 
3874
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
3875
    for uni_val in possible_vals:
 
3876
        try:
 
3877
            str_val = uni_val.encode(osutils.get_user_encoding())
 
3878
        except UnicodeEncodeError:
 
3879
            # Try a different character
 
3880
            pass
 
3881
        else:
 
3882
            return uni_val, str_val
 
3883
    return None, None
 
3884
 
 
3885
 
 
3886
def probe_bad_non_ascii(encoding):
 
3887
    """Try to find [bad] character with code [128..255]
 
3888
    that cannot be decoded to unicode in some encoding.
 
3889
    Return None if all non-ascii characters is valid
 
3890
    for given encoding.
 
3891
    """
 
3892
    for i in xrange(128, 256):
 
3893
        char = chr(i)
 
3894
        try:
 
3895
            char.decode(encoding)
 
3896
        except UnicodeDecodeError:
 
3897
            return char
 
3898
    return None
 
3899
 
 
3900
 
 
3901
class _HTTPSServerFeature(Feature):
 
3902
    """Some tests want an https Server, check if one is available.
 
3903
 
 
3904
    Right now, the only way this is available is under python2.6 which provides
 
3905
    an ssl module.
 
3906
    """
 
3907
 
 
3908
    def _probe(self):
 
3909
        try:
 
3910
            import ssl
 
3911
            return True
 
3912
        except ImportError:
 
3913
            return False
 
3914
 
 
3915
    def feature_name(self):
 
3916
        return 'HTTPSServer'
 
3917
 
 
3918
 
 
3919
HTTPSServerFeature = _HTTPSServerFeature()
 
3920
 
 
3921
 
 
3922
class _UnicodeFilename(Feature):
 
3923
    """Does the filesystem support Unicode filenames?"""
 
3924
 
 
3925
    def _probe(self):
 
3926
        try:
 
3927
            os.stat(u'\u03b1')
 
3928
        except UnicodeEncodeError:
 
3929
            return False
 
3930
        except (IOError, OSError):
 
3931
            # The filesystem allows the Unicode filename but the file doesn't
 
3932
            # exist.
 
3933
            return True
 
3934
        else:
 
3935
            # The filesystem allows the Unicode filename and the file exists,
 
3936
            # for some reason.
 
3937
            return True
 
3938
 
 
3939
UnicodeFilename = _UnicodeFilename()
 
3940
 
 
3941
 
 
3942
class _UTF8Filesystem(Feature):
 
3943
    """Is the filesystem UTF-8?"""
 
3944
 
 
3945
    def _probe(self):
 
3946
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
 
3947
            return True
 
3948
        return False
 
3949
 
 
3950
UTF8Filesystem = _UTF8Filesystem()
 
3951
 
 
3952
 
 
3953
class _CaseInsCasePresFilenameFeature(Feature):
 
3954
    """Is the file-system case insensitive, but case-preserving?"""
 
3955
 
 
3956
    def _probe(self):
 
3957
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
 
3958
        try:
 
3959
            # first check truly case-preserving for created files, then check
 
3960
            # case insensitive when opening existing files.
 
3961
            name = osutils.normpath(name)
 
3962
            base, rel = osutils.split(name)
 
3963
            found_rel = osutils.canonical_relpath(base, name)
 
3964
            return (found_rel == rel
 
3965
                    and os.path.isfile(name.upper())
 
3966
                    and os.path.isfile(name.lower()))
 
3967
        finally:
 
3968
            os.close(fileno)
 
3969
            os.remove(name)
 
3970
 
 
3971
    def feature_name(self):
 
3972
        return "case-insensitive case-preserving filesystem"
 
3973
 
 
3974
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
 
3975
 
 
3976
 
 
3977
class _CaseInsensitiveFilesystemFeature(Feature):
 
3978
    """Check if underlying filesystem is case-insensitive but *not* case
 
3979
    preserving.
 
3980
    """
 
3981
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
 
3982
    # more likely to be case preserving, so this case is rare.
 
3983
 
 
3984
    def _probe(self):
 
3985
        if CaseInsCasePresFilenameFeature.available():
 
3986
            return False
 
3987
 
 
3988
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
3989
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
3990
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
3991
        else:
 
3992
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
3993
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
3994
            dir=root)
 
3995
        name_a = osutils.pathjoin(tdir, 'a')
 
3996
        name_A = osutils.pathjoin(tdir, 'A')
 
3997
        os.mkdir(name_a)
 
3998
        result = osutils.isdir(name_A)
 
3999
        _rmtree_temp_dir(tdir)
 
4000
        return result
 
4001
 
 
4002
    def feature_name(self):
 
4003
        return 'case-insensitive filesystem'
 
4004
 
 
4005
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
 
4006
 
 
4007
 
 
4008
class _SubUnitFeature(Feature):
 
4009
    """Check if subunit is available."""
 
4010
 
 
4011
    def _probe(self):
 
4012
        try:
 
4013
            import subunit
 
4014
            return True
 
4015
        except ImportError:
 
4016
            return False
 
4017
 
 
4018
    def feature_name(self):
 
4019
        return 'subunit'
 
4020
 
 
4021
SubUnitFeature = _SubUnitFeature()
 
4022
# Only define SubUnitBzrRunner if subunit is available.
 
4023
try:
 
4024
    from subunit import TestProtocolClient
 
4025
    try:
 
4026
        from subunit.test_results import AutoTimingTestResultDecorator
 
4027
    except ImportError:
 
4028
        AutoTimingTestResultDecorator = lambda x:x
 
4029
    class SubUnitBzrRunner(TextTestRunner):
 
4030
        def run(self, test):
 
4031
            result = AutoTimingTestResultDecorator(
 
4032
                TestProtocolClient(self.stream))
 
4033
            test.run(result)
 
4034
            return result
 
4035
except ImportError:
 
4036
    pass