/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/__init__.py

  • Committer: Jelmer Vernooij
  • Date: 2018-02-18 19:34:37 UTC
  • mfrom: (6857.1.1 fix-import-stacked)
  • Revision ID: jelmer@jelmer.uk-20180218193437-pytr0vyldq867owo
Merge lp:~jelmer/brz/fix-import-stacked.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2013, 2015, 2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Testing framework extensions"""
 
18
 
 
19
from __future__ import absolute_import
 
20
 
 
21
# NOTE: Some classes in here use camelCaseNaming() rather than
 
22
# underscore_naming().  That's for consistency with unittest; it's not the
 
23
# general style of breezy.  Please continue that consistency when adding e.g.
 
24
# new assertFoo() methods.
 
25
 
 
26
import atexit
 
27
import codecs
 
28
import copy
 
29
import difflib
 
30
import doctest
 
31
import errno
 
32
import functools
 
33
import itertools
 
34
import logging
 
35
import math
 
36
import os
 
37
import platform
 
38
import pprint
 
39
import random
 
40
import re
 
41
import shlex
 
42
import site
 
43
import stat
 
44
import subprocess
 
45
import sys
 
46
import tempfile
 
47
import threading
 
48
import time
 
49
import traceback
 
50
import unittest
 
51
import warnings
 
52
 
 
53
import testtools
 
54
# nb: check this before importing anything else from within it
 
55
_testtools_version = getattr(testtools, '__version__', ())
 
56
if _testtools_version < (0, 9, 5):
 
57
    raise ImportError("need at least testtools 0.9.5: %s is %r"
 
58
        % (testtools.__file__, _testtools_version))
 
59
from testtools import content
 
60
 
 
61
import breezy
 
62
from .. import (
 
63
    branchbuilder,
 
64
    controldir,
 
65
    commands as _mod_commands,
 
66
    config,
 
67
    i18n,
 
68
    debug,
 
69
    errors,
 
70
    hooks,
 
71
    lock as _mod_lock,
 
72
    lockdir,
 
73
    memorytree,
 
74
    osutils,
 
75
    plugin as _mod_plugin,
 
76
    pyutils,
 
77
    ui,
 
78
    urlutils,
 
79
    registry,
 
80
    symbol_versioning,
 
81
    trace,
 
82
    transport as _mod_transport,
 
83
    workingtree,
 
84
    )
 
85
from breezy.bzr import (
 
86
    chk_map,
 
87
    )
 
88
try:
 
89
    import breezy.lsprof
 
90
except ImportError:
 
91
    # lsprof not available
 
92
    pass
 
93
from ..sixish import (
 
94
    BytesIO,
 
95
    PY3,
 
96
    string_types,
 
97
    text_type,
 
98
    )
 
99
from ..bzr.smart import client, request
 
100
from ..transport import (
 
101
    memory,
 
102
    pathfilter,
 
103
    )
 
104
from ..tests import (
 
105
    fixtures,
 
106
    test_server,
 
107
    TestUtil,
 
108
    treeshape,
 
109
    ui_testing,
 
110
    )
 
111
from ..tests.features import _CompatabilityThunkFeature
 
112
 
 
113
# Mark this python module as being part of the implementation
 
114
# of unittest: this gives us better tracebacks where the last
 
115
# shown frame is the test code, not our assertXYZ.
 
116
__unittest = 1
 
117
 
 
118
default_transport = test_server.LocalURLServer
 
119
 
 
120
 
 
121
_unitialized_attr = object()
 
122
"""A sentinel needed to act as a default value in a method signature."""
 
123
 
 
124
 
 
125
# Subunit result codes, defined here to prevent a hard dependency on subunit.
 
126
SUBUNIT_SEEK_SET = 0
 
127
SUBUNIT_SEEK_CUR = 1
 
128
 
 
129
# These are intentionally brought into this namespace. That way plugins, etc
 
130
# can just "from breezy.tests import TestCase, TestLoader, etc"
 
131
TestSuite = TestUtil.TestSuite
 
132
TestLoader = TestUtil.TestLoader
 
133
 
 
134
# Tests should run in a clean and clearly defined environment. The goal is to
 
135
# keep them isolated from the running environment as mush as possible. The test
 
136
# framework ensures the variables defined below are set (or deleted if the
 
137
# value is None) before a test is run and reset to their original value after
 
138
# the test is run. Generally if some code depends on an environment variable,
 
139
# the tests should start without this variable in the environment. There are a
 
140
# few exceptions but you shouldn't violate this rule lightly.
 
141
isolated_environ = {
 
142
    'BRZ_HOME': None,
 
143
    'HOME': None,
 
144
    'GNUPGHOME': None,
 
145
    'XDG_CONFIG_HOME': None,
 
146
    # brz now uses the Win32 API and doesn't rely on APPDATA, but the
 
147
    # tests do check our impls match APPDATA
 
148
    'BRZ_EDITOR': None, # test_msgeditor manipulates this variable
 
149
    'VISUAL': None,
 
150
    'EDITOR': None,
 
151
    'BRZ_EMAIL': None,
 
152
    'BZREMAIL': None, # may still be present in the environment
 
153
    'EMAIL': 'jrandom@example.com', # set EMAIL as brz does not guess
 
154
    'BRZ_PROGRESS_BAR': None,
 
155
    # This should trap leaks to ~/.brz.log. This occurs when tests use TestCase
 
156
    # as a base class instead of TestCaseInTempDir. Tests inheriting from
 
157
    # TestCase should not use disk resources, BRZ_LOG is one.
 
158
    'BRZ_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
 
159
    'BRZ_PLUGIN_PATH': '-site',
 
160
    'BRZ_DISABLE_PLUGINS': None,
 
161
    'BRZ_PLUGINS_AT': None,
 
162
    'BRZ_CONCURRENCY': None,
 
163
    # Make sure that any text ui tests are consistent regardless of
 
164
    # the environment the test case is run in; you may want tests that
 
165
    # test other combinations.  'dumb' is a reasonable guess for tests
 
166
    # going to a pipe or a BytesIO.
 
167
    'TERM': 'dumb',
 
168
    'LINES': '25',
 
169
    'COLUMNS': '80',
 
170
    'BRZ_COLUMNS': '80',
 
171
    # Disable SSH Agent
 
172
    'SSH_AUTH_SOCK': None,
 
173
    # Proxies
 
174
    'http_proxy': None,
 
175
    'HTTP_PROXY': None,
 
176
    'https_proxy': None,
 
177
    'HTTPS_PROXY': None,
 
178
    'no_proxy': None,
 
179
    'NO_PROXY': None,
 
180
    'all_proxy': None,
 
181
    'ALL_PROXY': None,
 
182
    # Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
 
183
    # least. If you do (care), please update this comment
 
184
    # -- vila 20080401
 
185
    'ftp_proxy': None,
 
186
    'FTP_PROXY': None,
 
187
    'BZR_REMOTE_PATH': None,
 
188
    # Generally speaking, we don't want apport reporting on crashes in
 
189
    # the test envirnoment unless we're specifically testing apport,
 
190
    # so that it doesn't leak into the real system environment.  We
 
191
    # use an env var so it propagates to subprocesses.
 
192
    'APPORT_DISABLE': '1',
 
193
    }
 
194
 
 
195
 
 
196
def override_os_environ(test, env=None):
 
197
    """Modify os.environ keeping a copy.
 
198
 
 
199
    :param test: A test instance
 
200
 
 
201
    :param env: A dict containing variable definitions to be installed
 
202
    """
 
203
    if env is None:
 
204
        env = isolated_environ
 
205
    test._original_os_environ = dict(**os.environ)
 
206
    for var in env:
 
207
        osutils.set_or_unset_env(var, env[var])
 
208
        if var not in test._original_os_environ:
 
209
            # The var is new, add it with a value of None, so
 
210
            # restore_os_environ will delete it
 
211
            test._original_os_environ[var] = None
 
212
 
 
213
 
 
214
def restore_os_environ(test):
 
215
    """Restore os.environ to its original state.
 
216
 
 
217
    :param test: A test instance previously passed to override_os_environ.
 
218
    """
 
219
    for var, value in test._original_os_environ.items():
 
220
        # Restore the original value (or delete it if the value has been set to
 
221
        # None in override_os_environ).
 
222
        osutils.set_or_unset_env(var, value)
 
223
 
 
224
 
 
225
def _clear__type_equality_funcs(test):
 
226
    """Cleanup bound methods stored on TestCase instances
 
227
 
 
228
    Clear the dict breaking a few (mostly) harmless cycles in the affected
 
229
    unittests released with Python 2.6 and initial Python 2.7 versions.
 
230
 
 
231
    For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
 
232
    shipped in Oneiric, an object with no clear method was used, hence the
 
233
    extra complications, see bug 809048 for details.
 
234
    """
 
235
    type_equality_funcs = getattr(test, "_type_equality_funcs", None)
 
236
    if type_equality_funcs is not None:
 
237
        tef_clear = getattr(type_equality_funcs, "clear", None)
 
238
        if tef_clear is None:
 
239
            tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
 
240
            if tef_instance_dict is not None:
 
241
                tef_clear = tef_instance_dict.clear
 
242
        if tef_clear is not None:
 
243
            tef_clear()
 
244
 
 
245
 
 
246
class ExtendedTestResult(testtools.TextTestResult):
 
247
    """Accepts, reports and accumulates the results of running tests.
 
248
 
 
249
    Compared to the unittest version this class adds support for
 
250
    profiling, benchmarking, stopping as soon as a test fails,  and
 
251
    skipping tests.  There are further-specialized subclasses for
 
252
    different types of display.
 
253
 
 
254
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
255
    addFailure or addError methods.  These in turn may redirect to a more
 
256
    specific case for the special test results supported by our extended
 
257
    tests.
 
258
 
 
259
    Note that just one of these objects is fed the results from many tests.
 
260
    """
 
261
 
 
262
    stop_early = False
 
263
 
 
264
    def __init__(self, stream, descriptions, verbosity,
 
265
                 bench_history=None,
 
266
                 strict=False,
 
267
                 ):
 
268
        """Construct new TestResult.
 
269
 
 
270
        :param bench_history: Optionally, a writable file object to accumulate
 
271
            benchmark results.
 
272
        """
 
273
        testtools.TextTestResult.__init__(self, stream)
 
274
        if bench_history is not None:
 
275
            from breezy.version import _get_bzr_source_tree
 
276
            src_tree = _get_bzr_source_tree()
 
277
            if src_tree:
 
278
                try:
 
279
                    revision_id = src_tree.get_parent_ids()[0]
 
280
                except IndexError:
 
281
                    # XXX: if this is a brand new tree, do the same as if there
 
282
                    # is no branch.
 
283
                    revision_id = ''
 
284
            else:
 
285
                # XXX: If there's no branch, what should we do?
 
286
                revision_id = ''
 
287
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
288
        self._bench_history = bench_history
 
289
        self.ui = ui.ui_factory
 
290
        self.num_tests = 0
 
291
        self.error_count = 0
 
292
        self.failure_count = 0
 
293
        self.known_failure_count = 0
 
294
        self.skip_count = 0
 
295
        self.not_applicable_count = 0
 
296
        self.unsupported = {}
 
297
        self.count = 0
 
298
        self._overall_start_time = time.time()
 
299
        self._strict = strict
 
300
        self._first_thread_leaker_id = None
 
301
        self._tests_leaking_threads_count = 0
 
302
        self._traceback_from_test = None
 
303
 
 
304
    def stopTestRun(self):
 
305
        run = self.testsRun
 
306
        actionTaken = "Ran"
 
307
        stopTime = time.time()
 
308
        timeTaken = stopTime - self.startTime
 
309
        # GZ 2010-07-19: Seems testtools has no printErrors method, and though
 
310
        #                the parent class method is similar have to duplicate
 
311
        self._show_list('ERROR', self.errors)
 
312
        self._show_list('FAIL', self.failures)
 
313
        self.stream.write(self.sep2)
 
314
        self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
 
315
                            run, run != 1 and "s" or "", timeTaken))
 
316
        if not self.wasSuccessful():
 
317
            self.stream.write("FAILED (")
 
318
            failed, errored = map(len, (self.failures, self.errors))
 
319
            if failed:
 
320
                self.stream.write("failures=%d" % failed)
 
321
            if errored:
 
322
                if failed: self.stream.write(", ")
 
323
                self.stream.write("errors=%d" % errored)
 
324
            if self.known_failure_count:
 
325
                if failed or errored: self.stream.write(", ")
 
326
                self.stream.write("known_failure_count=%d" %
 
327
                    self.known_failure_count)
 
328
            self.stream.write(")\n")
 
329
        else:
 
330
            if self.known_failure_count:
 
331
                self.stream.write("OK (known_failures=%d)\n" %
 
332
                    self.known_failure_count)
 
333
            else:
 
334
                self.stream.write("OK\n")
 
335
        if self.skip_count > 0:
 
336
            skipped = self.skip_count
 
337
            self.stream.write('%d test%s skipped\n' %
 
338
                                (skipped, skipped != 1 and "s" or ""))
 
339
        if self.unsupported:
 
340
            for feature, count in sorted(self.unsupported.items()):
 
341
                self.stream.write("Missing feature '%s' skipped %d tests.\n" %
 
342
                    (feature, count))
 
343
        if self._strict:
 
344
            ok = self.wasStrictlySuccessful()
 
345
        else:
 
346
            ok = self.wasSuccessful()
 
347
        if self._first_thread_leaker_id:
 
348
            self.stream.write(
 
349
                '%s is leaking threads among %d leaking tests.\n' % (
 
350
                self._first_thread_leaker_id,
 
351
                self._tests_leaking_threads_count))
 
352
            # We don't report the main thread as an active one.
 
353
            self.stream.write(
 
354
                '%d non-main threads were left active in the end.\n'
 
355
                % (len(self._active_threads) - 1))
 
356
 
 
357
    def getDescription(self, test):
 
358
        return test.id()
 
359
 
 
360
    def _extractBenchmarkTime(self, testCase, details=None):
 
361
        """Add a benchmark time for the current test case."""
 
362
        if details and 'benchtime' in details:
 
363
            return float(''.join(details['benchtime'].iter_bytes()))
 
364
        return getattr(testCase, "_benchtime", None)
 
365
 
 
366
    def _delta_to_float(self, a_timedelta, precision):
 
367
        # This calls ceiling to ensure that the most pessimistic view of time
 
368
        # taken is shown (rather than leaving it to the Python %f operator
 
369
        # to decide whether to round/floor/ceiling. This was added when we
 
370
        # had pyp3 test failures that suggest a floor was happening.
 
371
        shift = 10 ** precision
 
372
        return math.ceil((a_timedelta.days * 86400.0 + a_timedelta.seconds +
 
373
            a_timedelta.microseconds / 1000000.0) * shift) / shift
 
374
 
 
375
    def _elapsedTestTimeString(self):
 
376
        """Return a time string for the overall time the current test has taken."""
 
377
        return self._formatTime(self._delta_to_float(
 
378
            self._now() - self._start_datetime, 3))
 
379
 
 
380
    def _testTimeString(self, testCase):
 
381
        benchmark_time = self._extractBenchmarkTime(testCase)
 
382
        if benchmark_time is not None:
 
383
            return self._formatTime(benchmark_time) + "*"
 
384
        else:
 
385
            return self._elapsedTestTimeString()
 
386
 
 
387
    def _formatTime(self, seconds):
 
388
        """Format seconds as milliseconds with leading spaces."""
 
389
        # some benchmarks can take thousands of seconds to run, so we need 8
 
390
        # places
 
391
        return "%8dms" % (1000 * seconds)
 
392
 
 
393
    def _shortened_test_description(self, test):
 
394
        what = test.id()
 
395
        what = re.sub(r'^breezy\.tests\.', '', what)
 
396
        return what
 
397
 
 
398
    # GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
 
399
    #                multiple times in a row, because the handler is added for
 
400
    #                each test but the container list is shared between cases.
 
401
    #                See lp:498869 lp:625574 and lp:637725 for background.
 
402
    def _record_traceback_from_test(self, exc_info):
 
403
        """Store the traceback from passed exc_info tuple till"""
 
404
        self._traceback_from_test = exc_info[2]
 
405
 
 
406
    def startTest(self, test):
 
407
        super(ExtendedTestResult, self).startTest(test)
 
408
        if self.count == 0:
 
409
            self.startTests()
 
410
        self.count += 1
 
411
        self.report_test_start(test)
 
412
        test.number = self.count
 
413
        self._recordTestStartTime()
 
414
        # Make testtools cases give us the real traceback on failure
 
415
        addOnException = getattr(test, "addOnException", None)
 
416
        if addOnException is not None:
 
417
            addOnException(self._record_traceback_from_test)
 
418
        # Only check for thread leaks on breezy derived test cases
 
419
        if isinstance(test, TestCase):
 
420
            test.addCleanup(self._check_leaked_threads, test)
 
421
 
 
422
    def stopTest(self, test):
 
423
        super(ExtendedTestResult, self).stopTest(test)
 
424
        # Manually break cycles, means touching various private things but hey
 
425
        getDetails = getattr(test, "getDetails", None)
 
426
        if getDetails is not None:
 
427
            getDetails().clear()
 
428
        _clear__type_equality_funcs(test)
 
429
        self._traceback_from_test = None
 
430
 
 
431
    def startTests(self):
 
432
        self.report_tests_starting()
 
433
        self._active_threads = threading.enumerate()
 
434
 
 
435
    def _check_leaked_threads(self, test):
 
436
        """See if any threads have leaked since last call
 
437
 
 
438
        A sample of live threads is stored in the _active_threads attribute,
 
439
        when this method runs it compares the current live threads and any not
 
440
        in the previous sample are treated as having leaked.
 
441
        """
 
442
        now_active_threads = set(threading.enumerate())
 
443
        threads_leaked = now_active_threads.difference(self._active_threads)
 
444
        if threads_leaked:
 
445
            self._report_thread_leak(test, threads_leaked, now_active_threads)
 
446
            self._tests_leaking_threads_count += 1
 
447
            if self._first_thread_leaker_id is None:
 
448
                self._first_thread_leaker_id = test.id()
 
449
            self._active_threads = now_active_threads
 
450
 
 
451
    def _recordTestStartTime(self):
 
452
        """Record that a test has started."""
 
453
        self._start_datetime = self._now()
 
454
 
 
455
    def addError(self, test, err):
 
456
        """Tell result that test finished with an error.
 
457
 
 
458
        Called from the TestCase run() method when the test
 
459
        fails with an unexpected error.
 
460
        """
 
461
        self._post_mortem(self._traceback_from_test)
 
462
        super(ExtendedTestResult, self).addError(test, err)
 
463
        self.error_count += 1
 
464
        self.report_error(test, err)
 
465
        if self.stop_early:
 
466
            self.stop()
 
467
 
 
468
    def addFailure(self, test, err):
 
469
        """Tell result that test failed.
 
470
 
 
471
        Called from the TestCase run() method when the test
 
472
        fails because e.g. an assert() method failed.
 
473
        """
 
474
        self._post_mortem(self._traceback_from_test)
 
475
        super(ExtendedTestResult, self).addFailure(test, err)
 
476
        self.failure_count += 1
 
477
        self.report_failure(test, err)
 
478
        if self.stop_early:
 
479
            self.stop()
 
480
 
 
481
    def addSuccess(self, test, details=None):
 
482
        """Tell result that test completed successfully.
 
483
 
 
484
        Called from the TestCase run()
 
485
        """
 
486
        if self._bench_history is not None:
 
487
            benchmark_time = self._extractBenchmarkTime(test, details)
 
488
            if benchmark_time is not None:
 
489
                self._bench_history.write("%s %s\n" % (
 
490
                    self._formatTime(benchmark_time),
 
491
                    test.id()))
 
492
        self.report_success(test)
 
493
        super(ExtendedTestResult, self).addSuccess(test)
 
494
        test._log_contents = ''
 
495
 
 
496
    def addExpectedFailure(self, test, err):
 
497
        self.known_failure_count += 1
 
498
        self.report_known_failure(test, err)
 
499
 
 
500
    def addUnexpectedSuccess(self, test, details=None):
 
501
        """Tell result the test unexpectedly passed, counting as a failure
 
502
 
 
503
        When the minimum version of testtools required becomes 0.9.8 this
 
504
        can be updated to use the new handling there.
 
505
        """
 
506
        super(ExtendedTestResult, self).addFailure(test, details=details)
 
507
        self.failure_count += 1
 
508
        self.report_unexpected_success(test,
 
509
            "".join(details["reason"].iter_text()))
 
510
        if self.stop_early:
 
511
            self.stop()
 
512
 
 
513
    def addNotSupported(self, test, feature):
 
514
        """The test will not be run because of a missing feature.
 
515
        """
 
516
        # this can be called in two different ways: it may be that the
 
517
        # test started running, and then raised (through requireFeature)
 
518
        # UnavailableFeature.  Alternatively this method can be called
 
519
        # while probing for features before running the test code proper; in
 
520
        # that case we will see startTest and stopTest, but the test will
 
521
        # never actually run.
 
522
        self.unsupported.setdefault(str(feature), 0)
 
523
        self.unsupported[str(feature)] += 1
 
524
        self.report_unsupported(test, feature)
 
525
 
 
526
    def addSkip(self, test, reason):
 
527
        """A test has not run for 'reason'."""
 
528
        self.skip_count += 1
 
529
        self.report_skip(test, reason)
 
530
 
 
531
    def addNotApplicable(self, test, reason):
 
532
        self.not_applicable_count += 1
 
533
        self.report_not_applicable(test, reason)
 
534
 
 
535
    def _count_stored_tests(self):
 
536
        """Count of tests instances kept alive due to not succeeding"""
 
537
        return self.error_count + self.failure_count + self.known_failure_count
 
538
 
 
539
    def _post_mortem(self, tb=None):
 
540
        """Start a PDB post mortem session."""
 
541
        if os.environ.get('BRZ_TEST_PDB', None):
 
542
            import pdb
 
543
            pdb.post_mortem(tb)
 
544
 
 
545
    def progress(self, offset, whence):
 
546
        """The test is adjusting the count of tests to run."""
 
547
        if whence == SUBUNIT_SEEK_SET:
 
548
            self.num_tests = offset
 
549
        elif whence == SUBUNIT_SEEK_CUR:
 
550
            self.num_tests += offset
 
551
        else:
 
552
            raise errors.BzrError("Unknown whence %r" % whence)
 
553
 
 
554
    def report_tests_starting(self):
 
555
        """Display information before the test run begins"""
 
556
        if getattr(sys, 'frozen', None) is None:
 
557
            bzr_path = osutils.realpath(sys.argv[0])
 
558
        else:
 
559
            bzr_path = sys.executable
 
560
        self.stream.write(
 
561
            'brz selftest: %s\n' % (bzr_path,))
 
562
        self.stream.write(
 
563
            '   %s\n' % (
 
564
                    breezy.__path__[0],))
 
565
        self.stream.write(
 
566
            '   bzr-%s python-%s %s\n' % (
 
567
                    breezy.version_string,
 
568
                    breezy._format_version_tuple(sys.version_info),
 
569
                    platform.platform(aliased=1),
 
570
                    ))
 
571
        self.stream.write('\n')
 
572
 
 
573
    def report_test_start(self, test):
 
574
        """Display information on the test just about to be run"""
 
575
 
 
576
    def _report_thread_leak(self, test, leaked_threads, active_threads):
 
577
        """Display information on a test that leaked one or more threads"""
 
578
        # GZ 2010-09-09: A leak summary reported separately from the general
 
579
        #                thread debugging would be nice. Tests under subunit
 
580
        #                need something not using stream, perhaps adding a
 
581
        #                testtools details object would be fitting.
 
582
        if 'threads' in selftest_debug_flags:
 
583
            self.stream.write('%s is leaking, active is now %d\n' %
 
584
                (test.id(), len(active_threads)))
 
585
 
 
586
    def startTestRun(self):
 
587
        self.startTime = time.time()
 
588
 
 
589
    def report_success(self, test):
 
590
        pass
 
591
 
 
592
    def wasStrictlySuccessful(self):
 
593
        if self.unsupported or self.known_failure_count:
 
594
            return False
 
595
        return self.wasSuccessful()
 
596
 
 
597
 
 
598
class TextTestResult(ExtendedTestResult):
 
599
    """Displays progress and results of tests in text form"""
 
600
 
 
601
    def __init__(self, stream, descriptions, verbosity,
 
602
                 bench_history=None,
 
603
                 strict=None,
 
604
                 ):
 
605
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
606
            bench_history, strict)
 
607
        self.pb = self.ui.nested_progress_bar()
 
608
        self.pb.show_pct = False
 
609
        self.pb.show_spinner = False
 
610
        self.pb.show_eta = False,
 
611
        self.pb.show_count = False
 
612
        self.pb.show_bar = False
 
613
        self.pb.update_latency = 0
 
614
        self.pb.show_transport_activity = False
 
615
 
 
616
    def stopTestRun(self):
 
617
        # called when the tests that are going to run have run
 
618
        self.pb.clear()
 
619
        self.pb.finished()
 
620
        super(TextTestResult, self).stopTestRun()
 
621
 
 
622
    def report_tests_starting(self):
 
623
        super(TextTestResult, self).report_tests_starting()
 
624
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
 
625
 
 
626
    def _progress_prefix_text(self):
 
627
        # the longer this text, the less space we have to show the test
 
628
        # name...
 
629
        a = '[%d' % self.count              # total that have been run
 
630
        # tests skipped as known not to be relevant are not important enough
 
631
        # to show here
 
632
        ## if self.skip_count:
 
633
        ##     a += ', %d skip' % self.skip_count
 
634
        ## if self.known_failure_count:
 
635
        ##     a += '+%dX' % self.known_failure_count
 
636
        if self.num_tests:
 
637
            a +='/%d' % self.num_tests
 
638
        a += ' in '
 
639
        runtime = time.time() - self._overall_start_time
 
640
        if runtime >= 60:
 
641
            a += '%dm%ds' % (runtime / 60, runtime % 60)
 
642
        else:
 
643
            a += '%ds' % runtime
 
644
        total_fail_count = self.error_count + self.failure_count
 
645
        if total_fail_count:
 
646
            a += ', %d failed' % total_fail_count
 
647
        # if self.unsupported:
 
648
        #     a += ', %d missing' % len(self.unsupported)
 
649
        a += ']'
 
650
        return a
 
651
 
 
652
    def report_test_start(self, test):
 
653
        self.pb.update(
 
654
                self._progress_prefix_text()
 
655
                + ' '
 
656
                + self._shortened_test_description(test))
 
657
 
 
658
    def _test_description(self, test):
 
659
        return self._shortened_test_description(test)
 
660
 
 
661
    def report_error(self, test, err):
 
662
        self.stream.write('ERROR: %s\n    %s\n' % (
 
663
            self._test_description(test),
 
664
            err[1],
 
665
            ))
 
666
 
 
667
    def report_failure(self, test, err):
 
668
        self.stream.write('FAIL: %s\n    %s\n' % (
 
669
            self._test_description(test),
 
670
            err[1],
 
671
            ))
 
672
 
 
673
    def report_known_failure(self, test, err):
 
674
        pass
 
675
 
 
676
    def report_unexpected_success(self, test, reason):
 
677
        self.stream.write('FAIL: %s\n    %s: %s\n' % (
 
678
            self._test_description(test),
 
679
            "Unexpected success. Should have failed",
 
680
            reason,
 
681
            ))
 
682
 
 
683
    def report_skip(self, test, reason):
 
684
        pass
 
685
 
 
686
    def report_not_applicable(self, test, reason):
 
687
        pass
 
688
 
 
689
    def report_unsupported(self, test, feature):
 
690
        """test cannot be run because feature is missing."""
 
691
 
 
692
 
 
693
class VerboseTestResult(ExtendedTestResult):
 
694
    """Produce long output, with one line per test run plus times"""
 
695
 
 
696
    def _ellipsize_to_right(self, a_string, final_width):
 
697
        """Truncate and pad a string, keeping the right hand side"""
 
698
        if len(a_string) > final_width:
 
699
            result = '...' + a_string[3-final_width:]
 
700
        else:
 
701
            result = a_string
 
702
        return result.ljust(final_width)
 
703
 
 
704
    def report_tests_starting(self):
 
705
        self.stream.write('running %d tests...\n' % self.num_tests)
 
706
        super(VerboseTestResult, self).report_tests_starting()
 
707
 
 
708
    def report_test_start(self, test):
 
709
        name = self._shortened_test_description(test)
 
710
        width = osutils.terminal_width()
 
711
        if width is not None:
 
712
            # width needs space for 6 char status, plus 1 for slash, plus an
 
713
            # 11-char time string, plus a trailing blank
 
714
            # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
 
715
            # space
 
716
            self.stream.write(self._ellipsize_to_right(name, width-18))
 
717
        else:
 
718
            self.stream.write(name)
 
719
        self.stream.flush()
 
720
 
 
721
    def _error_summary(self, err):
 
722
        indent = ' ' * 4
 
723
        return '%s%s' % (indent, err[1])
 
724
 
 
725
    def report_error(self, test, err):
 
726
        self.stream.write('ERROR %s\n%s\n'
 
727
                % (self._testTimeString(test),
 
728
                   self._error_summary(err)))
 
729
 
 
730
    def report_failure(self, test, err):
 
731
        self.stream.write(' FAIL %s\n%s\n'
 
732
                % (self._testTimeString(test),
 
733
                   self._error_summary(err)))
 
734
 
 
735
    def report_known_failure(self, test, err):
 
736
        self.stream.write('XFAIL %s\n%s\n'
 
737
                % (self._testTimeString(test),
 
738
                   self._error_summary(err)))
 
739
 
 
740
    def report_unexpected_success(self, test, reason):
 
741
        self.stream.write(' FAIL %s\n%s: %s\n'
 
742
                % (self._testTimeString(test),
 
743
                   "Unexpected success. Should have failed",
 
744
                   reason))
 
745
 
 
746
    def report_success(self, test):
 
747
        self.stream.write('   OK %s\n' % self._testTimeString(test))
 
748
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
749
            self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
 
750
            stats.pprint(file=self.stream)
 
751
        # flush the stream so that we get smooth output. This verbose mode is
 
752
        # used to show the output in PQM.
 
753
        self.stream.flush()
 
754
 
 
755
    def report_skip(self, test, reason):
 
756
        self.stream.write(' SKIP %s\n%s\n'
 
757
                % (self._testTimeString(test), reason))
 
758
 
 
759
    def report_not_applicable(self, test, reason):
 
760
        self.stream.write('  N/A %s\n    %s\n'
 
761
                % (self._testTimeString(test), reason))
 
762
 
 
763
    def report_unsupported(self, test, feature):
 
764
        """test cannot be run because feature is missing."""
 
765
        self.stream.write("NODEP %s\n    The feature '%s' is not available.\n"
 
766
                %(self._testTimeString(test), feature))
 
767
 
 
768
 
 
769
class TextTestRunner(object):
 
770
    stop_on_failure = False
 
771
 
 
772
    def __init__(self,
 
773
                 stream=sys.stderr,
 
774
                 descriptions=0,
 
775
                 verbosity=1,
 
776
                 bench_history=None,
 
777
                 strict=False,
 
778
                 result_decorators=None,
 
779
                 ):
 
780
        """Create a TextTestRunner.
 
781
 
 
782
        :param result_decorators: An optional list of decorators to apply
 
783
            to the result object being used by the runner. Decorators are
 
784
            applied left to right - the first element in the list is the 
 
785
            innermost decorator.
 
786
        """
 
787
        # stream may know claim to know to write unicode strings, but in older
 
788
        # pythons this goes sufficiently wrong that it is a bad idea. (
 
789
        # specifically a built in file with encoding 'UTF-8' will still try
 
790
        # to encode using ascii.
 
791
        new_encoding = osutils.get_terminal_encoding()
 
792
        codec = codecs.lookup(new_encoding)
 
793
        encode = codec.encode
 
794
        # GZ 2010-09-08: Really we don't want to be writing arbitrary bytes,
 
795
        #                so should swap to the plain codecs.StreamWriter
 
796
        stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream,
 
797
            "backslashreplace")
 
798
        stream.encoding = new_encoding
 
799
        self.stream = stream
 
800
        self.descriptions = descriptions
 
801
        self.verbosity = verbosity
 
802
        self._bench_history = bench_history
 
803
        self._strict = strict
 
804
        self._result_decorators = result_decorators or []
 
805
 
 
806
    def run(self, test):
 
807
        "Run the given test case or test suite."
 
808
        if self.verbosity == 1:
 
809
            result_class = TextTestResult
 
810
        elif self.verbosity >= 2:
 
811
            result_class = VerboseTestResult
 
812
        original_result = result_class(self.stream,
 
813
                              self.descriptions,
 
814
                              self.verbosity,
 
815
                              bench_history=self._bench_history,
 
816
                              strict=self._strict,
 
817
                              )
 
818
        # Signal to result objects that look at stop early policy to stop,
 
819
        original_result.stop_early = self.stop_on_failure
 
820
        result = original_result
 
821
        for decorator in self._result_decorators:
 
822
            result = decorator(result)
 
823
            result.stop_early = self.stop_on_failure
 
824
        result.startTestRun()
 
825
        try:
 
826
            test.run(result)
 
827
        finally:
 
828
            result.stopTestRun()
 
829
        # higher level code uses our extended protocol to determine
 
830
        # what exit code to give.
 
831
        return original_result
 
832
 
 
833
 
 
834
def iter_suite_tests(suite):
 
835
    """Return all tests in a suite, recursing through nested suites"""
 
836
    if isinstance(suite, unittest.TestCase):
 
837
        yield suite
 
838
    elif isinstance(suite, unittest.TestSuite):
 
839
        for item in suite:
 
840
            for r in iter_suite_tests(item):
 
841
                yield r
 
842
    else:
 
843
        raise Exception('unknown type %r for object %r'
 
844
                        % (type(suite), suite))
 
845
 
 
846
 
 
847
TestSkipped = testtools.testcase.TestSkipped
 
848
 
 
849
 
 
850
class TestNotApplicable(TestSkipped):
 
851
    """A test is not applicable to the situation where it was run.
 
852
 
 
853
    This is only normally raised by parameterized tests, if they find that
 
854
    the instance they're constructed upon does not support one aspect
 
855
    of its interface.
 
856
    """
 
857
 
 
858
 
 
859
# traceback._some_str fails to format exceptions that have the default
 
860
# __str__ which does an implicit ascii conversion. However, repr() on those
 
861
# objects works, for all that its not quite what the doctor may have ordered.
 
862
def _clever_some_str(value):
 
863
    try:
 
864
        return str(value)
 
865
    except:
 
866
        try:
 
867
            return repr(value).replace('\\n', '\n')
 
868
        except:
 
869
            return '<unprintable %s object>' % type(value).__name__
 
870
 
 
871
traceback._some_str = _clever_some_str
 
872
 
 
873
 
 
874
# deprecated - use self.knownFailure(), or self.expectFailure.
 
875
KnownFailure = testtools.testcase._ExpectedFailure
 
876
 
 
877
 
 
878
class UnavailableFeature(Exception):
 
879
    """A feature required for this test was not available.
 
880
 
 
881
    This can be considered a specialised form of SkippedTest.
 
882
 
 
883
    The feature should be used to construct the exception.
 
884
    """
 
885
 
 
886
 
 
887
class StringIOWrapper(ui_testing.BytesIOWithEncoding):
 
888
 
 
889
    @symbol_versioning.deprecated_method(
 
890
        symbol_versioning.deprecated_in((3, 0)))
 
891
    def __init__(self, s=None):
 
892
        super(StringIOWrapper, self).__init__(s)
 
893
 
 
894
 
 
895
TestUIFactory = ui_testing.TestUIFactory
 
896
 
 
897
 
 
898
def isolated_doctest_setUp(test):
 
899
    override_os_environ(test)
 
900
    osutils.set_or_unset_env('BRZ_HOME', '/nonexistent')
 
901
    test._orig_ui_factory = ui.ui_factory
 
902
    ui.ui_factory = ui.SilentUIFactory()
 
903
 
 
904
 
 
905
def isolated_doctest_tearDown(test):
 
906
    restore_os_environ(test)
 
907
    ui.ui_factory = test._orig_ui_factory
 
908
 
 
909
 
 
910
def IsolatedDocTestSuite(*args, **kwargs):
 
911
    """Overrides doctest.DocTestSuite to handle isolation.
 
912
 
 
913
    The method is really a factory and users are expected to use it as such.
 
914
    """
 
915
 
 
916
    kwargs['setUp'] = isolated_doctest_setUp
 
917
    kwargs['tearDown'] = isolated_doctest_tearDown
 
918
    return doctest.DocTestSuite(*args, **kwargs)
 
919
 
 
920
 
 
921
class TestCase(testtools.TestCase):
 
922
    """Base class for brz unit tests.
 
923
 
 
924
    Tests that need access to disk resources should subclass
 
925
    TestCaseInTempDir not TestCase.
 
926
 
 
927
    Error and debug log messages are redirected from their usual
 
928
    location into a temporary file, the contents of which can be
 
929
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
930
    so that it can also capture file IO.  When the test completes this file
 
931
    is read into memory and removed from disk.
 
932
 
 
933
    There are also convenience functions to invoke bzr's command-line
 
934
    routine, and to build and check brz trees.
 
935
 
 
936
    In addition to the usual method of overriding tearDown(), this class also
 
937
    allows subclasses to register cleanup functions via addCleanup, which are
 
938
    run in order as the object is torn down.  It's less likely this will be
 
939
    accidentally overlooked.
 
940
    """
 
941
 
 
942
    _log_file = None
 
943
    # record lsprof data when performing benchmark calls.
 
944
    _gather_lsprof_in_benchmarks = False
 
945
 
 
946
    def __init__(self, methodName='testMethod'):
 
947
        super(TestCase, self).__init__(methodName)
 
948
        self._directory_isolation = True
 
949
        self.exception_handlers.insert(0,
 
950
            (UnavailableFeature, self._do_unsupported_or_skip))
 
951
        self.exception_handlers.insert(0,
 
952
            (TestNotApplicable, self._do_not_applicable))
 
953
 
 
954
    def setUp(self):
 
955
        super(TestCase, self).setUp()
 
956
 
 
957
        # At this point we're still accessing the config files in $BRZ_HOME (as
 
958
        # set by the user running selftest).
 
959
        timeout = config.GlobalStack().get('selftest.timeout')
 
960
        if timeout:
 
961
            timeout_fixture = fixtures.TimeoutFixture(timeout)
 
962
            timeout_fixture.setUp()
 
963
            self.addCleanup(timeout_fixture.cleanUp)
 
964
 
 
965
        for feature in getattr(self, '_test_needs_features', []):
 
966
            self.requireFeature(feature)
 
967
        self._cleanEnvironment()
 
968
 
 
969
        self.overrideAttr(breezy.get_global_state(), 'cmdline_overrides',
 
970
                          config.CommandLineStore())
 
971
 
 
972
        self._silenceUI()
 
973
        self._startLogFile()
 
974
        self._benchcalls = []
 
975
        self._benchtime = None
 
976
        self._clear_hooks()
 
977
        self._track_transports()
 
978
        self._track_locks()
 
979
        self._clear_debug_flags()
 
980
        # Isolate global verbosity level, to make sure it's reproducible
 
981
        # between tests.  We should get rid of this altogether: bug 656694. --
 
982
        # mbp 20101008
 
983
        self.overrideAttr(breezy.trace, '_verbosity_level', 0)
 
984
        self._log_files = set()
 
985
        # Each key in the ``_counters`` dict holds a value for a different
 
986
        # counter. When the test ends, addDetail() should be used to output the
 
987
        # counter values. This happens in install_counter_hook().
 
988
        self._counters = {}
 
989
        if 'config_stats' in selftest_debug_flags:
 
990
            self._install_config_stats_hooks()
 
991
        # Do not use i18n for tests (unless the test reverses this)
 
992
        i18n.disable_i18n()
 
993
 
 
994
    def debug(self):
 
995
        # debug a frame up.
 
996
        import pdb
 
997
        # The sys preserved stdin/stdout should allow blackbox tests debugging
 
998
        pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
 
999
                ).set_trace(sys._getframe().f_back)
 
1000
 
 
1001
    def discardDetail(self, name):
 
1002
        """Extend the addDetail, getDetails api so we can remove a detail.
 
1003
 
 
1004
        eg. brz always adds the 'log' detail at startup, but we don't want to
 
1005
        include it for skipped, xfail, etc tests.
 
1006
 
 
1007
        It is safe to call this for a detail that doesn't exist, in case this
 
1008
        gets called multiple times.
 
1009
        """
 
1010
        # We cheat. details is stored in __details which means we shouldn't
 
1011
        # touch it. but getDetails() returns the dict directly, so we can
 
1012
        # mutate it.
 
1013
        details = self.getDetails()
 
1014
        if name in details:
 
1015
            del details[name]
 
1016
 
 
1017
    def install_counter_hook(self, hooks, name, counter_name=None):
 
1018
        """Install a counting hook.
 
1019
 
 
1020
        Any hook can be counted as long as it doesn't need to return a value.
 
1021
 
 
1022
        :param hooks: Where the hook should be installed.
 
1023
 
 
1024
        :param name: The hook name that will be counted.
 
1025
 
 
1026
        :param counter_name: The counter identifier in ``_counters``, defaults
 
1027
            to ``name``.
 
1028
        """
 
1029
        _counters = self._counters # Avoid closing over self
 
1030
        if counter_name is None:
 
1031
            counter_name = name
 
1032
        if counter_name in _counters:
 
1033
            raise AssertionError('%s is already used as a counter name'
 
1034
                                  % (counter_name,))
 
1035
        _counters[counter_name] = 0
 
1036
        self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
 
1037
            lambda: [b'%d' % (_counters[counter_name],)]))
 
1038
        def increment_counter(*args, **kwargs):
 
1039
            _counters[counter_name] += 1
 
1040
        label = 'count %s calls' % (counter_name,)
 
1041
        hooks.install_named_hook(name, increment_counter, label)
 
1042
        self.addCleanup(hooks.uninstall_named_hook, name, label)
 
1043
 
 
1044
    def _install_config_stats_hooks(self):
 
1045
        """Install config hooks to count hook calls.
 
1046
 
 
1047
        """
 
1048
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1049
            self.install_counter_hook(config.ConfigHooks, hook_name,
 
1050
                                       'config.%s' % (hook_name,))
 
1051
 
 
1052
        # The OldConfigHooks are private and need special handling to protect
 
1053
        # against recursive tests (tests that run other tests), so we just do
 
1054
        # manually what registering them into _builtin_known_hooks will provide
 
1055
        # us.
 
1056
        self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
 
1057
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1058
            self.install_counter_hook(config.OldConfigHooks, hook_name,
 
1059
                                      'old_config.%s' % (hook_name,))
 
1060
 
 
1061
    def _clear_debug_flags(self):
 
1062
        """Prevent externally set debug flags affecting tests.
 
1063
 
 
1064
        Tests that want to use debug flags can just set them in the
 
1065
        debug_flags set during setup/teardown.
 
1066
        """
 
1067
        # Start with a copy of the current debug flags we can safely modify.
 
1068
        self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
 
1069
        if 'allow_debug' not in selftest_debug_flags:
 
1070
            debug.debug_flags.clear()
 
1071
        if 'disable_lock_checks' not in selftest_debug_flags:
 
1072
            debug.debug_flags.add('strict_locks')
 
1073
 
 
1074
    def _clear_hooks(self):
 
1075
        # prevent hooks affecting tests
 
1076
        known_hooks = hooks.known_hooks
 
1077
        self._preserved_hooks = {}
 
1078
        for key, (parent, name) in known_hooks.iter_parent_objects():
 
1079
            current_hooks = getattr(parent, name)
 
1080
            self._preserved_hooks[parent] = (name, current_hooks)
 
1081
        self._preserved_lazy_hooks = hooks._lazy_hooks
 
1082
        hooks._lazy_hooks = {}
 
1083
        self.addCleanup(self._restoreHooks)
 
1084
        for key, (parent, name) in known_hooks.iter_parent_objects():
 
1085
            factory = known_hooks.get(key)
 
1086
            setattr(parent, name, factory())
 
1087
        # this hook should always be installed
 
1088
        request._install_hook()
 
1089
 
 
1090
    def disable_directory_isolation(self):
 
1091
        """Turn off directory isolation checks."""
 
1092
        self._directory_isolation = False
 
1093
 
 
1094
    def enable_directory_isolation(self):
 
1095
        """Enable directory isolation checks."""
 
1096
        self._directory_isolation = True
 
1097
 
 
1098
    def _silenceUI(self):
 
1099
        """Turn off UI for duration of test"""
 
1100
        # by default the UI is off; tests can turn it on if they want it.
 
1101
        self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
 
1102
 
 
1103
    def _check_locks(self):
 
1104
        """Check that all lock take/release actions have been paired."""
 
1105
        # We always check for mismatched locks. If a mismatch is found, we
 
1106
        # fail unless -Edisable_lock_checks is supplied to selftest, in which
 
1107
        # case we just print a warning.
 
1108
        # unhook:
 
1109
        acquired_locks = [lock for action, lock in self._lock_actions
 
1110
                          if action == 'acquired']
 
1111
        released_locks = [lock for action, lock in self._lock_actions
 
1112
                          if action == 'released']
 
1113
        broken_locks = [lock for action, lock in self._lock_actions
 
1114
                        if action == 'broken']
 
1115
        # trivially, given the tests for lock acquistion and release, if we
 
1116
        # have as many in each list, it should be ok. Some lock tests also
 
1117
        # break some locks on purpose and should be taken into account by
 
1118
        # considering that breaking a lock is just a dirty way of releasing it.
 
1119
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
 
1120
            message = (
 
1121
                'Different number of acquired and '
 
1122
                'released or broken locks.\n'
 
1123
                'acquired=%s\n'
 
1124
                'released=%s\n'
 
1125
                'broken=%s\n' %
 
1126
                (acquired_locks, released_locks, broken_locks))
 
1127
            if not self._lock_check_thorough:
 
1128
                # Rather than fail, just warn
 
1129
                print("Broken test %s: %s" % (self, message))
 
1130
                return
 
1131
            self.fail(message)
 
1132
 
 
1133
    def _track_locks(self):
 
1134
        """Track lock activity during tests."""
 
1135
        self._lock_actions = []
 
1136
        if 'disable_lock_checks' in selftest_debug_flags:
 
1137
            self._lock_check_thorough = False
 
1138
        else:
 
1139
            self._lock_check_thorough = True
 
1140
 
 
1141
        self.addCleanup(self._check_locks)
 
1142
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
 
1143
                                                self._lock_acquired, None)
 
1144
        _mod_lock.Lock.hooks.install_named_hook('lock_released',
 
1145
                                                self._lock_released, None)
 
1146
        _mod_lock.Lock.hooks.install_named_hook('lock_broken',
 
1147
                                                self._lock_broken, None)
 
1148
 
 
1149
    def _lock_acquired(self, result):
 
1150
        self._lock_actions.append(('acquired', result))
 
1151
 
 
1152
    def _lock_released(self, result):
 
1153
        self._lock_actions.append(('released', result))
 
1154
 
 
1155
    def _lock_broken(self, result):
 
1156
        self._lock_actions.append(('broken', result))
 
1157
 
 
1158
    def permit_dir(self, name):
 
1159
        """Permit a directory to be used by this test. See permit_url."""
 
1160
        name_transport = _mod_transport.get_transport_from_path(name)
 
1161
        self.permit_url(name)
 
1162
        self.permit_url(name_transport.base)
 
1163
 
 
1164
    def permit_url(self, url):
 
1165
        """Declare that url is an ok url to use in this test.
 
1166
 
 
1167
        Do this for memory transports, temporary test directory etc.
 
1168
 
 
1169
        Do not do this for the current working directory, /tmp, or any other
 
1170
        preexisting non isolated url.
 
1171
        """
 
1172
        if not url.endswith('/'):
 
1173
            url += '/'
 
1174
        self._bzr_selftest_roots.append(url)
 
1175
 
 
1176
    def permit_source_tree_branch_repo(self):
 
1177
        """Permit the source tree brz is running from to be opened.
 
1178
 
 
1179
        Some code such as breezy.version attempts to read from the brz branch
 
1180
        that brz is executing from (if any). This method permits that directory
 
1181
        to be used in the test suite.
 
1182
        """
 
1183
        path = self.get_source_path()
 
1184
        self.record_directory_isolation()
 
1185
        try:
 
1186
            try:
 
1187
                workingtree.WorkingTree.open(path)
 
1188
            except (errors.NotBranchError, errors.NoWorkingTree):
 
1189
                raise TestSkipped('Needs a working tree of brz sources')
 
1190
        finally:
 
1191
            self.enable_directory_isolation()
 
1192
 
 
1193
    def _preopen_isolate_transport(self, transport):
 
1194
        """Check that all transport openings are done in the test work area."""
 
1195
        while isinstance(transport, pathfilter.PathFilteringTransport):
 
1196
            # Unwrap pathfiltered transports
 
1197
            transport = transport.server.backing_transport.clone(
 
1198
                transport._filter('.'))
 
1199
        url = transport.base
 
1200
        # ReadonlySmartTCPServer_for_testing decorates the backing transport
 
1201
        # urls it is given by prepending readonly+. This is appropriate as the
 
1202
        # client shouldn't know that the server is readonly (or not readonly).
 
1203
        # We could register all servers twice, with readonly+ prepending, but
 
1204
        # that makes for a long list; this is about the same but easier to
 
1205
        # read.
 
1206
        if url.startswith('readonly+'):
 
1207
            url = url[len('readonly+'):]
 
1208
        self._preopen_isolate_url(url)
 
1209
 
 
1210
    def _preopen_isolate_url(self, url):
 
1211
        if not self._directory_isolation:
 
1212
            return
 
1213
        if self._directory_isolation == 'record':
 
1214
            self._bzr_selftest_roots.append(url)
 
1215
            return
 
1216
        # This prevents all transports, including e.g. sftp ones backed on disk
 
1217
        # from working unless they are explicitly granted permission. We then
 
1218
        # depend on the code that sets up test transports to check that they are
 
1219
        # appropriately isolated and enable their use by calling
 
1220
        # self.permit_transport()
 
1221
        if not osutils.is_inside_any(self._bzr_selftest_roots, url):
 
1222
            raise errors.BzrError("Attempt to escape test isolation: %r %r"
 
1223
                % (url, self._bzr_selftest_roots))
 
1224
 
 
1225
    def record_directory_isolation(self):
 
1226
        """Gather accessed directories to permit later access.
 
1227
 
 
1228
        This is used for tests that access the branch brz is running from.
 
1229
        """
 
1230
        self._directory_isolation = "record"
 
1231
 
 
1232
    def start_server(self, transport_server, backing_server=None):
 
1233
        """Start transport_server for this test.
 
1234
 
 
1235
        This starts the server, registers a cleanup for it and permits the
 
1236
        server's urls to be used.
 
1237
        """
 
1238
        if backing_server is None:
 
1239
            transport_server.start_server()
 
1240
        else:
 
1241
            transport_server.start_server(backing_server)
 
1242
        self.addCleanup(transport_server.stop_server)
 
1243
        # Obtain a real transport because if the server supplies a password, it
 
1244
        # will be hidden from the base on the client side.
 
1245
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
 
1246
        # Some transport servers effectively chroot the backing transport;
 
1247
        # others like SFTPServer don't - users of the transport can walk up the
 
1248
        # transport to read the entire backing transport. This wouldn't matter
 
1249
        # except that the workdir tests are given - and that they expect the
 
1250
        # server's url to point at - is one directory under the safety net. So
 
1251
        # Branch operations into the transport will attempt to walk up one
 
1252
        # directory. Chrooting all servers would avoid this but also mean that
 
1253
        # we wouldn't be testing directly against non-root urls. Alternatively
 
1254
        # getting the test framework to start the server with a backing server
 
1255
        # at the actual safety net directory would work too, but this then
 
1256
        # means that the self.get_url/self.get_transport methods would need
 
1257
        # to transform all their results. On balance its cleaner to handle it
 
1258
        # here, and permit a higher url when we have one of these transports.
 
1259
        if t.base.endswith('/work/'):
 
1260
            # we have safety net/test root/work
 
1261
            t = t.clone('../..')
 
1262
        elif isinstance(transport_server,
 
1263
                        test_server.SmartTCPServer_for_testing):
 
1264
            # The smart server adds a path similar to work, which is traversed
 
1265
            # up from by the client. But the server is chrooted - the actual
 
1266
            # backing transport is not escaped from, and VFS requests to the
 
1267
            # root will error (because they try to escape the chroot).
 
1268
            t2 = t.clone('..')
 
1269
            while t2.base != t.base:
 
1270
                t = t2
 
1271
                t2 = t.clone('..')
 
1272
        self.permit_url(t.base)
 
1273
 
 
1274
    def _track_transports(self):
 
1275
        """Install checks for transport usage."""
 
1276
        # TestCase has no safe place it can write to.
 
1277
        self._bzr_selftest_roots = []
 
1278
        # Currently the easiest way to be sure that nothing is going on is to
 
1279
        # hook into brz dir opening. This leaves a small window of error for
 
1280
        # transport tests, but they are well known, and we can improve on this
 
1281
        # step.
 
1282
        controldir.ControlDir.hooks.install_named_hook("pre_open",
 
1283
            self._preopen_isolate_transport, "Check brz directories are safe.")
 
1284
 
 
1285
    def _ndiff_strings(self, a, b):
 
1286
        """Return ndiff between two strings containing lines.
 
1287
 
 
1288
        A trailing newline is added if missing to make the strings
 
1289
        print properly."""
 
1290
        if b and b[-1] != '\n':
 
1291
            b += '\n'
 
1292
        if a and a[-1] != '\n':
 
1293
            a += '\n'
 
1294
        difflines = difflib.ndiff(a.splitlines(True),
 
1295
                                  b.splitlines(True),
 
1296
                                  linejunk=lambda x: False,
 
1297
                                  charjunk=lambda x: False)
 
1298
        return ''.join(difflines)
 
1299
 
 
1300
    def assertEqual(self, a, b, message=''):
 
1301
        try:
 
1302
            if a == b:
 
1303
                return
 
1304
        except UnicodeError as e:
 
1305
            # If we can't compare without getting a UnicodeError, then
 
1306
            # obviously they are different
 
1307
            trace.mutter('UnicodeError: %s', e)
 
1308
        if message:
 
1309
            message += '\n'
 
1310
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
1311
            % (message,
 
1312
               pprint.pformat(a), pprint.pformat(b)))
 
1313
 
 
1314
    # FIXME: This is deprecated in unittest2 but plugins may still use it so we
 
1315
    # need a deprecation period for them -- vila 2016-02-01
 
1316
    assertEquals = assertEqual
 
1317
 
 
1318
    def assertEqualDiff(self, a, b, message=None):
 
1319
        """Assert two texts are equal, if not raise an exception.
 
1320
 
 
1321
        This is intended for use with multi-line strings where it can
 
1322
        be hard to find the differences by eye.
 
1323
        """
 
1324
        # TODO: perhaps override assertEqual to call this for strings?
 
1325
        if a == b:
 
1326
            return
 
1327
        if message is None:
 
1328
            message = "texts not equal:\n"
 
1329
        if a + '\n' == b:
 
1330
            message = 'first string is missing a final newline.\n'
 
1331
        if a == b + '\n':
 
1332
            message = 'second string is missing a final newline.\n'
 
1333
        raise AssertionError(message +
 
1334
                             self._ndiff_strings(a, b))
 
1335
 
 
1336
    def assertEqualMode(self, mode, mode_test):
 
1337
        self.assertEqual(mode, mode_test,
 
1338
                         'mode mismatch %o != %o' % (mode, mode_test))
 
1339
 
 
1340
    def assertEqualStat(self, expected, actual):
 
1341
        """assert that expected and actual are the same stat result.
 
1342
 
 
1343
        :param expected: A stat result.
 
1344
        :param actual: A stat result.
 
1345
        :raises AssertionError: If the expected and actual stat values differ
 
1346
            other than by atime.
 
1347
        """
 
1348
        self.assertEqual(expected.st_size, actual.st_size,
 
1349
                         'st_size did not match')
 
1350
        self.assertEqual(expected.st_mtime, actual.st_mtime,
 
1351
                         'st_mtime did not match')
 
1352
        self.assertEqual(expected.st_ctime, actual.st_ctime,
 
1353
                         'st_ctime did not match')
 
1354
        if sys.platform == 'win32':
 
1355
            # On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
 
1356
            # is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
 
1357
            # odd. We just force it to always be 0 to avoid any problems.
 
1358
            self.assertEqual(0, expected.st_dev)
 
1359
            self.assertEqual(0, actual.st_dev)
 
1360
            self.assertEqual(0, expected.st_ino)
 
1361
            self.assertEqual(0, actual.st_ino)
 
1362
        else:
 
1363
            self.assertEqual(expected.st_dev, actual.st_dev,
 
1364
                             'st_dev did not match')
 
1365
            self.assertEqual(expected.st_ino, actual.st_ino,
 
1366
                             'st_ino did not match')
 
1367
        self.assertEqual(expected.st_mode, actual.st_mode,
 
1368
                         'st_mode did not match')
 
1369
 
 
1370
    def assertLength(self, length, obj_with_len):
 
1371
        """Assert that obj_with_len is of length length."""
 
1372
        if len(obj_with_len) != length:
 
1373
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
 
1374
                length, len(obj_with_len), obj_with_len))
 
1375
 
 
1376
    def assertLogsError(self, exception_class, func, *args, **kwargs):
 
1377
        """Assert that `func(*args, **kwargs)` quietly logs a specific error.
 
1378
        """
 
1379
        captured = []
 
1380
        orig_log_exception_quietly = trace.log_exception_quietly
 
1381
        try:
 
1382
            def capture():
 
1383
                orig_log_exception_quietly()
 
1384
                captured.append(sys.exc_info()[1])
 
1385
            trace.log_exception_quietly = capture
 
1386
            func(*args, **kwargs)
 
1387
        finally:
 
1388
            trace.log_exception_quietly = orig_log_exception_quietly
 
1389
        self.assertLength(1, captured)
 
1390
        err = captured[0]
 
1391
        self.assertIsInstance(err, exception_class)
 
1392
        return err
 
1393
 
 
1394
    def assertPositive(self, val):
 
1395
        """Assert that val is greater than 0."""
 
1396
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
1397
 
 
1398
    def assertNegative(self, val):
 
1399
        """Assert that val is less than 0."""
 
1400
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
1401
 
 
1402
    def assertStartsWith(self, s, prefix):
 
1403
        if not s.startswith(prefix):
 
1404
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
1405
 
 
1406
    def assertEndsWith(self, s, suffix):
 
1407
        """Asserts that s ends with suffix."""
 
1408
        if not s.endswith(suffix):
 
1409
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
1410
 
 
1411
    def assertContainsRe(self, haystack, needle_re, flags=0):
 
1412
        """Assert that a contains something matching a regular expression."""
 
1413
        if not re.search(needle_re, haystack, flags):
 
1414
            if '\n' in haystack or len(haystack) > 60:
 
1415
                # a long string, format it in a more readable way
 
1416
                raise AssertionError(
 
1417
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
1418
                        % (needle_re, haystack))
 
1419
            else:
 
1420
                raise AssertionError('pattern "%s" not found in "%s"'
 
1421
                        % (needle_re, haystack))
 
1422
 
 
1423
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
 
1424
        """Assert that a does not match a regular expression"""
 
1425
        if re.search(needle_re, haystack, flags):
 
1426
            raise AssertionError('pattern "%s" found in "%s"'
 
1427
                    % (needle_re, haystack))
 
1428
 
 
1429
    def assertContainsString(self, haystack, needle):
 
1430
        if haystack.find(needle) == -1:
 
1431
            self.fail("string %r not found in '''%s'''" % (needle, haystack))
 
1432
 
 
1433
    def assertNotContainsString(self, haystack, needle):
 
1434
        if haystack.find(needle) != -1:
 
1435
            self.fail("string %r found in '''%s'''" % (needle, haystack))
 
1436
 
 
1437
    def assertSubset(self, sublist, superlist):
 
1438
        """Assert that every entry in sublist is present in superlist."""
 
1439
        missing = set(sublist) - set(superlist)
 
1440
        if len(missing) > 0:
 
1441
            raise AssertionError("value(s) %r not present in container %r" %
 
1442
                                 (missing, superlist))
 
1443
 
 
1444
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
1445
        """Fail unless excClass is raised when the iterator from func is used.
 
1446
 
 
1447
        Many functions can return generators this makes sure
 
1448
        to wrap them in a list() call to make sure the whole generator
 
1449
        is run, and that the proper exception is raised.
 
1450
        """
 
1451
        try:
 
1452
            list(func(*args, **kwargs))
 
1453
        except excClass as e:
 
1454
            return e
 
1455
        else:
 
1456
            if getattr(excClass, '__name__', None) is not None:
 
1457
                excName = excClass.__name__
 
1458
            else:
 
1459
                excName = str(excClass)
 
1460
            raise self.failureException("%s not raised" % excName)
 
1461
 
 
1462
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
1463
        """Assert that a callable raises a particular exception.
 
1464
 
 
1465
        :param excClass: As for the except statement, this may be either an
 
1466
            exception class, or a tuple of classes.
 
1467
        :param callableObj: A callable, will be passed ``*args`` and
 
1468
            ``**kwargs``.
 
1469
 
 
1470
        Returns the exception so that you can examine it.
 
1471
        """
 
1472
        try:
 
1473
            callableObj(*args, **kwargs)
 
1474
        except excClass as e:
 
1475
            return e
 
1476
        else:
 
1477
            if getattr(excClass, '__name__', None) is not None:
 
1478
                excName = excClass.__name__
 
1479
            else:
 
1480
                # probably a tuple
 
1481
                excName = str(excClass)
 
1482
            raise self.failureException("%s not raised" % excName)
 
1483
 
 
1484
    def assertIs(self, left, right, message=None):
 
1485
        if not (left is right):
 
1486
            if message is not None:
 
1487
                raise AssertionError(message)
 
1488
            else:
 
1489
                raise AssertionError("%r is not %r." % (left, right))
 
1490
 
 
1491
    def assertIsNot(self, left, right, message=None):
 
1492
        if (left is right):
 
1493
            if message is not None:
 
1494
                raise AssertionError(message)
 
1495
            else:
 
1496
                raise AssertionError("%r is %r." % (left, right))
 
1497
 
 
1498
    def assertTransportMode(self, transport, path, mode):
 
1499
        """Fail if a path does not have mode "mode".
 
1500
 
 
1501
        If modes are not supported on this transport, the assertion is ignored.
 
1502
        """
 
1503
        if not transport._can_roundtrip_unix_modebits():
 
1504
            return
 
1505
        path_stat = transport.stat(path)
 
1506
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
1507
        self.assertEqual(mode, actual_mode,
 
1508
                         'mode of %r incorrect (%s != %s)'
 
1509
                         % (path, oct(mode), oct(actual_mode)))
 
1510
 
 
1511
    def assertIsSameRealPath(self, path1, path2):
 
1512
        """Fail if path1 and path2 points to different files"""
 
1513
        self.assertEqual(osutils.realpath(path1),
 
1514
                         osutils.realpath(path2),
 
1515
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
1516
 
 
1517
    def assertIsInstance(self, obj, kls, msg=None):
 
1518
        """Fail if obj is not an instance of kls
 
1519
        
 
1520
        :param msg: Supplementary message to show if the assertion fails.
 
1521
        """
 
1522
        if not isinstance(obj, kls):
 
1523
            m = "%r is an instance of %s rather than %s" % (
 
1524
                obj, obj.__class__, kls)
 
1525
            if msg:
 
1526
                m += ": " + msg
 
1527
            self.fail(m)
 
1528
 
 
1529
    def assertFileEqual(self, content, path):
 
1530
        """Fail if path does not contain 'content'."""
 
1531
        self.assertPathExists(path)
 
1532
        with open(path, 'rb') as f:
 
1533
            s = f.read()
 
1534
        self.assertEqualDiff(content, s)
 
1535
 
 
1536
    def assertDocstring(self, expected_docstring, obj):
 
1537
        """Fail if obj does not have expected_docstring"""
 
1538
        if __doc__ is None:
 
1539
            # With -OO the docstring should be None instead
 
1540
            self.assertIs(obj.__doc__, None)
 
1541
        else:
 
1542
            self.assertEqual(expected_docstring, obj.__doc__)
 
1543
 
 
1544
    def assertPathExists(self, path):
 
1545
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1546
        if not isinstance(path, (str, text_type)):
 
1547
            for p in path:
 
1548
                self.assertPathExists(p)
 
1549
        else:
 
1550
            self.assertTrue(osutils.lexists(path),
 
1551
                path + " does not exist")
 
1552
 
 
1553
    def assertPathDoesNotExist(self, path):
 
1554
        """Fail if path or paths, which may be abs or relative, exist."""
 
1555
        if not isinstance(path, (str, text_type)):
 
1556
            for p in path:
 
1557
                self.assertPathDoesNotExist(p)
 
1558
        else:
 
1559
            self.assertFalse(osutils.lexists(path),
 
1560
                path + " exists")
 
1561
 
 
1562
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1563
        """A helper for callDeprecated and applyDeprecated.
 
1564
 
 
1565
        :param a_callable: A callable to call.
 
1566
        :param args: The positional arguments for the callable
 
1567
        :param kwargs: The keyword arguments for the callable
 
1568
        :return: A tuple (warnings, result). result is the result of calling
 
1569
            a_callable(``*args``, ``**kwargs``).
 
1570
        """
 
1571
        local_warnings = []
 
1572
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1573
            # we've hooked into a deprecation specific callpath,
 
1574
            # only deprecations should getting sent via it.
 
1575
            self.assertEqual(cls, DeprecationWarning)
 
1576
            local_warnings.append(msg)
 
1577
        original_warning_method = symbol_versioning.warn
 
1578
        symbol_versioning.set_warning_method(capture_warnings)
 
1579
        try:
 
1580
            result = a_callable(*args, **kwargs)
 
1581
        finally:
 
1582
            symbol_versioning.set_warning_method(original_warning_method)
 
1583
        return (local_warnings, result)
 
1584
 
 
1585
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1586
        """Call a deprecated callable without warning the user.
 
1587
 
 
1588
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1589
        not other callers that go direct to the warning module.
 
1590
 
 
1591
        To test that a deprecated method raises an error, do something like
 
1592
        this (remember that both assertRaises and applyDeprecated delays *args
 
1593
        and **kwargs passing)::
 
1594
 
 
1595
            self.assertRaises(errors.ReservedId,
 
1596
                self.applyDeprecated,
 
1597
                deprecated_in((1, 5, 0)),
 
1598
                br.append_revision,
 
1599
                'current:')
 
1600
 
 
1601
        :param deprecation_format: The deprecation format that the callable
 
1602
            should have been deprecated with. This is the same type as the
 
1603
            parameter to deprecated_method/deprecated_function. If the
 
1604
            callable is not deprecated with this format, an assertion error
 
1605
            will be raised.
 
1606
        :param a_callable: A callable to call. This may be a bound method or
 
1607
            a regular function. It will be called with ``*args`` and
 
1608
            ``**kwargs``.
 
1609
        :param args: The positional arguments for the callable
 
1610
        :param kwargs: The keyword arguments for the callable
 
1611
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1612
        """
 
1613
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1614
            *args, **kwargs)
 
1615
        expected_first_warning = symbol_versioning.deprecation_string(
 
1616
            a_callable, deprecation_format)
 
1617
        if len(call_warnings) == 0:
 
1618
            self.fail("No deprecation warning generated by call to %s" %
 
1619
                a_callable)
 
1620
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1621
        return result
 
1622
 
 
1623
    def callCatchWarnings(self, fn, *args, **kw):
 
1624
        """Call a callable that raises python warnings.
 
1625
 
 
1626
        The caller's responsible for examining the returned warnings.
 
1627
 
 
1628
        If the callable raises an exception, the exception is not
 
1629
        caught and propagates up to the caller.  In that case, the list
 
1630
        of warnings is not available.
 
1631
 
 
1632
        :returns: ([warning_object, ...], fn_result)
 
1633
        """
 
1634
        # XXX: This is not perfect, because it completely overrides the
 
1635
        # warnings filters, and some code may depend on suppressing particular
 
1636
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1637
        # though.  -- Andrew, 20071062
 
1638
        wlist = []
 
1639
        def _catcher(message, category, filename, lineno, file=None, line=None):
 
1640
            # despite the name, 'message' is normally(?) a Warning subclass
 
1641
            # instance
 
1642
            wlist.append(message)
 
1643
        saved_showwarning = warnings.showwarning
 
1644
        saved_filters = warnings.filters
 
1645
        try:
 
1646
            warnings.showwarning = _catcher
 
1647
            warnings.filters = []
 
1648
            result = fn(*args, **kw)
 
1649
        finally:
 
1650
            warnings.showwarning = saved_showwarning
 
1651
            warnings.filters = saved_filters
 
1652
        return wlist, result
 
1653
 
 
1654
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1655
        """Assert that a callable is deprecated in a particular way.
 
1656
 
 
1657
        This is a very precise test for unusual requirements. The
 
1658
        applyDeprecated helper function is probably more suited for most tests
 
1659
        as it allows you to simply specify the deprecation format being used
 
1660
        and will ensure that that is issued for the function being called.
 
1661
 
 
1662
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1663
        not other callers that go direct to the warning module.  To catch
 
1664
        general warnings, use callCatchWarnings.
 
1665
 
 
1666
        :param expected: a list of the deprecation warnings expected, in order
 
1667
        :param callable: The callable to call
 
1668
        :param args: The positional arguments for the callable
 
1669
        :param kwargs: The keyword arguments for the callable
 
1670
        """
 
1671
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1672
            *args, **kwargs)
 
1673
        self.assertEqual(expected, call_warnings)
 
1674
        return result
 
1675
 
 
1676
    def _startLogFile(self):
 
1677
        """Setup a in-memory target for bzr and testcase log messages"""
 
1678
        pseudo_log_file = BytesIO()
 
1679
        def _get_log_contents_for_weird_testtools_api():
 
1680
            return [pseudo_log_file.getvalue().decode(
 
1681
                "utf-8", "replace").encode("utf-8")]
 
1682
        self.addDetail("log", content.Content(content.ContentType("text",
 
1683
            "plain", {"charset": "utf8"}),
 
1684
            _get_log_contents_for_weird_testtools_api))
 
1685
        self._log_file = pseudo_log_file
 
1686
        self._log_memento = trace.push_log_file(self._log_file)
 
1687
        self.addCleanup(self._finishLogFile)
 
1688
 
 
1689
    def _finishLogFile(self):
 
1690
        """Flush and dereference the in-memory log for this testcase"""
 
1691
        if trace._trace_file:
 
1692
            # flush the log file, to get all content
 
1693
            trace._trace_file.flush()
 
1694
        trace.pop_log_file(self._log_memento)
 
1695
        # The logging module now tracks references for cleanup so discard ours
 
1696
        del self._log_memento
 
1697
 
 
1698
    def thisFailsStrictLockCheck(self):
 
1699
        """It is known that this test would fail with -Dstrict_locks.
 
1700
 
 
1701
        By default, all tests are run with strict lock checking unless
 
1702
        -Edisable_lock_checks is supplied. However there are some tests which
 
1703
        we know fail strict locks at this point that have not been fixed.
 
1704
        They should call this function to disable the strict checking.
 
1705
 
 
1706
        This should be used sparingly, it is much better to fix the locking
 
1707
        issues rather than papering over the problem by calling this function.
 
1708
        """
 
1709
        debug.debug_flags.discard('strict_locks')
 
1710
 
 
1711
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
 
1712
        """Overrides an object attribute restoring it after the test.
 
1713
 
 
1714
        :note: This should be used with discretion; you should think about
 
1715
        whether it's better to make the code testable without monkey-patching.
 
1716
 
 
1717
        :param obj: The object that will be mutated.
 
1718
 
 
1719
        :param attr_name: The attribute name we want to preserve/override in
 
1720
            the object.
 
1721
 
 
1722
        :param new: The optional value we want to set the attribute to.
 
1723
 
 
1724
        :returns: The actual attr value.
 
1725
        """
 
1726
        # The actual value is captured by the call below
 
1727
        value = getattr(obj, attr_name, _unitialized_attr)
 
1728
        if value is _unitialized_attr:
 
1729
            # When the test completes, the attribute should not exist, but if
 
1730
            # we aren't setting a value, we don't need to do anything.
 
1731
            if new is not _unitialized_attr:
 
1732
                self.addCleanup(delattr, obj, attr_name)
 
1733
        else:
 
1734
            self.addCleanup(setattr, obj, attr_name, value)
 
1735
        if new is not _unitialized_attr:
 
1736
            setattr(obj, attr_name, new)
 
1737
        return value
 
1738
 
 
1739
    def overrideEnv(self, name, new):
 
1740
        """Set an environment variable, and reset it after the test.
 
1741
 
 
1742
        :param name: The environment variable name.
 
1743
 
 
1744
        :param new: The value to set the variable to. If None, the 
 
1745
            variable is deleted from the environment.
 
1746
 
 
1747
        :returns: The actual variable value.
 
1748
        """
 
1749
        value = osutils.set_or_unset_env(name, new)
 
1750
        self.addCleanup(osutils.set_or_unset_env, name, value)
 
1751
        return value
 
1752
 
 
1753
    def recordCalls(self, obj, attr_name):
 
1754
        """Monkeypatch in a wrapper that will record calls.
 
1755
 
 
1756
        The monkeypatch is automatically removed when the test concludes.
 
1757
 
 
1758
        :param obj: The namespace holding the reference to be replaced;
 
1759
            typically a module, class, or object.
 
1760
        :param attr_name: A string for the name of the attribute to 
 
1761
            patch.
 
1762
        :returns: A list that will be extended with one item every time the
 
1763
            function is called, with a tuple of (args, kwargs).
 
1764
        """
 
1765
        calls = []
 
1766
 
 
1767
        def decorator(*args, **kwargs):
 
1768
            calls.append((args, kwargs))
 
1769
            return orig(*args, **kwargs)
 
1770
        orig = self.overrideAttr(obj, attr_name, decorator)
 
1771
        return calls
 
1772
 
 
1773
    def _cleanEnvironment(self):
 
1774
        for name, value in isolated_environ.items():
 
1775
            self.overrideEnv(name, value)
 
1776
 
 
1777
    def _restoreHooks(self):
 
1778
        for klass, (name, hooks) in self._preserved_hooks.items():
 
1779
            setattr(klass, name, hooks)
 
1780
        self._preserved_hooks.clear()
 
1781
        breezy.hooks._lazy_hooks = self._preserved_lazy_hooks
 
1782
        self._preserved_lazy_hooks.clear()
 
1783
 
 
1784
    def knownFailure(self, reason):
 
1785
        """Declare that this test fails for a known reason
 
1786
 
 
1787
        Tests that are known to fail should generally be using expectedFailure
 
1788
        with an appropriate reverse assertion if a change could cause the test
 
1789
        to start passing. Conversely if the test has no immediate prospect of
 
1790
        succeeding then using skip is more suitable.
 
1791
 
 
1792
        When this method is called while an exception is being handled, that
 
1793
        traceback will be used, otherwise a new exception will be thrown to
 
1794
        provide one but won't be reported.
 
1795
        """
 
1796
        self._add_reason(reason)
 
1797
        try:
 
1798
            exc_info = sys.exc_info()
 
1799
            if exc_info != (None, None, None):
 
1800
                self._report_traceback(exc_info)
 
1801
            else:
 
1802
                try:
 
1803
                    raise self.failureException(reason)
 
1804
                except self.failureException:
 
1805
                    exc_info = sys.exc_info()
 
1806
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
 
1807
            raise testtools.testcase._ExpectedFailure(exc_info)
 
1808
        finally:
 
1809
            del exc_info
 
1810
 
 
1811
    def _suppress_log(self):
 
1812
        """Remove the log info from details."""
 
1813
        self.discardDetail('log')
 
1814
 
 
1815
    def _do_skip(self, result, reason):
 
1816
        self._suppress_log()
 
1817
        addSkip = getattr(result, 'addSkip', None)
 
1818
        if not callable(addSkip):
 
1819
            result.addSuccess(result)
 
1820
        else:
 
1821
            addSkip(self, str(reason))
 
1822
 
 
1823
    @staticmethod
 
1824
    def _do_known_failure(self, result, e):
 
1825
        self._suppress_log()
 
1826
        err = sys.exc_info()
 
1827
        addExpectedFailure = getattr(result, 'addExpectedFailure', None)
 
1828
        if addExpectedFailure is not None:
 
1829
            addExpectedFailure(self, err)
 
1830
        else:
 
1831
            result.addSuccess(self)
 
1832
 
 
1833
    @staticmethod
 
1834
    def _do_not_applicable(self, result, e):
 
1835
        if not e.args:
 
1836
            reason = 'No reason given'
 
1837
        else:
 
1838
            reason = e.args[0]
 
1839
        self._suppress_log ()
 
1840
        addNotApplicable = getattr(result, 'addNotApplicable', None)
 
1841
        if addNotApplicable is not None:
 
1842
            result.addNotApplicable(self, reason)
 
1843
        else:
 
1844
            self._do_skip(result, reason)
 
1845
 
 
1846
    @staticmethod
 
1847
    def _report_skip(self, result, err):
 
1848
        """Override the default _report_skip.
 
1849
 
 
1850
        We want to strip the 'log' detail. If we waint until _do_skip, it has
 
1851
        already been formatted into the 'reason' string, and we can't pull it
 
1852
        out again.
 
1853
        """
 
1854
        self._suppress_log()
 
1855
        super(TestCase, self)._report_skip(self, result, err)
 
1856
 
 
1857
    @staticmethod
 
1858
    def _report_expected_failure(self, result, err):
 
1859
        """Strip the log.
 
1860
 
 
1861
        See _report_skip for motivation.
 
1862
        """
 
1863
        self._suppress_log()
 
1864
        super(TestCase, self)._report_expected_failure(self, result, err)
 
1865
 
 
1866
    @staticmethod
 
1867
    def _do_unsupported_or_skip(self, result, e):
 
1868
        reason = e.args[0]
 
1869
        self._suppress_log()
 
1870
        addNotSupported = getattr(result, 'addNotSupported', None)
 
1871
        if addNotSupported is not None:
 
1872
            result.addNotSupported(self, reason)
 
1873
        else:
 
1874
            self._do_skip(result, reason)
 
1875
 
 
1876
    def time(self, callable, *args, **kwargs):
 
1877
        """Run callable and accrue the time it takes to the benchmark time.
 
1878
 
 
1879
        If lsprofiling is enabled (i.e. by --lsprof-time to brz selftest) then
 
1880
        this will cause lsprofile statistics to be gathered and stored in
 
1881
        self._benchcalls.
 
1882
        """
 
1883
        if self._benchtime is None:
 
1884
            self.addDetail('benchtime', content.Content(content.UTF8_TEXT,
 
1885
                lambda:[str(self._benchtime).encode('utf-8')]))
 
1886
            self._benchtime = 0
 
1887
        start = time.time()
 
1888
        try:
 
1889
            if not self._gather_lsprof_in_benchmarks:
 
1890
                return callable(*args, **kwargs)
 
1891
            else:
 
1892
                # record this benchmark
 
1893
                ret, stats = breezy.lsprof.profile(callable, *args, **kwargs)
 
1894
                stats.sort()
 
1895
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1896
                return ret
 
1897
        finally:
 
1898
            self._benchtime += time.time() - start
 
1899
 
 
1900
    def log(self, *args):
 
1901
        trace.mutter(*args)
 
1902
 
 
1903
    def get_log(self):
 
1904
        """Get a unicode string containing the log from breezy.trace.
 
1905
 
 
1906
        Undecodable characters are replaced.
 
1907
        """
 
1908
        return u"".join(self.getDetails()['log'].iter_text())
 
1909
 
 
1910
    def requireFeature(self, feature):
 
1911
        """This test requires a specific feature is available.
 
1912
 
 
1913
        :raises UnavailableFeature: When feature is not available.
 
1914
        """
 
1915
        if not feature.available():
 
1916
            raise UnavailableFeature(feature)
 
1917
 
 
1918
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1919
            working_dir):
 
1920
        """Run bazaar command line, splitting up a string command line."""
 
1921
        if isinstance(args, string_types):
 
1922
            args = shlex.split(args)
 
1923
        return self._run_bzr_core(args, retcode=retcode,
 
1924
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1925
                )
 
1926
 
 
1927
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1928
            working_dir):
 
1929
        # Clear chk_map page cache, because the contents are likely to mask
 
1930
        # locking errors.
 
1931
        chk_map.clear_cache()
 
1932
        if encoding is None:
 
1933
            encoding = osutils.get_user_encoding()
 
1934
 
 
1935
        self.log('run brz: %r', args)
 
1936
 
 
1937
        stdout = ui_testing.BytesIOWithEncoding()
 
1938
        stderr = ui_testing.BytesIOWithEncoding()
 
1939
        stdout.encoding = stderr.encoding = encoding
 
1940
 
 
1941
        # FIXME: don't call into logging here
 
1942
        handler = trace.EncodedStreamHandler(
 
1943
            stderr, errors="replace", level=logging.INFO)
 
1944
        logger = logging.getLogger('')
 
1945
        logger.addHandler(handler)
 
1946
 
 
1947
        self._last_cmd_stdout = codecs.getwriter(encoding)(stdout)
 
1948
        self._last_cmd_stderr = codecs.getwriter(encoding)(stderr)
 
1949
 
 
1950
        old_ui_factory = ui.ui_factory
 
1951
        ui.ui_factory = ui_testing.TestUIFactory(
 
1952
            stdin=stdin,
 
1953
            stdout=self._last_cmd_stdout,
 
1954
            stderr=self._last_cmd_stderr)
 
1955
 
 
1956
        cwd = None
 
1957
        if working_dir is not None:
 
1958
            cwd = osutils.getcwd()
 
1959
            os.chdir(working_dir)
 
1960
 
 
1961
        try:
 
1962
            result = self.apply_redirected(
 
1963
                ui.ui_factory.stdin,
 
1964
                stdout, stderr,
 
1965
                _mod_commands.run_bzr_catch_user_errors,
 
1966
                args)
 
1967
        finally:
 
1968
            logger.removeHandler(handler)
 
1969
            ui.ui_factory = old_ui_factory
 
1970
            if cwd is not None:
 
1971
                os.chdir(cwd)
 
1972
 
 
1973
        out = stdout.getvalue()
 
1974
        err = stderr.getvalue()
 
1975
        if out:
 
1976
            self.log('output:\n%r', out)
 
1977
        if err:
 
1978
            self.log('errors:\n%r', err)
 
1979
        if retcode is not None:
 
1980
            self.assertEqual(retcode, result,
 
1981
                              message='Unexpected return code')
 
1982
        return result, out, err
 
1983
 
 
1984
    def run_bzr(self, args, retcode=0, stdin=None, encoding=None,
 
1985
                working_dir=None, error_regexes=[]):
 
1986
        """Invoke brz, as if it were run from the command line.
 
1987
 
 
1988
        The argument list should not include the brz program name - the
 
1989
        first argument is normally the brz command.  Arguments may be
 
1990
        passed in three ways:
 
1991
 
 
1992
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1993
        when the command contains whitespace or metacharacters, or
 
1994
        is built up at run time.
 
1995
 
 
1996
        2- A single string, eg "add a".  This is the most convenient
 
1997
        for hardcoded commands.
 
1998
 
 
1999
        This runs brz through the interface that catches and reports
 
2000
        errors, and with logging set to something approximating the
 
2001
        default, so that error reporting can be checked.
 
2002
 
 
2003
        This should be the main method for tests that want to exercise the
 
2004
        overall behavior of the brz application (rather than a unit test
 
2005
        or a functional test of the library.)
 
2006
 
 
2007
        This sends the stdout/stderr results into the test's log,
 
2008
        where it may be useful for debugging.  See also run_captured.
 
2009
 
 
2010
        :keyword stdin: A string to be used as stdin for the command.
 
2011
        :keyword retcode: The status code the command should return;
 
2012
            default 0.
 
2013
        :keyword working_dir: The directory to run the command in
 
2014
        :keyword error_regexes: A list of expected error messages.  If
 
2015
            specified they must be seen in the error output of the command.
 
2016
        """
 
2017
        retcode, out, err = self._run_bzr_autosplit(
 
2018
            args=args,
 
2019
            retcode=retcode,
 
2020
            encoding=encoding,
 
2021
            stdin=stdin,
 
2022
            working_dir=working_dir,
 
2023
            )
 
2024
        self.assertIsInstance(error_regexes, (list, tuple))
 
2025
        for regex in error_regexes:
 
2026
            self.assertContainsRe(err, regex)
 
2027
        return out, err
 
2028
 
 
2029
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
2030
        """Run brz, and check that stderr contains the supplied regexes
 
2031
 
 
2032
        :param error_regexes: Sequence of regular expressions which
 
2033
            must each be found in the error output. The relative ordering
 
2034
            is not enforced.
 
2035
        :param args: command-line arguments for brz
 
2036
        :param kwargs: Keyword arguments which are interpreted by run_brz
 
2037
            This function changes the default value of retcode to be 3,
 
2038
            since in most cases this is run when you expect brz to fail.
 
2039
 
 
2040
        :return: (out, err) The actual output of running the command (in case
 
2041
            you want to do more inspection)
 
2042
 
 
2043
        Examples of use::
 
2044
 
 
2045
            # Make sure that commit is failing because there is nothing to do
 
2046
            self.run_bzr_error(['no changes to commit'],
 
2047
                               ['commit', '-m', 'my commit comment'])
 
2048
            # Make sure --strict is handling an unknown file, rather than
 
2049
            # giving us the 'nothing to do' error
 
2050
            self.build_tree(['unknown'])
 
2051
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
2052
                               ['commit', --strict', '-m', 'my commit comment'])
 
2053
        """
 
2054
        kwargs.setdefault('retcode', 3)
 
2055
        kwargs['error_regexes'] = error_regexes
 
2056
        out, err = self.run_bzr(*args, **kwargs)
 
2057
        return out, err
 
2058
 
 
2059
    def run_bzr_subprocess(self, *args, **kwargs):
 
2060
        """Run brz in a subprocess for testing.
 
2061
 
 
2062
        This starts a new Python interpreter and runs brz in there.
 
2063
        This should only be used for tests that have a justifiable need for
 
2064
        this isolation: e.g. they are testing startup time, or signal
 
2065
        handling, or early startup code, etc.  Subprocess code can't be
 
2066
        profiled or debugged so easily.
 
2067
 
 
2068
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
2069
            None is supplied, the status code is not checked.
 
2070
        :keyword env_changes: A dictionary which lists changes to environment
 
2071
            variables. A value of None will unset the env variable.
 
2072
            The values must be strings. The change will only occur in the
 
2073
            child, so you don't need to fix the environment after running.
 
2074
        :keyword universal_newlines: Convert CRLF => LF
 
2075
        :keyword allow_plugins: By default the subprocess is run with
 
2076
            --no-plugins to ensure test reproducibility. Also, it is possible
 
2077
            for system-wide plugins to create unexpected output on stderr,
 
2078
            which can cause unnecessary test failures.
 
2079
        """
 
2080
        env_changes = kwargs.get('env_changes', {})
 
2081
        working_dir = kwargs.get('working_dir', None)
 
2082
        allow_plugins = kwargs.get('allow_plugins', False)
 
2083
        if len(args) == 1:
 
2084
            if isinstance(args[0], list):
 
2085
                args = args[0]
 
2086
            elif isinstance(args[0], (str, text_type)):
 
2087
                args = list(shlex.split(args[0]))
 
2088
        else:
 
2089
            raise ValueError("passing varargs to run_bzr_subprocess")
 
2090
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
2091
                                            working_dir=working_dir,
 
2092
                                            allow_plugins=allow_plugins)
 
2093
        # We distinguish between retcode=None and retcode not passed.
 
2094
        supplied_retcode = kwargs.get('retcode', 0)
 
2095
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
2096
            universal_newlines=kwargs.get('universal_newlines', False),
 
2097
            process_args=args)
 
2098
 
 
2099
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
2100
                             skip_if_plan_to_signal=False,
 
2101
                             working_dir=None,
 
2102
                             allow_plugins=False, stderr=subprocess.PIPE):
 
2103
        """Start brz in a subprocess for testing.
 
2104
 
 
2105
        This starts a new Python interpreter and runs brz in there.
 
2106
        This should only be used for tests that have a justifiable need for
 
2107
        this isolation: e.g. they are testing startup time, or signal
 
2108
        handling, or early startup code, etc.  Subprocess code can't be
 
2109
        profiled or debugged so easily.
 
2110
 
 
2111
        :param process_args: a list of arguments to pass to the brz executable,
 
2112
            for example ``['--version']``.
 
2113
        :param env_changes: A dictionary which lists changes to environment
 
2114
            variables. A value of None will unset the env variable.
 
2115
            The values must be strings. The change will only occur in the
 
2116
            child, so you don't need to fix the environment after running.
 
2117
        :param skip_if_plan_to_signal: raise TestSkipped when true and system
 
2118
            doesn't support signalling subprocesses.
 
2119
        :param allow_plugins: If False (default) pass --no-plugins to brz.
 
2120
        :param stderr: file to use for the subprocess's stderr.  Valid values
 
2121
            are those valid for the stderr argument of `subprocess.Popen`.
 
2122
            Default value is ``subprocess.PIPE``.
 
2123
 
 
2124
        :returns: Popen object for the started process.
 
2125
        """
 
2126
        if skip_if_plan_to_signal:
 
2127
            if os.name != "posix":
 
2128
                raise TestSkipped("Sending signals not supported")
 
2129
 
 
2130
        if env_changes is None:
 
2131
            env_changes = {}
 
2132
        # Because $HOME is set to a tempdir for the context of a test, modules
 
2133
        # installed in the user dir will not be found unless $PYTHONUSERBASE
 
2134
        # gets set to the computed directory of this parent process.
 
2135
        if site.USER_BASE is not None:
 
2136
            env_changes["PYTHONUSERBASE"] = site.USER_BASE
 
2137
        old_env = {}
 
2138
 
 
2139
        def cleanup_environment():
 
2140
            for env_var, value in env_changes.items():
 
2141
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
2142
 
 
2143
        def restore_environment():
 
2144
            for env_var, value in old_env.items():
 
2145
                osutils.set_or_unset_env(env_var, value)
 
2146
 
 
2147
        bzr_path = self.get_brz_path()
 
2148
 
 
2149
        cwd = None
 
2150
        if working_dir is not None:
 
2151
            cwd = osutils.getcwd()
 
2152
            os.chdir(working_dir)
 
2153
 
 
2154
        try:
 
2155
            # win32 subprocess doesn't support preexec_fn
 
2156
            # so we will avoid using it on all platforms, just to
 
2157
            # make sure the code path is used, and we don't break on win32
 
2158
            cleanup_environment()
 
2159
            # Include the subprocess's log file in the test details, in case
 
2160
            # the test fails due to an error in the subprocess.
 
2161
            self._add_subprocess_log(trace._get_brz_log_filename())
 
2162
            command = [sys.executable]
 
2163
            # frozen executables don't need the path to bzr
 
2164
            if getattr(sys, "frozen", None) is None:
 
2165
                command.append(bzr_path)
 
2166
            if not allow_plugins:
 
2167
                command.append('--no-plugins')
 
2168
            command.extend(process_args)
 
2169
            process = self._popen(command, stdin=subprocess.PIPE,
 
2170
                                  stdout=subprocess.PIPE,
 
2171
                                  stderr=stderr)
 
2172
        finally:
 
2173
            restore_environment()
 
2174
            if cwd is not None:
 
2175
                os.chdir(cwd)
 
2176
 
 
2177
        return process
 
2178
 
 
2179
    def _add_subprocess_log(self, log_file_path):
 
2180
        if len(self._log_files) == 0:
 
2181
            # Register an addCleanup func.  We do this on the first call to
 
2182
            # _add_subprocess_log rather than in TestCase.setUp so that this
 
2183
            # addCleanup is registered after any cleanups for tempdirs that
 
2184
            # subclasses might create, which will probably remove the log file
 
2185
            # we want to read.
 
2186
            self.addCleanup(self._subprocess_log_cleanup)
 
2187
        # self._log_files is a set, so if a log file is reused we won't grab it
 
2188
        # twice.
 
2189
        self._log_files.add(log_file_path)
 
2190
 
 
2191
    def _subprocess_log_cleanup(self):
 
2192
        for count, log_file_path in enumerate(self._log_files):
 
2193
            # We use buffer_now=True to avoid holding the file open beyond
 
2194
            # the life of this function, which might interfere with e.g.
 
2195
            # cleaning tempdirs on Windows.
 
2196
            # XXX: Testtools 0.9.5 doesn't have the content_from_file helper
 
2197
            #detail_content = content.content_from_file(
 
2198
            #    log_file_path, buffer_now=True)
 
2199
            with open(log_file_path, 'rb') as log_file:
 
2200
                log_file_bytes = log_file.read()
 
2201
            detail_content = content.Content(content.ContentType("text",
 
2202
                "plain", {"charset": "utf8"}), lambda: [log_file_bytes])
 
2203
            self.addDetail("start_bzr_subprocess-log-%d" % (count,),
 
2204
                detail_content)
 
2205
 
 
2206
    def _popen(self, *args, **kwargs):
 
2207
        """Place a call to Popen.
 
2208
 
 
2209
        Allows tests to override this method to intercept the calls made to
 
2210
        Popen for introspection.
 
2211
        """
 
2212
        return subprocess.Popen(*args, **kwargs)
 
2213
 
 
2214
    def get_source_path(self):
 
2215
        """Return the path of the directory containing breezy."""
 
2216
        return os.path.dirname(os.path.dirname(breezy.__file__))
 
2217
 
 
2218
    def get_brz_path(self):
 
2219
        """Return the path of the 'brz' executable for this test suite."""
 
2220
        brz_path = os.path.join(self.get_source_path(), "brz")
 
2221
        if not os.path.isfile(brz_path):
 
2222
            # We are probably installed. Assume sys.argv is the right file
 
2223
            brz_path = sys.argv[0]
 
2224
        return brz_path
 
2225
 
 
2226
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
2227
                              universal_newlines=False, process_args=None):
 
2228
        """Finish the execution of process.
 
2229
 
 
2230
        :param process: the Popen object returned from start_bzr_subprocess.
 
2231
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
2232
            None is supplied, the status code is not checked.
 
2233
        :param send_signal: an optional signal to send to the process.
 
2234
        :param universal_newlines: Convert CRLF => LF
 
2235
        :returns: (stdout, stderr)
 
2236
        """
 
2237
        if send_signal is not None:
 
2238
            os.kill(process.pid, send_signal)
 
2239
        out, err = process.communicate()
 
2240
 
 
2241
        if universal_newlines:
 
2242
            out = out.replace('\r\n', '\n')
 
2243
            err = err.replace('\r\n', '\n')
 
2244
 
 
2245
        if retcode is not None and retcode != process.returncode:
 
2246
            if process_args is None:
 
2247
                process_args = "(unknown args)"
 
2248
            trace.mutter('Output of brz %r:\n%s', process_args, out)
 
2249
            trace.mutter('Error for brz %r:\n%s', process_args, err)
 
2250
            self.fail('Command brz %r failed with retcode %d != %d'
 
2251
                      % (process_args, retcode, process.returncode))
 
2252
        return [out, err]
 
2253
 
 
2254
    def check_tree_shape(self, tree, shape):
 
2255
        """Compare a tree to a list of expected names.
 
2256
 
 
2257
        Fail if they are not precisely equal.
 
2258
        """
 
2259
        extras = []
 
2260
        shape = list(shape)             # copy
 
2261
        for path, ie in tree.iter_entries_by_dir():
 
2262
            name = path.replace('\\', '/')
 
2263
            if ie.kind == 'directory':
 
2264
                name = name + '/'
 
2265
            if name == "/":
 
2266
                pass # ignore root entry
 
2267
            elif name in shape:
 
2268
                shape.remove(name)
 
2269
            else:
 
2270
                extras.append(name)
 
2271
        if shape:
 
2272
            self.fail("expected paths not found in inventory: %r" % shape)
 
2273
        if extras:
 
2274
            self.fail("unexpected paths found in inventory: %r" % extras)
 
2275
 
 
2276
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
2277
                         a_callable=None, *args, **kwargs):
 
2278
        """Call callable with redirected std io pipes.
 
2279
 
 
2280
        Returns the return code."""
 
2281
        if not callable(a_callable):
 
2282
            raise ValueError("a_callable must be callable.")
 
2283
        if stdin is None:
 
2284
            stdin = BytesIO(b"")
 
2285
        if stdout is None:
 
2286
            if getattr(self, "_log_file", None) is not None:
 
2287
                stdout = self._log_file
 
2288
            else:
 
2289
                stdout = BytesIO()
 
2290
        if stderr is None:
 
2291
            if getattr(self, "_log_file", None is not None):
 
2292
                stderr = self._log_file
 
2293
            else:
 
2294
                stderr = BytesIO()
 
2295
        real_stdin = sys.stdin
 
2296
        real_stdout = sys.stdout
 
2297
        real_stderr = sys.stderr
 
2298
        try:
 
2299
            sys.stdout = stdout
 
2300
            sys.stderr = stderr
 
2301
            sys.stdin = stdin
 
2302
            return a_callable(*args, **kwargs)
 
2303
        finally:
 
2304
            sys.stdout = real_stdout
 
2305
            sys.stderr = real_stderr
 
2306
            sys.stdin = real_stdin
 
2307
 
 
2308
    def reduceLockdirTimeout(self):
 
2309
        """Reduce the default lock timeout for the duration of the test, so that
 
2310
        if LockContention occurs during a test, it does so quickly.
 
2311
 
 
2312
        Tests that expect to provoke LockContention errors should call this.
 
2313
        """
 
2314
        self.overrideAttr(lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
 
2315
 
 
2316
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
2317
        """Return a wrapped BytesIO, that will encode text input to UTF-8."""
 
2318
        if encoding_type is None:
 
2319
            encoding_type = 'strict'
 
2320
        bio = BytesIO()
 
2321
        output_encoding = 'utf-8'
 
2322
        sio = codecs.getwriter(output_encoding)(bio, errors=encoding_type)
 
2323
        sio.encoding = output_encoding
 
2324
        return sio
 
2325
 
 
2326
    def disable_verb(self, verb):
 
2327
        """Disable a smart server verb for one test."""
 
2328
        from breezy.bzr.smart import request
 
2329
        request_handlers = request.request_handlers
 
2330
        orig_method = request_handlers.get(verb)
 
2331
        orig_info = request_handlers.get_info(verb)
 
2332
        request_handlers.remove(verb)
 
2333
        self.addCleanup(request_handlers.register, verb, orig_method,
 
2334
            info=orig_info)
 
2335
 
 
2336
    def __hash__(self):
 
2337
        return id(self)
 
2338
 
 
2339
 
 
2340
class CapturedCall(object):
 
2341
    """A helper for capturing smart server calls for easy debug analysis."""
 
2342
 
 
2343
    def __init__(self, params, prefix_length):
 
2344
        """Capture the call with params and skip prefix_length stack frames."""
 
2345
        self.call = params
 
2346
        import traceback
 
2347
        # The last 5 frames are the __init__, the hook frame, and 3 smart
 
2348
        # client frames. Beyond this we could get more clever, but this is good
 
2349
        # enough for now.
 
2350
        stack = traceback.extract_stack()[prefix_length:-5]
 
2351
        self.stack = ''.join(traceback.format_list(stack))
 
2352
 
 
2353
    def __str__(self):
 
2354
        return self.call.method
 
2355
 
 
2356
    def __repr__(self):
 
2357
        return self.call.method
 
2358
 
 
2359
    def stack(self):
 
2360
        return self.stack
 
2361
 
 
2362
 
 
2363
class TestCaseWithMemoryTransport(TestCase):
 
2364
    """Common test class for tests that do not need disk resources.
 
2365
 
 
2366
    Tests that need disk resources should derive from TestCaseInTempDir
 
2367
    orTestCaseWithTransport.
 
2368
 
 
2369
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all brz tests.
 
2370
 
 
2371
    For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
 
2372
    a directory which does not exist. This serves to help ensure test isolation
 
2373
    is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
 
2374
    must exist. However, TestCaseWithMemoryTransport does not offer local file
 
2375
    defaults for the transport in tests, nor does it obey the command line
 
2376
    override, so tests that accidentally write to the common directory should
 
2377
    be rare.
 
2378
 
 
2379
    :cvar TEST_ROOT: Directory containing all temporary directories, plus a
 
2380
        ``.bzr`` directory that stops us ascending higher into the filesystem.
 
2381
    """
 
2382
 
 
2383
    TEST_ROOT = None
 
2384
    _TEST_NAME = 'test'
 
2385
 
 
2386
    def __init__(self, methodName='runTest'):
 
2387
        # allow test parameterization after test construction and before test
 
2388
        # execution. Variables that the parameterizer sets need to be
 
2389
        # ones that are not set by setUp, or setUp will trash them.
 
2390
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
2391
        self.vfs_transport_factory = default_transport
 
2392
        self.transport_server = None
 
2393
        self.transport_readonly_server = None
 
2394
        self.__vfs_server = None
 
2395
 
 
2396
    def setUp(self):
 
2397
        super(TestCaseWithMemoryTransport, self).setUp()
 
2398
 
 
2399
        def _add_disconnect_cleanup(transport):
 
2400
            """Schedule disconnection of given transport at test cleanup
 
2401
 
 
2402
            This needs to happen for all connected transports or leaks occur.
 
2403
 
 
2404
            Note reconnections may mean we call disconnect multiple times per
 
2405
            transport which is suboptimal but seems harmless.
 
2406
            """
 
2407
            self.addCleanup(transport.disconnect)
 
2408
 
 
2409
        _mod_transport.Transport.hooks.install_named_hook('post_connect',
 
2410
            _add_disconnect_cleanup, None)
 
2411
 
 
2412
        self._make_test_root()
 
2413
        self.addCleanup(os.chdir, osutils.getcwd())
 
2414
        self.makeAndChdirToTestDir()
 
2415
        self.overrideEnvironmentForTesting()
 
2416
        self.__readonly_server = None
 
2417
        self.__server = None
 
2418
        self.reduceLockdirTimeout()
 
2419
        # Each test may use its own config files even if the local config files
 
2420
        # don't actually exist. They'll rightly fail if they try to create them
 
2421
        # though.
 
2422
        self.overrideAttr(config, '_shared_stores', {})
 
2423
 
 
2424
    def get_transport(self, relpath=None):
 
2425
        """Return a writeable transport.
 
2426
 
 
2427
        This transport is for the test scratch space relative to
 
2428
        "self._test_root"
 
2429
 
 
2430
        :param relpath: a path relative to the base url.
 
2431
        """
 
2432
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
 
2433
        self.assertFalse(t.is_readonly())
 
2434
        return t
 
2435
 
 
2436
    def get_readonly_transport(self, relpath=None):
 
2437
        """Return a readonly transport for the test scratch space
 
2438
 
 
2439
        This can be used to test that operations which should only need
 
2440
        readonly access in fact do not try to write.
 
2441
 
 
2442
        :param relpath: a path relative to the base url.
 
2443
        """
 
2444
        t = _mod_transport.get_transport_from_url(
 
2445
            self.get_readonly_url(relpath))
 
2446
        self.assertTrue(t.is_readonly())
 
2447
        return t
 
2448
 
 
2449
    def create_transport_readonly_server(self):
 
2450
        """Create a transport server from class defined at init.
 
2451
 
 
2452
        This is mostly a hook for daughter classes.
 
2453
        """
 
2454
        return self.transport_readonly_server()
 
2455
 
 
2456
    def get_readonly_server(self):
 
2457
        """Get the server instance for the readonly transport
 
2458
 
 
2459
        This is useful for some tests with specific servers to do diagnostics.
 
2460
        """
 
2461
        if self.__readonly_server is None:
 
2462
            if self.transport_readonly_server is None:
 
2463
                # readonly decorator requested
 
2464
                self.__readonly_server = test_server.ReadonlyServer()
 
2465
            else:
 
2466
                # explicit readonly transport.
 
2467
                self.__readonly_server = self.create_transport_readonly_server()
 
2468
            self.start_server(self.__readonly_server,
 
2469
                self.get_vfs_only_server())
 
2470
        return self.__readonly_server
 
2471
 
 
2472
    def get_readonly_url(self, relpath=None):
 
2473
        """Get a URL for the readonly transport.
 
2474
 
 
2475
        This will either be backed by '.' or a decorator to the transport
 
2476
        used by self.get_url()
 
2477
        relpath provides for clients to get a path relative to the base url.
 
2478
        These should only be downwards relative, not upwards.
 
2479
        """
 
2480
        base = self.get_readonly_server().get_url()
 
2481
        return self._adjust_url(base, relpath)
 
2482
 
 
2483
    def get_vfs_only_server(self):
 
2484
        """Get the vfs only read/write server instance.
 
2485
 
 
2486
        This is useful for some tests with specific servers that need
 
2487
        diagnostics.
 
2488
 
 
2489
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
2490
        is no means to override it.
 
2491
        """
 
2492
        if self.__vfs_server is None:
 
2493
            self.__vfs_server = memory.MemoryServer()
 
2494
            self.start_server(self.__vfs_server)
 
2495
        return self.__vfs_server
 
2496
 
 
2497
    def get_server(self):
 
2498
        """Get the read/write server instance.
 
2499
 
 
2500
        This is useful for some tests with specific servers that need
 
2501
        diagnostics.
 
2502
 
 
2503
        This is built from the self.transport_server factory. If that is None,
 
2504
        then the self.get_vfs_server is returned.
 
2505
        """
 
2506
        if self.__server is None:
 
2507
            if (self.transport_server is None or self.transport_server is
 
2508
                self.vfs_transport_factory):
 
2509
                self.__server = self.get_vfs_only_server()
 
2510
            else:
 
2511
                # bring up a decorated means of access to the vfs only server.
 
2512
                self.__server = self.transport_server()
 
2513
                self.start_server(self.__server, self.get_vfs_only_server())
 
2514
        return self.__server
 
2515
 
 
2516
    def _adjust_url(self, base, relpath):
 
2517
        """Get a URL (or maybe a path) for the readwrite transport.
 
2518
 
 
2519
        This will either be backed by '.' or to an equivalent non-file based
 
2520
        facility.
 
2521
        relpath provides for clients to get a path relative to the base url.
 
2522
        These should only be downwards relative, not upwards.
 
2523
        """
 
2524
        if relpath is not None and relpath != '.':
 
2525
            if not base.endswith('/'):
 
2526
                base = base + '/'
 
2527
            # XXX: Really base should be a url; we did after all call
 
2528
            # get_url()!  But sometimes it's just a path (from
 
2529
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
2530
            # to a non-escaped local path.
 
2531
            if base.startswith('./') or base.startswith('/'):
 
2532
                base += relpath
 
2533
            else:
 
2534
                base += urlutils.escape(relpath)
 
2535
        return base
 
2536
 
 
2537
    def get_url(self, relpath=None):
 
2538
        """Get a URL (or maybe a path) for the readwrite transport.
 
2539
 
 
2540
        This will either be backed by '.' or to an equivalent non-file based
 
2541
        facility.
 
2542
        relpath provides for clients to get a path relative to the base url.
 
2543
        These should only be downwards relative, not upwards.
 
2544
        """
 
2545
        base = self.get_server().get_url()
 
2546
        return self._adjust_url(base, relpath)
 
2547
 
 
2548
    def get_vfs_only_url(self, relpath=None):
 
2549
        """Get a URL (or maybe a path for the plain old vfs transport.
 
2550
 
 
2551
        This will never be a smart protocol.  It always has all the
 
2552
        capabilities of the local filesystem, but it might actually be a
 
2553
        MemoryTransport or some other similar virtual filesystem.
 
2554
 
 
2555
        This is the backing transport (if any) of the server returned by
 
2556
        get_url and get_readonly_url.
 
2557
 
 
2558
        :param relpath: provides for clients to get a path relative to the base
 
2559
            url.  These should only be downwards relative, not upwards.
 
2560
        :return: A URL
 
2561
        """
 
2562
        base = self.get_vfs_only_server().get_url()
 
2563
        return self._adjust_url(base, relpath)
 
2564
 
 
2565
    def _create_safety_net(self):
 
2566
        """Make a fake bzr directory.
 
2567
 
 
2568
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
2569
        real branch.
 
2570
        """
 
2571
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2572
        try:
 
2573
            # Make sure we get a readable and accessible home for .brz.log
 
2574
            # and/or config files, and not fallback to weird defaults (see
 
2575
            # http://pad.lv/825027).
 
2576
            self.assertIs(None, os.environ.get('BRZ_HOME', None))
 
2577
            os.environ['BRZ_HOME'] = root
 
2578
            wt = controldir.ControlDir.create_standalone_workingtree(root)
 
2579
            del os.environ['BRZ_HOME']
 
2580
        except Exception as e:
 
2581
            self.fail("Fail to initialize the safety net: %r\n" % (e,))
 
2582
        # Hack for speed: remember the raw bytes of the dirstate file so that
 
2583
        # we don't need to re-open the wt to check it hasn't changed.
 
2584
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
 
2585
            wt.control_transport.get_bytes('dirstate'))
 
2586
 
 
2587
    def _check_safety_net(self):
 
2588
        """Check that the safety .bzr directory have not been touched.
 
2589
 
 
2590
        _make_test_root have created a .bzr directory to prevent tests from
 
2591
        propagating. This method ensures than a test did not leaked.
 
2592
        """
 
2593
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2594
        t = _mod_transport.get_transport_from_path(root)
 
2595
        self.permit_url(t.base)
 
2596
        if (t.get_bytes('.bzr/checkout/dirstate') != 
 
2597
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
 
2598
            # The current test have modified the /bzr directory, we need to
 
2599
            # recreate a new one or all the followng tests will fail.
 
2600
            # If you need to inspect its content uncomment the following line
 
2601
            # import pdb; pdb.set_trace()
 
2602
            _rmtree_temp_dir(root + '/.bzr', test_id=self.id())
 
2603
            self._create_safety_net()
 
2604
            raise AssertionError('%s/.bzr should not be modified' % root)
 
2605
 
 
2606
    def _make_test_root(self):
 
2607
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
2608
            # Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
 
2609
            root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
 
2610
                                                    suffix='.tmp'))
 
2611
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
2612
 
 
2613
            self._create_safety_net()
 
2614
 
 
2615
            # The same directory is used by all tests, and we're not
 
2616
            # specifically told when all tests are finished.  This will do.
 
2617
            atexit.register(_rmtree_temp_dir, root)
 
2618
 
 
2619
        self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2620
        self.addCleanup(self._check_safety_net)
 
2621
 
 
2622
    def makeAndChdirToTestDir(self):
 
2623
        """Create a temporary directories for this one test.
 
2624
 
 
2625
        This must set self.test_home_dir and self.test_dir and chdir to
 
2626
        self.test_dir.
 
2627
 
 
2628
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
2629
        """
 
2630
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2631
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
2632
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
2633
        self.permit_dir(self.test_dir)
 
2634
 
 
2635
    def make_branch(self, relpath, format=None, name=None):
 
2636
        """Create a branch on the transport at relpath."""
 
2637
        repo = self.make_repository(relpath, format=format)
 
2638
        return repo.controldir.create_branch(append_revisions_only=False, name=name)
 
2639
 
 
2640
    def get_default_format(self):
 
2641
        return 'default'
 
2642
 
 
2643
    def resolve_format(self, format):
 
2644
        """Resolve an object to a ControlDir format object.
 
2645
 
 
2646
        The initial format object can either already be
 
2647
        a ControlDirFormat, None (for the default format),
 
2648
        or a string with the name of the control dir format.
 
2649
 
 
2650
        :param format: Object to resolve
 
2651
        :return A ControlDirFormat instance
 
2652
        """
 
2653
        if format is None:
 
2654
            format = self.get_default_format()
 
2655
        if isinstance(format, str):
 
2656
            format = controldir.format_registry.make_controldir(format)
 
2657
        return format
 
2658
 
 
2659
    def make_controldir(self, relpath, format=None):
 
2660
        try:
 
2661
            # might be a relative or absolute path
 
2662
            maybe_a_url = self.get_url(relpath)
 
2663
            segments = maybe_a_url.rsplit('/', 1)
 
2664
            t = _mod_transport.get_transport(maybe_a_url)
 
2665
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
2666
                t.ensure_base()
 
2667
            format = self.resolve_format(format)
 
2668
            return format.initialize_on_transport(t)
 
2669
        except errors.UninitializableFormat:
 
2670
            raise TestSkipped("Format %s is not initializable." % format)
 
2671
 
 
2672
    def make_repository(self, relpath, shared=None, format=None):
 
2673
        """Create a repository on our default transport at relpath.
 
2674
 
 
2675
        Note that relpath must be a relative path, not a full url.
 
2676
        """
 
2677
        # FIXME: If you create a remoterepository this returns the underlying
 
2678
        # real format, which is incorrect.  Actually we should make sure that
 
2679
        # RemoteBzrDir returns a RemoteRepository.
 
2680
        # maybe  mbp 20070410
 
2681
        made_control = self.make_controldir(relpath, format=format)
 
2682
        return made_control.create_repository(shared=shared)
 
2683
 
 
2684
    def make_smart_server(self, path, backing_server=None):
 
2685
        if backing_server is None:
 
2686
            backing_server = self.get_server()
 
2687
        smart_server = test_server.SmartTCPServer_for_testing()
 
2688
        self.start_server(smart_server, backing_server)
 
2689
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
 
2690
                                                   ).clone(path)
 
2691
        return remote_transport
 
2692
 
 
2693
    def make_branch_and_memory_tree(self, relpath, format=None):
 
2694
        """Create a branch on the default transport and a MemoryTree for it."""
 
2695
        b = self.make_branch(relpath, format=format)
 
2696
        return memorytree.MemoryTree.create_on_branch(b)
 
2697
 
 
2698
    def make_branch_builder(self, relpath, format=None):
 
2699
        branch = self.make_branch(relpath, format=format)
 
2700
        return branchbuilder.BranchBuilder(branch=branch)
 
2701
 
 
2702
    def overrideEnvironmentForTesting(self):
 
2703
        test_home_dir = self.test_home_dir
 
2704
        if not PY3 and isinstance(test_home_dir, text_type):
 
2705
            test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
 
2706
        self.overrideEnv('HOME', test_home_dir)
 
2707
        self.overrideEnv('BRZ_HOME', test_home_dir)
 
2708
        self.overrideEnv('GNUPGHOME', os.path.join(test_home_dir, '.gnupg'))
 
2709
 
 
2710
    def setup_smart_server_with_call_log(self):
 
2711
        """Sets up a smart server as the transport server with a call log."""
 
2712
        self.transport_server = test_server.SmartTCPServer_for_testing
 
2713
        self.hpss_connections = []
 
2714
        self.hpss_calls = []
 
2715
        import traceback
 
2716
        # Skip the current stack down to the caller of
 
2717
        # setup_smart_server_with_call_log
 
2718
        prefix_length = len(traceback.extract_stack()) - 2
 
2719
        def capture_hpss_call(params):
 
2720
            self.hpss_calls.append(
 
2721
                CapturedCall(params, prefix_length))
 
2722
        def capture_connect(transport):
 
2723
            self.hpss_connections.append(transport)
 
2724
        client._SmartClient.hooks.install_named_hook(
 
2725
            'call', capture_hpss_call, None)
 
2726
        _mod_transport.Transport.hooks.install_named_hook(
 
2727
            'post_connect', capture_connect, None)
 
2728
 
 
2729
    def reset_smart_call_log(self):
 
2730
        self.hpss_calls = []
 
2731
        self.hpss_connections = []
 
2732
 
 
2733
 
 
2734
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
2735
    """Derived class that runs a test within a temporary directory.
 
2736
 
 
2737
    This is useful for tests that need to create a branch, etc.
 
2738
 
 
2739
    The directory is created in a slightly complex way: for each
 
2740
    Python invocation, a new temporary top-level directory is created.
 
2741
    All test cases create their own directory within that.  If the
 
2742
    tests complete successfully, the directory is removed.
 
2743
 
 
2744
    :ivar test_base_dir: The path of the top-level directory for this
 
2745
    test, which contains a home directory and a work directory.
 
2746
 
 
2747
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
2748
    which is used as $HOME for this test.
 
2749
 
 
2750
    :ivar test_dir: A directory under test_base_dir used as the current
 
2751
    directory when the test proper is run.
 
2752
    """
 
2753
 
 
2754
    OVERRIDE_PYTHON = 'python'
 
2755
 
 
2756
    def setUp(self):
 
2757
        super(TestCaseInTempDir, self).setUp()
 
2758
        # Remove the protection set in isolated_environ, we have a proper
 
2759
        # access to disk resources now.
 
2760
        self.overrideEnv('BRZ_LOG', None)
 
2761
 
 
2762
    def check_file_contents(self, filename, expect):
 
2763
        self.log("check contents of file %s" % filename)
 
2764
        with open(filename) as f:
 
2765
            contents = f.read()
 
2766
        if contents != expect:
 
2767
            self.log("expected: %r" % expect)
 
2768
            self.log("actually: %r" % contents)
 
2769
            self.fail("contents of %s not as expected" % filename)
 
2770
 
 
2771
    def _getTestDirPrefix(self):
 
2772
        # create a directory within the top level test directory
 
2773
        if sys.platform in ('win32', 'cygwin'):
 
2774
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
 
2775
            # windows is likely to have path-length limits so use a short name
 
2776
            name_prefix = name_prefix[-30:]
 
2777
        else:
 
2778
            name_prefix = re.sub('[/]', '_', self.id())
 
2779
        return name_prefix
 
2780
 
 
2781
    def makeAndChdirToTestDir(self):
 
2782
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2783
 
 
2784
        For TestCaseInTempDir we create a temporary directory based on the test
 
2785
        name and then create two subdirs - test and home under it.
 
2786
        """
 
2787
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
 
2788
            self._getTestDirPrefix())
 
2789
        name = name_prefix
 
2790
        for i in range(100):
 
2791
            if os.path.exists(name):
 
2792
                name = name_prefix + '_' + str(i)
 
2793
            else:
 
2794
                # now create test and home directories within this dir
 
2795
                self.test_base_dir = name
 
2796
                self.addCleanup(self.deleteTestDir)
 
2797
                os.mkdir(self.test_base_dir)
 
2798
                break
 
2799
        self.permit_dir(self.test_base_dir)
 
2800
        # 'sprouting' and 'init' of a branch both walk up the tree to find
 
2801
        # stacking policy to honour; create a bzr dir with an unshared
 
2802
        # repository (but not a branch - our code would be trying to escape
 
2803
        # then!) to stop them, and permit it to be read.
 
2804
        # control = controldir.ControlDir.create(self.test_base_dir)
 
2805
        # control.create_repository()
 
2806
        self.test_home_dir = self.test_base_dir + '/home'
 
2807
        os.mkdir(self.test_home_dir)
 
2808
        self.test_dir = self.test_base_dir + '/work'
 
2809
        os.mkdir(self.test_dir)
 
2810
        os.chdir(self.test_dir)
 
2811
        # put name of test inside
 
2812
        with open(self.test_base_dir + '/name', 'w') as f:
 
2813
            f.write(self.id())
 
2814
 
 
2815
    def deleteTestDir(self):
 
2816
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2817
        _rmtree_temp_dir(self.test_base_dir, test_id=self.id())
 
2818
 
 
2819
    def build_tree(self, shape, line_endings='binary', transport=None):
 
2820
        """Build a test tree according to a pattern.
 
2821
 
 
2822
        shape is a sequence of file specifications.  If the final
 
2823
        character is '/', a directory is created.
 
2824
 
 
2825
        This assumes that all the elements in the tree being built are new.
 
2826
 
 
2827
        This doesn't add anything to a branch.
 
2828
 
 
2829
        :type shape:    list or tuple.
 
2830
        :param line_endings: Either 'binary' or 'native'
 
2831
            in binary mode, exact contents are written in native mode, the
 
2832
            line endings match the default platform endings.
 
2833
        :param transport: A transport to write to, for building trees on VFS's.
 
2834
            If the transport is readonly or None, "." is opened automatically.
 
2835
        :return: None
 
2836
        """
 
2837
        if type(shape) not in (list, tuple):
 
2838
            raise AssertionError("Parameter 'shape' should be "
 
2839
                "a list or a tuple. Got %r instead" % (shape,))
 
2840
        # It's OK to just create them using forward slashes on windows.
 
2841
        if transport is None or transport.is_readonly():
 
2842
            transport = _mod_transport.get_transport_from_path(".")
 
2843
        for name in shape:
 
2844
            self.assertIsInstance(name, (str, text_type))
 
2845
            if name[-1] == '/':
 
2846
                transport.mkdir(urlutils.escape(name[:-1]))
 
2847
            else:
 
2848
                if line_endings == 'binary':
 
2849
                    end = b'\n'
 
2850
                elif line_endings == 'native':
 
2851
                    end = os.linesep.encode('ascii')
 
2852
                else:
 
2853
                    raise errors.BzrError(
 
2854
                        'Invalid line ending request %r' % line_endings)
 
2855
                content = b"contents of %s%s" % (name.encode('utf-8'), end)
 
2856
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2857
 
 
2858
    build_tree_contents = staticmethod(treeshape.build_tree_contents)
 
2859
 
 
2860
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2861
        """Assert whether path or paths are in the WorkingTree"""
 
2862
        if tree is None:
 
2863
            tree = workingtree.WorkingTree.open(root_path)
 
2864
        if not isinstance(path, (str, text_type)):
 
2865
            for p in path:
 
2866
                self.assertInWorkingTree(p, tree=tree)
 
2867
        else:
 
2868
            self.assertTrue(tree.is_versioned(path),
 
2869
                path+' not in working tree.')
 
2870
 
 
2871
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2872
        """Assert whether path or paths are not in the WorkingTree"""
 
2873
        if tree is None:
 
2874
            tree = workingtree.WorkingTree.open(root_path)
 
2875
        if not isinstance(path, (str, text_type)):
 
2876
            for p in path:
 
2877
                self.assertNotInWorkingTree(p, tree=tree)
 
2878
        else:
 
2879
            self.assertFalse(tree.is_versioned(path), path+' in working tree.')
 
2880
 
 
2881
 
 
2882
class TestCaseWithTransport(TestCaseInTempDir):
 
2883
    """A test case that provides get_url and get_readonly_url facilities.
 
2884
 
 
2885
    These back onto two transport servers, one for readonly access and one for
 
2886
    read write access.
 
2887
 
 
2888
    If no explicit class is provided for readonly access, a
 
2889
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2890
    based read write transports.
 
2891
 
 
2892
    If an explicit class is provided for readonly access, that server and the
 
2893
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2894
    """
 
2895
 
 
2896
    def setUp(self):
 
2897
        super(TestCaseWithTransport, self).setUp()
 
2898
        self.__vfs_server = None
 
2899
 
 
2900
    def get_vfs_only_server(self):
 
2901
        """See TestCaseWithMemoryTransport.
 
2902
 
 
2903
        This is useful for some tests with specific servers that need
 
2904
        diagnostics.
 
2905
        """
 
2906
        if self.__vfs_server is None:
 
2907
            self.__vfs_server = self.vfs_transport_factory()
 
2908
            self.start_server(self.__vfs_server)
 
2909
        return self.__vfs_server
 
2910
 
 
2911
    def make_branch_and_tree(self, relpath, format=None):
 
2912
        """Create a branch on the transport and a tree locally.
 
2913
 
 
2914
        If the transport is not a LocalTransport, the Tree can't be created on
 
2915
        the transport.  In that case if the vfs_transport_factory is
 
2916
        LocalURLServer the working tree is created in the local
 
2917
        directory backing the transport, and the returned tree's branch and
 
2918
        repository will also be accessed locally. Otherwise a lightweight
 
2919
        checkout is created and returned.
 
2920
 
 
2921
        We do this because we can't physically create a tree in the local
 
2922
        path, with a branch reference to the transport_factory url, and
 
2923
        a branch + repository in the vfs_transport, unless the vfs_transport
 
2924
        namespace is distinct from the local disk - the two branch objects
 
2925
        would collide. While we could construct a tree with its branch object
 
2926
        pointing at the transport_factory transport in memory, reopening it
 
2927
        would behaving unexpectedly, and has in the past caused testing bugs
 
2928
        when we tried to do it that way.
 
2929
 
 
2930
        :param format: The BzrDirFormat.
 
2931
        :returns: the WorkingTree.
 
2932
        """
 
2933
        # TODO: always use the local disk path for the working tree,
 
2934
        # this obviously requires a format that supports branch references
 
2935
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2936
        # RBC 20060208
 
2937
        format = self.resolve_format(format=format)
 
2938
        if not format.supports_workingtrees:
 
2939
            b = self.make_branch(relpath+'.branch', format=format)
 
2940
            return b.create_checkout(relpath, lightweight=True)
 
2941
        b = self.make_branch(relpath, format=format)
 
2942
        try:
 
2943
            return b.controldir.create_workingtree()
 
2944
        except errors.NotLocalUrl:
 
2945
            # We can only make working trees locally at the moment.  If the
 
2946
            # transport can't support them, then we keep the non-disk-backed
 
2947
            # branch and create a local checkout.
 
2948
            if self.vfs_transport_factory is test_server.LocalURLServer:
 
2949
                # the branch is colocated on disk, we cannot create a checkout.
 
2950
                # hopefully callers will expect this.
 
2951
                local_controldir = controldir.ControlDir.open(
 
2952
                    self.get_vfs_only_url(relpath))
 
2953
                wt = local_controldir.create_workingtree()
 
2954
                if wt.branch._format != b._format:
 
2955
                    wt._branch = b
 
2956
                    # Make sure that assigning to wt._branch fixes wt.branch,
 
2957
                    # in case the implementation details of workingtree objects
 
2958
                    # change.
 
2959
                    self.assertIs(b, wt.branch)
 
2960
                return wt
 
2961
            else:
 
2962
                return b.create_checkout(relpath, lightweight=True)
 
2963
 
 
2964
    def assertIsDirectory(self, relpath, transport):
 
2965
        """Assert that relpath within transport is a directory.
 
2966
 
 
2967
        This may not be possible on all transports; in that case it propagates
 
2968
        a TransportNotPossible.
 
2969
        """
 
2970
        try:
 
2971
            mode = transport.stat(relpath).st_mode
 
2972
        except errors.NoSuchFile:
 
2973
            self.fail("path %s is not a directory; no such file"
 
2974
                      % (relpath))
 
2975
        if not stat.S_ISDIR(mode):
 
2976
            self.fail("path %s is not a directory; has mode %#o"
 
2977
                      % (relpath, mode))
 
2978
 
 
2979
    def assertTreesEqual(self, left, right):
 
2980
        """Check that left and right have the same content and properties."""
 
2981
        # we use a tree delta to check for equality of the content, and we
 
2982
        # manually check for equality of other things such as the parents list.
 
2983
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2984
        differences = left.changes_from(right)
 
2985
        self.assertFalse(differences.has_changed(),
 
2986
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2987
 
 
2988
    def disable_missing_extensions_warning(self):
 
2989
        """Some tests expect a precise stderr content.
 
2990
 
 
2991
        There is no point in forcing them to duplicate the extension related
 
2992
        warning.
 
2993
        """
 
2994
        config.GlobalConfig().set_user_option(
 
2995
            'suppress_warnings', 'missing_extensions')
 
2996
 
 
2997
 
 
2998
class ChrootedTestCase(TestCaseWithTransport):
 
2999
    """A support class that provides readonly urls outside the local namespace.
 
3000
 
 
3001
    This is done by checking if self.transport_server is a MemoryServer. if it
 
3002
    is then we are chrooted already, if it is not then an HttpServer is used
 
3003
    for readonly urls.
 
3004
 
 
3005
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
3006
                       be used without needed to redo it when a different
 
3007
                       subclass is in use ?
 
3008
    """
 
3009
 
 
3010
    def setUp(self):
 
3011
        from breezy.tests import http_server
 
3012
        super(ChrootedTestCase, self).setUp()
 
3013
        if not self.vfs_transport_factory == memory.MemoryServer:
 
3014
            self.transport_readonly_server = http_server.HttpServer
 
3015
 
 
3016
 
 
3017
def condition_id_re(pattern):
 
3018
    """Create a condition filter which performs a re check on a test's id.
 
3019
 
 
3020
    :param pattern: A regular expression string.
 
3021
    :return: A callable that returns True if the re matches.
 
3022
    """
 
3023
    filter_re = re.compile(pattern, 0)
 
3024
    def condition(test):
 
3025
        test_id = test.id()
 
3026
        return filter_re.search(test_id)
 
3027
    return condition
 
3028
 
 
3029
 
 
3030
def condition_isinstance(klass_or_klass_list):
 
3031
    """Create a condition filter which returns isinstance(param, klass).
 
3032
 
 
3033
    :return: A callable which when called with one parameter obj return the
 
3034
        result of isinstance(obj, klass_or_klass_list).
 
3035
    """
 
3036
    def condition(obj):
 
3037
        return isinstance(obj, klass_or_klass_list)
 
3038
    return condition
 
3039
 
 
3040
 
 
3041
def condition_id_in_list(id_list):
 
3042
    """Create a condition filter which verify that test's id in a list.
 
3043
 
 
3044
    :param id_list: A TestIdList object.
 
3045
    :return: A callable that returns True if the test's id appears in the list.
 
3046
    """
 
3047
    def condition(test):
 
3048
        return id_list.includes(test.id())
 
3049
    return condition
 
3050
 
 
3051
 
 
3052
def condition_id_startswith(starts):
 
3053
    """Create a condition filter verifying that test's id starts with a string.
 
3054
 
 
3055
    :param starts: A list of string.
 
3056
    :return: A callable that returns True if the test's id starts with one of
 
3057
        the given strings.
 
3058
    """
 
3059
    def condition(test):
 
3060
        for start in starts:
 
3061
            if test.id().startswith(start):
 
3062
                return True
 
3063
        return False
 
3064
    return condition
 
3065
 
 
3066
 
 
3067
def exclude_tests_by_condition(suite, condition):
 
3068
    """Create a test suite which excludes some tests from suite.
 
3069
 
 
3070
    :param suite: The suite to get tests from.
 
3071
    :param condition: A callable whose result evaluates True when called with a
 
3072
        test case which should be excluded from the result.
 
3073
    :return: A suite which contains the tests found in suite that fail
 
3074
        condition.
 
3075
    """
 
3076
    result = []
 
3077
    for test in iter_suite_tests(suite):
 
3078
        if not condition(test):
 
3079
            result.append(test)
 
3080
    return TestUtil.TestSuite(result)
 
3081
 
 
3082
 
 
3083
def filter_suite_by_condition(suite, condition):
 
3084
    """Create a test suite by filtering another one.
 
3085
 
 
3086
    :param suite: The source suite.
 
3087
    :param condition: A callable whose result evaluates True when called with a
 
3088
        test case which should be included in the result.
 
3089
    :return: A suite which contains the tests found in suite that pass
 
3090
        condition.
 
3091
    """
 
3092
    result = []
 
3093
    for test in iter_suite_tests(suite):
 
3094
        if condition(test):
 
3095
            result.append(test)
 
3096
    return TestUtil.TestSuite(result)
 
3097
 
 
3098
 
 
3099
def filter_suite_by_re(suite, pattern):
 
3100
    """Create a test suite by filtering another one.
 
3101
 
 
3102
    :param suite:           the source suite
 
3103
    :param pattern:         pattern that names must match
 
3104
    :returns: the newly created suite
 
3105
    """
 
3106
    condition = condition_id_re(pattern)
 
3107
    result_suite = filter_suite_by_condition(suite, condition)
 
3108
    return result_suite
 
3109
 
 
3110
 
 
3111
def filter_suite_by_id_list(suite, test_id_list):
 
3112
    """Create a test suite by filtering another one.
 
3113
 
 
3114
    :param suite: The source suite.
 
3115
    :param test_id_list: A list of the test ids to keep as strings.
 
3116
    :returns: the newly created suite
 
3117
    """
 
3118
    condition = condition_id_in_list(test_id_list)
 
3119
    result_suite = filter_suite_by_condition(suite, condition)
 
3120
    return result_suite
 
3121
 
 
3122
 
 
3123
def filter_suite_by_id_startswith(suite, start):
 
3124
    """Create a test suite by filtering another one.
 
3125
 
 
3126
    :param suite: The source suite.
 
3127
    :param start: A list of string the test id must start with one of.
 
3128
    :returns: the newly created suite
 
3129
    """
 
3130
    condition = condition_id_startswith(start)
 
3131
    result_suite = filter_suite_by_condition(suite, condition)
 
3132
    return result_suite
 
3133
 
 
3134
 
 
3135
def exclude_tests_by_re(suite, pattern):
 
3136
    """Create a test suite which excludes some tests from suite.
 
3137
 
 
3138
    :param suite: The suite to get tests from.
 
3139
    :param pattern: A regular expression string. Test ids that match this
 
3140
        pattern will be excluded from the result.
 
3141
    :return: A TestSuite that contains all the tests from suite without the
 
3142
        tests that matched pattern. The order of tests is the same as it was in
 
3143
        suite.
 
3144
    """
 
3145
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
3146
 
 
3147
 
 
3148
def preserve_input(something):
 
3149
    """A helper for performing test suite transformation chains.
 
3150
 
 
3151
    :param something: Anything you want to preserve.
 
3152
    :return: Something.
 
3153
    """
 
3154
    return something
 
3155
 
 
3156
 
 
3157
def randomize_suite(suite):
 
3158
    """Return a new TestSuite with suite's tests in random order.
 
3159
 
 
3160
    The tests in the input suite are flattened into a single suite in order to
 
3161
    accomplish this. Any nested TestSuites are removed to provide global
 
3162
    randomness.
 
3163
    """
 
3164
    tests = list(iter_suite_tests(suite))
 
3165
    random.shuffle(tests)
 
3166
    return TestUtil.TestSuite(tests)
 
3167
 
 
3168
 
 
3169
def split_suite_by_condition(suite, condition):
 
3170
    """Split a test suite into two by a condition.
 
3171
 
 
3172
    :param suite: The suite to split.
 
3173
    :param condition: The condition to match on. Tests that match this
 
3174
        condition are returned in the first test suite, ones that do not match
 
3175
        are in the second suite.
 
3176
    :return: A tuple of two test suites, where the first contains tests from
 
3177
        suite matching the condition, and the second contains the remainder
 
3178
        from suite. The order within each output suite is the same as it was in
 
3179
        suite.
 
3180
    """
 
3181
    matched = []
 
3182
    did_not_match = []
 
3183
    for test in iter_suite_tests(suite):
 
3184
        if condition(test):
 
3185
            matched.append(test)
 
3186
        else:
 
3187
            did_not_match.append(test)
 
3188
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
3189
 
 
3190
 
 
3191
def split_suite_by_re(suite, pattern):
 
3192
    """Split a test suite into two by a regular expression.
 
3193
 
 
3194
    :param suite: The suite to split.
 
3195
    :param pattern: A regular expression string. Test ids that match this
 
3196
        pattern will be in the first test suite returned, and the others in the
 
3197
        second test suite returned.
 
3198
    :return: A tuple of two test suites, where the first contains tests from
 
3199
        suite matching pattern, and the second contains the remainder from
 
3200
        suite. The order within each output suite is the same as it was in
 
3201
        suite.
 
3202
    """
 
3203
    return split_suite_by_condition(suite, condition_id_re(pattern))
 
3204
 
 
3205
 
 
3206
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
3207
              stop_on_failure=False,
 
3208
              transport=None, lsprof_timed=None, bench_history=None,
 
3209
              matching_tests_first=None,
 
3210
              list_only=False,
 
3211
              random_seed=None,
 
3212
              exclude_pattern=None,
 
3213
              strict=False,
 
3214
              runner_class=None,
 
3215
              suite_decorators=None,
 
3216
              stream=None,
 
3217
              result_decorators=None,
 
3218
              ):
 
3219
    """Run a test suite for brz selftest.
 
3220
 
 
3221
    :param runner_class: The class of runner to use. Must support the
 
3222
        constructor arguments passed by run_suite which are more than standard
 
3223
        python uses.
 
3224
    :return: A boolean indicating success.
 
3225
    """
 
3226
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
3227
    if verbose:
 
3228
        verbosity = 2
 
3229
    else:
 
3230
        verbosity = 1
 
3231
    if runner_class is None:
 
3232
        runner_class = TextTestRunner
 
3233
    if stream is None:
 
3234
        stream = sys.stdout
 
3235
    runner = runner_class(stream=stream,
 
3236
                            descriptions=0,
 
3237
                            verbosity=verbosity,
 
3238
                            bench_history=bench_history,
 
3239
                            strict=strict,
 
3240
                            result_decorators=result_decorators,
 
3241
                            )
 
3242
    runner.stop_on_failure=stop_on_failure
 
3243
    if isinstance(suite, unittest.TestSuite):
 
3244
        # Empty out _tests list of passed suite and populate new TestSuite
 
3245
        suite._tests[:], suite = [], TestSuite(suite)
 
3246
    # built in decorator factories:
 
3247
    decorators = [
 
3248
        random_order(random_seed, runner),
 
3249
        exclude_tests(exclude_pattern),
 
3250
        ]
 
3251
    if matching_tests_first:
 
3252
        decorators.append(tests_first(pattern))
 
3253
    else:
 
3254
        decorators.append(filter_tests(pattern))
 
3255
    if suite_decorators:
 
3256
        decorators.extend(suite_decorators)
 
3257
    # tell the result object how many tests will be running: (except if
 
3258
    # --parallel=fork is being used. Robert said he will provide a better
 
3259
    # progress design later -- vila 20090817)
 
3260
    if fork_decorator not in decorators:
 
3261
        decorators.append(CountingDecorator)
 
3262
    for decorator in decorators:
 
3263
        suite = decorator(suite)
 
3264
    if list_only:
 
3265
        # Done after test suite decoration to allow randomisation etc
 
3266
        # to take effect, though that is of marginal benefit.
 
3267
        if verbosity >= 2:
 
3268
            stream.write("Listing tests only ...\n")
 
3269
        if getattr(runner, 'list', None) is not None:
 
3270
            runner.list(suite)
 
3271
        else:
 
3272
            for t in iter_suite_tests(suite):
 
3273
                stream.write("%s\n" % (t.id()))
 
3274
        return True
 
3275
    result = runner.run(suite)
 
3276
    if strict:
 
3277
        return result.wasStrictlySuccessful()
 
3278
    else:
 
3279
        return result.wasSuccessful()
 
3280
 
 
3281
 
 
3282
# A registry where get() returns a suite decorator.
 
3283
parallel_registry = registry.Registry()
 
3284
 
 
3285
 
 
3286
def fork_decorator(suite):
 
3287
    if getattr(os, "fork", None) is None:
 
3288
        raise errors.BzrCommandError("platform does not support fork,"
 
3289
            " try --parallel=subprocess instead.")
 
3290
    concurrency = osutils.local_concurrency()
 
3291
    if concurrency == 1:
 
3292
        return suite
 
3293
    from testtools import ConcurrentTestSuite
 
3294
    return ConcurrentTestSuite(suite, fork_for_tests)
 
3295
parallel_registry.register('fork', fork_decorator)
 
3296
 
 
3297
 
 
3298
def subprocess_decorator(suite):
 
3299
    concurrency = osutils.local_concurrency()
 
3300
    if concurrency == 1:
 
3301
        return suite
 
3302
    from testtools import ConcurrentTestSuite
 
3303
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
 
3304
parallel_registry.register('subprocess', subprocess_decorator)
 
3305
 
 
3306
 
 
3307
def exclude_tests(exclude_pattern):
 
3308
    """Return a test suite decorator that excludes tests."""
 
3309
    if exclude_pattern is None:
 
3310
        return identity_decorator
 
3311
    def decorator(suite):
 
3312
        return ExcludeDecorator(suite, exclude_pattern)
 
3313
    return decorator
 
3314
 
 
3315
 
 
3316
def filter_tests(pattern):
 
3317
    if pattern == '.*':
 
3318
        return identity_decorator
 
3319
    def decorator(suite):
 
3320
        return FilterTestsDecorator(suite, pattern)
 
3321
    return decorator
 
3322
 
 
3323
 
 
3324
def random_order(random_seed, runner):
 
3325
    """Return a test suite decorator factory for randomising tests order.
 
3326
 
 
3327
    :param random_seed: now, a string which casts to an integer, or an integer.
 
3328
    :param runner: A test runner with a stream attribute to report on.
 
3329
    """
 
3330
    if random_seed is None:
 
3331
        return identity_decorator
 
3332
    def decorator(suite):
 
3333
        return RandomDecorator(suite, random_seed, runner.stream)
 
3334
    return decorator
 
3335
 
 
3336
 
 
3337
def tests_first(pattern):
 
3338
    if pattern == '.*':
 
3339
        return identity_decorator
 
3340
    def decorator(suite):
 
3341
        return TestFirstDecorator(suite, pattern)
 
3342
    return decorator
 
3343
 
 
3344
 
 
3345
def identity_decorator(suite):
 
3346
    """Return suite."""
 
3347
    return suite
 
3348
 
 
3349
 
 
3350
class TestDecorator(TestUtil.TestSuite):
 
3351
    """A decorator for TestCase/TestSuite objects.
 
3352
 
 
3353
    Contains rather than flattening suite passed on construction
 
3354
    """
 
3355
 
 
3356
    def __init__(self, suite=None):
 
3357
        super(TestDecorator, self).__init__()
 
3358
        if suite is not None:
 
3359
            self.addTest(suite)
 
3360
 
 
3361
    # Don't need subclass run method with suite emptying
 
3362
    run = unittest.TestSuite.run
 
3363
 
 
3364
 
 
3365
class CountingDecorator(TestDecorator):
 
3366
    """A decorator which calls result.progress(self.countTestCases)."""
 
3367
 
 
3368
    def run(self, result):
 
3369
        progress_method = getattr(result, 'progress', None)
 
3370
        if callable(progress_method):
 
3371
            progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
 
3372
        return super(CountingDecorator, self).run(result)
 
3373
 
 
3374
 
 
3375
class ExcludeDecorator(TestDecorator):
 
3376
    """A decorator which excludes test matching an exclude pattern."""
 
3377
 
 
3378
    def __init__(self, suite, exclude_pattern):
 
3379
        super(ExcludeDecorator, self).__init__(
 
3380
            exclude_tests_by_re(suite, exclude_pattern))
 
3381
 
 
3382
 
 
3383
class FilterTestsDecorator(TestDecorator):
 
3384
    """A decorator which filters tests to those matching a pattern."""
 
3385
 
 
3386
    def __init__(self, suite, pattern):
 
3387
        super(FilterTestsDecorator, self).__init__(
 
3388
            filter_suite_by_re(suite, pattern))
 
3389
 
 
3390
 
 
3391
class RandomDecorator(TestDecorator):
 
3392
    """A decorator which randomises the order of its tests."""
 
3393
 
 
3394
    def __init__(self, suite, random_seed, stream):
 
3395
        random_seed = self.actual_seed(random_seed)
 
3396
        stream.write("Randomizing test order using seed %s\n\n" %
 
3397
            (random_seed,))
 
3398
        # Initialise the random number generator.
 
3399
        random.seed(random_seed)
 
3400
        super(RandomDecorator, self).__init__(randomize_suite(suite))
 
3401
 
 
3402
    @staticmethod
 
3403
    def actual_seed(seed):
 
3404
        if seed == "now":
 
3405
            # We convert the seed to an integer to make it reuseable across
 
3406
            # invocations (because the user can reenter it).
 
3407
            return int(time.time())
 
3408
        else:
 
3409
            # Convert the seed to an integer if we can
 
3410
            try:
 
3411
                return int(seed)
 
3412
            except (TypeError, ValueError):
 
3413
                pass
 
3414
        return seed
 
3415
 
 
3416
 
 
3417
class TestFirstDecorator(TestDecorator):
 
3418
    """A decorator which moves named tests to the front."""
 
3419
 
 
3420
    def __init__(self, suite, pattern):
 
3421
        super(TestFirstDecorator, self).__init__()
 
3422
        self.addTests(split_suite_by_re(suite, pattern))
 
3423
 
 
3424
 
 
3425
def partition_tests(suite, count):
 
3426
    """Partition suite into count lists of tests."""
 
3427
    # This just assigns tests in a round-robin fashion.  On one hand this
 
3428
    # splits up blocks of related tests that might run faster if they shared
 
3429
    # resources, but on the other it avoids assigning blocks of slow tests to
 
3430
    # just one partition.  So the slowest partition shouldn't be much slower
 
3431
    # than the fastest.
 
3432
    partitions = [list() for i in range(count)]
 
3433
    tests = iter_suite_tests(suite)
 
3434
    for partition, test in zip(itertools.cycle(partitions), tests):
 
3435
        partition.append(test)
 
3436
    return partitions
 
3437
 
 
3438
 
 
3439
def workaround_zealous_crypto_random():
 
3440
    """Crypto.Random want to help us being secure, but we don't care here.
 
3441
 
 
3442
    This workaround some test failure related to the sftp server. Once paramiko
 
3443
    stop using the controversial API in Crypto.Random, we may get rid of it.
 
3444
    """
 
3445
    try:
 
3446
        from Crypto.Random import atfork
 
3447
        atfork()
 
3448
    except ImportError:
 
3449
        pass
 
3450
 
 
3451
 
 
3452
def fork_for_tests(suite):
 
3453
    """Take suite and start up one runner per CPU by forking()
 
3454
 
 
3455
    :return: An iterable of TestCase-like objects which can each have
 
3456
        run(result) called on them to feed tests to result.
 
3457
    """
 
3458
    concurrency = osutils.local_concurrency()
 
3459
    result = []
 
3460
    from subunit import ProtocolTestCase
 
3461
    from subunit.test_results import AutoTimingTestResultDecorator
 
3462
    class TestInOtherProcess(ProtocolTestCase):
 
3463
        # Should be in subunit, I think. RBC.
 
3464
        def __init__(self, stream, pid):
 
3465
            ProtocolTestCase.__init__(self, stream)
 
3466
            self.pid = pid
 
3467
 
 
3468
        def run(self, result):
 
3469
            try:
 
3470
                ProtocolTestCase.run(self, result)
 
3471
            finally:
 
3472
                pid, status = os.waitpid(self.pid, 0)
 
3473
            # GZ 2011-10-18: If status is nonzero, should report to the result
 
3474
            #                that something went wrong.
 
3475
 
 
3476
    test_blocks = partition_tests(suite, concurrency)
 
3477
    # Clear the tests from the original suite so it doesn't keep them alive
 
3478
    suite._tests[:] = []
 
3479
    for process_tests in test_blocks:
 
3480
        process_suite = TestUtil.TestSuite(process_tests)
 
3481
        # Also clear each split list so new suite has only reference
 
3482
        process_tests[:] = []
 
3483
        c2pread, c2pwrite = os.pipe()
 
3484
        pid = os.fork()
 
3485
        if pid == 0:
 
3486
            try:
 
3487
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3488
                workaround_zealous_crypto_random()
 
3489
                os.close(c2pread)
 
3490
                # Leave stderr and stdout open so we can see test noise
 
3491
                # Close stdin so that the child goes away if it decides to
 
3492
                # read from stdin (otherwise its a roulette to see what
 
3493
                # child actually gets keystrokes for pdb etc).
 
3494
                sys.stdin.close()
 
3495
                subunit_result = AutoTimingTestResultDecorator(
 
3496
                    SubUnitBzrProtocolClientv1(stream))
 
3497
                process_suite.run(subunit_result)
 
3498
            except:
 
3499
                # Try and report traceback on stream, but exit with error even
 
3500
                # if stream couldn't be created or something else goes wrong.
 
3501
                # The traceback is formatted to a string and written in one go
 
3502
                # to avoid interleaving lines from multiple failing children.
 
3503
                try:
 
3504
                    stream.write(traceback.format_exc())
 
3505
                finally:
 
3506
                    os._exit(1)
 
3507
            os._exit(0)
 
3508
        else:
 
3509
            os.close(c2pwrite)
 
3510
            stream = os.fdopen(c2pread, 'rb', 1)
 
3511
            test = TestInOtherProcess(stream, pid)
 
3512
            result.append(test)
 
3513
    return result
 
3514
 
 
3515
 
 
3516
def reinvoke_for_tests(suite):
 
3517
    """Take suite and start up one runner per CPU using subprocess().
 
3518
 
 
3519
    :return: An iterable of TestCase-like objects which can each have
 
3520
        run(result) called on them to feed tests to result.
 
3521
    """
 
3522
    concurrency = osutils.local_concurrency()
 
3523
    result = []
 
3524
    from subunit import ProtocolTestCase
 
3525
    class TestInSubprocess(ProtocolTestCase):
 
3526
        def __init__(self, process, name):
 
3527
            ProtocolTestCase.__init__(self, process.stdout)
 
3528
            self.process = process
 
3529
            self.process.stdin.close()
 
3530
            self.name = name
 
3531
 
 
3532
        def run(self, result):
 
3533
            try:
 
3534
                ProtocolTestCase.run(self, result)
 
3535
            finally:
 
3536
                self.process.wait()
 
3537
                os.unlink(self.name)
 
3538
            # print "pid %d finished" % finished_process
 
3539
    test_blocks = partition_tests(suite, concurrency)
 
3540
    for process_tests in test_blocks:
 
3541
        # ugly; currently reimplement rather than reuses TestCase methods.
 
3542
        bzr_path = os.path.dirname(os.path.dirname(breezy.__file__))+'/bzr'
 
3543
        if not os.path.isfile(bzr_path):
 
3544
            # We are probably installed. Assume sys.argv is the right file
 
3545
            bzr_path = sys.argv[0]
 
3546
        bzr_path = [bzr_path]
 
3547
        if sys.platform == "win32":
 
3548
            # if we're on windows, we can't execute the bzr script directly
 
3549
            bzr_path = [sys.executable] + bzr_path
 
3550
        fd, test_list_file_name = tempfile.mkstemp()
 
3551
        test_list_file = os.fdopen(fd, 'wb', 1)
 
3552
        for test in process_tests:
 
3553
            test_list_file.write(test.id() + '\n')
 
3554
        test_list_file.close()
 
3555
        try:
 
3556
            argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
 
3557
                '--subunit']
 
3558
            if '--no-plugins' in sys.argv:
 
3559
                argv.append('--no-plugins')
 
3560
            # stderr=subprocess.STDOUT would be ideal, but until we prevent
 
3561
            # noise on stderr it can interrupt the subunit protocol.
 
3562
            process = subprocess.Popen(argv, stdin=subprocess.PIPE,
 
3563
                                      stdout=subprocess.PIPE,
 
3564
                                      stderr=subprocess.PIPE,
 
3565
                                      bufsize=1)
 
3566
            test = TestInSubprocess(process, test_list_file_name)
 
3567
            result.append(test)
 
3568
        except:
 
3569
            os.unlink(test_list_file_name)
 
3570
            raise
 
3571
    return result
 
3572
 
 
3573
 
 
3574
class ProfileResult(testtools.ExtendedToOriginalDecorator):
 
3575
    """Generate profiling data for all activity between start and success.
 
3576
    
 
3577
    The profile data is appended to the test's _benchcalls attribute and can
 
3578
    be accessed by the forwarded-to TestResult.
 
3579
 
 
3580
    While it might be cleaner do accumulate this in stopTest, addSuccess is
 
3581
    where our existing output support for lsprof is, and this class aims to
 
3582
    fit in with that: while it could be moved it's not necessary to accomplish
 
3583
    test profiling, nor would it be dramatically cleaner.
 
3584
    """
 
3585
 
 
3586
    def startTest(self, test):
 
3587
        self.profiler = breezy.lsprof.BzrProfiler()
 
3588
        # Prevent deadlocks in tests that use lsprof: those tests will
 
3589
        # unavoidably fail.
 
3590
        breezy.lsprof.BzrProfiler.profiler_block = 0
 
3591
        self.profiler.start()
 
3592
        testtools.ExtendedToOriginalDecorator.startTest(self, test)
 
3593
 
 
3594
    def addSuccess(self, test):
 
3595
        stats = self.profiler.stop()
 
3596
        try:
 
3597
            calls = test._benchcalls
 
3598
        except AttributeError:
 
3599
            test._benchcalls = []
 
3600
            calls = test._benchcalls
 
3601
        calls.append(((test.id(), "", ""), stats))
 
3602
        testtools.ExtendedToOriginalDecorator.addSuccess(self, test)
 
3603
 
 
3604
    def stopTest(self, test):
 
3605
        testtools.ExtendedToOriginalDecorator.stopTest(self, test)
 
3606
        self.profiler = None
 
3607
 
 
3608
 
 
3609
# Controlled by "brz selftest -E=..." option
 
3610
# Currently supported:
 
3611
#   -Eallow_debug           Will no longer clear debug.debug_flags() so it
 
3612
#                           preserves any flags supplied at the command line.
 
3613
#   -Edisable_lock_checks   Turns errors in mismatched locks into simple prints
 
3614
#                           rather than failing tests. And no longer raise
 
3615
#                           LockContention when fctnl locks are not being used
 
3616
#                           with proper exclusion rules.
 
3617
#   -Ethreads               Will display thread ident at creation/join time to
 
3618
#                           help track thread leaks
 
3619
#   -Euncollected_cases     Display the identity of any test cases that weren't
 
3620
#                           deallocated after being completed.
 
3621
#   -Econfig_stats          Will collect statistics using addDetail
 
3622
selftest_debug_flags = set()
 
3623
 
 
3624
 
 
3625
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
3626
             transport=None,
 
3627
             test_suite_factory=None,
 
3628
             lsprof_timed=None,
 
3629
             bench_history=None,
 
3630
             matching_tests_first=None,
 
3631
             list_only=False,
 
3632
             random_seed=None,
 
3633
             exclude_pattern=None,
 
3634
             strict=False,
 
3635
             load_list=None,
 
3636
             debug_flags=None,
 
3637
             starting_with=None,
 
3638
             runner_class=None,
 
3639
             suite_decorators=None,
 
3640
             stream=None,
 
3641
             lsprof_tests=False,
 
3642
             ):
 
3643
    """Run the whole test suite under the enhanced runner"""
 
3644
    # XXX: Very ugly way to do this...
 
3645
    # Disable warning about old formats because we don't want it to disturb
 
3646
    # any blackbox tests.
 
3647
    from breezy import repository
 
3648
    repository._deprecation_warning_done = True
 
3649
 
 
3650
    global default_transport
 
3651
    if transport is None:
 
3652
        transport = default_transport
 
3653
    old_transport = default_transport
 
3654
    default_transport = transport
 
3655
    global selftest_debug_flags
 
3656
    old_debug_flags = selftest_debug_flags
 
3657
    if debug_flags is not None:
 
3658
        selftest_debug_flags = set(debug_flags)
 
3659
    try:
 
3660
        if load_list is None:
 
3661
            keep_only = None
 
3662
        else:
 
3663
            keep_only = load_test_id_list(load_list)
 
3664
        if starting_with:
 
3665
            starting_with = [test_prefix_alias_registry.resolve_alias(start)
 
3666
                             for start in starting_with]
 
3667
            # Always consider 'unittest' an interesting name so that failed
 
3668
            # suites wrapped as test cases appear in the output.
 
3669
            starting_with.append('unittest')
 
3670
        if test_suite_factory is None:
 
3671
            # Reduce loading time by loading modules based on the starting_with
 
3672
            # patterns.
 
3673
            suite = test_suite(keep_only, starting_with)
 
3674
        else:
 
3675
            suite = test_suite_factory()
 
3676
        if starting_with:
 
3677
            # But always filter as requested.
 
3678
            suite = filter_suite_by_id_startswith(suite, starting_with)
 
3679
        result_decorators = []
 
3680
        if lsprof_tests:
 
3681
            result_decorators.append(ProfileResult)
 
3682
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
3683
                     stop_on_failure=stop_on_failure,
 
3684
                     transport=transport,
 
3685
                     lsprof_timed=lsprof_timed,
 
3686
                     bench_history=bench_history,
 
3687
                     matching_tests_first=matching_tests_first,
 
3688
                     list_only=list_only,
 
3689
                     random_seed=random_seed,
 
3690
                     exclude_pattern=exclude_pattern,
 
3691
                     strict=strict,
 
3692
                     runner_class=runner_class,
 
3693
                     suite_decorators=suite_decorators,
 
3694
                     stream=stream,
 
3695
                     result_decorators=result_decorators,
 
3696
                     )
 
3697
    finally:
 
3698
        default_transport = old_transport
 
3699
        selftest_debug_flags = old_debug_flags
 
3700
 
 
3701
 
 
3702
def load_test_id_list(file_name):
 
3703
    """Load a test id list from a text file.
 
3704
 
 
3705
    The format is one test id by line.  No special care is taken to impose
 
3706
    strict rules, these test ids are used to filter the test suite so a test id
 
3707
    that do not match an existing test will do no harm. This allows user to add
 
3708
    comments, leave blank lines, etc.
 
3709
    """
 
3710
    test_list = []
 
3711
    try:
 
3712
        ftest = open(file_name, 'rt')
 
3713
    except IOError as e:
 
3714
        if e.errno != errno.ENOENT:
 
3715
            raise
 
3716
        else:
 
3717
            raise errors.NoSuchFile(file_name)
 
3718
 
 
3719
    for test_name in ftest.readlines():
 
3720
        test_list.append(test_name.strip())
 
3721
    ftest.close()
 
3722
    return test_list
 
3723
 
 
3724
 
 
3725
def suite_matches_id_list(test_suite, id_list):
 
3726
    """Warns about tests not appearing or appearing more than once.
 
3727
 
 
3728
    :param test_suite: A TestSuite object.
 
3729
    :param test_id_list: The list of test ids that should be found in
 
3730
         test_suite.
 
3731
 
 
3732
    :return: (absents, duplicates) absents is a list containing the test found
 
3733
        in id_list but not in test_suite, duplicates is a list containing the
 
3734
        tests found multiple times in test_suite.
 
3735
 
 
3736
    When using a prefined test id list, it may occurs that some tests do not
 
3737
    exist anymore or that some tests use the same id. This function warns the
 
3738
    tester about potential problems in his workflow (test lists are volatile)
 
3739
    or in the test suite itself (using the same id for several tests does not
 
3740
    help to localize defects).
 
3741
    """
 
3742
    # Build a dict counting id occurrences
 
3743
    tests = dict()
 
3744
    for test in iter_suite_tests(test_suite):
 
3745
        id = test.id()
 
3746
        tests[id] = tests.get(id, 0) + 1
 
3747
 
 
3748
    not_found = []
 
3749
    duplicates = []
 
3750
    for id in id_list:
 
3751
        occurs = tests.get(id, 0)
 
3752
        if not occurs:
 
3753
            not_found.append(id)
 
3754
        elif occurs > 1:
 
3755
            duplicates.append(id)
 
3756
 
 
3757
    return not_found, duplicates
 
3758
 
 
3759
 
 
3760
class TestIdList(object):
 
3761
    """Test id list to filter a test suite.
 
3762
 
 
3763
    Relying on the assumption that test ids are built as:
 
3764
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
3765
    notation, this class offers methods to :
 
3766
    - avoid building a test suite for modules not refered to in the test list,
 
3767
    - keep only the tests listed from the module test suite.
 
3768
    """
 
3769
 
 
3770
    def __init__(self, test_id_list):
 
3771
        # When a test suite needs to be filtered against us we compare test ids
 
3772
        # for equality, so a simple dict offers a quick and simple solution.
 
3773
        self.tests = dict().fromkeys(test_id_list, True)
 
3774
 
 
3775
        # While unittest.TestCase have ids like:
 
3776
        # <module>.<class>.<method>[(<param+)],
 
3777
        # doctest.DocTestCase can have ids like:
 
3778
        # <module>
 
3779
        # <module>.<class>
 
3780
        # <module>.<function>
 
3781
        # <module>.<class>.<method>
 
3782
 
 
3783
        # Since we can't predict a test class from its name only, we settle on
 
3784
        # a simple constraint: a test id always begins with its module name.
 
3785
 
 
3786
        modules = {}
 
3787
        for test_id in test_id_list:
 
3788
            parts = test_id.split('.')
 
3789
            mod_name = parts.pop(0)
 
3790
            modules[mod_name] = True
 
3791
            for part in parts:
 
3792
                mod_name += '.' + part
 
3793
                modules[mod_name] = True
 
3794
        self.modules = modules
 
3795
 
 
3796
    def refers_to(self, module_name):
 
3797
        """Is there tests for the module or one of its sub modules."""
 
3798
        return module_name in self.modules
 
3799
 
 
3800
    def includes(self, test_id):
 
3801
        return test_id in self.tests
 
3802
 
 
3803
 
 
3804
class TestPrefixAliasRegistry(registry.Registry):
 
3805
    """A registry for test prefix aliases.
 
3806
 
 
3807
    This helps implement shorcuts for the --starting-with selftest
 
3808
    option. Overriding existing prefixes is not allowed but not fatal (a
 
3809
    warning will be emitted).
 
3810
    """
 
3811
 
 
3812
    def register(self, key, obj, help=None, info=None,
 
3813
                 override_existing=False):
 
3814
        """See Registry.register.
 
3815
 
 
3816
        Trying to override an existing alias causes a warning to be emitted,
 
3817
        not a fatal execption.
 
3818
        """
 
3819
        try:
 
3820
            super(TestPrefixAliasRegistry, self).register(
 
3821
                key, obj, help=help, info=info, override_existing=False)
 
3822
        except KeyError:
 
3823
            actual = self.get(key)
 
3824
            trace.note(
 
3825
                'Test prefix alias %s is already used for %s, ignoring %s'
 
3826
                % (key, actual, obj))
 
3827
 
 
3828
    def resolve_alias(self, id_start):
 
3829
        """Replace the alias by the prefix in the given string.
 
3830
 
 
3831
        Using an unknown prefix is an error to help catching typos.
 
3832
        """
 
3833
        parts = id_start.split('.')
 
3834
        try:
 
3835
            parts[0] = self.get(parts[0])
 
3836
        except KeyError:
 
3837
            raise errors.BzrCommandError(
 
3838
                '%s is not a known test prefix alias' % parts[0])
 
3839
        return '.'.join(parts)
 
3840
 
 
3841
 
 
3842
test_prefix_alias_registry = TestPrefixAliasRegistry()
 
3843
"""Registry of test prefix aliases."""
 
3844
 
 
3845
 
 
3846
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
 
3847
# appear prefixed ('breezy.' is "replaced" by 'breezy.').
 
3848
test_prefix_alias_registry.register('breezy', 'breezy')
 
3849
 
 
3850
# Obvious highest levels prefixes, feel free to add your own via a plugin
 
3851
test_prefix_alias_registry.register('bd', 'breezy.doc')
 
3852
test_prefix_alias_registry.register('bu', 'breezy.utils')
 
3853
test_prefix_alias_registry.register('bt', 'breezy.tests')
 
3854
test_prefix_alias_registry.register('bb', 'breezy.tests.blackbox')
 
3855
test_prefix_alias_registry.register('bp', 'breezy.plugins')
 
3856
 
 
3857
 
 
3858
def _test_suite_testmod_names():
 
3859
    """Return the standard list of test module names to test."""
 
3860
    return [
 
3861
        'breezy.doc',
 
3862
        'breezy.tests.blackbox',
 
3863
        'breezy.tests.commands',
 
3864
        'breezy.tests.per_branch',
 
3865
        'breezy.tests.per_bzrdir',
 
3866
        'breezy.tests.per_controldir',
 
3867
        'breezy.tests.per_controldir_colo',
 
3868
        'breezy.tests.per_foreign_vcs',
 
3869
        'breezy.tests.per_interrepository',
 
3870
        'breezy.tests.per_intertree',
 
3871
        'breezy.tests.per_inventory',
 
3872
        'breezy.tests.per_interbranch',
 
3873
        'breezy.tests.per_lock',
 
3874
        'breezy.tests.per_merger',
 
3875
        'breezy.tests.per_transport',
 
3876
        'breezy.tests.per_tree',
 
3877
        'breezy.tests.per_pack_repository',
 
3878
        'breezy.tests.per_repository',
 
3879
        'breezy.tests.per_repository_chk',
 
3880
        'breezy.tests.per_repository_reference',
 
3881
        'breezy.tests.per_repository_vf',
 
3882
        'breezy.tests.per_uifactory',
 
3883
        'breezy.tests.per_versionedfile',
 
3884
        'breezy.tests.per_workingtree',
 
3885
        'breezy.tests.test__annotator',
 
3886
        'breezy.tests.test__bencode',
 
3887
        'breezy.tests.test__btree_serializer',
 
3888
        'breezy.tests.test__chk_map',
 
3889
        'breezy.tests.test__dirstate_helpers',
 
3890
        'breezy.tests.test__groupcompress',
 
3891
        'breezy.tests.test__known_graph',
 
3892
        'breezy.tests.test__rio',
 
3893
        'breezy.tests.test__simple_set',
 
3894
        'breezy.tests.test__static_tuple',
 
3895
        'breezy.tests.test__walkdirs_win32',
 
3896
        'breezy.tests.test_ancestry',
 
3897
        'breezy.tests.test_annotate',
 
3898
        'breezy.tests.test_atomicfile',
 
3899
        'breezy.tests.test_bad_files',
 
3900
        'breezy.tests.test_bisect',
 
3901
        'breezy.tests.test_bisect_multi',
 
3902
        'breezy.tests.test_branch',
 
3903
        'breezy.tests.test_branchbuilder',
 
3904
        'breezy.tests.test_btree_index',
 
3905
        'breezy.tests.test_bugtracker',
 
3906
        'breezy.tests.test_bundle',
 
3907
        'breezy.tests.test_bzrdir',
 
3908
        'breezy.tests.test__chunks_to_lines',
 
3909
        'breezy.tests.test_cache_utf8',
 
3910
        'breezy.tests.test_chk_map',
 
3911
        'breezy.tests.test_chk_serializer',
 
3912
        'breezy.tests.test_chunk_writer',
 
3913
        'breezy.tests.test_clean_tree',
 
3914
        'breezy.tests.test_cleanup',
 
3915
        'breezy.tests.test_cmdline',
 
3916
        'breezy.tests.test_commands',
 
3917
        'breezy.tests.test_commit',
 
3918
        'breezy.tests.test_commit_merge',
 
3919
        'breezy.tests.test_config',
 
3920
        'breezy.tests.test_conflicts',
 
3921
        'breezy.tests.test_controldir',
 
3922
        'breezy.tests.test_counted_lock',
 
3923
        'breezy.tests.test_crash',
 
3924
        'breezy.tests.test_decorators',
 
3925
        'breezy.tests.test_delta',
 
3926
        'breezy.tests.test_debug',
 
3927
        'breezy.tests.test_diff',
 
3928
        'breezy.tests.test_directory_service',
 
3929
        'breezy.tests.test_dirstate',
 
3930
        'breezy.tests.test_email_message',
 
3931
        'breezy.tests.test_eol_filters',
 
3932
        'breezy.tests.test_errors',
 
3933
        'breezy.tests.test_estimate_compressed_size',
 
3934
        'breezy.tests.test_export',
 
3935
        'breezy.tests.test_export_pot',
 
3936
        'breezy.tests.test_extract',
 
3937
        'breezy.tests.test_features',
 
3938
        'breezy.tests.test_fetch',
 
3939
        'breezy.tests.test_fetch_ghosts',
 
3940
        'breezy.tests.test_fixtures',
 
3941
        'breezy.tests.test_fifo_cache',
 
3942
        'breezy.tests.test_filters',
 
3943
        'breezy.tests.test_filter_tree',
 
3944
        'breezy.tests.test_ftp_transport',
 
3945
        'breezy.tests.test_foreign',
 
3946
        'breezy.tests.test_generate_docs',
 
3947
        'breezy.tests.test_generate_ids',
 
3948
        'breezy.tests.test_globbing',
 
3949
        'breezy.tests.test_gpg',
 
3950
        'breezy.tests.test_graph',
 
3951
        'breezy.tests.test_groupcompress',
 
3952
        'breezy.tests.test_hashcache',
 
3953
        'breezy.tests.test_help',
 
3954
        'breezy.tests.test_hooks',
 
3955
        'breezy.tests.test_http',
 
3956
        'breezy.tests.test_http_response',
 
3957
        'breezy.tests.test_https_ca_bundle',
 
3958
        'breezy.tests.test_https_urllib',
 
3959
        'breezy.tests.test_i18n',
 
3960
        'breezy.tests.test_identitymap',
 
3961
        'breezy.tests.test_ignores',
 
3962
        'breezy.tests.test_index',
 
3963
        'breezy.tests.test_import_tariff',
 
3964
        'breezy.tests.test_info',
 
3965
        'breezy.tests.test_inv',
 
3966
        'breezy.tests.test_inventory_delta',
 
3967
        'breezy.tests.test_knit',
 
3968
        'breezy.tests.test_lazy_import',
 
3969
        'breezy.tests.test_lazy_regex',
 
3970
        'breezy.tests.test_library_state',
 
3971
        'breezy.tests.test_lock',
 
3972
        'breezy.tests.test_lockable_files',
 
3973
        'breezy.tests.test_lockdir',
 
3974
        'breezy.tests.test_log',
 
3975
        'breezy.tests.test_lru_cache',
 
3976
        'breezy.tests.test_lsprof',
 
3977
        'breezy.tests.test_mail_client',
 
3978
        'breezy.tests.test_matchers',
 
3979
        'breezy.tests.test_memorytree',
 
3980
        'breezy.tests.test_merge',
 
3981
        'breezy.tests.test_merge3',
 
3982
        'breezy.tests.test_merge_core',
 
3983
        'breezy.tests.test_merge_directive',
 
3984
        'breezy.tests.test_mergetools',
 
3985
        'breezy.tests.test_missing',
 
3986
        'breezy.tests.test_msgeditor',
 
3987
        'breezy.tests.test_multiparent',
 
3988
        'breezy.tests.test_mutabletree',
 
3989
        'breezy.tests.test_nonascii',
 
3990
        'breezy.tests.test_options',
 
3991
        'breezy.tests.test_osutils',
 
3992
        'breezy.tests.test_osutils_encodings',
 
3993
        'breezy.tests.test_pack',
 
3994
        'breezy.tests.test_patch',
 
3995
        'breezy.tests.test_patches',
 
3996
        'breezy.tests.test_permissions',
 
3997
        'breezy.tests.test_plugins',
 
3998
        'breezy.tests.test_progress',
 
3999
        'breezy.tests.test_pyutils',
 
4000
        'breezy.tests.test_read_bundle',
 
4001
        'breezy.tests.test_reconcile',
 
4002
        'breezy.tests.test_reconfigure',
 
4003
        'breezy.tests.test_registry',
 
4004
        'breezy.tests.test_remote',
 
4005
        'breezy.tests.test_rename_map',
 
4006
        'breezy.tests.test_repository',
 
4007
        'breezy.tests.test_revert',
 
4008
        'breezy.tests.test_revision',
 
4009
        'breezy.tests.test_revisionspec',
 
4010
        'breezy.tests.test_revisiontree',
 
4011
        'breezy.tests.test_rio',
 
4012
        'breezy.tests.test_rules',
 
4013
        'breezy.tests.test_url_policy_open',
 
4014
        'breezy.tests.test_sampler',
 
4015
        'breezy.tests.test_scenarios',
 
4016
        'breezy.tests.test_script',
 
4017
        'breezy.tests.test_selftest',
 
4018
        'breezy.tests.test_serializer',
 
4019
        'breezy.tests.test_setup',
 
4020
        'breezy.tests.test_sftp_transport',
 
4021
        'breezy.tests.test_shelf',
 
4022
        'breezy.tests.test_shelf_ui',
 
4023
        'breezy.tests.test_smart',
 
4024
        'breezy.tests.test_smart_add',
 
4025
        'breezy.tests.test_smart_request',
 
4026
        'breezy.tests.test_smart_signals',
 
4027
        'breezy.tests.test_smart_transport',
 
4028
        'breezy.tests.test_smtp_connection',
 
4029
        'breezy.tests.test_source',
 
4030
        'breezy.tests.test_ssh_transport',
 
4031
        'breezy.tests.test_status',
 
4032
        'breezy.tests.test_strace',
 
4033
        'breezy.tests.test_subsume',
 
4034
        'breezy.tests.test_switch',
 
4035
        'breezy.tests.test_symbol_versioning',
 
4036
        'breezy.tests.test_tag',
 
4037
        'breezy.tests.test_test_server',
 
4038
        'breezy.tests.test_testament',
 
4039
        'breezy.tests.test_textfile',
 
4040
        'breezy.tests.test_textmerge',
 
4041
        'breezy.tests.test_cethread',
 
4042
        'breezy.tests.test_timestamp',
 
4043
        'breezy.tests.test_trace',
 
4044
        'breezy.tests.test_transactions',
 
4045
        'breezy.tests.test_transform',
 
4046
        'breezy.tests.test_transport',
 
4047
        'breezy.tests.test_transport_log',
 
4048
        'breezy.tests.test_tree',
 
4049
        'breezy.tests.test_treebuilder',
 
4050
        'breezy.tests.test_treeshape',
 
4051
        'breezy.tests.test_tsort',
 
4052
        'breezy.tests.test_tuned_gzip',
 
4053
        'breezy.tests.test_ui',
 
4054
        'breezy.tests.test_uncommit',
 
4055
        'breezy.tests.test_upgrade',
 
4056
        'breezy.tests.test_upgrade_stacked',
 
4057
        'breezy.tests.test_upstream_import',
 
4058
        'breezy.tests.test_urlutils',
 
4059
        'breezy.tests.test_utextwrap',
 
4060
        'breezy.tests.test_version',
 
4061
        'breezy.tests.test_version_info',
 
4062
        'breezy.tests.test_versionedfile',
 
4063
        'breezy.tests.test_vf_search',
 
4064
        'breezy.tests.test_views',
 
4065
        'breezy.tests.test_weave',
 
4066
        'breezy.tests.test_whitebox',
 
4067
        'breezy.tests.test_win32utils',
 
4068
        'breezy.tests.test_workingtree',
 
4069
        'breezy.tests.test_workingtree_4',
 
4070
        'breezy.tests.test_wsgi',
 
4071
        'breezy.tests.test_xml',
 
4072
        ]
 
4073
 
 
4074
 
 
4075
def _test_suite_modules_to_doctest():
 
4076
    """Return the list of modules to doctest."""
 
4077
    if __doc__ is None:
 
4078
        # GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
 
4079
        return []
 
4080
    return [
 
4081
        'breezy',
 
4082
        'breezy.branchbuilder',
 
4083
        'breezy.bzr.inventory',
 
4084
        'breezy.decorators',
 
4085
        'breezy.iterablefile',
 
4086
        'breezy.lockdir',
 
4087
        'breezy.merge3',
 
4088
        'breezy.option',
 
4089
        'breezy.pyutils',
 
4090
        'breezy.symbol_versioning',
 
4091
        'breezy.tests',
 
4092
        'breezy.tests.fixtures',
 
4093
        'breezy.timestamp',
 
4094
        'breezy.transport.http',
 
4095
        'breezy.version_info_formats.format_custom',
 
4096
        ]
 
4097
 
 
4098
 
 
4099
def test_suite(keep_only=None, starting_with=None):
 
4100
    """Build and return TestSuite for the whole of breezy.
 
4101
 
 
4102
    :param keep_only: A list of test ids limiting the suite returned.
 
4103
 
 
4104
    :param starting_with: An id limiting the suite returned to the tests
 
4105
         starting with it.
 
4106
 
 
4107
    This function can be replaced if you need to change the default test
 
4108
    suite on a global basis, but it is not encouraged.
 
4109
    """
 
4110
 
 
4111
    loader = TestUtil.TestLoader()
 
4112
 
 
4113
    if keep_only is not None:
 
4114
        id_filter = TestIdList(keep_only)
 
4115
    if starting_with:
 
4116
        # We take precedence over keep_only because *at loading time* using
 
4117
        # both options means we will load less tests for the same final result.
 
4118
        def interesting_module(name):
 
4119
            for start in starting_with:
 
4120
                if (
 
4121
                    # Either the module name starts with the specified string
 
4122
                    name.startswith(start)
 
4123
                    # or it may contain tests starting with the specified string
 
4124
                    or start.startswith(name)
 
4125
                    ):
 
4126
                    return True
 
4127
            return False
 
4128
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
 
4129
 
 
4130
    elif keep_only is not None:
 
4131
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
4132
        def interesting_module(name):
 
4133
            return id_filter.refers_to(name)
 
4134
 
 
4135
    else:
 
4136
        loader = TestUtil.TestLoader()
 
4137
        def interesting_module(name):
 
4138
            # No filtering, all modules are interesting
 
4139
            return True
 
4140
 
 
4141
    suite = loader.suiteClass()
 
4142
 
 
4143
    # modules building their suite with loadTestsFromModuleNames
 
4144
    suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
 
4145
 
 
4146
    for mod in _test_suite_modules_to_doctest():
 
4147
        if not interesting_module(mod):
 
4148
            # No tests to keep here, move along
 
4149
            continue
 
4150
        try:
 
4151
            # note that this really does mean "report only" -- doctest
 
4152
            # still runs the rest of the examples
 
4153
            doc_suite = IsolatedDocTestSuite(
 
4154
                mod, optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
 
4155
        except ValueError as e:
 
4156
            print('**failed to get doctest for: %s\n%s' % (mod, e))
 
4157
            raise
 
4158
        if len(doc_suite._tests) == 0:
 
4159
            raise errors.BzrError("no doctests found in %s" % (mod,))
 
4160
        suite.addTest(doc_suite)
 
4161
 
 
4162
    default_encoding = sys.getdefaultencoding()
 
4163
    for name, plugin in _mod_plugin.plugins().items():
 
4164
        if not interesting_module(plugin.module.__name__):
 
4165
            continue
 
4166
        plugin_suite = plugin.test_suite()
 
4167
        # We used to catch ImportError here and turn it into just a warning,
 
4168
        # but really if you don't have --no-plugins this should be a failure.
 
4169
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
4170
        if plugin_suite is None:
 
4171
            plugin_suite = plugin.load_plugin_tests(loader)
 
4172
        if plugin_suite is not None:
 
4173
            suite.addTest(plugin_suite)
 
4174
        if default_encoding != sys.getdefaultencoding():
 
4175
            trace.warning(
 
4176
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
4177
                sys.getdefaultencoding())
 
4178
            reload(sys)
 
4179
            sys.setdefaultencoding(default_encoding)
 
4180
 
 
4181
    if keep_only is not None:
 
4182
        # Now that the referred modules have loaded their tests, keep only the
 
4183
        # requested ones.
 
4184
        suite = filter_suite_by_id_list(suite, id_filter)
 
4185
        # Do some sanity checks on the id_list filtering
 
4186
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
4187
        if starting_with:
 
4188
            # The tester has used both keep_only and starting_with, so he is
 
4189
            # already aware that some tests are excluded from the list, there
 
4190
            # is no need to tell him which.
 
4191
            pass
 
4192
        else:
 
4193
            # Some tests mentioned in the list are not in the test suite. The
 
4194
            # list may be out of date, report to the tester.
 
4195
            for id in not_found:
 
4196
                trace.warning('"%s" not found in the test suite', id)
 
4197
        for id in duplicates:
 
4198
            trace.warning('"%s" is used as an id by several tests', id)
 
4199
 
 
4200
    return suite
 
4201
 
 
4202
 
 
4203
def multiply_scenarios(*scenarios):
 
4204
    """Multiply two or more iterables of scenarios.
 
4205
 
 
4206
    It is safe to pass scenario generators or iterators.
 
4207
 
 
4208
    :returns: A list of compound scenarios: the cross-product of all
 
4209
        scenarios, with the names concatenated and the parameters
 
4210
        merged together.
 
4211
    """
 
4212
    return functools.reduce(_multiply_two_scenarios, map(list, scenarios))
 
4213
 
 
4214
 
 
4215
def _multiply_two_scenarios(scenarios_left, scenarios_right):
 
4216
    """Multiply two sets of scenarios.
 
4217
 
 
4218
    :returns: the cartesian product of the two sets of scenarios, that is
 
4219
        a scenario for every possible combination of a left scenario and a
 
4220
        right scenario.
 
4221
    """
 
4222
    return [
 
4223
        ('%s,%s' % (left_name, right_name),
 
4224
         dict(left_dict, **right_dict))
 
4225
        for left_name, left_dict in scenarios_left
 
4226
        for right_name, right_dict in scenarios_right]
 
4227
 
 
4228
 
 
4229
def multiply_tests(tests, scenarios, result):
 
4230
    """Multiply tests_list by scenarios into result.
 
4231
 
 
4232
    This is the core workhorse for test parameterisation.
 
4233
 
 
4234
    Typically the load_tests() method for a per-implementation test suite will
 
4235
    call multiply_tests and return the result.
 
4236
 
 
4237
    :param tests: The tests to parameterise.
 
4238
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
 
4239
        scenario_param_dict).
 
4240
    :param result: A TestSuite to add created tests to.
 
4241
 
 
4242
    This returns the passed in result TestSuite with the cross product of all
 
4243
    the tests repeated once for each scenario.  Each test is adapted by adding
 
4244
    the scenario name at the end of its id(), and updating the test object's
 
4245
    __dict__ with the scenario_param_dict.
 
4246
 
 
4247
    >>> import breezy.tests.test_sampler
 
4248
    >>> r = multiply_tests(
 
4249
    ...     breezy.tests.test_sampler.DemoTest('test_nothing'),
 
4250
    ...     [('one', dict(param=1)),
 
4251
    ...      ('two', dict(param=2))],
 
4252
    ...     TestUtil.TestSuite())
 
4253
    >>> tests = list(iter_suite_tests(r))
 
4254
    >>> len(tests)
 
4255
    2
 
4256
    >>> tests[0].id()
 
4257
    'breezy.tests.test_sampler.DemoTest.test_nothing(one)'
 
4258
    >>> tests[0].param
 
4259
    1
 
4260
    >>> tests[1].param
 
4261
    2
 
4262
    """
 
4263
    for test in iter_suite_tests(tests):
 
4264
        apply_scenarios(test, scenarios, result)
 
4265
    return result
 
4266
 
 
4267
 
 
4268
def apply_scenarios(test, scenarios, result):
 
4269
    """Apply the scenarios in scenarios to test and add to result.
 
4270
 
 
4271
    :param test: The test to apply scenarios to.
 
4272
    :param scenarios: An iterable of scenarios to apply to test.
 
4273
    :return: result
 
4274
    :seealso: apply_scenario
 
4275
    """
 
4276
    for scenario in scenarios:
 
4277
        result.addTest(apply_scenario(test, scenario))
 
4278
    return result
 
4279
 
 
4280
 
 
4281
def apply_scenario(test, scenario):
 
4282
    """Copy test and apply scenario to it.
 
4283
 
 
4284
    :param test: A test to adapt.
 
4285
    :param scenario: A tuple describing the scenario.
 
4286
        The first element of the tuple is the new test id.
 
4287
        The second element is a dict containing attributes to set on the
 
4288
        test.
 
4289
    :return: The adapted test.
 
4290
    """
 
4291
    new_id = "%s(%s)" % (test.id(), scenario[0])
 
4292
    new_test = clone_test(test, new_id)
 
4293
    for name, value in scenario[1].items():
 
4294
        setattr(new_test, name, value)
 
4295
    return new_test
 
4296
 
 
4297
 
 
4298
def clone_test(test, new_id):
 
4299
    """Clone a test giving it a new id.
 
4300
 
 
4301
    :param test: The test to clone.
 
4302
    :param new_id: The id to assign to it.
 
4303
    :return: The new test.
 
4304
    """
 
4305
    new_test = copy.copy(test)
 
4306
    new_test.id = lambda: new_id
 
4307
    # XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
 
4308
    # causes cloned tests to share the 'details' dict.  This makes it hard to
 
4309
    # read the test output for parameterized tests, because tracebacks will be
 
4310
    # associated with irrelevant tests.
 
4311
    try:
 
4312
        details = new_test._TestCase__details
 
4313
    except AttributeError:
 
4314
        # must be a different version of testtools than expected.  Do nothing.
 
4315
        pass
 
4316
    else:
 
4317
        # Reset the '__details' dict.
 
4318
        new_test._TestCase__details = {}
 
4319
    return new_test
 
4320
 
 
4321
 
 
4322
def permute_tests_for_extension(standard_tests, loader, py_module_name,
 
4323
                                ext_module_name):
 
4324
    """Helper for permutating tests against an extension module.
 
4325
 
 
4326
    This is meant to be used inside a modules 'load_tests()' function. It will
 
4327
    create 2 scenarios, and cause all tests in the 'standard_tests' to be run
 
4328
    against both implementations. Setting 'test.module' to the appropriate
 
4329
    module. See breezy.tests.test__chk_map.load_tests as an example.
 
4330
 
 
4331
    :param standard_tests: A test suite to permute
 
4332
    :param loader: A TestLoader
 
4333
    :param py_module_name: The python path to a python module that can always
 
4334
        be loaded, and will be considered the 'python' implementation. (eg
 
4335
        'breezy._chk_map_py')
 
4336
    :param ext_module_name: The python path to an extension module. If the
 
4337
        module cannot be loaded, a single test will be added, which notes that
 
4338
        the module is not available. If it can be loaded, all standard_tests
 
4339
        will be run against that module.
 
4340
    :return: (suite, feature) suite is a test-suite that has all the permuted
 
4341
        tests. feature is the Feature object that can be used to determine if
 
4342
        the module is available.
 
4343
    """
 
4344
 
 
4345
    from .features import ModuleAvailableFeature
 
4346
    py_module = pyutils.get_named_object(py_module_name)
 
4347
    scenarios = [
 
4348
        ('python', {'module': py_module}),
 
4349
    ]
 
4350
    suite = loader.suiteClass()
 
4351
    feature = ModuleAvailableFeature(ext_module_name)
 
4352
    if feature.available():
 
4353
        scenarios.append(('C', {'module': feature.module}))
 
4354
    else:
 
4355
        # the compiled module isn't available, so we add a failing test
 
4356
        class FailWithoutFeature(TestCase):
 
4357
            def test_fail(self):
 
4358
                self.requireFeature(feature)
 
4359
        suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
 
4360
    result = multiply_tests(standard_tests, scenarios, suite)
 
4361
    return result, feature
 
4362
 
 
4363
 
 
4364
def _rmtree_temp_dir(dirname, test_id=None):
 
4365
    # If LANG=C we probably have created some bogus paths
 
4366
    # which rmtree(unicode) will fail to delete
 
4367
    # so make sure we are using rmtree(str) to delete everything
 
4368
    # except on win32, where rmtree(str) will fail
 
4369
    # since it doesn't have the property of byte-stream paths
 
4370
    # (they are either ascii or mbcs)
 
4371
    if sys.platform == 'win32' and isinstance(dirname, bytes):
 
4372
        # make sure we are using the unicode win32 api
 
4373
        dirname = dirname.decode('mbcs')
 
4374
    else:
 
4375
        dirname = dirname.encode(sys.getfilesystemencoding())
 
4376
    try:
 
4377
        osutils.rmtree(dirname)
 
4378
    except OSError as e:
 
4379
        # We don't want to fail here because some useful display will be lost
 
4380
        # otherwise. Polluting the tmp dir is bad, but not giving all the
 
4381
        # possible info to the test runner is even worse.
 
4382
        if test_id != None:
 
4383
            ui.ui_factory.clear_term()
 
4384
            sys.stderr.write('\nWhile running: %s\n' % (test_id,))
 
4385
        # Ugly, but the last thing we want here is fail, so bear with it.
 
4386
        printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
 
4387
                                    ).encode('ascii', 'replace')
 
4388
        sys.stderr.write('Unable to remove testing dir %s\n%s'
 
4389
                         % (os.path.basename(dirname), printable_e))
 
4390
 
 
4391
 
 
4392
def probe_unicode_in_user_encoding():
 
4393
    """Try to encode several unicode strings to use in unicode-aware tests.
 
4394
    Return first successfull match.
 
4395
 
 
4396
    :return:  (unicode value, encoded plain string value) or (None, None)
 
4397
    """
 
4398
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
4399
    for uni_val in possible_vals:
 
4400
        try:
 
4401
            str_val = uni_val.encode(osutils.get_user_encoding())
 
4402
        except UnicodeEncodeError:
 
4403
            # Try a different character
 
4404
            pass
 
4405
        else:
 
4406
            return uni_val, str_val
 
4407
    return None, None
 
4408
 
 
4409
 
 
4410
def probe_bad_non_ascii(encoding):
 
4411
    """Try to find [bad] character with code [128..255]
 
4412
    that cannot be decoded to unicode in some encoding.
 
4413
    Return None if all non-ascii characters is valid
 
4414
    for given encoding.
 
4415
    """
 
4416
    for i in range(128, 256):
 
4417
        char = chr(i)
 
4418
        try:
 
4419
            char.decode(encoding)
 
4420
        except UnicodeDecodeError:
 
4421
            return char
 
4422
    return None
 
4423
 
 
4424
 
 
4425
# Only define SubUnitBzrRunner if subunit is available.
 
4426
try:
 
4427
    from subunit import TestProtocolClient
 
4428
    from subunit.test_results import AutoTimingTestResultDecorator
 
4429
 
 
4430
    class SubUnitBzrProtocolClientv1(TestProtocolClient):
 
4431
 
 
4432
        def stopTest(self, test):
 
4433
            super(SubUnitBzrProtocolClientv1, self).stopTest(test)
 
4434
            _clear__type_equality_funcs(test)
 
4435
 
 
4436
        def addSuccess(self, test, details=None):
 
4437
            # The subunit client always includes the details in the subunit
 
4438
            # stream, but we don't want to include it in ours.
 
4439
            if details is not None and 'log' in details:
 
4440
                del details['log']
 
4441
            return super(SubUnitBzrProtocolClientv1, self).addSuccess(
 
4442
                test, details)
 
4443
 
 
4444
    class SubUnitBzrRunnerv1(TextTestRunner):
 
4445
 
 
4446
        def run(self, test):
 
4447
            result = AutoTimingTestResultDecorator(
 
4448
                SubUnitBzrProtocolClientv1(self.stream))
 
4449
            test.run(result)
 
4450
            return result
 
4451
except ImportError:
 
4452
    pass
 
4453
 
 
4454
 
 
4455
try:
 
4456
    from subunit.run import SubunitTestRunner
 
4457
 
 
4458
    class SubUnitBzrRunnerv2(TextTestRunner, SubunitTestRunner):
 
4459
 
 
4460
        def __init__(self, stream=sys.stderr, descriptions=0, verbosity=1,
 
4461
                     bench_history=None, strict=False, result_decorators=None):
 
4462
            TextTestRunner.__init__(
 
4463
                    self, stream=stream,
 
4464
                    descriptions=descriptions, verbosity=verbosity,
 
4465
                    bench_history=bench_history, strict=strict,
 
4466
                    result_decorators=result_decorators)
 
4467
            SubunitTestRunner.__init__(self, verbosity=verbosity,
 
4468
                                       stream=stream)
 
4469
 
 
4470
        run = SubunitTestRunner.run
 
4471
except ImportError:
 
4472
    pass