/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Robert Collins
  • Date: 2007-08-22 23:43:36 UTC
  • mto: This revision was merged to the branch mainline in revision 2744.
  • Revision ID: robertc@robertcollins.net-20070822234336-pl9dspu4z6kbt8lg
Tighten the revision store implementation tests surrounding the
None/null: difference. (Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import atexit
 
30
import codecs
 
31
from cStringIO import StringIO
 
32
import difflib
 
33
import doctest
 
34
import errno
 
35
import logging
 
36
import os
 
37
from pprint import pformat
 
38
import random
 
39
import re
 
40
import shlex
 
41
import stat
 
42
from subprocess import Popen, PIPE
 
43
import sys
 
44
import tempfile
 
45
import unittest
 
46
import time
 
47
import warnings
 
48
 
 
49
 
 
50
from bzrlib import (
 
51
    bzrdir,
 
52
    debug,
 
53
    errors,
 
54
    memorytree,
 
55
    osutils,
 
56
    progress,
 
57
    ui,
 
58
    urlutils,
 
59
    workingtree,
 
60
    )
 
61
import bzrlib.branch
 
62
import bzrlib.commands
 
63
import bzrlib.timestamp
 
64
import bzrlib.export
 
65
import bzrlib.inventory
 
66
import bzrlib.iterablefile
 
67
import bzrlib.lockdir
 
68
try:
 
69
    import bzrlib.lsprof
 
70
except ImportError:
 
71
    # lsprof not available
 
72
    pass
 
73
from bzrlib.merge import merge_inner
 
74
import bzrlib.merge3
 
75
import bzrlib.osutils
 
76
import bzrlib.plugin
 
77
from bzrlib.revision import common_ancestor
 
78
import bzrlib.store
 
79
from bzrlib import symbol_versioning
 
80
from bzrlib.symbol_versioning import (
 
81
    deprecated_method,
 
82
    zero_eighteen,
 
83
    zero_ninetyone,
 
84
    )
 
85
import bzrlib.trace
 
86
from bzrlib.transport import get_transport
 
87
import bzrlib.transport
 
88
from bzrlib.transport.local import LocalURLServer
 
89
from bzrlib.transport.memory import MemoryServer
 
90
from bzrlib.transport.readonly import ReadonlyServer
 
91
from bzrlib.trace import mutter, note
 
92
from bzrlib.tests import TestUtil
 
93
from bzrlib.tests.HttpServer import HttpServer
 
94
from bzrlib.tests.TestUtil import (
 
95
                          TestSuite,
 
96
                          TestLoader,
 
97
                          )
 
98
from bzrlib.tests.treeshape import build_tree_contents
 
99
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
100
 
 
101
# Mark this python module as being part of the implementation
 
102
# of unittest: this gives us better tracebacks where the last
 
103
# shown frame is the test code, not our assertXYZ.
 
104
__unittest = 1
 
105
 
 
106
default_transport = LocalURLServer
 
107
 
 
108
MODULES_TO_TEST = []
 
109
MODULES_TO_DOCTEST = [
 
110
                      bzrlib.timestamp,
 
111
                      bzrlib.errors,
 
112
                      bzrlib.export,
 
113
                      bzrlib.inventory,
 
114
                      bzrlib.iterablefile,
 
115
                      bzrlib.lockdir,
 
116
                      bzrlib.merge3,
 
117
                      bzrlib.option,
 
118
                      bzrlib.store,
 
119
                      ]
 
120
 
 
121
 
 
122
def packages_to_test():
 
123
    """Return a list of packages to test.
 
124
 
 
125
    The packages are not globally imported so that import failures are
 
126
    triggered when running selftest, not when importing the command.
 
127
    """
 
128
    import bzrlib.doc
 
129
    import bzrlib.tests.blackbox
 
130
    import bzrlib.tests.branch_implementations
 
131
    import bzrlib.tests.bzrdir_implementations
 
132
    import bzrlib.tests.commands
 
133
    import bzrlib.tests.interrepository_implementations
 
134
    import bzrlib.tests.interversionedfile_implementations
 
135
    import bzrlib.tests.intertree_implementations
 
136
    import bzrlib.tests.per_lock
 
137
    import bzrlib.tests.repository_implementations
 
138
    import bzrlib.tests.revisionstore_implementations
 
139
    import bzrlib.tests.tree_implementations
 
140
    import bzrlib.tests.workingtree_implementations
 
141
    return [
 
142
            bzrlib.doc,
 
143
            bzrlib.tests.blackbox,
 
144
            bzrlib.tests.branch_implementations,
 
145
            bzrlib.tests.bzrdir_implementations,
 
146
            bzrlib.tests.commands,
 
147
            bzrlib.tests.interrepository_implementations,
 
148
            bzrlib.tests.interversionedfile_implementations,
 
149
            bzrlib.tests.intertree_implementations,
 
150
            bzrlib.tests.per_lock,
 
151
            bzrlib.tests.repository_implementations,
 
152
            bzrlib.tests.revisionstore_implementations,
 
153
            bzrlib.tests.tree_implementations,
 
154
            bzrlib.tests.workingtree_implementations,
 
155
            ]
 
156
 
 
157
 
 
158
class ExtendedTestResult(unittest._TextTestResult):
 
159
    """Accepts, reports and accumulates the results of running tests.
 
160
 
 
161
    Compared to this unittest version this class adds support for
 
162
    profiling, benchmarking, stopping as soon as a test fails,  and
 
163
    skipping tests.  There are further-specialized subclasses for
 
164
    different types of display.
 
165
 
 
166
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
167
    addFailure or addError classes.  These in turn may redirect to a more
 
168
    specific case for the special test results supported by our extended
 
169
    tests.
 
170
 
 
171
    Note that just one of these objects is fed the results from many tests.
 
172
    """
 
173
 
 
174
    stop_early = False
 
175
    
 
176
    def __init__(self, stream, descriptions, verbosity,
 
177
                 bench_history=None,
 
178
                 num_tests=None,
 
179
                 ):
 
180
        """Construct new TestResult.
 
181
 
 
182
        :param bench_history: Optionally, a writable file object to accumulate
 
183
            benchmark results.
 
184
        """
 
185
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
186
        if bench_history is not None:
 
187
            from bzrlib.version import _get_bzr_source_tree
 
188
            src_tree = _get_bzr_source_tree()
 
189
            if src_tree:
 
190
                try:
 
191
                    revision_id = src_tree.get_parent_ids()[0]
 
192
                except IndexError:
 
193
                    # XXX: if this is a brand new tree, do the same as if there
 
194
                    # is no branch.
 
195
                    revision_id = ''
 
196
            else:
 
197
                # XXX: If there's no branch, what should we do?
 
198
                revision_id = ''
 
199
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
200
        self._bench_history = bench_history
 
201
        self.ui = ui.ui_factory
 
202
        self.num_tests = num_tests
 
203
        self.error_count = 0
 
204
        self.failure_count = 0
 
205
        self.known_failure_count = 0
 
206
        self.skip_count = 0
 
207
        self.unsupported = {}
 
208
        self.count = 0
 
209
        self._overall_start_time = time.time()
 
210
    
 
211
    def _extractBenchmarkTime(self, testCase):
 
212
        """Add a benchmark time for the current test case."""
 
213
        return getattr(testCase, "_benchtime", None)
 
214
    
 
215
    def _elapsedTestTimeString(self):
 
216
        """Return a time string for the overall time the current test has taken."""
 
217
        return self._formatTime(time.time() - self._start_time)
 
218
 
 
219
    def _testTimeString(self, testCase):
 
220
        benchmark_time = self._extractBenchmarkTime(testCase)
 
221
        if benchmark_time is not None:
 
222
            return "%s/%s" % (
 
223
                self._formatTime(benchmark_time),
 
224
                self._elapsedTestTimeString())
 
225
        else:
 
226
            return "           %s" % self._elapsedTestTimeString()
 
227
 
 
228
    def _formatTime(self, seconds):
 
229
        """Format seconds as milliseconds with leading spaces."""
 
230
        # some benchmarks can take thousands of seconds to run, so we need 8
 
231
        # places
 
232
        return "%8dms" % (1000 * seconds)
 
233
 
 
234
    def _shortened_test_description(self, test):
 
235
        what = test.id()
 
236
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
237
        return what
 
238
 
 
239
    def startTest(self, test):
 
240
        unittest.TestResult.startTest(self, test)
 
241
        self.report_test_start(test)
 
242
        test.number = self.count
 
243
        self._recordTestStartTime()
 
244
 
 
245
    def _recordTestStartTime(self):
 
246
        """Record that a test has started."""
 
247
        self._start_time = time.time()
 
248
 
 
249
    def _cleanupLogFile(self, test):
 
250
        # We can only do this if we have one of our TestCases, not if
 
251
        # we have a doctest.
 
252
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
253
        if setKeepLogfile is not None:
 
254
            setKeepLogfile()
 
255
 
 
256
    def addError(self, test, err):
 
257
        """Tell result that test finished with an error.
 
258
 
 
259
        Called from the TestCase run() method when the test
 
260
        fails with an unexpected error.
 
261
        """
 
262
        self._testConcluded(test)
 
263
        if isinstance(err[1], TestSkipped):
 
264
            return self._addSkipped(test, err)
 
265
        elif isinstance(err[1], UnavailableFeature):
 
266
            return self.addNotSupported(test, err[1].args[0])
 
267
        else:
 
268
            unittest.TestResult.addError(self, test, err)
 
269
            self.error_count += 1
 
270
            self.report_error(test, err)
 
271
            if self.stop_early:
 
272
                self.stop()
 
273
 
 
274
    def addFailure(self, test, err):
 
275
        """Tell result that test failed.
 
276
 
 
277
        Called from the TestCase run() method when the test
 
278
        fails because e.g. an assert() method failed.
 
279
        """
 
280
        self._testConcluded(test)
 
281
        if isinstance(err[1], KnownFailure):
 
282
            return self._addKnownFailure(test, err)
 
283
        else:
 
284
            unittest.TestResult.addFailure(self, test, err)
 
285
            self.failure_count += 1
 
286
            self.report_failure(test, err)
 
287
            if self.stop_early:
 
288
                self.stop()
 
289
 
 
290
    def addSuccess(self, test):
 
291
        """Tell result that test completed successfully.
 
292
 
 
293
        Called from the TestCase run()
 
294
        """
 
295
        self._testConcluded(test)
 
296
        if self._bench_history is not None:
 
297
            benchmark_time = self._extractBenchmarkTime(test)
 
298
            if benchmark_time is not None:
 
299
                self._bench_history.write("%s %s\n" % (
 
300
                    self._formatTime(benchmark_time),
 
301
                    test.id()))
 
302
        self.report_success(test)
 
303
        unittest.TestResult.addSuccess(self, test)
 
304
 
 
305
    def _testConcluded(self, test):
 
306
        """Common code when a test has finished.
 
307
 
 
308
        Called regardless of whether it succeded, failed, etc.
 
309
        """
 
310
        self._cleanupLogFile(test)
 
311
 
 
312
    def _addKnownFailure(self, test, err):
 
313
        self.known_failure_count += 1
 
314
        self.report_known_failure(test, err)
 
315
 
 
316
    def addNotSupported(self, test, feature):
 
317
        """The test will not be run because of a missing feature.
 
318
        """
 
319
        # this can be called in two different ways: it may be that the
 
320
        # test started running, and then raised (through addError) 
 
321
        # UnavailableFeature.  Alternatively this method can be called
 
322
        # while probing for features before running the tests; in that
 
323
        # case we will see startTest and stopTest, but the test will never
 
324
        # actually run.
 
325
        self.unsupported.setdefault(str(feature), 0)
 
326
        self.unsupported[str(feature)] += 1
 
327
        self.report_unsupported(test, feature)
 
328
 
 
329
    def _addSkipped(self, test, skip_excinfo):
 
330
        self.report_skip(test, skip_excinfo)
 
331
        # seems best to treat this as success from point-of-view of
 
332
        # unittest -- it actually does nothing so it barely matters :)
 
333
        try:
 
334
            test.tearDown()
 
335
        except KeyboardInterrupt:
 
336
            raise
 
337
        except:
 
338
            self.addError(test, test.__exc_info())
 
339
        else:
 
340
            unittest.TestResult.addSuccess(self, test)
 
341
 
 
342
    def printErrorList(self, flavour, errors):
 
343
        for test, err in errors:
 
344
            self.stream.writeln(self.separator1)
 
345
            self.stream.write("%s: " % flavour)
 
346
            self.stream.writeln(self.getDescription(test))
 
347
            if getattr(test, '_get_log', None) is not None:
 
348
                print >>self.stream
 
349
                print >>self.stream, \
 
350
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
351
                print >>self.stream, test._get_log()
 
352
                print >>self.stream, \
 
353
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
354
            self.stream.writeln(self.separator2)
 
355
            self.stream.writeln("%s" % err)
 
356
 
 
357
    def finished(self):
 
358
        pass
 
359
 
 
360
    def report_cleaning_up(self):
 
361
        pass
 
362
 
 
363
    def report_success(self, test):
 
364
        pass
 
365
 
 
366
    def wasStrictlySuccessful(self):
 
367
        if self.unsupported or self.known_failure_count:
 
368
            return False
 
369
 
 
370
        return self.wasSuccessful()
 
371
 
 
372
 
 
373
 
 
374
class TextTestResult(ExtendedTestResult):
 
375
    """Displays progress and results of tests in text form"""
 
376
 
 
377
    def __init__(self, stream, descriptions, verbosity,
 
378
                 bench_history=None,
 
379
                 num_tests=None,
 
380
                 pb=None,
 
381
                 ):
 
382
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
383
            bench_history, num_tests)
 
384
        if pb is None:
 
385
            self.pb = self.ui.nested_progress_bar()
 
386
            self._supplied_pb = False
 
387
        else:
 
388
            self.pb = pb
 
389
            self._supplied_pb = True
 
390
        self.pb.show_pct = False
 
391
        self.pb.show_spinner = False
 
392
        self.pb.show_eta = False,
 
393
        self.pb.show_count = False
 
394
        self.pb.show_bar = False
 
395
 
 
396
    def report_starting(self):
 
397
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
398
 
 
399
    def _progress_prefix_text(self):
 
400
        a = '[%d' % self.count
 
401
        if self.num_tests is not None:
 
402
            a +='/%d' % self.num_tests
 
403
        a += ' in %ds' % (time.time() - self._overall_start_time)
 
404
        if self.error_count:
 
405
            a += ', %d errors' % self.error_count
 
406
        if self.failure_count:
 
407
            a += ', %d failed' % self.failure_count
 
408
        if self.known_failure_count:
 
409
            a += ', %d known failures' % self.known_failure_count
 
410
        if self.skip_count:
 
411
            a += ', %d skipped' % self.skip_count
 
412
        if self.unsupported:
 
413
            a += ', %d missing features' % len(self.unsupported)
 
414
        a += ']'
 
415
        return a
 
416
 
 
417
    def report_test_start(self, test):
 
418
        self.count += 1
 
419
        self.pb.update(
 
420
                self._progress_prefix_text()
 
421
                + ' ' 
 
422
                + self._shortened_test_description(test))
 
423
 
 
424
    def _test_description(self, test):
 
425
        return self._shortened_test_description(test)
 
426
 
 
427
    def report_error(self, test, err):
 
428
        self.pb.note('ERROR: %s\n    %s\n', 
 
429
            self._test_description(test),
 
430
            err[1],
 
431
            )
 
432
 
 
433
    def report_failure(self, test, err):
 
434
        self.pb.note('FAIL: %s\n    %s\n', 
 
435
            self._test_description(test),
 
436
            err[1],
 
437
            )
 
438
 
 
439
    def report_known_failure(self, test, err):
 
440
        self.pb.note('XFAIL: %s\n%s\n',
 
441
            self._test_description(test), err[1])
 
442
 
 
443
    def report_skip(self, test, skip_excinfo):
 
444
        self.skip_count += 1
 
445
        if False:
 
446
            # at the moment these are mostly not things we can fix
 
447
            # and so they just produce stipple; use the verbose reporter
 
448
            # to see them.
 
449
            if False:
 
450
                # show test and reason for skip
 
451
                self.pb.note('SKIP: %s\n    %s\n', 
 
452
                    self._shortened_test_description(test),
 
453
                    skip_excinfo[1])
 
454
            else:
 
455
                # since the class name was left behind in the still-visible
 
456
                # progress bar...
 
457
                self.pb.note('SKIP: %s', skip_excinfo[1])
 
458
 
 
459
    def report_unsupported(self, test, feature):
 
460
        """test cannot be run because feature is missing."""
 
461
                  
 
462
    def report_cleaning_up(self):
 
463
        self.pb.update('cleaning up...')
 
464
 
 
465
    def finished(self):
 
466
        if not self._supplied_pb:
 
467
            self.pb.finished()
 
468
 
 
469
 
 
470
class VerboseTestResult(ExtendedTestResult):
 
471
    """Produce long output, with one line per test run plus times"""
 
472
 
 
473
    def _ellipsize_to_right(self, a_string, final_width):
 
474
        """Truncate and pad a string, keeping the right hand side"""
 
475
        if len(a_string) > final_width:
 
476
            result = '...' + a_string[3-final_width:]
 
477
        else:
 
478
            result = a_string
 
479
        return result.ljust(final_width)
 
480
 
 
481
    def report_starting(self):
 
482
        self.stream.write('running %d tests...\n' % self.num_tests)
 
483
 
 
484
    def report_test_start(self, test):
 
485
        self.count += 1
 
486
        name = self._shortened_test_description(test)
 
487
        # width needs space for 6 char status, plus 1 for slash, plus 2 10-char
 
488
        # numbers, plus a trailing blank
 
489
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
490
        self.stream.write(self._ellipsize_to_right(name,
 
491
                          osutils.terminal_width()-30))
 
492
        self.stream.flush()
 
493
 
 
494
    def _error_summary(self, err):
 
495
        indent = ' ' * 4
 
496
        return '%s%s' % (indent, err[1])
 
497
 
 
498
    def report_error(self, test, err):
 
499
        self.stream.writeln('ERROR %s\n%s'
 
500
                % (self._testTimeString(test),
 
501
                   self._error_summary(err)))
 
502
 
 
503
    def report_failure(self, test, err):
 
504
        self.stream.writeln(' FAIL %s\n%s'
 
505
                % (self._testTimeString(test),
 
506
                   self._error_summary(err)))
 
507
 
 
508
    def report_known_failure(self, test, err):
 
509
        self.stream.writeln('XFAIL %s\n%s'
 
510
                % (self._testTimeString(test),
 
511
                   self._error_summary(err)))
 
512
 
 
513
    def report_success(self, test):
 
514
        self.stream.writeln('   OK %s' % self._testTimeString(test))
 
515
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
516
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
517
            stats.pprint(file=self.stream)
 
518
        # flush the stream so that we get smooth output. This verbose mode is
 
519
        # used to show the output in PQM.
 
520
        self.stream.flush()
 
521
 
 
522
    def report_skip(self, test, skip_excinfo):
 
523
        self.skip_count += 1
 
524
        self.stream.writeln(' SKIP %s\n%s'
 
525
                % (self._testTimeString(test),
 
526
                   self._error_summary(skip_excinfo)))
 
527
 
 
528
    def report_unsupported(self, test, feature):
 
529
        """test cannot be run because feature is missing."""
 
530
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
531
                %(self._testTimeString(test), feature))
 
532
 
 
533
 
 
534
class TextTestRunner(object):
 
535
    stop_on_failure = False
 
536
 
 
537
    def __init__(self,
 
538
                 stream=sys.stderr,
 
539
                 descriptions=0,
 
540
                 verbosity=1,
 
541
                 bench_history=None,
 
542
                 list_only=False
 
543
                 ):
 
544
        self.stream = unittest._WritelnDecorator(stream)
 
545
        self.descriptions = descriptions
 
546
        self.verbosity = verbosity
 
547
        self._bench_history = bench_history
 
548
        self.list_only = list_only
 
549
 
 
550
    def run(self, test):
 
551
        "Run the given test case or test suite."
 
552
        startTime = time.time()
 
553
        if self.verbosity == 1:
 
554
            result_class = TextTestResult
 
555
        elif self.verbosity >= 2:
 
556
            result_class = VerboseTestResult
 
557
        result = result_class(self.stream,
 
558
                              self.descriptions,
 
559
                              self.verbosity,
 
560
                              bench_history=self._bench_history,
 
561
                              num_tests=test.countTestCases(),
 
562
                              )
 
563
        result.stop_early = self.stop_on_failure
 
564
        result.report_starting()
 
565
        if self.list_only:
 
566
            if self.verbosity >= 2:
 
567
                self.stream.writeln("Listing tests only ...\n")
 
568
            run = 0
 
569
            for t in iter_suite_tests(test):
 
570
                self.stream.writeln("%s" % (t.id()))
 
571
                run += 1
 
572
            actionTaken = "Listed"
 
573
        else: 
 
574
            test.run(result)
 
575
            run = result.testsRun
 
576
            actionTaken = "Ran"
 
577
        stopTime = time.time()
 
578
        timeTaken = stopTime - startTime
 
579
        result.printErrors()
 
580
        self.stream.writeln(result.separator2)
 
581
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
582
                            run, run != 1 and "s" or "", timeTaken))
 
583
        self.stream.writeln()
 
584
        if not result.wasSuccessful():
 
585
            self.stream.write("FAILED (")
 
586
            failed, errored = map(len, (result.failures, result.errors))
 
587
            if failed:
 
588
                self.stream.write("failures=%d" % failed)
 
589
            if errored:
 
590
                if failed: self.stream.write(", ")
 
591
                self.stream.write("errors=%d" % errored)
 
592
            if result.known_failure_count:
 
593
                if failed or errored: self.stream.write(", ")
 
594
                self.stream.write("known_failure_count=%d" %
 
595
                    result.known_failure_count)
 
596
            self.stream.writeln(")")
 
597
        else:
 
598
            if result.known_failure_count:
 
599
                self.stream.writeln("OK (known_failures=%d)" %
 
600
                    result.known_failure_count)
 
601
            else:
 
602
                self.stream.writeln("OK")
 
603
        if result.skip_count > 0:
 
604
            skipped = result.skip_count
 
605
            self.stream.writeln('%d test%s skipped' %
 
606
                                (skipped, skipped != 1 and "s" or ""))
 
607
        if result.unsupported:
 
608
            for feature, count in sorted(result.unsupported.items()):
 
609
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
610
                    (feature, count))
 
611
        result.finished()
 
612
        return result
 
613
 
 
614
 
 
615
def iter_suite_tests(suite):
 
616
    """Return all tests in a suite, recursing through nested suites"""
 
617
    for item in suite._tests:
 
618
        if isinstance(item, unittest.TestCase):
 
619
            yield item
 
620
        elif isinstance(item, unittest.TestSuite):
 
621
            for r in iter_suite_tests(item):
 
622
                yield r
 
623
        else:
 
624
            raise Exception('unknown object %r inside test suite %r'
 
625
                            % (item, suite))
 
626
 
 
627
 
 
628
class TestSkipped(Exception):
 
629
    """Indicates that a test was intentionally skipped, rather than failing."""
 
630
 
 
631
 
 
632
class KnownFailure(AssertionError):
 
633
    """Indicates that a test failed in a precisely expected manner.
 
634
 
 
635
    Such failures dont block the whole test suite from passing because they are
 
636
    indicators of partially completed code or of future work. We have an
 
637
    explicit error for them so that we can ensure that they are always visible:
 
638
    KnownFailures are always shown in the output of bzr selftest.
 
639
    """
 
640
 
 
641
 
 
642
class UnavailableFeature(Exception):
 
643
    """A feature required for this test was not available.
 
644
 
 
645
    The feature should be used to construct the exception.
 
646
    """
 
647
 
 
648
 
 
649
class CommandFailed(Exception):
 
650
    pass
 
651
 
 
652
 
 
653
class StringIOWrapper(object):
 
654
    """A wrapper around cStringIO which just adds an encoding attribute.
 
655
    
 
656
    Internally we can check sys.stdout to see what the output encoding
 
657
    should be. However, cStringIO has no encoding attribute that we can
 
658
    set. So we wrap it instead.
 
659
    """
 
660
    encoding='ascii'
 
661
    _cstring = None
 
662
 
 
663
    def __init__(self, s=None):
 
664
        if s is not None:
 
665
            self.__dict__['_cstring'] = StringIO(s)
 
666
        else:
 
667
            self.__dict__['_cstring'] = StringIO()
 
668
 
 
669
    def __getattr__(self, name, getattr=getattr):
 
670
        return getattr(self.__dict__['_cstring'], name)
 
671
 
 
672
    def __setattr__(self, name, val):
 
673
        if name == 'encoding':
 
674
            self.__dict__['encoding'] = val
 
675
        else:
 
676
            return setattr(self._cstring, name, val)
 
677
 
 
678
 
 
679
class TestUIFactory(ui.CLIUIFactory):
 
680
    """A UI Factory for testing.
 
681
 
 
682
    Hide the progress bar but emit note()s.
 
683
    Redirect stdin.
 
684
    Allows get_password to be tested without real tty attached.
 
685
    """
 
686
 
 
687
    def __init__(self,
 
688
                 stdout=None,
 
689
                 stderr=None,
 
690
                 stdin=None):
 
691
        super(TestUIFactory, self).__init__()
 
692
        if stdin is not None:
 
693
            # We use a StringIOWrapper to be able to test various
 
694
            # encodings, but the user is still responsible to
 
695
            # encode the string and to set the encoding attribute
 
696
            # of StringIOWrapper.
 
697
            self.stdin = StringIOWrapper(stdin)
 
698
        if stdout is None:
 
699
            self.stdout = sys.stdout
 
700
        else:
 
701
            self.stdout = stdout
 
702
        if stderr is None:
 
703
            self.stderr = sys.stderr
 
704
        else:
 
705
            self.stderr = stderr
 
706
 
 
707
    def clear(self):
 
708
        """See progress.ProgressBar.clear()."""
 
709
 
 
710
    def clear_term(self):
 
711
        """See progress.ProgressBar.clear_term()."""
 
712
 
 
713
    def clear_term(self):
 
714
        """See progress.ProgressBar.clear_term()."""
 
715
 
 
716
    def finished(self):
 
717
        """See progress.ProgressBar.finished()."""
 
718
 
 
719
    def note(self, fmt_string, *args, **kwargs):
 
720
        """See progress.ProgressBar.note()."""
 
721
        self.stdout.write((fmt_string + "\n") % args)
 
722
 
 
723
    def progress_bar(self):
 
724
        return self
 
725
 
 
726
    def nested_progress_bar(self):
 
727
        return self
 
728
 
 
729
    def update(self, message, count=None, total=None):
 
730
        """See progress.ProgressBar.update()."""
 
731
 
 
732
    def get_non_echoed_password(self, prompt):
 
733
        """Get password from stdin without trying to handle the echo mode"""
 
734
        if prompt:
 
735
            self.stdout.write(prompt.encode(self.stdout.encoding, 'replace'))
 
736
        password = self.stdin.readline()
 
737
        if not password:
 
738
            raise EOFError
 
739
        if password[-1] == '\n':
 
740
            password = password[:-1]
 
741
        return password
 
742
 
 
743
 
 
744
class TestCase(unittest.TestCase):
 
745
    """Base class for bzr unit tests.
 
746
    
 
747
    Tests that need access to disk resources should subclass 
 
748
    TestCaseInTempDir not TestCase.
 
749
 
 
750
    Error and debug log messages are redirected from their usual
 
751
    location into a temporary file, the contents of which can be
 
752
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
753
    so that it can also capture file IO.  When the test completes this file
 
754
    is read into memory and removed from disk.
 
755
       
 
756
    There are also convenience functions to invoke bzr's command-line
 
757
    routine, and to build and check bzr trees.
 
758
   
 
759
    In addition to the usual method of overriding tearDown(), this class also
 
760
    allows subclasses to register functions into the _cleanups list, which is
 
761
    run in order as the object is torn down.  It's less likely this will be
 
762
    accidentally overlooked.
 
763
    """
 
764
 
 
765
    _log_file_name = None
 
766
    _log_contents = ''
 
767
    _keep_log_file = False
 
768
    # record lsprof data when performing benchmark calls.
 
769
    _gather_lsprof_in_benchmarks = False
 
770
 
 
771
    def __init__(self, methodName='testMethod'):
 
772
        super(TestCase, self).__init__(methodName)
 
773
        self._cleanups = []
 
774
 
 
775
    def setUp(self):
 
776
        unittest.TestCase.setUp(self)
 
777
        self._cleanEnvironment()
 
778
        bzrlib.trace.disable_default_logging()
 
779
        self._silenceUI()
 
780
        self._startLogFile()
 
781
        self._benchcalls = []
 
782
        self._benchtime = None
 
783
        self._clear_hooks()
 
784
        self._clear_debug_flags()
 
785
 
 
786
    def _clear_debug_flags(self):
 
787
        """Prevent externally set debug flags affecting tests.
 
788
        
 
789
        Tests that want to use debug flags can just set them in the
 
790
        debug_flags set during setup/teardown.
 
791
        """
 
792
        self._preserved_debug_flags = set(debug.debug_flags)
 
793
        debug.debug_flags.clear()
 
794
        self.addCleanup(self._restore_debug_flags)
 
795
 
 
796
    def _clear_hooks(self):
 
797
        # prevent hooks affecting tests
 
798
        import bzrlib.branch
 
799
        import bzrlib.smart.server
 
800
        self._preserved_hooks = {
 
801
            bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
 
802
            bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
 
803
            }
 
804
        self.addCleanup(self._restoreHooks)
 
805
        # reset all hooks to an empty instance of the appropriate type
 
806
        bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
 
807
        bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
 
808
 
 
809
    def _silenceUI(self):
 
810
        """Turn off UI for duration of test"""
 
811
        # by default the UI is off; tests can turn it on if they want it.
 
812
        saved = ui.ui_factory
 
813
        def _restore():
 
814
            ui.ui_factory = saved
 
815
        ui.ui_factory = ui.SilentUIFactory()
 
816
        self.addCleanup(_restore)
 
817
 
 
818
    def _ndiff_strings(self, a, b):
 
819
        """Return ndiff between two strings containing lines.
 
820
        
 
821
        A trailing newline is added if missing to make the strings
 
822
        print properly."""
 
823
        if b and b[-1] != '\n':
 
824
            b += '\n'
 
825
        if a and a[-1] != '\n':
 
826
            a += '\n'
 
827
        difflines = difflib.ndiff(a.splitlines(True),
 
828
                                  b.splitlines(True),
 
829
                                  linejunk=lambda x: False,
 
830
                                  charjunk=lambda x: False)
 
831
        return ''.join(difflines)
 
832
 
 
833
    def assertEqual(self, a, b, message=''):
 
834
        try:
 
835
            if a == b:
 
836
                return
 
837
        except UnicodeError, e:
 
838
            # If we can't compare without getting a UnicodeError, then
 
839
            # obviously they are different
 
840
            mutter('UnicodeError: %s', e)
 
841
        if message:
 
842
            message += '\n'
 
843
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
844
            % (message,
 
845
               pformat(a), pformat(b)))
 
846
 
 
847
    assertEquals = assertEqual
 
848
 
 
849
    def assertEqualDiff(self, a, b, message=None):
 
850
        """Assert two texts are equal, if not raise an exception.
 
851
        
 
852
        This is intended for use with multi-line strings where it can 
 
853
        be hard to find the differences by eye.
 
854
        """
 
855
        # TODO: perhaps override assertEquals to call this for strings?
 
856
        if a == b:
 
857
            return
 
858
        if message is None:
 
859
            message = "texts not equal:\n"
 
860
        raise AssertionError(message +
 
861
                             self._ndiff_strings(a, b))
 
862
        
 
863
    def assertEqualMode(self, mode, mode_test):
 
864
        self.assertEqual(mode, mode_test,
 
865
                         'mode mismatch %o != %o' % (mode, mode_test))
 
866
 
 
867
    def assertPositive(self, val):
 
868
        """Assert that val is greater than 0."""
 
869
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
870
 
 
871
    def assertNegative(self, val):
 
872
        """Assert that val is less than 0."""
 
873
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
874
 
 
875
    def assertStartsWith(self, s, prefix):
 
876
        if not s.startswith(prefix):
 
877
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
878
 
 
879
    def assertEndsWith(self, s, suffix):
 
880
        """Asserts that s ends with suffix."""
 
881
        if not s.endswith(suffix):
 
882
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
883
 
 
884
    def assertContainsRe(self, haystack, needle_re):
 
885
        """Assert that a contains something matching a regular expression."""
 
886
        if not re.search(needle_re, haystack):
 
887
            if '\n' in haystack or len(haystack) > 60:
 
888
                # a long string, format it in a more readable way
 
889
                raise AssertionError(
 
890
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
891
                        % (needle_re, haystack))
 
892
            else:
 
893
                raise AssertionError('pattern "%s" not found in "%s"'
 
894
                        % (needle_re, haystack))
 
895
 
 
896
    def assertNotContainsRe(self, haystack, needle_re):
 
897
        """Assert that a does not match a regular expression"""
 
898
        if re.search(needle_re, haystack):
 
899
            raise AssertionError('pattern "%s" found in "%s"'
 
900
                    % (needle_re, haystack))
 
901
 
 
902
    def assertSubset(self, sublist, superlist):
 
903
        """Assert that every entry in sublist is present in superlist."""
 
904
        missing = set(sublist) - set(superlist)
 
905
        if len(missing) > 0:
 
906
            raise AssertionError("value(s) %r not present in container %r" %
 
907
                                 (missing, superlist))
 
908
 
 
909
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
910
        """Fail unless excClass is raised when the iterator from func is used.
 
911
        
 
912
        Many functions can return generators this makes sure
 
913
        to wrap them in a list() call to make sure the whole generator
 
914
        is run, and that the proper exception is raised.
 
915
        """
 
916
        try:
 
917
            list(func(*args, **kwargs))
 
918
        except excClass:
 
919
            return
 
920
        else:
 
921
            if getattr(excClass,'__name__', None) is not None:
 
922
                excName = excClass.__name__
 
923
            else:
 
924
                excName = str(excClass)
 
925
            raise self.failureException, "%s not raised" % excName
 
926
 
 
927
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
928
        """Assert that a callable raises a particular exception.
 
929
 
 
930
        :param excClass: As for the except statement, this may be either an
 
931
            exception class, or a tuple of classes.
 
932
        :param callableObj: A callable, will be passed ``*args`` and
 
933
            ``**kwargs``.
 
934
 
 
935
        Returns the exception so that you can examine it.
 
936
        """
 
937
        try:
 
938
            callableObj(*args, **kwargs)
 
939
        except excClass, e:
 
940
            return e
 
941
        else:
 
942
            if getattr(excClass,'__name__', None) is not None:
 
943
                excName = excClass.__name__
 
944
            else:
 
945
                # probably a tuple
 
946
                excName = str(excClass)
 
947
            raise self.failureException, "%s not raised" % excName
 
948
 
 
949
    def assertIs(self, left, right, message=None):
 
950
        if not (left is right):
 
951
            if message is not None:
 
952
                raise AssertionError(message)
 
953
            else:
 
954
                raise AssertionError("%r is not %r." % (left, right))
 
955
 
 
956
    def assertIsNot(self, left, right, message=None):
 
957
        if (left is right):
 
958
            if message is not None:
 
959
                raise AssertionError(message)
 
960
            else:
 
961
                raise AssertionError("%r is %r." % (left, right))
 
962
 
 
963
    def assertTransportMode(self, transport, path, mode):
 
964
        """Fail if a path does not have mode mode.
 
965
        
 
966
        If modes are not supported on this transport, the assertion is ignored.
 
967
        """
 
968
        if not transport._can_roundtrip_unix_modebits():
 
969
            return
 
970
        path_stat = transport.stat(path)
 
971
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
972
        self.assertEqual(mode, actual_mode,
 
973
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
974
 
 
975
    def assertIsInstance(self, obj, kls):
 
976
        """Fail if obj is not an instance of kls"""
 
977
        if not isinstance(obj, kls):
 
978
            self.fail("%r is an instance of %s rather than %s" % (
 
979
                obj, obj.__class__, kls))
 
980
 
 
981
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
982
        """Invoke a test, expecting it to fail for the given reason.
 
983
 
 
984
        This is for assertions that ought to succeed, but currently fail.
 
985
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
986
        about the failure you're expecting.  If a new bug is introduced,
 
987
        AssertionError should be raised, not KnownFailure.
 
988
 
 
989
        Frequently, expectFailure should be followed by an opposite assertion.
 
990
        See example below.
 
991
 
 
992
        Intended to be used with a callable that raises AssertionError as the
 
993
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
994
 
 
995
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
996
        test succeeds.
 
997
 
 
998
        example usage::
 
999
 
 
1000
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
1001
                             dynamic_val)
 
1002
          self.assertEqual(42, dynamic_val)
 
1003
 
 
1004
          This means that a dynamic_val of 54 will cause the test to raise
 
1005
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
1006
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
1007
          than 54 or 42 will cause an AssertionError.
 
1008
        """
 
1009
        try:
 
1010
            assertion(*args, **kwargs)
 
1011
        except AssertionError:
 
1012
            raise KnownFailure(reason)
 
1013
        else:
 
1014
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
1015
 
 
1016
    def _capture_warnings(self, a_callable, *args, **kwargs):
 
1017
        """A helper for callDeprecated and applyDeprecated.
 
1018
 
 
1019
        :param a_callable: A callable to call.
 
1020
        :param args: The positional arguments for the callable
 
1021
        :param kwargs: The keyword arguments for the callable
 
1022
        :return: A tuple (warnings, result). result is the result of calling
 
1023
            a_callable(``*args``, ``**kwargs``).
 
1024
        """
 
1025
        local_warnings = []
 
1026
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1027
            # we've hooked into a deprecation specific callpath,
 
1028
            # only deprecations should getting sent via it.
 
1029
            self.assertEqual(cls, DeprecationWarning)
 
1030
            local_warnings.append(msg)
 
1031
        original_warning_method = symbol_versioning.warn
 
1032
        symbol_versioning.set_warning_method(capture_warnings)
 
1033
        try:
 
1034
            result = a_callable(*args, **kwargs)
 
1035
        finally:
 
1036
            symbol_versioning.set_warning_method(original_warning_method)
 
1037
        return (local_warnings, result)
 
1038
 
 
1039
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1040
        """Call a deprecated callable without warning the user.
 
1041
 
 
1042
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1043
        not other callers that go direct to the warning module.
 
1044
 
 
1045
        To test that a deprecated method raises an error, do something like
 
1046
        this::
 
1047
 
 
1048
        self.assertRaises(errors.ReservedId,
 
1049
            self.applyDeprecated, zero_ninetyone,
 
1050
                br.append_revision, 'current:')
 
1051
 
 
1052
        :param deprecation_format: The deprecation format that the callable
 
1053
            should have been deprecated with. This is the same type as the
 
1054
            parameter to deprecated_method/deprecated_function. If the
 
1055
            callable is not deprecated with this format, an assertion error
 
1056
            will be raised.
 
1057
        :param a_callable: A callable to call. This may be a bound method or
 
1058
            a regular function. It will be called with ``*args`` and
 
1059
            ``**kwargs``.
 
1060
        :param args: The positional arguments for the callable
 
1061
        :param kwargs: The keyword arguments for the callable
 
1062
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1063
        """
 
1064
        call_warnings, result = self._capture_warnings(a_callable,
 
1065
            *args, **kwargs)
 
1066
        expected_first_warning = symbol_versioning.deprecation_string(
 
1067
            a_callable, deprecation_format)
 
1068
        if len(call_warnings) == 0:
 
1069
            self.fail("No deprecation warning generated by call to %s" %
 
1070
                a_callable)
 
1071
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1072
        return result
 
1073
 
 
1074
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1075
        """Assert that a callable is deprecated in a particular way.
 
1076
 
 
1077
        This is a very precise test for unusual requirements. The 
 
1078
        applyDeprecated helper function is probably more suited for most tests
 
1079
        as it allows you to simply specify the deprecation format being used
 
1080
        and will ensure that that is issued for the function being called.
 
1081
 
 
1082
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1083
        not other callers that go direct to the warning module.
 
1084
 
 
1085
        :param expected: a list of the deprecation warnings expected, in order
 
1086
        :param callable: The callable to call
 
1087
        :param args: The positional arguments for the callable
 
1088
        :param kwargs: The keyword arguments for the callable
 
1089
        """
 
1090
        call_warnings, result = self._capture_warnings(callable,
 
1091
            *args, **kwargs)
 
1092
        self.assertEqual(expected, call_warnings)
 
1093
        return result
 
1094
 
 
1095
    def _startLogFile(self):
 
1096
        """Send bzr and test log messages to a temporary file.
 
1097
 
 
1098
        The file is removed as the test is torn down.
 
1099
        """
 
1100
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1101
        self._log_file = os.fdopen(fileno, 'w+')
 
1102
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
1103
        self._log_file_name = name
 
1104
        self.addCleanup(self._finishLogFile)
 
1105
 
 
1106
    def _finishLogFile(self):
 
1107
        """Finished with the log file.
 
1108
 
 
1109
        Close the file and delete it, unless setKeepLogfile was called.
 
1110
        """
 
1111
        if self._log_file is None:
 
1112
            return
 
1113
        bzrlib.trace.disable_test_log(self._log_nonce)
 
1114
        self._log_file.close()
 
1115
        self._log_file = None
 
1116
        if not self._keep_log_file:
 
1117
            os.remove(self._log_file_name)
 
1118
            self._log_file_name = None
 
1119
 
 
1120
    def setKeepLogfile(self):
 
1121
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1122
        self._keep_log_file = True
 
1123
 
 
1124
    def addCleanup(self, callable):
 
1125
        """Arrange to run a callable when this case is torn down.
 
1126
 
 
1127
        Callables are run in the reverse of the order they are registered, 
 
1128
        ie last-in first-out.
 
1129
        """
 
1130
        if callable in self._cleanups:
 
1131
            raise ValueError("cleanup function %r already registered on %s" 
 
1132
                    % (callable, self))
 
1133
        self._cleanups.append(callable)
 
1134
 
 
1135
    def _cleanEnvironment(self):
 
1136
        new_env = {
 
1137
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1138
            'HOME': os.getcwd(),
 
1139
            'APPDATA': None,  # bzr now use Win32 API and don't rely on APPDATA
 
1140
            'BZR_EMAIL': None,
 
1141
            'BZREMAIL': None, # may still be present in the environment
 
1142
            'EMAIL': None,
 
1143
            'BZR_PROGRESS_BAR': None,
 
1144
            # SSH Agent
 
1145
            'SSH_AUTH_SOCK': None,
 
1146
            # Proxies
 
1147
            'http_proxy': None,
 
1148
            'HTTP_PROXY': None,
 
1149
            'https_proxy': None,
 
1150
            'HTTPS_PROXY': None,
 
1151
            'no_proxy': None,
 
1152
            'NO_PROXY': None,
 
1153
            'all_proxy': None,
 
1154
            'ALL_PROXY': None,
 
1155
            # Nobody cares about these ones AFAIK. So far at
 
1156
            # least. If you do (care), please update this comment
 
1157
            # -- vila 20061212
 
1158
            'ftp_proxy': None,
 
1159
            'FTP_PROXY': None,
 
1160
            'BZR_REMOTE_PATH': None,
 
1161
        }
 
1162
        self.__old_env = {}
 
1163
        self.addCleanup(self._restoreEnvironment)
 
1164
        for name, value in new_env.iteritems():
 
1165
            self._captureVar(name, value)
 
1166
 
 
1167
    def _captureVar(self, name, newvalue):
 
1168
        """Set an environment variable, and reset it when finished."""
 
1169
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1170
 
 
1171
    def _restore_debug_flags(self):
 
1172
        debug.debug_flags.clear()
 
1173
        debug.debug_flags.update(self._preserved_debug_flags)
 
1174
 
 
1175
    def _restoreEnvironment(self):
 
1176
        for name, value in self.__old_env.iteritems():
 
1177
            osutils.set_or_unset_env(name, value)
 
1178
 
 
1179
    def _restoreHooks(self):
 
1180
        for klass, hooks in self._preserved_hooks.items():
 
1181
            setattr(klass, 'hooks', hooks)
 
1182
 
 
1183
    def knownFailure(self, reason):
 
1184
        """This test has failed for some known reason."""
 
1185
        raise KnownFailure(reason)
 
1186
 
 
1187
    def run(self, result=None):
 
1188
        if result is None: result = self.defaultTestResult()
 
1189
        for feature in getattr(self, '_test_needs_features', []):
 
1190
            if not feature.available():
 
1191
                result.startTest(self)
 
1192
                if getattr(result, 'addNotSupported', None):
 
1193
                    result.addNotSupported(self, feature)
 
1194
                else:
 
1195
                    result.addSuccess(self)
 
1196
                result.stopTest(self)
 
1197
                return
 
1198
        return unittest.TestCase.run(self, result)
 
1199
 
 
1200
    def tearDown(self):
 
1201
        self._runCleanups()
 
1202
        unittest.TestCase.tearDown(self)
 
1203
 
 
1204
    def time(self, callable, *args, **kwargs):
 
1205
        """Run callable and accrue the time it takes to the benchmark time.
 
1206
        
 
1207
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1208
        this will cause lsprofile statistics to be gathered and stored in
 
1209
        self._benchcalls.
 
1210
        """
 
1211
        if self._benchtime is None:
 
1212
            self._benchtime = 0
 
1213
        start = time.time()
 
1214
        try:
 
1215
            if not self._gather_lsprof_in_benchmarks:
 
1216
                return callable(*args, **kwargs)
 
1217
            else:
 
1218
                # record this benchmark
 
1219
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1220
                stats.sort()
 
1221
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1222
                return ret
 
1223
        finally:
 
1224
            self._benchtime += time.time() - start
 
1225
 
 
1226
    def _runCleanups(self):
 
1227
        """Run registered cleanup functions. 
 
1228
 
 
1229
        This should only be called from TestCase.tearDown.
 
1230
        """
 
1231
        # TODO: Perhaps this should keep running cleanups even if 
 
1232
        # one of them fails?
 
1233
 
 
1234
        # Actually pop the cleanups from the list so tearDown running
 
1235
        # twice is safe (this happens for skipped tests).
 
1236
        while self._cleanups:
 
1237
            self._cleanups.pop()()
 
1238
 
 
1239
    def log(self, *args):
 
1240
        mutter(*args)
 
1241
 
 
1242
    def _get_log(self, keep_log_file=False):
 
1243
        """Get the log from bzrlib.trace calls from this test.
 
1244
 
 
1245
        :param keep_log_file: When True, if the log is still a file on disk
 
1246
            leave it as a file on disk. When False, if the log is still a file
 
1247
            on disk, the log file is deleted and the log preserved as
 
1248
            self._log_contents.
 
1249
        :return: A string containing the log.
 
1250
        """
 
1251
        # flush the log file, to get all content
 
1252
        import bzrlib.trace
 
1253
        bzrlib.trace._trace_file.flush()
 
1254
        if self._log_contents:
 
1255
            return self._log_contents
 
1256
        if self._log_file_name is not None:
 
1257
            logfile = open(self._log_file_name)
 
1258
            try:
 
1259
                log_contents = logfile.read()
 
1260
            finally:
 
1261
                logfile.close()
 
1262
            if not keep_log_file:
 
1263
                self._log_contents = log_contents
 
1264
                try:
 
1265
                    os.remove(self._log_file_name)
 
1266
                except OSError, e:
 
1267
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1268
                        print >>sys.stderr, ('Unable to delete log file '
 
1269
                                             ' %r' % self._log_file_name)
 
1270
                    else:
 
1271
                        raise
 
1272
            return log_contents
 
1273
        else:
 
1274
            return "DELETED log file to reduce memory footprint"
 
1275
 
 
1276
    @deprecated_method(zero_eighteen)
 
1277
    def capture(self, cmd, retcode=0):
 
1278
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
1279
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
1280
 
 
1281
    def requireFeature(self, feature):
 
1282
        """This test requires a specific feature is available.
 
1283
 
 
1284
        :raises UnavailableFeature: When feature is not available.
 
1285
        """
 
1286
        if not feature.available():
 
1287
            raise UnavailableFeature(feature)
 
1288
 
 
1289
    @deprecated_method(zero_eighteen)
 
1290
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
 
1291
                         working_dir=None):
 
1292
        """Invoke bzr and return (stdout, stderr).
 
1293
 
 
1294
        Don't call this method, just use run_bzr() which is equivalent.
 
1295
 
 
1296
        :param argv: Arguments to invoke bzr.  This may be either a 
 
1297
            single string, in which case it is split by shlex into words, 
 
1298
            or a list of arguments.
 
1299
        :param retcode: Expected return code, or None for don't-care.
 
1300
        :param encoding: Encoding for sys.stdout and sys.stderr
 
1301
        :param stdin: A string to be used as stdin for the command.
 
1302
        :param working_dir: Change to this directory before running
 
1303
        """
 
1304
        return self._run_bzr_autosplit(argv, retcode=retcode,
 
1305
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1306
                )
 
1307
 
 
1308
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1309
            working_dir):
 
1310
        """Run bazaar command line, splitting up a string command line."""
 
1311
        if isinstance(args, basestring):
 
1312
            args = list(shlex.split(args))
 
1313
        return self._run_bzr_core(args, retcode=retcode,
 
1314
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1315
                )
 
1316
 
 
1317
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1318
            working_dir):
 
1319
        if encoding is None:
 
1320
            encoding = bzrlib.user_encoding
 
1321
        stdout = StringIOWrapper()
 
1322
        stderr = StringIOWrapper()
 
1323
        stdout.encoding = encoding
 
1324
        stderr.encoding = encoding
 
1325
 
 
1326
        self.log('run bzr: %r', args)
 
1327
        # FIXME: don't call into logging here
 
1328
        handler = logging.StreamHandler(stderr)
 
1329
        handler.setLevel(logging.INFO)
 
1330
        logger = logging.getLogger('')
 
1331
        logger.addHandler(handler)
 
1332
        old_ui_factory = ui.ui_factory
 
1333
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1334
 
 
1335
        cwd = None
 
1336
        if working_dir is not None:
 
1337
            cwd = osutils.getcwd()
 
1338
            os.chdir(working_dir)
 
1339
 
 
1340
        try:
 
1341
            result = self.apply_redirected(ui.ui_factory.stdin,
 
1342
                stdout, stderr,
 
1343
                bzrlib.commands.run_bzr_catch_errors,
 
1344
                args)
 
1345
        finally:
 
1346
            logger.removeHandler(handler)
 
1347
            ui.ui_factory = old_ui_factory
 
1348
            if cwd is not None:
 
1349
                os.chdir(cwd)
 
1350
 
 
1351
        out = stdout.getvalue()
 
1352
        err = stderr.getvalue()
 
1353
        if out:
 
1354
            self.log('output:\n%r', out)
 
1355
        if err:
 
1356
            self.log('errors:\n%r', err)
 
1357
        if retcode is not None:
 
1358
            self.assertEquals(retcode, result,
 
1359
                              message='Unexpected return code')
 
1360
        return out, err
 
1361
 
 
1362
    def run_bzr(self, *args, **kwargs):
 
1363
        """Invoke bzr, as if it were run from the command line.
 
1364
 
 
1365
        The argument list should not include the bzr program name - the
 
1366
        first argument is normally the bzr command.  Arguments may be
 
1367
        passed in three ways:
 
1368
 
 
1369
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1370
        when the command contains whitespace or metacharacters, or 
 
1371
        is built up at run time.
 
1372
 
 
1373
        2- A single string, eg "add a".  This is the most convenient 
 
1374
        for hardcoded commands.
 
1375
 
 
1376
        3- Several varargs parameters, eg run_bzr("add", "a").  
 
1377
        This is not recommended for new code.
 
1378
 
 
1379
        This runs bzr through the interface that catches and reports
 
1380
        errors, and with logging set to something approximating the
 
1381
        default, so that error reporting can be checked.
 
1382
 
 
1383
        This should be the main method for tests that want to exercise the
 
1384
        overall behavior of the bzr application (rather than a unit test
 
1385
        or a functional test of the library.)
 
1386
 
 
1387
        This sends the stdout/stderr results into the test's log,
 
1388
        where it may be useful for debugging.  See also run_captured.
 
1389
 
 
1390
        :keyword stdin: A string to be used as stdin for the command.
 
1391
        :keyword retcode: The status code the command should return;
 
1392
            default 0.
 
1393
        :keyword working_dir: The directory to run the command in
 
1394
        :keyword error_regexes: A list of expected error messages.  If
 
1395
            specified they must be seen in the error output of the command.
 
1396
        """
 
1397
        retcode = kwargs.pop('retcode', 0)
 
1398
        encoding = kwargs.pop('encoding', None)
 
1399
        stdin = kwargs.pop('stdin', None)
 
1400
        working_dir = kwargs.pop('working_dir', None)
 
1401
        error_regexes = kwargs.pop('error_regexes', [])
 
1402
 
 
1403
        if kwargs:
 
1404
            raise TypeError("run_bzr() got unexpected keyword arguments '%s'"
 
1405
                            % kwargs.keys())
 
1406
 
 
1407
        if len(args) == 1:
 
1408
            if isinstance(args[0], (list, basestring)):
 
1409
                args = args[0]
 
1410
        else:
 
1411
            symbol_versioning.warn(zero_eighteen % "passing varargs to run_bzr",
 
1412
                                   DeprecationWarning, stacklevel=3)
 
1413
 
 
1414
        out, err = self._run_bzr_autosplit(args=args,
 
1415
            retcode=retcode,
 
1416
            encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1417
            )
 
1418
 
 
1419
        for regex in error_regexes:
 
1420
            self.assertContainsRe(err, regex)
 
1421
        return out, err
 
1422
 
 
1423
    def run_bzr_decode(self, *args, **kwargs):
 
1424
        if 'encoding' in kwargs:
 
1425
            encoding = kwargs['encoding']
 
1426
        else:
 
1427
            encoding = bzrlib.user_encoding
 
1428
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
 
1429
 
 
1430
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1431
        """Run bzr, and check that stderr contains the supplied regexes
 
1432
 
 
1433
        :param error_regexes: Sequence of regular expressions which
 
1434
            must each be found in the error output. The relative ordering
 
1435
            is not enforced.
 
1436
        :param args: command-line arguments for bzr
 
1437
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1438
            This function changes the default value of retcode to be 3,
 
1439
            since in most cases this is run when you expect bzr to fail.
 
1440
 
 
1441
        :return: (out, err) The actual output of running the command (in case
 
1442
            you want to do more inspection)
 
1443
 
 
1444
        Examples of use::
 
1445
 
 
1446
            # Make sure that commit is failing because there is nothing to do
 
1447
            self.run_bzr_error(['no changes to commit'],
 
1448
                               ['commit', '-m', 'my commit comment'])
 
1449
            # Make sure --strict is handling an unknown file, rather than
 
1450
            # giving us the 'nothing to do' error
 
1451
            self.build_tree(['unknown'])
 
1452
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1453
                               ['commit', --strict', '-m', 'my commit comment'])
 
1454
        """
 
1455
        kwargs.setdefault('retcode', 3)
 
1456
        kwargs['error_regexes'] = error_regexes
 
1457
        out, err = self.run_bzr(*args, **kwargs)
 
1458
        return out, err
 
1459
 
 
1460
    def run_bzr_subprocess(self, *args, **kwargs):
 
1461
        """Run bzr in a subprocess for testing.
 
1462
 
 
1463
        This starts a new Python interpreter and runs bzr in there. 
 
1464
        This should only be used for tests that have a justifiable need for
 
1465
        this isolation: e.g. they are testing startup time, or signal
 
1466
        handling, or early startup code, etc.  Subprocess code can't be 
 
1467
        profiled or debugged so easily.
 
1468
 
 
1469
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
1470
            None is supplied, the status code is not checked.
 
1471
        :keyword env_changes: A dictionary which lists changes to environment
 
1472
            variables. A value of None will unset the env variable.
 
1473
            The values must be strings. The change will only occur in the
 
1474
            child, so you don't need to fix the environment after running.
 
1475
        :keyword universal_newlines: Convert CRLF => LF
 
1476
        :keyword allow_plugins: By default the subprocess is run with
 
1477
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1478
            for system-wide plugins to create unexpected output on stderr,
 
1479
            which can cause unnecessary test failures.
 
1480
        """
 
1481
        env_changes = kwargs.get('env_changes', {})
 
1482
        working_dir = kwargs.get('working_dir', None)
 
1483
        allow_plugins = kwargs.get('allow_plugins', False)
 
1484
        if len(args) == 1:
 
1485
            if isinstance(args[0], list):
 
1486
                args = args[0]
 
1487
            elif isinstance(args[0], basestring):
 
1488
                args = list(shlex.split(args[0]))
 
1489
        else:
 
1490
            symbol_versioning.warn(zero_ninetyone %
 
1491
                                   "passing varargs to run_bzr_subprocess",
 
1492
                                   DeprecationWarning, stacklevel=3)
 
1493
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1494
                                            working_dir=working_dir,
 
1495
                                            allow_plugins=allow_plugins)
 
1496
        # We distinguish between retcode=None and retcode not passed.
 
1497
        supplied_retcode = kwargs.get('retcode', 0)
 
1498
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1499
            universal_newlines=kwargs.get('universal_newlines', False),
 
1500
            process_args=args)
 
1501
 
 
1502
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1503
                             skip_if_plan_to_signal=False,
 
1504
                             working_dir=None,
 
1505
                             allow_plugins=False):
 
1506
        """Start bzr in a subprocess for testing.
 
1507
 
 
1508
        This starts a new Python interpreter and runs bzr in there.
 
1509
        This should only be used for tests that have a justifiable need for
 
1510
        this isolation: e.g. they are testing startup time, or signal
 
1511
        handling, or early startup code, etc.  Subprocess code can't be
 
1512
        profiled or debugged so easily.
 
1513
 
 
1514
        :param process_args: a list of arguments to pass to the bzr executable,
 
1515
            for example ``['--version']``.
 
1516
        :param env_changes: A dictionary which lists changes to environment
 
1517
            variables. A value of None will unset the env variable.
 
1518
            The values must be strings. The change will only occur in the
 
1519
            child, so you don't need to fix the environment after running.
 
1520
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1521
            is not available.
 
1522
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1523
 
 
1524
        :returns: Popen object for the started process.
 
1525
        """
 
1526
        if skip_if_plan_to_signal:
 
1527
            if not getattr(os, 'kill', None):
 
1528
                raise TestSkipped("os.kill not available.")
 
1529
 
 
1530
        if env_changes is None:
 
1531
            env_changes = {}
 
1532
        old_env = {}
 
1533
 
 
1534
        def cleanup_environment():
 
1535
            for env_var, value in env_changes.iteritems():
 
1536
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1537
 
 
1538
        def restore_environment():
 
1539
            for env_var, value in old_env.iteritems():
 
1540
                osutils.set_or_unset_env(env_var, value)
 
1541
 
 
1542
        bzr_path = self.get_bzr_path()
 
1543
 
 
1544
        cwd = None
 
1545
        if working_dir is not None:
 
1546
            cwd = osutils.getcwd()
 
1547
            os.chdir(working_dir)
 
1548
 
 
1549
        try:
 
1550
            # win32 subprocess doesn't support preexec_fn
 
1551
            # so we will avoid using it on all platforms, just to
 
1552
            # make sure the code path is used, and we don't break on win32
 
1553
            cleanup_environment()
 
1554
            command = [sys.executable, bzr_path]
 
1555
            if not allow_plugins:
 
1556
                command.append('--no-plugins')
 
1557
            command.extend(process_args)
 
1558
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1559
        finally:
 
1560
            restore_environment()
 
1561
            if cwd is not None:
 
1562
                os.chdir(cwd)
 
1563
 
 
1564
        return process
 
1565
 
 
1566
    def _popen(self, *args, **kwargs):
 
1567
        """Place a call to Popen.
 
1568
 
 
1569
        Allows tests to override this method to intercept the calls made to
 
1570
        Popen for introspection.
 
1571
        """
 
1572
        return Popen(*args, **kwargs)
 
1573
 
 
1574
    def get_bzr_path(self):
 
1575
        """Return the path of the 'bzr' executable for this test suite."""
 
1576
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1577
        if not os.path.isfile(bzr_path):
 
1578
            # We are probably installed. Assume sys.argv is the right file
 
1579
            bzr_path = sys.argv[0]
 
1580
        return bzr_path
 
1581
 
 
1582
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1583
                              universal_newlines=False, process_args=None):
 
1584
        """Finish the execution of process.
 
1585
 
 
1586
        :param process: the Popen object returned from start_bzr_subprocess.
 
1587
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1588
            None is supplied, the status code is not checked.
 
1589
        :param send_signal: an optional signal to send to the process.
 
1590
        :param universal_newlines: Convert CRLF => LF
 
1591
        :returns: (stdout, stderr)
 
1592
        """
 
1593
        if send_signal is not None:
 
1594
            os.kill(process.pid, send_signal)
 
1595
        out, err = process.communicate()
 
1596
 
 
1597
        if universal_newlines:
 
1598
            out = out.replace('\r\n', '\n')
 
1599
            err = err.replace('\r\n', '\n')
 
1600
 
 
1601
        if retcode is not None and retcode != process.returncode:
 
1602
            if process_args is None:
 
1603
                process_args = "(unknown args)"
 
1604
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1605
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1606
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1607
                      % (process_args, retcode, process.returncode))
 
1608
        return [out, err]
 
1609
 
 
1610
    def check_inventory_shape(self, inv, shape):
 
1611
        """Compare an inventory to a list of expected names.
 
1612
 
 
1613
        Fail if they are not precisely equal.
 
1614
        """
 
1615
        extras = []
 
1616
        shape = list(shape)             # copy
 
1617
        for path, ie in inv.entries():
 
1618
            name = path.replace('\\', '/')
 
1619
            if ie.kind == 'directory':
 
1620
                name = name + '/'
 
1621
            if name in shape:
 
1622
                shape.remove(name)
 
1623
            else:
 
1624
                extras.append(name)
 
1625
        if shape:
 
1626
            self.fail("expected paths not found in inventory: %r" % shape)
 
1627
        if extras:
 
1628
            self.fail("unexpected paths found in inventory: %r" % extras)
 
1629
 
 
1630
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1631
                         a_callable=None, *args, **kwargs):
 
1632
        """Call callable with redirected std io pipes.
 
1633
 
 
1634
        Returns the return code."""
 
1635
        if not callable(a_callable):
 
1636
            raise ValueError("a_callable must be callable.")
 
1637
        if stdin is None:
 
1638
            stdin = StringIO("")
 
1639
        if stdout is None:
 
1640
            if getattr(self, "_log_file", None) is not None:
 
1641
                stdout = self._log_file
 
1642
            else:
 
1643
                stdout = StringIO()
 
1644
        if stderr is None:
 
1645
            if getattr(self, "_log_file", None is not None):
 
1646
                stderr = self._log_file
 
1647
            else:
 
1648
                stderr = StringIO()
 
1649
        real_stdin = sys.stdin
 
1650
        real_stdout = sys.stdout
 
1651
        real_stderr = sys.stderr
 
1652
        try:
 
1653
            sys.stdout = stdout
 
1654
            sys.stderr = stderr
 
1655
            sys.stdin = stdin
 
1656
            return a_callable(*args, **kwargs)
 
1657
        finally:
 
1658
            sys.stdout = real_stdout
 
1659
            sys.stderr = real_stderr
 
1660
            sys.stdin = real_stdin
 
1661
 
 
1662
    def reduceLockdirTimeout(self):
 
1663
        """Reduce the default lock timeout for the duration of the test, so that
 
1664
        if LockContention occurs during a test, it does so quickly.
 
1665
 
 
1666
        Tests that expect to provoke LockContention errors should call this.
 
1667
        """
 
1668
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
1669
        def resetTimeout():
 
1670
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
1671
        self.addCleanup(resetTimeout)
 
1672
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
1673
 
 
1674
 
 
1675
class TestCaseWithMemoryTransport(TestCase):
 
1676
    """Common test class for tests that do not need disk resources.
 
1677
 
 
1678
    Tests that need disk resources should derive from TestCaseWithTransport.
 
1679
 
 
1680
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
1681
 
 
1682
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
1683
    a directory which does not exist. This serves to help ensure test isolation
 
1684
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
1685
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
1686
    file defaults for the transport in tests, nor does it obey the command line
 
1687
    override, so tests that accidentally write to the common directory should
 
1688
    be rare.
 
1689
 
 
1690
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
1691
    a .bzr directory that stops us ascending higher into the filesystem.
 
1692
    """
 
1693
 
 
1694
    TEST_ROOT = None
 
1695
    _TEST_NAME = 'test'
 
1696
 
 
1697
    def __init__(self, methodName='runTest'):
 
1698
        # allow test parameterisation after test construction and before test
 
1699
        # execution. Variables that the parameteriser sets need to be 
 
1700
        # ones that are not set by setUp, or setUp will trash them.
 
1701
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
1702
        self.vfs_transport_factory = default_transport
 
1703
        self.transport_server = None
 
1704
        self.transport_readonly_server = None
 
1705
        self.__vfs_server = None
 
1706
 
 
1707
    def get_transport(self, relpath=None):
 
1708
        """Return a writeable transport.
 
1709
 
 
1710
        This transport is for the test scratch space relative to
 
1711
        "self._test_root"
 
1712
        
 
1713
        :param relpath: a path relative to the base url.
 
1714
        """
 
1715
        t = get_transport(self.get_url(relpath))
 
1716
        self.assertFalse(t.is_readonly())
 
1717
        return t
 
1718
 
 
1719
    def get_readonly_transport(self, relpath=None):
 
1720
        """Return a readonly transport for the test scratch space
 
1721
        
 
1722
        This can be used to test that operations which should only need
 
1723
        readonly access in fact do not try to write.
 
1724
 
 
1725
        :param relpath: a path relative to the base url.
 
1726
        """
 
1727
        t = get_transport(self.get_readonly_url(relpath))
 
1728
        self.assertTrue(t.is_readonly())
 
1729
        return t
 
1730
 
 
1731
    def create_transport_readonly_server(self):
 
1732
        """Create a transport server from class defined at init.
 
1733
 
 
1734
        This is mostly a hook for daughter classes.
 
1735
        """
 
1736
        return self.transport_readonly_server()
 
1737
 
 
1738
    def get_readonly_server(self):
 
1739
        """Get the server instance for the readonly transport
 
1740
 
 
1741
        This is useful for some tests with specific servers to do diagnostics.
 
1742
        """
 
1743
        if self.__readonly_server is None:
 
1744
            if self.transport_readonly_server is None:
 
1745
                # readonly decorator requested
 
1746
                # bring up the server
 
1747
                self.__readonly_server = ReadonlyServer()
 
1748
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1749
            else:
 
1750
                self.__readonly_server = self.create_transport_readonly_server()
 
1751
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1752
            self.addCleanup(self.__readonly_server.tearDown)
 
1753
        return self.__readonly_server
 
1754
 
 
1755
    def get_readonly_url(self, relpath=None):
 
1756
        """Get a URL for the readonly transport.
 
1757
 
 
1758
        This will either be backed by '.' or a decorator to the transport 
 
1759
        used by self.get_url()
 
1760
        relpath provides for clients to get a path relative to the base url.
 
1761
        These should only be downwards relative, not upwards.
 
1762
        """
 
1763
        base = self.get_readonly_server().get_url()
 
1764
        return self._adjust_url(base, relpath)
 
1765
 
 
1766
    def get_vfs_only_server(self):
 
1767
        """Get the vfs only read/write server instance.
 
1768
 
 
1769
        This is useful for some tests with specific servers that need
 
1770
        diagnostics.
 
1771
 
 
1772
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
1773
        is no means to override it.
 
1774
        """
 
1775
        if self.__vfs_server is None:
 
1776
            self.__vfs_server = MemoryServer()
 
1777
            self.__vfs_server.setUp()
 
1778
            self.addCleanup(self.__vfs_server.tearDown)
 
1779
        return self.__vfs_server
 
1780
 
 
1781
    def get_server(self):
 
1782
        """Get the read/write server instance.
 
1783
 
 
1784
        This is useful for some tests with specific servers that need
 
1785
        diagnostics.
 
1786
 
 
1787
        This is built from the self.transport_server factory. If that is None,
 
1788
        then the self.get_vfs_server is returned.
 
1789
        """
 
1790
        if self.__server is None:
 
1791
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
 
1792
                return self.get_vfs_only_server()
 
1793
            else:
 
1794
                # bring up a decorated means of access to the vfs only server.
 
1795
                self.__server = self.transport_server()
 
1796
                try:
 
1797
                    self.__server.setUp(self.get_vfs_only_server())
 
1798
                except TypeError, e:
 
1799
                    # This should never happen; the try:Except here is to assist
 
1800
                    # developers having to update code rather than seeing an
 
1801
                    # uninformative TypeError.
 
1802
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
 
1803
            self.addCleanup(self.__server.tearDown)
 
1804
        return self.__server
 
1805
 
 
1806
    def _adjust_url(self, base, relpath):
 
1807
        """Get a URL (or maybe a path) for the readwrite transport.
 
1808
 
 
1809
        This will either be backed by '.' or to an equivalent non-file based
 
1810
        facility.
 
1811
        relpath provides for clients to get a path relative to the base url.
 
1812
        These should only be downwards relative, not upwards.
 
1813
        """
 
1814
        if relpath is not None and relpath != '.':
 
1815
            if not base.endswith('/'):
 
1816
                base = base + '/'
 
1817
            # XXX: Really base should be a url; we did after all call
 
1818
            # get_url()!  But sometimes it's just a path (from
 
1819
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
1820
            # to a non-escaped local path.
 
1821
            if base.startswith('./') or base.startswith('/'):
 
1822
                base += relpath
 
1823
            else:
 
1824
                base += urlutils.escape(relpath)
 
1825
        return base
 
1826
 
 
1827
    def get_url(self, relpath=None):
 
1828
        """Get a URL (or maybe a path) for the readwrite transport.
 
1829
 
 
1830
        This will either be backed by '.' or to an equivalent non-file based
 
1831
        facility.
 
1832
        relpath provides for clients to get a path relative to the base url.
 
1833
        These should only be downwards relative, not upwards.
 
1834
        """
 
1835
        base = self.get_server().get_url()
 
1836
        return self._adjust_url(base, relpath)
 
1837
 
 
1838
    def get_vfs_only_url(self, relpath=None):
 
1839
        """Get a URL (or maybe a path for the plain old vfs transport.
 
1840
 
 
1841
        This will never be a smart protocol.  It always has all the
 
1842
        capabilities of the local filesystem, but it might actually be a
 
1843
        MemoryTransport or some other similar virtual filesystem.
 
1844
 
 
1845
        This is the backing transport (if any) of the server returned by
 
1846
        get_url and get_readonly_url.
 
1847
 
 
1848
        :param relpath: provides for clients to get a path relative to the base
 
1849
            url.  These should only be downwards relative, not upwards.
 
1850
        :return: A URL
 
1851
        """
 
1852
        base = self.get_vfs_only_server().get_url()
 
1853
        return self._adjust_url(base, relpath)
 
1854
 
 
1855
    def _make_test_root(self):
 
1856
        if TestCaseWithMemoryTransport.TEST_ROOT is not None:
 
1857
            return
 
1858
        root = tempfile.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
1859
        TestCaseWithMemoryTransport.TEST_ROOT = root
 
1860
        
 
1861
        # make a fake bzr directory there to prevent any tests propagating
 
1862
        # up onto the source directory's real branch
 
1863
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
1864
 
 
1865
        # The same directory is used by all tests, and we're not specifically
 
1866
        # told when all tests are finished.  This will do.
 
1867
        atexit.register(_rmtree_temp_dir, root)
 
1868
 
 
1869
    def makeAndChdirToTestDir(self):
 
1870
        """Create a temporary directories for this one test.
 
1871
        
 
1872
        This must set self.test_home_dir and self.test_dir and chdir to
 
1873
        self.test_dir.
 
1874
        
 
1875
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
1876
        """
 
1877
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
1878
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
1879
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
1880
        
 
1881
    def make_branch(self, relpath, format=None):
 
1882
        """Create a branch on the transport at relpath."""
 
1883
        repo = self.make_repository(relpath, format=format)
 
1884
        return repo.bzrdir.create_branch()
 
1885
 
 
1886
    def make_bzrdir(self, relpath, format=None):
 
1887
        try:
 
1888
            # might be a relative or absolute path
 
1889
            maybe_a_url = self.get_url(relpath)
 
1890
            segments = maybe_a_url.rsplit('/', 1)
 
1891
            t = get_transport(maybe_a_url)
 
1892
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
1893
                t.ensure_base()
 
1894
            if format is None:
 
1895
                format = 'default'
 
1896
            if isinstance(format, basestring):
 
1897
                format = bzrdir.format_registry.make_bzrdir(format)
 
1898
            return format.initialize_on_transport(t)
 
1899
        except errors.UninitializableFormat:
 
1900
            raise TestSkipped("Format %s is not initializable." % format)
 
1901
 
 
1902
    def make_repository(self, relpath, shared=False, format=None):
 
1903
        """Create a repository on our default transport at relpath.
 
1904
        
 
1905
        Note that relpath must be a relative path, not a full url.
 
1906
        """
 
1907
        # FIXME: If you create a remoterepository this returns the underlying
 
1908
        # real format, which is incorrect.  Actually we should make sure that 
 
1909
        # RemoteBzrDir returns a RemoteRepository.
 
1910
        # maybe  mbp 20070410
 
1911
        made_control = self.make_bzrdir(relpath, format=format)
 
1912
        return made_control.create_repository(shared=shared)
 
1913
 
 
1914
    def make_branch_and_memory_tree(self, relpath, format=None):
 
1915
        """Create a branch on the default transport and a MemoryTree for it."""
 
1916
        b = self.make_branch(relpath, format=format)
 
1917
        return memorytree.MemoryTree.create_on_branch(b)
 
1918
 
 
1919
    def overrideEnvironmentForTesting(self):
 
1920
        os.environ['HOME'] = self.test_home_dir
 
1921
        os.environ['BZR_HOME'] = self.test_home_dir
 
1922
        
 
1923
    def setUp(self):
 
1924
        super(TestCaseWithMemoryTransport, self).setUp()
 
1925
        self._make_test_root()
 
1926
        _currentdir = os.getcwdu()
 
1927
        def _leaveDirectory():
 
1928
            os.chdir(_currentdir)
 
1929
        self.addCleanup(_leaveDirectory)
 
1930
        self.makeAndChdirToTestDir()
 
1931
        self.overrideEnvironmentForTesting()
 
1932
        self.__readonly_server = None
 
1933
        self.__server = None
 
1934
        self.reduceLockdirTimeout()
 
1935
 
 
1936
     
 
1937
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
1938
    """Derived class that runs a test within a temporary directory.
 
1939
 
 
1940
    This is useful for tests that need to create a branch, etc.
 
1941
 
 
1942
    The directory is created in a slightly complex way: for each
 
1943
    Python invocation, a new temporary top-level directory is created.
 
1944
    All test cases create their own directory within that.  If the
 
1945
    tests complete successfully, the directory is removed.
 
1946
 
 
1947
    :ivar test_base_dir: The path of the top-level directory for this 
 
1948
    test, which contains a home directory and a work directory.
 
1949
 
 
1950
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
1951
    which is used as $HOME for this test.
 
1952
 
 
1953
    :ivar test_dir: A directory under test_base_dir used as the current
 
1954
    directory when the test proper is run.
 
1955
    """
 
1956
 
 
1957
    OVERRIDE_PYTHON = 'python'
 
1958
 
 
1959
    def check_file_contents(self, filename, expect):
 
1960
        self.log("check contents of file %s" % filename)
 
1961
        contents = file(filename, 'r').read()
 
1962
        if contents != expect:
 
1963
            self.log("expected: %r" % expect)
 
1964
            self.log("actually: %r" % contents)
 
1965
            self.fail("contents of %s not as expected" % filename)
 
1966
 
 
1967
    def makeAndChdirToTestDir(self):
 
1968
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
1969
        
 
1970
        For TestCaseInTempDir we create a temporary directory based on the test
 
1971
        name and then create two subdirs - test and home under it.
 
1972
        """
 
1973
        # create a directory within the top level test directory
 
1974
        candidate_dir = tempfile.mkdtemp(dir=self.TEST_ROOT)
 
1975
        # now create test and home directories within this dir
 
1976
        self.test_base_dir = candidate_dir
 
1977
        self.test_home_dir = self.test_base_dir + '/home'
 
1978
        os.mkdir(self.test_home_dir)
 
1979
        self.test_dir = self.test_base_dir + '/work'
 
1980
        os.mkdir(self.test_dir)
 
1981
        os.chdir(self.test_dir)
 
1982
        # put name of test inside
 
1983
        f = file(self.test_base_dir + '/name', 'w')
 
1984
        try:
 
1985
            f.write(self.id())
 
1986
        finally:
 
1987
            f.close()
 
1988
        self.addCleanup(self.deleteTestDir)
 
1989
 
 
1990
    def deleteTestDir(self):
 
1991
        os.chdir(self.TEST_ROOT)
 
1992
        _rmtree_temp_dir(self.test_base_dir)
 
1993
 
 
1994
    def build_tree(self, shape, line_endings='binary', transport=None):
 
1995
        """Build a test tree according to a pattern.
 
1996
 
 
1997
        shape is a sequence of file specifications.  If the final
 
1998
        character is '/', a directory is created.
 
1999
 
 
2000
        This assumes that all the elements in the tree being built are new.
 
2001
 
 
2002
        This doesn't add anything to a branch.
 
2003
 
 
2004
        :param line_endings: Either 'binary' or 'native'
 
2005
            in binary mode, exact contents are written in native mode, the
 
2006
            line endings match the default platform endings.
 
2007
        :param transport: A transport to write to, for building trees on VFS's.
 
2008
            If the transport is readonly or None, "." is opened automatically.
 
2009
        :return: None
 
2010
        """
 
2011
        # It's OK to just create them using forward slashes on windows.
 
2012
        if transport is None or transport.is_readonly():
 
2013
            transport = get_transport(".")
 
2014
        for name in shape:
 
2015
            self.assert_(isinstance(name, basestring))
 
2016
            if name[-1] == '/':
 
2017
                transport.mkdir(urlutils.escape(name[:-1]))
 
2018
            else:
 
2019
                if line_endings == 'binary':
 
2020
                    end = '\n'
 
2021
                elif line_endings == 'native':
 
2022
                    end = os.linesep
 
2023
                else:
 
2024
                    raise errors.BzrError(
 
2025
                        'Invalid line ending request %r' % line_endings)
 
2026
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
2027
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2028
 
 
2029
    def build_tree_contents(self, shape):
 
2030
        build_tree_contents(shape)
 
2031
 
 
2032
    def assertFileEqual(self, content, path):
 
2033
        """Fail if path does not contain 'content'."""
 
2034
        self.failUnlessExists(path)
 
2035
        f = file(path, 'rb')
 
2036
        try:
 
2037
            s = f.read()
 
2038
        finally:
 
2039
            f.close()
 
2040
        self.assertEqualDiff(content, s)
 
2041
 
 
2042
    def failUnlessExists(self, path):
 
2043
        """Fail unless path or paths, which may be abs or relative, exist."""
 
2044
        if not isinstance(path, basestring):
 
2045
            for p in path:
 
2046
                self.failUnlessExists(p)
 
2047
        else:
 
2048
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
2049
 
 
2050
    def failIfExists(self, path):
 
2051
        """Fail if path or paths, which may be abs or relative, exist."""
 
2052
        if not isinstance(path, basestring):
 
2053
            for p in path:
 
2054
                self.failIfExists(p)
 
2055
        else:
 
2056
            self.failIf(osutils.lexists(path),path+" exists")
 
2057
 
 
2058
    def assertInWorkingTree(self,path,root_path='.',tree=None):
 
2059
        """Assert whether path or paths are in the WorkingTree"""
 
2060
        if tree is None:
 
2061
            tree = workingtree.WorkingTree.open(root_path)
 
2062
        if not isinstance(path, basestring):
 
2063
            for p in path:
 
2064
                self.assertInWorkingTree(p,tree=tree)
 
2065
        else:
 
2066
            self.assertIsNot(tree.path2id(path), None,
 
2067
                path+' not in working tree.')
 
2068
 
 
2069
    def assertNotInWorkingTree(self,path,root_path='.',tree=None):
 
2070
        """Assert whether path or paths are not in the WorkingTree"""
 
2071
        if tree is None:
 
2072
            tree = workingtree.WorkingTree.open(root_path)
 
2073
        if not isinstance(path, basestring):
 
2074
            for p in path:
 
2075
                self.assertNotInWorkingTree(p,tree=tree)
 
2076
        else:
 
2077
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2078
 
 
2079
 
 
2080
class TestCaseWithTransport(TestCaseInTempDir):
 
2081
    """A test case that provides get_url and get_readonly_url facilities.
 
2082
 
 
2083
    These back onto two transport servers, one for readonly access and one for
 
2084
    read write access.
 
2085
 
 
2086
    If no explicit class is provided for readonly access, a
 
2087
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2088
    based read write transports.
 
2089
 
 
2090
    If an explicit class is provided for readonly access, that server and the 
 
2091
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2092
    """
 
2093
 
 
2094
    def get_vfs_only_server(self):
 
2095
        """See TestCaseWithMemoryTransport.
 
2096
 
 
2097
        This is useful for some tests with specific servers that need
 
2098
        diagnostics.
 
2099
        """
 
2100
        if self.__vfs_server is None:
 
2101
            self.__vfs_server = self.vfs_transport_factory()
 
2102
            self.__vfs_server.setUp()
 
2103
            self.addCleanup(self.__vfs_server.tearDown)
 
2104
        return self.__vfs_server
 
2105
 
 
2106
    def make_branch_and_tree(self, relpath, format=None):
 
2107
        """Create a branch on the transport and a tree locally.
 
2108
 
 
2109
        If the transport is not a LocalTransport, the Tree can't be created on
 
2110
        the transport.  In that case if the vfs_transport_factory is
 
2111
        LocalURLServer the working tree is created in the local
 
2112
        directory backing the transport, and the returned tree's branch and
 
2113
        repository will also be accessed locally. Otherwise a lightweight
 
2114
        checkout is created and returned.
 
2115
 
 
2116
        :param format: The BzrDirFormat.
 
2117
        :returns: the WorkingTree.
 
2118
        """
 
2119
        # TODO: always use the local disk path for the working tree,
 
2120
        # this obviously requires a format that supports branch references
 
2121
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2122
        # RBC 20060208
 
2123
        b = self.make_branch(relpath, format=format)
 
2124
        try:
 
2125
            return b.bzrdir.create_workingtree()
 
2126
        except errors.NotLocalUrl:
 
2127
            # We can only make working trees locally at the moment.  If the
 
2128
            # transport can't support them, then we keep the non-disk-backed
 
2129
            # branch and create a local checkout.
 
2130
            if self.vfs_transport_factory is LocalURLServer:
 
2131
                # the branch is colocated on disk, we cannot create a checkout.
 
2132
                # hopefully callers will expect this.
 
2133
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2134
                return local_controldir.create_workingtree()
 
2135
            else:
 
2136
                return b.create_checkout(relpath, lightweight=True)
 
2137
 
 
2138
    def assertIsDirectory(self, relpath, transport):
 
2139
        """Assert that relpath within transport is a directory.
 
2140
 
 
2141
        This may not be possible on all transports; in that case it propagates
 
2142
        a TransportNotPossible.
 
2143
        """
 
2144
        try:
 
2145
            mode = transport.stat(relpath).st_mode
 
2146
        except errors.NoSuchFile:
 
2147
            self.fail("path %s is not a directory; no such file"
 
2148
                      % (relpath))
 
2149
        if not stat.S_ISDIR(mode):
 
2150
            self.fail("path %s is not a directory; has mode %#o"
 
2151
                      % (relpath, mode))
 
2152
 
 
2153
    def assertTreesEqual(self, left, right):
 
2154
        """Check that left and right have the same content and properties."""
 
2155
        # we use a tree delta to check for equality of the content, and we
 
2156
        # manually check for equality of other things such as the parents list.
 
2157
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2158
        differences = left.changes_from(right)
 
2159
        self.assertFalse(differences.has_changed(),
 
2160
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2161
 
 
2162
    def setUp(self):
 
2163
        super(TestCaseWithTransport, self).setUp()
 
2164
        self.__vfs_server = None
 
2165
 
 
2166
 
 
2167
class ChrootedTestCase(TestCaseWithTransport):
 
2168
    """A support class that provides readonly urls outside the local namespace.
 
2169
 
 
2170
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2171
    is then we are chrooted already, if it is not then an HttpServer is used
 
2172
    for readonly urls.
 
2173
 
 
2174
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2175
                       be used without needed to redo it when a different 
 
2176
                       subclass is in use ?
 
2177
    """
 
2178
 
 
2179
    def setUp(self):
 
2180
        super(ChrootedTestCase, self).setUp()
 
2181
        if not self.vfs_transport_factory == MemoryServer:
 
2182
            self.transport_readonly_server = HttpServer
 
2183
 
 
2184
 
 
2185
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
 
2186
                       random_order=False):
 
2187
    """Create a test suite by filtering another one.
 
2188
    
 
2189
    :param suite:           the source suite
 
2190
    :param pattern:         pattern that names must match
 
2191
    :param exclude_pattern: pattern that names must not match, if any
 
2192
    :param random_order:    if True, tests in the new suite will be put in
 
2193
                            random order
 
2194
    :returns: the newly created suite
 
2195
    """ 
 
2196
    return sort_suite_by_re(suite, pattern, exclude_pattern,
 
2197
        random_order, False)
 
2198
 
 
2199
 
 
2200
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
 
2201
                     random_order=False, append_rest=True):
 
2202
    """Create a test suite by sorting another one.
 
2203
    
 
2204
    :param suite:           the source suite
 
2205
    :param pattern:         pattern that names must match in order to go
 
2206
                            first in the new suite
 
2207
    :param exclude_pattern: pattern that names must not match, if any
 
2208
    :param random_order:    if True, tests in the new suite will be put in
 
2209
                            random order
 
2210
    :param append_rest:     if False, pattern is a strict filter and not
 
2211
                            just an ordering directive
 
2212
    :returns: the newly created suite
 
2213
    """ 
 
2214
    first = []
 
2215
    second = []
 
2216
    filter_re = re.compile(pattern)
 
2217
    if exclude_pattern is not None:
 
2218
        exclude_re = re.compile(exclude_pattern)
 
2219
    for test in iter_suite_tests(suite):
 
2220
        test_id = test.id()
 
2221
        if exclude_pattern is None or not exclude_re.search(test_id):
 
2222
            if filter_re.search(test_id):
 
2223
                first.append(test)
 
2224
            elif append_rest:
 
2225
                second.append(test)
 
2226
    if random_order:
 
2227
        random.shuffle(first)
 
2228
        random.shuffle(second)
 
2229
    return TestUtil.TestSuite(first + second)
 
2230
 
 
2231
 
 
2232
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2233
              stop_on_failure=False,
 
2234
              transport=None, lsprof_timed=None, bench_history=None,
 
2235
              matching_tests_first=None,
 
2236
              list_only=False,
 
2237
              random_seed=None,
 
2238
              exclude_pattern=None,
 
2239
              strict=False,
 
2240
              ):
 
2241
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2242
    if verbose:
 
2243
        verbosity = 2
 
2244
    else:
 
2245
        verbosity = 1
 
2246
    runner = TextTestRunner(stream=sys.stdout,
 
2247
                            descriptions=0,
 
2248
                            verbosity=verbosity,
 
2249
                            bench_history=bench_history,
 
2250
                            list_only=list_only,
 
2251
                            )
 
2252
    runner.stop_on_failure=stop_on_failure
 
2253
    # Initialise the random number generator and display the seed used.
 
2254
    # We convert the seed to a long to make it reuseable across invocations.
 
2255
    random_order = False
 
2256
    if random_seed is not None:
 
2257
        random_order = True
 
2258
        if random_seed == "now":
 
2259
            random_seed = long(time.time())
 
2260
        else:
 
2261
            # Convert the seed to a long if we can
 
2262
            try:
 
2263
                random_seed = long(random_seed)
 
2264
            except:
 
2265
                pass
 
2266
        runner.stream.writeln("Randomizing test order using seed %s\n" %
 
2267
            (random_seed))
 
2268
        random.seed(random_seed)
 
2269
    # Customise the list of tests if requested
 
2270
    if pattern != '.*' or exclude_pattern is not None or random_order:
 
2271
        if matching_tests_first:
 
2272
            suite = sort_suite_by_re(suite, pattern, exclude_pattern,
 
2273
                random_order)
 
2274
        else:
 
2275
            suite = filter_suite_by_re(suite, pattern, exclude_pattern,
 
2276
                random_order)
 
2277
    result = runner.run(suite)
 
2278
 
 
2279
    if strict:
 
2280
        return result.wasStrictlySuccessful()
 
2281
 
 
2282
    return result.wasSuccessful()
 
2283
 
 
2284
 
 
2285
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
2286
             transport=None,
 
2287
             test_suite_factory=None,
 
2288
             lsprof_timed=None,
 
2289
             bench_history=None,
 
2290
             matching_tests_first=None,
 
2291
             list_only=False,
 
2292
             random_seed=None,
 
2293
             exclude_pattern=None,
 
2294
             strict=False,
 
2295
             ):
 
2296
    """Run the whole test suite under the enhanced runner"""
 
2297
    # XXX: Very ugly way to do this...
 
2298
    # Disable warning about old formats because we don't want it to disturb
 
2299
    # any blackbox tests.
 
2300
    from bzrlib import repository
 
2301
    repository._deprecation_warning_done = True
 
2302
 
 
2303
    global default_transport
 
2304
    if transport is None:
 
2305
        transport = default_transport
 
2306
    old_transport = default_transport
 
2307
    default_transport = transport
 
2308
    try:
 
2309
        if test_suite_factory is None:
 
2310
            suite = test_suite()
 
2311
        else:
 
2312
            suite = test_suite_factory()
 
2313
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
2314
                     stop_on_failure=stop_on_failure,
 
2315
                     transport=transport,
 
2316
                     lsprof_timed=lsprof_timed,
 
2317
                     bench_history=bench_history,
 
2318
                     matching_tests_first=matching_tests_first,
 
2319
                     list_only=list_only,
 
2320
                     random_seed=random_seed,
 
2321
                     exclude_pattern=exclude_pattern,
 
2322
                     strict=strict)
 
2323
    finally:
 
2324
        default_transport = old_transport
 
2325
 
 
2326
 
 
2327
def test_suite():
 
2328
    """Build and return TestSuite for the whole of bzrlib.
 
2329
    
 
2330
    This function can be replaced if you need to change the default test
 
2331
    suite on a global basis, but it is not encouraged.
 
2332
    """
 
2333
    testmod_names = [
 
2334
                   'bzrlib.util.tests.test_bencode',
 
2335
                   'bzrlib.tests.test__dirstate_helpers',
 
2336
                   'bzrlib.tests.test_ancestry',
 
2337
                   'bzrlib.tests.test_annotate',
 
2338
                   'bzrlib.tests.test_api',
 
2339
                   'bzrlib.tests.test_atomicfile',
 
2340
                   'bzrlib.tests.test_bad_files',
 
2341
                   'bzrlib.tests.test_branch',
 
2342
                   'bzrlib.tests.test_branchbuilder',
 
2343
                   'bzrlib.tests.test_bugtracker',
 
2344
                   'bzrlib.tests.test_bundle',
 
2345
                   'bzrlib.tests.test_bzrdir',
 
2346
                   'bzrlib.tests.test_cache_utf8',
 
2347
                   'bzrlib.tests.test_commands',
 
2348
                   'bzrlib.tests.test_commit',
 
2349
                   'bzrlib.tests.test_commit_merge',
 
2350
                   'bzrlib.tests.test_config',
 
2351
                   'bzrlib.tests.test_conflicts',
 
2352
                   'bzrlib.tests.test_counted_lock',
 
2353
                   'bzrlib.tests.test_decorators',
 
2354
                   'bzrlib.tests.test_delta',
 
2355
                   'bzrlib.tests.test_deprecated_graph',
 
2356
                   'bzrlib.tests.test_diff',
 
2357
                   'bzrlib.tests.test_dirstate',
 
2358
                   'bzrlib.tests.test_email_message',
 
2359
                   'bzrlib.tests.test_errors',
 
2360
                   'bzrlib.tests.test_escaped_store',
 
2361
                   'bzrlib.tests.test_extract',
 
2362
                   'bzrlib.tests.test_fetch',
 
2363
                   'bzrlib.tests.test_ftp_transport',
 
2364
                   'bzrlib.tests.test_generate_docs',
 
2365
                   'bzrlib.tests.test_generate_ids',
 
2366
                   'bzrlib.tests.test_globbing',
 
2367
                   'bzrlib.tests.test_gpg',
 
2368
                   'bzrlib.tests.test_graph',
 
2369
                   'bzrlib.tests.test_hashcache',
 
2370
                   'bzrlib.tests.test_help',
 
2371
                   'bzrlib.tests.test_hooks',
 
2372
                   'bzrlib.tests.test_http',
 
2373
                   'bzrlib.tests.test_http_response',
 
2374
                   'bzrlib.tests.test_https_ca_bundle',
 
2375
                   'bzrlib.tests.test_identitymap',
 
2376
                   'bzrlib.tests.test_ignores',
 
2377
                   'bzrlib.tests.test_index',
 
2378
                   'bzrlib.tests.test_info',
 
2379
                   'bzrlib.tests.test_inv',
 
2380
                   'bzrlib.tests.test_knit',
 
2381
                   'bzrlib.tests.test_lazy_import',
 
2382
                   'bzrlib.tests.test_lazy_regex',
 
2383
                   'bzrlib.tests.test_lockdir',
 
2384
                   'bzrlib.tests.test_lockable_files',
 
2385
                   'bzrlib.tests.test_log',
 
2386
                   'bzrlib.tests.test_lsprof',
 
2387
                   'bzrlib.tests.test_mail_client',
 
2388
                   'bzrlib.tests.test_memorytree',
 
2389
                   'bzrlib.tests.test_merge',
 
2390
                   'bzrlib.tests.test_merge3',
 
2391
                   'bzrlib.tests.test_merge_core',
 
2392
                   'bzrlib.tests.test_merge_directive',
 
2393
                   'bzrlib.tests.test_missing',
 
2394
                   'bzrlib.tests.test_msgeditor',
 
2395
                   'bzrlib.tests.test_multiparent',
 
2396
                   'bzrlib.tests.test_nonascii',
 
2397
                   'bzrlib.tests.test_options',
 
2398
                   'bzrlib.tests.test_osutils',
 
2399
                   'bzrlib.tests.test_osutils_encodings',
 
2400
                   'bzrlib.tests.test_pack',
 
2401
                   'bzrlib.tests.test_patch',
 
2402
                   'bzrlib.tests.test_patches',
 
2403
                   'bzrlib.tests.test_permissions',
 
2404
                   'bzrlib.tests.test_plugins',
 
2405
                   'bzrlib.tests.test_progress',
 
2406
                   'bzrlib.tests.test_reconcile',
 
2407
                   'bzrlib.tests.test_registry',
 
2408
                   'bzrlib.tests.test_remote',
 
2409
                   'bzrlib.tests.test_repository',
 
2410
                   'bzrlib.tests.test_revert',
 
2411
                   'bzrlib.tests.test_revision',
 
2412
                   'bzrlib.tests.test_revisionnamespaces',
 
2413
                   'bzrlib.tests.test_revisiontree',
 
2414
                   'bzrlib.tests.test_rio',
 
2415
                   'bzrlib.tests.test_sampler',
 
2416
                   'bzrlib.tests.test_selftest',
 
2417
                   'bzrlib.tests.test_setup',
 
2418
                   'bzrlib.tests.test_sftp_transport',
 
2419
                   'bzrlib.tests.test_smart',
 
2420
                   'bzrlib.tests.test_smart_add',
 
2421
                   'bzrlib.tests.test_smart_transport',
 
2422
                   'bzrlib.tests.test_smtp_connection',
 
2423
                   'bzrlib.tests.test_source',
 
2424
                   'bzrlib.tests.test_ssh_transport',
 
2425
                   'bzrlib.tests.test_status',
 
2426
                   'bzrlib.tests.test_store',
 
2427
                   'bzrlib.tests.test_strace',
 
2428
                   'bzrlib.tests.test_subsume',
 
2429
                   'bzrlib.tests.test_symbol_versioning',
 
2430
                   'bzrlib.tests.test_tag',
 
2431
                   'bzrlib.tests.test_testament',
 
2432
                   'bzrlib.tests.test_textfile',
 
2433
                   'bzrlib.tests.test_textmerge',
 
2434
                   'bzrlib.tests.test_timestamp',
 
2435
                   'bzrlib.tests.test_trace',
 
2436
                   'bzrlib.tests.test_transactions',
 
2437
                   'bzrlib.tests.test_transform',
 
2438
                   'bzrlib.tests.test_transport',
 
2439
                   'bzrlib.tests.test_tree',
 
2440
                   'bzrlib.tests.test_treebuilder',
 
2441
                   'bzrlib.tests.test_tsort',
 
2442
                   'bzrlib.tests.test_tuned_gzip',
 
2443
                   'bzrlib.tests.test_ui',
 
2444
                   'bzrlib.tests.test_upgrade',
 
2445
                   'bzrlib.tests.test_urlutils',
 
2446
                   'bzrlib.tests.test_versionedfile',
 
2447
                   'bzrlib.tests.test_version',
 
2448
                   'bzrlib.tests.test_version_info',
 
2449
                   'bzrlib.tests.test_weave',
 
2450
                   'bzrlib.tests.test_whitebox',
 
2451
                   'bzrlib.tests.test_win32utils',
 
2452
                   'bzrlib.tests.test_workingtree',
 
2453
                   'bzrlib.tests.test_workingtree_4',
 
2454
                   'bzrlib.tests.test_wsgi',
 
2455
                   'bzrlib.tests.test_xml',
 
2456
                   ]
 
2457
    test_transport_implementations = [
 
2458
        'bzrlib.tests.test_transport_implementations',
 
2459
        'bzrlib.tests.test_read_bundle',
 
2460
        ]
 
2461
    suite = TestUtil.TestSuite()
 
2462
    loader = TestUtil.TestLoader()
 
2463
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
2464
    from bzrlib.tests.test_transport_implementations import TransportTestProviderAdapter
 
2465
    adapter = TransportTestProviderAdapter()
 
2466
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
2467
    for package in packages_to_test():
 
2468
        suite.addTest(package.test_suite())
 
2469
    for m in MODULES_TO_TEST:
 
2470
        suite.addTest(loader.loadTestsFromModule(m))
 
2471
    for m in MODULES_TO_DOCTEST:
 
2472
        try:
 
2473
            suite.addTest(doctest.DocTestSuite(m))
 
2474
        except ValueError, e:
 
2475
            print '**failed to get doctest for: %s\n%s' %(m,e)
 
2476
            raise
 
2477
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
2478
        if getattr(plugin, 'test_suite', None) is not None:
 
2479
            default_encoding = sys.getdefaultencoding()
 
2480
            try:
 
2481
                plugin_suite = plugin.test_suite()
 
2482
            except ImportError, e:
 
2483
                bzrlib.trace.warning(
 
2484
                    'Unable to test plugin "%s": %s', name, e)
 
2485
            else:
 
2486
                suite.addTest(plugin_suite)
 
2487
            if default_encoding != sys.getdefaultencoding():
 
2488
                bzrlib.trace.warning(
 
2489
                    'Plugin "%s" tried to reset default encoding to: %s', name,
 
2490
                    sys.getdefaultencoding())
 
2491
                reload(sys)
 
2492
                sys.setdefaultencoding(default_encoding)
 
2493
    return suite
 
2494
 
 
2495
 
 
2496
def adapt_modules(mods_list, adapter, loader, suite):
 
2497
    """Adapt the modules in mods_list using adapter and add to suite."""
 
2498
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
 
2499
        suite.addTests(adapter.adapt(test))
 
2500
 
 
2501
 
 
2502
def _rmtree_temp_dir(dirname):
 
2503
    # If LANG=C we probably have created some bogus paths
 
2504
    # which rmtree(unicode) will fail to delete
 
2505
    # so make sure we are using rmtree(str) to delete everything
 
2506
    # except on win32, where rmtree(str) will fail
 
2507
    # since it doesn't have the property of byte-stream paths
 
2508
    # (they are either ascii or mbcs)
 
2509
    if sys.platform == 'win32':
 
2510
        # make sure we are using the unicode win32 api
 
2511
        dirname = unicode(dirname)
 
2512
    else:
 
2513
        dirname = dirname.encode(sys.getfilesystemencoding())
 
2514
    try:
 
2515
        osutils.rmtree(dirname)
 
2516
    except OSError, e:
 
2517
        if sys.platform == 'win32' and e.errno == errno.EACCES:
 
2518
            print >>sys.stderr, ('Permission denied: '
 
2519
                                 'unable to remove testing dir '
 
2520
                                 '%s' % os.path.basename(dirname))
 
2521
        else:
 
2522
            raise
 
2523
 
 
2524
 
 
2525
class Feature(object):
 
2526
    """An operating system Feature."""
 
2527
 
 
2528
    def __init__(self):
 
2529
        self._available = None
 
2530
 
 
2531
    def available(self):
 
2532
        """Is the feature available?
 
2533
 
 
2534
        :return: True if the feature is available.
 
2535
        """
 
2536
        if self._available is None:
 
2537
            self._available = self._probe()
 
2538
        return self._available
 
2539
 
 
2540
    def _probe(self):
 
2541
        """Implement this method in concrete features.
 
2542
 
 
2543
        :return: True if the feature is available.
 
2544
        """
 
2545
        raise NotImplementedError
 
2546
 
 
2547
    def __str__(self):
 
2548
        if getattr(self, 'feature_name', None):
 
2549
            return self.feature_name()
 
2550
        return self.__class__.__name__
 
2551
 
 
2552
 
 
2553
class TestScenarioApplier(object):
 
2554
    """A tool to apply scenarios to tests."""
 
2555
 
 
2556
    def adapt(self, test):
 
2557
        """Return a TestSuite containing a copy of test for each scenario."""
 
2558
        result = unittest.TestSuite()
 
2559
        for scenario in self.scenarios:
 
2560
            result.addTest(self.adapt_test_to_scenario(test, scenario))
 
2561
        return result
 
2562
 
 
2563
    def adapt_test_to_scenario(self, test, scenario):
 
2564
        """Copy test and apply scenario to it.
 
2565
 
 
2566
        :param test: A test to adapt.
 
2567
        :param scenario: A tuple describing the scenarion.
 
2568
            The first element of the tuple is the new test id.
 
2569
            The second element is a dict containing attributes to set on the
 
2570
            test.
 
2571
        :return: The adapted test.
 
2572
        """
 
2573
        from copy import deepcopy
 
2574
        new_test = deepcopy(test)
 
2575
        for name, value in scenario[1].items():
 
2576
            setattr(new_test, name, value)
 
2577
        new_id = "%s(%s)" % (new_test.id(), scenario[0])
 
2578
        new_test.id = lambda: new_id
 
2579
        return new_test