/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/__init__.py

  • Committer: Jelmer Vernooij
  • Date: 2019-08-12 20:24:50 UTC
  • mto: (7290.1.35 work)
  • mto: This revision was merged to the branch mainline in revision 7405.
  • Revision ID: jelmer@jelmer.uk-20190812202450-vdpamxay6sebo93w
Fix path to brz.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2013, 2015, 2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Testing framework extensions"""
 
18
 
 
19
from __future__ import absolute_import
 
20
 
 
21
# NOTE: Some classes in here use camelCaseNaming() rather than
 
22
# underscore_naming().  That's for consistency with unittest; it's not the
 
23
# general style of breezy.  Please continue that consistency when adding e.g.
 
24
# new assertFoo() methods.
 
25
 
 
26
import atexit
 
27
import codecs
 
28
import copy
 
29
import difflib
 
30
import doctest
 
31
import errno
 
32
import functools
 
33
from io import (
 
34
    BytesIO,
 
35
    StringIO,
 
36
    TextIOWrapper,
 
37
    )
 
38
import itertools
 
39
import logging
 
40
import math
 
41
import os
 
42
import platform
 
43
import pprint
 
44
import random
 
45
import re
 
46
import shlex
 
47
import site
 
48
import stat
 
49
import subprocess
 
50
import sys
 
51
import tempfile
 
52
import threading
 
53
import time
 
54
import traceback
 
55
import unittest
 
56
import warnings
 
57
 
 
58
import testtools
 
59
# nb: check this before importing anything else from within it
 
60
_testtools_version = getattr(testtools, '__version__', ())
 
61
if _testtools_version < (0, 9, 5):
 
62
    raise ImportError("need at least testtools 0.9.5: %s is %r"
 
63
                      % (testtools.__file__, _testtools_version))
 
64
from testtools import content
 
65
 
 
66
import breezy
 
67
from .. import (
 
68
    branchbuilder,
 
69
    controldir,
 
70
    commands as _mod_commands,
 
71
    config,
 
72
    i18n,
 
73
    debug,
 
74
    errors,
 
75
    hooks,
 
76
    lock as _mod_lock,
 
77
    lockdir,
 
78
    osutils,
 
79
    plugin as _mod_plugin,
 
80
    pyutils,
 
81
    ui,
 
82
    urlutils,
 
83
    registry,
 
84
    symbol_versioning,
 
85
    trace,
 
86
    transport as _mod_transport,
 
87
    workingtree,
 
88
    )
 
89
from breezy.bzr import (
 
90
    chk_map,
 
91
    )
 
92
try:
 
93
    import breezy.lsprof
 
94
except ImportError:
 
95
    # lsprof not available
 
96
    pass
 
97
from ..sixish import (
 
98
    int2byte,
 
99
    PY3,
 
100
    string_types,
 
101
    text_type,
 
102
    )
 
103
from ..bzr.smart import client, request
 
104
from ..transport import (
 
105
    memory,
 
106
    pathfilter,
 
107
    )
 
108
from ..tests import (
 
109
    fixtures,
 
110
    test_server,
 
111
    TestUtil,
 
112
    treeshape,
 
113
    ui_testing,
 
114
    )
 
115
 
 
116
# Mark this python module as being part of the implementation
 
117
# of unittest: this gives us better tracebacks where the last
 
118
# shown frame is the test code, not our assertXYZ.
 
119
__unittest = 1
 
120
 
 
121
default_transport = test_server.LocalURLServer
 
122
 
 
123
 
 
124
_unitialized_attr = object()
 
125
"""A sentinel needed to act as a default value in a method signature."""
 
126
 
 
127
 
 
128
# Subunit result codes, defined here to prevent a hard dependency on subunit.
 
129
SUBUNIT_SEEK_SET = 0
 
130
SUBUNIT_SEEK_CUR = 1
 
131
 
 
132
# These are intentionally brought into this namespace. That way plugins, etc
 
133
# can just "from breezy.tests import TestCase, TestLoader, etc"
 
134
TestSuite = TestUtil.TestSuite
 
135
TestLoader = TestUtil.TestLoader
 
136
 
 
137
# Tests should run in a clean and clearly defined environment. The goal is to
 
138
# keep them isolated from the running environment as mush as possible. The test
 
139
# framework ensures the variables defined below are set (or deleted if the
 
140
# value is None) before a test is run and reset to their original value after
 
141
# the test is run. Generally if some code depends on an environment variable,
 
142
# the tests should start without this variable in the environment. There are a
 
143
# few exceptions but you shouldn't violate this rule lightly.
 
144
isolated_environ = {
 
145
    'BRZ_HOME': None,
 
146
    'HOME': None,
 
147
    'GNUPGHOME': None,
 
148
    'XDG_CONFIG_HOME': None,
 
149
    # brz now uses the Win32 API and doesn't rely on APPDATA, but the
 
150
    # tests do check our impls match APPDATA
 
151
    'BRZ_EDITOR': None,  # test_msgeditor manipulates this variable
 
152
    'VISUAL': None,
 
153
    'EDITOR': None,
 
154
    'BRZ_EMAIL': None,
 
155
    'BZREMAIL': None,  # may still be present in the environment
 
156
    'EMAIL': 'jrandom@example.com',  # set EMAIL as brz does not guess
 
157
    'BRZ_PROGRESS_BAR': None,
 
158
    # This should trap leaks to ~/.brz.log. This occurs when tests use TestCase
 
159
    # as a base class instead of TestCaseInTempDir. Tests inheriting from
 
160
    # TestCase should not use disk resources, BRZ_LOG is one.
 
161
    'BRZ_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
 
162
    'BRZ_PLUGIN_PATH': '-site',
 
163
    'BRZ_DISABLE_PLUGINS': None,
 
164
    'BRZ_PLUGINS_AT': None,
 
165
    'BRZ_CONCURRENCY': None,
 
166
    # Make sure that any text ui tests are consistent regardless of
 
167
    # the environment the test case is run in; you may want tests that
 
168
    # test other combinations.  'dumb' is a reasonable guess for tests
 
169
    # going to a pipe or a BytesIO.
 
170
    'TERM': 'dumb',
 
171
    'LINES': '25',
 
172
    'COLUMNS': '80',
 
173
    'BRZ_COLUMNS': '80',
 
174
    # Disable SSH Agent
 
175
    'SSH_AUTH_SOCK': None,
 
176
    # Proxies
 
177
    'http_proxy': None,
 
178
    'HTTP_PROXY': None,
 
179
    'https_proxy': None,
 
180
    'HTTPS_PROXY': None,
 
181
    'no_proxy': None,
 
182
    'NO_PROXY': None,
 
183
    'all_proxy': None,
 
184
    'ALL_PROXY': None,
 
185
    'BZR_REMOTE_PATH': None,
 
186
    # Generally speaking, we don't want apport reporting on crashes in
 
187
    # the test envirnoment unless we're specifically testing apport,
 
188
    # so that it doesn't leak into the real system environment.  We
 
189
    # use an env var so it propagates to subprocesses.
 
190
    'APPORT_DISABLE': '1',
 
191
    }
 
192
 
 
193
 
 
194
def override_os_environ(test, env=None):
 
195
    """Modify os.environ keeping a copy.
 
196
 
 
197
    :param test: A test instance
 
198
 
 
199
    :param env: A dict containing variable definitions to be installed
 
200
    """
 
201
    if env is None:
 
202
        env = isolated_environ
 
203
    test._original_os_environ = dict(**os.environ)
 
204
    for var in env:
 
205
        osutils.set_or_unset_env(var, env[var])
 
206
        if var not in test._original_os_environ:
 
207
            # The var is new, add it with a value of None, so
 
208
            # restore_os_environ will delete it
 
209
            test._original_os_environ[var] = None
 
210
 
 
211
 
 
212
def restore_os_environ(test):
 
213
    """Restore os.environ to its original state.
 
214
 
 
215
    :param test: A test instance previously passed to override_os_environ.
 
216
    """
 
217
    for var, value in test._original_os_environ.items():
 
218
        # Restore the original value (or delete it if the value has been set to
 
219
        # None in override_os_environ).
 
220
        osutils.set_or_unset_env(var, value)
 
221
 
 
222
 
 
223
def _clear__type_equality_funcs(test):
 
224
    """Cleanup bound methods stored on TestCase instances
 
225
 
 
226
    Clear the dict breaking a few (mostly) harmless cycles in the affected
 
227
    unittests released with Python 2.6 and initial Python 2.7 versions.
 
228
 
 
229
    For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
 
230
    shipped in Oneiric, an object with no clear method was used, hence the
 
231
    extra complications, see bug 809048 for details.
 
232
    """
 
233
    type_equality_funcs = getattr(test, "_type_equality_funcs", None)
 
234
    if type_equality_funcs is not None:
 
235
        tef_clear = getattr(type_equality_funcs, "clear", None)
 
236
        if tef_clear is None:
 
237
            tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
 
238
            if tef_instance_dict is not None:
 
239
                tef_clear = tef_instance_dict.clear
 
240
        if tef_clear is not None:
 
241
            tef_clear()
 
242
 
 
243
 
 
244
class ExtendedTestResult(testtools.TextTestResult):
 
245
    """Accepts, reports and accumulates the results of running tests.
 
246
 
 
247
    Compared to the unittest version this class adds support for
 
248
    profiling, benchmarking, stopping as soon as a test fails,  and
 
249
    skipping tests.  There are further-specialized subclasses for
 
250
    different types of display.
 
251
 
 
252
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
253
    addFailure or addError methods.  These in turn may redirect to a more
 
254
    specific case for the special test results supported by our extended
 
255
    tests.
 
256
 
 
257
    Note that just one of these objects is fed the results from many tests.
 
258
    """
 
259
 
 
260
    stop_early = False
 
261
 
 
262
    def __init__(self, stream, descriptions, verbosity,
 
263
                 bench_history=None,
 
264
                 strict=False,
 
265
                 ):
 
266
        """Construct new TestResult.
 
267
 
 
268
        :param bench_history: Optionally, a writable file object to accumulate
 
269
            benchmark results.
 
270
        """
 
271
        testtools.TextTestResult.__init__(self, stream)
 
272
        if bench_history is not None:
 
273
            from breezy.version import _get_bzr_source_tree
 
274
            src_tree = _get_bzr_source_tree()
 
275
            if src_tree:
 
276
                try:
 
277
                    revision_id = src_tree.get_parent_ids()[0]
 
278
                except IndexError:
 
279
                    # XXX: if this is a brand new tree, do the same as if there
 
280
                    # is no branch.
 
281
                    revision_id = b''
 
282
            else:
 
283
                # XXX: If there's no branch, what should we do?
 
284
                revision_id = b''
 
285
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
286
        self._bench_history = bench_history
 
287
        self.ui = ui.ui_factory
 
288
        self.num_tests = 0
 
289
        self.error_count = 0
 
290
        self.failure_count = 0
 
291
        self.known_failure_count = 0
 
292
        self.skip_count = 0
 
293
        self.not_applicable_count = 0
 
294
        self.unsupported = {}
 
295
        self.count = 0
 
296
        self._overall_start_time = time.time()
 
297
        self._strict = strict
 
298
        self._first_thread_leaker_id = None
 
299
        self._tests_leaking_threads_count = 0
 
300
        self._traceback_from_test = None
 
301
 
 
302
    def stopTestRun(self):
 
303
        run = self.testsRun
 
304
        actionTaken = "Ran"
 
305
        stopTime = time.time()
 
306
        timeTaken = stopTime - self.startTime
 
307
        # GZ 2010-07-19: Seems testtools has no printErrors method, and though
 
308
        #                the parent class method is similar have to duplicate
 
309
        self._show_list('ERROR', self.errors)
 
310
        self._show_list('FAIL', self.failures)
 
311
        self.stream.write(self.sep2)
 
312
        self.stream.write("%s %d test%s in %.3fs\n\n" % (
 
313
            actionTaken, run, run != 1 and "s" or "", timeTaken))
 
314
        if not self.wasSuccessful():
 
315
            self.stream.write("FAILED (")
 
316
            failed, errored = map(len, (self.failures, self.errors))
 
317
            if failed:
 
318
                self.stream.write("failures=%d" % failed)
 
319
            if errored:
 
320
                if failed:
 
321
                    self.stream.write(", ")
 
322
                self.stream.write("errors=%d" % errored)
 
323
            if self.known_failure_count:
 
324
                if failed or errored:
 
325
                    self.stream.write(", ")
 
326
                self.stream.write("known_failure_count=%d" %
 
327
                                  self.known_failure_count)
 
328
            self.stream.write(")\n")
 
329
        else:
 
330
            if self.known_failure_count:
 
331
                self.stream.write("OK (known_failures=%d)\n" %
 
332
                                  self.known_failure_count)
 
333
            else:
 
334
                self.stream.write("OK\n")
 
335
        if self.skip_count > 0:
 
336
            skipped = self.skip_count
 
337
            self.stream.write('%d test%s skipped\n' %
 
338
                              (skipped, skipped != 1 and "s" or ""))
 
339
        if self.unsupported:
 
340
            for feature, count in sorted(self.unsupported.items()):
 
341
                self.stream.write("Missing feature '%s' skipped %d tests.\n" %
 
342
                                  (feature, count))
 
343
        if self._strict:
 
344
            ok = self.wasStrictlySuccessful()
 
345
        else:
 
346
            ok = self.wasSuccessful()
 
347
        if self._first_thread_leaker_id:
 
348
            self.stream.write(
 
349
                '%s is leaking threads among %d leaking tests.\n' % (
 
350
                    self._first_thread_leaker_id,
 
351
                    self._tests_leaking_threads_count))
 
352
            # We don't report the main thread as an active one.
 
353
            self.stream.write(
 
354
                '%d non-main threads were left active in the end.\n'
 
355
                % (len(self._active_threads) - 1))
 
356
 
 
357
    def getDescription(self, test):
 
358
        return test.id()
 
359
 
 
360
    def _extractBenchmarkTime(self, testCase, details=None):
 
361
        """Add a benchmark time for the current test case."""
 
362
        if details and 'benchtime' in details:
 
363
            return float(''.join(details['benchtime'].iter_bytes()))
 
364
        return getattr(testCase, "_benchtime", None)
 
365
 
 
366
    def _delta_to_float(self, a_timedelta, precision):
 
367
        # This calls ceiling to ensure that the most pessimistic view of time
 
368
        # taken is shown (rather than leaving it to the Python %f operator
 
369
        # to decide whether to round/floor/ceiling. This was added when we
 
370
        # had pyp3 test failures that suggest a floor was happening.
 
371
        shift = 10 ** precision
 
372
        return math.ceil(
 
373
            (a_timedelta.days * 86400.0 + a_timedelta.seconds +
 
374
             a_timedelta.microseconds / 1000000.0) * shift) / shift
 
375
 
 
376
    def _elapsedTestTimeString(self):
 
377
        """Return time string for overall time the current test has taken."""
 
378
        return self._formatTime(self._delta_to_float(
 
379
            self._now() - self._start_datetime, 3))
 
380
 
 
381
    def _testTimeString(self, testCase):
 
382
        benchmark_time = self._extractBenchmarkTime(testCase)
 
383
        if benchmark_time is not None:
 
384
            return self._formatTime(benchmark_time) + "*"
 
385
        else:
 
386
            return self._elapsedTestTimeString()
 
387
 
 
388
    def _formatTime(self, seconds):
 
389
        """Format seconds as milliseconds with leading spaces."""
 
390
        # some benchmarks can take thousands of seconds to run, so we need 8
 
391
        # places
 
392
        return "%8dms" % (1000 * seconds)
 
393
 
 
394
    def _shortened_test_description(self, test):
 
395
        what = test.id()
 
396
        what = re.sub(r'^breezy\.tests\.', '', what)
 
397
        return what
 
398
 
 
399
    # GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
 
400
    #                multiple times in a row, because the handler is added for
 
401
    #                each test but the container list is shared between cases.
 
402
    #                See lp:498869 lp:625574 and lp:637725 for background.
 
403
    def _record_traceback_from_test(self, exc_info):
 
404
        """Store the traceback from passed exc_info tuple till"""
 
405
        self._traceback_from_test = exc_info[2]
 
406
 
 
407
    def startTest(self, test):
 
408
        super(ExtendedTestResult, self).startTest(test)
 
409
        if self.count == 0:
 
410
            self.startTests()
 
411
        self.count += 1
 
412
        self.report_test_start(test)
 
413
        test.number = self.count
 
414
        self._recordTestStartTime()
 
415
        # Make testtools cases give us the real traceback on failure
 
416
        addOnException = getattr(test, "addOnException", None)
 
417
        if addOnException is not None:
 
418
            addOnException(self._record_traceback_from_test)
 
419
        # Only check for thread leaks on breezy derived test cases
 
420
        if isinstance(test, TestCase):
 
421
            test.addCleanup(self._check_leaked_threads, test)
 
422
 
 
423
    def stopTest(self, test):
 
424
        super(ExtendedTestResult, self).stopTest(test)
 
425
        # Manually break cycles, means touching various private things but hey
 
426
        getDetails = getattr(test, "getDetails", None)
 
427
        if getDetails is not None:
 
428
            getDetails().clear()
 
429
        _clear__type_equality_funcs(test)
 
430
        self._traceback_from_test = None
 
431
 
 
432
    def startTests(self):
 
433
        self.report_tests_starting()
 
434
        self._active_threads = threading.enumerate()
 
435
 
 
436
    def _check_leaked_threads(self, test):
 
437
        """See if any threads have leaked since last call
 
438
 
 
439
        A sample of live threads is stored in the _active_threads attribute,
 
440
        when this method runs it compares the current live threads and any not
 
441
        in the previous sample are treated as having leaked.
 
442
        """
 
443
        now_active_threads = set(threading.enumerate())
 
444
        threads_leaked = now_active_threads.difference(self._active_threads)
 
445
        if threads_leaked:
 
446
            self._report_thread_leak(test, threads_leaked, now_active_threads)
 
447
            self._tests_leaking_threads_count += 1
 
448
            if self._first_thread_leaker_id is None:
 
449
                self._first_thread_leaker_id = test.id()
 
450
            self._active_threads = now_active_threads
 
451
 
 
452
    def _recordTestStartTime(self):
 
453
        """Record that a test has started."""
 
454
        self._start_datetime = self._now()
 
455
 
 
456
    def addError(self, test, err):
 
457
        """Tell result that test finished with an error.
 
458
 
 
459
        Called from the TestCase run() method when the test
 
460
        fails with an unexpected error.
 
461
        """
 
462
        self._post_mortem(self._traceback_from_test or err[2])
 
463
        super(ExtendedTestResult, self).addError(test, err)
 
464
        self.error_count += 1
 
465
        self.report_error(test, err)
 
466
        if self.stop_early:
 
467
            self.stop()
 
468
 
 
469
    def addFailure(self, test, err):
 
470
        """Tell result that test failed.
 
471
 
 
472
        Called from the TestCase run() method when the test
 
473
        fails because e.g. an assert() method failed.
 
474
        """
 
475
        self._post_mortem(self._traceback_from_test or err[2])
 
476
        super(ExtendedTestResult, self).addFailure(test, err)
 
477
        self.failure_count += 1
 
478
        self.report_failure(test, err)
 
479
        if self.stop_early:
 
480
            self.stop()
 
481
 
 
482
    def addSuccess(self, test, details=None):
 
483
        """Tell result that test completed successfully.
 
484
 
 
485
        Called from the TestCase run()
 
486
        """
 
487
        if self._bench_history is not None:
 
488
            benchmark_time = self._extractBenchmarkTime(test, details)
 
489
            if benchmark_time is not None:
 
490
                self._bench_history.write("%s %s\n" % (
 
491
                    self._formatTime(benchmark_time),
 
492
                    test.id()))
 
493
        self.report_success(test)
 
494
        super(ExtendedTestResult, self).addSuccess(test)
 
495
        test._log_contents = ''
 
496
 
 
497
    def addExpectedFailure(self, test, err):
 
498
        self.known_failure_count += 1
 
499
        self.report_known_failure(test, err)
 
500
 
 
501
    def addUnexpectedSuccess(self, test, details=None):
 
502
        """Tell result the test unexpectedly passed, counting as a failure
 
503
 
 
504
        When the minimum version of testtools required becomes 0.9.8 this
 
505
        can be updated to use the new handling there.
 
506
        """
 
507
        super(ExtendedTestResult, self).addFailure(test, details=details)
 
508
        self.failure_count += 1
 
509
        self.report_unexpected_success(test,
 
510
                                       "".join(details["reason"].iter_text()))
 
511
        if self.stop_early:
 
512
            self.stop()
 
513
 
 
514
    def addNotSupported(self, test, feature):
 
515
        """The test will not be run because of a missing feature.
 
516
        """
 
517
        # this can be called in two different ways: it may be that the
 
518
        # test started running, and then raised (through requireFeature)
 
519
        # UnavailableFeature.  Alternatively this method can be called
 
520
        # while probing for features before running the test code proper; in
 
521
        # that case we will see startTest and stopTest, but the test will
 
522
        # never actually run.
 
523
        self.unsupported.setdefault(str(feature), 0)
 
524
        self.unsupported[str(feature)] += 1
 
525
        self.report_unsupported(test, feature)
 
526
 
 
527
    def addSkip(self, test, reason):
 
528
        """A test has not run for 'reason'."""
 
529
        self.skip_count += 1
 
530
        self.report_skip(test, reason)
 
531
 
 
532
    def addNotApplicable(self, test, reason):
 
533
        self.not_applicable_count += 1
 
534
        self.report_not_applicable(test, reason)
 
535
 
 
536
    def _count_stored_tests(self):
 
537
        """Count of tests instances kept alive due to not succeeding"""
 
538
        return self.error_count + self.failure_count + self.known_failure_count
 
539
 
 
540
    def _post_mortem(self, tb=None):
 
541
        """Start a PDB post mortem session."""
 
542
        if os.environ.get('BRZ_TEST_PDB', None):
 
543
            import pdb
 
544
            pdb.post_mortem(tb)
 
545
 
 
546
    def progress(self, offset, whence):
 
547
        """The test is adjusting the count of tests to run."""
 
548
        if whence == SUBUNIT_SEEK_SET:
 
549
            self.num_tests = offset
 
550
        elif whence == SUBUNIT_SEEK_CUR:
 
551
            self.num_tests += offset
 
552
        else:
 
553
            raise errors.BzrError("Unknown whence %r" % whence)
 
554
 
 
555
    def report_tests_starting(self):
 
556
        """Display information before the test run begins"""
 
557
        if getattr(sys, 'frozen', None) is None:
 
558
            bzr_path = osutils.realpath(sys.argv[0])
 
559
        else:
 
560
            bzr_path = sys.executable
 
561
        self.stream.write(
 
562
            'brz selftest: %s\n' % (bzr_path,))
 
563
        self.stream.write(
 
564
            '   %s\n' % (
 
565
                breezy.__path__[0],))
 
566
        self.stream.write(
 
567
            '   bzr-%s python-%s %s\n' % (
 
568
                breezy.version_string,
 
569
                breezy._format_version_tuple(sys.version_info),
 
570
                platform.platform(aliased=1),
 
571
                ))
 
572
        self.stream.write('\n')
 
573
 
 
574
    def report_test_start(self, test):
 
575
        """Display information on the test just about to be run"""
 
576
 
 
577
    def _report_thread_leak(self, test, leaked_threads, active_threads):
 
578
        """Display information on a test that leaked one or more threads"""
 
579
        # GZ 2010-09-09: A leak summary reported separately from the general
 
580
        #                thread debugging would be nice. Tests under subunit
 
581
        #                need something not using stream, perhaps adding a
 
582
        #                testtools details object would be fitting.
 
583
        if 'threads' in selftest_debug_flags:
 
584
            self.stream.write('%s is leaking, active is now %d\n' %
 
585
                              (test.id(), len(active_threads)))
 
586
 
 
587
    def startTestRun(self):
 
588
        self.startTime = time.time()
 
589
 
 
590
    def report_success(self, test):
 
591
        pass
 
592
 
 
593
    def wasStrictlySuccessful(self):
 
594
        if self.unsupported or self.known_failure_count:
 
595
            return False
 
596
        return self.wasSuccessful()
 
597
 
 
598
 
 
599
class TextTestResult(ExtendedTestResult):
 
600
    """Displays progress and results of tests in text form"""
 
601
 
 
602
    def __init__(self, stream, descriptions, verbosity,
 
603
                 bench_history=None,
 
604
                 strict=None,
 
605
                 ):
 
606
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
607
                                    bench_history, strict)
 
608
        self.pb = self.ui.nested_progress_bar()
 
609
        self.pb.show_pct = False
 
610
        self.pb.show_spinner = False
 
611
        self.pb.show_eta = False,
 
612
        self.pb.show_count = False
 
613
        self.pb.show_bar = False
 
614
        self.pb.update_latency = 0
 
615
        self.pb.show_transport_activity = False
 
616
 
 
617
    def stopTestRun(self):
 
618
        # called when the tests that are going to run have run
 
619
        self.pb.clear()
 
620
        self.pb.finished()
 
621
        super(TextTestResult, self).stopTestRun()
 
622
 
 
623
    def report_tests_starting(self):
 
624
        super(TextTestResult, self).report_tests_starting()
 
625
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
 
626
 
 
627
    def _progress_prefix_text(self):
 
628
        # the longer this text, the less space we have to show the test
 
629
        # name...
 
630
        a = '[%d' % self.count              # total that have been run
 
631
        # tests skipped as known not to be relevant are not important enough
 
632
        # to show here
 
633
        # if self.skip_count:
 
634
        #     a += ', %d skip' % self.skip_count
 
635
        # if self.known_failure_count:
 
636
        #     a += '+%dX' % self.known_failure_count
 
637
        if self.num_tests:
 
638
            a += '/%d' % self.num_tests
 
639
        a += ' in '
 
640
        runtime = time.time() - self._overall_start_time
 
641
        if runtime >= 60:
 
642
            a += '%dm%ds' % (runtime / 60, runtime % 60)
 
643
        else:
 
644
            a += '%ds' % runtime
 
645
        total_fail_count = self.error_count + self.failure_count
 
646
        if total_fail_count:
 
647
            a += ', %d failed' % total_fail_count
 
648
        # if self.unsupported:
 
649
        #     a += ', %d missing' % len(self.unsupported)
 
650
        a += ']'
 
651
        return a
 
652
 
 
653
    def report_test_start(self, test):
 
654
        self.pb.update(
 
655
            self._progress_prefix_text() +
 
656
            ' ' +
 
657
            self._shortened_test_description(test))
 
658
 
 
659
    def _test_description(self, test):
 
660
        return self._shortened_test_description(test)
 
661
 
 
662
    def report_error(self, test, err):
 
663
        self.stream.write('ERROR: %s\n    %s\n' % (
 
664
            self._test_description(test),
 
665
            err[1],
 
666
            ))
 
667
 
 
668
    def report_failure(self, test, err):
 
669
        self.stream.write('FAIL: %s\n    %s\n' % (
 
670
            self._test_description(test),
 
671
            err[1],
 
672
            ))
 
673
 
 
674
    def report_known_failure(self, test, err):
 
675
        pass
 
676
 
 
677
    def report_unexpected_success(self, test, reason):
 
678
        self.stream.write('FAIL: %s\n    %s: %s\n' % (
 
679
            self._test_description(test),
 
680
            "Unexpected success. Should have failed",
 
681
            reason,
 
682
            ))
 
683
 
 
684
    def report_skip(self, test, reason):
 
685
        pass
 
686
 
 
687
    def report_not_applicable(self, test, reason):
 
688
        pass
 
689
 
 
690
    def report_unsupported(self, test, feature):
 
691
        """test cannot be run because feature is missing."""
 
692
 
 
693
 
 
694
class VerboseTestResult(ExtendedTestResult):
 
695
    """Produce long output, with one line per test run plus times"""
 
696
 
 
697
    def _ellipsize_to_right(self, a_string, final_width):
 
698
        """Truncate and pad a string, keeping the right hand side"""
 
699
        if len(a_string) > final_width:
 
700
            result = '...' + a_string[3 - final_width:]
 
701
        else:
 
702
            result = a_string
 
703
        return result.ljust(final_width)
 
704
 
 
705
    def report_tests_starting(self):
 
706
        self.stream.write('running %d tests...\n' % self.num_tests)
 
707
        super(VerboseTestResult, self).report_tests_starting()
 
708
 
 
709
    def report_test_start(self, test):
 
710
        name = self._shortened_test_description(test)
 
711
        width = osutils.terminal_width()
 
712
        if width is not None:
 
713
            # width needs space for 6 char status, plus 1 for slash, plus an
 
714
            # 11-char time string, plus a trailing blank
 
715
            # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
 
716
            # space
 
717
            self.stream.write(self._ellipsize_to_right(name, width - 18))
 
718
        else:
 
719
            self.stream.write(name)
 
720
        self.stream.flush()
 
721
 
 
722
    def _error_summary(self, err):
 
723
        indent = ' ' * 4
 
724
        return '%s%s' % (indent, err[1])
 
725
 
 
726
    def report_error(self, test, err):
 
727
        self.stream.write('ERROR %s\n%s\n'
 
728
                          % (self._testTimeString(test),
 
729
                             self._error_summary(err)))
 
730
 
 
731
    def report_failure(self, test, err):
 
732
        self.stream.write(' FAIL %s\n%s\n'
 
733
                          % (self._testTimeString(test),
 
734
                             self._error_summary(err)))
 
735
 
 
736
    def report_known_failure(self, test, err):
 
737
        self.stream.write('XFAIL %s\n%s\n'
 
738
                          % (self._testTimeString(test),
 
739
                             self._error_summary(err)))
 
740
 
 
741
    def report_unexpected_success(self, test, reason):
 
742
        self.stream.write(' FAIL %s\n%s: %s\n'
 
743
                          % (self._testTimeString(test),
 
744
                             "Unexpected success. Should have failed",
 
745
                             reason))
 
746
 
 
747
    def report_success(self, test):
 
748
        self.stream.write('   OK %s\n' % self._testTimeString(test))
 
749
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
750
            self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
 
751
            stats.pprint(file=self.stream)
 
752
        # flush the stream so that we get smooth output. This verbose mode is
 
753
        # used to show the output in PQM.
 
754
        self.stream.flush()
 
755
 
 
756
    def report_skip(self, test, reason):
 
757
        self.stream.write(' SKIP %s\n%s\n'
 
758
                          % (self._testTimeString(test), reason))
 
759
 
 
760
    def report_not_applicable(self, test, reason):
 
761
        self.stream.write('  N/A %s\n    %s\n'
 
762
                          % (self._testTimeString(test), reason))
 
763
 
 
764
    def report_unsupported(self, test, feature):
 
765
        """test cannot be run because feature is missing."""
 
766
        self.stream.write("NODEP %s\n    The feature '%s' is not available.\n"
 
767
                          % (self._testTimeString(test), feature))
 
768
 
 
769
 
 
770
class TextTestRunner(object):
 
771
    stop_on_failure = False
 
772
 
 
773
    def __init__(self,
 
774
                 stream=sys.stderr,
 
775
                 descriptions=0,
 
776
                 verbosity=1,
 
777
                 bench_history=None,
 
778
                 strict=False,
 
779
                 result_decorators=None,
 
780
                 ):
 
781
        """Create a TextTestRunner.
 
782
 
 
783
        :param result_decorators: An optional list of decorators to apply
 
784
            to the result object being used by the runner. Decorators are
 
785
            applied left to right - the first element in the list is the
 
786
            innermost decorator.
 
787
        """
 
788
        # stream may know claim to know to write unicode strings, but in older
 
789
        # pythons this goes sufficiently wrong that it is a bad idea. (
 
790
        # specifically a built in file with encoding 'UTF-8' will still try
 
791
        # to encode using ascii.
 
792
        new_encoding = osutils.get_terminal_encoding()
 
793
        codec = codecs.lookup(new_encoding)
 
794
        encode = codec.encode
 
795
        # GZ 2010-09-08: Really we don't want to be writing arbitrary bytes,
 
796
        #                so should swap to the plain codecs.StreamWriter
 
797
        stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream,
 
798
                                                     "backslashreplace")
 
799
        stream.encoding = new_encoding
 
800
        self.stream = stream
 
801
        self.descriptions = descriptions
 
802
        self.verbosity = verbosity
 
803
        self._bench_history = bench_history
 
804
        self._strict = strict
 
805
        self._result_decorators = result_decorators or []
 
806
 
 
807
    def run(self, test):
 
808
        "Run the given test case or test suite."
 
809
        if self.verbosity == 1:
 
810
            result_class = TextTestResult
 
811
        elif self.verbosity >= 2:
 
812
            result_class = VerboseTestResult
 
813
        original_result = result_class(self.stream,
 
814
                                       self.descriptions,
 
815
                                       self.verbosity,
 
816
                                       bench_history=self._bench_history,
 
817
                                       strict=self._strict,
 
818
                                       )
 
819
        # Signal to result objects that look at stop early policy to stop,
 
820
        original_result.stop_early = self.stop_on_failure
 
821
        result = original_result
 
822
        for decorator in self._result_decorators:
 
823
            result = decorator(result)
 
824
            result.stop_early = self.stop_on_failure
 
825
        result.startTestRun()
 
826
        try:
 
827
            test.run(result)
 
828
        finally:
 
829
            result.stopTestRun()
 
830
        # higher level code uses our extended protocol to determine
 
831
        # what exit code to give.
 
832
        return original_result
 
833
 
 
834
 
 
835
def iter_suite_tests(suite):
 
836
    """Return all tests in a suite, recursing through nested suites"""
 
837
    if isinstance(suite, unittest.TestCase):
 
838
        yield suite
 
839
    elif isinstance(suite, unittest.TestSuite):
 
840
        for item in suite:
 
841
            for r in iter_suite_tests(item):
 
842
                yield r
 
843
    else:
 
844
        raise Exception('unknown type %r for object %r'
 
845
                        % (type(suite), suite))
 
846
 
 
847
 
 
848
TestSkipped = testtools.testcase.TestSkipped
 
849
 
 
850
 
 
851
class TestNotApplicable(TestSkipped):
 
852
    """A test is not applicable to the situation where it was run.
 
853
 
 
854
    This is only normally raised by parameterized tests, if they find that
 
855
    the instance they're constructed upon does not support one aspect
 
856
    of its interface.
 
857
    """
 
858
 
 
859
 
 
860
# traceback._some_str fails to format exceptions that have the default
 
861
# __str__ which does an implicit ascii conversion. However, repr() on those
 
862
# objects works, for all that its not quite what the doctor may have ordered.
 
863
def _clever_some_str(value):
 
864
    try:
 
865
        return str(value)
 
866
    except BaseException:
 
867
        try:
 
868
            return repr(value).replace('\\n', '\n')
 
869
        except BaseException:
 
870
            return '<unprintable %s object>' % type(value).__name__
 
871
 
 
872
 
 
873
traceback._some_str = _clever_some_str
 
874
 
 
875
 
 
876
# deprecated - use self.knownFailure(), or self.expectFailure.
 
877
KnownFailure = testtools.testcase._ExpectedFailure
 
878
 
 
879
 
 
880
class UnavailableFeature(Exception):
 
881
    """A feature required for this test was not available.
 
882
 
 
883
    This can be considered a specialised form of SkippedTest.
 
884
 
 
885
    The feature should be used to construct the exception.
 
886
    """
 
887
 
 
888
 
 
889
class StringIOWrapper(ui_testing.BytesIOWithEncoding):
 
890
 
 
891
    @symbol_versioning.deprecated_method(
 
892
        symbol_versioning.deprecated_in((3, 0)))
 
893
    def __init__(self, s=None):
 
894
        super(StringIOWrapper, self).__init__(s)
 
895
 
 
896
 
 
897
TestUIFactory = ui_testing.TestUIFactory
 
898
 
 
899
 
 
900
def isolated_doctest_setUp(test):
 
901
    override_os_environ(test)
 
902
    osutils.set_or_unset_env('BRZ_HOME', '/nonexistent')
 
903
    test._orig_ui_factory = ui.ui_factory
 
904
    ui.ui_factory = ui.SilentUIFactory()
 
905
 
 
906
 
 
907
def isolated_doctest_tearDown(test):
 
908
    restore_os_environ(test)
 
909
    ui.ui_factory = test._orig_ui_factory
 
910
 
 
911
 
 
912
def IsolatedDocTestSuite(*args, **kwargs):
 
913
    """Overrides doctest.DocTestSuite to handle isolation.
 
914
 
 
915
    The method is really a factory and users are expected to use it as such.
 
916
    """
 
917
 
 
918
    kwargs['setUp'] = isolated_doctest_setUp
 
919
    kwargs['tearDown'] = isolated_doctest_tearDown
 
920
    return doctest.DocTestSuite(*args, **kwargs)
 
921
 
 
922
 
 
923
class TestCase(testtools.TestCase):
 
924
    """Base class for brz unit tests.
 
925
 
 
926
    Tests that need access to disk resources should subclass
 
927
    TestCaseInTempDir not TestCase.
 
928
 
 
929
    Error and debug log messages are redirected from their usual
 
930
    location into a temporary file, the contents of which can be
 
931
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
932
    so that it can also capture file IO.  When the test completes this file
 
933
    is read into memory and removed from disk.
 
934
 
 
935
    There are also convenience functions to invoke bzr's command-line
 
936
    routine, and to build and check brz trees.
 
937
 
 
938
    In addition to the usual method of overriding tearDown(), this class also
 
939
    allows subclasses to register cleanup functions via addCleanup, which are
 
940
    run in order as the object is torn down.  It's less likely this will be
 
941
    accidentally overlooked.
 
942
    """
 
943
 
 
944
    _log_file = None
 
945
    # record lsprof data when performing benchmark calls.
 
946
    _gather_lsprof_in_benchmarks = False
 
947
 
 
948
    def __init__(self, methodName='testMethod'):
 
949
        super(TestCase, self).__init__(methodName)
 
950
        self._directory_isolation = True
 
951
        self.exception_handlers.insert(
 
952
            0, (UnavailableFeature, self._do_unsupported_or_skip))
 
953
        self.exception_handlers.insert(
 
954
            0, (TestNotApplicable, self._do_not_applicable))
 
955
 
 
956
    def setUp(self):
 
957
        super(TestCase, self).setUp()
 
958
 
 
959
        # At this point we're still accessing the config files in $BRZ_HOME (as
 
960
        # set by the user running selftest).
 
961
        timeout = config.GlobalStack().get('selftest.timeout')
 
962
        if timeout:
 
963
            timeout_fixture = fixtures.TimeoutFixture(timeout)
 
964
            timeout_fixture.setUp()
 
965
            self.addCleanup(timeout_fixture.cleanUp)
 
966
 
 
967
        for feature in getattr(self, '_test_needs_features', []):
 
968
            self.requireFeature(feature)
 
969
        self._cleanEnvironment()
 
970
 
 
971
        self.overrideAttr(breezy.get_global_state(), 'cmdline_overrides',
 
972
                          config.CommandLineStore())
 
973
 
 
974
        self._silenceUI()
 
975
        self._startLogFile()
 
976
        self._benchcalls = []
 
977
        self._benchtime = None
 
978
        self._clear_hooks()
 
979
        self._track_transports()
 
980
        self._track_locks()
 
981
        self._clear_debug_flags()
 
982
        # Isolate global verbosity level, to make sure it's reproducible
 
983
        # between tests.  We should get rid of this altogether: bug 656694. --
 
984
        # mbp 20101008
 
985
        self.overrideAttr(breezy.trace, '_verbosity_level', 0)
 
986
        self._log_files = set()
 
987
        # Each key in the ``_counters`` dict holds a value for a different
 
988
        # counter. When the test ends, addDetail() should be used to output the
 
989
        # counter values. This happens in install_counter_hook().
 
990
        self._counters = {}
 
991
        if 'config_stats' in selftest_debug_flags:
 
992
            self._install_config_stats_hooks()
 
993
        # Do not use i18n for tests (unless the test reverses this)
 
994
        i18n.disable_i18n()
 
995
 
 
996
    def debug(self):
 
997
        # debug a frame up.
 
998
        import pdb
 
999
        # The sys preserved stdin/stdout should allow blackbox tests debugging
 
1000
        pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
 
1001
                ).set_trace(sys._getframe().f_back)
 
1002
 
 
1003
    def discardDetail(self, name):
 
1004
        """Extend the addDetail, getDetails api so we can remove a detail.
 
1005
 
 
1006
        eg. brz always adds the 'log' detail at startup, but we don't want to
 
1007
        include it for skipped, xfail, etc tests.
 
1008
 
 
1009
        It is safe to call this for a detail that doesn't exist, in case this
 
1010
        gets called multiple times.
 
1011
        """
 
1012
        # We cheat. details is stored in __details which means we shouldn't
 
1013
        # touch it. but getDetails() returns the dict directly, so we can
 
1014
        # mutate it.
 
1015
        details = self.getDetails()
 
1016
        if name in details:
 
1017
            del details[name]
 
1018
 
 
1019
    def install_counter_hook(self, hooks, name, counter_name=None):
 
1020
        """Install a counting hook.
 
1021
 
 
1022
        Any hook can be counted as long as it doesn't need to return a value.
 
1023
 
 
1024
        :param hooks: Where the hook should be installed.
 
1025
 
 
1026
        :param name: The hook name that will be counted.
 
1027
 
 
1028
        :param counter_name: The counter identifier in ``_counters``, defaults
 
1029
            to ``name``.
 
1030
        """
 
1031
        _counters = self._counters  # Avoid closing over self
 
1032
        if counter_name is None:
 
1033
            counter_name = name
 
1034
        if counter_name in _counters:
 
1035
            raise AssertionError('%s is already used as a counter name'
 
1036
                                 % (counter_name,))
 
1037
        _counters[counter_name] = 0
 
1038
        self.addDetail(counter_name, content.Content(
 
1039
            content.UTF8_TEXT,
 
1040
            lambda: [b'%d' % (_counters[counter_name],)]))
 
1041
 
 
1042
        def increment_counter(*args, **kwargs):
 
1043
            _counters[counter_name] += 1
 
1044
        label = 'count %s calls' % (counter_name,)
 
1045
        hooks.install_named_hook(name, increment_counter, label)
 
1046
        self.addCleanup(hooks.uninstall_named_hook, name, label)
 
1047
 
 
1048
    def _install_config_stats_hooks(self):
 
1049
        """Install config hooks to count hook calls.
 
1050
 
 
1051
        """
 
1052
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1053
            self.install_counter_hook(config.ConfigHooks, hook_name,
 
1054
                                      'config.%s' % (hook_name,))
 
1055
 
 
1056
        # The OldConfigHooks are private and need special handling to protect
 
1057
        # against recursive tests (tests that run other tests), so we just do
 
1058
        # manually what registering them into _builtin_known_hooks will provide
 
1059
        # us.
 
1060
        self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
 
1061
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1062
            self.install_counter_hook(config.OldConfigHooks, hook_name,
 
1063
                                      'old_config.%s' % (hook_name,))
 
1064
 
 
1065
    def _clear_debug_flags(self):
 
1066
        """Prevent externally set debug flags affecting tests.
 
1067
 
 
1068
        Tests that want to use debug flags can just set them in the
 
1069
        debug_flags set during setup/teardown.
 
1070
        """
 
1071
        # Start with a copy of the current debug flags we can safely modify.
 
1072
        self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
 
1073
        if 'allow_debug' not in selftest_debug_flags:
 
1074
            debug.debug_flags.clear()
 
1075
        if 'disable_lock_checks' not in selftest_debug_flags:
 
1076
            debug.debug_flags.add('strict_locks')
 
1077
 
 
1078
    def _clear_hooks(self):
 
1079
        # prevent hooks affecting tests
 
1080
        known_hooks = hooks.known_hooks
 
1081
        self._preserved_hooks = {}
 
1082
        for key, (parent, name) in known_hooks.iter_parent_objects():
 
1083
            current_hooks = getattr(parent, name)
 
1084
            self._preserved_hooks[parent] = (name, current_hooks)
 
1085
        self._preserved_lazy_hooks = hooks._lazy_hooks
 
1086
        hooks._lazy_hooks = {}
 
1087
        self.addCleanup(self._restoreHooks)
 
1088
        for key, (parent, name) in known_hooks.iter_parent_objects():
 
1089
            factory = known_hooks.get(key)
 
1090
            setattr(parent, name, factory())
 
1091
        # this hook should always be installed
 
1092
        request._install_hook()
 
1093
 
 
1094
    def disable_directory_isolation(self):
 
1095
        """Turn off directory isolation checks."""
 
1096
        self._directory_isolation = False
 
1097
 
 
1098
    def enable_directory_isolation(self):
 
1099
        """Enable directory isolation checks."""
 
1100
        self._directory_isolation = True
 
1101
 
 
1102
    def _silenceUI(self):
 
1103
        """Turn off UI for duration of test"""
 
1104
        # by default the UI is off; tests can turn it on if they want it.
 
1105
        self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
 
1106
 
 
1107
    def _check_locks(self):
 
1108
        """Check that all lock take/release actions have been paired."""
 
1109
        # We always check for mismatched locks. If a mismatch is found, we
 
1110
        # fail unless -Edisable_lock_checks is supplied to selftest, in which
 
1111
        # case we just print a warning.
 
1112
        # unhook:
 
1113
        acquired_locks = [lock for action, lock in self._lock_actions
 
1114
                          if action == 'acquired']
 
1115
        released_locks = [lock for action, lock in self._lock_actions
 
1116
                          if action == 'released']
 
1117
        broken_locks = [lock for action, lock in self._lock_actions
 
1118
                        if action == 'broken']
 
1119
        # trivially, given the tests for lock acquistion and release, if we
 
1120
        # have as many in each list, it should be ok. Some lock tests also
 
1121
        # break some locks on purpose and should be taken into account by
 
1122
        # considering that breaking a lock is just a dirty way of releasing it.
 
1123
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
 
1124
            message = (
 
1125
                'Different number of acquired and '
 
1126
                'released or broken locks.\n'
 
1127
                'acquired=%s\n'
 
1128
                'released=%s\n'
 
1129
                'broken=%s\n' %
 
1130
                (acquired_locks, released_locks, broken_locks))
 
1131
            if not self._lock_check_thorough:
 
1132
                # Rather than fail, just warn
 
1133
                print("Broken test %s: %s" % (self, message))
 
1134
                return
 
1135
            self.fail(message)
 
1136
 
 
1137
    def _track_locks(self):
 
1138
        """Track lock activity during tests."""
 
1139
        self._lock_actions = []
 
1140
        if 'disable_lock_checks' in selftest_debug_flags:
 
1141
            self._lock_check_thorough = False
 
1142
        else:
 
1143
            self._lock_check_thorough = True
 
1144
 
 
1145
        self.addCleanup(self._check_locks)
 
1146
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
 
1147
                                                self._lock_acquired, None)
 
1148
        _mod_lock.Lock.hooks.install_named_hook('lock_released',
 
1149
                                                self._lock_released, None)
 
1150
        _mod_lock.Lock.hooks.install_named_hook('lock_broken',
 
1151
                                                self._lock_broken, None)
 
1152
 
 
1153
    def _lock_acquired(self, result):
 
1154
        self._lock_actions.append(('acquired', result))
 
1155
 
 
1156
    def _lock_released(self, result):
 
1157
        self._lock_actions.append(('released', result))
 
1158
 
 
1159
    def _lock_broken(self, result):
 
1160
        self._lock_actions.append(('broken', result))
 
1161
 
 
1162
    def permit_dir(self, name):
 
1163
        """Permit a directory to be used by this test. See permit_url."""
 
1164
        name_transport = _mod_transport.get_transport_from_path(name)
 
1165
        self.permit_url(name)
 
1166
        self.permit_url(name_transport.base)
 
1167
 
 
1168
    def permit_url(self, url):
 
1169
        """Declare that url is an ok url to use in this test.
 
1170
 
 
1171
        Do this for memory transports, temporary test directory etc.
 
1172
 
 
1173
        Do not do this for the current working directory, /tmp, or any other
 
1174
        preexisting non isolated url.
 
1175
        """
 
1176
        if not url.endswith('/'):
 
1177
            url += '/'
 
1178
        self._bzr_selftest_roots.append(url)
 
1179
 
 
1180
    def permit_source_tree_branch_repo(self):
 
1181
        """Permit the source tree brz is running from to be opened.
 
1182
 
 
1183
        Some code such as breezy.version attempts to read from the brz branch
 
1184
        that brz is executing from (if any). This method permits that directory
 
1185
        to be used in the test suite.
 
1186
        """
 
1187
        path = self.get_source_path()
 
1188
        self.record_directory_isolation()
 
1189
        try:
 
1190
            try:
 
1191
                workingtree.WorkingTree.open(path)
 
1192
            except (errors.NotBranchError, errors.NoWorkingTree):
 
1193
                raise TestSkipped('Needs a working tree of brz sources')
 
1194
        finally:
 
1195
            self.enable_directory_isolation()
 
1196
 
 
1197
    def _preopen_isolate_transport(self, transport):
 
1198
        """Check that all transport openings are done in the test work area."""
 
1199
        while isinstance(transport, pathfilter.PathFilteringTransport):
 
1200
            # Unwrap pathfiltered transports
 
1201
            transport = transport.server.backing_transport.clone(
 
1202
                transport._filter('.'))
 
1203
        url = transport.base
 
1204
        # ReadonlySmartTCPServer_for_testing decorates the backing transport
 
1205
        # urls it is given by prepending readonly+. This is appropriate as the
 
1206
        # client shouldn't know that the server is readonly (or not readonly).
 
1207
        # We could register all servers twice, with readonly+ prepending, but
 
1208
        # that makes for a long list; this is about the same but easier to
 
1209
        # read.
 
1210
        if url.startswith('readonly+'):
 
1211
            url = url[len('readonly+'):]
 
1212
        self._preopen_isolate_url(url)
 
1213
 
 
1214
    def _preopen_isolate_url(self, url):
 
1215
        if not self._directory_isolation:
 
1216
            return
 
1217
        if self._directory_isolation == 'record':
 
1218
            self._bzr_selftest_roots.append(url)
 
1219
            return
 
1220
        # This prevents all transports, including e.g. sftp ones backed on disk
 
1221
        # from working unless they are explicitly granted permission. We then
 
1222
        # depend on the code that sets up test transports to check that they
 
1223
        # are appropriately isolated and enable their use by calling
 
1224
        # self.permit_transport()
 
1225
        if not osutils.is_inside_any(self._bzr_selftest_roots, url):
 
1226
            raise errors.BzrError("Attempt to escape test isolation: %r %r"
 
1227
                                  % (url, self._bzr_selftest_roots))
 
1228
 
 
1229
    def record_directory_isolation(self):
 
1230
        """Gather accessed directories to permit later access.
 
1231
 
 
1232
        This is used for tests that access the branch brz is running from.
 
1233
        """
 
1234
        self._directory_isolation = "record"
 
1235
 
 
1236
    def start_server(self, transport_server, backing_server=None):
 
1237
        """Start transport_server for this test.
 
1238
 
 
1239
        This starts the server, registers a cleanup for it and permits the
 
1240
        server's urls to be used.
 
1241
        """
 
1242
        if backing_server is None:
 
1243
            transport_server.start_server()
 
1244
        else:
 
1245
            transport_server.start_server(backing_server)
 
1246
        self.addCleanup(transport_server.stop_server)
 
1247
        # Obtain a real transport because if the server supplies a password, it
 
1248
        # will be hidden from the base on the client side.
 
1249
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
 
1250
        # Some transport servers effectively chroot the backing transport;
 
1251
        # others like SFTPServer don't - users of the transport can walk up the
 
1252
        # transport to read the entire backing transport. This wouldn't matter
 
1253
        # except that the workdir tests are given - and that they expect the
 
1254
        # server's url to point at - is one directory under the safety net. So
 
1255
        # Branch operations into the transport will attempt to walk up one
 
1256
        # directory. Chrooting all servers would avoid this but also mean that
 
1257
        # we wouldn't be testing directly against non-root urls. Alternatively
 
1258
        # getting the test framework to start the server with a backing server
 
1259
        # at the actual safety net directory would work too, but this then
 
1260
        # means that the self.get_url/self.get_transport methods would need
 
1261
        # to transform all their results. On balance its cleaner to handle it
 
1262
        # here, and permit a higher url when we have one of these transports.
 
1263
        if t.base.endswith('/work/'):
 
1264
            # we have safety net/test root/work
 
1265
            t = t.clone('../..')
 
1266
        elif isinstance(transport_server,
 
1267
                        test_server.SmartTCPServer_for_testing):
 
1268
            # The smart server adds a path similar to work, which is traversed
 
1269
            # up from by the client. But the server is chrooted - the actual
 
1270
            # backing transport is not escaped from, and VFS requests to the
 
1271
            # root will error (because they try to escape the chroot).
 
1272
            t2 = t.clone('..')
 
1273
            while t2.base != t.base:
 
1274
                t = t2
 
1275
                t2 = t.clone('..')
 
1276
        self.permit_url(t.base)
 
1277
 
 
1278
    def _track_transports(self):
 
1279
        """Install checks for transport usage."""
 
1280
        # TestCase has no safe place it can write to.
 
1281
        self._bzr_selftest_roots = []
 
1282
        # Currently the easiest way to be sure that nothing is going on is to
 
1283
        # hook into brz dir opening. This leaves a small window of error for
 
1284
        # transport tests, but they are well known, and we can improve on this
 
1285
        # step.
 
1286
        controldir.ControlDir.hooks.install_named_hook(
 
1287
            "pre_open", self._preopen_isolate_transport,
 
1288
            "Check brz directories are safe.")
 
1289
 
 
1290
    def _ndiff_strings(self, a, b):
 
1291
        """Return ndiff between two strings containing lines.
 
1292
 
 
1293
        A trailing newline is added if missing to make the strings
 
1294
        print properly."""
 
1295
        if b and not b.endswith('\n'):
 
1296
            b += '\n'
 
1297
        if a and not a.endswith('\n'):
 
1298
            a += '\n'
 
1299
        difflines = difflib.ndiff(a.splitlines(True),
 
1300
                                  b.splitlines(True),
 
1301
                                  linejunk=lambda x: False,
 
1302
                                  charjunk=lambda x: False)
 
1303
        return ''.join(difflines)
 
1304
 
 
1305
    def assertEqual(self, a, b, message=''):
 
1306
        try:
 
1307
            if a == b:
 
1308
                return
 
1309
        except UnicodeError as e:
 
1310
            # If we can't compare without getting a UnicodeError, then
 
1311
            # obviously they are different
 
1312
            trace.mutter('UnicodeError: %s', e)
 
1313
        if message:
 
1314
            message += '\n'
 
1315
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
1316
                             % (message,
 
1317
                                pprint.pformat(a), pprint.pformat(b)))
 
1318
 
 
1319
    # FIXME: This is deprecated in unittest2 but plugins may still use it so we
 
1320
    # need a deprecation period for them -- vila 2016-02-01
 
1321
    assertEquals = assertEqual
 
1322
 
 
1323
    def assertEqualDiff(self, a, b, message=None):
 
1324
        """Assert two texts are equal, if not raise an exception.
 
1325
 
 
1326
        This is intended for use with multi-line strings where it can
 
1327
        be hard to find the differences by eye.
 
1328
        """
 
1329
        # TODO: perhaps override assertEqual to call this for strings?
 
1330
        if a == b:
 
1331
            return
 
1332
        if message is None:
 
1333
            message = "texts not equal:\n"
 
1334
        if a + ('\n' if isinstance(a, text_type) else b'\n') == b:
 
1335
            message = 'first string is missing a final newline.\n'
 
1336
        if a == b + ('\n' if isinstance(b, text_type) else b'\n'):
 
1337
            message = 'second string is missing a final newline.\n'
 
1338
        raise AssertionError(message
 
1339
                             + self._ndiff_strings(a, b))
 
1340
 
 
1341
    def assertEqualMode(self, mode, mode_test):
 
1342
        self.assertEqual(mode, mode_test,
 
1343
                         'mode mismatch %o != %o' % (mode, mode_test))
 
1344
 
 
1345
    def assertEqualStat(self, expected, actual):
 
1346
        """assert that expected and actual are the same stat result.
 
1347
 
 
1348
        :param expected: A stat result.
 
1349
        :param actual: A stat result.
 
1350
        :raises AssertionError: If the expected and actual stat values differ
 
1351
            other than by atime.
 
1352
        """
 
1353
        self.assertEqual(expected.st_size, actual.st_size,
 
1354
                         'st_size did not match')
 
1355
        self.assertEqual(expected.st_mtime, actual.st_mtime,
 
1356
                         'st_mtime did not match')
 
1357
        self.assertEqual(expected.st_ctime, actual.st_ctime,
 
1358
                         'st_ctime did not match')
 
1359
        if sys.platform == 'win32':
 
1360
            # On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
 
1361
            # is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
 
1362
            # odd. We just force it to always be 0 to avoid any problems.
 
1363
            self.assertEqual(0, expected.st_dev)
 
1364
            self.assertEqual(0, actual.st_dev)
 
1365
            self.assertEqual(0, expected.st_ino)
 
1366
            self.assertEqual(0, actual.st_ino)
 
1367
        else:
 
1368
            self.assertEqual(expected.st_dev, actual.st_dev,
 
1369
                             'st_dev did not match')
 
1370
            self.assertEqual(expected.st_ino, actual.st_ino,
 
1371
                             'st_ino did not match')
 
1372
        self.assertEqual(expected.st_mode, actual.st_mode,
 
1373
                         'st_mode did not match')
 
1374
 
 
1375
    def assertLength(self, length, obj_with_len):
 
1376
        """Assert that obj_with_len is of length length."""
 
1377
        if len(obj_with_len) != length:
 
1378
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
 
1379
                length, len(obj_with_len), obj_with_len))
 
1380
 
 
1381
    def assertLogsError(self, exception_class, func, *args, **kwargs):
 
1382
        """Assert that `func(*args, **kwargs)` quietly logs a specific error.
 
1383
        """
 
1384
        captured = []
 
1385
        orig_log_exception_quietly = trace.log_exception_quietly
 
1386
        try:
 
1387
            def capture():
 
1388
                orig_log_exception_quietly()
 
1389
                captured.append(sys.exc_info()[1])
 
1390
            trace.log_exception_quietly = capture
 
1391
            func(*args, **kwargs)
 
1392
        finally:
 
1393
            trace.log_exception_quietly = orig_log_exception_quietly
 
1394
        self.assertLength(1, captured)
 
1395
        err = captured[0]
 
1396
        self.assertIsInstance(err, exception_class)
 
1397
        return err
 
1398
 
 
1399
    def assertPositive(self, val):
 
1400
        """Assert that val is greater than 0."""
 
1401
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
1402
 
 
1403
    def assertNegative(self, val):
 
1404
        """Assert that val is less than 0."""
 
1405
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
1406
 
 
1407
    def assertStartsWith(self, s, prefix):
 
1408
        if not s.startswith(prefix):
 
1409
            raise AssertionError(
 
1410
                'string %r does not start with %r' % (s, prefix))
 
1411
 
 
1412
    def assertEndsWith(self, s, suffix):
 
1413
        """Asserts that s ends with suffix."""
 
1414
        if not s.endswith(suffix):
 
1415
            raise AssertionError(
 
1416
                'string %r does not end with %r' % (s, suffix))
 
1417
 
 
1418
    def assertContainsRe(self, haystack, needle_re, flags=0):
 
1419
        """Assert that a contains something matching a regular expression."""
 
1420
        if not re.search(needle_re, haystack, flags):
 
1421
            if (('\n' if isinstance(haystack, str) else b'\n') in haystack or
 
1422
                    len(haystack) > 60):
 
1423
                # a long string, format it in a more readable way
 
1424
                raise AssertionError(
 
1425
                    'pattern "%s" not found in\n"""\\\n%s"""\n'
 
1426
                    % (needle_re, haystack))
 
1427
            else:
 
1428
                raise AssertionError('pattern "%s" not found in "%s"'
 
1429
                                     % (needle_re, haystack))
 
1430
 
 
1431
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
 
1432
        """Assert that a does not match a regular expression"""
 
1433
        if re.search(needle_re, haystack, flags):
 
1434
            raise AssertionError('pattern "%s" found in "%s"'
 
1435
                                 % (needle_re, haystack))
 
1436
 
 
1437
    def assertContainsString(self, haystack, needle):
 
1438
        if haystack.find(needle) == -1:
 
1439
            self.fail("string %r not found in '''%s'''" % (needle, haystack))
 
1440
 
 
1441
    def assertNotContainsString(self, haystack, needle):
 
1442
        if haystack.find(needle) != -1:
 
1443
            self.fail("string %r found in '''%s'''" % (needle, haystack))
 
1444
 
 
1445
    def assertSubset(self, sublist, superlist):
 
1446
        """Assert that every entry in sublist is present in superlist."""
 
1447
        missing = set(sublist) - set(superlist)
 
1448
        if len(missing) > 0:
 
1449
            raise AssertionError("value(s) %r not present in container %r" %
 
1450
                                 (missing, superlist))
 
1451
 
 
1452
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
1453
        """Fail unless excClass is raised when the iterator from func is used.
 
1454
 
 
1455
        Many functions can return generators this makes sure
 
1456
        to wrap them in a list() call to make sure the whole generator
 
1457
        is run, and that the proper exception is raised.
 
1458
        """
 
1459
        try:
 
1460
            list(func(*args, **kwargs))
 
1461
        except excClass as e:
 
1462
            return e
 
1463
        else:
 
1464
            if getattr(excClass, '__name__', None) is not None:
 
1465
                excName = excClass.__name__
 
1466
            else:
 
1467
                excName = str(excClass)
 
1468
            raise self.failureException("%s not raised" % excName)
 
1469
 
 
1470
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
1471
        """Assert that a callable raises a particular exception.
 
1472
 
 
1473
        :param excClass: As for the except statement, this may be either an
 
1474
            exception class, or a tuple of classes.
 
1475
        :param callableObj: A callable, will be passed ``*args`` and
 
1476
            ``**kwargs``.
 
1477
 
 
1478
        Returns the exception so that you can examine it.
 
1479
        """
 
1480
        try:
 
1481
            callableObj(*args, **kwargs)
 
1482
        except excClass as e:
 
1483
            return e
 
1484
        else:
 
1485
            if getattr(excClass, '__name__', None) is not None:
 
1486
                excName = excClass.__name__
 
1487
            else:
 
1488
                # probably a tuple
 
1489
                excName = str(excClass)
 
1490
            raise self.failureException("%s not raised" % excName)
 
1491
 
 
1492
    def assertIs(self, left, right, message=None):
 
1493
        if not (left is right):
 
1494
            if message is not None:
 
1495
                raise AssertionError(message)
 
1496
            else:
 
1497
                raise AssertionError("%r is not %r." % (left, right))
 
1498
 
 
1499
    def assertIsNot(self, left, right, message=None):
 
1500
        if (left is right):
 
1501
            if message is not None:
 
1502
                raise AssertionError(message)
 
1503
            else:
 
1504
                raise AssertionError("%r is %r." % (left, right))
 
1505
 
 
1506
    def assertTransportMode(self, transport, path, mode):
 
1507
        """Fail if a path does not have mode "mode".
 
1508
 
 
1509
        If modes are not supported on this transport, the assertion is ignored.
 
1510
        """
 
1511
        if not transport._can_roundtrip_unix_modebits():
 
1512
            return
 
1513
        path_stat = transport.stat(path)
 
1514
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
1515
        self.assertEqual(mode, actual_mode,
 
1516
                         'mode of %r incorrect (%s != %s)'
 
1517
                         % (path, oct(mode), oct(actual_mode)))
 
1518
 
 
1519
    def assertIsSameRealPath(self, path1, path2):
 
1520
        """Fail if path1 and path2 points to different files"""
 
1521
        self.assertEqual(osutils.realpath(path1),
 
1522
                         osutils.realpath(path2),
 
1523
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
1524
 
 
1525
    def assertIsInstance(self, obj, kls, msg=None):
 
1526
        """Fail if obj is not an instance of kls
 
1527
 
 
1528
        :param msg: Supplementary message to show if the assertion fails.
 
1529
        """
 
1530
        if not isinstance(obj, kls):
 
1531
            m = "%r is an instance of %s rather than %s" % (
 
1532
                obj, obj.__class__, kls)
 
1533
            if msg:
 
1534
                m += ": " + msg
 
1535
            self.fail(m)
 
1536
 
 
1537
    def assertFileEqual(self, content, path):
 
1538
        """Fail if path does not contain 'content'."""
 
1539
        self.assertPathExists(path)
 
1540
 
 
1541
        mode = 'r' + ('b' if isinstance(content, bytes) else '')
 
1542
        with open(path, mode) as f:
 
1543
            s = f.read()
 
1544
        self.assertEqualDiff(content, s)
 
1545
 
 
1546
    def assertDocstring(self, expected_docstring, obj):
 
1547
        """Fail if obj does not have expected_docstring"""
 
1548
        if __doc__ is None:
 
1549
            # With -OO the docstring should be None instead
 
1550
            self.assertIs(obj.__doc__, None)
 
1551
        else:
 
1552
            self.assertEqual(expected_docstring, obj.__doc__)
 
1553
 
 
1554
    def assertPathExists(self, path):
 
1555
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1556
        # TODO(jelmer): Clean this up for pad.lv/1696545
 
1557
        if not isinstance(path, (bytes, str, text_type)):
 
1558
            for p in path:
 
1559
                self.assertPathExists(p)
 
1560
        else:
 
1561
            self.assertTrue(osutils.lexists(path),
 
1562
                            path + " does not exist")
 
1563
 
 
1564
    def assertPathDoesNotExist(self, path):
 
1565
        """Fail if path or paths, which may be abs or relative, exist."""
 
1566
        if not isinstance(path, (str, text_type)):
 
1567
            for p in path:
 
1568
                self.assertPathDoesNotExist(p)
 
1569
        else:
 
1570
            self.assertFalse(osutils.lexists(path),
 
1571
                             path + " exists")
 
1572
 
 
1573
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1574
        """A helper for callDeprecated and applyDeprecated.
 
1575
 
 
1576
        :param a_callable: A callable to call.
 
1577
        :param args: The positional arguments for the callable
 
1578
        :param kwargs: The keyword arguments for the callable
 
1579
        :return: A tuple (warnings, result). result is the result of calling
 
1580
            a_callable(``*args``, ``**kwargs``).
 
1581
        """
 
1582
        local_warnings = []
 
1583
 
 
1584
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1585
            # we've hooked into a deprecation specific callpath,
 
1586
            # only deprecations should getting sent via it.
 
1587
            self.assertEqual(cls, DeprecationWarning)
 
1588
            local_warnings.append(msg)
 
1589
        original_warning_method = symbol_versioning.warn
 
1590
        symbol_versioning.set_warning_method(capture_warnings)
 
1591
        try:
 
1592
            result = a_callable(*args, **kwargs)
 
1593
        finally:
 
1594
            symbol_versioning.set_warning_method(original_warning_method)
 
1595
        return (local_warnings, result)
 
1596
 
 
1597
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1598
        """Call a deprecated callable without warning the user.
 
1599
 
 
1600
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1601
        not other callers that go direct to the warning module.
 
1602
 
 
1603
        To test that a deprecated method raises an error, do something like
 
1604
        this (remember that both assertRaises and applyDeprecated delays *args
 
1605
        and **kwargs passing)::
 
1606
 
 
1607
            self.assertRaises(errors.ReservedId,
 
1608
                self.applyDeprecated,
 
1609
                deprecated_in((1, 5, 0)),
 
1610
                br.append_revision,
 
1611
                'current:')
 
1612
 
 
1613
        :param deprecation_format: The deprecation format that the callable
 
1614
            should have been deprecated with. This is the same type as the
 
1615
            parameter to deprecated_method/deprecated_function. If the
 
1616
            callable is not deprecated with this format, an assertion error
 
1617
            will be raised.
 
1618
        :param a_callable: A callable to call. This may be a bound method or
 
1619
            a regular function. It will be called with ``*args`` and
 
1620
            ``**kwargs``.
 
1621
        :param args: The positional arguments for the callable
 
1622
        :param kwargs: The keyword arguments for the callable
 
1623
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1624
        """
 
1625
        call_warnings, result = self._capture_deprecation_warnings(
 
1626
            a_callable, *args, **kwargs)
 
1627
        expected_first_warning = symbol_versioning.deprecation_string(
 
1628
            a_callable, deprecation_format)
 
1629
        if len(call_warnings) == 0:
 
1630
            self.fail("No deprecation warning generated by call to %s" %
 
1631
                      a_callable)
 
1632
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1633
        return result
 
1634
 
 
1635
    def callCatchWarnings(self, fn, *args, **kw):
 
1636
        """Call a callable that raises python warnings.
 
1637
 
 
1638
        The caller's responsible for examining the returned warnings.
 
1639
 
 
1640
        If the callable raises an exception, the exception is not
 
1641
        caught and propagates up to the caller.  In that case, the list
 
1642
        of warnings is not available.
 
1643
 
 
1644
        :returns: ([warning_object, ...], fn_result)
 
1645
        """
 
1646
        # XXX: This is not perfect, because it completely overrides the
 
1647
        # warnings filters, and some code may depend on suppressing particular
 
1648
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1649
        # though.  -- Andrew, 20071062
 
1650
        wlist = []
 
1651
 
 
1652
        def _catcher(message, category, filename, lineno, file=None, line=None):
 
1653
            # despite the name, 'message' is normally(?) a Warning subclass
 
1654
            # instance
 
1655
            wlist.append(message)
 
1656
        saved_showwarning = warnings.showwarning
 
1657
        saved_filters = warnings.filters
 
1658
        try:
 
1659
            warnings.showwarning = _catcher
 
1660
            warnings.filters = []
 
1661
            result = fn(*args, **kw)
 
1662
        finally:
 
1663
            warnings.showwarning = saved_showwarning
 
1664
            warnings.filters = saved_filters
 
1665
        return wlist, result
 
1666
 
 
1667
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1668
        """Assert that a callable is deprecated in a particular way.
 
1669
 
 
1670
        This is a very precise test for unusual requirements. The
 
1671
        applyDeprecated helper function is probably more suited for most tests
 
1672
        as it allows you to simply specify the deprecation format being used
 
1673
        and will ensure that that is issued for the function being called.
 
1674
 
 
1675
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1676
        not other callers that go direct to the warning module.  To catch
 
1677
        general warnings, use callCatchWarnings.
 
1678
 
 
1679
        :param expected: a list of the deprecation warnings expected, in order
 
1680
        :param callable: The callable to call
 
1681
        :param args: The positional arguments for the callable
 
1682
        :param kwargs: The keyword arguments for the callable
 
1683
        """
 
1684
        call_warnings, result = self._capture_deprecation_warnings(
 
1685
            callable, *args, **kwargs)
 
1686
        self.assertEqual(expected, call_warnings)
 
1687
        return result
 
1688
 
 
1689
    def _startLogFile(self):
 
1690
        """Setup a in-memory target for bzr and testcase log messages"""
 
1691
        pseudo_log_file = BytesIO()
 
1692
 
 
1693
        def _get_log_contents_for_weird_testtools_api():
 
1694
            return [pseudo_log_file.getvalue().decode(
 
1695
                "utf-8", "replace").encode("utf-8")]
 
1696
        self.addDetail(
 
1697
            "log", content.Content(
 
1698
                content.ContentType("text", "plain", {"charset": "utf8"}),
 
1699
                _get_log_contents_for_weird_testtools_api))
 
1700
        self._log_file = pseudo_log_file
 
1701
        self._log_memento = trace.push_log_file(self._log_file)
 
1702
        self.addCleanup(self._finishLogFile)
 
1703
 
 
1704
    def _finishLogFile(self):
 
1705
        """Flush and dereference the in-memory log for this testcase"""
 
1706
        if trace._trace_file:
 
1707
            # flush the log file, to get all content
 
1708
            trace._trace_file.flush()
 
1709
        trace.pop_log_file(self._log_memento)
 
1710
        # The logging module now tracks references for cleanup so discard ours
 
1711
        del self._log_memento
 
1712
 
 
1713
    def thisFailsStrictLockCheck(self):
 
1714
        """It is known that this test would fail with -Dstrict_locks.
 
1715
 
 
1716
        By default, all tests are run with strict lock checking unless
 
1717
        -Edisable_lock_checks is supplied. However there are some tests which
 
1718
        we know fail strict locks at this point that have not been fixed.
 
1719
        They should call this function to disable the strict checking.
 
1720
 
 
1721
        This should be used sparingly, it is much better to fix the locking
 
1722
        issues rather than papering over the problem by calling this function.
 
1723
        """
 
1724
        debug.debug_flags.discard('strict_locks')
 
1725
 
 
1726
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
 
1727
        """Overrides an object attribute restoring it after the test.
 
1728
 
 
1729
        :note: This should be used with discretion; you should think about
 
1730
        whether it's better to make the code testable without monkey-patching.
 
1731
 
 
1732
        :param obj: The object that will be mutated.
 
1733
 
 
1734
        :param attr_name: The attribute name we want to preserve/override in
 
1735
            the object.
 
1736
 
 
1737
        :param new: The optional value we want to set the attribute to.
 
1738
 
 
1739
        :returns: The actual attr value.
 
1740
        """
 
1741
        # The actual value is captured by the call below
 
1742
        value = getattr(obj, attr_name, _unitialized_attr)
 
1743
        if value is _unitialized_attr:
 
1744
            # When the test completes, the attribute should not exist, but if
 
1745
            # we aren't setting a value, we don't need to do anything.
 
1746
            if new is not _unitialized_attr:
 
1747
                self.addCleanup(delattr, obj, attr_name)
 
1748
        else:
 
1749
            self.addCleanup(setattr, obj, attr_name, value)
 
1750
        if new is not _unitialized_attr:
 
1751
            setattr(obj, attr_name, new)
 
1752
        return value
 
1753
 
 
1754
    def overrideEnv(self, name, new):
 
1755
        """Set an environment variable, and reset it after the test.
 
1756
 
 
1757
        :param name: The environment variable name.
 
1758
 
 
1759
        :param new: The value to set the variable to. If None, the
 
1760
            variable is deleted from the environment.
 
1761
 
 
1762
        :returns: The actual variable value.
 
1763
        """
 
1764
        value = osutils.set_or_unset_env(name, new)
 
1765
        self.addCleanup(osutils.set_or_unset_env, name, value)
 
1766
        return value
 
1767
 
 
1768
    def recordCalls(self, obj, attr_name):
 
1769
        """Monkeypatch in a wrapper that will record calls.
 
1770
 
 
1771
        The monkeypatch is automatically removed when the test concludes.
 
1772
 
 
1773
        :param obj: The namespace holding the reference to be replaced;
 
1774
            typically a module, class, or object.
 
1775
        :param attr_name: A string for the name of the attribute to patch.
 
1776
        :returns: A list that will be extended with one item every time the
 
1777
            function is called, with a tuple of (args, kwargs).
 
1778
        """
 
1779
        calls = []
 
1780
 
 
1781
        def decorator(*args, **kwargs):
 
1782
            calls.append((args, kwargs))
 
1783
            return orig(*args, **kwargs)
 
1784
        orig = self.overrideAttr(obj, attr_name, decorator)
 
1785
        return calls
 
1786
 
 
1787
    def _cleanEnvironment(self):
 
1788
        for name, value in isolated_environ.items():
 
1789
            self.overrideEnv(name, value)
 
1790
 
 
1791
    def _restoreHooks(self):
 
1792
        for klass, (name, hooks) in self._preserved_hooks.items():
 
1793
            setattr(klass, name, hooks)
 
1794
        self._preserved_hooks.clear()
 
1795
        breezy.hooks._lazy_hooks = self._preserved_lazy_hooks
 
1796
        self._preserved_lazy_hooks.clear()
 
1797
 
 
1798
    def knownFailure(self, reason):
 
1799
        """Declare that this test fails for a known reason
 
1800
 
 
1801
        Tests that are known to fail should generally be using expectedFailure
 
1802
        with an appropriate reverse assertion if a change could cause the test
 
1803
        to start passing. Conversely if the test has no immediate prospect of
 
1804
        succeeding then using skip is more suitable.
 
1805
 
 
1806
        When this method is called while an exception is being handled, that
 
1807
        traceback will be used, otherwise a new exception will be thrown to
 
1808
        provide one but won't be reported.
 
1809
        """
 
1810
        self._add_reason(reason)
 
1811
        try:
 
1812
            exc_info = sys.exc_info()
 
1813
            if exc_info != (None, None, None):
 
1814
                self._report_traceback(exc_info)
 
1815
            else:
 
1816
                try:
 
1817
                    raise self.failureException(reason)
 
1818
                except self.failureException:
 
1819
                    exc_info = sys.exc_info()
 
1820
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
 
1821
            raise testtools.testcase._ExpectedFailure(exc_info)
 
1822
        finally:
 
1823
            del exc_info
 
1824
 
 
1825
    def _suppress_log(self):
 
1826
        """Remove the log info from details."""
 
1827
        self.discardDetail('log')
 
1828
 
 
1829
    def _do_skip(self, result, reason):
 
1830
        self._suppress_log()
 
1831
        addSkip = getattr(result, 'addSkip', None)
 
1832
        if not callable(addSkip):
 
1833
            result.addSuccess(result)
 
1834
        else:
 
1835
            addSkip(self, str(reason))
 
1836
 
 
1837
    @staticmethod
 
1838
    def _do_known_failure(self, result, e):
 
1839
        self._suppress_log()
 
1840
        err = sys.exc_info()
 
1841
        addExpectedFailure = getattr(result, 'addExpectedFailure', None)
 
1842
        if addExpectedFailure is not None:
 
1843
            addExpectedFailure(self, err)
 
1844
        else:
 
1845
            result.addSuccess(self)
 
1846
 
 
1847
    @staticmethod
 
1848
    def _do_not_applicable(self, result, e):
 
1849
        if not e.args:
 
1850
            reason = 'No reason given'
 
1851
        else:
 
1852
            reason = e.args[0]
 
1853
        self._suppress_log()
 
1854
        addNotApplicable = getattr(result, 'addNotApplicable', None)
 
1855
        if addNotApplicable is not None:
 
1856
            result.addNotApplicable(self, reason)
 
1857
        else:
 
1858
            self._do_skip(result, reason)
 
1859
 
 
1860
    @staticmethod
 
1861
    def _report_skip(self, result, err):
 
1862
        """Override the default _report_skip.
 
1863
 
 
1864
        We want to strip the 'log' detail. If we waint until _do_skip, it has
 
1865
        already been formatted into the 'reason' string, and we can't pull it
 
1866
        out again.
 
1867
        """
 
1868
        self._suppress_log()
 
1869
        super(TestCase, self)._report_skip(self, result, err)
 
1870
 
 
1871
    @staticmethod
 
1872
    def _report_expected_failure(self, result, err):
 
1873
        """Strip the log.
 
1874
 
 
1875
        See _report_skip for motivation.
 
1876
        """
 
1877
        self._suppress_log()
 
1878
        super(TestCase, self)._report_expected_failure(self, result, err)
 
1879
 
 
1880
    @staticmethod
 
1881
    def _do_unsupported_or_skip(self, result, e):
 
1882
        reason = e.args[0]
 
1883
        self._suppress_log()
 
1884
        addNotSupported = getattr(result, 'addNotSupported', None)
 
1885
        if addNotSupported is not None:
 
1886
            result.addNotSupported(self, reason)
 
1887
        else:
 
1888
            self._do_skip(result, reason)
 
1889
 
 
1890
    def time(self, callable, *args, **kwargs):
 
1891
        """Run callable and accrue the time it takes to the benchmark time.
 
1892
 
 
1893
        If lsprofiling is enabled (i.e. by --lsprof-time to brz selftest) then
 
1894
        this will cause lsprofile statistics to be gathered and stored in
 
1895
        self._benchcalls.
 
1896
        """
 
1897
        if self._benchtime is None:
 
1898
            self.addDetail('benchtime', content.Content(
 
1899
                content.UTF8_TEXT,
 
1900
                lambda: [str(self._benchtime).encode('utf-8')]))
 
1901
            self._benchtime = 0
 
1902
        start = time.time()
 
1903
        try:
 
1904
            if not self._gather_lsprof_in_benchmarks:
 
1905
                return callable(*args, **kwargs)
 
1906
            else:
 
1907
                # record this benchmark
 
1908
                ret, stats = breezy.lsprof.profile(callable, *args, **kwargs)
 
1909
                stats.sort()
 
1910
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1911
                return ret
 
1912
        finally:
 
1913
            self._benchtime += time.time() - start
 
1914
 
 
1915
    def log(self, *args):
 
1916
        trace.mutter(*args)
 
1917
 
 
1918
    def get_log(self):
 
1919
        """Get a unicode string containing the log from breezy.trace.
 
1920
 
 
1921
        Undecodable characters are replaced.
 
1922
        """
 
1923
        return u"".join(self.getDetails()['log'].iter_text())
 
1924
 
 
1925
    def requireFeature(self, feature):
 
1926
        """This test requires a specific feature is available.
 
1927
 
 
1928
        :raises UnavailableFeature: When feature is not available.
 
1929
        """
 
1930
        if not feature.available():
 
1931
            raise UnavailableFeature(feature)
 
1932
 
 
1933
    def _run_bzr_core(self, args, encoding, stdin, stdout, stderr,
 
1934
                      working_dir):
 
1935
        # Clear chk_map page cache, because the contents are likely to mask
 
1936
        # locking errors.
 
1937
        chk_map.clear_cache()
 
1938
 
 
1939
        self.log('run brz: %r', args)
 
1940
 
 
1941
        if PY3:
 
1942
            self._last_cmd_stdout = stdout
 
1943
            self._last_cmd_stderr = stderr
 
1944
        else:
 
1945
            self._last_cmd_stdout = codecs.getwriter(encoding)(stdout)
 
1946
            self._last_cmd_stderr = codecs.getwriter(encoding)(stderr)
 
1947
 
 
1948
        old_ui_factory = ui.ui_factory
 
1949
        ui.ui_factory = ui_testing.TestUIFactory(
 
1950
            stdin=stdin,
 
1951
            stdout=self._last_cmd_stdout,
 
1952
            stderr=self._last_cmd_stderr)
 
1953
 
 
1954
        cwd = None
 
1955
        if working_dir is not None:
 
1956
            cwd = osutils.getcwd()
 
1957
            os.chdir(working_dir)
 
1958
 
 
1959
        try:
 
1960
            with ui.ui_factory:
 
1961
                result = self.apply_redirected(
 
1962
                    ui.ui_factory.stdin,
 
1963
                    stdout, stderr,
 
1964
                    _mod_commands.run_bzr_catch_user_errors,
 
1965
                    args)
 
1966
        finally:
 
1967
            ui.ui_factory = old_ui_factory
 
1968
            if cwd is not None:
 
1969
                os.chdir(cwd)
 
1970
 
 
1971
        return result
 
1972
 
 
1973
    def run_bzr_raw(self, args, retcode=0, stdin=None, encoding=None,
 
1974
                    working_dir=None, error_regexes=[]):
 
1975
        """Invoke brz, as if it were run from the command line.
 
1976
 
 
1977
        The argument list should not include the brz program name - the
 
1978
        first argument is normally the brz command.  Arguments may be
 
1979
        passed in three ways:
 
1980
 
 
1981
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1982
        when the command contains whitespace or metacharacters, or
 
1983
        is built up at run time.
 
1984
 
 
1985
        2- A single string, eg "add a".  This is the most convenient
 
1986
        for hardcoded commands.
 
1987
 
 
1988
        This runs brz through the interface that catches and reports
 
1989
        errors, and with logging set to something approximating the
 
1990
        default, so that error reporting can be checked.
 
1991
 
 
1992
        This should be the main method for tests that want to exercise the
 
1993
        overall behavior of the brz application (rather than a unit test
 
1994
        or a functional test of the library.)
 
1995
 
 
1996
        This sends the stdout/stderr results into the test's log,
 
1997
        where it may be useful for debugging.  See also run_captured.
 
1998
 
 
1999
        :keyword stdin: A string to be used as stdin for the command.
 
2000
        :keyword retcode: The status code the command should return;
 
2001
            default 0.
 
2002
        :keyword working_dir: The directory to run the command in
 
2003
        :keyword error_regexes: A list of expected error messages.  If
 
2004
            specified they must be seen in the error output of the command.
 
2005
        """
 
2006
        if isinstance(args, string_types):
 
2007
            args = shlex.split(args)
 
2008
 
 
2009
        if encoding is None:
 
2010
            encoding = osutils.get_user_encoding()
 
2011
 
 
2012
        if sys.version_info[0] == 2:
 
2013
            wrapped_stdout = stdout = ui_testing.BytesIOWithEncoding()
 
2014
            wrapped_stderr = stderr = ui_testing.BytesIOWithEncoding()
 
2015
            stdout.encoding = stderr.encoding = encoding
 
2016
 
 
2017
            # FIXME: don't call into logging here
 
2018
            handler = trace.EncodedStreamHandler(
 
2019
                stderr, errors="replace")
 
2020
        else:
 
2021
            stdout = BytesIO()
 
2022
            stderr = BytesIO()
 
2023
            wrapped_stdout = TextIOWrapper(stdout, encoding)
 
2024
            wrapped_stderr = TextIOWrapper(stderr, encoding)
 
2025
            handler = logging.StreamHandler(wrapped_stderr)
 
2026
        handler.setLevel(logging.INFO)
 
2027
 
 
2028
        logger = logging.getLogger('')
 
2029
        logger.addHandler(handler)
 
2030
        try:
 
2031
            result = self._run_bzr_core(
 
2032
                args, encoding=encoding, stdin=stdin, stdout=wrapped_stdout,
 
2033
                stderr=wrapped_stderr, working_dir=working_dir,
 
2034
                )
 
2035
        finally:
 
2036
            logger.removeHandler(handler)
 
2037
 
 
2038
        if PY3:
 
2039
            wrapped_stdout.flush()
 
2040
            wrapped_stderr.flush()
 
2041
 
 
2042
        out = stdout.getvalue()
 
2043
        err = stderr.getvalue()
 
2044
        if out:
 
2045
            self.log('output:\n%r', out)
 
2046
        if err:
 
2047
            self.log('errors:\n%r', err)
 
2048
        if retcode is not None:
 
2049
            self.assertEqual(retcode, result,
 
2050
                             message='Unexpected return code')
 
2051
        self.assertIsInstance(error_regexes, (list, tuple))
 
2052
        for regex in error_regexes:
 
2053
            self.assertContainsRe(err, regex)
 
2054
        return out, err
 
2055
 
 
2056
    def run_bzr(self, args, retcode=0, stdin=None, encoding=None,
 
2057
                working_dir=None, error_regexes=[]):
 
2058
        """Invoke brz, as if it were run from the command line.
 
2059
 
 
2060
        The argument list should not include the brz program name - the
 
2061
        first argument is normally the brz command.  Arguments may be
 
2062
        passed in three ways:
 
2063
 
 
2064
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
2065
        when the command contains whitespace or metacharacters, or
 
2066
        is built up at run time.
 
2067
 
 
2068
        2- A single string, eg "add a".  This is the most convenient
 
2069
        for hardcoded commands.
 
2070
 
 
2071
        This runs brz through the interface that catches and reports
 
2072
        errors, and with logging set to something approximating the
 
2073
        default, so that error reporting can be checked.
 
2074
 
 
2075
        This should be the main method for tests that want to exercise the
 
2076
        overall behavior of the brz application (rather than a unit test
 
2077
        or a functional test of the library.)
 
2078
 
 
2079
        This sends the stdout/stderr results into the test's log,
 
2080
        where it may be useful for debugging.  See also run_captured.
 
2081
 
 
2082
        :keyword stdin: A string to be used as stdin for the command.
 
2083
        :keyword retcode: The status code the command should return;
 
2084
            default 0.
 
2085
        :keyword working_dir: The directory to run the command in
 
2086
        :keyword error_regexes: A list of expected error messages.  If
 
2087
            specified they must be seen in the error output of the command.
 
2088
        """
 
2089
        if isinstance(args, string_types):
 
2090
            args = shlex.split(args)
 
2091
 
 
2092
        if encoding is None:
 
2093
            encoding = osutils.get_user_encoding()
 
2094
 
 
2095
        if sys.version_info[0] == 2:
 
2096
            stdout = ui_testing.BytesIOWithEncoding()
 
2097
            stderr = ui_testing.BytesIOWithEncoding()
 
2098
            stdout.encoding = stderr.encoding = encoding
 
2099
            # FIXME: don't call into logging here
 
2100
            handler = trace.EncodedStreamHandler(
 
2101
                stderr, errors="replace")
 
2102
        else:
 
2103
            stdout = ui_testing.StringIOWithEncoding()
 
2104
            stderr = ui_testing.StringIOWithEncoding()
 
2105
            stdout.encoding = stderr.encoding = encoding
 
2106
            handler = logging.StreamHandler(stream=stderr)
 
2107
        handler.setLevel(logging.INFO)
 
2108
 
 
2109
        logger = logging.getLogger('')
 
2110
        logger.addHandler(handler)
 
2111
 
 
2112
        try:
 
2113
            result = self._run_bzr_core(
 
2114
                args, encoding=encoding, stdin=stdin, stdout=stdout,
 
2115
                stderr=stderr, working_dir=working_dir)
 
2116
        finally:
 
2117
            logger.removeHandler(handler)
 
2118
 
 
2119
        out = stdout.getvalue()
 
2120
        err = stderr.getvalue()
 
2121
        if out:
 
2122
            self.log('output:\n%r', out)
 
2123
        if err:
 
2124
            self.log('errors:\n%r', err)
 
2125
        if retcode is not None:
 
2126
            self.assertEqual(retcode, result,
 
2127
                             message='Unexpected return code')
 
2128
        self.assertIsInstance(error_regexes, (list, tuple))
 
2129
        for regex in error_regexes:
 
2130
            self.assertContainsRe(err, regex)
 
2131
        return out, err
 
2132
 
 
2133
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
2134
        """Run brz, and check that stderr contains the supplied regexes
 
2135
 
 
2136
        :param error_regexes: Sequence of regular expressions which
 
2137
            must each be found in the error output. The relative ordering
 
2138
            is not enforced.
 
2139
        :param args: command-line arguments for brz
 
2140
        :param kwargs: Keyword arguments which are interpreted by run_brz
 
2141
            This function changes the default value of retcode to be 3,
 
2142
            since in most cases this is run when you expect brz to fail.
 
2143
 
 
2144
        :return: (out, err) The actual output of running the command (in case
 
2145
            you want to do more inspection)
 
2146
 
 
2147
        Examples of use::
 
2148
 
 
2149
            # Make sure that commit is failing because there is nothing to do
 
2150
            self.run_bzr_error(['no changes to commit'],
 
2151
                               ['commit', '-m', 'my commit comment'])
 
2152
            # Make sure --strict is handling an unknown file, rather than
 
2153
            # giving us the 'nothing to do' error
 
2154
            self.build_tree(['unknown'])
 
2155
            self.run_bzr_error(
 
2156
                ['Commit refused because there are unknown files'],
 
2157
                ['commit', --strict', '-m', 'my commit comment'])
 
2158
        """
 
2159
        kwargs.setdefault('retcode', 3)
 
2160
        kwargs['error_regexes'] = error_regexes
 
2161
        out, err = self.run_bzr(*args, **kwargs)
 
2162
        return out, err
 
2163
 
 
2164
    def run_bzr_subprocess(self, *args, **kwargs):
 
2165
        """Run brz in a subprocess for testing.
 
2166
 
 
2167
        This starts a new Python interpreter and runs brz in there.
 
2168
        This should only be used for tests that have a justifiable need for
 
2169
        this isolation: e.g. they are testing startup time, or signal
 
2170
        handling, or early startup code, etc.  Subprocess code can't be
 
2171
        profiled or debugged so easily.
 
2172
 
 
2173
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
2174
            None is supplied, the status code is not checked.
 
2175
        :keyword env_changes: A dictionary which lists changes to environment
 
2176
            variables. A value of None will unset the env variable.
 
2177
            The values must be strings. The change will only occur in the
 
2178
            child, so you don't need to fix the environment after running.
 
2179
        :keyword universal_newlines: Convert CRLF => LF
 
2180
        :keyword allow_plugins: By default the subprocess is run with
 
2181
            --no-plugins to ensure test reproducibility. Also, it is possible
 
2182
            for system-wide plugins to create unexpected output on stderr,
 
2183
            which can cause unnecessary test failures.
 
2184
        """
 
2185
        env_changes = kwargs.get('env_changes', {})
 
2186
        working_dir = kwargs.get('working_dir', None)
 
2187
        allow_plugins = kwargs.get('allow_plugins', False)
 
2188
        if len(args) == 1:
 
2189
            if isinstance(args[0], list):
 
2190
                args = args[0]
 
2191
            elif isinstance(args[0], (str, text_type)):
 
2192
                args = list(shlex.split(args[0]))
 
2193
        else:
 
2194
            raise ValueError("passing varargs to run_bzr_subprocess")
 
2195
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
2196
                                            working_dir=working_dir,
 
2197
                                            allow_plugins=allow_plugins)
 
2198
        # We distinguish between retcode=None and retcode not passed.
 
2199
        supplied_retcode = kwargs.get('retcode', 0)
 
2200
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
2201
                                          universal_newlines=kwargs.get(
 
2202
                                              'universal_newlines', False),
 
2203
                                          process_args=args)
 
2204
 
 
2205
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
2206
                             skip_if_plan_to_signal=False,
 
2207
                             working_dir=None,
 
2208
                             allow_plugins=False, stderr=subprocess.PIPE):
 
2209
        """Start brz in a subprocess for testing.
 
2210
 
 
2211
        This starts a new Python interpreter and runs brz in there.
 
2212
        This should only be used for tests that have a justifiable need for
 
2213
        this isolation: e.g. they are testing startup time, or signal
 
2214
        handling, or early startup code, etc.  Subprocess code can't be
 
2215
        profiled or debugged so easily.
 
2216
 
 
2217
        :param process_args: a list of arguments to pass to the brz executable,
 
2218
            for example ``['--version']``.
 
2219
        :param env_changes: A dictionary which lists changes to environment
 
2220
            variables. A value of None will unset the env variable.
 
2221
            The values must be strings. The change will only occur in the
 
2222
            child, so you don't need to fix the environment after running.
 
2223
        :param skip_if_plan_to_signal: raise TestSkipped when true and system
 
2224
            doesn't support signalling subprocesses.
 
2225
        :param allow_plugins: If False (default) pass --no-plugins to brz.
 
2226
        :param stderr: file to use for the subprocess's stderr.  Valid values
 
2227
            are those valid for the stderr argument of `subprocess.Popen`.
 
2228
            Default value is ``subprocess.PIPE``.
 
2229
 
 
2230
        :returns: Popen object for the started process.
 
2231
        """
 
2232
        if skip_if_plan_to_signal:
 
2233
            if os.name != "posix":
 
2234
                raise TestSkipped("Sending signals not supported")
 
2235
 
 
2236
        if env_changes is None:
 
2237
            env_changes = {}
 
2238
        # Because $HOME is set to a tempdir for the context of a test, modules
 
2239
        # installed in the user dir will not be found unless $PYTHONUSERBASE
 
2240
        # gets set to the computed directory of this parent process.
 
2241
        if site.USER_BASE is not None:
 
2242
            env_changes["PYTHONUSERBASE"] = site.USER_BASE
 
2243
        old_env = {}
 
2244
 
 
2245
        def cleanup_environment():
 
2246
            for env_var, value in env_changes.items():
 
2247
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
2248
 
 
2249
        def restore_environment():
 
2250
            for env_var, value in old_env.items():
 
2251
                osutils.set_or_unset_env(env_var, value)
 
2252
 
 
2253
        bzr_path = self.get_brz_path()
 
2254
 
 
2255
        cwd = None
 
2256
        if working_dir is not None:
 
2257
            cwd = osutils.getcwd()
 
2258
            os.chdir(working_dir)
 
2259
 
 
2260
        try:
 
2261
            # win32 subprocess doesn't support preexec_fn
 
2262
            # so we will avoid using it on all platforms, just to
 
2263
            # make sure the code path is used, and we don't break on win32
 
2264
            cleanup_environment()
 
2265
            # Include the subprocess's log file in the test details, in case
 
2266
            # the test fails due to an error in the subprocess.
 
2267
            self._add_subprocess_log(trace._get_brz_log_filename())
 
2268
            command = [sys.executable]
 
2269
            # frozen executables don't need the path to bzr
 
2270
            if getattr(sys, "frozen", None) is None:
 
2271
                command.append(bzr_path)
 
2272
            if not allow_plugins:
 
2273
                command.append('--no-plugins')
 
2274
            command.extend(process_args)
 
2275
            process = self._popen(command, stdin=subprocess.PIPE,
 
2276
                                  stdout=subprocess.PIPE,
 
2277
                                  stderr=stderr, bufsize=0)
 
2278
        finally:
 
2279
            restore_environment()
 
2280
            if cwd is not None:
 
2281
                os.chdir(cwd)
 
2282
 
 
2283
        return process
 
2284
 
 
2285
    def _add_subprocess_log(self, log_file_path):
 
2286
        if len(self._log_files) == 0:
 
2287
            # Register an addCleanup func.  We do this on the first call to
 
2288
            # _add_subprocess_log rather than in TestCase.setUp so that this
 
2289
            # addCleanup is registered after any cleanups for tempdirs that
 
2290
            # subclasses might create, which will probably remove the log file
 
2291
            # we want to read.
 
2292
            self.addCleanup(self._subprocess_log_cleanup)
 
2293
        # self._log_files is a set, so if a log file is reused we won't grab it
 
2294
        # twice.
 
2295
        self._log_files.add(log_file_path)
 
2296
 
 
2297
    def _subprocess_log_cleanup(self):
 
2298
        for count, log_file_path in enumerate(self._log_files):
 
2299
            # We use buffer_now=True to avoid holding the file open beyond
 
2300
            # the life of this function, which might interfere with e.g.
 
2301
            # cleaning tempdirs on Windows.
 
2302
            # XXX: Testtools 0.9.5 doesn't have the content_from_file helper
 
2303
            # detail_content = content.content_from_file(
 
2304
            #    log_file_path, buffer_now=True)
 
2305
            with open(log_file_path, 'rb') as log_file:
 
2306
                log_file_bytes = log_file.read()
 
2307
            detail_content = content.Content(
 
2308
                content.ContentType("text", "plain", {"charset": "utf8"}),
 
2309
                lambda: [log_file_bytes])
 
2310
            self.addDetail("start_bzr_subprocess-log-%d" % (count,),
 
2311
                           detail_content)
 
2312
 
 
2313
    def _popen(self, *args, **kwargs):
 
2314
        """Place a call to Popen.
 
2315
 
 
2316
        Allows tests to override this method to intercept the calls made to
 
2317
        Popen for introspection.
 
2318
        """
 
2319
        return subprocess.Popen(*args, **kwargs)
 
2320
 
 
2321
    def get_source_path(self):
 
2322
        """Return the path of the directory containing breezy."""
 
2323
        return os.path.dirname(os.path.dirname(breezy.__file__))
 
2324
 
 
2325
    def get_brz_path(self):
 
2326
        """Return the path of the 'brz' executable for this test suite."""
 
2327
        brz_path = os.path.join(self.get_source_path(), "brz")
 
2328
        if not os.path.isfile(brz_path):
 
2329
            # We are probably installed. Assume sys.argv is the right file
 
2330
            brz_path = sys.argv[0]
 
2331
        return brz_path
 
2332
 
 
2333
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
2334
                              universal_newlines=False, process_args=None):
 
2335
        """Finish the execution of process.
 
2336
 
 
2337
        :param process: the Popen object returned from start_bzr_subprocess.
 
2338
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
2339
            None is supplied, the status code is not checked.
 
2340
        :param send_signal: an optional signal to send to the process.
 
2341
        :param universal_newlines: Convert CRLF => LF
 
2342
        :returns: (stdout, stderr)
 
2343
        """
 
2344
        if send_signal is not None:
 
2345
            os.kill(process.pid, send_signal)
 
2346
        out, err = process.communicate()
 
2347
 
 
2348
        if universal_newlines:
 
2349
            out = out.replace(b'\r\n', b'\n')
 
2350
            err = err.replace(b'\r\n', b'\n')
 
2351
 
 
2352
        if retcode is not None and retcode != process.returncode:
 
2353
            if process_args is None:
 
2354
                process_args = "(unknown args)"
 
2355
            trace.mutter('Output of brz %r:\n%s', process_args, out)
 
2356
            trace.mutter('Error for brz %r:\n%s', process_args, err)
 
2357
            self.fail('Command brz %r failed with retcode %d != %d'
 
2358
                      % (process_args, retcode, process.returncode))
 
2359
        return [out, err]
 
2360
 
 
2361
    def check_tree_shape(self, tree, shape):
 
2362
        """Compare a tree to a list of expected names.
 
2363
 
 
2364
        Fail if they are not precisely equal.
 
2365
        """
 
2366
        extras = []
 
2367
        shape = list(shape)             # copy
 
2368
        for path, ie in tree.iter_entries_by_dir():
 
2369
            name = path.replace('\\', '/')
 
2370
            if ie.kind == 'directory':
 
2371
                name = name + '/'
 
2372
            if name == "/":
 
2373
                pass  # ignore root entry
 
2374
            elif name in shape:
 
2375
                shape.remove(name)
 
2376
            else:
 
2377
                extras.append(name)
 
2378
        if shape:
 
2379
            self.fail("expected paths not found in inventory: %r" % shape)
 
2380
        if extras:
 
2381
            self.fail("unexpected paths found in inventory: %r" % extras)
 
2382
 
 
2383
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
2384
                         a_callable=None, *args, **kwargs):
 
2385
        """Call callable with redirected std io pipes.
 
2386
 
 
2387
        Returns the return code."""
 
2388
        if not callable(a_callable):
 
2389
            raise ValueError("a_callable must be callable.")
 
2390
        if stdin is None:
 
2391
            stdin = BytesIO(b"")
 
2392
        if stdout is None:
 
2393
            if getattr(self, "_log_file", None) is not None:
 
2394
                stdout = self._log_file
 
2395
            else:
 
2396
                if sys.version_info[0] == 2:
 
2397
                    stdout = BytesIO()
 
2398
                else:
 
2399
                    stdout = StringIO()
 
2400
        if stderr is None:
 
2401
            if getattr(self, "_log_file", None is not None):
 
2402
                stderr = self._log_file
 
2403
            else:
 
2404
                if sys.version_info[0] == 2:
 
2405
                    stderr = BytesIO()
 
2406
                else:
 
2407
                    stderr = StringIO()
 
2408
        real_stdin = sys.stdin
 
2409
        real_stdout = sys.stdout
 
2410
        real_stderr = sys.stderr
 
2411
        try:
 
2412
            sys.stdout = stdout
 
2413
            sys.stderr = stderr
 
2414
            sys.stdin = stdin
 
2415
            return a_callable(*args, **kwargs)
 
2416
        finally:
 
2417
            sys.stdout = real_stdout
 
2418
            sys.stderr = real_stderr
 
2419
            sys.stdin = real_stdin
 
2420
 
 
2421
    def reduceLockdirTimeout(self):
 
2422
        """Reduce the default lock timeout for the duration of the test, so that
 
2423
        if LockContention occurs during a test, it does so quickly.
 
2424
 
 
2425
        Tests that expect to provoke LockContention errors should call this.
 
2426
        """
 
2427
        self.overrideAttr(lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
 
2428
 
 
2429
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
2430
        """Return a wrapped BytesIO, that will encode text input to UTF-8."""
 
2431
        if encoding_type is None:
 
2432
            encoding_type = 'strict'
 
2433
        bio = BytesIO()
 
2434
        output_encoding = 'utf-8'
 
2435
        sio = codecs.getwriter(output_encoding)(bio, errors=encoding_type)
 
2436
        sio.encoding = output_encoding
 
2437
        return sio
 
2438
 
 
2439
    def disable_verb(self, verb):
 
2440
        """Disable a smart server verb for one test."""
 
2441
        from breezy.bzr.smart import request
 
2442
        request_handlers = request.request_handlers
 
2443
        orig_method = request_handlers.get(verb)
 
2444
        orig_info = request_handlers.get_info(verb)
 
2445
        request_handlers.remove(verb)
 
2446
        self.addCleanup(request_handlers.register, verb, orig_method,
 
2447
                        info=orig_info)
 
2448
 
 
2449
    def __hash__(self):
 
2450
        return id(self)
 
2451
 
 
2452
 
 
2453
class CapturedCall(object):
 
2454
    """A helper for capturing smart server calls for easy debug analysis."""
 
2455
 
 
2456
    def __init__(self, params, prefix_length):
 
2457
        """Capture the call with params and skip prefix_length stack frames."""
 
2458
        self.call = params
 
2459
        import traceback
 
2460
        # The last 5 frames are the __init__, the hook frame, and 3 smart
 
2461
        # client frames. Beyond this we could get more clever, but this is good
 
2462
        # enough for now.
 
2463
        stack = traceback.extract_stack()[prefix_length:-5]
 
2464
        self.stack = ''.join(traceback.format_list(stack))
 
2465
 
 
2466
    def __str__(self):
 
2467
        return self.call.method
 
2468
 
 
2469
    def __repr__(self):
 
2470
        return self.call.method
 
2471
 
 
2472
    def stack(self):
 
2473
        return self.stack
 
2474
 
 
2475
 
 
2476
class TestCaseWithMemoryTransport(TestCase):
 
2477
    """Common test class for tests that do not need disk resources.
 
2478
 
 
2479
    Tests that need disk resources should derive from TestCaseInTempDir
 
2480
    orTestCaseWithTransport.
 
2481
 
 
2482
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all brz tests.
 
2483
 
 
2484
    For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
 
2485
    a directory which does not exist. This serves to help ensure test isolation
 
2486
    is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
 
2487
    must exist. However, TestCaseWithMemoryTransport does not offer local file
 
2488
    defaults for the transport in tests, nor does it obey the command line
 
2489
    override, so tests that accidentally write to the common directory should
 
2490
    be rare.
 
2491
 
 
2492
    :cvar TEST_ROOT: Directory containing all temporary directories, plus a
 
2493
        ``.bzr`` directory that stops us ascending higher into the filesystem.
 
2494
    """
 
2495
 
 
2496
    TEST_ROOT = None
 
2497
    _TEST_NAME = 'test'
 
2498
 
 
2499
    def __init__(self, methodName='runTest'):
 
2500
        # allow test parameterization after test construction and before test
 
2501
        # execution. Variables that the parameterizer sets need to be
 
2502
        # ones that are not set by setUp, or setUp will trash them.
 
2503
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
2504
        self.vfs_transport_factory = default_transport
 
2505
        self.transport_server = None
 
2506
        self.transport_readonly_server = None
 
2507
        self.__vfs_server = None
 
2508
 
 
2509
    def setUp(self):
 
2510
        super(TestCaseWithMemoryTransport, self).setUp()
 
2511
 
 
2512
        def _add_disconnect_cleanup(transport):
 
2513
            """Schedule disconnection of given transport at test cleanup
 
2514
 
 
2515
            This needs to happen for all connected transports or leaks occur.
 
2516
 
 
2517
            Note reconnections may mean we call disconnect multiple times per
 
2518
            transport which is suboptimal but seems harmless.
 
2519
            """
 
2520
            self.addCleanup(transport.disconnect)
 
2521
 
 
2522
        _mod_transport.Transport.hooks.install_named_hook(
 
2523
            'post_connect', _add_disconnect_cleanup, None)
 
2524
 
 
2525
        self._make_test_root()
 
2526
        self.addCleanup(os.chdir, osutils.getcwd())
 
2527
        self.makeAndChdirToTestDir()
 
2528
        self.overrideEnvironmentForTesting()
 
2529
        self.__readonly_server = None
 
2530
        self.__server = None
 
2531
        self.reduceLockdirTimeout()
 
2532
        # Each test may use its own config files even if the local config files
 
2533
        # don't actually exist. They'll rightly fail if they try to create them
 
2534
        # though.
 
2535
        self.overrideAttr(config, '_shared_stores', {})
 
2536
 
 
2537
    def get_transport(self, relpath=None):
 
2538
        """Return a writeable transport.
 
2539
 
 
2540
        This transport is for the test scratch space relative to
 
2541
        "self._test_root"
 
2542
 
 
2543
        :param relpath: a path relative to the base url.
 
2544
        """
 
2545
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
 
2546
        self.assertFalse(t.is_readonly())
 
2547
        return t
 
2548
 
 
2549
    def get_readonly_transport(self, relpath=None):
 
2550
        """Return a readonly transport for the test scratch space
 
2551
 
 
2552
        This can be used to test that operations which should only need
 
2553
        readonly access in fact do not try to write.
 
2554
 
 
2555
        :param relpath: a path relative to the base url.
 
2556
        """
 
2557
        t = _mod_transport.get_transport_from_url(
 
2558
            self.get_readonly_url(relpath))
 
2559
        self.assertTrue(t.is_readonly())
 
2560
        return t
 
2561
 
 
2562
    def create_transport_readonly_server(self):
 
2563
        """Create a transport server from class defined at init.
 
2564
 
 
2565
        This is mostly a hook for daughter classes.
 
2566
        """
 
2567
        return self.transport_readonly_server()
 
2568
 
 
2569
    def get_readonly_server(self):
 
2570
        """Get the server instance for the readonly transport
 
2571
 
 
2572
        This is useful for some tests with specific servers to do diagnostics.
 
2573
        """
 
2574
        if self.__readonly_server is None:
 
2575
            if self.transport_readonly_server is None:
 
2576
                # readonly decorator requested
 
2577
                self.__readonly_server = test_server.ReadonlyServer()
 
2578
            else:
 
2579
                # explicit readonly transport.
 
2580
                self.__readonly_server = (
 
2581
                    self.create_transport_readonly_server())
 
2582
            self.start_server(self.__readonly_server,
 
2583
                              self.get_vfs_only_server())
 
2584
        return self.__readonly_server
 
2585
 
 
2586
    def get_readonly_url(self, relpath=None):
 
2587
        """Get a URL for the readonly transport.
 
2588
 
 
2589
        This will either be backed by '.' or a decorator to the transport
 
2590
        used by self.get_url()
 
2591
        relpath provides for clients to get a path relative to the base url.
 
2592
        These should only be downwards relative, not upwards.
 
2593
        """
 
2594
        base = self.get_readonly_server().get_url()
 
2595
        return self._adjust_url(base, relpath)
 
2596
 
 
2597
    def get_vfs_only_server(self):
 
2598
        """Get the vfs only read/write server instance.
 
2599
 
 
2600
        This is useful for some tests with specific servers that need
 
2601
        diagnostics.
 
2602
 
 
2603
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
2604
        is no means to override it.
 
2605
        """
 
2606
        if self.__vfs_server is None:
 
2607
            self.__vfs_server = memory.MemoryServer()
 
2608
            self.start_server(self.__vfs_server)
 
2609
        return self.__vfs_server
 
2610
 
 
2611
    def get_server(self):
 
2612
        """Get the read/write server instance.
 
2613
 
 
2614
        This is useful for some tests with specific servers that need
 
2615
        diagnostics.
 
2616
 
 
2617
        This is built from the self.transport_server factory. If that is None,
 
2618
        then the self.get_vfs_server is returned.
 
2619
        """
 
2620
        if self.__server is None:
 
2621
            if (self.transport_server is None or self.transport_server is
 
2622
                    self.vfs_transport_factory):
 
2623
                self.__server = self.get_vfs_only_server()
 
2624
            else:
 
2625
                # bring up a decorated means of access to the vfs only server.
 
2626
                self.__server = self.transport_server()
 
2627
                self.start_server(self.__server, self.get_vfs_only_server())
 
2628
        return self.__server
 
2629
 
 
2630
    def _adjust_url(self, base, relpath):
 
2631
        """Get a URL (or maybe a path) for the readwrite transport.
 
2632
 
 
2633
        This will either be backed by '.' or to an equivalent non-file based
 
2634
        facility.
 
2635
        relpath provides for clients to get a path relative to the base url.
 
2636
        These should only be downwards relative, not upwards.
 
2637
        """
 
2638
        if relpath is not None and relpath != '.':
 
2639
            if not base.endswith('/'):
 
2640
                base = base + '/'
 
2641
            # XXX: Really base should be a url; we did after all call
 
2642
            # get_url()!  But sometimes it's just a path (from
 
2643
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
2644
            # to a non-escaped local path.
 
2645
            if base.startswith('./') or base.startswith('/'):
 
2646
                base += relpath
 
2647
            else:
 
2648
                base += urlutils.escape(relpath)
 
2649
        return base
 
2650
 
 
2651
    def get_url(self, relpath=None):
 
2652
        """Get a URL (or maybe a path) for the readwrite transport.
 
2653
 
 
2654
        This will either be backed by '.' or to an equivalent non-file based
 
2655
        facility.
 
2656
        relpath provides for clients to get a path relative to the base url.
 
2657
        These should only be downwards relative, not upwards.
 
2658
        """
 
2659
        base = self.get_server().get_url()
 
2660
        return self._adjust_url(base, relpath)
 
2661
 
 
2662
    def get_vfs_only_url(self, relpath=None):
 
2663
        """Get a URL (or maybe a path for the plain old vfs transport.
 
2664
 
 
2665
        This will never be a smart protocol.  It always has all the
 
2666
        capabilities of the local filesystem, but it might actually be a
 
2667
        MemoryTransport or some other similar virtual filesystem.
 
2668
 
 
2669
        This is the backing transport (if any) of the server returned by
 
2670
        get_url and get_readonly_url.
 
2671
 
 
2672
        :param relpath: provides for clients to get a path relative to the base
 
2673
            url.  These should only be downwards relative, not upwards.
 
2674
        :return: A URL
 
2675
        """
 
2676
        base = self.get_vfs_only_server().get_url()
 
2677
        return self._adjust_url(base, relpath)
 
2678
 
 
2679
    def _create_safety_net(self):
 
2680
        """Make a fake bzr directory.
 
2681
 
 
2682
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
2683
        real branch.
 
2684
        """
 
2685
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2686
        try:
 
2687
            # Make sure we get a readable and accessible home for .brz.log
 
2688
            # and/or config files, and not fallback to weird defaults (see
 
2689
            # http://pad.lv/825027).
 
2690
            self.assertIs(None, os.environ.get('BRZ_HOME', None))
 
2691
            os.environ['BRZ_HOME'] = root
 
2692
            wt = controldir.ControlDir.create_standalone_workingtree(root)
 
2693
            del os.environ['BRZ_HOME']
 
2694
        except Exception as e:
 
2695
            self.fail("Fail to initialize the safety net: %r\n" % (e,))
 
2696
        # Hack for speed: remember the raw bytes of the dirstate file so that
 
2697
        # we don't need to re-open the wt to check it hasn't changed.
 
2698
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
 
2699
            wt.control_transport.get_bytes('dirstate'))
 
2700
 
 
2701
    def _check_safety_net(self):
 
2702
        """Check that the safety .bzr directory have not been touched.
 
2703
 
 
2704
        _make_test_root have created a .bzr directory to prevent tests from
 
2705
        propagating. This method ensures than a test did not leaked.
 
2706
        """
 
2707
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2708
        t = _mod_transport.get_transport_from_path(root)
 
2709
        self.permit_url(t.base)
 
2710
        if (t.get_bytes('.bzr/checkout/dirstate') !=
 
2711
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
 
2712
            # The current test have modified the /bzr directory, we need to
 
2713
            # recreate a new one or all the followng tests will fail.
 
2714
            # If you need to inspect its content uncomment the following line
 
2715
            # import pdb; pdb.set_trace()
 
2716
            _rmtree_temp_dir(root + '/.bzr', test_id=self.id())
 
2717
            self._create_safety_net()
 
2718
            raise AssertionError('%s/.bzr should not be modified' % root)
 
2719
 
 
2720
    def _make_test_root(self):
 
2721
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
2722
            # Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
 
2723
            root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
 
2724
                                                    suffix='.tmp'))
 
2725
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
2726
 
 
2727
            self._create_safety_net()
 
2728
 
 
2729
            # The same directory is used by all tests, and we're not
 
2730
            # specifically told when all tests are finished.  This will do.
 
2731
            atexit.register(_rmtree_temp_dir, root)
 
2732
 
 
2733
        self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2734
        self.addCleanup(self._check_safety_net)
 
2735
 
 
2736
    def makeAndChdirToTestDir(self):
 
2737
        """Create a temporary directories for this one test.
 
2738
 
 
2739
        This must set self.test_home_dir and self.test_dir and chdir to
 
2740
        self.test_dir.
 
2741
 
 
2742
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this
 
2743
        test.
 
2744
        """
 
2745
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2746
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
2747
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
2748
        self.permit_dir(self.test_dir)
 
2749
 
 
2750
    def make_branch(self, relpath, format=None, name=None):
 
2751
        """Create a branch on the transport at relpath."""
 
2752
        repo = self.make_repository(relpath, format=format)
 
2753
        return repo.controldir.create_branch(append_revisions_only=False, name=name)
 
2754
 
 
2755
    def get_default_format(self):
 
2756
        return 'default'
 
2757
 
 
2758
    def resolve_format(self, format):
 
2759
        """Resolve an object to a ControlDir format object.
 
2760
 
 
2761
        The initial format object can either already be
 
2762
        a ControlDirFormat, None (for the default format),
 
2763
        or a string with the name of the control dir format.
 
2764
 
 
2765
        :param format: Object to resolve
 
2766
        :return A ControlDirFormat instance
 
2767
        """
 
2768
        if format is None:
 
2769
            format = self.get_default_format()
 
2770
        if isinstance(format, str):
 
2771
            format = controldir.format_registry.make_controldir(format)
 
2772
        return format
 
2773
 
 
2774
    def make_controldir(self, relpath, format=None):
 
2775
        try:
 
2776
            # might be a relative or absolute path
 
2777
            maybe_a_url = self.get_url(relpath)
 
2778
            segments = maybe_a_url.rsplit('/', 1)
 
2779
            t = _mod_transport.get_transport(maybe_a_url)
 
2780
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
2781
                t.ensure_base()
 
2782
            format = self.resolve_format(format)
 
2783
            return format.initialize_on_transport(t)
 
2784
        except errors.UninitializableFormat:
 
2785
            raise TestSkipped("Format %s is not initializable." % format)
 
2786
 
 
2787
    def make_repository(self, relpath, shared=None, format=None):
 
2788
        """Create a repository on our default transport at relpath.
 
2789
 
 
2790
        Note that relpath must be a relative path, not a full url.
 
2791
        """
 
2792
        # FIXME: If you create a remoterepository this returns the underlying
 
2793
        # real format, which is incorrect.  Actually we should make sure that
 
2794
        # RemoteBzrDir returns a RemoteRepository.
 
2795
        # maybe  mbp 20070410
 
2796
        made_control = self.make_controldir(relpath, format=format)
 
2797
        return made_control.create_repository(shared=shared)
 
2798
 
 
2799
    def make_smart_server(self, path, backing_server=None):
 
2800
        if backing_server is None:
 
2801
            backing_server = self.get_server()
 
2802
        smart_server = test_server.SmartTCPServer_for_testing()
 
2803
        self.start_server(smart_server, backing_server)
 
2804
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
 
2805
                                                                 ).clone(path)
 
2806
        return remote_transport
 
2807
 
 
2808
    def make_branch_and_memory_tree(self, relpath, format=None):
 
2809
        """Create a branch on the default transport and a MemoryTree for it."""
 
2810
        b = self.make_branch(relpath, format=format)
 
2811
        return b.create_memorytree()
 
2812
 
 
2813
    def make_branch_builder(self, relpath, format=None):
 
2814
        branch = self.make_branch(relpath, format=format)
 
2815
        return branchbuilder.BranchBuilder(branch=branch)
 
2816
 
 
2817
    def overrideEnvironmentForTesting(self):
 
2818
        test_home_dir = self.test_home_dir
 
2819
        if not PY3 and isinstance(test_home_dir, text_type):
 
2820
            test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
 
2821
        self.overrideEnv('HOME', test_home_dir)
 
2822
        self.overrideEnv('BRZ_HOME', test_home_dir)
 
2823
        self.overrideEnv('GNUPGHOME', os.path.join(test_home_dir, '.gnupg'))
 
2824
 
 
2825
    def setup_smart_server_with_call_log(self):
 
2826
        """Sets up a smart server as the transport server with a call log."""
 
2827
        self.transport_server = test_server.SmartTCPServer_for_testing
 
2828
        self.hpss_connections = []
 
2829
        self.hpss_calls = []
 
2830
        import traceback
 
2831
        # Skip the current stack down to the caller of
 
2832
        # setup_smart_server_with_call_log
 
2833
        prefix_length = len(traceback.extract_stack()) - 2
 
2834
 
 
2835
        def capture_hpss_call(params):
 
2836
            self.hpss_calls.append(
 
2837
                CapturedCall(params, prefix_length))
 
2838
 
 
2839
        def capture_connect(transport):
 
2840
            self.hpss_connections.append(transport)
 
2841
        client._SmartClient.hooks.install_named_hook(
 
2842
            'call', capture_hpss_call, None)
 
2843
        _mod_transport.Transport.hooks.install_named_hook(
 
2844
            'post_connect', capture_connect, None)
 
2845
 
 
2846
    def reset_smart_call_log(self):
 
2847
        self.hpss_calls = []
 
2848
        self.hpss_connections = []
 
2849
 
 
2850
 
 
2851
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
2852
    """Derived class that runs a test within a temporary directory.
 
2853
 
 
2854
    This is useful for tests that need to create a branch, etc.
 
2855
 
 
2856
    The directory is created in a slightly complex way: for each
 
2857
    Python invocation, a new temporary top-level directory is created.
 
2858
    All test cases create their own directory within that.  If the
 
2859
    tests complete successfully, the directory is removed.
 
2860
 
 
2861
    :ivar test_base_dir: The path of the top-level directory for this
 
2862
    test, which contains a home directory and a work directory.
 
2863
 
 
2864
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
2865
    which is used as $HOME for this test.
 
2866
 
 
2867
    :ivar test_dir: A directory under test_base_dir used as the current
 
2868
    directory when the test proper is run.
 
2869
    """
 
2870
 
 
2871
    OVERRIDE_PYTHON = 'python'
 
2872
 
 
2873
    def setUp(self):
 
2874
        super(TestCaseInTempDir, self).setUp()
 
2875
        # Remove the protection set in isolated_environ, we have a proper
 
2876
        # access to disk resources now.
 
2877
        self.overrideEnv('BRZ_LOG', None)
 
2878
 
 
2879
    def check_file_contents(self, filename, expect):
 
2880
        self.log("check contents of file %s" % filename)
 
2881
        with open(filename, 'rb') as f:
 
2882
            contents = f.read()
 
2883
        if contents != expect:
 
2884
            self.log("expected: %r" % expect)
 
2885
            self.log("actually: %r" % contents)
 
2886
            self.fail("contents of %s not as expected" % filename)
 
2887
 
 
2888
    def _getTestDirPrefix(self):
 
2889
        # create a directory within the top level test directory
 
2890
        if sys.platform in ('win32', 'cygwin'):
 
2891
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
 
2892
            # windows is likely to have path-length limits so use a short name
 
2893
            name_prefix = name_prefix[-30:]
 
2894
        else:
 
2895
            name_prefix = re.sub('[/]', '_', self.id())
 
2896
        return name_prefix
 
2897
 
 
2898
    def makeAndChdirToTestDir(self):
 
2899
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2900
 
 
2901
        For TestCaseInTempDir we create a temporary directory based on the test
 
2902
        name and then create two subdirs - test and home under it.
 
2903
        """
 
2904
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
 
2905
                                       self._getTestDirPrefix())
 
2906
        name = name_prefix
 
2907
        for i in range(100):
 
2908
            if os.path.exists(name):
 
2909
                name = name_prefix + '_' + str(i)
 
2910
            else:
 
2911
                # now create test and home directories within this dir
 
2912
                self.test_base_dir = name
 
2913
                self.addCleanup(self.deleteTestDir)
 
2914
                os.mkdir(self.test_base_dir)
 
2915
                break
 
2916
        self.permit_dir(self.test_base_dir)
 
2917
        # 'sprouting' and 'init' of a branch both walk up the tree to find
 
2918
        # stacking policy to honour; create a bzr dir with an unshared
 
2919
        # repository (but not a branch - our code would be trying to escape
 
2920
        # then!) to stop them, and permit it to be read.
 
2921
        # control = controldir.ControlDir.create(self.test_base_dir)
 
2922
        # control.create_repository()
 
2923
        self.test_home_dir = self.test_base_dir + '/home'
 
2924
        os.mkdir(self.test_home_dir)
 
2925
        self.test_dir = self.test_base_dir + '/work'
 
2926
        os.mkdir(self.test_dir)
 
2927
        os.chdir(self.test_dir)
 
2928
        # put name of test inside
 
2929
        with open(self.test_base_dir + '/name', 'w') as f:
 
2930
            f.write(self.id())
 
2931
 
 
2932
    def deleteTestDir(self):
 
2933
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2934
        _rmtree_temp_dir(self.test_base_dir, test_id=self.id())
 
2935
 
 
2936
    def build_tree(self, shape, line_endings='binary', transport=None):
 
2937
        """Build a test tree according to a pattern.
 
2938
 
 
2939
        shape is a sequence of file specifications.  If the final
 
2940
        character is '/', a directory is created.
 
2941
 
 
2942
        This assumes that all the elements in the tree being built are new.
 
2943
 
 
2944
        This doesn't add anything to a branch.
 
2945
 
 
2946
        :type shape:    list or tuple.
 
2947
        :param line_endings: Either 'binary' or 'native'
 
2948
            in binary mode, exact contents are written in native mode, the
 
2949
            line endings match the default platform endings.
 
2950
        :param transport: A transport to write to, for building trees on VFS's.
 
2951
            If the transport is readonly or None, "." is opened automatically.
 
2952
        :return: None
 
2953
        """
 
2954
        if type(shape) not in (list, tuple):
 
2955
            raise AssertionError("Parameter 'shape' should be "
 
2956
                                 "a list or a tuple. Got %r instead" % (shape,))
 
2957
        # It's OK to just create them using forward slashes on windows.
 
2958
        if transport is None or transport.is_readonly():
 
2959
            transport = _mod_transport.get_transport_from_path(".")
 
2960
        for name in shape:
 
2961
            self.assertIsInstance(name, (str, text_type))
 
2962
            if name[-1] == '/':
 
2963
                transport.mkdir(urlutils.escape(name[:-1]))
 
2964
            else:
 
2965
                if line_endings == 'binary':
 
2966
                    end = b'\n'
 
2967
                elif line_endings == 'native':
 
2968
                    end = os.linesep.encode('ascii')
 
2969
                else:
 
2970
                    raise errors.BzrError(
 
2971
                        'Invalid line ending request %r' % line_endings)
 
2972
                content = b"contents of %s%s" % (name.encode('utf-8'), end)
 
2973
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2974
 
 
2975
    build_tree_contents = staticmethod(treeshape.build_tree_contents)
 
2976
 
 
2977
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2978
        """Assert whether path or paths are in the WorkingTree"""
 
2979
        if tree is None:
 
2980
            tree = workingtree.WorkingTree.open(root_path)
 
2981
        if not isinstance(path, (str, text_type)):
 
2982
            for p in path:
 
2983
                self.assertInWorkingTree(p, tree=tree)
 
2984
        else:
 
2985
            self.assertTrue(tree.is_versioned(path),
 
2986
                            path + ' not in working tree.')
 
2987
 
 
2988
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2989
        """Assert whether path or paths are not in the WorkingTree"""
 
2990
        if tree is None:
 
2991
            tree = workingtree.WorkingTree.open(root_path)
 
2992
        if not isinstance(path, (str, text_type)):
 
2993
            for p in path:
 
2994
                self.assertNotInWorkingTree(p, tree=tree)
 
2995
        else:
 
2996
            self.assertFalse(tree.is_versioned(
 
2997
                path), path + ' in working tree.')
 
2998
 
 
2999
 
 
3000
class TestCaseWithTransport(TestCaseInTempDir):
 
3001
    """A test case that provides get_url and get_readonly_url facilities.
 
3002
 
 
3003
    These back onto two transport servers, one for readonly access and one for
 
3004
    read write access.
 
3005
 
 
3006
    If no explicit class is provided for readonly access, a
 
3007
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
3008
    based read write transports.
 
3009
 
 
3010
    If an explicit class is provided for readonly access, that server and the
 
3011
    readwrite one must both define get_url() as resolving to os.getcwd().
 
3012
    """
 
3013
 
 
3014
    def setUp(self):
 
3015
        super(TestCaseWithTransport, self).setUp()
 
3016
        self.__vfs_server = None
 
3017
 
 
3018
    def get_vfs_only_server(self):
 
3019
        """See TestCaseWithMemoryTransport.
 
3020
 
 
3021
        This is useful for some tests with specific servers that need
 
3022
        diagnostics.
 
3023
        """
 
3024
        if self.__vfs_server is None:
 
3025
            self.__vfs_server = self.vfs_transport_factory()
 
3026
            self.start_server(self.__vfs_server)
 
3027
        return self.__vfs_server
 
3028
 
 
3029
    def make_branch_and_tree(self, relpath, format=None):
 
3030
        """Create a branch on the transport and a tree locally.
 
3031
 
 
3032
        If the transport is not a LocalTransport, the Tree can't be created on
 
3033
        the transport.  In that case if the vfs_transport_factory is
 
3034
        LocalURLServer the working tree is created in the local
 
3035
        directory backing the transport, and the returned tree's branch and
 
3036
        repository will also be accessed locally. Otherwise a lightweight
 
3037
        checkout is created and returned.
 
3038
 
 
3039
        We do this because we can't physically create a tree in the local
 
3040
        path, with a branch reference to the transport_factory url, and
 
3041
        a branch + repository in the vfs_transport, unless the vfs_transport
 
3042
        namespace is distinct from the local disk - the two branch objects
 
3043
        would collide. While we could construct a tree with its branch object
 
3044
        pointing at the transport_factory transport in memory, reopening it
 
3045
        would behaving unexpectedly, and has in the past caused testing bugs
 
3046
        when we tried to do it that way.
 
3047
 
 
3048
        :param format: The BzrDirFormat.
 
3049
        :returns: the WorkingTree.
 
3050
        """
 
3051
        # TODO: always use the local disk path for the working tree,
 
3052
        # this obviously requires a format that supports branch references
 
3053
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
3054
        # RBC 20060208
 
3055
        format = self.resolve_format(format=format)
 
3056
        if not format.supports_workingtrees:
 
3057
            b = self.make_branch(relpath + '.branch', format=format)
 
3058
            return b.create_checkout(relpath, lightweight=True)
 
3059
        b = self.make_branch(relpath, format=format)
 
3060
        try:
 
3061
            return b.controldir.create_workingtree()
 
3062
        except errors.NotLocalUrl:
 
3063
            # We can only make working trees locally at the moment.  If the
 
3064
            # transport can't support them, then we keep the non-disk-backed
 
3065
            # branch and create a local checkout.
 
3066
            if self.vfs_transport_factory is test_server.LocalURLServer:
 
3067
                # the branch is colocated on disk, we cannot create a checkout.
 
3068
                # hopefully callers will expect this.
 
3069
                local_controldir = controldir.ControlDir.open(
 
3070
                    self.get_vfs_only_url(relpath))
 
3071
                wt = local_controldir.create_workingtree()
 
3072
                if wt.branch._format != b._format:
 
3073
                    wt._branch = b
 
3074
                    # Make sure that assigning to wt._branch fixes wt.branch,
 
3075
                    # in case the implementation details of workingtree objects
 
3076
                    # change.
 
3077
                    self.assertIs(b, wt.branch)
 
3078
                return wt
 
3079
            else:
 
3080
                return b.create_checkout(relpath, lightweight=True)
 
3081
 
 
3082
    def assertIsDirectory(self, relpath, transport):
 
3083
        """Assert that relpath within transport is a directory.
 
3084
 
 
3085
        This may not be possible on all transports; in that case it propagates
 
3086
        a TransportNotPossible.
 
3087
        """
 
3088
        try:
 
3089
            mode = transport.stat(relpath).st_mode
 
3090
        except errors.NoSuchFile:
 
3091
            self.fail("path %s is not a directory; no such file"
 
3092
                      % (relpath))
 
3093
        if not stat.S_ISDIR(mode):
 
3094
            self.fail("path %s is not a directory; has mode %#o"
 
3095
                      % (relpath, mode))
 
3096
 
 
3097
    def assertTreesEqual(self, left, right):
 
3098
        """Check that left and right have the same content and properties."""
 
3099
        # we use a tree delta to check for equality of the content, and we
 
3100
        # manually check for equality of other things such as the parents list.
 
3101
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
3102
        differences = left.changes_from(right)
 
3103
        self.assertFalse(differences.has_changed(),
 
3104
                         "Trees %r and %r are different: %r" % (left, right, differences))
 
3105
 
 
3106
    def disable_missing_extensions_warning(self):
 
3107
        """Some tests expect a precise stderr content.
 
3108
 
 
3109
        There is no point in forcing them to duplicate the extension related
 
3110
        warning.
 
3111
        """
 
3112
        config.GlobalConfig().set_user_option(
 
3113
            'suppress_warnings', 'missing_extensions')
 
3114
 
 
3115
 
 
3116
class ChrootedTestCase(TestCaseWithTransport):
 
3117
    """A support class that provides readonly urls outside the local namespace.
 
3118
 
 
3119
    This is done by checking if self.transport_server is a MemoryServer. if it
 
3120
    is then we are chrooted already, if it is not then an HttpServer is used
 
3121
    for readonly urls.
 
3122
 
 
3123
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
3124
                       be used without needed to redo it when a different
 
3125
                       subclass is in use ?
 
3126
    """
 
3127
 
 
3128
    def setUp(self):
 
3129
        from breezy.tests import http_server
 
3130
        super(ChrootedTestCase, self).setUp()
 
3131
        if not self.vfs_transport_factory == memory.MemoryServer:
 
3132
            self.transport_readonly_server = http_server.HttpServer
 
3133
 
 
3134
 
 
3135
def condition_id_re(pattern):
 
3136
    """Create a condition filter which performs a re check on a test's id.
 
3137
 
 
3138
    :param pattern: A regular expression string.
 
3139
    :return: A callable that returns True if the re matches.
 
3140
    """
 
3141
    filter_re = re.compile(pattern, 0)
 
3142
 
 
3143
    def condition(test):
 
3144
        test_id = test.id()
 
3145
        return filter_re.search(test_id)
 
3146
    return condition
 
3147
 
 
3148
 
 
3149
def condition_isinstance(klass_or_klass_list):
 
3150
    """Create a condition filter which returns isinstance(param, klass).
 
3151
 
 
3152
    :return: A callable which when called with one parameter obj return the
 
3153
        result of isinstance(obj, klass_or_klass_list).
 
3154
    """
 
3155
    def condition(obj):
 
3156
        return isinstance(obj, klass_or_klass_list)
 
3157
    return condition
 
3158
 
 
3159
 
 
3160
def condition_id_in_list(id_list):
 
3161
    """Create a condition filter which verify that test's id in a list.
 
3162
 
 
3163
    :param id_list: A TestIdList object.
 
3164
    :return: A callable that returns True if the test's id appears in the list.
 
3165
    """
 
3166
    def condition(test):
 
3167
        return id_list.includes(test.id())
 
3168
    return condition
 
3169
 
 
3170
 
 
3171
def condition_id_startswith(starts):
 
3172
    """Create a condition filter verifying that test's id starts with a string.
 
3173
 
 
3174
    :param starts: A list of string.
 
3175
    :return: A callable that returns True if the test's id starts with one of
 
3176
        the given strings.
 
3177
    """
 
3178
    def condition(test):
 
3179
        for start in starts:
 
3180
            if test.id().startswith(start):
 
3181
                return True
 
3182
        return False
 
3183
    return condition
 
3184
 
 
3185
 
 
3186
def exclude_tests_by_condition(suite, condition):
 
3187
    """Create a test suite which excludes some tests from suite.
 
3188
 
 
3189
    :param suite: The suite to get tests from.
 
3190
    :param condition: A callable whose result evaluates True when called with a
 
3191
        test case which should be excluded from the result.
 
3192
    :return: A suite which contains the tests found in suite that fail
 
3193
        condition.
 
3194
    """
 
3195
    result = []
 
3196
    for test in iter_suite_tests(suite):
 
3197
        if not condition(test):
 
3198
            result.append(test)
 
3199
    return TestUtil.TestSuite(result)
 
3200
 
 
3201
 
 
3202
def filter_suite_by_condition(suite, condition):
 
3203
    """Create a test suite by filtering another one.
 
3204
 
 
3205
    :param suite: The source suite.
 
3206
    :param condition: A callable whose result evaluates True when called with a
 
3207
        test case which should be included in the result.
 
3208
    :return: A suite which contains the tests found in suite that pass
 
3209
        condition.
 
3210
    """
 
3211
    result = []
 
3212
    for test in iter_suite_tests(suite):
 
3213
        if condition(test):
 
3214
            result.append(test)
 
3215
    return TestUtil.TestSuite(result)
 
3216
 
 
3217
 
 
3218
def filter_suite_by_re(suite, pattern):
 
3219
    """Create a test suite by filtering another one.
 
3220
 
 
3221
    :param suite:           the source suite
 
3222
    :param pattern:         pattern that names must match
 
3223
    :returns: the newly created suite
 
3224
    """
 
3225
    condition = condition_id_re(pattern)
 
3226
    result_suite = filter_suite_by_condition(suite, condition)
 
3227
    return result_suite
 
3228
 
 
3229
 
 
3230
def filter_suite_by_id_list(suite, test_id_list):
 
3231
    """Create a test suite by filtering another one.
 
3232
 
 
3233
    :param suite: The source suite.
 
3234
    :param test_id_list: A list of the test ids to keep as strings.
 
3235
    :returns: the newly created suite
 
3236
    """
 
3237
    condition = condition_id_in_list(test_id_list)
 
3238
    result_suite = filter_suite_by_condition(suite, condition)
 
3239
    return result_suite
 
3240
 
 
3241
 
 
3242
def filter_suite_by_id_startswith(suite, start):
 
3243
    """Create a test suite by filtering another one.
 
3244
 
 
3245
    :param suite: The source suite.
 
3246
    :param start: A list of string the test id must start with one of.
 
3247
    :returns: the newly created suite
 
3248
    """
 
3249
    condition = condition_id_startswith(start)
 
3250
    result_suite = filter_suite_by_condition(suite, condition)
 
3251
    return result_suite
 
3252
 
 
3253
 
 
3254
def exclude_tests_by_re(suite, pattern):
 
3255
    """Create a test suite which excludes some tests from suite.
 
3256
 
 
3257
    :param suite: The suite to get tests from.
 
3258
    :param pattern: A regular expression string. Test ids that match this
 
3259
        pattern will be excluded from the result.
 
3260
    :return: A TestSuite that contains all the tests from suite without the
 
3261
        tests that matched pattern. The order of tests is the same as it was in
 
3262
        suite.
 
3263
    """
 
3264
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
3265
 
 
3266
 
 
3267
def preserve_input(something):
 
3268
    """A helper for performing test suite transformation chains.
 
3269
 
 
3270
    :param something: Anything you want to preserve.
 
3271
    :return: Something.
 
3272
    """
 
3273
    return something
 
3274
 
 
3275
 
 
3276
def randomize_suite(suite):
 
3277
    """Return a new TestSuite with suite's tests in random order.
 
3278
 
 
3279
    The tests in the input suite are flattened into a single suite in order to
 
3280
    accomplish this. Any nested TestSuites are removed to provide global
 
3281
    randomness.
 
3282
    """
 
3283
    tests = list(iter_suite_tests(suite))
 
3284
    random.shuffle(tests)
 
3285
    return TestUtil.TestSuite(tests)
 
3286
 
 
3287
 
 
3288
def split_suite_by_condition(suite, condition):
 
3289
    """Split a test suite into two by a condition.
 
3290
 
 
3291
    :param suite: The suite to split.
 
3292
    :param condition: The condition to match on. Tests that match this
 
3293
        condition are returned in the first test suite, ones that do not match
 
3294
        are in the second suite.
 
3295
    :return: A tuple of two test suites, where the first contains tests from
 
3296
        suite matching the condition, and the second contains the remainder
 
3297
        from suite. The order within each output suite is the same as it was in
 
3298
        suite.
 
3299
    """
 
3300
    matched = []
 
3301
    did_not_match = []
 
3302
    for test in iter_suite_tests(suite):
 
3303
        if condition(test):
 
3304
            matched.append(test)
 
3305
        else:
 
3306
            did_not_match.append(test)
 
3307
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
3308
 
 
3309
 
 
3310
def split_suite_by_re(suite, pattern):
 
3311
    """Split a test suite into two by a regular expression.
 
3312
 
 
3313
    :param suite: The suite to split.
 
3314
    :param pattern: A regular expression string. Test ids that match this
 
3315
        pattern will be in the first test suite returned, and the others in the
 
3316
        second test suite returned.
 
3317
    :return: A tuple of two test suites, where the first contains tests from
 
3318
        suite matching pattern, and the second contains the remainder from
 
3319
        suite. The order within each output suite is the same as it was in
 
3320
        suite.
 
3321
    """
 
3322
    return split_suite_by_condition(suite, condition_id_re(pattern))
 
3323
 
 
3324
 
 
3325
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
3326
              stop_on_failure=False,
 
3327
              transport=None, lsprof_timed=None, bench_history=None,
 
3328
              matching_tests_first=None,
 
3329
              list_only=False,
 
3330
              random_seed=None,
 
3331
              exclude_pattern=None,
 
3332
              strict=False,
 
3333
              runner_class=None,
 
3334
              suite_decorators=None,
 
3335
              stream=None,
 
3336
              result_decorators=None,
 
3337
              ):
 
3338
    """Run a test suite for brz selftest.
 
3339
 
 
3340
    :param runner_class: The class of runner to use. Must support the
 
3341
        constructor arguments passed by run_suite which are more than standard
 
3342
        python uses.
 
3343
    :return: A boolean indicating success.
 
3344
    """
 
3345
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
3346
    if verbose:
 
3347
        verbosity = 2
 
3348
    else:
 
3349
        verbosity = 1
 
3350
    if runner_class is None:
 
3351
        runner_class = TextTestRunner
 
3352
    if stream is None:
 
3353
        stream = sys.stdout
 
3354
    runner = runner_class(stream=stream,
 
3355
                          descriptions=0,
 
3356
                          verbosity=verbosity,
 
3357
                          bench_history=bench_history,
 
3358
                          strict=strict,
 
3359
                          result_decorators=result_decorators,
 
3360
                          )
 
3361
    runner.stop_on_failure = stop_on_failure
 
3362
    if isinstance(suite, unittest.TestSuite):
 
3363
        # Empty out _tests list of passed suite and populate new TestSuite
 
3364
        suite._tests[:], suite = [], TestSuite(suite)
 
3365
    # built in decorator factories:
 
3366
    decorators = [
 
3367
        random_order(random_seed, runner),
 
3368
        exclude_tests(exclude_pattern),
 
3369
        ]
 
3370
    if matching_tests_first:
 
3371
        decorators.append(tests_first(pattern))
 
3372
    else:
 
3373
        decorators.append(filter_tests(pattern))
 
3374
    if suite_decorators:
 
3375
        decorators.extend(suite_decorators)
 
3376
    # tell the result object how many tests will be running: (except if
 
3377
    # --parallel=fork is being used. Robert said he will provide a better
 
3378
    # progress design later -- vila 20090817)
 
3379
    if fork_decorator not in decorators:
 
3380
        decorators.append(CountingDecorator)
 
3381
    for decorator in decorators:
 
3382
        suite = decorator(suite)
 
3383
    if list_only:
 
3384
        # Done after test suite decoration to allow randomisation etc
 
3385
        # to take effect, though that is of marginal benefit.
 
3386
        if verbosity >= 2:
 
3387
            stream.write("Listing tests only ...\n")
 
3388
        if getattr(runner, 'list', None) is not None:
 
3389
            runner.list(suite)
 
3390
        else:
 
3391
            for t in iter_suite_tests(suite):
 
3392
                stream.write("%s\n" % (t.id()))
 
3393
        return True
 
3394
    result = runner.run(suite)
 
3395
    if strict and getattr(result, 'wasStrictlySuccessful', False):
 
3396
        return result.wasStrictlySuccessful()
 
3397
    else:
 
3398
        return result.wasSuccessful()
 
3399
 
 
3400
 
 
3401
# A registry where get() returns a suite decorator.
 
3402
parallel_registry = registry.Registry()
 
3403
 
 
3404
 
 
3405
def fork_decorator(suite):
 
3406
    if getattr(os, "fork", None) is None:
 
3407
        raise errors.BzrCommandError("platform does not support fork,"
 
3408
                                     " try --parallel=subprocess instead.")
 
3409
    concurrency = osutils.local_concurrency()
 
3410
    if concurrency == 1:
 
3411
        return suite
 
3412
    from testtools import ConcurrentTestSuite
 
3413
    return ConcurrentTestSuite(suite, fork_for_tests)
 
3414
 
 
3415
 
 
3416
parallel_registry.register('fork', fork_decorator)
 
3417
 
 
3418
 
 
3419
def subprocess_decorator(suite):
 
3420
    concurrency = osutils.local_concurrency()
 
3421
    if concurrency == 1:
 
3422
        return suite
 
3423
    from testtools import ConcurrentTestSuite
 
3424
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
 
3425
 
 
3426
 
 
3427
parallel_registry.register('subprocess', subprocess_decorator)
 
3428
 
 
3429
 
 
3430
def exclude_tests(exclude_pattern):
 
3431
    """Return a test suite decorator that excludes tests."""
 
3432
    if exclude_pattern is None:
 
3433
        return identity_decorator
 
3434
 
 
3435
    def decorator(suite):
 
3436
        return ExcludeDecorator(suite, exclude_pattern)
 
3437
    return decorator
 
3438
 
 
3439
 
 
3440
def filter_tests(pattern):
 
3441
    if pattern == '.*':
 
3442
        return identity_decorator
 
3443
 
 
3444
    def decorator(suite):
 
3445
        return FilterTestsDecorator(suite, pattern)
 
3446
    return decorator
 
3447
 
 
3448
 
 
3449
def random_order(random_seed, runner):
 
3450
    """Return a test suite decorator factory for randomising tests order.
 
3451
 
 
3452
    :param random_seed: now, a string which casts to an integer, or an integer.
 
3453
    :param runner: A test runner with a stream attribute to report on.
 
3454
    """
 
3455
    if random_seed is None:
 
3456
        return identity_decorator
 
3457
 
 
3458
    def decorator(suite):
 
3459
        return RandomDecorator(suite, random_seed, runner.stream)
 
3460
    return decorator
 
3461
 
 
3462
 
 
3463
def tests_first(pattern):
 
3464
    if pattern == '.*':
 
3465
        return identity_decorator
 
3466
 
 
3467
    def decorator(suite):
 
3468
        return TestFirstDecorator(suite, pattern)
 
3469
    return decorator
 
3470
 
 
3471
 
 
3472
def identity_decorator(suite):
 
3473
    """Return suite."""
 
3474
    return suite
 
3475
 
 
3476
 
 
3477
class TestDecorator(TestUtil.TestSuite):
 
3478
    """A decorator for TestCase/TestSuite objects.
 
3479
 
 
3480
    Contains rather than flattening suite passed on construction
 
3481
    """
 
3482
 
 
3483
    def __init__(self, suite=None):
 
3484
        super(TestDecorator, self).__init__()
 
3485
        if suite is not None:
 
3486
            self.addTest(suite)
 
3487
 
 
3488
    # Don't need subclass run method with suite emptying
 
3489
    run = unittest.TestSuite.run
 
3490
 
 
3491
 
 
3492
class CountingDecorator(TestDecorator):
 
3493
    """A decorator which calls result.progress(self.countTestCases)."""
 
3494
 
 
3495
    def run(self, result):
 
3496
        progress_method = getattr(result, 'progress', None)
 
3497
        if callable(progress_method):
 
3498
            progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
 
3499
        return super(CountingDecorator, self).run(result)
 
3500
 
 
3501
 
 
3502
class ExcludeDecorator(TestDecorator):
 
3503
    """A decorator which excludes test matching an exclude pattern."""
 
3504
 
 
3505
    def __init__(self, suite, exclude_pattern):
 
3506
        super(ExcludeDecorator, self).__init__(
 
3507
            exclude_tests_by_re(suite, exclude_pattern))
 
3508
 
 
3509
 
 
3510
class FilterTestsDecorator(TestDecorator):
 
3511
    """A decorator which filters tests to those matching a pattern."""
 
3512
 
 
3513
    def __init__(self, suite, pattern):
 
3514
        super(FilterTestsDecorator, self).__init__(
 
3515
            filter_suite_by_re(suite, pattern))
 
3516
 
 
3517
 
 
3518
class RandomDecorator(TestDecorator):
 
3519
    """A decorator which randomises the order of its tests."""
 
3520
 
 
3521
    def __init__(self, suite, random_seed, stream):
 
3522
        random_seed = self.actual_seed(random_seed)
 
3523
        stream.write("Randomizing test order using seed %s\n\n" %
 
3524
                     (random_seed,))
 
3525
        # Initialise the random number generator.
 
3526
        random.seed(random_seed)
 
3527
        super(RandomDecorator, self).__init__(randomize_suite(suite))
 
3528
 
 
3529
    @staticmethod
 
3530
    def actual_seed(seed):
 
3531
        if seed == "now":
 
3532
            # We convert the seed to an integer to make it reuseable across
 
3533
            # invocations (because the user can reenter it).
 
3534
            return int(time.time())
 
3535
        else:
 
3536
            # Convert the seed to an integer if we can
 
3537
            try:
 
3538
                return int(seed)
 
3539
            except (TypeError, ValueError):
 
3540
                pass
 
3541
        return seed
 
3542
 
 
3543
 
 
3544
class TestFirstDecorator(TestDecorator):
 
3545
    """A decorator which moves named tests to the front."""
 
3546
 
 
3547
    def __init__(self, suite, pattern):
 
3548
        super(TestFirstDecorator, self).__init__()
 
3549
        self.addTests(split_suite_by_re(suite, pattern))
 
3550
 
 
3551
 
 
3552
def partition_tests(suite, count):
 
3553
    """Partition suite into count lists of tests."""
 
3554
    # This just assigns tests in a round-robin fashion.  On one hand this
 
3555
    # splits up blocks of related tests that might run faster if they shared
 
3556
    # resources, but on the other it avoids assigning blocks of slow tests to
 
3557
    # just one partition.  So the slowest partition shouldn't be much slower
 
3558
    # than the fastest.
 
3559
    partitions = [list() for i in range(count)]
 
3560
    tests = iter_suite_tests(suite)
 
3561
    for partition, test in zip(itertools.cycle(partitions), tests):
 
3562
        partition.append(test)
 
3563
    return partitions
 
3564
 
 
3565
 
 
3566
def workaround_zealous_crypto_random():
 
3567
    """Crypto.Random want to help us being secure, but we don't care here.
 
3568
 
 
3569
    This workaround some test failure related to the sftp server. Once paramiko
 
3570
    stop using the controversial API in Crypto.Random, we may get rid of it.
 
3571
    """
 
3572
    try:
 
3573
        from Crypto.Random import atfork
 
3574
        atfork()
 
3575
    except ImportError:
 
3576
        pass
 
3577
 
 
3578
 
 
3579
def fork_for_tests(suite):
 
3580
    """Take suite and start up one runner per CPU by forking()
 
3581
 
 
3582
    :return: An iterable of TestCase-like objects which can each have
 
3583
        run(result) called on them to feed tests to result.
 
3584
    """
 
3585
    concurrency = osutils.local_concurrency()
 
3586
    result = []
 
3587
    from subunit import ProtocolTestCase
 
3588
    from subunit.test_results import AutoTimingTestResultDecorator
 
3589
 
 
3590
    class TestInOtherProcess(ProtocolTestCase):
 
3591
        # Should be in subunit, I think. RBC.
 
3592
        def __init__(self, stream, pid):
 
3593
            ProtocolTestCase.__init__(self, stream)
 
3594
            self.pid = pid
 
3595
 
 
3596
        def run(self, result):
 
3597
            try:
 
3598
                ProtocolTestCase.run(self, result)
 
3599
            finally:
 
3600
                pid, status = os.waitpid(self.pid, 0)
 
3601
            # GZ 2011-10-18: If status is nonzero, should report to the result
 
3602
            #                that something went wrong.
 
3603
 
 
3604
    test_blocks = partition_tests(suite, concurrency)
 
3605
    # Clear the tests from the original suite so it doesn't keep them alive
 
3606
    suite._tests[:] = []
 
3607
    for process_tests in test_blocks:
 
3608
        process_suite = TestUtil.TestSuite(process_tests)
 
3609
        # Also clear each split list so new suite has only reference
 
3610
        process_tests[:] = []
 
3611
        c2pread, c2pwrite = os.pipe()
 
3612
        pid = os.fork()
 
3613
        if pid == 0:
 
3614
            try:
 
3615
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3616
                workaround_zealous_crypto_random()
 
3617
                try:
 
3618
                    import coverage
 
3619
                except ImportError:
 
3620
                    pass
 
3621
                else:
 
3622
                    coverage.process_startup()
 
3623
                os.close(c2pread)
 
3624
                # Leave stderr and stdout open so we can see test noise
 
3625
                # Close stdin so that the child goes away if it decides to
 
3626
                # read from stdin (otherwise its a roulette to see what
 
3627
                # child actually gets keystrokes for pdb etc).
 
3628
                sys.stdin.close()
 
3629
                subunit_result = AutoTimingTestResultDecorator(
 
3630
                    SubUnitBzrProtocolClientv1(stream))
 
3631
                process_suite.run(subunit_result)
 
3632
            except:
 
3633
                # Try and report traceback on stream, but exit with error even
 
3634
                # if stream couldn't be created or something else goes wrong.
 
3635
                # The traceback is formatted to a string and written in one go
 
3636
                # to avoid interleaving lines from multiple failing children.
 
3637
                tb = traceback.format_exc()
 
3638
                if isinstance(tb, text_type):
 
3639
                    tb = tb.encode('utf-8')
 
3640
                try:
 
3641
                    stream.write(tb)
 
3642
                finally:
 
3643
                    stream.flush()
 
3644
                    os._exit(1)
 
3645
            os._exit(0)
 
3646
        else:
 
3647
            os.close(c2pwrite)
 
3648
            stream = os.fdopen(c2pread, 'rb', 1)
 
3649
            test = TestInOtherProcess(stream, pid)
 
3650
            result.append(test)
 
3651
    return result
 
3652
 
 
3653
 
 
3654
def reinvoke_for_tests(suite):
 
3655
    """Take suite and start up one runner per CPU using subprocess().
 
3656
 
 
3657
    :return: An iterable of TestCase-like objects which can each have
 
3658
        run(result) called on them to feed tests to result.
 
3659
    """
 
3660
    concurrency = osutils.local_concurrency()
 
3661
    result = []
 
3662
    from subunit import ProtocolTestCase
 
3663
 
 
3664
    class TestInSubprocess(ProtocolTestCase):
 
3665
        def __init__(self, process, name):
 
3666
            ProtocolTestCase.__init__(self, process.stdout)
 
3667
            self.process = process
 
3668
            self.process.stdin.close()
 
3669
            self.name = name
 
3670
 
 
3671
        def run(self, result):
 
3672
            try:
 
3673
                ProtocolTestCase.run(self, result)
 
3674
            finally:
 
3675
                self.process.wait()
 
3676
                os.unlink(self.name)
 
3677
            # print "pid %d finished" % finished_process
 
3678
    test_blocks = partition_tests(suite, concurrency)
 
3679
    for process_tests in test_blocks:
 
3680
        # ugly; currently reimplement rather than reuses TestCase methods.
 
3681
        bzr_path = os.path.dirname(os.path.dirname(breezy.__file__)) + '/bzr'
 
3682
        if not os.path.isfile(bzr_path):
 
3683
            # We are probably installed. Assume sys.argv is the right file
 
3684
            bzr_path = sys.argv[0]
 
3685
        bzr_path = [bzr_path]
 
3686
        if sys.platform == "win32":
 
3687
            # if we're on windows, we can't execute the bzr script directly
 
3688
            bzr_path = [sys.executable] + bzr_path
 
3689
        fd, test_list_file_name = tempfile.mkstemp()
 
3690
        test_list_file = os.fdopen(fd, 'wb', 1)
 
3691
        for test in process_tests:
 
3692
            test_list_file.write(test.id() + '\n')
 
3693
        test_list_file.close()
 
3694
        try:
 
3695
            argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
 
3696
                               '--subunit']
 
3697
            if '--no-plugins' in sys.argv:
 
3698
                argv.append('--no-plugins')
 
3699
            # stderr=subprocess.STDOUT would be ideal, but until we prevent
 
3700
            # noise on stderr it can interrupt the subunit protocol.
 
3701
            process = subprocess.Popen(argv, stdin=subprocess.PIPE,
 
3702
                                       stdout=subprocess.PIPE,
 
3703
                                       stderr=subprocess.PIPE,
 
3704
                                       bufsize=1)
 
3705
            test = TestInSubprocess(process, test_list_file_name)
 
3706
            result.append(test)
 
3707
        except:
 
3708
            os.unlink(test_list_file_name)
 
3709
            raise
 
3710
    return result
 
3711
 
 
3712
 
 
3713
class ProfileResult(testtools.ExtendedToOriginalDecorator):
 
3714
    """Generate profiling data for all activity between start and success.
 
3715
 
 
3716
    The profile data is appended to the test's _benchcalls attribute and can
 
3717
    be accessed by the forwarded-to TestResult.
 
3718
 
 
3719
    While it might be cleaner do accumulate this in stopTest, addSuccess is
 
3720
    where our existing output support for lsprof is, and this class aims to
 
3721
    fit in with that: while it could be moved it's not necessary to accomplish
 
3722
    test profiling, nor would it be dramatically cleaner.
 
3723
    """
 
3724
 
 
3725
    def startTest(self, test):
 
3726
        self.profiler = breezy.lsprof.BzrProfiler()
 
3727
        # Prevent deadlocks in tests that use lsprof: those tests will
 
3728
        # unavoidably fail.
 
3729
        breezy.lsprof.BzrProfiler.profiler_block = 0
 
3730
        self.profiler.start()
 
3731
        testtools.ExtendedToOriginalDecorator.startTest(self, test)
 
3732
 
 
3733
    def addSuccess(self, test):
 
3734
        stats = self.profiler.stop()
 
3735
        try:
 
3736
            calls = test._benchcalls
 
3737
        except AttributeError:
 
3738
            test._benchcalls = []
 
3739
            calls = test._benchcalls
 
3740
        calls.append(((test.id(), "", ""), stats))
 
3741
        testtools.ExtendedToOriginalDecorator.addSuccess(self, test)
 
3742
 
 
3743
    def stopTest(self, test):
 
3744
        testtools.ExtendedToOriginalDecorator.stopTest(self, test)
 
3745
        self.profiler = None
 
3746
 
 
3747
 
 
3748
# Controlled by "brz selftest -E=..." option
 
3749
# Currently supported:
 
3750
#   -Eallow_debug           Will no longer clear debug.debug_flags() so it
 
3751
#                           preserves any flags supplied at the command line.
 
3752
#   -Edisable_lock_checks   Turns errors in mismatched locks into simple prints
 
3753
#                           rather than failing tests. And no longer raise
 
3754
#                           LockContention when fctnl locks are not being used
 
3755
#                           with proper exclusion rules.
 
3756
#   -Ethreads               Will display thread ident at creation/join time to
 
3757
#                           help track thread leaks
 
3758
#   -Euncollected_cases     Display the identity of any test cases that weren't
 
3759
#                           deallocated after being completed.
 
3760
#   -Econfig_stats          Will collect statistics using addDetail
 
3761
selftest_debug_flags = set()
 
3762
 
 
3763
 
 
3764
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
3765
             transport=None,
 
3766
             test_suite_factory=None,
 
3767
             lsprof_timed=None,
 
3768
             bench_history=None,
 
3769
             matching_tests_first=None,
 
3770
             list_only=False,
 
3771
             random_seed=None,
 
3772
             exclude_pattern=None,
 
3773
             strict=False,
 
3774
             load_list=None,
 
3775
             debug_flags=None,
 
3776
             starting_with=None,
 
3777
             runner_class=None,
 
3778
             suite_decorators=None,
 
3779
             stream=None,
 
3780
             lsprof_tests=False,
 
3781
             ):
 
3782
    """Run the whole test suite under the enhanced runner"""
 
3783
    # XXX: Very ugly way to do this...
 
3784
    # Disable warning about old formats because we don't want it to disturb
 
3785
    # any blackbox tests.
 
3786
    from breezy import repository
 
3787
    repository._deprecation_warning_done = True
 
3788
 
 
3789
    global default_transport
 
3790
    if transport is None:
 
3791
        transport = default_transport
 
3792
    old_transport = default_transport
 
3793
    default_transport = transport
 
3794
    global selftest_debug_flags
 
3795
    old_debug_flags = selftest_debug_flags
 
3796
    if debug_flags is not None:
 
3797
        selftest_debug_flags = set(debug_flags)
 
3798
    try:
 
3799
        if load_list is None:
 
3800
            keep_only = None
 
3801
        else:
 
3802
            keep_only = load_test_id_list(load_list)
 
3803
        if starting_with:
 
3804
            starting_with = [test_prefix_alias_registry.resolve_alias(start)
 
3805
                             for start in starting_with]
 
3806
            # Always consider 'unittest' an interesting name so that failed
 
3807
            # suites wrapped as test cases appear in the output.
 
3808
            starting_with.append('unittest')
 
3809
        if test_suite_factory is None:
 
3810
            # Reduce loading time by loading modules based on the starting_with
 
3811
            # patterns.
 
3812
            suite = test_suite(keep_only, starting_with)
 
3813
        else:
 
3814
            suite = test_suite_factory()
 
3815
        if starting_with:
 
3816
            # But always filter as requested.
 
3817
            suite = filter_suite_by_id_startswith(suite, starting_with)
 
3818
        result_decorators = []
 
3819
        if lsprof_tests:
 
3820
            result_decorators.append(ProfileResult)
 
3821
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
3822
                         stop_on_failure=stop_on_failure,
 
3823
                         transport=transport,
 
3824
                         lsprof_timed=lsprof_timed,
 
3825
                         bench_history=bench_history,
 
3826
                         matching_tests_first=matching_tests_first,
 
3827
                         list_only=list_only,
 
3828
                         random_seed=random_seed,
 
3829
                         exclude_pattern=exclude_pattern,
 
3830
                         strict=strict,
 
3831
                         runner_class=runner_class,
 
3832
                         suite_decorators=suite_decorators,
 
3833
                         stream=stream,
 
3834
                         result_decorators=result_decorators,
 
3835
                         )
 
3836
    finally:
 
3837
        default_transport = old_transport
 
3838
        selftest_debug_flags = old_debug_flags
 
3839
 
 
3840
 
 
3841
def load_test_id_list(file_name):
 
3842
    """Load a test id list from a text file.
 
3843
 
 
3844
    The format is one test id by line.  No special care is taken to impose
 
3845
    strict rules, these test ids are used to filter the test suite so a test id
 
3846
    that do not match an existing test will do no harm. This allows user to add
 
3847
    comments, leave blank lines, etc.
 
3848
    """
 
3849
    test_list = []
 
3850
    try:
 
3851
        ftest = open(file_name, 'rt')
 
3852
    except IOError as e:
 
3853
        if e.errno != errno.ENOENT:
 
3854
            raise
 
3855
        else:
 
3856
            raise errors.NoSuchFile(file_name)
 
3857
 
 
3858
    for test_name in ftest.readlines():
 
3859
        test_list.append(test_name.strip())
 
3860
    ftest.close()
 
3861
    return test_list
 
3862
 
 
3863
 
 
3864
def suite_matches_id_list(test_suite, id_list):
 
3865
    """Warns about tests not appearing or appearing more than once.
 
3866
 
 
3867
    :param test_suite: A TestSuite object.
 
3868
    :param test_id_list: The list of test ids that should be found in
 
3869
         test_suite.
 
3870
 
 
3871
    :return: (absents, duplicates) absents is a list containing the test found
 
3872
        in id_list but not in test_suite, duplicates is a list containing the
 
3873
        tests found multiple times in test_suite.
 
3874
 
 
3875
    When using a prefined test id list, it may occurs that some tests do not
 
3876
    exist anymore or that some tests use the same id. This function warns the
 
3877
    tester about potential problems in his workflow (test lists are volatile)
 
3878
    or in the test suite itself (using the same id for several tests does not
 
3879
    help to localize defects).
 
3880
    """
 
3881
    # Build a dict counting id occurrences
 
3882
    tests = dict()
 
3883
    for test in iter_suite_tests(test_suite):
 
3884
        id = test.id()
 
3885
        tests[id] = tests.get(id, 0) + 1
 
3886
 
 
3887
    not_found = []
 
3888
    duplicates = []
 
3889
    for id in id_list:
 
3890
        occurs = tests.get(id, 0)
 
3891
        if not occurs:
 
3892
            not_found.append(id)
 
3893
        elif occurs > 1:
 
3894
            duplicates.append(id)
 
3895
 
 
3896
    return not_found, duplicates
 
3897
 
 
3898
 
 
3899
class TestIdList(object):
 
3900
    """Test id list to filter a test suite.
 
3901
 
 
3902
    Relying on the assumption that test ids are built as:
 
3903
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
3904
    notation, this class offers methods to :
 
3905
    - avoid building a test suite for modules not refered to in the test list,
 
3906
    - keep only the tests listed from the module test suite.
 
3907
    """
 
3908
 
 
3909
    def __init__(self, test_id_list):
 
3910
        # When a test suite needs to be filtered against us we compare test ids
 
3911
        # for equality, so a simple dict offers a quick and simple solution.
 
3912
        self.tests = dict().fromkeys(test_id_list, True)
 
3913
 
 
3914
        # While unittest.TestCase have ids like:
 
3915
        # <module>.<class>.<method>[(<param+)],
 
3916
        # doctest.DocTestCase can have ids like:
 
3917
        # <module>
 
3918
        # <module>.<class>
 
3919
        # <module>.<function>
 
3920
        # <module>.<class>.<method>
 
3921
 
 
3922
        # Since we can't predict a test class from its name only, we settle on
 
3923
        # a simple constraint: a test id always begins with its module name.
 
3924
 
 
3925
        modules = {}
 
3926
        for test_id in test_id_list:
 
3927
            parts = test_id.split('.')
 
3928
            mod_name = parts.pop(0)
 
3929
            modules[mod_name] = True
 
3930
            for part in parts:
 
3931
                mod_name += '.' + part
 
3932
                modules[mod_name] = True
 
3933
        self.modules = modules
 
3934
 
 
3935
    def refers_to(self, module_name):
 
3936
        """Is there tests for the module or one of its sub modules."""
 
3937
        return module_name in self.modules
 
3938
 
 
3939
    def includes(self, test_id):
 
3940
        return test_id in self.tests
 
3941
 
 
3942
 
 
3943
class TestPrefixAliasRegistry(registry.Registry):
 
3944
    """A registry for test prefix aliases.
 
3945
 
 
3946
    This helps implement shorcuts for the --starting-with selftest
 
3947
    option. Overriding existing prefixes is not allowed but not fatal (a
 
3948
    warning will be emitted).
 
3949
    """
 
3950
 
 
3951
    def register(self, key, obj, help=None, info=None,
 
3952
                 override_existing=False):
 
3953
        """See Registry.register.
 
3954
 
 
3955
        Trying to override an existing alias causes a warning to be emitted,
 
3956
        not a fatal execption.
 
3957
        """
 
3958
        try:
 
3959
            super(TestPrefixAliasRegistry, self).register(
 
3960
                key, obj, help=help, info=info, override_existing=False)
 
3961
        except KeyError:
 
3962
            actual = self.get(key)
 
3963
            trace.note(
 
3964
                'Test prefix alias %s is already used for %s, ignoring %s'
 
3965
                % (key, actual, obj))
 
3966
 
 
3967
    def resolve_alias(self, id_start):
 
3968
        """Replace the alias by the prefix in the given string.
 
3969
 
 
3970
        Using an unknown prefix is an error to help catching typos.
 
3971
        """
 
3972
        parts = id_start.split('.')
 
3973
        try:
 
3974
            parts[0] = self.get(parts[0])
 
3975
        except KeyError:
 
3976
            raise errors.BzrCommandError(
 
3977
                '%s is not a known test prefix alias' % parts[0])
 
3978
        return '.'.join(parts)
 
3979
 
 
3980
 
 
3981
test_prefix_alias_registry = TestPrefixAliasRegistry()
 
3982
"""Registry of test prefix aliases."""
 
3983
 
 
3984
 
 
3985
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
 
3986
# appear prefixed ('breezy.' is "replaced" by 'breezy.').
 
3987
test_prefix_alias_registry.register('breezy', 'breezy')
 
3988
 
 
3989
# Obvious highest levels prefixes, feel free to add your own via a plugin
 
3990
test_prefix_alias_registry.register('bd', 'breezy.doc')
 
3991
test_prefix_alias_registry.register('bu', 'breezy.utils')
 
3992
test_prefix_alias_registry.register('bt', 'breezy.tests')
 
3993
test_prefix_alias_registry.register('bb', 'breezy.tests.blackbox')
 
3994
test_prefix_alias_registry.register('bp', 'breezy.plugins')
 
3995
 
 
3996
 
 
3997
def _test_suite_testmod_names():
 
3998
    """Return the standard list of test module names to test."""
 
3999
    return [
 
4000
        'breezy.git.tests.test_blackbox',
 
4001
        'breezy.git.tests.test_builder',
 
4002
        'breezy.git.tests.test_branch',
 
4003
        'breezy.git.tests.test_cache',
 
4004
        'breezy.git.tests.test_dir',
 
4005
        'breezy.git.tests.test_fetch',
 
4006
        'breezy.git.tests.test_git_remote_helper',
 
4007
        'breezy.git.tests.test_mapping',
 
4008
        'breezy.git.tests.test_memorytree',
 
4009
        'breezy.git.tests.test_object_store',
 
4010
        'breezy.git.tests.test_pristine_tar',
 
4011
        'breezy.git.tests.test_push',
 
4012
        'breezy.git.tests.test_remote',
 
4013
        'breezy.git.tests.test_repository',
 
4014
        'breezy.git.tests.test_refs',
 
4015
        'breezy.git.tests.test_revspec',
 
4016
        'breezy.git.tests.test_roundtrip',
 
4017
        'breezy.git.tests.test_server',
 
4018
        'breezy.git.tests.test_transportgit',
 
4019
        'breezy.git.tests.test_unpeel_map',
 
4020
        'breezy.git.tests.test_urls',
 
4021
        'breezy.git.tests.test_workingtree',
 
4022
        'breezy.tests.blackbox',
 
4023
        'breezy.tests.commands',
 
4024
        'breezy.tests.per_branch',
 
4025
        'breezy.tests.per_bzrdir',
 
4026
        'breezy.tests.per_controldir',
 
4027
        'breezy.tests.per_controldir_colo',
 
4028
        'breezy.tests.per_foreign_vcs',
 
4029
        'breezy.tests.per_interrepository',
 
4030
        'breezy.tests.per_intertree',
 
4031
        'breezy.tests.per_inventory',
 
4032
        'breezy.tests.per_interbranch',
 
4033
        'breezy.tests.per_lock',
 
4034
        'breezy.tests.per_merger',
 
4035
        'breezy.tests.per_transport',
 
4036
        'breezy.tests.per_tree',
 
4037
        'breezy.tests.per_pack_repository',
 
4038
        'breezy.tests.per_repository',
 
4039
        'breezy.tests.per_repository_chk',
 
4040
        'breezy.tests.per_repository_reference',
 
4041
        'breezy.tests.per_repository_vf',
 
4042
        'breezy.tests.per_uifactory',
 
4043
        'breezy.tests.per_versionedfile',
 
4044
        'breezy.tests.per_workingtree',
 
4045
        'breezy.tests.test__annotator',
 
4046
        'breezy.tests.test__bencode',
 
4047
        'breezy.tests.test__btree_serializer',
 
4048
        'breezy.tests.test__chk_map',
 
4049
        'breezy.tests.test__dirstate_helpers',
 
4050
        'breezy.tests.test__groupcompress',
 
4051
        'breezy.tests.test__known_graph',
 
4052
        'breezy.tests.test__rio',
 
4053
        'breezy.tests.test__simple_set',
 
4054
        'breezy.tests.test__static_tuple',
 
4055
        'breezy.tests.test__walkdirs_win32',
 
4056
        'breezy.tests.test_ancestry',
 
4057
        'breezy.tests.test_annotate',
 
4058
        'breezy.tests.test_atomicfile',
 
4059
        'breezy.tests.test_bad_files',
 
4060
        'breezy.tests.test_bisect',
 
4061
        'breezy.tests.test_bisect_multi',
 
4062
        'breezy.tests.test_branch',
 
4063
        'breezy.tests.test_branchbuilder',
 
4064
        'breezy.tests.test_btree_index',
 
4065
        'breezy.tests.test_bugtracker',
 
4066
        'breezy.tests.test_bundle',
 
4067
        'breezy.tests.test_bzrdir',
 
4068
        'breezy.tests.test__chunks_to_lines',
 
4069
        'breezy.tests.test_cache_utf8',
 
4070
        'breezy.tests.test_chk_map',
 
4071
        'breezy.tests.test_chk_serializer',
 
4072
        'breezy.tests.test_chunk_writer',
 
4073
        'breezy.tests.test_clean_tree',
 
4074
        'breezy.tests.test_cleanup',
 
4075
        'breezy.tests.test_cmdline',
 
4076
        'breezy.tests.test_commands',
 
4077
        'breezy.tests.test_commit',
 
4078
        'breezy.tests.test_commit_merge',
 
4079
        'breezy.tests.test_config',
 
4080
        'breezy.tests.test_conflicts',
 
4081
        'breezy.tests.test_controldir',
 
4082
        'breezy.tests.test_counted_lock',
 
4083
        'breezy.tests.test_crash',
 
4084
        'breezy.tests.test_decorators',
 
4085
        'breezy.tests.test_delta',
 
4086
        'breezy.tests.test_debug',
 
4087
        'breezy.tests.test_diff',
 
4088
        'breezy.tests.test_directory_service',
 
4089
        'breezy.tests.test_dirstate',
 
4090
        'breezy.tests.test_email_message',
 
4091
        'breezy.tests.test_eol_filters',
 
4092
        'breezy.tests.test_errors',
 
4093
        'breezy.tests.test_estimate_compressed_size',
 
4094
        'breezy.tests.test_export',
 
4095
        'breezy.tests.test_export_pot',
 
4096
        'breezy.tests.test_extract',
 
4097
        'breezy.tests.test_features',
 
4098
        'breezy.tests.test_fetch',
 
4099
        'breezy.tests.test_fetch_ghosts',
 
4100
        'breezy.tests.test_fixtures',
 
4101
        'breezy.tests.test_fifo_cache',
 
4102
        'breezy.tests.test_filters',
 
4103
        'breezy.tests.test_filter_tree',
 
4104
        'breezy.tests.test_foreign',
 
4105
        'breezy.tests.test_generate_docs',
 
4106
        'breezy.tests.test_generate_ids',
 
4107
        'breezy.tests.test_globbing',
 
4108
        'breezy.tests.test_gpg',
 
4109
        'breezy.tests.test_graph',
 
4110
        'breezy.tests.test_grep',
 
4111
        'breezy.tests.test_groupcompress',
 
4112
        'breezy.tests.test_hashcache',
 
4113
        'breezy.tests.test_help',
 
4114
        'breezy.tests.test_hooks',
 
4115
        'breezy.tests.test_http',
 
4116
        'breezy.tests.test_http_response',
 
4117
        'breezy.tests.test_https_ca_bundle',
 
4118
        'breezy.tests.test_https_urllib',
 
4119
        'breezy.tests.test_i18n',
 
4120
        'breezy.tests.test_identitymap',
 
4121
        'breezy.tests.test_ignores',
 
4122
        'breezy.tests.test_index',
 
4123
        'breezy.tests.test_import_tariff',
 
4124
        'breezy.tests.test_info',
 
4125
        'breezy.tests.test_inv',
 
4126
        'breezy.tests.test_inventory_delta',
 
4127
        'breezy.tests.test_knit',
 
4128
        'breezy.tests.test_lazy_import',
 
4129
        'breezy.tests.test_lazy_regex',
 
4130
        'breezy.tests.test_library_state',
 
4131
        'breezy.tests.test_location',
 
4132
        'breezy.tests.test_lock',
 
4133
        'breezy.tests.test_lockable_files',
 
4134
        'breezy.tests.test_lockdir',
 
4135
        'breezy.tests.test_log',
 
4136
        'breezy.tests.test_lru_cache',
 
4137
        'breezy.tests.test_lsprof',
 
4138
        'breezy.tests.test_mail_client',
 
4139
        'breezy.tests.test_matchers',
 
4140
        'breezy.tests.test_memorytree',
 
4141
        'breezy.tests.test_merge',
 
4142
        'breezy.tests.test_merge3',
 
4143
        'breezy.tests.test_merge_core',
 
4144
        'breezy.tests.test_merge_directive',
 
4145
        'breezy.tests.test_mergetools',
 
4146
        'breezy.tests.test_missing',
 
4147
        'breezy.tests.test_msgeditor',
 
4148
        'breezy.tests.test_multiparent',
 
4149
        'breezy.tests.test_mutabletree',
 
4150
        'breezy.tests.test_nonascii',
 
4151
        'breezy.tests.test_options',
 
4152
        'breezy.tests.test_osutils',
 
4153
        'breezy.tests.test_osutils_encodings',
 
4154
        'breezy.tests.test_pack',
 
4155
        'breezy.tests.test_patch',
 
4156
        'breezy.tests.test_patches',
 
4157
        'breezy.tests.test_permissions',
 
4158
        'breezy.tests.test_plugins',
 
4159
        'breezy.tests.test_progress',
 
4160
        'breezy.tests.test_pyutils',
 
4161
        'breezy.tests.test_read_bundle',
 
4162
        'breezy.tests.test_reconcile',
 
4163
        'breezy.tests.test_reconfigure',
 
4164
        'breezy.tests.test_registry',
 
4165
        'breezy.tests.test_remote',
 
4166
        'breezy.tests.test_rename_map',
 
4167
        'breezy.tests.test_repository',
 
4168
        'breezy.tests.test_revert',
 
4169
        'breezy.tests.test_revision',
 
4170
        'breezy.tests.test_revisionspec',
 
4171
        'breezy.tests.test_revisiontree',
 
4172
        'breezy.tests.test_rio',
 
4173
        'breezy.tests.test_rules',
 
4174
        'breezy.tests.test_url_policy_open',
 
4175
        'breezy.tests.test_sampler',
 
4176
        'breezy.tests.test_scenarios',
 
4177
        'breezy.tests.test_script',
 
4178
        'breezy.tests.test_selftest',
 
4179
        'breezy.tests.test_serializer',
 
4180
        'breezy.tests.test_setup',
 
4181
        'breezy.tests.test_sftp_transport',
 
4182
        'breezy.tests.test_shelf',
 
4183
        'breezy.tests.test_shelf_ui',
 
4184
        'breezy.tests.test_smart',
 
4185
        'breezy.tests.test_smart_add',
 
4186
        'breezy.tests.test_smart_request',
 
4187
        'breezy.tests.test_smart_signals',
 
4188
        'breezy.tests.test_smart_transport',
 
4189
        'breezy.tests.test_smtp_connection',
 
4190
        'breezy.tests.test_source',
 
4191
        'breezy.tests.test_ssh_transport',
 
4192
        'breezy.tests.test_status',
 
4193
        'breezy.tests.test_strace',
 
4194
        'breezy.tests.test_subsume',
 
4195
        'breezy.tests.test_switch',
 
4196
        'breezy.tests.test_symbol_versioning',
 
4197
        'breezy.tests.test_tag',
 
4198
        'breezy.tests.test_test_server',
 
4199
        'breezy.tests.test_testament',
 
4200
        'breezy.tests.test_textfile',
 
4201
        'breezy.tests.test_textmerge',
 
4202
        'breezy.tests.test_cethread',
 
4203
        'breezy.tests.test_timestamp',
 
4204
        'breezy.tests.test_trace',
 
4205
        'breezy.tests.test_transactions',
 
4206
        'breezy.tests.test_transform',
 
4207
        'breezy.tests.test_transport',
 
4208
        'breezy.tests.test_transport_log',
 
4209
        'breezy.tests.test_tree',
 
4210
        'breezy.tests.test_treebuilder',
 
4211
        'breezy.tests.test_treeshape',
 
4212
        'breezy.tests.test_tsort',
 
4213
        'breezy.tests.test_tuned_gzip',
 
4214
        'breezy.tests.test_ui',
 
4215
        'breezy.tests.test_uncommit',
 
4216
        'breezy.tests.test_upgrade',
 
4217
        'breezy.tests.test_upgrade_stacked',
 
4218
        'breezy.tests.test_upstream_import',
 
4219
        'breezy.tests.test_urlutils',
 
4220
        'breezy.tests.test_utextwrap',
 
4221
        'breezy.tests.test_version',
 
4222
        'breezy.tests.test_version_info',
 
4223
        'breezy.tests.test_versionedfile',
 
4224
        'breezy.tests.test_vf_search',
 
4225
        'breezy.tests.test_views',
 
4226
        'breezy.tests.test_weave',
 
4227
        'breezy.tests.test_whitebox',
 
4228
        'breezy.tests.test_win32utils',
 
4229
        'breezy.tests.test_workingtree',
 
4230
        'breezy.tests.test_workingtree_4',
 
4231
        'breezy.tests.test_wsgi',
 
4232
        'breezy.tests.test_xml',
 
4233
        ]
 
4234
 
 
4235
 
 
4236
def _test_suite_modules_to_doctest():
 
4237
    """Return the list of modules to doctest."""
 
4238
    if __doc__ is None:
 
4239
        # GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
 
4240
        return []
 
4241
    return [
 
4242
        'breezy',
 
4243
        'breezy.branchbuilder',
 
4244
        'breezy.bzr.inventory',
 
4245
        'breezy.decorators',
 
4246
        'breezy.iterablefile',
 
4247
        'breezy.lockdir',
 
4248
        'breezy.merge3',
 
4249
        'breezy.option',
 
4250
        'breezy.pyutils',
 
4251
        'breezy.symbol_versioning',
 
4252
        'breezy.tests',
 
4253
        'breezy.tests.fixtures',
 
4254
        'breezy.timestamp',
 
4255
        'breezy.transport.http',
 
4256
        'breezy.version_info_formats.format_custom',
 
4257
        ]
 
4258
 
 
4259
 
 
4260
def test_suite(keep_only=None, starting_with=None):
 
4261
    """Build and return TestSuite for the whole of breezy.
 
4262
 
 
4263
    :param keep_only: A list of test ids limiting the suite returned.
 
4264
 
 
4265
    :param starting_with: An id limiting the suite returned to the tests
 
4266
         starting with it.
 
4267
 
 
4268
    This function can be replaced if you need to change the default test
 
4269
    suite on a global basis, but it is not encouraged.
 
4270
    """
 
4271
 
 
4272
    loader = TestUtil.TestLoader()
 
4273
 
 
4274
    if keep_only is not None:
 
4275
        id_filter = TestIdList(keep_only)
 
4276
    if starting_with:
 
4277
        # We take precedence over keep_only because *at loading time* using
 
4278
        # both options means we will load less tests for the same final result.
 
4279
        def interesting_module(name):
 
4280
            for start in starting_with:
 
4281
                # Either the module name starts with the specified string
 
4282
                # or it may contain tests starting with the specified string
 
4283
                if name.startswith(start) or start.startswith(name):
 
4284
                    return True
 
4285
            return False
 
4286
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
 
4287
 
 
4288
    elif keep_only is not None:
 
4289
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
4290
 
 
4291
        def interesting_module(name):
 
4292
            return id_filter.refers_to(name)
 
4293
 
 
4294
    else:
 
4295
        loader = TestUtil.TestLoader()
 
4296
 
 
4297
        def interesting_module(name):
 
4298
            # No filtering, all modules are interesting
 
4299
            return True
 
4300
 
 
4301
    suite = loader.suiteClass()
 
4302
 
 
4303
    # modules building their suite with loadTestsFromModuleNames
 
4304
    suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
 
4305
 
 
4306
    if not PY3:
 
4307
        suite.addTest(loader.loadTestsFromModuleNames(['breezy.doc']))
 
4308
 
 
4309
        # It's pretty much impossible to write readable doctests that work on
 
4310
        # both Python 2 and Python 3 because of their overreliance on
 
4311
        # consistent repr() return values.
 
4312
        # For now, just run doctests on Python 2 so we now they haven't broken.
 
4313
        for mod in _test_suite_modules_to_doctest():
 
4314
            if not interesting_module(mod):
 
4315
                # No tests to keep here, move along
 
4316
                continue
 
4317
            try:
 
4318
                # note that this really does mean "report only" -- doctest
 
4319
                # still runs the rest of the examples
 
4320
                doc_suite = IsolatedDocTestSuite(
 
4321
                    mod, optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
 
4322
            except ValueError as e:
 
4323
                print('**failed to get doctest for: %s\n%s' % (mod, e))
 
4324
                raise
 
4325
            if len(doc_suite._tests) == 0:
 
4326
                raise errors.BzrError("no doctests found in %s" % (mod,))
 
4327
            suite.addTest(doc_suite)
 
4328
 
 
4329
    default_encoding = sys.getdefaultencoding()
 
4330
    for name, plugin in _mod_plugin.plugins().items():
 
4331
        if not interesting_module(plugin.module.__name__):
 
4332
            continue
 
4333
        plugin_suite = plugin.test_suite()
 
4334
        # We used to catch ImportError here and turn it into just a warning,
 
4335
        # but really if you don't have --no-plugins this should be a failure.
 
4336
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
4337
        if plugin_suite is None:
 
4338
            plugin_suite = plugin.load_plugin_tests(loader)
 
4339
        if plugin_suite is not None:
 
4340
            suite.addTest(plugin_suite)
 
4341
        if default_encoding != sys.getdefaultencoding():
 
4342
            trace.warning(
 
4343
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
4344
                sys.getdefaultencoding())
 
4345
            reload(sys)
 
4346
            sys.setdefaultencoding(default_encoding)
 
4347
 
 
4348
    if keep_only is not None:
 
4349
        # Now that the referred modules have loaded their tests, keep only the
 
4350
        # requested ones.
 
4351
        suite = filter_suite_by_id_list(suite, id_filter)
 
4352
        # Do some sanity checks on the id_list filtering
 
4353
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
4354
        if starting_with:
 
4355
            # The tester has used both keep_only and starting_with, so he is
 
4356
            # already aware that some tests are excluded from the list, there
 
4357
            # is no need to tell him which.
 
4358
            pass
 
4359
        else:
 
4360
            # Some tests mentioned in the list are not in the test suite. The
 
4361
            # list may be out of date, report to the tester.
 
4362
            for id in not_found:
 
4363
                trace.warning('"%s" not found in the test suite', id)
 
4364
        for id in duplicates:
 
4365
            trace.warning('"%s" is used as an id by several tests', id)
 
4366
 
 
4367
    return suite
 
4368
 
 
4369
 
 
4370
def multiply_scenarios(*scenarios):
 
4371
    """Multiply two or more iterables of scenarios.
 
4372
 
 
4373
    It is safe to pass scenario generators or iterators.
 
4374
 
 
4375
    :returns: A list of compound scenarios: the cross-product of all
 
4376
        scenarios, with the names concatenated and the parameters
 
4377
        merged together.
 
4378
    """
 
4379
    return functools.reduce(_multiply_two_scenarios, map(list, scenarios))
 
4380
 
 
4381
 
 
4382
def _multiply_two_scenarios(scenarios_left, scenarios_right):
 
4383
    """Multiply two sets of scenarios.
 
4384
 
 
4385
    :returns: the cartesian product of the two sets of scenarios, that is
 
4386
        a scenario for every possible combination of a left scenario and a
 
4387
        right scenario.
 
4388
    """
 
4389
    return [
 
4390
        ('%s,%s' % (left_name, right_name),
 
4391
         dict(left_dict, **right_dict))
 
4392
        for left_name, left_dict in scenarios_left
 
4393
        for right_name, right_dict in scenarios_right]
 
4394
 
 
4395
 
 
4396
def multiply_tests(tests, scenarios, result):
 
4397
    """Multiply tests_list by scenarios into result.
 
4398
 
 
4399
    This is the core workhorse for test parameterisation.
 
4400
 
 
4401
    Typically the load_tests() method for a per-implementation test suite will
 
4402
    call multiply_tests and return the result.
 
4403
 
 
4404
    :param tests: The tests to parameterise.
 
4405
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
 
4406
        scenario_param_dict).
 
4407
    :param result: A TestSuite to add created tests to.
 
4408
 
 
4409
    This returns the passed in result TestSuite with the cross product of all
 
4410
    the tests repeated once for each scenario.  Each test is adapted by adding
 
4411
    the scenario name at the end of its id(), and updating the test object's
 
4412
    __dict__ with the scenario_param_dict.
 
4413
 
 
4414
    >>> import breezy.tests.test_sampler
 
4415
    >>> r = multiply_tests(
 
4416
    ...     breezy.tests.test_sampler.DemoTest('test_nothing'),
 
4417
    ...     [('one', dict(param=1)),
 
4418
    ...      ('two', dict(param=2))],
 
4419
    ...     TestUtil.TestSuite())
 
4420
    >>> tests = list(iter_suite_tests(r))
 
4421
    >>> len(tests)
 
4422
    2
 
4423
    >>> tests[0].id()
 
4424
    'breezy.tests.test_sampler.DemoTest.test_nothing(one)'
 
4425
    >>> tests[0].param
 
4426
    1
 
4427
    >>> tests[1].param
 
4428
    2
 
4429
    """
 
4430
    for test in iter_suite_tests(tests):
 
4431
        apply_scenarios(test, scenarios, result)
 
4432
    return result
 
4433
 
 
4434
 
 
4435
def apply_scenarios(test, scenarios, result):
 
4436
    """Apply the scenarios in scenarios to test and add to result.
 
4437
 
 
4438
    :param test: The test to apply scenarios to.
 
4439
    :param scenarios: An iterable of scenarios to apply to test.
 
4440
    :return: result
 
4441
    :seealso: apply_scenario
 
4442
    """
 
4443
    for scenario in scenarios:
 
4444
        result.addTest(apply_scenario(test, scenario))
 
4445
    return result
 
4446
 
 
4447
 
 
4448
def apply_scenario(test, scenario):
 
4449
    """Copy test and apply scenario to it.
 
4450
 
 
4451
    :param test: A test to adapt.
 
4452
    :param scenario: A tuple describing the scenario.
 
4453
        The first element of the tuple is the new test id.
 
4454
        The second element is a dict containing attributes to set on the
 
4455
        test.
 
4456
    :return: The adapted test.
 
4457
    """
 
4458
    new_id = "%s(%s)" % (test.id(), scenario[0])
 
4459
    new_test = clone_test(test, new_id)
 
4460
    for name, value in scenario[1].items():
 
4461
        setattr(new_test, name, value)
 
4462
    return new_test
 
4463
 
 
4464
 
 
4465
def clone_test(test, new_id):
 
4466
    """Clone a test giving it a new id.
 
4467
 
 
4468
    :param test: The test to clone.
 
4469
    :param new_id: The id to assign to it.
 
4470
    :return: The new test.
 
4471
    """
 
4472
    new_test = copy.copy(test)
 
4473
    new_test.id = lambda: new_id
 
4474
    # XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
 
4475
    # causes cloned tests to share the 'details' dict.  This makes it hard to
 
4476
    # read the test output for parameterized tests, because tracebacks will be
 
4477
    # associated with irrelevant tests.
 
4478
    try:
 
4479
        details = new_test._TestCase__details
 
4480
    except AttributeError:
 
4481
        # must be a different version of testtools than expected.  Do nothing.
 
4482
        pass
 
4483
    else:
 
4484
        # Reset the '__details' dict.
 
4485
        new_test._TestCase__details = {}
 
4486
    return new_test
 
4487
 
 
4488
 
 
4489
def permute_tests_for_extension(standard_tests, loader, py_module_name,
 
4490
                                ext_module_name):
 
4491
    """Helper for permutating tests against an extension module.
 
4492
 
 
4493
    This is meant to be used inside a modules 'load_tests()' function. It will
 
4494
    create 2 scenarios, and cause all tests in the 'standard_tests' to be run
 
4495
    against both implementations. Setting 'test.module' to the appropriate
 
4496
    module. See breezy.tests.test__chk_map.load_tests as an example.
 
4497
 
 
4498
    :param standard_tests: A test suite to permute
 
4499
    :param loader: A TestLoader
 
4500
    :param py_module_name: The python path to a python module that can always
 
4501
        be loaded, and will be considered the 'python' implementation. (eg
 
4502
        'breezy._chk_map_py')
 
4503
    :param ext_module_name: The python path to an extension module. If the
 
4504
        module cannot be loaded, a single test will be added, which notes that
 
4505
        the module is not available. If it can be loaded, all standard_tests
 
4506
        will be run against that module.
 
4507
    :return: (suite, feature) suite is a test-suite that has all the permuted
 
4508
        tests. feature is the Feature object that can be used to determine if
 
4509
        the module is available.
 
4510
    """
 
4511
 
 
4512
    from .features import ModuleAvailableFeature
 
4513
    py_module = pyutils.get_named_object(py_module_name)
 
4514
    scenarios = [
 
4515
        ('python', {'module': py_module}),
 
4516
    ]
 
4517
    suite = loader.suiteClass()
 
4518
    feature = ModuleAvailableFeature(ext_module_name)
 
4519
    if feature.available():
 
4520
        scenarios.append(('C', {'module': feature.module}))
 
4521
    else:
 
4522
        # the compiled module isn't available, so we add a failing test
 
4523
        class FailWithoutFeature(TestCase):
 
4524
            def test_fail(self):
 
4525
                self.requireFeature(feature)
 
4526
        suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
 
4527
    result = multiply_tests(standard_tests, scenarios, suite)
 
4528
    return result, feature
 
4529
 
 
4530
 
 
4531
def _rmtree_temp_dir(dirname, test_id=None):
 
4532
    # If LANG=C we probably have created some bogus paths
 
4533
    # which rmtree(unicode) will fail to delete
 
4534
    # so make sure we are using rmtree(str) to delete everything
 
4535
    # except on win32, where rmtree(str) will fail
 
4536
    # since it doesn't have the property of byte-stream paths
 
4537
    # (they are either ascii or mbcs)
 
4538
    if sys.platform == 'win32' and isinstance(dirname, bytes):
 
4539
        # make sure we are using the unicode win32 api
 
4540
        dirname = dirname.decode('mbcs')
 
4541
    else:
 
4542
        dirname = dirname.encode(sys.getfilesystemencoding())
 
4543
    try:
 
4544
        osutils.rmtree(dirname)
 
4545
    except OSError as e:
 
4546
        # We don't want to fail here because some useful display will be lost
 
4547
        # otherwise. Polluting the tmp dir is bad, but not giving all the
 
4548
        # possible info to the test runner is even worse.
 
4549
        if test_id is not None:
 
4550
            ui.ui_factory.clear_term()
 
4551
            sys.stderr.write('\nWhile running: %s\n' % (test_id,))
 
4552
        # Ugly, but the last thing we want here is fail, so bear with it.
 
4553
        printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
 
4554
                                    ).encode('ascii', 'replace')
 
4555
        sys.stderr.write('Unable to remove testing dir %s\n%s'
 
4556
                         % (os.path.basename(dirname), printable_e))
 
4557
 
 
4558
 
 
4559
def probe_unicode_in_user_encoding():
 
4560
    """Try to encode several unicode strings to use in unicode-aware tests.
 
4561
    Return first successfull match.
 
4562
 
 
4563
    :return:  (unicode value, encoded plain string value) or (None, None)
 
4564
    """
 
4565
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
4566
    for uni_val in possible_vals:
 
4567
        try:
 
4568
            str_val = uni_val.encode(osutils.get_user_encoding())
 
4569
        except UnicodeEncodeError:
 
4570
            # Try a different character
 
4571
            pass
 
4572
        else:
 
4573
            return uni_val, str_val
 
4574
    return None, None
 
4575
 
 
4576
 
 
4577
def probe_bad_non_ascii(encoding):
 
4578
    """Try to find [bad] character with code [128..255]
 
4579
    that cannot be decoded to unicode in some encoding.
 
4580
    Return None if all non-ascii characters is valid
 
4581
    for given encoding.
 
4582
    """
 
4583
    for i in range(128, 256):
 
4584
        char = int2byte(i)
 
4585
        try:
 
4586
            char.decode(encoding)
 
4587
        except UnicodeDecodeError:
 
4588
            return char
 
4589
    return None
 
4590
 
 
4591
 
 
4592
# Only define SubUnitBzrRunner if subunit is available.
 
4593
try:
 
4594
    from subunit import TestProtocolClient
 
4595
    from subunit.test_results import AutoTimingTestResultDecorator
 
4596
 
 
4597
    class SubUnitBzrProtocolClientv1(TestProtocolClient):
 
4598
 
 
4599
        def stopTest(self, test):
 
4600
            super(SubUnitBzrProtocolClientv1, self).stopTest(test)
 
4601
            _clear__type_equality_funcs(test)
 
4602
 
 
4603
        def addSuccess(self, test, details=None):
 
4604
            # The subunit client always includes the details in the subunit
 
4605
            # stream, but we don't want to include it in ours.
 
4606
            if details is not None and 'log' in details:
 
4607
                del details['log']
 
4608
            return super(SubUnitBzrProtocolClientv1, self).addSuccess(
 
4609
                test, details)
 
4610
 
 
4611
    class SubUnitBzrRunnerv1(TextTestRunner):
 
4612
 
 
4613
        def run(self, test):
 
4614
            result = AutoTimingTestResultDecorator(
 
4615
                SubUnitBzrProtocolClientv1(self.stream))
 
4616
            test.run(result)
 
4617
            return result
 
4618
except ImportError:
 
4619
    pass
 
4620
 
 
4621
 
 
4622
try:
 
4623
    from subunit.run import SubunitTestRunner
 
4624
 
 
4625
    class SubUnitBzrRunnerv2(TextTestRunner, SubunitTestRunner):
 
4626
 
 
4627
        def __init__(self, stream=sys.stderr, descriptions=0, verbosity=1,
 
4628
                     bench_history=None, strict=False, result_decorators=None):
 
4629
            TextTestRunner.__init__(
 
4630
                self, stream=stream,
 
4631
                descriptions=descriptions, verbosity=verbosity,
 
4632
                bench_history=bench_history, strict=strict,
 
4633
                result_decorators=result_decorators)
 
4634
            SubunitTestRunner.__init__(self, verbosity=verbosity,
 
4635
                                       stream=stream)
 
4636
 
 
4637
        run = SubunitTestRunner.run
 
4638
except ImportError:
 
4639
    pass