/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2020-05-06 02:13:25 UTC
  • mfrom: (7490.7.21 work)
  • mto: This revision was merged to the branch mainline in revision 7501.
  • Revision ID: jelmer@jelmer.uk-20200506021325-awbmmqu1zyorz7sj
Merge 3.1 branch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
 
18
18
import errno
 
19
from io import BytesIO
19
20
import os
20
21
import subprocess
21
22
import sys
28
29
    transport,
29
30
    urlutils,
30
31
    )
31
 
from ..directory_service import directories
32
 
from ..sixish import (
33
 
    BytesIO,
34
 
    )
35
32
from ..transport import (
36
33
    chroot,
37
34
    fakenfs,
38
35
    http,
39
36
    local,
40
 
    location_to_url,
41
37
    memory,
42
38
    pathfilter,
43
39
    readonly,
73
69
        transport._clear_protocol_handlers()
74
70
        transport.register_transport_proto('foo')
75
71
        transport.register_lazy_transport('foo',
76
 
                                            'breezy.tests.test_transport',
77
 
                                            'TestTransport.SampleHandler')
 
72
                                          'breezy.tests.test_transport',
 
73
                                          'TestTransport.SampleHandler')
78
74
        transport.register_transport_proto('bar')
79
75
        transport.register_lazy_transport('bar',
80
 
                                            'breezy.tests.test_transport',
81
 
                                            'TestTransport.SampleHandler')
 
76
                                          'breezy.tests.test_transport',
 
77
                                          'TestTransport.SampleHandler')
82
78
        self.assertEqual([SampleHandler.__module__,
83
 
                            'breezy.transport.chroot',
84
 
                            'breezy.transport.pathfilter'],
85
 
                            transport._get_transport_modules())
 
79
                          'breezy.transport.chroot',
 
80
                          'breezy.transport.pathfilter'],
 
81
                         transport._get_transport_modules())
86
82
 
87
83
    def test_transport_dependency(self):
88
84
        """Transport with missing dependency causes no error"""
96
92
        try:
97
93
            transport.get_transport_from_url('foo://fooserver/foo')
98
94
        except errors.UnsupportedProtocol as e:
99
 
            e_str = str(e)
100
95
            self.assertEqual('Unsupported protocol'
101
 
                                ' for url "foo://fooserver/foo":'
102
 
                                ' Unable to import library "some_lib":'
103
 
                                ' testing missing dependency', str(e))
 
96
                             ' for url "foo://fooserver/foo":'
 
97
                             ' Unable to import library "some_lib":'
 
98
                             ' testing missing dependency', str(e))
104
99
        else:
105
100
            self.fail('Did not raise UnsupportedProtocol')
106
101
 
122
117
        try:
123
118
            transport.get_transport_from_url('ssh://fooserver/foo')
124
119
        except errors.UnsupportedProtocol as e:
125
 
            e_str = str(e)
126
 
            self.assertEqual('Unsupported protocol'
127
 
                              ' for url "ssh://fooserver/foo":'
128
 
                              ' bzr supports bzr+ssh to operate over ssh,'
129
 
                              ' use "bzr+ssh://fooserver/foo".',
130
 
                              str(e))
 
120
            self.assertEqual(
 
121
                'Unsupported protocol'
 
122
                ' for url "ssh://fooserver/foo":'
 
123
                ' Use bzr+ssh for Bazaar operations over SSH, '
 
124
                'e.g. "bzr+ssh://fooserver/foo". Use git+ssh '
 
125
                'for Git operations over SSH, e.g. "git+ssh://fooserver/foo".',
 
126
                str(e))
131
127
        else:
132
128
            self.fail('Did not raise UnsupportedProtocol')
133
129
 
166
162
    def test_coalesce_unrelated(self):
167
163
        self.check([(0, 10, [(0, 10)]),
168
164
                    (20, 10, [(0, 10)]),
169
 
                   ], [(0, 10), (20, 10)])
 
165
                    ], [(0, 10), (20, 10)])
170
166
 
171
167
    def test_coalesce_unsorted(self):
172
168
        self.check([(20, 10, [(0, 10)]),
173
169
                    (0, 10, [(0, 10)]),
174
 
                   ], [(20, 10), (0, 10)])
 
170
                    ], [(20, 10), (0, 10)])
175
171
 
176
172
    def test_coalesce_nearby(self):
177
173
        self.check([(0, 20, [(0, 10), (10, 10)])],
179
175
 
180
176
    def test_coalesce_overlapped(self):
181
177
        self.assertRaises(ValueError,
182
 
            self.check, [(0, 15, [(0, 10), (5, 10)])],
183
 
                        [(0, 10), (5, 10)])
 
178
                          self.check, [(0, 15, [(0, 10), (5, 10)])],
 
179
                          [(0, 10), (5, 10)])
184
180
 
185
181
    def test_coalesce_limit(self):
186
182
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
187
183
                              (30, 10), (40, 10)]),
188
184
                    (60, 50, [(0, 10), (10, 10), (20, 10),
189
185
                              (30, 10), (40, 10)]),
190
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
191
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
192
 
                       (90, 10), (100, 10)],
193
 
                    limit=5)
 
186
                    ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
187
                        (50, 10), (60, 10), (70, 10), (80, 10),
 
188
                        (90, 10), (100, 10)],
 
189
                   limit=5)
194
190
 
195
191
    def test_coalesce_no_limit(self):
196
192
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
197
193
                               (30, 10), (40, 10), (50, 10),
198
194
                               (60, 10), (70, 10), (80, 10),
199
195
                               (90, 10)]),
200
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
201
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
202
 
                       (90, 10), (100, 10)])
 
196
                    ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
197
                        (50, 10), (60, 10), (70, 10), (80, 10),
 
198
                        (90, 10), (100, 10)])
203
199
 
204
200
    def test_coalesce_fudge(self):
205
201
        self.check([(10, 30, [(0, 10), (20, 10)]),
206
202
                    (100, 10, [(0, 10)]),
207
 
                   ], [(10, 10), (30, 10), (100, 10)],
 
203
                    ], [(10, 10), (30, 10), (100, 10)],
208
204
                   fudge=10)
209
205
 
210
206
    def test_coalesce_max_size(self):
212
208
                    (30, 50, [(0, 50)]),
213
209
                    # If one range is above max_size, it gets its own coalesced
214
210
                    # offset
215
 
                    (100, 80, [(0, 80)]),],
 
211
                    (100, 80, [(0, 80)]), ],
216
212
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
217
213
                   max_size=50)
218
214
 
219
215
    def test_coalesce_no_max_size(self):
220
216
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
221
217
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
222
 
                  )
 
218
                   )
223
219
 
224
220
    def test_coalesce_default_limit(self):
225
221
        # By default we use a 100MB max size.
226
222
        ten_mb = 10 * 1024 * 1024
227
 
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
228
 
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
229
 
                   [(i*ten_mb, ten_mb) for i in range(11)])
230
 
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
231
 
                   [(i * ten_mb, ten_mb) for i in range(11)],
232
 
                   max_size=1*1024*1024*1024)
 
223
        self.check(
 
224
            [(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
 
225
             (10 * ten_mb, ten_mb, [(0, ten_mb)])],
 
226
            [(i * ten_mb, ten_mb) for i in range(11)])
 
227
        self.check(
 
228
            [(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
229
            [(i * ten_mb, ten_mb) for i in range(11)],
 
230
            max_size=1 * 1024 * 1024 * 1024)
233
231
 
234
232
 
235
233
class TestMemoryServer(tests.TestCase):
272
270
 
273
271
    def test_append_and_get(self):
274
272
        t = memory.MemoryTransport()
275
 
        t.append_bytes('path', 'content')
276
 
        self.assertEqual(t.get('path').read(), 'content')
 
273
        t.append_bytes('path', b'content')
 
274
        self.assertEqual(t.get('path').read(), b'content')
277
275
        t.append_file('path', BytesIO(b'content'))
278
 
        self.assertEqual(t.get('path').read(), 'contentcontent')
 
276
        with t.get('path') as f:
 
277
            self.assertEqual(f.read(), b'contentcontent')
279
278
 
280
279
    def test_put_and_get(self):
281
280
        t = memory.MemoryTransport()
282
281
        t.put_file('path', BytesIO(b'content'))
283
 
        self.assertEqual(t.get('path').read(), 'content')
284
 
        t.put_bytes('path', 'content')
285
 
        self.assertEqual(t.get('path').read(), 'content')
 
282
        self.assertEqual(t.get('path').read(), b'content')
 
283
        t.put_bytes('path', b'content')
 
284
        self.assertEqual(t.get('path').read(), b'content')
286
285
 
287
286
    def test_append_without_dir_fails(self):
288
287
        t = memory.MemoryTransport()
289
288
        self.assertRaises(errors.NoSuchFile,
290
 
                          t.append_bytes, 'dir/path', 'content')
 
289
                          t.append_bytes, 'dir/path', b'content')
291
290
 
292
291
    def test_put_without_dir_fails(self):
293
292
        t = memory.MemoryTransport()
304
303
 
305
304
    def test_has_present(self):
306
305
        t = memory.MemoryTransport()
307
 
        t.append_bytes('foo', 'content')
 
306
        t.append_bytes('foo', b'content')
308
307
        self.assertEqual(True, t.has('foo'))
309
308
 
310
309
    def test_list_dir(self):
311
310
        t = memory.MemoryTransport()
312
 
        t.put_bytes('foo', 'content')
 
311
        t.put_bytes('foo', b'content')
313
312
        t.mkdir('dir')
314
 
        t.put_bytes('dir/subfoo', 'content')
315
 
        t.put_bytes('dirlike', 'content')
 
313
        t.put_bytes('dir/subfoo', b'content')
 
314
        t.put_bytes('dirlike', b'content')
316
315
 
317
316
        self.assertEqual(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
318
317
        self.assertEqual(['subfoo'], sorted(t.list_dir('dir')))
320
319
    def test_mkdir(self):
321
320
        t = memory.MemoryTransport()
322
321
        t.mkdir('dir')
323
 
        t.append_bytes('dir/path', 'content')
324
 
        self.assertEqual(t.get('dir/path').read(), 'content')
 
322
        t.append_bytes('dir/path', b'content')
 
323
        with t.get('dir/path') as f:
 
324
            self.assertEqual(f.read(), b'content')
325
325
 
326
326
    def test_mkdir_missing_parent(self):
327
327
        t = memory.MemoryTransport()
340
340
    def test_iter_files_recursive(self):
341
341
        t = memory.MemoryTransport()
342
342
        t.mkdir('dir')
343
 
        t.put_bytes('dir/foo', 'content')
344
 
        t.put_bytes('dir/bar', 'content')
345
 
        t.put_bytes('bar', 'content')
 
343
        t.put_bytes('dir/foo', b'content')
 
344
        t.put_bytes('dir/bar', b'content')
 
345
        t.put_bytes('bar', b'content')
346
346
        paths = set(t.iter_files_recursive())
347
347
        self.assertEqual({'dir/foo', 'dir/bar', 'bar'}, paths)
348
348
 
349
349
    def test_stat(self):
350
350
        t = memory.MemoryTransport()
351
 
        t.put_bytes('foo', 'content')
352
 
        t.put_bytes('bar', 'phowar')
 
351
        t.put_bytes('foo', b'content')
 
352
        t.put_bytes('bar', b'phowar')
353
353
        self.assertEqual(7, t.stat('foo').st_size)
354
354
        self.assertEqual(6, t.stat('bar').st_size)
355
355
 
455
455
        """Check all expected transport hook points are set up"""
456
456
        hookpoint = transport.TransportHooks()
457
457
        self.assertTrue("post_connect" in hookpoint,
458
 
            "post_connect not in %s" % (hookpoint,))
 
458
                        "post_connect not in %s" % (hookpoint,))
459
459
 
460
460
    def test_post_connect(self):
461
461
        """Ensure the post_connect hook is called when _set_transport is"""
462
462
        calls = []
463
463
        transport.Transport.hooks.install_named_hook("post_connect",
464
 
            calls.append, None)
 
464
                                                     calls.append, None)
465
465
        t = self._get_connected_transport()
466
466
        self.assertLength(0, calls)
467
467
        t._set_connection("connection", "auth")
490
490
        :param filter_func: by default this will be a no-op function.  Use this
491
491
            parameter to override it."""
492
492
        if filter_func is None:
493
 
            filter_func = lambda x: x
 
493
            def filter_func(x):
 
494
                return x
494
495
        server = pathfilter.PathFilteringServer(
495
 
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
 
496
            transport.get_transport_from_url('memory:///foo/bar/'),
 
497
            filter_func)
496
498
        server.start_server()
497
499
        self.addCleanup(server.stop_server)
498
500
        return transport.get_transport_from_url(server.get_url())
706
708
        t = transport.get_transport_from_path(self.test_dir)
707
709
        self.assertIsInstance(t, local.LocalTransport)
708
710
        self.assertEqual(t.base.rstrip("/"),
709
 
            urlutils.local_path_to_url(self.test_dir))
 
711
                         urlutils.local_path_to_url(self.test_dir))
710
712
 
711
713
    def test_with_url(self):
712
714
        t = transport.get_transport_from_path("file:")
713
715
        self.assertIsInstance(t, local.LocalTransport)
714
 
        self.assertEqual(t.base.rstrip("/"),
 
716
        self.assertEqual(
 
717
            t.base.rstrip("/"),
715
718
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
716
719
 
717
720
 
719
722
 
720
723
    def test_with_path(self):
721
724
        self.assertRaises(urlutils.InvalidURL, transport.get_transport_from_url,
722
 
            self.test_dir)
 
725
                          self.test_dir)
723
726
 
724
727
    def test_with_url(self):
725
728
        url = urlutils.local_path_to_url(self.test_dir)
728
731
        self.assertEqual(t.base.rstrip("/"), url)
729
732
 
730
733
    def test_with_url_and_segment_parameters(self):
731
 
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
 
734
        url = urlutils.local_path_to_url(self.test_dir) + ",branch=foo"
732
735
        t = transport.get_transport_from_url(url)
733
736
        self.assertIsInstance(t, local.LocalTransport)
734
737
        self.assertEqual(t.base.rstrip("/"), url)
746
749
        self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')
747
750
 
748
751
    def test_get_transport_from_relpath(self):
749
 
        here = osutils.abspath('.')
750
752
        t = transport.get_transport('.')
751
753
        self.assertIsInstance(t, local.LocalTransport)
752
754
        self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')
776
778
        # See https://bugs.launchpad.net/bzr/+bug/606537
777
779
        here = osutils.abspath('.')
778
780
        t = transport.get_transport(here)
 
781
 
779
782
        def fake_chmod(path, mode):
780
783
            e = OSError('permission denied')
781
784
            e.errno = errno.EPERM
791
794
 
792
795
    def test_local_fdatasync_calls_fdatasync(self):
793
796
        """Check fdatasync on a stream tries to flush the data to the OS.
794
 
        
 
797
 
795
798
        We can't easily observe the external effect but we can at least see
796
799
        it's called.
797
800
        """
802
805
        t = self.get_transport('.')
803
806
        calls = self.recordCalls(os, 'fdatasync')
804
807
        w = t.open_write_stream('out')
805
 
        w.write('foo')
 
808
        w.write(b'foo')
806
809
        w.fdatasync()
807
810
        with open('out', 'rb') as f:
808
811
            # Should have been flushed.
809
 
            self.assertEqual(f.read(), 'foo')
 
812
            self.assertEqual(f.read(), b'foo')
810
813
        self.assertEqual(len(calls), 1, calls)
811
814
 
812
815
    def test_missing_directory(self):
870
873
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
871
874
 
872
875
        self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'),
873
 
            'sub')
 
876
                         'sub')
874
877
        self.assertRaises(errors.PathNotChild, t.relpath,
875
878
                          'http://user@host.com/abs/path/sub')
876
879
        self.assertRaises(errors.PathNotChild, t.relpath,
920
923
 
921
924
    def test_reuse_same_transport(self):
922
925
        possible_transports = []
923
 
        t1 = transport.get_transport_from_url('http://foo/',
924
 
                                     possible_transports=possible_transports)
 
926
        t1 = transport.get_transport_from_url(
 
927
            'http://foo/', possible_transports=possible_transports)
925
928
        self.assertEqual([t1], possible_transports)
926
929
        t2 = transport.get_transport_from_url('http://foo/',
927
 
                                     possible_transports=[t1])
 
930
                                              possible_transports=[t1])
928
931
        self.assertIs(t1, t2)
929
932
 
930
933
        # Also check that final '/' are handled correctly
931
934
        t3 = transport.get_transport_from_url('http://foo/path/')
932
935
        t4 = transport.get_transport_from_url('http://foo/path',
933
 
                                     possible_transports=[t3])
 
936
                                              possible_transports=[t3])
934
937
        self.assertIs(t3, t4)
935
938
 
936
939
        t5 = transport.get_transport_from_url('http://foo/path')
937
940
        t6 = transport.get_transport_from_url('http://foo/path/',
938
 
                                     possible_transports=[t5])
 
941
                                              possible_transports=[t5])
939
942
        self.assertIs(t5, t6)
940
943
 
941
944
    def test_don_t_reuse_different_transport(self):
942
945
        t1 = transport.get_transport_from_url('http://foo/path')
943
946
        t2 = transport.get_transport_from_url('http://bar/path',
944
 
                                     possible_transports=[t1])
 
947
                                              possible_transports=[t1])
945
948
        self.assertIsNot(t1, t2)
946
949
 
947
950
 
964
967
    # changes; so there is little return doing that.
965
968
    def test_get(self):
966
969
        t = transport.get_transport_from_url('trace+memory:///')
967
 
        t.put_bytes('foo', 'barish')
 
970
        t.put_bytes('foo', b'barish')
968
971
        t.get('foo')
969
972
        expected_result = []
970
973
        # put_bytes records the bytes, not the content to avoid memory
976
979
 
977
980
    def test_readv(self):
978
981
        t = transport.get_transport_from_url('trace+memory:///')
979
 
        t.put_bytes('foo', 'barish')
 
982
        t.put_bytes('foo', b'barish')
980
983
        list(t.readv('foo', [(0, 1), (3, 2)],
981
984
                     adjust_for_latency=True, upper_limit=6))
982
985
        expected_result = []
1022
1025
                self.test.command_executed.append(command)
1023
1026
                proc = subprocess.Popen(
1024
1027
                    command, shell=True, stdin=subprocess.PIPE,
1025
 
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
1028
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
 
1029
                    bufsize=0)
1026
1030
 
1027
1031
                # XXX: horribly inefficient, not to mention ugly.
1028
1032
                # Start a thread for each of stdin/out/err, and relay bytes
1030
1034
                def ferry_bytes(read, write, close):
1031
1035
                    while True:
1032
1036
                        bytes = read(1)
1033
 
                        if bytes == '':
 
1037
                        if bytes == b'':
1034
1038
                            close()
1035
1039
                            break
1036
1040
                        write(bytes)
1077
1081
        t.mkdir('foo')
1078
1082
 
1079
1083
        self.assertEqual(
1080
 
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
 
1084
            [b'%s serve --inet --directory=/ --allow-writes' %
 
1085
                bzr_remote_path.encode()],
1081
1086
            self.command_executed)
1082
1087
        # Make sure to disconnect, so that the remote process can stop, and we
1083
1088
        # can cleanup. Then pause the test until everything is shutdown
1100
1105
        result = http.unhtml_roughly(fake_html)
1101
1106
        self.assertEqual(len(result), 1000)
1102
1107
        self.assertStartsWith(result, " something!")
1103
 
 
1104
 
 
1105
 
class SomeDirectory(object):
1106
 
 
1107
 
    def look_up(self, name, url):
1108
 
        return "http://bar"
1109
 
 
1110
 
 
1111
 
class TestLocationToUrl(tests.TestCase):
1112
 
 
1113
 
    def get_base_location(self):
1114
 
        path = osutils.abspath('/foo/bar')
1115
 
        if path.startswith('/'):
1116
 
            url = 'file://%s' % (path,)
1117
 
        else:
1118
 
            # On Windows, abspaths start with the drive letter, so we have to
1119
 
            # add in the extra '/'
1120
 
            url = 'file:///%s' % (path,)
1121
 
        return path, url
1122
 
 
1123
 
    def test_regular_url(self):
1124
 
        self.assertEqual("file://foo", location_to_url("file://foo"))
1125
 
 
1126
 
    def test_directory(self):
1127
 
        directories.register("bar:", SomeDirectory, "Dummy directory")
1128
 
        self.addCleanup(directories.remove, "bar:")
1129
 
        self.assertEqual("http://bar", location_to_url("bar:"))
1130
 
 
1131
 
    def test_unicode_url(self):
1132
 
        self.assertRaises(urlutils.InvalidURL, location_to_url,
1133
 
            "http://fo/\xc3\xaf".decode("utf-8"))
1134
 
 
1135
 
    def test_unicode_path(self):
1136
 
        path, url = self.get_base_location()
1137
 
        location = path + "\xc3\xaf".decode("utf-8")
1138
 
        url += '%C3%AF'
1139
 
        self.assertEqual(url, location_to_url(location))
1140
 
 
1141
 
    def test_path(self):
1142
 
        path, url = self.get_base_location()
1143
 
        self.assertEqual(url, location_to_url(path))
1144
 
 
1145
 
    def test_relative_file_url(self):
1146
 
        self.assertEqual(urlutils.local_path_to_url(".") + "/bar",
1147
 
            location_to_url("file:bar"))
1148
 
 
1149
 
    def test_absolute_file_url(self):
1150
 
        self.assertEqual("file:///bar", location_to_url("file:/bar"))