/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport_implementations.py

Merge bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
import sys
27
27
 
28
28
from bzrlib import (
 
29
    errors,
29
30
    osutils,
30
31
    urlutils,
31
32
    )
32
33
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
33
 
                           LockError, PathError,
 
34
                           LockError, NoSmartServer, PathError,
34
35
                           TransportNotPossible, ConnectionError,
35
36
                           InvalidURL)
36
37
from bzrlib.osutils import getcwd
37
38
from bzrlib.symbol_versioning import zero_eleven
38
39
from bzrlib.tests import TestCaseInTempDir, TestSkipped
39
40
from bzrlib.tests.test_transport import TestTransportImplementation
40
 
from bzrlib.transport import memory
 
41
from bzrlib.transport import memory, smart
41
42
import bzrlib.transport
42
43
 
43
44
 
68
69
        except excClass:
69
70
            return
70
71
        else:
71
 
            if hasattr(excClass,'__name__'): excName = excClass.__name__
72
 
            else: excName = str(excClass)
 
72
            if getattr(excClass,'__name__', None) is not None:
 
73
                excName = excClass.__name__
 
74
            else:
 
75
                excName = str(excClass)
73
76
            raise self.failureException, "%s not raised" % excName
74
77
 
75
78
    def test_has(self):
89
92
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
90
93
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
91
94
 
 
95
    def test_has_root_works(self):
 
96
        current_transport = self.get_transport()
 
97
        # import pdb;pdb.set_trace()
 
98
        self.assertTrue(current_transport.has('/'))
 
99
        root = current_transport.clone('/')
 
100
        self.assertTrue(root.has(''))
 
101
 
92
102
    def test_get(self):
93
103
        t = self.get_transport()
94
104
 
142
152
                             t.put, 'b', StringIO('file-like\ncontents\n'))
143
153
        self.check_transport_contents('file-like\ncontents\n', t, 'b')
144
154
 
 
155
        self.assertRaises(NoSuchFile,
 
156
            self.applyDeprecated,
 
157
            zero_eleven,
 
158
            t.put, 'path/doesnt/exist/c', StringIO('contents'))
 
159
 
145
160
    def test_put_bytes(self):
146
161
        t = self.get_transport()
147
162
 
398
413
        self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
399
414
        self.check_transport_contents('another contents\nfor d\n', t, 'd')
400
415
 
 
416
    def test_put_permissions(self):
 
417
        t = self.get_transport()
 
418
 
 
419
        if t.is_readonly():
 
420
            return
 
421
        if not t._can_roundtrip_unix_modebits():
 
422
            # Can't roundtrip, so no need to run this test
 
423
            return
 
424
        self.applyDeprecated(zero_eleven, t.put, 'mode644',
 
425
                             StringIO('test text\n'), mode=0644)
 
426
        self.assertTransportMode(t, 'mode644', 0644)
 
427
        self.applyDeprecated(zero_eleven, t.put, 'mode666',
 
428
                             StringIO('test text\n'), mode=0666)
 
429
        self.assertTransportMode(t, 'mode666', 0666)
 
430
        self.applyDeprecated(zero_eleven, t.put, 'mode600',
 
431
                             StringIO('test text\n'), mode=0600)
 
432
        self.assertTransportMode(t, 'mode600', 0600)
 
433
        # Yes, you can put a file such that it becomes readonly
 
434
        self.applyDeprecated(zero_eleven, t.put, 'mode400',
 
435
                             StringIO('test text\n'), mode=0400)
 
436
        self.assertTransportMode(t, 'mode400', 0400)
 
437
        self.applyDeprecated(zero_eleven, t.put_multi,
 
438
                             [('mmode644', StringIO('text\n'))], mode=0644)
 
439
        self.assertTransportMode(t, 'mmode644', 0644)
 
440
 
 
441
        # The default permissions should be based on the current umask
 
442
        umask = osutils.get_umask()
 
443
        self.applyDeprecated(zero_eleven, t.put, 'nomode',
 
444
                             StringIO('test text\n'), mode=None)
 
445
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
446
        
401
447
    def test_mkdir(self):
402
448
        t = self.get_transport()
403
449
 
749
795
        t.mkdir('adir/asubdir')
750
796
        t.mkdir('bdir')
751
797
        t.mkdir('bdir/bsubdir')
 
798
        # any kind of PathError would be OK, though we normally expect
 
799
        # DirectoryNotEmpty
752
800
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
753
801
        # nothing was changed so it should still be as before
754
802
        self.assertTrue(t.has('bdir/bsubdir'))
839
887
        # TODO: test copy_multi
840
888
 
841
889
    def test_connection_error(self):
842
 
        """ConnectionError is raised when connection is impossible"""
 
890
        """ConnectionError is raised when connection is impossible.
 
891
        
 
892
        The error may be raised from either the constructor or the first
 
893
        operation on the transport.
 
894
        """
843
895
        try:
844
896
            url = self._server.get_bogus_url()
845
897
        except NotImplementedError:
990
1042
        self.failUnless(t2.has('d'))
991
1043
        self.failUnless(t3.has('b/d'))
992
1044
 
 
1045
    def test_clone_to_root(self):
 
1046
        orig_transport = self.get_transport()
 
1047
        # Repeatedly go up to a parent directory until we're at the root
 
1048
        # directory of this transport
 
1049
        root_transport = orig_transport
 
1050
        new_transport = root_transport.clone("..")
 
1051
        # as we are walking up directories, the path must be must be 
 
1052
        # growing less, except at the top
 
1053
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1054
            or new_transport.base == root_transport.base)
 
1055
        while new_transport.base != root_transport.base:
 
1056
            root_transport = new_transport
 
1057
            new_transport = root_transport.clone("..")
 
1058
            # as we are walking up directories, the path must be must be 
 
1059
            # growing less, except at the top
 
1060
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1061
                or new_transport.base == root_transport.base)
 
1062
 
 
1063
        # Cloning to "/" should take us to exactly the same location.
 
1064
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
 
1065
        # the abspath of "/" from the original transport should be the same
 
1066
        # as the base at the root:
 
1067
        self.assertEqual(orig_transport.abspath("/"), root_transport.base)
 
1068
 
 
1069
        # At the root, the URL must still end with / as its a directory
 
1070
        self.assertEqual(root_transport.base[-1], '/')
 
1071
 
 
1072
    def test_clone_from_root(self):
 
1073
        """At the root, cloning to a simple dir should just do string append."""
 
1074
        orig_transport = self.get_transport()
 
1075
        root_transport = orig_transport.clone('/')
 
1076
        self.assertEqual(root_transport.base + '.bzr/',
 
1077
            root_transport.clone('.bzr').base)
 
1078
 
 
1079
    def test_base_url(self):
 
1080
        t = self.get_transport()
 
1081
        self.assertEqual('/', t.base[-1])
 
1082
 
993
1083
    def test_relpath(self):
994
1084
        t = self.get_transport()
995
1085
        self.assertEqual('', t.relpath(t.base))
1018
1108
        # specific test cases.
1019
1109
        transport = self.get_transport()
1020
1110
        
1021
 
        # disabled because some transports might normalize urls in generating
1022
 
        # the abspath - eg http+pycurl-> just http -- mbp 20060308 
1023
1111
        self.assertEqual(transport.base + 'relpath',
1024
1112
                         transport.abspath('relpath'))
1025
1113
 
 
1114
        # This should work without raising an error.
 
1115
        transport.abspath("/")
 
1116
 
 
1117
        # the abspath of "/" and "/foo/.." should result in the same location
 
1118
        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
 
1119
 
1026
1120
    def test_local_abspath(self):
1027
1121
        transport = self.get_transport()
1028
1122
        try:
1154
1248
        self.check_transport_contents('bar', transport2, 'foo')
1155
1249
 
1156
1250
    def test_lock_write(self):
 
1251
        """Test transport-level write locks.
 
1252
 
 
1253
        These are deprecated and transports may decline to support them.
 
1254
        """
1157
1255
        transport = self.get_transport()
1158
1256
        if transport.is_readonly():
1159
1257
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
1160
1258
            return
1161
1259
        transport.put_bytes('lock', '')
1162
 
        lock = transport.lock_write('lock')
 
1260
        try:
 
1261
            lock = transport.lock_write('lock')
 
1262
        except TransportNotPossible:
 
1263
            return
1163
1264
        # TODO make this consistent on all platforms:
1164
1265
        # self.assertRaises(LockError, transport.lock_write, 'lock')
1165
1266
        lock.unlock()
1166
1267
 
1167
1268
    def test_lock_read(self):
 
1269
        """Test transport-level read locks.
 
1270
 
 
1271
        These are deprecated and transports may decline to support them.
 
1272
        """
1168
1273
        transport = self.get_transport()
1169
1274
        if transport.is_readonly():
1170
1275
            file('lock', 'w').close()
1171
1276
        else:
1172
1277
            transport.put_bytes('lock', '')
1173
 
        lock = transport.lock_read('lock')
 
1278
        try:
 
1279
            lock = transport.lock_read('lock')
 
1280
        except TransportNotPossible:
 
1281
            return
1174
1282
        # TODO make this consistent on all platforms:
1175
1283
        # self.assertRaises(LockError, transport.lock_read, 'lock')
1176
1284
        lock.unlock()
1200
1308
        self.assertEqual(d[1], (9, '9'))
1201
1309
        self.assertEqual(d[2], (0, '0'))
1202
1310
        self.assertEqual(d[3], (3, '34'))
 
1311
 
 
1312
    def test_get_smart_client(self):
 
1313
        """All transports must either give a smart client, or know they can't.
 
1314
 
 
1315
        For some transports such as http this might depend on probing to see 
 
1316
        what's actually present on the other end.  (But we can adjust for that 
 
1317
        in the future.)
 
1318
        """
 
1319
        transport = self.get_transport()
 
1320
        try:
 
1321
            client = transport.get_smart_client()
 
1322
            # XXX: should be a more general class
 
1323
            self.assertIsInstance(client, smart.SmartStreamClient)
 
1324
        except NoSmartServer:
 
1325
            # as long as we got it we're fine
 
1326
            pass
 
1327
 
 
1328
    def test_readv_short_read(self):
 
1329
        transport = self.get_transport()
 
1330
        if transport.is_readonly():
 
1331
            file('a', 'w').write('0123456789')
 
1332
        else:
 
1333
            transport.put_bytes('a', '01234567890')
 
1334
 
 
1335
        # This is intentionally reading off the end of the file
 
1336
        # since we are sure that it cannot get there
 
1337
        self.assertListRaises((errors.ShortReadvError, AssertionError),
 
1338
                              transport.readv, 'a', [(1,1), (8,10)])
 
1339
 
 
1340
        # This is trying to seek past the end of the file, it should
 
1341
        # also raise a special error
 
1342
        self.assertListRaises(errors.ShortReadvError,
 
1343
                              transport.readv, 'a', [(12,2)])