/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: John Arbash Meinel
  • Date: 2006-05-10 19:43:34 UTC
  • mto: This revision was merged to the branch mainline in revision 1752.
  • Revision ID: john@arbash-meinel.com-20060510194334-0c20aad23237d047
Working on getting normalize_url working.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006 by Canonical Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import codecs
 
30
from cStringIO import StringIO
 
31
import difflib
 
32
import errno
 
33
import logging
 
34
import os
 
35
import re
 
36
import shutil
 
37
import stat
 
38
import sys
 
39
import tempfile
 
40
import unittest
 
41
import time
 
42
 
 
43
 
 
44
import bzrlib.branch
 
45
import bzrlib.bzrdir as bzrdir
 
46
import bzrlib.commands
 
47
import bzrlib.errors as errors
 
48
import bzrlib.inventory
 
49
import bzrlib.iterablefile
 
50
import bzrlib.lockdir
 
51
from bzrlib.merge import merge_inner
 
52
import bzrlib.merge3
 
53
import bzrlib.osutils
 
54
import bzrlib.osutils as osutils
 
55
import bzrlib.plugin
 
56
from bzrlib.revision import common_ancestor
 
57
import bzrlib.store
 
58
import bzrlib.trace
 
59
from bzrlib.transport import get_transport
 
60
import bzrlib.transport
 
61
from bzrlib.transport.local import LocalRelpathServer
 
62
from bzrlib.transport.readonly import ReadonlyServer
 
63
from bzrlib.trace import mutter
 
64
from bzrlib.tests.TestUtil import TestLoader, TestSuite
 
65
from bzrlib.tests.treeshape import build_tree_contents
 
66
import bzrlib.urlutils as urlutils
 
67
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
68
 
 
69
default_transport = LocalRelpathServer
 
70
 
 
71
MODULES_TO_TEST = []
 
72
MODULES_TO_DOCTEST = [
 
73
                      bzrlib.branch,
 
74
                      bzrlib.commands,
 
75
                      bzrlib.errors,
 
76
                      bzrlib.inventory,
 
77
                      bzrlib.iterablefile,
 
78
                      bzrlib.lockdir,
 
79
                      bzrlib.merge3,
 
80
                      bzrlib.option,
 
81
                      bzrlib.osutils,
 
82
                      bzrlib.store
 
83
                      ]
 
84
def packages_to_test():
 
85
    """Return a list of packages to test.
 
86
 
 
87
    The packages are not globally imported so that import failures are
 
88
    triggered when running selftest, not when importing the command.
 
89
    """
 
90
    import bzrlib.doc
 
91
    import bzrlib.tests.blackbox
 
92
    import bzrlib.tests.branch_implementations
 
93
    import bzrlib.tests.bzrdir_implementations
 
94
    import bzrlib.tests.interrepository_implementations
 
95
    import bzrlib.tests.interversionedfile_implementations
 
96
    import bzrlib.tests.repository_implementations
 
97
    import bzrlib.tests.revisionstore_implementations
 
98
    import bzrlib.tests.workingtree_implementations
 
99
    return [
 
100
            bzrlib.doc,
 
101
            bzrlib.tests.blackbox,
 
102
            bzrlib.tests.branch_implementations,
 
103
            bzrlib.tests.bzrdir_implementations,
 
104
            bzrlib.tests.interrepository_implementations,
 
105
            bzrlib.tests.interversionedfile_implementations,
 
106
            bzrlib.tests.repository_implementations,
 
107
            bzrlib.tests.revisionstore_implementations,
 
108
            bzrlib.tests.workingtree_implementations,
 
109
            ]
 
110
 
 
111
 
 
112
class _MyResult(unittest._TextTestResult):
 
113
    """Custom TestResult.
 
114
 
 
115
    Shows output in a different format, including displaying runtime for tests.
 
116
    """
 
117
    stop_early = False
 
118
 
 
119
    def _elapsedTime(self):
 
120
        return "%5dms" % (1000 * (time.time() - self._start_time))
 
121
 
 
122
    def startTest(self, test):
 
123
        unittest.TestResult.startTest(self, test)
 
124
        # In a short description, the important words are in
 
125
        # the beginning, but in an id, the important words are
 
126
        # at the end
 
127
        SHOW_DESCRIPTIONS = False
 
128
        if self.showAll:
 
129
            width = osutils.terminal_width()
 
130
            name_width = width - 15
 
131
            what = None
 
132
            if SHOW_DESCRIPTIONS:
 
133
                what = test.shortDescription()
 
134
                if what:
 
135
                    if len(what) > name_width:
 
136
                        what = what[:name_width-3] + '...'
 
137
            if what is None:
 
138
                what = test.id()
 
139
                if what.startswith('bzrlib.tests.'):
 
140
                    what = what[13:]
 
141
                if len(what) > name_width:
 
142
                    what = '...' + what[3-name_width:]
 
143
            what = what.ljust(name_width)
 
144
            self.stream.write(what)
 
145
        self.stream.flush()
 
146
        self._start_time = time.time()
 
147
 
 
148
    def addError(self, test, err):
 
149
        if isinstance(err[1], TestSkipped):
 
150
            return self.addSkipped(test, err)    
 
151
        unittest.TestResult.addError(self, test, err)
 
152
        if self.showAll:
 
153
            self.stream.writeln("ERROR %s" % self._elapsedTime())
 
154
        elif self.dots:
 
155
            self.stream.write('E')
 
156
        self.stream.flush()
 
157
        if self.stop_early:
 
158
            self.stop()
 
159
 
 
160
    def addFailure(self, test, err):
 
161
        unittest.TestResult.addFailure(self, test, err)
 
162
        if self.showAll:
 
163
            self.stream.writeln(" FAIL %s" % self._elapsedTime())
 
164
        elif self.dots:
 
165
            self.stream.write('F')
 
166
        self.stream.flush()
 
167
        if self.stop_early:
 
168
            self.stop()
 
169
 
 
170
    def addSuccess(self, test):
 
171
        if self.showAll:
 
172
            self.stream.writeln('   OK %s' % self._elapsedTime())
 
173
        elif self.dots:
 
174
            self.stream.write('~')
 
175
        self.stream.flush()
 
176
        unittest.TestResult.addSuccess(self, test)
 
177
 
 
178
    def addSkipped(self, test, skip_excinfo):
 
179
        if self.showAll:
 
180
            print >>self.stream, ' SKIP %s' % self._elapsedTime()
 
181
            print >>self.stream, '     %s' % skip_excinfo[1]
 
182
        elif self.dots:
 
183
            self.stream.write('S')
 
184
        self.stream.flush()
 
185
        # seems best to treat this as success from point-of-view of unittest
 
186
        # -- it actually does nothing so it barely matters :)
 
187
        unittest.TestResult.addSuccess(self, test)
 
188
 
 
189
    def printErrorList(self, flavour, errors):
 
190
        for test, err in errors:
 
191
            self.stream.writeln(self.separator1)
 
192
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
 
193
            if getattr(test, '_get_log', None) is not None:
 
194
                print >>self.stream
 
195
                print >>self.stream, \
 
196
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
197
                print >>self.stream, test._get_log()
 
198
                print >>self.stream, \
 
199
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
200
            self.stream.writeln(self.separator2)
 
201
            self.stream.writeln("%s" % err)
 
202
 
 
203
 
 
204
class TextTestRunner(unittest.TextTestRunner):
 
205
    stop_on_failure = False
 
206
 
 
207
    def _makeResult(self):
 
208
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
 
209
        result.stop_early = self.stop_on_failure
 
210
        return result
 
211
 
 
212
 
 
213
def iter_suite_tests(suite):
 
214
    """Return all tests in a suite, recursing through nested suites"""
 
215
    for item in suite._tests:
 
216
        if isinstance(item, unittest.TestCase):
 
217
            yield item
 
218
        elif isinstance(item, unittest.TestSuite):
 
219
            for r in iter_suite_tests(item):
 
220
                yield r
 
221
        else:
 
222
            raise Exception('unknown object %r inside test suite %r'
 
223
                            % (item, suite))
 
224
 
 
225
 
 
226
class TestSkipped(Exception):
 
227
    """Indicates that a test was intentionally skipped, rather than failing."""
 
228
    # XXX: Not used yet
 
229
 
 
230
 
 
231
class CommandFailed(Exception):
 
232
    pass
 
233
 
 
234
 
 
235
class StringIOWrapper(object):
 
236
    """A wrapper around cStringIO which just adds an encoding attribute.
 
237
    
 
238
    Internally we can check sys.stdout to see what the output encoding
 
239
    should be. However, cStringIO has no encoding attribute that we can
 
240
    set. So we wrap it instead.
 
241
    """
 
242
    encoding='ascii'
 
243
    _cstring = None
 
244
 
 
245
    def __init__(self, s=None):
 
246
        if s is not None:
 
247
            self.__dict__['_cstring'] = StringIO(s)
 
248
        else:
 
249
            self.__dict__['_cstring'] = StringIO()
 
250
 
 
251
    def __getattr__(self, name, getattr=getattr):
 
252
        return getattr(self.__dict__['_cstring'], name)
 
253
 
 
254
    def __setattr__(self, name, val):
 
255
        if name == 'encoding':
 
256
            self.__dict__['encoding'] = val
 
257
        else:
 
258
            return setattr(self._cstring, name, val)
 
259
 
 
260
 
 
261
class TestCase(unittest.TestCase):
 
262
    """Base class for bzr unit tests.
 
263
    
 
264
    Tests that need access to disk resources should subclass 
 
265
    TestCaseInTempDir not TestCase.
 
266
 
 
267
    Error and debug log messages are redirected from their usual
 
268
    location into a temporary file, the contents of which can be
 
269
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
270
    so that it can also capture file IO.  When the test completes this file
 
271
    is read into memory and removed from disk.
 
272
       
 
273
    There are also convenience functions to invoke bzr's command-line
 
274
    routine, and to build and check bzr trees.
 
275
   
 
276
    In addition to the usual method of overriding tearDown(), this class also
 
277
    allows subclasses to register functions into the _cleanups list, which is
 
278
    run in order as the object is torn down.  It's less likely this will be
 
279
    accidentally overlooked.
 
280
    """
 
281
 
 
282
    BZRPATH = 'bzr'
 
283
    _log_file_name = None
 
284
    _log_contents = ''
 
285
 
 
286
    def __init__(self, methodName='testMethod'):
 
287
        super(TestCase, self).__init__(methodName)
 
288
        self._cleanups = []
 
289
 
 
290
    def setUp(self):
 
291
        unittest.TestCase.setUp(self)
 
292
        self._cleanEnvironment()
 
293
        bzrlib.trace.disable_default_logging()
 
294
        self._startLogFile()
 
295
 
 
296
    def _ndiff_strings(self, a, b):
 
297
        """Return ndiff between two strings containing lines.
 
298
        
 
299
        A trailing newline is added if missing to make the strings
 
300
        print properly."""
 
301
        if b and b[-1] != '\n':
 
302
            b += '\n'
 
303
        if a and a[-1] != '\n':
 
304
            a += '\n'
 
305
        difflines = difflib.ndiff(a.splitlines(True),
 
306
                                  b.splitlines(True),
 
307
                                  linejunk=lambda x: False,
 
308
                                  charjunk=lambda x: False)
 
309
        return ''.join(difflines)
 
310
 
 
311
    def assertEqualDiff(self, a, b, message=None):
 
312
        """Assert two texts are equal, if not raise an exception.
 
313
        
 
314
        This is intended for use with multi-line strings where it can 
 
315
        be hard to find the differences by eye.
 
316
        """
 
317
        # TODO: perhaps override assertEquals to call this for strings?
 
318
        if a == b:
 
319
            return
 
320
        if message is None:
 
321
            message = "texts not equal:\n"
 
322
        raise AssertionError(message + 
 
323
                             self._ndiff_strings(a, b))      
 
324
        
 
325
    def assertEqualMode(self, mode, mode_test):
 
326
        self.assertEqual(mode, mode_test,
 
327
                         'mode mismatch %o != %o' % (mode, mode_test))
 
328
 
 
329
    def assertStartsWith(self, s, prefix):
 
330
        if not s.startswith(prefix):
 
331
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
332
 
 
333
    def assertEndsWith(self, s, suffix):
 
334
        if not s.endswith(prefix):
 
335
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
336
 
 
337
    def assertContainsRe(self, haystack, needle_re):
 
338
        """Assert that a contains something matching a regular expression."""
 
339
        if not re.search(needle_re, haystack):
 
340
            raise AssertionError('pattern "%s" not found in "%s"'
 
341
                    % (needle_re, haystack))
 
342
 
 
343
    def assertSubset(self, sublist, superlist):
 
344
        """Assert that every entry in sublist is present in superlist."""
 
345
        missing = []
 
346
        for entry in sublist:
 
347
            if entry not in superlist:
 
348
                missing.append(entry)
 
349
        if len(missing) > 0:
 
350
            raise AssertionError("value(s) %r not present in container %r" % 
 
351
                                 (missing, superlist))
 
352
 
 
353
    def assertIs(self, left, right):
 
354
        if not (left is right):
 
355
            raise AssertionError("%r is not %r." % (left, right))
 
356
 
 
357
    def assertTransportMode(self, transport, path, mode):
 
358
        """Fail if a path does not have mode mode.
 
359
        
 
360
        If modes are not supported on this transport, the assertion is ignored.
 
361
        """
 
362
        if not transport._can_roundtrip_unix_modebits():
 
363
            return
 
364
        path_stat = transport.stat(path)
 
365
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
366
        self.assertEqual(mode, actual_mode,
 
367
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
368
 
 
369
    def assertIsInstance(self, obj, kls):
 
370
        """Fail if obj is not an instance of kls"""
 
371
        if not isinstance(obj, kls):
 
372
            self.fail("%r is an instance of %s rather than %s" % (
 
373
                obj, obj.__class__, kls))
 
374
 
 
375
    def _startLogFile(self):
 
376
        """Send bzr and test log messages to a temporary file.
 
377
 
 
378
        The file is removed as the test is torn down.
 
379
        """
 
380
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
381
        encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
 
382
        self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
 
383
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
384
        self._log_file_name = name
 
385
        self.addCleanup(self._finishLogFile)
 
386
 
 
387
    def _finishLogFile(self):
 
388
        """Finished with the log file.
 
389
 
 
390
        Read contents into memory, close, and delete.
 
391
        """
 
392
        bzrlib.trace.disable_test_log(self._log_nonce)
 
393
        self._log_file.seek(0)
 
394
        self._log_contents = self._log_file.read()
 
395
        self._log_file.close()
 
396
        os.remove(self._log_file_name)
 
397
        self._log_file = self._log_file_name = None
 
398
 
 
399
    def addCleanup(self, callable):
 
400
        """Arrange to run a callable when this case is torn down.
 
401
 
 
402
        Callables are run in the reverse of the order they are registered, 
 
403
        ie last-in first-out.
 
404
        """
 
405
        if callable in self._cleanups:
 
406
            raise ValueError("cleanup function %r already registered on %s" 
 
407
                    % (callable, self))
 
408
        self._cleanups.append(callable)
 
409
 
 
410
    def _cleanEnvironment(self):
 
411
        new_env = {
 
412
            'HOME': os.getcwd(),
 
413
            'APPDATA': os.getcwd(),
 
414
            'BZREMAIL': None,
 
415
            'EMAIL': None,
 
416
        }
 
417
        self.__old_env = {}
 
418
        self.addCleanup(self._restoreEnvironment)
 
419
        for name, value in new_env.iteritems():
 
420
            self._captureVar(name, value)
 
421
 
 
422
 
 
423
    def _captureVar(self, name, newvalue):
 
424
        """Set an environment variable, preparing it to be reset when finished."""
 
425
        self.__old_env[name] = os.environ.get(name, None)
 
426
        if newvalue is None:
 
427
            if name in os.environ:
 
428
                del os.environ[name]
 
429
        else:
 
430
            os.environ[name] = newvalue
 
431
 
 
432
    @staticmethod
 
433
    def _restoreVar(name, value):
 
434
        if value is None:
 
435
            if name in os.environ:
 
436
                del os.environ[name]
 
437
        else:
 
438
            os.environ[name] = value
 
439
 
 
440
    def _restoreEnvironment(self):
 
441
        for name, value in self.__old_env.iteritems():
 
442
            self._restoreVar(name, value)
 
443
 
 
444
    def tearDown(self):
 
445
        self._runCleanups()
 
446
        unittest.TestCase.tearDown(self)
 
447
 
 
448
    def _runCleanups(self):
 
449
        """Run registered cleanup functions. 
 
450
 
 
451
        This should only be called from TestCase.tearDown.
 
452
        """
 
453
        # TODO: Perhaps this should keep running cleanups even if 
 
454
        # one of them fails?
 
455
        for cleanup_fn in reversed(self._cleanups):
 
456
            cleanup_fn()
 
457
 
 
458
    def log(self, *args):
 
459
        mutter(*args)
 
460
 
 
461
    def _get_log(self):
 
462
        """Return as a string the log for this test"""
 
463
        if self._log_file_name:
 
464
            return open(self._log_file_name).read()
 
465
        else:
 
466
            return self._log_contents
 
467
        # TODO: Delete the log after it's been read in
 
468
 
 
469
    def capture(self, cmd, retcode=0):
 
470
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
471
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
472
 
 
473
    def run_bzr_captured(self, argv, retcode=0, encoding=None):
 
474
        """Invoke bzr and return (stdout, stderr).
 
475
 
 
476
        Useful for code that wants to check the contents of the
 
477
        output, the way error messages are presented, etc.
 
478
 
 
479
        This should be the main method for tests that want to exercise the
 
480
        overall behavior of the bzr application (rather than a unit test
 
481
        or a functional test of the library.)
 
482
 
 
483
        Much of the old code runs bzr by forking a new copy of Python, but
 
484
        that is slower, harder to debug, and generally not necessary.
 
485
 
 
486
        This runs bzr through the interface that catches and reports
 
487
        errors, and with logging set to something approximating the
 
488
        default, so that error reporting can be checked.
 
489
 
 
490
        :param argv: arguments to invoke bzr
 
491
        :param retcode: expected return code, or None for don't-care.
 
492
        :param encoding: encoding for sys.stdout and sys.stderr
 
493
        """
 
494
        if encoding is None:
 
495
            encoding = bzrlib.user_encoding
 
496
        stdout = StringIOWrapper()
 
497
        stderr = StringIOWrapper()
 
498
        stdout.encoding = encoding
 
499
        stderr.encoding = encoding
 
500
 
 
501
        self.log('run bzr: %r', argv)
 
502
        # FIXME: don't call into logging here
 
503
        handler = logging.StreamHandler(stderr)
 
504
        handler.setFormatter(bzrlib.trace.QuietFormatter())
 
505
        handler.setLevel(logging.INFO)
 
506
        logger = logging.getLogger('')
 
507
        logger.addHandler(handler)
 
508
        try:
 
509
            result = self.apply_redirected(None, stdout, stderr,
 
510
                                           bzrlib.commands.run_bzr_catch_errors,
 
511
                                           argv)
 
512
        finally:
 
513
            logger.removeHandler(handler)
 
514
        # TODO: jam 20060105 Because we theoretically know the encoding
 
515
        #       of stdout and stderr, we could decode them at this time
 
516
        #       but for now, we will assume that the output of all
 
517
        #       functions
 
518
        out = stdout.getvalue()
 
519
        err = stderr.getvalue()
 
520
        if out:
 
521
            self.log('output:\n%r', out)
 
522
        if err:
 
523
            self.log('errors:\n%r', err)
 
524
        if retcode is not None:
 
525
            self.assertEquals(retcode, result)
 
526
        return out, err
 
527
 
 
528
    def run_bzr(self, *args, **kwargs):
 
529
        """Invoke bzr, as if it were run from the command line.
 
530
 
 
531
        This should be the main method for tests that want to exercise the
 
532
        overall behavior of the bzr application (rather than a unit test
 
533
        or a functional test of the library.)
 
534
 
 
535
        This sends the stdout/stderr results into the test's log,
 
536
        where it may be useful for debugging.  See also run_captured.
 
537
        """
 
538
        retcode = kwargs.pop('retcode', 0)
 
539
        encoding = kwargs.pop('encoding', None)
 
540
        return self.run_bzr_captured(args, retcode=retcode, encoding=encoding)
 
541
 
 
542
    def run_bzr_decode(self, *args, **kwargs):
 
543
        if kwargs.has_key('encoding'):
 
544
            encoding = kwargs['encoding']
 
545
        else:
 
546
            encoding = bzrlib.user_encoding
 
547
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
 
548
 
 
549
    def check_inventory_shape(self, inv, shape):
 
550
        """Compare an inventory to a list of expected names.
 
551
 
 
552
        Fail if they are not precisely equal.
 
553
        """
 
554
        extras = []
 
555
        shape = list(shape)             # copy
 
556
        for path, ie in inv.entries():
 
557
            name = path.replace('\\', '/')
 
558
            if ie.kind == 'dir':
 
559
                name = name + '/'
 
560
            if name in shape:
 
561
                shape.remove(name)
 
562
            else:
 
563
                extras.append(name)
 
564
        if shape:
 
565
            self.fail("expected paths not found in inventory: %r" % shape)
 
566
        if extras:
 
567
            self.fail("unexpected paths found in inventory: %r" % extras)
 
568
 
 
569
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
570
                         a_callable=None, *args, **kwargs):
 
571
        """Call callable with redirected std io pipes.
 
572
 
 
573
        Returns the return code."""
 
574
        if not callable(a_callable):
 
575
            raise ValueError("a_callable must be callable.")
 
576
        if stdin is None:
 
577
            stdin = StringIO("")
 
578
        if stdout is None:
 
579
            if getattr(self, "_log_file", None) is not None:
 
580
                stdout = self._log_file
 
581
            else:
 
582
                stdout = StringIO()
 
583
        if stderr is None:
 
584
            if getattr(self, "_log_file", None is not None):
 
585
                stderr = self._log_file
 
586
            else:
 
587
                stderr = StringIO()
 
588
        real_stdin = sys.stdin
 
589
        real_stdout = sys.stdout
 
590
        real_stderr = sys.stderr
 
591
        try:
 
592
            sys.stdout = stdout
 
593
            sys.stderr = stderr
 
594
            sys.stdin = stdin
 
595
            return a_callable(*args, **kwargs)
 
596
        finally:
 
597
            sys.stdout = real_stdout
 
598
            sys.stderr = real_stderr
 
599
            sys.stdin = real_stdin
 
600
 
 
601
    def merge(self, branch_from, wt_to):
 
602
        """A helper for tests to do a ui-less merge.
 
603
 
 
604
        This should move to the main library when someone has time to integrate
 
605
        it in.
 
606
        """
 
607
        # minimal ui-less merge.
 
608
        wt_to.branch.fetch(branch_from)
 
609
        base_rev = common_ancestor(branch_from.last_revision(),
 
610
                                   wt_to.branch.last_revision(),
 
611
                                   wt_to.branch.repository)
 
612
        merge_inner(wt_to.branch, branch_from.basis_tree(), 
 
613
                    wt_to.branch.repository.revision_tree(base_rev),
 
614
                    this_tree=wt_to)
 
615
        wt_to.add_pending_merge(branch_from.last_revision())
 
616
 
 
617
 
 
618
BzrTestBase = TestCase
 
619
 
 
620
     
 
621
class TestCaseInTempDir(TestCase):
 
622
    """Derived class that runs a test within a temporary directory.
 
623
 
 
624
    This is useful for tests that need to create a branch, etc.
 
625
 
 
626
    The directory is created in a slightly complex way: for each
 
627
    Python invocation, a new temporary top-level directory is created.
 
628
    All test cases create their own directory within that.  If the
 
629
    tests complete successfully, the directory is removed.
 
630
 
 
631
    InTempDir is an old alias for FunctionalTestCase.
 
632
    """
 
633
 
 
634
    TEST_ROOT = None
 
635
    _TEST_NAME = 'test'
 
636
    OVERRIDE_PYTHON = 'python'
 
637
 
 
638
    def check_file_contents(self, filename, expect):
 
639
        self.log("check contents of file %s" % filename)
 
640
        contents = file(filename, 'r').read()
 
641
        if contents != expect:
 
642
            self.log("expected: %r" % expect)
 
643
            self.log("actually: %r" % contents)
 
644
            self.fail("contents of %s not as expected" % filename)
 
645
 
 
646
    def _make_test_root(self):
 
647
        if TestCaseInTempDir.TEST_ROOT is not None:
 
648
            return
 
649
        i = 0
 
650
        while True:
 
651
            root = u'test%04d.tmp' % i
 
652
            try:
 
653
                os.mkdir(root)
 
654
            except OSError, e:
 
655
                if e.errno == errno.EEXIST:
 
656
                    i += 1
 
657
                    continue
 
658
                else:
 
659
                    raise
 
660
            # successfully created
 
661
            TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
 
662
            break
 
663
        # make a fake bzr directory there to prevent any tests propagating
 
664
        # up onto the source directory's real branch
 
665
        bzrdir.BzrDir.create_standalone_workingtree(TestCaseInTempDir.TEST_ROOT)
 
666
 
 
667
    def setUp(self):
 
668
        super(TestCaseInTempDir, self).setUp()
 
669
        self._make_test_root()
 
670
        _currentdir = os.getcwdu()
 
671
        # shorten the name, to avoid test failures due to path length
 
672
        short_id = self.id().replace('bzrlib.tests.', '') \
 
673
                   .replace('__main__.', '')[-100:]
 
674
        # it's possible the same test class is run several times for
 
675
        # parameterized tests, so make sure the names don't collide.  
 
676
        i = 0
 
677
        while True:
 
678
            if i > 0:
 
679
                candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
 
680
            else:
 
681
                candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
 
682
            if os.path.exists(candidate_dir):
 
683
                i = i + 1
 
684
                continue
 
685
            else:
 
686
                self.test_dir = candidate_dir
 
687
                os.mkdir(self.test_dir)
 
688
                os.chdir(self.test_dir)
 
689
                break
 
690
        os.environ['HOME'] = self.test_dir
 
691
        os.environ['APPDATA'] = self.test_dir
 
692
        def _leaveDirectory():
 
693
            os.chdir(_currentdir)
 
694
        self.addCleanup(_leaveDirectory)
 
695
        
 
696
    def build_tree(self, shape, line_endings='native', transport=None):
 
697
        """Build a test tree according to a pattern.
 
698
 
 
699
        shape is a sequence of file specifications.  If the final
 
700
        character is '/', a directory is created.
 
701
 
 
702
        This doesn't add anything to a branch.
 
703
        :param line_endings: Either 'binary' or 'native'
 
704
                             in binary mode, exact contents are written
 
705
                             in native mode, the line endings match the
 
706
                             default platform endings.
 
707
 
 
708
        :param transport: A transport to write to, for building trees on 
 
709
                          VFS's. If the transport is readonly or None,
 
710
                          "." is opened automatically.
 
711
        """
 
712
        # XXX: It's OK to just create them using forward slashes on windows?
 
713
        if transport is None or transport.is_readonly():
 
714
            transport = get_transport(".")
 
715
        for name in shape:
 
716
            self.assert_(isinstance(name, basestring))
 
717
            if name[-1] == '/':
 
718
                transport.mkdir(urlutils.escape(name[:-1]))
 
719
            else:
 
720
                if line_endings == 'binary':
 
721
                    end = '\n'
 
722
                elif line_endings == 'native':
 
723
                    end = os.linesep
 
724
                else:
 
725
                    raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
 
726
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
727
                transport.put(urlutils.escape(name), StringIO(content))
 
728
 
 
729
    def build_tree_contents(self, shape):
 
730
        build_tree_contents(shape)
 
731
 
 
732
    def failUnlessExists(self, path):
 
733
        """Fail unless path, which may be abs or relative, exists."""
 
734
        self.failUnless(osutils.lexists(path))
 
735
 
 
736
    def failIfExists(self, path):
 
737
        """Fail if path, which may be abs or relative, exists."""
 
738
        self.failIf(osutils.lexists(path))
 
739
        
 
740
    def assertFileEqual(self, content, path):
 
741
        """Fail if path does not contain 'content'."""
 
742
        self.failUnless(osutils.lexists(path))
 
743
        # TODO: jam 20060427 Shouldn't this be 'rb'?
 
744
        self.assertEqualDiff(content, open(path, 'r').read())
 
745
 
 
746
 
 
747
class TestCaseWithTransport(TestCaseInTempDir):
 
748
    """A test case that provides get_url and get_readonly_url facilities.
 
749
 
 
750
    These back onto two transport servers, one for readonly access and one for
 
751
    read write access.
 
752
 
 
753
    If no explicit class is provided for readonly access, a
 
754
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
755
    based read write transports.
 
756
 
 
757
    If an explicit class is provided for readonly access, that server and the 
 
758
    readwrite one must both define get_url() as resolving to os.getcwd().
 
759
    """
 
760
 
 
761
    def __init__(self, methodName='testMethod'):
 
762
        super(TestCaseWithTransport, self).__init__(methodName)
 
763
        self.__readonly_server = None
 
764
        self.__server = None
 
765
        self.transport_server = default_transport
 
766
        self.transport_readonly_server = None
 
767
 
 
768
    def get_readonly_url(self, relpath=None):
 
769
        """Get a URL for the readonly transport.
 
770
 
 
771
        This will either be backed by '.' or a decorator to the transport 
 
772
        used by self.get_url()
 
773
        relpath provides for clients to get a path relative to the base url.
 
774
        These should only be downwards relative, not upwards.
 
775
        """
 
776
        base = self.get_readonly_server().get_url()
 
777
        if relpath is not None:
 
778
            if not base.endswith('/'):
 
779
                base = base + '/'
 
780
            base = base + relpath
 
781
        return base
 
782
 
 
783
    def get_readonly_server(self):
 
784
        """Get the server instance for the readonly transport
 
785
 
 
786
        This is useful for some tests with specific servers to do diagnostics.
 
787
        """
 
788
        if self.__readonly_server is None:
 
789
            if self.transport_readonly_server is None:
 
790
                # readonly decorator requested
 
791
                # bring up the server
 
792
                self.get_url()
 
793
                self.__readonly_server = ReadonlyServer()
 
794
                self.__readonly_server.setUp(self.__server)
 
795
            else:
 
796
                self.__readonly_server = self.transport_readonly_server()
 
797
                self.__readonly_server.setUp()
 
798
            self.addCleanup(self.__readonly_server.tearDown)
 
799
        return self.__readonly_server
 
800
 
 
801
    def get_server(self):
 
802
        """Get the read/write server instance.
 
803
 
 
804
        This is useful for some tests with specific servers that need
 
805
        diagnostics.
 
806
        """
 
807
        if self.__server is None:
 
808
            self.__server = self.transport_server()
 
809
            self.__server.setUp()
 
810
            self.addCleanup(self.__server.tearDown)
 
811
        return self.__server
 
812
 
 
813
    def get_url(self, relpath=None):
 
814
        """Get a URL for the readwrite transport.
 
815
 
 
816
        This will either be backed by '.' or to an equivalent non-file based
 
817
        facility.
 
818
        relpath provides for clients to get a path relative to the base url.
 
819
        These should only be downwards relative, not upwards.
 
820
        """
 
821
        base = self.get_server().get_url()
 
822
        if relpath is not None and relpath != '.':
 
823
            if not base.endswith('/'):
 
824
                base = base + '/'
 
825
            base = base + urlutils.escape(relpath)
 
826
        return base
 
827
 
 
828
    def get_transport(self):
 
829
        """Return a writeable transport for the test scratch space"""
 
830
        t = get_transport(self.get_url())
 
831
        self.assertFalse(t.is_readonly())
 
832
        return t
 
833
 
 
834
    def get_readonly_transport(self):
 
835
        """Return a readonly transport for the test scratch space
 
836
        
 
837
        This can be used to test that operations which should only need
 
838
        readonly access in fact do not try to write.
 
839
        """
 
840
        t = get_transport(self.get_readonly_url())
 
841
        self.assertTrue(t.is_readonly())
 
842
        return t
 
843
 
 
844
    def make_branch(self, relpath, format=None):
 
845
        """Create a branch on the transport at relpath."""
 
846
        repo = self.make_repository(relpath, format=format)
 
847
        return repo.bzrdir.create_branch()
 
848
 
 
849
    def make_bzrdir(self, relpath, format=None):
 
850
        try:
 
851
            url = self.get_url(relpath)
 
852
            mutter('relpath %r => url %r', relpath, url)
 
853
            segments = url.split('/')
 
854
            if segments and segments[-1] not in ('', '.'):
 
855
                parent = '/'.join(segments[:-1])
 
856
                t = get_transport(parent)
 
857
                try:
 
858
                    t.mkdir(segments[-1])
 
859
                except errors.FileExists:
 
860
                    pass
 
861
            if format is None:
 
862
                format=bzrlib.bzrdir.BzrDirFormat.get_default_format()
 
863
            # FIXME: make this use a single transport someday. RBC 20060418
 
864
            return format.initialize_on_transport(get_transport(relpath))
 
865
        except errors.UninitializableFormat:
 
866
            raise TestSkipped("Format %s is not initializable.")
 
867
 
 
868
    def make_repository(self, relpath, shared=False, format=None):
 
869
        """Create a repository on our default transport at relpath."""
 
870
        made_control = self.make_bzrdir(relpath, format=format)
 
871
        return made_control.create_repository(shared=shared)
 
872
 
 
873
    def make_branch_and_tree(self, relpath, format=None):
 
874
        """Create a branch on the transport and a tree locally.
 
875
 
 
876
        Returns the tree.
 
877
        """
 
878
        # TODO: always use the local disk path for the working tree,
 
879
        # this obviously requires a format that supports branch references
 
880
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
881
        # RBC 20060208
 
882
        b = self.make_branch(relpath, format=format)
 
883
        try:
 
884
            return b.bzrdir.create_workingtree()
 
885
        except errors.NotLocalUrl:
 
886
            # new formats - catch No tree error and create
 
887
            # a branch reference and a checkout.
 
888
            # old formats at that point - raise TestSkipped.
 
889
            # TODO: rbc 20060208
 
890
            return WorkingTreeFormat2().initialize(bzrdir.BzrDir.open(relpath))
 
891
 
 
892
    def assertIsDirectory(self, relpath, transport):
 
893
        """Assert that relpath within transport is a directory.
 
894
 
 
895
        This may not be possible on all transports; in that case it propagates
 
896
        a TransportNotPossible.
 
897
        """
 
898
        try:
 
899
            mode = transport.stat(relpath).st_mode
 
900
        except errors.NoSuchFile:
 
901
            self.fail("path %s is not a directory; no such file"
 
902
                      % (relpath))
 
903
        if not stat.S_ISDIR(mode):
 
904
            self.fail("path %s is not a directory; has mode %#o"
 
905
                      % (relpath, mode))
 
906
 
 
907
 
 
908
class ChrootedTestCase(TestCaseWithTransport):
 
909
    """A support class that provides readonly urls outside the local namespace.
 
910
 
 
911
    This is done by checking if self.transport_server is a MemoryServer. if it
 
912
    is then we are chrooted already, if it is not then an HttpServer is used
 
913
    for readonly urls.
 
914
 
 
915
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
916
                       be used without needed to redo it when a different 
 
917
                       subclass is in use ?
 
918
    """
 
919
 
 
920
    def setUp(self):
 
921
        super(ChrootedTestCase, self).setUp()
 
922
        if not self.transport_server == bzrlib.transport.memory.MemoryServer:
 
923
            self.transport_readonly_server = bzrlib.transport.http.HttpServer
 
924
 
 
925
 
 
926
def filter_suite_by_re(suite, pattern):
 
927
    result = TestSuite()
 
928
    filter_re = re.compile(pattern)
 
929
    for test in iter_suite_tests(suite):
 
930
        if filter_re.search(test.id()):
 
931
            result.addTest(test)
 
932
    return result
 
933
 
 
934
 
 
935
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
936
              stop_on_failure=False, keep_output=False,
 
937
              transport=None):
 
938
    TestCaseInTempDir._TEST_NAME = name
 
939
    if verbose:
 
940
        verbosity = 2
 
941
    else:
 
942
        verbosity = 1
 
943
    runner = TextTestRunner(stream=sys.stdout,
 
944
                            descriptions=0,
 
945
                            verbosity=verbosity)
 
946
    runner.stop_on_failure=stop_on_failure
 
947
    if pattern != '.*':
 
948
        suite = filter_suite_by_re(suite, pattern)
 
949
    result = runner.run(suite)
 
950
    # This is still a little bogus, 
 
951
    # but only a little. Folk not using our testrunner will
 
952
    # have to delete their temp directories themselves.
 
953
    test_root = TestCaseInTempDir.TEST_ROOT
 
954
    if result.wasSuccessful() or not keep_output:
 
955
        if test_root is not None:
 
956
            print 'Deleting test root %s...' % test_root
 
957
            try:
 
958
                shutil.rmtree(test_root)
 
959
            finally:
 
960
                print
 
961
    else:
 
962
        print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
 
963
    return result.wasSuccessful()
 
964
 
 
965
 
 
966
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
967
             keep_output=False,
 
968
             transport=None):
 
969
    """Run the whole test suite under the enhanced runner"""
 
970
    global default_transport
 
971
    if transport is None:
 
972
        transport = default_transport
 
973
    old_transport = default_transport
 
974
    default_transport = transport
 
975
    suite = test_suite()
 
976
    try:
 
977
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
978
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
 
979
                     transport=transport)
 
980
    finally:
 
981
        default_transport = old_transport
 
982
 
 
983
 
 
984
 
 
985
def test_suite():
 
986
    """Build and return TestSuite for the whole program."""
 
987
    from doctest import DocTestSuite
 
988
 
 
989
    global MODULES_TO_DOCTEST
 
990
 
 
991
    testmod_names = [ \
 
992
                   'bzrlib.tests.test_ancestry',
 
993
                   'bzrlib.tests.test_annotate',
 
994
                   'bzrlib.tests.test_api',
 
995
                   'bzrlib.tests.test_bad_files',
 
996
                   'bzrlib.tests.test_branch',
 
997
                   'bzrlib.tests.test_bzrdir',
 
998
                   'bzrlib.tests.test_command',
 
999
                   'bzrlib.tests.test_commit',
 
1000
                   'bzrlib.tests.test_commit_merge',
 
1001
                   'bzrlib.tests.test_config',
 
1002
                   'bzrlib.tests.test_conflicts',
 
1003
                   'bzrlib.tests.test_decorators',
 
1004
                   'bzrlib.tests.test_diff',
 
1005
                   'bzrlib.tests.test_doc_generate',
 
1006
                   'bzrlib.tests.test_errors',
 
1007
                   'bzrlib.tests.test_escaped_store',
 
1008
                   'bzrlib.tests.test_fetch',
 
1009
                   'bzrlib.tests.test_gpg',
 
1010
                   'bzrlib.tests.test_graph',
 
1011
                   'bzrlib.tests.test_hashcache',
 
1012
                   'bzrlib.tests.test_http',
 
1013
                   'bzrlib.tests.test_identitymap',
 
1014
                   'bzrlib.tests.test_inv',
 
1015
                   'bzrlib.tests.test_knit',
 
1016
                   'bzrlib.tests.test_lockdir',
 
1017
                   'bzrlib.tests.test_lockable_files',
 
1018
                   'bzrlib.tests.test_log',
 
1019
                   'bzrlib.tests.test_merge',
 
1020
                   'bzrlib.tests.test_merge3',
 
1021
                   'bzrlib.tests.test_merge_core',
 
1022
                   'bzrlib.tests.test_missing',
 
1023
                   'bzrlib.tests.test_msgeditor',
 
1024
                   'bzrlib.tests.test_nonascii',
 
1025
                   'bzrlib.tests.test_options',
 
1026
                   'bzrlib.tests.test_osutils',
 
1027
                   'bzrlib.tests.test_patch',
 
1028
                   'bzrlib.tests.test_permissions',
 
1029
                   'bzrlib.tests.test_plugins',
 
1030
                   'bzrlib.tests.test_progress',
 
1031
                   'bzrlib.tests.test_reconcile',
 
1032
                   'bzrlib.tests.test_repository',
 
1033
                   'bzrlib.tests.test_revision',
 
1034
                   'bzrlib.tests.test_revisionnamespaces',
 
1035
                   'bzrlib.tests.test_revprops',
 
1036
                   'bzrlib.tests.test_rio',
 
1037
                   'bzrlib.tests.test_sampler',
 
1038
                   'bzrlib.tests.test_selftest',
 
1039
                   'bzrlib.tests.test_setup',
 
1040
                   'bzrlib.tests.test_sftp_transport',
 
1041
                   'bzrlib.tests.test_smart_add',
 
1042
                   'bzrlib.tests.test_source',
 
1043
                   'bzrlib.tests.test_store',
 
1044
                   'bzrlib.tests.test_symbol_versioning',
 
1045
                   'bzrlib.tests.test_testament',
 
1046
                   'bzrlib.tests.test_textfile',
 
1047
                   'bzrlib.tests.test_textmerge',
 
1048
                   'bzrlib.tests.test_trace',
 
1049
                   'bzrlib.tests.test_transactions',
 
1050
                   'bzrlib.tests.test_transform',
 
1051
                   'bzrlib.tests.test_transport',
 
1052
                   'bzrlib.tests.test_tsort',
 
1053
                   'bzrlib.tests.test_tuned_gzip',
 
1054
                   'bzrlib.tests.test_ui',
 
1055
                   'bzrlib.tests.test_upgrade',
 
1056
                   'bzrlib.tests.test_urlutils',
 
1057
                   'bzrlib.tests.test_versionedfile',
 
1058
                   'bzrlib.tests.test_weave',
 
1059
                   'bzrlib.tests.test_whitebox',
 
1060
                   'bzrlib.tests.test_workingtree',
 
1061
                   'bzrlib.tests.test_xml',
 
1062
                   ]
 
1063
    test_transport_implementations = [
 
1064
        'bzrlib.tests.test_transport_implementations']
 
1065
 
 
1066
    TestCase.BZRPATH = osutils.pathjoin(
 
1067
            osutils.realpath(osutils.dirname(bzrlib.__path__[0])), 'bzr')
 
1068
    print '%10s: %s' % ('bzr', osutils.realpath(sys.argv[0]))
 
1069
    print '%10s: %s' % ('bzrlib', bzrlib.__path__[0])
 
1070
    print
 
1071
    suite = TestSuite()
 
1072
    # python2.4's TestLoader.loadTestsFromNames gives very poor 
 
1073
    # errors if it fails to load a named module - no indication of what's
 
1074
    # actually wrong, just "no such module".  We should probably override that
 
1075
    # class, but for the moment just load them ourselves. (mbp 20051202)
 
1076
    loader = TestLoader()
 
1077
    from bzrlib.transport import TransportTestProviderAdapter
 
1078
    adapter = TransportTestProviderAdapter()
 
1079
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
1080
    for mod_name in testmod_names:
 
1081
        mod = _load_module_by_name(mod_name)
 
1082
        suite.addTest(loader.loadTestsFromModule(mod))
 
1083
    for package in packages_to_test():
 
1084
        suite.addTest(package.test_suite())
 
1085
    for m in MODULES_TO_TEST:
 
1086
        suite.addTest(loader.loadTestsFromModule(m))
 
1087
    for m in (MODULES_TO_DOCTEST):
 
1088
        suite.addTest(DocTestSuite(m))
 
1089
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
1090
        if getattr(plugin, 'test_suite', None) is not None:
 
1091
            suite.addTest(plugin.test_suite())
 
1092
    return suite
 
1093
 
 
1094
 
 
1095
def adapt_modules(mods_list, adapter, loader, suite):
 
1096
    """Adapt the modules in mods_list using adapter and add to suite."""
 
1097
    for mod_name in mods_list:
 
1098
        mod = _load_module_by_name(mod_name)
 
1099
        for test in iter_suite_tests(loader.loadTestsFromModule(mod)):
 
1100
            suite.addTests(adapter.adapt(test))
 
1101
 
 
1102
 
 
1103
def _load_module_by_name(mod_name):
 
1104
    parts = mod_name.split('.')
 
1105
    module = __import__(mod_name)
 
1106
    del parts[0]
 
1107
    # for historical reasons python returns the top-level module even though
 
1108
    # it loads the submodule; we need to walk down to get the one we want.
 
1109
    while parts:
 
1110
        module = getattr(module, parts.pop(0))
 
1111
    return module