/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2006-09-21 16:03:07 UTC
  • mfrom: (1927.3.5 throw_log_away)
  • Revision ID: pqm@pqm.ubuntu.com-20060921160307-6081481c9a444cce
(cfbolz) Throw away disk logfile if not used

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006 by Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import codecs
 
30
from cStringIO import StringIO
 
31
import difflib
 
32
import doctest
 
33
import errno
 
34
import logging
 
35
import os
 
36
import re
 
37
import shlex
 
38
import stat
 
39
from subprocess import Popen, PIPE
 
40
import sys
 
41
import tempfile
 
42
import unittest
 
43
import time
 
44
 
 
45
 
 
46
from bzrlib import memorytree
 
47
import bzrlib.branch
 
48
import bzrlib.bzrdir as bzrdir
 
49
import bzrlib.commands
 
50
import bzrlib.bundle.serializer
 
51
import bzrlib.errors as errors
 
52
import bzrlib.inventory
 
53
import bzrlib.iterablefile
 
54
import bzrlib.lockdir
 
55
try:
 
56
    import bzrlib.lsprof
 
57
except ImportError:
 
58
    # lsprof not available
 
59
    pass
 
60
from bzrlib.merge import merge_inner
 
61
import bzrlib.merge3
 
62
import bzrlib.osutils
 
63
import bzrlib.osutils as osutils
 
64
import bzrlib.plugin
 
65
import bzrlib.progress as progress
 
66
from bzrlib.revision import common_ancestor
 
67
import bzrlib.store
 
68
from bzrlib import symbol_versioning
 
69
import bzrlib.trace
 
70
from bzrlib.transport import get_transport
 
71
import bzrlib.transport
 
72
from bzrlib.transport.local import LocalRelpathServer
 
73
from bzrlib.transport.readonly import ReadonlyServer
 
74
from bzrlib.trace import mutter
 
75
from bzrlib.tests import TestUtil
 
76
from bzrlib.tests.TestUtil import (
 
77
                          TestSuite,
 
78
                          TestLoader,
 
79
                          )
 
80
from bzrlib.tests.treeshape import build_tree_contents
 
81
import bzrlib.urlutils as urlutils
 
82
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
83
 
 
84
default_transport = LocalRelpathServer
 
85
 
 
86
MODULES_TO_TEST = []
 
87
MODULES_TO_DOCTEST = [
 
88
                      bzrlib.branch,
 
89
                      bzrlib.bundle.serializer,
 
90
                      bzrlib.commands,
 
91
                      bzrlib.errors,
 
92
                      bzrlib.inventory,
 
93
                      bzrlib.iterablefile,
 
94
                      bzrlib.lockdir,
 
95
                      bzrlib.merge3,
 
96
                      bzrlib.option,
 
97
                      bzrlib.osutils,
 
98
                      bzrlib.store,
 
99
                      bzrlib.transport,
 
100
                      ]
 
101
 
 
102
 
 
103
def packages_to_test():
 
104
    """Return a list of packages to test.
 
105
 
 
106
    The packages are not globally imported so that import failures are
 
107
    triggered when running selftest, not when importing the command.
 
108
    """
 
109
    import bzrlib.doc
 
110
    import bzrlib.tests.blackbox
 
111
    import bzrlib.tests.branch_implementations
 
112
    import bzrlib.tests.bzrdir_implementations
 
113
    import bzrlib.tests.interrepository_implementations
 
114
    import bzrlib.tests.interversionedfile_implementations
 
115
    import bzrlib.tests.intertree_implementations
 
116
    import bzrlib.tests.repository_implementations
 
117
    import bzrlib.tests.revisionstore_implementations
 
118
    import bzrlib.tests.tree_implementations
 
119
    import bzrlib.tests.workingtree_implementations
 
120
    return [
 
121
            bzrlib.doc,
 
122
            bzrlib.tests.blackbox,
 
123
            bzrlib.tests.branch_implementations,
 
124
            bzrlib.tests.bzrdir_implementations,
 
125
            bzrlib.tests.interrepository_implementations,
 
126
            bzrlib.tests.interversionedfile_implementations,
 
127
            bzrlib.tests.intertree_implementations,
 
128
            bzrlib.tests.repository_implementations,
 
129
            bzrlib.tests.revisionstore_implementations,
 
130
            bzrlib.tests.tree_implementations,
 
131
            bzrlib.tests.workingtree_implementations,
 
132
            ]
 
133
 
 
134
 
 
135
class _MyResult(unittest._TextTestResult):
 
136
    """Custom TestResult.
 
137
 
 
138
    Shows output in a different format, including displaying runtime for tests.
 
139
    """
 
140
    stop_early = False
 
141
    
 
142
    def __init__(self, stream, descriptions, verbosity, pb=None,
 
143
                 bench_history=None):
 
144
        """Construct new TestResult.
 
145
 
 
146
        :param bench_history: Optionally, a writable file object to accumulate
 
147
            benchmark results.
 
148
        """
 
149
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
150
        self.pb = pb
 
151
        if bench_history is not None:
 
152
            from bzrlib.version import _get_bzr_source_tree
 
153
            src_tree = _get_bzr_source_tree()
 
154
            if src_tree:
 
155
                try:
 
156
                    revision_id = src_tree.get_parent_ids()[0]
 
157
                except IndexError:
 
158
                    # XXX: if this is a brand new tree, do the same as if there
 
159
                    # is no branch.
 
160
                    revision_id = ''
 
161
            else:
 
162
                # XXX: If there's no branch, what should we do?
 
163
                revision_id = ''
 
164
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
165
        self._bench_history = bench_history
 
166
    
 
167
    def extractBenchmarkTime(self, testCase):
 
168
        """Add a benchmark time for the current test case."""
 
169
        self._benchmarkTime = getattr(testCase, "_benchtime", None)
 
170
    
 
171
    def _elapsedTestTimeString(self):
 
172
        """Return a time string for the overall time the current test has taken."""
 
173
        return self._formatTime(time.time() - self._start_time)
 
174
 
 
175
    def _testTimeString(self):
 
176
        if self._benchmarkTime is not None:
 
177
            return "%s/%s" % (
 
178
                self._formatTime(self._benchmarkTime),
 
179
                self._elapsedTestTimeString())
 
180
        else:
 
181
            return "      %s" % self._elapsedTestTimeString()
 
182
 
 
183
    def _formatTime(self, seconds):
 
184
        """Format seconds as milliseconds with leading spaces."""
 
185
        return "%5dms" % (1000 * seconds)
 
186
 
 
187
    def _ellipsise_unimportant_words(self, a_string, final_width,
 
188
                                   keep_start=False):
 
189
        """Add ellipses (sp?) for overly long strings.
 
190
        
 
191
        :param keep_start: If true preserve the start of a_string rather
 
192
                           than the end of it.
 
193
        """
 
194
        if keep_start:
 
195
            if len(a_string) > final_width:
 
196
                result = a_string[:final_width-3] + '...'
 
197
            else:
 
198
                result = a_string
 
199
        else:
 
200
            if len(a_string) > final_width:
 
201
                result = '...' + a_string[3-final_width:]
 
202
            else:
 
203
                result = a_string
 
204
        return result.ljust(final_width)
 
205
 
 
206
    def startTest(self, test):
 
207
        unittest.TestResult.startTest(self, test)
 
208
        # In a short description, the important words are in
 
209
        # the beginning, but in an id, the important words are
 
210
        # at the end
 
211
        SHOW_DESCRIPTIONS = False
 
212
 
 
213
        if not self.showAll and self.dots and self.pb is not None:
 
214
            final_width = 13
 
215
        else:
 
216
            final_width = osutils.terminal_width()
 
217
            final_width = final_width - 15 - 8
 
218
        what = None
 
219
        if SHOW_DESCRIPTIONS:
 
220
            what = test.shortDescription()
 
221
            if what:
 
222
                what = self._ellipsise_unimportant_words(what, final_width, keep_start=True)
 
223
        if what is None:
 
224
            what = test.id()
 
225
            if what.startswith('bzrlib.tests.'):
 
226
                what = what[13:]
 
227
            what = self._ellipsise_unimportant_words(what, final_width)
 
228
        if self.showAll:
 
229
            self.stream.write(what)
 
230
        elif self.dots and self.pb is not None:
 
231
            self.pb.update(what, self.testsRun - 1, None)
 
232
        self.stream.flush()
 
233
        self._recordTestStartTime()
 
234
 
 
235
    def _recordTestStartTime(self):
 
236
        """Record that a test has started."""
 
237
        self._start_time = time.time()
 
238
 
 
239
    def addError(self, test, err):
 
240
        if isinstance(err[1], TestSkipped):
 
241
            return self.addSkipped(test, err)    
 
242
        unittest.TestResult.addError(self, test, err)
 
243
        test.setKeepLogfile()
 
244
        self.extractBenchmarkTime(test)
 
245
        if self.showAll:
 
246
            self.stream.writeln("ERROR %s" % self._testTimeString())
 
247
        elif self.dots and self.pb is None:
 
248
            self.stream.write('E')
 
249
        elif self.dots:
 
250
            self.pb.update(self._ellipsise_unimportant_words('ERROR', 13), self.testsRun, None)
 
251
            self.pb.note(self._ellipsise_unimportant_words(
 
252
                            test.id() + ': ERROR',
 
253
                            osutils.terminal_width()))
 
254
        self.stream.flush()
 
255
        if self.stop_early:
 
256
            self.stop()
 
257
 
 
258
    def addFailure(self, test, err):
 
259
        unittest.TestResult.addFailure(self, test, err)
 
260
        test.setKeepLogfile()
 
261
        self.extractBenchmarkTime(test)
 
262
        if self.showAll:
 
263
            self.stream.writeln(" FAIL %s" % self._testTimeString())
 
264
        elif self.dots and self.pb is None:
 
265
            self.stream.write('F')
 
266
        elif self.dots:
 
267
            self.pb.update(self._ellipsise_unimportant_words('FAIL', 13), self.testsRun, None)
 
268
            self.pb.note(self._ellipsise_unimportant_words(
 
269
                            test.id() + ': FAIL',
 
270
                            osutils.terminal_width()))
 
271
        self.stream.flush()
 
272
        if self.stop_early:
 
273
            self.stop()
 
274
 
 
275
    def addSuccess(self, test):
 
276
        self.extractBenchmarkTime(test)
 
277
        if self._bench_history is not None:
 
278
            if self._benchmarkTime is not None:
 
279
                self._bench_history.write("%s %s\n" % (
 
280
                    self._formatTime(self._benchmarkTime),
 
281
                    test.id()))
 
282
        if self.showAll:
 
283
            self.stream.writeln('   OK %s' % self._testTimeString())
 
284
            for bench_called, stats in getattr(test, '_benchcalls', []):
 
285
                self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
286
                stats.pprint(file=self.stream)
 
287
        elif self.dots and self.pb is None:
 
288
            self.stream.write('~')
 
289
        elif self.dots:
 
290
            self.pb.update(self._ellipsise_unimportant_words('OK', 13), self.testsRun, None)
 
291
        self.stream.flush()
 
292
        unittest.TestResult.addSuccess(self, test)
 
293
 
 
294
    def addSkipped(self, test, skip_excinfo):
 
295
        self.extractBenchmarkTime(test)
 
296
        if self.showAll:
 
297
            print >>self.stream, ' SKIP %s' % self._testTimeString()
 
298
            print >>self.stream, '     %s' % skip_excinfo[1]
 
299
        elif self.dots and self.pb is None:
 
300
            self.stream.write('S')
 
301
        elif self.dots:
 
302
            self.pb.update(self._ellipsise_unimportant_words('SKIP', 13), self.testsRun, None)
 
303
        self.stream.flush()
 
304
        # seems best to treat this as success from point-of-view of unittest
 
305
        # -- it actually does nothing so it barely matters :)
 
306
        try:
 
307
            test.tearDown()
 
308
        except KeyboardInterrupt:
 
309
            raise
 
310
        except:
 
311
            self.addError(test, test.__exc_info())
 
312
        else:
 
313
            unittest.TestResult.addSuccess(self, test)
 
314
 
 
315
    def printErrorList(self, flavour, errors):
 
316
        for test, err in errors:
 
317
            self.stream.writeln(self.separator1)
 
318
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
 
319
            if getattr(test, '_get_log', None) is not None:
 
320
                print >>self.stream
 
321
                print >>self.stream, \
 
322
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
323
                print >>self.stream, test._get_log()
 
324
                print >>self.stream, \
 
325
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
326
            self.stream.writeln(self.separator2)
 
327
            self.stream.writeln("%s" % err)
 
328
 
 
329
 
 
330
class TextTestRunner(object):
 
331
    stop_on_failure = False
 
332
 
 
333
    def __init__(self,
 
334
                 stream=sys.stderr,
 
335
                 descriptions=0,
 
336
                 verbosity=1,
 
337
                 keep_output=False,
 
338
                 pb=None,
 
339
                 bench_history=None):
 
340
        self.stream = unittest._WritelnDecorator(stream)
 
341
        self.descriptions = descriptions
 
342
        self.verbosity = verbosity
 
343
        self.keep_output = keep_output
 
344
        self.pb = pb
 
345
        self._bench_history = bench_history
 
346
 
 
347
    def _makeResult(self):
 
348
        result = _MyResult(self.stream,
 
349
                           self.descriptions,
 
350
                           self.verbosity,
 
351
                           pb=self.pb,
 
352
                           bench_history=self._bench_history)
 
353
        result.stop_early = self.stop_on_failure
 
354
        return result
 
355
 
 
356
    def run(self, test):
 
357
        "Run the given test case or test suite."
 
358
        result = self._makeResult()
 
359
        startTime = time.time()
 
360
        if self.pb is not None:
 
361
            self.pb.update('Running tests', 0, test.countTestCases())
 
362
        test.run(result)
 
363
        stopTime = time.time()
 
364
        timeTaken = stopTime - startTime
 
365
        result.printErrors()
 
366
        self.stream.writeln(result.separator2)
 
367
        run = result.testsRun
 
368
        self.stream.writeln("Ran %d test%s in %.3fs" %
 
369
                            (run, run != 1 and "s" or "", timeTaken))
 
370
        self.stream.writeln()
 
371
        if not result.wasSuccessful():
 
372
            self.stream.write("FAILED (")
 
373
            failed, errored = map(len, (result.failures, result.errors))
 
374
            if failed:
 
375
                self.stream.write("failures=%d" % failed)
 
376
            if errored:
 
377
                if failed: self.stream.write(", ")
 
378
                self.stream.write("errors=%d" % errored)
 
379
            self.stream.writeln(")")
 
380
        else:
 
381
            self.stream.writeln("OK")
 
382
        if self.pb is not None:
 
383
            self.pb.update('Cleaning up', 0, 1)
 
384
        # This is still a little bogus, 
 
385
        # but only a little. Folk not using our testrunner will
 
386
        # have to delete their temp directories themselves.
 
387
        test_root = TestCaseInTempDir.TEST_ROOT
 
388
        if result.wasSuccessful() or not self.keep_output:
 
389
            if test_root is not None:
 
390
                # If LANG=C we probably have created some bogus paths
 
391
                # which rmtree(unicode) will fail to delete
 
392
                # so make sure we are using rmtree(str) to delete everything
 
393
                # except on win32, where rmtree(str) will fail
 
394
                # since it doesn't have the property of byte-stream paths
 
395
                # (they are either ascii or mbcs)
 
396
                if sys.platform == 'win32':
 
397
                    # make sure we are using the unicode win32 api
 
398
                    test_root = unicode(test_root)
 
399
                else:
 
400
                    test_root = test_root.encode(
 
401
                        sys.getfilesystemencoding())
 
402
                osutils.rmtree(test_root)
 
403
        else:
 
404
            if self.pb is not None:
 
405
                self.pb.note("Failed tests working directories are in '%s'\n",
 
406
                             test_root)
 
407
            else:
 
408
                self.stream.writeln(
 
409
                    "Failed tests working directories are in '%s'\n" %
 
410
                    test_root)
 
411
        TestCaseInTempDir.TEST_ROOT = None
 
412
        if self.pb is not None:
 
413
            self.pb.clear()
 
414
        return result
 
415
 
 
416
 
 
417
def iter_suite_tests(suite):
 
418
    """Return all tests in a suite, recursing through nested suites"""
 
419
    for item in suite._tests:
 
420
        if isinstance(item, unittest.TestCase):
 
421
            yield item
 
422
        elif isinstance(item, unittest.TestSuite):
 
423
            for r in iter_suite_tests(item):
 
424
                yield r
 
425
        else:
 
426
            raise Exception('unknown object %r inside test suite %r'
 
427
                            % (item, suite))
 
428
 
 
429
 
 
430
class TestSkipped(Exception):
 
431
    """Indicates that a test was intentionally skipped, rather than failing."""
 
432
 
 
433
 
 
434
class CommandFailed(Exception):
 
435
    pass
 
436
 
 
437
 
 
438
class StringIOWrapper(object):
 
439
    """A wrapper around cStringIO which just adds an encoding attribute.
 
440
    
 
441
    Internally we can check sys.stdout to see what the output encoding
 
442
    should be. However, cStringIO has no encoding attribute that we can
 
443
    set. So we wrap it instead.
 
444
    """
 
445
    encoding='ascii'
 
446
    _cstring = None
 
447
 
 
448
    def __init__(self, s=None):
 
449
        if s is not None:
 
450
            self.__dict__['_cstring'] = StringIO(s)
 
451
        else:
 
452
            self.__dict__['_cstring'] = StringIO()
 
453
 
 
454
    def __getattr__(self, name, getattr=getattr):
 
455
        return getattr(self.__dict__['_cstring'], name)
 
456
 
 
457
    def __setattr__(self, name, val):
 
458
        if name == 'encoding':
 
459
            self.__dict__['encoding'] = val
 
460
        else:
 
461
            return setattr(self._cstring, name, val)
 
462
 
 
463
 
 
464
class TestCase(unittest.TestCase):
 
465
    """Base class for bzr unit tests.
 
466
    
 
467
    Tests that need access to disk resources should subclass 
 
468
    TestCaseInTempDir not TestCase.
 
469
 
 
470
    Error and debug log messages are redirected from their usual
 
471
    location into a temporary file, the contents of which can be
 
472
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
473
    so that it can also capture file IO.  When the test completes this file
 
474
    is read into memory and removed from disk.
 
475
       
 
476
    There are also convenience functions to invoke bzr's command-line
 
477
    routine, and to build and check bzr trees.
 
478
   
 
479
    In addition to the usual method of overriding tearDown(), this class also
 
480
    allows subclasses to register functions into the _cleanups list, which is
 
481
    run in order as the object is torn down.  It's less likely this will be
 
482
    accidentally overlooked.
 
483
    """
 
484
 
 
485
    _log_file_name = None
 
486
    _log_contents = ''
 
487
    _keep_log_file = False
 
488
    # record lsprof data when performing benchmark calls.
 
489
    _gather_lsprof_in_benchmarks = False
 
490
 
 
491
    def __init__(self, methodName='testMethod'):
 
492
        super(TestCase, self).__init__(methodName)
 
493
        self._cleanups = []
 
494
 
 
495
    def setUp(self):
 
496
        unittest.TestCase.setUp(self)
 
497
        self._cleanEnvironment()
 
498
        bzrlib.trace.disable_default_logging()
 
499
        self._startLogFile()
 
500
        self._benchcalls = []
 
501
        self._benchtime = None
 
502
 
 
503
    def _ndiff_strings(self, a, b):
 
504
        """Return ndiff between two strings containing lines.
 
505
        
 
506
        A trailing newline is added if missing to make the strings
 
507
        print properly."""
 
508
        if b and b[-1] != '\n':
 
509
            b += '\n'
 
510
        if a and a[-1] != '\n':
 
511
            a += '\n'
 
512
        difflines = difflib.ndiff(a.splitlines(True),
 
513
                                  b.splitlines(True),
 
514
                                  linejunk=lambda x: False,
 
515
                                  charjunk=lambda x: False)
 
516
        return ''.join(difflines)
 
517
 
 
518
    def assertEqualDiff(self, a, b, message=None):
 
519
        """Assert two texts are equal, if not raise an exception.
 
520
        
 
521
        This is intended for use with multi-line strings where it can 
 
522
        be hard to find the differences by eye.
 
523
        """
 
524
        # TODO: perhaps override assertEquals to call this for strings?
 
525
        if a == b:
 
526
            return
 
527
        if message is None:
 
528
            message = "texts not equal:\n"
 
529
        raise AssertionError(message + 
 
530
                             self._ndiff_strings(a, b))      
 
531
        
 
532
    def assertEqualMode(self, mode, mode_test):
 
533
        self.assertEqual(mode, mode_test,
 
534
                         'mode mismatch %o != %o' % (mode, mode_test))
 
535
 
 
536
    def assertStartsWith(self, s, prefix):
 
537
        if not s.startswith(prefix):
 
538
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
539
 
 
540
    def assertEndsWith(self, s, suffix):
 
541
        """Asserts that s ends with suffix."""
 
542
        if not s.endswith(suffix):
 
543
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
544
 
 
545
    def assertContainsRe(self, haystack, needle_re):
 
546
        """Assert that a contains something matching a regular expression."""
 
547
        if not re.search(needle_re, haystack):
 
548
            raise AssertionError('pattern "%s" not found in "%s"'
 
549
                    % (needle_re, haystack))
 
550
 
 
551
    def assertNotContainsRe(self, haystack, needle_re):
 
552
        """Assert that a does not match a regular expression"""
 
553
        if re.search(needle_re, haystack):
 
554
            raise AssertionError('pattern "%s" found in "%s"'
 
555
                    % (needle_re, haystack))
 
556
 
 
557
    def assertSubset(self, sublist, superlist):
 
558
        """Assert that every entry in sublist is present in superlist."""
 
559
        missing = []
 
560
        for entry in sublist:
 
561
            if entry not in superlist:
 
562
                missing.append(entry)
 
563
        if len(missing) > 0:
 
564
            raise AssertionError("value(s) %r not present in container %r" % 
 
565
                                 (missing, superlist))
 
566
 
 
567
    def assertIs(self, left, right):
 
568
        if not (left is right):
 
569
            raise AssertionError("%r is not %r." % (left, right))
 
570
 
 
571
    def assertTransportMode(self, transport, path, mode):
 
572
        """Fail if a path does not have mode mode.
 
573
        
 
574
        If modes are not supported on this transport, the assertion is ignored.
 
575
        """
 
576
        if not transport._can_roundtrip_unix_modebits():
 
577
            return
 
578
        path_stat = transport.stat(path)
 
579
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
580
        self.assertEqual(mode, actual_mode,
 
581
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
582
 
 
583
    def assertIsInstance(self, obj, kls):
 
584
        """Fail if obj is not an instance of kls"""
 
585
        if not isinstance(obj, kls):
 
586
            self.fail("%r is an instance of %s rather than %s" % (
 
587
                obj, obj.__class__, kls))
 
588
 
 
589
    def _capture_warnings(self, a_callable, *args, **kwargs):
 
590
        """A helper for callDeprecated and applyDeprecated.
 
591
 
 
592
        :param a_callable: A callable to call.
 
593
        :param args: The positional arguments for the callable
 
594
        :param kwargs: The keyword arguments for the callable
 
595
        :return: A tuple (warnings, result). result is the result of calling
 
596
            a_callable(*args, **kwargs).
 
597
        """
 
598
        local_warnings = []
 
599
        def capture_warnings(msg, cls, stacklevel=None):
 
600
            # we've hooked into a deprecation specific callpath,
 
601
            # only deprecations should getting sent via it.
 
602
            self.assertEqual(cls, DeprecationWarning)
 
603
            local_warnings.append(msg)
 
604
        original_warning_method = symbol_versioning.warn
 
605
        symbol_versioning.set_warning_method(capture_warnings)
 
606
        try:
 
607
            result = a_callable(*args, **kwargs)
 
608
        finally:
 
609
            symbol_versioning.set_warning_method(original_warning_method)
 
610
        return (local_warnings, result)
 
611
 
 
612
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
613
        """Call a deprecated callable without warning the user.
 
614
 
 
615
        :param deprecation_format: The deprecation format that the callable
 
616
            should have been deprecated with. This is the same type as the 
 
617
            parameter to deprecated_method/deprecated_function. If the 
 
618
            callable is not deprecated with this format, an assertion error
 
619
            will be raised.
 
620
        :param a_callable: A callable to call. This may be a bound method or
 
621
            a regular function. It will be called with *args and **kwargs.
 
622
        :param args: The positional arguments for the callable
 
623
        :param kwargs: The keyword arguments for the callable
 
624
        :return: The result of a_callable(*args, **kwargs)
 
625
        """
 
626
        call_warnings, result = self._capture_warnings(a_callable,
 
627
            *args, **kwargs)
 
628
        expected_first_warning = symbol_versioning.deprecation_string(
 
629
            a_callable, deprecation_format)
 
630
        if len(call_warnings) == 0:
 
631
            self.fail("No assertion generated by call to %s" %
 
632
                a_callable)
 
633
        self.assertEqual(expected_first_warning, call_warnings[0])
 
634
        return result
 
635
 
 
636
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
637
        """Assert that a callable is deprecated in a particular way.
 
638
 
 
639
        This is a very precise test for unusual requirements. The 
 
640
        applyDeprecated helper function is probably more suited for most tests
 
641
        as it allows you to simply specify the deprecation format being used
 
642
        and will ensure that that is issued for the function being called.
 
643
 
 
644
        :param expected: a list of the deprecation warnings expected, in order
 
645
        :param callable: The callable to call
 
646
        :param args: The positional arguments for the callable
 
647
        :param kwargs: The keyword arguments for the callable
 
648
        """
 
649
        call_warnings, result = self._capture_warnings(callable,
 
650
            *args, **kwargs)
 
651
        self.assertEqual(expected, call_warnings)
 
652
        return result
 
653
 
 
654
    def _startLogFile(self):
 
655
        """Send bzr and test log messages to a temporary file.
 
656
 
 
657
        The file is removed as the test is torn down.
 
658
        """
 
659
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
660
        self._log_file = os.fdopen(fileno, 'w+')
 
661
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
662
        self._log_file_name = name
 
663
        self.addCleanup(self._finishLogFile)
 
664
 
 
665
    def _finishLogFile(self):
 
666
        """Finished with the log file.
 
667
 
 
668
        Close the file and delete it, unless setKeepLogfile was called.
 
669
        """
 
670
        if self._log_file is None:
 
671
            return
 
672
        bzrlib.trace.disable_test_log(self._log_nonce)
 
673
        self._log_file.close()
 
674
        self._log_file = None
 
675
        if not self._keep_log_file:
 
676
            os.remove(self._log_file_name)
 
677
            self._log_file_name = None
 
678
 
 
679
    def setKeepLogfile(self):
 
680
        """Make the logfile not be deleted when _finishLogFile is called."""
 
681
        self._keep_log_file = True
 
682
 
 
683
    def addCleanup(self, callable):
 
684
        """Arrange to run a callable when this case is torn down.
 
685
 
 
686
        Callables are run in the reverse of the order they are registered, 
 
687
        ie last-in first-out.
 
688
        """
 
689
        if callable in self._cleanups:
 
690
            raise ValueError("cleanup function %r already registered on %s" 
 
691
                    % (callable, self))
 
692
        self._cleanups.append(callable)
 
693
 
 
694
    def _cleanEnvironment(self):
 
695
        new_env = {
 
696
            'HOME': os.getcwd(),
 
697
            'APPDATA': os.getcwd(),
 
698
            'BZR_EMAIL': None,
 
699
            'BZREMAIL': None, # may still be present in the environment
 
700
            'EMAIL': None,
 
701
            'BZR_PROGRESS_BAR': None,
 
702
        }
 
703
        self.__old_env = {}
 
704
        self.addCleanup(self._restoreEnvironment)
 
705
        for name, value in new_env.iteritems():
 
706
            self._captureVar(name, value)
 
707
 
 
708
    def _captureVar(self, name, newvalue):
 
709
        """Set an environment variable, and reset it when finished."""
 
710
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
711
 
 
712
    def _restoreEnvironment(self):
 
713
        for name, value in self.__old_env.iteritems():
 
714
            osutils.set_or_unset_env(name, value)
 
715
 
 
716
    def tearDown(self):
 
717
        self._runCleanups()
 
718
        unittest.TestCase.tearDown(self)
 
719
 
 
720
    def time(self, callable, *args, **kwargs):
 
721
        """Run callable and accrue the time it takes to the benchmark time.
 
722
        
 
723
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
724
        this will cause lsprofile statistics to be gathered and stored in
 
725
        self._benchcalls.
 
726
        """
 
727
        if self._benchtime is None:
 
728
            self._benchtime = 0
 
729
        start = time.time()
 
730
        try:
 
731
            if not self._gather_lsprof_in_benchmarks:
 
732
                return callable(*args, **kwargs)
 
733
            else:
 
734
                # record this benchmark
 
735
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
736
                stats.sort()
 
737
                self._benchcalls.append(((callable, args, kwargs), stats))
 
738
                return ret
 
739
        finally:
 
740
            self._benchtime += time.time() - start
 
741
 
 
742
    def _runCleanups(self):
 
743
        """Run registered cleanup functions. 
 
744
 
 
745
        This should only be called from TestCase.tearDown.
 
746
        """
 
747
        # TODO: Perhaps this should keep running cleanups even if 
 
748
        # one of them fails?
 
749
        for cleanup_fn in reversed(self._cleanups):
 
750
            cleanup_fn()
 
751
 
 
752
    def log(self, *args):
 
753
        mutter(*args)
 
754
 
 
755
    def _get_log(self, keep_log_file=False):
 
756
        """Return as a string the log for this test. If the file is still
 
757
        on disk and keep_log_file=False, delete the log file and store the
 
758
        content in self._log_contents."""
 
759
        # flush the log file, to get all content
 
760
        import bzrlib.trace
 
761
        bzrlib.trace._trace_file.flush()
 
762
        if self._log_contents:
 
763
            return self._log_contents
 
764
        if self._log_file_name is not None:
 
765
            logfile = open(self._log_file_name)
 
766
            try:
 
767
                log_contents = logfile.read()
 
768
            finally:
 
769
                logfile.close()
 
770
            if not keep_log_file:
 
771
                self._log_contents = log_contents
 
772
                os.remove(self._log_file_name)
 
773
            return log_contents
 
774
        else:
 
775
            return "DELETED log file to reduce memory footprint"
 
776
 
 
777
    def capture(self, cmd, retcode=0):
 
778
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
779
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
780
 
 
781
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None):
 
782
        """Invoke bzr and return (stdout, stderr).
 
783
 
 
784
        Useful for code that wants to check the contents of the
 
785
        output, the way error messages are presented, etc.
 
786
 
 
787
        This should be the main method for tests that want to exercise the
 
788
        overall behavior of the bzr application (rather than a unit test
 
789
        or a functional test of the library.)
 
790
 
 
791
        Much of the old code runs bzr by forking a new copy of Python, but
 
792
        that is slower, harder to debug, and generally not necessary.
 
793
 
 
794
        This runs bzr through the interface that catches and reports
 
795
        errors, and with logging set to something approximating the
 
796
        default, so that error reporting can be checked.
 
797
 
 
798
        :param argv: arguments to invoke bzr
 
799
        :param retcode: expected return code, or None for don't-care.
 
800
        :param encoding: encoding for sys.stdout and sys.stderr
 
801
        :param stdin: A string to be used as stdin for the command.
 
802
        """
 
803
        if encoding is None:
 
804
            encoding = bzrlib.user_encoding
 
805
        if stdin is not None:
 
806
            stdin = StringIO(stdin)
 
807
        stdout = StringIOWrapper()
 
808
        stderr = StringIOWrapper()
 
809
        stdout.encoding = encoding
 
810
        stderr.encoding = encoding
 
811
 
 
812
        self.log('run bzr: %r', argv)
 
813
        # FIXME: don't call into logging here
 
814
        handler = logging.StreamHandler(stderr)
 
815
        handler.setLevel(logging.INFO)
 
816
        logger = logging.getLogger('')
 
817
        logger.addHandler(handler)
 
818
        old_ui_factory = bzrlib.ui.ui_factory
 
819
        bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
 
820
            stdout=stdout,
 
821
            stderr=stderr)
 
822
        bzrlib.ui.ui_factory.stdin = stdin
 
823
        try:
 
824
            result = self.apply_redirected(stdin, stdout, stderr,
 
825
                                           bzrlib.commands.run_bzr_catch_errors,
 
826
                                           argv)
 
827
        finally:
 
828
            logger.removeHandler(handler)
 
829
            bzrlib.ui.ui_factory = old_ui_factory
 
830
 
 
831
        out = stdout.getvalue()
 
832
        err = stderr.getvalue()
 
833
        if out:
 
834
            self.log('output:\n%r', out)
 
835
        if err:
 
836
            self.log('errors:\n%r', err)
 
837
        if retcode is not None:
 
838
            self.assertEquals(retcode, result)
 
839
        return out, err
 
840
 
 
841
    def run_bzr(self, *args, **kwargs):
 
842
        """Invoke bzr, as if it were run from the command line.
 
843
 
 
844
        This should be the main method for tests that want to exercise the
 
845
        overall behavior of the bzr application (rather than a unit test
 
846
        or a functional test of the library.)
 
847
 
 
848
        This sends the stdout/stderr results into the test's log,
 
849
        where it may be useful for debugging.  See also run_captured.
 
850
 
 
851
        :param stdin: A string to be used as stdin for the command.
 
852
        """
 
853
        retcode = kwargs.pop('retcode', 0)
 
854
        encoding = kwargs.pop('encoding', None)
 
855
        stdin = kwargs.pop('stdin', None)
 
856
        return self.run_bzr_captured(args, retcode=retcode, encoding=encoding, stdin=stdin)
 
857
 
 
858
    def run_bzr_decode(self, *args, **kwargs):
 
859
        if 'encoding' in kwargs:
 
860
            encoding = kwargs['encoding']
 
861
        else:
 
862
            encoding = bzrlib.user_encoding
 
863
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
 
864
 
 
865
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
866
        """Run bzr, and check that stderr contains the supplied regexes
 
867
        
 
868
        :param error_regexes: Sequence of regular expressions which 
 
869
            must each be found in the error output. The relative ordering
 
870
            is not enforced.
 
871
        :param args: command-line arguments for bzr
 
872
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
873
            This function changes the default value of retcode to be 3,
 
874
            since in most cases this is run when you expect bzr to fail.
 
875
        :return: (out, err) The actual output of running the command (in case you
 
876
                 want to do more inspection)
 
877
 
 
878
        Examples of use:
 
879
            # Make sure that commit is failing because there is nothing to do
 
880
            self.run_bzr_error(['no changes to commit'],
 
881
                               'commit', '-m', 'my commit comment')
 
882
            # Make sure --strict is handling an unknown file, rather than
 
883
            # giving us the 'nothing to do' error
 
884
            self.build_tree(['unknown'])
 
885
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
886
                               'commit', '--strict', '-m', 'my commit comment')
 
887
        """
 
888
        kwargs.setdefault('retcode', 3)
 
889
        out, err = self.run_bzr(*args, **kwargs)
 
890
        for regex in error_regexes:
 
891
            self.assertContainsRe(err, regex)
 
892
        return out, err
 
893
 
 
894
    def run_bzr_subprocess(self, *args, **kwargs):
 
895
        """Run bzr in a subprocess for testing.
 
896
 
 
897
        This starts a new Python interpreter and runs bzr in there. 
 
898
        This should only be used for tests that have a justifiable need for
 
899
        this isolation: e.g. they are testing startup time, or signal
 
900
        handling, or early startup code, etc.  Subprocess code can't be 
 
901
        profiled or debugged so easily.
 
902
 
 
903
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
904
            None is supplied, the status code is not checked.
 
905
        :param env_changes: A dictionary which lists changes to environment
 
906
            variables. A value of None will unset the env variable.
 
907
            The values must be strings. The change will only occur in the
 
908
            child, so you don't need to fix the environment after running.
 
909
        :param universal_newlines: Convert CRLF => LF
 
910
        """
 
911
        env_changes = kwargs.get('env_changes', {})
 
912
        process = self.start_bzr_subprocess(args, env_changes=env_changes)
 
913
        # We distinguish between retcode=None and retcode not passed.
 
914
        supplied_retcode = kwargs.get('retcode', 0)
 
915
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
916
            universal_newlines=kwargs.get('universal_newlines', False),
 
917
            process_args=args)
 
918
 
 
919
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
920
                             skip_if_plan_to_signal=False):
 
921
        """Start bzr in a subprocess for testing.
 
922
 
 
923
        This starts a new Python interpreter and runs bzr in there.
 
924
        This should only be used for tests that have a justifiable need for
 
925
        this isolation: e.g. they are testing startup time, or signal
 
926
        handling, or early startup code, etc.  Subprocess code can't be
 
927
        profiled or debugged so easily.
 
928
 
 
929
        :param process_args: a list of arguments to pass to the bzr executable,
 
930
            for example `['--version']`.
 
931
        :param env_changes: A dictionary which lists changes to environment
 
932
            variables. A value of None will unset the env variable.
 
933
            The values must be strings. The change will only occur in the
 
934
            child, so you don't need to fix the environment after running.
 
935
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
936
            is not available.
 
937
 
 
938
        :returns: Popen object for the started process.
 
939
        """
 
940
        if skip_if_plan_to_signal:
 
941
            if not getattr(os, 'kill', None):
 
942
                raise TestSkipped("os.kill not available.")
 
943
 
 
944
        if env_changes is None:
 
945
            env_changes = {}
 
946
        old_env = {}
 
947
 
 
948
        def cleanup_environment():
 
949
            for env_var, value in env_changes.iteritems():
 
950
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
951
 
 
952
        def restore_environment():
 
953
            for env_var, value in old_env.iteritems():
 
954
                osutils.set_or_unset_env(env_var, value)
 
955
 
 
956
        bzr_path = self.get_bzr_path()
 
957
 
 
958
        try:
 
959
            # win32 subprocess doesn't support preexec_fn
 
960
            # so we will avoid using it on all platforms, just to
 
961
            # make sure the code path is used, and we don't break on win32
 
962
            cleanup_environment()
 
963
            process = Popen([sys.executable, bzr_path] + list(process_args),
 
964
                             stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
965
        finally:
 
966
            restore_environment()
 
967
        return process
 
968
 
 
969
    def get_bzr_path(self):
 
970
        """Return the path of the 'bzr' executable for this test suite."""
 
971
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
972
        if not os.path.isfile(bzr_path):
 
973
            # We are probably installed. Assume sys.argv is the right file
 
974
            bzr_path = sys.argv[0]
 
975
        return bzr_path
 
976
 
 
977
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
978
                              universal_newlines=False, process_args=None):
 
979
        """Finish the execution of process.
 
980
 
 
981
        :param process: the Popen object returned from start_bzr_subprocess.
 
982
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
983
            None is supplied, the status code is not checked.
 
984
        :param send_signal: an optional signal to send to the process.
 
985
        :param universal_newlines: Convert CRLF => LF
 
986
        :returns: (stdout, stderr)
 
987
        """
 
988
        if send_signal is not None:
 
989
            os.kill(process.pid, send_signal)
 
990
        out, err = process.communicate()
 
991
 
 
992
        if universal_newlines:
 
993
            out = out.replace('\r\n', '\n')
 
994
            err = err.replace('\r\n', '\n')
 
995
 
 
996
        if retcode is not None and retcode != process.returncode:
 
997
            if process_args is None:
 
998
                process_args = "(unknown args)"
 
999
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1000
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1001
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1002
                      % (process_args, retcode, process.returncode))
 
1003
        return [out, err]
 
1004
 
 
1005
    def check_inventory_shape(self, inv, shape):
 
1006
        """Compare an inventory to a list of expected names.
 
1007
 
 
1008
        Fail if they are not precisely equal.
 
1009
        """
 
1010
        extras = []
 
1011
        shape = list(shape)             # copy
 
1012
        for path, ie in inv.entries():
 
1013
            name = path.replace('\\', '/')
 
1014
            if ie.kind == 'dir':
 
1015
                name = name + '/'
 
1016
            if name in shape:
 
1017
                shape.remove(name)
 
1018
            else:
 
1019
                extras.append(name)
 
1020
        if shape:
 
1021
            self.fail("expected paths not found in inventory: %r" % shape)
 
1022
        if extras:
 
1023
            self.fail("unexpected paths found in inventory: %r" % extras)
 
1024
 
 
1025
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1026
                         a_callable=None, *args, **kwargs):
 
1027
        """Call callable with redirected std io pipes.
 
1028
 
 
1029
        Returns the return code."""
 
1030
        if not callable(a_callable):
 
1031
            raise ValueError("a_callable must be callable.")
 
1032
        if stdin is None:
 
1033
            stdin = StringIO("")
 
1034
        if stdout is None:
 
1035
            if getattr(self, "_log_file", None) is not None:
 
1036
                stdout = self._log_file
 
1037
            else:
 
1038
                stdout = StringIO()
 
1039
        if stderr is None:
 
1040
            if getattr(self, "_log_file", None is not None):
 
1041
                stderr = self._log_file
 
1042
            else:
 
1043
                stderr = StringIO()
 
1044
        real_stdin = sys.stdin
 
1045
        real_stdout = sys.stdout
 
1046
        real_stderr = sys.stderr
 
1047
        try:
 
1048
            sys.stdout = stdout
 
1049
            sys.stderr = stderr
 
1050
            sys.stdin = stdin
 
1051
            return a_callable(*args, **kwargs)
 
1052
        finally:
 
1053
            sys.stdout = real_stdout
 
1054
            sys.stderr = real_stderr
 
1055
            sys.stdin = real_stdin
 
1056
 
 
1057
    @symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
 
1058
    def merge(self, branch_from, wt_to):
 
1059
        """A helper for tests to do a ui-less merge.
 
1060
 
 
1061
        This should move to the main library when someone has time to integrate
 
1062
        it in.
 
1063
        """
 
1064
        # minimal ui-less merge.
 
1065
        wt_to.branch.fetch(branch_from)
 
1066
        base_rev = common_ancestor(branch_from.last_revision(),
 
1067
                                   wt_to.branch.last_revision(),
 
1068
                                   wt_to.branch.repository)
 
1069
        merge_inner(wt_to.branch, branch_from.basis_tree(),
 
1070
                    wt_to.branch.repository.revision_tree(base_rev),
 
1071
                    this_tree=wt_to)
 
1072
        wt_to.add_parent_tree_id(branch_from.last_revision())
 
1073
 
 
1074
 
 
1075
BzrTestBase = TestCase
 
1076
 
 
1077
     
 
1078
class TestCaseInTempDir(TestCase):
 
1079
    """Derived class that runs a test within a temporary directory.
 
1080
 
 
1081
    This is useful for tests that need to create a branch, etc.
 
1082
 
 
1083
    The directory is created in a slightly complex way: for each
 
1084
    Python invocation, a new temporary top-level directory is created.
 
1085
    All test cases create their own directory within that.  If the
 
1086
    tests complete successfully, the directory is removed.
 
1087
 
 
1088
    InTempDir is an old alias for FunctionalTestCase.
 
1089
    """
 
1090
 
 
1091
    TEST_ROOT = None
 
1092
    _TEST_NAME = 'test'
 
1093
    OVERRIDE_PYTHON = 'python'
 
1094
 
 
1095
    def check_file_contents(self, filename, expect):
 
1096
        self.log("check contents of file %s" % filename)
 
1097
        contents = file(filename, 'r').read()
 
1098
        if contents != expect:
 
1099
            self.log("expected: %r" % expect)
 
1100
            self.log("actually: %r" % contents)
 
1101
            self.fail("contents of %s not as expected" % filename)
 
1102
 
 
1103
    def _make_test_root(self):
 
1104
        if TestCaseInTempDir.TEST_ROOT is not None:
 
1105
            return
 
1106
        i = 0
 
1107
        while True:
 
1108
            root = u'test%04d.tmp' % i
 
1109
            try:
 
1110
                os.mkdir(root)
 
1111
            except OSError, e:
 
1112
                if e.errno == errno.EEXIST:
 
1113
                    i += 1
 
1114
                    continue
 
1115
                else:
 
1116
                    raise
 
1117
            # successfully created
 
1118
            TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
 
1119
            break
 
1120
        # make a fake bzr directory there to prevent any tests propagating
 
1121
        # up onto the source directory's real branch
 
1122
        bzrdir.BzrDir.create_standalone_workingtree(TestCaseInTempDir.TEST_ROOT)
 
1123
 
 
1124
    def setUp(self):
 
1125
        super(TestCaseInTempDir, self).setUp()
 
1126
        self._make_test_root()
 
1127
        _currentdir = os.getcwdu()
 
1128
        # shorten the name, to avoid test failures due to path length
 
1129
        short_id = self.id().replace('bzrlib.tests.', '') \
 
1130
                   .replace('__main__.', '')[-100:]
 
1131
        # it's possible the same test class is run several times for
 
1132
        # parameterized tests, so make sure the names don't collide.  
 
1133
        i = 0
 
1134
        while True:
 
1135
            if i > 0:
 
1136
                candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
 
1137
            else:
 
1138
                candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
 
1139
            if os.path.exists(candidate_dir):
 
1140
                i = i + 1
 
1141
                continue
 
1142
            else:
 
1143
                os.mkdir(candidate_dir)
 
1144
                self.test_home_dir = candidate_dir + '/home'
 
1145
                os.mkdir(self.test_home_dir)
 
1146
                self.test_dir = candidate_dir + '/work'
 
1147
                os.mkdir(self.test_dir)
 
1148
                os.chdir(self.test_dir)
 
1149
                break
 
1150
        os.environ['HOME'] = self.test_home_dir
 
1151
        os.environ['APPDATA'] = self.test_home_dir
 
1152
        def _leaveDirectory():
 
1153
            os.chdir(_currentdir)
 
1154
        self.addCleanup(_leaveDirectory)
 
1155
        
 
1156
    def build_tree(self, shape, line_endings='native', transport=None):
 
1157
        """Build a test tree according to a pattern.
 
1158
 
 
1159
        shape is a sequence of file specifications.  If the final
 
1160
        character is '/', a directory is created.
 
1161
 
 
1162
        This assumes that all the elements in the tree being built are new.
 
1163
 
 
1164
        This doesn't add anything to a branch.
 
1165
        :param line_endings: Either 'binary' or 'native'
 
1166
                             in binary mode, exact contents are written
 
1167
                             in native mode, the line endings match the
 
1168
                             default platform endings.
 
1169
 
 
1170
        :param transport: A transport to write to, for building trees on 
 
1171
                          VFS's. If the transport is readonly or None,
 
1172
                          "." is opened automatically.
 
1173
        """
 
1174
        # It's OK to just create them using forward slashes on windows.
 
1175
        if transport is None or transport.is_readonly():
 
1176
            transport = get_transport(".")
 
1177
        for name in shape:
 
1178
            self.assert_(isinstance(name, basestring))
 
1179
            if name[-1] == '/':
 
1180
                transport.mkdir(urlutils.escape(name[:-1]))
 
1181
            else:
 
1182
                if line_endings == 'binary':
 
1183
                    end = '\n'
 
1184
                elif line_endings == 'native':
 
1185
                    end = os.linesep
 
1186
                else:
 
1187
                    raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
 
1188
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
1189
                # Technically 'put()' is the right command. However, put
 
1190
                # uses an AtomicFile, which requires an extra rename into place
 
1191
                # As long as the files didn't exist in the past, append() will
 
1192
                # do the same thing as put()
 
1193
                # On jam's machine, make_kernel_like_tree is:
 
1194
                #   put:    4.5-7.5s (averaging 6s)
 
1195
                #   append: 2.9-4.5s
 
1196
                #   put_non_atomic: 2.9-4.5s
 
1197
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
1198
 
 
1199
    def build_tree_contents(self, shape):
 
1200
        build_tree_contents(shape)
 
1201
 
 
1202
    def failUnlessExists(self, path):
 
1203
        """Fail unless path, which may be abs or relative, exists."""
 
1204
        self.failUnless(osutils.lexists(path))
 
1205
 
 
1206
    def failIfExists(self, path):
 
1207
        """Fail if path, which may be abs or relative, exists."""
 
1208
        self.failIf(osutils.lexists(path))
 
1209
        
 
1210
    def assertFileEqual(self, content, path):
 
1211
        """Fail if path does not contain 'content'."""
 
1212
        self.failUnless(osutils.lexists(path))
 
1213
        # TODO: jam 20060427 Shouldn't this be 'rb'?
 
1214
        self.assertEqualDiff(content, open(path, 'r').read())
 
1215
 
 
1216
 
 
1217
class TestCaseWithTransport(TestCaseInTempDir):
 
1218
    """A test case that provides get_url and get_readonly_url facilities.
 
1219
 
 
1220
    These back onto two transport servers, one for readonly access and one for
 
1221
    read write access.
 
1222
 
 
1223
    If no explicit class is provided for readonly access, a
 
1224
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
1225
    based read write transports.
 
1226
 
 
1227
    If an explicit class is provided for readonly access, that server and the 
 
1228
    readwrite one must both define get_url() as resolving to os.getcwd().
 
1229
    """
 
1230
 
 
1231
    def __init__(self, methodName='testMethod'):
 
1232
        super(TestCaseWithTransport, self).__init__(methodName)
 
1233
        self.__readonly_server = None
 
1234
        self.__server = None
 
1235
        self.transport_server = default_transport
 
1236
        self.transport_readonly_server = None
 
1237
 
 
1238
    def get_readonly_url(self, relpath=None):
 
1239
        """Get a URL for the readonly transport.
 
1240
 
 
1241
        This will either be backed by '.' or a decorator to the transport 
 
1242
        used by self.get_url()
 
1243
        relpath provides for clients to get a path relative to the base url.
 
1244
        These should only be downwards relative, not upwards.
 
1245
        """
 
1246
        base = self.get_readonly_server().get_url()
 
1247
        if relpath is not None:
 
1248
            if not base.endswith('/'):
 
1249
                base = base + '/'
 
1250
            base = base + relpath
 
1251
        return base
 
1252
 
 
1253
    def get_readonly_server(self):
 
1254
        """Get the server instance for the readonly transport
 
1255
 
 
1256
        This is useful for some tests with specific servers to do diagnostics.
 
1257
        """
 
1258
        if self.__readonly_server is None:
 
1259
            if self.transport_readonly_server is None:
 
1260
                # readonly decorator requested
 
1261
                # bring up the server
 
1262
                self.get_url()
 
1263
                self.__readonly_server = ReadonlyServer()
 
1264
                self.__readonly_server.setUp(self.__server)
 
1265
            else:
 
1266
                self.__readonly_server = self.transport_readonly_server()
 
1267
                self.__readonly_server.setUp()
 
1268
            self.addCleanup(self.__readonly_server.tearDown)
 
1269
        return self.__readonly_server
 
1270
 
 
1271
    def get_server(self):
 
1272
        """Get the read/write server instance.
 
1273
 
 
1274
        This is useful for some tests with specific servers that need
 
1275
        diagnostics.
 
1276
        """
 
1277
        if self.__server is None:
 
1278
            self.__server = self.transport_server()
 
1279
            self.__server.setUp()
 
1280
            self.addCleanup(self.__server.tearDown)
 
1281
        return self.__server
 
1282
 
 
1283
    def get_url(self, relpath=None):
 
1284
        """Get a URL (or maybe a path) for the readwrite transport.
 
1285
 
 
1286
        This will either be backed by '.' or to an equivalent non-file based
 
1287
        facility.
 
1288
        relpath provides for clients to get a path relative to the base url.
 
1289
        These should only be downwards relative, not upwards.
 
1290
        """
 
1291
        base = self.get_server().get_url()
 
1292
        if relpath is not None and relpath != '.':
 
1293
            if not base.endswith('/'):
 
1294
                base = base + '/'
 
1295
            # XXX: Really base should be a url; we did after all call
 
1296
            # get_url()!  But sometimes it's just a path (from
 
1297
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
1298
            # to a non-escaped local path.
 
1299
            if base.startswith('./') or base.startswith('/'):
 
1300
                base += relpath
 
1301
            else:
 
1302
                base += urlutils.escape(relpath)
 
1303
        return base
 
1304
 
 
1305
    def get_transport(self):
 
1306
        """Return a writeable transport for the test scratch space"""
 
1307
        t = get_transport(self.get_url())
 
1308
        self.assertFalse(t.is_readonly())
 
1309
        return t
 
1310
 
 
1311
    def get_readonly_transport(self):
 
1312
        """Return a readonly transport for the test scratch space
 
1313
        
 
1314
        This can be used to test that operations which should only need
 
1315
        readonly access in fact do not try to write.
 
1316
        """
 
1317
        t = get_transport(self.get_readonly_url())
 
1318
        self.assertTrue(t.is_readonly())
 
1319
        return t
 
1320
 
 
1321
    def make_branch(self, relpath, format=None):
 
1322
        """Create a branch on the transport at relpath."""
 
1323
        repo = self.make_repository(relpath, format=format)
 
1324
        return repo.bzrdir.create_branch()
 
1325
 
 
1326
    def make_bzrdir(self, relpath, format=None):
 
1327
        try:
 
1328
            # might be a relative or absolute path
 
1329
            maybe_a_url = self.get_url(relpath)
 
1330
            segments = maybe_a_url.rsplit('/', 1)
 
1331
            t = get_transport(maybe_a_url)
 
1332
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
1333
                try:
 
1334
                    t.mkdir('.')
 
1335
                except errors.FileExists:
 
1336
                    pass
 
1337
            if format is None:
 
1338
                format = bzrlib.bzrdir.BzrDirFormat.get_default_format()
 
1339
            return format.initialize_on_transport(t)
 
1340
        except errors.UninitializableFormat:
 
1341
            raise TestSkipped("Format %s is not initializable." % format)
 
1342
 
 
1343
    def make_repository(self, relpath, shared=False, format=None):
 
1344
        """Create a repository on our default transport at relpath."""
 
1345
        made_control = self.make_bzrdir(relpath, format=format)
 
1346
        return made_control.create_repository(shared=shared)
 
1347
 
 
1348
    def make_branch_and_memory_tree(self, relpath):
 
1349
        """Create a branch on the default transport and a MemoryTree for it."""
 
1350
        b = self.make_branch(relpath)
 
1351
        return memorytree.MemoryTree.create_on_branch(b)
 
1352
 
 
1353
    def make_branch_and_tree(self, relpath, format=None):
 
1354
        """Create a branch on the transport and a tree locally.
 
1355
 
 
1356
        If the transport is not a LocalTransport, the Tree can't be created on
 
1357
        the transport.  In that case the working tree is created in the local
 
1358
        directory, and the returned tree's branch and repository will also be
 
1359
        accessed locally.
 
1360
 
 
1361
        This will fail if the original default transport for this test
 
1362
        case wasn't backed by the working directory, as the branch won't
 
1363
        be on disk for us to open it.  
 
1364
 
 
1365
        :param format: The BzrDirFormat.
 
1366
        :returns: the WorkingTree.
 
1367
        """
 
1368
        # TODO: always use the local disk path for the working tree,
 
1369
        # this obviously requires a format that supports branch references
 
1370
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
1371
        # RBC 20060208
 
1372
        b = self.make_branch(relpath, format=format)
 
1373
        try:
 
1374
            return b.bzrdir.create_workingtree()
 
1375
        except errors.NotLocalUrl:
 
1376
            # We can only make working trees locally at the moment.  If the
 
1377
            # transport can't support them, then reopen the branch on a local
 
1378
            # transport, and create the working tree there.  
 
1379
            #
 
1380
            # Possibly we should instead keep
 
1381
            # the non-disk-backed branch and create a local checkout?
 
1382
            bd = bzrdir.BzrDir.open(relpath)
 
1383
            return bd.create_workingtree()
 
1384
 
 
1385
    def assertIsDirectory(self, relpath, transport):
 
1386
        """Assert that relpath within transport is a directory.
 
1387
 
 
1388
        This may not be possible on all transports; in that case it propagates
 
1389
        a TransportNotPossible.
 
1390
        """
 
1391
        try:
 
1392
            mode = transport.stat(relpath).st_mode
 
1393
        except errors.NoSuchFile:
 
1394
            self.fail("path %s is not a directory; no such file"
 
1395
                      % (relpath))
 
1396
        if not stat.S_ISDIR(mode):
 
1397
            self.fail("path %s is not a directory; has mode %#o"
 
1398
                      % (relpath, mode))
 
1399
 
 
1400
 
 
1401
class ChrootedTestCase(TestCaseWithTransport):
 
1402
    """A support class that provides readonly urls outside the local namespace.
 
1403
 
 
1404
    This is done by checking if self.transport_server is a MemoryServer. if it
 
1405
    is then we are chrooted already, if it is not then an HttpServer is used
 
1406
    for readonly urls.
 
1407
 
 
1408
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
1409
                       be used without needed to redo it when a different 
 
1410
                       subclass is in use ?
 
1411
    """
 
1412
 
 
1413
    def setUp(self):
 
1414
        super(ChrootedTestCase, self).setUp()
 
1415
        if not self.transport_server == bzrlib.transport.memory.MemoryServer:
 
1416
            self.transport_readonly_server = bzrlib.transport.http.HttpServer
 
1417
 
 
1418
 
 
1419
def filter_suite_by_re(suite, pattern):
 
1420
    result = TestUtil.TestSuite()
 
1421
    filter_re = re.compile(pattern)
 
1422
    for test in iter_suite_tests(suite):
 
1423
        if filter_re.search(test.id()):
 
1424
            result.addTest(test)
 
1425
    return result
 
1426
 
 
1427
 
 
1428
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
1429
              stop_on_failure=False, keep_output=False,
 
1430
              transport=None, lsprof_timed=None, bench_history=None):
 
1431
    TestCaseInTempDir._TEST_NAME = name
 
1432
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
1433
    if verbose:
 
1434
        verbosity = 2
 
1435
        pb = None
 
1436
    else:
 
1437
        verbosity = 1
 
1438
        pb = progress.ProgressBar()
 
1439
    runner = TextTestRunner(stream=sys.stdout,
 
1440
                            descriptions=0,
 
1441
                            verbosity=verbosity,
 
1442
                            keep_output=keep_output,
 
1443
                            pb=pb,
 
1444
                            bench_history=bench_history)
 
1445
    runner.stop_on_failure=stop_on_failure
 
1446
    if pattern != '.*':
 
1447
        suite = filter_suite_by_re(suite, pattern)
 
1448
    result = runner.run(suite)
 
1449
    return result.wasSuccessful()
 
1450
 
 
1451
 
 
1452
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
1453
             keep_output=False,
 
1454
             transport=None,
 
1455
             test_suite_factory=None,
 
1456
             lsprof_timed=None,
 
1457
             bench_history=None):
 
1458
    """Run the whole test suite under the enhanced runner"""
 
1459
    # XXX: Very ugly way to do this...
 
1460
    # Disable warning about old formats because we don't want it to disturb
 
1461
    # any blackbox tests.
 
1462
    from bzrlib import repository
 
1463
    repository._deprecation_warning_done = True
 
1464
 
 
1465
    global default_transport
 
1466
    if transport is None:
 
1467
        transport = default_transport
 
1468
    old_transport = default_transport
 
1469
    default_transport = transport
 
1470
    try:
 
1471
        if test_suite_factory is None:
 
1472
            suite = test_suite()
 
1473
        else:
 
1474
            suite = test_suite_factory()
 
1475
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
1476
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
 
1477
                     transport=transport,
 
1478
                     lsprof_timed=lsprof_timed,
 
1479
                     bench_history=bench_history)
 
1480
    finally:
 
1481
        default_transport = old_transport
 
1482
 
 
1483
 
 
1484
def test_suite():
 
1485
    """Build and return TestSuite for the whole of bzrlib.
 
1486
    
 
1487
    This function can be replaced if you need to change the default test
 
1488
    suite on a global basis, but it is not encouraged.
 
1489
    """
 
1490
    testmod_names = [
 
1491
                   'bzrlib.tests.test_ancestry',
 
1492
                   'bzrlib.tests.test_api',
 
1493
                   'bzrlib.tests.test_atomicfile',
 
1494
                   'bzrlib.tests.test_bad_files',
 
1495
                   'bzrlib.tests.test_branch',
 
1496
                   'bzrlib.tests.test_bundle',
 
1497
                   'bzrlib.tests.test_bzrdir',
 
1498
                   'bzrlib.tests.test_cache_utf8',
 
1499
                   'bzrlib.tests.test_command',
 
1500
                   'bzrlib.tests.test_commit',
 
1501
                   'bzrlib.tests.test_commit_merge',
 
1502
                   'bzrlib.tests.test_config',
 
1503
                   'bzrlib.tests.test_conflicts',
 
1504
                   'bzrlib.tests.test_decorators',
 
1505
                   'bzrlib.tests.test_diff',
 
1506
                   'bzrlib.tests.test_doc_generate',
 
1507
                   'bzrlib.tests.test_errors',
 
1508
                   'bzrlib.tests.test_escaped_store',
 
1509
                   'bzrlib.tests.test_fetch',
 
1510
                   'bzrlib.tests.test_ftp_transport',
 
1511
                   'bzrlib.tests.test_gpg',
 
1512
                   'bzrlib.tests.test_graph',
 
1513
                   'bzrlib.tests.test_hashcache',
 
1514
                   'bzrlib.tests.test_http',
 
1515
                   'bzrlib.tests.test_http_response',
 
1516
                   'bzrlib.tests.test_identitymap',
 
1517
                   'bzrlib.tests.test_ignores',
 
1518
                   'bzrlib.tests.test_inv',
 
1519
                   'bzrlib.tests.test_knit',
 
1520
                   'bzrlib.tests.test_lazy_import',
 
1521
                   'bzrlib.tests.test_lockdir',
 
1522
                   'bzrlib.tests.test_lockable_files',
 
1523
                   'bzrlib.tests.test_log',
 
1524
                   'bzrlib.tests.test_memorytree',
 
1525
                   'bzrlib.tests.test_merge',
 
1526
                   'bzrlib.tests.test_merge3',
 
1527
                   'bzrlib.tests.test_merge_core',
 
1528
                   'bzrlib.tests.test_missing',
 
1529
                   'bzrlib.tests.test_msgeditor',
 
1530
                   'bzrlib.tests.test_nonascii',
 
1531
                   'bzrlib.tests.test_options',
 
1532
                   'bzrlib.tests.test_osutils',
 
1533
                   'bzrlib.tests.test_patch',
 
1534
                   'bzrlib.tests.test_patches',
 
1535
                   'bzrlib.tests.test_permissions',
 
1536
                   'bzrlib.tests.test_plugins',
 
1537
                   'bzrlib.tests.test_progress',
 
1538
                   'bzrlib.tests.test_reconcile',
 
1539
                   'bzrlib.tests.test_repository',
 
1540
                   'bzrlib.tests.test_revert',
 
1541
                   'bzrlib.tests.test_revision',
 
1542
                   'bzrlib.tests.test_revisionnamespaces',
 
1543
                   'bzrlib.tests.test_revisiontree',
 
1544
                   'bzrlib.tests.test_rio',
 
1545
                   'bzrlib.tests.test_sampler',
 
1546
                   'bzrlib.tests.test_selftest',
 
1547
                   'bzrlib.tests.test_setup',
 
1548
                   'bzrlib.tests.test_sftp_transport',
 
1549
                   'bzrlib.tests.test_smart_add',
 
1550
                   'bzrlib.tests.test_smart_transport',
 
1551
                   'bzrlib.tests.test_source',
 
1552
                   'bzrlib.tests.test_status',
 
1553
                   'bzrlib.tests.test_store',
 
1554
                   'bzrlib.tests.test_symbol_versioning',
 
1555
                   'bzrlib.tests.test_testament',
 
1556
                   'bzrlib.tests.test_textfile',
 
1557
                   'bzrlib.tests.test_textmerge',
 
1558
                   'bzrlib.tests.test_trace',
 
1559
                   'bzrlib.tests.test_transactions',
 
1560
                   'bzrlib.tests.test_transform',
 
1561
                   'bzrlib.tests.test_transport',
 
1562
                   'bzrlib.tests.test_tree',
 
1563
                   'bzrlib.tests.test_treebuilder',
 
1564
                   'bzrlib.tests.test_tsort',
 
1565
                   'bzrlib.tests.test_tuned_gzip',
 
1566
                   'bzrlib.tests.test_ui',
 
1567
                   'bzrlib.tests.test_upgrade',
 
1568
                   'bzrlib.tests.test_urlutils',
 
1569
                   'bzrlib.tests.test_versionedfile',
 
1570
                   'bzrlib.tests.test_version',
 
1571
                   'bzrlib.tests.test_weave',
 
1572
                   'bzrlib.tests.test_whitebox',
 
1573
                   'bzrlib.tests.test_workingtree',
 
1574
                   'bzrlib.tests.test_xml',
 
1575
                   ]
 
1576
    test_transport_implementations = [
 
1577
        'bzrlib.tests.test_transport_implementations',
 
1578
        'bzrlib.tests.test_read_bundle',
 
1579
        ]
 
1580
    suite = TestUtil.TestSuite()
 
1581
    loader = TestUtil.TestLoader()
 
1582
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
1583
    from bzrlib.transport import TransportTestProviderAdapter
 
1584
    adapter = TransportTestProviderAdapter()
 
1585
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
1586
    for package in packages_to_test():
 
1587
        suite.addTest(package.test_suite())
 
1588
    for m in MODULES_TO_TEST:
 
1589
        suite.addTest(loader.loadTestsFromModule(m))
 
1590
    for m in MODULES_TO_DOCTEST:
 
1591
        suite.addTest(doctest.DocTestSuite(m))
 
1592
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
1593
        if getattr(plugin, 'test_suite', None) is not None:
 
1594
            suite.addTest(plugin.test_suite())
 
1595
    return suite
 
1596
 
 
1597
 
 
1598
def adapt_modules(mods_list, adapter, loader, suite):
 
1599
    """Adapt the modules in mods_list using adapter and add to suite."""
 
1600
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
 
1601
        suite.addTests(adapter.adapt(test))