/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2017-06-08 23:30:31 UTC
  • mto: This revision was merged to the branch mainline in revision 6690.
  • Revision ID: jelmer@jelmer.uk-20170608233031-3qavls2o7a1pqllj
Update imports.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
 
18
18
import errno
19
 
from io import BytesIO
20
19
import os
21
20
import subprocess
22
21
import sys
29
28
    transport,
30
29
    urlutils,
31
30
    )
 
31
from ..directory_service import directories
 
32
from ..sixish import (
 
33
    BytesIO,
 
34
    )
32
35
from ..transport import (
33
36
    chroot,
34
37
    fakenfs,
35
38
    http,
36
39
    local,
 
40
    location_to_url,
37
41
    memory,
38
42
    pathfilter,
39
43
    readonly,
69
73
        transport._clear_protocol_handlers()
70
74
        transport.register_transport_proto('foo')
71
75
        transport.register_lazy_transport('foo',
72
 
                                          'breezy.tests.test_transport',
73
 
                                          'TestTransport.SampleHandler')
 
76
                                            'breezy.tests.test_transport',
 
77
                                            'TestTransport.SampleHandler')
74
78
        transport.register_transport_proto('bar')
75
79
        transport.register_lazy_transport('bar',
76
 
                                          'breezy.tests.test_transport',
77
 
                                          'TestTransport.SampleHandler')
 
80
                                            'breezy.tests.test_transport',
 
81
                                            'TestTransport.SampleHandler')
78
82
        self.assertEqual([SampleHandler.__module__,
79
 
                          'breezy.transport.chroot',
80
 
                          'breezy.transport.pathfilter'],
81
 
                         transport._get_transport_modules())
 
83
                            'breezy.transport.chroot',
 
84
                            'breezy.transport.pathfilter'],
 
85
                            transport._get_transport_modules())
82
86
 
83
87
    def test_transport_dependency(self):
84
88
        """Transport with missing dependency causes no error"""
92
96
        try:
93
97
            transport.get_transport_from_url('foo://fooserver/foo')
94
98
        except errors.UnsupportedProtocol as e:
 
99
            e_str = str(e)
95
100
            self.assertEqual('Unsupported protocol'
96
 
                             ' for url "foo://fooserver/foo":'
97
 
                             ' Unable to import library "some_lib":'
98
 
                             ' testing missing dependency', str(e))
 
101
                                ' for url "foo://fooserver/foo":'
 
102
                                ' Unable to import library "some_lib":'
 
103
                                ' testing missing dependency', str(e))
99
104
        else:
100
105
            self.fail('Did not raise UnsupportedProtocol')
101
106
 
117
122
        try:
118
123
            transport.get_transport_from_url('ssh://fooserver/foo')
119
124
        except errors.UnsupportedProtocol as e:
120
 
            self.assertEqual(
121
 
                'Unsupported protocol'
122
 
                ' for url "ssh://fooserver/foo":'
123
 
                ' Use bzr+ssh for Bazaar operations over SSH, '
124
 
                'e.g. "bzr+ssh://fooserver/foo". Use git+ssh '
125
 
                'for Git operations over SSH, e.g. "git+ssh://fooserver/foo".',
126
 
                str(e))
 
125
            e_str = str(e)
 
126
            self.assertEqual('Unsupported protocol'
 
127
                              ' for url "ssh://fooserver/foo":'
 
128
                              ' bzr supports bzr+ssh to operate over ssh,'
 
129
                              ' use "bzr+ssh://fooserver/foo".',
 
130
                              str(e))
127
131
        else:
128
132
            self.fail('Did not raise UnsupportedProtocol')
129
133
 
162
166
    def test_coalesce_unrelated(self):
163
167
        self.check([(0, 10, [(0, 10)]),
164
168
                    (20, 10, [(0, 10)]),
165
 
                    ], [(0, 10), (20, 10)])
 
169
                   ], [(0, 10), (20, 10)])
166
170
 
167
171
    def test_coalesce_unsorted(self):
168
172
        self.check([(20, 10, [(0, 10)]),
169
173
                    (0, 10, [(0, 10)]),
170
 
                    ], [(20, 10), (0, 10)])
 
174
                   ], [(20, 10), (0, 10)])
171
175
 
172
176
    def test_coalesce_nearby(self):
173
177
        self.check([(0, 20, [(0, 10), (10, 10)])],
175
179
 
176
180
    def test_coalesce_overlapped(self):
177
181
        self.assertRaises(ValueError,
178
 
                          self.check, [(0, 15, [(0, 10), (5, 10)])],
179
 
                          [(0, 10), (5, 10)])
 
182
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
183
                        [(0, 10), (5, 10)])
180
184
 
181
185
    def test_coalesce_limit(self):
182
186
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
183
187
                              (30, 10), (40, 10)]),
184
188
                    (60, 50, [(0, 10), (10, 10), (20, 10),
185
189
                              (30, 10), (40, 10)]),
186
 
                    ], [(10, 10), (20, 10), (30, 10), (40, 10),
187
 
                        (50, 10), (60, 10), (70, 10), (80, 10),
188
 
                        (90, 10), (100, 10)],
189
 
                   limit=5)
 
190
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
191
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
192
                       (90, 10), (100, 10)],
 
193
                    limit=5)
190
194
 
191
195
    def test_coalesce_no_limit(self):
192
196
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
193
197
                               (30, 10), (40, 10), (50, 10),
194
198
                               (60, 10), (70, 10), (80, 10),
195
199
                               (90, 10)]),
196
 
                    ], [(10, 10), (20, 10), (30, 10), (40, 10),
197
 
                        (50, 10), (60, 10), (70, 10), (80, 10),
198
 
                        (90, 10), (100, 10)])
 
200
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
201
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
202
                       (90, 10), (100, 10)])
199
203
 
200
204
    def test_coalesce_fudge(self):
201
205
        self.check([(10, 30, [(0, 10), (20, 10)]),
202
206
                    (100, 10, [(0, 10)]),
203
 
                    ], [(10, 10), (30, 10), (100, 10)],
 
207
                   ], [(10, 10), (30, 10), (100, 10)],
204
208
                   fudge=10)
205
209
 
206
210
    def test_coalesce_max_size(self):
208
212
                    (30, 50, [(0, 50)]),
209
213
                    # If one range is above max_size, it gets its own coalesced
210
214
                    # offset
211
 
                    (100, 80, [(0, 80)]), ],
 
215
                    (100, 80, [(0, 80)]),],
212
216
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
213
217
                   max_size=50)
214
218
 
215
219
    def test_coalesce_no_max_size(self):
216
220
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
217
221
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
218
 
                   )
 
222
                  )
219
223
 
220
224
    def test_coalesce_default_limit(self):
221
225
        # By default we use a 100MB max size.
222
226
        ten_mb = 10 * 1024 * 1024
223
 
        self.check(
224
 
            [(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
225
 
             (10 * ten_mb, ten_mb, [(0, ten_mb)])],
226
 
            [(i * ten_mb, ten_mb) for i in range(11)])
227
 
        self.check(
228
 
            [(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
229
 
            [(i * ten_mb, ten_mb) for i in range(11)],
230
 
            max_size=1 * 1024 * 1024 * 1024)
 
227
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
 
228
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
229
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
230
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
231
                   [(i * ten_mb, ten_mb) for i in range(11)],
 
232
                   max_size=1*1024*1024*1024)
231
233
 
232
234
 
233
235
class TestMemoryServer(tests.TestCase):
270
272
 
271
273
    def test_append_and_get(self):
272
274
        t = memory.MemoryTransport()
273
 
        t.append_bytes('path', b'content')
274
 
        self.assertEqual(t.get('path').read(), b'content')
 
275
        t.append_bytes('path', 'content')
 
276
        self.assertEqual(t.get('path').read(), 'content')
275
277
        t.append_file('path', BytesIO(b'content'))
276
 
        with t.get('path') as f:
277
 
            self.assertEqual(f.read(), b'contentcontent')
 
278
        self.assertEqual(t.get('path').read(), 'contentcontent')
278
279
 
279
280
    def test_put_and_get(self):
280
281
        t = memory.MemoryTransport()
281
282
        t.put_file('path', BytesIO(b'content'))
282
 
        self.assertEqual(t.get('path').read(), b'content')
283
 
        t.put_bytes('path', b'content')
284
 
        self.assertEqual(t.get('path').read(), b'content')
 
283
        self.assertEqual(t.get('path').read(), 'content')
 
284
        t.put_bytes('path', 'content')
 
285
        self.assertEqual(t.get('path').read(), 'content')
285
286
 
286
287
    def test_append_without_dir_fails(self):
287
288
        t = memory.MemoryTransport()
288
289
        self.assertRaises(errors.NoSuchFile,
289
 
                          t.append_bytes, 'dir/path', b'content')
 
290
                          t.append_bytes, 'dir/path', 'content')
290
291
 
291
292
    def test_put_without_dir_fails(self):
292
293
        t = memory.MemoryTransport()
303
304
 
304
305
    def test_has_present(self):
305
306
        t = memory.MemoryTransport()
306
 
        t.append_bytes('foo', b'content')
 
307
        t.append_bytes('foo', 'content')
307
308
        self.assertEqual(True, t.has('foo'))
308
309
 
309
310
    def test_list_dir(self):
310
311
        t = memory.MemoryTransport()
311
 
        t.put_bytes('foo', b'content')
 
312
        t.put_bytes('foo', 'content')
312
313
        t.mkdir('dir')
313
 
        t.put_bytes('dir/subfoo', b'content')
314
 
        t.put_bytes('dirlike', b'content')
 
314
        t.put_bytes('dir/subfoo', 'content')
 
315
        t.put_bytes('dirlike', 'content')
315
316
 
316
317
        self.assertEqual(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
317
318
        self.assertEqual(['subfoo'], sorted(t.list_dir('dir')))
319
320
    def test_mkdir(self):
320
321
        t = memory.MemoryTransport()
321
322
        t.mkdir('dir')
322
 
        t.append_bytes('dir/path', b'content')
323
 
        with t.get('dir/path') as f:
324
 
            self.assertEqual(f.read(), b'content')
 
323
        t.append_bytes('dir/path', 'content')
 
324
        self.assertEqual(t.get('dir/path').read(), 'content')
325
325
 
326
326
    def test_mkdir_missing_parent(self):
327
327
        t = memory.MemoryTransport()
340
340
    def test_iter_files_recursive(self):
341
341
        t = memory.MemoryTransport()
342
342
        t.mkdir('dir')
343
 
        t.put_bytes('dir/foo', b'content')
344
 
        t.put_bytes('dir/bar', b'content')
345
 
        t.put_bytes('bar', b'content')
 
343
        t.put_bytes('dir/foo', 'content')
 
344
        t.put_bytes('dir/bar', 'content')
 
345
        t.put_bytes('bar', 'content')
346
346
        paths = set(t.iter_files_recursive())
347
347
        self.assertEqual({'dir/foo', 'dir/bar', 'bar'}, paths)
348
348
 
349
349
    def test_stat(self):
350
350
        t = memory.MemoryTransport()
351
 
        t.put_bytes('foo', b'content')
352
 
        t.put_bytes('bar', b'phowar')
 
351
        t.put_bytes('foo', 'content')
 
352
        t.put_bytes('bar', 'phowar')
353
353
        self.assertEqual(7, t.stat('foo').st_size)
354
354
        self.assertEqual(6, t.stat('bar').st_size)
355
355
 
411
411
        self.start_server(server)
412
412
        t = transport.get_transport_from_url(server.get_url())
413
413
        self.assertRaises(
414
 
            urlutils.InvalidURLJoin, urlutils.join, t.base, '..')
 
414
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
415
415
 
416
416
 
417
417
class TestChrootServer(tests.TestCase):
455
455
        """Check all expected transport hook points are set up"""
456
456
        hookpoint = transport.TransportHooks()
457
457
        self.assertTrue("post_connect" in hookpoint,
458
 
                        "post_connect not in %s" % (hookpoint,))
 
458
            "post_connect not in %s" % (hookpoint,))
459
459
 
460
460
    def test_post_connect(self):
461
461
        """Ensure the post_connect hook is called when _set_transport is"""
462
462
        calls = []
463
463
        transport.Transport.hooks.install_named_hook("post_connect",
464
 
                                                     calls.append, None)
 
464
            calls.append, None)
465
465
        t = self._get_connected_transport()
466
466
        self.assertLength(0, calls)
467
467
        t._set_connection("connection", "auth")
490
490
        :param filter_func: by default this will be a no-op function.  Use this
491
491
            parameter to override it."""
492
492
        if filter_func is None:
493
 
            def filter_func(x):
494
 
                return x
 
493
            filter_func = lambda x: x
495
494
        server = pathfilter.PathFilteringServer(
496
 
            transport.get_transport_from_url('memory:///foo/bar/'),
497
 
            filter_func)
 
495
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
498
496
        server.start_server()
499
497
        self.addCleanup(server.stop_server)
500
498
        return transport.get_transport_from_url(server.get_url())
708
706
        t = transport.get_transport_from_path(self.test_dir)
709
707
        self.assertIsInstance(t, local.LocalTransport)
710
708
        self.assertEqual(t.base.rstrip("/"),
711
 
                         urlutils.local_path_to_url(self.test_dir))
 
709
            urlutils.local_path_to_url(self.test_dir))
712
710
 
713
711
    def test_with_url(self):
714
712
        t = transport.get_transport_from_path("file:")
715
713
        self.assertIsInstance(t, local.LocalTransport)
716
 
        self.assertEqual(
717
 
            t.base.rstrip("/"),
 
714
        self.assertEqual(t.base.rstrip("/"),
718
715
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
719
716
 
720
717
 
721
718
class TestTransportFromUrl(tests.TestCaseInTempDir):
722
719
 
723
720
    def test_with_path(self):
724
 
        self.assertRaises(urlutils.InvalidURL, transport.get_transport_from_url,
725
 
                          self.test_dir)
 
721
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
 
722
            self.test_dir)
726
723
 
727
724
    def test_with_url(self):
728
725
        url = urlutils.local_path_to_url(self.test_dir)
731
728
        self.assertEqual(t.base.rstrip("/"), url)
732
729
 
733
730
    def test_with_url_and_segment_parameters(self):
734
 
        url = urlutils.local_path_to_url(self.test_dir) + ",branch=foo"
 
731
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
735
732
        t = transport.get_transport_from_url(url)
736
733
        self.assertIsInstance(t, local.LocalTransport)
737
734
        self.assertEqual(t.base.rstrip("/"), url)
749
746
        self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')
750
747
 
751
748
    def test_get_transport_from_relpath(self):
 
749
        here = osutils.abspath('.')
752
750
        t = transport.get_transport('.')
753
751
        self.assertIsInstance(t, local.LocalTransport)
754
752
        self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')
778
776
        # See https://bugs.launchpad.net/bzr/+bug/606537
779
777
        here = osutils.abspath('.')
780
778
        t = transport.get_transport(here)
781
 
 
782
779
        def fake_chmod(path, mode):
783
780
            e = OSError('permission denied')
784
781
            e.errno = errno.EPERM
794
791
 
795
792
    def test_local_fdatasync_calls_fdatasync(self):
796
793
        """Check fdatasync on a stream tries to flush the data to the OS.
797
 
 
 
794
        
798
795
        We can't easily observe the external effect but we can at least see
799
796
        it's called.
800
797
        """
805
802
        t = self.get_transport('.')
806
803
        calls = self.recordCalls(os, 'fdatasync')
807
804
        w = t.open_write_stream('out')
808
 
        w.write(b'foo')
 
805
        w.write('foo')
809
806
        w.fdatasync()
810
807
        with open('out', 'rb') as f:
811
808
            # Should have been flushed.
812
 
            self.assertEqual(f.read(), b'foo')
 
809
            self.assertEqual(f.read(), 'foo')
813
810
        self.assertEqual(len(calls), 1, calls)
814
811
 
815
812
    def test_missing_directory(self):
865
862
        self.assertEqual(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
866
863
 
867
864
    def test_parse_invalid_url(self):
868
 
        self.assertRaises(urlutils.InvalidURL,
 
865
        self.assertRaises(errors.InvalidURL,
869
866
                          transport.ConnectedTransport,
870
867
                          'sftp://lily.org:~janneke/public/bzr/gub')
871
868
 
873
870
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
874
871
 
875
872
        self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'),
876
 
                         'sub')
 
873
            'sub')
877
874
        self.assertRaises(errors.PathNotChild, t.relpath,
878
875
                          'http://user@host.com/abs/path/sub')
879
876
        self.assertRaises(errors.PathNotChild, t.relpath,
923
920
 
924
921
    def test_reuse_same_transport(self):
925
922
        possible_transports = []
926
 
        t1 = transport.get_transport_from_url(
927
 
            'http://foo/', possible_transports=possible_transports)
 
923
        t1 = transport.get_transport_from_url('http://foo/',
 
924
                                     possible_transports=possible_transports)
928
925
        self.assertEqual([t1], possible_transports)
929
926
        t2 = transport.get_transport_from_url('http://foo/',
930
 
                                              possible_transports=[t1])
 
927
                                     possible_transports=[t1])
931
928
        self.assertIs(t1, t2)
932
929
 
933
930
        # Also check that final '/' are handled correctly
934
931
        t3 = transport.get_transport_from_url('http://foo/path/')
935
932
        t4 = transport.get_transport_from_url('http://foo/path',
936
 
                                              possible_transports=[t3])
 
933
                                     possible_transports=[t3])
937
934
        self.assertIs(t3, t4)
938
935
 
939
936
        t5 = transport.get_transport_from_url('http://foo/path')
940
937
        t6 = transport.get_transport_from_url('http://foo/path/',
941
 
                                              possible_transports=[t5])
 
938
                                     possible_transports=[t5])
942
939
        self.assertIs(t5, t6)
943
940
 
944
941
    def test_don_t_reuse_different_transport(self):
945
942
        t1 = transport.get_transport_from_url('http://foo/path')
946
943
        t2 = transport.get_transport_from_url('http://bar/path',
947
 
                                              possible_transports=[t1])
 
944
                                     possible_transports=[t1])
948
945
        self.assertIsNot(t1, t2)
949
946
 
950
947
 
967
964
    # changes; so there is little return doing that.
968
965
    def test_get(self):
969
966
        t = transport.get_transport_from_url('trace+memory:///')
970
 
        t.put_bytes('foo', b'barish')
 
967
        t.put_bytes('foo', 'barish')
971
968
        t.get('foo')
972
969
        expected_result = []
973
970
        # put_bytes records the bytes, not the content to avoid memory
979
976
 
980
977
    def test_readv(self):
981
978
        t = transport.get_transport_from_url('trace+memory:///')
982
 
        t.put_bytes('foo', b'barish')
 
979
        t.put_bytes('foo', 'barish')
983
980
        list(t.readv('foo', [(0, 1), (3, 2)],
984
981
                     adjust_for_latency=True, upper_limit=6))
985
982
        expected_result = []
1025
1022
                self.test.command_executed.append(command)
1026
1023
                proc = subprocess.Popen(
1027
1024
                    command, shell=True, stdin=subprocess.PIPE,
1028
 
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
1029
 
                    bufsize=0)
 
1025
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1030
1026
 
1031
1027
                # XXX: horribly inefficient, not to mention ugly.
1032
1028
                # Start a thread for each of stdin/out/err, and relay bytes
1034
1030
                def ferry_bytes(read, write, close):
1035
1031
                    while True:
1036
1032
                        bytes = read(1)
1037
 
                        if bytes == b'':
 
1033
                        if bytes == '':
1038
1034
                            close()
1039
1035
                            break
1040
1036
                        write(bytes)
1081
1077
        t.mkdir('foo')
1082
1078
 
1083
1079
        self.assertEqual(
1084
 
            [b'%s serve --inet --directory=/ --allow-writes' %
1085
 
                bzr_remote_path.encode()],
 
1080
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
1086
1081
            self.command_executed)
1087
1082
        # Make sure to disconnect, so that the remote process can stop, and we
1088
1083
        # can cleanup. Then pause the test until everything is shutdown
1105
1100
        result = http.unhtml_roughly(fake_html)
1106
1101
        self.assertEqual(len(result), 1000)
1107
1102
        self.assertStartsWith(result, " something!")
 
1103
 
 
1104
 
 
1105
class SomeDirectory(object):
 
1106
 
 
1107
    def look_up(self, name, url):
 
1108
        return "http://bar"
 
1109
 
 
1110
 
 
1111
class TestLocationToUrl(tests.TestCase):
 
1112
 
 
1113
    def get_base_location(self):
 
1114
        path = osutils.abspath('/foo/bar')
 
1115
        if path.startswith('/'):
 
1116
            url = 'file://%s' % (path,)
 
1117
        else:
 
1118
            # On Windows, abspaths start with the drive letter, so we have to
 
1119
            # add in the extra '/'
 
1120
            url = 'file:///%s' % (path,)
 
1121
        return path, url
 
1122
 
 
1123
    def test_regular_url(self):
 
1124
        self.assertEqual("file://foo", location_to_url("file://foo"))
 
1125
 
 
1126
    def test_directory(self):
 
1127
        directories.register("bar:", SomeDirectory, "Dummy directory")
 
1128
        self.addCleanup(directories.remove, "bar:")
 
1129
        self.assertEqual("http://bar", location_to_url("bar:"))
 
1130
 
 
1131
    def test_unicode_url(self):
 
1132
        self.assertRaises(errors.InvalidURL, location_to_url,
 
1133
            "http://fo/\xc3\xaf".decode("utf-8"))
 
1134
 
 
1135
    def test_unicode_path(self):
 
1136
        path, url = self.get_base_location()
 
1137
        location = path + "\xc3\xaf".decode("utf-8")
 
1138
        url += '%C3%AF'
 
1139
        self.assertEqual(url, location_to_url(location))
 
1140
 
 
1141
    def test_path(self):
 
1142
        path, url = self.get_base_location()
 
1143
        self.assertEqual(url, location_to_url(path))
 
1144
 
 
1145
    def test_relative_file_url(self):
 
1146
        self.assertEqual(urlutils.local_path_to_url(".") + "/bar",
 
1147
            location_to_url("file:bar"))
 
1148
 
 
1149
    def test_absolute_file_url(self):
 
1150
        self.assertEqual("file:///bar", location_to_url("file:/bar"))