/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2011-07-23 16:33:38 UTC
  • mto: This revision was merged to the branch mainline in revision 6045.
  • Revision ID: jelmer@samba.org-20110723163338-mjox55g52kt3u922
Add get_transport_from_url and get_transport_from_path functions.

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
 
18
18
from cStringIO import StringIO
19
 
import os
20
19
import subprocess
21
20
import sys
22
21
import threading
28
27
    transport,
29
28
    urlutils,
30
29
    )
 
30
from bzrlib.directory_service import directories
31
31
from bzrlib.transport import (
32
32
    chroot,
33
33
    fakenfs,
 
34
    http,
34
35
    local,
 
36
    location_to_url,
35
37
    memory,
36
38
    pathfilter,
37
39
    readonly,
38
40
    )
 
41
import bzrlib.transport.trace
39
42
from bzrlib.tests import (
40
43
    features,
41
44
    test_server,
48
51
class TestTransport(tests.TestCase):
49
52
    """Test the non transport-concrete class functionality."""
50
53
 
51
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
52
 
    # of try/finally -- vila 20100205
53
 
 
54
54
    def test__get_set_protocol_handlers(self):
55
55
        handlers = transport._get_protocol_handlers()
56
 
        self.assertNotEqual([], handlers.keys( ))
57
 
        try:
58
 
            transport._clear_protocol_handlers()
59
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
60
 
        finally:
61
 
            transport._set_protocol_handlers(handlers)
 
56
        self.assertNotEqual([], handlers.keys())
 
57
        transport._clear_protocol_handlers()
 
58
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
59
        self.assertEqual([], transport._get_protocol_handlers().keys())
62
60
 
63
61
    def test_get_transport_modules(self):
64
62
        handlers = transport._get_protocol_handlers()
 
63
        self.addCleanup(transport._set_protocol_handlers, handlers)
65
64
        # don't pollute the current handlers
66
65
        transport._clear_protocol_handlers()
 
66
 
67
67
        class SampleHandler(object):
68
68
            """I exist, isnt that enough?"""
69
 
        try:
70
 
            transport._clear_protocol_handlers()
71
 
            transport.register_transport_proto('foo')
72
 
            transport.register_lazy_transport('foo',
73
 
                                              'bzrlib.tests.test_transport',
74
 
                                              'TestTransport.SampleHandler')
75
 
            transport.register_transport_proto('bar')
76
 
            transport.register_lazy_transport('bar',
77
 
                                              'bzrlib.tests.test_transport',
78
 
                                              'TestTransport.SampleHandler')
79
 
            self.assertEqual([SampleHandler.__module__,
80
 
                              'bzrlib.transport.chroot',
81
 
                              'bzrlib.transport.pathfilter'],
82
 
                             transport._get_transport_modules())
83
 
        finally:
84
 
            transport._set_protocol_handlers(handlers)
 
69
        transport._clear_protocol_handlers()
 
70
        transport.register_transport_proto('foo')
 
71
        transport.register_lazy_transport('foo',
 
72
                                            'bzrlib.tests.test_transport',
 
73
                                            'TestTransport.SampleHandler')
 
74
        transport.register_transport_proto('bar')
 
75
        transport.register_lazy_transport('bar',
 
76
                                            'bzrlib.tests.test_transport',
 
77
                                            'TestTransport.SampleHandler')
 
78
        self.assertEqual([SampleHandler.__module__,
 
79
                            'bzrlib.transport.chroot',
 
80
                            'bzrlib.transport.pathfilter'],
 
81
                            transport._get_transport_modules())
85
82
 
86
83
    def test_transport_dependency(self):
87
84
        """Transport with missing dependency causes no error"""
88
85
        saved_handlers = transport._get_protocol_handlers()
 
86
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
89
87
        # don't pollute the current handlers
90
88
        transport._clear_protocol_handlers()
 
89
        transport.register_transport_proto('foo')
 
90
        transport.register_lazy_transport(
 
91
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
91
92
        try:
92
 
            transport.register_transport_proto('foo')
93
 
            transport.register_lazy_transport(
94
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
95
 
            try:
96
 
                transport.get_transport('foo://fooserver/foo')
97
 
            except errors.UnsupportedProtocol, e:
98
 
                e_str = str(e)
99
 
                self.assertEquals('Unsupported protocol'
100
 
                                  ' for url "foo://fooserver/foo":'
101
 
                                  ' Unable to import library "some_lib":'
102
 
                                  ' testing missing dependency', str(e))
103
 
            else:
104
 
                self.fail('Did not raise UnsupportedProtocol')
105
 
        finally:
106
 
            # restore original values
107
 
            transport._set_protocol_handlers(saved_handlers)
 
93
            transport.get_transport('foo://fooserver/foo')
 
94
        except errors.UnsupportedProtocol, e:
 
95
            e_str = str(e)
 
96
            self.assertEquals('Unsupported protocol'
 
97
                                ' for url "foo://fooserver/foo":'
 
98
                                ' Unable to import library "some_lib":'
 
99
                                ' testing missing dependency', str(e))
 
100
        else:
 
101
            self.fail('Did not raise UnsupportedProtocol')
108
102
 
109
103
    def test_transport_fallback(self):
110
104
        """Transport with missing dependency causes no error"""
111
105
        saved_handlers = transport._get_protocol_handlers()
112
 
        try:
113
 
            transport._clear_protocol_handlers()
114
 
            transport.register_transport_proto('foo')
115
 
            transport.register_lazy_transport(
116
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
117
 
            transport.register_lazy_transport(
118
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
119
 
            t = transport.get_transport('foo://fooserver/foo')
120
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
121
 
        finally:
122
 
            transport._set_protocol_handlers(saved_handlers)
 
106
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
107
        transport._clear_protocol_handlers()
 
108
        transport.register_transport_proto('foo')
 
109
        transport.register_lazy_transport(
 
110
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
111
        transport.register_lazy_transport(
 
112
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
113
        t = transport.get_transport('foo://fooserver/foo')
 
114
        self.assertTrue(isinstance(t, BackupTransportHandler))
123
115
 
124
116
    def test_ssh_hints(self):
125
117
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
218
210
 
219
211
    def test_coalesce_fudge(self):
220
212
        self.check([(10, 30, [(0, 10), (20, 10)]),
221
 
                    (100, 10, [(0, 10),]),
 
213
                    (100, 10, [(0, 10)]),
222
214
                   ], [(10, 10), (30, 10), (100, 10)],
223
 
                   fudge=10
224
 
                  )
 
215
                   fudge=10)
 
216
 
225
217
    def test_coalesce_max_size(self):
226
218
        self.check([(10, 20, [(0, 10), (10, 10)]),
227
219
                    (30, 50, [(0, 50)]),
228
220
                    # If one range is above max_size, it gets its own coalesced
229
221
                    # offset
230
 
                    (100, 80, [(0, 80),]),],
 
222
                    (100, 80, [(0, 80)]),],
231
223
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
232
 
                   max_size=50
233
 
                  )
 
224
                   max_size=50)
234
225
 
235
226
    def test_coalesce_no_max_size(self):
236
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
227
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
237
228
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
238
229
                  )
239
230
 
240
231
    def test_coalesce_default_limit(self):
241
232
        # By default we use a 100MB max size.
242
 
        ten_mb = 10*1024*1024
243
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
233
        ten_mb = 10 * 1024 * 1024
 
234
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
244
235
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
245
236
                   [(i*ten_mb, ten_mb) for i in range(11)])
246
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
247
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
237
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
238
                   [(i * ten_mb, ten_mb) for i in range(11)],
248
239
                   max_size=1*1024*1024*1024)
249
240
 
250
241
 
422
413
            parent_url = urlutils.join(url, '..')
423
414
            new_t = transport.get_transport(parent_url)
424
415
        """
425
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
416
        server = chroot.ChrootServer(
 
417
            transport.get_transport('memory:///path/'))
426
418
        self.start_server(server)
427
419
        t = transport.get_transport(server.get_url())
428
420
        self.assertRaises(
440
432
        backing_transport = memory.MemoryTransport()
441
433
        server = chroot.ChrootServer(backing_transport)
442
434
        server.start_server()
443
 
        try:
444
 
            self.assertTrue(server.scheme
445
 
                            in transport._get_protocol_handlers().keys())
446
 
        finally:
447
 
            server.stop_server()
 
435
        self.addCleanup(server.stop_server)
 
436
        self.assertTrue(server.scheme
 
437
                        in transport._get_protocol_handlers().keys())
448
438
 
449
439
    def test_stop_server(self):
450
440
        backing_transport = memory.MemoryTransport()
458
448
        backing_transport = memory.MemoryTransport()
459
449
        server = chroot.ChrootServer(backing_transport)
460
450
        server.start_server()
461
 
        try:
462
 
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
463
 
        finally:
464
 
            server.stop_server()
 
451
        self.addCleanup(server.stop_server)
 
452
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
465
453
 
466
454
 
467
455
class PathFilteringDecoratorTransportTest(tests.TestCase):
482
470
 
483
471
    def make_pf_transport(self, filter_func=None):
484
472
        """Make a PathFilteringTransport backed by a MemoryTransport.
485
 
        
 
473
 
486
474
        :param filter_func: by default this will be a no-op function.  Use this
487
475
            parameter to override it."""
488
476
        if filter_func is None:
510
498
 
511
499
    def test_filter_invocation(self):
512
500
        filter_log = []
 
501
 
513
502
        def filter(path):
514
503
            filter_log.append(path)
515
504
            return path
563
552
        server = HttpServer()
564
553
        self.start_server(server)
565
554
        t = transport.get_transport('readonly+' + server.get_url())
566
 
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
 
555
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
567
556
        self.assertEqual(False, t.listable())
568
557
        self.assertEqual(True, t.is_readonly())
569
558
 
684
673
        base_url = self._server.get_url()
685
674
        url = self._adjust_url(base_url, relpath)
686
675
        # try getting the transport via the regular interface:
687
 
        t = transport.get_transport(url)
 
676
        t = transport.get_transport_from_url(url)
688
677
        # vila--20070607 if the following are commented out the test suite
689
678
        # still pass. Is this really still needed or was it a forgotten
690
679
        # temporary fix ?
746
735
        self.assertEquals(t._host, 'simple.example.com')
747
736
        self.assertEquals(t._port, None)
748
737
        self.assertEquals(t._path, '/home/source/')
749
 
        self.failUnless(t._user is None)
750
 
        self.failUnless(t._password is None)
 
738
        self.assertTrue(t._user is None)
 
739
        self.assertTrue(t._password is None)
751
740
 
752
741
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
753
742
 
776
765
    def test_relpath(self):
777
766
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
778
767
 
779
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
768
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
 
769
            'sub')
780
770
        self.assertRaises(errors.PathNotChild, t.relpath,
781
771
                          'http://user@host.com/abs/path/sub')
782
772
        self.assertRaises(errors.PathNotChild, t.relpath,
853
843
 
854
844
class TestTransportTrace(tests.TestCase):
855
845
 
856
 
    def test_get(self):
 
846
    def test_decorator(self):
857
847
        t = transport.get_transport('trace+memory://')
858
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
848
        self.assertIsInstance(
 
849
            t, bzrlib.transport.trace.TransportTraceDecorator)
859
850
 
860
851
    def test_clone_preserves_activity(self):
861
852
        t = transport.get_transport('trace+memory://')
896
887
class TestSSHConnections(tests.TestCaseWithTransport):
897
888
 
898
889
    def test_bzr_connect_to_bzr_ssh(self):
899
 
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
890
        """get_transport of a bzr+ssh:// behaves correctly.
900
891
 
901
892
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
902
893
        """
918
909
        # SSH channel ourselves.  Surely this has already been implemented
919
910
        # elsewhere?
920
911
        started = []
 
912
 
921
913
        class StubSSHServer(stub_sftp.StubServer):
922
914
 
923
915
            test = self
929
921
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
930
922
 
931
923
                # XXX: horribly inefficient, not to mention ugly.
932
 
                # Start a thread for each of stdin/out/err, and relay bytes from
933
 
                # the subprocess to channel and vice versa.
 
924
                # Start a thread for each of stdin/out/err, and relay bytes
 
925
                # from the subprocess to channel and vice versa.
934
926
                def ferry_bytes(read, write, close):
935
927
                    while True:
936
928
                        bytes = read(1)
955
947
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
956
948
        # We *don't* want to override the default SSH vendor: the detected one
957
949
        # is the one to use.
 
950
 
 
951
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
 
952
        # inherits from SFTPServer which forces the SSH vendor to
 
953
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
958
954
        self.start_server(ssh_server)
959
 
        port = ssh_server._listener.port
 
955
        port = ssh_server.port
960
956
 
961
957
        if sys.platform == 'win32':
962
958
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
963
959
        else:
964
960
            bzr_remote_path = self.get_bzr_path()
965
 
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
 
961
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
966
962
 
967
963
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
968
964
        # variable is used to tell bzr what command to run on the remote end.
989
985
        # And the rest are threads
990
986
        for t in started[1:]:
991
987
            t.join()
 
988
 
 
989
 
 
990
class TestUnhtml(tests.TestCase):
 
991
 
 
992
    """Tests for unhtml_roughly"""
 
993
 
 
994
    def test_truncation(self):
 
995
        fake_html = "<p>something!\n" * 1000
 
996
        result = http.unhtml_roughly(fake_html)
 
997
        self.assertEquals(len(result), 1000)
 
998
        self.assertStartsWith(result, " something!")
 
999
 
 
1000
 
 
1001
class SomeDirectory(object):
 
1002
 
 
1003
    def look_up(self, name, url):
 
1004
        return "http://bar"
 
1005
 
 
1006
 
 
1007
class TestLocationToUrl(tests.TestCase):
 
1008
 
 
1009
    def test_regular_url(self):
 
1010
        self.assertEquals("file://foo", location_to_url("file://foo"))
 
1011
 
 
1012
    def test_directory(self):
 
1013
        directories.register("bar:", SomeDirectory, "Dummy directory")
 
1014
        self.addCleanup(directories.remove, "bar:")
 
1015
        self.assertEquals("http://bar", location_to_url("bar:"))
 
1016
 
 
1017
    def test_unicode_url(self):
 
1018
        self.assertRaises(errors.InvalidURL, location_to_url,
 
1019
            "http://fo/\xc3\xaf".decode("utf-8"))
 
1020
 
 
1021
    def test_unicode_path(self):
 
1022
        self.assertEquals("file:///foo/bar%C3%AF",
 
1023
            location_to_url("/foo/bar\xc3\xaf".decode("utf-8")))
 
1024
 
 
1025
    def test_path(self):
 
1026
        self.assertEquals("file:///foo/bar", location_to_url("/foo/bar"))