/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Alexander Belchenko
  • Date: 2007-03-11 20:30:46 UTC
  • mto: This revision was merged to the branch mainline in revision 2338.
  • Revision ID: bialix@ukr.net-20070311203046-e8jo5ku8xxepti4x
0.15 NEWS cleanup

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import codecs
 
30
from cStringIO import StringIO
 
31
import difflib
 
32
import doctest
 
33
import errno
 
34
import logging
 
35
import os
 
36
import re
 
37
import shlex
 
38
import stat
 
39
from subprocess import Popen, PIPE
 
40
import sys
 
41
import tempfile
 
42
import unittest
 
43
import time
 
44
 
 
45
 
 
46
from bzrlib import (
 
47
    bzrdir,
 
48
    debug,
 
49
    errors,
 
50
    memorytree,
 
51
    osutils,
 
52
    progress,
 
53
    urlutils,
 
54
    )
 
55
import bzrlib.branch
 
56
import bzrlib.commands
 
57
import bzrlib.bundle.serializer
 
58
import bzrlib.export
 
59
import bzrlib.inventory
 
60
import bzrlib.iterablefile
 
61
import bzrlib.lockdir
 
62
try:
 
63
    import bzrlib.lsprof
 
64
except ImportError:
 
65
    # lsprof not available
 
66
    pass
 
67
from bzrlib.merge import merge_inner
 
68
import bzrlib.merge3
 
69
import bzrlib.osutils
 
70
import bzrlib.plugin
 
71
from bzrlib.revision import common_ancestor
 
72
import bzrlib.store
 
73
from bzrlib import symbol_versioning
 
74
import bzrlib.trace
 
75
from bzrlib.transport import get_transport
 
76
import bzrlib.transport
 
77
from bzrlib.transport.local import LocalURLServer
 
78
from bzrlib.transport.memory import MemoryServer
 
79
from bzrlib.transport.readonly import ReadonlyServer
 
80
from bzrlib.trace import mutter, note
 
81
from bzrlib.tests import TestUtil
 
82
from bzrlib.tests.HttpServer import HttpServer
 
83
from bzrlib.tests.TestUtil import (
 
84
                          TestSuite,
 
85
                          TestLoader,
 
86
                          )
 
87
from bzrlib.tests.treeshape import build_tree_contents
 
88
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
89
 
 
90
default_transport = LocalURLServer
 
91
 
 
92
MODULES_TO_TEST = []
 
93
MODULES_TO_DOCTEST = [
 
94
                      bzrlib.bundle.serializer,
 
95
                      bzrlib.errors,
 
96
                      bzrlib.export,
 
97
                      bzrlib.inventory,
 
98
                      bzrlib.iterablefile,
 
99
                      bzrlib.lockdir,
 
100
                      bzrlib.merge3,
 
101
                      bzrlib.option,
 
102
                      bzrlib.store,
 
103
                      ]
 
104
 
 
105
NUMBERED_DIRS = False   # dirs kind for TestCaseInTempDir (numbered or named)
 
106
 
 
107
 
 
108
def packages_to_test():
 
109
    """Return a list of packages to test.
 
110
 
 
111
    The packages are not globally imported so that import failures are
 
112
    triggered when running selftest, not when importing the command.
 
113
    """
 
114
    import bzrlib.doc
 
115
    import bzrlib.tests.blackbox
 
116
    import bzrlib.tests.branch_implementations
 
117
    import bzrlib.tests.bzrdir_implementations
 
118
    import bzrlib.tests.interrepository_implementations
 
119
    import bzrlib.tests.interversionedfile_implementations
 
120
    import bzrlib.tests.intertree_implementations
 
121
    import bzrlib.tests.repository_implementations
 
122
    import bzrlib.tests.revisionstore_implementations
 
123
    import bzrlib.tests.tree_implementations
 
124
    import bzrlib.tests.workingtree_implementations
 
125
    return [
 
126
            bzrlib.doc,
 
127
            bzrlib.tests.blackbox,
 
128
            bzrlib.tests.branch_implementations,
 
129
            bzrlib.tests.bzrdir_implementations,
 
130
            bzrlib.tests.interrepository_implementations,
 
131
            bzrlib.tests.interversionedfile_implementations,
 
132
            bzrlib.tests.intertree_implementations,
 
133
            bzrlib.tests.repository_implementations,
 
134
            bzrlib.tests.revisionstore_implementations,
 
135
            bzrlib.tests.tree_implementations,
 
136
            bzrlib.tests.workingtree_implementations,
 
137
            ]
 
138
 
 
139
 
 
140
class ExtendedTestResult(unittest._TextTestResult):
 
141
    """Accepts, reports and accumulates the results of running tests.
 
142
 
 
143
    Compared to this unittest version this class adds support for profiling,
 
144
    benchmarking, stopping as soon as a test fails,  and skipping tests.
 
145
    There are further-specialized subclasses for different types of display.
 
146
    """
 
147
 
 
148
    stop_early = False
 
149
    
 
150
    def __init__(self, stream, descriptions, verbosity,
 
151
                 bench_history=None,
 
152
                 num_tests=None,
 
153
                 ):
 
154
        """Construct new TestResult.
 
155
 
 
156
        :param bench_history: Optionally, a writable file object to accumulate
 
157
            benchmark results.
 
158
        """
 
159
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
160
        if bench_history is not None:
 
161
            from bzrlib.version import _get_bzr_source_tree
 
162
            src_tree = _get_bzr_source_tree()
 
163
            if src_tree:
 
164
                try:
 
165
                    revision_id = src_tree.get_parent_ids()[0]
 
166
                except IndexError:
 
167
                    # XXX: if this is a brand new tree, do the same as if there
 
168
                    # is no branch.
 
169
                    revision_id = ''
 
170
            else:
 
171
                # XXX: If there's no branch, what should we do?
 
172
                revision_id = ''
 
173
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
174
        self._bench_history = bench_history
 
175
        self.ui = bzrlib.ui.ui_factory
 
176
        self.num_tests = num_tests
 
177
        self.error_count = 0
 
178
        self.failure_count = 0
 
179
        self.skip_count = 0
 
180
        self.count = 0
 
181
        self._overall_start_time = time.time()
 
182
    
 
183
    def extractBenchmarkTime(self, testCase):
 
184
        """Add a benchmark time for the current test case."""
 
185
        self._benchmarkTime = getattr(testCase, "_benchtime", None)
 
186
    
 
187
    def _elapsedTestTimeString(self):
 
188
        """Return a time string for the overall time the current test has taken."""
 
189
        return self._formatTime(time.time() - self._start_time)
 
190
 
 
191
    def _testTimeString(self):
 
192
        if self._benchmarkTime is not None:
 
193
            return "%s/%s" % (
 
194
                self._formatTime(self._benchmarkTime),
 
195
                self._elapsedTestTimeString())
 
196
        else:
 
197
            return "           %s" % self._elapsedTestTimeString()
 
198
 
 
199
    def _formatTime(self, seconds):
 
200
        """Format seconds as milliseconds with leading spaces."""
 
201
        # some benchmarks can take thousands of seconds to run, so we need 8
 
202
        # places
 
203
        return "%8dms" % (1000 * seconds)
 
204
 
 
205
    def _shortened_test_description(self, test):
 
206
        what = test.id()
 
207
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
208
        return what
 
209
 
 
210
    def startTest(self, test):
 
211
        unittest.TestResult.startTest(self, test)
 
212
        self.report_test_start(test)
 
213
        test.number = self.count
 
214
        self._recordTestStartTime()
 
215
 
 
216
    def _recordTestStartTime(self):
 
217
        """Record that a test has started."""
 
218
        self._start_time = time.time()
 
219
 
 
220
    def addError(self, test, err):
 
221
        if isinstance(err[1], TestSkipped):
 
222
            return self.addSkipped(test, err)    
 
223
        unittest.TestResult.addError(self, test, err)
 
224
        # We can only do this if we have one of our TestCases, not if
 
225
        # we have a doctest.
 
226
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
227
        if setKeepLogfile is not None:
 
228
            setKeepLogfile()
 
229
        self.extractBenchmarkTime(test)
 
230
        self.report_error(test, err)
 
231
        if self.stop_early:
 
232
            self.stop()
 
233
 
 
234
    def addFailure(self, test, err):
 
235
        unittest.TestResult.addFailure(self, test, err)
 
236
        # We can only do this if we have one of our TestCases, not if
 
237
        # we have a doctest.
 
238
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
239
        if setKeepLogfile is not None:
 
240
            setKeepLogfile()
 
241
        self.extractBenchmarkTime(test)
 
242
        self.report_failure(test, err)
 
243
        if self.stop_early:
 
244
            self.stop()
 
245
 
 
246
    def addSuccess(self, test):
 
247
        self.extractBenchmarkTime(test)
 
248
        if self._bench_history is not None:
 
249
            if self._benchmarkTime is not None:
 
250
                self._bench_history.write("%s %s\n" % (
 
251
                    self._formatTime(self._benchmarkTime),
 
252
                    test.id()))
 
253
        self.report_success(test)
 
254
        unittest.TestResult.addSuccess(self, test)
 
255
 
 
256
    def addSkipped(self, test, skip_excinfo):
 
257
        self.extractBenchmarkTime(test)
 
258
        self.report_skip(test, skip_excinfo)
 
259
        # seems best to treat this as success from point-of-view of unittest
 
260
        # -- it actually does nothing so it barely matters :)
 
261
        try:
 
262
            test.tearDown()
 
263
        except KeyboardInterrupt:
 
264
            raise
 
265
        except:
 
266
            self.addError(test, test.__exc_info())
 
267
        else:
 
268
            unittest.TestResult.addSuccess(self, test)
 
269
 
 
270
    def printErrorList(self, flavour, errors):
 
271
        for test, err in errors:
 
272
            self.stream.writeln(self.separator1)
 
273
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
 
274
            if getattr(test, '_get_log', None) is not None:
 
275
                print >>self.stream
 
276
                print >>self.stream, \
 
277
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
278
                print >>self.stream, test._get_log()
 
279
                print >>self.stream, \
 
280
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
281
            self.stream.writeln(self.separator2)
 
282
            self.stream.writeln("%s" % err)
 
283
 
 
284
    def finished(self):
 
285
        pass
 
286
 
 
287
    def report_cleaning_up(self):
 
288
        pass
 
289
 
 
290
    def report_success(self, test):
 
291
        pass
 
292
 
 
293
 
 
294
class TextTestResult(ExtendedTestResult):
 
295
    """Displays progress and results of tests in text form"""
 
296
 
 
297
    def __init__(self, *args, **kw):
 
298
        ExtendedTestResult.__init__(self, *args, **kw)
 
299
        self.pb = self.ui.nested_progress_bar()
 
300
        self.pb.show_pct = False
 
301
        self.pb.show_spinner = False
 
302
        self.pb.show_eta = False, 
 
303
        self.pb.show_count = False
 
304
        self.pb.show_bar = False
 
305
 
 
306
    def report_starting(self):
 
307
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
308
 
 
309
    def _progress_prefix_text(self):
 
310
        a = '[%d' % self.count
 
311
        if self.num_tests is not None:
 
312
            a +='/%d' % self.num_tests
 
313
        a += ' in %ds' % (time.time() - self._overall_start_time)
 
314
        if self.error_count:
 
315
            a += ', %d errors' % self.error_count
 
316
        if self.failure_count:
 
317
            a += ', %d failed' % self.failure_count
 
318
        if self.skip_count:
 
319
            a += ', %d skipped' % self.skip_count
 
320
        a += ']'
 
321
        return a
 
322
 
 
323
    def report_test_start(self, test):
 
324
        self.count += 1
 
325
        self.pb.update(
 
326
                self._progress_prefix_text()
 
327
                + ' ' 
 
328
                + self._shortened_test_description(test))
 
329
 
 
330
    def _test_description(self, test):
 
331
        if NUMBERED_DIRS:
 
332
            return '#%d %s' % (self.count,
 
333
                               self._shortened_test_description(test))
 
334
        else:
 
335
            return self._shortened_test_description(test)
 
336
 
 
337
    def report_error(self, test, err):
 
338
        self.error_count += 1
 
339
        self.pb.note('ERROR: %s\n    %s\n', 
 
340
            self._test_description(test),
 
341
            err[1],
 
342
            )
 
343
 
 
344
    def report_failure(self, test, err):
 
345
        self.failure_count += 1
 
346
        self.pb.note('FAIL: %s\n    %s\n', 
 
347
            self._test_description(test),
 
348
            err[1],
 
349
            )
 
350
 
 
351
    def report_skip(self, test, skip_excinfo):
 
352
        self.skip_count += 1
 
353
        if False:
 
354
            # at the moment these are mostly not things we can fix
 
355
            # and so they just produce stipple; use the verbose reporter
 
356
            # to see them.
 
357
            if False:
 
358
                # show test and reason for skip
 
359
                self.pb.note('SKIP: %s\n    %s\n', 
 
360
                    self._shortened_test_description(test),
 
361
                    skip_excinfo[1])
 
362
            else:
 
363
                # since the class name was left behind in the still-visible
 
364
                # progress bar...
 
365
                self.pb.note('SKIP: %s', skip_excinfo[1])
 
366
 
 
367
    def report_cleaning_up(self):
 
368
        self.pb.update('cleaning up...')
 
369
 
 
370
    def finished(self):
 
371
        self.pb.finished()
 
372
 
 
373
 
 
374
class VerboseTestResult(ExtendedTestResult):
 
375
    """Produce long output, with one line per test run plus times"""
 
376
 
 
377
    def _ellipsize_to_right(self, a_string, final_width):
 
378
        """Truncate and pad a string, keeping the right hand side"""
 
379
        if len(a_string) > final_width:
 
380
            result = '...' + a_string[3-final_width:]
 
381
        else:
 
382
            result = a_string
 
383
        return result.ljust(final_width)
 
384
 
 
385
    def report_starting(self):
 
386
        self.stream.write('running %d tests...\n' % self.num_tests)
 
387
 
 
388
    def report_test_start(self, test):
 
389
        self.count += 1
 
390
        name = self._shortened_test_description(test)
 
391
        # width needs space for 6 char status, plus 1 for slash, plus 2 10-char
 
392
        # numbers, plus a trailing blank
 
393
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
394
        if NUMBERED_DIRS:
 
395
            self.stream.write('%5d ' % self.count)
 
396
            self.stream.write(self._ellipsize_to_right(name,
 
397
                                osutils.terminal_width()-36))
 
398
        else:
 
399
            self.stream.write(self._ellipsize_to_right(name,
 
400
                                osutils.terminal_width()-30))
 
401
        self.stream.flush()
 
402
 
 
403
    def _error_summary(self, err):
 
404
        indent = ' ' * 4
 
405
        if NUMBERED_DIRS:
 
406
            indent += ' ' * 6
 
407
        return '%s%s' % (indent, err[1])
 
408
 
 
409
    def report_error(self, test, err):
 
410
        self.error_count += 1
 
411
        self.stream.writeln('ERROR %s\n%s'
 
412
                % (self._testTimeString(),
 
413
                   self._error_summary(err)))
 
414
 
 
415
    def report_failure(self, test, err):
 
416
        self.failure_count += 1
 
417
        self.stream.writeln(' FAIL %s\n%s'
 
418
                % (self._testTimeString(),
 
419
                   self._error_summary(err)))
 
420
 
 
421
    def report_success(self, test):
 
422
        self.stream.writeln('   OK %s' % self._testTimeString())
 
423
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
424
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
425
            stats.pprint(file=self.stream)
 
426
        self.stream.flush()
 
427
 
 
428
    def report_skip(self, test, skip_excinfo):
 
429
        self.skip_count += 1
 
430
        self.stream.writeln(' SKIP %s\n%s'
 
431
                % (self._testTimeString(),
 
432
                   self._error_summary(skip_excinfo)))
 
433
 
 
434
 
 
435
class TextTestRunner(object):
 
436
    stop_on_failure = False
 
437
 
 
438
    def __init__(self,
 
439
                 stream=sys.stderr,
 
440
                 descriptions=0,
 
441
                 verbosity=1,
 
442
                 keep_output=False,
 
443
                 bench_history=None):
 
444
        self.stream = unittest._WritelnDecorator(stream)
 
445
        self.descriptions = descriptions
 
446
        self.verbosity = verbosity
 
447
        self.keep_output = keep_output
 
448
        self._bench_history = bench_history
 
449
 
 
450
    def run(self, test):
 
451
        "Run the given test case or test suite."
 
452
        startTime = time.time()
 
453
        if self.verbosity == 1:
 
454
            result_class = TextTestResult
 
455
        elif self.verbosity >= 2:
 
456
            result_class = VerboseTestResult
 
457
        result = result_class(self.stream,
 
458
                              self.descriptions,
 
459
                              self.verbosity,
 
460
                              bench_history=self._bench_history,
 
461
                              num_tests=test.countTestCases(),
 
462
                              )
 
463
        result.stop_early = self.stop_on_failure
 
464
        result.report_starting()
 
465
        test.run(result)
 
466
        stopTime = time.time()
 
467
        timeTaken = stopTime - startTime
 
468
        result.printErrors()
 
469
        self.stream.writeln(result.separator2)
 
470
        run = result.testsRun
 
471
        self.stream.writeln("Ran %d test%s in %.3fs" %
 
472
                            (run, run != 1 and "s" or "", timeTaken))
 
473
        self.stream.writeln()
 
474
        if not result.wasSuccessful():
 
475
            self.stream.write("FAILED (")
 
476
            failed, errored = map(len, (result.failures, result.errors))
 
477
            if failed:
 
478
                self.stream.write("failures=%d" % failed)
 
479
            if errored:
 
480
                if failed: self.stream.write(", ")
 
481
                self.stream.write("errors=%d" % errored)
 
482
            self.stream.writeln(")")
 
483
        else:
 
484
            self.stream.writeln("OK")
 
485
        if result.skip_count > 0:
 
486
            skipped = result.skip_count
 
487
            self.stream.writeln('%d test%s skipped' %
 
488
                                (skipped, skipped != 1 and "s" or ""))
 
489
        result.report_cleaning_up()
 
490
        # This is still a little bogus, 
 
491
        # but only a little. Folk not using our testrunner will
 
492
        # have to delete their temp directories themselves.
 
493
        test_root = TestCaseWithMemoryTransport.TEST_ROOT
 
494
        if result.wasSuccessful() or not self.keep_output:
 
495
            if test_root is not None:
 
496
                # If LANG=C we probably have created some bogus paths
 
497
                # which rmtree(unicode) will fail to delete
 
498
                # so make sure we are using rmtree(str) to delete everything
 
499
                # except on win32, where rmtree(str) will fail
 
500
                # since it doesn't have the property of byte-stream paths
 
501
                # (they are either ascii or mbcs)
 
502
                if sys.platform == 'win32':
 
503
                    # make sure we are using the unicode win32 api
 
504
                    test_root = unicode(test_root)
 
505
                else:
 
506
                    test_root = test_root.encode(
 
507
                        sys.getfilesystemencoding())
 
508
                try:
 
509
                    osutils.rmtree(test_root)
 
510
                except OSError, e:
 
511
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
512
                        print >>sys.stderr, ('Permission denied: '
 
513
                                             'unable to remove testing dir '
 
514
                                             '%s' % os.path.basename(test_root))
 
515
                    else:
 
516
                        raise
 
517
        else:
 
518
            note("Failed tests working directories are in '%s'\n", test_root)
 
519
        TestCaseWithMemoryTransport.TEST_ROOT = None
 
520
        result.finished()
 
521
        return result
 
522
 
 
523
 
 
524
def iter_suite_tests(suite):
 
525
    """Return all tests in a suite, recursing through nested suites"""
 
526
    for item in suite._tests:
 
527
        if isinstance(item, unittest.TestCase):
 
528
            yield item
 
529
        elif isinstance(item, unittest.TestSuite):
 
530
            for r in iter_suite_tests(item):
 
531
                yield r
 
532
        else:
 
533
            raise Exception('unknown object %r inside test suite %r'
 
534
                            % (item, suite))
 
535
 
 
536
 
 
537
class TestSkipped(Exception):
 
538
    """Indicates that a test was intentionally skipped, rather than failing."""
 
539
 
 
540
 
 
541
class CommandFailed(Exception):
 
542
    pass
 
543
 
 
544
 
 
545
class StringIOWrapper(object):
 
546
    """A wrapper around cStringIO which just adds an encoding attribute.
 
547
    
 
548
    Internally we can check sys.stdout to see what the output encoding
 
549
    should be. However, cStringIO has no encoding attribute that we can
 
550
    set. So we wrap it instead.
 
551
    """
 
552
    encoding='ascii'
 
553
    _cstring = None
 
554
 
 
555
    def __init__(self, s=None):
 
556
        if s is not None:
 
557
            self.__dict__['_cstring'] = StringIO(s)
 
558
        else:
 
559
            self.__dict__['_cstring'] = StringIO()
 
560
 
 
561
    def __getattr__(self, name, getattr=getattr):
 
562
        return getattr(self.__dict__['_cstring'], name)
 
563
 
 
564
    def __setattr__(self, name, val):
 
565
        if name == 'encoding':
 
566
            self.__dict__['encoding'] = val
 
567
        else:
 
568
            return setattr(self._cstring, name, val)
 
569
 
 
570
 
 
571
class TestCase(unittest.TestCase):
 
572
    """Base class for bzr unit tests.
 
573
    
 
574
    Tests that need access to disk resources should subclass 
 
575
    TestCaseInTempDir not TestCase.
 
576
 
 
577
    Error and debug log messages are redirected from their usual
 
578
    location into a temporary file, the contents of which can be
 
579
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
580
    so that it can also capture file IO.  When the test completes this file
 
581
    is read into memory and removed from disk.
 
582
       
 
583
    There are also convenience functions to invoke bzr's command-line
 
584
    routine, and to build and check bzr trees.
 
585
   
 
586
    In addition to the usual method of overriding tearDown(), this class also
 
587
    allows subclasses to register functions into the _cleanups list, which is
 
588
    run in order as the object is torn down.  It's less likely this will be
 
589
    accidentally overlooked.
 
590
    """
 
591
 
 
592
    _log_file_name = None
 
593
    _log_contents = ''
 
594
    _keep_log_file = False
 
595
    # record lsprof data when performing benchmark calls.
 
596
    _gather_lsprof_in_benchmarks = False
 
597
 
 
598
    def __init__(self, methodName='testMethod'):
 
599
        super(TestCase, self).__init__(methodName)
 
600
        self._cleanups = []
 
601
 
 
602
    def setUp(self):
 
603
        unittest.TestCase.setUp(self)
 
604
        self._cleanEnvironment()
 
605
        bzrlib.trace.disable_default_logging()
 
606
        self._silenceUI()
 
607
        self._startLogFile()
 
608
        self._benchcalls = []
 
609
        self._benchtime = None
 
610
        # prevent hooks affecting tests
 
611
        self._preserved_hooks = bzrlib.branch.Branch.hooks
 
612
        self.addCleanup(self._restoreHooks)
 
613
        # this list of hooks must be kept in sync with the defaults
 
614
        # in branch.py
 
615
        bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
 
616
 
 
617
    def _silenceUI(self):
 
618
        """Turn off UI for duration of test"""
 
619
        # by default the UI is off; tests can turn it on if they want it.
 
620
        saved = bzrlib.ui.ui_factory
 
621
        def _restore():
 
622
            bzrlib.ui.ui_factory = saved
 
623
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
 
624
        self.addCleanup(_restore)
 
625
 
 
626
    def _ndiff_strings(self, a, b):
 
627
        """Return ndiff between two strings containing lines.
 
628
        
 
629
        A trailing newline is added if missing to make the strings
 
630
        print properly."""
 
631
        if b and b[-1] != '\n':
 
632
            b += '\n'
 
633
        if a and a[-1] != '\n':
 
634
            a += '\n'
 
635
        difflines = difflib.ndiff(a.splitlines(True),
 
636
                                  b.splitlines(True),
 
637
                                  linejunk=lambda x: False,
 
638
                                  charjunk=lambda x: False)
 
639
        return ''.join(difflines)
 
640
 
 
641
    def assertEqualDiff(self, a, b, message=None):
 
642
        """Assert two texts are equal, if not raise an exception.
 
643
        
 
644
        This is intended for use with multi-line strings where it can 
 
645
        be hard to find the differences by eye.
 
646
        """
 
647
        # TODO: perhaps override assertEquals to call this for strings?
 
648
        if a == b:
 
649
            return
 
650
        if message is None:
 
651
            message = "texts not equal:\n"
 
652
        raise AssertionError(message + 
 
653
                             self._ndiff_strings(a, b))      
 
654
        
 
655
    def assertEqualMode(self, mode, mode_test):
 
656
        self.assertEqual(mode, mode_test,
 
657
                         'mode mismatch %o != %o' % (mode, mode_test))
 
658
 
 
659
    def assertStartsWith(self, s, prefix):
 
660
        if not s.startswith(prefix):
 
661
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
662
 
 
663
    def assertEndsWith(self, s, suffix):
 
664
        """Asserts that s ends with suffix."""
 
665
        if not s.endswith(suffix):
 
666
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
667
 
 
668
    def assertContainsRe(self, haystack, needle_re):
 
669
        """Assert that a contains something matching a regular expression."""
 
670
        if not re.search(needle_re, haystack):
 
671
            raise AssertionError('pattern "%s" not found in "%s"'
 
672
                    % (needle_re, haystack))
 
673
 
 
674
    def assertNotContainsRe(self, haystack, needle_re):
 
675
        """Assert that a does not match a regular expression"""
 
676
        if re.search(needle_re, haystack):
 
677
            raise AssertionError('pattern "%s" found in "%s"'
 
678
                    % (needle_re, haystack))
 
679
 
 
680
    def assertSubset(self, sublist, superlist):
 
681
        """Assert that every entry in sublist is present in superlist."""
 
682
        missing = []
 
683
        for entry in sublist:
 
684
            if entry not in superlist:
 
685
                missing.append(entry)
 
686
        if len(missing) > 0:
 
687
            raise AssertionError("value(s) %r not present in container %r" % 
 
688
                                 (missing, superlist))
 
689
 
 
690
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
691
        """Fail unless excClass is raised when the iterator from func is used.
 
692
        
 
693
        Many functions can return generators this makes sure
 
694
        to wrap them in a list() call to make sure the whole generator
 
695
        is run, and that the proper exception is raised.
 
696
        """
 
697
        try:
 
698
            list(func(*args, **kwargs))
 
699
        except excClass:
 
700
            return
 
701
        else:
 
702
            if getattr(excClass,'__name__', None) is not None:
 
703
                excName = excClass.__name__
 
704
            else:
 
705
                excName = str(excClass)
 
706
            raise self.failureException, "%s not raised" % excName
 
707
 
 
708
    def assertIs(self, left, right, message=None):
 
709
        if not (left is right):
 
710
            if message is not None:
 
711
                raise AssertionError(message)
 
712
            else:
 
713
                raise AssertionError("%r is not %r." % (left, right))
 
714
 
 
715
    def assertIsNot(self, left, right, message=None):
 
716
        if (left is right):
 
717
            if message is not None:
 
718
                raise AssertionError(message)
 
719
            else:
 
720
                raise AssertionError("%r is %r." % (left, right))
 
721
 
 
722
    def assertTransportMode(self, transport, path, mode):
 
723
        """Fail if a path does not have mode mode.
 
724
        
 
725
        If modes are not supported on this transport, the assertion is ignored.
 
726
        """
 
727
        if not transport._can_roundtrip_unix_modebits():
 
728
            return
 
729
        path_stat = transport.stat(path)
 
730
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
731
        self.assertEqual(mode, actual_mode,
 
732
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
733
 
 
734
    def assertIsInstance(self, obj, kls):
 
735
        """Fail if obj is not an instance of kls"""
 
736
        if not isinstance(obj, kls):
 
737
            self.fail("%r is an instance of %s rather than %s" % (
 
738
                obj, obj.__class__, kls))
 
739
 
 
740
    def _capture_warnings(self, a_callable, *args, **kwargs):
 
741
        """A helper for callDeprecated and applyDeprecated.
 
742
 
 
743
        :param a_callable: A callable to call.
 
744
        :param args: The positional arguments for the callable
 
745
        :param kwargs: The keyword arguments for the callable
 
746
        :return: A tuple (warnings, result). result is the result of calling
 
747
            a_callable(*args, **kwargs).
 
748
        """
 
749
        local_warnings = []
 
750
        def capture_warnings(msg, cls=None, stacklevel=None):
 
751
            # we've hooked into a deprecation specific callpath,
 
752
            # only deprecations should getting sent via it.
 
753
            self.assertEqual(cls, DeprecationWarning)
 
754
            local_warnings.append(msg)
 
755
        original_warning_method = symbol_versioning.warn
 
756
        symbol_versioning.set_warning_method(capture_warnings)
 
757
        try:
 
758
            result = a_callable(*args, **kwargs)
 
759
        finally:
 
760
            symbol_versioning.set_warning_method(original_warning_method)
 
761
        return (local_warnings, result)
 
762
 
 
763
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
764
        """Call a deprecated callable without warning the user.
 
765
 
 
766
        :param deprecation_format: The deprecation format that the callable
 
767
            should have been deprecated with. This is the same type as the 
 
768
            parameter to deprecated_method/deprecated_function. If the 
 
769
            callable is not deprecated with this format, an assertion error
 
770
            will be raised.
 
771
        :param a_callable: A callable to call. This may be a bound method or
 
772
            a regular function. It will be called with *args and **kwargs.
 
773
        :param args: The positional arguments for the callable
 
774
        :param kwargs: The keyword arguments for the callable
 
775
        :return: The result of a_callable(*args, **kwargs)
 
776
        """
 
777
        call_warnings, result = self._capture_warnings(a_callable,
 
778
            *args, **kwargs)
 
779
        expected_first_warning = symbol_versioning.deprecation_string(
 
780
            a_callable, deprecation_format)
 
781
        if len(call_warnings) == 0:
 
782
            self.fail("No assertion generated by call to %s" %
 
783
                a_callable)
 
784
        self.assertEqual(expected_first_warning, call_warnings[0])
 
785
        return result
 
786
 
 
787
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
788
        """Assert that a callable is deprecated in a particular way.
 
789
 
 
790
        This is a very precise test for unusual requirements. The 
 
791
        applyDeprecated helper function is probably more suited for most tests
 
792
        as it allows you to simply specify the deprecation format being used
 
793
        and will ensure that that is issued for the function being called.
 
794
 
 
795
        :param expected: a list of the deprecation warnings expected, in order
 
796
        :param callable: The callable to call
 
797
        :param args: The positional arguments for the callable
 
798
        :param kwargs: The keyword arguments for the callable
 
799
        """
 
800
        call_warnings, result = self._capture_warnings(callable,
 
801
            *args, **kwargs)
 
802
        self.assertEqual(expected, call_warnings)
 
803
        return result
 
804
 
 
805
    def _startLogFile(self):
 
806
        """Send bzr and test log messages to a temporary file.
 
807
 
 
808
        The file is removed as the test is torn down.
 
809
        """
 
810
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
811
        self._log_file = os.fdopen(fileno, 'w+')
 
812
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
813
        self._log_file_name = name
 
814
        self.addCleanup(self._finishLogFile)
 
815
 
 
816
    def _finishLogFile(self):
 
817
        """Finished with the log file.
 
818
 
 
819
        Close the file and delete it, unless setKeepLogfile was called.
 
820
        """
 
821
        if self._log_file is None:
 
822
            return
 
823
        bzrlib.trace.disable_test_log(self._log_nonce)
 
824
        self._log_file.close()
 
825
        self._log_file = None
 
826
        if not self._keep_log_file:
 
827
            os.remove(self._log_file_name)
 
828
            self._log_file_name = None
 
829
 
 
830
    def setKeepLogfile(self):
 
831
        """Make the logfile not be deleted when _finishLogFile is called."""
 
832
        self._keep_log_file = True
 
833
 
 
834
    def addCleanup(self, callable):
 
835
        """Arrange to run a callable when this case is torn down.
 
836
 
 
837
        Callables are run in the reverse of the order they are registered, 
 
838
        ie last-in first-out.
 
839
        """
 
840
        if callable in self._cleanups:
 
841
            raise ValueError("cleanup function %r already registered on %s" 
 
842
                    % (callable, self))
 
843
        self._cleanups.append(callable)
 
844
 
 
845
    def _cleanEnvironment(self):
 
846
        new_env = {
 
847
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
848
            'HOME': os.getcwd(),
 
849
            'APPDATA': None,  # bzr now use Win32 API and don't rely on APPDATA
 
850
            'BZR_EMAIL': None,
 
851
            'BZREMAIL': None, # may still be present in the environment
 
852
            'EMAIL': None,
 
853
            'BZR_PROGRESS_BAR': None,
 
854
            # Proxies
 
855
            'http_proxy': None,
 
856
            'HTTP_PROXY': None,
 
857
            'https_proxy': None,
 
858
            'HTTPS_PROXY': None,
 
859
            'no_proxy': None,
 
860
            'NO_PROXY': None,
 
861
            'all_proxy': None,
 
862
            'ALL_PROXY': None,
 
863
            # Nobody cares about these ones AFAIK. So far at
 
864
            # least. If you do (care), please update this comment
 
865
            # -- vila 20061212
 
866
            'ftp_proxy': None,
 
867
            'FTP_PROXY': None,
 
868
        }
 
869
        self.__old_env = {}
 
870
        self.addCleanup(self._restoreEnvironment)
 
871
        for name, value in new_env.iteritems():
 
872
            self._captureVar(name, value)
 
873
 
 
874
    def _captureVar(self, name, newvalue):
 
875
        """Set an environment variable, and reset it when finished."""
 
876
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
877
 
 
878
    def _restoreEnvironment(self):
 
879
        for name, value in self.__old_env.iteritems():
 
880
            osutils.set_or_unset_env(name, value)
 
881
 
 
882
    def _restoreHooks(self):
 
883
        bzrlib.branch.Branch.hooks = self._preserved_hooks
 
884
 
 
885
    def tearDown(self):
 
886
        self._runCleanups()
 
887
        unittest.TestCase.tearDown(self)
 
888
 
 
889
    def time(self, callable, *args, **kwargs):
 
890
        """Run callable and accrue the time it takes to the benchmark time.
 
891
        
 
892
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
893
        this will cause lsprofile statistics to be gathered and stored in
 
894
        self._benchcalls.
 
895
        """
 
896
        if self._benchtime is None:
 
897
            self._benchtime = 0
 
898
        start = time.time()
 
899
        try:
 
900
            if not self._gather_lsprof_in_benchmarks:
 
901
                return callable(*args, **kwargs)
 
902
            else:
 
903
                # record this benchmark
 
904
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
905
                stats.sort()
 
906
                self._benchcalls.append(((callable, args, kwargs), stats))
 
907
                return ret
 
908
        finally:
 
909
            self._benchtime += time.time() - start
 
910
 
 
911
    def _runCleanups(self):
 
912
        """Run registered cleanup functions. 
 
913
 
 
914
        This should only be called from TestCase.tearDown.
 
915
        """
 
916
        # TODO: Perhaps this should keep running cleanups even if 
 
917
        # one of them fails?
 
918
        for cleanup_fn in reversed(self._cleanups):
 
919
            cleanup_fn()
 
920
 
 
921
    def log(self, *args):
 
922
        mutter(*args)
 
923
 
 
924
    def _get_log(self, keep_log_file=False):
 
925
        """Return as a string the log for this test. If the file is still
 
926
        on disk and keep_log_file=False, delete the log file and store the
 
927
        content in self._log_contents."""
 
928
        # flush the log file, to get all content
 
929
        import bzrlib.trace
 
930
        bzrlib.trace._trace_file.flush()
 
931
        if self._log_contents:
 
932
            return self._log_contents
 
933
        if self._log_file_name is not None:
 
934
            logfile = open(self._log_file_name)
 
935
            try:
 
936
                log_contents = logfile.read()
 
937
            finally:
 
938
                logfile.close()
 
939
            if not keep_log_file:
 
940
                self._log_contents = log_contents
 
941
                try:
 
942
                    os.remove(self._log_file_name)
 
943
                except OSError, e:
 
944
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
945
                        print >>sys.stderr, ('Unable to delete log file '
 
946
                                             ' %r' % self._log_file_name)
 
947
                    else:
 
948
                        raise
 
949
            return log_contents
 
950
        else:
 
951
            return "DELETED log file to reduce memory footprint"
 
952
 
 
953
    def capture(self, cmd, retcode=0):
 
954
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
955
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
956
 
 
957
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
 
958
                         working_dir=None):
 
959
        """Invoke bzr and return (stdout, stderr).
 
960
 
 
961
        Useful for code that wants to check the contents of the
 
962
        output, the way error messages are presented, etc.
 
963
 
 
964
        This should be the main method for tests that want to exercise the
 
965
        overall behavior of the bzr application (rather than a unit test
 
966
        or a functional test of the library.)
 
967
 
 
968
        Much of the old code runs bzr by forking a new copy of Python, but
 
969
        that is slower, harder to debug, and generally not necessary.
 
970
 
 
971
        This runs bzr through the interface that catches and reports
 
972
        errors, and with logging set to something approximating the
 
973
        default, so that error reporting can be checked.
 
974
 
 
975
        :param argv: arguments to invoke bzr
 
976
        :param retcode: expected return code, or None for don't-care.
 
977
        :param encoding: encoding for sys.stdout and sys.stderr
 
978
        :param stdin: A string to be used as stdin for the command.
 
979
        :param working_dir: Change to this directory before running
 
980
        """
 
981
        if encoding is None:
 
982
            encoding = bzrlib.user_encoding
 
983
        if stdin is not None:
 
984
            stdin = StringIO(stdin)
 
985
        stdout = StringIOWrapper()
 
986
        stderr = StringIOWrapper()
 
987
        stdout.encoding = encoding
 
988
        stderr.encoding = encoding
 
989
 
 
990
        self.log('run bzr: %r', argv)
 
991
        # FIXME: don't call into logging here
 
992
        handler = logging.StreamHandler(stderr)
 
993
        handler.setLevel(logging.INFO)
 
994
        logger = logging.getLogger('')
 
995
        logger.addHandler(handler)
 
996
        old_ui_factory = bzrlib.ui.ui_factory
 
997
        bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
 
998
            stdout=stdout,
 
999
            stderr=stderr)
 
1000
        bzrlib.ui.ui_factory.stdin = stdin
 
1001
 
 
1002
        cwd = None
 
1003
        if working_dir is not None:
 
1004
            cwd = osutils.getcwd()
 
1005
            os.chdir(working_dir)
 
1006
 
 
1007
        try:
 
1008
            saved_debug_flags = frozenset(debug.debug_flags)
 
1009
            debug.debug_flags.clear()
 
1010
            try:
 
1011
                result = self.apply_redirected(stdin, stdout, stderr,
 
1012
                                               bzrlib.commands.run_bzr_catch_errors,
 
1013
                                               argv)
 
1014
            finally:
 
1015
                debug.debug_flags.update(saved_debug_flags)
 
1016
        finally:
 
1017
            logger.removeHandler(handler)
 
1018
            bzrlib.ui.ui_factory = old_ui_factory
 
1019
            if cwd is not None:
 
1020
                os.chdir(cwd)
 
1021
 
 
1022
        out = stdout.getvalue()
 
1023
        err = stderr.getvalue()
 
1024
        if out:
 
1025
            self.log('output:\n%r', out)
 
1026
        if err:
 
1027
            self.log('errors:\n%r', err)
 
1028
        if retcode is not None:
 
1029
            self.assertEquals(retcode, result)
 
1030
        return out, err
 
1031
 
 
1032
    def run_bzr(self, *args, **kwargs):
 
1033
        """Invoke bzr, as if it were run from the command line.
 
1034
 
 
1035
        This should be the main method for tests that want to exercise the
 
1036
        overall behavior of the bzr application (rather than a unit test
 
1037
        or a functional test of the library.)
 
1038
 
 
1039
        This sends the stdout/stderr results into the test's log,
 
1040
        where it may be useful for debugging.  See also run_captured.
 
1041
 
 
1042
        :param stdin: A string to be used as stdin for the command.
 
1043
        """
 
1044
        retcode = kwargs.pop('retcode', 0)
 
1045
        encoding = kwargs.pop('encoding', None)
 
1046
        stdin = kwargs.pop('stdin', None)
 
1047
        working_dir = kwargs.pop('working_dir', None)
 
1048
        return self.run_bzr_captured(args, retcode=retcode, encoding=encoding,
 
1049
                                     stdin=stdin, working_dir=working_dir)
 
1050
 
 
1051
    def run_bzr_decode(self, *args, **kwargs):
 
1052
        if 'encoding' in kwargs:
 
1053
            encoding = kwargs['encoding']
 
1054
        else:
 
1055
            encoding = bzrlib.user_encoding
 
1056
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
 
1057
 
 
1058
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1059
        """Run bzr, and check that stderr contains the supplied regexes
 
1060
        
 
1061
        :param error_regexes: Sequence of regular expressions which 
 
1062
            must each be found in the error output. The relative ordering
 
1063
            is not enforced.
 
1064
        :param args: command-line arguments for bzr
 
1065
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1066
            This function changes the default value of retcode to be 3,
 
1067
            since in most cases this is run when you expect bzr to fail.
 
1068
        :return: (out, err) The actual output of running the command (in case you
 
1069
                 want to do more inspection)
 
1070
 
 
1071
        Examples of use:
 
1072
            # Make sure that commit is failing because there is nothing to do
 
1073
            self.run_bzr_error(['no changes to commit'],
 
1074
                               'commit', '-m', 'my commit comment')
 
1075
            # Make sure --strict is handling an unknown file, rather than
 
1076
            # giving us the 'nothing to do' error
 
1077
            self.build_tree(['unknown'])
 
1078
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1079
                               'commit', '--strict', '-m', 'my commit comment')
 
1080
        """
 
1081
        kwargs.setdefault('retcode', 3)
 
1082
        out, err = self.run_bzr(*args, **kwargs)
 
1083
        for regex in error_regexes:
 
1084
            self.assertContainsRe(err, regex)
 
1085
        return out, err
 
1086
 
 
1087
    def run_bzr_subprocess(self, *args, **kwargs):
 
1088
        """Run bzr in a subprocess for testing.
 
1089
 
 
1090
        This starts a new Python interpreter and runs bzr in there. 
 
1091
        This should only be used for tests that have a justifiable need for
 
1092
        this isolation: e.g. they are testing startup time, or signal
 
1093
        handling, or early startup code, etc.  Subprocess code can't be 
 
1094
        profiled or debugged so easily.
 
1095
 
 
1096
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1097
            None is supplied, the status code is not checked.
 
1098
        :param env_changes: A dictionary which lists changes to environment
 
1099
            variables. A value of None will unset the env variable.
 
1100
            The values must be strings. The change will only occur in the
 
1101
            child, so you don't need to fix the environment after running.
 
1102
        :param universal_newlines: Convert CRLF => LF
 
1103
        :param allow_plugins: By default the subprocess is run with
 
1104
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1105
            for system-wide plugins to create unexpected output on stderr,
 
1106
            which can cause unnecessary test failures.
 
1107
        """
 
1108
        env_changes = kwargs.get('env_changes', {})
 
1109
        working_dir = kwargs.get('working_dir', None)
 
1110
        allow_plugins = kwargs.get('allow_plugins', False)
 
1111
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1112
                                            working_dir=working_dir,
 
1113
                                            allow_plugins=allow_plugins)
 
1114
        # We distinguish between retcode=None and retcode not passed.
 
1115
        supplied_retcode = kwargs.get('retcode', 0)
 
1116
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1117
            universal_newlines=kwargs.get('universal_newlines', False),
 
1118
            process_args=args)
 
1119
 
 
1120
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1121
                             skip_if_plan_to_signal=False,
 
1122
                             working_dir=None,
 
1123
                             allow_plugins=False):
 
1124
        """Start bzr in a subprocess for testing.
 
1125
 
 
1126
        This starts a new Python interpreter and runs bzr in there.
 
1127
        This should only be used for tests that have a justifiable need for
 
1128
        this isolation: e.g. they are testing startup time, or signal
 
1129
        handling, or early startup code, etc.  Subprocess code can't be
 
1130
        profiled or debugged so easily.
 
1131
 
 
1132
        :param process_args: a list of arguments to pass to the bzr executable,
 
1133
            for example `['--version']`.
 
1134
        :param env_changes: A dictionary which lists changes to environment
 
1135
            variables. A value of None will unset the env variable.
 
1136
            The values must be strings. The change will only occur in the
 
1137
            child, so you don't need to fix the environment after running.
 
1138
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1139
            is not available.
 
1140
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1141
 
 
1142
        :returns: Popen object for the started process.
 
1143
        """
 
1144
        if skip_if_plan_to_signal:
 
1145
            if not getattr(os, 'kill', None):
 
1146
                raise TestSkipped("os.kill not available.")
 
1147
 
 
1148
        if env_changes is None:
 
1149
            env_changes = {}
 
1150
        old_env = {}
 
1151
 
 
1152
        def cleanup_environment():
 
1153
            for env_var, value in env_changes.iteritems():
 
1154
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1155
 
 
1156
        def restore_environment():
 
1157
            for env_var, value in old_env.iteritems():
 
1158
                osutils.set_or_unset_env(env_var, value)
 
1159
 
 
1160
        bzr_path = self.get_bzr_path()
 
1161
 
 
1162
        cwd = None
 
1163
        if working_dir is not None:
 
1164
            cwd = osutils.getcwd()
 
1165
            os.chdir(working_dir)
 
1166
 
 
1167
        try:
 
1168
            # win32 subprocess doesn't support preexec_fn
 
1169
            # so we will avoid using it on all platforms, just to
 
1170
            # make sure the code path is used, and we don't break on win32
 
1171
            cleanup_environment()
 
1172
            command = [sys.executable, bzr_path]
 
1173
            if not allow_plugins:
 
1174
                command.append('--no-plugins')
 
1175
            command.extend(process_args)
 
1176
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1177
        finally:
 
1178
            restore_environment()
 
1179
            if cwd is not None:
 
1180
                os.chdir(cwd)
 
1181
 
 
1182
        return process
 
1183
 
 
1184
    def _popen(self, *args, **kwargs):
 
1185
        """Place a call to Popen.
 
1186
 
 
1187
        Allows tests to override this method to intercept the calls made to
 
1188
        Popen for introspection.
 
1189
        """
 
1190
        return Popen(*args, **kwargs)
 
1191
 
 
1192
    def get_bzr_path(self):
 
1193
        """Return the path of the 'bzr' executable for this test suite."""
 
1194
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1195
        if not os.path.isfile(bzr_path):
 
1196
            # We are probably installed. Assume sys.argv is the right file
 
1197
            bzr_path = sys.argv[0]
 
1198
        return bzr_path
 
1199
 
 
1200
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1201
                              universal_newlines=False, process_args=None):
 
1202
        """Finish the execution of process.
 
1203
 
 
1204
        :param process: the Popen object returned from start_bzr_subprocess.
 
1205
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1206
            None is supplied, the status code is not checked.
 
1207
        :param send_signal: an optional signal to send to the process.
 
1208
        :param universal_newlines: Convert CRLF => LF
 
1209
        :returns: (stdout, stderr)
 
1210
        """
 
1211
        if send_signal is not None:
 
1212
            os.kill(process.pid, send_signal)
 
1213
        out, err = process.communicate()
 
1214
 
 
1215
        if universal_newlines:
 
1216
            out = out.replace('\r\n', '\n')
 
1217
            err = err.replace('\r\n', '\n')
 
1218
 
 
1219
        if retcode is not None and retcode != process.returncode:
 
1220
            if process_args is None:
 
1221
                process_args = "(unknown args)"
 
1222
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1223
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1224
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1225
                      % (process_args, retcode, process.returncode))
 
1226
        return [out, err]
 
1227
 
 
1228
    def check_inventory_shape(self, inv, shape):
 
1229
        """Compare an inventory to a list of expected names.
 
1230
 
 
1231
        Fail if they are not precisely equal.
 
1232
        """
 
1233
        extras = []
 
1234
        shape = list(shape)             # copy
 
1235
        for path, ie in inv.entries():
 
1236
            name = path.replace('\\', '/')
 
1237
            if ie.kind == 'dir':
 
1238
                name = name + '/'
 
1239
            if name in shape:
 
1240
                shape.remove(name)
 
1241
            else:
 
1242
                extras.append(name)
 
1243
        if shape:
 
1244
            self.fail("expected paths not found in inventory: %r" % shape)
 
1245
        if extras:
 
1246
            self.fail("unexpected paths found in inventory: %r" % extras)
 
1247
 
 
1248
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1249
                         a_callable=None, *args, **kwargs):
 
1250
        """Call callable with redirected std io pipes.
 
1251
 
 
1252
        Returns the return code."""
 
1253
        if not callable(a_callable):
 
1254
            raise ValueError("a_callable must be callable.")
 
1255
        if stdin is None:
 
1256
            stdin = StringIO("")
 
1257
        if stdout is None:
 
1258
            if getattr(self, "_log_file", None) is not None:
 
1259
                stdout = self._log_file
 
1260
            else:
 
1261
                stdout = StringIO()
 
1262
        if stderr is None:
 
1263
            if getattr(self, "_log_file", None is not None):
 
1264
                stderr = self._log_file
 
1265
            else:
 
1266
                stderr = StringIO()
 
1267
        real_stdin = sys.stdin
 
1268
        real_stdout = sys.stdout
 
1269
        real_stderr = sys.stderr
 
1270
        try:
 
1271
            sys.stdout = stdout
 
1272
            sys.stderr = stderr
 
1273
            sys.stdin = stdin
 
1274
            return a_callable(*args, **kwargs)
 
1275
        finally:
 
1276
            sys.stdout = real_stdout
 
1277
            sys.stderr = real_stderr
 
1278
            sys.stdin = real_stdin
 
1279
 
 
1280
    @symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
 
1281
    def merge(self, branch_from, wt_to):
 
1282
        """A helper for tests to do a ui-less merge.
 
1283
 
 
1284
        This should move to the main library when someone has time to integrate
 
1285
        it in.
 
1286
        """
 
1287
        # minimal ui-less merge.
 
1288
        wt_to.branch.fetch(branch_from)
 
1289
        base_rev = common_ancestor(branch_from.last_revision(),
 
1290
                                   wt_to.branch.last_revision(),
 
1291
                                   wt_to.branch.repository)
 
1292
        merge_inner(wt_to.branch, branch_from.basis_tree(),
 
1293
                    wt_to.branch.repository.revision_tree(base_rev),
 
1294
                    this_tree=wt_to)
 
1295
        wt_to.add_parent_tree_id(branch_from.last_revision())
 
1296
 
 
1297
 
 
1298
BzrTestBase = TestCase
 
1299
 
 
1300
 
 
1301
class TestCaseWithMemoryTransport(TestCase):
 
1302
    """Common test class for tests that do not need disk resources.
 
1303
 
 
1304
    Tests that need disk resources should derive from TestCaseWithTransport.
 
1305
 
 
1306
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
1307
 
 
1308
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
1309
    a directory which does not exist. This serves to help ensure test isolation
 
1310
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
1311
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
1312
    file defaults for the transport in tests, nor does it obey the command line
 
1313
    override, so tests that accidentally write to the common directory should
 
1314
    be rare.
 
1315
    """
 
1316
 
 
1317
    TEST_ROOT = None
 
1318
    _TEST_NAME = 'test'
 
1319
 
 
1320
 
 
1321
    def __init__(self, methodName='runTest'):
 
1322
        # allow test parameterisation after test construction and before test
 
1323
        # execution. Variables that the parameteriser sets need to be 
 
1324
        # ones that are not set by setUp, or setUp will trash them.
 
1325
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
1326
        self.transport_server = default_transport
 
1327
        self.transport_readonly_server = None
 
1328
 
 
1329
    def get_transport(self):
 
1330
        """Return a writeable transport for the test scratch space"""
 
1331
        t = get_transport(self.get_url())
 
1332
        self.assertFalse(t.is_readonly())
 
1333
        return t
 
1334
 
 
1335
    def get_readonly_transport(self):
 
1336
        """Return a readonly transport for the test scratch space
 
1337
        
 
1338
        This can be used to test that operations which should only need
 
1339
        readonly access in fact do not try to write.
 
1340
        """
 
1341
        t = get_transport(self.get_readonly_url())
 
1342
        self.assertTrue(t.is_readonly())
 
1343
        return t
 
1344
 
 
1345
    def create_transport_readonly_server(self):
 
1346
        """Create a transport server from class defined at init.
 
1347
 
 
1348
        This is mostly a hook for daughter classes.
 
1349
        """
 
1350
        return self.transport_readonly_server()
 
1351
 
 
1352
    def get_readonly_server(self):
 
1353
        """Get the server instance for the readonly transport
 
1354
 
 
1355
        This is useful for some tests with specific servers to do diagnostics.
 
1356
        """
 
1357
        if self.__readonly_server is None:
 
1358
            if self.transport_readonly_server is None:
 
1359
                # readonly decorator requested
 
1360
                # bring up the server
 
1361
                self.get_url()
 
1362
                self.__readonly_server = ReadonlyServer()
 
1363
                self.__readonly_server.setUp(self.__server)
 
1364
            else:
 
1365
                self.__readonly_server = self.create_transport_readonly_server()
 
1366
                self.__readonly_server.setUp()
 
1367
            self.addCleanup(self.__readonly_server.tearDown)
 
1368
        return self.__readonly_server
 
1369
 
 
1370
    def get_readonly_url(self, relpath=None):
 
1371
        """Get a URL for the readonly transport.
 
1372
 
 
1373
        This will either be backed by '.' or a decorator to the transport 
 
1374
        used by self.get_url()
 
1375
        relpath provides for clients to get a path relative to the base url.
 
1376
        These should only be downwards relative, not upwards.
 
1377
        """
 
1378
        base = self.get_readonly_server().get_url()
 
1379
        if relpath is not None:
 
1380
            if not base.endswith('/'):
 
1381
                base = base + '/'
 
1382
            base = base + relpath
 
1383
        return base
 
1384
 
 
1385
    def get_server(self):
 
1386
        """Get the read/write server instance.
 
1387
 
 
1388
        This is useful for some tests with specific servers that need
 
1389
        diagnostics.
 
1390
 
 
1391
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
1392
        is no means to override it.
 
1393
        """
 
1394
        if self.__server is None:
 
1395
            self.__server = MemoryServer()
 
1396
            self.__server.setUp()
 
1397
            self.addCleanup(self.__server.tearDown)
 
1398
        return self.__server
 
1399
 
 
1400
    def get_url(self, relpath=None):
 
1401
        """Get a URL (or maybe a path) for the readwrite transport.
 
1402
 
 
1403
        This will either be backed by '.' or to an equivalent non-file based
 
1404
        facility.
 
1405
        relpath provides for clients to get a path relative to the base url.
 
1406
        These should only be downwards relative, not upwards.
 
1407
        """
 
1408
        base = self.get_server().get_url()
 
1409
        if relpath is not None and relpath != '.':
 
1410
            if not base.endswith('/'):
 
1411
                base = base + '/'
 
1412
            # XXX: Really base should be a url; we did after all call
 
1413
            # get_url()!  But sometimes it's just a path (from
 
1414
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
1415
            # to a non-escaped local path.
 
1416
            if base.startswith('./') or base.startswith('/'):
 
1417
                base += relpath
 
1418
            else:
 
1419
                base += urlutils.escape(relpath)
 
1420
        return base
 
1421
 
 
1422
    def _make_test_root(self):
 
1423
        if TestCaseWithMemoryTransport.TEST_ROOT is not None:
 
1424
            return
 
1425
        i = 0
 
1426
        while True:
 
1427
            root = u'test%04d.tmp' % i
 
1428
            try:
 
1429
                os.mkdir(root)
 
1430
            except OSError, e:
 
1431
                if e.errno == errno.EEXIST:
 
1432
                    i += 1
 
1433
                    continue
 
1434
                else:
 
1435
                    raise
 
1436
            # successfully created
 
1437
            TestCaseWithMemoryTransport.TEST_ROOT = osutils.abspath(root)
 
1438
            break
 
1439
        # make a fake bzr directory there to prevent any tests propagating
 
1440
        # up onto the source directory's real branch
 
1441
        bzrdir.BzrDir.create_standalone_workingtree(
 
1442
            TestCaseWithMemoryTransport.TEST_ROOT)
 
1443
 
 
1444
    def makeAndChdirToTestDir(self):
 
1445
        """Create a temporary directories for this one test.
 
1446
        
 
1447
        This must set self.test_home_dir and self.test_dir and chdir to
 
1448
        self.test_dir.
 
1449
        
 
1450
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
1451
        """
 
1452
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
1453
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
1454
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
1455
        
 
1456
    def make_branch(self, relpath, format=None):
 
1457
        """Create a branch on the transport at relpath."""
 
1458
        repo = self.make_repository(relpath, format=format)
 
1459
        return repo.bzrdir.create_branch()
 
1460
 
 
1461
    def make_bzrdir(self, relpath, format=None):
 
1462
        try:
 
1463
            # might be a relative or absolute path
 
1464
            maybe_a_url = self.get_url(relpath)
 
1465
            segments = maybe_a_url.rsplit('/', 1)
 
1466
            t = get_transport(maybe_a_url)
 
1467
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
1468
                try:
 
1469
                    t.mkdir('.')
 
1470
                except errors.FileExists:
 
1471
                    pass
 
1472
            if format is None:
 
1473
                format = 'default'
 
1474
            if isinstance(format, basestring):
 
1475
                format = bzrdir.format_registry.make_bzrdir(format)
 
1476
            return format.initialize_on_transport(t)
 
1477
        except errors.UninitializableFormat:
 
1478
            raise TestSkipped("Format %s is not initializable." % format)
 
1479
 
 
1480
    def make_repository(self, relpath, shared=False, format=None):
 
1481
        """Create a repository on our default transport at relpath."""
 
1482
        made_control = self.make_bzrdir(relpath, format=format)
 
1483
        return made_control.create_repository(shared=shared)
 
1484
 
 
1485
    def make_branch_and_memory_tree(self, relpath, format=None):
 
1486
        """Create a branch on the default transport and a MemoryTree for it."""
 
1487
        b = self.make_branch(relpath, format=format)
 
1488
        return memorytree.MemoryTree.create_on_branch(b)
 
1489
 
 
1490
    def overrideEnvironmentForTesting(self):
 
1491
        os.environ['HOME'] = self.test_home_dir
 
1492
        os.environ['BZR_HOME'] = self.test_home_dir
 
1493
        
 
1494
    def setUp(self):
 
1495
        super(TestCaseWithMemoryTransport, self).setUp()
 
1496
        self._make_test_root()
 
1497
        _currentdir = os.getcwdu()
 
1498
        def _leaveDirectory():
 
1499
            os.chdir(_currentdir)
 
1500
        self.addCleanup(_leaveDirectory)
 
1501
        self.makeAndChdirToTestDir()
 
1502
        self.overrideEnvironmentForTesting()
 
1503
        self.__readonly_server = None
 
1504
        self.__server = None
 
1505
 
 
1506
     
 
1507
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
1508
    """Derived class that runs a test within a temporary directory.
 
1509
 
 
1510
    This is useful for tests that need to create a branch, etc.
 
1511
 
 
1512
    The directory is created in a slightly complex way: for each
 
1513
    Python invocation, a new temporary top-level directory is created.
 
1514
    All test cases create their own directory within that.  If the
 
1515
    tests complete successfully, the directory is removed.
 
1516
 
 
1517
    InTempDir is an old alias for FunctionalTestCase.
 
1518
    """
 
1519
 
 
1520
    OVERRIDE_PYTHON = 'python'
 
1521
 
 
1522
    def check_file_contents(self, filename, expect):
 
1523
        self.log("check contents of file %s" % filename)
 
1524
        contents = file(filename, 'r').read()
 
1525
        if contents != expect:
 
1526
            self.log("expected: %r" % expect)
 
1527
            self.log("actually: %r" % contents)
 
1528
            self.fail("contents of %s not as expected" % filename)
 
1529
 
 
1530
    def makeAndChdirToTestDir(self):
 
1531
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
1532
        
 
1533
        For TestCaseInTempDir we create a temporary directory based on the test
 
1534
        name and then create two subdirs - test and home under it.
 
1535
        """
 
1536
        if NUMBERED_DIRS:       # strongly recommended on Windows
 
1537
                                # due the path length limitation (260 chars)
 
1538
            candidate_dir = '%s/%dK/%05d' % (self.TEST_ROOT,
 
1539
                                             int(self.number/1000),
 
1540
                                             self.number)
 
1541
            os.makedirs(candidate_dir)
 
1542
            self.test_home_dir = candidate_dir + '/home'
 
1543
            os.mkdir(self.test_home_dir)
 
1544
            self.test_dir = candidate_dir + '/work'
 
1545
            os.mkdir(self.test_dir)
 
1546
            os.chdir(self.test_dir)
 
1547
            # put name of test inside
 
1548
            f = file(candidate_dir + '/name', 'w')
 
1549
            f.write(self.id())
 
1550
            f.close()
 
1551
            return
 
1552
        # Else NAMED DIRS
 
1553
        # shorten the name, to avoid test failures due to path length
 
1554
        short_id = self.id().replace('bzrlib.tests.', '') \
 
1555
                   .replace('__main__.', '')[-100:]
 
1556
        # it's possible the same test class is run several times for
 
1557
        # parameterized tests, so make sure the names don't collide.  
 
1558
        i = 0
 
1559
        while True:
 
1560
            if i > 0:
 
1561
                candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
 
1562
            else:
 
1563
                candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
 
1564
            if os.path.exists(candidate_dir):
 
1565
                i = i + 1
 
1566
                continue
 
1567
            else:
 
1568
                os.mkdir(candidate_dir)
 
1569
                self.test_home_dir = candidate_dir + '/home'
 
1570
                os.mkdir(self.test_home_dir)
 
1571
                self.test_dir = candidate_dir + '/work'
 
1572
                os.mkdir(self.test_dir)
 
1573
                os.chdir(self.test_dir)
 
1574
                break
 
1575
 
 
1576
    def build_tree(self, shape, line_endings='binary', transport=None):
 
1577
        """Build a test tree according to a pattern.
 
1578
 
 
1579
        shape is a sequence of file specifications.  If the final
 
1580
        character is '/', a directory is created.
 
1581
 
 
1582
        This assumes that all the elements in the tree being built are new.
 
1583
 
 
1584
        This doesn't add anything to a branch.
 
1585
        :param line_endings: Either 'binary' or 'native'
 
1586
                             in binary mode, exact contents are written
 
1587
                             in native mode, the line endings match the
 
1588
                             default platform endings.
 
1589
 
 
1590
        :param transport: A transport to write to, for building trees on 
 
1591
                          VFS's. If the transport is readonly or None,
 
1592
                          "." is opened automatically.
 
1593
        """
 
1594
        # It's OK to just create them using forward slashes on windows.
 
1595
        if transport is None or transport.is_readonly():
 
1596
            transport = get_transport(".")
 
1597
        for name in shape:
 
1598
            self.assert_(isinstance(name, basestring))
 
1599
            if name[-1] == '/':
 
1600
                transport.mkdir(urlutils.escape(name[:-1]))
 
1601
            else:
 
1602
                if line_endings == 'binary':
 
1603
                    end = '\n'
 
1604
                elif line_endings == 'native':
 
1605
                    end = os.linesep
 
1606
                else:
 
1607
                    raise errors.BzrError(
 
1608
                        'Invalid line ending request %r' % line_endings)
 
1609
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
1610
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
1611
 
 
1612
    def build_tree_contents(self, shape):
 
1613
        build_tree_contents(shape)
 
1614
 
 
1615
    def assertFileEqual(self, content, path):
 
1616
        """Fail if path does not contain 'content'."""
 
1617
        self.failUnlessExists(path)
 
1618
        # TODO: jam 20060427 Shouldn't this be 'rb'?
 
1619
        self.assertEqualDiff(content, open(path, 'r').read())
 
1620
 
 
1621
    def failUnlessExists(self, path):
 
1622
        """Fail unless path, which may be abs or relative, exists."""
 
1623
        self.failUnless(osutils.lexists(path),path+" does not exist")
 
1624
 
 
1625
    def failIfExists(self, path):
 
1626
        """Fail if path, which may be abs or relative, exists."""
 
1627
        self.failIf(osutils.lexists(path),path+" exists")
 
1628
 
 
1629
 
 
1630
class TestCaseWithTransport(TestCaseInTempDir):
 
1631
    """A test case that provides get_url and get_readonly_url facilities.
 
1632
 
 
1633
    These back onto two transport servers, one for readonly access and one for
 
1634
    read write access.
 
1635
 
 
1636
    If no explicit class is provided for readonly access, a
 
1637
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
1638
    based read write transports.
 
1639
 
 
1640
    If an explicit class is provided for readonly access, that server and the 
 
1641
    readwrite one must both define get_url() as resolving to os.getcwd().
 
1642
    """
 
1643
 
 
1644
    def create_transport_server(self):
 
1645
        """Create a transport server from class defined at init.
 
1646
 
 
1647
        This is mostly a hook for daughter classes.
 
1648
        """
 
1649
        return self.transport_server()
 
1650
 
 
1651
    def get_server(self):
 
1652
        """See TestCaseWithMemoryTransport.
 
1653
 
 
1654
        This is useful for some tests with specific servers that need
 
1655
        diagnostics.
 
1656
        """
 
1657
        if self.__server is None:
 
1658
            self.__server = self.create_transport_server()
 
1659
            self.__server.setUp()
 
1660
            self.addCleanup(self.__server.tearDown)
 
1661
        return self.__server
 
1662
 
 
1663
    def make_branch_and_tree(self, relpath, format=None):
 
1664
        """Create a branch on the transport and a tree locally.
 
1665
 
 
1666
        If the transport is not a LocalTransport, the Tree can't be created on
 
1667
        the transport.  In that case the working tree is created in the local
 
1668
        directory, and the returned tree's branch and repository will also be
 
1669
        accessed locally.
 
1670
 
 
1671
        This will fail if the original default transport for this test
 
1672
        case wasn't backed by the working directory, as the branch won't
 
1673
        be on disk for us to open it.  
 
1674
 
 
1675
        :param format: The BzrDirFormat.
 
1676
        :returns: the WorkingTree.
 
1677
        """
 
1678
        # TODO: always use the local disk path for the working tree,
 
1679
        # this obviously requires a format that supports branch references
 
1680
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
1681
        # RBC 20060208
 
1682
        b = self.make_branch(relpath, format=format)
 
1683
        try:
 
1684
            return b.bzrdir.create_workingtree()
 
1685
        except errors.NotLocalUrl:
 
1686
            # We can only make working trees locally at the moment.  If the
 
1687
            # transport can't support them, then reopen the branch on a local
 
1688
            # transport, and create the working tree there.  
 
1689
            #
 
1690
            # Possibly we should instead keep
 
1691
            # the non-disk-backed branch and create a local checkout?
 
1692
            bd = bzrdir.BzrDir.open(relpath)
 
1693
            return bd.create_workingtree()
 
1694
 
 
1695
    def assertIsDirectory(self, relpath, transport):
 
1696
        """Assert that relpath within transport is a directory.
 
1697
 
 
1698
        This may not be possible on all transports; in that case it propagates
 
1699
        a TransportNotPossible.
 
1700
        """
 
1701
        try:
 
1702
            mode = transport.stat(relpath).st_mode
 
1703
        except errors.NoSuchFile:
 
1704
            self.fail("path %s is not a directory; no such file"
 
1705
                      % (relpath))
 
1706
        if not stat.S_ISDIR(mode):
 
1707
            self.fail("path %s is not a directory; has mode %#o"
 
1708
                      % (relpath, mode))
 
1709
 
 
1710
    def setUp(self):
 
1711
        super(TestCaseWithTransport, self).setUp()
 
1712
        self.__server = None
 
1713
 
 
1714
 
 
1715
class ChrootedTestCase(TestCaseWithTransport):
 
1716
    """A support class that provides readonly urls outside the local namespace.
 
1717
 
 
1718
    This is done by checking if self.transport_server is a MemoryServer. if it
 
1719
    is then we are chrooted already, if it is not then an HttpServer is used
 
1720
    for readonly urls.
 
1721
 
 
1722
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
1723
                       be used without needed to redo it when a different 
 
1724
                       subclass is in use ?
 
1725
    """
 
1726
 
 
1727
    def setUp(self):
 
1728
        super(ChrootedTestCase, self).setUp()
 
1729
        if not self.transport_server == MemoryServer:
 
1730
            self.transport_readonly_server = HttpServer
 
1731
 
 
1732
 
 
1733
def filter_suite_by_re(suite, pattern):
 
1734
    result = TestUtil.TestSuite()
 
1735
    filter_re = re.compile(pattern)
 
1736
    for test in iter_suite_tests(suite):
 
1737
        if filter_re.search(test.id()):
 
1738
            result.addTest(test)
 
1739
    return result
 
1740
 
 
1741
 
 
1742
def sort_suite_by_re(suite, pattern):
 
1743
    first = []
 
1744
    second = []
 
1745
    filter_re = re.compile(pattern)
 
1746
    for test in iter_suite_tests(suite):
 
1747
        if filter_re.search(test.id()):
 
1748
            first.append(test)
 
1749
        else:
 
1750
            second.append(test)
 
1751
    return TestUtil.TestSuite(first + second)
 
1752
 
 
1753
 
 
1754
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
1755
              stop_on_failure=False, keep_output=False,
 
1756
              transport=None, lsprof_timed=None, bench_history=None,
 
1757
              matching_tests_first=None,
 
1758
              numbered_dirs=None):
 
1759
    global NUMBERED_DIRS
 
1760
    if numbered_dirs is not None:
 
1761
        NUMBERED_DIRS = bool(numbered_dirs)
 
1762
 
 
1763
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
1764
    if verbose:
 
1765
        verbosity = 2
 
1766
    else:
 
1767
        verbosity = 1
 
1768
    runner = TextTestRunner(stream=sys.stdout,
 
1769
                            descriptions=0,
 
1770
                            verbosity=verbosity,
 
1771
                            keep_output=keep_output,
 
1772
                            bench_history=bench_history)
 
1773
    runner.stop_on_failure=stop_on_failure
 
1774
    if pattern != '.*':
 
1775
        if matching_tests_first:
 
1776
            suite = sort_suite_by_re(suite, pattern)
 
1777
        else:
 
1778
            suite = filter_suite_by_re(suite, pattern)
 
1779
    result = runner.run(suite)
 
1780
    return result.wasSuccessful()
 
1781
 
 
1782
 
 
1783
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
1784
             keep_output=False,
 
1785
             transport=None,
 
1786
             test_suite_factory=None,
 
1787
             lsprof_timed=None,
 
1788
             bench_history=None,
 
1789
             matching_tests_first=None,
 
1790
             numbered_dirs=None):
 
1791
    """Run the whole test suite under the enhanced runner"""
 
1792
    # XXX: Very ugly way to do this...
 
1793
    # Disable warning about old formats because we don't want it to disturb
 
1794
    # any blackbox tests.
 
1795
    from bzrlib import repository
 
1796
    repository._deprecation_warning_done = True
 
1797
 
 
1798
    global default_transport
 
1799
    if transport is None:
 
1800
        transport = default_transport
 
1801
    old_transport = default_transport
 
1802
    default_transport = transport
 
1803
    try:
 
1804
        if test_suite_factory is None:
 
1805
            suite = test_suite()
 
1806
        else:
 
1807
            suite = test_suite_factory()
 
1808
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
1809
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
 
1810
                     transport=transport,
 
1811
                     lsprof_timed=lsprof_timed,
 
1812
                     bench_history=bench_history,
 
1813
                     matching_tests_first=matching_tests_first,
 
1814
                     numbered_dirs=numbered_dirs)
 
1815
    finally:
 
1816
        default_transport = old_transport
 
1817
 
 
1818
 
 
1819
def test_suite():
 
1820
    """Build and return TestSuite for the whole of bzrlib.
 
1821
    
 
1822
    This function can be replaced if you need to change the default test
 
1823
    suite on a global basis, but it is not encouraged.
 
1824
    """
 
1825
    testmod_names = [
 
1826
                   'bzrlib.tests.test_ancestry',
 
1827
                   'bzrlib.tests.test_annotate',
 
1828
                   'bzrlib.tests.test_api',
 
1829
                   'bzrlib.tests.test_atomicfile',
 
1830
                   'bzrlib.tests.test_bad_files',
 
1831
                   'bzrlib.tests.test_branch',
 
1832
                   'bzrlib.tests.test_bundle',
 
1833
                   'bzrlib.tests.test_bzrdir',
 
1834
                   'bzrlib.tests.test_cache_utf8',
 
1835
                   'bzrlib.tests.test_commands',
 
1836
                   'bzrlib.tests.test_commit',
 
1837
                   'bzrlib.tests.test_commit_merge',
 
1838
                   'bzrlib.tests.test_config',
 
1839
                   'bzrlib.tests.test_conflicts',
 
1840
                   'bzrlib.tests.test_decorators',
 
1841
                   'bzrlib.tests.test_delta',
 
1842
                   'bzrlib.tests.test_diff',
 
1843
                   'bzrlib.tests.test_doc_generate',
 
1844
                   'bzrlib.tests.test_errors',
 
1845
                   'bzrlib.tests.test_escaped_store',
 
1846
                   'bzrlib.tests.test_fetch',
 
1847
                   'bzrlib.tests.test_ftp_transport',
 
1848
                   'bzrlib.tests.test_generate_docs',
 
1849
                   'bzrlib.tests.test_generate_ids',
 
1850
                   'bzrlib.tests.test_globbing',
 
1851
                   'bzrlib.tests.test_gpg',
 
1852
                   'bzrlib.tests.test_graph',
 
1853
                   'bzrlib.tests.test_hashcache',
 
1854
                   'bzrlib.tests.test_http',
 
1855
                   'bzrlib.tests.test_http_response',
 
1856
                   'bzrlib.tests.test_https_ca_bundle',
 
1857
                   'bzrlib.tests.test_identitymap',
 
1858
                   'bzrlib.tests.test_ignores',
 
1859
                   'bzrlib.tests.test_inv',
 
1860
                   'bzrlib.tests.test_knit',
 
1861
                   'bzrlib.tests.test_lazy_import',
 
1862
                   'bzrlib.tests.test_lazy_regex',
 
1863
                   'bzrlib.tests.test_lockdir',
 
1864
                   'bzrlib.tests.test_lockable_files',
 
1865
                   'bzrlib.tests.test_log',
 
1866
                   'bzrlib.tests.test_memorytree',
 
1867
                   'bzrlib.tests.test_merge',
 
1868
                   'bzrlib.tests.test_merge3',
 
1869
                   'bzrlib.tests.test_merge_core',
 
1870
                   'bzrlib.tests.test_missing',
 
1871
                   'bzrlib.tests.test_msgeditor',
 
1872
                   'bzrlib.tests.test_nonascii',
 
1873
                   'bzrlib.tests.test_options',
 
1874
                   'bzrlib.tests.test_osutils',
 
1875
                   'bzrlib.tests.test_osutils_encodings',
 
1876
                   'bzrlib.tests.test_patch',
 
1877
                   'bzrlib.tests.test_patches',
 
1878
                   'bzrlib.tests.test_permissions',
 
1879
                   'bzrlib.tests.test_plugins',
 
1880
                   'bzrlib.tests.test_progress',
 
1881
                   'bzrlib.tests.test_reconcile',
 
1882
                   'bzrlib.tests.test_registry',
 
1883
                   'bzrlib.tests.test_repository',
 
1884
                   'bzrlib.tests.test_revert',
 
1885
                   'bzrlib.tests.test_revision',
 
1886
                   'bzrlib.tests.test_revisionnamespaces',
 
1887
                   'bzrlib.tests.test_revisiontree',
 
1888
                   'bzrlib.tests.test_rio',
 
1889
                   'bzrlib.tests.test_sampler',
 
1890
                   'bzrlib.tests.test_selftest',
 
1891
                   'bzrlib.tests.test_setup',
 
1892
                   'bzrlib.tests.test_sftp_transport',
 
1893
                   'bzrlib.tests.test_smart_add',
 
1894
                   'bzrlib.tests.test_smart_transport',
 
1895
                   'bzrlib.tests.test_source',
 
1896
                   'bzrlib.tests.test_status',
 
1897
                   'bzrlib.tests.test_store',
 
1898
                   'bzrlib.tests.test_symbol_versioning',
 
1899
                   'bzrlib.tests.test_tag',
 
1900
                   'bzrlib.tests.test_testament',
 
1901
                   'bzrlib.tests.test_textfile',
 
1902
                   'bzrlib.tests.test_textmerge',
 
1903
                   'bzrlib.tests.test_trace',
 
1904
                   'bzrlib.tests.test_transactions',
 
1905
                   'bzrlib.tests.test_transform',
 
1906
                   'bzrlib.tests.test_transport',
 
1907
                   'bzrlib.tests.test_tree',
 
1908
                   'bzrlib.tests.test_treebuilder',
 
1909
                   'bzrlib.tests.test_tsort',
 
1910
                   'bzrlib.tests.test_tuned_gzip',
 
1911
                   'bzrlib.tests.test_ui',
 
1912
                   'bzrlib.tests.test_upgrade',
 
1913
                   'bzrlib.tests.test_urlutils',
 
1914
                   'bzrlib.tests.test_versionedfile',
 
1915
                   'bzrlib.tests.test_version',
 
1916
                   'bzrlib.tests.test_version_info',
 
1917
                   'bzrlib.tests.test_weave',
 
1918
                   'bzrlib.tests.test_whitebox',
 
1919
                   'bzrlib.tests.test_workingtree',
 
1920
                   'bzrlib.tests.test_wsgi',
 
1921
                   'bzrlib.tests.test_xml',
 
1922
                   ]
 
1923
    test_transport_implementations = [
 
1924
        'bzrlib.tests.test_transport_implementations',
 
1925
        'bzrlib.tests.test_read_bundle',
 
1926
        ]
 
1927
    suite = TestUtil.TestSuite()
 
1928
    loader = TestUtil.TestLoader()
 
1929
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
1930
    from bzrlib.transport import TransportTestProviderAdapter
 
1931
    adapter = TransportTestProviderAdapter()
 
1932
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
1933
    for package in packages_to_test():
 
1934
        suite.addTest(package.test_suite())
 
1935
    for m in MODULES_TO_TEST:
 
1936
        suite.addTest(loader.loadTestsFromModule(m))
 
1937
    for m in MODULES_TO_DOCTEST:
 
1938
        try:
 
1939
            suite.addTest(doctest.DocTestSuite(m))
 
1940
        except ValueError, e:
 
1941
            print '**failed to get doctest for: %s\n%s' %(m,e)
 
1942
            raise
 
1943
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
1944
        if getattr(plugin, 'test_suite', None) is not None:
 
1945
            default_encoding = sys.getdefaultencoding()
 
1946
            try:
 
1947
                plugin_suite = plugin.test_suite()
 
1948
            except ImportError, e:
 
1949
                bzrlib.trace.warning(
 
1950
                    'Unable to test plugin "%s": %s', name, e)
 
1951
            else:
 
1952
                suite.addTest(plugin_suite)
 
1953
            if default_encoding != sys.getdefaultencoding():
 
1954
                bzrlib.trace.warning(
 
1955
                    'Plugin "%s" tried to reset default encoding to: %s', name,
 
1956
                    sys.getdefaultencoding())
 
1957
                reload(sys)
 
1958
                sys.setdefaultencoding(default_encoding)
 
1959
    return suite
 
1960
 
 
1961
 
 
1962
def adapt_modules(mods_list, adapter, loader, suite):
 
1963
    """Adapt the modules in mods_list using adapter and add to suite."""
 
1964
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
 
1965
        suite.addTests(adapter.adapt(test))
 
1966
 
 
1967
 
 
1968
def clean_selftest_output(root=None, quiet=False):
 
1969
    """Remove all selftest output directories from root directory.
 
1970
 
 
1971
    :param  root:   root directory for clean
 
1972
                    (if ommitted or None then clean current directory).
 
1973
    :param  quiet:  suppress report about deleting directories
 
1974
    """
 
1975
    import re
 
1976
    import shutil
 
1977
 
 
1978
    re_dir = re.compile(r'''test\d\d\d\d\.tmp''')
 
1979
    if root is None:
 
1980
        root = u'.'
 
1981
    for i in os.listdir(root):
 
1982
        if os.path.isdir(i) and re_dir.match(i):
 
1983
            if not quiet:
 
1984
                print 'delete directory:', i
 
1985
            shutil.rmtree(i)