/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Andrew Bennetts
  • Date: 2007-04-11 08:09:06 UTC
  • mto: This revision was merged to the branch mainline in revision 2410.
  • Revision ID: andrew.bennetts@canonical.com-20070411080906-4mgvcy6focqyhl0a
Build HACKING.htm from 'make docs'.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import codecs
 
30
from cStringIO import StringIO
 
31
import difflib
 
32
import doctest
 
33
import errno
 
34
import logging
 
35
import os
 
36
from pprint import pformat
 
37
import re
 
38
import shlex
 
39
import stat
 
40
from subprocess import Popen, PIPE
 
41
import sys
 
42
import tempfile
 
43
import unittest
 
44
import time
 
45
 
 
46
 
 
47
from bzrlib import (
 
48
    bzrdir,
 
49
    debug,
 
50
    errors,
 
51
    memorytree,
 
52
    osutils,
 
53
    progress,
 
54
    ui,
 
55
    urlutils,
 
56
    )
 
57
import bzrlib.branch
 
58
import bzrlib.commands
 
59
import bzrlib.timestamp
 
60
import bzrlib.export
 
61
import bzrlib.inventory
 
62
import bzrlib.iterablefile
 
63
import bzrlib.lockdir
 
64
try:
 
65
    import bzrlib.lsprof
 
66
except ImportError:
 
67
    # lsprof not available
 
68
    pass
 
69
from bzrlib.merge import merge_inner
 
70
import bzrlib.merge3
 
71
import bzrlib.osutils
 
72
import bzrlib.plugin
 
73
from bzrlib.revision import common_ancestor
 
74
import bzrlib.store
 
75
from bzrlib import symbol_versioning
 
76
import bzrlib.trace
 
77
from bzrlib.transport import get_transport
 
78
import bzrlib.transport
 
79
from bzrlib.transport.local import LocalURLServer
 
80
from bzrlib.transport.memory import MemoryServer
 
81
from bzrlib.transport.readonly import ReadonlyServer
 
82
from bzrlib.trace import mutter, note
 
83
from bzrlib.tests import TestUtil
 
84
from bzrlib.tests.HttpServer import HttpServer
 
85
from bzrlib.tests.TestUtil import (
 
86
                          TestSuite,
 
87
                          TestLoader,
 
88
                          )
 
89
from bzrlib.tests.treeshape import build_tree_contents
 
90
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
91
 
 
92
default_transport = LocalURLServer
 
93
 
 
94
MODULES_TO_TEST = []
 
95
MODULES_TO_DOCTEST = [
 
96
                      bzrlib.timestamp,
 
97
                      bzrlib.errors,
 
98
                      bzrlib.export,
 
99
                      bzrlib.inventory,
 
100
                      bzrlib.iterablefile,
 
101
                      bzrlib.lockdir,
 
102
                      bzrlib.merge3,
 
103
                      bzrlib.option,
 
104
                      bzrlib.store,
 
105
                      ]
 
106
 
 
107
NUMBERED_DIRS = False   # dirs kind for TestCaseInTempDir (numbered or named)
 
108
 
 
109
 
 
110
def packages_to_test():
 
111
    """Return a list of packages to test.
 
112
 
 
113
    The packages are not globally imported so that import failures are
 
114
    triggered when running selftest, not when importing the command.
 
115
    """
 
116
    import bzrlib.doc
 
117
    import bzrlib.tests.blackbox
 
118
    import bzrlib.tests.branch_implementations
 
119
    import bzrlib.tests.bzrdir_implementations
 
120
    import bzrlib.tests.interrepository_implementations
 
121
    import bzrlib.tests.interversionedfile_implementations
 
122
    import bzrlib.tests.intertree_implementations
 
123
    import bzrlib.tests.per_lock
 
124
    import bzrlib.tests.repository_implementations
 
125
    import bzrlib.tests.revisionstore_implementations
 
126
    import bzrlib.tests.tree_implementations
 
127
    import bzrlib.tests.workingtree_implementations
 
128
    return [
 
129
            bzrlib.doc,
 
130
            bzrlib.tests.blackbox,
 
131
            bzrlib.tests.branch_implementations,
 
132
            bzrlib.tests.bzrdir_implementations,
 
133
            bzrlib.tests.interrepository_implementations,
 
134
            bzrlib.tests.interversionedfile_implementations,
 
135
            bzrlib.tests.intertree_implementations,
 
136
            bzrlib.tests.per_lock,
 
137
            bzrlib.tests.repository_implementations,
 
138
            bzrlib.tests.revisionstore_implementations,
 
139
            bzrlib.tests.tree_implementations,
 
140
            bzrlib.tests.workingtree_implementations,
 
141
            ]
 
142
 
 
143
 
 
144
class ExtendedTestResult(unittest._TextTestResult):
 
145
    """Accepts, reports and accumulates the results of running tests.
 
146
 
 
147
    Compared to this unittest version this class adds support for profiling,
 
148
    benchmarking, stopping as soon as a test fails,  and skipping tests.
 
149
    There are further-specialized subclasses for different types of display.
 
150
    """
 
151
 
 
152
    stop_early = False
 
153
    
 
154
    def __init__(self, stream, descriptions, verbosity,
 
155
                 bench_history=None,
 
156
                 num_tests=None,
 
157
                 ):
 
158
        """Construct new TestResult.
 
159
 
 
160
        :param bench_history: Optionally, a writable file object to accumulate
 
161
            benchmark results.
 
162
        """
 
163
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
164
        if bench_history is not None:
 
165
            from bzrlib.version import _get_bzr_source_tree
 
166
            src_tree = _get_bzr_source_tree()
 
167
            if src_tree:
 
168
                try:
 
169
                    revision_id = src_tree.get_parent_ids()[0]
 
170
                except IndexError:
 
171
                    # XXX: if this is a brand new tree, do the same as if there
 
172
                    # is no branch.
 
173
                    revision_id = ''
 
174
            else:
 
175
                # XXX: If there's no branch, what should we do?
 
176
                revision_id = ''
 
177
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
178
        self._bench_history = bench_history
 
179
        self.ui = ui.ui_factory
 
180
        self.num_tests = num_tests
 
181
        self.error_count = 0
 
182
        self.failure_count = 0
 
183
        self.known_failure_count = 0
 
184
        self.skip_count = 0
 
185
        self.unsupported = {}
 
186
        self.count = 0
 
187
        self._overall_start_time = time.time()
 
188
    
 
189
    def extractBenchmarkTime(self, testCase):
 
190
        """Add a benchmark time for the current test case."""
 
191
        self._benchmarkTime = getattr(testCase, "_benchtime", None)
 
192
    
 
193
    def _elapsedTestTimeString(self):
 
194
        """Return a time string for the overall time the current test has taken."""
 
195
        return self._formatTime(time.time() - self._start_time)
 
196
 
 
197
    def _testTimeString(self):
 
198
        if self._benchmarkTime is not None:
 
199
            return "%s/%s" % (
 
200
                self._formatTime(self._benchmarkTime),
 
201
                self._elapsedTestTimeString())
 
202
        else:
 
203
            return "           %s" % self._elapsedTestTimeString()
 
204
 
 
205
    def _formatTime(self, seconds):
 
206
        """Format seconds as milliseconds with leading spaces."""
 
207
        # some benchmarks can take thousands of seconds to run, so we need 8
 
208
        # places
 
209
        return "%8dms" % (1000 * seconds)
 
210
 
 
211
    def _shortened_test_description(self, test):
 
212
        what = test.id()
 
213
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
214
        return what
 
215
 
 
216
    def startTest(self, test):
 
217
        unittest.TestResult.startTest(self, test)
 
218
        self.report_test_start(test)
 
219
        test.number = self.count
 
220
        self._recordTestStartTime()
 
221
 
 
222
    def _recordTestStartTime(self):
 
223
        """Record that a test has started."""
 
224
        self._start_time = time.time()
 
225
 
 
226
    def _cleanupLogFile(self, test):
 
227
        # We can only do this if we have one of our TestCases, not if
 
228
        # we have a doctest.
 
229
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
230
        if setKeepLogfile is not None:
 
231
            setKeepLogfile()
 
232
 
 
233
    def addError(self, test, err):
 
234
        self.extractBenchmarkTime(test)
 
235
        self._cleanupLogFile(test)
 
236
        if isinstance(err[1], TestSkipped):
 
237
            return self.addSkipped(test, err)
 
238
        elif isinstance(err[1], UnavailableFeature):
 
239
            return self.addNotSupported(test, err[1].args[0])
 
240
        unittest.TestResult.addError(self, test, err)
 
241
        self.error_count += 1
 
242
        self.report_error(test, err)
 
243
        if self.stop_early:
 
244
            self.stop()
 
245
 
 
246
    def addFailure(self, test, err):
 
247
        self._cleanupLogFile(test)
 
248
        self.extractBenchmarkTime(test)
 
249
        if isinstance(err[1], KnownFailure):
 
250
            return self.addKnownFailure(test, err)
 
251
        unittest.TestResult.addFailure(self, test, err)
 
252
        self.failure_count += 1
 
253
        self.report_failure(test, err)
 
254
        if self.stop_early:
 
255
            self.stop()
 
256
 
 
257
    def addKnownFailure(self, test, err):
 
258
        self.known_failure_count += 1
 
259
        self.report_known_failure(test, err)
 
260
 
 
261
    def addNotSupported(self, test, feature):
 
262
        self.unsupported.setdefault(str(feature), 0)
 
263
        self.unsupported[str(feature)] += 1
 
264
        self.report_unsupported(test, feature)
 
265
 
 
266
    def addSuccess(self, test):
 
267
        self.extractBenchmarkTime(test)
 
268
        if self._bench_history is not None:
 
269
            if self._benchmarkTime is not None:
 
270
                self._bench_history.write("%s %s\n" % (
 
271
                    self._formatTime(self._benchmarkTime),
 
272
                    test.id()))
 
273
        self.report_success(test)
 
274
        unittest.TestResult.addSuccess(self, test)
 
275
 
 
276
    def addSkipped(self, test, skip_excinfo):
 
277
        self.report_skip(test, skip_excinfo)
 
278
        # seems best to treat this as success from point-of-view of unittest
 
279
        # -- it actually does nothing so it barely matters :)
 
280
        try:
 
281
            test.tearDown()
 
282
        except KeyboardInterrupt:
 
283
            raise
 
284
        except:
 
285
            self.addError(test, test.__exc_info())
 
286
        else:
 
287
            unittest.TestResult.addSuccess(self, test)
 
288
 
 
289
    def printErrorList(self, flavour, errors):
 
290
        for test, err in errors:
 
291
            self.stream.writeln(self.separator1)
 
292
            self.stream.write("%s: " % flavour)
 
293
            if NUMBERED_DIRS:
 
294
                self.stream.write('#%d ' % test.number)
 
295
            self.stream.writeln(self.getDescription(test))
 
296
            if getattr(test, '_get_log', None) is not None:
 
297
                print >>self.stream
 
298
                print >>self.stream, \
 
299
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
300
                print >>self.stream, test._get_log()
 
301
                print >>self.stream, \
 
302
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
303
            self.stream.writeln(self.separator2)
 
304
            self.stream.writeln("%s" % err)
 
305
 
 
306
    def finished(self):
 
307
        pass
 
308
 
 
309
    def report_cleaning_up(self):
 
310
        pass
 
311
 
 
312
    def report_success(self, test):
 
313
        pass
 
314
 
 
315
 
 
316
class TextTestResult(ExtendedTestResult):
 
317
    """Displays progress and results of tests in text form"""
 
318
 
 
319
    def __init__(self, stream, descriptions, verbosity,
 
320
                 bench_history=None,
 
321
                 num_tests=None,
 
322
                 pb=None,
 
323
                 ):
 
324
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
325
            bench_history, num_tests)
 
326
        if pb is None:
 
327
            self.pb = self.ui.nested_progress_bar()
 
328
            self._supplied_pb = False
 
329
        else:
 
330
            self.pb = pb
 
331
            self._supplied_pb = True
 
332
        self.pb.show_pct = False
 
333
        self.pb.show_spinner = False
 
334
        self.pb.show_eta = False,
 
335
        self.pb.show_count = False
 
336
        self.pb.show_bar = False
 
337
 
 
338
    def report_starting(self):
 
339
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
340
 
 
341
    def _progress_prefix_text(self):
 
342
        a = '[%d' % self.count
 
343
        if self.num_tests is not None:
 
344
            a +='/%d' % self.num_tests
 
345
        a += ' in %ds' % (time.time() - self._overall_start_time)
 
346
        if self.error_count:
 
347
            a += ', %d errors' % self.error_count
 
348
        if self.failure_count:
 
349
            a += ', %d failed' % self.failure_count
 
350
        if self.known_failure_count:
 
351
            a += ', %d known failures' % self.known_failure_count
 
352
        if self.skip_count:
 
353
            a += ', %d skipped' % self.skip_count
 
354
        if self.unsupported:
 
355
            a += ', %d missing features' % len(self.unsupported)
 
356
        a += ']'
 
357
        return a
 
358
 
 
359
    def report_test_start(self, test):
 
360
        self.count += 1
 
361
        self.pb.update(
 
362
                self._progress_prefix_text()
 
363
                + ' ' 
 
364
                + self._shortened_test_description(test))
 
365
 
 
366
    def _test_description(self, test):
 
367
        if NUMBERED_DIRS:
 
368
            return '#%d %s' % (self.count,
 
369
                               self._shortened_test_description(test))
 
370
        else:
 
371
            return self._shortened_test_description(test)
 
372
 
 
373
    def report_error(self, test, err):
 
374
        self.pb.note('ERROR: %s\n    %s\n', 
 
375
            self._test_description(test),
 
376
            err[1],
 
377
            )
 
378
 
 
379
    def report_failure(self, test, err):
 
380
        self.pb.note('FAIL: %s\n    %s\n', 
 
381
            self._test_description(test),
 
382
            err[1],
 
383
            )
 
384
 
 
385
    def report_known_failure(self, test, err):
 
386
        self.pb.note('XFAIL: %s\n%s\n',
 
387
            self._test_description(test), err[1])
 
388
 
 
389
    def report_skip(self, test, skip_excinfo):
 
390
        self.skip_count += 1
 
391
        if False:
 
392
            # at the moment these are mostly not things we can fix
 
393
            # and so they just produce stipple; use the verbose reporter
 
394
            # to see them.
 
395
            if False:
 
396
                # show test and reason for skip
 
397
                self.pb.note('SKIP: %s\n    %s\n', 
 
398
                    self._shortened_test_description(test),
 
399
                    skip_excinfo[1])
 
400
            else:
 
401
                # since the class name was left behind in the still-visible
 
402
                # progress bar...
 
403
                self.pb.note('SKIP: %s', skip_excinfo[1])
 
404
 
 
405
    def report_unsupported(self, test, feature):
 
406
        """test cannot be run because feature is missing."""
 
407
                  
 
408
    def report_cleaning_up(self):
 
409
        self.pb.update('cleaning up...')
 
410
 
 
411
    def finished(self):
 
412
        if not self._supplied_pb:
 
413
            self.pb.finished()
 
414
 
 
415
 
 
416
class VerboseTestResult(ExtendedTestResult):
 
417
    """Produce long output, with one line per test run plus times"""
 
418
 
 
419
    def _ellipsize_to_right(self, a_string, final_width):
 
420
        """Truncate and pad a string, keeping the right hand side"""
 
421
        if len(a_string) > final_width:
 
422
            result = '...' + a_string[3-final_width:]
 
423
        else:
 
424
            result = a_string
 
425
        return result.ljust(final_width)
 
426
 
 
427
    def report_starting(self):
 
428
        self.stream.write('running %d tests...\n' % self.num_tests)
 
429
 
 
430
    def report_test_start(self, test):
 
431
        self.count += 1
 
432
        name = self._shortened_test_description(test)
 
433
        # width needs space for 6 char status, plus 1 for slash, plus 2 10-char
 
434
        # numbers, plus a trailing blank
 
435
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
436
        if NUMBERED_DIRS:
 
437
            self.stream.write('%5d ' % self.count)
 
438
            self.stream.write(self._ellipsize_to_right(name,
 
439
                                osutils.terminal_width()-36))
 
440
        else:
 
441
            self.stream.write(self._ellipsize_to_right(name,
 
442
                                osutils.terminal_width()-30))
 
443
        self.stream.flush()
 
444
 
 
445
    def _error_summary(self, err):
 
446
        indent = ' ' * 4
 
447
        if NUMBERED_DIRS:
 
448
            indent += ' ' * 6
 
449
        return '%s%s' % (indent, err[1])
 
450
 
 
451
    def report_error(self, test, err):
 
452
        self.stream.writeln('ERROR %s\n%s'
 
453
                % (self._testTimeString(),
 
454
                   self._error_summary(err)))
 
455
 
 
456
    def report_failure(self, test, err):
 
457
        self.stream.writeln(' FAIL %s\n%s'
 
458
                % (self._testTimeString(),
 
459
                   self._error_summary(err)))
 
460
 
 
461
    def report_known_failure(self, test, err):
 
462
        self.stream.writeln('XFAIL %s\n%s'
 
463
                % (self._testTimeString(),
 
464
                   self._error_summary(err)))
 
465
 
 
466
    def report_success(self, test):
 
467
        self.stream.writeln('   OK %s' % self._testTimeString())
 
468
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
469
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
470
            stats.pprint(file=self.stream)
 
471
        # flush the stream so that we get smooth output. This verbose mode is
 
472
        # used to show the output in PQM.
 
473
        self.stream.flush()
 
474
 
 
475
    def report_skip(self, test, skip_excinfo):
 
476
        self.skip_count += 1
 
477
        self.stream.writeln(' SKIP %s\n%s'
 
478
                % (self._testTimeString(),
 
479
                   self._error_summary(skip_excinfo)))
 
480
 
 
481
    def report_unsupported(self, test, feature):
 
482
        """test cannot be run because feature is missing."""
 
483
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
484
                %(self._testTimeString(), feature))
 
485
                  
 
486
 
 
487
 
 
488
class TextTestRunner(object):
 
489
    stop_on_failure = False
 
490
 
 
491
    def __init__(self,
 
492
                 stream=sys.stderr,
 
493
                 descriptions=0,
 
494
                 verbosity=1,
 
495
                 keep_output=False,
 
496
                 bench_history=None):
 
497
        self.stream = unittest._WritelnDecorator(stream)
 
498
        self.descriptions = descriptions
 
499
        self.verbosity = verbosity
 
500
        self.keep_output = keep_output
 
501
        self._bench_history = bench_history
 
502
 
 
503
    def run(self, test):
 
504
        "Run the given test case or test suite."
 
505
        startTime = time.time()
 
506
        if self.verbosity == 1:
 
507
            result_class = TextTestResult
 
508
        elif self.verbosity >= 2:
 
509
            result_class = VerboseTestResult
 
510
        result = result_class(self.stream,
 
511
                              self.descriptions,
 
512
                              self.verbosity,
 
513
                              bench_history=self._bench_history,
 
514
                              num_tests=test.countTestCases(),
 
515
                              )
 
516
        result.stop_early = self.stop_on_failure
 
517
        result.report_starting()
 
518
        test.run(result)
 
519
        stopTime = time.time()
 
520
        timeTaken = stopTime - startTime
 
521
        result.printErrors()
 
522
        self.stream.writeln(result.separator2)
 
523
        run = result.testsRun
 
524
        self.stream.writeln("Ran %d test%s in %.3fs" %
 
525
                            (run, run != 1 and "s" or "", timeTaken))
 
526
        self.stream.writeln()
 
527
        if not result.wasSuccessful():
 
528
            self.stream.write("FAILED (")
 
529
            failed, errored = map(len, (result.failures, result.errors))
 
530
            if failed:
 
531
                self.stream.write("failures=%d" % failed)
 
532
            if errored:
 
533
                if failed: self.stream.write(", ")
 
534
                self.stream.write("errors=%d" % errored)
 
535
            if result.known_failure_count:
 
536
                if failed or errored: self.stream.write(", ")
 
537
                self.stream.write("known_failure_count=%d" %
 
538
                    result.known_failure_count)
 
539
            self.stream.writeln(")")
 
540
        else:
 
541
            if result.known_failure_count:
 
542
                self.stream.writeln("OK (known_failures=%d)" %
 
543
                    result.known_failure_count)
 
544
            else:
 
545
                self.stream.writeln("OK")
 
546
        if result.skip_count > 0:
 
547
            skipped = result.skip_count
 
548
            self.stream.writeln('%d test%s skipped' %
 
549
                                (skipped, skipped != 1 and "s" or ""))
 
550
        if result.unsupported:
 
551
            for feature, count in sorted(result.unsupported.items()):
 
552
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
553
                    (feature, count))
 
554
        result.report_cleaning_up()
 
555
        # This is still a little bogus, 
 
556
        # but only a little. Folk not using our testrunner will
 
557
        # have to delete their temp directories themselves.
 
558
        test_root = TestCaseWithMemoryTransport.TEST_ROOT
 
559
        if result.wasSuccessful() or not self.keep_output:
 
560
            if test_root is not None:
 
561
                # If LANG=C we probably have created some bogus paths
 
562
                # which rmtree(unicode) will fail to delete
 
563
                # so make sure we are using rmtree(str) to delete everything
 
564
                # except on win32, where rmtree(str) will fail
 
565
                # since it doesn't have the property of byte-stream paths
 
566
                # (they are either ascii or mbcs)
 
567
                if sys.platform == 'win32':
 
568
                    # make sure we are using the unicode win32 api
 
569
                    test_root = unicode(test_root)
 
570
                else:
 
571
                    test_root = test_root.encode(
 
572
                        sys.getfilesystemencoding())
 
573
                try:
 
574
                    osutils.rmtree(test_root)
 
575
                except OSError, e:
 
576
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
577
                        print >>sys.stderr, ('Permission denied: '
 
578
                                             'unable to remove testing dir '
 
579
                                             '%s' % os.path.basename(test_root))
 
580
                    else:
 
581
                        raise
 
582
        else:
 
583
            note("Failed tests working directories are in '%s'\n", test_root)
 
584
        TestCaseWithMemoryTransport.TEST_ROOT = None
 
585
        result.finished()
 
586
        return result
 
587
 
 
588
 
 
589
def iter_suite_tests(suite):
 
590
    """Return all tests in a suite, recursing through nested suites"""
 
591
    for item in suite._tests:
 
592
        if isinstance(item, unittest.TestCase):
 
593
            yield item
 
594
        elif isinstance(item, unittest.TestSuite):
 
595
            for r in iter_suite_tests(item):
 
596
                yield r
 
597
        else:
 
598
            raise Exception('unknown object %r inside test suite %r'
 
599
                            % (item, suite))
 
600
 
 
601
 
 
602
class TestSkipped(Exception):
 
603
    """Indicates that a test was intentionally skipped, rather than failing."""
 
604
 
 
605
 
 
606
class KnownFailure(AssertionError):
 
607
    """Indicates that a test failed in a precisely expected manner.
 
608
 
 
609
    Such failures dont block the whole test suite from passing because they are
 
610
    indicators of partially completed code or of future work. We have an
 
611
    explicit error for them so that we can ensure that they are always visible:
 
612
    KnownFailures are always shown in the output of bzr selftest.
 
613
    """
 
614
 
 
615
 
 
616
class UnavailableFeature(Exception):
 
617
    """A feature required for this test was not available.
 
618
 
 
619
    The feature should be used to construct the exception.
 
620
    """
 
621
 
 
622
 
 
623
class CommandFailed(Exception):
 
624
    pass
 
625
 
 
626
 
 
627
class StringIOWrapper(object):
 
628
    """A wrapper around cStringIO which just adds an encoding attribute.
 
629
    
 
630
    Internally we can check sys.stdout to see what the output encoding
 
631
    should be. However, cStringIO has no encoding attribute that we can
 
632
    set. So we wrap it instead.
 
633
    """
 
634
    encoding='ascii'
 
635
    _cstring = None
 
636
 
 
637
    def __init__(self, s=None):
 
638
        if s is not None:
 
639
            self.__dict__['_cstring'] = StringIO(s)
 
640
        else:
 
641
            self.__dict__['_cstring'] = StringIO()
 
642
 
 
643
    def __getattr__(self, name, getattr=getattr):
 
644
        return getattr(self.__dict__['_cstring'], name)
 
645
 
 
646
    def __setattr__(self, name, val):
 
647
        if name == 'encoding':
 
648
            self.__dict__['encoding'] = val
 
649
        else:
 
650
            return setattr(self._cstring, name, val)
 
651
 
 
652
 
 
653
class TestUIFactory(ui.CLIUIFactory):
 
654
    """A UI Factory for testing.
 
655
 
 
656
    Hide the progress bar but emit note()s.
 
657
    Redirect stdin.
 
658
    Allows get_password to be tested without real tty attached.
 
659
    """
 
660
 
 
661
    def __init__(self,
 
662
                 stdout=None,
 
663
                 stderr=None,
 
664
                 stdin=None):
 
665
        super(TestUIFactory, self).__init__()
 
666
        if stdin is not None:
 
667
            # We use a StringIOWrapper to be able to test various
 
668
            # encodings, but the user is still responsible to
 
669
            # encode the string and to set the encoding attribute
 
670
            # of StringIOWrapper.
 
671
            self.stdin = StringIOWrapper(stdin)
 
672
        if stdout is None:
 
673
            self.stdout = sys.stdout
 
674
        else:
 
675
            self.stdout = stdout
 
676
        if stderr is None:
 
677
            self.stderr = sys.stderr
 
678
        else:
 
679
            self.stderr = stderr
 
680
 
 
681
    def clear(self):
 
682
        """See progress.ProgressBar.clear()."""
 
683
 
 
684
    def clear_term(self):
 
685
        """See progress.ProgressBar.clear_term()."""
 
686
 
 
687
    def clear_term(self):
 
688
        """See progress.ProgressBar.clear_term()."""
 
689
 
 
690
    def finished(self):
 
691
        """See progress.ProgressBar.finished()."""
 
692
 
 
693
    def note(self, fmt_string, *args, **kwargs):
 
694
        """See progress.ProgressBar.note()."""
 
695
        self.stdout.write((fmt_string + "\n") % args)
 
696
 
 
697
    def progress_bar(self):
 
698
        return self
 
699
 
 
700
    def nested_progress_bar(self):
 
701
        return self
 
702
 
 
703
    def update(self, message, count=None, total=None):
 
704
        """See progress.ProgressBar.update()."""
 
705
 
 
706
    def get_non_echoed_password(self, prompt):
 
707
        """Get password from stdin without trying to handle the echo mode"""
 
708
        if prompt:
 
709
            self.stdout.write(prompt)
 
710
        password = self.stdin.readline()
 
711
        if not password:
 
712
            raise EOFError
 
713
        if password[-1] == '\n':
 
714
            password = password[:-1]
 
715
        return password
 
716
 
 
717
 
 
718
class TestCase(unittest.TestCase):
 
719
    """Base class for bzr unit tests.
 
720
    
 
721
    Tests that need access to disk resources should subclass 
 
722
    TestCaseInTempDir not TestCase.
 
723
 
 
724
    Error and debug log messages are redirected from their usual
 
725
    location into a temporary file, the contents of which can be
 
726
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
727
    so that it can also capture file IO.  When the test completes this file
 
728
    is read into memory and removed from disk.
 
729
       
 
730
    There are also convenience functions to invoke bzr's command-line
 
731
    routine, and to build and check bzr trees.
 
732
   
 
733
    In addition to the usual method of overriding tearDown(), this class also
 
734
    allows subclasses to register functions into the _cleanups list, which is
 
735
    run in order as the object is torn down.  It's less likely this will be
 
736
    accidentally overlooked.
 
737
    """
 
738
 
 
739
    _log_file_name = None
 
740
    _log_contents = ''
 
741
    _keep_log_file = False
 
742
    # record lsprof data when performing benchmark calls.
 
743
    _gather_lsprof_in_benchmarks = False
 
744
 
 
745
    def __init__(self, methodName='testMethod'):
 
746
        super(TestCase, self).__init__(methodName)
 
747
        self._cleanups = []
 
748
 
 
749
    def setUp(self):
 
750
        unittest.TestCase.setUp(self)
 
751
        self._cleanEnvironment()
 
752
        bzrlib.trace.disable_default_logging()
 
753
        self._silenceUI()
 
754
        self._startLogFile()
 
755
        self._benchcalls = []
 
756
        self._benchtime = None
 
757
        # prevent hooks affecting tests
 
758
        self._preserved_hooks = {
 
759
            bzrlib.branch.Branch:bzrlib.branch.Branch.hooks,
 
760
            bzrlib.smart.server.SmartTCPServer:bzrlib.smart.server.SmartTCPServer.hooks,
 
761
            }
 
762
        self.addCleanup(self._restoreHooks)
 
763
        # this list of hooks must be kept in sync with the defaults
 
764
        # in branch.py
 
765
        bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
 
766
 
 
767
    def _silenceUI(self):
 
768
        """Turn off UI for duration of test"""
 
769
        # by default the UI is off; tests can turn it on if they want it.
 
770
        saved = ui.ui_factory
 
771
        def _restore():
 
772
            ui.ui_factory = saved
 
773
        ui.ui_factory = ui.SilentUIFactory()
 
774
        self.addCleanup(_restore)
 
775
 
 
776
    def _ndiff_strings(self, a, b):
 
777
        """Return ndiff between two strings containing lines.
 
778
        
 
779
        A trailing newline is added if missing to make the strings
 
780
        print properly."""
 
781
        if b and b[-1] != '\n':
 
782
            b += '\n'
 
783
        if a and a[-1] != '\n':
 
784
            a += '\n'
 
785
        difflines = difflib.ndiff(a.splitlines(True),
 
786
                                  b.splitlines(True),
 
787
                                  linejunk=lambda x: False,
 
788
                                  charjunk=lambda x: False)
 
789
        return ''.join(difflines)
 
790
 
 
791
    def assertEqual(self, a, b, message=''):
 
792
        try:
 
793
            if a == b:
 
794
                return
 
795
        except UnicodeError, e:
 
796
            # If we can't compare without getting a UnicodeError, then
 
797
            # obviously they are different
 
798
            mutter('UnicodeError: %s', e)
 
799
        if message:
 
800
            message += '\n'
 
801
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
802
            % (message,
 
803
               pformat(a, indent=4), pformat(b, indent=4)))
 
804
 
 
805
    assertEquals = assertEqual
 
806
 
 
807
    def assertEqualDiff(self, a, b, message=None):
 
808
        """Assert two texts are equal, if not raise an exception.
 
809
        
 
810
        This is intended for use with multi-line strings where it can 
 
811
        be hard to find the differences by eye.
 
812
        """
 
813
        # TODO: perhaps override assertEquals to call this for strings?
 
814
        if a == b:
 
815
            return
 
816
        if message is None:
 
817
            message = "texts not equal:\n"
 
818
        raise AssertionError(message + 
 
819
                             self._ndiff_strings(a, b))      
 
820
        
 
821
    def assertEqualMode(self, mode, mode_test):
 
822
        self.assertEqual(mode, mode_test,
 
823
                         'mode mismatch %o != %o' % (mode, mode_test))
 
824
 
 
825
    def assertStartsWith(self, s, prefix):
 
826
        if not s.startswith(prefix):
 
827
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
828
 
 
829
    def assertEndsWith(self, s, suffix):
 
830
        """Asserts that s ends with suffix."""
 
831
        if not s.endswith(suffix):
 
832
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
833
 
 
834
    def assertContainsRe(self, haystack, needle_re):
 
835
        """Assert that a contains something matching a regular expression."""
 
836
        if not re.search(needle_re, haystack):
 
837
            raise AssertionError('pattern "%r" not found in "%r"'
 
838
                    % (needle_re, haystack))
 
839
 
 
840
    def assertNotContainsRe(self, haystack, needle_re):
 
841
        """Assert that a does not match a regular expression"""
 
842
        if re.search(needle_re, haystack):
 
843
            raise AssertionError('pattern "%s" found in "%s"'
 
844
                    % (needle_re, haystack))
 
845
 
 
846
    def assertSubset(self, sublist, superlist):
 
847
        """Assert that every entry in sublist is present in superlist."""
 
848
        missing = []
 
849
        for entry in sublist:
 
850
            if entry not in superlist:
 
851
                missing.append(entry)
 
852
        if len(missing) > 0:
 
853
            raise AssertionError("value(s) %r not present in container %r" % 
 
854
                                 (missing, superlist))
 
855
 
 
856
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
857
        """Fail unless excClass is raised when the iterator from func is used.
 
858
        
 
859
        Many functions can return generators this makes sure
 
860
        to wrap them in a list() call to make sure the whole generator
 
861
        is run, and that the proper exception is raised.
 
862
        """
 
863
        try:
 
864
            list(func(*args, **kwargs))
 
865
        except excClass:
 
866
            return
 
867
        else:
 
868
            if getattr(excClass,'__name__', None) is not None:
 
869
                excName = excClass.__name__
 
870
            else:
 
871
                excName = str(excClass)
 
872
            raise self.failureException, "%s not raised" % excName
 
873
 
 
874
    def assertRaises(self, excClass, func, *args, **kwargs):
 
875
        """Assert that a callable raises a particular exception.
 
876
 
 
877
        :param excClass: As for the except statement, this may be either an
 
878
        exception class, or a tuple of classes.
 
879
 
 
880
        Returns the exception so that you can examine it.
 
881
        """
 
882
        try:
 
883
            func(*args, **kwargs)
 
884
        except excClass, e:
 
885
            return e
 
886
        else:
 
887
            if getattr(excClass,'__name__', None) is not None:
 
888
                excName = excClass.__name__
 
889
            else:
 
890
                # probably a tuple
 
891
                excName = str(excClass)
 
892
            raise self.failureException, "%s not raised" % excName
 
893
 
 
894
    def assertIs(self, left, right, message=None):
 
895
        if not (left is right):
 
896
            if message is not None:
 
897
                raise AssertionError(message)
 
898
            else:
 
899
                raise AssertionError("%r is not %r." % (left, right))
 
900
 
 
901
    def assertIsNot(self, left, right, message=None):
 
902
        if (left is right):
 
903
            if message is not None:
 
904
                raise AssertionError(message)
 
905
            else:
 
906
                raise AssertionError("%r is %r." % (left, right))
 
907
 
 
908
    def assertTransportMode(self, transport, path, mode):
 
909
        """Fail if a path does not have mode mode.
 
910
        
 
911
        If modes are not supported on this transport, the assertion is ignored.
 
912
        """
 
913
        if not transport._can_roundtrip_unix_modebits():
 
914
            return
 
915
        path_stat = transport.stat(path)
 
916
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
917
        self.assertEqual(mode, actual_mode,
 
918
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
919
 
 
920
    def assertIsInstance(self, obj, kls):
 
921
        """Fail if obj is not an instance of kls"""
 
922
        if not isinstance(obj, kls):
 
923
            self.fail("%r is an instance of %s rather than %s" % (
 
924
                obj, obj.__class__, kls))
 
925
 
 
926
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
927
        """Invoke a test, expecting it to fail for the given reason.
 
928
 
 
929
        This is for assertions that ought to succeed, but currently fail.
 
930
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
931
        about the failure you're expecting.  If a new bug is introduced,
 
932
        AssertionError should be raised, not KnownFailure.
 
933
 
 
934
        Frequently, expectFailure should be followed by an opposite assertion.
 
935
        See example below.
 
936
 
 
937
        Intended to be used with a callable that raises AssertionError as the
 
938
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
939
 
 
940
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
941
        test succeeds.
 
942
 
 
943
        example usage::
 
944
 
 
945
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
946
                             dynamic_val)
 
947
          self.assertEqual(42, dynamic_val)
 
948
 
 
949
          This means that a dynamic_val of 54 will cause the test to raise
 
950
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
951
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
952
          than 54 or 42 will cause an AssertionError.
 
953
        """
 
954
        try:
 
955
            assertion(*args, **kwargs)
 
956
        except AssertionError:
 
957
            raise KnownFailure(reason)
 
958
        else:
 
959
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
960
 
 
961
    def _capture_warnings(self, a_callable, *args, **kwargs):
 
962
        """A helper for callDeprecated and applyDeprecated.
 
963
 
 
964
        :param a_callable: A callable to call.
 
965
        :param args: The positional arguments for the callable
 
966
        :param kwargs: The keyword arguments for the callable
 
967
        :return: A tuple (warnings, result). result is the result of calling
 
968
            a_callable(*args, **kwargs).
 
969
        """
 
970
        local_warnings = []
 
971
        def capture_warnings(msg, cls=None, stacklevel=None):
 
972
            # we've hooked into a deprecation specific callpath,
 
973
            # only deprecations should getting sent via it.
 
974
            self.assertEqual(cls, DeprecationWarning)
 
975
            local_warnings.append(msg)
 
976
        original_warning_method = symbol_versioning.warn
 
977
        symbol_versioning.set_warning_method(capture_warnings)
 
978
        try:
 
979
            result = a_callable(*args, **kwargs)
 
980
        finally:
 
981
            symbol_versioning.set_warning_method(original_warning_method)
 
982
        return (local_warnings, result)
 
983
 
 
984
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
985
        """Call a deprecated callable without warning the user.
 
986
 
 
987
        :param deprecation_format: The deprecation format that the callable
 
988
            should have been deprecated with. This is the same type as the 
 
989
            parameter to deprecated_method/deprecated_function. If the 
 
990
            callable is not deprecated with this format, an assertion error
 
991
            will be raised.
 
992
        :param a_callable: A callable to call. This may be a bound method or
 
993
            a regular function. It will be called with *args and **kwargs.
 
994
        :param args: The positional arguments for the callable
 
995
        :param kwargs: The keyword arguments for the callable
 
996
        :return: The result of a_callable(*args, **kwargs)
 
997
        """
 
998
        call_warnings, result = self._capture_warnings(a_callable,
 
999
            *args, **kwargs)
 
1000
        expected_first_warning = symbol_versioning.deprecation_string(
 
1001
            a_callable, deprecation_format)
 
1002
        if len(call_warnings) == 0:
 
1003
            self.fail("No deprecation warning generated by call to %s" %
 
1004
                a_callable)
 
1005
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1006
        return result
 
1007
 
 
1008
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1009
        """Assert that a callable is deprecated in a particular way.
 
1010
 
 
1011
        This is a very precise test for unusual requirements. The 
 
1012
        applyDeprecated helper function is probably more suited for most tests
 
1013
        as it allows you to simply specify the deprecation format being used
 
1014
        and will ensure that that is issued for the function being called.
 
1015
 
 
1016
        :param expected: a list of the deprecation warnings expected, in order
 
1017
        :param callable: The callable to call
 
1018
        :param args: The positional arguments for the callable
 
1019
        :param kwargs: The keyword arguments for the callable
 
1020
        """
 
1021
        call_warnings, result = self._capture_warnings(callable,
 
1022
            *args, **kwargs)
 
1023
        self.assertEqual(expected, call_warnings)
 
1024
        return result
 
1025
 
 
1026
    def _startLogFile(self):
 
1027
        """Send bzr and test log messages to a temporary file.
 
1028
 
 
1029
        The file is removed as the test is torn down.
 
1030
        """
 
1031
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1032
        self._log_file = os.fdopen(fileno, 'w+')
 
1033
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
1034
        self._log_file_name = name
 
1035
        self.addCleanup(self._finishLogFile)
 
1036
 
 
1037
    def _finishLogFile(self):
 
1038
        """Finished with the log file.
 
1039
 
 
1040
        Close the file and delete it, unless setKeepLogfile was called.
 
1041
        """
 
1042
        if self._log_file is None:
 
1043
            return
 
1044
        bzrlib.trace.disable_test_log(self._log_nonce)
 
1045
        self._log_file.close()
 
1046
        self._log_file = None
 
1047
        if not self._keep_log_file:
 
1048
            os.remove(self._log_file_name)
 
1049
            self._log_file_name = None
 
1050
 
 
1051
    def setKeepLogfile(self):
 
1052
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1053
        self._keep_log_file = True
 
1054
 
 
1055
    def addCleanup(self, callable):
 
1056
        """Arrange to run a callable when this case is torn down.
 
1057
 
 
1058
        Callables are run in the reverse of the order they are registered, 
 
1059
        ie last-in first-out.
 
1060
        """
 
1061
        if callable in self._cleanups:
 
1062
            raise ValueError("cleanup function %r already registered on %s" 
 
1063
                    % (callable, self))
 
1064
        self._cleanups.append(callable)
 
1065
 
 
1066
    def _cleanEnvironment(self):
 
1067
        new_env = {
 
1068
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1069
            'HOME': os.getcwd(),
 
1070
            'APPDATA': None,  # bzr now use Win32 API and don't rely on APPDATA
 
1071
            'BZR_EMAIL': None,
 
1072
            'BZREMAIL': None, # may still be present in the environment
 
1073
            'EMAIL': None,
 
1074
            'BZR_PROGRESS_BAR': None,
 
1075
            # Proxies
 
1076
            'http_proxy': None,
 
1077
            'HTTP_PROXY': None,
 
1078
            'https_proxy': None,
 
1079
            'HTTPS_PROXY': None,
 
1080
            'no_proxy': None,
 
1081
            'NO_PROXY': None,
 
1082
            'all_proxy': None,
 
1083
            'ALL_PROXY': None,
 
1084
            # Nobody cares about these ones AFAIK. So far at
 
1085
            # least. If you do (care), please update this comment
 
1086
            # -- vila 20061212
 
1087
            'ftp_proxy': None,
 
1088
            'FTP_PROXY': None,
 
1089
        }
 
1090
        self.__old_env = {}
 
1091
        self.addCleanup(self._restoreEnvironment)
 
1092
        for name, value in new_env.iteritems():
 
1093
            self._captureVar(name, value)
 
1094
 
 
1095
    def _captureVar(self, name, newvalue):
 
1096
        """Set an environment variable, and reset it when finished."""
 
1097
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1098
 
 
1099
    def _restoreEnvironment(self):
 
1100
        for name, value in self.__old_env.iteritems():
 
1101
            osutils.set_or_unset_env(name, value)
 
1102
 
 
1103
    def _restoreHooks(self):
 
1104
        for klass, hooks in self._preserved_hooks.items():
 
1105
            setattr(klass, 'hooks', hooks)
 
1106
 
 
1107
    def knownFailure(self, reason):
 
1108
        """This test has failed for some known reason."""
 
1109
        raise KnownFailure(reason)
 
1110
 
 
1111
    def run(self, result=None):
 
1112
        if result is None: result = self.defaultTestResult()
 
1113
        for feature in getattr(self, '_test_needs_features', []):
 
1114
            if not feature.available():
 
1115
                result.startTest(self)
 
1116
                if getattr(result, 'addNotSupported', None):
 
1117
                    result.addNotSupported(self, feature)
 
1118
                else:
 
1119
                    result.addSuccess(self)
 
1120
                result.stopTest(self)
 
1121
                return
 
1122
        return unittest.TestCase.run(self, result)
 
1123
 
 
1124
    def tearDown(self):
 
1125
        self._runCleanups()
 
1126
        unittest.TestCase.tearDown(self)
 
1127
 
 
1128
    def time(self, callable, *args, **kwargs):
 
1129
        """Run callable and accrue the time it takes to the benchmark time.
 
1130
        
 
1131
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1132
        this will cause lsprofile statistics to be gathered and stored in
 
1133
        self._benchcalls.
 
1134
        """
 
1135
        if self._benchtime is None:
 
1136
            self._benchtime = 0
 
1137
        start = time.time()
 
1138
        try:
 
1139
            if not self._gather_lsprof_in_benchmarks:
 
1140
                return callable(*args, **kwargs)
 
1141
            else:
 
1142
                # record this benchmark
 
1143
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1144
                stats.sort()
 
1145
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1146
                return ret
 
1147
        finally:
 
1148
            self._benchtime += time.time() - start
 
1149
 
 
1150
    def _runCleanups(self):
 
1151
        """Run registered cleanup functions. 
 
1152
 
 
1153
        This should only be called from TestCase.tearDown.
 
1154
        """
 
1155
        # TODO: Perhaps this should keep running cleanups even if 
 
1156
        # one of them fails?
 
1157
 
 
1158
        # Actually pop the cleanups from the list so tearDown running
 
1159
        # twice is safe (this happens for skipped tests).
 
1160
        while self._cleanups:
 
1161
            self._cleanups.pop()()
 
1162
 
 
1163
    def log(self, *args):
 
1164
        mutter(*args)
 
1165
 
 
1166
    def _get_log(self, keep_log_file=False):
 
1167
        """Return as a string the log for this test. If the file is still
 
1168
        on disk and keep_log_file=False, delete the log file and store the
 
1169
        content in self._log_contents."""
 
1170
        # flush the log file, to get all content
 
1171
        import bzrlib.trace
 
1172
        bzrlib.trace._trace_file.flush()
 
1173
        if self._log_contents:
 
1174
            return self._log_contents
 
1175
        if self._log_file_name is not None:
 
1176
            logfile = open(self._log_file_name)
 
1177
            try:
 
1178
                log_contents = logfile.read()
 
1179
            finally:
 
1180
                logfile.close()
 
1181
            if not keep_log_file:
 
1182
                self._log_contents = log_contents
 
1183
                try:
 
1184
                    os.remove(self._log_file_name)
 
1185
                except OSError, e:
 
1186
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1187
                        print >>sys.stderr, ('Unable to delete log file '
 
1188
                                             ' %r' % self._log_file_name)
 
1189
                    else:
 
1190
                        raise
 
1191
            return log_contents
 
1192
        else:
 
1193
            return "DELETED log file to reduce memory footprint"
 
1194
 
 
1195
    def capture(self, cmd, retcode=0):
 
1196
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
1197
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
1198
 
 
1199
    def requireFeature(self, feature):
 
1200
        """This test requires a specific feature is available.
 
1201
 
 
1202
        :raises UnavailableFeature: When feature is not available.
 
1203
        """
 
1204
        if not feature.available():
 
1205
            raise UnavailableFeature(feature)
 
1206
 
 
1207
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
 
1208
                         working_dir=None):
 
1209
        """Invoke bzr and return (stdout, stderr).
 
1210
 
 
1211
        Useful for code that wants to check the contents of the
 
1212
        output, the way error messages are presented, etc.
 
1213
 
 
1214
        This should be the main method for tests that want to exercise the
 
1215
        overall behavior of the bzr application (rather than a unit test
 
1216
        or a functional test of the library.)
 
1217
 
 
1218
        Much of the old code runs bzr by forking a new copy of Python, but
 
1219
        that is slower, harder to debug, and generally not necessary.
 
1220
 
 
1221
        This runs bzr through the interface that catches and reports
 
1222
        errors, and with logging set to something approximating the
 
1223
        default, so that error reporting can be checked.
 
1224
 
 
1225
        :param argv: arguments to invoke bzr
 
1226
        :param retcode: expected return code, or None for don't-care.
 
1227
        :param encoding: encoding for sys.stdout and sys.stderr
 
1228
        :param stdin: A string to be used as stdin for the command.
 
1229
        :param working_dir: Change to this directory before running
 
1230
        """
 
1231
        if encoding is None:
 
1232
            encoding = bzrlib.user_encoding
 
1233
        stdout = StringIOWrapper()
 
1234
        stderr = StringIOWrapper()
 
1235
        stdout.encoding = encoding
 
1236
        stderr.encoding = encoding
 
1237
 
 
1238
        self.log('run bzr: %r', argv)
 
1239
        # FIXME: don't call into logging here
 
1240
        handler = logging.StreamHandler(stderr)
 
1241
        handler.setLevel(logging.INFO)
 
1242
        logger = logging.getLogger('')
 
1243
        logger.addHandler(handler)
 
1244
        old_ui_factory = ui.ui_factory
 
1245
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1246
 
 
1247
        cwd = None
 
1248
        if working_dir is not None:
 
1249
            cwd = osutils.getcwd()
 
1250
            os.chdir(working_dir)
 
1251
 
 
1252
        try:
 
1253
            saved_debug_flags = frozenset(debug.debug_flags)
 
1254
            debug.debug_flags.clear()
 
1255
            try:
 
1256
                result = self.apply_redirected(ui.ui_factory.stdin,
 
1257
                                               stdout, stderr,
 
1258
                                               bzrlib.commands.run_bzr_catch_errors,
 
1259
                                               argv)
 
1260
            finally:
 
1261
                debug.debug_flags.update(saved_debug_flags)
 
1262
        finally:
 
1263
            logger.removeHandler(handler)
 
1264
            ui.ui_factory = old_ui_factory
 
1265
            if cwd is not None:
 
1266
                os.chdir(cwd)
 
1267
 
 
1268
        out = stdout.getvalue()
 
1269
        err = stderr.getvalue()
 
1270
        if out:
 
1271
            self.log('output:\n%r', out)
 
1272
        if err:
 
1273
            self.log('errors:\n%r', err)
 
1274
        if retcode is not None:
 
1275
            self.assertEquals(retcode, result)
 
1276
        return out, err
 
1277
 
 
1278
    def run_bzr(self, *args, **kwargs):
 
1279
        """Invoke bzr, as if it were run from the command line.
 
1280
 
 
1281
        This should be the main method for tests that want to exercise the
 
1282
        overall behavior of the bzr application (rather than a unit test
 
1283
        or a functional test of the library.)
 
1284
 
 
1285
        This sends the stdout/stderr results into the test's log,
 
1286
        where it may be useful for debugging.  See also run_captured.
 
1287
 
 
1288
        :param stdin: A string to be used as stdin for the command.
 
1289
        :param retcode: The status code the command should return
 
1290
        :param working_dir: The directory to run the command in
 
1291
        """
 
1292
        retcode = kwargs.pop('retcode', 0)
 
1293
        encoding = kwargs.pop('encoding', None)
 
1294
        stdin = kwargs.pop('stdin', None)
 
1295
        working_dir = kwargs.pop('working_dir', None)
 
1296
        return self.run_bzr_captured(args, retcode=retcode, encoding=encoding,
 
1297
                                     stdin=stdin, working_dir=working_dir)
 
1298
 
 
1299
    def run_bzr_decode(self, *args, **kwargs):
 
1300
        if 'encoding' in kwargs:
 
1301
            encoding = kwargs['encoding']
 
1302
        else:
 
1303
            encoding = bzrlib.user_encoding
 
1304
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
 
1305
 
 
1306
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1307
        """Run bzr, and check that stderr contains the supplied regexes
 
1308
        
 
1309
        :param error_regexes: Sequence of regular expressions which 
 
1310
            must each be found in the error output. The relative ordering
 
1311
            is not enforced.
 
1312
        :param args: command-line arguments for bzr
 
1313
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1314
            This function changes the default value of retcode to be 3,
 
1315
            since in most cases this is run when you expect bzr to fail.
 
1316
        :return: (out, err) The actual output of running the command (in case you
 
1317
                 want to do more inspection)
 
1318
 
 
1319
        Examples of use:
 
1320
            # Make sure that commit is failing because there is nothing to do
 
1321
            self.run_bzr_error(['no changes to commit'],
 
1322
                               'commit', '-m', 'my commit comment')
 
1323
            # Make sure --strict is handling an unknown file, rather than
 
1324
            # giving us the 'nothing to do' error
 
1325
            self.build_tree(['unknown'])
 
1326
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1327
                               'commit', '--strict', '-m', 'my commit comment')
 
1328
        """
 
1329
        kwargs.setdefault('retcode', 3)
 
1330
        out, err = self.run_bzr(*args, **kwargs)
 
1331
        for regex in error_regexes:
 
1332
            self.assertContainsRe(err, regex)
 
1333
        return out, err
 
1334
 
 
1335
    def run_bzr_subprocess(self, *args, **kwargs):
 
1336
        """Run bzr in a subprocess for testing.
 
1337
 
 
1338
        This starts a new Python interpreter and runs bzr in there. 
 
1339
        This should only be used for tests that have a justifiable need for
 
1340
        this isolation: e.g. they are testing startup time, or signal
 
1341
        handling, or early startup code, etc.  Subprocess code can't be 
 
1342
        profiled or debugged so easily.
 
1343
 
 
1344
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1345
            None is supplied, the status code is not checked.
 
1346
        :param env_changes: A dictionary which lists changes to environment
 
1347
            variables. A value of None will unset the env variable.
 
1348
            The values must be strings. The change will only occur in the
 
1349
            child, so you don't need to fix the environment after running.
 
1350
        :param universal_newlines: Convert CRLF => LF
 
1351
        :param allow_plugins: By default the subprocess is run with
 
1352
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1353
            for system-wide plugins to create unexpected output on stderr,
 
1354
            which can cause unnecessary test failures.
 
1355
        """
 
1356
        env_changes = kwargs.get('env_changes', {})
 
1357
        working_dir = kwargs.get('working_dir', None)
 
1358
        allow_plugins = kwargs.get('allow_plugins', False)
 
1359
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1360
                                            working_dir=working_dir,
 
1361
                                            allow_plugins=allow_plugins)
 
1362
        # We distinguish between retcode=None and retcode not passed.
 
1363
        supplied_retcode = kwargs.get('retcode', 0)
 
1364
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1365
            universal_newlines=kwargs.get('universal_newlines', False),
 
1366
            process_args=args)
 
1367
 
 
1368
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1369
                             skip_if_plan_to_signal=False,
 
1370
                             working_dir=None,
 
1371
                             allow_plugins=False):
 
1372
        """Start bzr in a subprocess for testing.
 
1373
 
 
1374
        This starts a new Python interpreter and runs bzr in there.
 
1375
        This should only be used for tests that have a justifiable need for
 
1376
        this isolation: e.g. they are testing startup time, or signal
 
1377
        handling, or early startup code, etc.  Subprocess code can't be
 
1378
        profiled or debugged so easily.
 
1379
 
 
1380
        :param process_args: a list of arguments to pass to the bzr executable,
 
1381
            for example `['--version']`.
 
1382
        :param env_changes: A dictionary which lists changes to environment
 
1383
            variables. A value of None will unset the env variable.
 
1384
            The values must be strings. The change will only occur in the
 
1385
            child, so you don't need to fix the environment after running.
 
1386
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1387
            is not available.
 
1388
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1389
 
 
1390
        :returns: Popen object for the started process.
 
1391
        """
 
1392
        if skip_if_plan_to_signal:
 
1393
            if not getattr(os, 'kill', None):
 
1394
                raise TestSkipped("os.kill not available.")
 
1395
 
 
1396
        if env_changes is None:
 
1397
            env_changes = {}
 
1398
        old_env = {}
 
1399
 
 
1400
        def cleanup_environment():
 
1401
            for env_var, value in env_changes.iteritems():
 
1402
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1403
 
 
1404
        def restore_environment():
 
1405
            for env_var, value in old_env.iteritems():
 
1406
                osutils.set_or_unset_env(env_var, value)
 
1407
 
 
1408
        bzr_path = self.get_bzr_path()
 
1409
 
 
1410
        cwd = None
 
1411
        if working_dir is not None:
 
1412
            cwd = osutils.getcwd()
 
1413
            os.chdir(working_dir)
 
1414
 
 
1415
        try:
 
1416
            # win32 subprocess doesn't support preexec_fn
 
1417
            # so we will avoid using it on all platforms, just to
 
1418
            # make sure the code path is used, and we don't break on win32
 
1419
            cleanup_environment()
 
1420
            command = [sys.executable, bzr_path]
 
1421
            if not allow_plugins:
 
1422
                command.append('--no-plugins')
 
1423
            command.extend(process_args)
 
1424
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1425
        finally:
 
1426
            restore_environment()
 
1427
            if cwd is not None:
 
1428
                os.chdir(cwd)
 
1429
 
 
1430
        return process
 
1431
 
 
1432
    def _popen(self, *args, **kwargs):
 
1433
        """Place a call to Popen.
 
1434
 
 
1435
        Allows tests to override this method to intercept the calls made to
 
1436
        Popen for introspection.
 
1437
        """
 
1438
        return Popen(*args, **kwargs)
 
1439
 
 
1440
    def get_bzr_path(self):
 
1441
        """Return the path of the 'bzr' executable for this test suite."""
 
1442
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1443
        if not os.path.isfile(bzr_path):
 
1444
            # We are probably installed. Assume sys.argv is the right file
 
1445
            bzr_path = sys.argv[0]
 
1446
        return bzr_path
 
1447
 
 
1448
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1449
                              universal_newlines=False, process_args=None):
 
1450
        """Finish the execution of process.
 
1451
 
 
1452
        :param process: the Popen object returned from start_bzr_subprocess.
 
1453
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1454
            None is supplied, the status code is not checked.
 
1455
        :param send_signal: an optional signal to send to the process.
 
1456
        :param universal_newlines: Convert CRLF => LF
 
1457
        :returns: (stdout, stderr)
 
1458
        """
 
1459
        if send_signal is not None:
 
1460
            os.kill(process.pid, send_signal)
 
1461
        out, err = process.communicate()
 
1462
 
 
1463
        if universal_newlines:
 
1464
            out = out.replace('\r\n', '\n')
 
1465
            err = err.replace('\r\n', '\n')
 
1466
 
 
1467
        if retcode is not None and retcode != process.returncode:
 
1468
            if process_args is None:
 
1469
                process_args = "(unknown args)"
 
1470
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1471
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1472
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1473
                      % (process_args, retcode, process.returncode))
 
1474
        return [out, err]
 
1475
 
 
1476
    def check_inventory_shape(self, inv, shape):
 
1477
        """Compare an inventory to a list of expected names.
 
1478
 
 
1479
        Fail if they are not precisely equal.
 
1480
        """
 
1481
        extras = []
 
1482
        shape = list(shape)             # copy
 
1483
        for path, ie in inv.entries():
 
1484
            name = path.replace('\\', '/')
 
1485
            if ie.kind == 'dir':
 
1486
                name = name + '/'
 
1487
            if name in shape:
 
1488
                shape.remove(name)
 
1489
            else:
 
1490
                extras.append(name)
 
1491
        if shape:
 
1492
            self.fail("expected paths not found in inventory: %r" % shape)
 
1493
        if extras:
 
1494
            self.fail("unexpected paths found in inventory: %r" % extras)
 
1495
 
 
1496
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1497
                         a_callable=None, *args, **kwargs):
 
1498
        """Call callable with redirected std io pipes.
 
1499
 
 
1500
        Returns the return code."""
 
1501
        if not callable(a_callable):
 
1502
            raise ValueError("a_callable must be callable.")
 
1503
        if stdin is None:
 
1504
            stdin = StringIO("")
 
1505
        if stdout is None:
 
1506
            if getattr(self, "_log_file", None) is not None:
 
1507
                stdout = self._log_file
 
1508
            else:
 
1509
                stdout = StringIO()
 
1510
        if stderr is None:
 
1511
            if getattr(self, "_log_file", None is not None):
 
1512
                stderr = self._log_file
 
1513
            else:
 
1514
                stderr = StringIO()
 
1515
        real_stdin = sys.stdin
 
1516
        real_stdout = sys.stdout
 
1517
        real_stderr = sys.stderr
 
1518
        try:
 
1519
            sys.stdout = stdout
 
1520
            sys.stderr = stderr
 
1521
            sys.stdin = stdin
 
1522
            return a_callable(*args, **kwargs)
 
1523
        finally:
 
1524
            sys.stdout = real_stdout
 
1525
            sys.stderr = real_stderr
 
1526
            sys.stdin = real_stdin
 
1527
 
 
1528
    @symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
 
1529
    def merge(self, branch_from, wt_to):
 
1530
        """A helper for tests to do a ui-less merge.
 
1531
 
 
1532
        This should move to the main library when someone has time to integrate
 
1533
        it in.
 
1534
        """
 
1535
        # minimal ui-less merge.
 
1536
        wt_to.branch.fetch(branch_from)
 
1537
        base_rev = common_ancestor(branch_from.last_revision(),
 
1538
                                   wt_to.branch.last_revision(),
 
1539
                                   wt_to.branch.repository)
 
1540
        merge_inner(wt_to.branch, branch_from.basis_tree(),
 
1541
                    wt_to.branch.repository.revision_tree(base_rev),
 
1542
                    this_tree=wt_to)
 
1543
        wt_to.add_parent_tree_id(branch_from.last_revision())
 
1544
 
 
1545
    def reduceLockdirTimeout(self):
 
1546
        """Reduce the default lock timeout for the duration of the test, so that
 
1547
        if LockContention occurs during a test, it does so quickly.
 
1548
 
 
1549
        Tests that expect to provoke LockContention errors should call this.
 
1550
        """
 
1551
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
1552
        def resetTimeout():
 
1553
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
1554
        self.addCleanup(resetTimeout)
 
1555
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
1556
 
 
1557
BzrTestBase = TestCase
 
1558
 
 
1559
 
 
1560
class TestCaseWithMemoryTransport(TestCase):
 
1561
    """Common test class for tests that do not need disk resources.
 
1562
 
 
1563
    Tests that need disk resources should derive from TestCaseWithTransport.
 
1564
 
 
1565
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
1566
 
 
1567
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
1568
    a directory which does not exist. This serves to help ensure test isolation
 
1569
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
1570
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
1571
    file defaults for the transport in tests, nor does it obey the command line
 
1572
    override, so tests that accidentally write to the common directory should
 
1573
    be rare.
 
1574
    """
 
1575
 
 
1576
    TEST_ROOT = None
 
1577
    _TEST_NAME = 'test'
 
1578
 
 
1579
 
 
1580
    def __init__(self, methodName='runTest'):
 
1581
        # allow test parameterisation after test construction and before test
 
1582
        # execution. Variables that the parameteriser sets need to be 
 
1583
        # ones that are not set by setUp, or setUp will trash them.
 
1584
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
1585
        self.vfs_transport_factory = default_transport
 
1586
        self.transport_server = None
 
1587
        self.transport_readonly_server = None
 
1588
        self.__vfs_server = None
 
1589
 
 
1590
    def get_transport(self):
 
1591
        """Return a writeable transport for the test scratch space"""
 
1592
        t = get_transport(self.get_url())
 
1593
        self.assertFalse(t.is_readonly())
 
1594
        return t
 
1595
 
 
1596
    def get_readonly_transport(self):
 
1597
        """Return a readonly transport for the test scratch space
 
1598
        
 
1599
        This can be used to test that operations which should only need
 
1600
        readonly access in fact do not try to write.
 
1601
        """
 
1602
        t = get_transport(self.get_readonly_url())
 
1603
        self.assertTrue(t.is_readonly())
 
1604
        return t
 
1605
 
 
1606
    def create_transport_readonly_server(self):
 
1607
        """Create a transport server from class defined at init.
 
1608
 
 
1609
        This is mostly a hook for daughter classes.
 
1610
        """
 
1611
        return self.transport_readonly_server()
 
1612
 
 
1613
    def get_readonly_server(self):
 
1614
        """Get the server instance for the readonly transport
 
1615
 
 
1616
        This is useful for some tests with specific servers to do diagnostics.
 
1617
        """
 
1618
        if self.__readonly_server is None:
 
1619
            if self.transport_readonly_server is None:
 
1620
                # readonly decorator requested
 
1621
                # bring up the server
 
1622
                self.__readonly_server = ReadonlyServer()
 
1623
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1624
            else:
 
1625
                self.__readonly_server = self.create_transport_readonly_server()
 
1626
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1627
            self.addCleanup(self.__readonly_server.tearDown)
 
1628
        return self.__readonly_server
 
1629
 
 
1630
    def get_readonly_url(self, relpath=None):
 
1631
        """Get a URL for the readonly transport.
 
1632
 
 
1633
        This will either be backed by '.' or a decorator to the transport 
 
1634
        used by self.get_url()
 
1635
        relpath provides for clients to get a path relative to the base url.
 
1636
        These should only be downwards relative, not upwards.
 
1637
        """
 
1638
        base = self.get_readonly_server().get_url()
 
1639
        if relpath is not None:
 
1640
            if not base.endswith('/'):
 
1641
                base = base + '/'
 
1642
            base = base + relpath
 
1643
        return base
 
1644
 
 
1645
    def get_vfs_only_server(self):
 
1646
        """Get the vfs only read/write server instance.
 
1647
 
 
1648
        This is useful for some tests with specific servers that need
 
1649
        diagnostics.
 
1650
 
 
1651
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
1652
        is no means to override it.
 
1653
        """
 
1654
        if self.__vfs_server is None:
 
1655
            self.__vfs_server = MemoryServer()
 
1656
            self.__vfs_server.setUp()
 
1657
            self.addCleanup(self.__vfs_server.tearDown)
 
1658
        return self.__vfs_server
 
1659
 
 
1660
    def get_server(self):
 
1661
        """Get the read/write server instance.
 
1662
 
 
1663
        This is useful for some tests with specific servers that need
 
1664
        diagnostics.
 
1665
 
 
1666
        This is built from the self.transport_server factory. If that is None,
 
1667
        then the self.get_vfs_server is returned.
 
1668
        """
 
1669
        if self.__server is None:
 
1670
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
 
1671
                return self.get_vfs_only_server()
 
1672
            else:
 
1673
                # bring up a decorated means of access to the vfs only server.
 
1674
                self.__server = self.transport_server()
 
1675
                try:
 
1676
                    self.__server.setUp(self.get_vfs_only_server())
 
1677
                except TypeError, e:
 
1678
                    # This should never happen; the try:Except here is to assist
 
1679
                    # developers having to update code rather than seeing an
 
1680
                    # uninformative TypeError.
 
1681
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
 
1682
            self.addCleanup(self.__server.tearDown)
 
1683
        return self.__server
 
1684
 
 
1685
    def _adjust_url(self, base, relpath):
 
1686
        """Get a URL (or maybe a path) for the readwrite transport.
 
1687
 
 
1688
        This will either be backed by '.' or to an equivalent non-file based
 
1689
        facility.
 
1690
        relpath provides for clients to get a path relative to the base url.
 
1691
        These should only be downwards relative, not upwards.
 
1692
        """
 
1693
        if relpath is not None and relpath != '.':
 
1694
            if not base.endswith('/'):
 
1695
                base = base + '/'
 
1696
            # XXX: Really base should be a url; we did after all call
 
1697
            # get_url()!  But sometimes it's just a path (from
 
1698
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
1699
            # to a non-escaped local path.
 
1700
            if base.startswith('./') or base.startswith('/'):
 
1701
                base += relpath
 
1702
            else:
 
1703
                base += urlutils.escape(relpath)
 
1704
        return base
 
1705
 
 
1706
    def get_url(self, relpath=None):
 
1707
        """Get a URL (or maybe a path) for the readwrite transport.
 
1708
 
 
1709
        This will either be backed by '.' or to an equivalent non-file based
 
1710
        facility.
 
1711
        relpath provides for clients to get a path relative to the base url.
 
1712
        These should only be downwards relative, not upwards.
 
1713
        """
 
1714
        base = self.get_server().get_url()
 
1715
        return self._adjust_url(base, relpath)
 
1716
 
 
1717
    def get_vfs_only_url(self, relpath=None):
 
1718
        """Get a URL (or maybe a path for the plain old vfs transport.
 
1719
 
 
1720
        This will never be a smart protocol.
 
1721
        :param relpath: provides for clients to get a path relative to the base
 
1722
            url.  These should only be downwards relative, not upwards.
 
1723
        """
 
1724
        base = self.get_vfs_only_server().get_url()
 
1725
        return self._adjust_url(base, relpath)
 
1726
 
 
1727
    def _make_test_root(self):
 
1728
        if TestCaseWithMemoryTransport.TEST_ROOT is not None:
 
1729
            return
 
1730
        i = 0
 
1731
        while True:
 
1732
            root = u'test%04d.tmp' % i
 
1733
            try:
 
1734
                os.mkdir(root)
 
1735
            except OSError, e:
 
1736
                if e.errno == errno.EEXIST:
 
1737
                    i += 1
 
1738
                    continue
 
1739
                else:
 
1740
                    raise
 
1741
            # successfully created
 
1742
            TestCaseWithMemoryTransport.TEST_ROOT = osutils.abspath(root)
 
1743
            break
 
1744
        # make a fake bzr directory there to prevent any tests propagating
 
1745
        # up onto the source directory's real branch
 
1746
        bzrdir.BzrDir.create_standalone_workingtree(
 
1747
            TestCaseWithMemoryTransport.TEST_ROOT)
 
1748
 
 
1749
    def makeAndChdirToTestDir(self):
 
1750
        """Create a temporary directories for this one test.
 
1751
        
 
1752
        This must set self.test_home_dir and self.test_dir and chdir to
 
1753
        self.test_dir.
 
1754
        
 
1755
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
1756
        """
 
1757
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
1758
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
1759
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
1760
        
 
1761
    def make_branch(self, relpath, format=None):
 
1762
        """Create a branch on the transport at relpath."""
 
1763
        repo = self.make_repository(relpath, format=format)
 
1764
        return repo.bzrdir.create_branch()
 
1765
 
 
1766
    def make_bzrdir(self, relpath, format=None):
 
1767
        try:
 
1768
            # might be a relative or absolute path
 
1769
            maybe_a_url = self.get_url(relpath)
 
1770
            segments = maybe_a_url.rsplit('/', 1)
 
1771
            t = get_transport(maybe_a_url)
 
1772
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
1773
                try:
 
1774
                    t.mkdir('.')
 
1775
                except errors.FileExists:
 
1776
                    pass
 
1777
            if format is None:
 
1778
                format = 'default'
 
1779
            if isinstance(format, basestring):
 
1780
                format = bzrdir.format_registry.make_bzrdir(format)
 
1781
            return format.initialize_on_transport(t)
 
1782
        except errors.UninitializableFormat:
 
1783
            raise TestSkipped("Format %s is not initializable." % format)
 
1784
 
 
1785
    def make_repository(self, relpath, shared=False, format=None):
 
1786
        """Create a repository on our default transport at relpath."""
 
1787
        made_control = self.make_bzrdir(relpath, format=format)
 
1788
        return made_control.create_repository(shared=shared)
 
1789
 
 
1790
    def make_branch_and_memory_tree(self, relpath, format=None):
 
1791
        """Create a branch on the default transport and a MemoryTree for it."""
 
1792
        b = self.make_branch(relpath, format=format)
 
1793
        return memorytree.MemoryTree.create_on_branch(b)
 
1794
 
 
1795
    def overrideEnvironmentForTesting(self):
 
1796
        os.environ['HOME'] = self.test_home_dir
 
1797
        os.environ['BZR_HOME'] = self.test_home_dir
 
1798
        
 
1799
    def setUp(self):
 
1800
        super(TestCaseWithMemoryTransport, self).setUp()
 
1801
        self._make_test_root()
 
1802
        _currentdir = os.getcwdu()
 
1803
        def _leaveDirectory():
 
1804
            os.chdir(_currentdir)
 
1805
        self.addCleanup(_leaveDirectory)
 
1806
        self.makeAndChdirToTestDir()
 
1807
        self.overrideEnvironmentForTesting()
 
1808
        self.__readonly_server = None
 
1809
        self.__server = None
 
1810
        self.reduceLockdirTimeout()
 
1811
 
 
1812
     
 
1813
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
1814
    """Derived class that runs a test within a temporary directory.
 
1815
 
 
1816
    This is useful for tests that need to create a branch, etc.
 
1817
 
 
1818
    The directory is created in a slightly complex way: for each
 
1819
    Python invocation, a new temporary top-level directory is created.
 
1820
    All test cases create their own directory within that.  If the
 
1821
    tests complete successfully, the directory is removed.
 
1822
 
 
1823
    InTempDir is an old alias for FunctionalTestCase.
 
1824
    """
 
1825
 
 
1826
    OVERRIDE_PYTHON = 'python'
 
1827
 
 
1828
    def check_file_contents(self, filename, expect):
 
1829
        self.log("check contents of file %s" % filename)
 
1830
        contents = file(filename, 'r').read()
 
1831
        if contents != expect:
 
1832
            self.log("expected: %r" % expect)
 
1833
            self.log("actually: %r" % contents)
 
1834
            self.fail("contents of %s not as expected" % filename)
 
1835
 
 
1836
    def makeAndChdirToTestDir(self):
 
1837
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
1838
        
 
1839
        For TestCaseInTempDir we create a temporary directory based on the test
 
1840
        name and then create two subdirs - test and home under it.
 
1841
        """
 
1842
        if NUMBERED_DIRS:       # strongly recommended on Windows
 
1843
                                # due the path length limitation (260 chars)
 
1844
            candidate_dir = '%s/%dK/%05d' % (self.TEST_ROOT,
 
1845
                                             int(self.number/1000),
 
1846
                                             self.number)
 
1847
            os.makedirs(candidate_dir)
 
1848
            self.test_home_dir = candidate_dir + '/home'
 
1849
            os.mkdir(self.test_home_dir)
 
1850
            self.test_dir = candidate_dir + '/work'
 
1851
            os.mkdir(self.test_dir)
 
1852
            os.chdir(self.test_dir)
 
1853
            # put name of test inside
 
1854
            f = file(candidate_dir + '/name', 'w')
 
1855
            f.write(self.id())
 
1856
            f.close()
 
1857
            return
 
1858
        # Else NAMED DIRS
 
1859
        # shorten the name, to avoid test failures due to path length
 
1860
        short_id = self.id().replace('bzrlib.tests.', '') \
 
1861
                   .replace('__main__.', '')[-100:]
 
1862
        # it's possible the same test class is run several times for
 
1863
        # parameterized tests, so make sure the names don't collide.  
 
1864
        i = 0
 
1865
        while True:
 
1866
            if i > 0:
 
1867
                candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
 
1868
            else:
 
1869
                candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
 
1870
            if os.path.exists(candidate_dir):
 
1871
                i = i + 1
 
1872
                continue
 
1873
            else:
 
1874
                os.mkdir(candidate_dir)
 
1875
                self.test_home_dir = candidate_dir + '/home'
 
1876
                os.mkdir(self.test_home_dir)
 
1877
                self.test_dir = candidate_dir + '/work'
 
1878
                os.mkdir(self.test_dir)
 
1879
                os.chdir(self.test_dir)
 
1880
                break
 
1881
 
 
1882
    def build_tree(self, shape, line_endings='binary', transport=None):
 
1883
        """Build a test tree according to a pattern.
 
1884
 
 
1885
        shape is a sequence of file specifications.  If the final
 
1886
        character is '/', a directory is created.
 
1887
 
 
1888
        This assumes that all the elements in the tree being built are new.
 
1889
 
 
1890
        This doesn't add anything to a branch.
 
1891
        :param line_endings: Either 'binary' or 'native'
 
1892
                             in binary mode, exact contents are written
 
1893
                             in native mode, the line endings match the
 
1894
                             default platform endings.
 
1895
 
 
1896
        :param transport: A transport to write to, for building trees on 
 
1897
                          VFS's. If the transport is readonly or None,
 
1898
                          "." is opened automatically.
 
1899
        """
 
1900
        # It's OK to just create them using forward slashes on windows.
 
1901
        if transport is None or transport.is_readonly():
 
1902
            transport = get_transport(".")
 
1903
        for name in shape:
 
1904
            self.assert_(isinstance(name, basestring))
 
1905
            if name[-1] == '/':
 
1906
                transport.mkdir(urlutils.escape(name[:-1]))
 
1907
            else:
 
1908
                if line_endings == 'binary':
 
1909
                    end = '\n'
 
1910
                elif line_endings == 'native':
 
1911
                    end = os.linesep
 
1912
                else:
 
1913
                    raise errors.BzrError(
 
1914
                        'Invalid line ending request %r' % line_endings)
 
1915
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
1916
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
1917
 
 
1918
    def build_tree_contents(self, shape):
 
1919
        build_tree_contents(shape)
 
1920
 
 
1921
    def assertFileEqual(self, content, path):
 
1922
        """Fail if path does not contain 'content'."""
 
1923
        self.failUnlessExists(path)
 
1924
        # TODO: jam 20060427 Shouldn't this be 'rb'?
 
1925
        f = file(path, 'r')
 
1926
        try:
 
1927
            s = f.read()
 
1928
        finally:
 
1929
            f.close()
 
1930
        self.assertEqualDiff(content, s)
 
1931
 
 
1932
    def failUnlessExists(self, path):
 
1933
        """Fail unless path, which may be abs or relative, exists."""
 
1934
        self.failUnless(osutils.lexists(path),path+" does not exist")
 
1935
 
 
1936
    def failIfExists(self, path):
 
1937
        """Fail if path, which may be abs or relative, exists."""
 
1938
        self.failIf(osutils.lexists(path),path+" exists")
 
1939
 
 
1940
 
 
1941
class TestCaseWithTransport(TestCaseInTempDir):
 
1942
    """A test case that provides get_url and get_readonly_url facilities.
 
1943
 
 
1944
    These back onto two transport servers, one for readonly access and one for
 
1945
    read write access.
 
1946
 
 
1947
    If no explicit class is provided for readonly access, a
 
1948
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
1949
    based read write transports.
 
1950
 
 
1951
    If an explicit class is provided for readonly access, that server and the 
 
1952
    readwrite one must both define get_url() as resolving to os.getcwd().
 
1953
    """
 
1954
 
 
1955
    def get_vfs_only_server(self):
 
1956
        """See TestCaseWithMemoryTransport.
 
1957
 
 
1958
        This is useful for some tests with specific servers that need
 
1959
        diagnostics.
 
1960
        """
 
1961
        if self.__vfs_server is None:
 
1962
            self.__vfs_server = self.vfs_transport_factory()
 
1963
            self.__vfs_server.setUp()
 
1964
            self.addCleanup(self.__vfs_server.tearDown)
 
1965
        return self.__vfs_server
 
1966
 
 
1967
    def make_branch_and_tree(self, relpath, format=None):
 
1968
        """Create a branch on the transport and a tree locally.
 
1969
 
 
1970
        If the transport is not a LocalTransport, the Tree can't be created on
 
1971
        the transport.  In that case if the vfs_transport_factory is
 
1972
        LocalURLServer the working tree is created in the local
 
1973
        directory backing the transport, and the returned tree's branch and
 
1974
        repository will also be accessed locally. Otherwise a lightweight
 
1975
        checkout is created and returned.
 
1976
 
 
1977
        :param format: The BzrDirFormat.
 
1978
        :returns: the WorkingTree.
 
1979
        """
 
1980
        # TODO: always use the local disk path for the working tree,
 
1981
        # this obviously requires a format that supports branch references
 
1982
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
1983
        # RBC 20060208
 
1984
        b = self.make_branch(relpath, format=format)
 
1985
        try:
 
1986
            return b.bzrdir.create_workingtree()
 
1987
        except errors.NotLocalUrl:
 
1988
            # We can only make working trees locally at the moment.  If the
 
1989
            # transport can't support them, then we keep the non-disk-backed
 
1990
            # branch and create a local checkout.
 
1991
            if self.vfs_transport_factory is LocalURLServer:
 
1992
                # the branch is colocated on disk, we cannot create a checkout.
 
1993
                # hopefully callers will expect this.
 
1994
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
1995
                return local_controldir.create_workingtree()
 
1996
            else:
 
1997
                return b.create_checkout(relpath, lightweight=True)
 
1998
 
 
1999
    def assertIsDirectory(self, relpath, transport):
 
2000
        """Assert that relpath within transport is a directory.
 
2001
 
 
2002
        This may not be possible on all transports; in that case it propagates
 
2003
        a TransportNotPossible.
 
2004
        """
 
2005
        try:
 
2006
            mode = transport.stat(relpath).st_mode
 
2007
        except errors.NoSuchFile:
 
2008
            self.fail("path %s is not a directory; no such file"
 
2009
                      % (relpath))
 
2010
        if not stat.S_ISDIR(mode):
 
2011
            self.fail("path %s is not a directory; has mode %#o"
 
2012
                      % (relpath, mode))
 
2013
 
 
2014
    def assertTreesEqual(self, left, right):
 
2015
        """Check that left and right have the same content and properties."""
 
2016
        # we use a tree delta to check for equality of the content, and we
 
2017
        # manually check for equality of other things such as the parents list.
 
2018
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2019
        differences = left.changes_from(right)
 
2020
        self.assertFalse(differences.has_changed(),
 
2021
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2022
 
 
2023
    def setUp(self):
 
2024
        super(TestCaseWithTransport, self).setUp()
 
2025
        self.__vfs_server = None
 
2026
 
 
2027
 
 
2028
class ChrootedTestCase(TestCaseWithTransport):
 
2029
    """A support class that provides readonly urls outside the local namespace.
 
2030
 
 
2031
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2032
    is then we are chrooted already, if it is not then an HttpServer is used
 
2033
    for readonly urls.
 
2034
 
 
2035
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2036
                       be used without needed to redo it when a different 
 
2037
                       subclass is in use ?
 
2038
    """
 
2039
 
 
2040
    def setUp(self):
 
2041
        super(ChrootedTestCase, self).setUp()
 
2042
        if not self.vfs_transport_factory == MemoryServer:
 
2043
            self.transport_readonly_server = HttpServer
 
2044
 
 
2045
 
 
2046
def filter_suite_by_re(suite, pattern):
 
2047
    result = TestUtil.TestSuite()
 
2048
    filter_re = re.compile(pattern)
 
2049
    for test in iter_suite_tests(suite):
 
2050
        if filter_re.search(test.id()):
 
2051
            result.addTest(test)
 
2052
    return result
 
2053
 
 
2054
 
 
2055
def sort_suite_by_re(suite, pattern):
 
2056
    first = []
 
2057
    second = []
 
2058
    filter_re = re.compile(pattern)
 
2059
    for test in iter_suite_tests(suite):
 
2060
        if filter_re.search(test.id()):
 
2061
            first.append(test)
 
2062
        else:
 
2063
            second.append(test)
 
2064
    return TestUtil.TestSuite(first + second)
 
2065
 
 
2066
 
 
2067
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2068
              stop_on_failure=False, keep_output=False,
 
2069
              transport=None, lsprof_timed=None, bench_history=None,
 
2070
              matching_tests_first=None,
 
2071
              numbered_dirs=None):
 
2072
    global NUMBERED_DIRS
 
2073
    if numbered_dirs is not None:
 
2074
        NUMBERED_DIRS = bool(numbered_dirs)
 
2075
 
 
2076
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2077
    if verbose:
 
2078
        verbosity = 2
 
2079
    else:
 
2080
        verbosity = 1
 
2081
    runner = TextTestRunner(stream=sys.stdout,
 
2082
                            descriptions=0,
 
2083
                            verbosity=verbosity,
 
2084
                            keep_output=keep_output,
 
2085
                            bench_history=bench_history)
 
2086
    runner.stop_on_failure=stop_on_failure
 
2087
    if pattern != '.*':
 
2088
        if matching_tests_first:
 
2089
            suite = sort_suite_by_re(suite, pattern)
 
2090
        else:
 
2091
            suite = filter_suite_by_re(suite, pattern)
 
2092
    result = runner.run(suite)
 
2093
    return result.wasSuccessful()
 
2094
 
 
2095
 
 
2096
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
2097
             keep_output=False,
 
2098
             transport=None,
 
2099
             test_suite_factory=None,
 
2100
             lsprof_timed=None,
 
2101
             bench_history=None,
 
2102
             matching_tests_first=None,
 
2103
             numbered_dirs=None):
 
2104
    """Run the whole test suite under the enhanced runner"""
 
2105
    # XXX: Very ugly way to do this...
 
2106
    # Disable warning about old formats because we don't want it to disturb
 
2107
    # any blackbox tests.
 
2108
    from bzrlib import repository
 
2109
    repository._deprecation_warning_done = True
 
2110
 
 
2111
    global default_transport
 
2112
    if transport is None:
 
2113
        transport = default_transport
 
2114
    old_transport = default_transport
 
2115
    default_transport = transport
 
2116
    try:
 
2117
        if test_suite_factory is None:
 
2118
            suite = test_suite()
 
2119
        else:
 
2120
            suite = test_suite_factory()
 
2121
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
2122
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
 
2123
                     transport=transport,
 
2124
                     lsprof_timed=lsprof_timed,
 
2125
                     bench_history=bench_history,
 
2126
                     matching_tests_first=matching_tests_first,
 
2127
                     numbered_dirs=numbered_dirs)
 
2128
    finally:
 
2129
        default_transport = old_transport
 
2130
 
 
2131
 
 
2132
def test_suite():
 
2133
    """Build and return TestSuite for the whole of bzrlib.
 
2134
    
 
2135
    This function can be replaced if you need to change the default test
 
2136
    suite on a global basis, but it is not encouraged.
 
2137
    """
 
2138
    testmod_names = [
 
2139
                   'bzrlib.tests.test_ancestry',
 
2140
                   'bzrlib.tests.test_annotate',
 
2141
                   'bzrlib.tests.test_api',
 
2142
                   'bzrlib.tests.test_atomicfile',
 
2143
                   'bzrlib.tests.test_bad_files',
 
2144
                   'bzrlib.tests.test_branch',
 
2145
                   'bzrlib.tests.test_bundle',
 
2146
                   'bzrlib.tests.test_bzrdir',
 
2147
                   'bzrlib.tests.test_cache_utf8',
 
2148
                   'bzrlib.tests.test_commands',
 
2149
                   'bzrlib.tests.test_commit',
 
2150
                   'bzrlib.tests.test_commit_merge',
 
2151
                   'bzrlib.tests.test_config',
 
2152
                   'bzrlib.tests.test_conflicts',
 
2153
                   'bzrlib.tests.test_decorators',
 
2154
                   'bzrlib.tests.test_delta',
 
2155
                   'bzrlib.tests.test_diff',
 
2156
                   'bzrlib.tests.test_dirstate',
 
2157
                   'bzrlib.tests.test_doc_generate',
 
2158
                   'bzrlib.tests.test_errors',
 
2159
                   'bzrlib.tests.test_escaped_store',
 
2160
                   'bzrlib.tests.test_extract',
 
2161
                   'bzrlib.tests.test_fetch',
 
2162
                   'bzrlib.tests.test_ftp_transport',
 
2163
                   'bzrlib.tests.test_generate_docs',
 
2164
                   'bzrlib.tests.test_generate_ids',
 
2165
                   'bzrlib.tests.test_globbing',
 
2166
                   'bzrlib.tests.test_gpg',
 
2167
                   'bzrlib.tests.test_graph',
 
2168
                   'bzrlib.tests.test_hashcache',
 
2169
                   'bzrlib.tests.test_http',
 
2170
                   'bzrlib.tests.test_http_response',
 
2171
                   'bzrlib.tests.test_https_ca_bundle',
 
2172
                   'bzrlib.tests.test_identitymap',
 
2173
                   'bzrlib.tests.test_ignores',
 
2174
                   'bzrlib.tests.test_inv',
 
2175
                   'bzrlib.tests.test_knit',
 
2176
                   'bzrlib.tests.test_lazy_import',
 
2177
                   'bzrlib.tests.test_lazy_regex',
 
2178
                   'bzrlib.tests.test_lockdir',
 
2179
                   'bzrlib.tests.test_lockable_files',
 
2180
                   'bzrlib.tests.test_log',
 
2181
                   'bzrlib.tests.test_memorytree',
 
2182
                   'bzrlib.tests.test_merge',
 
2183
                   'bzrlib.tests.test_merge3',
 
2184
                   'bzrlib.tests.test_merge_core',
 
2185
                   'bzrlib.tests.test_merge_directive',
 
2186
                   'bzrlib.tests.test_missing',
 
2187
                   'bzrlib.tests.test_msgeditor',
 
2188
                   'bzrlib.tests.test_nonascii',
 
2189
                   'bzrlib.tests.test_options',
 
2190
                   'bzrlib.tests.test_osutils',
 
2191
                   'bzrlib.tests.test_osutils_encodings',
 
2192
                   'bzrlib.tests.test_patch',
 
2193
                   'bzrlib.tests.test_patches',
 
2194
                   'bzrlib.tests.test_permissions',
 
2195
                   'bzrlib.tests.test_plugins',
 
2196
                   'bzrlib.tests.test_progress',
 
2197
                   'bzrlib.tests.test_reconcile',
 
2198
                   'bzrlib.tests.test_registry',
 
2199
                   'bzrlib.tests.test_repository',
 
2200
                   'bzrlib.tests.test_revert',
 
2201
                   'bzrlib.tests.test_revision',
 
2202
                   'bzrlib.tests.test_revisionnamespaces',
 
2203
                   'bzrlib.tests.test_revisiontree',
 
2204
                   'bzrlib.tests.test_rio',
 
2205
                   'bzrlib.tests.test_sampler',
 
2206
                   'bzrlib.tests.test_selftest',
 
2207
                   'bzrlib.tests.test_setup',
 
2208
                   'bzrlib.tests.test_sftp_transport',
 
2209
                   'bzrlib.tests.test_smart_add',
 
2210
                   'bzrlib.tests.test_smart_transport',
 
2211
                   'bzrlib.tests.test_source',
 
2212
                   'bzrlib.tests.test_ssh_transport',
 
2213
                   'bzrlib.tests.test_status',
 
2214
                   'bzrlib.tests.test_store',
 
2215
                   'bzrlib.tests.test_strace',
 
2216
                   'bzrlib.tests.test_subsume',
 
2217
                   'bzrlib.tests.test_symbol_versioning',
 
2218
                   'bzrlib.tests.test_tag',
 
2219
                   'bzrlib.tests.test_testament',
 
2220
                   'bzrlib.tests.test_textfile',
 
2221
                   'bzrlib.tests.test_textmerge',
 
2222
                   'bzrlib.tests.test_timestamp',
 
2223
                   'bzrlib.tests.test_trace',
 
2224
                   'bzrlib.tests.test_transactions',
 
2225
                   'bzrlib.tests.test_transform',
 
2226
                   'bzrlib.tests.test_transport',
 
2227
                   'bzrlib.tests.test_tree',
 
2228
                   'bzrlib.tests.test_treebuilder',
 
2229
                   'bzrlib.tests.test_tsort',
 
2230
                   'bzrlib.tests.test_tuned_gzip',
 
2231
                   'bzrlib.tests.test_ui',
 
2232
                   'bzrlib.tests.test_upgrade',
 
2233
                   'bzrlib.tests.test_urlutils',
 
2234
                   'bzrlib.tests.test_versionedfile',
 
2235
                   'bzrlib.tests.test_version',
 
2236
                   'bzrlib.tests.test_version_info',
 
2237
                   'bzrlib.tests.test_weave',
 
2238
                   'bzrlib.tests.test_whitebox',
 
2239
                   'bzrlib.tests.test_workingtree',
 
2240
                   'bzrlib.tests.test_workingtree_4',
 
2241
                   'bzrlib.tests.test_wsgi',
 
2242
                   'bzrlib.tests.test_xml',
 
2243
                   ]
 
2244
    test_transport_implementations = [
 
2245
        'bzrlib.tests.test_transport_implementations',
 
2246
        'bzrlib.tests.test_read_bundle',
 
2247
        ]
 
2248
    suite = TestUtil.TestSuite()
 
2249
    loader = TestUtil.TestLoader()
 
2250
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
2251
    from bzrlib.transport import TransportTestProviderAdapter
 
2252
    adapter = TransportTestProviderAdapter()
 
2253
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
2254
    for package in packages_to_test():
 
2255
        suite.addTest(package.test_suite())
 
2256
    for m in MODULES_TO_TEST:
 
2257
        suite.addTest(loader.loadTestsFromModule(m))
 
2258
    for m in MODULES_TO_DOCTEST:
 
2259
        try:
 
2260
            suite.addTest(doctest.DocTestSuite(m))
 
2261
        except ValueError, e:
 
2262
            print '**failed to get doctest for: %s\n%s' %(m,e)
 
2263
            raise
 
2264
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
2265
        if getattr(plugin, 'test_suite', None) is not None:
 
2266
            default_encoding = sys.getdefaultencoding()
 
2267
            try:
 
2268
                plugin_suite = plugin.test_suite()
 
2269
            except ImportError, e:
 
2270
                bzrlib.trace.warning(
 
2271
                    'Unable to test plugin "%s": %s', name, e)
 
2272
            else:
 
2273
                suite.addTest(plugin_suite)
 
2274
            if default_encoding != sys.getdefaultencoding():
 
2275
                bzrlib.trace.warning(
 
2276
                    'Plugin "%s" tried to reset default encoding to: %s', name,
 
2277
                    sys.getdefaultencoding())
 
2278
                reload(sys)
 
2279
                sys.setdefaultencoding(default_encoding)
 
2280
    return suite
 
2281
 
 
2282
 
 
2283
def adapt_modules(mods_list, adapter, loader, suite):
 
2284
    """Adapt the modules in mods_list using adapter and add to suite."""
 
2285
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
 
2286
        suite.addTests(adapter.adapt(test))
 
2287
 
 
2288
 
 
2289
def clean_selftest_output(root=None, quiet=False):
 
2290
    """Remove all selftest output directories from root directory.
 
2291
 
 
2292
    :param  root:   root directory for clean
 
2293
                    (if ommitted or None then clean current directory).
 
2294
    :param  quiet:  suppress report about deleting directories
 
2295
    """
 
2296
    import re
 
2297
    import shutil
 
2298
 
 
2299
    re_dir = re.compile(r'''test\d\d\d\d\.tmp''')
 
2300
    if root is None:
 
2301
        root = u'.'
 
2302
    for i in os.listdir(root):
 
2303
        if os.path.isdir(i) and re_dir.match(i):
 
2304
            if not quiet:
 
2305
                print 'delete directory:', i
 
2306
            shutil.rmtree(i)
 
2307
 
 
2308
 
 
2309
class Feature(object):
 
2310
    """An operating system Feature."""
 
2311
 
 
2312
    def __init__(self):
 
2313
        self._available = None
 
2314
 
 
2315
    def available(self):
 
2316
        """Is the feature available?
 
2317
 
 
2318
        :return: True if the feature is available.
 
2319
        """
 
2320
        if self._available is None:
 
2321
            self._available = self._probe()
 
2322
        return self._available
 
2323
 
 
2324
    def _probe(self):
 
2325
        """Implement this method in concrete features.
 
2326
 
 
2327
        :return: True if the feature is available.
 
2328
        """
 
2329
        raise NotImplementedError
 
2330
 
 
2331
    def __str__(self):
 
2332
        if getattr(self, 'feature_name', None):
 
2333
            return self.feature_name()
 
2334
        return self.__class__.__name__