/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport_implementations.py

  • Committer: Martin Pool
  • Date: 2007-09-14 06:31:28 UTC
  • mfrom: (2822 +trunk)
  • mto: This revision was merged to the branch mainline in revision 2823.
  • Revision ID: mbp@sourcefrog.net-20070914063128-0p7mh6zfb4pzdg9p
merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
45
45
                           )
46
46
from bzrlib.osutils import getcwd
47
47
from bzrlib.smart import medium
48
 
from bzrlib.symbol_versioning import zero_eleven
49
48
from bzrlib.tests import TestCaseInTempDir, TestScenarioApplier, TestSkipped
50
49
from bzrlib.tests.test_transport import TestTransportImplementation
51
 
from bzrlib.transport import memory, remote, _get_transport_modules
52
 
import bzrlib.transport
 
50
from bzrlib.transport import (
 
51
    ConnectedTransport,
 
52
    get_transport,
 
53
    _get_transport_modules,
 
54
    )
 
55
from bzrlib.transport.memory import MemoryTransport
53
56
 
54
57
 
55
58
class TransportTestProviderAdapter(TestScenarioApplier):
135
138
        t_b = t_a.clone('b')
136
139
        self.assertRaises(NoSuchFile, t_b.ensure_base)
137
140
 
 
141
    def test_external_url(self):
 
142
        """.external_url either works or raises InProcessTransport."""
 
143
        t = self.get_transport()
 
144
        try:
 
145
            t.external_url()
 
146
        except errors.InProcessTransport:
 
147
            pass
 
148
 
138
149
    def test_has(self):
139
150
        t = self.get_transport()
140
151
 
218
229
 
219
230
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
220
231
 
221
 
    def test_put(self):
222
 
        t = self.get_transport()
223
 
 
224
 
        if t.is_readonly():
225
 
            return
226
 
 
227
 
        self.applyDeprecated(zero_eleven, t.put, 'a', 'string\ncontents\n')
228
 
        self.check_transport_contents('string\ncontents\n', t, 'a')
229
 
 
230
 
        self.applyDeprecated(zero_eleven,
231
 
                             t.put, 'b', StringIO('file-like\ncontents\n'))
232
 
        self.check_transport_contents('file-like\ncontents\n', t, 'b')
233
 
 
234
 
        self.assertRaises(NoSuchFile,
235
 
            self.applyDeprecated,
236
 
            zero_eleven,
237
 
            t.put, 'path/doesnt/exist/c', StringIO('contents'))
 
232
    def test_get_with_open_write_stream_sees_all_content(self):
 
233
        t = self.get_transport()
 
234
        if t.is_readonly():
 
235
            return
 
236
        handle = t.open_write_stream('foo')
 
237
        try:
 
238
            handle.write('b')
 
239
            self.assertEqual('b', t.get('foo').read())
 
240
        finally:
 
241
            handle.close()
 
242
 
 
243
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
 
244
        t = self.get_transport()
 
245
        if t.is_readonly():
 
246
            return
 
247
        handle = t.open_write_stream('foo')
 
248
        try:
 
249
            handle.write('b')
 
250
            self.assertEqual('b', t.get_bytes('foo'))
 
251
            self.assertEqual('b', t.get('foo').read())
 
252
        finally:
 
253
            handle.close()
238
254
 
239
255
    def test_put_bytes(self):
240
256
        t = self.get_transport()
424
440
        # Yes, you can put a file such that it becomes readonly
425
441
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
426
442
        self.assertTransportMode(t, 'mode400', 0400)
427
 
 
428
 
        # XXX: put_multi is deprecated, so do we really care anymore?
429
 
        self.applyDeprecated(zero_eleven, t.put_multi,
430
 
                             [('mmode644', StringIO('text\n'))], mode=0644)
431
 
        self.assertTransportMode(t, 'mmode644', 0644)
432
 
 
433
443
        # The default permissions should be based on the current umask
434
444
        umask = osutils.get_umask()
435
445
        t.put_file('nomode', StringIO('test text\n'), mode=None)
499
509
        unicode_file = pyStringIO(u'\u1234')
500
510
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
501
511
 
502
 
    def test_put_multi(self):
503
 
        t = self.get_transport()
504
 
 
505
 
        if t.is_readonly():
506
 
            return
507
 
        self.assertEqual(2, self.applyDeprecated(zero_eleven,
508
 
            t.put_multi, [('a', StringIO('new\ncontents for\na\n')),
509
 
                          ('d', StringIO('contents\nfor d\n'))]
510
 
            ))
511
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd'])),
512
 
                [True, False, False, True])
513
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
514
 
        self.check_transport_contents('contents\nfor d\n', t, 'd')
515
 
 
516
 
        self.assertEqual(2, self.applyDeprecated(zero_eleven,
517
 
            t.put_multi, iter([('a', StringIO('diff\ncontents for\na\n')),
518
 
                              ('d', StringIO('another contents\nfor d\n'))])
519
 
            ))
520
 
        self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
521
 
        self.check_transport_contents('another contents\nfor d\n', t, 'd')
522
 
 
523
 
    def test_put_permissions(self):
524
 
        t = self.get_transport()
525
 
 
526
 
        if t.is_readonly():
527
 
            return
528
 
        if not t._can_roundtrip_unix_modebits():
529
 
            # Can't roundtrip, so no need to run this test
530
 
            return
531
 
        self.applyDeprecated(zero_eleven, t.put, 'mode644',
532
 
                             StringIO('test text\n'), mode=0644)
533
 
        self.assertTransportMode(t, 'mode644', 0644)
534
 
        self.applyDeprecated(zero_eleven, t.put, 'mode666',
535
 
                             StringIO('test text\n'), mode=0666)
536
 
        self.assertTransportMode(t, 'mode666', 0666)
537
 
        self.applyDeprecated(zero_eleven, t.put, 'mode600',
538
 
                             StringIO('test text\n'), mode=0600)
539
 
        self.assertTransportMode(t, 'mode600', 0600)
540
 
        # Yes, you can put a file such that it becomes readonly
541
 
        self.applyDeprecated(zero_eleven, t.put, 'mode400',
542
 
                             StringIO('test text\n'), mode=0400)
543
 
        self.assertTransportMode(t, 'mode400', 0400)
544
 
        self.applyDeprecated(zero_eleven, t.put_multi,
545
 
                             [('mmode644', StringIO('text\n'))], mode=0644)
546
 
        self.assertTransportMode(t, 'mmode644', 0644)
547
 
 
548
 
        # The default permissions should be based on the current umask
549
 
        umask = osutils.get_umask()
550
 
        self.applyDeprecated(zero_eleven, t.put, 'nomode',
551
 
                             StringIO('test text\n'), mode=None)
552
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
553
 
        
554
512
    def test_mkdir(self):
555
513
        t = self.get_transport()
556
514
 
621
579
        t.mkdir('dnomode', mode=None)
622
580
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
623
581
 
 
582
    def test_opening_a_file_stream_creates_file(self):
 
583
        t = self.get_transport()
 
584
        if t.is_readonly():
 
585
            return
 
586
        handle = t.open_write_stream('foo')
 
587
        try:
 
588
            self.assertEqual('', t.get_bytes('foo'))
 
589
        finally:
 
590
            handle.close()
 
591
 
 
592
    def test_opening_a_file_stream_can_set_mode(self):
 
593
        t = self.get_transport()
 
594
        if t.is_readonly():
 
595
            return
 
596
        if not t._can_roundtrip_unix_modebits():
 
597
            # Can't roundtrip, so no need to run this test
 
598
            return
 
599
        def check_mode(name, mode, expected):
 
600
            handle = t.open_write_stream(name, mode=mode)
 
601
            handle.close()
 
602
            self.assertTransportMode(t, name, expected)
 
603
        check_mode('mode644', 0644, 0644)
 
604
        check_mode('mode666', 0666, 0666)
 
605
        check_mode('mode600', 0600, 0600)
 
606
        # The default permissions should be based on the current umask
 
607
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
608
 
624
609
    def test_copy_to(self):
625
610
        # FIXME: test:   same server to same server (partly done)
626
611
        # same protocol two servers
627
612
        # and    different protocols (done for now except for MemoryTransport.
628
613
        # - RBC 20060122
629
 
        from bzrlib.transport.memory import MemoryTransport
630
614
 
631
615
        def simple_copy_files(transport_from, transport_to):
632
616
            files = ['a', 'b', 'c', 'd']
672
656
            for f in files:
673
657
                self.assertTransportMode(temp_transport, f, mode)
674
658
 
675
 
    def test_append(self):
676
 
        t = self.get_transport()
677
 
 
678
 
        if t.is_readonly():
679
 
            return
680
 
        t.put_bytes('a', 'diff\ncontents for\na\n')
681
 
        t.put_bytes('b', 'contents\nfor b\n')
682
 
 
683
 
        self.assertEqual(20, self.applyDeprecated(zero_eleven,
684
 
            t.append, 'a', StringIO('add\nsome\nmore\ncontents\n')))
685
 
 
686
 
        self.check_transport_contents(
687
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
688
 
            t, 'a')
689
 
 
690
 
        # And we can create new files, too
691
 
        self.assertEqual(0, self.applyDeprecated(zero_eleven,
692
 
            t.append, 'c', StringIO('some text\nfor a missing file\n')))
693
 
        self.check_transport_contents('some text\nfor a missing file\n',
694
 
                                      t, 'c')
695
659
    def test_append_file(self):
696
660
        t = self.get_transport()
697
661
 
855
819
        # plain "listdir".
856
820
        # self.assertEqual([], os.listdir('.'))
857
821
 
 
822
    def test_recommended_page_size(self):
 
823
        """Transports recommend a page size for partial access to files."""
 
824
        t = self.get_transport()
 
825
        self.assertIsInstance(t.recommended_page_size(), int)
 
826
 
858
827
    def test_rmdir(self):
859
828
        t = self.get_transport()
860
829
        # Not much to do with a readonly transport
1022
991
        except NotImplementedError:
1023
992
            raise TestSkipped("Transport %s has no bogus URL support." %
1024
993
                              self._server.__class__)
1025
 
        # This should be:  but SSH still connects on construction. No COOKIE!
1026
 
        # self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1027
 
        try:
1028
 
            t = bzrlib.transport.get_transport(url)
1029
 
            t.get('.bzr/branch')
1030
 
        except (ConnectionError, NoSuchFile), e:
1031
 
            pass
1032
 
        except (Exception), e:
1033
 
            self.fail('Wrong exception thrown (%s.%s): %s' 
1034
 
                        % (e.__class__.__module__, e.__class__.__name__, e))
1035
 
        else:
1036
 
            self.fail('Did not get the expected ConnectionError or NoSuchFile.')
 
994
        t = get_transport(url)
 
995
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1037
996
 
1038
997
    def test_stat(self):
1039
998
        # TODO: Test stat, just try once, and if it throws, stop testing
1129
1088
        self.assertEqual(['%25'], names)
1130
1089
        self.assertIsInstance(names[0], str)
1131
1090
 
 
1091
    def test_clone_preserve_info(self):
 
1092
        t1 = self.get_transport()
 
1093
        if not isinstance(t1, ConnectedTransport):
 
1094
            raise TestSkipped("not a connected transport")
 
1095
 
 
1096
        t2 = t1.clone('subdir')
 
1097
        self.assertEquals(t1._scheme, t2._scheme)
 
1098
        self.assertEquals(t1._user, t2._user)
 
1099
        self.assertEquals(t1._password, t2._password)
 
1100
        self.assertEquals(t1._host, t2._host)
 
1101
        self.assertEquals(t1._port, t2._port)
 
1102
 
 
1103
    def test__reuse_for(self):
 
1104
        t = self.get_transport()
 
1105
        if not isinstance(t, ConnectedTransport):
 
1106
            raise TestSkipped("not a connected transport")
 
1107
 
 
1108
        def new_url(scheme=None, user=None, password=None,
 
1109
                    host=None, port=None, path=None):
 
1110
            """Build a new url from t.base chaging only parts of it.
 
1111
 
 
1112
            Only the parameters different from None will be changed.
 
1113
            """
 
1114
            if scheme   is None: scheme   = t._scheme
 
1115
            if user     is None: user     = t._user
 
1116
            if password is None: password = t._password
 
1117
            if user     is None: user     = t._user
 
1118
            if host     is None: host     = t._host
 
1119
            if port     is None: port     = t._port
 
1120
            if path     is None: path     = t._path
 
1121
            return t._unsplit_url(scheme, user, password, host, port, path)
 
1122
 
 
1123
        self.assertIsNot(t, t._reuse_for(new_url(scheme='foo')))
 
1124
        if t._user == 'me':
 
1125
            user = 'you'
 
1126
        else:
 
1127
            user = 'me'
 
1128
        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
 
1129
        # passwords are not taken into account because:
 
1130
        # - it makes no sense to have two different valid passwords for the
 
1131
        #   same user
 
1132
        # - _password in ConnectedTransport is intended to collect what the
 
1133
        #   user specified from the command-line and there are cases where the
 
1134
        #   new url can contain no password (if the url was built from an
 
1135
        #   existing transport.base for example)
 
1136
        # - password are considered part of the credentials provided at
 
1137
        #   connection creation time and as such may not be present in the url
 
1138
        #   (they may be typed by the user when prompted for example)
 
1139
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
 
1140
        # We will not connect, we can use a invalid host
 
1141
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
 
1142
        if t._port == 1234:
 
1143
            port = 4321
 
1144
        else:
 
1145
            port = 1234
 
1146
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
 
1147
 
 
1148
    def test_connection_sharing(self):
 
1149
        t = self.get_transport()
 
1150
        if not isinstance(t, ConnectedTransport):
 
1151
            raise TestSkipped("not a connected transport")
 
1152
 
 
1153
        c = t.clone('subdir')
 
1154
        # Some transports will create the connection  only when needed
 
1155
        t.has('surely_not') # Force connection
 
1156
        self.assertIs(t._get_connection(), c._get_connection())
 
1157
 
 
1158
        # Temporary failure, we need to create a new dummy connection
 
1159
        new_connection = object()
 
1160
        t._set_connection(new_connection)
 
1161
        # Check that both transports use the same connection
 
1162
        self.assertIs(new_connection, t._get_connection())
 
1163
        self.assertIs(new_connection, c._get_connection())
 
1164
 
 
1165
    def test_reuse_connection_for_various_paths(self):
 
1166
        t = self.get_transport()
 
1167
        if not isinstance(t, ConnectedTransport):
 
1168
            raise TestSkipped("not a connected transport")
 
1169
 
 
1170
        t.has('surely_not') # Force connection
 
1171
        self.assertIsNot(None, t._get_connection())
 
1172
 
 
1173
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
 
1174
        self.assertIsNot(t, subdir)
 
1175
        self.assertIs(t._get_connection(), subdir._get_connection())
 
1176
 
 
1177
        home = subdir._reuse_for(t.base + 'home')
 
1178
        self.assertIs(t._get_connection(), home._get_connection())
 
1179
        self.assertIs(subdir._get_connection(), home._get_connection())
 
1180
 
1132
1181
    def test_clone(self):
1133
1182
        # TODO: Test that clone moves up and down the filesystem
1134
1183
        t1 = self.get_transport()
1227
1276
        # that have aliasing problems like symlinks should go in backend
1228
1277
        # specific test cases.
1229
1278
        transport = self.get_transport()
1230
 
        
 
1279
 
1231
1280
        self.assertEqual(transport.base + 'relpath',
1232
1281
                         transport.abspath('relpath'))
1233
1282
 
1353
1402
            self.check_transport_contents(contents, t, urlutils.escape(fname))
1354
1403
 
1355
1404
    def test_connect_twice_is_same_content(self):
1356
 
        # check that our server (whatever it is) is accessable reliably
 
1405
        # check that our server (whatever it is) is accessible reliably
1357
1406
        # via get_transport and multiple connections share content.
1358
1407
        transport = self.get_transport()
1359
1408
        if transport.is_readonly():
1360
1409
            return
1361
1410
        transport.put_bytes('foo', 'bar')
1362
 
        transport2 = self.get_transport()
1363
 
        self.check_transport_contents('bar', transport2, 'foo')
 
1411
        transport3 = self.get_transport()
 
1412
        self.check_transport_contents('bar', transport3, 'foo')
1364
1413
        # its base should be usable.
1365
 
        transport2 = bzrlib.transport.get_transport(transport.base)
1366
 
        self.check_transport_contents('bar', transport2, 'foo')
 
1414
        transport4 = get_transport(transport.base)
 
1415
        self.check_transport_contents('bar', transport4, 'foo')
1367
1416
 
1368
1417
        # now opening at a relative url should give use a sane result:
1369
1418
        transport.mkdir('newdir')
1370
 
        transport2 = bzrlib.transport.get_transport(transport.base + "newdir")
1371
 
        transport2 = transport2.clone('..')
1372
 
        self.check_transport_contents('bar', transport2, 'foo')
 
1419
        transport5 = get_transport(transport.base + "newdir")
 
1420
        transport6 = transport5.clone('..')
 
1421
        self.check_transport_contents('bar', transport6, 'foo')
1373
1422
 
1374
1423
    def test_lock_write(self):
1375
1424
        """Test transport-level write locks.
1436
1485
        self.assertEqual(d[2], (0, '0'))
1437
1486
        self.assertEqual(d[3], (3, '34'))
1438
1487
 
 
1488
    def test_get_with_open_write_stream_sees_all_content(self):
 
1489
        t = self.get_transport()
 
1490
        if t.is_readonly():
 
1491
            return
 
1492
        handle = t.open_write_stream('foo')
 
1493
        try:
 
1494
            handle.write('bcd')
 
1495
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
 
1496
        finally:
 
1497
            handle.close()
 
1498
 
1439
1499
    def test_get_smart_medium(self):
1440
1500
        """All transports must either give a smart medium, or know they can't.
1441
1501
        """