/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/__init__.py

  • Committer: Jelmer Vernooij
  • Date: 2017-12-03 15:14:22 UTC
  • mfrom: (6829.1.1 no-branch-nick)
  • Revision ID: jelmer@jelmer.uk-20171203151422-54pwtld2ae5cx11l
Merge lp:~jelmer/brz/no-branch-nick.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2005-2013, 2015, 2016 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
 
18
 
# TODO: Perhaps there should be an API to find out if bzr running under the
19
 
# test suite -- some plugins might want to avoid making intrusive changes if
20
 
# this is the case.  However, we want behaviour under to test to diverge as
21
 
# little as possible, so this should be used rarely if it's added at all.
22
 
# (Suggestion from j-a-meinel, 2005-11-24)
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Testing framework extensions"""
 
18
 
 
19
from __future__ import absolute_import
23
20
 
24
21
# NOTE: Some classes in here use camelCaseNaming() rather than
25
22
# underscore_naming().  That's for consistency with unittest; it's not the
26
 
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
23
# general style of breezy.  Please continue that consistency when adding e.g.
27
24
# new assertFoo() methods.
28
25
 
 
26
import atexit
29
27
import codecs
30
 
from cStringIO import StringIO
 
28
import copy
31
29
import difflib
 
30
import doctest
32
31
import errno
 
32
import functools
 
33
import itertools
33
34
import logging
 
35
import math
34
36
import os
 
37
import platform
 
38
import pprint
 
39
import random
35
40
import re
36
 
import shutil
 
41
import shlex
 
42
import site
37
43
import stat
 
44
import subprocess
38
45
import sys
39
46
import tempfile
 
47
import threading
 
48
import time
 
49
import traceback
40
50
import unittest
41
 
import time
42
 
 
43
 
 
44
 
import bzrlib.branch
45
 
import bzrlib.bzrdir as bzrdir
46
 
import bzrlib.commands
47
 
import bzrlib.errors as errors
48
 
import bzrlib.inventory
49
 
import bzrlib.iterablefile
50
 
import bzrlib.lockdir
51
 
from bzrlib.merge import merge_inner
52
 
import bzrlib.merge3
53
 
import bzrlib.osutils
54
 
import bzrlib.osutils as osutils
55
 
import bzrlib.plugin
56
 
from bzrlib.revision import common_ancestor
57
 
import bzrlib.store
58
 
import bzrlib.trace
59
 
from bzrlib.transport import urlescape, get_transport
60
 
import bzrlib.transport
61
 
from bzrlib.transport.local import LocalRelpathServer
62
 
from bzrlib.transport.readonly import ReadonlyServer
63
 
from bzrlib.trace import mutter
64
 
from bzrlib.tests.TestUtil import TestLoader, TestSuite
65
 
from bzrlib.tests.treeshape import build_tree_contents
66
 
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
67
 
 
68
 
default_transport = LocalRelpathServer
69
 
 
70
 
MODULES_TO_TEST = []
71
 
MODULES_TO_DOCTEST = [
72
 
                      bzrlib.branch,
73
 
                      bzrlib.commands,
74
 
                      bzrlib.errors,
75
 
                      bzrlib.inventory,
76
 
                      bzrlib.iterablefile,
77
 
                      bzrlib.lockdir,
78
 
                      bzrlib.merge3,
79
 
                      bzrlib.option,
80
 
                      bzrlib.osutils,
81
 
                      bzrlib.store
82
 
                      ]
83
 
def packages_to_test():
84
 
    """Return a list of packages to test.
85
 
 
86
 
    The packages are not globally imported so that import failures are
87
 
    triggered when running selftest, not when importing the command.
88
 
    """
89
 
    import bzrlib.doc
90
 
    import bzrlib.tests.blackbox
91
 
    import bzrlib.tests.branch_implementations
92
 
    import bzrlib.tests.bzrdir_implementations
93
 
    import bzrlib.tests.interrepository_implementations
94
 
    import bzrlib.tests.interversionedfile_implementations
95
 
    import bzrlib.tests.repository_implementations
96
 
    import bzrlib.tests.revisionstore_implementations
97
 
    import bzrlib.tests.workingtree_implementations
98
 
    return [
99
 
            bzrlib.doc,
100
 
            bzrlib.tests.blackbox,
101
 
            bzrlib.tests.branch_implementations,
102
 
            bzrlib.tests.bzrdir_implementations,
103
 
            bzrlib.tests.interrepository_implementations,
104
 
            bzrlib.tests.interversionedfile_implementations,
105
 
            bzrlib.tests.repository_implementations,
106
 
            bzrlib.tests.revisionstore_implementations,
107
 
            bzrlib.tests.workingtree_implementations,
108
 
            ]
109
 
 
110
 
 
111
 
class _MyResult(unittest._TextTestResult):
112
 
    """Custom TestResult.
113
 
 
114
 
    Shows output in a different format, including displaying runtime for tests.
115
 
    """
 
51
import warnings
 
52
 
 
53
import testtools
 
54
# nb: check this before importing anything else from within it
 
55
_testtools_version = getattr(testtools, '__version__', ())
 
56
if _testtools_version < (0, 9, 5):
 
57
    raise ImportError("need at least testtools 0.9.5: %s is %r"
 
58
        % (testtools.__file__, _testtools_version))
 
59
from testtools import content
 
60
 
 
61
import breezy
 
62
from .. import (
 
63
    branchbuilder,
 
64
    controldir,
 
65
    commands as _mod_commands,
 
66
    config,
 
67
    i18n,
 
68
    debug,
 
69
    errors,
 
70
    hooks,
 
71
    lock as _mod_lock,
 
72
    lockdir,
 
73
    memorytree,
 
74
    osutils,
 
75
    plugin as _mod_plugin,
 
76
    pyutils,
 
77
    ui,
 
78
    urlutils,
 
79
    registry,
 
80
    symbol_versioning,
 
81
    trace,
 
82
    transport as _mod_transport,
 
83
    workingtree,
 
84
    )
 
85
from breezy.bzr import (
 
86
    chk_map,
 
87
    )
 
88
try:
 
89
    import breezy.lsprof
 
90
except ImportError:
 
91
    # lsprof not available
 
92
    pass
 
93
from ..sixish import (
 
94
    BytesIO,
 
95
    PY3,
 
96
    string_types,
 
97
    text_type,
 
98
    )
 
99
from ..bzr.smart import client, request
 
100
from ..transport import (
 
101
    memory,
 
102
    pathfilter,
 
103
    )
 
104
from ..tests import (
 
105
    fixtures,
 
106
    test_server,
 
107
    TestUtil,
 
108
    treeshape,
 
109
    ui_testing,
 
110
    )
 
111
from ..tests.features import _CompatabilityThunkFeature
 
112
 
 
113
# Mark this python module as being part of the implementation
 
114
# of unittest: this gives us better tracebacks where the last
 
115
# shown frame is the test code, not our assertXYZ.
 
116
__unittest = 1
 
117
 
 
118
default_transport = test_server.LocalURLServer
 
119
 
 
120
 
 
121
_unitialized_attr = object()
 
122
"""A sentinel needed to act as a default value in a method signature."""
 
123
 
 
124
 
 
125
# Subunit result codes, defined here to prevent a hard dependency on subunit.
 
126
SUBUNIT_SEEK_SET = 0
 
127
SUBUNIT_SEEK_CUR = 1
 
128
 
 
129
# These are intentionally brought into this namespace. That way plugins, etc
 
130
# can just "from breezy.tests import TestCase, TestLoader, etc"
 
131
TestSuite = TestUtil.TestSuite
 
132
TestLoader = TestUtil.TestLoader
 
133
 
 
134
# Tests should run in a clean and clearly defined environment. The goal is to
 
135
# keep them isolated from the running environment as mush as possible. The test
 
136
# framework ensures the variables defined below are set (or deleted if the
 
137
# value is None) before a test is run and reset to their original value after
 
138
# the test is run. Generally if some code depends on an environment variable,
 
139
# the tests should start without this variable in the environment. There are a
 
140
# few exceptions but you shouldn't violate this rule lightly.
 
141
isolated_environ = {
 
142
    'BRZ_HOME': None,
 
143
    'HOME': None,
 
144
    'GNUPGHOME': None,
 
145
    'XDG_CONFIG_HOME': None,
 
146
    # brz now uses the Win32 API and doesn't rely on APPDATA, but the
 
147
    # tests do check our impls match APPDATA
 
148
    'BRZ_EDITOR': None, # test_msgeditor manipulates this variable
 
149
    'VISUAL': None,
 
150
    'EDITOR': None,
 
151
    'BRZ_EMAIL': None,
 
152
    'BZREMAIL': None, # may still be present in the environment
 
153
    'EMAIL': 'jrandom@example.com', # set EMAIL as brz does not guess
 
154
    'BRZ_PROGRESS_BAR': None,
 
155
    # This should trap leaks to ~/.brz.log. This occurs when tests use TestCase
 
156
    # as a base class instead of TestCaseInTempDir. Tests inheriting from
 
157
    # TestCase should not use disk resources, BRZ_LOG is one.
 
158
    'BRZ_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
 
159
    'BRZ_PLUGIN_PATH': '-site',
 
160
    'BRZ_DISABLE_PLUGINS': None,
 
161
    'BRZ_PLUGINS_AT': None,
 
162
    'BRZ_CONCURRENCY': None,
 
163
    # Make sure that any text ui tests are consistent regardless of
 
164
    # the environment the test case is run in; you may want tests that
 
165
    # test other combinations.  'dumb' is a reasonable guess for tests
 
166
    # going to a pipe or a BytesIO.
 
167
    'TERM': 'dumb',
 
168
    'LINES': '25',
 
169
    'COLUMNS': '80',
 
170
    'BRZ_COLUMNS': '80',
 
171
    # Disable SSH Agent
 
172
    'SSH_AUTH_SOCK': None,
 
173
    # Proxies
 
174
    'http_proxy': None,
 
175
    'HTTP_PROXY': None,
 
176
    'https_proxy': None,
 
177
    'HTTPS_PROXY': None,
 
178
    'no_proxy': None,
 
179
    'NO_PROXY': None,
 
180
    'all_proxy': None,
 
181
    'ALL_PROXY': None,
 
182
    # Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
 
183
    # least. If you do (care), please update this comment
 
184
    # -- vila 20080401
 
185
    'ftp_proxy': None,
 
186
    'FTP_PROXY': None,
 
187
    'BZR_REMOTE_PATH': None,
 
188
    # Generally speaking, we don't want apport reporting on crashes in
 
189
    # the test envirnoment unless we're specifically testing apport,
 
190
    # so that it doesn't leak into the real system environment.  We
 
191
    # use an env var so it propagates to subprocesses.
 
192
    'APPORT_DISABLE': '1',
 
193
    }
 
194
 
 
195
 
 
196
def override_os_environ(test, env=None):
 
197
    """Modify os.environ keeping a copy.
 
198
 
 
199
    :param test: A test instance
 
200
 
 
201
    :param env: A dict containing variable definitions to be installed
 
202
    """
 
203
    if env is None:
 
204
        env = isolated_environ
 
205
    test._original_os_environ = dict(**os.environ)
 
206
    for var in env:
 
207
        osutils.set_or_unset_env(var, env[var])
 
208
        if var not in test._original_os_environ:
 
209
            # The var is new, add it with a value of None, so
 
210
            # restore_os_environ will delete it
 
211
            test._original_os_environ[var] = None
 
212
 
 
213
 
 
214
def restore_os_environ(test):
 
215
    """Restore os.environ to its original state.
 
216
 
 
217
    :param test: A test instance previously passed to override_os_environ.
 
218
    """
 
219
    for var, value in test._original_os_environ.items():
 
220
        # Restore the original value (or delete it if the value has been set to
 
221
        # None in override_os_environ).
 
222
        osutils.set_or_unset_env(var, value)
 
223
 
 
224
 
 
225
def _clear__type_equality_funcs(test):
 
226
    """Cleanup bound methods stored on TestCase instances
 
227
 
 
228
    Clear the dict breaking a few (mostly) harmless cycles in the affected
 
229
    unittests released with Python 2.6 and initial Python 2.7 versions.
 
230
 
 
231
    For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
 
232
    shipped in Oneiric, an object with no clear method was used, hence the
 
233
    extra complications, see bug 809048 for details.
 
234
    """
 
235
    type_equality_funcs = getattr(test, "_type_equality_funcs", None)
 
236
    if type_equality_funcs is not None:
 
237
        tef_clear = getattr(type_equality_funcs, "clear", None)
 
238
        if tef_clear is None:
 
239
            tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
 
240
            if tef_instance_dict is not None:
 
241
                tef_clear = tef_instance_dict.clear
 
242
        if tef_clear is not None:
 
243
            tef_clear()
 
244
 
 
245
 
 
246
class ExtendedTestResult(testtools.TextTestResult):
 
247
    """Accepts, reports and accumulates the results of running tests.
 
248
 
 
249
    Compared to the unittest version this class adds support for
 
250
    profiling, benchmarking, stopping as soon as a test fails,  and
 
251
    skipping tests.  There are further-specialized subclasses for
 
252
    different types of display.
 
253
 
 
254
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
255
    addFailure or addError methods.  These in turn may redirect to a more
 
256
    specific case for the special test results supported by our extended
 
257
    tests.
 
258
 
 
259
    Note that just one of these objects is fed the results from many tests.
 
260
    """
 
261
 
116
262
    stop_early = False
117
263
 
118
 
    def _elapsedTime(self):
119
 
        return "%5dms" % (1000 * (time.time() - self._start_time))
 
264
    def __init__(self, stream, descriptions, verbosity,
 
265
                 bench_history=None,
 
266
                 strict=False,
 
267
                 ):
 
268
        """Construct new TestResult.
 
269
 
 
270
        :param bench_history: Optionally, a writable file object to accumulate
 
271
            benchmark results.
 
272
        """
 
273
        testtools.TextTestResult.__init__(self, stream)
 
274
        if bench_history is not None:
 
275
            from breezy.version import _get_bzr_source_tree
 
276
            src_tree = _get_bzr_source_tree()
 
277
            if src_tree:
 
278
                try:
 
279
                    revision_id = src_tree.get_parent_ids()[0]
 
280
                except IndexError:
 
281
                    # XXX: if this is a brand new tree, do the same as if there
 
282
                    # is no branch.
 
283
                    revision_id = ''
 
284
            else:
 
285
                # XXX: If there's no branch, what should we do?
 
286
                revision_id = ''
 
287
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
288
        self._bench_history = bench_history
 
289
        self.ui = ui.ui_factory
 
290
        self.num_tests = 0
 
291
        self.error_count = 0
 
292
        self.failure_count = 0
 
293
        self.known_failure_count = 0
 
294
        self.skip_count = 0
 
295
        self.not_applicable_count = 0
 
296
        self.unsupported = {}
 
297
        self.count = 0
 
298
        self._overall_start_time = time.time()
 
299
        self._strict = strict
 
300
        self._first_thread_leaker_id = None
 
301
        self._tests_leaking_threads_count = 0
 
302
        self._traceback_from_test = None
 
303
 
 
304
    def stopTestRun(self):
 
305
        run = self.testsRun
 
306
        actionTaken = "Ran"
 
307
        stopTime = time.time()
 
308
        timeTaken = stopTime - self.startTime
 
309
        # GZ 2010-07-19: Seems testtools has no printErrors method, and though
 
310
        #                the parent class method is similar have to duplicate
 
311
        self._show_list('ERROR', self.errors)
 
312
        self._show_list('FAIL', self.failures)
 
313
        self.stream.write(self.sep2)
 
314
        self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
 
315
                            run, run != 1 and "s" or "", timeTaken))
 
316
        if not self.wasSuccessful():
 
317
            self.stream.write("FAILED (")
 
318
            failed, errored = map(len, (self.failures, self.errors))
 
319
            if failed:
 
320
                self.stream.write("failures=%d" % failed)
 
321
            if errored:
 
322
                if failed: self.stream.write(", ")
 
323
                self.stream.write("errors=%d" % errored)
 
324
            if self.known_failure_count:
 
325
                if failed or errored: self.stream.write(", ")
 
326
                self.stream.write("known_failure_count=%d" %
 
327
                    self.known_failure_count)
 
328
            self.stream.write(")\n")
 
329
        else:
 
330
            if self.known_failure_count:
 
331
                self.stream.write("OK (known_failures=%d)\n" %
 
332
                    self.known_failure_count)
 
333
            else:
 
334
                self.stream.write("OK\n")
 
335
        if self.skip_count > 0:
 
336
            skipped = self.skip_count
 
337
            self.stream.write('%d test%s skipped\n' %
 
338
                                (skipped, skipped != 1 and "s" or ""))
 
339
        if self.unsupported:
 
340
            for feature, count in sorted(self.unsupported.items()):
 
341
                self.stream.write("Missing feature '%s' skipped %d tests.\n" %
 
342
                    (feature, count))
 
343
        if self._strict:
 
344
            ok = self.wasStrictlySuccessful()
 
345
        else:
 
346
            ok = self.wasSuccessful()
 
347
        if self._first_thread_leaker_id:
 
348
            self.stream.write(
 
349
                '%s is leaking threads among %d leaking tests.\n' % (
 
350
                self._first_thread_leaker_id,
 
351
                self._tests_leaking_threads_count))
 
352
            # We don't report the main thread as an active one.
 
353
            self.stream.write(
 
354
                '%d non-main threads were left active in the end.\n'
 
355
                % (len(self._active_threads) - 1))
 
356
 
 
357
    def getDescription(self, test):
 
358
        return test.id()
 
359
 
 
360
    def _extractBenchmarkTime(self, testCase, details=None):
 
361
        """Add a benchmark time for the current test case."""
 
362
        if details and 'benchtime' in details:
 
363
            return float(''.join(details['benchtime'].iter_bytes()))
 
364
        return getattr(testCase, "_benchtime", None)
 
365
 
 
366
    def _delta_to_float(self, a_timedelta, precision):
 
367
        # This calls ceiling to ensure that the most pessimistic view of time
 
368
        # taken is shown (rather than leaving it to the Python %f operator
 
369
        # to decide whether to round/floor/ceiling. This was added when we
 
370
        # had pyp3 test failures that suggest a floor was happening.
 
371
        shift = 10 ** precision
 
372
        return math.ceil((a_timedelta.days * 86400.0 + a_timedelta.seconds +
 
373
            a_timedelta.microseconds / 1000000.0) * shift) / shift
 
374
 
 
375
    def _elapsedTestTimeString(self):
 
376
        """Return a time string for the overall time the current test has taken."""
 
377
        return self._formatTime(self._delta_to_float(
 
378
            self._now() - self._start_datetime, 3))
 
379
 
 
380
    def _testTimeString(self, testCase):
 
381
        benchmark_time = self._extractBenchmarkTime(testCase)
 
382
        if benchmark_time is not None:
 
383
            return self._formatTime(benchmark_time) + "*"
 
384
        else:
 
385
            return self._elapsedTestTimeString()
 
386
 
 
387
    def _formatTime(self, seconds):
 
388
        """Format seconds as milliseconds with leading spaces."""
 
389
        # some benchmarks can take thousands of seconds to run, so we need 8
 
390
        # places
 
391
        return "%8dms" % (1000 * seconds)
 
392
 
 
393
    def _shortened_test_description(self, test):
 
394
        what = test.id()
 
395
        what = re.sub(r'^breezy\.tests\.', '', what)
 
396
        return what
 
397
 
 
398
    # GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
 
399
    #                multiple times in a row, because the handler is added for
 
400
    #                each test but the container list is shared between cases.
 
401
    #                See lp:498869 lp:625574 and lp:637725 for background.
 
402
    def _record_traceback_from_test(self, exc_info):
 
403
        """Store the traceback from passed exc_info tuple till"""
 
404
        self._traceback_from_test = exc_info[2]
120
405
 
121
406
    def startTest(self, test):
122
 
        unittest.TestResult.startTest(self, test)
123
 
        # In a short description, the important words are in
124
 
        # the beginning, but in an id, the important words are
125
 
        # at the end
126
 
        SHOW_DESCRIPTIONS = False
127
 
        if self.showAll:
128
 
            width = osutils.terminal_width()
129
 
            name_width = width - 15
130
 
            what = None
131
 
            if SHOW_DESCRIPTIONS:
132
 
                what = test.shortDescription()
133
 
                if what:
134
 
                    if len(what) > name_width:
135
 
                        what = what[:name_width-3] + '...'
136
 
            if what is None:
137
 
                what = test.id()
138
 
                if what.startswith('bzrlib.tests.'):
139
 
                    what = what[13:]
140
 
                if len(what) > name_width:
141
 
                    what = '...' + what[3-name_width:]
142
 
            what = what.ljust(name_width)
143
 
            self.stream.write(what)
144
 
        self.stream.flush()
145
 
        self._start_time = time.time()
 
407
        super(ExtendedTestResult, self).startTest(test)
 
408
        if self.count == 0:
 
409
            self.startTests()
 
410
        self.count += 1
 
411
        self.report_test_start(test)
 
412
        test.number = self.count
 
413
        self._recordTestStartTime()
 
414
        # Make testtools cases give us the real traceback on failure
 
415
        addOnException = getattr(test, "addOnException", None)
 
416
        if addOnException is not None:
 
417
            addOnException(self._record_traceback_from_test)
 
418
        # Only check for thread leaks on breezy derived test cases
 
419
        if isinstance(test, TestCase):
 
420
            test.addCleanup(self._check_leaked_threads, test)
 
421
 
 
422
    def stopTest(self, test):
 
423
        super(ExtendedTestResult, self).stopTest(test)
 
424
        # Manually break cycles, means touching various private things but hey
 
425
        getDetails = getattr(test, "getDetails", None)
 
426
        if getDetails is not None:
 
427
            getDetails().clear()
 
428
        _clear__type_equality_funcs(test)
 
429
        self._traceback_from_test = None
 
430
 
 
431
    def startTests(self):
 
432
        self.report_tests_starting()
 
433
        self._active_threads = threading.enumerate()
 
434
 
 
435
    def _check_leaked_threads(self, test):
 
436
        """See if any threads have leaked since last call
 
437
 
 
438
        A sample of live threads is stored in the _active_threads attribute,
 
439
        when this method runs it compares the current live threads and any not
 
440
        in the previous sample are treated as having leaked.
 
441
        """
 
442
        now_active_threads = set(threading.enumerate())
 
443
        threads_leaked = now_active_threads.difference(self._active_threads)
 
444
        if threads_leaked:
 
445
            self._report_thread_leak(test, threads_leaked, now_active_threads)
 
446
            self._tests_leaking_threads_count += 1
 
447
            if self._first_thread_leaker_id is None:
 
448
                self._first_thread_leaker_id = test.id()
 
449
            self._active_threads = now_active_threads
 
450
 
 
451
    def _recordTestStartTime(self):
 
452
        """Record that a test has started."""
 
453
        self._start_datetime = self._now()
146
454
 
147
455
    def addError(self, test, err):
148
 
        if isinstance(err[1], TestSkipped):
149
 
            return self.addSkipped(test, err)    
150
 
        unittest.TestResult.addError(self, test, err)
151
 
        if self.showAll:
152
 
            self.stream.writeln("ERROR %s" % self._elapsedTime())
153
 
        elif self.dots:
154
 
            self.stream.write('E')
155
 
        self.stream.flush()
 
456
        """Tell result that test finished with an error.
 
457
 
 
458
        Called from the TestCase run() method when the test
 
459
        fails with an unexpected error.
 
460
        """
 
461
        self._post_mortem(self._traceback_from_test)
 
462
        super(ExtendedTestResult, self).addError(test, err)
 
463
        self.error_count += 1
 
464
        self.report_error(test, err)
156
465
        if self.stop_early:
157
466
            self.stop()
158
467
 
159
468
    def addFailure(self, test, err):
160
 
        unittest.TestResult.addFailure(self, test, err)
161
 
        if self.showAll:
162
 
            self.stream.writeln(" FAIL %s" % self._elapsedTime())
163
 
        elif self.dots:
164
 
            self.stream.write('F')
165
 
        self.stream.flush()
166
 
        if self.stop_early:
167
 
            self.stop()
168
 
 
169
 
    def addSuccess(self, test):
170
 
        if self.showAll:
171
 
            self.stream.writeln('   OK %s' % self._elapsedTime())
172
 
        elif self.dots:
173
 
            self.stream.write('~')
174
 
        self.stream.flush()
175
 
        unittest.TestResult.addSuccess(self, test)
176
 
 
177
 
    def addSkipped(self, test, skip_excinfo):
178
 
        if self.showAll:
179
 
            print >>self.stream, ' SKIP %s' % self._elapsedTime()
180
 
            print >>self.stream, '     %s' % skip_excinfo[1]
181
 
        elif self.dots:
182
 
            self.stream.write('S')
183
 
        self.stream.flush()
184
 
        # seems best to treat this as success from point-of-view of unittest
185
 
        # -- it actually does nothing so it barely matters :)
186
 
        unittest.TestResult.addSuccess(self, test)
187
 
 
188
 
    def printErrorList(self, flavour, errors):
189
 
        for test, err in errors:
190
 
            self.stream.writeln(self.separator1)
191
 
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
192
 
            if getattr(test, '_get_log', None) is not None:
193
 
                print >>self.stream
194
 
                print >>self.stream, \
195
 
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
196
 
                print >>self.stream, test._get_log()
197
 
                print >>self.stream, \
198
 
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
199
 
            self.stream.writeln(self.separator2)
200
 
            self.stream.writeln("%s" % err)
201
 
 
202
 
 
203
 
class TextTestRunner(unittest.TextTestRunner):
 
469
        """Tell result that test failed.
 
470
 
 
471
        Called from the TestCase run() method when the test
 
472
        fails because e.g. an assert() method failed.
 
473
        """
 
474
        self._post_mortem(self._traceback_from_test)
 
475
        super(ExtendedTestResult, self).addFailure(test, err)
 
476
        self.failure_count += 1
 
477
        self.report_failure(test, err)
 
478
        if self.stop_early:
 
479
            self.stop()
 
480
 
 
481
    def addSuccess(self, test, details=None):
 
482
        """Tell result that test completed successfully.
 
483
 
 
484
        Called from the TestCase run()
 
485
        """
 
486
        if self._bench_history is not None:
 
487
            benchmark_time = self._extractBenchmarkTime(test, details)
 
488
            if benchmark_time is not None:
 
489
                self._bench_history.write("%s %s\n" % (
 
490
                    self._formatTime(benchmark_time),
 
491
                    test.id()))
 
492
        self.report_success(test)
 
493
        super(ExtendedTestResult, self).addSuccess(test)
 
494
        test._log_contents = ''
 
495
 
 
496
    def addExpectedFailure(self, test, err):
 
497
        self.known_failure_count += 1
 
498
        self.report_known_failure(test, err)
 
499
 
 
500
    def addUnexpectedSuccess(self, test, details=None):
 
501
        """Tell result the test unexpectedly passed, counting as a failure
 
502
 
 
503
        When the minimum version of testtools required becomes 0.9.8 this
 
504
        can be updated to use the new handling there.
 
505
        """
 
506
        super(ExtendedTestResult, self).addFailure(test, details=details)
 
507
        self.failure_count += 1
 
508
        self.report_unexpected_success(test,
 
509
            "".join(details["reason"].iter_text()))
 
510
        if self.stop_early:
 
511
            self.stop()
 
512
 
 
513
    def addNotSupported(self, test, feature):
 
514
        """The test will not be run because of a missing feature.
 
515
        """
 
516
        # this can be called in two different ways: it may be that the
 
517
        # test started running, and then raised (through requireFeature)
 
518
        # UnavailableFeature.  Alternatively this method can be called
 
519
        # while probing for features before running the test code proper; in
 
520
        # that case we will see startTest and stopTest, but the test will
 
521
        # never actually run.
 
522
        self.unsupported.setdefault(str(feature), 0)
 
523
        self.unsupported[str(feature)] += 1
 
524
        self.report_unsupported(test, feature)
 
525
 
 
526
    def addSkip(self, test, reason):
 
527
        """A test has not run for 'reason'."""
 
528
        self.skip_count += 1
 
529
        self.report_skip(test, reason)
 
530
 
 
531
    def addNotApplicable(self, test, reason):
 
532
        self.not_applicable_count += 1
 
533
        self.report_not_applicable(test, reason)
 
534
 
 
535
    def _count_stored_tests(self):
 
536
        """Count of tests instances kept alive due to not succeeding"""
 
537
        return self.error_count + self.failure_count + self.known_failure_count
 
538
 
 
539
    def _post_mortem(self, tb=None):
 
540
        """Start a PDB post mortem session."""
 
541
        if os.environ.get('BRZ_TEST_PDB', None):
 
542
            import pdb
 
543
            pdb.post_mortem(tb)
 
544
 
 
545
    def progress(self, offset, whence):
 
546
        """The test is adjusting the count of tests to run."""
 
547
        if whence == SUBUNIT_SEEK_SET:
 
548
            self.num_tests = offset
 
549
        elif whence == SUBUNIT_SEEK_CUR:
 
550
            self.num_tests += offset
 
551
        else:
 
552
            raise errors.BzrError("Unknown whence %r" % whence)
 
553
 
 
554
    def report_tests_starting(self):
 
555
        """Display information before the test run begins"""
 
556
        if getattr(sys, 'frozen', None) is None:
 
557
            bzr_path = osutils.realpath(sys.argv[0])
 
558
        else:
 
559
            bzr_path = sys.executable
 
560
        self.stream.write(
 
561
            'brz selftest: %s\n' % (bzr_path,))
 
562
        self.stream.write(
 
563
            '   %s\n' % (
 
564
                    breezy.__path__[0],))
 
565
        self.stream.write(
 
566
            '   bzr-%s python-%s %s\n' % (
 
567
                    breezy.version_string,
 
568
                    breezy._format_version_tuple(sys.version_info),
 
569
                    platform.platform(aliased=1),
 
570
                    ))
 
571
        self.stream.write('\n')
 
572
 
 
573
    def report_test_start(self, test):
 
574
        """Display information on the test just about to be run"""
 
575
 
 
576
    def _report_thread_leak(self, test, leaked_threads, active_threads):
 
577
        """Display information on a test that leaked one or more threads"""
 
578
        # GZ 2010-09-09: A leak summary reported separately from the general
 
579
        #                thread debugging would be nice. Tests under subunit
 
580
        #                need something not using stream, perhaps adding a
 
581
        #                testtools details object would be fitting.
 
582
        if 'threads' in selftest_debug_flags:
 
583
            self.stream.write('%s is leaking, active is now %d\n' %
 
584
                (test.id(), len(active_threads)))
 
585
 
 
586
    def startTestRun(self):
 
587
        self.startTime = time.time()
 
588
 
 
589
    def report_success(self, test):
 
590
        pass
 
591
 
 
592
    def wasStrictlySuccessful(self):
 
593
        if self.unsupported or self.known_failure_count:
 
594
            return False
 
595
        return self.wasSuccessful()
 
596
 
 
597
 
 
598
class TextTestResult(ExtendedTestResult):
 
599
    """Displays progress and results of tests in text form"""
 
600
 
 
601
    def __init__(self, stream, descriptions, verbosity,
 
602
                 bench_history=None,
 
603
                 strict=None,
 
604
                 ):
 
605
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
606
            bench_history, strict)
 
607
        self.pb = self.ui.nested_progress_bar()
 
608
        self.pb.show_pct = False
 
609
        self.pb.show_spinner = False
 
610
        self.pb.show_eta = False,
 
611
        self.pb.show_count = False
 
612
        self.pb.show_bar = False
 
613
        self.pb.update_latency = 0
 
614
        self.pb.show_transport_activity = False
 
615
 
 
616
    def stopTestRun(self):
 
617
        # called when the tests that are going to run have run
 
618
        self.pb.clear()
 
619
        self.pb.finished()
 
620
        super(TextTestResult, self).stopTestRun()
 
621
 
 
622
    def report_tests_starting(self):
 
623
        super(TextTestResult, self).report_tests_starting()
 
624
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
 
625
 
 
626
    def _progress_prefix_text(self):
 
627
        # the longer this text, the less space we have to show the test
 
628
        # name...
 
629
        a = '[%d' % self.count              # total that have been run
 
630
        # tests skipped as known not to be relevant are not important enough
 
631
        # to show here
 
632
        ## if self.skip_count:
 
633
        ##     a += ', %d skip' % self.skip_count
 
634
        ## if self.known_failure_count:
 
635
        ##     a += '+%dX' % self.known_failure_count
 
636
        if self.num_tests:
 
637
            a +='/%d' % self.num_tests
 
638
        a += ' in '
 
639
        runtime = time.time() - self._overall_start_time
 
640
        if runtime >= 60:
 
641
            a += '%dm%ds' % (runtime / 60, runtime % 60)
 
642
        else:
 
643
            a += '%ds' % runtime
 
644
        total_fail_count = self.error_count + self.failure_count
 
645
        if total_fail_count:
 
646
            a += ', %d failed' % total_fail_count
 
647
        # if self.unsupported:
 
648
        #     a += ', %d missing' % len(self.unsupported)
 
649
        a += ']'
 
650
        return a
 
651
 
 
652
    def report_test_start(self, test):
 
653
        self.pb.update(
 
654
                self._progress_prefix_text()
 
655
                + ' '
 
656
                + self._shortened_test_description(test))
 
657
 
 
658
    def _test_description(self, test):
 
659
        return self._shortened_test_description(test)
 
660
 
 
661
    def report_error(self, test, err):
 
662
        self.stream.write('ERROR: %s\n    %s\n' % (
 
663
            self._test_description(test),
 
664
            err[1],
 
665
            ))
 
666
 
 
667
    def report_failure(self, test, err):
 
668
        self.stream.write('FAIL: %s\n    %s\n' % (
 
669
            self._test_description(test),
 
670
            err[1],
 
671
            ))
 
672
 
 
673
    def report_known_failure(self, test, err):
 
674
        pass
 
675
 
 
676
    def report_unexpected_success(self, test, reason):
 
677
        self.stream.write('FAIL: %s\n    %s: %s\n' % (
 
678
            self._test_description(test),
 
679
            "Unexpected success. Should have failed",
 
680
            reason,
 
681
            ))
 
682
 
 
683
    def report_skip(self, test, reason):
 
684
        pass
 
685
 
 
686
    def report_not_applicable(self, test, reason):
 
687
        pass
 
688
 
 
689
    def report_unsupported(self, test, feature):
 
690
        """test cannot be run because feature is missing."""
 
691
 
 
692
 
 
693
class VerboseTestResult(ExtendedTestResult):
 
694
    """Produce long output, with one line per test run plus times"""
 
695
 
 
696
    def _ellipsize_to_right(self, a_string, final_width):
 
697
        """Truncate and pad a string, keeping the right hand side"""
 
698
        if len(a_string) > final_width:
 
699
            result = '...' + a_string[3-final_width:]
 
700
        else:
 
701
            result = a_string
 
702
        return result.ljust(final_width)
 
703
 
 
704
    def report_tests_starting(self):
 
705
        self.stream.write('running %d tests...\n' % self.num_tests)
 
706
        super(VerboseTestResult, self).report_tests_starting()
 
707
 
 
708
    def report_test_start(self, test):
 
709
        name = self._shortened_test_description(test)
 
710
        width = osutils.terminal_width()
 
711
        if width is not None:
 
712
            # width needs space for 6 char status, plus 1 for slash, plus an
 
713
            # 11-char time string, plus a trailing blank
 
714
            # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
 
715
            # space
 
716
            self.stream.write(self._ellipsize_to_right(name, width-18))
 
717
        else:
 
718
            self.stream.write(name)
 
719
        self.stream.flush()
 
720
 
 
721
    def _error_summary(self, err):
 
722
        indent = ' ' * 4
 
723
        return '%s%s' % (indent, err[1])
 
724
 
 
725
    def report_error(self, test, err):
 
726
        self.stream.write('ERROR %s\n%s\n'
 
727
                % (self._testTimeString(test),
 
728
                   self._error_summary(err)))
 
729
 
 
730
    def report_failure(self, test, err):
 
731
        self.stream.write(' FAIL %s\n%s\n'
 
732
                % (self._testTimeString(test),
 
733
                   self._error_summary(err)))
 
734
 
 
735
    def report_known_failure(self, test, err):
 
736
        self.stream.write('XFAIL %s\n%s\n'
 
737
                % (self._testTimeString(test),
 
738
                   self._error_summary(err)))
 
739
 
 
740
    def report_unexpected_success(self, test, reason):
 
741
        self.stream.write(' FAIL %s\n%s: %s\n'
 
742
                % (self._testTimeString(test),
 
743
                   "Unexpected success. Should have failed",
 
744
                   reason))
 
745
 
 
746
    def report_success(self, test):
 
747
        self.stream.write('   OK %s\n' % self._testTimeString(test))
 
748
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
749
            self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
 
750
            stats.pprint(file=self.stream)
 
751
        # flush the stream so that we get smooth output. This verbose mode is
 
752
        # used to show the output in PQM.
 
753
        self.stream.flush()
 
754
 
 
755
    def report_skip(self, test, reason):
 
756
        self.stream.write(' SKIP %s\n%s\n'
 
757
                % (self._testTimeString(test), reason))
 
758
 
 
759
    def report_not_applicable(self, test, reason):
 
760
        self.stream.write('  N/A %s\n    %s\n'
 
761
                % (self._testTimeString(test), reason))
 
762
 
 
763
    def report_unsupported(self, test, feature):
 
764
        """test cannot be run because feature is missing."""
 
765
        self.stream.write("NODEP %s\n    The feature '%s' is not available.\n"
 
766
                %(self._testTimeString(test), feature))
 
767
 
 
768
 
 
769
class TextTestRunner(object):
204
770
    stop_on_failure = False
205
771
 
206
 
    def _makeResult(self):
207
 
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
208
 
        result.stop_early = self.stop_on_failure
209
 
        return result
 
772
    def __init__(self,
 
773
                 stream=sys.stderr,
 
774
                 descriptions=0,
 
775
                 verbosity=1,
 
776
                 bench_history=None,
 
777
                 strict=False,
 
778
                 result_decorators=None,
 
779
                 ):
 
780
        """Create a TextTestRunner.
 
781
 
 
782
        :param result_decorators: An optional list of decorators to apply
 
783
            to the result object being used by the runner. Decorators are
 
784
            applied left to right - the first element in the list is the 
 
785
            innermost decorator.
 
786
        """
 
787
        # stream may know claim to know to write unicode strings, but in older
 
788
        # pythons this goes sufficiently wrong that it is a bad idea. (
 
789
        # specifically a built in file with encoding 'UTF-8' will still try
 
790
        # to encode using ascii.
 
791
        new_encoding = osutils.get_terminal_encoding()
 
792
        codec = codecs.lookup(new_encoding)
 
793
        encode = codec.encode
 
794
        # GZ 2010-09-08: Really we don't want to be writing arbitrary bytes,
 
795
        #                so should swap to the plain codecs.StreamWriter
 
796
        stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream,
 
797
            "backslashreplace")
 
798
        stream.encoding = new_encoding
 
799
        self.stream = stream
 
800
        self.descriptions = descriptions
 
801
        self.verbosity = verbosity
 
802
        self._bench_history = bench_history
 
803
        self._strict = strict
 
804
        self._result_decorators = result_decorators or []
 
805
 
 
806
    def run(self, test):
 
807
        "Run the given test case or test suite."
 
808
        if self.verbosity == 1:
 
809
            result_class = TextTestResult
 
810
        elif self.verbosity >= 2:
 
811
            result_class = VerboseTestResult
 
812
        original_result = result_class(self.stream,
 
813
                              self.descriptions,
 
814
                              self.verbosity,
 
815
                              bench_history=self._bench_history,
 
816
                              strict=self._strict,
 
817
                              )
 
818
        # Signal to result objects that look at stop early policy to stop,
 
819
        original_result.stop_early = self.stop_on_failure
 
820
        result = original_result
 
821
        for decorator in self._result_decorators:
 
822
            result = decorator(result)
 
823
            result.stop_early = self.stop_on_failure
 
824
        result.startTestRun()
 
825
        try:
 
826
            test.run(result)
 
827
        finally:
 
828
            result.stopTestRun()
 
829
        # higher level code uses our extended protocol to determine
 
830
        # what exit code to give.
 
831
        return original_result
210
832
 
211
833
 
212
834
def iter_suite_tests(suite):
213
835
    """Return all tests in a suite, recursing through nested suites"""
214
 
    for item in suite._tests:
215
 
        if isinstance(item, unittest.TestCase):
216
 
            yield item
217
 
        elif isinstance(item, unittest.TestSuite):
 
836
    if isinstance(suite, unittest.TestCase):
 
837
        yield suite
 
838
    elif isinstance(suite, unittest.TestSuite):
 
839
        for item in suite:
218
840
            for r in iter_suite_tests(item):
219
841
                yield r
220
 
        else:
221
 
            raise Exception('unknown object %r inside test suite %r'
222
 
                            % (item, suite))
223
 
 
224
 
 
225
 
class TestSkipped(Exception):
226
 
    """Indicates that a test was intentionally skipped, rather than failing."""
227
 
    # XXX: Not used yet
228
 
 
229
 
 
230
 
class CommandFailed(Exception):
231
 
    pass
232
 
 
233
 
class TestCase(unittest.TestCase):
234
 
    """Base class for bzr unit tests.
235
 
    
236
 
    Tests that need access to disk resources should subclass 
 
842
    else:
 
843
        raise Exception('unknown type %r for object %r'
 
844
                        % (type(suite), suite))
 
845
 
 
846
 
 
847
TestSkipped = testtools.testcase.TestSkipped
 
848
 
 
849
 
 
850
class TestNotApplicable(TestSkipped):
 
851
    """A test is not applicable to the situation where it was run.
 
852
 
 
853
    This is only normally raised by parameterized tests, if they find that
 
854
    the instance they're constructed upon does not support one aspect
 
855
    of its interface.
 
856
    """
 
857
 
 
858
 
 
859
# traceback._some_str fails to format exceptions that have the default
 
860
# __str__ which does an implicit ascii conversion. However, repr() on those
 
861
# objects works, for all that its not quite what the doctor may have ordered.
 
862
def _clever_some_str(value):
 
863
    try:
 
864
        return str(value)
 
865
    except:
 
866
        try:
 
867
            return repr(value).replace('\\n', '\n')
 
868
        except:
 
869
            return '<unprintable %s object>' % type(value).__name__
 
870
 
 
871
traceback._some_str = _clever_some_str
 
872
 
 
873
 
 
874
# deprecated - use self.knownFailure(), or self.expectFailure.
 
875
KnownFailure = testtools.testcase._ExpectedFailure
 
876
 
 
877
 
 
878
class UnavailableFeature(Exception):
 
879
    """A feature required for this test was not available.
 
880
 
 
881
    This can be considered a specialised form of SkippedTest.
 
882
 
 
883
    The feature should be used to construct the exception.
 
884
    """
 
885
 
 
886
 
 
887
class StringIOWrapper(ui_testing.BytesIOWithEncoding):
 
888
 
 
889
    @symbol_versioning.deprecated_method(
 
890
        symbol_versioning.deprecated_in((3, 0)))
 
891
    def __init__(self, s=None):
 
892
        super(StringIOWrapper, self).__init__(s)
 
893
 
 
894
 
 
895
TestUIFactory = ui_testing.TestUIFactory
 
896
 
 
897
 
 
898
def isolated_doctest_setUp(test):
 
899
    override_os_environ(test)
 
900
    osutils.set_or_unset_env('BRZ_HOME', '/nonexistent')
 
901
    test._orig_ui_factory = ui.ui_factory
 
902
    ui.ui_factory = ui.SilentUIFactory()
 
903
 
 
904
 
 
905
def isolated_doctest_tearDown(test):
 
906
    restore_os_environ(test)
 
907
    ui.ui_factory = test._orig_ui_factory
 
908
 
 
909
 
 
910
def IsolatedDocTestSuite(*args, **kwargs):
 
911
    """Overrides doctest.DocTestSuite to handle isolation.
 
912
 
 
913
    The method is really a factory and users are expected to use it as such.
 
914
    """
 
915
 
 
916
    kwargs['setUp'] = isolated_doctest_setUp
 
917
    kwargs['tearDown'] = isolated_doctest_tearDown
 
918
    return doctest.DocTestSuite(*args, **kwargs)
 
919
 
 
920
 
 
921
class TestCase(testtools.TestCase):
 
922
    """Base class for brz unit tests.
 
923
 
 
924
    Tests that need access to disk resources should subclass
237
925
    TestCaseInTempDir not TestCase.
238
926
 
239
927
    Error and debug log messages are redirected from their usual
241
929
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
242
930
    so that it can also capture file IO.  When the test completes this file
243
931
    is read into memory and removed from disk.
244
 
       
 
932
 
245
933
    There are also convenience functions to invoke bzr's command-line
246
 
    routine, and to build and check bzr trees.
247
 
   
 
934
    routine, and to build and check brz trees.
 
935
 
248
936
    In addition to the usual method of overriding tearDown(), this class also
249
 
    allows subclasses to register functions into the _cleanups list, which is
 
937
    allows subclasses to register cleanup functions via addCleanup, which are
250
938
    run in order as the object is torn down.  It's less likely this will be
251
939
    accidentally overlooked.
252
940
    """
253
941
 
254
 
    BZRPATH = 'bzr'
255
 
    _log_file_name = None
256
 
    _log_contents = ''
 
942
    _log_file = None
 
943
    # record lsprof data when performing benchmark calls.
 
944
    _gather_lsprof_in_benchmarks = False
257
945
 
258
946
    def __init__(self, methodName='testMethod'):
259
947
        super(TestCase, self).__init__(methodName)
260
 
        self._cleanups = []
 
948
        self._directory_isolation = True
 
949
        self.exception_handlers.insert(0,
 
950
            (UnavailableFeature, self._do_unsupported_or_skip))
 
951
        self.exception_handlers.insert(0,
 
952
            (TestNotApplicable, self._do_not_applicable))
261
953
 
262
954
    def setUp(self):
263
 
        unittest.TestCase.setUp(self)
 
955
        super(TestCase, self).setUp()
 
956
 
 
957
        # At this point we're still accessing the config files in $BRZ_HOME (as
 
958
        # set by the user running selftest).
 
959
        timeout = config.GlobalStack().get('selftest.timeout')
 
960
        if timeout:
 
961
            timeout_fixture = fixtures.TimeoutFixture(timeout)
 
962
            timeout_fixture.setUp()
 
963
            self.addCleanup(timeout_fixture.cleanUp)
 
964
 
 
965
        for feature in getattr(self, '_test_needs_features', []):
 
966
            self.requireFeature(feature)
264
967
        self._cleanEnvironment()
265
 
        bzrlib.trace.disable_default_logging()
 
968
 
 
969
        self.overrideAttr(breezy.get_global_state(), 'cmdline_overrides',
 
970
                          config.CommandLineStore())
 
971
 
 
972
        self._silenceUI()
266
973
        self._startLogFile()
 
974
        self._benchcalls = []
 
975
        self._benchtime = None
 
976
        self._clear_hooks()
 
977
        self._track_transports()
 
978
        self._track_locks()
 
979
        self._clear_debug_flags()
 
980
        # Isolate global verbosity level, to make sure it's reproducible
 
981
        # between tests.  We should get rid of this altogether: bug 656694. --
 
982
        # mbp 20101008
 
983
        self.overrideAttr(breezy.trace, '_verbosity_level', 0)
 
984
        self._log_files = set()
 
985
        # Each key in the ``_counters`` dict holds a value for a different
 
986
        # counter. When the test ends, addDetail() should be used to output the
 
987
        # counter values. This happens in install_counter_hook().
 
988
        self._counters = {}
 
989
        if 'config_stats' in selftest_debug_flags:
 
990
            self._install_config_stats_hooks()
 
991
        # Do not use i18n for tests (unless the test reverses this)
 
992
        i18n.disable_i18n()
 
993
 
 
994
    def debug(self):
 
995
        # debug a frame up.
 
996
        import pdb
 
997
        # The sys preserved stdin/stdout should allow blackbox tests debugging
 
998
        pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
 
999
                ).set_trace(sys._getframe().f_back)
 
1000
 
 
1001
    def discardDetail(self, name):
 
1002
        """Extend the addDetail, getDetails api so we can remove a detail.
 
1003
 
 
1004
        eg. brz always adds the 'log' detail at startup, but we don't want to
 
1005
        include it for skipped, xfail, etc tests.
 
1006
 
 
1007
        It is safe to call this for a detail that doesn't exist, in case this
 
1008
        gets called multiple times.
 
1009
        """
 
1010
        # We cheat. details is stored in __details which means we shouldn't
 
1011
        # touch it. but getDetails() returns the dict directly, so we can
 
1012
        # mutate it.
 
1013
        details = self.getDetails()
 
1014
        if name in details:
 
1015
            del details[name]
 
1016
 
 
1017
    def install_counter_hook(self, hooks, name, counter_name=None):
 
1018
        """Install a counting hook.
 
1019
 
 
1020
        Any hook can be counted as long as it doesn't need to return a value.
 
1021
 
 
1022
        :param hooks: Where the hook should be installed.
 
1023
 
 
1024
        :param name: The hook name that will be counted.
 
1025
 
 
1026
        :param counter_name: The counter identifier in ``_counters``, defaults
 
1027
            to ``name``.
 
1028
        """
 
1029
        _counters = self._counters # Avoid closing over self
 
1030
        if counter_name is None:
 
1031
            counter_name = name
 
1032
        if counter_name in _counters:
 
1033
            raise AssertionError('%s is already used as a counter name'
 
1034
                                  % (counter_name,))
 
1035
        _counters[counter_name] = 0
 
1036
        self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
 
1037
            lambda: [b'%d' % (_counters[counter_name],)]))
 
1038
        def increment_counter(*args, **kwargs):
 
1039
            _counters[counter_name] += 1
 
1040
        label = 'count %s calls' % (counter_name,)
 
1041
        hooks.install_named_hook(name, increment_counter, label)
 
1042
        self.addCleanup(hooks.uninstall_named_hook, name, label)
 
1043
 
 
1044
    def _install_config_stats_hooks(self):
 
1045
        """Install config hooks to count hook calls.
 
1046
 
 
1047
        """
 
1048
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1049
            self.install_counter_hook(config.ConfigHooks, hook_name,
 
1050
                                       'config.%s' % (hook_name,))
 
1051
 
 
1052
        # The OldConfigHooks are private and need special handling to protect
 
1053
        # against recursive tests (tests that run other tests), so we just do
 
1054
        # manually what registering them into _builtin_known_hooks will provide
 
1055
        # us.
 
1056
        self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
 
1057
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1058
            self.install_counter_hook(config.OldConfigHooks, hook_name,
 
1059
                                      'old_config.%s' % (hook_name,))
 
1060
 
 
1061
    def _clear_debug_flags(self):
 
1062
        """Prevent externally set debug flags affecting tests.
 
1063
 
 
1064
        Tests that want to use debug flags can just set them in the
 
1065
        debug_flags set during setup/teardown.
 
1066
        """
 
1067
        # Start with a copy of the current debug flags we can safely modify.
 
1068
        self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
 
1069
        if 'allow_debug' not in selftest_debug_flags:
 
1070
            debug.debug_flags.clear()
 
1071
        if 'disable_lock_checks' not in selftest_debug_flags:
 
1072
            debug.debug_flags.add('strict_locks')
 
1073
 
 
1074
    def _clear_hooks(self):
 
1075
        # prevent hooks affecting tests
 
1076
        known_hooks = hooks.known_hooks
 
1077
        self._preserved_hooks = {}
 
1078
        for key, (parent, name) in known_hooks.iter_parent_objects():
 
1079
            current_hooks = getattr(parent, name)
 
1080
            self._preserved_hooks[parent] = (name, current_hooks)
 
1081
        self._preserved_lazy_hooks = hooks._lazy_hooks
 
1082
        hooks._lazy_hooks = {}
 
1083
        self.addCleanup(self._restoreHooks)
 
1084
        for key, (parent, name) in known_hooks.iter_parent_objects():
 
1085
            factory = known_hooks.get(key)
 
1086
            setattr(parent, name, factory())
 
1087
        # this hook should always be installed
 
1088
        request._install_hook()
 
1089
 
 
1090
    def disable_directory_isolation(self):
 
1091
        """Turn off directory isolation checks."""
 
1092
        self._directory_isolation = False
 
1093
 
 
1094
    def enable_directory_isolation(self):
 
1095
        """Enable directory isolation checks."""
 
1096
        self._directory_isolation = True
 
1097
 
 
1098
    def _silenceUI(self):
 
1099
        """Turn off UI for duration of test"""
 
1100
        # by default the UI is off; tests can turn it on if they want it.
 
1101
        self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
 
1102
 
 
1103
    def _check_locks(self):
 
1104
        """Check that all lock take/release actions have been paired."""
 
1105
        # We always check for mismatched locks. If a mismatch is found, we
 
1106
        # fail unless -Edisable_lock_checks is supplied to selftest, in which
 
1107
        # case we just print a warning.
 
1108
        # unhook:
 
1109
        acquired_locks = [lock for action, lock in self._lock_actions
 
1110
                          if action == 'acquired']
 
1111
        released_locks = [lock for action, lock in self._lock_actions
 
1112
                          if action == 'released']
 
1113
        broken_locks = [lock for action, lock in self._lock_actions
 
1114
                        if action == 'broken']
 
1115
        # trivially, given the tests for lock acquistion and release, if we
 
1116
        # have as many in each list, it should be ok. Some lock tests also
 
1117
        # break some locks on purpose and should be taken into account by
 
1118
        # considering that breaking a lock is just a dirty way of releasing it.
 
1119
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
 
1120
            message = (
 
1121
                'Different number of acquired and '
 
1122
                'released or broken locks.\n'
 
1123
                'acquired=%s\n'
 
1124
                'released=%s\n'
 
1125
                'broken=%s\n' %
 
1126
                (acquired_locks, released_locks, broken_locks))
 
1127
            if not self._lock_check_thorough:
 
1128
                # Rather than fail, just warn
 
1129
                print("Broken test %s: %s" % (self, message))
 
1130
                return
 
1131
            self.fail(message)
 
1132
 
 
1133
    def _track_locks(self):
 
1134
        """Track lock activity during tests."""
 
1135
        self._lock_actions = []
 
1136
        if 'disable_lock_checks' in selftest_debug_flags:
 
1137
            self._lock_check_thorough = False
 
1138
        else:
 
1139
            self._lock_check_thorough = True
 
1140
 
 
1141
        self.addCleanup(self._check_locks)
 
1142
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
 
1143
                                                self._lock_acquired, None)
 
1144
        _mod_lock.Lock.hooks.install_named_hook('lock_released',
 
1145
                                                self._lock_released, None)
 
1146
        _mod_lock.Lock.hooks.install_named_hook('lock_broken',
 
1147
                                                self._lock_broken, None)
 
1148
 
 
1149
    def _lock_acquired(self, result):
 
1150
        self._lock_actions.append(('acquired', result))
 
1151
 
 
1152
    def _lock_released(self, result):
 
1153
        self._lock_actions.append(('released', result))
 
1154
 
 
1155
    def _lock_broken(self, result):
 
1156
        self._lock_actions.append(('broken', result))
 
1157
 
 
1158
    def permit_dir(self, name):
 
1159
        """Permit a directory to be used by this test. See permit_url."""
 
1160
        name_transport = _mod_transport.get_transport_from_path(name)
 
1161
        self.permit_url(name)
 
1162
        self.permit_url(name_transport.base)
 
1163
 
 
1164
    def permit_url(self, url):
 
1165
        """Declare that url is an ok url to use in this test.
 
1166
 
 
1167
        Do this for memory transports, temporary test directory etc.
 
1168
 
 
1169
        Do not do this for the current working directory, /tmp, or any other
 
1170
        preexisting non isolated url.
 
1171
        """
 
1172
        if not url.endswith('/'):
 
1173
            url += '/'
 
1174
        self._bzr_selftest_roots.append(url)
 
1175
 
 
1176
    def permit_source_tree_branch_repo(self):
 
1177
        """Permit the source tree brz is running from to be opened.
 
1178
 
 
1179
        Some code such as breezy.version attempts to read from the brz branch
 
1180
        that brz is executing from (if any). This method permits that directory
 
1181
        to be used in the test suite.
 
1182
        """
 
1183
        path = self.get_source_path()
 
1184
        self.record_directory_isolation()
 
1185
        try:
 
1186
            try:
 
1187
                workingtree.WorkingTree.open(path)
 
1188
            except (errors.NotBranchError, errors.NoWorkingTree):
 
1189
                raise TestSkipped('Needs a working tree of brz sources')
 
1190
        finally:
 
1191
            self.enable_directory_isolation()
 
1192
 
 
1193
    def _preopen_isolate_transport(self, transport):
 
1194
        """Check that all transport openings are done in the test work area."""
 
1195
        while isinstance(transport, pathfilter.PathFilteringTransport):
 
1196
            # Unwrap pathfiltered transports
 
1197
            transport = transport.server.backing_transport.clone(
 
1198
                transport._filter('.'))
 
1199
        url = transport.base
 
1200
        # ReadonlySmartTCPServer_for_testing decorates the backing transport
 
1201
        # urls it is given by prepending readonly+. This is appropriate as the
 
1202
        # client shouldn't know that the server is readonly (or not readonly).
 
1203
        # We could register all servers twice, with readonly+ prepending, but
 
1204
        # that makes for a long list; this is about the same but easier to
 
1205
        # read.
 
1206
        if url.startswith('readonly+'):
 
1207
            url = url[len('readonly+'):]
 
1208
        self._preopen_isolate_url(url)
 
1209
 
 
1210
    def _preopen_isolate_url(self, url):
 
1211
        if not self._directory_isolation:
 
1212
            return
 
1213
        if self._directory_isolation == 'record':
 
1214
            self._bzr_selftest_roots.append(url)
 
1215
            return
 
1216
        # This prevents all transports, including e.g. sftp ones backed on disk
 
1217
        # from working unless they are explicitly granted permission. We then
 
1218
        # depend on the code that sets up test transports to check that they are
 
1219
        # appropriately isolated and enable their use by calling
 
1220
        # self.permit_transport()
 
1221
        if not osutils.is_inside_any(self._bzr_selftest_roots, url):
 
1222
            raise errors.BzrError("Attempt to escape test isolation: %r %r"
 
1223
                % (url, self._bzr_selftest_roots))
 
1224
 
 
1225
    def record_directory_isolation(self):
 
1226
        """Gather accessed directories to permit later access.
 
1227
 
 
1228
        This is used for tests that access the branch brz is running from.
 
1229
        """
 
1230
        self._directory_isolation = "record"
 
1231
 
 
1232
    def start_server(self, transport_server, backing_server=None):
 
1233
        """Start transport_server for this test.
 
1234
 
 
1235
        This starts the server, registers a cleanup for it and permits the
 
1236
        server's urls to be used.
 
1237
        """
 
1238
        if backing_server is None:
 
1239
            transport_server.start_server()
 
1240
        else:
 
1241
            transport_server.start_server(backing_server)
 
1242
        self.addCleanup(transport_server.stop_server)
 
1243
        # Obtain a real transport because if the server supplies a password, it
 
1244
        # will be hidden from the base on the client side.
 
1245
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
 
1246
        # Some transport servers effectively chroot the backing transport;
 
1247
        # others like SFTPServer don't - users of the transport can walk up the
 
1248
        # transport to read the entire backing transport. This wouldn't matter
 
1249
        # except that the workdir tests are given - and that they expect the
 
1250
        # server's url to point at - is one directory under the safety net. So
 
1251
        # Branch operations into the transport will attempt to walk up one
 
1252
        # directory. Chrooting all servers would avoid this but also mean that
 
1253
        # we wouldn't be testing directly against non-root urls. Alternatively
 
1254
        # getting the test framework to start the server with a backing server
 
1255
        # at the actual safety net directory would work too, but this then
 
1256
        # means that the self.get_url/self.get_transport methods would need
 
1257
        # to transform all their results. On balance its cleaner to handle it
 
1258
        # here, and permit a higher url when we have one of these transports.
 
1259
        if t.base.endswith('/work/'):
 
1260
            # we have safety net/test root/work
 
1261
            t = t.clone('../..')
 
1262
        elif isinstance(transport_server,
 
1263
                        test_server.SmartTCPServer_for_testing):
 
1264
            # The smart server adds a path similar to work, which is traversed
 
1265
            # up from by the client. But the server is chrooted - the actual
 
1266
            # backing transport is not escaped from, and VFS requests to the
 
1267
            # root will error (because they try to escape the chroot).
 
1268
            t2 = t.clone('..')
 
1269
            while t2.base != t.base:
 
1270
                t = t2
 
1271
                t2 = t.clone('..')
 
1272
        self.permit_url(t.base)
 
1273
 
 
1274
    def _track_transports(self):
 
1275
        """Install checks for transport usage."""
 
1276
        # TestCase has no safe place it can write to.
 
1277
        self._bzr_selftest_roots = []
 
1278
        # Currently the easiest way to be sure that nothing is going on is to
 
1279
        # hook into brz dir opening. This leaves a small window of error for
 
1280
        # transport tests, but they are well known, and we can improve on this
 
1281
        # step.
 
1282
        controldir.ControlDir.hooks.install_named_hook("pre_open",
 
1283
            self._preopen_isolate_transport, "Check brz directories are safe.")
267
1284
 
268
1285
    def _ndiff_strings(self, a, b):
269
1286
        """Return ndiff between two strings containing lines.
270
 
        
 
1287
 
271
1288
        A trailing newline is added if missing to make the strings
272
1289
        print properly."""
273
1290
        if b and b[-1] != '\n':
280
1297
                                  charjunk=lambda x: False)
281
1298
        return ''.join(difflines)
282
1299
 
 
1300
    def assertEqual(self, a, b, message=''):
 
1301
        try:
 
1302
            if a == b:
 
1303
                return
 
1304
        except UnicodeError as e:
 
1305
            # If we can't compare without getting a UnicodeError, then
 
1306
            # obviously they are different
 
1307
            trace.mutter('UnicodeError: %s', e)
 
1308
        if message:
 
1309
            message += '\n'
 
1310
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
1311
            % (message,
 
1312
               pprint.pformat(a), pprint.pformat(b)))
 
1313
 
 
1314
    # FIXME: This is deprecated in unittest2 but plugins may still use it so we
 
1315
    # need a deprecation period for them -- vila 2016-02-01
 
1316
    assertEquals = assertEqual
 
1317
 
283
1318
    def assertEqualDiff(self, a, b, message=None):
284
1319
        """Assert two texts are equal, if not raise an exception.
285
 
        
286
 
        This is intended for use with multi-line strings where it can 
 
1320
 
 
1321
        This is intended for use with multi-line strings where it can
287
1322
        be hard to find the differences by eye.
288
1323
        """
289
 
        # TODO: perhaps override assertEquals to call this for strings?
 
1324
        # TODO: perhaps override assertEqual to call this for strings?
290
1325
        if a == b:
291
1326
            return
292
1327
        if message is None:
293
1328
            message = "texts not equal:\n"
294
 
        raise AssertionError(message + 
295
 
                             self._ndiff_strings(a, b))      
296
 
        
 
1329
        if a + '\n' == b:
 
1330
            message = 'first string is missing a final newline.\n'
 
1331
        if a == b + '\n':
 
1332
            message = 'second string is missing a final newline.\n'
 
1333
        raise AssertionError(message +
 
1334
                             self._ndiff_strings(a, b))
 
1335
 
297
1336
    def assertEqualMode(self, mode, mode_test):
298
1337
        self.assertEqual(mode, mode_test,
299
1338
                         'mode mismatch %o != %o' % (mode, mode_test))
300
1339
 
 
1340
    def assertEqualStat(self, expected, actual):
 
1341
        """assert that expected and actual are the same stat result.
 
1342
 
 
1343
        :param expected: A stat result.
 
1344
        :param actual: A stat result.
 
1345
        :raises AssertionError: If the expected and actual stat values differ
 
1346
            other than by atime.
 
1347
        """
 
1348
        self.assertEqual(expected.st_size, actual.st_size,
 
1349
                         'st_size did not match')
 
1350
        self.assertEqual(expected.st_mtime, actual.st_mtime,
 
1351
                         'st_mtime did not match')
 
1352
        self.assertEqual(expected.st_ctime, actual.st_ctime,
 
1353
                         'st_ctime did not match')
 
1354
        if sys.platform == 'win32':
 
1355
            # On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
 
1356
            # is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
 
1357
            # odd. We just force it to always be 0 to avoid any problems.
 
1358
            self.assertEqual(0, expected.st_dev)
 
1359
            self.assertEqual(0, actual.st_dev)
 
1360
            self.assertEqual(0, expected.st_ino)
 
1361
            self.assertEqual(0, actual.st_ino)
 
1362
        else:
 
1363
            self.assertEqual(expected.st_dev, actual.st_dev,
 
1364
                             'st_dev did not match')
 
1365
            self.assertEqual(expected.st_ino, actual.st_ino,
 
1366
                             'st_ino did not match')
 
1367
        self.assertEqual(expected.st_mode, actual.st_mode,
 
1368
                         'st_mode did not match')
 
1369
 
 
1370
    def assertLength(self, length, obj_with_len):
 
1371
        """Assert that obj_with_len is of length length."""
 
1372
        if len(obj_with_len) != length:
 
1373
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
 
1374
                length, len(obj_with_len), obj_with_len))
 
1375
 
 
1376
    def assertLogsError(self, exception_class, func, *args, **kwargs):
 
1377
        """Assert that `func(*args, **kwargs)` quietly logs a specific error.
 
1378
        """
 
1379
        captured = []
 
1380
        orig_log_exception_quietly = trace.log_exception_quietly
 
1381
        try:
 
1382
            def capture():
 
1383
                orig_log_exception_quietly()
 
1384
                captured.append(sys.exc_info()[1])
 
1385
            trace.log_exception_quietly = capture
 
1386
            func(*args, **kwargs)
 
1387
        finally:
 
1388
            trace.log_exception_quietly = orig_log_exception_quietly
 
1389
        self.assertLength(1, captured)
 
1390
        err = captured[0]
 
1391
        self.assertIsInstance(err, exception_class)
 
1392
        return err
 
1393
 
 
1394
    def assertPositive(self, val):
 
1395
        """Assert that val is greater than 0."""
 
1396
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
1397
 
 
1398
    def assertNegative(self, val):
 
1399
        """Assert that val is less than 0."""
 
1400
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
1401
 
301
1402
    def assertStartsWith(self, s, prefix):
302
1403
        if not s.startswith(prefix):
303
1404
            raise AssertionError('string %r does not start with %r' % (s, prefix))
304
1405
 
305
1406
    def assertEndsWith(self, s, suffix):
306
 
        if not s.endswith(prefix):
 
1407
        """Asserts that s ends with suffix."""
 
1408
        if not s.endswith(suffix):
307
1409
            raise AssertionError('string %r does not end with %r' % (s, suffix))
308
1410
 
309
 
    def assertContainsRe(self, haystack, needle_re):
 
1411
    def assertContainsRe(self, haystack, needle_re, flags=0):
310
1412
        """Assert that a contains something matching a regular expression."""
311
 
        if not re.search(needle_re, haystack):
312
 
            raise AssertionError('pattern "%s" not found in "%s"'
 
1413
        if not re.search(needle_re, haystack, flags):
 
1414
            if '\n' in haystack or len(haystack) > 60:
 
1415
                # a long string, format it in a more readable way
 
1416
                raise AssertionError(
 
1417
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
1418
                        % (needle_re, haystack))
 
1419
            else:
 
1420
                raise AssertionError('pattern "%s" not found in "%s"'
 
1421
                        % (needle_re, haystack))
 
1422
 
 
1423
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
 
1424
        """Assert that a does not match a regular expression"""
 
1425
        if re.search(needle_re, haystack, flags):
 
1426
            raise AssertionError('pattern "%s" found in "%s"'
313
1427
                    % (needle_re, haystack))
314
1428
 
 
1429
    def assertContainsString(self, haystack, needle):
 
1430
        if haystack.find(needle) == -1:
 
1431
            self.fail("string %r not found in '''%s'''" % (needle, haystack))
 
1432
 
 
1433
    def assertNotContainsString(self, haystack, needle):
 
1434
        if haystack.find(needle) != -1:
 
1435
            self.fail("string %r found in '''%s'''" % (needle, haystack))
 
1436
 
315
1437
    def assertSubset(self, sublist, superlist):
316
1438
        """Assert that every entry in sublist is present in superlist."""
317
 
        missing = []
318
 
        for entry in sublist:
319
 
            if entry not in superlist:
320
 
                missing.append(entry)
 
1439
        missing = set(sublist) - set(superlist)
321
1440
        if len(missing) > 0:
322
 
            raise AssertionError("value(s) %r not present in container %r" % 
 
1441
            raise AssertionError("value(s) %r not present in container %r" %
323
1442
                                 (missing, superlist))
324
1443
 
325
 
    def assertIs(self, left, right):
 
1444
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
1445
        """Fail unless excClass is raised when the iterator from func is used.
 
1446
 
 
1447
        Many functions can return generators this makes sure
 
1448
        to wrap them in a list() call to make sure the whole generator
 
1449
        is run, and that the proper exception is raised.
 
1450
        """
 
1451
        try:
 
1452
            list(func(*args, **kwargs))
 
1453
        except excClass as e:
 
1454
            return e
 
1455
        else:
 
1456
            if getattr(excClass, '__name__', None) is not None:
 
1457
                excName = excClass.__name__
 
1458
            else:
 
1459
                excName = str(excClass)
 
1460
            raise self.failureException("%s not raised" % excName)
 
1461
 
 
1462
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
1463
        """Assert that a callable raises a particular exception.
 
1464
 
 
1465
        :param excClass: As for the except statement, this may be either an
 
1466
            exception class, or a tuple of classes.
 
1467
        :param callableObj: A callable, will be passed ``*args`` and
 
1468
            ``**kwargs``.
 
1469
 
 
1470
        Returns the exception so that you can examine it.
 
1471
        """
 
1472
        try:
 
1473
            callableObj(*args, **kwargs)
 
1474
        except excClass as e:
 
1475
            return e
 
1476
        else:
 
1477
            if getattr(excClass, '__name__', None) is not None:
 
1478
                excName = excClass.__name__
 
1479
            else:
 
1480
                # probably a tuple
 
1481
                excName = str(excClass)
 
1482
            raise self.failureException("%s not raised" % excName)
 
1483
 
 
1484
    def assertIs(self, left, right, message=None):
326
1485
        if not (left is right):
327
 
            raise AssertionError("%r is not %r." % (left, right))
 
1486
            if message is not None:
 
1487
                raise AssertionError(message)
 
1488
            else:
 
1489
                raise AssertionError("%r is not %r." % (left, right))
 
1490
 
 
1491
    def assertIsNot(self, left, right, message=None):
 
1492
        if (left is right):
 
1493
            if message is not None:
 
1494
                raise AssertionError(message)
 
1495
            else:
 
1496
                raise AssertionError("%r is %r." % (left, right))
328
1497
 
329
1498
    def assertTransportMode(self, transport, path, mode):
330
 
        """Fail if a path does not have mode mode.
331
 
        
 
1499
        """Fail if a path does not have mode "mode".
 
1500
 
332
1501
        If modes are not supported on this transport, the assertion is ignored.
333
1502
        """
334
1503
        if not transport._can_roundtrip_unix_modebits():
336
1505
        path_stat = transport.stat(path)
337
1506
        actual_mode = stat.S_IMODE(path_stat.st_mode)
338
1507
        self.assertEqual(mode, actual_mode,
339
 
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
340
 
 
341
 
    def assertIsInstance(self, obj, kls):
342
 
        """Fail if obj is not an instance of kls"""
 
1508
                         'mode of %r incorrect (%s != %s)'
 
1509
                         % (path, oct(mode), oct(actual_mode)))
 
1510
 
 
1511
    def assertIsSameRealPath(self, path1, path2):
 
1512
        """Fail if path1 and path2 points to different files"""
 
1513
        self.assertEqual(osutils.realpath(path1),
 
1514
                         osutils.realpath(path2),
 
1515
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
1516
 
 
1517
    def assertIsInstance(self, obj, kls, msg=None):
 
1518
        """Fail if obj is not an instance of kls
 
1519
        
 
1520
        :param msg: Supplementary message to show if the assertion fails.
 
1521
        """
343
1522
        if not isinstance(obj, kls):
344
 
            self.fail("%r is not an instance of %s" % (obj, kls))
 
1523
            m = "%r is an instance of %s rather than %s" % (
 
1524
                obj, obj.__class__, kls)
 
1525
            if msg:
 
1526
                m += ": " + msg
 
1527
            self.fail(m)
 
1528
 
 
1529
    def assertFileEqual(self, content, path):
 
1530
        """Fail if path does not contain 'content'."""
 
1531
        self.assertPathExists(path)
 
1532
        with open(path, 'rb') as f:
 
1533
            s = f.read()
 
1534
        self.assertEqualDiff(content, s)
 
1535
 
 
1536
    def assertDocstring(self, expected_docstring, obj):
 
1537
        """Fail if obj does not have expected_docstring"""
 
1538
        if __doc__ is None:
 
1539
            # With -OO the docstring should be None instead
 
1540
            self.assertIs(obj.__doc__, None)
 
1541
        else:
 
1542
            self.assertEqual(expected_docstring, obj.__doc__)
 
1543
 
 
1544
    def assertPathExists(self, path):
 
1545
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1546
        if not isinstance(path, (str, text_type)):
 
1547
            for p in path:
 
1548
                self.assertPathExists(p)
 
1549
        else:
 
1550
            self.assertTrue(osutils.lexists(path),
 
1551
                path + " does not exist")
 
1552
 
 
1553
    def assertPathDoesNotExist(self, path):
 
1554
        """Fail if path or paths, which may be abs or relative, exist."""
 
1555
        if not isinstance(path, (str, text_type)):
 
1556
            for p in path:
 
1557
                self.assertPathDoesNotExist(p)
 
1558
        else:
 
1559
            self.assertFalse(osutils.lexists(path),
 
1560
                path + " exists")
 
1561
 
 
1562
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1563
        """A helper for callDeprecated and applyDeprecated.
 
1564
 
 
1565
        :param a_callable: A callable to call.
 
1566
        :param args: The positional arguments for the callable
 
1567
        :param kwargs: The keyword arguments for the callable
 
1568
        :return: A tuple (warnings, result). result is the result of calling
 
1569
            a_callable(``*args``, ``**kwargs``).
 
1570
        """
 
1571
        local_warnings = []
 
1572
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1573
            # we've hooked into a deprecation specific callpath,
 
1574
            # only deprecations should getting sent via it.
 
1575
            self.assertEqual(cls, DeprecationWarning)
 
1576
            local_warnings.append(msg)
 
1577
        original_warning_method = symbol_versioning.warn
 
1578
        symbol_versioning.set_warning_method(capture_warnings)
 
1579
        try:
 
1580
            result = a_callable(*args, **kwargs)
 
1581
        finally:
 
1582
            symbol_versioning.set_warning_method(original_warning_method)
 
1583
        return (local_warnings, result)
 
1584
 
 
1585
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1586
        """Call a deprecated callable without warning the user.
 
1587
 
 
1588
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1589
        not other callers that go direct to the warning module.
 
1590
 
 
1591
        To test that a deprecated method raises an error, do something like
 
1592
        this (remember that both assertRaises and applyDeprecated delays *args
 
1593
        and **kwargs passing)::
 
1594
 
 
1595
            self.assertRaises(errors.ReservedId,
 
1596
                self.applyDeprecated,
 
1597
                deprecated_in((1, 5, 0)),
 
1598
                br.append_revision,
 
1599
                'current:')
 
1600
 
 
1601
        :param deprecation_format: The deprecation format that the callable
 
1602
            should have been deprecated with. This is the same type as the
 
1603
            parameter to deprecated_method/deprecated_function. If the
 
1604
            callable is not deprecated with this format, an assertion error
 
1605
            will be raised.
 
1606
        :param a_callable: A callable to call. This may be a bound method or
 
1607
            a regular function. It will be called with ``*args`` and
 
1608
            ``**kwargs``.
 
1609
        :param args: The positional arguments for the callable
 
1610
        :param kwargs: The keyword arguments for the callable
 
1611
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1612
        """
 
1613
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1614
            *args, **kwargs)
 
1615
        expected_first_warning = symbol_versioning.deprecation_string(
 
1616
            a_callable, deprecation_format)
 
1617
        if len(call_warnings) == 0:
 
1618
            self.fail("No deprecation warning generated by call to %s" %
 
1619
                a_callable)
 
1620
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1621
        return result
 
1622
 
 
1623
    def callCatchWarnings(self, fn, *args, **kw):
 
1624
        """Call a callable that raises python warnings.
 
1625
 
 
1626
        The caller's responsible for examining the returned warnings.
 
1627
 
 
1628
        If the callable raises an exception, the exception is not
 
1629
        caught and propagates up to the caller.  In that case, the list
 
1630
        of warnings is not available.
 
1631
 
 
1632
        :returns: ([warning_object, ...], fn_result)
 
1633
        """
 
1634
        # XXX: This is not perfect, because it completely overrides the
 
1635
        # warnings filters, and some code may depend on suppressing particular
 
1636
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1637
        # though.  -- Andrew, 20071062
 
1638
        wlist = []
 
1639
        def _catcher(message, category, filename, lineno, file=None, line=None):
 
1640
            # despite the name, 'message' is normally(?) a Warning subclass
 
1641
            # instance
 
1642
            wlist.append(message)
 
1643
        saved_showwarning = warnings.showwarning
 
1644
        saved_filters = warnings.filters
 
1645
        try:
 
1646
            warnings.showwarning = _catcher
 
1647
            warnings.filters = []
 
1648
            result = fn(*args, **kw)
 
1649
        finally:
 
1650
            warnings.showwarning = saved_showwarning
 
1651
            warnings.filters = saved_filters
 
1652
        return wlist, result
 
1653
 
 
1654
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1655
        """Assert that a callable is deprecated in a particular way.
 
1656
 
 
1657
        This is a very precise test for unusual requirements. The
 
1658
        applyDeprecated helper function is probably more suited for most tests
 
1659
        as it allows you to simply specify the deprecation format being used
 
1660
        and will ensure that that is issued for the function being called.
 
1661
 
 
1662
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1663
        not other callers that go direct to the warning module.  To catch
 
1664
        general warnings, use callCatchWarnings.
 
1665
 
 
1666
        :param expected: a list of the deprecation warnings expected, in order
 
1667
        :param callable: The callable to call
 
1668
        :param args: The positional arguments for the callable
 
1669
        :param kwargs: The keyword arguments for the callable
 
1670
        """
 
1671
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1672
            *args, **kwargs)
 
1673
        self.assertEqual(expected, call_warnings)
 
1674
        return result
345
1675
 
346
1676
    def _startLogFile(self):
347
 
        """Send bzr and test log messages to a temporary file.
348
 
 
349
 
        The file is removed as the test is torn down.
350
 
        """
351
 
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
352
 
        encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
353
 
        self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
354
 
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
355
 
        self._log_file_name = name
 
1677
        """Setup a in-memory target for bzr and testcase log messages"""
 
1678
        pseudo_log_file = BytesIO()
 
1679
        def _get_log_contents_for_weird_testtools_api():
 
1680
            return [pseudo_log_file.getvalue().decode(
 
1681
                "utf-8", "replace").encode("utf-8")]
 
1682
        self.addDetail("log", content.Content(content.ContentType("text",
 
1683
            "plain", {"charset": "utf8"}),
 
1684
            _get_log_contents_for_weird_testtools_api))
 
1685
        self._log_file = pseudo_log_file
 
1686
        self._log_memento = trace.push_log_file(self._log_file)
356
1687
        self.addCleanup(self._finishLogFile)
357
1688
 
358
1689
    def _finishLogFile(self):
359
 
        """Finished with the log file.
360
 
 
361
 
        Read contents into memory, close, and delete.
362
 
        """
363
 
        bzrlib.trace.disable_test_log(self._log_nonce)
364
 
        self._log_file.seek(0)
365
 
        self._log_contents = self._log_file.read()
366
 
        self._log_file.close()
367
 
        os.remove(self._log_file_name)
368
 
        self._log_file = self._log_file_name = None
369
 
 
370
 
    def addCleanup(self, callable):
371
 
        """Arrange to run a callable when this case is torn down.
372
 
 
373
 
        Callables are run in the reverse of the order they are registered, 
374
 
        ie last-in first-out.
375
 
        """
376
 
        if callable in self._cleanups:
377
 
            raise ValueError("cleanup function %r already registered on %s" 
378
 
                    % (callable, self))
379
 
        self._cleanups.append(callable)
 
1690
        """Flush and dereference the in-memory log for this testcase"""
 
1691
        if trace._trace_file:
 
1692
            # flush the log file, to get all content
 
1693
            trace._trace_file.flush()
 
1694
        trace.pop_log_file(self._log_memento)
 
1695
        # The logging module now tracks references for cleanup so discard ours
 
1696
        del self._log_memento
 
1697
 
 
1698
    def thisFailsStrictLockCheck(self):
 
1699
        """It is known that this test would fail with -Dstrict_locks.
 
1700
 
 
1701
        By default, all tests are run with strict lock checking unless
 
1702
        -Edisable_lock_checks is supplied. However there are some tests which
 
1703
        we know fail strict locks at this point that have not been fixed.
 
1704
        They should call this function to disable the strict checking.
 
1705
 
 
1706
        This should be used sparingly, it is much better to fix the locking
 
1707
        issues rather than papering over the problem by calling this function.
 
1708
        """
 
1709
        debug.debug_flags.discard('strict_locks')
 
1710
 
 
1711
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
 
1712
        """Overrides an object attribute restoring it after the test.
 
1713
 
 
1714
        :note: This should be used with discretion; you should think about
 
1715
        whether it's better to make the code testable without monkey-patching.
 
1716
 
 
1717
        :param obj: The object that will be mutated.
 
1718
 
 
1719
        :param attr_name: The attribute name we want to preserve/override in
 
1720
            the object.
 
1721
 
 
1722
        :param new: The optional value we want to set the attribute to.
 
1723
 
 
1724
        :returns: The actual attr value.
 
1725
        """
 
1726
        # The actual value is captured by the call below
 
1727
        value = getattr(obj, attr_name, _unitialized_attr)
 
1728
        if value is _unitialized_attr:
 
1729
            # When the test completes, the attribute should not exist, but if
 
1730
            # we aren't setting a value, we don't need to do anything.
 
1731
            if new is not _unitialized_attr:
 
1732
                self.addCleanup(delattr, obj, attr_name)
 
1733
        else:
 
1734
            self.addCleanup(setattr, obj, attr_name, value)
 
1735
        if new is not _unitialized_attr:
 
1736
            setattr(obj, attr_name, new)
 
1737
        return value
 
1738
 
 
1739
    def overrideEnv(self, name, new):
 
1740
        """Set an environment variable, and reset it after the test.
 
1741
 
 
1742
        :param name: The environment variable name.
 
1743
 
 
1744
        :param new: The value to set the variable to. If None, the 
 
1745
            variable is deleted from the environment.
 
1746
 
 
1747
        :returns: The actual variable value.
 
1748
        """
 
1749
        value = osutils.set_or_unset_env(name, new)
 
1750
        self.addCleanup(osutils.set_or_unset_env, name, value)
 
1751
        return value
 
1752
 
 
1753
    def recordCalls(self, obj, attr_name):
 
1754
        """Monkeypatch in a wrapper that will record calls.
 
1755
 
 
1756
        The monkeypatch is automatically removed when the test concludes.
 
1757
 
 
1758
        :param obj: The namespace holding the reference to be replaced;
 
1759
            typically a module, class, or object.
 
1760
        :param attr_name: A string for the name of the attribute to 
 
1761
            patch.
 
1762
        :returns: A list that will be extended with one item every time the
 
1763
            function is called, with a tuple of (args, kwargs).
 
1764
        """
 
1765
        calls = []
 
1766
 
 
1767
        def decorator(*args, **kwargs):
 
1768
            calls.append((args, kwargs))
 
1769
            return orig(*args, **kwargs)
 
1770
        orig = self.overrideAttr(obj, attr_name, decorator)
 
1771
        return calls
380
1772
 
381
1773
    def _cleanEnvironment(self):
382
 
        new_env = {
383
 
            'HOME': os.getcwd(),
384
 
            'APPDATA': os.getcwd(),
385
 
            'BZREMAIL': None,
386
 
            'EMAIL': None,
387
 
        }
388
 
        self.__old_env = {}
389
 
        self.addCleanup(self._restoreEnvironment)
390
 
        for name, value in new_env.iteritems():
391
 
            self._captureVar(name, value)
392
 
 
393
 
 
394
 
    def _captureVar(self, name, newvalue):
395
 
        """Set an environment variable, preparing it to be reset when finished."""
396
 
        self.__old_env[name] = os.environ.get(name, None)
397
 
        if newvalue is None:
398
 
            if name in os.environ:
399
 
                del os.environ[name]
400
 
        else:
401
 
            os.environ[name] = newvalue
402
 
 
403
 
    @staticmethod
404
 
    def _restoreVar(name, value):
405
 
        if value is None:
406
 
            if name in os.environ:
407
 
                del os.environ[name]
408
 
        else:
409
 
            os.environ[name] = value
410
 
 
411
 
    def _restoreEnvironment(self):
412
 
        for name, value in self.__old_env.iteritems():
413
 
            self._restoreVar(name, value)
414
 
 
415
 
    def tearDown(self):
416
 
        self._runCleanups()
417
 
        unittest.TestCase.tearDown(self)
418
 
 
419
 
    def _runCleanups(self):
420
 
        """Run registered cleanup functions. 
421
 
 
422
 
        This should only be called from TestCase.tearDown.
423
 
        """
424
 
        # TODO: Perhaps this should keep running cleanups even if 
425
 
        # one of them fails?
426
 
        for cleanup_fn in reversed(self._cleanups):
427
 
            cleanup_fn()
 
1774
        for name, value in isolated_environ.items():
 
1775
            self.overrideEnv(name, value)
 
1776
 
 
1777
    def _restoreHooks(self):
 
1778
        for klass, (name, hooks) in self._preserved_hooks.items():
 
1779
            setattr(klass, name, hooks)
 
1780
        self._preserved_hooks.clear()
 
1781
        breezy.hooks._lazy_hooks = self._preserved_lazy_hooks
 
1782
        self._preserved_lazy_hooks.clear()
 
1783
 
 
1784
    def knownFailure(self, reason):
 
1785
        """Declare that this test fails for a known reason
 
1786
 
 
1787
        Tests that are known to fail should generally be using expectedFailure
 
1788
        with an appropriate reverse assertion if a change could cause the test
 
1789
        to start passing. Conversely if the test has no immediate prospect of
 
1790
        succeeding then using skip is more suitable.
 
1791
 
 
1792
        When this method is called while an exception is being handled, that
 
1793
        traceback will be used, otherwise a new exception will be thrown to
 
1794
        provide one but won't be reported.
 
1795
        """
 
1796
        self._add_reason(reason)
 
1797
        try:
 
1798
            exc_info = sys.exc_info()
 
1799
            if exc_info != (None, None, None):
 
1800
                self._report_traceback(exc_info)
 
1801
            else:
 
1802
                try:
 
1803
                    raise self.failureException(reason)
 
1804
                except self.failureException:
 
1805
                    exc_info = sys.exc_info()
 
1806
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
 
1807
            raise testtools.testcase._ExpectedFailure(exc_info)
 
1808
        finally:
 
1809
            del exc_info
 
1810
 
 
1811
    def _suppress_log(self):
 
1812
        """Remove the log info from details."""
 
1813
        self.discardDetail('log')
 
1814
 
 
1815
    def _do_skip(self, result, reason):
 
1816
        self._suppress_log()
 
1817
        addSkip = getattr(result, 'addSkip', None)
 
1818
        if not callable(addSkip):
 
1819
            result.addSuccess(result)
 
1820
        else:
 
1821
            addSkip(self, str(reason))
 
1822
 
 
1823
    @staticmethod
 
1824
    def _do_known_failure(self, result, e):
 
1825
        self._suppress_log()
 
1826
        err = sys.exc_info()
 
1827
        addExpectedFailure = getattr(result, 'addExpectedFailure', None)
 
1828
        if addExpectedFailure is not None:
 
1829
            addExpectedFailure(self, err)
 
1830
        else:
 
1831
            result.addSuccess(self)
 
1832
 
 
1833
    @staticmethod
 
1834
    def _do_not_applicable(self, result, e):
 
1835
        if not e.args:
 
1836
            reason = 'No reason given'
 
1837
        else:
 
1838
            reason = e.args[0]
 
1839
        self._suppress_log ()
 
1840
        addNotApplicable = getattr(result, 'addNotApplicable', None)
 
1841
        if addNotApplicable is not None:
 
1842
            result.addNotApplicable(self, reason)
 
1843
        else:
 
1844
            self._do_skip(result, reason)
 
1845
 
 
1846
    @staticmethod
 
1847
    def _report_skip(self, result, err):
 
1848
        """Override the default _report_skip.
 
1849
 
 
1850
        We want to strip the 'log' detail. If we waint until _do_skip, it has
 
1851
        already been formatted into the 'reason' string, and we can't pull it
 
1852
        out again.
 
1853
        """
 
1854
        self._suppress_log()
 
1855
        super(TestCase, self)._report_skip(self, result, err)
 
1856
 
 
1857
    @staticmethod
 
1858
    def _report_expected_failure(self, result, err):
 
1859
        """Strip the log.
 
1860
 
 
1861
        See _report_skip for motivation.
 
1862
        """
 
1863
        self._suppress_log()
 
1864
        super(TestCase, self)._report_expected_failure(self, result, err)
 
1865
 
 
1866
    @staticmethod
 
1867
    def _do_unsupported_or_skip(self, result, e):
 
1868
        reason = e.args[0]
 
1869
        self._suppress_log()
 
1870
        addNotSupported = getattr(result, 'addNotSupported', None)
 
1871
        if addNotSupported is not None:
 
1872
            result.addNotSupported(self, reason)
 
1873
        else:
 
1874
            self._do_skip(result, reason)
 
1875
 
 
1876
    def time(self, callable, *args, **kwargs):
 
1877
        """Run callable and accrue the time it takes to the benchmark time.
 
1878
 
 
1879
        If lsprofiling is enabled (i.e. by --lsprof-time to brz selftest) then
 
1880
        this will cause lsprofile statistics to be gathered and stored in
 
1881
        self._benchcalls.
 
1882
        """
 
1883
        if self._benchtime is None:
 
1884
            self.addDetail('benchtime', content.Content(content.UTF8_TEXT,
 
1885
                lambda:[str(self._benchtime).encode('utf-8')]))
 
1886
            self._benchtime = 0
 
1887
        start = time.time()
 
1888
        try:
 
1889
            if not self._gather_lsprof_in_benchmarks:
 
1890
                return callable(*args, **kwargs)
 
1891
            else:
 
1892
                # record this benchmark
 
1893
                ret, stats = breezy.lsprof.profile(callable, *args, **kwargs)
 
1894
                stats.sort()
 
1895
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1896
                return ret
 
1897
        finally:
 
1898
            self._benchtime += time.time() - start
428
1899
 
429
1900
    def log(self, *args):
430
 
        mutter(*args)
431
 
 
432
 
    def _get_log(self):
433
 
        """Return as a string the log for this test"""
434
 
        if self._log_file_name:
435
 
            return open(self._log_file_name).read()
436
 
        else:
437
 
            return self._log_contents
438
 
        # TODO: Delete the log after it's been read in
439
 
 
440
 
    def capture(self, cmd, retcode=0):
441
 
        """Shortcut that splits cmd into words, runs, and returns stdout"""
442
 
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
443
 
 
444
 
    def run_bzr_captured(self, argv, retcode=0):
445
 
        """Invoke bzr and return (stdout, stderr).
446
 
 
447
 
        Useful for code that wants to check the contents of the
448
 
        output, the way error messages are presented, etc.
449
 
 
450
 
        This should be the main method for tests that want to exercise the
451
 
        overall behavior of the bzr application (rather than a unit test
452
 
        or a functional test of the library.)
453
 
 
454
 
        Much of the old code runs bzr by forking a new copy of Python, but
455
 
        that is slower, harder to debug, and generally not necessary.
456
 
 
457
 
        This runs bzr through the interface that catches and reports
458
 
        errors, and with logging set to something approximating the
459
 
        default, so that error reporting can be checked.
460
 
 
461
 
        argv -- arguments to invoke bzr
462
 
        retcode -- expected return code, or None for don't-care.
463
 
        """
464
 
        stdout = StringIO()
465
 
        stderr = StringIO()
466
 
        self.log('run bzr: %s', ' '.join(argv))
 
1901
        trace.mutter(*args)
 
1902
 
 
1903
    def get_log(self):
 
1904
        """Get a unicode string containing the log from breezy.trace.
 
1905
 
 
1906
        Undecodable characters are replaced.
 
1907
        """
 
1908
        return u"".join(self.getDetails()['log'].iter_text())
 
1909
 
 
1910
    def requireFeature(self, feature):
 
1911
        """This test requires a specific feature is available.
 
1912
 
 
1913
        :raises UnavailableFeature: When feature is not available.
 
1914
        """
 
1915
        if not feature.available():
 
1916
            raise UnavailableFeature(feature)
 
1917
 
 
1918
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1919
            working_dir):
 
1920
        """Run bazaar command line, splitting up a string command line."""
 
1921
        if isinstance(args, string_types):
 
1922
            args = shlex.split(args)
 
1923
        return self._run_bzr_core(args, retcode=retcode,
 
1924
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1925
                )
 
1926
 
 
1927
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1928
            working_dir):
 
1929
        # Clear chk_map page cache, because the contents are likely to mask
 
1930
        # locking errors.
 
1931
        chk_map.clear_cache()
 
1932
        if encoding is None:
 
1933
            encoding = osutils.get_user_encoding()
 
1934
 
 
1935
        self.log('run brz: %r', args)
 
1936
 
 
1937
        stdout = ui_testing.BytesIOWithEncoding()
 
1938
        stderr = ui_testing.BytesIOWithEncoding()
 
1939
        stdout.encoding = stderr.encoding = encoding
 
1940
 
467
1941
        # FIXME: don't call into logging here
468
 
        handler = logging.StreamHandler(stderr)
469
 
        handler.setFormatter(bzrlib.trace.QuietFormatter())
470
 
        handler.setLevel(logging.INFO)
 
1942
        handler = trace.EncodedStreamHandler(
 
1943
            stderr, errors="replace", level=logging.INFO)
471
1944
        logger = logging.getLogger('')
472
1945
        logger.addHandler(handler)
 
1946
 
 
1947
        self._last_cmd_stdout = codecs.getwriter(encoding)(stdout)
 
1948
        self._last_cmd_stderr = codecs.getwriter(encoding)(stderr)
 
1949
 
 
1950
        old_ui_factory = ui.ui_factory
 
1951
        ui.ui_factory = ui_testing.TestUIFactory(
 
1952
            stdin=stdin,
 
1953
            stdout=self._last_cmd_stdout,
 
1954
            stderr=self._last_cmd_stderr)
 
1955
 
 
1956
        cwd = None
 
1957
        if working_dir is not None:
 
1958
            cwd = osutils.getcwd()
 
1959
            os.chdir(working_dir)
 
1960
 
473
1961
        try:
474
 
            result = self.apply_redirected(None, stdout, stderr,
475
 
                                           bzrlib.commands.run_bzr_catch_errors,
476
 
                                           argv)
 
1962
            result = self.apply_redirected(
 
1963
                ui.ui_factory.stdin,
 
1964
                stdout, stderr,
 
1965
                _mod_commands.run_bzr_catch_user_errors,
 
1966
                args)
477
1967
        finally:
478
1968
            logger.removeHandler(handler)
 
1969
            ui.ui_factory = old_ui_factory
 
1970
            if cwd is not None:
 
1971
                os.chdir(cwd)
 
1972
 
479
1973
        out = stdout.getvalue()
480
1974
        err = stderr.getvalue()
481
1975
        if out:
482
 
            self.log('output:\n%s', out)
 
1976
            self.log('output:\n%r', out)
483
1977
        if err:
484
 
            self.log('errors:\n%s', err)
 
1978
            self.log('errors:\n%r', err)
485
1979
        if retcode is not None:
486
 
            self.assertEquals(result, retcode)
487
 
        return out, err
488
 
 
489
 
    def run_bzr(self, *args, **kwargs):
490
 
        """Invoke bzr, as if it were run from the command line.
 
1980
            self.assertEqual(retcode, result,
 
1981
                              message='Unexpected return code')
 
1982
        return result, out, err
 
1983
 
 
1984
    def run_bzr(self, args, retcode=0, stdin=None, encoding=None,
 
1985
                working_dir=None, error_regexes=[]):
 
1986
        """Invoke brz, as if it were run from the command line.
 
1987
 
 
1988
        The argument list should not include the brz program name - the
 
1989
        first argument is normally the brz command.  Arguments may be
 
1990
        passed in three ways:
 
1991
 
 
1992
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1993
        when the command contains whitespace or metacharacters, or
 
1994
        is built up at run time.
 
1995
 
 
1996
        2- A single string, eg "add a".  This is the most convenient
 
1997
        for hardcoded commands.
 
1998
 
 
1999
        This runs brz through the interface that catches and reports
 
2000
        errors, and with logging set to something approximating the
 
2001
        default, so that error reporting can be checked.
491
2002
 
492
2003
        This should be the main method for tests that want to exercise the
493
 
        overall behavior of the bzr application (rather than a unit test
 
2004
        overall behavior of the brz application (rather than a unit test
494
2005
        or a functional test of the library.)
495
2006
 
496
2007
        This sends the stdout/stderr results into the test's log,
497
2008
        where it may be useful for debugging.  See also run_captured.
498
 
        """
499
 
        retcode = kwargs.pop('retcode', 0)
500
 
        return self.run_bzr_captured(args, retcode)
501
 
 
502
 
    def check_inventory_shape(self, inv, shape):
503
 
        """Compare an inventory to a list of expected names.
 
2009
 
 
2010
        :keyword stdin: A string to be used as stdin for the command.
 
2011
        :keyword retcode: The status code the command should return;
 
2012
            default 0.
 
2013
        :keyword working_dir: The directory to run the command in
 
2014
        :keyword error_regexes: A list of expected error messages.  If
 
2015
            specified they must be seen in the error output of the command.
 
2016
        """
 
2017
        retcode, out, err = self._run_bzr_autosplit(
 
2018
            args=args,
 
2019
            retcode=retcode,
 
2020
            encoding=encoding,
 
2021
            stdin=stdin,
 
2022
            working_dir=working_dir,
 
2023
            )
 
2024
        self.assertIsInstance(error_regexes, (list, tuple))
 
2025
        for regex in error_regexes:
 
2026
            self.assertContainsRe(err, regex)
 
2027
        return out, err
 
2028
 
 
2029
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
2030
        """Run brz, and check that stderr contains the supplied regexes
 
2031
 
 
2032
        :param error_regexes: Sequence of regular expressions which
 
2033
            must each be found in the error output. The relative ordering
 
2034
            is not enforced.
 
2035
        :param args: command-line arguments for brz
 
2036
        :param kwargs: Keyword arguments which are interpreted by run_brz
 
2037
            This function changes the default value of retcode to be 3,
 
2038
            since in most cases this is run when you expect brz to fail.
 
2039
 
 
2040
        :return: (out, err) The actual output of running the command (in case
 
2041
            you want to do more inspection)
 
2042
 
 
2043
        Examples of use::
 
2044
 
 
2045
            # Make sure that commit is failing because there is nothing to do
 
2046
            self.run_bzr_error(['no changes to commit'],
 
2047
                               ['commit', '-m', 'my commit comment'])
 
2048
            # Make sure --strict is handling an unknown file, rather than
 
2049
            # giving us the 'nothing to do' error
 
2050
            self.build_tree(['unknown'])
 
2051
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
2052
                               ['commit', --strict', '-m', 'my commit comment'])
 
2053
        """
 
2054
        kwargs.setdefault('retcode', 3)
 
2055
        kwargs['error_regexes'] = error_regexes
 
2056
        out, err = self.run_bzr(*args, **kwargs)
 
2057
        return out, err
 
2058
 
 
2059
    def run_bzr_subprocess(self, *args, **kwargs):
 
2060
        """Run brz in a subprocess for testing.
 
2061
 
 
2062
        This starts a new Python interpreter and runs brz in there.
 
2063
        This should only be used for tests that have a justifiable need for
 
2064
        this isolation: e.g. they are testing startup time, or signal
 
2065
        handling, or early startup code, etc.  Subprocess code can't be
 
2066
        profiled or debugged so easily.
 
2067
 
 
2068
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
2069
            None is supplied, the status code is not checked.
 
2070
        :keyword env_changes: A dictionary which lists changes to environment
 
2071
            variables. A value of None will unset the env variable.
 
2072
            The values must be strings. The change will only occur in the
 
2073
            child, so you don't need to fix the environment after running.
 
2074
        :keyword universal_newlines: Convert CRLF => LF
 
2075
        :keyword allow_plugins: By default the subprocess is run with
 
2076
            --no-plugins to ensure test reproducibility. Also, it is possible
 
2077
            for system-wide plugins to create unexpected output on stderr,
 
2078
            which can cause unnecessary test failures.
 
2079
        """
 
2080
        env_changes = kwargs.get('env_changes', {})
 
2081
        working_dir = kwargs.get('working_dir', None)
 
2082
        allow_plugins = kwargs.get('allow_plugins', False)
 
2083
        if len(args) == 1:
 
2084
            if isinstance(args[0], list):
 
2085
                args = args[0]
 
2086
            elif isinstance(args[0], (str, text_type)):
 
2087
                args = list(shlex.split(args[0]))
 
2088
        else:
 
2089
            raise ValueError("passing varargs to run_bzr_subprocess")
 
2090
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
2091
                                            working_dir=working_dir,
 
2092
                                            allow_plugins=allow_plugins)
 
2093
        # We distinguish between retcode=None and retcode not passed.
 
2094
        supplied_retcode = kwargs.get('retcode', 0)
 
2095
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
2096
            universal_newlines=kwargs.get('universal_newlines', False),
 
2097
            process_args=args)
 
2098
 
 
2099
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
2100
                             skip_if_plan_to_signal=False,
 
2101
                             working_dir=None,
 
2102
                             allow_plugins=False, stderr=subprocess.PIPE):
 
2103
        """Start brz in a subprocess for testing.
 
2104
 
 
2105
        This starts a new Python interpreter and runs brz in there.
 
2106
        This should only be used for tests that have a justifiable need for
 
2107
        this isolation: e.g. they are testing startup time, or signal
 
2108
        handling, or early startup code, etc.  Subprocess code can't be
 
2109
        profiled or debugged so easily.
 
2110
 
 
2111
        :param process_args: a list of arguments to pass to the brz executable,
 
2112
            for example ``['--version']``.
 
2113
        :param env_changes: A dictionary which lists changes to environment
 
2114
            variables. A value of None will unset the env variable.
 
2115
            The values must be strings. The change will only occur in the
 
2116
            child, so you don't need to fix the environment after running.
 
2117
        :param skip_if_plan_to_signal: raise TestSkipped when true and system
 
2118
            doesn't support signalling subprocesses.
 
2119
        :param allow_plugins: If False (default) pass --no-plugins to brz.
 
2120
        :param stderr: file to use for the subprocess's stderr.  Valid values
 
2121
            are those valid for the stderr argument of `subprocess.Popen`.
 
2122
            Default value is ``subprocess.PIPE``.
 
2123
 
 
2124
        :returns: Popen object for the started process.
 
2125
        """
 
2126
        if skip_if_plan_to_signal:
 
2127
            if os.name != "posix":
 
2128
                raise TestSkipped("Sending signals not supported")
 
2129
 
 
2130
        if env_changes is None:
 
2131
            env_changes = {}
 
2132
        # Because $HOME is set to a tempdir for the context of a test, modules
 
2133
        # installed in the user dir will not be found unless $PYTHONUSERBASE
 
2134
        # gets set to the computed directory of this parent process.
 
2135
        if site.USER_BASE is not None:
 
2136
            env_changes["PYTHONUSERBASE"] = site.USER_BASE
 
2137
        old_env = {}
 
2138
 
 
2139
        def cleanup_environment():
 
2140
            for env_var, value in env_changes.items():
 
2141
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
2142
 
 
2143
        def restore_environment():
 
2144
            for env_var, value in old_env.items():
 
2145
                osutils.set_or_unset_env(env_var, value)
 
2146
 
 
2147
        bzr_path = self.get_brz_path()
 
2148
 
 
2149
        cwd = None
 
2150
        if working_dir is not None:
 
2151
            cwd = osutils.getcwd()
 
2152
            os.chdir(working_dir)
 
2153
 
 
2154
        try:
 
2155
            # win32 subprocess doesn't support preexec_fn
 
2156
            # so we will avoid using it on all platforms, just to
 
2157
            # make sure the code path is used, and we don't break on win32
 
2158
            cleanup_environment()
 
2159
            # Include the subprocess's log file in the test details, in case
 
2160
            # the test fails due to an error in the subprocess.
 
2161
            self._add_subprocess_log(trace._get_brz_log_filename())
 
2162
            command = [sys.executable]
 
2163
            # frozen executables don't need the path to bzr
 
2164
            if getattr(sys, "frozen", None) is None:
 
2165
                command.append(bzr_path)
 
2166
            if not allow_plugins:
 
2167
                command.append('--no-plugins')
 
2168
            command.extend(process_args)
 
2169
            process = self._popen(command, stdin=subprocess.PIPE,
 
2170
                                  stdout=subprocess.PIPE,
 
2171
                                  stderr=stderr)
 
2172
        finally:
 
2173
            restore_environment()
 
2174
            if cwd is not None:
 
2175
                os.chdir(cwd)
 
2176
 
 
2177
        return process
 
2178
 
 
2179
    def _add_subprocess_log(self, log_file_path):
 
2180
        if len(self._log_files) == 0:
 
2181
            # Register an addCleanup func.  We do this on the first call to
 
2182
            # _add_subprocess_log rather than in TestCase.setUp so that this
 
2183
            # addCleanup is registered after any cleanups for tempdirs that
 
2184
            # subclasses might create, which will probably remove the log file
 
2185
            # we want to read.
 
2186
            self.addCleanup(self._subprocess_log_cleanup)
 
2187
        # self._log_files is a set, so if a log file is reused we won't grab it
 
2188
        # twice.
 
2189
        self._log_files.add(log_file_path)
 
2190
 
 
2191
    def _subprocess_log_cleanup(self):
 
2192
        for count, log_file_path in enumerate(self._log_files):
 
2193
            # We use buffer_now=True to avoid holding the file open beyond
 
2194
            # the life of this function, which might interfere with e.g.
 
2195
            # cleaning tempdirs on Windows.
 
2196
            # XXX: Testtools 0.9.5 doesn't have the content_from_file helper
 
2197
            #detail_content = content.content_from_file(
 
2198
            #    log_file_path, buffer_now=True)
 
2199
            with open(log_file_path, 'rb') as log_file:
 
2200
                log_file_bytes = log_file.read()
 
2201
            detail_content = content.Content(content.ContentType("text",
 
2202
                "plain", {"charset": "utf8"}), lambda: [log_file_bytes])
 
2203
            self.addDetail("start_bzr_subprocess-log-%d" % (count,),
 
2204
                detail_content)
 
2205
 
 
2206
    def _popen(self, *args, **kwargs):
 
2207
        """Place a call to Popen.
 
2208
 
 
2209
        Allows tests to override this method to intercept the calls made to
 
2210
        Popen for introspection.
 
2211
        """
 
2212
        return subprocess.Popen(*args, **kwargs)
 
2213
 
 
2214
    def get_source_path(self):
 
2215
        """Return the path of the directory containing breezy."""
 
2216
        return os.path.dirname(os.path.dirname(breezy.__file__))
 
2217
 
 
2218
    def get_brz_path(self):
 
2219
        """Return the path of the 'brz' executable for this test suite."""
 
2220
        brz_path = os.path.join(self.get_source_path(), "brz")
 
2221
        if not os.path.isfile(brz_path):
 
2222
            # We are probably installed. Assume sys.argv is the right file
 
2223
            brz_path = sys.argv[0]
 
2224
        return brz_path
 
2225
 
 
2226
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
2227
                              universal_newlines=False, process_args=None):
 
2228
        """Finish the execution of process.
 
2229
 
 
2230
        :param process: the Popen object returned from start_bzr_subprocess.
 
2231
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
2232
            None is supplied, the status code is not checked.
 
2233
        :param send_signal: an optional signal to send to the process.
 
2234
        :param universal_newlines: Convert CRLF => LF
 
2235
        :returns: (stdout, stderr)
 
2236
        """
 
2237
        if send_signal is not None:
 
2238
            os.kill(process.pid, send_signal)
 
2239
        out, err = process.communicate()
 
2240
 
 
2241
        if universal_newlines:
 
2242
            out = out.replace('\r\n', '\n')
 
2243
            err = err.replace('\r\n', '\n')
 
2244
 
 
2245
        if retcode is not None and retcode != process.returncode:
 
2246
            if process_args is None:
 
2247
                process_args = "(unknown args)"
 
2248
            trace.mutter('Output of brz %r:\n%s', process_args, out)
 
2249
            trace.mutter('Error for brz %r:\n%s', process_args, err)
 
2250
            self.fail('Command brz %r failed with retcode %d != %d'
 
2251
                      % (process_args, retcode, process.returncode))
 
2252
        return [out, err]
 
2253
 
 
2254
    def check_tree_shape(self, tree, shape):
 
2255
        """Compare a tree to a list of expected names.
504
2256
 
505
2257
        Fail if they are not precisely equal.
506
2258
        """
507
2259
        extras = []
508
2260
        shape = list(shape)             # copy
509
 
        for path, ie in inv.entries():
 
2261
        for path, ie in tree.iter_entries_by_dir():
510
2262
            name = path.replace('\\', '/')
511
 
            if ie.kind == 'dir':
 
2263
            if ie.kind == 'directory':
512
2264
                name = name + '/'
513
 
            if name in shape:
 
2265
            if name == "/":
 
2266
                pass # ignore root entry
 
2267
            elif name in shape:
514
2268
                shape.remove(name)
515
2269
            else:
516
2270
                extras.append(name)
527
2281
        if not callable(a_callable):
528
2282
            raise ValueError("a_callable must be callable.")
529
2283
        if stdin is None:
530
 
            stdin = StringIO("")
 
2284
            stdin = BytesIO(b"")
531
2285
        if stdout is None:
532
2286
            if getattr(self, "_log_file", None) is not None:
533
2287
                stdout = self._log_file
534
2288
            else:
535
 
                stdout = StringIO()
 
2289
                stdout = BytesIO()
536
2290
        if stderr is None:
537
2291
            if getattr(self, "_log_file", None is not None):
538
2292
                stderr = self._log_file
539
2293
            else:
540
 
                stderr = StringIO()
 
2294
                stderr = BytesIO()
541
2295
        real_stdin = sys.stdin
542
2296
        real_stdout = sys.stdout
543
2297
        real_stderr = sys.stderr
551
2305
            sys.stderr = real_stderr
552
2306
            sys.stdin = real_stdin
553
2307
 
554
 
    def merge(self, branch_from, wt_to):
555
 
        """A helper for tests to do a ui-less merge.
556
 
 
557
 
        This should move to the main library when someone has time to integrate
558
 
        it in.
559
 
        """
560
 
        # minimal ui-less merge.
561
 
        wt_to.branch.fetch(branch_from)
562
 
        base_rev = common_ancestor(branch_from.last_revision(),
563
 
                                   wt_to.branch.last_revision(),
564
 
                                   wt_to.branch.repository)
565
 
        merge_inner(wt_to.branch, branch_from.basis_tree(), 
566
 
                    wt_to.branch.repository.revision_tree(base_rev),
567
 
                    this_tree=wt_to)
568
 
        wt_to.add_pending_merge(branch_from.last_revision())
569
 
 
570
 
 
571
 
BzrTestBase = TestCase
572
 
 
573
 
     
574
 
class TestCaseInTempDir(TestCase):
 
2308
    def reduceLockdirTimeout(self):
 
2309
        """Reduce the default lock timeout for the duration of the test, so that
 
2310
        if LockContention occurs during a test, it does so quickly.
 
2311
 
 
2312
        Tests that expect to provoke LockContention errors should call this.
 
2313
        """
 
2314
        self.overrideAttr(lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
 
2315
 
 
2316
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
2317
        """Return a wrapped BytesIO, that will encode text input to UTF-8."""
 
2318
        if encoding_type is None:
 
2319
            encoding_type = 'strict'
 
2320
        bio = BytesIO()
 
2321
        output_encoding = 'utf-8'
 
2322
        sio = codecs.getwriter(output_encoding)(bio, errors=encoding_type)
 
2323
        sio.encoding = output_encoding
 
2324
        return sio
 
2325
 
 
2326
    def disable_verb(self, verb):
 
2327
        """Disable a smart server verb for one test."""
 
2328
        from breezy.bzr.smart import request
 
2329
        request_handlers = request.request_handlers
 
2330
        orig_method = request_handlers.get(verb)
 
2331
        orig_info = request_handlers.get_info(verb)
 
2332
        request_handlers.remove(verb)
 
2333
        self.addCleanup(request_handlers.register, verb, orig_method,
 
2334
            info=orig_info)
 
2335
 
 
2336
    def __hash__(self):
 
2337
        return id(self)
 
2338
 
 
2339
 
 
2340
class CapturedCall(object):
 
2341
    """A helper for capturing smart server calls for easy debug analysis."""
 
2342
 
 
2343
    def __init__(self, params, prefix_length):
 
2344
        """Capture the call with params and skip prefix_length stack frames."""
 
2345
        self.call = params
 
2346
        import traceback
 
2347
        # The last 5 frames are the __init__, the hook frame, and 3 smart
 
2348
        # client frames. Beyond this we could get more clever, but this is good
 
2349
        # enough for now.
 
2350
        stack = traceback.extract_stack()[prefix_length:-5]
 
2351
        self.stack = ''.join(traceback.format_list(stack))
 
2352
 
 
2353
    def __str__(self):
 
2354
        return self.call.method
 
2355
 
 
2356
    def __repr__(self):
 
2357
        return self.call.method
 
2358
 
 
2359
    def stack(self):
 
2360
        return self.stack
 
2361
 
 
2362
 
 
2363
class TestCaseWithMemoryTransport(TestCase):
 
2364
    """Common test class for tests that do not need disk resources.
 
2365
 
 
2366
    Tests that need disk resources should derive from TestCaseInTempDir
 
2367
    orTestCaseWithTransport.
 
2368
 
 
2369
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all brz tests.
 
2370
 
 
2371
    For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
 
2372
    a directory which does not exist. This serves to help ensure test isolation
 
2373
    is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
 
2374
    must exist. However, TestCaseWithMemoryTransport does not offer local file
 
2375
    defaults for the transport in tests, nor does it obey the command line
 
2376
    override, so tests that accidentally write to the common directory should
 
2377
    be rare.
 
2378
 
 
2379
    :cvar TEST_ROOT: Directory containing all temporary directories, plus a
 
2380
        ``.bzr`` directory that stops us ascending higher into the filesystem.
 
2381
    """
 
2382
 
 
2383
    TEST_ROOT = None
 
2384
    _TEST_NAME = 'test'
 
2385
 
 
2386
    def __init__(self, methodName='runTest'):
 
2387
        # allow test parameterization after test construction and before test
 
2388
        # execution. Variables that the parameterizer sets need to be
 
2389
        # ones that are not set by setUp, or setUp will trash them.
 
2390
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
2391
        self.vfs_transport_factory = default_transport
 
2392
        self.transport_server = None
 
2393
        self.transport_readonly_server = None
 
2394
        self.__vfs_server = None
 
2395
 
 
2396
    def setUp(self):
 
2397
        super(TestCaseWithMemoryTransport, self).setUp()
 
2398
 
 
2399
        def _add_disconnect_cleanup(transport):
 
2400
            """Schedule disconnection of given transport at test cleanup
 
2401
 
 
2402
            This needs to happen for all connected transports or leaks occur.
 
2403
 
 
2404
            Note reconnections may mean we call disconnect multiple times per
 
2405
            transport which is suboptimal but seems harmless.
 
2406
            """
 
2407
            self.addCleanup(transport.disconnect)
 
2408
 
 
2409
        _mod_transport.Transport.hooks.install_named_hook('post_connect',
 
2410
            _add_disconnect_cleanup, None)
 
2411
 
 
2412
        self._make_test_root()
 
2413
        self.addCleanup(os.chdir, osutils.getcwd())
 
2414
        self.makeAndChdirToTestDir()
 
2415
        self.overrideEnvironmentForTesting()
 
2416
        self.__readonly_server = None
 
2417
        self.__server = None
 
2418
        self.reduceLockdirTimeout()
 
2419
        # Each test may use its own config files even if the local config files
 
2420
        # don't actually exist. They'll rightly fail if they try to create them
 
2421
        # though.
 
2422
        self.overrideAttr(config, '_shared_stores', {})
 
2423
 
 
2424
    def get_transport(self, relpath=None):
 
2425
        """Return a writeable transport.
 
2426
 
 
2427
        This transport is for the test scratch space relative to
 
2428
        "self._test_root"
 
2429
 
 
2430
        :param relpath: a path relative to the base url.
 
2431
        """
 
2432
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
 
2433
        self.assertFalse(t.is_readonly())
 
2434
        return t
 
2435
 
 
2436
    def get_readonly_transport(self, relpath=None):
 
2437
        """Return a readonly transport for the test scratch space
 
2438
 
 
2439
        This can be used to test that operations which should only need
 
2440
        readonly access in fact do not try to write.
 
2441
 
 
2442
        :param relpath: a path relative to the base url.
 
2443
        """
 
2444
        t = _mod_transport.get_transport_from_url(
 
2445
            self.get_readonly_url(relpath))
 
2446
        self.assertTrue(t.is_readonly())
 
2447
        return t
 
2448
 
 
2449
    def create_transport_readonly_server(self):
 
2450
        """Create a transport server from class defined at init.
 
2451
 
 
2452
        This is mostly a hook for daughter classes.
 
2453
        """
 
2454
        return self.transport_readonly_server()
 
2455
 
 
2456
    def get_readonly_server(self):
 
2457
        """Get the server instance for the readonly transport
 
2458
 
 
2459
        This is useful for some tests with specific servers to do diagnostics.
 
2460
        """
 
2461
        if self.__readonly_server is None:
 
2462
            if self.transport_readonly_server is None:
 
2463
                # readonly decorator requested
 
2464
                self.__readonly_server = test_server.ReadonlyServer()
 
2465
            else:
 
2466
                # explicit readonly transport.
 
2467
                self.__readonly_server = self.create_transport_readonly_server()
 
2468
            self.start_server(self.__readonly_server,
 
2469
                self.get_vfs_only_server())
 
2470
        return self.__readonly_server
 
2471
 
 
2472
    def get_readonly_url(self, relpath=None):
 
2473
        """Get a URL for the readonly transport.
 
2474
 
 
2475
        This will either be backed by '.' or a decorator to the transport
 
2476
        used by self.get_url()
 
2477
        relpath provides for clients to get a path relative to the base url.
 
2478
        These should only be downwards relative, not upwards.
 
2479
        """
 
2480
        base = self.get_readonly_server().get_url()
 
2481
        return self._adjust_url(base, relpath)
 
2482
 
 
2483
    def get_vfs_only_server(self):
 
2484
        """Get the vfs only read/write server instance.
 
2485
 
 
2486
        This is useful for some tests with specific servers that need
 
2487
        diagnostics.
 
2488
 
 
2489
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
2490
        is no means to override it.
 
2491
        """
 
2492
        if self.__vfs_server is None:
 
2493
            self.__vfs_server = memory.MemoryServer()
 
2494
            self.start_server(self.__vfs_server)
 
2495
        return self.__vfs_server
 
2496
 
 
2497
    def get_server(self):
 
2498
        """Get the read/write server instance.
 
2499
 
 
2500
        This is useful for some tests with specific servers that need
 
2501
        diagnostics.
 
2502
 
 
2503
        This is built from the self.transport_server factory. If that is None,
 
2504
        then the self.get_vfs_server is returned.
 
2505
        """
 
2506
        if self.__server is None:
 
2507
            if (self.transport_server is None or self.transport_server is
 
2508
                self.vfs_transport_factory):
 
2509
                self.__server = self.get_vfs_only_server()
 
2510
            else:
 
2511
                # bring up a decorated means of access to the vfs only server.
 
2512
                self.__server = self.transport_server()
 
2513
                self.start_server(self.__server, self.get_vfs_only_server())
 
2514
        return self.__server
 
2515
 
 
2516
    def _adjust_url(self, base, relpath):
 
2517
        """Get a URL (or maybe a path) for the readwrite transport.
 
2518
 
 
2519
        This will either be backed by '.' or to an equivalent non-file based
 
2520
        facility.
 
2521
        relpath provides for clients to get a path relative to the base url.
 
2522
        These should only be downwards relative, not upwards.
 
2523
        """
 
2524
        if relpath is not None and relpath != '.':
 
2525
            if not base.endswith('/'):
 
2526
                base = base + '/'
 
2527
            # XXX: Really base should be a url; we did after all call
 
2528
            # get_url()!  But sometimes it's just a path (from
 
2529
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
2530
            # to a non-escaped local path.
 
2531
            if base.startswith('./') or base.startswith('/'):
 
2532
                base += relpath
 
2533
            else:
 
2534
                base += urlutils.escape(relpath)
 
2535
        return base
 
2536
 
 
2537
    def get_url(self, relpath=None):
 
2538
        """Get a URL (or maybe a path) for the readwrite transport.
 
2539
 
 
2540
        This will either be backed by '.' or to an equivalent non-file based
 
2541
        facility.
 
2542
        relpath provides for clients to get a path relative to the base url.
 
2543
        These should only be downwards relative, not upwards.
 
2544
        """
 
2545
        base = self.get_server().get_url()
 
2546
        return self._adjust_url(base, relpath)
 
2547
 
 
2548
    def get_vfs_only_url(self, relpath=None):
 
2549
        """Get a URL (or maybe a path for the plain old vfs transport.
 
2550
 
 
2551
        This will never be a smart protocol.  It always has all the
 
2552
        capabilities of the local filesystem, but it might actually be a
 
2553
        MemoryTransport or some other similar virtual filesystem.
 
2554
 
 
2555
        This is the backing transport (if any) of the server returned by
 
2556
        get_url and get_readonly_url.
 
2557
 
 
2558
        :param relpath: provides for clients to get a path relative to the base
 
2559
            url.  These should only be downwards relative, not upwards.
 
2560
        :return: A URL
 
2561
        """
 
2562
        base = self.get_vfs_only_server().get_url()
 
2563
        return self._adjust_url(base, relpath)
 
2564
 
 
2565
    def _create_safety_net(self):
 
2566
        """Make a fake bzr directory.
 
2567
 
 
2568
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
2569
        real branch.
 
2570
        """
 
2571
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2572
        try:
 
2573
            # Make sure we get a readable and accessible home for .brz.log
 
2574
            # and/or config files, and not fallback to weird defaults (see
 
2575
            # http://pad.lv/825027).
 
2576
            self.assertIs(None, os.environ.get('BRZ_HOME', None))
 
2577
            os.environ['BRZ_HOME'] = root
 
2578
            wt = controldir.ControlDir.create_standalone_workingtree(root)
 
2579
            del os.environ['BRZ_HOME']
 
2580
        except Exception as e:
 
2581
            self.fail("Fail to initialize the safety net: %r\n" % (e,))
 
2582
        # Hack for speed: remember the raw bytes of the dirstate file so that
 
2583
        # we don't need to re-open the wt to check it hasn't changed.
 
2584
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
 
2585
            wt.control_transport.get_bytes('dirstate'))
 
2586
 
 
2587
    def _check_safety_net(self):
 
2588
        """Check that the safety .bzr directory have not been touched.
 
2589
 
 
2590
        _make_test_root have created a .bzr directory to prevent tests from
 
2591
        propagating. This method ensures than a test did not leaked.
 
2592
        """
 
2593
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2594
        t = _mod_transport.get_transport_from_path(root)
 
2595
        self.permit_url(t.base)
 
2596
        if (t.get_bytes('.bzr/checkout/dirstate') != 
 
2597
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
 
2598
            # The current test have modified the /bzr directory, we need to
 
2599
            # recreate a new one or all the followng tests will fail.
 
2600
            # If you need to inspect its content uncomment the following line
 
2601
            # import pdb; pdb.set_trace()
 
2602
            _rmtree_temp_dir(root + '/.bzr', test_id=self.id())
 
2603
            self._create_safety_net()
 
2604
            raise AssertionError('%s/.bzr should not be modified' % root)
 
2605
 
 
2606
    def _make_test_root(self):
 
2607
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
2608
            # Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
 
2609
            root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
 
2610
                                                    suffix='.tmp'))
 
2611
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
2612
 
 
2613
            self._create_safety_net()
 
2614
 
 
2615
            # The same directory is used by all tests, and we're not
 
2616
            # specifically told when all tests are finished.  This will do.
 
2617
            atexit.register(_rmtree_temp_dir, root)
 
2618
 
 
2619
        self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2620
        self.addCleanup(self._check_safety_net)
 
2621
 
 
2622
    def makeAndChdirToTestDir(self):
 
2623
        """Create a temporary directories for this one test.
 
2624
 
 
2625
        This must set self.test_home_dir and self.test_dir and chdir to
 
2626
        self.test_dir.
 
2627
 
 
2628
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
2629
        """
 
2630
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2631
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
2632
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
2633
        self.permit_dir(self.test_dir)
 
2634
 
 
2635
    def make_branch(self, relpath, format=None, name=None):
 
2636
        """Create a branch on the transport at relpath."""
 
2637
        repo = self.make_repository(relpath, format=format)
 
2638
        return repo.controldir.create_branch(append_revisions_only=False, name=name)
 
2639
 
 
2640
    def get_default_format(self):
 
2641
        return 'default'
 
2642
 
 
2643
    def resolve_format(self, format):
 
2644
        """Resolve an object to a ControlDir format object.
 
2645
 
 
2646
        The initial format object can either already be
 
2647
        a ControlDirFormat, None (for the default format),
 
2648
        or a string with the name of the control dir format.
 
2649
 
 
2650
        :param format: Object to resolve
 
2651
        :return A ControlDirFormat instance
 
2652
        """
 
2653
        if format is None:
 
2654
            format = self.get_default_format()
 
2655
        if isinstance(format, str):
 
2656
            format = controldir.format_registry.make_controldir(format)
 
2657
        return format
 
2658
 
 
2659
    def make_controldir(self, relpath, format=None):
 
2660
        try:
 
2661
            # might be a relative or absolute path
 
2662
            maybe_a_url = self.get_url(relpath)
 
2663
            segments = maybe_a_url.rsplit('/', 1)
 
2664
            t = _mod_transport.get_transport(maybe_a_url)
 
2665
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
2666
                t.ensure_base()
 
2667
            format = self.resolve_format(format)
 
2668
            return format.initialize_on_transport(t)
 
2669
        except errors.UninitializableFormat:
 
2670
            raise TestSkipped("Format %s is not initializable." % format)
 
2671
 
 
2672
    def make_repository(self, relpath, shared=None, format=None):
 
2673
        """Create a repository on our default transport at relpath.
 
2674
 
 
2675
        Note that relpath must be a relative path, not a full url.
 
2676
        """
 
2677
        # FIXME: If you create a remoterepository this returns the underlying
 
2678
        # real format, which is incorrect.  Actually we should make sure that
 
2679
        # RemoteBzrDir returns a RemoteRepository.
 
2680
        # maybe  mbp 20070410
 
2681
        made_control = self.make_controldir(relpath, format=format)
 
2682
        return made_control.create_repository(shared=shared)
 
2683
 
 
2684
    def make_smart_server(self, path, backing_server=None):
 
2685
        if backing_server is None:
 
2686
            backing_server = self.get_server()
 
2687
        smart_server = test_server.SmartTCPServer_for_testing()
 
2688
        self.start_server(smart_server, backing_server)
 
2689
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
 
2690
                                                   ).clone(path)
 
2691
        return remote_transport
 
2692
 
 
2693
    def make_branch_and_memory_tree(self, relpath, format=None):
 
2694
        """Create a branch on the default transport and a MemoryTree for it."""
 
2695
        b = self.make_branch(relpath, format=format)
 
2696
        return memorytree.MemoryTree.create_on_branch(b)
 
2697
 
 
2698
    def make_branch_builder(self, relpath, format=None):
 
2699
        branch = self.make_branch(relpath, format=format)
 
2700
        return branchbuilder.BranchBuilder(branch=branch)
 
2701
 
 
2702
    def overrideEnvironmentForTesting(self):
 
2703
        test_home_dir = self.test_home_dir
 
2704
        if not PY3 and isinstance(test_home_dir, text_type):
 
2705
            test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
 
2706
        self.overrideEnv('HOME', test_home_dir)
 
2707
        self.overrideEnv('BRZ_HOME', test_home_dir)
 
2708
        self.overrideEnv('GNUPGHOME', os.path.join(test_home_dir, '.gnupg'))
 
2709
 
 
2710
    def setup_smart_server_with_call_log(self):
 
2711
        """Sets up a smart server as the transport server with a call log."""
 
2712
        self.transport_server = test_server.SmartTCPServer_for_testing
 
2713
        self.hpss_connections = []
 
2714
        self.hpss_calls = []
 
2715
        import traceback
 
2716
        # Skip the current stack down to the caller of
 
2717
        # setup_smart_server_with_call_log
 
2718
        prefix_length = len(traceback.extract_stack()) - 2
 
2719
        def capture_hpss_call(params):
 
2720
            self.hpss_calls.append(
 
2721
                CapturedCall(params, prefix_length))
 
2722
        def capture_connect(transport):
 
2723
            self.hpss_connections.append(transport)
 
2724
        client._SmartClient.hooks.install_named_hook(
 
2725
            'call', capture_hpss_call, None)
 
2726
        _mod_transport.Transport.hooks.install_named_hook(
 
2727
            'post_connect', capture_connect, None)
 
2728
 
 
2729
    def reset_smart_call_log(self):
 
2730
        self.hpss_calls = []
 
2731
        self.hpss_connections = []
 
2732
 
 
2733
 
 
2734
class TestCaseInTempDir(TestCaseWithMemoryTransport):
575
2735
    """Derived class that runs a test within a temporary directory.
576
2736
 
577
2737
    This is useful for tests that need to create a branch, etc.
581
2741
    All test cases create their own directory within that.  If the
582
2742
    tests complete successfully, the directory is removed.
583
2743
 
584
 
    InTempDir is an old alias for FunctionalTestCase.
 
2744
    :ivar test_base_dir: The path of the top-level directory for this
 
2745
    test, which contains a home directory and a work directory.
 
2746
 
 
2747
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
2748
    which is used as $HOME for this test.
 
2749
 
 
2750
    :ivar test_dir: A directory under test_base_dir used as the current
 
2751
    directory when the test proper is run.
585
2752
    """
586
2753
 
587
 
    TEST_ROOT = None
588
 
    _TEST_NAME = 'test'
589
2754
    OVERRIDE_PYTHON = 'python'
590
2755
 
 
2756
    def setUp(self):
 
2757
        super(TestCaseInTempDir, self).setUp()
 
2758
        # Remove the protection set in isolated_environ, we have a proper
 
2759
        # access to disk resources now.
 
2760
        self.overrideEnv('BRZ_LOG', None)
 
2761
 
591
2762
    def check_file_contents(self, filename, expect):
592
2763
        self.log("check contents of file %s" % filename)
593
 
        contents = file(filename, 'r').read()
 
2764
        with open(filename) as f:
 
2765
            contents = f.read()
594
2766
        if contents != expect:
595
2767
            self.log("expected: %r" % expect)
596
2768
            self.log("actually: %r" % contents)
597
2769
            self.fail("contents of %s not as expected" % filename)
598
2770
 
599
 
    def _make_test_root(self):
600
 
        if TestCaseInTempDir.TEST_ROOT is not None:
601
 
            return
602
 
        i = 0
603
 
        while True:
604
 
            root = u'test%04d.tmp' % i
605
 
            try:
606
 
                os.mkdir(root)
607
 
            except OSError, e:
608
 
                if e.errno == errno.EEXIST:
609
 
                    i += 1
610
 
                    continue
611
 
                else:
612
 
                    raise
613
 
            # successfully created
614
 
            TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
615
 
            break
616
 
        # make a fake bzr directory there to prevent any tests propagating
617
 
        # up onto the source directory's real branch
618
 
        bzrdir.BzrDir.create_standalone_workingtree(TestCaseInTempDir.TEST_ROOT)
619
 
 
620
 
    def setUp(self):
621
 
        super(TestCaseInTempDir, self).setUp()
622
 
        self._make_test_root()
623
 
        _currentdir = os.getcwdu()
624
 
        # shorten the name, to avoid test failures due to path length
625
 
        short_id = self.id().replace('bzrlib.tests.', '') \
626
 
                   .replace('__main__.', '')[-100:]
627
 
        # it's possible the same test class is run several times for
628
 
        # parameterized tests, so make sure the names don't collide.  
629
 
        i = 0
630
 
        while True:
631
 
            if i > 0:
632
 
                candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
633
 
            else:
634
 
                candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
635
 
            if os.path.exists(candidate_dir):
636
 
                i = i + 1
637
 
                continue
638
 
            else:
639
 
                self.test_dir = candidate_dir
640
 
                os.mkdir(self.test_dir)
641
 
                os.chdir(self.test_dir)
 
2771
    def _getTestDirPrefix(self):
 
2772
        # create a directory within the top level test directory
 
2773
        if sys.platform in ('win32', 'cygwin'):
 
2774
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
 
2775
            # windows is likely to have path-length limits so use a short name
 
2776
            name_prefix = name_prefix[-30:]
 
2777
        else:
 
2778
            name_prefix = re.sub('[/]', '_', self.id())
 
2779
        return name_prefix
 
2780
 
 
2781
    def makeAndChdirToTestDir(self):
 
2782
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2783
 
 
2784
        For TestCaseInTempDir we create a temporary directory based on the test
 
2785
        name and then create two subdirs - test and home under it.
 
2786
        """
 
2787
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
 
2788
            self._getTestDirPrefix())
 
2789
        name = name_prefix
 
2790
        for i in range(100):
 
2791
            if os.path.exists(name):
 
2792
                name = name_prefix + '_' + str(i)
 
2793
            else:
 
2794
                # now create test and home directories within this dir
 
2795
                self.test_base_dir = name
 
2796
                self.addCleanup(self.deleteTestDir)
 
2797
                os.mkdir(self.test_base_dir)
642
2798
                break
643
 
        os.environ['HOME'] = self.test_dir
644
 
        os.environ['APPDATA'] = self.test_dir
645
 
        def _leaveDirectory():
646
 
            os.chdir(_currentdir)
647
 
        self.addCleanup(_leaveDirectory)
648
 
        
649
 
    def build_tree(self, shape, line_endings='native', transport=None):
 
2799
        self.permit_dir(self.test_base_dir)
 
2800
        # 'sprouting' and 'init' of a branch both walk up the tree to find
 
2801
        # stacking policy to honour; create a bzr dir with an unshared
 
2802
        # repository (but not a branch - our code would be trying to escape
 
2803
        # then!) to stop them, and permit it to be read.
 
2804
        # control = controldir.ControlDir.create(self.test_base_dir)
 
2805
        # control.create_repository()
 
2806
        self.test_home_dir = self.test_base_dir + '/home'
 
2807
        os.mkdir(self.test_home_dir)
 
2808
        self.test_dir = self.test_base_dir + '/work'
 
2809
        os.mkdir(self.test_dir)
 
2810
        os.chdir(self.test_dir)
 
2811
        # put name of test inside
 
2812
        with open(self.test_base_dir + '/name', 'w') as f:
 
2813
            f.write(self.id())
 
2814
 
 
2815
    def deleteTestDir(self):
 
2816
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2817
        _rmtree_temp_dir(self.test_base_dir, test_id=self.id())
 
2818
 
 
2819
    def build_tree(self, shape, line_endings='binary', transport=None):
650
2820
        """Build a test tree according to a pattern.
651
2821
 
652
2822
        shape is a sequence of file specifications.  If the final
653
2823
        character is '/', a directory is created.
654
2824
 
 
2825
        This assumes that all the elements in the tree being built are new.
 
2826
 
655
2827
        This doesn't add anything to a branch.
 
2828
 
 
2829
        :type shape:    list or tuple.
656
2830
        :param line_endings: Either 'binary' or 'native'
657
 
                             in binary mode, exact contents are written
658
 
                             in native mode, the line endings match the
659
 
                             default platform endings.
660
 
 
661
 
        :param transport: A transport to write to, for building trees on 
662
 
                          VFS's. If the transport is readonly or None,
663
 
                          "." is opened automatically.
 
2831
            in binary mode, exact contents are written in native mode, the
 
2832
            line endings match the default platform endings.
 
2833
        :param transport: A transport to write to, for building trees on VFS's.
 
2834
            If the transport is readonly or None, "." is opened automatically.
 
2835
        :return: None
664
2836
        """
665
 
        # XXX: It's OK to just create them using forward slashes on windows?
 
2837
        if type(shape) not in (list, tuple):
 
2838
            raise AssertionError("Parameter 'shape' should be "
 
2839
                "a list or a tuple. Got %r instead" % (shape,))
 
2840
        # It's OK to just create them using forward slashes on windows.
666
2841
        if transport is None or transport.is_readonly():
667
 
            transport = get_transport(".")
 
2842
            transport = _mod_transport.get_transport_from_path(".")
668
2843
        for name in shape:
669
 
            self.assert_(isinstance(name, basestring))
 
2844
            self.assertIsInstance(name, (str, text_type))
670
2845
            if name[-1] == '/':
671
 
                transport.mkdir(urlescape(name[:-1]))
 
2846
                transport.mkdir(urlutils.escape(name[:-1]))
672
2847
            else:
673
2848
                if line_endings == 'binary':
674
 
                    end = '\n'
 
2849
                    end = b'\n'
675
2850
                elif line_endings == 'native':
676
 
                    end = os.linesep
 
2851
                    end = os.linesep.encode('ascii')
677
2852
                else:
678
 
                    raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
679
 
                content = "contents of %s%s" % (name, end)
680
 
                transport.put(urlescape(name), StringIO(content))
681
 
 
682
 
    def build_tree_contents(self, shape):
683
 
        build_tree_contents(shape)
684
 
 
685
 
    def failUnlessExists(self, path):
686
 
        """Fail unless path, which may be abs or relative, exists."""
687
 
        self.failUnless(osutils.lexists(path))
688
 
 
689
 
    def failIfExists(self, path):
690
 
        """Fail if path, which may be abs or relative, exists."""
691
 
        self.failIf(osutils.lexists(path))
692
 
        
693
 
    def assertFileEqual(self, content, path):
694
 
        """Fail if path does not contain 'content'."""
695
 
        self.failUnless(osutils.lexists(path))
696
 
        self.assertEqualDiff(content, open(path, 'r').read())
 
2853
                    raise errors.BzrError(
 
2854
                        'Invalid line ending request %r' % line_endings)
 
2855
                content = b"contents of %s%s" % (name.encode('utf-8'), end)
 
2856
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2857
 
 
2858
    build_tree_contents = staticmethod(treeshape.build_tree_contents)
 
2859
 
 
2860
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2861
        """Assert whether path or paths are in the WorkingTree"""
 
2862
        if tree is None:
 
2863
            tree = workingtree.WorkingTree.open(root_path)
 
2864
        if not isinstance(path, (str, text_type)):
 
2865
            for p in path:
 
2866
                self.assertInWorkingTree(p, tree=tree)
 
2867
        else:
 
2868
            self.assertIsNot(tree.path2id(path), None,
 
2869
                path+' not in working tree.')
 
2870
 
 
2871
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2872
        """Assert whether path or paths are not in the WorkingTree"""
 
2873
        if tree is None:
 
2874
            tree = workingtree.WorkingTree.open(root_path)
 
2875
        if not isinstance(path, (str, text_type)):
 
2876
            for p in path:
 
2877
                self.assertNotInWorkingTree(p, tree=tree)
 
2878
        else:
 
2879
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
697
2880
 
698
2881
 
699
2882
class TestCaseWithTransport(TestCaseInTempDir):
706
2889
    ReadonlyTransportDecorator is used instead which allows the use of non disk
707
2890
    based read write transports.
708
2891
 
709
 
    If an explicit class is provided for readonly access, that server and the 
 
2892
    If an explicit class is provided for readonly access, that server and the
710
2893
    readwrite one must both define get_url() as resolving to os.getcwd().
711
2894
    """
712
2895
 
713
 
    def __init__(self, methodName='testMethod'):
714
 
        super(TestCaseWithTransport, self).__init__(methodName)
715
 
        self.__readonly_server = None
716
 
        self.__server = None
717
 
        self.transport_server = default_transport
718
 
        self.transport_readonly_server = None
719
 
 
720
 
    def get_readonly_url(self, relpath=None):
721
 
        """Get a URL for the readonly transport.
722
 
 
723
 
        This will either be backed by '.' or a decorator to the transport 
724
 
        used by self.get_url()
725
 
        relpath provides for clients to get a path relative to the base url.
726
 
        These should only be downwards relative, not upwards.
727
 
        """
728
 
        base = self.get_readonly_server().get_url()
729
 
        if relpath is not None:
730
 
            if not base.endswith('/'):
731
 
                base = base + '/'
732
 
            base = base + relpath
733
 
        return base
734
 
 
735
 
    def get_readonly_server(self):
736
 
        """Get the server instance for the readonly transport
737
 
 
738
 
        This is useful for some tests with specific servers to do diagnostics.
739
 
        """
740
 
        if self.__readonly_server is None:
741
 
            if self.transport_readonly_server is None:
742
 
                # readonly decorator requested
743
 
                # bring up the server
744
 
                self.get_url()
745
 
                self.__readonly_server = ReadonlyServer()
746
 
                self.__readonly_server.setUp(self.__server)
747
 
            else:
748
 
                self.__readonly_server = self.transport_readonly_server()
749
 
                self.__readonly_server.setUp()
750
 
            self.addCleanup(self.__readonly_server.tearDown)
751
 
        return self.__readonly_server
752
 
 
753
 
    def get_server(self):
754
 
        """Get the read/write server instance.
 
2896
    def setUp(self):
 
2897
        super(TestCaseWithTransport, self).setUp()
 
2898
        self.__vfs_server = None
 
2899
 
 
2900
    def get_vfs_only_server(self):
 
2901
        """See TestCaseWithMemoryTransport.
755
2902
 
756
2903
        This is useful for some tests with specific servers that need
757
2904
        diagnostics.
758
2905
        """
759
 
        if self.__server is None:
760
 
            self.__server = self.transport_server()
761
 
            self.__server.setUp()
762
 
            self.addCleanup(self.__server.tearDown)
763
 
        return self.__server
764
 
 
765
 
    def get_url(self, relpath=None):
766
 
        """Get a URL for the readwrite transport.
767
 
 
768
 
        This will either be backed by '.' or to an equivalent non-file based
769
 
        facility.
770
 
        relpath provides for clients to get a path relative to the base url.
771
 
        These should only be downwards relative, not upwards.
772
 
        """
773
 
        base = self.get_server().get_url()
774
 
        if relpath is not None and relpath != '.':
775
 
            if not base.endswith('/'):
776
 
                base = base + '/'
777
 
            base = base + relpath
778
 
        return base
779
 
 
780
 
    def get_transport(self):
781
 
        """Return a writeable transport for the test scratch space"""
782
 
        t = get_transport(self.get_url())
783
 
        self.assertFalse(t.is_readonly())
784
 
        return t
785
 
 
786
 
    def get_readonly_transport(self):
787
 
        """Return a readonly transport for the test scratch space
788
 
        
789
 
        This can be used to test that operations which should only need
790
 
        readonly access in fact do not try to write.
791
 
        """
792
 
        t = get_transport(self.get_readonly_url())
793
 
        self.assertTrue(t.is_readonly())
794
 
        return t
795
 
 
796
 
    def make_branch(self, relpath):
797
 
        """Create a branch on the transport at relpath."""
798
 
        repo = self.make_repository(relpath)
799
 
        return repo.bzrdir.create_branch()
800
 
 
801
 
    def make_bzrdir(self, relpath):
802
 
        try:
803
 
            url = self.get_url(relpath)
804
 
            segments = relpath.split('/')
805
 
            if segments and segments[-1] not in ('', '.'):
806
 
                parent = self.get_url('/'.join(segments[:-1]))
807
 
                t = get_transport(parent)
808
 
                try:
809
 
                    t.mkdir(segments[-1])
810
 
                except errors.FileExists:
811
 
                    pass
812
 
            return bzrlib.bzrdir.BzrDir.create(url)
813
 
        except errors.UninitializableFormat:
814
 
            raise TestSkipped("Format %s is not initializable.")
815
 
 
816
 
    def make_repository(self, relpath, shared=False):
817
 
        """Create a repository on our default transport at relpath."""
818
 
        made_control = self.make_bzrdir(relpath)
819
 
        return made_control.create_repository(shared=shared)
820
 
 
821
 
    def make_branch_and_tree(self, relpath):
 
2906
        if self.__vfs_server is None:
 
2907
            self.__vfs_server = self.vfs_transport_factory()
 
2908
            self.start_server(self.__vfs_server)
 
2909
        return self.__vfs_server
 
2910
 
 
2911
    def make_branch_and_tree(self, relpath, format=None):
822
2912
        """Create a branch on the transport and a tree locally.
823
2913
 
824
 
        Returns the tree.
 
2914
        If the transport is not a LocalTransport, the Tree can't be created on
 
2915
        the transport.  In that case if the vfs_transport_factory is
 
2916
        LocalURLServer the working tree is created in the local
 
2917
        directory backing the transport, and the returned tree's branch and
 
2918
        repository will also be accessed locally. Otherwise a lightweight
 
2919
        checkout is created and returned.
 
2920
 
 
2921
        We do this because we can't physically create a tree in the local
 
2922
        path, with a branch reference to the transport_factory url, and
 
2923
        a branch + repository in the vfs_transport, unless the vfs_transport
 
2924
        namespace is distinct from the local disk - the two branch objects
 
2925
        would collide. While we could construct a tree with its branch object
 
2926
        pointing at the transport_factory transport in memory, reopening it
 
2927
        would behaving unexpectedly, and has in the past caused testing bugs
 
2928
        when we tried to do it that way.
 
2929
 
 
2930
        :param format: The BzrDirFormat.
 
2931
        :returns: the WorkingTree.
825
2932
        """
826
2933
        # TODO: always use the local disk path for the working tree,
827
2934
        # this obviously requires a format that supports branch references
828
2935
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
829
2936
        # RBC 20060208
830
 
        b = self.make_branch(relpath)
 
2937
        format = self.resolve_format(format=format)
 
2938
        if not format.supports_workingtrees:
 
2939
            b = self.make_branch(relpath+'.branch', format=format)
 
2940
            return b.create_checkout(relpath, lightweight=True)
 
2941
        b = self.make_branch(relpath, format=format)
831
2942
        try:
832
 
            return b.bzrdir.create_workingtree()
 
2943
            return b.controldir.create_workingtree()
833
2944
        except errors.NotLocalUrl:
834
 
            # new formats - catch No tree error and create
835
 
            # a branch reference and a checkout.
836
 
            # old formats at that point - raise TestSkipped.
837
 
            # TODO: rbc 20060208
838
 
            return WorkingTreeFormat2().initialize(bzrdir.BzrDir.open(relpath))
 
2945
            # We can only make working trees locally at the moment.  If the
 
2946
            # transport can't support them, then we keep the non-disk-backed
 
2947
            # branch and create a local checkout.
 
2948
            if self.vfs_transport_factory is test_server.LocalURLServer:
 
2949
                # the branch is colocated on disk, we cannot create a checkout.
 
2950
                # hopefully callers will expect this.
 
2951
                local_controldir = controldir.ControlDir.open(
 
2952
                    self.get_vfs_only_url(relpath))
 
2953
                wt = local_controldir.create_workingtree()
 
2954
                if wt.branch._format != b._format:
 
2955
                    wt._branch = b
 
2956
                    # Make sure that assigning to wt._branch fixes wt.branch,
 
2957
                    # in case the implementation details of workingtree objects
 
2958
                    # change.
 
2959
                    self.assertIs(b, wt.branch)
 
2960
                return wt
 
2961
            else:
 
2962
                return b.create_checkout(relpath, lightweight=True)
839
2963
 
840
2964
    def assertIsDirectory(self, relpath, transport):
841
2965
        """Assert that relpath within transport is a directory.
852
2976
            self.fail("path %s is not a directory; has mode %#o"
853
2977
                      % (relpath, mode))
854
2978
 
 
2979
    def assertTreesEqual(self, left, right):
 
2980
        """Check that left and right have the same content and properties."""
 
2981
        # we use a tree delta to check for equality of the content, and we
 
2982
        # manually check for equality of other things such as the parents list.
 
2983
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2984
        differences = left.changes_from(right)
 
2985
        self.assertFalse(differences.has_changed(),
 
2986
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2987
 
 
2988
    def disable_missing_extensions_warning(self):
 
2989
        """Some tests expect a precise stderr content.
 
2990
 
 
2991
        There is no point in forcing them to duplicate the extension related
 
2992
        warning.
 
2993
        """
 
2994
        config.GlobalConfig().set_user_option(
 
2995
            'suppress_warnings', 'missing_extensions')
 
2996
 
855
2997
 
856
2998
class ChrootedTestCase(TestCaseWithTransport):
857
2999
    """A support class that provides readonly urls outside the local namespace.
861
3003
    for readonly urls.
862
3004
 
863
3005
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
864
 
                       be used without needed to redo it when a different 
 
3006
                       be used without needed to redo it when a different
865
3007
                       subclass is in use ?
866
3008
    """
867
3009
 
868
3010
    def setUp(self):
 
3011
        from breezy.tests import http_server
869
3012
        super(ChrootedTestCase, self).setUp()
870
 
        if not self.transport_server == bzrlib.transport.memory.MemoryServer:
871
 
            self.transport_readonly_server = bzrlib.transport.http.HttpServer
 
3013
        if not self.vfs_transport_factory == memory.MemoryServer:
 
3014
            self.transport_readonly_server = http_server.HttpServer
 
3015
 
 
3016
 
 
3017
def condition_id_re(pattern):
 
3018
    """Create a condition filter which performs a re check on a test's id.
 
3019
 
 
3020
    :param pattern: A regular expression string.
 
3021
    :return: A callable that returns True if the re matches.
 
3022
    """
 
3023
    filter_re = re.compile(pattern, 0)
 
3024
    def condition(test):
 
3025
        test_id = test.id()
 
3026
        return filter_re.search(test_id)
 
3027
    return condition
 
3028
 
 
3029
 
 
3030
def condition_isinstance(klass_or_klass_list):
 
3031
    """Create a condition filter which returns isinstance(param, klass).
 
3032
 
 
3033
    :return: A callable which when called with one parameter obj return the
 
3034
        result of isinstance(obj, klass_or_klass_list).
 
3035
    """
 
3036
    def condition(obj):
 
3037
        return isinstance(obj, klass_or_klass_list)
 
3038
    return condition
 
3039
 
 
3040
 
 
3041
def condition_id_in_list(id_list):
 
3042
    """Create a condition filter which verify that test's id in a list.
 
3043
 
 
3044
    :param id_list: A TestIdList object.
 
3045
    :return: A callable that returns True if the test's id appears in the list.
 
3046
    """
 
3047
    def condition(test):
 
3048
        return id_list.includes(test.id())
 
3049
    return condition
 
3050
 
 
3051
 
 
3052
def condition_id_startswith(starts):
 
3053
    """Create a condition filter verifying that test's id starts with a string.
 
3054
 
 
3055
    :param starts: A list of string.
 
3056
    :return: A callable that returns True if the test's id starts with one of
 
3057
        the given strings.
 
3058
    """
 
3059
    def condition(test):
 
3060
        for start in starts:
 
3061
            if test.id().startswith(start):
 
3062
                return True
 
3063
        return False
 
3064
    return condition
 
3065
 
 
3066
 
 
3067
def exclude_tests_by_condition(suite, condition):
 
3068
    """Create a test suite which excludes some tests from suite.
 
3069
 
 
3070
    :param suite: The suite to get tests from.
 
3071
    :param condition: A callable whose result evaluates True when called with a
 
3072
        test case which should be excluded from the result.
 
3073
    :return: A suite which contains the tests found in suite that fail
 
3074
        condition.
 
3075
    """
 
3076
    result = []
 
3077
    for test in iter_suite_tests(suite):
 
3078
        if not condition(test):
 
3079
            result.append(test)
 
3080
    return TestUtil.TestSuite(result)
 
3081
 
 
3082
 
 
3083
def filter_suite_by_condition(suite, condition):
 
3084
    """Create a test suite by filtering another one.
 
3085
 
 
3086
    :param suite: The source suite.
 
3087
    :param condition: A callable whose result evaluates True when called with a
 
3088
        test case which should be included in the result.
 
3089
    :return: A suite which contains the tests found in suite that pass
 
3090
        condition.
 
3091
    """
 
3092
    result = []
 
3093
    for test in iter_suite_tests(suite):
 
3094
        if condition(test):
 
3095
            result.append(test)
 
3096
    return TestUtil.TestSuite(result)
872
3097
 
873
3098
 
874
3099
def filter_suite_by_re(suite, pattern):
875
 
    result = TestSuite()
876
 
    filter_re = re.compile(pattern)
 
3100
    """Create a test suite by filtering another one.
 
3101
 
 
3102
    :param suite:           the source suite
 
3103
    :param pattern:         pattern that names must match
 
3104
    :returns: the newly created suite
 
3105
    """
 
3106
    condition = condition_id_re(pattern)
 
3107
    result_suite = filter_suite_by_condition(suite, condition)
 
3108
    return result_suite
 
3109
 
 
3110
 
 
3111
def filter_suite_by_id_list(suite, test_id_list):
 
3112
    """Create a test suite by filtering another one.
 
3113
 
 
3114
    :param suite: The source suite.
 
3115
    :param test_id_list: A list of the test ids to keep as strings.
 
3116
    :returns: the newly created suite
 
3117
    """
 
3118
    condition = condition_id_in_list(test_id_list)
 
3119
    result_suite = filter_suite_by_condition(suite, condition)
 
3120
    return result_suite
 
3121
 
 
3122
 
 
3123
def filter_suite_by_id_startswith(suite, start):
 
3124
    """Create a test suite by filtering another one.
 
3125
 
 
3126
    :param suite: The source suite.
 
3127
    :param start: A list of string the test id must start with one of.
 
3128
    :returns: the newly created suite
 
3129
    """
 
3130
    condition = condition_id_startswith(start)
 
3131
    result_suite = filter_suite_by_condition(suite, condition)
 
3132
    return result_suite
 
3133
 
 
3134
 
 
3135
def exclude_tests_by_re(suite, pattern):
 
3136
    """Create a test suite which excludes some tests from suite.
 
3137
 
 
3138
    :param suite: The suite to get tests from.
 
3139
    :param pattern: A regular expression string. Test ids that match this
 
3140
        pattern will be excluded from the result.
 
3141
    :return: A TestSuite that contains all the tests from suite without the
 
3142
        tests that matched pattern. The order of tests is the same as it was in
 
3143
        suite.
 
3144
    """
 
3145
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
3146
 
 
3147
 
 
3148
def preserve_input(something):
 
3149
    """A helper for performing test suite transformation chains.
 
3150
 
 
3151
    :param something: Anything you want to preserve.
 
3152
    :return: Something.
 
3153
    """
 
3154
    return something
 
3155
 
 
3156
 
 
3157
def randomize_suite(suite):
 
3158
    """Return a new TestSuite with suite's tests in random order.
 
3159
 
 
3160
    The tests in the input suite are flattened into a single suite in order to
 
3161
    accomplish this. Any nested TestSuites are removed to provide global
 
3162
    randomness.
 
3163
    """
 
3164
    tests = list(iter_suite_tests(suite))
 
3165
    random.shuffle(tests)
 
3166
    return TestUtil.TestSuite(tests)
 
3167
 
 
3168
 
 
3169
def split_suite_by_condition(suite, condition):
 
3170
    """Split a test suite into two by a condition.
 
3171
 
 
3172
    :param suite: The suite to split.
 
3173
    :param condition: The condition to match on. Tests that match this
 
3174
        condition are returned in the first test suite, ones that do not match
 
3175
        are in the second suite.
 
3176
    :return: A tuple of two test suites, where the first contains tests from
 
3177
        suite matching the condition, and the second contains the remainder
 
3178
        from suite. The order within each output suite is the same as it was in
 
3179
        suite.
 
3180
    """
 
3181
    matched = []
 
3182
    did_not_match = []
877
3183
    for test in iter_suite_tests(suite):
878
 
        if filter_re.search(test.id()):
879
 
            result.addTest(test)
880
 
    return result
 
3184
        if condition(test):
 
3185
            matched.append(test)
 
3186
        else:
 
3187
            did_not_match.append(test)
 
3188
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
3189
 
 
3190
 
 
3191
def split_suite_by_re(suite, pattern):
 
3192
    """Split a test suite into two by a regular expression.
 
3193
 
 
3194
    :param suite: The suite to split.
 
3195
    :param pattern: A regular expression string. Test ids that match this
 
3196
        pattern will be in the first test suite returned, and the others in the
 
3197
        second test suite returned.
 
3198
    :return: A tuple of two test suites, where the first contains tests from
 
3199
        suite matching pattern, and the second contains the remainder from
 
3200
        suite. The order within each output suite is the same as it was in
 
3201
        suite.
 
3202
    """
 
3203
    return split_suite_by_condition(suite, condition_id_re(pattern))
881
3204
 
882
3205
 
883
3206
def run_suite(suite, name='test', verbose=False, pattern=".*",
884
 
              stop_on_failure=False, keep_output=False,
885
 
              transport=None):
886
 
    TestCaseInTempDir._TEST_NAME = name
 
3207
              stop_on_failure=False,
 
3208
              transport=None, lsprof_timed=None, bench_history=None,
 
3209
              matching_tests_first=None,
 
3210
              list_only=False,
 
3211
              random_seed=None,
 
3212
              exclude_pattern=None,
 
3213
              strict=False,
 
3214
              runner_class=None,
 
3215
              suite_decorators=None,
 
3216
              stream=None,
 
3217
              result_decorators=None,
 
3218
              ):
 
3219
    """Run a test suite for brz selftest.
 
3220
 
 
3221
    :param runner_class: The class of runner to use. Must support the
 
3222
        constructor arguments passed by run_suite which are more than standard
 
3223
        python uses.
 
3224
    :return: A boolean indicating success.
 
3225
    """
 
3226
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
887
3227
    if verbose:
888
3228
        verbosity = 2
889
3229
    else:
890
3230
        verbosity = 1
891
 
    runner = TextTestRunner(stream=sys.stdout,
 
3231
    if runner_class is None:
 
3232
        runner_class = TextTestRunner
 
3233
    if stream is None:
 
3234
        stream = sys.stdout
 
3235
    runner = runner_class(stream=stream,
892
3236
                            descriptions=0,
893
 
                            verbosity=verbosity)
 
3237
                            verbosity=verbosity,
 
3238
                            bench_history=bench_history,
 
3239
                            strict=strict,
 
3240
                            result_decorators=result_decorators,
 
3241
                            )
894
3242
    runner.stop_on_failure=stop_on_failure
895
 
    if pattern != '.*':
896
 
        suite = filter_suite_by_re(suite, pattern)
 
3243
    if isinstance(suite, unittest.TestSuite):
 
3244
        # Empty out _tests list of passed suite and populate new TestSuite
 
3245
        suite._tests[:], suite = [], TestSuite(suite)
 
3246
    # built in decorator factories:
 
3247
    decorators = [
 
3248
        random_order(random_seed, runner),
 
3249
        exclude_tests(exclude_pattern),
 
3250
        ]
 
3251
    if matching_tests_first:
 
3252
        decorators.append(tests_first(pattern))
 
3253
    else:
 
3254
        decorators.append(filter_tests(pattern))
 
3255
    if suite_decorators:
 
3256
        decorators.extend(suite_decorators)
 
3257
    # tell the result object how many tests will be running: (except if
 
3258
    # --parallel=fork is being used. Robert said he will provide a better
 
3259
    # progress design later -- vila 20090817)
 
3260
    if fork_decorator not in decorators:
 
3261
        decorators.append(CountingDecorator)
 
3262
    for decorator in decorators:
 
3263
        suite = decorator(suite)
 
3264
    if list_only:
 
3265
        # Done after test suite decoration to allow randomisation etc
 
3266
        # to take effect, though that is of marginal benefit.
 
3267
        if verbosity >= 2:
 
3268
            stream.write("Listing tests only ...\n")
 
3269
        if getattr(runner, 'list', None) is not None:
 
3270
            runner.list(suite)
 
3271
        else:
 
3272
            for t in iter_suite_tests(suite):
 
3273
                stream.write("%s\n" % (t.id()))
 
3274
        return True
897
3275
    result = runner.run(suite)
898
 
    # This is still a little bogus, 
899
 
    # but only a little. Folk not using our testrunner will
900
 
    # have to delete their temp directories themselves.
901
 
    test_root = TestCaseInTempDir.TEST_ROOT
902
 
    if result.wasSuccessful() or not keep_output:
903
 
        if test_root is not None:
904
 
            print 'Deleting test root %s...' % test_root
905
 
            try:
906
 
                shutil.rmtree(test_root)
907
 
            finally:
908
 
                print
 
3276
    if strict:
 
3277
        return result.wasStrictlySuccessful()
909
3278
    else:
910
 
        print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
911
 
    return result.wasSuccessful()
 
3279
        return result.wasSuccessful()
 
3280
 
 
3281
 
 
3282
# A registry where get() returns a suite decorator.
 
3283
parallel_registry = registry.Registry()
 
3284
 
 
3285
 
 
3286
def fork_decorator(suite):
 
3287
    if getattr(os, "fork", None) is None:
 
3288
        raise errors.BzrCommandError("platform does not support fork,"
 
3289
            " try --parallel=subprocess instead.")
 
3290
    concurrency = osutils.local_concurrency()
 
3291
    if concurrency == 1:
 
3292
        return suite
 
3293
    from testtools import ConcurrentTestSuite
 
3294
    return ConcurrentTestSuite(suite, fork_for_tests)
 
3295
parallel_registry.register('fork', fork_decorator)
 
3296
 
 
3297
 
 
3298
def subprocess_decorator(suite):
 
3299
    concurrency = osutils.local_concurrency()
 
3300
    if concurrency == 1:
 
3301
        return suite
 
3302
    from testtools import ConcurrentTestSuite
 
3303
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
 
3304
parallel_registry.register('subprocess', subprocess_decorator)
 
3305
 
 
3306
 
 
3307
def exclude_tests(exclude_pattern):
 
3308
    """Return a test suite decorator that excludes tests."""
 
3309
    if exclude_pattern is None:
 
3310
        return identity_decorator
 
3311
    def decorator(suite):
 
3312
        return ExcludeDecorator(suite, exclude_pattern)
 
3313
    return decorator
 
3314
 
 
3315
 
 
3316
def filter_tests(pattern):
 
3317
    if pattern == '.*':
 
3318
        return identity_decorator
 
3319
    def decorator(suite):
 
3320
        return FilterTestsDecorator(suite, pattern)
 
3321
    return decorator
 
3322
 
 
3323
 
 
3324
def random_order(random_seed, runner):
 
3325
    """Return a test suite decorator factory for randomising tests order.
 
3326
 
 
3327
    :param random_seed: now, a string which casts to an integer, or an integer.
 
3328
    :param runner: A test runner with a stream attribute to report on.
 
3329
    """
 
3330
    if random_seed is None:
 
3331
        return identity_decorator
 
3332
    def decorator(suite):
 
3333
        return RandomDecorator(suite, random_seed, runner.stream)
 
3334
    return decorator
 
3335
 
 
3336
 
 
3337
def tests_first(pattern):
 
3338
    if pattern == '.*':
 
3339
        return identity_decorator
 
3340
    def decorator(suite):
 
3341
        return TestFirstDecorator(suite, pattern)
 
3342
    return decorator
 
3343
 
 
3344
 
 
3345
def identity_decorator(suite):
 
3346
    """Return suite."""
 
3347
    return suite
 
3348
 
 
3349
 
 
3350
class TestDecorator(TestUtil.TestSuite):
 
3351
    """A decorator for TestCase/TestSuite objects.
 
3352
 
 
3353
    Contains rather than flattening suite passed on construction
 
3354
    """
 
3355
 
 
3356
    def __init__(self, suite=None):
 
3357
        super(TestDecorator, self).__init__()
 
3358
        if suite is not None:
 
3359
            self.addTest(suite)
 
3360
 
 
3361
    # Don't need subclass run method with suite emptying
 
3362
    run = unittest.TestSuite.run
 
3363
 
 
3364
 
 
3365
class CountingDecorator(TestDecorator):
 
3366
    """A decorator which calls result.progress(self.countTestCases)."""
 
3367
 
 
3368
    def run(self, result):
 
3369
        progress_method = getattr(result, 'progress', None)
 
3370
        if callable(progress_method):
 
3371
            progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
 
3372
        return super(CountingDecorator, self).run(result)
 
3373
 
 
3374
 
 
3375
class ExcludeDecorator(TestDecorator):
 
3376
    """A decorator which excludes test matching an exclude pattern."""
 
3377
 
 
3378
    def __init__(self, suite, exclude_pattern):
 
3379
        super(ExcludeDecorator, self).__init__(
 
3380
            exclude_tests_by_re(suite, exclude_pattern))
 
3381
 
 
3382
 
 
3383
class FilterTestsDecorator(TestDecorator):
 
3384
    """A decorator which filters tests to those matching a pattern."""
 
3385
 
 
3386
    def __init__(self, suite, pattern):
 
3387
        super(FilterTestsDecorator, self).__init__(
 
3388
            filter_suite_by_re(suite, pattern))
 
3389
 
 
3390
 
 
3391
class RandomDecorator(TestDecorator):
 
3392
    """A decorator which randomises the order of its tests."""
 
3393
 
 
3394
    def __init__(self, suite, random_seed, stream):
 
3395
        random_seed = self.actual_seed(random_seed)
 
3396
        stream.write("Randomizing test order using seed %s\n\n" %
 
3397
            (random_seed,))
 
3398
        # Initialise the random number generator.
 
3399
        random.seed(random_seed)
 
3400
        super(RandomDecorator, self).__init__(randomize_suite(suite))
 
3401
 
 
3402
    @staticmethod
 
3403
    def actual_seed(seed):
 
3404
        if seed == "now":
 
3405
            # We convert the seed to an integer to make it reuseable across
 
3406
            # invocations (because the user can reenter it).
 
3407
            return int(time.time())
 
3408
        else:
 
3409
            # Convert the seed to an integer if we can
 
3410
            try:
 
3411
                return int(seed)
 
3412
            except (TypeError, ValueError):
 
3413
                pass
 
3414
        return seed
 
3415
 
 
3416
 
 
3417
class TestFirstDecorator(TestDecorator):
 
3418
    """A decorator which moves named tests to the front."""
 
3419
 
 
3420
    def __init__(self, suite, pattern):
 
3421
        super(TestFirstDecorator, self).__init__()
 
3422
        self.addTests(split_suite_by_re(suite, pattern))
 
3423
 
 
3424
 
 
3425
def partition_tests(suite, count):
 
3426
    """Partition suite into count lists of tests."""
 
3427
    # This just assigns tests in a round-robin fashion.  On one hand this
 
3428
    # splits up blocks of related tests that might run faster if they shared
 
3429
    # resources, but on the other it avoids assigning blocks of slow tests to
 
3430
    # just one partition.  So the slowest partition shouldn't be much slower
 
3431
    # than the fastest.
 
3432
    partitions = [list() for i in range(count)]
 
3433
    tests = iter_suite_tests(suite)
 
3434
    for partition, test in zip(itertools.cycle(partitions), tests):
 
3435
        partition.append(test)
 
3436
    return partitions
 
3437
 
 
3438
 
 
3439
def workaround_zealous_crypto_random():
 
3440
    """Crypto.Random want to help us being secure, but we don't care here.
 
3441
 
 
3442
    This workaround some test failure related to the sftp server. Once paramiko
 
3443
    stop using the controversial API in Crypto.Random, we may get rid of it.
 
3444
    """
 
3445
    try:
 
3446
        from Crypto.Random import atfork
 
3447
        atfork()
 
3448
    except ImportError:
 
3449
        pass
 
3450
 
 
3451
 
 
3452
def fork_for_tests(suite):
 
3453
    """Take suite and start up one runner per CPU by forking()
 
3454
 
 
3455
    :return: An iterable of TestCase-like objects which can each have
 
3456
        run(result) called on them to feed tests to result.
 
3457
    """
 
3458
    concurrency = osutils.local_concurrency()
 
3459
    result = []
 
3460
    from subunit import ProtocolTestCase
 
3461
    from subunit.test_results import AutoTimingTestResultDecorator
 
3462
    class TestInOtherProcess(ProtocolTestCase):
 
3463
        # Should be in subunit, I think. RBC.
 
3464
        def __init__(self, stream, pid):
 
3465
            ProtocolTestCase.__init__(self, stream)
 
3466
            self.pid = pid
 
3467
 
 
3468
        def run(self, result):
 
3469
            try:
 
3470
                ProtocolTestCase.run(self, result)
 
3471
            finally:
 
3472
                pid, status = os.waitpid(self.pid, 0)
 
3473
            # GZ 2011-10-18: If status is nonzero, should report to the result
 
3474
            #                that something went wrong.
 
3475
 
 
3476
    test_blocks = partition_tests(suite, concurrency)
 
3477
    # Clear the tests from the original suite so it doesn't keep them alive
 
3478
    suite._tests[:] = []
 
3479
    for process_tests in test_blocks:
 
3480
        process_suite = TestUtil.TestSuite(process_tests)
 
3481
        # Also clear each split list so new suite has only reference
 
3482
        process_tests[:] = []
 
3483
        c2pread, c2pwrite = os.pipe()
 
3484
        pid = os.fork()
 
3485
        if pid == 0:
 
3486
            try:
 
3487
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3488
                workaround_zealous_crypto_random()
 
3489
                os.close(c2pread)
 
3490
                # Leave stderr and stdout open so we can see test noise
 
3491
                # Close stdin so that the child goes away if it decides to
 
3492
                # read from stdin (otherwise its a roulette to see what
 
3493
                # child actually gets keystrokes for pdb etc).
 
3494
                sys.stdin.close()
 
3495
                subunit_result = AutoTimingTestResultDecorator(
 
3496
                    SubUnitBzrProtocolClientv1(stream))
 
3497
                process_suite.run(subunit_result)
 
3498
            except:
 
3499
                # Try and report traceback on stream, but exit with error even
 
3500
                # if stream couldn't be created or something else goes wrong.
 
3501
                # The traceback is formatted to a string and written in one go
 
3502
                # to avoid interleaving lines from multiple failing children.
 
3503
                try:
 
3504
                    stream.write(traceback.format_exc())
 
3505
                finally:
 
3506
                    os._exit(1)
 
3507
            os._exit(0)
 
3508
        else:
 
3509
            os.close(c2pwrite)
 
3510
            stream = os.fdopen(c2pread, 'rb', 1)
 
3511
            test = TestInOtherProcess(stream, pid)
 
3512
            result.append(test)
 
3513
    return result
 
3514
 
 
3515
 
 
3516
def reinvoke_for_tests(suite):
 
3517
    """Take suite and start up one runner per CPU using subprocess().
 
3518
 
 
3519
    :return: An iterable of TestCase-like objects which can each have
 
3520
        run(result) called on them to feed tests to result.
 
3521
    """
 
3522
    concurrency = osutils.local_concurrency()
 
3523
    result = []
 
3524
    from subunit import ProtocolTestCase
 
3525
    class TestInSubprocess(ProtocolTestCase):
 
3526
        def __init__(self, process, name):
 
3527
            ProtocolTestCase.__init__(self, process.stdout)
 
3528
            self.process = process
 
3529
            self.process.stdin.close()
 
3530
            self.name = name
 
3531
 
 
3532
        def run(self, result):
 
3533
            try:
 
3534
                ProtocolTestCase.run(self, result)
 
3535
            finally:
 
3536
                self.process.wait()
 
3537
                os.unlink(self.name)
 
3538
            # print "pid %d finished" % finished_process
 
3539
    test_blocks = partition_tests(suite, concurrency)
 
3540
    for process_tests in test_blocks:
 
3541
        # ugly; currently reimplement rather than reuses TestCase methods.
 
3542
        bzr_path = os.path.dirname(os.path.dirname(breezy.__file__))+'/bzr'
 
3543
        if not os.path.isfile(bzr_path):
 
3544
            # We are probably installed. Assume sys.argv is the right file
 
3545
            bzr_path = sys.argv[0]
 
3546
        bzr_path = [bzr_path]
 
3547
        if sys.platform == "win32":
 
3548
            # if we're on windows, we can't execute the bzr script directly
 
3549
            bzr_path = [sys.executable] + bzr_path
 
3550
        fd, test_list_file_name = tempfile.mkstemp()
 
3551
        test_list_file = os.fdopen(fd, 'wb', 1)
 
3552
        for test in process_tests:
 
3553
            test_list_file.write(test.id() + '\n')
 
3554
        test_list_file.close()
 
3555
        try:
 
3556
            argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
 
3557
                '--subunit']
 
3558
            if '--no-plugins' in sys.argv:
 
3559
                argv.append('--no-plugins')
 
3560
            # stderr=subprocess.STDOUT would be ideal, but until we prevent
 
3561
            # noise on stderr it can interrupt the subunit protocol.
 
3562
            process = subprocess.Popen(argv, stdin=subprocess.PIPE,
 
3563
                                      stdout=subprocess.PIPE,
 
3564
                                      stderr=subprocess.PIPE,
 
3565
                                      bufsize=1)
 
3566
            test = TestInSubprocess(process, test_list_file_name)
 
3567
            result.append(test)
 
3568
        except:
 
3569
            os.unlink(test_list_file_name)
 
3570
            raise
 
3571
    return result
 
3572
 
 
3573
 
 
3574
class ProfileResult(testtools.ExtendedToOriginalDecorator):
 
3575
    """Generate profiling data for all activity between start and success.
 
3576
    
 
3577
    The profile data is appended to the test's _benchcalls attribute and can
 
3578
    be accessed by the forwarded-to TestResult.
 
3579
 
 
3580
    While it might be cleaner do accumulate this in stopTest, addSuccess is
 
3581
    where our existing output support for lsprof is, and this class aims to
 
3582
    fit in with that: while it could be moved it's not necessary to accomplish
 
3583
    test profiling, nor would it be dramatically cleaner.
 
3584
    """
 
3585
 
 
3586
    def startTest(self, test):
 
3587
        self.profiler = breezy.lsprof.BzrProfiler()
 
3588
        # Prevent deadlocks in tests that use lsprof: those tests will
 
3589
        # unavoidably fail.
 
3590
        breezy.lsprof.BzrProfiler.profiler_block = 0
 
3591
        self.profiler.start()
 
3592
        testtools.ExtendedToOriginalDecorator.startTest(self, test)
 
3593
 
 
3594
    def addSuccess(self, test):
 
3595
        stats = self.profiler.stop()
 
3596
        try:
 
3597
            calls = test._benchcalls
 
3598
        except AttributeError:
 
3599
            test._benchcalls = []
 
3600
            calls = test._benchcalls
 
3601
        calls.append(((test.id(), "", ""), stats))
 
3602
        testtools.ExtendedToOriginalDecorator.addSuccess(self, test)
 
3603
 
 
3604
    def stopTest(self, test):
 
3605
        testtools.ExtendedToOriginalDecorator.stopTest(self, test)
 
3606
        self.profiler = None
 
3607
 
 
3608
 
 
3609
# Controlled by "brz selftest -E=..." option
 
3610
# Currently supported:
 
3611
#   -Eallow_debug           Will no longer clear debug.debug_flags() so it
 
3612
#                           preserves any flags supplied at the command line.
 
3613
#   -Edisable_lock_checks   Turns errors in mismatched locks into simple prints
 
3614
#                           rather than failing tests. And no longer raise
 
3615
#                           LockContention when fctnl locks are not being used
 
3616
#                           with proper exclusion rules.
 
3617
#   -Ethreads               Will display thread ident at creation/join time to
 
3618
#                           help track thread leaks
 
3619
#   -Euncollected_cases     Display the identity of any test cases that weren't
 
3620
#                           deallocated after being completed.
 
3621
#   -Econfig_stats          Will collect statistics using addDetail
 
3622
selftest_debug_flags = set()
912
3623
 
913
3624
 
914
3625
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
915
 
             keep_output=False,
916
 
             transport=None):
 
3626
             transport=None,
 
3627
             test_suite_factory=None,
 
3628
             lsprof_timed=None,
 
3629
             bench_history=None,
 
3630
             matching_tests_first=None,
 
3631
             list_only=False,
 
3632
             random_seed=None,
 
3633
             exclude_pattern=None,
 
3634
             strict=False,
 
3635
             load_list=None,
 
3636
             debug_flags=None,
 
3637
             starting_with=None,
 
3638
             runner_class=None,
 
3639
             suite_decorators=None,
 
3640
             stream=None,
 
3641
             lsprof_tests=False,
 
3642
             ):
917
3643
    """Run the whole test suite under the enhanced runner"""
 
3644
    # XXX: Very ugly way to do this...
 
3645
    # Disable warning about old formats because we don't want it to disturb
 
3646
    # any blackbox tests.
 
3647
    from breezy import repository
 
3648
    repository._deprecation_warning_done = True
 
3649
 
918
3650
    global default_transport
919
3651
    if transport is None:
920
3652
        transport = default_transport
921
3653
    old_transport = default_transport
922
3654
    default_transport = transport
923
 
    suite = test_suite()
 
3655
    global selftest_debug_flags
 
3656
    old_debug_flags = selftest_debug_flags
 
3657
    if debug_flags is not None:
 
3658
        selftest_debug_flags = set(debug_flags)
924
3659
    try:
 
3660
        if load_list is None:
 
3661
            keep_only = None
 
3662
        else:
 
3663
            keep_only = load_test_id_list(load_list)
 
3664
        if starting_with:
 
3665
            starting_with = [test_prefix_alias_registry.resolve_alias(start)
 
3666
                             for start in starting_with]
 
3667
            # Always consider 'unittest' an interesting name so that failed
 
3668
            # suites wrapped as test cases appear in the output.
 
3669
            starting_with.append('unittest')
 
3670
        if test_suite_factory is None:
 
3671
            # Reduce loading time by loading modules based on the starting_with
 
3672
            # patterns.
 
3673
            suite = test_suite(keep_only, starting_with)
 
3674
        else:
 
3675
            suite = test_suite_factory()
 
3676
        if starting_with:
 
3677
            # But always filter as requested.
 
3678
            suite = filter_suite_by_id_startswith(suite, starting_with)
 
3679
        result_decorators = []
 
3680
        if lsprof_tests:
 
3681
            result_decorators.append(ProfileResult)
925
3682
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
926
 
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
927
 
                     transport=transport)
 
3683
                     stop_on_failure=stop_on_failure,
 
3684
                     transport=transport,
 
3685
                     lsprof_timed=lsprof_timed,
 
3686
                     bench_history=bench_history,
 
3687
                     matching_tests_first=matching_tests_first,
 
3688
                     list_only=list_only,
 
3689
                     random_seed=random_seed,
 
3690
                     exclude_pattern=exclude_pattern,
 
3691
                     strict=strict,
 
3692
                     runner_class=runner_class,
 
3693
                     suite_decorators=suite_decorators,
 
3694
                     stream=stream,
 
3695
                     result_decorators=result_decorators,
 
3696
                     )
928
3697
    finally:
929
3698
        default_transport = old_transport
930
 
 
931
 
 
932
 
 
933
 
def test_suite():
934
 
    """Build and return TestSuite for the whole program."""
935
 
    from doctest import DocTestSuite
936
 
 
937
 
    global MODULES_TO_DOCTEST
938
 
 
939
 
    testmod_names = [ \
940
 
                   'bzrlib.tests.test_ancestry',
941
 
                   'bzrlib.tests.test_annotate',
942
 
                   'bzrlib.tests.test_api',
943
 
                   'bzrlib.tests.test_bad_files',
944
 
                   'bzrlib.tests.test_basis_inventory',
945
 
                   'bzrlib.tests.test_branch',
946
 
                   'bzrlib.tests.test_bzrdir',
947
 
                   'bzrlib.tests.test_command',
948
 
                   'bzrlib.tests.test_commit',
949
 
                   'bzrlib.tests.test_commit_merge',
950
 
                   'bzrlib.tests.test_config',
951
 
                   'bzrlib.tests.test_conflicts',
952
 
                   'bzrlib.tests.test_decorators',
953
 
                   'bzrlib.tests.test_diff',
954
 
                   'bzrlib.tests.test_doc_generate',
955
 
                   'bzrlib.tests.test_errors',
956
 
                   'bzrlib.tests.test_escaped_store',
957
 
                   'bzrlib.tests.test_fetch',
958
 
                   'bzrlib.tests.test_gpg',
959
 
                   'bzrlib.tests.test_graph',
960
 
                   'bzrlib.tests.test_hashcache',
961
 
                   'bzrlib.tests.test_http',
962
 
                   'bzrlib.tests.test_identitymap',
963
 
                   'bzrlib.tests.test_inv',
964
 
                   'bzrlib.tests.test_knit',
965
 
                   'bzrlib.tests.test_lockdir',
966
 
                   'bzrlib.tests.test_lockable_files',
967
 
                   'bzrlib.tests.test_log',
968
 
                   'bzrlib.tests.test_merge',
969
 
                   'bzrlib.tests.test_merge3',
970
 
                   'bzrlib.tests.test_merge_core',
971
 
                   'bzrlib.tests.test_missing',
972
 
                   'bzrlib.tests.test_msgeditor',
973
 
                   'bzrlib.tests.test_nonascii',
974
 
                   'bzrlib.tests.test_options',
975
 
                   'bzrlib.tests.test_osutils',
976
 
                   'bzrlib.tests.test_permissions',
977
 
                   'bzrlib.tests.test_plugins',
978
 
                   'bzrlib.tests.test_progress',
979
 
                   'bzrlib.tests.test_reconcile',
980
 
                   'bzrlib.tests.test_repository',
981
 
                   'bzrlib.tests.test_revision',
982
 
                   'bzrlib.tests.test_revisionnamespaces',
983
 
                   'bzrlib.tests.test_revprops',
984
 
                   'bzrlib.tests.test_rio',
985
 
                   'bzrlib.tests.test_sampler',
986
 
                   'bzrlib.tests.test_selftest',
987
 
                   'bzrlib.tests.test_setup',
988
 
                   'bzrlib.tests.test_sftp_transport',
989
 
                   'bzrlib.tests.test_smart_add',
990
 
                   'bzrlib.tests.test_source',
991
 
                   'bzrlib.tests.test_store',
992
 
                   'bzrlib.tests.test_symbol_versioning',
993
 
                   'bzrlib.tests.test_testament',
994
 
                   'bzrlib.tests.test_trace',
995
 
                   'bzrlib.tests.test_transactions',
996
 
                   'bzrlib.tests.test_transform',
997
 
                   'bzrlib.tests.test_transport',
998
 
                   'bzrlib.tests.test_tsort',
999
 
                   'bzrlib.tests.test_ui',
1000
 
                   'bzrlib.tests.test_upgrade',
1001
 
                   'bzrlib.tests.test_versionedfile',
1002
 
                   'bzrlib.tests.test_weave',
1003
 
                   'bzrlib.tests.test_whitebox',
1004
 
                   'bzrlib.tests.test_workingtree',
1005
 
                   'bzrlib.tests.test_xml',
1006
 
                   ]
1007
 
    test_transport_implementations = [
1008
 
        'bzrlib.tests.test_transport_implementations']
1009
 
 
1010
 
    TestCase.BZRPATH = osutils.pathjoin(
1011
 
            osutils.realpath(osutils.dirname(bzrlib.__path__[0])), 'bzr')
1012
 
    print '%10s: %s' % ('bzr', osutils.realpath(sys.argv[0]))
1013
 
    print '%10s: %s' % ('bzrlib', bzrlib.__path__[0])
1014
 
    print
1015
 
    suite = TestSuite()
1016
 
    # python2.4's TestLoader.loadTestsFromNames gives very poor 
1017
 
    # errors if it fails to load a named module - no indication of what's
1018
 
    # actually wrong, just "no such module".  We should probably override that
1019
 
    # class, but for the moment just load them ourselves. (mbp 20051202)
1020
 
    loader = TestLoader()
1021
 
    from bzrlib.transport import TransportTestProviderAdapter
1022
 
    adapter = TransportTestProviderAdapter()
1023
 
    adapt_modules(test_transport_implementations, adapter, loader, suite)
1024
 
    for mod_name in testmod_names:
1025
 
        mod = _load_module_by_name(mod_name)
1026
 
        suite.addTest(loader.loadTestsFromModule(mod))
1027
 
    for package in packages_to_test():
1028
 
        suite.addTest(package.test_suite())
1029
 
    for m in MODULES_TO_TEST:
1030
 
        suite.addTest(loader.loadTestsFromModule(m))
1031
 
    for m in (MODULES_TO_DOCTEST):
1032
 
        suite.addTest(DocTestSuite(m))
1033
 
    for name, plugin in bzrlib.plugin.all_plugins().items():
1034
 
        if getattr(plugin, 'test_suite', None) is not None:
1035
 
            suite.addTest(plugin.test_suite())
 
3699
        selftest_debug_flags = old_debug_flags
 
3700
 
 
3701
 
 
3702
def load_test_id_list(file_name):
 
3703
    """Load a test id list from a text file.
 
3704
 
 
3705
    The format is one test id by line.  No special care is taken to impose
 
3706
    strict rules, these test ids are used to filter the test suite so a test id
 
3707
    that do not match an existing test will do no harm. This allows user to add
 
3708
    comments, leave blank lines, etc.
 
3709
    """
 
3710
    test_list = []
 
3711
    try:
 
3712
        ftest = open(file_name, 'rt')
 
3713
    except IOError as e:
 
3714
        if e.errno != errno.ENOENT:
 
3715
            raise
 
3716
        else:
 
3717
            raise errors.NoSuchFile(file_name)
 
3718
 
 
3719
    for test_name in ftest.readlines():
 
3720
        test_list.append(test_name.strip())
 
3721
    ftest.close()
 
3722
    return test_list
 
3723
 
 
3724
 
 
3725
def suite_matches_id_list(test_suite, id_list):
 
3726
    """Warns about tests not appearing or appearing more than once.
 
3727
 
 
3728
    :param test_suite: A TestSuite object.
 
3729
    :param test_id_list: The list of test ids that should be found in
 
3730
         test_suite.
 
3731
 
 
3732
    :return: (absents, duplicates) absents is a list containing the test found
 
3733
        in id_list but not in test_suite, duplicates is a list containing the
 
3734
        tests found multiple times in test_suite.
 
3735
 
 
3736
    When using a prefined test id list, it may occurs that some tests do not
 
3737
    exist anymore or that some tests use the same id. This function warns the
 
3738
    tester about potential problems in his workflow (test lists are volatile)
 
3739
    or in the test suite itself (using the same id for several tests does not
 
3740
    help to localize defects).
 
3741
    """
 
3742
    # Build a dict counting id occurrences
 
3743
    tests = dict()
 
3744
    for test in iter_suite_tests(test_suite):
 
3745
        id = test.id()
 
3746
        tests[id] = tests.get(id, 0) + 1
 
3747
 
 
3748
    not_found = []
 
3749
    duplicates = []
 
3750
    for id in id_list:
 
3751
        occurs = tests.get(id, 0)
 
3752
        if not occurs:
 
3753
            not_found.append(id)
 
3754
        elif occurs > 1:
 
3755
            duplicates.append(id)
 
3756
 
 
3757
    return not_found, duplicates
 
3758
 
 
3759
 
 
3760
class TestIdList(object):
 
3761
    """Test id list to filter a test suite.
 
3762
 
 
3763
    Relying on the assumption that test ids are built as:
 
3764
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
3765
    notation, this class offers methods to :
 
3766
    - avoid building a test suite for modules not refered to in the test list,
 
3767
    - keep only the tests listed from the module test suite.
 
3768
    """
 
3769
 
 
3770
    def __init__(self, test_id_list):
 
3771
        # When a test suite needs to be filtered against us we compare test ids
 
3772
        # for equality, so a simple dict offers a quick and simple solution.
 
3773
        self.tests = dict().fromkeys(test_id_list, True)
 
3774
 
 
3775
        # While unittest.TestCase have ids like:
 
3776
        # <module>.<class>.<method>[(<param+)],
 
3777
        # doctest.DocTestCase can have ids like:
 
3778
        # <module>
 
3779
        # <module>.<class>
 
3780
        # <module>.<function>
 
3781
        # <module>.<class>.<method>
 
3782
 
 
3783
        # Since we can't predict a test class from its name only, we settle on
 
3784
        # a simple constraint: a test id always begins with its module name.
 
3785
 
 
3786
        modules = {}
 
3787
        for test_id in test_id_list:
 
3788
            parts = test_id.split('.')
 
3789
            mod_name = parts.pop(0)
 
3790
            modules[mod_name] = True
 
3791
            for part in parts:
 
3792
                mod_name += '.' + part
 
3793
                modules[mod_name] = True
 
3794
        self.modules = modules
 
3795
 
 
3796
    def refers_to(self, module_name):
 
3797
        """Is there tests for the module or one of its sub modules."""
 
3798
        return module_name in self.modules
 
3799
 
 
3800
    def includes(self, test_id):
 
3801
        return test_id in self.tests
 
3802
 
 
3803
 
 
3804
class TestPrefixAliasRegistry(registry.Registry):
 
3805
    """A registry for test prefix aliases.
 
3806
 
 
3807
    This helps implement shorcuts for the --starting-with selftest
 
3808
    option. Overriding existing prefixes is not allowed but not fatal (a
 
3809
    warning will be emitted).
 
3810
    """
 
3811
 
 
3812
    def register(self, key, obj, help=None, info=None,
 
3813
                 override_existing=False):
 
3814
        """See Registry.register.
 
3815
 
 
3816
        Trying to override an existing alias causes a warning to be emitted,
 
3817
        not a fatal execption.
 
3818
        """
 
3819
        try:
 
3820
            super(TestPrefixAliasRegistry, self).register(
 
3821
                key, obj, help=help, info=info, override_existing=False)
 
3822
        except KeyError:
 
3823
            actual = self.get(key)
 
3824
            trace.note(
 
3825
                'Test prefix alias %s is already used for %s, ignoring %s'
 
3826
                % (key, actual, obj))
 
3827
 
 
3828
    def resolve_alias(self, id_start):
 
3829
        """Replace the alias by the prefix in the given string.
 
3830
 
 
3831
        Using an unknown prefix is an error to help catching typos.
 
3832
        """
 
3833
        parts = id_start.split('.')
 
3834
        try:
 
3835
            parts[0] = self.get(parts[0])
 
3836
        except KeyError:
 
3837
            raise errors.BzrCommandError(
 
3838
                '%s is not a known test prefix alias' % parts[0])
 
3839
        return '.'.join(parts)
 
3840
 
 
3841
 
 
3842
test_prefix_alias_registry = TestPrefixAliasRegistry()
 
3843
"""Registry of test prefix aliases."""
 
3844
 
 
3845
 
 
3846
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
 
3847
# appear prefixed ('breezy.' is "replaced" by 'breezy.').
 
3848
test_prefix_alias_registry.register('breezy', 'breezy')
 
3849
 
 
3850
# Obvious highest levels prefixes, feel free to add your own via a plugin
 
3851
test_prefix_alias_registry.register('bd', 'breezy.doc')
 
3852
test_prefix_alias_registry.register('bu', 'breezy.utils')
 
3853
test_prefix_alias_registry.register('bt', 'breezy.tests')
 
3854
test_prefix_alias_registry.register('bb', 'breezy.tests.blackbox')
 
3855
test_prefix_alias_registry.register('bp', 'breezy.plugins')
 
3856
 
 
3857
 
 
3858
def _test_suite_testmod_names():
 
3859
    """Return the standard list of test module names to test."""
 
3860
    return [
 
3861
        'breezy.doc',
 
3862
        'breezy.tests.blackbox',
 
3863
        'breezy.tests.commands',
 
3864
        'breezy.tests.per_branch',
 
3865
        'breezy.tests.per_bzrdir',
 
3866
        'breezy.tests.per_controldir',
 
3867
        'breezy.tests.per_controldir_colo',
 
3868
        'breezy.tests.per_foreign_vcs',
 
3869
        'breezy.tests.per_interrepository',
 
3870
        'breezy.tests.per_intertree',
 
3871
        'breezy.tests.per_inventory',
 
3872
        'breezy.tests.per_interbranch',
 
3873
        'breezy.tests.per_lock',
 
3874
        'breezy.tests.per_merger',
 
3875
        'breezy.tests.per_transport',
 
3876
        'breezy.tests.per_tree',
 
3877
        'breezy.tests.per_pack_repository',
 
3878
        'breezy.tests.per_repository',
 
3879
        'breezy.tests.per_repository_chk',
 
3880
        'breezy.tests.per_repository_reference',
 
3881
        'breezy.tests.per_repository_vf',
 
3882
        'breezy.tests.per_uifactory',
 
3883
        'breezy.tests.per_versionedfile',
 
3884
        'breezy.tests.per_workingtree',
 
3885
        'breezy.tests.test__annotator',
 
3886
        'breezy.tests.test__bencode',
 
3887
        'breezy.tests.test__btree_serializer',
 
3888
        'breezy.tests.test__chk_map',
 
3889
        'breezy.tests.test__dirstate_helpers',
 
3890
        'breezy.tests.test__groupcompress',
 
3891
        'breezy.tests.test__known_graph',
 
3892
        'breezy.tests.test__rio',
 
3893
        'breezy.tests.test__simple_set',
 
3894
        'breezy.tests.test__static_tuple',
 
3895
        'breezy.tests.test__walkdirs_win32',
 
3896
        'breezy.tests.test_ancestry',
 
3897
        'breezy.tests.test_annotate',
 
3898
        'breezy.tests.test_atomicfile',
 
3899
        'breezy.tests.test_bad_files',
 
3900
        'breezy.tests.test_bisect',
 
3901
        'breezy.tests.test_bisect_multi',
 
3902
        'breezy.tests.test_branch',
 
3903
        'breezy.tests.test_branchbuilder',
 
3904
        'breezy.tests.test_btree_index',
 
3905
        'breezy.tests.test_bugtracker',
 
3906
        'breezy.tests.test_bundle',
 
3907
        'breezy.tests.test_bzrdir',
 
3908
        'breezy.tests.test__chunks_to_lines',
 
3909
        'breezy.tests.test_cache_utf8',
 
3910
        'breezy.tests.test_chk_map',
 
3911
        'breezy.tests.test_chk_serializer',
 
3912
        'breezy.tests.test_chunk_writer',
 
3913
        'breezy.tests.test_clean_tree',
 
3914
        'breezy.tests.test_cleanup',
 
3915
        'breezy.tests.test_cmdline',
 
3916
        'breezy.tests.test_commands',
 
3917
        'breezy.tests.test_commit',
 
3918
        'breezy.tests.test_commit_merge',
 
3919
        'breezy.tests.test_config',
 
3920
        'breezy.tests.test_conflicts',
 
3921
        'breezy.tests.test_controldir',
 
3922
        'breezy.tests.test_counted_lock',
 
3923
        'breezy.tests.test_crash',
 
3924
        'breezy.tests.test_decorators',
 
3925
        'breezy.tests.test_delta',
 
3926
        'breezy.tests.test_debug',
 
3927
        'breezy.tests.test_diff',
 
3928
        'breezy.tests.test_directory_service',
 
3929
        'breezy.tests.test_dirstate',
 
3930
        'breezy.tests.test_email_message',
 
3931
        'breezy.tests.test_eol_filters',
 
3932
        'breezy.tests.test_errors',
 
3933
        'breezy.tests.test_estimate_compressed_size',
 
3934
        'breezy.tests.test_export',
 
3935
        'breezy.tests.test_export_pot',
 
3936
        'breezy.tests.test_extract',
 
3937
        'breezy.tests.test_features',
 
3938
        'breezy.tests.test_fetch',
 
3939
        'breezy.tests.test_fetch_ghosts',
 
3940
        'breezy.tests.test_fixtures',
 
3941
        'breezy.tests.test_fifo_cache',
 
3942
        'breezy.tests.test_filters',
 
3943
        'breezy.tests.test_filter_tree',
 
3944
        'breezy.tests.test_ftp_transport',
 
3945
        'breezy.tests.test_foreign',
 
3946
        'breezy.tests.test_generate_docs',
 
3947
        'breezy.tests.test_generate_ids',
 
3948
        'breezy.tests.test_globbing',
 
3949
        'breezy.tests.test_gpg',
 
3950
        'breezy.tests.test_graph',
 
3951
        'breezy.tests.test_groupcompress',
 
3952
        'breezy.tests.test_hashcache',
 
3953
        'breezy.tests.test_help',
 
3954
        'breezy.tests.test_hooks',
 
3955
        'breezy.tests.test_http',
 
3956
        'breezy.tests.test_http_response',
 
3957
        'breezy.tests.test_https_ca_bundle',
 
3958
        'breezy.tests.test_https_urllib',
 
3959
        'breezy.tests.test_i18n',
 
3960
        'breezy.tests.test_identitymap',
 
3961
        'breezy.tests.test_ignores',
 
3962
        'breezy.tests.test_index',
 
3963
        'breezy.tests.test_import_tariff',
 
3964
        'breezy.tests.test_info',
 
3965
        'breezy.tests.test_inv',
 
3966
        'breezy.tests.test_inventory_delta',
 
3967
        'breezy.tests.test_knit',
 
3968
        'breezy.tests.test_lazy_import',
 
3969
        'breezy.tests.test_lazy_regex',
 
3970
        'breezy.tests.test_library_state',
 
3971
        'breezy.tests.test_lock',
 
3972
        'breezy.tests.test_lockable_files',
 
3973
        'breezy.tests.test_lockdir',
 
3974
        'breezy.tests.test_log',
 
3975
        'breezy.tests.test_lru_cache',
 
3976
        'breezy.tests.test_lsprof',
 
3977
        'breezy.tests.test_mail_client',
 
3978
        'breezy.tests.test_matchers',
 
3979
        'breezy.tests.test_memorytree',
 
3980
        'breezy.tests.test_merge',
 
3981
        'breezy.tests.test_merge3',
 
3982
        'breezy.tests.test_merge_core',
 
3983
        'breezy.tests.test_merge_directive',
 
3984
        'breezy.tests.test_mergetools',
 
3985
        'breezy.tests.test_missing',
 
3986
        'breezy.tests.test_msgeditor',
 
3987
        'breezy.tests.test_multiparent',
 
3988
        'breezy.tests.test_mutabletree',
 
3989
        'breezy.tests.test_nonascii',
 
3990
        'breezy.tests.test_options',
 
3991
        'breezy.tests.test_osutils',
 
3992
        'breezy.tests.test_osutils_encodings',
 
3993
        'breezy.tests.test_pack',
 
3994
        'breezy.tests.test_patch',
 
3995
        'breezy.tests.test_patches',
 
3996
        'breezy.tests.test_permissions',
 
3997
        'breezy.tests.test_plugins',
 
3998
        'breezy.tests.test_progress',
 
3999
        'breezy.tests.test_pyutils',
 
4000
        'breezy.tests.test_read_bundle',
 
4001
        'breezy.tests.test_reconcile',
 
4002
        'breezy.tests.test_reconfigure',
 
4003
        'breezy.tests.test_registry',
 
4004
        'breezy.tests.test_remote',
 
4005
        'breezy.tests.test_rename_map',
 
4006
        'breezy.tests.test_repository',
 
4007
        'breezy.tests.test_revert',
 
4008
        'breezy.tests.test_revision',
 
4009
        'breezy.tests.test_revisionspec',
 
4010
        'breezy.tests.test_revisiontree',
 
4011
        'breezy.tests.test_rio',
 
4012
        'breezy.tests.test_rules',
 
4013
        'breezy.tests.test_url_policy_open',
 
4014
        'breezy.tests.test_sampler',
 
4015
        'breezy.tests.test_scenarios',
 
4016
        'breezy.tests.test_script',
 
4017
        'breezy.tests.test_selftest',
 
4018
        'breezy.tests.test_serializer',
 
4019
        'breezy.tests.test_setup',
 
4020
        'breezy.tests.test_sftp_transport',
 
4021
        'breezy.tests.test_shelf',
 
4022
        'breezy.tests.test_shelf_ui',
 
4023
        'breezy.tests.test_smart',
 
4024
        'breezy.tests.test_smart_add',
 
4025
        'breezy.tests.test_smart_request',
 
4026
        'breezy.tests.test_smart_signals',
 
4027
        'breezy.tests.test_smart_transport',
 
4028
        'breezy.tests.test_smtp_connection',
 
4029
        'breezy.tests.test_source',
 
4030
        'breezy.tests.test_ssh_transport',
 
4031
        'breezy.tests.test_status',
 
4032
        'breezy.tests.test_strace',
 
4033
        'breezy.tests.test_subsume',
 
4034
        'breezy.tests.test_switch',
 
4035
        'breezy.tests.test_symbol_versioning',
 
4036
        'breezy.tests.test_tag',
 
4037
        'breezy.tests.test_test_server',
 
4038
        'breezy.tests.test_testament',
 
4039
        'breezy.tests.test_textfile',
 
4040
        'breezy.tests.test_textmerge',
 
4041
        'breezy.tests.test_cethread',
 
4042
        'breezy.tests.test_timestamp',
 
4043
        'breezy.tests.test_trace',
 
4044
        'breezy.tests.test_transactions',
 
4045
        'breezy.tests.test_transform',
 
4046
        'breezy.tests.test_transport',
 
4047
        'breezy.tests.test_transport_log',
 
4048
        'breezy.tests.test_tree',
 
4049
        'breezy.tests.test_treebuilder',
 
4050
        'breezy.tests.test_treeshape',
 
4051
        'breezy.tests.test_tsort',
 
4052
        'breezy.tests.test_tuned_gzip',
 
4053
        'breezy.tests.test_ui',
 
4054
        'breezy.tests.test_uncommit',
 
4055
        'breezy.tests.test_upgrade',
 
4056
        'breezy.tests.test_upgrade_stacked',
 
4057
        'breezy.tests.test_upstream_import',
 
4058
        'breezy.tests.test_urlutils',
 
4059
        'breezy.tests.test_utextwrap',
 
4060
        'breezy.tests.test_version',
 
4061
        'breezy.tests.test_version_info',
 
4062
        'breezy.tests.test_versionedfile',
 
4063
        'breezy.tests.test_vf_search',
 
4064
        'breezy.tests.test_views',
 
4065
        'breezy.tests.test_weave',
 
4066
        'breezy.tests.test_whitebox',
 
4067
        'breezy.tests.test_win32utils',
 
4068
        'breezy.tests.test_workingtree',
 
4069
        'breezy.tests.test_workingtree_4',
 
4070
        'breezy.tests.test_wsgi',
 
4071
        'breezy.tests.test_xml',
 
4072
        ]
 
4073
 
 
4074
 
 
4075
def _test_suite_modules_to_doctest():
 
4076
    """Return the list of modules to doctest."""
 
4077
    if __doc__ is None:
 
4078
        # GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
 
4079
        return []
 
4080
    return [
 
4081
        'breezy',
 
4082
        'breezy.branchbuilder',
 
4083
        'breezy.bzr.inventory',
 
4084
        'breezy.decorators',
 
4085
        'breezy.iterablefile',
 
4086
        'breezy.lockdir',
 
4087
        'breezy.merge3',
 
4088
        'breezy.option',
 
4089
        'breezy.pyutils',
 
4090
        'breezy.symbol_versioning',
 
4091
        'breezy.tests',
 
4092
        'breezy.tests.fixtures',
 
4093
        'breezy.timestamp',
 
4094
        'breezy.transport.http',
 
4095
        'breezy.version_info_formats.format_custom',
 
4096
        ]
 
4097
 
 
4098
 
 
4099
def test_suite(keep_only=None, starting_with=None):
 
4100
    """Build and return TestSuite for the whole of breezy.
 
4101
 
 
4102
    :param keep_only: A list of test ids limiting the suite returned.
 
4103
 
 
4104
    :param starting_with: An id limiting the suite returned to the tests
 
4105
         starting with it.
 
4106
 
 
4107
    This function can be replaced if you need to change the default test
 
4108
    suite on a global basis, but it is not encouraged.
 
4109
    """
 
4110
 
 
4111
    loader = TestUtil.TestLoader()
 
4112
 
 
4113
    if keep_only is not None:
 
4114
        id_filter = TestIdList(keep_only)
 
4115
    if starting_with:
 
4116
        # We take precedence over keep_only because *at loading time* using
 
4117
        # both options means we will load less tests for the same final result.
 
4118
        def interesting_module(name):
 
4119
            for start in starting_with:
 
4120
                if (
 
4121
                    # Either the module name starts with the specified string
 
4122
                    name.startswith(start)
 
4123
                    # or it may contain tests starting with the specified string
 
4124
                    or start.startswith(name)
 
4125
                    ):
 
4126
                    return True
 
4127
            return False
 
4128
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
 
4129
 
 
4130
    elif keep_only is not None:
 
4131
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
4132
        def interesting_module(name):
 
4133
            return id_filter.refers_to(name)
 
4134
 
 
4135
    else:
 
4136
        loader = TestUtil.TestLoader()
 
4137
        def interesting_module(name):
 
4138
            # No filtering, all modules are interesting
 
4139
            return True
 
4140
 
 
4141
    suite = loader.suiteClass()
 
4142
 
 
4143
    # modules building their suite with loadTestsFromModuleNames
 
4144
    suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
 
4145
 
 
4146
    for mod in _test_suite_modules_to_doctest():
 
4147
        if not interesting_module(mod):
 
4148
            # No tests to keep here, move along
 
4149
            continue
 
4150
        try:
 
4151
            # note that this really does mean "report only" -- doctest
 
4152
            # still runs the rest of the examples
 
4153
            doc_suite = IsolatedDocTestSuite(
 
4154
                mod, optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
 
4155
        except ValueError as e:
 
4156
            print('**failed to get doctest for: %s\n%s' % (mod, e))
 
4157
            raise
 
4158
        if len(doc_suite._tests) == 0:
 
4159
            raise errors.BzrError("no doctests found in %s" % (mod,))
 
4160
        suite.addTest(doc_suite)
 
4161
 
 
4162
    default_encoding = sys.getdefaultencoding()
 
4163
    for name, plugin in _mod_plugin.plugins().items():
 
4164
        if not interesting_module(plugin.module.__name__):
 
4165
            continue
 
4166
        plugin_suite = plugin.test_suite()
 
4167
        # We used to catch ImportError here and turn it into just a warning,
 
4168
        # but really if you don't have --no-plugins this should be a failure.
 
4169
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
4170
        if plugin_suite is None:
 
4171
            plugin_suite = plugin.load_plugin_tests(loader)
 
4172
        if plugin_suite is not None:
 
4173
            suite.addTest(plugin_suite)
 
4174
        if default_encoding != sys.getdefaultencoding():
 
4175
            trace.warning(
 
4176
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
4177
                sys.getdefaultencoding())
 
4178
            reload(sys)
 
4179
            sys.setdefaultencoding(default_encoding)
 
4180
 
 
4181
    if keep_only is not None:
 
4182
        # Now that the referred modules have loaded their tests, keep only the
 
4183
        # requested ones.
 
4184
        suite = filter_suite_by_id_list(suite, id_filter)
 
4185
        # Do some sanity checks on the id_list filtering
 
4186
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
4187
        if starting_with:
 
4188
            # The tester has used both keep_only and starting_with, so he is
 
4189
            # already aware that some tests are excluded from the list, there
 
4190
            # is no need to tell him which.
 
4191
            pass
 
4192
        else:
 
4193
            # Some tests mentioned in the list are not in the test suite. The
 
4194
            # list may be out of date, report to the tester.
 
4195
            for id in not_found:
 
4196
                trace.warning('"%s" not found in the test suite', id)
 
4197
        for id in duplicates:
 
4198
            trace.warning('"%s" is used as an id by several tests', id)
 
4199
 
1036
4200
    return suite
1037
4201
 
1038
4202
 
1039
 
def adapt_modules(mods_list, adapter, loader, suite):
1040
 
    """Adapt the modules in mods_list using adapter and add to suite."""
1041
 
    for mod_name in mods_list:
1042
 
        mod = _load_module_by_name(mod_name)
1043
 
        for test in iter_suite_tests(loader.loadTestsFromModule(mod)):
1044
 
            suite.addTests(adapter.adapt(test))
1045
 
 
1046
 
 
1047
 
def _load_module_by_name(mod_name):
1048
 
    parts = mod_name.split('.')
1049
 
    module = __import__(mod_name)
1050
 
    del parts[0]
1051
 
    # for historical reasons python returns the top-level module even though
1052
 
    # it loads the submodule; we need to walk down to get the one we want.
1053
 
    while parts:
1054
 
        module = getattr(module, parts.pop(0))
1055
 
    return module
 
4203
def multiply_scenarios(*scenarios):
 
4204
    """Multiply two or more iterables of scenarios.
 
4205
 
 
4206
    It is safe to pass scenario generators or iterators.
 
4207
 
 
4208
    :returns: A list of compound scenarios: the cross-product of all
 
4209
        scenarios, with the names concatenated and the parameters
 
4210
        merged together.
 
4211
    """
 
4212
    return functools.reduce(_multiply_two_scenarios, map(list, scenarios))
 
4213
 
 
4214
 
 
4215
def _multiply_two_scenarios(scenarios_left, scenarios_right):
 
4216
    """Multiply two sets of scenarios.
 
4217
 
 
4218
    :returns: the cartesian product of the two sets of scenarios, that is
 
4219
        a scenario for every possible combination of a left scenario and a
 
4220
        right scenario.
 
4221
    """
 
4222
    return [
 
4223
        ('%s,%s' % (left_name, right_name),
 
4224
         dict(left_dict, **right_dict))
 
4225
        for left_name, left_dict in scenarios_left
 
4226
        for right_name, right_dict in scenarios_right]
 
4227
 
 
4228
 
 
4229
def multiply_tests(tests, scenarios, result):
 
4230
    """Multiply tests_list by scenarios into result.
 
4231
 
 
4232
    This is the core workhorse for test parameterisation.
 
4233
 
 
4234
    Typically the load_tests() method for a per-implementation test suite will
 
4235
    call multiply_tests and return the result.
 
4236
 
 
4237
    :param tests: The tests to parameterise.
 
4238
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
 
4239
        scenario_param_dict).
 
4240
    :param result: A TestSuite to add created tests to.
 
4241
 
 
4242
    This returns the passed in result TestSuite with the cross product of all
 
4243
    the tests repeated once for each scenario.  Each test is adapted by adding
 
4244
    the scenario name at the end of its id(), and updating the test object's
 
4245
    __dict__ with the scenario_param_dict.
 
4246
 
 
4247
    >>> import breezy.tests.test_sampler
 
4248
    >>> r = multiply_tests(
 
4249
    ...     breezy.tests.test_sampler.DemoTest('test_nothing'),
 
4250
    ...     [('one', dict(param=1)),
 
4251
    ...      ('two', dict(param=2))],
 
4252
    ...     TestUtil.TestSuite())
 
4253
    >>> tests = list(iter_suite_tests(r))
 
4254
    >>> len(tests)
 
4255
    2
 
4256
    >>> tests[0].id()
 
4257
    'breezy.tests.test_sampler.DemoTest.test_nothing(one)'
 
4258
    >>> tests[0].param
 
4259
    1
 
4260
    >>> tests[1].param
 
4261
    2
 
4262
    """
 
4263
    for test in iter_suite_tests(tests):
 
4264
        apply_scenarios(test, scenarios, result)
 
4265
    return result
 
4266
 
 
4267
 
 
4268
def apply_scenarios(test, scenarios, result):
 
4269
    """Apply the scenarios in scenarios to test and add to result.
 
4270
 
 
4271
    :param test: The test to apply scenarios to.
 
4272
    :param scenarios: An iterable of scenarios to apply to test.
 
4273
    :return: result
 
4274
    :seealso: apply_scenario
 
4275
    """
 
4276
    for scenario in scenarios:
 
4277
        result.addTest(apply_scenario(test, scenario))
 
4278
    return result
 
4279
 
 
4280
 
 
4281
def apply_scenario(test, scenario):
 
4282
    """Copy test and apply scenario to it.
 
4283
 
 
4284
    :param test: A test to adapt.
 
4285
    :param scenario: A tuple describing the scenario.
 
4286
        The first element of the tuple is the new test id.
 
4287
        The second element is a dict containing attributes to set on the
 
4288
        test.
 
4289
    :return: The adapted test.
 
4290
    """
 
4291
    new_id = "%s(%s)" % (test.id(), scenario[0])
 
4292
    new_test = clone_test(test, new_id)
 
4293
    for name, value in scenario[1].items():
 
4294
        setattr(new_test, name, value)
 
4295
    return new_test
 
4296
 
 
4297
 
 
4298
def clone_test(test, new_id):
 
4299
    """Clone a test giving it a new id.
 
4300
 
 
4301
    :param test: The test to clone.
 
4302
    :param new_id: The id to assign to it.
 
4303
    :return: The new test.
 
4304
    """
 
4305
    new_test = copy.copy(test)
 
4306
    new_test.id = lambda: new_id
 
4307
    # XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
 
4308
    # causes cloned tests to share the 'details' dict.  This makes it hard to
 
4309
    # read the test output for parameterized tests, because tracebacks will be
 
4310
    # associated with irrelevant tests.
 
4311
    try:
 
4312
        details = new_test._TestCase__details
 
4313
    except AttributeError:
 
4314
        # must be a different version of testtools than expected.  Do nothing.
 
4315
        pass
 
4316
    else:
 
4317
        # Reset the '__details' dict.
 
4318
        new_test._TestCase__details = {}
 
4319
    return new_test
 
4320
 
 
4321
 
 
4322
def permute_tests_for_extension(standard_tests, loader, py_module_name,
 
4323
                                ext_module_name):
 
4324
    """Helper for permutating tests against an extension module.
 
4325
 
 
4326
    This is meant to be used inside a modules 'load_tests()' function. It will
 
4327
    create 2 scenarios, and cause all tests in the 'standard_tests' to be run
 
4328
    against both implementations. Setting 'test.module' to the appropriate
 
4329
    module. See breezy.tests.test__chk_map.load_tests as an example.
 
4330
 
 
4331
    :param standard_tests: A test suite to permute
 
4332
    :param loader: A TestLoader
 
4333
    :param py_module_name: The python path to a python module that can always
 
4334
        be loaded, and will be considered the 'python' implementation. (eg
 
4335
        'breezy._chk_map_py')
 
4336
    :param ext_module_name: The python path to an extension module. If the
 
4337
        module cannot be loaded, a single test will be added, which notes that
 
4338
        the module is not available. If it can be loaded, all standard_tests
 
4339
        will be run against that module.
 
4340
    :return: (suite, feature) suite is a test-suite that has all the permuted
 
4341
        tests. feature is the Feature object that can be used to determine if
 
4342
        the module is available.
 
4343
    """
 
4344
 
 
4345
    from .features import ModuleAvailableFeature
 
4346
    py_module = pyutils.get_named_object(py_module_name)
 
4347
    scenarios = [
 
4348
        ('python', {'module': py_module}),
 
4349
    ]
 
4350
    suite = loader.suiteClass()
 
4351
    feature = ModuleAvailableFeature(ext_module_name)
 
4352
    if feature.available():
 
4353
        scenarios.append(('C', {'module': feature.module}))
 
4354
    else:
 
4355
        # the compiled module isn't available, so we add a failing test
 
4356
        class FailWithoutFeature(TestCase):
 
4357
            def test_fail(self):
 
4358
                self.requireFeature(feature)
 
4359
        suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
 
4360
    result = multiply_tests(standard_tests, scenarios, suite)
 
4361
    return result, feature
 
4362
 
 
4363
 
 
4364
def _rmtree_temp_dir(dirname, test_id=None):
 
4365
    # If LANG=C we probably have created some bogus paths
 
4366
    # which rmtree(unicode) will fail to delete
 
4367
    # so make sure we are using rmtree(str) to delete everything
 
4368
    # except on win32, where rmtree(str) will fail
 
4369
    # since it doesn't have the property of byte-stream paths
 
4370
    # (they are either ascii or mbcs)
 
4371
    if sys.platform == 'win32' and isinstance(dirname, bytes):
 
4372
        # make sure we are using the unicode win32 api
 
4373
        dirname = dirname.decode('mbcs')
 
4374
    else:
 
4375
        dirname = dirname.encode(sys.getfilesystemencoding())
 
4376
    try:
 
4377
        osutils.rmtree(dirname)
 
4378
    except OSError as e:
 
4379
        # We don't want to fail here because some useful display will be lost
 
4380
        # otherwise. Polluting the tmp dir is bad, but not giving all the
 
4381
        # possible info to the test runner is even worse.
 
4382
        if test_id != None:
 
4383
            ui.ui_factory.clear_term()
 
4384
            sys.stderr.write('\nWhile running: %s\n' % (test_id,))
 
4385
        # Ugly, but the last thing we want here is fail, so bear with it.
 
4386
        printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
 
4387
                                    ).encode('ascii', 'replace')
 
4388
        sys.stderr.write('Unable to remove testing dir %s\n%s'
 
4389
                         % (os.path.basename(dirname), printable_e))
 
4390
 
 
4391
 
 
4392
def probe_unicode_in_user_encoding():
 
4393
    """Try to encode several unicode strings to use in unicode-aware tests.
 
4394
    Return first successfull match.
 
4395
 
 
4396
    :return:  (unicode value, encoded plain string value) or (None, None)
 
4397
    """
 
4398
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
4399
    for uni_val in possible_vals:
 
4400
        try:
 
4401
            str_val = uni_val.encode(osutils.get_user_encoding())
 
4402
        except UnicodeEncodeError:
 
4403
            # Try a different character
 
4404
            pass
 
4405
        else:
 
4406
            return uni_val, str_val
 
4407
    return None, None
 
4408
 
 
4409
 
 
4410
def probe_bad_non_ascii(encoding):
 
4411
    """Try to find [bad] character with code [128..255]
 
4412
    that cannot be decoded to unicode in some encoding.
 
4413
    Return None if all non-ascii characters is valid
 
4414
    for given encoding.
 
4415
    """
 
4416
    for i in range(128, 256):
 
4417
        char = chr(i)
 
4418
        try:
 
4419
            char.decode(encoding)
 
4420
        except UnicodeDecodeError:
 
4421
            return char
 
4422
    return None
 
4423
 
 
4424
 
 
4425
# Only define SubUnitBzrRunner if subunit is available.
 
4426
try:
 
4427
    from subunit import TestProtocolClient
 
4428
    from subunit.test_results import AutoTimingTestResultDecorator
 
4429
 
 
4430
    class SubUnitBzrProtocolClientv1(TestProtocolClient):
 
4431
 
 
4432
        def stopTest(self, test):
 
4433
            super(SubUnitBzrProtocolClientv1, self).stopTest(test)
 
4434
            _clear__type_equality_funcs(test)
 
4435
 
 
4436
        def addSuccess(self, test, details=None):
 
4437
            # The subunit client always includes the details in the subunit
 
4438
            # stream, but we don't want to include it in ours.
 
4439
            if details is not None and 'log' in details:
 
4440
                del details['log']
 
4441
            return super(SubUnitBzrProtocolClientv1, self).addSuccess(
 
4442
                test, details)
 
4443
 
 
4444
    class SubUnitBzrRunnerv1(TextTestRunner):
 
4445
 
 
4446
        def run(self, test):
 
4447
            result = AutoTimingTestResultDecorator(
 
4448
                SubUnitBzrProtocolClientv1(self.stream))
 
4449
            test.run(result)
 
4450
            return result
 
4451
except ImportError:
 
4452
    pass
 
4453
 
 
4454
 
 
4455
try:
 
4456
    from subunit.run import SubunitTestRunner
 
4457
 
 
4458
    class SubUnitBzrRunnerv2(TextTestRunner, SubunitTestRunner):
 
4459
 
 
4460
        def __init__(self, stream=sys.stderr, descriptions=0, verbosity=1,
 
4461
                     bench_history=None, strict=False, result_decorators=None):
 
4462
            TextTestRunner.__init__(
 
4463
                    self, stream=stream,
 
4464
                    descriptions=descriptions, verbosity=verbosity,
 
4465
                    bench_history=bench_history, strict=strict,
 
4466
                    result_decorators=result_decorators)
 
4467
            SubunitTestRunner.__init__(self, verbosity=verbosity,
 
4468
                                       stream=stream)
 
4469
 
 
4470
        run = SubunitTestRunner.run
 
4471
except ImportError:
 
4472
    pass