/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Vincent Ladeuil
  • Date: 2011-07-15 13:52:32 UTC
  • mto: This revision was merged to the branch mainline in revision 6029.
  • Revision ID: v.ladeuil+lp@free.fr-20110715135232-yperkg15pllj369s
Also delete show_one_log, which wasn't properly marked as deprecated but is not used anywhere in the code base and is simple enough to be inlined if needed.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
 
18
18
from cStringIO import StringIO
19
 
import os
20
19
import subprocess
21
20
import sys
22
21
import threading
31
30
from bzrlib.transport import (
32
31
    chroot,
33
32
    fakenfs,
 
33
    http,
34
34
    local,
35
35
    memory,
36
36
    pathfilter,
48
48
class TestTransport(tests.TestCase):
49
49
    """Test the non transport-concrete class functionality."""
50
50
 
51
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
52
 
    # of try/finally -- vila 20100205
53
 
 
54
51
    def test__get_set_protocol_handlers(self):
55
52
        handlers = transport._get_protocol_handlers()
56
 
        self.assertNotEqual([], handlers.keys( ))
57
 
        try:
58
 
            transport._clear_protocol_handlers()
59
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
60
 
        finally:
61
 
            transport._set_protocol_handlers(handlers)
 
53
        self.assertNotEqual([], handlers.keys())
 
54
        transport._clear_protocol_handlers()
 
55
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
56
        self.assertEqual([], transport._get_protocol_handlers().keys())
62
57
 
63
58
    def test_get_transport_modules(self):
64
59
        handlers = transport._get_protocol_handlers()
 
60
        self.addCleanup(transport._set_protocol_handlers, handlers)
65
61
        # don't pollute the current handlers
66
62
        transport._clear_protocol_handlers()
 
63
 
67
64
        class SampleHandler(object):
68
65
            """I exist, isnt that enough?"""
69
 
        try:
70
 
            transport._clear_protocol_handlers()
71
 
            transport.register_transport_proto('foo')
72
 
            transport.register_lazy_transport('foo',
73
 
                                              'bzrlib.tests.test_transport',
74
 
                                              'TestTransport.SampleHandler')
75
 
            transport.register_transport_proto('bar')
76
 
            transport.register_lazy_transport('bar',
77
 
                                              'bzrlib.tests.test_transport',
78
 
                                              'TestTransport.SampleHandler')
79
 
            self.assertEqual([SampleHandler.__module__,
80
 
                              'bzrlib.transport.chroot',
81
 
                              'bzrlib.transport.pathfilter'],
82
 
                             transport._get_transport_modules())
83
 
        finally:
84
 
            transport._set_protocol_handlers(handlers)
 
66
        transport._clear_protocol_handlers()
 
67
        transport.register_transport_proto('foo')
 
68
        transport.register_lazy_transport('foo',
 
69
                                            'bzrlib.tests.test_transport',
 
70
                                            'TestTransport.SampleHandler')
 
71
        transport.register_transport_proto('bar')
 
72
        transport.register_lazy_transport('bar',
 
73
                                            'bzrlib.tests.test_transport',
 
74
                                            'TestTransport.SampleHandler')
 
75
        self.assertEqual([SampleHandler.__module__,
 
76
                            'bzrlib.transport.chroot',
 
77
                            'bzrlib.transport.pathfilter'],
 
78
                            transport._get_transport_modules())
85
79
 
86
80
    def test_transport_dependency(self):
87
81
        """Transport with missing dependency causes no error"""
88
82
        saved_handlers = transport._get_protocol_handlers()
 
83
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
89
84
        # don't pollute the current handlers
90
85
        transport._clear_protocol_handlers()
 
86
        transport.register_transport_proto('foo')
 
87
        transport.register_lazy_transport(
 
88
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
91
89
        try:
92
 
            transport.register_transport_proto('foo')
93
 
            transport.register_lazy_transport(
94
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
95
 
            try:
96
 
                transport.get_transport('foo://fooserver/foo')
97
 
            except errors.UnsupportedProtocol, e:
98
 
                e_str = str(e)
99
 
                self.assertEquals('Unsupported protocol'
100
 
                                  ' for url "foo://fooserver/foo":'
101
 
                                  ' Unable to import library "some_lib":'
102
 
                                  ' testing missing dependency', str(e))
103
 
            else:
104
 
                self.fail('Did not raise UnsupportedProtocol')
105
 
        finally:
106
 
            # restore original values
107
 
            transport._set_protocol_handlers(saved_handlers)
 
90
            transport.get_transport('foo://fooserver/foo')
 
91
        except errors.UnsupportedProtocol, e:
 
92
            e_str = str(e)
 
93
            self.assertEquals('Unsupported protocol'
 
94
                                ' for url "foo://fooserver/foo":'
 
95
                                ' Unable to import library "some_lib":'
 
96
                                ' testing missing dependency', str(e))
 
97
        else:
 
98
            self.fail('Did not raise UnsupportedProtocol')
108
99
 
109
100
    def test_transport_fallback(self):
110
101
        """Transport with missing dependency causes no error"""
111
102
        saved_handlers = transport._get_protocol_handlers()
112
 
        try:
113
 
            transport._clear_protocol_handlers()
114
 
            transport.register_transport_proto('foo')
115
 
            transport.register_lazy_transport(
116
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
117
 
            transport.register_lazy_transport(
118
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
119
 
            t = transport.get_transport('foo://fooserver/foo')
120
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
121
 
        finally:
122
 
            transport._set_protocol_handlers(saved_handlers)
 
103
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
104
        transport._clear_protocol_handlers()
 
105
        transport.register_transport_proto('foo')
 
106
        transport.register_lazy_transport(
 
107
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
108
        transport.register_lazy_transport(
 
109
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
110
        t = transport.get_transport('foo://fooserver/foo')
 
111
        self.assertTrue(isinstance(t, BackupTransportHandler))
123
112
 
124
113
    def test_ssh_hints(self):
125
114
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
218
207
 
219
208
    def test_coalesce_fudge(self):
220
209
        self.check([(10, 30, [(0, 10), (20, 10)]),
221
 
                    (100, 10, [(0, 10),]),
 
210
                    (100, 10, [(0, 10)]),
222
211
                   ], [(10, 10), (30, 10), (100, 10)],
223
 
                   fudge=10
224
 
                  )
 
212
                   fudge=10)
 
213
 
225
214
    def test_coalesce_max_size(self):
226
215
        self.check([(10, 20, [(0, 10), (10, 10)]),
227
216
                    (30, 50, [(0, 50)]),
228
217
                    # If one range is above max_size, it gets its own coalesced
229
218
                    # offset
230
 
                    (100, 80, [(0, 80),]),],
 
219
                    (100, 80, [(0, 80)]),],
231
220
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
232
 
                   max_size=50
233
 
                  )
 
221
                   max_size=50)
234
222
 
235
223
    def test_coalesce_no_max_size(self):
236
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
224
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
237
225
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
238
226
                  )
239
227
 
240
228
    def test_coalesce_default_limit(self):
241
229
        # By default we use a 100MB max size.
242
 
        ten_mb = 10*1024*1024
243
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
230
        ten_mb = 10 * 1024 * 1024
 
231
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
244
232
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
245
233
                   [(i*ten_mb, ten_mb) for i in range(11)])
246
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
247
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
234
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
235
                   [(i * ten_mb, ten_mb) for i in range(11)],
248
236
                   max_size=1*1024*1024*1024)
249
237
 
250
238
 
422
410
            parent_url = urlutils.join(url, '..')
423
411
            new_t = transport.get_transport(parent_url)
424
412
        """
425
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
413
        server = chroot.ChrootServer(
 
414
            transport.get_transport('memory:///path/'))
426
415
        self.start_server(server)
427
416
        t = transport.get_transport(server.get_url())
428
417
        self.assertRaises(
440
429
        backing_transport = memory.MemoryTransport()
441
430
        server = chroot.ChrootServer(backing_transport)
442
431
        server.start_server()
443
 
        try:
444
 
            self.assertTrue(server.scheme
445
 
                            in transport._get_protocol_handlers().keys())
446
 
        finally:
447
 
            server.stop_server()
 
432
        self.addCleanup(server.stop_server)
 
433
        self.assertTrue(server.scheme
 
434
                        in transport._get_protocol_handlers().keys())
448
435
 
449
436
    def test_stop_server(self):
450
437
        backing_transport = memory.MemoryTransport()
458
445
        backing_transport = memory.MemoryTransport()
459
446
        server = chroot.ChrootServer(backing_transport)
460
447
        server.start_server()
461
 
        try:
462
 
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
463
 
        finally:
464
 
            server.stop_server()
 
448
        self.addCleanup(server.stop_server)
 
449
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
465
450
 
466
451
 
467
452
class PathFilteringDecoratorTransportTest(tests.TestCase):
482
467
 
483
468
    def make_pf_transport(self, filter_func=None):
484
469
        """Make a PathFilteringTransport backed by a MemoryTransport.
485
 
        
 
470
 
486
471
        :param filter_func: by default this will be a no-op function.  Use this
487
472
            parameter to override it."""
488
473
        if filter_func is None:
510
495
 
511
496
    def test_filter_invocation(self):
512
497
        filter_log = []
 
498
 
513
499
        def filter(path):
514
500
            filter_log.append(path)
515
501
            return path
563
549
        server = HttpServer()
564
550
        self.start_server(server)
565
551
        t = transport.get_transport('readonly+' + server.get_url())
566
 
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
 
552
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
567
553
        self.assertEqual(False, t.listable())
568
554
        self.assertEqual(True, t.is_readonly())
569
555
 
746
732
        self.assertEquals(t._host, 'simple.example.com')
747
733
        self.assertEquals(t._port, None)
748
734
        self.assertEquals(t._path, '/home/source/')
749
 
        self.failUnless(t._user is None)
750
 
        self.failUnless(t._password is None)
 
735
        self.assertTrue(t._user is None)
 
736
        self.assertTrue(t._password is None)
751
737
 
752
738
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
753
739
 
776
762
    def test_relpath(self):
777
763
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
778
764
 
779
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
765
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
 
766
            'sub')
780
767
        self.assertRaises(errors.PathNotChild, t.relpath,
781
768
                          'http://user@host.com/abs/path/sub')
782
769
        self.assertRaises(errors.PathNotChild, t.relpath,
855
842
 
856
843
    def test_get(self):
857
844
        t = transport.get_transport('trace+memory://')
858
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
845
        self.assertIsInstance(
 
846
            t, bzrlib.transport.trace.TransportTraceDecorator)
859
847
 
860
848
    def test_clone_preserves_activity(self):
861
849
        t = transport.get_transport('trace+memory://')
896
884
class TestSSHConnections(tests.TestCaseWithTransport):
897
885
 
898
886
    def test_bzr_connect_to_bzr_ssh(self):
899
 
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
887
        """get_transport of a bzr+ssh:// behaves correctly.
900
888
 
901
889
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
902
890
        """
918
906
        # SSH channel ourselves.  Surely this has already been implemented
919
907
        # elsewhere?
920
908
        started = []
 
909
 
921
910
        class StubSSHServer(stub_sftp.StubServer):
922
911
 
923
912
            test = self
929
918
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
930
919
 
931
920
                # XXX: horribly inefficient, not to mention ugly.
932
 
                # Start a thread for each of stdin/out/err, and relay bytes from
933
 
                # the subprocess to channel and vice versa.
 
921
                # Start a thread for each of stdin/out/err, and relay bytes
 
922
                # from the subprocess to channel and vice versa.
934
923
                def ferry_bytes(read, write, close):
935
924
                    while True:
936
925
                        bytes = read(1)
955
944
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
956
945
        # We *don't* want to override the default SSH vendor: the detected one
957
946
        # is the one to use.
 
947
 
 
948
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
 
949
        # inherits from SFTPServer which forces the SSH vendor to
 
950
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
958
951
        self.start_server(ssh_server)
959
 
        port = ssh_server._listener.port
 
952
        port = ssh_server.port
960
953
 
961
954
        if sys.platform == 'win32':
962
955
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
963
956
        else:
964
957
            bzr_remote_path = self.get_bzr_path()
965
 
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
 
958
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
966
959
 
967
960
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
968
961
        # variable is used to tell bzr what command to run on the remote end.
989
982
        # And the rest are threads
990
983
        for t in started[1:]:
991
984
            t.join()
 
985
 
 
986
 
 
987
class TestUnhtml(tests.TestCase):
 
988
 
 
989
    """Tests for unhtml_roughly"""
 
990
 
 
991
    def test_truncation(self):
 
992
        fake_html = "<p>something!\n" * 1000
 
993
        result = http.unhtml_roughly(fake_html)
 
994
        self.assertEquals(len(result), 1000)
 
995
        self.assertStartsWith(result, " something!")