/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Vincent Ladeuil
  • Date: 2011-08-16 07:33:29 UTC
  • mto: This revision was merged to the branch mainline in revision 6072.
  • Revision ID: v.ladeuil+lp@free.fr-20110816073329-y5w2c2oz7jbo8vxy
Fix tests failing on pqm.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
28
28
    transport,
29
29
    urlutils,
30
30
    )
 
31
from bzrlib.directory_service import directories
31
32
from bzrlib.transport import (
32
33
    chroot,
33
34
    fakenfs,
 
35
    http,
34
36
    local,
 
37
    location_to_url,
35
38
    memory,
36
39
    pathfilter,
37
40
    readonly,
38
41
    )
 
42
import bzrlib.transport.trace
39
43
from bzrlib.tests import (
40
44
    features,
41
45
    test_server,
48
52
class TestTransport(tests.TestCase):
49
53
    """Test the non transport-concrete class functionality."""
50
54
 
51
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
52
 
    # of try/finally -- vila 20100205
53
 
 
54
55
    def test__get_set_protocol_handlers(self):
55
56
        handlers = transport._get_protocol_handlers()
56
 
        self.assertNotEqual([], handlers.keys( ))
57
 
        try:
58
 
            transport._clear_protocol_handlers()
59
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
60
 
        finally:
61
 
            transport._set_protocol_handlers(handlers)
 
57
        self.assertNotEqual([], handlers.keys())
 
58
        transport._clear_protocol_handlers()
 
59
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
60
        self.assertEqual([], transport._get_protocol_handlers().keys())
62
61
 
63
62
    def test_get_transport_modules(self):
64
63
        handlers = transport._get_protocol_handlers()
 
64
        self.addCleanup(transport._set_protocol_handlers, handlers)
65
65
        # don't pollute the current handlers
66
66
        transport._clear_protocol_handlers()
 
67
 
67
68
        class SampleHandler(object):
68
69
            """I exist, isn't that enough?"""
69
 
        try:
70
 
            transport._clear_protocol_handlers()
71
 
            transport.register_transport_proto('foo')
72
 
            transport.register_lazy_transport('foo',
73
 
                                              'bzrlib.tests.test_transport',
74
 
                                              'TestTransport.SampleHandler')
75
 
            transport.register_transport_proto('bar')
76
 
            transport.register_lazy_transport('bar',
77
 
                                              'bzrlib.tests.test_transport',
78
 
                                              'TestTransport.SampleHandler')
79
 
            self.assertEqual([SampleHandler.__module__,
80
 
                              'bzrlib.transport.chroot',
81
 
                              'bzrlib.transport.pathfilter'],
82
 
                             transport._get_transport_modules())
83
 
        finally:
84
 
            transport._set_protocol_handlers(handlers)
 
70
        transport._clear_protocol_handlers()
 
71
        transport.register_transport_proto('foo')
 
72
        transport.register_lazy_transport('foo',
 
73
                                            'bzrlib.tests.test_transport',
 
74
                                            'TestTransport.SampleHandler')
 
75
        transport.register_transport_proto('bar')
 
76
        transport.register_lazy_transport('bar',
 
77
                                            'bzrlib.tests.test_transport',
 
78
                                            'TestTransport.SampleHandler')
 
79
        self.assertEqual([SampleHandler.__module__,
 
80
                            'bzrlib.transport.chroot',
 
81
                            'bzrlib.transport.pathfilter'],
 
82
                            transport._get_transport_modules())
85
83
 
86
84
    def test_transport_dependency(self):
87
85
        """Transport with missing dependency causes no error"""
88
86
        saved_handlers = transport._get_protocol_handlers()
 
87
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
89
88
        # don't pollute the current handlers
90
89
        transport._clear_protocol_handlers()
 
90
        transport.register_transport_proto('foo')
 
91
        transport.register_lazy_transport(
 
92
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
91
93
        try:
92
 
            transport.register_transport_proto('foo')
93
 
            transport.register_lazy_transport(
94
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
95
 
            try:
96
 
                transport.get_transport('foo://fooserver/foo')
97
 
            except errors.UnsupportedProtocol, e:
98
 
                e_str = str(e)
99
 
                self.assertEquals('Unsupported protocol'
100
 
                                  ' for url "foo://fooserver/foo":'
101
 
                                  ' Unable to import library "some_lib":'
102
 
                                  ' testing missing dependency', str(e))
103
 
            else:
104
 
                self.fail('Did not raise UnsupportedProtocol')
105
 
        finally:
106
 
            # restore original values
107
 
            transport._set_protocol_handlers(saved_handlers)
 
94
            transport.get_transport_from_url('foo://fooserver/foo')
 
95
        except errors.UnsupportedProtocol, e:
 
96
            e_str = str(e)
 
97
            self.assertEquals('Unsupported protocol'
 
98
                                ' for url "foo://fooserver/foo":'
 
99
                                ' Unable to import library "some_lib":'
 
100
                                ' testing missing dependency', str(e))
 
101
        else:
 
102
            self.fail('Did not raise UnsupportedProtocol')
108
103
 
109
104
    def test_transport_fallback(self):
110
105
        """Transport with missing dependency falls back to another handler"""
111
106
        saved_handlers = transport._get_protocol_handlers()
112
 
        try:
113
 
            transport._clear_protocol_handlers()
114
 
            transport.register_transport_proto('foo')
115
 
            transport.register_lazy_transport(
116
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
117
 
            transport.register_lazy_transport(
118
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
119
 
            t = transport.get_transport('foo://fooserver/foo')
120
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
121
 
        finally:
122
 
            transport._set_protocol_handlers(saved_handlers)
 
107
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
108
        transport._clear_protocol_handlers()
 
109
        transport.register_transport_proto('foo')
 
110
        transport.register_lazy_transport(
 
111
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
112
        transport.register_lazy_transport(
 
113
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
114
        t = transport.get_transport_from_url('foo://fooserver/foo')
 
115
        self.assertTrue(isinstance(t, BackupTransportHandler))
123
116
 
124
117
    def test_ssh_hints(self):
125
118
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
126
119
        try:
127
 
            transport.get_transport('ssh://fooserver/foo')
 
120
            transport.get_transport_from_url('ssh://fooserver/foo')
128
121
        except errors.UnsupportedProtocol, e:
129
122
            e_str = str(e)
130
123
            self.assertEquals('Unsupported protocol'
218
211
 
219
212
    def test_coalesce_fudge(self):
220
213
        self.check([(10, 30, [(0, 10), (20, 10)]),
221
 
                    (100, 10, [(0, 10),]),
 
214
                    (100, 10, [(0, 10)]),
222
215
                   ], [(10, 10), (30, 10), (100, 10)],
223
 
                   fudge=10
224
 
                  )
 
216
                   fudge=10)
 
217
 
225
218
    def test_coalesce_max_size(self):
226
219
        self.check([(10, 20, [(0, 10), (10, 10)]),
227
220
                    (30, 50, [(0, 50)]),
228
221
                    # If one range is above max_size, it gets its own coalesced
229
222
                    # offset
230
 
                    (100, 80, [(0, 80),]),],
 
223
                    (100, 80, [(0, 80)]),],
231
224
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
232
 
                   max_size=50
233
 
                  )
 
225
                   max_size=50)
234
226
 
235
227
    def test_coalesce_no_max_size(self):
236
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
228
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
237
229
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
238
230
                  )
239
231
 
240
232
    def test_coalesce_default_limit(self):
241
233
        # By default we use a 100MB max size.
242
 
        ten_mb = 10*1024*1024
243
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
234
        ten_mb = 10 * 1024 * 1024
 
235
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
244
236
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
245
237
                   [(i*ten_mb, ten_mb) for i in range(11)])
246
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
247
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
238
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
239
                   [(i * ten_mb, ten_mb) for i in range(11)],
248
240
                   max_size=1*1024*1024*1024)
249
241
 
250
242
 
255
247
        server.start_server()
256
248
        url = server.get_url()
257
249
        self.assertTrue(url in transport.transport_list_registry)
258
 
        t = transport.get_transport(url)
 
250
        t = transport.get_transport_from_url(url)
259
251
        del t
260
252
        server.stop_server()
261
253
        self.assertFalse(url in transport.transport_list_registry)
376
368
    def test_abspath(self):
377
369
        # The abspath is always relative to the chroot_url.
378
370
        server = chroot.ChrootServer(
379
 
            transport.get_transport('memory:///foo/bar/'))
 
371
            transport.get_transport_from_url('memory:///foo/bar/'))
380
372
        self.start_server(server)
381
 
        t = transport.get_transport(server.get_url())
 
373
        t = transport.get_transport_from_url(server.get_url())
382
374
        self.assertEqual(server.get_url(), t.abspath('/'))
383
375
 
384
376
        subdir_t = t.clone('subdir')
386
378
 
387
379
    def test_clone(self):
388
380
        server = chroot.ChrootServer(
389
 
            transport.get_transport('memory:///foo/bar/'))
 
381
            transport.get_transport_from_url('memory:///foo/bar/'))
390
382
        self.start_server(server)
391
 
        t = transport.get_transport(server.get_url())
 
383
        t = transport.get_transport_from_url(server.get_url())
392
384
        # relpath from root and root path are the same
393
385
        relpath_cloned = t.clone('foo')
394
386
        abspath_cloned = t.clone('/foo')
403
395
        This is so that it is not possible to escape a chroot by doing::
404
396
            url = chroot_transport.base
405
397
            parent_url = urlutils.join(url, '..')
406
 
            new_t = transport.get_transport(parent_url)
 
398
            new_t = transport.get_transport_from_url(parent_url)
407
399
        """
408
400
        server = chroot.ChrootServer(
409
 
            transport.get_transport('memory:///path/subpath'))
 
401
            transport.get_transport_from_url('memory:///path/subpath'))
410
402
        self.start_server(server)
411
 
        t = transport.get_transport(server.get_url())
412
 
        new_t = transport.get_transport(t.base)
 
403
        t = transport.get_transport_from_url(server.get_url())
 
404
        new_t = transport.get_transport_from_url(t.base)
413
405
        self.assertEqual(t.server, new_t.server)
414
406
        self.assertEqual(t.base, new_t.base)
415
407
 
420
412
        This is so that it is not possible to escape a chroot by doing::
421
413
            url = chroot_transport.base
422
414
            parent_url = urlutils.join(url, '..')
423
 
            new_t = transport.get_transport(parent_url)
 
415
            new_t = transport.get_transport_from_url(parent_url)
424
416
        """
425
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
417
        server = chroot.ChrootServer(
 
418
            transport.get_transport_from_url('memory:///path/'))
426
419
        self.start_server(server)
427
 
        t = transport.get_transport(server.get_url())
 
420
        t = transport.get_transport_from_url(server.get_url())
428
421
        self.assertRaises(
429
422
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
430
423
 
440
433
        backing_transport = memory.MemoryTransport()
441
434
        server = chroot.ChrootServer(backing_transport)
442
435
        server.start_server()
443
 
        try:
444
 
            self.assertTrue(server.scheme
445
 
                            in transport._get_protocol_handlers().keys())
446
 
        finally:
447
 
            server.stop_server()
 
436
        self.addCleanup(server.stop_server)
 
437
        self.assertTrue(server.scheme
 
438
                        in transport._get_protocol_handlers().keys())
448
439
 
449
440
    def test_stop_server(self):
450
441
        backing_transport = memory.MemoryTransport()
458
449
        backing_transport = memory.MemoryTransport()
459
450
        server = chroot.ChrootServer(backing_transport)
460
451
        server.start_server()
461
 
        try:
462
 
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
463
 
        finally:
464
 
            server.stop_server()
 
452
        self.addCleanup(server.stop_server)
 
453
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
465
454
 
466
455
 
467
456
class PathFilteringDecoratorTransportTest(tests.TestCase):
470
459
    def test_abspath(self):
471
460
        # The abspath is always relative to the base of the backing transport.
472
461
        server = pathfilter.PathFilteringServer(
473
 
            transport.get_transport('memory:///foo/bar/'),
 
462
            transport.get_transport_from_url('memory:///foo/bar/'),
474
463
            lambda x: x)
475
464
        server.start_server()
476
 
        t = transport.get_transport(server.get_url())
 
465
        t = transport.get_transport_from_url(server.get_url())
477
466
        self.assertEqual(server.get_url(), t.abspath('/'))
478
467
 
479
468
        subdir_t = t.clone('subdir')
482
471
 
483
472
    def make_pf_transport(self, filter_func=None):
484
473
        """Make a PathFilteringTransport backed by a MemoryTransport.
485
 
        
 
474
 
486
475
        :param filter_func: by default this will be a no-op function.  Use this
487
476
            parameter to override it."""
488
477
        if filter_func is None:
489
478
            filter_func = lambda x: x
490
479
        server = pathfilter.PathFilteringServer(
491
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
480
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
492
481
        server.start_server()
493
482
        self.addCleanup(server.stop_server)
494
 
        return transport.get_transport(server.get_url())
 
483
        return transport.get_transport_from_url(server.get_url())
495
484
 
496
485
    def test__filter(self):
497
486
        # _filter (with an identity func as filter_func) always returns
510
499
 
511
500
    def test_filter_invocation(self):
512
501
        filter_log = []
 
502
 
513
503
        def filter(path):
514
504
            filter_log.append(path)
515
505
            return path
540
530
        otherwise) the filtering by doing::
541
531
            url = filtered_transport.base
542
532
            parent_url = urlutils.join(url, '..')
543
 
            new_t = transport.get_transport(parent_url)
 
533
            new_t = transport.get_transport_from_url(parent_url)
544
534
        """
545
535
        t = self.make_pf_transport()
546
 
        new_t = transport.get_transport(t.base)
 
536
        new_t = transport.get_transport_from_url(t.base)
547
537
        self.assertEqual(t.server, new_t.server)
548
538
        self.assertEqual(t.base, new_t.base)
549
539
 
562
552
        # connect to '.' via http which is not listable
563
553
        server = HttpServer()
564
554
        self.start_server(server)
565
 
        t = transport.get_transport('readonly+' + server.get_url())
566
 
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
 
555
        t = transport.get_transport_from_url('readonly+' + server.get_url())
 
556
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
567
557
        self.assertEqual(False, t.listable())
568
558
        self.assertEqual(True, t.is_readonly())
569
559
 
601
591
        # the url should be decorated appropriately
602
592
        self.assertStartsWith(server.get_url(), 'fakenfs+')
603
593
        # and we should be able to get a transport for it
604
 
        t = transport.get_transport(server.get_url())
 
594
        t = transport.get_transport_from_url(server.get_url())
605
595
        # which must be a FakeNFSTransportDecorator instance.
606
596
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
607
597
 
684
674
        base_url = self._server.get_url()
685
675
        url = self._adjust_url(base_url, relpath)
686
676
        # try getting the transport via the regular interface:
687
 
        t = transport.get_transport(url)
 
677
        t = transport.get_transport_from_url(url)
688
678
        # vila--20070607 if the following are commented out the test suite
689
679
        # still passes. Is this really still needed or was it a forgotten
690
680
        # temporary fix?
695
685
        return t
696
686
 
697
687
 
 
688
class TestTransportFromPath(tests.TestCaseInTempDir):
 
689
 
 
690
    def test_with_path(self):
 
691
        t = transport.get_transport_from_path(self.test_dir)
 
692
        self.assertIsInstance(t, local.LocalTransport)
 
693
        self.assertEquals(t.base.rstrip("/"),
 
694
            urlutils.local_path_to_url(self.test_dir))
 
695
 
 
696
    def test_with_url(self):
 
697
        t = transport.get_transport_from_path("file:")
 
698
        self.assertIsInstance(t, local.LocalTransport)
 
699
        self.assertEquals(t.base.rstrip("/"),
 
700
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
 
701
 
 
702
 
 
703
class TestTransportFromUrl(tests.TestCaseInTempDir):
 
704
 
 
705
    def test_with_path(self):
 
706
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
 
707
            self.test_dir)
 
708
 
 
709
    def test_with_url(self):
 
710
        url = urlutils.local_path_to_url(self.test_dir)
 
711
        t = transport.get_transport_from_url(url)
 
712
        self.assertIsInstance(t, local.LocalTransport)
 
713
        self.assertEquals(t.base.rstrip("/"), url)
 
714
 
 
715
 
698
716
class TestLocalTransports(tests.TestCase):
699
717
 
700
718
    def test_get_transport_from_abspath(self):
722
740
        self.assertEquals(t.local_abspath(''), here)
723
741
 
724
742
 
 
743
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
 
744
 
 
745
    def test_local_fdatasync_calls_fdatasync(self):
 
746
        """Check fdatasync on a stream tries to flush the data to the OS.
 
747
        
 
748
        We can't easily observe the external effect but we can at least see
 
749
        it's called.
 
750
        """
 
751
        sentinel = object()
 
752
        fdatasync = getattr(os, 'fdatasync', sentinel)
 
753
        if fdatasync is sentinel:
 
754
            raise tests.TestNotApplicable('fdatasync not supported')
 
755
        t = self.get_transport('.')
 
756
        calls = self.recordCalls(os, 'fdatasync')
 
757
        w = t.open_write_stream('out')
 
758
        w.write('foo')
 
759
        w.fdatasync()
 
760
        with open('out', 'rb') as f:
 
761
            # Should have been flushed.
 
762
            self.assertEquals(f.read(), 'foo')
 
763
        self.assertEquals(len(calls), 1, calls)
 
764
 
 
765
 
725
766
class TestWin32LocalTransport(tests.TestCase):
726
767
 
727
768
    def test_unc_clone_to_root(self):
743
784
    def test_parse_url(self):
744
785
        t = transport.ConnectedTransport(
745
786
            'http://simple.example.com/home/source')
746
 
        self.assertEquals(t._host, 'simple.example.com')
747
 
        self.assertEquals(t._port, None)
748
 
        self.assertEquals(t._path, '/home/source/')
749
 
        self.failUnless(t._user is None)
750
 
        self.failUnless(t._password is None)
 
787
        self.assertEquals(t._parsed_url.host, 'simple.example.com')
 
788
        self.assertEquals(t._parsed_url.port, None)
 
789
        self.assertEquals(t._parsed_url.path, '/home/source/')
 
790
        self.assertTrue(t._parsed_url.user is None)
 
791
        self.assertTrue(t._parsed_url.password is None)
751
792
 
752
793
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
753
794
 
754
795
    def test_parse_url_with_at_in_user(self):
755
796
        # Bug 228058
756
797
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
757
 
        self.assertEquals(t._user, 'user@host.com')
 
798
        self.assertEquals(t._parsed_url.user, 'user@host.com')
758
799
 
759
800
    def test_parse_quoted_url(self):
760
801
        t = transport.ConnectedTransport(
761
802
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
762
 
        self.assertEquals(t._host, 'exAmple.com')
763
 
        self.assertEquals(t._port, 2222)
764
 
        self.assertEquals(t._user, 'robey')
765
 
        self.assertEquals(t._password, 'h@t')
766
 
        self.assertEquals(t._path, '/path/')
 
803
        self.assertEquals(t._parsed_url.host, 'exAmple.com')
 
804
        self.assertEquals(t._parsed_url.port, 2222)
 
805
        self.assertEquals(t._parsed_url.user, 'robey')
 
806
        self.assertEquals(t._parsed_url.password, 'h@t')
 
807
        self.assertEquals(t._parsed_url.path, '/path/')
767
808
 
768
809
        # Base should not keep track of the password
769
810
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
776
817
    def test_relpath(self):
777
818
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
778
819
 
779
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
820
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
 
821
            'sub')
780
822
        self.assertRaises(errors.PathNotChild, t.relpath,
781
823
                          'http://user@host.com/abs/path/sub')
782
824
        self.assertRaises(errors.PathNotChild, t.relpath,
795
837
 
796
838
    def test_connection_sharing_propagate_credentials(self):
797
839
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
798
 
        self.assertEquals('user', t._user)
799
 
        self.assertEquals('host.com', t._host)
 
840
        self.assertEquals('user', t._parsed_url.user)
 
841
        self.assertEquals('host.com', t._parsed_url.host)
800
842
        self.assertIs(None, t._get_connection())
801
 
        self.assertIs(None, t._password)
 
843
        self.assertIs(None, t._parsed_url.password)
802
844
        c = t.clone('subdir')
803
845
        self.assertIs(None, c._get_connection())
804
 
        self.assertIs(None, t._password)
 
846
        self.assertIs(None, t._parsed_url.password)
805
847
 
806
848
        # Simulate the user entering a password
807
849
        password = 'secret'
826
868
 
827
869
    def test_reuse_same_transport(self):
828
870
        possible_transports = []
829
 
        t1 = transport.get_transport('http://foo/',
 
871
        t1 = transport.get_transport_from_url('http://foo/',
830
872
                                     possible_transports=possible_transports)
831
873
        self.assertEqual([t1], possible_transports)
832
 
        t2 = transport.get_transport('http://foo/',
 
874
        t2 = transport.get_transport_from_url('http://foo/',
833
875
                                     possible_transports=[t1])
834
876
        self.assertIs(t1, t2)
835
877
 
836
878
        # Also check that final '/' are handled correctly
837
 
        t3 = transport.get_transport('http://foo/path/')
838
 
        t4 = transport.get_transport('http://foo/path',
 
879
        t3 = transport.get_transport_from_url('http://foo/path/')
 
880
        t4 = transport.get_transport_from_url('http://foo/path',
839
881
                                     possible_transports=[t3])
840
882
        self.assertIs(t3, t4)
841
883
 
842
 
        t5 = transport.get_transport('http://foo/path')
843
 
        t6 = transport.get_transport('http://foo/path/',
 
884
        t5 = transport.get_transport_from_url('http://foo/path')
 
885
        t6 = transport.get_transport_from_url('http://foo/path/',
844
886
                                     possible_transports=[t5])
845
887
        self.assertIs(t5, t6)
846
888
 
847
889
    def test_don_t_reuse_different_transport(self):
848
 
        t1 = transport.get_transport('http://foo/path')
849
 
        t2 = transport.get_transport('http://bar/path',
 
890
        t1 = transport.get_transport_from_url('http://foo/path')
 
891
        t2 = transport.get_transport_from_url('http://bar/path',
850
892
                                     possible_transports=[t1])
851
893
        self.assertIsNot(t1, t2)
852
894
 
853
895
 
854
896
class TestTransportTrace(tests.TestCase):
855
897
 
856
 
    def test_get(self):
857
 
        t = transport.get_transport('trace+memory://')
858
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
898
    def test_decorator(self):
 
899
        t = transport.get_transport_from_url('trace+memory://')
 
900
        self.assertIsInstance(
 
901
            t, bzrlib.transport.trace.TransportTraceDecorator)
859
902
 
860
903
    def test_clone_preserves_activity(self):
861
 
        t = transport.get_transport('trace+memory://')
 
904
        t = transport.get_transport_from_url('trace+memory://')
862
905
        t2 = t.clone('.')
863
906
        self.assertTrue(t is not t2)
864
907
        self.assertTrue(t._activity is t2._activity)
868
911
    # still won't cause a test failure when the top level Transport API
869
912
    # changes, so there is little return in doing that.
870
913
    def test_get(self):
871
 
        t = transport.get_transport('trace+memory:///')
 
914
        t = transport.get_transport_from_url('trace+memory:///')
872
915
        t.put_bytes('foo', 'barish')
873
916
        t.get('foo')
874
917
        expected_result = []
880
923
        self.assertEqual(expected_result, t._activity)
881
924
 
882
925
    def test_readv(self):
883
 
        t = transport.get_transport('trace+memory:///')
 
926
        t = transport.get_transport_from_url('trace+memory:///')
884
927
        t.put_bytes('foo', 'barish')
885
928
        list(t.readv('foo', [(0, 1), (3, 2)],
886
929
                     adjust_for_latency=True, upper_limit=6))
896
939
class TestSSHConnections(tests.TestCaseWithTransport):
897
940
 
898
941
    def test_bzr_connect_to_bzr_ssh(self):
899
 
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
942
        """get_transport of a bzr+ssh:// behaves correctly.
900
943
 
901
944
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
902
945
        """
918
961
        # SSH channel ourselves.  Surely this has already been implemented
919
962
        # elsewhere?
920
963
        started = []
 
964
 
921
965
        class StubSSHServer(stub_sftp.StubServer):
922
966
 
923
967
            test = self
929
973
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
930
974
 
931
975
                # XXX: horribly inefficient, not to mention ugly.
932
 
                # Start a thread for each of stdin/out/err, and relay bytes from
933
 
                # the subprocess to channel and vice versa.
 
976
                # Start a thread for each of stdin/out/err, and relay bytes
 
977
                # from the subprocess to channel and vice versa.
934
978
                def ferry_bytes(read, write, close):
935
979
                    while True:
936
980
                        bytes = read(1)
955
999
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
956
1000
        # We *don't* want to override the default SSH vendor: the detected one
957
1001
        # is the one to use.
 
1002
 
 
1003
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
 
1004
        # inherits from SFTPServer which forces the SSH vendor to
 
1005
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
958
1006
        self.start_server(ssh_server)
959
 
        port = ssh_server._listener.port
 
1007
        port = ssh_server.port
960
1008
 
961
1009
        if sys.platform == 'win32':
962
1010
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
963
1011
        else:
964
1012
            bzr_remote_path = self.get_bzr_path()
965
 
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
 
1013
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
966
1014
 
967
1015
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
968
1016
        # variable is used to tell bzr what command to run on the remote end.
989
1037
        # And the rest are threads
990
1038
        for t in started[1:]:
991
1039
            t.join()
 
1040
 
 
1041
 
 
1042
class TestUnhtml(tests.TestCase):
 
1043
 
 
1044
    """Tests for unhtml_roughly"""
 
1045
 
 
1046
    def test_truncation(self):
 
1047
        fake_html = "<p>something!\n" * 1000
 
1048
        result = http.unhtml_roughly(fake_html)
 
1049
        self.assertEquals(len(result), 1000)
 
1050
        self.assertStartsWith(result, " something!")
 
1051
 
 
1052
 
 
1053
class SomeDirectory(object):
 
1054
 
 
1055
    def look_up(self, name, url):
 
1056
        return "http://bar"
 
1057
 
 
1058
 
 
1059
class TestLocationToUrl(tests.TestCase):
 
1060
 
 
1061
    def get_base_location(self):
 
1062
        path = osutils.abspath('/foo/bar')
 
1063
        if path.startswith('/'):
 
1064
            url = 'file://%s' % (path,)
 
1065
        else:
 
1066
            # On Windows, abspaths start with the drive letter, so we have to
 
1067
            # add in the extra '/'
 
1068
            url = 'file:///%s' % (path,)
 
1069
        return path, url
 
1070
 
 
1071
    def test_regular_url(self):
 
1072
        self.assertEquals("file://foo", location_to_url("file://foo"))
 
1073
 
 
1074
    def test_directory(self):
 
1075
        directories.register("bar:", SomeDirectory, "Dummy directory")
 
1076
        self.addCleanup(directories.remove, "bar:")
 
1077
        self.assertEquals("http://bar", location_to_url("bar:"))
 
1078
 
 
1079
    def test_unicode_url(self):
 
1080
        self.assertRaises(errors.InvalidURL, location_to_url,
 
1081
            "http://fo/\xc3\xaf".decode("utf-8"))
 
1082
 
 
1083
    def test_unicode_path(self):
 
1084
        path, url = self.get_base_location()
 
1085
        location = path + "\xc3\xaf".decode("utf-8")
 
1086
        url += '%C3%AF'
 
1087
        self.assertEquals(url, location_to_url(location))
 
1088
 
 
1089
    def test_path(self):
 
1090
        path, url = self.get_base_location()
 
1091
        self.assertEquals(url, location_to_url(path))
 
1092
 
 
1093
    def test_relative_file_url(self):
 
1094
        self.assertEquals(urlutils.local_path_to_url(".") + "/bar",
 
1095
            location_to_url("file:bar"))
 
1096
 
 
1097
    def test_absolute_file_url(self):
 
1098
        self.assertEquals("file:///bar", location_to_url("file:/bar"))