/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Jelmer Vernooij
  • Date: 2012-06-29 08:19:21 UTC
  • mfrom: (0.40.147 trunk)
  • mto: This revision was merged to the branch mainline in revision 6555.
  • Revision ID: jelmer@samba.org-20120629081921-m3asg03n6z9fzcxn
Ship the grep plugin.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Testing framework extensions"""
 
18
 
 
19
from __future__ import absolute_import
 
20
 
 
21
# NOTE: Some classes in here use camelCaseNaming() rather than
 
22
# underscore_naming().  That's for consistency with unittest; it's not the
 
23
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
24
# new assertFoo() methods.
 
25
 
 
26
import atexit
 
27
import codecs
 
28
import copy
 
29
from cStringIO import StringIO
 
30
import difflib
 
31
import doctest
 
32
import errno
 
33
import itertools
 
34
import logging
 
35
import os
 
36
import platform
 
37
import pprint
 
38
import random
 
39
import re
 
40
import shlex
 
41
import site
 
42
import stat
 
43
import subprocess
 
44
import sys
 
45
import tempfile
 
46
import threading
 
47
import time
 
48
import traceback
 
49
import unittest
 
50
import warnings
 
51
 
 
52
import testtools
 
53
# nb: check this before importing anything else from within it
 
54
_testtools_version = getattr(testtools, '__version__', ())
 
55
if _testtools_version < (0, 9, 5):
 
56
    raise ImportError("need at least testtools 0.9.5: %s is %r"
 
57
        % (testtools.__file__, _testtools_version))
 
58
from testtools import content
 
59
 
 
60
import bzrlib
 
61
from bzrlib import (
 
62
    branchbuilder,
 
63
    controldir,
 
64
    chk_map,
 
65
    commands as _mod_commands,
 
66
    config,
 
67
    i18n,
 
68
    debug,
 
69
    errors,
 
70
    hooks,
 
71
    lock as _mod_lock,
 
72
    lockdir,
 
73
    memorytree,
 
74
    osutils,
 
75
    plugin as _mod_plugin,
 
76
    pyutils,
 
77
    ui,
 
78
    urlutils,
 
79
    registry,
 
80
    symbol_versioning,
 
81
    trace,
 
82
    transport as _mod_transport,
 
83
    workingtree,
 
84
    )
 
85
try:
 
86
    import bzrlib.lsprof
 
87
except ImportError:
 
88
    # lsprof not available
 
89
    pass
 
90
from bzrlib.smart import client, request
 
91
from bzrlib.transport import (
 
92
    memory,
 
93
    pathfilter,
 
94
    )
 
95
from bzrlib.symbol_versioning import (
 
96
    deprecated_function,
 
97
    deprecated_in,
 
98
    )
 
99
from bzrlib.tests import (
 
100
    fixtures,
 
101
    test_server,
 
102
    TestUtil,
 
103
    treeshape,
 
104
    )
 
105
from bzrlib.ui import NullProgressView
 
106
from bzrlib.ui.text import TextUIFactory
 
107
from bzrlib.tests.features import _CompatabilityThunkFeature
 
108
 
 
109
# Mark this python module as being part of the implementation
 
110
# of unittest: this gives us better tracebacks where the last
 
111
# shown frame is the test code, not our assertXYZ.
 
112
__unittest = 1
 
113
 
 
114
default_transport = test_server.LocalURLServer
 
115
 
 
116
 
 
117
_unitialized_attr = object()
 
118
"""A sentinel needed to act as a default value in a method signature."""
 
119
 
 
120
 
 
121
# Subunit result codes, defined here to prevent a hard dependency on subunit.
 
122
SUBUNIT_SEEK_SET = 0
 
123
SUBUNIT_SEEK_CUR = 1
 
124
 
 
125
# These are intentionally brought into this namespace. That way plugins, etc
 
126
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
 
127
TestSuite = TestUtil.TestSuite
 
128
TestLoader = TestUtil.TestLoader
 
129
 
 
130
# Tests should run in a clean and clearly defined environment. The goal is to
 
131
# keep them isolated from the running environment as mush as possible. The test
 
132
# framework ensures the variables defined below are set (or deleted if the
 
133
# value is None) before a test is run and reset to their original value after
 
134
# the test is run. Generally if some code depends on an environment variable,
 
135
# the tests should start without this variable in the environment. There are a
 
136
# few exceptions but you shouldn't violate this rule lightly.
 
137
isolated_environ = {
 
138
    'BZR_HOME': None,
 
139
    'HOME': None,
 
140
    'XDG_CONFIG_HOME': None,
 
141
    # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
 
142
    # tests do check our impls match APPDATA
 
143
    'BZR_EDITOR': None, # test_msgeditor manipulates this variable
 
144
    'VISUAL': None,
 
145
    'EDITOR': None,
 
146
    'BZR_EMAIL': None,
 
147
    'BZREMAIL': None, # may still be present in the environment
 
148
    'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
 
149
    'BZR_PROGRESS_BAR': None,
 
150
    # This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
 
151
    # as a base class instead of TestCaseInTempDir. Tests inheriting from
 
152
    # TestCase should not use disk resources, BZR_LOG is one.
 
153
    'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
 
154
    'BZR_PLUGIN_PATH': None,
 
155
    'BZR_DISABLE_PLUGINS': None,
 
156
    'BZR_PLUGINS_AT': None,
 
157
    'BZR_CONCURRENCY': None,
 
158
    # Make sure that any text ui tests are consistent regardless of
 
159
    # the environment the test case is run in; you may want tests that
 
160
    # test other combinations.  'dumb' is a reasonable guess for tests
 
161
    # going to a pipe or a StringIO.
 
162
    'TERM': 'dumb',
 
163
    'LINES': '25',
 
164
    'COLUMNS': '80',
 
165
    'BZR_COLUMNS': '80',
 
166
    # Disable SSH Agent
 
167
    'SSH_AUTH_SOCK': None,
 
168
    # Proxies
 
169
    'http_proxy': None,
 
170
    'HTTP_PROXY': None,
 
171
    'https_proxy': None,
 
172
    'HTTPS_PROXY': None,
 
173
    'no_proxy': None,
 
174
    'NO_PROXY': None,
 
175
    'all_proxy': None,
 
176
    'ALL_PROXY': None,
 
177
    # Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
 
178
    # least. If you do (care), please update this comment
 
179
    # -- vila 20080401
 
180
    'ftp_proxy': None,
 
181
    'FTP_PROXY': None,
 
182
    'BZR_REMOTE_PATH': None,
 
183
    # Generally speaking, we don't want apport reporting on crashes in
 
184
    # the test envirnoment unless we're specifically testing apport,
 
185
    # so that it doesn't leak into the real system environment.  We
 
186
    # use an env var so it propagates to subprocesses.
 
187
    'APPORT_DISABLE': '1',
 
188
    }
 
189
 
 
190
 
 
191
def override_os_environ(test, env=None):
 
192
    """Modify os.environ keeping a copy.
 
193
    
 
194
    :param test: A test instance
 
195
 
 
196
    :param env: A dict containing variable definitions to be installed
 
197
    """
 
198
    if env is None:
 
199
        env = isolated_environ
 
200
    test._original_os_environ = dict([(var, value)
 
201
                                      for var, value in os.environ.iteritems()])
 
202
    for var, value in env.iteritems():
 
203
        osutils.set_or_unset_env(var, value)
 
204
        if var not in test._original_os_environ:
 
205
            # The var is new, add it with a value of None, so
 
206
            # restore_os_environ will delete it
 
207
            test._original_os_environ[var] = None
 
208
 
 
209
 
 
210
def restore_os_environ(test):
 
211
    """Restore os.environ to its original state.
 
212
 
 
213
    :param test: A test instance previously passed to override_os_environ.
 
214
    """
 
215
    for var, value in test._original_os_environ.iteritems():
 
216
        # Restore the original value (or delete it if the value has been set to
 
217
        # None in override_os_environ).
 
218
        osutils.set_or_unset_env(var, value)
 
219
 
 
220
 
 
221
def _clear__type_equality_funcs(test):
 
222
    """Cleanup bound methods stored on TestCase instances
 
223
 
 
224
    Clear the dict breaking a few (mostly) harmless cycles in the affected
 
225
    unittests released with Python 2.6 and initial Python 2.7 versions.
 
226
 
 
227
    For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
 
228
    shipped in Oneiric, an object with no clear method was used, hence the
 
229
    extra complications, see bug 809048 for details.
 
230
    """
 
231
    type_equality_funcs = getattr(test, "_type_equality_funcs", None)
 
232
    if type_equality_funcs is not None:
 
233
        tef_clear = getattr(type_equality_funcs, "clear", None)
 
234
        if tef_clear is None:
 
235
            tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
 
236
            if tef_instance_dict is not None:
 
237
                tef_clear = tef_instance_dict.clear
 
238
        if tef_clear is not None:
 
239
            tef_clear()
 
240
 
 
241
 
 
242
class ExtendedTestResult(testtools.TextTestResult):
 
243
    """Accepts, reports and accumulates the results of running tests.
 
244
 
 
245
    Compared to the unittest version this class adds support for
 
246
    profiling, benchmarking, stopping as soon as a test fails,  and
 
247
    skipping tests.  There are further-specialized subclasses for
 
248
    different types of display.
 
249
 
 
250
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
251
    addFailure or addError methods.  These in turn may redirect to a more
 
252
    specific case for the special test results supported by our extended
 
253
    tests.
 
254
 
 
255
    Note that just one of these objects is fed the results from many tests.
 
256
    """
 
257
 
 
258
    stop_early = False
 
259
 
 
260
    def __init__(self, stream, descriptions, verbosity,
 
261
                 bench_history=None,
 
262
                 strict=False,
 
263
                 ):
 
264
        """Construct new TestResult.
 
265
 
 
266
        :param bench_history: Optionally, a writable file object to accumulate
 
267
            benchmark results.
 
268
        """
 
269
        testtools.TextTestResult.__init__(self, stream)
 
270
        if bench_history is not None:
 
271
            from bzrlib.version import _get_bzr_source_tree
 
272
            src_tree = _get_bzr_source_tree()
 
273
            if src_tree:
 
274
                try:
 
275
                    revision_id = src_tree.get_parent_ids()[0]
 
276
                except IndexError:
 
277
                    # XXX: if this is a brand new tree, do the same as if there
 
278
                    # is no branch.
 
279
                    revision_id = ''
 
280
            else:
 
281
                # XXX: If there's no branch, what should we do?
 
282
                revision_id = ''
 
283
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
284
        self._bench_history = bench_history
 
285
        self.ui = ui.ui_factory
 
286
        self.num_tests = 0
 
287
        self.error_count = 0
 
288
        self.failure_count = 0
 
289
        self.known_failure_count = 0
 
290
        self.skip_count = 0
 
291
        self.not_applicable_count = 0
 
292
        self.unsupported = {}
 
293
        self.count = 0
 
294
        self._overall_start_time = time.time()
 
295
        self._strict = strict
 
296
        self._first_thread_leaker_id = None
 
297
        self._tests_leaking_threads_count = 0
 
298
        self._traceback_from_test = None
 
299
 
 
300
    def stopTestRun(self):
 
301
        run = self.testsRun
 
302
        actionTaken = "Ran"
 
303
        stopTime = time.time()
 
304
        timeTaken = stopTime - self.startTime
 
305
        # GZ 2010-07-19: Seems testtools has no printErrors method, and though
 
306
        #                the parent class method is similar have to duplicate
 
307
        self._show_list('ERROR', self.errors)
 
308
        self._show_list('FAIL', self.failures)
 
309
        self.stream.write(self.sep2)
 
310
        self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
 
311
                            run, run != 1 and "s" or "", timeTaken))
 
312
        if not self.wasSuccessful():
 
313
            self.stream.write("FAILED (")
 
314
            failed, errored = map(len, (self.failures, self.errors))
 
315
            if failed:
 
316
                self.stream.write("failures=%d" % failed)
 
317
            if errored:
 
318
                if failed: self.stream.write(", ")
 
319
                self.stream.write("errors=%d" % errored)
 
320
            if self.known_failure_count:
 
321
                if failed or errored: self.stream.write(", ")
 
322
                self.stream.write("known_failure_count=%d" %
 
323
                    self.known_failure_count)
 
324
            self.stream.write(")\n")
 
325
        else:
 
326
            if self.known_failure_count:
 
327
                self.stream.write("OK (known_failures=%d)\n" %
 
328
                    self.known_failure_count)
 
329
            else:
 
330
                self.stream.write("OK\n")
 
331
        if self.skip_count > 0:
 
332
            skipped = self.skip_count
 
333
            self.stream.write('%d test%s skipped\n' %
 
334
                                (skipped, skipped != 1 and "s" or ""))
 
335
        if self.unsupported:
 
336
            for feature, count in sorted(self.unsupported.items()):
 
337
                self.stream.write("Missing feature '%s' skipped %d tests.\n" %
 
338
                    (feature, count))
 
339
        if self._strict:
 
340
            ok = self.wasStrictlySuccessful()
 
341
        else:
 
342
            ok = self.wasSuccessful()
 
343
        if self._first_thread_leaker_id:
 
344
            self.stream.write(
 
345
                '%s is leaking threads among %d leaking tests.\n' % (
 
346
                self._first_thread_leaker_id,
 
347
                self._tests_leaking_threads_count))
 
348
            # We don't report the main thread as an active one.
 
349
            self.stream.write(
 
350
                '%d non-main threads were left active in the end.\n'
 
351
                % (len(self._active_threads) - 1))
 
352
 
 
353
    def getDescription(self, test):
 
354
        return test.id()
 
355
 
 
356
    def _extractBenchmarkTime(self, testCase, details=None):
 
357
        """Add a benchmark time for the current test case."""
 
358
        if details and 'benchtime' in details:
 
359
            return float(''.join(details['benchtime'].iter_bytes()))
 
360
        return getattr(testCase, "_benchtime", None)
 
361
 
 
362
    def _elapsedTestTimeString(self):
 
363
        """Return a time string for the overall time the current test has taken."""
 
364
        return self._formatTime(self._delta_to_float(
 
365
            self._now() - self._start_datetime))
 
366
 
 
367
    def _testTimeString(self, testCase):
 
368
        benchmark_time = self._extractBenchmarkTime(testCase)
 
369
        if benchmark_time is not None:
 
370
            return self._formatTime(benchmark_time) + "*"
 
371
        else:
 
372
            return self._elapsedTestTimeString()
 
373
 
 
374
    def _formatTime(self, seconds):
 
375
        """Format seconds as milliseconds with leading spaces."""
 
376
        # some benchmarks can take thousands of seconds to run, so we need 8
 
377
        # places
 
378
        return "%8dms" % (1000 * seconds)
 
379
 
 
380
    def _shortened_test_description(self, test):
 
381
        what = test.id()
 
382
        what = re.sub(r'^bzrlib\.tests\.', '', what)
 
383
        return what
 
384
 
 
385
    # GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
 
386
    #                multiple times in a row, because the handler is added for
 
387
    #                each test but the container list is shared between cases.
 
388
    #                See lp:498869 lp:625574 and lp:637725 for background.
 
389
    def _record_traceback_from_test(self, exc_info):
 
390
        """Store the traceback from passed exc_info tuple till"""
 
391
        self._traceback_from_test = exc_info[2]
 
392
 
 
393
    def startTest(self, test):
 
394
        super(ExtendedTestResult, self).startTest(test)
 
395
        if self.count == 0:
 
396
            self.startTests()
 
397
        self.count += 1
 
398
        self.report_test_start(test)
 
399
        test.number = self.count
 
400
        self._recordTestStartTime()
 
401
        # Make testtools cases give us the real traceback on failure
 
402
        addOnException = getattr(test, "addOnException", None)
 
403
        if addOnException is not None:
 
404
            addOnException(self._record_traceback_from_test)
 
405
        # Only check for thread leaks on bzrlib derived test cases
 
406
        if isinstance(test, TestCase):
 
407
            test.addCleanup(self._check_leaked_threads, test)
 
408
 
 
409
    def stopTest(self, test):
 
410
        super(ExtendedTestResult, self).stopTest(test)
 
411
        # Manually break cycles, means touching various private things but hey
 
412
        getDetails = getattr(test, "getDetails", None)
 
413
        if getDetails is not None:
 
414
            getDetails().clear()
 
415
        _clear__type_equality_funcs(test)
 
416
        self._traceback_from_test = None
 
417
 
 
418
    def startTests(self):
 
419
        self.report_tests_starting()
 
420
        self._active_threads = threading.enumerate()
 
421
 
 
422
    def _check_leaked_threads(self, test):
 
423
        """See if any threads have leaked since last call
 
424
 
 
425
        A sample of live threads is stored in the _active_threads attribute,
 
426
        when this method runs it compares the current live threads and any not
 
427
        in the previous sample are treated as having leaked.
 
428
        """
 
429
        now_active_threads = set(threading.enumerate())
 
430
        threads_leaked = now_active_threads.difference(self._active_threads)
 
431
        if threads_leaked:
 
432
            self._report_thread_leak(test, threads_leaked, now_active_threads)
 
433
            self._tests_leaking_threads_count += 1
 
434
            if self._first_thread_leaker_id is None:
 
435
                self._first_thread_leaker_id = test.id()
 
436
            self._active_threads = now_active_threads
 
437
 
 
438
    def _recordTestStartTime(self):
 
439
        """Record that a test has started."""
 
440
        self._start_datetime = self._now()
 
441
 
 
442
    def addError(self, test, err):
 
443
        """Tell result that test finished with an error.
 
444
 
 
445
        Called from the TestCase run() method when the test
 
446
        fails with an unexpected error.
 
447
        """
 
448
        self._post_mortem(self._traceback_from_test)
 
449
        super(ExtendedTestResult, self).addError(test, err)
 
450
        self.error_count += 1
 
451
        self.report_error(test, err)
 
452
        if self.stop_early:
 
453
            self.stop()
 
454
 
 
455
    def addFailure(self, test, err):
 
456
        """Tell result that test failed.
 
457
 
 
458
        Called from the TestCase run() method when the test
 
459
        fails because e.g. an assert() method failed.
 
460
        """
 
461
        self._post_mortem(self._traceback_from_test)
 
462
        super(ExtendedTestResult, self).addFailure(test, err)
 
463
        self.failure_count += 1
 
464
        self.report_failure(test, err)
 
465
        if self.stop_early:
 
466
            self.stop()
 
467
 
 
468
    def addSuccess(self, test, details=None):
 
469
        """Tell result that test completed successfully.
 
470
 
 
471
        Called from the TestCase run()
 
472
        """
 
473
        if self._bench_history is not None:
 
474
            benchmark_time = self._extractBenchmarkTime(test, details)
 
475
            if benchmark_time is not None:
 
476
                self._bench_history.write("%s %s\n" % (
 
477
                    self._formatTime(benchmark_time),
 
478
                    test.id()))
 
479
        self.report_success(test)
 
480
        super(ExtendedTestResult, self).addSuccess(test)
 
481
        test._log_contents = ''
 
482
 
 
483
    def addExpectedFailure(self, test, err):
 
484
        self.known_failure_count += 1
 
485
        self.report_known_failure(test, err)
 
486
 
 
487
    def addUnexpectedSuccess(self, test, details=None):
 
488
        """Tell result the test unexpectedly passed, counting as a failure
 
489
 
 
490
        When the minimum version of testtools required becomes 0.9.8 this
 
491
        can be updated to use the new handling there.
 
492
        """
 
493
        super(ExtendedTestResult, self).addFailure(test, details=details)
 
494
        self.failure_count += 1
 
495
        self.report_unexpected_success(test,
 
496
            "".join(details["reason"].iter_text()))
 
497
        if self.stop_early:
 
498
            self.stop()
 
499
 
 
500
    def addNotSupported(self, test, feature):
 
501
        """The test will not be run because of a missing feature.
 
502
        """
 
503
        # this can be called in two different ways: it may be that the
 
504
        # test started running, and then raised (through requireFeature)
 
505
        # UnavailableFeature.  Alternatively this method can be called
 
506
        # while probing for features before running the test code proper; in
 
507
        # that case we will see startTest and stopTest, but the test will
 
508
        # never actually run.
 
509
        self.unsupported.setdefault(str(feature), 0)
 
510
        self.unsupported[str(feature)] += 1
 
511
        self.report_unsupported(test, feature)
 
512
 
 
513
    def addSkip(self, test, reason):
 
514
        """A test has not run for 'reason'."""
 
515
        self.skip_count += 1
 
516
        self.report_skip(test, reason)
 
517
 
 
518
    def addNotApplicable(self, test, reason):
 
519
        self.not_applicable_count += 1
 
520
        self.report_not_applicable(test, reason)
 
521
 
 
522
    def _count_stored_tests(self):
 
523
        """Count of tests instances kept alive due to not succeeding"""
 
524
        return self.error_count + self.failure_count + self.known_failure_count
 
525
 
 
526
    def _post_mortem(self, tb=None):
 
527
        """Start a PDB post mortem session."""
 
528
        if os.environ.get('BZR_TEST_PDB', None):
 
529
            import pdb
 
530
            pdb.post_mortem(tb)
 
531
 
 
532
    def progress(self, offset, whence):
 
533
        """The test is adjusting the count of tests to run."""
 
534
        if whence == SUBUNIT_SEEK_SET:
 
535
            self.num_tests = offset
 
536
        elif whence == SUBUNIT_SEEK_CUR:
 
537
            self.num_tests += offset
 
538
        else:
 
539
            raise errors.BzrError("Unknown whence %r" % whence)
 
540
 
 
541
    def report_tests_starting(self):
 
542
        """Display information before the test run begins"""
 
543
        if getattr(sys, 'frozen', None) is None:
 
544
            bzr_path = osutils.realpath(sys.argv[0])
 
545
        else:
 
546
            bzr_path = sys.executable
 
547
        self.stream.write(
 
548
            'bzr selftest: %s\n' % (bzr_path,))
 
549
        self.stream.write(
 
550
            '   %s\n' % (
 
551
                    bzrlib.__path__[0],))
 
552
        self.stream.write(
 
553
            '   bzr-%s python-%s %s\n' % (
 
554
                    bzrlib.version_string,
 
555
                    bzrlib._format_version_tuple(sys.version_info),
 
556
                    platform.platform(aliased=1),
 
557
                    ))
 
558
        self.stream.write('\n')
 
559
 
 
560
    def report_test_start(self, test):
 
561
        """Display information on the test just about to be run"""
 
562
 
 
563
    def _report_thread_leak(self, test, leaked_threads, active_threads):
 
564
        """Display information on a test that leaked one or more threads"""
 
565
        # GZ 2010-09-09: A leak summary reported separately from the general
 
566
        #                thread debugging would be nice. Tests under subunit
 
567
        #                need something not using stream, perhaps adding a
 
568
        #                testtools details object would be fitting.
 
569
        if 'threads' in selftest_debug_flags:
 
570
            self.stream.write('%s is leaking, active is now %d\n' %
 
571
                (test.id(), len(active_threads)))
 
572
 
 
573
    def startTestRun(self):
 
574
        self.startTime = time.time()
 
575
 
 
576
    def report_success(self, test):
 
577
        pass
 
578
 
 
579
    def wasStrictlySuccessful(self):
 
580
        if self.unsupported or self.known_failure_count:
 
581
            return False
 
582
        return self.wasSuccessful()
 
583
 
 
584
 
 
585
class TextTestResult(ExtendedTestResult):
 
586
    """Displays progress and results of tests in text form"""
 
587
 
 
588
    def __init__(self, stream, descriptions, verbosity,
 
589
                 bench_history=None,
 
590
                 pb=None,
 
591
                 strict=None,
 
592
                 ):
 
593
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
594
            bench_history, strict)
 
595
        # We no longer pass them around, but just rely on the UIFactory stack
 
596
        # for state
 
597
        if pb is not None:
 
598
            warnings.warn("Passing pb to TextTestResult is deprecated")
 
599
        self.pb = self.ui.nested_progress_bar()
 
600
        self.pb.show_pct = False
 
601
        self.pb.show_spinner = False
 
602
        self.pb.show_eta = False,
 
603
        self.pb.show_count = False
 
604
        self.pb.show_bar = False
 
605
        self.pb.update_latency = 0
 
606
        self.pb.show_transport_activity = False
 
607
 
 
608
    def stopTestRun(self):
 
609
        # called when the tests that are going to run have run
 
610
        self.pb.clear()
 
611
        self.pb.finished()
 
612
        super(TextTestResult, self).stopTestRun()
 
613
 
 
614
    def report_tests_starting(self):
 
615
        super(TextTestResult, self).report_tests_starting()
 
616
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
 
617
 
 
618
    def _progress_prefix_text(self):
 
619
        # the longer this text, the less space we have to show the test
 
620
        # name...
 
621
        a = '[%d' % self.count              # total that have been run
 
622
        # tests skipped as known not to be relevant are not important enough
 
623
        # to show here
 
624
        ## if self.skip_count:
 
625
        ##     a += ', %d skip' % self.skip_count
 
626
        ## if self.known_failure_count:
 
627
        ##     a += '+%dX' % self.known_failure_count
 
628
        if self.num_tests:
 
629
            a +='/%d' % self.num_tests
 
630
        a += ' in '
 
631
        runtime = time.time() - self._overall_start_time
 
632
        if runtime >= 60:
 
633
            a += '%dm%ds' % (runtime / 60, runtime % 60)
 
634
        else:
 
635
            a += '%ds' % runtime
 
636
        total_fail_count = self.error_count + self.failure_count
 
637
        if total_fail_count:
 
638
            a += ', %d failed' % total_fail_count
 
639
        # if self.unsupported:
 
640
        #     a += ', %d missing' % len(self.unsupported)
 
641
        a += ']'
 
642
        return a
 
643
 
 
644
    def report_test_start(self, test):
 
645
        self.pb.update(
 
646
                self._progress_prefix_text()
 
647
                + ' '
 
648
                + self._shortened_test_description(test))
 
649
 
 
650
    def _test_description(self, test):
 
651
        return self._shortened_test_description(test)
 
652
 
 
653
    def report_error(self, test, err):
 
654
        self.stream.write('ERROR: %s\n    %s\n' % (
 
655
            self._test_description(test),
 
656
            err[1],
 
657
            ))
 
658
 
 
659
    def report_failure(self, test, err):
 
660
        self.stream.write('FAIL: %s\n    %s\n' % (
 
661
            self._test_description(test),
 
662
            err[1],
 
663
            ))
 
664
 
 
665
    def report_known_failure(self, test, err):
 
666
        pass
 
667
 
 
668
    def report_unexpected_success(self, test, reason):
 
669
        self.stream.write('FAIL: %s\n    %s: %s\n' % (
 
670
            self._test_description(test),
 
671
            "Unexpected success. Should have failed",
 
672
            reason,
 
673
            ))
 
674
 
 
675
    def report_skip(self, test, reason):
 
676
        pass
 
677
 
 
678
    def report_not_applicable(self, test, reason):
 
679
        pass
 
680
 
 
681
    def report_unsupported(self, test, feature):
 
682
        """test cannot be run because feature is missing."""
 
683
 
 
684
 
 
685
class VerboseTestResult(ExtendedTestResult):
 
686
    """Produce long output, with one line per test run plus times"""
 
687
 
 
688
    def _ellipsize_to_right(self, a_string, final_width):
 
689
        """Truncate and pad a string, keeping the right hand side"""
 
690
        if len(a_string) > final_width:
 
691
            result = '...' + a_string[3-final_width:]
 
692
        else:
 
693
            result = a_string
 
694
        return result.ljust(final_width)
 
695
 
 
696
    def report_tests_starting(self):
 
697
        self.stream.write('running %d tests...\n' % self.num_tests)
 
698
        super(VerboseTestResult, self).report_tests_starting()
 
699
 
 
700
    def report_test_start(self, test):
 
701
        name = self._shortened_test_description(test)
 
702
        width = osutils.terminal_width()
 
703
        if width is not None:
 
704
            # width needs space for 6 char status, plus 1 for slash, plus an
 
705
            # 11-char time string, plus a trailing blank
 
706
            # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
 
707
            # space
 
708
            self.stream.write(self._ellipsize_to_right(name, width-18))
 
709
        else:
 
710
            self.stream.write(name)
 
711
        self.stream.flush()
 
712
 
 
713
    def _error_summary(self, err):
 
714
        indent = ' ' * 4
 
715
        return '%s%s' % (indent, err[1])
 
716
 
 
717
    def report_error(self, test, err):
 
718
        self.stream.write('ERROR %s\n%s\n'
 
719
                % (self._testTimeString(test),
 
720
                   self._error_summary(err)))
 
721
 
 
722
    def report_failure(self, test, err):
 
723
        self.stream.write(' FAIL %s\n%s\n'
 
724
                % (self._testTimeString(test),
 
725
                   self._error_summary(err)))
 
726
 
 
727
    def report_known_failure(self, test, err):
 
728
        self.stream.write('XFAIL %s\n%s\n'
 
729
                % (self._testTimeString(test),
 
730
                   self._error_summary(err)))
 
731
 
 
732
    def report_unexpected_success(self, test, reason):
 
733
        self.stream.write(' FAIL %s\n%s: %s\n'
 
734
                % (self._testTimeString(test),
 
735
                   "Unexpected success. Should have failed",
 
736
                   reason))
 
737
 
 
738
    def report_success(self, test):
 
739
        self.stream.write('   OK %s\n' % self._testTimeString(test))
 
740
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
741
            self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
 
742
            stats.pprint(file=self.stream)
 
743
        # flush the stream so that we get smooth output. This verbose mode is
 
744
        # used to show the output in PQM.
 
745
        self.stream.flush()
 
746
 
 
747
    def report_skip(self, test, reason):
 
748
        self.stream.write(' SKIP %s\n%s\n'
 
749
                % (self._testTimeString(test), reason))
 
750
 
 
751
    def report_not_applicable(self, test, reason):
 
752
        self.stream.write('  N/A %s\n    %s\n'
 
753
                % (self._testTimeString(test), reason))
 
754
 
 
755
    def report_unsupported(self, test, feature):
 
756
        """test cannot be run because feature is missing."""
 
757
        self.stream.write("NODEP %s\n    The feature '%s' is not available.\n"
 
758
                %(self._testTimeString(test), feature))
 
759
 
 
760
 
 
761
class TextTestRunner(object):
 
762
    stop_on_failure = False
 
763
 
 
764
    def __init__(self,
 
765
                 stream=sys.stderr,
 
766
                 descriptions=0,
 
767
                 verbosity=1,
 
768
                 bench_history=None,
 
769
                 strict=False,
 
770
                 result_decorators=None,
 
771
                 ):
 
772
        """Create a TextTestRunner.
 
773
 
 
774
        :param result_decorators: An optional list of decorators to apply
 
775
            to the result object being used by the runner. Decorators are
 
776
            applied left to right - the first element in the list is the 
 
777
            innermost decorator.
 
778
        """
 
779
        # stream may know claim to know to write unicode strings, but in older
 
780
        # pythons this goes sufficiently wrong that it is a bad idea. (
 
781
        # specifically a built in file with encoding 'UTF-8' will still try
 
782
        # to encode using ascii.
 
783
        new_encoding = osutils.get_terminal_encoding()
 
784
        codec = codecs.lookup(new_encoding)
 
785
        if type(codec) is tuple:
 
786
            # Python 2.4
 
787
            encode = codec[0]
 
788
        else:
 
789
            encode = codec.encode
 
790
        # GZ 2010-09-08: Really we don't want to be writing arbitrary bytes,
 
791
        #                so should swap to the plain codecs.StreamWriter
 
792
        stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream,
 
793
            "backslashreplace")
 
794
        stream.encoding = new_encoding
 
795
        self.stream = stream
 
796
        self.descriptions = descriptions
 
797
        self.verbosity = verbosity
 
798
        self._bench_history = bench_history
 
799
        self._strict = strict
 
800
        self._result_decorators = result_decorators or []
 
801
 
 
802
    def run(self, test):
 
803
        "Run the given test case or test suite."
 
804
        if self.verbosity == 1:
 
805
            result_class = TextTestResult
 
806
        elif self.verbosity >= 2:
 
807
            result_class = VerboseTestResult
 
808
        original_result = result_class(self.stream,
 
809
                              self.descriptions,
 
810
                              self.verbosity,
 
811
                              bench_history=self._bench_history,
 
812
                              strict=self._strict,
 
813
                              )
 
814
        # Signal to result objects that look at stop early policy to stop,
 
815
        original_result.stop_early = self.stop_on_failure
 
816
        result = original_result
 
817
        for decorator in self._result_decorators:
 
818
            result = decorator(result)
 
819
            result.stop_early = self.stop_on_failure
 
820
        result.startTestRun()
 
821
        try:
 
822
            test.run(result)
 
823
        finally:
 
824
            result.stopTestRun()
 
825
        # higher level code uses our extended protocol to determine
 
826
        # what exit code to give.
 
827
        return original_result
 
828
 
 
829
 
 
830
def iter_suite_tests(suite):
 
831
    """Return all tests in a suite, recursing through nested suites"""
 
832
    if isinstance(suite, unittest.TestCase):
 
833
        yield suite
 
834
    elif isinstance(suite, unittest.TestSuite):
 
835
        for item in suite:
 
836
            for r in iter_suite_tests(item):
 
837
                yield r
 
838
    else:
 
839
        raise Exception('unknown type %r for object %r'
 
840
                        % (type(suite), suite))
 
841
 
 
842
 
 
843
TestSkipped = testtools.testcase.TestSkipped
 
844
 
 
845
 
 
846
class TestNotApplicable(TestSkipped):
 
847
    """A test is not applicable to the situation where it was run.
 
848
 
 
849
    This is only normally raised by parameterized tests, if they find that
 
850
    the instance they're constructed upon does not support one aspect
 
851
    of its interface.
 
852
    """
 
853
 
 
854
 
 
855
# traceback._some_str fails to format exceptions that have the default
 
856
# __str__ which does an implicit ascii conversion. However, repr() on those
 
857
# objects works, for all that its not quite what the doctor may have ordered.
 
858
def _clever_some_str(value):
 
859
    try:
 
860
        return str(value)
 
861
    except:
 
862
        try:
 
863
            return repr(value).replace('\\n', '\n')
 
864
        except:
 
865
            return '<unprintable %s object>' % type(value).__name__
 
866
 
 
867
traceback._some_str = _clever_some_str
 
868
 
 
869
 
 
870
# deprecated - use self.knownFailure(), or self.expectFailure.
 
871
KnownFailure = testtools.testcase._ExpectedFailure
 
872
 
 
873
 
 
874
class UnavailableFeature(Exception):
 
875
    """A feature required for this test was not available.
 
876
 
 
877
    This can be considered a specialised form of SkippedTest.
 
878
 
 
879
    The feature should be used to construct the exception.
 
880
    """
 
881
 
 
882
 
 
883
class StringIOWrapper(object):
 
884
    """A wrapper around cStringIO which just adds an encoding attribute.
 
885
 
 
886
    Internally we can check sys.stdout to see what the output encoding
 
887
    should be. However, cStringIO has no encoding attribute that we can
 
888
    set. So we wrap it instead.
 
889
    """
 
890
    encoding='ascii'
 
891
    _cstring = None
 
892
 
 
893
    def __init__(self, s=None):
 
894
        if s is not None:
 
895
            self.__dict__['_cstring'] = StringIO(s)
 
896
        else:
 
897
            self.__dict__['_cstring'] = StringIO()
 
898
 
 
899
    def __getattr__(self, name, getattr=getattr):
 
900
        return getattr(self.__dict__['_cstring'], name)
 
901
 
 
902
    def __setattr__(self, name, val):
 
903
        if name == 'encoding':
 
904
            self.__dict__['encoding'] = val
 
905
        else:
 
906
            return setattr(self._cstring, name, val)
 
907
 
 
908
 
 
909
class TestUIFactory(TextUIFactory):
 
910
    """A UI Factory for testing.
 
911
 
 
912
    Hide the progress bar but emit note()s.
 
913
    Redirect stdin.
 
914
    Allows get_password to be tested without real tty attached.
 
915
 
 
916
    See also CannedInputUIFactory which lets you provide programmatic input in
 
917
    a structured way.
 
918
    """
 
919
    # TODO: Capture progress events at the model level and allow them to be
 
920
    # observed by tests that care.
 
921
    #
 
922
    # XXX: Should probably unify more with CannedInputUIFactory or a
 
923
    # particular configuration of TextUIFactory, or otherwise have a clearer
 
924
    # idea of how they're supposed to be different.
 
925
    # See https://bugs.launchpad.net/bzr/+bug/408213
 
926
 
 
927
    def __init__(self, stdout=None, stderr=None, stdin=None):
 
928
        if stdin is not None:
 
929
            # We use a StringIOWrapper to be able to test various
 
930
            # encodings, but the user is still responsible to
 
931
            # encode the string and to set the encoding attribute
 
932
            # of StringIOWrapper.
 
933
            stdin = StringIOWrapper(stdin)
 
934
        super(TestUIFactory, self).__init__(stdin, stdout, stderr)
 
935
 
 
936
    def get_non_echoed_password(self):
 
937
        """Get password from stdin without trying to handle the echo mode"""
 
938
        password = self.stdin.readline()
 
939
        if not password:
 
940
            raise EOFError
 
941
        if password[-1] == '\n':
 
942
            password = password[:-1]
 
943
        return password
 
944
 
 
945
    def make_progress_view(self):
 
946
        return NullProgressView()
 
947
 
 
948
 
 
949
def isolated_doctest_setUp(test):
 
950
    override_os_environ(test)
 
951
 
 
952
 
 
953
def isolated_doctest_tearDown(test):
 
954
    restore_os_environ(test)
 
955
 
 
956
 
 
957
def IsolatedDocTestSuite(*args, **kwargs):
 
958
    """Overrides doctest.DocTestSuite to handle isolation.
 
959
 
 
960
    The method is really a factory and users are expected to use it as such.
 
961
    """
 
962
 
 
963
    kwargs['setUp'] = isolated_doctest_setUp
 
964
    kwargs['tearDown'] = isolated_doctest_tearDown
 
965
    return doctest.DocTestSuite(*args, **kwargs)
 
966
 
 
967
 
 
968
class TestCase(testtools.TestCase):
 
969
    """Base class for bzr unit tests.
 
970
 
 
971
    Tests that need access to disk resources should subclass
 
972
    TestCaseInTempDir not TestCase.
 
973
 
 
974
    Error and debug log messages are redirected from their usual
 
975
    location into a temporary file, the contents of which can be
 
976
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
977
    so that it can also capture file IO.  When the test completes this file
 
978
    is read into memory and removed from disk.
 
979
 
 
980
    There are also convenience functions to invoke bzr's command-line
 
981
    routine, and to build and check bzr trees.
 
982
 
 
983
    In addition to the usual method of overriding tearDown(), this class also
 
984
    allows subclasses to register cleanup functions via addCleanup, which are
 
985
    run in order as the object is torn down.  It's less likely this will be
 
986
    accidentally overlooked.
 
987
    """
 
988
 
 
989
    _log_file = None
 
990
    # record lsprof data when performing benchmark calls.
 
991
    _gather_lsprof_in_benchmarks = False
 
992
 
 
993
    def __init__(self, methodName='testMethod'):
 
994
        super(TestCase, self).__init__(methodName)
 
995
        self._directory_isolation = True
 
996
        self.exception_handlers.insert(0,
 
997
            (UnavailableFeature, self._do_unsupported_or_skip))
 
998
        self.exception_handlers.insert(0,
 
999
            (TestNotApplicable, self._do_not_applicable))
 
1000
 
 
1001
    def setUp(self):
 
1002
        super(TestCase, self).setUp()
 
1003
 
 
1004
        timeout = config.GlobalStack().get('selftest.timeout')
 
1005
        if timeout:
 
1006
            timeout_fixture = fixtures.TimeoutFixture(timeout)
 
1007
            timeout_fixture.setUp()
 
1008
            self.addCleanup(timeout_fixture.cleanUp)
 
1009
 
 
1010
        for feature in getattr(self, '_test_needs_features', []):
 
1011
            self.requireFeature(feature)
 
1012
        self._cleanEnvironment()
 
1013
 
 
1014
        if bzrlib.global_state is not None:
 
1015
            self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
 
1016
                              config.CommandLineStore())
 
1017
 
 
1018
        self._silenceUI()
 
1019
        self._startLogFile()
 
1020
        self._benchcalls = []
 
1021
        self._benchtime = None
 
1022
        self._clear_hooks()
 
1023
        self._track_transports()
 
1024
        self._track_locks()
 
1025
        self._clear_debug_flags()
 
1026
        # Isolate global verbosity level, to make sure it's reproducible
 
1027
        # between tests.  We should get rid of this altogether: bug 656694. --
 
1028
        # mbp 20101008
 
1029
        self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
 
1030
        self._log_files = set()
 
1031
        # Each key in the ``_counters`` dict holds a value for a different
 
1032
        # counter. When the test ends, addDetail() should be used to output the
 
1033
        # counter values. This happens in install_counter_hook().
 
1034
        self._counters = {}
 
1035
        if 'config_stats' in selftest_debug_flags:
 
1036
            self._install_config_stats_hooks()
 
1037
        # Do not use i18n for tests (unless the test reverses this)
 
1038
        i18n.disable_i18n()
 
1039
 
 
1040
    def debug(self):
 
1041
        # debug a frame up.
 
1042
        import pdb
 
1043
        # The sys preserved stdin/stdout should allow blackbox tests debugging
 
1044
        pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
 
1045
                ).set_trace(sys._getframe().f_back)
 
1046
 
 
1047
    def discardDetail(self, name):
 
1048
        """Extend the addDetail, getDetails api so we can remove a detail.
 
1049
 
 
1050
        eg. bzr always adds the 'log' detail at startup, but we don't want to
 
1051
        include it for skipped, xfail, etc tests.
 
1052
 
 
1053
        It is safe to call this for a detail that doesn't exist, in case this
 
1054
        gets called multiple times.
 
1055
        """
 
1056
        # We cheat. details is stored in __details which means we shouldn't
 
1057
        # touch it. but getDetails() returns the dict directly, so we can
 
1058
        # mutate it.
 
1059
        details = self.getDetails()
 
1060
        if name in details:
 
1061
            del details[name]
 
1062
 
 
1063
    def install_counter_hook(self, hooks, name, counter_name=None):
 
1064
        """Install a counting hook.
 
1065
 
 
1066
        Any hook can be counted as long as it doesn't need to return a value.
 
1067
 
 
1068
        :param hooks: Where the hook should be installed.
 
1069
 
 
1070
        :param name: The hook name that will be counted.
 
1071
 
 
1072
        :param counter_name: The counter identifier in ``_counters``, defaults
 
1073
            to ``name``.
 
1074
        """
 
1075
        _counters = self._counters # Avoid closing over self
 
1076
        if counter_name is None:
 
1077
            counter_name = name
 
1078
        if _counters.has_key(counter_name):
 
1079
            raise AssertionError('%s is already used as a counter name'
 
1080
                                  % (counter_name,))
 
1081
        _counters[counter_name] = 0
 
1082
        self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
 
1083
            lambda: ['%d' % (_counters[counter_name],)]))
 
1084
        def increment_counter(*args, **kwargs):
 
1085
            _counters[counter_name] += 1
 
1086
        label = 'count %s calls' % (counter_name,)
 
1087
        hooks.install_named_hook(name, increment_counter, label)
 
1088
        self.addCleanup(hooks.uninstall_named_hook, name, label)
 
1089
 
 
1090
    def _install_config_stats_hooks(self):
 
1091
        """Install config hooks to count hook calls.
 
1092
 
 
1093
        """
 
1094
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1095
            self.install_counter_hook(config.ConfigHooks, hook_name,
 
1096
                                       'config.%s' % (hook_name,))
 
1097
 
 
1098
        # The OldConfigHooks are private and need special handling to protect
 
1099
        # against recursive tests (tests that run other tests), so we just do
 
1100
        # manually what registering them into _builtin_known_hooks will provide
 
1101
        # us.
 
1102
        self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
 
1103
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1104
            self.install_counter_hook(config.OldConfigHooks, hook_name,
 
1105
                                      'old_config.%s' % (hook_name,))
 
1106
 
 
1107
    def _clear_debug_flags(self):
 
1108
        """Prevent externally set debug flags affecting tests.
 
1109
 
 
1110
        Tests that want to use debug flags can just set them in the
 
1111
        debug_flags set during setup/teardown.
 
1112
        """
 
1113
        # Start with a copy of the current debug flags we can safely modify.
 
1114
        self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
 
1115
        if 'allow_debug' not in selftest_debug_flags:
 
1116
            debug.debug_flags.clear()
 
1117
        if 'disable_lock_checks' not in selftest_debug_flags:
 
1118
            debug.debug_flags.add('strict_locks')
 
1119
 
 
1120
    def _clear_hooks(self):
 
1121
        # prevent hooks affecting tests
 
1122
        known_hooks = hooks.known_hooks
 
1123
        self._preserved_hooks = {}
 
1124
        for key, (parent, name) in known_hooks.iter_parent_objects():
 
1125
            current_hooks = getattr(parent, name)
 
1126
            self._preserved_hooks[parent] = (name, current_hooks)
 
1127
        self._preserved_lazy_hooks = hooks._lazy_hooks
 
1128
        hooks._lazy_hooks = {}
 
1129
        self.addCleanup(self._restoreHooks)
 
1130
        for key, (parent, name) in known_hooks.iter_parent_objects():
 
1131
            factory = known_hooks.get(key)
 
1132
            setattr(parent, name, factory())
 
1133
        # this hook should always be installed
 
1134
        request._install_hook()
 
1135
 
 
1136
    def disable_directory_isolation(self):
 
1137
        """Turn off directory isolation checks."""
 
1138
        self._directory_isolation = False
 
1139
 
 
1140
    def enable_directory_isolation(self):
 
1141
        """Enable directory isolation checks."""
 
1142
        self._directory_isolation = True
 
1143
 
 
1144
    def _silenceUI(self):
 
1145
        """Turn off UI for duration of test"""
 
1146
        # by default the UI is off; tests can turn it on if they want it.
 
1147
        self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
 
1148
 
 
1149
    def _check_locks(self):
 
1150
        """Check that all lock take/release actions have been paired."""
 
1151
        # We always check for mismatched locks. If a mismatch is found, we
 
1152
        # fail unless -Edisable_lock_checks is supplied to selftest, in which
 
1153
        # case we just print a warning.
 
1154
        # unhook:
 
1155
        acquired_locks = [lock for action, lock in self._lock_actions
 
1156
                          if action == 'acquired']
 
1157
        released_locks = [lock for action, lock in self._lock_actions
 
1158
                          if action == 'released']
 
1159
        broken_locks = [lock for action, lock in self._lock_actions
 
1160
                        if action == 'broken']
 
1161
        # trivially, given the tests for lock acquistion and release, if we
 
1162
        # have as many in each list, it should be ok. Some lock tests also
 
1163
        # break some locks on purpose and should be taken into account by
 
1164
        # considering that breaking a lock is just a dirty way of releasing it.
 
1165
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
 
1166
            message = (
 
1167
                'Different number of acquired and '
 
1168
                'released or broken locks.\n'
 
1169
                'acquired=%s\n'
 
1170
                'released=%s\n'
 
1171
                'broken=%s\n' %
 
1172
                (acquired_locks, released_locks, broken_locks))
 
1173
            if not self._lock_check_thorough:
 
1174
                # Rather than fail, just warn
 
1175
                print "Broken test %s: %s" % (self, message)
 
1176
                return
 
1177
            self.fail(message)
 
1178
 
 
1179
    def _track_locks(self):
 
1180
        """Track lock activity during tests."""
 
1181
        self._lock_actions = []
 
1182
        if 'disable_lock_checks' in selftest_debug_flags:
 
1183
            self._lock_check_thorough = False
 
1184
        else:
 
1185
            self._lock_check_thorough = True
 
1186
 
 
1187
        self.addCleanup(self._check_locks)
 
1188
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
 
1189
                                                self._lock_acquired, None)
 
1190
        _mod_lock.Lock.hooks.install_named_hook('lock_released',
 
1191
                                                self._lock_released, None)
 
1192
        _mod_lock.Lock.hooks.install_named_hook('lock_broken',
 
1193
                                                self._lock_broken, None)
 
1194
 
 
1195
    def _lock_acquired(self, result):
 
1196
        self._lock_actions.append(('acquired', result))
 
1197
 
 
1198
    def _lock_released(self, result):
 
1199
        self._lock_actions.append(('released', result))
 
1200
 
 
1201
    def _lock_broken(self, result):
 
1202
        self._lock_actions.append(('broken', result))
 
1203
 
 
1204
    def permit_dir(self, name):
 
1205
        """Permit a directory to be used by this test. See permit_url."""
 
1206
        name_transport = _mod_transport.get_transport_from_path(name)
 
1207
        self.permit_url(name)
 
1208
        self.permit_url(name_transport.base)
 
1209
 
 
1210
    def permit_url(self, url):
 
1211
        """Declare that url is an ok url to use in this test.
 
1212
        
 
1213
        Do this for memory transports, temporary test directory etc.
 
1214
        
 
1215
        Do not do this for the current working directory, /tmp, or any other
 
1216
        preexisting non isolated url.
 
1217
        """
 
1218
        if not url.endswith('/'):
 
1219
            url += '/'
 
1220
        self._bzr_selftest_roots.append(url)
 
1221
 
 
1222
    def permit_source_tree_branch_repo(self):
 
1223
        """Permit the source tree bzr is running from to be opened.
 
1224
 
 
1225
        Some code such as bzrlib.version attempts to read from the bzr branch
 
1226
        that bzr is executing from (if any). This method permits that directory
 
1227
        to be used in the test suite.
 
1228
        """
 
1229
        path = self.get_source_path()
 
1230
        self.record_directory_isolation()
 
1231
        try:
 
1232
            try:
 
1233
                workingtree.WorkingTree.open(path)
 
1234
            except (errors.NotBranchError, errors.NoWorkingTree):
 
1235
                raise TestSkipped('Needs a working tree of bzr sources')
 
1236
        finally:
 
1237
            self.enable_directory_isolation()
 
1238
 
 
1239
    def _preopen_isolate_transport(self, transport):
 
1240
        """Check that all transport openings are done in the test work area."""
 
1241
        while isinstance(transport, pathfilter.PathFilteringTransport):
 
1242
            # Unwrap pathfiltered transports
 
1243
            transport = transport.server.backing_transport.clone(
 
1244
                transport._filter('.'))
 
1245
        url = transport.base
 
1246
        # ReadonlySmartTCPServer_for_testing decorates the backing transport
 
1247
        # urls it is given by prepending readonly+. This is appropriate as the
 
1248
        # client shouldn't know that the server is readonly (or not readonly).
 
1249
        # We could register all servers twice, with readonly+ prepending, but
 
1250
        # that makes for a long list; this is about the same but easier to
 
1251
        # read.
 
1252
        if url.startswith('readonly+'):
 
1253
            url = url[len('readonly+'):]
 
1254
        self._preopen_isolate_url(url)
 
1255
 
 
1256
    def _preopen_isolate_url(self, url):
 
1257
        if not self._directory_isolation:
 
1258
            return
 
1259
        if self._directory_isolation == 'record':
 
1260
            self._bzr_selftest_roots.append(url)
 
1261
            return
 
1262
        # This prevents all transports, including e.g. sftp ones backed on disk
 
1263
        # from working unless they are explicitly granted permission. We then
 
1264
        # depend on the code that sets up test transports to check that they are
 
1265
        # appropriately isolated and enable their use by calling
 
1266
        # self.permit_transport()
 
1267
        if not osutils.is_inside_any(self._bzr_selftest_roots, url):
 
1268
            raise errors.BzrError("Attempt to escape test isolation: %r %r"
 
1269
                % (url, self._bzr_selftest_roots))
 
1270
 
 
1271
    def record_directory_isolation(self):
 
1272
        """Gather accessed directories to permit later access.
 
1273
        
 
1274
        This is used for tests that access the branch bzr is running from.
 
1275
        """
 
1276
        self._directory_isolation = "record"
 
1277
 
 
1278
    def start_server(self, transport_server, backing_server=None):
 
1279
        """Start transport_server for this test.
 
1280
 
 
1281
        This starts the server, registers a cleanup for it and permits the
 
1282
        server's urls to be used.
 
1283
        """
 
1284
        if backing_server is None:
 
1285
            transport_server.start_server()
 
1286
        else:
 
1287
            transport_server.start_server(backing_server)
 
1288
        self.addCleanup(transport_server.stop_server)
 
1289
        # Obtain a real transport because if the server supplies a password, it
 
1290
        # will be hidden from the base on the client side.
 
1291
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
 
1292
        # Some transport servers effectively chroot the backing transport;
 
1293
        # others like SFTPServer don't - users of the transport can walk up the
 
1294
        # transport to read the entire backing transport. This wouldn't matter
 
1295
        # except that the workdir tests are given - and that they expect the
 
1296
        # server's url to point at - is one directory under the safety net. So
 
1297
        # Branch operations into the transport will attempt to walk up one
 
1298
        # directory. Chrooting all servers would avoid this but also mean that
 
1299
        # we wouldn't be testing directly against non-root urls. Alternatively
 
1300
        # getting the test framework to start the server with a backing server
 
1301
        # at the actual safety net directory would work too, but this then
 
1302
        # means that the self.get_url/self.get_transport methods would need
 
1303
        # to transform all their results. On balance its cleaner to handle it
 
1304
        # here, and permit a higher url when we have one of these transports.
 
1305
        if t.base.endswith('/work/'):
 
1306
            # we have safety net/test root/work
 
1307
            t = t.clone('../..')
 
1308
        elif isinstance(transport_server,
 
1309
                        test_server.SmartTCPServer_for_testing):
 
1310
            # The smart server adds a path similar to work, which is traversed
 
1311
            # up from by the client. But the server is chrooted - the actual
 
1312
            # backing transport is not escaped from, and VFS requests to the
 
1313
            # root will error (because they try to escape the chroot).
 
1314
            t2 = t.clone('..')
 
1315
            while t2.base != t.base:
 
1316
                t = t2
 
1317
                t2 = t.clone('..')
 
1318
        self.permit_url(t.base)
 
1319
 
 
1320
    def _track_transports(self):
 
1321
        """Install checks for transport usage."""
 
1322
        # TestCase has no safe place it can write to.
 
1323
        self._bzr_selftest_roots = []
 
1324
        # Currently the easiest way to be sure that nothing is going on is to
 
1325
        # hook into bzr dir opening. This leaves a small window of error for
 
1326
        # transport tests, but they are well known, and we can improve on this
 
1327
        # step.
 
1328
        controldir.ControlDir.hooks.install_named_hook("pre_open",
 
1329
            self._preopen_isolate_transport, "Check bzr directories are safe.")
 
1330
 
 
1331
    def _ndiff_strings(self, a, b):
 
1332
        """Return ndiff between two strings containing lines.
 
1333
 
 
1334
        A trailing newline is added if missing to make the strings
 
1335
        print properly."""
 
1336
        if b and b[-1] != '\n':
 
1337
            b += '\n'
 
1338
        if a and a[-1] != '\n':
 
1339
            a += '\n'
 
1340
        difflines = difflib.ndiff(a.splitlines(True),
 
1341
                                  b.splitlines(True),
 
1342
                                  linejunk=lambda x: False,
 
1343
                                  charjunk=lambda x: False)
 
1344
        return ''.join(difflines)
 
1345
 
 
1346
    def assertEqual(self, a, b, message=''):
 
1347
        try:
 
1348
            if a == b:
 
1349
                return
 
1350
        except UnicodeError, e:
 
1351
            # If we can't compare without getting a UnicodeError, then
 
1352
            # obviously they are different
 
1353
            trace.mutter('UnicodeError: %s', e)
 
1354
        if message:
 
1355
            message += '\n'
 
1356
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
1357
            % (message,
 
1358
               pprint.pformat(a), pprint.pformat(b)))
 
1359
 
 
1360
    assertEquals = assertEqual
 
1361
 
 
1362
    def assertEqualDiff(self, a, b, message=None):
 
1363
        """Assert two texts are equal, if not raise an exception.
 
1364
 
 
1365
        This is intended for use with multi-line strings where it can
 
1366
        be hard to find the differences by eye.
 
1367
        """
 
1368
        # TODO: perhaps override assertEquals to call this for strings?
 
1369
        if a == b:
 
1370
            return
 
1371
        if message is None:
 
1372
            message = "texts not equal:\n"
 
1373
        if a + '\n' == b:
 
1374
            message = 'first string is missing a final newline.\n'
 
1375
        if a == b + '\n':
 
1376
            message = 'second string is missing a final newline.\n'
 
1377
        raise AssertionError(message +
 
1378
                             self._ndiff_strings(a, b))
 
1379
 
 
1380
    def assertEqualMode(self, mode, mode_test):
 
1381
        self.assertEqual(mode, mode_test,
 
1382
                         'mode mismatch %o != %o' % (mode, mode_test))
 
1383
 
 
1384
    def assertEqualStat(self, expected, actual):
 
1385
        """assert that expected and actual are the same stat result.
 
1386
 
 
1387
        :param expected: A stat result.
 
1388
        :param actual: A stat result.
 
1389
        :raises AssertionError: If the expected and actual stat values differ
 
1390
            other than by atime.
 
1391
        """
 
1392
        self.assertEqual(expected.st_size, actual.st_size,
 
1393
                         'st_size did not match')
 
1394
        self.assertEqual(expected.st_mtime, actual.st_mtime,
 
1395
                         'st_mtime did not match')
 
1396
        self.assertEqual(expected.st_ctime, actual.st_ctime,
 
1397
                         'st_ctime did not match')
 
1398
        if sys.platform == 'win32':
 
1399
            # On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
 
1400
            # is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
 
1401
            # odd. We just force it to always be 0 to avoid any problems.
 
1402
            self.assertEqual(0, expected.st_dev)
 
1403
            self.assertEqual(0, actual.st_dev)
 
1404
            self.assertEqual(0, expected.st_ino)
 
1405
            self.assertEqual(0, actual.st_ino)
 
1406
        else:
 
1407
            self.assertEqual(expected.st_dev, actual.st_dev,
 
1408
                             'st_dev did not match')
 
1409
            self.assertEqual(expected.st_ino, actual.st_ino,
 
1410
                             'st_ino did not match')
 
1411
        self.assertEqual(expected.st_mode, actual.st_mode,
 
1412
                         'st_mode did not match')
 
1413
 
 
1414
    def assertLength(self, length, obj_with_len):
 
1415
        """Assert that obj_with_len is of length length."""
 
1416
        if len(obj_with_len) != length:
 
1417
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
 
1418
                length, len(obj_with_len), obj_with_len))
 
1419
 
 
1420
    def assertLogsError(self, exception_class, func, *args, **kwargs):
 
1421
        """Assert that `func(*args, **kwargs)` quietly logs a specific error.
 
1422
        """
 
1423
        captured = []
 
1424
        orig_log_exception_quietly = trace.log_exception_quietly
 
1425
        try:
 
1426
            def capture():
 
1427
                orig_log_exception_quietly()
 
1428
                captured.append(sys.exc_info()[1])
 
1429
            trace.log_exception_quietly = capture
 
1430
            func(*args, **kwargs)
 
1431
        finally:
 
1432
            trace.log_exception_quietly = orig_log_exception_quietly
 
1433
        self.assertLength(1, captured)
 
1434
        err = captured[0]
 
1435
        self.assertIsInstance(err, exception_class)
 
1436
        return err
 
1437
 
 
1438
    def assertPositive(self, val):
 
1439
        """Assert that val is greater than 0."""
 
1440
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
1441
 
 
1442
    def assertNegative(self, val):
 
1443
        """Assert that val is less than 0."""
 
1444
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
1445
 
 
1446
    def assertStartsWith(self, s, prefix):
 
1447
        if not s.startswith(prefix):
 
1448
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
1449
 
 
1450
    def assertEndsWith(self, s, suffix):
 
1451
        """Asserts that s ends with suffix."""
 
1452
        if not s.endswith(suffix):
 
1453
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
1454
 
 
1455
    def assertContainsRe(self, haystack, needle_re, flags=0):
 
1456
        """Assert that a contains something matching a regular expression."""
 
1457
        if not re.search(needle_re, haystack, flags):
 
1458
            if '\n' in haystack or len(haystack) > 60:
 
1459
                # a long string, format it in a more readable way
 
1460
                raise AssertionError(
 
1461
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
1462
                        % (needle_re, haystack))
 
1463
            else:
 
1464
                raise AssertionError('pattern "%s" not found in "%s"'
 
1465
                        % (needle_re, haystack))
 
1466
 
 
1467
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
 
1468
        """Assert that a does not match a regular expression"""
 
1469
        if re.search(needle_re, haystack, flags):
 
1470
            raise AssertionError('pattern "%s" found in "%s"'
 
1471
                    % (needle_re, haystack))
 
1472
 
 
1473
    def assertContainsString(self, haystack, needle):
 
1474
        if haystack.find(needle) == -1:
 
1475
            self.fail("string %r not found in '''%s'''" % (needle, haystack))
 
1476
 
 
1477
    def assertNotContainsString(self, haystack, needle):
 
1478
        if haystack.find(needle) != -1:
 
1479
            self.fail("string %r found in '''%s'''" % (needle, haystack))
 
1480
 
 
1481
    def assertSubset(self, sublist, superlist):
 
1482
        """Assert that every entry in sublist is present in superlist."""
 
1483
        missing = set(sublist) - set(superlist)
 
1484
        if len(missing) > 0:
 
1485
            raise AssertionError("value(s) %r not present in container %r" %
 
1486
                                 (missing, superlist))
 
1487
 
 
1488
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
1489
        """Fail unless excClass is raised when the iterator from func is used.
 
1490
 
 
1491
        Many functions can return generators this makes sure
 
1492
        to wrap them in a list() call to make sure the whole generator
 
1493
        is run, and that the proper exception is raised.
 
1494
        """
 
1495
        try:
 
1496
            list(func(*args, **kwargs))
 
1497
        except excClass, e:
 
1498
            return e
 
1499
        else:
 
1500
            if getattr(excClass,'__name__', None) is not None:
 
1501
                excName = excClass.__name__
 
1502
            else:
 
1503
                excName = str(excClass)
 
1504
            raise self.failureException, "%s not raised" % excName
 
1505
 
 
1506
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
1507
        """Assert that a callable raises a particular exception.
 
1508
 
 
1509
        :param excClass: As for the except statement, this may be either an
 
1510
            exception class, or a tuple of classes.
 
1511
        :param callableObj: A callable, will be passed ``*args`` and
 
1512
            ``**kwargs``.
 
1513
 
 
1514
        Returns the exception so that you can examine it.
 
1515
        """
 
1516
        try:
 
1517
            callableObj(*args, **kwargs)
 
1518
        except excClass, e:
 
1519
            return e
 
1520
        else:
 
1521
            if getattr(excClass,'__name__', None) is not None:
 
1522
                excName = excClass.__name__
 
1523
            else:
 
1524
                # probably a tuple
 
1525
                excName = str(excClass)
 
1526
            raise self.failureException, "%s not raised" % excName
 
1527
 
 
1528
    def assertIs(self, left, right, message=None):
 
1529
        if not (left is right):
 
1530
            if message is not None:
 
1531
                raise AssertionError(message)
 
1532
            else:
 
1533
                raise AssertionError("%r is not %r." % (left, right))
 
1534
 
 
1535
    def assertIsNot(self, left, right, message=None):
 
1536
        if (left is right):
 
1537
            if message is not None:
 
1538
                raise AssertionError(message)
 
1539
            else:
 
1540
                raise AssertionError("%r is %r." % (left, right))
 
1541
 
 
1542
    def assertTransportMode(self, transport, path, mode):
 
1543
        """Fail if a path does not have mode "mode".
 
1544
 
 
1545
        If modes are not supported on this transport, the assertion is ignored.
 
1546
        """
 
1547
        if not transport._can_roundtrip_unix_modebits():
 
1548
            return
 
1549
        path_stat = transport.stat(path)
 
1550
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
1551
        self.assertEqual(mode, actual_mode,
 
1552
                         'mode of %r incorrect (%s != %s)'
 
1553
                         % (path, oct(mode), oct(actual_mode)))
 
1554
 
 
1555
    def assertIsSameRealPath(self, path1, path2):
 
1556
        """Fail if path1 and path2 points to different files"""
 
1557
        self.assertEqual(osutils.realpath(path1),
 
1558
                         osutils.realpath(path2),
 
1559
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
1560
 
 
1561
    def assertIsInstance(self, obj, kls, msg=None):
 
1562
        """Fail if obj is not an instance of kls
 
1563
        
 
1564
        :param msg: Supplementary message to show if the assertion fails.
 
1565
        """
 
1566
        if not isinstance(obj, kls):
 
1567
            m = "%r is an instance of %s rather than %s" % (
 
1568
                obj, obj.__class__, kls)
 
1569
            if msg:
 
1570
                m += ": " + msg
 
1571
            self.fail(m)
 
1572
 
 
1573
    def assertFileEqual(self, content, path):
 
1574
        """Fail if path does not contain 'content'."""
 
1575
        self.assertPathExists(path)
 
1576
        f = file(path, 'rb')
 
1577
        try:
 
1578
            s = f.read()
 
1579
        finally:
 
1580
            f.close()
 
1581
        self.assertEqualDiff(content, s)
 
1582
 
 
1583
    def assertDocstring(self, expected_docstring, obj):
 
1584
        """Fail if obj does not have expected_docstring"""
 
1585
        if __doc__ is None:
 
1586
            # With -OO the docstring should be None instead
 
1587
            self.assertIs(obj.__doc__, None)
 
1588
        else:
 
1589
            self.assertEqual(expected_docstring, obj.__doc__)
 
1590
 
 
1591
    @symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
 
1592
    def failUnlessExists(self, path):
 
1593
        return self.assertPathExists(path)
 
1594
 
 
1595
    def assertPathExists(self, path):
 
1596
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1597
        if not isinstance(path, basestring):
 
1598
            for p in path:
 
1599
                self.assertPathExists(p)
 
1600
        else:
 
1601
            self.assertTrue(osutils.lexists(path),
 
1602
                path + " does not exist")
 
1603
 
 
1604
    @symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
 
1605
    def failIfExists(self, path):
 
1606
        return self.assertPathDoesNotExist(path)
 
1607
 
 
1608
    def assertPathDoesNotExist(self, path):
 
1609
        """Fail if path or paths, which may be abs or relative, exist."""
 
1610
        if not isinstance(path, basestring):
 
1611
            for p in path:
 
1612
                self.assertPathDoesNotExist(p)
 
1613
        else:
 
1614
            self.assertFalse(osutils.lexists(path),
 
1615
                path + " exists")
 
1616
 
 
1617
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1618
        """A helper for callDeprecated and applyDeprecated.
 
1619
 
 
1620
        :param a_callable: A callable to call.
 
1621
        :param args: The positional arguments for the callable
 
1622
        :param kwargs: The keyword arguments for the callable
 
1623
        :return: A tuple (warnings, result). result is the result of calling
 
1624
            a_callable(``*args``, ``**kwargs``).
 
1625
        """
 
1626
        local_warnings = []
 
1627
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1628
            # we've hooked into a deprecation specific callpath,
 
1629
            # only deprecations should getting sent via it.
 
1630
            self.assertEqual(cls, DeprecationWarning)
 
1631
            local_warnings.append(msg)
 
1632
        original_warning_method = symbol_versioning.warn
 
1633
        symbol_versioning.set_warning_method(capture_warnings)
 
1634
        try:
 
1635
            result = a_callable(*args, **kwargs)
 
1636
        finally:
 
1637
            symbol_versioning.set_warning_method(original_warning_method)
 
1638
        return (local_warnings, result)
 
1639
 
 
1640
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1641
        """Call a deprecated callable without warning the user.
 
1642
 
 
1643
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1644
        not other callers that go direct to the warning module.
 
1645
 
 
1646
        To test that a deprecated method raises an error, do something like
 
1647
        this (remember that both assertRaises and applyDeprecated delays *args
 
1648
        and **kwargs passing)::
 
1649
 
 
1650
            self.assertRaises(errors.ReservedId,
 
1651
                self.applyDeprecated,
 
1652
                deprecated_in((1, 5, 0)),
 
1653
                br.append_revision,
 
1654
                'current:')
 
1655
 
 
1656
        :param deprecation_format: The deprecation format that the callable
 
1657
            should have been deprecated with. This is the same type as the
 
1658
            parameter to deprecated_method/deprecated_function. If the
 
1659
            callable is not deprecated with this format, an assertion error
 
1660
            will be raised.
 
1661
        :param a_callable: A callable to call. This may be a bound method or
 
1662
            a regular function. It will be called with ``*args`` and
 
1663
            ``**kwargs``.
 
1664
        :param args: The positional arguments for the callable
 
1665
        :param kwargs: The keyword arguments for the callable
 
1666
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1667
        """
 
1668
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1669
            *args, **kwargs)
 
1670
        expected_first_warning = symbol_versioning.deprecation_string(
 
1671
            a_callable, deprecation_format)
 
1672
        if len(call_warnings) == 0:
 
1673
            self.fail("No deprecation warning generated by call to %s" %
 
1674
                a_callable)
 
1675
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1676
        return result
 
1677
 
 
1678
    def callCatchWarnings(self, fn, *args, **kw):
 
1679
        """Call a callable that raises python warnings.
 
1680
 
 
1681
        The caller's responsible for examining the returned warnings.
 
1682
 
 
1683
        If the callable raises an exception, the exception is not
 
1684
        caught and propagates up to the caller.  In that case, the list
 
1685
        of warnings is not available.
 
1686
 
 
1687
        :returns: ([warning_object, ...], fn_result)
 
1688
        """
 
1689
        # XXX: This is not perfect, because it completely overrides the
 
1690
        # warnings filters, and some code may depend on suppressing particular
 
1691
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1692
        # though.  -- Andrew, 20071062
 
1693
        wlist = []
 
1694
        def _catcher(message, category, filename, lineno, file=None, line=None):
 
1695
            # despite the name, 'message' is normally(?) a Warning subclass
 
1696
            # instance
 
1697
            wlist.append(message)
 
1698
        saved_showwarning = warnings.showwarning
 
1699
        saved_filters = warnings.filters
 
1700
        try:
 
1701
            warnings.showwarning = _catcher
 
1702
            warnings.filters = []
 
1703
            result = fn(*args, **kw)
 
1704
        finally:
 
1705
            warnings.showwarning = saved_showwarning
 
1706
            warnings.filters = saved_filters
 
1707
        return wlist, result
 
1708
 
 
1709
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1710
        """Assert that a callable is deprecated in a particular way.
 
1711
 
 
1712
        This is a very precise test for unusual requirements. The
 
1713
        applyDeprecated helper function is probably more suited for most tests
 
1714
        as it allows you to simply specify the deprecation format being used
 
1715
        and will ensure that that is issued for the function being called.
 
1716
 
 
1717
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1718
        not other callers that go direct to the warning module.  To catch
 
1719
        general warnings, use callCatchWarnings.
 
1720
 
 
1721
        :param expected: a list of the deprecation warnings expected, in order
 
1722
        :param callable: The callable to call
 
1723
        :param args: The positional arguments for the callable
 
1724
        :param kwargs: The keyword arguments for the callable
 
1725
        """
 
1726
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1727
            *args, **kwargs)
 
1728
        self.assertEqual(expected, call_warnings)
 
1729
        return result
 
1730
 
 
1731
    def _startLogFile(self):
 
1732
        """Setup a in-memory target for bzr and testcase log messages"""
 
1733
        pseudo_log_file = StringIO()
 
1734
        def _get_log_contents_for_weird_testtools_api():
 
1735
            return [pseudo_log_file.getvalue().decode(
 
1736
                "utf-8", "replace").encode("utf-8")]
 
1737
        self.addDetail("log", content.Content(content.ContentType("text",
 
1738
            "plain", {"charset": "utf8"}),
 
1739
            _get_log_contents_for_weird_testtools_api))
 
1740
        self._log_file = pseudo_log_file
 
1741
        self._log_memento = trace.push_log_file(self._log_file)
 
1742
        self.addCleanup(self._finishLogFile)
 
1743
 
 
1744
    def _finishLogFile(self):
 
1745
        """Flush and dereference the in-memory log for this testcase"""
 
1746
        if trace._trace_file:
 
1747
            # flush the log file, to get all content
 
1748
            trace._trace_file.flush()
 
1749
        trace.pop_log_file(self._log_memento)
 
1750
        # The logging module now tracks references for cleanup so discard ours
 
1751
        del self._log_memento
 
1752
 
 
1753
    def thisFailsStrictLockCheck(self):
 
1754
        """It is known that this test would fail with -Dstrict_locks.
 
1755
 
 
1756
        By default, all tests are run with strict lock checking unless
 
1757
        -Edisable_lock_checks is supplied. However there are some tests which
 
1758
        we know fail strict locks at this point that have not been fixed.
 
1759
        They should call this function to disable the strict checking.
 
1760
 
 
1761
        This should be used sparingly, it is much better to fix the locking
 
1762
        issues rather than papering over the problem by calling this function.
 
1763
        """
 
1764
        debug.debug_flags.discard('strict_locks')
 
1765
 
 
1766
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
 
1767
        """Overrides an object attribute restoring it after the test.
 
1768
 
 
1769
        :note: This should be used with discretion; you should think about
 
1770
        whether it's better to make the code testable without monkey-patching.
 
1771
 
 
1772
        :param obj: The object that will be mutated.
 
1773
 
 
1774
        :param attr_name: The attribute name we want to preserve/override in
 
1775
            the object.
 
1776
 
 
1777
        :param new: The optional value we want to set the attribute to.
 
1778
 
 
1779
        :returns: The actual attr value.
 
1780
        """
 
1781
        value = getattr(obj, attr_name)
 
1782
        # The actual value is captured by the call below
 
1783
        self.addCleanup(setattr, obj, attr_name, value)
 
1784
        if new is not _unitialized_attr:
 
1785
            setattr(obj, attr_name, new)
 
1786
        return value
 
1787
 
 
1788
    def overrideEnv(self, name, new):
 
1789
        """Set an environment variable, and reset it after the test.
 
1790
 
 
1791
        :param name: The environment variable name.
 
1792
 
 
1793
        :param new: The value to set the variable to. If None, the 
 
1794
            variable is deleted from the environment.
 
1795
 
 
1796
        :returns: The actual variable value.
 
1797
        """
 
1798
        value = osutils.set_or_unset_env(name, new)
 
1799
        self.addCleanup(osutils.set_or_unset_env, name, value)
 
1800
        return value
 
1801
 
 
1802
    def recordCalls(self, obj, attr_name):
 
1803
        """Monkeypatch in a wrapper that will record calls.
 
1804
 
 
1805
        The monkeypatch is automatically removed when the test concludes.
 
1806
 
 
1807
        :param obj: The namespace holding the reference to be replaced;
 
1808
            typically a module, class, or object.
 
1809
        :param attr_name: A string for the name of the attribute to 
 
1810
            patch.
 
1811
        :returns: A list that will be extended with one item every time the
 
1812
            function is called, with a tuple of (args, kwargs).
 
1813
        """
 
1814
        calls = []
 
1815
 
 
1816
        def decorator(*args, **kwargs):
 
1817
            calls.append((args, kwargs))
 
1818
            return orig(*args, **kwargs)
 
1819
        orig = self.overrideAttr(obj, attr_name, decorator)
 
1820
        return calls
 
1821
 
 
1822
    def _cleanEnvironment(self):
 
1823
        for name, value in isolated_environ.iteritems():
 
1824
            self.overrideEnv(name, value)
 
1825
 
 
1826
    def _restoreHooks(self):
 
1827
        for klass, (name, hooks) in self._preserved_hooks.items():
 
1828
            setattr(klass, name, hooks)
 
1829
        self._preserved_hooks.clear()
 
1830
        bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
 
1831
        self._preserved_lazy_hooks.clear()
 
1832
 
 
1833
    def knownFailure(self, reason):
 
1834
        """Declare that this test fails for a known reason
 
1835
 
 
1836
        Tests that are known to fail should generally be using expectedFailure
 
1837
        with an appropriate reverse assertion if a change could cause the test
 
1838
        to start passing. Conversely if the test has no immediate prospect of
 
1839
        succeeding then using skip is more suitable.
 
1840
 
 
1841
        When this method is called while an exception is being handled, that
 
1842
        traceback will be used, otherwise a new exception will be thrown to
 
1843
        provide one but won't be reported.
 
1844
        """
 
1845
        self._add_reason(reason)
 
1846
        try:
 
1847
            exc_info = sys.exc_info()
 
1848
            if exc_info != (None, None, None):
 
1849
                self._report_traceback(exc_info)
 
1850
            else:
 
1851
                try:
 
1852
                    raise self.failureException(reason)
 
1853
                except self.failureException:
 
1854
                    exc_info = sys.exc_info()
 
1855
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
 
1856
            raise testtools.testcase._ExpectedFailure(exc_info)
 
1857
        finally:
 
1858
            del exc_info
 
1859
 
 
1860
    def _suppress_log(self):
 
1861
        """Remove the log info from details."""
 
1862
        self.discardDetail('log')
 
1863
 
 
1864
    def _do_skip(self, result, reason):
 
1865
        self._suppress_log()
 
1866
        addSkip = getattr(result, 'addSkip', None)
 
1867
        if not callable(addSkip):
 
1868
            result.addSuccess(result)
 
1869
        else:
 
1870
            addSkip(self, reason)
 
1871
 
 
1872
    @staticmethod
 
1873
    def _do_known_failure(self, result, e):
 
1874
        self._suppress_log()
 
1875
        err = sys.exc_info()
 
1876
        addExpectedFailure = getattr(result, 'addExpectedFailure', None)
 
1877
        if addExpectedFailure is not None:
 
1878
            addExpectedFailure(self, err)
 
1879
        else:
 
1880
            result.addSuccess(self)
 
1881
 
 
1882
    @staticmethod
 
1883
    def _do_not_applicable(self, result, e):
 
1884
        if not e.args:
 
1885
            reason = 'No reason given'
 
1886
        else:
 
1887
            reason = e.args[0]
 
1888
        self._suppress_log ()
 
1889
        addNotApplicable = getattr(result, 'addNotApplicable', None)
 
1890
        if addNotApplicable is not None:
 
1891
            result.addNotApplicable(self, reason)
 
1892
        else:
 
1893
            self._do_skip(result, reason)
 
1894
 
 
1895
    @staticmethod
 
1896
    def _report_skip(self, result, err):
 
1897
        """Override the default _report_skip.
 
1898
 
 
1899
        We want to strip the 'log' detail. If we waint until _do_skip, it has
 
1900
        already been formatted into the 'reason' string, and we can't pull it
 
1901
        out again.
 
1902
        """
 
1903
        self._suppress_log()
 
1904
        super(TestCase, self)._report_skip(self, result, err)
 
1905
 
 
1906
    @staticmethod
 
1907
    def _report_expected_failure(self, result, err):
 
1908
        """Strip the log.
 
1909
 
 
1910
        See _report_skip for motivation.
 
1911
        """
 
1912
        self._suppress_log()
 
1913
        super(TestCase, self)._report_expected_failure(self, result, err)
 
1914
 
 
1915
    @staticmethod
 
1916
    def _do_unsupported_or_skip(self, result, e):
 
1917
        reason = e.args[0]
 
1918
        self._suppress_log()
 
1919
        addNotSupported = getattr(result, 'addNotSupported', None)
 
1920
        if addNotSupported is not None:
 
1921
            result.addNotSupported(self, reason)
 
1922
        else:
 
1923
            self._do_skip(result, reason)
 
1924
 
 
1925
    def time(self, callable, *args, **kwargs):
 
1926
        """Run callable and accrue the time it takes to the benchmark time.
 
1927
 
 
1928
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1929
        this will cause lsprofile statistics to be gathered and stored in
 
1930
        self._benchcalls.
 
1931
        """
 
1932
        if self._benchtime is None:
 
1933
            self.addDetail('benchtime', content.Content(content.ContentType(
 
1934
                "text", "plain"), lambda:[str(self._benchtime)]))
 
1935
            self._benchtime = 0
 
1936
        start = time.time()
 
1937
        try:
 
1938
            if not self._gather_lsprof_in_benchmarks:
 
1939
                return callable(*args, **kwargs)
 
1940
            else:
 
1941
                # record this benchmark
 
1942
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1943
                stats.sort()
 
1944
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1945
                return ret
 
1946
        finally:
 
1947
            self._benchtime += time.time() - start
 
1948
 
 
1949
    def log(self, *args):
 
1950
        trace.mutter(*args)
 
1951
 
 
1952
    def get_log(self):
 
1953
        """Get a unicode string containing the log from bzrlib.trace.
 
1954
 
 
1955
        Undecodable characters are replaced.
 
1956
        """
 
1957
        return u"".join(self.getDetails()['log'].iter_text())
 
1958
 
 
1959
    def requireFeature(self, feature):
 
1960
        """This test requires a specific feature is available.
 
1961
 
 
1962
        :raises UnavailableFeature: When feature is not available.
 
1963
        """
 
1964
        if not feature.available():
 
1965
            raise UnavailableFeature(feature)
 
1966
 
 
1967
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1968
            working_dir):
 
1969
        """Run bazaar command line, splitting up a string command line."""
 
1970
        if isinstance(args, basestring):
 
1971
            # shlex don't understand unicode strings,
 
1972
            # so args should be plain string (bialix 20070906)
 
1973
            args = list(shlex.split(str(args)))
 
1974
        return self._run_bzr_core(args, retcode=retcode,
 
1975
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1976
                )
 
1977
 
 
1978
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1979
            working_dir):
 
1980
        # Clear chk_map page cache, because the contents are likely to mask
 
1981
        # locking errors.
 
1982
        chk_map.clear_cache()
 
1983
        if encoding is None:
 
1984
            encoding = osutils.get_user_encoding()
 
1985
        stdout = StringIOWrapper()
 
1986
        stderr = StringIOWrapper()
 
1987
        stdout.encoding = encoding
 
1988
        stderr.encoding = encoding
 
1989
 
 
1990
        self.log('run bzr: %r', args)
 
1991
        # FIXME: don't call into logging here
 
1992
        handler = trace.EncodedStreamHandler(stderr, errors="replace",
 
1993
            level=logging.INFO)
 
1994
        logger = logging.getLogger('')
 
1995
        logger.addHandler(handler)
 
1996
        old_ui_factory = ui.ui_factory
 
1997
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1998
 
 
1999
        cwd = None
 
2000
        if working_dir is not None:
 
2001
            cwd = osutils.getcwd()
 
2002
            os.chdir(working_dir)
 
2003
 
 
2004
        try:
 
2005
            try:
 
2006
                result = self.apply_redirected(
 
2007
                    ui.ui_factory.stdin,
 
2008
                    stdout, stderr,
 
2009
                    _mod_commands.run_bzr_catch_user_errors,
 
2010
                    args)
 
2011
            except KeyboardInterrupt:
 
2012
                # Reraise KeyboardInterrupt with contents of redirected stdout
 
2013
                # and stderr as arguments, for tests which are interested in
 
2014
                # stdout and stderr and are expecting the exception.
 
2015
                out = stdout.getvalue()
 
2016
                err = stderr.getvalue()
 
2017
                if out:
 
2018
                    self.log('output:\n%r', out)
 
2019
                if err:
 
2020
                    self.log('errors:\n%r', err)
 
2021
                raise KeyboardInterrupt(out, err)
 
2022
        finally:
 
2023
            logger.removeHandler(handler)
 
2024
            ui.ui_factory = old_ui_factory
 
2025
            if cwd is not None:
 
2026
                os.chdir(cwd)
 
2027
 
 
2028
        out = stdout.getvalue()
 
2029
        err = stderr.getvalue()
 
2030
        if out:
 
2031
            self.log('output:\n%r', out)
 
2032
        if err:
 
2033
            self.log('errors:\n%r', err)
 
2034
        if retcode is not None:
 
2035
            self.assertEquals(retcode, result,
 
2036
                              message='Unexpected return code')
 
2037
        return result, out, err
 
2038
 
 
2039
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
 
2040
                working_dir=None, error_regexes=[], output_encoding=None):
 
2041
        """Invoke bzr, as if it were run from the command line.
 
2042
 
 
2043
        The argument list should not include the bzr program name - the
 
2044
        first argument is normally the bzr command.  Arguments may be
 
2045
        passed in three ways:
 
2046
 
 
2047
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
2048
        when the command contains whitespace or metacharacters, or
 
2049
        is built up at run time.
 
2050
 
 
2051
        2- A single string, eg "add a".  This is the most convenient
 
2052
        for hardcoded commands.
 
2053
 
 
2054
        This runs bzr through the interface that catches and reports
 
2055
        errors, and with logging set to something approximating the
 
2056
        default, so that error reporting can be checked.
 
2057
 
 
2058
        This should be the main method for tests that want to exercise the
 
2059
        overall behavior of the bzr application (rather than a unit test
 
2060
        or a functional test of the library.)
 
2061
 
 
2062
        This sends the stdout/stderr results into the test's log,
 
2063
        where it may be useful for debugging.  See also run_captured.
 
2064
 
 
2065
        :keyword stdin: A string to be used as stdin for the command.
 
2066
        :keyword retcode: The status code the command should return;
 
2067
            default 0.
 
2068
        :keyword working_dir: The directory to run the command in
 
2069
        :keyword error_regexes: A list of expected error messages.  If
 
2070
            specified they must be seen in the error output of the command.
 
2071
        """
 
2072
        retcode, out, err = self._run_bzr_autosplit(
 
2073
            args=args,
 
2074
            retcode=retcode,
 
2075
            encoding=encoding,
 
2076
            stdin=stdin,
 
2077
            working_dir=working_dir,
 
2078
            )
 
2079
        self.assertIsInstance(error_regexes, (list, tuple))
 
2080
        for regex in error_regexes:
 
2081
            self.assertContainsRe(err, regex)
 
2082
        return out, err
 
2083
 
 
2084
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
2085
        """Run bzr, and check that stderr contains the supplied regexes
 
2086
 
 
2087
        :param error_regexes: Sequence of regular expressions which
 
2088
            must each be found in the error output. The relative ordering
 
2089
            is not enforced.
 
2090
        :param args: command-line arguments for bzr
 
2091
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
2092
            This function changes the default value of retcode to be 3,
 
2093
            since in most cases this is run when you expect bzr to fail.
 
2094
 
 
2095
        :return: (out, err) The actual output of running the command (in case
 
2096
            you want to do more inspection)
 
2097
 
 
2098
        Examples of use::
 
2099
 
 
2100
            # Make sure that commit is failing because there is nothing to do
 
2101
            self.run_bzr_error(['no changes to commit'],
 
2102
                               ['commit', '-m', 'my commit comment'])
 
2103
            # Make sure --strict is handling an unknown file, rather than
 
2104
            # giving us the 'nothing to do' error
 
2105
            self.build_tree(['unknown'])
 
2106
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
2107
                               ['commit', --strict', '-m', 'my commit comment'])
 
2108
        """
 
2109
        kwargs.setdefault('retcode', 3)
 
2110
        kwargs['error_regexes'] = error_regexes
 
2111
        out, err = self.run_bzr(*args, **kwargs)
 
2112
        return out, err
 
2113
 
 
2114
    def run_bzr_subprocess(self, *args, **kwargs):
 
2115
        """Run bzr in a subprocess for testing.
 
2116
 
 
2117
        This starts a new Python interpreter and runs bzr in there.
 
2118
        This should only be used for tests that have a justifiable need for
 
2119
        this isolation: e.g. they are testing startup time, or signal
 
2120
        handling, or early startup code, etc.  Subprocess code can't be
 
2121
        profiled or debugged so easily.
 
2122
 
 
2123
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
2124
            None is supplied, the status code is not checked.
 
2125
        :keyword env_changes: A dictionary which lists changes to environment
 
2126
            variables. A value of None will unset the env variable.
 
2127
            The values must be strings. The change will only occur in the
 
2128
            child, so you don't need to fix the environment after running.
 
2129
        :keyword universal_newlines: Convert CRLF => LF
 
2130
        :keyword allow_plugins: By default the subprocess is run with
 
2131
            --no-plugins to ensure test reproducibility. Also, it is possible
 
2132
            for system-wide plugins to create unexpected output on stderr,
 
2133
            which can cause unnecessary test failures.
 
2134
        """
 
2135
        env_changes = kwargs.get('env_changes', {})
 
2136
        working_dir = kwargs.get('working_dir', None)
 
2137
        allow_plugins = kwargs.get('allow_plugins', False)
 
2138
        if len(args) == 1:
 
2139
            if isinstance(args[0], list):
 
2140
                args = args[0]
 
2141
            elif isinstance(args[0], basestring):
 
2142
                args = list(shlex.split(args[0]))
 
2143
        else:
 
2144
            raise ValueError("passing varargs to run_bzr_subprocess")
 
2145
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
2146
                                            working_dir=working_dir,
 
2147
                                            allow_plugins=allow_plugins)
 
2148
        # We distinguish between retcode=None and retcode not passed.
 
2149
        supplied_retcode = kwargs.get('retcode', 0)
 
2150
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
2151
            universal_newlines=kwargs.get('universal_newlines', False),
 
2152
            process_args=args)
 
2153
 
 
2154
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
2155
                             skip_if_plan_to_signal=False,
 
2156
                             working_dir=None,
 
2157
                             allow_plugins=False, stderr=subprocess.PIPE):
 
2158
        """Start bzr in a subprocess for testing.
 
2159
 
 
2160
        This starts a new Python interpreter and runs bzr in there.
 
2161
        This should only be used for tests that have a justifiable need for
 
2162
        this isolation: e.g. they are testing startup time, or signal
 
2163
        handling, or early startup code, etc.  Subprocess code can't be
 
2164
        profiled or debugged so easily.
 
2165
 
 
2166
        :param process_args: a list of arguments to pass to the bzr executable,
 
2167
            for example ``['--version']``.
 
2168
        :param env_changes: A dictionary which lists changes to environment
 
2169
            variables. A value of None will unset the env variable.
 
2170
            The values must be strings. The change will only occur in the
 
2171
            child, so you don't need to fix the environment after running.
 
2172
        :param skip_if_plan_to_signal: raise TestSkipped when true and system
 
2173
            doesn't support signalling subprocesses.
 
2174
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
2175
        :param stderr: file to use for the subprocess's stderr.  Valid values
 
2176
            are those valid for the stderr argument of `subprocess.Popen`.
 
2177
            Default value is ``subprocess.PIPE``.
 
2178
 
 
2179
        :returns: Popen object for the started process.
 
2180
        """
 
2181
        if skip_if_plan_to_signal:
 
2182
            if os.name != "posix":
 
2183
                raise TestSkipped("Sending signals not supported")
 
2184
 
 
2185
        if env_changes is None:
 
2186
            env_changes = {}
 
2187
        # Because $HOME is set to a tempdir for the context of a test, modules
 
2188
        # installed in the user dir will not be found unless $PYTHONUSERBASE
 
2189
        # gets set to the computed directory of this parent process.
 
2190
        if site.USER_BASE is not None:
 
2191
            env_changes["PYTHONUSERBASE"] = site.USER_BASE
 
2192
        old_env = {}
 
2193
 
 
2194
        def cleanup_environment():
 
2195
            for env_var, value in env_changes.iteritems():
 
2196
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
2197
 
 
2198
        def restore_environment():
 
2199
            for env_var, value in old_env.iteritems():
 
2200
                osutils.set_or_unset_env(env_var, value)
 
2201
 
 
2202
        bzr_path = self.get_bzr_path()
 
2203
 
 
2204
        cwd = None
 
2205
        if working_dir is not None:
 
2206
            cwd = osutils.getcwd()
 
2207
            os.chdir(working_dir)
 
2208
 
 
2209
        try:
 
2210
            # win32 subprocess doesn't support preexec_fn
 
2211
            # so we will avoid using it on all platforms, just to
 
2212
            # make sure the code path is used, and we don't break on win32
 
2213
            cleanup_environment()
 
2214
            # Include the subprocess's log file in the test details, in case
 
2215
            # the test fails due to an error in the subprocess.
 
2216
            self._add_subprocess_log(trace._get_bzr_log_filename())
 
2217
            command = [sys.executable]
 
2218
            # frozen executables don't need the path to bzr
 
2219
            if getattr(sys, "frozen", None) is None:
 
2220
                command.append(bzr_path)
 
2221
            if not allow_plugins:
 
2222
                command.append('--no-plugins')
 
2223
            command.extend(process_args)
 
2224
            process = self._popen(command, stdin=subprocess.PIPE,
 
2225
                                  stdout=subprocess.PIPE,
 
2226
                                  stderr=stderr)
 
2227
        finally:
 
2228
            restore_environment()
 
2229
            if cwd is not None:
 
2230
                os.chdir(cwd)
 
2231
 
 
2232
        return process
 
2233
 
 
2234
    def _add_subprocess_log(self, log_file_path):
 
2235
        if len(self._log_files) == 0:
 
2236
            # Register an addCleanup func.  We do this on the first call to
 
2237
            # _add_subprocess_log rather than in TestCase.setUp so that this
 
2238
            # addCleanup is registered after any cleanups for tempdirs that
 
2239
            # subclasses might create, which will probably remove the log file
 
2240
            # we want to read.
 
2241
            self.addCleanup(self._subprocess_log_cleanup)
 
2242
        # self._log_files is a set, so if a log file is reused we won't grab it
 
2243
        # twice.
 
2244
        self._log_files.add(log_file_path)
 
2245
 
 
2246
    def _subprocess_log_cleanup(self):
 
2247
        for count, log_file_path in enumerate(self._log_files):
 
2248
            # We use buffer_now=True to avoid holding the file open beyond
 
2249
            # the life of this function, which might interfere with e.g.
 
2250
            # cleaning tempdirs on Windows.
 
2251
            # XXX: Testtools 0.9.5 doesn't have the content_from_file helper
 
2252
            #detail_content = content.content_from_file(
 
2253
            #    log_file_path, buffer_now=True)
 
2254
            with open(log_file_path, 'rb') as log_file:
 
2255
                log_file_bytes = log_file.read()
 
2256
            detail_content = content.Content(content.ContentType("text",
 
2257
                "plain", {"charset": "utf8"}), lambda: [log_file_bytes])
 
2258
            self.addDetail("start_bzr_subprocess-log-%d" % (count,),
 
2259
                detail_content)
 
2260
 
 
2261
    def _popen(self, *args, **kwargs):
 
2262
        """Place a call to Popen.
 
2263
 
 
2264
        Allows tests to override this method to intercept the calls made to
 
2265
        Popen for introspection.
 
2266
        """
 
2267
        return subprocess.Popen(*args, **kwargs)
 
2268
 
 
2269
    def get_source_path(self):
 
2270
        """Return the path of the directory containing bzrlib."""
 
2271
        return os.path.dirname(os.path.dirname(bzrlib.__file__))
 
2272
 
 
2273
    def get_bzr_path(self):
 
2274
        """Return the path of the 'bzr' executable for this test suite."""
 
2275
        bzr_path = os.path.join(self.get_source_path(), "bzr")
 
2276
        if not os.path.isfile(bzr_path):
 
2277
            # We are probably installed. Assume sys.argv is the right file
 
2278
            bzr_path = sys.argv[0]
 
2279
        return bzr_path
 
2280
 
 
2281
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
2282
                              universal_newlines=False, process_args=None):
 
2283
        """Finish the execution of process.
 
2284
 
 
2285
        :param process: the Popen object returned from start_bzr_subprocess.
 
2286
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
2287
            None is supplied, the status code is not checked.
 
2288
        :param send_signal: an optional signal to send to the process.
 
2289
        :param universal_newlines: Convert CRLF => LF
 
2290
        :returns: (stdout, stderr)
 
2291
        """
 
2292
        if send_signal is not None:
 
2293
            os.kill(process.pid, send_signal)
 
2294
        out, err = process.communicate()
 
2295
 
 
2296
        if universal_newlines:
 
2297
            out = out.replace('\r\n', '\n')
 
2298
            err = err.replace('\r\n', '\n')
 
2299
 
 
2300
        if retcode is not None and retcode != process.returncode:
 
2301
            if process_args is None:
 
2302
                process_args = "(unknown args)"
 
2303
            trace.mutter('Output of bzr %s:\n%s', process_args, out)
 
2304
            trace.mutter('Error for bzr %s:\n%s', process_args, err)
 
2305
            self.fail('Command bzr %s failed with retcode %s != %s'
 
2306
                      % (process_args, retcode, process.returncode))
 
2307
        return [out, err]
 
2308
 
 
2309
    def check_tree_shape(self, tree, shape):
 
2310
        """Compare a tree to a list of expected names.
 
2311
 
 
2312
        Fail if they are not precisely equal.
 
2313
        """
 
2314
        extras = []
 
2315
        shape = list(shape)             # copy
 
2316
        for path, ie in tree.iter_entries_by_dir():
 
2317
            name = path.replace('\\', '/')
 
2318
            if ie.kind == 'directory':
 
2319
                name = name + '/'
 
2320
            if name == "/":
 
2321
                pass # ignore root entry
 
2322
            elif name in shape:
 
2323
                shape.remove(name)
 
2324
            else:
 
2325
                extras.append(name)
 
2326
        if shape:
 
2327
            self.fail("expected paths not found in inventory: %r" % shape)
 
2328
        if extras:
 
2329
            self.fail("unexpected paths found in inventory: %r" % extras)
 
2330
 
 
2331
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
2332
                         a_callable=None, *args, **kwargs):
 
2333
        """Call callable with redirected std io pipes.
 
2334
 
 
2335
        Returns the return code."""
 
2336
        if not callable(a_callable):
 
2337
            raise ValueError("a_callable must be callable.")
 
2338
        if stdin is None:
 
2339
            stdin = StringIO("")
 
2340
        if stdout is None:
 
2341
            if getattr(self, "_log_file", None) is not None:
 
2342
                stdout = self._log_file
 
2343
            else:
 
2344
                stdout = StringIO()
 
2345
        if stderr is None:
 
2346
            if getattr(self, "_log_file", None is not None):
 
2347
                stderr = self._log_file
 
2348
            else:
 
2349
                stderr = StringIO()
 
2350
        real_stdin = sys.stdin
 
2351
        real_stdout = sys.stdout
 
2352
        real_stderr = sys.stderr
 
2353
        try:
 
2354
            sys.stdout = stdout
 
2355
            sys.stderr = stderr
 
2356
            sys.stdin = stdin
 
2357
            return a_callable(*args, **kwargs)
 
2358
        finally:
 
2359
            sys.stdout = real_stdout
 
2360
            sys.stderr = real_stderr
 
2361
            sys.stdin = real_stdin
 
2362
 
 
2363
    def reduceLockdirTimeout(self):
 
2364
        """Reduce the default lock timeout for the duration of the test, so that
 
2365
        if LockContention occurs during a test, it does so quickly.
 
2366
 
 
2367
        Tests that expect to provoke LockContention errors should call this.
 
2368
        """
 
2369
        self.overrideAttr(lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
 
2370
 
 
2371
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
2372
        """Return a StringIOWrapper instance, that will encode Unicode
 
2373
        input to UTF-8.
 
2374
        """
 
2375
        if encoding_type is None:
 
2376
            encoding_type = 'strict'
 
2377
        sio = StringIO()
 
2378
        output_encoding = 'utf-8'
 
2379
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
 
2380
        sio.encoding = output_encoding
 
2381
        return sio
 
2382
 
 
2383
    def disable_verb(self, verb):
 
2384
        """Disable a smart server verb for one test."""
 
2385
        from bzrlib.smart import request
 
2386
        request_handlers = request.request_handlers
 
2387
        orig_method = request_handlers.get(verb)
 
2388
        orig_info = request_handlers.get_info(verb)
 
2389
        request_handlers.remove(verb)
 
2390
        self.addCleanup(request_handlers.register, verb, orig_method,
 
2391
            info=orig_info)
 
2392
 
 
2393
 
 
2394
class CapturedCall(object):
 
2395
    """A helper for capturing smart server calls for easy debug analysis."""
 
2396
 
 
2397
    def __init__(self, params, prefix_length):
 
2398
        """Capture the call with params and skip prefix_length stack frames."""
 
2399
        self.call = params
 
2400
        import traceback
 
2401
        # The last 5 frames are the __init__, the hook frame, and 3 smart
 
2402
        # client frames. Beyond this we could get more clever, but this is good
 
2403
        # enough for now.
 
2404
        stack = traceback.extract_stack()[prefix_length:-5]
 
2405
        self.stack = ''.join(traceback.format_list(stack))
 
2406
 
 
2407
    def __str__(self):
 
2408
        return self.call.method
 
2409
 
 
2410
    def __repr__(self):
 
2411
        return self.call.method
 
2412
 
 
2413
    def stack(self):
 
2414
        return self.stack
 
2415
 
 
2416
 
 
2417
class TestCaseWithMemoryTransport(TestCase):
 
2418
    """Common test class for tests that do not need disk resources.
 
2419
 
 
2420
    Tests that need disk resources should derive from TestCaseInTempDir
 
2421
    orTestCaseWithTransport.
 
2422
 
 
2423
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
2424
 
 
2425
    For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
 
2426
    a directory which does not exist. This serves to help ensure test isolation
 
2427
    is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
 
2428
    must exist. However, TestCaseWithMemoryTransport does not offer local file
 
2429
    defaults for the transport in tests, nor does it obey the command line
 
2430
    override, so tests that accidentally write to the common directory should
 
2431
    be rare.
 
2432
 
 
2433
    :cvar TEST_ROOT: Directory containing all temporary directories, plus a
 
2434
        ``.bzr`` directory that stops us ascending higher into the filesystem.
 
2435
    """
 
2436
 
 
2437
    TEST_ROOT = None
 
2438
    _TEST_NAME = 'test'
 
2439
 
 
2440
    def __init__(self, methodName='runTest'):
 
2441
        # allow test parameterization after test construction and before test
 
2442
        # execution. Variables that the parameterizer sets need to be
 
2443
        # ones that are not set by setUp, or setUp will trash them.
 
2444
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
2445
        self.vfs_transport_factory = default_transport
 
2446
        self.transport_server = None
 
2447
        self.transport_readonly_server = None
 
2448
        self.__vfs_server = None
 
2449
 
 
2450
    def get_transport(self, relpath=None):
 
2451
        """Return a writeable transport.
 
2452
 
 
2453
        This transport is for the test scratch space relative to
 
2454
        "self._test_root"
 
2455
 
 
2456
        :param relpath: a path relative to the base url.
 
2457
        """
 
2458
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
 
2459
        self.assertFalse(t.is_readonly())
 
2460
        return t
 
2461
 
 
2462
    def get_readonly_transport(self, relpath=None):
 
2463
        """Return a readonly transport for the test scratch space
 
2464
 
 
2465
        This can be used to test that operations which should only need
 
2466
        readonly access in fact do not try to write.
 
2467
 
 
2468
        :param relpath: a path relative to the base url.
 
2469
        """
 
2470
        t = _mod_transport.get_transport_from_url(
 
2471
            self.get_readonly_url(relpath))
 
2472
        self.assertTrue(t.is_readonly())
 
2473
        return t
 
2474
 
 
2475
    def create_transport_readonly_server(self):
 
2476
        """Create a transport server from class defined at init.
 
2477
 
 
2478
        This is mostly a hook for daughter classes.
 
2479
        """
 
2480
        return self.transport_readonly_server()
 
2481
 
 
2482
    def get_readonly_server(self):
 
2483
        """Get the server instance for the readonly transport
 
2484
 
 
2485
        This is useful for some tests with specific servers to do diagnostics.
 
2486
        """
 
2487
        if self.__readonly_server is None:
 
2488
            if self.transport_readonly_server is None:
 
2489
                # readonly decorator requested
 
2490
                self.__readonly_server = test_server.ReadonlyServer()
 
2491
            else:
 
2492
                # explicit readonly transport.
 
2493
                self.__readonly_server = self.create_transport_readonly_server()
 
2494
            self.start_server(self.__readonly_server,
 
2495
                self.get_vfs_only_server())
 
2496
        return self.__readonly_server
 
2497
 
 
2498
    def get_readonly_url(self, relpath=None):
 
2499
        """Get a URL for the readonly transport.
 
2500
 
 
2501
        This will either be backed by '.' or a decorator to the transport
 
2502
        used by self.get_url()
 
2503
        relpath provides for clients to get a path relative to the base url.
 
2504
        These should only be downwards relative, not upwards.
 
2505
        """
 
2506
        base = self.get_readonly_server().get_url()
 
2507
        return self._adjust_url(base, relpath)
 
2508
 
 
2509
    def get_vfs_only_server(self):
 
2510
        """Get the vfs only read/write server instance.
 
2511
 
 
2512
        This is useful for some tests with specific servers that need
 
2513
        diagnostics.
 
2514
 
 
2515
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
2516
        is no means to override it.
 
2517
        """
 
2518
        if self.__vfs_server is None:
 
2519
            self.__vfs_server = memory.MemoryServer()
 
2520
            self.start_server(self.__vfs_server)
 
2521
        return self.__vfs_server
 
2522
 
 
2523
    def get_server(self):
 
2524
        """Get the read/write server instance.
 
2525
 
 
2526
        This is useful for some tests with specific servers that need
 
2527
        diagnostics.
 
2528
 
 
2529
        This is built from the self.transport_server factory. If that is None,
 
2530
        then the self.get_vfs_server is returned.
 
2531
        """
 
2532
        if self.__server is None:
 
2533
            if (self.transport_server is None or self.transport_server is
 
2534
                self.vfs_transport_factory):
 
2535
                self.__server = self.get_vfs_only_server()
 
2536
            else:
 
2537
                # bring up a decorated means of access to the vfs only server.
 
2538
                self.__server = self.transport_server()
 
2539
                self.start_server(self.__server, self.get_vfs_only_server())
 
2540
        return self.__server
 
2541
 
 
2542
    def _adjust_url(self, base, relpath):
 
2543
        """Get a URL (or maybe a path) for the readwrite transport.
 
2544
 
 
2545
        This will either be backed by '.' or to an equivalent non-file based
 
2546
        facility.
 
2547
        relpath provides for clients to get a path relative to the base url.
 
2548
        These should only be downwards relative, not upwards.
 
2549
        """
 
2550
        if relpath is not None and relpath != '.':
 
2551
            if not base.endswith('/'):
 
2552
                base = base + '/'
 
2553
            # XXX: Really base should be a url; we did after all call
 
2554
            # get_url()!  But sometimes it's just a path (from
 
2555
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
2556
            # to a non-escaped local path.
 
2557
            if base.startswith('./') or base.startswith('/'):
 
2558
                base += relpath
 
2559
            else:
 
2560
                base += urlutils.escape(relpath)
 
2561
        return base
 
2562
 
 
2563
    def get_url(self, relpath=None):
 
2564
        """Get a URL (or maybe a path) for the readwrite transport.
 
2565
 
 
2566
        This will either be backed by '.' or to an equivalent non-file based
 
2567
        facility.
 
2568
        relpath provides for clients to get a path relative to the base url.
 
2569
        These should only be downwards relative, not upwards.
 
2570
        """
 
2571
        base = self.get_server().get_url()
 
2572
        return self._adjust_url(base, relpath)
 
2573
 
 
2574
    def get_vfs_only_url(self, relpath=None):
 
2575
        """Get a URL (or maybe a path for the plain old vfs transport.
 
2576
 
 
2577
        This will never be a smart protocol.  It always has all the
 
2578
        capabilities of the local filesystem, but it might actually be a
 
2579
        MemoryTransport or some other similar virtual filesystem.
 
2580
 
 
2581
        This is the backing transport (if any) of the server returned by
 
2582
        get_url and get_readonly_url.
 
2583
 
 
2584
        :param relpath: provides for clients to get a path relative to the base
 
2585
            url.  These should only be downwards relative, not upwards.
 
2586
        :return: A URL
 
2587
        """
 
2588
        base = self.get_vfs_only_server().get_url()
 
2589
        return self._adjust_url(base, relpath)
 
2590
 
 
2591
    def _create_safety_net(self):
 
2592
        """Make a fake bzr directory.
 
2593
 
 
2594
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
2595
        real branch.
 
2596
        """
 
2597
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2598
        # Make sure we get a readable and accessible home for .bzr.log
 
2599
        # and/or config files, and not fallback to weird defaults (see
 
2600
        # http://pad.lv/825027).
 
2601
        self.assertIs(None, os.environ.get('BZR_HOME', None))
 
2602
        os.environ['BZR_HOME'] = root
 
2603
        wt = controldir.ControlDir.create_standalone_workingtree(root)
 
2604
        del os.environ['BZR_HOME']
 
2605
        # Hack for speed: remember the raw bytes of the dirstate file so that
 
2606
        # we don't need to re-open the wt to check it hasn't changed.
 
2607
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
 
2608
            wt.control_transport.get_bytes('dirstate'))
 
2609
 
 
2610
    def _check_safety_net(self):
 
2611
        """Check that the safety .bzr directory have not been touched.
 
2612
 
 
2613
        _make_test_root have created a .bzr directory to prevent tests from
 
2614
        propagating. This method ensures than a test did not leaked.
 
2615
        """
 
2616
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2617
        t = _mod_transport.get_transport_from_path(root)
 
2618
        self.permit_url(t.base)
 
2619
        if (t.get_bytes('.bzr/checkout/dirstate') != 
 
2620
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
 
2621
            # The current test have modified the /bzr directory, we need to
 
2622
            # recreate a new one or all the followng tests will fail.
 
2623
            # If you need to inspect its content uncomment the following line
 
2624
            # import pdb; pdb.set_trace()
 
2625
            _rmtree_temp_dir(root + '/.bzr', test_id=self.id())
 
2626
            self._create_safety_net()
 
2627
            raise AssertionError('%s/.bzr should not be modified' % root)
 
2628
 
 
2629
    def _make_test_root(self):
 
2630
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
2631
            # Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
 
2632
            root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
 
2633
                                                    suffix='.tmp'))
 
2634
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
2635
 
 
2636
            self._create_safety_net()
 
2637
 
 
2638
            # The same directory is used by all tests, and we're not
 
2639
            # specifically told when all tests are finished.  This will do.
 
2640
            atexit.register(_rmtree_temp_dir, root)
 
2641
 
 
2642
        self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2643
        self.addCleanup(self._check_safety_net)
 
2644
 
 
2645
    def makeAndChdirToTestDir(self):
 
2646
        """Create a temporary directories for this one test.
 
2647
 
 
2648
        This must set self.test_home_dir and self.test_dir and chdir to
 
2649
        self.test_dir.
 
2650
 
 
2651
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
2652
        """
 
2653
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2654
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
2655
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
2656
        self.permit_dir(self.test_dir)
 
2657
 
 
2658
    def make_branch(self, relpath, format=None, name=None):
 
2659
        """Create a branch on the transport at relpath."""
 
2660
        repo = self.make_repository(relpath, format=format)
 
2661
        return repo.bzrdir.create_branch(append_revisions_only=False,
 
2662
                                         name=name)
 
2663
 
 
2664
    def get_default_format(self):
 
2665
        return 'default'
 
2666
 
 
2667
    def resolve_format(self, format):
 
2668
        """Resolve an object to a ControlDir format object.
 
2669
 
 
2670
        The initial format object can either already be
 
2671
        a ControlDirFormat, None (for the default format),
 
2672
        or a string with the name of the control dir format.
 
2673
 
 
2674
        :param format: Object to resolve
 
2675
        :return A ControlDirFormat instance
 
2676
        """
 
2677
        if format is None:
 
2678
            format = self.get_default_format()
 
2679
        if isinstance(format, basestring):
 
2680
            format = controldir.format_registry.make_bzrdir(format)
 
2681
        return format
 
2682
 
 
2683
    def make_bzrdir(self, relpath, format=None):
 
2684
        try:
 
2685
            # might be a relative or absolute path
 
2686
            maybe_a_url = self.get_url(relpath)
 
2687
            segments = maybe_a_url.rsplit('/', 1)
 
2688
            t = _mod_transport.get_transport(maybe_a_url)
 
2689
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
2690
                t.ensure_base()
 
2691
            format = self.resolve_format(format)
 
2692
            return format.initialize_on_transport(t)
 
2693
        except errors.UninitializableFormat:
 
2694
            raise TestSkipped("Format %s is not initializable." % format)
 
2695
 
 
2696
    def make_repository(self, relpath, shared=None, format=None):
 
2697
        """Create a repository on our default transport at relpath.
 
2698
 
 
2699
        Note that relpath must be a relative path, not a full url.
 
2700
        """
 
2701
        # FIXME: If you create a remoterepository this returns the underlying
 
2702
        # real format, which is incorrect.  Actually we should make sure that
 
2703
        # RemoteBzrDir returns a RemoteRepository.
 
2704
        # maybe  mbp 20070410
 
2705
        made_control = self.make_bzrdir(relpath, format=format)
 
2706
        return made_control.create_repository(shared=shared)
 
2707
 
 
2708
    def make_smart_server(self, path, backing_server=None):
 
2709
        if backing_server is None:
 
2710
            backing_server = self.get_server()
 
2711
        smart_server = test_server.SmartTCPServer_for_testing()
 
2712
        self.start_server(smart_server, backing_server)
 
2713
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
 
2714
                                                   ).clone(path)
 
2715
        return remote_transport
 
2716
 
 
2717
    def make_branch_and_memory_tree(self, relpath, format=None):
 
2718
        """Create a branch on the default transport and a MemoryTree for it."""
 
2719
        b = self.make_branch(relpath, format=format)
 
2720
        return memorytree.MemoryTree.create_on_branch(b)
 
2721
 
 
2722
    def make_branch_builder(self, relpath, format=None):
 
2723
        branch = self.make_branch(relpath, format=format)
 
2724
        return branchbuilder.BranchBuilder(branch=branch)
 
2725
 
 
2726
    def overrideEnvironmentForTesting(self):
 
2727
        test_home_dir = self.test_home_dir
 
2728
        if isinstance(test_home_dir, unicode):
 
2729
            test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
 
2730
        self.overrideEnv('HOME', test_home_dir)
 
2731
        self.overrideEnv('BZR_HOME', test_home_dir)
 
2732
 
 
2733
    def setUp(self):
 
2734
        super(TestCaseWithMemoryTransport, self).setUp()
 
2735
 
 
2736
        def _add_disconnect_cleanup(transport):
 
2737
            """Schedule disconnection of given transport at test cleanup
 
2738
 
 
2739
            This needs to happen for all connected transports or leaks occur.
 
2740
 
 
2741
            Note reconnections may mean we call disconnect multiple times per
 
2742
            transport which is suboptimal but seems harmless.
 
2743
            """
 
2744
            self.addCleanup(transport.disconnect)
 
2745
 
 
2746
        _mod_transport.Transport.hooks.install_named_hook('post_connect',
 
2747
            _add_disconnect_cleanup, None)
 
2748
 
 
2749
        self._make_test_root()
 
2750
        self.addCleanup(os.chdir, os.getcwdu())
 
2751
        self.makeAndChdirToTestDir()
 
2752
        self.overrideEnvironmentForTesting()
 
2753
        self.__readonly_server = None
 
2754
        self.__server = None
 
2755
        self.reduceLockdirTimeout()
 
2756
 
 
2757
    def setup_smart_server_with_call_log(self):
 
2758
        """Sets up a smart server as the transport server with a call log."""
 
2759
        self.transport_server = test_server.SmartTCPServer_for_testing
 
2760
        self.hpss_connections = []
 
2761
        self.hpss_calls = []
 
2762
        import traceback
 
2763
        # Skip the current stack down to the caller of
 
2764
        # setup_smart_server_with_call_log
 
2765
        prefix_length = len(traceback.extract_stack()) - 2
 
2766
        def capture_hpss_call(params):
 
2767
            self.hpss_calls.append(
 
2768
                CapturedCall(params, prefix_length))
 
2769
        def capture_connect(transport):
 
2770
            self.hpss_connections.append(transport)
 
2771
        client._SmartClient.hooks.install_named_hook(
 
2772
            'call', capture_hpss_call, None)
 
2773
        _mod_transport.Transport.hooks.install_named_hook(
 
2774
            'post_connect', capture_connect, None)
 
2775
 
 
2776
    def reset_smart_call_log(self):
 
2777
        self.hpss_calls = []
 
2778
        self.hpss_connections = []
 
2779
 
 
2780
 
 
2781
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
2782
    """Derived class that runs a test within a temporary directory.
 
2783
 
 
2784
    This is useful for tests that need to create a branch, etc.
 
2785
 
 
2786
    The directory is created in a slightly complex way: for each
 
2787
    Python invocation, a new temporary top-level directory is created.
 
2788
    All test cases create their own directory within that.  If the
 
2789
    tests complete successfully, the directory is removed.
 
2790
 
 
2791
    :ivar test_base_dir: The path of the top-level directory for this
 
2792
    test, which contains a home directory and a work directory.
 
2793
 
 
2794
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
2795
    which is used as $HOME for this test.
 
2796
 
 
2797
    :ivar test_dir: A directory under test_base_dir used as the current
 
2798
    directory when the test proper is run.
 
2799
    """
 
2800
 
 
2801
    OVERRIDE_PYTHON = 'python'
 
2802
 
 
2803
    def setUp(self):
 
2804
        super(TestCaseInTempDir, self).setUp()
 
2805
        # Remove the protection set in isolated_environ, we have a proper
 
2806
        # access to disk resources now.
 
2807
        self.overrideEnv('BZR_LOG', None)
 
2808
 
 
2809
    def check_file_contents(self, filename, expect):
 
2810
        self.log("check contents of file %s" % filename)
 
2811
        f = file(filename)
 
2812
        try:
 
2813
            contents = f.read()
 
2814
        finally:
 
2815
            f.close()
 
2816
        if contents != expect:
 
2817
            self.log("expected: %r" % expect)
 
2818
            self.log("actually: %r" % contents)
 
2819
            self.fail("contents of %s not as expected" % filename)
 
2820
 
 
2821
    def _getTestDirPrefix(self):
 
2822
        # create a directory within the top level test directory
 
2823
        if sys.platform in ('win32', 'cygwin'):
 
2824
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
 
2825
            # windows is likely to have path-length limits so use a short name
 
2826
            name_prefix = name_prefix[-30:]
 
2827
        else:
 
2828
            name_prefix = re.sub('[/]', '_', self.id())
 
2829
        return name_prefix
 
2830
 
 
2831
    def makeAndChdirToTestDir(self):
 
2832
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2833
 
 
2834
        For TestCaseInTempDir we create a temporary directory based on the test
 
2835
        name and then create two subdirs - test and home under it.
 
2836
        """
 
2837
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
 
2838
            self._getTestDirPrefix())
 
2839
        name = name_prefix
 
2840
        for i in range(100):
 
2841
            if os.path.exists(name):
 
2842
                name = name_prefix + '_' + str(i)
 
2843
            else:
 
2844
                # now create test and home directories within this dir
 
2845
                self.test_base_dir = name
 
2846
                self.addCleanup(self.deleteTestDir)
 
2847
                os.mkdir(self.test_base_dir)
 
2848
                break
 
2849
        self.permit_dir(self.test_base_dir)
 
2850
        # 'sprouting' and 'init' of a branch both walk up the tree to find
 
2851
        # stacking policy to honour; create a bzr dir with an unshared
 
2852
        # repository (but not a branch - our code would be trying to escape
 
2853
        # then!) to stop them, and permit it to be read.
 
2854
        # control = controldir.ControlDir.create(self.test_base_dir)
 
2855
        # control.create_repository()
 
2856
        self.test_home_dir = self.test_base_dir + '/home'
 
2857
        os.mkdir(self.test_home_dir)
 
2858
        self.test_dir = self.test_base_dir + '/work'
 
2859
        os.mkdir(self.test_dir)
 
2860
        os.chdir(self.test_dir)
 
2861
        # put name of test inside
 
2862
        f = file(self.test_base_dir + '/name', 'w')
 
2863
        try:
 
2864
            f.write(self.id())
 
2865
        finally:
 
2866
            f.close()
 
2867
 
 
2868
    def deleteTestDir(self):
 
2869
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2870
        _rmtree_temp_dir(self.test_base_dir, test_id=self.id())
 
2871
 
 
2872
    def build_tree(self, shape, line_endings='binary', transport=None):
 
2873
        """Build a test tree according to a pattern.
 
2874
 
 
2875
        shape is a sequence of file specifications.  If the final
 
2876
        character is '/', a directory is created.
 
2877
 
 
2878
        This assumes that all the elements in the tree being built are new.
 
2879
 
 
2880
        This doesn't add anything to a branch.
 
2881
 
 
2882
        :type shape:    list or tuple.
 
2883
        :param line_endings: Either 'binary' or 'native'
 
2884
            in binary mode, exact contents are written in native mode, the
 
2885
            line endings match the default platform endings.
 
2886
        :param transport: A transport to write to, for building trees on VFS's.
 
2887
            If the transport is readonly or None, "." is opened automatically.
 
2888
        :return: None
 
2889
        """
 
2890
        if type(shape) not in (list, tuple):
 
2891
            raise AssertionError("Parameter 'shape' should be "
 
2892
                "a list or a tuple. Got %r instead" % (shape,))
 
2893
        # It's OK to just create them using forward slashes on windows.
 
2894
        if transport is None or transport.is_readonly():
 
2895
            transport = _mod_transport.get_transport_from_path(".")
 
2896
        for name in shape:
 
2897
            self.assertIsInstance(name, basestring)
 
2898
            if name[-1] == '/':
 
2899
                transport.mkdir(urlutils.escape(name[:-1]))
 
2900
            else:
 
2901
                if line_endings == 'binary':
 
2902
                    end = '\n'
 
2903
                elif line_endings == 'native':
 
2904
                    end = os.linesep
 
2905
                else:
 
2906
                    raise errors.BzrError(
 
2907
                        'Invalid line ending request %r' % line_endings)
 
2908
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
2909
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2910
 
 
2911
    build_tree_contents = staticmethod(treeshape.build_tree_contents)
 
2912
 
 
2913
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2914
        """Assert whether path or paths are in the WorkingTree"""
 
2915
        if tree is None:
 
2916
            tree = workingtree.WorkingTree.open(root_path)
 
2917
        if not isinstance(path, basestring):
 
2918
            for p in path:
 
2919
                self.assertInWorkingTree(p, tree=tree)
 
2920
        else:
 
2921
            self.assertIsNot(tree.path2id(path), None,
 
2922
                path+' not in working tree.')
 
2923
 
 
2924
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2925
        """Assert whether path or paths are not in the WorkingTree"""
 
2926
        if tree is None:
 
2927
            tree = workingtree.WorkingTree.open(root_path)
 
2928
        if not isinstance(path, basestring):
 
2929
            for p in path:
 
2930
                self.assertNotInWorkingTree(p,tree=tree)
 
2931
        else:
 
2932
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2933
 
 
2934
 
 
2935
class TestCaseWithTransport(TestCaseInTempDir):
 
2936
    """A test case that provides get_url and get_readonly_url facilities.
 
2937
 
 
2938
    These back onto two transport servers, one for readonly access and one for
 
2939
    read write access.
 
2940
 
 
2941
    If no explicit class is provided for readonly access, a
 
2942
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2943
    based read write transports.
 
2944
 
 
2945
    If an explicit class is provided for readonly access, that server and the
 
2946
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2947
    """
 
2948
 
 
2949
    def get_vfs_only_server(self):
 
2950
        """See TestCaseWithMemoryTransport.
 
2951
 
 
2952
        This is useful for some tests with specific servers that need
 
2953
        diagnostics.
 
2954
        """
 
2955
        if self.__vfs_server is None:
 
2956
            self.__vfs_server = self.vfs_transport_factory()
 
2957
            self.start_server(self.__vfs_server)
 
2958
        return self.__vfs_server
 
2959
 
 
2960
    def make_branch_and_tree(self, relpath, format=None):
 
2961
        """Create a branch on the transport and a tree locally.
 
2962
 
 
2963
        If the transport is not a LocalTransport, the Tree can't be created on
 
2964
        the transport.  In that case if the vfs_transport_factory is
 
2965
        LocalURLServer the working tree is created in the local
 
2966
        directory backing the transport, and the returned tree's branch and
 
2967
        repository will also be accessed locally. Otherwise a lightweight
 
2968
        checkout is created and returned.
 
2969
 
 
2970
        We do this because we can't physically create a tree in the local
 
2971
        path, with a branch reference to the transport_factory url, and
 
2972
        a branch + repository in the vfs_transport, unless the vfs_transport
 
2973
        namespace is distinct from the local disk - the two branch objects
 
2974
        would collide. While we could construct a tree with its branch object
 
2975
        pointing at the transport_factory transport in memory, reopening it
 
2976
        would behaving unexpectedly, and has in the past caused testing bugs
 
2977
        when we tried to do it that way.
 
2978
 
 
2979
        :param format: The BzrDirFormat.
 
2980
        :returns: the WorkingTree.
 
2981
        """
 
2982
        # TODO: always use the local disk path for the working tree,
 
2983
        # this obviously requires a format that supports branch references
 
2984
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2985
        # RBC 20060208
 
2986
        format = self.resolve_format(format=format)
 
2987
        if not format.supports_workingtrees:
 
2988
            b = self.make_branch(relpath+'.branch', format=format)
 
2989
            return b.create_checkout(relpath, lightweight=True)
 
2990
        b = self.make_branch(relpath, format=format)
 
2991
        try:
 
2992
            return b.bzrdir.create_workingtree()
 
2993
        except errors.NotLocalUrl:
 
2994
            # We can only make working trees locally at the moment.  If the
 
2995
            # transport can't support them, then we keep the non-disk-backed
 
2996
            # branch and create a local checkout.
 
2997
            if self.vfs_transport_factory is test_server.LocalURLServer:
 
2998
                # the branch is colocated on disk, we cannot create a checkout.
 
2999
                # hopefully callers will expect this.
 
3000
                local_controldir = controldir.ControlDir.open(
 
3001
                    self.get_vfs_only_url(relpath))
 
3002
                wt = local_controldir.create_workingtree()
 
3003
                if wt.branch._format != b._format:
 
3004
                    wt._branch = b
 
3005
                    # Make sure that assigning to wt._branch fixes wt.branch,
 
3006
                    # in case the implementation details of workingtree objects
 
3007
                    # change.
 
3008
                    self.assertIs(b, wt.branch)
 
3009
                return wt
 
3010
            else:
 
3011
                return b.create_checkout(relpath, lightweight=True)
 
3012
 
 
3013
    def assertIsDirectory(self, relpath, transport):
 
3014
        """Assert that relpath within transport is a directory.
 
3015
 
 
3016
        This may not be possible on all transports; in that case it propagates
 
3017
        a TransportNotPossible.
 
3018
        """
 
3019
        try:
 
3020
            mode = transport.stat(relpath).st_mode
 
3021
        except errors.NoSuchFile:
 
3022
            self.fail("path %s is not a directory; no such file"
 
3023
                      % (relpath))
 
3024
        if not stat.S_ISDIR(mode):
 
3025
            self.fail("path %s is not a directory; has mode %#o"
 
3026
                      % (relpath, mode))
 
3027
 
 
3028
    def assertTreesEqual(self, left, right):
 
3029
        """Check that left and right have the same content and properties."""
 
3030
        # we use a tree delta to check for equality of the content, and we
 
3031
        # manually check for equality of other things such as the parents list.
 
3032
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
3033
        differences = left.changes_from(right)
 
3034
        self.assertFalse(differences.has_changed(),
 
3035
            "Trees %r and %r are different: %r" % (left, right, differences))
 
3036
 
 
3037
    def setUp(self):
 
3038
        super(TestCaseWithTransport, self).setUp()
 
3039
        self.__vfs_server = None
 
3040
 
 
3041
    def disable_missing_extensions_warning(self):
 
3042
        """Some tests expect a precise stderr content.
 
3043
 
 
3044
        There is no point in forcing them to duplicate the extension related
 
3045
        warning.
 
3046
        """
 
3047
        config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
 
3048
 
 
3049
 
 
3050
class ChrootedTestCase(TestCaseWithTransport):
 
3051
    """A support class that provides readonly urls outside the local namespace.
 
3052
 
 
3053
    This is done by checking if self.transport_server is a MemoryServer. if it
 
3054
    is then we are chrooted already, if it is not then an HttpServer is used
 
3055
    for readonly urls.
 
3056
 
 
3057
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
3058
                       be used without needed to redo it when a different
 
3059
                       subclass is in use ?
 
3060
    """
 
3061
 
 
3062
    def setUp(self):
 
3063
        from bzrlib.tests import http_server
 
3064
        super(ChrootedTestCase, self).setUp()
 
3065
        if not self.vfs_transport_factory == memory.MemoryServer:
 
3066
            self.transport_readonly_server = http_server.HttpServer
 
3067
 
 
3068
 
 
3069
def condition_id_re(pattern):
 
3070
    """Create a condition filter which performs a re check on a test's id.
 
3071
 
 
3072
    :param pattern: A regular expression string.
 
3073
    :return: A callable that returns True if the re matches.
 
3074
    """
 
3075
    filter_re = re.compile(pattern, 0)
 
3076
    def condition(test):
 
3077
        test_id = test.id()
 
3078
        return filter_re.search(test_id)
 
3079
    return condition
 
3080
 
 
3081
 
 
3082
def condition_isinstance(klass_or_klass_list):
 
3083
    """Create a condition filter which returns isinstance(param, klass).
 
3084
 
 
3085
    :return: A callable which when called with one parameter obj return the
 
3086
        result of isinstance(obj, klass_or_klass_list).
 
3087
    """
 
3088
    def condition(obj):
 
3089
        return isinstance(obj, klass_or_klass_list)
 
3090
    return condition
 
3091
 
 
3092
 
 
3093
def condition_id_in_list(id_list):
 
3094
    """Create a condition filter which verify that test's id in a list.
 
3095
 
 
3096
    :param id_list: A TestIdList object.
 
3097
    :return: A callable that returns True if the test's id appears in the list.
 
3098
    """
 
3099
    def condition(test):
 
3100
        return id_list.includes(test.id())
 
3101
    return condition
 
3102
 
 
3103
 
 
3104
def condition_id_startswith(starts):
 
3105
    """Create a condition filter verifying that test's id starts with a string.
 
3106
 
 
3107
    :param starts: A list of string.
 
3108
    :return: A callable that returns True if the test's id starts with one of
 
3109
        the given strings.
 
3110
    """
 
3111
    def condition(test):
 
3112
        for start in starts:
 
3113
            if test.id().startswith(start):
 
3114
                return True
 
3115
        return False
 
3116
    return condition
 
3117
 
 
3118
 
 
3119
def exclude_tests_by_condition(suite, condition):
 
3120
    """Create a test suite which excludes some tests from suite.
 
3121
 
 
3122
    :param suite: The suite to get tests from.
 
3123
    :param condition: A callable whose result evaluates True when called with a
 
3124
        test case which should be excluded from the result.
 
3125
    :return: A suite which contains the tests found in suite that fail
 
3126
        condition.
 
3127
    """
 
3128
    result = []
 
3129
    for test in iter_suite_tests(suite):
 
3130
        if not condition(test):
 
3131
            result.append(test)
 
3132
    return TestUtil.TestSuite(result)
 
3133
 
 
3134
 
 
3135
def filter_suite_by_condition(suite, condition):
 
3136
    """Create a test suite by filtering another one.
 
3137
 
 
3138
    :param suite: The source suite.
 
3139
    :param condition: A callable whose result evaluates True when called with a
 
3140
        test case which should be included in the result.
 
3141
    :return: A suite which contains the tests found in suite that pass
 
3142
        condition.
 
3143
    """
 
3144
    result = []
 
3145
    for test in iter_suite_tests(suite):
 
3146
        if condition(test):
 
3147
            result.append(test)
 
3148
    return TestUtil.TestSuite(result)
 
3149
 
 
3150
 
 
3151
def filter_suite_by_re(suite, pattern):
 
3152
    """Create a test suite by filtering another one.
 
3153
 
 
3154
    :param suite:           the source suite
 
3155
    :param pattern:         pattern that names must match
 
3156
    :returns: the newly created suite
 
3157
    """
 
3158
    condition = condition_id_re(pattern)
 
3159
    result_suite = filter_suite_by_condition(suite, condition)
 
3160
    return result_suite
 
3161
 
 
3162
 
 
3163
def filter_suite_by_id_list(suite, test_id_list):
 
3164
    """Create a test suite by filtering another one.
 
3165
 
 
3166
    :param suite: The source suite.
 
3167
    :param test_id_list: A list of the test ids to keep as strings.
 
3168
    :returns: the newly created suite
 
3169
    """
 
3170
    condition = condition_id_in_list(test_id_list)
 
3171
    result_suite = filter_suite_by_condition(suite, condition)
 
3172
    return result_suite
 
3173
 
 
3174
 
 
3175
def filter_suite_by_id_startswith(suite, start):
 
3176
    """Create a test suite by filtering another one.
 
3177
 
 
3178
    :param suite: The source suite.
 
3179
    :param start: A list of string the test id must start with one of.
 
3180
    :returns: the newly created suite
 
3181
    """
 
3182
    condition = condition_id_startswith(start)
 
3183
    result_suite = filter_suite_by_condition(suite, condition)
 
3184
    return result_suite
 
3185
 
 
3186
 
 
3187
def exclude_tests_by_re(suite, pattern):
 
3188
    """Create a test suite which excludes some tests from suite.
 
3189
 
 
3190
    :param suite: The suite to get tests from.
 
3191
    :param pattern: A regular expression string. Test ids that match this
 
3192
        pattern will be excluded from the result.
 
3193
    :return: A TestSuite that contains all the tests from suite without the
 
3194
        tests that matched pattern. The order of tests is the same as it was in
 
3195
        suite.
 
3196
    """
 
3197
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
3198
 
 
3199
 
 
3200
def preserve_input(something):
 
3201
    """A helper for performing test suite transformation chains.
 
3202
 
 
3203
    :param something: Anything you want to preserve.
 
3204
    :return: Something.
 
3205
    """
 
3206
    return something
 
3207
 
 
3208
 
 
3209
def randomize_suite(suite):
 
3210
    """Return a new TestSuite with suite's tests in random order.
 
3211
 
 
3212
    The tests in the input suite are flattened into a single suite in order to
 
3213
    accomplish this. Any nested TestSuites are removed to provide global
 
3214
    randomness.
 
3215
    """
 
3216
    tests = list(iter_suite_tests(suite))
 
3217
    random.shuffle(tests)
 
3218
    return TestUtil.TestSuite(tests)
 
3219
 
 
3220
 
 
3221
def split_suite_by_condition(suite, condition):
 
3222
    """Split a test suite into two by a condition.
 
3223
 
 
3224
    :param suite: The suite to split.
 
3225
    :param condition: The condition to match on. Tests that match this
 
3226
        condition are returned in the first test suite, ones that do not match
 
3227
        are in the second suite.
 
3228
    :return: A tuple of two test suites, where the first contains tests from
 
3229
        suite matching the condition, and the second contains the remainder
 
3230
        from suite. The order within each output suite is the same as it was in
 
3231
        suite.
 
3232
    """
 
3233
    matched = []
 
3234
    did_not_match = []
 
3235
    for test in iter_suite_tests(suite):
 
3236
        if condition(test):
 
3237
            matched.append(test)
 
3238
        else:
 
3239
            did_not_match.append(test)
 
3240
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
3241
 
 
3242
 
 
3243
def split_suite_by_re(suite, pattern):
 
3244
    """Split a test suite into two by a regular expression.
 
3245
 
 
3246
    :param suite: The suite to split.
 
3247
    :param pattern: A regular expression string. Test ids that match this
 
3248
        pattern will be in the first test suite returned, and the others in the
 
3249
        second test suite returned.
 
3250
    :return: A tuple of two test suites, where the first contains tests from
 
3251
        suite matching pattern, and the second contains the remainder from
 
3252
        suite. The order within each output suite is the same as it was in
 
3253
        suite.
 
3254
    """
 
3255
    return split_suite_by_condition(suite, condition_id_re(pattern))
 
3256
 
 
3257
 
 
3258
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
3259
              stop_on_failure=False,
 
3260
              transport=None, lsprof_timed=None, bench_history=None,
 
3261
              matching_tests_first=None,
 
3262
              list_only=False,
 
3263
              random_seed=None,
 
3264
              exclude_pattern=None,
 
3265
              strict=False,
 
3266
              runner_class=None,
 
3267
              suite_decorators=None,
 
3268
              stream=None,
 
3269
              result_decorators=None,
 
3270
              ):
 
3271
    """Run a test suite for bzr selftest.
 
3272
 
 
3273
    :param runner_class: The class of runner to use. Must support the
 
3274
        constructor arguments passed by run_suite which are more than standard
 
3275
        python uses.
 
3276
    :return: A boolean indicating success.
 
3277
    """
 
3278
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
3279
    if verbose:
 
3280
        verbosity = 2
 
3281
    else:
 
3282
        verbosity = 1
 
3283
    if runner_class is None:
 
3284
        runner_class = TextTestRunner
 
3285
    if stream is None:
 
3286
        stream = sys.stdout
 
3287
    runner = runner_class(stream=stream,
 
3288
                            descriptions=0,
 
3289
                            verbosity=verbosity,
 
3290
                            bench_history=bench_history,
 
3291
                            strict=strict,
 
3292
                            result_decorators=result_decorators,
 
3293
                            )
 
3294
    runner.stop_on_failure=stop_on_failure
 
3295
    if isinstance(suite, unittest.TestSuite):
 
3296
        # Empty out _tests list of passed suite and populate new TestSuite
 
3297
        suite._tests[:], suite = [], TestSuite(suite)
 
3298
    # built in decorator factories:
 
3299
    decorators = [
 
3300
        random_order(random_seed, runner),
 
3301
        exclude_tests(exclude_pattern),
 
3302
        ]
 
3303
    if matching_tests_first:
 
3304
        decorators.append(tests_first(pattern))
 
3305
    else:
 
3306
        decorators.append(filter_tests(pattern))
 
3307
    if suite_decorators:
 
3308
        decorators.extend(suite_decorators)
 
3309
    # tell the result object how many tests will be running: (except if
 
3310
    # --parallel=fork is being used. Robert said he will provide a better
 
3311
    # progress design later -- vila 20090817)
 
3312
    if fork_decorator not in decorators:
 
3313
        decorators.append(CountingDecorator)
 
3314
    for decorator in decorators:
 
3315
        suite = decorator(suite)
 
3316
    if list_only:
 
3317
        # Done after test suite decoration to allow randomisation etc
 
3318
        # to take effect, though that is of marginal benefit.
 
3319
        if verbosity >= 2:
 
3320
            stream.write("Listing tests only ...\n")
 
3321
        for t in iter_suite_tests(suite):
 
3322
            stream.write("%s\n" % (t.id()))
 
3323
        return True
 
3324
    result = runner.run(suite)
 
3325
    if strict:
 
3326
        return result.wasStrictlySuccessful()
 
3327
    else:
 
3328
        return result.wasSuccessful()
 
3329
 
 
3330
 
 
3331
# A registry where get() returns a suite decorator.
 
3332
parallel_registry = registry.Registry()
 
3333
 
 
3334
 
 
3335
def fork_decorator(suite):
 
3336
    if getattr(os, "fork", None) is None:
 
3337
        raise errors.BzrCommandError("platform does not support fork,"
 
3338
            " try --parallel=subprocess instead.")
 
3339
    concurrency = osutils.local_concurrency()
 
3340
    if concurrency == 1:
 
3341
        return suite
 
3342
    from testtools import ConcurrentTestSuite
 
3343
    return ConcurrentTestSuite(suite, fork_for_tests)
 
3344
parallel_registry.register('fork', fork_decorator)
 
3345
 
 
3346
 
 
3347
def subprocess_decorator(suite):
 
3348
    concurrency = osutils.local_concurrency()
 
3349
    if concurrency == 1:
 
3350
        return suite
 
3351
    from testtools import ConcurrentTestSuite
 
3352
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
 
3353
parallel_registry.register('subprocess', subprocess_decorator)
 
3354
 
 
3355
 
 
3356
def exclude_tests(exclude_pattern):
 
3357
    """Return a test suite decorator that excludes tests."""
 
3358
    if exclude_pattern is None:
 
3359
        return identity_decorator
 
3360
    def decorator(suite):
 
3361
        return ExcludeDecorator(suite, exclude_pattern)
 
3362
    return decorator
 
3363
 
 
3364
 
 
3365
def filter_tests(pattern):
 
3366
    if pattern == '.*':
 
3367
        return identity_decorator
 
3368
    def decorator(suite):
 
3369
        return FilterTestsDecorator(suite, pattern)
 
3370
    return decorator
 
3371
 
 
3372
 
 
3373
def random_order(random_seed, runner):
 
3374
    """Return a test suite decorator factory for randomising tests order.
 
3375
    
 
3376
    :param random_seed: now, a string which casts to a long, or a long.
 
3377
    :param runner: A test runner with a stream attribute to report on.
 
3378
    """
 
3379
    if random_seed is None:
 
3380
        return identity_decorator
 
3381
    def decorator(suite):
 
3382
        return RandomDecorator(suite, random_seed, runner.stream)
 
3383
    return decorator
 
3384
 
 
3385
 
 
3386
def tests_first(pattern):
 
3387
    if pattern == '.*':
 
3388
        return identity_decorator
 
3389
    def decorator(suite):
 
3390
        return TestFirstDecorator(suite, pattern)
 
3391
    return decorator
 
3392
 
 
3393
 
 
3394
def identity_decorator(suite):
 
3395
    """Return suite."""
 
3396
    return suite
 
3397
 
 
3398
 
 
3399
class TestDecorator(TestUtil.TestSuite):
 
3400
    """A decorator for TestCase/TestSuite objects.
 
3401
 
 
3402
    Contains rather than flattening suite passed on construction
 
3403
    """
 
3404
 
 
3405
    def __init__(self, suite=None):
 
3406
        super(TestDecorator, self).__init__()
 
3407
        if suite is not None:
 
3408
            self.addTest(suite)
 
3409
 
 
3410
    # Don't need subclass run method with suite emptying
 
3411
    run = unittest.TestSuite.run
 
3412
 
 
3413
 
 
3414
class CountingDecorator(TestDecorator):
 
3415
    """A decorator which calls result.progress(self.countTestCases)."""
 
3416
 
 
3417
    def run(self, result):
 
3418
        progress_method = getattr(result, 'progress', None)
 
3419
        if callable(progress_method):
 
3420
            progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
 
3421
        return super(CountingDecorator, self).run(result)
 
3422
 
 
3423
 
 
3424
class ExcludeDecorator(TestDecorator):
 
3425
    """A decorator which excludes test matching an exclude pattern."""
 
3426
 
 
3427
    def __init__(self, suite, exclude_pattern):
 
3428
        super(ExcludeDecorator, self).__init__(
 
3429
            exclude_tests_by_re(suite, exclude_pattern))
 
3430
 
 
3431
 
 
3432
class FilterTestsDecorator(TestDecorator):
 
3433
    """A decorator which filters tests to those matching a pattern."""
 
3434
 
 
3435
    def __init__(self, suite, pattern):
 
3436
        super(FilterTestsDecorator, self).__init__(
 
3437
            filter_suite_by_re(suite, pattern))
 
3438
 
 
3439
 
 
3440
class RandomDecorator(TestDecorator):
 
3441
    """A decorator which randomises the order of its tests."""
 
3442
 
 
3443
    def __init__(self, suite, random_seed, stream):
 
3444
        random_seed = self.actual_seed(random_seed)
 
3445
        stream.write("Randomizing test order using seed %s\n\n" %
 
3446
            (random_seed,))
 
3447
        # Initialise the random number generator.
 
3448
        random.seed(random_seed)
 
3449
        super(RandomDecorator, self).__init__(randomize_suite(suite))
 
3450
 
 
3451
    @staticmethod
 
3452
    def actual_seed(seed):
 
3453
        if seed == "now":
 
3454
            # We convert the seed to a long to make it reuseable across
 
3455
            # invocations (because the user can reenter it).
 
3456
            return long(time.time())
 
3457
        else:
 
3458
            # Convert the seed to a long if we can
 
3459
            try:
 
3460
                return long(seed)
 
3461
            except (TypeError, ValueError):
 
3462
                pass
 
3463
        return seed
 
3464
 
 
3465
 
 
3466
class TestFirstDecorator(TestDecorator):
 
3467
    """A decorator which moves named tests to the front."""
 
3468
 
 
3469
    def __init__(self, suite, pattern):
 
3470
        super(TestFirstDecorator, self).__init__()
 
3471
        self.addTests(split_suite_by_re(suite, pattern))
 
3472
 
 
3473
 
 
3474
def partition_tests(suite, count):
 
3475
    """Partition suite into count lists of tests."""
 
3476
    # This just assigns tests in a round-robin fashion.  On one hand this
 
3477
    # splits up blocks of related tests that might run faster if they shared
 
3478
    # resources, but on the other it avoids assigning blocks of slow tests to
 
3479
    # just one partition.  So the slowest partition shouldn't be much slower
 
3480
    # than the fastest.
 
3481
    partitions = [list() for i in range(count)]
 
3482
    tests = iter_suite_tests(suite)
 
3483
    for partition, test in itertools.izip(itertools.cycle(partitions), tests):
 
3484
        partition.append(test)
 
3485
    return partitions
 
3486
 
 
3487
 
 
3488
def workaround_zealous_crypto_random():
 
3489
    """Crypto.Random want to help us being secure, but we don't care here.
 
3490
 
 
3491
    This workaround some test failure related to the sftp server. Once paramiko
 
3492
    stop using the controversial API in Crypto.Random, we may get rid of it.
 
3493
    """
 
3494
    try:
 
3495
        from Crypto.Random import atfork
 
3496
        atfork()
 
3497
    except ImportError:
 
3498
        pass
 
3499
 
 
3500
 
 
3501
def fork_for_tests(suite):
 
3502
    """Take suite and start up one runner per CPU by forking()
 
3503
 
 
3504
    :return: An iterable of TestCase-like objects which can each have
 
3505
        run(result) called on them to feed tests to result.
 
3506
    """
 
3507
    concurrency = osutils.local_concurrency()
 
3508
    result = []
 
3509
    from subunit import ProtocolTestCase
 
3510
    from subunit.test_results import AutoTimingTestResultDecorator
 
3511
    class TestInOtherProcess(ProtocolTestCase):
 
3512
        # Should be in subunit, I think. RBC.
 
3513
        def __init__(self, stream, pid):
 
3514
            ProtocolTestCase.__init__(self, stream)
 
3515
            self.pid = pid
 
3516
 
 
3517
        def run(self, result):
 
3518
            try:
 
3519
                ProtocolTestCase.run(self, result)
 
3520
            finally:
 
3521
                pid, status = os.waitpid(self.pid, 0)
 
3522
            # GZ 2011-10-18: If status is nonzero, should report to the result
 
3523
            #                that something went wrong.
 
3524
 
 
3525
    test_blocks = partition_tests(suite, concurrency)
 
3526
    # Clear the tests from the original suite so it doesn't keep them alive
 
3527
    suite._tests[:] = []
 
3528
    for process_tests in test_blocks:
 
3529
        process_suite = TestUtil.TestSuite(process_tests)
 
3530
        # Also clear each split list so new suite has only reference
 
3531
        process_tests[:] = []
 
3532
        c2pread, c2pwrite = os.pipe()
 
3533
        pid = os.fork()
 
3534
        if pid == 0:
 
3535
            try:
 
3536
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3537
                workaround_zealous_crypto_random()
 
3538
                os.close(c2pread)
 
3539
                # Leave stderr and stdout open so we can see test noise
 
3540
                # Close stdin so that the child goes away if it decides to
 
3541
                # read from stdin (otherwise its a roulette to see what
 
3542
                # child actually gets keystrokes for pdb etc).
 
3543
                sys.stdin.close()
 
3544
                subunit_result = AutoTimingTestResultDecorator(
 
3545
                    SubUnitBzrProtocolClient(stream))
 
3546
                process_suite.run(subunit_result)
 
3547
            except:
 
3548
                # Try and report traceback on stream, but exit with error even
 
3549
                # if stream couldn't be created or something else goes wrong.
 
3550
                # The traceback is formatted to a string and written in one go
 
3551
                # to avoid interleaving lines from multiple failing children.
 
3552
                try:
 
3553
                    stream.write(traceback.format_exc())
 
3554
                finally:
 
3555
                    os._exit(1)
 
3556
            os._exit(0)
 
3557
        else:
 
3558
            os.close(c2pwrite)
 
3559
            stream = os.fdopen(c2pread, 'rb', 1)
 
3560
            test = TestInOtherProcess(stream, pid)
 
3561
            result.append(test)
 
3562
    return result
 
3563
 
 
3564
 
 
3565
def reinvoke_for_tests(suite):
 
3566
    """Take suite and start up one runner per CPU using subprocess().
 
3567
 
 
3568
    :return: An iterable of TestCase-like objects which can each have
 
3569
        run(result) called on them to feed tests to result.
 
3570
    """
 
3571
    concurrency = osutils.local_concurrency()
 
3572
    result = []
 
3573
    from subunit import ProtocolTestCase
 
3574
    class TestInSubprocess(ProtocolTestCase):
 
3575
        def __init__(self, process, name):
 
3576
            ProtocolTestCase.__init__(self, process.stdout)
 
3577
            self.process = process
 
3578
            self.process.stdin.close()
 
3579
            self.name = name
 
3580
 
 
3581
        def run(self, result):
 
3582
            try:
 
3583
                ProtocolTestCase.run(self, result)
 
3584
            finally:
 
3585
                self.process.wait()
 
3586
                os.unlink(self.name)
 
3587
            # print "pid %d finished" % finished_process
 
3588
    test_blocks = partition_tests(suite, concurrency)
 
3589
    for process_tests in test_blocks:
 
3590
        # ugly; currently reimplement rather than reuses TestCase methods.
 
3591
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
3592
        if not os.path.isfile(bzr_path):
 
3593
            # We are probably installed. Assume sys.argv is the right file
 
3594
            bzr_path = sys.argv[0]
 
3595
        bzr_path = [bzr_path]
 
3596
        if sys.platform == "win32":
 
3597
            # if we're on windows, we can't execute the bzr script directly
 
3598
            bzr_path = [sys.executable] + bzr_path
 
3599
        fd, test_list_file_name = tempfile.mkstemp()
 
3600
        test_list_file = os.fdopen(fd, 'wb', 1)
 
3601
        for test in process_tests:
 
3602
            test_list_file.write(test.id() + '\n')
 
3603
        test_list_file.close()
 
3604
        try:
 
3605
            argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
 
3606
                '--subunit']
 
3607
            if '--no-plugins' in sys.argv:
 
3608
                argv.append('--no-plugins')
 
3609
            # stderr=subprocess.STDOUT would be ideal, but until we prevent
 
3610
            # noise on stderr it can interrupt the subunit protocol.
 
3611
            process = subprocess.Popen(argv, stdin=subprocess.PIPE,
 
3612
                                      stdout=subprocess.PIPE,
 
3613
                                      stderr=subprocess.PIPE,
 
3614
                                      bufsize=1)
 
3615
            test = TestInSubprocess(process, test_list_file_name)
 
3616
            result.append(test)
 
3617
        except:
 
3618
            os.unlink(test_list_file_name)
 
3619
            raise
 
3620
    return result
 
3621
 
 
3622
 
 
3623
class ProfileResult(testtools.ExtendedToOriginalDecorator):
 
3624
    """Generate profiling data for all activity between start and success.
 
3625
    
 
3626
    The profile data is appended to the test's _benchcalls attribute and can
 
3627
    be accessed by the forwarded-to TestResult.
 
3628
 
 
3629
    While it might be cleaner do accumulate this in stopTest, addSuccess is
 
3630
    where our existing output support for lsprof is, and this class aims to
 
3631
    fit in with that: while it could be moved it's not necessary to accomplish
 
3632
    test profiling, nor would it be dramatically cleaner.
 
3633
    """
 
3634
 
 
3635
    def startTest(self, test):
 
3636
        self.profiler = bzrlib.lsprof.BzrProfiler()
 
3637
        # Prevent deadlocks in tests that use lsprof: those tests will
 
3638
        # unavoidably fail.
 
3639
        bzrlib.lsprof.BzrProfiler.profiler_block = 0
 
3640
        self.profiler.start()
 
3641
        testtools.ExtendedToOriginalDecorator.startTest(self, test)
 
3642
 
 
3643
    def addSuccess(self, test):
 
3644
        stats = self.profiler.stop()
 
3645
        try:
 
3646
            calls = test._benchcalls
 
3647
        except AttributeError:
 
3648
            test._benchcalls = []
 
3649
            calls = test._benchcalls
 
3650
        calls.append(((test.id(), "", ""), stats))
 
3651
        testtools.ExtendedToOriginalDecorator.addSuccess(self, test)
 
3652
 
 
3653
    def stopTest(self, test):
 
3654
        testtools.ExtendedToOriginalDecorator.stopTest(self, test)
 
3655
        self.profiler = None
 
3656
 
 
3657
 
 
3658
# Controlled by "bzr selftest -E=..." option
 
3659
# Currently supported:
 
3660
#   -Eallow_debug           Will no longer clear debug.debug_flags() so it
 
3661
#                           preserves any flags supplied at the command line.
 
3662
#   -Edisable_lock_checks   Turns errors in mismatched locks into simple prints
 
3663
#                           rather than failing tests. And no longer raise
 
3664
#                           LockContention when fctnl locks are not being used
 
3665
#                           with proper exclusion rules.
 
3666
#   -Ethreads               Will display thread ident at creation/join time to
 
3667
#                           help track thread leaks
 
3668
#   -Euncollected_cases     Display the identity of any test cases that weren't
 
3669
#                           deallocated after being completed.
 
3670
#   -Econfig_stats          Will collect statistics using addDetail
 
3671
selftest_debug_flags = set()
 
3672
 
 
3673
 
 
3674
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
3675
             transport=None,
 
3676
             test_suite_factory=None,
 
3677
             lsprof_timed=None,
 
3678
             bench_history=None,
 
3679
             matching_tests_first=None,
 
3680
             list_only=False,
 
3681
             random_seed=None,
 
3682
             exclude_pattern=None,
 
3683
             strict=False,
 
3684
             load_list=None,
 
3685
             debug_flags=None,
 
3686
             starting_with=None,
 
3687
             runner_class=None,
 
3688
             suite_decorators=None,
 
3689
             stream=None,
 
3690
             lsprof_tests=False,
 
3691
             ):
 
3692
    """Run the whole test suite under the enhanced runner"""
 
3693
    # XXX: Very ugly way to do this...
 
3694
    # Disable warning about old formats because we don't want it to disturb
 
3695
    # any blackbox tests.
 
3696
    from bzrlib import repository
 
3697
    repository._deprecation_warning_done = True
 
3698
 
 
3699
    global default_transport
 
3700
    if transport is None:
 
3701
        transport = default_transport
 
3702
    old_transport = default_transport
 
3703
    default_transport = transport
 
3704
    global selftest_debug_flags
 
3705
    old_debug_flags = selftest_debug_flags
 
3706
    if debug_flags is not None:
 
3707
        selftest_debug_flags = set(debug_flags)
 
3708
    try:
 
3709
        if load_list is None:
 
3710
            keep_only = None
 
3711
        else:
 
3712
            keep_only = load_test_id_list(load_list)
 
3713
        if starting_with:
 
3714
            starting_with = [test_prefix_alias_registry.resolve_alias(start)
 
3715
                             for start in starting_with]
 
3716
        if test_suite_factory is None:
 
3717
            # Reduce loading time by loading modules based on the starting_with
 
3718
            # patterns.
 
3719
            suite = test_suite(keep_only, starting_with)
 
3720
        else:
 
3721
            suite = test_suite_factory()
 
3722
        if starting_with:
 
3723
            # But always filter as requested.
 
3724
            suite = filter_suite_by_id_startswith(suite, starting_with)
 
3725
        result_decorators = []
 
3726
        if lsprof_tests:
 
3727
            result_decorators.append(ProfileResult)
 
3728
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
3729
                     stop_on_failure=stop_on_failure,
 
3730
                     transport=transport,
 
3731
                     lsprof_timed=lsprof_timed,
 
3732
                     bench_history=bench_history,
 
3733
                     matching_tests_first=matching_tests_first,
 
3734
                     list_only=list_only,
 
3735
                     random_seed=random_seed,
 
3736
                     exclude_pattern=exclude_pattern,
 
3737
                     strict=strict,
 
3738
                     runner_class=runner_class,
 
3739
                     suite_decorators=suite_decorators,
 
3740
                     stream=stream,
 
3741
                     result_decorators=result_decorators,
 
3742
                     )
 
3743
    finally:
 
3744
        default_transport = old_transport
 
3745
        selftest_debug_flags = old_debug_flags
 
3746
 
 
3747
 
 
3748
def load_test_id_list(file_name):
 
3749
    """Load a test id list from a text file.
 
3750
 
 
3751
    The format is one test id by line.  No special care is taken to impose
 
3752
    strict rules, these test ids are used to filter the test suite so a test id
 
3753
    that do not match an existing test will do no harm. This allows user to add
 
3754
    comments, leave blank lines, etc.
 
3755
    """
 
3756
    test_list = []
 
3757
    try:
 
3758
        ftest = open(file_name, 'rt')
 
3759
    except IOError, e:
 
3760
        if e.errno != errno.ENOENT:
 
3761
            raise
 
3762
        else:
 
3763
            raise errors.NoSuchFile(file_name)
 
3764
 
 
3765
    for test_name in ftest.readlines():
 
3766
        test_list.append(test_name.strip())
 
3767
    ftest.close()
 
3768
    return test_list
 
3769
 
 
3770
 
 
3771
def suite_matches_id_list(test_suite, id_list):
 
3772
    """Warns about tests not appearing or appearing more than once.
 
3773
 
 
3774
    :param test_suite: A TestSuite object.
 
3775
    :param test_id_list: The list of test ids that should be found in
 
3776
         test_suite.
 
3777
 
 
3778
    :return: (absents, duplicates) absents is a list containing the test found
 
3779
        in id_list but not in test_suite, duplicates is a list containing the
 
3780
        test found multiple times in test_suite.
 
3781
 
 
3782
    When using a prefined test id list, it may occurs that some tests do not
 
3783
    exist anymore or that some tests use the same id. This function warns the
 
3784
    tester about potential problems in his workflow (test lists are volatile)
 
3785
    or in the test suite itself (using the same id for several tests does not
 
3786
    help to localize defects).
 
3787
    """
 
3788
    # Build a dict counting id occurrences
 
3789
    tests = dict()
 
3790
    for test in iter_suite_tests(test_suite):
 
3791
        id = test.id()
 
3792
        tests[id] = tests.get(id, 0) + 1
 
3793
 
 
3794
    not_found = []
 
3795
    duplicates = []
 
3796
    for id in id_list:
 
3797
        occurs = tests.get(id, 0)
 
3798
        if not occurs:
 
3799
            not_found.append(id)
 
3800
        elif occurs > 1:
 
3801
            duplicates.append(id)
 
3802
 
 
3803
    return not_found, duplicates
 
3804
 
 
3805
 
 
3806
class TestIdList(object):
 
3807
    """Test id list to filter a test suite.
 
3808
 
 
3809
    Relying on the assumption that test ids are built as:
 
3810
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
3811
    notation, this class offers methods to :
 
3812
    - avoid building a test suite for modules not refered to in the test list,
 
3813
    - keep only the tests listed from the module test suite.
 
3814
    """
 
3815
 
 
3816
    def __init__(self, test_id_list):
 
3817
        # When a test suite needs to be filtered against us we compare test ids
 
3818
        # for equality, so a simple dict offers a quick and simple solution.
 
3819
        self.tests = dict().fromkeys(test_id_list, True)
 
3820
 
 
3821
        # While unittest.TestCase have ids like:
 
3822
        # <module>.<class>.<method>[(<param+)],
 
3823
        # doctest.DocTestCase can have ids like:
 
3824
        # <module>
 
3825
        # <module>.<class>
 
3826
        # <module>.<function>
 
3827
        # <module>.<class>.<method>
 
3828
 
 
3829
        # Since we can't predict a test class from its name only, we settle on
 
3830
        # a simple constraint: a test id always begins with its module name.
 
3831
 
 
3832
        modules = {}
 
3833
        for test_id in test_id_list:
 
3834
            parts = test_id.split('.')
 
3835
            mod_name = parts.pop(0)
 
3836
            modules[mod_name] = True
 
3837
            for part in parts:
 
3838
                mod_name += '.' + part
 
3839
                modules[mod_name] = True
 
3840
        self.modules = modules
 
3841
 
 
3842
    def refers_to(self, module_name):
 
3843
        """Is there tests for the module or one of its sub modules."""
 
3844
        return self.modules.has_key(module_name)
 
3845
 
 
3846
    def includes(self, test_id):
 
3847
        return self.tests.has_key(test_id)
 
3848
 
 
3849
 
 
3850
class TestPrefixAliasRegistry(registry.Registry):
 
3851
    """A registry for test prefix aliases.
 
3852
 
 
3853
    This helps implement shorcuts for the --starting-with selftest
 
3854
    option. Overriding existing prefixes is not allowed but not fatal (a
 
3855
    warning will be emitted).
 
3856
    """
 
3857
 
 
3858
    def register(self, key, obj, help=None, info=None,
 
3859
                 override_existing=False):
 
3860
        """See Registry.register.
 
3861
 
 
3862
        Trying to override an existing alias causes a warning to be emitted,
 
3863
        not a fatal execption.
 
3864
        """
 
3865
        try:
 
3866
            super(TestPrefixAliasRegistry, self).register(
 
3867
                key, obj, help=help, info=info, override_existing=False)
 
3868
        except KeyError:
 
3869
            actual = self.get(key)
 
3870
            trace.note(
 
3871
                'Test prefix alias %s is already used for %s, ignoring %s'
 
3872
                % (key, actual, obj))
 
3873
 
 
3874
    def resolve_alias(self, id_start):
 
3875
        """Replace the alias by the prefix in the given string.
 
3876
 
 
3877
        Using an unknown prefix is an error to help catching typos.
 
3878
        """
 
3879
        parts = id_start.split('.')
 
3880
        try:
 
3881
            parts[0] = self.get(parts[0])
 
3882
        except KeyError:
 
3883
            raise errors.BzrCommandError(
 
3884
                '%s is not a known test prefix alias' % parts[0])
 
3885
        return '.'.join(parts)
 
3886
 
 
3887
 
 
3888
test_prefix_alias_registry = TestPrefixAliasRegistry()
 
3889
"""Registry of test prefix aliases."""
 
3890
 
 
3891
 
 
3892
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
 
3893
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
 
3894
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
 
3895
 
 
3896
# Obvious highest levels prefixes, feel free to add your own via a plugin
 
3897
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
 
3898
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
 
3899
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
 
3900
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
 
3901
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
 
3902
 
 
3903
 
 
3904
def _test_suite_testmod_names():
 
3905
    """Return the standard list of test module names to test."""
 
3906
    return [
 
3907
        'bzrlib.doc',
 
3908
        'bzrlib.tests.blackbox',
 
3909
        'bzrlib.tests.commands',
 
3910
        'bzrlib.tests.per_branch',
 
3911
        'bzrlib.tests.per_bzrdir',
 
3912
        'bzrlib.tests.per_controldir',
 
3913
        'bzrlib.tests.per_controldir_colo',
 
3914
        'bzrlib.tests.per_foreign_vcs',
 
3915
        'bzrlib.tests.per_interrepository',
 
3916
        'bzrlib.tests.per_intertree',
 
3917
        'bzrlib.tests.per_inventory',
 
3918
        'bzrlib.tests.per_interbranch',
 
3919
        'bzrlib.tests.per_lock',
 
3920
        'bzrlib.tests.per_merger',
 
3921
        'bzrlib.tests.per_transport',
 
3922
        'bzrlib.tests.per_tree',
 
3923
        'bzrlib.tests.per_pack_repository',
 
3924
        'bzrlib.tests.per_repository',
 
3925
        'bzrlib.tests.per_repository_chk',
 
3926
        'bzrlib.tests.per_repository_reference',
 
3927
        'bzrlib.tests.per_repository_vf',
 
3928
        'bzrlib.tests.per_uifactory',
 
3929
        'bzrlib.tests.per_versionedfile',
 
3930
        'bzrlib.tests.per_workingtree',
 
3931
        'bzrlib.tests.test__annotator',
 
3932
        'bzrlib.tests.test__bencode',
 
3933
        'bzrlib.tests.test__btree_serializer',
 
3934
        'bzrlib.tests.test__chk_map',
 
3935
        'bzrlib.tests.test__dirstate_helpers',
 
3936
        'bzrlib.tests.test__groupcompress',
 
3937
        'bzrlib.tests.test__known_graph',
 
3938
        'bzrlib.tests.test__rio',
 
3939
        'bzrlib.tests.test__simple_set',
 
3940
        'bzrlib.tests.test__static_tuple',
 
3941
        'bzrlib.tests.test__walkdirs_win32',
 
3942
        'bzrlib.tests.test_ancestry',
 
3943
        'bzrlib.tests.test_annotate',
 
3944
        'bzrlib.tests.test_api',
 
3945
        'bzrlib.tests.test_atomicfile',
 
3946
        'bzrlib.tests.test_bad_files',
 
3947
        'bzrlib.tests.test_bisect_multi',
 
3948
        'bzrlib.tests.test_branch',
 
3949
        'bzrlib.tests.test_branchbuilder',
 
3950
        'bzrlib.tests.test_btree_index',
 
3951
        'bzrlib.tests.test_bugtracker',
 
3952
        'bzrlib.tests.test_bundle',
 
3953
        'bzrlib.tests.test_bzrdir',
 
3954
        'bzrlib.tests.test__chunks_to_lines',
 
3955
        'bzrlib.tests.test_cache_utf8',
 
3956
        'bzrlib.tests.test_chk_map',
 
3957
        'bzrlib.tests.test_chk_serializer',
 
3958
        'bzrlib.tests.test_chunk_writer',
 
3959
        'bzrlib.tests.test_clean_tree',
 
3960
        'bzrlib.tests.test_cleanup',
 
3961
        'bzrlib.tests.test_cmdline',
 
3962
        'bzrlib.tests.test_commands',
 
3963
        'bzrlib.tests.test_commit',
 
3964
        'bzrlib.tests.test_commit_merge',
 
3965
        'bzrlib.tests.test_config',
 
3966
        'bzrlib.tests.test_conflicts',
 
3967
        'bzrlib.tests.test_controldir',
 
3968
        'bzrlib.tests.test_counted_lock',
 
3969
        'bzrlib.tests.test_crash',
 
3970
        'bzrlib.tests.test_decorators',
 
3971
        'bzrlib.tests.test_delta',
 
3972
        'bzrlib.tests.test_debug',
 
3973
        'bzrlib.tests.test_diff',
 
3974
        'bzrlib.tests.test_directory_service',
 
3975
        'bzrlib.tests.test_dirstate',
 
3976
        'bzrlib.tests.test_email_message',
 
3977
        'bzrlib.tests.test_eol_filters',
 
3978
        'bzrlib.tests.test_errors',
 
3979
        'bzrlib.tests.test_estimate_compressed_size',
 
3980
        'bzrlib.tests.test_export',
 
3981
        'bzrlib.tests.test_export_pot',
 
3982
        'bzrlib.tests.test_extract',
 
3983
        'bzrlib.tests.test_features',
 
3984
        'bzrlib.tests.test_fetch',
 
3985
        'bzrlib.tests.test_fixtures',
 
3986
        'bzrlib.tests.test_fifo_cache',
 
3987
        'bzrlib.tests.test_filters',
 
3988
        'bzrlib.tests.test_filter_tree',
 
3989
        'bzrlib.tests.test_ftp_transport',
 
3990
        'bzrlib.tests.test_foreign',
 
3991
        'bzrlib.tests.test_generate_docs',
 
3992
        'bzrlib.tests.test_generate_ids',
 
3993
        'bzrlib.tests.test_globbing',
 
3994
        'bzrlib.tests.test_gpg',
 
3995
        'bzrlib.tests.test_graph',
 
3996
        'bzrlib.tests.test_groupcompress',
 
3997
        'bzrlib.tests.test_hashcache',
 
3998
        'bzrlib.tests.test_help',
 
3999
        'bzrlib.tests.test_hooks',
 
4000
        'bzrlib.tests.test_http',
 
4001
        'bzrlib.tests.test_http_response',
 
4002
        'bzrlib.tests.test_https_ca_bundle',
 
4003
        'bzrlib.tests.test_https_urllib',
 
4004
        'bzrlib.tests.test_i18n',
 
4005
        'bzrlib.tests.test_identitymap',
 
4006
        'bzrlib.tests.test_ignores',
 
4007
        'bzrlib.tests.test_index',
 
4008
        'bzrlib.tests.test_import_tariff',
 
4009
        'bzrlib.tests.test_info',
 
4010
        'bzrlib.tests.test_inv',
 
4011
        'bzrlib.tests.test_inventory_delta',
 
4012
        'bzrlib.tests.test_knit',
 
4013
        'bzrlib.tests.test_lazy_import',
 
4014
        'bzrlib.tests.test_lazy_regex',
 
4015
        'bzrlib.tests.test_library_state',
 
4016
        'bzrlib.tests.test_lock',
 
4017
        'bzrlib.tests.test_lockable_files',
 
4018
        'bzrlib.tests.test_lockdir',
 
4019
        'bzrlib.tests.test_log',
 
4020
        'bzrlib.tests.test_lru_cache',
 
4021
        'bzrlib.tests.test_lsprof',
 
4022
        'bzrlib.tests.test_mail_client',
 
4023
        'bzrlib.tests.test_matchers',
 
4024
        'bzrlib.tests.test_memorytree',
 
4025
        'bzrlib.tests.test_merge',
 
4026
        'bzrlib.tests.test_merge3',
 
4027
        'bzrlib.tests.test_merge_core',
 
4028
        'bzrlib.tests.test_merge_directive',
 
4029
        'bzrlib.tests.test_mergetools',
 
4030
        'bzrlib.tests.test_missing',
 
4031
        'bzrlib.tests.test_msgeditor',
 
4032
        'bzrlib.tests.test_multiparent',
 
4033
        'bzrlib.tests.test_mutabletree',
 
4034
        'bzrlib.tests.test_nonascii',
 
4035
        'bzrlib.tests.test_options',
 
4036
        'bzrlib.tests.test_osutils',
 
4037
        'bzrlib.tests.test_osutils_encodings',
 
4038
        'bzrlib.tests.test_pack',
 
4039
        'bzrlib.tests.test_patch',
 
4040
        'bzrlib.tests.test_patches',
 
4041
        'bzrlib.tests.test_permissions',
 
4042
        'bzrlib.tests.test_plugins',
 
4043
        'bzrlib.tests.test_progress',
 
4044
        'bzrlib.tests.test_pyutils',
 
4045
        'bzrlib.tests.test_read_bundle',
 
4046
        'bzrlib.tests.test_reconcile',
 
4047
        'bzrlib.tests.test_reconfigure',
 
4048
        'bzrlib.tests.test_registry',
 
4049
        'bzrlib.tests.test_remote',
 
4050
        'bzrlib.tests.test_rename_map',
 
4051
        'bzrlib.tests.test_repository',
 
4052
        'bzrlib.tests.test_revert',
 
4053
        'bzrlib.tests.test_revision',
 
4054
        'bzrlib.tests.test_revisionspec',
 
4055
        'bzrlib.tests.test_revisiontree',
 
4056
        'bzrlib.tests.test_rio',
 
4057
        'bzrlib.tests.test_rules',
 
4058
        'bzrlib.tests.test_url_policy_open',
 
4059
        'bzrlib.tests.test_sampler',
 
4060
        'bzrlib.tests.test_scenarios',
 
4061
        'bzrlib.tests.test_script',
 
4062
        'bzrlib.tests.test_selftest',
 
4063
        'bzrlib.tests.test_serializer',
 
4064
        'bzrlib.tests.test_setup',
 
4065
        'bzrlib.tests.test_sftp_transport',
 
4066
        'bzrlib.tests.test_shelf',
 
4067
        'bzrlib.tests.test_shelf_ui',
 
4068
        'bzrlib.tests.test_smart',
 
4069
        'bzrlib.tests.test_smart_add',
 
4070
        'bzrlib.tests.test_smart_request',
 
4071
        'bzrlib.tests.test_smart_signals',
 
4072
        'bzrlib.tests.test_smart_transport',
 
4073
        'bzrlib.tests.test_smtp_connection',
 
4074
        'bzrlib.tests.test_source',
 
4075
        'bzrlib.tests.test_ssh_transport',
 
4076
        'bzrlib.tests.test_status',
 
4077
        'bzrlib.tests.test_store',
 
4078
        'bzrlib.tests.test_strace',
 
4079
        'bzrlib.tests.test_subsume',
 
4080
        'bzrlib.tests.test_switch',
 
4081
        'bzrlib.tests.test_symbol_versioning',
 
4082
        'bzrlib.tests.test_tag',
 
4083
        'bzrlib.tests.test_test_server',
 
4084
        'bzrlib.tests.test_testament',
 
4085
        'bzrlib.tests.test_textfile',
 
4086
        'bzrlib.tests.test_textmerge',
 
4087
        'bzrlib.tests.test_cethread',
 
4088
        'bzrlib.tests.test_timestamp',
 
4089
        'bzrlib.tests.test_trace',
 
4090
        'bzrlib.tests.test_transactions',
 
4091
        'bzrlib.tests.test_transform',
 
4092
        'bzrlib.tests.test_transport',
 
4093
        'bzrlib.tests.test_transport_log',
 
4094
        'bzrlib.tests.test_tree',
 
4095
        'bzrlib.tests.test_treebuilder',
 
4096
        'bzrlib.tests.test_treeshape',
 
4097
        'bzrlib.tests.test_tsort',
 
4098
        'bzrlib.tests.test_tuned_gzip',
 
4099
        'bzrlib.tests.test_ui',
 
4100
        'bzrlib.tests.test_uncommit',
 
4101
        'bzrlib.tests.test_upgrade',
 
4102
        'bzrlib.tests.test_upgrade_stacked',
 
4103
        'bzrlib.tests.test_urlutils',
 
4104
        'bzrlib.tests.test_utextwrap',
 
4105
        'bzrlib.tests.test_version',
 
4106
        'bzrlib.tests.test_version_info',
 
4107
        'bzrlib.tests.test_versionedfile',
 
4108
        'bzrlib.tests.test_vf_search',
 
4109
        'bzrlib.tests.test_weave',
 
4110
        'bzrlib.tests.test_whitebox',
 
4111
        'bzrlib.tests.test_win32utils',
 
4112
        'bzrlib.tests.test_workingtree',
 
4113
        'bzrlib.tests.test_workingtree_4',
 
4114
        'bzrlib.tests.test_wsgi',
 
4115
        'bzrlib.tests.test_xml',
 
4116
        ]
 
4117
 
 
4118
 
 
4119
def _test_suite_modules_to_doctest():
 
4120
    """Return the list of modules to doctest."""
 
4121
    if __doc__ is None:
 
4122
        # GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
 
4123
        return []
 
4124
    return [
 
4125
        'bzrlib',
 
4126
        'bzrlib.branchbuilder',
 
4127
        'bzrlib.decorators',
 
4128
        'bzrlib.inventory',
 
4129
        'bzrlib.iterablefile',
 
4130
        'bzrlib.lockdir',
 
4131
        'bzrlib.merge3',
 
4132
        'bzrlib.option',
 
4133
        'bzrlib.pyutils',
 
4134
        'bzrlib.symbol_versioning',
 
4135
        'bzrlib.tests',
 
4136
        'bzrlib.tests.fixtures',
 
4137
        'bzrlib.timestamp',
 
4138
        'bzrlib.transport.http',
 
4139
        'bzrlib.version_info_formats.format_custom',
 
4140
        ]
 
4141
 
 
4142
 
 
4143
def test_suite(keep_only=None, starting_with=None):
 
4144
    """Build and return TestSuite for the whole of bzrlib.
 
4145
 
 
4146
    :param keep_only: A list of test ids limiting the suite returned.
 
4147
 
 
4148
    :param starting_with: An id limiting the suite returned to the tests
 
4149
         starting with it.
 
4150
 
 
4151
    This function can be replaced if you need to change the default test
 
4152
    suite on a global basis, but it is not encouraged.
 
4153
    """
 
4154
 
 
4155
    loader = TestUtil.TestLoader()
 
4156
 
 
4157
    if keep_only is not None:
 
4158
        id_filter = TestIdList(keep_only)
 
4159
    if starting_with:
 
4160
        # We take precedence over keep_only because *at loading time* using
 
4161
        # both options means we will load less tests for the same final result.
 
4162
        def interesting_module(name):
 
4163
            for start in starting_with:
 
4164
                if (
 
4165
                    # Either the module name starts with the specified string
 
4166
                    name.startswith(start)
 
4167
                    # or it may contain tests starting with the specified string
 
4168
                    or start.startswith(name)
 
4169
                    ):
 
4170
                    return True
 
4171
            return False
 
4172
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
 
4173
 
 
4174
    elif keep_only is not None:
 
4175
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
4176
        def interesting_module(name):
 
4177
            return id_filter.refers_to(name)
 
4178
 
 
4179
    else:
 
4180
        loader = TestUtil.TestLoader()
 
4181
        def interesting_module(name):
 
4182
            # No filtering, all modules are interesting
 
4183
            return True
 
4184
 
 
4185
    suite = loader.suiteClass()
 
4186
 
 
4187
    # modules building their suite with loadTestsFromModuleNames
 
4188
    suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
 
4189
 
 
4190
    for mod in _test_suite_modules_to_doctest():
 
4191
        if not interesting_module(mod):
 
4192
            # No tests to keep here, move along
 
4193
            continue
 
4194
        try:
 
4195
            # note that this really does mean "report only" -- doctest
 
4196
            # still runs the rest of the examples
 
4197
            doc_suite = IsolatedDocTestSuite(
 
4198
                mod, optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
 
4199
        except ValueError, e:
 
4200
            print '**failed to get doctest for: %s\n%s' % (mod, e)
 
4201
            raise
 
4202
        if len(doc_suite._tests) == 0:
 
4203
            raise errors.BzrError("no doctests found in %s" % (mod,))
 
4204
        suite.addTest(doc_suite)
 
4205
 
 
4206
    default_encoding = sys.getdefaultencoding()
 
4207
    for name, plugin in _mod_plugin.plugins().items():
 
4208
        if not interesting_module(plugin.module.__name__):
 
4209
            continue
 
4210
        plugin_suite = plugin.test_suite()
 
4211
        # We used to catch ImportError here and turn it into just a warning,
 
4212
        # but really if you don't have --no-plugins this should be a failure.
 
4213
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
4214
        if plugin_suite is None:
 
4215
            plugin_suite = plugin.load_plugin_tests(loader)
 
4216
        if plugin_suite is not None:
 
4217
            suite.addTest(plugin_suite)
 
4218
        if default_encoding != sys.getdefaultencoding():
 
4219
            trace.warning(
 
4220
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
4221
                sys.getdefaultencoding())
 
4222
            reload(sys)
 
4223
            sys.setdefaultencoding(default_encoding)
 
4224
 
 
4225
    if keep_only is not None:
 
4226
        # Now that the referred modules have loaded their tests, keep only the
 
4227
        # requested ones.
 
4228
        suite = filter_suite_by_id_list(suite, id_filter)
 
4229
        # Do some sanity checks on the id_list filtering
 
4230
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
4231
        if starting_with:
 
4232
            # The tester has used both keep_only and starting_with, so he is
 
4233
            # already aware that some tests are excluded from the list, there
 
4234
            # is no need to tell him which.
 
4235
            pass
 
4236
        else:
 
4237
            # Some tests mentioned in the list are not in the test suite. The
 
4238
            # list may be out of date, report to the tester.
 
4239
            for id in not_found:
 
4240
                trace.warning('"%s" not found in the test suite', id)
 
4241
        for id in duplicates:
 
4242
            trace.warning('"%s" is used as an id by several tests', id)
 
4243
 
 
4244
    return suite
 
4245
 
 
4246
 
 
4247
def multiply_scenarios(*scenarios):
 
4248
    """Multiply two or more iterables of scenarios.
 
4249
 
 
4250
    It is safe to pass scenario generators or iterators.
 
4251
 
 
4252
    :returns: A list of compound scenarios: the cross-product of all 
 
4253
        scenarios, with the names concatenated and the parameters
 
4254
        merged together.
 
4255
    """
 
4256
    return reduce(_multiply_two_scenarios, map(list, scenarios))
 
4257
 
 
4258
 
 
4259
def _multiply_two_scenarios(scenarios_left, scenarios_right):
 
4260
    """Multiply two sets of scenarios.
 
4261
 
 
4262
    :returns: the cartesian product of the two sets of scenarios, that is
 
4263
        a scenario for every possible combination of a left scenario and a
 
4264
        right scenario.
 
4265
    """
 
4266
    return [
 
4267
        ('%s,%s' % (left_name, right_name),
 
4268
         dict(left_dict.items() + right_dict.items()))
 
4269
        for left_name, left_dict in scenarios_left
 
4270
        for right_name, right_dict in scenarios_right]
 
4271
 
 
4272
 
 
4273
def multiply_tests(tests, scenarios, result):
 
4274
    """Multiply tests_list by scenarios into result.
 
4275
 
 
4276
    This is the core workhorse for test parameterisation.
 
4277
 
 
4278
    Typically the load_tests() method for a per-implementation test suite will
 
4279
    call multiply_tests and return the result.
 
4280
 
 
4281
    :param tests: The tests to parameterise.
 
4282
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
 
4283
        scenario_param_dict).
 
4284
    :param result: A TestSuite to add created tests to.
 
4285
 
 
4286
    This returns the passed in result TestSuite with the cross product of all
 
4287
    the tests repeated once for each scenario.  Each test is adapted by adding
 
4288
    the scenario name at the end of its id(), and updating the test object's
 
4289
    __dict__ with the scenario_param_dict.
 
4290
 
 
4291
    >>> import bzrlib.tests.test_sampler
 
4292
    >>> r = multiply_tests(
 
4293
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
 
4294
    ...     [('one', dict(param=1)),
 
4295
    ...      ('two', dict(param=2))],
 
4296
    ...     TestUtil.TestSuite())
 
4297
    >>> tests = list(iter_suite_tests(r))
 
4298
    >>> len(tests)
 
4299
    2
 
4300
    >>> tests[0].id()
 
4301
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
 
4302
    >>> tests[0].param
 
4303
    1
 
4304
    >>> tests[1].param
 
4305
    2
 
4306
    """
 
4307
    for test in iter_suite_tests(tests):
 
4308
        apply_scenarios(test, scenarios, result)
 
4309
    return result
 
4310
 
 
4311
 
 
4312
def apply_scenarios(test, scenarios, result):
 
4313
    """Apply the scenarios in scenarios to test and add to result.
 
4314
 
 
4315
    :param test: The test to apply scenarios to.
 
4316
    :param scenarios: An iterable of scenarios to apply to test.
 
4317
    :return: result
 
4318
    :seealso: apply_scenario
 
4319
    """
 
4320
    for scenario in scenarios:
 
4321
        result.addTest(apply_scenario(test, scenario))
 
4322
    return result
 
4323
 
 
4324
 
 
4325
def apply_scenario(test, scenario):
 
4326
    """Copy test and apply scenario to it.
 
4327
 
 
4328
    :param test: A test to adapt.
 
4329
    :param scenario: A tuple describing the scenarion.
 
4330
        The first element of the tuple is the new test id.
 
4331
        The second element is a dict containing attributes to set on the
 
4332
        test.
 
4333
    :return: The adapted test.
 
4334
    """
 
4335
    new_id = "%s(%s)" % (test.id(), scenario[0])
 
4336
    new_test = clone_test(test, new_id)
 
4337
    for name, value in scenario[1].items():
 
4338
        setattr(new_test, name, value)
 
4339
    return new_test
 
4340
 
 
4341
 
 
4342
def clone_test(test, new_id):
 
4343
    """Clone a test giving it a new id.
 
4344
 
 
4345
    :param test: The test to clone.
 
4346
    :param new_id: The id to assign to it.
 
4347
    :return: The new test.
 
4348
    """
 
4349
    new_test = copy.copy(test)
 
4350
    new_test.id = lambda: new_id
 
4351
    # XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
 
4352
    # causes cloned tests to share the 'details' dict.  This makes it hard to
 
4353
    # read the test output for parameterized tests, because tracebacks will be
 
4354
    # associated with irrelevant tests.
 
4355
    try:
 
4356
        details = new_test._TestCase__details
 
4357
    except AttributeError:
 
4358
        # must be a different version of testtools than expected.  Do nothing.
 
4359
        pass
 
4360
    else:
 
4361
        # Reset the '__details' dict.
 
4362
        new_test._TestCase__details = {}
 
4363
    return new_test
 
4364
 
 
4365
 
 
4366
def permute_tests_for_extension(standard_tests, loader, py_module_name,
 
4367
                                ext_module_name):
 
4368
    """Helper for permutating tests against an extension module.
 
4369
 
 
4370
    This is meant to be used inside a modules 'load_tests()' function. It will
 
4371
    create 2 scenarios, and cause all tests in the 'standard_tests' to be run
 
4372
    against both implementations. Setting 'test.module' to the appropriate
 
4373
    module. See bzrlib.tests.test__chk_map.load_tests as an example.
 
4374
 
 
4375
    :param standard_tests: A test suite to permute
 
4376
    :param loader: A TestLoader
 
4377
    :param py_module_name: The python path to a python module that can always
 
4378
        be loaded, and will be considered the 'python' implementation. (eg
 
4379
        'bzrlib._chk_map_py')
 
4380
    :param ext_module_name: The python path to an extension module. If the
 
4381
        module cannot be loaded, a single test will be added, which notes that
 
4382
        the module is not available. If it can be loaded, all standard_tests
 
4383
        will be run against that module.
 
4384
    :return: (suite, feature) suite is a test-suite that has all the permuted
 
4385
        tests. feature is the Feature object that can be used to determine if
 
4386
        the module is available.
 
4387
    """
 
4388
 
 
4389
    from bzrlib.tests.features import ModuleAvailableFeature
 
4390
    py_module = pyutils.get_named_object(py_module_name)
 
4391
    scenarios = [
 
4392
        ('python', {'module': py_module}),
 
4393
    ]
 
4394
    suite = loader.suiteClass()
 
4395
    feature = ModuleAvailableFeature(ext_module_name)
 
4396
    if feature.available():
 
4397
        scenarios.append(('C', {'module': feature.module}))
 
4398
    else:
 
4399
        # the compiled module isn't available, so we add a failing test
 
4400
        class FailWithoutFeature(TestCase):
 
4401
            def test_fail(self):
 
4402
                self.requireFeature(feature)
 
4403
        suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
 
4404
    result = multiply_tests(standard_tests, scenarios, suite)
 
4405
    return result, feature
 
4406
 
 
4407
 
 
4408
def _rmtree_temp_dir(dirname, test_id=None):
 
4409
    # If LANG=C we probably have created some bogus paths
 
4410
    # which rmtree(unicode) will fail to delete
 
4411
    # so make sure we are using rmtree(str) to delete everything
 
4412
    # except on win32, where rmtree(str) will fail
 
4413
    # since it doesn't have the property of byte-stream paths
 
4414
    # (they are either ascii or mbcs)
 
4415
    if sys.platform == 'win32':
 
4416
        # make sure we are using the unicode win32 api
 
4417
        dirname = unicode(dirname)
 
4418
    else:
 
4419
        dirname = dirname.encode(sys.getfilesystemencoding())
 
4420
    try:
 
4421
        osutils.rmtree(dirname)
 
4422
    except OSError, e:
 
4423
        # We don't want to fail here because some useful display will be lost
 
4424
        # otherwise. Polluting the tmp dir is bad, but not giving all the
 
4425
        # possible info to the test runner is even worse.
 
4426
        if test_id != None:
 
4427
            ui.ui_factory.clear_term()
 
4428
            sys.stderr.write('\nWhile running: %s\n' % (test_id,))
 
4429
        # Ugly, but the last thing we want here is fail, so bear with it.
 
4430
        printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
 
4431
                                    ).encode('ascii', 'replace')
 
4432
        sys.stderr.write('Unable to remove testing dir %s\n%s'
 
4433
                         % (os.path.basename(dirname), printable_e))
 
4434
 
 
4435
 
 
4436
def probe_unicode_in_user_encoding():
 
4437
    """Try to encode several unicode strings to use in unicode-aware tests.
 
4438
    Return first successfull match.
 
4439
 
 
4440
    :return:  (unicode value, encoded plain string value) or (None, None)
 
4441
    """
 
4442
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
4443
    for uni_val in possible_vals:
 
4444
        try:
 
4445
            str_val = uni_val.encode(osutils.get_user_encoding())
 
4446
        except UnicodeEncodeError:
 
4447
            # Try a different character
 
4448
            pass
 
4449
        else:
 
4450
            return uni_val, str_val
 
4451
    return None, None
 
4452
 
 
4453
 
 
4454
def probe_bad_non_ascii(encoding):
 
4455
    """Try to find [bad] character with code [128..255]
 
4456
    that cannot be decoded to unicode in some encoding.
 
4457
    Return None if all non-ascii characters is valid
 
4458
    for given encoding.
 
4459
    """
 
4460
    for i in xrange(128, 256):
 
4461
        char = chr(i)
 
4462
        try:
 
4463
            char.decode(encoding)
 
4464
        except UnicodeDecodeError:
 
4465
            return char
 
4466
    return None
 
4467
 
 
4468
 
 
4469
# Only define SubUnitBzrRunner if subunit is available.
 
4470
try:
 
4471
    from subunit import TestProtocolClient
 
4472
    from subunit.test_results import AutoTimingTestResultDecorator
 
4473
    class SubUnitBzrProtocolClient(TestProtocolClient):
 
4474
 
 
4475
        def stopTest(self, test):
 
4476
            super(SubUnitBzrProtocolClient, self).stopTest(test)
 
4477
            _clear__type_equality_funcs(test)
 
4478
 
 
4479
        def addSuccess(self, test, details=None):
 
4480
            # The subunit client always includes the details in the subunit
 
4481
            # stream, but we don't want to include it in ours.
 
4482
            if details is not None and 'log' in details:
 
4483
                del details['log']
 
4484
            return super(SubUnitBzrProtocolClient, self).addSuccess(
 
4485
                test, details)
 
4486
 
 
4487
    class SubUnitBzrRunner(TextTestRunner):
 
4488
        def run(self, test):
 
4489
            result = AutoTimingTestResultDecorator(
 
4490
                SubUnitBzrProtocolClient(self.stream))
 
4491
            test.run(result)
 
4492
            return result
 
4493
except ImportError:
 
4494
    pass
 
4495
 
 
4496
 
 
4497
# API compatibility for old plugins; see bug 892622.
 
4498
for name in [
 
4499
    'Feature',
 
4500
    'HTTPServerFeature', 
 
4501
    'ModuleAvailableFeature',
 
4502
    'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
 
4503
    'OsFifoFeature', 'UnicodeFilenameFeature',
 
4504
    'ByteStringNamedFilesystem', 'UTF8Filesystem',
 
4505
    'BreakinFeature', 'CaseInsCasePresFilenameFeature',
 
4506
    'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
 
4507
    'posix_permissions_feature',
 
4508
    ]:
 
4509
    globals()[name] = _CompatabilityThunkFeature(
 
4510
        symbol_versioning.deprecated_in((2, 5, 0)),
 
4511
        'bzrlib.tests', name,
 
4512
        name, 'bzrlib.tests.features')
 
4513
 
 
4514
 
 
4515
for (old_name, new_name) in [
 
4516
    ('UnicodeFilename', 'UnicodeFilenameFeature'),
 
4517
    ]:
 
4518
    globals()[name] = _CompatabilityThunkFeature(
 
4519
        symbol_versioning.deprecated_in((2, 5, 0)),
 
4520
        'bzrlib.tests', old_name,
 
4521
        new_name, 'bzrlib.tests.features')