/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Aaron Bentley
  • Date: 2008-04-12 05:48:07 UTC
  • mto: This revision was merged to the branch mainline in revision 3364.
  • Revision ID: aaron@aaronbentley.com-20080412054807-47yd7dgjufa0nf98
rename 'sharing' to 'use-shared'

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import atexit
 
30
import codecs
 
31
from cStringIO import StringIO
 
32
import difflib
 
33
import doctest
 
34
import errno
 
35
import logging
 
36
import os
 
37
from pprint import pformat
 
38
import random
 
39
import re
 
40
import shlex
 
41
import stat
 
42
from subprocess import Popen, PIPE
 
43
import sys
 
44
import tempfile
 
45
import time
 
46
import unittest
 
47
import warnings
 
48
 
 
49
 
 
50
from bzrlib import (
 
51
    bzrdir,
 
52
    debug,
 
53
    errors,
 
54
    memorytree,
 
55
    osutils,
 
56
    progress,
 
57
    ui,
 
58
    urlutils,
 
59
    workingtree,
 
60
    )
 
61
import bzrlib.branch
 
62
import bzrlib.commands
 
63
import bzrlib.timestamp
 
64
import bzrlib.export
 
65
import bzrlib.inventory
 
66
import bzrlib.iterablefile
 
67
import bzrlib.lockdir
 
68
try:
 
69
    import bzrlib.lsprof
 
70
except ImportError:
 
71
    # lsprof not available
 
72
    pass
 
73
from bzrlib.merge import merge_inner
 
74
import bzrlib.merge3
 
75
import bzrlib.plugin
 
76
from bzrlib.revision import common_ancestor
 
77
import bzrlib.store
 
78
from bzrlib import symbol_versioning
 
79
from bzrlib.symbol_versioning import (
 
80
    DEPRECATED_PARAMETER,
 
81
    deprecated_function,
 
82
    deprecated_method,
 
83
    deprecated_passed,
 
84
    zero_ninetyone,
 
85
    zero_ninetytwo,
 
86
    one_zero,
 
87
    )
 
88
import bzrlib.trace
 
89
from bzrlib.transport import get_transport
 
90
import bzrlib.transport
 
91
from bzrlib.transport.local import LocalURLServer
 
92
from bzrlib.transport.memory import MemoryServer
 
93
from bzrlib.transport.readonly import ReadonlyServer
 
94
from bzrlib.trace import mutter, note
 
95
from bzrlib.tests import TestUtil
 
96
from bzrlib.tests.http_server import HttpServer
 
97
from bzrlib.tests.TestUtil import (
 
98
                          TestSuite,
 
99
                          TestLoader,
 
100
                          )
 
101
from bzrlib.tests.treeshape import build_tree_contents
 
102
import bzrlib.version_info_formats.format_custom
 
103
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
104
 
 
105
# Mark this python module as being part of the implementation
 
106
# of unittest: this gives us better tracebacks where the last
 
107
# shown frame is the test code, not our assertXYZ.
 
108
__unittest = 1
 
109
 
 
110
default_transport = LocalURLServer
 
111
 
 
112
 
 
113
def packages_to_test():
 
114
    """Return a list of packages to test.
 
115
 
 
116
    The packages are not globally imported so that import failures are
 
117
    triggered when running selftest, not when importing the command.
 
118
    """
 
119
    import bzrlib.doc
 
120
    import bzrlib.tests.blackbox
 
121
    import bzrlib.tests.branch_implementations
 
122
    import bzrlib.tests.bzrdir_implementations
 
123
    import bzrlib.tests.commands
 
124
    import bzrlib.tests.interrepository_implementations
 
125
    import bzrlib.tests.interversionedfile_implementations
 
126
    import bzrlib.tests.intertree_implementations
 
127
    import bzrlib.tests.inventory_implementations
 
128
    import bzrlib.tests.per_lock
 
129
    import bzrlib.tests.repository_implementations
 
130
    import bzrlib.tests.revisionstore_implementations
 
131
    import bzrlib.tests.tree_implementations
 
132
    import bzrlib.tests.workingtree_implementations
 
133
    return [
 
134
            bzrlib.doc,
 
135
            bzrlib.tests.blackbox,
 
136
            bzrlib.tests.branch_implementations,
 
137
            bzrlib.tests.bzrdir_implementations,
 
138
            bzrlib.tests.commands,
 
139
            bzrlib.tests.interrepository_implementations,
 
140
            bzrlib.tests.interversionedfile_implementations,
 
141
            bzrlib.tests.intertree_implementations,
 
142
            bzrlib.tests.inventory_implementations,
 
143
            bzrlib.tests.per_lock,
 
144
            bzrlib.tests.repository_implementations,
 
145
            bzrlib.tests.revisionstore_implementations,
 
146
            bzrlib.tests.tree_implementations,
 
147
            bzrlib.tests.workingtree_implementations,
 
148
            ]
 
149
 
 
150
 
 
151
class ExtendedTestResult(unittest._TextTestResult):
 
152
    """Accepts, reports and accumulates the results of running tests.
 
153
 
 
154
    Compared to the unittest version this class adds support for
 
155
    profiling, benchmarking, stopping as soon as a test fails,  and
 
156
    skipping tests.  There are further-specialized subclasses for
 
157
    different types of display.
 
158
 
 
159
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
160
    addFailure or addError classes.  These in turn may redirect to a more
 
161
    specific case for the special test results supported by our extended
 
162
    tests.
 
163
 
 
164
    Note that just one of these objects is fed the results from many tests.
 
165
    """
 
166
 
 
167
    stop_early = False
 
168
    
 
169
    def __init__(self, stream, descriptions, verbosity,
 
170
                 bench_history=None,
 
171
                 num_tests=None,
 
172
                 ):
 
173
        """Construct new TestResult.
 
174
 
 
175
        :param bench_history: Optionally, a writable file object to accumulate
 
176
            benchmark results.
 
177
        """
 
178
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
179
        if bench_history is not None:
 
180
            from bzrlib.version import _get_bzr_source_tree
 
181
            src_tree = _get_bzr_source_tree()
 
182
            if src_tree:
 
183
                try:
 
184
                    revision_id = src_tree.get_parent_ids()[0]
 
185
                except IndexError:
 
186
                    # XXX: if this is a brand new tree, do the same as if there
 
187
                    # is no branch.
 
188
                    revision_id = ''
 
189
            else:
 
190
                # XXX: If there's no branch, what should we do?
 
191
                revision_id = ''
 
192
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
193
        self._bench_history = bench_history
 
194
        self.ui = ui.ui_factory
 
195
        self.num_tests = num_tests
 
196
        self.error_count = 0
 
197
        self.failure_count = 0
 
198
        self.known_failure_count = 0
 
199
        self.skip_count = 0
 
200
        self.not_applicable_count = 0
 
201
        self.unsupported = {}
 
202
        self.count = 0
 
203
        self._overall_start_time = time.time()
 
204
    
 
205
    def _extractBenchmarkTime(self, testCase):
 
206
        """Add a benchmark time for the current test case."""
 
207
        return getattr(testCase, "_benchtime", None)
 
208
    
 
209
    def _elapsedTestTimeString(self):
 
210
        """Return a time string for the overall time the current test has taken."""
 
211
        return self._formatTime(time.time() - self._start_time)
 
212
 
 
213
    def _testTimeString(self, testCase):
 
214
        benchmark_time = self._extractBenchmarkTime(testCase)
 
215
        if benchmark_time is not None:
 
216
            return "%s/%s" % (
 
217
                self._formatTime(benchmark_time),
 
218
                self._elapsedTestTimeString())
 
219
        else:
 
220
            return "           %s" % self._elapsedTestTimeString()
 
221
 
 
222
    def _formatTime(self, seconds):
 
223
        """Format seconds as milliseconds with leading spaces."""
 
224
        # some benchmarks can take thousands of seconds to run, so we need 8
 
225
        # places
 
226
        return "%8dms" % (1000 * seconds)
 
227
 
 
228
    def _shortened_test_description(self, test):
 
229
        what = test.id()
 
230
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
231
        return what
 
232
 
 
233
    def startTest(self, test):
 
234
        unittest.TestResult.startTest(self, test)
 
235
        self.report_test_start(test)
 
236
        test.number = self.count
 
237
        self._recordTestStartTime()
 
238
 
 
239
    def _recordTestStartTime(self):
 
240
        """Record that a test has started."""
 
241
        self._start_time = time.time()
 
242
 
 
243
    def _cleanupLogFile(self, test):
 
244
        # We can only do this if we have one of our TestCases, not if
 
245
        # we have a doctest.
 
246
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
247
        if setKeepLogfile is not None:
 
248
            setKeepLogfile()
 
249
 
 
250
    def addError(self, test, err):
 
251
        """Tell result that test finished with an error.
 
252
 
 
253
        Called from the TestCase run() method when the test
 
254
        fails with an unexpected error.
 
255
        """
 
256
        self._testConcluded(test)
 
257
        if isinstance(err[1], TestSkipped):
 
258
            return self._addSkipped(test, err)
 
259
        elif isinstance(err[1], UnavailableFeature):
 
260
            return self.addNotSupported(test, err[1].args[0])
 
261
        else:
 
262
            self._cleanupLogFile(test)
 
263
            unittest.TestResult.addError(self, test, err)
 
264
            self.error_count += 1
 
265
            self.report_error(test, err)
 
266
            if self.stop_early:
 
267
                self.stop()
 
268
 
 
269
    def addFailure(self, test, err):
 
270
        """Tell result that test failed.
 
271
 
 
272
        Called from the TestCase run() method when the test
 
273
        fails because e.g. an assert() method failed.
 
274
        """
 
275
        self._testConcluded(test)
 
276
        if isinstance(err[1], KnownFailure):
 
277
            return self._addKnownFailure(test, err)
 
278
        else:
 
279
            self._cleanupLogFile(test)
 
280
            unittest.TestResult.addFailure(self, test, err)
 
281
            self.failure_count += 1
 
282
            self.report_failure(test, err)
 
283
            if self.stop_early:
 
284
                self.stop()
 
285
 
 
286
    def addSuccess(self, test):
 
287
        """Tell result that test completed successfully.
 
288
 
 
289
        Called from the TestCase run()
 
290
        """
 
291
        self._testConcluded(test)
 
292
        if self._bench_history is not None:
 
293
            benchmark_time = self._extractBenchmarkTime(test)
 
294
            if benchmark_time is not None:
 
295
                self._bench_history.write("%s %s\n" % (
 
296
                    self._formatTime(benchmark_time),
 
297
                    test.id()))
 
298
        self.report_success(test)
 
299
        self._cleanupLogFile(test)
 
300
        unittest.TestResult.addSuccess(self, test)
 
301
        test._log_contents = ''
 
302
 
 
303
    def _testConcluded(self, test):
 
304
        """Common code when a test has finished.
 
305
 
 
306
        Called regardless of whether it succeded, failed, etc.
 
307
        """
 
308
        pass
 
309
 
 
310
    def _addKnownFailure(self, test, err):
 
311
        self.known_failure_count += 1
 
312
        self.report_known_failure(test, err)
 
313
 
 
314
    def addNotSupported(self, test, feature):
 
315
        """The test will not be run because of a missing feature.
 
316
        """
 
317
        # this can be called in two different ways: it may be that the
 
318
        # test started running, and then raised (through addError) 
 
319
        # UnavailableFeature.  Alternatively this method can be called
 
320
        # while probing for features before running the tests; in that
 
321
        # case we will see startTest and stopTest, but the test will never
 
322
        # actually run.
 
323
        self.unsupported.setdefault(str(feature), 0)
 
324
        self.unsupported[str(feature)] += 1
 
325
        self.report_unsupported(test, feature)
 
326
 
 
327
    def _addSkipped(self, test, skip_excinfo):
 
328
        if isinstance(skip_excinfo[1], TestNotApplicable):
 
329
            self.not_applicable_count += 1
 
330
            self.report_not_applicable(test, skip_excinfo)
 
331
        else:
 
332
            self.skip_count += 1
 
333
            self.report_skip(test, skip_excinfo)
 
334
        try:
 
335
            test.tearDown()
 
336
        except KeyboardInterrupt:
 
337
            raise
 
338
        except:
 
339
            self.addError(test, test._exc_info())
 
340
        else:
 
341
            # seems best to treat this as success from point-of-view of unittest
 
342
            # -- it actually does nothing so it barely matters :)
 
343
            unittest.TestResult.addSuccess(self, test)
 
344
            test._log_contents = ''
 
345
 
 
346
    def printErrorList(self, flavour, errors):
 
347
        for test, err in errors:
 
348
            self.stream.writeln(self.separator1)
 
349
            self.stream.write("%s: " % flavour)
 
350
            self.stream.writeln(self.getDescription(test))
 
351
            if getattr(test, '_get_log', None) is not None:
 
352
                self.stream.write('\n')
 
353
                self.stream.write(
 
354
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
 
355
                self.stream.write('\n')
 
356
                self.stream.write(test._get_log())
 
357
                self.stream.write('\n')
 
358
                self.stream.write(
 
359
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
 
360
                self.stream.write('\n')
 
361
            self.stream.writeln(self.separator2)
 
362
            self.stream.writeln("%s" % err)
 
363
 
 
364
    def finished(self):
 
365
        pass
 
366
 
 
367
    def report_cleaning_up(self):
 
368
        pass
 
369
 
 
370
    def report_success(self, test):
 
371
        pass
 
372
 
 
373
    def wasStrictlySuccessful(self):
 
374
        if self.unsupported or self.known_failure_count:
 
375
            return False
 
376
        return self.wasSuccessful()
 
377
 
 
378
 
 
379
class TextTestResult(ExtendedTestResult):
 
380
    """Displays progress and results of tests in text form"""
 
381
 
 
382
    def __init__(self, stream, descriptions, verbosity,
 
383
                 bench_history=None,
 
384
                 num_tests=None,
 
385
                 pb=None,
 
386
                 ):
 
387
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
388
            bench_history, num_tests)
 
389
        if pb is None:
 
390
            self.pb = self.ui.nested_progress_bar()
 
391
            self._supplied_pb = False
 
392
        else:
 
393
            self.pb = pb
 
394
            self._supplied_pb = True
 
395
        self.pb.show_pct = False
 
396
        self.pb.show_spinner = False
 
397
        self.pb.show_eta = False,
 
398
        self.pb.show_count = False
 
399
        self.pb.show_bar = False
 
400
 
 
401
    def report_starting(self):
 
402
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
403
 
 
404
    def _progress_prefix_text(self):
 
405
        a = '[%d' % self.count
 
406
        if self.num_tests is not None:
 
407
            a +='/%d' % self.num_tests
 
408
        a += ' in %ds' % (time.time() - self._overall_start_time)
 
409
        if self.error_count:
 
410
            a += ', %d errors' % self.error_count
 
411
        if self.failure_count:
 
412
            a += ', %d failed' % self.failure_count
 
413
        if self.known_failure_count:
 
414
            a += ', %d known failures' % self.known_failure_count
 
415
        if self.skip_count:
 
416
            a += ', %d skipped' % self.skip_count
 
417
        if self.unsupported:
 
418
            a += ', %d missing features' % len(self.unsupported)
 
419
        a += ']'
 
420
        return a
 
421
 
 
422
    def report_test_start(self, test):
 
423
        self.count += 1
 
424
        self.pb.update(
 
425
                self._progress_prefix_text()
 
426
                + ' ' 
 
427
                + self._shortened_test_description(test))
 
428
 
 
429
    def _test_description(self, test):
 
430
        return self._shortened_test_description(test)
 
431
 
 
432
    def report_error(self, test, err):
 
433
        self.pb.note('ERROR: %s\n    %s\n', 
 
434
            self._test_description(test),
 
435
            err[1],
 
436
            )
 
437
 
 
438
    def report_failure(self, test, err):
 
439
        self.pb.note('FAIL: %s\n    %s\n', 
 
440
            self._test_description(test),
 
441
            err[1],
 
442
            )
 
443
 
 
444
    def report_known_failure(self, test, err):
 
445
        self.pb.note('XFAIL: %s\n%s\n',
 
446
            self._test_description(test), err[1])
 
447
 
 
448
    def report_skip(self, test, skip_excinfo):
 
449
        pass
 
450
 
 
451
    def report_not_applicable(self, test, skip_excinfo):
 
452
        pass
 
453
 
 
454
    def report_unsupported(self, test, feature):
 
455
        """test cannot be run because feature is missing."""
 
456
                  
 
457
    def report_cleaning_up(self):
 
458
        self.pb.update('cleaning up...')
 
459
 
 
460
    def finished(self):
 
461
        if not self._supplied_pb:
 
462
            self.pb.finished()
 
463
 
 
464
 
 
465
class VerboseTestResult(ExtendedTestResult):
 
466
    """Produce long output, with one line per test run plus times"""
 
467
 
 
468
    def _ellipsize_to_right(self, a_string, final_width):
 
469
        """Truncate and pad a string, keeping the right hand side"""
 
470
        if len(a_string) > final_width:
 
471
            result = '...' + a_string[3-final_width:]
 
472
        else:
 
473
            result = a_string
 
474
        return result.ljust(final_width)
 
475
 
 
476
    def report_starting(self):
 
477
        self.stream.write('running %d tests...\n' % self.num_tests)
 
478
 
 
479
    def report_test_start(self, test):
 
480
        self.count += 1
 
481
        name = self._shortened_test_description(test)
 
482
        # width needs space for 6 char status, plus 1 for slash, plus 2 10-char
 
483
        # numbers, plus a trailing blank
 
484
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
485
        self.stream.write(self._ellipsize_to_right(name,
 
486
                          osutils.terminal_width()-30))
 
487
        self.stream.flush()
 
488
 
 
489
    def _error_summary(self, err):
 
490
        indent = ' ' * 4
 
491
        return '%s%s' % (indent, err[1])
 
492
 
 
493
    def report_error(self, test, err):
 
494
        self.stream.writeln('ERROR %s\n%s'
 
495
                % (self._testTimeString(test),
 
496
                   self._error_summary(err)))
 
497
 
 
498
    def report_failure(self, test, err):
 
499
        self.stream.writeln(' FAIL %s\n%s'
 
500
                % (self._testTimeString(test),
 
501
                   self._error_summary(err)))
 
502
 
 
503
    def report_known_failure(self, test, err):
 
504
        self.stream.writeln('XFAIL %s\n%s'
 
505
                % (self._testTimeString(test),
 
506
                   self._error_summary(err)))
 
507
 
 
508
    def report_success(self, test):
 
509
        self.stream.writeln('   OK %s' % self._testTimeString(test))
 
510
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
511
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
512
            stats.pprint(file=self.stream)
 
513
        # flush the stream so that we get smooth output. This verbose mode is
 
514
        # used to show the output in PQM.
 
515
        self.stream.flush()
 
516
 
 
517
    def report_skip(self, test, skip_excinfo):
 
518
        self.stream.writeln(' SKIP %s\n%s'
 
519
                % (self._testTimeString(test),
 
520
                   self._error_summary(skip_excinfo)))
 
521
 
 
522
    def report_not_applicable(self, test, skip_excinfo):
 
523
        self.stream.writeln('  N/A %s\n%s'
 
524
                % (self._testTimeString(test),
 
525
                   self._error_summary(skip_excinfo)))
 
526
 
 
527
    def report_unsupported(self, test, feature):
 
528
        """test cannot be run because feature is missing."""
 
529
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
530
                %(self._testTimeString(test), feature))
 
531
 
 
532
 
 
533
class TextTestRunner(object):
 
534
    stop_on_failure = False
 
535
 
 
536
    def __init__(self,
 
537
                 stream=sys.stderr,
 
538
                 descriptions=0,
 
539
                 verbosity=1,
 
540
                 bench_history=None,
 
541
                 list_only=False
 
542
                 ):
 
543
        self.stream = unittest._WritelnDecorator(stream)
 
544
        self.descriptions = descriptions
 
545
        self.verbosity = verbosity
 
546
        self._bench_history = bench_history
 
547
        self.list_only = list_only
 
548
 
 
549
    def run(self, test):
 
550
        "Run the given test case or test suite."
 
551
        startTime = time.time()
 
552
        if self.verbosity == 1:
 
553
            result_class = TextTestResult
 
554
        elif self.verbosity >= 2:
 
555
            result_class = VerboseTestResult
 
556
        result = result_class(self.stream,
 
557
                              self.descriptions,
 
558
                              self.verbosity,
 
559
                              bench_history=self._bench_history,
 
560
                              num_tests=test.countTestCases(),
 
561
                              )
 
562
        result.stop_early = self.stop_on_failure
 
563
        result.report_starting()
 
564
        if self.list_only:
 
565
            if self.verbosity >= 2:
 
566
                self.stream.writeln("Listing tests only ...\n")
 
567
            run = 0
 
568
            for t in iter_suite_tests(test):
 
569
                self.stream.writeln("%s" % (t.id()))
 
570
                run += 1
 
571
            actionTaken = "Listed"
 
572
        else: 
 
573
            test.run(result)
 
574
            run = result.testsRun
 
575
            actionTaken = "Ran"
 
576
        stopTime = time.time()
 
577
        timeTaken = stopTime - startTime
 
578
        result.printErrors()
 
579
        self.stream.writeln(result.separator2)
 
580
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
581
                            run, run != 1 and "s" or "", timeTaken))
 
582
        self.stream.writeln()
 
583
        if not result.wasSuccessful():
 
584
            self.stream.write("FAILED (")
 
585
            failed, errored = map(len, (result.failures, result.errors))
 
586
            if failed:
 
587
                self.stream.write("failures=%d" % failed)
 
588
            if errored:
 
589
                if failed: self.stream.write(", ")
 
590
                self.stream.write("errors=%d" % errored)
 
591
            if result.known_failure_count:
 
592
                if failed or errored: self.stream.write(", ")
 
593
                self.stream.write("known_failure_count=%d" %
 
594
                    result.known_failure_count)
 
595
            self.stream.writeln(")")
 
596
        else:
 
597
            if result.known_failure_count:
 
598
                self.stream.writeln("OK (known_failures=%d)" %
 
599
                    result.known_failure_count)
 
600
            else:
 
601
                self.stream.writeln("OK")
 
602
        if result.skip_count > 0:
 
603
            skipped = result.skip_count
 
604
            self.stream.writeln('%d test%s skipped' %
 
605
                                (skipped, skipped != 1 and "s" or ""))
 
606
        if result.unsupported:
 
607
            for feature, count in sorted(result.unsupported.items()):
 
608
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
609
                    (feature, count))
 
610
        result.finished()
 
611
        return result
 
612
 
 
613
 
 
614
def iter_suite_tests(suite):
 
615
    """Return all tests in a suite, recursing through nested suites"""
 
616
    for item in suite._tests:
 
617
        if isinstance(item, unittest.TestCase):
 
618
            yield item
 
619
        elif isinstance(item, unittest.TestSuite):
 
620
            for r in iter_suite_tests(item):
 
621
                yield r
 
622
        else:
 
623
            raise Exception('unknown object %r inside test suite %r'
 
624
                            % (item, suite))
 
625
 
 
626
 
 
627
class TestSkipped(Exception):
 
628
    """Indicates that a test was intentionally skipped, rather than failing."""
 
629
 
 
630
 
 
631
class TestNotApplicable(TestSkipped):
 
632
    """A test is not applicable to the situation where it was run.
 
633
 
 
634
    This is only normally raised by parameterized tests, if they find that 
 
635
    the instance they're constructed upon does not support one aspect 
 
636
    of its interface.
 
637
    """
 
638
 
 
639
 
 
640
class KnownFailure(AssertionError):
 
641
    """Indicates that a test failed in a precisely expected manner.
 
642
 
 
643
    Such failures dont block the whole test suite from passing because they are
 
644
    indicators of partially completed code or of future work. We have an
 
645
    explicit error for them so that we can ensure that they are always visible:
 
646
    KnownFailures are always shown in the output of bzr selftest.
 
647
    """
 
648
 
 
649
 
 
650
class UnavailableFeature(Exception):
 
651
    """A feature required for this test was not available.
 
652
 
 
653
    The feature should be used to construct the exception.
 
654
    """
 
655
 
 
656
 
 
657
class CommandFailed(Exception):
 
658
    pass
 
659
 
 
660
 
 
661
class StringIOWrapper(object):
 
662
    """A wrapper around cStringIO which just adds an encoding attribute.
 
663
    
 
664
    Internally we can check sys.stdout to see what the output encoding
 
665
    should be. However, cStringIO has no encoding attribute that we can
 
666
    set. So we wrap it instead.
 
667
    """
 
668
    encoding='ascii'
 
669
    _cstring = None
 
670
 
 
671
    def __init__(self, s=None):
 
672
        if s is not None:
 
673
            self.__dict__['_cstring'] = StringIO(s)
 
674
        else:
 
675
            self.__dict__['_cstring'] = StringIO()
 
676
 
 
677
    def __getattr__(self, name, getattr=getattr):
 
678
        return getattr(self.__dict__['_cstring'], name)
 
679
 
 
680
    def __setattr__(self, name, val):
 
681
        if name == 'encoding':
 
682
            self.__dict__['encoding'] = val
 
683
        else:
 
684
            return setattr(self._cstring, name, val)
 
685
 
 
686
 
 
687
class TestUIFactory(ui.CLIUIFactory):
 
688
    """A UI Factory for testing.
 
689
 
 
690
    Hide the progress bar but emit note()s.
 
691
    Redirect stdin.
 
692
    Allows get_password to be tested without real tty attached.
 
693
    """
 
694
 
 
695
    def __init__(self,
 
696
                 stdout=None,
 
697
                 stderr=None,
 
698
                 stdin=None):
 
699
        super(TestUIFactory, self).__init__()
 
700
        if stdin is not None:
 
701
            # We use a StringIOWrapper to be able to test various
 
702
            # encodings, but the user is still responsible to
 
703
            # encode the string and to set the encoding attribute
 
704
            # of StringIOWrapper.
 
705
            self.stdin = StringIOWrapper(stdin)
 
706
        if stdout is None:
 
707
            self.stdout = sys.stdout
 
708
        else:
 
709
            self.stdout = stdout
 
710
        if stderr is None:
 
711
            self.stderr = sys.stderr
 
712
        else:
 
713
            self.stderr = stderr
 
714
 
 
715
    def clear(self):
 
716
        """See progress.ProgressBar.clear()."""
 
717
 
 
718
    def clear_term(self):
 
719
        """See progress.ProgressBar.clear_term()."""
 
720
 
 
721
    def clear_term(self):
 
722
        """See progress.ProgressBar.clear_term()."""
 
723
 
 
724
    def finished(self):
 
725
        """See progress.ProgressBar.finished()."""
 
726
 
 
727
    def note(self, fmt_string, *args, **kwargs):
 
728
        """See progress.ProgressBar.note()."""
 
729
        self.stdout.write((fmt_string + "\n") % args)
 
730
 
 
731
    def progress_bar(self):
 
732
        return self
 
733
 
 
734
    def nested_progress_bar(self):
 
735
        return self
 
736
 
 
737
    def update(self, message, count=None, total=None):
 
738
        """See progress.ProgressBar.update()."""
 
739
 
 
740
    def get_non_echoed_password(self, prompt):
 
741
        """Get password from stdin without trying to handle the echo mode"""
 
742
        if prompt:
 
743
            self.stdout.write(prompt.encode(self.stdout.encoding, 'replace'))
 
744
        password = self.stdin.readline()
 
745
        if not password:
 
746
            raise EOFError
 
747
        if password[-1] == '\n':
 
748
            password = password[:-1]
 
749
        return password
 
750
 
 
751
 
 
752
class TestCase(unittest.TestCase):
 
753
    """Base class for bzr unit tests.
 
754
    
 
755
    Tests that need access to disk resources should subclass 
 
756
    TestCaseInTempDir not TestCase.
 
757
 
 
758
    Error and debug log messages are redirected from their usual
 
759
    location into a temporary file, the contents of which can be
 
760
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
761
    so that it can also capture file IO.  When the test completes this file
 
762
    is read into memory and removed from disk.
 
763
       
 
764
    There are also convenience functions to invoke bzr's command-line
 
765
    routine, and to build and check bzr trees.
 
766
   
 
767
    In addition to the usual method of overriding tearDown(), this class also
 
768
    allows subclasses to register functions into the _cleanups list, which is
 
769
    run in order as the object is torn down.  It's less likely this will be
 
770
    accidentally overlooked.
 
771
    """
 
772
 
 
773
    _log_file_name = None
 
774
    _log_contents = ''
 
775
    _keep_log_file = False
 
776
    # record lsprof data when performing benchmark calls.
 
777
    _gather_lsprof_in_benchmarks = False
 
778
    attrs_to_keep = ('_testMethodName', '_testMethodDoc',
 
779
                     '_log_contents', '_log_file_name', '_benchtime',
 
780
                     '_TestCase__testMethodName')
 
781
 
 
782
    def __init__(self, methodName='testMethod'):
 
783
        super(TestCase, self).__init__(methodName)
 
784
        self._cleanups = []
 
785
 
 
786
    def setUp(self):
 
787
        unittest.TestCase.setUp(self)
 
788
        self._cleanEnvironment()
 
789
        self._silenceUI()
 
790
        self._startLogFile()
 
791
        self._benchcalls = []
 
792
        self._benchtime = None
 
793
        self._clear_hooks()
 
794
        self._clear_debug_flags()
 
795
 
 
796
    def _clear_debug_flags(self):
 
797
        """Prevent externally set debug flags affecting tests.
 
798
        
 
799
        Tests that want to use debug flags can just set them in the
 
800
        debug_flags set during setup/teardown.
 
801
        """
 
802
        if 'selftest_debug' not in debug.debug_flags:
 
803
            self._preserved_debug_flags = set(debug.debug_flags)
 
804
            debug.debug_flags.clear()
 
805
            self.addCleanup(self._restore_debug_flags)
 
806
 
 
807
    def _clear_hooks(self):
 
808
        # prevent hooks affecting tests
 
809
        import bzrlib.branch
 
810
        import bzrlib.smart.server
 
811
        self._preserved_hooks = {
 
812
            bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
 
813
            bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
 
814
            }
 
815
        self.addCleanup(self._restoreHooks)
 
816
        # reset all hooks to an empty instance of the appropriate type
 
817
        bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
 
818
        bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
 
819
 
 
820
    def _silenceUI(self):
 
821
        """Turn off UI for duration of test"""
 
822
        # by default the UI is off; tests can turn it on if they want it.
 
823
        saved = ui.ui_factory
 
824
        def _restore():
 
825
            ui.ui_factory = saved
 
826
        ui.ui_factory = ui.SilentUIFactory()
 
827
        self.addCleanup(_restore)
 
828
 
 
829
    def _ndiff_strings(self, a, b):
 
830
        """Return ndiff between two strings containing lines.
 
831
        
 
832
        A trailing newline is added if missing to make the strings
 
833
        print properly."""
 
834
        if b and b[-1] != '\n':
 
835
            b += '\n'
 
836
        if a and a[-1] != '\n':
 
837
            a += '\n'
 
838
        difflines = difflib.ndiff(a.splitlines(True),
 
839
                                  b.splitlines(True),
 
840
                                  linejunk=lambda x: False,
 
841
                                  charjunk=lambda x: False)
 
842
        return ''.join(difflines)
 
843
 
 
844
    def assertEqual(self, a, b, message=''):
 
845
        try:
 
846
            if a == b:
 
847
                return
 
848
        except UnicodeError, e:
 
849
            # If we can't compare without getting a UnicodeError, then
 
850
            # obviously they are different
 
851
            mutter('UnicodeError: %s', e)
 
852
        if message:
 
853
            message += '\n'
 
854
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
855
            % (message,
 
856
               pformat(a), pformat(b)))
 
857
 
 
858
    assertEquals = assertEqual
 
859
 
 
860
    def assertEqualDiff(self, a, b, message=None):
 
861
        """Assert two texts are equal, if not raise an exception.
 
862
        
 
863
        This is intended for use with multi-line strings where it can 
 
864
        be hard to find the differences by eye.
 
865
        """
 
866
        # TODO: perhaps override assertEquals to call this for strings?
 
867
        if a == b:
 
868
            return
 
869
        if message is None:
 
870
            message = "texts not equal:\n"
 
871
        raise AssertionError(message +
 
872
                             self._ndiff_strings(a, b))
 
873
        
 
874
    def assertEqualMode(self, mode, mode_test):
 
875
        self.assertEqual(mode, mode_test,
 
876
                         'mode mismatch %o != %o' % (mode, mode_test))
 
877
 
 
878
    def assertPositive(self, val):
 
879
        """Assert that val is greater than 0."""
 
880
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
881
 
 
882
    def assertNegative(self, val):
 
883
        """Assert that val is less than 0."""
 
884
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
885
 
 
886
    def assertStartsWith(self, s, prefix):
 
887
        if not s.startswith(prefix):
 
888
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
889
 
 
890
    def assertEndsWith(self, s, suffix):
 
891
        """Asserts that s ends with suffix."""
 
892
        if not s.endswith(suffix):
 
893
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
894
 
 
895
    def assertContainsRe(self, haystack, needle_re):
 
896
        """Assert that a contains something matching a regular expression."""
 
897
        if not re.search(needle_re, haystack):
 
898
            if '\n' in haystack or len(haystack) > 60:
 
899
                # a long string, format it in a more readable way
 
900
                raise AssertionError(
 
901
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
902
                        % (needle_re, haystack))
 
903
            else:
 
904
                raise AssertionError('pattern "%s" not found in "%s"'
 
905
                        % (needle_re, haystack))
 
906
 
 
907
    def assertNotContainsRe(self, haystack, needle_re):
 
908
        """Assert that a does not match a regular expression"""
 
909
        if re.search(needle_re, haystack):
 
910
            raise AssertionError('pattern "%s" found in "%s"'
 
911
                    % (needle_re, haystack))
 
912
 
 
913
    def assertSubset(self, sublist, superlist):
 
914
        """Assert that every entry in sublist is present in superlist."""
 
915
        missing = set(sublist) - set(superlist)
 
916
        if len(missing) > 0:
 
917
            raise AssertionError("value(s) %r not present in container %r" %
 
918
                                 (missing, superlist))
 
919
 
 
920
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
921
        """Fail unless excClass is raised when the iterator from func is used.
 
922
        
 
923
        Many functions can return generators this makes sure
 
924
        to wrap them in a list() call to make sure the whole generator
 
925
        is run, and that the proper exception is raised.
 
926
        """
 
927
        try:
 
928
            list(func(*args, **kwargs))
 
929
        except excClass:
 
930
            return
 
931
        else:
 
932
            if getattr(excClass,'__name__', None) is not None:
 
933
                excName = excClass.__name__
 
934
            else:
 
935
                excName = str(excClass)
 
936
            raise self.failureException, "%s not raised" % excName
 
937
 
 
938
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
939
        """Assert that a callable raises a particular exception.
 
940
 
 
941
        :param excClass: As for the except statement, this may be either an
 
942
            exception class, or a tuple of classes.
 
943
        :param callableObj: A callable, will be passed ``*args`` and
 
944
            ``**kwargs``.
 
945
 
 
946
        Returns the exception so that you can examine it.
 
947
        """
 
948
        try:
 
949
            callableObj(*args, **kwargs)
 
950
        except excClass, e:
 
951
            return e
 
952
        else:
 
953
            if getattr(excClass,'__name__', None) is not None:
 
954
                excName = excClass.__name__
 
955
            else:
 
956
                # probably a tuple
 
957
                excName = str(excClass)
 
958
            raise self.failureException, "%s not raised" % excName
 
959
 
 
960
    def assertIs(self, left, right, message=None):
 
961
        if not (left is right):
 
962
            if message is not None:
 
963
                raise AssertionError(message)
 
964
            else:
 
965
                raise AssertionError("%r is not %r." % (left, right))
 
966
 
 
967
    def assertIsNot(self, left, right, message=None):
 
968
        if (left is right):
 
969
            if message is not None:
 
970
                raise AssertionError(message)
 
971
            else:
 
972
                raise AssertionError("%r is %r." % (left, right))
 
973
 
 
974
    def assertTransportMode(self, transport, path, mode):
 
975
        """Fail if a path does not have mode mode.
 
976
        
 
977
        If modes are not supported on this transport, the assertion is ignored.
 
978
        """
 
979
        if not transport._can_roundtrip_unix_modebits():
 
980
            return
 
981
        path_stat = transport.stat(path)
 
982
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
983
        self.assertEqual(mode, actual_mode,
 
984
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
985
 
 
986
    def assertIsSameRealPath(self, path1, path2):
 
987
        """Fail if path1 and path2 points to different files"""
 
988
        self.assertEqual(osutils.realpath(path1),
 
989
                         osutils.realpath(path2),
 
990
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
991
 
 
992
    def assertIsInstance(self, obj, kls):
 
993
        """Fail if obj is not an instance of kls"""
 
994
        if not isinstance(obj, kls):
 
995
            self.fail("%r is an instance of %s rather than %s" % (
 
996
                obj, obj.__class__, kls))
 
997
 
 
998
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
999
        """Invoke a test, expecting it to fail for the given reason.
 
1000
 
 
1001
        This is for assertions that ought to succeed, but currently fail.
 
1002
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
1003
        about the failure you're expecting.  If a new bug is introduced,
 
1004
        AssertionError should be raised, not KnownFailure.
 
1005
 
 
1006
        Frequently, expectFailure should be followed by an opposite assertion.
 
1007
        See example below.
 
1008
 
 
1009
        Intended to be used with a callable that raises AssertionError as the
 
1010
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
1011
 
 
1012
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
1013
        test succeeds.
 
1014
 
 
1015
        example usage::
 
1016
 
 
1017
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
1018
                             dynamic_val)
 
1019
          self.assertEqual(42, dynamic_val)
 
1020
 
 
1021
          This means that a dynamic_val of 54 will cause the test to raise
 
1022
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
1023
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
1024
          than 54 or 42 will cause an AssertionError.
 
1025
        """
 
1026
        try:
 
1027
            assertion(*args, **kwargs)
 
1028
        except AssertionError:
 
1029
            raise KnownFailure(reason)
 
1030
        else:
 
1031
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
1032
 
 
1033
    def assertFileEqual(self, content, path):
 
1034
        """Fail if path does not contain 'content'."""
 
1035
        self.failUnlessExists(path)
 
1036
        f = file(path, 'rb')
 
1037
        try:
 
1038
            s = f.read()
 
1039
        finally:
 
1040
            f.close()
 
1041
        self.assertEqualDiff(content, s)
 
1042
 
 
1043
    def failUnlessExists(self, path):
 
1044
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1045
        if not isinstance(path, basestring):
 
1046
            for p in path:
 
1047
                self.failUnlessExists(p)
 
1048
        else:
 
1049
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
1050
 
 
1051
    def failIfExists(self, path):
 
1052
        """Fail if path or paths, which may be abs or relative, exist."""
 
1053
        if not isinstance(path, basestring):
 
1054
            for p in path:
 
1055
                self.failIfExists(p)
 
1056
        else:
 
1057
            self.failIf(osutils.lexists(path),path+" exists")
 
1058
 
 
1059
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1060
        """A helper for callDeprecated and applyDeprecated.
 
1061
 
 
1062
        :param a_callable: A callable to call.
 
1063
        :param args: The positional arguments for the callable
 
1064
        :param kwargs: The keyword arguments for the callable
 
1065
        :return: A tuple (warnings, result). result is the result of calling
 
1066
            a_callable(``*args``, ``**kwargs``).
 
1067
        """
 
1068
        local_warnings = []
 
1069
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1070
            # we've hooked into a deprecation specific callpath,
 
1071
            # only deprecations should getting sent via it.
 
1072
            self.assertEqual(cls, DeprecationWarning)
 
1073
            local_warnings.append(msg)
 
1074
        original_warning_method = symbol_versioning.warn
 
1075
        symbol_versioning.set_warning_method(capture_warnings)
 
1076
        try:
 
1077
            result = a_callable(*args, **kwargs)
 
1078
        finally:
 
1079
            symbol_versioning.set_warning_method(original_warning_method)
 
1080
        return (local_warnings, result)
 
1081
 
 
1082
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1083
        """Call a deprecated callable without warning the user.
 
1084
 
 
1085
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1086
        not other callers that go direct to the warning module.
 
1087
 
 
1088
        To test that a deprecated method raises an error, do something like
 
1089
        this::
 
1090
 
 
1091
        self.assertRaises(errors.ReservedId,
 
1092
            self.applyDeprecated, zero_ninetyone,
 
1093
                br.append_revision, 'current:')
 
1094
 
 
1095
        :param deprecation_format: The deprecation format that the callable
 
1096
            should have been deprecated with. This is the same type as the
 
1097
            parameter to deprecated_method/deprecated_function. If the
 
1098
            callable is not deprecated with this format, an assertion error
 
1099
            will be raised.
 
1100
        :param a_callable: A callable to call. This may be a bound method or
 
1101
            a regular function. It will be called with ``*args`` and
 
1102
            ``**kwargs``.
 
1103
        :param args: The positional arguments for the callable
 
1104
        :param kwargs: The keyword arguments for the callable
 
1105
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1106
        """
 
1107
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1108
            *args, **kwargs)
 
1109
        expected_first_warning = symbol_versioning.deprecation_string(
 
1110
            a_callable, deprecation_format)
 
1111
        if len(call_warnings) == 0:
 
1112
            self.fail("No deprecation warning generated by call to %s" %
 
1113
                a_callable)
 
1114
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1115
        return result
 
1116
 
 
1117
    def callCatchWarnings(self, fn, *args, **kw):
 
1118
        """Call a callable that raises python warnings.
 
1119
 
 
1120
        The caller's responsible for examining the returned warnings.
 
1121
 
 
1122
        If the callable raises an exception, the exception is not
 
1123
        caught and propagates up to the caller.  In that case, the list
 
1124
        of warnings is not available.
 
1125
 
 
1126
        :returns: ([warning_object, ...], fn_result)
 
1127
        """
 
1128
        # XXX: This is not perfect, because it completely overrides the
 
1129
        # warnings filters, and some code may depend on suppressing particular
 
1130
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1131
        # though.  -- Andrew, 20071062
 
1132
        wlist = []
 
1133
        def _catcher(message, category, filename, lineno, file=None):
 
1134
            # despite the name, 'message' is normally(?) a Warning subclass
 
1135
            # instance
 
1136
            wlist.append(message)
 
1137
        saved_showwarning = warnings.showwarning
 
1138
        saved_filters = warnings.filters
 
1139
        try:
 
1140
            warnings.showwarning = _catcher
 
1141
            warnings.filters = []
 
1142
            result = fn(*args, **kw)
 
1143
        finally:
 
1144
            warnings.showwarning = saved_showwarning
 
1145
            warnings.filters = saved_filters
 
1146
        return wlist, result
 
1147
 
 
1148
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1149
        """Assert that a callable is deprecated in a particular way.
 
1150
 
 
1151
        This is a very precise test for unusual requirements. The 
 
1152
        applyDeprecated helper function is probably more suited for most tests
 
1153
        as it allows you to simply specify the deprecation format being used
 
1154
        and will ensure that that is issued for the function being called.
 
1155
 
 
1156
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1157
        not other callers that go direct to the warning module.  To catch
 
1158
        general warnings, use callCatchWarnings.
 
1159
 
 
1160
        :param expected: a list of the deprecation warnings expected, in order
 
1161
        :param callable: The callable to call
 
1162
        :param args: The positional arguments for the callable
 
1163
        :param kwargs: The keyword arguments for the callable
 
1164
        """
 
1165
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1166
            *args, **kwargs)
 
1167
        self.assertEqual(expected, call_warnings)
 
1168
        return result
 
1169
 
 
1170
    def _startLogFile(self):
 
1171
        """Send bzr and test log messages to a temporary file.
 
1172
 
 
1173
        The file is removed as the test is torn down.
 
1174
        """
 
1175
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1176
        self._log_file = os.fdopen(fileno, 'w+')
 
1177
        self._log_memento = bzrlib.trace.push_log_file(self._log_file)
 
1178
        self._log_file_name = name
 
1179
        self.addCleanup(self._finishLogFile)
 
1180
 
 
1181
    def _finishLogFile(self):
 
1182
        """Finished with the log file.
 
1183
 
 
1184
        Close the file and delete it, unless setKeepLogfile was called.
 
1185
        """
 
1186
        if self._log_file is None:
 
1187
            return
 
1188
        bzrlib.trace.pop_log_file(self._log_memento)
 
1189
        self._log_file.close()
 
1190
        self._log_file = None
 
1191
        if not self._keep_log_file:
 
1192
            os.remove(self._log_file_name)
 
1193
            self._log_file_name = None
 
1194
 
 
1195
    def setKeepLogfile(self):
 
1196
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1197
        self._keep_log_file = True
 
1198
 
 
1199
    def addCleanup(self, callable):
 
1200
        """Arrange to run a callable when this case is torn down.
 
1201
 
 
1202
        Callables are run in the reverse of the order they are registered, 
 
1203
        ie last-in first-out.
 
1204
        """
 
1205
        if callable in self._cleanups:
 
1206
            raise ValueError("cleanup function %r already registered on %s" 
 
1207
                    % (callable, self))
 
1208
        self._cleanups.append(callable)
 
1209
 
 
1210
    def _cleanEnvironment(self):
 
1211
        new_env = {
 
1212
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1213
            'HOME': os.getcwd(),
 
1214
            'APPDATA': None,  # bzr now use Win32 API and don't rely on APPDATA
 
1215
            'BZR_EDITOR': None, # test_msgeditor manipulates this variable
 
1216
            'BZR_EMAIL': None,
 
1217
            'BZREMAIL': None, # may still be present in the environment
 
1218
            'EMAIL': None,
 
1219
            'BZR_PROGRESS_BAR': None,
 
1220
            'BZR_LOG': None,
 
1221
            # SSH Agent
 
1222
            'SSH_AUTH_SOCK': None,
 
1223
            # Proxies
 
1224
            'http_proxy': None,
 
1225
            'HTTP_PROXY': None,
 
1226
            'https_proxy': None,
 
1227
            'HTTPS_PROXY': None,
 
1228
            'no_proxy': None,
 
1229
            'NO_PROXY': None,
 
1230
            'all_proxy': None,
 
1231
            'ALL_PROXY': None,
 
1232
            # Nobody cares about these ones AFAIK. So far at
 
1233
            # least. If you do (care), please update this comment
 
1234
            # -- vila 20061212
 
1235
            'ftp_proxy': None,
 
1236
            'FTP_PROXY': None,
 
1237
            'BZR_REMOTE_PATH': None,
 
1238
        }
 
1239
        self.__old_env = {}
 
1240
        self.addCleanup(self._restoreEnvironment)
 
1241
        for name, value in new_env.iteritems():
 
1242
            self._captureVar(name, value)
 
1243
 
 
1244
    def _captureVar(self, name, newvalue):
 
1245
        """Set an environment variable, and reset it when finished."""
 
1246
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1247
 
 
1248
    def _restore_debug_flags(self):
 
1249
        debug.debug_flags.clear()
 
1250
        debug.debug_flags.update(self._preserved_debug_flags)
 
1251
 
 
1252
    def _restoreEnvironment(self):
 
1253
        for name, value in self.__old_env.iteritems():
 
1254
            osutils.set_or_unset_env(name, value)
 
1255
 
 
1256
    def _restoreHooks(self):
 
1257
        for klass, hooks in self._preserved_hooks.items():
 
1258
            setattr(klass, 'hooks', hooks)
 
1259
 
 
1260
    def knownFailure(self, reason):
 
1261
        """This test has failed for some known reason."""
 
1262
        raise KnownFailure(reason)
 
1263
 
 
1264
    def run(self, result=None):
 
1265
        if result is None: result = self.defaultTestResult()
 
1266
        for feature in getattr(self, '_test_needs_features', []):
 
1267
            if not feature.available():
 
1268
                result.startTest(self)
 
1269
                if getattr(result, 'addNotSupported', None):
 
1270
                    result.addNotSupported(self, feature)
 
1271
                else:
 
1272
                    result.addSuccess(self)
 
1273
                result.stopTest(self)
 
1274
                return
 
1275
        try:
 
1276
            return unittest.TestCase.run(self, result)
 
1277
        finally:
 
1278
            saved_attrs = {}
 
1279
            absent_attr = object()
 
1280
            for attr_name in self.attrs_to_keep:
 
1281
                attr = getattr(self, attr_name, absent_attr)
 
1282
                if attr is not absent_attr:
 
1283
                    saved_attrs[attr_name] = attr
 
1284
            self.__dict__ = saved_attrs
 
1285
 
 
1286
    def tearDown(self):
 
1287
        self._runCleanups()
 
1288
        unittest.TestCase.tearDown(self)
 
1289
 
 
1290
    def time(self, callable, *args, **kwargs):
 
1291
        """Run callable and accrue the time it takes to the benchmark time.
 
1292
        
 
1293
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1294
        this will cause lsprofile statistics to be gathered and stored in
 
1295
        self._benchcalls.
 
1296
        """
 
1297
        if self._benchtime is None:
 
1298
            self._benchtime = 0
 
1299
        start = time.time()
 
1300
        try:
 
1301
            if not self._gather_lsprof_in_benchmarks:
 
1302
                return callable(*args, **kwargs)
 
1303
            else:
 
1304
                # record this benchmark
 
1305
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1306
                stats.sort()
 
1307
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1308
                return ret
 
1309
        finally:
 
1310
            self._benchtime += time.time() - start
 
1311
 
 
1312
    def _runCleanups(self):
 
1313
        """Run registered cleanup functions. 
 
1314
 
 
1315
        This should only be called from TestCase.tearDown.
 
1316
        """
 
1317
        # TODO: Perhaps this should keep running cleanups even if 
 
1318
        # one of them fails?
 
1319
 
 
1320
        # Actually pop the cleanups from the list so tearDown running
 
1321
        # twice is safe (this happens for skipped tests).
 
1322
        while self._cleanups:
 
1323
            self._cleanups.pop()()
 
1324
 
 
1325
    def log(self, *args):
 
1326
        mutter(*args)
 
1327
 
 
1328
    def _get_log(self, keep_log_file=False):
 
1329
        """Get the log from bzrlib.trace calls from this test.
 
1330
 
 
1331
        :param keep_log_file: When True, if the log is still a file on disk
 
1332
            leave it as a file on disk. When False, if the log is still a file
 
1333
            on disk, the log file is deleted and the log preserved as
 
1334
            self._log_contents.
 
1335
        :return: A string containing the log.
 
1336
        """
 
1337
        # flush the log file, to get all content
 
1338
        import bzrlib.trace
 
1339
        bzrlib.trace._trace_file.flush()
 
1340
        if self._log_contents:
 
1341
            # XXX: this can hardly contain the content flushed above --vila
 
1342
            # 20080128
 
1343
            return self._log_contents
 
1344
        if self._log_file_name is not None:
 
1345
            logfile = open(self._log_file_name)
 
1346
            try:
 
1347
                log_contents = logfile.read()
 
1348
            finally:
 
1349
                logfile.close()
 
1350
            if not keep_log_file:
 
1351
                self._log_contents = log_contents
 
1352
                try:
 
1353
                    os.remove(self._log_file_name)
 
1354
                except OSError, e:
 
1355
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1356
                        sys.stderr.write(('Unable to delete log file '
 
1357
                                             ' %r\n' % self._log_file_name))
 
1358
                    else:
 
1359
                        raise
 
1360
            return log_contents
 
1361
        else:
 
1362
            return "DELETED log file to reduce memory footprint"
 
1363
 
 
1364
    def requireFeature(self, feature):
 
1365
        """This test requires a specific feature is available.
 
1366
 
 
1367
        :raises UnavailableFeature: When feature is not available.
 
1368
        """
 
1369
        if not feature.available():
 
1370
            raise UnavailableFeature(feature)
 
1371
 
 
1372
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1373
            working_dir):
 
1374
        """Run bazaar command line, splitting up a string command line."""
 
1375
        if isinstance(args, basestring):
 
1376
            # shlex don't understand unicode strings,
 
1377
            # so args should be plain string (bialix 20070906)
 
1378
            args = list(shlex.split(str(args)))
 
1379
        return self._run_bzr_core(args, retcode=retcode,
 
1380
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1381
                )
 
1382
 
 
1383
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1384
            working_dir):
 
1385
        if encoding is None:
 
1386
            encoding = bzrlib.user_encoding
 
1387
        stdout = StringIOWrapper()
 
1388
        stderr = StringIOWrapper()
 
1389
        stdout.encoding = encoding
 
1390
        stderr.encoding = encoding
 
1391
 
 
1392
        self.log('run bzr: %r', args)
 
1393
        # FIXME: don't call into logging here
 
1394
        handler = logging.StreamHandler(stderr)
 
1395
        handler.setLevel(logging.INFO)
 
1396
        logger = logging.getLogger('')
 
1397
        logger.addHandler(handler)
 
1398
        old_ui_factory = ui.ui_factory
 
1399
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1400
 
 
1401
        cwd = None
 
1402
        if working_dir is not None:
 
1403
            cwd = osutils.getcwd()
 
1404
            os.chdir(working_dir)
 
1405
 
 
1406
        try:
 
1407
            result = self.apply_redirected(ui.ui_factory.stdin,
 
1408
                stdout, stderr,
 
1409
                bzrlib.commands.run_bzr_catch_user_errors,
 
1410
                args)
 
1411
        finally:
 
1412
            logger.removeHandler(handler)
 
1413
            ui.ui_factory = old_ui_factory
 
1414
            if cwd is not None:
 
1415
                os.chdir(cwd)
 
1416
 
 
1417
        out = stdout.getvalue()
 
1418
        err = stderr.getvalue()
 
1419
        if out:
 
1420
            self.log('output:\n%r', out)
 
1421
        if err:
 
1422
            self.log('errors:\n%r', err)
 
1423
        if retcode is not None:
 
1424
            self.assertEquals(retcode, result,
 
1425
                              message='Unexpected return code')
 
1426
        return out, err
 
1427
 
 
1428
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
 
1429
                working_dir=None, error_regexes=[], output_encoding=None):
 
1430
        """Invoke bzr, as if it were run from the command line.
 
1431
 
 
1432
        The argument list should not include the bzr program name - the
 
1433
        first argument is normally the bzr command.  Arguments may be
 
1434
        passed in three ways:
 
1435
 
 
1436
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1437
        when the command contains whitespace or metacharacters, or 
 
1438
        is built up at run time.
 
1439
 
 
1440
        2- A single string, eg "add a".  This is the most convenient 
 
1441
        for hardcoded commands.
 
1442
 
 
1443
        This runs bzr through the interface that catches and reports
 
1444
        errors, and with logging set to something approximating the
 
1445
        default, so that error reporting can be checked.
 
1446
 
 
1447
        This should be the main method for tests that want to exercise the
 
1448
        overall behavior of the bzr application (rather than a unit test
 
1449
        or a functional test of the library.)
 
1450
 
 
1451
        This sends the stdout/stderr results into the test's log,
 
1452
        where it may be useful for debugging.  See also run_captured.
 
1453
 
 
1454
        :keyword stdin: A string to be used as stdin for the command.
 
1455
        :keyword retcode: The status code the command should return;
 
1456
            default 0.
 
1457
        :keyword working_dir: The directory to run the command in
 
1458
        :keyword error_regexes: A list of expected error messages.  If
 
1459
            specified they must be seen in the error output of the command.
 
1460
        """
 
1461
        out, err = self._run_bzr_autosplit(
 
1462
            args=args,
 
1463
            retcode=retcode,
 
1464
            encoding=encoding,
 
1465
            stdin=stdin,
 
1466
            working_dir=working_dir,
 
1467
            )
 
1468
        for regex in error_regexes:
 
1469
            self.assertContainsRe(err, regex)
 
1470
        return out, err
 
1471
 
 
1472
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1473
        """Run bzr, and check that stderr contains the supplied regexes
 
1474
 
 
1475
        :param error_regexes: Sequence of regular expressions which
 
1476
            must each be found in the error output. The relative ordering
 
1477
            is not enforced.
 
1478
        :param args: command-line arguments for bzr
 
1479
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1480
            This function changes the default value of retcode to be 3,
 
1481
            since in most cases this is run when you expect bzr to fail.
 
1482
 
 
1483
        :return: (out, err) The actual output of running the command (in case
 
1484
            you want to do more inspection)
 
1485
 
 
1486
        Examples of use::
 
1487
 
 
1488
            # Make sure that commit is failing because there is nothing to do
 
1489
            self.run_bzr_error(['no changes to commit'],
 
1490
                               ['commit', '-m', 'my commit comment'])
 
1491
            # Make sure --strict is handling an unknown file, rather than
 
1492
            # giving us the 'nothing to do' error
 
1493
            self.build_tree(['unknown'])
 
1494
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1495
                               ['commit', --strict', '-m', 'my commit comment'])
 
1496
        """
 
1497
        kwargs.setdefault('retcode', 3)
 
1498
        kwargs['error_regexes'] = error_regexes
 
1499
        out, err = self.run_bzr(*args, **kwargs)
 
1500
        return out, err
 
1501
 
 
1502
    def run_bzr_subprocess(self, *args, **kwargs):
 
1503
        """Run bzr in a subprocess for testing.
 
1504
 
 
1505
        This starts a new Python interpreter and runs bzr in there. 
 
1506
        This should only be used for tests that have a justifiable need for
 
1507
        this isolation: e.g. they are testing startup time, or signal
 
1508
        handling, or early startup code, etc.  Subprocess code can't be 
 
1509
        profiled or debugged so easily.
 
1510
 
 
1511
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
1512
            None is supplied, the status code is not checked.
 
1513
        :keyword env_changes: A dictionary which lists changes to environment
 
1514
            variables. A value of None will unset the env variable.
 
1515
            The values must be strings. The change will only occur in the
 
1516
            child, so you don't need to fix the environment after running.
 
1517
        :keyword universal_newlines: Convert CRLF => LF
 
1518
        :keyword allow_plugins: By default the subprocess is run with
 
1519
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1520
            for system-wide plugins to create unexpected output on stderr,
 
1521
            which can cause unnecessary test failures.
 
1522
        """
 
1523
        env_changes = kwargs.get('env_changes', {})
 
1524
        working_dir = kwargs.get('working_dir', None)
 
1525
        allow_plugins = kwargs.get('allow_plugins', False)
 
1526
        if len(args) == 1:
 
1527
            if isinstance(args[0], list):
 
1528
                args = args[0]
 
1529
            elif isinstance(args[0], basestring):
 
1530
                args = list(shlex.split(args[0]))
 
1531
        else:
 
1532
            symbol_versioning.warn(zero_ninetyone %
 
1533
                                   "passing varargs to run_bzr_subprocess",
 
1534
                                   DeprecationWarning, stacklevel=3)
 
1535
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1536
                                            working_dir=working_dir,
 
1537
                                            allow_plugins=allow_plugins)
 
1538
        # We distinguish between retcode=None and retcode not passed.
 
1539
        supplied_retcode = kwargs.get('retcode', 0)
 
1540
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1541
            universal_newlines=kwargs.get('universal_newlines', False),
 
1542
            process_args=args)
 
1543
 
 
1544
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1545
                             skip_if_plan_to_signal=False,
 
1546
                             working_dir=None,
 
1547
                             allow_plugins=False):
 
1548
        """Start bzr in a subprocess for testing.
 
1549
 
 
1550
        This starts a new Python interpreter and runs bzr in there.
 
1551
        This should only be used for tests that have a justifiable need for
 
1552
        this isolation: e.g. they are testing startup time, or signal
 
1553
        handling, or early startup code, etc.  Subprocess code can't be
 
1554
        profiled or debugged so easily.
 
1555
 
 
1556
        :param process_args: a list of arguments to pass to the bzr executable,
 
1557
            for example ``['--version']``.
 
1558
        :param env_changes: A dictionary which lists changes to environment
 
1559
            variables. A value of None will unset the env variable.
 
1560
            The values must be strings. The change will only occur in the
 
1561
            child, so you don't need to fix the environment after running.
 
1562
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1563
            is not available.
 
1564
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1565
 
 
1566
        :returns: Popen object for the started process.
 
1567
        """
 
1568
        if skip_if_plan_to_signal:
 
1569
            if not getattr(os, 'kill', None):
 
1570
                raise TestSkipped("os.kill not available.")
 
1571
 
 
1572
        if env_changes is None:
 
1573
            env_changes = {}
 
1574
        old_env = {}
 
1575
 
 
1576
        def cleanup_environment():
 
1577
            for env_var, value in env_changes.iteritems():
 
1578
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1579
 
 
1580
        def restore_environment():
 
1581
            for env_var, value in old_env.iteritems():
 
1582
                osutils.set_or_unset_env(env_var, value)
 
1583
 
 
1584
        bzr_path = self.get_bzr_path()
 
1585
 
 
1586
        cwd = None
 
1587
        if working_dir is not None:
 
1588
            cwd = osutils.getcwd()
 
1589
            os.chdir(working_dir)
 
1590
 
 
1591
        try:
 
1592
            # win32 subprocess doesn't support preexec_fn
 
1593
            # so we will avoid using it on all platforms, just to
 
1594
            # make sure the code path is used, and we don't break on win32
 
1595
            cleanup_environment()
 
1596
            command = [sys.executable, bzr_path]
 
1597
            if not allow_plugins:
 
1598
                command.append('--no-plugins')
 
1599
            command.extend(process_args)
 
1600
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1601
        finally:
 
1602
            restore_environment()
 
1603
            if cwd is not None:
 
1604
                os.chdir(cwd)
 
1605
 
 
1606
        return process
 
1607
 
 
1608
    def _popen(self, *args, **kwargs):
 
1609
        """Place a call to Popen.
 
1610
 
 
1611
        Allows tests to override this method to intercept the calls made to
 
1612
        Popen for introspection.
 
1613
        """
 
1614
        return Popen(*args, **kwargs)
 
1615
 
 
1616
    def get_bzr_path(self):
 
1617
        """Return the path of the 'bzr' executable for this test suite."""
 
1618
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1619
        if not os.path.isfile(bzr_path):
 
1620
            # We are probably installed. Assume sys.argv is the right file
 
1621
            bzr_path = sys.argv[0]
 
1622
        return bzr_path
 
1623
 
 
1624
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1625
                              universal_newlines=False, process_args=None):
 
1626
        """Finish the execution of process.
 
1627
 
 
1628
        :param process: the Popen object returned from start_bzr_subprocess.
 
1629
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1630
            None is supplied, the status code is not checked.
 
1631
        :param send_signal: an optional signal to send to the process.
 
1632
        :param universal_newlines: Convert CRLF => LF
 
1633
        :returns: (stdout, stderr)
 
1634
        """
 
1635
        if send_signal is not None:
 
1636
            os.kill(process.pid, send_signal)
 
1637
        out, err = process.communicate()
 
1638
 
 
1639
        if universal_newlines:
 
1640
            out = out.replace('\r\n', '\n')
 
1641
            err = err.replace('\r\n', '\n')
 
1642
 
 
1643
        if retcode is not None and retcode != process.returncode:
 
1644
            if process_args is None:
 
1645
                process_args = "(unknown args)"
 
1646
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1647
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1648
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1649
                      % (process_args, retcode, process.returncode))
 
1650
        return [out, err]
 
1651
 
 
1652
    def check_inventory_shape(self, inv, shape):
 
1653
        """Compare an inventory to a list of expected names.
 
1654
 
 
1655
        Fail if they are not precisely equal.
 
1656
        """
 
1657
        extras = []
 
1658
        shape = list(shape)             # copy
 
1659
        for path, ie in inv.entries():
 
1660
            name = path.replace('\\', '/')
 
1661
            if ie.kind == 'directory':
 
1662
                name = name + '/'
 
1663
            if name in shape:
 
1664
                shape.remove(name)
 
1665
            else:
 
1666
                extras.append(name)
 
1667
        if shape:
 
1668
            self.fail("expected paths not found in inventory: %r" % shape)
 
1669
        if extras:
 
1670
            self.fail("unexpected paths found in inventory: %r" % extras)
 
1671
 
 
1672
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1673
                         a_callable=None, *args, **kwargs):
 
1674
        """Call callable with redirected std io pipes.
 
1675
 
 
1676
        Returns the return code."""
 
1677
        if not callable(a_callable):
 
1678
            raise ValueError("a_callable must be callable.")
 
1679
        if stdin is None:
 
1680
            stdin = StringIO("")
 
1681
        if stdout is None:
 
1682
            if getattr(self, "_log_file", None) is not None:
 
1683
                stdout = self._log_file
 
1684
            else:
 
1685
                stdout = StringIO()
 
1686
        if stderr is None:
 
1687
            if getattr(self, "_log_file", None is not None):
 
1688
                stderr = self._log_file
 
1689
            else:
 
1690
                stderr = StringIO()
 
1691
        real_stdin = sys.stdin
 
1692
        real_stdout = sys.stdout
 
1693
        real_stderr = sys.stderr
 
1694
        try:
 
1695
            sys.stdout = stdout
 
1696
            sys.stderr = stderr
 
1697
            sys.stdin = stdin
 
1698
            return a_callable(*args, **kwargs)
 
1699
        finally:
 
1700
            sys.stdout = real_stdout
 
1701
            sys.stderr = real_stderr
 
1702
            sys.stdin = real_stdin
 
1703
 
 
1704
    def reduceLockdirTimeout(self):
 
1705
        """Reduce the default lock timeout for the duration of the test, so that
 
1706
        if LockContention occurs during a test, it does so quickly.
 
1707
 
 
1708
        Tests that expect to provoke LockContention errors should call this.
 
1709
        """
 
1710
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
1711
        def resetTimeout():
 
1712
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
1713
        self.addCleanup(resetTimeout)
 
1714
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
1715
 
 
1716
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
1717
        """Return a StringIOWrapper instance, that will encode Unicode
 
1718
        input to UTF-8.
 
1719
        """
 
1720
        if encoding_type is None:
 
1721
            encoding_type = 'strict'
 
1722
        sio = StringIO()
 
1723
        output_encoding = 'utf-8'
 
1724
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
 
1725
        sio.encoding = output_encoding
 
1726
        return sio
 
1727
 
 
1728
 
 
1729
class TestCaseWithMemoryTransport(TestCase):
 
1730
    """Common test class for tests that do not need disk resources.
 
1731
 
 
1732
    Tests that need disk resources should derive from TestCaseWithTransport.
 
1733
 
 
1734
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
1735
 
 
1736
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
1737
    a directory which does not exist. This serves to help ensure test isolation
 
1738
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
1739
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
1740
    file defaults for the transport in tests, nor does it obey the command line
 
1741
    override, so tests that accidentally write to the common directory should
 
1742
    be rare.
 
1743
 
 
1744
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
1745
    a .bzr directory that stops us ascending higher into the filesystem.
 
1746
    """
 
1747
 
 
1748
    TEST_ROOT = None
 
1749
    _TEST_NAME = 'test'
 
1750
 
 
1751
    def __init__(self, methodName='runTest'):
 
1752
        # allow test parameterization after test construction and before test
 
1753
        # execution. Variables that the parameterizer sets need to be 
 
1754
        # ones that are not set by setUp, or setUp will trash them.
 
1755
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
1756
        self.vfs_transport_factory = default_transport
 
1757
        self.transport_server = None
 
1758
        self.transport_readonly_server = None
 
1759
        self.__vfs_server = None
 
1760
 
 
1761
    def get_transport(self, relpath=None):
 
1762
        """Return a writeable transport.
 
1763
 
 
1764
        This transport is for the test scratch space relative to
 
1765
        "self._test_root"
 
1766
        
 
1767
        :param relpath: a path relative to the base url.
 
1768
        """
 
1769
        t = get_transport(self.get_url(relpath))
 
1770
        self.assertFalse(t.is_readonly())
 
1771
        return t
 
1772
 
 
1773
    def get_readonly_transport(self, relpath=None):
 
1774
        """Return a readonly transport for the test scratch space
 
1775
        
 
1776
        This can be used to test that operations which should only need
 
1777
        readonly access in fact do not try to write.
 
1778
 
 
1779
        :param relpath: a path relative to the base url.
 
1780
        """
 
1781
        t = get_transport(self.get_readonly_url(relpath))
 
1782
        self.assertTrue(t.is_readonly())
 
1783
        return t
 
1784
 
 
1785
    def create_transport_readonly_server(self):
 
1786
        """Create a transport server from class defined at init.
 
1787
 
 
1788
        This is mostly a hook for daughter classes.
 
1789
        """
 
1790
        return self.transport_readonly_server()
 
1791
 
 
1792
    def get_readonly_server(self):
 
1793
        """Get the server instance for the readonly transport
 
1794
 
 
1795
        This is useful for some tests with specific servers to do diagnostics.
 
1796
        """
 
1797
        if self.__readonly_server is None:
 
1798
            if self.transport_readonly_server is None:
 
1799
                # readonly decorator requested
 
1800
                # bring up the server
 
1801
                self.__readonly_server = ReadonlyServer()
 
1802
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1803
            else:
 
1804
                self.__readonly_server = self.create_transport_readonly_server()
 
1805
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1806
            self.addCleanup(self.__readonly_server.tearDown)
 
1807
        return self.__readonly_server
 
1808
 
 
1809
    def get_readonly_url(self, relpath=None):
 
1810
        """Get a URL for the readonly transport.
 
1811
 
 
1812
        This will either be backed by '.' or a decorator to the transport 
 
1813
        used by self.get_url()
 
1814
        relpath provides for clients to get a path relative to the base url.
 
1815
        These should only be downwards relative, not upwards.
 
1816
        """
 
1817
        base = self.get_readonly_server().get_url()
 
1818
        return self._adjust_url(base, relpath)
 
1819
 
 
1820
    def get_vfs_only_server(self):
 
1821
        """Get the vfs only read/write server instance.
 
1822
 
 
1823
        This is useful for some tests with specific servers that need
 
1824
        diagnostics.
 
1825
 
 
1826
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
1827
        is no means to override it.
 
1828
        """
 
1829
        if self.__vfs_server is None:
 
1830
            self.__vfs_server = MemoryServer()
 
1831
            self.__vfs_server.setUp()
 
1832
            self.addCleanup(self.__vfs_server.tearDown)
 
1833
        return self.__vfs_server
 
1834
 
 
1835
    def get_server(self):
 
1836
        """Get the read/write server instance.
 
1837
 
 
1838
        This is useful for some tests with specific servers that need
 
1839
        diagnostics.
 
1840
 
 
1841
        This is built from the self.transport_server factory. If that is None,
 
1842
        then the self.get_vfs_server is returned.
 
1843
        """
 
1844
        if self.__server is None:
 
1845
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
 
1846
                return self.get_vfs_only_server()
 
1847
            else:
 
1848
                # bring up a decorated means of access to the vfs only server.
 
1849
                self.__server = self.transport_server()
 
1850
                try:
 
1851
                    self.__server.setUp(self.get_vfs_only_server())
 
1852
                except TypeError, e:
 
1853
                    # This should never happen; the try:Except here is to assist
 
1854
                    # developers having to update code rather than seeing an
 
1855
                    # uninformative TypeError.
 
1856
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
 
1857
            self.addCleanup(self.__server.tearDown)
 
1858
        return self.__server
 
1859
 
 
1860
    def _adjust_url(self, base, relpath):
 
1861
        """Get a URL (or maybe a path) for the readwrite transport.
 
1862
 
 
1863
        This will either be backed by '.' or to an equivalent non-file based
 
1864
        facility.
 
1865
        relpath provides for clients to get a path relative to the base url.
 
1866
        These should only be downwards relative, not upwards.
 
1867
        """
 
1868
        if relpath is not None and relpath != '.':
 
1869
            if not base.endswith('/'):
 
1870
                base = base + '/'
 
1871
            # XXX: Really base should be a url; we did after all call
 
1872
            # get_url()!  But sometimes it's just a path (from
 
1873
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
1874
            # to a non-escaped local path.
 
1875
            if base.startswith('./') or base.startswith('/'):
 
1876
                base += relpath
 
1877
            else:
 
1878
                base += urlutils.escape(relpath)
 
1879
        return base
 
1880
 
 
1881
    def get_url(self, relpath=None):
 
1882
        """Get a URL (or maybe a path) for the readwrite transport.
 
1883
 
 
1884
        This will either be backed by '.' or to an equivalent non-file based
 
1885
        facility.
 
1886
        relpath provides for clients to get a path relative to the base url.
 
1887
        These should only be downwards relative, not upwards.
 
1888
        """
 
1889
        base = self.get_server().get_url()
 
1890
        return self._adjust_url(base, relpath)
 
1891
 
 
1892
    def get_vfs_only_url(self, relpath=None):
 
1893
        """Get a URL (or maybe a path for the plain old vfs transport.
 
1894
 
 
1895
        This will never be a smart protocol.  It always has all the
 
1896
        capabilities of the local filesystem, but it might actually be a
 
1897
        MemoryTransport or some other similar virtual filesystem.
 
1898
 
 
1899
        This is the backing transport (if any) of the server returned by
 
1900
        get_url and get_readonly_url.
 
1901
 
 
1902
        :param relpath: provides for clients to get a path relative to the base
 
1903
            url.  These should only be downwards relative, not upwards.
 
1904
        :return: A URL
 
1905
        """
 
1906
        base = self.get_vfs_only_server().get_url()
 
1907
        return self._adjust_url(base, relpath)
 
1908
 
 
1909
    def _create_safety_net(self):
 
1910
        """Make a fake bzr directory.
 
1911
 
 
1912
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
1913
        real branch.
 
1914
        """
 
1915
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
1916
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
1917
 
 
1918
    def _check_safety_net(self):
 
1919
        """Check that the safety .bzr directory have not been touched.
 
1920
 
 
1921
        _make_test_root have created a .bzr directory to prevent tests from
 
1922
        propagating. This method ensures than a test did not leaked.
 
1923
        """
 
1924
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
1925
        wt = workingtree.WorkingTree.open(root)
 
1926
        last_rev = wt.last_revision()
 
1927
        if last_rev != 'null:':
 
1928
            # The current test have modified the /bzr directory, we need to
 
1929
            # recreate a new one or all the followng tests will fail.
 
1930
            # If you need to inspect its content uncomment the following line
 
1931
            # import pdb; pdb.set_trace()
 
1932
            _rmtree_temp_dir(root + '/.bzr')
 
1933
            self._create_safety_net()
 
1934
            raise AssertionError('%s/.bzr should not be modified' % root)
 
1935
 
 
1936
    def _make_test_root(self):
 
1937
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
1938
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
1939
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
1940
 
 
1941
            self._create_safety_net()
 
1942
 
 
1943
            # The same directory is used by all tests, and we're not
 
1944
            # specifically told when all tests are finished.  This will do.
 
1945
            atexit.register(_rmtree_temp_dir, root)
 
1946
 
 
1947
        self.addCleanup(self._check_safety_net)
 
1948
 
 
1949
    def makeAndChdirToTestDir(self):
 
1950
        """Create a temporary directories for this one test.
 
1951
        
 
1952
        This must set self.test_home_dir and self.test_dir and chdir to
 
1953
        self.test_dir.
 
1954
        
 
1955
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
1956
        """
 
1957
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
1958
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
1959
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
1960
        
 
1961
    def make_branch(self, relpath, format=None):
 
1962
        """Create a branch on the transport at relpath."""
 
1963
        repo = self.make_repository(relpath, format=format)
 
1964
        return repo.bzrdir.create_branch()
 
1965
 
 
1966
    def make_bzrdir(self, relpath, format=None):
 
1967
        try:
 
1968
            # might be a relative or absolute path
 
1969
            maybe_a_url = self.get_url(relpath)
 
1970
            segments = maybe_a_url.rsplit('/', 1)
 
1971
            t = get_transport(maybe_a_url)
 
1972
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
1973
                t.ensure_base()
 
1974
            if format is None:
 
1975
                format = 'default'
 
1976
            if isinstance(format, basestring):
 
1977
                format = bzrdir.format_registry.make_bzrdir(format)
 
1978
            return format.initialize_on_transport(t)
 
1979
        except errors.UninitializableFormat:
 
1980
            raise TestSkipped("Format %s is not initializable." % format)
 
1981
 
 
1982
    def make_repository(self, relpath, shared=False, format=None):
 
1983
        """Create a repository on our default transport at relpath.
 
1984
        
 
1985
        Note that relpath must be a relative path, not a full url.
 
1986
        """
 
1987
        # FIXME: If you create a remoterepository this returns the underlying
 
1988
        # real format, which is incorrect.  Actually we should make sure that 
 
1989
        # RemoteBzrDir returns a RemoteRepository.
 
1990
        # maybe  mbp 20070410
 
1991
        made_control = self.make_bzrdir(relpath, format=format)
 
1992
        return made_control.create_repository(shared=shared)
 
1993
 
 
1994
    def make_branch_and_memory_tree(self, relpath, format=None):
 
1995
        """Create a branch on the default transport and a MemoryTree for it."""
 
1996
        b = self.make_branch(relpath, format=format)
 
1997
        return memorytree.MemoryTree.create_on_branch(b)
 
1998
 
 
1999
    def overrideEnvironmentForTesting(self):
 
2000
        os.environ['HOME'] = self.test_home_dir
 
2001
        os.environ['BZR_HOME'] = self.test_home_dir
 
2002
        
 
2003
    def setUp(self):
 
2004
        super(TestCaseWithMemoryTransport, self).setUp()
 
2005
        self._make_test_root()
 
2006
        _currentdir = os.getcwdu()
 
2007
        def _leaveDirectory():
 
2008
            os.chdir(_currentdir)
 
2009
        self.addCleanup(_leaveDirectory)
 
2010
        self.makeAndChdirToTestDir()
 
2011
        self.overrideEnvironmentForTesting()
 
2012
        self.__readonly_server = None
 
2013
        self.__server = None
 
2014
        self.reduceLockdirTimeout()
 
2015
 
 
2016
     
 
2017
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
2018
    """Derived class that runs a test within a temporary directory.
 
2019
 
 
2020
    This is useful for tests that need to create a branch, etc.
 
2021
 
 
2022
    The directory is created in a slightly complex way: for each
 
2023
    Python invocation, a new temporary top-level directory is created.
 
2024
    All test cases create their own directory within that.  If the
 
2025
    tests complete successfully, the directory is removed.
 
2026
 
 
2027
    :ivar test_base_dir: The path of the top-level directory for this 
 
2028
    test, which contains a home directory and a work directory.
 
2029
 
 
2030
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
2031
    which is used as $HOME for this test.
 
2032
 
 
2033
    :ivar test_dir: A directory under test_base_dir used as the current
 
2034
    directory when the test proper is run.
 
2035
    """
 
2036
 
 
2037
    OVERRIDE_PYTHON = 'python'
 
2038
 
 
2039
    def check_file_contents(self, filename, expect):
 
2040
        self.log("check contents of file %s" % filename)
 
2041
        contents = file(filename, 'r').read()
 
2042
        if contents != expect:
 
2043
            self.log("expected: %r" % expect)
 
2044
            self.log("actually: %r" % contents)
 
2045
            self.fail("contents of %s not as expected" % filename)
 
2046
 
 
2047
    def makeAndChdirToTestDir(self):
 
2048
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2049
        
 
2050
        For TestCaseInTempDir we create a temporary directory based on the test
 
2051
        name and then create two subdirs - test and home under it.
 
2052
        """
 
2053
        # create a directory within the top level test directory
 
2054
        candidate_dir = osutils.mkdtemp(dir=self.TEST_ROOT)
 
2055
        # now create test and home directories within this dir
 
2056
        self.test_base_dir = candidate_dir
 
2057
        self.test_home_dir = self.test_base_dir + '/home'
 
2058
        os.mkdir(self.test_home_dir)
 
2059
        self.test_dir = self.test_base_dir + '/work'
 
2060
        os.mkdir(self.test_dir)
 
2061
        os.chdir(self.test_dir)
 
2062
        # put name of test inside
 
2063
        f = file(self.test_base_dir + '/name', 'w')
 
2064
        try:
 
2065
            f.write(self.id())
 
2066
        finally:
 
2067
            f.close()
 
2068
        self.addCleanup(self.deleteTestDir)
 
2069
 
 
2070
    def deleteTestDir(self):
 
2071
        os.chdir(self.TEST_ROOT)
 
2072
        _rmtree_temp_dir(self.test_base_dir)
 
2073
 
 
2074
    def build_tree(self, shape, line_endings='binary', transport=None):
 
2075
        """Build a test tree according to a pattern.
 
2076
 
 
2077
        shape is a sequence of file specifications.  If the final
 
2078
        character is '/', a directory is created.
 
2079
 
 
2080
        This assumes that all the elements in the tree being built are new.
 
2081
 
 
2082
        This doesn't add anything to a branch.
 
2083
 
 
2084
        :type shape:    list or tuple.
 
2085
        :param line_endings: Either 'binary' or 'native'
 
2086
            in binary mode, exact contents are written in native mode, the
 
2087
            line endings match the default platform endings.
 
2088
        :param transport: A transport to write to, for building trees on VFS's.
 
2089
            If the transport is readonly or None, "." is opened automatically.
 
2090
        :return: None
 
2091
        """
 
2092
        if type(shape) not in (list, tuple):
 
2093
            raise AssertionError("Parameter 'shape' should be "
 
2094
                "a list or a tuple. Got %r instead" % (shape,))
 
2095
        # It's OK to just create them using forward slashes on windows.
 
2096
        if transport is None or transport.is_readonly():
 
2097
            transport = get_transport(".")
 
2098
        for name in shape:
 
2099
            self.assert_(isinstance(name, basestring))
 
2100
            if name[-1] == '/':
 
2101
                transport.mkdir(urlutils.escape(name[:-1]))
 
2102
            else:
 
2103
                if line_endings == 'binary':
 
2104
                    end = '\n'
 
2105
                elif line_endings == 'native':
 
2106
                    end = os.linesep
 
2107
                else:
 
2108
                    raise errors.BzrError(
 
2109
                        'Invalid line ending request %r' % line_endings)
 
2110
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
2111
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2112
 
 
2113
    def build_tree_contents(self, shape):
 
2114
        build_tree_contents(shape)
 
2115
 
 
2116
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2117
        """Assert whether path or paths are in the WorkingTree"""
 
2118
        if tree is None:
 
2119
            tree = workingtree.WorkingTree.open(root_path)
 
2120
        if not isinstance(path, basestring):
 
2121
            for p in path:
 
2122
                self.assertInWorkingTree(p,tree=tree)
 
2123
        else:
 
2124
            self.assertIsNot(tree.path2id(path), None,
 
2125
                path+' not in working tree.')
 
2126
 
 
2127
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2128
        """Assert whether path or paths are not in the WorkingTree"""
 
2129
        if tree is None:
 
2130
            tree = workingtree.WorkingTree.open(root_path)
 
2131
        if not isinstance(path, basestring):
 
2132
            for p in path:
 
2133
                self.assertNotInWorkingTree(p,tree=tree)
 
2134
        else:
 
2135
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2136
 
 
2137
 
 
2138
class TestCaseWithTransport(TestCaseInTempDir):
 
2139
    """A test case that provides get_url and get_readonly_url facilities.
 
2140
 
 
2141
    These back onto two transport servers, one for readonly access and one for
 
2142
    read write access.
 
2143
 
 
2144
    If no explicit class is provided for readonly access, a
 
2145
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2146
    based read write transports.
 
2147
 
 
2148
    If an explicit class is provided for readonly access, that server and the 
 
2149
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2150
    """
 
2151
 
 
2152
    def get_vfs_only_server(self):
 
2153
        """See TestCaseWithMemoryTransport.
 
2154
 
 
2155
        This is useful for some tests with specific servers that need
 
2156
        diagnostics.
 
2157
        """
 
2158
        if self.__vfs_server is None:
 
2159
            self.__vfs_server = self.vfs_transport_factory()
 
2160
            self.__vfs_server.setUp()
 
2161
            self.addCleanup(self.__vfs_server.tearDown)
 
2162
        return self.__vfs_server
 
2163
 
 
2164
    def make_branch_and_tree(self, relpath, format=None):
 
2165
        """Create a branch on the transport and a tree locally.
 
2166
 
 
2167
        If the transport is not a LocalTransport, the Tree can't be created on
 
2168
        the transport.  In that case if the vfs_transport_factory is
 
2169
        LocalURLServer the working tree is created in the local
 
2170
        directory backing the transport, and the returned tree's branch and
 
2171
        repository will also be accessed locally. Otherwise a lightweight
 
2172
        checkout is created and returned.
 
2173
 
 
2174
        :param format: The BzrDirFormat.
 
2175
        :returns: the WorkingTree.
 
2176
        """
 
2177
        # TODO: always use the local disk path for the working tree,
 
2178
        # this obviously requires a format that supports branch references
 
2179
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2180
        # RBC 20060208
 
2181
        b = self.make_branch(relpath, format=format)
 
2182
        try:
 
2183
            return b.bzrdir.create_workingtree()
 
2184
        except errors.NotLocalUrl:
 
2185
            # We can only make working trees locally at the moment.  If the
 
2186
            # transport can't support them, then we keep the non-disk-backed
 
2187
            # branch and create a local checkout.
 
2188
            if self.vfs_transport_factory is LocalURLServer:
 
2189
                # the branch is colocated on disk, we cannot create a checkout.
 
2190
                # hopefully callers will expect this.
 
2191
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2192
                return local_controldir.create_workingtree()
 
2193
            else:
 
2194
                return b.create_checkout(relpath, lightweight=True)
 
2195
 
 
2196
    def assertIsDirectory(self, relpath, transport):
 
2197
        """Assert that relpath within transport is a directory.
 
2198
 
 
2199
        This may not be possible on all transports; in that case it propagates
 
2200
        a TransportNotPossible.
 
2201
        """
 
2202
        try:
 
2203
            mode = transport.stat(relpath).st_mode
 
2204
        except errors.NoSuchFile:
 
2205
            self.fail("path %s is not a directory; no such file"
 
2206
                      % (relpath))
 
2207
        if not stat.S_ISDIR(mode):
 
2208
            self.fail("path %s is not a directory; has mode %#o"
 
2209
                      % (relpath, mode))
 
2210
 
 
2211
    def assertTreesEqual(self, left, right):
 
2212
        """Check that left and right have the same content and properties."""
 
2213
        # we use a tree delta to check for equality of the content, and we
 
2214
        # manually check for equality of other things such as the parents list.
 
2215
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2216
        differences = left.changes_from(right)
 
2217
        self.assertFalse(differences.has_changed(),
 
2218
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2219
 
 
2220
    def setUp(self):
 
2221
        super(TestCaseWithTransport, self).setUp()
 
2222
        self.__vfs_server = None
 
2223
 
 
2224
 
 
2225
class ChrootedTestCase(TestCaseWithTransport):
 
2226
    """A support class that provides readonly urls outside the local namespace.
 
2227
 
 
2228
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2229
    is then we are chrooted already, if it is not then an HttpServer is used
 
2230
    for readonly urls.
 
2231
 
 
2232
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2233
                       be used without needed to redo it when a different 
 
2234
                       subclass is in use ?
 
2235
    """
 
2236
 
 
2237
    def setUp(self):
 
2238
        super(ChrootedTestCase, self).setUp()
 
2239
        if not self.vfs_transport_factory == MemoryServer:
 
2240
            self.transport_readonly_server = HttpServer
 
2241
 
 
2242
 
 
2243
def condition_id_re(pattern):
 
2244
    """Create a condition filter which performs a re check on a test's id.
 
2245
    
 
2246
    :param pattern: A regular expression string.
 
2247
    :return: A callable that returns True if the re matches.
 
2248
    """
 
2249
    filter_re = re.compile(pattern)
 
2250
    def condition(test):
 
2251
        test_id = test.id()
 
2252
        return filter_re.search(test_id)
 
2253
    return condition
 
2254
 
 
2255
 
 
2256
def condition_isinstance(klass_or_klass_list):
 
2257
    """Create a condition filter which returns isinstance(param, klass).
 
2258
    
 
2259
    :return: A callable which when called with one parameter obj return the
 
2260
        result of isinstance(obj, klass_or_klass_list).
 
2261
    """
 
2262
    def condition(obj):
 
2263
        return isinstance(obj, klass_or_klass_list)
 
2264
    return condition
 
2265
 
 
2266
 
 
2267
def condition_id_in_list(id_list):
 
2268
    """Create a condition filter which verify that test's id in a list.
 
2269
    
 
2270
    :param id_list: A TestIdList object.
 
2271
    :return: A callable that returns True if the test's id appears in the list.
 
2272
    """
 
2273
    def condition(test):
 
2274
        return id_list.includes(test.id())
 
2275
    return condition
 
2276
 
 
2277
 
 
2278
def exclude_tests_by_condition(suite, condition):
 
2279
    """Create a test suite which excludes some tests from suite.
 
2280
 
 
2281
    :param suite: The suite to get tests from.
 
2282
    :param condition: A callable whose result evaluates True when called with a
 
2283
        test case which should be excluded from the result.
 
2284
    :return: A suite which contains the tests found in suite that fail
 
2285
        condition.
 
2286
    """
 
2287
    result = []
 
2288
    for test in iter_suite_tests(suite):
 
2289
        if not condition(test):
 
2290
            result.append(test)
 
2291
    return TestUtil.TestSuite(result)
 
2292
 
 
2293
 
 
2294
def filter_suite_by_condition(suite, condition):
 
2295
    """Create a test suite by filtering another one.
 
2296
    
 
2297
    :param suite: The source suite.
 
2298
    :param condition: A callable whose result evaluates True when called with a
 
2299
        test case which should be included in the result.
 
2300
    :return: A suite which contains the tests found in suite that pass
 
2301
        condition.
 
2302
    """ 
 
2303
    result = []
 
2304
    for test in iter_suite_tests(suite):
 
2305
        if condition(test):
 
2306
            result.append(test)
 
2307
    return TestUtil.TestSuite(result)
 
2308
 
 
2309
 
 
2310
def filter_suite_by_re(suite, pattern, exclude_pattern=DEPRECATED_PARAMETER,
 
2311
                       random_order=DEPRECATED_PARAMETER):
 
2312
    """Create a test suite by filtering another one.
 
2313
    
 
2314
    :param suite:           the source suite
 
2315
    :param pattern:         pattern that names must match
 
2316
    :param exclude_pattern: A pattern that names must not match. This parameter
 
2317
        is deprecated as of bzrlib 1.0. Please use the separate function
 
2318
        exclude_tests_by_re instead.
 
2319
    :param random_order:    If True, tests in the new suite will be put in
 
2320
        random order. This parameter is deprecated as of bzrlib 1.0. Please
 
2321
        use the separate function randomize_suite instead.
 
2322
    :returns: the newly created suite
 
2323
    """ 
 
2324
    if deprecated_passed(exclude_pattern):
 
2325
        symbol_versioning.warn(
 
2326
            one_zero % "passing exclude_pattern to filter_suite_by_re",
 
2327
                DeprecationWarning, stacklevel=2)
 
2328
        if exclude_pattern is not None:
 
2329
            suite = exclude_tests_by_re(suite, exclude_pattern)
 
2330
    condition = condition_id_re(pattern)
 
2331
    result_suite = filter_suite_by_condition(suite, condition)
 
2332
    if deprecated_passed(random_order):
 
2333
        symbol_versioning.warn(
 
2334
            one_zero % "passing random_order to filter_suite_by_re",
 
2335
                DeprecationWarning, stacklevel=2)
 
2336
        if random_order:
 
2337
            result_suite = randomize_suite(result_suite)
 
2338
    return result_suite
 
2339
 
 
2340
 
 
2341
def filter_suite_by_id_list(suite, test_id_list):
 
2342
    """Create a test suite by filtering another one.
 
2343
 
 
2344
    :param suite: The source suite.
 
2345
    :param test_id_list: A list of the test ids to keep as strings.
 
2346
    :returns: the newly created suite
 
2347
    """
 
2348
    condition = condition_id_in_list(test_id_list)
 
2349
    result_suite = filter_suite_by_condition(suite, condition)
 
2350
    return result_suite
 
2351
 
 
2352
 
 
2353
def exclude_tests_by_re(suite, pattern):
 
2354
    """Create a test suite which excludes some tests from suite.
 
2355
 
 
2356
    :param suite: The suite to get tests from.
 
2357
    :param pattern: A regular expression string. Test ids that match this
 
2358
        pattern will be excluded from the result.
 
2359
    :return: A TestSuite that contains all the tests from suite without the
 
2360
        tests that matched pattern. The order of tests is the same as it was in
 
2361
        suite.
 
2362
    """
 
2363
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
2364
 
 
2365
 
 
2366
def preserve_input(something):
 
2367
    """A helper for performing test suite transformation chains.
 
2368
 
 
2369
    :param something: Anything you want to preserve.
 
2370
    :return: Something.
 
2371
    """
 
2372
    return something
 
2373
 
 
2374
 
 
2375
def randomize_suite(suite):
 
2376
    """Return a new TestSuite with suite's tests in random order.
 
2377
    
 
2378
    The tests in the input suite are flattened into a single suite in order to
 
2379
    accomplish this. Any nested TestSuites are removed to provide global
 
2380
    randomness.
 
2381
    """
 
2382
    tests = list(iter_suite_tests(suite))
 
2383
    random.shuffle(tests)
 
2384
    return TestUtil.TestSuite(tests)
 
2385
 
 
2386
 
 
2387
@deprecated_function(one_zero)
 
2388
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
 
2389
                     random_order=False, append_rest=True):
 
2390
    """DEPRECATED: Create a test suite by sorting another one.
 
2391
 
 
2392
    This method has been decomposed into separate helper methods that should be
 
2393
    called directly:
 
2394
     - filter_suite_by_re
 
2395
     - exclude_tests_by_re
 
2396
     - randomize_suite
 
2397
     - split_suite_by_re
 
2398
    
 
2399
    :param suite:           the source suite
 
2400
    :param pattern:         pattern that names must match in order to go
 
2401
                            first in the new suite
 
2402
    :param exclude_pattern: pattern that names must not match, if any
 
2403
    :param random_order:    if True, tests in the new suite will be put in
 
2404
                            random order (with all tests matching pattern
 
2405
                            first).
 
2406
    :param append_rest:     if False, pattern is a strict filter and not
 
2407
                            just an ordering directive
 
2408
    :returns: the newly created suite
 
2409
    """ 
 
2410
    if exclude_pattern is not None:
 
2411
        suite = exclude_tests_by_re(suite, exclude_pattern)
 
2412
    if random_order:
 
2413
        order_changer = randomize_suite
 
2414
    else:
 
2415
        order_changer = preserve_input
 
2416
    if append_rest:
 
2417
        suites = map(order_changer, split_suite_by_re(suite, pattern))
 
2418
        return TestUtil.TestSuite(suites)
 
2419
    else:
 
2420
        return order_changer(filter_suite_by_re(suite, pattern))
 
2421
 
 
2422
 
 
2423
def split_suite_by_re(suite, pattern):
 
2424
    """Split a test suite into two by a regular expression.
 
2425
    
 
2426
    :param suite: The suite to split.
 
2427
    :param pattern: A regular expression string. Test ids that match this
 
2428
        pattern will be in the first test suite returned, and the others in the
 
2429
        second test suite returned.
 
2430
    :return: A tuple of two test suites, where the first contains tests from
 
2431
        suite matching pattern, and the second contains the remainder from
 
2432
        suite. The order within each output suite is the same as it was in
 
2433
        suite.
 
2434
    """ 
 
2435
    matched = []
 
2436
    did_not_match = []
 
2437
    filter_re = re.compile(pattern)
 
2438
    for test in iter_suite_tests(suite):
 
2439
        test_id = test.id()
 
2440
        if filter_re.search(test_id):
 
2441
            matched.append(test)
 
2442
        else:
 
2443
            did_not_match.append(test)
 
2444
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
2445
 
 
2446
 
 
2447
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2448
              stop_on_failure=False,
 
2449
              transport=None, lsprof_timed=None, bench_history=None,
 
2450
              matching_tests_first=None,
 
2451
              list_only=False,
 
2452
              random_seed=None,
 
2453
              exclude_pattern=None,
 
2454
              strict=False):
 
2455
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2456
    if verbose:
 
2457
        verbosity = 2
 
2458
    else:
 
2459
        verbosity = 1
 
2460
    runner = TextTestRunner(stream=sys.stdout,
 
2461
                            descriptions=0,
 
2462
                            verbosity=verbosity,
 
2463
                            bench_history=bench_history,
 
2464
                            list_only=list_only,
 
2465
                            )
 
2466
    runner.stop_on_failure=stop_on_failure
 
2467
    # Initialise the random number generator and display the seed used.
 
2468
    # We convert the seed to a long to make it reuseable across invocations.
 
2469
    random_order = False
 
2470
    if random_seed is not None:
 
2471
        random_order = True
 
2472
        if random_seed == "now":
 
2473
            random_seed = long(time.time())
 
2474
        else:
 
2475
            # Convert the seed to a long if we can
 
2476
            try:
 
2477
                random_seed = long(random_seed)
 
2478
            except:
 
2479
                pass
 
2480
        runner.stream.writeln("Randomizing test order using seed %s\n" %
 
2481
            (random_seed))
 
2482
        random.seed(random_seed)
 
2483
    # Customise the list of tests if requested
 
2484
    if exclude_pattern is not None:
 
2485
        suite = exclude_tests_by_re(suite, exclude_pattern)
 
2486
    if random_order:
 
2487
        order_changer = randomize_suite
 
2488
    else:
 
2489
        order_changer = preserve_input
 
2490
    if pattern != '.*' or random_order:
 
2491
        if matching_tests_first:
 
2492
            suites = map(order_changer, split_suite_by_re(suite, pattern))
 
2493
            suite = TestUtil.TestSuite(suites)
 
2494
        else:
 
2495
            suite = order_changer(filter_suite_by_re(suite, pattern))
 
2496
 
 
2497
    result = runner.run(suite)
 
2498
 
 
2499
    if strict:
 
2500
        return result.wasStrictlySuccessful()
 
2501
 
 
2502
    return result.wasSuccessful()
 
2503
 
 
2504
 
 
2505
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
2506
             transport=None,
 
2507
             test_suite_factory=None,
 
2508
             lsprof_timed=None,
 
2509
             bench_history=None,
 
2510
             matching_tests_first=None,
 
2511
             list_only=False,
 
2512
             random_seed=None,
 
2513
             exclude_pattern=None,
 
2514
             strict=False,
 
2515
             load_list=None,
 
2516
             ):
 
2517
    """Run the whole test suite under the enhanced runner"""
 
2518
    # XXX: Very ugly way to do this...
 
2519
    # Disable warning about old formats because we don't want it to disturb
 
2520
    # any blackbox tests.
 
2521
    from bzrlib import repository
 
2522
    repository._deprecation_warning_done = True
 
2523
 
 
2524
    global default_transport
 
2525
    if transport is None:
 
2526
        transport = default_transport
 
2527
    old_transport = default_transport
 
2528
    default_transport = transport
 
2529
    try:
 
2530
        if load_list is None:
 
2531
            keep_only = None
 
2532
        else:
 
2533
            keep_only = load_test_id_list(load_list)
 
2534
        if test_suite_factory is None:
 
2535
            suite = test_suite(keep_only)
 
2536
        else:
 
2537
            suite = test_suite_factory()
 
2538
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
2539
                     stop_on_failure=stop_on_failure,
 
2540
                     transport=transport,
 
2541
                     lsprof_timed=lsprof_timed,
 
2542
                     bench_history=bench_history,
 
2543
                     matching_tests_first=matching_tests_first,
 
2544
                     list_only=list_only,
 
2545
                     random_seed=random_seed,
 
2546
                     exclude_pattern=exclude_pattern,
 
2547
                     strict=strict)
 
2548
    finally:
 
2549
        default_transport = old_transport
 
2550
 
 
2551
 
 
2552
def load_test_id_list(file_name):
 
2553
    """Load a test id list from a text file.
 
2554
 
 
2555
    The format is one test id by line.  No special care is taken to impose
 
2556
    strict rules, these test ids are used to filter the test suite so a test id
 
2557
    that do not match an existing test will do no harm. This allows user to add
 
2558
    comments, leave blank lines, etc.
 
2559
    """
 
2560
    test_list = []
 
2561
    try:
 
2562
        ftest = open(file_name, 'rt')
 
2563
    except IOError, e:
 
2564
        if e.errno != errno.ENOENT:
 
2565
            raise
 
2566
        else:
 
2567
            raise errors.NoSuchFile(file_name)
 
2568
 
 
2569
    for test_name in ftest.readlines():
 
2570
        test_list.append(test_name.strip())
 
2571
    ftest.close()
 
2572
    return test_list
 
2573
 
 
2574
 
 
2575
def suite_matches_id_list(test_suite, id_list):
 
2576
    """Warns about tests not appearing or appearing more than once.
 
2577
 
 
2578
    :param test_suite: A TestSuite object.
 
2579
    :param test_id_list: The list of test ids that should be found in 
 
2580
         test_suite.
 
2581
 
 
2582
    :return: (absents, duplicates) absents is a list containing the test found
 
2583
        in id_list but not in test_suite, duplicates is a list containing the
 
2584
        test found multiple times in test_suite.
 
2585
 
 
2586
    When using a prefined test id list, it may occurs that some tests do not
 
2587
    exist anymore or that some tests use the same id. This function warns the
 
2588
    tester about potential problems in his workflow (test lists are volatile)
 
2589
    or in the test suite itself (using the same id for several tests does not
 
2590
    help to localize defects).
 
2591
    """
 
2592
    # Build a dict counting id occurrences
 
2593
    tests = dict()
 
2594
    for test in iter_suite_tests(test_suite):
 
2595
        id = test.id()
 
2596
        tests[id] = tests.get(id, 0) + 1
 
2597
 
 
2598
    not_found = []
 
2599
    duplicates = []
 
2600
    for id in id_list:
 
2601
        occurs = tests.get(id, 0)
 
2602
        if not occurs:
 
2603
            not_found.append(id)
 
2604
        elif occurs > 1:
 
2605
            duplicates.append(id)
 
2606
 
 
2607
    return not_found, duplicates
 
2608
 
 
2609
 
 
2610
class TestIdList(object):
 
2611
    """Test id list to filter a test suite.
 
2612
 
 
2613
    Relying on the assumption that test ids are built as:
 
2614
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
2615
    notation, this class offers methods to :
 
2616
    - avoid building a test suite for modules not refered to in the test list,
 
2617
    - keep only the tests listed from the module test suite.
 
2618
    """
 
2619
 
 
2620
    def __init__(self, test_id_list):
 
2621
        # When a test suite needs to be filtered against us we compare test ids
 
2622
        # for equality, so a simple dict offers a quick and simple solution.
 
2623
        self.tests = dict().fromkeys(test_id_list, True)
 
2624
 
 
2625
        # While unittest.TestCase have ids like:
 
2626
        # <module>.<class>.<method>[(<param+)],
 
2627
        # doctest.DocTestCase can have ids like:
 
2628
        # <module>
 
2629
        # <module>.<class>
 
2630
        # <module>.<function>
 
2631
        # <module>.<class>.<method>
 
2632
 
 
2633
        # Since we can't predict a test class from its name only, we settle on
 
2634
        # a simple constraint: a test id always begins with its module name.
 
2635
 
 
2636
        modules = {}
 
2637
        for test_id in test_id_list:
 
2638
            parts = test_id.split('.')
 
2639
            mod_name = parts.pop(0)
 
2640
            modules[mod_name] = True
 
2641
            for part in parts:
 
2642
                mod_name += '.' + part
 
2643
                modules[mod_name] = True
 
2644
        self.modules = modules
 
2645
 
 
2646
    def refers_to(self, module_name):
 
2647
        """Is there tests for the module or one of its sub modules."""
 
2648
        return self.modules.has_key(module_name)
 
2649
 
 
2650
    def includes(self, test_id):
 
2651
        return self.tests.has_key(test_id)
 
2652
 
 
2653
 
 
2654
def test_suite(keep_only=None):
 
2655
    """Build and return TestSuite for the whole of bzrlib.
 
2656
 
 
2657
    :param keep_only: A list of test ids limiting the suite returned.
 
2658
 
 
2659
    This function can be replaced if you need to change the default test
 
2660
    suite on a global basis, but it is not encouraged.
 
2661
    """
 
2662
    testmod_names = [
 
2663
                   'bzrlib.util.tests.test_bencode',
 
2664
                   'bzrlib.tests.test__dirstate_helpers',
 
2665
                   'bzrlib.tests.test_ancestry',
 
2666
                   'bzrlib.tests.test_annotate',
 
2667
                   'bzrlib.tests.test_api',
 
2668
                   'bzrlib.tests.test_atomicfile',
 
2669
                   'bzrlib.tests.test_bad_files',
 
2670
                   'bzrlib.tests.test_bisect_multi',
 
2671
                   'bzrlib.tests.test_branch',
 
2672
                   'bzrlib.tests.test_branchbuilder',
 
2673
                   'bzrlib.tests.test_bugtracker',
 
2674
                   'bzrlib.tests.test_bundle',
 
2675
                   'bzrlib.tests.test_bzrdir',
 
2676
                   'bzrlib.tests.test_cache_utf8',
 
2677
                   'bzrlib.tests.test_commands',
 
2678
                   'bzrlib.tests.test_commit',
 
2679
                   'bzrlib.tests.test_commit_merge',
 
2680
                   'bzrlib.tests.test_config',
 
2681
                   'bzrlib.tests.test_conflicts',
 
2682
                   'bzrlib.tests.test_counted_lock',
 
2683
                   'bzrlib.tests.test_decorators',
 
2684
                   'bzrlib.tests.test_delta',
 
2685
                   'bzrlib.tests.test_deprecated_graph',
 
2686
                   'bzrlib.tests.test_diff',
 
2687
                   'bzrlib.tests.test_dirstate',
 
2688
                   'bzrlib.tests.test_directory_service',
 
2689
                   'bzrlib.tests.test_email_message',
 
2690
                   'bzrlib.tests.test_errors',
 
2691
                   'bzrlib.tests.test_escaped_store',
 
2692
                   'bzrlib.tests.test_extract',
 
2693
                   'bzrlib.tests.test_fetch',
 
2694
                   'bzrlib.tests.test_ftp_transport',
 
2695
                   'bzrlib.tests.test_generate_docs',
 
2696
                   'bzrlib.tests.test_generate_ids',
 
2697
                   'bzrlib.tests.test_globbing',
 
2698
                   'bzrlib.tests.test_gpg',
 
2699
                   'bzrlib.tests.test_graph',
 
2700
                   'bzrlib.tests.test_hashcache',
 
2701
                   'bzrlib.tests.test_help',
 
2702
                   'bzrlib.tests.test_hooks',
 
2703
                   'bzrlib.tests.test_http',
 
2704
                   'bzrlib.tests.test_http_implementations',
 
2705
                   'bzrlib.tests.test_http_response',
 
2706
                   'bzrlib.tests.test_https_ca_bundle',
 
2707
                   'bzrlib.tests.test_identitymap',
 
2708
                   'bzrlib.tests.test_ignores',
 
2709
                   'bzrlib.tests.test_index',
 
2710
                   'bzrlib.tests.test_info',
 
2711
                   'bzrlib.tests.test_inv',
 
2712
                   'bzrlib.tests.test_knit',
 
2713
                   'bzrlib.tests.test_lazy_import',
 
2714
                   'bzrlib.tests.test_lazy_regex',
 
2715
                   'bzrlib.tests.test_lockdir',
 
2716
                   'bzrlib.tests.test_lockable_files',
 
2717
                   'bzrlib.tests.test_log',
 
2718
                   'bzrlib.tests.test_lsprof',
 
2719
                   'bzrlib.tests.test_lru_cache',
 
2720
                   'bzrlib.tests.test_mail_client',
 
2721
                   'bzrlib.tests.test_memorytree',
 
2722
                   'bzrlib.tests.test_merge',
 
2723
                   'bzrlib.tests.test_merge3',
 
2724
                   'bzrlib.tests.test_merge_core',
 
2725
                   'bzrlib.tests.test_merge_directive',
 
2726
                   'bzrlib.tests.test_missing',
 
2727
                   'bzrlib.tests.test_msgeditor',
 
2728
                   'bzrlib.tests.test_multiparent',
 
2729
                   'bzrlib.tests.test_nonascii',
 
2730
                   'bzrlib.tests.test_options',
 
2731
                   'bzrlib.tests.test_osutils',
 
2732
                   'bzrlib.tests.test_osutils_encodings',
 
2733
                   'bzrlib.tests.test_pack',
 
2734
                   'bzrlib.tests.test_patch',
 
2735
                   'bzrlib.tests.test_patches',
 
2736
                   'bzrlib.tests.test_permissions',
 
2737
                   'bzrlib.tests.test_plugins',
 
2738
                   'bzrlib.tests.test_progress',
 
2739
                   'bzrlib.tests.test_reconfigure',
 
2740
                   'bzrlib.tests.test_reconcile',
 
2741
                   'bzrlib.tests.test_registry',
 
2742
                   'bzrlib.tests.test_remote',
 
2743
                   'bzrlib.tests.test_repository',
 
2744
                   'bzrlib.tests.test_revert',
 
2745
                   'bzrlib.tests.test_revision',
 
2746
                   'bzrlib.tests.test_revisionspec',
 
2747
                   'bzrlib.tests.test_revisiontree',
 
2748
                   'bzrlib.tests.test_rio',
 
2749
                   'bzrlib.tests.test_sampler',
 
2750
                   'bzrlib.tests.test_selftest',
 
2751
                   'bzrlib.tests.test_setup',
 
2752
                   'bzrlib.tests.test_sftp_transport',
 
2753
                   'bzrlib.tests.test_smart',
 
2754
                   'bzrlib.tests.test_smart_add',
 
2755
                   'bzrlib.tests.test_smart_transport',
 
2756
                   'bzrlib.tests.test_smtp_connection',
 
2757
                   'bzrlib.tests.test_source',
 
2758
                   'bzrlib.tests.test_ssh_transport',
 
2759
                   'bzrlib.tests.test_status',
 
2760
                   'bzrlib.tests.test_store',
 
2761
                   'bzrlib.tests.test_strace',
 
2762
                   'bzrlib.tests.test_subsume',
 
2763
                   'bzrlib.tests.test_switch',
 
2764
                   'bzrlib.tests.test_symbol_versioning',
 
2765
                   'bzrlib.tests.test_tag',
 
2766
                   'bzrlib.tests.test_testament',
 
2767
                   'bzrlib.tests.test_textfile',
 
2768
                   'bzrlib.tests.test_textmerge',
 
2769
                   'bzrlib.tests.test_timestamp',
 
2770
                   'bzrlib.tests.test_trace',
 
2771
                   'bzrlib.tests.test_transactions',
 
2772
                   'bzrlib.tests.test_transform',
 
2773
                   'bzrlib.tests.test_transport',
 
2774
                   'bzrlib.tests.test_tree',
 
2775
                   'bzrlib.tests.test_treebuilder',
 
2776
                   'bzrlib.tests.test_tsort',
 
2777
                   'bzrlib.tests.test_tuned_gzip',
 
2778
                   'bzrlib.tests.test_ui',
 
2779
                   'bzrlib.tests.test_uncommit',
 
2780
                   'bzrlib.tests.test_upgrade',
 
2781
                   'bzrlib.tests.test_urlutils',
 
2782
                   'bzrlib.tests.test_versionedfile',
 
2783
                   'bzrlib.tests.test_version',
 
2784
                   'bzrlib.tests.test_version_info',
 
2785
                   'bzrlib.tests.test_weave',
 
2786
                   'bzrlib.tests.test_whitebox',
 
2787
                   'bzrlib.tests.test_win32utils',
 
2788
                   'bzrlib.tests.test_workingtree',
 
2789
                   'bzrlib.tests.test_workingtree_4',
 
2790
                   'bzrlib.tests.test_wsgi',
 
2791
                   'bzrlib.tests.test_xml',
 
2792
                   ]
 
2793
    test_transport_implementations = [
 
2794
        'bzrlib.tests.test_transport_implementations',
 
2795
        'bzrlib.tests.test_read_bundle',
 
2796
        ]
 
2797
    loader = TestUtil.TestLoader()
 
2798
 
 
2799
    if keep_only is None:
 
2800
        loader = TestUtil.TestLoader()
 
2801
    else:
 
2802
        id_filter = TestIdList(keep_only)
 
2803
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
2804
    suite = loader.suiteClass()
 
2805
 
 
2806
    # modules building their suite with loadTestsFromModuleNames
 
2807
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
2808
 
 
2809
    # modules adapted for transport implementations
 
2810
    from bzrlib.tests.test_transport_implementations import TransportTestProviderAdapter
 
2811
    adapter = TransportTestProviderAdapter()
 
2812
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
2813
 
 
2814
    # modules defining their own test_suite()
 
2815
    for package in [p for p in packages_to_test()
 
2816
                    if (keep_only is None
 
2817
                        or id_filter.refers_to(p.__name__))]:
 
2818
        pack_suite = package.test_suite()
 
2819
        suite.addTest(pack_suite)
 
2820
 
 
2821
    modules_to_doctest = [
 
2822
        'bzrlib',
 
2823
        'bzrlib.errors',
 
2824
        'bzrlib.export',
 
2825
        'bzrlib.inventory',
 
2826
        'bzrlib.iterablefile',
 
2827
        'bzrlib.lockdir',
 
2828
        'bzrlib.merge3',
 
2829
        'bzrlib.option',
 
2830
        'bzrlib.store',
 
2831
        'bzrlib.tests',
 
2832
        'bzrlib.timestamp',
 
2833
        'bzrlib.version_info_formats.format_custom',
 
2834
        ]
 
2835
 
 
2836
    for mod in modules_to_doctest:
 
2837
        if not (keep_only is None or id_filter.refers_to(mod)):
 
2838
            # No tests to keep here, move along
 
2839
            continue
 
2840
        try:
 
2841
            doc_suite = doctest.DocTestSuite(mod)
 
2842
        except ValueError, e:
 
2843
            print '**failed to get doctest for: %s\n%s' % (mod, e)
 
2844
            raise
 
2845
        suite.addTest(doc_suite)
 
2846
 
 
2847
    default_encoding = sys.getdefaultencoding()
 
2848
    for name, plugin in bzrlib.plugin.plugins().items():
 
2849
        if keep_only is not None:
 
2850
            if not id_filter.refers_to(plugin.module.__name__):
 
2851
                continue
 
2852
        plugin_suite = plugin.test_suite()
 
2853
        # We used to catch ImportError here and turn it into just a warning,
 
2854
        # but really if you don't have --no-plugins this should be a failure.
 
2855
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
2856
        if plugin_suite is None:
 
2857
            plugin_suite = plugin.load_plugin_tests(loader)
 
2858
        if plugin_suite is not None:
 
2859
            suite.addTest(plugin_suite)
 
2860
        if default_encoding != sys.getdefaultencoding():
 
2861
            bzrlib.trace.warning(
 
2862
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
2863
                sys.getdefaultencoding())
 
2864
            reload(sys)
 
2865
            sys.setdefaultencoding(default_encoding)
 
2866
 
 
2867
    if keep_only is not None:
 
2868
        # Now that the referred modules have loaded their tests, keep only the
 
2869
        # requested ones.
 
2870
        suite = filter_suite_by_id_list(suite, id_filter)
 
2871
        # Do some sanity checks on the id_list filtering
 
2872
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
2873
        for id in not_found:
 
2874
            bzrlib.trace.warning('"%s" not found in the test suite', id)
 
2875
        for id in duplicates:
 
2876
            bzrlib.trace.warning('"%s" is used as an id by several tests', id)
 
2877
 
 
2878
    return suite
 
2879
 
 
2880
 
 
2881
def multiply_tests_from_modules(module_name_list, scenario_iter, loader=None):
 
2882
    """Adapt all tests in some given modules to given scenarios.
 
2883
 
 
2884
    This is the recommended public interface for test parameterization.
 
2885
    Typically the test_suite() method for a per-implementation test
 
2886
    suite will call multiply_tests_from_modules and return the 
 
2887
    result.
 
2888
 
 
2889
    :param module_name_list: List of fully-qualified names of test
 
2890
        modules.
 
2891
    :param scenario_iter: Iterable of pairs of (scenario_name, 
 
2892
        scenario_param_dict).
 
2893
    :param loader: If provided, will be used instead of a new 
 
2894
        bzrlib.tests.TestLoader() instance.
 
2895
 
 
2896
    This returns a new TestSuite containing the cross product of
 
2897
    all the tests in all the modules, each repeated for each scenario.
 
2898
    Each test is adapted by adding the scenario name at the end 
 
2899
    of its name, and updating the test object's __dict__ with the
 
2900
    scenario_param_dict.
 
2901
 
 
2902
    >>> r = multiply_tests_from_modules(
 
2903
    ...     ['bzrlib.tests.test_sampler'],
 
2904
    ...     [('one', dict(param=1)), 
 
2905
    ...      ('two', dict(param=2))])
 
2906
    >>> tests = list(iter_suite_tests(r))
 
2907
    >>> len(tests)
 
2908
    2
 
2909
    >>> tests[0].id()
 
2910
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
 
2911
    >>> tests[0].param
 
2912
    1
 
2913
    >>> tests[1].param
 
2914
    2
 
2915
    """
 
2916
    # XXX: Isn't load_tests() a better way to provide the same functionality
 
2917
    # without forcing a predefined TestScenarioApplier ? --vila 080215
 
2918
    if loader is None:
 
2919
        loader = TestUtil.TestLoader()
 
2920
 
 
2921
    suite = loader.suiteClass()
 
2922
 
 
2923
    adapter = TestScenarioApplier()
 
2924
    adapter.scenarios = list(scenario_iter)
 
2925
    adapt_modules(module_name_list, adapter, loader, suite)
 
2926
    return suite
 
2927
 
 
2928
 
 
2929
def multiply_scenarios(scenarios_left, scenarios_right):
 
2930
    """Multiply two sets of scenarios.
 
2931
 
 
2932
    :returns: the cartesian product of the two sets of scenarios, that is
 
2933
        a scenario for every possible combination of a left scenario and a
 
2934
        right scenario.
 
2935
    """
 
2936
    return [
 
2937
        ('%s,%s' % (left_name, right_name),
 
2938
         dict(left_dict.items() + right_dict.items()))
 
2939
        for left_name, left_dict in scenarios_left
 
2940
        for right_name, right_dict in scenarios_right]
 
2941
 
 
2942
 
 
2943
 
 
2944
def adapt_modules(mods_list, adapter, loader, suite):
 
2945
    """Adapt the modules in mods_list using adapter and add to suite."""
 
2946
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
 
2947
        suite.addTests(adapter.adapt(test))
 
2948
 
 
2949
 
 
2950
def adapt_tests(tests_list, adapter, loader, suite):
 
2951
    """Adapt the tests in tests_list using adapter and add to suite."""
 
2952
    for test in tests_list:
 
2953
        suite.addTests(adapter.adapt(loader.loadTestsFromName(test)))
 
2954
 
 
2955
 
 
2956
def _rmtree_temp_dir(dirname):
 
2957
    # If LANG=C we probably have created some bogus paths
 
2958
    # which rmtree(unicode) will fail to delete
 
2959
    # so make sure we are using rmtree(str) to delete everything
 
2960
    # except on win32, where rmtree(str) will fail
 
2961
    # since it doesn't have the property of byte-stream paths
 
2962
    # (they are either ascii or mbcs)
 
2963
    if sys.platform == 'win32':
 
2964
        # make sure we are using the unicode win32 api
 
2965
        dirname = unicode(dirname)
 
2966
    else:
 
2967
        dirname = dirname.encode(sys.getfilesystemencoding())
 
2968
    try:
 
2969
        osutils.rmtree(dirname)
 
2970
    except OSError, e:
 
2971
        if sys.platform == 'win32' and e.errno == errno.EACCES:
 
2972
            sys.stderr.write(('Permission denied: '
 
2973
                                 'unable to remove testing dir '
 
2974
                                 '%s\n' % os.path.basename(dirname)))
 
2975
        else:
 
2976
            raise
 
2977
 
 
2978
 
 
2979
class Feature(object):
 
2980
    """An operating system Feature."""
 
2981
 
 
2982
    def __init__(self):
 
2983
        self._available = None
 
2984
 
 
2985
    def available(self):
 
2986
        """Is the feature available?
 
2987
 
 
2988
        :return: True if the feature is available.
 
2989
        """
 
2990
        if self._available is None:
 
2991
            self._available = self._probe()
 
2992
        return self._available
 
2993
 
 
2994
    def _probe(self):
 
2995
        """Implement this method in concrete features.
 
2996
 
 
2997
        :return: True if the feature is available.
 
2998
        """
 
2999
        raise NotImplementedError
 
3000
 
 
3001
    def __str__(self):
 
3002
        if getattr(self, 'feature_name', None):
 
3003
            return self.feature_name()
 
3004
        return self.__class__.__name__
 
3005
 
 
3006
 
 
3007
class _SymlinkFeature(Feature):
 
3008
 
 
3009
    def _probe(self):
 
3010
        return osutils.has_symlinks()
 
3011
 
 
3012
    def feature_name(self):
 
3013
        return 'symlinks'
 
3014
 
 
3015
SymlinkFeature = _SymlinkFeature()
 
3016
 
 
3017
 
 
3018
class _HardlinkFeature(Feature):
 
3019
 
 
3020
    def _probe(self):
 
3021
        return osutils.has_hardlinks()
 
3022
 
 
3023
    def feature_name(self):
 
3024
        return 'hardlinks'
 
3025
 
 
3026
HardlinkFeature = _HardlinkFeature()
 
3027
 
 
3028
 
 
3029
class _OsFifoFeature(Feature):
 
3030
 
 
3031
    def _probe(self):
 
3032
        return getattr(os, 'mkfifo', None)
 
3033
 
 
3034
    def feature_name(self):
 
3035
        return 'filesystem fifos'
 
3036
 
 
3037
OsFifoFeature = _OsFifoFeature()
 
3038
 
 
3039
 
 
3040
class TestScenarioApplier(object):
 
3041
    """A tool to apply scenarios to tests."""
 
3042
 
 
3043
    def adapt(self, test):
 
3044
        """Return a TestSuite containing a copy of test for each scenario."""
 
3045
        result = unittest.TestSuite()
 
3046
        for scenario in self.scenarios:
 
3047
            result.addTest(self.adapt_test_to_scenario(test, scenario))
 
3048
        return result
 
3049
 
 
3050
    def adapt_test_to_scenario(self, test, scenario):
 
3051
        """Copy test and apply scenario to it.
 
3052
 
 
3053
        :param test: A test to adapt.
 
3054
        :param scenario: A tuple describing the scenarion.
 
3055
            The first element of the tuple is the new test id.
 
3056
            The second element is a dict containing attributes to set on the
 
3057
            test.
 
3058
        :return: The adapted test.
 
3059
        """
 
3060
        from copy import deepcopy
 
3061
        new_test = deepcopy(test)
 
3062
        for name, value in scenario[1].items():
 
3063
            setattr(new_test, name, value)
 
3064
        new_id = "%s(%s)" % (new_test.id(), scenario[0])
 
3065
        new_test.id = lambda: new_id
 
3066
        return new_test
 
3067
 
 
3068
 
 
3069
def probe_unicode_in_user_encoding():
 
3070
    """Try to encode several unicode strings to use in unicode-aware tests.
 
3071
    Return first successfull match.
 
3072
 
 
3073
    :return:  (unicode value, encoded plain string value) or (None, None)
 
3074
    """
 
3075
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
3076
    for uni_val in possible_vals:
 
3077
        try:
 
3078
            str_val = uni_val.encode(bzrlib.user_encoding)
 
3079
        except UnicodeEncodeError:
 
3080
            # Try a different character
 
3081
            pass
 
3082
        else:
 
3083
            return uni_val, str_val
 
3084
    return None, None
 
3085
 
 
3086
 
 
3087
def probe_bad_non_ascii(encoding):
 
3088
    """Try to find [bad] character with code [128..255]
 
3089
    that cannot be decoded to unicode in some encoding.
 
3090
    Return None if all non-ascii characters is valid
 
3091
    for given encoding.
 
3092
    """
 
3093
    for i in xrange(128, 256):
 
3094
        char = chr(i)
 
3095
        try:
 
3096
            char.decode(encoding)
 
3097
        except UnicodeDecodeError:
 
3098
            return char
 
3099
    return None
 
3100
 
 
3101
 
 
3102
class _FTPServerFeature(Feature):
 
3103
    """Some tests want an FTP Server, check if one is available.
 
3104
 
 
3105
    Right now, the only way this is available is if 'medusa' is installed.
 
3106
    http://www.amk.ca/python/code/medusa.html
 
3107
    """
 
3108
 
 
3109
    def _probe(self):
 
3110
        try:
 
3111
            import bzrlib.tests.ftp_server
 
3112
            return True
 
3113
        except ImportError:
 
3114
            return False
 
3115
 
 
3116
    def feature_name(self):
 
3117
        return 'FTPServer'
 
3118
 
 
3119
FTPServerFeature = _FTPServerFeature()
 
3120
 
 
3121
 
 
3122
class _CaseInsensitiveFilesystemFeature(Feature):
 
3123
    """Check if underlined filesystem is case-insensitive
 
3124
    (e.g. on Windows, Cygwin, MacOS)
 
3125
    """
 
3126
 
 
3127
    def _probe(self):
 
3128
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
3129
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
3130
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
3131
        else:
 
3132
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
3133
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
3134
            dir=root)
 
3135
        name_a = osutils.pathjoin(tdir, 'a')
 
3136
        name_A = osutils.pathjoin(tdir, 'A')
 
3137
        os.mkdir(name_a)
 
3138
        result = osutils.isdir(name_A)
 
3139
        _rmtree_temp_dir(tdir)
 
3140
        return result
 
3141
 
 
3142
    def feature_name(self):
 
3143
        return 'case-insensitive filesystem'
 
3144
 
 
3145
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()