/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Aaron Bentley
  • Date: 2006-02-21 02:26:30 UTC
  • mto: (1558.1.1 bzr.ab.integration)
  • mto: This revision was merged to the branch mainline in revision 1559.
  • Revision ID: aaron.bentley@utoronto.ca-20060221022630-c74618c305be4ffd
Switched to ConfigObj 4.2.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 by Canonical Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
import codecs
 
25
from cStringIO import StringIO
 
26
import difflib
 
27
import errno
 
28
import logging
 
29
import os
 
30
import re
 
31
import shutil
 
32
import stat
 
33
import sys
 
34
import tempfile
 
35
import unittest
 
36
import time
 
37
 
 
38
 
 
39
import bzrlib.branch
 
40
import bzrlib.bzrdir as bzrdir
 
41
import bzrlib.commands
 
42
import bzrlib.errors as errors
 
43
import bzrlib.inventory
 
44
import bzrlib.iterablefile
 
45
import bzrlib.merge3
 
46
import bzrlib.osutils
 
47
import bzrlib.osutils as osutils
 
48
import bzrlib.plugin
 
49
import bzrlib.store
 
50
import bzrlib.trace
 
51
from bzrlib.transport import urlescape
 
52
import bzrlib.transport
 
53
from bzrlib.transport.local import LocalRelpathServer
 
54
from bzrlib.transport.readonly import ReadonlyServer
 
55
from bzrlib.trace import mutter
 
56
from bzrlib.tests.TestUtil import TestLoader, TestSuite
 
57
from bzrlib.tests.treeshape import build_tree_contents
 
58
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
59
 
 
60
default_transport = LocalRelpathServer
 
61
 
 
62
MODULES_TO_TEST = []
 
63
MODULES_TO_DOCTEST = [
 
64
                      bzrlib.branch,
 
65
                      bzrlib.commands,
 
66
                      bzrlib.errors,
 
67
                      bzrlib.inventory,
 
68
                      bzrlib.iterablefile,
 
69
                      bzrlib.merge3,
 
70
                      bzrlib.option,
 
71
                      bzrlib.osutils,
 
72
                      bzrlib.store
 
73
                      ]
 
74
def packages_to_test():
 
75
    """Return a list of packages to test.
 
76
 
 
77
    The packages are not globally imported so that import failures are
 
78
    triggered when running selftest, not when importing the command.
 
79
    """
 
80
    import bzrlib.doc
 
81
    import bzrlib.tests.blackbox
 
82
    import bzrlib.tests.branch_implementations
 
83
    import bzrlib.tests.bzrdir_implementations
 
84
    import bzrlib.tests.repository_implementations
 
85
    import bzrlib.tests.workingtree_implementations
 
86
    return [
 
87
            bzrlib.doc,
 
88
            bzrlib.tests.blackbox,
 
89
            bzrlib.tests.branch_implementations,
 
90
            bzrlib.tests.bzrdir_implementations,
 
91
            bzrlib.tests.repository_implementations,
 
92
            bzrlib.tests.workingtree_implementations,
 
93
            ]
 
94
 
 
95
 
 
96
class _MyResult(unittest._TextTestResult):
 
97
    """Custom TestResult.
 
98
 
 
99
    Shows output in a different format, including displaying runtime for tests.
 
100
    """
 
101
    stop_early = False
 
102
 
 
103
    def _elapsedTime(self):
 
104
        return "%5dms" % (1000 * (time.time() - self._start_time))
 
105
 
 
106
    def startTest(self, test):
 
107
        unittest.TestResult.startTest(self, test)
 
108
        # In a short description, the important words are in
 
109
        # the beginning, but in an id, the important words are
 
110
        # at the end
 
111
        SHOW_DESCRIPTIONS = False
 
112
        if self.showAll:
 
113
            width = osutils.terminal_width()
 
114
            name_width = width - 15
 
115
            what = None
 
116
            if SHOW_DESCRIPTIONS:
 
117
                what = test.shortDescription()
 
118
                if what:
 
119
                    if len(what) > name_width:
 
120
                        what = what[:name_width-3] + '...'
 
121
            if what is None:
 
122
                what = test.id()
 
123
                if what.startswith('bzrlib.tests.'):
 
124
                    what = what[13:]
 
125
                if len(what) > name_width:
 
126
                    what = '...' + what[3-name_width:]
 
127
            what = what.ljust(name_width)
 
128
            self.stream.write(what)
 
129
        self.stream.flush()
 
130
        self._start_time = time.time()
 
131
 
 
132
    def addError(self, test, err):
 
133
        if isinstance(err[1], TestSkipped):
 
134
            return self.addSkipped(test, err)    
 
135
        unittest.TestResult.addError(self, test, err)
 
136
        if self.showAll:
 
137
            self.stream.writeln("ERROR %s" % self._elapsedTime())
 
138
        elif self.dots:
 
139
            self.stream.write('E')
 
140
        self.stream.flush()
 
141
        if self.stop_early:
 
142
            self.stop()
 
143
 
 
144
    def addFailure(self, test, err):
 
145
        unittest.TestResult.addFailure(self, test, err)
 
146
        if self.showAll:
 
147
            self.stream.writeln(" FAIL %s" % self._elapsedTime())
 
148
        elif self.dots:
 
149
            self.stream.write('F')
 
150
        self.stream.flush()
 
151
        if self.stop_early:
 
152
            self.stop()
 
153
 
 
154
    def addSuccess(self, test):
 
155
        if self.showAll:
 
156
            self.stream.writeln('   OK %s' % self._elapsedTime())
 
157
        elif self.dots:
 
158
            self.stream.write('~')
 
159
        self.stream.flush()
 
160
        unittest.TestResult.addSuccess(self, test)
 
161
 
 
162
    def addSkipped(self, test, skip_excinfo):
 
163
        if self.showAll:
 
164
            print >>self.stream, ' SKIP %s' % self._elapsedTime()
 
165
            print >>self.stream, '     %s' % skip_excinfo[1]
 
166
        elif self.dots:
 
167
            self.stream.write('S')
 
168
        self.stream.flush()
 
169
        # seems best to treat this as success from point-of-view of unittest
 
170
        # -- it actually does nothing so it barely matters :)
 
171
        unittest.TestResult.addSuccess(self, test)
 
172
 
 
173
    def printErrorList(self, flavour, errors):
 
174
        for test, err in errors:
 
175
            self.stream.writeln(self.separator1)
 
176
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
 
177
            if getattr(test, '_get_log', None) is not None:
 
178
                print >>self.stream
 
179
                print >>self.stream, \
 
180
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
181
                print >>self.stream, test._get_log()
 
182
                print >>self.stream, \
 
183
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
184
            self.stream.writeln(self.separator2)
 
185
            self.stream.writeln("%s" % err)
 
186
 
 
187
 
 
188
class TextTestRunner(unittest.TextTestRunner):
 
189
    stop_on_failure = False
 
190
 
 
191
    def _makeResult(self):
 
192
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
 
193
        result.stop_early = self.stop_on_failure
 
194
        return result
 
195
 
 
196
 
 
197
def iter_suite_tests(suite):
 
198
    """Return all tests in a suite, recursing through nested suites"""
 
199
    for item in suite._tests:
 
200
        if isinstance(item, unittest.TestCase):
 
201
            yield item
 
202
        elif isinstance(item, unittest.TestSuite):
 
203
            for r in iter_suite_tests(item):
 
204
                yield r
 
205
        else:
 
206
            raise Exception('unknown object %r inside test suite %r'
 
207
                            % (item, suite))
 
208
 
 
209
 
 
210
class TestSkipped(Exception):
 
211
    """Indicates that a test was intentionally skipped, rather than failing."""
 
212
    # XXX: Not used yet
 
213
 
 
214
 
 
215
class CommandFailed(Exception):
 
216
    pass
 
217
 
 
218
class TestCase(unittest.TestCase):
 
219
    """Base class for bzr unit tests.
 
220
    
 
221
    Tests that need access to disk resources should subclass 
 
222
    TestCaseInTempDir not TestCase.
 
223
 
 
224
    Error and debug log messages are redirected from their usual
 
225
    location into a temporary file, the contents of which can be
 
226
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
227
    so that it can also capture file IO.  When the test completes this file
 
228
    is read into memory and removed from disk.
 
229
       
 
230
    There are also convenience functions to invoke bzr's command-line
 
231
    routine, and to build and check bzr trees.
 
232
   
 
233
    In addition to the usual method of overriding tearDown(), this class also
 
234
    allows subclasses to register functions into the _cleanups list, which is
 
235
    run in order as the object is torn down.  It's less likely this will be
 
236
    accidentally overlooked.
 
237
    """
 
238
 
 
239
    BZRPATH = 'bzr'
 
240
    _log_file_name = None
 
241
    _log_contents = ''
 
242
 
 
243
    def __init__(self, methodName='testMethod'):
 
244
        super(TestCase, self).__init__(methodName)
 
245
        self._cleanups = []
 
246
 
 
247
    def setUp(self):
 
248
        unittest.TestCase.setUp(self)
 
249
        self._cleanEnvironment()
 
250
        bzrlib.trace.disable_default_logging()
 
251
        self._startLogFile()
 
252
 
 
253
    def _ndiff_strings(self, a, b):
 
254
        """Return ndiff between two strings containing lines.
 
255
        
 
256
        A trailing newline is added if missing to make the strings
 
257
        print properly."""
 
258
        if b and b[-1] != '\n':
 
259
            b += '\n'
 
260
        if a and a[-1] != '\n':
 
261
            a += '\n'
 
262
        difflines = difflib.ndiff(a.splitlines(True),
 
263
                                  b.splitlines(True),
 
264
                                  linejunk=lambda x: False,
 
265
                                  charjunk=lambda x: False)
 
266
        return ''.join(difflines)
 
267
 
 
268
    def assertEqualDiff(self, a, b, message=None):
 
269
        """Assert two texts are equal, if not raise an exception.
 
270
        
 
271
        This is intended for use with multi-line strings where it can 
 
272
        be hard to find the differences by eye.
 
273
        """
 
274
        # TODO: perhaps override assertEquals to call this for strings?
 
275
        if a == b:
 
276
            return
 
277
        if message is None:
 
278
            message = "texts not equal:\n"
 
279
        raise AssertionError(message + 
 
280
                             self._ndiff_strings(a, b))      
 
281
        
 
282
    def assertEqualMode(self, mode, mode_test):
 
283
        self.assertEqual(mode, mode_test,
 
284
                         'mode mismatch %o != %o' % (mode, mode_test))
 
285
 
 
286
    def assertStartsWith(self, s, prefix):
 
287
        if not s.startswith(prefix):
 
288
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
289
 
 
290
    def assertEndsWith(self, s, suffix):
 
291
        if not s.endswith(prefix):
 
292
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
293
 
 
294
    def assertContainsRe(self, haystack, needle_re):
 
295
        """Assert that a contains something matching a regular expression."""
 
296
        if not re.search(needle_re, haystack):
 
297
            raise AssertionError('pattern "%s" not found in "%s"'
 
298
                    % (needle_re, haystack))
 
299
 
 
300
    def AssertSubset(self, sublist, superlist):
 
301
        """Assert that every entry in sublist is present in superlist."""
 
302
        missing = []
 
303
        for entry in sublist:
 
304
            if entry not in superlist:
 
305
                missing.append(entry)
 
306
        if len(missing) > 0:
 
307
            raise AssertionError("value(s) %r not present in container %r" % 
 
308
                                 (missing, superlist))
 
309
 
 
310
    def assertIs(self, left, right):
 
311
        if not (left is right):
 
312
            raise AssertionError("%r is not %r." % (left, right))
 
313
 
 
314
    def assertTransportMode(self, transport, path, mode):
 
315
        """Fail if a path does not have mode mode.
 
316
        
 
317
        If modes are not supported on this platform, the test is skipped.
 
318
        """
 
319
        if sys.platform == 'win32':
 
320
            return
 
321
        path_stat = transport.stat(path)
 
322
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
323
        self.assertEqual(mode, actual_mode,
 
324
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
325
 
 
326
    def _startLogFile(self):
 
327
        """Send bzr and test log messages to a temporary file.
 
328
 
 
329
        The file is removed as the test is torn down.
 
330
        """
 
331
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
332
        encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
 
333
        self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
 
334
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
335
        self._log_file_name = name
 
336
        self.addCleanup(self._finishLogFile)
 
337
 
 
338
    def _finishLogFile(self):
 
339
        """Finished with the log file.
 
340
 
 
341
        Read contents into memory, close, and delete.
 
342
        """
 
343
        bzrlib.trace.disable_test_log(self._log_nonce)
 
344
        self._log_file.seek(0)
 
345
        self._log_contents = self._log_file.read()
 
346
        self._log_file.close()
 
347
        os.remove(self._log_file_name)
 
348
        self._log_file = self._log_file_name = None
 
349
 
 
350
    def addCleanup(self, callable):
 
351
        """Arrange to run a callable when this case is torn down.
 
352
 
 
353
        Callables are run in the reverse of the order they are registered, 
 
354
        ie last-in first-out.
 
355
        """
 
356
        if callable in self._cleanups:
 
357
            raise ValueError("cleanup function %r already registered on %s" 
 
358
                    % (callable, self))
 
359
        self._cleanups.append(callable)
 
360
 
 
361
    def _cleanEnvironment(self):
 
362
        new_env = {
 
363
            'HOME': os.getcwd(),
 
364
            'APPDATA': os.getcwd(),
 
365
            'BZREMAIL': None,
 
366
            'EMAIL': None,
 
367
        }
 
368
        self.__old_env = {}
 
369
        self.addCleanup(self._restoreEnvironment)
 
370
        for name, value in new_env.iteritems():
 
371
            self._captureVar(name, value)
 
372
 
 
373
 
 
374
    def _captureVar(self, name, newvalue):
 
375
        """Set an environment variable, preparing it to be reset when finished."""
 
376
        self.__old_env[name] = os.environ.get(name, None)
 
377
        if newvalue is None:
 
378
            if name in os.environ:
 
379
                del os.environ[name]
 
380
        else:
 
381
            os.environ[name] = newvalue
 
382
 
 
383
    @staticmethod
 
384
    def _restoreVar(name, value):
 
385
        if value is None:
 
386
            if name in os.environ:
 
387
                del os.environ[name]
 
388
        else:
 
389
            os.environ[name] = value
 
390
 
 
391
    def _restoreEnvironment(self):
 
392
        for name, value in self.__old_env.iteritems():
 
393
            self._restoreVar(name, value)
 
394
 
 
395
    def tearDown(self):
 
396
        self._runCleanups()
 
397
        unittest.TestCase.tearDown(self)
 
398
 
 
399
    def _runCleanups(self):
 
400
        """Run registered cleanup functions. 
 
401
 
 
402
        This should only be called from TestCase.tearDown.
 
403
        """
 
404
        # TODO: Perhaps this should keep running cleanups even if 
 
405
        # one of them fails?
 
406
        for cleanup_fn in reversed(self._cleanups):
 
407
            cleanup_fn()
 
408
 
 
409
    def log(self, *args):
 
410
        mutter(*args)
 
411
 
 
412
    def _get_log(self):
 
413
        """Return as a string the log for this test"""
 
414
        if self._log_file_name:
 
415
            return open(self._log_file_name).read()
 
416
        else:
 
417
            return self._log_contents
 
418
        # TODO: Delete the log after it's been read in
 
419
 
 
420
    def capture(self, cmd, retcode=0):
 
421
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
422
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
423
 
 
424
    def run_bzr_captured(self, argv, retcode=0):
 
425
        """Invoke bzr and return (stdout, stderr).
 
426
 
 
427
        Useful for code that wants to check the contents of the
 
428
        output, the way error messages are presented, etc.
 
429
 
 
430
        This should be the main method for tests that want to exercise the
 
431
        overall behavior of the bzr application (rather than a unit test
 
432
        or a functional test of the library.)
 
433
 
 
434
        Much of the old code runs bzr by forking a new copy of Python, but
 
435
        that is slower, harder to debug, and generally not necessary.
 
436
 
 
437
        This runs bzr through the interface that catches and reports
 
438
        errors, and with logging set to something approximating the
 
439
        default, so that error reporting can be checked.
 
440
 
 
441
        argv -- arguments to invoke bzr
 
442
        retcode -- expected return code, or None for don't-care.
 
443
        """
 
444
        stdout = StringIO()
 
445
        stderr = StringIO()
 
446
        self.log('run bzr: %s', ' '.join(argv))
 
447
        # FIXME: don't call into logging here
 
448
        handler = logging.StreamHandler(stderr)
 
449
        handler.setFormatter(bzrlib.trace.QuietFormatter())
 
450
        handler.setLevel(logging.INFO)
 
451
        logger = logging.getLogger('')
 
452
        logger.addHandler(handler)
 
453
        try:
 
454
            result = self.apply_redirected(None, stdout, stderr,
 
455
                                           bzrlib.commands.run_bzr_catch_errors,
 
456
                                           argv)
 
457
        finally:
 
458
            logger.removeHandler(handler)
 
459
        out = stdout.getvalue()
 
460
        err = stderr.getvalue()
 
461
        if out:
 
462
            self.log('output:\n%s', out)
 
463
        if err:
 
464
            self.log('errors:\n%s', err)
 
465
        if retcode is not None:
 
466
            self.assertEquals(result, retcode)
 
467
        return out, err
 
468
 
 
469
    def run_bzr(self, *args, **kwargs):
 
470
        """Invoke bzr, as if it were run from the command line.
 
471
 
 
472
        This should be the main method for tests that want to exercise the
 
473
        overall behavior of the bzr application (rather than a unit test
 
474
        or a functional test of the library.)
 
475
 
 
476
        This sends the stdout/stderr results into the test's log,
 
477
        where it may be useful for debugging.  See also run_captured.
 
478
        """
 
479
        retcode = kwargs.pop('retcode', 0)
 
480
        return self.run_bzr_captured(args, retcode)
 
481
 
 
482
    def check_inventory_shape(self, inv, shape):
 
483
        """Compare an inventory to a list of expected names.
 
484
 
 
485
        Fail if they are not precisely equal.
 
486
        """
 
487
        extras = []
 
488
        shape = list(shape)             # copy
 
489
        for path, ie in inv.entries():
 
490
            name = path.replace('\\', '/')
 
491
            if ie.kind == 'dir':
 
492
                name = name + '/'
 
493
            if name in shape:
 
494
                shape.remove(name)
 
495
            else:
 
496
                extras.append(name)
 
497
        if shape:
 
498
            self.fail("expected paths not found in inventory: %r" % shape)
 
499
        if extras:
 
500
            self.fail("unexpected paths found in inventory: %r" % extras)
 
501
 
 
502
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
503
                         a_callable=None, *args, **kwargs):
 
504
        """Call callable with redirected std io pipes.
 
505
 
 
506
        Returns the return code."""
 
507
        if not callable(a_callable):
 
508
            raise ValueError("a_callable must be callable.")
 
509
        if stdin is None:
 
510
            stdin = StringIO("")
 
511
        if stdout is None:
 
512
            if getattr(self, "_log_file", None) is not None:
 
513
                stdout = self._log_file
 
514
            else:
 
515
                stdout = StringIO()
 
516
        if stderr is None:
 
517
            if getattr(self, "_log_file", None is not None):
 
518
                stderr = self._log_file
 
519
            else:
 
520
                stderr = StringIO()
 
521
        real_stdin = sys.stdin
 
522
        real_stdout = sys.stdout
 
523
        real_stderr = sys.stderr
 
524
        try:
 
525
            sys.stdout = stdout
 
526
            sys.stderr = stderr
 
527
            sys.stdin = stdin
 
528
            return a_callable(*args, **kwargs)
 
529
        finally:
 
530
            sys.stdout = real_stdout
 
531
            sys.stderr = real_stderr
 
532
            sys.stdin = real_stdin
 
533
 
 
534
 
 
535
BzrTestBase = TestCase
 
536
 
 
537
     
 
538
class TestCaseInTempDir(TestCase):
 
539
    """Derived class that runs a test within a temporary directory.
 
540
 
 
541
    This is useful for tests that need to create a branch, etc.
 
542
 
 
543
    The directory is created in a slightly complex way: for each
 
544
    Python invocation, a new temporary top-level directory is created.
 
545
    All test cases create their own directory within that.  If the
 
546
    tests complete successfully, the directory is removed.
 
547
 
 
548
    InTempDir is an old alias for FunctionalTestCase.
 
549
    """
 
550
 
 
551
    TEST_ROOT = None
 
552
    _TEST_NAME = 'test'
 
553
    OVERRIDE_PYTHON = 'python'
 
554
 
 
555
    def check_file_contents(self, filename, expect):
 
556
        self.log("check contents of file %s" % filename)
 
557
        contents = file(filename, 'r').read()
 
558
        if contents != expect:
 
559
            self.log("expected: %r" % expect)
 
560
            self.log("actually: %r" % contents)
 
561
            self.fail("contents of %s not as expected" % filename)
 
562
 
 
563
    def _make_test_root(self):
 
564
        if TestCaseInTempDir.TEST_ROOT is not None:
 
565
            return
 
566
        i = 0
 
567
        while True:
 
568
            root = u'test%04d.tmp' % i
 
569
            try:
 
570
                os.mkdir(root)
 
571
            except OSError, e:
 
572
                if e.errno == errno.EEXIST:
 
573
                    i += 1
 
574
                    continue
 
575
                else:
 
576
                    raise
 
577
            # successfully created
 
578
            TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
 
579
            break
 
580
        # make a fake bzr directory there to prevent any tests propagating
 
581
        # up onto the source directory's real branch
 
582
        os.mkdir(osutils.pathjoin(TestCaseInTempDir.TEST_ROOT, '.bzr'))
 
583
 
 
584
    def setUp(self):
 
585
        super(TestCaseInTempDir, self).setUp()
 
586
        self._make_test_root()
 
587
        _currentdir = os.getcwdu()
 
588
        short_id = self.id().replace('bzrlib.tests.', '') \
 
589
                   .replace('__main__.', '')
 
590
        self.test_dir = osutils.pathjoin(self.TEST_ROOT, short_id)
 
591
        os.mkdir(self.test_dir)
 
592
        os.chdir(self.test_dir)
 
593
        os.environ['HOME'] = self.test_dir
 
594
        os.environ['APPDATA'] = self.test_dir
 
595
        def _leaveDirectory():
 
596
            os.chdir(_currentdir)
 
597
        self.addCleanup(_leaveDirectory)
 
598
        
 
599
    def build_tree(self, shape, line_endings='native', transport=None):
 
600
        """Build a test tree according to a pattern.
 
601
 
 
602
        shape is a sequence of file specifications.  If the final
 
603
        character is '/', a directory is created.
 
604
 
 
605
        This doesn't add anything to a branch.
 
606
        :param line_endings: Either 'binary' or 'native'
 
607
                             in binary mode, exact contents are written
 
608
                             in native mode, the line endings match the
 
609
                             default platform endings.
 
610
 
 
611
        :param transport: A transport to write to, for building trees on 
 
612
                          VFS's. If the transport is readonly or None,
 
613
                          "." is opened automatically.
 
614
        """
 
615
        # XXX: It's OK to just create them using forward slashes on windows?
 
616
        if transport is None or transport.is_readonly():
 
617
            transport = bzrlib.transport.get_transport(".")
 
618
        for name in shape:
 
619
            self.assert_(isinstance(name, basestring))
 
620
            if name[-1] == '/':
 
621
                transport.mkdir(urlescape(name[:-1]))
 
622
            else:
 
623
                if line_endings == 'binary':
 
624
                    end = '\n'
 
625
                elif line_endings == 'native':
 
626
                    end = os.linesep
 
627
                else:
 
628
                    raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
 
629
                content = "contents of %s%s" % (name, end)
 
630
                transport.put(urlescape(name), StringIO(content))
 
631
 
 
632
    def build_tree_contents(self, shape):
 
633
        build_tree_contents(shape)
 
634
 
 
635
    def failUnlessExists(self, path):
 
636
        """Fail unless path, which may be abs or relative, exists."""
 
637
        self.failUnless(osutils.lexists(path))
 
638
 
 
639
    def failIfExists(self, path):
 
640
        """Fail if path, which may be abs or relative, exists."""
 
641
        self.failIf(osutils.lexists(path))
 
642
        
 
643
    def assertFileEqual(self, content, path):
 
644
        """Fail if path does not contain 'content'."""
 
645
        self.failUnless(osutils.lexists(path))
 
646
        self.assertEqualDiff(content, open(path, 'r').read())
 
647
 
 
648
 
 
649
class TestCaseWithTransport(TestCaseInTempDir):
 
650
    """A test case that provides get_url and get_readonly_url facilities.
 
651
 
 
652
    These back onto two transport servers, one for readonly access and one for
 
653
    read write access.
 
654
 
 
655
    If no explicit class is provided for readonly access, a
 
656
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
657
    based read write transports.
 
658
 
 
659
    If an explicit class is provided for readonly access, that server and the 
 
660
    readwrite one must both define get_url() as resolving to os.getcwd().
 
661
    """
 
662
 
 
663
    def __init__(self, methodName='testMethod'):
 
664
        super(TestCaseWithTransport, self).__init__(methodName)
 
665
        self.__readonly_server = None
 
666
        self.__server = None
 
667
        self.transport_server = default_transport
 
668
        self.transport_readonly_server = None
 
669
 
 
670
    def get_readonly_url(self, relpath=None):
 
671
        """Get a URL for the readonly transport.
 
672
 
 
673
        This will either be backed by '.' or a decorator to the transport 
 
674
        used by self.get_url()
 
675
        relpath provides for clients to get a path relative to the base url.
 
676
        These should only be downwards relative, not upwards.
 
677
        """
 
678
        base = self.get_readonly_server().get_url()
 
679
        if relpath is not None:
 
680
            if not base.endswith('/'):
 
681
                base = base + '/'
 
682
            base = base + relpath
 
683
        return base
 
684
 
 
685
    def get_readonly_server(self):
 
686
        """Get the server instance for the readonly transport
 
687
 
 
688
        This is useful for some tests with specific servers to do diagnostics.
 
689
        """
 
690
        if self.__readonly_server is None:
 
691
            if self.transport_readonly_server is None:
 
692
                # readonly decorator requested
 
693
                # bring up the server
 
694
                self.get_url()
 
695
                self.__readonly_server = ReadonlyServer()
 
696
                self.__readonly_server.setUp(self.__server)
 
697
            else:
 
698
                self.__readonly_server = self.transport_readonly_server()
 
699
                self.__readonly_server.setUp()
 
700
            self.addCleanup(self.__readonly_server.tearDown)
 
701
        return self.__readonly_server
 
702
 
 
703
    def get_server(self):
 
704
        """Get the read/write server instance.
 
705
 
 
706
        This is useful for some tests with specific servers that need
 
707
        diagnostics.
 
708
        """
 
709
        if self.__server is None:
 
710
            self.__server = self.transport_server()
 
711
            self.__server.setUp()
 
712
            self.addCleanup(self.__server.tearDown)
 
713
        return self.__server
 
714
 
 
715
    def get_url(self, relpath=None):
 
716
        """Get a URL for the readwrite transport.
 
717
 
 
718
        This will either be backed by '.' or to an equivalent non-file based
 
719
        facility.
 
720
        relpath provides for clients to get a path relative to the base url.
 
721
        These should only be downwards relative, not upwards.
 
722
        """
 
723
        base = self.get_server().get_url()
 
724
        if relpath is not None and relpath != '.':
 
725
            if not base.endswith('/'):
 
726
                base = base + '/'
 
727
            base = base + relpath
 
728
        return base
 
729
 
 
730
    def make_branch(self, relpath):
 
731
        """Create a branch on the transport at relpath."""
 
732
        repo = self.make_repository(relpath)
 
733
        return repo.bzrdir.create_branch()
 
734
 
 
735
    def make_bzrdir(self, relpath):
 
736
        try:
 
737
            url = self.get_url(relpath)
 
738
            segments = url.split('/')
 
739
            if segments and segments[-1] not in ('', '.'):
 
740
                parent = '/'.join(segments[:-1])
 
741
                t = bzrlib.transport.get_transport(parent)
 
742
                try:
 
743
                    t.mkdir(segments[-1])
 
744
                except errors.FileExists:
 
745
                    pass
 
746
            return bzrlib.bzrdir.BzrDir.create(url)
 
747
        except errors.UninitializableFormat:
 
748
            raise TestSkipped("Format %s is not initializable.")
 
749
 
 
750
    def make_repository(self, relpath, shared=False):
 
751
        """Create a repository on our default transport at relpath."""
 
752
        made_control = self.make_bzrdir(relpath)
 
753
        return made_control.create_repository(shared=shared)
 
754
 
 
755
    def make_branch_and_tree(self, relpath):
 
756
        """Create a branch on the transport and a tree locally.
 
757
 
 
758
        Returns the tree.
 
759
        """
 
760
        # TODO: always use the local disk path for the working tree,
 
761
        # this obviously requires a format that supports branch references
 
762
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
763
        # RBC 20060208
 
764
        b = self.make_branch(relpath)
 
765
        try:
 
766
            return b.bzrdir.create_workingtree()
 
767
        except errors.NotLocalUrl:
 
768
            # new formats - catch No tree error and create
 
769
            # a branch reference and a checkout.
 
770
            # old formats at that point - raise TestSkipped.
 
771
            # TODO: rbc 20060208
 
772
            return WorkingTreeFormat2().initialize(bzrdir.BzrDir.open(relpath))
 
773
 
 
774
 
 
775
class ChrootedTestCase(TestCaseWithTransport):
 
776
    """A support class that provides readonly urls outside the local namespace.
 
777
 
 
778
    This is done by checking if self.transport_server is a MemoryServer. if it
 
779
    is then we are chrooted already, if it is not then an HttpServer is used
 
780
    for readonly urls.
 
781
 
 
782
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
783
                       be used without needed to redo it when a different 
 
784
                       subclass is in use ?
 
785
    """
 
786
 
 
787
    def setUp(self):
 
788
        super(ChrootedTestCase, self).setUp()
 
789
        if not self.transport_server == bzrlib.transport.memory.MemoryServer:
 
790
            self.transport_readonly_server = bzrlib.transport.http.HttpServer
 
791
 
 
792
 
 
793
def filter_suite_by_re(suite, pattern):
 
794
    result = TestSuite()
 
795
    filter_re = re.compile(pattern)
 
796
    for test in iter_suite_tests(suite):
 
797
        if filter_re.search(test.id()):
 
798
            result.addTest(test)
 
799
    return result
 
800
 
 
801
 
 
802
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
803
              stop_on_failure=False, keep_output=False,
 
804
              transport=None):
 
805
    TestCaseInTempDir._TEST_NAME = name
 
806
    if verbose:
 
807
        verbosity = 2
 
808
    else:
 
809
        verbosity = 1
 
810
    runner = TextTestRunner(stream=sys.stdout,
 
811
                            descriptions=0,
 
812
                            verbosity=verbosity)
 
813
    runner.stop_on_failure=stop_on_failure
 
814
    if pattern != '.*':
 
815
        suite = filter_suite_by_re(suite, pattern)
 
816
    result = runner.run(suite)
 
817
    # This is still a little bogus, 
 
818
    # but only a little. Folk not using our testrunner will
 
819
    # have to delete their temp directories themselves.
 
820
    if result.wasSuccessful() or not keep_output:
 
821
        if TestCaseInTempDir.TEST_ROOT is not None:
 
822
            shutil.rmtree(TestCaseInTempDir.TEST_ROOT) 
 
823
    else:
 
824
        print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
 
825
    return result.wasSuccessful()
 
826
 
 
827
 
 
828
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
829
             keep_output=False,
 
830
             transport=None):
 
831
    """Run the whole test suite under the enhanced runner"""
 
832
    global default_transport
 
833
    if transport is None:
 
834
        transport = default_transport
 
835
    old_transport = default_transport
 
836
    default_transport = transport
 
837
    suite = test_suite()
 
838
    try:
 
839
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
840
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
 
841
                     transport=transport)
 
842
    finally:
 
843
        default_transport = old_transport
 
844
 
 
845
 
 
846
 
 
847
def test_suite():
 
848
    """Build and return TestSuite for the whole program."""
 
849
    from doctest import DocTestSuite
 
850
 
 
851
    global MODULES_TO_DOCTEST
 
852
 
 
853
    testmod_names = [ \
 
854
                   'bzrlib.tests.test_ancestry',
 
855
                   'bzrlib.tests.test_annotate',
 
856
                   'bzrlib.tests.test_api',
 
857
                   'bzrlib.tests.test_bad_files',
 
858
                   'bzrlib.tests.test_basis_inventory',
 
859
                   'bzrlib.tests.test_branch',
 
860
                   'bzrlib.tests.test_bzrdir',
 
861
                   'bzrlib.tests.test_command',
 
862
                   'bzrlib.tests.test_commit',
 
863
                   'bzrlib.tests.test_commit_merge',
 
864
                   'bzrlib.tests.test_config',
 
865
                   'bzrlib.tests.test_conflicts',
 
866
                   'bzrlib.tests.test_decorators',
 
867
                   'bzrlib.tests.test_diff',
 
868
                   'bzrlib.tests.test_doc_generate',
 
869
                   'bzrlib.tests.test_errors',
 
870
                   'bzrlib.tests.test_fetch',
 
871
                   'bzrlib.tests.test_gpg',
 
872
                   'bzrlib.tests.test_graph',
 
873
                   'bzrlib.tests.test_hashcache',
 
874
                   'bzrlib.tests.test_http',
 
875
                   'bzrlib.tests.test_identitymap',
 
876
                   'bzrlib.tests.test_inv',
 
877
                   'bzrlib.tests.test_lockable_files',
 
878
                   'bzrlib.tests.test_log',
 
879
                   'bzrlib.tests.test_merge',
 
880
                   'bzrlib.tests.test_merge3',
 
881
                   'bzrlib.tests.test_merge_core',
 
882
                   'bzrlib.tests.test_missing',
 
883
                   'bzrlib.tests.test_msgeditor',
 
884
                   'bzrlib.tests.test_nonascii',
 
885
                   'bzrlib.tests.test_options',
 
886
                   'bzrlib.tests.test_osutils',
 
887
                   'bzrlib.tests.test_permissions',
 
888
                   'bzrlib.tests.test_plugins',
 
889
                   'bzrlib.tests.test_repository',
 
890
                   'bzrlib.tests.test_revision',
 
891
                   'bzrlib.tests.test_revisionnamespaces',
 
892
                   'bzrlib.tests.test_revprops',
 
893
                   'bzrlib.tests.test_reweave',
 
894
                   'bzrlib.tests.test_rio',
 
895
                   'bzrlib.tests.test_sampler',
 
896
                   'bzrlib.tests.test_selftest',
 
897
                   'bzrlib.tests.test_setup',
 
898
                   'bzrlib.tests.test_sftp_transport',
 
899
                   'bzrlib.tests.test_smart_add',
 
900
                   'bzrlib.tests.test_source',
 
901
                   'bzrlib.tests.test_store',
 
902
                   'bzrlib.tests.test_symbol_versioning',
 
903
                   'bzrlib.tests.test_testament',
 
904
                   'bzrlib.tests.test_trace',
 
905
                   'bzrlib.tests.test_transactions',
 
906
                   'bzrlib.tests.test_transport',
 
907
                   'bzrlib.tests.test_tsort',
 
908
                   'bzrlib.tests.test_ui',
 
909
                   'bzrlib.tests.test_uncommit',
 
910
                   'bzrlib.tests.test_upgrade',
 
911
                   'bzrlib.tests.test_weave',
 
912
                   'bzrlib.tests.test_whitebox',
 
913
                   'bzrlib.tests.test_workingtree',
 
914
                   'bzrlib.tests.test_xml',
 
915
                   ]
 
916
    test_transport_implementations = [
 
917
        'bzrlib.tests.test_transport_implementations']
 
918
 
 
919
    TestCase.BZRPATH = osutils.pathjoin(
 
920
            osutils.realpath(osutils.dirname(bzrlib.__path__[0])), 'bzr')
 
921
    print '%10s: %s' % ('bzr', osutils.realpath(sys.argv[0]))
 
922
    print '%10s: %s' % ('bzrlib', bzrlib.__path__[0])
 
923
    print
 
924
    suite = TestSuite()
 
925
    # python2.4's TestLoader.loadTestsFromNames gives very poor 
 
926
    # errors if it fails to load a named module - no indication of what's
 
927
    # actually wrong, just "no such module".  We should probably override that
 
928
    # class, but for the moment just load them ourselves. (mbp 20051202)
 
929
    loader = TestLoader()
 
930
    from bzrlib.transport import TransportTestProviderAdapter
 
931
    adapter = TransportTestProviderAdapter()
 
932
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
933
    for mod_name in testmod_names:
 
934
        mod = _load_module_by_name(mod_name)
 
935
        suite.addTest(loader.loadTestsFromModule(mod))
 
936
    for package in packages_to_test():
 
937
        suite.addTest(package.test_suite())
 
938
    for m in MODULES_TO_TEST:
 
939
        suite.addTest(loader.loadTestsFromModule(m))
 
940
    for m in (MODULES_TO_DOCTEST):
 
941
        suite.addTest(DocTestSuite(m))
 
942
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
943
        if getattr(plugin, 'test_suite', None) is not None:
 
944
            suite.addTest(plugin.test_suite())
 
945
    return suite
 
946
 
 
947
 
 
948
def adapt_modules(mods_list, adapter, loader, suite):
 
949
    """Adapt the modules in mods_list using adapter and add to suite."""
 
950
    for mod_name in mods_list:
 
951
        mod = _load_module_by_name(mod_name)
 
952
        for test in iter_suite_tests(loader.loadTestsFromModule(mod)):
 
953
            suite.addTests(adapter.adapt(test))
 
954
 
 
955
 
 
956
def _load_module_by_name(mod_name):
 
957
    parts = mod_name.split('.')
 
958
    module = __import__(mod_name)
 
959
    del parts[0]
 
960
    # for historical reasons python returns the top-level module even though
 
961
    # it loads the submodule; we need to walk down to get the one we want.
 
962
    while parts:
 
963
        module = getattr(module, parts.pop(0))
 
964
    return module