/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: Martin Pool
  • Date: 2009-12-14 06:06:59 UTC
  • mfrom: (4889 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4891.
  • Revision ID: mbp@sourcefrog.net-20091214060659-1ucv8hpnky0cbnaj
merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for Transport implementations.
18
18
 
48
48
from bzrlib.smart import medium
49
49
from bzrlib.tests import (
50
50
    TestCaseInTempDir,
51
 
    TestScenarioApplier,
52
51
    TestSkipped,
53
52
    TestNotApplicable,
 
53
    multiply_tests,
54
54
    )
55
55
from bzrlib.tests.test_transport import TestTransportImplementation
56
56
from bzrlib.transport import (
61
61
from bzrlib.transport.memory import MemoryTransport
62
62
 
63
63
 
64
 
class TransportTestProviderAdapter(TestScenarioApplier):
65
 
    """A tool to generate a suite testing all transports for a single test.
66
 
 
67
 
    This is done by copying the test once for each transport and injecting
68
 
    the transport_class and transport_server classes into each copy. Each copy
69
 
    is also given a new id() to make it easy to identify.
70
 
    """
71
 
 
72
 
    def __init__(self):
73
 
        self.scenarios = self._test_permutations()
74
 
 
75
 
    def get_transport_test_permutations(self, module):
76
 
        """Get the permutations module wants to have tested."""
77
 
        if getattr(module, 'get_test_permutations', None) is None:
78
 
            raise AssertionError(
79
 
                "transport module %s doesn't provide get_test_permutations()"
80
 
                % module.__name__)
81
 
            return []
82
 
        return module.get_test_permutations()
83
 
 
84
 
    def _test_permutations(self):
85
 
        """Return a list of the klass, server_factory pairs to test."""
86
 
        result = []
87
 
        for module in _get_transport_modules():
88
 
            try:
89
 
                permutations = self.get_transport_test_permutations(
90
 
                    reduce(getattr, (module).split('.')[1:], __import__(module)))
91
 
                for (klass, server_factory) in permutations:
92
 
                    scenario = (server_factory.__name__,
93
 
                        {"transport_class":klass,
94
 
                         "transport_server":server_factory})
95
 
                    result.append(scenario)
96
 
            except errors.DependencyNotPresent, e:
97
 
                # Continue even if a dependency prevents us 
98
 
                # from adding this test
99
 
                pass
100
 
        return result
 
64
def get_transport_test_permutations(module):
 
65
    """Get the permutations module wants to have tested."""
 
66
    if getattr(module, 'get_test_permutations', None) is None:
 
67
        raise AssertionError(
 
68
            "transport module %s doesn't provide get_test_permutations()"
 
69
            % module.__name__)
 
70
        return []
 
71
    return module.get_test_permutations()
 
72
 
 
73
 
 
74
def transport_test_permutations():
 
75
    """Return a list of the klass, server_factory pairs to test."""
 
76
    result = []
 
77
    for module in _get_transport_modules():
 
78
        try:
 
79
            permutations = get_transport_test_permutations(
 
80
                reduce(getattr, (module).split('.')[1:], __import__(module)))
 
81
            for (klass, server_factory) in permutations:
 
82
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
 
83
                    {"transport_class":klass,
 
84
                     "transport_server":server_factory})
 
85
                result.append(scenario)
 
86
        except errors.DependencyNotPresent, e:
 
87
            # Continue even if a dependency prevents us
 
88
            # from adding this test
 
89
            pass
 
90
    return result
101
91
 
102
92
 
103
93
def load_tests(standard_tests, module, loader):
104
94
    """Multiply tests for tranport implementations."""
105
95
    result = loader.suiteClass()
106
 
    adapter = TransportTestProviderAdapter()
107
 
    for test in tests.iter_suite_tests(standard_tests):
108
 
        result.addTests(adapter.adapt(test))
109
 
    return result
 
96
    scenarios = transport_test_permutations()
 
97
    return multiply_tests(standard_tests, scenarios, result)
110
98
 
111
99
 
112
100
class TransportTests(TestTransportImplementation):
167
155
        self.assertEqual(True, t.has('a'))
168
156
        self.assertEqual(False, t.has('c'))
169
157
        self.assertEqual(True, t.has(urlutils.escape('%')))
170
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
171
 
                [True, True, False, False, True, False, True, False])
 
158
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
 
159
                                           'e', 'f', 'g', 'h'])),
 
160
                         [True, True, False, False,
 
161
                          True, False, True, False])
172
162
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
173
 
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlutils.escape('%%')]))
174
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
175
 
                [True, True, False, False, True, False, True, False])
 
163
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
 
164
                                           urlutils.escape('%%')]))
 
165
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
 
166
                                                'e', 'f', 'g', 'h']))),
 
167
                         [True, True, False, False,
 
168
                          True, False, True, False])
176
169
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
177
170
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
178
171
 
210
203
        for content, f in itertools.izip(contents, content_f):
211
204
            self.assertEqual(content, f.read())
212
205
 
 
206
    def test_get_unknown_file(self):
 
207
        t = self.get_transport()
 
208
        files = ['a', 'b']
 
209
        contents = ['contents of a\n',
 
210
                    'contents of b\n',
 
211
                    ]
 
212
        self.build_tree(files, transport=t, line_endings='binary')
213
213
        self.assertRaises(NoSuchFile, t.get, 'c')
214
214
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
215
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
249
249
        for content, fname in zip(contents, files):
250
250
            self.assertEqual(content, t.get_bytes(fname))
251
251
 
 
252
    def test_get_bytes_unknown_file(self):
 
253
        t = self.get_transport()
 
254
 
252
255
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
253
256
 
254
257
    def test_get_with_open_write_stream_sees_all_content(self):
325
328
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
326
329
                               create_parent_dir=True)
327
330
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
328
 
        
 
331
 
329
332
        # But we still get NoSuchFile if we can't make the parent dir
330
333
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
331
334
                                       'contents\n',
353
356
        umask = osutils.get_umask()
354
357
        t.put_bytes('nomode', 'test text\n', mode=None)
355
358
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
356
 
        
 
359
 
357
360
    def test_put_bytes_non_atomic_permissions(self):
358
361
        t = self.get_transport()
359
362
 
387
390
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
388
391
                               dir_mode=0777, create_parent_dir=True)
389
392
        self.assertTransportMode(t, 'dir777', 0777)
390
 
        
 
393
 
391
394
    def test_put_file(self):
392
395
        t = self.get_transport()
393
396
 
441
444
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
442
445
                              create_parent_dir=True)
443
446
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
444
 
        
 
447
 
445
448
        # But we still get NoSuchFile if we can't make the parent dir
446
449
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
447
450
                                       StringIO('contents\n'),
469
472
        umask = osutils.get_umask()
470
473
        t.put_file('nomode', StringIO('test text\n'), mode=None)
471
474
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
472
 
        
 
475
 
473
476
    def test_put_file_non_atomic_permissions(self):
474
477
        t = self.get_transport()
475
478
 
492
495
        umask = osutils.get_umask()
493
496
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
494
497
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
495
 
        
 
498
 
496
499
        # We should also be able to set the mode for a parent directory
497
500
        # when it is created
498
501
        sio = StringIO()
538
541
        t = self.get_transport()
539
542
 
540
543
        if t.is_readonly():
541
 
            # cannot mkdir on readonly transports. We're not testing for 
 
544
            # cannot mkdir on readonly transports. We're not testing for
542
545
            # cache coherency because cache behaviour is not currently
543
546
            # defined for the transport interface.
544
547
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
565
568
 
566
569
        # we were testing that a local mkdir followed by a transport
567
570
        # mkdir failed thusly, but given that we * in one process * do not
568
 
        # concurrently fiddle with disk dirs and then use transport to do 
 
571
        # concurrently fiddle with disk dirs and then use transport to do
569
572
        # things, the win here seems marginal compared to the constraint on
570
573
        # the interface. RBC 20051227
571
574
        t.mkdir('dir_g')
681
684
            for f in files:
682
685
                self.assertTransportMode(temp_transport, f, mode)
683
686
 
 
687
    def test_create_prefix(self):
 
688
        t = self.get_transport()
 
689
        sub = t.clone('foo').clone('bar')
 
690
        try:
 
691
            sub.create_prefix()
 
692
        except TransportNotPossible:
 
693
            self.assertTrue(t.is_readonly())
 
694
        else:
 
695
            self.assertTrue(t.has('foo/bar'))
 
696
 
684
697
    def test_append_file(self):
685
698
        t = self.get_transport()
686
699
 
790
803
                t.append_file, 'f', StringIO('f'), mode=None)
791
804
            return
792
805
        t.append_file('f', StringIO('f'), mode=None)
793
 
        
 
806
 
794
807
    def test_append_bytes_mode(self):
795
808
        # check append_bytes accepts a mode
796
809
        t = self.get_transport()
799
812
                t.append_bytes, 'f', 'f', mode=None)
800
813
            return
801
814
        t.append_bytes('f', 'f', mode=None)
802
 
        
 
815
 
803
816
    def test_delete(self):
804
817
        # TODO: Test Transport.delete
805
818
        t = self.get_transport()
866
879
 
867
880
    def test_rmdir_not_empty(self):
868
881
        """Deleting a non-empty directory raises an exception
869
 
        
 
882
 
870
883
        sftp (and possibly others) don't give us a specific "directory not
871
884
        empty" exception -- we can just see that the operation failed.
872
885
        """
879
892
 
880
893
    def test_rmdir_empty_but_similar_prefix(self):
881
894
        """rmdir does not get confused by sibling paths.
882
 
        
 
895
 
883
896
        A naive implementation of MemoryTransport would refuse to rmdir
884
897
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
885
898
        uses "path.startswith(dir)" on all file paths to determine if directory
951
964
        except TransportNotPossible:
952
965
            # ok, this transport does not support delete_tree
953
966
            return
954
 
        
 
967
 
955
968
        # did it delete that trivial case?
956
969
        self.assertRaises(NoSuchFile, t.stat, 'adir')
957
970
 
958
971
        self.build_tree(['adir/',
959
 
                         'adir/file', 
960
 
                         'adir/subdir/', 
961
 
                         'adir/subdir/file', 
 
972
                         'adir/file',
 
973
                         'adir/subdir/',
 
974
                         'adir/subdir/file',
962
975
                         'adir/subdir2/',
963
976
                         'adir/subdir2/file',
964
977
                         ], transport=t)
1021
1034
 
1022
1035
    def test_connection_error(self):
1023
1036
        """ConnectionError is raised when connection is impossible.
1024
 
        
 
1037
 
1025
1038
        The error should be raised from the first operation on the transport.
1026
1039
        """
1027
1040
        try:
1045
1058
            return
1046
1059
 
1047
1060
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
1048
 
        sizes = [14, 0, 16, 0, 18] 
 
1061
        sizes = [14, 0, 16, 0, 18]
1049
1062
        self.build_tree(paths, transport=t, line_endings='binary')
1050
1063
 
1051
1064
        for path, size in zip(paths, sizes):
1073
1086
    def test_list_dir(self):
1074
1087
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1075
1088
        t = self.get_transport()
1076
 
        
 
1089
 
1077
1090
        if not t.listable():
1078
1091
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
1079
1092
            return
1109
1122
        else:
1110
1123
            os.unlink('c/d')
1111
1124
            os.unlink('b')
1112
 
            
 
1125
 
1113
1126
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
1114
1127
        self.assertEqual(['e'], sorted_list('c', t))
1115
1128
 
1116
1129
        self.assertListRaises(PathError, t.list_dir, 'q')
1117
1130
        self.assertListRaises(PathError, t.list_dir, 'c/f')
 
1131
        # 'a' is a file, list_dir should raise an error
1118
1132
        self.assertListRaises(PathError, t.list_dir, 'a')
1119
1133
 
1120
1134
    def test_list_dir_result_is_url_escaped(self):
1126
1140
            self.build_tree(['a/', 'a/%'], transport=t)
1127
1141
        else:
1128
1142
            self.build_tree(['a/', 'a/%'])
1129
 
        
 
1143
 
1130
1144
        names = list(t.list_dir('a'))
1131
1145
        self.assertEqual(['%25'], names)
1132
1146
        self.assertIsInstance(names[0], str)
1252
1266
        self.failIf(t3.has('b/d'))
1253
1267
 
1254
1268
        if t1.is_readonly():
1255
 
            open('b/d', 'wb').write('newfile\n')
 
1269
            self.build_tree_contents([('b/d', 'newfile\n')])
1256
1270
        else:
1257
1271
            t2.put_bytes('d', 'newfile\n')
1258
1272
 
1338
1352
        self.assertEqual(transport.clone("/").abspath('foo'),
1339
1353
                         transport.abspath("/foo"))
1340
1354
 
 
1355
    def test_win32_abspath(self):
 
1356
        # Note: we tried to set sys.platform='win32' so we could test on
 
1357
        # other platforms too, but then osutils does platform specific
 
1358
        # things at import time which defeated us...
 
1359
        if sys.platform != 'win32':
 
1360
            raise TestSkipped(
 
1361
                'Testing drive letters in abspath implemented only for win32')
 
1362
 
 
1363
        # smoke test for abspath on win32.
 
1364
        # a transport based on 'file:///' never fully qualifies the drive.
 
1365
        transport = get_transport("file:///")
 
1366
        self.failUnlessEqual(transport.abspath("/"), "file:///")
 
1367
 
 
1368
        # but a transport that starts with a drive spec must keep it.
 
1369
        transport = get_transport("file:///C:/")
 
1370
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
 
1371
 
1341
1372
    def test_local_abspath(self):
1342
1373
        transport = self.get_transport()
1343
1374
        try:
1420
1451
                         'to/dir/b%2525z',
1421
1452
                         'to/bar',]))
1422
1453
 
 
1454
    def test_copy_tree_to_transport(self):
 
1455
        transport = self.get_transport()
 
1456
        if not transport.listable():
 
1457
            self.assertRaises(TransportNotPossible,
 
1458
                              transport.iter_files_recursive)
 
1459
            return
 
1460
        if transport.is_readonly():
 
1461
            return
 
1462
        self.build_tree(['from/',
 
1463
                         'from/dir/',
 
1464
                         'from/dir/foo',
 
1465
                         'from/dir/bar',
 
1466
                         'from/dir/b%25z', # make sure quoting is correct
 
1467
                         'from/bar'],
 
1468
                        transport=transport)
 
1469
        from_transport = transport.clone('from')
 
1470
        to_transport = transport.clone('to')
 
1471
        to_transport.ensure_base()
 
1472
        from_transport.copy_tree_to_transport(to_transport)
 
1473
        paths = set(transport.iter_files_recursive())
 
1474
        self.assertEqual(paths,
 
1475
                    set(['from/dir/foo',
 
1476
                         'from/dir/bar',
 
1477
                         'from/dir/b%2525z',
 
1478
                         'from/bar',
 
1479
                         'to/dir/foo',
 
1480
                         'to/dir/bar',
 
1481
                         'to/dir/b%2525z',
 
1482
                         'to/bar',]))
 
1483
 
1423
1484
    def test_unicode_paths(self):
1424
1485
        """Test that we can read/write files with Unicode names."""
1425
1486
        t = self.get_transport()
1459
1520
        transport.put_bytes('foo', 'bar')
1460
1521
        transport3 = self.get_transport()
1461
1522
        self.check_transport_contents('bar', transport3, 'foo')
1462
 
        # its base should be usable.
1463
 
        transport4 = get_transport(transport.base)
1464
 
        self.check_transport_contents('bar', transport4, 'foo')
1465
1523
 
1466
1524
        # now opening at a relative url should give use a sane result:
1467
1525
        transport.mkdir('newdir')
1468
 
        transport5 = get_transport(transport.base + "newdir")
 
1526
        transport5 = self.get_transport('newdir')
1469
1527
        transport6 = transport5.clone('..')
1470
1528
        self.check_transport_contents('bar', transport6, 'foo')
1471
1529
 
1546
1604
        content = osutils.rand_bytes(200*1024)
1547
1605
        content_size = len(content)
1548
1606
        if transport.is_readonly():
1549
 
            file('a', 'w').write(content)
 
1607
            self.build_tree_contents([('a', content)])
1550
1608
        else:
1551
1609
            transport.put_bytes('a', content)
1552
1610
        def check_result_data(result_vector):
1597
1655
            self.assertTrue(result[0][0] <= 400)
1598
1656
            self.assertTrue(result[0][0] + data_len >= 1034)
1599
1657
            check_result_data(result)
 
1658
 
 
1659
    def test_readv_with_adjust_for_latency_with_big_file(self):
 
1660
        transport = self.get_transport()
1600
1661
        # test from observed failure case.
1601
1662
        if transport.is_readonly():
1602
1663
            file('a', 'w').write('a'*1024*1024)