/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to brzlib/tests/test_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2017-05-21 12:41:27 UTC
  • mto: This revision was merged to the branch mainline in revision 6623.
  • Revision ID: jelmer@jelmer.uk-20170521124127-iv8etg0vwymyai6y
s/bzr/brz/ in apport config.

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
 
 
18
from cStringIO import StringIO
18
19
import errno
19
 
from io import BytesIO
20
20
import os
21
21
import subprocess
22
22
import sys
23
23
import threading
24
24
 
25
 
from .. import (
 
25
from brzlib import (
26
26
    errors,
27
27
    osutils,
28
28
    tests,
29
29
    transport,
30
30
    urlutils,
31
31
    )
32
 
from ..transport import (
 
32
from brzlib.directory_service import directories
 
33
from brzlib.transport import (
33
34
    chroot,
34
35
    fakenfs,
35
36
    http,
36
37
    local,
 
38
    location_to_url,
37
39
    memory,
38
40
    pathfilter,
39
41
    readonly,
40
42
    )
41
 
import breezy.transport.trace
42
 
from . import (
 
43
import brzlib.transport.trace
 
44
from brzlib.tests import (
43
45
    features,
44
46
    test_server,
45
47
    )
69
71
        transport._clear_protocol_handlers()
70
72
        transport.register_transport_proto('foo')
71
73
        transport.register_lazy_transport('foo',
72
 
                                          'breezy.tests.test_transport',
73
 
                                          'TestTransport.SampleHandler')
 
74
                                            'brzlib.tests.test_transport',
 
75
                                            'TestTransport.SampleHandler')
74
76
        transport.register_transport_proto('bar')
75
77
        transport.register_lazy_transport('bar',
76
 
                                          'breezy.tests.test_transport',
77
 
                                          'TestTransport.SampleHandler')
 
78
                                            'brzlib.tests.test_transport',
 
79
                                            'TestTransport.SampleHandler')
78
80
        self.assertEqual([SampleHandler.__module__,
79
 
                          'breezy.transport.chroot',
80
 
                          'breezy.transport.pathfilter'],
81
 
                         transport._get_transport_modules())
 
81
                            'brzlib.transport.chroot',
 
82
                            'brzlib.transport.pathfilter'],
 
83
                            transport._get_transport_modules())
82
84
 
83
85
    def test_transport_dependency(self):
84
86
        """Transport with missing dependency causes no error"""
88
90
        transport._clear_protocol_handlers()
89
91
        transport.register_transport_proto('foo')
90
92
        transport.register_lazy_transport(
91
 
            'foo', 'breezy.tests.test_transport', 'BadTransportHandler')
 
93
            'foo', 'brzlib.tests.test_transport', 'BadTransportHandler')
92
94
        try:
93
95
            transport.get_transport_from_url('foo://fooserver/foo')
94
 
        except errors.UnsupportedProtocol as e:
 
96
        except errors.UnsupportedProtocol, e:
 
97
            e_str = str(e)
95
98
            self.assertEqual('Unsupported protocol'
96
 
                             ' for url "foo://fooserver/foo":'
97
 
                             ' Unable to import library "some_lib":'
98
 
                             ' testing missing dependency', str(e))
 
99
                                ' for url "foo://fooserver/foo":'
 
100
                                ' Unable to import library "some_lib":'
 
101
                                ' testing missing dependency', str(e))
99
102
        else:
100
103
            self.fail('Did not raise UnsupportedProtocol')
101
104
 
106
109
        transport._clear_protocol_handlers()
107
110
        transport.register_transport_proto('foo')
108
111
        transport.register_lazy_transport(
109
 
            'foo', 'breezy.tests.test_transport', 'BackupTransportHandler')
 
112
            'foo', 'brzlib.tests.test_transport', 'BackupTransportHandler')
110
113
        transport.register_lazy_transport(
111
 
            'foo', 'breezy.tests.test_transport', 'BadTransportHandler')
 
114
            'foo', 'brzlib.tests.test_transport', 'BadTransportHandler')
112
115
        t = transport.get_transport_from_url('foo://fooserver/foo')
113
116
        self.assertTrue(isinstance(t, BackupTransportHandler))
114
117
 
116
119
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
117
120
        try:
118
121
            transport.get_transport_from_url('ssh://fooserver/foo')
119
 
        except errors.UnsupportedProtocol as e:
120
 
            self.assertEqual(
121
 
                'Unsupported protocol'
122
 
                ' for url "ssh://fooserver/foo":'
123
 
                ' Use bzr+ssh for Bazaar operations over SSH, '
124
 
                'e.g. "bzr+ssh://fooserver/foo". Use git+ssh '
125
 
                'for Git operations over SSH, e.g. "git+ssh://fooserver/foo".',
126
 
                str(e))
 
122
        except errors.UnsupportedProtocol, e:
 
123
            e_str = str(e)
 
124
            self.assertEqual('Unsupported protocol'
 
125
                              ' for url "ssh://fooserver/foo":'
 
126
                              ' bzr supports bzr+ssh to operate over ssh,'
 
127
                              ' use "bzr+ssh://fooserver/foo".',
 
128
                              str(e))
127
129
        else:
128
130
            self.fail('Did not raise UnsupportedProtocol')
129
131
 
132
134
        a_file = transport.LateReadError('a path')
133
135
        try:
134
136
            a_file.read()
135
 
        except errors.ReadError as error:
 
137
        except errors.ReadError, error:
136
138
            self.assertEqual('a path', error.path)
137
139
        self.assertRaises(errors.ReadError, a_file.read, 40)
138
140
        a_file.close()
162
164
    def test_coalesce_unrelated(self):
163
165
        self.check([(0, 10, [(0, 10)]),
164
166
                    (20, 10, [(0, 10)]),
165
 
                    ], [(0, 10), (20, 10)])
 
167
                   ], [(0, 10), (20, 10)])
166
168
 
167
169
    def test_coalesce_unsorted(self):
168
170
        self.check([(20, 10, [(0, 10)]),
169
171
                    (0, 10, [(0, 10)]),
170
 
                    ], [(20, 10), (0, 10)])
 
172
                   ], [(20, 10), (0, 10)])
171
173
 
172
174
    def test_coalesce_nearby(self):
173
175
        self.check([(0, 20, [(0, 10), (10, 10)])],
175
177
 
176
178
    def test_coalesce_overlapped(self):
177
179
        self.assertRaises(ValueError,
178
 
                          self.check, [(0, 15, [(0, 10), (5, 10)])],
179
 
                          [(0, 10), (5, 10)])
 
180
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
181
                        [(0, 10), (5, 10)])
180
182
 
181
183
    def test_coalesce_limit(self):
182
184
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
183
185
                              (30, 10), (40, 10)]),
184
186
                    (60, 50, [(0, 10), (10, 10), (20, 10),
185
187
                              (30, 10), (40, 10)]),
186
 
                    ], [(10, 10), (20, 10), (30, 10), (40, 10),
187
 
                        (50, 10), (60, 10), (70, 10), (80, 10),
188
 
                        (90, 10), (100, 10)],
189
 
                   limit=5)
 
188
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
189
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
190
                       (90, 10), (100, 10)],
 
191
                    limit=5)
190
192
 
191
193
    def test_coalesce_no_limit(self):
192
194
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
193
195
                               (30, 10), (40, 10), (50, 10),
194
196
                               (60, 10), (70, 10), (80, 10),
195
197
                               (90, 10)]),
196
 
                    ], [(10, 10), (20, 10), (30, 10), (40, 10),
197
 
                        (50, 10), (60, 10), (70, 10), (80, 10),
198
 
                        (90, 10), (100, 10)])
 
198
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
199
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
200
                       (90, 10), (100, 10)])
199
201
 
200
202
    def test_coalesce_fudge(self):
201
203
        self.check([(10, 30, [(0, 10), (20, 10)]),
202
204
                    (100, 10, [(0, 10)]),
203
 
                    ], [(10, 10), (30, 10), (100, 10)],
 
205
                   ], [(10, 10), (30, 10), (100, 10)],
204
206
                   fudge=10)
205
207
 
206
208
    def test_coalesce_max_size(self):
208
210
                    (30, 50, [(0, 50)]),
209
211
                    # If one range is above max_size, it gets its own coalesced
210
212
                    # offset
211
 
                    (100, 80, [(0, 80)]), ],
 
213
                    (100, 80, [(0, 80)]),],
212
214
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
213
215
                   max_size=50)
214
216
 
215
217
    def test_coalesce_no_max_size(self):
216
218
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
217
219
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
218
 
                   )
 
220
                  )
219
221
 
220
222
    def test_coalesce_default_limit(self):
221
223
        # By default we use a 100MB max size.
222
224
        ten_mb = 10 * 1024 * 1024
223
 
        self.check(
224
 
            [(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
225
 
             (10 * ten_mb, ten_mb, [(0, ten_mb)])],
226
 
            [(i * ten_mb, ten_mb) for i in range(11)])
227
 
        self.check(
228
 
            [(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
229
 
            [(i * ten_mb, ten_mb) for i in range(11)],
230
 
            max_size=1 * 1024 * 1024 * 1024)
 
225
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
 
226
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
227
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
228
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
229
                   [(i * ten_mb, ten_mb) for i in range(11)],
 
230
                   max_size=1*1024*1024*1024)
231
231
 
232
232
 
233
233
class TestMemoryServer(tests.TestCase):
270
270
 
271
271
    def test_append_and_get(self):
272
272
        t = memory.MemoryTransport()
273
 
        t.append_bytes('path', b'content')
274
 
        self.assertEqual(t.get('path').read(), b'content')
275
 
        t.append_file('path', BytesIO(b'content'))
276
 
        with t.get('path') as f:
277
 
            self.assertEqual(f.read(), b'contentcontent')
 
273
        t.append_bytes('path', 'content')
 
274
        self.assertEqual(t.get('path').read(), 'content')
 
275
        t.append_file('path', StringIO('content'))
 
276
        self.assertEqual(t.get('path').read(), 'contentcontent')
278
277
 
279
278
    def test_put_and_get(self):
280
279
        t = memory.MemoryTransport()
281
 
        t.put_file('path', BytesIO(b'content'))
282
 
        self.assertEqual(t.get('path').read(), b'content')
283
 
        t.put_bytes('path', b'content')
284
 
        self.assertEqual(t.get('path').read(), b'content')
 
280
        t.put_file('path', StringIO('content'))
 
281
        self.assertEqual(t.get('path').read(), 'content')
 
282
        t.put_bytes('path', 'content')
 
283
        self.assertEqual(t.get('path').read(), 'content')
285
284
 
286
285
    def test_append_without_dir_fails(self):
287
286
        t = memory.MemoryTransport()
288
287
        self.assertRaises(errors.NoSuchFile,
289
 
                          t.append_bytes, 'dir/path', b'content')
 
288
                          t.append_bytes, 'dir/path', 'content')
290
289
 
291
290
    def test_put_without_dir_fails(self):
292
291
        t = memory.MemoryTransport()
293
292
        self.assertRaises(errors.NoSuchFile,
294
 
                          t.put_file, 'dir/path', BytesIO(b'content'))
 
293
                          t.put_file, 'dir/path', StringIO('content'))
295
294
 
296
295
    def test_get_missing(self):
297
296
        transport = memory.MemoryTransport()
303
302
 
304
303
    def test_has_present(self):
305
304
        t = memory.MemoryTransport()
306
 
        t.append_bytes('foo', b'content')
 
305
        t.append_bytes('foo', 'content')
307
306
        self.assertEqual(True, t.has('foo'))
308
307
 
309
308
    def test_list_dir(self):
310
309
        t = memory.MemoryTransport()
311
 
        t.put_bytes('foo', b'content')
 
310
        t.put_bytes('foo', 'content')
312
311
        t.mkdir('dir')
313
 
        t.put_bytes('dir/subfoo', b'content')
314
 
        t.put_bytes('dirlike', b'content')
 
312
        t.put_bytes('dir/subfoo', 'content')
 
313
        t.put_bytes('dirlike', 'content')
315
314
 
316
315
        self.assertEqual(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
317
316
        self.assertEqual(['subfoo'], sorted(t.list_dir('dir')))
319
318
    def test_mkdir(self):
320
319
        t = memory.MemoryTransport()
321
320
        t.mkdir('dir')
322
 
        t.append_bytes('dir/path', b'content')
323
 
        with t.get('dir/path') as f:
324
 
            self.assertEqual(f.read(), b'content')
 
321
        t.append_bytes('dir/path', 'content')
 
322
        self.assertEqual(t.get('dir/path').read(), 'content')
325
323
 
326
324
    def test_mkdir_missing_parent(self):
327
325
        t = memory.MemoryTransport()
340
338
    def test_iter_files_recursive(self):
341
339
        t = memory.MemoryTransport()
342
340
        t.mkdir('dir')
343
 
        t.put_bytes('dir/foo', b'content')
344
 
        t.put_bytes('dir/bar', b'content')
345
 
        t.put_bytes('bar', b'content')
 
341
        t.put_bytes('dir/foo', 'content')
 
342
        t.put_bytes('dir/bar', 'content')
 
343
        t.put_bytes('bar', 'content')
346
344
        paths = set(t.iter_files_recursive())
347
 
        self.assertEqual({'dir/foo', 'dir/bar', 'bar'}, paths)
 
345
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
348
346
 
349
347
    def test_stat(self):
350
348
        t = memory.MemoryTransport()
351
 
        t.put_bytes('foo', b'content')
352
 
        t.put_bytes('bar', b'phowar')
 
349
        t.put_bytes('foo', 'content')
 
350
        t.put_bytes('bar', 'phowar')
353
351
        self.assertEqual(7, t.stat('foo').st_size)
354
352
        self.assertEqual(6, t.stat('bar').st_size)
355
353
 
411
409
        self.start_server(server)
412
410
        t = transport.get_transport_from_url(server.get_url())
413
411
        self.assertRaises(
414
 
            urlutils.InvalidURLJoin, urlutils.join, t.base, '..')
 
412
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
415
413
 
416
414
 
417
415
class TestChrootServer(tests.TestCase):
455
453
        """Check all expected transport hook points are set up"""
456
454
        hookpoint = transport.TransportHooks()
457
455
        self.assertTrue("post_connect" in hookpoint,
458
 
                        "post_connect not in %s" % (hookpoint,))
 
456
            "post_connect not in %s" % (hookpoint,))
459
457
 
460
458
    def test_post_connect(self):
461
459
        """Ensure the post_connect hook is called when _set_transport is"""
462
460
        calls = []
463
461
        transport.Transport.hooks.install_named_hook("post_connect",
464
 
                                                     calls.append, None)
 
462
            calls.append, None)
465
463
        t = self._get_connected_transport()
466
464
        self.assertLength(0, calls)
467
465
        t._set_connection("connection", "auth")
490
488
        :param filter_func: by default this will be a no-op function.  Use this
491
489
            parameter to override it."""
492
490
        if filter_func is None:
493
 
            def filter_func(x):
494
 
                return x
 
491
            filter_func = lambda x: x
495
492
        server = pathfilter.PathFilteringServer(
496
 
            transport.get_transport_from_url('memory:///foo/bar/'),
497
 
            filter_func)
 
493
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
498
494
        server.start_server()
499
495
        self.addCleanup(server.stop_server)
500
496
        return transport.get_transport_from_url(server.get_url())
565
561
        self.assertEqual(True, t.is_readonly())
566
562
 
567
563
    def test_http_parameters(self):
568
 
        from breezy.tests.http_server import HttpServer
 
564
        from brzlib.tests.http_server import HttpServer
569
565
        # connect to '.' via http which is not listable
570
566
        server = HttpServer()
571
567
        self.start_server(server)
592
588
    def test_http_parameters(self):
593
589
        # the listable and is_readonly parameters
594
590
        # are not changed by the fakenfs decorator
595
 
        from breezy.tests.http_server import HttpServer
 
591
        from brzlib.tests.http_server import HttpServer
596
592
        # connect to '.' via http which is not listable
597
593
        server = HttpServer()
598
594
        self.start_server(server)
626
622
 
627
623
    def get_vfat_transport(self, url):
628
624
        """Return vfat-backed transport for test directory"""
629
 
        from breezy.transport.fakevfat import FakeVFATTransportDecorator
 
625
        from brzlib.transport.fakevfat import FakeVFATTransportDecorator
630
626
        return FakeVFATTransportDecorator('vfat+' + url)
631
627
 
632
628
    def test_transport_creation(self):
633
 
        from breezy.transport.fakevfat import FakeVFATTransportDecorator
 
629
        from brzlib.transport.fakevfat import FakeVFATTransportDecorator
634
630
        t = self.get_vfat_transport('.')
635
631
        self.assertIsInstance(t, FakeVFATTransportDecorator)
636
632
 
661
657
 
662
658
    To verify a transport we need a server factory, which is a callable
663
659
    that accepts no parameters and returns an implementation of
664
 
    breezy.transport.Server.
 
660
    brzlib.transport.Server.
665
661
 
666
662
    That Server is then used to construct transport instances and test
667
663
    the transport via loopback activity.
708
704
        t = transport.get_transport_from_path(self.test_dir)
709
705
        self.assertIsInstance(t, local.LocalTransport)
710
706
        self.assertEqual(t.base.rstrip("/"),
711
 
                         urlutils.local_path_to_url(self.test_dir))
 
707
            urlutils.local_path_to_url(self.test_dir))
712
708
 
713
709
    def test_with_url(self):
714
710
        t = transport.get_transport_from_path("file:")
715
711
        self.assertIsInstance(t, local.LocalTransport)
716
 
        self.assertEqual(
717
 
            t.base.rstrip("/"),
 
712
        self.assertEqual(t.base.rstrip("/"),
718
713
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
719
714
 
720
715
 
721
716
class TestTransportFromUrl(tests.TestCaseInTempDir):
722
717
 
723
718
    def test_with_path(self):
724
 
        self.assertRaises(urlutils.InvalidURL, transport.get_transport_from_url,
725
 
                          self.test_dir)
 
719
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
 
720
            self.test_dir)
726
721
 
727
722
    def test_with_url(self):
728
723
        url = urlutils.local_path_to_url(self.test_dir)
731
726
        self.assertEqual(t.base.rstrip("/"), url)
732
727
 
733
728
    def test_with_url_and_segment_parameters(self):
734
 
        url = urlutils.local_path_to_url(self.test_dir) + ",branch=foo"
 
729
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
735
730
        t = transport.get_transport_from_url(url)
736
731
        self.assertIsInstance(t, local.LocalTransport)
737
732
        self.assertEqual(t.base.rstrip("/"), url)
749
744
        self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')
750
745
 
751
746
    def test_get_transport_from_relpath(self):
 
747
        here = osutils.abspath('.')
752
748
        t = transport.get_transport('.')
753
749
        self.assertIsInstance(t, local.LocalTransport)
754
750
        self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')
778
774
        # See https://bugs.launchpad.net/bzr/+bug/606537
779
775
        here = osutils.abspath('.')
780
776
        t = transport.get_transport(here)
781
 
 
782
777
        def fake_chmod(path, mode):
783
778
            e = OSError('permission denied')
784
779
            e.errno = errno.EPERM
785
780
            raise e
786
781
        self.overrideAttr(os, 'chmod', fake_chmod)
787
782
        t.mkdir('test')
788
 
        t.mkdir('test2', mode=0o707)
 
783
        t.mkdir('test2', mode=0707)
789
784
        self.assertTrue(os.path.exists('test'))
790
785
        self.assertTrue(os.path.exists('test2'))
791
786
 
794
789
 
795
790
    def test_local_fdatasync_calls_fdatasync(self):
796
791
        """Check fdatasync on a stream tries to flush the data to the OS.
797
 
 
 
792
        
798
793
        We can't easily observe the external effect but we can at least see
799
794
        it's called.
800
795
        """
805
800
        t = self.get_transport('.')
806
801
        calls = self.recordCalls(os, 'fdatasync')
807
802
        w = t.open_write_stream('out')
808
 
        w.write(b'foo')
 
803
        w.write('foo')
809
804
        w.fdatasync()
810
805
        with open('out', 'rb') as f:
811
806
            # Should have been flushed.
812
 
            self.assertEqual(f.read(), b'foo')
 
807
            self.assertEqual(f.read(), 'foo')
813
808
        self.assertEqual(len(calls), 1, calls)
814
809
 
815
810
    def test_missing_directory(self):
825
820
        # clone to root should stop at least at \\HOST part
826
821
        # not on \\
827
822
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
828
 
        for i in range(4):
 
823
        for i in xrange(4):
829
824
            t = t.clone('..')
830
825
        self.assertEqual(t.base, 'file://HOST/')
831
826
        # make sure we reach the root
865
860
        self.assertEqual(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
866
861
 
867
862
    def test_parse_invalid_url(self):
868
 
        self.assertRaises(urlutils.InvalidURL,
 
863
        self.assertRaises(errors.InvalidURL,
869
864
                          transport.ConnectedTransport,
870
865
                          'sftp://lily.org:~janneke/public/bzr/gub')
871
866
 
873
868
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
874
869
 
875
870
        self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'),
876
 
                         'sub')
 
871
            'sub')
877
872
        self.assertRaises(errors.PathNotChild, t.relpath,
878
873
                          'http://user@host.com/abs/path/sub')
879
874
        self.assertRaises(errors.PathNotChild, t.relpath,
923
918
 
924
919
    def test_reuse_same_transport(self):
925
920
        possible_transports = []
926
 
        t1 = transport.get_transport_from_url(
927
 
            'http://foo/', possible_transports=possible_transports)
 
921
        t1 = transport.get_transport_from_url('http://foo/',
 
922
                                     possible_transports=possible_transports)
928
923
        self.assertEqual([t1], possible_transports)
929
924
        t2 = transport.get_transport_from_url('http://foo/',
930
 
                                              possible_transports=[t1])
 
925
                                     possible_transports=[t1])
931
926
        self.assertIs(t1, t2)
932
927
 
933
928
        # Also check that final '/' are handled correctly
934
929
        t3 = transport.get_transport_from_url('http://foo/path/')
935
930
        t4 = transport.get_transport_from_url('http://foo/path',
936
 
                                              possible_transports=[t3])
 
931
                                     possible_transports=[t3])
937
932
        self.assertIs(t3, t4)
938
933
 
939
934
        t5 = transport.get_transport_from_url('http://foo/path')
940
935
        t6 = transport.get_transport_from_url('http://foo/path/',
941
 
                                              possible_transports=[t5])
 
936
                                     possible_transports=[t5])
942
937
        self.assertIs(t5, t6)
943
938
 
944
939
    def test_don_t_reuse_different_transport(self):
945
940
        t1 = transport.get_transport_from_url('http://foo/path')
946
941
        t2 = transport.get_transport_from_url('http://bar/path',
947
 
                                              possible_transports=[t1])
 
942
                                     possible_transports=[t1])
948
943
        self.assertIsNot(t1, t2)
949
944
 
950
945
 
953
948
    def test_decorator(self):
954
949
        t = transport.get_transport_from_url('trace+memory://')
955
950
        self.assertIsInstance(
956
 
            t, breezy.transport.trace.TransportTraceDecorator)
 
951
            t, brzlib.transport.trace.TransportTraceDecorator)
957
952
 
958
953
    def test_clone_preserves_activity(self):
959
954
        t = transport.get_transport_from_url('trace+memory://')
967
962
    # changes; so there is little return doing that.
968
963
    def test_get(self):
969
964
        t = transport.get_transport_from_url('trace+memory:///')
970
 
        t.put_bytes('foo', b'barish')
 
965
        t.put_bytes('foo', 'barish')
971
966
        t.get('foo')
972
967
        expected_result = []
973
968
        # put_bytes records the bytes, not the content to avoid memory
979
974
 
980
975
    def test_readv(self):
981
976
        t = transport.get_transport_from_url('trace+memory:///')
982
 
        t.put_bytes('foo', b'barish')
 
977
        t.put_bytes('foo', 'barish')
983
978
        list(t.readv('foo', [(0, 1), (3, 2)],
984
979
                     adjust_for_latency=True, upper_limit=6))
985
980
        expected_result = []
1007
1002
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
1008
1003
        # override the interface (doesn't change self._vendor).
1009
1004
        # Note that this does encryption, so can be slow.
1010
 
        from breezy.tests import stub_sftp
 
1005
        from brzlib.tests import stub_sftp
1011
1006
 
1012
1007
        # Start an SSH server
1013
1008
        self.command_executed = []
1025
1020
                self.test.command_executed.append(command)
1026
1021
                proc = subprocess.Popen(
1027
1022
                    command, shell=True, stdin=subprocess.PIPE,
1028
 
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
1029
 
                    bufsize=0)
 
1023
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1030
1024
 
1031
1025
                # XXX: horribly inefficient, not to mention ugly.
1032
1026
                # Start a thread for each of stdin/out/err, and relay bytes
1034
1028
                def ferry_bytes(read, write, close):
1035
1029
                    while True:
1036
1030
                        bytes = read(1)
1037
 
                        if bytes == b'':
 
1031
                        if bytes == '':
1038
1032
                            close()
1039
1033
                            break
1040
1034
                        write(bytes)
1081
1075
        t.mkdir('foo')
1082
1076
 
1083
1077
        self.assertEqual(
1084
 
            [b'%s serve --inet --directory=/ --allow-writes' %
1085
 
                bzr_remote_path.encode()],
 
1078
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
1086
1079
            self.command_executed)
1087
1080
        # Make sure to disconnect, so that the remote process can stop, and we
1088
1081
        # can cleanup. Then pause the test until everything is shutdown
1105
1098
        result = http.unhtml_roughly(fake_html)
1106
1099
        self.assertEqual(len(result), 1000)
1107
1100
        self.assertStartsWith(result, " something!")
 
1101
 
 
1102
 
 
1103
class SomeDirectory(object):
 
1104
 
 
1105
    def look_up(self, name, url):
 
1106
        return "http://bar"
 
1107
 
 
1108
 
 
1109
class TestLocationToUrl(tests.TestCase):
 
1110
 
 
1111
    def get_base_location(self):
 
1112
        path = osutils.abspath('/foo/bar')
 
1113
        if path.startswith('/'):
 
1114
            url = 'file://%s' % (path,)
 
1115
        else:
 
1116
            # On Windows, abspaths start with the drive letter, so we have to
 
1117
            # add in the extra '/'
 
1118
            url = 'file:///%s' % (path,)
 
1119
        return path, url
 
1120
 
 
1121
    def test_regular_url(self):
 
1122
        self.assertEqual("file://foo", location_to_url("file://foo"))
 
1123
 
 
1124
    def test_directory(self):
 
1125
        directories.register("bar:", SomeDirectory, "Dummy directory")
 
1126
        self.addCleanup(directories.remove, "bar:")
 
1127
        self.assertEqual("http://bar", location_to_url("bar:"))
 
1128
 
 
1129
    def test_unicode_url(self):
 
1130
        self.assertRaises(errors.InvalidURL, location_to_url,
 
1131
            "http://fo/\xc3\xaf".decode("utf-8"))
 
1132
 
 
1133
    def test_unicode_path(self):
 
1134
        path, url = self.get_base_location()
 
1135
        location = path + "\xc3\xaf".decode("utf-8")
 
1136
        url += '%C3%AF'
 
1137
        self.assertEqual(url, location_to_url(location))
 
1138
 
 
1139
    def test_path(self):
 
1140
        path, url = self.get_base_location()
 
1141
        self.assertEqual(url, location_to_url(path))
 
1142
 
 
1143
    def test_relative_file_url(self):
 
1144
        self.assertEqual(urlutils.local_path_to_url(".") + "/bar",
 
1145
            location_to_url("file:bar"))
 
1146
 
 
1147
    def test_absolute_file_url(self):
 
1148
        self.assertEqual("file:///bar", location_to_url("file:/bar"))