/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2018-11-11 04:08:32 UTC
  • mto: (7143.16.20 even-more-cleanups)
  • mto: This revision was merged to the branch mainline in revision 7175.
  • Revision ID: jelmer@jelmer.uk-20181111040832-nsljjynzzwmznf3h
Run autopep8.

Show diffs side-by-side

added added

removed removed

Lines of Context:
73
73
        transport._clear_protocol_handlers()
74
74
        transport.register_transport_proto('foo')
75
75
        transport.register_lazy_transport('foo',
76
 
                                            'breezy.tests.test_transport',
77
 
                                            'TestTransport.SampleHandler')
 
76
                                          'breezy.tests.test_transport',
 
77
                                          'TestTransport.SampleHandler')
78
78
        transport.register_transport_proto('bar')
79
79
        transport.register_lazy_transport('bar',
80
 
                                            'breezy.tests.test_transport',
81
 
                                            'TestTransport.SampleHandler')
 
80
                                          'breezy.tests.test_transport',
 
81
                                          'TestTransport.SampleHandler')
82
82
        self.assertEqual([SampleHandler.__module__,
83
 
                            'breezy.transport.chroot',
84
 
                            'breezy.transport.pathfilter'],
85
 
                            transport._get_transport_modules())
 
83
                          'breezy.transport.chroot',
 
84
                          'breezy.transport.pathfilter'],
 
85
                         transport._get_transport_modules())
86
86
 
87
87
    def test_transport_dependency(self):
88
88
        """Transport with missing dependency causes no error"""
98
98
        except errors.UnsupportedProtocol as e:
99
99
            e_str = str(e)
100
100
            self.assertEqual('Unsupported protocol'
101
 
                                ' for url "foo://fooserver/foo":'
102
 
                                ' Unable to import library "some_lib":'
103
 
                                ' testing missing dependency', str(e))
 
101
                             ' for url "foo://fooserver/foo":'
 
102
                             ' Unable to import library "some_lib":'
 
103
                             ' testing missing dependency', str(e))
104
104
        else:
105
105
            self.fail('Did not raise UnsupportedProtocol')
106
106
 
124
124
        except errors.UnsupportedProtocol as e:
125
125
            e_str = str(e)
126
126
            self.assertEqual('Unsupported protocol'
127
 
                              ' for url "ssh://fooserver/foo":'
128
 
                              ' bzr supports bzr+ssh to operate over ssh,'
129
 
                              ' use "bzr+ssh://fooserver/foo".',
130
 
                              str(e))
 
127
                             ' for url "ssh://fooserver/foo":'
 
128
                             ' bzr supports bzr+ssh to operate over ssh,'
 
129
                             ' use "bzr+ssh://fooserver/foo".',
 
130
                             str(e))
131
131
        else:
132
132
            self.fail('Did not raise UnsupportedProtocol')
133
133
 
166
166
    def test_coalesce_unrelated(self):
167
167
        self.check([(0, 10, [(0, 10)]),
168
168
                    (20, 10, [(0, 10)]),
169
 
                   ], [(0, 10), (20, 10)])
 
169
                    ], [(0, 10), (20, 10)])
170
170
 
171
171
    def test_coalesce_unsorted(self):
172
172
        self.check([(20, 10, [(0, 10)]),
173
173
                    (0, 10, [(0, 10)]),
174
 
                   ], [(20, 10), (0, 10)])
 
174
                    ], [(20, 10), (0, 10)])
175
175
 
176
176
    def test_coalesce_nearby(self):
177
177
        self.check([(0, 20, [(0, 10), (10, 10)])],
179
179
 
180
180
    def test_coalesce_overlapped(self):
181
181
        self.assertRaises(ValueError,
182
 
            self.check, [(0, 15, [(0, 10), (5, 10)])],
183
 
                        [(0, 10), (5, 10)])
 
182
                          self.check, [(0, 15, [(0, 10), (5, 10)])],
 
183
                          [(0, 10), (5, 10)])
184
184
 
185
185
    def test_coalesce_limit(self):
186
186
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
187
187
                              (30, 10), (40, 10)]),
188
188
                    (60, 50, [(0, 10), (10, 10), (20, 10),
189
189
                              (30, 10), (40, 10)]),
190
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
191
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
192
 
                       (90, 10), (100, 10)],
193
 
                    limit=5)
 
190
                    ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
191
                        (50, 10), (60, 10), (70, 10), (80, 10),
 
192
                        (90, 10), (100, 10)],
 
193
                   limit=5)
194
194
 
195
195
    def test_coalesce_no_limit(self):
196
196
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
197
197
                               (30, 10), (40, 10), (50, 10),
198
198
                               (60, 10), (70, 10), (80, 10),
199
199
                               (90, 10)]),
200
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
201
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
202
 
                       (90, 10), (100, 10)])
 
200
                    ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
201
                        (50, 10), (60, 10), (70, 10), (80, 10),
 
202
                        (90, 10), (100, 10)])
203
203
 
204
204
    def test_coalesce_fudge(self):
205
205
        self.check([(10, 30, [(0, 10), (20, 10)]),
206
206
                    (100, 10, [(0, 10)]),
207
 
                   ], [(10, 10), (30, 10), (100, 10)],
 
207
                    ], [(10, 10), (30, 10), (100, 10)],
208
208
                   fudge=10)
209
209
 
210
210
    def test_coalesce_max_size(self):
212
212
                    (30, 50, [(0, 50)]),
213
213
                    # If one range is above max_size, it gets its own coalesced
214
214
                    # offset
215
 
                    (100, 80, [(0, 80)]),],
 
215
                    (100, 80, [(0, 80)]), ],
216
216
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
217
217
                   max_size=50)
218
218
 
219
219
    def test_coalesce_no_max_size(self):
220
220
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
221
221
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
222
 
                  )
 
222
                   )
223
223
 
224
224
    def test_coalesce_default_limit(self):
225
225
        # By default we use a 100MB max size.
226
226
        ten_mb = 10 * 1024 * 1024
227
227
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
228
 
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
229
 
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
228
                    (10 * ten_mb, ten_mb, [(0, ten_mb)])],
 
229
                   [(i * ten_mb, ten_mb) for i in range(11)])
230
230
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
231
231
                   [(i * ten_mb, ten_mb) for i in range(11)],
232
 
                   max_size=1*1024*1024*1024)
 
232
                   max_size=1 * 1024 * 1024 * 1024)
233
233
 
234
234
 
235
235
class TestMemoryServer(tests.TestCase):
457
457
        """Check all expected transport hook points are set up"""
458
458
        hookpoint = transport.TransportHooks()
459
459
        self.assertTrue("post_connect" in hookpoint,
460
 
            "post_connect not in %s" % (hookpoint,))
 
460
                        "post_connect not in %s" % (hookpoint,))
461
461
 
462
462
    def test_post_connect(self):
463
463
        """Ensure the post_connect hook is called when _set_transport is"""
464
464
        calls = []
465
465
        transport.Transport.hooks.install_named_hook("post_connect",
466
 
            calls.append, None)
 
466
                                                     calls.append, None)
467
467
        t = self._get_connected_transport()
468
468
        self.assertLength(0, calls)
469
469
        t._set_connection("connection", "auth")
492
492
        :param filter_func: by default this will be a no-op function.  Use this
493
493
            parameter to override it."""
494
494
        if filter_func is None:
495
 
            filter_func = lambda x: x
 
495
            def filter_func(x): return x
496
496
        server = pathfilter.PathFilteringServer(
497
497
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
498
498
        server.start_server()
708
708
        t = transport.get_transport_from_path(self.test_dir)
709
709
        self.assertIsInstance(t, local.LocalTransport)
710
710
        self.assertEqual(t.base.rstrip("/"),
711
 
            urlutils.local_path_to_url(self.test_dir))
 
711
                         urlutils.local_path_to_url(self.test_dir))
712
712
 
713
713
    def test_with_url(self):
714
714
        t = transport.get_transport_from_path("file:")
715
715
        self.assertIsInstance(t, local.LocalTransport)
716
716
        self.assertEqual(t.base.rstrip("/"),
717
 
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
 
717
                         urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
718
718
 
719
719
 
720
720
class TestTransportFromUrl(tests.TestCaseInTempDir):
721
721
 
722
722
    def test_with_path(self):
723
723
        self.assertRaises(urlutils.InvalidURL, transport.get_transport_from_url,
724
 
            self.test_dir)
 
724
                          self.test_dir)
725
725
 
726
726
    def test_with_url(self):
727
727
        url = urlutils.local_path_to_url(self.test_dir)
730
730
        self.assertEqual(t.base.rstrip("/"), url)
731
731
 
732
732
    def test_with_url_and_segment_parameters(self):
733
 
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
 
733
        url = urlutils.local_path_to_url(self.test_dir) + ",branch=foo"
734
734
        t = transport.get_transport_from_url(url)
735
735
        self.assertIsInstance(t, local.LocalTransport)
736
736
        self.assertEqual(t.base.rstrip("/"), url)
778
778
        # See https://bugs.launchpad.net/bzr/+bug/606537
779
779
        here = osutils.abspath('.')
780
780
        t = transport.get_transport(here)
 
781
 
781
782
        def fake_chmod(path, mode):
782
783
            e = OSError('permission denied')
783
784
            e.errno = errno.EPERM
793
794
 
794
795
    def test_local_fdatasync_calls_fdatasync(self):
795
796
        """Check fdatasync on a stream tries to flush the data to the OS.
796
 
        
 
797
 
797
798
        We can't easily observe the external effect but we can at least see
798
799
        it's called.
799
800
        """
872
873
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
873
874
 
874
875
        self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'),
875
 
            'sub')
 
876
                         'sub')
876
877
        self.assertRaises(errors.PathNotChild, t.relpath,
877
878
                          'http://user@host.com/abs/path/sub')
878
879
        self.assertRaises(errors.PathNotChild, t.relpath,
923
924
    def test_reuse_same_transport(self):
924
925
        possible_transports = []
925
926
        t1 = transport.get_transport_from_url('http://foo/',
926
 
                                     possible_transports=possible_transports)
 
927
                                              possible_transports=possible_transports)
927
928
        self.assertEqual([t1], possible_transports)
928
929
        t2 = transport.get_transport_from_url('http://foo/',
929
 
                                     possible_transports=[t1])
 
930
                                              possible_transports=[t1])
930
931
        self.assertIs(t1, t2)
931
932
 
932
933
        # Also check that final '/' are handled correctly
933
934
        t3 = transport.get_transport_from_url('http://foo/path/')
934
935
        t4 = transport.get_transport_from_url('http://foo/path',
935
 
                                     possible_transports=[t3])
 
936
                                              possible_transports=[t3])
936
937
        self.assertIs(t3, t4)
937
938
 
938
939
        t5 = transport.get_transport_from_url('http://foo/path')
939
940
        t6 = transport.get_transport_from_url('http://foo/path/',
940
 
                                     possible_transports=[t5])
 
941
                                              possible_transports=[t5])
941
942
        self.assertIs(t5, t6)
942
943
 
943
944
    def test_don_t_reuse_different_transport(self):
944
945
        t1 = transport.get_transport_from_url('http://foo/path')
945
946
        t2 = transport.get_transport_from_url('http://bar/path',
946
 
                                     possible_transports=[t1])
 
947
                                              possible_transports=[t1])
947
948
        self.assertIsNot(t1, t2)
948
949
 
949
950
 
1080
1081
        t.mkdir('foo')
1081
1082
 
1082
1083
        self.assertEqual(
1083
 
            [b'%s serve --inet --directory=/ --allow-writes' % bzr_remote_path.encode()],
 
1084
            [b'%s serve --inet --directory=/ --allow-writes' %
 
1085
                bzr_remote_path.encode()],
1084
1086
            self.command_executed)
1085
1087
        # Make sure to disconnect, so that the remote process can stop, and we
1086
1088
        # can cleanup. Then pause the test until everything is shutdown
1133
1135
 
1134
1136
    def test_unicode_url(self):
1135
1137
        self.assertRaises(urlutils.InvalidURL, location_to_url,
1136
 
            b"http://fo/\xc3\xaf".decode("utf-8"))
 
1138
                          b"http://fo/\xc3\xaf".decode("utf-8"))
1137
1139
 
1138
1140
    def test_unicode_path(self):
1139
1141
        path, url = self.get_base_location()
1147
1149
 
1148
1150
    def test_relative_file_url(self):
1149
1151
        self.assertEqual(urlutils.local_path_to_url(".") + "/bar",
1150
 
            location_to_url("file:bar"))
 
1152
                         location_to_url("file:bar"))
1151
1153
 
1152
1154
    def test_absolute_file_url(self):
1153
1155
        self.assertEqual("file:///bar", location_to_url("file:/bar"))