/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Martin Pool
  • Date: 2007-03-07 01:31:55 UTC
  • mto: (2321.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 2322.
  • Revision ID: mbp@sourcefrog.net-20070307013155-kxvc6ppleyv8jswg
Add blackbox test that join gives clean error when the repository doesn't support rich roots

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import codecs
 
30
from cStringIO import StringIO
 
31
import difflib
 
32
import doctest
 
33
import errno
 
34
import logging
 
35
import os
 
36
from pprint import pformat
 
37
import re
 
38
import shlex
 
39
import stat
 
40
from subprocess import Popen, PIPE
 
41
import sys
 
42
import tempfile
 
43
import unittest
 
44
import time
 
45
 
 
46
 
 
47
from bzrlib import (
 
48
    bzrdir,
 
49
    debug,
 
50
    errors,
 
51
    memorytree,
 
52
    osutils,
 
53
    progress,
 
54
    urlutils,
 
55
    )
 
56
import bzrlib.branch
 
57
import bzrlib.commands
 
58
import bzrlib.bundle.serializer
 
59
import bzrlib.export
 
60
import bzrlib.inventory
 
61
import bzrlib.iterablefile
 
62
import bzrlib.lockdir
 
63
try:
 
64
    import bzrlib.lsprof
 
65
except ImportError:
 
66
    # lsprof not available
 
67
    pass
 
68
from bzrlib.merge import merge_inner
 
69
import bzrlib.merge3
 
70
import bzrlib.osutils
 
71
import bzrlib.plugin
 
72
from bzrlib.revision import common_ancestor
 
73
import bzrlib.store
 
74
from bzrlib import symbol_versioning
 
75
import bzrlib.trace
 
76
from bzrlib.transport import get_transport
 
77
import bzrlib.transport
 
78
from bzrlib.transport.local import LocalURLServer
 
79
from bzrlib.transport.memory import MemoryServer
 
80
from bzrlib.transport.readonly import ReadonlyServer
 
81
from bzrlib.trace import mutter, note
 
82
from bzrlib.tests import TestUtil
 
83
from bzrlib.tests.HttpServer import HttpServer
 
84
from bzrlib.tests.TestUtil import (
 
85
                          TestSuite,
 
86
                          TestLoader,
 
87
                          )
 
88
from bzrlib.tests.treeshape import build_tree_contents
 
89
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
90
 
 
91
default_transport = LocalURLServer
 
92
 
 
93
MODULES_TO_TEST = []
 
94
MODULES_TO_DOCTEST = [
 
95
                      bzrlib.bundle.serializer,
 
96
                      bzrlib.errors,
 
97
                      bzrlib.export,
 
98
                      bzrlib.inventory,
 
99
                      bzrlib.iterablefile,
 
100
                      bzrlib.lockdir,
 
101
                      bzrlib.merge3,
 
102
                      bzrlib.option,
 
103
                      bzrlib.store,
 
104
                      ]
 
105
 
 
106
 
 
107
def packages_to_test():
 
108
    """Return a list of packages to test.
 
109
 
 
110
    The packages are not globally imported so that import failures are
 
111
    triggered when running selftest, not when importing the command.
 
112
    """
 
113
    import bzrlib.doc
 
114
    import bzrlib.tests.blackbox
 
115
    import bzrlib.tests.branch_implementations
 
116
    import bzrlib.tests.bzrdir_implementations
 
117
    import bzrlib.tests.interrepository_implementations
 
118
    import bzrlib.tests.interversionedfile_implementations
 
119
    import bzrlib.tests.intertree_implementations
 
120
    import bzrlib.tests.repository_implementations
 
121
    import bzrlib.tests.revisionstore_implementations
 
122
    import bzrlib.tests.tree_implementations
 
123
    import bzrlib.tests.workingtree_implementations
 
124
    return [
 
125
            bzrlib.doc,
 
126
            bzrlib.tests.blackbox,
 
127
            bzrlib.tests.branch_implementations,
 
128
            bzrlib.tests.bzrdir_implementations,
 
129
            bzrlib.tests.interrepository_implementations,
 
130
            bzrlib.tests.interversionedfile_implementations,
 
131
            bzrlib.tests.intertree_implementations,
 
132
            bzrlib.tests.repository_implementations,
 
133
            bzrlib.tests.revisionstore_implementations,
 
134
            bzrlib.tests.tree_implementations,
 
135
            bzrlib.tests.workingtree_implementations,
 
136
            ]
 
137
 
 
138
 
 
139
class ExtendedTestResult(unittest._TextTestResult):
 
140
    """Accepts, reports and accumulates the results of running tests.
 
141
 
 
142
    Compared to this unittest version this class adds support for profiling,
 
143
    benchmarking, stopping as soon as a test fails,  and skipping tests.
 
144
    There are further-specialized subclasses for different types of display.
 
145
    """
 
146
 
 
147
    stop_early = False
 
148
    
 
149
    def __init__(self, stream, descriptions, verbosity,
 
150
                 bench_history=None,
 
151
                 num_tests=None,
 
152
                 ):
 
153
        """Construct new TestResult.
 
154
 
 
155
        :param bench_history: Optionally, a writable file object to accumulate
 
156
            benchmark results.
 
157
        """
 
158
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
159
        if bench_history is not None:
 
160
            from bzrlib.version import _get_bzr_source_tree
 
161
            src_tree = _get_bzr_source_tree()
 
162
            if src_tree:
 
163
                try:
 
164
                    revision_id = src_tree.get_parent_ids()[0]
 
165
                except IndexError:
 
166
                    # XXX: if this is a brand new tree, do the same as if there
 
167
                    # is no branch.
 
168
                    revision_id = ''
 
169
            else:
 
170
                # XXX: If there's no branch, what should we do?
 
171
                revision_id = ''
 
172
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
173
        self._bench_history = bench_history
 
174
        self.ui = bzrlib.ui.ui_factory
 
175
        self.num_tests = num_tests
 
176
        self.error_count = 0
 
177
        self.failure_count = 0
 
178
        self.skip_count = 0
 
179
        self.count = 0
 
180
        self._overall_start_time = time.time()
 
181
    
 
182
    def extractBenchmarkTime(self, testCase):
 
183
        """Add a benchmark time for the current test case."""
 
184
        self._benchmarkTime = getattr(testCase, "_benchtime", None)
 
185
    
 
186
    def _elapsedTestTimeString(self):
 
187
        """Return a time string for the overall time the current test has taken."""
 
188
        return self._formatTime(time.time() - self._start_time)
 
189
 
 
190
    def _testTimeString(self):
 
191
        if self._benchmarkTime is not None:
 
192
            return "%s/%s" % (
 
193
                self._formatTime(self._benchmarkTime),
 
194
                self._elapsedTestTimeString())
 
195
        else:
 
196
            return "           %s" % self._elapsedTestTimeString()
 
197
 
 
198
    def _formatTime(self, seconds):
 
199
        """Format seconds as milliseconds with leading spaces."""
 
200
        # some benchmarks can take thousands of seconds to run, so we need 8
 
201
        # places
 
202
        return "%8dms" % (1000 * seconds)
 
203
 
 
204
    def _shortened_test_description(self, test):
 
205
        what = test.id()
 
206
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
207
        return what
 
208
 
 
209
    def startTest(self, test):
 
210
        unittest.TestResult.startTest(self, test)
 
211
        self.report_test_start(test)
 
212
        self._recordTestStartTime()
 
213
 
 
214
    def _recordTestStartTime(self):
 
215
        """Record that a test has started."""
 
216
        self._start_time = time.time()
 
217
 
 
218
    def addError(self, test, err):
 
219
        if isinstance(err[1], TestSkipped):
 
220
            return self.addSkipped(test, err)    
 
221
        unittest.TestResult.addError(self, test, err)
 
222
        # We can only do this if we have one of our TestCases, not if
 
223
        # we have a doctest.
 
224
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
225
        if setKeepLogfile is not None:
 
226
            setKeepLogfile()
 
227
        self.extractBenchmarkTime(test)
 
228
        self.report_error(test, err)
 
229
        if self.stop_early:
 
230
            self.stop()
 
231
 
 
232
    def addFailure(self, test, err):
 
233
        unittest.TestResult.addFailure(self, test, err)
 
234
        # We can only do this if we have one of our TestCases, not if
 
235
        # we have a doctest.
 
236
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
237
        if setKeepLogfile is not None:
 
238
            setKeepLogfile()
 
239
        self.extractBenchmarkTime(test)
 
240
        self.report_failure(test, err)
 
241
        if self.stop_early:
 
242
            self.stop()
 
243
 
 
244
    def addSuccess(self, test):
 
245
        self.extractBenchmarkTime(test)
 
246
        if self._bench_history is not None:
 
247
            if self._benchmarkTime is not None:
 
248
                self._bench_history.write("%s %s\n" % (
 
249
                    self._formatTime(self._benchmarkTime),
 
250
                    test.id()))
 
251
        self.report_success(test)
 
252
        unittest.TestResult.addSuccess(self, test)
 
253
 
 
254
    def addSkipped(self, test, skip_excinfo):
 
255
        self.extractBenchmarkTime(test)
 
256
        self.report_skip(test, skip_excinfo)
 
257
        # seems best to treat this as success from point-of-view of unittest
 
258
        # -- it actually does nothing so it barely matters :)
 
259
        try:
 
260
            test.tearDown()
 
261
        except KeyboardInterrupt:
 
262
            raise
 
263
        except:
 
264
            self.addError(test, test.__exc_info())
 
265
        else:
 
266
            unittest.TestResult.addSuccess(self, test)
 
267
 
 
268
    def printErrorList(self, flavour, errors):
 
269
        for test, err in errors:
 
270
            self.stream.writeln(self.separator1)
 
271
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
 
272
            if getattr(test, '_get_log', None) is not None:
 
273
                print >>self.stream
 
274
                print >>self.stream, \
 
275
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
276
                print >>self.stream, test._get_log()
 
277
                print >>self.stream, \
 
278
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
279
            self.stream.writeln(self.separator2)
 
280
            self.stream.writeln("%s" % err)
 
281
 
 
282
    def finished(self):
 
283
        pass
 
284
 
 
285
    def report_cleaning_up(self):
 
286
        pass
 
287
 
 
288
    def report_success(self, test):
 
289
        pass
 
290
 
 
291
 
 
292
class TextTestResult(ExtendedTestResult):
 
293
    """Displays progress and results of tests in text form"""
 
294
 
 
295
    def __init__(self, *args, **kw):
 
296
        ExtendedTestResult.__init__(self, *args, **kw)
 
297
        self.pb = self.ui.nested_progress_bar()
 
298
        self.pb.show_pct = False
 
299
        self.pb.show_spinner = False
 
300
        self.pb.show_eta = False, 
 
301
        self.pb.show_count = False
 
302
        self.pb.show_bar = False
 
303
 
 
304
    def report_starting(self):
 
305
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
306
 
 
307
    def _progress_prefix_text(self):
 
308
        a = '[%d' % self.count
 
309
        if self.num_tests is not None:
 
310
            a +='/%d' % self.num_tests
 
311
        a += ' in %ds' % (time.time() - self._overall_start_time)
 
312
        if self.error_count:
 
313
            a += ', %d errors' % self.error_count
 
314
        if self.failure_count:
 
315
            a += ', %d failed' % self.failure_count
 
316
        if self.skip_count:
 
317
            a += ', %d skipped' % self.skip_count
 
318
        a += ']'
 
319
        return a
 
320
 
 
321
    def report_test_start(self, test):
 
322
        self.count += 1
 
323
        self.pb.update(
 
324
                self._progress_prefix_text()
 
325
                + ' ' 
 
326
                + self._shortened_test_description(test))
 
327
 
 
328
    def report_error(self, test, err):
 
329
        self.error_count += 1
 
330
        self.pb.note('ERROR: %s\n    %s\n', 
 
331
            self._shortened_test_description(test),
 
332
            err[1],
 
333
            )
 
334
 
 
335
    def report_failure(self, test, err):
 
336
        self.failure_count += 1
 
337
        self.pb.note('FAIL: %s\n    %s\n', 
 
338
            self._shortened_test_description(test),
 
339
            err[1],
 
340
            )
 
341
 
 
342
    def report_skip(self, test, skip_excinfo):
 
343
        self.skip_count += 1
 
344
        if False:
 
345
            # at the moment these are mostly not things we can fix
 
346
            # and so they just produce stipple; use the verbose reporter
 
347
            # to see them.
 
348
            if False:
 
349
                # show test and reason for skip
 
350
                self.pb.note('SKIP: %s\n    %s\n', 
 
351
                    self._shortened_test_description(test),
 
352
                    skip_excinfo[1])
 
353
            else:
 
354
                # since the class name was left behind in the still-visible
 
355
                # progress bar...
 
356
                self.pb.note('SKIP: %s', skip_excinfo[1])
 
357
 
 
358
    def report_cleaning_up(self):
 
359
        self.pb.update('cleaning up...')
 
360
 
 
361
    def finished(self):
 
362
        self.pb.finished()
 
363
 
 
364
 
 
365
class VerboseTestResult(ExtendedTestResult):
 
366
    """Produce long output, with one line per test run plus times"""
 
367
 
 
368
    def _ellipsize_to_right(self, a_string, final_width):
 
369
        """Truncate and pad a string, keeping the right hand side"""
 
370
        if len(a_string) > final_width:
 
371
            result = '...' + a_string[3-final_width:]
 
372
        else:
 
373
            result = a_string
 
374
        return result.ljust(final_width)
 
375
 
 
376
    def report_starting(self):
 
377
        self.stream.write('running %d tests...\n' % self.num_tests)
 
378
 
 
379
    def report_test_start(self, test):
 
380
        self.count += 1
 
381
        name = self._shortened_test_description(test)
 
382
        # width needs space for 6 char status, plus 1 for slash, plus 2 10-char
 
383
        # numbers, plus a trailing blank
 
384
        self.stream.write(self._ellipsize_to_right(name,
 
385
                            osutils.terminal_width()-30))
 
386
        self.stream.flush()
 
387
 
 
388
    def report_error(self, test, err):
 
389
        self.error_count += 1
 
390
        self.stream.writeln('ERROR %s\n    %s' 
 
391
                % (self._testTimeString(), err[1]))
 
392
 
 
393
    def report_failure(self, test, err):
 
394
        self.failure_count += 1
 
395
        self.stream.writeln(' FAIL %s\n    %s'
 
396
                % (self._testTimeString(), err[1]))
 
397
 
 
398
    def report_success(self, test):
 
399
        self.stream.writeln('   OK %s' % self._testTimeString())
 
400
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
401
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
402
            stats.pprint(file=self.stream)
 
403
        self.stream.flush()
 
404
 
 
405
    def report_skip(self, test, skip_excinfo):
 
406
        print >>self.stream, ' SKIP %s' % self._testTimeString()
 
407
        print >>self.stream, '     %s' % skip_excinfo[1]
 
408
 
 
409
 
 
410
class TextTestRunner(object):
 
411
    stop_on_failure = False
 
412
 
 
413
    def __init__(self,
 
414
                 stream=sys.stderr,
 
415
                 descriptions=0,
 
416
                 verbosity=1,
 
417
                 keep_output=False,
 
418
                 bench_history=None):
 
419
        self.stream = unittest._WritelnDecorator(stream)
 
420
        self.descriptions = descriptions
 
421
        self.verbosity = verbosity
 
422
        self.keep_output = keep_output
 
423
        self._bench_history = bench_history
 
424
 
 
425
    def run(self, test):
 
426
        "Run the given test case or test suite."
 
427
        startTime = time.time()
 
428
        if self.verbosity == 1:
 
429
            result_class = TextTestResult
 
430
        elif self.verbosity >= 2:
 
431
            result_class = VerboseTestResult
 
432
        result = result_class(self.stream,
 
433
                              self.descriptions,
 
434
                              self.verbosity,
 
435
                              bench_history=self._bench_history,
 
436
                              num_tests=test.countTestCases(),
 
437
                              )
 
438
        result.stop_early = self.stop_on_failure
 
439
        result.report_starting()
 
440
        test.run(result)
 
441
        stopTime = time.time()
 
442
        timeTaken = stopTime - startTime
 
443
        result.printErrors()
 
444
        self.stream.writeln(result.separator2)
 
445
        run = result.testsRun
 
446
        self.stream.writeln("Ran %d test%s in %.3fs" %
 
447
                            (run, run != 1 and "s" or "", timeTaken))
 
448
        self.stream.writeln()
 
449
        if not result.wasSuccessful():
 
450
            self.stream.write("FAILED (")
 
451
            failed, errored = map(len, (result.failures, result.errors))
 
452
            if failed:
 
453
                self.stream.write("failures=%d" % failed)
 
454
            if errored:
 
455
                if failed: self.stream.write(", ")
 
456
                self.stream.write("errors=%d" % errored)
 
457
            self.stream.writeln(")")
 
458
        else:
 
459
            self.stream.writeln("OK")
 
460
        result.report_cleaning_up()
 
461
        # This is still a little bogus, 
 
462
        # but only a little. Folk not using our testrunner will
 
463
        # have to delete their temp directories themselves.
 
464
        test_root = TestCaseWithMemoryTransport.TEST_ROOT
 
465
        if result.wasSuccessful() or not self.keep_output:
 
466
            if test_root is not None:
 
467
                # If LANG=C we probably have created some bogus paths
 
468
                # which rmtree(unicode) will fail to delete
 
469
                # so make sure we are using rmtree(str) to delete everything
 
470
                # except on win32, where rmtree(str) will fail
 
471
                # since it doesn't have the property of byte-stream paths
 
472
                # (they are either ascii or mbcs)
 
473
                if sys.platform == 'win32':
 
474
                    # make sure we are using the unicode win32 api
 
475
                    test_root = unicode(test_root)
 
476
                else:
 
477
                    test_root = test_root.encode(
 
478
                        sys.getfilesystemencoding())
 
479
                try:
 
480
                    osutils.rmtree(test_root)
 
481
                except OSError, e:
 
482
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
483
                        print >>sys.stderr, ('Permission denied: '
 
484
                                             'unable to remove testing dir '
 
485
                                             '%s' % os.path.basename(test_root))
 
486
                    else:
 
487
                        raise
 
488
        else:
 
489
            note("Failed tests working directories are in '%s'\n", test_root)
 
490
        TestCaseWithMemoryTransport.TEST_ROOT = None
 
491
        result.finished()
 
492
        return result
 
493
 
 
494
 
 
495
def iter_suite_tests(suite):
 
496
    """Return all tests in a suite, recursing through nested suites"""
 
497
    for item in suite._tests:
 
498
        if isinstance(item, unittest.TestCase):
 
499
            yield item
 
500
        elif isinstance(item, unittest.TestSuite):
 
501
            for r in iter_suite_tests(item):
 
502
                yield r
 
503
        else:
 
504
            raise Exception('unknown object %r inside test suite %r'
 
505
                            % (item, suite))
 
506
 
 
507
 
 
508
class TestSkipped(Exception):
 
509
    """Indicates that a test was intentionally skipped, rather than failing."""
 
510
 
 
511
 
 
512
class CommandFailed(Exception):
 
513
    pass
 
514
 
 
515
 
 
516
class StringIOWrapper(object):
 
517
    """A wrapper around cStringIO which just adds an encoding attribute.
 
518
    
 
519
    Internally we can check sys.stdout to see what the output encoding
 
520
    should be. However, cStringIO has no encoding attribute that we can
 
521
    set. So we wrap it instead.
 
522
    """
 
523
    encoding='ascii'
 
524
    _cstring = None
 
525
 
 
526
    def __init__(self, s=None):
 
527
        if s is not None:
 
528
            self.__dict__['_cstring'] = StringIO(s)
 
529
        else:
 
530
            self.__dict__['_cstring'] = StringIO()
 
531
 
 
532
    def __getattr__(self, name, getattr=getattr):
 
533
        return getattr(self.__dict__['_cstring'], name)
 
534
 
 
535
    def __setattr__(self, name, val):
 
536
        if name == 'encoding':
 
537
            self.__dict__['encoding'] = val
 
538
        else:
 
539
            return setattr(self._cstring, name, val)
 
540
 
 
541
 
 
542
class TestCase(unittest.TestCase):
 
543
    """Base class for bzr unit tests.
 
544
    
 
545
    Tests that need access to disk resources should subclass 
 
546
    TestCaseInTempDir not TestCase.
 
547
 
 
548
    Error and debug log messages are redirected from their usual
 
549
    location into a temporary file, the contents of which can be
 
550
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
551
    so that it can also capture file IO.  When the test completes this file
 
552
    is read into memory and removed from disk.
 
553
       
 
554
    There are also convenience functions to invoke bzr's command-line
 
555
    routine, and to build and check bzr trees.
 
556
   
 
557
    In addition to the usual method of overriding tearDown(), this class also
 
558
    allows subclasses to register functions into the _cleanups list, which is
 
559
    run in order as the object is torn down.  It's less likely this will be
 
560
    accidentally overlooked.
 
561
    """
 
562
 
 
563
    _log_file_name = None
 
564
    _log_contents = ''
 
565
    _keep_log_file = False
 
566
    # record lsprof data when performing benchmark calls.
 
567
    _gather_lsprof_in_benchmarks = False
 
568
 
 
569
    def __init__(self, methodName='testMethod'):
 
570
        super(TestCase, self).__init__(methodName)
 
571
        self._cleanups = []
 
572
 
 
573
    def setUp(self):
 
574
        unittest.TestCase.setUp(self)
 
575
        self._cleanEnvironment()
 
576
        bzrlib.trace.disable_default_logging()
 
577
        self._silenceUI()
 
578
        self._startLogFile()
 
579
        self._benchcalls = []
 
580
        self._benchtime = None
 
581
        # prevent hooks affecting tests
 
582
        self._preserved_hooks = bzrlib.branch.Branch.hooks
 
583
        self.addCleanup(self._restoreHooks)
 
584
        # this list of hooks must be kept in sync with the defaults
 
585
        # in branch.py
 
586
        bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
 
587
 
 
588
    def _silenceUI(self):
 
589
        """Turn off UI for duration of test"""
 
590
        # by default the UI is off; tests can turn it on if they want it.
 
591
        saved = bzrlib.ui.ui_factory
 
592
        def _restore():
 
593
            bzrlib.ui.ui_factory = saved
 
594
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
 
595
        self.addCleanup(_restore)
 
596
 
 
597
    def _ndiff_strings(self, a, b):
 
598
        """Return ndiff between two strings containing lines.
 
599
        
 
600
        A trailing newline is added if missing to make the strings
 
601
        print properly."""
 
602
        if b and b[-1] != '\n':
 
603
            b += '\n'
 
604
        if a and a[-1] != '\n':
 
605
            a += '\n'
 
606
        difflines = difflib.ndiff(a.splitlines(True),
 
607
                                  b.splitlines(True),
 
608
                                  linejunk=lambda x: False,
 
609
                                  charjunk=lambda x: False)
 
610
        return ''.join(difflines)
 
611
 
 
612
    def assertEqual(self, a, b, message=''):
 
613
        if a == b:
 
614
            return
 
615
        if message:
 
616
            message += '\n'
 
617
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
618
            % (message,
 
619
               pformat(a, indent=4), pformat(b, indent=4)))
 
620
 
 
621
    assertEquals = assertEqual
 
622
 
 
623
    def assertEqualDiff(self, a, b, message=None):
 
624
        """Assert two texts are equal, if not raise an exception.
 
625
        
 
626
        This is intended for use with multi-line strings where it can 
 
627
        be hard to find the differences by eye.
 
628
        """
 
629
        # TODO: perhaps override assertEquals to call this for strings?
 
630
        if a == b:
 
631
            return
 
632
        if message is None:
 
633
            message = "texts not equal:\n"
 
634
        raise AssertionError(message + 
 
635
                             self._ndiff_strings(a, b))      
 
636
        
 
637
    def assertEqualMode(self, mode, mode_test):
 
638
        self.assertEqual(mode, mode_test,
 
639
                         'mode mismatch %o != %o' % (mode, mode_test))
 
640
 
 
641
    def assertStartsWith(self, s, prefix):
 
642
        if not s.startswith(prefix):
 
643
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
644
 
 
645
    def assertEndsWith(self, s, suffix):
 
646
        """Asserts that s ends with suffix."""
 
647
        if not s.endswith(suffix):
 
648
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
649
 
 
650
    def assertContainsRe(self, haystack, needle_re):
 
651
        """Assert that a contains something matching a regular expression."""
 
652
        if not re.search(needle_re, haystack):
 
653
            raise AssertionError('pattern "%r" not found in "%r"'
 
654
                    % (needle_re, haystack))
 
655
 
 
656
    def assertNotContainsRe(self, haystack, needle_re):
 
657
        """Assert that a does not match a regular expression"""
 
658
        if re.search(needle_re, haystack):
 
659
            raise AssertionError('pattern "%s" found in "%s"'
 
660
                    % (needle_re, haystack))
 
661
 
 
662
    def assertSubset(self, sublist, superlist):
 
663
        """Assert that every entry in sublist is present in superlist."""
 
664
        missing = []
 
665
        for entry in sublist:
 
666
            if entry not in superlist:
 
667
                missing.append(entry)
 
668
        if len(missing) > 0:
 
669
            raise AssertionError("value(s) %r not present in container %r" % 
 
670
                                 (missing, superlist))
 
671
 
 
672
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
673
        """Fail unless excClass is raised when the iterator from func is used.
 
674
        
 
675
        Many functions can return generators this makes sure
 
676
        to wrap them in a list() call to make sure the whole generator
 
677
        is run, and that the proper exception is raised.
 
678
        """
 
679
        try:
 
680
            list(func(*args, **kwargs))
 
681
        except excClass:
 
682
            return
 
683
        else:
 
684
            if getattr(excClass,'__name__', None) is not None:
 
685
                excName = excClass.__name__
 
686
            else:
 
687
                excName = str(excClass)
 
688
            raise self.failureException, "%s not raised" % excName
 
689
 
 
690
    def assertIs(self, left, right, message=None):
 
691
        if not (left is right):
 
692
            if message is not None:
 
693
                raise AssertionError(message)
 
694
            else:
 
695
                raise AssertionError("%r is not %r." % (left, right))
 
696
 
 
697
    def assertIsNot(self, left, right, message=None):
 
698
        if (left is right):
 
699
            if message is not None:
 
700
                raise AssertionError(message)
 
701
            else:
 
702
                raise AssertionError("%r is %r." % (left, right))
 
703
 
 
704
    def assertTransportMode(self, transport, path, mode):
 
705
        """Fail if a path does not have mode mode.
 
706
        
 
707
        If modes are not supported on this transport, the assertion is ignored.
 
708
        """
 
709
        if not transport._can_roundtrip_unix_modebits():
 
710
            return
 
711
        path_stat = transport.stat(path)
 
712
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
713
        self.assertEqual(mode, actual_mode,
 
714
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
715
 
 
716
    def assertIsInstance(self, obj, kls):
 
717
        """Fail if obj is not an instance of kls"""
 
718
        if not isinstance(obj, kls):
 
719
            self.fail("%r is an instance of %s rather than %s" % (
 
720
                obj, obj.__class__, kls))
 
721
 
 
722
    def _capture_warnings(self, a_callable, *args, **kwargs):
 
723
        """A helper for callDeprecated and applyDeprecated.
 
724
 
 
725
        :param a_callable: A callable to call.
 
726
        :param args: The positional arguments for the callable
 
727
        :param kwargs: The keyword arguments for the callable
 
728
        :return: A tuple (warnings, result). result is the result of calling
 
729
            a_callable(*args, **kwargs).
 
730
        """
 
731
        local_warnings = []
 
732
        def capture_warnings(msg, cls=None, stacklevel=None):
 
733
            # we've hooked into a deprecation specific callpath,
 
734
            # only deprecations should getting sent via it.
 
735
            self.assertEqual(cls, DeprecationWarning)
 
736
            local_warnings.append(msg)
 
737
        original_warning_method = symbol_versioning.warn
 
738
        symbol_versioning.set_warning_method(capture_warnings)
 
739
        try:
 
740
            result = a_callable(*args, **kwargs)
 
741
        finally:
 
742
            symbol_versioning.set_warning_method(original_warning_method)
 
743
        return (local_warnings, result)
 
744
 
 
745
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
746
        """Call a deprecated callable without warning the user.
 
747
 
 
748
        :param deprecation_format: The deprecation format that the callable
 
749
            should have been deprecated with. This is the same type as the 
 
750
            parameter to deprecated_method/deprecated_function. If the 
 
751
            callable is not deprecated with this format, an assertion error
 
752
            will be raised.
 
753
        :param a_callable: A callable to call. This may be a bound method or
 
754
            a regular function. It will be called with *args and **kwargs.
 
755
        :param args: The positional arguments for the callable
 
756
        :param kwargs: The keyword arguments for the callable
 
757
        :return: The result of a_callable(*args, **kwargs)
 
758
        """
 
759
        call_warnings, result = self._capture_warnings(a_callable,
 
760
            *args, **kwargs)
 
761
        expected_first_warning = symbol_versioning.deprecation_string(
 
762
            a_callable, deprecation_format)
 
763
        if len(call_warnings) == 0:
 
764
            self.fail("No deprecation warning generated by call to %s" %
 
765
                a_callable)
 
766
        self.assertEqual(expected_first_warning, call_warnings[0])
 
767
        return result
 
768
 
 
769
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
770
        """Assert that a callable is deprecated in a particular way.
 
771
 
 
772
        This is a very precise test for unusual requirements. The 
 
773
        applyDeprecated helper function is probably more suited for most tests
 
774
        as it allows you to simply specify the deprecation format being used
 
775
        and will ensure that that is issued for the function being called.
 
776
 
 
777
        :param expected: a list of the deprecation warnings expected, in order
 
778
        :param callable: The callable to call
 
779
        :param args: The positional arguments for the callable
 
780
        :param kwargs: The keyword arguments for the callable
 
781
        """
 
782
        call_warnings, result = self._capture_warnings(callable,
 
783
            *args, **kwargs)
 
784
        self.assertEqual(expected, call_warnings)
 
785
        return result
 
786
 
 
787
    def _startLogFile(self):
 
788
        """Send bzr and test log messages to a temporary file.
 
789
 
 
790
        The file is removed as the test is torn down.
 
791
        """
 
792
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
793
        self._log_file = os.fdopen(fileno, 'w+')
 
794
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
795
        self._log_file_name = name
 
796
        self.addCleanup(self._finishLogFile)
 
797
 
 
798
    def _finishLogFile(self):
 
799
        """Finished with the log file.
 
800
 
 
801
        Close the file and delete it, unless setKeepLogfile was called.
 
802
        """
 
803
        if self._log_file is None:
 
804
            return
 
805
        bzrlib.trace.disable_test_log(self._log_nonce)
 
806
        self._log_file.close()
 
807
        self._log_file = None
 
808
        if not self._keep_log_file:
 
809
            os.remove(self._log_file_name)
 
810
            self._log_file_name = None
 
811
 
 
812
    def setKeepLogfile(self):
 
813
        """Make the logfile not be deleted when _finishLogFile is called."""
 
814
        self._keep_log_file = True
 
815
 
 
816
    def addCleanup(self, callable):
 
817
        """Arrange to run a callable when this case is torn down.
 
818
 
 
819
        Callables are run in the reverse of the order they are registered, 
 
820
        ie last-in first-out.
 
821
        """
 
822
        if callable in self._cleanups:
 
823
            raise ValueError("cleanup function %r already registered on %s" 
 
824
                    % (callable, self))
 
825
        self._cleanups.append(callable)
 
826
 
 
827
    def _cleanEnvironment(self):
 
828
        new_env = {
 
829
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
830
            'HOME': os.getcwd(),
 
831
            'APPDATA': None,  # bzr now use Win32 API and don't rely on APPDATA
 
832
            'BZR_EMAIL': None,
 
833
            'BZREMAIL': None, # may still be present in the environment
 
834
            'EMAIL': None,
 
835
            'BZR_PROGRESS_BAR': None,
 
836
            # Proxies
 
837
            'http_proxy': None,
 
838
            'HTTP_PROXY': None,
 
839
            'https_proxy': None,
 
840
            'HTTPS_PROXY': None,
 
841
            'no_proxy': None,
 
842
            'NO_PROXY': None,
 
843
            'all_proxy': None,
 
844
            'ALL_PROXY': None,
 
845
            # Nobody cares about these ones AFAIK. So far at
 
846
            # least. If you do (care), please update this comment
 
847
            # -- vila 20061212
 
848
            'ftp_proxy': None,
 
849
            'FTP_PROXY': None,
 
850
        }
 
851
        self.__old_env = {}
 
852
        self.addCleanup(self._restoreEnvironment)
 
853
        for name, value in new_env.iteritems():
 
854
            self._captureVar(name, value)
 
855
 
 
856
    def _captureVar(self, name, newvalue):
 
857
        """Set an environment variable, and reset it when finished."""
 
858
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
859
 
 
860
    def _restoreEnvironment(self):
 
861
        for name, value in self.__old_env.iteritems():
 
862
            osutils.set_or_unset_env(name, value)
 
863
 
 
864
    def _restoreHooks(self):
 
865
        bzrlib.branch.Branch.hooks = self._preserved_hooks
 
866
 
 
867
    def tearDown(self):
 
868
        self._runCleanups()
 
869
        unittest.TestCase.tearDown(self)
 
870
 
 
871
    def time(self, callable, *args, **kwargs):
 
872
        """Run callable and accrue the time it takes to the benchmark time.
 
873
        
 
874
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
875
        this will cause lsprofile statistics to be gathered and stored in
 
876
        self._benchcalls.
 
877
        """
 
878
        if self._benchtime is None:
 
879
            self._benchtime = 0
 
880
        start = time.time()
 
881
        try:
 
882
            if not self._gather_lsprof_in_benchmarks:
 
883
                return callable(*args, **kwargs)
 
884
            else:
 
885
                # record this benchmark
 
886
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
887
                stats.sort()
 
888
                self._benchcalls.append(((callable, args, kwargs), stats))
 
889
                return ret
 
890
        finally:
 
891
            self._benchtime += time.time() - start
 
892
 
 
893
    def _runCleanups(self):
 
894
        """Run registered cleanup functions. 
 
895
 
 
896
        This should only be called from TestCase.tearDown.
 
897
        """
 
898
        # TODO: Perhaps this should keep running cleanups even if 
 
899
        # one of them fails?
 
900
        for cleanup_fn in reversed(self._cleanups):
 
901
            cleanup_fn()
 
902
 
 
903
    def log(self, *args):
 
904
        mutter(*args)
 
905
 
 
906
    def _get_log(self, keep_log_file=False):
 
907
        """Return as a string the log for this test. If the file is still
 
908
        on disk and keep_log_file=False, delete the log file and store the
 
909
        content in self._log_contents."""
 
910
        # flush the log file, to get all content
 
911
        import bzrlib.trace
 
912
        bzrlib.trace._trace_file.flush()
 
913
        if self._log_contents:
 
914
            return self._log_contents
 
915
        if self._log_file_name is not None:
 
916
            logfile = open(self._log_file_name)
 
917
            try:
 
918
                log_contents = logfile.read()
 
919
            finally:
 
920
                logfile.close()
 
921
            if not keep_log_file:
 
922
                self._log_contents = log_contents
 
923
                try:
 
924
                    os.remove(self._log_file_name)
 
925
                except OSError, e:
 
926
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
927
                        print >>sys.stderr, ('Unable to delete log file '
 
928
                                             ' %r' % self._log_file_name)
 
929
                    else:
 
930
                        raise
 
931
            return log_contents
 
932
        else:
 
933
            return "DELETED log file to reduce memory footprint"
 
934
 
 
935
    def capture(self, cmd, retcode=0):
 
936
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
937
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
938
 
 
939
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
 
940
                         working_dir=None):
 
941
        """Invoke bzr and return (stdout, stderr).
 
942
 
 
943
        Useful for code that wants to check the contents of the
 
944
        output, the way error messages are presented, etc.
 
945
 
 
946
        This should be the main method for tests that want to exercise the
 
947
        overall behavior of the bzr application (rather than a unit test
 
948
        or a functional test of the library.)
 
949
 
 
950
        Much of the old code runs bzr by forking a new copy of Python, but
 
951
        that is slower, harder to debug, and generally not necessary.
 
952
 
 
953
        This runs bzr through the interface that catches and reports
 
954
        errors, and with logging set to something approximating the
 
955
        default, so that error reporting can be checked.
 
956
 
 
957
        :param argv: arguments to invoke bzr
 
958
        :param retcode: expected return code, or None for don't-care.
 
959
        :param encoding: encoding for sys.stdout and sys.stderr
 
960
        :param stdin: A string to be used as stdin for the command.
 
961
        :param working_dir: Change to this directory before running
 
962
        """
 
963
        if encoding is None:
 
964
            encoding = bzrlib.user_encoding
 
965
        if stdin is not None:
 
966
            stdin = StringIO(stdin)
 
967
        stdout = StringIOWrapper()
 
968
        stderr = StringIOWrapper()
 
969
        stdout.encoding = encoding
 
970
        stderr.encoding = encoding
 
971
 
 
972
        self.log('run bzr: %r', argv)
 
973
        # FIXME: don't call into logging here
 
974
        handler = logging.StreamHandler(stderr)
 
975
        handler.setLevel(logging.INFO)
 
976
        logger = logging.getLogger('')
 
977
        logger.addHandler(handler)
 
978
        old_ui_factory = bzrlib.ui.ui_factory
 
979
        bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
 
980
            stdout=stdout,
 
981
            stderr=stderr)
 
982
        bzrlib.ui.ui_factory.stdin = stdin
 
983
 
 
984
        cwd = None
 
985
        if working_dir is not None:
 
986
            cwd = osutils.getcwd()
 
987
            os.chdir(working_dir)
 
988
 
 
989
        try:
 
990
            saved_debug_flags = frozenset(debug.debug_flags)
 
991
            debug.debug_flags.clear()
 
992
            try:
 
993
                result = self.apply_redirected(stdin, stdout, stderr,
 
994
                                               bzrlib.commands.run_bzr_catch_errors,
 
995
                                               argv)
 
996
            finally:
 
997
                debug.debug_flags.update(saved_debug_flags)
 
998
        finally:
 
999
            logger.removeHandler(handler)
 
1000
            bzrlib.ui.ui_factory = old_ui_factory
 
1001
            if cwd is not None:
 
1002
                os.chdir(cwd)
 
1003
 
 
1004
        out = stdout.getvalue()
 
1005
        err = stderr.getvalue()
 
1006
        if out:
 
1007
            self.log('output:\n%r', out)
 
1008
        if err:
 
1009
            self.log('errors:\n%r', err)
 
1010
        if retcode is not None:
 
1011
            self.assertEquals(retcode, result)
 
1012
        return out, err
 
1013
 
 
1014
    def run_bzr(self, *args, **kwargs):
 
1015
        """Invoke bzr, as if it were run from the command line.
 
1016
 
 
1017
        This should be the main method for tests that want to exercise the
 
1018
        overall behavior of the bzr application (rather than a unit test
 
1019
        or a functional test of the library.)
 
1020
 
 
1021
        This sends the stdout/stderr results into the test's log,
 
1022
        where it may be useful for debugging.  See also run_captured.
 
1023
 
 
1024
        :param stdin: A string to be used as stdin for the command.
 
1025
        :param retcode: The status code the command should return
 
1026
        :param working_dir: The directory to run the command in
 
1027
        """
 
1028
        retcode = kwargs.pop('retcode', 0)
 
1029
        encoding = kwargs.pop('encoding', None)
 
1030
        stdin = kwargs.pop('stdin', None)
 
1031
        working_dir = kwargs.pop('working_dir', None)
 
1032
        return self.run_bzr_captured(args, retcode=retcode, encoding=encoding,
 
1033
                                     stdin=stdin, working_dir=working_dir)
 
1034
 
 
1035
    def run_bzr_decode(self, *args, **kwargs):
 
1036
        if 'encoding' in kwargs:
 
1037
            encoding = kwargs['encoding']
 
1038
        else:
 
1039
            encoding = bzrlib.user_encoding
 
1040
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
 
1041
 
 
1042
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1043
        """Run bzr, and check that stderr contains the supplied regexes
 
1044
        
 
1045
        :param error_regexes: Sequence of regular expressions which 
 
1046
            must each be found in the error output. The relative ordering
 
1047
            is not enforced.
 
1048
        :param args: command-line arguments for bzr
 
1049
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1050
            This function changes the default value of retcode to be 3,
 
1051
            since in most cases this is run when you expect bzr to fail.
 
1052
        :return: (out, err) The actual output of running the command (in case you
 
1053
                 want to do more inspection)
 
1054
 
 
1055
        Examples of use:
 
1056
            # Make sure that commit is failing because there is nothing to do
 
1057
            self.run_bzr_error(['no changes to commit'],
 
1058
                               'commit', '-m', 'my commit comment')
 
1059
            # Make sure --strict is handling an unknown file, rather than
 
1060
            # giving us the 'nothing to do' error
 
1061
            self.build_tree(['unknown'])
 
1062
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1063
                               'commit', '--strict', '-m', 'my commit comment')
 
1064
        """
 
1065
        kwargs.setdefault('retcode', 3)
 
1066
        out, err = self.run_bzr(*args, **kwargs)
 
1067
        for regex in error_regexes:
 
1068
            self.assertContainsRe(err, regex)
 
1069
        return out, err
 
1070
 
 
1071
    def run_bzr_subprocess(self, *args, **kwargs):
 
1072
        """Run bzr in a subprocess for testing.
 
1073
 
 
1074
        This starts a new Python interpreter and runs bzr in there. 
 
1075
        This should only be used for tests that have a justifiable need for
 
1076
        this isolation: e.g. they are testing startup time, or signal
 
1077
        handling, or early startup code, etc.  Subprocess code can't be 
 
1078
        profiled or debugged so easily.
 
1079
 
 
1080
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1081
            None is supplied, the status code is not checked.
 
1082
        :param env_changes: A dictionary which lists changes to environment
 
1083
            variables. A value of None will unset the env variable.
 
1084
            The values must be strings. The change will only occur in the
 
1085
            child, so you don't need to fix the environment after running.
 
1086
        :param universal_newlines: Convert CRLF => LF
 
1087
        :param allow_plugins: By default the subprocess is run with
 
1088
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1089
            for system-wide plugins to create unexpected output on stderr,
 
1090
            which can cause unnecessary test failures.
 
1091
        """
 
1092
        env_changes = kwargs.get('env_changes', {})
 
1093
        working_dir = kwargs.get('working_dir', None)
 
1094
        allow_plugins = kwargs.get('allow_plugins', False)
 
1095
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1096
                                            working_dir=working_dir,
 
1097
                                            allow_plugins=allow_plugins)
 
1098
        # We distinguish between retcode=None and retcode not passed.
 
1099
        supplied_retcode = kwargs.get('retcode', 0)
 
1100
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1101
            universal_newlines=kwargs.get('universal_newlines', False),
 
1102
            process_args=args)
 
1103
 
 
1104
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1105
                             skip_if_plan_to_signal=False,
 
1106
                             working_dir=None,
 
1107
                             allow_plugins=False):
 
1108
        """Start bzr in a subprocess for testing.
 
1109
 
 
1110
        This starts a new Python interpreter and runs bzr in there.
 
1111
        This should only be used for tests that have a justifiable need for
 
1112
        this isolation: e.g. they are testing startup time, or signal
 
1113
        handling, or early startup code, etc.  Subprocess code can't be
 
1114
        profiled or debugged so easily.
 
1115
 
 
1116
        :param process_args: a list of arguments to pass to the bzr executable,
 
1117
            for example `['--version']`.
 
1118
        :param env_changes: A dictionary which lists changes to environment
 
1119
            variables. A value of None will unset the env variable.
 
1120
            The values must be strings. The change will only occur in the
 
1121
            child, so you don't need to fix the environment after running.
 
1122
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1123
            is not available.
 
1124
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1125
 
 
1126
        :returns: Popen object for the started process.
 
1127
        """
 
1128
        if skip_if_plan_to_signal:
 
1129
            if not getattr(os, 'kill', None):
 
1130
                raise TestSkipped("os.kill not available.")
 
1131
 
 
1132
        if env_changes is None:
 
1133
            env_changes = {}
 
1134
        old_env = {}
 
1135
 
 
1136
        def cleanup_environment():
 
1137
            for env_var, value in env_changes.iteritems():
 
1138
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1139
 
 
1140
        def restore_environment():
 
1141
            for env_var, value in old_env.iteritems():
 
1142
                osutils.set_or_unset_env(env_var, value)
 
1143
 
 
1144
        bzr_path = self.get_bzr_path()
 
1145
 
 
1146
        cwd = None
 
1147
        if working_dir is not None:
 
1148
            cwd = osutils.getcwd()
 
1149
            os.chdir(working_dir)
 
1150
 
 
1151
        try:
 
1152
            # win32 subprocess doesn't support preexec_fn
 
1153
            # so we will avoid using it on all platforms, just to
 
1154
            # make sure the code path is used, and we don't break on win32
 
1155
            cleanup_environment()
 
1156
            command = [sys.executable, bzr_path]
 
1157
            if not allow_plugins:
 
1158
                command.append('--no-plugins')
 
1159
            command.extend(process_args)
 
1160
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1161
        finally:
 
1162
            restore_environment()
 
1163
            if cwd is not None:
 
1164
                os.chdir(cwd)
 
1165
 
 
1166
        return process
 
1167
 
 
1168
    def _popen(self, *args, **kwargs):
 
1169
        """Place a call to Popen.
 
1170
 
 
1171
        Allows tests to override this method to intercept the calls made to
 
1172
        Popen for introspection.
 
1173
        """
 
1174
        return Popen(*args, **kwargs)
 
1175
 
 
1176
    def get_bzr_path(self):
 
1177
        """Return the path of the 'bzr' executable for this test suite."""
 
1178
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1179
        if not os.path.isfile(bzr_path):
 
1180
            # We are probably installed. Assume sys.argv is the right file
 
1181
            bzr_path = sys.argv[0]
 
1182
        return bzr_path
 
1183
 
 
1184
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1185
                              universal_newlines=False, process_args=None):
 
1186
        """Finish the execution of process.
 
1187
 
 
1188
        :param process: the Popen object returned from start_bzr_subprocess.
 
1189
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1190
            None is supplied, the status code is not checked.
 
1191
        :param send_signal: an optional signal to send to the process.
 
1192
        :param universal_newlines: Convert CRLF => LF
 
1193
        :returns: (stdout, stderr)
 
1194
        """
 
1195
        if send_signal is not None:
 
1196
            os.kill(process.pid, send_signal)
 
1197
        out, err = process.communicate()
 
1198
 
 
1199
        if universal_newlines:
 
1200
            out = out.replace('\r\n', '\n')
 
1201
            err = err.replace('\r\n', '\n')
 
1202
 
 
1203
        if retcode is not None and retcode != process.returncode:
 
1204
            if process_args is None:
 
1205
                process_args = "(unknown args)"
 
1206
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1207
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1208
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1209
                      % (process_args, retcode, process.returncode))
 
1210
        return [out, err]
 
1211
 
 
1212
    def check_inventory_shape(self, inv, shape):
 
1213
        """Compare an inventory to a list of expected names.
 
1214
 
 
1215
        Fail if they are not precisely equal.
 
1216
        """
 
1217
        extras = []
 
1218
        shape = list(shape)             # copy
 
1219
        for path, ie in inv.entries():
 
1220
            name = path.replace('\\', '/')
 
1221
            if ie.kind == 'dir':
 
1222
                name = name + '/'
 
1223
            if name in shape:
 
1224
                shape.remove(name)
 
1225
            else:
 
1226
                extras.append(name)
 
1227
        if shape:
 
1228
            self.fail("expected paths not found in inventory: %r" % shape)
 
1229
        if extras:
 
1230
            self.fail("unexpected paths found in inventory: %r" % extras)
 
1231
 
 
1232
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1233
                         a_callable=None, *args, **kwargs):
 
1234
        """Call callable with redirected std io pipes.
 
1235
 
 
1236
        Returns the return code."""
 
1237
        if not callable(a_callable):
 
1238
            raise ValueError("a_callable must be callable.")
 
1239
        if stdin is None:
 
1240
            stdin = StringIO("")
 
1241
        if stdout is None:
 
1242
            if getattr(self, "_log_file", None) is not None:
 
1243
                stdout = self._log_file
 
1244
            else:
 
1245
                stdout = StringIO()
 
1246
        if stderr is None:
 
1247
            if getattr(self, "_log_file", None is not None):
 
1248
                stderr = self._log_file
 
1249
            else:
 
1250
                stderr = StringIO()
 
1251
        real_stdin = sys.stdin
 
1252
        real_stdout = sys.stdout
 
1253
        real_stderr = sys.stderr
 
1254
        try:
 
1255
            sys.stdout = stdout
 
1256
            sys.stderr = stderr
 
1257
            sys.stdin = stdin
 
1258
            return a_callable(*args, **kwargs)
 
1259
        finally:
 
1260
            sys.stdout = real_stdout
 
1261
            sys.stderr = real_stderr
 
1262
            sys.stdin = real_stdin
 
1263
 
 
1264
    @symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
 
1265
    def merge(self, branch_from, wt_to):
 
1266
        """A helper for tests to do a ui-less merge.
 
1267
 
 
1268
        This should move to the main library when someone has time to integrate
 
1269
        it in.
 
1270
        """
 
1271
        # minimal ui-less merge.
 
1272
        wt_to.branch.fetch(branch_from)
 
1273
        base_rev = common_ancestor(branch_from.last_revision(),
 
1274
                                   wt_to.branch.last_revision(),
 
1275
                                   wt_to.branch.repository)
 
1276
        merge_inner(wt_to.branch, branch_from.basis_tree(),
 
1277
                    wt_to.branch.repository.revision_tree(base_rev),
 
1278
                    this_tree=wt_to)
 
1279
        wt_to.add_parent_tree_id(branch_from.last_revision())
 
1280
 
 
1281
 
 
1282
BzrTestBase = TestCase
 
1283
 
 
1284
 
 
1285
class TestCaseWithMemoryTransport(TestCase):
 
1286
    """Common test class for tests that do not need disk resources.
 
1287
 
 
1288
    Tests that need disk resources should derive from TestCaseWithTransport.
 
1289
 
 
1290
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
1291
 
 
1292
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
1293
    a directory which does not exist. This serves to help ensure test isolation
 
1294
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
1295
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
1296
    file defaults for the transport in tests, nor does it obey the command line
 
1297
    override, so tests that accidentally write to the common directory should
 
1298
    be rare.
 
1299
    """
 
1300
 
 
1301
    TEST_ROOT = None
 
1302
    _TEST_NAME = 'test'
 
1303
 
 
1304
 
 
1305
    def __init__(self, methodName='runTest'):
 
1306
        # allow test parameterisation after test construction and before test
 
1307
        # execution. Variables that the parameteriser sets need to be 
 
1308
        # ones that are not set by setUp, or setUp will trash them.
 
1309
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
1310
        self.transport_server = default_transport
 
1311
        self.transport_readonly_server = None
 
1312
 
 
1313
    def get_transport(self):
 
1314
        """Return a writeable transport for the test scratch space"""
 
1315
        t = get_transport(self.get_url())
 
1316
        self.assertFalse(t.is_readonly())
 
1317
        return t
 
1318
 
 
1319
    def get_readonly_transport(self):
 
1320
        """Return a readonly transport for the test scratch space
 
1321
        
 
1322
        This can be used to test that operations which should only need
 
1323
        readonly access in fact do not try to write.
 
1324
        """
 
1325
        t = get_transport(self.get_readonly_url())
 
1326
        self.assertTrue(t.is_readonly())
 
1327
        return t
 
1328
 
 
1329
    def create_transport_readonly_server(self):
 
1330
        """Create a transport server from class defined at init.
 
1331
 
 
1332
        This is mostly a hook for daughter classes.
 
1333
        """
 
1334
        return self.transport_readonly_server()
 
1335
 
 
1336
    def get_readonly_server(self):
 
1337
        """Get the server instance for the readonly transport
 
1338
 
 
1339
        This is useful for some tests with specific servers to do diagnostics.
 
1340
        """
 
1341
        if self.__readonly_server is None:
 
1342
            if self.transport_readonly_server is None:
 
1343
                # readonly decorator requested
 
1344
                # bring up the server
 
1345
                self.get_url()
 
1346
                self.__readonly_server = ReadonlyServer()
 
1347
                self.__readonly_server.setUp(self.__server)
 
1348
            else:
 
1349
                self.__readonly_server = self.create_transport_readonly_server()
 
1350
                self.__readonly_server.setUp()
 
1351
            self.addCleanup(self.__readonly_server.tearDown)
 
1352
        return self.__readonly_server
 
1353
 
 
1354
    def get_readonly_url(self, relpath=None):
 
1355
        """Get a URL for the readonly transport.
 
1356
 
 
1357
        This will either be backed by '.' or a decorator to the transport 
 
1358
        used by self.get_url()
 
1359
        relpath provides for clients to get a path relative to the base url.
 
1360
        These should only be downwards relative, not upwards.
 
1361
        """
 
1362
        base = self.get_readonly_server().get_url()
 
1363
        if relpath is not None:
 
1364
            if not base.endswith('/'):
 
1365
                base = base + '/'
 
1366
            base = base + relpath
 
1367
        return base
 
1368
 
 
1369
    def get_server(self):
 
1370
        """Get the read/write server instance.
 
1371
 
 
1372
        This is useful for some tests with specific servers that need
 
1373
        diagnostics.
 
1374
 
 
1375
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
1376
        is no means to override it.
 
1377
        """
 
1378
        if self.__server is None:
 
1379
            self.__server = MemoryServer()
 
1380
            self.__server.setUp()
 
1381
            self.addCleanup(self.__server.tearDown)
 
1382
        return self.__server
 
1383
 
 
1384
    def get_url(self, relpath=None):
 
1385
        """Get a URL (or maybe a path) for the readwrite transport.
 
1386
 
 
1387
        This will either be backed by '.' or to an equivalent non-file based
 
1388
        facility.
 
1389
        relpath provides for clients to get a path relative to the base url.
 
1390
        These should only be downwards relative, not upwards.
 
1391
        """
 
1392
        base = self.get_server().get_url()
 
1393
        if relpath is not None and relpath != '.':
 
1394
            if not base.endswith('/'):
 
1395
                base = base + '/'
 
1396
            # XXX: Really base should be a url; we did after all call
 
1397
            # get_url()!  But sometimes it's just a path (from
 
1398
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
1399
            # to a non-escaped local path.
 
1400
            if base.startswith('./') or base.startswith('/'):
 
1401
                base += relpath
 
1402
            else:
 
1403
                base += urlutils.escape(relpath)
 
1404
        return base
 
1405
 
 
1406
    def _make_test_root(self):
 
1407
        if TestCaseWithMemoryTransport.TEST_ROOT is not None:
 
1408
            return
 
1409
        i = 0
 
1410
        while True:
 
1411
            root = u'test%04d.tmp' % i
 
1412
            try:
 
1413
                os.mkdir(root)
 
1414
            except OSError, e:
 
1415
                if e.errno == errno.EEXIST:
 
1416
                    i += 1
 
1417
                    continue
 
1418
                else:
 
1419
                    raise
 
1420
            # successfully created
 
1421
            TestCaseWithMemoryTransport.TEST_ROOT = osutils.abspath(root)
 
1422
            break
 
1423
        # make a fake bzr directory there to prevent any tests propagating
 
1424
        # up onto the source directory's real branch
 
1425
        bzrdir.BzrDir.create_standalone_workingtree(
 
1426
            TestCaseWithMemoryTransport.TEST_ROOT)
 
1427
 
 
1428
    def makeAndChdirToTestDir(self):
 
1429
        """Create a temporary directories for this one test.
 
1430
        
 
1431
        This must set self.test_home_dir and self.test_dir and chdir to
 
1432
        self.test_dir.
 
1433
        
 
1434
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
1435
        """
 
1436
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
1437
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
1438
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
1439
        
 
1440
    def make_branch(self, relpath, format=None):
 
1441
        """Create a branch on the transport at relpath."""
 
1442
        repo = self.make_repository(relpath, format=format)
 
1443
        return repo.bzrdir.create_branch()
 
1444
 
 
1445
    def make_bzrdir(self, relpath, format=None):
 
1446
        try:
 
1447
            # might be a relative or absolute path
 
1448
            maybe_a_url = self.get_url(relpath)
 
1449
            segments = maybe_a_url.rsplit('/', 1)
 
1450
            t = get_transport(maybe_a_url)
 
1451
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
1452
                try:
 
1453
                    t.mkdir('.')
 
1454
                except errors.FileExists:
 
1455
                    pass
 
1456
            if format is None:
 
1457
                format = 'default'
 
1458
            if isinstance(format, basestring):
 
1459
                format = bzrdir.format_registry.make_bzrdir(format)
 
1460
            return format.initialize_on_transport(t)
 
1461
        except errors.UninitializableFormat:
 
1462
            raise TestSkipped("Format %s is not initializable." % format)
 
1463
 
 
1464
    def make_repository(self, relpath, shared=False, format=None):
 
1465
        """Create a repository on our default transport at relpath."""
 
1466
        made_control = self.make_bzrdir(relpath, format=format)
 
1467
        return made_control.create_repository(shared=shared)
 
1468
 
 
1469
    def make_branch_and_memory_tree(self, relpath, format=None):
 
1470
        """Create a branch on the default transport and a MemoryTree for it."""
 
1471
        b = self.make_branch(relpath, format=format)
 
1472
        return memorytree.MemoryTree.create_on_branch(b)
 
1473
 
 
1474
    def overrideEnvironmentForTesting(self):
 
1475
        os.environ['HOME'] = self.test_home_dir
 
1476
        os.environ['BZR_HOME'] = self.test_home_dir
 
1477
        
 
1478
    def setUp(self):
 
1479
        super(TestCaseWithMemoryTransport, self).setUp()
 
1480
        self._make_test_root()
 
1481
        _currentdir = os.getcwdu()
 
1482
        def _leaveDirectory():
 
1483
            os.chdir(_currentdir)
 
1484
        self.addCleanup(_leaveDirectory)
 
1485
        self.makeAndChdirToTestDir()
 
1486
        self.overrideEnvironmentForTesting()
 
1487
        self.__readonly_server = None
 
1488
        self.__server = None
 
1489
 
 
1490
     
 
1491
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
1492
    """Derived class that runs a test within a temporary directory.
 
1493
 
 
1494
    This is useful for tests that need to create a branch, etc.
 
1495
 
 
1496
    The directory is created in a slightly complex way: for each
 
1497
    Python invocation, a new temporary top-level directory is created.
 
1498
    All test cases create their own directory within that.  If the
 
1499
    tests complete successfully, the directory is removed.
 
1500
 
 
1501
    InTempDir is an old alias for FunctionalTestCase.
 
1502
    """
 
1503
 
 
1504
    OVERRIDE_PYTHON = 'python'
 
1505
 
 
1506
    def check_file_contents(self, filename, expect):
 
1507
        self.log("check contents of file %s" % filename)
 
1508
        contents = file(filename, 'r').read()
 
1509
        if contents != expect:
 
1510
            self.log("expected: %r" % expect)
 
1511
            self.log("actually: %r" % contents)
 
1512
            self.fail("contents of %s not as expected" % filename)
 
1513
 
 
1514
    def makeAndChdirToTestDir(self):
 
1515
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
1516
        
 
1517
        For TestCaseInTempDir we create a temporary directory based on the test
 
1518
        name and then create two subdirs - test and home under it.
 
1519
        """
 
1520
        # shorten the name, to avoid test failures due to path length
 
1521
        short_id = self.id().replace('bzrlib.tests.', '') \
 
1522
                   .replace('__main__.', '')[-100:]
 
1523
        # it's possible the same test class is run several times for
 
1524
        # parameterized tests, so make sure the names don't collide.  
 
1525
        i = 0
 
1526
        while True:
 
1527
            if i > 0:
 
1528
                candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
 
1529
            else:
 
1530
                candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
 
1531
            if os.path.exists(candidate_dir):
 
1532
                i = i + 1
 
1533
                continue
 
1534
            else:
 
1535
                os.mkdir(candidate_dir)
 
1536
                self.test_home_dir = candidate_dir + '/home'
 
1537
                os.mkdir(self.test_home_dir)
 
1538
                self.test_dir = candidate_dir + '/work'
 
1539
                os.mkdir(self.test_dir)
 
1540
                os.chdir(self.test_dir)
 
1541
                break
 
1542
 
 
1543
    def build_tree(self, shape, line_endings='binary', transport=None):
 
1544
        """Build a test tree according to a pattern.
 
1545
 
 
1546
        shape is a sequence of file specifications.  If the final
 
1547
        character is '/', a directory is created.
 
1548
 
 
1549
        This assumes that all the elements in the tree being built are new.
 
1550
 
 
1551
        This doesn't add anything to a branch.
 
1552
        :param line_endings: Either 'binary' or 'native'
 
1553
                             in binary mode, exact contents are written
 
1554
                             in native mode, the line endings match the
 
1555
                             default platform endings.
 
1556
 
 
1557
        :param transport: A transport to write to, for building trees on 
 
1558
                          VFS's. If the transport is readonly or None,
 
1559
                          "." is opened automatically.
 
1560
        """
 
1561
        # It's OK to just create them using forward slashes on windows.
 
1562
        if transport is None or transport.is_readonly():
 
1563
            transport = get_transport(".")
 
1564
        for name in shape:
 
1565
            self.assert_(isinstance(name, basestring))
 
1566
            if name[-1] == '/':
 
1567
                transport.mkdir(urlutils.escape(name[:-1]))
 
1568
            else:
 
1569
                if line_endings == 'binary':
 
1570
                    end = '\n'
 
1571
                elif line_endings == 'native':
 
1572
                    end = os.linesep
 
1573
                else:
 
1574
                    raise errors.BzrError(
 
1575
                        'Invalid line ending request %r' % line_endings)
 
1576
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
1577
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
1578
 
 
1579
    def build_tree_contents(self, shape):
 
1580
        build_tree_contents(shape)
 
1581
 
 
1582
    def assertFileEqual(self, content, path):
 
1583
        """Fail if path does not contain 'content'."""
 
1584
        self.failUnlessExists(path)
 
1585
        # TODO: jam 20060427 Shouldn't this be 'rb'?
 
1586
        self.assertEqualDiff(content, open(path, 'r').read())
 
1587
 
 
1588
    def failUnlessExists(self, path):
 
1589
        """Fail unless path, which may be abs or relative, exists."""
 
1590
        self.failUnless(osutils.lexists(path),path+" does not exist")
 
1591
 
 
1592
    def failIfExists(self, path):
 
1593
        """Fail if path, which may be abs or relative, exists."""
 
1594
        self.failIf(osutils.lexists(path),path+" exists")
 
1595
 
 
1596
 
 
1597
class TestCaseWithTransport(TestCaseInTempDir):
 
1598
    """A test case that provides get_url and get_readonly_url facilities.
 
1599
 
 
1600
    These back onto two transport servers, one for readonly access and one for
 
1601
    read write access.
 
1602
 
 
1603
    If no explicit class is provided for readonly access, a
 
1604
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
1605
    based read write transports.
 
1606
 
 
1607
    If an explicit class is provided for readonly access, that server and the 
 
1608
    readwrite one must both define get_url() as resolving to os.getcwd().
 
1609
    """
 
1610
 
 
1611
    def create_transport_server(self):
 
1612
        """Create a transport server from class defined at init.
 
1613
 
 
1614
        This is mostly a hook for daughter classes.
 
1615
        """
 
1616
        return self.transport_server()
 
1617
 
 
1618
    def get_server(self):
 
1619
        """See TestCaseWithMemoryTransport.
 
1620
 
 
1621
        This is useful for some tests with specific servers that need
 
1622
        diagnostics.
 
1623
        """
 
1624
        if self.__server is None:
 
1625
            self.__server = self.create_transport_server()
 
1626
            self.__server.setUp()
 
1627
            self.addCleanup(self.__server.tearDown)
 
1628
        return self.__server
 
1629
 
 
1630
    def make_branch_and_tree(self, relpath, format=None):
 
1631
        """Create a branch on the transport and a tree locally.
 
1632
 
 
1633
        If the transport is not a LocalTransport, the Tree can't be created on
 
1634
        the transport.  In that case the working tree is created in the local
 
1635
        directory, and the returned tree's branch and repository will also be
 
1636
        accessed locally.
 
1637
 
 
1638
        This will fail if the original default transport for this test
 
1639
        case wasn't backed by the working directory, as the branch won't
 
1640
        be on disk for us to open it.  
 
1641
 
 
1642
        :param format: The BzrDirFormat.
 
1643
        :returns: the WorkingTree.
 
1644
        """
 
1645
        # TODO: always use the local disk path for the working tree,
 
1646
        # this obviously requires a format that supports branch references
 
1647
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
1648
        # RBC 20060208
 
1649
        b = self.make_branch(relpath, format=format)
 
1650
        try:
 
1651
            return b.bzrdir.create_workingtree()
 
1652
        except errors.NotLocalUrl:
 
1653
            # We can only make working trees locally at the moment.  If the
 
1654
            # transport can't support them, then reopen the branch on a local
 
1655
            # transport, and create the working tree there.  
 
1656
            #
 
1657
            # Possibly we should instead keep
 
1658
            # the non-disk-backed branch and create a local checkout?
 
1659
            bd = bzrdir.BzrDir.open(relpath)
 
1660
            return bd.create_workingtree()
 
1661
 
 
1662
    def assertIsDirectory(self, relpath, transport):
 
1663
        """Assert that relpath within transport is a directory.
 
1664
 
 
1665
        This may not be possible on all transports; in that case it propagates
 
1666
        a TransportNotPossible.
 
1667
        """
 
1668
        try:
 
1669
            mode = transport.stat(relpath).st_mode
 
1670
        except errors.NoSuchFile:
 
1671
            self.fail("path %s is not a directory; no such file"
 
1672
                      % (relpath))
 
1673
        if not stat.S_ISDIR(mode):
 
1674
            self.fail("path %s is not a directory; has mode %#o"
 
1675
                      % (relpath, mode))
 
1676
 
 
1677
    def assertTreesEqual(self, left, right):
 
1678
        """Check that left and right have the same content and properties."""
 
1679
        # we use a tree delta to check for equality of the content, and we
 
1680
        # manually check for equality of other things such as the parents list.
 
1681
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
1682
        differences = left.changes_from(right)
 
1683
        self.assertFalse(differences.has_changed(),
 
1684
            "Trees %r and %r are different: %r" % (left, right, differences))
 
1685
 
 
1686
    def setUp(self):
 
1687
        super(TestCaseWithTransport, self).setUp()
 
1688
        self.__server = None
 
1689
 
 
1690
 
 
1691
class ChrootedTestCase(TestCaseWithTransport):
 
1692
    """A support class that provides readonly urls outside the local namespace.
 
1693
 
 
1694
    This is done by checking if self.transport_server is a MemoryServer. if it
 
1695
    is then we are chrooted already, if it is not then an HttpServer is used
 
1696
    for readonly urls.
 
1697
 
 
1698
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
1699
                       be used without needed to redo it when a different 
 
1700
                       subclass is in use ?
 
1701
    """
 
1702
 
 
1703
    def setUp(self):
 
1704
        super(ChrootedTestCase, self).setUp()
 
1705
        if not self.transport_server == MemoryServer:
 
1706
            self.transport_readonly_server = HttpServer
 
1707
 
 
1708
 
 
1709
def filter_suite_by_re(suite, pattern):
 
1710
    result = TestUtil.TestSuite()
 
1711
    filter_re = re.compile(pattern)
 
1712
    for test in iter_suite_tests(suite):
 
1713
        if filter_re.search(test.id()):
 
1714
            result.addTest(test)
 
1715
    return result
 
1716
 
 
1717
 
 
1718
def sort_suite_by_re(suite, pattern):
 
1719
    first = []
 
1720
    second = []
 
1721
    filter_re = re.compile(pattern)
 
1722
    for test in iter_suite_tests(suite):
 
1723
        if filter_re.search(test.id()):
 
1724
            first.append(test)
 
1725
        else:
 
1726
            second.append(test)
 
1727
    return TestUtil.TestSuite(first + second)
 
1728
 
 
1729
 
 
1730
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
1731
              stop_on_failure=False, keep_output=False,
 
1732
              transport=None, lsprof_timed=None, bench_history=None,
 
1733
              matching_tests_first=None):
 
1734
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
1735
    if verbose:
 
1736
        verbosity = 2
 
1737
    else:
 
1738
        verbosity = 1
 
1739
    runner = TextTestRunner(stream=sys.stdout,
 
1740
                            descriptions=0,
 
1741
                            verbosity=verbosity,
 
1742
                            keep_output=keep_output,
 
1743
                            bench_history=bench_history)
 
1744
    runner.stop_on_failure=stop_on_failure
 
1745
    if pattern != '.*':
 
1746
        if matching_tests_first:
 
1747
            suite = sort_suite_by_re(suite, pattern)
 
1748
        else:
 
1749
            suite = filter_suite_by_re(suite, pattern)
 
1750
    result = runner.run(suite)
 
1751
    return result.wasSuccessful()
 
1752
 
 
1753
 
 
1754
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
1755
             keep_output=False,
 
1756
             transport=None,
 
1757
             test_suite_factory=None,
 
1758
             lsprof_timed=None,
 
1759
             bench_history=None,
 
1760
             matching_tests_first=None):
 
1761
    """Run the whole test suite under the enhanced runner"""
 
1762
    # XXX: Very ugly way to do this...
 
1763
    # Disable warning about old formats because we don't want it to disturb
 
1764
    # any blackbox tests.
 
1765
    from bzrlib import repository
 
1766
    repository._deprecation_warning_done = True
 
1767
 
 
1768
    global default_transport
 
1769
    if transport is None:
 
1770
        transport = default_transport
 
1771
    old_transport = default_transport
 
1772
    default_transport = transport
 
1773
    try:
 
1774
        if test_suite_factory is None:
 
1775
            suite = test_suite()
 
1776
        else:
 
1777
            suite = test_suite_factory()
 
1778
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
1779
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
 
1780
                     transport=transport,
 
1781
                     lsprof_timed=lsprof_timed,
 
1782
                     bench_history=bench_history,
 
1783
                     matching_tests_first=matching_tests_first)
 
1784
    finally:
 
1785
        default_transport = old_transport
 
1786
 
 
1787
 
 
1788
def test_suite():
 
1789
    """Build and return TestSuite for the whole of bzrlib.
 
1790
    
 
1791
    This function can be replaced if you need to change the default test
 
1792
    suite on a global basis, but it is not encouraged.
 
1793
    """
 
1794
    testmod_names = [
 
1795
                   'bzrlib.tests.test_ancestry',
 
1796
                   'bzrlib.tests.test_annotate',
 
1797
                   'bzrlib.tests.test_api',
 
1798
                   'bzrlib.tests.test_atomicfile',
 
1799
                   'bzrlib.tests.test_bad_files',
 
1800
                   'bzrlib.tests.test_branch',
 
1801
                   'bzrlib.tests.test_bundle',
 
1802
                   'bzrlib.tests.test_bzrdir',
 
1803
                   'bzrlib.tests.test_cache_utf8',
 
1804
                   'bzrlib.tests.test_commands',
 
1805
                   'bzrlib.tests.test_commit',
 
1806
                   'bzrlib.tests.test_commit_merge',
 
1807
                   'bzrlib.tests.test_config',
 
1808
                   'bzrlib.tests.test_conflicts',
 
1809
                   'bzrlib.tests.test_decorators',
 
1810
                   'bzrlib.tests.test_delta',
 
1811
                   'bzrlib.tests.test_diff',
 
1812
                   'bzrlib.tests.test_dirstate',
 
1813
                   'bzrlib.tests.test_doc_generate',
 
1814
                   'bzrlib.tests.test_errors',
 
1815
                   'bzrlib.tests.test_escaped_store',
 
1816
                   'bzrlib.tests.test_extract',
 
1817
                   'bzrlib.tests.test_fetch',
 
1818
                   'bzrlib.tests.test_ftp_transport',
 
1819
                   'bzrlib.tests.test_generate_docs',
 
1820
                   'bzrlib.tests.test_generate_ids',
 
1821
                   'bzrlib.tests.test_globbing',
 
1822
                   'bzrlib.tests.test_gpg',
 
1823
                   'bzrlib.tests.test_graph',
 
1824
                   'bzrlib.tests.test_hashcache',
 
1825
                   'bzrlib.tests.test_http',
 
1826
                   'bzrlib.tests.test_http_response',
 
1827
                   'bzrlib.tests.test_https_ca_bundle',
 
1828
                   'bzrlib.tests.test_identitymap',
 
1829
                   'bzrlib.tests.test_ignores',
 
1830
                   'bzrlib.tests.test_inv',
 
1831
                   'bzrlib.tests.test_knit',
 
1832
                   'bzrlib.tests.test_lazy_import',
 
1833
                   'bzrlib.tests.test_lazy_regex',
 
1834
                   'bzrlib.tests.test_lockdir',
 
1835
                   'bzrlib.tests.test_lockable_files',
 
1836
                   'bzrlib.tests.test_log',
 
1837
                   'bzrlib.tests.test_memorytree',
 
1838
                   'bzrlib.tests.test_merge',
 
1839
                   'bzrlib.tests.test_merge3',
 
1840
                   'bzrlib.tests.test_merge_core',
 
1841
                   'bzrlib.tests.test_missing',
 
1842
                   'bzrlib.tests.test_msgeditor',
 
1843
                   'bzrlib.tests.test_nonascii',
 
1844
                   'bzrlib.tests.test_options',
 
1845
                   'bzrlib.tests.test_osutils',
 
1846
                   'bzrlib.tests.test_osutils_encodings',
 
1847
                   'bzrlib.tests.test_patch',
 
1848
                   'bzrlib.tests.test_patches',
 
1849
                   'bzrlib.tests.test_permissions',
 
1850
                   'bzrlib.tests.test_plugins',
 
1851
                   'bzrlib.tests.test_progress',
 
1852
                   'bzrlib.tests.test_reconcile',
 
1853
                   'bzrlib.tests.test_registry',
 
1854
                   'bzrlib.tests.test_repository',
 
1855
                   'bzrlib.tests.test_revert',
 
1856
                   'bzrlib.tests.test_revision',
 
1857
                   'bzrlib.tests.test_revisionnamespaces',
 
1858
                   'bzrlib.tests.test_revisiontree',
 
1859
                   'bzrlib.tests.test_rio',
 
1860
                   'bzrlib.tests.test_sampler',
 
1861
                   'bzrlib.tests.test_selftest',
 
1862
                   'bzrlib.tests.test_setup',
 
1863
                   'bzrlib.tests.test_sftp_transport',
 
1864
                   'bzrlib.tests.test_smart_add',
 
1865
                   'bzrlib.tests.test_smart_transport',
 
1866
                   'bzrlib.tests.test_source',
 
1867
                   'bzrlib.tests.test_status',
 
1868
                   'bzrlib.tests.test_store',
 
1869
                   'bzrlib.tests.test_subsume',
 
1870
                   'bzrlib.tests.test_symbol_versioning',
 
1871
                   'bzrlib.tests.test_tag',
 
1872
                   'bzrlib.tests.test_testament',
 
1873
                   'bzrlib.tests.test_textfile',
 
1874
                   'bzrlib.tests.test_textmerge',
 
1875
                   'bzrlib.tests.test_trace',
 
1876
                   'bzrlib.tests.test_transactions',
 
1877
                   'bzrlib.tests.test_transform',
 
1878
                   'bzrlib.tests.test_transport',
 
1879
                   'bzrlib.tests.test_tree',
 
1880
                   'bzrlib.tests.test_treebuilder',
 
1881
                   'bzrlib.tests.test_tsort',
 
1882
                   'bzrlib.tests.test_tuned_gzip',
 
1883
                   'bzrlib.tests.test_ui',
 
1884
                   'bzrlib.tests.test_upgrade',
 
1885
                   'bzrlib.tests.test_urlutils',
 
1886
                   'bzrlib.tests.test_versionedfile',
 
1887
                   'bzrlib.tests.test_version',
 
1888
                   'bzrlib.tests.test_version_info',
 
1889
                   'bzrlib.tests.test_weave',
 
1890
                   'bzrlib.tests.test_whitebox',
 
1891
                   'bzrlib.tests.test_workingtree',
 
1892
                   'bzrlib.tests.test_workingtree_4',
 
1893
                   'bzrlib.tests.test_wsgi',
 
1894
                   'bzrlib.tests.test_xml',
 
1895
                   ]
 
1896
    test_transport_implementations = [
 
1897
        'bzrlib.tests.test_transport_implementations',
 
1898
        'bzrlib.tests.test_read_bundle',
 
1899
        ]
 
1900
    suite = TestUtil.TestSuite()
 
1901
    loader = TestUtil.TestLoader()
 
1902
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
1903
    from bzrlib.transport import TransportTestProviderAdapter
 
1904
    adapter = TransportTestProviderAdapter()
 
1905
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
1906
    for package in packages_to_test():
 
1907
        suite.addTest(package.test_suite())
 
1908
    for m in MODULES_TO_TEST:
 
1909
        suite.addTest(loader.loadTestsFromModule(m))
 
1910
    for m in MODULES_TO_DOCTEST:
 
1911
        try:
 
1912
            suite.addTest(doctest.DocTestSuite(m))
 
1913
        except ValueError, e:
 
1914
            print '**failed to get doctest for: %s\n%s' %(m,e)
 
1915
            raise
 
1916
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
1917
        if getattr(plugin, 'test_suite', None) is not None:
 
1918
            default_encoding = sys.getdefaultencoding()
 
1919
            try:
 
1920
                plugin_suite = plugin.test_suite()
 
1921
            except ImportError, e:
 
1922
                bzrlib.trace.warning(
 
1923
                    'Unable to test plugin "%s": %s', name, e)
 
1924
            else:
 
1925
                suite.addTest(plugin_suite)
 
1926
            if default_encoding != sys.getdefaultencoding():
 
1927
                bzrlib.trace.warning(
 
1928
                    'Plugin "%s" tried to reset default encoding to: %s', name,
 
1929
                    sys.getdefaultencoding())
 
1930
                reload(sys)
 
1931
                sys.setdefaultencoding(default_encoding)
 
1932
    return suite
 
1933
 
 
1934
 
 
1935
def adapt_modules(mods_list, adapter, loader, suite):
 
1936
    """Adapt the modules in mods_list using adapter and add to suite."""
 
1937
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
 
1938
        suite.addTests(adapter.adapt(test))
 
1939
 
 
1940
 
 
1941
def clean_selftest_output(root=None, quiet=False):
 
1942
    """Remove all selftest output directories from root directory.
 
1943
 
 
1944
    :param  root:   root directory for clean
 
1945
                    (if ommitted or None then clean current directory).
 
1946
    :param  quiet:  suppress report about deleting directories
 
1947
    """
 
1948
    import re
 
1949
    import shutil
 
1950
 
 
1951
    re_dir = re.compile(r'''test\d\d\d\d\.tmp''')
 
1952
    if root is None:
 
1953
        root = u'.'
 
1954
    for i in os.listdir(root):
 
1955
        if os.path.isdir(i) and re_dir.match(i):
 
1956
            if not quiet:
 
1957
                print 'delete directory:', i
 
1958
            shutil.rmtree(i)