/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: John Arbash Meinel
  • Date: 2007-03-06 15:43:47 UTC
  • mto: This revision was merged to the branch mainline in revision 2321.
  • Revision ID: john@arbash-meinel.com-20070306154347-ilhp9u2chs2nxm3b
The parameter is 'conflict_path' not 'conflict_file_path'

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import codecs
 
30
from cStringIO import StringIO
 
31
import difflib
 
32
import doctest
 
33
import errno
 
34
import logging
 
35
import os
 
36
import re
 
37
import shlex
 
38
import stat
 
39
from subprocess import Popen, PIPE
 
40
import sys
 
41
import tempfile
 
42
import unittest
 
43
import time
 
44
 
 
45
 
 
46
from bzrlib import (
 
47
    bzrdir,
 
48
    debug,
 
49
    errors,
 
50
    memorytree,
 
51
    osutils,
 
52
    progress,
 
53
    urlutils,
 
54
    )
 
55
import bzrlib.branch
 
56
import bzrlib.commands
 
57
import bzrlib.bundle.serializer
 
58
import bzrlib.export
 
59
import bzrlib.inventory
 
60
import bzrlib.iterablefile
 
61
import bzrlib.lockdir
 
62
try:
 
63
    import bzrlib.lsprof
 
64
except ImportError:
 
65
    # lsprof not available
 
66
    pass
 
67
from bzrlib.merge import merge_inner
 
68
import bzrlib.merge3
 
69
import bzrlib.osutils
 
70
import bzrlib.plugin
 
71
from bzrlib.revision import common_ancestor
 
72
import bzrlib.store
 
73
from bzrlib import symbol_versioning
 
74
import bzrlib.trace
 
75
from bzrlib.transport import get_transport
 
76
import bzrlib.transport
 
77
from bzrlib.transport.local import LocalURLServer
 
78
from bzrlib.transport.memory import MemoryServer
 
79
from bzrlib.transport.readonly import ReadonlyServer
 
80
from bzrlib.trace import mutter, note
 
81
from bzrlib.tests import TestUtil
 
82
from bzrlib.tests.HttpServer import HttpServer
 
83
from bzrlib.tests.TestUtil import (
 
84
                          TestSuite,
 
85
                          TestLoader,
 
86
                          )
 
87
from bzrlib.tests.treeshape import build_tree_contents
 
88
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
89
 
 
90
default_transport = LocalURLServer
 
91
 
 
92
MODULES_TO_TEST = []
 
93
MODULES_TO_DOCTEST = [
 
94
                      bzrlib.bundle.serializer,
 
95
                      bzrlib.errors,
 
96
                      bzrlib.export,
 
97
                      bzrlib.inventory,
 
98
                      bzrlib.iterablefile,
 
99
                      bzrlib.lockdir,
 
100
                      bzrlib.merge3,
 
101
                      bzrlib.option,
 
102
                      bzrlib.store,
 
103
                      ]
 
104
 
 
105
 
 
106
def packages_to_test():
 
107
    """Return a list of packages to test.
 
108
 
 
109
    The packages are not globally imported so that import failures are
 
110
    triggered when running selftest, not when importing the command.
 
111
    """
 
112
    import bzrlib.doc
 
113
    import bzrlib.tests.blackbox
 
114
    import bzrlib.tests.branch_implementations
 
115
    import bzrlib.tests.bzrdir_implementations
 
116
    import bzrlib.tests.interrepository_implementations
 
117
    import bzrlib.tests.interversionedfile_implementations
 
118
    import bzrlib.tests.intertree_implementations
 
119
    import bzrlib.tests.repository_implementations
 
120
    import bzrlib.tests.revisionstore_implementations
 
121
    import bzrlib.tests.tree_implementations
 
122
    import bzrlib.tests.workingtree_implementations
 
123
    return [
 
124
            bzrlib.doc,
 
125
            bzrlib.tests.blackbox,
 
126
            bzrlib.tests.branch_implementations,
 
127
            bzrlib.tests.bzrdir_implementations,
 
128
            bzrlib.tests.interrepository_implementations,
 
129
            bzrlib.tests.interversionedfile_implementations,
 
130
            bzrlib.tests.intertree_implementations,
 
131
            bzrlib.tests.repository_implementations,
 
132
            bzrlib.tests.revisionstore_implementations,
 
133
            bzrlib.tests.tree_implementations,
 
134
            bzrlib.tests.workingtree_implementations,
 
135
            ]
 
136
 
 
137
 
 
138
class ExtendedTestResult(unittest._TextTestResult):
 
139
    """Accepts, reports and accumulates the results of running tests.
 
140
 
 
141
    Compared to this unittest version this class adds support for profiling,
 
142
    benchmarking, stopping as soon as a test fails,  and skipping tests.
 
143
    There are further-specialized subclasses for different types of display.
 
144
    """
 
145
 
 
146
    stop_early = False
 
147
    
 
148
    def __init__(self, stream, descriptions, verbosity,
 
149
                 bench_history=None,
 
150
                 num_tests=None,
 
151
                 ):
 
152
        """Construct new TestResult.
 
153
 
 
154
        :param bench_history: Optionally, a writable file object to accumulate
 
155
            benchmark results.
 
156
        """
 
157
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
158
        if bench_history is not None:
 
159
            from bzrlib.version import _get_bzr_source_tree
 
160
            src_tree = _get_bzr_source_tree()
 
161
            if src_tree:
 
162
                try:
 
163
                    revision_id = src_tree.get_parent_ids()[0]
 
164
                except IndexError:
 
165
                    # XXX: if this is a brand new tree, do the same as if there
 
166
                    # is no branch.
 
167
                    revision_id = ''
 
168
            else:
 
169
                # XXX: If there's no branch, what should we do?
 
170
                revision_id = ''
 
171
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
172
        self._bench_history = bench_history
 
173
        self.ui = bzrlib.ui.ui_factory
 
174
        self.num_tests = num_tests
 
175
        self.error_count = 0
 
176
        self.failure_count = 0
 
177
        self.skip_count = 0
 
178
        self.count = 0
 
179
        self._overall_start_time = time.time()
 
180
    
 
181
    def extractBenchmarkTime(self, testCase):
 
182
        """Add a benchmark time for the current test case."""
 
183
        self._benchmarkTime = getattr(testCase, "_benchtime", None)
 
184
    
 
185
    def _elapsedTestTimeString(self):
 
186
        """Return a time string for the overall time the current test has taken."""
 
187
        return self._formatTime(time.time() - self._start_time)
 
188
 
 
189
    def _testTimeString(self):
 
190
        if self._benchmarkTime is not None:
 
191
            return "%s/%s" % (
 
192
                self._formatTime(self._benchmarkTime),
 
193
                self._elapsedTestTimeString())
 
194
        else:
 
195
            return "           %s" % self._elapsedTestTimeString()
 
196
 
 
197
    def _formatTime(self, seconds):
 
198
        """Format seconds as milliseconds with leading spaces."""
 
199
        # some benchmarks can take thousands of seconds to run, so we need 8
 
200
        # places
 
201
        return "%8dms" % (1000 * seconds)
 
202
 
 
203
    def _shortened_test_description(self, test):
 
204
        what = test.id()
 
205
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
206
        return what
 
207
 
 
208
    def startTest(self, test):
 
209
        unittest.TestResult.startTest(self, test)
 
210
        self.report_test_start(test)
 
211
        self._recordTestStartTime()
 
212
 
 
213
    def _recordTestStartTime(self):
 
214
        """Record that a test has started."""
 
215
        self._start_time = time.time()
 
216
 
 
217
    def addError(self, test, err):
 
218
        if isinstance(err[1], TestSkipped):
 
219
            return self.addSkipped(test, err)    
 
220
        unittest.TestResult.addError(self, test, err)
 
221
        # We can only do this if we have one of our TestCases, not if
 
222
        # we have a doctest.
 
223
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
224
        if setKeepLogfile is not None:
 
225
            setKeepLogfile()
 
226
        self.extractBenchmarkTime(test)
 
227
        self.report_error(test, err)
 
228
        if self.stop_early:
 
229
            self.stop()
 
230
 
 
231
    def addFailure(self, test, err):
 
232
        unittest.TestResult.addFailure(self, test, err)
 
233
        # We can only do this if we have one of our TestCases, not if
 
234
        # we have a doctest.
 
235
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
236
        if setKeepLogfile is not None:
 
237
            setKeepLogfile()
 
238
        self.extractBenchmarkTime(test)
 
239
        self.report_failure(test, err)
 
240
        if self.stop_early:
 
241
            self.stop()
 
242
 
 
243
    def addSuccess(self, test):
 
244
        self.extractBenchmarkTime(test)
 
245
        if self._bench_history is not None:
 
246
            if self._benchmarkTime is not None:
 
247
                self._bench_history.write("%s %s\n" % (
 
248
                    self._formatTime(self._benchmarkTime),
 
249
                    test.id()))
 
250
        self.report_success(test)
 
251
        unittest.TestResult.addSuccess(self, test)
 
252
 
 
253
    def addSkipped(self, test, skip_excinfo):
 
254
        self.extractBenchmarkTime(test)
 
255
        self.report_skip(test, skip_excinfo)
 
256
        # seems best to treat this as success from point-of-view of unittest
 
257
        # -- it actually does nothing so it barely matters :)
 
258
        try:
 
259
            test.tearDown()
 
260
        except KeyboardInterrupt:
 
261
            raise
 
262
        except:
 
263
            self.addError(test, test.__exc_info())
 
264
        else:
 
265
            unittest.TestResult.addSuccess(self, test)
 
266
 
 
267
    def printErrorList(self, flavour, errors):
 
268
        for test, err in errors:
 
269
            self.stream.writeln(self.separator1)
 
270
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
 
271
            if getattr(test, '_get_log', None) is not None:
 
272
                print >>self.stream
 
273
                print >>self.stream, \
 
274
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
275
                print >>self.stream, test._get_log()
 
276
                print >>self.stream, \
 
277
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
278
            self.stream.writeln(self.separator2)
 
279
            self.stream.writeln("%s" % err)
 
280
 
 
281
    def finished(self):
 
282
        pass
 
283
 
 
284
    def report_cleaning_up(self):
 
285
        pass
 
286
 
 
287
    def report_success(self, test):
 
288
        pass
 
289
 
 
290
 
 
291
class TextTestResult(ExtendedTestResult):
 
292
    """Displays progress and results of tests in text form"""
 
293
 
 
294
    def __init__(self, *args, **kw):
 
295
        ExtendedTestResult.__init__(self, *args, **kw)
 
296
        self.pb = self.ui.nested_progress_bar()
 
297
        self.pb.show_pct = False
 
298
        self.pb.show_spinner = False
 
299
        self.pb.show_eta = False, 
 
300
        self.pb.show_count = False
 
301
        self.pb.show_bar = False
 
302
 
 
303
    def report_starting(self):
 
304
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
305
 
 
306
    def _progress_prefix_text(self):
 
307
        a = '[%d' % self.count
 
308
        if self.num_tests is not None:
 
309
            a +='/%d' % self.num_tests
 
310
        a += ' in %ds' % (time.time() - self._overall_start_time)
 
311
        if self.error_count:
 
312
            a += ', %d errors' % self.error_count
 
313
        if self.failure_count:
 
314
            a += ', %d failed' % self.failure_count
 
315
        if self.skip_count:
 
316
            a += ', %d skipped' % self.skip_count
 
317
        a += ']'
 
318
        return a
 
319
 
 
320
    def report_test_start(self, test):
 
321
        self.count += 1
 
322
        self.pb.update(
 
323
                self._progress_prefix_text()
 
324
                + ' ' 
 
325
                + self._shortened_test_description(test))
 
326
 
 
327
    def report_error(self, test, err):
 
328
        self.error_count += 1
 
329
        self.pb.note('ERROR: %s\n    %s\n', 
 
330
            self._shortened_test_description(test),
 
331
            err[1],
 
332
            )
 
333
 
 
334
    def report_failure(self, test, err):
 
335
        self.failure_count += 1
 
336
        self.pb.note('FAIL: %s\n    %s\n', 
 
337
            self._shortened_test_description(test),
 
338
            err[1],
 
339
            )
 
340
 
 
341
    def report_skip(self, test, skip_excinfo):
 
342
        self.skip_count += 1
 
343
        if False:
 
344
            # at the moment these are mostly not things we can fix
 
345
            # and so they just produce stipple; use the verbose reporter
 
346
            # to see them.
 
347
            if False:
 
348
                # show test and reason for skip
 
349
                self.pb.note('SKIP: %s\n    %s\n', 
 
350
                    self._shortened_test_description(test),
 
351
                    skip_excinfo[1])
 
352
            else:
 
353
                # since the class name was left behind in the still-visible
 
354
                # progress bar...
 
355
                self.pb.note('SKIP: %s', skip_excinfo[1])
 
356
 
 
357
    def report_cleaning_up(self):
 
358
        self.pb.update('cleaning up...')
 
359
 
 
360
    def finished(self):
 
361
        self.pb.finished()
 
362
 
 
363
 
 
364
class VerboseTestResult(ExtendedTestResult):
 
365
    """Produce long output, with one line per test run plus times"""
 
366
 
 
367
    def _ellipsize_to_right(self, a_string, final_width):
 
368
        """Truncate and pad a string, keeping the right hand side"""
 
369
        if len(a_string) > final_width:
 
370
            result = '...' + a_string[3-final_width:]
 
371
        else:
 
372
            result = a_string
 
373
        return result.ljust(final_width)
 
374
 
 
375
    def report_starting(self):
 
376
        self.stream.write('running %d tests...\n' % self.num_tests)
 
377
 
 
378
    def report_test_start(self, test):
 
379
        self.count += 1
 
380
        name = self._shortened_test_description(test)
 
381
        # width needs space for 6 char status, plus 1 for slash, plus 2 10-char
 
382
        # numbers, plus a trailing blank
 
383
        self.stream.write(self._ellipsize_to_right(name,
 
384
                            osutils.terminal_width()-30))
 
385
        self.stream.flush()
 
386
 
 
387
    def report_error(self, test, err):
 
388
        self.error_count += 1
 
389
        self.stream.writeln('ERROR %s\n    %s' 
 
390
                % (self._testTimeString(), err[1]))
 
391
 
 
392
    def report_failure(self, test, err):
 
393
        self.failure_count += 1
 
394
        self.stream.writeln(' FAIL %s\n    %s'
 
395
                % (self._testTimeString(), err[1]))
 
396
 
 
397
    def report_success(self, test):
 
398
        self.stream.writeln('   OK %s' % self._testTimeString())
 
399
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
400
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
401
            stats.pprint(file=self.stream)
 
402
        self.stream.flush()
 
403
 
 
404
    def report_skip(self, test, skip_excinfo):
 
405
        print >>self.stream, ' SKIP %s' % self._testTimeString()
 
406
        print >>self.stream, '     %s' % skip_excinfo[1]
 
407
 
 
408
 
 
409
class TextTestRunner(object):
 
410
    stop_on_failure = False
 
411
 
 
412
    def __init__(self,
 
413
                 stream=sys.stderr,
 
414
                 descriptions=0,
 
415
                 verbosity=1,
 
416
                 keep_output=False,
 
417
                 bench_history=None):
 
418
        self.stream = unittest._WritelnDecorator(stream)
 
419
        self.descriptions = descriptions
 
420
        self.verbosity = verbosity
 
421
        self.keep_output = keep_output
 
422
        self._bench_history = bench_history
 
423
 
 
424
    def run(self, test):
 
425
        "Run the given test case or test suite."
 
426
        startTime = time.time()
 
427
        if self.verbosity == 1:
 
428
            result_class = TextTestResult
 
429
        elif self.verbosity >= 2:
 
430
            result_class = VerboseTestResult
 
431
        result = result_class(self.stream,
 
432
                              self.descriptions,
 
433
                              self.verbosity,
 
434
                              bench_history=self._bench_history,
 
435
                              num_tests=test.countTestCases(),
 
436
                              )
 
437
        result.stop_early = self.stop_on_failure
 
438
        result.report_starting()
 
439
        test.run(result)
 
440
        stopTime = time.time()
 
441
        timeTaken = stopTime - startTime
 
442
        result.printErrors()
 
443
        self.stream.writeln(result.separator2)
 
444
        run = result.testsRun
 
445
        self.stream.writeln("Ran %d test%s in %.3fs" %
 
446
                            (run, run != 1 and "s" or "", timeTaken))
 
447
        self.stream.writeln()
 
448
        if not result.wasSuccessful():
 
449
            self.stream.write("FAILED (")
 
450
            failed, errored = map(len, (result.failures, result.errors))
 
451
            if failed:
 
452
                self.stream.write("failures=%d" % failed)
 
453
            if errored:
 
454
                if failed: self.stream.write(", ")
 
455
                self.stream.write("errors=%d" % errored)
 
456
            self.stream.writeln(")")
 
457
        else:
 
458
            self.stream.writeln("OK")
 
459
        result.report_cleaning_up()
 
460
        # This is still a little bogus, 
 
461
        # but only a little. Folk not using our testrunner will
 
462
        # have to delete their temp directories themselves.
 
463
        test_root = TestCaseWithMemoryTransport.TEST_ROOT
 
464
        if result.wasSuccessful() or not self.keep_output:
 
465
            if test_root is not None:
 
466
                # If LANG=C we probably have created some bogus paths
 
467
                # which rmtree(unicode) will fail to delete
 
468
                # so make sure we are using rmtree(str) to delete everything
 
469
                # except on win32, where rmtree(str) will fail
 
470
                # since it doesn't have the property of byte-stream paths
 
471
                # (they are either ascii or mbcs)
 
472
                if sys.platform == 'win32':
 
473
                    # make sure we are using the unicode win32 api
 
474
                    test_root = unicode(test_root)
 
475
                else:
 
476
                    test_root = test_root.encode(
 
477
                        sys.getfilesystemencoding())
 
478
                try:
 
479
                    osutils.rmtree(test_root)
 
480
                except OSError, e:
 
481
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
482
                        print >>sys.stderr, ('Permission denied: '
 
483
                                             'unable to remove testing dir '
 
484
                                             '%s' % os.path.basename(test_root))
 
485
                    else:
 
486
                        raise
 
487
        else:
 
488
            note("Failed tests working directories are in '%s'\n", test_root)
 
489
        TestCaseWithMemoryTransport.TEST_ROOT = None
 
490
        result.finished()
 
491
        return result
 
492
 
 
493
 
 
494
def iter_suite_tests(suite):
 
495
    """Return all tests in a suite, recursing through nested suites"""
 
496
    for item in suite._tests:
 
497
        if isinstance(item, unittest.TestCase):
 
498
            yield item
 
499
        elif isinstance(item, unittest.TestSuite):
 
500
            for r in iter_suite_tests(item):
 
501
                yield r
 
502
        else:
 
503
            raise Exception('unknown object %r inside test suite %r'
 
504
                            % (item, suite))
 
505
 
 
506
 
 
507
class TestSkipped(Exception):
 
508
    """Indicates that a test was intentionally skipped, rather than failing."""
 
509
 
 
510
 
 
511
class CommandFailed(Exception):
 
512
    pass
 
513
 
 
514
 
 
515
class StringIOWrapper(object):
 
516
    """A wrapper around cStringIO which just adds an encoding attribute.
 
517
    
 
518
    Internally we can check sys.stdout to see what the output encoding
 
519
    should be. However, cStringIO has no encoding attribute that we can
 
520
    set. So we wrap it instead.
 
521
    """
 
522
    encoding='ascii'
 
523
    _cstring = None
 
524
 
 
525
    def __init__(self, s=None):
 
526
        if s is not None:
 
527
            self.__dict__['_cstring'] = StringIO(s)
 
528
        else:
 
529
            self.__dict__['_cstring'] = StringIO()
 
530
 
 
531
    def __getattr__(self, name, getattr=getattr):
 
532
        return getattr(self.__dict__['_cstring'], name)
 
533
 
 
534
    def __setattr__(self, name, val):
 
535
        if name == 'encoding':
 
536
            self.__dict__['encoding'] = val
 
537
        else:
 
538
            return setattr(self._cstring, name, val)
 
539
 
 
540
 
 
541
class TestCase(unittest.TestCase):
 
542
    """Base class for bzr unit tests.
 
543
    
 
544
    Tests that need access to disk resources should subclass 
 
545
    TestCaseInTempDir not TestCase.
 
546
 
 
547
    Error and debug log messages are redirected from their usual
 
548
    location into a temporary file, the contents of which can be
 
549
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
550
    so that it can also capture file IO.  When the test completes this file
 
551
    is read into memory and removed from disk.
 
552
       
 
553
    There are also convenience functions to invoke bzr's command-line
 
554
    routine, and to build and check bzr trees.
 
555
   
 
556
    In addition to the usual method of overriding tearDown(), this class also
 
557
    allows subclasses to register functions into the _cleanups list, which is
 
558
    run in order as the object is torn down.  It's less likely this will be
 
559
    accidentally overlooked.
 
560
    """
 
561
 
 
562
    _log_file_name = None
 
563
    _log_contents = ''
 
564
    _keep_log_file = False
 
565
    # record lsprof data when performing benchmark calls.
 
566
    _gather_lsprof_in_benchmarks = False
 
567
 
 
568
    def __init__(self, methodName='testMethod'):
 
569
        super(TestCase, self).__init__(methodName)
 
570
        self._cleanups = []
 
571
 
 
572
    def setUp(self):
 
573
        unittest.TestCase.setUp(self)
 
574
        self._cleanEnvironment()
 
575
        bzrlib.trace.disable_default_logging()
 
576
        self._silenceUI()
 
577
        self._startLogFile()
 
578
        self._benchcalls = []
 
579
        self._benchtime = None
 
580
        # prevent hooks affecting tests
 
581
        self._preserved_hooks = bzrlib.branch.Branch.hooks
 
582
        self.addCleanup(self._restoreHooks)
 
583
        # this list of hooks must be kept in sync with the defaults
 
584
        # in branch.py
 
585
        bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
 
586
 
 
587
    def _silenceUI(self):
 
588
        """Turn off UI for duration of test"""
 
589
        # by default the UI is off; tests can turn it on if they want it.
 
590
        saved = bzrlib.ui.ui_factory
 
591
        def _restore():
 
592
            bzrlib.ui.ui_factory = saved
 
593
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
 
594
        self.addCleanup(_restore)
 
595
 
 
596
    def _ndiff_strings(self, a, b):
 
597
        """Return ndiff between two strings containing lines.
 
598
        
 
599
        A trailing newline is added if missing to make the strings
 
600
        print properly."""
 
601
        if b and b[-1] != '\n':
 
602
            b += '\n'
 
603
        if a and a[-1] != '\n':
 
604
            a += '\n'
 
605
        difflines = difflib.ndiff(a.splitlines(True),
 
606
                                  b.splitlines(True),
 
607
                                  linejunk=lambda x: False,
 
608
                                  charjunk=lambda x: False)
 
609
        return ''.join(difflines)
 
610
 
 
611
    def assertEqualDiff(self, a, b, message=None):
 
612
        """Assert two texts are equal, if not raise an exception.
 
613
        
 
614
        This is intended for use with multi-line strings where it can 
 
615
        be hard to find the differences by eye.
 
616
        """
 
617
        # TODO: perhaps override assertEquals to call this for strings?
 
618
        if a == b:
 
619
            return
 
620
        if message is None:
 
621
            message = "texts not equal:\n"
 
622
        raise AssertionError(message + 
 
623
                             self._ndiff_strings(a, b))      
 
624
        
 
625
    def assertEqualMode(self, mode, mode_test):
 
626
        self.assertEqual(mode, mode_test,
 
627
                         'mode mismatch %o != %o' % (mode, mode_test))
 
628
 
 
629
    def assertStartsWith(self, s, prefix):
 
630
        if not s.startswith(prefix):
 
631
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
632
 
 
633
    def assertEndsWith(self, s, suffix):
 
634
        """Asserts that s ends with suffix."""
 
635
        if not s.endswith(suffix):
 
636
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
637
 
 
638
    def assertContainsRe(self, haystack, needle_re):
 
639
        """Assert that a contains something matching a regular expression."""
 
640
        if not re.search(needle_re, haystack):
 
641
            raise AssertionError('pattern "%s" not found in "%s"'
 
642
                    % (needle_re, haystack))
 
643
 
 
644
    def assertNotContainsRe(self, haystack, needle_re):
 
645
        """Assert that a does not match a regular expression"""
 
646
        if re.search(needle_re, haystack):
 
647
            raise AssertionError('pattern "%s" found in "%s"'
 
648
                    % (needle_re, haystack))
 
649
 
 
650
    def assertSubset(self, sublist, superlist):
 
651
        """Assert that every entry in sublist is present in superlist."""
 
652
        missing = []
 
653
        for entry in sublist:
 
654
            if entry not in superlist:
 
655
                missing.append(entry)
 
656
        if len(missing) > 0:
 
657
            raise AssertionError("value(s) %r not present in container %r" % 
 
658
                                 (missing, superlist))
 
659
 
 
660
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
661
        """Fail unless excClass is raised when the iterator from func is used.
 
662
        
 
663
        Many functions can return generators this makes sure
 
664
        to wrap them in a list() call to make sure the whole generator
 
665
        is run, and that the proper exception is raised.
 
666
        """
 
667
        try:
 
668
            list(func(*args, **kwargs))
 
669
        except excClass:
 
670
            return
 
671
        else:
 
672
            if getattr(excClass,'__name__', None) is not None:
 
673
                excName = excClass.__name__
 
674
            else:
 
675
                excName = str(excClass)
 
676
            raise self.failureException, "%s not raised" % excName
 
677
 
 
678
    def assertIs(self, left, right, message=None):
 
679
        if not (left is right):
 
680
            if message is not None:
 
681
                raise AssertionError(message)
 
682
            else:
 
683
                raise AssertionError("%r is not %r." % (left, right))
 
684
 
 
685
    def assertIsNot(self, left, right, message=None):
 
686
        if (left is right):
 
687
            if message is not None:
 
688
                raise AssertionError(message)
 
689
            else:
 
690
                raise AssertionError("%r is %r." % (left, right))
 
691
 
 
692
    def assertTransportMode(self, transport, path, mode):
 
693
        """Fail if a path does not have mode mode.
 
694
        
 
695
        If modes are not supported on this transport, the assertion is ignored.
 
696
        """
 
697
        if not transport._can_roundtrip_unix_modebits():
 
698
            return
 
699
        path_stat = transport.stat(path)
 
700
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
701
        self.assertEqual(mode, actual_mode,
 
702
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
703
 
 
704
    def assertIsInstance(self, obj, kls):
 
705
        """Fail if obj is not an instance of kls"""
 
706
        if not isinstance(obj, kls):
 
707
            self.fail("%r is an instance of %s rather than %s" % (
 
708
                obj, obj.__class__, kls))
 
709
 
 
710
    def _capture_warnings(self, a_callable, *args, **kwargs):
 
711
        """A helper for callDeprecated and applyDeprecated.
 
712
 
 
713
        :param a_callable: A callable to call.
 
714
        :param args: The positional arguments for the callable
 
715
        :param kwargs: The keyword arguments for the callable
 
716
        :return: A tuple (warnings, result). result is the result of calling
 
717
            a_callable(*args, **kwargs).
 
718
        """
 
719
        local_warnings = []
 
720
        def capture_warnings(msg, cls=None, stacklevel=None):
 
721
            # we've hooked into a deprecation specific callpath,
 
722
            # only deprecations should getting sent via it.
 
723
            self.assertEqual(cls, DeprecationWarning)
 
724
            local_warnings.append(msg)
 
725
        original_warning_method = symbol_versioning.warn
 
726
        symbol_versioning.set_warning_method(capture_warnings)
 
727
        try:
 
728
            result = a_callable(*args, **kwargs)
 
729
        finally:
 
730
            symbol_versioning.set_warning_method(original_warning_method)
 
731
        return (local_warnings, result)
 
732
 
 
733
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
734
        """Call a deprecated callable without warning the user.
 
735
 
 
736
        :param deprecation_format: The deprecation format that the callable
 
737
            should have been deprecated with. This is the same type as the 
 
738
            parameter to deprecated_method/deprecated_function. If the 
 
739
            callable is not deprecated with this format, an assertion error
 
740
            will be raised.
 
741
        :param a_callable: A callable to call. This may be a bound method or
 
742
            a regular function. It will be called with *args and **kwargs.
 
743
        :param args: The positional arguments for the callable
 
744
        :param kwargs: The keyword arguments for the callable
 
745
        :return: The result of a_callable(*args, **kwargs)
 
746
        """
 
747
        call_warnings, result = self._capture_warnings(a_callable,
 
748
            *args, **kwargs)
 
749
        expected_first_warning = symbol_versioning.deprecation_string(
 
750
            a_callable, deprecation_format)
 
751
        if len(call_warnings) == 0:
 
752
            self.fail("No assertion generated by call to %s" %
 
753
                a_callable)
 
754
        self.assertEqual(expected_first_warning, call_warnings[0])
 
755
        return result
 
756
 
 
757
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
758
        """Assert that a callable is deprecated in a particular way.
 
759
 
 
760
        This is a very precise test for unusual requirements. The 
 
761
        applyDeprecated helper function is probably more suited for most tests
 
762
        as it allows you to simply specify the deprecation format being used
 
763
        and will ensure that that is issued for the function being called.
 
764
 
 
765
        :param expected: a list of the deprecation warnings expected, in order
 
766
        :param callable: The callable to call
 
767
        :param args: The positional arguments for the callable
 
768
        :param kwargs: The keyword arguments for the callable
 
769
        """
 
770
        call_warnings, result = self._capture_warnings(callable,
 
771
            *args, **kwargs)
 
772
        self.assertEqual(expected, call_warnings)
 
773
        return result
 
774
 
 
775
    def _startLogFile(self):
 
776
        """Send bzr and test log messages to a temporary file.
 
777
 
 
778
        The file is removed as the test is torn down.
 
779
        """
 
780
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
781
        self._log_file = os.fdopen(fileno, 'w+')
 
782
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
783
        self._log_file_name = name
 
784
        self.addCleanup(self._finishLogFile)
 
785
 
 
786
    def _finishLogFile(self):
 
787
        """Finished with the log file.
 
788
 
 
789
        Close the file and delete it, unless setKeepLogfile was called.
 
790
        """
 
791
        if self._log_file is None:
 
792
            return
 
793
        bzrlib.trace.disable_test_log(self._log_nonce)
 
794
        self._log_file.close()
 
795
        self._log_file = None
 
796
        if not self._keep_log_file:
 
797
            os.remove(self._log_file_name)
 
798
            self._log_file_name = None
 
799
 
 
800
    def setKeepLogfile(self):
 
801
        """Make the logfile not be deleted when _finishLogFile is called."""
 
802
        self._keep_log_file = True
 
803
 
 
804
    def addCleanup(self, callable):
 
805
        """Arrange to run a callable when this case is torn down.
 
806
 
 
807
        Callables are run in the reverse of the order they are registered, 
 
808
        ie last-in first-out.
 
809
        """
 
810
        if callable in self._cleanups:
 
811
            raise ValueError("cleanup function %r already registered on %s" 
 
812
                    % (callable, self))
 
813
        self._cleanups.append(callable)
 
814
 
 
815
    def _cleanEnvironment(self):
 
816
        new_env = {
 
817
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
818
            'HOME': os.getcwd(),
 
819
            'APPDATA': None,  # bzr now use Win32 API and don't rely on APPDATA
 
820
            'BZR_EMAIL': None,
 
821
            'BZREMAIL': None, # may still be present in the environment
 
822
            'EMAIL': None,
 
823
            'BZR_PROGRESS_BAR': None,
 
824
            # Proxies
 
825
            'http_proxy': None,
 
826
            'HTTP_PROXY': None,
 
827
            'https_proxy': None,
 
828
            'HTTPS_PROXY': None,
 
829
            'no_proxy': None,
 
830
            'NO_PROXY': None,
 
831
            'all_proxy': None,
 
832
            'ALL_PROXY': None,
 
833
            # Nobody cares about these ones AFAIK. So far at
 
834
            # least. If you do (care), please update this comment
 
835
            # -- vila 20061212
 
836
            'ftp_proxy': None,
 
837
            'FTP_PROXY': None,
 
838
        }
 
839
        self.__old_env = {}
 
840
        self.addCleanup(self._restoreEnvironment)
 
841
        for name, value in new_env.iteritems():
 
842
            self._captureVar(name, value)
 
843
 
 
844
    def _captureVar(self, name, newvalue):
 
845
        """Set an environment variable, and reset it when finished."""
 
846
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
847
 
 
848
    def _restoreEnvironment(self):
 
849
        for name, value in self.__old_env.iteritems():
 
850
            osutils.set_or_unset_env(name, value)
 
851
 
 
852
    def _restoreHooks(self):
 
853
        bzrlib.branch.Branch.hooks = self._preserved_hooks
 
854
 
 
855
    def tearDown(self):
 
856
        self._runCleanups()
 
857
        unittest.TestCase.tearDown(self)
 
858
 
 
859
    def time(self, callable, *args, **kwargs):
 
860
        """Run callable and accrue the time it takes to the benchmark time.
 
861
        
 
862
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
863
        this will cause lsprofile statistics to be gathered and stored in
 
864
        self._benchcalls.
 
865
        """
 
866
        if self._benchtime is None:
 
867
            self._benchtime = 0
 
868
        start = time.time()
 
869
        try:
 
870
            if not self._gather_lsprof_in_benchmarks:
 
871
                return callable(*args, **kwargs)
 
872
            else:
 
873
                # record this benchmark
 
874
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
875
                stats.sort()
 
876
                self._benchcalls.append(((callable, args, kwargs), stats))
 
877
                return ret
 
878
        finally:
 
879
            self._benchtime += time.time() - start
 
880
 
 
881
    def _runCleanups(self):
 
882
        """Run registered cleanup functions. 
 
883
 
 
884
        This should only be called from TestCase.tearDown.
 
885
        """
 
886
        # TODO: Perhaps this should keep running cleanups even if 
 
887
        # one of them fails?
 
888
        for cleanup_fn in reversed(self._cleanups):
 
889
            cleanup_fn()
 
890
 
 
891
    def log(self, *args):
 
892
        mutter(*args)
 
893
 
 
894
    def _get_log(self, keep_log_file=False):
 
895
        """Return as a string the log for this test. If the file is still
 
896
        on disk and keep_log_file=False, delete the log file and store the
 
897
        content in self._log_contents."""
 
898
        # flush the log file, to get all content
 
899
        import bzrlib.trace
 
900
        bzrlib.trace._trace_file.flush()
 
901
        if self._log_contents:
 
902
            return self._log_contents
 
903
        if self._log_file_name is not None:
 
904
            logfile = open(self._log_file_name)
 
905
            try:
 
906
                log_contents = logfile.read()
 
907
            finally:
 
908
                logfile.close()
 
909
            if not keep_log_file:
 
910
                self._log_contents = log_contents
 
911
                try:
 
912
                    os.remove(self._log_file_name)
 
913
                except OSError, e:
 
914
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
915
                        print >>sys.stderr, ('Unable to delete log file '
 
916
                                             ' %r' % self._log_file_name)
 
917
                    else:
 
918
                        raise
 
919
            return log_contents
 
920
        else:
 
921
            return "DELETED log file to reduce memory footprint"
 
922
 
 
923
    def capture(self, cmd, retcode=0):
 
924
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
925
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
926
 
 
927
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
 
928
                         working_dir=None):
 
929
        """Invoke bzr and return (stdout, stderr).
 
930
 
 
931
        Useful for code that wants to check the contents of the
 
932
        output, the way error messages are presented, etc.
 
933
 
 
934
        This should be the main method for tests that want to exercise the
 
935
        overall behavior of the bzr application (rather than a unit test
 
936
        or a functional test of the library.)
 
937
 
 
938
        Much of the old code runs bzr by forking a new copy of Python, but
 
939
        that is slower, harder to debug, and generally not necessary.
 
940
 
 
941
        This runs bzr through the interface that catches and reports
 
942
        errors, and with logging set to something approximating the
 
943
        default, so that error reporting can be checked.
 
944
 
 
945
        :param argv: arguments to invoke bzr
 
946
        :param retcode: expected return code, or None for don't-care.
 
947
        :param encoding: encoding for sys.stdout and sys.stderr
 
948
        :param stdin: A string to be used as stdin for the command.
 
949
        :param working_dir: Change to this directory before running
 
950
        """
 
951
        if encoding is None:
 
952
            encoding = bzrlib.user_encoding
 
953
        if stdin is not None:
 
954
            stdin = StringIO(stdin)
 
955
        stdout = StringIOWrapper()
 
956
        stderr = StringIOWrapper()
 
957
        stdout.encoding = encoding
 
958
        stderr.encoding = encoding
 
959
 
 
960
        self.log('run bzr: %r', argv)
 
961
        # FIXME: don't call into logging here
 
962
        handler = logging.StreamHandler(stderr)
 
963
        handler.setLevel(logging.INFO)
 
964
        logger = logging.getLogger('')
 
965
        logger.addHandler(handler)
 
966
        old_ui_factory = bzrlib.ui.ui_factory
 
967
        bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
 
968
            stdout=stdout,
 
969
            stderr=stderr)
 
970
        bzrlib.ui.ui_factory.stdin = stdin
 
971
 
 
972
        cwd = None
 
973
        if working_dir is not None:
 
974
            cwd = osutils.getcwd()
 
975
            os.chdir(working_dir)
 
976
 
 
977
        try:
 
978
            saved_debug_flags = frozenset(debug.debug_flags)
 
979
            debug.debug_flags.clear()
 
980
            try:
 
981
                result = self.apply_redirected(stdin, stdout, stderr,
 
982
                                               bzrlib.commands.run_bzr_catch_errors,
 
983
                                               argv)
 
984
            finally:
 
985
                debug.debug_flags.update(saved_debug_flags)
 
986
        finally:
 
987
            logger.removeHandler(handler)
 
988
            bzrlib.ui.ui_factory = old_ui_factory
 
989
            if cwd is not None:
 
990
                os.chdir(cwd)
 
991
 
 
992
        out = stdout.getvalue()
 
993
        err = stderr.getvalue()
 
994
        if out:
 
995
            self.log('output:\n%r', out)
 
996
        if err:
 
997
            self.log('errors:\n%r', err)
 
998
        if retcode is not None:
 
999
            self.assertEquals(retcode, result)
 
1000
        return out, err
 
1001
 
 
1002
    def run_bzr(self, *args, **kwargs):
 
1003
        """Invoke bzr, as if it were run from the command line.
 
1004
 
 
1005
        This should be the main method for tests that want to exercise the
 
1006
        overall behavior of the bzr application (rather than a unit test
 
1007
        or a functional test of the library.)
 
1008
 
 
1009
        This sends the stdout/stderr results into the test's log,
 
1010
        where it may be useful for debugging.  See also run_captured.
 
1011
 
 
1012
        :param stdin: A string to be used as stdin for the command.
 
1013
        """
 
1014
        retcode = kwargs.pop('retcode', 0)
 
1015
        encoding = kwargs.pop('encoding', None)
 
1016
        stdin = kwargs.pop('stdin', None)
 
1017
        working_dir = kwargs.pop('working_dir', None)
 
1018
        return self.run_bzr_captured(args, retcode=retcode, encoding=encoding,
 
1019
                                     stdin=stdin, working_dir=working_dir)
 
1020
 
 
1021
    def run_bzr_decode(self, *args, **kwargs):
 
1022
        if 'encoding' in kwargs:
 
1023
            encoding = kwargs['encoding']
 
1024
        else:
 
1025
            encoding = bzrlib.user_encoding
 
1026
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
 
1027
 
 
1028
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1029
        """Run bzr, and check that stderr contains the supplied regexes
 
1030
        
 
1031
        :param error_regexes: Sequence of regular expressions which 
 
1032
            must each be found in the error output. The relative ordering
 
1033
            is not enforced.
 
1034
        :param args: command-line arguments for bzr
 
1035
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1036
            This function changes the default value of retcode to be 3,
 
1037
            since in most cases this is run when you expect bzr to fail.
 
1038
        :return: (out, err) The actual output of running the command (in case you
 
1039
                 want to do more inspection)
 
1040
 
 
1041
        Examples of use:
 
1042
            # Make sure that commit is failing because there is nothing to do
 
1043
            self.run_bzr_error(['no changes to commit'],
 
1044
                               'commit', '-m', 'my commit comment')
 
1045
            # Make sure --strict is handling an unknown file, rather than
 
1046
            # giving us the 'nothing to do' error
 
1047
            self.build_tree(['unknown'])
 
1048
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1049
                               'commit', '--strict', '-m', 'my commit comment')
 
1050
        """
 
1051
        kwargs.setdefault('retcode', 3)
 
1052
        out, err = self.run_bzr(*args, **kwargs)
 
1053
        for regex in error_regexes:
 
1054
            self.assertContainsRe(err, regex)
 
1055
        return out, err
 
1056
 
 
1057
    def run_bzr_subprocess(self, *args, **kwargs):
 
1058
        """Run bzr in a subprocess for testing.
 
1059
 
 
1060
        This starts a new Python interpreter and runs bzr in there. 
 
1061
        This should only be used for tests that have a justifiable need for
 
1062
        this isolation: e.g. they are testing startup time, or signal
 
1063
        handling, or early startup code, etc.  Subprocess code can't be 
 
1064
        profiled or debugged so easily.
 
1065
 
 
1066
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1067
            None is supplied, the status code is not checked.
 
1068
        :param env_changes: A dictionary which lists changes to environment
 
1069
            variables. A value of None will unset the env variable.
 
1070
            The values must be strings. The change will only occur in the
 
1071
            child, so you don't need to fix the environment after running.
 
1072
        :param universal_newlines: Convert CRLF => LF
 
1073
        :param allow_plugins: By default the subprocess is run with
 
1074
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1075
            for system-wide plugins to create unexpected output on stderr,
 
1076
            which can cause unnecessary test failures.
 
1077
        """
 
1078
        env_changes = kwargs.get('env_changes', {})
 
1079
        working_dir = kwargs.get('working_dir', None)
 
1080
        allow_plugins = kwargs.get('allow_plugins', False)
 
1081
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1082
                                            working_dir=working_dir,
 
1083
                                            allow_plugins=allow_plugins)
 
1084
        # We distinguish between retcode=None and retcode not passed.
 
1085
        supplied_retcode = kwargs.get('retcode', 0)
 
1086
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1087
            universal_newlines=kwargs.get('universal_newlines', False),
 
1088
            process_args=args)
 
1089
 
 
1090
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1091
                             skip_if_plan_to_signal=False,
 
1092
                             working_dir=None,
 
1093
                             allow_plugins=False):
 
1094
        """Start bzr in a subprocess for testing.
 
1095
 
 
1096
        This starts a new Python interpreter and runs bzr in there.
 
1097
        This should only be used for tests that have a justifiable need for
 
1098
        this isolation: e.g. they are testing startup time, or signal
 
1099
        handling, or early startup code, etc.  Subprocess code can't be
 
1100
        profiled or debugged so easily.
 
1101
 
 
1102
        :param process_args: a list of arguments to pass to the bzr executable,
 
1103
            for example `['--version']`.
 
1104
        :param env_changes: A dictionary which lists changes to environment
 
1105
            variables. A value of None will unset the env variable.
 
1106
            The values must be strings. The change will only occur in the
 
1107
            child, so you don't need to fix the environment after running.
 
1108
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1109
            is not available.
 
1110
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1111
 
 
1112
        :returns: Popen object for the started process.
 
1113
        """
 
1114
        if skip_if_plan_to_signal:
 
1115
            if not getattr(os, 'kill', None):
 
1116
                raise TestSkipped("os.kill not available.")
 
1117
 
 
1118
        if env_changes is None:
 
1119
            env_changes = {}
 
1120
        old_env = {}
 
1121
 
 
1122
        def cleanup_environment():
 
1123
            for env_var, value in env_changes.iteritems():
 
1124
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1125
 
 
1126
        def restore_environment():
 
1127
            for env_var, value in old_env.iteritems():
 
1128
                osutils.set_or_unset_env(env_var, value)
 
1129
 
 
1130
        bzr_path = self.get_bzr_path()
 
1131
 
 
1132
        cwd = None
 
1133
        if working_dir is not None:
 
1134
            cwd = osutils.getcwd()
 
1135
            os.chdir(working_dir)
 
1136
 
 
1137
        try:
 
1138
            # win32 subprocess doesn't support preexec_fn
 
1139
            # so we will avoid using it on all platforms, just to
 
1140
            # make sure the code path is used, and we don't break on win32
 
1141
            cleanup_environment()
 
1142
            command = [sys.executable, bzr_path]
 
1143
            if not allow_plugins:
 
1144
                command.append('--no-plugins')
 
1145
            command.extend(process_args)
 
1146
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1147
        finally:
 
1148
            restore_environment()
 
1149
            if cwd is not None:
 
1150
                os.chdir(cwd)
 
1151
 
 
1152
        return process
 
1153
 
 
1154
    def _popen(self, *args, **kwargs):
 
1155
        """Place a call to Popen.
 
1156
 
 
1157
        Allows tests to override this method to intercept the calls made to
 
1158
        Popen for introspection.
 
1159
        """
 
1160
        return Popen(*args, **kwargs)
 
1161
 
 
1162
    def get_bzr_path(self):
 
1163
        """Return the path of the 'bzr' executable for this test suite."""
 
1164
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1165
        if not os.path.isfile(bzr_path):
 
1166
            # We are probably installed. Assume sys.argv is the right file
 
1167
            bzr_path = sys.argv[0]
 
1168
        return bzr_path
 
1169
 
 
1170
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1171
                              universal_newlines=False, process_args=None):
 
1172
        """Finish the execution of process.
 
1173
 
 
1174
        :param process: the Popen object returned from start_bzr_subprocess.
 
1175
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1176
            None is supplied, the status code is not checked.
 
1177
        :param send_signal: an optional signal to send to the process.
 
1178
        :param universal_newlines: Convert CRLF => LF
 
1179
        :returns: (stdout, stderr)
 
1180
        """
 
1181
        if send_signal is not None:
 
1182
            os.kill(process.pid, send_signal)
 
1183
        out, err = process.communicate()
 
1184
 
 
1185
        if universal_newlines:
 
1186
            out = out.replace('\r\n', '\n')
 
1187
            err = err.replace('\r\n', '\n')
 
1188
 
 
1189
        if retcode is not None and retcode != process.returncode:
 
1190
            if process_args is None:
 
1191
                process_args = "(unknown args)"
 
1192
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1193
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1194
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1195
                      % (process_args, retcode, process.returncode))
 
1196
        return [out, err]
 
1197
 
 
1198
    def check_inventory_shape(self, inv, shape):
 
1199
        """Compare an inventory to a list of expected names.
 
1200
 
 
1201
        Fail if they are not precisely equal.
 
1202
        """
 
1203
        extras = []
 
1204
        shape = list(shape)             # copy
 
1205
        for path, ie in inv.entries():
 
1206
            name = path.replace('\\', '/')
 
1207
            if ie.kind == 'dir':
 
1208
                name = name + '/'
 
1209
            if name in shape:
 
1210
                shape.remove(name)
 
1211
            else:
 
1212
                extras.append(name)
 
1213
        if shape:
 
1214
            self.fail("expected paths not found in inventory: %r" % shape)
 
1215
        if extras:
 
1216
            self.fail("unexpected paths found in inventory: %r" % extras)
 
1217
 
 
1218
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1219
                         a_callable=None, *args, **kwargs):
 
1220
        """Call callable with redirected std io pipes.
 
1221
 
 
1222
        Returns the return code."""
 
1223
        if not callable(a_callable):
 
1224
            raise ValueError("a_callable must be callable.")
 
1225
        if stdin is None:
 
1226
            stdin = StringIO("")
 
1227
        if stdout is None:
 
1228
            if getattr(self, "_log_file", None) is not None:
 
1229
                stdout = self._log_file
 
1230
            else:
 
1231
                stdout = StringIO()
 
1232
        if stderr is None:
 
1233
            if getattr(self, "_log_file", None is not None):
 
1234
                stderr = self._log_file
 
1235
            else:
 
1236
                stderr = StringIO()
 
1237
        real_stdin = sys.stdin
 
1238
        real_stdout = sys.stdout
 
1239
        real_stderr = sys.stderr
 
1240
        try:
 
1241
            sys.stdout = stdout
 
1242
            sys.stderr = stderr
 
1243
            sys.stdin = stdin
 
1244
            return a_callable(*args, **kwargs)
 
1245
        finally:
 
1246
            sys.stdout = real_stdout
 
1247
            sys.stderr = real_stderr
 
1248
            sys.stdin = real_stdin
 
1249
 
 
1250
    @symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
 
1251
    def merge(self, branch_from, wt_to):
 
1252
        """A helper for tests to do a ui-less merge.
 
1253
 
 
1254
        This should move to the main library when someone has time to integrate
 
1255
        it in.
 
1256
        """
 
1257
        # minimal ui-less merge.
 
1258
        wt_to.branch.fetch(branch_from)
 
1259
        base_rev = common_ancestor(branch_from.last_revision(),
 
1260
                                   wt_to.branch.last_revision(),
 
1261
                                   wt_to.branch.repository)
 
1262
        merge_inner(wt_to.branch, branch_from.basis_tree(),
 
1263
                    wt_to.branch.repository.revision_tree(base_rev),
 
1264
                    this_tree=wt_to)
 
1265
        wt_to.add_parent_tree_id(branch_from.last_revision())
 
1266
 
 
1267
 
 
1268
BzrTestBase = TestCase
 
1269
 
 
1270
 
 
1271
class TestCaseWithMemoryTransport(TestCase):
 
1272
    """Common test class for tests that do not need disk resources.
 
1273
 
 
1274
    Tests that need disk resources should derive from TestCaseWithTransport.
 
1275
 
 
1276
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
1277
 
 
1278
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
1279
    a directory which does not exist. This serves to help ensure test isolation
 
1280
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
1281
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
1282
    file defaults for the transport in tests, nor does it obey the command line
 
1283
    override, so tests that accidentally write to the common directory should
 
1284
    be rare.
 
1285
    """
 
1286
 
 
1287
    TEST_ROOT = None
 
1288
    _TEST_NAME = 'test'
 
1289
 
 
1290
 
 
1291
    def __init__(self, methodName='runTest'):
 
1292
        # allow test parameterisation after test construction and before test
 
1293
        # execution. Variables that the parameteriser sets need to be 
 
1294
        # ones that are not set by setUp, or setUp will trash them.
 
1295
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
1296
        self.transport_server = default_transport
 
1297
        self.transport_readonly_server = None
 
1298
 
 
1299
    def get_transport(self):
 
1300
        """Return a writeable transport for the test scratch space"""
 
1301
        t = get_transport(self.get_url())
 
1302
        self.assertFalse(t.is_readonly())
 
1303
        return t
 
1304
 
 
1305
    def get_readonly_transport(self):
 
1306
        """Return a readonly transport for the test scratch space
 
1307
        
 
1308
        This can be used to test that operations which should only need
 
1309
        readonly access in fact do not try to write.
 
1310
        """
 
1311
        t = get_transport(self.get_readonly_url())
 
1312
        self.assertTrue(t.is_readonly())
 
1313
        return t
 
1314
 
 
1315
    def create_transport_readonly_server(self):
 
1316
        """Create a transport server from class defined at init.
 
1317
 
 
1318
        This is mostly a hook for daughter classes.
 
1319
        """
 
1320
        return self.transport_readonly_server()
 
1321
 
 
1322
    def get_readonly_server(self):
 
1323
        """Get the server instance for the readonly transport
 
1324
 
 
1325
        This is useful for some tests with specific servers to do diagnostics.
 
1326
        """
 
1327
        if self.__readonly_server is None:
 
1328
            if self.transport_readonly_server is None:
 
1329
                # readonly decorator requested
 
1330
                # bring up the server
 
1331
                self.get_url()
 
1332
                self.__readonly_server = ReadonlyServer()
 
1333
                self.__readonly_server.setUp(self.__server)
 
1334
            else:
 
1335
                self.__readonly_server = self.create_transport_readonly_server()
 
1336
                self.__readonly_server.setUp()
 
1337
            self.addCleanup(self.__readonly_server.tearDown)
 
1338
        return self.__readonly_server
 
1339
 
 
1340
    def get_readonly_url(self, relpath=None):
 
1341
        """Get a URL for the readonly transport.
 
1342
 
 
1343
        This will either be backed by '.' or a decorator to the transport 
 
1344
        used by self.get_url()
 
1345
        relpath provides for clients to get a path relative to the base url.
 
1346
        These should only be downwards relative, not upwards.
 
1347
        """
 
1348
        base = self.get_readonly_server().get_url()
 
1349
        if relpath is not None:
 
1350
            if not base.endswith('/'):
 
1351
                base = base + '/'
 
1352
            base = base + relpath
 
1353
        return base
 
1354
 
 
1355
    def get_server(self):
 
1356
        """Get the read/write server instance.
 
1357
 
 
1358
        This is useful for some tests with specific servers that need
 
1359
        diagnostics.
 
1360
 
 
1361
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
1362
        is no means to override it.
 
1363
        """
 
1364
        if self.__server is None:
 
1365
            self.__server = MemoryServer()
 
1366
            self.__server.setUp()
 
1367
            self.addCleanup(self.__server.tearDown)
 
1368
        return self.__server
 
1369
 
 
1370
    def get_url(self, relpath=None):
 
1371
        """Get a URL (or maybe a path) for the readwrite transport.
 
1372
 
 
1373
        This will either be backed by '.' or to an equivalent non-file based
 
1374
        facility.
 
1375
        relpath provides for clients to get a path relative to the base url.
 
1376
        These should only be downwards relative, not upwards.
 
1377
        """
 
1378
        base = self.get_server().get_url()
 
1379
        if relpath is not None and relpath != '.':
 
1380
            if not base.endswith('/'):
 
1381
                base = base + '/'
 
1382
            # XXX: Really base should be a url; we did after all call
 
1383
            # get_url()!  But sometimes it's just a path (from
 
1384
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
1385
            # to a non-escaped local path.
 
1386
            if base.startswith('./') or base.startswith('/'):
 
1387
                base += relpath
 
1388
            else:
 
1389
                base += urlutils.escape(relpath)
 
1390
        return base
 
1391
 
 
1392
    def _make_test_root(self):
 
1393
        if TestCaseWithMemoryTransport.TEST_ROOT is not None:
 
1394
            return
 
1395
        i = 0
 
1396
        while True:
 
1397
            root = u'test%04d.tmp' % i
 
1398
            try:
 
1399
                os.mkdir(root)
 
1400
            except OSError, e:
 
1401
                if e.errno == errno.EEXIST:
 
1402
                    i += 1
 
1403
                    continue
 
1404
                else:
 
1405
                    raise
 
1406
            # successfully created
 
1407
            TestCaseWithMemoryTransport.TEST_ROOT = osutils.abspath(root)
 
1408
            break
 
1409
        # make a fake bzr directory there to prevent any tests propagating
 
1410
        # up onto the source directory's real branch
 
1411
        bzrdir.BzrDir.create_standalone_workingtree(
 
1412
            TestCaseWithMemoryTransport.TEST_ROOT)
 
1413
 
 
1414
    def makeAndChdirToTestDir(self):
 
1415
        """Create a temporary directories for this one test.
 
1416
        
 
1417
        This must set self.test_home_dir and self.test_dir and chdir to
 
1418
        self.test_dir.
 
1419
        
 
1420
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
1421
        """
 
1422
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
1423
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
1424
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
1425
        
 
1426
    def make_branch(self, relpath, format=None):
 
1427
        """Create a branch on the transport at relpath."""
 
1428
        repo = self.make_repository(relpath, format=format)
 
1429
        return repo.bzrdir.create_branch()
 
1430
 
 
1431
    def make_bzrdir(self, relpath, format=None):
 
1432
        try:
 
1433
            # might be a relative or absolute path
 
1434
            maybe_a_url = self.get_url(relpath)
 
1435
            segments = maybe_a_url.rsplit('/', 1)
 
1436
            t = get_transport(maybe_a_url)
 
1437
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
1438
                try:
 
1439
                    t.mkdir('.')
 
1440
                except errors.FileExists:
 
1441
                    pass
 
1442
            if format is None:
 
1443
                format = 'default'
 
1444
            if isinstance(format, basestring):
 
1445
                format = bzrdir.format_registry.make_bzrdir(format)
 
1446
            return format.initialize_on_transport(t)
 
1447
        except errors.UninitializableFormat:
 
1448
            raise TestSkipped("Format %s is not initializable." % format)
 
1449
 
 
1450
    def make_repository(self, relpath, shared=False, format=None):
 
1451
        """Create a repository on our default transport at relpath."""
 
1452
        made_control = self.make_bzrdir(relpath, format=format)
 
1453
        return made_control.create_repository(shared=shared)
 
1454
 
 
1455
    def make_branch_and_memory_tree(self, relpath, format=None):
 
1456
        """Create a branch on the default transport and a MemoryTree for it."""
 
1457
        b = self.make_branch(relpath, format=format)
 
1458
        return memorytree.MemoryTree.create_on_branch(b)
 
1459
 
 
1460
    def overrideEnvironmentForTesting(self):
 
1461
        os.environ['HOME'] = self.test_home_dir
 
1462
        os.environ['BZR_HOME'] = self.test_home_dir
 
1463
        
 
1464
    def setUp(self):
 
1465
        super(TestCaseWithMemoryTransport, self).setUp()
 
1466
        self._make_test_root()
 
1467
        _currentdir = os.getcwdu()
 
1468
        def _leaveDirectory():
 
1469
            os.chdir(_currentdir)
 
1470
        self.addCleanup(_leaveDirectory)
 
1471
        self.makeAndChdirToTestDir()
 
1472
        self.overrideEnvironmentForTesting()
 
1473
        self.__readonly_server = None
 
1474
        self.__server = None
 
1475
 
 
1476
     
 
1477
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
1478
    """Derived class that runs a test within a temporary directory.
 
1479
 
 
1480
    This is useful for tests that need to create a branch, etc.
 
1481
 
 
1482
    The directory is created in a slightly complex way: for each
 
1483
    Python invocation, a new temporary top-level directory is created.
 
1484
    All test cases create their own directory within that.  If the
 
1485
    tests complete successfully, the directory is removed.
 
1486
 
 
1487
    InTempDir is an old alias for FunctionalTestCase.
 
1488
    """
 
1489
 
 
1490
    OVERRIDE_PYTHON = 'python'
 
1491
 
 
1492
    def check_file_contents(self, filename, expect):
 
1493
        self.log("check contents of file %s" % filename)
 
1494
        contents = file(filename, 'r').read()
 
1495
        if contents != expect:
 
1496
            self.log("expected: %r" % expect)
 
1497
            self.log("actually: %r" % contents)
 
1498
            self.fail("contents of %s not as expected" % filename)
 
1499
 
 
1500
    def makeAndChdirToTestDir(self):
 
1501
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
1502
        
 
1503
        For TestCaseInTempDir we create a temporary directory based on the test
 
1504
        name and then create two subdirs - test and home under it.
 
1505
        """
 
1506
        # shorten the name, to avoid test failures due to path length
 
1507
        short_id = self.id().replace('bzrlib.tests.', '') \
 
1508
                   .replace('__main__.', '')[-100:]
 
1509
        # it's possible the same test class is run several times for
 
1510
        # parameterized tests, so make sure the names don't collide.  
 
1511
        i = 0
 
1512
        while True:
 
1513
            if i > 0:
 
1514
                candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
 
1515
            else:
 
1516
                candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
 
1517
            if os.path.exists(candidate_dir):
 
1518
                i = i + 1
 
1519
                continue
 
1520
            else:
 
1521
                os.mkdir(candidate_dir)
 
1522
                self.test_home_dir = candidate_dir + '/home'
 
1523
                os.mkdir(self.test_home_dir)
 
1524
                self.test_dir = candidate_dir + '/work'
 
1525
                os.mkdir(self.test_dir)
 
1526
                os.chdir(self.test_dir)
 
1527
                break
 
1528
 
 
1529
    def build_tree(self, shape, line_endings='binary', transport=None):
 
1530
        """Build a test tree according to a pattern.
 
1531
 
 
1532
        shape is a sequence of file specifications.  If the final
 
1533
        character is '/', a directory is created.
 
1534
 
 
1535
        This assumes that all the elements in the tree being built are new.
 
1536
 
 
1537
        This doesn't add anything to a branch.
 
1538
        :param line_endings: Either 'binary' or 'native'
 
1539
                             in binary mode, exact contents are written
 
1540
                             in native mode, the line endings match the
 
1541
                             default platform endings.
 
1542
 
 
1543
        :param transport: A transport to write to, for building trees on 
 
1544
                          VFS's. If the transport is readonly or None,
 
1545
                          "." is opened automatically.
 
1546
        """
 
1547
        # It's OK to just create them using forward slashes on windows.
 
1548
        if transport is None or transport.is_readonly():
 
1549
            transport = get_transport(".")
 
1550
        for name in shape:
 
1551
            self.assert_(isinstance(name, basestring))
 
1552
            if name[-1] == '/':
 
1553
                transport.mkdir(urlutils.escape(name[:-1]))
 
1554
            else:
 
1555
                if line_endings == 'binary':
 
1556
                    end = '\n'
 
1557
                elif line_endings == 'native':
 
1558
                    end = os.linesep
 
1559
                else:
 
1560
                    raise errors.BzrError(
 
1561
                        'Invalid line ending request %r' % line_endings)
 
1562
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
1563
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
1564
 
 
1565
    def build_tree_contents(self, shape):
 
1566
        build_tree_contents(shape)
 
1567
 
 
1568
    def assertFileEqual(self, content, path):
 
1569
        """Fail if path does not contain 'content'."""
 
1570
        self.failUnlessExists(path)
 
1571
        # TODO: jam 20060427 Shouldn't this be 'rb'?
 
1572
        self.assertEqualDiff(content, open(path, 'r').read())
 
1573
 
 
1574
    def failUnlessExists(self, path):
 
1575
        """Fail unless path, which may be abs or relative, exists."""
 
1576
        self.failUnless(osutils.lexists(path),path+" does not exist")
 
1577
 
 
1578
    def failIfExists(self, path):
 
1579
        """Fail if path, which may be abs or relative, exists."""
 
1580
        self.failIf(osutils.lexists(path),path+" exists")
 
1581
 
 
1582
 
 
1583
class TestCaseWithTransport(TestCaseInTempDir):
 
1584
    """A test case that provides get_url and get_readonly_url facilities.
 
1585
 
 
1586
    These back onto two transport servers, one for readonly access and one for
 
1587
    read write access.
 
1588
 
 
1589
    If no explicit class is provided for readonly access, a
 
1590
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
1591
    based read write transports.
 
1592
 
 
1593
    If an explicit class is provided for readonly access, that server and the 
 
1594
    readwrite one must both define get_url() as resolving to os.getcwd().
 
1595
    """
 
1596
 
 
1597
    def create_transport_server(self):
 
1598
        """Create a transport server from class defined at init.
 
1599
 
 
1600
        This is mostly a hook for daughter classes.
 
1601
        """
 
1602
        return self.transport_server()
 
1603
 
 
1604
    def get_server(self):
 
1605
        """See TestCaseWithMemoryTransport.
 
1606
 
 
1607
        This is useful for some tests with specific servers that need
 
1608
        diagnostics.
 
1609
        """
 
1610
        if self.__server is None:
 
1611
            self.__server = self.create_transport_server()
 
1612
            self.__server.setUp()
 
1613
            self.addCleanup(self.__server.tearDown)
 
1614
        return self.__server
 
1615
 
 
1616
    def make_branch_and_tree(self, relpath, format=None):
 
1617
        """Create a branch on the transport and a tree locally.
 
1618
 
 
1619
        If the transport is not a LocalTransport, the Tree can't be created on
 
1620
        the transport.  In that case the working tree is created in the local
 
1621
        directory, and the returned tree's branch and repository will also be
 
1622
        accessed locally.
 
1623
 
 
1624
        This will fail if the original default transport for this test
 
1625
        case wasn't backed by the working directory, as the branch won't
 
1626
        be on disk for us to open it.  
 
1627
 
 
1628
        :param format: The BzrDirFormat.
 
1629
        :returns: the WorkingTree.
 
1630
        """
 
1631
        # TODO: always use the local disk path for the working tree,
 
1632
        # this obviously requires a format that supports branch references
 
1633
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
1634
        # RBC 20060208
 
1635
        b = self.make_branch(relpath, format=format)
 
1636
        try:
 
1637
            return b.bzrdir.create_workingtree()
 
1638
        except errors.NotLocalUrl:
 
1639
            # We can only make working trees locally at the moment.  If the
 
1640
            # transport can't support them, then reopen the branch on a local
 
1641
            # transport, and create the working tree there.  
 
1642
            #
 
1643
            # Possibly we should instead keep
 
1644
            # the non-disk-backed branch and create a local checkout?
 
1645
            bd = bzrdir.BzrDir.open(relpath)
 
1646
            return bd.create_workingtree()
 
1647
 
 
1648
    def assertIsDirectory(self, relpath, transport):
 
1649
        """Assert that relpath within transport is a directory.
 
1650
 
 
1651
        This may not be possible on all transports; in that case it propagates
 
1652
        a TransportNotPossible.
 
1653
        """
 
1654
        try:
 
1655
            mode = transport.stat(relpath).st_mode
 
1656
        except errors.NoSuchFile:
 
1657
            self.fail("path %s is not a directory; no such file"
 
1658
                      % (relpath))
 
1659
        if not stat.S_ISDIR(mode):
 
1660
            self.fail("path %s is not a directory; has mode %#o"
 
1661
                      % (relpath, mode))
 
1662
 
 
1663
    def setUp(self):
 
1664
        super(TestCaseWithTransport, self).setUp()
 
1665
        self.__server = None
 
1666
 
 
1667
 
 
1668
class ChrootedTestCase(TestCaseWithTransport):
 
1669
    """A support class that provides readonly urls outside the local namespace.
 
1670
 
 
1671
    This is done by checking if self.transport_server is a MemoryServer. if it
 
1672
    is then we are chrooted already, if it is not then an HttpServer is used
 
1673
    for readonly urls.
 
1674
 
 
1675
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
1676
                       be used without needed to redo it when a different 
 
1677
                       subclass is in use ?
 
1678
    """
 
1679
 
 
1680
    def setUp(self):
 
1681
        super(ChrootedTestCase, self).setUp()
 
1682
        if not self.transport_server == MemoryServer:
 
1683
            self.transport_readonly_server = HttpServer
 
1684
 
 
1685
 
 
1686
def filter_suite_by_re(suite, pattern):
 
1687
    result = TestUtil.TestSuite()
 
1688
    filter_re = re.compile(pattern)
 
1689
    for test in iter_suite_tests(suite):
 
1690
        if filter_re.search(test.id()):
 
1691
            result.addTest(test)
 
1692
    return result
 
1693
 
 
1694
 
 
1695
def sort_suite_by_re(suite, pattern):
 
1696
    first = []
 
1697
    second = []
 
1698
    filter_re = re.compile(pattern)
 
1699
    for test in iter_suite_tests(suite):
 
1700
        if filter_re.search(test.id()):
 
1701
            first.append(test)
 
1702
        else:
 
1703
            second.append(test)
 
1704
    return TestUtil.TestSuite(first + second)
 
1705
 
 
1706
 
 
1707
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
1708
              stop_on_failure=False, keep_output=False,
 
1709
              transport=None, lsprof_timed=None, bench_history=None,
 
1710
              matching_tests_first=None):
 
1711
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
1712
    if verbose:
 
1713
        verbosity = 2
 
1714
    else:
 
1715
        verbosity = 1
 
1716
    runner = TextTestRunner(stream=sys.stdout,
 
1717
                            descriptions=0,
 
1718
                            verbosity=verbosity,
 
1719
                            keep_output=keep_output,
 
1720
                            bench_history=bench_history)
 
1721
    runner.stop_on_failure=stop_on_failure
 
1722
    if pattern != '.*':
 
1723
        if matching_tests_first:
 
1724
            suite = sort_suite_by_re(suite, pattern)
 
1725
        else:
 
1726
            suite = filter_suite_by_re(suite, pattern)
 
1727
    result = runner.run(suite)
 
1728
    return result.wasSuccessful()
 
1729
 
 
1730
 
 
1731
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
1732
             keep_output=False,
 
1733
             transport=None,
 
1734
             test_suite_factory=None,
 
1735
             lsprof_timed=None,
 
1736
             bench_history=None,
 
1737
             matching_tests_first=None):
 
1738
    """Run the whole test suite under the enhanced runner"""
 
1739
    # XXX: Very ugly way to do this...
 
1740
    # Disable warning about old formats because we don't want it to disturb
 
1741
    # any blackbox tests.
 
1742
    from bzrlib import repository
 
1743
    repository._deprecation_warning_done = True
 
1744
 
 
1745
    global default_transport
 
1746
    if transport is None:
 
1747
        transport = default_transport
 
1748
    old_transport = default_transport
 
1749
    default_transport = transport
 
1750
    try:
 
1751
        if test_suite_factory is None:
 
1752
            suite = test_suite()
 
1753
        else:
 
1754
            suite = test_suite_factory()
 
1755
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
1756
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
 
1757
                     transport=transport,
 
1758
                     lsprof_timed=lsprof_timed,
 
1759
                     bench_history=bench_history,
 
1760
                     matching_tests_first=matching_tests_first)
 
1761
    finally:
 
1762
        default_transport = old_transport
 
1763
 
 
1764
 
 
1765
def test_suite():
 
1766
    """Build and return TestSuite for the whole of bzrlib.
 
1767
    
 
1768
    This function can be replaced if you need to change the default test
 
1769
    suite on a global basis, but it is not encouraged.
 
1770
    """
 
1771
    testmod_names = [
 
1772
                   'bzrlib.tests.test_ancestry',
 
1773
                   'bzrlib.tests.test_annotate',
 
1774
                   'bzrlib.tests.test_api',
 
1775
                   'bzrlib.tests.test_atomicfile',
 
1776
                   'bzrlib.tests.test_bad_files',
 
1777
                   'bzrlib.tests.test_branch',
 
1778
                   'bzrlib.tests.test_bundle',
 
1779
                   'bzrlib.tests.test_bzrdir',
 
1780
                   'bzrlib.tests.test_cache_utf8',
 
1781
                   'bzrlib.tests.test_commands',
 
1782
                   'bzrlib.tests.test_commit',
 
1783
                   'bzrlib.tests.test_commit_merge',
 
1784
                   'bzrlib.tests.test_config',
 
1785
                   'bzrlib.tests.test_conflicts',
 
1786
                   'bzrlib.tests.test_decorators',
 
1787
                   'bzrlib.tests.test_delta',
 
1788
                   'bzrlib.tests.test_diff',
 
1789
                   'bzrlib.tests.test_doc_generate',
 
1790
                   'bzrlib.tests.test_errors',
 
1791
                   'bzrlib.tests.test_escaped_store',
 
1792
                   'bzrlib.tests.test_fetch',
 
1793
                   'bzrlib.tests.test_ftp_transport',
 
1794
                   'bzrlib.tests.test_generate_docs',
 
1795
                   'bzrlib.tests.test_generate_ids',
 
1796
                   'bzrlib.tests.test_globbing',
 
1797
                   'bzrlib.tests.test_gpg',
 
1798
                   'bzrlib.tests.test_graph',
 
1799
                   'bzrlib.tests.test_hashcache',
 
1800
                   'bzrlib.tests.test_http',
 
1801
                   'bzrlib.tests.test_http_response',
 
1802
                   'bzrlib.tests.test_https_ca_bundle',
 
1803
                   'bzrlib.tests.test_identitymap',
 
1804
                   'bzrlib.tests.test_ignores',
 
1805
                   'bzrlib.tests.test_inv',
 
1806
                   'bzrlib.tests.test_knit',
 
1807
                   'bzrlib.tests.test_lazy_import',
 
1808
                   'bzrlib.tests.test_lazy_regex',
 
1809
                   'bzrlib.tests.test_lockdir',
 
1810
                   'bzrlib.tests.test_lockable_files',
 
1811
                   'bzrlib.tests.test_log',
 
1812
                   'bzrlib.tests.test_memorytree',
 
1813
                   'bzrlib.tests.test_merge',
 
1814
                   'bzrlib.tests.test_merge3',
 
1815
                   'bzrlib.tests.test_merge_core',
 
1816
                   'bzrlib.tests.test_missing',
 
1817
                   'bzrlib.tests.test_msgeditor',
 
1818
                   'bzrlib.tests.test_nonascii',
 
1819
                   'bzrlib.tests.test_options',
 
1820
                   'bzrlib.tests.test_osutils',
 
1821
                   'bzrlib.tests.test_osutils_encodings',
 
1822
                   'bzrlib.tests.test_patch',
 
1823
                   'bzrlib.tests.test_patches',
 
1824
                   'bzrlib.tests.test_permissions',
 
1825
                   'bzrlib.tests.test_plugins',
 
1826
                   'bzrlib.tests.test_progress',
 
1827
                   'bzrlib.tests.test_reconcile',
 
1828
                   'bzrlib.tests.test_registry',
 
1829
                   'bzrlib.tests.test_repository',
 
1830
                   'bzrlib.tests.test_revert',
 
1831
                   'bzrlib.tests.test_revision',
 
1832
                   'bzrlib.tests.test_revisionnamespaces',
 
1833
                   'bzrlib.tests.test_revisiontree',
 
1834
                   'bzrlib.tests.test_rio',
 
1835
                   'bzrlib.tests.test_sampler',
 
1836
                   'bzrlib.tests.test_selftest',
 
1837
                   'bzrlib.tests.test_setup',
 
1838
                   'bzrlib.tests.test_sftp_transport',
 
1839
                   'bzrlib.tests.test_smart_add',
 
1840
                   'bzrlib.tests.test_smart_transport',
 
1841
                   'bzrlib.tests.test_source',
 
1842
                   'bzrlib.tests.test_status',
 
1843
                   'bzrlib.tests.test_store',
 
1844
                   'bzrlib.tests.test_symbol_versioning',
 
1845
                   'bzrlib.tests.test_tag',
 
1846
                   'bzrlib.tests.test_testament',
 
1847
                   'bzrlib.tests.test_textfile',
 
1848
                   'bzrlib.tests.test_textmerge',
 
1849
                   'bzrlib.tests.test_trace',
 
1850
                   'bzrlib.tests.test_transactions',
 
1851
                   'bzrlib.tests.test_transform',
 
1852
                   'bzrlib.tests.test_transport',
 
1853
                   'bzrlib.tests.test_tree',
 
1854
                   'bzrlib.tests.test_treebuilder',
 
1855
                   'bzrlib.tests.test_tsort',
 
1856
                   'bzrlib.tests.test_tuned_gzip',
 
1857
                   'bzrlib.tests.test_ui',
 
1858
                   'bzrlib.tests.test_upgrade',
 
1859
                   'bzrlib.tests.test_urlutils',
 
1860
                   'bzrlib.tests.test_versionedfile',
 
1861
                   'bzrlib.tests.test_version',
 
1862
                   'bzrlib.tests.test_version_info',
 
1863
                   'bzrlib.tests.test_weave',
 
1864
                   'bzrlib.tests.test_whitebox',
 
1865
                   'bzrlib.tests.test_workingtree',
 
1866
                   'bzrlib.tests.test_wsgi',
 
1867
                   'bzrlib.tests.test_xml',
 
1868
                   ]
 
1869
    test_transport_implementations = [
 
1870
        'bzrlib.tests.test_transport_implementations',
 
1871
        'bzrlib.tests.test_read_bundle',
 
1872
        ]
 
1873
    suite = TestUtil.TestSuite()
 
1874
    loader = TestUtil.TestLoader()
 
1875
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
1876
    from bzrlib.transport import TransportTestProviderAdapter
 
1877
    adapter = TransportTestProviderAdapter()
 
1878
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
1879
    for package in packages_to_test():
 
1880
        suite.addTest(package.test_suite())
 
1881
    for m in MODULES_TO_TEST:
 
1882
        suite.addTest(loader.loadTestsFromModule(m))
 
1883
    for m in MODULES_TO_DOCTEST:
 
1884
        try:
 
1885
            suite.addTest(doctest.DocTestSuite(m))
 
1886
        except ValueError, e:
 
1887
            print '**failed to get doctest for: %s\n%s' %(m,e)
 
1888
            raise
 
1889
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
1890
        if getattr(plugin, 'test_suite', None) is not None:
 
1891
            default_encoding = sys.getdefaultencoding()
 
1892
            try:
 
1893
                plugin_suite = plugin.test_suite()
 
1894
            except ImportError, e:
 
1895
                bzrlib.trace.warning(
 
1896
                    'Unable to test plugin "%s": %s', name, e)
 
1897
            else:
 
1898
                suite.addTest(plugin_suite)
 
1899
            if default_encoding != sys.getdefaultencoding():
 
1900
                bzrlib.trace.warning(
 
1901
                    'Plugin "%s" tried to reset default encoding to: %s', name,
 
1902
                    sys.getdefaultencoding())
 
1903
                reload(sys)
 
1904
                sys.setdefaultencoding(default_encoding)
 
1905
    return suite
 
1906
 
 
1907
 
 
1908
def adapt_modules(mods_list, adapter, loader, suite):
 
1909
    """Adapt the modules in mods_list using adapter and add to suite."""
 
1910
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
 
1911
        suite.addTests(adapter.adapt(test))
 
1912
 
 
1913
 
 
1914
def clean_selftest_output(root=None, quiet=False):
 
1915
    """Remove all selftest output directories from root directory.
 
1916
 
 
1917
    :param  root:   root directory for clean
 
1918
                    (if ommitted or None then clean current directory).
 
1919
    :param  quiet:  suppress report about deleting directories
 
1920
    """
 
1921
    import re
 
1922
    import shutil
 
1923
 
 
1924
    re_dir = re.compile(r'''test\d\d\d\d\.tmp''')
 
1925
    if root is None:
 
1926
        root = u'.'
 
1927
    for i in os.listdir(root):
 
1928
        if os.path.isdir(i) and re_dir.match(i):
 
1929
            if not quiet:
 
1930
                print 'delete directory:', i
 
1931
            shutil.rmtree(i)