/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2008-05-07 10:04:52 UTC
  • mfrom: (3408.4.1 ianc-integration)
  • Revision ID: pqm@pqm.ubuntu.com-20080507100452-ya8ofjjd5f5pb9q7
Nicer error when smart server started on an address already in use
        (Andrea Corbellini)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import atexit
 
30
import codecs
 
31
from cStringIO import StringIO
 
32
import difflib
 
33
import doctest
 
34
import errno
 
35
import logging
 
36
import os
 
37
from pprint import pformat
 
38
import random
 
39
import re
 
40
import shlex
 
41
import stat
 
42
from subprocess import Popen, PIPE
 
43
import sys
 
44
import tempfile
 
45
import time
 
46
import unittest
 
47
import warnings
 
48
 
 
49
 
 
50
from bzrlib import (
 
51
    bzrdir,
 
52
    debug,
 
53
    errors,
 
54
    memorytree,
 
55
    osutils,
 
56
    progress,
 
57
    ui,
 
58
    urlutils,
 
59
    workingtree,
 
60
    )
 
61
import bzrlib.branch
 
62
import bzrlib.commands
 
63
import bzrlib.timestamp
 
64
import bzrlib.export
 
65
import bzrlib.inventory
 
66
import bzrlib.iterablefile
 
67
import bzrlib.lockdir
 
68
try:
 
69
    import bzrlib.lsprof
 
70
except ImportError:
 
71
    # lsprof not available
 
72
    pass
 
73
from bzrlib.merge import merge_inner
 
74
import bzrlib.merge3
 
75
import bzrlib.plugin
 
76
from bzrlib.revision import common_ancestor
 
77
import bzrlib.store
 
78
from bzrlib import symbol_versioning
 
79
from bzrlib.symbol_versioning import (
 
80
    DEPRECATED_PARAMETER,
 
81
    deprecated_function,
 
82
    deprecated_method,
 
83
    deprecated_passed,
 
84
    )
 
85
import bzrlib.trace
 
86
from bzrlib.transport import get_transport
 
87
import bzrlib.transport
 
88
from bzrlib.transport.local import LocalURLServer
 
89
from bzrlib.transport.memory import MemoryServer
 
90
from bzrlib.transport.readonly import ReadonlyServer
 
91
from bzrlib.trace import mutter, note
 
92
from bzrlib.tests import TestUtil
 
93
from bzrlib.tests.http_server import HttpServer
 
94
from bzrlib.tests.TestUtil import (
 
95
                          TestSuite,
 
96
                          TestLoader,
 
97
                          )
 
98
from bzrlib.tests.treeshape import build_tree_contents
 
99
import bzrlib.version_info_formats.format_custom
 
100
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
101
 
 
102
# Mark this python module as being part of the implementation
 
103
# of unittest: this gives us better tracebacks where the last
 
104
# shown frame is the test code, not our assertXYZ.
 
105
__unittest = 1
 
106
 
 
107
default_transport = LocalURLServer
 
108
 
 
109
 
 
110
def packages_to_test():
 
111
    """Return a list of packages to test.
 
112
 
 
113
    The packages are not globally imported so that import failures are
 
114
    triggered when running selftest, not when importing the command.
 
115
    """
 
116
    import bzrlib.doc
 
117
    import bzrlib.tests.blackbox
 
118
    import bzrlib.tests.branch_implementations
 
119
    import bzrlib.tests.bzrdir_implementations
 
120
    import bzrlib.tests.commands
 
121
    import bzrlib.tests.interrepository_implementations
 
122
    import bzrlib.tests.interversionedfile_implementations
 
123
    import bzrlib.tests.intertree_implementations
 
124
    import bzrlib.tests.inventory_implementations
 
125
    import bzrlib.tests.per_lock
 
126
    import bzrlib.tests.repository_implementations
 
127
    import bzrlib.tests.revisionstore_implementations
 
128
    import bzrlib.tests.tree_implementations
 
129
    import bzrlib.tests.workingtree_implementations
 
130
    return [
 
131
            bzrlib.doc,
 
132
            bzrlib.tests.blackbox,
 
133
            bzrlib.tests.branch_implementations,
 
134
            bzrlib.tests.bzrdir_implementations,
 
135
            bzrlib.tests.commands,
 
136
            bzrlib.tests.interrepository_implementations,
 
137
            bzrlib.tests.interversionedfile_implementations,
 
138
            bzrlib.tests.intertree_implementations,
 
139
            bzrlib.tests.inventory_implementations,
 
140
            bzrlib.tests.per_lock,
 
141
            bzrlib.tests.repository_implementations,
 
142
            bzrlib.tests.revisionstore_implementations,
 
143
            bzrlib.tests.tree_implementations,
 
144
            bzrlib.tests.workingtree_implementations,
 
145
            ]
 
146
 
 
147
 
 
148
class ExtendedTestResult(unittest._TextTestResult):
 
149
    """Accepts, reports and accumulates the results of running tests.
 
150
 
 
151
    Compared to the unittest version this class adds support for
 
152
    profiling, benchmarking, stopping as soon as a test fails,  and
 
153
    skipping tests.  There are further-specialized subclasses for
 
154
    different types of display.
 
155
 
 
156
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
157
    addFailure or addError classes.  These in turn may redirect to a more
 
158
    specific case for the special test results supported by our extended
 
159
    tests.
 
160
 
 
161
    Note that just one of these objects is fed the results from many tests.
 
162
    """
 
163
 
 
164
    stop_early = False
 
165
    
 
166
    def __init__(self, stream, descriptions, verbosity,
 
167
                 bench_history=None,
 
168
                 num_tests=None,
 
169
                 ):
 
170
        """Construct new TestResult.
 
171
 
 
172
        :param bench_history: Optionally, a writable file object to accumulate
 
173
            benchmark results.
 
174
        """
 
175
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
176
        if bench_history is not None:
 
177
            from bzrlib.version import _get_bzr_source_tree
 
178
            src_tree = _get_bzr_source_tree()
 
179
            if src_tree:
 
180
                try:
 
181
                    revision_id = src_tree.get_parent_ids()[0]
 
182
                except IndexError:
 
183
                    # XXX: if this is a brand new tree, do the same as if there
 
184
                    # is no branch.
 
185
                    revision_id = ''
 
186
            else:
 
187
                # XXX: If there's no branch, what should we do?
 
188
                revision_id = ''
 
189
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
190
        self._bench_history = bench_history
 
191
        self.ui = ui.ui_factory
 
192
        self.num_tests = num_tests
 
193
        self.error_count = 0
 
194
        self.failure_count = 0
 
195
        self.known_failure_count = 0
 
196
        self.skip_count = 0
 
197
        self.not_applicable_count = 0
 
198
        self.unsupported = {}
 
199
        self.count = 0
 
200
        self._overall_start_time = time.time()
 
201
    
 
202
    def _extractBenchmarkTime(self, testCase):
 
203
        """Add a benchmark time for the current test case."""
 
204
        return getattr(testCase, "_benchtime", None)
 
205
    
 
206
    def _elapsedTestTimeString(self):
 
207
        """Return a time string for the overall time the current test has taken."""
 
208
        return self._formatTime(time.time() - self._start_time)
 
209
 
 
210
    def _testTimeString(self, testCase):
 
211
        benchmark_time = self._extractBenchmarkTime(testCase)
 
212
        if benchmark_time is not None:
 
213
            return "%s/%s" % (
 
214
                self._formatTime(benchmark_time),
 
215
                self._elapsedTestTimeString())
 
216
        else:
 
217
            return "           %s" % self._elapsedTestTimeString()
 
218
 
 
219
    def _formatTime(self, seconds):
 
220
        """Format seconds as milliseconds with leading spaces."""
 
221
        # some benchmarks can take thousands of seconds to run, so we need 8
 
222
        # places
 
223
        return "%8dms" % (1000 * seconds)
 
224
 
 
225
    def _shortened_test_description(self, test):
 
226
        what = test.id()
 
227
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
228
        return what
 
229
 
 
230
    def startTest(self, test):
 
231
        unittest.TestResult.startTest(self, test)
 
232
        self.report_test_start(test)
 
233
        test.number = self.count
 
234
        self._recordTestStartTime()
 
235
 
 
236
    def _recordTestStartTime(self):
 
237
        """Record that a test has started."""
 
238
        self._start_time = time.time()
 
239
 
 
240
    def _cleanupLogFile(self, test):
 
241
        # We can only do this if we have one of our TestCases, not if
 
242
        # we have a doctest.
 
243
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
244
        if setKeepLogfile is not None:
 
245
            setKeepLogfile()
 
246
 
 
247
    def addError(self, test, err):
 
248
        """Tell result that test finished with an error.
 
249
 
 
250
        Called from the TestCase run() method when the test
 
251
        fails with an unexpected error.
 
252
        """
 
253
        self._testConcluded(test)
 
254
        if isinstance(err[1], TestSkipped):
 
255
            return self._addSkipped(test, err)
 
256
        elif isinstance(err[1], UnavailableFeature):
 
257
            return self.addNotSupported(test, err[1].args[0])
 
258
        else:
 
259
            self._cleanupLogFile(test)
 
260
            unittest.TestResult.addError(self, test, err)
 
261
            self.error_count += 1
 
262
            self.report_error(test, err)
 
263
            if self.stop_early:
 
264
                self.stop()
 
265
 
 
266
    def addFailure(self, test, err):
 
267
        """Tell result that test failed.
 
268
 
 
269
        Called from the TestCase run() method when the test
 
270
        fails because e.g. an assert() method failed.
 
271
        """
 
272
        self._testConcluded(test)
 
273
        if isinstance(err[1], KnownFailure):
 
274
            return self._addKnownFailure(test, err)
 
275
        else:
 
276
            self._cleanupLogFile(test)
 
277
            unittest.TestResult.addFailure(self, test, err)
 
278
            self.failure_count += 1
 
279
            self.report_failure(test, err)
 
280
            if self.stop_early:
 
281
                self.stop()
 
282
 
 
283
    def addSuccess(self, test):
 
284
        """Tell result that test completed successfully.
 
285
 
 
286
        Called from the TestCase run()
 
287
        """
 
288
        self._testConcluded(test)
 
289
        if self._bench_history is not None:
 
290
            benchmark_time = self._extractBenchmarkTime(test)
 
291
            if benchmark_time is not None:
 
292
                self._bench_history.write("%s %s\n" % (
 
293
                    self._formatTime(benchmark_time),
 
294
                    test.id()))
 
295
        self.report_success(test)
 
296
        self._cleanupLogFile(test)
 
297
        unittest.TestResult.addSuccess(self, test)
 
298
        test._log_contents = ''
 
299
 
 
300
    def _testConcluded(self, test):
 
301
        """Common code when a test has finished.
 
302
 
 
303
        Called regardless of whether it succeded, failed, etc.
 
304
        """
 
305
        pass
 
306
 
 
307
    def _addKnownFailure(self, test, err):
 
308
        self.known_failure_count += 1
 
309
        self.report_known_failure(test, err)
 
310
 
 
311
    def addNotSupported(self, test, feature):
 
312
        """The test will not be run because of a missing feature.
 
313
        """
 
314
        # this can be called in two different ways: it may be that the
 
315
        # test started running, and then raised (through addError) 
 
316
        # UnavailableFeature.  Alternatively this method can be called
 
317
        # while probing for features before running the tests; in that
 
318
        # case we will see startTest and stopTest, but the test will never
 
319
        # actually run.
 
320
        self.unsupported.setdefault(str(feature), 0)
 
321
        self.unsupported[str(feature)] += 1
 
322
        self.report_unsupported(test, feature)
 
323
 
 
324
    def _addSkipped(self, test, skip_excinfo):
 
325
        if isinstance(skip_excinfo[1], TestNotApplicable):
 
326
            self.not_applicable_count += 1
 
327
            self.report_not_applicable(test, skip_excinfo)
 
328
        else:
 
329
            self.skip_count += 1
 
330
            self.report_skip(test, skip_excinfo)
 
331
        try:
 
332
            test.tearDown()
 
333
        except KeyboardInterrupt:
 
334
            raise
 
335
        except:
 
336
            self.addError(test, test._exc_info())
 
337
        else:
 
338
            # seems best to treat this as success from point-of-view of unittest
 
339
            # -- it actually does nothing so it barely matters :)
 
340
            unittest.TestResult.addSuccess(self, test)
 
341
            test._log_contents = ''
 
342
 
 
343
    def printErrorList(self, flavour, errors):
 
344
        for test, err in errors:
 
345
            self.stream.writeln(self.separator1)
 
346
            self.stream.write("%s: " % flavour)
 
347
            self.stream.writeln(self.getDescription(test))
 
348
            if getattr(test, '_get_log', None) is not None:
 
349
                self.stream.write('\n')
 
350
                self.stream.write(
 
351
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
 
352
                self.stream.write('\n')
 
353
                self.stream.write(test._get_log())
 
354
                self.stream.write('\n')
 
355
                self.stream.write(
 
356
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
 
357
                self.stream.write('\n')
 
358
            self.stream.writeln(self.separator2)
 
359
            self.stream.writeln("%s" % err)
 
360
 
 
361
    def finished(self):
 
362
        pass
 
363
 
 
364
    def report_cleaning_up(self):
 
365
        pass
 
366
 
 
367
    def report_success(self, test):
 
368
        pass
 
369
 
 
370
    def wasStrictlySuccessful(self):
 
371
        if self.unsupported or self.known_failure_count:
 
372
            return False
 
373
        return self.wasSuccessful()
 
374
 
 
375
 
 
376
class TextTestResult(ExtendedTestResult):
 
377
    """Displays progress and results of tests in text form"""
 
378
 
 
379
    def __init__(self, stream, descriptions, verbosity,
 
380
                 bench_history=None,
 
381
                 num_tests=None,
 
382
                 pb=None,
 
383
                 ):
 
384
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
385
            bench_history, num_tests)
 
386
        if pb is None:
 
387
            self.pb = self.ui.nested_progress_bar()
 
388
            self._supplied_pb = False
 
389
        else:
 
390
            self.pb = pb
 
391
            self._supplied_pb = True
 
392
        self.pb.show_pct = False
 
393
        self.pb.show_spinner = False
 
394
        self.pb.show_eta = False,
 
395
        self.pb.show_count = False
 
396
        self.pb.show_bar = False
 
397
 
 
398
    def report_starting(self):
 
399
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
400
 
 
401
    def _progress_prefix_text(self):
 
402
        # the longer this text, the less space we have to show the test
 
403
        # name...
 
404
        a = '[%d' % self.count              # total that have been run
 
405
        # tests skipped as known not to be relevant are not important enough
 
406
        # to show here
 
407
        ## if self.skip_count:
 
408
        ##     a += ', %d skip' % self.skip_count
 
409
        ## if self.known_failure_count:
 
410
        ##     a += '+%dX' % self.known_failure_count
 
411
        if self.num_tests is not None:
 
412
            a +='/%d' % self.num_tests
 
413
        a += ' in '
 
414
        runtime = time.time() - self._overall_start_time
 
415
        if runtime >= 60:
 
416
            a += '%dm%ds' % (runtime / 60, runtime % 60)
 
417
        else:
 
418
            a += '%ds' % runtime
 
419
        if self.error_count:
 
420
            a += ', %d err' % self.error_count
 
421
        if self.failure_count:
 
422
            a += ', %d fail' % self.failure_count
 
423
        if self.unsupported:
 
424
            a += ', %d missing' % len(self.unsupported)
 
425
        a += ']'
 
426
        return a
 
427
 
 
428
    def report_test_start(self, test):
 
429
        self.count += 1
 
430
        self.pb.update(
 
431
                self._progress_prefix_text()
 
432
                + ' ' 
 
433
                + self._shortened_test_description(test))
 
434
 
 
435
    def _test_description(self, test):
 
436
        return self._shortened_test_description(test)
 
437
 
 
438
    def report_error(self, test, err):
 
439
        self.pb.note('ERROR: %s\n    %s\n', 
 
440
            self._test_description(test),
 
441
            err[1],
 
442
            )
 
443
 
 
444
    def report_failure(self, test, err):
 
445
        self.pb.note('FAIL: %s\n    %s\n', 
 
446
            self._test_description(test),
 
447
            err[1],
 
448
            )
 
449
 
 
450
    def report_known_failure(self, test, err):
 
451
        self.pb.note('XFAIL: %s\n%s\n',
 
452
            self._test_description(test), err[1])
 
453
 
 
454
    def report_skip(self, test, skip_excinfo):
 
455
        pass
 
456
 
 
457
    def report_not_applicable(self, test, skip_excinfo):
 
458
        pass
 
459
 
 
460
    def report_unsupported(self, test, feature):
 
461
        """test cannot be run because feature is missing."""
 
462
                  
 
463
    def report_cleaning_up(self):
 
464
        self.pb.update('cleaning up...')
 
465
 
 
466
    def finished(self):
 
467
        if not self._supplied_pb:
 
468
            self.pb.finished()
 
469
 
 
470
 
 
471
class VerboseTestResult(ExtendedTestResult):
 
472
    """Produce long output, with one line per test run plus times"""
 
473
 
 
474
    def _ellipsize_to_right(self, a_string, final_width):
 
475
        """Truncate and pad a string, keeping the right hand side"""
 
476
        if len(a_string) > final_width:
 
477
            result = '...' + a_string[3-final_width:]
 
478
        else:
 
479
            result = a_string
 
480
        return result.ljust(final_width)
 
481
 
 
482
    def report_starting(self):
 
483
        self.stream.write('running %d tests...\n' % self.num_tests)
 
484
 
 
485
    def report_test_start(self, test):
 
486
        self.count += 1
 
487
        name = self._shortened_test_description(test)
 
488
        # width needs space for 6 char status, plus 1 for slash, plus 2 10-char
 
489
        # numbers, plus a trailing blank
 
490
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
491
        self.stream.write(self._ellipsize_to_right(name,
 
492
                          osutils.terminal_width()-30))
 
493
        self.stream.flush()
 
494
 
 
495
    def _error_summary(self, err):
 
496
        indent = ' ' * 4
 
497
        return '%s%s' % (indent, err[1])
 
498
 
 
499
    def report_error(self, test, err):
 
500
        self.stream.writeln('ERROR %s\n%s'
 
501
                % (self._testTimeString(test),
 
502
                   self._error_summary(err)))
 
503
 
 
504
    def report_failure(self, test, err):
 
505
        self.stream.writeln(' FAIL %s\n%s'
 
506
                % (self._testTimeString(test),
 
507
                   self._error_summary(err)))
 
508
 
 
509
    def report_known_failure(self, test, err):
 
510
        self.stream.writeln('XFAIL %s\n%s'
 
511
                % (self._testTimeString(test),
 
512
                   self._error_summary(err)))
 
513
 
 
514
    def report_success(self, test):
 
515
        self.stream.writeln('   OK %s' % self._testTimeString(test))
 
516
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
517
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
518
            stats.pprint(file=self.stream)
 
519
        # flush the stream so that we get smooth output. This verbose mode is
 
520
        # used to show the output in PQM.
 
521
        self.stream.flush()
 
522
 
 
523
    def report_skip(self, test, skip_excinfo):
 
524
        self.stream.writeln(' SKIP %s\n%s'
 
525
                % (self._testTimeString(test),
 
526
                   self._error_summary(skip_excinfo)))
 
527
 
 
528
    def report_not_applicable(self, test, skip_excinfo):
 
529
        self.stream.writeln('  N/A %s\n%s'
 
530
                % (self._testTimeString(test),
 
531
                   self._error_summary(skip_excinfo)))
 
532
 
 
533
    def report_unsupported(self, test, feature):
 
534
        """test cannot be run because feature is missing."""
 
535
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
536
                %(self._testTimeString(test), feature))
 
537
 
 
538
 
 
539
class TextTestRunner(object):
 
540
    stop_on_failure = False
 
541
 
 
542
    def __init__(self,
 
543
                 stream=sys.stderr,
 
544
                 descriptions=0,
 
545
                 verbosity=1,
 
546
                 bench_history=None,
 
547
                 list_only=False
 
548
                 ):
 
549
        self.stream = unittest._WritelnDecorator(stream)
 
550
        self.descriptions = descriptions
 
551
        self.verbosity = verbosity
 
552
        self._bench_history = bench_history
 
553
        self.list_only = list_only
 
554
 
 
555
    def run(self, test):
 
556
        "Run the given test case or test suite."
 
557
        startTime = time.time()
 
558
        if self.verbosity == 1:
 
559
            result_class = TextTestResult
 
560
        elif self.verbosity >= 2:
 
561
            result_class = VerboseTestResult
 
562
        result = result_class(self.stream,
 
563
                              self.descriptions,
 
564
                              self.verbosity,
 
565
                              bench_history=self._bench_history,
 
566
                              num_tests=test.countTestCases(),
 
567
                              )
 
568
        result.stop_early = self.stop_on_failure
 
569
        result.report_starting()
 
570
        if self.list_only:
 
571
            if self.verbosity >= 2:
 
572
                self.stream.writeln("Listing tests only ...\n")
 
573
            run = 0
 
574
            for t in iter_suite_tests(test):
 
575
                self.stream.writeln("%s" % (t.id()))
 
576
                run += 1
 
577
            actionTaken = "Listed"
 
578
        else: 
 
579
            test.run(result)
 
580
            run = result.testsRun
 
581
            actionTaken = "Ran"
 
582
        stopTime = time.time()
 
583
        timeTaken = stopTime - startTime
 
584
        result.printErrors()
 
585
        self.stream.writeln(result.separator2)
 
586
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
587
                            run, run != 1 and "s" or "", timeTaken))
 
588
        self.stream.writeln()
 
589
        if not result.wasSuccessful():
 
590
            self.stream.write("FAILED (")
 
591
            failed, errored = map(len, (result.failures, result.errors))
 
592
            if failed:
 
593
                self.stream.write("failures=%d" % failed)
 
594
            if errored:
 
595
                if failed: self.stream.write(", ")
 
596
                self.stream.write("errors=%d" % errored)
 
597
            if result.known_failure_count:
 
598
                if failed or errored: self.stream.write(", ")
 
599
                self.stream.write("known_failure_count=%d" %
 
600
                    result.known_failure_count)
 
601
            self.stream.writeln(")")
 
602
        else:
 
603
            if result.known_failure_count:
 
604
                self.stream.writeln("OK (known_failures=%d)" %
 
605
                    result.known_failure_count)
 
606
            else:
 
607
                self.stream.writeln("OK")
 
608
        if result.skip_count > 0:
 
609
            skipped = result.skip_count
 
610
            self.stream.writeln('%d test%s skipped' %
 
611
                                (skipped, skipped != 1 and "s" or ""))
 
612
        if result.unsupported:
 
613
            for feature, count in sorted(result.unsupported.items()):
 
614
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
615
                    (feature, count))
 
616
        result.finished()
 
617
        return result
 
618
 
 
619
 
 
620
def iter_suite_tests(suite):
 
621
    """Return all tests in a suite, recursing through nested suites"""
 
622
    for item in suite._tests:
 
623
        if isinstance(item, unittest.TestCase):
 
624
            yield item
 
625
        elif isinstance(item, unittest.TestSuite):
 
626
            for r in iter_suite_tests(item):
 
627
                yield r
 
628
        else:
 
629
            raise Exception('unknown object %r inside test suite %r'
 
630
                            % (item, suite))
 
631
 
 
632
 
 
633
class TestSkipped(Exception):
 
634
    """Indicates that a test was intentionally skipped, rather than failing."""
 
635
 
 
636
 
 
637
class TestNotApplicable(TestSkipped):
 
638
    """A test is not applicable to the situation where it was run.
 
639
 
 
640
    This is only normally raised by parameterized tests, if they find that 
 
641
    the instance they're constructed upon does not support one aspect 
 
642
    of its interface.
 
643
    """
 
644
 
 
645
 
 
646
class KnownFailure(AssertionError):
 
647
    """Indicates that a test failed in a precisely expected manner.
 
648
 
 
649
    Such failures dont block the whole test suite from passing because they are
 
650
    indicators of partially completed code or of future work. We have an
 
651
    explicit error for them so that we can ensure that they are always visible:
 
652
    KnownFailures are always shown in the output of bzr selftest.
 
653
    """
 
654
 
 
655
 
 
656
class UnavailableFeature(Exception):
 
657
    """A feature required for this test was not available.
 
658
 
 
659
    The feature should be used to construct the exception.
 
660
    """
 
661
 
 
662
 
 
663
class CommandFailed(Exception):
 
664
    pass
 
665
 
 
666
 
 
667
class StringIOWrapper(object):
 
668
    """A wrapper around cStringIO which just adds an encoding attribute.
 
669
    
 
670
    Internally we can check sys.stdout to see what the output encoding
 
671
    should be. However, cStringIO has no encoding attribute that we can
 
672
    set. So we wrap it instead.
 
673
    """
 
674
    encoding='ascii'
 
675
    _cstring = None
 
676
 
 
677
    def __init__(self, s=None):
 
678
        if s is not None:
 
679
            self.__dict__['_cstring'] = StringIO(s)
 
680
        else:
 
681
            self.__dict__['_cstring'] = StringIO()
 
682
 
 
683
    def __getattr__(self, name, getattr=getattr):
 
684
        return getattr(self.__dict__['_cstring'], name)
 
685
 
 
686
    def __setattr__(self, name, val):
 
687
        if name == 'encoding':
 
688
            self.__dict__['encoding'] = val
 
689
        else:
 
690
            return setattr(self._cstring, name, val)
 
691
 
 
692
 
 
693
class TestUIFactory(ui.CLIUIFactory):
 
694
    """A UI Factory for testing.
 
695
 
 
696
    Hide the progress bar but emit note()s.
 
697
    Redirect stdin.
 
698
    Allows get_password to be tested without real tty attached.
 
699
    """
 
700
 
 
701
    def __init__(self,
 
702
                 stdout=None,
 
703
                 stderr=None,
 
704
                 stdin=None):
 
705
        super(TestUIFactory, self).__init__()
 
706
        if stdin is not None:
 
707
            # We use a StringIOWrapper to be able to test various
 
708
            # encodings, but the user is still responsible to
 
709
            # encode the string and to set the encoding attribute
 
710
            # of StringIOWrapper.
 
711
            self.stdin = StringIOWrapper(stdin)
 
712
        if stdout is None:
 
713
            self.stdout = sys.stdout
 
714
        else:
 
715
            self.stdout = stdout
 
716
        if stderr is None:
 
717
            self.stderr = sys.stderr
 
718
        else:
 
719
            self.stderr = stderr
 
720
 
 
721
    def clear(self):
 
722
        """See progress.ProgressBar.clear()."""
 
723
 
 
724
    def clear_term(self):
 
725
        """See progress.ProgressBar.clear_term()."""
 
726
 
 
727
    def clear_term(self):
 
728
        """See progress.ProgressBar.clear_term()."""
 
729
 
 
730
    def finished(self):
 
731
        """See progress.ProgressBar.finished()."""
 
732
 
 
733
    def note(self, fmt_string, *args, **kwargs):
 
734
        """See progress.ProgressBar.note()."""
 
735
        self.stdout.write((fmt_string + "\n") % args)
 
736
 
 
737
    def progress_bar(self):
 
738
        return self
 
739
 
 
740
    def nested_progress_bar(self):
 
741
        return self
 
742
 
 
743
    def update(self, message, count=None, total=None):
 
744
        """See progress.ProgressBar.update()."""
 
745
 
 
746
    def get_non_echoed_password(self, prompt):
 
747
        """Get password from stdin without trying to handle the echo mode"""
 
748
        if prompt:
 
749
            self.stdout.write(prompt.encode(self.stdout.encoding, 'replace'))
 
750
        password = self.stdin.readline()
 
751
        if not password:
 
752
            raise EOFError
 
753
        if password[-1] == '\n':
 
754
            password = password[:-1]
 
755
        return password
 
756
 
 
757
 
 
758
class TestCase(unittest.TestCase):
 
759
    """Base class for bzr unit tests.
 
760
    
 
761
    Tests that need access to disk resources should subclass 
 
762
    TestCaseInTempDir not TestCase.
 
763
 
 
764
    Error and debug log messages are redirected from their usual
 
765
    location into a temporary file, the contents of which can be
 
766
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
767
    so that it can also capture file IO.  When the test completes this file
 
768
    is read into memory and removed from disk.
 
769
       
 
770
    There are also convenience functions to invoke bzr's command-line
 
771
    routine, and to build and check bzr trees.
 
772
   
 
773
    In addition to the usual method of overriding tearDown(), this class also
 
774
    allows subclasses to register functions into the _cleanups list, which is
 
775
    run in order as the object is torn down.  It's less likely this will be
 
776
    accidentally overlooked.
 
777
    """
 
778
 
 
779
    _log_file_name = None
 
780
    _log_contents = ''
 
781
    _keep_log_file = False
 
782
    # record lsprof data when performing benchmark calls.
 
783
    _gather_lsprof_in_benchmarks = False
 
784
    attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
 
785
                     '_log_contents', '_log_file_name', '_benchtime',
 
786
                     '_TestCase__testMethodName')
 
787
 
 
788
    def __init__(self, methodName='testMethod'):
 
789
        super(TestCase, self).__init__(methodName)
 
790
        self._cleanups = []
 
791
 
 
792
    def setUp(self):
 
793
        unittest.TestCase.setUp(self)
 
794
        self._cleanEnvironment()
 
795
        self._silenceUI()
 
796
        self._startLogFile()
 
797
        self._benchcalls = []
 
798
        self._benchtime = None
 
799
        self._clear_hooks()
 
800
        self._clear_debug_flags()
 
801
 
 
802
    def _clear_debug_flags(self):
 
803
        """Prevent externally set debug flags affecting tests.
 
804
        
 
805
        Tests that want to use debug flags can just set them in the
 
806
        debug_flags set during setup/teardown.
 
807
        """
 
808
        if 'selftest_debug' not in debug.debug_flags:
 
809
            self._preserved_debug_flags = set(debug.debug_flags)
 
810
            debug.debug_flags.clear()
 
811
            self.addCleanup(self._restore_debug_flags)
 
812
 
 
813
    def _clear_hooks(self):
 
814
        # prevent hooks affecting tests
 
815
        import bzrlib.branch
 
816
        import bzrlib.smart.server
 
817
        self._preserved_hooks = {
 
818
            bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
 
819
            bzrlib.mutabletree.MutableTree: bzrlib.mutabletree.MutableTree.hooks,
 
820
            bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
 
821
            }
 
822
        self.addCleanup(self._restoreHooks)
 
823
        # reset all hooks to an empty instance of the appropriate type
 
824
        bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
 
825
        bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
 
826
 
 
827
    def _silenceUI(self):
 
828
        """Turn off UI for duration of test"""
 
829
        # by default the UI is off; tests can turn it on if they want it.
 
830
        saved = ui.ui_factory
 
831
        def _restore():
 
832
            ui.ui_factory = saved
 
833
        ui.ui_factory = ui.SilentUIFactory()
 
834
        self.addCleanup(_restore)
 
835
 
 
836
    def _ndiff_strings(self, a, b):
 
837
        """Return ndiff between two strings containing lines.
 
838
        
 
839
        A trailing newline is added if missing to make the strings
 
840
        print properly."""
 
841
        if b and b[-1] != '\n':
 
842
            b += '\n'
 
843
        if a and a[-1] != '\n':
 
844
            a += '\n'
 
845
        difflines = difflib.ndiff(a.splitlines(True),
 
846
                                  b.splitlines(True),
 
847
                                  linejunk=lambda x: False,
 
848
                                  charjunk=lambda x: False)
 
849
        return ''.join(difflines)
 
850
 
 
851
    def assertEqual(self, a, b, message=''):
 
852
        try:
 
853
            if a == b:
 
854
                return
 
855
        except UnicodeError, e:
 
856
            # If we can't compare without getting a UnicodeError, then
 
857
            # obviously they are different
 
858
            mutter('UnicodeError: %s', e)
 
859
        if message:
 
860
            message += '\n'
 
861
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
862
            % (message,
 
863
               pformat(a), pformat(b)))
 
864
 
 
865
    assertEquals = assertEqual
 
866
 
 
867
    def assertEqualDiff(self, a, b, message=None):
 
868
        """Assert two texts are equal, if not raise an exception.
 
869
        
 
870
        This is intended for use with multi-line strings where it can 
 
871
        be hard to find the differences by eye.
 
872
        """
 
873
        # TODO: perhaps override assertEquals to call this for strings?
 
874
        if a == b:
 
875
            return
 
876
        if message is None:
 
877
            message = "texts not equal:\n"
 
878
        raise AssertionError(message +
 
879
                             self._ndiff_strings(a, b))
 
880
        
 
881
    def assertEqualMode(self, mode, mode_test):
 
882
        self.assertEqual(mode, mode_test,
 
883
                         'mode mismatch %o != %o' % (mode, mode_test))
 
884
 
 
885
    def assertPositive(self, val):
 
886
        """Assert that val is greater than 0."""
 
887
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
888
 
 
889
    def assertNegative(self, val):
 
890
        """Assert that val is less than 0."""
 
891
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
892
 
 
893
    def assertStartsWith(self, s, prefix):
 
894
        if not s.startswith(prefix):
 
895
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
896
 
 
897
    def assertEndsWith(self, s, suffix):
 
898
        """Asserts that s ends with suffix."""
 
899
        if not s.endswith(suffix):
 
900
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
901
 
 
902
    def assertContainsRe(self, haystack, needle_re):
 
903
        """Assert that a contains something matching a regular expression."""
 
904
        if not re.search(needle_re, haystack):
 
905
            if '\n' in haystack or len(haystack) > 60:
 
906
                # a long string, format it in a more readable way
 
907
                raise AssertionError(
 
908
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
909
                        % (needle_re, haystack))
 
910
            else:
 
911
                raise AssertionError('pattern "%s" not found in "%s"'
 
912
                        % (needle_re, haystack))
 
913
 
 
914
    def assertNotContainsRe(self, haystack, needle_re):
 
915
        """Assert that a does not match a regular expression"""
 
916
        if re.search(needle_re, haystack):
 
917
            raise AssertionError('pattern "%s" found in "%s"'
 
918
                    % (needle_re, haystack))
 
919
 
 
920
    def assertSubset(self, sublist, superlist):
 
921
        """Assert that every entry in sublist is present in superlist."""
 
922
        missing = set(sublist) - set(superlist)
 
923
        if len(missing) > 0:
 
924
            raise AssertionError("value(s) %r not present in container %r" %
 
925
                                 (missing, superlist))
 
926
 
 
927
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
928
        """Fail unless excClass is raised when the iterator from func is used.
 
929
        
 
930
        Many functions can return generators this makes sure
 
931
        to wrap them in a list() call to make sure the whole generator
 
932
        is run, and that the proper exception is raised.
 
933
        """
 
934
        try:
 
935
            list(func(*args, **kwargs))
 
936
        except excClass:
 
937
            return
 
938
        else:
 
939
            if getattr(excClass,'__name__', None) is not None:
 
940
                excName = excClass.__name__
 
941
            else:
 
942
                excName = str(excClass)
 
943
            raise self.failureException, "%s not raised" % excName
 
944
 
 
945
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
946
        """Assert that a callable raises a particular exception.
 
947
 
 
948
        :param excClass: As for the except statement, this may be either an
 
949
            exception class, or a tuple of classes.
 
950
        :param callableObj: A callable, will be passed ``*args`` and
 
951
            ``**kwargs``.
 
952
 
 
953
        Returns the exception so that you can examine it.
 
954
        """
 
955
        try:
 
956
            callableObj(*args, **kwargs)
 
957
        except excClass, e:
 
958
            return e
 
959
        else:
 
960
            if getattr(excClass,'__name__', None) is not None:
 
961
                excName = excClass.__name__
 
962
            else:
 
963
                # probably a tuple
 
964
                excName = str(excClass)
 
965
            raise self.failureException, "%s not raised" % excName
 
966
 
 
967
    def assertIs(self, left, right, message=None):
 
968
        if not (left is right):
 
969
            if message is not None:
 
970
                raise AssertionError(message)
 
971
            else:
 
972
                raise AssertionError("%r is not %r." % (left, right))
 
973
 
 
974
    def assertIsNot(self, left, right, message=None):
 
975
        if (left is right):
 
976
            if message is not None:
 
977
                raise AssertionError(message)
 
978
            else:
 
979
                raise AssertionError("%r is %r." % (left, right))
 
980
 
 
981
    def assertTransportMode(self, transport, path, mode):
 
982
        """Fail if a path does not have mode mode.
 
983
        
 
984
        If modes are not supported on this transport, the assertion is ignored.
 
985
        """
 
986
        if not transport._can_roundtrip_unix_modebits():
 
987
            return
 
988
        path_stat = transport.stat(path)
 
989
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
990
        self.assertEqual(mode, actual_mode,
 
991
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
992
 
 
993
    def assertIsSameRealPath(self, path1, path2):
 
994
        """Fail if path1 and path2 points to different files"""
 
995
        self.assertEqual(osutils.realpath(path1),
 
996
                         osutils.realpath(path2),
 
997
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
998
 
 
999
    def assertIsInstance(self, obj, kls):
 
1000
        """Fail if obj is not an instance of kls"""
 
1001
        if not isinstance(obj, kls):
 
1002
            self.fail("%r is an instance of %s rather than %s" % (
 
1003
                obj, obj.__class__, kls))
 
1004
 
 
1005
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
1006
        """Invoke a test, expecting it to fail for the given reason.
 
1007
 
 
1008
        This is for assertions that ought to succeed, but currently fail.
 
1009
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
1010
        about the failure you're expecting.  If a new bug is introduced,
 
1011
        AssertionError should be raised, not KnownFailure.
 
1012
 
 
1013
        Frequently, expectFailure should be followed by an opposite assertion.
 
1014
        See example below.
 
1015
 
 
1016
        Intended to be used with a callable that raises AssertionError as the
 
1017
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
1018
 
 
1019
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
1020
        test succeeds.
 
1021
 
 
1022
        example usage::
 
1023
 
 
1024
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
1025
                             dynamic_val)
 
1026
          self.assertEqual(42, dynamic_val)
 
1027
 
 
1028
          This means that a dynamic_val of 54 will cause the test to raise
 
1029
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
1030
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
1031
          than 54 or 42 will cause an AssertionError.
 
1032
        """
 
1033
        try:
 
1034
            assertion(*args, **kwargs)
 
1035
        except AssertionError:
 
1036
            raise KnownFailure(reason)
 
1037
        else:
 
1038
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
1039
 
 
1040
    def assertFileEqual(self, content, path):
 
1041
        """Fail if path does not contain 'content'."""
 
1042
        self.failUnlessExists(path)
 
1043
        f = file(path, 'rb')
 
1044
        try:
 
1045
            s = f.read()
 
1046
        finally:
 
1047
            f.close()
 
1048
        self.assertEqualDiff(content, s)
 
1049
 
 
1050
    def failUnlessExists(self, path):
 
1051
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1052
        if not isinstance(path, basestring):
 
1053
            for p in path:
 
1054
                self.failUnlessExists(p)
 
1055
        else:
 
1056
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
1057
 
 
1058
    def failIfExists(self, path):
 
1059
        """Fail if path or paths, which may be abs or relative, exist."""
 
1060
        if not isinstance(path, basestring):
 
1061
            for p in path:
 
1062
                self.failIfExists(p)
 
1063
        else:
 
1064
            self.failIf(osutils.lexists(path),path+" exists")
 
1065
 
 
1066
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1067
        """A helper for callDeprecated and applyDeprecated.
 
1068
 
 
1069
        :param a_callable: A callable to call.
 
1070
        :param args: The positional arguments for the callable
 
1071
        :param kwargs: The keyword arguments for the callable
 
1072
        :return: A tuple (warnings, result). result is the result of calling
 
1073
            a_callable(``*args``, ``**kwargs``).
 
1074
        """
 
1075
        local_warnings = []
 
1076
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1077
            # we've hooked into a deprecation specific callpath,
 
1078
            # only deprecations should getting sent via it.
 
1079
            self.assertEqual(cls, DeprecationWarning)
 
1080
            local_warnings.append(msg)
 
1081
        original_warning_method = symbol_versioning.warn
 
1082
        symbol_versioning.set_warning_method(capture_warnings)
 
1083
        try:
 
1084
            result = a_callable(*args, **kwargs)
 
1085
        finally:
 
1086
            symbol_versioning.set_warning_method(original_warning_method)
 
1087
        return (local_warnings, result)
 
1088
 
 
1089
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1090
        """Call a deprecated callable without warning the user.
 
1091
 
 
1092
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1093
        not other callers that go direct to the warning module.
 
1094
 
 
1095
        To test that a deprecated method raises an error, do something like
 
1096
        this::
 
1097
 
 
1098
            self.assertRaises(errors.ReservedId,
 
1099
                self.applyDeprecated,
 
1100
                deprecated_in((1, 5, 0)),
 
1101
                br.append_revision,
 
1102
                'current:')
 
1103
 
 
1104
        :param deprecation_format: The deprecation format that the callable
 
1105
            should have been deprecated with. This is the same type as the
 
1106
            parameter to deprecated_method/deprecated_function. If the
 
1107
            callable is not deprecated with this format, an assertion error
 
1108
            will be raised.
 
1109
        :param a_callable: A callable to call. This may be a bound method or
 
1110
            a regular function. It will be called with ``*args`` and
 
1111
            ``**kwargs``.
 
1112
        :param args: The positional arguments for the callable
 
1113
        :param kwargs: The keyword arguments for the callable
 
1114
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1115
        """
 
1116
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1117
            *args, **kwargs)
 
1118
        expected_first_warning = symbol_versioning.deprecation_string(
 
1119
            a_callable, deprecation_format)
 
1120
        if len(call_warnings) == 0:
 
1121
            self.fail("No deprecation warning generated by call to %s" %
 
1122
                a_callable)
 
1123
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1124
        return result
 
1125
 
 
1126
    def callCatchWarnings(self, fn, *args, **kw):
 
1127
        """Call a callable that raises python warnings.
 
1128
 
 
1129
        The caller's responsible for examining the returned warnings.
 
1130
 
 
1131
        If the callable raises an exception, the exception is not
 
1132
        caught and propagates up to the caller.  In that case, the list
 
1133
        of warnings is not available.
 
1134
 
 
1135
        :returns: ([warning_object, ...], fn_result)
 
1136
        """
 
1137
        # XXX: This is not perfect, because it completely overrides the
 
1138
        # warnings filters, and some code may depend on suppressing particular
 
1139
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1140
        # though.  -- Andrew, 20071062
 
1141
        wlist = []
 
1142
        def _catcher(message, category, filename, lineno, file=None):
 
1143
            # despite the name, 'message' is normally(?) a Warning subclass
 
1144
            # instance
 
1145
            wlist.append(message)
 
1146
        saved_showwarning = warnings.showwarning
 
1147
        saved_filters = warnings.filters
 
1148
        try:
 
1149
            warnings.showwarning = _catcher
 
1150
            warnings.filters = []
 
1151
            result = fn(*args, **kw)
 
1152
        finally:
 
1153
            warnings.showwarning = saved_showwarning
 
1154
            warnings.filters = saved_filters
 
1155
        return wlist, result
 
1156
 
 
1157
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1158
        """Assert that a callable is deprecated in a particular way.
 
1159
 
 
1160
        This is a very precise test for unusual requirements. The 
 
1161
        applyDeprecated helper function is probably more suited for most tests
 
1162
        as it allows you to simply specify the deprecation format being used
 
1163
        and will ensure that that is issued for the function being called.
 
1164
 
 
1165
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1166
        not other callers that go direct to the warning module.  To catch
 
1167
        general warnings, use callCatchWarnings.
 
1168
 
 
1169
        :param expected: a list of the deprecation warnings expected, in order
 
1170
        :param callable: The callable to call
 
1171
        :param args: The positional arguments for the callable
 
1172
        :param kwargs: The keyword arguments for the callable
 
1173
        """
 
1174
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1175
            *args, **kwargs)
 
1176
        self.assertEqual(expected, call_warnings)
 
1177
        return result
 
1178
 
 
1179
    def _startLogFile(self):
 
1180
        """Send bzr and test log messages to a temporary file.
 
1181
 
 
1182
        The file is removed as the test is torn down.
 
1183
        """
 
1184
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1185
        self._log_file = os.fdopen(fileno, 'w+')
 
1186
        self._log_memento = bzrlib.trace.push_log_file(self._log_file)
 
1187
        self._log_file_name = name
 
1188
        self.addCleanup(self._finishLogFile)
 
1189
 
 
1190
    def _finishLogFile(self):
 
1191
        """Finished with the log file.
 
1192
 
 
1193
        Close the file and delete it, unless setKeepLogfile was called.
 
1194
        """
 
1195
        if self._log_file is None:
 
1196
            return
 
1197
        bzrlib.trace.pop_log_file(self._log_memento)
 
1198
        self._log_file.close()
 
1199
        self._log_file = None
 
1200
        if not self._keep_log_file:
 
1201
            os.remove(self._log_file_name)
 
1202
            self._log_file_name = None
 
1203
 
 
1204
    def setKeepLogfile(self):
 
1205
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1206
        self._keep_log_file = True
 
1207
 
 
1208
    def addCleanup(self, callable):
 
1209
        """Arrange to run a callable when this case is torn down.
 
1210
 
 
1211
        Callables are run in the reverse of the order they are registered, 
 
1212
        ie last-in first-out.
 
1213
        """
 
1214
        if callable in self._cleanups:
 
1215
            raise ValueError("cleanup function %r already registered on %s" 
 
1216
                    % (callable, self))
 
1217
        self._cleanups.append(callable)
 
1218
 
 
1219
    def _cleanEnvironment(self):
 
1220
        new_env = {
 
1221
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1222
            'HOME': os.getcwd(),
 
1223
            'APPDATA': None,  # bzr now use Win32 API and don't rely on APPDATA
 
1224
            'BZR_EDITOR': None, # test_msgeditor manipulates this variable
 
1225
            'BZR_EMAIL': None,
 
1226
            'BZREMAIL': None, # may still be present in the environment
 
1227
            'EMAIL': None,
 
1228
            'BZR_PROGRESS_BAR': None,
 
1229
            'BZR_LOG': None,
 
1230
            # SSH Agent
 
1231
            'SSH_AUTH_SOCK': None,
 
1232
            # Proxies
 
1233
            'http_proxy': None,
 
1234
            'HTTP_PROXY': None,
 
1235
            'https_proxy': None,
 
1236
            'HTTPS_PROXY': None,
 
1237
            'no_proxy': None,
 
1238
            'NO_PROXY': None,
 
1239
            'all_proxy': None,
 
1240
            'ALL_PROXY': None,
 
1241
            # Nobody cares about these ones AFAIK. So far at
 
1242
            # least. If you do (care), please update this comment
 
1243
            # -- vila 20061212
 
1244
            'ftp_proxy': None,
 
1245
            'FTP_PROXY': None,
 
1246
            'BZR_REMOTE_PATH': None,
 
1247
        }
 
1248
        self.__old_env = {}
 
1249
        self.addCleanup(self._restoreEnvironment)
 
1250
        for name, value in new_env.iteritems():
 
1251
            self._captureVar(name, value)
 
1252
 
 
1253
    def _captureVar(self, name, newvalue):
 
1254
        """Set an environment variable, and reset it when finished."""
 
1255
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1256
 
 
1257
    def _restore_debug_flags(self):
 
1258
        debug.debug_flags.clear()
 
1259
        debug.debug_flags.update(self._preserved_debug_flags)
 
1260
 
 
1261
    def _restoreEnvironment(self):
 
1262
        for name, value in self.__old_env.iteritems():
 
1263
            osutils.set_or_unset_env(name, value)
 
1264
 
 
1265
    def _restoreHooks(self):
 
1266
        for klass, hooks in self._preserved_hooks.items():
 
1267
            setattr(klass, 'hooks', hooks)
 
1268
 
 
1269
    def knownFailure(self, reason):
 
1270
        """This test has failed for some known reason."""
 
1271
        raise KnownFailure(reason)
 
1272
 
 
1273
    def run(self, result=None):
 
1274
        if result is None: result = self.defaultTestResult()
 
1275
        for feature in getattr(self, '_test_needs_features', []):
 
1276
            if not feature.available():
 
1277
                result.startTest(self)
 
1278
                if getattr(result, 'addNotSupported', None):
 
1279
                    result.addNotSupported(self, feature)
 
1280
                else:
 
1281
                    result.addSuccess(self)
 
1282
                result.stopTest(self)
 
1283
                return
 
1284
        try:
 
1285
            return unittest.TestCase.run(self, result)
 
1286
        finally:
 
1287
            saved_attrs = {}
 
1288
            absent_attr = object()
 
1289
            for attr_name in self.attrs_to_keep:
 
1290
                attr = getattr(self, attr_name, absent_attr)
 
1291
                if attr is not absent_attr:
 
1292
                    saved_attrs[attr_name] = attr
 
1293
            self.__dict__ = saved_attrs
 
1294
 
 
1295
    def tearDown(self):
 
1296
        self._runCleanups()
 
1297
        unittest.TestCase.tearDown(self)
 
1298
 
 
1299
    def time(self, callable, *args, **kwargs):
 
1300
        """Run callable and accrue the time it takes to the benchmark time.
 
1301
        
 
1302
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1303
        this will cause lsprofile statistics to be gathered and stored in
 
1304
        self._benchcalls.
 
1305
        """
 
1306
        if self._benchtime is None:
 
1307
            self._benchtime = 0
 
1308
        start = time.time()
 
1309
        try:
 
1310
            if not self._gather_lsprof_in_benchmarks:
 
1311
                return callable(*args, **kwargs)
 
1312
            else:
 
1313
                # record this benchmark
 
1314
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1315
                stats.sort()
 
1316
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1317
                return ret
 
1318
        finally:
 
1319
            self._benchtime += time.time() - start
 
1320
 
 
1321
    def _runCleanups(self):
 
1322
        """Run registered cleanup functions. 
 
1323
 
 
1324
        This should only be called from TestCase.tearDown.
 
1325
        """
 
1326
        # TODO: Perhaps this should keep running cleanups even if 
 
1327
        # one of them fails?
 
1328
 
 
1329
        # Actually pop the cleanups from the list so tearDown running
 
1330
        # twice is safe (this happens for skipped tests).
 
1331
        while self._cleanups:
 
1332
            self._cleanups.pop()()
 
1333
 
 
1334
    def log(self, *args):
 
1335
        mutter(*args)
 
1336
 
 
1337
    def _get_log(self, keep_log_file=False):
 
1338
        """Get the log from bzrlib.trace calls from this test.
 
1339
 
 
1340
        :param keep_log_file: When True, if the log is still a file on disk
 
1341
            leave it as a file on disk. When False, if the log is still a file
 
1342
            on disk, the log file is deleted and the log preserved as
 
1343
            self._log_contents.
 
1344
        :return: A string containing the log.
 
1345
        """
 
1346
        # flush the log file, to get all content
 
1347
        import bzrlib.trace
 
1348
        bzrlib.trace._trace_file.flush()
 
1349
        if self._log_contents:
 
1350
            # XXX: this can hardly contain the content flushed above --vila
 
1351
            # 20080128
 
1352
            return self._log_contents
 
1353
        if self._log_file_name is not None:
 
1354
            logfile = open(self._log_file_name)
 
1355
            try:
 
1356
                log_contents = logfile.read()
 
1357
            finally:
 
1358
                logfile.close()
 
1359
            if not keep_log_file:
 
1360
                self._log_contents = log_contents
 
1361
                try:
 
1362
                    os.remove(self._log_file_name)
 
1363
                except OSError, e:
 
1364
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1365
                        sys.stderr.write(('Unable to delete log file '
 
1366
                                             ' %r\n' % self._log_file_name))
 
1367
                    else:
 
1368
                        raise
 
1369
            return log_contents
 
1370
        else:
 
1371
            return "DELETED log file to reduce memory footprint"
 
1372
 
 
1373
    def requireFeature(self, feature):
 
1374
        """This test requires a specific feature is available.
 
1375
 
 
1376
        :raises UnavailableFeature: When feature is not available.
 
1377
        """
 
1378
        if not feature.available():
 
1379
            raise UnavailableFeature(feature)
 
1380
 
 
1381
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1382
            working_dir):
 
1383
        """Run bazaar command line, splitting up a string command line."""
 
1384
        if isinstance(args, basestring):
 
1385
            # shlex don't understand unicode strings,
 
1386
            # so args should be plain string (bialix 20070906)
 
1387
            args = list(shlex.split(str(args)))
 
1388
        return self._run_bzr_core(args, retcode=retcode,
 
1389
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1390
                )
 
1391
 
 
1392
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1393
            working_dir):
 
1394
        if encoding is None:
 
1395
            encoding = bzrlib.user_encoding
 
1396
        stdout = StringIOWrapper()
 
1397
        stderr = StringIOWrapper()
 
1398
        stdout.encoding = encoding
 
1399
        stderr.encoding = encoding
 
1400
 
 
1401
        self.log('run bzr: %r', args)
 
1402
        # FIXME: don't call into logging here
 
1403
        handler = logging.StreamHandler(stderr)
 
1404
        handler.setLevel(logging.INFO)
 
1405
        logger = logging.getLogger('')
 
1406
        logger.addHandler(handler)
 
1407
        old_ui_factory = ui.ui_factory
 
1408
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1409
 
 
1410
        cwd = None
 
1411
        if working_dir is not None:
 
1412
            cwd = osutils.getcwd()
 
1413
            os.chdir(working_dir)
 
1414
 
 
1415
        try:
 
1416
            result = self.apply_redirected(ui.ui_factory.stdin,
 
1417
                stdout, stderr,
 
1418
                bzrlib.commands.run_bzr_catch_user_errors,
 
1419
                args)
 
1420
        finally:
 
1421
            logger.removeHandler(handler)
 
1422
            ui.ui_factory = old_ui_factory
 
1423
            if cwd is not None:
 
1424
                os.chdir(cwd)
 
1425
 
 
1426
        out = stdout.getvalue()
 
1427
        err = stderr.getvalue()
 
1428
        if out:
 
1429
            self.log('output:\n%r', out)
 
1430
        if err:
 
1431
            self.log('errors:\n%r', err)
 
1432
        if retcode is not None:
 
1433
            self.assertEquals(retcode, result,
 
1434
                              message='Unexpected return code')
 
1435
        return out, err
 
1436
 
 
1437
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
 
1438
                working_dir=None, error_regexes=[], output_encoding=None):
 
1439
        """Invoke bzr, as if it were run from the command line.
 
1440
 
 
1441
        The argument list should not include the bzr program name - the
 
1442
        first argument is normally the bzr command.  Arguments may be
 
1443
        passed in three ways:
 
1444
 
 
1445
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1446
        when the command contains whitespace or metacharacters, or 
 
1447
        is built up at run time.
 
1448
 
 
1449
        2- A single string, eg "add a".  This is the most convenient 
 
1450
        for hardcoded commands.
 
1451
 
 
1452
        This runs bzr through the interface that catches and reports
 
1453
        errors, and with logging set to something approximating the
 
1454
        default, so that error reporting can be checked.
 
1455
 
 
1456
        This should be the main method for tests that want to exercise the
 
1457
        overall behavior of the bzr application (rather than a unit test
 
1458
        or a functional test of the library.)
 
1459
 
 
1460
        This sends the stdout/stderr results into the test's log,
 
1461
        where it may be useful for debugging.  See also run_captured.
 
1462
 
 
1463
        :keyword stdin: A string to be used as stdin for the command.
 
1464
        :keyword retcode: The status code the command should return;
 
1465
            default 0.
 
1466
        :keyword working_dir: The directory to run the command in
 
1467
        :keyword error_regexes: A list of expected error messages.  If
 
1468
            specified they must be seen in the error output of the command.
 
1469
        """
 
1470
        out, err = self._run_bzr_autosplit(
 
1471
            args=args,
 
1472
            retcode=retcode,
 
1473
            encoding=encoding,
 
1474
            stdin=stdin,
 
1475
            working_dir=working_dir,
 
1476
            )
 
1477
        for regex in error_regexes:
 
1478
            self.assertContainsRe(err, regex)
 
1479
        return out, err
 
1480
 
 
1481
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1482
        """Run bzr, and check that stderr contains the supplied regexes
 
1483
 
 
1484
        :param error_regexes: Sequence of regular expressions which
 
1485
            must each be found in the error output. The relative ordering
 
1486
            is not enforced.
 
1487
        :param args: command-line arguments for bzr
 
1488
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1489
            This function changes the default value of retcode to be 3,
 
1490
            since in most cases this is run when you expect bzr to fail.
 
1491
 
 
1492
        :return: (out, err) The actual output of running the command (in case
 
1493
            you want to do more inspection)
 
1494
 
 
1495
        Examples of use::
 
1496
 
 
1497
            # Make sure that commit is failing because there is nothing to do
 
1498
            self.run_bzr_error(['no changes to commit'],
 
1499
                               ['commit', '-m', 'my commit comment'])
 
1500
            # Make sure --strict is handling an unknown file, rather than
 
1501
            # giving us the 'nothing to do' error
 
1502
            self.build_tree(['unknown'])
 
1503
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1504
                               ['commit', --strict', '-m', 'my commit comment'])
 
1505
        """
 
1506
        kwargs.setdefault('retcode', 3)
 
1507
        kwargs['error_regexes'] = error_regexes
 
1508
        out, err = self.run_bzr(*args, **kwargs)
 
1509
        return out, err
 
1510
 
 
1511
    def run_bzr_subprocess(self, *args, **kwargs):
 
1512
        """Run bzr in a subprocess for testing.
 
1513
 
 
1514
        This starts a new Python interpreter and runs bzr in there. 
 
1515
        This should only be used for tests that have a justifiable need for
 
1516
        this isolation: e.g. they are testing startup time, or signal
 
1517
        handling, or early startup code, etc.  Subprocess code can't be 
 
1518
        profiled or debugged so easily.
 
1519
 
 
1520
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
1521
            None is supplied, the status code is not checked.
 
1522
        :keyword env_changes: A dictionary which lists changes to environment
 
1523
            variables. A value of None will unset the env variable.
 
1524
            The values must be strings. The change will only occur in the
 
1525
            child, so you don't need to fix the environment after running.
 
1526
        :keyword universal_newlines: Convert CRLF => LF
 
1527
        :keyword allow_plugins: By default the subprocess is run with
 
1528
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1529
            for system-wide plugins to create unexpected output on stderr,
 
1530
            which can cause unnecessary test failures.
 
1531
        """
 
1532
        env_changes = kwargs.get('env_changes', {})
 
1533
        working_dir = kwargs.get('working_dir', None)
 
1534
        allow_plugins = kwargs.get('allow_plugins', False)
 
1535
        if len(args) == 1:
 
1536
            if isinstance(args[0], list):
 
1537
                args = args[0]
 
1538
            elif isinstance(args[0], basestring):
 
1539
                args = list(shlex.split(args[0]))
 
1540
        else:
 
1541
            raise ValueError("passing varargs to run_bzr_subprocess")
 
1542
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1543
                                            working_dir=working_dir,
 
1544
                                            allow_plugins=allow_plugins)
 
1545
        # We distinguish between retcode=None and retcode not passed.
 
1546
        supplied_retcode = kwargs.get('retcode', 0)
 
1547
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1548
            universal_newlines=kwargs.get('universal_newlines', False),
 
1549
            process_args=args)
 
1550
 
 
1551
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1552
                             skip_if_plan_to_signal=False,
 
1553
                             working_dir=None,
 
1554
                             allow_plugins=False):
 
1555
        """Start bzr in a subprocess for testing.
 
1556
 
 
1557
        This starts a new Python interpreter and runs bzr in there.
 
1558
        This should only be used for tests that have a justifiable need for
 
1559
        this isolation: e.g. they are testing startup time, or signal
 
1560
        handling, or early startup code, etc.  Subprocess code can't be
 
1561
        profiled or debugged so easily.
 
1562
 
 
1563
        :param process_args: a list of arguments to pass to the bzr executable,
 
1564
            for example ``['--version']``.
 
1565
        :param env_changes: A dictionary which lists changes to environment
 
1566
            variables. A value of None will unset the env variable.
 
1567
            The values must be strings. The change will only occur in the
 
1568
            child, so you don't need to fix the environment after running.
 
1569
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1570
            is not available.
 
1571
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1572
 
 
1573
        :returns: Popen object for the started process.
 
1574
        """
 
1575
        if skip_if_plan_to_signal:
 
1576
            if not getattr(os, 'kill', None):
 
1577
                raise TestSkipped("os.kill not available.")
 
1578
 
 
1579
        if env_changes is None:
 
1580
            env_changes = {}
 
1581
        old_env = {}
 
1582
 
 
1583
        def cleanup_environment():
 
1584
            for env_var, value in env_changes.iteritems():
 
1585
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1586
 
 
1587
        def restore_environment():
 
1588
            for env_var, value in old_env.iteritems():
 
1589
                osutils.set_or_unset_env(env_var, value)
 
1590
 
 
1591
        bzr_path = self.get_bzr_path()
 
1592
 
 
1593
        cwd = None
 
1594
        if working_dir is not None:
 
1595
            cwd = osutils.getcwd()
 
1596
            os.chdir(working_dir)
 
1597
 
 
1598
        try:
 
1599
            # win32 subprocess doesn't support preexec_fn
 
1600
            # so we will avoid using it on all platforms, just to
 
1601
            # make sure the code path is used, and we don't break on win32
 
1602
            cleanup_environment()
 
1603
            command = [sys.executable, bzr_path]
 
1604
            if not allow_plugins:
 
1605
                command.append('--no-plugins')
 
1606
            command.extend(process_args)
 
1607
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1608
        finally:
 
1609
            restore_environment()
 
1610
            if cwd is not None:
 
1611
                os.chdir(cwd)
 
1612
 
 
1613
        return process
 
1614
 
 
1615
    def _popen(self, *args, **kwargs):
 
1616
        """Place a call to Popen.
 
1617
 
 
1618
        Allows tests to override this method to intercept the calls made to
 
1619
        Popen for introspection.
 
1620
        """
 
1621
        return Popen(*args, **kwargs)
 
1622
 
 
1623
    def get_bzr_path(self):
 
1624
        """Return the path of the 'bzr' executable for this test suite."""
 
1625
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1626
        if not os.path.isfile(bzr_path):
 
1627
            # We are probably installed. Assume sys.argv is the right file
 
1628
            bzr_path = sys.argv[0]
 
1629
        return bzr_path
 
1630
 
 
1631
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1632
                              universal_newlines=False, process_args=None):
 
1633
        """Finish the execution of process.
 
1634
 
 
1635
        :param process: the Popen object returned from start_bzr_subprocess.
 
1636
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1637
            None is supplied, the status code is not checked.
 
1638
        :param send_signal: an optional signal to send to the process.
 
1639
        :param universal_newlines: Convert CRLF => LF
 
1640
        :returns: (stdout, stderr)
 
1641
        """
 
1642
        if send_signal is not None:
 
1643
            os.kill(process.pid, send_signal)
 
1644
        out, err = process.communicate()
 
1645
 
 
1646
        if universal_newlines:
 
1647
            out = out.replace('\r\n', '\n')
 
1648
            err = err.replace('\r\n', '\n')
 
1649
 
 
1650
        if retcode is not None and retcode != process.returncode:
 
1651
            if process_args is None:
 
1652
                process_args = "(unknown args)"
 
1653
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1654
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1655
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1656
                      % (process_args, retcode, process.returncode))
 
1657
        return [out, err]
 
1658
 
 
1659
    def check_inventory_shape(self, inv, shape):
 
1660
        """Compare an inventory to a list of expected names.
 
1661
 
 
1662
        Fail if they are not precisely equal.
 
1663
        """
 
1664
        extras = []
 
1665
        shape = list(shape)             # copy
 
1666
        for path, ie in inv.entries():
 
1667
            name = path.replace('\\', '/')
 
1668
            if ie.kind == 'directory':
 
1669
                name = name + '/'
 
1670
            if name in shape:
 
1671
                shape.remove(name)
 
1672
            else:
 
1673
                extras.append(name)
 
1674
        if shape:
 
1675
            self.fail("expected paths not found in inventory: %r" % shape)
 
1676
        if extras:
 
1677
            self.fail("unexpected paths found in inventory: %r" % extras)
 
1678
 
 
1679
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1680
                         a_callable=None, *args, **kwargs):
 
1681
        """Call callable with redirected std io pipes.
 
1682
 
 
1683
        Returns the return code."""
 
1684
        if not callable(a_callable):
 
1685
            raise ValueError("a_callable must be callable.")
 
1686
        if stdin is None:
 
1687
            stdin = StringIO("")
 
1688
        if stdout is None:
 
1689
            if getattr(self, "_log_file", None) is not None:
 
1690
                stdout = self._log_file
 
1691
            else:
 
1692
                stdout = StringIO()
 
1693
        if stderr is None:
 
1694
            if getattr(self, "_log_file", None is not None):
 
1695
                stderr = self._log_file
 
1696
            else:
 
1697
                stderr = StringIO()
 
1698
        real_stdin = sys.stdin
 
1699
        real_stdout = sys.stdout
 
1700
        real_stderr = sys.stderr
 
1701
        try:
 
1702
            sys.stdout = stdout
 
1703
            sys.stderr = stderr
 
1704
            sys.stdin = stdin
 
1705
            return a_callable(*args, **kwargs)
 
1706
        finally:
 
1707
            sys.stdout = real_stdout
 
1708
            sys.stderr = real_stderr
 
1709
            sys.stdin = real_stdin
 
1710
 
 
1711
    def reduceLockdirTimeout(self):
 
1712
        """Reduce the default lock timeout for the duration of the test, so that
 
1713
        if LockContention occurs during a test, it does so quickly.
 
1714
 
 
1715
        Tests that expect to provoke LockContention errors should call this.
 
1716
        """
 
1717
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
1718
        def resetTimeout():
 
1719
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
1720
        self.addCleanup(resetTimeout)
 
1721
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
1722
 
 
1723
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
1724
        """Return a StringIOWrapper instance, that will encode Unicode
 
1725
        input to UTF-8.
 
1726
        """
 
1727
        if encoding_type is None:
 
1728
            encoding_type = 'strict'
 
1729
        sio = StringIO()
 
1730
        output_encoding = 'utf-8'
 
1731
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
 
1732
        sio.encoding = output_encoding
 
1733
        return sio
 
1734
 
 
1735
 
 
1736
class TestCaseWithMemoryTransport(TestCase):
 
1737
    """Common test class for tests that do not need disk resources.
 
1738
 
 
1739
    Tests that need disk resources should derive from TestCaseWithTransport.
 
1740
 
 
1741
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
1742
 
 
1743
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
1744
    a directory which does not exist. This serves to help ensure test isolation
 
1745
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
1746
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
1747
    file defaults for the transport in tests, nor does it obey the command line
 
1748
    override, so tests that accidentally write to the common directory should
 
1749
    be rare.
 
1750
 
 
1751
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
1752
    a .bzr directory that stops us ascending higher into the filesystem.
 
1753
    """
 
1754
 
 
1755
    TEST_ROOT = None
 
1756
    _TEST_NAME = 'test'
 
1757
 
 
1758
    def __init__(self, methodName='runTest'):
 
1759
        # allow test parameterization after test construction and before test
 
1760
        # execution. Variables that the parameterizer sets need to be 
 
1761
        # ones that are not set by setUp, or setUp will trash them.
 
1762
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
1763
        self.vfs_transport_factory = default_transport
 
1764
        self.transport_server = None
 
1765
        self.transport_readonly_server = None
 
1766
        self.__vfs_server = None
 
1767
 
 
1768
    def get_transport(self, relpath=None):
 
1769
        """Return a writeable transport.
 
1770
 
 
1771
        This transport is for the test scratch space relative to
 
1772
        "self._test_root"
 
1773
        
 
1774
        :param relpath: a path relative to the base url.
 
1775
        """
 
1776
        t = get_transport(self.get_url(relpath))
 
1777
        self.assertFalse(t.is_readonly())
 
1778
        return t
 
1779
 
 
1780
    def get_readonly_transport(self, relpath=None):
 
1781
        """Return a readonly transport for the test scratch space
 
1782
        
 
1783
        This can be used to test that operations which should only need
 
1784
        readonly access in fact do not try to write.
 
1785
 
 
1786
        :param relpath: a path relative to the base url.
 
1787
        """
 
1788
        t = get_transport(self.get_readonly_url(relpath))
 
1789
        self.assertTrue(t.is_readonly())
 
1790
        return t
 
1791
 
 
1792
    def create_transport_readonly_server(self):
 
1793
        """Create a transport server from class defined at init.
 
1794
 
 
1795
        This is mostly a hook for daughter classes.
 
1796
        """
 
1797
        return self.transport_readonly_server()
 
1798
 
 
1799
    def get_readonly_server(self):
 
1800
        """Get the server instance for the readonly transport
 
1801
 
 
1802
        This is useful for some tests with specific servers to do diagnostics.
 
1803
        """
 
1804
        if self.__readonly_server is None:
 
1805
            if self.transport_readonly_server is None:
 
1806
                # readonly decorator requested
 
1807
                # bring up the server
 
1808
                self.__readonly_server = ReadonlyServer()
 
1809
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1810
            else:
 
1811
                self.__readonly_server = self.create_transport_readonly_server()
 
1812
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1813
            self.addCleanup(self.__readonly_server.tearDown)
 
1814
        return self.__readonly_server
 
1815
 
 
1816
    def get_readonly_url(self, relpath=None):
 
1817
        """Get a URL for the readonly transport.
 
1818
 
 
1819
        This will either be backed by '.' or a decorator to the transport 
 
1820
        used by self.get_url()
 
1821
        relpath provides for clients to get a path relative to the base url.
 
1822
        These should only be downwards relative, not upwards.
 
1823
        """
 
1824
        base = self.get_readonly_server().get_url()
 
1825
        return self._adjust_url(base, relpath)
 
1826
 
 
1827
    def get_vfs_only_server(self):
 
1828
        """Get the vfs only read/write server instance.
 
1829
 
 
1830
        This is useful for some tests with specific servers that need
 
1831
        diagnostics.
 
1832
 
 
1833
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
1834
        is no means to override it.
 
1835
        """
 
1836
        if self.__vfs_server is None:
 
1837
            self.__vfs_server = MemoryServer()
 
1838
            self.__vfs_server.setUp()
 
1839
            self.addCleanup(self.__vfs_server.tearDown)
 
1840
        return self.__vfs_server
 
1841
 
 
1842
    def get_server(self):
 
1843
        """Get the read/write server instance.
 
1844
 
 
1845
        This is useful for some tests with specific servers that need
 
1846
        diagnostics.
 
1847
 
 
1848
        This is built from the self.transport_server factory. If that is None,
 
1849
        then the self.get_vfs_server is returned.
 
1850
        """
 
1851
        if self.__server is None:
 
1852
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
 
1853
                return self.get_vfs_only_server()
 
1854
            else:
 
1855
                # bring up a decorated means of access to the vfs only server.
 
1856
                self.__server = self.transport_server()
 
1857
                try:
 
1858
                    self.__server.setUp(self.get_vfs_only_server())
 
1859
                except TypeError, e:
 
1860
                    # This should never happen; the try:Except here is to assist
 
1861
                    # developers having to update code rather than seeing an
 
1862
                    # uninformative TypeError.
 
1863
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
 
1864
            self.addCleanup(self.__server.tearDown)
 
1865
        return self.__server
 
1866
 
 
1867
    def _adjust_url(self, base, relpath):
 
1868
        """Get a URL (or maybe a path) for the readwrite transport.
 
1869
 
 
1870
        This will either be backed by '.' or to an equivalent non-file based
 
1871
        facility.
 
1872
        relpath provides for clients to get a path relative to the base url.
 
1873
        These should only be downwards relative, not upwards.
 
1874
        """
 
1875
        if relpath is not None and relpath != '.':
 
1876
            if not base.endswith('/'):
 
1877
                base = base + '/'
 
1878
            # XXX: Really base should be a url; we did after all call
 
1879
            # get_url()!  But sometimes it's just a path (from
 
1880
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
1881
            # to a non-escaped local path.
 
1882
            if base.startswith('./') or base.startswith('/'):
 
1883
                base += relpath
 
1884
            else:
 
1885
                base += urlutils.escape(relpath)
 
1886
        return base
 
1887
 
 
1888
    def get_url(self, relpath=None):
 
1889
        """Get a URL (or maybe a path) for the readwrite transport.
 
1890
 
 
1891
        This will either be backed by '.' or to an equivalent non-file based
 
1892
        facility.
 
1893
        relpath provides for clients to get a path relative to the base url.
 
1894
        These should only be downwards relative, not upwards.
 
1895
        """
 
1896
        base = self.get_server().get_url()
 
1897
        return self._adjust_url(base, relpath)
 
1898
 
 
1899
    def get_vfs_only_url(self, relpath=None):
 
1900
        """Get a URL (or maybe a path for the plain old vfs transport.
 
1901
 
 
1902
        This will never be a smart protocol.  It always has all the
 
1903
        capabilities of the local filesystem, but it might actually be a
 
1904
        MemoryTransport or some other similar virtual filesystem.
 
1905
 
 
1906
        This is the backing transport (if any) of the server returned by
 
1907
        get_url and get_readonly_url.
 
1908
 
 
1909
        :param relpath: provides for clients to get a path relative to the base
 
1910
            url.  These should only be downwards relative, not upwards.
 
1911
        :return: A URL
 
1912
        """
 
1913
        base = self.get_vfs_only_server().get_url()
 
1914
        return self._adjust_url(base, relpath)
 
1915
 
 
1916
    def _create_safety_net(self):
 
1917
        """Make a fake bzr directory.
 
1918
 
 
1919
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
1920
        real branch.
 
1921
        """
 
1922
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
1923
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
1924
 
 
1925
    def _check_safety_net(self):
 
1926
        """Check that the safety .bzr directory have not been touched.
 
1927
 
 
1928
        _make_test_root have created a .bzr directory to prevent tests from
 
1929
        propagating. This method ensures than a test did not leaked.
 
1930
        """
 
1931
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
1932
        wt = workingtree.WorkingTree.open(root)
 
1933
        last_rev = wt.last_revision()
 
1934
        if last_rev != 'null:':
 
1935
            # The current test have modified the /bzr directory, we need to
 
1936
            # recreate a new one or all the followng tests will fail.
 
1937
            # If you need to inspect its content uncomment the following line
 
1938
            # import pdb; pdb.set_trace()
 
1939
            _rmtree_temp_dir(root + '/.bzr')
 
1940
            self._create_safety_net()
 
1941
            raise AssertionError('%s/.bzr should not be modified' % root)
 
1942
 
 
1943
    def _make_test_root(self):
 
1944
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
1945
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
1946
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
1947
 
 
1948
            self._create_safety_net()
 
1949
 
 
1950
            # The same directory is used by all tests, and we're not
 
1951
            # specifically told when all tests are finished.  This will do.
 
1952
            atexit.register(_rmtree_temp_dir, root)
 
1953
 
 
1954
        self.addCleanup(self._check_safety_net)
 
1955
 
 
1956
    def makeAndChdirToTestDir(self):
 
1957
        """Create a temporary directories for this one test.
 
1958
        
 
1959
        This must set self.test_home_dir and self.test_dir and chdir to
 
1960
        self.test_dir.
 
1961
        
 
1962
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
1963
        """
 
1964
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
1965
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
1966
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
1967
        
 
1968
    def make_branch(self, relpath, format=None):
 
1969
        """Create a branch on the transport at relpath."""
 
1970
        repo = self.make_repository(relpath, format=format)
 
1971
        return repo.bzrdir.create_branch()
 
1972
 
 
1973
    def make_bzrdir(self, relpath, format=None):
 
1974
        try:
 
1975
            # might be a relative or absolute path
 
1976
            maybe_a_url = self.get_url(relpath)
 
1977
            segments = maybe_a_url.rsplit('/', 1)
 
1978
            t = get_transport(maybe_a_url)
 
1979
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
1980
                t.ensure_base()
 
1981
            if format is None:
 
1982
                format = 'default'
 
1983
            if isinstance(format, basestring):
 
1984
                format = bzrdir.format_registry.make_bzrdir(format)
 
1985
            return format.initialize_on_transport(t)
 
1986
        except errors.UninitializableFormat:
 
1987
            raise TestSkipped("Format %s is not initializable." % format)
 
1988
 
 
1989
    def make_repository(self, relpath, shared=False, format=None):
 
1990
        """Create a repository on our default transport at relpath.
 
1991
        
 
1992
        Note that relpath must be a relative path, not a full url.
 
1993
        """
 
1994
        # FIXME: If you create a remoterepository this returns the underlying
 
1995
        # real format, which is incorrect.  Actually we should make sure that 
 
1996
        # RemoteBzrDir returns a RemoteRepository.
 
1997
        # maybe  mbp 20070410
 
1998
        made_control = self.make_bzrdir(relpath, format=format)
 
1999
        return made_control.create_repository(shared=shared)
 
2000
 
 
2001
    def make_branch_and_memory_tree(self, relpath, format=None):
 
2002
        """Create a branch on the default transport and a MemoryTree for it."""
 
2003
        b = self.make_branch(relpath, format=format)
 
2004
        return memorytree.MemoryTree.create_on_branch(b)
 
2005
 
 
2006
    def overrideEnvironmentForTesting(self):
 
2007
        os.environ['HOME'] = self.test_home_dir
 
2008
        os.environ['BZR_HOME'] = self.test_home_dir
 
2009
        
 
2010
    def setUp(self):
 
2011
        super(TestCaseWithMemoryTransport, self).setUp()
 
2012
        self._make_test_root()
 
2013
        _currentdir = os.getcwdu()
 
2014
        def _leaveDirectory():
 
2015
            os.chdir(_currentdir)
 
2016
        self.addCleanup(_leaveDirectory)
 
2017
        self.makeAndChdirToTestDir()
 
2018
        self.overrideEnvironmentForTesting()
 
2019
        self.__readonly_server = None
 
2020
        self.__server = None
 
2021
        self.reduceLockdirTimeout()
 
2022
 
 
2023
     
 
2024
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
2025
    """Derived class that runs a test within a temporary directory.
 
2026
 
 
2027
    This is useful for tests that need to create a branch, etc.
 
2028
 
 
2029
    The directory is created in a slightly complex way: for each
 
2030
    Python invocation, a new temporary top-level directory is created.
 
2031
    All test cases create their own directory within that.  If the
 
2032
    tests complete successfully, the directory is removed.
 
2033
 
 
2034
    :ivar test_base_dir: The path of the top-level directory for this 
 
2035
    test, which contains a home directory and a work directory.
 
2036
 
 
2037
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
2038
    which is used as $HOME for this test.
 
2039
 
 
2040
    :ivar test_dir: A directory under test_base_dir used as the current
 
2041
    directory when the test proper is run.
 
2042
    """
 
2043
 
 
2044
    OVERRIDE_PYTHON = 'python'
 
2045
 
 
2046
    def check_file_contents(self, filename, expect):
 
2047
        self.log("check contents of file %s" % filename)
 
2048
        contents = file(filename, 'r').read()
 
2049
        if contents != expect:
 
2050
            self.log("expected: %r" % expect)
 
2051
            self.log("actually: %r" % contents)
 
2052
            self.fail("contents of %s not as expected" % filename)
 
2053
 
 
2054
    def makeAndChdirToTestDir(self):
 
2055
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2056
        
 
2057
        For TestCaseInTempDir we create a temporary directory based on the test
 
2058
        name and then create two subdirs - test and home under it.
 
2059
        """
 
2060
        # create a directory within the top level test directory
 
2061
        candidate_dir = osutils.mkdtemp(dir=self.TEST_ROOT)
 
2062
        # now create test and home directories within this dir
 
2063
        self.test_base_dir = candidate_dir
 
2064
        self.test_home_dir = self.test_base_dir + '/home'
 
2065
        os.mkdir(self.test_home_dir)
 
2066
        self.test_dir = self.test_base_dir + '/work'
 
2067
        os.mkdir(self.test_dir)
 
2068
        os.chdir(self.test_dir)
 
2069
        # put name of test inside
 
2070
        f = file(self.test_base_dir + '/name', 'w')
 
2071
        try:
 
2072
            f.write(self.id())
 
2073
        finally:
 
2074
            f.close()
 
2075
        self.addCleanup(self.deleteTestDir)
 
2076
 
 
2077
    def deleteTestDir(self):
 
2078
        os.chdir(self.TEST_ROOT)
 
2079
        _rmtree_temp_dir(self.test_base_dir)
 
2080
 
 
2081
    def build_tree(self, shape, line_endings='binary', transport=None):
 
2082
        """Build a test tree according to a pattern.
 
2083
 
 
2084
        shape is a sequence of file specifications.  If the final
 
2085
        character is '/', a directory is created.
 
2086
 
 
2087
        This assumes that all the elements in the tree being built are new.
 
2088
 
 
2089
        This doesn't add anything to a branch.
 
2090
 
 
2091
        :type shape:    list or tuple.
 
2092
        :param line_endings: Either 'binary' or 'native'
 
2093
            in binary mode, exact contents are written in native mode, the
 
2094
            line endings match the default platform endings.
 
2095
        :param transport: A transport to write to, for building trees on VFS's.
 
2096
            If the transport is readonly or None, "." is opened automatically.
 
2097
        :return: None
 
2098
        """
 
2099
        if type(shape) not in (list, tuple):
 
2100
            raise AssertionError("Parameter 'shape' should be "
 
2101
                "a list or a tuple. Got %r instead" % (shape,))
 
2102
        # It's OK to just create them using forward slashes on windows.
 
2103
        if transport is None or transport.is_readonly():
 
2104
            transport = get_transport(".")
 
2105
        for name in shape:
 
2106
            self.assert_(isinstance(name, basestring))
 
2107
            if name[-1] == '/':
 
2108
                transport.mkdir(urlutils.escape(name[:-1]))
 
2109
            else:
 
2110
                if line_endings == 'binary':
 
2111
                    end = '\n'
 
2112
                elif line_endings == 'native':
 
2113
                    end = os.linesep
 
2114
                else:
 
2115
                    raise errors.BzrError(
 
2116
                        'Invalid line ending request %r' % line_endings)
 
2117
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
2118
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2119
 
 
2120
    def build_tree_contents(self, shape):
 
2121
        build_tree_contents(shape)
 
2122
 
 
2123
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2124
        """Assert whether path or paths are in the WorkingTree"""
 
2125
        if tree is None:
 
2126
            tree = workingtree.WorkingTree.open(root_path)
 
2127
        if not isinstance(path, basestring):
 
2128
            for p in path:
 
2129
                self.assertInWorkingTree(p,tree=tree)
 
2130
        else:
 
2131
            self.assertIsNot(tree.path2id(path), None,
 
2132
                path+' not in working tree.')
 
2133
 
 
2134
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2135
        """Assert whether path or paths are not in the WorkingTree"""
 
2136
        if tree is None:
 
2137
            tree = workingtree.WorkingTree.open(root_path)
 
2138
        if not isinstance(path, basestring):
 
2139
            for p in path:
 
2140
                self.assertNotInWorkingTree(p,tree=tree)
 
2141
        else:
 
2142
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2143
 
 
2144
 
 
2145
class TestCaseWithTransport(TestCaseInTempDir):
 
2146
    """A test case that provides get_url and get_readonly_url facilities.
 
2147
 
 
2148
    These back onto two transport servers, one for readonly access and one for
 
2149
    read write access.
 
2150
 
 
2151
    If no explicit class is provided for readonly access, a
 
2152
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2153
    based read write transports.
 
2154
 
 
2155
    If an explicit class is provided for readonly access, that server and the 
 
2156
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2157
    """
 
2158
 
 
2159
    def get_vfs_only_server(self):
 
2160
        """See TestCaseWithMemoryTransport.
 
2161
 
 
2162
        This is useful for some tests with specific servers that need
 
2163
        diagnostics.
 
2164
        """
 
2165
        if self.__vfs_server is None:
 
2166
            self.__vfs_server = self.vfs_transport_factory()
 
2167
            self.__vfs_server.setUp()
 
2168
            self.addCleanup(self.__vfs_server.tearDown)
 
2169
        return self.__vfs_server
 
2170
 
 
2171
    def make_branch_and_tree(self, relpath, format=None):
 
2172
        """Create a branch on the transport and a tree locally.
 
2173
 
 
2174
        If the transport is not a LocalTransport, the Tree can't be created on
 
2175
        the transport.  In that case if the vfs_transport_factory is
 
2176
        LocalURLServer the working tree is created in the local
 
2177
        directory backing the transport, and the returned tree's branch and
 
2178
        repository will also be accessed locally. Otherwise a lightweight
 
2179
        checkout is created and returned.
 
2180
 
 
2181
        :param format: The BzrDirFormat.
 
2182
        :returns: the WorkingTree.
 
2183
        """
 
2184
        # TODO: always use the local disk path for the working tree,
 
2185
        # this obviously requires a format that supports branch references
 
2186
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2187
        # RBC 20060208
 
2188
        b = self.make_branch(relpath, format=format)
 
2189
        try:
 
2190
            return b.bzrdir.create_workingtree()
 
2191
        except errors.NotLocalUrl:
 
2192
            # We can only make working trees locally at the moment.  If the
 
2193
            # transport can't support them, then we keep the non-disk-backed
 
2194
            # branch and create a local checkout.
 
2195
            if self.vfs_transport_factory is LocalURLServer:
 
2196
                # the branch is colocated on disk, we cannot create a checkout.
 
2197
                # hopefully callers will expect this.
 
2198
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2199
                return local_controldir.create_workingtree()
 
2200
            else:
 
2201
                return b.create_checkout(relpath, lightweight=True)
 
2202
 
 
2203
    def assertIsDirectory(self, relpath, transport):
 
2204
        """Assert that relpath within transport is a directory.
 
2205
 
 
2206
        This may not be possible on all transports; in that case it propagates
 
2207
        a TransportNotPossible.
 
2208
        """
 
2209
        try:
 
2210
            mode = transport.stat(relpath).st_mode
 
2211
        except errors.NoSuchFile:
 
2212
            self.fail("path %s is not a directory; no such file"
 
2213
                      % (relpath))
 
2214
        if not stat.S_ISDIR(mode):
 
2215
            self.fail("path %s is not a directory; has mode %#o"
 
2216
                      % (relpath, mode))
 
2217
 
 
2218
    def assertTreesEqual(self, left, right):
 
2219
        """Check that left and right have the same content and properties."""
 
2220
        # we use a tree delta to check for equality of the content, and we
 
2221
        # manually check for equality of other things such as the parents list.
 
2222
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2223
        differences = left.changes_from(right)
 
2224
        self.assertFalse(differences.has_changed(),
 
2225
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2226
 
 
2227
    def setUp(self):
 
2228
        super(TestCaseWithTransport, self).setUp()
 
2229
        self.__vfs_server = None
 
2230
 
 
2231
 
 
2232
class ChrootedTestCase(TestCaseWithTransport):
 
2233
    """A support class that provides readonly urls outside the local namespace.
 
2234
 
 
2235
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2236
    is then we are chrooted already, if it is not then an HttpServer is used
 
2237
    for readonly urls.
 
2238
 
 
2239
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2240
                       be used without needed to redo it when a different 
 
2241
                       subclass is in use ?
 
2242
    """
 
2243
 
 
2244
    def setUp(self):
 
2245
        super(ChrootedTestCase, self).setUp()
 
2246
        if not self.vfs_transport_factory == MemoryServer:
 
2247
            self.transport_readonly_server = HttpServer
 
2248
 
 
2249
 
 
2250
def condition_id_re(pattern):
 
2251
    """Create a condition filter which performs a re check on a test's id.
 
2252
    
 
2253
    :param pattern: A regular expression string.
 
2254
    :return: A callable that returns True if the re matches.
 
2255
    """
 
2256
    filter_re = re.compile(pattern)
 
2257
    def condition(test):
 
2258
        test_id = test.id()
 
2259
        return filter_re.search(test_id)
 
2260
    return condition
 
2261
 
 
2262
 
 
2263
def condition_isinstance(klass_or_klass_list):
 
2264
    """Create a condition filter which returns isinstance(param, klass).
 
2265
    
 
2266
    :return: A callable which when called with one parameter obj return the
 
2267
        result of isinstance(obj, klass_or_klass_list).
 
2268
    """
 
2269
    def condition(obj):
 
2270
        return isinstance(obj, klass_or_klass_list)
 
2271
    return condition
 
2272
 
 
2273
 
 
2274
def condition_id_in_list(id_list):
 
2275
    """Create a condition filter which verify that test's id in a list.
 
2276
    
 
2277
    :param id_list: A TestIdList object.
 
2278
    :return: A callable that returns True if the test's id appears in the list.
 
2279
    """
 
2280
    def condition(test):
 
2281
        return id_list.includes(test.id())
 
2282
    return condition
 
2283
 
 
2284
 
 
2285
def exclude_tests_by_condition(suite, condition):
 
2286
    """Create a test suite which excludes some tests from suite.
 
2287
 
 
2288
    :param suite: The suite to get tests from.
 
2289
    :param condition: A callable whose result evaluates True when called with a
 
2290
        test case which should be excluded from the result.
 
2291
    :return: A suite which contains the tests found in suite that fail
 
2292
        condition.
 
2293
    """
 
2294
    result = []
 
2295
    for test in iter_suite_tests(suite):
 
2296
        if not condition(test):
 
2297
            result.append(test)
 
2298
    return TestUtil.TestSuite(result)
 
2299
 
 
2300
 
 
2301
def filter_suite_by_condition(suite, condition):
 
2302
    """Create a test suite by filtering another one.
 
2303
    
 
2304
    :param suite: The source suite.
 
2305
    :param condition: A callable whose result evaluates True when called with a
 
2306
        test case which should be included in the result.
 
2307
    :return: A suite which contains the tests found in suite that pass
 
2308
        condition.
 
2309
    """ 
 
2310
    result = []
 
2311
    for test in iter_suite_tests(suite):
 
2312
        if condition(test):
 
2313
            result.append(test)
 
2314
    return TestUtil.TestSuite(result)
 
2315
 
 
2316
 
 
2317
def filter_suite_by_re(suite, pattern):
 
2318
    """Create a test suite by filtering another one.
 
2319
    
 
2320
    :param suite:           the source suite
 
2321
    :param pattern:         pattern that names must match
 
2322
    :returns: the newly created suite
 
2323
    """ 
 
2324
    condition = condition_id_re(pattern)
 
2325
    result_suite = filter_suite_by_condition(suite, condition)
 
2326
    return result_suite
 
2327
 
 
2328
 
 
2329
def filter_suite_by_id_list(suite, test_id_list):
 
2330
    """Create a test suite by filtering another one.
 
2331
 
 
2332
    :param suite: The source suite.
 
2333
    :param test_id_list: A list of the test ids to keep as strings.
 
2334
    :returns: the newly created suite
 
2335
    """
 
2336
    condition = condition_id_in_list(test_id_list)
 
2337
    result_suite = filter_suite_by_condition(suite, condition)
 
2338
    return result_suite
 
2339
 
 
2340
 
 
2341
def exclude_tests_by_re(suite, pattern):
 
2342
    """Create a test suite which excludes some tests from suite.
 
2343
 
 
2344
    :param suite: The suite to get tests from.
 
2345
    :param pattern: A regular expression string. Test ids that match this
 
2346
        pattern will be excluded from the result.
 
2347
    :return: A TestSuite that contains all the tests from suite without the
 
2348
        tests that matched pattern. The order of tests is the same as it was in
 
2349
        suite.
 
2350
    """
 
2351
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
2352
 
 
2353
 
 
2354
def preserve_input(something):
 
2355
    """A helper for performing test suite transformation chains.
 
2356
 
 
2357
    :param something: Anything you want to preserve.
 
2358
    :return: Something.
 
2359
    """
 
2360
    return something
 
2361
 
 
2362
 
 
2363
def randomize_suite(suite):
 
2364
    """Return a new TestSuite with suite's tests in random order.
 
2365
    
 
2366
    The tests in the input suite are flattened into a single suite in order to
 
2367
    accomplish this. Any nested TestSuites are removed to provide global
 
2368
    randomness.
 
2369
    """
 
2370
    tests = list(iter_suite_tests(suite))
 
2371
    random.shuffle(tests)
 
2372
    return TestUtil.TestSuite(tests)
 
2373
 
 
2374
 
 
2375
def split_suite_by_re(suite, pattern):
 
2376
    """Split a test suite into two by a regular expression.
 
2377
    
 
2378
    :param suite: The suite to split.
 
2379
    :param pattern: A regular expression string. Test ids that match this
 
2380
        pattern will be in the first test suite returned, and the others in the
 
2381
        second test suite returned.
 
2382
    :return: A tuple of two test suites, where the first contains tests from
 
2383
        suite matching pattern, and the second contains the remainder from
 
2384
        suite. The order within each output suite is the same as it was in
 
2385
        suite.
 
2386
    """ 
 
2387
    matched = []
 
2388
    did_not_match = []
 
2389
    filter_re = re.compile(pattern)
 
2390
    for test in iter_suite_tests(suite):
 
2391
        test_id = test.id()
 
2392
        if filter_re.search(test_id):
 
2393
            matched.append(test)
 
2394
        else:
 
2395
            did_not_match.append(test)
 
2396
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
2397
 
 
2398
 
 
2399
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2400
              stop_on_failure=False,
 
2401
              transport=None, lsprof_timed=None, bench_history=None,
 
2402
              matching_tests_first=None,
 
2403
              list_only=False,
 
2404
              random_seed=None,
 
2405
              exclude_pattern=None,
 
2406
              strict=False):
 
2407
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2408
    if verbose:
 
2409
        verbosity = 2
 
2410
    else:
 
2411
        verbosity = 1
 
2412
    runner = TextTestRunner(stream=sys.stdout,
 
2413
                            descriptions=0,
 
2414
                            verbosity=verbosity,
 
2415
                            bench_history=bench_history,
 
2416
                            list_only=list_only,
 
2417
                            )
 
2418
    runner.stop_on_failure=stop_on_failure
 
2419
    # Initialise the random number generator and display the seed used.
 
2420
    # We convert the seed to a long to make it reuseable across invocations.
 
2421
    random_order = False
 
2422
    if random_seed is not None:
 
2423
        random_order = True
 
2424
        if random_seed == "now":
 
2425
            random_seed = long(time.time())
 
2426
        else:
 
2427
            # Convert the seed to a long if we can
 
2428
            try:
 
2429
                random_seed = long(random_seed)
 
2430
            except:
 
2431
                pass
 
2432
        runner.stream.writeln("Randomizing test order using seed %s\n" %
 
2433
            (random_seed))
 
2434
        random.seed(random_seed)
 
2435
    # Customise the list of tests if requested
 
2436
    if exclude_pattern is not None:
 
2437
        suite = exclude_tests_by_re(suite, exclude_pattern)
 
2438
    if random_order:
 
2439
        order_changer = randomize_suite
 
2440
    else:
 
2441
        order_changer = preserve_input
 
2442
    if pattern != '.*' or random_order:
 
2443
        if matching_tests_first:
 
2444
            suites = map(order_changer, split_suite_by_re(suite, pattern))
 
2445
            suite = TestUtil.TestSuite(suites)
 
2446
        else:
 
2447
            suite = order_changer(filter_suite_by_re(suite, pattern))
 
2448
 
 
2449
    result = runner.run(suite)
 
2450
 
 
2451
    if strict:
 
2452
        return result.wasStrictlySuccessful()
 
2453
 
 
2454
    return result.wasSuccessful()
 
2455
 
 
2456
 
 
2457
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
2458
             transport=None,
 
2459
             test_suite_factory=None,
 
2460
             lsprof_timed=None,
 
2461
             bench_history=None,
 
2462
             matching_tests_first=None,
 
2463
             list_only=False,
 
2464
             random_seed=None,
 
2465
             exclude_pattern=None,
 
2466
             strict=False,
 
2467
             load_list=None,
 
2468
             ):
 
2469
    """Run the whole test suite under the enhanced runner"""
 
2470
    # XXX: Very ugly way to do this...
 
2471
    # Disable warning about old formats because we don't want it to disturb
 
2472
    # any blackbox tests.
 
2473
    from bzrlib import repository
 
2474
    repository._deprecation_warning_done = True
 
2475
 
 
2476
    global default_transport
 
2477
    if transport is None:
 
2478
        transport = default_transport
 
2479
    old_transport = default_transport
 
2480
    default_transport = transport
 
2481
    try:
 
2482
        if load_list is None:
 
2483
            keep_only = None
 
2484
        else:
 
2485
            keep_only = load_test_id_list(load_list)
 
2486
        if test_suite_factory is None:
 
2487
            suite = test_suite(keep_only)
 
2488
        else:
 
2489
            suite = test_suite_factory()
 
2490
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
2491
                     stop_on_failure=stop_on_failure,
 
2492
                     transport=transport,
 
2493
                     lsprof_timed=lsprof_timed,
 
2494
                     bench_history=bench_history,
 
2495
                     matching_tests_first=matching_tests_first,
 
2496
                     list_only=list_only,
 
2497
                     random_seed=random_seed,
 
2498
                     exclude_pattern=exclude_pattern,
 
2499
                     strict=strict)
 
2500
    finally:
 
2501
        default_transport = old_transport
 
2502
 
 
2503
 
 
2504
def load_test_id_list(file_name):
 
2505
    """Load a test id list from a text file.
 
2506
 
 
2507
    The format is one test id by line.  No special care is taken to impose
 
2508
    strict rules, these test ids are used to filter the test suite so a test id
 
2509
    that do not match an existing test will do no harm. This allows user to add
 
2510
    comments, leave blank lines, etc.
 
2511
    """
 
2512
    test_list = []
 
2513
    try:
 
2514
        ftest = open(file_name, 'rt')
 
2515
    except IOError, e:
 
2516
        if e.errno != errno.ENOENT:
 
2517
            raise
 
2518
        else:
 
2519
            raise errors.NoSuchFile(file_name)
 
2520
 
 
2521
    for test_name in ftest.readlines():
 
2522
        test_list.append(test_name.strip())
 
2523
    ftest.close()
 
2524
    return test_list
 
2525
 
 
2526
 
 
2527
def suite_matches_id_list(test_suite, id_list):
 
2528
    """Warns about tests not appearing or appearing more than once.
 
2529
 
 
2530
    :param test_suite: A TestSuite object.
 
2531
    :param test_id_list: The list of test ids that should be found in 
 
2532
         test_suite.
 
2533
 
 
2534
    :return: (absents, duplicates) absents is a list containing the test found
 
2535
        in id_list but not in test_suite, duplicates is a list containing the
 
2536
        test found multiple times in test_suite.
 
2537
 
 
2538
    When using a prefined test id list, it may occurs that some tests do not
 
2539
    exist anymore or that some tests use the same id. This function warns the
 
2540
    tester about potential problems in his workflow (test lists are volatile)
 
2541
    or in the test suite itself (using the same id for several tests does not
 
2542
    help to localize defects).
 
2543
    """
 
2544
    # Build a dict counting id occurrences
 
2545
    tests = dict()
 
2546
    for test in iter_suite_tests(test_suite):
 
2547
        id = test.id()
 
2548
        tests[id] = tests.get(id, 0) + 1
 
2549
 
 
2550
    not_found = []
 
2551
    duplicates = []
 
2552
    for id in id_list:
 
2553
        occurs = tests.get(id, 0)
 
2554
        if not occurs:
 
2555
            not_found.append(id)
 
2556
        elif occurs > 1:
 
2557
            duplicates.append(id)
 
2558
 
 
2559
    return not_found, duplicates
 
2560
 
 
2561
 
 
2562
class TestIdList(object):
 
2563
    """Test id list to filter a test suite.
 
2564
 
 
2565
    Relying on the assumption that test ids are built as:
 
2566
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
2567
    notation, this class offers methods to :
 
2568
    - avoid building a test suite for modules not refered to in the test list,
 
2569
    - keep only the tests listed from the module test suite.
 
2570
    """
 
2571
 
 
2572
    def __init__(self, test_id_list):
 
2573
        # When a test suite needs to be filtered against us we compare test ids
 
2574
        # for equality, so a simple dict offers a quick and simple solution.
 
2575
        self.tests = dict().fromkeys(test_id_list, True)
 
2576
 
 
2577
        # While unittest.TestCase have ids like:
 
2578
        # <module>.<class>.<method>[(<param+)],
 
2579
        # doctest.DocTestCase can have ids like:
 
2580
        # <module>
 
2581
        # <module>.<class>
 
2582
        # <module>.<function>
 
2583
        # <module>.<class>.<method>
 
2584
 
 
2585
        # Since we can't predict a test class from its name only, we settle on
 
2586
        # a simple constraint: a test id always begins with its module name.
 
2587
 
 
2588
        modules = {}
 
2589
        for test_id in test_id_list:
 
2590
            parts = test_id.split('.')
 
2591
            mod_name = parts.pop(0)
 
2592
            modules[mod_name] = True
 
2593
            for part in parts:
 
2594
                mod_name += '.' + part
 
2595
                modules[mod_name] = True
 
2596
        self.modules = modules
 
2597
 
 
2598
    def refers_to(self, module_name):
 
2599
        """Is there tests for the module or one of its sub modules."""
 
2600
        return self.modules.has_key(module_name)
 
2601
 
 
2602
    def includes(self, test_id):
 
2603
        return self.tests.has_key(test_id)
 
2604
 
 
2605
 
 
2606
def test_suite(keep_only=None):
 
2607
    """Build and return TestSuite for the whole of bzrlib.
 
2608
 
 
2609
    :param keep_only: A list of test ids limiting the suite returned.
 
2610
 
 
2611
    This function can be replaced if you need to change the default test
 
2612
    suite on a global basis, but it is not encouraged.
 
2613
    """
 
2614
    testmod_names = [
 
2615
                   'bzrlib.util.tests.test_bencode',
 
2616
                   'bzrlib.tests.test__dirstate_helpers',
 
2617
                   'bzrlib.tests.test_ancestry',
 
2618
                   'bzrlib.tests.test_annotate',
 
2619
                   'bzrlib.tests.test_api',
 
2620
                   'bzrlib.tests.test_atomicfile',
 
2621
                   'bzrlib.tests.test_bad_files',
 
2622
                   'bzrlib.tests.test_bisect_multi',
 
2623
                   'bzrlib.tests.test_branch',
 
2624
                   'bzrlib.tests.test_branchbuilder',
 
2625
                   'bzrlib.tests.test_bugtracker',
 
2626
                   'bzrlib.tests.test_bundle',
 
2627
                   'bzrlib.tests.test_bzrdir',
 
2628
                   'bzrlib.tests.test_cache_utf8',
 
2629
                   'bzrlib.tests.test_commands',
 
2630
                   'bzrlib.tests.test_commit',
 
2631
                   'bzrlib.tests.test_commit_merge',
 
2632
                   'bzrlib.tests.test_config',
 
2633
                   'bzrlib.tests.test_conflicts',
 
2634
                   'bzrlib.tests.test_counted_lock',
 
2635
                   'bzrlib.tests.test_decorators',
 
2636
                   'bzrlib.tests.test_delta',
 
2637
                   'bzrlib.tests.test_deprecated_graph',
 
2638
                   'bzrlib.tests.test_diff',
 
2639
                   'bzrlib.tests.test_dirstate',
 
2640
                   'bzrlib.tests.test_directory_service',
 
2641
                   'bzrlib.tests.test_email_message',
 
2642
                   'bzrlib.tests.test_errors',
 
2643
                   'bzrlib.tests.test_escaped_store',
 
2644
                   'bzrlib.tests.test_extract',
 
2645
                   'bzrlib.tests.test_fetch',
 
2646
                   'bzrlib.tests.test_ftp_transport',
 
2647
                   'bzrlib.tests.test_generate_docs',
 
2648
                   'bzrlib.tests.test_generate_ids',
 
2649
                   'bzrlib.tests.test_globbing',
 
2650
                   'bzrlib.tests.test_gpg',
 
2651
                   'bzrlib.tests.test_graph',
 
2652
                   'bzrlib.tests.test_hashcache',
 
2653
                   'bzrlib.tests.test_help',
 
2654
                   'bzrlib.tests.test_hooks',
 
2655
                   'bzrlib.tests.test_http',
 
2656
                   'bzrlib.tests.test_http_implementations',
 
2657
                   'bzrlib.tests.test_http_response',
 
2658
                   'bzrlib.tests.test_https_ca_bundle',
 
2659
                   'bzrlib.tests.test_identitymap',
 
2660
                   'bzrlib.tests.test_ignores',
 
2661
                   'bzrlib.tests.test_index',
 
2662
                   'bzrlib.tests.test_info',
 
2663
                   'bzrlib.tests.test_inv',
 
2664
                   'bzrlib.tests.test_knit',
 
2665
                   'bzrlib.tests.test_lazy_import',
 
2666
                   'bzrlib.tests.test_lazy_regex',
 
2667
                   'bzrlib.tests.test_lockdir',
 
2668
                   'bzrlib.tests.test_lockable_files',
 
2669
                   'bzrlib.tests.test_log',
 
2670
                   'bzrlib.tests.test_lsprof',
 
2671
                   'bzrlib.tests.test_lru_cache',
 
2672
                   'bzrlib.tests.test_mail_client',
 
2673
                   'bzrlib.tests.test_memorytree',
 
2674
                   'bzrlib.tests.test_merge',
 
2675
                   'bzrlib.tests.test_merge3',
 
2676
                   'bzrlib.tests.test_merge_core',
 
2677
                   'bzrlib.tests.test_merge_directive',
 
2678
                   'bzrlib.tests.test_missing',
 
2679
                   'bzrlib.tests.test_msgeditor',
 
2680
                   'bzrlib.tests.test_multiparent',
 
2681
                   'bzrlib.tests.test_mutabletree',
 
2682
                   'bzrlib.tests.test_nonascii',
 
2683
                   'bzrlib.tests.test_options',
 
2684
                   'bzrlib.tests.test_osutils',
 
2685
                   'bzrlib.tests.test_osutils_encodings',
 
2686
                   'bzrlib.tests.test_pack',
 
2687
                   'bzrlib.tests.test_patch',
 
2688
                   'bzrlib.tests.test_patches',
 
2689
                   'bzrlib.tests.test_permissions',
 
2690
                   'bzrlib.tests.test_plugins',
 
2691
                   'bzrlib.tests.test_progress',
 
2692
                   'bzrlib.tests.test_reconfigure',
 
2693
                   'bzrlib.tests.test_reconcile',
 
2694
                   'bzrlib.tests.test_registry',
 
2695
                   'bzrlib.tests.test_remote',
 
2696
                   'bzrlib.tests.test_repository',
 
2697
                   'bzrlib.tests.test_revert',
 
2698
                   'bzrlib.tests.test_revision',
 
2699
                   'bzrlib.tests.test_revisionspec',
 
2700
                   'bzrlib.tests.test_revisiontree',
 
2701
                   'bzrlib.tests.test_rio',
 
2702
                   'bzrlib.tests.test_sampler',
 
2703
                   'bzrlib.tests.test_selftest',
 
2704
                   'bzrlib.tests.test_setup',
 
2705
                   'bzrlib.tests.test_sftp_transport',
 
2706
                   'bzrlib.tests.test_smart',
 
2707
                   'bzrlib.tests.test_smart_add',
 
2708
                   'bzrlib.tests.test_smart_transport',
 
2709
                   'bzrlib.tests.test_smtp_connection',
 
2710
                   'bzrlib.tests.test_source',
 
2711
                   'bzrlib.tests.test_ssh_transport',
 
2712
                   'bzrlib.tests.test_status',
 
2713
                   'bzrlib.tests.test_store',
 
2714
                   'bzrlib.tests.test_strace',
 
2715
                   'bzrlib.tests.test_subsume',
 
2716
                   'bzrlib.tests.test_switch',
 
2717
                   'bzrlib.tests.test_symbol_versioning',
 
2718
                   'bzrlib.tests.test_tag',
 
2719
                   'bzrlib.tests.test_testament',
 
2720
                   'bzrlib.tests.test_textfile',
 
2721
                   'bzrlib.tests.test_textmerge',
 
2722
                   'bzrlib.tests.test_timestamp',
 
2723
                   'bzrlib.tests.test_trace',
 
2724
                   'bzrlib.tests.test_transactions',
 
2725
                   'bzrlib.tests.test_transform',
 
2726
                   'bzrlib.tests.test_transport',
 
2727
                   'bzrlib.tests.test_tree',
 
2728
                   'bzrlib.tests.test_treebuilder',
 
2729
                   'bzrlib.tests.test_tsort',
 
2730
                   'bzrlib.tests.test_tuned_gzip',
 
2731
                   'bzrlib.tests.test_ui',
 
2732
                   'bzrlib.tests.test_uncommit',
 
2733
                   'bzrlib.tests.test_upgrade',
 
2734
                   'bzrlib.tests.test_urlutils',
 
2735
                   'bzrlib.tests.test_versionedfile',
 
2736
                   'bzrlib.tests.test_version',
 
2737
                   'bzrlib.tests.test_version_info',
 
2738
                   'bzrlib.tests.test_weave',
 
2739
                   'bzrlib.tests.test_whitebox',
 
2740
                   'bzrlib.tests.test_win32utils',
 
2741
                   'bzrlib.tests.test_workingtree',
 
2742
                   'bzrlib.tests.test_workingtree_4',
 
2743
                   'bzrlib.tests.test_wsgi',
 
2744
                   'bzrlib.tests.test_xml',
 
2745
                   ]
 
2746
    test_transport_implementations = [
 
2747
        'bzrlib.tests.test_transport_implementations',
 
2748
        'bzrlib.tests.test_read_bundle',
 
2749
        ]
 
2750
    loader = TestUtil.TestLoader()
 
2751
 
 
2752
    if keep_only is None:
 
2753
        loader = TestUtil.TestLoader()
 
2754
    else:
 
2755
        id_filter = TestIdList(keep_only)
 
2756
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
2757
    suite = loader.suiteClass()
 
2758
 
 
2759
    # modules building their suite with loadTestsFromModuleNames
 
2760
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
2761
 
 
2762
    # modules adapted for transport implementations
 
2763
    from bzrlib.tests.test_transport_implementations import TransportTestProviderAdapter
 
2764
    adapter = TransportTestProviderAdapter()
 
2765
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
2766
 
 
2767
    # modules defining their own test_suite()
 
2768
    for package in [p for p in packages_to_test()
 
2769
                    if (keep_only is None
 
2770
                        or id_filter.refers_to(p.__name__))]:
 
2771
        pack_suite = package.test_suite()
 
2772
        suite.addTest(pack_suite)
 
2773
 
 
2774
    modules_to_doctest = [
 
2775
        'bzrlib',
 
2776
        'bzrlib.errors',
 
2777
        'bzrlib.export',
 
2778
        'bzrlib.inventory',
 
2779
        'bzrlib.iterablefile',
 
2780
        'bzrlib.lockdir',
 
2781
        'bzrlib.merge3',
 
2782
        'bzrlib.option',
 
2783
        'bzrlib.store',
 
2784
        'bzrlib.symbol_versioning',
 
2785
        'bzrlib.tests',
 
2786
        'bzrlib.timestamp',
 
2787
        'bzrlib.version_info_formats.format_custom',
 
2788
        ]
 
2789
 
 
2790
    for mod in modules_to_doctest:
 
2791
        if not (keep_only is None or id_filter.refers_to(mod)):
 
2792
            # No tests to keep here, move along
 
2793
            continue
 
2794
        try:
 
2795
            doc_suite = doctest.DocTestSuite(mod)
 
2796
        except ValueError, e:
 
2797
            print '**failed to get doctest for: %s\n%s' % (mod, e)
 
2798
            raise
 
2799
        suite.addTest(doc_suite)
 
2800
 
 
2801
    default_encoding = sys.getdefaultencoding()
 
2802
    for name, plugin in bzrlib.plugin.plugins().items():
 
2803
        if keep_only is not None:
 
2804
            if not id_filter.refers_to(plugin.module.__name__):
 
2805
                continue
 
2806
        plugin_suite = plugin.test_suite()
 
2807
        # We used to catch ImportError here and turn it into just a warning,
 
2808
        # but really if you don't have --no-plugins this should be a failure.
 
2809
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
2810
        if plugin_suite is None:
 
2811
            plugin_suite = plugin.load_plugin_tests(loader)
 
2812
        if plugin_suite is not None:
 
2813
            suite.addTest(plugin_suite)
 
2814
        if default_encoding != sys.getdefaultencoding():
 
2815
            bzrlib.trace.warning(
 
2816
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
2817
                sys.getdefaultencoding())
 
2818
            reload(sys)
 
2819
            sys.setdefaultencoding(default_encoding)
 
2820
 
 
2821
    if keep_only is not None:
 
2822
        # Now that the referred modules have loaded their tests, keep only the
 
2823
        # requested ones.
 
2824
        suite = filter_suite_by_id_list(suite, id_filter)
 
2825
        # Do some sanity checks on the id_list filtering
 
2826
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
2827
        for id in not_found:
 
2828
            bzrlib.trace.warning('"%s" not found in the test suite', id)
 
2829
        for id in duplicates:
 
2830
            bzrlib.trace.warning('"%s" is used as an id by several tests', id)
 
2831
 
 
2832
    return suite
 
2833
 
 
2834
 
 
2835
def multiply_tests_from_modules(module_name_list, scenario_iter, loader=None):
 
2836
    """Adapt all tests in some given modules to given scenarios.
 
2837
 
 
2838
    This is the recommended public interface for test parameterization.
 
2839
    Typically the test_suite() method for a per-implementation test
 
2840
    suite will call multiply_tests_from_modules and return the 
 
2841
    result.
 
2842
 
 
2843
    :param module_name_list: List of fully-qualified names of test
 
2844
        modules.
 
2845
    :param scenario_iter: Iterable of pairs of (scenario_name, 
 
2846
        scenario_param_dict).
 
2847
    :param loader: If provided, will be used instead of a new 
 
2848
        bzrlib.tests.TestLoader() instance.
 
2849
 
 
2850
    This returns a new TestSuite containing the cross product of
 
2851
    all the tests in all the modules, each repeated for each scenario.
 
2852
    Each test is adapted by adding the scenario name at the end 
 
2853
    of its name, and updating the test object's __dict__ with the
 
2854
    scenario_param_dict.
 
2855
 
 
2856
    >>> r = multiply_tests_from_modules(
 
2857
    ...     ['bzrlib.tests.test_sampler'],
 
2858
    ...     [('one', dict(param=1)), 
 
2859
    ...      ('two', dict(param=2))])
 
2860
    >>> tests = list(iter_suite_tests(r))
 
2861
    >>> len(tests)
 
2862
    2
 
2863
    >>> tests[0].id()
 
2864
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
 
2865
    >>> tests[0].param
 
2866
    1
 
2867
    >>> tests[1].param
 
2868
    2
 
2869
    """
 
2870
    # XXX: Isn't load_tests() a better way to provide the same functionality
 
2871
    # without forcing a predefined TestScenarioApplier ? --vila 080215
 
2872
    if loader is None:
 
2873
        loader = TestUtil.TestLoader()
 
2874
 
 
2875
    suite = loader.suiteClass()
 
2876
 
 
2877
    adapter = TestScenarioApplier()
 
2878
    adapter.scenarios = list(scenario_iter)
 
2879
    adapt_modules(module_name_list, adapter, loader, suite)
 
2880
    return suite
 
2881
 
 
2882
 
 
2883
def multiply_scenarios(scenarios_left, scenarios_right):
 
2884
    """Multiply two sets of scenarios.
 
2885
 
 
2886
    :returns: the cartesian product of the two sets of scenarios, that is
 
2887
        a scenario for every possible combination of a left scenario and a
 
2888
        right scenario.
 
2889
    """
 
2890
    return [
 
2891
        ('%s,%s' % (left_name, right_name),
 
2892
         dict(left_dict.items() + right_dict.items()))
 
2893
        for left_name, left_dict in scenarios_left
 
2894
        for right_name, right_dict in scenarios_right]
 
2895
 
 
2896
 
 
2897
 
 
2898
def adapt_modules(mods_list, adapter, loader, suite):
 
2899
    """Adapt the modules in mods_list using adapter and add to suite."""
 
2900
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
 
2901
        suite.addTests(adapter.adapt(test))
 
2902
 
 
2903
 
 
2904
def adapt_tests(tests_list, adapter, loader, suite):
 
2905
    """Adapt the tests in tests_list using adapter and add to suite."""
 
2906
    for test in tests_list:
 
2907
        suite.addTests(adapter.adapt(loader.loadTestsFromName(test)))
 
2908
 
 
2909
 
 
2910
def _rmtree_temp_dir(dirname):
 
2911
    # If LANG=C we probably have created some bogus paths
 
2912
    # which rmtree(unicode) will fail to delete
 
2913
    # so make sure we are using rmtree(str) to delete everything
 
2914
    # except on win32, where rmtree(str) will fail
 
2915
    # since it doesn't have the property of byte-stream paths
 
2916
    # (they are either ascii or mbcs)
 
2917
    if sys.platform == 'win32':
 
2918
        # make sure we are using the unicode win32 api
 
2919
        dirname = unicode(dirname)
 
2920
    else:
 
2921
        dirname = dirname.encode(sys.getfilesystemencoding())
 
2922
    try:
 
2923
        osutils.rmtree(dirname)
 
2924
    except OSError, e:
 
2925
        if sys.platform == 'win32' and e.errno == errno.EACCES:
 
2926
            sys.stderr.write(('Permission denied: '
 
2927
                                 'unable to remove testing dir '
 
2928
                                 '%s\n' % os.path.basename(dirname)))
 
2929
        else:
 
2930
            raise
 
2931
 
 
2932
 
 
2933
class Feature(object):
 
2934
    """An operating system Feature."""
 
2935
 
 
2936
    def __init__(self):
 
2937
        self._available = None
 
2938
 
 
2939
    def available(self):
 
2940
        """Is the feature available?
 
2941
 
 
2942
        :return: True if the feature is available.
 
2943
        """
 
2944
        if self._available is None:
 
2945
            self._available = self._probe()
 
2946
        return self._available
 
2947
 
 
2948
    def _probe(self):
 
2949
        """Implement this method in concrete features.
 
2950
 
 
2951
        :return: True if the feature is available.
 
2952
        """
 
2953
        raise NotImplementedError
 
2954
 
 
2955
    def __str__(self):
 
2956
        if getattr(self, 'feature_name', None):
 
2957
            return self.feature_name()
 
2958
        return self.__class__.__name__
 
2959
 
 
2960
 
 
2961
class _SymlinkFeature(Feature):
 
2962
 
 
2963
    def _probe(self):
 
2964
        return osutils.has_symlinks()
 
2965
 
 
2966
    def feature_name(self):
 
2967
        return 'symlinks'
 
2968
 
 
2969
SymlinkFeature = _SymlinkFeature()
 
2970
 
 
2971
 
 
2972
class _HardlinkFeature(Feature):
 
2973
 
 
2974
    def _probe(self):
 
2975
        return osutils.has_hardlinks()
 
2976
 
 
2977
    def feature_name(self):
 
2978
        return 'hardlinks'
 
2979
 
 
2980
HardlinkFeature = _HardlinkFeature()
 
2981
 
 
2982
 
 
2983
class _OsFifoFeature(Feature):
 
2984
 
 
2985
    def _probe(self):
 
2986
        return getattr(os, 'mkfifo', None)
 
2987
 
 
2988
    def feature_name(self):
 
2989
        return 'filesystem fifos'
 
2990
 
 
2991
OsFifoFeature = _OsFifoFeature()
 
2992
 
 
2993
 
 
2994
class TestScenarioApplier(object):
 
2995
    """A tool to apply scenarios to tests."""
 
2996
 
 
2997
    def adapt(self, test):
 
2998
        """Return a TestSuite containing a copy of test for each scenario."""
 
2999
        result = unittest.TestSuite()
 
3000
        for scenario in self.scenarios:
 
3001
            result.addTest(self.adapt_test_to_scenario(test, scenario))
 
3002
        return result
 
3003
 
 
3004
    def adapt_test_to_scenario(self, test, scenario):
 
3005
        """Copy test and apply scenario to it.
 
3006
 
 
3007
        :param test: A test to adapt.
 
3008
        :param scenario: A tuple describing the scenarion.
 
3009
            The first element of the tuple is the new test id.
 
3010
            The second element is a dict containing attributes to set on the
 
3011
            test.
 
3012
        :return: The adapted test.
 
3013
        """
 
3014
        from copy import deepcopy
 
3015
        new_test = deepcopy(test)
 
3016
        for name, value in scenario[1].items():
 
3017
            setattr(new_test, name, value)
 
3018
        new_id = "%s(%s)" % (new_test.id(), scenario[0])
 
3019
        new_test.id = lambda: new_id
 
3020
        return new_test
 
3021
 
 
3022
 
 
3023
def probe_unicode_in_user_encoding():
 
3024
    """Try to encode several unicode strings to use in unicode-aware tests.
 
3025
    Return first successfull match.
 
3026
 
 
3027
    :return:  (unicode value, encoded plain string value) or (None, None)
 
3028
    """
 
3029
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
3030
    for uni_val in possible_vals:
 
3031
        try:
 
3032
            str_val = uni_val.encode(bzrlib.user_encoding)
 
3033
        except UnicodeEncodeError:
 
3034
            # Try a different character
 
3035
            pass
 
3036
        else:
 
3037
            return uni_val, str_val
 
3038
    return None, None
 
3039
 
 
3040
 
 
3041
def probe_bad_non_ascii(encoding):
 
3042
    """Try to find [bad] character with code [128..255]
 
3043
    that cannot be decoded to unicode in some encoding.
 
3044
    Return None if all non-ascii characters is valid
 
3045
    for given encoding.
 
3046
    """
 
3047
    for i in xrange(128, 256):
 
3048
        char = chr(i)
 
3049
        try:
 
3050
            char.decode(encoding)
 
3051
        except UnicodeDecodeError:
 
3052
            return char
 
3053
    return None
 
3054
 
 
3055
 
 
3056
class _FTPServerFeature(Feature):
 
3057
    """Some tests want an FTP Server, check if one is available.
 
3058
 
 
3059
    Right now, the only way this is available is if 'medusa' is installed.
 
3060
    http://www.amk.ca/python/code/medusa.html
 
3061
    """
 
3062
 
 
3063
    def _probe(self):
 
3064
        try:
 
3065
            import bzrlib.tests.ftp_server
 
3066
            return True
 
3067
        except ImportError:
 
3068
            return False
 
3069
 
 
3070
    def feature_name(self):
 
3071
        return 'FTPServer'
 
3072
 
 
3073
FTPServerFeature = _FTPServerFeature()
 
3074
 
 
3075
 
 
3076
class _CaseInsensitiveFilesystemFeature(Feature):
 
3077
    """Check if underlined filesystem is case-insensitive
 
3078
    (e.g. on Windows, Cygwin, MacOS)
 
3079
    """
 
3080
 
 
3081
    def _probe(self):
 
3082
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
3083
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
3084
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
3085
        else:
 
3086
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
3087
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
3088
            dir=root)
 
3089
        name_a = osutils.pathjoin(tdir, 'a')
 
3090
        name_A = osutils.pathjoin(tdir, 'A')
 
3091
        os.mkdir(name_a)
 
3092
        result = osutils.isdir(name_A)
 
3093
        _rmtree_temp_dir(tdir)
 
3094
        return result
 
3095
 
 
3096
    def feature_name(self):
 
3097
        return 'case-insensitive filesystem'
 
3098
 
 
3099
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()