/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2018-11-16 23:15:15 UTC
  • mfrom: (7180 work)
  • mto: This revision was merged to the branch mainline in revision 7183.
  • Revision ID: jelmer@jelmer.uk-20181116231515-zqd2yn6kj8lfydyp
Merge trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
73
73
        transport._clear_protocol_handlers()
74
74
        transport.register_transport_proto('foo')
75
75
        transport.register_lazy_transport('foo',
76
 
                                            'breezy.tests.test_transport',
77
 
                                            'TestTransport.SampleHandler')
 
76
                                          'breezy.tests.test_transport',
 
77
                                          'TestTransport.SampleHandler')
78
78
        transport.register_transport_proto('bar')
79
79
        transport.register_lazy_transport('bar',
80
 
                                            'breezy.tests.test_transport',
81
 
                                            'TestTransport.SampleHandler')
 
80
                                          'breezy.tests.test_transport',
 
81
                                          'TestTransport.SampleHandler')
82
82
        self.assertEqual([SampleHandler.__module__,
83
 
                            'breezy.transport.chroot',
84
 
                            'breezy.transport.pathfilter'],
85
 
                            transport._get_transport_modules())
 
83
                          'breezy.transport.chroot',
 
84
                          'breezy.transport.pathfilter'],
 
85
                         transport._get_transport_modules())
86
86
 
87
87
    def test_transport_dependency(self):
88
88
        """Transport with missing dependency causes no error"""
96
96
        try:
97
97
            transport.get_transport_from_url('foo://fooserver/foo')
98
98
        except errors.UnsupportedProtocol as e:
99
 
            e_str = str(e)
100
99
            self.assertEqual('Unsupported protocol'
101
 
                                ' for url "foo://fooserver/foo":'
102
 
                                ' Unable to import library "some_lib":'
103
 
                                ' testing missing dependency', str(e))
 
100
                             ' for url "foo://fooserver/foo":'
 
101
                             ' Unable to import library "some_lib":'
 
102
                             ' testing missing dependency', str(e))
104
103
        else:
105
104
            self.fail('Did not raise UnsupportedProtocol')
106
105
 
122
121
        try:
123
122
            transport.get_transport_from_url('ssh://fooserver/foo')
124
123
        except errors.UnsupportedProtocol as e:
125
 
            e_str = str(e)
126
124
            self.assertEqual('Unsupported protocol'
127
 
                              ' for url "ssh://fooserver/foo":'
128
 
                              ' bzr supports bzr+ssh to operate over ssh,'
129
 
                              ' use "bzr+ssh://fooserver/foo".',
130
 
                              str(e))
 
125
                             ' for url "ssh://fooserver/foo":'
 
126
                             ' bzr supports bzr+ssh to operate over ssh,'
 
127
                             ' use "bzr+ssh://fooserver/foo".',
 
128
                             str(e))
131
129
        else:
132
130
            self.fail('Did not raise UnsupportedProtocol')
133
131
 
166
164
    def test_coalesce_unrelated(self):
167
165
        self.check([(0, 10, [(0, 10)]),
168
166
                    (20, 10, [(0, 10)]),
169
 
                   ], [(0, 10), (20, 10)])
 
167
                    ], [(0, 10), (20, 10)])
170
168
 
171
169
    def test_coalesce_unsorted(self):
172
170
        self.check([(20, 10, [(0, 10)]),
173
171
                    (0, 10, [(0, 10)]),
174
 
                   ], [(20, 10), (0, 10)])
 
172
                    ], [(20, 10), (0, 10)])
175
173
 
176
174
    def test_coalesce_nearby(self):
177
175
        self.check([(0, 20, [(0, 10), (10, 10)])],
179
177
 
180
178
    def test_coalesce_overlapped(self):
181
179
        self.assertRaises(ValueError,
182
 
            self.check, [(0, 15, [(0, 10), (5, 10)])],
183
 
                        [(0, 10), (5, 10)])
 
180
                          self.check, [(0, 15, [(0, 10), (5, 10)])],
 
181
                          [(0, 10), (5, 10)])
184
182
 
185
183
    def test_coalesce_limit(self):
186
184
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
187
185
                              (30, 10), (40, 10)]),
188
186
                    (60, 50, [(0, 10), (10, 10), (20, 10),
189
187
                              (30, 10), (40, 10)]),
190
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
191
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
192
 
                       (90, 10), (100, 10)],
193
 
                    limit=5)
 
188
                    ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
189
                        (50, 10), (60, 10), (70, 10), (80, 10),
 
190
                        (90, 10), (100, 10)],
 
191
                   limit=5)
194
192
 
195
193
    def test_coalesce_no_limit(self):
196
194
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
197
195
                               (30, 10), (40, 10), (50, 10),
198
196
                               (60, 10), (70, 10), (80, 10),
199
197
                               (90, 10)]),
200
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
201
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
202
 
                       (90, 10), (100, 10)])
 
198
                    ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
199
                        (50, 10), (60, 10), (70, 10), (80, 10),
 
200
                        (90, 10), (100, 10)])
203
201
 
204
202
    def test_coalesce_fudge(self):
205
203
        self.check([(10, 30, [(0, 10), (20, 10)]),
206
204
                    (100, 10, [(0, 10)]),
207
 
                   ], [(10, 10), (30, 10), (100, 10)],
 
205
                    ], [(10, 10), (30, 10), (100, 10)],
208
206
                   fudge=10)
209
207
 
210
208
    def test_coalesce_max_size(self):
212
210
                    (30, 50, [(0, 50)]),
213
211
                    # If one range is above max_size, it gets its own coalesced
214
212
                    # offset
215
 
                    (100, 80, [(0, 80)]),],
 
213
                    (100, 80, [(0, 80)]), ],
216
214
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
217
215
                   max_size=50)
218
216
 
219
217
    def test_coalesce_no_max_size(self):
220
218
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
221
219
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
222
 
                  )
 
220
                   )
223
221
 
224
222
    def test_coalesce_default_limit(self):
225
223
        # By default we use a 100MB max size.
226
224
        ten_mb = 10 * 1024 * 1024
227
 
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
228
 
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
229
 
                   [(i*ten_mb, ten_mb) for i in range(11)])
230
 
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
231
 
                   [(i * ten_mb, ten_mb) for i in range(11)],
232
 
                   max_size=1*1024*1024*1024)
 
225
        self.check(
 
226
            [(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
 
227
             (10 * ten_mb, ten_mb, [(0, ten_mb)])],
 
228
            [(i * ten_mb, ten_mb) for i in range(11)])
 
229
        self.check(
 
230
            [(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
231
            [(i * ten_mb, ten_mb) for i in range(11)],
 
232
            max_size=1 * 1024 * 1024 * 1024)
233
233
 
234
234
 
235
235
class TestMemoryServer(tests.TestCase):
457
457
        """Check all expected transport hook points are set up"""
458
458
        hookpoint = transport.TransportHooks()
459
459
        self.assertTrue("post_connect" in hookpoint,
460
 
            "post_connect not in %s" % (hookpoint,))
 
460
                        "post_connect not in %s" % (hookpoint,))
461
461
 
462
462
    def test_post_connect(self):
463
463
        """Ensure the post_connect hook is called when _set_transport is"""
464
464
        calls = []
465
465
        transport.Transport.hooks.install_named_hook("post_connect",
466
 
            calls.append, None)
 
466
                                                     calls.append, None)
467
467
        t = self._get_connected_transport()
468
468
        self.assertLength(0, calls)
469
469
        t._set_connection("connection", "auth")
492
492
        :param filter_func: by default this will be a no-op function.  Use this
493
493
            parameter to override it."""
494
494
        if filter_func is None:
495
 
            filter_func = lambda x: x
 
495
            def filter_func(x):
 
496
                return x
496
497
        server = pathfilter.PathFilteringServer(
497
 
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
 
498
            transport.get_transport_from_url('memory:///foo/bar/'),
 
499
            filter_func)
498
500
        server.start_server()
499
501
        self.addCleanup(server.stop_server)
500
502
        return transport.get_transport_from_url(server.get_url())
708
710
        t = transport.get_transport_from_path(self.test_dir)
709
711
        self.assertIsInstance(t, local.LocalTransport)
710
712
        self.assertEqual(t.base.rstrip("/"),
711
 
            urlutils.local_path_to_url(self.test_dir))
 
713
                         urlutils.local_path_to_url(self.test_dir))
712
714
 
713
715
    def test_with_url(self):
714
716
        t = transport.get_transport_from_path("file:")
715
717
        self.assertIsInstance(t, local.LocalTransport)
716
 
        self.assertEqual(t.base.rstrip("/"),
 
718
        self.assertEqual(
 
719
            t.base.rstrip("/"),
717
720
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
718
721
 
719
722
 
721
724
 
722
725
    def test_with_path(self):
723
726
        self.assertRaises(urlutils.InvalidURL, transport.get_transport_from_url,
724
 
            self.test_dir)
 
727
                          self.test_dir)
725
728
 
726
729
    def test_with_url(self):
727
730
        url = urlutils.local_path_to_url(self.test_dir)
730
733
        self.assertEqual(t.base.rstrip("/"), url)
731
734
 
732
735
    def test_with_url_and_segment_parameters(self):
733
 
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
 
736
        url = urlutils.local_path_to_url(self.test_dir) + ",branch=foo"
734
737
        t = transport.get_transport_from_url(url)
735
738
        self.assertIsInstance(t, local.LocalTransport)
736
739
        self.assertEqual(t.base.rstrip("/"), url)
748
751
        self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')
749
752
 
750
753
    def test_get_transport_from_relpath(self):
751
 
        here = osutils.abspath('.')
752
754
        t = transport.get_transport('.')
753
755
        self.assertIsInstance(t, local.LocalTransport)
754
756
        self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')
778
780
        # See https://bugs.launchpad.net/bzr/+bug/606537
779
781
        here = osutils.abspath('.')
780
782
        t = transport.get_transport(here)
 
783
 
781
784
        def fake_chmod(path, mode):
782
785
            e = OSError('permission denied')
783
786
            e.errno = errno.EPERM
793
796
 
794
797
    def test_local_fdatasync_calls_fdatasync(self):
795
798
        """Check fdatasync on a stream tries to flush the data to the OS.
796
 
        
 
799
 
797
800
        We can't easily observe the external effect but we can at least see
798
801
        it's called.
799
802
        """
872
875
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
873
876
 
874
877
        self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'),
875
 
            'sub')
 
878
                         'sub')
876
879
        self.assertRaises(errors.PathNotChild, t.relpath,
877
880
                          'http://user@host.com/abs/path/sub')
878
881
        self.assertRaises(errors.PathNotChild, t.relpath,
922
925
 
923
926
    def test_reuse_same_transport(self):
924
927
        possible_transports = []
925
 
        t1 = transport.get_transport_from_url('http://foo/',
926
 
                                     possible_transports=possible_transports)
 
928
        t1 = transport.get_transport_from_url(
 
929
            'http://foo/', possible_transports=possible_transports)
927
930
        self.assertEqual([t1], possible_transports)
928
931
        t2 = transport.get_transport_from_url('http://foo/',
929
 
                                     possible_transports=[t1])
 
932
                                              possible_transports=[t1])
930
933
        self.assertIs(t1, t2)
931
934
 
932
935
        # Also check that final '/' are handled correctly
933
936
        t3 = transport.get_transport_from_url('http://foo/path/')
934
937
        t4 = transport.get_transport_from_url('http://foo/path',
935
 
                                     possible_transports=[t3])
 
938
                                              possible_transports=[t3])
936
939
        self.assertIs(t3, t4)
937
940
 
938
941
        t5 = transport.get_transport_from_url('http://foo/path')
939
942
        t6 = transport.get_transport_from_url('http://foo/path/',
940
 
                                     possible_transports=[t5])
 
943
                                              possible_transports=[t5])
941
944
        self.assertIs(t5, t6)
942
945
 
943
946
    def test_don_t_reuse_different_transport(self):
944
947
        t1 = transport.get_transport_from_url('http://foo/path')
945
948
        t2 = transport.get_transport_from_url('http://bar/path',
946
 
                                     possible_transports=[t1])
 
949
                                              possible_transports=[t1])
947
950
        self.assertIsNot(t1, t2)
948
951
 
949
952
 
1080
1083
        t.mkdir('foo')
1081
1084
 
1082
1085
        self.assertEqual(
1083
 
            [b'%s serve --inet --directory=/ --allow-writes' % bzr_remote_path.encode()],
 
1086
            [b'%s serve --inet --directory=/ --allow-writes' %
 
1087
                bzr_remote_path.encode()],
1084
1088
            self.command_executed)
1085
1089
        # Make sure to disconnect, so that the remote process can stop, and we
1086
1090
        # can cleanup. Then pause the test until everything is shutdown
1133
1137
 
1134
1138
    def test_unicode_url(self):
1135
1139
        self.assertRaises(urlutils.InvalidURL, location_to_url,
1136
 
            b"http://fo/\xc3\xaf".decode("utf-8"))
 
1140
                          b"http://fo/\xc3\xaf".decode("utf-8"))
1137
1141
 
1138
1142
    def test_unicode_path(self):
1139
1143
        path, url = self.get_base_location()
1147
1151
 
1148
1152
    def test_relative_file_url(self):
1149
1153
        self.assertEqual(urlutils.local_path_to_url(".") + "/bar",
1150
 
            location_to_url("file:bar"))
 
1154
                         location_to_url("file:bar"))
1151
1155
 
1152
1156
    def test_absolute_file_url(self):
1153
1157
        self.assertEqual("file:///bar", location_to_url("file:/bar"))