/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Martin Pool
  • Date: 2010-08-13 07:56:06 UTC
  • mfrom: (5050.17.4 2.2)
  • mto: (5050.17.6 2.2)
  • mto: This revision was merged to the branch mainline in revision 5379.
  • Revision ID: mbp@sourcefrog.net-20100813075606-8zgmov3ezwans2zo
merge bzr 2.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
29
29
 
30
30
import atexit
31
31
import codecs
32
 
from copy import copy
 
32
import copy
33
33
from cStringIO import StringIO
34
34
import difflib
35
35
import doctest
37
37
import logging
38
38
import math
39
39
import os
40
 
from pprint import pformat
 
40
import pprint
41
41
import random
42
42
import re
43
43
import shlex
44
44
import stat
45
 
from subprocess import Popen, PIPE, STDOUT
 
45
import subprocess
46
46
import sys
47
47
import tempfile
48
48
import threading
74
74
    ui,
75
75
    urlutils,
76
76
    registry,
 
77
    transport as _mod_transport,
77
78
    workingtree,
78
79
    )
79
80
import bzrlib.branch
103
104
    )
104
105
import bzrlib.trace
105
106
from bzrlib.transport import (
106
 
    get_transport,
107
107
    memory,
108
108
    pathfilter,
109
109
    )
110
 
import bzrlib.transport
111
110
from bzrlib.trace import mutter, note
112
111
from bzrlib.tests import (
113
112
    test_server,
275
274
 
276
275
    def _shortened_test_description(self, test):
277
276
        what = test.id()
278
 
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
277
        what = re.sub(r'^bzrlib\.tests\.', '', what)
279
278
        return what
280
279
 
281
280
    def startTest(self, test):
943
942
 
944
943
    def permit_dir(self, name):
945
944
        """Permit a directory to be used by this test. See permit_url."""
946
 
        name_transport = get_transport(name)
 
945
        name_transport = _mod_transport.get_transport(name)
947
946
        self.permit_url(name)
948
947
        self.permit_url(name_transport.base)
949
948
 
1028
1027
        self.addCleanup(transport_server.stop_server)
1029
1028
        # Obtain a real transport because if the server supplies a password, it
1030
1029
        # will be hidden from the base on the client side.
1031
 
        t = get_transport(transport_server.get_url())
 
1030
        t = _mod_transport.get_transport(transport_server.get_url())
1032
1031
        # Some transport servers effectively chroot the backing transport;
1033
1032
        # others like SFTPServer don't - users of the transport can walk up the
1034
1033
        # transport to read the entire backing transport. This wouldn't matter
1095
1094
            message += '\n'
1096
1095
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
1097
1096
            % (message,
1098
 
               pformat(a), pformat(b)))
 
1097
               pprint.pformat(a), pprint.pformat(b)))
1099
1098
 
1100
1099
    assertEquals = assertEqual
1101
1100
 
1986
1985
            if not allow_plugins:
1987
1986
                command.append('--no-plugins')
1988
1987
            command.extend(process_args)
1989
 
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1988
            process = self._popen(command, stdin=subprocess.PIPE,
 
1989
                                  stdout=subprocess.PIPE,
 
1990
                                  stderr=subprocess.PIPE)
1990
1991
        finally:
1991
1992
            restore_environment()
1992
1993
            if cwd is not None:
2000
2001
        Allows tests to override this method to intercept the calls made to
2001
2002
        Popen for introspection.
2002
2003
        """
2003
 
        return Popen(*args, **kwargs)
 
2004
        return subprocess.Popen(*args, **kwargs)
2004
2005
 
2005
2006
    def get_source_path(self):
2006
2007
        """Return the path of the directory containing bzrlib."""
2186
2187
 
2187
2188
        :param relpath: a path relative to the base url.
2188
2189
        """
2189
 
        t = get_transport(self.get_url(relpath))
 
2190
        t = _mod_transport.get_transport(self.get_url(relpath))
2190
2191
        self.assertFalse(t.is_readonly())
2191
2192
        return t
2192
2193
 
2198
2199
 
2199
2200
        :param relpath: a path relative to the base url.
2200
2201
        """
2201
 
        t = get_transport(self.get_readonly_url(relpath))
 
2202
        t = _mod_transport.get_transport(self.get_readonly_url(relpath))
2202
2203
        self.assertTrue(t.is_readonly())
2203
2204
        return t
2204
2205
 
2334
2335
        propagating. This method ensures that a test did not leak.
2335
2336
        """
2336
2337
        root = TestCaseWithMemoryTransport.TEST_ROOT
2337
 
        self.permit_url(get_transport(root).base)
 
2338
        self.permit_url(_mod_transport.get_transport(root).base)
2338
2339
        wt = workingtree.WorkingTree.open(root)
2339
2340
        last_rev = wt.last_revision()
2340
2341
        if last_rev != 'null:':
2385
2386
            # might be a relative or absolute path
2386
2387
            maybe_a_url = self.get_url(relpath)
2387
2388
            segments = maybe_a_url.rsplit('/', 1)
2388
 
            t = get_transport(maybe_a_url)
 
2389
            t = _mod_transport.get_transport(maybe_a_url)
2389
2390
            if len(segments) > 1 and segments[-1] not in ('', '.'):
2390
2391
                t.ensure_base()
2391
2392
            if format is None:
2408
2409
        made_control = self.make_bzrdir(relpath, format=format)
2409
2410
        return made_control.create_repository(shared=shared)
2410
2411
 
2411
 
    def make_smart_server(self, path):
 
2412
    def make_smart_server(self, path, backing_server=None):
 
2413
        if backing_server is None:
 
2414
            backing_server = self.get_server()
2412
2415
        smart_server = test_server.SmartTCPServer_for_testing()
2413
 
        self.start_server(smart_server, self.get_server())
2414
 
        remote_transport = get_transport(smart_server.get_url()).clone(path)
 
2416
        self.start_server(smart_server, backing_server)
 
2417
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
 
2418
                                                   ).clone(path)
2415
2419
        return remote_transport
2416
2420
 
2417
2421
    def make_branch_and_memory_tree(self, relpath, format=None):
2566
2570
                "a list or a tuple. Got %r instead" % (shape,))
2567
2571
        # It's OK to just create them using forward slashes on windows.
2568
2572
        if transport is None or transport.is_readonly():
2569
 
            transport = get_transport(".")
 
2573
            transport = _mod_transport.get_transport(".")
2570
2574
        for name in shape:
2571
2575
            self.assertIsInstance(name, basestring)
2572
2576
            if name[-1] == '/':
2740
2744
    :param pattern: A regular expression string.
2741
2745
    :return: A callable that returns True if the re matches.
2742
2746
    """
2743
 
    filter_re = osutils.re_compile_checked(pattern, 0,
2744
 
        'test filter')
 
2747
    filter_re = re.compile(pattern, 0)
2745
2748
    def condition(test):
2746
2749
        test_id = test.id()
2747
2750
        return filter_re.search(test_id)
3313
3316
                '--subunit']
3314
3317
            if '--no-plugins' in sys.argv:
3315
3318
                argv.append('--no-plugins')
3316
 
            # stderr=STDOUT would be ideal, but until we prevent noise on
3317
 
            # stderr it can interrupt the subunit protocol.
3318
 
            process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
3319
 
                bufsize=1)
 
3319
            # stderr=subprocess.STDOUT would be ideal, but until we prevent
 
3320
            # noise on stderr it can interrupt the subunit protocol.
 
3321
            process = subprocess.Popen(argv, stdin=subprocess.PIPE,
 
3322
                                      stdout=subprocess.PIPE,
 
3323
                                      stderr=subprocess.PIPE,
 
3324
                                      bufsize=1)
3320
3325
            test = TestInSubprocess(process, test_list_file_name)
3321
3326
            result.append(test)
3322
3327
        except:
3371
3376
 
3372
3377
    def startTest(self, test):
3373
3378
        self.profiler = bzrlib.lsprof.BzrProfiler()
 
3379
        # Prevent deadlocks in tests that use lsprof: those tests will
 
3380
        # unavoidably fail.
 
3381
        bzrlib.lsprof.BzrProfiler.profiler_block = 0
3374
3382
        self.profiler.start()
3375
3383
        ForwardingResult.startTest(self, test)
3376
3384
 
3704
3712
        'bzrlib.tests.test_export',
3705
3713
        'bzrlib.tests.test_extract',
3706
3714
        'bzrlib.tests.test_fetch',
 
3715
        'bzrlib.tests.test_fixtures',
3707
3716
        'bzrlib.tests.test_fifo_cache',
3708
3717
        'bzrlib.tests.test_filters',
3709
3718
        'bzrlib.tests.test_ftp_transport',
3730
3739
        'bzrlib.tests.test_knit',
3731
3740
        'bzrlib.tests.test_lazy_import',
3732
3741
        'bzrlib.tests.test_lazy_regex',
 
3742
        'bzrlib.tests.test_library_state',
3733
3743
        'bzrlib.tests.test_lock',
3734
3744
        'bzrlib.tests.test_lockable_files',
3735
3745
        'bzrlib.tests.test_lockdir',
3803
3813
        'bzrlib.tests.test_transport_log',
3804
3814
        'bzrlib.tests.test_tree',
3805
3815
        'bzrlib.tests.test_treebuilder',
 
3816
        'bzrlib.tests.test_treeshape',
3806
3817
        'bzrlib.tests.test_tsort',
3807
3818
        'bzrlib.tests.test_tuned_gzip',
3808
3819
        'bzrlib.tests.test_ui',
3839
3850
        'bzrlib.option',
3840
3851
        'bzrlib.symbol_versioning',
3841
3852
        'bzrlib.tests',
 
3853
        'bzrlib.tests.fixtures',
3842
3854
        'bzrlib.timestamp',
3843
3855
        'bzrlib.version_info_formats.format_custom',
3844
3856
        ]
4038
4050
    :param new_id: The id to assign to it.
4039
4051
    :return: The new test.
4040
4052
    """
4041
 
    new_test = copy(test)
 
4053
    new_test = copy.copy(test)
4042
4054
    new_test.id = lambda: new_id
4043
4055
    return new_test
4044
4056
 
4345
4357
UnicodeFilename = _UnicodeFilename()
4346
4358
 
4347
4359
 
 
4360
class _ByteStringNamedFilesystem(Feature):
 
4361
    """Is the filesystem based on bytes?"""
 
4362
 
 
4363
    def _probe(self):
 
4364
        if os.name == "posix":
 
4365
            return True
 
4366
        return False
 
4367
 
 
4368
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
 
4369
 
 
4370
 
4348
4371
class _UTF8Filesystem(Feature):
4349
4372
    """Is the filesystem UTF-8?"""
4350
4373