/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Vincent Ladeuil
  • Date: 2007-06-20 14:25:06 UTC
  • mfrom: (2540 +trunk)
  • mto: This revision was merged to the branch mainline in revision 2646.
  • Revision ID: v.ladeuil+lp@free.fr-20070620142506-txsb1v8538kpsafw
merge bzr.dev @ 2540

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
18
 
from testsweet import TestBase, run_suite, InTempDir
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import atexit
 
30
import codecs
 
31
from cStringIO import StringIO
 
32
import difflib
 
33
import doctest
 
34
import errno
 
35
import logging
 
36
import os
 
37
from pprint import pformat
 
38
import random
 
39
import re
 
40
import stat
 
41
from subprocess import Popen, PIPE
 
42
import sys
 
43
import tempfile
 
44
import unittest
 
45
import time
 
46
import warnings
 
47
 
 
48
 
 
49
from bzrlib import (
 
50
    bzrdir,
 
51
    debug,
 
52
    errors,
 
53
    memorytree,
 
54
    osutils,
 
55
    progress,
 
56
    ui,
 
57
    urlutils,
 
58
    workingtree,
 
59
    )
 
60
import bzrlib.branch
19
61
import bzrlib.commands
 
62
import bzrlib.timestamp
 
63
import bzrlib.export
 
64
import bzrlib.inventory
 
65
import bzrlib.iterablefile
 
66
import bzrlib.lockdir
 
67
try:
 
68
    import bzrlib.lsprof
 
69
except ImportError:
 
70
    # lsprof not available
 
71
    pass
 
72
from bzrlib.merge import merge_inner
 
73
import bzrlib.merge3
 
74
import bzrlib.osutils
 
75
import bzrlib.plugin
 
76
from bzrlib.revision import common_ancestor
 
77
import bzrlib.store
 
78
from bzrlib import symbol_versioning
 
79
import bzrlib.trace
 
80
from bzrlib.transport import get_transport
 
81
import bzrlib.transport
 
82
from bzrlib.transport.local import LocalURLServer
 
83
from bzrlib.transport.memory import MemoryServer
 
84
from bzrlib.transport.readonly import ReadonlyServer
 
85
from bzrlib.trace import mutter, note
 
86
from bzrlib.tests import TestUtil
 
87
from bzrlib.tests.HttpServer import HttpServer
 
88
from bzrlib.tests.TestUtil import (
 
89
                          TestSuite,
 
90
                          TestLoader,
 
91
                          )
 
92
from bzrlib.tests.treeshape import build_tree_contents
 
93
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
94
 
 
95
# Mark this python module as being part of the implementation
 
96
# of unittest: this gives us better tracebacks where the last
 
97
# shown frame is the test code, not our assertXYZ.
 
98
__unittest = 1
 
99
 
 
100
default_transport = LocalURLServer
20
101
 
21
102
MODULES_TO_TEST = []
22
 
MODULES_TO_DOCTEST = []
23
 
 
24
 
 
25
 
class BzrTestBase(InTempDir):
26
 
    """bzr-specific test base class"""
 
103
MODULES_TO_DOCTEST = [
 
104
                      bzrlib.timestamp,
 
105
                      bzrlib.errors,
 
106
                      bzrlib.export,
 
107
                      bzrlib.inventory,
 
108
                      bzrlib.iterablefile,
 
109
                      bzrlib.lockdir,
 
110
                      bzrlib.merge3,
 
111
                      bzrlib.option,
 
112
                      bzrlib.store,
 
113
                      ]
 
114
 
 
115
 
 
116
def packages_to_test():
 
117
    """Return a list of packages to test.
 
118
 
 
119
    The packages are not globally imported so that import failures are
 
120
    triggered when running selftest, not when importing the command.
 
121
    """
 
122
    import bzrlib.doc
 
123
    import bzrlib.tests.blackbox
 
124
    import bzrlib.tests.branch_implementations
 
125
    import bzrlib.tests.bzrdir_implementations
 
126
    import bzrlib.tests.commands
 
127
    import bzrlib.tests.interrepository_implementations
 
128
    import bzrlib.tests.interversionedfile_implementations
 
129
    import bzrlib.tests.intertree_implementations
 
130
    import bzrlib.tests.per_lock
 
131
    import bzrlib.tests.repository_implementations
 
132
    import bzrlib.tests.revisionstore_implementations
 
133
    import bzrlib.tests.tree_implementations
 
134
    import bzrlib.tests.workingtree_implementations
 
135
    return [
 
136
            bzrlib.doc,
 
137
            bzrlib.tests.blackbox,
 
138
            bzrlib.tests.branch_implementations,
 
139
            bzrlib.tests.bzrdir_implementations,
 
140
            bzrlib.tests.commands,
 
141
            bzrlib.tests.interrepository_implementations,
 
142
            bzrlib.tests.interversionedfile_implementations,
 
143
            bzrlib.tests.intertree_implementations,
 
144
            bzrlib.tests.per_lock,
 
145
            bzrlib.tests.repository_implementations,
 
146
            bzrlib.tests.revisionstore_implementations,
 
147
            bzrlib.tests.tree_implementations,
 
148
            bzrlib.tests.workingtree_implementations,
 
149
            ]
 
150
 
 
151
 
 
152
class ExtendedTestResult(unittest._TextTestResult):
 
153
    """Accepts, reports and accumulates the results of running tests.
 
154
 
 
155
    Compared to this unittest version this class adds support for profiling,
 
156
    benchmarking, stopping as soon as a test fails,  and skipping tests.
 
157
    There are further-specialized subclasses for different types of display.
 
158
    """
 
159
 
 
160
    stop_early = False
 
161
    
 
162
    def __init__(self, stream, descriptions, verbosity,
 
163
                 bench_history=None,
 
164
                 num_tests=None,
 
165
                 use_numbered_dirs=False,
 
166
                 ):
 
167
        """Construct new TestResult.
 
168
 
 
169
        :param bench_history: Optionally, a writable file object to accumulate
 
170
            benchmark results.
 
171
        """
 
172
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
173
        if bench_history is not None:
 
174
            from bzrlib.version import _get_bzr_source_tree
 
175
            src_tree = _get_bzr_source_tree()
 
176
            if src_tree:
 
177
                try:
 
178
                    revision_id = src_tree.get_parent_ids()[0]
 
179
                except IndexError:
 
180
                    # XXX: if this is a brand new tree, do the same as if there
 
181
                    # is no branch.
 
182
                    revision_id = ''
 
183
            else:
 
184
                # XXX: If there's no branch, what should we do?
 
185
                revision_id = ''
 
186
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
187
        self._bench_history = bench_history
 
188
        self.ui = ui.ui_factory
 
189
        self.num_tests = num_tests
 
190
        self.error_count = 0
 
191
        self.failure_count = 0
 
192
        self.known_failure_count = 0
 
193
        self.skip_count = 0
 
194
        self.unsupported = {}
 
195
        self.count = 0
 
196
        self.use_numbered_dirs = use_numbered_dirs
 
197
        self._overall_start_time = time.time()
 
198
    
 
199
    def extractBenchmarkTime(self, testCase):
 
200
        """Add a benchmark time for the current test case."""
 
201
        self._benchmarkTime = getattr(testCase, "_benchtime", None)
 
202
    
 
203
    def _elapsedTestTimeString(self):
 
204
        """Return a time string for the overall time the current test has taken."""
 
205
        return self._formatTime(time.time() - self._start_time)
 
206
 
 
207
    def _testTimeString(self):
 
208
        if self._benchmarkTime is not None:
 
209
            return "%s/%s" % (
 
210
                self._formatTime(self._benchmarkTime),
 
211
                self._elapsedTestTimeString())
 
212
        else:
 
213
            return "           %s" % self._elapsedTestTimeString()
 
214
 
 
215
    def _formatTime(self, seconds):
 
216
        """Format seconds as milliseconds with leading spaces."""
 
217
        # some benchmarks can take thousands of seconds to run, so we need 8
 
218
        # places
 
219
        return "%8dms" % (1000 * seconds)
 
220
 
 
221
    def _shortened_test_description(self, test):
 
222
        what = test.id()
 
223
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
224
        return what
 
225
 
 
226
    def startTest(self, test):
 
227
        unittest.TestResult.startTest(self, test)
 
228
        self.report_test_start(test)
 
229
        test.number = self.count
 
230
        self._recordTestStartTime()
 
231
 
 
232
    def _recordTestStartTime(self):
 
233
        """Record that a test has started."""
 
234
        self._start_time = time.time()
 
235
 
 
236
    def _cleanupLogFile(self, test):
 
237
        # We can only do this if we have one of our TestCases, not if
 
238
        # we have a doctest.
 
239
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
240
        if setKeepLogfile is not None:
 
241
            setKeepLogfile()
 
242
 
 
243
    def addError(self, test, err):
 
244
        self.extractBenchmarkTime(test)
 
245
        self._cleanupLogFile(test)
 
246
        if isinstance(err[1], TestSkipped):
 
247
            return self.addSkipped(test, err)
 
248
        elif isinstance(err[1], UnavailableFeature):
 
249
            return self.addNotSupported(test, err[1].args[0])
 
250
        unittest.TestResult.addError(self, test, err)
 
251
        self.error_count += 1
 
252
        self.report_error(test, err)
 
253
        if self.stop_early:
 
254
            self.stop()
 
255
 
 
256
    def addFailure(self, test, err):
 
257
        self._cleanupLogFile(test)
 
258
        self.extractBenchmarkTime(test)
 
259
        if isinstance(err[1], KnownFailure):
 
260
            return self.addKnownFailure(test, err)
 
261
        unittest.TestResult.addFailure(self, test, err)
 
262
        self.failure_count += 1
 
263
        self.report_failure(test, err)
 
264
        if self.stop_early:
 
265
            self.stop()
 
266
 
 
267
    def addKnownFailure(self, test, err):
 
268
        self.known_failure_count += 1
 
269
        self.report_known_failure(test, err)
 
270
 
 
271
    def addNotSupported(self, test, feature):
 
272
        self.unsupported.setdefault(str(feature), 0)
 
273
        self.unsupported[str(feature)] += 1
 
274
        self.report_unsupported(test, feature)
 
275
 
 
276
    def addSuccess(self, test):
 
277
        self.extractBenchmarkTime(test)
 
278
        if self._bench_history is not None:
 
279
            if self._benchmarkTime is not None:
 
280
                self._bench_history.write("%s %s\n" % (
 
281
                    self._formatTime(self._benchmarkTime),
 
282
                    test.id()))
 
283
        self.report_success(test)
 
284
        unittest.TestResult.addSuccess(self, test)
 
285
 
 
286
    def addSkipped(self, test, skip_excinfo):
 
287
        self.report_skip(test, skip_excinfo)
 
288
        # seems best to treat this as success from point-of-view of unittest
 
289
        # -- it actually does nothing so it barely matters :)
 
290
        try:
 
291
            test.tearDown()
 
292
        except KeyboardInterrupt:
 
293
            raise
 
294
        except:
 
295
            self.addError(test, test.__exc_info())
 
296
        else:
 
297
            unittest.TestResult.addSuccess(self, test)
 
298
 
 
299
    def printErrorList(self, flavour, errors):
 
300
        for test, err in errors:
 
301
            self.stream.writeln(self.separator1)
 
302
            self.stream.write("%s: " % flavour)
 
303
            if self.use_numbered_dirs:
 
304
                self.stream.write('#%d ' % test.number)
 
305
            self.stream.writeln(self.getDescription(test))
 
306
            if getattr(test, '_get_log', None) is not None:
 
307
                print >>self.stream
 
308
                print >>self.stream, \
 
309
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
310
                print >>self.stream, test._get_log()
 
311
                print >>self.stream, \
 
312
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
313
            self.stream.writeln(self.separator2)
 
314
            self.stream.writeln("%s" % err)
 
315
 
 
316
    def finished(self):
 
317
        pass
 
318
 
 
319
    def report_cleaning_up(self):
 
320
        pass
 
321
 
 
322
    def report_success(self, test):
 
323
        pass
 
324
 
 
325
 
 
326
class TextTestResult(ExtendedTestResult):
 
327
    """Displays progress and results of tests in text form"""
 
328
 
 
329
    def __init__(self, stream, descriptions, verbosity,
 
330
                 bench_history=None,
 
331
                 num_tests=None,
 
332
                 pb=None,
 
333
                 use_numbered_dirs=False,
 
334
                 ):
 
335
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
336
            bench_history, num_tests, use_numbered_dirs)
 
337
        if pb is None:
 
338
            self.pb = self.ui.nested_progress_bar()
 
339
            self._supplied_pb = False
 
340
        else:
 
341
            self.pb = pb
 
342
            self._supplied_pb = True
 
343
        self.pb.show_pct = False
 
344
        self.pb.show_spinner = False
 
345
        self.pb.show_eta = False,
 
346
        self.pb.show_count = False
 
347
        self.pb.show_bar = False
 
348
 
 
349
    def report_starting(self):
 
350
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
351
 
 
352
    def _progress_prefix_text(self):
 
353
        a = '[%d' % self.count
 
354
        if self.num_tests is not None:
 
355
            a +='/%d' % self.num_tests
 
356
        a += ' in %ds' % (time.time() - self._overall_start_time)
 
357
        if self.error_count:
 
358
            a += ', %d errors' % self.error_count
 
359
        if self.failure_count:
 
360
            a += ', %d failed' % self.failure_count
 
361
        if self.known_failure_count:
 
362
            a += ', %d known failures' % self.known_failure_count
 
363
        if self.skip_count:
 
364
            a += ', %d skipped' % self.skip_count
 
365
        if self.unsupported:
 
366
            a += ', %d missing features' % len(self.unsupported)
 
367
        a += ']'
 
368
        return a
 
369
 
 
370
    def report_test_start(self, test):
 
371
        self.count += 1
 
372
        self.pb.update(
 
373
                self._progress_prefix_text()
 
374
                + ' ' 
 
375
                + self._shortened_test_description(test))
 
376
 
 
377
    def _test_description(self, test):
 
378
        if self.use_numbered_dirs:
 
379
            return '#%d %s' % (self.count,
 
380
                               self._shortened_test_description(test))
 
381
        else:
 
382
            return self._shortened_test_description(test)
 
383
 
 
384
    def report_error(self, test, err):
 
385
        self.pb.note('ERROR: %s\n    %s\n', 
 
386
            self._test_description(test),
 
387
            err[1],
 
388
            )
 
389
 
 
390
    def report_failure(self, test, err):
 
391
        self.pb.note('FAIL: %s\n    %s\n', 
 
392
            self._test_description(test),
 
393
            err[1],
 
394
            )
 
395
 
 
396
    def report_known_failure(self, test, err):
 
397
        self.pb.note('XFAIL: %s\n%s\n',
 
398
            self._test_description(test), err[1])
 
399
 
 
400
    def report_skip(self, test, skip_excinfo):
 
401
        self.skip_count += 1
 
402
        if False:
 
403
            # at the moment these are mostly not things we can fix
 
404
            # and so they just produce stipple; use the verbose reporter
 
405
            # to see them.
 
406
            if False:
 
407
                # show test and reason for skip
 
408
                self.pb.note('SKIP: %s\n    %s\n', 
 
409
                    self._shortened_test_description(test),
 
410
                    skip_excinfo[1])
 
411
            else:
 
412
                # since the class name was left behind in the still-visible
 
413
                # progress bar...
 
414
                self.pb.note('SKIP: %s', skip_excinfo[1])
 
415
 
 
416
    def report_unsupported(self, test, feature):
 
417
        """test cannot be run because feature is missing."""
 
418
                  
 
419
    def report_cleaning_up(self):
 
420
        self.pb.update('cleaning up...')
 
421
 
 
422
    def finished(self):
 
423
        if not self._supplied_pb:
 
424
            self.pb.finished()
 
425
 
 
426
 
 
427
class VerboseTestResult(ExtendedTestResult):
 
428
    """Produce long output, with one line per test run plus times"""
 
429
 
 
430
    def _ellipsize_to_right(self, a_string, final_width):
 
431
        """Truncate and pad a string, keeping the right hand side"""
 
432
        if len(a_string) > final_width:
 
433
            result = '...' + a_string[3-final_width:]
 
434
        else:
 
435
            result = a_string
 
436
        return result.ljust(final_width)
 
437
 
 
438
    def report_starting(self):
 
439
        self.stream.write('running %d tests...\n' % self.num_tests)
 
440
 
 
441
    def report_test_start(self, test):
 
442
        self.count += 1
 
443
        name = self._shortened_test_description(test)
 
444
        # width needs space for 6 char status, plus 1 for slash, plus 2 10-char
 
445
        # numbers, plus a trailing blank
 
446
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
447
        if self.use_numbered_dirs:
 
448
            self.stream.write('%5d ' % self.count)
 
449
            self.stream.write(self._ellipsize_to_right(name,
 
450
                                osutils.terminal_width()-36))
 
451
        else:
 
452
            self.stream.write(self._ellipsize_to_right(name,
 
453
                                osutils.terminal_width()-30))
 
454
        self.stream.flush()
 
455
 
 
456
    def _error_summary(self, err):
 
457
        indent = ' ' * 4
 
458
        if self.use_numbered_dirs:
 
459
            indent += ' ' * 6
 
460
        return '%s%s' % (indent, err[1])
 
461
 
 
462
    def report_error(self, test, err):
 
463
        self.stream.writeln('ERROR %s\n%s'
 
464
                % (self._testTimeString(),
 
465
                   self._error_summary(err)))
 
466
 
 
467
    def report_failure(self, test, err):
 
468
        self.stream.writeln(' FAIL %s\n%s'
 
469
                % (self._testTimeString(),
 
470
                   self._error_summary(err)))
 
471
 
 
472
    def report_known_failure(self, test, err):
 
473
        self.stream.writeln('XFAIL %s\n%s'
 
474
                % (self._testTimeString(),
 
475
                   self._error_summary(err)))
 
476
 
 
477
    def report_success(self, test):
 
478
        self.stream.writeln('   OK %s' % self._testTimeString())
 
479
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
480
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
481
            stats.pprint(file=self.stream)
 
482
        # flush the stream so that we get smooth output. This verbose mode is
 
483
        # used to show the output in PQM.
 
484
        self.stream.flush()
 
485
 
 
486
    def report_skip(self, test, skip_excinfo):
 
487
        self.skip_count += 1
 
488
        self.stream.writeln(' SKIP %s\n%s'
 
489
                % (self._testTimeString(),
 
490
                   self._error_summary(skip_excinfo)))
 
491
 
 
492
    def report_unsupported(self, test, feature):
 
493
        """test cannot be run because feature is missing."""
 
494
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
495
                %(self._testTimeString(), feature))
 
496
                  
 
497
 
 
498
 
 
499
class TextTestRunner(object):
 
500
    stop_on_failure = False
 
501
 
 
502
    def __init__(self,
 
503
                 stream=sys.stderr,
 
504
                 descriptions=0,
 
505
                 verbosity=1,
 
506
                 bench_history=None,
 
507
                 use_numbered_dirs=False,
 
508
                 list_only=False
 
509
                 ):
 
510
        self.stream = unittest._WritelnDecorator(stream)
 
511
        self.descriptions = descriptions
 
512
        self.verbosity = verbosity
 
513
        self._bench_history = bench_history
 
514
        self.use_numbered_dirs = use_numbered_dirs
 
515
        self.list_only = list_only
 
516
 
 
517
    def run(self, test):
 
518
        "Run the given test case or test suite."
 
519
        startTime = time.time()
 
520
        if self.verbosity == 1:
 
521
            result_class = TextTestResult
 
522
        elif self.verbosity >= 2:
 
523
            result_class = VerboseTestResult
 
524
        result = result_class(self.stream,
 
525
                              self.descriptions,
 
526
                              self.verbosity,
 
527
                              bench_history=self._bench_history,
 
528
                              num_tests=test.countTestCases(),
 
529
                              use_numbered_dirs=self.use_numbered_dirs,
 
530
                              )
 
531
        result.stop_early = self.stop_on_failure
 
532
        result.report_starting()
 
533
        if self.list_only:
 
534
            if self.verbosity >= 2:
 
535
                self.stream.writeln("Listing tests only ...\n")
 
536
            run = 0
 
537
            for t in iter_suite_tests(test):
 
538
                self.stream.writeln("%s" % (t.id()))
 
539
                run += 1
 
540
            actionTaken = "Listed"
 
541
        else: 
 
542
            test.run(result)
 
543
            run = result.testsRun
 
544
            actionTaken = "Ran"
 
545
        stopTime = time.time()
 
546
        timeTaken = stopTime - startTime
 
547
        result.printErrors()
 
548
        self.stream.writeln(result.separator2)
 
549
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
550
                            run, run != 1 and "s" or "", timeTaken))
 
551
        self.stream.writeln()
 
552
        if not result.wasSuccessful():
 
553
            self.stream.write("FAILED (")
 
554
            failed, errored = map(len, (result.failures, result.errors))
 
555
            if failed:
 
556
                self.stream.write("failures=%d" % failed)
 
557
            if errored:
 
558
                if failed: self.stream.write(", ")
 
559
                self.stream.write("errors=%d" % errored)
 
560
            if result.known_failure_count:
 
561
                if failed or errored: self.stream.write(", ")
 
562
                self.stream.write("known_failure_count=%d" %
 
563
                    result.known_failure_count)
 
564
            self.stream.writeln(")")
 
565
        else:
 
566
            if result.known_failure_count:
 
567
                self.stream.writeln("OK (known_failures=%d)" %
 
568
                    result.known_failure_count)
 
569
            else:
 
570
                self.stream.writeln("OK")
 
571
        if result.skip_count > 0:
 
572
            skipped = result.skip_count
 
573
            self.stream.writeln('%d test%s skipped' %
 
574
                                (skipped, skipped != 1 and "s" or ""))
 
575
        if result.unsupported:
 
576
            for feature, count in sorted(result.unsupported.items()):
 
577
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
578
                    (feature, count))
 
579
        result.finished()
 
580
        return result
 
581
 
 
582
 
 
583
def iter_suite_tests(suite):
 
584
    """Return all tests in a suite, recursing through nested suites"""
 
585
    for item in suite._tests:
 
586
        if isinstance(item, unittest.TestCase):
 
587
            yield item
 
588
        elif isinstance(item, unittest.TestSuite):
 
589
            for r in iter_suite_tests(item):
 
590
                yield r
 
591
        else:
 
592
            raise Exception('unknown object %r inside test suite %r'
 
593
                            % (item, suite))
 
594
 
 
595
 
 
596
class TestSkipped(Exception):
 
597
    """Indicates that a test was intentionally skipped, rather than failing."""
 
598
 
 
599
 
 
600
class KnownFailure(AssertionError):
 
601
    """Indicates that a test failed in a precisely expected manner.
 
602
 
 
603
    Such failures dont block the whole test suite from passing because they are
 
604
    indicators of partially completed code or of future work. We have an
 
605
    explicit error for them so that we can ensure that they are always visible:
 
606
    KnownFailures are always shown in the output of bzr selftest.
 
607
    """
 
608
 
 
609
 
 
610
class UnavailableFeature(Exception):
 
611
    """A feature required for this test was not available.
 
612
 
 
613
    The feature should be used to construct the exception.
 
614
    """
 
615
 
 
616
 
 
617
class CommandFailed(Exception):
 
618
    pass
 
619
 
 
620
 
 
621
class StringIOWrapper(object):
 
622
    """A wrapper around cStringIO which just adds an encoding attribute.
 
623
    
 
624
    Internally we can check sys.stdout to see what the output encoding
 
625
    should be. However, cStringIO has no encoding attribute that we can
 
626
    set. So we wrap it instead.
 
627
    """
 
628
    encoding='ascii'
 
629
    _cstring = None
 
630
 
 
631
    def __init__(self, s=None):
 
632
        if s is not None:
 
633
            self.__dict__['_cstring'] = StringIO(s)
 
634
        else:
 
635
            self.__dict__['_cstring'] = StringIO()
 
636
 
 
637
    def __getattr__(self, name, getattr=getattr):
 
638
        return getattr(self.__dict__['_cstring'], name)
 
639
 
 
640
    def __setattr__(self, name, val):
 
641
        if name == 'encoding':
 
642
            self.__dict__['encoding'] = val
 
643
        else:
 
644
            return setattr(self._cstring, name, val)
 
645
 
 
646
 
 
647
class TestUIFactory(ui.CLIUIFactory):
 
648
    """A UI Factory for testing.
 
649
 
 
650
    Hide the progress bar but emit note()s.
 
651
    Redirect stdin.
 
652
    Allows get_password to be tested without real tty attached.
 
653
    """
 
654
 
 
655
    def __init__(self,
 
656
                 stdout=None,
 
657
                 stderr=None,
 
658
                 stdin=None):
 
659
        super(TestUIFactory, self).__init__()
 
660
        if stdin is not None:
 
661
            # We use a StringIOWrapper to be able to test various
 
662
            # encodings, but the user is still responsible to
 
663
            # encode the string and to set the encoding attribute
 
664
            # of StringIOWrapper.
 
665
            self.stdin = StringIOWrapper(stdin)
 
666
        if stdout is None:
 
667
            self.stdout = sys.stdout
 
668
        else:
 
669
            self.stdout = stdout
 
670
        if stderr is None:
 
671
            self.stderr = sys.stderr
 
672
        else:
 
673
            self.stderr = stderr
 
674
 
 
675
    def clear(self):
 
676
        """See progress.ProgressBar.clear()."""
 
677
 
 
678
    def clear_term(self):
 
679
        """See progress.ProgressBar.clear_term()."""
 
680
 
 
681
    def clear_term(self):
 
682
        """See progress.ProgressBar.clear_term()."""
 
683
 
 
684
    def finished(self):
 
685
        """See progress.ProgressBar.finished()."""
 
686
 
 
687
    def note(self, fmt_string, *args, **kwargs):
 
688
        """See progress.ProgressBar.note()."""
 
689
        self.stdout.write((fmt_string + "\n") % args)
 
690
 
 
691
    def progress_bar(self):
 
692
        return self
 
693
 
 
694
    def nested_progress_bar(self):
 
695
        return self
 
696
 
 
697
    def update(self, message, count=None, total=None):
 
698
        """See progress.ProgressBar.update()."""
 
699
 
 
700
    def get_non_echoed_password(self, prompt):
 
701
        """Get password from stdin without trying to handle the echo mode"""
 
702
        if prompt:
 
703
            self.stdout.write(prompt.encode(self.stdout.encoding, 'replace'))
 
704
        password = self.stdin.readline()
 
705
        if not password:
 
706
            raise EOFError
 
707
        if password[-1] == '\n':
 
708
            password = password[:-1]
 
709
        return password
 
710
 
 
711
 
 
712
class TestCase(unittest.TestCase):
 
713
    """Base class for bzr unit tests.
 
714
    
 
715
    Tests that need access to disk resources should subclass 
 
716
    TestCaseInTempDir not TestCase.
 
717
 
 
718
    Error and debug log messages are redirected from their usual
 
719
    location into a temporary file, the contents of which can be
 
720
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
721
    so that it can also capture file IO.  When the test completes this file
 
722
    is read into memory and removed from disk.
 
723
       
 
724
    There are also convenience functions to invoke bzr's command-line
 
725
    routine, and to build and check bzr trees.
 
726
   
 
727
    In addition to the usual method of overriding tearDown(), this class also
 
728
    allows subclasses to register functions into the _cleanups list, which is
 
729
    run in order as the object is torn down.  It's less likely this will be
 
730
    accidentally overlooked.
 
731
    """
 
732
 
 
733
    _log_file_name = None
 
734
    _log_contents = ''
 
735
    _keep_log_file = False
 
736
    # record lsprof data when performing benchmark calls.
 
737
    _gather_lsprof_in_benchmarks = False
 
738
 
 
739
    def __init__(self, methodName='testMethod'):
 
740
        super(TestCase, self).__init__(methodName)
 
741
        self._cleanups = []
 
742
 
 
743
    def setUp(self):
 
744
        unittest.TestCase.setUp(self)
 
745
        self._cleanEnvironment()
 
746
        bzrlib.trace.disable_default_logging()
 
747
        self._silenceUI()
 
748
        self._startLogFile()
 
749
        self._benchcalls = []
 
750
        self._benchtime = None
 
751
        self._clear_hooks()
 
752
 
 
753
    def _clear_hooks(self):
 
754
        # prevent hooks affecting tests
 
755
        import bzrlib.branch
 
756
        import bzrlib.smart.server
 
757
        self._preserved_hooks = {
 
758
            bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
 
759
            bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
 
760
            }
 
761
        self.addCleanup(self._restoreHooks)
 
762
        # reset all hooks to an empty instance of the appropriate type
 
763
        bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
 
764
        bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
 
765
        # FIXME: Rather than constructing new objects like this, how about
 
766
        # having save() and clear() methods on the base Hook class? mbp
 
767
        # 20070416
 
768
 
 
769
    def _silenceUI(self):
 
770
        """Turn off UI for duration of test"""
 
771
        # by default the UI is off; tests can turn it on if they want it.
 
772
        saved = ui.ui_factory
 
773
        def _restore():
 
774
            ui.ui_factory = saved
 
775
        ui.ui_factory = ui.SilentUIFactory()
 
776
        self.addCleanup(_restore)
 
777
 
 
778
    def _ndiff_strings(self, a, b):
 
779
        """Return ndiff between two strings containing lines.
 
780
        
 
781
        A trailing newline is added if missing to make the strings
 
782
        print properly."""
 
783
        if b and b[-1] != '\n':
 
784
            b += '\n'
 
785
        if a and a[-1] != '\n':
 
786
            a += '\n'
 
787
        difflines = difflib.ndiff(a.splitlines(True),
 
788
                                  b.splitlines(True),
 
789
                                  linejunk=lambda x: False,
 
790
                                  charjunk=lambda x: False)
 
791
        return ''.join(difflines)
 
792
 
 
793
    def assertEqual(self, a, b, message=''):
 
794
        try:
 
795
            if a == b:
 
796
                return
 
797
        except UnicodeError, e:
 
798
            # If we can't compare without getting a UnicodeError, then
 
799
            # obviously they are different
 
800
            mutter('UnicodeError: %s', e)
 
801
        if message:
 
802
            message += '\n'
 
803
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
804
            % (message,
 
805
               pformat(a), pformat(b)))
 
806
 
 
807
    assertEquals = assertEqual
 
808
 
 
809
    def assertEqualDiff(self, a, b, message=None):
 
810
        """Assert two texts are equal, if not raise an exception.
 
811
        
 
812
        This is intended for use with multi-line strings where it can 
 
813
        be hard to find the differences by eye.
 
814
        """
 
815
        # TODO: perhaps override assertEquals to call this for strings?
 
816
        if a == b:
 
817
            return
 
818
        if message is None:
 
819
            message = "texts not equal:\n"
 
820
        raise AssertionError(message + 
 
821
                             self._ndiff_strings(a, b))      
 
822
        
 
823
    def assertEqualMode(self, mode, mode_test):
 
824
        self.assertEqual(mode, mode_test,
 
825
                         'mode mismatch %o != %o' % (mode, mode_test))
 
826
 
 
827
    def assertStartsWith(self, s, prefix):
 
828
        if not s.startswith(prefix):
 
829
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
830
 
 
831
    def assertEndsWith(self, s, suffix):
 
832
        """Asserts that s ends with suffix."""
 
833
        if not s.endswith(suffix):
 
834
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
835
 
 
836
    def assertContainsRe(self, haystack, needle_re):
 
837
        """Assert that a contains something matching a regular expression."""
 
838
        if not re.search(needle_re, haystack):
 
839
            raise AssertionError('pattern "%r" not found in "%r"'
 
840
                    % (needle_re, haystack))
 
841
 
 
842
    def assertNotContainsRe(self, haystack, needle_re):
 
843
        """Assert that a does not match a regular expression"""
 
844
        if re.search(needle_re, haystack):
 
845
            raise AssertionError('pattern "%s" found in "%s"'
 
846
                    % (needle_re, haystack))
 
847
 
 
848
    def assertSubset(self, sublist, superlist):
 
849
        """Assert that every entry in sublist is present in superlist."""
 
850
        missing = []
 
851
        for entry in sublist:
 
852
            if entry not in superlist:
 
853
                missing.append(entry)
 
854
        if len(missing) > 0:
 
855
            raise AssertionError("value(s) %r not present in container %r" % 
 
856
                                 (missing, superlist))
 
857
 
 
858
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
859
        """Fail unless excClass is raised when the iterator from func is used.
 
860
        
 
861
        Many functions can return generators this makes sure
 
862
        to wrap them in a list() call to make sure the whole generator
 
863
        is run, and that the proper exception is raised.
 
864
        """
 
865
        try:
 
866
            list(func(*args, **kwargs))
 
867
        except excClass:
 
868
            return
 
869
        else:
 
870
            if getattr(excClass,'__name__', None) is not None:
 
871
                excName = excClass.__name__
 
872
            else:
 
873
                excName = str(excClass)
 
874
            raise self.failureException, "%s not raised" % excName
 
875
 
 
876
    def assertRaises(self, excClass, func, *args, **kwargs):
 
877
        """Assert that a callable raises a particular exception.
 
878
 
 
879
        :param excClass: As for the except statement, this may be either an
 
880
        exception class, or a tuple of classes.
 
881
 
 
882
        Returns the exception so that you can examine it.
 
883
        """
 
884
        try:
 
885
            func(*args, **kwargs)
 
886
        except excClass, e:
 
887
            return e
 
888
        else:
 
889
            if getattr(excClass,'__name__', None) is not None:
 
890
                excName = excClass.__name__
 
891
            else:
 
892
                # probably a tuple
 
893
                excName = str(excClass)
 
894
            raise self.failureException, "%s not raised" % excName
 
895
 
 
896
    def assertIs(self, left, right, message=None):
 
897
        if not (left is right):
 
898
            if message is not None:
 
899
                raise AssertionError(message)
 
900
            else:
 
901
                raise AssertionError("%r is not %r." % (left, right))
 
902
 
 
903
    def assertIsNot(self, left, right, message=None):
 
904
        if (left is right):
 
905
            if message is not None:
 
906
                raise AssertionError(message)
 
907
            else:
 
908
                raise AssertionError("%r is %r." % (left, right))
 
909
 
 
910
    def assertTransportMode(self, transport, path, mode):
 
911
        """Fail if a path does not have mode mode.
 
912
        
 
913
        If modes are not supported on this transport, the assertion is ignored.
 
914
        """
 
915
        if not transport._can_roundtrip_unix_modebits():
 
916
            return
 
917
        path_stat = transport.stat(path)
 
918
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
919
        self.assertEqual(mode, actual_mode,
 
920
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
921
 
 
922
    def assertIsInstance(self, obj, kls):
 
923
        """Fail if obj is not an instance of kls"""
 
924
        if not isinstance(obj, kls):
 
925
            self.fail("%r is an instance of %s rather than %s" % (
 
926
                obj, obj.__class__, kls))
 
927
 
 
928
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
929
        """Invoke a test, expecting it to fail for the given reason.
 
930
 
 
931
        This is for assertions that ought to succeed, but currently fail.
 
932
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
933
        about the failure you're expecting.  If a new bug is introduced,
 
934
        AssertionError should be raised, not KnownFailure.
 
935
 
 
936
        Frequently, expectFailure should be followed by an opposite assertion.
 
937
        See example below.
 
938
 
 
939
        Intended to be used with a callable that raises AssertionError as the
 
940
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
941
 
 
942
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
943
        test succeeds.
 
944
 
 
945
        example usage::
 
946
 
 
947
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
948
                             dynamic_val)
 
949
          self.assertEqual(42, dynamic_val)
 
950
 
 
951
          This means that a dynamic_val of 54 will cause the test to raise
 
952
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
953
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
954
          than 54 or 42 will cause an AssertionError.
 
955
        """
 
956
        try:
 
957
            assertion(*args, **kwargs)
 
958
        except AssertionError:
 
959
            raise KnownFailure(reason)
 
960
        else:
 
961
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
962
 
 
963
    def _capture_warnings(self, a_callable, *args, **kwargs):
 
964
        """A helper for callDeprecated and applyDeprecated.
 
965
 
 
966
        :param a_callable: A callable to call.
 
967
        :param args: The positional arguments for the callable
 
968
        :param kwargs: The keyword arguments for the callable
 
969
        :return: A tuple (warnings, result). result is the result of calling
 
970
            a_callable(*args, **kwargs).
 
971
        """
 
972
        local_warnings = []
 
973
        def capture_warnings(msg, cls=None, stacklevel=None):
 
974
            # we've hooked into a deprecation specific callpath,
 
975
            # only deprecations should getting sent via it.
 
976
            self.assertEqual(cls, DeprecationWarning)
 
977
            local_warnings.append(msg)
 
978
        original_warning_method = symbol_versioning.warn
 
979
        symbol_versioning.set_warning_method(capture_warnings)
 
980
        try:
 
981
            result = a_callable(*args, **kwargs)
 
982
        finally:
 
983
            symbol_versioning.set_warning_method(original_warning_method)
 
984
        return (local_warnings, result)
 
985
 
 
986
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
987
        """Call a deprecated callable without warning the user.
 
988
 
 
989
        :param deprecation_format: The deprecation format that the callable
 
990
            should have been deprecated with. This is the same type as the 
 
991
            parameter to deprecated_method/deprecated_function. If the 
 
992
            callable is not deprecated with this format, an assertion error
 
993
            will be raised.
 
994
        :param a_callable: A callable to call. This may be a bound method or
 
995
            a regular function. It will be called with *args and **kwargs.
 
996
        :param args: The positional arguments for the callable
 
997
        :param kwargs: The keyword arguments for the callable
 
998
        :return: The result of a_callable(*args, **kwargs)
 
999
        """
 
1000
        call_warnings, result = self._capture_warnings(a_callable,
 
1001
            *args, **kwargs)
 
1002
        expected_first_warning = symbol_versioning.deprecation_string(
 
1003
            a_callable, deprecation_format)
 
1004
        if len(call_warnings) == 0:
 
1005
            self.fail("No deprecation warning generated by call to %s" %
 
1006
                a_callable)
 
1007
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1008
        return result
 
1009
 
 
1010
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1011
        """Assert that a callable is deprecated in a particular way.
 
1012
 
 
1013
        This is a very precise test for unusual requirements. The 
 
1014
        applyDeprecated helper function is probably more suited for most tests
 
1015
        as it allows you to simply specify the deprecation format being used
 
1016
        and will ensure that that is issued for the function being called.
 
1017
 
 
1018
        :param expected: a list of the deprecation warnings expected, in order
 
1019
        :param callable: The callable to call
 
1020
        :param args: The positional arguments for the callable
 
1021
        :param kwargs: The keyword arguments for the callable
 
1022
        """
 
1023
        call_warnings, result = self._capture_warnings(callable,
 
1024
            *args, **kwargs)
 
1025
        self.assertEqual(expected, call_warnings)
 
1026
        return result
 
1027
 
 
1028
    def _startLogFile(self):
 
1029
        """Send bzr and test log messages to a temporary file.
 
1030
 
 
1031
        The file is removed as the test is torn down.
 
1032
        """
 
1033
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1034
        self._log_file = os.fdopen(fileno, 'w+')
 
1035
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
1036
        self._log_file_name = name
 
1037
        self.addCleanup(self._finishLogFile)
 
1038
 
 
1039
    def _finishLogFile(self):
 
1040
        """Finished with the log file.
 
1041
 
 
1042
        Close the file and delete it, unless setKeepLogfile was called.
 
1043
        """
 
1044
        if self._log_file is None:
 
1045
            return
 
1046
        bzrlib.trace.disable_test_log(self._log_nonce)
 
1047
        self._log_file.close()
 
1048
        self._log_file = None
 
1049
        if not self._keep_log_file:
 
1050
            os.remove(self._log_file_name)
 
1051
            self._log_file_name = None
 
1052
 
 
1053
    def setKeepLogfile(self):
 
1054
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1055
        self._keep_log_file = True
 
1056
 
 
1057
    def addCleanup(self, callable):
 
1058
        """Arrange to run a callable when this case is torn down.
 
1059
 
 
1060
        Callables are run in the reverse of the order they are registered, 
 
1061
        ie last-in first-out.
 
1062
        """
 
1063
        if callable in self._cleanups:
 
1064
            raise ValueError("cleanup function %r already registered on %s" 
 
1065
                    % (callable, self))
 
1066
        self._cleanups.append(callable)
 
1067
 
 
1068
    def _cleanEnvironment(self):
 
1069
        new_env = {
 
1070
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1071
            'HOME': os.getcwd(),
 
1072
            'APPDATA': None,  # bzr now use Win32 API and don't rely on APPDATA
 
1073
            'BZR_EMAIL': None,
 
1074
            'BZREMAIL': None, # may still be present in the environment
 
1075
            'EMAIL': None,
 
1076
            'BZR_PROGRESS_BAR': None,
 
1077
            # Proxies
 
1078
            'http_proxy': None,
 
1079
            'HTTP_PROXY': None,
 
1080
            'https_proxy': None,
 
1081
            'HTTPS_PROXY': None,
 
1082
            'no_proxy': None,
 
1083
            'NO_PROXY': None,
 
1084
            'all_proxy': None,
 
1085
            'ALL_PROXY': None,
 
1086
            # Nobody cares about these ones AFAIK. So far at
 
1087
            # least. If you do (care), please update this comment
 
1088
            # -- vila 20061212
 
1089
            'ftp_proxy': None,
 
1090
            'FTP_PROXY': None,
 
1091
            'BZR_REMOTE_PATH': None,
 
1092
        }
 
1093
        self.__old_env = {}
 
1094
        self.addCleanup(self._restoreEnvironment)
 
1095
        for name, value in new_env.iteritems():
 
1096
            self._captureVar(name, value)
 
1097
 
 
1098
    def _captureVar(self, name, newvalue):
 
1099
        """Set an environment variable, and reset it when finished."""
 
1100
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1101
 
 
1102
    def _restoreEnvironment(self):
 
1103
        for name, value in self.__old_env.iteritems():
 
1104
            osutils.set_or_unset_env(name, value)
 
1105
 
 
1106
    def _restoreHooks(self):
 
1107
        for klass, hooks in self._preserved_hooks.items():
 
1108
            setattr(klass, 'hooks', hooks)
 
1109
 
 
1110
    def knownFailure(self, reason):
 
1111
        """This test has failed for some known reason."""
 
1112
        raise KnownFailure(reason)
 
1113
 
 
1114
    def run(self, result=None):
 
1115
        if result is None: result = self.defaultTestResult()
 
1116
        for feature in getattr(self, '_test_needs_features', []):
 
1117
            if not feature.available():
 
1118
                result.startTest(self)
 
1119
                if getattr(result, 'addNotSupported', None):
 
1120
                    result.addNotSupported(self, feature)
 
1121
                else:
 
1122
                    result.addSuccess(self)
 
1123
                result.stopTest(self)
 
1124
                return
 
1125
        return unittest.TestCase.run(self, result)
 
1126
 
 
1127
    def tearDown(self):
 
1128
        self._runCleanups()
 
1129
        unittest.TestCase.tearDown(self)
 
1130
 
 
1131
    def time(self, callable, *args, **kwargs):
 
1132
        """Run callable and accrue the time it takes to the benchmark time.
 
1133
        
 
1134
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1135
        this will cause lsprofile statistics to be gathered and stored in
 
1136
        self._benchcalls.
 
1137
        """
 
1138
        if self._benchtime is None:
 
1139
            self._benchtime = 0
 
1140
        start = time.time()
 
1141
        try:
 
1142
            if not self._gather_lsprof_in_benchmarks:
 
1143
                return callable(*args, **kwargs)
 
1144
            else:
 
1145
                # record this benchmark
 
1146
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1147
                stats.sort()
 
1148
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1149
                return ret
 
1150
        finally:
 
1151
            self._benchtime += time.time() - start
 
1152
 
 
1153
    def _runCleanups(self):
 
1154
        """Run registered cleanup functions. 
 
1155
 
 
1156
        This should only be called from TestCase.tearDown.
 
1157
        """
 
1158
        # TODO: Perhaps this should keep running cleanups even if 
 
1159
        # one of them fails?
 
1160
 
 
1161
        # Actually pop the cleanups from the list so tearDown running
 
1162
        # twice is safe (this happens for skipped tests).
 
1163
        while self._cleanups:
 
1164
            self._cleanups.pop()()
 
1165
 
 
1166
    def log(self, *args):
 
1167
        mutter(*args)
 
1168
 
 
1169
    def _get_log(self, keep_log_file=False):
 
1170
        """Return as a string the log for this test. If the file is still
 
1171
        on disk and keep_log_file=False, delete the log file and store the
 
1172
        content in self._log_contents."""
 
1173
        # flush the log file, to get all content
 
1174
        import bzrlib.trace
 
1175
        bzrlib.trace._trace_file.flush()
 
1176
        if self._log_contents:
 
1177
            return self._log_contents
 
1178
        if self._log_file_name is not None:
 
1179
            logfile = open(self._log_file_name)
 
1180
            try:
 
1181
                log_contents = logfile.read()
 
1182
            finally:
 
1183
                logfile.close()
 
1184
            if not keep_log_file:
 
1185
                self._log_contents = log_contents
 
1186
                try:
 
1187
                    os.remove(self._log_file_name)
 
1188
                except OSError, e:
 
1189
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1190
                        print >>sys.stderr, ('Unable to delete log file '
 
1191
                                             ' %r' % self._log_file_name)
 
1192
                    else:
 
1193
                        raise
 
1194
            return log_contents
 
1195
        else:
 
1196
            return "DELETED log file to reduce memory footprint"
 
1197
 
 
1198
    def capture(self, cmd, retcode=0):
 
1199
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
1200
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
1201
 
 
1202
    def requireFeature(self, feature):
 
1203
        """This test requires a specific feature is available.
 
1204
 
 
1205
        :raises UnavailableFeature: When feature is not available.
 
1206
        """
 
1207
        if not feature.available():
 
1208
            raise UnavailableFeature(feature)
 
1209
 
 
1210
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
 
1211
                         working_dir=None):
 
1212
        """Invoke bzr and return (stdout, stderr).
 
1213
 
 
1214
        Useful for code that wants to check the contents of the
 
1215
        output, the way error messages are presented, etc.
 
1216
 
 
1217
        This should be the main method for tests that want to exercise the
 
1218
        overall behavior of the bzr application (rather than a unit test
 
1219
        or a functional test of the library.)
 
1220
 
 
1221
        Much of the old code runs bzr by forking a new copy of Python, but
 
1222
        that is slower, harder to debug, and generally not necessary.
 
1223
 
 
1224
        This runs bzr through the interface that catches and reports
 
1225
        errors, and with logging set to something approximating the
 
1226
        default, so that error reporting can be checked.
 
1227
 
 
1228
        :param argv: arguments to invoke bzr
 
1229
        :param retcode: expected return code, or None for don't-care.
 
1230
        :param encoding: encoding for sys.stdout and sys.stderr
 
1231
        :param stdin: A string to be used as stdin for the command.
 
1232
        :param working_dir: Change to this directory before running
 
1233
        """
 
1234
        if encoding is None:
 
1235
            encoding = bzrlib.user_encoding
 
1236
        stdout = StringIOWrapper()
 
1237
        stderr = StringIOWrapper()
 
1238
        stdout.encoding = encoding
 
1239
        stderr.encoding = encoding
 
1240
 
 
1241
        self.log('run bzr: %r', argv)
 
1242
        # FIXME: don't call into logging here
 
1243
        handler = logging.StreamHandler(stderr)
 
1244
        handler.setLevel(logging.INFO)
 
1245
        logger = logging.getLogger('')
 
1246
        logger.addHandler(handler)
 
1247
        old_ui_factory = ui.ui_factory
 
1248
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1249
 
 
1250
        cwd = None
 
1251
        if working_dir is not None:
 
1252
            cwd = osutils.getcwd()
 
1253
            os.chdir(working_dir)
 
1254
 
 
1255
        try:
 
1256
            saved_debug_flags = frozenset(debug.debug_flags)
 
1257
            debug.debug_flags.clear()
 
1258
            try:
 
1259
                result = self.apply_redirected(ui.ui_factory.stdin,
 
1260
                                               stdout, stderr,
 
1261
                                               bzrlib.commands.run_bzr_catch_errors,
 
1262
                                               argv)
 
1263
            finally:
 
1264
                debug.debug_flags.update(saved_debug_flags)
 
1265
        finally:
 
1266
            logger.removeHandler(handler)
 
1267
            ui.ui_factory = old_ui_factory
 
1268
            if cwd is not None:
 
1269
                os.chdir(cwd)
 
1270
 
 
1271
        out = stdout.getvalue()
 
1272
        err = stderr.getvalue()
 
1273
        if out:
 
1274
            self.log('output:\n%r', out)
 
1275
        if err:
 
1276
            self.log('errors:\n%r', err)
 
1277
        if retcode is not None:
 
1278
            self.assertEquals(retcode, result,
 
1279
                              message='Unexpected return code')
 
1280
        return out, err
 
1281
 
27
1282
    def run_bzr(self, *args, **kwargs):
28
 
        retcode = kwargs.get('retcode', 0)
29
 
        self.assertEquals(bzrlib.commands.run_bzr(args), retcode)
30
 
        
31
 
 
32
 
def selftest(verbose=False):
33
 
    from unittest import TestLoader, TestSuite
34
 
    import bzrlib, bzrlib.store, bzrlib.inventory, bzrlib.branch
35
 
    import bzrlib.osutils, bzrlib.commands, bzrlib.merge3, bzrlib.plugin
36
 
    from doctest import DocTestSuite
37
 
    import os
38
 
    import shutil
39
 
    import time
40
 
    import sys
41
 
    import unittest
42
 
 
43
 
    global MODULES_TO_TEST, MODULES_TO_DOCTEST
44
 
 
45
 
    testmod_names = \
46
 
                  ['bzrlib.selftest.whitebox',
47
 
                   'bzrlib.selftest.versioning',
48
 
                   'bzrlib.selftest.testinv',
49
 
                   'bzrlib.selftest.testmerge3',
50
 
                   'bzrlib.selftest.testhashcache',
51
 
                   'bzrlib.selftest.teststatus',
52
 
                   'bzrlib.selftest.testlog',
53
 
                   'bzrlib.selftest.blackbox',
54
 
                   'bzrlib.selftest.testrevisionnamespaces',
55
 
                   'bzrlib.selftest.testbranch',
56
 
                   'bzrlib.selftest.testrevision',
57
 
                   'bzrlib.merge_core',
58
 
                   'bzrlib.selftest.testdiff',
 
1283
        """Invoke bzr, as if it were run from the command line.
 
1284
 
 
1285
        This should be the main method for tests that want to exercise the
 
1286
        overall behavior of the bzr application (rather than a unit test
 
1287
        or a functional test of the library.)
 
1288
 
 
1289
        This sends the stdout/stderr results into the test's log,
 
1290
        where it may be useful for debugging.  See also run_captured.
 
1291
 
 
1292
        :param stdin: A string to be used as stdin for the command.
 
1293
        :param retcode: The status code the command should return
 
1294
        :param working_dir: The directory to run the command in
 
1295
        """
 
1296
        retcode = kwargs.pop('retcode', 0)
 
1297
        encoding = kwargs.pop('encoding', None)
 
1298
        stdin = kwargs.pop('stdin', None)
 
1299
        working_dir = kwargs.pop('working_dir', None)
 
1300
        error_regexes = kwargs.pop('error_regexes', [])
 
1301
 
 
1302
        out, err = self.run_bzr_captured(args, retcode=retcode,
 
1303
            encoding=encoding, stdin=stdin, working_dir=working_dir)
 
1304
 
 
1305
        for regex in error_regexes:
 
1306
            self.assertContainsRe(err, regex)
 
1307
        return out, err
 
1308
 
 
1309
 
 
1310
    def run_bzr_decode(self, *args, **kwargs):
 
1311
        if 'encoding' in kwargs:
 
1312
            encoding = kwargs['encoding']
 
1313
        else:
 
1314
            encoding = bzrlib.user_encoding
 
1315
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
 
1316
 
 
1317
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1318
        """Run bzr, and check that stderr contains the supplied regexes
 
1319
        
 
1320
        :param error_regexes: Sequence of regular expressions which 
 
1321
            must each be found in the error output. The relative ordering
 
1322
            is not enforced.
 
1323
        :param args: command-line arguments for bzr
 
1324
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1325
            This function changes the default value of retcode to be 3,
 
1326
            since in most cases this is run when you expect bzr to fail.
 
1327
        :return: (out, err) The actual output of running the command (in case you
 
1328
                 want to do more inspection)
 
1329
 
 
1330
        Examples of use:
 
1331
            # Make sure that commit is failing because there is nothing to do
 
1332
            self.run_bzr_error(['no changes to commit'],
 
1333
                               'commit', '-m', 'my commit comment')
 
1334
            # Make sure --strict is handling an unknown file, rather than
 
1335
            # giving us the 'nothing to do' error
 
1336
            self.build_tree(['unknown'])
 
1337
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1338
                               'commit', '--strict', '-m', 'my commit comment')
 
1339
        """
 
1340
        kwargs.setdefault('retcode', 3)
 
1341
        out, err = self.run_bzr(error_regexes=error_regexes, *args, **kwargs)
 
1342
        return out, err
 
1343
 
 
1344
    def run_bzr_subprocess(self, *args, **kwargs):
 
1345
        """Run bzr in a subprocess for testing.
 
1346
 
 
1347
        This starts a new Python interpreter and runs bzr in there. 
 
1348
        This should only be used for tests that have a justifiable need for
 
1349
        this isolation: e.g. they are testing startup time, or signal
 
1350
        handling, or early startup code, etc.  Subprocess code can't be 
 
1351
        profiled or debugged so easily.
 
1352
 
 
1353
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1354
            None is supplied, the status code is not checked.
 
1355
        :param env_changes: A dictionary which lists changes to environment
 
1356
            variables. A value of None will unset the env variable.
 
1357
            The values must be strings. The change will only occur in the
 
1358
            child, so you don't need to fix the environment after running.
 
1359
        :param universal_newlines: Convert CRLF => LF
 
1360
        :param allow_plugins: By default the subprocess is run with
 
1361
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1362
            for system-wide plugins to create unexpected output on stderr,
 
1363
            which can cause unnecessary test failures.
 
1364
        """
 
1365
        env_changes = kwargs.get('env_changes', {})
 
1366
        working_dir = kwargs.get('working_dir', None)
 
1367
        allow_plugins = kwargs.get('allow_plugins', False)
 
1368
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1369
                                            working_dir=working_dir,
 
1370
                                            allow_plugins=allow_plugins)
 
1371
        # We distinguish between retcode=None and retcode not passed.
 
1372
        supplied_retcode = kwargs.get('retcode', 0)
 
1373
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1374
            universal_newlines=kwargs.get('universal_newlines', False),
 
1375
            process_args=args)
 
1376
 
 
1377
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1378
                             skip_if_plan_to_signal=False,
 
1379
                             working_dir=None,
 
1380
                             allow_plugins=False):
 
1381
        """Start bzr in a subprocess for testing.
 
1382
 
 
1383
        This starts a new Python interpreter and runs bzr in there.
 
1384
        This should only be used for tests that have a justifiable need for
 
1385
        this isolation: e.g. they are testing startup time, or signal
 
1386
        handling, or early startup code, etc.  Subprocess code can't be
 
1387
        profiled or debugged so easily.
 
1388
 
 
1389
        :param process_args: a list of arguments to pass to the bzr executable,
 
1390
            for example `['--version']`.
 
1391
        :param env_changes: A dictionary which lists changes to environment
 
1392
            variables. A value of None will unset the env variable.
 
1393
            The values must be strings. The change will only occur in the
 
1394
            child, so you don't need to fix the environment after running.
 
1395
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1396
            is not available.
 
1397
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1398
 
 
1399
        :returns: Popen object for the started process.
 
1400
        """
 
1401
        if skip_if_plan_to_signal:
 
1402
            if not getattr(os, 'kill', None):
 
1403
                raise TestSkipped("os.kill not available.")
 
1404
 
 
1405
        if env_changes is None:
 
1406
            env_changes = {}
 
1407
        old_env = {}
 
1408
 
 
1409
        def cleanup_environment():
 
1410
            for env_var, value in env_changes.iteritems():
 
1411
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1412
 
 
1413
        def restore_environment():
 
1414
            for env_var, value in old_env.iteritems():
 
1415
                osutils.set_or_unset_env(env_var, value)
 
1416
 
 
1417
        bzr_path = self.get_bzr_path()
 
1418
 
 
1419
        cwd = None
 
1420
        if working_dir is not None:
 
1421
            cwd = osutils.getcwd()
 
1422
            os.chdir(working_dir)
 
1423
 
 
1424
        try:
 
1425
            # win32 subprocess doesn't support preexec_fn
 
1426
            # so we will avoid using it on all platforms, just to
 
1427
            # make sure the code path is used, and we don't break on win32
 
1428
            cleanup_environment()
 
1429
            command = [sys.executable, bzr_path]
 
1430
            if not allow_plugins:
 
1431
                command.append('--no-plugins')
 
1432
            command.extend(process_args)
 
1433
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1434
        finally:
 
1435
            restore_environment()
 
1436
            if cwd is not None:
 
1437
                os.chdir(cwd)
 
1438
 
 
1439
        return process
 
1440
 
 
1441
    def _popen(self, *args, **kwargs):
 
1442
        """Place a call to Popen.
 
1443
 
 
1444
        Allows tests to override this method to intercept the calls made to
 
1445
        Popen for introspection.
 
1446
        """
 
1447
        return Popen(*args, **kwargs)
 
1448
 
 
1449
    def get_bzr_path(self):
 
1450
        """Return the path of the 'bzr' executable for this test suite."""
 
1451
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1452
        if not os.path.isfile(bzr_path):
 
1453
            # We are probably installed. Assume sys.argv is the right file
 
1454
            bzr_path = sys.argv[0]
 
1455
        return bzr_path
 
1456
 
 
1457
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1458
                              universal_newlines=False, process_args=None):
 
1459
        """Finish the execution of process.
 
1460
 
 
1461
        :param process: the Popen object returned from start_bzr_subprocess.
 
1462
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1463
            None is supplied, the status code is not checked.
 
1464
        :param send_signal: an optional signal to send to the process.
 
1465
        :param universal_newlines: Convert CRLF => LF
 
1466
        :returns: (stdout, stderr)
 
1467
        """
 
1468
        if send_signal is not None:
 
1469
            os.kill(process.pid, send_signal)
 
1470
        out, err = process.communicate()
 
1471
 
 
1472
        if universal_newlines:
 
1473
            out = out.replace('\r\n', '\n')
 
1474
            err = err.replace('\r\n', '\n')
 
1475
 
 
1476
        if retcode is not None and retcode != process.returncode:
 
1477
            if process_args is None:
 
1478
                process_args = "(unknown args)"
 
1479
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1480
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1481
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1482
                      % (process_args, retcode, process.returncode))
 
1483
        return [out, err]
 
1484
 
 
1485
    def check_inventory_shape(self, inv, shape):
 
1486
        """Compare an inventory to a list of expected names.
 
1487
 
 
1488
        Fail if they are not precisely equal.
 
1489
        """
 
1490
        extras = []
 
1491
        shape = list(shape)             # copy
 
1492
        for path, ie in inv.entries():
 
1493
            name = path.replace('\\', '/')
 
1494
            if ie.kind == 'dir':
 
1495
                name = name + '/'
 
1496
            if name in shape:
 
1497
                shape.remove(name)
 
1498
            else:
 
1499
                extras.append(name)
 
1500
        if shape:
 
1501
            self.fail("expected paths not found in inventory: %r" % shape)
 
1502
        if extras:
 
1503
            self.fail("unexpected paths found in inventory: %r" % extras)
 
1504
 
 
1505
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1506
                         a_callable=None, *args, **kwargs):
 
1507
        """Call callable with redirected std io pipes.
 
1508
 
 
1509
        Returns the return code."""
 
1510
        if not callable(a_callable):
 
1511
            raise ValueError("a_callable must be callable.")
 
1512
        if stdin is None:
 
1513
            stdin = StringIO("")
 
1514
        if stdout is None:
 
1515
            if getattr(self, "_log_file", None) is not None:
 
1516
                stdout = self._log_file
 
1517
            else:
 
1518
                stdout = StringIO()
 
1519
        if stderr is None:
 
1520
            if getattr(self, "_log_file", None is not None):
 
1521
                stderr = self._log_file
 
1522
            else:
 
1523
                stderr = StringIO()
 
1524
        real_stdin = sys.stdin
 
1525
        real_stdout = sys.stdout
 
1526
        real_stderr = sys.stderr
 
1527
        try:
 
1528
            sys.stdout = stdout
 
1529
            sys.stderr = stderr
 
1530
            sys.stdin = stdin
 
1531
            return a_callable(*args, **kwargs)
 
1532
        finally:
 
1533
            sys.stdout = real_stdout
 
1534
            sys.stderr = real_stderr
 
1535
            sys.stdin = real_stdin
 
1536
 
 
1537
    @symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
 
1538
    def merge(self, branch_from, wt_to):
 
1539
        """A helper for tests to do a ui-less merge.
 
1540
 
 
1541
        This should move to the main library when someone has time to integrate
 
1542
        it in.
 
1543
        """
 
1544
        # minimal ui-less merge.
 
1545
        wt_to.branch.fetch(branch_from)
 
1546
        base_rev = common_ancestor(branch_from.last_revision(),
 
1547
                                   wt_to.branch.last_revision(),
 
1548
                                   wt_to.branch.repository)
 
1549
        merge_inner(wt_to.branch, branch_from.basis_tree(),
 
1550
                    wt_to.branch.repository.revision_tree(base_rev),
 
1551
                    this_tree=wt_to)
 
1552
        wt_to.add_parent_tree_id(branch_from.last_revision())
 
1553
 
 
1554
    def reduceLockdirTimeout(self):
 
1555
        """Reduce the default lock timeout for the duration of the test, so that
 
1556
        if LockContention occurs during a test, it does so quickly.
 
1557
 
 
1558
        Tests that expect to provoke LockContention errors should call this.
 
1559
        """
 
1560
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
1561
        def resetTimeout():
 
1562
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
1563
        self.addCleanup(resetTimeout)
 
1564
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
1565
 
 
1566
BzrTestBase = TestCase
 
1567
 
 
1568
 
 
1569
class TestCaseWithMemoryTransport(TestCase):
 
1570
    """Common test class for tests that do not need disk resources.
 
1571
 
 
1572
    Tests that need disk resources should derive from TestCaseWithTransport.
 
1573
 
 
1574
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
1575
 
 
1576
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
1577
    a directory which does not exist. This serves to help ensure test isolation
 
1578
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
1579
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
1580
    file defaults for the transport in tests, nor does it obey the command line
 
1581
    override, so tests that accidentally write to the common directory should
 
1582
    be rare.
 
1583
 
 
1584
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
1585
    a .bzr directory that stops us ascending higher into the filesystem.
 
1586
    """
 
1587
 
 
1588
    TEST_ROOT = None
 
1589
    _TEST_NAME = 'test'
 
1590
 
 
1591
    def __init__(self, methodName='runTest'):
 
1592
        # allow test parameterisation after test construction and before test
 
1593
        # execution. Variables that the parameteriser sets need to be 
 
1594
        # ones that are not set by setUp, or setUp will trash them.
 
1595
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
1596
        self.vfs_transport_factory = default_transport
 
1597
        self.transport_server = None
 
1598
        self.transport_readonly_server = None
 
1599
        self.__vfs_server = None
 
1600
 
 
1601
    def get_transport(self):
 
1602
        """Return a writeable transport for the test scratch space"""
 
1603
        t = get_transport(self.get_url())
 
1604
        self.assertFalse(t.is_readonly())
 
1605
        return t
 
1606
 
 
1607
    def get_readonly_transport(self):
 
1608
        """Return a readonly transport for the test scratch space
 
1609
        
 
1610
        This can be used to test that operations which should only need
 
1611
        readonly access in fact do not try to write.
 
1612
        """
 
1613
        t = get_transport(self.get_readonly_url())
 
1614
        self.assertTrue(t.is_readonly())
 
1615
        return t
 
1616
 
 
1617
    def create_transport_readonly_server(self):
 
1618
        """Create a transport server from class defined at init.
 
1619
 
 
1620
        This is mostly a hook for daughter classes.
 
1621
        """
 
1622
        return self.transport_readonly_server()
 
1623
 
 
1624
    def get_readonly_server(self):
 
1625
        """Get the server instance for the readonly transport
 
1626
 
 
1627
        This is useful for some tests with specific servers to do diagnostics.
 
1628
        """
 
1629
        if self.__readonly_server is None:
 
1630
            if self.transport_readonly_server is None:
 
1631
                # readonly decorator requested
 
1632
                # bring up the server
 
1633
                self.__readonly_server = ReadonlyServer()
 
1634
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1635
            else:
 
1636
                self.__readonly_server = self.create_transport_readonly_server()
 
1637
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1638
            self.addCleanup(self.__readonly_server.tearDown)
 
1639
        return self.__readonly_server
 
1640
 
 
1641
    def get_readonly_url(self, relpath=None):
 
1642
        """Get a URL for the readonly transport.
 
1643
 
 
1644
        This will either be backed by '.' or a decorator to the transport 
 
1645
        used by self.get_url()
 
1646
        relpath provides for clients to get a path relative to the base url.
 
1647
        These should only be downwards relative, not upwards.
 
1648
        """
 
1649
        base = self.get_readonly_server().get_url()
 
1650
        if relpath is not None:
 
1651
            if not base.endswith('/'):
 
1652
                base = base + '/'
 
1653
            base = base + relpath
 
1654
        return base
 
1655
 
 
1656
    def get_vfs_only_server(self):
 
1657
        """Get the vfs only read/write server instance.
 
1658
 
 
1659
        This is useful for some tests with specific servers that need
 
1660
        diagnostics.
 
1661
 
 
1662
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
1663
        is no means to override it.
 
1664
        """
 
1665
        if self.__vfs_server is None:
 
1666
            self.__vfs_server = MemoryServer()
 
1667
            self.__vfs_server.setUp()
 
1668
            self.addCleanup(self.__vfs_server.tearDown)
 
1669
        return self.__vfs_server
 
1670
 
 
1671
    def get_server(self):
 
1672
        """Get the read/write server instance.
 
1673
 
 
1674
        This is useful for some tests with specific servers that need
 
1675
        diagnostics.
 
1676
 
 
1677
        This is built from the self.transport_server factory. If that is None,
 
1678
        then the self.get_vfs_server is returned.
 
1679
        """
 
1680
        if self.__server is None:
 
1681
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
 
1682
                return self.get_vfs_only_server()
 
1683
            else:
 
1684
                # bring up a decorated means of access to the vfs only server.
 
1685
                self.__server = self.transport_server()
 
1686
                try:
 
1687
                    self.__server.setUp(self.get_vfs_only_server())
 
1688
                except TypeError, e:
 
1689
                    # This should never happen; the try:Except here is to assist
 
1690
                    # developers having to update code rather than seeing an
 
1691
                    # uninformative TypeError.
 
1692
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
 
1693
            self.addCleanup(self.__server.tearDown)
 
1694
        return self.__server
 
1695
 
 
1696
    def _adjust_url(self, base, relpath):
 
1697
        """Get a URL (or maybe a path) for the readwrite transport.
 
1698
 
 
1699
        This will either be backed by '.' or to an equivalent non-file based
 
1700
        facility.
 
1701
        relpath provides for clients to get a path relative to the base url.
 
1702
        These should only be downwards relative, not upwards.
 
1703
        """
 
1704
        if relpath is not None and relpath != '.':
 
1705
            if not base.endswith('/'):
 
1706
                base = base + '/'
 
1707
            # XXX: Really base should be a url; we did after all call
 
1708
            # get_url()!  But sometimes it's just a path (from
 
1709
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
1710
            # to a non-escaped local path.
 
1711
            if base.startswith('./') or base.startswith('/'):
 
1712
                base += relpath
 
1713
            else:
 
1714
                base += urlutils.escape(relpath)
 
1715
        return base
 
1716
 
 
1717
    def get_url(self, relpath=None):
 
1718
        """Get a URL (or maybe a path) for the readwrite transport.
 
1719
 
 
1720
        This will either be backed by '.' or to an equivalent non-file based
 
1721
        facility.
 
1722
        relpath provides for clients to get a path relative to the base url.
 
1723
        These should only be downwards relative, not upwards.
 
1724
        """
 
1725
        base = self.get_server().get_url()
 
1726
        return self._adjust_url(base, relpath)
 
1727
 
 
1728
    def get_vfs_only_url(self, relpath=None):
 
1729
        """Get a URL (or maybe a path for the plain old vfs transport.
 
1730
 
 
1731
        This will never be a smart protocol.  It always has all the
 
1732
        capabilities of the local filesystem, but it might actually be a
 
1733
        MemoryTransport or some other similar virtual filesystem.
 
1734
 
 
1735
        This is the backing transport (if any) of the server returned by 
 
1736
        get_url and get_readonly_url.
 
1737
 
 
1738
        :param relpath: provides for clients to get a path relative to the base
 
1739
            url.  These should only be downwards relative, not upwards.
 
1740
        """
 
1741
        base = self.get_vfs_only_server().get_url()
 
1742
        return self._adjust_url(base, relpath)
 
1743
 
 
1744
    def _make_test_root(self):
 
1745
        if TestCaseWithMemoryTransport.TEST_ROOT is not None:
 
1746
            return
 
1747
        root = tempfile.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
1748
        TestCaseWithMemoryTransport.TEST_ROOT = root
 
1749
        
 
1750
        # make a fake bzr directory there to prevent any tests propagating
 
1751
        # up onto the source directory's real branch
 
1752
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
1753
 
 
1754
        # The same directory is used by all tests, and we're not specifically
 
1755
        # told when all tests are finished.  This will do.
 
1756
        atexit.register(_rmtree_temp_dir, root)
 
1757
 
 
1758
    def makeAndChdirToTestDir(self):
 
1759
        """Create a temporary directories for this one test.
 
1760
        
 
1761
        This must set self.test_home_dir and self.test_dir and chdir to
 
1762
        self.test_dir.
 
1763
        
 
1764
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
1765
        """
 
1766
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
1767
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
1768
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
1769
        
 
1770
    def make_branch(self, relpath, format=None):
 
1771
        """Create a branch on the transport at relpath."""
 
1772
        repo = self.make_repository(relpath, format=format)
 
1773
        return repo.bzrdir.create_branch()
 
1774
 
 
1775
    def make_bzrdir(self, relpath, format=None):
 
1776
        try:
 
1777
            # might be a relative or absolute path
 
1778
            maybe_a_url = self.get_url(relpath)
 
1779
            segments = maybe_a_url.rsplit('/', 1)
 
1780
            t = get_transport(maybe_a_url)
 
1781
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
1782
                t.ensure_base()
 
1783
            if format is None:
 
1784
                format = 'default'
 
1785
            if isinstance(format, basestring):
 
1786
                format = bzrdir.format_registry.make_bzrdir(format)
 
1787
            return format.initialize_on_transport(t)
 
1788
        except errors.UninitializableFormat:
 
1789
            raise TestSkipped("Format %s is not initializable." % format)
 
1790
 
 
1791
    def make_repository(self, relpath, shared=False, format=None):
 
1792
        """Create a repository on our default transport at relpath.
 
1793
        
 
1794
        Note that relpath must be a relative path, not a full url.
 
1795
        """
 
1796
        # FIXME: If you create a remoterepository this returns the underlying
 
1797
        # real format, which is incorrect.  Actually we should make sure that 
 
1798
        # RemoteBzrDir returns a RemoteRepository.
 
1799
        # maybe  mbp 20070410
 
1800
        made_control = self.make_bzrdir(relpath, format=format)
 
1801
        return made_control.create_repository(shared=shared)
 
1802
 
 
1803
    def make_branch_and_memory_tree(self, relpath, format=None):
 
1804
        """Create a branch on the default transport and a MemoryTree for it."""
 
1805
        b = self.make_branch(relpath, format=format)
 
1806
        return memorytree.MemoryTree.create_on_branch(b)
 
1807
 
 
1808
    def overrideEnvironmentForTesting(self):
 
1809
        os.environ['HOME'] = self.test_home_dir
 
1810
        os.environ['BZR_HOME'] = self.test_home_dir
 
1811
        
 
1812
    def setUp(self):
 
1813
        super(TestCaseWithMemoryTransport, self).setUp()
 
1814
        self._make_test_root()
 
1815
        _currentdir = os.getcwdu()
 
1816
        def _leaveDirectory():
 
1817
            os.chdir(_currentdir)
 
1818
        self.addCleanup(_leaveDirectory)
 
1819
        self.makeAndChdirToTestDir()
 
1820
        self.overrideEnvironmentForTesting()
 
1821
        self.__readonly_server = None
 
1822
        self.__server = None
 
1823
        self.reduceLockdirTimeout()
 
1824
 
 
1825
     
 
1826
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
1827
    """Derived class that runs a test within a temporary directory.
 
1828
 
 
1829
    This is useful for tests that need to create a branch, etc.
 
1830
 
 
1831
    The directory is created in a slightly complex way: for each
 
1832
    Python invocation, a new temporary top-level directory is created.
 
1833
    All test cases create their own directory within that.  If the
 
1834
    tests complete successfully, the directory is removed.
 
1835
 
 
1836
    :ivar test_base_dir: The path of the top-level directory for this 
 
1837
    test, which contains a home directory and a work directory.
 
1838
 
 
1839
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
1840
    which is used as $HOME for this test.
 
1841
 
 
1842
    :ivar test_dir: A directory under test_base_dir used as the current
 
1843
    directory when the test proper is run.
 
1844
    """
 
1845
 
 
1846
    OVERRIDE_PYTHON = 'python'
 
1847
    use_numbered_dirs = False
 
1848
 
 
1849
    def check_file_contents(self, filename, expect):
 
1850
        self.log("check contents of file %s" % filename)
 
1851
        contents = file(filename, 'r').read()
 
1852
        if contents != expect:
 
1853
            self.log("expected: %r" % expect)
 
1854
            self.log("actually: %r" % contents)
 
1855
            self.fail("contents of %s not as expected" % filename)
 
1856
 
 
1857
    def makeAndChdirToTestDir(self):
 
1858
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
1859
        
 
1860
        For TestCaseInTempDir we create a temporary directory based on the test
 
1861
        name and then create two subdirs - test and home under it.
 
1862
        """
 
1863
        # create a directory within the top level test directory
 
1864
        candidate_dir = tempfile.mkdtemp(dir=self.TEST_ROOT)
 
1865
        # now create test and home directories within this dir
 
1866
        self.test_base_dir = candidate_dir
 
1867
        self.test_home_dir = self.test_base_dir + '/home'
 
1868
        os.mkdir(self.test_home_dir)
 
1869
        self.test_dir = self.test_base_dir + '/work'
 
1870
        os.mkdir(self.test_dir)
 
1871
        os.chdir(self.test_dir)
 
1872
        # put name of test inside
 
1873
        f = file(self.test_base_dir + '/name', 'w')
 
1874
        try:
 
1875
            f.write(self.id())
 
1876
        finally:
 
1877
            f.close()
 
1878
        self.addCleanup(self.deleteTestDir)
 
1879
 
 
1880
    def deleteTestDir(self):
 
1881
        _rmtree_temp_dir(self.test_base_dir)
 
1882
 
 
1883
    def build_tree(self, shape, line_endings='binary', transport=None):
 
1884
        """Build a test tree according to a pattern.
 
1885
 
 
1886
        shape is a sequence of file specifications.  If the final
 
1887
        character is '/', a directory is created.
 
1888
 
 
1889
        This assumes that all the elements in the tree being built are new.
 
1890
 
 
1891
        This doesn't add anything to a branch.
 
1892
        :param line_endings: Either 'binary' or 'native'
 
1893
                             in binary mode, exact contents are written
 
1894
                             in native mode, the line endings match the
 
1895
                             default platform endings.
 
1896
 
 
1897
        :param transport: A transport to write to, for building trees on 
 
1898
                          VFS's. If the transport is readonly or None,
 
1899
                          "." is opened automatically.
 
1900
        """
 
1901
        # It's OK to just create them using forward slashes on windows.
 
1902
        if transport is None or transport.is_readonly():
 
1903
            transport = get_transport(".")
 
1904
        for name in shape:
 
1905
            self.assert_(isinstance(name, basestring))
 
1906
            if name[-1] == '/':
 
1907
                transport.mkdir(urlutils.escape(name[:-1]))
 
1908
            else:
 
1909
                if line_endings == 'binary':
 
1910
                    end = '\n'
 
1911
                elif line_endings == 'native':
 
1912
                    end = os.linesep
 
1913
                else:
 
1914
                    raise errors.BzrError(
 
1915
                        'Invalid line ending request %r' % line_endings)
 
1916
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
1917
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
1918
 
 
1919
    def build_tree_contents(self, shape):
 
1920
        build_tree_contents(shape)
 
1921
 
 
1922
    def assertFileEqual(self, content, path):
 
1923
        """Fail if path does not contain 'content'."""
 
1924
        self.failUnlessExists(path)
 
1925
        f = file(path, 'rb')
 
1926
        try:
 
1927
            s = f.read()
 
1928
        finally:
 
1929
            f.close()
 
1930
        self.assertEqualDiff(content, s)
 
1931
 
 
1932
    def failUnlessExists(self, path):
 
1933
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1934
        if not isinstance(path, basestring):
 
1935
            for p in path:
 
1936
                self.failUnlessExists(p)
 
1937
        else:
 
1938
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
1939
 
 
1940
    def failIfExists(self, path):
 
1941
        """Fail if path or paths, which may be abs or relative, exist."""
 
1942
        if not isinstance(path, basestring):
 
1943
            for p in path:
 
1944
                self.failIfExists(p)
 
1945
        else:
 
1946
            self.failIf(osutils.lexists(path),path+" exists")
 
1947
 
 
1948
    def assertInWorkingTree(self,path,root_path='.',tree=None):
 
1949
        """Assert whether path or paths are in the WorkingTree"""
 
1950
        if tree is None:
 
1951
            tree = workingtree.WorkingTree.open(root_path)
 
1952
        if not isinstance(path, basestring):
 
1953
            for p in path:
 
1954
                self.assertInWorkingTree(p,tree=tree)
 
1955
        else:
 
1956
            self.assertIsNot(tree.path2id(path), None,
 
1957
                path+' not in working tree.')
 
1958
 
 
1959
    def assertNotInWorkingTree(self,path,root_path='.',tree=None):
 
1960
        """Assert whether path or paths are not in the WorkingTree"""
 
1961
        if tree is None:
 
1962
            tree = workingtree.WorkingTree.open(root_path)
 
1963
        if not isinstance(path, basestring):
 
1964
            for p in path:
 
1965
                self.assertNotInWorkingTree(p,tree=tree)
 
1966
        else:
 
1967
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
1968
 
 
1969
 
 
1970
class TestCaseWithTransport(TestCaseInTempDir):
 
1971
    """A test case that provides get_url and get_readonly_url facilities.
 
1972
 
 
1973
    These back onto two transport servers, one for readonly access and one for
 
1974
    read write access.
 
1975
 
 
1976
    If no explicit class is provided for readonly access, a
 
1977
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
1978
    based read write transports.
 
1979
 
 
1980
    If an explicit class is provided for readonly access, that server and the 
 
1981
    readwrite one must both define get_url() as resolving to os.getcwd().
 
1982
    """
 
1983
 
 
1984
    def get_vfs_only_server(self):
 
1985
        """See TestCaseWithMemoryTransport.
 
1986
 
 
1987
        This is useful for some tests with specific servers that need
 
1988
        diagnostics.
 
1989
        """
 
1990
        if self.__vfs_server is None:
 
1991
            self.__vfs_server = self.vfs_transport_factory()
 
1992
            self.__vfs_server.setUp()
 
1993
            self.addCleanup(self.__vfs_server.tearDown)
 
1994
        return self.__vfs_server
 
1995
 
 
1996
    def make_branch_and_tree(self, relpath, format=None):
 
1997
        """Create a branch on the transport and a tree locally.
 
1998
 
 
1999
        If the transport is not a LocalTransport, the Tree can't be created on
 
2000
        the transport.  In that case if the vfs_transport_factory is
 
2001
        LocalURLServer the working tree is created in the local
 
2002
        directory backing the transport, and the returned tree's branch and
 
2003
        repository will also be accessed locally. Otherwise a lightweight
 
2004
        checkout is created and returned.
 
2005
 
 
2006
        :param format: The BzrDirFormat.
 
2007
        :returns: the WorkingTree.
 
2008
        """
 
2009
        # TODO: always use the local disk path for the working tree,
 
2010
        # this obviously requires a format that supports branch references
 
2011
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2012
        # RBC 20060208
 
2013
        b = self.make_branch(relpath, format=format)
 
2014
        try:
 
2015
            return b.bzrdir.create_workingtree()
 
2016
        except errors.NotLocalUrl:
 
2017
            # We can only make working trees locally at the moment.  If the
 
2018
            # transport can't support them, then we keep the non-disk-backed
 
2019
            # branch and create a local checkout.
 
2020
            if self.vfs_transport_factory is LocalURLServer:
 
2021
                # the branch is colocated on disk, we cannot create a checkout.
 
2022
                # hopefully callers will expect this.
 
2023
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2024
                return local_controldir.create_workingtree()
 
2025
            else:
 
2026
                return b.create_checkout(relpath, lightweight=True)
 
2027
 
 
2028
    def assertIsDirectory(self, relpath, transport):
 
2029
        """Assert that relpath within transport is a directory.
 
2030
 
 
2031
        This may not be possible on all transports; in that case it propagates
 
2032
        a TransportNotPossible.
 
2033
        """
 
2034
        try:
 
2035
            mode = transport.stat(relpath).st_mode
 
2036
        except errors.NoSuchFile:
 
2037
            self.fail("path %s is not a directory; no such file"
 
2038
                      % (relpath))
 
2039
        if not stat.S_ISDIR(mode):
 
2040
            self.fail("path %s is not a directory; has mode %#o"
 
2041
                      % (relpath, mode))
 
2042
 
 
2043
    def assertTreesEqual(self, left, right):
 
2044
        """Check that left and right have the same content and properties."""
 
2045
        # we use a tree delta to check for equality of the content, and we
 
2046
        # manually check for equality of other things such as the parents list.
 
2047
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2048
        differences = left.changes_from(right)
 
2049
        self.assertFalse(differences.has_changed(),
 
2050
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2051
 
 
2052
    def setUp(self):
 
2053
        super(TestCaseWithTransport, self).setUp()
 
2054
        self.__vfs_server = None
 
2055
 
 
2056
 
 
2057
class ChrootedTestCase(TestCaseWithTransport):
 
2058
    """A support class that provides readonly urls outside the local namespace.
 
2059
 
 
2060
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2061
    is then we are chrooted already, if it is not then an HttpServer is used
 
2062
    for readonly urls.
 
2063
 
 
2064
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2065
                       be used without needed to redo it when a different 
 
2066
                       subclass is in use ?
 
2067
    """
 
2068
 
 
2069
    def setUp(self):
 
2070
        super(ChrootedTestCase, self).setUp()
 
2071
        if not self.vfs_transport_factory == MemoryServer:
 
2072
            self.transport_readonly_server = HttpServer
 
2073
 
 
2074
 
 
2075
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
 
2076
                       random_order=False):
 
2077
    """Create a test suite by filtering another one.
 
2078
    
 
2079
    :param suite:           the source suite
 
2080
    :param pattern:         pattern that names must match
 
2081
    :param exclude_pattern: pattern that names must not match, if any
 
2082
    :param random_order:    if True, tests in the new suite will be put in
 
2083
                            random order
 
2084
    :returns: the newly created suite
 
2085
    """ 
 
2086
    return sort_suite_by_re(suite, pattern, exclude_pattern,
 
2087
        random_order, False)
 
2088
 
 
2089
 
 
2090
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
 
2091
                     random_order=False, append_rest=True):
 
2092
    """Create a test suite by sorting another one.
 
2093
    
 
2094
    :param suite:           the source suite
 
2095
    :param pattern:         pattern that names must match in order to go
 
2096
                            first in the new suite
 
2097
    :param exclude_pattern: pattern that names must not match, if any
 
2098
    :param random_order:    if True, tests in the new suite will be put in
 
2099
                            random order
 
2100
    :param append_rest:     if False, pattern is a strict filter and not
 
2101
                            just an ordering directive
 
2102
    :returns: the newly created suite
 
2103
    """ 
 
2104
    first = []
 
2105
    second = []
 
2106
    filter_re = re.compile(pattern)
 
2107
    if exclude_pattern is not None:
 
2108
        exclude_re = re.compile(exclude_pattern)
 
2109
    for test in iter_suite_tests(suite):
 
2110
        test_id = test.id()
 
2111
        if exclude_pattern is None or not exclude_re.search(test_id):
 
2112
            if filter_re.search(test_id):
 
2113
                first.append(test)
 
2114
            elif append_rest:
 
2115
                second.append(test)
 
2116
    if random_order:
 
2117
        random.shuffle(first)
 
2118
        random.shuffle(second)
 
2119
    return TestUtil.TestSuite(first + second)
 
2120
 
 
2121
 
 
2122
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2123
              stop_on_failure=False,
 
2124
              transport=None, lsprof_timed=None, bench_history=None,
 
2125
              matching_tests_first=None,
 
2126
              numbered_dirs=None,
 
2127
              list_only=False,
 
2128
              random_seed=None,
 
2129
              exclude_pattern=None,
 
2130
              ):
 
2131
    use_numbered_dirs = bool(numbered_dirs)
 
2132
 
 
2133
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2134
    if numbered_dirs is not None:
 
2135
        TestCaseInTempDir.use_numbered_dirs = use_numbered_dirs
 
2136
    if verbose:
 
2137
        verbosity = 2
 
2138
    else:
 
2139
        verbosity = 1
 
2140
    runner = TextTestRunner(stream=sys.stdout,
 
2141
                            descriptions=0,
 
2142
                            verbosity=verbosity,
 
2143
                            bench_history=bench_history,
 
2144
                            use_numbered_dirs=use_numbered_dirs,
 
2145
                            list_only=list_only,
 
2146
                            )
 
2147
    runner.stop_on_failure=stop_on_failure
 
2148
    # Initialise the random number generator and display the seed used.
 
2149
    # We convert the seed to a long to make it reuseable across invocations.
 
2150
    random_order = False
 
2151
    if random_seed is not None:
 
2152
        random_order = True
 
2153
        if random_seed == "now":
 
2154
            random_seed = long(time.time())
 
2155
        else:
 
2156
            # Convert the seed to a long if we can
 
2157
            try:
 
2158
                random_seed = long(random_seed)
 
2159
            except:
 
2160
                pass
 
2161
        runner.stream.writeln("Randomizing test order using seed %s\n" %
 
2162
            (random_seed))
 
2163
        random.seed(random_seed)
 
2164
    # Customise the list of tests if requested
 
2165
    if pattern != '.*' or exclude_pattern is not None or random_order:
 
2166
        if matching_tests_first:
 
2167
            suite = sort_suite_by_re(suite, pattern, exclude_pattern,
 
2168
                random_order)
 
2169
        else:
 
2170
            suite = filter_suite_by_re(suite, pattern, exclude_pattern,
 
2171
                random_order)
 
2172
    result = runner.run(suite)
 
2173
    return result.wasSuccessful()
 
2174
 
 
2175
 
 
2176
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
2177
             transport=None,
 
2178
             test_suite_factory=None,
 
2179
             lsprof_timed=None,
 
2180
             bench_history=None,
 
2181
             matching_tests_first=None,
 
2182
             numbered_dirs=None,
 
2183
             list_only=False,
 
2184
             random_seed=None,
 
2185
             exclude_pattern=None):
 
2186
    """Run the whole test suite under the enhanced runner"""
 
2187
    # XXX: Very ugly way to do this...
 
2188
    # Disable warning about old formats because we don't want it to disturb
 
2189
    # any blackbox tests.
 
2190
    from bzrlib import repository
 
2191
    repository._deprecation_warning_done = True
 
2192
 
 
2193
    global default_transport
 
2194
    if transport is None:
 
2195
        transport = default_transport
 
2196
    old_transport = default_transport
 
2197
    default_transport = transport
 
2198
    try:
 
2199
        if test_suite_factory is None:
 
2200
            suite = test_suite()
 
2201
        else:
 
2202
            suite = test_suite_factory()
 
2203
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
2204
                     stop_on_failure=stop_on_failure,
 
2205
                     transport=transport,
 
2206
                     lsprof_timed=lsprof_timed,
 
2207
                     bench_history=bench_history,
 
2208
                     matching_tests_first=matching_tests_first,
 
2209
                     numbered_dirs=numbered_dirs,
 
2210
                     list_only=list_only,
 
2211
                     random_seed=random_seed,
 
2212
                     exclude_pattern=exclude_pattern)
 
2213
    finally:
 
2214
        default_transport = old_transport
 
2215
 
 
2216
 
 
2217
def test_suite():
 
2218
    """Build and return TestSuite for the whole of bzrlib.
 
2219
    
 
2220
    This function can be replaced if you need to change the default test
 
2221
    suite on a global basis, but it is not encouraged.
 
2222
    """
 
2223
    testmod_names = [
 
2224
                   'bzrlib.tests.test_ancestry',
 
2225
                   'bzrlib.tests.test_annotate',
 
2226
                   'bzrlib.tests.test_api',
 
2227
                   'bzrlib.tests.test_atomicfile',
 
2228
                   'bzrlib.tests.test_bad_files',
 
2229
                   'bzrlib.tests.test_branch',
 
2230
                   'bzrlib.tests.test_branchbuilder',
 
2231
                   'bzrlib.tests.test_bugtracker',
 
2232
                   'bzrlib.tests.test_bundle',
 
2233
                   'bzrlib.tests.test_bzrdir',
 
2234
                   'bzrlib.tests.test_cache_utf8',
 
2235
                   'bzrlib.tests.test_commands',
 
2236
                   'bzrlib.tests.test_commit',
 
2237
                   'bzrlib.tests.test_commit_merge',
 
2238
                   'bzrlib.tests.test_config',
 
2239
                   'bzrlib.tests.test_conflicts',
 
2240
                   'bzrlib.tests.test_counted_lock',
 
2241
                   'bzrlib.tests.test_decorators',
 
2242
                   'bzrlib.tests.test_delta',
 
2243
                   'bzrlib.tests.test_deprecated_graph',
 
2244
                   'bzrlib.tests.test_diff',
 
2245
                   'bzrlib.tests.test_dirstate',
 
2246
                   'bzrlib.tests.test_errors',
 
2247
                   'bzrlib.tests.test_escaped_store',
 
2248
                   'bzrlib.tests.test_extract',
 
2249
                   'bzrlib.tests.test_fetch',
 
2250
                   'bzrlib.tests.test_ftp_transport',
 
2251
                   'bzrlib.tests.test_generate_docs',
 
2252
                   'bzrlib.tests.test_generate_ids',
 
2253
                   'bzrlib.tests.test_globbing',
 
2254
                   'bzrlib.tests.test_gpg',
 
2255
                   'bzrlib.tests.test_graph',
 
2256
                   'bzrlib.tests.test_hashcache',
 
2257
                   'bzrlib.tests.test_help',
 
2258
                   'bzrlib.tests.test_http',
 
2259
                   'bzrlib.tests.test_http_response',
 
2260
                   'bzrlib.tests.test_https_ca_bundle',
 
2261
                   'bzrlib.tests.test_identitymap',
 
2262
                   'bzrlib.tests.test_ignores',
 
2263
                   'bzrlib.tests.test_info',
 
2264
                   'bzrlib.tests.test_inv',
 
2265
                   'bzrlib.tests.test_knit',
 
2266
                   'bzrlib.tests.test_lazy_import',
 
2267
                   'bzrlib.tests.test_lazy_regex',
 
2268
                   'bzrlib.tests.test_lockdir',
 
2269
                   'bzrlib.tests.test_lockable_files',
 
2270
                   'bzrlib.tests.test_log',
 
2271
                   'bzrlib.tests.test_lsprof',
 
2272
                   'bzrlib.tests.test_memorytree',
 
2273
                   'bzrlib.tests.test_merge',
 
2274
                   'bzrlib.tests.test_merge3',
 
2275
                   'bzrlib.tests.test_merge_core',
 
2276
                   'bzrlib.tests.test_merge_directive',
 
2277
                   'bzrlib.tests.test_missing',
 
2278
                   'bzrlib.tests.test_msgeditor',
 
2279
                   'bzrlib.tests.test_nonascii',
 
2280
                   'bzrlib.tests.test_options',
 
2281
                   'bzrlib.tests.test_osutils',
 
2282
                   'bzrlib.tests.test_osutils_encodings',
 
2283
                   'bzrlib.tests.test_patch',
 
2284
                   'bzrlib.tests.test_patches',
 
2285
                   'bzrlib.tests.test_permissions',
 
2286
                   'bzrlib.tests.test_plugins',
 
2287
                   'bzrlib.tests.test_progress',
 
2288
                   'bzrlib.tests.test_reconcile',
 
2289
                   'bzrlib.tests.test_registry',
 
2290
                   'bzrlib.tests.test_remote',
 
2291
                   'bzrlib.tests.test_repository',
 
2292
                   'bzrlib.tests.test_revert',
 
2293
                   'bzrlib.tests.test_revision',
 
2294
                   'bzrlib.tests.test_revisionnamespaces',
 
2295
                   'bzrlib.tests.test_revisiontree',
 
2296
                   'bzrlib.tests.test_rio',
 
2297
                   'bzrlib.tests.test_sampler',
 
2298
                   'bzrlib.tests.test_selftest',
 
2299
                   'bzrlib.tests.test_setup',
 
2300
                   'bzrlib.tests.test_sftp_transport',
 
2301
                   'bzrlib.tests.test_smart',
 
2302
                   'bzrlib.tests.test_smart_add',
 
2303
                   'bzrlib.tests.test_smart_transport',
 
2304
                   'bzrlib.tests.test_source',
 
2305
                   'bzrlib.tests.test_ssh_transport',
 
2306
                   'bzrlib.tests.test_status',
 
2307
                   'bzrlib.tests.test_store',
 
2308
                   'bzrlib.tests.test_strace',
 
2309
                   'bzrlib.tests.test_subsume',
 
2310
                   'bzrlib.tests.test_symbol_versioning',
 
2311
                   'bzrlib.tests.test_tag',
 
2312
                   'bzrlib.tests.test_testament',
 
2313
                   'bzrlib.tests.test_textfile',
 
2314
                   'bzrlib.tests.test_textmerge',
 
2315
                   'bzrlib.tests.test_timestamp',
 
2316
                   'bzrlib.tests.test_trace',
 
2317
                   'bzrlib.tests.test_transactions',
 
2318
                   'bzrlib.tests.test_transform',
 
2319
                   'bzrlib.tests.test_transport',
 
2320
                   'bzrlib.tests.test_tree',
 
2321
                   'bzrlib.tests.test_treebuilder',
 
2322
                   'bzrlib.tests.test_tsort',
 
2323
                   'bzrlib.tests.test_tuned_gzip',
 
2324
                   'bzrlib.tests.test_ui',
 
2325
                   'bzrlib.tests.test_upgrade',
 
2326
                   'bzrlib.tests.test_urlutils',
 
2327
                   'bzrlib.tests.test_versionedfile',
 
2328
                   'bzrlib.tests.test_version',
 
2329
                   'bzrlib.tests.test_version_info',
 
2330
                   'bzrlib.tests.test_weave',
 
2331
                   'bzrlib.tests.test_whitebox',
 
2332
                   'bzrlib.tests.test_workingtree',
 
2333
                   'bzrlib.tests.test_workingtree_4',
 
2334
                   'bzrlib.tests.test_wsgi',
 
2335
                   'bzrlib.tests.test_xml',
59
2336
                   ]
60
 
 
61
 
    # XXX: should also test bzrlib.merge_core, but they seem to be out
62
 
    # of date with the code.
63
 
 
64
 
    for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
65
 
              bzrlib.osutils, bzrlib.commands, bzrlib.merge3):
66
 
        if m not in MODULES_TO_DOCTEST:
67
 
            MODULES_TO_DOCTEST.append(m)
68
 
 
69
 
    
70
 
    TestBase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
71
 
    print '%-30s %s' % ('bzr binary', TestBase.BZRPATH)
72
 
 
73
 
    print
74
 
 
75
 
    suite = TestSuite()
76
 
 
77
 
    suite.addTest(TestLoader().loadTestsFromNames(testmod_names))
78
 
 
 
2337
    test_transport_implementations = [
 
2338
        'bzrlib.tests.test_transport_implementations',
 
2339
        'bzrlib.tests.test_read_bundle',
 
2340
        ]
 
2341
    suite = TestUtil.TestSuite()
 
2342
    loader = TestUtil.TestLoader()
 
2343
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
2344
    from bzrlib.transport import TransportTestProviderAdapter
 
2345
    adapter = TransportTestProviderAdapter()
 
2346
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
2347
    for package in packages_to_test():
 
2348
        suite.addTest(package.test_suite())
79
2349
    for m in MODULES_TO_TEST:
80
 
         suite.addTest(TestLoader().loadTestsFromModule(m))
81
 
 
82
 
    for m in (MODULES_TO_DOCTEST):
83
 
        suite.addTest(DocTestSuite(m))
84
 
 
85
 
    for p in bzrlib.plugin.all_plugins:
86
 
        if hasattr(p, 'test_suite'):
87
 
            suite.addTest(p.test_suite())
88
 
 
89
 
    import bzrlib.merge_core
90
 
    suite.addTest(unittest.makeSuite(bzrlib.merge_core.MergeTest, 'test_'))
91
 
 
92
 
    return run_suite(suite, 'testbzr', verbose=verbose)
93
 
 
94
 
 
95
 
 
 
2350
        suite.addTest(loader.loadTestsFromModule(m))
 
2351
    for m in MODULES_TO_DOCTEST:
 
2352
        try:
 
2353
            suite.addTest(doctest.DocTestSuite(m))
 
2354
        except ValueError, e:
 
2355
            print '**failed to get doctest for: %s\n%s' %(m,e)
 
2356
            raise
 
2357
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
2358
        if getattr(plugin, 'test_suite', None) is not None:
 
2359
            default_encoding = sys.getdefaultencoding()
 
2360
            try:
 
2361
                plugin_suite = plugin.test_suite()
 
2362
            except ImportError, e:
 
2363
                bzrlib.trace.warning(
 
2364
                    'Unable to test plugin "%s": %s', name, e)
 
2365
            else:
 
2366
                suite.addTest(plugin_suite)
 
2367
            if default_encoding != sys.getdefaultencoding():
 
2368
                bzrlib.trace.warning(
 
2369
                    'Plugin "%s" tried to reset default encoding to: %s', name,
 
2370
                    sys.getdefaultencoding())
 
2371
                reload(sys)
 
2372
                sys.setdefaultencoding(default_encoding)
 
2373
    return suite
 
2374
 
 
2375
 
 
2376
def adapt_modules(mods_list, adapter, loader, suite):
 
2377
    """Adapt the modules in mods_list using adapter and add to suite."""
 
2378
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
 
2379
        suite.addTests(adapter.adapt(test))
 
2380
 
 
2381
 
 
2382
def _rmtree_temp_dir(dirname):
 
2383
    # If LANG=C we probably have created some bogus paths
 
2384
    # which rmtree(unicode) will fail to delete
 
2385
    # so make sure we are using rmtree(str) to delete everything
 
2386
    # except on win32, where rmtree(str) will fail
 
2387
    # since it doesn't have the property of byte-stream paths
 
2388
    # (they are either ascii or mbcs)
 
2389
    if sys.platform == 'win32':
 
2390
        # make sure we are using the unicode win32 api
 
2391
        dirname = unicode(dirname)
 
2392
    else:
 
2393
        dirname = dirname.encode(sys.getfilesystemencoding())
 
2394
    try:
 
2395
        osutils.rmtree(dirname)
 
2396
    except OSError, e:
 
2397
        if sys.platform == 'win32' and e.errno == errno.EACCES:
 
2398
            print >>sys.stderr, ('Permission denied: '
 
2399
                                 'unable to remove testing dir '
 
2400
                                 '%s' % os.path.basename(dirname))
 
2401
        else:
 
2402
            raise
 
2403
 
 
2404
 
 
2405
def clean_selftest_output(root=None, quiet=False):
 
2406
    """Remove all selftest output directories from root directory.
 
2407
 
 
2408
    :param  root:   root directory for clean
 
2409
                    (if ommitted or None then clean current directory).
 
2410
    :param  quiet:  suppress report about deleting directories
 
2411
    """
 
2412
    import re
 
2413
    re_dir = re.compile(r'''test\d\d\d\d\.tmp''')
 
2414
    if root is None:
 
2415
        root = u'.'
 
2416
    for i in os.listdir(root):
 
2417
        if os.path.isdir(i) and re_dir.match(i):
 
2418
            if not quiet:
 
2419
                print 'delete directory:', i
 
2420
            _rmtree_temp_dir(i)
 
2421
 
 
2422
 
 
2423
class Feature(object):
 
2424
    """An operating system Feature."""
 
2425
 
 
2426
    def __init__(self):
 
2427
        self._available = None
 
2428
 
 
2429
    def available(self):
 
2430
        """Is the feature available?
 
2431
 
 
2432
        :return: True if the feature is available.
 
2433
        """
 
2434
        if self._available is None:
 
2435
            self._available = self._probe()
 
2436
        return self._available
 
2437
 
 
2438
    def _probe(self):
 
2439
        """Implement this method in concrete features.
 
2440
 
 
2441
        :return: True if the feature is available.
 
2442
        """
 
2443
        raise NotImplementedError
 
2444
 
 
2445
    def __str__(self):
 
2446
        if getattr(self, 'feature_name', None):
 
2447
            return self.feature_name()
 
2448
        return self.__class__.__name__