/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: John Arbash Meinel
  • Date: 2006-05-10 19:59:55 UTC
  • mfrom: (1704 +trunk)
  • mto: This revision was merged to the branch mainline in revision 1752.
  • Revision ID: john@arbash-meinel.com-20060510195955-df080afb1daa3a96
[merge] bzr.dev 1704

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006 by Canonical Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import codecs
 
30
from cStringIO import StringIO
 
31
import difflib
 
32
import errno
 
33
import logging
 
34
import os
 
35
import re
 
36
import stat
 
37
import sys
 
38
import tempfile
 
39
import unittest
 
40
import time
 
41
 
 
42
 
 
43
import bzrlib.branch
 
44
import bzrlib.bzrdir as bzrdir
 
45
import bzrlib.commands
 
46
import bzrlib.errors as errors
 
47
import bzrlib.inventory
 
48
import bzrlib.iterablefile
 
49
import bzrlib.lockdir
 
50
from bzrlib.merge import merge_inner
 
51
import bzrlib.merge3
 
52
import bzrlib.osutils
 
53
import bzrlib.osutils as osutils
 
54
import bzrlib.plugin
 
55
from bzrlib.revision import common_ancestor
 
56
import bzrlib.store
 
57
import bzrlib.trace
 
58
from bzrlib.transport import get_transport
 
59
import bzrlib.transport
 
60
from bzrlib.transport.local import LocalRelpathServer
 
61
from bzrlib.transport.readonly import ReadonlyServer
 
62
from bzrlib.trace import mutter
 
63
from bzrlib.tests.TestUtil import TestLoader, TestSuite
 
64
from bzrlib.tests.treeshape import build_tree_contents
 
65
import bzrlib.urlutils as urlutils
 
66
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
67
 
 
68
default_transport = LocalRelpathServer
 
69
 
 
70
MODULES_TO_TEST = []
 
71
MODULES_TO_DOCTEST = [
 
72
                      bzrlib.branch,
 
73
                      bzrlib.commands,
 
74
                      bzrlib.errors,
 
75
                      bzrlib.inventory,
 
76
                      bzrlib.iterablefile,
 
77
                      bzrlib.lockdir,
 
78
                      bzrlib.merge3,
 
79
                      bzrlib.option,
 
80
                      bzrlib.osutils,
 
81
                      bzrlib.store
 
82
                      ]
 
83
def packages_to_test():
 
84
    """Return a list of packages to test.
 
85
 
 
86
    The packages are not globally imported so that import failures are
 
87
    triggered when running selftest, not when importing the command.
 
88
    """
 
89
    import bzrlib.doc
 
90
    import bzrlib.tests.blackbox
 
91
    import bzrlib.tests.branch_implementations
 
92
    import bzrlib.tests.bzrdir_implementations
 
93
    import bzrlib.tests.interrepository_implementations
 
94
    import bzrlib.tests.interversionedfile_implementations
 
95
    import bzrlib.tests.repository_implementations
 
96
    import bzrlib.tests.revisionstore_implementations
 
97
    import bzrlib.tests.workingtree_implementations
 
98
    return [
 
99
            bzrlib.doc,
 
100
            bzrlib.tests.blackbox,
 
101
            bzrlib.tests.branch_implementations,
 
102
            bzrlib.tests.bzrdir_implementations,
 
103
            bzrlib.tests.interrepository_implementations,
 
104
            bzrlib.tests.interversionedfile_implementations,
 
105
            bzrlib.tests.repository_implementations,
 
106
            bzrlib.tests.revisionstore_implementations,
 
107
            bzrlib.tests.workingtree_implementations,
 
108
            ]
 
109
 
 
110
 
 
111
class _MyResult(unittest._TextTestResult):
 
112
    """Custom TestResult.
 
113
 
 
114
    Shows output in a different format, including displaying runtime for tests.
 
115
    """
 
116
    stop_early = False
 
117
 
 
118
    def _elapsedTime(self):
 
119
        return "%5dms" % (1000 * (time.time() - self._start_time))
 
120
 
 
121
    def startTest(self, test):
 
122
        unittest.TestResult.startTest(self, test)
 
123
        # In a short description, the important words are in
 
124
        # the beginning, but in an id, the important words are
 
125
        # at the end
 
126
        SHOW_DESCRIPTIONS = False
 
127
        if self.showAll:
 
128
            width = osutils.terminal_width()
 
129
            name_width = width - 15
 
130
            what = None
 
131
            if SHOW_DESCRIPTIONS:
 
132
                what = test.shortDescription()
 
133
                if what:
 
134
                    if len(what) > name_width:
 
135
                        what = what[:name_width-3] + '...'
 
136
            if what is None:
 
137
                what = test.id()
 
138
                if what.startswith('bzrlib.tests.'):
 
139
                    what = what[13:]
 
140
                if len(what) > name_width:
 
141
                    what = '...' + what[3-name_width:]
 
142
            what = what.ljust(name_width)
 
143
            self.stream.write(what)
 
144
        self.stream.flush()
 
145
        self._start_time = time.time()
 
146
 
 
147
    def addError(self, test, err):
 
148
        if isinstance(err[1], TestSkipped):
 
149
            return self.addSkipped(test, err)    
 
150
        unittest.TestResult.addError(self, test, err)
 
151
        if self.showAll:
 
152
            self.stream.writeln("ERROR %s" % self._elapsedTime())
 
153
        elif self.dots:
 
154
            self.stream.write('E')
 
155
        self.stream.flush()
 
156
        if self.stop_early:
 
157
            self.stop()
 
158
 
 
159
    def addFailure(self, test, err):
 
160
        unittest.TestResult.addFailure(self, test, err)
 
161
        if self.showAll:
 
162
            self.stream.writeln(" FAIL %s" % self._elapsedTime())
 
163
        elif self.dots:
 
164
            self.stream.write('F')
 
165
        self.stream.flush()
 
166
        if self.stop_early:
 
167
            self.stop()
 
168
 
 
169
    def addSuccess(self, test):
 
170
        if self.showAll:
 
171
            self.stream.writeln('   OK %s' % self._elapsedTime())
 
172
        elif self.dots:
 
173
            self.stream.write('~')
 
174
        self.stream.flush()
 
175
        unittest.TestResult.addSuccess(self, test)
 
176
 
 
177
    def addSkipped(self, test, skip_excinfo):
 
178
        if self.showAll:
 
179
            print >>self.stream, ' SKIP %s' % self._elapsedTime()
 
180
            print >>self.stream, '     %s' % skip_excinfo[1]
 
181
        elif self.dots:
 
182
            self.stream.write('S')
 
183
        self.stream.flush()
 
184
        # seems best to treat this as success from point-of-view of unittest
 
185
        # -- it actually does nothing so it barely matters :)
 
186
        unittest.TestResult.addSuccess(self, test)
 
187
 
 
188
    def printErrorList(self, flavour, errors):
 
189
        for test, err in errors:
 
190
            self.stream.writeln(self.separator1)
 
191
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
 
192
            if getattr(test, '_get_log', None) is not None:
 
193
                print >>self.stream
 
194
                print >>self.stream, \
 
195
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
196
                print >>self.stream, test._get_log()
 
197
                print >>self.stream, \
 
198
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
199
            self.stream.writeln(self.separator2)
 
200
            self.stream.writeln("%s" % err)
 
201
 
 
202
 
 
203
class TextTestRunner(unittest.TextTestRunner):
 
204
    stop_on_failure = False
 
205
 
 
206
    def _makeResult(self):
 
207
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
 
208
        result.stop_early = self.stop_on_failure
 
209
        return result
 
210
 
 
211
 
 
212
def iter_suite_tests(suite):
 
213
    """Return all tests in a suite, recursing through nested suites"""
 
214
    for item in suite._tests:
 
215
        if isinstance(item, unittest.TestCase):
 
216
            yield item
 
217
        elif isinstance(item, unittest.TestSuite):
 
218
            for r in iter_suite_tests(item):
 
219
                yield r
 
220
        else:
 
221
            raise Exception('unknown object %r inside test suite %r'
 
222
                            % (item, suite))
 
223
 
 
224
 
 
225
class TestSkipped(Exception):
 
226
    """Indicates that a test was intentionally skipped, rather than failing."""
 
227
    # XXX: Not used yet
 
228
 
 
229
 
 
230
class CommandFailed(Exception):
 
231
    pass
 
232
 
 
233
 
 
234
class StringIOWrapper(object):
 
235
    """A wrapper around cStringIO which just adds an encoding attribute.
 
236
    
 
237
    Internally we can check sys.stdout to see what the output encoding
 
238
    should be. However, cStringIO has no encoding attribute that we can
 
239
    set. So we wrap it instead.
 
240
    """
 
241
    encoding='ascii'
 
242
    _cstring = None
 
243
 
 
244
    def __init__(self, s=None):
 
245
        if s is not None:
 
246
            self.__dict__['_cstring'] = StringIO(s)
 
247
        else:
 
248
            self.__dict__['_cstring'] = StringIO()
 
249
 
 
250
    def __getattr__(self, name, getattr=getattr):
 
251
        return getattr(self.__dict__['_cstring'], name)
 
252
 
 
253
    def __setattr__(self, name, val):
 
254
        if name == 'encoding':
 
255
            self.__dict__['encoding'] = val
 
256
        else:
 
257
            return setattr(self._cstring, name, val)
 
258
 
 
259
 
 
260
class TestCase(unittest.TestCase):
 
261
    """Base class for bzr unit tests.
 
262
    
 
263
    Tests that need access to disk resources should subclass 
 
264
    TestCaseInTempDir not TestCase.
 
265
 
 
266
    Error and debug log messages are redirected from their usual
 
267
    location into a temporary file, the contents of which can be
 
268
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
269
    so that it can also capture file IO.  When the test completes this file
 
270
    is read into memory and removed from disk.
 
271
       
 
272
    There are also convenience functions to invoke bzr's command-line
 
273
    routine, and to build and check bzr trees.
 
274
   
 
275
    In addition to the usual method of overriding tearDown(), this class also
 
276
    allows subclasses to register functions into the _cleanups list, which is
 
277
    run in order as the object is torn down.  It's less likely this will be
 
278
    accidentally overlooked.
 
279
    """
 
280
 
 
281
    BZRPATH = 'bzr'
 
282
    _log_file_name = None
 
283
    _log_contents = ''
 
284
 
 
285
    def __init__(self, methodName='testMethod'):
 
286
        super(TestCase, self).__init__(methodName)
 
287
        self._cleanups = []
 
288
 
 
289
    def setUp(self):
 
290
        unittest.TestCase.setUp(self)
 
291
        self._cleanEnvironment()
 
292
        bzrlib.trace.disable_default_logging()
 
293
        self._startLogFile()
 
294
 
 
295
    def _ndiff_strings(self, a, b):
 
296
        """Return ndiff between two strings containing lines.
 
297
        
 
298
        A trailing newline is added if missing to make the strings
 
299
        print properly."""
 
300
        if b and b[-1] != '\n':
 
301
            b += '\n'
 
302
        if a and a[-1] != '\n':
 
303
            a += '\n'
 
304
        difflines = difflib.ndiff(a.splitlines(True),
 
305
                                  b.splitlines(True),
 
306
                                  linejunk=lambda x: False,
 
307
                                  charjunk=lambda x: False)
 
308
        return ''.join(difflines)
 
309
 
 
310
    def assertEqualDiff(self, a, b, message=None):
 
311
        """Assert two texts are equal, if not raise an exception.
 
312
        
 
313
        This is intended for use with multi-line strings where it can 
 
314
        be hard to find the differences by eye.
 
315
        """
 
316
        # TODO: perhaps override assertEquals to call this for strings?
 
317
        if a == b:
 
318
            return
 
319
        if message is None:
 
320
            message = "texts not equal:\n"
 
321
        raise AssertionError(message + 
 
322
                             self._ndiff_strings(a, b))      
 
323
        
 
324
    def assertEqualMode(self, mode, mode_test):
 
325
        self.assertEqual(mode, mode_test,
 
326
                         'mode mismatch %o != %o' % (mode, mode_test))
 
327
 
 
328
    def assertStartsWith(self, s, prefix):
 
329
        if not s.startswith(prefix):
 
330
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
331
 
 
332
    def assertEndsWith(self, s, suffix):
 
333
        """Asserts that s ends with suffix."""
 
334
        if not s.endswith(suffix):
 
335
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
336
 
 
337
    def assertContainsRe(self, haystack, needle_re):
 
338
        """Assert that a contains something matching a regular expression."""
 
339
        if not re.search(needle_re, haystack):
 
340
            raise AssertionError('pattern "%s" not found in "%s"'
 
341
                    % (needle_re, haystack))
 
342
 
 
343
    def assertSubset(self, sublist, superlist):
 
344
        """Assert that every entry in sublist is present in superlist."""
 
345
        missing = []
 
346
        for entry in sublist:
 
347
            if entry not in superlist:
 
348
                missing.append(entry)
 
349
        if len(missing) > 0:
 
350
            raise AssertionError("value(s) %r not present in container %r" % 
 
351
                                 (missing, superlist))
 
352
 
 
353
    def assertIs(self, left, right):
 
354
        if not (left is right):
 
355
            raise AssertionError("%r is not %r." % (left, right))
 
356
 
 
357
    def assertTransportMode(self, transport, path, mode):
 
358
        """Fail if a path does not have mode mode.
 
359
        
 
360
        If modes are not supported on this transport, the assertion is ignored.
 
361
        """
 
362
        if not transport._can_roundtrip_unix_modebits():
 
363
            return
 
364
        path_stat = transport.stat(path)
 
365
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
366
        self.assertEqual(mode, actual_mode,
 
367
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
368
 
 
369
    def assertIsInstance(self, obj, kls):
 
370
        """Fail if obj is not an instance of kls"""
 
371
        if not isinstance(obj, kls):
 
372
            self.fail("%r is an instance of %s rather than %s" % (
 
373
                obj, obj.__class__, kls))
 
374
 
 
375
    def _startLogFile(self):
 
376
        """Send bzr and test log messages to a temporary file.
 
377
 
 
378
        The file is removed as the test is torn down.
 
379
        """
 
380
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
381
        encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
 
382
        self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
 
383
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
384
        self._log_file_name = name
 
385
        self.addCleanup(self._finishLogFile)
 
386
 
 
387
    def _finishLogFile(self):
 
388
        """Finished with the log file.
 
389
 
 
390
        Read contents into memory, close, and delete.
 
391
        """
 
392
        bzrlib.trace.disable_test_log(self._log_nonce)
 
393
        self._log_file.seek(0)
 
394
        self._log_contents = self._log_file.read()
 
395
        self._log_file.close()
 
396
        os.remove(self._log_file_name)
 
397
        self._log_file = self._log_file_name = None
 
398
 
 
399
    def addCleanup(self, callable):
 
400
        """Arrange to run a callable when this case is torn down.
 
401
 
 
402
        Callables are run in the reverse of the order they are registered, 
 
403
        ie last-in first-out.
 
404
        """
 
405
        if callable in self._cleanups:
 
406
            raise ValueError("cleanup function %r already registered on %s" 
 
407
                    % (callable, self))
 
408
        self._cleanups.append(callable)
 
409
 
 
410
    def _cleanEnvironment(self):
 
411
        new_env = {
 
412
            'HOME': os.getcwd(),
 
413
            'APPDATA': os.getcwd(),
 
414
            'BZREMAIL': None,
 
415
            'EMAIL': None,
 
416
        }
 
417
        self.__old_env = {}
 
418
        self.addCleanup(self._restoreEnvironment)
 
419
        for name, value in new_env.iteritems():
 
420
            self._captureVar(name, value)
 
421
 
 
422
 
 
423
    def _captureVar(self, name, newvalue):
 
424
        """Set an environment variable, preparing it to be reset when finished."""
 
425
        self.__old_env[name] = os.environ.get(name, None)
 
426
        if newvalue is None:
 
427
            if name in os.environ:
 
428
                del os.environ[name]
 
429
        else:
 
430
            os.environ[name] = newvalue
 
431
 
 
432
    @staticmethod
 
433
    def _restoreVar(name, value):
 
434
        if value is None:
 
435
            if name in os.environ:
 
436
                del os.environ[name]
 
437
        else:
 
438
            os.environ[name] = value
 
439
 
 
440
    def _restoreEnvironment(self):
 
441
        for name, value in self.__old_env.iteritems():
 
442
            self._restoreVar(name, value)
 
443
 
 
444
    def tearDown(self):
 
445
        self._runCleanups()
 
446
        unittest.TestCase.tearDown(self)
 
447
 
 
448
    def _runCleanups(self):
 
449
        """Run registered cleanup functions. 
 
450
 
 
451
        This should only be called from TestCase.tearDown.
 
452
        """
 
453
        # TODO: Perhaps this should keep running cleanups even if 
 
454
        # one of them fails?
 
455
        for cleanup_fn in reversed(self._cleanups):
 
456
            cleanup_fn()
 
457
 
 
458
    def log(self, *args):
 
459
        mutter(*args)
 
460
 
 
461
    def _get_log(self):
 
462
        """Return as a string the log for this test"""
 
463
        if self._log_file_name:
 
464
            return open(self._log_file_name).read()
 
465
        else:
 
466
            return self._log_contents
 
467
        # TODO: Delete the log after it's been read in
 
468
 
 
469
    def capture(self, cmd, retcode=0):
 
470
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
471
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
472
 
 
473
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None):
 
474
        """Invoke bzr and return (stdout, stderr).
 
475
 
 
476
        Useful for code that wants to check the contents of the
 
477
        output, the way error messages are presented, etc.
 
478
 
 
479
        This should be the main method for tests that want to exercise the
 
480
        overall behavior of the bzr application (rather than a unit test
 
481
        or a functional test of the library.)
 
482
 
 
483
        Much of the old code runs bzr by forking a new copy of Python, but
 
484
        that is slower, harder to debug, and generally not necessary.
 
485
 
 
486
        This runs bzr through the interface that catches and reports
 
487
        errors, and with logging set to something approximating the
 
488
        default, so that error reporting can be checked.
 
489
 
 
490
        :param argv: arguments to invoke bzr
 
491
        :param retcode: expected return code, or None for don't-care.
 
492
        :param encoding: encoding for sys.stdout and sys.stderr
 
493
        :param stdin: A string to be used as stdin for the command.
 
494
        """
 
495
        if encoding is None:
 
496
            encoding = bzrlib.user_encoding
 
497
        if stdin is not None:
 
498
            stdin = StringIO(stdin)
 
499
        stdout = StringIOWrapper()
 
500
        stderr = StringIOWrapper()
 
501
        stdout.encoding = encoding
 
502
        stderr.encoding = encoding
 
503
 
 
504
        self.log('run bzr: %r', argv)
 
505
        # FIXME: don't call into logging here
 
506
        handler = logging.StreamHandler(stderr)
 
507
        handler.setFormatter(bzrlib.trace.QuietFormatter())
 
508
        handler.setLevel(logging.INFO)
 
509
        logger = logging.getLogger('')
 
510
        logger.addHandler(handler)
 
511
        old_stdin = getattr(bzrlib.ui.ui_factory, "stdin", None)
 
512
        bzrlib.ui.ui_factory.stdin = stdin
 
513
        try:
 
514
            result = self.apply_redirected(stdin, stdout, stderr,
 
515
                                           bzrlib.commands.run_bzr_catch_errors,
 
516
                                           argv)
 
517
        finally:
 
518
            logger.removeHandler(handler)
 
519
            bzrlib.ui.ui_factory.stdin = old_stdin
 
520
        # TODO: jam 20060105 Because we theoretically know the encoding
 
521
        #       of stdout and stderr, we could decode them at this time
 
522
        #       but for now, we will assume that the output of all
 
523
        #       functions
 
524
        out = stdout.getvalue()
 
525
        err = stderr.getvalue()
 
526
        if out:
 
527
            self.log('output:\n%r', out)
 
528
        if err:
 
529
            self.log('errors:\n%r', err)
 
530
        if retcode is not None:
 
531
            self.assertEquals(retcode, result)
 
532
        return out, err
 
533
 
 
534
    def run_bzr(self, *args, **kwargs):
 
535
        """Invoke bzr, as if it were run from the command line.
 
536
 
 
537
        This should be the main method for tests that want to exercise the
 
538
        overall behavior of the bzr application (rather than a unit test
 
539
        or a functional test of the library.)
 
540
 
 
541
        This sends the stdout/stderr results into the test's log,
 
542
        where it may be useful for debugging.  See also run_captured.
 
543
 
 
544
        :param stdin: A string to be used as stdin for the command.
 
545
        """
 
546
        retcode = kwargs.pop('retcode', 0)
 
547
        encoding = kwargs.pop('encoding', None)
 
548
        stdin = kwargs.pop('stdin', None)
 
549
        return self.run_bzr_captured(args, retcode=retcode, encoding=encoding, stdin=stdin)
 
550
 
 
551
    def run_bzr_decode(self, *args, **kwargs):
 
552
        if kwargs.has_key('encoding'):
 
553
            encoding = kwargs['encoding']
 
554
        else:
 
555
            encoding = bzrlib.user_encoding
 
556
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
 
557
 
 
558
    def check_inventory_shape(self, inv, shape):
 
559
        """Compare an inventory to a list of expected names.
 
560
 
 
561
        Fail if they are not precisely equal.
 
562
        """
 
563
        extras = []
 
564
        shape = list(shape)             # copy
 
565
        for path, ie in inv.entries():
 
566
            name = path.replace('\\', '/')
 
567
            if ie.kind == 'dir':
 
568
                name = name + '/'
 
569
            if name in shape:
 
570
                shape.remove(name)
 
571
            else:
 
572
                extras.append(name)
 
573
        if shape:
 
574
            self.fail("expected paths not found in inventory: %r" % shape)
 
575
        if extras:
 
576
            self.fail("unexpected paths found in inventory: %r" % extras)
 
577
 
 
578
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
579
                         a_callable=None, *args, **kwargs):
 
580
        """Call callable with redirected std io pipes.
 
581
 
 
582
        Returns the return code."""
 
583
        if not callable(a_callable):
 
584
            raise ValueError("a_callable must be callable.")
 
585
        if stdin is None:
 
586
            stdin = StringIO("")
 
587
        if stdout is None:
 
588
            if getattr(self, "_log_file", None) is not None:
 
589
                stdout = self._log_file
 
590
            else:
 
591
                stdout = StringIO()
 
592
        if stderr is None:
 
593
            if getattr(self, "_log_file", None is not None):
 
594
                stderr = self._log_file
 
595
            else:
 
596
                stderr = StringIO()
 
597
        real_stdin = sys.stdin
 
598
        real_stdout = sys.stdout
 
599
        real_stderr = sys.stderr
 
600
        try:
 
601
            sys.stdout = stdout
 
602
            sys.stderr = stderr
 
603
            sys.stdin = stdin
 
604
            return a_callable(*args, **kwargs)
 
605
        finally:
 
606
            sys.stdout = real_stdout
 
607
            sys.stderr = real_stderr
 
608
            sys.stdin = real_stdin
 
609
 
 
610
    def merge(self, branch_from, wt_to):
 
611
        """A helper for tests to do a ui-less merge.
 
612
 
 
613
        This should move to the main library when someone has time to integrate
 
614
        it in.
 
615
        """
 
616
        # minimal ui-less merge.
 
617
        wt_to.branch.fetch(branch_from)
 
618
        base_rev = common_ancestor(branch_from.last_revision(),
 
619
                                   wt_to.branch.last_revision(),
 
620
                                   wt_to.branch.repository)
 
621
        merge_inner(wt_to.branch, branch_from.basis_tree(), 
 
622
                    wt_to.branch.repository.revision_tree(base_rev),
 
623
                    this_tree=wt_to)
 
624
        wt_to.add_pending_merge(branch_from.last_revision())
 
625
 
 
626
 
 
627
BzrTestBase = TestCase
 
628
 
 
629
     
 
630
class TestCaseInTempDir(TestCase):
 
631
    """Derived class that runs a test within a temporary directory.
 
632
 
 
633
    This is useful for tests that need to create a branch, etc.
 
634
 
 
635
    The directory is created in a slightly complex way: for each
 
636
    Python invocation, a new temporary top-level directory is created.
 
637
    All test cases create their own directory within that.  If the
 
638
    tests complete successfully, the directory is removed.
 
639
 
 
640
    InTempDir is an old alias for FunctionalTestCase.
 
641
    """
 
642
 
 
643
    TEST_ROOT = None
 
644
    _TEST_NAME = 'test'
 
645
    OVERRIDE_PYTHON = 'python'
 
646
 
 
647
    def check_file_contents(self, filename, expect):
 
648
        self.log("check contents of file %s" % filename)
 
649
        contents = file(filename, 'r').read()
 
650
        if contents != expect:
 
651
            self.log("expected: %r" % expect)
 
652
            self.log("actually: %r" % contents)
 
653
            self.fail("contents of %s not as expected" % filename)
 
654
 
 
655
    def _make_test_root(self):
 
656
        if TestCaseInTempDir.TEST_ROOT is not None:
 
657
            return
 
658
        i = 0
 
659
        while True:
 
660
            root = u'test%04d.tmp' % i
 
661
            try:
 
662
                os.mkdir(root)
 
663
            except OSError, e:
 
664
                if e.errno == errno.EEXIST:
 
665
                    i += 1
 
666
                    continue
 
667
                else:
 
668
                    raise
 
669
            # successfully created
 
670
            TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
 
671
            break
 
672
        # make a fake bzr directory there to prevent any tests propagating
 
673
        # up onto the source directory's real branch
 
674
        bzrdir.BzrDir.create_standalone_workingtree(TestCaseInTempDir.TEST_ROOT)
 
675
 
 
676
    def setUp(self):
 
677
        super(TestCaseInTempDir, self).setUp()
 
678
        self._make_test_root()
 
679
        _currentdir = os.getcwdu()
 
680
        # shorten the name, to avoid test failures due to path length
 
681
        short_id = self.id().replace('bzrlib.tests.', '') \
 
682
                   .replace('__main__.', '')[-100:]
 
683
        # it's possible the same test class is run several times for
 
684
        # parameterized tests, so make sure the names don't collide.  
 
685
        i = 0
 
686
        while True:
 
687
            if i > 0:
 
688
                candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
 
689
            else:
 
690
                candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
 
691
            if os.path.exists(candidate_dir):
 
692
                i = i + 1
 
693
                continue
 
694
            else:
 
695
                self.test_dir = candidate_dir
 
696
                os.mkdir(self.test_dir)
 
697
                os.chdir(self.test_dir)
 
698
                break
 
699
        os.environ['HOME'] = self.test_dir
 
700
        os.environ['APPDATA'] = self.test_dir
 
701
        def _leaveDirectory():
 
702
            os.chdir(_currentdir)
 
703
        self.addCleanup(_leaveDirectory)
 
704
        
 
705
    def build_tree(self, shape, line_endings='native', transport=None):
 
706
        """Build a test tree according to a pattern.
 
707
 
 
708
        shape is a sequence of file specifications.  If the final
 
709
        character is '/', a directory is created.
 
710
 
 
711
        This doesn't add anything to a branch.
 
712
        :param line_endings: Either 'binary' or 'native'
 
713
                             in binary mode, exact contents are written
 
714
                             in native mode, the line endings match the
 
715
                             default platform endings.
 
716
 
 
717
        :param transport: A transport to write to, for building trees on 
 
718
                          VFS's. If the transport is readonly or None,
 
719
                          "." is opened automatically.
 
720
        """
 
721
        # XXX: It's OK to just create them using forward slashes on windows?
 
722
        if transport is None or transport.is_readonly():
 
723
            transport = get_transport(".")
 
724
        for name in shape:
 
725
            self.assert_(isinstance(name, basestring))
 
726
            if name[-1] == '/':
 
727
                transport.mkdir(urlutils.escape(name[:-1]))
 
728
            else:
 
729
                if line_endings == 'binary':
 
730
                    end = '\n'
 
731
                elif line_endings == 'native':
 
732
                    end = os.linesep
 
733
                else:
 
734
                    raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
 
735
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
736
                transport.put(urlutils.escape(name), StringIO(content))
 
737
 
 
738
    def build_tree_contents(self, shape):
 
739
        build_tree_contents(shape)
 
740
 
 
741
    def failUnlessExists(self, path):
 
742
        """Fail unless path, which may be abs or relative, exists."""
 
743
        self.failUnless(osutils.lexists(path))
 
744
 
 
745
    def failIfExists(self, path):
 
746
        """Fail if path, which may be abs or relative, exists."""
 
747
        self.failIf(osutils.lexists(path))
 
748
        
 
749
    def assertFileEqual(self, content, path):
 
750
        """Fail if path does not contain 'content'."""
 
751
        self.failUnless(osutils.lexists(path))
 
752
        # TODO: jam 20060427 Shouldn't this be 'rb'?
 
753
        self.assertEqualDiff(content, open(path, 'r').read())
 
754
 
 
755
 
 
756
class TestCaseWithTransport(TestCaseInTempDir):
 
757
    """A test case that provides get_url and get_readonly_url facilities.
 
758
 
 
759
    These back onto two transport servers, one for readonly access and one for
 
760
    read write access.
 
761
 
 
762
    If no explicit class is provided for readonly access, a
 
763
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
764
    based read write transports.
 
765
 
 
766
    If an explicit class is provided for readonly access, that server and the 
 
767
    readwrite one must both define get_url() as resolving to os.getcwd().
 
768
    """
 
769
 
 
770
    def __init__(self, methodName='testMethod'):
 
771
        super(TestCaseWithTransport, self).__init__(methodName)
 
772
        self.__readonly_server = None
 
773
        self.__server = None
 
774
        self.transport_server = default_transport
 
775
        self.transport_readonly_server = None
 
776
 
 
777
    def get_readonly_url(self, relpath=None):
 
778
        """Get a URL for the readonly transport.
 
779
 
 
780
        This will either be backed by '.' or a decorator to the transport 
 
781
        used by self.get_url()
 
782
        relpath provides for clients to get a path relative to the base url.
 
783
        These should only be downwards relative, not upwards.
 
784
        """
 
785
        base = self.get_readonly_server().get_url()
 
786
        if relpath is not None:
 
787
            if not base.endswith('/'):
 
788
                base = base + '/'
 
789
            base = base + relpath
 
790
        return base
 
791
 
 
792
    def get_readonly_server(self):
 
793
        """Get the server instance for the readonly transport
 
794
 
 
795
        This is useful for some tests with specific servers to do diagnostics.
 
796
        """
 
797
        if self.__readonly_server is None:
 
798
            if self.transport_readonly_server is None:
 
799
                # readonly decorator requested
 
800
                # bring up the server
 
801
                self.get_url()
 
802
                self.__readonly_server = ReadonlyServer()
 
803
                self.__readonly_server.setUp(self.__server)
 
804
            else:
 
805
                self.__readonly_server = self.transport_readonly_server()
 
806
                self.__readonly_server.setUp()
 
807
            self.addCleanup(self.__readonly_server.tearDown)
 
808
        return self.__readonly_server
 
809
 
 
810
    def get_server(self):
 
811
        """Get the read/write server instance.
 
812
 
 
813
        This is useful for some tests with specific servers that need
 
814
        diagnostics.
 
815
        """
 
816
        if self.__server is None:
 
817
            self.__server = self.transport_server()
 
818
            self.__server.setUp()
 
819
            self.addCleanup(self.__server.tearDown)
 
820
        return self.__server
 
821
 
 
822
    def get_url(self, relpath=None):
 
823
        """Get a URL for the readwrite transport.
 
824
 
 
825
        This will either be backed by '.' or to an equivalent non-file based
 
826
        facility.
 
827
        relpath provides for clients to get a path relative to the base url.
 
828
        These should only be downwards relative, not upwards.
 
829
        """
 
830
        base = self.get_server().get_url()
 
831
        if relpath is not None and relpath != '.':
 
832
            if not base.endswith('/'):
 
833
                base = base + '/'
 
834
            base = base + urlutils.escape(relpath)
 
835
        return base
 
836
 
 
837
    def get_transport(self):
 
838
        """Return a writeable transport for the test scratch space"""
 
839
        t = get_transport(self.get_url())
 
840
        self.assertFalse(t.is_readonly())
 
841
        return t
 
842
 
 
843
    def get_readonly_transport(self):
 
844
        """Return a readonly transport for the test scratch space
 
845
        
 
846
        This can be used to test that operations which should only need
 
847
        readonly access in fact do not try to write.
 
848
        """
 
849
        t = get_transport(self.get_readonly_url())
 
850
        self.assertTrue(t.is_readonly())
 
851
        return t
 
852
 
 
853
    def make_branch(self, relpath, format=None):
 
854
        """Create a branch on the transport at relpath."""
 
855
        repo = self.make_repository(relpath, format=format)
 
856
        return repo.bzrdir.create_branch()
 
857
 
 
858
    def make_bzrdir(self, relpath, format=None):
 
859
        try:
 
860
            url = self.get_url(relpath)
 
861
            mutter('relpath %r => url %r', relpath, url)
 
862
            segments = url.split('/')
 
863
            if segments and segments[-1] not in ('', '.'):
 
864
                parent = '/'.join(segments[:-1])
 
865
                t = get_transport(parent)
 
866
                try:
 
867
                    t.mkdir(segments[-1])
 
868
                except errors.FileExists:
 
869
                    pass
 
870
            if format is None:
 
871
                format=bzrlib.bzrdir.BzrDirFormat.get_default_format()
 
872
            # FIXME: make this use a single transport someday. RBC 20060418
 
873
            return format.initialize_on_transport(get_transport(relpath))
 
874
        except errors.UninitializableFormat:
 
875
            raise TestSkipped("Format %s is not initializable." % format)
 
876
 
 
877
    def make_repository(self, relpath, shared=False, format=None):
 
878
        """Create a repository on our default transport at relpath."""
 
879
        made_control = self.make_bzrdir(relpath, format=format)
 
880
        return made_control.create_repository(shared=shared)
 
881
 
 
882
    def make_branch_and_tree(self, relpath, format=None):
 
883
        """Create a branch on the transport and a tree locally.
 
884
 
 
885
        Returns the tree.
 
886
        """
 
887
        # TODO: always use the local disk path for the working tree,
 
888
        # this obviously requires a format that supports branch references
 
889
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
890
        # RBC 20060208
 
891
        b = self.make_branch(relpath, format=format)
 
892
        try:
 
893
            return b.bzrdir.create_workingtree()
 
894
        except errors.NotLocalUrl:
 
895
            # new formats - catch No tree error and create
 
896
            # a branch reference and a checkout.
 
897
            # old formats at that point - raise TestSkipped.
 
898
            # TODO: rbc 20060208
 
899
            return WorkingTreeFormat2().initialize(bzrdir.BzrDir.open(relpath))
 
900
 
 
901
    def assertIsDirectory(self, relpath, transport):
 
902
        """Assert that relpath within transport is a directory.
 
903
 
 
904
        This may not be possible on all transports; in that case it propagates
 
905
        a TransportNotPossible.
 
906
        """
 
907
        try:
 
908
            mode = transport.stat(relpath).st_mode
 
909
        except errors.NoSuchFile:
 
910
            self.fail("path %s is not a directory; no such file"
 
911
                      % (relpath))
 
912
        if not stat.S_ISDIR(mode):
 
913
            self.fail("path %s is not a directory; has mode %#o"
 
914
                      % (relpath, mode))
 
915
 
 
916
 
 
917
class ChrootedTestCase(TestCaseWithTransport):
 
918
    """A support class that provides readonly urls outside the local namespace.
 
919
 
 
920
    This is done by checking if self.transport_server is a MemoryServer. if it
 
921
    is then we are chrooted already, if it is not then an HttpServer is used
 
922
    for readonly urls.
 
923
 
 
924
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
925
                       be used without needed to redo it when a different 
 
926
                       subclass is in use ?
 
927
    """
 
928
 
 
929
    def setUp(self):
 
930
        super(ChrootedTestCase, self).setUp()
 
931
        if not self.transport_server == bzrlib.transport.memory.MemoryServer:
 
932
            self.transport_readonly_server = bzrlib.transport.http.HttpServer
 
933
 
 
934
 
 
935
def filter_suite_by_re(suite, pattern):
 
936
    result = TestSuite()
 
937
    filter_re = re.compile(pattern)
 
938
    for test in iter_suite_tests(suite):
 
939
        if filter_re.search(test.id()):
 
940
            result.addTest(test)
 
941
    return result
 
942
 
 
943
 
 
944
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
945
              stop_on_failure=False, keep_output=False,
 
946
              transport=None):
 
947
    TestCaseInTempDir._TEST_NAME = name
 
948
    if verbose:
 
949
        verbosity = 2
 
950
    else:
 
951
        verbosity = 1
 
952
    runner = TextTestRunner(stream=sys.stdout,
 
953
                            descriptions=0,
 
954
                            verbosity=verbosity)
 
955
    runner.stop_on_failure=stop_on_failure
 
956
    if pattern != '.*':
 
957
        suite = filter_suite_by_re(suite, pattern)
 
958
    result = runner.run(suite)
 
959
    # This is still a little bogus, 
 
960
    # but only a little. Folk not using our testrunner will
 
961
    # have to delete their temp directories themselves.
 
962
    test_root = TestCaseInTempDir.TEST_ROOT
 
963
    if result.wasSuccessful() or not keep_output:
 
964
        if test_root is not None:
 
965
            print 'Deleting test root %s...' % test_root
 
966
            try:
 
967
                osutils.rmtree(test_root)
 
968
            finally:
 
969
                print
 
970
    else:
 
971
        print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
 
972
    return result.wasSuccessful()
 
973
 
 
974
 
 
975
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
976
             keep_output=False,
 
977
             transport=None):
 
978
    """Run the whole test suite under the enhanced runner"""
 
979
    global default_transport
 
980
    if transport is None:
 
981
        transport = default_transport
 
982
    old_transport = default_transport
 
983
    default_transport = transport
 
984
    suite = test_suite()
 
985
    try:
 
986
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
987
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
 
988
                     transport=transport)
 
989
    finally:
 
990
        default_transport = old_transport
 
991
 
 
992
 
 
993
 
 
994
def test_suite():
 
995
    """Build and return TestSuite for the whole program."""
 
996
    from doctest import DocTestSuite
 
997
 
 
998
    global MODULES_TO_DOCTEST
 
999
 
 
1000
    testmod_names = [ \
 
1001
                   'bzrlib.tests.test_ancestry',
 
1002
                   'bzrlib.tests.test_api',
 
1003
                   'bzrlib.tests.test_bad_files',
 
1004
                   'bzrlib.tests.test_branch',
 
1005
                   'bzrlib.tests.test_bzrdir',
 
1006
                   'bzrlib.tests.test_command',
 
1007
                   'bzrlib.tests.test_commit',
 
1008
                   'bzrlib.tests.test_commit_merge',
 
1009
                   'bzrlib.tests.test_config',
 
1010
                   'bzrlib.tests.test_conflicts',
 
1011
                   'bzrlib.tests.test_decorators',
 
1012
                   'bzrlib.tests.test_diff',
 
1013
                   'bzrlib.tests.test_doc_generate',
 
1014
                   'bzrlib.tests.test_errors',
 
1015
                   'bzrlib.tests.test_escaped_store',
 
1016
                   'bzrlib.tests.test_fetch',
 
1017
                   'bzrlib.tests.test_gpg',
 
1018
                   'bzrlib.tests.test_graph',
 
1019
                   'bzrlib.tests.test_hashcache',
 
1020
                   'bzrlib.tests.test_http',
 
1021
                   'bzrlib.tests.test_identitymap',
 
1022
                   'bzrlib.tests.test_inv',
 
1023
                   'bzrlib.tests.test_knit',
 
1024
                   'bzrlib.tests.test_lockdir',
 
1025
                   'bzrlib.tests.test_lockable_files',
 
1026
                   'bzrlib.tests.test_log',
 
1027
                   'bzrlib.tests.test_merge',
 
1028
                   'bzrlib.tests.test_merge3',
 
1029
                   'bzrlib.tests.test_merge_core',
 
1030
                   'bzrlib.tests.test_missing',
 
1031
                   'bzrlib.tests.test_msgeditor',
 
1032
                   'bzrlib.tests.test_nonascii',
 
1033
                   'bzrlib.tests.test_options',
 
1034
                   'bzrlib.tests.test_osutils',
 
1035
                   'bzrlib.tests.test_patch',
 
1036
                   'bzrlib.tests.test_permissions',
 
1037
                   'bzrlib.tests.test_plugins',
 
1038
                   'bzrlib.tests.test_progress',
 
1039
                   'bzrlib.tests.test_reconcile',
 
1040
                   'bzrlib.tests.test_repository',
 
1041
                   'bzrlib.tests.test_revision',
 
1042
                   'bzrlib.tests.test_revisionnamespaces',
 
1043
                   'bzrlib.tests.test_revprops',
 
1044
                   'bzrlib.tests.test_rio',
 
1045
                   'bzrlib.tests.test_sampler',
 
1046
                   'bzrlib.tests.test_selftest',
 
1047
                   'bzrlib.tests.test_setup',
 
1048
                   'bzrlib.tests.test_sftp_transport',
 
1049
                   'bzrlib.tests.test_smart_add',
 
1050
                   'bzrlib.tests.test_source',
 
1051
                   'bzrlib.tests.test_store',
 
1052
                   'bzrlib.tests.test_symbol_versioning',
 
1053
                   'bzrlib.tests.test_testament',
 
1054
                   'bzrlib.tests.test_textfile',
 
1055
                   'bzrlib.tests.test_textmerge',
 
1056
                   'bzrlib.tests.test_trace',
 
1057
                   'bzrlib.tests.test_transactions',
 
1058
                   'bzrlib.tests.test_transform',
 
1059
                   'bzrlib.tests.test_transport',
 
1060
                   'bzrlib.tests.test_tsort',
 
1061
                   'bzrlib.tests.test_tuned_gzip',
 
1062
                   'bzrlib.tests.test_ui',
 
1063
                   'bzrlib.tests.test_upgrade',
 
1064
                   'bzrlib.tests.test_urlutils',
 
1065
                   'bzrlib.tests.test_versionedfile',
 
1066
                   'bzrlib.tests.test_weave',
 
1067
                   'bzrlib.tests.test_whitebox',
 
1068
                   'bzrlib.tests.test_workingtree',
 
1069
                   'bzrlib.tests.test_xml',
 
1070
                   ]
 
1071
    test_transport_implementations = [
 
1072
        'bzrlib.tests.test_transport_implementations']
 
1073
 
 
1074
    TestCase.BZRPATH = osutils.pathjoin(
 
1075
            osutils.realpath(osutils.dirname(bzrlib.__path__[0])), 'bzr')
 
1076
    print '%10s: %s' % ('bzr', osutils.realpath(sys.argv[0]))
 
1077
    print '%10s: %s' % ('bzrlib', bzrlib.__path__[0])
 
1078
    print
 
1079
    suite = TestSuite()
 
1080
    # python2.4's TestLoader.loadTestsFromNames gives very poor 
 
1081
    # errors if it fails to load a named module - no indication of what's
 
1082
    # actually wrong, just "no such module".  We should probably override that
 
1083
    # class, but for the moment just load them ourselves. (mbp 20051202)
 
1084
    loader = TestLoader()
 
1085
    from bzrlib.transport import TransportTestProviderAdapter
 
1086
    adapter = TransportTestProviderAdapter()
 
1087
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
1088
    for mod_name in testmod_names:
 
1089
        mod = _load_module_by_name(mod_name)
 
1090
        suite.addTest(loader.loadTestsFromModule(mod))
 
1091
    for package in packages_to_test():
 
1092
        suite.addTest(package.test_suite())
 
1093
    for m in MODULES_TO_TEST:
 
1094
        suite.addTest(loader.loadTestsFromModule(m))
 
1095
    for m in (MODULES_TO_DOCTEST):
 
1096
        suite.addTest(DocTestSuite(m))
 
1097
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
1098
        if getattr(plugin, 'test_suite', None) is not None:
 
1099
            suite.addTest(plugin.test_suite())
 
1100
    return suite
 
1101
 
 
1102
 
 
1103
def adapt_modules(mods_list, adapter, loader, suite):
 
1104
    """Adapt the modules in mods_list using adapter and add to suite."""
 
1105
    for mod_name in mods_list:
 
1106
        mod = _load_module_by_name(mod_name)
 
1107
        for test in iter_suite_tests(loader.loadTestsFromModule(mod)):
 
1108
            suite.addTests(adapter.adapt(test))
 
1109
 
 
1110
 
 
1111
def _load_module_by_name(mod_name):
 
1112
    parts = mod_name.split('.')
 
1113
    module = __import__(mod_name)
 
1114
    del parts[0]
 
1115
    # for historical reasons python returns the top-level module even though
 
1116
    # it loads the submodule; we need to walk down to get the one we want.
 
1117
    while parts:
 
1118
        module = getattr(module, parts.pop(0))
 
1119
    return module