/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

Merge bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 by Canonical Ltd
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
49
49
import bzrlib.commands
50
50
import bzrlib.bundle.serializer
51
51
import bzrlib.errors as errors
 
52
import bzrlib.export
52
53
import bzrlib.inventory
53
54
import bzrlib.iterablefile
54
55
import bzrlib.lockdir
70
71
from bzrlib.transport import get_transport
71
72
import bzrlib.transport
72
73
from bzrlib.transport.local import LocalRelpathServer
 
74
from bzrlib.transport.memory import MemoryServer
73
75
from bzrlib.transport.readonly import ReadonlyServer
74
76
from bzrlib.trace import mutter
75
77
from bzrlib.tests import TestUtil
85
87
 
86
88
MODULES_TO_TEST = []
87
89
MODULES_TO_DOCTEST = [
88
 
                      bzrlib.branch,
89
90
                      bzrlib.bundle.serializer,
90
 
                      bzrlib.commands,
91
91
                      bzrlib.errors,
 
92
                      bzrlib.export,
92
93
                      bzrlib.inventory,
93
94
                      bzrlib.iterablefile,
94
95
                      bzrlib.lockdir,
95
96
                      bzrlib.merge3,
96
97
                      bzrlib.option,
97
 
                      bzrlib.osutils,
98
98
                      bzrlib.store,
99
 
                      bzrlib.transport,
100
99
                      ]
101
100
 
102
101
 
240
239
        if isinstance(err[1], TestSkipped):
241
240
            return self.addSkipped(test, err)    
242
241
        unittest.TestResult.addError(self, test, err)
 
242
        # We can only do this if we have one of our TestCases, not if
 
243
        # we have a doctest.
 
244
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
245
        if setKeepLogfile is not None:
 
246
            setKeepLogfile()
243
247
        self.extractBenchmarkTime(test)
244
248
        if self.showAll:
245
249
            self.stream.writeln("ERROR %s" % self._testTimeString())
256
260
 
257
261
    def addFailure(self, test, err):
258
262
        unittest.TestResult.addFailure(self, test, err)
 
263
        # We can only do this if we have one of our TestCases, not if
 
264
        # we have a doctest.
 
265
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
266
        if setKeepLogfile is not None:
 
267
            setKeepLogfile()
259
268
        self.extractBenchmarkTime(test)
260
269
        if self.showAll:
261
270
            self.stream.writeln(" FAIL %s" % self._testTimeString())
382
391
        # This is still a little bogus, 
383
392
        # but only a little. Folk not using our testrunner will
384
393
        # have to delete their temp directories themselves.
385
 
        test_root = TestCaseInTempDir.TEST_ROOT
 
394
        test_root = TestCaseWithMemoryTransport.TEST_ROOT
386
395
        if result.wasSuccessful() or not self.keep_output:
387
396
            if test_root is not None:
388
397
                # If LANG=C we probably have created some bogus paths
406
415
                self.stream.writeln(
407
416
                    "Failed tests working directories are in '%s'\n" %
408
417
                    test_root)
409
 
        TestCaseInTempDir.TEST_ROOT = None
 
418
        TestCaseWithMemoryTransport.TEST_ROOT = None
410
419
        if self.pb is not None:
411
420
            self.pb.clear()
412
421
        return result
482
491
 
483
492
    _log_file_name = None
484
493
    _log_contents = ''
 
494
    _keep_log_file = False
485
495
    # record lsprof data when performing benchmark calls.
486
496
    _gather_lsprof_in_benchmarks = False
487
497
 
662
672
    def _finishLogFile(self):
663
673
        """Finished with the log file.
664
674
 
665
 
        Read contents into memory, close, and delete.
 
675
        Close the file and delete it, unless setKeepLogfile was called.
666
676
        """
667
677
        if self._log_file is None:
668
678
            return
669
679
        bzrlib.trace.disable_test_log(self._log_nonce)
670
 
        self._log_file.seek(0)
671
 
        self._log_contents = self._log_file.read()
672
680
        self._log_file.close()
673
 
        os.remove(self._log_file_name)
674
 
        self._log_file = self._log_file_name = None
 
681
        self._log_file = None
 
682
        if not self._keep_log_file:
 
683
            os.remove(self._log_file_name)
 
684
            self._log_file_name = None
 
685
 
 
686
    def setKeepLogfile(self):
 
687
        """Make the logfile not be deleted when _finishLogFile is called."""
 
688
        self._keep_log_file = True
675
689
 
676
690
    def addCleanup(self, callable):
677
691
        """Arrange to run a callable when this case is torn down.
686
700
 
687
701
    def _cleanEnvironment(self):
688
702
        new_env = {
 
703
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
689
704
            'HOME': os.getcwd(),
690
705
            'APPDATA': os.getcwd(),
691
706
            'BZR_EMAIL': None,
745
760
    def log(self, *args):
746
761
        mutter(*args)
747
762
 
748
 
    def _get_log(self):
749
 
        """Return as a string the log for this test"""
750
 
        if self._log_file_name:
751
 
            return open(self._log_file_name).read()
752
 
        else:
 
763
    def _get_log(self, keep_log_file=False):
 
764
        """Return as a string the log for this test. If the file is still
 
765
        on disk and keep_log_file=False, delete the log file and store the
 
766
        content in self._log_contents."""
 
767
        # flush the log file, to get all content
 
768
        import bzrlib.trace
 
769
        bzrlib.trace._trace_file.flush()
 
770
        if self._log_contents:
753
771
            return self._log_contents
754
 
        # TODO: Delete the log after it's been read in
 
772
        if self._log_file_name is not None:
 
773
            logfile = open(self._log_file_name)
 
774
            try:
 
775
                log_contents = logfile.read()
 
776
            finally:
 
777
                logfile.close()
 
778
            if not keep_log_file:
 
779
                self._log_contents = log_contents
 
780
                os.remove(self._log_file_name)
 
781
            return log_contents
 
782
        else:
 
783
            return "DELETED log file to reduce memory footprint"
755
784
 
756
785
    def capture(self, cmd, retcode=0):
757
786
        """Shortcut that splits cmd into words, runs, and returns stdout"""
758
787
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
759
788
 
760
 
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None):
 
789
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
 
790
                         working_dir=None):
761
791
        """Invoke bzr and return (stdout, stderr).
762
792
 
763
793
        Useful for code that wants to check the contents of the
778
808
        :param retcode: expected return code, or None for don't-care.
779
809
        :param encoding: encoding for sys.stdout and sys.stderr
780
810
        :param stdin: A string to be used as stdin for the command.
 
811
        :param working_dir: Change to this directory before running
781
812
        """
782
813
        if encoding is None:
783
814
            encoding = bzrlib.user_encoding
799
830
            stdout=stdout,
800
831
            stderr=stderr)
801
832
        bzrlib.ui.ui_factory.stdin = stdin
 
833
 
 
834
        cwd = None
 
835
        if working_dir is not None:
 
836
            cwd = osutils.getcwd()
 
837
            os.chdir(working_dir)
 
838
 
802
839
        try:
803
840
            result = self.apply_redirected(stdin, stdout, stderr,
804
841
                                           bzrlib.commands.run_bzr_catch_errors,
806
843
        finally:
807
844
            logger.removeHandler(handler)
808
845
            bzrlib.ui.ui_factory = old_ui_factory
 
846
            if cwd is not None:
 
847
                os.chdir(cwd)
809
848
 
810
849
        out = stdout.getvalue()
811
850
        err = stderr.getvalue()
832
871
        retcode = kwargs.pop('retcode', 0)
833
872
        encoding = kwargs.pop('encoding', None)
834
873
        stdin = kwargs.pop('stdin', None)
835
 
        return self.run_bzr_captured(args, retcode=retcode, encoding=encoding, stdin=stdin)
 
874
        working_dir = kwargs.pop('working_dir', None)
 
875
        return self.run_bzr_captured(args, retcode=retcode, encoding=encoding,
 
876
                                     stdin=stdin, working_dir=working_dir)
836
877
 
837
878
    def run_bzr_decode(self, *args, **kwargs):
838
879
        if 'encoding' in kwargs:
886
927
            The values must be strings. The change will only occur in the
887
928
            child, so you don't need to fix the environment after running.
888
929
        :param universal_newlines: Convert CRLF => LF
 
930
        :param allow_plugins: By default the subprocess is run with
 
931
            --no-plugins to ensure test reproducibility. Also, it is possible
 
932
            for system-wide plugins to create unexpected output on stderr,
 
933
            which can cause unnecessary test failures.
889
934
        """
890
935
        env_changes = kwargs.get('env_changes', {})
891
 
        process = self.start_bzr_subprocess(args, env_changes=env_changes)
 
936
        working_dir = kwargs.get('working_dir', None)
 
937
        allow_plugins = kwargs.get('allow_plugins', False)
 
938
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
939
                                            working_dir=working_dir,
 
940
                                            allow_plugins=allow_plugins)
892
941
        # We distinguish between retcode=None and retcode not passed.
893
942
        supplied_retcode = kwargs.get('retcode', 0)
894
943
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
896
945
            process_args=args)
897
946
 
898
947
    def start_bzr_subprocess(self, process_args, env_changes=None,
899
 
                             skip_if_plan_to_signal=False):
 
948
                             skip_if_plan_to_signal=False,
 
949
                             working_dir=None,
 
950
                             allow_plugins=False):
900
951
        """Start bzr in a subprocess for testing.
901
952
 
902
953
        This starts a new Python interpreter and runs bzr in there.
913
964
            child, so you don't need to fix the environment after running.
914
965
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
915
966
            is not available.
 
967
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
916
968
 
917
969
        :returns: Popen object for the started process.
918
970
        """
934
986
 
935
987
        bzr_path = self.get_bzr_path()
936
988
 
 
989
        cwd = None
 
990
        if working_dir is not None:
 
991
            cwd = osutils.getcwd()
 
992
            os.chdir(working_dir)
 
993
 
937
994
        try:
938
995
            # win32 subprocess doesn't support preexec_fn
939
996
            # so we will avoid using it on all platforms, just to
940
997
            # make sure the code path is used, and we don't break on win32
941
998
            cleanup_environment()
942
 
            process = Popen([sys.executable, bzr_path] + list(process_args),
943
 
                             stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
999
            command = [sys.executable, bzr_path]
 
1000
            if not allow_plugins:
 
1001
                command.append('--no-plugins')
 
1002
            command.extend(process_args)
 
1003
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
944
1004
        finally:
945
1005
            restore_environment()
 
1006
            if cwd is not None:
 
1007
                os.chdir(cwd)
 
1008
 
946
1009
        return process
947
1010
 
 
1011
    def _popen(self, *args, **kwargs):
 
1012
        """Place a call to Popen.
 
1013
 
 
1014
        Allows tests to override this method to intercept the calls made to
 
1015
        Popen for introspection.
 
1016
        """
 
1017
        return Popen(*args, **kwargs)
 
1018
 
948
1019
    def get_bzr_path(self):
949
1020
        """Return the path of the 'bzr' executable for this test suite."""
950
1021
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
1053
1124
 
1054
1125
BzrTestBase = TestCase
1055
1126
 
 
1127
 
 
1128
class TestCaseWithMemoryTransport(TestCase):
 
1129
    """Common test class for tests that do not need disk resources.
 
1130
 
 
1131
    Tests that need disk resources should derive from TestCaseWithTransport.
 
1132
 
 
1133
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
1134
 
 
1135
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
1136
    a directory which does not exist. This serves to help ensure test isolation
 
1137
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
1138
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
1139
    file defaults for the transport in tests, nor does it obey the command line
 
1140
    override, so tests that accidentally write to the common directory should
 
1141
    be rare.
 
1142
    """
 
1143
 
 
1144
    TEST_ROOT = None
 
1145
    _TEST_NAME = 'test'
 
1146
 
 
1147
 
 
1148
    def __init__(self, methodName='runTest'):
 
1149
        # allow test parameterisation after test construction and before test
 
1150
        # execution. Variables that the parameteriser sets need to be 
 
1151
        # ones that are not set by setUp, or setUp will trash them.
 
1152
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
1153
        self.transport_server = default_transport
 
1154
        self.transport_readonly_server = None
 
1155
 
 
1156
    def failUnlessExists(self, path):
 
1157
        """Fail unless path, which may be abs or relative, exists."""
 
1158
        self.failUnless(osutils.lexists(path))
 
1159
 
 
1160
    def failIfExists(self, path):
 
1161
        """Fail if path, which may be abs or relative, exists."""
 
1162
        self.failIf(osutils.lexists(path))
 
1163
        
 
1164
    def get_transport(self):
 
1165
        """Return a writeable transport for the test scratch space"""
 
1166
        t = get_transport(self.get_url())
 
1167
        self.assertFalse(t.is_readonly())
 
1168
        return t
 
1169
 
 
1170
    def get_readonly_transport(self):
 
1171
        """Return a readonly transport for the test scratch space
 
1172
        
 
1173
        This can be used to test that operations which should only need
 
1174
        readonly access in fact do not try to write.
 
1175
        """
 
1176
        t = get_transport(self.get_readonly_url())
 
1177
        self.assertTrue(t.is_readonly())
 
1178
        return t
 
1179
 
 
1180
    def get_readonly_server(self):
 
1181
        """Get the server instance for the readonly transport
 
1182
 
 
1183
        This is useful for some tests with specific servers to do diagnostics.
 
1184
        """
 
1185
        if self.__readonly_server is None:
 
1186
            if self.transport_readonly_server is None:
 
1187
                # readonly decorator requested
 
1188
                # bring up the server
 
1189
                self.get_url()
 
1190
                self.__readonly_server = ReadonlyServer()
 
1191
                self.__readonly_server.setUp(self.__server)
 
1192
            else:
 
1193
                self.__readonly_server = self.transport_readonly_server()
 
1194
                self.__readonly_server.setUp()
 
1195
            self.addCleanup(self.__readonly_server.tearDown)
 
1196
        return self.__readonly_server
 
1197
 
 
1198
    def get_readonly_url(self, relpath=None):
 
1199
        """Get a URL for the readonly transport.
 
1200
 
 
1201
        This will either be backed by '.' or a decorator to the transport 
 
1202
        used by self.get_url()
 
1203
        relpath provides for clients to get a path relative to the base url.
 
1204
        These should only be downwards relative, not upwards.
 
1205
        """
 
1206
        base = self.get_readonly_server().get_url()
 
1207
        if relpath is not None:
 
1208
            if not base.endswith('/'):
 
1209
                base = base + '/'
 
1210
            base = base + relpath
 
1211
        return base
 
1212
 
 
1213
    def get_server(self):
 
1214
        """Get the read/write server instance.
 
1215
 
 
1216
        This is useful for some tests with specific servers that need
 
1217
        diagnostics.
 
1218
 
 
1219
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
1220
        is no means to override it.
 
1221
        """
 
1222
        if self.__server is None:
 
1223
            self.__server = MemoryServer()
 
1224
            self.__server.setUp()
 
1225
            self.addCleanup(self.__server.tearDown)
 
1226
        return self.__server
 
1227
 
 
1228
    def get_url(self, relpath=None):
 
1229
        """Get a URL (or maybe a path) for the readwrite transport.
 
1230
 
 
1231
        This will either be backed by '.' or to an equivalent non-file based
 
1232
        facility.
 
1233
        relpath provides for clients to get a path relative to the base url.
 
1234
        These should only be downwards relative, not upwards.
 
1235
        """
 
1236
        base = self.get_server().get_url()
 
1237
        if relpath is not None and relpath != '.':
 
1238
            if not base.endswith('/'):
 
1239
                base = base + '/'
 
1240
            # XXX: Really base should be a url; we did after all call
 
1241
            # get_url()!  But sometimes it's just a path (from
 
1242
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
1243
            # to a non-escaped local path.
 
1244
            if base.startswith('./') or base.startswith('/'):
 
1245
                base += relpath
 
1246
            else:
 
1247
                base += urlutils.escape(relpath)
 
1248
        return base
 
1249
 
 
1250
    def _make_test_root(self):
 
1251
        if TestCaseWithMemoryTransport.TEST_ROOT is not None:
 
1252
            return
 
1253
        i = 0
 
1254
        while True:
 
1255
            root = u'test%04d.tmp' % i
 
1256
            try:
 
1257
                os.mkdir(root)
 
1258
            except OSError, e:
 
1259
                if e.errno == errno.EEXIST:
 
1260
                    i += 1
 
1261
                    continue
 
1262
                else:
 
1263
                    raise
 
1264
            # successfully created
 
1265
            TestCaseWithMemoryTransport.TEST_ROOT = osutils.abspath(root)
 
1266
            break
 
1267
        # make a fake bzr directory there to prevent any tests propagating
 
1268
        # up onto the source directory's real branch
 
1269
        bzrdir.BzrDir.create_standalone_workingtree(
 
1270
            TestCaseWithMemoryTransport.TEST_ROOT)
 
1271
 
 
1272
    def makeAndChdirToTestDir(self):
 
1273
        """Create a temporary directories for this one test.
 
1274
        
 
1275
        This must set self.test_home_dir and self.test_dir and chdir to
 
1276
        self.test_dir.
 
1277
        
 
1278
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
1279
        """
 
1280
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
1281
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
1282
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
1283
        
 
1284
    def make_branch(self, relpath, format=None):
 
1285
        """Create a branch on the transport at relpath."""
 
1286
        repo = self.make_repository(relpath, format=format)
 
1287
        return repo.bzrdir.create_branch()
 
1288
 
 
1289
    def make_bzrdir(self, relpath, format=None):
 
1290
        try:
 
1291
            # might be a relative or absolute path
 
1292
            maybe_a_url = self.get_url(relpath)
 
1293
            segments = maybe_a_url.rsplit('/', 1)
 
1294
            t = get_transport(maybe_a_url)
 
1295
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
1296
                try:
 
1297
                    t.mkdir('.')
 
1298
                except errors.FileExists:
 
1299
                    pass
 
1300
            if format is None:
 
1301
                format = bzrlib.bzrdir.BzrDirFormat.get_default_format()
 
1302
            return format.initialize_on_transport(t)
 
1303
        except errors.UninitializableFormat:
 
1304
            raise TestSkipped("Format %s is not initializable." % format)
 
1305
 
 
1306
    def make_repository(self, relpath, shared=False, format=None):
 
1307
        """Create a repository on our default transport at relpath."""
 
1308
        made_control = self.make_bzrdir(relpath, format=format)
 
1309
        return made_control.create_repository(shared=shared)
 
1310
 
 
1311
    def make_branch_and_memory_tree(self, relpath, format=None):
 
1312
        """Create a branch on the default transport and a MemoryTree for it."""
 
1313
        b = self.make_branch(relpath, format=format)
 
1314
        return memorytree.MemoryTree.create_on_branch(b)
 
1315
 
 
1316
    def overrideEnvironmentForTesting(self):
 
1317
        os.environ['HOME'] = self.test_home_dir
 
1318
        os.environ['APPDATA'] = self.test_home_dir
 
1319
        
 
1320
    def setUp(self):
 
1321
        super(TestCaseWithMemoryTransport, self).setUp()
 
1322
        self._make_test_root()
 
1323
        _currentdir = os.getcwdu()
 
1324
        def _leaveDirectory():
 
1325
            os.chdir(_currentdir)
 
1326
        self.addCleanup(_leaveDirectory)
 
1327
        self.makeAndChdirToTestDir()
 
1328
        self.overrideEnvironmentForTesting()
 
1329
        self.__readonly_server = None
 
1330
        self.__server = None
 
1331
 
1056
1332
     
1057
 
class TestCaseInTempDir(TestCase):
 
1333
class TestCaseInTempDir(TestCaseWithMemoryTransport):
1058
1334
    """Derived class that runs a test within a temporary directory.
1059
1335
 
1060
1336
    This is useful for tests that need to create a branch, etc.
1067
1343
    InTempDir is an old alias for FunctionalTestCase.
1068
1344
    """
1069
1345
 
1070
 
    TEST_ROOT = None
1071
 
    _TEST_NAME = 'test'
1072
1346
    OVERRIDE_PYTHON = 'python'
1073
1347
 
1074
1348
    def check_file_contents(self, filename, expect):
1079
1353
            self.log("actually: %r" % contents)
1080
1354
            self.fail("contents of %s not as expected" % filename)
1081
1355
 
1082
 
    def _make_test_root(self):
1083
 
        if TestCaseInTempDir.TEST_ROOT is not None:
1084
 
            return
1085
 
        i = 0
1086
 
        while True:
1087
 
            root = u'test%04d.tmp' % i
1088
 
            try:
1089
 
                os.mkdir(root)
1090
 
            except OSError, e:
1091
 
                if e.errno == errno.EEXIST:
1092
 
                    i += 1
1093
 
                    continue
1094
 
                else:
1095
 
                    raise
1096
 
            # successfully created
1097
 
            TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
1098
 
            break
1099
 
        # make a fake bzr directory there to prevent any tests propagating
1100
 
        # up onto the source directory's real branch
1101
 
        bzrdir.BzrDir.create_standalone_workingtree(TestCaseInTempDir.TEST_ROOT)
1102
 
 
1103
 
    def setUp(self):
1104
 
        super(TestCaseInTempDir, self).setUp()
1105
 
        self._make_test_root()
1106
 
        _currentdir = os.getcwdu()
 
1356
    def makeAndChdirToTestDir(self):
 
1357
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
1358
        
 
1359
        For TestCaseInTempDir we create a temporary directory based on the test
 
1360
        name and then create two subdirs - test and home under it.
 
1361
        """
1107
1362
        # shorten the name, to avoid test failures due to path length
1108
1363
        short_id = self.id().replace('bzrlib.tests.', '') \
1109
1364
                   .replace('__main__.', '')[-100:]
1126
1381
                os.mkdir(self.test_dir)
1127
1382
                os.chdir(self.test_dir)
1128
1383
                break
1129
 
        os.environ['HOME'] = self.test_home_dir
1130
 
        os.environ['APPDATA'] = self.test_home_dir
1131
 
        def _leaveDirectory():
1132
 
            os.chdir(_currentdir)
1133
 
        self.addCleanup(_leaveDirectory)
1134
 
        
 
1384
 
1135
1385
    def build_tree(self, shape, line_endings='native', transport=None):
1136
1386
        """Build a test tree according to a pattern.
1137
1387
 
1178
1428
    def build_tree_contents(self, shape):
1179
1429
        build_tree_contents(shape)
1180
1430
 
1181
 
    def failUnlessExists(self, path):
1182
 
        """Fail unless path, which may be abs or relative, exists."""
1183
 
        self.failUnless(osutils.lexists(path))
1184
 
 
1185
 
    def failIfExists(self, path):
1186
 
        """Fail if path, which may be abs or relative, exists."""
1187
 
        self.failIf(osutils.lexists(path))
1188
 
        
1189
1431
    def assertFileEqual(self, content, path):
1190
1432
        """Fail if path does not contain 'content'."""
1191
1433
        self.failUnless(osutils.lexists(path))
1207
1449
    readwrite one must both define get_url() as resolving to os.getcwd().
1208
1450
    """
1209
1451
 
1210
 
    def __init__(self, methodName='testMethod'):
1211
 
        super(TestCaseWithTransport, self).__init__(methodName)
1212
 
        self.__readonly_server = None
1213
 
        self.__server = None
1214
 
        self.transport_server = default_transport
1215
 
        self.transport_readonly_server = None
1216
 
 
1217
 
    def get_readonly_url(self, relpath=None):
1218
 
        """Get a URL for the readonly transport.
1219
 
 
1220
 
        This will either be backed by '.' or a decorator to the transport 
1221
 
        used by self.get_url()
1222
 
        relpath provides for clients to get a path relative to the base url.
1223
 
        These should only be downwards relative, not upwards.
1224
 
        """
1225
 
        base = self.get_readonly_server().get_url()
1226
 
        if relpath is not None:
1227
 
            if not base.endswith('/'):
1228
 
                base = base + '/'
1229
 
            base = base + relpath
1230
 
        return base
1231
 
 
1232
 
    def get_readonly_server(self):
1233
 
        """Get the server instance for the readonly transport
1234
 
 
1235
 
        This is useful for some tests with specific servers to do diagnostics.
1236
 
        """
1237
 
        if self.__readonly_server is None:
1238
 
            if self.transport_readonly_server is None:
1239
 
                # readonly decorator requested
1240
 
                # bring up the server
1241
 
                self.get_url()
1242
 
                self.__readonly_server = ReadonlyServer()
1243
 
                self.__readonly_server.setUp(self.__server)
1244
 
            else:
1245
 
                self.__readonly_server = self.transport_readonly_server()
1246
 
                self.__readonly_server.setUp()
1247
 
            self.addCleanup(self.__readonly_server.tearDown)
1248
 
        return self.__readonly_server
1249
 
 
1250
1452
    def get_server(self):
1251
 
        """Get the read/write server instance.
 
1453
        """See TestCaseWithMemoryTransport.
1252
1454
 
1253
1455
        This is useful for some tests with specific servers that need
1254
1456
        diagnostics.
1259
1461
            self.addCleanup(self.__server.tearDown)
1260
1462
        return self.__server
1261
1463
 
1262
 
    def get_url(self, relpath=None):
1263
 
        """Get a URL (or maybe a path) for the readwrite transport.
1264
 
 
1265
 
        This will either be backed by '.' or to an equivalent non-file based
1266
 
        facility.
1267
 
        relpath provides for clients to get a path relative to the base url.
1268
 
        These should only be downwards relative, not upwards.
1269
 
        """
1270
 
        base = self.get_server().get_url()
1271
 
        if relpath is not None and relpath != '.':
1272
 
            if not base.endswith('/'):
1273
 
                base = base + '/'
1274
 
            # XXX: Really base should be a url; we did after all call
1275
 
            # get_url()!  But sometimes it's just a path (from
1276
 
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
1277
 
            # to a non-escaped local path.
1278
 
            if base.startswith('./') or base.startswith('/'):
1279
 
                base += relpath
1280
 
            else:
1281
 
                base += urlutils.escape(relpath)
1282
 
        return base
1283
 
 
1284
 
    def get_transport(self):
1285
 
        """Return a writeable transport for the test scratch space"""
1286
 
        t = get_transport(self.get_url())
1287
 
        self.assertFalse(t.is_readonly())
1288
 
        return t
1289
 
 
1290
 
    def get_readonly_transport(self):
1291
 
        """Return a readonly transport for the test scratch space
1292
 
        
1293
 
        This can be used to test that operations which should only need
1294
 
        readonly access in fact do not try to write.
1295
 
        """
1296
 
        t = get_transport(self.get_readonly_url())
1297
 
        self.assertTrue(t.is_readonly())
1298
 
        return t
1299
 
 
1300
 
    def make_branch(self, relpath, format=None):
1301
 
        """Create a branch on the transport at relpath."""
1302
 
        repo = self.make_repository(relpath, format=format)
1303
 
        return repo.bzrdir.create_branch()
1304
 
 
1305
 
    def make_bzrdir(self, relpath, format=None):
1306
 
        try:
1307
 
            # might be a relative or absolute path
1308
 
            maybe_a_url = self.get_url(relpath)
1309
 
            segments = maybe_a_url.rsplit('/', 1)
1310
 
            t = get_transport(maybe_a_url)
1311
 
            if len(segments) > 1 and segments[-1] not in ('', '.'):
1312
 
                try:
1313
 
                    t.mkdir('.')
1314
 
                except errors.FileExists:
1315
 
                    pass
1316
 
            if format is None:
1317
 
                format = bzrlib.bzrdir.BzrDirFormat.get_default_format()
1318
 
            return format.initialize_on_transport(t)
1319
 
        except errors.UninitializableFormat:
1320
 
            raise TestSkipped("Format %s is not initializable." % format)
1321
 
 
1322
 
    def make_repository(self, relpath, shared=False, format=None):
1323
 
        """Create a repository on our default transport at relpath."""
1324
 
        made_control = self.make_bzrdir(relpath, format=format)
1325
 
        return made_control.create_repository(shared=shared)
1326
 
 
1327
 
    def make_branch_and_memory_tree(self, relpath):
1328
 
        """Create a branch on the default transport and a MemoryTree for it."""
1329
 
        b = self.make_branch(relpath)
1330
 
        return memorytree.MemoryTree.create_on_branch(b)
1331
 
 
1332
1464
    def make_branch_and_tree(self, relpath, format=None):
1333
1465
        """Create a branch on the transport and a tree locally.
1334
1466
 
1376
1508
            self.fail("path %s is not a directory; has mode %#o"
1377
1509
                      % (relpath, mode))
1378
1510
 
 
1511
    def setUp(self):
 
1512
        super(TestCaseWithTransport, self).setUp()
 
1513
        self.__server = None
 
1514
        self.transport_server = default_transport
 
1515
 
1379
1516
 
1380
1517
class ChrootedTestCase(TestCaseWithTransport):
1381
1518
    """A support class that provides readonly urls outside the local namespace.
1407
1544
def run_suite(suite, name='test', verbose=False, pattern=".*",
1408
1545
              stop_on_failure=False, keep_output=False,
1409
1546
              transport=None, lsprof_timed=None, bench_history=None):
1410
 
    TestCaseInTempDir._TEST_NAME = name
1411
1547
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1412
1548
    if verbose:
1413
1549
        verbosity = 2
1497
1633
                   'bzrlib.tests.test_inv',
1498
1634
                   'bzrlib.tests.test_knit',
1499
1635
                   'bzrlib.tests.test_lazy_import',
 
1636
                   'bzrlib.tests.test_lazy_regex',
1500
1637
                   'bzrlib.tests.test_lockdir',
1501
1638
                   'bzrlib.tests.test_lockable_files',
1502
1639
                   'bzrlib.tests.test_log',
1515
1652
                   'bzrlib.tests.test_plugins',
1516
1653
                   'bzrlib.tests.test_progress',
1517
1654
                   'bzrlib.tests.test_reconcile',
 
1655
                   'bzrlib.tests.test_registry',
1518
1656
                   'bzrlib.tests.test_repository',
1519
1657
                   'bzrlib.tests.test_revert',
1520
1658
                   'bzrlib.tests.test_revision',
1547
1685
                   'bzrlib.tests.test_urlutils',
1548
1686
                   'bzrlib.tests.test_versionedfile',
1549
1687
                   'bzrlib.tests.test_version',
 
1688
                   'bzrlib.tests.test_version_info',
1550
1689
                   'bzrlib.tests.test_weave',
1551
1690
                   'bzrlib.tests.test_whitebox',
1552
1691
                   'bzrlib.tests.test_workingtree',
1567
1706
    for m in MODULES_TO_TEST:
1568
1707
        suite.addTest(loader.loadTestsFromModule(m))
1569
1708
    for m in MODULES_TO_DOCTEST:
1570
 
        suite.addTest(doctest.DocTestSuite(m))
 
1709
        try:
 
1710
            suite.addTest(doctest.DocTestSuite(m))
 
1711
        except ValueError, e:
 
1712
            print '**failed to get doctest for: %s\n%s' %(m,e)
 
1713
            raise
1571
1714
    for name, plugin in bzrlib.plugin.all_plugins().items():
1572
1715
        if getattr(plugin, 'test_suite', None) is not None:
1573
1716
            suite.addTest(plugin.test_suite())