/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Ian Clatworthy
  • Date: 2009-02-26 06:15:24 UTC
  • mto: (4157.1.1 ianc-integration)
  • mto: This revision was merged to the branch mainline in revision 4158.
  • Revision ID: ian.clatworthy@canonical.com-20090226061524-kpy3n8na3mk4ubuy
help xxx is full help; xxx -h is concise help

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import atexit
 
30
import codecs
 
31
from cStringIO import StringIO
 
32
import difflib
 
33
import doctest
 
34
import errno
 
35
import logging
 
36
import os
 
37
from pprint import pformat
 
38
import random
 
39
import re
 
40
import shlex
 
41
import stat
 
42
from subprocess import Popen, PIPE
 
43
import sys
 
44
import tempfile
 
45
import threading
 
46
import time
 
47
import unittest
 
48
import warnings
 
49
 
 
50
 
 
51
from bzrlib import (
 
52
    branchbuilder,
 
53
    bzrdir,
 
54
    debug,
 
55
    errors,
 
56
    memorytree,
 
57
    osutils,
 
58
    progress,
 
59
    ui,
 
60
    urlutils,
 
61
    registry,
 
62
    workingtree,
 
63
    )
 
64
import bzrlib.branch
 
65
import bzrlib.commands
 
66
import bzrlib.timestamp
 
67
import bzrlib.export
 
68
import bzrlib.inventory
 
69
import bzrlib.iterablefile
 
70
import bzrlib.lockdir
 
71
try:
 
72
    import bzrlib.lsprof
 
73
except ImportError:
 
74
    # lsprof not available
 
75
    pass
 
76
from bzrlib.merge import merge_inner
 
77
import bzrlib.merge3
 
78
import bzrlib.plugin
 
79
from bzrlib.smart import client, server
 
80
import bzrlib.store
 
81
from bzrlib import symbol_versioning
 
82
from bzrlib.symbol_versioning import (
 
83
    DEPRECATED_PARAMETER,
 
84
    deprecated_function,
 
85
    deprecated_method,
 
86
    deprecated_passed,
 
87
    )
 
88
import bzrlib.trace
 
89
from bzrlib.transport import get_transport
 
90
import bzrlib.transport
 
91
from bzrlib.transport.local import LocalURLServer
 
92
from bzrlib.transport.memory import MemoryServer
 
93
from bzrlib.transport.readonly import ReadonlyServer
 
94
from bzrlib.trace import mutter, note
 
95
from bzrlib.tests import TestUtil
 
96
from bzrlib.tests.http_server import HttpServer
 
97
from bzrlib.tests.TestUtil import (
 
98
                          TestSuite,
 
99
                          TestLoader,
 
100
                          )
 
101
from bzrlib.tests.treeshape import build_tree_contents
 
102
import bzrlib.version_info_formats.format_custom
 
103
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
104
 
 
105
# Mark this python module as being part of the implementation
 
106
# of unittest: this gives us better tracebacks where the last
 
107
# shown frame is the test code, not our assertXYZ.
 
108
__unittest = 1
 
109
 
 
110
default_transport = LocalURLServer
 
111
 
 
112
 
 
113
class ExtendedTestResult(unittest._TextTestResult):
 
114
    """Accepts, reports and accumulates the results of running tests.
 
115
 
 
116
    Compared to the unittest version this class adds support for
 
117
    profiling, benchmarking, stopping as soon as a test fails,  and
 
118
    skipping tests.  There are further-specialized subclasses for
 
119
    different types of display.
 
120
 
 
121
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
122
    addFailure or addError classes.  These in turn may redirect to a more
 
123
    specific case for the special test results supported by our extended
 
124
    tests.
 
125
 
 
126
    Note that just one of these objects is fed the results from many tests.
 
127
    """
 
128
 
 
129
    stop_early = False
 
130
 
 
131
    def __init__(self, stream, descriptions, verbosity,
 
132
                 bench_history=None,
 
133
                 num_tests=None,
 
134
                 ):
 
135
        """Construct new TestResult.
 
136
 
 
137
        :param bench_history: Optionally, a writable file object to accumulate
 
138
            benchmark results.
 
139
        """
 
140
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
141
        if bench_history is not None:
 
142
            from bzrlib.version import _get_bzr_source_tree
 
143
            src_tree = _get_bzr_source_tree()
 
144
            if src_tree:
 
145
                try:
 
146
                    revision_id = src_tree.get_parent_ids()[0]
 
147
                except IndexError:
 
148
                    # XXX: if this is a brand new tree, do the same as if there
 
149
                    # is no branch.
 
150
                    revision_id = ''
 
151
            else:
 
152
                # XXX: If there's no branch, what should we do?
 
153
                revision_id = ''
 
154
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
155
        self._bench_history = bench_history
 
156
        self.ui = ui.ui_factory
 
157
        self.num_tests = num_tests
 
158
        self.error_count = 0
 
159
        self.failure_count = 0
 
160
        self.known_failure_count = 0
 
161
        self.skip_count = 0
 
162
        self.not_applicable_count = 0
 
163
        self.unsupported = {}
 
164
        self.count = 0
 
165
        self._overall_start_time = time.time()
 
166
 
 
167
    def _extractBenchmarkTime(self, testCase):
 
168
        """Add a benchmark time for the current test case."""
 
169
        return getattr(testCase, "_benchtime", None)
 
170
 
 
171
    def _elapsedTestTimeString(self):
 
172
        """Return a time string for the overall time the current test has taken."""
 
173
        return self._formatTime(time.time() - self._start_time)
 
174
 
 
175
    def _testTimeString(self, testCase):
 
176
        benchmark_time = self._extractBenchmarkTime(testCase)
 
177
        if benchmark_time is not None:
 
178
            return "%s/%s" % (
 
179
                self._formatTime(benchmark_time),
 
180
                self._elapsedTestTimeString())
 
181
        else:
 
182
            return "           %s" % self._elapsedTestTimeString()
 
183
 
 
184
    def _formatTime(self, seconds):
 
185
        """Format seconds as milliseconds with leading spaces."""
 
186
        # some benchmarks can take thousands of seconds to run, so we need 8
 
187
        # places
 
188
        return "%8dms" % (1000 * seconds)
 
189
 
 
190
    def _shortened_test_description(self, test):
 
191
        what = test.id()
 
192
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
193
        return what
 
194
 
 
195
    def startTest(self, test):
 
196
        unittest.TestResult.startTest(self, test)
 
197
        self.report_test_start(test)
 
198
        test.number = self.count
 
199
        self._recordTestStartTime()
 
200
 
 
201
    def _recordTestStartTime(self):
 
202
        """Record that a test has started."""
 
203
        self._start_time = time.time()
 
204
 
 
205
    def _cleanupLogFile(self, test):
 
206
        # We can only do this if we have one of our TestCases, not if
 
207
        # we have a doctest.
 
208
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
209
        if setKeepLogfile is not None:
 
210
            setKeepLogfile()
 
211
 
 
212
    def addError(self, test, err):
 
213
        """Tell result that test finished with an error.
 
214
 
 
215
        Called from the TestCase run() method when the test
 
216
        fails with an unexpected error.
 
217
        """
 
218
        self._testConcluded(test)
 
219
        if isinstance(err[1], TestSkipped):
 
220
            return self._addSkipped(test, err)
 
221
        elif isinstance(err[1], UnavailableFeature):
 
222
            return self.addNotSupported(test, err[1].args[0])
 
223
        else:
 
224
            unittest.TestResult.addError(self, test, err)
 
225
            self.error_count += 1
 
226
            self.report_error(test, err)
 
227
            if self.stop_early:
 
228
                self.stop()
 
229
            self._cleanupLogFile(test)
 
230
 
 
231
    def addFailure(self, test, err):
 
232
        """Tell result that test failed.
 
233
 
 
234
        Called from the TestCase run() method when the test
 
235
        fails because e.g. an assert() method failed.
 
236
        """
 
237
        self._testConcluded(test)
 
238
        if isinstance(err[1], KnownFailure):
 
239
            return self._addKnownFailure(test, err)
 
240
        else:
 
241
            unittest.TestResult.addFailure(self, test, err)
 
242
            self.failure_count += 1
 
243
            self.report_failure(test, err)
 
244
            if self.stop_early:
 
245
                self.stop()
 
246
            self._cleanupLogFile(test)
 
247
 
 
248
    def addSuccess(self, test):
 
249
        """Tell result that test completed successfully.
 
250
 
 
251
        Called from the TestCase run()
 
252
        """
 
253
        self._testConcluded(test)
 
254
        if self._bench_history is not None:
 
255
            benchmark_time = self._extractBenchmarkTime(test)
 
256
            if benchmark_time is not None:
 
257
                self._bench_history.write("%s %s\n" % (
 
258
                    self._formatTime(benchmark_time),
 
259
                    test.id()))
 
260
        self.report_success(test)
 
261
        self._cleanupLogFile(test)
 
262
        unittest.TestResult.addSuccess(self, test)
 
263
        test._log_contents = ''
 
264
 
 
265
    def _testConcluded(self, test):
 
266
        """Common code when a test has finished.
 
267
 
 
268
        Called regardless of whether it succeded, failed, etc.
 
269
        """
 
270
        pass
 
271
 
 
272
    def _addKnownFailure(self, test, err):
 
273
        self.known_failure_count += 1
 
274
        self.report_known_failure(test, err)
 
275
 
 
276
    def addNotSupported(self, test, feature):
 
277
        """The test will not be run because of a missing feature.
 
278
        """
 
279
        # this can be called in two different ways: it may be that the
 
280
        # test started running, and then raised (through addError)
 
281
        # UnavailableFeature.  Alternatively this method can be called
 
282
        # while probing for features before running the tests; in that
 
283
        # case we will see startTest and stopTest, but the test will never
 
284
        # actually run.
 
285
        self.unsupported.setdefault(str(feature), 0)
 
286
        self.unsupported[str(feature)] += 1
 
287
        self.report_unsupported(test, feature)
 
288
 
 
289
    def _addSkipped(self, test, skip_excinfo):
 
290
        if isinstance(skip_excinfo[1], TestNotApplicable):
 
291
            self.not_applicable_count += 1
 
292
            self.report_not_applicable(test, skip_excinfo)
 
293
        else:
 
294
            self.skip_count += 1
 
295
            self.report_skip(test, skip_excinfo)
 
296
        try:
 
297
            test.tearDown()
 
298
        except KeyboardInterrupt:
 
299
            raise
 
300
        except:
 
301
            self.addError(test, test._exc_info())
 
302
        else:
 
303
            # seems best to treat this as success from point-of-view of unittest
 
304
            # -- it actually does nothing so it barely matters :)
 
305
            unittest.TestResult.addSuccess(self, test)
 
306
            test._log_contents = ''
 
307
 
 
308
    def printErrorList(self, flavour, errors):
 
309
        for test, err in errors:
 
310
            self.stream.writeln(self.separator1)
 
311
            self.stream.write("%s: " % flavour)
 
312
            self.stream.writeln(self.getDescription(test))
 
313
            if getattr(test, '_get_log', None) is not None:
 
314
                self.stream.write('\n')
 
315
                self.stream.write(
 
316
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
 
317
                self.stream.write('\n')
 
318
                self.stream.write(test._get_log())
 
319
                self.stream.write('\n')
 
320
                self.stream.write(
 
321
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
 
322
                self.stream.write('\n')
 
323
            self.stream.writeln(self.separator2)
 
324
            self.stream.writeln("%s" % err)
 
325
 
 
326
    def finished(self):
 
327
        pass
 
328
 
 
329
    def report_cleaning_up(self):
 
330
        pass
 
331
 
 
332
    def report_success(self, test):
 
333
        pass
 
334
 
 
335
    def wasStrictlySuccessful(self):
 
336
        if self.unsupported or self.known_failure_count:
 
337
            return False
 
338
        return self.wasSuccessful()
 
339
 
 
340
 
 
341
class TextTestResult(ExtendedTestResult):
 
342
    """Displays progress and results of tests in text form"""
 
343
 
 
344
    def __init__(self, stream, descriptions, verbosity,
 
345
                 bench_history=None,
 
346
                 num_tests=None,
 
347
                 pb=None,
 
348
                 ):
 
349
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
350
            bench_history, num_tests)
 
351
        if pb is None:
 
352
            self.pb = self.ui.nested_progress_bar()
 
353
            self._supplied_pb = False
 
354
        else:
 
355
            self.pb = pb
 
356
            self._supplied_pb = True
 
357
        self.pb.show_pct = False
 
358
        self.pb.show_spinner = False
 
359
        self.pb.show_eta = False,
 
360
        self.pb.show_count = False
 
361
        self.pb.show_bar = False
 
362
 
 
363
    def report_starting(self):
 
364
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
365
 
 
366
    def _progress_prefix_text(self):
 
367
        # the longer this text, the less space we have to show the test
 
368
        # name...
 
369
        a = '[%d' % self.count              # total that have been run
 
370
        # tests skipped as known not to be relevant are not important enough
 
371
        # to show here
 
372
        ## if self.skip_count:
 
373
        ##     a += ', %d skip' % self.skip_count
 
374
        ## if self.known_failure_count:
 
375
        ##     a += '+%dX' % self.known_failure_count
 
376
        if self.num_tests is not None:
 
377
            a +='/%d' % self.num_tests
 
378
        a += ' in '
 
379
        runtime = time.time() - self._overall_start_time
 
380
        if runtime >= 60:
 
381
            a += '%dm%ds' % (runtime / 60, runtime % 60)
 
382
        else:
 
383
            a += '%ds' % runtime
 
384
        if self.error_count:
 
385
            a += ', %d err' % self.error_count
 
386
        if self.failure_count:
 
387
            a += ', %d fail' % self.failure_count
 
388
        if self.unsupported:
 
389
            a += ', %d missing' % len(self.unsupported)
 
390
        a += ']'
 
391
        return a
 
392
 
 
393
    def report_test_start(self, test):
 
394
        self.count += 1
 
395
        self.pb.update(
 
396
                self._progress_prefix_text()
 
397
                + ' '
 
398
                + self._shortened_test_description(test))
 
399
 
 
400
    def _test_description(self, test):
 
401
        return self._shortened_test_description(test)
 
402
 
 
403
    def report_error(self, test, err):
 
404
        self.pb.note('ERROR: %s\n    %s\n',
 
405
            self._test_description(test),
 
406
            err[1],
 
407
            )
 
408
 
 
409
    def report_failure(self, test, err):
 
410
        self.pb.note('FAIL: %s\n    %s\n',
 
411
            self._test_description(test),
 
412
            err[1],
 
413
            )
 
414
 
 
415
    def report_known_failure(self, test, err):
 
416
        self.pb.note('XFAIL: %s\n%s\n',
 
417
            self._test_description(test), err[1])
 
418
 
 
419
    def report_skip(self, test, skip_excinfo):
 
420
        pass
 
421
 
 
422
    def report_not_applicable(self, test, skip_excinfo):
 
423
        pass
 
424
 
 
425
    def report_unsupported(self, test, feature):
 
426
        """test cannot be run because feature is missing."""
 
427
 
 
428
    def report_cleaning_up(self):
 
429
        self.pb.update('cleaning up...')
 
430
 
 
431
    def finished(self):
 
432
        if not self._supplied_pb:
 
433
            self.pb.finished()
 
434
 
 
435
 
 
436
class VerboseTestResult(ExtendedTestResult):
 
437
    """Produce long output, with one line per test run plus times"""
 
438
 
 
439
    def _ellipsize_to_right(self, a_string, final_width):
 
440
        """Truncate and pad a string, keeping the right hand side"""
 
441
        if len(a_string) > final_width:
 
442
            result = '...' + a_string[3-final_width:]
 
443
        else:
 
444
            result = a_string
 
445
        return result.ljust(final_width)
 
446
 
 
447
    def report_starting(self):
 
448
        self.stream.write('running %d tests...\n' % self.num_tests)
 
449
 
 
450
    def report_test_start(self, test):
 
451
        self.count += 1
 
452
        name = self._shortened_test_description(test)
 
453
        # width needs space for 6 char status, plus 1 for slash, plus 2 10-char
 
454
        # numbers, plus a trailing blank
 
455
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
456
        self.stream.write(self._ellipsize_to_right(name,
 
457
                          osutils.terminal_width()-30))
 
458
        self.stream.flush()
 
459
 
 
460
    def _error_summary(self, err):
 
461
        indent = ' ' * 4
 
462
        return '%s%s' % (indent, err[1])
 
463
 
 
464
    def report_error(self, test, err):
 
465
        self.stream.writeln('ERROR %s\n%s'
 
466
                % (self._testTimeString(test),
 
467
                   self._error_summary(err)))
 
468
 
 
469
    def report_failure(self, test, err):
 
470
        self.stream.writeln(' FAIL %s\n%s'
 
471
                % (self._testTimeString(test),
 
472
                   self._error_summary(err)))
 
473
 
 
474
    def report_known_failure(self, test, err):
 
475
        self.stream.writeln('XFAIL %s\n%s'
 
476
                % (self._testTimeString(test),
 
477
                   self._error_summary(err)))
 
478
 
 
479
    def report_success(self, test):
 
480
        self.stream.writeln('   OK %s' % self._testTimeString(test))
 
481
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
482
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
483
            stats.pprint(file=self.stream)
 
484
        # flush the stream so that we get smooth output. This verbose mode is
 
485
        # used to show the output in PQM.
 
486
        self.stream.flush()
 
487
 
 
488
    def report_skip(self, test, skip_excinfo):
 
489
        self.stream.writeln(' SKIP %s\n%s'
 
490
                % (self._testTimeString(test),
 
491
                   self._error_summary(skip_excinfo)))
 
492
 
 
493
    def report_not_applicable(self, test, skip_excinfo):
 
494
        self.stream.writeln('  N/A %s\n%s'
 
495
                % (self._testTimeString(test),
 
496
                   self._error_summary(skip_excinfo)))
 
497
 
 
498
    def report_unsupported(self, test, feature):
 
499
        """test cannot be run because feature is missing."""
 
500
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
501
                %(self._testTimeString(test), feature))
 
502
 
 
503
 
 
504
class TextTestRunner(object):
 
505
    stop_on_failure = False
 
506
 
 
507
    def __init__(self,
 
508
                 stream=sys.stderr,
 
509
                 descriptions=0,
 
510
                 verbosity=1,
 
511
                 bench_history=None,
 
512
                 list_only=False
 
513
                 ):
 
514
        self.stream = unittest._WritelnDecorator(stream)
 
515
        self.descriptions = descriptions
 
516
        self.verbosity = verbosity
 
517
        self._bench_history = bench_history
 
518
        self.list_only = list_only
 
519
 
 
520
    def run(self, test):
 
521
        "Run the given test case or test suite."
 
522
        startTime = time.time()
 
523
        if self.verbosity == 1:
 
524
            result_class = TextTestResult
 
525
        elif self.verbosity >= 2:
 
526
            result_class = VerboseTestResult
 
527
        result = result_class(self.stream,
 
528
                              self.descriptions,
 
529
                              self.verbosity,
 
530
                              bench_history=self._bench_history,
 
531
                              num_tests=test.countTestCases(),
 
532
                              )
 
533
        result.stop_early = self.stop_on_failure
 
534
        result.report_starting()
 
535
        if self.list_only:
 
536
            if self.verbosity >= 2:
 
537
                self.stream.writeln("Listing tests only ...\n")
 
538
            run = 0
 
539
            for t in iter_suite_tests(test):
 
540
                self.stream.writeln("%s" % (t.id()))
 
541
                run += 1
 
542
            actionTaken = "Listed"
 
543
        else:
 
544
            test.run(result)
 
545
            run = result.testsRun
 
546
            actionTaken = "Ran"
 
547
        stopTime = time.time()
 
548
        timeTaken = stopTime - startTime
 
549
        result.printErrors()
 
550
        self.stream.writeln(result.separator2)
 
551
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
552
                            run, run != 1 and "s" or "", timeTaken))
 
553
        self.stream.writeln()
 
554
        if not result.wasSuccessful():
 
555
            self.stream.write("FAILED (")
 
556
            failed, errored = map(len, (result.failures, result.errors))
 
557
            if failed:
 
558
                self.stream.write("failures=%d" % failed)
 
559
            if errored:
 
560
                if failed: self.stream.write(", ")
 
561
                self.stream.write("errors=%d" % errored)
 
562
            if result.known_failure_count:
 
563
                if failed or errored: self.stream.write(", ")
 
564
                self.stream.write("known_failure_count=%d" %
 
565
                    result.known_failure_count)
 
566
            self.stream.writeln(")")
 
567
        else:
 
568
            if result.known_failure_count:
 
569
                self.stream.writeln("OK (known_failures=%d)" %
 
570
                    result.known_failure_count)
 
571
            else:
 
572
                self.stream.writeln("OK")
 
573
        if result.skip_count > 0:
 
574
            skipped = result.skip_count
 
575
            self.stream.writeln('%d test%s skipped' %
 
576
                                (skipped, skipped != 1 and "s" or ""))
 
577
        if result.unsupported:
 
578
            for feature, count in sorted(result.unsupported.items()):
 
579
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
580
                    (feature, count))
 
581
        result.finished()
 
582
        return result
 
583
 
 
584
 
 
585
def iter_suite_tests(suite):
 
586
    """Return all tests in a suite, recursing through nested suites"""
 
587
    for item in suite._tests:
 
588
        if isinstance(item, unittest.TestCase):
 
589
            yield item
 
590
        elif isinstance(item, unittest.TestSuite):
 
591
            for r in iter_suite_tests(item):
 
592
                yield r
 
593
        else:
 
594
            raise Exception('unknown object %r inside test suite %r'
 
595
                            % (item, suite))
 
596
 
 
597
 
 
598
class TestSkipped(Exception):
 
599
    """Indicates that a test was intentionally skipped, rather than failing."""
 
600
 
 
601
 
 
602
class TestNotApplicable(TestSkipped):
 
603
    """A test is not applicable to the situation where it was run.
 
604
 
 
605
    This is only normally raised by parameterized tests, if they find that
 
606
    the instance they're constructed upon does not support one aspect
 
607
    of its interface.
 
608
    """
 
609
 
 
610
 
 
611
class KnownFailure(AssertionError):
 
612
    """Indicates that a test failed in a precisely expected manner.
 
613
 
 
614
    Such failures dont block the whole test suite from passing because they are
 
615
    indicators of partially completed code or of future work. We have an
 
616
    explicit error for them so that we can ensure that they are always visible:
 
617
    KnownFailures are always shown in the output of bzr selftest.
 
618
    """
 
619
 
 
620
 
 
621
class UnavailableFeature(Exception):
 
622
    """A feature required for this test was not available.
 
623
 
 
624
    The feature should be used to construct the exception.
 
625
    """
 
626
 
 
627
 
 
628
class CommandFailed(Exception):
 
629
    pass
 
630
 
 
631
 
 
632
class StringIOWrapper(object):
 
633
    """A wrapper around cStringIO which just adds an encoding attribute.
 
634
 
 
635
    Internally we can check sys.stdout to see what the output encoding
 
636
    should be. However, cStringIO has no encoding attribute that we can
 
637
    set. So we wrap it instead.
 
638
    """
 
639
    encoding='ascii'
 
640
    _cstring = None
 
641
 
 
642
    def __init__(self, s=None):
 
643
        if s is not None:
 
644
            self.__dict__['_cstring'] = StringIO(s)
 
645
        else:
 
646
            self.__dict__['_cstring'] = StringIO()
 
647
 
 
648
    def __getattr__(self, name, getattr=getattr):
 
649
        return getattr(self.__dict__['_cstring'], name)
 
650
 
 
651
    def __setattr__(self, name, val):
 
652
        if name == 'encoding':
 
653
            self.__dict__['encoding'] = val
 
654
        else:
 
655
            return setattr(self._cstring, name, val)
 
656
 
 
657
 
 
658
class TestUIFactory(ui.CLIUIFactory):
 
659
    """A UI Factory for testing.
 
660
 
 
661
    Hide the progress bar but emit note()s.
 
662
    Redirect stdin.
 
663
    Allows get_password to be tested without real tty attached.
 
664
    """
 
665
 
 
666
    def __init__(self,
 
667
                 stdout=None,
 
668
                 stderr=None,
 
669
                 stdin=None):
 
670
        super(TestUIFactory, self).__init__()
 
671
        if stdin is not None:
 
672
            # We use a StringIOWrapper to be able to test various
 
673
            # encodings, but the user is still responsible to
 
674
            # encode the string and to set the encoding attribute
 
675
            # of StringIOWrapper.
 
676
            self.stdin = StringIOWrapper(stdin)
 
677
        if stdout is None:
 
678
            self.stdout = sys.stdout
 
679
        else:
 
680
            self.stdout = stdout
 
681
        if stderr is None:
 
682
            self.stderr = sys.stderr
 
683
        else:
 
684
            self.stderr = stderr
 
685
 
 
686
    def clear(self):
 
687
        """See progress.ProgressBar.clear()."""
 
688
 
 
689
    def clear_term(self):
 
690
        """See progress.ProgressBar.clear_term()."""
 
691
 
 
692
    def clear_term(self):
 
693
        """See progress.ProgressBar.clear_term()."""
 
694
 
 
695
    def finished(self):
 
696
        """See progress.ProgressBar.finished()."""
 
697
 
 
698
    def note(self, fmt_string, *args, **kwargs):
 
699
        """See progress.ProgressBar.note()."""
 
700
        self.stdout.write((fmt_string + "\n") % args)
 
701
 
 
702
    def progress_bar(self):
 
703
        return self
 
704
 
 
705
    def nested_progress_bar(self):
 
706
        return self
 
707
 
 
708
    def update(self, message, count=None, total=None):
 
709
        """See progress.ProgressBar.update()."""
 
710
 
 
711
    def get_non_echoed_password(self, prompt):
 
712
        """Get password from stdin without trying to handle the echo mode"""
 
713
        if prompt:
 
714
            self.stdout.write(prompt.encode(self.stdout.encoding, 'replace'))
 
715
        password = self.stdin.readline()
 
716
        if not password:
 
717
            raise EOFError
 
718
        if password[-1] == '\n':
 
719
            password = password[:-1]
 
720
        return password
 
721
 
 
722
 
 
723
def _report_leaked_threads():
 
724
    bzrlib.trace.warning('%s is leaking threads among %d leaking tests',
 
725
                         TestCase._first_thread_leaker_id,
 
726
                         TestCase._leaking_threads_tests)
 
727
 
 
728
 
 
729
class TestCase(unittest.TestCase):
 
730
    """Base class for bzr unit tests.
 
731
 
 
732
    Tests that need access to disk resources should subclass
 
733
    TestCaseInTempDir not TestCase.
 
734
 
 
735
    Error and debug log messages are redirected from their usual
 
736
    location into a temporary file, the contents of which can be
 
737
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
738
    so that it can also capture file IO.  When the test completes this file
 
739
    is read into memory and removed from disk.
 
740
 
 
741
    There are also convenience functions to invoke bzr's command-line
 
742
    routine, and to build and check bzr trees.
 
743
 
 
744
    In addition to the usual method of overriding tearDown(), this class also
 
745
    allows subclasses to register functions into the _cleanups list, which is
 
746
    run in order as the object is torn down.  It's less likely this will be
 
747
    accidentally overlooked.
 
748
    """
 
749
 
 
750
    _active_threads = None
 
751
    _leaking_threads_tests = 0
 
752
    _first_thread_leaker_id = None
 
753
    _log_file_name = None
 
754
    _log_contents = ''
 
755
    _keep_log_file = False
 
756
    # record lsprof data when performing benchmark calls.
 
757
    _gather_lsprof_in_benchmarks = False
 
758
    attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
 
759
                     '_log_contents', '_log_file_name', '_benchtime',
 
760
                     '_TestCase__testMethodName')
 
761
 
 
762
    def __init__(self, methodName='testMethod'):
 
763
        super(TestCase, self).__init__(methodName)
 
764
        self._cleanups = []
 
765
 
 
766
    def setUp(self):
 
767
        unittest.TestCase.setUp(self)
 
768
        self._cleanEnvironment()
 
769
        self._silenceUI()
 
770
        self._startLogFile()
 
771
        self._benchcalls = []
 
772
        self._benchtime = None
 
773
        self._clear_hooks()
 
774
        self._clear_debug_flags()
 
775
        TestCase._active_threads = threading.activeCount()
 
776
        self.addCleanup(self._check_leaked_threads)
 
777
 
 
778
    def _check_leaked_threads(self):
 
779
        active = threading.activeCount()
 
780
        leaked_threads = active - TestCase._active_threads
 
781
        TestCase._active_threads = active
 
782
        if leaked_threads:
 
783
            TestCase._leaking_threads_tests += 1
 
784
            if TestCase._first_thread_leaker_id is None:
 
785
                TestCase._first_thread_leaker_id = self.id()
 
786
                # we're not specifically told when all tests are finished.
 
787
                # This will do. We use a function to avoid keeping a reference
 
788
                # to a TestCase object.
 
789
                atexit.register(_report_leaked_threads)
 
790
 
 
791
    def _clear_debug_flags(self):
 
792
        """Prevent externally set debug flags affecting tests.
 
793
 
 
794
        Tests that want to use debug flags can just set them in the
 
795
        debug_flags set during setup/teardown.
 
796
        """
 
797
        self._preserved_debug_flags = set(debug.debug_flags)
 
798
        if 'allow_debug' not in selftest_debug_flags:
 
799
            debug.debug_flags.clear()
 
800
        self.addCleanup(self._restore_debug_flags)
 
801
 
 
802
    def _clear_hooks(self):
 
803
        # prevent hooks affecting tests
 
804
        import bzrlib.branch
 
805
        import bzrlib.smart.client
 
806
        import bzrlib.smart.server
 
807
        self._preserved_hooks = {
 
808
            bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
 
809
            bzrlib.mutabletree.MutableTree: bzrlib.mutabletree.MutableTree.hooks,
 
810
            bzrlib.smart.client._SmartClient: bzrlib.smart.client._SmartClient.hooks,
 
811
            bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
 
812
            bzrlib.commands.Command: bzrlib.commands.Command.hooks,
 
813
            }
 
814
        self.addCleanup(self._restoreHooks)
 
815
        # reset all hooks to an empty instance of the appropriate type
 
816
        bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
 
817
        bzrlib.smart.client._SmartClient.hooks = bzrlib.smart.client.SmartClientHooks()
 
818
        bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
 
819
        bzrlib.commands.Command.hooks = bzrlib.commands.CommandHooks()
 
820
 
 
821
    def _silenceUI(self):
 
822
        """Turn off UI for duration of test"""
 
823
        # by default the UI is off; tests can turn it on if they want it.
 
824
        saved = ui.ui_factory
 
825
        def _restore():
 
826
            ui.ui_factory = saved
 
827
        ui.ui_factory = ui.SilentUIFactory()
 
828
        self.addCleanup(_restore)
 
829
 
 
830
    def _ndiff_strings(self, a, b):
 
831
        """Return ndiff between two strings containing lines.
 
832
 
 
833
        A trailing newline is added if missing to make the strings
 
834
        print properly."""
 
835
        if b and b[-1] != '\n':
 
836
            b += '\n'
 
837
        if a and a[-1] != '\n':
 
838
            a += '\n'
 
839
        difflines = difflib.ndiff(a.splitlines(True),
 
840
                                  b.splitlines(True),
 
841
                                  linejunk=lambda x: False,
 
842
                                  charjunk=lambda x: False)
 
843
        return ''.join(difflines)
 
844
 
 
845
    def assertEqual(self, a, b, message=''):
 
846
        try:
 
847
            if a == b:
 
848
                return
 
849
        except UnicodeError, e:
 
850
            # If we can't compare without getting a UnicodeError, then
 
851
            # obviously they are different
 
852
            mutter('UnicodeError: %s', e)
 
853
        if message:
 
854
            message += '\n'
 
855
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
856
            % (message,
 
857
               pformat(a), pformat(b)))
 
858
 
 
859
    assertEquals = assertEqual
 
860
 
 
861
    def assertEqualDiff(self, a, b, message=None):
 
862
        """Assert two texts are equal, if not raise an exception.
 
863
 
 
864
        This is intended for use with multi-line strings where it can
 
865
        be hard to find the differences by eye.
 
866
        """
 
867
        # TODO: perhaps override assertEquals to call this for strings?
 
868
        if a == b:
 
869
            return
 
870
        if message is None:
 
871
            message = "texts not equal:\n"
 
872
        if a == b + '\n':
 
873
            message = 'first string is missing a final newline.\n'
 
874
        if a + '\n' == b:
 
875
            message = 'second string is missing a final newline.\n'
 
876
        raise AssertionError(message +
 
877
                             self._ndiff_strings(a, b))
 
878
 
 
879
    def assertEqualMode(self, mode, mode_test):
 
880
        self.assertEqual(mode, mode_test,
 
881
                         'mode mismatch %o != %o' % (mode, mode_test))
 
882
 
 
883
    def assertEqualStat(self, expected, actual):
 
884
        """assert that expected and actual are the same stat result.
 
885
 
 
886
        :param expected: A stat result.
 
887
        :param actual: A stat result.
 
888
        :raises AssertionError: If the expected and actual stat values differ
 
889
            other than by atime.
 
890
        """
 
891
        self.assertEqual(expected.st_size, actual.st_size)
 
892
        self.assertEqual(expected.st_mtime, actual.st_mtime)
 
893
        self.assertEqual(expected.st_ctime, actual.st_ctime)
 
894
        self.assertEqual(expected.st_dev, actual.st_dev)
 
895
        self.assertEqual(expected.st_ino, actual.st_ino)
 
896
        self.assertEqual(expected.st_mode, actual.st_mode)
 
897
 
 
898
    def assertPositive(self, val):
 
899
        """Assert that val is greater than 0."""
 
900
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
901
 
 
902
    def assertNegative(self, val):
 
903
        """Assert that val is less than 0."""
 
904
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
905
 
 
906
    def assertStartsWith(self, s, prefix):
 
907
        if not s.startswith(prefix):
 
908
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
909
 
 
910
    def assertEndsWith(self, s, suffix):
 
911
        """Asserts that s ends with suffix."""
 
912
        if not s.endswith(suffix):
 
913
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
914
 
 
915
    def assertContainsRe(self, haystack, needle_re, flags=0):
 
916
        """Assert that a contains something matching a regular expression."""
 
917
        if not re.search(needle_re, haystack, flags):
 
918
            if '\n' in haystack or len(haystack) > 60:
 
919
                # a long string, format it in a more readable way
 
920
                raise AssertionError(
 
921
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
922
                        % (needle_re, haystack))
 
923
            else:
 
924
                raise AssertionError('pattern "%s" not found in "%s"'
 
925
                        % (needle_re, haystack))
 
926
 
 
927
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
 
928
        """Assert that a does not match a regular expression"""
 
929
        if re.search(needle_re, haystack, flags):
 
930
            raise AssertionError('pattern "%s" found in "%s"'
 
931
                    % (needle_re, haystack))
 
932
 
 
933
    def assertSubset(self, sublist, superlist):
 
934
        """Assert that every entry in sublist is present in superlist."""
 
935
        missing = set(sublist) - set(superlist)
 
936
        if len(missing) > 0:
 
937
            raise AssertionError("value(s) %r not present in container %r" %
 
938
                                 (missing, superlist))
 
939
 
 
940
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
941
        """Fail unless excClass is raised when the iterator from func is used.
 
942
 
 
943
        Many functions can return generators this makes sure
 
944
        to wrap them in a list() call to make sure the whole generator
 
945
        is run, and that the proper exception is raised.
 
946
        """
 
947
        try:
 
948
            list(func(*args, **kwargs))
 
949
        except excClass, e:
 
950
            return e
 
951
        else:
 
952
            if getattr(excClass,'__name__', None) is not None:
 
953
                excName = excClass.__name__
 
954
            else:
 
955
                excName = str(excClass)
 
956
            raise self.failureException, "%s not raised" % excName
 
957
 
 
958
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
959
        """Assert that a callable raises a particular exception.
 
960
 
 
961
        :param excClass: As for the except statement, this may be either an
 
962
            exception class, or a tuple of classes.
 
963
        :param callableObj: A callable, will be passed ``*args`` and
 
964
            ``**kwargs``.
 
965
 
 
966
        Returns the exception so that you can examine it.
 
967
        """
 
968
        try:
 
969
            callableObj(*args, **kwargs)
 
970
        except excClass, e:
 
971
            return e
 
972
        else:
 
973
            if getattr(excClass,'__name__', None) is not None:
 
974
                excName = excClass.__name__
 
975
            else:
 
976
                # probably a tuple
 
977
                excName = str(excClass)
 
978
            raise self.failureException, "%s not raised" % excName
 
979
 
 
980
    def assertIs(self, left, right, message=None):
 
981
        if not (left is right):
 
982
            if message is not None:
 
983
                raise AssertionError(message)
 
984
            else:
 
985
                raise AssertionError("%r is not %r." % (left, right))
 
986
 
 
987
    def assertIsNot(self, left, right, message=None):
 
988
        if (left is right):
 
989
            if message is not None:
 
990
                raise AssertionError(message)
 
991
            else:
 
992
                raise AssertionError("%r is %r." % (left, right))
 
993
 
 
994
    def assertTransportMode(self, transport, path, mode):
 
995
        """Fail if a path does not have mode mode.
 
996
 
 
997
        If modes are not supported on this transport, the assertion is ignored.
 
998
        """
 
999
        if not transport._can_roundtrip_unix_modebits():
 
1000
            return
 
1001
        path_stat = transport.stat(path)
 
1002
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
1003
        self.assertEqual(mode, actual_mode,
 
1004
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
1005
 
 
1006
    def assertIsSameRealPath(self, path1, path2):
 
1007
        """Fail if path1 and path2 points to different files"""
 
1008
        self.assertEqual(osutils.realpath(path1),
 
1009
                         osutils.realpath(path2),
 
1010
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
1011
 
 
1012
    def assertIsInstance(self, obj, kls):
 
1013
        """Fail if obj is not an instance of kls"""
 
1014
        if not isinstance(obj, kls):
 
1015
            self.fail("%r is an instance of %s rather than %s" % (
 
1016
                obj, obj.__class__, kls))
 
1017
 
 
1018
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
1019
        """Invoke a test, expecting it to fail for the given reason.
 
1020
 
 
1021
        This is for assertions that ought to succeed, but currently fail.
 
1022
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
1023
        about the failure you're expecting.  If a new bug is introduced,
 
1024
        AssertionError should be raised, not KnownFailure.
 
1025
 
 
1026
        Frequently, expectFailure should be followed by an opposite assertion.
 
1027
        See example below.
 
1028
 
 
1029
        Intended to be used with a callable that raises AssertionError as the
 
1030
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
1031
 
 
1032
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
1033
        test succeeds.
 
1034
 
 
1035
        example usage::
 
1036
 
 
1037
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
1038
                             dynamic_val)
 
1039
          self.assertEqual(42, dynamic_val)
 
1040
 
 
1041
          This means that a dynamic_val of 54 will cause the test to raise
 
1042
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
1043
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
1044
          than 54 or 42 will cause an AssertionError.
 
1045
        """
 
1046
        try:
 
1047
            assertion(*args, **kwargs)
 
1048
        except AssertionError:
 
1049
            raise KnownFailure(reason)
 
1050
        else:
 
1051
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
1052
 
 
1053
    def assertFileEqual(self, content, path):
 
1054
        """Fail if path does not contain 'content'."""
 
1055
        self.failUnlessExists(path)
 
1056
        f = file(path, 'rb')
 
1057
        try:
 
1058
            s = f.read()
 
1059
        finally:
 
1060
            f.close()
 
1061
        self.assertEqualDiff(content, s)
 
1062
 
 
1063
    def failUnlessExists(self, path):
 
1064
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1065
        if not isinstance(path, basestring):
 
1066
            for p in path:
 
1067
                self.failUnlessExists(p)
 
1068
        else:
 
1069
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
1070
 
 
1071
    def failIfExists(self, path):
 
1072
        """Fail if path or paths, which may be abs or relative, exist."""
 
1073
        if not isinstance(path, basestring):
 
1074
            for p in path:
 
1075
                self.failIfExists(p)
 
1076
        else:
 
1077
            self.failIf(osutils.lexists(path),path+" exists")
 
1078
 
 
1079
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1080
        """A helper for callDeprecated and applyDeprecated.
 
1081
 
 
1082
        :param a_callable: A callable to call.
 
1083
        :param args: The positional arguments for the callable
 
1084
        :param kwargs: The keyword arguments for the callable
 
1085
        :return: A tuple (warnings, result). result is the result of calling
 
1086
            a_callable(``*args``, ``**kwargs``).
 
1087
        """
 
1088
        local_warnings = []
 
1089
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1090
            # we've hooked into a deprecation specific callpath,
 
1091
            # only deprecations should getting sent via it.
 
1092
            self.assertEqual(cls, DeprecationWarning)
 
1093
            local_warnings.append(msg)
 
1094
        original_warning_method = symbol_versioning.warn
 
1095
        symbol_versioning.set_warning_method(capture_warnings)
 
1096
        try:
 
1097
            result = a_callable(*args, **kwargs)
 
1098
        finally:
 
1099
            symbol_versioning.set_warning_method(original_warning_method)
 
1100
        return (local_warnings, result)
 
1101
 
 
1102
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1103
        """Call a deprecated callable without warning the user.
 
1104
 
 
1105
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1106
        not other callers that go direct to the warning module.
 
1107
 
 
1108
        To test that a deprecated method raises an error, do something like
 
1109
        this::
 
1110
 
 
1111
            self.assertRaises(errors.ReservedId,
 
1112
                self.applyDeprecated,
 
1113
                deprecated_in((1, 5, 0)),
 
1114
                br.append_revision,
 
1115
                'current:')
 
1116
 
 
1117
        :param deprecation_format: The deprecation format that the callable
 
1118
            should have been deprecated with. This is the same type as the
 
1119
            parameter to deprecated_method/deprecated_function. If the
 
1120
            callable is not deprecated with this format, an assertion error
 
1121
            will be raised.
 
1122
        :param a_callable: A callable to call. This may be a bound method or
 
1123
            a regular function. It will be called with ``*args`` and
 
1124
            ``**kwargs``.
 
1125
        :param args: The positional arguments for the callable
 
1126
        :param kwargs: The keyword arguments for the callable
 
1127
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1128
        """
 
1129
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1130
            *args, **kwargs)
 
1131
        expected_first_warning = symbol_versioning.deprecation_string(
 
1132
            a_callable, deprecation_format)
 
1133
        if len(call_warnings) == 0:
 
1134
            self.fail("No deprecation warning generated by call to %s" %
 
1135
                a_callable)
 
1136
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1137
        return result
 
1138
 
 
1139
    def callCatchWarnings(self, fn, *args, **kw):
 
1140
        """Call a callable that raises python warnings.
 
1141
 
 
1142
        The caller's responsible for examining the returned warnings.
 
1143
 
 
1144
        If the callable raises an exception, the exception is not
 
1145
        caught and propagates up to the caller.  In that case, the list
 
1146
        of warnings is not available.
 
1147
 
 
1148
        :returns: ([warning_object, ...], fn_result)
 
1149
        """
 
1150
        # XXX: This is not perfect, because it completely overrides the
 
1151
        # warnings filters, and some code may depend on suppressing particular
 
1152
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1153
        # though.  -- Andrew, 20071062
 
1154
        wlist = []
 
1155
        def _catcher(message, category, filename, lineno, file=None, line=None):
 
1156
            # despite the name, 'message' is normally(?) a Warning subclass
 
1157
            # instance
 
1158
            wlist.append(message)
 
1159
        saved_showwarning = warnings.showwarning
 
1160
        saved_filters = warnings.filters
 
1161
        try:
 
1162
            warnings.showwarning = _catcher
 
1163
            warnings.filters = []
 
1164
            result = fn(*args, **kw)
 
1165
        finally:
 
1166
            warnings.showwarning = saved_showwarning
 
1167
            warnings.filters = saved_filters
 
1168
        return wlist, result
 
1169
 
 
1170
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1171
        """Assert that a callable is deprecated in a particular way.
 
1172
 
 
1173
        This is a very precise test for unusual requirements. The
 
1174
        applyDeprecated helper function is probably more suited for most tests
 
1175
        as it allows you to simply specify the deprecation format being used
 
1176
        and will ensure that that is issued for the function being called.
 
1177
 
 
1178
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1179
        not other callers that go direct to the warning module.  To catch
 
1180
        general warnings, use callCatchWarnings.
 
1181
 
 
1182
        :param expected: a list of the deprecation warnings expected, in order
 
1183
        :param callable: The callable to call
 
1184
        :param args: The positional arguments for the callable
 
1185
        :param kwargs: The keyword arguments for the callable
 
1186
        """
 
1187
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1188
            *args, **kwargs)
 
1189
        self.assertEqual(expected, call_warnings)
 
1190
        return result
 
1191
 
 
1192
    def _startLogFile(self):
 
1193
        """Send bzr and test log messages to a temporary file.
 
1194
 
 
1195
        The file is removed as the test is torn down.
 
1196
        """
 
1197
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1198
        self._log_file = os.fdopen(fileno, 'w+')
 
1199
        self._log_memento = bzrlib.trace.push_log_file(self._log_file)
 
1200
        self._log_file_name = name
 
1201
        self.addCleanup(self._finishLogFile)
 
1202
 
 
1203
    def _finishLogFile(self):
 
1204
        """Finished with the log file.
 
1205
 
 
1206
        Close the file and delete it, unless setKeepLogfile was called.
 
1207
        """
 
1208
        if self._log_file is None:
 
1209
            return
 
1210
        bzrlib.trace.pop_log_file(self._log_memento)
 
1211
        self._log_file.close()
 
1212
        self._log_file = None
 
1213
        if not self._keep_log_file:
 
1214
            os.remove(self._log_file_name)
 
1215
            self._log_file_name = None
 
1216
 
 
1217
    def setKeepLogfile(self):
 
1218
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1219
        self._keep_log_file = True
 
1220
 
 
1221
    def addCleanup(self, callable, *args, **kwargs):
 
1222
        """Arrange to run a callable when this case is torn down.
 
1223
 
 
1224
        Callables are run in the reverse of the order they are registered,
 
1225
        ie last-in first-out.
 
1226
        """
 
1227
        self._cleanups.append((callable, args, kwargs))
 
1228
 
 
1229
    def _cleanEnvironment(self):
 
1230
        new_env = {
 
1231
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1232
            'HOME': os.getcwd(),
 
1233
            # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
 
1234
            # tests do check our impls match APPDATA
 
1235
            'BZR_EDITOR': None, # test_msgeditor manipulates this variable
 
1236
            'BZR_EMAIL': None,
 
1237
            'BZREMAIL': None, # may still be present in the environment
 
1238
            'EMAIL': None,
 
1239
            'BZR_PROGRESS_BAR': None,
 
1240
            'BZR_LOG': None,
 
1241
            'BZR_PLUGIN_PATH': None,
 
1242
            # SSH Agent
 
1243
            'SSH_AUTH_SOCK': None,
 
1244
            # Proxies
 
1245
            'http_proxy': None,
 
1246
            'HTTP_PROXY': None,
 
1247
            'https_proxy': None,
 
1248
            'HTTPS_PROXY': None,
 
1249
            'no_proxy': None,
 
1250
            'NO_PROXY': None,
 
1251
            'all_proxy': None,
 
1252
            'ALL_PROXY': None,
 
1253
            # Nobody cares about these ones AFAIK. So far at
 
1254
            # least. If you do (care), please update this comment
 
1255
            # -- vila 20061212
 
1256
            'ftp_proxy': None,
 
1257
            'FTP_PROXY': None,
 
1258
            'BZR_REMOTE_PATH': None,
 
1259
        }
 
1260
        self.__old_env = {}
 
1261
        self.addCleanup(self._restoreEnvironment)
 
1262
        for name, value in new_env.iteritems():
 
1263
            self._captureVar(name, value)
 
1264
 
 
1265
    def _captureVar(self, name, newvalue):
 
1266
        """Set an environment variable, and reset it when finished."""
 
1267
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1268
 
 
1269
    def _restore_debug_flags(self):
 
1270
        debug.debug_flags.clear()
 
1271
        debug.debug_flags.update(self._preserved_debug_flags)
 
1272
 
 
1273
    def _restoreEnvironment(self):
 
1274
        for name, value in self.__old_env.iteritems():
 
1275
            osutils.set_or_unset_env(name, value)
 
1276
 
 
1277
    def _restoreHooks(self):
 
1278
        for klass, hooks in self._preserved_hooks.items():
 
1279
            setattr(klass, 'hooks', hooks)
 
1280
 
 
1281
    def knownFailure(self, reason):
 
1282
        """This test has failed for some known reason."""
 
1283
        raise KnownFailure(reason)
 
1284
 
 
1285
    def run(self, result=None):
 
1286
        if result is None: result = self.defaultTestResult()
 
1287
        for feature in getattr(self, '_test_needs_features', []):
 
1288
            if not feature.available():
 
1289
                result.startTest(self)
 
1290
                if getattr(result, 'addNotSupported', None):
 
1291
                    result.addNotSupported(self, feature)
 
1292
                else:
 
1293
                    result.addSuccess(self)
 
1294
                result.stopTest(self)
 
1295
                return
 
1296
        try:
 
1297
            return unittest.TestCase.run(self, result)
 
1298
        finally:
 
1299
            saved_attrs = {}
 
1300
            absent_attr = object()
 
1301
            for attr_name in self.attrs_to_keep:
 
1302
                attr = getattr(self, attr_name, absent_attr)
 
1303
                if attr is not absent_attr:
 
1304
                    saved_attrs[attr_name] = attr
 
1305
            self.__dict__ = saved_attrs
 
1306
 
 
1307
    def tearDown(self):
 
1308
        self._runCleanups()
 
1309
        unittest.TestCase.tearDown(self)
 
1310
 
 
1311
    def time(self, callable, *args, **kwargs):
 
1312
        """Run callable and accrue the time it takes to the benchmark time.
 
1313
 
 
1314
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1315
        this will cause lsprofile statistics to be gathered and stored in
 
1316
        self._benchcalls.
 
1317
        """
 
1318
        if self._benchtime is None:
 
1319
            self._benchtime = 0
 
1320
        start = time.time()
 
1321
        try:
 
1322
            if not self._gather_lsprof_in_benchmarks:
 
1323
                return callable(*args, **kwargs)
 
1324
            else:
 
1325
                # record this benchmark
 
1326
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1327
                stats.sort()
 
1328
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1329
                return ret
 
1330
        finally:
 
1331
            self._benchtime += time.time() - start
 
1332
 
 
1333
    def _runCleanups(self):
 
1334
        """Run registered cleanup functions.
 
1335
 
 
1336
        This should only be called from TestCase.tearDown.
 
1337
        """
 
1338
        # TODO: Perhaps this should keep running cleanups even if
 
1339
        # one of them fails?
 
1340
 
 
1341
        # Actually pop the cleanups from the list so tearDown running
 
1342
        # twice is safe (this happens for skipped tests).
 
1343
        while self._cleanups:
 
1344
            cleanup, args, kwargs = self._cleanups.pop()
 
1345
            cleanup(*args, **kwargs)
 
1346
 
 
1347
    def log(self, *args):
 
1348
        mutter(*args)
 
1349
 
 
1350
    def _get_log(self, keep_log_file=False):
 
1351
        """Get the log from bzrlib.trace calls from this test.
 
1352
 
 
1353
        :param keep_log_file: When True, if the log is still a file on disk
 
1354
            leave it as a file on disk. When False, if the log is still a file
 
1355
            on disk, the log file is deleted and the log preserved as
 
1356
            self._log_contents.
 
1357
        :return: A string containing the log.
 
1358
        """
 
1359
        # flush the log file, to get all content
 
1360
        import bzrlib.trace
 
1361
        if bzrlib.trace._trace_file:
 
1362
            bzrlib.trace._trace_file.flush()
 
1363
        if self._log_contents:
 
1364
            # XXX: this can hardly contain the content flushed above --vila
 
1365
            # 20080128
 
1366
            return self._log_contents
 
1367
        if self._log_file_name is not None:
 
1368
            logfile = open(self._log_file_name)
 
1369
            try:
 
1370
                log_contents = logfile.read()
 
1371
            finally:
 
1372
                logfile.close()
 
1373
            if not keep_log_file:
 
1374
                self._log_contents = log_contents
 
1375
                try:
 
1376
                    os.remove(self._log_file_name)
 
1377
                except OSError, e:
 
1378
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1379
                        sys.stderr.write(('Unable to delete log file '
 
1380
                                             ' %r\n' % self._log_file_name))
 
1381
                    else:
 
1382
                        raise
 
1383
            return log_contents
 
1384
        else:
 
1385
            return "DELETED log file to reduce memory footprint"
 
1386
 
 
1387
    def requireFeature(self, feature):
 
1388
        """This test requires a specific feature is available.
 
1389
 
 
1390
        :raises UnavailableFeature: When feature is not available.
 
1391
        """
 
1392
        if not feature.available():
 
1393
            raise UnavailableFeature(feature)
 
1394
 
 
1395
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1396
            working_dir):
 
1397
        """Run bazaar command line, splitting up a string command line."""
 
1398
        if isinstance(args, basestring):
 
1399
            # shlex don't understand unicode strings,
 
1400
            # so args should be plain string (bialix 20070906)
 
1401
            args = list(shlex.split(str(args)))
 
1402
        return self._run_bzr_core(args, retcode=retcode,
 
1403
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1404
                )
 
1405
 
 
1406
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1407
            working_dir):
 
1408
        if encoding is None:
 
1409
            encoding = osutils.get_user_encoding()
 
1410
        stdout = StringIOWrapper()
 
1411
        stderr = StringIOWrapper()
 
1412
        stdout.encoding = encoding
 
1413
        stderr.encoding = encoding
 
1414
 
 
1415
        self.log('run bzr: %r', args)
 
1416
        # FIXME: don't call into logging here
 
1417
        handler = logging.StreamHandler(stderr)
 
1418
        handler.setLevel(logging.INFO)
 
1419
        logger = logging.getLogger('')
 
1420
        logger.addHandler(handler)
 
1421
        old_ui_factory = ui.ui_factory
 
1422
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1423
 
 
1424
        cwd = None
 
1425
        if working_dir is not None:
 
1426
            cwd = osutils.getcwd()
 
1427
            os.chdir(working_dir)
 
1428
 
 
1429
        try:
 
1430
            result = self.apply_redirected(ui.ui_factory.stdin,
 
1431
                stdout, stderr,
 
1432
                bzrlib.commands.run_bzr_catch_user_errors,
 
1433
                args)
 
1434
        finally:
 
1435
            logger.removeHandler(handler)
 
1436
            ui.ui_factory = old_ui_factory
 
1437
            if cwd is not None:
 
1438
                os.chdir(cwd)
 
1439
 
 
1440
        out = stdout.getvalue()
 
1441
        err = stderr.getvalue()
 
1442
        if out:
 
1443
            self.log('output:\n%r', out)
 
1444
        if err:
 
1445
            self.log('errors:\n%r', err)
 
1446
        if retcode is not None:
 
1447
            self.assertEquals(retcode, result,
 
1448
                              message='Unexpected return code')
 
1449
        return out, err
 
1450
 
 
1451
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
 
1452
                working_dir=None, error_regexes=[], output_encoding=None):
 
1453
        """Invoke bzr, as if it were run from the command line.
 
1454
 
 
1455
        The argument list should not include the bzr program name - the
 
1456
        first argument is normally the bzr command.  Arguments may be
 
1457
        passed in three ways:
 
1458
 
 
1459
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1460
        when the command contains whitespace or metacharacters, or
 
1461
        is built up at run time.
 
1462
 
 
1463
        2- A single string, eg "add a".  This is the most convenient
 
1464
        for hardcoded commands.
 
1465
 
 
1466
        This runs bzr through the interface that catches and reports
 
1467
        errors, and with logging set to something approximating the
 
1468
        default, so that error reporting can be checked.
 
1469
 
 
1470
        This should be the main method for tests that want to exercise the
 
1471
        overall behavior of the bzr application (rather than a unit test
 
1472
        or a functional test of the library.)
 
1473
 
 
1474
        This sends the stdout/stderr results into the test's log,
 
1475
        where it may be useful for debugging.  See also run_captured.
 
1476
 
 
1477
        :keyword stdin: A string to be used as stdin for the command.
 
1478
        :keyword retcode: The status code the command should return;
 
1479
            default 0.
 
1480
        :keyword working_dir: The directory to run the command in
 
1481
        :keyword error_regexes: A list of expected error messages.  If
 
1482
            specified they must be seen in the error output of the command.
 
1483
        """
 
1484
        out, err = self._run_bzr_autosplit(
 
1485
            args=args,
 
1486
            retcode=retcode,
 
1487
            encoding=encoding,
 
1488
            stdin=stdin,
 
1489
            working_dir=working_dir,
 
1490
            )
 
1491
        for regex in error_regexes:
 
1492
            self.assertContainsRe(err, regex)
 
1493
        return out, err
 
1494
 
 
1495
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1496
        """Run bzr, and check that stderr contains the supplied regexes
 
1497
 
 
1498
        :param error_regexes: Sequence of regular expressions which
 
1499
            must each be found in the error output. The relative ordering
 
1500
            is not enforced.
 
1501
        :param args: command-line arguments for bzr
 
1502
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1503
            This function changes the default value of retcode to be 3,
 
1504
            since in most cases this is run when you expect bzr to fail.
 
1505
 
 
1506
        :return: (out, err) The actual output of running the command (in case
 
1507
            you want to do more inspection)
 
1508
 
 
1509
        Examples of use::
 
1510
 
 
1511
            # Make sure that commit is failing because there is nothing to do
 
1512
            self.run_bzr_error(['no changes to commit'],
 
1513
                               ['commit', '-m', 'my commit comment'])
 
1514
            # Make sure --strict is handling an unknown file, rather than
 
1515
            # giving us the 'nothing to do' error
 
1516
            self.build_tree(['unknown'])
 
1517
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1518
                               ['commit', --strict', '-m', 'my commit comment'])
 
1519
        """
 
1520
        kwargs.setdefault('retcode', 3)
 
1521
        kwargs['error_regexes'] = error_regexes
 
1522
        out, err = self.run_bzr(*args, **kwargs)
 
1523
        return out, err
 
1524
 
 
1525
    def run_bzr_subprocess(self, *args, **kwargs):
 
1526
        """Run bzr in a subprocess for testing.
 
1527
 
 
1528
        This starts a new Python interpreter and runs bzr in there.
 
1529
        This should only be used for tests that have a justifiable need for
 
1530
        this isolation: e.g. they are testing startup time, or signal
 
1531
        handling, or early startup code, etc.  Subprocess code can't be
 
1532
        profiled or debugged so easily.
 
1533
 
 
1534
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
1535
            None is supplied, the status code is not checked.
 
1536
        :keyword env_changes: A dictionary which lists changes to environment
 
1537
            variables. A value of None will unset the env variable.
 
1538
            The values must be strings. The change will only occur in the
 
1539
            child, so you don't need to fix the environment after running.
 
1540
        :keyword universal_newlines: Convert CRLF => LF
 
1541
        :keyword allow_plugins: By default the subprocess is run with
 
1542
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1543
            for system-wide plugins to create unexpected output on stderr,
 
1544
            which can cause unnecessary test failures.
 
1545
        """
 
1546
        env_changes = kwargs.get('env_changes', {})
 
1547
        working_dir = kwargs.get('working_dir', None)
 
1548
        allow_plugins = kwargs.get('allow_plugins', False)
 
1549
        if len(args) == 1:
 
1550
            if isinstance(args[0], list):
 
1551
                args = args[0]
 
1552
            elif isinstance(args[0], basestring):
 
1553
                args = list(shlex.split(args[0]))
 
1554
        else:
 
1555
            raise ValueError("passing varargs to run_bzr_subprocess")
 
1556
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1557
                                            working_dir=working_dir,
 
1558
                                            allow_plugins=allow_plugins)
 
1559
        # We distinguish between retcode=None and retcode not passed.
 
1560
        supplied_retcode = kwargs.get('retcode', 0)
 
1561
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1562
            universal_newlines=kwargs.get('universal_newlines', False),
 
1563
            process_args=args)
 
1564
 
 
1565
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1566
                             skip_if_plan_to_signal=False,
 
1567
                             working_dir=None,
 
1568
                             allow_plugins=False):
 
1569
        """Start bzr in a subprocess for testing.
 
1570
 
 
1571
        This starts a new Python interpreter and runs bzr in there.
 
1572
        This should only be used for tests that have a justifiable need for
 
1573
        this isolation: e.g. they are testing startup time, or signal
 
1574
        handling, or early startup code, etc.  Subprocess code can't be
 
1575
        profiled or debugged so easily.
 
1576
 
 
1577
        :param process_args: a list of arguments to pass to the bzr executable,
 
1578
            for example ``['--version']``.
 
1579
        :param env_changes: A dictionary which lists changes to environment
 
1580
            variables. A value of None will unset the env variable.
 
1581
            The values must be strings. The change will only occur in the
 
1582
            child, so you don't need to fix the environment after running.
 
1583
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1584
            is not available.
 
1585
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1586
 
 
1587
        :returns: Popen object for the started process.
 
1588
        """
 
1589
        if skip_if_plan_to_signal:
 
1590
            if not getattr(os, 'kill', None):
 
1591
                raise TestSkipped("os.kill not available.")
 
1592
 
 
1593
        if env_changes is None:
 
1594
            env_changes = {}
 
1595
        old_env = {}
 
1596
 
 
1597
        def cleanup_environment():
 
1598
            for env_var, value in env_changes.iteritems():
 
1599
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1600
 
 
1601
        def restore_environment():
 
1602
            for env_var, value in old_env.iteritems():
 
1603
                osutils.set_or_unset_env(env_var, value)
 
1604
 
 
1605
        bzr_path = self.get_bzr_path()
 
1606
 
 
1607
        cwd = None
 
1608
        if working_dir is not None:
 
1609
            cwd = osutils.getcwd()
 
1610
            os.chdir(working_dir)
 
1611
 
 
1612
        try:
 
1613
            # win32 subprocess doesn't support preexec_fn
 
1614
            # so we will avoid using it on all platforms, just to
 
1615
            # make sure the code path is used, and we don't break on win32
 
1616
            cleanup_environment()
 
1617
            command = [sys.executable]
 
1618
            # frozen executables don't need the path to bzr
 
1619
            if getattr(sys, "frozen", None) is None:
 
1620
                command.append(bzr_path)
 
1621
            if not allow_plugins:
 
1622
                command.append('--no-plugins')
 
1623
            command.extend(process_args)
 
1624
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1625
        finally:
 
1626
            restore_environment()
 
1627
            if cwd is not None:
 
1628
                os.chdir(cwd)
 
1629
 
 
1630
        return process
 
1631
 
 
1632
    def _popen(self, *args, **kwargs):
 
1633
        """Place a call to Popen.
 
1634
 
 
1635
        Allows tests to override this method to intercept the calls made to
 
1636
        Popen for introspection.
 
1637
        """
 
1638
        return Popen(*args, **kwargs)
 
1639
 
 
1640
    def get_bzr_path(self):
 
1641
        """Return the path of the 'bzr' executable for this test suite."""
 
1642
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1643
        if not os.path.isfile(bzr_path):
 
1644
            # We are probably installed. Assume sys.argv is the right file
 
1645
            bzr_path = sys.argv[0]
 
1646
        return bzr_path
 
1647
 
 
1648
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1649
                              universal_newlines=False, process_args=None):
 
1650
        """Finish the execution of process.
 
1651
 
 
1652
        :param process: the Popen object returned from start_bzr_subprocess.
 
1653
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1654
            None is supplied, the status code is not checked.
 
1655
        :param send_signal: an optional signal to send to the process.
 
1656
        :param universal_newlines: Convert CRLF => LF
 
1657
        :returns: (stdout, stderr)
 
1658
        """
 
1659
        if send_signal is not None:
 
1660
            os.kill(process.pid, send_signal)
 
1661
        out, err = process.communicate()
 
1662
 
 
1663
        if universal_newlines:
 
1664
            out = out.replace('\r\n', '\n')
 
1665
            err = err.replace('\r\n', '\n')
 
1666
 
 
1667
        if retcode is not None and retcode != process.returncode:
 
1668
            if process_args is None:
 
1669
                process_args = "(unknown args)"
 
1670
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1671
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1672
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1673
                      % (process_args, retcode, process.returncode))
 
1674
        return [out, err]
 
1675
 
 
1676
    def check_inventory_shape(self, inv, shape):
 
1677
        """Compare an inventory to a list of expected names.
 
1678
 
 
1679
        Fail if they are not precisely equal.
 
1680
        """
 
1681
        extras = []
 
1682
        shape = list(shape)             # copy
 
1683
        for path, ie in inv.entries():
 
1684
            name = path.replace('\\', '/')
 
1685
            if ie.kind == 'directory':
 
1686
                name = name + '/'
 
1687
            if name in shape:
 
1688
                shape.remove(name)
 
1689
            else:
 
1690
                extras.append(name)
 
1691
        if shape:
 
1692
            self.fail("expected paths not found in inventory: %r" % shape)
 
1693
        if extras:
 
1694
            self.fail("unexpected paths found in inventory: %r" % extras)
 
1695
 
 
1696
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1697
                         a_callable=None, *args, **kwargs):
 
1698
        """Call callable with redirected std io pipes.
 
1699
 
 
1700
        Returns the return code."""
 
1701
        if not callable(a_callable):
 
1702
            raise ValueError("a_callable must be callable.")
 
1703
        if stdin is None:
 
1704
            stdin = StringIO("")
 
1705
        if stdout is None:
 
1706
            if getattr(self, "_log_file", None) is not None:
 
1707
                stdout = self._log_file
 
1708
            else:
 
1709
                stdout = StringIO()
 
1710
        if stderr is None:
 
1711
            if getattr(self, "_log_file", None is not None):
 
1712
                stderr = self._log_file
 
1713
            else:
 
1714
                stderr = StringIO()
 
1715
        real_stdin = sys.stdin
 
1716
        real_stdout = sys.stdout
 
1717
        real_stderr = sys.stderr
 
1718
        try:
 
1719
            sys.stdout = stdout
 
1720
            sys.stderr = stderr
 
1721
            sys.stdin = stdin
 
1722
            return a_callable(*args, **kwargs)
 
1723
        finally:
 
1724
            sys.stdout = real_stdout
 
1725
            sys.stderr = real_stderr
 
1726
            sys.stdin = real_stdin
 
1727
 
 
1728
    def reduceLockdirTimeout(self):
 
1729
        """Reduce the default lock timeout for the duration of the test, so that
 
1730
        if LockContention occurs during a test, it does so quickly.
 
1731
 
 
1732
        Tests that expect to provoke LockContention errors should call this.
 
1733
        """
 
1734
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
1735
        def resetTimeout():
 
1736
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
1737
        self.addCleanup(resetTimeout)
 
1738
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
1739
 
 
1740
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
1741
        """Return a StringIOWrapper instance, that will encode Unicode
 
1742
        input to UTF-8.
 
1743
        """
 
1744
        if encoding_type is None:
 
1745
            encoding_type = 'strict'
 
1746
        sio = StringIO()
 
1747
        output_encoding = 'utf-8'
 
1748
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
 
1749
        sio.encoding = output_encoding
 
1750
        return sio
 
1751
 
 
1752
 
 
1753
class TestCaseWithMemoryTransport(TestCase):
 
1754
    """Common test class for tests that do not need disk resources.
 
1755
 
 
1756
    Tests that need disk resources should derive from TestCaseWithTransport.
 
1757
 
 
1758
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
1759
 
 
1760
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
1761
    a directory which does not exist. This serves to help ensure test isolation
 
1762
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
1763
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
1764
    file defaults for the transport in tests, nor does it obey the command line
 
1765
    override, so tests that accidentally write to the common directory should
 
1766
    be rare.
 
1767
 
 
1768
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
1769
    a .bzr directory that stops us ascending higher into the filesystem.
 
1770
    """
 
1771
 
 
1772
    TEST_ROOT = None
 
1773
    _TEST_NAME = 'test'
 
1774
 
 
1775
    def __init__(self, methodName='runTest'):
 
1776
        # allow test parameterization after test construction and before test
 
1777
        # execution. Variables that the parameterizer sets need to be
 
1778
        # ones that are not set by setUp, or setUp will trash them.
 
1779
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
1780
        self.vfs_transport_factory = default_transport
 
1781
        self.transport_server = None
 
1782
        self.transport_readonly_server = None
 
1783
        self.__vfs_server = None
 
1784
 
 
1785
    def get_transport(self, relpath=None):
 
1786
        """Return a writeable transport.
 
1787
 
 
1788
        This transport is for the test scratch space relative to
 
1789
        "self._test_root"
 
1790
 
 
1791
        :param relpath: a path relative to the base url.
 
1792
        """
 
1793
        t = get_transport(self.get_url(relpath))
 
1794
        self.assertFalse(t.is_readonly())
 
1795
        return t
 
1796
 
 
1797
    def get_readonly_transport(self, relpath=None):
 
1798
        """Return a readonly transport for the test scratch space
 
1799
 
 
1800
        This can be used to test that operations which should only need
 
1801
        readonly access in fact do not try to write.
 
1802
 
 
1803
        :param relpath: a path relative to the base url.
 
1804
        """
 
1805
        t = get_transport(self.get_readonly_url(relpath))
 
1806
        self.assertTrue(t.is_readonly())
 
1807
        return t
 
1808
 
 
1809
    def create_transport_readonly_server(self):
 
1810
        """Create a transport server from class defined at init.
 
1811
 
 
1812
        This is mostly a hook for daughter classes.
 
1813
        """
 
1814
        return self.transport_readonly_server()
 
1815
 
 
1816
    def get_readonly_server(self):
 
1817
        """Get the server instance for the readonly transport
 
1818
 
 
1819
        This is useful for some tests with specific servers to do diagnostics.
 
1820
        """
 
1821
        if self.__readonly_server is None:
 
1822
            if self.transport_readonly_server is None:
 
1823
                # readonly decorator requested
 
1824
                # bring up the server
 
1825
                self.__readonly_server = ReadonlyServer()
 
1826
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1827
            else:
 
1828
                self.__readonly_server = self.create_transport_readonly_server()
 
1829
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1830
            self.addCleanup(self.__readonly_server.tearDown)
 
1831
        return self.__readonly_server
 
1832
 
 
1833
    def get_readonly_url(self, relpath=None):
 
1834
        """Get a URL for the readonly transport.
 
1835
 
 
1836
        This will either be backed by '.' or a decorator to the transport
 
1837
        used by self.get_url()
 
1838
        relpath provides for clients to get a path relative to the base url.
 
1839
        These should only be downwards relative, not upwards.
 
1840
        """
 
1841
        base = self.get_readonly_server().get_url()
 
1842
        return self._adjust_url(base, relpath)
 
1843
 
 
1844
    def get_vfs_only_server(self):
 
1845
        """Get the vfs only read/write server instance.
 
1846
 
 
1847
        This is useful for some tests with specific servers that need
 
1848
        diagnostics.
 
1849
 
 
1850
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
1851
        is no means to override it.
 
1852
        """
 
1853
        if self.__vfs_server is None:
 
1854
            self.__vfs_server = MemoryServer()
 
1855
            self.__vfs_server.setUp()
 
1856
            self.addCleanup(self.__vfs_server.tearDown)
 
1857
        return self.__vfs_server
 
1858
 
 
1859
    def get_server(self):
 
1860
        """Get the read/write server instance.
 
1861
 
 
1862
        This is useful for some tests with specific servers that need
 
1863
        diagnostics.
 
1864
 
 
1865
        This is built from the self.transport_server factory. If that is None,
 
1866
        then the self.get_vfs_server is returned.
 
1867
        """
 
1868
        if self.__server is None:
 
1869
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
 
1870
                return self.get_vfs_only_server()
 
1871
            else:
 
1872
                # bring up a decorated means of access to the vfs only server.
 
1873
                self.__server = self.transport_server()
 
1874
                try:
 
1875
                    self.__server.setUp(self.get_vfs_only_server())
 
1876
                except TypeError, e:
 
1877
                    # This should never happen; the try:Except here is to assist
 
1878
                    # developers having to update code rather than seeing an
 
1879
                    # uninformative TypeError.
 
1880
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
 
1881
            self.addCleanup(self.__server.tearDown)
 
1882
        return self.__server
 
1883
 
 
1884
    def _adjust_url(self, base, relpath):
 
1885
        """Get a URL (or maybe a path) for the readwrite transport.
 
1886
 
 
1887
        This will either be backed by '.' or to an equivalent non-file based
 
1888
        facility.
 
1889
        relpath provides for clients to get a path relative to the base url.
 
1890
        These should only be downwards relative, not upwards.
 
1891
        """
 
1892
        if relpath is not None and relpath != '.':
 
1893
            if not base.endswith('/'):
 
1894
                base = base + '/'
 
1895
            # XXX: Really base should be a url; we did after all call
 
1896
            # get_url()!  But sometimes it's just a path (from
 
1897
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
1898
            # to a non-escaped local path.
 
1899
            if base.startswith('./') or base.startswith('/'):
 
1900
                base += relpath
 
1901
            else:
 
1902
                base += urlutils.escape(relpath)
 
1903
        return base
 
1904
 
 
1905
    def get_url(self, relpath=None):
 
1906
        """Get a URL (or maybe a path) for the readwrite transport.
 
1907
 
 
1908
        This will either be backed by '.' or to an equivalent non-file based
 
1909
        facility.
 
1910
        relpath provides for clients to get a path relative to the base url.
 
1911
        These should only be downwards relative, not upwards.
 
1912
        """
 
1913
        base = self.get_server().get_url()
 
1914
        return self._adjust_url(base, relpath)
 
1915
 
 
1916
    def get_vfs_only_url(self, relpath=None):
 
1917
        """Get a URL (or maybe a path for the plain old vfs transport.
 
1918
 
 
1919
        This will never be a smart protocol.  It always has all the
 
1920
        capabilities of the local filesystem, but it might actually be a
 
1921
        MemoryTransport or some other similar virtual filesystem.
 
1922
 
 
1923
        This is the backing transport (if any) of the server returned by
 
1924
        get_url and get_readonly_url.
 
1925
 
 
1926
        :param relpath: provides for clients to get a path relative to the base
 
1927
            url.  These should only be downwards relative, not upwards.
 
1928
        :return: A URL
 
1929
        """
 
1930
        base = self.get_vfs_only_server().get_url()
 
1931
        return self._adjust_url(base, relpath)
 
1932
 
 
1933
    def _create_safety_net(self):
 
1934
        """Make a fake bzr directory.
 
1935
 
 
1936
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
1937
        real branch.
 
1938
        """
 
1939
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
1940
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
1941
 
 
1942
    def _check_safety_net(self):
 
1943
        """Check that the safety .bzr directory have not been touched.
 
1944
 
 
1945
        _make_test_root have created a .bzr directory to prevent tests from
 
1946
        propagating. This method ensures than a test did not leaked.
 
1947
        """
 
1948
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
1949
        wt = workingtree.WorkingTree.open(root)
 
1950
        last_rev = wt.last_revision()
 
1951
        if last_rev != 'null:':
 
1952
            # The current test have modified the /bzr directory, we need to
 
1953
            # recreate a new one or all the followng tests will fail.
 
1954
            # If you need to inspect its content uncomment the following line
 
1955
            # import pdb; pdb.set_trace()
 
1956
            _rmtree_temp_dir(root + '/.bzr')
 
1957
            self._create_safety_net()
 
1958
            raise AssertionError('%s/.bzr should not be modified' % root)
 
1959
 
 
1960
    def _make_test_root(self):
 
1961
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
1962
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
1963
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
1964
 
 
1965
            self._create_safety_net()
 
1966
 
 
1967
            # The same directory is used by all tests, and we're not
 
1968
            # specifically told when all tests are finished.  This will do.
 
1969
            atexit.register(_rmtree_temp_dir, root)
 
1970
 
 
1971
        self.addCleanup(self._check_safety_net)
 
1972
 
 
1973
    def makeAndChdirToTestDir(self):
 
1974
        """Create a temporary directories for this one test.
 
1975
 
 
1976
        This must set self.test_home_dir and self.test_dir and chdir to
 
1977
        self.test_dir.
 
1978
 
 
1979
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
1980
        """
 
1981
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
1982
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
1983
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
1984
 
 
1985
    def make_branch(self, relpath, format=None):
 
1986
        """Create a branch on the transport at relpath."""
 
1987
        repo = self.make_repository(relpath, format=format)
 
1988
        return repo.bzrdir.create_branch()
 
1989
 
 
1990
    def make_bzrdir(self, relpath, format=None):
 
1991
        try:
 
1992
            # might be a relative or absolute path
 
1993
            maybe_a_url = self.get_url(relpath)
 
1994
            segments = maybe_a_url.rsplit('/', 1)
 
1995
            t = get_transport(maybe_a_url)
 
1996
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
1997
                t.ensure_base()
 
1998
            if format is None:
 
1999
                format = 'default'
 
2000
            if isinstance(format, basestring):
 
2001
                format = bzrdir.format_registry.make_bzrdir(format)
 
2002
            return format.initialize_on_transport(t)
 
2003
        except errors.UninitializableFormat:
 
2004
            raise TestSkipped("Format %s is not initializable." % format)
 
2005
 
 
2006
    def make_repository(self, relpath, shared=False, format=None):
 
2007
        """Create a repository on our default transport at relpath.
 
2008
 
 
2009
        Note that relpath must be a relative path, not a full url.
 
2010
        """
 
2011
        # FIXME: If you create a remoterepository this returns the underlying
 
2012
        # real format, which is incorrect.  Actually we should make sure that
 
2013
        # RemoteBzrDir returns a RemoteRepository.
 
2014
        # maybe  mbp 20070410
 
2015
        made_control = self.make_bzrdir(relpath, format=format)
 
2016
        return made_control.create_repository(shared=shared)
 
2017
 
 
2018
    def make_branch_and_memory_tree(self, relpath, format=None):
 
2019
        """Create a branch on the default transport and a MemoryTree for it."""
 
2020
        b = self.make_branch(relpath, format=format)
 
2021
        return memorytree.MemoryTree.create_on_branch(b)
 
2022
 
 
2023
    def make_branch_builder(self, relpath, format=None):
 
2024
        url = self.get_url(relpath)
 
2025
        tran = get_transport(url)
 
2026
        return branchbuilder.BranchBuilder(get_transport(url), format=format)
 
2027
 
 
2028
    def overrideEnvironmentForTesting(self):
 
2029
        os.environ['HOME'] = self.test_home_dir
 
2030
        os.environ['BZR_HOME'] = self.test_home_dir
 
2031
 
 
2032
    def setUp(self):
 
2033
        super(TestCaseWithMemoryTransport, self).setUp()
 
2034
        self._make_test_root()
 
2035
        _currentdir = os.getcwdu()
 
2036
        def _leaveDirectory():
 
2037
            os.chdir(_currentdir)
 
2038
        self.addCleanup(_leaveDirectory)
 
2039
        self.makeAndChdirToTestDir()
 
2040
        self.overrideEnvironmentForTesting()
 
2041
        self.__readonly_server = None
 
2042
        self.__server = None
 
2043
        self.reduceLockdirTimeout()
 
2044
 
 
2045
    def setup_smart_server_with_call_log(self):
 
2046
        """Sets up a smart server as the transport server with a call log."""
 
2047
        self.transport_server = server.SmartTCPServer_for_testing
 
2048
        self.hpss_calls = []
 
2049
        def capture_hpss_call(params):
 
2050
            import traceback
 
2051
            self.hpss_calls.append((params, traceback.format_stack()))
 
2052
        client._SmartClient.hooks.install_named_hook(
 
2053
            'call', capture_hpss_call, None)
 
2054
 
 
2055
    def reset_smart_call_log(self):
 
2056
        self.hpss_calls = []
 
2057
 
 
2058
 
 
2059
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
2060
    """Derived class that runs a test within a temporary directory.
 
2061
 
 
2062
    This is useful for tests that need to create a branch, etc.
 
2063
 
 
2064
    The directory is created in a slightly complex way: for each
 
2065
    Python invocation, a new temporary top-level directory is created.
 
2066
    All test cases create their own directory within that.  If the
 
2067
    tests complete successfully, the directory is removed.
 
2068
 
 
2069
    :ivar test_base_dir: The path of the top-level directory for this
 
2070
    test, which contains a home directory and a work directory.
 
2071
 
 
2072
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
2073
    which is used as $HOME for this test.
 
2074
 
 
2075
    :ivar test_dir: A directory under test_base_dir used as the current
 
2076
    directory when the test proper is run.
 
2077
    """
 
2078
 
 
2079
    OVERRIDE_PYTHON = 'python'
 
2080
 
 
2081
    def check_file_contents(self, filename, expect):
 
2082
        self.log("check contents of file %s" % filename)
 
2083
        contents = file(filename, 'r').read()
 
2084
        if contents != expect:
 
2085
            self.log("expected: %r" % expect)
 
2086
            self.log("actually: %r" % contents)
 
2087
            self.fail("contents of %s not as expected" % filename)
 
2088
 
 
2089
    def _getTestDirPrefix(self):
 
2090
        # create a directory within the top level test directory
 
2091
        if sys.platform == 'win32':
 
2092
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
 
2093
            # windows is likely to have path-length limits so use a short name
 
2094
            name_prefix = name_prefix[-30:]
 
2095
        else:
 
2096
            name_prefix = re.sub('[/]', '_', self.id())
 
2097
        return name_prefix
 
2098
 
 
2099
    def makeAndChdirToTestDir(self):
 
2100
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2101
 
 
2102
        For TestCaseInTempDir we create a temporary directory based on the test
 
2103
        name and then create two subdirs - test and home under it.
 
2104
        """
 
2105
        name_prefix = osutils.pathjoin(self.TEST_ROOT, self._getTestDirPrefix())
 
2106
        name = name_prefix
 
2107
        for i in range(100):
 
2108
            if os.path.exists(name):
 
2109
                name = name_prefix + '_' + str(i)
 
2110
            else:
 
2111
                os.mkdir(name)
 
2112
                break
 
2113
        # now create test and home directories within this dir
 
2114
        self.test_base_dir = name
 
2115
        self.test_home_dir = self.test_base_dir + '/home'
 
2116
        os.mkdir(self.test_home_dir)
 
2117
        self.test_dir = self.test_base_dir + '/work'
 
2118
        os.mkdir(self.test_dir)
 
2119
        os.chdir(self.test_dir)
 
2120
        # put name of test inside
 
2121
        f = file(self.test_base_dir + '/name', 'w')
 
2122
        try:
 
2123
            f.write(self.id())
 
2124
        finally:
 
2125
            f.close()
 
2126
        self.addCleanup(self.deleteTestDir)
 
2127
 
 
2128
    def deleteTestDir(self):
 
2129
        os.chdir(self.TEST_ROOT)
 
2130
        _rmtree_temp_dir(self.test_base_dir)
 
2131
 
 
2132
    def build_tree(self, shape, line_endings='binary', transport=None):
 
2133
        """Build a test tree according to a pattern.
 
2134
 
 
2135
        shape is a sequence of file specifications.  If the final
 
2136
        character is '/', a directory is created.
 
2137
 
 
2138
        This assumes that all the elements in the tree being built are new.
 
2139
 
 
2140
        This doesn't add anything to a branch.
 
2141
 
 
2142
        :type shape:    list or tuple.
 
2143
        :param line_endings: Either 'binary' or 'native'
 
2144
            in binary mode, exact contents are written in native mode, the
 
2145
            line endings match the default platform endings.
 
2146
        :param transport: A transport to write to, for building trees on VFS's.
 
2147
            If the transport is readonly or None, "." is opened automatically.
 
2148
        :return: None
 
2149
        """
 
2150
        if type(shape) not in (list, tuple):
 
2151
            raise AssertionError("Parameter 'shape' should be "
 
2152
                "a list or a tuple. Got %r instead" % (shape,))
 
2153
        # It's OK to just create them using forward slashes on windows.
 
2154
        if transport is None or transport.is_readonly():
 
2155
            transport = get_transport(".")
 
2156
        for name in shape:
 
2157
            self.assertIsInstance(name, basestring)
 
2158
            if name[-1] == '/':
 
2159
                transport.mkdir(urlutils.escape(name[:-1]))
 
2160
            else:
 
2161
                if line_endings == 'binary':
 
2162
                    end = '\n'
 
2163
                elif line_endings == 'native':
 
2164
                    end = os.linesep
 
2165
                else:
 
2166
                    raise errors.BzrError(
 
2167
                        'Invalid line ending request %r' % line_endings)
 
2168
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
2169
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2170
 
 
2171
    def build_tree_contents(self, shape):
 
2172
        build_tree_contents(shape)
 
2173
 
 
2174
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2175
        """Assert whether path or paths are in the WorkingTree"""
 
2176
        if tree is None:
 
2177
            tree = workingtree.WorkingTree.open(root_path)
 
2178
        if not isinstance(path, basestring):
 
2179
            for p in path:
 
2180
                self.assertInWorkingTree(p, tree=tree)
 
2181
        else:
 
2182
            self.assertIsNot(tree.path2id(path), None,
 
2183
                path+' not in working tree.')
 
2184
 
 
2185
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2186
        """Assert whether path or paths are not in the WorkingTree"""
 
2187
        if tree is None:
 
2188
            tree = workingtree.WorkingTree.open(root_path)
 
2189
        if not isinstance(path, basestring):
 
2190
            for p in path:
 
2191
                self.assertNotInWorkingTree(p,tree=tree)
 
2192
        else:
 
2193
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2194
 
 
2195
 
 
2196
class TestCaseWithTransport(TestCaseInTempDir):
 
2197
    """A test case that provides get_url and get_readonly_url facilities.
 
2198
 
 
2199
    These back onto two transport servers, one for readonly access and one for
 
2200
    read write access.
 
2201
 
 
2202
    If no explicit class is provided for readonly access, a
 
2203
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2204
    based read write transports.
 
2205
 
 
2206
    If an explicit class is provided for readonly access, that server and the
 
2207
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2208
    """
 
2209
 
 
2210
    def get_vfs_only_server(self):
 
2211
        """See TestCaseWithMemoryTransport.
 
2212
 
 
2213
        This is useful for some tests with specific servers that need
 
2214
        diagnostics.
 
2215
        """
 
2216
        if self.__vfs_server is None:
 
2217
            self.__vfs_server = self.vfs_transport_factory()
 
2218
            self.__vfs_server.setUp()
 
2219
            self.addCleanup(self.__vfs_server.tearDown)
 
2220
        return self.__vfs_server
 
2221
 
 
2222
    def make_branch_and_tree(self, relpath, format=None):
 
2223
        """Create a branch on the transport and a tree locally.
 
2224
 
 
2225
        If the transport is not a LocalTransport, the Tree can't be created on
 
2226
        the transport.  In that case if the vfs_transport_factory is
 
2227
        LocalURLServer the working tree is created in the local
 
2228
        directory backing the transport, and the returned tree's branch and
 
2229
        repository will also be accessed locally. Otherwise a lightweight
 
2230
        checkout is created and returned.
 
2231
 
 
2232
        :param format: The BzrDirFormat.
 
2233
        :returns: the WorkingTree.
 
2234
        """
 
2235
        # TODO: always use the local disk path for the working tree,
 
2236
        # this obviously requires a format that supports branch references
 
2237
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2238
        # RBC 20060208
 
2239
        b = self.make_branch(relpath, format=format)
 
2240
        try:
 
2241
            return b.bzrdir.create_workingtree()
 
2242
        except errors.NotLocalUrl:
 
2243
            # We can only make working trees locally at the moment.  If the
 
2244
            # transport can't support them, then we keep the non-disk-backed
 
2245
            # branch and create a local checkout.
 
2246
            if self.vfs_transport_factory is LocalURLServer:
 
2247
                # the branch is colocated on disk, we cannot create a checkout.
 
2248
                # hopefully callers will expect this.
 
2249
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2250
                wt = local_controldir.create_workingtree()
 
2251
                if wt.branch._format != b._format:
 
2252
                    wt._branch = b
 
2253
                    # Make sure that assigning to wt._branch fixes wt.branch,
 
2254
                    # in case the implementation details of workingtree objects
 
2255
                    # change.
 
2256
                    self.assertIs(b, wt.branch)
 
2257
                return wt
 
2258
            else:
 
2259
                return b.create_checkout(relpath, lightweight=True)
 
2260
 
 
2261
    def assertIsDirectory(self, relpath, transport):
 
2262
        """Assert that relpath within transport is a directory.
 
2263
 
 
2264
        This may not be possible on all transports; in that case it propagates
 
2265
        a TransportNotPossible.
 
2266
        """
 
2267
        try:
 
2268
            mode = transport.stat(relpath).st_mode
 
2269
        except errors.NoSuchFile:
 
2270
            self.fail("path %s is not a directory; no such file"
 
2271
                      % (relpath))
 
2272
        if not stat.S_ISDIR(mode):
 
2273
            self.fail("path %s is not a directory; has mode %#o"
 
2274
                      % (relpath, mode))
 
2275
 
 
2276
    def assertTreesEqual(self, left, right):
 
2277
        """Check that left and right have the same content and properties."""
 
2278
        # we use a tree delta to check for equality of the content, and we
 
2279
        # manually check for equality of other things such as the parents list.
 
2280
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2281
        differences = left.changes_from(right)
 
2282
        self.assertFalse(differences.has_changed(),
 
2283
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2284
 
 
2285
    def setUp(self):
 
2286
        super(TestCaseWithTransport, self).setUp()
 
2287
        self.__vfs_server = None
 
2288
 
 
2289
 
 
2290
class ChrootedTestCase(TestCaseWithTransport):
 
2291
    """A support class that provides readonly urls outside the local namespace.
 
2292
 
 
2293
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2294
    is then we are chrooted already, if it is not then an HttpServer is used
 
2295
    for readonly urls.
 
2296
 
 
2297
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2298
                       be used without needed to redo it when a different
 
2299
                       subclass is in use ?
 
2300
    """
 
2301
 
 
2302
    def setUp(self):
 
2303
        super(ChrootedTestCase, self).setUp()
 
2304
        if not self.vfs_transport_factory == MemoryServer:
 
2305
            self.transport_readonly_server = HttpServer
 
2306
 
 
2307
 
 
2308
def condition_id_re(pattern):
 
2309
    """Create a condition filter which performs a re check on a test's id.
 
2310
 
 
2311
    :param pattern: A regular expression string.
 
2312
    :return: A callable that returns True if the re matches.
 
2313
    """
 
2314
    filter_re = re.compile(pattern)
 
2315
    def condition(test):
 
2316
        test_id = test.id()
 
2317
        return filter_re.search(test_id)
 
2318
    return condition
 
2319
 
 
2320
 
 
2321
def condition_isinstance(klass_or_klass_list):
 
2322
    """Create a condition filter which returns isinstance(param, klass).
 
2323
 
 
2324
    :return: A callable which when called with one parameter obj return the
 
2325
        result of isinstance(obj, klass_or_klass_list).
 
2326
    """
 
2327
    def condition(obj):
 
2328
        return isinstance(obj, klass_or_klass_list)
 
2329
    return condition
 
2330
 
 
2331
 
 
2332
def condition_id_in_list(id_list):
 
2333
    """Create a condition filter which verify that test's id in a list.
 
2334
 
 
2335
    :param id_list: A TestIdList object.
 
2336
    :return: A callable that returns True if the test's id appears in the list.
 
2337
    """
 
2338
    def condition(test):
 
2339
        return id_list.includes(test.id())
 
2340
    return condition
 
2341
 
 
2342
 
 
2343
def condition_id_startswith(starts):
 
2344
    """Create a condition filter verifying that test's id starts with a string.
 
2345
 
 
2346
    :param starts: A list of string.
 
2347
    :return: A callable that returns True if the test's id starts with one of
 
2348
        the given strings.
 
2349
    """
 
2350
    def condition(test):
 
2351
        for start in starts:
 
2352
            if test.id().startswith(start):
 
2353
                return True
 
2354
        return False
 
2355
    return condition
 
2356
 
 
2357
 
 
2358
def exclude_tests_by_condition(suite, condition):
 
2359
    """Create a test suite which excludes some tests from suite.
 
2360
 
 
2361
    :param suite: The suite to get tests from.
 
2362
    :param condition: A callable whose result evaluates True when called with a
 
2363
        test case which should be excluded from the result.
 
2364
    :return: A suite which contains the tests found in suite that fail
 
2365
        condition.
 
2366
    """
 
2367
    result = []
 
2368
    for test in iter_suite_tests(suite):
 
2369
        if not condition(test):
 
2370
            result.append(test)
 
2371
    return TestUtil.TestSuite(result)
 
2372
 
 
2373
 
 
2374
def filter_suite_by_condition(suite, condition):
 
2375
    """Create a test suite by filtering another one.
 
2376
 
 
2377
    :param suite: The source suite.
 
2378
    :param condition: A callable whose result evaluates True when called with a
 
2379
        test case which should be included in the result.
 
2380
    :return: A suite which contains the tests found in suite that pass
 
2381
        condition.
 
2382
    """
 
2383
    result = []
 
2384
    for test in iter_suite_tests(suite):
 
2385
        if condition(test):
 
2386
            result.append(test)
 
2387
    return TestUtil.TestSuite(result)
 
2388
 
 
2389
 
 
2390
def filter_suite_by_re(suite, pattern):
 
2391
    """Create a test suite by filtering another one.
 
2392
 
 
2393
    :param suite:           the source suite
 
2394
    :param pattern:         pattern that names must match
 
2395
    :returns: the newly created suite
 
2396
    """
 
2397
    condition = condition_id_re(pattern)
 
2398
    result_suite = filter_suite_by_condition(suite, condition)
 
2399
    return result_suite
 
2400
 
 
2401
 
 
2402
def filter_suite_by_id_list(suite, test_id_list):
 
2403
    """Create a test suite by filtering another one.
 
2404
 
 
2405
    :param suite: The source suite.
 
2406
    :param test_id_list: A list of the test ids to keep as strings.
 
2407
    :returns: the newly created suite
 
2408
    """
 
2409
    condition = condition_id_in_list(test_id_list)
 
2410
    result_suite = filter_suite_by_condition(suite, condition)
 
2411
    return result_suite
 
2412
 
 
2413
 
 
2414
def filter_suite_by_id_startswith(suite, start):
 
2415
    """Create a test suite by filtering another one.
 
2416
 
 
2417
    :param suite: The source suite.
 
2418
    :param start: A list of string the test id must start with one of.
 
2419
    :returns: the newly created suite
 
2420
    """
 
2421
    condition = condition_id_startswith(start)
 
2422
    result_suite = filter_suite_by_condition(suite, condition)
 
2423
    return result_suite
 
2424
 
 
2425
 
 
2426
def exclude_tests_by_re(suite, pattern):
 
2427
    """Create a test suite which excludes some tests from suite.
 
2428
 
 
2429
    :param suite: The suite to get tests from.
 
2430
    :param pattern: A regular expression string. Test ids that match this
 
2431
        pattern will be excluded from the result.
 
2432
    :return: A TestSuite that contains all the tests from suite without the
 
2433
        tests that matched pattern. The order of tests is the same as it was in
 
2434
        suite.
 
2435
    """
 
2436
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
2437
 
 
2438
 
 
2439
def preserve_input(something):
 
2440
    """A helper for performing test suite transformation chains.
 
2441
 
 
2442
    :param something: Anything you want to preserve.
 
2443
    :return: Something.
 
2444
    """
 
2445
    return something
 
2446
 
 
2447
 
 
2448
def randomize_suite(suite):
 
2449
    """Return a new TestSuite with suite's tests in random order.
 
2450
 
 
2451
    The tests in the input suite are flattened into a single suite in order to
 
2452
    accomplish this. Any nested TestSuites are removed to provide global
 
2453
    randomness.
 
2454
    """
 
2455
    tests = list(iter_suite_tests(suite))
 
2456
    random.shuffle(tests)
 
2457
    return TestUtil.TestSuite(tests)
 
2458
 
 
2459
 
 
2460
def split_suite_by_condition(suite, condition):
 
2461
    """Split a test suite into two by a condition.
 
2462
 
 
2463
    :param suite: The suite to split.
 
2464
    :param condition: The condition to match on. Tests that match this
 
2465
        condition are returned in the first test suite, ones that do not match
 
2466
        are in the second suite.
 
2467
    :return: A tuple of two test suites, where the first contains tests from
 
2468
        suite matching the condition, and the second contains the remainder
 
2469
        from suite. The order within each output suite is the same as it was in
 
2470
        suite.
 
2471
    """
 
2472
    matched = []
 
2473
    did_not_match = []
 
2474
    for test in iter_suite_tests(suite):
 
2475
        if condition(test):
 
2476
            matched.append(test)
 
2477
        else:
 
2478
            did_not_match.append(test)
 
2479
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
2480
 
 
2481
 
 
2482
def split_suite_by_re(suite, pattern):
 
2483
    """Split a test suite into two by a regular expression.
 
2484
 
 
2485
    :param suite: The suite to split.
 
2486
    :param pattern: A regular expression string. Test ids that match this
 
2487
        pattern will be in the first test suite returned, and the others in the
 
2488
        second test suite returned.
 
2489
    :return: A tuple of two test suites, where the first contains tests from
 
2490
        suite matching pattern, and the second contains the remainder from
 
2491
        suite. The order within each output suite is the same as it was in
 
2492
        suite.
 
2493
    """
 
2494
    return split_suite_by_condition(suite, condition_id_re(pattern))
 
2495
 
 
2496
 
 
2497
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2498
              stop_on_failure=False,
 
2499
              transport=None, lsprof_timed=None, bench_history=None,
 
2500
              matching_tests_first=None,
 
2501
              list_only=False,
 
2502
              random_seed=None,
 
2503
              exclude_pattern=None,
 
2504
              strict=False,
 
2505
              runner_class=None):
 
2506
    """Run a test suite for bzr selftest.
 
2507
 
 
2508
    :param runner_class: The class of runner to use. Must support the
 
2509
        constructor arguments passed by run_suite which are more than standard
 
2510
        python uses.
 
2511
    :return: A boolean indicating success.
 
2512
    """
 
2513
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2514
    if verbose:
 
2515
        verbosity = 2
 
2516
    else:
 
2517
        verbosity = 1
 
2518
    if runner_class is None:
 
2519
        runner_class = TextTestRunner
 
2520
    runner = runner_class(stream=sys.stdout,
 
2521
                            descriptions=0,
 
2522
                            verbosity=verbosity,
 
2523
                            bench_history=bench_history,
 
2524
                            list_only=list_only,
 
2525
                            )
 
2526
    runner.stop_on_failure=stop_on_failure
 
2527
    # Initialise the random number generator and display the seed used.
 
2528
    # We convert the seed to a long to make it reuseable across invocations.
 
2529
    random_order = False
 
2530
    if random_seed is not None:
 
2531
        random_order = True
 
2532
        if random_seed == "now":
 
2533
            random_seed = long(time.time())
 
2534
        else:
 
2535
            # Convert the seed to a long if we can
 
2536
            try:
 
2537
                random_seed = long(random_seed)
 
2538
            except:
 
2539
                pass
 
2540
        runner.stream.writeln("Randomizing test order using seed %s\n" %
 
2541
            (random_seed))
 
2542
        random.seed(random_seed)
 
2543
    # Customise the list of tests if requested
 
2544
    if exclude_pattern is not None:
 
2545
        suite = exclude_tests_by_re(suite, exclude_pattern)
 
2546
    if random_order:
 
2547
        order_changer = randomize_suite
 
2548
    else:
 
2549
        order_changer = preserve_input
 
2550
    if pattern != '.*' or random_order:
 
2551
        if matching_tests_first:
 
2552
            suites = map(order_changer, split_suite_by_re(suite, pattern))
 
2553
            suite = TestUtil.TestSuite(suites)
 
2554
        else:
 
2555
            suite = order_changer(filter_suite_by_re(suite, pattern))
 
2556
 
 
2557
    result = runner.run(suite)
 
2558
 
 
2559
    if strict:
 
2560
        return result.wasStrictlySuccessful()
 
2561
 
 
2562
    return result.wasSuccessful()
 
2563
 
 
2564
 
 
2565
# Controlled by "bzr selftest -E=..." option
 
2566
selftest_debug_flags = set()
 
2567
 
 
2568
 
 
2569
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
2570
             transport=None,
 
2571
             test_suite_factory=None,
 
2572
             lsprof_timed=None,
 
2573
             bench_history=None,
 
2574
             matching_tests_first=None,
 
2575
             list_only=False,
 
2576
             random_seed=None,
 
2577
             exclude_pattern=None,
 
2578
             strict=False,
 
2579
             load_list=None,
 
2580
             debug_flags=None,
 
2581
             starting_with=None,
 
2582
             runner_class=None,
 
2583
             ):
 
2584
    """Run the whole test suite under the enhanced runner"""
 
2585
    # XXX: Very ugly way to do this...
 
2586
    # Disable warning about old formats because we don't want it to disturb
 
2587
    # any blackbox tests.
 
2588
    from bzrlib import repository
 
2589
    repository._deprecation_warning_done = True
 
2590
 
 
2591
    global default_transport
 
2592
    if transport is None:
 
2593
        transport = default_transport
 
2594
    old_transport = default_transport
 
2595
    default_transport = transport
 
2596
    global selftest_debug_flags
 
2597
    old_debug_flags = selftest_debug_flags
 
2598
    if debug_flags is not None:
 
2599
        selftest_debug_flags = set(debug_flags)
 
2600
    try:
 
2601
        if load_list is None:
 
2602
            keep_only = None
 
2603
        else:
 
2604
            keep_only = load_test_id_list(load_list)
 
2605
        if test_suite_factory is None:
 
2606
            suite = test_suite(keep_only, starting_with)
 
2607
        else:
 
2608
            suite = test_suite_factory()
 
2609
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
2610
                     stop_on_failure=stop_on_failure,
 
2611
                     transport=transport,
 
2612
                     lsprof_timed=lsprof_timed,
 
2613
                     bench_history=bench_history,
 
2614
                     matching_tests_first=matching_tests_first,
 
2615
                     list_only=list_only,
 
2616
                     random_seed=random_seed,
 
2617
                     exclude_pattern=exclude_pattern,
 
2618
                     strict=strict,
 
2619
                     runner_class=runner_class,
 
2620
                     )
 
2621
    finally:
 
2622
        default_transport = old_transport
 
2623
        selftest_debug_flags = old_debug_flags
 
2624
 
 
2625
 
 
2626
def load_test_id_list(file_name):
 
2627
    """Load a test id list from a text file.
 
2628
 
 
2629
    The format is one test id by line.  No special care is taken to impose
 
2630
    strict rules, these test ids are used to filter the test suite so a test id
 
2631
    that do not match an existing test will do no harm. This allows user to add
 
2632
    comments, leave blank lines, etc.
 
2633
    """
 
2634
    test_list = []
 
2635
    try:
 
2636
        ftest = open(file_name, 'rt')
 
2637
    except IOError, e:
 
2638
        if e.errno != errno.ENOENT:
 
2639
            raise
 
2640
        else:
 
2641
            raise errors.NoSuchFile(file_name)
 
2642
 
 
2643
    for test_name in ftest.readlines():
 
2644
        test_list.append(test_name.strip())
 
2645
    ftest.close()
 
2646
    return test_list
 
2647
 
 
2648
 
 
2649
def suite_matches_id_list(test_suite, id_list):
 
2650
    """Warns about tests not appearing or appearing more than once.
 
2651
 
 
2652
    :param test_suite: A TestSuite object.
 
2653
    :param test_id_list: The list of test ids that should be found in
 
2654
         test_suite.
 
2655
 
 
2656
    :return: (absents, duplicates) absents is a list containing the test found
 
2657
        in id_list but not in test_suite, duplicates is a list containing the
 
2658
        test found multiple times in test_suite.
 
2659
 
 
2660
    When using a prefined test id list, it may occurs that some tests do not
 
2661
    exist anymore or that some tests use the same id. This function warns the
 
2662
    tester about potential problems in his workflow (test lists are volatile)
 
2663
    or in the test suite itself (using the same id for several tests does not
 
2664
    help to localize defects).
 
2665
    """
 
2666
    # Build a dict counting id occurrences
 
2667
    tests = dict()
 
2668
    for test in iter_suite_tests(test_suite):
 
2669
        id = test.id()
 
2670
        tests[id] = tests.get(id, 0) + 1
 
2671
 
 
2672
    not_found = []
 
2673
    duplicates = []
 
2674
    for id in id_list:
 
2675
        occurs = tests.get(id, 0)
 
2676
        if not occurs:
 
2677
            not_found.append(id)
 
2678
        elif occurs > 1:
 
2679
            duplicates.append(id)
 
2680
 
 
2681
    return not_found, duplicates
 
2682
 
 
2683
 
 
2684
class TestIdList(object):
 
2685
    """Test id list to filter a test suite.
 
2686
 
 
2687
    Relying on the assumption that test ids are built as:
 
2688
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
2689
    notation, this class offers methods to :
 
2690
    - avoid building a test suite for modules not refered to in the test list,
 
2691
    - keep only the tests listed from the module test suite.
 
2692
    """
 
2693
 
 
2694
    def __init__(self, test_id_list):
 
2695
        # When a test suite needs to be filtered against us we compare test ids
 
2696
        # for equality, so a simple dict offers a quick and simple solution.
 
2697
        self.tests = dict().fromkeys(test_id_list, True)
 
2698
 
 
2699
        # While unittest.TestCase have ids like:
 
2700
        # <module>.<class>.<method>[(<param+)],
 
2701
        # doctest.DocTestCase can have ids like:
 
2702
        # <module>
 
2703
        # <module>.<class>
 
2704
        # <module>.<function>
 
2705
        # <module>.<class>.<method>
 
2706
 
 
2707
        # Since we can't predict a test class from its name only, we settle on
 
2708
        # a simple constraint: a test id always begins with its module name.
 
2709
 
 
2710
        modules = {}
 
2711
        for test_id in test_id_list:
 
2712
            parts = test_id.split('.')
 
2713
            mod_name = parts.pop(0)
 
2714
            modules[mod_name] = True
 
2715
            for part in parts:
 
2716
                mod_name += '.' + part
 
2717
                modules[mod_name] = True
 
2718
        self.modules = modules
 
2719
 
 
2720
    def refers_to(self, module_name):
 
2721
        """Is there tests for the module or one of its sub modules."""
 
2722
        return self.modules.has_key(module_name)
 
2723
 
 
2724
    def includes(self, test_id):
 
2725
        return self.tests.has_key(test_id)
 
2726
 
 
2727
 
 
2728
class TestPrefixAliasRegistry(registry.Registry):
 
2729
    """A registry for test prefix aliases.
 
2730
 
 
2731
    This helps implement shorcuts for the --starting-with selftest
 
2732
    option. Overriding existing prefixes is not allowed but not fatal (a
 
2733
    warning will be emitted).
 
2734
    """
 
2735
 
 
2736
    def register(self, key, obj, help=None, info=None,
 
2737
                 override_existing=False):
 
2738
        """See Registry.register.
 
2739
 
 
2740
        Trying to override an existing alias causes a warning to be emitted,
 
2741
        not a fatal execption.
 
2742
        """
 
2743
        try:
 
2744
            super(TestPrefixAliasRegistry, self).register(
 
2745
                key, obj, help=help, info=info, override_existing=False)
 
2746
        except KeyError:
 
2747
            actual = self.get(key)
 
2748
            note('Test prefix alias %s is already used for %s, ignoring %s'
 
2749
                 % (key, actual, obj))
 
2750
 
 
2751
    def resolve_alias(self, id_start):
 
2752
        """Replace the alias by the prefix in the given string.
 
2753
 
 
2754
        Using an unknown prefix is an error to help catching typos.
 
2755
        """
 
2756
        parts = id_start.split('.')
 
2757
        try:
 
2758
            parts[0] = self.get(parts[0])
 
2759
        except KeyError:
 
2760
            raise errors.BzrCommandError(
 
2761
                '%s is not a known test prefix alias' % parts[0])
 
2762
        return '.'.join(parts)
 
2763
 
 
2764
 
 
2765
test_prefix_alias_registry = TestPrefixAliasRegistry()
 
2766
"""Registry of test prefix aliases."""
 
2767
 
 
2768
 
 
2769
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
 
2770
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
 
2771
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
 
2772
 
 
2773
# Obvious higest levels prefixes, feel free to add your own via a plugin
 
2774
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
 
2775
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
 
2776
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
 
2777
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
 
2778
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
 
2779
 
 
2780
 
 
2781
def test_suite(keep_only=None, starting_with=None):
 
2782
    """Build and return TestSuite for the whole of bzrlib.
 
2783
 
 
2784
    :param keep_only: A list of test ids limiting the suite returned.
 
2785
 
 
2786
    :param starting_with: An id limiting the suite returned to the tests
 
2787
         starting with it.
 
2788
 
 
2789
    This function can be replaced if you need to change the default test
 
2790
    suite on a global basis, but it is not encouraged.
 
2791
    """
 
2792
    testmod_names = [
 
2793
                   'bzrlib.doc',
 
2794
                   'bzrlib.tests.blackbox',
 
2795
                   'bzrlib.tests.branch_implementations',
 
2796
                   'bzrlib.tests.bzrdir_implementations',
 
2797
                   'bzrlib.tests.commands',
 
2798
                   'bzrlib.tests.interrepository_implementations',
 
2799
                   'bzrlib.tests.intertree_implementations',
 
2800
                   'bzrlib.tests.inventory_implementations',
 
2801
                   'bzrlib.tests.per_interbranch',
 
2802
                   'bzrlib.tests.per_lock',
 
2803
                   'bzrlib.tests.per_repository',
 
2804
                   'bzrlib.tests.per_repository_reference',
 
2805
                   'bzrlib.tests.test__dirstate_helpers',
 
2806
                   'bzrlib.tests.test__walkdirs_win32',
 
2807
                   'bzrlib.tests.test_ancestry',
 
2808
                   'bzrlib.tests.test_annotate',
 
2809
                   'bzrlib.tests.test_api',
 
2810
                   'bzrlib.tests.test_atomicfile',
 
2811
                   'bzrlib.tests.test_bad_files',
 
2812
                   'bzrlib.tests.test_bisect_multi',
 
2813
                   'bzrlib.tests.test_branch',
 
2814
                   'bzrlib.tests.test_branchbuilder',
 
2815
                   'bzrlib.tests.test_btree_index',
 
2816
                   'bzrlib.tests.test_bugtracker',
 
2817
                   'bzrlib.tests.test_bundle',
 
2818
                   'bzrlib.tests.test_bzrdir',
 
2819
                   'bzrlib.tests.test_cache_utf8',
 
2820
                   'bzrlib.tests.test_chunk_writer',
 
2821
                   'bzrlib.tests.test__chunks_to_lines',
 
2822
                   'bzrlib.tests.test_commands',
 
2823
                   'bzrlib.tests.test_commit',
 
2824
                   'bzrlib.tests.test_commit_merge',
 
2825
                   'bzrlib.tests.test_config',
 
2826
                   'bzrlib.tests.test_conflicts',
 
2827
                   'bzrlib.tests.test_counted_lock',
 
2828
                   'bzrlib.tests.test_decorators',
 
2829
                   'bzrlib.tests.test_delta',
 
2830
                   'bzrlib.tests.test_deprecated_graph',
 
2831
                   'bzrlib.tests.test_diff',
 
2832
                   'bzrlib.tests.test_directory_service',
 
2833
                   'bzrlib.tests.test_dirstate',
 
2834
                   'bzrlib.tests.test_email_message',
 
2835
                   'bzrlib.tests.test_errors',
 
2836
                   'bzrlib.tests.test_export',
 
2837
                   'bzrlib.tests.test_extract',
 
2838
                   'bzrlib.tests.test_fetch',
 
2839
                   'bzrlib.tests.test_fifo_cache',
 
2840
                   'bzrlib.tests.test_ftp_transport',
 
2841
                   'bzrlib.tests.test_foreign',
 
2842
                   'bzrlib.tests.test_generate_docs',
 
2843
                   'bzrlib.tests.test_generate_ids',
 
2844
                   'bzrlib.tests.test_globbing',
 
2845
                   'bzrlib.tests.test_gpg',
 
2846
                   'bzrlib.tests.test_graph',
 
2847
                   'bzrlib.tests.test_hashcache',
 
2848
                   'bzrlib.tests.test_help',
 
2849
                   'bzrlib.tests.test_hooks',
 
2850
                   'bzrlib.tests.test_http',
 
2851
                   'bzrlib.tests.test_http_implementations',
 
2852
                   'bzrlib.tests.test_http_response',
 
2853
                   'bzrlib.tests.test_https_ca_bundle',
 
2854
                   'bzrlib.tests.test_identitymap',
 
2855
                   'bzrlib.tests.test_ignores',
 
2856
                   'bzrlib.tests.test_index',
 
2857
                   'bzrlib.tests.test_info',
 
2858
                   'bzrlib.tests.test_inv',
 
2859
                   'bzrlib.tests.test_knit',
 
2860
                   'bzrlib.tests.test_lazy_import',
 
2861
                   'bzrlib.tests.test_lazy_regex',
 
2862
                   'bzrlib.tests.test_lockable_files',
 
2863
                   'bzrlib.tests.test_lockdir',
 
2864
                   'bzrlib.tests.test_log',
 
2865
                   'bzrlib.tests.test_lru_cache',
 
2866
                   'bzrlib.tests.test_lsprof',
 
2867
                   'bzrlib.tests.test_mail_client',
 
2868
                   'bzrlib.tests.test_memorytree',
 
2869
                   'bzrlib.tests.test_merge',
 
2870
                   'bzrlib.tests.test_merge3',
 
2871
                   'bzrlib.tests.test_merge_core',
 
2872
                   'bzrlib.tests.test_merge_directive',
 
2873
                   'bzrlib.tests.test_missing',
 
2874
                   'bzrlib.tests.test_msgeditor',
 
2875
                   'bzrlib.tests.test_multiparent',
 
2876
                   'bzrlib.tests.test_mutabletree',
 
2877
                   'bzrlib.tests.test_nonascii',
 
2878
                   'bzrlib.tests.test_options',
 
2879
                   'bzrlib.tests.test_osutils',
 
2880
                   'bzrlib.tests.test_osutils_encodings',
 
2881
                   'bzrlib.tests.test_pack',
 
2882
                   'bzrlib.tests.test_pack_repository',
 
2883
                   'bzrlib.tests.test_patch',
 
2884
                   'bzrlib.tests.test_patches',
 
2885
                   'bzrlib.tests.test_permissions',
 
2886
                   'bzrlib.tests.test_plugins',
 
2887
                   'bzrlib.tests.test_progress',
 
2888
                   'bzrlib.tests.test_read_bundle',
 
2889
                   'bzrlib.tests.test_reconcile',
 
2890
                   'bzrlib.tests.test_reconfigure',
 
2891
                   'bzrlib.tests.test_registry',
 
2892
                   'bzrlib.tests.test_remote',
 
2893
                   'bzrlib.tests.test_repository',
 
2894
                   'bzrlib.tests.test_revert',
 
2895
                   'bzrlib.tests.test_revision',
 
2896
                   'bzrlib.tests.test_revisionspec',
 
2897
                   'bzrlib.tests.test_revisiontree',
 
2898
                   'bzrlib.tests.test_rio',
 
2899
                   'bzrlib.tests.test_rules',
 
2900
                   'bzrlib.tests.test_sampler',
 
2901
                   'bzrlib.tests.test_selftest',
 
2902
                   'bzrlib.tests.test_setup',
 
2903
                   'bzrlib.tests.test_sftp_transport',
 
2904
                   'bzrlib.tests.test_shelf',
 
2905
                   'bzrlib.tests.test_shelf_ui',
 
2906
                   'bzrlib.tests.test_smart',
 
2907
                   'bzrlib.tests.test_smart_add',
 
2908
                   'bzrlib.tests.test_smart_request',
 
2909
                   'bzrlib.tests.test_smart_transport',
 
2910
                   'bzrlib.tests.test_smtp_connection',
 
2911
                   'bzrlib.tests.test_source',
 
2912
                   'bzrlib.tests.test_ssh_transport',
 
2913
                   'bzrlib.tests.test_status',
 
2914
                   'bzrlib.tests.test_store',
 
2915
                   'bzrlib.tests.test_strace',
 
2916
                   'bzrlib.tests.test_subsume',
 
2917
                   'bzrlib.tests.test_switch',
 
2918
                   'bzrlib.tests.test_symbol_versioning',
 
2919
                   'bzrlib.tests.test_tag',
 
2920
                   'bzrlib.tests.test_testament',
 
2921
                   'bzrlib.tests.test_textfile',
 
2922
                   'bzrlib.tests.test_textmerge',
 
2923
                   'bzrlib.tests.test_timestamp',
 
2924
                   'bzrlib.tests.test_trace',
 
2925
                   'bzrlib.tests.test_transactions',
 
2926
                   'bzrlib.tests.test_transform',
 
2927
                   'bzrlib.tests.test_transport',
 
2928
                   'bzrlib.tests.test_transport_implementations',
 
2929
                   'bzrlib.tests.test_transport_log',
 
2930
                   'bzrlib.tests.test_tree',
 
2931
                   'bzrlib.tests.test_treebuilder',
 
2932
                   'bzrlib.tests.test_tsort',
 
2933
                   'bzrlib.tests.test_tuned_gzip',
 
2934
                   'bzrlib.tests.test_ui',
 
2935
                   'bzrlib.tests.test_uncommit',
 
2936
                   'bzrlib.tests.test_upgrade',
 
2937
                   'bzrlib.tests.test_upgrade_stacked',
 
2938
                   'bzrlib.tests.test_urlutils',
 
2939
                   'bzrlib.tests.test_version',
 
2940
                   'bzrlib.tests.test_version_info',
 
2941
                   'bzrlib.tests.test_versionedfile',
 
2942
                   'bzrlib.tests.test_weave',
 
2943
                   'bzrlib.tests.test_whitebox',
 
2944
                   'bzrlib.tests.test_win32utils',
 
2945
                   'bzrlib.tests.test_workingtree',
 
2946
                   'bzrlib.tests.test_workingtree_4',
 
2947
                   'bzrlib.tests.test_wsgi',
 
2948
                   'bzrlib.tests.test_xml',
 
2949
                   'bzrlib.tests.tree_implementations',
 
2950
                   'bzrlib.tests.workingtree_implementations',
 
2951
                   'bzrlib.util.tests.test_bencode',
 
2952
                   ]
 
2953
 
 
2954
    loader = TestUtil.TestLoader()
 
2955
 
 
2956
    if starting_with:
 
2957
        starting_with = [test_prefix_alias_registry.resolve_alias(start)
 
2958
                         for start in starting_with]
 
2959
        # We take precedence over keep_only because *at loading time* using
 
2960
        # both options means we will load less tests for the same final result.
 
2961
        def interesting_module(name):
 
2962
            for start in starting_with:
 
2963
                if (
 
2964
                    # Either the module name starts with the specified string
 
2965
                    name.startswith(start)
 
2966
                    # or it may contain tests starting with the specified string
 
2967
                    or start.startswith(name)
 
2968
                    ):
 
2969
                    return True
 
2970
            return False
 
2971
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
 
2972
 
 
2973
    elif keep_only is not None:
 
2974
        id_filter = TestIdList(keep_only)
 
2975
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
2976
        def interesting_module(name):
 
2977
            return id_filter.refers_to(name)
 
2978
 
 
2979
    else:
 
2980
        loader = TestUtil.TestLoader()
 
2981
        def interesting_module(name):
 
2982
            # No filtering, all modules are interesting
 
2983
            return True
 
2984
 
 
2985
    suite = loader.suiteClass()
 
2986
 
 
2987
    # modules building their suite with loadTestsFromModuleNames
 
2988
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
2989
 
 
2990
    modules_to_doctest = [
 
2991
        'bzrlib',
 
2992
        'bzrlib.branchbuilder',
 
2993
        'bzrlib.export',
 
2994
        'bzrlib.inventory',
 
2995
        'bzrlib.iterablefile',
 
2996
        'bzrlib.lockdir',
 
2997
        'bzrlib.merge3',
 
2998
        'bzrlib.option',
 
2999
        'bzrlib.symbol_versioning',
 
3000
        'bzrlib.tests',
 
3001
        'bzrlib.timestamp',
 
3002
        'bzrlib.version_info_formats.format_custom',
 
3003
        ]
 
3004
 
 
3005
    for mod in modules_to_doctest:
 
3006
        if not interesting_module(mod):
 
3007
            # No tests to keep here, move along
 
3008
            continue
 
3009
        try:
 
3010
            # note that this really does mean "report only" -- doctest
 
3011
            # still runs the rest of the examples
 
3012
            doc_suite = doctest.DocTestSuite(mod,
 
3013
                optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
 
3014
        except ValueError, e:
 
3015
            print '**failed to get doctest for: %s\n%s' % (mod, e)
 
3016
            raise
 
3017
        if len(doc_suite._tests) == 0:
 
3018
            raise errors.BzrError("no doctests found in %s" % (mod,))
 
3019
        suite.addTest(doc_suite)
 
3020
 
 
3021
    default_encoding = sys.getdefaultencoding()
 
3022
    for name, plugin in bzrlib.plugin.plugins().items():
 
3023
        if not interesting_module(plugin.module.__name__):
 
3024
            continue
 
3025
        plugin_suite = plugin.test_suite()
 
3026
        # We used to catch ImportError here and turn it into just a warning,
 
3027
        # but really if you don't have --no-plugins this should be a failure.
 
3028
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
3029
        if plugin_suite is None:
 
3030
            plugin_suite = plugin.load_plugin_tests(loader)
 
3031
        if plugin_suite is not None:
 
3032
            suite.addTest(plugin_suite)
 
3033
        if default_encoding != sys.getdefaultencoding():
 
3034
            bzrlib.trace.warning(
 
3035
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
3036
                sys.getdefaultencoding())
 
3037
            reload(sys)
 
3038
            sys.setdefaultencoding(default_encoding)
 
3039
 
 
3040
    if starting_with:
 
3041
        suite = filter_suite_by_id_startswith(suite, starting_with)
 
3042
 
 
3043
    if keep_only is not None:
 
3044
        # Now that the referred modules have loaded their tests, keep only the
 
3045
        # requested ones.
 
3046
        suite = filter_suite_by_id_list(suite, id_filter)
 
3047
        # Do some sanity checks on the id_list filtering
 
3048
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
3049
        if starting_with:
 
3050
            # The tester has used both keep_only and starting_with, so he is
 
3051
            # already aware that some tests are excluded from the list, there
 
3052
            # is no need to tell him which.
 
3053
            pass
 
3054
        else:
 
3055
            # Some tests mentioned in the list are not in the test suite. The
 
3056
            # list may be out of date, report to the tester.
 
3057
            for id in not_found:
 
3058
                bzrlib.trace.warning('"%s" not found in the test suite', id)
 
3059
        for id in duplicates:
 
3060
            bzrlib.trace.warning('"%s" is used as an id by several tests', id)
 
3061
 
 
3062
    return suite
 
3063
 
 
3064
 
 
3065
def multiply_tests_from_modules(module_name_list, scenario_iter, loader=None):
 
3066
    """Adapt all tests in some given modules to given scenarios.
 
3067
 
 
3068
    This is the recommended public interface for test parameterization.
 
3069
    Typically the test_suite() method for a per-implementation test
 
3070
    suite will call multiply_tests_from_modules and return the
 
3071
    result.
 
3072
 
 
3073
    :param module_name_list: List of fully-qualified names of test
 
3074
        modules.
 
3075
    :param scenario_iter: Iterable of pairs of (scenario_name,
 
3076
        scenario_param_dict).
 
3077
    :param loader: If provided, will be used instead of a new
 
3078
        bzrlib.tests.TestLoader() instance.
 
3079
 
 
3080
    This returns a new TestSuite containing the cross product of
 
3081
    all the tests in all the modules, each repeated for each scenario.
 
3082
    Each test is adapted by adding the scenario name at the end
 
3083
    of its name, and updating the test object's __dict__ with the
 
3084
    scenario_param_dict.
 
3085
 
 
3086
    >>> r = multiply_tests_from_modules(
 
3087
    ...     ['bzrlib.tests.test_sampler'],
 
3088
    ...     [('one', dict(param=1)),
 
3089
    ...      ('two', dict(param=2))])
 
3090
    >>> tests = list(iter_suite_tests(r))
 
3091
    >>> len(tests)
 
3092
    2
 
3093
    >>> tests[0].id()
 
3094
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
 
3095
    >>> tests[0].param
 
3096
    1
 
3097
    >>> tests[1].param
 
3098
    2
 
3099
    """
 
3100
    # XXX: Isn't load_tests() a better way to provide the same functionality
 
3101
    # without forcing a predefined TestScenarioApplier ? --vila 080215
 
3102
    if loader is None:
 
3103
        loader = TestUtil.TestLoader()
 
3104
 
 
3105
    suite = loader.suiteClass()
 
3106
 
 
3107
    adapter = TestScenarioApplier()
 
3108
    adapter.scenarios = list(scenario_iter)
 
3109
    adapt_modules(module_name_list, adapter, loader, suite)
 
3110
    return suite
 
3111
 
 
3112
 
 
3113
def multiply_scenarios(scenarios_left, scenarios_right):
 
3114
    """Multiply two sets of scenarios.
 
3115
 
 
3116
    :returns: the cartesian product of the two sets of scenarios, that is
 
3117
        a scenario for every possible combination of a left scenario and a
 
3118
        right scenario.
 
3119
    """
 
3120
    return [
 
3121
        ('%s,%s' % (left_name, right_name),
 
3122
         dict(left_dict.items() + right_dict.items()))
 
3123
        for left_name, left_dict in scenarios_left
 
3124
        for right_name, right_dict in scenarios_right]
 
3125
 
 
3126
 
 
3127
 
 
3128
def adapt_modules(mods_list, adapter, loader, suite):
 
3129
    """Adapt the modules in mods_list using adapter and add to suite."""
 
3130
    tests = loader.loadTestsFromModuleNames(mods_list)
 
3131
    adapt_tests(tests, adapter, suite)
 
3132
 
 
3133
 
 
3134
def adapt_tests(tests_list, adapter, suite):
 
3135
    """Adapt the tests in tests_list using adapter and add to suite."""
 
3136
    for test in iter_suite_tests(tests_list):
 
3137
        suite.addTests(adapter.adapt(test))
 
3138
 
 
3139
 
 
3140
def _rmtree_temp_dir(dirname):
 
3141
    # If LANG=C we probably have created some bogus paths
 
3142
    # which rmtree(unicode) will fail to delete
 
3143
    # so make sure we are using rmtree(str) to delete everything
 
3144
    # except on win32, where rmtree(str) will fail
 
3145
    # since it doesn't have the property of byte-stream paths
 
3146
    # (they are either ascii or mbcs)
 
3147
    if sys.platform == 'win32':
 
3148
        # make sure we are using the unicode win32 api
 
3149
        dirname = unicode(dirname)
 
3150
    else:
 
3151
        dirname = dirname.encode(sys.getfilesystemencoding())
 
3152
    try:
 
3153
        osutils.rmtree(dirname)
 
3154
    except OSError, e:
 
3155
        if sys.platform == 'win32' and e.errno == errno.EACCES:
 
3156
            sys.stderr.write(('Permission denied: '
 
3157
                                 'unable to remove testing dir '
 
3158
                                 '%s\n' % os.path.basename(dirname)))
 
3159
        else:
 
3160
            raise
 
3161
 
 
3162
 
 
3163
class Feature(object):
 
3164
    """An operating system Feature."""
 
3165
 
 
3166
    def __init__(self):
 
3167
        self._available = None
 
3168
 
 
3169
    def available(self):
 
3170
        """Is the feature available?
 
3171
 
 
3172
        :return: True if the feature is available.
 
3173
        """
 
3174
        if self._available is None:
 
3175
            self._available = self._probe()
 
3176
        return self._available
 
3177
 
 
3178
    def _probe(self):
 
3179
        """Implement this method in concrete features.
 
3180
 
 
3181
        :return: True if the feature is available.
 
3182
        """
 
3183
        raise NotImplementedError
 
3184
 
 
3185
    def __str__(self):
 
3186
        if getattr(self, 'feature_name', None):
 
3187
            return self.feature_name()
 
3188
        return self.__class__.__name__
 
3189
 
 
3190
 
 
3191
class _SymlinkFeature(Feature):
 
3192
 
 
3193
    def _probe(self):
 
3194
        return osutils.has_symlinks()
 
3195
 
 
3196
    def feature_name(self):
 
3197
        return 'symlinks'
 
3198
 
 
3199
SymlinkFeature = _SymlinkFeature()
 
3200
 
 
3201
 
 
3202
class _HardlinkFeature(Feature):
 
3203
 
 
3204
    def _probe(self):
 
3205
        return osutils.has_hardlinks()
 
3206
 
 
3207
    def feature_name(self):
 
3208
        return 'hardlinks'
 
3209
 
 
3210
HardlinkFeature = _HardlinkFeature()
 
3211
 
 
3212
 
 
3213
class _OsFifoFeature(Feature):
 
3214
 
 
3215
    def _probe(self):
 
3216
        return getattr(os, 'mkfifo', None)
 
3217
 
 
3218
    def feature_name(self):
 
3219
        return 'filesystem fifos'
 
3220
 
 
3221
OsFifoFeature = _OsFifoFeature()
 
3222
 
 
3223
 
 
3224
class _UnicodeFilenameFeature(Feature):
 
3225
    """Does the filesystem support Unicode filenames?"""
 
3226
 
 
3227
    def _probe(self):
 
3228
        try:
 
3229
            # Check for character combinations unlikely to be covered by any
 
3230
            # single non-unicode encoding. We use the characters
 
3231
            # - greek small letter alpha (U+03B1) and
 
3232
            # - braille pattern dots-123456 (U+283F).
 
3233
            os.stat(u'\u03b1\u283f')
 
3234
        except UnicodeEncodeError:
 
3235
            return False
 
3236
        except (IOError, OSError):
 
3237
            # The filesystem allows the Unicode filename but the file doesn't
 
3238
            # exist.
 
3239
            return True
 
3240
        else:
 
3241
            # The filesystem allows the Unicode filename and the file exists,
 
3242
            # for some reason.
 
3243
            return True
 
3244
 
 
3245
UnicodeFilenameFeature = _UnicodeFilenameFeature()
 
3246
 
 
3247
 
 
3248
class TestScenarioApplier(object):
 
3249
    """A tool to apply scenarios to tests."""
 
3250
 
 
3251
    def adapt(self, test):
 
3252
        """Return a TestSuite containing a copy of test for each scenario."""
 
3253
        result = unittest.TestSuite()
 
3254
        for scenario in self.scenarios:
 
3255
            result.addTest(self.adapt_test_to_scenario(test, scenario))
 
3256
        return result
 
3257
 
 
3258
    def adapt_test_to_scenario(self, test, scenario):
 
3259
        """Copy test and apply scenario to it.
 
3260
 
 
3261
        :param test: A test to adapt.
 
3262
        :param scenario: A tuple describing the scenarion.
 
3263
            The first element of the tuple is the new test id.
 
3264
            The second element is a dict containing attributes to set on the
 
3265
            test.
 
3266
        :return: The adapted test.
 
3267
        """
 
3268
        from copy import deepcopy
 
3269
        new_test = deepcopy(test)
 
3270
        for name, value in scenario[1].items():
 
3271
            setattr(new_test, name, value)
 
3272
        new_id = "%s(%s)" % (new_test.id(), scenario[0])
 
3273
        new_test.id = lambda: new_id
 
3274
        return new_test
 
3275
 
 
3276
 
 
3277
def probe_unicode_in_user_encoding():
 
3278
    """Try to encode several unicode strings to use in unicode-aware tests.
 
3279
    Return first successfull match.
 
3280
 
 
3281
    :return:  (unicode value, encoded plain string value) or (None, None)
 
3282
    """
 
3283
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
3284
    for uni_val in possible_vals:
 
3285
        try:
 
3286
            str_val = uni_val.encode(osutils.get_user_encoding())
 
3287
        except UnicodeEncodeError:
 
3288
            # Try a different character
 
3289
            pass
 
3290
        else:
 
3291
            return uni_val, str_val
 
3292
    return None, None
 
3293
 
 
3294
 
 
3295
def probe_bad_non_ascii(encoding):
 
3296
    """Try to find [bad] character with code [128..255]
 
3297
    that cannot be decoded to unicode in some encoding.
 
3298
    Return None if all non-ascii characters is valid
 
3299
    for given encoding.
 
3300
    """
 
3301
    for i in xrange(128, 256):
 
3302
        char = chr(i)
 
3303
        try:
 
3304
            char.decode(encoding)
 
3305
        except UnicodeDecodeError:
 
3306
            return char
 
3307
    return None
 
3308
 
 
3309
 
 
3310
class _FTPServerFeature(Feature):
 
3311
    """Some tests want an FTP Server, check if one is available.
 
3312
 
 
3313
    Right now, the only way this is available is if 'medusa' is installed.
 
3314
    http://www.amk.ca/python/code/medusa.html
 
3315
    """
 
3316
 
 
3317
    def _probe(self):
 
3318
        try:
 
3319
            import bzrlib.tests.ftp_server
 
3320
            return True
 
3321
        except ImportError:
 
3322
            return False
 
3323
 
 
3324
    def feature_name(self):
 
3325
        return 'FTPServer'
 
3326
 
 
3327
 
 
3328
FTPServerFeature = _FTPServerFeature()
 
3329
 
 
3330
 
 
3331
class _HTTPSServerFeature(Feature):
 
3332
    """Some tests want an https Server, check if one is available.
 
3333
 
 
3334
    Right now, the only way this is available is under python2.6 which provides
 
3335
    an ssl module.
 
3336
    """
 
3337
 
 
3338
    def _probe(self):
 
3339
        try:
 
3340
            import ssl
 
3341
            return True
 
3342
        except ImportError:
 
3343
            return False
 
3344
 
 
3345
    def feature_name(self):
 
3346
        return 'HTTPSServer'
 
3347
 
 
3348
 
 
3349
HTTPSServerFeature = _HTTPSServerFeature()
 
3350
 
 
3351
 
 
3352
class _UnicodeFilename(Feature):
 
3353
    """Does the filesystem support Unicode filenames?"""
 
3354
 
 
3355
    def _probe(self):
 
3356
        try:
 
3357
            os.stat(u'\u03b1')
 
3358
        except UnicodeEncodeError:
 
3359
            return False
 
3360
        except (IOError, OSError):
 
3361
            # The filesystem allows the Unicode filename but the file doesn't
 
3362
            # exist.
 
3363
            return True
 
3364
        else:
 
3365
            # The filesystem allows the Unicode filename and the file exists,
 
3366
            # for some reason.
 
3367
            return True
 
3368
 
 
3369
UnicodeFilename = _UnicodeFilename()
 
3370
 
 
3371
 
 
3372
class _UTF8Filesystem(Feature):
 
3373
    """Is the filesystem UTF-8?"""
 
3374
 
 
3375
    def _probe(self):
 
3376
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
 
3377
            return True
 
3378
        return False
 
3379
 
 
3380
UTF8Filesystem = _UTF8Filesystem()
 
3381
 
 
3382
 
 
3383
class _CaseInsCasePresFilenameFeature(Feature):
 
3384
    """Is the file-system case insensitive, but case-preserving?"""
 
3385
 
 
3386
    def _probe(self):
 
3387
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
 
3388
        try:
 
3389
            # first check truly case-preserving for created files, then check
 
3390
            # case insensitive when opening existing files.
 
3391
            name = osutils.normpath(name)
 
3392
            base, rel = osutils.split(name)
 
3393
            found_rel = osutils.canonical_relpath(base, name)
 
3394
            return (found_rel == rel
 
3395
                    and os.path.isfile(name.upper())
 
3396
                    and os.path.isfile(name.lower()))
 
3397
        finally:
 
3398
            os.close(fileno)
 
3399
            os.remove(name)
 
3400
 
 
3401
    def feature_name(self):
 
3402
        return "case-insensitive case-preserving filesystem"
 
3403
 
 
3404
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
 
3405
 
 
3406
 
 
3407
class _CaseInsensitiveFilesystemFeature(Feature):
 
3408
    """Check if underlying filesystem is case-insensitive but *not* case
 
3409
    preserving.
 
3410
    """
 
3411
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
 
3412
    # more likely to be case preserving, so this case is rare.
 
3413
 
 
3414
    def _probe(self):
 
3415
        if CaseInsCasePresFilenameFeature.available():
 
3416
            return False
 
3417
 
 
3418
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
3419
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
3420
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
3421
        else:
 
3422
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
3423
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
3424
            dir=root)
 
3425
        name_a = osutils.pathjoin(tdir, 'a')
 
3426
        name_A = osutils.pathjoin(tdir, 'A')
 
3427
        os.mkdir(name_a)
 
3428
        result = osutils.isdir(name_A)
 
3429
        _rmtree_temp_dir(tdir)
 
3430
        return result
 
3431
 
 
3432
    def feature_name(self):
 
3433
        return 'case-insensitive filesystem'
 
3434
 
 
3435
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()