/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: John Arbash Meinel
  • Date: 2009-10-19 15:39:25 UTC
  • mto: This revision was merged to the branch mainline in revision 4758.
  • Revision ID: john@arbash-meinel.com-20091019153925-pkvnaoho6a2aawj7
Start exposing an GraphIndex.clear_cache() member.
This is exposed on GraphIndex, CombinedGraphIndex and BTreeGraphIndex.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import atexit
 
30
import codecs
 
31
from copy import copy
 
32
from cStringIO import StringIO
 
33
import difflib
 
34
import doctest
 
35
import errno
 
36
import logging
 
37
import math
 
38
import os
 
39
from pprint import pformat
 
40
import random
 
41
import re
 
42
import shlex
 
43
import stat
 
44
from subprocess import Popen, PIPE, STDOUT
 
45
import sys
 
46
import tempfile
 
47
import threading
 
48
import time
 
49
import unittest
 
50
import warnings
 
51
 
 
52
 
 
53
from bzrlib import (
 
54
    branchbuilder,
 
55
    bzrdir,
 
56
    config,
 
57
    debug,
 
58
    errors,
 
59
    hooks,
 
60
    lock as _mod_lock,
 
61
    memorytree,
 
62
    osutils,
 
63
    progress,
 
64
    ui,
 
65
    urlutils,
 
66
    registry,
 
67
    workingtree,
 
68
    )
 
69
import bzrlib.branch
 
70
import bzrlib.commands
 
71
import bzrlib.timestamp
 
72
import bzrlib.export
 
73
import bzrlib.inventory
 
74
import bzrlib.iterablefile
 
75
import bzrlib.lockdir
 
76
try:
 
77
    import bzrlib.lsprof
 
78
except ImportError:
 
79
    # lsprof not available
 
80
    pass
 
81
from bzrlib.merge import merge_inner
 
82
import bzrlib.merge3
 
83
import bzrlib.plugin
 
84
from bzrlib.smart import client, request, server
 
85
import bzrlib.store
 
86
from bzrlib import symbol_versioning
 
87
from bzrlib.symbol_versioning import (
 
88
    DEPRECATED_PARAMETER,
 
89
    deprecated_function,
 
90
    deprecated_method,
 
91
    deprecated_passed,
 
92
    )
 
93
import bzrlib.trace
 
94
from bzrlib.transport import get_transport, pathfilter
 
95
import bzrlib.transport
 
96
from bzrlib.transport.local import LocalURLServer
 
97
from bzrlib.transport.memory import MemoryServer
 
98
from bzrlib.transport.readonly import ReadonlyServer
 
99
from bzrlib.trace import mutter, note
 
100
from bzrlib.tests import TestUtil
 
101
from bzrlib.tests.http_server import HttpServer
 
102
from bzrlib.tests.TestUtil import (
 
103
                          TestSuite,
 
104
                          TestLoader,
 
105
                          )
 
106
from bzrlib.tests.treeshape import build_tree_contents
 
107
from bzrlib.ui import NullProgressView
 
108
from bzrlib.ui.text import TextUIFactory
 
109
import bzrlib.version_info_formats.format_custom
 
110
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
111
 
 
112
# Mark this python module as being part of the implementation
 
113
# of unittest: this gives us better tracebacks where the last
 
114
# shown frame is the test code, not our assertXYZ.
 
115
__unittest = 1
 
116
 
 
117
default_transport = LocalURLServer
 
118
 
 
119
# Subunit result codes, defined here to prevent a hard dependency on subunit.
 
120
SUBUNIT_SEEK_SET = 0
 
121
SUBUNIT_SEEK_CUR = 1
 
122
 
 
123
 
 
124
class ExtendedTestResult(unittest._TextTestResult):
 
125
    """Accepts, reports and accumulates the results of running tests.
 
126
 
 
127
    Compared to the unittest version this class adds support for
 
128
    profiling, benchmarking, stopping as soon as a test fails,  and
 
129
    skipping tests.  There are further-specialized subclasses for
 
130
    different types of display.
 
131
 
 
132
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
133
    addFailure or addError classes.  These in turn may redirect to a more
 
134
    specific case for the special test results supported by our extended
 
135
    tests.
 
136
 
 
137
    Note that just one of these objects is fed the results from many tests.
 
138
    """
 
139
 
 
140
    stop_early = False
 
141
 
 
142
    def __init__(self, stream, descriptions, verbosity,
 
143
                 bench_history=None,
 
144
                 strict=False,
 
145
                 ):
 
146
        """Construct new TestResult.
 
147
 
 
148
        :param bench_history: Optionally, a writable file object to accumulate
 
149
            benchmark results.
 
150
        """
 
151
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
152
        if bench_history is not None:
 
153
            from bzrlib.version import _get_bzr_source_tree
 
154
            src_tree = _get_bzr_source_tree()
 
155
            if src_tree:
 
156
                try:
 
157
                    revision_id = src_tree.get_parent_ids()[0]
 
158
                except IndexError:
 
159
                    # XXX: if this is a brand new tree, do the same as if there
 
160
                    # is no branch.
 
161
                    revision_id = ''
 
162
            else:
 
163
                # XXX: If there's no branch, what should we do?
 
164
                revision_id = ''
 
165
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
166
        self._bench_history = bench_history
 
167
        self.ui = ui.ui_factory
 
168
        self.num_tests = 0
 
169
        self.error_count = 0
 
170
        self.failure_count = 0
 
171
        self.known_failure_count = 0
 
172
        self.skip_count = 0
 
173
        self.not_applicable_count = 0
 
174
        self.unsupported = {}
 
175
        self.count = 0
 
176
        self._overall_start_time = time.time()
 
177
        self._strict = strict
 
178
 
 
179
    def stopTestRun(self):
 
180
        run = self.testsRun
 
181
        actionTaken = "Ran"
 
182
        stopTime = time.time()
 
183
        timeTaken = stopTime - self.startTime
 
184
        self.printErrors()
 
185
        self.stream.writeln(self.separator2)
 
186
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
187
                            run, run != 1 and "s" or "", timeTaken))
 
188
        self.stream.writeln()
 
189
        if not self.wasSuccessful():
 
190
            self.stream.write("FAILED (")
 
191
            failed, errored = map(len, (self.failures, self.errors))
 
192
            if failed:
 
193
                self.stream.write("failures=%d" % failed)
 
194
            if errored:
 
195
                if failed: self.stream.write(", ")
 
196
                self.stream.write("errors=%d" % errored)
 
197
            if self.known_failure_count:
 
198
                if failed or errored: self.stream.write(", ")
 
199
                self.stream.write("known_failure_count=%d" %
 
200
                    self.known_failure_count)
 
201
            self.stream.writeln(")")
 
202
        else:
 
203
            if self.known_failure_count:
 
204
                self.stream.writeln("OK (known_failures=%d)" %
 
205
                    self.known_failure_count)
 
206
            else:
 
207
                self.stream.writeln("OK")
 
208
        if self.skip_count > 0:
 
209
            skipped = self.skip_count
 
210
            self.stream.writeln('%d test%s skipped' %
 
211
                                (skipped, skipped != 1 and "s" or ""))
 
212
        if self.unsupported:
 
213
            for feature, count in sorted(self.unsupported.items()):
 
214
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
215
                    (feature, count))
 
216
        if self._strict:
 
217
            ok = self.wasStrictlySuccessful()
 
218
        else:
 
219
            ok = self.wasSuccessful()
 
220
        if TestCase._first_thread_leaker_id:
 
221
            self.stream.write(
 
222
                '%s is leaking threads among %d leaking tests.\n' % (
 
223
                TestCase._first_thread_leaker_id,
 
224
                TestCase._leaking_threads_tests))
 
225
            # We don't report the main thread as an active one.
 
226
            self.stream.write(
 
227
                '%d non-main threads were left active in the end.\n'
 
228
                % (TestCase._active_threads - 1))
 
229
 
 
230
    def _extractBenchmarkTime(self, testCase):
 
231
        """Add a benchmark time for the current test case."""
 
232
        return getattr(testCase, "_benchtime", None)
 
233
 
 
234
    def _elapsedTestTimeString(self):
 
235
        """Return a time string for the overall time the current test has taken."""
 
236
        return self._formatTime(time.time() - self._start_time)
 
237
 
 
238
    def _testTimeString(self, testCase):
 
239
        benchmark_time = self._extractBenchmarkTime(testCase)
 
240
        if benchmark_time is not None:
 
241
            return self._formatTime(benchmark_time) + "*"
 
242
        else:
 
243
            return self._elapsedTestTimeString()
 
244
 
 
245
    def _formatTime(self, seconds):
 
246
        """Format seconds as milliseconds with leading spaces."""
 
247
        # some benchmarks can take thousands of seconds to run, so we need 8
 
248
        # places
 
249
        return "%8dms" % (1000 * seconds)
 
250
 
 
251
    def _shortened_test_description(self, test):
 
252
        what = test.id()
 
253
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
254
        return what
 
255
 
 
256
    def startTest(self, test):
 
257
        unittest.TestResult.startTest(self, test)
 
258
        if self.count == 0:
 
259
            self.startTests()
 
260
        self.report_test_start(test)
 
261
        test.number = self.count
 
262
        self._recordTestStartTime()
 
263
 
 
264
    def startTests(self):
 
265
        import platform
 
266
        if getattr(sys, 'frozen', None) is None:
 
267
            bzr_path = osutils.realpath(sys.argv[0])
 
268
        else:
 
269
            bzr_path = sys.executable
 
270
        self.stream.write(
 
271
            'testing: %s\n' % (bzr_path,))
 
272
        self.stream.write(
 
273
            '   %s\n' % (
 
274
                    bzrlib.__path__[0],))
 
275
        self.stream.write(
 
276
            '   bzr-%s python-%s %s\n' % (
 
277
                    bzrlib.version_string,
 
278
                    bzrlib._format_version_tuple(sys.version_info),
 
279
                    platform.platform(aliased=1),
 
280
                    ))
 
281
        self.stream.write('\n')
 
282
 
 
283
    def _recordTestStartTime(self):
 
284
        """Record that a test has started."""
 
285
        self._start_time = time.time()
 
286
 
 
287
    def _cleanupLogFile(self, test):
 
288
        # We can only do this if we have one of our TestCases, not if
 
289
        # we have a doctest.
 
290
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
291
        if setKeepLogfile is not None:
 
292
            setKeepLogfile()
 
293
 
 
294
    def addError(self, test, err):
 
295
        """Tell result that test finished with an error.
 
296
 
 
297
        Called from the TestCase run() method when the test
 
298
        fails with an unexpected error.
 
299
        """
 
300
        self._testConcluded(test)
 
301
        if isinstance(err[1], TestNotApplicable):
 
302
            return self._addNotApplicable(test, err)
 
303
        elif isinstance(err[1], UnavailableFeature):
 
304
            return self.addNotSupported(test, err[1].args[0])
 
305
        else:
 
306
            self._post_mortem()
 
307
            unittest.TestResult.addError(self, test, err)
 
308
            self.error_count += 1
 
309
            self.report_error(test, err)
 
310
            if self.stop_early:
 
311
                self.stop()
 
312
            self._cleanupLogFile(test)
 
313
 
 
314
    def addFailure(self, test, err):
 
315
        """Tell result that test failed.
 
316
 
 
317
        Called from the TestCase run() method when the test
 
318
        fails because e.g. an assert() method failed.
 
319
        """
 
320
        self._testConcluded(test)
 
321
        if isinstance(err[1], KnownFailure):
 
322
            return self._addKnownFailure(test, err)
 
323
        else:
 
324
            self._post_mortem()
 
325
            unittest.TestResult.addFailure(self, test, err)
 
326
            self.failure_count += 1
 
327
            self.report_failure(test, err)
 
328
            if self.stop_early:
 
329
                self.stop()
 
330
            self._cleanupLogFile(test)
 
331
 
 
332
    def addSuccess(self, test):
 
333
        """Tell result that test completed successfully.
 
334
 
 
335
        Called from the TestCase run()
 
336
        """
 
337
        self._testConcluded(test)
 
338
        if self._bench_history is not None:
 
339
            benchmark_time = self._extractBenchmarkTime(test)
 
340
            if benchmark_time is not None:
 
341
                self._bench_history.write("%s %s\n" % (
 
342
                    self._formatTime(benchmark_time),
 
343
                    test.id()))
 
344
        self.report_success(test)
 
345
        self._cleanupLogFile(test)
 
346
        unittest.TestResult.addSuccess(self, test)
 
347
        test._log_contents = ''
 
348
 
 
349
    def _testConcluded(self, test):
 
350
        """Common code when a test has finished.
 
351
 
 
352
        Called regardless of whether it succeded, failed, etc.
 
353
        """
 
354
        pass
 
355
 
 
356
    def _addKnownFailure(self, test, err):
 
357
        self.known_failure_count += 1
 
358
        self.report_known_failure(test, err)
 
359
 
 
360
    def addNotSupported(self, test, feature):
 
361
        """The test will not be run because of a missing feature.
 
362
        """
 
363
        # this can be called in two different ways: it may be that the
 
364
        # test started running, and then raised (through addError)
 
365
        # UnavailableFeature.  Alternatively this method can be called
 
366
        # while probing for features before running the tests; in that
 
367
        # case we will see startTest and stopTest, but the test will never
 
368
        # actually run.
 
369
        self.unsupported.setdefault(str(feature), 0)
 
370
        self.unsupported[str(feature)] += 1
 
371
        self.report_unsupported(test, feature)
 
372
 
 
373
    def addSkip(self, test, reason):
 
374
        """A test has not run for 'reason'."""
 
375
        self.skip_count += 1
 
376
        self.report_skip(test, reason)
 
377
 
 
378
    def _addNotApplicable(self, test, skip_excinfo):
 
379
        if isinstance(skip_excinfo[1], TestNotApplicable):
 
380
            self.not_applicable_count += 1
 
381
            self.report_not_applicable(test, skip_excinfo)
 
382
        try:
 
383
            test.tearDown()
 
384
        except KeyboardInterrupt:
 
385
            raise
 
386
        except:
 
387
            self.addError(test, test.exc_info())
 
388
        else:
 
389
            # seems best to treat this as success from point-of-view of unittest
 
390
            # -- it actually does nothing so it barely matters :)
 
391
            unittest.TestResult.addSuccess(self, test)
 
392
            test._log_contents = ''
 
393
 
 
394
    def printErrorList(self, flavour, errors):
 
395
        for test, err in errors:
 
396
            self.stream.writeln(self.separator1)
 
397
            self.stream.write("%s: " % flavour)
 
398
            self.stream.writeln(self.getDescription(test))
 
399
            if getattr(test, '_get_log', None) is not None:
 
400
                log_contents = test._get_log()
 
401
                if log_contents:
 
402
                    self.stream.write('\n')
 
403
                    self.stream.write(
 
404
                            ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
 
405
                    self.stream.write('\n')
 
406
                    self.stream.write(log_contents)
 
407
                    self.stream.write('\n')
 
408
                    self.stream.write(
 
409
                            ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
 
410
                    self.stream.write('\n')
 
411
            self.stream.writeln(self.separator2)
 
412
            self.stream.writeln("%s" % err)
 
413
 
 
414
    def _post_mortem(self):
 
415
        """Start a PDB post mortem session."""
 
416
        if os.environ.get('BZR_TEST_PDB', None):
 
417
            import pdb;pdb.post_mortem()
 
418
 
 
419
    def progress(self, offset, whence):
 
420
        """The test is adjusting the count of tests to run."""
 
421
        if whence == SUBUNIT_SEEK_SET:
 
422
            self.num_tests = offset
 
423
        elif whence == SUBUNIT_SEEK_CUR:
 
424
            self.num_tests += offset
 
425
        else:
 
426
            raise errors.BzrError("Unknown whence %r" % whence)
 
427
 
 
428
    def report_cleaning_up(self):
 
429
        pass
 
430
 
 
431
    def startTestRun(self):
 
432
        self.startTime = time.time()
 
433
 
 
434
    def report_success(self, test):
 
435
        pass
 
436
 
 
437
    def wasStrictlySuccessful(self):
 
438
        if self.unsupported or self.known_failure_count:
 
439
            return False
 
440
        return self.wasSuccessful()
 
441
 
 
442
 
 
443
class TextTestResult(ExtendedTestResult):
 
444
    """Displays progress and results of tests in text form"""
 
445
 
 
446
    def __init__(self, stream, descriptions, verbosity,
 
447
                 bench_history=None,
 
448
                 pb=None,
 
449
                 strict=None,
 
450
                 ):
 
451
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
452
            bench_history, strict)
 
453
        # We no longer pass them around, but just rely on the UIFactory stack
 
454
        # for state
 
455
        if pb is not None:
 
456
            warnings.warn("Passing pb to TextTestResult is deprecated")
 
457
        self.pb = self.ui.nested_progress_bar()
 
458
        self.pb.show_pct = False
 
459
        self.pb.show_spinner = False
 
460
        self.pb.show_eta = False,
 
461
        self.pb.show_count = False
 
462
        self.pb.show_bar = False
 
463
        self.pb.update_latency = 0
 
464
        self.pb.show_transport_activity = False
 
465
 
 
466
    def stopTestRun(self):
 
467
        # called when the tests that are going to run have run
 
468
        self.pb.clear()
 
469
        self.pb.finished()
 
470
        super(TextTestResult, self).stopTestRun()
 
471
 
 
472
    def startTestRun(self):
 
473
        super(TextTestResult, self).startTestRun()
 
474
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
 
475
 
 
476
    def printErrors(self):
 
477
        # clear the pb to make room for the error listing
 
478
        self.pb.clear()
 
479
        super(TextTestResult, self).printErrors()
 
480
 
 
481
    def _progress_prefix_text(self):
 
482
        # the longer this text, the less space we have to show the test
 
483
        # name...
 
484
        a = '[%d' % self.count              # total that have been run
 
485
        # tests skipped as known not to be relevant are not important enough
 
486
        # to show here
 
487
        ## if self.skip_count:
 
488
        ##     a += ', %d skip' % self.skip_count
 
489
        ## if self.known_failure_count:
 
490
        ##     a += '+%dX' % self.known_failure_count
 
491
        if self.num_tests:
 
492
            a +='/%d' % self.num_tests
 
493
        a += ' in '
 
494
        runtime = time.time() - self._overall_start_time
 
495
        if runtime >= 60:
 
496
            a += '%dm%ds' % (runtime / 60, runtime % 60)
 
497
        else:
 
498
            a += '%ds' % runtime
 
499
        if self.error_count:
 
500
            a += ', %d err' % self.error_count
 
501
        if self.failure_count:
 
502
            a += ', %d fail' % self.failure_count
 
503
        # if self.unsupported:
 
504
        #     a += ', %d missing' % len(self.unsupported)
 
505
        a += ']'
 
506
        return a
 
507
 
 
508
    def report_test_start(self, test):
 
509
        self.count += 1
 
510
        self.pb.update(
 
511
                self._progress_prefix_text()
 
512
                + ' '
 
513
                + self._shortened_test_description(test))
 
514
 
 
515
    def _test_description(self, test):
 
516
        return self._shortened_test_description(test)
 
517
 
 
518
    def report_error(self, test, err):
 
519
        ui.ui_factory.note('ERROR: %s\n    %s\n' % (
 
520
            self._test_description(test),
 
521
            err[1],
 
522
            ))
 
523
 
 
524
    def report_failure(self, test, err):
 
525
        ui.ui_factory.note('FAIL: %s\n    %s\n' % (
 
526
            self._test_description(test),
 
527
            err[1],
 
528
            ))
 
529
 
 
530
    def report_known_failure(self, test, err):
 
531
        ui.ui_factory.note('XFAIL: %s\n%s\n' % (
 
532
            self._test_description(test), err[1]))
 
533
 
 
534
    def report_skip(self, test, reason):
 
535
        pass
 
536
 
 
537
    def report_not_applicable(self, test, skip_excinfo):
 
538
        pass
 
539
 
 
540
    def report_unsupported(self, test, feature):
 
541
        """test cannot be run because feature is missing."""
 
542
 
 
543
    def report_cleaning_up(self):
 
544
        self.pb.update('Cleaning up')
 
545
 
 
546
 
 
547
class VerboseTestResult(ExtendedTestResult):
 
548
    """Produce long output, with one line per test run plus times"""
 
549
 
 
550
    def _ellipsize_to_right(self, a_string, final_width):
 
551
        """Truncate and pad a string, keeping the right hand side"""
 
552
        if len(a_string) > final_width:
 
553
            result = '...' + a_string[3-final_width:]
 
554
        else:
 
555
            result = a_string
 
556
        return result.ljust(final_width)
 
557
 
 
558
    def startTestRun(self):
 
559
        super(VerboseTestResult, self).startTestRun()
 
560
        self.stream.write('running %d tests...\n' % self.num_tests)
 
561
 
 
562
    def report_test_start(self, test):
 
563
        self.count += 1
 
564
        name = self._shortened_test_description(test)
 
565
        # width needs space for 6 char status, plus 1 for slash, plus an
 
566
        # 11-char time string, plus a trailing blank
 
567
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
568
        self.stream.write(self._ellipsize_to_right(name,
 
569
                          osutils.terminal_width()-18))
 
570
        self.stream.flush()
 
571
 
 
572
    def _error_summary(self, err):
 
573
        indent = ' ' * 4
 
574
        return '%s%s' % (indent, err[1])
 
575
 
 
576
    def report_error(self, test, err):
 
577
        self.stream.writeln('ERROR %s\n%s'
 
578
                % (self._testTimeString(test),
 
579
                   self._error_summary(err)))
 
580
 
 
581
    def report_failure(self, test, err):
 
582
        self.stream.writeln(' FAIL %s\n%s'
 
583
                % (self._testTimeString(test),
 
584
                   self._error_summary(err)))
 
585
 
 
586
    def report_known_failure(self, test, err):
 
587
        self.stream.writeln('XFAIL %s\n%s'
 
588
                % (self._testTimeString(test),
 
589
                   self._error_summary(err)))
 
590
 
 
591
    def report_success(self, test):
 
592
        self.stream.writeln('   OK %s' % self._testTimeString(test))
 
593
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
594
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
595
            stats.pprint(file=self.stream)
 
596
        # flush the stream so that we get smooth output. This verbose mode is
 
597
        # used to show the output in PQM.
 
598
        self.stream.flush()
 
599
 
 
600
    def report_skip(self, test, reason):
 
601
        self.stream.writeln(' SKIP %s\n%s'
 
602
                % (self._testTimeString(test), reason))
 
603
 
 
604
    def report_not_applicable(self, test, skip_excinfo):
 
605
        self.stream.writeln('  N/A %s\n%s'
 
606
                % (self._testTimeString(test),
 
607
                   self._error_summary(skip_excinfo)))
 
608
 
 
609
    def report_unsupported(self, test, feature):
 
610
        """test cannot be run because feature is missing."""
 
611
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
612
                %(self._testTimeString(test), feature))
 
613
 
 
614
 
 
615
class TextTestRunner(object):
 
616
    stop_on_failure = False
 
617
 
 
618
    def __init__(self,
 
619
                 stream=sys.stderr,
 
620
                 descriptions=0,
 
621
                 verbosity=1,
 
622
                 bench_history=None,
 
623
                 strict=False,
 
624
                 result_decorators=None,
 
625
                 ):
 
626
        """Create a TextTestRunner.
 
627
 
 
628
        :param result_decorators: An optional list of decorators to apply
 
629
            to the result object being used by the runner. Decorators are
 
630
            applied left to right - the first element in the list is the 
 
631
            innermost decorator.
 
632
        """
 
633
        self.stream = unittest._WritelnDecorator(stream)
 
634
        self.descriptions = descriptions
 
635
        self.verbosity = verbosity
 
636
        self._bench_history = bench_history
 
637
        self._strict = strict
 
638
        self._result_decorators = result_decorators or []
 
639
 
 
640
    def run(self, test):
 
641
        "Run the given test case or test suite."
 
642
        if self.verbosity == 1:
 
643
            result_class = TextTestResult
 
644
        elif self.verbosity >= 2:
 
645
            result_class = VerboseTestResult
 
646
        original_result = result_class(self.stream,
 
647
                              self.descriptions,
 
648
                              self.verbosity,
 
649
                              bench_history=self._bench_history,
 
650
                              strict=self._strict,
 
651
                              )
 
652
        # Signal to result objects that look at stop early policy to stop,
 
653
        original_result.stop_early = self.stop_on_failure
 
654
        result = original_result
 
655
        for decorator in self._result_decorators:
 
656
            result = decorator(result)
 
657
            result.stop_early = self.stop_on_failure
 
658
        try:
 
659
            import testtools
 
660
        except ImportError:
 
661
            pass
 
662
        else:
 
663
            if isinstance(test, testtools.ConcurrentTestSuite):
 
664
                # We need to catch bzr specific behaviors
 
665
                result = BZRTransformingResult(result)
 
666
        result.startTestRun()
 
667
        try:
 
668
            test.run(result)
 
669
        finally:
 
670
            result.stopTestRun()
 
671
        # higher level code uses our extended protocol to determine
 
672
        # what exit code to give.
 
673
        return original_result
 
674
 
 
675
 
 
676
def iter_suite_tests(suite):
 
677
    """Return all tests in a suite, recursing through nested suites"""
 
678
    if isinstance(suite, unittest.TestCase):
 
679
        yield suite
 
680
    elif isinstance(suite, unittest.TestSuite):
 
681
        for item in suite:
 
682
            for r in iter_suite_tests(item):
 
683
                yield r
 
684
    else:
 
685
        raise Exception('unknown type %r for object %r'
 
686
                        % (type(suite), suite))
 
687
 
 
688
 
 
689
class TestSkipped(Exception):
 
690
    """Indicates that a test was intentionally skipped, rather than failing."""
 
691
 
 
692
 
 
693
class TestNotApplicable(TestSkipped):
 
694
    """A test is not applicable to the situation where it was run.
 
695
 
 
696
    This is only normally raised by parameterized tests, if they find that
 
697
    the instance they're constructed upon does not support one aspect
 
698
    of its interface.
 
699
    """
 
700
 
 
701
 
 
702
class KnownFailure(AssertionError):
 
703
    """Indicates that a test failed in a precisely expected manner.
 
704
 
 
705
    Such failures dont block the whole test suite from passing because they are
 
706
    indicators of partially completed code or of future work. We have an
 
707
    explicit error for them so that we can ensure that they are always visible:
 
708
    KnownFailures are always shown in the output of bzr selftest.
 
709
    """
 
710
 
 
711
 
 
712
class UnavailableFeature(Exception):
 
713
    """A feature required for this test was not available.
 
714
 
 
715
    The feature should be used to construct the exception.
 
716
    """
 
717
 
 
718
 
 
719
class CommandFailed(Exception):
 
720
    pass
 
721
 
 
722
 
 
723
class StringIOWrapper(object):
 
724
    """A wrapper around cStringIO which just adds an encoding attribute.
 
725
 
 
726
    Internally we can check sys.stdout to see what the output encoding
 
727
    should be. However, cStringIO has no encoding attribute that we can
 
728
    set. So we wrap it instead.
 
729
    """
 
730
    encoding='ascii'
 
731
    _cstring = None
 
732
 
 
733
    def __init__(self, s=None):
 
734
        if s is not None:
 
735
            self.__dict__['_cstring'] = StringIO(s)
 
736
        else:
 
737
            self.__dict__['_cstring'] = StringIO()
 
738
 
 
739
    def __getattr__(self, name, getattr=getattr):
 
740
        return getattr(self.__dict__['_cstring'], name)
 
741
 
 
742
    def __setattr__(self, name, val):
 
743
        if name == 'encoding':
 
744
            self.__dict__['encoding'] = val
 
745
        else:
 
746
            return setattr(self._cstring, name, val)
 
747
 
 
748
 
 
749
class TestUIFactory(TextUIFactory):
 
750
    """A UI Factory for testing.
 
751
 
 
752
    Hide the progress bar but emit note()s.
 
753
    Redirect stdin.
 
754
    Allows get_password to be tested without real tty attached.
 
755
 
 
756
    See also CannedInputUIFactory which lets you provide programmatic input in
 
757
    a structured way.
 
758
    """
 
759
    # TODO: Capture progress events at the model level and allow them to be
 
760
    # observed by tests that care.
 
761
    #
 
762
    # XXX: Should probably unify more with CannedInputUIFactory or a
 
763
    # particular configuration of TextUIFactory, or otherwise have a clearer
 
764
    # idea of how they're supposed to be different.
 
765
    # See https://bugs.edge.launchpad.net/bzr/+bug/408213
 
766
 
 
767
    def __init__(self, stdout=None, stderr=None, stdin=None):
 
768
        if stdin is not None:
 
769
            # We use a StringIOWrapper to be able to test various
 
770
            # encodings, but the user is still responsible to
 
771
            # encode the string and to set the encoding attribute
 
772
            # of StringIOWrapper.
 
773
            stdin = StringIOWrapper(stdin)
 
774
        super(TestUIFactory, self).__init__(stdin, stdout, stderr)
 
775
 
 
776
    def get_non_echoed_password(self):
 
777
        """Get password from stdin without trying to handle the echo mode"""
 
778
        password = self.stdin.readline()
 
779
        if not password:
 
780
            raise EOFError
 
781
        if password[-1] == '\n':
 
782
            password = password[:-1]
 
783
        return password
 
784
 
 
785
    def make_progress_view(self):
 
786
        return NullProgressView()
 
787
 
 
788
 
 
789
class TestCase(unittest.TestCase):
 
790
    """Base class for bzr unit tests.
 
791
 
 
792
    Tests that need access to disk resources should subclass
 
793
    TestCaseInTempDir not TestCase.
 
794
 
 
795
    Error and debug log messages are redirected from their usual
 
796
    location into a temporary file, the contents of which can be
 
797
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
798
    so that it can also capture file IO.  When the test completes this file
 
799
    is read into memory and removed from disk.
 
800
 
 
801
    There are also convenience functions to invoke bzr's command-line
 
802
    routine, and to build and check bzr trees.
 
803
 
 
804
    In addition to the usual method of overriding tearDown(), this class also
 
805
    allows subclasses to register functions into the _cleanups list, which is
 
806
    run in order as the object is torn down.  It's less likely this will be
 
807
    accidentally overlooked.
 
808
    """
 
809
 
 
810
    _active_threads = None
 
811
    _leaking_threads_tests = 0
 
812
    _first_thread_leaker_id = None
 
813
    _log_file_name = None
 
814
    _log_contents = ''
 
815
    _keep_log_file = False
 
816
    # record lsprof data when performing benchmark calls.
 
817
    _gather_lsprof_in_benchmarks = False
 
818
    attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
 
819
                     '_log_contents', '_log_file_name', '_benchtime',
 
820
                     '_TestCase__testMethodName', '_TestCase__testMethodDoc',)
 
821
 
 
822
    def __init__(self, methodName='testMethod'):
 
823
        super(TestCase, self).__init__(methodName)
 
824
        self._cleanups = []
 
825
        self._bzr_test_setUp_run = False
 
826
        self._bzr_test_tearDown_run = False
 
827
        self._directory_isolation = True
 
828
 
 
829
    def setUp(self):
 
830
        unittest.TestCase.setUp(self)
 
831
        self._bzr_test_setUp_run = True
 
832
        self._cleanEnvironment()
 
833
        self._silenceUI()
 
834
        self._startLogFile()
 
835
        self._benchcalls = []
 
836
        self._benchtime = None
 
837
        self._clear_hooks()
 
838
        self._track_transports()
 
839
        self._track_locks()
 
840
        self._clear_debug_flags()
 
841
        TestCase._active_threads = threading.activeCount()
 
842
        self.addCleanup(self._check_leaked_threads)
 
843
 
 
844
    def debug(self):
 
845
        # debug a frame up.
 
846
        import pdb
 
847
        pdb.Pdb().set_trace(sys._getframe().f_back)
 
848
 
 
849
    def _check_leaked_threads(self):
 
850
        active = threading.activeCount()
 
851
        leaked_threads = active - TestCase._active_threads
 
852
        TestCase._active_threads = active
 
853
        # If some tests make the number of threads *decrease*, we'll consider
 
854
        # that they are just observing old threads dieing, not agressively kill
 
855
        # random threads. So we don't report these tests as leaking. The risk
 
856
        # is that we have false positives that way (the test see 2 threads
 
857
        # going away but leak one) but it seems less likely than the actual
 
858
        # false positives (the test see threads going away and does not leak).
 
859
        if leaked_threads > 0:
 
860
            TestCase._leaking_threads_tests += 1
 
861
            if TestCase._first_thread_leaker_id is None:
 
862
                TestCase._first_thread_leaker_id = self.id()
 
863
 
 
864
    def _clear_debug_flags(self):
 
865
        """Prevent externally set debug flags affecting tests.
 
866
 
 
867
        Tests that want to use debug flags can just set them in the
 
868
        debug_flags set during setup/teardown.
 
869
        """
 
870
        self._preserved_debug_flags = set(debug.debug_flags)
 
871
        if 'allow_debug' not in selftest_debug_flags:
 
872
            debug.debug_flags.clear()
 
873
        if 'disable_lock_checks' not in selftest_debug_flags:
 
874
            debug.debug_flags.add('strict_locks')
 
875
        self.addCleanup(self._restore_debug_flags)
 
876
 
 
877
    def _clear_hooks(self):
 
878
        # prevent hooks affecting tests
 
879
        self._preserved_hooks = {}
 
880
        for key, factory in hooks.known_hooks.items():
 
881
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
882
            current_hooks = hooks.known_hooks_key_to_object(key)
 
883
            self._preserved_hooks[parent] = (name, current_hooks)
 
884
        self.addCleanup(self._restoreHooks)
 
885
        for key, factory in hooks.known_hooks.items():
 
886
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
887
            setattr(parent, name, factory())
 
888
        # this hook should always be installed
 
889
        request._install_hook()
 
890
 
 
891
    def disable_directory_isolation(self):
 
892
        """Turn off directory isolation checks."""
 
893
        self._directory_isolation = False
 
894
 
 
895
    def enable_directory_isolation(self):
 
896
        """Enable directory isolation checks."""
 
897
        self._directory_isolation = True
 
898
 
 
899
    def _silenceUI(self):
 
900
        """Turn off UI for duration of test"""
 
901
        # by default the UI is off; tests can turn it on if they want it.
 
902
        saved = ui.ui_factory
 
903
        def _restore():
 
904
            ui.ui_factory = saved
 
905
        ui.ui_factory = ui.SilentUIFactory()
 
906
        self.addCleanup(_restore)
 
907
 
 
908
    def _check_locks(self):
 
909
        """Check that all lock take/release actions have been paired."""
 
910
        # We always check for mismatched locks. If a mismatch is found, we
 
911
        # fail unless -Edisable_lock_checks is supplied to selftest, in which
 
912
        # case we just print a warning.
 
913
        # unhook:
 
914
        acquired_locks = [lock for action, lock in self._lock_actions
 
915
                          if action == 'acquired']
 
916
        released_locks = [lock for action, lock in self._lock_actions
 
917
                          if action == 'released']
 
918
        broken_locks = [lock for action, lock in self._lock_actions
 
919
                        if action == 'broken']
 
920
        # trivially, given the tests for lock acquistion and release, if we
 
921
        # have as many in each list, it should be ok. Some lock tests also
 
922
        # break some locks on purpose and should be taken into account by
 
923
        # considering that breaking a lock is just a dirty way of releasing it.
 
924
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
 
925
            message = ('Different number of acquired and '
 
926
                       'released or broken locks. (%s, %s + %s)' %
 
927
                       (acquired_locks, released_locks, broken_locks))
 
928
            if not self._lock_check_thorough:
 
929
                # Rather than fail, just warn
 
930
                print "Broken test %s: %s" % (self, message)
 
931
                return
 
932
            self.fail(message)
 
933
 
 
934
    def _track_locks(self):
 
935
        """Track lock activity during tests."""
 
936
        self._lock_actions = []
 
937
        if 'disable_lock_checks' in selftest_debug_flags:
 
938
            self._lock_check_thorough = False
 
939
        else:
 
940
            self._lock_check_thorough = True
 
941
            
 
942
        self.addCleanup(self._check_locks)
 
943
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
 
944
                                                self._lock_acquired, None)
 
945
        _mod_lock.Lock.hooks.install_named_hook('lock_released',
 
946
                                                self._lock_released, None)
 
947
        _mod_lock.Lock.hooks.install_named_hook('lock_broken',
 
948
                                                self._lock_broken, None)
 
949
 
 
950
    def _lock_acquired(self, result):
 
951
        self._lock_actions.append(('acquired', result))
 
952
 
 
953
    def _lock_released(self, result):
 
954
        self._lock_actions.append(('released', result))
 
955
 
 
956
    def _lock_broken(self, result):
 
957
        self._lock_actions.append(('broken', result))
 
958
 
 
959
    def permit_dir(self, name):
 
960
        """Permit a directory to be used by this test. See permit_url."""
 
961
        name_transport = get_transport(name)
 
962
        self.permit_url(name)
 
963
        self.permit_url(name_transport.base)
 
964
 
 
965
    def permit_url(self, url):
 
966
        """Declare that url is an ok url to use in this test.
 
967
        
 
968
        Do this for memory transports, temporary test directory etc.
 
969
        
 
970
        Do not do this for the current working directory, /tmp, or any other
 
971
        preexisting non isolated url.
 
972
        """
 
973
        if not url.endswith('/'):
 
974
            url += '/'
 
975
        self._bzr_selftest_roots.append(url)
 
976
 
 
977
    def permit_source_tree_branch_repo(self):
 
978
        """Permit the source tree bzr is running from to be opened.
 
979
 
 
980
        Some code such as bzrlib.version attempts to read from the bzr branch
 
981
        that bzr is executing from (if any). This method permits that directory
 
982
        to be used in the test suite.
 
983
        """
 
984
        path = self.get_source_path()
 
985
        self.record_directory_isolation()
 
986
        try:
 
987
            try:
 
988
                workingtree.WorkingTree.open(path)
 
989
            except (errors.NotBranchError, errors.NoWorkingTree):
 
990
                return
 
991
        finally:
 
992
            self.enable_directory_isolation()
 
993
 
 
994
    def _preopen_isolate_transport(self, transport):
 
995
        """Check that all transport openings are done in the test work area."""
 
996
        while isinstance(transport, pathfilter.PathFilteringTransport):
 
997
            # Unwrap pathfiltered transports
 
998
            transport = transport.server.backing_transport.clone(
 
999
                transport._filter('.'))
 
1000
        url = transport.base
 
1001
        # ReadonlySmartTCPServer_for_testing decorates the backing transport
 
1002
        # urls it is given by prepending readonly+. This is appropriate as the
 
1003
        # client shouldn't know that the server is readonly (or not readonly).
 
1004
        # We could register all servers twice, with readonly+ prepending, but
 
1005
        # that makes for a long list; this is about the same but easier to
 
1006
        # read.
 
1007
        if url.startswith('readonly+'):
 
1008
            url = url[len('readonly+'):]
 
1009
        self._preopen_isolate_url(url)
 
1010
 
 
1011
    def _preopen_isolate_url(self, url):
 
1012
        if not self._directory_isolation:
 
1013
            return
 
1014
        if self._directory_isolation == 'record':
 
1015
            self._bzr_selftest_roots.append(url)
 
1016
            return
 
1017
        # This prevents all transports, including e.g. sftp ones backed on disk
 
1018
        # from working unless they are explicitly granted permission. We then
 
1019
        # depend on the code that sets up test transports to check that they are
 
1020
        # appropriately isolated and enable their use by calling
 
1021
        # self.permit_transport()
 
1022
        if not osutils.is_inside_any(self._bzr_selftest_roots, url):
 
1023
            raise errors.BzrError("Attempt to escape test isolation: %r %r"
 
1024
                % (url, self._bzr_selftest_roots))
 
1025
 
 
1026
    def record_directory_isolation(self):
 
1027
        """Gather accessed directories to permit later access.
 
1028
        
 
1029
        This is used for tests that access the branch bzr is running from.
 
1030
        """
 
1031
        self._directory_isolation = "record"
 
1032
 
 
1033
    def start_server(self, transport_server, backing_server=None):
 
1034
        """Start transport_server for this test.
 
1035
 
 
1036
        This starts the server, registers a cleanup for it and permits the
 
1037
        server's urls to be used.
 
1038
        """
 
1039
        if backing_server is None:
 
1040
            transport_server.setUp()
 
1041
        else:
 
1042
            transport_server.setUp(backing_server)
 
1043
        self.addCleanup(transport_server.tearDown)
 
1044
        # Obtain a real transport because if the server supplies a password, it
 
1045
        # will be hidden from the base on the client side.
 
1046
        t = get_transport(transport_server.get_url())
 
1047
        # Some transport servers effectively chroot the backing transport;
 
1048
        # others like SFTPServer don't - users of the transport can walk up the
 
1049
        # transport to read the entire backing transport. This wouldn't matter
 
1050
        # except that the workdir tests are given - and that they expect the
 
1051
        # server's url to point at - is one directory under the safety net. So
 
1052
        # Branch operations into the transport will attempt to walk up one
 
1053
        # directory. Chrooting all servers would avoid this but also mean that
 
1054
        # we wouldn't be testing directly against non-root urls. Alternatively
 
1055
        # getting the test framework to start the server with a backing server
 
1056
        # at the actual safety net directory would work too, but this then
 
1057
        # means that the self.get_url/self.get_transport methods would need
 
1058
        # to transform all their results. On balance its cleaner to handle it
 
1059
        # here, and permit a higher url when we have one of these transports.
 
1060
        if t.base.endswith('/work/'):
 
1061
            # we have safety net/test root/work
 
1062
            t = t.clone('../..')
 
1063
        elif isinstance(transport_server, server.SmartTCPServer_for_testing):
 
1064
            # The smart server adds a path similar to work, which is traversed
 
1065
            # up from by the client. But the server is chrooted - the actual
 
1066
            # backing transport is not escaped from, and VFS requests to the
 
1067
            # root will error (because they try to escape the chroot).
 
1068
            t2 = t.clone('..')
 
1069
            while t2.base != t.base:
 
1070
                t = t2
 
1071
                t2 = t.clone('..')
 
1072
        self.permit_url(t.base)
 
1073
 
 
1074
    def _track_transports(self):
 
1075
        """Install checks for transport usage."""
 
1076
        # TestCase has no safe place it can write to.
 
1077
        self._bzr_selftest_roots = []
 
1078
        # Currently the easiest way to be sure that nothing is going on is to
 
1079
        # hook into bzr dir opening. This leaves a small window of error for
 
1080
        # transport tests, but they are well known, and we can improve on this
 
1081
        # step.
 
1082
        bzrdir.BzrDir.hooks.install_named_hook("pre_open",
 
1083
            self._preopen_isolate_transport, "Check bzr directories are safe.")
 
1084
 
 
1085
    def _ndiff_strings(self, a, b):
 
1086
        """Return ndiff between two strings containing lines.
 
1087
 
 
1088
        A trailing newline is added if missing to make the strings
 
1089
        print properly."""
 
1090
        if b and b[-1] != '\n':
 
1091
            b += '\n'
 
1092
        if a and a[-1] != '\n':
 
1093
            a += '\n'
 
1094
        difflines = difflib.ndiff(a.splitlines(True),
 
1095
                                  b.splitlines(True),
 
1096
                                  linejunk=lambda x: False,
 
1097
                                  charjunk=lambda x: False)
 
1098
        return ''.join(difflines)
 
1099
 
 
1100
    def assertEqual(self, a, b, message=''):
 
1101
        try:
 
1102
            if a == b:
 
1103
                return
 
1104
        except UnicodeError, e:
 
1105
            # If we can't compare without getting a UnicodeError, then
 
1106
            # obviously they are different
 
1107
            mutter('UnicodeError: %s', e)
 
1108
        if message:
 
1109
            message += '\n'
 
1110
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
1111
            % (message,
 
1112
               pformat(a), pformat(b)))
 
1113
 
 
1114
    assertEquals = assertEqual
 
1115
 
 
1116
    def assertEqualDiff(self, a, b, message=None):
 
1117
        """Assert two texts are equal, if not raise an exception.
 
1118
 
 
1119
        This is intended for use with multi-line strings where it can
 
1120
        be hard to find the differences by eye.
 
1121
        """
 
1122
        # TODO: perhaps override assertEquals to call this for strings?
 
1123
        if a == b:
 
1124
            return
 
1125
        if message is None:
 
1126
            message = "texts not equal:\n"
 
1127
        if a + '\n' == b:
 
1128
            message = 'first string is missing a final newline.\n'
 
1129
        if a == b + '\n':
 
1130
            message = 'second string is missing a final newline.\n'
 
1131
        raise AssertionError(message +
 
1132
                             self._ndiff_strings(a, b))
 
1133
 
 
1134
    def assertEqualMode(self, mode, mode_test):
 
1135
        self.assertEqual(mode, mode_test,
 
1136
                         'mode mismatch %o != %o' % (mode, mode_test))
 
1137
 
 
1138
    def assertEqualStat(self, expected, actual):
 
1139
        """assert that expected and actual are the same stat result.
 
1140
 
 
1141
        :param expected: A stat result.
 
1142
        :param actual: A stat result.
 
1143
        :raises AssertionError: If the expected and actual stat values differ
 
1144
            other than by atime.
 
1145
        """
 
1146
        self.assertEqual(expected.st_size, actual.st_size)
 
1147
        self.assertEqual(expected.st_mtime, actual.st_mtime)
 
1148
        self.assertEqual(expected.st_ctime, actual.st_ctime)
 
1149
        self.assertEqual(expected.st_dev, actual.st_dev)
 
1150
        self.assertEqual(expected.st_ino, actual.st_ino)
 
1151
        self.assertEqual(expected.st_mode, actual.st_mode)
 
1152
 
 
1153
    def assertLength(self, length, obj_with_len):
 
1154
        """Assert that obj_with_len is of length length."""
 
1155
        if len(obj_with_len) != length:
 
1156
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
 
1157
                length, len(obj_with_len), obj_with_len))
 
1158
 
 
1159
    def assertLogsError(self, exception_class, func, *args, **kwargs):
 
1160
        """Assert that func(*args, **kwargs) quietly logs a specific exception.
 
1161
        """
 
1162
        from bzrlib import trace
 
1163
        captured = []
 
1164
        orig_log_exception_quietly = trace.log_exception_quietly
 
1165
        try:
 
1166
            def capture():
 
1167
                orig_log_exception_quietly()
 
1168
                captured.append(sys.exc_info())
 
1169
            trace.log_exception_quietly = capture
 
1170
            func(*args, **kwargs)
 
1171
        finally:
 
1172
            trace.log_exception_quietly = orig_log_exception_quietly
 
1173
        self.assertLength(1, captured)
 
1174
        err = captured[0][1]
 
1175
        self.assertIsInstance(err, exception_class)
 
1176
        return err
 
1177
 
 
1178
    def assertPositive(self, val):
 
1179
        """Assert that val is greater than 0."""
 
1180
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
1181
 
 
1182
    def assertNegative(self, val):
 
1183
        """Assert that val is less than 0."""
 
1184
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
1185
 
 
1186
    def assertStartsWith(self, s, prefix):
 
1187
        if not s.startswith(prefix):
 
1188
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
1189
 
 
1190
    def assertEndsWith(self, s, suffix):
 
1191
        """Asserts that s ends with suffix."""
 
1192
        if not s.endswith(suffix):
 
1193
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
1194
 
 
1195
    def assertContainsRe(self, haystack, needle_re, flags=0):
 
1196
        """Assert that a contains something matching a regular expression."""
 
1197
        if not re.search(needle_re, haystack, flags):
 
1198
            if '\n' in haystack or len(haystack) > 60:
 
1199
                # a long string, format it in a more readable way
 
1200
                raise AssertionError(
 
1201
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
1202
                        % (needle_re, haystack))
 
1203
            else:
 
1204
                raise AssertionError('pattern "%s" not found in "%s"'
 
1205
                        % (needle_re, haystack))
 
1206
 
 
1207
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
 
1208
        """Assert that a does not match a regular expression"""
 
1209
        if re.search(needle_re, haystack, flags):
 
1210
            raise AssertionError('pattern "%s" found in "%s"'
 
1211
                    % (needle_re, haystack))
 
1212
 
 
1213
    def assertSubset(self, sublist, superlist):
 
1214
        """Assert that every entry in sublist is present in superlist."""
 
1215
        missing = set(sublist) - set(superlist)
 
1216
        if len(missing) > 0:
 
1217
            raise AssertionError("value(s) %r not present in container %r" %
 
1218
                                 (missing, superlist))
 
1219
 
 
1220
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
1221
        """Fail unless excClass is raised when the iterator from func is used.
 
1222
 
 
1223
        Many functions can return generators this makes sure
 
1224
        to wrap them in a list() call to make sure the whole generator
 
1225
        is run, and that the proper exception is raised.
 
1226
        """
 
1227
        try:
 
1228
            list(func(*args, **kwargs))
 
1229
        except excClass, e:
 
1230
            return e
 
1231
        else:
 
1232
            if getattr(excClass,'__name__', None) is not None:
 
1233
                excName = excClass.__name__
 
1234
            else:
 
1235
                excName = str(excClass)
 
1236
            raise self.failureException, "%s not raised" % excName
 
1237
 
 
1238
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
1239
        """Assert that a callable raises a particular exception.
 
1240
 
 
1241
        :param excClass: As for the except statement, this may be either an
 
1242
            exception class, or a tuple of classes.
 
1243
        :param callableObj: A callable, will be passed ``*args`` and
 
1244
            ``**kwargs``.
 
1245
 
 
1246
        Returns the exception so that you can examine it.
 
1247
        """
 
1248
        try:
 
1249
            callableObj(*args, **kwargs)
 
1250
        except excClass, e:
 
1251
            return e
 
1252
        else:
 
1253
            if getattr(excClass,'__name__', None) is not None:
 
1254
                excName = excClass.__name__
 
1255
            else:
 
1256
                # probably a tuple
 
1257
                excName = str(excClass)
 
1258
            raise self.failureException, "%s not raised" % excName
 
1259
 
 
1260
    def assertIs(self, left, right, message=None):
 
1261
        if not (left is right):
 
1262
            if message is not None:
 
1263
                raise AssertionError(message)
 
1264
            else:
 
1265
                raise AssertionError("%r is not %r." % (left, right))
 
1266
 
 
1267
    def assertIsNot(self, left, right, message=None):
 
1268
        if (left is right):
 
1269
            if message is not None:
 
1270
                raise AssertionError(message)
 
1271
            else:
 
1272
                raise AssertionError("%r is %r." % (left, right))
 
1273
 
 
1274
    def assertTransportMode(self, transport, path, mode):
 
1275
        """Fail if a path does not have mode "mode".
 
1276
 
 
1277
        If modes are not supported on this transport, the assertion is ignored.
 
1278
        """
 
1279
        if not transport._can_roundtrip_unix_modebits():
 
1280
            return
 
1281
        path_stat = transport.stat(path)
 
1282
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
1283
        self.assertEqual(mode, actual_mode,
 
1284
                         'mode of %r incorrect (%s != %s)'
 
1285
                         % (path, oct(mode), oct(actual_mode)))
 
1286
 
 
1287
    def assertIsSameRealPath(self, path1, path2):
 
1288
        """Fail if path1 and path2 points to different files"""
 
1289
        self.assertEqual(osutils.realpath(path1),
 
1290
                         osutils.realpath(path2),
 
1291
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
1292
 
 
1293
    def assertIsInstance(self, obj, kls, msg=None):
 
1294
        """Fail if obj is not an instance of kls
 
1295
        
 
1296
        :param msg: Supplementary message to show if the assertion fails.
 
1297
        """
 
1298
        if not isinstance(obj, kls):
 
1299
            m = "%r is an instance of %s rather than %s" % (
 
1300
                obj, obj.__class__, kls)
 
1301
            if msg:
 
1302
                m += ": " + msg
 
1303
            self.fail(m)
 
1304
 
 
1305
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
1306
        """Invoke a test, expecting it to fail for the given reason.
 
1307
 
 
1308
        This is for assertions that ought to succeed, but currently fail.
 
1309
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
1310
        about the failure you're expecting.  If a new bug is introduced,
 
1311
        AssertionError should be raised, not KnownFailure.
 
1312
 
 
1313
        Frequently, expectFailure should be followed by an opposite assertion.
 
1314
        See example below.
 
1315
 
 
1316
        Intended to be used with a callable that raises AssertionError as the
 
1317
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
1318
 
 
1319
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
1320
        test succeeds.
 
1321
 
 
1322
        example usage::
 
1323
 
 
1324
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
1325
                             dynamic_val)
 
1326
          self.assertEqual(42, dynamic_val)
 
1327
 
 
1328
          This means that a dynamic_val of 54 will cause the test to raise
 
1329
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
1330
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
1331
          than 54 or 42 will cause an AssertionError.
 
1332
        """
 
1333
        try:
 
1334
            assertion(*args, **kwargs)
 
1335
        except AssertionError:
 
1336
            raise KnownFailure(reason)
 
1337
        else:
 
1338
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
1339
 
 
1340
    def assertFileEqual(self, content, path):
 
1341
        """Fail if path does not contain 'content'."""
 
1342
        self.failUnlessExists(path)
 
1343
        f = file(path, 'rb')
 
1344
        try:
 
1345
            s = f.read()
 
1346
        finally:
 
1347
            f.close()
 
1348
        self.assertEqualDiff(content, s)
 
1349
 
 
1350
    def failUnlessExists(self, path):
 
1351
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1352
        if not isinstance(path, basestring):
 
1353
            for p in path:
 
1354
                self.failUnlessExists(p)
 
1355
        else:
 
1356
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
1357
 
 
1358
    def failIfExists(self, path):
 
1359
        """Fail if path or paths, which may be abs or relative, exist."""
 
1360
        if not isinstance(path, basestring):
 
1361
            for p in path:
 
1362
                self.failIfExists(p)
 
1363
        else:
 
1364
            self.failIf(osutils.lexists(path),path+" exists")
 
1365
 
 
1366
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1367
        """A helper for callDeprecated and applyDeprecated.
 
1368
 
 
1369
        :param a_callable: A callable to call.
 
1370
        :param args: The positional arguments for the callable
 
1371
        :param kwargs: The keyword arguments for the callable
 
1372
        :return: A tuple (warnings, result). result is the result of calling
 
1373
            a_callable(``*args``, ``**kwargs``).
 
1374
        """
 
1375
        local_warnings = []
 
1376
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1377
            # we've hooked into a deprecation specific callpath,
 
1378
            # only deprecations should getting sent via it.
 
1379
            self.assertEqual(cls, DeprecationWarning)
 
1380
            local_warnings.append(msg)
 
1381
        original_warning_method = symbol_versioning.warn
 
1382
        symbol_versioning.set_warning_method(capture_warnings)
 
1383
        try:
 
1384
            result = a_callable(*args, **kwargs)
 
1385
        finally:
 
1386
            symbol_versioning.set_warning_method(original_warning_method)
 
1387
        return (local_warnings, result)
 
1388
 
 
1389
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1390
        """Call a deprecated callable without warning the user.
 
1391
 
 
1392
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1393
        not other callers that go direct to the warning module.
 
1394
 
 
1395
        To test that a deprecated method raises an error, do something like
 
1396
        this::
 
1397
 
 
1398
            self.assertRaises(errors.ReservedId,
 
1399
                self.applyDeprecated,
 
1400
                deprecated_in((1, 5, 0)),
 
1401
                br.append_revision,
 
1402
                'current:')
 
1403
 
 
1404
        :param deprecation_format: The deprecation format that the callable
 
1405
            should have been deprecated with. This is the same type as the
 
1406
            parameter to deprecated_method/deprecated_function. If the
 
1407
            callable is not deprecated with this format, an assertion error
 
1408
            will be raised.
 
1409
        :param a_callable: A callable to call. This may be a bound method or
 
1410
            a regular function. It will be called with ``*args`` and
 
1411
            ``**kwargs``.
 
1412
        :param args: The positional arguments for the callable
 
1413
        :param kwargs: The keyword arguments for the callable
 
1414
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1415
        """
 
1416
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1417
            *args, **kwargs)
 
1418
        expected_first_warning = symbol_versioning.deprecation_string(
 
1419
            a_callable, deprecation_format)
 
1420
        if len(call_warnings) == 0:
 
1421
            self.fail("No deprecation warning generated by call to %s" %
 
1422
                a_callable)
 
1423
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1424
        return result
 
1425
 
 
1426
    def callCatchWarnings(self, fn, *args, **kw):
 
1427
        """Call a callable that raises python warnings.
 
1428
 
 
1429
        The caller's responsible for examining the returned warnings.
 
1430
 
 
1431
        If the callable raises an exception, the exception is not
 
1432
        caught and propagates up to the caller.  In that case, the list
 
1433
        of warnings is not available.
 
1434
 
 
1435
        :returns: ([warning_object, ...], fn_result)
 
1436
        """
 
1437
        # XXX: This is not perfect, because it completely overrides the
 
1438
        # warnings filters, and some code may depend on suppressing particular
 
1439
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1440
        # though.  -- Andrew, 20071062
 
1441
        wlist = []
 
1442
        def _catcher(message, category, filename, lineno, file=None, line=None):
 
1443
            # despite the name, 'message' is normally(?) a Warning subclass
 
1444
            # instance
 
1445
            wlist.append(message)
 
1446
        saved_showwarning = warnings.showwarning
 
1447
        saved_filters = warnings.filters
 
1448
        try:
 
1449
            warnings.showwarning = _catcher
 
1450
            warnings.filters = []
 
1451
            result = fn(*args, **kw)
 
1452
        finally:
 
1453
            warnings.showwarning = saved_showwarning
 
1454
            warnings.filters = saved_filters
 
1455
        return wlist, result
 
1456
 
 
1457
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1458
        """Assert that a callable is deprecated in a particular way.
 
1459
 
 
1460
        This is a very precise test for unusual requirements. The
 
1461
        applyDeprecated helper function is probably more suited for most tests
 
1462
        as it allows you to simply specify the deprecation format being used
 
1463
        and will ensure that that is issued for the function being called.
 
1464
 
 
1465
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1466
        not other callers that go direct to the warning module.  To catch
 
1467
        general warnings, use callCatchWarnings.
 
1468
 
 
1469
        :param expected: a list of the deprecation warnings expected, in order
 
1470
        :param callable: The callable to call
 
1471
        :param args: The positional arguments for the callable
 
1472
        :param kwargs: The keyword arguments for the callable
 
1473
        """
 
1474
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1475
            *args, **kwargs)
 
1476
        self.assertEqual(expected, call_warnings)
 
1477
        return result
 
1478
 
 
1479
    def _startLogFile(self):
 
1480
        """Send bzr and test log messages to a temporary file.
 
1481
 
 
1482
        The file is removed as the test is torn down.
 
1483
        """
 
1484
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1485
        self._log_file = os.fdopen(fileno, 'w+')
 
1486
        self._log_memento = bzrlib.trace.push_log_file(self._log_file)
 
1487
        self._log_file_name = name
 
1488
        self.addCleanup(self._finishLogFile)
 
1489
 
 
1490
    def _finishLogFile(self):
 
1491
        """Finished with the log file.
 
1492
 
 
1493
        Close the file and delete it, unless setKeepLogfile was called.
 
1494
        """
 
1495
        if self._log_file is None:
 
1496
            return
 
1497
        bzrlib.trace.pop_log_file(self._log_memento)
 
1498
        self._log_file.close()
 
1499
        self._log_file = None
 
1500
        if not self._keep_log_file:
 
1501
            os.remove(self._log_file_name)
 
1502
            self._log_file_name = None
 
1503
 
 
1504
    def setKeepLogfile(self):
 
1505
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1506
        self._keep_log_file = True
 
1507
 
 
1508
    def thisFailsStrictLockCheck(self):
 
1509
        """It is known that this test would fail with -Dstrict_locks.
 
1510
 
 
1511
        By default, all tests are run with strict lock checking unless
 
1512
        -Edisable_lock_checks is supplied. However there are some tests which
 
1513
        we know fail strict locks at this point that have not been fixed.
 
1514
        They should call this function to disable the strict checking.
 
1515
 
 
1516
        This should be used sparingly, it is much better to fix the locking
 
1517
        issues rather than papering over the problem by calling this function.
 
1518
        """
 
1519
        debug.debug_flags.discard('strict_locks')
 
1520
 
 
1521
    def addCleanup(self, callable, *args, **kwargs):
 
1522
        """Arrange to run a callable when this case is torn down.
 
1523
 
 
1524
        Callables are run in the reverse of the order they are registered,
 
1525
        ie last-in first-out.
 
1526
        """
 
1527
        self._cleanups.append((callable, args, kwargs))
 
1528
 
 
1529
    def _cleanEnvironment(self):
 
1530
        new_env = {
 
1531
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1532
            'HOME': os.getcwd(),
 
1533
            # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
 
1534
            # tests do check our impls match APPDATA
 
1535
            'BZR_EDITOR': None, # test_msgeditor manipulates this variable
 
1536
            'VISUAL': None,
 
1537
            'EDITOR': None,
 
1538
            'BZR_EMAIL': None,
 
1539
            'BZREMAIL': None, # may still be present in the environment
 
1540
            'EMAIL': None,
 
1541
            'BZR_PROGRESS_BAR': None,
 
1542
            'BZR_LOG': None,
 
1543
            'BZR_PLUGIN_PATH': None,
 
1544
            # Make sure that any text ui tests are consistent regardless of
 
1545
            # the environment the test case is run in; you may want tests that
 
1546
            # test other combinations.  'dumb' is a reasonable guess for tests
 
1547
            # going to a pipe or a StringIO.
 
1548
            'TERM': 'dumb',
 
1549
            'LINES': '25',
 
1550
            'COLUMNS': '80',
 
1551
            # SSH Agent
 
1552
            'SSH_AUTH_SOCK': None,
 
1553
            # Proxies
 
1554
            'http_proxy': None,
 
1555
            'HTTP_PROXY': None,
 
1556
            'https_proxy': None,
 
1557
            'HTTPS_PROXY': None,
 
1558
            'no_proxy': None,
 
1559
            'NO_PROXY': None,
 
1560
            'all_proxy': None,
 
1561
            'ALL_PROXY': None,
 
1562
            # Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
 
1563
            # least. If you do (care), please update this comment
 
1564
            # -- vila 20080401
 
1565
            'ftp_proxy': None,
 
1566
            'FTP_PROXY': None,
 
1567
            'BZR_REMOTE_PATH': None,
 
1568
        }
 
1569
        self.__old_env = {}
 
1570
        self.addCleanup(self._restoreEnvironment)
 
1571
        for name, value in new_env.iteritems():
 
1572
            self._captureVar(name, value)
 
1573
 
 
1574
    def _captureVar(self, name, newvalue):
 
1575
        """Set an environment variable, and reset it when finished."""
 
1576
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1577
 
 
1578
    def _restore_debug_flags(self):
 
1579
        debug.debug_flags.clear()
 
1580
        debug.debug_flags.update(self._preserved_debug_flags)
 
1581
 
 
1582
    def _restoreEnvironment(self):
 
1583
        for name, value in self.__old_env.iteritems():
 
1584
            osutils.set_or_unset_env(name, value)
 
1585
 
 
1586
    def _restoreHooks(self):
 
1587
        for klass, (name, hooks) in self._preserved_hooks.items():
 
1588
            setattr(klass, name, hooks)
 
1589
 
 
1590
    def knownFailure(self, reason):
 
1591
        """This test has failed for some known reason."""
 
1592
        raise KnownFailure(reason)
 
1593
 
 
1594
    def _do_skip(self, result, reason):
 
1595
        addSkip = getattr(result, 'addSkip', None)
 
1596
        if not callable(addSkip):
 
1597
            result.addError(self, sys.exc_info())
 
1598
        else:
 
1599
            addSkip(self, reason)
 
1600
 
 
1601
    def run(self, result=None):
 
1602
        if result is None: result = self.defaultTestResult()
 
1603
        for feature in getattr(self, '_test_needs_features', []):
 
1604
            if not feature.available():
 
1605
                result.startTest(self)
 
1606
                if getattr(result, 'addNotSupported', None):
 
1607
                    result.addNotSupported(self, feature)
 
1608
                else:
 
1609
                    result.addSuccess(self)
 
1610
                result.stopTest(self)
 
1611
                return result
 
1612
        try:
 
1613
            try:
 
1614
                result.startTest(self)
 
1615
                absent_attr = object()
 
1616
                # Python 2.5
 
1617
                method_name = getattr(self, '_testMethodName', absent_attr)
 
1618
                if method_name is absent_attr:
 
1619
                    # Python 2.4
 
1620
                    method_name = getattr(self, '_TestCase__testMethodName')
 
1621
                testMethod = getattr(self, method_name)
 
1622
                try:
 
1623
                    try:
 
1624
                        self.setUp()
 
1625
                        if not self._bzr_test_setUp_run:
 
1626
                            self.fail(
 
1627
                                "test setUp did not invoke "
 
1628
                                "bzrlib.tests.TestCase's setUp")
 
1629
                    except KeyboardInterrupt:
 
1630
                        self._runCleanups()
 
1631
                        raise
 
1632
                    except TestSkipped, e:
 
1633
                        self._do_skip(result, e.args[0])
 
1634
                        self.tearDown()
 
1635
                        return result
 
1636
                    except:
 
1637
                        result.addError(self, sys.exc_info())
 
1638
                        self._runCleanups()
 
1639
                        return result
 
1640
 
 
1641
                    ok = False
 
1642
                    try:
 
1643
                        testMethod()
 
1644
                        ok = True
 
1645
                    except self.failureException:
 
1646
                        result.addFailure(self, sys.exc_info())
 
1647
                    except TestSkipped, e:
 
1648
                        if not e.args:
 
1649
                            reason = "No reason given."
 
1650
                        else:
 
1651
                            reason = e.args[0]
 
1652
                        self._do_skip(result, reason)
 
1653
                    except KeyboardInterrupt:
 
1654
                        self._runCleanups()
 
1655
                        raise
 
1656
                    except:
 
1657
                        result.addError(self, sys.exc_info())
 
1658
 
 
1659
                    try:
 
1660
                        self.tearDown()
 
1661
                        if not self._bzr_test_tearDown_run:
 
1662
                            self.fail(
 
1663
                                "test tearDown did not invoke "
 
1664
                                "bzrlib.tests.TestCase's tearDown")
 
1665
                    except KeyboardInterrupt:
 
1666
                        self._runCleanups()
 
1667
                        raise
 
1668
                    except:
 
1669
                        result.addError(self, sys.exc_info())
 
1670
                        self._runCleanups()
 
1671
                        ok = False
 
1672
                    if ok: result.addSuccess(self)
 
1673
                finally:
 
1674
                    result.stopTest(self)
 
1675
                return result
 
1676
            except TestNotApplicable:
 
1677
                # Not moved from the result [yet].
 
1678
                self._runCleanups()
 
1679
                raise
 
1680
            except KeyboardInterrupt:
 
1681
                self._runCleanups()
 
1682
                raise
 
1683
        finally:
 
1684
            saved_attrs = {}
 
1685
            for attr_name in self.attrs_to_keep:
 
1686
                if attr_name in self.__dict__:
 
1687
                    saved_attrs[attr_name] = self.__dict__[attr_name]
 
1688
            self.__dict__ = saved_attrs
 
1689
 
 
1690
    def tearDown(self):
 
1691
        self._runCleanups()
 
1692
        self._log_contents = ''
 
1693
        self._bzr_test_tearDown_run = True
 
1694
        unittest.TestCase.tearDown(self)
 
1695
 
 
1696
    def time(self, callable, *args, **kwargs):
 
1697
        """Run callable and accrue the time it takes to the benchmark time.
 
1698
 
 
1699
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1700
        this will cause lsprofile statistics to be gathered and stored in
 
1701
        self._benchcalls.
 
1702
        """
 
1703
        if self._benchtime is None:
 
1704
            self._benchtime = 0
 
1705
        start = time.time()
 
1706
        try:
 
1707
            if not self._gather_lsprof_in_benchmarks:
 
1708
                return callable(*args, **kwargs)
 
1709
            else:
 
1710
                # record this benchmark
 
1711
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1712
                stats.sort()
 
1713
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1714
                return ret
 
1715
        finally:
 
1716
            self._benchtime += time.time() - start
 
1717
 
 
1718
    def _runCleanups(self):
 
1719
        """Run registered cleanup functions.
 
1720
 
 
1721
        This should only be called from TestCase.tearDown.
 
1722
        """
 
1723
        # TODO: Perhaps this should keep running cleanups even if
 
1724
        # one of them fails?
 
1725
 
 
1726
        # Actually pop the cleanups from the list so tearDown running
 
1727
        # twice is safe (this happens for skipped tests).
 
1728
        while self._cleanups:
 
1729
            cleanup, args, kwargs = self._cleanups.pop()
 
1730
            cleanup(*args, **kwargs)
 
1731
 
 
1732
    def log(self, *args):
 
1733
        mutter(*args)
 
1734
 
 
1735
    def _get_log(self, keep_log_file=False):
 
1736
        """Get the log from bzrlib.trace calls from this test.
 
1737
 
 
1738
        :param keep_log_file: When True, if the log is still a file on disk
 
1739
            leave it as a file on disk. When False, if the log is still a file
 
1740
            on disk, the log file is deleted and the log preserved as
 
1741
            self._log_contents.
 
1742
        :return: A string containing the log.
 
1743
        """
 
1744
        # flush the log file, to get all content
 
1745
        import bzrlib.trace
 
1746
        if bzrlib.trace._trace_file:
 
1747
            bzrlib.trace._trace_file.flush()
 
1748
        if self._log_contents:
 
1749
            # XXX: this can hardly contain the content flushed above --vila
 
1750
            # 20080128
 
1751
            return self._log_contents
 
1752
        if self._log_file_name is not None:
 
1753
            logfile = open(self._log_file_name)
 
1754
            try:
 
1755
                log_contents = logfile.read()
 
1756
            finally:
 
1757
                logfile.close()
 
1758
            if not keep_log_file:
 
1759
                self._log_contents = log_contents
 
1760
                try:
 
1761
                    os.remove(self._log_file_name)
 
1762
                except OSError, e:
 
1763
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1764
                        sys.stderr.write(('Unable to delete log file '
 
1765
                                             ' %r\n' % self._log_file_name))
 
1766
                    else:
 
1767
                        raise
 
1768
            return log_contents
 
1769
        else:
 
1770
            return "DELETED log file to reduce memory footprint"
 
1771
 
 
1772
    def requireFeature(self, feature):
 
1773
        """This test requires a specific feature is available.
 
1774
 
 
1775
        :raises UnavailableFeature: When feature is not available.
 
1776
        """
 
1777
        if not feature.available():
 
1778
            raise UnavailableFeature(feature)
 
1779
 
 
1780
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1781
            working_dir):
 
1782
        """Run bazaar command line, splitting up a string command line."""
 
1783
        if isinstance(args, basestring):
 
1784
            # shlex don't understand unicode strings,
 
1785
            # so args should be plain string (bialix 20070906)
 
1786
            args = list(shlex.split(str(args)))
 
1787
        return self._run_bzr_core(args, retcode=retcode,
 
1788
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1789
                )
 
1790
 
 
1791
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1792
            working_dir):
 
1793
        if encoding is None:
 
1794
            encoding = osutils.get_user_encoding()
 
1795
        stdout = StringIOWrapper()
 
1796
        stderr = StringIOWrapper()
 
1797
        stdout.encoding = encoding
 
1798
        stderr.encoding = encoding
 
1799
 
 
1800
        self.log('run bzr: %r', args)
 
1801
        # FIXME: don't call into logging here
 
1802
        handler = logging.StreamHandler(stderr)
 
1803
        handler.setLevel(logging.INFO)
 
1804
        logger = logging.getLogger('')
 
1805
        logger.addHandler(handler)
 
1806
        old_ui_factory = ui.ui_factory
 
1807
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1808
 
 
1809
        cwd = None
 
1810
        if working_dir is not None:
 
1811
            cwd = osutils.getcwd()
 
1812
            os.chdir(working_dir)
 
1813
 
 
1814
        try:
 
1815
            result = self.apply_redirected(ui.ui_factory.stdin,
 
1816
                stdout, stderr,
 
1817
                bzrlib.commands.run_bzr_catch_user_errors,
 
1818
                args)
 
1819
        finally:
 
1820
            logger.removeHandler(handler)
 
1821
            ui.ui_factory = old_ui_factory
 
1822
            if cwd is not None:
 
1823
                os.chdir(cwd)
 
1824
 
 
1825
        out = stdout.getvalue()
 
1826
        err = stderr.getvalue()
 
1827
        if out:
 
1828
            self.log('output:\n%r', out)
 
1829
        if err:
 
1830
            self.log('errors:\n%r', err)
 
1831
        if retcode is not None:
 
1832
            self.assertEquals(retcode, result,
 
1833
                              message='Unexpected return code')
 
1834
        return result, out, err
 
1835
 
 
1836
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
 
1837
                working_dir=None, error_regexes=[], output_encoding=None):
 
1838
        """Invoke bzr, as if it were run from the command line.
 
1839
 
 
1840
        The argument list should not include the bzr program name - the
 
1841
        first argument is normally the bzr command.  Arguments may be
 
1842
        passed in three ways:
 
1843
 
 
1844
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1845
        when the command contains whitespace or metacharacters, or
 
1846
        is built up at run time.
 
1847
 
 
1848
        2- A single string, eg "add a".  This is the most convenient
 
1849
        for hardcoded commands.
 
1850
 
 
1851
        This runs bzr through the interface that catches and reports
 
1852
        errors, and with logging set to something approximating the
 
1853
        default, so that error reporting can be checked.
 
1854
 
 
1855
        This should be the main method for tests that want to exercise the
 
1856
        overall behavior of the bzr application (rather than a unit test
 
1857
        or a functional test of the library.)
 
1858
 
 
1859
        This sends the stdout/stderr results into the test's log,
 
1860
        where it may be useful for debugging.  See also run_captured.
 
1861
 
 
1862
        :keyword stdin: A string to be used as stdin for the command.
 
1863
        :keyword retcode: The status code the command should return;
 
1864
            default 0.
 
1865
        :keyword working_dir: The directory to run the command in
 
1866
        :keyword error_regexes: A list of expected error messages.  If
 
1867
            specified they must be seen in the error output of the command.
 
1868
        """
 
1869
        retcode, out, err = self._run_bzr_autosplit(
 
1870
            args=args,
 
1871
            retcode=retcode,
 
1872
            encoding=encoding,
 
1873
            stdin=stdin,
 
1874
            working_dir=working_dir,
 
1875
            )
 
1876
        self.assertIsInstance(error_regexes, (list, tuple))
 
1877
        for regex in error_regexes:
 
1878
            self.assertContainsRe(err, regex)
 
1879
        return out, err
 
1880
 
 
1881
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1882
        """Run bzr, and check that stderr contains the supplied regexes
 
1883
 
 
1884
        :param error_regexes: Sequence of regular expressions which
 
1885
            must each be found in the error output. The relative ordering
 
1886
            is not enforced.
 
1887
        :param args: command-line arguments for bzr
 
1888
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1889
            This function changes the default value of retcode to be 3,
 
1890
            since in most cases this is run when you expect bzr to fail.
 
1891
 
 
1892
        :return: (out, err) The actual output of running the command (in case
 
1893
            you want to do more inspection)
 
1894
 
 
1895
        Examples of use::
 
1896
 
 
1897
            # Make sure that commit is failing because there is nothing to do
 
1898
            self.run_bzr_error(['no changes to commit'],
 
1899
                               ['commit', '-m', 'my commit comment'])
 
1900
            # Make sure --strict is handling an unknown file, rather than
 
1901
            # giving us the 'nothing to do' error
 
1902
            self.build_tree(['unknown'])
 
1903
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1904
                               ['commit', --strict', '-m', 'my commit comment'])
 
1905
        """
 
1906
        kwargs.setdefault('retcode', 3)
 
1907
        kwargs['error_regexes'] = error_regexes
 
1908
        out, err = self.run_bzr(*args, **kwargs)
 
1909
        return out, err
 
1910
 
 
1911
    def run_bzr_subprocess(self, *args, **kwargs):
 
1912
        """Run bzr in a subprocess for testing.
 
1913
 
 
1914
        This starts a new Python interpreter and runs bzr in there.
 
1915
        This should only be used for tests that have a justifiable need for
 
1916
        this isolation: e.g. they are testing startup time, or signal
 
1917
        handling, or early startup code, etc.  Subprocess code can't be
 
1918
        profiled or debugged so easily.
 
1919
 
 
1920
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
1921
            None is supplied, the status code is not checked.
 
1922
        :keyword env_changes: A dictionary which lists changes to environment
 
1923
            variables. A value of None will unset the env variable.
 
1924
            The values must be strings. The change will only occur in the
 
1925
            child, so you don't need to fix the environment after running.
 
1926
        :keyword universal_newlines: Convert CRLF => LF
 
1927
        :keyword allow_plugins: By default the subprocess is run with
 
1928
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1929
            for system-wide plugins to create unexpected output on stderr,
 
1930
            which can cause unnecessary test failures.
 
1931
        """
 
1932
        env_changes = kwargs.get('env_changes', {})
 
1933
        working_dir = kwargs.get('working_dir', None)
 
1934
        allow_plugins = kwargs.get('allow_plugins', False)
 
1935
        if len(args) == 1:
 
1936
            if isinstance(args[0], list):
 
1937
                args = args[0]
 
1938
            elif isinstance(args[0], basestring):
 
1939
                args = list(shlex.split(args[0]))
 
1940
        else:
 
1941
            raise ValueError("passing varargs to run_bzr_subprocess")
 
1942
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1943
                                            working_dir=working_dir,
 
1944
                                            allow_plugins=allow_plugins)
 
1945
        # We distinguish between retcode=None and retcode not passed.
 
1946
        supplied_retcode = kwargs.get('retcode', 0)
 
1947
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1948
            universal_newlines=kwargs.get('universal_newlines', False),
 
1949
            process_args=args)
 
1950
 
 
1951
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1952
                             skip_if_plan_to_signal=False,
 
1953
                             working_dir=None,
 
1954
                             allow_plugins=False):
 
1955
        """Start bzr in a subprocess for testing.
 
1956
 
 
1957
        This starts a new Python interpreter and runs bzr in there.
 
1958
        This should only be used for tests that have a justifiable need for
 
1959
        this isolation: e.g. they are testing startup time, or signal
 
1960
        handling, or early startup code, etc.  Subprocess code can't be
 
1961
        profiled or debugged so easily.
 
1962
 
 
1963
        :param process_args: a list of arguments to pass to the bzr executable,
 
1964
            for example ``['--version']``.
 
1965
        :param env_changes: A dictionary which lists changes to environment
 
1966
            variables. A value of None will unset the env variable.
 
1967
            The values must be strings. The change will only occur in the
 
1968
            child, so you don't need to fix the environment after running.
 
1969
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1970
            is not available.
 
1971
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1972
 
 
1973
        :returns: Popen object for the started process.
 
1974
        """
 
1975
        if skip_if_plan_to_signal:
 
1976
            if not getattr(os, 'kill', None):
 
1977
                raise TestSkipped("os.kill not available.")
 
1978
 
 
1979
        if env_changes is None:
 
1980
            env_changes = {}
 
1981
        old_env = {}
 
1982
 
 
1983
        def cleanup_environment():
 
1984
            for env_var, value in env_changes.iteritems():
 
1985
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1986
 
 
1987
        def restore_environment():
 
1988
            for env_var, value in old_env.iteritems():
 
1989
                osutils.set_or_unset_env(env_var, value)
 
1990
 
 
1991
        bzr_path = self.get_bzr_path()
 
1992
 
 
1993
        cwd = None
 
1994
        if working_dir is not None:
 
1995
            cwd = osutils.getcwd()
 
1996
            os.chdir(working_dir)
 
1997
 
 
1998
        try:
 
1999
            # win32 subprocess doesn't support preexec_fn
 
2000
            # so we will avoid using it on all platforms, just to
 
2001
            # make sure the code path is used, and we don't break on win32
 
2002
            cleanup_environment()
 
2003
            command = [sys.executable]
 
2004
            # frozen executables don't need the path to bzr
 
2005
            if getattr(sys, "frozen", None) is None:
 
2006
                command.append(bzr_path)
 
2007
            if not allow_plugins:
 
2008
                command.append('--no-plugins')
 
2009
            command.extend(process_args)
 
2010
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
2011
        finally:
 
2012
            restore_environment()
 
2013
            if cwd is not None:
 
2014
                os.chdir(cwd)
 
2015
 
 
2016
        return process
 
2017
 
 
2018
    def _popen(self, *args, **kwargs):
 
2019
        """Place a call to Popen.
 
2020
 
 
2021
        Allows tests to override this method to intercept the calls made to
 
2022
        Popen for introspection.
 
2023
        """
 
2024
        return Popen(*args, **kwargs)
 
2025
 
 
2026
    def get_source_path(self):
 
2027
        """Return the path of the directory containing bzrlib."""
 
2028
        return os.path.dirname(os.path.dirname(bzrlib.__file__))
 
2029
 
 
2030
    def get_bzr_path(self):
 
2031
        """Return the path of the 'bzr' executable for this test suite."""
 
2032
        bzr_path = self.get_source_path()+'/bzr'
 
2033
        if not os.path.isfile(bzr_path):
 
2034
            # We are probably installed. Assume sys.argv is the right file
 
2035
            bzr_path = sys.argv[0]
 
2036
        return bzr_path
 
2037
 
 
2038
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
2039
                              universal_newlines=False, process_args=None):
 
2040
        """Finish the execution of process.
 
2041
 
 
2042
        :param process: the Popen object returned from start_bzr_subprocess.
 
2043
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
2044
            None is supplied, the status code is not checked.
 
2045
        :param send_signal: an optional signal to send to the process.
 
2046
        :param universal_newlines: Convert CRLF => LF
 
2047
        :returns: (stdout, stderr)
 
2048
        """
 
2049
        if send_signal is not None:
 
2050
            os.kill(process.pid, send_signal)
 
2051
        out, err = process.communicate()
 
2052
 
 
2053
        if universal_newlines:
 
2054
            out = out.replace('\r\n', '\n')
 
2055
            err = err.replace('\r\n', '\n')
 
2056
 
 
2057
        if retcode is not None and retcode != process.returncode:
 
2058
            if process_args is None:
 
2059
                process_args = "(unknown args)"
 
2060
            mutter('Output of bzr %s:\n%s', process_args, out)
 
2061
            mutter('Error for bzr %s:\n%s', process_args, err)
 
2062
            self.fail('Command bzr %s failed with retcode %s != %s'
 
2063
                      % (process_args, retcode, process.returncode))
 
2064
        return [out, err]
 
2065
 
 
2066
    def check_inventory_shape(self, inv, shape):
 
2067
        """Compare an inventory to a list of expected names.
 
2068
 
 
2069
        Fail if they are not precisely equal.
 
2070
        """
 
2071
        extras = []
 
2072
        shape = list(shape)             # copy
 
2073
        for path, ie in inv.entries():
 
2074
            name = path.replace('\\', '/')
 
2075
            if ie.kind == 'directory':
 
2076
                name = name + '/'
 
2077
            if name in shape:
 
2078
                shape.remove(name)
 
2079
            else:
 
2080
                extras.append(name)
 
2081
        if shape:
 
2082
            self.fail("expected paths not found in inventory: %r" % shape)
 
2083
        if extras:
 
2084
            self.fail("unexpected paths found in inventory: %r" % extras)
 
2085
 
 
2086
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
2087
                         a_callable=None, *args, **kwargs):
 
2088
        """Call callable with redirected std io pipes.
 
2089
 
 
2090
        Returns the return code."""
 
2091
        if not callable(a_callable):
 
2092
            raise ValueError("a_callable must be callable.")
 
2093
        if stdin is None:
 
2094
            stdin = StringIO("")
 
2095
        if stdout is None:
 
2096
            if getattr(self, "_log_file", None) is not None:
 
2097
                stdout = self._log_file
 
2098
            else:
 
2099
                stdout = StringIO()
 
2100
        if stderr is None:
 
2101
            if getattr(self, "_log_file", None is not None):
 
2102
                stderr = self._log_file
 
2103
            else:
 
2104
                stderr = StringIO()
 
2105
        real_stdin = sys.stdin
 
2106
        real_stdout = sys.stdout
 
2107
        real_stderr = sys.stderr
 
2108
        try:
 
2109
            sys.stdout = stdout
 
2110
            sys.stderr = stderr
 
2111
            sys.stdin = stdin
 
2112
            return a_callable(*args, **kwargs)
 
2113
        finally:
 
2114
            sys.stdout = real_stdout
 
2115
            sys.stderr = real_stderr
 
2116
            sys.stdin = real_stdin
 
2117
 
 
2118
    def reduceLockdirTimeout(self):
 
2119
        """Reduce the default lock timeout for the duration of the test, so that
 
2120
        if LockContention occurs during a test, it does so quickly.
 
2121
 
 
2122
        Tests that expect to provoke LockContention errors should call this.
 
2123
        """
 
2124
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
2125
        def resetTimeout():
 
2126
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
2127
        self.addCleanup(resetTimeout)
 
2128
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
2129
 
 
2130
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
2131
        """Return a StringIOWrapper instance, that will encode Unicode
 
2132
        input to UTF-8.
 
2133
        """
 
2134
        if encoding_type is None:
 
2135
            encoding_type = 'strict'
 
2136
        sio = StringIO()
 
2137
        output_encoding = 'utf-8'
 
2138
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
 
2139
        sio.encoding = output_encoding
 
2140
        return sio
 
2141
 
 
2142
    def disable_verb(self, verb):
 
2143
        """Disable a smart server verb for one test."""
 
2144
        from bzrlib.smart import request
 
2145
        request_handlers = request.request_handlers
 
2146
        orig_method = request_handlers.get(verb)
 
2147
        request_handlers.remove(verb)
 
2148
        def restoreVerb():
 
2149
            request_handlers.register(verb, orig_method)
 
2150
        self.addCleanup(restoreVerb)
 
2151
 
 
2152
 
 
2153
class CapturedCall(object):
 
2154
    """A helper for capturing smart server calls for easy debug analysis."""
 
2155
 
 
2156
    def __init__(self, params, prefix_length):
 
2157
        """Capture the call with params and skip prefix_length stack frames."""
 
2158
        self.call = params
 
2159
        import traceback
 
2160
        # The last 5 frames are the __init__, the hook frame, and 3 smart
 
2161
        # client frames. Beyond this we could get more clever, but this is good
 
2162
        # enough for now.
 
2163
        stack = traceback.extract_stack()[prefix_length:-5]
 
2164
        self.stack = ''.join(traceback.format_list(stack))
 
2165
 
 
2166
    def __str__(self):
 
2167
        return self.call.method
 
2168
 
 
2169
    def __repr__(self):
 
2170
        return self.call.method
 
2171
 
 
2172
    def stack(self):
 
2173
        return self.stack
 
2174
 
 
2175
 
 
2176
class TestCaseWithMemoryTransport(TestCase):
 
2177
    """Common test class for tests that do not need disk resources.
 
2178
 
 
2179
    Tests that need disk resources should derive from TestCaseWithTransport.
 
2180
 
 
2181
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
2182
 
 
2183
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
2184
    a directory which does not exist. This serves to help ensure test isolation
 
2185
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
2186
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
2187
    file defaults for the transport in tests, nor does it obey the command line
 
2188
    override, so tests that accidentally write to the common directory should
 
2189
    be rare.
 
2190
 
 
2191
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
2192
    a .bzr directory that stops us ascending higher into the filesystem.
 
2193
    """
 
2194
 
 
2195
    TEST_ROOT = None
 
2196
    _TEST_NAME = 'test'
 
2197
 
 
2198
    def __init__(self, methodName='runTest'):
 
2199
        # allow test parameterization after test construction and before test
 
2200
        # execution. Variables that the parameterizer sets need to be
 
2201
        # ones that are not set by setUp, or setUp will trash them.
 
2202
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
2203
        self.vfs_transport_factory = default_transport
 
2204
        self.transport_server = None
 
2205
        self.transport_readonly_server = None
 
2206
        self.__vfs_server = None
 
2207
 
 
2208
    def get_transport(self, relpath=None):
 
2209
        """Return a writeable transport.
 
2210
 
 
2211
        This transport is for the test scratch space relative to
 
2212
        "self._test_root"
 
2213
 
 
2214
        :param relpath: a path relative to the base url.
 
2215
        """
 
2216
        t = get_transport(self.get_url(relpath))
 
2217
        self.assertFalse(t.is_readonly())
 
2218
        return t
 
2219
 
 
2220
    def get_readonly_transport(self, relpath=None):
 
2221
        """Return a readonly transport for the test scratch space
 
2222
 
 
2223
        This can be used to test that operations which should only need
 
2224
        readonly access in fact do not try to write.
 
2225
 
 
2226
        :param relpath: a path relative to the base url.
 
2227
        """
 
2228
        t = get_transport(self.get_readonly_url(relpath))
 
2229
        self.assertTrue(t.is_readonly())
 
2230
        return t
 
2231
 
 
2232
    def create_transport_readonly_server(self):
 
2233
        """Create a transport server from class defined at init.
 
2234
 
 
2235
        This is mostly a hook for daughter classes.
 
2236
        """
 
2237
        return self.transport_readonly_server()
 
2238
 
 
2239
    def get_readonly_server(self):
 
2240
        """Get the server instance for the readonly transport
 
2241
 
 
2242
        This is useful for some tests with specific servers to do diagnostics.
 
2243
        """
 
2244
        if self.__readonly_server is None:
 
2245
            if self.transport_readonly_server is None:
 
2246
                # readonly decorator requested
 
2247
                self.__readonly_server = ReadonlyServer()
 
2248
            else:
 
2249
                # explicit readonly transport.
 
2250
                self.__readonly_server = self.create_transport_readonly_server()
 
2251
            self.start_server(self.__readonly_server,
 
2252
                self.get_vfs_only_server())
 
2253
        return self.__readonly_server
 
2254
 
 
2255
    def get_readonly_url(self, relpath=None):
 
2256
        """Get a URL for the readonly transport.
 
2257
 
 
2258
        This will either be backed by '.' or a decorator to the transport
 
2259
        used by self.get_url()
 
2260
        relpath provides for clients to get a path relative to the base url.
 
2261
        These should only be downwards relative, not upwards.
 
2262
        """
 
2263
        base = self.get_readonly_server().get_url()
 
2264
        return self._adjust_url(base, relpath)
 
2265
 
 
2266
    def get_vfs_only_server(self):
 
2267
        """Get the vfs only read/write server instance.
 
2268
 
 
2269
        This is useful for some tests with specific servers that need
 
2270
        diagnostics.
 
2271
 
 
2272
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
2273
        is no means to override it.
 
2274
        """
 
2275
        if self.__vfs_server is None:
 
2276
            self.__vfs_server = MemoryServer()
 
2277
            self.start_server(self.__vfs_server)
 
2278
        return self.__vfs_server
 
2279
 
 
2280
    def get_server(self):
 
2281
        """Get the read/write server instance.
 
2282
 
 
2283
        This is useful for some tests with specific servers that need
 
2284
        diagnostics.
 
2285
 
 
2286
        This is built from the self.transport_server factory. If that is None,
 
2287
        then the self.get_vfs_server is returned.
 
2288
        """
 
2289
        if self.__server is None:
 
2290
            if (self.transport_server is None or self.transport_server is
 
2291
                self.vfs_transport_factory):
 
2292
                self.__server = self.get_vfs_only_server()
 
2293
            else:
 
2294
                # bring up a decorated means of access to the vfs only server.
 
2295
                self.__server = self.transport_server()
 
2296
                self.start_server(self.__server, self.get_vfs_only_server())
 
2297
        return self.__server
 
2298
 
 
2299
    def _adjust_url(self, base, relpath):
 
2300
        """Get a URL (or maybe a path) for the readwrite transport.
 
2301
 
 
2302
        This will either be backed by '.' or to an equivalent non-file based
 
2303
        facility.
 
2304
        relpath provides for clients to get a path relative to the base url.
 
2305
        These should only be downwards relative, not upwards.
 
2306
        """
 
2307
        if relpath is not None and relpath != '.':
 
2308
            if not base.endswith('/'):
 
2309
                base = base + '/'
 
2310
            # XXX: Really base should be a url; we did after all call
 
2311
            # get_url()!  But sometimes it's just a path (from
 
2312
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
2313
            # to a non-escaped local path.
 
2314
            if base.startswith('./') or base.startswith('/'):
 
2315
                base += relpath
 
2316
            else:
 
2317
                base += urlutils.escape(relpath)
 
2318
        return base
 
2319
 
 
2320
    def get_url(self, relpath=None):
 
2321
        """Get a URL (or maybe a path) for the readwrite transport.
 
2322
 
 
2323
        This will either be backed by '.' or to an equivalent non-file based
 
2324
        facility.
 
2325
        relpath provides for clients to get a path relative to the base url.
 
2326
        These should only be downwards relative, not upwards.
 
2327
        """
 
2328
        base = self.get_server().get_url()
 
2329
        return self._adjust_url(base, relpath)
 
2330
 
 
2331
    def get_vfs_only_url(self, relpath=None):
 
2332
        """Get a URL (or maybe a path for the plain old vfs transport.
 
2333
 
 
2334
        This will never be a smart protocol.  It always has all the
 
2335
        capabilities of the local filesystem, but it might actually be a
 
2336
        MemoryTransport or some other similar virtual filesystem.
 
2337
 
 
2338
        This is the backing transport (if any) of the server returned by
 
2339
        get_url and get_readonly_url.
 
2340
 
 
2341
        :param relpath: provides for clients to get a path relative to the base
 
2342
            url.  These should only be downwards relative, not upwards.
 
2343
        :return: A URL
 
2344
        """
 
2345
        base = self.get_vfs_only_server().get_url()
 
2346
        return self._adjust_url(base, relpath)
 
2347
 
 
2348
    def _create_safety_net(self):
 
2349
        """Make a fake bzr directory.
 
2350
 
 
2351
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
2352
        real branch.
 
2353
        """
 
2354
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2355
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
2356
 
 
2357
    def _check_safety_net(self):
 
2358
        """Check that the safety .bzr directory have not been touched.
 
2359
 
 
2360
        _make_test_root have created a .bzr directory to prevent tests from
 
2361
        propagating. This method ensures than a test did not leaked.
 
2362
        """
 
2363
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2364
        self.permit_url(get_transport(root).base)
 
2365
        wt = workingtree.WorkingTree.open(root)
 
2366
        last_rev = wt.last_revision()
 
2367
        if last_rev != 'null:':
 
2368
            # The current test have modified the /bzr directory, we need to
 
2369
            # recreate a new one or all the followng tests will fail.
 
2370
            # If you need to inspect its content uncomment the following line
 
2371
            # import pdb; pdb.set_trace()
 
2372
            _rmtree_temp_dir(root + '/.bzr')
 
2373
            self._create_safety_net()
 
2374
            raise AssertionError('%s/.bzr should not be modified' % root)
 
2375
 
 
2376
    def _make_test_root(self):
 
2377
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
2378
            # Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
 
2379
            root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
 
2380
                                                    suffix='.tmp'))
 
2381
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
2382
 
 
2383
            self._create_safety_net()
 
2384
 
 
2385
            # The same directory is used by all tests, and we're not
 
2386
            # specifically told when all tests are finished.  This will do.
 
2387
            atexit.register(_rmtree_temp_dir, root)
 
2388
 
 
2389
        self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2390
        self.addCleanup(self._check_safety_net)
 
2391
 
 
2392
    def makeAndChdirToTestDir(self):
 
2393
        """Create a temporary directories for this one test.
 
2394
 
 
2395
        This must set self.test_home_dir and self.test_dir and chdir to
 
2396
        self.test_dir.
 
2397
 
 
2398
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
2399
        """
 
2400
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2401
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
2402
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
2403
        self.permit_dir(self.test_dir)
 
2404
 
 
2405
    def make_branch(self, relpath, format=None):
 
2406
        """Create a branch on the transport at relpath."""
 
2407
        repo = self.make_repository(relpath, format=format)
 
2408
        return repo.bzrdir.create_branch()
 
2409
 
 
2410
    def make_bzrdir(self, relpath, format=None):
 
2411
        try:
 
2412
            # might be a relative or absolute path
 
2413
            maybe_a_url = self.get_url(relpath)
 
2414
            segments = maybe_a_url.rsplit('/', 1)
 
2415
            t = get_transport(maybe_a_url)
 
2416
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
2417
                t.ensure_base()
 
2418
            if format is None:
 
2419
                format = 'default'
 
2420
            if isinstance(format, basestring):
 
2421
                format = bzrdir.format_registry.make_bzrdir(format)
 
2422
            return format.initialize_on_transport(t)
 
2423
        except errors.UninitializableFormat:
 
2424
            raise TestSkipped("Format %s is not initializable." % format)
 
2425
 
 
2426
    def make_repository(self, relpath, shared=False, format=None):
 
2427
        """Create a repository on our default transport at relpath.
 
2428
 
 
2429
        Note that relpath must be a relative path, not a full url.
 
2430
        """
 
2431
        # FIXME: If you create a remoterepository this returns the underlying
 
2432
        # real format, which is incorrect.  Actually we should make sure that
 
2433
        # RemoteBzrDir returns a RemoteRepository.
 
2434
        # maybe  mbp 20070410
 
2435
        made_control = self.make_bzrdir(relpath, format=format)
 
2436
        return made_control.create_repository(shared=shared)
 
2437
 
 
2438
    def make_smart_server(self, path):
 
2439
        smart_server = server.SmartTCPServer_for_testing()
 
2440
        self.start_server(smart_server, self.get_server())
 
2441
        remote_transport = get_transport(smart_server.get_url()).clone(path)
 
2442
        return remote_transport
 
2443
 
 
2444
    def make_branch_and_memory_tree(self, relpath, format=None):
 
2445
        """Create a branch on the default transport and a MemoryTree for it."""
 
2446
        b = self.make_branch(relpath, format=format)
 
2447
        return memorytree.MemoryTree.create_on_branch(b)
 
2448
 
 
2449
    def make_branch_builder(self, relpath, format=None):
 
2450
        branch = self.make_branch(relpath, format=format)
 
2451
        return branchbuilder.BranchBuilder(branch=branch)
 
2452
 
 
2453
    def overrideEnvironmentForTesting(self):
 
2454
        os.environ['HOME'] = self.test_home_dir
 
2455
        os.environ['BZR_HOME'] = self.test_home_dir
 
2456
 
 
2457
    def setUp(self):
 
2458
        super(TestCaseWithMemoryTransport, self).setUp()
 
2459
        self._make_test_root()
 
2460
        _currentdir = os.getcwdu()
 
2461
        def _leaveDirectory():
 
2462
            os.chdir(_currentdir)
 
2463
        self.addCleanup(_leaveDirectory)
 
2464
        self.makeAndChdirToTestDir()
 
2465
        self.overrideEnvironmentForTesting()
 
2466
        self.__readonly_server = None
 
2467
        self.__server = None
 
2468
        self.reduceLockdirTimeout()
 
2469
 
 
2470
    def setup_smart_server_with_call_log(self):
 
2471
        """Sets up a smart server as the transport server with a call log."""
 
2472
        self.transport_server = server.SmartTCPServer_for_testing
 
2473
        self.hpss_calls = []
 
2474
        import traceback
 
2475
        # Skip the current stack down to the caller of
 
2476
        # setup_smart_server_with_call_log
 
2477
        prefix_length = len(traceback.extract_stack()) - 2
 
2478
        def capture_hpss_call(params):
 
2479
            self.hpss_calls.append(
 
2480
                CapturedCall(params, prefix_length))
 
2481
        client._SmartClient.hooks.install_named_hook(
 
2482
            'call', capture_hpss_call, None)
 
2483
 
 
2484
    def reset_smart_call_log(self):
 
2485
        self.hpss_calls = []
 
2486
 
 
2487
 
 
2488
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
2489
    """Derived class that runs a test within a temporary directory.
 
2490
 
 
2491
    This is useful for tests that need to create a branch, etc.
 
2492
 
 
2493
    The directory is created in a slightly complex way: for each
 
2494
    Python invocation, a new temporary top-level directory is created.
 
2495
    All test cases create their own directory within that.  If the
 
2496
    tests complete successfully, the directory is removed.
 
2497
 
 
2498
    :ivar test_base_dir: The path of the top-level directory for this
 
2499
    test, which contains a home directory and a work directory.
 
2500
 
 
2501
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
2502
    which is used as $HOME for this test.
 
2503
 
 
2504
    :ivar test_dir: A directory under test_base_dir used as the current
 
2505
    directory when the test proper is run.
 
2506
    """
 
2507
 
 
2508
    OVERRIDE_PYTHON = 'python'
 
2509
 
 
2510
    def check_file_contents(self, filename, expect):
 
2511
        self.log("check contents of file %s" % filename)
 
2512
        contents = file(filename, 'r').read()
 
2513
        if contents != expect:
 
2514
            self.log("expected: %r" % expect)
 
2515
            self.log("actually: %r" % contents)
 
2516
            self.fail("contents of %s not as expected" % filename)
 
2517
 
 
2518
    def _getTestDirPrefix(self):
 
2519
        # create a directory within the top level test directory
 
2520
        if sys.platform in ('win32', 'cygwin'):
 
2521
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
 
2522
            # windows is likely to have path-length limits so use a short name
 
2523
            name_prefix = name_prefix[-30:]
 
2524
        else:
 
2525
            name_prefix = re.sub('[/]', '_', self.id())
 
2526
        return name_prefix
 
2527
 
 
2528
    def makeAndChdirToTestDir(self):
 
2529
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2530
 
 
2531
        For TestCaseInTempDir we create a temporary directory based on the test
 
2532
        name and then create two subdirs - test and home under it.
 
2533
        """
 
2534
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
 
2535
            self._getTestDirPrefix())
 
2536
        name = name_prefix
 
2537
        for i in range(100):
 
2538
            if os.path.exists(name):
 
2539
                name = name_prefix + '_' + str(i)
 
2540
            else:
 
2541
                # now create test and home directories within this dir
 
2542
                self.test_base_dir = name
 
2543
                self.addCleanup(self.deleteTestDir)
 
2544
                os.mkdir(self.test_base_dir)
 
2545
                break
 
2546
        self.permit_dir(self.test_base_dir)
 
2547
        # 'sprouting' and 'init' of a branch both walk up the tree to find
 
2548
        # stacking policy to honour; create a bzr dir with an unshared
 
2549
        # repository (but not a branch - our code would be trying to escape
 
2550
        # then!) to stop them, and permit it to be read.
 
2551
        # control = bzrdir.BzrDir.create(self.test_base_dir)
 
2552
        # control.create_repository()
 
2553
        self.test_home_dir = self.test_base_dir + '/home'
 
2554
        os.mkdir(self.test_home_dir)
 
2555
        self.test_dir = self.test_base_dir + '/work'
 
2556
        os.mkdir(self.test_dir)
 
2557
        os.chdir(self.test_dir)
 
2558
        # put name of test inside
 
2559
        f = file(self.test_base_dir + '/name', 'w')
 
2560
        try:
 
2561
            f.write(self.id())
 
2562
        finally:
 
2563
            f.close()
 
2564
 
 
2565
    def deleteTestDir(self):
 
2566
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2567
        _rmtree_temp_dir(self.test_base_dir)
 
2568
 
 
2569
    def build_tree(self, shape, line_endings='binary', transport=None):
 
2570
        """Build a test tree according to a pattern.
 
2571
 
 
2572
        shape is a sequence of file specifications.  If the final
 
2573
        character is '/', a directory is created.
 
2574
 
 
2575
        This assumes that all the elements in the tree being built are new.
 
2576
 
 
2577
        This doesn't add anything to a branch.
 
2578
 
 
2579
        :type shape:    list or tuple.
 
2580
        :param line_endings: Either 'binary' or 'native'
 
2581
            in binary mode, exact contents are written in native mode, the
 
2582
            line endings match the default platform endings.
 
2583
        :param transport: A transport to write to, for building trees on VFS's.
 
2584
            If the transport is readonly or None, "." is opened automatically.
 
2585
        :return: None
 
2586
        """
 
2587
        if type(shape) not in (list, tuple):
 
2588
            raise AssertionError("Parameter 'shape' should be "
 
2589
                "a list or a tuple. Got %r instead" % (shape,))
 
2590
        # It's OK to just create them using forward slashes on windows.
 
2591
        if transport is None or transport.is_readonly():
 
2592
            transport = get_transport(".")
 
2593
        for name in shape:
 
2594
            self.assertIsInstance(name, basestring)
 
2595
            if name[-1] == '/':
 
2596
                transport.mkdir(urlutils.escape(name[:-1]))
 
2597
            else:
 
2598
                if line_endings == 'binary':
 
2599
                    end = '\n'
 
2600
                elif line_endings == 'native':
 
2601
                    end = os.linesep
 
2602
                else:
 
2603
                    raise errors.BzrError(
 
2604
                        'Invalid line ending request %r' % line_endings)
 
2605
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
2606
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2607
 
 
2608
    def build_tree_contents(self, shape):
 
2609
        build_tree_contents(shape)
 
2610
 
 
2611
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2612
        """Assert whether path or paths are in the WorkingTree"""
 
2613
        if tree is None:
 
2614
            tree = workingtree.WorkingTree.open(root_path)
 
2615
        if not isinstance(path, basestring):
 
2616
            for p in path:
 
2617
                self.assertInWorkingTree(p, tree=tree)
 
2618
        else:
 
2619
            self.assertIsNot(tree.path2id(path), None,
 
2620
                path+' not in working tree.')
 
2621
 
 
2622
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2623
        """Assert whether path or paths are not in the WorkingTree"""
 
2624
        if tree is None:
 
2625
            tree = workingtree.WorkingTree.open(root_path)
 
2626
        if not isinstance(path, basestring):
 
2627
            for p in path:
 
2628
                self.assertNotInWorkingTree(p,tree=tree)
 
2629
        else:
 
2630
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2631
 
 
2632
 
 
2633
class TestCaseWithTransport(TestCaseInTempDir):
 
2634
    """A test case that provides get_url and get_readonly_url facilities.
 
2635
 
 
2636
    These back onto two transport servers, one for readonly access and one for
 
2637
    read write access.
 
2638
 
 
2639
    If no explicit class is provided for readonly access, a
 
2640
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2641
    based read write transports.
 
2642
 
 
2643
    If an explicit class is provided for readonly access, that server and the
 
2644
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2645
    """
 
2646
 
 
2647
    def get_vfs_only_server(self):
 
2648
        """See TestCaseWithMemoryTransport.
 
2649
 
 
2650
        This is useful for some tests with specific servers that need
 
2651
        diagnostics.
 
2652
        """
 
2653
        if self.__vfs_server is None:
 
2654
            self.__vfs_server = self.vfs_transport_factory()
 
2655
            self.start_server(self.__vfs_server)
 
2656
        return self.__vfs_server
 
2657
 
 
2658
    def make_branch_and_tree(self, relpath, format=None):
 
2659
        """Create a branch on the transport and a tree locally.
 
2660
 
 
2661
        If the transport is not a LocalTransport, the Tree can't be created on
 
2662
        the transport.  In that case if the vfs_transport_factory is
 
2663
        LocalURLServer the working tree is created in the local
 
2664
        directory backing the transport, and the returned tree's branch and
 
2665
        repository will also be accessed locally. Otherwise a lightweight
 
2666
        checkout is created and returned.
 
2667
 
 
2668
        We do this because we can't physically create a tree in the local
 
2669
        path, with a branch reference to the transport_factory url, and
 
2670
        a branch + repository in the vfs_transport, unless the vfs_transport
 
2671
        namespace is distinct from the local disk - the two branch objects
 
2672
        would collide. While we could construct a tree with its branch object
 
2673
        pointing at the transport_factory transport in memory, reopening it
 
2674
        would behaving unexpectedly, and has in the past caused testing bugs
 
2675
        when we tried to do it that way.
 
2676
 
 
2677
        :param format: The BzrDirFormat.
 
2678
        :returns: the WorkingTree.
 
2679
        """
 
2680
        # TODO: always use the local disk path for the working tree,
 
2681
        # this obviously requires a format that supports branch references
 
2682
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2683
        # RBC 20060208
 
2684
        b = self.make_branch(relpath, format=format)
 
2685
        try:
 
2686
            return b.bzrdir.create_workingtree()
 
2687
        except errors.NotLocalUrl:
 
2688
            # We can only make working trees locally at the moment.  If the
 
2689
            # transport can't support them, then we keep the non-disk-backed
 
2690
            # branch and create a local checkout.
 
2691
            if self.vfs_transport_factory is LocalURLServer:
 
2692
                # the branch is colocated on disk, we cannot create a checkout.
 
2693
                # hopefully callers will expect this.
 
2694
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2695
                wt = local_controldir.create_workingtree()
 
2696
                if wt.branch._format != b._format:
 
2697
                    wt._branch = b
 
2698
                    # Make sure that assigning to wt._branch fixes wt.branch,
 
2699
                    # in case the implementation details of workingtree objects
 
2700
                    # change.
 
2701
                    self.assertIs(b, wt.branch)
 
2702
                return wt
 
2703
            else:
 
2704
                return b.create_checkout(relpath, lightweight=True)
 
2705
 
 
2706
    def assertIsDirectory(self, relpath, transport):
 
2707
        """Assert that relpath within transport is a directory.
 
2708
 
 
2709
        This may not be possible on all transports; in that case it propagates
 
2710
        a TransportNotPossible.
 
2711
        """
 
2712
        try:
 
2713
            mode = transport.stat(relpath).st_mode
 
2714
        except errors.NoSuchFile:
 
2715
            self.fail("path %s is not a directory; no such file"
 
2716
                      % (relpath))
 
2717
        if not stat.S_ISDIR(mode):
 
2718
            self.fail("path %s is not a directory; has mode %#o"
 
2719
                      % (relpath, mode))
 
2720
 
 
2721
    def assertTreesEqual(self, left, right):
 
2722
        """Check that left and right have the same content and properties."""
 
2723
        # we use a tree delta to check for equality of the content, and we
 
2724
        # manually check for equality of other things such as the parents list.
 
2725
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2726
        differences = left.changes_from(right)
 
2727
        self.assertFalse(differences.has_changed(),
 
2728
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2729
 
 
2730
    def setUp(self):
 
2731
        super(TestCaseWithTransport, self).setUp()
 
2732
        self.__vfs_server = None
 
2733
 
 
2734
    def disable_missing_extensions_warning(self):
 
2735
        """Some tests expect a precise stderr content.
 
2736
 
 
2737
        There is no point in forcing them to duplicate the extension related
 
2738
        warning.
 
2739
        """
 
2740
        config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
 
2741
 
 
2742
 
 
2743
class ChrootedTestCase(TestCaseWithTransport):
 
2744
    """A support class that provides readonly urls outside the local namespace.
 
2745
 
 
2746
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2747
    is then we are chrooted already, if it is not then an HttpServer is used
 
2748
    for readonly urls.
 
2749
 
 
2750
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2751
                       be used without needed to redo it when a different
 
2752
                       subclass is in use ?
 
2753
    """
 
2754
 
 
2755
    def setUp(self):
 
2756
        super(ChrootedTestCase, self).setUp()
 
2757
        if not self.vfs_transport_factory == MemoryServer:
 
2758
            self.transport_readonly_server = HttpServer
 
2759
 
 
2760
 
 
2761
def condition_id_re(pattern):
 
2762
    """Create a condition filter which performs a re check on a test's id.
 
2763
 
 
2764
    :param pattern: A regular expression string.
 
2765
    :return: A callable that returns True if the re matches.
 
2766
    """
 
2767
    filter_re = osutils.re_compile_checked(pattern, 0,
 
2768
        'test filter')
 
2769
    def condition(test):
 
2770
        test_id = test.id()
 
2771
        return filter_re.search(test_id)
 
2772
    return condition
 
2773
 
 
2774
 
 
2775
def condition_isinstance(klass_or_klass_list):
 
2776
    """Create a condition filter which returns isinstance(param, klass).
 
2777
 
 
2778
    :return: A callable which when called with one parameter obj return the
 
2779
        result of isinstance(obj, klass_or_klass_list).
 
2780
    """
 
2781
    def condition(obj):
 
2782
        return isinstance(obj, klass_or_klass_list)
 
2783
    return condition
 
2784
 
 
2785
 
 
2786
def condition_id_in_list(id_list):
 
2787
    """Create a condition filter which verify that test's id in a list.
 
2788
 
 
2789
    :param id_list: A TestIdList object.
 
2790
    :return: A callable that returns True if the test's id appears in the list.
 
2791
    """
 
2792
    def condition(test):
 
2793
        return id_list.includes(test.id())
 
2794
    return condition
 
2795
 
 
2796
 
 
2797
def condition_id_startswith(starts):
 
2798
    """Create a condition filter verifying that test's id starts with a string.
 
2799
 
 
2800
    :param starts: A list of string.
 
2801
    :return: A callable that returns True if the test's id starts with one of
 
2802
        the given strings.
 
2803
    """
 
2804
    def condition(test):
 
2805
        for start in starts:
 
2806
            if test.id().startswith(start):
 
2807
                return True
 
2808
        return False
 
2809
    return condition
 
2810
 
 
2811
 
 
2812
def exclude_tests_by_condition(suite, condition):
 
2813
    """Create a test suite which excludes some tests from suite.
 
2814
 
 
2815
    :param suite: The suite to get tests from.
 
2816
    :param condition: A callable whose result evaluates True when called with a
 
2817
        test case which should be excluded from the result.
 
2818
    :return: A suite which contains the tests found in suite that fail
 
2819
        condition.
 
2820
    """
 
2821
    result = []
 
2822
    for test in iter_suite_tests(suite):
 
2823
        if not condition(test):
 
2824
            result.append(test)
 
2825
    return TestUtil.TestSuite(result)
 
2826
 
 
2827
 
 
2828
def filter_suite_by_condition(suite, condition):
 
2829
    """Create a test suite by filtering another one.
 
2830
 
 
2831
    :param suite: The source suite.
 
2832
    :param condition: A callable whose result evaluates True when called with a
 
2833
        test case which should be included in the result.
 
2834
    :return: A suite which contains the tests found in suite that pass
 
2835
        condition.
 
2836
    """
 
2837
    result = []
 
2838
    for test in iter_suite_tests(suite):
 
2839
        if condition(test):
 
2840
            result.append(test)
 
2841
    return TestUtil.TestSuite(result)
 
2842
 
 
2843
 
 
2844
def filter_suite_by_re(suite, pattern):
 
2845
    """Create a test suite by filtering another one.
 
2846
 
 
2847
    :param suite:           the source suite
 
2848
    :param pattern:         pattern that names must match
 
2849
    :returns: the newly created suite
 
2850
    """
 
2851
    condition = condition_id_re(pattern)
 
2852
    result_suite = filter_suite_by_condition(suite, condition)
 
2853
    return result_suite
 
2854
 
 
2855
 
 
2856
def filter_suite_by_id_list(suite, test_id_list):
 
2857
    """Create a test suite by filtering another one.
 
2858
 
 
2859
    :param suite: The source suite.
 
2860
    :param test_id_list: A list of the test ids to keep as strings.
 
2861
    :returns: the newly created suite
 
2862
    """
 
2863
    condition = condition_id_in_list(test_id_list)
 
2864
    result_suite = filter_suite_by_condition(suite, condition)
 
2865
    return result_suite
 
2866
 
 
2867
 
 
2868
def filter_suite_by_id_startswith(suite, start):
 
2869
    """Create a test suite by filtering another one.
 
2870
 
 
2871
    :param suite: The source suite.
 
2872
    :param start: A list of string the test id must start with one of.
 
2873
    :returns: the newly created suite
 
2874
    """
 
2875
    condition = condition_id_startswith(start)
 
2876
    result_suite = filter_suite_by_condition(suite, condition)
 
2877
    return result_suite
 
2878
 
 
2879
 
 
2880
def exclude_tests_by_re(suite, pattern):
 
2881
    """Create a test suite which excludes some tests from suite.
 
2882
 
 
2883
    :param suite: The suite to get tests from.
 
2884
    :param pattern: A regular expression string. Test ids that match this
 
2885
        pattern will be excluded from the result.
 
2886
    :return: A TestSuite that contains all the tests from suite without the
 
2887
        tests that matched pattern. The order of tests is the same as it was in
 
2888
        suite.
 
2889
    """
 
2890
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
2891
 
 
2892
 
 
2893
def preserve_input(something):
 
2894
    """A helper for performing test suite transformation chains.
 
2895
 
 
2896
    :param something: Anything you want to preserve.
 
2897
    :return: Something.
 
2898
    """
 
2899
    return something
 
2900
 
 
2901
 
 
2902
def randomize_suite(suite):
 
2903
    """Return a new TestSuite with suite's tests in random order.
 
2904
 
 
2905
    The tests in the input suite are flattened into a single suite in order to
 
2906
    accomplish this. Any nested TestSuites are removed to provide global
 
2907
    randomness.
 
2908
    """
 
2909
    tests = list(iter_suite_tests(suite))
 
2910
    random.shuffle(tests)
 
2911
    return TestUtil.TestSuite(tests)
 
2912
 
 
2913
 
 
2914
def split_suite_by_condition(suite, condition):
 
2915
    """Split a test suite into two by a condition.
 
2916
 
 
2917
    :param suite: The suite to split.
 
2918
    :param condition: The condition to match on. Tests that match this
 
2919
        condition are returned in the first test suite, ones that do not match
 
2920
        are in the second suite.
 
2921
    :return: A tuple of two test suites, where the first contains tests from
 
2922
        suite matching the condition, and the second contains the remainder
 
2923
        from suite. The order within each output suite is the same as it was in
 
2924
        suite.
 
2925
    """
 
2926
    matched = []
 
2927
    did_not_match = []
 
2928
    for test in iter_suite_tests(suite):
 
2929
        if condition(test):
 
2930
            matched.append(test)
 
2931
        else:
 
2932
            did_not_match.append(test)
 
2933
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
2934
 
 
2935
 
 
2936
def split_suite_by_re(suite, pattern):
 
2937
    """Split a test suite into two by a regular expression.
 
2938
 
 
2939
    :param suite: The suite to split.
 
2940
    :param pattern: A regular expression string. Test ids that match this
 
2941
        pattern will be in the first test suite returned, and the others in the
 
2942
        second test suite returned.
 
2943
    :return: A tuple of two test suites, where the first contains tests from
 
2944
        suite matching pattern, and the second contains the remainder from
 
2945
        suite. The order within each output suite is the same as it was in
 
2946
        suite.
 
2947
    """
 
2948
    return split_suite_by_condition(suite, condition_id_re(pattern))
 
2949
 
 
2950
 
 
2951
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2952
              stop_on_failure=False,
 
2953
              transport=None, lsprof_timed=None, bench_history=None,
 
2954
              matching_tests_first=None,
 
2955
              list_only=False,
 
2956
              random_seed=None,
 
2957
              exclude_pattern=None,
 
2958
              strict=False,
 
2959
              runner_class=None,
 
2960
              suite_decorators=None,
 
2961
              stream=None,
 
2962
              result_decorators=None,
 
2963
              ):
 
2964
    """Run a test suite for bzr selftest.
 
2965
 
 
2966
    :param runner_class: The class of runner to use. Must support the
 
2967
        constructor arguments passed by run_suite which are more than standard
 
2968
        python uses.
 
2969
    :return: A boolean indicating success.
 
2970
    """
 
2971
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2972
    if verbose:
 
2973
        verbosity = 2
 
2974
    else:
 
2975
        verbosity = 1
 
2976
    if runner_class is None:
 
2977
        runner_class = TextTestRunner
 
2978
    if stream is None:
 
2979
        stream = sys.stdout
 
2980
    runner = runner_class(stream=stream,
 
2981
                            descriptions=0,
 
2982
                            verbosity=verbosity,
 
2983
                            bench_history=bench_history,
 
2984
                            strict=strict,
 
2985
                            result_decorators=result_decorators,
 
2986
                            )
 
2987
    runner.stop_on_failure=stop_on_failure
 
2988
    # built in decorator factories:
 
2989
    decorators = [
 
2990
        random_order(random_seed, runner),
 
2991
        exclude_tests(exclude_pattern),
 
2992
        ]
 
2993
    if matching_tests_first:
 
2994
        decorators.append(tests_first(pattern))
 
2995
    else:
 
2996
        decorators.append(filter_tests(pattern))
 
2997
    if suite_decorators:
 
2998
        decorators.extend(suite_decorators)
 
2999
    # tell the result object how many tests will be running: (except if
 
3000
    # --parallel=fork is being used. Robert said he will provide a better
 
3001
    # progress design later -- vila 20090817)
 
3002
    if fork_decorator not in decorators:
 
3003
        decorators.append(CountingDecorator)
 
3004
    for decorator in decorators:
 
3005
        suite = decorator(suite)
 
3006
    if list_only:
 
3007
        # Done after test suite decoration to allow randomisation etc
 
3008
        # to take effect, though that is of marginal benefit.
 
3009
        if verbosity >= 2:
 
3010
            stream.write("Listing tests only ...\n")
 
3011
        for t in iter_suite_tests(suite):
 
3012
            stream.write("%s\n" % (t.id()))
 
3013
        return True
 
3014
    result = runner.run(suite)
 
3015
    if strict:
 
3016
        return result.wasStrictlySuccessful()
 
3017
    else:
 
3018
        return result.wasSuccessful()
 
3019
 
 
3020
 
 
3021
# A registry where get() returns a suite decorator.
 
3022
parallel_registry = registry.Registry()
 
3023
 
 
3024
 
 
3025
def fork_decorator(suite):
 
3026
    concurrency = osutils.local_concurrency()
 
3027
    if concurrency == 1:
 
3028
        return suite
 
3029
    from testtools import ConcurrentTestSuite
 
3030
    return ConcurrentTestSuite(suite, fork_for_tests)
 
3031
parallel_registry.register('fork', fork_decorator)
 
3032
 
 
3033
 
 
3034
def subprocess_decorator(suite):
 
3035
    concurrency = osutils.local_concurrency()
 
3036
    if concurrency == 1:
 
3037
        return suite
 
3038
    from testtools import ConcurrentTestSuite
 
3039
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
 
3040
parallel_registry.register('subprocess', subprocess_decorator)
 
3041
 
 
3042
 
 
3043
def exclude_tests(exclude_pattern):
 
3044
    """Return a test suite decorator that excludes tests."""
 
3045
    if exclude_pattern is None:
 
3046
        return identity_decorator
 
3047
    def decorator(suite):
 
3048
        return ExcludeDecorator(suite, exclude_pattern)
 
3049
    return decorator
 
3050
 
 
3051
 
 
3052
def filter_tests(pattern):
 
3053
    if pattern == '.*':
 
3054
        return identity_decorator
 
3055
    def decorator(suite):
 
3056
        return FilterTestsDecorator(suite, pattern)
 
3057
    return decorator
 
3058
 
 
3059
 
 
3060
def random_order(random_seed, runner):
 
3061
    """Return a test suite decorator factory for randomising tests order.
 
3062
    
 
3063
    :param random_seed: now, a string which casts to a long, or a long.
 
3064
    :param runner: A test runner with a stream attribute to report on.
 
3065
    """
 
3066
    if random_seed is None:
 
3067
        return identity_decorator
 
3068
    def decorator(suite):
 
3069
        return RandomDecorator(suite, random_seed, runner.stream)
 
3070
    return decorator
 
3071
 
 
3072
 
 
3073
def tests_first(pattern):
 
3074
    if pattern == '.*':
 
3075
        return identity_decorator
 
3076
    def decorator(suite):
 
3077
        return TestFirstDecorator(suite, pattern)
 
3078
    return decorator
 
3079
 
 
3080
 
 
3081
def identity_decorator(suite):
 
3082
    """Return suite."""
 
3083
    return suite
 
3084
 
 
3085
 
 
3086
class TestDecorator(TestSuite):
 
3087
    """A decorator for TestCase/TestSuite objects.
 
3088
    
 
3089
    Usually, subclasses should override __iter__(used when flattening test
 
3090
    suites), which we do to filter, reorder, parallelise and so on, run() and
 
3091
    debug().
 
3092
    """
 
3093
 
 
3094
    def __init__(self, suite):
 
3095
        TestSuite.__init__(self)
 
3096
        self.addTest(suite)
 
3097
 
 
3098
    def countTestCases(self):
 
3099
        cases = 0
 
3100
        for test in self:
 
3101
            cases += test.countTestCases()
 
3102
        return cases
 
3103
 
 
3104
    def debug(self):
 
3105
        for test in self:
 
3106
            test.debug()
 
3107
 
 
3108
    def run(self, result):
 
3109
        # Use iteration on self, not self._tests, to allow subclasses to hook
 
3110
        # into __iter__.
 
3111
        for test in self:
 
3112
            if result.shouldStop:
 
3113
                break
 
3114
            test.run(result)
 
3115
        return result
 
3116
 
 
3117
 
 
3118
class CountingDecorator(TestDecorator):
 
3119
    """A decorator which calls result.progress(self.countTestCases)."""
 
3120
 
 
3121
    def run(self, result):
 
3122
        progress_method = getattr(result, 'progress', None)
 
3123
        if callable(progress_method):
 
3124
            progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
 
3125
        return super(CountingDecorator, self).run(result)
 
3126
 
 
3127
 
 
3128
class ExcludeDecorator(TestDecorator):
 
3129
    """A decorator which excludes test matching an exclude pattern."""
 
3130
 
 
3131
    def __init__(self, suite, exclude_pattern):
 
3132
        TestDecorator.__init__(self, suite)
 
3133
        self.exclude_pattern = exclude_pattern
 
3134
        self.excluded = False
 
3135
 
 
3136
    def __iter__(self):
 
3137
        if self.excluded:
 
3138
            return iter(self._tests)
 
3139
        self.excluded = True
 
3140
        suite = exclude_tests_by_re(self, self.exclude_pattern)
 
3141
        del self._tests[:]
 
3142
        self.addTests(suite)
 
3143
        return iter(self._tests)
 
3144
 
 
3145
 
 
3146
class FilterTestsDecorator(TestDecorator):
 
3147
    """A decorator which filters tests to those matching a pattern."""
 
3148
 
 
3149
    def __init__(self, suite, pattern):
 
3150
        TestDecorator.__init__(self, suite)
 
3151
        self.pattern = pattern
 
3152
        self.filtered = False
 
3153
 
 
3154
    def __iter__(self):
 
3155
        if self.filtered:
 
3156
            return iter(self._tests)
 
3157
        self.filtered = True
 
3158
        suite = filter_suite_by_re(self, self.pattern)
 
3159
        del self._tests[:]
 
3160
        self.addTests(suite)
 
3161
        return iter(self._tests)
 
3162
 
 
3163
 
 
3164
class RandomDecorator(TestDecorator):
 
3165
    """A decorator which randomises the order of its tests."""
 
3166
 
 
3167
    def __init__(self, suite, random_seed, stream):
 
3168
        TestDecorator.__init__(self, suite)
 
3169
        self.random_seed = random_seed
 
3170
        self.randomised = False
 
3171
        self.stream = stream
 
3172
 
 
3173
    def __iter__(self):
 
3174
        if self.randomised:
 
3175
            return iter(self._tests)
 
3176
        self.randomised = True
 
3177
        self.stream.writeln("Randomizing test order using seed %s\n" %
 
3178
            (self.actual_seed()))
 
3179
        # Initialise the random number generator.
 
3180
        random.seed(self.actual_seed())
 
3181
        suite = randomize_suite(self)
 
3182
        del self._tests[:]
 
3183
        self.addTests(suite)
 
3184
        return iter(self._tests)
 
3185
 
 
3186
    def actual_seed(self):
 
3187
        if self.random_seed == "now":
 
3188
            # We convert the seed to a long to make it reuseable across
 
3189
            # invocations (because the user can reenter it).
 
3190
            self.random_seed = long(time.time())
 
3191
        else:
 
3192
            # Convert the seed to a long if we can
 
3193
            try:
 
3194
                self.random_seed = long(self.random_seed)
 
3195
            except:
 
3196
                pass
 
3197
        return self.random_seed
 
3198
 
 
3199
 
 
3200
class TestFirstDecorator(TestDecorator):
 
3201
    """A decorator which moves named tests to the front."""
 
3202
 
 
3203
    def __init__(self, suite, pattern):
 
3204
        TestDecorator.__init__(self, suite)
 
3205
        self.pattern = pattern
 
3206
        self.filtered = False
 
3207
 
 
3208
    def __iter__(self):
 
3209
        if self.filtered:
 
3210
            return iter(self._tests)
 
3211
        self.filtered = True
 
3212
        suites = split_suite_by_re(self, self.pattern)
 
3213
        del self._tests[:]
 
3214
        self.addTests(suites)
 
3215
        return iter(self._tests)
 
3216
 
 
3217
 
 
3218
def partition_tests(suite, count):
 
3219
    """Partition suite into count lists of tests."""
 
3220
    result = []
 
3221
    tests = list(iter_suite_tests(suite))
 
3222
    tests_per_process = int(math.ceil(float(len(tests)) / count))
 
3223
    for block in range(count):
 
3224
        low_test = block * tests_per_process
 
3225
        high_test = low_test + tests_per_process
 
3226
        process_tests = tests[low_test:high_test]
 
3227
        result.append(process_tests)
 
3228
    return result
 
3229
 
 
3230
 
 
3231
def fork_for_tests(suite):
 
3232
    """Take suite and start up one runner per CPU by forking()
 
3233
 
 
3234
    :return: An iterable of TestCase-like objects which can each have
 
3235
        run(result) called on them to feed tests to result.
 
3236
    """
 
3237
    concurrency = osutils.local_concurrency()
 
3238
    result = []
 
3239
    from subunit import TestProtocolClient, ProtocolTestCase
 
3240
    try:
 
3241
        from subunit.test_results import AutoTimingTestResultDecorator
 
3242
    except ImportError:
 
3243
        AutoTimingTestResultDecorator = lambda x:x
 
3244
    class TestInOtherProcess(ProtocolTestCase):
 
3245
        # Should be in subunit, I think. RBC.
 
3246
        def __init__(self, stream, pid):
 
3247
            ProtocolTestCase.__init__(self, stream)
 
3248
            self.pid = pid
 
3249
 
 
3250
        def run(self, result):
 
3251
            try:
 
3252
                ProtocolTestCase.run(self, result)
 
3253
            finally:
 
3254
                os.waitpid(self.pid, os.WNOHANG)
 
3255
 
 
3256
    test_blocks = partition_tests(suite, concurrency)
 
3257
    for process_tests in test_blocks:
 
3258
        process_suite = TestSuite()
 
3259
        process_suite.addTests(process_tests)
 
3260
        c2pread, c2pwrite = os.pipe()
 
3261
        pid = os.fork()
 
3262
        if pid == 0:
 
3263
            try:
 
3264
                os.close(c2pread)
 
3265
                # Leave stderr and stdout open so we can see test noise
 
3266
                # Close stdin so that the child goes away if it decides to
 
3267
                # read from stdin (otherwise its a roulette to see what
 
3268
                # child actually gets keystrokes for pdb etc).
 
3269
                sys.stdin.close()
 
3270
                sys.stdin = None
 
3271
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3272
                subunit_result = AutoTimingTestResultDecorator(
 
3273
                    TestProtocolClient(stream))
 
3274
                process_suite.run(subunit_result)
 
3275
            finally:
 
3276
                os._exit(0)
 
3277
        else:
 
3278
            os.close(c2pwrite)
 
3279
            stream = os.fdopen(c2pread, 'rb', 1)
 
3280
            test = TestInOtherProcess(stream, pid)
 
3281
            result.append(test)
 
3282
    return result
 
3283
 
 
3284
 
 
3285
def reinvoke_for_tests(suite):
 
3286
    """Take suite and start up one runner per CPU using subprocess().
 
3287
 
 
3288
    :return: An iterable of TestCase-like objects which can each have
 
3289
        run(result) called on them to feed tests to result.
 
3290
    """
 
3291
    concurrency = osutils.local_concurrency()
 
3292
    result = []
 
3293
    from subunit import ProtocolTestCase
 
3294
    class TestInSubprocess(ProtocolTestCase):
 
3295
        def __init__(self, process, name):
 
3296
            ProtocolTestCase.__init__(self, process.stdout)
 
3297
            self.process = process
 
3298
            self.process.stdin.close()
 
3299
            self.name = name
 
3300
 
 
3301
        def run(self, result):
 
3302
            try:
 
3303
                ProtocolTestCase.run(self, result)
 
3304
            finally:
 
3305
                self.process.wait()
 
3306
                os.unlink(self.name)
 
3307
            # print "pid %d finished" % finished_process
 
3308
    test_blocks = partition_tests(suite, concurrency)
 
3309
    for process_tests in test_blocks:
 
3310
        # ugly; currently reimplement rather than reuses TestCase methods.
 
3311
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
3312
        if not os.path.isfile(bzr_path):
 
3313
            # We are probably installed. Assume sys.argv is the right file
 
3314
            bzr_path = sys.argv[0]
 
3315
        fd, test_list_file_name = tempfile.mkstemp()
 
3316
        test_list_file = os.fdopen(fd, 'wb', 1)
 
3317
        for test in process_tests:
 
3318
            test_list_file.write(test.id() + '\n')
 
3319
        test_list_file.close()
 
3320
        try:
 
3321
            argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
 
3322
                '--subunit']
 
3323
            if '--no-plugins' in sys.argv:
 
3324
                argv.append('--no-plugins')
 
3325
            # stderr=STDOUT would be ideal, but until we prevent noise on
 
3326
            # stderr it can interrupt the subunit protocol.
 
3327
            process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
 
3328
                bufsize=1)
 
3329
            test = TestInSubprocess(process, test_list_file_name)
 
3330
            result.append(test)
 
3331
        except:
 
3332
            os.unlink(test_list_file_name)
 
3333
            raise
 
3334
    return result
 
3335
 
 
3336
 
 
3337
class ForwardingResult(unittest.TestResult):
 
3338
 
 
3339
    def __init__(self, target):
 
3340
        unittest.TestResult.__init__(self)
 
3341
        self.result = target
 
3342
 
 
3343
    def startTest(self, test):
 
3344
        self.result.startTest(test)
 
3345
 
 
3346
    def stopTest(self, test):
 
3347
        self.result.stopTest(test)
 
3348
 
 
3349
    def startTestRun(self):
 
3350
        self.result.startTestRun()
 
3351
 
 
3352
    def stopTestRun(self):
 
3353
        self.result.stopTestRun()
 
3354
 
 
3355
    def addSkip(self, test, reason):
 
3356
        self.result.addSkip(test, reason)
 
3357
 
 
3358
    def addSuccess(self, test):
 
3359
        self.result.addSuccess(test)
 
3360
 
 
3361
    def addError(self, test, err):
 
3362
        self.result.addError(test, err)
 
3363
 
 
3364
    def addFailure(self, test, err):
 
3365
        self.result.addFailure(test, err)
 
3366
 
 
3367
 
 
3368
class BZRTransformingResult(ForwardingResult):
 
3369
 
 
3370
    def addError(self, test, err):
 
3371
        feature = self._error_looks_like('UnavailableFeature: ', err)
 
3372
        if feature is not None:
 
3373
            self.result.addNotSupported(test, feature)
 
3374
        else:
 
3375
            self.result.addError(test, err)
 
3376
 
 
3377
    def addFailure(self, test, err):
 
3378
        known = self._error_looks_like('KnownFailure: ', err)
 
3379
        if known is not None:
 
3380
            self.result._addKnownFailure(test, [KnownFailure,
 
3381
                                                KnownFailure(known), None])
 
3382
        else:
 
3383
            self.result.addFailure(test, err)
 
3384
 
 
3385
    def _error_looks_like(self, prefix, err):
 
3386
        """Deserialize exception and returns the stringify value."""
 
3387
        import subunit
 
3388
        value = None
 
3389
        typ, exc, _ = err
 
3390
        if isinstance(exc, subunit.RemoteException):
 
3391
            # stringify the exception gives access to the remote traceback
 
3392
            # We search the last line for 'prefix'
 
3393
            lines = str(exc).split('\n')
 
3394
            while lines and not lines[-1]:
 
3395
                lines.pop(-1)
 
3396
            if lines:
 
3397
                if lines[-1].startswith(prefix):
 
3398
                    value = lines[-1][len(prefix):]
 
3399
        return value
 
3400
 
 
3401
 
 
3402
class ProfileResult(ForwardingResult):
 
3403
    """Generate profiling data for all activity between start and success.
 
3404
    
 
3405
    The profile data is appended to the test's _benchcalls attribute and can
 
3406
    be accessed by the forwarded-to TestResult.
 
3407
 
 
3408
    While it might be cleaner do accumulate this in stopTest, addSuccess is
 
3409
    where our existing output support for lsprof is, and this class aims to
 
3410
    fit in with that: while it could be moved it's not necessary to accomplish
 
3411
    test profiling, nor would it be dramatically cleaner.
 
3412
    """
 
3413
 
 
3414
    def startTest(self, test):
 
3415
        self.profiler = bzrlib.lsprof.BzrProfiler()
 
3416
        self.profiler.start()
 
3417
        ForwardingResult.startTest(self, test)
 
3418
 
 
3419
    def addSuccess(self, test):
 
3420
        stats = self.profiler.stop()
 
3421
        try:
 
3422
            calls = test._benchcalls
 
3423
        except AttributeError:
 
3424
            test._benchcalls = []
 
3425
            calls = test._benchcalls
 
3426
        calls.append(((test.id(), "", ""), stats))
 
3427
        ForwardingResult.addSuccess(self, test)
 
3428
 
 
3429
    def stopTest(self, test):
 
3430
        ForwardingResult.stopTest(self, test)
 
3431
        self.profiler = None
 
3432
 
 
3433
 
 
3434
# Controlled by "bzr selftest -E=..." option
 
3435
# Currently supported:
 
3436
#   -Eallow_debug           Will no longer clear debug.debug_flags() so it
 
3437
#                           preserves any flags supplied at the command line.
 
3438
#   -Edisable_lock_checks   Turns errors in mismatched locks into simple prints
 
3439
#                           rather than failing tests. And no longer raise
 
3440
#                           LockContention when fctnl locks are not being used
 
3441
#                           with proper exclusion rules.
 
3442
selftest_debug_flags = set()
 
3443
 
 
3444
 
 
3445
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
3446
             transport=None,
 
3447
             test_suite_factory=None,
 
3448
             lsprof_timed=None,
 
3449
             bench_history=None,
 
3450
             matching_tests_first=None,
 
3451
             list_only=False,
 
3452
             random_seed=None,
 
3453
             exclude_pattern=None,
 
3454
             strict=False,
 
3455
             load_list=None,
 
3456
             debug_flags=None,
 
3457
             starting_with=None,
 
3458
             runner_class=None,
 
3459
             suite_decorators=None,
 
3460
             stream=None,
 
3461
             lsprof_tests=False,
 
3462
             ):
 
3463
    """Run the whole test suite under the enhanced runner"""
 
3464
    # XXX: Very ugly way to do this...
 
3465
    # Disable warning about old formats because we don't want it to disturb
 
3466
    # any blackbox tests.
 
3467
    from bzrlib import repository
 
3468
    repository._deprecation_warning_done = True
 
3469
 
 
3470
    global default_transport
 
3471
    if transport is None:
 
3472
        transport = default_transport
 
3473
    old_transport = default_transport
 
3474
    default_transport = transport
 
3475
    global selftest_debug_flags
 
3476
    old_debug_flags = selftest_debug_flags
 
3477
    if debug_flags is not None:
 
3478
        selftest_debug_flags = set(debug_flags)
 
3479
    try:
 
3480
        if load_list is None:
 
3481
            keep_only = None
 
3482
        else:
 
3483
            keep_only = load_test_id_list(load_list)
 
3484
        if starting_with:
 
3485
            starting_with = [test_prefix_alias_registry.resolve_alias(start)
 
3486
                             for start in starting_with]
 
3487
        if test_suite_factory is None:
 
3488
            # Reduce loading time by loading modules based on the starting_with
 
3489
            # patterns.
 
3490
            suite = test_suite(keep_only, starting_with)
 
3491
        else:
 
3492
            suite = test_suite_factory()
 
3493
        if starting_with:
 
3494
            # But always filter as requested.
 
3495
            suite = filter_suite_by_id_startswith(suite, starting_with)
 
3496
        result_decorators = []
 
3497
        if lsprof_tests:
 
3498
            result_decorators.append(ProfileResult)
 
3499
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
3500
                     stop_on_failure=stop_on_failure,
 
3501
                     transport=transport,
 
3502
                     lsprof_timed=lsprof_timed,
 
3503
                     bench_history=bench_history,
 
3504
                     matching_tests_first=matching_tests_first,
 
3505
                     list_only=list_only,
 
3506
                     random_seed=random_seed,
 
3507
                     exclude_pattern=exclude_pattern,
 
3508
                     strict=strict,
 
3509
                     runner_class=runner_class,
 
3510
                     suite_decorators=suite_decorators,
 
3511
                     stream=stream,
 
3512
                     result_decorators=result_decorators,
 
3513
                     )
 
3514
    finally:
 
3515
        default_transport = old_transport
 
3516
        selftest_debug_flags = old_debug_flags
 
3517
 
 
3518
 
 
3519
def load_test_id_list(file_name):
 
3520
    """Load a test id list from a text file.
 
3521
 
 
3522
    The format is one test id by line.  No special care is taken to impose
 
3523
    strict rules, these test ids are used to filter the test suite so a test id
 
3524
    that do not match an existing test will do no harm. This allows user to add
 
3525
    comments, leave blank lines, etc.
 
3526
    """
 
3527
    test_list = []
 
3528
    try:
 
3529
        ftest = open(file_name, 'rt')
 
3530
    except IOError, e:
 
3531
        if e.errno != errno.ENOENT:
 
3532
            raise
 
3533
        else:
 
3534
            raise errors.NoSuchFile(file_name)
 
3535
 
 
3536
    for test_name in ftest.readlines():
 
3537
        test_list.append(test_name.strip())
 
3538
    ftest.close()
 
3539
    return test_list
 
3540
 
 
3541
 
 
3542
def suite_matches_id_list(test_suite, id_list):
 
3543
    """Warns about tests not appearing or appearing more than once.
 
3544
 
 
3545
    :param test_suite: A TestSuite object.
 
3546
    :param test_id_list: The list of test ids that should be found in
 
3547
         test_suite.
 
3548
 
 
3549
    :return: (absents, duplicates) absents is a list containing the test found
 
3550
        in id_list but not in test_suite, duplicates is a list containing the
 
3551
        test found multiple times in test_suite.
 
3552
 
 
3553
    When using a prefined test id list, it may occurs that some tests do not
 
3554
    exist anymore or that some tests use the same id. This function warns the
 
3555
    tester about potential problems in his workflow (test lists are volatile)
 
3556
    or in the test suite itself (using the same id for several tests does not
 
3557
    help to localize defects).
 
3558
    """
 
3559
    # Build a dict counting id occurrences
 
3560
    tests = dict()
 
3561
    for test in iter_suite_tests(test_suite):
 
3562
        id = test.id()
 
3563
        tests[id] = tests.get(id, 0) + 1
 
3564
 
 
3565
    not_found = []
 
3566
    duplicates = []
 
3567
    for id in id_list:
 
3568
        occurs = tests.get(id, 0)
 
3569
        if not occurs:
 
3570
            not_found.append(id)
 
3571
        elif occurs > 1:
 
3572
            duplicates.append(id)
 
3573
 
 
3574
    return not_found, duplicates
 
3575
 
 
3576
 
 
3577
class TestIdList(object):
 
3578
    """Test id list to filter a test suite.
 
3579
 
 
3580
    Relying on the assumption that test ids are built as:
 
3581
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
3582
    notation, this class offers methods to :
 
3583
    - avoid building a test suite for modules not refered to in the test list,
 
3584
    - keep only the tests listed from the module test suite.
 
3585
    """
 
3586
 
 
3587
    def __init__(self, test_id_list):
 
3588
        # When a test suite needs to be filtered against us we compare test ids
 
3589
        # for equality, so a simple dict offers a quick and simple solution.
 
3590
        self.tests = dict().fromkeys(test_id_list, True)
 
3591
 
 
3592
        # While unittest.TestCase have ids like:
 
3593
        # <module>.<class>.<method>[(<param+)],
 
3594
        # doctest.DocTestCase can have ids like:
 
3595
        # <module>
 
3596
        # <module>.<class>
 
3597
        # <module>.<function>
 
3598
        # <module>.<class>.<method>
 
3599
 
 
3600
        # Since we can't predict a test class from its name only, we settle on
 
3601
        # a simple constraint: a test id always begins with its module name.
 
3602
 
 
3603
        modules = {}
 
3604
        for test_id in test_id_list:
 
3605
            parts = test_id.split('.')
 
3606
            mod_name = parts.pop(0)
 
3607
            modules[mod_name] = True
 
3608
            for part in parts:
 
3609
                mod_name += '.' + part
 
3610
                modules[mod_name] = True
 
3611
        self.modules = modules
 
3612
 
 
3613
    def refers_to(self, module_name):
 
3614
        """Is there tests for the module or one of its sub modules."""
 
3615
        return self.modules.has_key(module_name)
 
3616
 
 
3617
    def includes(self, test_id):
 
3618
        return self.tests.has_key(test_id)
 
3619
 
 
3620
 
 
3621
class TestPrefixAliasRegistry(registry.Registry):
 
3622
    """A registry for test prefix aliases.
 
3623
 
 
3624
    This helps implement shorcuts for the --starting-with selftest
 
3625
    option. Overriding existing prefixes is not allowed but not fatal (a
 
3626
    warning will be emitted).
 
3627
    """
 
3628
 
 
3629
    def register(self, key, obj, help=None, info=None,
 
3630
                 override_existing=False):
 
3631
        """See Registry.register.
 
3632
 
 
3633
        Trying to override an existing alias causes a warning to be emitted,
 
3634
        not a fatal execption.
 
3635
        """
 
3636
        try:
 
3637
            super(TestPrefixAliasRegistry, self).register(
 
3638
                key, obj, help=help, info=info, override_existing=False)
 
3639
        except KeyError:
 
3640
            actual = self.get(key)
 
3641
            note('Test prefix alias %s is already used for %s, ignoring %s'
 
3642
                 % (key, actual, obj))
 
3643
 
 
3644
    def resolve_alias(self, id_start):
 
3645
        """Replace the alias by the prefix in the given string.
 
3646
 
 
3647
        Using an unknown prefix is an error to help catching typos.
 
3648
        """
 
3649
        parts = id_start.split('.')
 
3650
        try:
 
3651
            parts[0] = self.get(parts[0])
 
3652
        except KeyError:
 
3653
            raise errors.BzrCommandError(
 
3654
                '%s is not a known test prefix alias' % parts[0])
 
3655
        return '.'.join(parts)
 
3656
 
 
3657
 
 
3658
test_prefix_alias_registry = TestPrefixAliasRegistry()
 
3659
"""Registry of test prefix aliases."""
 
3660
 
 
3661
 
 
3662
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
 
3663
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
 
3664
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
 
3665
 
 
3666
# Obvious higest levels prefixes, feel free to add your own via a plugin
 
3667
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
 
3668
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
 
3669
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
 
3670
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
 
3671
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
 
3672
 
 
3673
 
 
3674
def _test_suite_testmod_names():
 
3675
    """Return the standard list of test module names to test."""
 
3676
    return [
 
3677
        'bzrlib.doc',
 
3678
        'bzrlib.tests.blackbox',
 
3679
        'bzrlib.tests.commands',
 
3680
        'bzrlib.tests.per_branch',
 
3681
        'bzrlib.tests.per_bzrdir',
 
3682
        'bzrlib.tests.per_interrepository',
 
3683
        'bzrlib.tests.per_intertree',
 
3684
        'bzrlib.tests.per_inventory',
 
3685
        'bzrlib.tests.per_interbranch',
 
3686
        'bzrlib.tests.per_lock',
 
3687
        'bzrlib.tests.per_transport',
 
3688
        'bzrlib.tests.per_tree',
 
3689
        'bzrlib.tests.per_pack_repository',
 
3690
        'bzrlib.tests.per_repository',
 
3691
        'bzrlib.tests.per_repository_chk',
 
3692
        'bzrlib.tests.per_repository_reference',
 
3693
        'bzrlib.tests.per_uifactory',
 
3694
        'bzrlib.tests.per_versionedfile',
 
3695
        'bzrlib.tests.per_workingtree',
 
3696
        'bzrlib.tests.test__annotator',
 
3697
        'bzrlib.tests.test__chk_map',
 
3698
        'bzrlib.tests.test__dirstate_helpers',
 
3699
        'bzrlib.tests.test__groupcompress',
 
3700
        'bzrlib.tests.test__known_graph',
 
3701
        'bzrlib.tests.test__rio',
 
3702
        'bzrlib.tests.test__simple_set',
 
3703
        'bzrlib.tests.test__static_tuple',
 
3704
        'bzrlib.tests.test__walkdirs_win32',
 
3705
        'bzrlib.tests.test_ancestry',
 
3706
        'bzrlib.tests.test_annotate',
 
3707
        'bzrlib.tests.test_api',
 
3708
        'bzrlib.tests.test_atomicfile',
 
3709
        'bzrlib.tests.test_bad_files',
 
3710
        'bzrlib.tests.test_bencode',
 
3711
        'bzrlib.tests.test_bisect_multi',
 
3712
        'bzrlib.tests.test_branch',
 
3713
        'bzrlib.tests.test_branchbuilder',
 
3714
        'bzrlib.tests.test_btree_index',
 
3715
        'bzrlib.tests.test_bugtracker',
 
3716
        'bzrlib.tests.test_bundle',
 
3717
        'bzrlib.tests.test_bzrdir',
 
3718
        'bzrlib.tests.test__chunks_to_lines',
 
3719
        'bzrlib.tests.test_cache_utf8',
 
3720
        'bzrlib.tests.test_chk_map',
 
3721
        'bzrlib.tests.test_chk_serializer',
 
3722
        'bzrlib.tests.test_chunk_writer',
 
3723
        'bzrlib.tests.test_clean_tree',
 
3724
        'bzrlib.tests.test_commands',
 
3725
        'bzrlib.tests.test_commit',
 
3726
        'bzrlib.tests.test_commit_merge',
 
3727
        'bzrlib.tests.test_config',
 
3728
        'bzrlib.tests.test_conflicts',
 
3729
        'bzrlib.tests.test_counted_lock',
 
3730
        'bzrlib.tests.test_crash',
 
3731
        'bzrlib.tests.test_decorators',
 
3732
        'bzrlib.tests.test_delta',
 
3733
        'bzrlib.tests.test_debug',
 
3734
        'bzrlib.tests.test_deprecated_graph',
 
3735
        'bzrlib.tests.test_diff',
 
3736
        'bzrlib.tests.test_directory_service',
 
3737
        'bzrlib.tests.test_dirstate',
 
3738
        'bzrlib.tests.test_email_message',
 
3739
        'bzrlib.tests.test_eol_filters',
 
3740
        'bzrlib.tests.test_errors',
 
3741
        'bzrlib.tests.test_export',
 
3742
        'bzrlib.tests.test_extract',
 
3743
        'bzrlib.tests.test_fetch',
 
3744
        'bzrlib.tests.test_fifo_cache',
 
3745
        'bzrlib.tests.test_filters',
 
3746
        'bzrlib.tests.test_ftp_transport',
 
3747
        'bzrlib.tests.test_foreign',
 
3748
        'bzrlib.tests.test_generate_docs',
 
3749
        'bzrlib.tests.test_generate_ids',
 
3750
        'bzrlib.tests.test_globbing',
 
3751
        'bzrlib.tests.test_gpg',
 
3752
        'bzrlib.tests.test_graph',
 
3753
        'bzrlib.tests.test_groupcompress',
 
3754
        'bzrlib.tests.test_hashcache',
 
3755
        'bzrlib.tests.test_help',
 
3756
        'bzrlib.tests.test_hooks',
 
3757
        'bzrlib.tests.test_http',
 
3758
        'bzrlib.tests.test_http_response',
 
3759
        'bzrlib.tests.test_https_ca_bundle',
 
3760
        'bzrlib.tests.test_identitymap',
 
3761
        'bzrlib.tests.test_ignores',
 
3762
        'bzrlib.tests.test_index',
 
3763
        'bzrlib.tests.test_info',
 
3764
        'bzrlib.tests.test_inv',
 
3765
        'bzrlib.tests.test_inventory_delta',
 
3766
        'bzrlib.tests.test_knit',
 
3767
        'bzrlib.tests.test_lazy_import',
 
3768
        'bzrlib.tests.test_lazy_regex',
 
3769
        'bzrlib.tests.test_lock',
 
3770
        'bzrlib.tests.test_lockable_files',
 
3771
        'bzrlib.tests.test_lockdir',
 
3772
        'bzrlib.tests.test_log',
 
3773
        'bzrlib.tests.test_lru_cache',
 
3774
        'bzrlib.tests.test_lsprof',
 
3775
        'bzrlib.tests.test_mail_client',
 
3776
        'bzrlib.tests.test_memorytree',
 
3777
        'bzrlib.tests.test_merge',
 
3778
        'bzrlib.tests.test_merge3',
 
3779
        'bzrlib.tests.test_merge_core',
 
3780
        'bzrlib.tests.test_merge_directive',
 
3781
        'bzrlib.tests.test_missing',
 
3782
        'bzrlib.tests.test_msgeditor',
 
3783
        'bzrlib.tests.test_multiparent',
 
3784
        'bzrlib.tests.test_mutabletree',
 
3785
        'bzrlib.tests.test_nonascii',
 
3786
        'bzrlib.tests.test_options',
 
3787
        'bzrlib.tests.test_osutils',
 
3788
        'bzrlib.tests.test_osutils_encodings',
 
3789
        'bzrlib.tests.test_pack',
 
3790
        'bzrlib.tests.test_patch',
 
3791
        'bzrlib.tests.test_patches',
 
3792
        'bzrlib.tests.test_permissions',
 
3793
        'bzrlib.tests.test_plugins',
 
3794
        'bzrlib.tests.test_progress',
 
3795
        'bzrlib.tests.test_read_bundle',
 
3796
        'bzrlib.tests.test_reconcile',
 
3797
        'bzrlib.tests.test_reconfigure',
 
3798
        'bzrlib.tests.test_registry',
 
3799
        'bzrlib.tests.test_remote',
 
3800
        'bzrlib.tests.test_rename_map',
 
3801
        'bzrlib.tests.test_repository',
 
3802
        'bzrlib.tests.test_revert',
 
3803
        'bzrlib.tests.test_revision',
 
3804
        'bzrlib.tests.test_revisionspec',
 
3805
        'bzrlib.tests.test_revisiontree',
 
3806
        'bzrlib.tests.test_rio',
 
3807
        'bzrlib.tests.test_rules',
 
3808
        'bzrlib.tests.test_sampler',
 
3809
        'bzrlib.tests.test_script',
 
3810
        'bzrlib.tests.test_selftest',
 
3811
        'bzrlib.tests.test_serializer',
 
3812
        'bzrlib.tests.test_setup',
 
3813
        'bzrlib.tests.test_sftp_transport',
 
3814
        'bzrlib.tests.test_shelf',
 
3815
        'bzrlib.tests.test_shelf_ui',
 
3816
        'bzrlib.tests.test_smart',
 
3817
        'bzrlib.tests.test_smart_add',
 
3818
        'bzrlib.tests.test_smart_request',
 
3819
        'bzrlib.tests.test_smart_transport',
 
3820
        'bzrlib.tests.test_smtp_connection',
 
3821
        'bzrlib.tests.test_source',
 
3822
        'bzrlib.tests.test_ssh_transport',
 
3823
        'bzrlib.tests.test_status',
 
3824
        'bzrlib.tests.test_store',
 
3825
        'bzrlib.tests.test_strace',
 
3826
        'bzrlib.tests.test_subsume',
 
3827
        'bzrlib.tests.test_switch',
 
3828
        'bzrlib.tests.test_symbol_versioning',
 
3829
        'bzrlib.tests.test_tag',
 
3830
        'bzrlib.tests.test_testament',
 
3831
        'bzrlib.tests.test_textfile',
 
3832
        'bzrlib.tests.test_textmerge',
 
3833
        'bzrlib.tests.test_timestamp',
 
3834
        'bzrlib.tests.test_trace',
 
3835
        'bzrlib.tests.test_transactions',
 
3836
        'bzrlib.tests.test_transform',
 
3837
        'bzrlib.tests.test_transport',
 
3838
        'bzrlib.tests.test_transport_log',
 
3839
        'bzrlib.tests.test_tree',
 
3840
        'bzrlib.tests.test_treebuilder',
 
3841
        'bzrlib.tests.test_tsort',
 
3842
        'bzrlib.tests.test_tuned_gzip',
 
3843
        'bzrlib.tests.test_ui',
 
3844
        'bzrlib.tests.test_uncommit',
 
3845
        'bzrlib.tests.test_upgrade',
 
3846
        'bzrlib.tests.test_upgrade_stacked',
 
3847
        'bzrlib.tests.test_urlutils',
 
3848
        'bzrlib.tests.test_version',
 
3849
        'bzrlib.tests.test_version_info',
 
3850
        'bzrlib.tests.test_weave',
 
3851
        'bzrlib.tests.test_whitebox',
 
3852
        'bzrlib.tests.test_win32utils',
 
3853
        'bzrlib.tests.test_workingtree',
 
3854
        'bzrlib.tests.test_workingtree_4',
 
3855
        'bzrlib.tests.test_wsgi',
 
3856
        'bzrlib.tests.test_xml',
 
3857
        ]
 
3858
 
 
3859
 
 
3860
def _test_suite_modules_to_doctest():
 
3861
    """Return the list of modules to doctest."""   
 
3862
    return [
 
3863
        'bzrlib',
 
3864
        'bzrlib.branchbuilder',
 
3865
        'bzrlib.export',
 
3866
        'bzrlib.inventory',
 
3867
        'bzrlib.iterablefile',
 
3868
        'bzrlib.lockdir',
 
3869
        'bzrlib.merge3',
 
3870
        'bzrlib.option',
 
3871
        'bzrlib.symbol_versioning',
 
3872
        'bzrlib.tests',
 
3873
        'bzrlib.timestamp',
 
3874
        'bzrlib.version_info_formats.format_custom',
 
3875
        ]
 
3876
 
 
3877
 
 
3878
def test_suite(keep_only=None, starting_with=None):
 
3879
    """Build and return TestSuite for the whole of bzrlib.
 
3880
 
 
3881
    :param keep_only: A list of test ids limiting the suite returned.
 
3882
 
 
3883
    :param starting_with: An id limiting the suite returned to the tests
 
3884
         starting with it.
 
3885
 
 
3886
    This function can be replaced if you need to change the default test
 
3887
    suite on a global basis, but it is not encouraged.
 
3888
    """
 
3889
 
 
3890
    loader = TestUtil.TestLoader()
 
3891
 
 
3892
    if keep_only is not None:
 
3893
        id_filter = TestIdList(keep_only)
 
3894
    if starting_with:
 
3895
        # We take precedence over keep_only because *at loading time* using
 
3896
        # both options means we will load less tests for the same final result.
 
3897
        def interesting_module(name):
 
3898
            for start in starting_with:
 
3899
                if (
 
3900
                    # Either the module name starts with the specified string
 
3901
                    name.startswith(start)
 
3902
                    # or it may contain tests starting with the specified string
 
3903
                    or start.startswith(name)
 
3904
                    ):
 
3905
                    return True
 
3906
            return False
 
3907
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
 
3908
 
 
3909
    elif keep_only is not None:
 
3910
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
3911
        def interesting_module(name):
 
3912
            return id_filter.refers_to(name)
 
3913
 
 
3914
    else:
 
3915
        loader = TestUtil.TestLoader()
 
3916
        def interesting_module(name):
 
3917
            # No filtering, all modules are interesting
 
3918
            return True
 
3919
 
 
3920
    suite = loader.suiteClass()
 
3921
 
 
3922
    # modules building their suite with loadTestsFromModuleNames
 
3923
    suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
 
3924
 
 
3925
    for mod in _test_suite_modules_to_doctest():
 
3926
        if not interesting_module(mod):
 
3927
            # No tests to keep here, move along
 
3928
            continue
 
3929
        try:
 
3930
            # note that this really does mean "report only" -- doctest
 
3931
            # still runs the rest of the examples
 
3932
            doc_suite = doctest.DocTestSuite(mod,
 
3933
                optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
 
3934
        except ValueError, e:
 
3935
            print '**failed to get doctest for: %s\n%s' % (mod, e)
 
3936
            raise
 
3937
        if len(doc_suite._tests) == 0:
 
3938
            raise errors.BzrError("no doctests found in %s" % (mod,))
 
3939
        suite.addTest(doc_suite)
 
3940
 
 
3941
    default_encoding = sys.getdefaultencoding()
 
3942
    for name, plugin in bzrlib.plugin.plugins().items():
 
3943
        if not interesting_module(plugin.module.__name__):
 
3944
            continue
 
3945
        plugin_suite = plugin.test_suite()
 
3946
        # We used to catch ImportError here and turn it into just a warning,
 
3947
        # but really if you don't have --no-plugins this should be a failure.
 
3948
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
3949
        if plugin_suite is None:
 
3950
            plugin_suite = plugin.load_plugin_tests(loader)
 
3951
        if plugin_suite is not None:
 
3952
            suite.addTest(plugin_suite)
 
3953
        if default_encoding != sys.getdefaultencoding():
 
3954
            bzrlib.trace.warning(
 
3955
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
3956
                sys.getdefaultencoding())
 
3957
            reload(sys)
 
3958
            sys.setdefaultencoding(default_encoding)
 
3959
 
 
3960
    if keep_only is not None:
 
3961
        # Now that the referred modules have loaded their tests, keep only the
 
3962
        # requested ones.
 
3963
        suite = filter_suite_by_id_list(suite, id_filter)
 
3964
        # Do some sanity checks on the id_list filtering
 
3965
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
3966
        if starting_with:
 
3967
            # The tester has used both keep_only and starting_with, so he is
 
3968
            # already aware that some tests are excluded from the list, there
 
3969
            # is no need to tell him which.
 
3970
            pass
 
3971
        else:
 
3972
            # Some tests mentioned in the list are not in the test suite. The
 
3973
            # list may be out of date, report to the tester.
 
3974
            for id in not_found:
 
3975
                bzrlib.trace.warning('"%s" not found in the test suite', id)
 
3976
        for id in duplicates:
 
3977
            bzrlib.trace.warning('"%s" is used as an id by several tests', id)
 
3978
 
 
3979
    return suite
 
3980
 
 
3981
 
 
3982
def multiply_scenarios(scenarios_left, scenarios_right):
 
3983
    """Multiply two sets of scenarios.
 
3984
 
 
3985
    :returns: the cartesian product of the two sets of scenarios, that is
 
3986
        a scenario for every possible combination of a left scenario and a
 
3987
        right scenario.
 
3988
    """
 
3989
    return [
 
3990
        ('%s,%s' % (left_name, right_name),
 
3991
         dict(left_dict.items() + right_dict.items()))
 
3992
        for left_name, left_dict in scenarios_left
 
3993
        for right_name, right_dict in scenarios_right]
 
3994
 
 
3995
 
 
3996
def multiply_tests(tests, scenarios, result):
 
3997
    """Multiply tests_list by scenarios into result.
 
3998
 
 
3999
    This is the core workhorse for test parameterisation.
 
4000
 
 
4001
    Typically the load_tests() method for a per-implementation test suite will
 
4002
    call multiply_tests and return the result.
 
4003
 
 
4004
    :param tests: The tests to parameterise.
 
4005
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
 
4006
        scenario_param_dict).
 
4007
    :param result: A TestSuite to add created tests to.
 
4008
 
 
4009
    This returns the passed in result TestSuite with the cross product of all
 
4010
    the tests repeated once for each scenario.  Each test is adapted by adding
 
4011
    the scenario name at the end of its id(), and updating the test object's
 
4012
    __dict__ with the scenario_param_dict.
 
4013
 
 
4014
    >>> import bzrlib.tests.test_sampler
 
4015
    >>> r = multiply_tests(
 
4016
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
 
4017
    ...     [('one', dict(param=1)),
 
4018
    ...      ('two', dict(param=2))],
 
4019
    ...     TestSuite())
 
4020
    >>> tests = list(iter_suite_tests(r))
 
4021
    >>> len(tests)
 
4022
    2
 
4023
    >>> tests[0].id()
 
4024
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
 
4025
    >>> tests[0].param
 
4026
    1
 
4027
    >>> tests[1].param
 
4028
    2
 
4029
    """
 
4030
    for test in iter_suite_tests(tests):
 
4031
        apply_scenarios(test, scenarios, result)
 
4032
    return result
 
4033
 
 
4034
 
 
4035
def apply_scenarios(test, scenarios, result):
 
4036
    """Apply the scenarios in scenarios to test and add to result.
 
4037
 
 
4038
    :param test: The test to apply scenarios to.
 
4039
    :param scenarios: An iterable of scenarios to apply to test.
 
4040
    :return: result
 
4041
    :seealso: apply_scenario
 
4042
    """
 
4043
    for scenario in scenarios:
 
4044
        result.addTest(apply_scenario(test, scenario))
 
4045
    return result
 
4046
 
 
4047
 
 
4048
def apply_scenario(test, scenario):
 
4049
    """Copy test and apply scenario to it.
 
4050
 
 
4051
    :param test: A test to adapt.
 
4052
    :param scenario: A tuple describing the scenarion.
 
4053
        The first element of the tuple is the new test id.
 
4054
        The second element is a dict containing attributes to set on the
 
4055
        test.
 
4056
    :return: The adapted test.
 
4057
    """
 
4058
    new_id = "%s(%s)" % (test.id(), scenario[0])
 
4059
    new_test = clone_test(test, new_id)
 
4060
    for name, value in scenario[1].items():
 
4061
        setattr(new_test, name, value)
 
4062
    return new_test
 
4063
 
 
4064
 
 
4065
def clone_test(test, new_id):
 
4066
    """Clone a test giving it a new id.
 
4067
 
 
4068
    :param test: The test to clone.
 
4069
    :param new_id: The id to assign to it.
 
4070
    :return: The new test.
 
4071
    """
 
4072
    new_test = copy(test)
 
4073
    new_test.id = lambda: new_id
 
4074
    return new_test
 
4075
 
 
4076
 
 
4077
def _rmtree_temp_dir(dirname):
 
4078
    # If LANG=C we probably have created some bogus paths
 
4079
    # which rmtree(unicode) will fail to delete
 
4080
    # so make sure we are using rmtree(str) to delete everything
 
4081
    # except on win32, where rmtree(str) will fail
 
4082
    # since it doesn't have the property of byte-stream paths
 
4083
    # (they are either ascii or mbcs)
 
4084
    if sys.platform == 'win32':
 
4085
        # make sure we are using the unicode win32 api
 
4086
        dirname = unicode(dirname)
 
4087
    else:
 
4088
        dirname = dirname.encode(sys.getfilesystemencoding())
 
4089
    try:
 
4090
        osutils.rmtree(dirname)
 
4091
    except OSError, e:
 
4092
        # We don't want to fail here because some useful display will be lost
 
4093
        # otherwise. Polluting the tmp dir is bad, but not giving all the
 
4094
        # possible info to the test runner is even worse.
 
4095
        sys.stderr.write('Unable to remove testing dir %s\n%s'
 
4096
                         % (os.path.basename(dirname), e))
 
4097
 
 
4098
 
 
4099
class Feature(object):
 
4100
    """An operating system Feature."""
 
4101
 
 
4102
    def __init__(self):
 
4103
        self._available = None
 
4104
 
 
4105
    def available(self):
 
4106
        """Is the feature available?
 
4107
 
 
4108
        :return: True if the feature is available.
 
4109
        """
 
4110
        if self._available is None:
 
4111
            self._available = self._probe()
 
4112
        return self._available
 
4113
 
 
4114
    def _probe(self):
 
4115
        """Implement this method in concrete features.
 
4116
 
 
4117
        :return: True if the feature is available.
 
4118
        """
 
4119
        raise NotImplementedError
 
4120
 
 
4121
    def __str__(self):
 
4122
        if getattr(self, 'feature_name', None):
 
4123
            return self.feature_name()
 
4124
        return self.__class__.__name__
 
4125
 
 
4126
 
 
4127
class _SymlinkFeature(Feature):
 
4128
 
 
4129
    def _probe(self):
 
4130
        return osutils.has_symlinks()
 
4131
 
 
4132
    def feature_name(self):
 
4133
        return 'symlinks'
 
4134
 
 
4135
SymlinkFeature = _SymlinkFeature()
 
4136
 
 
4137
 
 
4138
class _HardlinkFeature(Feature):
 
4139
 
 
4140
    def _probe(self):
 
4141
        return osutils.has_hardlinks()
 
4142
 
 
4143
    def feature_name(self):
 
4144
        return 'hardlinks'
 
4145
 
 
4146
HardlinkFeature = _HardlinkFeature()
 
4147
 
 
4148
 
 
4149
class _OsFifoFeature(Feature):
 
4150
 
 
4151
    def _probe(self):
 
4152
        return getattr(os, 'mkfifo', None)
 
4153
 
 
4154
    def feature_name(self):
 
4155
        return 'filesystem fifos'
 
4156
 
 
4157
OsFifoFeature = _OsFifoFeature()
 
4158
 
 
4159
 
 
4160
class _UnicodeFilenameFeature(Feature):
 
4161
    """Does the filesystem support Unicode filenames?"""
 
4162
 
 
4163
    def _probe(self):
 
4164
        try:
 
4165
            # Check for character combinations unlikely to be covered by any
 
4166
            # single non-unicode encoding. We use the characters
 
4167
            # - greek small letter alpha (U+03B1) and
 
4168
            # - braille pattern dots-123456 (U+283F).
 
4169
            os.stat(u'\u03b1\u283f')
 
4170
        except UnicodeEncodeError:
 
4171
            return False
 
4172
        except (IOError, OSError):
 
4173
            # The filesystem allows the Unicode filename but the file doesn't
 
4174
            # exist.
 
4175
            return True
 
4176
        else:
 
4177
            # The filesystem allows the Unicode filename and the file exists,
 
4178
            # for some reason.
 
4179
            return True
 
4180
 
 
4181
UnicodeFilenameFeature = _UnicodeFilenameFeature()
 
4182
 
 
4183
 
 
4184
def probe_unicode_in_user_encoding():
 
4185
    """Try to encode several unicode strings to use in unicode-aware tests.
 
4186
    Return first successfull match.
 
4187
 
 
4188
    :return:  (unicode value, encoded plain string value) or (None, None)
 
4189
    """
 
4190
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
4191
    for uni_val in possible_vals:
 
4192
        try:
 
4193
            str_val = uni_val.encode(osutils.get_user_encoding())
 
4194
        except UnicodeEncodeError:
 
4195
            # Try a different character
 
4196
            pass
 
4197
        else:
 
4198
            return uni_val, str_val
 
4199
    return None, None
 
4200
 
 
4201
 
 
4202
def probe_bad_non_ascii(encoding):
 
4203
    """Try to find [bad] character with code [128..255]
 
4204
    that cannot be decoded to unicode in some encoding.
 
4205
    Return None if all non-ascii characters is valid
 
4206
    for given encoding.
 
4207
    """
 
4208
    for i in xrange(128, 256):
 
4209
        char = chr(i)
 
4210
        try:
 
4211
            char.decode(encoding)
 
4212
        except UnicodeDecodeError:
 
4213
            return char
 
4214
    return None
 
4215
 
 
4216
 
 
4217
class _HTTPSServerFeature(Feature):
 
4218
    """Some tests want an https Server, check if one is available.
 
4219
 
 
4220
    Right now, the only way this is available is under python2.6 which provides
 
4221
    an ssl module.
 
4222
    """
 
4223
 
 
4224
    def _probe(self):
 
4225
        try:
 
4226
            import ssl
 
4227
            return True
 
4228
        except ImportError:
 
4229
            return False
 
4230
 
 
4231
    def feature_name(self):
 
4232
        return 'HTTPSServer'
 
4233
 
 
4234
 
 
4235
HTTPSServerFeature = _HTTPSServerFeature()
 
4236
 
 
4237
 
 
4238
class _ParamikoFeature(Feature):
 
4239
    """Is paramiko available?"""
 
4240
 
 
4241
    def _probe(self):
 
4242
        try:
 
4243
            from bzrlib.transport.sftp import SFTPAbsoluteServer
 
4244
            return True
 
4245
        except errors.ParamikoNotPresent:
 
4246
            return False
 
4247
 
 
4248
    def feature_name(self):
 
4249
        return "Paramiko"
 
4250
 
 
4251
 
 
4252
ParamikoFeature = _ParamikoFeature()
 
4253
 
 
4254
 
 
4255
class _UnicodeFilename(Feature):
 
4256
    """Does the filesystem support Unicode filenames?"""
 
4257
 
 
4258
    def _probe(self):
 
4259
        try:
 
4260
            os.stat(u'\u03b1')
 
4261
        except UnicodeEncodeError:
 
4262
            return False
 
4263
        except (IOError, OSError):
 
4264
            # The filesystem allows the Unicode filename but the file doesn't
 
4265
            # exist.
 
4266
            return True
 
4267
        else:
 
4268
            # The filesystem allows the Unicode filename and the file exists,
 
4269
            # for some reason.
 
4270
            return True
 
4271
 
 
4272
UnicodeFilename = _UnicodeFilename()
 
4273
 
 
4274
 
 
4275
class _UTF8Filesystem(Feature):
 
4276
    """Is the filesystem UTF-8?"""
 
4277
 
 
4278
    def _probe(self):
 
4279
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
 
4280
            return True
 
4281
        return False
 
4282
 
 
4283
UTF8Filesystem = _UTF8Filesystem()
 
4284
 
 
4285
 
 
4286
class _BreakinFeature(Feature):
 
4287
    """Does this platform support the breakin feature?"""
 
4288
 
 
4289
    def _probe(self):
 
4290
        from bzrlib import breakin
 
4291
        if breakin.determine_signal() is None:
 
4292
            return False
 
4293
        if sys.platform == 'win32':
 
4294
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
 
4295
            # We trigger SIGBREAK via a Console api so we need ctypes to
 
4296
            # access the function
 
4297
            if not have_ctypes:
 
4298
                return False
 
4299
        return True
 
4300
 
 
4301
    def feature_name(self):
 
4302
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
 
4303
 
 
4304
 
 
4305
BreakinFeature = _BreakinFeature()
 
4306
 
 
4307
 
 
4308
class _CaseInsCasePresFilenameFeature(Feature):
 
4309
    """Is the file-system case insensitive, but case-preserving?"""
 
4310
 
 
4311
    def _probe(self):
 
4312
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
 
4313
        try:
 
4314
            # first check truly case-preserving for created files, then check
 
4315
            # case insensitive when opening existing files.
 
4316
            name = osutils.normpath(name)
 
4317
            base, rel = osutils.split(name)
 
4318
            found_rel = osutils.canonical_relpath(base, name)
 
4319
            return (found_rel == rel
 
4320
                    and os.path.isfile(name.upper())
 
4321
                    and os.path.isfile(name.lower()))
 
4322
        finally:
 
4323
            os.close(fileno)
 
4324
            os.remove(name)
 
4325
 
 
4326
    def feature_name(self):
 
4327
        return "case-insensitive case-preserving filesystem"
 
4328
 
 
4329
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
 
4330
 
 
4331
 
 
4332
class _CaseInsensitiveFilesystemFeature(Feature):
 
4333
    """Check if underlying filesystem is case-insensitive but *not* case
 
4334
    preserving.
 
4335
    """
 
4336
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
 
4337
    # more likely to be case preserving, so this case is rare.
 
4338
 
 
4339
    def _probe(self):
 
4340
        if CaseInsCasePresFilenameFeature.available():
 
4341
            return False
 
4342
 
 
4343
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
4344
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
4345
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
4346
        else:
 
4347
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
4348
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
4349
            dir=root)
 
4350
        name_a = osutils.pathjoin(tdir, 'a')
 
4351
        name_A = osutils.pathjoin(tdir, 'A')
 
4352
        os.mkdir(name_a)
 
4353
        result = osutils.isdir(name_A)
 
4354
        _rmtree_temp_dir(tdir)
 
4355
        return result
 
4356
 
 
4357
    def feature_name(self):
 
4358
        return 'case-insensitive filesystem'
 
4359
 
 
4360
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
 
4361
 
 
4362
 
 
4363
class _SubUnitFeature(Feature):
 
4364
    """Check if subunit is available."""
 
4365
 
 
4366
    def _probe(self):
 
4367
        try:
 
4368
            import subunit
 
4369
            return True
 
4370
        except ImportError:
 
4371
            return False
 
4372
 
 
4373
    def feature_name(self):
 
4374
        return 'subunit'
 
4375
 
 
4376
SubUnitFeature = _SubUnitFeature()
 
4377
# Only define SubUnitBzrRunner if subunit is available.
 
4378
try:
 
4379
    from subunit import TestProtocolClient
 
4380
    try:
 
4381
        from subunit.test_results import AutoTimingTestResultDecorator
 
4382
    except ImportError:
 
4383
        AutoTimingTestResultDecorator = lambda x:x
 
4384
    class SubUnitBzrRunner(TextTestRunner):
 
4385
        def run(self, test):
 
4386
            result = AutoTimingTestResultDecorator(
 
4387
                TestProtocolClient(self.stream))
 
4388
            test.run(result)
 
4389
            return result
 
4390
except ImportError:
 
4391
    pass