1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
 
3
# This program is free software; you can redistribute it and/or modify
 
 
4
# it under the terms of the GNU General Public License as published by
 
 
5
# the Free Software Foundation; either version 2 of the License, or
 
 
6
# (at your option) any later version.
 
 
8
# This program is distributed in the hope that it will be useful,
 
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
 
11
# GNU General Public License for more details.
 
 
13
# You should have received a copy of the GNU General Public License
 
 
14
# along with this program; if not, write to the Free Software
 
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
 
21
# little as possible, so this should be used rarely if it's added at all.
 
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
 
27
# new assertFoo() methods.
 
 
31
from cStringIO import StringIO
 
 
37
from pprint import pformat
 
 
42
from subprocess import Popen, PIPE
 
 
62
import bzrlib.commands
 
 
63
import bzrlib.timestamp
 
 
65
import bzrlib.inventory
 
 
66
import bzrlib.iterablefile
 
 
71
    # lsprof not available
 
 
73
from bzrlib.merge import merge_inner
 
 
77
from bzrlib.revision import common_ancestor
 
 
79
from bzrlib import symbol_versioning
 
 
80
from bzrlib.symbol_versioning import (
 
 
85
from bzrlib.transport import get_transport
 
 
86
import bzrlib.transport
 
 
87
from bzrlib.transport.local import LocalURLServer
 
 
88
from bzrlib.transport.memory import MemoryServer
 
 
89
from bzrlib.transport.readonly import ReadonlyServer
 
 
90
from bzrlib.trace import mutter, note
 
 
91
from bzrlib.tests import TestUtil
 
 
92
from bzrlib.tests.HttpServer import HttpServer
 
 
93
from bzrlib.tests.TestUtil import (
 
 
97
from bzrlib.tests.treeshape import build_tree_contents
 
 
98
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
 
100
# Mark this python module as being part of the implementation
 
 
101
# of unittest: this gives us better tracebacks where the last
 
 
102
# shown frame is the test code, not our assertXYZ.
 
 
105
default_transport = LocalURLServer
 
 
108
MODULES_TO_DOCTEST = [
 
 
121
def packages_to_test():
 
 
122
    """Return a list of packages to test.
 
 
124
    The packages are not globally imported so that import failures are
 
 
125
    triggered when running selftest, not when importing the command.
 
 
128
    import bzrlib.tests.blackbox
 
 
129
    import bzrlib.tests.branch_implementations
 
 
130
    import bzrlib.tests.bzrdir_implementations
 
 
131
    import bzrlib.tests.commands
 
 
132
    import bzrlib.tests.interrepository_implementations
 
 
133
    import bzrlib.tests.interversionedfile_implementations
 
 
134
    import bzrlib.tests.intertree_implementations
 
 
135
    import bzrlib.tests.per_lock
 
 
136
    import bzrlib.tests.repository_implementations
 
 
137
    import bzrlib.tests.revisionstore_implementations
 
 
138
    import bzrlib.tests.tree_implementations
 
 
139
    import bzrlib.tests.workingtree_implementations
 
 
142
            bzrlib.tests.blackbox,
 
 
143
            bzrlib.tests.branch_implementations,
 
 
144
            bzrlib.tests.bzrdir_implementations,
 
 
145
            bzrlib.tests.commands,
 
 
146
            bzrlib.tests.interrepository_implementations,
 
 
147
            bzrlib.tests.interversionedfile_implementations,
 
 
148
            bzrlib.tests.intertree_implementations,
 
 
149
            bzrlib.tests.per_lock,
 
 
150
            bzrlib.tests.repository_implementations,
 
 
151
            bzrlib.tests.revisionstore_implementations,
 
 
152
            bzrlib.tests.tree_implementations,
 
 
153
            bzrlib.tests.workingtree_implementations,
 
 
157
class ExtendedTestResult(unittest._TextTestResult):
 
 
158
    """Accepts, reports and accumulates the results of running tests.
 
 
160
    Compared to this unittest version this class adds support for profiling,
 
 
161
    benchmarking, stopping as soon as a test fails,  and skipping tests.
 
 
162
    There are further-specialized subclasses for different types of display.
 
 
167
    def __init__(self, stream, descriptions, verbosity,
 
 
171
        """Construct new TestResult.
 
 
173
        :param bench_history: Optionally, a writable file object to accumulate
 
 
176
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
 
177
        if bench_history is not None:
 
 
178
            from bzrlib.version import _get_bzr_source_tree
 
 
179
            src_tree = _get_bzr_source_tree()
 
 
182
                    revision_id = src_tree.get_parent_ids()[0]
 
 
184
                    # XXX: if this is a brand new tree, do the same as if there
 
 
188
                # XXX: If there's no branch, what should we do?
 
 
190
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
 
191
        self._bench_history = bench_history
 
 
192
        self.ui = ui.ui_factory
 
 
193
        self.num_tests = num_tests
 
 
195
        self.failure_count = 0
 
 
196
        self.known_failure_count = 0
 
 
198
        self.unsupported = {}
 
 
200
        self._overall_start_time = time.time()
 
 
202
    def extractBenchmarkTime(self, testCase):
 
 
203
        """Add a benchmark time for the current test case."""
 
 
204
        self._benchmarkTime = getattr(testCase, "_benchtime", None)
 
 
206
    def _elapsedTestTimeString(self):
 
 
207
        """Return a time string for the overall time the current test has taken."""
 
 
208
        return self._formatTime(time.time() - self._start_time)
 
 
210
    def _testTimeString(self):
 
 
211
        if self._benchmarkTime is not None:
 
 
213
                self._formatTime(self._benchmarkTime),
 
 
214
                self._elapsedTestTimeString())
 
 
216
            return "           %s" % self._elapsedTestTimeString()
 
 
218
    def _formatTime(self, seconds):
 
 
219
        """Format seconds as milliseconds with leading spaces."""
 
 
220
        # some benchmarks can take thousands of seconds to run, so we need 8
 
 
222
        return "%8dms" % (1000 * seconds)
 
 
224
    def _shortened_test_description(self, test):
 
 
226
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
 
229
    def startTest(self, test):
 
 
230
        unittest.TestResult.startTest(self, test)
 
 
231
        self.report_test_start(test)
 
 
232
        test.number = self.count
 
 
233
        self._recordTestStartTime()
 
 
235
    def _recordTestStartTime(self):
 
 
236
        """Record that a test has started."""
 
 
237
        self._start_time = time.time()
 
 
239
    def _cleanupLogFile(self, test):
 
 
240
        # We can only do this if we have one of our TestCases, not if
 
 
242
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
 
243
        if setKeepLogfile is not None:
 
 
246
    def addError(self, test, err):
 
 
247
        self.extractBenchmarkTime(test)
 
 
248
        self._cleanupLogFile(test)
 
 
249
        if isinstance(err[1], TestSkipped):
 
 
250
            return self.addSkipped(test, err)
 
 
251
        elif isinstance(err[1], UnavailableFeature):
 
 
252
            return self.addNotSupported(test, err[1].args[0])
 
 
253
        unittest.TestResult.addError(self, test, err)
 
 
254
        self.error_count += 1
 
 
255
        self.report_error(test, err)
 
 
259
    def addFailure(self, test, err):
 
 
260
        self._cleanupLogFile(test)
 
 
261
        self.extractBenchmarkTime(test)
 
 
262
        if isinstance(err[1], KnownFailure):
 
 
263
            return self.addKnownFailure(test, err)
 
 
264
        unittest.TestResult.addFailure(self, test, err)
 
 
265
        self.failure_count += 1
 
 
266
        self.report_failure(test, err)
 
 
270
    def addKnownFailure(self, test, err):
 
 
271
        self.known_failure_count += 1
 
 
272
        self.report_known_failure(test, err)
 
 
274
    def addNotSupported(self, test, feature):
 
 
275
        self.unsupported.setdefault(str(feature), 0)
 
 
276
        self.unsupported[str(feature)] += 1
 
 
277
        self.report_unsupported(test, feature)
 
 
279
    def addSuccess(self, test):
 
 
280
        self.extractBenchmarkTime(test)
 
 
281
        if self._bench_history is not None:
 
 
282
            if self._benchmarkTime is not None:
 
 
283
                self._bench_history.write("%s %s\n" % (
 
 
284
                    self._formatTime(self._benchmarkTime),
 
 
286
        self.report_success(test)
 
 
287
        unittest.TestResult.addSuccess(self, test)
 
 
289
    def addSkipped(self, test, skip_excinfo):
 
 
290
        self.report_skip(test, skip_excinfo)
 
 
291
        # seems best to treat this as success from point-of-view of unittest
 
 
292
        # -- it actually does nothing so it barely matters :)
 
 
295
        except KeyboardInterrupt:
 
 
298
            self.addError(test, test.__exc_info())
 
 
300
            unittest.TestResult.addSuccess(self, test)
 
 
302
    def printErrorList(self, flavour, errors):
 
 
303
        for test, err in errors:
 
 
304
            self.stream.writeln(self.separator1)
 
 
305
            self.stream.write("%s: " % flavour)
 
 
306
            self.stream.writeln(self.getDescription(test))
 
 
307
            if getattr(test, '_get_log', None) is not None:
 
 
309
                print >>self.stream, \
 
 
310
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
 
311
                print >>self.stream, test._get_log()
 
 
312
                print >>self.stream, \
 
 
313
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
 
314
            self.stream.writeln(self.separator2)
 
 
315
            self.stream.writeln("%s" % err)
 
 
320
    def report_cleaning_up(self):
 
 
323
    def report_success(self, test):
 
 
327
class TextTestResult(ExtendedTestResult):
 
 
328
    """Displays progress and results of tests in text form"""
 
 
330
    def __init__(self, stream, descriptions, verbosity,
 
 
335
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
 
336
            bench_history, num_tests)
 
 
338
            self.pb = self.ui.nested_progress_bar()
 
 
339
            self._supplied_pb = False
 
 
342
            self._supplied_pb = True
 
 
343
        self.pb.show_pct = False
 
 
344
        self.pb.show_spinner = False
 
 
345
        self.pb.show_eta = False,
 
 
346
        self.pb.show_count = False
 
 
347
        self.pb.show_bar = False
 
 
349
    def report_starting(self):
 
 
350
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
 
352
    def _progress_prefix_text(self):
 
 
353
        a = '[%d' % self.count
 
 
354
        if self.num_tests is not None:
 
 
355
            a +='/%d' % self.num_tests
 
 
356
        a += ' in %ds' % (time.time() - self._overall_start_time)
 
 
358
            a += ', %d errors' % self.error_count
 
 
359
        if self.failure_count:
 
 
360
            a += ', %d failed' % self.failure_count
 
 
361
        if self.known_failure_count:
 
 
362
            a += ', %d known failures' % self.known_failure_count
 
 
364
            a += ', %d skipped' % self.skip_count
 
 
366
            a += ', %d missing features' % len(self.unsupported)
 
 
370
    def report_test_start(self, test):
 
 
373
                self._progress_prefix_text()
 
 
375
                + self._shortened_test_description(test))
 
 
377
    def _test_description(self, test):
 
 
378
        return self._shortened_test_description(test)
 
 
380
    def report_error(self, test, err):
 
 
381
        self.pb.note('ERROR: %s\n    %s\n', 
 
 
382
            self._test_description(test),
 
 
386
    def report_failure(self, test, err):
 
 
387
        self.pb.note('FAIL: %s\n    %s\n', 
 
 
388
            self._test_description(test),
 
 
392
    def report_known_failure(self, test, err):
 
 
393
        self.pb.note('XFAIL: %s\n%s\n',
 
 
394
            self._test_description(test), err[1])
 
 
396
    def report_skip(self, test, skip_excinfo):
 
 
399
            # at the moment these are mostly not things we can fix
 
 
400
            # and so they just produce stipple; use the verbose reporter
 
 
403
                # show test and reason for skip
 
 
404
                self.pb.note('SKIP: %s\n    %s\n', 
 
 
405
                    self._shortened_test_description(test),
 
 
408
                # since the class name was left behind in the still-visible
 
 
410
                self.pb.note('SKIP: %s', skip_excinfo[1])
 
 
412
    def report_unsupported(self, test, feature):
 
 
413
        """test cannot be run because feature is missing."""
 
 
415
    def report_cleaning_up(self):
 
 
416
        self.pb.update('cleaning up...')
 
 
419
        if not self._supplied_pb:
 
 
423
class VerboseTestResult(ExtendedTestResult):
 
 
424
    """Produce long output, with one line per test run plus times"""
 
 
426
    def _ellipsize_to_right(self, a_string, final_width):
 
 
427
        """Truncate and pad a string, keeping the right hand side"""
 
 
428
        if len(a_string) > final_width:
 
 
429
            result = '...' + a_string[3-final_width:]
 
 
432
        return result.ljust(final_width)
 
 
434
    def report_starting(self):
 
 
435
        self.stream.write('running %d tests...\n' % self.num_tests)
 
 
437
    def report_test_start(self, test):
 
 
439
        name = self._shortened_test_description(test)
 
 
440
        # width needs space for 6 char status, plus 1 for slash, plus 2 10-char
 
 
441
        # numbers, plus a trailing blank
 
 
442
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
 
443
        self.stream.write(self._ellipsize_to_right(name,
 
 
444
                          osutils.terminal_width()-30))
 
 
447
    def _error_summary(self, err):
 
 
449
        return '%s%s' % (indent, err[1])
 
 
451
    def report_error(self, test, err):
 
 
452
        self.stream.writeln('ERROR %s\n%s'
 
 
453
                % (self._testTimeString(),
 
 
454
                   self._error_summary(err)))
 
 
456
    def report_failure(self, test, err):
 
 
457
        self.stream.writeln(' FAIL %s\n%s'
 
 
458
                % (self._testTimeString(),
 
 
459
                   self._error_summary(err)))
 
 
461
    def report_known_failure(self, test, err):
 
 
462
        self.stream.writeln('XFAIL %s\n%s'
 
 
463
                % (self._testTimeString(),
 
 
464
                   self._error_summary(err)))
 
 
466
    def report_success(self, test):
 
 
467
        self.stream.writeln('   OK %s' % self._testTimeString())
 
 
468
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
 
469
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
 
470
            stats.pprint(file=self.stream)
 
 
471
        # flush the stream so that we get smooth output. This verbose mode is
 
 
472
        # used to show the output in PQM.
 
 
475
    def report_skip(self, test, skip_excinfo):
 
 
477
        self.stream.writeln(' SKIP %s\n%s'
 
 
478
                % (self._testTimeString(),
 
 
479
                   self._error_summary(skip_excinfo)))
 
 
481
    def report_unsupported(self, test, feature):
 
 
482
        """test cannot be run because feature is missing."""
 
 
483
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
 
484
                %(self._testTimeString(), feature))
 
 
488
class TextTestRunner(object):
 
 
489
    stop_on_failure = False
 
 
498
        self.stream = unittest._WritelnDecorator(stream)
 
 
499
        self.descriptions = descriptions
 
 
500
        self.verbosity = verbosity
 
 
501
        self._bench_history = bench_history
 
 
502
        self.list_only = list_only
 
 
505
        "Run the given test case or test suite."
 
 
506
        startTime = time.time()
 
 
507
        if self.verbosity == 1:
 
 
508
            result_class = TextTestResult
 
 
509
        elif self.verbosity >= 2:
 
 
510
            result_class = VerboseTestResult
 
 
511
        result = result_class(self.stream,
 
 
514
                              bench_history=self._bench_history,
 
 
515
                              num_tests=test.countTestCases(),
 
 
517
        result.stop_early = self.stop_on_failure
 
 
518
        result.report_starting()
 
 
520
            if self.verbosity >= 2:
 
 
521
                self.stream.writeln("Listing tests only ...\n")
 
 
523
            for t in iter_suite_tests(test):
 
 
524
                self.stream.writeln("%s" % (t.id()))
 
 
526
            actionTaken = "Listed"
 
 
529
            run = result.testsRun
 
 
531
        stopTime = time.time()
 
 
532
        timeTaken = stopTime - startTime
 
 
534
        self.stream.writeln(result.separator2)
 
 
535
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
 
536
                            run, run != 1 and "s" or "", timeTaken))
 
 
537
        self.stream.writeln()
 
 
538
        if not result.wasSuccessful():
 
 
539
            self.stream.write("FAILED (")
 
 
540
            failed, errored = map(len, (result.failures, result.errors))
 
 
542
                self.stream.write("failures=%d" % failed)
 
 
544
                if failed: self.stream.write(", ")
 
 
545
                self.stream.write("errors=%d" % errored)
 
 
546
            if result.known_failure_count:
 
 
547
                if failed or errored: self.stream.write(", ")
 
 
548
                self.stream.write("known_failure_count=%d" %
 
 
549
                    result.known_failure_count)
 
 
550
            self.stream.writeln(")")
 
 
552
            if result.known_failure_count:
 
 
553
                self.stream.writeln("OK (known_failures=%d)" %
 
 
554
                    result.known_failure_count)
 
 
556
                self.stream.writeln("OK")
 
 
557
        if result.skip_count > 0:
 
 
558
            skipped = result.skip_count
 
 
559
            self.stream.writeln('%d test%s skipped' %
 
 
560
                                (skipped, skipped != 1 and "s" or ""))
 
 
561
        if result.unsupported:
 
 
562
            for feature, count in sorted(result.unsupported.items()):
 
 
563
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
 
569
def iter_suite_tests(suite):
 
 
570
    """Return all tests in a suite, recursing through nested suites"""
 
 
571
    for item in suite._tests:
 
 
572
        if isinstance(item, unittest.TestCase):
 
 
574
        elif isinstance(item, unittest.TestSuite):
 
 
575
            for r in iter_suite_tests(item):
 
 
578
            raise Exception('unknown object %r inside test suite %r'
 
 
582
class TestSkipped(Exception):
 
 
583
    """Indicates that a test was intentionally skipped, rather than failing."""
 
 
586
class KnownFailure(AssertionError):
 
 
587
    """Indicates that a test failed in a precisely expected manner.
 
 
589
    Such failures dont block the whole test suite from passing because they are
 
 
590
    indicators of partially completed code or of future work. We have an
 
 
591
    explicit error for them so that we can ensure that they are always visible:
 
 
592
    KnownFailures are always shown in the output of bzr selftest.
 
 
596
class UnavailableFeature(Exception):
 
 
597
    """A feature required for this test was not available.
 
 
599
    The feature should be used to construct the exception.
 
 
603
class CommandFailed(Exception):
 
 
607
class StringIOWrapper(object):
 
 
608
    """A wrapper around cStringIO which just adds an encoding attribute.
 
 
610
    Internally we can check sys.stdout to see what the output encoding
 
 
611
    should be. However, cStringIO has no encoding attribute that we can
 
 
612
    set. So we wrap it instead.
 
 
617
    def __init__(self, s=None):
 
 
619
            self.__dict__['_cstring'] = StringIO(s)
 
 
621
            self.__dict__['_cstring'] = StringIO()
 
 
623
    def __getattr__(self, name, getattr=getattr):
 
 
624
        return getattr(self.__dict__['_cstring'], name)
 
 
626
    def __setattr__(self, name, val):
 
 
627
        if name == 'encoding':
 
 
628
            self.__dict__['encoding'] = val
 
 
630
            return setattr(self._cstring, name, val)
 
 
633
class TestUIFactory(ui.CLIUIFactory):
 
 
634
    """A UI Factory for testing.
 
 
636
    Hide the progress bar but emit note()s.
 
 
638
    Allows get_password to be tested without real tty attached.
 
 
645
        super(TestUIFactory, self).__init__()
 
 
646
        if stdin is not None:
 
 
647
            # We use a StringIOWrapper to be able to test various
 
 
648
            # encodings, but the user is still responsible to
 
 
649
            # encode the string and to set the encoding attribute
 
 
650
            # of StringIOWrapper.
 
 
651
            self.stdin = StringIOWrapper(stdin)
 
 
653
            self.stdout = sys.stdout
 
 
657
            self.stderr = sys.stderr
 
 
662
        """See progress.ProgressBar.clear()."""
 
 
664
    def clear_term(self):
 
 
665
        """See progress.ProgressBar.clear_term()."""
 
 
667
    def clear_term(self):
 
 
668
        """See progress.ProgressBar.clear_term()."""
 
 
671
        """See progress.ProgressBar.finished()."""
 
 
673
    def note(self, fmt_string, *args, **kwargs):
 
 
674
        """See progress.ProgressBar.note()."""
 
 
675
        self.stdout.write((fmt_string + "\n") % args)
 
 
677
    def progress_bar(self):
 
 
680
    def nested_progress_bar(self):
 
 
683
    def update(self, message, count=None, total=None):
 
 
684
        """See progress.ProgressBar.update()."""
 
 
686
    def get_non_echoed_password(self, prompt):
 
 
687
        """Get password from stdin without trying to handle the echo mode"""
 
 
689
            self.stdout.write(prompt.encode(self.stdout.encoding, 'replace'))
 
 
690
        password = self.stdin.readline()
 
 
693
        if password[-1] == '\n':
 
 
694
            password = password[:-1]
 
 
698
class TestCase(unittest.TestCase):
 
 
699
    """Base class for bzr unit tests.
 
 
701
    Tests that need access to disk resources should subclass 
 
 
702
    TestCaseInTempDir not TestCase.
 
 
704
    Error and debug log messages are redirected from their usual
 
 
705
    location into a temporary file, the contents of which can be
 
 
706
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
 
707
    so that it can also capture file IO.  When the test completes this file
 
 
708
    is read into memory and removed from disk.
 
 
710
    There are also convenience functions to invoke bzr's command-line
 
 
711
    routine, and to build and check bzr trees.
 
 
713
    In addition to the usual method of overriding tearDown(), this class also
 
 
714
    allows subclasses to register functions into the _cleanups list, which is
 
 
715
    run in order as the object is torn down.  It's less likely this will be
 
 
716
    accidentally overlooked.
 
 
719
    _log_file_name = None
 
 
721
    _keep_log_file = False
 
 
722
    # record lsprof data when performing benchmark calls.
 
 
723
    _gather_lsprof_in_benchmarks = False
 
 
725
    def __init__(self, methodName='testMethod'):
 
 
726
        super(TestCase, self).__init__(methodName)
 
 
730
        unittest.TestCase.setUp(self)
 
 
731
        self._cleanEnvironment()
 
 
732
        bzrlib.trace.disable_default_logging()
 
 
735
        self._benchcalls = []
 
 
736
        self._benchtime = None
 
 
738
        self._clear_debug_flags()
 
 
740
    def _clear_debug_flags(self):
 
 
741
        """Prevent externally set debug flags affecting tests.
 
 
743
        Tests that want to use debug flags can just set them in the
 
 
744
        debug_flags set during setup/teardown.
 
 
746
        self._preserved_debug_flags = set(debug.debug_flags)
 
 
747
        debug.debug_flags.clear()
 
 
748
        self.addCleanup(self._restore_debug_flags)
 
 
750
    def _clear_hooks(self):
 
 
751
        # prevent hooks affecting tests
 
 
753
        import bzrlib.smart.server
 
 
754
        self._preserved_hooks = {
 
 
755
            bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
 
 
756
            bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
 
 
758
        self.addCleanup(self._restoreHooks)
 
 
759
        # reset all hooks to an empty instance of the appropriate type
 
 
760
        bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
 
 
761
        bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
 
 
763
    def _silenceUI(self):
 
 
764
        """Turn off UI for duration of test"""
 
 
765
        # by default the UI is off; tests can turn it on if they want it.
 
 
766
        saved = ui.ui_factory
 
 
768
            ui.ui_factory = saved
 
 
769
        ui.ui_factory = ui.SilentUIFactory()
 
 
770
        self.addCleanup(_restore)
 
 
772
    def _ndiff_strings(self, a, b):
 
 
773
        """Return ndiff between two strings containing lines.
 
 
775
        A trailing newline is added if missing to make the strings
 
 
777
        if b and b[-1] != '\n':
 
 
779
        if a and a[-1] != '\n':
 
 
781
        difflines = difflib.ndiff(a.splitlines(True),
 
 
783
                                  linejunk=lambda x: False,
 
 
784
                                  charjunk=lambda x: False)
 
 
785
        return ''.join(difflines)
 
 
787
    def assertEqual(self, a, b, message=''):
 
 
791
        except UnicodeError, e:
 
 
792
            # If we can't compare without getting a UnicodeError, then
 
 
793
            # obviously they are different
 
 
794
            mutter('UnicodeError: %s', e)
 
 
797
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
 
799
               pformat(a), pformat(b)))
 
 
801
    assertEquals = assertEqual
 
 
803
    def assertEqualDiff(self, a, b, message=None):
 
 
804
        """Assert two texts are equal, if not raise an exception.
 
 
806
        This is intended for use with multi-line strings where it can 
 
 
807
        be hard to find the differences by eye.
 
 
809
        # TODO: perhaps override assertEquals to call this for strings?
 
 
813
            message = "texts not equal:\n"
 
 
814
        raise AssertionError(message +
 
 
815
                             self._ndiff_strings(a, b))
 
 
817
    def assertEqualMode(self, mode, mode_test):
 
 
818
        self.assertEqual(mode, mode_test,
 
 
819
                         'mode mismatch %o != %o' % (mode, mode_test))
 
 
821
    def assertPositive(self, val):
 
 
822
        """Assert that val is greater than 0."""
 
 
823
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
 
825
    def assertNegative(self, val):
 
 
826
        """Assert that val is less than 0."""
 
 
827
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
 
829
    def assertStartsWith(self, s, prefix):
 
 
830
        if not s.startswith(prefix):
 
 
831
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
 
833
    def assertEndsWith(self, s, suffix):
 
 
834
        """Asserts that s ends with suffix."""
 
 
835
        if not s.endswith(suffix):
 
 
836
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
 
838
    def assertContainsRe(self, haystack, needle_re):
 
 
839
        """Assert that a contains something matching a regular expression."""
 
 
840
        if not re.search(needle_re, haystack):
 
 
841
            if '\n' in haystack or len(haystack) > 60:
 
 
842
                # a long string, format it in a more readable way
 
 
843
                raise AssertionError(
 
 
844
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
 
845
                        % (needle_re, haystack))
 
 
847
                raise AssertionError('pattern "%s" not found in "%s"'
 
 
848
                        % (needle_re, haystack))
 
 
850
    def assertNotContainsRe(self, haystack, needle_re):
 
 
851
        """Assert that a does not match a regular expression"""
 
 
852
        if re.search(needle_re, haystack):
 
 
853
            raise AssertionError('pattern "%s" found in "%s"'
 
 
854
                    % (needle_re, haystack))
 
 
856
    def assertSubset(self, sublist, superlist):
 
 
857
        """Assert that every entry in sublist is present in superlist."""
 
 
859
        for entry in sublist:
 
 
860
            if entry not in superlist:
 
 
861
                missing.append(entry)
 
 
863
            raise AssertionError("value(s) %r not present in container %r" % 
 
 
864
                                 (missing, superlist))
 
 
866
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
 
867
        """Fail unless excClass is raised when the iterator from func is used.
 
 
869
        Many functions can return generators this makes sure
 
 
870
        to wrap them in a list() call to make sure the whole generator
 
 
871
        is run, and that the proper exception is raised.
 
 
874
            list(func(*args, **kwargs))
 
 
878
            if getattr(excClass,'__name__', None) is not None:
 
 
879
                excName = excClass.__name__
 
 
881
                excName = str(excClass)
 
 
882
            raise self.failureException, "%s not raised" % excName
 
 
884
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
 
885
        """Assert that a callable raises a particular exception.
 
 
887
        :param excClass: As for the except statement, this may be either an
 
 
888
            exception class, or a tuple of classes.
 
 
889
        :param callableObj: A callable, will be passed ``*args`` and
 
 
892
        Returns the exception so that you can examine it.
 
 
895
            callableObj(*args, **kwargs)
 
 
899
            if getattr(excClass,'__name__', None) is not None:
 
 
900
                excName = excClass.__name__
 
 
903
                excName = str(excClass)
 
 
904
            raise self.failureException, "%s not raised" % excName
 
 
906
    def assertIs(self, left, right, message=None):
 
 
907
        if not (left is right):
 
 
908
            if message is not None:
 
 
909
                raise AssertionError(message)
 
 
911
                raise AssertionError("%r is not %r." % (left, right))
 
 
913
    def assertIsNot(self, left, right, message=None):
 
 
915
            if message is not None:
 
 
916
                raise AssertionError(message)
 
 
918
                raise AssertionError("%r is %r." % (left, right))
 
 
920
    def assertTransportMode(self, transport, path, mode):
 
 
921
        """Fail if a path does not have mode mode.
 
 
923
        If modes are not supported on this transport, the assertion is ignored.
 
 
925
        if not transport._can_roundtrip_unix_modebits():
 
 
927
        path_stat = transport.stat(path)
 
 
928
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
 
929
        self.assertEqual(mode, actual_mode,
 
 
930
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
 
932
    def assertIsInstance(self, obj, kls):
 
 
933
        """Fail if obj is not an instance of kls"""
 
 
934
        if not isinstance(obj, kls):
 
 
935
            self.fail("%r is an instance of %s rather than %s" % (
 
 
936
                obj, obj.__class__, kls))
 
 
938
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
 
939
        """Invoke a test, expecting it to fail for the given reason.
 
 
941
        This is for assertions that ought to succeed, but currently fail.
 
 
942
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
 
943
        about the failure you're expecting.  If a new bug is introduced,
 
 
944
        AssertionError should be raised, not KnownFailure.
 
 
946
        Frequently, expectFailure should be followed by an opposite assertion.
 
 
949
        Intended to be used with a callable that raises AssertionError as the
 
 
950
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
 
952
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
 
957
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
 
959
          self.assertEqual(42, dynamic_val)
 
 
961
          This means that a dynamic_val of 54 will cause the test to raise
 
 
962
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
 
963
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
 
964
          than 54 or 42 will cause an AssertionError.
 
 
967
            assertion(*args, **kwargs)
 
 
968
        except AssertionError:
 
 
969
            raise KnownFailure(reason)
 
 
971
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
 
973
    def _capture_warnings(self, a_callable, *args, **kwargs):
 
 
974
        """A helper for callDeprecated and applyDeprecated.
 
 
976
        :param a_callable: A callable to call.
 
 
977
        :param args: The positional arguments for the callable
 
 
978
        :param kwargs: The keyword arguments for the callable
 
 
979
        :return: A tuple (warnings, result). result is the result of calling
 
 
980
            a_callable(``*args``, ``**kwargs``).
 
 
983
        def capture_warnings(msg, cls=None, stacklevel=None):
 
 
984
            # we've hooked into a deprecation specific callpath,
 
 
985
            # only deprecations should getting sent via it.
 
 
986
            self.assertEqual(cls, DeprecationWarning)
 
 
987
            local_warnings.append(msg)
 
 
988
        original_warning_method = symbol_versioning.warn
 
 
989
        symbol_versioning.set_warning_method(capture_warnings)
 
 
991
            result = a_callable(*args, **kwargs)
 
 
993
            symbol_versioning.set_warning_method(original_warning_method)
 
 
994
        return (local_warnings, result)
 
 
996
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
 
997
        """Call a deprecated callable without warning the user.
 
 
999
        Note that this only captures warnings raised by symbol_versioning.warn,
 
 
1000
        not other callers that go direct to the warning module.
 
 
1002
        :param deprecation_format: The deprecation format that the callable
 
 
1003
            should have been deprecated with. This is the same type as the
 
 
1004
            parameter to deprecated_method/deprecated_function. If the
 
 
1005
            callable is not deprecated with this format, an assertion error
 
 
1007
        :param a_callable: A callable to call. This may be a bound method or
 
 
1008
            a regular function. It will be called with ``*args`` and
 
 
1010
        :param args: The positional arguments for the callable
 
 
1011
        :param kwargs: The keyword arguments for the callable
 
 
1012
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
 
1014
        call_warnings, result = self._capture_warnings(a_callable,
 
 
1016
        expected_first_warning = symbol_versioning.deprecation_string(
 
 
1017
            a_callable, deprecation_format)
 
 
1018
        if len(call_warnings) == 0:
 
 
1019
            self.fail("No deprecation warning generated by call to %s" %
 
 
1021
        self.assertEqual(expected_first_warning, call_warnings[0])
 
 
1024
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
 
1025
        """Assert that a callable is deprecated in a particular way.
 
 
1027
        This is a very precise test for unusual requirements. The 
 
 
1028
        applyDeprecated helper function is probably more suited for most tests
 
 
1029
        as it allows you to simply specify the deprecation format being used
 
 
1030
        and will ensure that that is issued for the function being called.
 
 
1032
        Note that this only captures warnings raised by symbol_versioning.warn,
 
 
1033
        not other callers that go direct to the warning module.
 
 
1035
        :param expected: a list of the deprecation warnings expected, in order
 
 
1036
        :param callable: The callable to call
 
 
1037
        :param args: The positional arguments for the callable
 
 
1038
        :param kwargs: The keyword arguments for the callable
 
 
1040
        call_warnings, result = self._capture_warnings(callable,
 
 
1042
        self.assertEqual(expected, call_warnings)
 
 
1045
    def _startLogFile(self):
 
 
1046
        """Send bzr and test log messages to a temporary file.
 
 
1048
        The file is removed as the test is torn down.
 
 
1050
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
 
1051
        self._log_file = os.fdopen(fileno, 'w+')
 
 
1052
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
 
1053
        self._log_file_name = name
 
 
1054
        self.addCleanup(self._finishLogFile)
 
 
1056
    def _finishLogFile(self):
 
 
1057
        """Finished with the log file.
 
 
1059
        Close the file and delete it, unless setKeepLogfile was called.
 
 
1061
        if self._log_file is None:
 
 
1063
        bzrlib.trace.disable_test_log(self._log_nonce)
 
 
1064
        self._log_file.close()
 
 
1065
        self._log_file = None
 
 
1066
        if not self._keep_log_file:
 
 
1067
            os.remove(self._log_file_name)
 
 
1068
            self._log_file_name = None
 
 
1070
    def setKeepLogfile(self):
 
 
1071
        """Make the logfile not be deleted when _finishLogFile is called."""
 
 
1072
        self._keep_log_file = True
 
 
1074
    def addCleanup(self, callable):
 
 
1075
        """Arrange to run a callable when this case is torn down.
 
 
1077
        Callables are run in the reverse of the order they are registered, 
 
 
1078
        ie last-in first-out.
 
 
1080
        if callable in self._cleanups:
 
 
1081
            raise ValueError("cleanup function %r already registered on %s" 
 
 
1083
        self._cleanups.append(callable)
 
 
1085
    def _cleanEnvironment(self):
 
 
1087
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
 
1088
            'HOME': os.getcwd(),
 
 
1089
            'APPDATA': None,  # bzr now use Win32 API and don't rely on APPDATA
 
 
1091
            'BZREMAIL': None, # may still be present in the environment
 
 
1093
            'BZR_PROGRESS_BAR': None,
 
 
1095
            'SSH_AUTH_SOCK': None,
 
 
1099
            'https_proxy': None,
 
 
1100
            'HTTPS_PROXY': None,
 
 
1105
            # Nobody cares about these ones AFAIK. So far at
 
 
1106
            # least. If you do (care), please update this comment
 
 
1110
            'BZR_REMOTE_PATH': None,
 
 
1113
        self.addCleanup(self._restoreEnvironment)
 
 
1114
        for name, value in new_env.iteritems():
 
 
1115
            self._captureVar(name, value)
 
 
1117
    def _captureVar(self, name, newvalue):
 
 
1118
        """Set an environment variable, and reset it when finished."""
 
 
1119
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
 
1121
    def _restore_debug_flags(self):
 
 
1122
        debug.debug_flags.clear()
 
 
1123
        debug.debug_flags.update(self._preserved_debug_flags)
 
 
1125
    def _restoreEnvironment(self):
 
 
1126
        for name, value in self.__old_env.iteritems():
 
 
1127
            osutils.set_or_unset_env(name, value)
 
 
1129
    def _restoreHooks(self):
 
 
1130
        for klass, hooks in self._preserved_hooks.items():
 
 
1131
            setattr(klass, 'hooks', hooks)
 
 
1133
    def knownFailure(self, reason):
 
 
1134
        """This test has failed for some known reason."""
 
 
1135
        raise KnownFailure(reason)
 
 
1137
    def run(self, result=None):
 
 
1138
        if result is None: result = self.defaultTestResult()
 
 
1139
        for feature in getattr(self, '_test_needs_features', []):
 
 
1140
            if not feature.available():
 
 
1141
                result.startTest(self)
 
 
1142
                if getattr(result, 'addNotSupported', None):
 
 
1143
                    result.addNotSupported(self, feature)
 
 
1145
                    result.addSuccess(self)
 
 
1146
                result.stopTest(self)
 
 
1148
        return unittest.TestCase.run(self, result)
 
 
1152
        unittest.TestCase.tearDown(self)
 
 
1154
    def time(self, callable, *args, **kwargs):
 
 
1155
        """Run callable and accrue the time it takes to the benchmark time.
 
 
1157
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
 
1158
        this will cause lsprofile statistics to be gathered and stored in
 
 
1161
        if self._benchtime is None:
 
 
1165
            if not self._gather_lsprof_in_benchmarks:
 
 
1166
                return callable(*args, **kwargs)
 
 
1168
                # record this benchmark
 
 
1169
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
 
1171
                self._benchcalls.append(((callable, args, kwargs), stats))
 
 
1174
            self._benchtime += time.time() - start
 
 
1176
    def _runCleanups(self):
 
 
1177
        """Run registered cleanup functions. 
 
 
1179
        This should only be called from TestCase.tearDown.
 
 
1181
        # TODO: Perhaps this should keep running cleanups even if 
 
 
1182
        # one of them fails?
 
 
1184
        # Actually pop the cleanups from the list so tearDown running
 
 
1185
        # twice is safe (this happens for skipped tests).
 
 
1186
        while self._cleanups:
 
 
1187
            self._cleanups.pop()()
 
 
1189
    def log(self, *args):
 
 
1192
    def _get_log(self, keep_log_file=False):
 
 
1193
        """Return as a string the log for this test. If the file is still
 
 
1194
        on disk and keep_log_file=False, delete the log file and store the
 
 
1195
        content in self._log_contents."""
 
 
1196
        # flush the log file, to get all content
 
 
1198
        bzrlib.trace._trace_file.flush()
 
 
1199
        if self._log_contents:
 
 
1200
            return self._log_contents
 
 
1201
        if self._log_file_name is not None:
 
 
1202
            logfile = open(self._log_file_name)
 
 
1204
                log_contents = logfile.read()
 
 
1207
            if not keep_log_file:
 
 
1208
                self._log_contents = log_contents
 
 
1210
                    os.remove(self._log_file_name)
 
 
1212
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
 
1213
                        print >>sys.stderr, ('Unable to delete log file '
 
 
1214
                                             ' %r' % self._log_file_name)
 
 
1219
            return "DELETED log file to reduce memory footprint"
 
 
1221
    @deprecated_method(zero_eighteen)
 
 
1222
    def capture(self, cmd, retcode=0):
 
 
1223
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
 
1224
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
 
1226
    def requireFeature(self, feature):
 
 
1227
        """This test requires a specific feature is available.
 
 
1229
        :raises UnavailableFeature: When feature is not available.
 
 
1231
        if not feature.available():
 
 
1232
            raise UnavailableFeature(feature)
 
 
1234
    @deprecated_method(zero_eighteen)
 
 
1235
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
 
 
1237
        """Invoke bzr and return (stdout, stderr).
 
 
1239
        Don't call this method, just use run_bzr() which is equivalent.
 
 
1241
        :param argv: Arguments to invoke bzr.  This may be either a 
 
 
1242
            single string, in which case it is split by shlex into words, 
 
 
1243
            or a list of arguments.
 
 
1244
        :param retcode: Expected return code, or None for don't-care.
 
 
1245
        :param encoding: Encoding for sys.stdout and sys.stderr
 
 
1246
        :param stdin: A string to be used as stdin for the command.
 
 
1247
        :param working_dir: Change to this directory before running
 
 
1249
        return self._run_bzr_autosplit(argv, retcode=retcode,
 
 
1250
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
 
1253
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
 
1255
        """Run bazaar command line, splitting up a string command line."""
 
 
1256
        if isinstance(args, basestring):
 
 
1257
            args = list(shlex.split(args))
 
 
1258
        return self._run_bzr_core(args, retcode=retcode,
 
 
1259
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
 
1262
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
 
1264
        if encoding is None:
 
 
1265
            encoding = bzrlib.user_encoding
 
 
1266
        stdout = StringIOWrapper()
 
 
1267
        stderr = StringIOWrapper()
 
 
1268
        stdout.encoding = encoding
 
 
1269
        stderr.encoding = encoding
 
 
1271
        self.log('run bzr: %r', args)
 
 
1272
        # FIXME: don't call into logging here
 
 
1273
        handler = logging.StreamHandler(stderr)
 
 
1274
        handler.setLevel(logging.INFO)
 
 
1275
        logger = logging.getLogger('')
 
 
1276
        logger.addHandler(handler)
 
 
1277
        old_ui_factory = ui.ui_factory
 
 
1278
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
 
1281
        if working_dir is not None:
 
 
1282
            cwd = osutils.getcwd()
 
 
1283
            os.chdir(working_dir)
 
 
1286
            result = self.apply_redirected(ui.ui_factory.stdin,
 
 
1288
                bzrlib.commands.run_bzr_catch_errors,
 
 
1291
            logger.removeHandler(handler)
 
 
1292
            ui.ui_factory = old_ui_factory
 
 
1296
        out = stdout.getvalue()
 
 
1297
        err = stderr.getvalue()
 
 
1299
            self.log('output:\n%r', out)
 
 
1301
            self.log('errors:\n%r', err)
 
 
1302
        if retcode is not None:
 
 
1303
            self.assertEquals(retcode, result,
 
 
1304
                              message='Unexpected return code')
 
 
1307
    def run_bzr(self, *args, **kwargs):
 
 
1308
        """Invoke bzr, as if it were run from the command line.
 
 
1310
        The argument list should not include the bzr program name - the
 
 
1311
        first argument is normally the bzr command.  Arguments may be
 
 
1312
        passed in three ways:
 
 
1314
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
 
1315
        when the command contains whitespace or metacharacters, or 
 
 
1316
        is built up at run time.
 
 
1318
        2- A single string, eg "add a".  This is the most convenient 
 
 
1319
        for hardcoded commands.
 
 
1321
        3- Several varargs parameters, eg run_bzr("add", "a").  
 
 
1322
        This is not recommended for new code.
 
 
1324
        This runs bzr through the interface that catches and reports
 
 
1325
        errors, and with logging set to something approximating the
 
 
1326
        default, so that error reporting can be checked.
 
 
1328
        This should be the main method for tests that want to exercise the
 
 
1329
        overall behavior of the bzr application (rather than a unit test
 
 
1330
        or a functional test of the library.)
 
 
1332
        This sends the stdout/stderr results into the test's log,
 
 
1333
        where it may be useful for debugging.  See also run_captured.
 
 
1335
        :keyword stdin: A string to be used as stdin for the command.
 
 
1336
        :keyword retcode: The status code the command should return;
 
 
1338
        :keyword working_dir: The directory to run the command in
 
 
1339
        :keyword error_regexes: A list of expected error messages.  If
 
 
1340
            specified they must be seen in the error output of the command.
 
 
1342
        retcode = kwargs.pop('retcode', 0)
 
 
1343
        encoding = kwargs.pop('encoding', None)
 
 
1344
        stdin = kwargs.pop('stdin', None)
 
 
1345
        working_dir = kwargs.pop('working_dir', None)
 
 
1346
        error_regexes = kwargs.pop('error_regexes', [])
 
 
1349
            raise TypeError("run_bzr() got unexpected keyword arguments '%s'"
 
 
1353
            if isinstance(args[0], (list, basestring)):
 
 
1356
            symbol_versioning.warn(zero_eighteen % "passing varargs to run_bzr",
 
 
1357
                                   DeprecationWarning, stacklevel=3)
 
 
1359
        out, err = self._run_bzr_autosplit(args=args,
 
 
1361
            encoding=encoding, stdin=stdin, working_dir=working_dir,
 
 
1364
        for regex in error_regexes:
 
 
1365
            self.assertContainsRe(err, regex)
 
 
1368
    def run_bzr_decode(self, *args, **kwargs):
 
 
1369
        if 'encoding' in kwargs:
 
 
1370
            encoding = kwargs['encoding']
 
 
1372
            encoding = bzrlib.user_encoding
 
 
1373
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
 
 
1375
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
 
1376
        """Run bzr, and check that stderr contains the supplied regexes
 
 
1378
        :param error_regexes: Sequence of regular expressions which
 
 
1379
            must each be found in the error output. The relative ordering
 
 
1381
        :param args: command-line arguments for bzr
 
 
1382
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
 
1383
            This function changes the default value of retcode to be 3,
 
 
1384
            since in most cases this is run when you expect bzr to fail.
 
 
1386
        :return: (out, err) The actual output of running the command (in case
 
 
1387
            you want to do more inspection)
 
 
1391
            # Make sure that commit is failing because there is nothing to do
 
 
1392
            self.run_bzr_error(['no changes to commit'],
 
 
1393
                               ['commit', '-m', 'my commit comment'])
 
 
1394
            # Make sure --strict is handling an unknown file, rather than
 
 
1395
            # giving us the 'nothing to do' error
 
 
1396
            self.build_tree(['unknown'])
 
 
1397
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
 
1398
                               ['commit', --strict', '-m', 'my commit comment'])
 
 
1400
        kwargs.setdefault('retcode', 3)
 
 
1401
        kwargs['error_regexes'] = error_regexes
 
 
1402
        out, err = self.run_bzr(*args, **kwargs)
 
 
1405
    def run_bzr_subprocess(self, *args, **kwargs):
 
 
1406
        """Run bzr in a subprocess for testing.
 
 
1408
        This starts a new Python interpreter and runs bzr in there. 
 
 
1409
        This should only be used for tests that have a justifiable need for
 
 
1410
        this isolation: e.g. they are testing startup time, or signal
 
 
1411
        handling, or early startup code, etc.  Subprocess code can't be 
 
 
1412
        profiled or debugged so easily.
 
 
1414
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
 
1415
            None is supplied, the status code is not checked.
 
 
1416
        :keyword env_changes: A dictionary which lists changes to environment
 
 
1417
            variables. A value of None will unset the env variable.
 
 
1418
            The values must be strings. The change will only occur in the
 
 
1419
            child, so you don't need to fix the environment after running.
 
 
1420
        :keyword universal_newlines: Convert CRLF => LF
 
 
1421
        :keyword allow_plugins: By default the subprocess is run with
 
 
1422
            --no-plugins to ensure test reproducibility. Also, it is possible
 
 
1423
            for system-wide plugins to create unexpected output on stderr,
 
 
1424
            which can cause unnecessary test failures.
 
 
1426
        env_changes = kwargs.get('env_changes', {})
 
 
1427
        working_dir = kwargs.get('working_dir', None)
 
 
1428
        allow_plugins = kwargs.get('allow_plugins', False)
 
 
1429
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
 
1430
                                            working_dir=working_dir,
 
 
1431
                                            allow_plugins=allow_plugins)
 
 
1432
        # We distinguish between retcode=None and retcode not passed.
 
 
1433
        supplied_retcode = kwargs.get('retcode', 0)
 
 
1434
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
 
1435
            universal_newlines=kwargs.get('universal_newlines', False),
 
 
1438
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
 
1439
                             skip_if_plan_to_signal=False,
 
 
1441
                             allow_plugins=False):
 
 
1442
        """Start bzr in a subprocess for testing.
 
 
1444
        This starts a new Python interpreter and runs bzr in there.
 
 
1445
        This should only be used for tests that have a justifiable need for
 
 
1446
        this isolation: e.g. they are testing startup time, or signal
 
 
1447
        handling, or early startup code, etc.  Subprocess code can't be
 
 
1448
        profiled or debugged so easily.
 
 
1450
        :param process_args: a list of arguments to pass to the bzr executable,
 
 
1451
            for example ``['--version']``.
 
 
1452
        :param env_changes: A dictionary which lists changes to environment
 
 
1453
            variables. A value of None will unset the env variable.
 
 
1454
            The values must be strings. The change will only occur in the
 
 
1455
            child, so you don't need to fix the environment after running.
 
 
1456
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
 
1458
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
 
1460
        :returns: Popen object for the started process.
 
 
1462
        if skip_if_plan_to_signal:
 
 
1463
            if not getattr(os, 'kill', None):
 
 
1464
                raise TestSkipped("os.kill not available.")
 
 
1466
        if env_changes is None:
 
 
1470
        def cleanup_environment():
 
 
1471
            for env_var, value in env_changes.iteritems():
 
 
1472
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
 
1474
        def restore_environment():
 
 
1475
            for env_var, value in old_env.iteritems():
 
 
1476
                osutils.set_or_unset_env(env_var, value)
 
 
1478
        bzr_path = self.get_bzr_path()
 
 
1481
        if working_dir is not None:
 
 
1482
            cwd = osutils.getcwd()
 
 
1483
            os.chdir(working_dir)
 
 
1486
            # win32 subprocess doesn't support preexec_fn
 
 
1487
            # so we will avoid using it on all platforms, just to
 
 
1488
            # make sure the code path is used, and we don't break on win32
 
 
1489
            cleanup_environment()
 
 
1490
            command = [sys.executable, bzr_path]
 
 
1491
            if not allow_plugins:
 
 
1492
                command.append('--no-plugins')
 
 
1493
            command.extend(process_args)
 
 
1494
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
 
1496
            restore_environment()
 
 
1502
    def _popen(self, *args, **kwargs):
 
 
1503
        """Place a call to Popen.
 
 
1505
        Allows tests to override this method to intercept the calls made to
 
 
1506
        Popen for introspection.
 
 
1508
        return Popen(*args, **kwargs)
 
 
1510
    def get_bzr_path(self):
 
 
1511
        """Return the path of the 'bzr' executable for this test suite."""
 
 
1512
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
 
1513
        if not os.path.isfile(bzr_path):
 
 
1514
            # We are probably installed. Assume sys.argv is the right file
 
 
1515
            bzr_path = sys.argv[0]
 
 
1518
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
 
1519
                              universal_newlines=False, process_args=None):
 
 
1520
        """Finish the execution of process.
 
 
1522
        :param process: the Popen object returned from start_bzr_subprocess.
 
 
1523
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
 
1524
            None is supplied, the status code is not checked.
 
 
1525
        :param send_signal: an optional signal to send to the process.
 
 
1526
        :param universal_newlines: Convert CRLF => LF
 
 
1527
        :returns: (stdout, stderr)
 
 
1529
        if send_signal is not None:
 
 
1530
            os.kill(process.pid, send_signal)
 
 
1531
        out, err = process.communicate()
 
 
1533
        if universal_newlines:
 
 
1534
            out = out.replace('\r\n', '\n')
 
 
1535
            err = err.replace('\r\n', '\n')
 
 
1537
        if retcode is not None and retcode != process.returncode:
 
 
1538
            if process_args is None:
 
 
1539
                process_args = "(unknown args)"
 
 
1540
            mutter('Output of bzr %s:\n%s', process_args, out)
 
 
1541
            mutter('Error for bzr %s:\n%s', process_args, err)
 
 
1542
            self.fail('Command bzr %s failed with retcode %s != %s'
 
 
1543
                      % (process_args, retcode, process.returncode))
 
 
1546
    def check_inventory_shape(self, inv, shape):
 
 
1547
        """Compare an inventory to a list of expected names.
 
 
1549
        Fail if they are not precisely equal.
 
 
1552
        shape = list(shape)             # copy
 
 
1553
        for path, ie in inv.entries():
 
 
1554
            name = path.replace('\\', '/')
 
 
1555
            if ie.kind == 'directory':
 
 
1562
            self.fail("expected paths not found in inventory: %r" % shape)
 
 
1564
            self.fail("unexpected paths found in inventory: %r" % extras)
 
 
1566
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
 
1567
                         a_callable=None, *args, **kwargs):
 
 
1568
        """Call callable with redirected std io pipes.
 
 
1570
        Returns the return code."""
 
 
1571
        if not callable(a_callable):
 
 
1572
            raise ValueError("a_callable must be callable.")
 
 
1574
            stdin = StringIO("")
 
 
1576
            if getattr(self, "_log_file", None) is not None:
 
 
1577
                stdout = self._log_file
 
 
1581
            if getattr(self, "_log_file", None is not None):
 
 
1582
                stderr = self._log_file
 
 
1585
        real_stdin = sys.stdin
 
 
1586
        real_stdout = sys.stdout
 
 
1587
        real_stderr = sys.stderr
 
 
1592
            return a_callable(*args, **kwargs)
 
 
1594
            sys.stdout = real_stdout
 
 
1595
            sys.stderr = real_stderr
 
 
1596
            sys.stdin = real_stdin
 
 
1598
    def reduceLockdirTimeout(self):
 
 
1599
        """Reduce the default lock timeout for the duration of the test, so that
 
 
1600
        if LockContention occurs during a test, it does so quickly.
 
 
1602
        Tests that expect to provoke LockContention errors should call this.
 
 
1604
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
 
1606
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
 
1607
        self.addCleanup(resetTimeout)
 
 
1608
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
 
1611
class TestCaseWithMemoryTransport(TestCase):
 
 
1612
    """Common test class for tests that do not need disk resources.
 
 
1614
    Tests that need disk resources should derive from TestCaseWithTransport.
 
 
1616
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
 
1618
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
 
1619
    a directory which does not exist. This serves to help ensure test isolation
 
 
1620
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
 
1621
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
 
1622
    file defaults for the transport in tests, nor does it obey the command line
 
 
1623
    override, so tests that accidentally write to the common directory should
 
 
1626
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
 
1627
    a .bzr directory that stops us ascending higher into the filesystem.
 
 
1633
    def __init__(self, methodName='runTest'):
 
 
1634
        # allow test parameterisation after test construction and before test
 
 
1635
        # execution. Variables that the parameteriser sets need to be 
 
 
1636
        # ones that are not set by setUp, or setUp will trash them.
 
 
1637
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
 
1638
        self.vfs_transport_factory = default_transport
 
 
1639
        self.transport_server = None
 
 
1640
        self.transport_readonly_server = None
 
 
1641
        self.__vfs_server = None
 
 
1643
    def get_transport(self, relpath=None):
 
 
1644
        """Return a writeable transport.
 
 
1646
        This transport is for the test scratch space relative to
 
 
1649
        :param relpath: a path relative to the base url.
 
 
1651
        t = get_transport(self.get_url(relpath))
 
 
1652
        self.assertFalse(t.is_readonly())
 
 
1655
    def get_readonly_transport(self, relpath=None):
 
 
1656
        """Return a readonly transport for the test scratch space
 
 
1658
        This can be used to test that operations which should only need
 
 
1659
        readonly access in fact do not try to write.
 
 
1661
        :param relpath: a path relative to the base url.
 
 
1663
        t = get_transport(self.get_readonly_url(relpath))
 
 
1664
        self.assertTrue(t.is_readonly())
 
 
1667
    def create_transport_readonly_server(self):
 
 
1668
        """Create a transport server from class defined at init.
 
 
1670
        This is mostly a hook for daughter classes.
 
 
1672
        return self.transport_readonly_server()
 
 
1674
    def get_readonly_server(self):
 
 
1675
        """Get the server instance for the readonly transport
 
 
1677
        This is useful for some tests with specific servers to do diagnostics.
 
 
1679
        if self.__readonly_server is None:
 
 
1680
            if self.transport_readonly_server is None:
 
 
1681
                # readonly decorator requested
 
 
1682
                # bring up the server
 
 
1683
                self.__readonly_server = ReadonlyServer()
 
 
1684
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
 
1686
                self.__readonly_server = self.create_transport_readonly_server()
 
 
1687
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
 
1688
            self.addCleanup(self.__readonly_server.tearDown)
 
 
1689
        return self.__readonly_server
 
 
1691
    def get_readonly_url(self, relpath=None):
 
 
1692
        """Get a URL for the readonly transport.
 
 
1694
        This will either be backed by '.' or a decorator to the transport 
 
 
1695
        used by self.get_url()
 
 
1696
        relpath provides for clients to get a path relative to the base url.
 
 
1697
        These should only be downwards relative, not upwards.
 
 
1699
        base = self.get_readonly_server().get_url()
 
 
1700
        return self._adjust_url(base, relpath)
 
 
1702
    def get_vfs_only_server(self):
 
 
1703
        """Get the vfs only read/write server instance.
 
 
1705
        This is useful for some tests with specific servers that need
 
 
1708
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
 
1709
        is no means to override it.
 
 
1711
        if self.__vfs_server is None:
 
 
1712
            self.__vfs_server = MemoryServer()
 
 
1713
            self.__vfs_server.setUp()
 
 
1714
            self.addCleanup(self.__vfs_server.tearDown)
 
 
1715
        return self.__vfs_server
 
 
1717
    def get_server(self):
 
 
1718
        """Get the read/write server instance.
 
 
1720
        This is useful for some tests with specific servers that need
 
 
1723
        This is built from the self.transport_server factory. If that is None,
 
 
1724
        then the self.get_vfs_server is returned.
 
 
1726
        if self.__server is None:
 
 
1727
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
 
 
1728
                return self.get_vfs_only_server()
 
 
1730
                # bring up a decorated means of access to the vfs only server.
 
 
1731
                self.__server = self.transport_server()
 
 
1733
                    self.__server.setUp(self.get_vfs_only_server())
 
 
1734
                except TypeError, e:
 
 
1735
                    # This should never happen; the try:Except here is to assist
 
 
1736
                    # developers having to update code rather than seeing an
 
 
1737
                    # uninformative TypeError.
 
 
1738
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
 
 
1739
            self.addCleanup(self.__server.tearDown)
 
 
1740
        return self.__server
 
 
1742
    def _adjust_url(self, base, relpath):
 
 
1743
        """Get a URL (or maybe a path) for the readwrite transport.
 
 
1745
        This will either be backed by '.' or to an equivalent non-file based
 
 
1747
        relpath provides for clients to get a path relative to the base url.
 
 
1748
        These should only be downwards relative, not upwards.
 
 
1750
        if relpath is not None and relpath != '.':
 
 
1751
            if not base.endswith('/'):
 
 
1753
            # XXX: Really base should be a url; we did after all call
 
 
1754
            # get_url()!  But sometimes it's just a path (from
 
 
1755
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
 
1756
            # to a non-escaped local path.
 
 
1757
            if base.startswith('./') or base.startswith('/'):
 
 
1760
                base += urlutils.escape(relpath)
 
 
1763
    def get_url(self, relpath=None):
 
 
1764
        """Get a URL (or maybe a path) for the readwrite transport.
 
 
1766
        This will either be backed by '.' or to an equivalent non-file based
 
 
1768
        relpath provides for clients to get a path relative to the base url.
 
 
1769
        These should only be downwards relative, not upwards.
 
 
1771
        base = self.get_server().get_url()
 
 
1772
        return self._adjust_url(base, relpath)
 
 
1774
    def get_vfs_only_url(self, relpath=None):
 
 
1775
        """Get a URL (or maybe a path for the plain old vfs transport.
 
 
1777
        This will never be a smart protocol.  It always has all the
 
 
1778
        capabilities of the local filesystem, but it might actually be a
 
 
1779
        MemoryTransport or some other similar virtual filesystem.
 
 
1781
        This is the backing transport (if any) of the server returned by
 
 
1782
        get_url and get_readonly_url.
 
 
1784
        :param relpath: provides for clients to get a path relative to the base
 
 
1785
            url.  These should only be downwards relative, not upwards.
 
 
1788
        base = self.get_vfs_only_server().get_url()
 
 
1789
        return self._adjust_url(base, relpath)
 
 
1791
    def _make_test_root(self):
 
 
1792
        if TestCaseWithMemoryTransport.TEST_ROOT is not None:
 
 
1794
        root = tempfile.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
 
1795
        TestCaseWithMemoryTransport.TEST_ROOT = root
 
 
1797
        # make a fake bzr directory there to prevent any tests propagating
 
 
1798
        # up onto the source directory's real branch
 
 
1799
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
 
1801
        # The same directory is used by all tests, and we're not specifically
 
 
1802
        # told when all tests are finished.  This will do.
 
 
1803
        atexit.register(_rmtree_temp_dir, root)
 
 
1805
    def makeAndChdirToTestDir(self):
 
 
1806
        """Create a temporary directories for this one test.
 
 
1808
        This must set self.test_home_dir and self.test_dir and chdir to
 
 
1811
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
 
1813
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
 
1814
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
 
1815
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
 
1817
    def make_branch(self, relpath, format=None):
 
 
1818
        """Create a branch on the transport at relpath."""
 
 
1819
        repo = self.make_repository(relpath, format=format)
 
 
1820
        return repo.bzrdir.create_branch()
 
 
1822
    def make_bzrdir(self, relpath, format=None):
 
 
1824
            # might be a relative or absolute path
 
 
1825
            maybe_a_url = self.get_url(relpath)
 
 
1826
            segments = maybe_a_url.rsplit('/', 1)
 
 
1827
            t = get_transport(maybe_a_url)
 
 
1828
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
 
1832
            if isinstance(format, basestring):
 
 
1833
                format = bzrdir.format_registry.make_bzrdir(format)
 
 
1834
            return format.initialize_on_transport(t)
 
 
1835
        except errors.UninitializableFormat:
 
 
1836
            raise TestSkipped("Format %s is not initializable." % format)
 
 
1838
    def make_repository(self, relpath, shared=False, format=None):
 
 
1839
        """Create a repository on our default transport at relpath.
 
 
1841
        Note that relpath must be a relative path, not a full url.
 
 
1843
        # FIXME: If you create a remoterepository this returns the underlying
 
 
1844
        # real format, which is incorrect.  Actually we should make sure that 
 
 
1845
        # RemoteBzrDir returns a RemoteRepository.
 
 
1846
        # maybe  mbp 20070410
 
 
1847
        made_control = self.make_bzrdir(relpath, format=format)
 
 
1848
        return made_control.create_repository(shared=shared)
 
 
1850
    def make_branch_and_memory_tree(self, relpath, format=None):
 
 
1851
        """Create a branch on the default transport and a MemoryTree for it."""
 
 
1852
        b = self.make_branch(relpath, format=format)
 
 
1853
        return memorytree.MemoryTree.create_on_branch(b)
 
 
1855
    def overrideEnvironmentForTesting(self):
 
 
1856
        os.environ['HOME'] = self.test_home_dir
 
 
1857
        os.environ['BZR_HOME'] = self.test_home_dir
 
 
1860
        super(TestCaseWithMemoryTransport, self).setUp()
 
 
1861
        self._make_test_root()
 
 
1862
        _currentdir = os.getcwdu()
 
 
1863
        def _leaveDirectory():
 
 
1864
            os.chdir(_currentdir)
 
 
1865
        self.addCleanup(_leaveDirectory)
 
 
1866
        self.makeAndChdirToTestDir()
 
 
1867
        self.overrideEnvironmentForTesting()
 
 
1868
        self.__readonly_server = None
 
 
1869
        self.__server = None
 
 
1870
        self.reduceLockdirTimeout()
 
 
1873
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
 
1874
    """Derived class that runs a test within a temporary directory.
 
 
1876
    This is useful for tests that need to create a branch, etc.
 
 
1878
    The directory is created in a slightly complex way: for each
 
 
1879
    Python invocation, a new temporary top-level directory is created.
 
 
1880
    All test cases create their own directory within that.  If the
 
 
1881
    tests complete successfully, the directory is removed.
 
 
1883
    :ivar test_base_dir: The path of the top-level directory for this 
 
 
1884
    test, which contains a home directory and a work directory.
 
 
1886
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
 
1887
    which is used as $HOME for this test.
 
 
1889
    :ivar test_dir: A directory under test_base_dir used as the current
 
 
1890
    directory when the test proper is run.
 
 
1893
    OVERRIDE_PYTHON = 'python'
 
 
1895
    def check_file_contents(self, filename, expect):
 
 
1896
        self.log("check contents of file %s" % filename)
 
 
1897
        contents = file(filename, 'r').read()
 
 
1898
        if contents != expect:
 
 
1899
            self.log("expected: %r" % expect)
 
 
1900
            self.log("actually: %r" % contents)
 
 
1901
            self.fail("contents of %s not as expected" % filename)
 
 
1903
    def makeAndChdirToTestDir(self):
 
 
1904
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
 
1906
        For TestCaseInTempDir we create a temporary directory based on the test
 
 
1907
        name and then create two subdirs - test and home under it.
 
 
1909
        # create a directory within the top level test directory
 
 
1910
        candidate_dir = tempfile.mkdtemp(dir=self.TEST_ROOT)
 
 
1911
        # now create test and home directories within this dir
 
 
1912
        self.test_base_dir = candidate_dir
 
 
1913
        self.test_home_dir = self.test_base_dir + '/home'
 
 
1914
        os.mkdir(self.test_home_dir)
 
 
1915
        self.test_dir = self.test_base_dir + '/work'
 
 
1916
        os.mkdir(self.test_dir)
 
 
1917
        os.chdir(self.test_dir)
 
 
1918
        # put name of test inside
 
 
1919
        f = file(self.test_base_dir + '/name', 'w')
 
 
1924
        self.addCleanup(self.deleteTestDir)
 
 
1926
    def deleteTestDir(self):
 
 
1927
        os.chdir(self.TEST_ROOT)
 
 
1928
        _rmtree_temp_dir(self.test_base_dir)
 
 
1930
    def build_tree(self, shape, line_endings='binary', transport=None):
 
 
1931
        """Build a test tree according to a pattern.
 
 
1933
        shape is a sequence of file specifications.  If the final
 
 
1934
        character is '/', a directory is created.
 
 
1936
        This assumes that all the elements in the tree being built are new.
 
 
1938
        This doesn't add anything to a branch.
 
 
1940
        :param line_endings: Either 'binary' or 'native'
 
 
1941
            in binary mode, exact contents are written in native mode, the
 
 
1942
            line endings match the default platform endings.
 
 
1943
        :param transport: A transport to write to, for building trees on VFS's.
 
 
1944
            If the transport is readonly or None, "." is opened automatically.
 
 
1947
        # It's OK to just create them using forward slashes on windows.
 
 
1948
        if transport is None or transport.is_readonly():
 
 
1949
            transport = get_transport(".")
 
 
1951
            self.assert_(isinstance(name, basestring))
 
 
1953
                transport.mkdir(urlutils.escape(name[:-1]))
 
 
1955
                if line_endings == 'binary':
 
 
1957
                elif line_endings == 'native':
 
 
1960
                    raise errors.BzrError(
 
 
1961
                        'Invalid line ending request %r' % line_endings)
 
 
1962
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
 
1963
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
 
1965
    def build_tree_contents(self, shape):
 
 
1966
        build_tree_contents(shape)
 
 
1968
    def assertFileEqual(self, content, path):
 
 
1969
        """Fail if path does not contain 'content'."""
 
 
1970
        self.failUnlessExists(path)
 
 
1971
        f = file(path, 'rb')
 
 
1976
        self.assertEqualDiff(content, s)
 
 
1978
    def failUnlessExists(self, path):
 
 
1979
        """Fail unless path or paths, which may be abs or relative, exist."""
 
 
1980
        if not isinstance(path, basestring):
 
 
1982
                self.failUnlessExists(p)
 
 
1984
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
 
1986
    def failIfExists(self, path):
 
 
1987
        """Fail if path or paths, which may be abs or relative, exist."""
 
 
1988
        if not isinstance(path, basestring):
 
 
1990
                self.failIfExists(p)
 
 
1992
            self.failIf(osutils.lexists(path),path+" exists")
 
 
1994
    def assertInWorkingTree(self,path,root_path='.',tree=None):
 
 
1995
        """Assert whether path or paths are in the WorkingTree"""
 
 
1997
            tree = workingtree.WorkingTree.open(root_path)
 
 
1998
        if not isinstance(path, basestring):
 
 
2000
                self.assertInWorkingTree(p,tree=tree)
 
 
2002
            self.assertIsNot(tree.path2id(path), None,
 
 
2003
                path+' not in working tree.')
 
 
2005
    def assertNotInWorkingTree(self,path,root_path='.',tree=None):
 
 
2006
        """Assert whether path or paths are not in the WorkingTree"""
 
 
2008
            tree = workingtree.WorkingTree.open(root_path)
 
 
2009
        if not isinstance(path, basestring):
 
 
2011
                self.assertNotInWorkingTree(p,tree=tree)
 
 
2013
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
 
2016
class TestCaseWithTransport(TestCaseInTempDir):
 
 
2017
    """A test case that provides get_url and get_readonly_url facilities.
 
 
2019
    These back onto two transport servers, one for readonly access and one for
 
 
2022
    If no explicit class is provided for readonly access, a
 
 
2023
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
 
2024
    based read write transports.
 
 
2026
    If an explicit class is provided for readonly access, that server and the 
 
 
2027
    readwrite one must both define get_url() as resolving to os.getcwd().
 
 
2030
    def get_vfs_only_server(self):
 
 
2031
        """See TestCaseWithMemoryTransport.
 
 
2033
        This is useful for some tests with specific servers that need
 
 
2036
        if self.__vfs_server is None:
 
 
2037
            self.__vfs_server = self.vfs_transport_factory()
 
 
2038
            self.__vfs_server.setUp()
 
 
2039
            self.addCleanup(self.__vfs_server.tearDown)
 
 
2040
        return self.__vfs_server
 
 
2042
    def make_branch_and_tree(self, relpath, format=None):
 
 
2043
        """Create a branch on the transport and a tree locally.
 
 
2045
        If the transport is not a LocalTransport, the Tree can't be created on
 
 
2046
        the transport.  In that case if the vfs_transport_factory is
 
 
2047
        LocalURLServer the working tree is created in the local
 
 
2048
        directory backing the transport, and the returned tree's branch and
 
 
2049
        repository will also be accessed locally. Otherwise a lightweight
 
 
2050
        checkout is created and returned.
 
 
2052
        :param format: The BzrDirFormat.
 
 
2053
        :returns: the WorkingTree.
 
 
2055
        # TODO: always use the local disk path for the working tree,
 
 
2056
        # this obviously requires a format that supports branch references
 
 
2057
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
 
2059
        b = self.make_branch(relpath, format=format)
 
 
2061
            return b.bzrdir.create_workingtree()
 
 
2062
        except errors.NotLocalUrl:
 
 
2063
            # We can only make working trees locally at the moment.  If the
 
 
2064
            # transport can't support them, then we keep the non-disk-backed
 
 
2065
            # branch and create a local checkout.
 
 
2066
            if self.vfs_transport_factory is LocalURLServer:
 
 
2067
                # the branch is colocated on disk, we cannot create a checkout.
 
 
2068
                # hopefully callers will expect this.
 
 
2069
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
 
2070
                return local_controldir.create_workingtree()
 
 
2072
                return b.create_checkout(relpath, lightweight=True)
 
 
2074
    def assertIsDirectory(self, relpath, transport):
 
 
2075
        """Assert that relpath within transport is a directory.
 
 
2077
        This may not be possible on all transports; in that case it propagates
 
 
2078
        a TransportNotPossible.
 
 
2081
            mode = transport.stat(relpath).st_mode
 
 
2082
        except errors.NoSuchFile:
 
 
2083
            self.fail("path %s is not a directory; no such file"
 
 
2085
        if not stat.S_ISDIR(mode):
 
 
2086
            self.fail("path %s is not a directory; has mode %#o"
 
 
2089
    def assertTreesEqual(self, left, right):
 
 
2090
        """Check that left and right have the same content and properties."""
 
 
2091
        # we use a tree delta to check for equality of the content, and we
 
 
2092
        # manually check for equality of other things such as the parents list.
 
 
2093
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
 
2094
        differences = left.changes_from(right)
 
 
2095
        self.assertFalse(differences.has_changed(),
 
 
2096
            "Trees %r and %r are different: %r" % (left, right, differences))
 
 
2099
        super(TestCaseWithTransport, self).setUp()
 
 
2100
        self.__vfs_server = None
 
 
2103
class ChrootedTestCase(TestCaseWithTransport):
 
 
2104
    """A support class that provides readonly urls outside the local namespace.
 
 
2106
    This is done by checking if self.transport_server is a MemoryServer. if it
 
 
2107
    is then we are chrooted already, if it is not then an HttpServer is used
 
 
2110
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
 
2111
                       be used without needed to redo it when a different 
 
 
2112
                       subclass is in use ?
 
 
2116
        super(ChrootedTestCase, self).setUp()
 
 
2117
        if not self.vfs_transport_factory == MemoryServer:
 
 
2118
            self.transport_readonly_server = HttpServer
 
 
2121
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
 
 
2122
                       random_order=False):
 
 
2123
    """Create a test suite by filtering another one.
 
 
2125
    :param suite:           the source suite
 
 
2126
    :param pattern:         pattern that names must match
 
 
2127
    :param exclude_pattern: pattern that names must not match, if any
 
 
2128
    :param random_order:    if True, tests in the new suite will be put in
 
 
2130
    :returns: the newly created suite
 
 
2132
    return sort_suite_by_re(suite, pattern, exclude_pattern,
 
 
2133
        random_order, False)
 
 
2136
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
 
 
2137
                     random_order=False, append_rest=True):
 
 
2138
    """Create a test suite by sorting another one.
 
 
2140
    :param suite:           the source suite
 
 
2141
    :param pattern:         pattern that names must match in order to go
 
 
2142
                            first in the new suite
 
 
2143
    :param exclude_pattern: pattern that names must not match, if any
 
 
2144
    :param random_order:    if True, tests in the new suite will be put in
 
 
2146
    :param append_rest:     if False, pattern is a strict filter and not
 
 
2147
                            just an ordering directive
 
 
2148
    :returns: the newly created suite
 
 
2152
    filter_re = re.compile(pattern)
 
 
2153
    if exclude_pattern is not None:
 
 
2154
        exclude_re = re.compile(exclude_pattern)
 
 
2155
    for test in iter_suite_tests(suite):
 
 
2157
        if exclude_pattern is None or not exclude_re.search(test_id):
 
 
2158
            if filter_re.search(test_id):
 
 
2163
        random.shuffle(first)
 
 
2164
        random.shuffle(second)
 
 
2165
    return TestUtil.TestSuite(first + second)
 
 
2168
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
 
2169
              stop_on_failure=False,
 
 
2170
              transport=None, lsprof_timed=None, bench_history=None,
 
 
2171
              matching_tests_first=None,
 
 
2174
              exclude_pattern=None,
 
 
2176
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
 
2181
    runner = TextTestRunner(stream=sys.stdout,
 
 
2183
                            verbosity=verbosity,
 
 
2184
                            bench_history=bench_history,
 
 
2185
                            list_only=list_only,
 
 
2187
    runner.stop_on_failure=stop_on_failure
 
 
2188
    # Initialise the random number generator and display the seed used.
 
 
2189
    # We convert the seed to a long to make it reuseable across invocations.
 
 
2190
    random_order = False
 
 
2191
    if random_seed is not None:
 
 
2193
        if random_seed == "now":
 
 
2194
            random_seed = long(time.time())
 
 
2196
            # Convert the seed to a long if we can
 
 
2198
                random_seed = long(random_seed)
 
 
2201
        runner.stream.writeln("Randomizing test order using seed %s\n" %
 
 
2203
        random.seed(random_seed)
 
 
2204
    # Customise the list of tests if requested
 
 
2205
    if pattern != '.*' or exclude_pattern is not None or random_order:
 
 
2206
        if matching_tests_first:
 
 
2207
            suite = sort_suite_by_re(suite, pattern, exclude_pattern,
 
 
2210
            suite = filter_suite_by_re(suite, pattern, exclude_pattern,
 
 
2212
    result = runner.run(suite)
 
 
2213
    return result.wasSuccessful()
 
 
2216
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
 
2218
             test_suite_factory=None,
 
 
2221
             matching_tests_first=None,
 
 
2224
             exclude_pattern=None):
 
 
2225
    """Run the whole test suite under the enhanced runner"""
 
 
2226
    # XXX: Very ugly way to do this...
 
 
2227
    # Disable warning about old formats because we don't want it to disturb
 
 
2228
    # any blackbox tests.
 
 
2229
    from bzrlib import repository
 
 
2230
    repository._deprecation_warning_done = True
 
 
2232
    global default_transport
 
 
2233
    if transport is None:
 
 
2234
        transport = default_transport
 
 
2235
    old_transport = default_transport
 
 
2236
    default_transport = transport
 
 
2238
        if test_suite_factory is None:
 
 
2239
            suite = test_suite()
 
 
2241
            suite = test_suite_factory()
 
 
2242
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
 
2243
                     stop_on_failure=stop_on_failure,
 
 
2244
                     transport=transport,
 
 
2245
                     lsprof_timed=lsprof_timed,
 
 
2246
                     bench_history=bench_history,
 
 
2247
                     matching_tests_first=matching_tests_first,
 
 
2248
                     list_only=list_only,
 
 
2249
                     random_seed=random_seed,
 
 
2250
                     exclude_pattern=exclude_pattern)
 
 
2252
        default_transport = old_transport
 
 
2256
    """Build and return TestSuite for the whole of bzrlib.
 
 
2258
    This function can be replaced if you need to change the default test
 
 
2259
    suite on a global basis, but it is not encouraged.
 
 
2262
                   'bzrlib.util.tests.test_bencode',
 
 
2263
                   'bzrlib.tests.test__dirstate_helpers',
 
 
2264
                   'bzrlib.tests.test_ancestry',
 
 
2265
                   'bzrlib.tests.test_annotate',
 
 
2266
                   'bzrlib.tests.test_api',
 
 
2267
                   'bzrlib.tests.test_atomicfile',
 
 
2268
                   'bzrlib.tests.test_bad_files',
 
 
2269
                   'bzrlib.tests.test_branch',
 
 
2270
                   'bzrlib.tests.test_branchbuilder',
 
 
2271
                   'bzrlib.tests.test_bugtracker',
 
 
2272
                   'bzrlib.tests.test_bundle',
 
 
2273
                   'bzrlib.tests.test_bzrdir',
 
 
2274
                   'bzrlib.tests.test_cache_utf8',
 
 
2275
                   'bzrlib.tests.test_commands',
 
 
2276
                   'bzrlib.tests.test_commit',
 
 
2277
                   'bzrlib.tests.test_commit_merge',
 
 
2278
                   'bzrlib.tests.test_config',
 
 
2279
                   'bzrlib.tests.test_conflicts',
 
 
2280
                   'bzrlib.tests.test_counted_lock',
 
 
2281
                   'bzrlib.tests.test_decorators',
 
 
2282
                   'bzrlib.tests.test_delta',
 
 
2283
                   'bzrlib.tests.test_deprecated_graph',
 
 
2284
                   'bzrlib.tests.test_diff',
 
 
2285
                   'bzrlib.tests.test_dirstate',
 
 
2286
                   'bzrlib.tests.test_email_message',
 
 
2287
                   'bzrlib.tests.test_errors',
 
 
2288
                   'bzrlib.tests.test_escaped_store',
 
 
2289
                   'bzrlib.tests.test_extract',
 
 
2290
                   'bzrlib.tests.test_fetch',
 
 
2291
                   'bzrlib.tests.test_file_names',
 
 
2292
                   'bzrlib.tests.test_ftp_transport',
 
 
2293
                   'bzrlib.tests.test_generate_docs',
 
 
2294
                   'bzrlib.tests.test_generate_ids',
 
 
2295
                   'bzrlib.tests.test_globbing',
 
 
2296
                   'bzrlib.tests.test_gpg',
 
 
2297
                   'bzrlib.tests.test_graph',
 
 
2298
                   'bzrlib.tests.test_hashcache',
 
 
2299
                   'bzrlib.tests.test_help',
 
 
2300
                   'bzrlib.tests.test_hooks',
 
 
2301
                   'bzrlib.tests.test_http',
 
 
2302
                   'bzrlib.tests.test_http_response',
 
 
2303
                   'bzrlib.tests.test_https_ca_bundle',
 
 
2304
                   'bzrlib.tests.test_identitymap',
 
 
2305
                   'bzrlib.tests.test_ignores',
 
 
2306
                   'bzrlib.tests.test_index',
 
 
2307
                   'bzrlib.tests.test_info',
 
 
2308
                   'bzrlib.tests.test_inv',
 
 
2309
                   'bzrlib.tests.test_knit',
 
 
2310
                   'bzrlib.tests.test_lazy_import',
 
 
2311
                   'bzrlib.tests.test_lazy_regex',
 
 
2312
                   'bzrlib.tests.test_lockdir',
 
 
2313
                   'bzrlib.tests.test_lockable_files',
 
 
2314
                   'bzrlib.tests.test_log',
 
 
2315
                   'bzrlib.tests.test_lsprof',
 
 
2316
                   'bzrlib.tests.test_memorytree',
 
 
2317
                   'bzrlib.tests.test_merge',
 
 
2318
                   'bzrlib.tests.test_merge3',
 
 
2319
                   'bzrlib.tests.test_merge_core',
 
 
2320
                   'bzrlib.tests.test_merge_directive',
 
 
2321
                   'bzrlib.tests.test_missing',
 
 
2322
                   'bzrlib.tests.test_msgeditor',
 
 
2323
                   'bzrlib.tests.test_multiparent',
 
 
2324
                   'bzrlib.tests.test_nonascii',
 
 
2325
                   'bzrlib.tests.test_options',
 
 
2326
                   'bzrlib.tests.test_osutils',
 
 
2327
                   'bzrlib.tests.test_osutils_encodings',
 
 
2328
                   'bzrlib.tests.test_pack',
 
 
2329
                   'bzrlib.tests.test_patch',
 
 
2330
                   'bzrlib.tests.test_patches',
 
 
2331
                   'bzrlib.tests.test_permissions',
 
 
2332
                   'bzrlib.tests.test_plugins',
 
 
2333
                   'bzrlib.tests.test_progress',
 
 
2334
                   'bzrlib.tests.test_reconcile',
 
 
2335
                   'bzrlib.tests.test_registry',
 
 
2336
                   'bzrlib.tests.test_remote',
 
 
2337
                   'bzrlib.tests.test_repository',
 
 
2338
                   'bzrlib.tests.test_revert',
 
 
2339
                   'bzrlib.tests.test_revision',
 
 
2340
                   'bzrlib.tests.test_revisionnamespaces',
 
 
2341
                   'bzrlib.tests.test_revisiontree',
 
 
2342
                   'bzrlib.tests.test_rio',
 
 
2343
                   'bzrlib.tests.test_sampler',
 
 
2344
                   'bzrlib.tests.test_selftest',
 
 
2345
                   'bzrlib.tests.test_setup',
 
 
2346
                   'bzrlib.tests.test_sftp_transport',
 
 
2347
                   'bzrlib.tests.test_smart',
 
 
2348
                   'bzrlib.tests.test_smart_add',
 
 
2349
                   'bzrlib.tests.test_smart_transport',
 
 
2350
                   'bzrlib.tests.test_smtp_connection',
 
 
2351
                   'bzrlib.tests.test_source',
 
 
2352
                   'bzrlib.tests.test_ssh_transport',
 
 
2353
                   'bzrlib.tests.test_status',
 
 
2354
                   'bzrlib.tests.test_store',
 
 
2355
                   'bzrlib.tests.test_strace',
 
 
2356
                   'bzrlib.tests.test_subsume',
 
 
2357
                   'bzrlib.tests.test_symbol_versioning',
 
 
2358
                   'bzrlib.tests.test_tag',
 
 
2359
                   'bzrlib.tests.test_testament',
 
 
2360
                   'bzrlib.tests.test_textfile',
 
 
2361
                   'bzrlib.tests.test_textmerge',
 
 
2362
                   'bzrlib.tests.test_timestamp',
 
 
2363
                   'bzrlib.tests.test_trace',
 
 
2364
                   'bzrlib.tests.test_transactions',
 
 
2365
                   'bzrlib.tests.test_transform',
 
 
2366
                   'bzrlib.tests.test_transport',
 
 
2367
                   'bzrlib.tests.test_tree',
 
 
2368
                   'bzrlib.tests.test_treebuilder',
 
 
2369
                   'bzrlib.tests.test_tsort',
 
 
2370
                   'bzrlib.tests.test_tuned_gzip',
 
 
2371
                   'bzrlib.tests.test_ui',
 
 
2372
                   'bzrlib.tests.test_upgrade',
 
 
2373
                   'bzrlib.tests.test_urlutils',
 
 
2374
                   'bzrlib.tests.test_versionedfile',
 
 
2375
                   'bzrlib.tests.test_version',
 
 
2376
                   'bzrlib.tests.test_version_info',
 
 
2377
                   'bzrlib.tests.test_weave',
 
 
2378
                   'bzrlib.tests.test_whitebox',
 
 
2379
                   'bzrlib.tests.test_win32utils',
 
 
2380
                   'bzrlib.tests.test_workingtree',
 
 
2381
                   'bzrlib.tests.test_workingtree_4',
 
 
2382
                   'bzrlib.tests.test_wsgi',
 
 
2383
                   'bzrlib.tests.test_xml',
 
 
2385
    test_transport_implementations = [
 
 
2386
        'bzrlib.tests.test_transport_implementations',
 
 
2387
        'bzrlib.tests.test_read_bundle',
 
 
2389
    suite = TestUtil.TestSuite()
 
 
2390
    loader = TestUtil.TestLoader()
 
 
2391
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
 
2392
    from bzrlib.tests.test_transport_implementations import TransportTestProviderAdapter
 
 
2393
    adapter = TransportTestProviderAdapter()
 
 
2394
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
 
2395
    for package in packages_to_test():
 
 
2396
        suite.addTest(package.test_suite())
 
 
2397
    for m in MODULES_TO_TEST:
 
 
2398
        suite.addTest(loader.loadTestsFromModule(m))
 
 
2399
    for m in MODULES_TO_DOCTEST:
 
 
2401
            suite.addTest(doctest.DocTestSuite(m))
 
 
2402
        except ValueError, e:
 
 
2403
            print '**failed to get doctest for: %s\n%s' %(m,e)
 
 
2405
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
 
2406
        if getattr(plugin, 'test_suite', None) is not None:
 
 
2407
            default_encoding = sys.getdefaultencoding()
 
 
2409
                plugin_suite = plugin.test_suite()
 
 
2410
            except ImportError, e:
 
 
2411
                bzrlib.trace.warning(
 
 
2412
                    'Unable to test plugin "%s": %s', name, e)
 
 
2414
                suite.addTest(plugin_suite)
 
 
2415
            if default_encoding != sys.getdefaultencoding():
 
 
2416
                bzrlib.trace.warning(
 
 
2417
                    'Plugin "%s" tried to reset default encoding to: %s', name,
 
 
2418
                    sys.getdefaultencoding())
 
 
2420
                sys.setdefaultencoding(default_encoding)
 
 
2424
def adapt_modules(mods_list, adapter, loader, suite):
 
 
2425
    """Adapt the modules in mods_list using adapter and add to suite."""
 
 
2426
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
 
 
2427
        suite.addTests(adapter.adapt(test))
 
 
2430
def _rmtree_temp_dir(dirname):
 
 
2431
    # If LANG=C we probably have created some bogus paths
 
 
2432
    # which rmtree(unicode) will fail to delete
 
 
2433
    # so make sure we are using rmtree(str) to delete everything
 
 
2434
    # except on win32, where rmtree(str) will fail
 
 
2435
    # since it doesn't have the property of byte-stream paths
 
 
2436
    # (they are either ascii or mbcs)
 
 
2437
    if sys.platform == 'win32':
 
 
2438
        # make sure we are using the unicode win32 api
 
 
2439
        dirname = unicode(dirname)
 
 
2441
        dirname = dirname.encode(sys.getfilesystemencoding())
 
 
2443
        osutils.rmtree(dirname)
 
 
2445
        if sys.platform == 'win32' and e.errno == errno.EACCES:
 
 
2446
            print >>sys.stderr, ('Permission denied: '
 
 
2447
                                 'unable to remove testing dir '
 
 
2448
                                 '%s' % os.path.basename(dirname))
 
 
2453
class Feature(object):
 
 
2454
    """An operating system Feature."""
 
 
2457
        self._available = None
 
 
2459
    def available(self):
 
 
2460
        """Is the feature available?
 
 
2462
        :return: True if the feature is available.
 
 
2464
        if self._available is None:
 
 
2465
            self._available = self._probe()
 
 
2466
        return self._available
 
 
2469
        """Implement this method in concrete features.
 
 
2471
        :return: True if the feature is available.
 
 
2473
        raise NotImplementedError
 
 
2476
        if getattr(self, 'feature_name', None):
 
 
2477
            return self.feature_name()
 
 
2478
        return self.__class__.__name__
 
 
2481
class TestScenarioApplier(object):
 
 
2482
    """A tool to apply scenarios to tests."""
 
 
2484
    def adapt(self, test):
 
 
2485
        """Return a TestSuite containing a copy of test for each scenario."""
 
 
2486
        result = unittest.TestSuite()
 
 
2487
        for scenario in self.scenarios:
 
 
2488
            result.addTest(self.adapt_test_to_scenario(test, scenario))
 
 
2491
    def adapt_test_to_scenario(self, test, scenario):
 
 
2492
        """Copy test and apply scenario to it.
 
 
2494
        :param test: A test to adapt.
 
 
2495
        :param scenario: A tuple describing the scenarion.
 
 
2496
            The first element of the tuple is the new test id.
 
 
2497
            The second element is a dict containing attributes to set on the
 
 
2499
        :return: The adapted test.
 
 
2501
        from copy import deepcopy
 
 
2502
        new_test = deepcopy(test)
 
 
2503
        for name, value in scenario[1].items():
 
 
2504
            setattr(new_test, name, value)
 
 
2505
        new_id = "%s(%s)" % (new_test.id(), scenario[0])
 
 
2506
        new_test.id = lambda: new_id