/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Wouter van Heyst
  • Date: 2007-07-21 15:32:37 UTC
  • mto: This revision was merged to the branch mainline in revision 2645.
  • Revision ID: larstiq@larstiq.dyndns.org-20070721153237-rh7v5kz6rh3r2mza
As Aaron explained #127115 is more general, failing whenever other's basis is an ancestor of this' basis.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import atexit
 
30
import codecs
 
31
from cStringIO import StringIO
 
32
import difflib
 
33
import doctest
 
34
import errno
 
35
import logging
 
36
import os
 
37
from pprint import pformat
 
38
import random
 
39
import re
 
40
import shlex
 
41
import stat
 
42
from subprocess import Popen, PIPE
 
43
import sys
 
44
import tempfile
 
45
import unittest
 
46
import time
 
47
import warnings
 
48
 
 
49
 
 
50
from bzrlib import (
 
51
    bzrdir,
 
52
    debug,
 
53
    errors,
 
54
    memorytree,
 
55
    osutils,
 
56
    progress,
 
57
    ui,
 
58
    urlutils,
 
59
    workingtree,
 
60
    )
 
61
import bzrlib.branch
 
62
import bzrlib.commands
 
63
import bzrlib.timestamp
 
64
import bzrlib.export
 
65
import bzrlib.inventory
 
66
import bzrlib.iterablefile
 
67
import bzrlib.lockdir
 
68
try:
 
69
    import bzrlib.lsprof
 
70
except ImportError:
 
71
    # lsprof not available
 
72
    pass
 
73
from bzrlib.merge import merge_inner
 
74
import bzrlib.merge3
 
75
import bzrlib.osutils
 
76
import bzrlib.plugin
 
77
from bzrlib.revision import common_ancestor
 
78
import bzrlib.store
 
79
from bzrlib import symbol_versioning
 
80
from bzrlib.symbol_versioning import (
 
81
    deprecated_method,
 
82
    zero_eighteen,
 
83
    )
 
84
import bzrlib.trace
 
85
from bzrlib.transport import get_transport
 
86
import bzrlib.transport
 
87
from bzrlib.transport.local import LocalURLServer
 
88
from bzrlib.transport.memory import MemoryServer
 
89
from bzrlib.transport.readonly import ReadonlyServer
 
90
from bzrlib.trace import mutter, note
 
91
from bzrlib.tests import TestUtil
 
92
from bzrlib.tests.HttpServer import HttpServer
 
93
from bzrlib.tests.TestUtil import (
 
94
                          TestSuite,
 
95
                          TestLoader,
 
96
                          )
 
97
from bzrlib.tests.treeshape import build_tree_contents
 
98
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
99
 
 
100
# Mark this python module as being part of the implementation
 
101
# of unittest: this gives us better tracebacks where the last
 
102
# shown frame is the test code, not our assertXYZ.
 
103
__unittest = 1
 
104
 
 
105
default_transport = LocalURLServer
 
106
 
 
107
MODULES_TO_TEST = []
 
108
MODULES_TO_DOCTEST = [
 
109
                      bzrlib.timestamp,
 
110
                      bzrlib.errors,
 
111
                      bzrlib.export,
 
112
                      bzrlib.inventory,
 
113
                      bzrlib.iterablefile,
 
114
                      bzrlib.lockdir,
 
115
                      bzrlib.merge3,
 
116
                      bzrlib.option,
 
117
                      bzrlib.store,
 
118
                      ]
 
119
 
 
120
 
 
121
def packages_to_test():
 
122
    """Return a list of packages to test.
 
123
 
 
124
    The packages are not globally imported so that import failures are
 
125
    triggered when running selftest, not when importing the command.
 
126
    """
 
127
    import bzrlib.doc
 
128
    import bzrlib.tests.blackbox
 
129
    import bzrlib.tests.branch_implementations
 
130
    import bzrlib.tests.bzrdir_implementations
 
131
    import bzrlib.tests.interrepository_implementations
 
132
    import bzrlib.tests.interversionedfile_implementations
 
133
    import bzrlib.tests.intertree_implementations
 
134
    import bzrlib.tests.per_lock
 
135
    import bzrlib.tests.repository_implementations
 
136
    import bzrlib.tests.revisionstore_implementations
 
137
    import bzrlib.tests.tree_implementations
 
138
    import bzrlib.tests.workingtree_implementations
 
139
    return [
 
140
            bzrlib.doc,
 
141
            bzrlib.tests.blackbox,
 
142
            bzrlib.tests.branch_implementations,
 
143
            bzrlib.tests.bzrdir_implementations,
 
144
            bzrlib.tests.interrepository_implementations,
 
145
            bzrlib.tests.interversionedfile_implementations,
 
146
            bzrlib.tests.intertree_implementations,
 
147
            bzrlib.tests.per_lock,
 
148
            bzrlib.tests.repository_implementations,
 
149
            bzrlib.tests.revisionstore_implementations,
 
150
            bzrlib.tests.tree_implementations,
 
151
            bzrlib.tests.workingtree_implementations,
 
152
            ]
 
153
 
 
154
 
 
155
class ExtendedTestResult(unittest._TextTestResult):
 
156
    """Accepts, reports and accumulates the results of running tests.
 
157
 
 
158
    Compared to this unittest version this class adds support for profiling,
 
159
    benchmarking, stopping as soon as a test fails,  and skipping tests.
 
160
    There are further-specialized subclasses for different types of display.
 
161
    """
 
162
 
 
163
    stop_early = False
 
164
    
 
165
    def __init__(self, stream, descriptions, verbosity,
 
166
                 bench_history=None,
 
167
                 num_tests=None,
 
168
                 ):
 
169
        """Construct new TestResult.
 
170
 
 
171
        :param bench_history: Optionally, a writable file object to accumulate
 
172
            benchmark results.
 
173
        """
 
174
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
175
        if bench_history is not None:
 
176
            from bzrlib.version import _get_bzr_source_tree
 
177
            src_tree = _get_bzr_source_tree()
 
178
            if src_tree:
 
179
                try:
 
180
                    revision_id = src_tree.get_parent_ids()[0]
 
181
                except IndexError:
 
182
                    # XXX: if this is a brand new tree, do the same as if there
 
183
                    # is no branch.
 
184
                    revision_id = ''
 
185
            else:
 
186
                # XXX: If there's no branch, what should we do?
 
187
                revision_id = ''
 
188
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
189
        self._bench_history = bench_history
 
190
        self.ui = ui.ui_factory
 
191
        self.num_tests = num_tests
 
192
        self.error_count = 0
 
193
        self.failure_count = 0
 
194
        self.known_failure_count = 0
 
195
        self.skip_count = 0
 
196
        self.unsupported = {}
 
197
        self.count = 0
 
198
        self._overall_start_time = time.time()
 
199
    
 
200
    def extractBenchmarkTime(self, testCase):
 
201
        """Add a benchmark time for the current test case."""
 
202
        self._benchmarkTime = getattr(testCase, "_benchtime", None)
 
203
    
 
204
    def _elapsedTestTimeString(self):
 
205
        """Return a time string for the overall time the current test has taken."""
 
206
        return self._formatTime(time.time() - self._start_time)
 
207
 
 
208
    def _testTimeString(self):
 
209
        if self._benchmarkTime is not None:
 
210
            return "%s/%s" % (
 
211
                self._formatTime(self._benchmarkTime),
 
212
                self._elapsedTestTimeString())
 
213
        else:
 
214
            return "           %s" % self._elapsedTestTimeString()
 
215
 
 
216
    def _formatTime(self, seconds):
 
217
        """Format seconds as milliseconds with leading spaces."""
 
218
        # some benchmarks can take thousands of seconds to run, so we need 8
 
219
        # places
 
220
        return "%8dms" % (1000 * seconds)
 
221
 
 
222
    def _shortened_test_description(self, test):
 
223
        what = test.id()
 
224
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
225
        return what
 
226
 
 
227
    def startTest(self, test):
 
228
        unittest.TestResult.startTest(self, test)
 
229
        self.report_test_start(test)
 
230
        test.number = self.count
 
231
        self._recordTestStartTime()
 
232
 
 
233
    def _recordTestStartTime(self):
 
234
        """Record that a test has started."""
 
235
        self._start_time = time.time()
 
236
 
 
237
    def _cleanupLogFile(self, test):
 
238
        # We can only do this if we have one of our TestCases, not if
 
239
        # we have a doctest.
 
240
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
241
        if setKeepLogfile is not None:
 
242
            setKeepLogfile()
 
243
 
 
244
    def addError(self, test, err):
 
245
        self.extractBenchmarkTime(test)
 
246
        self._cleanupLogFile(test)
 
247
        if isinstance(err[1], TestSkipped):
 
248
            return self.addSkipped(test, err)
 
249
        elif isinstance(err[1], UnavailableFeature):
 
250
            return self.addNotSupported(test, err[1].args[0])
 
251
        unittest.TestResult.addError(self, test, err)
 
252
        self.error_count += 1
 
253
        self.report_error(test, err)
 
254
        if self.stop_early:
 
255
            self.stop()
 
256
 
 
257
    def addFailure(self, test, err):
 
258
        self._cleanupLogFile(test)
 
259
        self.extractBenchmarkTime(test)
 
260
        if isinstance(err[1], KnownFailure):
 
261
            return self.addKnownFailure(test, err)
 
262
        unittest.TestResult.addFailure(self, test, err)
 
263
        self.failure_count += 1
 
264
        self.report_failure(test, err)
 
265
        if self.stop_early:
 
266
            self.stop()
 
267
 
 
268
    def addKnownFailure(self, test, err):
 
269
        self.known_failure_count += 1
 
270
        self.report_known_failure(test, err)
 
271
 
 
272
    def addNotSupported(self, test, feature):
 
273
        self.unsupported.setdefault(str(feature), 0)
 
274
        self.unsupported[str(feature)] += 1
 
275
        self.report_unsupported(test, feature)
 
276
 
 
277
    def addSuccess(self, test):
 
278
        self.extractBenchmarkTime(test)
 
279
        if self._bench_history is not None:
 
280
            if self._benchmarkTime is not None:
 
281
                self._bench_history.write("%s %s\n" % (
 
282
                    self._formatTime(self._benchmarkTime),
 
283
                    test.id()))
 
284
        self.report_success(test)
 
285
        unittest.TestResult.addSuccess(self, test)
 
286
 
 
287
    def addSkipped(self, test, skip_excinfo):
 
288
        self.report_skip(test, skip_excinfo)
 
289
        # seems best to treat this as success from point-of-view of unittest
 
290
        # -- it actually does nothing so it barely matters :)
 
291
        try:
 
292
            test.tearDown()
 
293
        except KeyboardInterrupt:
 
294
            raise
 
295
        except:
 
296
            self.addError(test, test.__exc_info())
 
297
        else:
 
298
            unittest.TestResult.addSuccess(self, test)
 
299
 
 
300
    def printErrorList(self, flavour, errors):
 
301
        for test, err in errors:
 
302
            self.stream.writeln(self.separator1)
 
303
            self.stream.write("%s: " % flavour)
 
304
            self.stream.writeln(self.getDescription(test))
 
305
            if getattr(test, '_get_log', None) is not None:
 
306
                print >>self.stream
 
307
                print >>self.stream, \
 
308
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
309
                print >>self.stream, test._get_log()
 
310
                print >>self.stream, \
 
311
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
312
            self.stream.writeln(self.separator2)
 
313
            self.stream.writeln("%s" % err)
 
314
 
 
315
    def finished(self):
 
316
        pass
 
317
 
 
318
    def report_cleaning_up(self):
 
319
        pass
 
320
 
 
321
    def report_success(self, test):
 
322
        pass
 
323
 
 
324
 
 
325
class TextTestResult(ExtendedTestResult):
 
326
    """Displays progress and results of tests in text form"""
 
327
 
 
328
    def __init__(self, stream, descriptions, verbosity,
 
329
                 bench_history=None,
 
330
                 num_tests=None,
 
331
                 pb=None,
 
332
                 ):
 
333
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
334
            bench_history, num_tests)
 
335
        if pb is None:
 
336
            self.pb = self.ui.nested_progress_bar()
 
337
            self._supplied_pb = False
 
338
        else:
 
339
            self.pb = pb
 
340
            self._supplied_pb = True
 
341
        self.pb.show_pct = False
 
342
        self.pb.show_spinner = False
 
343
        self.pb.show_eta = False,
 
344
        self.pb.show_count = False
 
345
        self.pb.show_bar = False
 
346
 
 
347
    def report_starting(self):
 
348
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
349
 
 
350
    def _progress_prefix_text(self):
 
351
        a = '[%d' % self.count
 
352
        if self.num_tests is not None:
 
353
            a +='/%d' % self.num_tests
 
354
        a += ' in %ds' % (time.time() - self._overall_start_time)
 
355
        if self.error_count:
 
356
            a += ', %d errors' % self.error_count
 
357
        if self.failure_count:
 
358
            a += ', %d failed' % self.failure_count
 
359
        if self.known_failure_count:
 
360
            a += ', %d known failures' % self.known_failure_count
 
361
        if self.skip_count:
 
362
            a += ', %d skipped' % self.skip_count
 
363
        if self.unsupported:
 
364
            a += ', %d missing features' % len(self.unsupported)
 
365
        a += ']'
 
366
        return a
 
367
 
 
368
    def report_test_start(self, test):
 
369
        self.count += 1
 
370
        self.pb.update(
 
371
                self._progress_prefix_text()
 
372
                + ' ' 
 
373
                + self._shortened_test_description(test))
 
374
 
 
375
    def _test_description(self, test):
 
376
        return self._shortened_test_description(test)
 
377
 
 
378
    def report_error(self, test, err):
 
379
        self.pb.note('ERROR: %s\n    %s\n', 
 
380
            self._test_description(test),
 
381
            err[1],
 
382
            )
 
383
 
 
384
    def report_failure(self, test, err):
 
385
        self.pb.note('FAIL: %s\n    %s\n', 
 
386
            self._test_description(test),
 
387
            err[1],
 
388
            )
 
389
 
 
390
    def report_known_failure(self, test, err):
 
391
        self.pb.note('XFAIL: %s\n%s\n',
 
392
            self._test_description(test), err[1])
 
393
 
 
394
    def report_skip(self, test, skip_excinfo):
 
395
        self.skip_count += 1
 
396
        if False:
 
397
            # at the moment these are mostly not things we can fix
 
398
            # and so they just produce stipple; use the verbose reporter
 
399
            # to see them.
 
400
            if False:
 
401
                # show test and reason for skip
 
402
                self.pb.note('SKIP: %s\n    %s\n', 
 
403
                    self._shortened_test_description(test),
 
404
                    skip_excinfo[1])
 
405
            else:
 
406
                # since the class name was left behind in the still-visible
 
407
                # progress bar...
 
408
                self.pb.note('SKIP: %s', skip_excinfo[1])
 
409
 
 
410
    def report_unsupported(self, test, feature):
 
411
        """test cannot be run because feature is missing."""
 
412
                  
 
413
    def report_cleaning_up(self):
 
414
        self.pb.update('cleaning up...')
 
415
 
 
416
    def finished(self):
 
417
        if not self._supplied_pb:
 
418
            self.pb.finished()
 
419
 
 
420
 
 
421
class VerboseTestResult(ExtendedTestResult):
 
422
    """Produce long output, with one line per test run plus times"""
 
423
 
 
424
    def _ellipsize_to_right(self, a_string, final_width):
 
425
        """Truncate and pad a string, keeping the right hand side"""
 
426
        if len(a_string) > final_width:
 
427
            result = '...' + a_string[3-final_width:]
 
428
        else:
 
429
            result = a_string
 
430
        return result.ljust(final_width)
 
431
 
 
432
    def report_starting(self):
 
433
        self.stream.write('running %d tests...\n' % self.num_tests)
 
434
 
 
435
    def report_test_start(self, test):
 
436
        self.count += 1
 
437
        name = self._shortened_test_description(test)
 
438
        # width needs space for 6 char status, plus 1 for slash, plus 2 10-char
 
439
        # numbers, plus a trailing blank
 
440
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
441
        self.stream.write(self._ellipsize_to_right(name,
 
442
                          osutils.terminal_width()-30))
 
443
        self.stream.flush()
 
444
 
 
445
    def _error_summary(self, err):
 
446
        indent = ' ' * 4
 
447
        return '%s%s' % (indent, err[1])
 
448
 
 
449
    def report_error(self, test, err):
 
450
        self.stream.writeln('ERROR %s\n%s'
 
451
                % (self._testTimeString(),
 
452
                   self._error_summary(err)))
 
453
 
 
454
    def report_failure(self, test, err):
 
455
        self.stream.writeln(' FAIL %s\n%s'
 
456
                % (self._testTimeString(),
 
457
                   self._error_summary(err)))
 
458
 
 
459
    def report_known_failure(self, test, err):
 
460
        self.stream.writeln('XFAIL %s\n%s'
 
461
                % (self._testTimeString(),
 
462
                   self._error_summary(err)))
 
463
 
 
464
    def report_success(self, test):
 
465
        self.stream.writeln('   OK %s' % self._testTimeString())
 
466
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
467
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
468
            stats.pprint(file=self.stream)
 
469
        # flush the stream so that we get smooth output. This verbose mode is
 
470
        # used to show the output in PQM.
 
471
        self.stream.flush()
 
472
 
 
473
    def report_skip(self, test, skip_excinfo):
 
474
        self.skip_count += 1
 
475
        self.stream.writeln(' SKIP %s\n%s'
 
476
                % (self._testTimeString(),
 
477
                   self._error_summary(skip_excinfo)))
 
478
 
 
479
    def report_unsupported(self, test, feature):
 
480
        """test cannot be run because feature is missing."""
 
481
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
482
                %(self._testTimeString(), feature))
 
483
                  
 
484
 
 
485
 
 
486
class TextTestRunner(object):
 
487
    stop_on_failure = False
 
488
 
 
489
    def __init__(self,
 
490
                 stream=sys.stderr,
 
491
                 descriptions=0,
 
492
                 verbosity=1,
 
493
                 bench_history=None,
 
494
                 list_only=False
 
495
                 ):
 
496
        self.stream = unittest._WritelnDecorator(stream)
 
497
        self.descriptions = descriptions
 
498
        self.verbosity = verbosity
 
499
        self._bench_history = bench_history
 
500
        self.list_only = list_only
 
501
 
 
502
    def run(self, test):
 
503
        "Run the given test case or test suite."
 
504
        startTime = time.time()
 
505
        if self.verbosity == 1:
 
506
            result_class = TextTestResult
 
507
        elif self.verbosity >= 2:
 
508
            result_class = VerboseTestResult
 
509
        result = result_class(self.stream,
 
510
                              self.descriptions,
 
511
                              self.verbosity,
 
512
                              bench_history=self._bench_history,
 
513
                              num_tests=test.countTestCases(),
 
514
                              )
 
515
        result.stop_early = self.stop_on_failure
 
516
        result.report_starting()
 
517
        if self.list_only:
 
518
            if self.verbosity >= 2:
 
519
                self.stream.writeln("Listing tests only ...\n")
 
520
            run = 0
 
521
            for t in iter_suite_tests(test):
 
522
                self.stream.writeln("%s" % (t.id()))
 
523
                run += 1
 
524
            actionTaken = "Listed"
 
525
        else: 
 
526
            test.run(result)
 
527
            run = result.testsRun
 
528
            actionTaken = "Ran"
 
529
        stopTime = time.time()
 
530
        timeTaken = stopTime - startTime
 
531
        result.printErrors()
 
532
        self.stream.writeln(result.separator2)
 
533
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
534
                            run, run != 1 and "s" or "", timeTaken))
 
535
        self.stream.writeln()
 
536
        if not result.wasSuccessful():
 
537
            self.stream.write("FAILED (")
 
538
            failed, errored = map(len, (result.failures, result.errors))
 
539
            if failed:
 
540
                self.stream.write("failures=%d" % failed)
 
541
            if errored:
 
542
                if failed: self.stream.write(", ")
 
543
                self.stream.write("errors=%d" % errored)
 
544
            if result.known_failure_count:
 
545
                if failed or errored: self.stream.write(", ")
 
546
                self.stream.write("known_failure_count=%d" %
 
547
                    result.known_failure_count)
 
548
            self.stream.writeln(")")
 
549
        else:
 
550
            if result.known_failure_count:
 
551
                self.stream.writeln("OK (known_failures=%d)" %
 
552
                    result.known_failure_count)
 
553
            else:
 
554
                self.stream.writeln("OK")
 
555
        if result.skip_count > 0:
 
556
            skipped = result.skip_count
 
557
            self.stream.writeln('%d test%s skipped' %
 
558
                                (skipped, skipped != 1 and "s" or ""))
 
559
        if result.unsupported:
 
560
            for feature, count in sorted(result.unsupported.items()):
 
561
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
562
                    (feature, count))
 
563
        result.finished()
 
564
        return result
 
565
 
 
566
 
 
567
def iter_suite_tests(suite):
 
568
    """Return all tests in a suite, recursing through nested suites"""
 
569
    for item in suite._tests:
 
570
        if isinstance(item, unittest.TestCase):
 
571
            yield item
 
572
        elif isinstance(item, unittest.TestSuite):
 
573
            for r in iter_suite_tests(item):
 
574
                yield r
 
575
        else:
 
576
            raise Exception('unknown object %r inside test suite %r'
 
577
                            % (item, suite))
 
578
 
 
579
 
 
580
class TestSkipped(Exception):
 
581
    """Indicates that a test was intentionally skipped, rather than failing."""
 
582
 
 
583
 
 
584
class KnownFailure(AssertionError):
 
585
    """Indicates that a test failed in a precisely expected manner.
 
586
 
 
587
    Such failures dont block the whole test suite from passing because they are
 
588
    indicators of partially completed code or of future work. We have an
 
589
    explicit error for them so that we can ensure that they are always visible:
 
590
    KnownFailures are always shown in the output of bzr selftest.
 
591
    """
 
592
 
 
593
 
 
594
class UnavailableFeature(Exception):
 
595
    """A feature required for this test was not available.
 
596
 
 
597
    The feature should be used to construct the exception.
 
598
    """
 
599
 
 
600
 
 
601
class CommandFailed(Exception):
 
602
    pass
 
603
 
 
604
 
 
605
class StringIOWrapper(object):
 
606
    """A wrapper around cStringIO which just adds an encoding attribute.
 
607
    
 
608
    Internally we can check sys.stdout to see what the output encoding
 
609
    should be. However, cStringIO has no encoding attribute that we can
 
610
    set. So we wrap it instead.
 
611
    """
 
612
    encoding='ascii'
 
613
    _cstring = None
 
614
 
 
615
    def __init__(self, s=None):
 
616
        if s is not None:
 
617
            self.__dict__['_cstring'] = StringIO(s)
 
618
        else:
 
619
            self.__dict__['_cstring'] = StringIO()
 
620
 
 
621
    def __getattr__(self, name, getattr=getattr):
 
622
        return getattr(self.__dict__['_cstring'], name)
 
623
 
 
624
    def __setattr__(self, name, val):
 
625
        if name == 'encoding':
 
626
            self.__dict__['encoding'] = val
 
627
        else:
 
628
            return setattr(self._cstring, name, val)
 
629
 
 
630
 
 
631
class TestUIFactory(ui.CLIUIFactory):
 
632
    """A UI Factory for testing.
 
633
 
 
634
    Hide the progress bar but emit note()s.
 
635
    Redirect stdin.
 
636
    Allows get_password to be tested without real tty attached.
 
637
    """
 
638
 
 
639
    def __init__(self,
 
640
                 stdout=None,
 
641
                 stderr=None,
 
642
                 stdin=None):
 
643
        super(TestUIFactory, self).__init__()
 
644
        if stdin is not None:
 
645
            # We use a StringIOWrapper to be able to test various
 
646
            # encodings, but the user is still responsible to
 
647
            # encode the string and to set the encoding attribute
 
648
            # of StringIOWrapper.
 
649
            self.stdin = StringIOWrapper(stdin)
 
650
        if stdout is None:
 
651
            self.stdout = sys.stdout
 
652
        else:
 
653
            self.stdout = stdout
 
654
        if stderr is None:
 
655
            self.stderr = sys.stderr
 
656
        else:
 
657
            self.stderr = stderr
 
658
 
 
659
    def clear(self):
 
660
        """See progress.ProgressBar.clear()."""
 
661
 
 
662
    def clear_term(self):
 
663
        """See progress.ProgressBar.clear_term()."""
 
664
 
 
665
    def clear_term(self):
 
666
        """See progress.ProgressBar.clear_term()."""
 
667
 
 
668
    def finished(self):
 
669
        """See progress.ProgressBar.finished()."""
 
670
 
 
671
    def note(self, fmt_string, *args, **kwargs):
 
672
        """See progress.ProgressBar.note()."""
 
673
        self.stdout.write((fmt_string + "\n") % args)
 
674
 
 
675
    def progress_bar(self):
 
676
        return self
 
677
 
 
678
    def nested_progress_bar(self):
 
679
        return self
 
680
 
 
681
    def update(self, message, count=None, total=None):
 
682
        """See progress.ProgressBar.update()."""
 
683
 
 
684
    def get_non_echoed_password(self, prompt):
 
685
        """Get password from stdin without trying to handle the echo mode"""
 
686
        if prompt:
 
687
            self.stdout.write(prompt.encode(self.stdout.encoding, 'replace'))
 
688
        password = self.stdin.readline()
 
689
        if not password:
 
690
            raise EOFError
 
691
        if password[-1] == '\n':
 
692
            password = password[:-1]
 
693
        return password
 
694
 
 
695
 
 
696
class TestCase(unittest.TestCase):
 
697
    """Base class for bzr unit tests.
 
698
    
 
699
    Tests that need access to disk resources should subclass 
 
700
    TestCaseInTempDir not TestCase.
 
701
 
 
702
    Error and debug log messages are redirected from their usual
 
703
    location into a temporary file, the contents of which can be
 
704
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
705
    so that it can also capture file IO.  When the test completes this file
 
706
    is read into memory and removed from disk.
 
707
       
 
708
    There are also convenience functions to invoke bzr's command-line
 
709
    routine, and to build and check bzr trees.
 
710
   
 
711
    In addition to the usual method of overriding tearDown(), this class also
 
712
    allows subclasses to register functions into the _cleanups list, which is
 
713
    run in order as the object is torn down.  It's less likely this will be
 
714
    accidentally overlooked.
 
715
    """
 
716
 
 
717
    _log_file_name = None
 
718
    _log_contents = ''
 
719
    _keep_log_file = False
 
720
    # record lsprof data when performing benchmark calls.
 
721
    _gather_lsprof_in_benchmarks = False
 
722
 
 
723
    def __init__(self, methodName='testMethod'):
 
724
        super(TestCase, self).__init__(methodName)
 
725
        self._cleanups = []
 
726
 
 
727
    def setUp(self):
 
728
        unittest.TestCase.setUp(self)
 
729
        self._cleanEnvironment()
 
730
        bzrlib.trace.disable_default_logging()
 
731
        self._silenceUI()
 
732
        self._startLogFile()
 
733
        self._benchcalls = []
 
734
        self._benchtime = None
 
735
        self._clear_hooks()
 
736
        self._clear_debug_flags()
 
737
 
 
738
    def _clear_debug_flags(self):
 
739
        """Prevent externally set debug flags affecting tests.
 
740
        
 
741
        Tests that want to use debug flags can just set them in the
 
742
        debug_flags set during setup/teardown.
 
743
        """
 
744
        self._preserved_debug_flags = set(debug.debug_flags)
 
745
        debug.debug_flags.clear()
 
746
        self.addCleanup(self._restore_debug_flags)
 
747
 
 
748
    def _clear_hooks(self):
 
749
        # prevent hooks affecting tests
 
750
        import bzrlib.branch
 
751
        import bzrlib.smart.server
 
752
        self._preserved_hooks = {
 
753
            bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
 
754
            bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
 
755
            }
 
756
        self.addCleanup(self._restoreHooks)
 
757
        # reset all hooks to an empty instance of the appropriate type
 
758
        bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
 
759
        bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
 
760
 
 
761
    def _silenceUI(self):
 
762
        """Turn off UI for duration of test"""
 
763
        # by default the UI is off; tests can turn it on if they want it.
 
764
        saved = ui.ui_factory
 
765
        def _restore():
 
766
            ui.ui_factory = saved
 
767
        ui.ui_factory = ui.SilentUIFactory()
 
768
        self.addCleanup(_restore)
 
769
 
 
770
    def _ndiff_strings(self, a, b):
 
771
        """Return ndiff between two strings containing lines.
 
772
        
 
773
        A trailing newline is added if missing to make the strings
 
774
        print properly."""
 
775
        if b and b[-1] != '\n':
 
776
            b += '\n'
 
777
        if a and a[-1] != '\n':
 
778
            a += '\n'
 
779
        difflines = difflib.ndiff(a.splitlines(True),
 
780
                                  b.splitlines(True),
 
781
                                  linejunk=lambda x: False,
 
782
                                  charjunk=lambda x: False)
 
783
        return ''.join(difflines)
 
784
 
 
785
    def assertEqual(self, a, b, message=''):
 
786
        try:
 
787
            if a == b:
 
788
                return
 
789
        except UnicodeError, e:
 
790
            # If we can't compare without getting a UnicodeError, then
 
791
            # obviously they are different
 
792
            mutter('UnicodeError: %s', e)
 
793
        if message:
 
794
            message += '\n'
 
795
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
796
            % (message,
 
797
               pformat(a), pformat(b)))
 
798
 
 
799
    assertEquals = assertEqual
 
800
 
 
801
    def assertEqualDiff(self, a, b, message=None):
 
802
        """Assert two texts are equal, if not raise an exception.
 
803
        
 
804
        This is intended for use with multi-line strings where it can 
 
805
        be hard to find the differences by eye.
 
806
        """
 
807
        # TODO: perhaps override assertEquals to call this for strings?
 
808
        if a == b:
 
809
            return
 
810
        if message is None:
 
811
            message = "texts not equal:\n"
 
812
        raise AssertionError(message +
 
813
                             self._ndiff_strings(a, b))
 
814
        
 
815
    def assertEqualMode(self, mode, mode_test):
 
816
        self.assertEqual(mode, mode_test,
 
817
                         'mode mismatch %o != %o' % (mode, mode_test))
 
818
 
 
819
    def assertPositive(self, val):
 
820
        """Assert that val is greater than 0."""
 
821
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
822
 
 
823
    def assertNegative(self, val):
 
824
        """Assert that val is less than 0."""
 
825
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
826
 
 
827
    def assertStartsWith(self, s, prefix):
 
828
        if not s.startswith(prefix):
 
829
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
830
 
 
831
    def assertEndsWith(self, s, suffix):
 
832
        """Asserts that s ends with suffix."""
 
833
        if not s.endswith(suffix):
 
834
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
835
 
 
836
    def assertContainsRe(self, haystack, needle_re):
 
837
        """Assert that a contains something matching a regular expression."""
 
838
        if not re.search(needle_re, haystack):
 
839
            if '\n' in haystack or len(haystack) > 60:
 
840
                # a long string, format it in a more readable way
 
841
                raise AssertionError(
 
842
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
843
                        % (needle_re, haystack))
 
844
            else:
 
845
                raise AssertionError('pattern "%s" not found in "%s"'
 
846
                        % (needle_re, haystack))
 
847
 
 
848
    def assertNotContainsRe(self, haystack, needle_re):
 
849
        """Assert that a does not match a regular expression"""
 
850
        if re.search(needle_re, haystack):
 
851
            raise AssertionError('pattern "%s" found in "%s"'
 
852
                    % (needle_re, haystack))
 
853
 
 
854
    def assertSubset(self, sublist, superlist):
 
855
        """Assert that every entry in sublist is present in superlist."""
 
856
        missing = []
 
857
        for entry in sublist:
 
858
            if entry not in superlist:
 
859
                missing.append(entry)
 
860
        if len(missing) > 0:
 
861
            raise AssertionError("value(s) %r not present in container %r" % 
 
862
                                 (missing, superlist))
 
863
 
 
864
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
865
        """Fail unless excClass is raised when the iterator from func is used.
 
866
        
 
867
        Many functions can return generators this makes sure
 
868
        to wrap them in a list() call to make sure the whole generator
 
869
        is run, and that the proper exception is raised.
 
870
        """
 
871
        try:
 
872
            list(func(*args, **kwargs))
 
873
        except excClass:
 
874
            return
 
875
        else:
 
876
            if getattr(excClass,'__name__', None) is not None:
 
877
                excName = excClass.__name__
 
878
            else:
 
879
                excName = str(excClass)
 
880
            raise self.failureException, "%s not raised" % excName
 
881
 
 
882
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
883
        """Assert that a callable raises a particular exception.
 
884
 
 
885
        :param excClass: As for the except statement, this may be either an
 
886
            exception class, or a tuple of classes.
 
887
        :param callableObj: A callable, will be passed ``*args`` and
 
888
            ``**kwargs``.
 
889
 
 
890
        Returns the exception so that you can examine it.
 
891
        """
 
892
        try:
 
893
            callableObj(*args, **kwargs)
 
894
        except excClass, e:
 
895
            return e
 
896
        else:
 
897
            if getattr(excClass,'__name__', None) is not None:
 
898
                excName = excClass.__name__
 
899
            else:
 
900
                # probably a tuple
 
901
                excName = str(excClass)
 
902
            raise self.failureException, "%s not raised" % excName
 
903
 
 
904
    def assertIs(self, left, right, message=None):
 
905
        if not (left is right):
 
906
            if message is not None:
 
907
                raise AssertionError(message)
 
908
            else:
 
909
                raise AssertionError("%r is not %r." % (left, right))
 
910
 
 
911
    def assertIsNot(self, left, right, message=None):
 
912
        if (left is right):
 
913
            if message is not None:
 
914
                raise AssertionError(message)
 
915
            else:
 
916
                raise AssertionError("%r is %r." % (left, right))
 
917
 
 
918
    def assertTransportMode(self, transport, path, mode):
 
919
        """Fail if a path does not have mode mode.
 
920
        
 
921
        If modes are not supported on this transport, the assertion is ignored.
 
922
        """
 
923
        if not transport._can_roundtrip_unix_modebits():
 
924
            return
 
925
        path_stat = transport.stat(path)
 
926
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
927
        self.assertEqual(mode, actual_mode,
 
928
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
929
 
 
930
    def assertIsInstance(self, obj, kls):
 
931
        """Fail if obj is not an instance of kls"""
 
932
        if not isinstance(obj, kls):
 
933
            self.fail("%r is an instance of %s rather than %s" % (
 
934
                obj, obj.__class__, kls))
 
935
 
 
936
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
937
        """Invoke a test, expecting it to fail for the given reason.
 
938
 
 
939
        This is for assertions that ought to succeed, but currently fail.
 
940
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
941
        about the failure you're expecting.  If a new bug is introduced,
 
942
        AssertionError should be raised, not KnownFailure.
 
943
 
 
944
        Frequently, expectFailure should be followed by an opposite assertion.
 
945
        See example below.
 
946
 
 
947
        Intended to be used with a callable that raises AssertionError as the
 
948
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
949
 
 
950
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
951
        test succeeds.
 
952
 
 
953
        example usage::
 
954
 
 
955
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
956
                             dynamic_val)
 
957
          self.assertEqual(42, dynamic_val)
 
958
 
 
959
          This means that a dynamic_val of 54 will cause the test to raise
 
960
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
961
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
962
          than 54 or 42 will cause an AssertionError.
 
963
        """
 
964
        try:
 
965
            assertion(*args, **kwargs)
 
966
        except AssertionError:
 
967
            raise KnownFailure(reason)
 
968
        else:
 
969
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
970
 
 
971
    def _capture_warnings(self, a_callable, *args, **kwargs):
 
972
        """A helper for callDeprecated and applyDeprecated.
 
973
 
 
974
        :param a_callable: A callable to call.
 
975
        :param args: The positional arguments for the callable
 
976
        :param kwargs: The keyword arguments for the callable
 
977
        :return: A tuple (warnings, result). result is the result of calling
 
978
            a_callable(``*args``, ``**kwargs``).
 
979
        """
 
980
        local_warnings = []
 
981
        def capture_warnings(msg, cls=None, stacklevel=None):
 
982
            # we've hooked into a deprecation specific callpath,
 
983
            # only deprecations should getting sent via it.
 
984
            self.assertEqual(cls, DeprecationWarning)
 
985
            local_warnings.append(msg)
 
986
        original_warning_method = symbol_versioning.warn
 
987
        symbol_versioning.set_warning_method(capture_warnings)
 
988
        try:
 
989
            result = a_callable(*args, **kwargs)
 
990
        finally:
 
991
            symbol_versioning.set_warning_method(original_warning_method)
 
992
        return (local_warnings, result)
 
993
 
 
994
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
995
        """Call a deprecated callable without warning the user.
 
996
 
 
997
        Note that this only captures warnings raised by symbol_versioning.warn,
 
998
        not other callers that go direct to the warning module.
 
999
 
 
1000
        :param deprecation_format: The deprecation format that the callable
 
1001
            should have been deprecated with. This is the same type as the
 
1002
            parameter to deprecated_method/deprecated_function. If the
 
1003
            callable is not deprecated with this format, an assertion error
 
1004
            will be raised.
 
1005
        :param a_callable: A callable to call. This may be a bound method or
 
1006
            a regular function. It will be called with ``*args`` and
 
1007
            ``**kwargs``.
 
1008
        :param args: The positional arguments for the callable
 
1009
        :param kwargs: The keyword arguments for the callable
 
1010
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1011
        """
 
1012
        call_warnings, result = self._capture_warnings(a_callable,
 
1013
            *args, **kwargs)
 
1014
        expected_first_warning = symbol_versioning.deprecation_string(
 
1015
            a_callable, deprecation_format)
 
1016
        if len(call_warnings) == 0:
 
1017
            self.fail("No deprecation warning generated by call to %s" %
 
1018
                a_callable)
 
1019
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1020
        return result
 
1021
 
 
1022
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1023
        """Assert that a callable is deprecated in a particular way.
 
1024
 
 
1025
        This is a very precise test for unusual requirements. The 
 
1026
        applyDeprecated helper function is probably more suited for most tests
 
1027
        as it allows you to simply specify the deprecation format being used
 
1028
        and will ensure that that is issued for the function being called.
 
1029
 
 
1030
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1031
        not other callers that go direct to the warning module.
 
1032
 
 
1033
        :param expected: a list of the deprecation warnings expected, in order
 
1034
        :param callable: The callable to call
 
1035
        :param args: The positional arguments for the callable
 
1036
        :param kwargs: The keyword arguments for the callable
 
1037
        """
 
1038
        call_warnings, result = self._capture_warnings(callable,
 
1039
            *args, **kwargs)
 
1040
        self.assertEqual(expected, call_warnings)
 
1041
        return result
 
1042
 
 
1043
    def _startLogFile(self):
 
1044
        """Send bzr and test log messages to a temporary file.
 
1045
 
 
1046
        The file is removed as the test is torn down.
 
1047
        """
 
1048
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1049
        self._log_file = os.fdopen(fileno, 'w+')
 
1050
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
1051
        self._log_file_name = name
 
1052
        self.addCleanup(self._finishLogFile)
 
1053
 
 
1054
    def _finishLogFile(self):
 
1055
        """Finished with the log file.
 
1056
 
 
1057
        Close the file and delete it, unless setKeepLogfile was called.
 
1058
        """
 
1059
        if self._log_file is None:
 
1060
            return
 
1061
        bzrlib.trace.disable_test_log(self._log_nonce)
 
1062
        self._log_file.close()
 
1063
        self._log_file = None
 
1064
        if not self._keep_log_file:
 
1065
            os.remove(self._log_file_name)
 
1066
            self._log_file_name = None
 
1067
 
 
1068
    def setKeepLogfile(self):
 
1069
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1070
        self._keep_log_file = True
 
1071
 
 
1072
    def addCleanup(self, callable):
 
1073
        """Arrange to run a callable when this case is torn down.
 
1074
 
 
1075
        Callables are run in the reverse of the order they are registered, 
 
1076
        ie last-in first-out.
 
1077
        """
 
1078
        if callable in self._cleanups:
 
1079
            raise ValueError("cleanup function %r already registered on %s" 
 
1080
                    % (callable, self))
 
1081
        self._cleanups.append(callable)
 
1082
 
 
1083
    def _cleanEnvironment(self):
 
1084
        new_env = {
 
1085
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1086
            'HOME': os.getcwd(),
 
1087
            'APPDATA': None,  # bzr now use Win32 API and don't rely on APPDATA
 
1088
            'BZR_EMAIL': None,
 
1089
            'BZREMAIL': None, # may still be present in the environment
 
1090
            'EMAIL': None,
 
1091
            'BZR_PROGRESS_BAR': None,
 
1092
            # SSH Agent
 
1093
            'SSH_AUTH_SOCK': None,
 
1094
            # Proxies
 
1095
            'http_proxy': None,
 
1096
            'HTTP_PROXY': None,
 
1097
            'https_proxy': None,
 
1098
            'HTTPS_PROXY': None,
 
1099
            'no_proxy': None,
 
1100
            'NO_PROXY': None,
 
1101
            'all_proxy': None,
 
1102
            'ALL_PROXY': None,
 
1103
            # Nobody cares about these ones AFAIK. So far at
 
1104
            # least. If you do (care), please update this comment
 
1105
            # -- vila 20061212
 
1106
            'ftp_proxy': None,
 
1107
            'FTP_PROXY': None,
 
1108
            'BZR_REMOTE_PATH': None,
 
1109
        }
 
1110
        self.__old_env = {}
 
1111
        self.addCleanup(self._restoreEnvironment)
 
1112
        for name, value in new_env.iteritems():
 
1113
            self._captureVar(name, value)
 
1114
 
 
1115
    def _captureVar(self, name, newvalue):
 
1116
        """Set an environment variable, and reset it when finished."""
 
1117
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1118
 
 
1119
    def _restore_debug_flags(self):
 
1120
        debug.debug_flags.clear()
 
1121
        debug.debug_flags.update(self._preserved_debug_flags)
 
1122
 
 
1123
    def _restoreEnvironment(self):
 
1124
        for name, value in self.__old_env.iteritems():
 
1125
            osutils.set_or_unset_env(name, value)
 
1126
 
 
1127
    def _restoreHooks(self):
 
1128
        for klass, hooks in self._preserved_hooks.items():
 
1129
            setattr(klass, 'hooks', hooks)
 
1130
 
 
1131
    def knownFailure(self, reason):
 
1132
        """This test has failed for some known reason."""
 
1133
        raise KnownFailure(reason)
 
1134
 
 
1135
    def run(self, result=None):
 
1136
        if result is None: result = self.defaultTestResult()
 
1137
        for feature in getattr(self, '_test_needs_features', []):
 
1138
            if not feature.available():
 
1139
                result.startTest(self)
 
1140
                if getattr(result, 'addNotSupported', None):
 
1141
                    result.addNotSupported(self, feature)
 
1142
                else:
 
1143
                    result.addSuccess(self)
 
1144
                result.stopTest(self)
 
1145
                return
 
1146
        return unittest.TestCase.run(self, result)
 
1147
 
 
1148
    def tearDown(self):
 
1149
        self._runCleanups()
 
1150
        unittest.TestCase.tearDown(self)
 
1151
 
 
1152
    def time(self, callable, *args, **kwargs):
 
1153
        """Run callable and accrue the time it takes to the benchmark time.
 
1154
        
 
1155
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1156
        this will cause lsprofile statistics to be gathered and stored in
 
1157
        self._benchcalls.
 
1158
        """
 
1159
        if self._benchtime is None:
 
1160
            self._benchtime = 0
 
1161
        start = time.time()
 
1162
        try:
 
1163
            if not self._gather_lsprof_in_benchmarks:
 
1164
                return callable(*args, **kwargs)
 
1165
            else:
 
1166
                # record this benchmark
 
1167
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1168
                stats.sort()
 
1169
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1170
                return ret
 
1171
        finally:
 
1172
            self._benchtime += time.time() - start
 
1173
 
 
1174
    def _runCleanups(self):
 
1175
        """Run registered cleanup functions. 
 
1176
 
 
1177
        This should only be called from TestCase.tearDown.
 
1178
        """
 
1179
        # TODO: Perhaps this should keep running cleanups even if 
 
1180
        # one of them fails?
 
1181
 
 
1182
        # Actually pop the cleanups from the list so tearDown running
 
1183
        # twice is safe (this happens for skipped tests).
 
1184
        while self._cleanups:
 
1185
            self._cleanups.pop()()
 
1186
 
 
1187
    def log(self, *args):
 
1188
        mutter(*args)
 
1189
 
 
1190
    def _get_log(self, keep_log_file=False):
 
1191
        """Return as a string the log for this test. If the file is still
 
1192
        on disk and keep_log_file=False, delete the log file and store the
 
1193
        content in self._log_contents."""
 
1194
        # flush the log file, to get all content
 
1195
        import bzrlib.trace
 
1196
        bzrlib.trace._trace_file.flush()
 
1197
        if self._log_contents:
 
1198
            return self._log_contents
 
1199
        if self._log_file_name is not None:
 
1200
            logfile = open(self._log_file_name)
 
1201
            try:
 
1202
                log_contents = logfile.read()
 
1203
            finally:
 
1204
                logfile.close()
 
1205
            if not keep_log_file:
 
1206
                self._log_contents = log_contents
 
1207
                try:
 
1208
                    os.remove(self._log_file_name)
 
1209
                except OSError, e:
 
1210
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1211
                        print >>sys.stderr, ('Unable to delete log file '
 
1212
                                             ' %r' % self._log_file_name)
 
1213
                    else:
 
1214
                        raise
 
1215
            return log_contents
 
1216
        else:
 
1217
            return "DELETED log file to reduce memory footprint"
 
1218
 
 
1219
    @deprecated_method(zero_eighteen)
 
1220
    def capture(self, cmd, retcode=0):
 
1221
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
1222
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
1223
 
 
1224
    def requireFeature(self, feature):
 
1225
        """This test requires a specific feature is available.
 
1226
 
 
1227
        :raises UnavailableFeature: When feature is not available.
 
1228
        """
 
1229
        if not feature.available():
 
1230
            raise UnavailableFeature(feature)
 
1231
 
 
1232
    @deprecated_method(zero_eighteen)
 
1233
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
 
1234
                         working_dir=None):
 
1235
        """Invoke bzr and return (stdout, stderr).
 
1236
 
 
1237
        Don't call this method, just use run_bzr() which is equivalent.
 
1238
 
 
1239
        :param argv: Arguments to invoke bzr.  This may be either a 
 
1240
            single string, in which case it is split by shlex into words, 
 
1241
            or a list of arguments.
 
1242
        :param retcode: Expected return code, or None for don't-care.
 
1243
        :param encoding: Encoding for sys.stdout and sys.stderr
 
1244
        :param stdin: A string to be used as stdin for the command.
 
1245
        :param working_dir: Change to this directory before running
 
1246
        """
 
1247
        return self._run_bzr_autosplit(argv, retcode=retcode,
 
1248
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1249
                )
 
1250
 
 
1251
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1252
            working_dir):
 
1253
        """Run bazaar command line, splitting up a string command line."""
 
1254
        if isinstance(args, basestring):
 
1255
            args = list(shlex.split(args))
 
1256
        return self._run_bzr_core(args, retcode=retcode,
 
1257
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1258
                )
 
1259
 
 
1260
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1261
            working_dir):
 
1262
        if encoding is None:
 
1263
            encoding = bzrlib.user_encoding
 
1264
        stdout = StringIOWrapper()
 
1265
        stderr = StringIOWrapper()
 
1266
        stdout.encoding = encoding
 
1267
        stderr.encoding = encoding
 
1268
 
 
1269
        self.log('run bzr: %r', args)
 
1270
        # FIXME: don't call into logging here
 
1271
        handler = logging.StreamHandler(stderr)
 
1272
        handler.setLevel(logging.INFO)
 
1273
        logger = logging.getLogger('')
 
1274
        logger.addHandler(handler)
 
1275
        old_ui_factory = ui.ui_factory
 
1276
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1277
 
 
1278
        cwd = None
 
1279
        if working_dir is not None:
 
1280
            cwd = osutils.getcwd()
 
1281
            os.chdir(working_dir)
 
1282
 
 
1283
        try:
 
1284
            result = self.apply_redirected(ui.ui_factory.stdin,
 
1285
                stdout, stderr,
 
1286
                bzrlib.commands.run_bzr_catch_errors,
 
1287
                args)
 
1288
        finally:
 
1289
            logger.removeHandler(handler)
 
1290
            ui.ui_factory = old_ui_factory
 
1291
            if cwd is not None:
 
1292
                os.chdir(cwd)
 
1293
 
 
1294
        out = stdout.getvalue()
 
1295
        err = stderr.getvalue()
 
1296
        if out:
 
1297
            self.log('output:\n%r', out)
 
1298
        if err:
 
1299
            self.log('errors:\n%r', err)
 
1300
        if retcode is not None:
 
1301
            self.assertEquals(retcode, result,
 
1302
                              message='Unexpected return code')
 
1303
        return out, err
 
1304
 
 
1305
    def run_bzr(self, *args, **kwargs):
 
1306
        """Invoke bzr, as if it were run from the command line.
 
1307
 
 
1308
        The argument list should not include the bzr program name - the
 
1309
        first argument is normally the bzr command.  Arguments may be
 
1310
        passed in three ways:
 
1311
 
 
1312
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1313
        when the command contains whitespace or metacharacters, or 
 
1314
        is built up at run time.
 
1315
 
 
1316
        2- A single string, eg "add a".  This is the most convenient 
 
1317
        for hardcoded commands.
 
1318
 
 
1319
        3- Several varargs parameters, eg run_bzr("add", "a").  
 
1320
        This is not recommended for new code.
 
1321
 
 
1322
        This runs bzr through the interface that catches and reports
 
1323
        errors, and with logging set to something approximating the
 
1324
        default, so that error reporting can be checked.
 
1325
 
 
1326
        This should be the main method for tests that want to exercise the
 
1327
        overall behavior of the bzr application (rather than a unit test
 
1328
        or a functional test of the library.)
 
1329
 
 
1330
        This sends the stdout/stderr results into the test's log,
 
1331
        where it may be useful for debugging.  See also run_captured.
 
1332
 
 
1333
        :keyword stdin: A string to be used as stdin for the command.
 
1334
        :keyword retcode: The status code the command should return;
 
1335
            default 0.
 
1336
        :keyword working_dir: The directory to run the command in
 
1337
        :keyword error_regexes: A list of expected error messages.  If
 
1338
            specified they must be seen in the error output of the command.
 
1339
        """
 
1340
        retcode = kwargs.pop('retcode', 0)
 
1341
        encoding = kwargs.pop('encoding', None)
 
1342
        stdin = kwargs.pop('stdin', None)
 
1343
        working_dir = kwargs.pop('working_dir', None)
 
1344
        error_regexes = kwargs.pop('error_regexes', [])
 
1345
 
 
1346
        if len(args) == 1:
 
1347
            if isinstance(args[0], (list, basestring)):
 
1348
                args = args[0]
 
1349
        else:
 
1350
            symbol_versioning.warn(zero_eighteen % "passing varargs to run_bzr",
 
1351
                                   DeprecationWarning, stacklevel=3)
 
1352
 
 
1353
        out, err = self._run_bzr_autosplit(args=args,
 
1354
            retcode=retcode,
 
1355
            encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1356
            )
 
1357
 
 
1358
        for regex in error_regexes:
 
1359
            self.assertContainsRe(err, regex)
 
1360
        return out, err
 
1361
 
 
1362
    def run_bzr_decode(self, *args, **kwargs):
 
1363
        if 'encoding' in kwargs:
 
1364
            encoding = kwargs['encoding']
 
1365
        else:
 
1366
            encoding = bzrlib.user_encoding
 
1367
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
 
1368
 
 
1369
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1370
        """Run bzr, and check that stderr contains the supplied regexes
 
1371
 
 
1372
        :param error_regexes: Sequence of regular expressions which
 
1373
            must each be found in the error output. The relative ordering
 
1374
            is not enforced.
 
1375
        :param args: command-line arguments for bzr
 
1376
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1377
            This function changes the default value of retcode to be 3,
 
1378
            since in most cases this is run when you expect bzr to fail.
 
1379
 
 
1380
        :return: (out, err) The actual output of running the command (in case
 
1381
            you want to do more inspection)
 
1382
 
 
1383
        Examples of use::
 
1384
 
 
1385
            # Make sure that commit is failing because there is nothing to do
 
1386
            self.run_bzr_error(['no changes to commit'],
 
1387
                               'commit', '-m', 'my commit comment')
 
1388
            # Make sure --strict is handling an unknown file, rather than
 
1389
            # giving us the 'nothing to do' error
 
1390
            self.build_tree(['unknown'])
 
1391
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1392
                               'commit', '--strict', '-m', 'my commit comment')
 
1393
        """
 
1394
        kwargs.setdefault('retcode', 3)
 
1395
        kwargs['error_regexes'] = error_regexes
 
1396
        out, err = self.run_bzr(*args, **kwargs)
 
1397
        return out, err
 
1398
 
 
1399
    def run_bzr_subprocess(self, *args, **kwargs):
 
1400
        """Run bzr in a subprocess for testing.
 
1401
 
 
1402
        This starts a new Python interpreter and runs bzr in there. 
 
1403
        This should only be used for tests that have a justifiable need for
 
1404
        this isolation: e.g. they are testing startup time, or signal
 
1405
        handling, or early startup code, etc.  Subprocess code can't be 
 
1406
        profiled or debugged so easily.
 
1407
 
 
1408
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
1409
            None is supplied, the status code is not checked.
 
1410
        :keyword env_changes: A dictionary which lists changes to environment
 
1411
            variables. A value of None will unset the env variable.
 
1412
            The values must be strings. The change will only occur in the
 
1413
            child, so you don't need to fix the environment after running.
 
1414
        :keyword universal_newlines: Convert CRLF => LF
 
1415
        :keyword allow_plugins: By default the subprocess is run with
 
1416
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1417
            for system-wide plugins to create unexpected output on stderr,
 
1418
            which can cause unnecessary test failures.
 
1419
        """
 
1420
        env_changes = kwargs.get('env_changes', {})
 
1421
        working_dir = kwargs.get('working_dir', None)
 
1422
        allow_plugins = kwargs.get('allow_plugins', False)
 
1423
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1424
                                            working_dir=working_dir,
 
1425
                                            allow_plugins=allow_plugins)
 
1426
        # We distinguish between retcode=None and retcode not passed.
 
1427
        supplied_retcode = kwargs.get('retcode', 0)
 
1428
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1429
            universal_newlines=kwargs.get('universal_newlines', False),
 
1430
            process_args=args)
 
1431
 
 
1432
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1433
                             skip_if_plan_to_signal=False,
 
1434
                             working_dir=None,
 
1435
                             allow_plugins=False):
 
1436
        """Start bzr in a subprocess for testing.
 
1437
 
 
1438
        This starts a new Python interpreter and runs bzr in there.
 
1439
        This should only be used for tests that have a justifiable need for
 
1440
        this isolation: e.g. they are testing startup time, or signal
 
1441
        handling, or early startup code, etc.  Subprocess code can't be
 
1442
        profiled or debugged so easily.
 
1443
 
 
1444
        :param process_args: a list of arguments to pass to the bzr executable,
 
1445
            for example ``['--version']``.
 
1446
        :param env_changes: A dictionary which lists changes to environment
 
1447
            variables. A value of None will unset the env variable.
 
1448
            The values must be strings. The change will only occur in the
 
1449
            child, so you don't need to fix the environment after running.
 
1450
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1451
            is not available.
 
1452
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1453
 
 
1454
        :returns: Popen object for the started process.
 
1455
        """
 
1456
        if skip_if_plan_to_signal:
 
1457
            if not getattr(os, 'kill', None):
 
1458
                raise TestSkipped("os.kill not available.")
 
1459
 
 
1460
        if env_changes is None:
 
1461
            env_changes = {}
 
1462
        old_env = {}
 
1463
 
 
1464
        def cleanup_environment():
 
1465
            for env_var, value in env_changes.iteritems():
 
1466
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1467
 
 
1468
        def restore_environment():
 
1469
            for env_var, value in old_env.iteritems():
 
1470
                osutils.set_or_unset_env(env_var, value)
 
1471
 
 
1472
        bzr_path = self.get_bzr_path()
 
1473
 
 
1474
        cwd = None
 
1475
        if working_dir is not None:
 
1476
            cwd = osutils.getcwd()
 
1477
            os.chdir(working_dir)
 
1478
 
 
1479
        try:
 
1480
            # win32 subprocess doesn't support preexec_fn
 
1481
            # so we will avoid using it on all platforms, just to
 
1482
            # make sure the code path is used, and we don't break on win32
 
1483
            cleanup_environment()
 
1484
            command = [sys.executable, bzr_path]
 
1485
            if not allow_plugins:
 
1486
                command.append('--no-plugins')
 
1487
            command.extend(process_args)
 
1488
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1489
        finally:
 
1490
            restore_environment()
 
1491
            if cwd is not None:
 
1492
                os.chdir(cwd)
 
1493
 
 
1494
        return process
 
1495
 
 
1496
    def _popen(self, *args, **kwargs):
 
1497
        """Place a call to Popen.
 
1498
 
 
1499
        Allows tests to override this method to intercept the calls made to
 
1500
        Popen for introspection.
 
1501
        """
 
1502
        return Popen(*args, **kwargs)
 
1503
 
 
1504
    def get_bzr_path(self):
 
1505
        """Return the path of the 'bzr' executable for this test suite."""
 
1506
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1507
        if not os.path.isfile(bzr_path):
 
1508
            # We are probably installed. Assume sys.argv is the right file
 
1509
            bzr_path = sys.argv[0]
 
1510
        return bzr_path
 
1511
 
 
1512
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1513
                              universal_newlines=False, process_args=None):
 
1514
        """Finish the execution of process.
 
1515
 
 
1516
        :param process: the Popen object returned from start_bzr_subprocess.
 
1517
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1518
            None is supplied, the status code is not checked.
 
1519
        :param send_signal: an optional signal to send to the process.
 
1520
        :param universal_newlines: Convert CRLF => LF
 
1521
        :returns: (stdout, stderr)
 
1522
        """
 
1523
        if send_signal is not None:
 
1524
            os.kill(process.pid, send_signal)
 
1525
        out, err = process.communicate()
 
1526
 
 
1527
        if universal_newlines:
 
1528
            out = out.replace('\r\n', '\n')
 
1529
            err = err.replace('\r\n', '\n')
 
1530
 
 
1531
        if retcode is not None and retcode != process.returncode:
 
1532
            if process_args is None:
 
1533
                process_args = "(unknown args)"
 
1534
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1535
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1536
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1537
                      % (process_args, retcode, process.returncode))
 
1538
        return [out, err]
 
1539
 
 
1540
    def check_inventory_shape(self, inv, shape):
 
1541
        """Compare an inventory to a list of expected names.
 
1542
 
 
1543
        Fail if they are not precisely equal.
 
1544
        """
 
1545
        extras = []
 
1546
        shape = list(shape)             # copy
 
1547
        for path, ie in inv.entries():
 
1548
            name = path.replace('\\', '/')
 
1549
            if ie.kind == 'directory':
 
1550
                name = name + '/'
 
1551
            if name in shape:
 
1552
                shape.remove(name)
 
1553
            else:
 
1554
                extras.append(name)
 
1555
        if shape:
 
1556
            self.fail("expected paths not found in inventory: %r" % shape)
 
1557
        if extras:
 
1558
            self.fail("unexpected paths found in inventory: %r" % extras)
 
1559
 
 
1560
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1561
                         a_callable=None, *args, **kwargs):
 
1562
        """Call callable with redirected std io pipes.
 
1563
 
 
1564
        Returns the return code."""
 
1565
        if not callable(a_callable):
 
1566
            raise ValueError("a_callable must be callable.")
 
1567
        if stdin is None:
 
1568
            stdin = StringIO("")
 
1569
        if stdout is None:
 
1570
            if getattr(self, "_log_file", None) is not None:
 
1571
                stdout = self._log_file
 
1572
            else:
 
1573
                stdout = StringIO()
 
1574
        if stderr is None:
 
1575
            if getattr(self, "_log_file", None is not None):
 
1576
                stderr = self._log_file
 
1577
            else:
 
1578
                stderr = StringIO()
 
1579
        real_stdin = sys.stdin
 
1580
        real_stdout = sys.stdout
 
1581
        real_stderr = sys.stderr
 
1582
        try:
 
1583
            sys.stdout = stdout
 
1584
            sys.stderr = stderr
 
1585
            sys.stdin = stdin
 
1586
            return a_callable(*args, **kwargs)
 
1587
        finally:
 
1588
            sys.stdout = real_stdout
 
1589
            sys.stderr = real_stderr
 
1590
            sys.stdin = real_stdin
 
1591
 
 
1592
    def reduceLockdirTimeout(self):
 
1593
        """Reduce the default lock timeout for the duration of the test, so that
 
1594
        if LockContention occurs during a test, it does so quickly.
 
1595
 
 
1596
        Tests that expect to provoke LockContention errors should call this.
 
1597
        """
 
1598
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
1599
        def resetTimeout():
 
1600
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
1601
        self.addCleanup(resetTimeout)
 
1602
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
1603
 
 
1604
 
 
1605
class TestCaseWithMemoryTransport(TestCase):
 
1606
    """Common test class for tests that do not need disk resources.
 
1607
 
 
1608
    Tests that need disk resources should derive from TestCaseWithTransport.
 
1609
 
 
1610
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
1611
 
 
1612
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
1613
    a directory which does not exist. This serves to help ensure test isolation
 
1614
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
1615
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
1616
    file defaults for the transport in tests, nor does it obey the command line
 
1617
    override, so tests that accidentally write to the common directory should
 
1618
    be rare.
 
1619
 
 
1620
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
1621
    a .bzr directory that stops us ascending higher into the filesystem.
 
1622
    """
 
1623
 
 
1624
    TEST_ROOT = None
 
1625
    _TEST_NAME = 'test'
 
1626
 
 
1627
    def __init__(self, methodName='runTest'):
 
1628
        # allow test parameterisation after test construction and before test
 
1629
        # execution. Variables that the parameteriser sets need to be 
 
1630
        # ones that are not set by setUp, or setUp will trash them.
 
1631
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
1632
        self.vfs_transport_factory = default_transport
 
1633
        self.transport_server = None
 
1634
        self.transport_readonly_server = None
 
1635
        self.__vfs_server = None
 
1636
 
 
1637
    def get_transport(self, relpath=None):
 
1638
        """Return a writeable transport.
 
1639
 
 
1640
        This transport is for the test scratch space relative to
 
1641
        "self._test_root""
 
1642
        
 
1643
        :param relpath: a path relative to the base url.
 
1644
        """
 
1645
        t = get_transport(self.get_url(relpath))
 
1646
        self.assertFalse(t.is_readonly())
 
1647
        return t
 
1648
 
 
1649
    def get_readonly_transport(self, relpath=None):
 
1650
        """Return a readonly transport for the test scratch space
 
1651
        
 
1652
        This can be used to test that operations which should only need
 
1653
        readonly access in fact do not try to write.
 
1654
 
 
1655
        :param relpath: a path relative to the base url.
 
1656
        """
 
1657
        t = get_transport(self.get_readonly_url(relpath))
 
1658
        self.assertTrue(t.is_readonly())
 
1659
        return t
 
1660
 
 
1661
    def create_transport_readonly_server(self):
 
1662
        """Create a transport server from class defined at init.
 
1663
 
 
1664
        This is mostly a hook for daughter classes.
 
1665
        """
 
1666
        return self.transport_readonly_server()
 
1667
 
 
1668
    def get_readonly_server(self):
 
1669
        """Get the server instance for the readonly transport
 
1670
 
 
1671
        This is useful for some tests with specific servers to do diagnostics.
 
1672
        """
 
1673
        if self.__readonly_server is None:
 
1674
            if self.transport_readonly_server is None:
 
1675
                # readonly decorator requested
 
1676
                # bring up the server
 
1677
                self.__readonly_server = ReadonlyServer()
 
1678
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1679
            else:
 
1680
                self.__readonly_server = self.create_transport_readonly_server()
 
1681
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1682
            self.addCleanup(self.__readonly_server.tearDown)
 
1683
        return self.__readonly_server
 
1684
 
 
1685
    def get_readonly_url(self, relpath=None):
 
1686
        """Get a URL for the readonly transport.
 
1687
 
 
1688
        This will either be backed by '.' or a decorator to the transport 
 
1689
        used by self.get_url()
 
1690
        relpath provides for clients to get a path relative to the base url.
 
1691
        These should only be downwards relative, not upwards.
 
1692
        """
 
1693
        base = self.get_readonly_server().get_url()
 
1694
        return self._adjust_url(base, relpath)
 
1695
 
 
1696
    def get_vfs_only_server(self):
 
1697
        """Get the vfs only read/write server instance.
 
1698
 
 
1699
        This is useful for some tests with specific servers that need
 
1700
        diagnostics.
 
1701
 
 
1702
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
1703
        is no means to override it.
 
1704
        """
 
1705
        if self.__vfs_server is None:
 
1706
            self.__vfs_server = MemoryServer()
 
1707
            self.__vfs_server.setUp()
 
1708
            self.addCleanup(self.__vfs_server.tearDown)
 
1709
        return self.__vfs_server
 
1710
 
 
1711
    def get_server(self):
 
1712
        """Get the read/write server instance.
 
1713
 
 
1714
        This is useful for some tests with specific servers that need
 
1715
        diagnostics.
 
1716
 
 
1717
        This is built from the self.transport_server factory. If that is None,
 
1718
        then the self.get_vfs_server is returned.
 
1719
        """
 
1720
        if self.__server is None:
 
1721
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
 
1722
                return self.get_vfs_only_server()
 
1723
            else:
 
1724
                # bring up a decorated means of access to the vfs only server.
 
1725
                self.__server = self.transport_server()
 
1726
                try:
 
1727
                    self.__server.setUp(self.get_vfs_only_server())
 
1728
                except TypeError, e:
 
1729
                    # This should never happen; the try:Except here is to assist
 
1730
                    # developers having to update code rather than seeing an
 
1731
                    # uninformative TypeError.
 
1732
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
 
1733
            self.addCleanup(self.__server.tearDown)
 
1734
        return self.__server
 
1735
 
 
1736
    def _adjust_url(self, base, relpath):
 
1737
        """Get a URL (or maybe a path) for the readwrite transport.
 
1738
 
 
1739
        This will either be backed by '.' or to an equivalent non-file based
 
1740
        facility.
 
1741
        relpath provides for clients to get a path relative to the base url.
 
1742
        These should only be downwards relative, not upwards.
 
1743
        """
 
1744
        if relpath is not None and relpath != '.':
 
1745
            if not base.endswith('/'):
 
1746
                base = base + '/'
 
1747
            # XXX: Really base should be a url; we did after all call
 
1748
            # get_url()!  But sometimes it's just a path (from
 
1749
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
1750
            # to a non-escaped local path.
 
1751
            if base.startswith('./') or base.startswith('/'):
 
1752
                base += relpath
 
1753
            else:
 
1754
                base += urlutils.escape(relpath)
 
1755
        return base
 
1756
 
 
1757
    def get_url(self, relpath=None):
 
1758
        """Get a URL (or maybe a path) for the readwrite transport.
 
1759
 
 
1760
        This will either be backed by '.' or to an equivalent non-file based
 
1761
        facility.
 
1762
        relpath provides for clients to get a path relative to the base url.
 
1763
        These should only be downwards relative, not upwards.
 
1764
        """
 
1765
        base = self.get_server().get_url()
 
1766
        return self._adjust_url(base, relpath)
 
1767
 
 
1768
    def get_vfs_only_url(self, relpath=None):
 
1769
        """Get a URL (or maybe a path for the plain old vfs transport.
 
1770
 
 
1771
        This will never be a smart protocol.  It always has all the
 
1772
        capabilities of the local filesystem, but it might actually be a
 
1773
        MemoryTransport or some other similar virtual filesystem.
 
1774
 
 
1775
        This is the backing transport (if any) of the server returned by
 
1776
        get_url and get_readonly_url.
 
1777
 
 
1778
        :param relpath: provides for clients to get a path relative to the base
 
1779
            url.  These should only be downwards relative, not upwards.
 
1780
        :return: A URL
 
1781
        """
 
1782
        base = self.get_vfs_only_server().get_url()
 
1783
        return self._adjust_url(base, relpath)
 
1784
 
 
1785
    def _make_test_root(self):
 
1786
        if TestCaseWithMemoryTransport.TEST_ROOT is not None:
 
1787
            return
 
1788
        root = tempfile.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
1789
        TestCaseWithMemoryTransport.TEST_ROOT = root
 
1790
        
 
1791
        # make a fake bzr directory there to prevent any tests propagating
 
1792
        # up onto the source directory's real branch
 
1793
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
1794
 
 
1795
        # The same directory is used by all tests, and we're not specifically
 
1796
        # told when all tests are finished.  This will do.
 
1797
        atexit.register(_rmtree_temp_dir, root)
 
1798
 
 
1799
    def makeAndChdirToTestDir(self):
 
1800
        """Create a temporary directories for this one test.
 
1801
        
 
1802
        This must set self.test_home_dir and self.test_dir and chdir to
 
1803
        self.test_dir.
 
1804
        
 
1805
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
1806
        """
 
1807
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
1808
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
1809
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
1810
        
 
1811
    def make_branch(self, relpath, format=None):
 
1812
        """Create a branch on the transport at relpath."""
 
1813
        repo = self.make_repository(relpath, format=format)
 
1814
        return repo.bzrdir.create_branch()
 
1815
 
 
1816
    def make_bzrdir(self, relpath, format=None):
 
1817
        try:
 
1818
            # might be a relative or absolute path
 
1819
            maybe_a_url = self.get_url(relpath)
 
1820
            segments = maybe_a_url.rsplit('/', 1)
 
1821
            t = get_transport(maybe_a_url)
 
1822
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
1823
                t.ensure_base()
 
1824
            if format is None:
 
1825
                format = 'default'
 
1826
            if isinstance(format, basestring):
 
1827
                format = bzrdir.format_registry.make_bzrdir(format)
 
1828
            return format.initialize_on_transport(t)
 
1829
        except errors.UninitializableFormat:
 
1830
            raise TestSkipped("Format %s is not initializable." % format)
 
1831
 
 
1832
    def make_repository(self, relpath, shared=False, format=None):
 
1833
        """Create a repository on our default transport at relpath.
 
1834
        
 
1835
        Note that relpath must be a relative path, not a full url.
 
1836
        """
 
1837
        # FIXME: If you create a remoterepository this returns the underlying
 
1838
        # real format, which is incorrect.  Actually we should make sure that 
 
1839
        # RemoteBzrDir returns a RemoteRepository.
 
1840
        # maybe  mbp 20070410
 
1841
        made_control = self.make_bzrdir(relpath, format=format)
 
1842
        return made_control.create_repository(shared=shared)
 
1843
 
 
1844
    def make_branch_and_memory_tree(self, relpath, format=None):
 
1845
        """Create a branch on the default transport and a MemoryTree for it."""
 
1846
        b = self.make_branch(relpath, format=format)
 
1847
        return memorytree.MemoryTree.create_on_branch(b)
 
1848
 
 
1849
    def overrideEnvironmentForTesting(self):
 
1850
        os.environ['HOME'] = self.test_home_dir
 
1851
        os.environ['BZR_HOME'] = self.test_home_dir
 
1852
        
 
1853
    def setUp(self):
 
1854
        super(TestCaseWithMemoryTransport, self).setUp()
 
1855
        self._make_test_root()
 
1856
        _currentdir = os.getcwdu()
 
1857
        def _leaveDirectory():
 
1858
            os.chdir(_currentdir)
 
1859
        self.addCleanup(_leaveDirectory)
 
1860
        self.makeAndChdirToTestDir()
 
1861
        self.overrideEnvironmentForTesting()
 
1862
        self.__readonly_server = None
 
1863
        self.__server = None
 
1864
        self.reduceLockdirTimeout()
 
1865
 
 
1866
     
 
1867
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
1868
    """Derived class that runs a test within a temporary directory.
 
1869
 
 
1870
    This is useful for tests that need to create a branch, etc.
 
1871
 
 
1872
    The directory is created in a slightly complex way: for each
 
1873
    Python invocation, a new temporary top-level directory is created.
 
1874
    All test cases create their own directory within that.  If the
 
1875
    tests complete successfully, the directory is removed.
 
1876
 
 
1877
    :ivar test_base_dir: The path of the top-level directory for this 
 
1878
    test, which contains a home directory and a work directory.
 
1879
 
 
1880
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
1881
    which is used as $HOME for this test.
 
1882
 
 
1883
    :ivar test_dir: A directory under test_base_dir used as the current
 
1884
    directory when the test proper is run.
 
1885
    """
 
1886
 
 
1887
    OVERRIDE_PYTHON = 'python'
 
1888
 
 
1889
    def check_file_contents(self, filename, expect):
 
1890
        self.log("check contents of file %s" % filename)
 
1891
        contents = file(filename, 'r').read()
 
1892
        if contents != expect:
 
1893
            self.log("expected: %r" % expect)
 
1894
            self.log("actually: %r" % contents)
 
1895
            self.fail("contents of %s not as expected" % filename)
 
1896
 
 
1897
    def makeAndChdirToTestDir(self):
 
1898
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
1899
        
 
1900
        For TestCaseInTempDir we create a temporary directory based on the test
 
1901
        name and then create two subdirs - test and home under it.
 
1902
        """
 
1903
        # create a directory within the top level test directory
 
1904
        candidate_dir = tempfile.mkdtemp(dir=self.TEST_ROOT)
 
1905
        # now create test and home directories within this dir
 
1906
        self.test_base_dir = candidate_dir
 
1907
        self.test_home_dir = self.test_base_dir + '/home'
 
1908
        os.mkdir(self.test_home_dir)
 
1909
        self.test_dir = self.test_base_dir + '/work'
 
1910
        os.mkdir(self.test_dir)
 
1911
        os.chdir(self.test_dir)
 
1912
        # put name of test inside
 
1913
        f = file(self.test_base_dir + '/name', 'w')
 
1914
        try:
 
1915
            f.write(self.id())
 
1916
        finally:
 
1917
            f.close()
 
1918
        self.addCleanup(self.deleteTestDir)
 
1919
 
 
1920
    def deleteTestDir(self):
 
1921
        os.chdir(self.TEST_ROOT)
 
1922
        _rmtree_temp_dir(self.test_base_dir)
 
1923
 
 
1924
    def build_tree(self, shape, line_endings='binary', transport=None):
 
1925
        """Build a test tree according to a pattern.
 
1926
 
 
1927
        shape is a sequence of file specifications.  If the final
 
1928
        character is '/', a directory is created.
 
1929
 
 
1930
        This assumes that all the elements in the tree being built are new.
 
1931
 
 
1932
        This doesn't add anything to a branch.
 
1933
 
 
1934
        :param line_endings: Either 'binary' or 'native'
 
1935
            in binary mode, exact contents are written in native mode, the
 
1936
            line endings match the default platform endings.
 
1937
        :param transport: A transport to write to, for building trees on VFS's.
 
1938
            If the transport is readonly or None, "." is opened automatically.
 
1939
        :return: None
 
1940
        """
 
1941
        # It's OK to just create them using forward slashes on windows.
 
1942
        if transport is None or transport.is_readonly():
 
1943
            transport = get_transport(".")
 
1944
        for name in shape:
 
1945
            self.assert_(isinstance(name, basestring))
 
1946
            if name[-1] == '/':
 
1947
                transport.mkdir(urlutils.escape(name[:-1]))
 
1948
            else:
 
1949
                if line_endings == 'binary':
 
1950
                    end = '\n'
 
1951
                elif line_endings == 'native':
 
1952
                    end = os.linesep
 
1953
                else:
 
1954
                    raise errors.BzrError(
 
1955
                        'Invalid line ending request %r' % line_endings)
 
1956
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
1957
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
1958
 
 
1959
    def build_tree_contents(self, shape):
 
1960
        build_tree_contents(shape)
 
1961
 
 
1962
    def assertFileEqual(self, content, path):
 
1963
        """Fail if path does not contain 'content'."""
 
1964
        self.failUnlessExists(path)
 
1965
        f = file(path, 'rb')
 
1966
        try:
 
1967
            s = f.read()
 
1968
        finally:
 
1969
            f.close()
 
1970
        self.assertEqualDiff(content, s)
 
1971
 
 
1972
    def failUnlessExists(self, path):
 
1973
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1974
        if not isinstance(path, basestring):
 
1975
            for p in path:
 
1976
                self.failUnlessExists(p)
 
1977
        else:
 
1978
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
1979
 
 
1980
    def failIfExists(self, path):
 
1981
        """Fail if path or paths, which may be abs or relative, exist."""
 
1982
        if not isinstance(path, basestring):
 
1983
            for p in path:
 
1984
                self.failIfExists(p)
 
1985
        else:
 
1986
            self.failIf(osutils.lexists(path),path+" exists")
 
1987
 
 
1988
    def assertInWorkingTree(self,path,root_path='.',tree=None):
 
1989
        """Assert whether path or paths are in the WorkingTree"""
 
1990
        if tree is None:
 
1991
            tree = workingtree.WorkingTree.open(root_path)
 
1992
        if not isinstance(path, basestring):
 
1993
            for p in path:
 
1994
                self.assertInWorkingTree(p,tree=tree)
 
1995
        else:
 
1996
            self.assertIsNot(tree.path2id(path), None,
 
1997
                path+' not in working tree.')
 
1998
 
 
1999
    def assertNotInWorkingTree(self,path,root_path='.',tree=None):
 
2000
        """Assert whether path or paths are not in the WorkingTree"""
 
2001
        if tree is None:
 
2002
            tree = workingtree.WorkingTree.open(root_path)
 
2003
        if not isinstance(path, basestring):
 
2004
            for p in path:
 
2005
                self.assertNotInWorkingTree(p,tree=tree)
 
2006
        else:
 
2007
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2008
 
 
2009
 
 
2010
class TestCaseWithTransport(TestCaseInTempDir):
 
2011
    """A test case that provides get_url and get_readonly_url facilities.
 
2012
 
 
2013
    These back onto two transport servers, one for readonly access and one for
 
2014
    read write access.
 
2015
 
 
2016
    If no explicit class is provided for readonly access, a
 
2017
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2018
    based read write transports.
 
2019
 
 
2020
    If an explicit class is provided for readonly access, that server and the 
 
2021
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2022
    """
 
2023
 
 
2024
    def get_vfs_only_server(self):
 
2025
        """See TestCaseWithMemoryTransport.
 
2026
 
 
2027
        This is useful for some tests with specific servers that need
 
2028
        diagnostics.
 
2029
        """
 
2030
        if self.__vfs_server is None:
 
2031
            self.__vfs_server = self.vfs_transport_factory()
 
2032
            self.__vfs_server.setUp()
 
2033
            self.addCleanup(self.__vfs_server.tearDown)
 
2034
        return self.__vfs_server
 
2035
 
 
2036
    def make_branch_and_tree(self, relpath, format=None):
 
2037
        """Create a branch on the transport and a tree locally.
 
2038
 
 
2039
        If the transport is not a LocalTransport, the Tree can't be created on
 
2040
        the transport.  In that case if the vfs_transport_factory is
 
2041
        LocalURLServer the working tree is created in the local
 
2042
        directory backing the transport, and the returned tree's branch and
 
2043
        repository will also be accessed locally. Otherwise a lightweight
 
2044
        checkout is created and returned.
 
2045
 
 
2046
        :param format: The BzrDirFormat.
 
2047
        :returns: the WorkingTree.
 
2048
        """
 
2049
        # TODO: always use the local disk path for the working tree,
 
2050
        # this obviously requires a format that supports branch references
 
2051
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2052
        # RBC 20060208
 
2053
        b = self.make_branch(relpath, format=format)
 
2054
        try:
 
2055
            return b.bzrdir.create_workingtree()
 
2056
        except errors.NotLocalUrl:
 
2057
            # We can only make working trees locally at the moment.  If the
 
2058
            # transport can't support them, then we keep the non-disk-backed
 
2059
            # branch and create a local checkout.
 
2060
            if self.vfs_transport_factory is LocalURLServer:
 
2061
                # the branch is colocated on disk, we cannot create a checkout.
 
2062
                # hopefully callers will expect this.
 
2063
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2064
                return local_controldir.create_workingtree()
 
2065
            else:
 
2066
                return b.create_checkout(relpath, lightweight=True)
 
2067
 
 
2068
    def assertIsDirectory(self, relpath, transport):
 
2069
        """Assert that relpath within transport is a directory.
 
2070
 
 
2071
        This may not be possible on all transports; in that case it propagates
 
2072
        a TransportNotPossible.
 
2073
        """
 
2074
        try:
 
2075
            mode = transport.stat(relpath).st_mode
 
2076
        except errors.NoSuchFile:
 
2077
            self.fail("path %s is not a directory; no such file"
 
2078
                      % (relpath))
 
2079
        if not stat.S_ISDIR(mode):
 
2080
            self.fail("path %s is not a directory; has mode %#o"
 
2081
                      % (relpath, mode))
 
2082
 
 
2083
    def assertTreesEqual(self, left, right):
 
2084
        """Check that left and right have the same content and properties."""
 
2085
        # we use a tree delta to check for equality of the content, and we
 
2086
        # manually check for equality of other things such as the parents list.
 
2087
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2088
        differences = left.changes_from(right)
 
2089
        self.assertFalse(differences.has_changed(),
 
2090
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2091
 
 
2092
    def setUp(self):
 
2093
        super(TestCaseWithTransport, self).setUp()
 
2094
        self.__vfs_server = None
 
2095
 
 
2096
 
 
2097
class ChrootedTestCase(TestCaseWithTransport):
 
2098
    """A support class that provides readonly urls outside the local namespace.
 
2099
 
 
2100
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2101
    is then we are chrooted already, if it is not then an HttpServer is used
 
2102
    for readonly urls.
 
2103
 
 
2104
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2105
                       be used without needed to redo it when a different 
 
2106
                       subclass is in use ?
 
2107
    """
 
2108
 
 
2109
    def setUp(self):
 
2110
        super(ChrootedTestCase, self).setUp()
 
2111
        if not self.vfs_transport_factory == MemoryServer:
 
2112
            self.transport_readonly_server = HttpServer
 
2113
 
 
2114
 
 
2115
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
 
2116
                       random_order=False):
 
2117
    """Create a test suite by filtering another one.
 
2118
    
 
2119
    :param suite:           the source suite
 
2120
    :param pattern:         pattern that names must match
 
2121
    :param exclude_pattern: pattern that names must not match, if any
 
2122
    :param random_order:    if True, tests in the new suite will be put in
 
2123
                            random order
 
2124
    :returns: the newly created suite
 
2125
    """ 
 
2126
    return sort_suite_by_re(suite, pattern, exclude_pattern,
 
2127
        random_order, False)
 
2128
 
 
2129
 
 
2130
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
 
2131
                     random_order=False, append_rest=True):
 
2132
    """Create a test suite by sorting another one.
 
2133
    
 
2134
    :param suite:           the source suite
 
2135
    :param pattern:         pattern that names must match in order to go
 
2136
                            first in the new suite
 
2137
    :param exclude_pattern: pattern that names must not match, if any
 
2138
    :param random_order:    if True, tests in the new suite will be put in
 
2139
                            random order
 
2140
    :param append_rest:     if False, pattern is a strict filter and not
 
2141
                            just an ordering directive
 
2142
    :returns: the newly created suite
 
2143
    """ 
 
2144
    first = []
 
2145
    second = []
 
2146
    filter_re = re.compile(pattern)
 
2147
    if exclude_pattern is not None:
 
2148
        exclude_re = re.compile(exclude_pattern)
 
2149
    for test in iter_suite_tests(suite):
 
2150
        test_id = test.id()
 
2151
        if exclude_pattern is None or not exclude_re.search(test_id):
 
2152
            if filter_re.search(test_id):
 
2153
                first.append(test)
 
2154
            elif append_rest:
 
2155
                second.append(test)
 
2156
    if random_order:
 
2157
        random.shuffle(first)
 
2158
        random.shuffle(second)
 
2159
    return TestUtil.TestSuite(first + second)
 
2160
 
 
2161
 
 
2162
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2163
              stop_on_failure=False,
 
2164
              transport=None, lsprof_timed=None, bench_history=None,
 
2165
              matching_tests_first=None,
 
2166
              list_only=False,
 
2167
              random_seed=None,
 
2168
              exclude_pattern=None,
 
2169
              ):
 
2170
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2171
    if verbose:
 
2172
        verbosity = 2
 
2173
    else:
 
2174
        verbosity = 1
 
2175
    runner = TextTestRunner(stream=sys.stdout,
 
2176
                            descriptions=0,
 
2177
                            verbosity=verbosity,
 
2178
                            bench_history=bench_history,
 
2179
                            list_only=list_only,
 
2180
                            )
 
2181
    runner.stop_on_failure=stop_on_failure
 
2182
    # Initialise the random number generator and display the seed used.
 
2183
    # We convert the seed to a long to make it reuseable across invocations.
 
2184
    random_order = False
 
2185
    if random_seed is not None:
 
2186
        random_order = True
 
2187
        if random_seed == "now":
 
2188
            random_seed = long(time.time())
 
2189
        else:
 
2190
            # Convert the seed to a long if we can
 
2191
            try:
 
2192
                random_seed = long(random_seed)
 
2193
            except:
 
2194
                pass
 
2195
        runner.stream.writeln("Randomizing test order using seed %s\n" %
 
2196
            (random_seed))
 
2197
        random.seed(random_seed)
 
2198
    # Customise the list of tests if requested
 
2199
    if pattern != '.*' or exclude_pattern is not None or random_order:
 
2200
        if matching_tests_first:
 
2201
            suite = sort_suite_by_re(suite, pattern, exclude_pattern,
 
2202
                random_order)
 
2203
        else:
 
2204
            suite = filter_suite_by_re(suite, pattern, exclude_pattern,
 
2205
                random_order)
 
2206
    result = runner.run(suite)
 
2207
    return result.wasSuccessful()
 
2208
 
 
2209
 
 
2210
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
2211
             transport=None,
 
2212
             test_suite_factory=None,
 
2213
             lsprof_timed=None,
 
2214
             bench_history=None,
 
2215
             matching_tests_first=None,
 
2216
             list_only=False,
 
2217
             random_seed=None,
 
2218
             exclude_pattern=None):
 
2219
    """Run the whole test suite under the enhanced runner"""
 
2220
    # XXX: Very ugly way to do this...
 
2221
    # Disable warning about old formats because we don't want it to disturb
 
2222
    # any blackbox tests.
 
2223
    from bzrlib import repository
 
2224
    repository._deprecation_warning_done = True
 
2225
 
 
2226
    global default_transport
 
2227
    if transport is None:
 
2228
        transport = default_transport
 
2229
    old_transport = default_transport
 
2230
    default_transport = transport
 
2231
    try:
 
2232
        if test_suite_factory is None:
 
2233
            suite = test_suite()
 
2234
        else:
 
2235
            suite = test_suite_factory()
 
2236
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
2237
                     stop_on_failure=stop_on_failure,
 
2238
                     transport=transport,
 
2239
                     lsprof_timed=lsprof_timed,
 
2240
                     bench_history=bench_history,
 
2241
                     matching_tests_first=matching_tests_first,
 
2242
                     list_only=list_only,
 
2243
                     random_seed=random_seed,
 
2244
                     exclude_pattern=exclude_pattern)
 
2245
    finally:
 
2246
        default_transport = old_transport
 
2247
 
 
2248
 
 
2249
def test_suite():
 
2250
    """Build and return TestSuite for the whole of bzrlib.
 
2251
    
 
2252
    This function can be replaced if you need to change the default test
 
2253
    suite on a global basis, but it is not encouraged.
 
2254
    """
 
2255
    testmod_names = [
 
2256
                   'bzrlib.util.tests.test_bencode',
 
2257
                   'bzrlib.tests.test__dirstate_helpers',
 
2258
                   'bzrlib.tests.test_ancestry',
 
2259
                   'bzrlib.tests.test_annotate',
 
2260
                   'bzrlib.tests.test_api',
 
2261
                   'bzrlib.tests.test_atomicfile',
 
2262
                   'bzrlib.tests.test_bad_files',
 
2263
                   'bzrlib.tests.test_branch',
 
2264
                   'bzrlib.tests.test_branchbuilder',
 
2265
                   'bzrlib.tests.test_bugtracker',
 
2266
                   'bzrlib.tests.test_bundle',
 
2267
                   'bzrlib.tests.test_bzrdir',
 
2268
                   'bzrlib.tests.test_cache_utf8',
 
2269
                   'bzrlib.tests.test_commands',
 
2270
                   'bzrlib.tests.test_commit',
 
2271
                   'bzrlib.tests.test_commit_merge',
 
2272
                   'bzrlib.tests.test_config',
 
2273
                   'bzrlib.tests.test_conflicts',
 
2274
                   'bzrlib.tests.test_counted_lock',
 
2275
                   'bzrlib.tests.test_decorators',
 
2276
                   'bzrlib.tests.test_delta',
 
2277
                   'bzrlib.tests.test_deprecated_graph',
 
2278
                   'bzrlib.tests.test_diff',
 
2279
                   'bzrlib.tests.test_dirstate',
 
2280
                   'bzrlib.tests.test_email_message',
 
2281
                   'bzrlib.tests.test_errors',
 
2282
                   'bzrlib.tests.test_escaped_store',
 
2283
                   'bzrlib.tests.test_extract',
 
2284
                   'bzrlib.tests.test_fetch',
 
2285
                   'bzrlib.tests.test_file_names',
 
2286
                   'bzrlib.tests.test_ftp_transport',
 
2287
                   'bzrlib.tests.test_generate_docs',
 
2288
                   'bzrlib.tests.test_generate_ids',
 
2289
                   'bzrlib.tests.test_globbing',
 
2290
                   'bzrlib.tests.test_gpg',
 
2291
                   'bzrlib.tests.test_graph',
 
2292
                   'bzrlib.tests.test_hashcache',
 
2293
                   'bzrlib.tests.test_help',
 
2294
                   'bzrlib.tests.test_hooks',
 
2295
                   'bzrlib.tests.test_http',
 
2296
                   'bzrlib.tests.test_http_response',
 
2297
                   'bzrlib.tests.test_https_ca_bundle',
 
2298
                   'bzrlib.tests.test_identitymap',
 
2299
                   'bzrlib.tests.test_ignores',
 
2300
                   'bzrlib.tests.test_index',
 
2301
                   'bzrlib.tests.test_info',
 
2302
                   'bzrlib.tests.test_inv',
 
2303
                   'bzrlib.tests.test_knit',
 
2304
                   'bzrlib.tests.test_lazy_import',
 
2305
                   'bzrlib.tests.test_lazy_regex',
 
2306
                   'bzrlib.tests.test_lockdir',
 
2307
                   'bzrlib.tests.test_lockable_files',
 
2308
                   'bzrlib.tests.test_log',
 
2309
                   'bzrlib.tests.test_lsprof',
 
2310
                   'bzrlib.tests.test_memorytree',
 
2311
                   'bzrlib.tests.test_merge',
 
2312
                   'bzrlib.tests.test_merge3',
 
2313
                   'bzrlib.tests.test_merge_core',
 
2314
                   'bzrlib.tests.test_merge_directive',
 
2315
                   'bzrlib.tests.test_missing',
 
2316
                   'bzrlib.tests.test_msgeditor',
 
2317
                   'bzrlib.tests.test_multiparent',
 
2318
                   'bzrlib.tests.test_nonascii',
 
2319
                   'bzrlib.tests.test_options',
 
2320
                   'bzrlib.tests.test_osutils',
 
2321
                   'bzrlib.tests.test_osutils_encodings',
 
2322
                   'bzrlib.tests.test_pack',
 
2323
                   'bzrlib.tests.test_patch',
 
2324
                   'bzrlib.tests.test_patches',
 
2325
                   'bzrlib.tests.test_permissions',
 
2326
                   'bzrlib.tests.test_plugins',
 
2327
                   'bzrlib.tests.test_progress',
 
2328
                   'bzrlib.tests.test_reconcile',
 
2329
                   'bzrlib.tests.test_registry',
 
2330
                   'bzrlib.tests.test_remote',
 
2331
                   'bzrlib.tests.test_repository',
 
2332
                   'bzrlib.tests.test_revert',
 
2333
                   'bzrlib.tests.test_revision',
 
2334
                   'bzrlib.tests.test_revisionnamespaces',
 
2335
                   'bzrlib.tests.test_revisiontree',
 
2336
                   'bzrlib.tests.test_rio',
 
2337
                   'bzrlib.tests.test_sampler',
 
2338
                   'bzrlib.tests.test_selftest',
 
2339
                   'bzrlib.tests.test_setup',
 
2340
                   'bzrlib.tests.test_sftp_transport',
 
2341
                   'bzrlib.tests.test_smart',
 
2342
                   'bzrlib.tests.test_smart_add',
 
2343
                   'bzrlib.tests.test_smart_transport',
 
2344
                   'bzrlib.tests.test_smtp_connection',
 
2345
                   'bzrlib.tests.test_source',
 
2346
                   'bzrlib.tests.test_ssh_transport',
 
2347
                   'bzrlib.tests.test_status',
 
2348
                   'bzrlib.tests.test_store',
 
2349
                   'bzrlib.tests.test_strace',
 
2350
                   'bzrlib.tests.test_subsume',
 
2351
                   'bzrlib.tests.test_symbol_versioning',
 
2352
                   'bzrlib.tests.test_tag',
 
2353
                   'bzrlib.tests.test_testament',
 
2354
                   'bzrlib.tests.test_textfile',
 
2355
                   'bzrlib.tests.test_textmerge',
 
2356
                   'bzrlib.tests.test_timestamp',
 
2357
                   'bzrlib.tests.test_trace',
 
2358
                   'bzrlib.tests.test_transactions',
 
2359
                   'bzrlib.tests.test_transform',
 
2360
                   'bzrlib.tests.test_transport',
 
2361
                   'bzrlib.tests.test_tree',
 
2362
                   'bzrlib.tests.test_treebuilder',
 
2363
                   'bzrlib.tests.test_tsort',
 
2364
                   'bzrlib.tests.test_tuned_gzip',
 
2365
                   'bzrlib.tests.test_ui',
 
2366
                   'bzrlib.tests.test_upgrade',
 
2367
                   'bzrlib.tests.test_urlutils',
 
2368
                   'bzrlib.tests.test_versionedfile',
 
2369
                   'bzrlib.tests.test_version',
 
2370
                   'bzrlib.tests.test_version_info',
 
2371
                   'bzrlib.tests.test_weave',
 
2372
                   'bzrlib.tests.test_whitebox',
 
2373
                   'bzrlib.tests.test_workingtree',
 
2374
                   'bzrlib.tests.test_workingtree_4',
 
2375
                   'bzrlib.tests.test_wsgi',
 
2376
                   'bzrlib.tests.test_xml',
 
2377
                   ]
 
2378
    test_transport_implementations = [
 
2379
        'bzrlib.tests.test_transport_implementations',
 
2380
        'bzrlib.tests.test_read_bundle',
 
2381
        ]
 
2382
    suite = TestUtil.TestSuite()
 
2383
    loader = TestUtil.TestLoader()
 
2384
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
2385
    from bzrlib.tests.test_transport_implementations import TransportTestProviderAdapter
 
2386
    adapter = TransportTestProviderAdapter()
 
2387
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
2388
    for package in packages_to_test():
 
2389
        suite.addTest(package.test_suite())
 
2390
    for m in MODULES_TO_TEST:
 
2391
        suite.addTest(loader.loadTestsFromModule(m))
 
2392
    for m in MODULES_TO_DOCTEST:
 
2393
        try:
 
2394
            suite.addTest(doctest.DocTestSuite(m))
 
2395
        except ValueError, e:
 
2396
            print '**failed to get doctest for: %s\n%s' %(m,e)
 
2397
            raise
 
2398
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
2399
        if getattr(plugin, 'test_suite', None) is not None:
 
2400
            default_encoding = sys.getdefaultencoding()
 
2401
            try:
 
2402
                plugin_suite = plugin.test_suite()
 
2403
            except ImportError, e:
 
2404
                bzrlib.trace.warning(
 
2405
                    'Unable to test plugin "%s": %s', name, e)
 
2406
            else:
 
2407
                suite.addTest(plugin_suite)
 
2408
            if default_encoding != sys.getdefaultencoding():
 
2409
                bzrlib.trace.warning(
 
2410
                    'Plugin "%s" tried to reset default encoding to: %s', name,
 
2411
                    sys.getdefaultencoding())
 
2412
                reload(sys)
 
2413
                sys.setdefaultencoding(default_encoding)
 
2414
    return suite
 
2415
 
 
2416
 
 
2417
def adapt_modules(mods_list, adapter, loader, suite):
 
2418
    """Adapt the modules in mods_list using adapter and add to suite."""
 
2419
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
 
2420
        suite.addTests(adapter.adapt(test))
 
2421
 
 
2422
 
 
2423
def _rmtree_temp_dir(dirname):
 
2424
    # If LANG=C we probably have created some bogus paths
 
2425
    # which rmtree(unicode) will fail to delete
 
2426
    # so make sure we are using rmtree(str) to delete everything
 
2427
    # except on win32, where rmtree(str) will fail
 
2428
    # since it doesn't have the property of byte-stream paths
 
2429
    # (they are either ascii or mbcs)
 
2430
    if sys.platform == 'win32':
 
2431
        # make sure we are using the unicode win32 api
 
2432
        dirname = unicode(dirname)
 
2433
    else:
 
2434
        dirname = dirname.encode(sys.getfilesystemencoding())
 
2435
    try:
 
2436
        osutils.rmtree(dirname)
 
2437
    except OSError, e:
 
2438
        if sys.platform == 'win32' and e.errno == errno.EACCES:
 
2439
            print >>sys.stderr, ('Permission denied: '
 
2440
                                 'unable to remove testing dir '
 
2441
                                 '%s' % os.path.basename(dirname))
 
2442
        else:
 
2443
            raise
 
2444
 
 
2445
 
 
2446
class Feature(object):
 
2447
    """An operating system Feature."""
 
2448
 
 
2449
    def __init__(self):
 
2450
        self._available = None
 
2451
 
 
2452
    def available(self):
 
2453
        """Is the feature available?
 
2454
 
 
2455
        :return: True if the feature is available.
 
2456
        """
 
2457
        if self._available is None:
 
2458
            self._available = self._probe()
 
2459
        return self._available
 
2460
 
 
2461
    def _probe(self):
 
2462
        """Implement this method in concrete features.
 
2463
 
 
2464
        :return: True if the feature is available.
 
2465
        """
 
2466
        raise NotImplementedError
 
2467
 
 
2468
    def __str__(self):
 
2469
        if getattr(self, 'feature_name', None):
 
2470
            return self.feature_name()
 
2471
        return self.__class__.__name__
 
2472
 
 
2473
 
 
2474
class TestScenarioApplier(object):
 
2475
    """A tool to apply scenarios to tests."""
 
2476
 
 
2477
    def adapt(self, test):
 
2478
        """Return a TestSuite containing a copy of test for each scenario."""
 
2479
        result = unittest.TestSuite()
 
2480
        for scenario in self.scenarios:
 
2481
            result.addTest(self.adapt_test_to_scenario(test, scenario))
 
2482
        return result
 
2483
 
 
2484
    def adapt_test_to_scenario(self, test, scenario):
 
2485
        """Copy test and apply scenario to it.
 
2486
 
 
2487
        :param test: A test to adapt.
 
2488
        :param scenario: A tuple describing the scenarion.
 
2489
            The first element of the tuple is the new test id.
 
2490
            The second element is a dict containing attributes to set on the
 
2491
            test.
 
2492
        :return: The adapted test.
 
2493
        """
 
2494
        from copy import deepcopy
 
2495
        new_test = deepcopy(test)
 
2496
        for name, value in scenario[1].items():
 
2497
            setattr(new_test, name, value)
 
2498
        new_id = "%s(%s)" % (new_test.id(), scenario[0])
 
2499
        new_test.id = lambda: new_id
 
2500
        return new_test