/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Benoît Pierre
  • Date: 2009-11-02 21:56:42 UTC
  • mto: (4603.1.22 shelve-editor)
  • mto: This revision was merged to the branch mainline in revision 4795.
  • Revision ID: benoit.pierre@gmail.com-20091102215642-t5fforovkpj3zqla
Lock the work tree in Shelver only after everything else has been setup.

This avoid having to use break-lock if a crash occurs during initialization,
for example due to an invalid 'change_editor' entry in ~/.bazaar/bazaar.conf.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import atexit
 
30
import codecs
 
31
from copy import copy
 
32
from cStringIO import StringIO
 
33
import difflib
 
34
import doctest
 
35
import errno
 
36
import logging
 
37
import math
 
38
import os
 
39
from pprint import pformat
 
40
import random
 
41
import re
 
42
import shlex
 
43
import stat
 
44
from subprocess import Popen, PIPE, STDOUT
 
45
import sys
 
46
import tempfile
 
47
import threading
 
48
import time
 
49
import unittest
 
50
import warnings
 
51
 
 
52
 
 
53
from bzrlib import (
 
54
    branchbuilder,
 
55
    bzrdir,
 
56
    chk_map,
 
57
    config,
 
58
    debug,
 
59
    errors,
 
60
    hooks,
 
61
    lock as _mod_lock,
 
62
    memorytree,
 
63
    osutils,
 
64
    progress,
 
65
    ui,
 
66
    urlutils,
 
67
    registry,
 
68
    workingtree,
 
69
    )
 
70
import bzrlib.branch
 
71
import bzrlib.commands
 
72
import bzrlib.timestamp
 
73
import bzrlib.export
 
74
import bzrlib.inventory
 
75
import bzrlib.iterablefile
 
76
import bzrlib.lockdir
 
77
try:
 
78
    import bzrlib.lsprof
 
79
except ImportError:
 
80
    # lsprof not available
 
81
    pass
 
82
from bzrlib.merge import merge_inner
 
83
import bzrlib.merge3
 
84
import bzrlib.plugin
 
85
from bzrlib.smart import client, request, server
 
86
import bzrlib.store
 
87
from bzrlib import symbol_versioning
 
88
from bzrlib.symbol_versioning import (
 
89
    DEPRECATED_PARAMETER,
 
90
    deprecated_function,
 
91
    deprecated_method,
 
92
    deprecated_passed,
 
93
    )
 
94
import bzrlib.trace
 
95
from bzrlib.transport import get_transport, pathfilter
 
96
import bzrlib.transport
 
97
from bzrlib.transport.local import LocalURLServer
 
98
from bzrlib.transport.memory import MemoryServer
 
99
from bzrlib.transport.readonly import ReadonlyServer
 
100
from bzrlib.trace import mutter, note
 
101
from bzrlib.tests import TestUtil
 
102
from bzrlib.tests.http_server import HttpServer
 
103
from bzrlib.tests.TestUtil import (
 
104
                          TestSuite,
 
105
                          TestLoader,
 
106
                          )
 
107
from bzrlib.tests.treeshape import build_tree_contents
 
108
from bzrlib.ui import NullProgressView
 
109
from bzrlib.ui.text import TextUIFactory
 
110
import bzrlib.version_info_formats.format_custom
 
111
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
112
 
 
113
# Mark this python module as being part of the implementation
 
114
# of unittest: this gives us better tracebacks where the last
 
115
# shown frame is the test code, not our assertXYZ.
 
116
__unittest = 1
 
117
 
 
118
default_transport = LocalURLServer
 
119
 
 
120
# Subunit result codes, defined here to prevent a hard dependency on subunit.
 
121
SUBUNIT_SEEK_SET = 0
 
122
SUBUNIT_SEEK_CUR = 1
 
123
 
 
124
 
 
125
class ExtendedTestResult(unittest._TextTestResult):
 
126
    """Accepts, reports and accumulates the results of running tests.
 
127
 
 
128
    Compared to the unittest version this class adds support for
 
129
    profiling, benchmarking, stopping as soon as a test fails,  and
 
130
    skipping tests.  There are further-specialized subclasses for
 
131
    different types of display.
 
132
 
 
133
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
134
    addFailure or addError classes.  These in turn may redirect to a more
 
135
    specific case for the special test results supported by our extended
 
136
    tests.
 
137
 
 
138
    Note that just one of these objects is fed the results from many tests.
 
139
    """
 
140
 
 
141
    stop_early = False
 
142
 
 
143
    def __init__(self, stream, descriptions, verbosity,
 
144
                 bench_history=None,
 
145
                 strict=False,
 
146
                 ):
 
147
        """Construct new TestResult.
 
148
 
 
149
        :param bench_history: Optionally, a writable file object to accumulate
 
150
            benchmark results.
 
151
        """
 
152
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
153
        if bench_history is not None:
 
154
            from bzrlib.version import _get_bzr_source_tree
 
155
            src_tree = _get_bzr_source_tree()
 
156
            if src_tree:
 
157
                try:
 
158
                    revision_id = src_tree.get_parent_ids()[0]
 
159
                except IndexError:
 
160
                    # XXX: if this is a brand new tree, do the same as if there
 
161
                    # is no branch.
 
162
                    revision_id = ''
 
163
            else:
 
164
                # XXX: If there's no branch, what should we do?
 
165
                revision_id = ''
 
166
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
167
        self._bench_history = bench_history
 
168
        self.ui = ui.ui_factory
 
169
        self.num_tests = 0
 
170
        self.error_count = 0
 
171
        self.failure_count = 0
 
172
        self.known_failure_count = 0
 
173
        self.skip_count = 0
 
174
        self.not_applicable_count = 0
 
175
        self.unsupported = {}
 
176
        self.count = 0
 
177
        self._overall_start_time = time.time()
 
178
        self._strict = strict
 
179
 
 
180
    def stopTestRun(self):
 
181
        run = self.testsRun
 
182
        actionTaken = "Ran"
 
183
        stopTime = time.time()
 
184
        timeTaken = stopTime - self.startTime
 
185
        self.printErrors()
 
186
        self.stream.writeln(self.separator2)
 
187
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
188
                            run, run != 1 and "s" or "", timeTaken))
 
189
        self.stream.writeln()
 
190
        if not self.wasSuccessful():
 
191
            self.stream.write("FAILED (")
 
192
            failed, errored = map(len, (self.failures, self.errors))
 
193
            if failed:
 
194
                self.stream.write("failures=%d" % failed)
 
195
            if errored:
 
196
                if failed: self.stream.write(", ")
 
197
                self.stream.write("errors=%d" % errored)
 
198
            if self.known_failure_count:
 
199
                if failed or errored: self.stream.write(", ")
 
200
                self.stream.write("known_failure_count=%d" %
 
201
                    self.known_failure_count)
 
202
            self.stream.writeln(")")
 
203
        else:
 
204
            if self.known_failure_count:
 
205
                self.stream.writeln("OK (known_failures=%d)" %
 
206
                    self.known_failure_count)
 
207
            else:
 
208
                self.stream.writeln("OK")
 
209
        if self.skip_count > 0:
 
210
            skipped = self.skip_count
 
211
            self.stream.writeln('%d test%s skipped' %
 
212
                                (skipped, skipped != 1 and "s" or ""))
 
213
        if self.unsupported:
 
214
            for feature, count in sorted(self.unsupported.items()):
 
215
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
216
                    (feature, count))
 
217
        if self._strict:
 
218
            ok = self.wasStrictlySuccessful()
 
219
        else:
 
220
            ok = self.wasSuccessful()
 
221
        if TestCase._first_thread_leaker_id:
 
222
            self.stream.write(
 
223
                '%s is leaking threads among %d leaking tests.\n' % (
 
224
                TestCase._first_thread_leaker_id,
 
225
                TestCase._leaking_threads_tests))
 
226
            # We don't report the main thread as an active one.
 
227
            self.stream.write(
 
228
                '%d non-main threads were left active in the end.\n'
 
229
                % (TestCase._active_threads - 1))
 
230
 
 
231
    def _extractBenchmarkTime(self, testCase):
 
232
        """Add a benchmark time for the current test case."""
 
233
        return getattr(testCase, "_benchtime", None)
 
234
 
 
235
    def _elapsedTestTimeString(self):
 
236
        """Return a time string for the overall time the current test has taken."""
 
237
        return self._formatTime(time.time() - self._start_time)
 
238
 
 
239
    def _testTimeString(self, testCase):
 
240
        benchmark_time = self._extractBenchmarkTime(testCase)
 
241
        if benchmark_time is not None:
 
242
            return self._formatTime(benchmark_time) + "*"
 
243
        else:
 
244
            return self._elapsedTestTimeString()
 
245
 
 
246
    def _formatTime(self, seconds):
 
247
        """Format seconds as milliseconds with leading spaces."""
 
248
        # some benchmarks can take thousands of seconds to run, so we need 8
 
249
        # places
 
250
        return "%8dms" % (1000 * seconds)
 
251
 
 
252
    def _shortened_test_description(self, test):
 
253
        what = test.id()
 
254
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
255
        return what
 
256
 
 
257
    def startTest(self, test):
 
258
        unittest.TestResult.startTest(self, test)
 
259
        if self.count == 0:
 
260
            self.startTests()
 
261
        self.report_test_start(test)
 
262
        test.number = self.count
 
263
        self._recordTestStartTime()
 
264
 
 
265
    def startTests(self):
 
266
        import platform
 
267
        if getattr(sys, 'frozen', None) is None:
 
268
            bzr_path = osutils.realpath(sys.argv[0])
 
269
        else:
 
270
            bzr_path = sys.executable
 
271
        self.stream.write(
 
272
            'testing: %s\n' % (bzr_path,))
 
273
        self.stream.write(
 
274
            '   %s\n' % (
 
275
                    bzrlib.__path__[0],))
 
276
        self.stream.write(
 
277
            '   bzr-%s python-%s %s\n' % (
 
278
                    bzrlib.version_string,
 
279
                    bzrlib._format_version_tuple(sys.version_info),
 
280
                    platform.platform(aliased=1),
 
281
                    ))
 
282
        self.stream.write('\n')
 
283
 
 
284
    def _recordTestStartTime(self):
 
285
        """Record that a test has started."""
 
286
        self._start_time = time.time()
 
287
 
 
288
    def _cleanupLogFile(self, test):
 
289
        # We can only do this if we have one of our TestCases, not if
 
290
        # we have a doctest.
 
291
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
292
        if setKeepLogfile is not None:
 
293
            setKeepLogfile()
 
294
 
 
295
    def addError(self, test, err):
 
296
        """Tell result that test finished with an error.
 
297
 
 
298
        Called from the TestCase run() method when the test
 
299
        fails with an unexpected error.
 
300
        """
 
301
        self._testConcluded(test)
 
302
        if isinstance(err[1], TestNotApplicable):
 
303
            return self._addNotApplicable(test, err)
 
304
        elif isinstance(err[1], UnavailableFeature):
 
305
            return self.addNotSupported(test, err[1].args[0])
 
306
        else:
 
307
            self._post_mortem()
 
308
            unittest.TestResult.addError(self, test, err)
 
309
            self.error_count += 1
 
310
            self.report_error(test, err)
 
311
            if self.stop_early:
 
312
                self.stop()
 
313
            self._cleanupLogFile(test)
 
314
 
 
315
    def addFailure(self, test, err):
 
316
        """Tell result that test failed.
 
317
 
 
318
        Called from the TestCase run() method when the test
 
319
        fails because e.g. an assert() method failed.
 
320
        """
 
321
        self._testConcluded(test)
 
322
        if isinstance(err[1], KnownFailure):
 
323
            return self._addKnownFailure(test, err)
 
324
        else:
 
325
            self._post_mortem()
 
326
            unittest.TestResult.addFailure(self, test, err)
 
327
            self.failure_count += 1
 
328
            self.report_failure(test, err)
 
329
            if self.stop_early:
 
330
                self.stop()
 
331
            self._cleanupLogFile(test)
 
332
 
 
333
    def addSuccess(self, test):
 
334
        """Tell result that test completed successfully.
 
335
 
 
336
        Called from the TestCase run()
 
337
        """
 
338
        self._testConcluded(test)
 
339
        if self._bench_history is not None:
 
340
            benchmark_time = self._extractBenchmarkTime(test)
 
341
            if benchmark_time is not None:
 
342
                self._bench_history.write("%s %s\n" % (
 
343
                    self._formatTime(benchmark_time),
 
344
                    test.id()))
 
345
        self.report_success(test)
 
346
        self._cleanupLogFile(test)
 
347
        unittest.TestResult.addSuccess(self, test)
 
348
        test._log_contents = ''
 
349
 
 
350
    def _testConcluded(self, test):
 
351
        """Common code when a test has finished.
 
352
 
 
353
        Called regardless of whether it succeded, failed, etc.
 
354
        """
 
355
        pass
 
356
 
 
357
    def _addKnownFailure(self, test, err):
 
358
        self.known_failure_count += 1
 
359
        self.report_known_failure(test, err)
 
360
 
 
361
    def addNotSupported(self, test, feature):
 
362
        """The test will not be run because of a missing feature.
 
363
        """
 
364
        # this can be called in two different ways: it may be that the
 
365
        # test started running, and then raised (through addError)
 
366
        # UnavailableFeature.  Alternatively this method can be called
 
367
        # while probing for features before running the tests; in that
 
368
        # case we will see startTest and stopTest, but the test will never
 
369
        # actually run.
 
370
        self.unsupported.setdefault(str(feature), 0)
 
371
        self.unsupported[str(feature)] += 1
 
372
        self.report_unsupported(test, feature)
 
373
 
 
374
    def addSkip(self, test, reason):
 
375
        """A test has not run for 'reason'."""
 
376
        self.skip_count += 1
 
377
        self.report_skip(test, reason)
 
378
 
 
379
    def _addNotApplicable(self, test, skip_excinfo):
 
380
        if isinstance(skip_excinfo[1], TestNotApplicable):
 
381
            self.not_applicable_count += 1
 
382
            self.report_not_applicable(test, skip_excinfo)
 
383
        try:
 
384
            test.tearDown()
 
385
        except KeyboardInterrupt:
 
386
            raise
 
387
        except:
 
388
            self.addError(test, test.exc_info())
 
389
        else:
 
390
            # seems best to treat this as success from point-of-view of unittest
 
391
            # -- it actually does nothing so it barely matters :)
 
392
            unittest.TestResult.addSuccess(self, test)
 
393
            test._log_contents = ''
 
394
 
 
395
    def printErrorList(self, flavour, errors):
 
396
        for test, err in errors:
 
397
            self.stream.writeln(self.separator1)
 
398
            self.stream.write("%s: " % flavour)
 
399
            self.stream.writeln(self.getDescription(test))
 
400
            if getattr(test, '_get_log', None) is not None:
 
401
                log_contents = test._get_log()
 
402
                if log_contents:
 
403
                    self.stream.write('\n')
 
404
                    self.stream.write(
 
405
                            ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
 
406
                    self.stream.write('\n')
 
407
                    self.stream.write(log_contents)
 
408
                    self.stream.write('\n')
 
409
                    self.stream.write(
 
410
                            ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
 
411
                    self.stream.write('\n')
 
412
            self.stream.writeln(self.separator2)
 
413
            self.stream.writeln("%s" % err)
 
414
 
 
415
    def _post_mortem(self):
 
416
        """Start a PDB post mortem session."""
 
417
        if os.environ.get('BZR_TEST_PDB', None):
 
418
            import pdb;pdb.post_mortem()
 
419
 
 
420
    def progress(self, offset, whence):
 
421
        """The test is adjusting the count of tests to run."""
 
422
        if whence == SUBUNIT_SEEK_SET:
 
423
            self.num_tests = offset
 
424
        elif whence == SUBUNIT_SEEK_CUR:
 
425
            self.num_tests += offset
 
426
        else:
 
427
            raise errors.BzrError("Unknown whence %r" % whence)
 
428
 
 
429
    def report_cleaning_up(self):
 
430
        pass
 
431
 
 
432
    def startTestRun(self):
 
433
        self.startTime = time.time()
 
434
 
 
435
    def report_success(self, test):
 
436
        pass
 
437
 
 
438
    def wasStrictlySuccessful(self):
 
439
        if self.unsupported or self.known_failure_count:
 
440
            return False
 
441
        return self.wasSuccessful()
 
442
 
 
443
 
 
444
class TextTestResult(ExtendedTestResult):
 
445
    """Displays progress and results of tests in text form"""
 
446
 
 
447
    def __init__(self, stream, descriptions, verbosity,
 
448
                 bench_history=None,
 
449
                 pb=None,
 
450
                 strict=None,
 
451
                 ):
 
452
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
453
            bench_history, strict)
 
454
        # We no longer pass them around, but just rely on the UIFactory stack
 
455
        # for state
 
456
        if pb is not None:
 
457
            warnings.warn("Passing pb to TextTestResult is deprecated")
 
458
        self.pb = self.ui.nested_progress_bar()
 
459
        self.pb.show_pct = False
 
460
        self.pb.show_spinner = False
 
461
        self.pb.show_eta = False,
 
462
        self.pb.show_count = False
 
463
        self.pb.show_bar = False
 
464
        self.pb.update_latency = 0
 
465
        self.pb.show_transport_activity = False
 
466
 
 
467
    def stopTestRun(self):
 
468
        # called when the tests that are going to run have run
 
469
        self.pb.clear()
 
470
        self.pb.finished()
 
471
        super(TextTestResult, self).stopTestRun()
 
472
 
 
473
    def startTestRun(self):
 
474
        super(TextTestResult, self).startTestRun()
 
475
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
 
476
 
 
477
    def printErrors(self):
 
478
        # clear the pb to make room for the error listing
 
479
        self.pb.clear()
 
480
        super(TextTestResult, self).printErrors()
 
481
 
 
482
    def _progress_prefix_text(self):
 
483
        # the longer this text, the less space we have to show the test
 
484
        # name...
 
485
        a = '[%d' % self.count              # total that have been run
 
486
        # tests skipped as known not to be relevant are not important enough
 
487
        # to show here
 
488
        ## if self.skip_count:
 
489
        ##     a += ', %d skip' % self.skip_count
 
490
        ## if self.known_failure_count:
 
491
        ##     a += '+%dX' % self.known_failure_count
 
492
        if self.num_tests:
 
493
            a +='/%d' % self.num_tests
 
494
        a += ' in '
 
495
        runtime = time.time() - self._overall_start_time
 
496
        if runtime >= 60:
 
497
            a += '%dm%ds' % (runtime / 60, runtime % 60)
 
498
        else:
 
499
            a += '%ds' % runtime
 
500
        if self.error_count:
 
501
            a += ', %d err' % self.error_count
 
502
        if self.failure_count:
 
503
            a += ', %d fail' % self.failure_count
 
504
        # if self.unsupported:
 
505
        #     a += ', %d missing' % len(self.unsupported)
 
506
        a += ']'
 
507
        return a
 
508
 
 
509
    def report_test_start(self, test):
 
510
        self.count += 1
 
511
        self.pb.update(
 
512
                self._progress_prefix_text()
 
513
                + ' '
 
514
                + self._shortened_test_description(test))
 
515
 
 
516
    def _test_description(self, test):
 
517
        return self._shortened_test_description(test)
 
518
 
 
519
    def report_error(self, test, err):
 
520
        ui.ui_factory.note('ERROR: %s\n    %s\n' % (
 
521
            self._test_description(test),
 
522
            err[1],
 
523
            ))
 
524
 
 
525
    def report_failure(self, test, err):
 
526
        ui.ui_factory.note('FAIL: %s\n    %s\n' % (
 
527
            self._test_description(test),
 
528
            err[1],
 
529
            ))
 
530
 
 
531
    def report_known_failure(self, test, err):
 
532
        ui.ui_factory.note('XFAIL: %s\n%s\n' % (
 
533
            self._test_description(test), err[1]))
 
534
 
 
535
    def report_skip(self, test, reason):
 
536
        pass
 
537
 
 
538
    def report_not_applicable(self, test, skip_excinfo):
 
539
        pass
 
540
 
 
541
    def report_unsupported(self, test, feature):
 
542
        """test cannot be run because feature is missing."""
 
543
 
 
544
    def report_cleaning_up(self):
 
545
        self.pb.update('Cleaning up')
 
546
 
 
547
 
 
548
class VerboseTestResult(ExtendedTestResult):
 
549
    """Produce long output, with one line per test run plus times"""
 
550
 
 
551
    def _ellipsize_to_right(self, a_string, final_width):
 
552
        """Truncate and pad a string, keeping the right hand side"""
 
553
        if len(a_string) > final_width:
 
554
            result = '...' + a_string[3-final_width:]
 
555
        else:
 
556
            result = a_string
 
557
        return result.ljust(final_width)
 
558
 
 
559
    def startTestRun(self):
 
560
        super(VerboseTestResult, self).startTestRun()
 
561
        self.stream.write('running %d tests...\n' % self.num_tests)
 
562
 
 
563
    def report_test_start(self, test):
 
564
        self.count += 1
 
565
        name = self._shortened_test_description(test)
 
566
        # width needs space for 6 char status, plus 1 for slash, plus an
 
567
        # 11-char time string, plus a trailing blank
 
568
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
569
        self.stream.write(self._ellipsize_to_right(name,
 
570
                          osutils.terminal_width()-18))
 
571
        self.stream.flush()
 
572
 
 
573
    def _error_summary(self, err):
 
574
        indent = ' ' * 4
 
575
        return '%s%s' % (indent, err[1])
 
576
 
 
577
    def report_error(self, test, err):
 
578
        self.stream.writeln('ERROR %s\n%s'
 
579
                % (self._testTimeString(test),
 
580
                   self._error_summary(err)))
 
581
 
 
582
    def report_failure(self, test, err):
 
583
        self.stream.writeln(' FAIL %s\n%s'
 
584
                % (self._testTimeString(test),
 
585
                   self._error_summary(err)))
 
586
 
 
587
    def report_known_failure(self, test, err):
 
588
        self.stream.writeln('XFAIL %s\n%s'
 
589
                % (self._testTimeString(test),
 
590
                   self._error_summary(err)))
 
591
 
 
592
    def report_success(self, test):
 
593
        self.stream.writeln('   OK %s' % self._testTimeString(test))
 
594
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
595
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
596
            stats.pprint(file=self.stream)
 
597
        # flush the stream so that we get smooth output. This verbose mode is
 
598
        # used to show the output in PQM.
 
599
        self.stream.flush()
 
600
 
 
601
    def report_skip(self, test, reason):
 
602
        self.stream.writeln(' SKIP %s\n%s'
 
603
                % (self._testTimeString(test), reason))
 
604
 
 
605
    def report_not_applicable(self, test, skip_excinfo):
 
606
        self.stream.writeln('  N/A %s\n%s'
 
607
                % (self._testTimeString(test),
 
608
                   self._error_summary(skip_excinfo)))
 
609
 
 
610
    def report_unsupported(self, test, feature):
 
611
        """test cannot be run because feature is missing."""
 
612
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
613
                %(self._testTimeString(test), feature))
 
614
 
 
615
 
 
616
class TextTestRunner(object):
 
617
    stop_on_failure = False
 
618
 
 
619
    def __init__(self,
 
620
                 stream=sys.stderr,
 
621
                 descriptions=0,
 
622
                 verbosity=1,
 
623
                 bench_history=None,
 
624
                 strict=False,
 
625
                 result_decorators=None,
 
626
                 ):
 
627
        """Create a TextTestRunner.
 
628
 
 
629
        :param result_decorators: An optional list of decorators to apply
 
630
            to the result object being used by the runner. Decorators are
 
631
            applied left to right - the first element in the list is the 
 
632
            innermost decorator.
 
633
        """
 
634
        self.stream = unittest._WritelnDecorator(stream)
 
635
        self.descriptions = descriptions
 
636
        self.verbosity = verbosity
 
637
        self._bench_history = bench_history
 
638
        self._strict = strict
 
639
        self._result_decorators = result_decorators or []
 
640
 
 
641
    def run(self, test):
 
642
        "Run the given test case or test suite."
 
643
        if self.verbosity == 1:
 
644
            result_class = TextTestResult
 
645
        elif self.verbosity >= 2:
 
646
            result_class = VerboseTestResult
 
647
        original_result = result_class(self.stream,
 
648
                              self.descriptions,
 
649
                              self.verbosity,
 
650
                              bench_history=self._bench_history,
 
651
                              strict=self._strict,
 
652
                              )
 
653
        # Signal to result objects that look at stop early policy to stop,
 
654
        original_result.stop_early = self.stop_on_failure
 
655
        result = original_result
 
656
        for decorator in self._result_decorators:
 
657
            result = decorator(result)
 
658
            result.stop_early = self.stop_on_failure
 
659
        try:
 
660
            import testtools
 
661
        except ImportError:
 
662
            pass
 
663
        else:
 
664
            if isinstance(test, testtools.ConcurrentTestSuite):
 
665
                # We need to catch bzr specific behaviors
 
666
                result = BZRTransformingResult(result)
 
667
        result.startTestRun()
 
668
        try:
 
669
            test.run(result)
 
670
        finally:
 
671
            result.stopTestRun()
 
672
        # higher level code uses our extended protocol to determine
 
673
        # what exit code to give.
 
674
        return original_result
 
675
 
 
676
 
 
677
def iter_suite_tests(suite):
 
678
    """Return all tests in a suite, recursing through nested suites"""
 
679
    if isinstance(suite, unittest.TestCase):
 
680
        yield suite
 
681
    elif isinstance(suite, unittest.TestSuite):
 
682
        for item in suite:
 
683
            for r in iter_suite_tests(item):
 
684
                yield r
 
685
    else:
 
686
        raise Exception('unknown type %r for object %r'
 
687
                        % (type(suite), suite))
 
688
 
 
689
 
 
690
class TestSkipped(Exception):
 
691
    """Indicates that a test was intentionally skipped, rather than failing."""
 
692
 
 
693
 
 
694
class TestNotApplicable(TestSkipped):
 
695
    """A test is not applicable to the situation where it was run.
 
696
 
 
697
    This is only normally raised by parameterized tests, if they find that
 
698
    the instance they're constructed upon does not support one aspect
 
699
    of its interface.
 
700
    """
 
701
 
 
702
 
 
703
class KnownFailure(AssertionError):
 
704
    """Indicates that a test failed in a precisely expected manner.
 
705
 
 
706
    Such failures dont block the whole test suite from passing because they are
 
707
    indicators of partially completed code or of future work. We have an
 
708
    explicit error for them so that we can ensure that they are always visible:
 
709
    KnownFailures are always shown in the output of bzr selftest.
 
710
    """
 
711
 
 
712
 
 
713
class UnavailableFeature(Exception):
 
714
    """A feature required for this test was not available.
 
715
 
 
716
    The feature should be used to construct the exception.
 
717
    """
 
718
 
 
719
 
 
720
class CommandFailed(Exception):
 
721
    pass
 
722
 
 
723
 
 
724
class StringIOWrapper(object):
 
725
    """A wrapper around cStringIO which just adds an encoding attribute.
 
726
 
 
727
    Internally we can check sys.stdout to see what the output encoding
 
728
    should be. However, cStringIO has no encoding attribute that we can
 
729
    set. So we wrap it instead.
 
730
    """
 
731
    encoding='ascii'
 
732
    _cstring = None
 
733
 
 
734
    def __init__(self, s=None):
 
735
        if s is not None:
 
736
            self.__dict__['_cstring'] = StringIO(s)
 
737
        else:
 
738
            self.__dict__['_cstring'] = StringIO()
 
739
 
 
740
    def __getattr__(self, name, getattr=getattr):
 
741
        return getattr(self.__dict__['_cstring'], name)
 
742
 
 
743
    def __setattr__(self, name, val):
 
744
        if name == 'encoding':
 
745
            self.__dict__['encoding'] = val
 
746
        else:
 
747
            return setattr(self._cstring, name, val)
 
748
 
 
749
 
 
750
class TestUIFactory(TextUIFactory):
 
751
    """A UI Factory for testing.
 
752
 
 
753
    Hide the progress bar but emit note()s.
 
754
    Redirect stdin.
 
755
    Allows get_password to be tested without real tty attached.
 
756
 
 
757
    See also CannedInputUIFactory which lets you provide programmatic input in
 
758
    a structured way.
 
759
    """
 
760
    # TODO: Capture progress events at the model level and allow them to be
 
761
    # observed by tests that care.
 
762
    #
 
763
    # XXX: Should probably unify more with CannedInputUIFactory or a
 
764
    # particular configuration of TextUIFactory, or otherwise have a clearer
 
765
    # idea of how they're supposed to be different.
 
766
    # See https://bugs.edge.launchpad.net/bzr/+bug/408213
 
767
 
 
768
    def __init__(self, stdout=None, stderr=None, stdin=None):
 
769
        if stdin is not None:
 
770
            # We use a StringIOWrapper to be able to test various
 
771
            # encodings, but the user is still responsible to
 
772
            # encode the string and to set the encoding attribute
 
773
            # of StringIOWrapper.
 
774
            stdin = StringIOWrapper(stdin)
 
775
        super(TestUIFactory, self).__init__(stdin, stdout, stderr)
 
776
 
 
777
    def get_non_echoed_password(self):
 
778
        """Get password from stdin without trying to handle the echo mode"""
 
779
        password = self.stdin.readline()
 
780
        if not password:
 
781
            raise EOFError
 
782
        if password[-1] == '\n':
 
783
            password = password[:-1]
 
784
        return password
 
785
 
 
786
    def make_progress_view(self):
 
787
        return NullProgressView()
 
788
 
 
789
 
 
790
class TestCase(unittest.TestCase):
 
791
    """Base class for bzr unit tests.
 
792
 
 
793
    Tests that need access to disk resources should subclass
 
794
    TestCaseInTempDir not TestCase.
 
795
 
 
796
    Error and debug log messages are redirected from their usual
 
797
    location into a temporary file, the contents of which can be
 
798
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
799
    so that it can also capture file IO.  When the test completes this file
 
800
    is read into memory and removed from disk.
 
801
 
 
802
    There are also convenience functions to invoke bzr's command-line
 
803
    routine, and to build and check bzr trees.
 
804
 
 
805
    In addition to the usual method of overriding tearDown(), this class also
 
806
    allows subclasses to register functions into the _cleanups list, which is
 
807
    run in order as the object is torn down.  It's less likely this will be
 
808
    accidentally overlooked.
 
809
    """
 
810
 
 
811
    _active_threads = None
 
812
    _leaking_threads_tests = 0
 
813
    _first_thread_leaker_id = None
 
814
    _log_file_name = None
 
815
    _log_contents = ''
 
816
    _keep_log_file = False
 
817
    # record lsprof data when performing benchmark calls.
 
818
    _gather_lsprof_in_benchmarks = False
 
819
    attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
 
820
                     '_log_contents', '_log_file_name', '_benchtime',
 
821
                     '_TestCase__testMethodName', '_TestCase__testMethodDoc',)
 
822
 
 
823
    def __init__(self, methodName='testMethod'):
 
824
        super(TestCase, self).__init__(methodName)
 
825
        self._cleanups = []
 
826
        self._bzr_test_setUp_run = False
 
827
        self._bzr_test_tearDown_run = False
 
828
        self._directory_isolation = True
 
829
 
 
830
    def setUp(self):
 
831
        unittest.TestCase.setUp(self)
 
832
        self._bzr_test_setUp_run = True
 
833
        self._cleanEnvironment()
 
834
        self._silenceUI()
 
835
        self._startLogFile()
 
836
        self._benchcalls = []
 
837
        self._benchtime = None
 
838
        self._clear_hooks()
 
839
        self._track_transports()
 
840
        self._track_locks()
 
841
        self._clear_debug_flags()
 
842
        TestCase._active_threads = threading.activeCount()
 
843
        self.addCleanup(self._check_leaked_threads)
 
844
 
 
845
    def debug(self):
 
846
        # debug a frame up.
 
847
        import pdb
 
848
        pdb.Pdb().set_trace(sys._getframe().f_back)
 
849
 
 
850
    def _check_leaked_threads(self):
 
851
        active = threading.activeCount()
 
852
        leaked_threads = active - TestCase._active_threads
 
853
        TestCase._active_threads = active
 
854
        # If some tests make the number of threads *decrease*, we'll consider
 
855
        # that they are just observing old threads dieing, not agressively kill
 
856
        # random threads. So we don't report these tests as leaking. The risk
 
857
        # is that we have false positives that way (the test see 2 threads
 
858
        # going away but leak one) but it seems less likely than the actual
 
859
        # false positives (the test see threads going away and does not leak).
 
860
        if leaked_threads > 0:
 
861
            TestCase._leaking_threads_tests += 1
 
862
            if TestCase._first_thread_leaker_id is None:
 
863
                TestCase._first_thread_leaker_id = self.id()
 
864
 
 
865
    def _clear_debug_flags(self):
 
866
        """Prevent externally set debug flags affecting tests.
 
867
 
 
868
        Tests that want to use debug flags can just set them in the
 
869
        debug_flags set during setup/teardown.
 
870
        """
 
871
        self._preserved_debug_flags = set(debug.debug_flags)
 
872
        if 'allow_debug' not in selftest_debug_flags:
 
873
            debug.debug_flags.clear()
 
874
        if 'disable_lock_checks' not in selftest_debug_flags:
 
875
            debug.debug_flags.add('strict_locks')
 
876
        self.addCleanup(self._restore_debug_flags)
 
877
 
 
878
    def _clear_hooks(self):
 
879
        # prevent hooks affecting tests
 
880
        self._preserved_hooks = {}
 
881
        for key, factory in hooks.known_hooks.items():
 
882
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
883
            current_hooks = hooks.known_hooks_key_to_object(key)
 
884
            self._preserved_hooks[parent] = (name, current_hooks)
 
885
        self.addCleanup(self._restoreHooks)
 
886
        for key, factory in hooks.known_hooks.items():
 
887
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
888
            setattr(parent, name, factory())
 
889
        # this hook should always be installed
 
890
        request._install_hook()
 
891
 
 
892
    def disable_directory_isolation(self):
 
893
        """Turn off directory isolation checks."""
 
894
        self._directory_isolation = False
 
895
 
 
896
    def enable_directory_isolation(self):
 
897
        """Enable directory isolation checks."""
 
898
        self._directory_isolation = True
 
899
 
 
900
    def _silenceUI(self):
 
901
        """Turn off UI for duration of test"""
 
902
        # by default the UI is off; tests can turn it on if they want it.
 
903
        saved = ui.ui_factory
 
904
        def _restore():
 
905
            ui.ui_factory = saved
 
906
        ui.ui_factory = ui.SilentUIFactory()
 
907
        self.addCleanup(_restore)
 
908
 
 
909
    def _check_locks(self):
 
910
        """Check that all lock take/release actions have been paired."""
 
911
        # We always check for mismatched locks. If a mismatch is found, we
 
912
        # fail unless -Edisable_lock_checks is supplied to selftest, in which
 
913
        # case we just print a warning.
 
914
        # unhook:
 
915
        acquired_locks = [lock for action, lock in self._lock_actions
 
916
                          if action == 'acquired']
 
917
        released_locks = [lock for action, lock in self._lock_actions
 
918
                          if action == 'released']
 
919
        broken_locks = [lock for action, lock in self._lock_actions
 
920
                        if action == 'broken']
 
921
        # trivially, given the tests for lock acquistion and release, if we
 
922
        # have as many in each list, it should be ok. Some lock tests also
 
923
        # break some locks on purpose and should be taken into account by
 
924
        # considering that breaking a lock is just a dirty way of releasing it.
 
925
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
 
926
            message = ('Different number of acquired and '
 
927
                       'released or broken locks. (%s, %s + %s)' %
 
928
                       (acquired_locks, released_locks, broken_locks))
 
929
            if not self._lock_check_thorough:
 
930
                # Rather than fail, just warn
 
931
                print "Broken test %s: %s" % (self, message)
 
932
                return
 
933
            self.fail(message)
 
934
 
 
935
    def _track_locks(self):
 
936
        """Track lock activity during tests."""
 
937
        self._lock_actions = []
 
938
        if 'disable_lock_checks' in selftest_debug_flags:
 
939
            self._lock_check_thorough = False
 
940
        else:
 
941
            self._lock_check_thorough = True
 
942
            
 
943
        self.addCleanup(self._check_locks)
 
944
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
 
945
                                                self._lock_acquired, None)
 
946
        _mod_lock.Lock.hooks.install_named_hook('lock_released',
 
947
                                                self._lock_released, None)
 
948
        _mod_lock.Lock.hooks.install_named_hook('lock_broken',
 
949
                                                self._lock_broken, None)
 
950
 
 
951
    def _lock_acquired(self, result):
 
952
        self._lock_actions.append(('acquired', result))
 
953
 
 
954
    def _lock_released(self, result):
 
955
        self._lock_actions.append(('released', result))
 
956
 
 
957
    def _lock_broken(self, result):
 
958
        self._lock_actions.append(('broken', result))
 
959
 
 
960
    def permit_dir(self, name):
 
961
        """Permit a directory to be used by this test. See permit_url."""
 
962
        name_transport = get_transport(name)
 
963
        self.permit_url(name)
 
964
        self.permit_url(name_transport.base)
 
965
 
 
966
    def permit_url(self, url):
 
967
        """Declare that url is an ok url to use in this test.
 
968
        
 
969
        Do this for memory transports, temporary test directory etc.
 
970
        
 
971
        Do not do this for the current working directory, /tmp, or any other
 
972
        preexisting non isolated url.
 
973
        """
 
974
        if not url.endswith('/'):
 
975
            url += '/'
 
976
        self._bzr_selftest_roots.append(url)
 
977
 
 
978
    def permit_source_tree_branch_repo(self):
 
979
        """Permit the source tree bzr is running from to be opened.
 
980
 
 
981
        Some code such as bzrlib.version attempts to read from the bzr branch
 
982
        that bzr is executing from (if any). This method permits that directory
 
983
        to be used in the test suite.
 
984
        """
 
985
        path = self.get_source_path()
 
986
        self.record_directory_isolation()
 
987
        try:
 
988
            try:
 
989
                workingtree.WorkingTree.open(path)
 
990
            except (errors.NotBranchError, errors.NoWorkingTree):
 
991
                return
 
992
        finally:
 
993
            self.enable_directory_isolation()
 
994
 
 
995
    def _preopen_isolate_transport(self, transport):
 
996
        """Check that all transport openings are done in the test work area."""
 
997
        while isinstance(transport, pathfilter.PathFilteringTransport):
 
998
            # Unwrap pathfiltered transports
 
999
            transport = transport.server.backing_transport.clone(
 
1000
                transport._filter('.'))
 
1001
        url = transport.base
 
1002
        # ReadonlySmartTCPServer_for_testing decorates the backing transport
 
1003
        # urls it is given by prepending readonly+. This is appropriate as the
 
1004
        # client shouldn't know that the server is readonly (or not readonly).
 
1005
        # We could register all servers twice, with readonly+ prepending, but
 
1006
        # that makes for a long list; this is about the same but easier to
 
1007
        # read.
 
1008
        if url.startswith('readonly+'):
 
1009
            url = url[len('readonly+'):]
 
1010
        self._preopen_isolate_url(url)
 
1011
 
 
1012
    def _preopen_isolate_url(self, url):
 
1013
        if not self._directory_isolation:
 
1014
            return
 
1015
        if self._directory_isolation == 'record':
 
1016
            self._bzr_selftest_roots.append(url)
 
1017
            return
 
1018
        # This prevents all transports, including e.g. sftp ones backed on disk
 
1019
        # from working unless they are explicitly granted permission. We then
 
1020
        # depend on the code that sets up test transports to check that they are
 
1021
        # appropriately isolated and enable their use by calling
 
1022
        # self.permit_transport()
 
1023
        if not osutils.is_inside_any(self._bzr_selftest_roots, url):
 
1024
            raise errors.BzrError("Attempt to escape test isolation: %r %r"
 
1025
                % (url, self._bzr_selftest_roots))
 
1026
 
 
1027
    def record_directory_isolation(self):
 
1028
        """Gather accessed directories to permit later access.
 
1029
        
 
1030
        This is used for tests that access the branch bzr is running from.
 
1031
        """
 
1032
        self._directory_isolation = "record"
 
1033
 
 
1034
    def start_server(self, transport_server, backing_server=None):
 
1035
        """Start transport_server for this test.
 
1036
 
 
1037
        This starts the server, registers a cleanup for it and permits the
 
1038
        server's urls to be used.
 
1039
        """
 
1040
        if backing_server is None:
 
1041
            transport_server.setUp()
 
1042
        else:
 
1043
            transport_server.setUp(backing_server)
 
1044
        self.addCleanup(transport_server.tearDown)
 
1045
        # Obtain a real transport because if the server supplies a password, it
 
1046
        # will be hidden from the base on the client side.
 
1047
        t = get_transport(transport_server.get_url())
 
1048
        # Some transport servers effectively chroot the backing transport;
 
1049
        # others like SFTPServer don't - users of the transport can walk up the
 
1050
        # transport to read the entire backing transport. This wouldn't matter
 
1051
        # except that the workdir tests are given - and that they expect the
 
1052
        # server's url to point at - is one directory under the safety net. So
 
1053
        # Branch operations into the transport will attempt to walk up one
 
1054
        # directory. Chrooting all servers would avoid this but also mean that
 
1055
        # we wouldn't be testing directly against non-root urls. Alternatively
 
1056
        # getting the test framework to start the server with a backing server
 
1057
        # at the actual safety net directory would work too, but this then
 
1058
        # means that the self.get_url/self.get_transport methods would need
 
1059
        # to transform all their results. On balance its cleaner to handle it
 
1060
        # here, and permit a higher url when we have one of these transports.
 
1061
        if t.base.endswith('/work/'):
 
1062
            # we have safety net/test root/work
 
1063
            t = t.clone('../..')
 
1064
        elif isinstance(transport_server, server.SmartTCPServer_for_testing):
 
1065
            # The smart server adds a path similar to work, which is traversed
 
1066
            # up from by the client. But the server is chrooted - the actual
 
1067
            # backing transport is not escaped from, and VFS requests to the
 
1068
            # root will error (because they try to escape the chroot).
 
1069
            t2 = t.clone('..')
 
1070
            while t2.base != t.base:
 
1071
                t = t2
 
1072
                t2 = t.clone('..')
 
1073
        self.permit_url(t.base)
 
1074
 
 
1075
    def _track_transports(self):
 
1076
        """Install checks for transport usage."""
 
1077
        # TestCase has no safe place it can write to.
 
1078
        self._bzr_selftest_roots = []
 
1079
        # Currently the easiest way to be sure that nothing is going on is to
 
1080
        # hook into bzr dir opening. This leaves a small window of error for
 
1081
        # transport tests, but they are well known, and we can improve on this
 
1082
        # step.
 
1083
        bzrdir.BzrDir.hooks.install_named_hook("pre_open",
 
1084
            self._preopen_isolate_transport, "Check bzr directories are safe.")
 
1085
 
 
1086
    def _ndiff_strings(self, a, b):
 
1087
        """Return ndiff between two strings containing lines.
 
1088
 
 
1089
        A trailing newline is added if missing to make the strings
 
1090
        print properly."""
 
1091
        if b and b[-1] != '\n':
 
1092
            b += '\n'
 
1093
        if a and a[-1] != '\n':
 
1094
            a += '\n'
 
1095
        difflines = difflib.ndiff(a.splitlines(True),
 
1096
                                  b.splitlines(True),
 
1097
                                  linejunk=lambda x: False,
 
1098
                                  charjunk=lambda x: False)
 
1099
        return ''.join(difflines)
 
1100
 
 
1101
    def assertEqual(self, a, b, message=''):
 
1102
        try:
 
1103
            if a == b:
 
1104
                return
 
1105
        except UnicodeError, e:
 
1106
            # If we can't compare without getting a UnicodeError, then
 
1107
            # obviously they are different
 
1108
            mutter('UnicodeError: %s', e)
 
1109
        if message:
 
1110
            message += '\n'
 
1111
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
1112
            % (message,
 
1113
               pformat(a), pformat(b)))
 
1114
 
 
1115
    assertEquals = assertEqual
 
1116
 
 
1117
    def assertEqualDiff(self, a, b, message=None):
 
1118
        """Assert two texts are equal, if not raise an exception.
 
1119
 
 
1120
        This is intended for use with multi-line strings where it can
 
1121
        be hard to find the differences by eye.
 
1122
        """
 
1123
        # TODO: perhaps override assertEquals to call this for strings?
 
1124
        if a == b:
 
1125
            return
 
1126
        if message is None:
 
1127
            message = "texts not equal:\n"
 
1128
        if a + '\n' == b:
 
1129
            message = 'first string is missing a final newline.\n'
 
1130
        if a == b + '\n':
 
1131
            message = 'second string is missing a final newline.\n'
 
1132
        raise AssertionError(message +
 
1133
                             self._ndiff_strings(a, b))
 
1134
 
 
1135
    def assertEqualMode(self, mode, mode_test):
 
1136
        self.assertEqual(mode, mode_test,
 
1137
                         'mode mismatch %o != %o' % (mode, mode_test))
 
1138
 
 
1139
    def assertEqualStat(self, expected, actual):
 
1140
        """assert that expected and actual are the same stat result.
 
1141
 
 
1142
        :param expected: A stat result.
 
1143
        :param actual: A stat result.
 
1144
        :raises AssertionError: If the expected and actual stat values differ
 
1145
            other than by atime.
 
1146
        """
 
1147
        self.assertEqual(expected.st_size, actual.st_size)
 
1148
        self.assertEqual(expected.st_mtime, actual.st_mtime)
 
1149
        self.assertEqual(expected.st_ctime, actual.st_ctime)
 
1150
        self.assertEqual(expected.st_dev, actual.st_dev)
 
1151
        self.assertEqual(expected.st_ino, actual.st_ino)
 
1152
        self.assertEqual(expected.st_mode, actual.st_mode)
 
1153
 
 
1154
    def assertLength(self, length, obj_with_len):
 
1155
        """Assert that obj_with_len is of length length."""
 
1156
        if len(obj_with_len) != length:
 
1157
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
 
1158
                length, len(obj_with_len), obj_with_len))
 
1159
 
 
1160
    def assertLogsError(self, exception_class, func, *args, **kwargs):
 
1161
        """Assert that func(*args, **kwargs) quietly logs a specific exception.
 
1162
        """
 
1163
        from bzrlib import trace
 
1164
        captured = []
 
1165
        orig_log_exception_quietly = trace.log_exception_quietly
 
1166
        try:
 
1167
            def capture():
 
1168
                orig_log_exception_quietly()
 
1169
                captured.append(sys.exc_info())
 
1170
            trace.log_exception_quietly = capture
 
1171
            func(*args, **kwargs)
 
1172
        finally:
 
1173
            trace.log_exception_quietly = orig_log_exception_quietly
 
1174
        self.assertLength(1, captured)
 
1175
        err = captured[0][1]
 
1176
        self.assertIsInstance(err, exception_class)
 
1177
        return err
 
1178
 
 
1179
    def assertPositive(self, val):
 
1180
        """Assert that val is greater than 0."""
 
1181
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
1182
 
 
1183
    def assertNegative(self, val):
 
1184
        """Assert that val is less than 0."""
 
1185
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
1186
 
 
1187
    def assertStartsWith(self, s, prefix):
 
1188
        if not s.startswith(prefix):
 
1189
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
1190
 
 
1191
    def assertEndsWith(self, s, suffix):
 
1192
        """Asserts that s ends with suffix."""
 
1193
        if not s.endswith(suffix):
 
1194
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
1195
 
 
1196
    def assertContainsRe(self, haystack, needle_re, flags=0):
 
1197
        """Assert that a contains something matching a regular expression."""
 
1198
        if not re.search(needle_re, haystack, flags):
 
1199
            if '\n' in haystack or len(haystack) > 60:
 
1200
                # a long string, format it in a more readable way
 
1201
                raise AssertionError(
 
1202
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
1203
                        % (needle_re, haystack))
 
1204
            else:
 
1205
                raise AssertionError('pattern "%s" not found in "%s"'
 
1206
                        % (needle_re, haystack))
 
1207
 
 
1208
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
 
1209
        """Assert that a does not match a regular expression"""
 
1210
        if re.search(needle_re, haystack, flags):
 
1211
            raise AssertionError('pattern "%s" found in "%s"'
 
1212
                    % (needle_re, haystack))
 
1213
 
 
1214
    def assertSubset(self, sublist, superlist):
 
1215
        """Assert that every entry in sublist is present in superlist."""
 
1216
        missing = set(sublist) - set(superlist)
 
1217
        if len(missing) > 0:
 
1218
            raise AssertionError("value(s) %r not present in container %r" %
 
1219
                                 (missing, superlist))
 
1220
 
 
1221
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
1222
        """Fail unless excClass is raised when the iterator from func is used.
 
1223
 
 
1224
        Many functions can return generators this makes sure
 
1225
        to wrap them in a list() call to make sure the whole generator
 
1226
        is run, and that the proper exception is raised.
 
1227
        """
 
1228
        try:
 
1229
            list(func(*args, **kwargs))
 
1230
        except excClass, e:
 
1231
            return e
 
1232
        else:
 
1233
            if getattr(excClass,'__name__', None) is not None:
 
1234
                excName = excClass.__name__
 
1235
            else:
 
1236
                excName = str(excClass)
 
1237
            raise self.failureException, "%s not raised" % excName
 
1238
 
 
1239
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
1240
        """Assert that a callable raises a particular exception.
 
1241
 
 
1242
        :param excClass: As for the except statement, this may be either an
 
1243
            exception class, or a tuple of classes.
 
1244
        :param callableObj: A callable, will be passed ``*args`` and
 
1245
            ``**kwargs``.
 
1246
 
 
1247
        Returns the exception so that you can examine it.
 
1248
        """
 
1249
        try:
 
1250
            callableObj(*args, **kwargs)
 
1251
        except excClass, e:
 
1252
            return e
 
1253
        else:
 
1254
            if getattr(excClass,'__name__', None) is not None:
 
1255
                excName = excClass.__name__
 
1256
            else:
 
1257
                # probably a tuple
 
1258
                excName = str(excClass)
 
1259
            raise self.failureException, "%s not raised" % excName
 
1260
 
 
1261
    def assertIs(self, left, right, message=None):
 
1262
        if not (left is right):
 
1263
            if message is not None:
 
1264
                raise AssertionError(message)
 
1265
            else:
 
1266
                raise AssertionError("%r is not %r." % (left, right))
 
1267
 
 
1268
    def assertIsNot(self, left, right, message=None):
 
1269
        if (left is right):
 
1270
            if message is not None:
 
1271
                raise AssertionError(message)
 
1272
            else:
 
1273
                raise AssertionError("%r is %r." % (left, right))
 
1274
 
 
1275
    def assertTransportMode(self, transport, path, mode):
 
1276
        """Fail if a path does not have mode "mode".
 
1277
 
 
1278
        If modes are not supported on this transport, the assertion is ignored.
 
1279
        """
 
1280
        if not transport._can_roundtrip_unix_modebits():
 
1281
            return
 
1282
        path_stat = transport.stat(path)
 
1283
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
1284
        self.assertEqual(mode, actual_mode,
 
1285
                         'mode of %r incorrect (%s != %s)'
 
1286
                         % (path, oct(mode), oct(actual_mode)))
 
1287
 
 
1288
    def assertIsSameRealPath(self, path1, path2):
 
1289
        """Fail if path1 and path2 points to different files"""
 
1290
        self.assertEqual(osutils.realpath(path1),
 
1291
                         osutils.realpath(path2),
 
1292
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
1293
 
 
1294
    def assertIsInstance(self, obj, kls, msg=None):
 
1295
        """Fail if obj is not an instance of kls
 
1296
        
 
1297
        :param msg: Supplementary message to show if the assertion fails.
 
1298
        """
 
1299
        if not isinstance(obj, kls):
 
1300
            m = "%r is an instance of %s rather than %s" % (
 
1301
                obj, obj.__class__, kls)
 
1302
            if msg:
 
1303
                m += ": " + msg
 
1304
            self.fail(m)
 
1305
 
 
1306
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
1307
        """Invoke a test, expecting it to fail for the given reason.
 
1308
 
 
1309
        This is for assertions that ought to succeed, but currently fail.
 
1310
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
1311
        about the failure you're expecting.  If a new bug is introduced,
 
1312
        AssertionError should be raised, not KnownFailure.
 
1313
 
 
1314
        Frequently, expectFailure should be followed by an opposite assertion.
 
1315
        See example below.
 
1316
 
 
1317
        Intended to be used with a callable that raises AssertionError as the
 
1318
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
1319
 
 
1320
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
1321
        test succeeds.
 
1322
 
 
1323
        example usage::
 
1324
 
 
1325
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
1326
                             dynamic_val)
 
1327
          self.assertEqual(42, dynamic_val)
 
1328
 
 
1329
          This means that a dynamic_val of 54 will cause the test to raise
 
1330
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
1331
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
1332
          than 54 or 42 will cause an AssertionError.
 
1333
        """
 
1334
        try:
 
1335
            assertion(*args, **kwargs)
 
1336
        except AssertionError:
 
1337
            raise KnownFailure(reason)
 
1338
        else:
 
1339
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
1340
 
 
1341
    def assertFileEqual(self, content, path):
 
1342
        """Fail if path does not contain 'content'."""
 
1343
        self.failUnlessExists(path)
 
1344
        f = file(path, 'rb')
 
1345
        try:
 
1346
            s = f.read()
 
1347
        finally:
 
1348
            f.close()
 
1349
        self.assertEqualDiff(content, s)
 
1350
 
 
1351
    def failUnlessExists(self, path):
 
1352
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1353
        if not isinstance(path, basestring):
 
1354
            for p in path:
 
1355
                self.failUnlessExists(p)
 
1356
        else:
 
1357
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
1358
 
 
1359
    def failIfExists(self, path):
 
1360
        """Fail if path or paths, which may be abs or relative, exist."""
 
1361
        if not isinstance(path, basestring):
 
1362
            for p in path:
 
1363
                self.failIfExists(p)
 
1364
        else:
 
1365
            self.failIf(osutils.lexists(path),path+" exists")
 
1366
 
 
1367
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1368
        """A helper for callDeprecated and applyDeprecated.
 
1369
 
 
1370
        :param a_callable: A callable to call.
 
1371
        :param args: The positional arguments for the callable
 
1372
        :param kwargs: The keyword arguments for the callable
 
1373
        :return: A tuple (warnings, result). result is the result of calling
 
1374
            a_callable(``*args``, ``**kwargs``).
 
1375
        """
 
1376
        local_warnings = []
 
1377
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1378
            # we've hooked into a deprecation specific callpath,
 
1379
            # only deprecations should getting sent via it.
 
1380
            self.assertEqual(cls, DeprecationWarning)
 
1381
            local_warnings.append(msg)
 
1382
        original_warning_method = symbol_versioning.warn
 
1383
        symbol_versioning.set_warning_method(capture_warnings)
 
1384
        try:
 
1385
            result = a_callable(*args, **kwargs)
 
1386
        finally:
 
1387
            symbol_versioning.set_warning_method(original_warning_method)
 
1388
        return (local_warnings, result)
 
1389
 
 
1390
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1391
        """Call a deprecated callable without warning the user.
 
1392
 
 
1393
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1394
        not other callers that go direct to the warning module.
 
1395
 
 
1396
        To test that a deprecated method raises an error, do something like
 
1397
        this::
 
1398
 
 
1399
            self.assertRaises(errors.ReservedId,
 
1400
                self.applyDeprecated,
 
1401
                deprecated_in((1, 5, 0)),
 
1402
                br.append_revision,
 
1403
                'current:')
 
1404
 
 
1405
        :param deprecation_format: The deprecation format that the callable
 
1406
            should have been deprecated with. This is the same type as the
 
1407
            parameter to deprecated_method/deprecated_function. If the
 
1408
            callable is not deprecated with this format, an assertion error
 
1409
            will be raised.
 
1410
        :param a_callable: A callable to call. This may be a bound method or
 
1411
            a regular function. It will be called with ``*args`` and
 
1412
            ``**kwargs``.
 
1413
        :param args: The positional arguments for the callable
 
1414
        :param kwargs: The keyword arguments for the callable
 
1415
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1416
        """
 
1417
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1418
            *args, **kwargs)
 
1419
        expected_first_warning = symbol_versioning.deprecation_string(
 
1420
            a_callable, deprecation_format)
 
1421
        if len(call_warnings) == 0:
 
1422
            self.fail("No deprecation warning generated by call to %s" %
 
1423
                a_callable)
 
1424
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1425
        return result
 
1426
 
 
1427
    def callCatchWarnings(self, fn, *args, **kw):
 
1428
        """Call a callable that raises python warnings.
 
1429
 
 
1430
        The caller's responsible for examining the returned warnings.
 
1431
 
 
1432
        If the callable raises an exception, the exception is not
 
1433
        caught and propagates up to the caller.  In that case, the list
 
1434
        of warnings is not available.
 
1435
 
 
1436
        :returns: ([warning_object, ...], fn_result)
 
1437
        """
 
1438
        # XXX: This is not perfect, because it completely overrides the
 
1439
        # warnings filters, and some code may depend on suppressing particular
 
1440
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1441
        # though.  -- Andrew, 20071062
 
1442
        wlist = []
 
1443
        def _catcher(message, category, filename, lineno, file=None, line=None):
 
1444
            # despite the name, 'message' is normally(?) a Warning subclass
 
1445
            # instance
 
1446
            wlist.append(message)
 
1447
        saved_showwarning = warnings.showwarning
 
1448
        saved_filters = warnings.filters
 
1449
        try:
 
1450
            warnings.showwarning = _catcher
 
1451
            warnings.filters = []
 
1452
            result = fn(*args, **kw)
 
1453
        finally:
 
1454
            warnings.showwarning = saved_showwarning
 
1455
            warnings.filters = saved_filters
 
1456
        return wlist, result
 
1457
 
 
1458
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1459
        """Assert that a callable is deprecated in a particular way.
 
1460
 
 
1461
        This is a very precise test for unusual requirements. The
 
1462
        applyDeprecated helper function is probably more suited for most tests
 
1463
        as it allows you to simply specify the deprecation format being used
 
1464
        and will ensure that that is issued for the function being called.
 
1465
 
 
1466
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1467
        not other callers that go direct to the warning module.  To catch
 
1468
        general warnings, use callCatchWarnings.
 
1469
 
 
1470
        :param expected: a list of the deprecation warnings expected, in order
 
1471
        :param callable: The callable to call
 
1472
        :param args: The positional arguments for the callable
 
1473
        :param kwargs: The keyword arguments for the callable
 
1474
        """
 
1475
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1476
            *args, **kwargs)
 
1477
        self.assertEqual(expected, call_warnings)
 
1478
        return result
 
1479
 
 
1480
    def _startLogFile(self):
 
1481
        """Send bzr and test log messages to a temporary file.
 
1482
 
 
1483
        The file is removed as the test is torn down.
 
1484
        """
 
1485
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1486
        self._log_file = os.fdopen(fileno, 'w+')
 
1487
        self._log_memento = bzrlib.trace.push_log_file(self._log_file)
 
1488
        self._log_file_name = name
 
1489
        self.addCleanup(self._finishLogFile)
 
1490
 
 
1491
    def _finishLogFile(self):
 
1492
        """Finished with the log file.
 
1493
 
 
1494
        Close the file and delete it, unless setKeepLogfile was called.
 
1495
        """
 
1496
        if self._log_file is None:
 
1497
            return
 
1498
        bzrlib.trace.pop_log_file(self._log_memento)
 
1499
        self._log_file.close()
 
1500
        self._log_file = None
 
1501
        if not self._keep_log_file:
 
1502
            os.remove(self._log_file_name)
 
1503
            self._log_file_name = None
 
1504
 
 
1505
    def setKeepLogfile(self):
 
1506
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1507
        self._keep_log_file = True
 
1508
 
 
1509
    def thisFailsStrictLockCheck(self):
 
1510
        """It is known that this test would fail with -Dstrict_locks.
 
1511
 
 
1512
        By default, all tests are run with strict lock checking unless
 
1513
        -Edisable_lock_checks is supplied. However there are some tests which
 
1514
        we know fail strict locks at this point that have not been fixed.
 
1515
        They should call this function to disable the strict checking.
 
1516
 
 
1517
        This should be used sparingly, it is much better to fix the locking
 
1518
        issues rather than papering over the problem by calling this function.
 
1519
        """
 
1520
        debug.debug_flags.discard('strict_locks')
 
1521
 
 
1522
    def addCleanup(self, callable, *args, **kwargs):
 
1523
        """Arrange to run a callable when this case is torn down.
 
1524
 
 
1525
        Callables are run in the reverse of the order they are registered,
 
1526
        ie last-in first-out.
 
1527
        """
 
1528
        self._cleanups.append((callable, args, kwargs))
 
1529
 
 
1530
    def _cleanEnvironment(self):
 
1531
        new_env = {
 
1532
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1533
            'HOME': os.getcwd(),
 
1534
            # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
 
1535
            # tests do check our impls match APPDATA
 
1536
            'BZR_EDITOR': None, # test_msgeditor manipulates this variable
 
1537
            'VISUAL': None,
 
1538
            'EDITOR': None,
 
1539
            'BZR_EMAIL': None,
 
1540
            'BZREMAIL': None, # may still be present in the environment
 
1541
            'EMAIL': None,
 
1542
            'BZR_PROGRESS_BAR': None,
 
1543
            'BZR_LOG': None,
 
1544
            'BZR_PLUGIN_PATH': None,
 
1545
            # Make sure that any text ui tests are consistent regardless of
 
1546
            # the environment the test case is run in; you may want tests that
 
1547
            # test other combinations.  'dumb' is a reasonable guess for tests
 
1548
            # going to a pipe or a StringIO.
 
1549
            'TERM': 'dumb',
 
1550
            'LINES': '25',
 
1551
            'COLUMNS': '80',
 
1552
            # SSH Agent
 
1553
            'SSH_AUTH_SOCK': None,
 
1554
            # Proxies
 
1555
            'http_proxy': None,
 
1556
            'HTTP_PROXY': None,
 
1557
            'https_proxy': None,
 
1558
            'HTTPS_PROXY': None,
 
1559
            'no_proxy': None,
 
1560
            'NO_PROXY': None,
 
1561
            'all_proxy': None,
 
1562
            'ALL_PROXY': None,
 
1563
            # Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
 
1564
            # least. If you do (care), please update this comment
 
1565
            # -- vila 20080401
 
1566
            'ftp_proxy': None,
 
1567
            'FTP_PROXY': None,
 
1568
            'BZR_REMOTE_PATH': None,
 
1569
        }
 
1570
        self.__old_env = {}
 
1571
        self.addCleanup(self._restoreEnvironment)
 
1572
        for name, value in new_env.iteritems():
 
1573
            self._captureVar(name, value)
 
1574
 
 
1575
    def _captureVar(self, name, newvalue):
 
1576
        """Set an environment variable, and reset it when finished."""
 
1577
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1578
 
 
1579
    def _restore_debug_flags(self):
 
1580
        debug.debug_flags.clear()
 
1581
        debug.debug_flags.update(self._preserved_debug_flags)
 
1582
 
 
1583
    def _restoreEnvironment(self):
 
1584
        for name, value in self.__old_env.iteritems():
 
1585
            osutils.set_or_unset_env(name, value)
 
1586
 
 
1587
    def _restoreHooks(self):
 
1588
        for klass, (name, hooks) in self._preserved_hooks.items():
 
1589
            setattr(klass, name, hooks)
 
1590
 
 
1591
    def knownFailure(self, reason):
 
1592
        """This test has failed for some known reason."""
 
1593
        raise KnownFailure(reason)
 
1594
 
 
1595
    def _do_skip(self, result, reason):
 
1596
        addSkip = getattr(result, 'addSkip', None)
 
1597
        if not callable(addSkip):
 
1598
            result.addError(self, sys.exc_info())
 
1599
        else:
 
1600
            addSkip(self, reason)
 
1601
 
 
1602
    def run(self, result=None):
 
1603
        if result is None: result = self.defaultTestResult()
 
1604
        for feature in getattr(self, '_test_needs_features', []):
 
1605
            if not feature.available():
 
1606
                result.startTest(self)
 
1607
                if getattr(result, 'addNotSupported', None):
 
1608
                    result.addNotSupported(self, feature)
 
1609
                else:
 
1610
                    result.addSuccess(self)
 
1611
                result.stopTest(self)
 
1612
                return result
 
1613
        try:
 
1614
            try:
 
1615
                result.startTest(self)
 
1616
                absent_attr = object()
 
1617
                # Python 2.5
 
1618
                method_name = getattr(self, '_testMethodName', absent_attr)
 
1619
                if method_name is absent_attr:
 
1620
                    # Python 2.4
 
1621
                    method_name = getattr(self, '_TestCase__testMethodName')
 
1622
                testMethod = getattr(self, method_name)
 
1623
                try:
 
1624
                    try:
 
1625
                        self.setUp()
 
1626
                        if not self._bzr_test_setUp_run:
 
1627
                            self.fail(
 
1628
                                "test setUp did not invoke "
 
1629
                                "bzrlib.tests.TestCase's setUp")
 
1630
                    except KeyboardInterrupt:
 
1631
                        self._runCleanups()
 
1632
                        raise
 
1633
                    except TestSkipped, e:
 
1634
                        self._do_skip(result, e.args[0])
 
1635
                        self.tearDown()
 
1636
                        return result
 
1637
                    except:
 
1638
                        result.addError(self, sys.exc_info())
 
1639
                        self._runCleanups()
 
1640
                        return result
 
1641
 
 
1642
                    ok = False
 
1643
                    try:
 
1644
                        testMethod()
 
1645
                        ok = True
 
1646
                    except self.failureException:
 
1647
                        result.addFailure(self, sys.exc_info())
 
1648
                    except TestSkipped, e:
 
1649
                        if not e.args:
 
1650
                            reason = "No reason given."
 
1651
                        else:
 
1652
                            reason = e.args[0]
 
1653
                        self._do_skip(result, reason)
 
1654
                    except KeyboardInterrupt:
 
1655
                        self._runCleanups()
 
1656
                        raise
 
1657
                    except:
 
1658
                        result.addError(self, sys.exc_info())
 
1659
 
 
1660
                    try:
 
1661
                        self.tearDown()
 
1662
                        if not self._bzr_test_tearDown_run:
 
1663
                            self.fail(
 
1664
                                "test tearDown did not invoke "
 
1665
                                "bzrlib.tests.TestCase's tearDown")
 
1666
                    except KeyboardInterrupt:
 
1667
                        self._runCleanups()
 
1668
                        raise
 
1669
                    except:
 
1670
                        result.addError(self, sys.exc_info())
 
1671
                        self._runCleanups()
 
1672
                        ok = False
 
1673
                    if ok: result.addSuccess(self)
 
1674
                finally:
 
1675
                    result.stopTest(self)
 
1676
                return result
 
1677
            except TestNotApplicable:
 
1678
                # Not moved from the result [yet].
 
1679
                self._runCleanups()
 
1680
                raise
 
1681
            except KeyboardInterrupt:
 
1682
                self._runCleanups()
 
1683
                raise
 
1684
        finally:
 
1685
            saved_attrs = {}
 
1686
            for attr_name in self.attrs_to_keep:
 
1687
                if attr_name in self.__dict__:
 
1688
                    saved_attrs[attr_name] = self.__dict__[attr_name]
 
1689
            self.__dict__ = saved_attrs
 
1690
 
 
1691
    def tearDown(self):
 
1692
        self._runCleanups()
 
1693
        self._log_contents = ''
 
1694
        self._bzr_test_tearDown_run = True
 
1695
        unittest.TestCase.tearDown(self)
 
1696
 
 
1697
    def time(self, callable, *args, **kwargs):
 
1698
        """Run callable and accrue the time it takes to the benchmark time.
 
1699
 
 
1700
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1701
        this will cause lsprofile statistics to be gathered and stored in
 
1702
        self._benchcalls.
 
1703
        """
 
1704
        if self._benchtime is None:
 
1705
            self._benchtime = 0
 
1706
        start = time.time()
 
1707
        try:
 
1708
            if not self._gather_lsprof_in_benchmarks:
 
1709
                return callable(*args, **kwargs)
 
1710
            else:
 
1711
                # record this benchmark
 
1712
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1713
                stats.sort()
 
1714
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1715
                return ret
 
1716
        finally:
 
1717
            self._benchtime += time.time() - start
 
1718
 
 
1719
    def _runCleanups(self):
 
1720
        """Run registered cleanup functions.
 
1721
 
 
1722
        This should only be called from TestCase.tearDown.
 
1723
        """
 
1724
        # TODO: Perhaps this should keep running cleanups even if
 
1725
        # one of them fails?
 
1726
 
 
1727
        # Actually pop the cleanups from the list so tearDown running
 
1728
        # twice is safe (this happens for skipped tests).
 
1729
        while self._cleanups:
 
1730
            cleanup, args, kwargs = self._cleanups.pop()
 
1731
            cleanup(*args, **kwargs)
 
1732
 
 
1733
    def log(self, *args):
 
1734
        mutter(*args)
 
1735
 
 
1736
    def _get_log(self, keep_log_file=False):
 
1737
        """Get the log from bzrlib.trace calls from this test.
 
1738
 
 
1739
        :param keep_log_file: When True, if the log is still a file on disk
 
1740
            leave it as a file on disk. When False, if the log is still a file
 
1741
            on disk, the log file is deleted and the log preserved as
 
1742
            self._log_contents.
 
1743
        :return: A string containing the log.
 
1744
        """
 
1745
        # flush the log file, to get all content
 
1746
        import bzrlib.trace
 
1747
        if bzrlib.trace._trace_file:
 
1748
            bzrlib.trace._trace_file.flush()
 
1749
        if self._log_contents:
 
1750
            # XXX: this can hardly contain the content flushed above --vila
 
1751
            # 20080128
 
1752
            return self._log_contents
 
1753
        if self._log_file_name is not None:
 
1754
            logfile = open(self._log_file_name)
 
1755
            try:
 
1756
                log_contents = logfile.read()
 
1757
            finally:
 
1758
                logfile.close()
 
1759
            if not keep_log_file:
 
1760
                self._log_contents = log_contents
 
1761
                try:
 
1762
                    os.remove(self._log_file_name)
 
1763
                except OSError, e:
 
1764
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1765
                        sys.stderr.write(('Unable to delete log file '
 
1766
                                             ' %r\n' % self._log_file_name))
 
1767
                    else:
 
1768
                        raise
 
1769
            return log_contents
 
1770
        else:
 
1771
            return "DELETED log file to reduce memory footprint"
 
1772
 
 
1773
    def requireFeature(self, feature):
 
1774
        """This test requires a specific feature is available.
 
1775
 
 
1776
        :raises UnavailableFeature: When feature is not available.
 
1777
        """
 
1778
        if not feature.available():
 
1779
            raise UnavailableFeature(feature)
 
1780
 
 
1781
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1782
            working_dir):
 
1783
        """Run bazaar command line, splitting up a string command line."""
 
1784
        if isinstance(args, basestring):
 
1785
            # shlex don't understand unicode strings,
 
1786
            # so args should be plain string (bialix 20070906)
 
1787
            args = list(shlex.split(str(args)))
 
1788
        return self._run_bzr_core(args, retcode=retcode,
 
1789
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1790
                )
 
1791
 
 
1792
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1793
            working_dir):
 
1794
        # Clear chk_map page cache, because the contents are likely to mask
 
1795
        # locking errors.
 
1796
        chk_map.clear_cache()
 
1797
        if encoding is None:
 
1798
            encoding = osutils.get_user_encoding()
 
1799
        stdout = StringIOWrapper()
 
1800
        stderr = StringIOWrapper()
 
1801
        stdout.encoding = encoding
 
1802
        stderr.encoding = encoding
 
1803
 
 
1804
        self.log('run bzr: %r', args)
 
1805
        # FIXME: don't call into logging here
 
1806
        handler = logging.StreamHandler(stderr)
 
1807
        handler.setLevel(logging.INFO)
 
1808
        logger = logging.getLogger('')
 
1809
        logger.addHandler(handler)
 
1810
        old_ui_factory = ui.ui_factory
 
1811
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1812
 
 
1813
        cwd = None
 
1814
        if working_dir is not None:
 
1815
            cwd = osutils.getcwd()
 
1816
            os.chdir(working_dir)
 
1817
 
 
1818
        try:
 
1819
            result = self.apply_redirected(ui.ui_factory.stdin,
 
1820
                stdout, stderr,
 
1821
                bzrlib.commands.run_bzr_catch_user_errors,
 
1822
                args)
 
1823
        finally:
 
1824
            logger.removeHandler(handler)
 
1825
            ui.ui_factory = old_ui_factory
 
1826
            if cwd is not None:
 
1827
                os.chdir(cwd)
 
1828
 
 
1829
        out = stdout.getvalue()
 
1830
        err = stderr.getvalue()
 
1831
        if out:
 
1832
            self.log('output:\n%r', out)
 
1833
        if err:
 
1834
            self.log('errors:\n%r', err)
 
1835
        if retcode is not None:
 
1836
            self.assertEquals(retcode, result,
 
1837
                              message='Unexpected return code')
 
1838
        return result, out, err
 
1839
 
 
1840
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
 
1841
                working_dir=None, error_regexes=[], output_encoding=None):
 
1842
        """Invoke bzr, as if it were run from the command line.
 
1843
 
 
1844
        The argument list should not include the bzr program name - the
 
1845
        first argument is normally the bzr command.  Arguments may be
 
1846
        passed in three ways:
 
1847
 
 
1848
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1849
        when the command contains whitespace or metacharacters, or
 
1850
        is built up at run time.
 
1851
 
 
1852
        2- A single string, eg "add a".  This is the most convenient
 
1853
        for hardcoded commands.
 
1854
 
 
1855
        This runs bzr through the interface that catches and reports
 
1856
        errors, and with logging set to something approximating the
 
1857
        default, so that error reporting can be checked.
 
1858
 
 
1859
        This should be the main method for tests that want to exercise the
 
1860
        overall behavior of the bzr application (rather than a unit test
 
1861
        or a functional test of the library.)
 
1862
 
 
1863
        This sends the stdout/stderr results into the test's log,
 
1864
        where it may be useful for debugging.  See also run_captured.
 
1865
 
 
1866
        :keyword stdin: A string to be used as stdin for the command.
 
1867
        :keyword retcode: The status code the command should return;
 
1868
            default 0.
 
1869
        :keyword working_dir: The directory to run the command in
 
1870
        :keyword error_regexes: A list of expected error messages.  If
 
1871
            specified they must be seen in the error output of the command.
 
1872
        """
 
1873
        retcode, out, err = self._run_bzr_autosplit(
 
1874
            args=args,
 
1875
            retcode=retcode,
 
1876
            encoding=encoding,
 
1877
            stdin=stdin,
 
1878
            working_dir=working_dir,
 
1879
            )
 
1880
        self.assertIsInstance(error_regexes, (list, tuple))
 
1881
        for regex in error_regexes:
 
1882
            self.assertContainsRe(err, regex)
 
1883
        return out, err
 
1884
 
 
1885
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1886
        """Run bzr, and check that stderr contains the supplied regexes
 
1887
 
 
1888
        :param error_regexes: Sequence of regular expressions which
 
1889
            must each be found in the error output. The relative ordering
 
1890
            is not enforced.
 
1891
        :param args: command-line arguments for bzr
 
1892
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1893
            This function changes the default value of retcode to be 3,
 
1894
            since in most cases this is run when you expect bzr to fail.
 
1895
 
 
1896
        :return: (out, err) The actual output of running the command (in case
 
1897
            you want to do more inspection)
 
1898
 
 
1899
        Examples of use::
 
1900
 
 
1901
            # Make sure that commit is failing because there is nothing to do
 
1902
            self.run_bzr_error(['no changes to commit'],
 
1903
                               ['commit', '-m', 'my commit comment'])
 
1904
            # Make sure --strict is handling an unknown file, rather than
 
1905
            # giving us the 'nothing to do' error
 
1906
            self.build_tree(['unknown'])
 
1907
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1908
                               ['commit', --strict', '-m', 'my commit comment'])
 
1909
        """
 
1910
        kwargs.setdefault('retcode', 3)
 
1911
        kwargs['error_regexes'] = error_regexes
 
1912
        out, err = self.run_bzr(*args, **kwargs)
 
1913
        return out, err
 
1914
 
 
1915
    def run_bzr_subprocess(self, *args, **kwargs):
 
1916
        """Run bzr in a subprocess for testing.
 
1917
 
 
1918
        This starts a new Python interpreter and runs bzr in there.
 
1919
        This should only be used for tests that have a justifiable need for
 
1920
        this isolation: e.g. they are testing startup time, or signal
 
1921
        handling, or early startup code, etc.  Subprocess code can't be
 
1922
        profiled or debugged so easily.
 
1923
 
 
1924
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
1925
            None is supplied, the status code is not checked.
 
1926
        :keyword env_changes: A dictionary which lists changes to environment
 
1927
            variables. A value of None will unset the env variable.
 
1928
            The values must be strings. The change will only occur in the
 
1929
            child, so you don't need to fix the environment after running.
 
1930
        :keyword universal_newlines: Convert CRLF => LF
 
1931
        :keyword allow_plugins: By default the subprocess is run with
 
1932
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1933
            for system-wide plugins to create unexpected output on stderr,
 
1934
            which can cause unnecessary test failures.
 
1935
        """
 
1936
        env_changes = kwargs.get('env_changes', {})
 
1937
        working_dir = kwargs.get('working_dir', None)
 
1938
        allow_plugins = kwargs.get('allow_plugins', False)
 
1939
        if len(args) == 1:
 
1940
            if isinstance(args[0], list):
 
1941
                args = args[0]
 
1942
            elif isinstance(args[0], basestring):
 
1943
                args = list(shlex.split(args[0]))
 
1944
        else:
 
1945
            raise ValueError("passing varargs to run_bzr_subprocess")
 
1946
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1947
                                            working_dir=working_dir,
 
1948
                                            allow_plugins=allow_plugins)
 
1949
        # We distinguish between retcode=None and retcode not passed.
 
1950
        supplied_retcode = kwargs.get('retcode', 0)
 
1951
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1952
            universal_newlines=kwargs.get('universal_newlines', False),
 
1953
            process_args=args)
 
1954
 
 
1955
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1956
                             skip_if_plan_to_signal=False,
 
1957
                             working_dir=None,
 
1958
                             allow_plugins=False):
 
1959
        """Start bzr in a subprocess for testing.
 
1960
 
 
1961
        This starts a new Python interpreter and runs bzr in there.
 
1962
        This should only be used for tests that have a justifiable need for
 
1963
        this isolation: e.g. they are testing startup time, or signal
 
1964
        handling, or early startup code, etc.  Subprocess code can't be
 
1965
        profiled or debugged so easily.
 
1966
 
 
1967
        :param process_args: a list of arguments to pass to the bzr executable,
 
1968
            for example ``['--version']``.
 
1969
        :param env_changes: A dictionary which lists changes to environment
 
1970
            variables. A value of None will unset the env variable.
 
1971
            The values must be strings. The change will only occur in the
 
1972
            child, so you don't need to fix the environment after running.
 
1973
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1974
            is not available.
 
1975
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1976
 
 
1977
        :returns: Popen object for the started process.
 
1978
        """
 
1979
        if skip_if_plan_to_signal:
 
1980
            if not getattr(os, 'kill', None):
 
1981
                raise TestSkipped("os.kill not available.")
 
1982
 
 
1983
        if env_changes is None:
 
1984
            env_changes = {}
 
1985
        old_env = {}
 
1986
 
 
1987
        def cleanup_environment():
 
1988
            for env_var, value in env_changes.iteritems():
 
1989
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1990
 
 
1991
        def restore_environment():
 
1992
            for env_var, value in old_env.iteritems():
 
1993
                osutils.set_or_unset_env(env_var, value)
 
1994
 
 
1995
        bzr_path = self.get_bzr_path()
 
1996
 
 
1997
        cwd = None
 
1998
        if working_dir is not None:
 
1999
            cwd = osutils.getcwd()
 
2000
            os.chdir(working_dir)
 
2001
 
 
2002
        try:
 
2003
            # win32 subprocess doesn't support preexec_fn
 
2004
            # so we will avoid using it on all platforms, just to
 
2005
            # make sure the code path is used, and we don't break on win32
 
2006
            cleanup_environment()
 
2007
            command = [sys.executable]
 
2008
            # frozen executables don't need the path to bzr
 
2009
            if getattr(sys, "frozen", None) is None:
 
2010
                command.append(bzr_path)
 
2011
            if not allow_plugins:
 
2012
                command.append('--no-plugins')
 
2013
            command.extend(process_args)
 
2014
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
2015
        finally:
 
2016
            restore_environment()
 
2017
            if cwd is not None:
 
2018
                os.chdir(cwd)
 
2019
 
 
2020
        return process
 
2021
 
 
2022
    def _popen(self, *args, **kwargs):
 
2023
        """Place a call to Popen.
 
2024
 
 
2025
        Allows tests to override this method to intercept the calls made to
 
2026
        Popen for introspection.
 
2027
        """
 
2028
        return Popen(*args, **kwargs)
 
2029
 
 
2030
    def get_source_path(self):
 
2031
        """Return the path of the directory containing bzrlib."""
 
2032
        return os.path.dirname(os.path.dirname(bzrlib.__file__))
 
2033
 
 
2034
    def get_bzr_path(self):
 
2035
        """Return the path of the 'bzr' executable for this test suite."""
 
2036
        bzr_path = self.get_source_path()+'/bzr'
 
2037
        if not os.path.isfile(bzr_path):
 
2038
            # We are probably installed. Assume sys.argv is the right file
 
2039
            bzr_path = sys.argv[0]
 
2040
        return bzr_path
 
2041
 
 
2042
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
2043
                              universal_newlines=False, process_args=None):
 
2044
        """Finish the execution of process.
 
2045
 
 
2046
        :param process: the Popen object returned from start_bzr_subprocess.
 
2047
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
2048
            None is supplied, the status code is not checked.
 
2049
        :param send_signal: an optional signal to send to the process.
 
2050
        :param universal_newlines: Convert CRLF => LF
 
2051
        :returns: (stdout, stderr)
 
2052
        """
 
2053
        if send_signal is not None:
 
2054
            os.kill(process.pid, send_signal)
 
2055
        out, err = process.communicate()
 
2056
 
 
2057
        if universal_newlines:
 
2058
            out = out.replace('\r\n', '\n')
 
2059
            err = err.replace('\r\n', '\n')
 
2060
 
 
2061
        if retcode is not None and retcode != process.returncode:
 
2062
            if process_args is None:
 
2063
                process_args = "(unknown args)"
 
2064
            mutter('Output of bzr %s:\n%s', process_args, out)
 
2065
            mutter('Error for bzr %s:\n%s', process_args, err)
 
2066
            self.fail('Command bzr %s failed with retcode %s != %s'
 
2067
                      % (process_args, retcode, process.returncode))
 
2068
        return [out, err]
 
2069
 
 
2070
    def check_inventory_shape(self, inv, shape):
 
2071
        """Compare an inventory to a list of expected names.
 
2072
 
 
2073
        Fail if they are not precisely equal.
 
2074
        """
 
2075
        extras = []
 
2076
        shape = list(shape)             # copy
 
2077
        for path, ie in inv.entries():
 
2078
            name = path.replace('\\', '/')
 
2079
            if ie.kind == 'directory':
 
2080
                name = name + '/'
 
2081
            if name in shape:
 
2082
                shape.remove(name)
 
2083
            else:
 
2084
                extras.append(name)
 
2085
        if shape:
 
2086
            self.fail("expected paths not found in inventory: %r" % shape)
 
2087
        if extras:
 
2088
            self.fail("unexpected paths found in inventory: %r" % extras)
 
2089
 
 
2090
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
2091
                         a_callable=None, *args, **kwargs):
 
2092
        """Call callable with redirected std io pipes.
 
2093
 
 
2094
        Returns the return code."""
 
2095
        if not callable(a_callable):
 
2096
            raise ValueError("a_callable must be callable.")
 
2097
        if stdin is None:
 
2098
            stdin = StringIO("")
 
2099
        if stdout is None:
 
2100
            if getattr(self, "_log_file", None) is not None:
 
2101
                stdout = self._log_file
 
2102
            else:
 
2103
                stdout = StringIO()
 
2104
        if stderr is None:
 
2105
            if getattr(self, "_log_file", None is not None):
 
2106
                stderr = self._log_file
 
2107
            else:
 
2108
                stderr = StringIO()
 
2109
        real_stdin = sys.stdin
 
2110
        real_stdout = sys.stdout
 
2111
        real_stderr = sys.stderr
 
2112
        try:
 
2113
            sys.stdout = stdout
 
2114
            sys.stderr = stderr
 
2115
            sys.stdin = stdin
 
2116
            return a_callable(*args, **kwargs)
 
2117
        finally:
 
2118
            sys.stdout = real_stdout
 
2119
            sys.stderr = real_stderr
 
2120
            sys.stdin = real_stdin
 
2121
 
 
2122
    def reduceLockdirTimeout(self):
 
2123
        """Reduce the default lock timeout for the duration of the test, so that
 
2124
        if LockContention occurs during a test, it does so quickly.
 
2125
 
 
2126
        Tests that expect to provoke LockContention errors should call this.
 
2127
        """
 
2128
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
2129
        def resetTimeout():
 
2130
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
2131
        self.addCleanup(resetTimeout)
 
2132
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
2133
 
 
2134
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
2135
        """Return a StringIOWrapper instance, that will encode Unicode
 
2136
        input to UTF-8.
 
2137
        """
 
2138
        if encoding_type is None:
 
2139
            encoding_type = 'strict'
 
2140
        sio = StringIO()
 
2141
        output_encoding = 'utf-8'
 
2142
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
 
2143
        sio.encoding = output_encoding
 
2144
        return sio
 
2145
 
 
2146
    def disable_verb(self, verb):
 
2147
        """Disable a smart server verb for one test."""
 
2148
        from bzrlib.smart import request
 
2149
        request_handlers = request.request_handlers
 
2150
        orig_method = request_handlers.get(verb)
 
2151
        request_handlers.remove(verb)
 
2152
        def restoreVerb():
 
2153
            request_handlers.register(verb, orig_method)
 
2154
        self.addCleanup(restoreVerb)
 
2155
 
 
2156
 
 
2157
class CapturedCall(object):
 
2158
    """A helper for capturing smart server calls for easy debug analysis."""
 
2159
 
 
2160
    def __init__(self, params, prefix_length):
 
2161
        """Capture the call with params and skip prefix_length stack frames."""
 
2162
        self.call = params
 
2163
        import traceback
 
2164
        # The last 5 frames are the __init__, the hook frame, and 3 smart
 
2165
        # client frames. Beyond this we could get more clever, but this is good
 
2166
        # enough for now.
 
2167
        stack = traceback.extract_stack()[prefix_length:-5]
 
2168
        self.stack = ''.join(traceback.format_list(stack))
 
2169
 
 
2170
    def __str__(self):
 
2171
        return self.call.method
 
2172
 
 
2173
    def __repr__(self):
 
2174
        return self.call.method
 
2175
 
 
2176
    def stack(self):
 
2177
        return self.stack
 
2178
 
 
2179
 
 
2180
class TestCaseWithMemoryTransport(TestCase):
 
2181
    """Common test class for tests that do not need disk resources.
 
2182
 
 
2183
    Tests that need disk resources should derive from TestCaseWithTransport.
 
2184
 
 
2185
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
2186
 
 
2187
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
2188
    a directory which does not exist. This serves to help ensure test isolation
 
2189
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
2190
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
2191
    file defaults for the transport in tests, nor does it obey the command line
 
2192
    override, so tests that accidentally write to the common directory should
 
2193
    be rare.
 
2194
 
 
2195
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
2196
    a .bzr directory that stops us ascending higher into the filesystem.
 
2197
    """
 
2198
 
 
2199
    TEST_ROOT = None
 
2200
    _TEST_NAME = 'test'
 
2201
 
 
2202
    def __init__(self, methodName='runTest'):
 
2203
        # allow test parameterization after test construction and before test
 
2204
        # execution. Variables that the parameterizer sets need to be
 
2205
        # ones that are not set by setUp, or setUp will trash them.
 
2206
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
2207
        self.vfs_transport_factory = default_transport
 
2208
        self.transport_server = None
 
2209
        self.transport_readonly_server = None
 
2210
        self.__vfs_server = None
 
2211
 
 
2212
    def get_transport(self, relpath=None):
 
2213
        """Return a writeable transport.
 
2214
 
 
2215
        This transport is for the test scratch space relative to
 
2216
        "self._test_root"
 
2217
 
 
2218
        :param relpath: a path relative to the base url.
 
2219
        """
 
2220
        t = get_transport(self.get_url(relpath))
 
2221
        self.assertFalse(t.is_readonly())
 
2222
        return t
 
2223
 
 
2224
    def get_readonly_transport(self, relpath=None):
 
2225
        """Return a readonly transport for the test scratch space
 
2226
 
 
2227
        This can be used to test that operations which should only need
 
2228
        readonly access in fact do not try to write.
 
2229
 
 
2230
        :param relpath: a path relative to the base url.
 
2231
        """
 
2232
        t = get_transport(self.get_readonly_url(relpath))
 
2233
        self.assertTrue(t.is_readonly())
 
2234
        return t
 
2235
 
 
2236
    def create_transport_readonly_server(self):
 
2237
        """Create a transport server from class defined at init.
 
2238
 
 
2239
        This is mostly a hook for daughter classes.
 
2240
        """
 
2241
        return self.transport_readonly_server()
 
2242
 
 
2243
    def get_readonly_server(self):
 
2244
        """Get the server instance for the readonly transport
 
2245
 
 
2246
        This is useful for some tests with specific servers to do diagnostics.
 
2247
        """
 
2248
        if self.__readonly_server is None:
 
2249
            if self.transport_readonly_server is None:
 
2250
                # readonly decorator requested
 
2251
                self.__readonly_server = ReadonlyServer()
 
2252
            else:
 
2253
                # explicit readonly transport.
 
2254
                self.__readonly_server = self.create_transport_readonly_server()
 
2255
            self.start_server(self.__readonly_server,
 
2256
                self.get_vfs_only_server())
 
2257
        return self.__readonly_server
 
2258
 
 
2259
    def get_readonly_url(self, relpath=None):
 
2260
        """Get a URL for the readonly transport.
 
2261
 
 
2262
        This will either be backed by '.' or a decorator to the transport
 
2263
        used by self.get_url()
 
2264
        relpath provides for clients to get a path relative to the base url.
 
2265
        These should only be downwards relative, not upwards.
 
2266
        """
 
2267
        base = self.get_readonly_server().get_url()
 
2268
        return self._adjust_url(base, relpath)
 
2269
 
 
2270
    def get_vfs_only_server(self):
 
2271
        """Get the vfs only read/write server instance.
 
2272
 
 
2273
        This is useful for some tests with specific servers that need
 
2274
        diagnostics.
 
2275
 
 
2276
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
2277
        is no means to override it.
 
2278
        """
 
2279
        if self.__vfs_server is None:
 
2280
            self.__vfs_server = MemoryServer()
 
2281
            self.start_server(self.__vfs_server)
 
2282
        return self.__vfs_server
 
2283
 
 
2284
    def get_server(self):
 
2285
        """Get the read/write server instance.
 
2286
 
 
2287
        This is useful for some tests with specific servers that need
 
2288
        diagnostics.
 
2289
 
 
2290
        This is built from the self.transport_server factory. If that is None,
 
2291
        then the self.get_vfs_server is returned.
 
2292
        """
 
2293
        if self.__server is None:
 
2294
            if (self.transport_server is None or self.transport_server is
 
2295
                self.vfs_transport_factory):
 
2296
                self.__server = self.get_vfs_only_server()
 
2297
            else:
 
2298
                # bring up a decorated means of access to the vfs only server.
 
2299
                self.__server = self.transport_server()
 
2300
                self.start_server(self.__server, self.get_vfs_only_server())
 
2301
        return self.__server
 
2302
 
 
2303
    def _adjust_url(self, base, relpath):
 
2304
        """Get a URL (or maybe a path) for the readwrite transport.
 
2305
 
 
2306
        This will either be backed by '.' or to an equivalent non-file based
 
2307
        facility.
 
2308
        relpath provides for clients to get a path relative to the base url.
 
2309
        These should only be downwards relative, not upwards.
 
2310
        """
 
2311
        if relpath is not None and relpath != '.':
 
2312
            if not base.endswith('/'):
 
2313
                base = base + '/'
 
2314
            # XXX: Really base should be a url; we did after all call
 
2315
            # get_url()!  But sometimes it's just a path (from
 
2316
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
2317
            # to a non-escaped local path.
 
2318
            if base.startswith('./') or base.startswith('/'):
 
2319
                base += relpath
 
2320
            else:
 
2321
                base += urlutils.escape(relpath)
 
2322
        return base
 
2323
 
 
2324
    def get_url(self, relpath=None):
 
2325
        """Get a URL (or maybe a path) for the readwrite transport.
 
2326
 
 
2327
        This will either be backed by '.' or to an equivalent non-file based
 
2328
        facility.
 
2329
        relpath provides for clients to get a path relative to the base url.
 
2330
        These should only be downwards relative, not upwards.
 
2331
        """
 
2332
        base = self.get_server().get_url()
 
2333
        return self._adjust_url(base, relpath)
 
2334
 
 
2335
    def get_vfs_only_url(self, relpath=None):
 
2336
        """Get a URL (or maybe a path for the plain old vfs transport.
 
2337
 
 
2338
        This will never be a smart protocol.  It always has all the
 
2339
        capabilities of the local filesystem, but it might actually be a
 
2340
        MemoryTransport or some other similar virtual filesystem.
 
2341
 
 
2342
        This is the backing transport (if any) of the server returned by
 
2343
        get_url and get_readonly_url.
 
2344
 
 
2345
        :param relpath: provides for clients to get a path relative to the base
 
2346
            url.  These should only be downwards relative, not upwards.
 
2347
        :return: A URL
 
2348
        """
 
2349
        base = self.get_vfs_only_server().get_url()
 
2350
        return self._adjust_url(base, relpath)
 
2351
 
 
2352
    def _create_safety_net(self):
 
2353
        """Make a fake bzr directory.
 
2354
 
 
2355
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
2356
        real branch.
 
2357
        """
 
2358
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2359
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
2360
 
 
2361
    def _check_safety_net(self):
 
2362
        """Check that the safety .bzr directory have not been touched.
 
2363
 
 
2364
        _make_test_root have created a .bzr directory to prevent tests from
 
2365
        propagating. This method ensures than a test did not leaked.
 
2366
        """
 
2367
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2368
        self.permit_url(get_transport(root).base)
 
2369
        wt = workingtree.WorkingTree.open(root)
 
2370
        last_rev = wt.last_revision()
 
2371
        if last_rev != 'null:':
 
2372
            # The current test have modified the /bzr directory, we need to
 
2373
            # recreate a new one or all the followng tests will fail.
 
2374
            # If you need to inspect its content uncomment the following line
 
2375
            # import pdb; pdb.set_trace()
 
2376
            _rmtree_temp_dir(root + '/.bzr')
 
2377
            self._create_safety_net()
 
2378
            raise AssertionError('%s/.bzr should not be modified' % root)
 
2379
 
 
2380
    def _make_test_root(self):
 
2381
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
2382
            # Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
 
2383
            root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
 
2384
                                                    suffix='.tmp'))
 
2385
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
2386
 
 
2387
            self._create_safety_net()
 
2388
 
 
2389
            # The same directory is used by all tests, and we're not
 
2390
            # specifically told when all tests are finished.  This will do.
 
2391
            atexit.register(_rmtree_temp_dir, root)
 
2392
 
 
2393
        self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2394
        self.addCleanup(self._check_safety_net)
 
2395
 
 
2396
    def makeAndChdirToTestDir(self):
 
2397
        """Create a temporary directories for this one test.
 
2398
 
 
2399
        This must set self.test_home_dir and self.test_dir and chdir to
 
2400
        self.test_dir.
 
2401
 
 
2402
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
2403
        """
 
2404
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2405
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
2406
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
2407
        self.permit_dir(self.test_dir)
 
2408
 
 
2409
    def make_branch(self, relpath, format=None):
 
2410
        """Create a branch on the transport at relpath."""
 
2411
        repo = self.make_repository(relpath, format=format)
 
2412
        return repo.bzrdir.create_branch()
 
2413
 
 
2414
    def make_bzrdir(self, relpath, format=None):
 
2415
        try:
 
2416
            # might be a relative or absolute path
 
2417
            maybe_a_url = self.get_url(relpath)
 
2418
            segments = maybe_a_url.rsplit('/', 1)
 
2419
            t = get_transport(maybe_a_url)
 
2420
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
2421
                t.ensure_base()
 
2422
            if format is None:
 
2423
                format = 'default'
 
2424
            if isinstance(format, basestring):
 
2425
                format = bzrdir.format_registry.make_bzrdir(format)
 
2426
            return format.initialize_on_transport(t)
 
2427
        except errors.UninitializableFormat:
 
2428
            raise TestSkipped("Format %s is not initializable." % format)
 
2429
 
 
2430
    def make_repository(self, relpath, shared=False, format=None):
 
2431
        """Create a repository on our default transport at relpath.
 
2432
 
 
2433
        Note that relpath must be a relative path, not a full url.
 
2434
        """
 
2435
        # FIXME: If you create a remoterepository this returns the underlying
 
2436
        # real format, which is incorrect.  Actually we should make sure that
 
2437
        # RemoteBzrDir returns a RemoteRepository.
 
2438
        # maybe  mbp 20070410
 
2439
        made_control = self.make_bzrdir(relpath, format=format)
 
2440
        return made_control.create_repository(shared=shared)
 
2441
 
 
2442
    def make_smart_server(self, path):
 
2443
        smart_server = server.SmartTCPServer_for_testing()
 
2444
        self.start_server(smart_server, self.get_server())
 
2445
        remote_transport = get_transport(smart_server.get_url()).clone(path)
 
2446
        return remote_transport
 
2447
 
 
2448
    def make_branch_and_memory_tree(self, relpath, format=None):
 
2449
        """Create a branch on the default transport and a MemoryTree for it."""
 
2450
        b = self.make_branch(relpath, format=format)
 
2451
        return memorytree.MemoryTree.create_on_branch(b)
 
2452
 
 
2453
    def make_branch_builder(self, relpath, format=None):
 
2454
        branch = self.make_branch(relpath, format=format)
 
2455
        return branchbuilder.BranchBuilder(branch=branch)
 
2456
 
 
2457
    def overrideEnvironmentForTesting(self):
 
2458
        os.environ['HOME'] = self.test_home_dir
 
2459
        os.environ['BZR_HOME'] = self.test_home_dir
 
2460
 
 
2461
    def setUp(self):
 
2462
        super(TestCaseWithMemoryTransport, self).setUp()
 
2463
        self._make_test_root()
 
2464
        _currentdir = os.getcwdu()
 
2465
        def _leaveDirectory():
 
2466
            os.chdir(_currentdir)
 
2467
        self.addCleanup(_leaveDirectory)
 
2468
        self.makeAndChdirToTestDir()
 
2469
        self.overrideEnvironmentForTesting()
 
2470
        self.__readonly_server = None
 
2471
        self.__server = None
 
2472
        self.reduceLockdirTimeout()
 
2473
 
 
2474
    def setup_smart_server_with_call_log(self):
 
2475
        """Sets up a smart server as the transport server with a call log."""
 
2476
        self.transport_server = server.SmartTCPServer_for_testing
 
2477
        self.hpss_calls = []
 
2478
        import traceback
 
2479
        # Skip the current stack down to the caller of
 
2480
        # setup_smart_server_with_call_log
 
2481
        prefix_length = len(traceback.extract_stack()) - 2
 
2482
        def capture_hpss_call(params):
 
2483
            self.hpss_calls.append(
 
2484
                CapturedCall(params, prefix_length))
 
2485
        client._SmartClient.hooks.install_named_hook(
 
2486
            'call', capture_hpss_call, None)
 
2487
 
 
2488
    def reset_smart_call_log(self):
 
2489
        self.hpss_calls = []
 
2490
 
 
2491
 
 
2492
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
2493
    """Derived class that runs a test within a temporary directory.
 
2494
 
 
2495
    This is useful for tests that need to create a branch, etc.
 
2496
 
 
2497
    The directory is created in a slightly complex way: for each
 
2498
    Python invocation, a new temporary top-level directory is created.
 
2499
    All test cases create their own directory within that.  If the
 
2500
    tests complete successfully, the directory is removed.
 
2501
 
 
2502
    :ivar test_base_dir: The path of the top-level directory for this
 
2503
    test, which contains a home directory and a work directory.
 
2504
 
 
2505
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
2506
    which is used as $HOME for this test.
 
2507
 
 
2508
    :ivar test_dir: A directory under test_base_dir used as the current
 
2509
    directory when the test proper is run.
 
2510
    """
 
2511
 
 
2512
    OVERRIDE_PYTHON = 'python'
 
2513
 
 
2514
    def check_file_contents(self, filename, expect):
 
2515
        self.log("check contents of file %s" % filename)
 
2516
        contents = file(filename, 'r').read()
 
2517
        if contents != expect:
 
2518
            self.log("expected: %r" % expect)
 
2519
            self.log("actually: %r" % contents)
 
2520
            self.fail("contents of %s not as expected" % filename)
 
2521
 
 
2522
    def _getTestDirPrefix(self):
 
2523
        # create a directory within the top level test directory
 
2524
        if sys.platform in ('win32', 'cygwin'):
 
2525
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
 
2526
            # windows is likely to have path-length limits so use a short name
 
2527
            name_prefix = name_prefix[-30:]
 
2528
        else:
 
2529
            name_prefix = re.sub('[/]', '_', self.id())
 
2530
        return name_prefix
 
2531
 
 
2532
    def makeAndChdirToTestDir(self):
 
2533
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2534
 
 
2535
        For TestCaseInTempDir we create a temporary directory based on the test
 
2536
        name and then create two subdirs - test and home under it.
 
2537
        """
 
2538
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
 
2539
            self._getTestDirPrefix())
 
2540
        name = name_prefix
 
2541
        for i in range(100):
 
2542
            if os.path.exists(name):
 
2543
                name = name_prefix + '_' + str(i)
 
2544
            else:
 
2545
                # now create test and home directories within this dir
 
2546
                self.test_base_dir = name
 
2547
                self.addCleanup(self.deleteTestDir)
 
2548
                os.mkdir(self.test_base_dir)
 
2549
                break
 
2550
        self.permit_dir(self.test_base_dir)
 
2551
        # 'sprouting' and 'init' of a branch both walk up the tree to find
 
2552
        # stacking policy to honour; create a bzr dir with an unshared
 
2553
        # repository (but not a branch - our code would be trying to escape
 
2554
        # then!) to stop them, and permit it to be read.
 
2555
        # control = bzrdir.BzrDir.create(self.test_base_dir)
 
2556
        # control.create_repository()
 
2557
        self.test_home_dir = self.test_base_dir + '/home'
 
2558
        os.mkdir(self.test_home_dir)
 
2559
        self.test_dir = self.test_base_dir + '/work'
 
2560
        os.mkdir(self.test_dir)
 
2561
        os.chdir(self.test_dir)
 
2562
        # put name of test inside
 
2563
        f = file(self.test_base_dir + '/name', 'w')
 
2564
        try:
 
2565
            f.write(self.id())
 
2566
        finally:
 
2567
            f.close()
 
2568
 
 
2569
    def deleteTestDir(self):
 
2570
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2571
        _rmtree_temp_dir(self.test_base_dir)
 
2572
 
 
2573
    def build_tree(self, shape, line_endings='binary', transport=None):
 
2574
        """Build a test tree according to a pattern.
 
2575
 
 
2576
        shape is a sequence of file specifications.  If the final
 
2577
        character is '/', a directory is created.
 
2578
 
 
2579
        This assumes that all the elements in the tree being built are new.
 
2580
 
 
2581
        This doesn't add anything to a branch.
 
2582
 
 
2583
        :type shape:    list or tuple.
 
2584
        :param line_endings: Either 'binary' or 'native'
 
2585
            in binary mode, exact contents are written in native mode, the
 
2586
            line endings match the default platform endings.
 
2587
        :param transport: A transport to write to, for building trees on VFS's.
 
2588
            If the transport is readonly or None, "." is opened automatically.
 
2589
        :return: None
 
2590
        """
 
2591
        if type(shape) not in (list, tuple):
 
2592
            raise AssertionError("Parameter 'shape' should be "
 
2593
                "a list or a tuple. Got %r instead" % (shape,))
 
2594
        # It's OK to just create them using forward slashes on windows.
 
2595
        if transport is None or transport.is_readonly():
 
2596
            transport = get_transport(".")
 
2597
        for name in shape:
 
2598
            self.assertIsInstance(name, basestring)
 
2599
            if name[-1] == '/':
 
2600
                transport.mkdir(urlutils.escape(name[:-1]))
 
2601
            else:
 
2602
                if line_endings == 'binary':
 
2603
                    end = '\n'
 
2604
                elif line_endings == 'native':
 
2605
                    end = os.linesep
 
2606
                else:
 
2607
                    raise errors.BzrError(
 
2608
                        'Invalid line ending request %r' % line_endings)
 
2609
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
2610
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2611
 
 
2612
    def build_tree_contents(self, shape):
 
2613
        build_tree_contents(shape)
 
2614
 
 
2615
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2616
        """Assert whether path or paths are in the WorkingTree"""
 
2617
        if tree is None:
 
2618
            tree = workingtree.WorkingTree.open(root_path)
 
2619
        if not isinstance(path, basestring):
 
2620
            for p in path:
 
2621
                self.assertInWorkingTree(p, tree=tree)
 
2622
        else:
 
2623
            self.assertIsNot(tree.path2id(path), None,
 
2624
                path+' not in working tree.')
 
2625
 
 
2626
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2627
        """Assert whether path or paths are not in the WorkingTree"""
 
2628
        if tree is None:
 
2629
            tree = workingtree.WorkingTree.open(root_path)
 
2630
        if not isinstance(path, basestring):
 
2631
            for p in path:
 
2632
                self.assertNotInWorkingTree(p,tree=tree)
 
2633
        else:
 
2634
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2635
 
 
2636
 
 
2637
class TestCaseWithTransport(TestCaseInTempDir):
 
2638
    """A test case that provides get_url and get_readonly_url facilities.
 
2639
 
 
2640
    These back onto two transport servers, one for readonly access and one for
 
2641
    read write access.
 
2642
 
 
2643
    If no explicit class is provided for readonly access, a
 
2644
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2645
    based read write transports.
 
2646
 
 
2647
    If an explicit class is provided for readonly access, that server and the
 
2648
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2649
    """
 
2650
 
 
2651
    def get_vfs_only_server(self):
 
2652
        """See TestCaseWithMemoryTransport.
 
2653
 
 
2654
        This is useful for some tests with specific servers that need
 
2655
        diagnostics.
 
2656
        """
 
2657
        if self.__vfs_server is None:
 
2658
            self.__vfs_server = self.vfs_transport_factory()
 
2659
            self.start_server(self.__vfs_server)
 
2660
        return self.__vfs_server
 
2661
 
 
2662
    def make_branch_and_tree(self, relpath, format=None):
 
2663
        """Create a branch on the transport and a tree locally.
 
2664
 
 
2665
        If the transport is not a LocalTransport, the Tree can't be created on
 
2666
        the transport.  In that case if the vfs_transport_factory is
 
2667
        LocalURLServer the working tree is created in the local
 
2668
        directory backing the transport, and the returned tree's branch and
 
2669
        repository will also be accessed locally. Otherwise a lightweight
 
2670
        checkout is created and returned.
 
2671
 
 
2672
        We do this because we can't physically create a tree in the local
 
2673
        path, with a branch reference to the transport_factory url, and
 
2674
        a branch + repository in the vfs_transport, unless the vfs_transport
 
2675
        namespace is distinct from the local disk - the two branch objects
 
2676
        would collide. While we could construct a tree with its branch object
 
2677
        pointing at the transport_factory transport in memory, reopening it
 
2678
        would behaving unexpectedly, and has in the past caused testing bugs
 
2679
        when we tried to do it that way.
 
2680
 
 
2681
        :param format: The BzrDirFormat.
 
2682
        :returns: the WorkingTree.
 
2683
        """
 
2684
        # TODO: always use the local disk path for the working tree,
 
2685
        # this obviously requires a format that supports branch references
 
2686
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2687
        # RBC 20060208
 
2688
        b = self.make_branch(relpath, format=format)
 
2689
        try:
 
2690
            return b.bzrdir.create_workingtree()
 
2691
        except errors.NotLocalUrl:
 
2692
            # We can only make working trees locally at the moment.  If the
 
2693
            # transport can't support them, then we keep the non-disk-backed
 
2694
            # branch and create a local checkout.
 
2695
            if self.vfs_transport_factory is LocalURLServer:
 
2696
                # the branch is colocated on disk, we cannot create a checkout.
 
2697
                # hopefully callers will expect this.
 
2698
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2699
                wt = local_controldir.create_workingtree()
 
2700
                if wt.branch._format != b._format:
 
2701
                    wt._branch = b
 
2702
                    # Make sure that assigning to wt._branch fixes wt.branch,
 
2703
                    # in case the implementation details of workingtree objects
 
2704
                    # change.
 
2705
                    self.assertIs(b, wt.branch)
 
2706
                return wt
 
2707
            else:
 
2708
                return b.create_checkout(relpath, lightweight=True)
 
2709
 
 
2710
    def assertIsDirectory(self, relpath, transport):
 
2711
        """Assert that relpath within transport is a directory.
 
2712
 
 
2713
        This may not be possible on all transports; in that case it propagates
 
2714
        a TransportNotPossible.
 
2715
        """
 
2716
        try:
 
2717
            mode = transport.stat(relpath).st_mode
 
2718
        except errors.NoSuchFile:
 
2719
            self.fail("path %s is not a directory; no such file"
 
2720
                      % (relpath))
 
2721
        if not stat.S_ISDIR(mode):
 
2722
            self.fail("path %s is not a directory; has mode %#o"
 
2723
                      % (relpath, mode))
 
2724
 
 
2725
    def assertTreesEqual(self, left, right):
 
2726
        """Check that left and right have the same content and properties."""
 
2727
        # we use a tree delta to check for equality of the content, and we
 
2728
        # manually check for equality of other things such as the parents list.
 
2729
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2730
        differences = left.changes_from(right)
 
2731
        self.assertFalse(differences.has_changed(),
 
2732
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2733
 
 
2734
    def setUp(self):
 
2735
        super(TestCaseWithTransport, self).setUp()
 
2736
        self.__vfs_server = None
 
2737
 
 
2738
    def disable_missing_extensions_warning(self):
 
2739
        """Some tests expect a precise stderr content.
 
2740
 
 
2741
        There is no point in forcing them to duplicate the extension related
 
2742
        warning.
 
2743
        """
 
2744
        config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
 
2745
 
 
2746
 
 
2747
class ChrootedTestCase(TestCaseWithTransport):
 
2748
    """A support class that provides readonly urls outside the local namespace.
 
2749
 
 
2750
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2751
    is then we are chrooted already, if it is not then an HttpServer is used
 
2752
    for readonly urls.
 
2753
 
 
2754
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2755
                       be used without needed to redo it when a different
 
2756
                       subclass is in use ?
 
2757
    """
 
2758
 
 
2759
    def setUp(self):
 
2760
        super(ChrootedTestCase, self).setUp()
 
2761
        if not self.vfs_transport_factory == MemoryServer:
 
2762
            self.transport_readonly_server = HttpServer
 
2763
 
 
2764
 
 
2765
def condition_id_re(pattern):
 
2766
    """Create a condition filter which performs a re check on a test's id.
 
2767
 
 
2768
    :param pattern: A regular expression string.
 
2769
    :return: A callable that returns True if the re matches.
 
2770
    """
 
2771
    filter_re = osutils.re_compile_checked(pattern, 0,
 
2772
        'test filter')
 
2773
    def condition(test):
 
2774
        test_id = test.id()
 
2775
        return filter_re.search(test_id)
 
2776
    return condition
 
2777
 
 
2778
 
 
2779
def condition_isinstance(klass_or_klass_list):
 
2780
    """Create a condition filter which returns isinstance(param, klass).
 
2781
 
 
2782
    :return: A callable which when called with one parameter obj return the
 
2783
        result of isinstance(obj, klass_or_klass_list).
 
2784
    """
 
2785
    def condition(obj):
 
2786
        return isinstance(obj, klass_or_klass_list)
 
2787
    return condition
 
2788
 
 
2789
 
 
2790
def condition_id_in_list(id_list):
 
2791
    """Create a condition filter which verify that test's id in a list.
 
2792
 
 
2793
    :param id_list: A TestIdList object.
 
2794
    :return: A callable that returns True if the test's id appears in the list.
 
2795
    """
 
2796
    def condition(test):
 
2797
        return id_list.includes(test.id())
 
2798
    return condition
 
2799
 
 
2800
 
 
2801
def condition_id_startswith(starts):
 
2802
    """Create a condition filter verifying that test's id starts with a string.
 
2803
 
 
2804
    :param starts: A list of string.
 
2805
    :return: A callable that returns True if the test's id starts with one of
 
2806
        the given strings.
 
2807
    """
 
2808
    def condition(test):
 
2809
        for start in starts:
 
2810
            if test.id().startswith(start):
 
2811
                return True
 
2812
        return False
 
2813
    return condition
 
2814
 
 
2815
 
 
2816
def exclude_tests_by_condition(suite, condition):
 
2817
    """Create a test suite which excludes some tests from suite.
 
2818
 
 
2819
    :param suite: The suite to get tests from.
 
2820
    :param condition: A callable whose result evaluates True when called with a
 
2821
        test case which should be excluded from the result.
 
2822
    :return: A suite which contains the tests found in suite that fail
 
2823
        condition.
 
2824
    """
 
2825
    result = []
 
2826
    for test in iter_suite_tests(suite):
 
2827
        if not condition(test):
 
2828
            result.append(test)
 
2829
    return TestUtil.TestSuite(result)
 
2830
 
 
2831
 
 
2832
def filter_suite_by_condition(suite, condition):
 
2833
    """Create a test suite by filtering another one.
 
2834
 
 
2835
    :param suite: The source suite.
 
2836
    :param condition: A callable whose result evaluates True when called with a
 
2837
        test case which should be included in the result.
 
2838
    :return: A suite which contains the tests found in suite that pass
 
2839
        condition.
 
2840
    """
 
2841
    result = []
 
2842
    for test in iter_suite_tests(suite):
 
2843
        if condition(test):
 
2844
            result.append(test)
 
2845
    return TestUtil.TestSuite(result)
 
2846
 
 
2847
 
 
2848
def filter_suite_by_re(suite, pattern):
 
2849
    """Create a test suite by filtering another one.
 
2850
 
 
2851
    :param suite:           the source suite
 
2852
    :param pattern:         pattern that names must match
 
2853
    :returns: the newly created suite
 
2854
    """
 
2855
    condition = condition_id_re(pattern)
 
2856
    result_suite = filter_suite_by_condition(suite, condition)
 
2857
    return result_suite
 
2858
 
 
2859
 
 
2860
def filter_suite_by_id_list(suite, test_id_list):
 
2861
    """Create a test suite by filtering another one.
 
2862
 
 
2863
    :param suite: The source suite.
 
2864
    :param test_id_list: A list of the test ids to keep as strings.
 
2865
    :returns: the newly created suite
 
2866
    """
 
2867
    condition = condition_id_in_list(test_id_list)
 
2868
    result_suite = filter_suite_by_condition(suite, condition)
 
2869
    return result_suite
 
2870
 
 
2871
 
 
2872
def filter_suite_by_id_startswith(suite, start):
 
2873
    """Create a test suite by filtering another one.
 
2874
 
 
2875
    :param suite: The source suite.
 
2876
    :param start: A list of string the test id must start with one of.
 
2877
    :returns: the newly created suite
 
2878
    """
 
2879
    condition = condition_id_startswith(start)
 
2880
    result_suite = filter_suite_by_condition(suite, condition)
 
2881
    return result_suite
 
2882
 
 
2883
 
 
2884
def exclude_tests_by_re(suite, pattern):
 
2885
    """Create a test suite which excludes some tests from suite.
 
2886
 
 
2887
    :param suite: The suite to get tests from.
 
2888
    :param pattern: A regular expression string. Test ids that match this
 
2889
        pattern will be excluded from the result.
 
2890
    :return: A TestSuite that contains all the tests from suite without the
 
2891
        tests that matched pattern. The order of tests is the same as it was in
 
2892
        suite.
 
2893
    """
 
2894
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
2895
 
 
2896
 
 
2897
def preserve_input(something):
 
2898
    """A helper for performing test suite transformation chains.
 
2899
 
 
2900
    :param something: Anything you want to preserve.
 
2901
    :return: Something.
 
2902
    """
 
2903
    return something
 
2904
 
 
2905
 
 
2906
def randomize_suite(suite):
 
2907
    """Return a new TestSuite with suite's tests in random order.
 
2908
 
 
2909
    The tests in the input suite are flattened into a single suite in order to
 
2910
    accomplish this. Any nested TestSuites are removed to provide global
 
2911
    randomness.
 
2912
    """
 
2913
    tests = list(iter_suite_tests(suite))
 
2914
    random.shuffle(tests)
 
2915
    return TestUtil.TestSuite(tests)
 
2916
 
 
2917
 
 
2918
def split_suite_by_condition(suite, condition):
 
2919
    """Split a test suite into two by a condition.
 
2920
 
 
2921
    :param suite: The suite to split.
 
2922
    :param condition: The condition to match on. Tests that match this
 
2923
        condition are returned in the first test suite, ones that do not match
 
2924
        are in the second suite.
 
2925
    :return: A tuple of two test suites, where the first contains tests from
 
2926
        suite matching the condition, and the second contains the remainder
 
2927
        from suite. The order within each output suite is the same as it was in
 
2928
        suite.
 
2929
    """
 
2930
    matched = []
 
2931
    did_not_match = []
 
2932
    for test in iter_suite_tests(suite):
 
2933
        if condition(test):
 
2934
            matched.append(test)
 
2935
        else:
 
2936
            did_not_match.append(test)
 
2937
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
2938
 
 
2939
 
 
2940
def split_suite_by_re(suite, pattern):
 
2941
    """Split a test suite into two by a regular expression.
 
2942
 
 
2943
    :param suite: The suite to split.
 
2944
    :param pattern: A regular expression string. Test ids that match this
 
2945
        pattern will be in the first test suite returned, and the others in the
 
2946
        second test suite returned.
 
2947
    :return: A tuple of two test suites, where the first contains tests from
 
2948
        suite matching pattern, and the second contains the remainder from
 
2949
        suite. The order within each output suite is the same as it was in
 
2950
        suite.
 
2951
    """
 
2952
    return split_suite_by_condition(suite, condition_id_re(pattern))
 
2953
 
 
2954
 
 
2955
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2956
              stop_on_failure=False,
 
2957
              transport=None, lsprof_timed=None, bench_history=None,
 
2958
              matching_tests_first=None,
 
2959
              list_only=False,
 
2960
              random_seed=None,
 
2961
              exclude_pattern=None,
 
2962
              strict=False,
 
2963
              runner_class=None,
 
2964
              suite_decorators=None,
 
2965
              stream=None,
 
2966
              result_decorators=None,
 
2967
              ):
 
2968
    """Run a test suite for bzr selftest.
 
2969
 
 
2970
    :param runner_class: The class of runner to use. Must support the
 
2971
        constructor arguments passed by run_suite which are more than standard
 
2972
        python uses.
 
2973
    :return: A boolean indicating success.
 
2974
    """
 
2975
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2976
    if verbose:
 
2977
        verbosity = 2
 
2978
    else:
 
2979
        verbosity = 1
 
2980
    if runner_class is None:
 
2981
        runner_class = TextTestRunner
 
2982
    if stream is None:
 
2983
        stream = sys.stdout
 
2984
    runner = runner_class(stream=stream,
 
2985
                            descriptions=0,
 
2986
                            verbosity=verbosity,
 
2987
                            bench_history=bench_history,
 
2988
                            strict=strict,
 
2989
                            result_decorators=result_decorators,
 
2990
                            )
 
2991
    runner.stop_on_failure=stop_on_failure
 
2992
    # built in decorator factories:
 
2993
    decorators = [
 
2994
        random_order(random_seed, runner),
 
2995
        exclude_tests(exclude_pattern),
 
2996
        ]
 
2997
    if matching_tests_first:
 
2998
        decorators.append(tests_first(pattern))
 
2999
    else:
 
3000
        decorators.append(filter_tests(pattern))
 
3001
    if suite_decorators:
 
3002
        decorators.extend(suite_decorators)
 
3003
    # tell the result object how many tests will be running: (except if
 
3004
    # --parallel=fork is being used. Robert said he will provide a better
 
3005
    # progress design later -- vila 20090817)
 
3006
    if fork_decorator not in decorators:
 
3007
        decorators.append(CountingDecorator)
 
3008
    for decorator in decorators:
 
3009
        suite = decorator(suite)
 
3010
    if list_only:
 
3011
        # Done after test suite decoration to allow randomisation etc
 
3012
        # to take effect, though that is of marginal benefit.
 
3013
        if verbosity >= 2:
 
3014
            stream.write("Listing tests only ...\n")
 
3015
        for t in iter_suite_tests(suite):
 
3016
            stream.write("%s\n" % (t.id()))
 
3017
        return True
 
3018
    result = runner.run(suite)
 
3019
    if strict:
 
3020
        return result.wasStrictlySuccessful()
 
3021
    else:
 
3022
        return result.wasSuccessful()
 
3023
 
 
3024
 
 
3025
# A registry where get() returns a suite decorator.
 
3026
parallel_registry = registry.Registry()
 
3027
 
 
3028
 
 
3029
def fork_decorator(suite):
 
3030
    concurrency = osutils.local_concurrency()
 
3031
    if concurrency == 1:
 
3032
        return suite
 
3033
    from testtools import ConcurrentTestSuite
 
3034
    return ConcurrentTestSuite(suite, fork_for_tests)
 
3035
parallel_registry.register('fork', fork_decorator)
 
3036
 
 
3037
 
 
3038
def subprocess_decorator(suite):
 
3039
    concurrency = osutils.local_concurrency()
 
3040
    if concurrency == 1:
 
3041
        return suite
 
3042
    from testtools import ConcurrentTestSuite
 
3043
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
 
3044
parallel_registry.register('subprocess', subprocess_decorator)
 
3045
 
 
3046
 
 
3047
def exclude_tests(exclude_pattern):
 
3048
    """Return a test suite decorator that excludes tests."""
 
3049
    if exclude_pattern is None:
 
3050
        return identity_decorator
 
3051
    def decorator(suite):
 
3052
        return ExcludeDecorator(suite, exclude_pattern)
 
3053
    return decorator
 
3054
 
 
3055
 
 
3056
def filter_tests(pattern):
 
3057
    if pattern == '.*':
 
3058
        return identity_decorator
 
3059
    def decorator(suite):
 
3060
        return FilterTestsDecorator(suite, pattern)
 
3061
    return decorator
 
3062
 
 
3063
 
 
3064
def random_order(random_seed, runner):
 
3065
    """Return a test suite decorator factory for randomising tests order.
 
3066
    
 
3067
    :param random_seed: now, a string which casts to a long, or a long.
 
3068
    :param runner: A test runner with a stream attribute to report on.
 
3069
    """
 
3070
    if random_seed is None:
 
3071
        return identity_decorator
 
3072
    def decorator(suite):
 
3073
        return RandomDecorator(suite, random_seed, runner.stream)
 
3074
    return decorator
 
3075
 
 
3076
 
 
3077
def tests_first(pattern):
 
3078
    if pattern == '.*':
 
3079
        return identity_decorator
 
3080
    def decorator(suite):
 
3081
        return TestFirstDecorator(suite, pattern)
 
3082
    return decorator
 
3083
 
 
3084
 
 
3085
def identity_decorator(suite):
 
3086
    """Return suite."""
 
3087
    return suite
 
3088
 
 
3089
 
 
3090
class TestDecorator(TestSuite):
 
3091
    """A decorator for TestCase/TestSuite objects.
 
3092
    
 
3093
    Usually, subclasses should override __iter__(used when flattening test
 
3094
    suites), which we do to filter, reorder, parallelise and so on, run() and
 
3095
    debug().
 
3096
    """
 
3097
 
 
3098
    def __init__(self, suite):
 
3099
        TestSuite.__init__(self)
 
3100
        self.addTest(suite)
 
3101
 
 
3102
    def countTestCases(self):
 
3103
        cases = 0
 
3104
        for test in self:
 
3105
            cases += test.countTestCases()
 
3106
        return cases
 
3107
 
 
3108
    def debug(self):
 
3109
        for test in self:
 
3110
            test.debug()
 
3111
 
 
3112
    def run(self, result):
 
3113
        # Use iteration on self, not self._tests, to allow subclasses to hook
 
3114
        # into __iter__.
 
3115
        for test in self:
 
3116
            if result.shouldStop:
 
3117
                break
 
3118
            test.run(result)
 
3119
        return result
 
3120
 
 
3121
 
 
3122
class CountingDecorator(TestDecorator):
 
3123
    """A decorator which calls result.progress(self.countTestCases)."""
 
3124
 
 
3125
    def run(self, result):
 
3126
        progress_method = getattr(result, 'progress', None)
 
3127
        if callable(progress_method):
 
3128
            progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
 
3129
        return super(CountingDecorator, self).run(result)
 
3130
 
 
3131
 
 
3132
class ExcludeDecorator(TestDecorator):
 
3133
    """A decorator which excludes test matching an exclude pattern."""
 
3134
 
 
3135
    def __init__(self, suite, exclude_pattern):
 
3136
        TestDecorator.__init__(self, suite)
 
3137
        self.exclude_pattern = exclude_pattern
 
3138
        self.excluded = False
 
3139
 
 
3140
    def __iter__(self):
 
3141
        if self.excluded:
 
3142
            return iter(self._tests)
 
3143
        self.excluded = True
 
3144
        suite = exclude_tests_by_re(self, self.exclude_pattern)
 
3145
        del self._tests[:]
 
3146
        self.addTests(suite)
 
3147
        return iter(self._tests)
 
3148
 
 
3149
 
 
3150
class FilterTestsDecorator(TestDecorator):
 
3151
    """A decorator which filters tests to those matching a pattern."""
 
3152
 
 
3153
    def __init__(self, suite, pattern):
 
3154
        TestDecorator.__init__(self, suite)
 
3155
        self.pattern = pattern
 
3156
        self.filtered = False
 
3157
 
 
3158
    def __iter__(self):
 
3159
        if self.filtered:
 
3160
            return iter(self._tests)
 
3161
        self.filtered = True
 
3162
        suite = filter_suite_by_re(self, self.pattern)
 
3163
        del self._tests[:]
 
3164
        self.addTests(suite)
 
3165
        return iter(self._tests)
 
3166
 
 
3167
 
 
3168
class RandomDecorator(TestDecorator):
 
3169
    """A decorator which randomises the order of its tests."""
 
3170
 
 
3171
    def __init__(self, suite, random_seed, stream):
 
3172
        TestDecorator.__init__(self, suite)
 
3173
        self.random_seed = random_seed
 
3174
        self.randomised = False
 
3175
        self.stream = stream
 
3176
 
 
3177
    def __iter__(self):
 
3178
        if self.randomised:
 
3179
            return iter(self._tests)
 
3180
        self.randomised = True
 
3181
        self.stream.writeln("Randomizing test order using seed %s\n" %
 
3182
            (self.actual_seed()))
 
3183
        # Initialise the random number generator.
 
3184
        random.seed(self.actual_seed())
 
3185
        suite = randomize_suite(self)
 
3186
        del self._tests[:]
 
3187
        self.addTests(suite)
 
3188
        return iter(self._tests)
 
3189
 
 
3190
    def actual_seed(self):
 
3191
        if self.random_seed == "now":
 
3192
            # We convert the seed to a long to make it reuseable across
 
3193
            # invocations (because the user can reenter it).
 
3194
            self.random_seed = long(time.time())
 
3195
        else:
 
3196
            # Convert the seed to a long if we can
 
3197
            try:
 
3198
                self.random_seed = long(self.random_seed)
 
3199
            except:
 
3200
                pass
 
3201
        return self.random_seed
 
3202
 
 
3203
 
 
3204
class TestFirstDecorator(TestDecorator):
 
3205
    """A decorator which moves named tests to the front."""
 
3206
 
 
3207
    def __init__(self, suite, pattern):
 
3208
        TestDecorator.__init__(self, suite)
 
3209
        self.pattern = pattern
 
3210
        self.filtered = False
 
3211
 
 
3212
    def __iter__(self):
 
3213
        if self.filtered:
 
3214
            return iter(self._tests)
 
3215
        self.filtered = True
 
3216
        suites = split_suite_by_re(self, self.pattern)
 
3217
        del self._tests[:]
 
3218
        self.addTests(suites)
 
3219
        return iter(self._tests)
 
3220
 
 
3221
 
 
3222
def partition_tests(suite, count):
 
3223
    """Partition suite into count lists of tests."""
 
3224
    result = []
 
3225
    tests = list(iter_suite_tests(suite))
 
3226
    tests_per_process = int(math.ceil(float(len(tests)) / count))
 
3227
    for block in range(count):
 
3228
        low_test = block * tests_per_process
 
3229
        high_test = low_test + tests_per_process
 
3230
        process_tests = tests[low_test:high_test]
 
3231
        result.append(process_tests)
 
3232
    return result
 
3233
 
 
3234
 
 
3235
def fork_for_tests(suite):
 
3236
    """Take suite and start up one runner per CPU by forking()
 
3237
 
 
3238
    :return: An iterable of TestCase-like objects which can each have
 
3239
        run(result) called on them to feed tests to result.
 
3240
    """
 
3241
    concurrency = osutils.local_concurrency()
 
3242
    result = []
 
3243
    from subunit import TestProtocolClient, ProtocolTestCase
 
3244
    try:
 
3245
        from subunit.test_results import AutoTimingTestResultDecorator
 
3246
    except ImportError:
 
3247
        AutoTimingTestResultDecorator = lambda x:x
 
3248
    class TestInOtherProcess(ProtocolTestCase):
 
3249
        # Should be in subunit, I think. RBC.
 
3250
        def __init__(self, stream, pid):
 
3251
            ProtocolTestCase.__init__(self, stream)
 
3252
            self.pid = pid
 
3253
 
 
3254
        def run(self, result):
 
3255
            try:
 
3256
                ProtocolTestCase.run(self, result)
 
3257
            finally:
 
3258
                os.waitpid(self.pid, os.WNOHANG)
 
3259
 
 
3260
    test_blocks = partition_tests(suite, concurrency)
 
3261
    for process_tests in test_blocks:
 
3262
        process_suite = TestSuite()
 
3263
        process_suite.addTests(process_tests)
 
3264
        c2pread, c2pwrite = os.pipe()
 
3265
        pid = os.fork()
 
3266
        if pid == 0:
 
3267
            try:
 
3268
                os.close(c2pread)
 
3269
                # Leave stderr and stdout open so we can see test noise
 
3270
                # Close stdin so that the child goes away if it decides to
 
3271
                # read from stdin (otherwise its a roulette to see what
 
3272
                # child actually gets keystrokes for pdb etc).
 
3273
                sys.stdin.close()
 
3274
                sys.stdin = None
 
3275
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3276
                subunit_result = AutoTimingTestResultDecorator(
 
3277
                    TestProtocolClient(stream))
 
3278
                process_suite.run(subunit_result)
 
3279
            finally:
 
3280
                os._exit(0)
 
3281
        else:
 
3282
            os.close(c2pwrite)
 
3283
            stream = os.fdopen(c2pread, 'rb', 1)
 
3284
            test = TestInOtherProcess(stream, pid)
 
3285
            result.append(test)
 
3286
    return result
 
3287
 
 
3288
 
 
3289
def reinvoke_for_tests(suite):
 
3290
    """Take suite and start up one runner per CPU using subprocess().
 
3291
 
 
3292
    :return: An iterable of TestCase-like objects which can each have
 
3293
        run(result) called on them to feed tests to result.
 
3294
    """
 
3295
    concurrency = osutils.local_concurrency()
 
3296
    result = []
 
3297
    from subunit import ProtocolTestCase
 
3298
    class TestInSubprocess(ProtocolTestCase):
 
3299
        def __init__(self, process, name):
 
3300
            ProtocolTestCase.__init__(self, process.stdout)
 
3301
            self.process = process
 
3302
            self.process.stdin.close()
 
3303
            self.name = name
 
3304
 
 
3305
        def run(self, result):
 
3306
            try:
 
3307
                ProtocolTestCase.run(self, result)
 
3308
            finally:
 
3309
                self.process.wait()
 
3310
                os.unlink(self.name)
 
3311
            # print "pid %d finished" % finished_process
 
3312
    test_blocks = partition_tests(suite, concurrency)
 
3313
    for process_tests in test_blocks:
 
3314
        # ugly; currently reimplement rather than reuses TestCase methods.
 
3315
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
3316
        if not os.path.isfile(bzr_path):
 
3317
            # We are probably installed. Assume sys.argv is the right file
 
3318
            bzr_path = sys.argv[0]
 
3319
        fd, test_list_file_name = tempfile.mkstemp()
 
3320
        test_list_file = os.fdopen(fd, 'wb', 1)
 
3321
        for test in process_tests:
 
3322
            test_list_file.write(test.id() + '\n')
 
3323
        test_list_file.close()
 
3324
        try:
 
3325
            argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
 
3326
                '--subunit']
 
3327
            if '--no-plugins' in sys.argv:
 
3328
                argv.append('--no-plugins')
 
3329
            # stderr=STDOUT would be ideal, but until we prevent noise on
 
3330
            # stderr it can interrupt the subunit protocol.
 
3331
            process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
 
3332
                bufsize=1)
 
3333
            test = TestInSubprocess(process, test_list_file_name)
 
3334
            result.append(test)
 
3335
        except:
 
3336
            os.unlink(test_list_file_name)
 
3337
            raise
 
3338
    return result
 
3339
 
 
3340
 
 
3341
class ForwardingResult(unittest.TestResult):
 
3342
 
 
3343
    def __init__(self, target):
 
3344
        unittest.TestResult.__init__(self)
 
3345
        self.result = target
 
3346
 
 
3347
    def startTest(self, test):
 
3348
        self.result.startTest(test)
 
3349
 
 
3350
    def stopTest(self, test):
 
3351
        self.result.stopTest(test)
 
3352
 
 
3353
    def startTestRun(self):
 
3354
        self.result.startTestRun()
 
3355
 
 
3356
    def stopTestRun(self):
 
3357
        self.result.stopTestRun()
 
3358
 
 
3359
    def addSkip(self, test, reason):
 
3360
        self.result.addSkip(test, reason)
 
3361
 
 
3362
    def addSuccess(self, test):
 
3363
        self.result.addSuccess(test)
 
3364
 
 
3365
    def addError(self, test, err):
 
3366
        self.result.addError(test, err)
 
3367
 
 
3368
    def addFailure(self, test, err):
 
3369
        self.result.addFailure(test, err)
 
3370
 
 
3371
 
 
3372
class BZRTransformingResult(ForwardingResult):
 
3373
 
 
3374
    def addError(self, test, err):
 
3375
        feature = self._error_looks_like('UnavailableFeature: ', err)
 
3376
        if feature is not None:
 
3377
            self.result.addNotSupported(test, feature)
 
3378
        else:
 
3379
            self.result.addError(test, err)
 
3380
 
 
3381
    def addFailure(self, test, err):
 
3382
        known = self._error_looks_like('KnownFailure: ', err)
 
3383
        if known is not None:
 
3384
            self.result._addKnownFailure(test, [KnownFailure,
 
3385
                                                KnownFailure(known), None])
 
3386
        else:
 
3387
            self.result.addFailure(test, err)
 
3388
 
 
3389
    def _error_looks_like(self, prefix, err):
 
3390
        """Deserialize exception and returns the stringify value."""
 
3391
        import subunit
 
3392
        value = None
 
3393
        typ, exc, _ = err
 
3394
        if isinstance(exc, subunit.RemoteException):
 
3395
            # stringify the exception gives access to the remote traceback
 
3396
            # We search the last line for 'prefix'
 
3397
            lines = str(exc).split('\n')
 
3398
            while lines and not lines[-1]:
 
3399
                lines.pop(-1)
 
3400
            if lines:
 
3401
                if lines[-1].startswith(prefix):
 
3402
                    value = lines[-1][len(prefix):]
 
3403
        return value
 
3404
 
 
3405
 
 
3406
class ProfileResult(ForwardingResult):
 
3407
    """Generate profiling data for all activity between start and success.
 
3408
    
 
3409
    The profile data is appended to the test's _benchcalls attribute and can
 
3410
    be accessed by the forwarded-to TestResult.
 
3411
 
 
3412
    While it might be cleaner do accumulate this in stopTest, addSuccess is
 
3413
    where our existing output support for lsprof is, and this class aims to
 
3414
    fit in with that: while it could be moved it's not necessary to accomplish
 
3415
    test profiling, nor would it be dramatically cleaner.
 
3416
    """
 
3417
 
 
3418
    def startTest(self, test):
 
3419
        self.profiler = bzrlib.lsprof.BzrProfiler()
 
3420
        self.profiler.start()
 
3421
        ForwardingResult.startTest(self, test)
 
3422
 
 
3423
    def addSuccess(self, test):
 
3424
        stats = self.profiler.stop()
 
3425
        try:
 
3426
            calls = test._benchcalls
 
3427
        except AttributeError:
 
3428
            test._benchcalls = []
 
3429
            calls = test._benchcalls
 
3430
        calls.append(((test.id(), "", ""), stats))
 
3431
        ForwardingResult.addSuccess(self, test)
 
3432
 
 
3433
    def stopTest(self, test):
 
3434
        ForwardingResult.stopTest(self, test)
 
3435
        self.profiler = None
 
3436
 
 
3437
 
 
3438
# Controlled by "bzr selftest -E=..." option
 
3439
# Currently supported:
 
3440
#   -Eallow_debug           Will no longer clear debug.debug_flags() so it
 
3441
#                           preserves any flags supplied at the command line.
 
3442
#   -Edisable_lock_checks   Turns errors in mismatched locks into simple prints
 
3443
#                           rather than failing tests. And no longer raise
 
3444
#                           LockContention when fctnl locks are not being used
 
3445
#                           with proper exclusion rules.
 
3446
selftest_debug_flags = set()
 
3447
 
 
3448
 
 
3449
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
3450
             transport=None,
 
3451
             test_suite_factory=None,
 
3452
             lsprof_timed=None,
 
3453
             bench_history=None,
 
3454
             matching_tests_first=None,
 
3455
             list_only=False,
 
3456
             random_seed=None,
 
3457
             exclude_pattern=None,
 
3458
             strict=False,
 
3459
             load_list=None,
 
3460
             debug_flags=None,
 
3461
             starting_with=None,
 
3462
             runner_class=None,
 
3463
             suite_decorators=None,
 
3464
             stream=None,
 
3465
             lsprof_tests=False,
 
3466
             ):
 
3467
    """Run the whole test suite under the enhanced runner"""
 
3468
    # XXX: Very ugly way to do this...
 
3469
    # Disable warning about old formats because we don't want it to disturb
 
3470
    # any blackbox tests.
 
3471
    from bzrlib import repository
 
3472
    repository._deprecation_warning_done = True
 
3473
 
 
3474
    global default_transport
 
3475
    if transport is None:
 
3476
        transport = default_transport
 
3477
    old_transport = default_transport
 
3478
    default_transport = transport
 
3479
    global selftest_debug_flags
 
3480
    old_debug_flags = selftest_debug_flags
 
3481
    if debug_flags is not None:
 
3482
        selftest_debug_flags = set(debug_flags)
 
3483
    try:
 
3484
        if load_list is None:
 
3485
            keep_only = None
 
3486
        else:
 
3487
            keep_only = load_test_id_list(load_list)
 
3488
        if starting_with:
 
3489
            starting_with = [test_prefix_alias_registry.resolve_alias(start)
 
3490
                             for start in starting_with]
 
3491
        if test_suite_factory is None:
 
3492
            # Reduce loading time by loading modules based on the starting_with
 
3493
            # patterns.
 
3494
            suite = test_suite(keep_only, starting_with)
 
3495
        else:
 
3496
            suite = test_suite_factory()
 
3497
        if starting_with:
 
3498
            # But always filter as requested.
 
3499
            suite = filter_suite_by_id_startswith(suite, starting_with)
 
3500
        result_decorators = []
 
3501
        if lsprof_tests:
 
3502
            result_decorators.append(ProfileResult)
 
3503
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
3504
                     stop_on_failure=stop_on_failure,
 
3505
                     transport=transport,
 
3506
                     lsprof_timed=lsprof_timed,
 
3507
                     bench_history=bench_history,
 
3508
                     matching_tests_first=matching_tests_first,
 
3509
                     list_only=list_only,
 
3510
                     random_seed=random_seed,
 
3511
                     exclude_pattern=exclude_pattern,
 
3512
                     strict=strict,
 
3513
                     runner_class=runner_class,
 
3514
                     suite_decorators=suite_decorators,
 
3515
                     stream=stream,
 
3516
                     result_decorators=result_decorators,
 
3517
                     )
 
3518
    finally:
 
3519
        default_transport = old_transport
 
3520
        selftest_debug_flags = old_debug_flags
 
3521
 
 
3522
 
 
3523
def load_test_id_list(file_name):
 
3524
    """Load a test id list from a text file.
 
3525
 
 
3526
    The format is one test id by line.  No special care is taken to impose
 
3527
    strict rules, these test ids are used to filter the test suite so a test id
 
3528
    that do not match an existing test will do no harm. This allows user to add
 
3529
    comments, leave blank lines, etc.
 
3530
    """
 
3531
    test_list = []
 
3532
    try:
 
3533
        ftest = open(file_name, 'rt')
 
3534
    except IOError, e:
 
3535
        if e.errno != errno.ENOENT:
 
3536
            raise
 
3537
        else:
 
3538
            raise errors.NoSuchFile(file_name)
 
3539
 
 
3540
    for test_name in ftest.readlines():
 
3541
        test_list.append(test_name.strip())
 
3542
    ftest.close()
 
3543
    return test_list
 
3544
 
 
3545
 
 
3546
def suite_matches_id_list(test_suite, id_list):
 
3547
    """Warns about tests not appearing or appearing more than once.
 
3548
 
 
3549
    :param test_suite: A TestSuite object.
 
3550
    :param test_id_list: The list of test ids that should be found in
 
3551
         test_suite.
 
3552
 
 
3553
    :return: (absents, duplicates) absents is a list containing the test found
 
3554
        in id_list but not in test_suite, duplicates is a list containing the
 
3555
        test found multiple times in test_suite.
 
3556
 
 
3557
    When using a prefined test id list, it may occurs that some tests do not
 
3558
    exist anymore or that some tests use the same id. This function warns the
 
3559
    tester about potential problems in his workflow (test lists are volatile)
 
3560
    or in the test suite itself (using the same id for several tests does not
 
3561
    help to localize defects).
 
3562
    """
 
3563
    # Build a dict counting id occurrences
 
3564
    tests = dict()
 
3565
    for test in iter_suite_tests(test_suite):
 
3566
        id = test.id()
 
3567
        tests[id] = tests.get(id, 0) + 1
 
3568
 
 
3569
    not_found = []
 
3570
    duplicates = []
 
3571
    for id in id_list:
 
3572
        occurs = tests.get(id, 0)
 
3573
        if not occurs:
 
3574
            not_found.append(id)
 
3575
        elif occurs > 1:
 
3576
            duplicates.append(id)
 
3577
 
 
3578
    return not_found, duplicates
 
3579
 
 
3580
 
 
3581
class TestIdList(object):
 
3582
    """Test id list to filter a test suite.
 
3583
 
 
3584
    Relying on the assumption that test ids are built as:
 
3585
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
3586
    notation, this class offers methods to :
 
3587
    - avoid building a test suite for modules not refered to in the test list,
 
3588
    - keep only the tests listed from the module test suite.
 
3589
    """
 
3590
 
 
3591
    def __init__(self, test_id_list):
 
3592
        # When a test suite needs to be filtered against us we compare test ids
 
3593
        # for equality, so a simple dict offers a quick and simple solution.
 
3594
        self.tests = dict().fromkeys(test_id_list, True)
 
3595
 
 
3596
        # While unittest.TestCase have ids like:
 
3597
        # <module>.<class>.<method>[(<param+)],
 
3598
        # doctest.DocTestCase can have ids like:
 
3599
        # <module>
 
3600
        # <module>.<class>
 
3601
        # <module>.<function>
 
3602
        # <module>.<class>.<method>
 
3603
 
 
3604
        # Since we can't predict a test class from its name only, we settle on
 
3605
        # a simple constraint: a test id always begins with its module name.
 
3606
 
 
3607
        modules = {}
 
3608
        for test_id in test_id_list:
 
3609
            parts = test_id.split('.')
 
3610
            mod_name = parts.pop(0)
 
3611
            modules[mod_name] = True
 
3612
            for part in parts:
 
3613
                mod_name += '.' + part
 
3614
                modules[mod_name] = True
 
3615
        self.modules = modules
 
3616
 
 
3617
    def refers_to(self, module_name):
 
3618
        """Is there tests for the module or one of its sub modules."""
 
3619
        return self.modules.has_key(module_name)
 
3620
 
 
3621
    def includes(self, test_id):
 
3622
        return self.tests.has_key(test_id)
 
3623
 
 
3624
 
 
3625
class TestPrefixAliasRegistry(registry.Registry):
 
3626
    """A registry for test prefix aliases.
 
3627
 
 
3628
    This helps implement shorcuts for the --starting-with selftest
 
3629
    option. Overriding existing prefixes is not allowed but not fatal (a
 
3630
    warning will be emitted).
 
3631
    """
 
3632
 
 
3633
    def register(self, key, obj, help=None, info=None,
 
3634
                 override_existing=False):
 
3635
        """See Registry.register.
 
3636
 
 
3637
        Trying to override an existing alias causes a warning to be emitted,
 
3638
        not a fatal execption.
 
3639
        """
 
3640
        try:
 
3641
            super(TestPrefixAliasRegistry, self).register(
 
3642
                key, obj, help=help, info=info, override_existing=False)
 
3643
        except KeyError:
 
3644
            actual = self.get(key)
 
3645
            note('Test prefix alias %s is already used for %s, ignoring %s'
 
3646
                 % (key, actual, obj))
 
3647
 
 
3648
    def resolve_alias(self, id_start):
 
3649
        """Replace the alias by the prefix in the given string.
 
3650
 
 
3651
        Using an unknown prefix is an error to help catching typos.
 
3652
        """
 
3653
        parts = id_start.split('.')
 
3654
        try:
 
3655
            parts[0] = self.get(parts[0])
 
3656
        except KeyError:
 
3657
            raise errors.BzrCommandError(
 
3658
                '%s is not a known test prefix alias' % parts[0])
 
3659
        return '.'.join(parts)
 
3660
 
 
3661
 
 
3662
test_prefix_alias_registry = TestPrefixAliasRegistry()
 
3663
"""Registry of test prefix aliases."""
 
3664
 
 
3665
 
 
3666
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
 
3667
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
 
3668
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
 
3669
 
 
3670
# Obvious higest levels prefixes, feel free to add your own via a plugin
 
3671
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
 
3672
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
 
3673
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
 
3674
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
 
3675
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
 
3676
 
 
3677
 
 
3678
def _test_suite_testmod_names():
 
3679
    """Return the standard list of test module names to test."""
 
3680
    return [
 
3681
        'bzrlib.doc',
 
3682
        'bzrlib.tests.blackbox',
 
3683
        'bzrlib.tests.commands',
 
3684
        'bzrlib.tests.per_branch',
 
3685
        'bzrlib.tests.per_bzrdir',
 
3686
        'bzrlib.tests.per_foreign_vcs',
 
3687
        'bzrlib.tests.per_interrepository',
 
3688
        'bzrlib.tests.per_intertree',
 
3689
        'bzrlib.tests.per_inventory',
 
3690
        'bzrlib.tests.per_interbranch',
 
3691
        'bzrlib.tests.per_lock',
 
3692
        'bzrlib.tests.per_transport',
 
3693
        'bzrlib.tests.per_tree',
 
3694
        'bzrlib.tests.per_pack_repository',
 
3695
        'bzrlib.tests.per_repository',
 
3696
        'bzrlib.tests.per_repository_chk',
 
3697
        'bzrlib.tests.per_repository_reference',
 
3698
        'bzrlib.tests.per_uifactory',
 
3699
        'bzrlib.tests.per_versionedfile',
 
3700
        'bzrlib.tests.per_workingtree',
 
3701
        'bzrlib.tests.test__annotator',
 
3702
        'bzrlib.tests.test__chk_map',
 
3703
        'bzrlib.tests.test__dirstate_helpers',
 
3704
        'bzrlib.tests.test__groupcompress',
 
3705
        'bzrlib.tests.test__known_graph',
 
3706
        'bzrlib.tests.test__rio',
 
3707
        'bzrlib.tests.test__simple_set',
 
3708
        'bzrlib.tests.test__static_tuple',
 
3709
        'bzrlib.tests.test__walkdirs_win32',
 
3710
        'bzrlib.tests.test_ancestry',
 
3711
        'bzrlib.tests.test_annotate',
 
3712
        'bzrlib.tests.test_api',
 
3713
        'bzrlib.tests.test_atomicfile',
 
3714
        'bzrlib.tests.test_bad_files',
 
3715
        'bzrlib.tests.test_bencode',
 
3716
        'bzrlib.tests.test_bisect_multi',
 
3717
        'bzrlib.tests.test_branch',
 
3718
        'bzrlib.tests.test_branchbuilder',
 
3719
        'bzrlib.tests.test_btree_index',
 
3720
        'bzrlib.tests.test_bugtracker',
 
3721
        'bzrlib.tests.test_bundle',
 
3722
        'bzrlib.tests.test_bzrdir',
 
3723
        'bzrlib.tests.test__chunks_to_lines',
 
3724
        'bzrlib.tests.test_cache_utf8',
 
3725
        'bzrlib.tests.test_chk_map',
 
3726
        'bzrlib.tests.test_chk_serializer',
 
3727
        'bzrlib.tests.test_chunk_writer',
 
3728
        'bzrlib.tests.test_clean_tree',
 
3729
        'bzrlib.tests.test_cleanup',
 
3730
        'bzrlib.tests.test_commands',
 
3731
        'bzrlib.tests.test_commit',
 
3732
        'bzrlib.tests.test_commit_merge',
 
3733
        'bzrlib.tests.test_config',
 
3734
        'bzrlib.tests.test_conflicts',
 
3735
        'bzrlib.tests.test_counted_lock',
 
3736
        'bzrlib.tests.test_crash',
 
3737
        'bzrlib.tests.test_decorators',
 
3738
        'bzrlib.tests.test_delta',
 
3739
        'bzrlib.tests.test_debug',
 
3740
        'bzrlib.tests.test_deprecated_graph',
 
3741
        'bzrlib.tests.test_diff',
 
3742
        'bzrlib.tests.test_directory_service',
 
3743
        'bzrlib.tests.test_dirstate',
 
3744
        'bzrlib.tests.test_email_message',
 
3745
        'bzrlib.tests.test_eol_filters',
 
3746
        'bzrlib.tests.test_errors',
 
3747
        'bzrlib.tests.test_export',
 
3748
        'bzrlib.tests.test_extract',
 
3749
        'bzrlib.tests.test_fetch',
 
3750
        'bzrlib.tests.test_fifo_cache',
 
3751
        'bzrlib.tests.test_filters',
 
3752
        'bzrlib.tests.test_ftp_transport',
 
3753
        'bzrlib.tests.test_foreign',
 
3754
        'bzrlib.tests.test_generate_docs',
 
3755
        'bzrlib.tests.test_generate_ids',
 
3756
        'bzrlib.tests.test_globbing',
 
3757
        'bzrlib.tests.test_gpg',
 
3758
        'bzrlib.tests.test_graph',
 
3759
        'bzrlib.tests.test_groupcompress',
 
3760
        'bzrlib.tests.test_hashcache',
 
3761
        'bzrlib.tests.test_help',
 
3762
        'bzrlib.tests.test_hooks',
 
3763
        'bzrlib.tests.test_http',
 
3764
        'bzrlib.tests.test_http_response',
 
3765
        'bzrlib.tests.test_https_ca_bundle',
 
3766
        'bzrlib.tests.test_identitymap',
 
3767
        'bzrlib.tests.test_ignores',
 
3768
        'bzrlib.tests.test_index',
 
3769
        'bzrlib.tests.test_info',
 
3770
        'bzrlib.tests.test_inv',
 
3771
        'bzrlib.tests.test_inventory_delta',
 
3772
        'bzrlib.tests.test_knit',
 
3773
        'bzrlib.tests.test_lazy_import',
 
3774
        'bzrlib.tests.test_lazy_regex',
 
3775
        'bzrlib.tests.test_lock',
 
3776
        'bzrlib.tests.test_lockable_files',
 
3777
        'bzrlib.tests.test_lockdir',
 
3778
        'bzrlib.tests.test_log',
 
3779
        'bzrlib.tests.test_lru_cache',
 
3780
        'bzrlib.tests.test_lsprof',
 
3781
        'bzrlib.tests.test_mail_client',
 
3782
        'bzrlib.tests.test_memorytree',
 
3783
        'bzrlib.tests.test_merge',
 
3784
        'bzrlib.tests.test_merge3',
 
3785
        'bzrlib.tests.test_merge_core',
 
3786
        'bzrlib.tests.test_merge_directive',
 
3787
        'bzrlib.tests.test_missing',
 
3788
        'bzrlib.tests.test_msgeditor',
 
3789
        'bzrlib.tests.test_multiparent',
 
3790
        'bzrlib.tests.test_mutabletree',
 
3791
        'bzrlib.tests.test_nonascii',
 
3792
        'bzrlib.tests.test_options',
 
3793
        'bzrlib.tests.test_osutils',
 
3794
        'bzrlib.tests.test_osutils_encodings',
 
3795
        'bzrlib.tests.test_pack',
 
3796
        'bzrlib.tests.test_patch',
 
3797
        'bzrlib.tests.test_patches',
 
3798
        'bzrlib.tests.test_permissions',
 
3799
        'bzrlib.tests.test_plugins',
 
3800
        'bzrlib.tests.test_progress',
 
3801
        'bzrlib.tests.test_read_bundle',
 
3802
        'bzrlib.tests.test_reconcile',
 
3803
        'bzrlib.tests.test_reconfigure',
 
3804
        'bzrlib.tests.test_registry',
 
3805
        'bzrlib.tests.test_remote',
 
3806
        'bzrlib.tests.test_rename_map',
 
3807
        'bzrlib.tests.test_repository',
 
3808
        'bzrlib.tests.test_revert',
 
3809
        'bzrlib.tests.test_revision',
 
3810
        'bzrlib.tests.test_revisionspec',
 
3811
        'bzrlib.tests.test_revisiontree',
 
3812
        'bzrlib.tests.test_rio',
 
3813
        'bzrlib.tests.test_rules',
 
3814
        'bzrlib.tests.test_sampler',
 
3815
        'bzrlib.tests.test_script',
 
3816
        'bzrlib.tests.test_selftest',
 
3817
        'bzrlib.tests.test_serializer',
 
3818
        'bzrlib.tests.test_setup',
 
3819
        'bzrlib.tests.test_sftp_transport',
 
3820
        'bzrlib.tests.test_shelf',
 
3821
        'bzrlib.tests.test_shelf_ui',
 
3822
        'bzrlib.tests.test_smart',
 
3823
        'bzrlib.tests.test_smart_add',
 
3824
        'bzrlib.tests.test_smart_request',
 
3825
        'bzrlib.tests.test_smart_transport',
 
3826
        'bzrlib.tests.test_smtp_connection',
 
3827
        'bzrlib.tests.test_source',
 
3828
        'bzrlib.tests.test_ssh_transport',
 
3829
        'bzrlib.tests.test_status',
 
3830
        'bzrlib.tests.test_store',
 
3831
        'bzrlib.tests.test_strace',
 
3832
        'bzrlib.tests.test_subsume',
 
3833
        'bzrlib.tests.test_switch',
 
3834
        'bzrlib.tests.test_symbol_versioning',
 
3835
        'bzrlib.tests.test_tag',
 
3836
        'bzrlib.tests.test_testament',
 
3837
        'bzrlib.tests.test_textfile',
 
3838
        'bzrlib.tests.test_textmerge',
 
3839
        'bzrlib.tests.test_timestamp',
 
3840
        'bzrlib.tests.test_trace',
 
3841
        'bzrlib.tests.test_transactions',
 
3842
        'bzrlib.tests.test_transform',
 
3843
        'bzrlib.tests.test_transport',
 
3844
        'bzrlib.tests.test_transport_log',
 
3845
        'bzrlib.tests.test_tree',
 
3846
        'bzrlib.tests.test_treebuilder',
 
3847
        'bzrlib.tests.test_tsort',
 
3848
        'bzrlib.tests.test_tuned_gzip',
 
3849
        'bzrlib.tests.test_ui',
 
3850
        'bzrlib.tests.test_uncommit',
 
3851
        'bzrlib.tests.test_upgrade',
 
3852
        'bzrlib.tests.test_upgrade_stacked',
 
3853
        'bzrlib.tests.test_urlutils',
 
3854
        'bzrlib.tests.test_version',
 
3855
        'bzrlib.tests.test_version_info',
 
3856
        'bzrlib.tests.test_weave',
 
3857
        'bzrlib.tests.test_whitebox',
 
3858
        'bzrlib.tests.test_win32utils',
 
3859
        'bzrlib.tests.test_workingtree',
 
3860
        'bzrlib.tests.test_workingtree_4',
 
3861
        'bzrlib.tests.test_wsgi',
 
3862
        'bzrlib.tests.test_xml',
 
3863
        ]
 
3864
 
 
3865
 
 
3866
def _test_suite_modules_to_doctest():
 
3867
    """Return the list of modules to doctest."""   
 
3868
    return [
 
3869
        'bzrlib',
 
3870
        'bzrlib.branchbuilder',
 
3871
        'bzrlib.export',
 
3872
        'bzrlib.inventory',
 
3873
        'bzrlib.iterablefile',
 
3874
        'bzrlib.lockdir',
 
3875
        'bzrlib.merge3',
 
3876
        'bzrlib.option',
 
3877
        'bzrlib.symbol_versioning',
 
3878
        'bzrlib.tests',
 
3879
        'bzrlib.timestamp',
 
3880
        'bzrlib.version_info_formats.format_custom',
 
3881
        ]
 
3882
 
 
3883
 
 
3884
def test_suite(keep_only=None, starting_with=None):
 
3885
    """Build and return TestSuite for the whole of bzrlib.
 
3886
 
 
3887
    :param keep_only: A list of test ids limiting the suite returned.
 
3888
 
 
3889
    :param starting_with: An id limiting the suite returned to the tests
 
3890
         starting with it.
 
3891
 
 
3892
    This function can be replaced if you need to change the default test
 
3893
    suite on a global basis, but it is not encouraged.
 
3894
    """
 
3895
 
 
3896
    loader = TestUtil.TestLoader()
 
3897
 
 
3898
    if keep_only is not None:
 
3899
        id_filter = TestIdList(keep_only)
 
3900
    if starting_with:
 
3901
        # We take precedence over keep_only because *at loading time* using
 
3902
        # both options means we will load less tests for the same final result.
 
3903
        def interesting_module(name):
 
3904
            for start in starting_with:
 
3905
                if (
 
3906
                    # Either the module name starts with the specified string
 
3907
                    name.startswith(start)
 
3908
                    # or it may contain tests starting with the specified string
 
3909
                    or start.startswith(name)
 
3910
                    ):
 
3911
                    return True
 
3912
            return False
 
3913
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
 
3914
 
 
3915
    elif keep_only is not None:
 
3916
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
3917
        def interesting_module(name):
 
3918
            return id_filter.refers_to(name)
 
3919
 
 
3920
    else:
 
3921
        loader = TestUtil.TestLoader()
 
3922
        def interesting_module(name):
 
3923
            # No filtering, all modules are interesting
 
3924
            return True
 
3925
 
 
3926
    suite = loader.suiteClass()
 
3927
 
 
3928
    # modules building their suite with loadTestsFromModuleNames
 
3929
    suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
 
3930
 
 
3931
    for mod in _test_suite_modules_to_doctest():
 
3932
        if not interesting_module(mod):
 
3933
            # No tests to keep here, move along
 
3934
            continue
 
3935
        try:
 
3936
            # note that this really does mean "report only" -- doctest
 
3937
            # still runs the rest of the examples
 
3938
            doc_suite = doctest.DocTestSuite(mod,
 
3939
                optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
 
3940
        except ValueError, e:
 
3941
            print '**failed to get doctest for: %s\n%s' % (mod, e)
 
3942
            raise
 
3943
        if len(doc_suite._tests) == 0:
 
3944
            raise errors.BzrError("no doctests found in %s" % (mod,))
 
3945
        suite.addTest(doc_suite)
 
3946
 
 
3947
    default_encoding = sys.getdefaultencoding()
 
3948
    for name, plugin in bzrlib.plugin.plugins().items():
 
3949
        if not interesting_module(plugin.module.__name__):
 
3950
            continue
 
3951
        plugin_suite = plugin.test_suite()
 
3952
        # We used to catch ImportError here and turn it into just a warning,
 
3953
        # but really if you don't have --no-plugins this should be a failure.
 
3954
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
3955
        if plugin_suite is None:
 
3956
            plugin_suite = plugin.load_plugin_tests(loader)
 
3957
        if plugin_suite is not None:
 
3958
            suite.addTest(plugin_suite)
 
3959
        if default_encoding != sys.getdefaultencoding():
 
3960
            bzrlib.trace.warning(
 
3961
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
3962
                sys.getdefaultencoding())
 
3963
            reload(sys)
 
3964
            sys.setdefaultencoding(default_encoding)
 
3965
 
 
3966
    if keep_only is not None:
 
3967
        # Now that the referred modules have loaded their tests, keep only the
 
3968
        # requested ones.
 
3969
        suite = filter_suite_by_id_list(suite, id_filter)
 
3970
        # Do some sanity checks on the id_list filtering
 
3971
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
3972
        if starting_with:
 
3973
            # The tester has used both keep_only and starting_with, so he is
 
3974
            # already aware that some tests are excluded from the list, there
 
3975
            # is no need to tell him which.
 
3976
            pass
 
3977
        else:
 
3978
            # Some tests mentioned in the list are not in the test suite. The
 
3979
            # list may be out of date, report to the tester.
 
3980
            for id in not_found:
 
3981
                bzrlib.trace.warning('"%s" not found in the test suite', id)
 
3982
        for id in duplicates:
 
3983
            bzrlib.trace.warning('"%s" is used as an id by several tests', id)
 
3984
 
 
3985
    return suite
 
3986
 
 
3987
 
 
3988
def multiply_scenarios(scenarios_left, scenarios_right):
 
3989
    """Multiply two sets of scenarios.
 
3990
 
 
3991
    :returns: the cartesian product of the two sets of scenarios, that is
 
3992
        a scenario for every possible combination of a left scenario and a
 
3993
        right scenario.
 
3994
    """
 
3995
    return [
 
3996
        ('%s,%s' % (left_name, right_name),
 
3997
         dict(left_dict.items() + right_dict.items()))
 
3998
        for left_name, left_dict in scenarios_left
 
3999
        for right_name, right_dict in scenarios_right]
 
4000
 
 
4001
 
 
4002
def multiply_tests(tests, scenarios, result):
 
4003
    """Multiply tests_list by scenarios into result.
 
4004
 
 
4005
    This is the core workhorse for test parameterisation.
 
4006
 
 
4007
    Typically the load_tests() method for a per-implementation test suite will
 
4008
    call multiply_tests and return the result.
 
4009
 
 
4010
    :param tests: The tests to parameterise.
 
4011
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
 
4012
        scenario_param_dict).
 
4013
    :param result: A TestSuite to add created tests to.
 
4014
 
 
4015
    This returns the passed in result TestSuite with the cross product of all
 
4016
    the tests repeated once for each scenario.  Each test is adapted by adding
 
4017
    the scenario name at the end of its id(), and updating the test object's
 
4018
    __dict__ with the scenario_param_dict.
 
4019
 
 
4020
    >>> import bzrlib.tests.test_sampler
 
4021
    >>> r = multiply_tests(
 
4022
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
 
4023
    ...     [('one', dict(param=1)),
 
4024
    ...      ('two', dict(param=2))],
 
4025
    ...     TestSuite())
 
4026
    >>> tests = list(iter_suite_tests(r))
 
4027
    >>> len(tests)
 
4028
    2
 
4029
    >>> tests[0].id()
 
4030
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
 
4031
    >>> tests[0].param
 
4032
    1
 
4033
    >>> tests[1].param
 
4034
    2
 
4035
    """
 
4036
    for test in iter_suite_tests(tests):
 
4037
        apply_scenarios(test, scenarios, result)
 
4038
    return result
 
4039
 
 
4040
 
 
4041
def apply_scenarios(test, scenarios, result):
 
4042
    """Apply the scenarios in scenarios to test and add to result.
 
4043
 
 
4044
    :param test: The test to apply scenarios to.
 
4045
    :param scenarios: An iterable of scenarios to apply to test.
 
4046
    :return: result
 
4047
    :seealso: apply_scenario
 
4048
    """
 
4049
    for scenario in scenarios:
 
4050
        result.addTest(apply_scenario(test, scenario))
 
4051
    return result
 
4052
 
 
4053
 
 
4054
def apply_scenario(test, scenario):
 
4055
    """Copy test and apply scenario to it.
 
4056
 
 
4057
    :param test: A test to adapt.
 
4058
    :param scenario: A tuple describing the scenarion.
 
4059
        The first element of the tuple is the new test id.
 
4060
        The second element is a dict containing attributes to set on the
 
4061
        test.
 
4062
    :return: The adapted test.
 
4063
    """
 
4064
    new_id = "%s(%s)" % (test.id(), scenario[0])
 
4065
    new_test = clone_test(test, new_id)
 
4066
    for name, value in scenario[1].items():
 
4067
        setattr(new_test, name, value)
 
4068
    return new_test
 
4069
 
 
4070
 
 
4071
def clone_test(test, new_id):
 
4072
    """Clone a test giving it a new id.
 
4073
 
 
4074
    :param test: The test to clone.
 
4075
    :param new_id: The id to assign to it.
 
4076
    :return: The new test.
 
4077
    """
 
4078
    new_test = copy(test)
 
4079
    new_test.id = lambda: new_id
 
4080
    return new_test
 
4081
 
 
4082
 
 
4083
def _rmtree_temp_dir(dirname):
 
4084
    # If LANG=C we probably have created some bogus paths
 
4085
    # which rmtree(unicode) will fail to delete
 
4086
    # so make sure we are using rmtree(str) to delete everything
 
4087
    # except on win32, where rmtree(str) will fail
 
4088
    # since it doesn't have the property of byte-stream paths
 
4089
    # (they are either ascii or mbcs)
 
4090
    if sys.platform == 'win32':
 
4091
        # make sure we are using the unicode win32 api
 
4092
        dirname = unicode(dirname)
 
4093
    else:
 
4094
        dirname = dirname.encode(sys.getfilesystemencoding())
 
4095
    try:
 
4096
        osutils.rmtree(dirname)
 
4097
    except OSError, e:
 
4098
        # We don't want to fail here because some useful display will be lost
 
4099
        # otherwise. Polluting the tmp dir is bad, but not giving all the
 
4100
        # possible info to the test runner is even worse.
 
4101
        sys.stderr.write('Unable to remove testing dir %s\n%s'
 
4102
                         % (os.path.basename(dirname), e))
 
4103
 
 
4104
 
 
4105
class Feature(object):
 
4106
    """An operating system Feature."""
 
4107
 
 
4108
    def __init__(self):
 
4109
        self._available = None
 
4110
 
 
4111
    def available(self):
 
4112
        """Is the feature available?
 
4113
 
 
4114
        :return: True if the feature is available.
 
4115
        """
 
4116
        if self._available is None:
 
4117
            self._available = self._probe()
 
4118
        return self._available
 
4119
 
 
4120
    def _probe(self):
 
4121
        """Implement this method in concrete features.
 
4122
 
 
4123
        :return: True if the feature is available.
 
4124
        """
 
4125
        raise NotImplementedError
 
4126
 
 
4127
    def __str__(self):
 
4128
        if getattr(self, 'feature_name', None):
 
4129
            return self.feature_name()
 
4130
        return self.__class__.__name__
 
4131
 
 
4132
 
 
4133
class _SymlinkFeature(Feature):
 
4134
 
 
4135
    def _probe(self):
 
4136
        return osutils.has_symlinks()
 
4137
 
 
4138
    def feature_name(self):
 
4139
        return 'symlinks'
 
4140
 
 
4141
SymlinkFeature = _SymlinkFeature()
 
4142
 
 
4143
 
 
4144
class _HardlinkFeature(Feature):
 
4145
 
 
4146
    def _probe(self):
 
4147
        return osutils.has_hardlinks()
 
4148
 
 
4149
    def feature_name(self):
 
4150
        return 'hardlinks'
 
4151
 
 
4152
HardlinkFeature = _HardlinkFeature()
 
4153
 
 
4154
 
 
4155
class _OsFifoFeature(Feature):
 
4156
 
 
4157
    def _probe(self):
 
4158
        return getattr(os, 'mkfifo', None)
 
4159
 
 
4160
    def feature_name(self):
 
4161
        return 'filesystem fifos'
 
4162
 
 
4163
OsFifoFeature = _OsFifoFeature()
 
4164
 
 
4165
 
 
4166
class _UnicodeFilenameFeature(Feature):
 
4167
    """Does the filesystem support Unicode filenames?"""
 
4168
 
 
4169
    def _probe(self):
 
4170
        try:
 
4171
            # Check for character combinations unlikely to be covered by any
 
4172
            # single non-unicode encoding. We use the characters
 
4173
            # - greek small letter alpha (U+03B1) and
 
4174
            # - braille pattern dots-123456 (U+283F).
 
4175
            os.stat(u'\u03b1\u283f')
 
4176
        except UnicodeEncodeError:
 
4177
            return False
 
4178
        except (IOError, OSError):
 
4179
            # The filesystem allows the Unicode filename but the file doesn't
 
4180
            # exist.
 
4181
            return True
 
4182
        else:
 
4183
            # The filesystem allows the Unicode filename and the file exists,
 
4184
            # for some reason.
 
4185
            return True
 
4186
 
 
4187
UnicodeFilenameFeature = _UnicodeFilenameFeature()
 
4188
 
 
4189
 
 
4190
def probe_unicode_in_user_encoding():
 
4191
    """Try to encode several unicode strings to use in unicode-aware tests.
 
4192
    Return first successfull match.
 
4193
 
 
4194
    :return:  (unicode value, encoded plain string value) or (None, None)
 
4195
    """
 
4196
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
4197
    for uni_val in possible_vals:
 
4198
        try:
 
4199
            str_val = uni_val.encode(osutils.get_user_encoding())
 
4200
        except UnicodeEncodeError:
 
4201
            # Try a different character
 
4202
            pass
 
4203
        else:
 
4204
            return uni_val, str_val
 
4205
    return None, None
 
4206
 
 
4207
 
 
4208
def probe_bad_non_ascii(encoding):
 
4209
    """Try to find [bad] character with code [128..255]
 
4210
    that cannot be decoded to unicode in some encoding.
 
4211
    Return None if all non-ascii characters is valid
 
4212
    for given encoding.
 
4213
    """
 
4214
    for i in xrange(128, 256):
 
4215
        char = chr(i)
 
4216
        try:
 
4217
            char.decode(encoding)
 
4218
        except UnicodeDecodeError:
 
4219
            return char
 
4220
    return None
 
4221
 
 
4222
 
 
4223
class _HTTPSServerFeature(Feature):
 
4224
    """Some tests want an https Server, check if one is available.
 
4225
 
 
4226
    Right now, the only way this is available is under python2.6 which provides
 
4227
    an ssl module.
 
4228
    """
 
4229
 
 
4230
    def _probe(self):
 
4231
        try:
 
4232
            import ssl
 
4233
            return True
 
4234
        except ImportError:
 
4235
            return False
 
4236
 
 
4237
    def feature_name(self):
 
4238
        return 'HTTPSServer'
 
4239
 
 
4240
 
 
4241
HTTPSServerFeature = _HTTPSServerFeature()
 
4242
 
 
4243
 
 
4244
class _ParamikoFeature(Feature):
 
4245
    """Is paramiko available?"""
 
4246
 
 
4247
    def _probe(self):
 
4248
        try:
 
4249
            from bzrlib.transport.sftp import SFTPAbsoluteServer
 
4250
            return True
 
4251
        except errors.ParamikoNotPresent:
 
4252
            return False
 
4253
 
 
4254
    def feature_name(self):
 
4255
        return "Paramiko"
 
4256
 
 
4257
 
 
4258
ParamikoFeature = _ParamikoFeature()
 
4259
 
 
4260
 
 
4261
class _UnicodeFilename(Feature):
 
4262
    """Does the filesystem support Unicode filenames?"""
 
4263
 
 
4264
    def _probe(self):
 
4265
        try:
 
4266
            os.stat(u'\u03b1')
 
4267
        except UnicodeEncodeError:
 
4268
            return False
 
4269
        except (IOError, OSError):
 
4270
            # The filesystem allows the Unicode filename but the file doesn't
 
4271
            # exist.
 
4272
            return True
 
4273
        else:
 
4274
            # The filesystem allows the Unicode filename and the file exists,
 
4275
            # for some reason.
 
4276
            return True
 
4277
 
 
4278
UnicodeFilename = _UnicodeFilename()
 
4279
 
 
4280
 
 
4281
class _UTF8Filesystem(Feature):
 
4282
    """Is the filesystem UTF-8?"""
 
4283
 
 
4284
    def _probe(self):
 
4285
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
 
4286
            return True
 
4287
        return False
 
4288
 
 
4289
UTF8Filesystem = _UTF8Filesystem()
 
4290
 
 
4291
 
 
4292
class _BreakinFeature(Feature):
 
4293
    """Does this platform support the breakin feature?"""
 
4294
 
 
4295
    def _probe(self):
 
4296
        from bzrlib import breakin
 
4297
        if breakin.determine_signal() is None:
 
4298
            return False
 
4299
        if sys.platform == 'win32':
 
4300
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
 
4301
            # We trigger SIGBREAK via a Console api so we need ctypes to
 
4302
            # access the function
 
4303
            if not have_ctypes:
 
4304
                return False
 
4305
        return True
 
4306
 
 
4307
    def feature_name(self):
 
4308
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
 
4309
 
 
4310
 
 
4311
BreakinFeature = _BreakinFeature()
 
4312
 
 
4313
 
 
4314
class _CaseInsCasePresFilenameFeature(Feature):
 
4315
    """Is the file-system case insensitive, but case-preserving?"""
 
4316
 
 
4317
    def _probe(self):
 
4318
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
 
4319
        try:
 
4320
            # first check truly case-preserving for created files, then check
 
4321
            # case insensitive when opening existing files.
 
4322
            name = osutils.normpath(name)
 
4323
            base, rel = osutils.split(name)
 
4324
            found_rel = osutils.canonical_relpath(base, name)
 
4325
            return (found_rel == rel
 
4326
                    and os.path.isfile(name.upper())
 
4327
                    and os.path.isfile(name.lower()))
 
4328
        finally:
 
4329
            os.close(fileno)
 
4330
            os.remove(name)
 
4331
 
 
4332
    def feature_name(self):
 
4333
        return "case-insensitive case-preserving filesystem"
 
4334
 
 
4335
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
 
4336
 
 
4337
 
 
4338
class _CaseInsensitiveFilesystemFeature(Feature):
 
4339
    """Check if underlying filesystem is case-insensitive but *not* case
 
4340
    preserving.
 
4341
    """
 
4342
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
 
4343
    # more likely to be case preserving, so this case is rare.
 
4344
 
 
4345
    def _probe(self):
 
4346
        if CaseInsCasePresFilenameFeature.available():
 
4347
            return False
 
4348
 
 
4349
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
4350
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
4351
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
4352
        else:
 
4353
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
4354
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
4355
            dir=root)
 
4356
        name_a = osutils.pathjoin(tdir, 'a')
 
4357
        name_A = osutils.pathjoin(tdir, 'A')
 
4358
        os.mkdir(name_a)
 
4359
        result = osutils.isdir(name_A)
 
4360
        _rmtree_temp_dir(tdir)
 
4361
        return result
 
4362
 
 
4363
    def feature_name(self):
 
4364
        return 'case-insensitive filesystem'
 
4365
 
 
4366
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
 
4367
 
 
4368
 
 
4369
class _SubUnitFeature(Feature):
 
4370
    """Check if subunit is available."""
 
4371
 
 
4372
    def _probe(self):
 
4373
        try:
 
4374
            import subunit
 
4375
            return True
 
4376
        except ImportError:
 
4377
            return False
 
4378
 
 
4379
    def feature_name(self):
 
4380
        return 'subunit'
 
4381
 
 
4382
SubUnitFeature = _SubUnitFeature()
 
4383
# Only define SubUnitBzrRunner if subunit is available.
 
4384
try:
 
4385
    from subunit import TestProtocolClient
 
4386
    try:
 
4387
        from subunit.test_results import AutoTimingTestResultDecorator
 
4388
    except ImportError:
 
4389
        AutoTimingTestResultDecorator = lambda x:x
 
4390
    class SubUnitBzrRunner(TextTestRunner):
 
4391
        def run(self, test):
 
4392
            result = AutoTimingTestResultDecorator(
 
4393
                TestProtocolClient(self.stream))
 
4394
            test.run(result)
 
4395
            return result
 
4396
except ImportError:
 
4397
    pass