/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to brzlib/tests/per_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2017-05-21 12:41:27 UTC
  • mto: This revision was merged to the branch mainline in revision 6623.
  • Revision ID: jelmer@jelmer.uk-20170521124127-iv8etg0vwymyai6y
s/bzr/brz/ in apport config.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
26
26
from StringIO import StringIO as pyStringIO
27
27
import stat
28
28
import sys
29
 
import unittest
30
29
 
31
 
from bzrlib import (
 
30
from brzlib import (
32
31
    errors,
33
32
    osutils,
 
33
    pyutils,
34
34
    tests,
 
35
    transport as _mod_transport,
35
36
    urlutils,
36
37
    )
37
 
from bzrlib.errors import (ConnectionError,
38
 
                           DirectoryNotEmpty,
 
38
from brzlib.errors import (ConnectionError,
39
39
                           FileExists,
40
40
                           InvalidURL,
41
 
                           LockError,
42
41
                           NoSuchFile,
43
 
                           NotLocalUrl,
44
42
                           PathError,
45
43
                           TransportNotPossible,
46
44
                           )
47
 
from bzrlib.osutils import getcwd
48
 
from bzrlib.smart import medium
49
 
from bzrlib.tests import (
50
 
    TestCaseInTempDir,
 
45
from brzlib.osutils import getcwd
 
46
from brzlib.smart import medium
 
47
from brzlib.tests import (
51
48
    TestSkipped,
52
49
    TestNotApplicable,
53
50
    multiply_tests,
54
51
    )
55
 
from bzrlib.tests import test_server
56
 
from bzrlib.tests.test_transport import TestTransportImplementation
57
 
from bzrlib.transport import (
 
52
from brzlib.tests import test_server
 
53
from brzlib.tests.test_transport import TestTransportImplementation
 
54
from brzlib.transport import (
58
55
    ConnectedTransport,
59
 
    get_transport,
 
56
    Transport,
60
57
    _get_transport_modules,
61
58
    )
62
 
from bzrlib.transport.memory import MemoryTransport
 
59
from brzlib.transport.memory import MemoryTransport
 
60
from brzlib.transport.remote import RemoteTransport
63
61
 
64
62
 
65
63
def get_transport_test_permutations(module):
78
76
    for module in _get_transport_modules():
79
77
        try:
80
78
            permutations = get_transport_test_permutations(
81
 
                reduce(getattr, (module).split('.')[1:], __import__(module)))
 
79
                pyutils.get_named_object(module))
82
80
            for (klass, server_factory) in permutations:
83
81
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
84
82
                    {"transport_class":klass,
102
100
 
103
101
    def setUp(self):
104
102
        super(TransportTests, self).setUp()
105
 
        self._captureVar('BZR_NO_SMART_VFS', None)
 
103
        self.overrideEnv('BZR_NO_SMART_VFS', None)
106
104
 
107
105
    def check_transport_contents(self, content, transport, relpath):
108
 
        """Check that transport.get(relpath).read() == content."""
109
 
        self.assertEqualDiff(content, transport.get(relpath).read())
 
106
        """Check that transport.get_bytes(relpath) == content."""
 
107
        self.assertEqualDiff(content, transport.get_bytes(relpath))
110
108
 
111
109
    def test_ensure_base_missing(self):
112
110
        """.ensure_base() should create the directory if it doesn't exist"""
211
209
                    ]
212
210
        self.build_tree(files, transport=t, line_endings='binary')
213
211
        self.assertRaises(NoSuchFile, t.get, 'c')
214
 
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
 
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
 
212
        def iterate_and_close(func, *args):
 
213
            for f in func(*args):
 
214
                # We call f.read() here because things like paramiko actually
 
215
                # spawn a thread to prefetch the content, which we want to
 
216
                # consume before we close the handle.
 
217
                content = f.read()
 
218
                f.close()
 
219
        self.assertRaises(NoSuchFile, iterate_and_close,
 
220
                          t.get_multi, ['a', 'b', 'c'])
 
221
        self.assertRaises(NoSuchFile, iterate_and_close,
 
222
                          t.get_multi, iter(['a', 'b', 'c']))
216
223
 
217
224
    def test_get_directory_read_gives_ReadError(self):
218
225
        """consistent errors for read() on a file returned by get()."""
251
258
 
252
259
    def test_get_bytes_unknown_file(self):
253
260
        t = self.get_transport()
254
 
 
255
261
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
256
262
 
257
263
    def test_get_with_open_write_stream_sees_all_content(self):
261
267
        handle = t.open_write_stream('foo')
262
268
        try:
263
269
            handle.write('b')
264
 
            self.assertEqual('b', t.get('foo').read())
 
270
            self.assertEqual('b', t.get_bytes('foo'))
265
271
        finally:
266
272
            handle.close()
267
273
 
273
279
        try:
274
280
            handle.write('b')
275
281
            self.assertEqual('b', t.get_bytes('foo'))
276
 
            self.assertEqual('b', t.get('foo').read())
 
282
            f = t.get('foo')
 
283
            try:
 
284
                self.assertEqual('b', f.read())
 
285
            finally:
 
286
                f.close()
277
287
        finally:
278
288
            handle.close()
279
289
 
286
296
            return
287
297
 
288
298
        t.put_bytes('a', 'some text for a\n')
289
 
        self.failUnless(t.has('a'))
 
299
        self.assertTrue(t.has('a'))
290
300
        self.check_transport_contents('some text for a\n', t, 'a')
291
301
 
292
302
        # The contents should be overwritten
304
314
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
305
315
            return
306
316
 
307
 
        self.failIf(t.has('a'))
 
317
        self.assertFalse(t.has('a'))
308
318
        t.put_bytes_non_atomic('a', 'some text for a\n')
309
 
        self.failUnless(t.has('a'))
 
319
        self.assertTrue(t.has('a'))
310
320
        self.check_transport_contents('some text for a\n', t, 'a')
311
321
        # Put also replaces contents
312
322
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
324
334
        # Now test the create_parent flag
325
335
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
326
336
                                       'contents\n')
327
 
        self.failIf(t.has('dir/a'))
 
337
        self.assertFalse(t.has('dir/a'))
328
338
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
329
339
                               create_parent_dir=True)
330
340
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
402
412
        result = t.put_file('a', StringIO('some text for a\n'))
403
413
        # put_file returns the length of the data written
404
414
        self.assertEqual(16, result)
405
 
        self.failUnless(t.has('a'))
 
415
        self.assertTrue(t.has('a'))
406
416
        self.check_transport_contents('some text for a\n', t, 'a')
407
417
        # Put also replaces contents
408
418
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
420
430
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
421
431
            return
422
432
 
423
 
        self.failIf(t.has('a'))
 
433
        self.assertFalse(t.has('a'))
424
434
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
425
 
        self.failUnless(t.has('a'))
 
435
        self.assertTrue(t.has('a'))
426
436
        self.check_transport_contents('some text for a\n', t, 'a')
427
437
        # Put also replaces contents
428
438
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
440
450
        # Now test the create_parent flag
441
451
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
442
452
                                       StringIO('contents\n'))
443
 
        self.failIf(t.has('dir/a'))
 
453
        self.assertFalse(t.has('dir/a'))
444
454
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
445
455
                              create_parent_dir=True)
446
456
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
510
520
        self.assertTransportMode(t, 'dir777', 0777)
511
521
 
512
522
    def test_put_bytes_unicode(self):
513
 
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
514
 
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
515
 
        # (we don't want to encode unicode here at all, callers should be
516
 
        # strictly passing bytes to put_bytes), but we allow it for backwards
517
 
        # compatibility.  At some point we should use a specific exception.
518
 
        # See https://bugs.launchpad.net/bzr/+bug/106898.
519
523
        t = self.get_transport()
520
524
        if t.is_readonly():
521
525
            return
522
526
        unicode_string = u'\u1234'
523
 
        self.assertRaises(
524
 
            (AssertionError, UnicodeEncodeError),
525
 
            t.put_bytes, 'foo', unicode_string)
526
 
 
527
 
    def test_put_file_unicode(self):
528
 
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
529
 
        # This situation can happen (and has) if code is careless about the type
530
 
        # of "string" they initialise/write to a StringIO with.  We cannot use
531
 
        # cStringIO, because it never returns unicode from read.
532
 
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
533
 
        # raise, but we raise it for hysterical raisins.
534
 
        t = self.get_transport()
535
 
        if t.is_readonly():
536
 
            return
537
 
        unicode_file = pyStringIO(u'\u1234')
538
 
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
 
527
        self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
539
528
 
540
529
    def test_mkdir(self):
541
530
        t = self.get_transport()
620
609
    def test_opening_a_file_stream_can_set_mode(self):
621
610
        t = self.get_transport()
622
611
        if t.is_readonly():
 
612
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
613
                              t.open_write_stream, 'foo')
623
614
            return
624
615
        if not t._can_roundtrip_unix_modebits():
625
616
            # Can't roundtrip, so no need to run this test
626
617
            return
 
618
 
627
619
        def check_mode(name, mode, expected):
628
620
            handle = t.open_write_stream(name, mode=mode)
629
621
            handle.close()
645
637
            self.build_tree(files, transport=transport_from)
646
638
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
647
639
            for f in files:
648
 
                self.check_transport_contents(transport_to.get(f).read(),
 
640
                self.check_transport_contents(transport_to.get_bytes(f),
649
641
                                              transport_from, f)
650
642
 
651
643
        t = self.get_transport()
674
666
        files = ['a', 'b', 'c', 'd']
675
667
        t.copy_to(iter(files), temp_transport)
676
668
        for f in files:
677
 
            self.check_transport_contents(temp_transport.get(f).read(),
 
669
            self.check_transport_contents(temp_transport.get_bytes(f),
678
670
                                          t, f)
679
671
        del temp_transport
680
672
 
823
815
            return
824
816
 
825
817
        t.put_bytes('a', 'a little bit of text\n')
826
 
        self.failUnless(t.has('a'))
 
818
        self.assertTrue(t.has('a'))
827
819
        t.delete('a')
828
 
        self.failIf(t.has('a'))
 
820
        self.assertFalse(t.has('a'))
829
821
 
830
822
        self.assertRaises(NoSuchFile, t.delete, 'a')
831
823
 
837
829
        t.delete_multi(['a', 'c'])
838
830
        self.assertEqual([False, True, False],
839
831
                list(t.has_multi(['a', 'b', 'c'])))
840
 
        self.failIf(t.has('a'))
841
 
        self.failUnless(t.has('b'))
842
 
        self.failIf(t.has('c'))
 
832
        self.assertFalse(t.has('a'))
 
833
        self.assertTrue(t.has('b'))
 
834
        self.assertFalse(t.has('c'))
843
835
 
844
836
        self.assertRaises(NoSuchFile,
845
837
                t.delete_multi, ['a', 'b', 'c'])
906
898
        t.mkdir('foo-baz')
907
899
        t.rmdir('foo')
908
900
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
909
 
        self.failUnless(t.has('foo-bar'))
 
901
        self.assertTrue(t.has('foo-bar'))
910
902
 
911
903
    def test_rename_dir_succeeds(self):
912
904
        t = self.get_transport()
913
905
        if t.is_readonly():
914
 
            raise TestSkipped("transport is readonly")
 
906
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
907
                              t.rename, 'foo', 'bar')
 
908
            return
915
909
        t.mkdir('adir')
916
910
        t.mkdir('adir/asubdir')
917
911
        t.rename('adir', 'bdir')
922
916
        """Attempting to replace a nonempty directory should fail"""
923
917
        t = self.get_transport()
924
918
        if t.is_readonly():
925
 
            raise TestSkipped("transport is readonly")
 
919
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
920
                              t.rename, 'foo', 'bar')
 
921
            return
926
922
        t.mkdir('adir')
927
923
        t.mkdir('adir/asubdir')
928
924
        t.mkdir('bdir')
992
988
        # perhaps all of this could be done in a subdirectory
993
989
 
994
990
        t.put_bytes('a', 'a first file\n')
995
 
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
991
        self.assertEqual([True, False], list(t.has_multi(['a', 'b'])))
996
992
 
997
993
        t.move('a', 'b')
998
 
        self.failUnless(t.has('b'))
999
 
        self.failIf(t.has('a'))
 
994
        self.assertTrue(t.has('b'))
 
995
        self.assertFalse(t.has('a'))
1000
996
 
1001
997
        self.check_transport_contents('a first file\n', t, 'b')
1002
 
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
998
        self.assertEqual([False, True], list(t.has_multi(['a', 'b'])))
1003
999
 
1004
1000
        # Overwrite a file
1005
1001
        t.put_bytes('c', 'c this file\n')
1006
1002
        t.move('c', 'b')
1007
 
        self.failIf(t.has('c'))
 
1003
        self.assertFalse(t.has('c'))
1008
1004
        self.check_transport_contents('c this file\n', t, 'b')
1009
1005
 
1010
1006
        # TODO: Try to write a test for atomicity
1042
1038
        except NotImplementedError:
1043
1039
            raise TestSkipped("Transport %s has no bogus URL support." %
1044
1040
                              self._server.__class__)
1045
 
        t = get_transport(url)
 
1041
        t = _mod_transport.get_transport_from_url(url)
1046
1042
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1047
1043
 
1048
1044
    def test_stat(self):
1064
1060
        for path, size in zip(paths, sizes):
1065
1061
            st = t.stat(path)
1066
1062
            if path.endswith('/'):
1067
 
                self.failUnless(S_ISDIR(st.st_mode))
 
1063
                self.assertTrue(S_ISDIR(st.st_mode))
1068
1064
                # directory sizes are meaningless
1069
1065
            else:
1070
 
                self.failUnless(S_ISREG(st.st_mode))
 
1066
                self.assertTrue(S_ISREG(st.st_mode))
1071
1067
                self.assertEqual(size, st.st_size)
1072
1068
 
1073
1069
        remote_stats = list(t.stat_multi(paths))
1080
1076
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1081
1077
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
1082
1078
        subdir = t.clone('subdir')
1083
 
        subdir.stat('./file')
1084
 
        subdir.stat('.')
 
1079
        st = subdir.stat('./file')
 
1080
        st = subdir.stat('.')
1085
1081
 
1086
1082
    def test_hardlink(self):
1087
1083
        from stat import ST_NLINK
1096
1092
        try:
1097
1093
            t.hardlink(source_name, link_name)
1098
1094
 
1099
 
            self.failUnless(t.has(source_name))
1100
 
            self.failUnless(t.has(link_name))
 
1095
            self.assertTrue(t.has(source_name))
 
1096
            self.assertTrue(t.has(link_name))
1101
1097
 
1102
1098
            st = t.stat(link_name)
1103
 
            self.failUnlessEqual(st[ST_NLINK], 2)
 
1099
            self.assertEqual(st[ST_NLINK], 2)
1104
1100
        except TransportNotPossible:
1105
1101
            raise TestSkipped("Transport %s does not support hardlinks." %
1106
1102
                              self._server.__class__)
1118
1114
        try:
1119
1115
            t.symlink(source_name, link_name)
1120
1116
 
1121
 
            self.failUnless(t.has(source_name))
1122
 
            self.failUnless(t.has(link_name))
 
1117
            self.assertTrue(t.has(source_name))
 
1118
            self.assertTrue(t.has(link_name))
1123
1119
 
1124
1120
            st = t.stat(link_name)
1125
 
            self.failUnless(S_ISLNK(st.st_mode))
 
1121
            self.assertTrue(S_ISLNK(st.st_mode),
 
1122
                "expected symlink, got mode %o" % st.st_mode)
1126
1123
        except TransportNotPossible:
1127
1124
            raise TestSkipped("Transport %s does not support symlinks." %
1128
1125
                              self._server.__class__)
1129
1126
        except IOError:
1130
 
            raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
 
1127
            self.knownFailure("Paramiko fails to create symlinks during tests")
1131
1128
 
1132
1129
    def test_list_dir(self):
1133
1130
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1197
1194
            raise TestSkipped("not a connected transport")
1198
1195
 
1199
1196
        t2 = t1.clone('subdir')
1200
 
        self.assertEquals(t1._scheme, t2._scheme)
1201
 
        self.assertEquals(t1._user, t2._user)
1202
 
        self.assertEquals(t1._password, t2._password)
1203
 
        self.assertEquals(t1._host, t2._host)
1204
 
        self.assertEquals(t1._port, t2._port)
 
1197
        self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1198
        self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
 
1199
        self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
 
1200
        self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
 
1201
        self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
1205
1202
 
1206
1203
    def test__reuse_for(self):
1207
1204
        t = self.get_transport()
1214
1211
 
1215
1212
            Only the parameters different from None will be changed.
1216
1213
            """
1217
 
            if scheme   is None: scheme   = t._scheme
1218
 
            if user     is None: user     = t._user
1219
 
            if password is None: password = t._password
1220
 
            if user     is None: user     = t._user
1221
 
            if host     is None: host     = t._host
1222
 
            if port     is None: port     = t._port
1223
 
            if path     is None: path     = t._path
1224
 
            return t._unsplit_url(scheme, user, password, host, port, path)
 
1214
            if scheme   is None: scheme   = t._parsed_url.scheme
 
1215
            if user     is None: user     = t._parsed_url.user
 
1216
            if password is None: password = t._parsed_url.password
 
1217
            if user     is None: user     = t._parsed_url.user
 
1218
            if host     is None: host     = t._parsed_url.host
 
1219
            if port     is None: port     = t._parsed_url.port
 
1220
            if path     is None: path     = t._parsed_url.path
 
1221
            return str(urlutils.URL(scheme, user, password, host, port, path))
1225
1222
 
1226
 
        if t._scheme == 'ftp':
 
1223
        if t._parsed_url.scheme == 'ftp':
1227
1224
            scheme = 'sftp'
1228
1225
        else:
1229
1226
            scheme = 'ftp'
1230
1227
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1231
 
        if t._user == 'me':
 
1228
        if t._parsed_url.user == 'me':
1232
1229
            user = 'you'
1233
1230
        else:
1234
1231
            user = 'me'
1245
1242
        #   (they may be typed by the user when prompted for example)
1246
1243
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1247
1244
        # We will not connect, we can use an invalid host
1248
 
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1249
 
        if t._port == 1234:
 
1245
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
 
1246
        if t._parsed_url.port == 1234:
1250
1247
            port = 4321
1251
1248
        else:
1252
1249
            port = 1234
1265
1262
        self.assertIs(t._get_connection(), c._get_connection())
1266
1263
 
1267
1264
        # Temporary failure, we need to create a new dummy connection
1268
 
        new_connection = object()
 
1265
        new_connection = None
1269
1266
        t._set_connection(new_connection)
1270
1267
        # Check that both transports use the same connection
1271
1268
        self.assertIs(new_connection, t._get_connection())
1293
1290
 
1294
1291
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1295
1292
 
1296
 
        self.failUnless(t1.has('a'))
1297
 
        self.failUnless(t1.has('b/c'))
1298
 
        self.failIf(t1.has('c'))
 
1293
        self.assertTrue(t1.has('a'))
 
1294
        self.assertTrue(t1.has('b/c'))
 
1295
        self.assertFalse(t1.has('c'))
1299
1296
 
1300
1297
        t2 = t1.clone('b')
1301
1298
        self.assertEqual(t1.base + 'b/', t2.base)
1302
1299
 
1303
 
        self.failUnless(t2.has('c'))
1304
 
        self.failIf(t2.has('a'))
 
1300
        self.assertTrue(t2.has('c'))
 
1301
        self.assertFalse(t2.has('a'))
1305
1302
 
1306
1303
        t3 = t2.clone('..')
1307
 
        self.failUnless(t3.has('a'))
1308
 
        self.failIf(t3.has('c'))
 
1304
        self.assertTrue(t3.has('a'))
 
1305
        self.assertFalse(t3.has('c'))
1309
1306
 
1310
 
        self.failIf(t1.has('b/d'))
1311
 
        self.failIf(t2.has('d'))
1312
 
        self.failIf(t3.has('b/d'))
 
1307
        self.assertFalse(t1.has('b/d'))
 
1308
        self.assertFalse(t2.has('d'))
 
1309
        self.assertFalse(t3.has('b/d'))
1313
1310
 
1314
1311
        if t1.is_readonly():
1315
1312
            self.build_tree_contents([('b/d', 'newfile\n')])
1316
1313
        else:
1317
1314
            t2.put_bytes('d', 'newfile\n')
1318
1315
 
1319
 
        self.failUnless(t1.has('b/d'))
1320
 
        self.failUnless(t2.has('d'))
1321
 
        self.failUnless(t3.has('b/d'))
 
1316
        self.assertTrue(t1.has('b/d'))
 
1317
        self.assertTrue(t2.has('d'))
 
1318
        self.assertTrue(t3.has('b/d'))
1322
1319
 
1323
1320
    def test_clone_to_root(self):
1324
1321
        orig_transport = self.get_transport()
1398
1395
        self.assertEqual(transport.clone("/").abspath('foo'),
1399
1396
                         transport.abspath("/foo"))
1400
1397
 
 
1398
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1401
1399
    def test_win32_abspath(self):
1402
1400
        # Note: we tried to set sys.platform='win32' so we could test on
1403
1401
        # other platforms too, but then osutils does platform specific
1408
1406
 
1409
1407
        # smoke test for abspath on win32.
1410
1408
        # a transport based on 'file:///' never fully qualifies the drive.
1411
 
        transport = get_transport("file:///")
1412
 
        self.failUnlessEqual(transport.abspath("/"), "file:///")
 
1409
        transport = _mod_transport.get_transport_from_url("file:///")
 
1410
        self.assertEqual(transport.abspath("/"), "file:///")
1413
1411
 
1414
1412
        # but a transport that starts with a drive spec must keep it.
1415
 
        transport = get_transport("file:///C:/")
1416
 
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
 
1413
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1414
        self.assertEqual(transport.abspath("/"), "file:///C:/")
1417
1415
 
1418
1416
    def test_local_abspath(self):
1419
1417
        transport = self.get_transport()
1545
1543
 
1546
1544
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1547
1545
        if no_unicode_support:
1548
 
            raise tests.KnownFailure("test server cannot handle unicode paths")
 
1546
            self.knownFailure("test server cannot handle unicode paths")
1549
1547
 
1550
1548
        try:
1551
1549
            self.build_tree(files, transport=t, line_endings='binary')
1616
1614
    def test_readv(self):
1617
1615
        transport = self.get_transport()
1618
1616
        if transport.is_readonly():
1619
 
            file('a', 'w').write('0123456789')
 
1617
            with file('a', 'w') as f: f.write('0123456789')
1620
1618
        else:
1621
1619
            transport.put_bytes('a', '0123456789')
1622
1620
 
1632
1630
    def test_readv_out_of_order(self):
1633
1631
        transport = self.get_transport()
1634
1632
        if transport.is_readonly():
1635
 
            file('a', 'w').write('0123456789')
 
1633
            with file('a', 'w') as f: f.write('0123456789')
1636
1634
        else:
1637
1635
            transport.put_bytes('a', '01234567890')
1638
1636
 
1710
1708
        transport = self.get_transport()
1711
1709
        # test from observed failure case.
1712
1710
        if transport.is_readonly():
1713
 
            file('a', 'w').write('a'*1024*1024)
 
1711
            with file('a', 'w') as f: f.write('a'*1024*1024)
1714
1712
        else:
1715
1713
            transport.put_bytes('a', 'a'*1024*1024)
1716
1714
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1750
1748
    def test_readv_short_read(self):
1751
1749
        transport = self.get_transport()
1752
1750
        if transport.is_readonly():
1753
 
            file('a', 'w').write('0123456789')
 
1751
            with file('a', 'w') as f: f.write('0123456789')
1754
1752
        else:
1755
1753
            transport.put_bytes('a', '01234567890')
1756
1754
 
1765
1763
        # also raise a special error
1766
1764
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1767
1765
                              transport.readv, 'a', [(12,2)])
 
1766
 
 
1767
    def test_no_segment_parameters(self):
 
1768
        """Segment parameters should be stripped and stored in
 
1769
        transport.segment_parameters."""
 
1770
        transport = self.get_transport("foo")
 
1771
        self.assertEqual({}, transport.get_segment_parameters())
 
1772
 
 
1773
    def test_segment_parameters(self):
 
1774
        """Segment parameters should be stripped and stored in
 
1775
        transport.get_segment_parameters()."""
 
1776
        base_url = self._server.get_url()
 
1777
        parameters = {"key1": "val1", "key2": "val2"}
 
1778
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1779
        transport = _mod_transport.get_transport_from_url(url)
 
1780
        self.assertEqual(parameters, transport.get_segment_parameters())
 
1781
 
 
1782
    def test_set_segment_parameters(self):
 
1783
        """Segment parameters can be set and show up in base."""
 
1784
        transport = self.get_transport("foo")
 
1785
        orig_base = transport.base
 
1786
        transport.set_segment_parameter("arm", "board")
 
1787
        self.assertEqual("%s,arm=board" % orig_base, transport.base)
 
1788
        self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
 
1789
        transport.set_segment_parameter("arm", None)
 
1790
        transport.set_segment_parameter("nonexistant", None)
 
1791
        self.assertEqual({}, transport.get_segment_parameters())
 
1792
        self.assertEqual(orig_base, transport.base)
 
1793
 
 
1794
    def test_stat_symlink(self):
 
1795
        # if a transport points directly to a symlink (and supports symlinks
 
1796
        # at all) you can tell this.  helps with bug 32669.
 
1797
        t = self.get_transport()
 
1798
        try:
 
1799
            t.symlink('target', 'link')
 
1800
        except TransportNotPossible:
 
1801
            raise TestSkipped("symlinks not supported")
 
1802
        t2 = t.clone('link')
 
1803
        st = t2.stat('')
 
1804
        self.assertTrue(stat.S_ISLNK(st.st_mode))
 
1805
 
 
1806
    def test_abspath_url_unquote_unreserved(self):
 
1807
        """URLs from abspath should have unreserved characters unquoted
 
1808
        
 
1809
        Need consistent quoting notably for tildes, see lp:842223 for more.
 
1810
        """
 
1811
        t = self.get_transport()
 
1812
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1813
        self.assertEqual(t.base + "-.09AZ_az~",
 
1814
            t.abspath(needlessly_escaped_dir))
 
1815
 
 
1816
    def test_clone_url_unquote_unreserved(self):
 
1817
        """Base URL of a cloned branch needs unreserved characters unquoted
 
1818
        
 
1819
        Cloned transports should be prefix comparable for things like the
 
1820
        isolation checking of tests, see lp:842223 for more.
 
1821
        """
 
1822
        t1 = self.get_transport()
 
1823
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1824
        self.build_tree([needlessly_escaped_dir], transport=t1)
 
1825
        t2 = t1.clone(needlessly_escaped_dir)
 
1826
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
 
1827
 
 
1828
    def test_hook_post_connection_one(self):
 
1829
        """Fire post_connect hook after a ConnectedTransport is first used"""
 
1830
        log = []
 
1831
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1832
        t = self.get_transport()
 
1833
        self.assertEqual([], log)
 
1834
        t.has("non-existant")
 
1835
        if isinstance(t, RemoteTransport):
 
1836
            self.assertEqual([t.get_smart_medium()], log)
 
1837
        elif isinstance(t, ConnectedTransport):
 
1838
            self.assertEqual([t], log)
 
1839
        else:
 
1840
            self.assertEqual([], log)
 
1841
 
 
1842
    def test_hook_post_connection_multi(self):
 
1843
        """Fire post_connect hook once per unshared underlying connection"""
 
1844
        log = []
 
1845
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1846
        t1 = self.get_transport()
 
1847
        t2 = t1.clone(".")
 
1848
        t3 = self.get_transport()
 
1849
        self.assertEqual([], log)
 
1850
        t1.has("x")
 
1851
        t2.has("x")
 
1852
        t3.has("x")
 
1853
        if isinstance(t1, RemoteTransport):
 
1854
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
 
1855
        elif isinstance(t1, ConnectedTransport):
 
1856
            self.assertEqual([t1, t3], log)
 
1857
        else:
 
1858
            self.assertEqual([], log)