/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

Report commits signed with expired keys in "verify-signatures".

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
28
28
    transport,
29
29
    urlutils,
30
30
    )
 
31
from bzrlib.directory_service import directories
31
32
from bzrlib.transport import (
32
33
    chroot,
33
34
    fakenfs,
 
35
    http,
34
36
    local,
 
37
    location_to_url,
35
38
    memory,
36
39
    pathfilter,
37
40
    readonly,
38
41
    )
 
42
import bzrlib.transport.trace
39
43
from bzrlib.tests import (
40
44
    features,
41
45
    test_server,
48
52
class TestTransport(tests.TestCase):
49
53
    """Test the non transport-concrete class functionality."""
50
54
 
51
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
52
 
    # of try/finally -- vila 20100205
53
 
 
54
55
    def test__get_set_protocol_handlers(self):
55
56
        handlers = transport._get_protocol_handlers()
56
 
        self.assertNotEqual([], handlers.keys( ))
57
 
        try:
58
 
            transport._clear_protocol_handlers()
59
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
60
 
        finally:
61
 
            transport._set_protocol_handlers(handlers)
 
57
        self.assertNotEqual([], handlers.keys())
 
58
        transport._clear_protocol_handlers()
 
59
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
60
        self.assertEqual([], transport._get_protocol_handlers().keys())
62
61
 
63
62
    def test_get_transport_modules(self):
64
63
        handlers = transport._get_protocol_handlers()
 
64
        self.addCleanup(transport._set_protocol_handlers, handlers)
65
65
        # don't pollute the current handlers
66
66
        transport._clear_protocol_handlers()
 
67
 
67
68
        class SampleHandler(object):
68
69
            """I exist, isnt that enough?"""
69
 
        try:
70
 
            transport._clear_protocol_handlers()
71
 
            transport.register_transport_proto('foo')
72
 
            transport.register_lazy_transport('foo',
73
 
                                              'bzrlib.tests.test_transport',
74
 
                                              'TestTransport.SampleHandler')
75
 
            transport.register_transport_proto('bar')
76
 
            transport.register_lazy_transport('bar',
77
 
                                              'bzrlib.tests.test_transport',
78
 
                                              'TestTransport.SampleHandler')
79
 
            self.assertEqual([SampleHandler.__module__,
80
 
                              'bzrlib.transport.chroot',
81
 
                              'bzrlib.transport.pathfilter'],
82
 
                             transport._get_transport_modules())
83
 
        finally:
84
 
            transport._set_protocol_handlers(handlers)
 
70
        transport._clear_protocol_handlers()
 
71
        transport.register_transport_proto('foo')
 
72
        transport.register_lazy_transport('foo',
 
73
                                            'bzrlib.tests.test_transport',
 
74
                                            'TestTransport.SampleHandler')
 
75
        transport.register_transport_proto('bar')
 
76
        transport.register_lazy_transport('bar',
 
77
                                            'bzrlib.tests.test_transport',
 
78
                                            'TestTransport.SampleHandler')
 
79
        self.assertEqual([SampleHandler.__module__,
 
80
                            'bzrlib.transport.chroot',
 
81
                            'bzrlib.transport.pathfilter'],
 
82
                            transport._get_transport_modules())
85
83
 
86
84
    def test_transport_dependency(self):
87
85
        """Transport with missing dependency causes no error"""
88
86
        saved_handlers = transport._get_protocol_handlers()
 
87
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
89
88
        # don't pollute the current handlers
90
89
        transport._clear_protocol_handlers()
 
90
        transport.register_transport_proto('foo')
 
91
        transport.register_lazy_transport(
 
92
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
91
93
        try:
92
 
            transport.register_transport_proto('foo')
93
 
            transport.register_lazy_transport(
94
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
95
 
            try:
96
 
                transport.get_transport('foo://fooserver/foo')
97
 
            except errors.UnsupportedProtocol, e:
98
 
                e_str = str(e)
99
 
                self.assertEquals('Unsupported protocol'
100
 
                                  ' for url "foo://fooserver/foo":'
101
 
                                  ' Unable to import library "some_lib":'
102
 
                                  ' testing missing dependency', str(e))
103
 
            else:
104
 
                self.fail('Did not raise UnsupportedProtocol')
105
 
        finally:
106
 
            # restore original values
107
 
            transport._set_protocol_handlers(saved_handlers)
 
94
            transport.get_transport_from_url('foo://fooserver/foo')
 
95
        except errors.UnsupportedProtocol, e:
 
96
            e_str = str(e)
 
97
            self.assertEquals('Unsupported protocol'
 
98
                                ' for url "foo://fooserver/foo":'
 
99
                                ' Unable to import library "some_lib":'
 
100
                                ' testing missing dependency', str(e))
 
101
        else:
 
102
            self.fail('Did not raise UnsupportedProtocol')
108
103
 
109
104
    def test_transport_fallback(self):
110
105
        """Transport with missing dependency causes no error"""
111
106
        saved_handlers = transport._get_protocol_handlers()
112
 
        try:
113
 
            transport._clear_protocol_handlers()
114
 
            transport.register_transport_proto('foo')
115
 
            transport.register_lazy_transport(
116
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
117
 
            transport.register_lazy_transport(
118
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
119
 
            t = transport.get_transport('foo://fooserver/foo')
120
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
121
 
        finally:
122
 
            transport._set_protocol_handlers(saved_handlers)
 
107
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
108
        transport._clear_protocol_handlers()
 
109
        transport.register_transport_proto('foo')
 
110
        transport.register_lazy_transport(
 
111
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
112
        transport.register_lazy_transport(
 
113
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
114
        t = transport.get_transport_from_url('foo://fooserver/foo')
 
115
        self.assertTrue(isinstance(t, BackupTransportHandler))
123
116
 
124
117
    def test_ssh_hints(self):
125
118
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
126
119
        try:
127
 
            transport.get_transport('ssh://fooserver/foo')
 
120
            transport.get_transport_from_url('ssh://fooserver/foo')
128
121
        except errors.UnsupportedProtocol, e:
129
122
            e_str = str(e)
130
123
            self.assertEquals('Unsupported protocol'
218
211
 
219
212
    def test_coalesce_fudge(self):
220
213
        self.check([(10, 30, [(0, 10), (20, 10)]),
221
 
                    (100, 10, [(0, 10),]),
 
214
                    (100, 10, [(0, 10)]),
222
215
                   ], [(10, 10), (30, 10), (100, 10)],
223
 
                   fudge=10
224
 
                  )
 
216
                   fudge=10)
 
217
 
225
218
    def test_coalesce_max_size(self):
226
219
        self.check([(10, 20, [(0, 10), (10, 10)]),
227
220
                    (30, 50, [(0, 50)]),
228
221
                    # If one range is above max_size, it gets its own coalesced
229
222
                    # offset
230
 
                    (100, 80, [(0, 80),]),],
 
223
                    (100, 80, [(0, 80)]),],
231
224
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
232
 
                   max_size=50
233
 
                  )
 
225
                   max_size=50)
234
226
 
235
227
    def test_coalesce_no_max_size(self):
236
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
228
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
237
229
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
238
230
                  )
239
231
 
240
232
    def test_coalesce_default_limit(self):
241
233
        # By default we use a 100MB max size.
242
 
        ten_mb = 10*1024*1024
243
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
234
        ten_mb = 10 * 1024 * 1024
 
235
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
244
236
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
245
237
                   [(i*ten_mb, ten_mb) for i in range(11)])
246
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
247
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
238
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
239
                   [(i * ten_mb, ten_mb) for i in range(11)],
248
240
                   max_size=1*1024*1024*1024)
249
241
 
250
242
 
255
247
        server.start_server()
256
248
        url = server.get_url()
257
249
        self.assertTrue(url in transport.transport_list_registry)
258
 
        t = transport.get_transport(url)
 
250
        t = transport.get_transport_from_url(url)
259
251
        del t
260
252
        server.stop_server()
261
253
        self.assertFalse(url in transport.transport_list_registry)
376
368
    def test_abspath(self):
377
369
        # The abspath is always relative to the chroot_url.
378
370
        server = chroot.ChrootServer(
379
 
            transport.get_transport('memory:///foo/bar/'))
 
371
            transport.get_transport_from_url('memory:///foo/bar/'))
380
372
        self.start_server(server)
381
 
        t = transport.get_transport(server.get_url())
 
373
        t = transport.get_transport_from_url(server.get_url())
382
374
        self.assertEqual(server.get_url(), t.abspath('/'))
383
375
 
384
376
        subdir_t = t.clone('subdir')
386
378
 
387
379
    def test_clone(self):
388
380
        server = chroot.ChrootServer(
389
 
            transport.get_transport('memory:///foo/bar/'))
 
381
            transport.get_transport_from_url('memory:///foo/bar/'))
390
382
        self.start_server(server)
391
 
        t = transport.get_transport(server.get_url())
 
383
        t = transport.get_transport_from_url(server.get_url())
392
384
        # relpath from root and root path are the same
393
385
        relpath_cloned = t.clone('foo')
394
386
        abspath_cloned = t.clone('/foo')
403
395
        This is so that it is not possible to escape a chroot by doing::
404
396
            url = chroot_transport.base
405
397
            parent_url = urlutils.join(url, '..')
406
 
            new_t = transport.get_transport(parent_url)
 
398
            new_t = transport.get_transport_from_url(parent_url)
407
399
        """
408
400
        server = chroot.ChrootServer(
409
 
            transport.get_transport('memory:///path/subpath'))
 
401
            transport.get_transport_from_url('memory:///path/subpath'))
410
402
        self.start_server(server)
411
 
        t = transport.get_transport(server.get_url())
412
 
        new_t = transport.get_transport(t.base)
 
403
        t = transport.get_transport_from_url(server.get_url())
 
404
        new_t = transport.get_transport_from_url(t.base)
413
405
        self.assertEqual(t.server, new_t.server)
414
406
        self.assertEqual(t.base, new_t.base)
415
407
 
420
412
        This is so that it is not possible to escape a chroot by doing::
421
413
            url = chroot_transport.base
422
414
            parent_url = urlutils.join(url, '..')
423
 
            new_t = transport.get_transport(parent_url)
 
415
            new_t = transport.get_transport_from_url(parent_url)
424
416
        """
425
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
417
        server = chroot.ChrootServer(
 
418
            transport.get_transport_from_url('memory:///path/'))
426
419
        self.start_server(server)
427
 
        t = transport.get_transport(server.get_url())
 
420
        t = transport.get_transport_from_url(server.get_url())
428
421
        self.assertRaises(
429
422
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
430
423
 
440
433
        backing_transport = memory.MemoryTransport()
441
434
        server = chroot.ChrootServer(backing_transport)
442
435
        server.start_server()
443
 
        try:
444
 
            self.assertTrue(server.scheme
445
 
                            in transport._get_protocol_handlers().keys())
446
 
        finally:
447
 
            server.stop_server()
 
436
        self.addCleanup(server.stop_server)
 
437
        self.assertTrue(server.scheme
 
438
                        in transport._get_protocol_handlers().keys())
448
439
 
449
440
    def test_stop_server(self):
450
441
        backing_transport = memory.MemoryTransport()
458
449
        backing_transport = memory.MemoryTransport()
459
450
        server = chroot.ChrootServer(backing_transport)
460
451
        server.start_server()
461
 
        try:
462
 
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
463
 
        finally:
464
 
            server.stop_server()
 
452
        self.addCleanup(server.stop_server)
 
453
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
465
454
 
466
455
 
467
456
class PathFilteringDecoratorTransportTest(tests.TestCase):
470
459
    def test_abspath(self):
471
460
        # The abspath is always relative to the base of the backing transport.
472
461
        server = pathfilter.PathFilteringServer(
473
 
            transport.get_transport('memory:///foo/bar/'),
 
462
            transport.get_transport_from_url('memory:///foo/bar/'),
474
463
            lambda x: x)
475
464
        server.start_server()
476
 
        t = transport.get_transport(server.get_url())
 
465
        t = transport.get_transport_from_url(server.get_url())
477
466
        self.assertEqual(server.get_url(), t.abspath('/'))
478
467
 
479
468
        subdir_t = t.clone('subdir')
482
471
 
483
472
    def make_pf_transport(self, filter_func=None):
484
473
        """Make a PathFilteringTransport backed by a MemoryTransport.
485
 
        
 
474
 
486
475
        :param filter_func: by default this will be a no-op function.  Use this
487
476
            parameter to override it."""
488
477
        if filter_func is None:
489
478
            filter_func = lambda x: x
490
479
        server = pathfilter.PathFilteringServer(
491
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
480
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
492
481
        server.start_server()
493
482
        self.addCleanup(server.stop_server)
494
 
        return transport.get_transport(server.get_url())
 
483
        return transport.get_transport_from_url(server.get_url())
495
484
 
496
485
    def test__filter(self):
497
486
        # _filter (with an identity func as filter_func) always returns
510
499
 
511
500
    def test_filter_invocation(self):
512
501
        filter_log = []
 
502
 
513
503
        def filter(path):
514
504
            filter_log.append(path)
515
505
            return path
540
530
        otherwise) the filtering by doing::
541
531
            url = filtered_transport.base
542
532
            parent_url = urlutils.join(url, '..')
543
 
            new_t = transport.get_transport(parent_url)
 
533
            new_t = transport.get_transport_from_url(parent_url)
544
534
        """
545
535
        t = self.make_pf_transport()
546
 
        new_t = transport.get_transport(t.base)
 
536
        new_t = transport.get_transport_from_url(t.base)
547
537
        self.assertEqual(t.server, new_t.server)
548
538
        self.assertEqual(t.base, new_t.base)
549
539
 
562
552
        # connect to '.' via http which is not listable
563
553
        server = HttpServer()
564
554
        self.start_server(server)
565
 
        t = transport.get_transport('readonly+' + server.get_url())
566
 
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
 
555
        t = transport.get_transport_from_url('readonly+' + server.get_url())
 
556
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
567
557
        self.assertEqual(False, t.listable())
568
558
        self.assertEqual(True, t.is_readonly())
569
559
 
601
591
        # the url should be decorated appropriately
602
592
        self.assertStartsWith(server.get_url(), 'fakenfs+')
603
593
        # and we should be able to get a transport for it
604
 
        t = transport.get_transport(server.get_url())
 
594
        t = transport.get_transport_from_url(server.get_url())
605
595
        # which must be a FakeNFSTransportDecorator instance.
606
596
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
607
597
 
684
674
        base_url = self._server.get_url()
685
675
        url = self._adjust_url(base_url, relpath)
686
676
        # try getting the transport via the regular interface:
687
 
        t = transport.get_transport(url)
 
677
        t = transport.get_transport_from_url(url)
688
678
        # vila--20070607 if the following are commented out the test suite
689
679
        # still pass. Is this really still needed or was it a forgotten
690
680
        # temporary fix ?
695
685
        return t
696
686
 
697
687
 
 
688
class TestTransportFromPath(tests.TestCaseInTempDir):
 
689
 
 
690
    def test_with_path(self):
 
691
        t = transport.get_transport_from_path(self.test_dir)
 
692
        self.assertIsInstance(t, local.LocalTransport)
 
693
        self.assertEquals(t.base.rstrip("/"),
 
694
            urlutils.local_path_to_url(self.test_dir))
 
695
 
 
696
    def test_with_url(self):
 
697
        t = transport.get_transport_from_path("file:")
 
698
        self.assertIsInstance(t, local.LocalTransport)
 
699
        self.assertEquals(t.base.rstrip("/"),
 
700
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
 
701
 
 
702
 
 
703
class TestTransportFromUrl(tests.TestCaseInTempDir):
 
704
 
 
705
    def test_with_path(self):
 
706
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
 
707
            self.test_dir)
 
708
 
 
709
    def test_with_url(self):
 
710
        url = urlutils.local_path_to_url(self.test_dir)
 
711
        t = transport.get_transport_from_url(url)
 
712
        self.assertIsInstance(t, local.LocalTransport)
 
713
        self.assertEquals(t.base.rstrip("/"), url)
 
714
 
 
715
 
698
716
class TestLocalTransports(tests.TestCase):
699
717
 
700
718
    def test_get_transport_from_abspath(self):
722
740
        self.assertEquals(t.local_abspath(''), here)
723
741
 
724
742
 
 
743
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
 
744
 
 
745
    def test_local_fdatasync_calls_fdatasync(self):
 
746
        """Check fdatasync on a stream tries to flush the data to the OS.
 
747
        
 
748
        We can't easily observe the external effect but we can at least see
 
749
        it's called.
 
750
        """
 
751
        t = self.get_transport('.')
 
752
        calls = self.recordCalls(os, 'fdatasync')
 
753
        w = t.open_write_stream('out')
 
754
        w.write('foo')
 
755
        w.fdatasync()
 
756
        with open('out', 'rb') as f:
 
757
            # Should have been flushed.
 
758
            self.assertEquals(f.read(), 'foo')
 
759
        self.assertEquals(len(calls), 1, calls)
 
760
 
 
761
 
725
762
class TestWin32LocalTransport(tests.TestCase):
726
763
 
727
764
    def test_unc_clone_to_root(self):
746
783
        self.assertEquals(t._host, 'simple.example.com')
747
784
        self.assertEquals(t._port, None)
748
785
        self.assertEquals(t._path, '/home/source/')
749
 
        self.failUnless(t._user is None)
750
 
        self.failUnless(t._password is None)
 
786
        self.assertTrue(t._user is None)
 
787
        self.assertTrue(t._password is None)
751
788
 
752
789
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
753
790
 
776
813
    def test_relpath(self):
777
814
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
778
815
 
779
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
816
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
 
817
            'sub')
780
818
        self.assertRaises(errors.PathNotChild, t.relpath,
781
819
                          'http://user@host.com/abs/path/sub')
782
820
        self.assertRaises(errors.PathNotChild, t.relpath,
826
864
 
827
865
    def test_reuse_same_transport(self):
828
866
        possible_transports = []
829
 
        t1 = transport.get_transport('http://foo/',
 
867
        t1 = transport.get_transport_from_url('http://foo/',
830
868
                                     possible_transports=possible_transports)
831
869
        self.assertEqual([t1], possible_transports)
832
 
        t2 = transport.get_transport('http://foo/',
 
870
        t2 = transport.get_transport_from_url('http://foo/',
833
871
                                     possible_transports=[t1])
834
872
        self.assertIs(t1, t2)
835
873
 
836
874
        # Also check that final '/' are handled correctly
837
 
        t3 = transport.get_transport('http://foo/path/')
838
 
        t4 = transport.get_transport('http://foo/path',
 
875
        t3 = transport.get_transport_from_url('http://foo/path/')
 
876
        t4 = transport.get_transport_from_url('http://foo/path',
839
877
                                     possible_transports=[t3])
840
878
        self.assertIs(t3, t4)
841
879
 
842
 
        t5 = transport.get_transport('http://foo/path')
843
 
        t6 = transport.get_transport('http://foo/path/',
 
880
        t5 = transport.get_transport_from_url('http://foo/path')
 
881
        t6 = transport.get_transport_from_url('http://foo/path/',
844
882
                                     possible_transports=[t5])
845
883
        self.assertIs(t5, t6)
846
884
 
847
885
    def test_don_t_reuse_different_transport(self):
848
 
        t1 = transport.get_transport('http://foo/path')
849
 
        t2 = transport.get_transport('http://bar/path',
 
886
        t1 = transport.get_transport_from_url('http://foo/path')
 
887
        t2 = transport.get_transport_from_url('http://bar/path',
850
888
                                     possible_transports=[t1])
851
889
        self.assertIsNot(t1, t2)
852
890
 
853
891
 
854
892
class TestTransportTrace(tests.TestCase):
855
893
 
856
 
    def test_get(self):
857
 
        t = transport.get_transport('trace+memory://')
858
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
894
    def test_decorator(self):
 
895
        t = transport.get_transport_from_url('trace+memory://')
 
896
        self.assertIsInstance(
 
897
            t, bzrlib.transport.trace.TransportTraceDecorator)
859
898
 
860
899
    def test_clone_preserves_activity(self):
861
 
        t = transport.get_transport('trace+memory://')
 
900
        t = transport.get_transport_from_url('trace+memory://')
862
901
        t2 = t.clone('.')
863
902
        self.assertTrue(t is not t2)
864
903
        self.assertTrue(t._activity is t2._activity)
868
907
    # still won't cause a test failure when the top level Transport API
869
908
    # changes; so there is little return doing that.
870
909
    def test_get(self):
871
 
        t = transport.get_transport('trace+memory:///')
 
910
        t = transport.get_transport_from_url('trace+memory:///')
872
911
        t.put_bytes('foo', 'barish')
873
912
        t.get('foo')
874
913
        expected_result = []
880
919
        self.assertEqual(expected_result, t._activity)
881
920
 
882
921
    def test_readv(self):
883
 
        t = transport.get_transport('trace+memory:///')
 
922
        t = transport.get_transport_from_url('trace+memory:///')
884
923
        t.put_bytes('foo', 'barish')
885
924
        list(t.readv('foo', [(0, 1), (3, 2)],
886
925
                     adjust_for_latency=True, upper_limit=6))
896
935
class TestSSHConnections(tests.TestCaseWithTransport):
897
936
 
898
937
    def test_bzr_connect_to_bzr_ssh(self):
899
 
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
938
        """get_transport of a bzr+ssh:// behaves correctly.
900
939
 
901
940
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
902
941
        """
918
957
        # SSH channel ourselves.  Surely this has already been implemented
919
958
        # elsewhere?
920
959
        started = []
 
960
 
921
961
        class StubSSHServer(stub_sftp.StubServer):
922
962
 
923
963
            test = self
929
969
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
930
970
 
931
971
                # XXX: horribly inefficient, not to mention ugly.
932
 
                # Start a thread for each of stdin/out/err, and relay bytes from
933
 
                # the subprocess to channel and vice versa.
 
972
                # Start a thread for each of stdin/out/err, and relay bytes
 
973
                # from the subprocess to channel and vice versa.
934
974
                def ferry_bytes(read, write, close):
935
975
                    while True:
936
976
                        bytes = read(1)
955
995
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
956
996
        # We *don't* want to override the default SSH vendor: the detected one
957
997
        # is the one to use.
 
998
 
 
999
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
 
1000
        # inherits from SFTPServer which forces the SSH vendor to
 
1001
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
958
1002
        self.start_server(ssh_server)
959
 
        port = ssh_server._listener.port
 
1003
        port = ssh_server.port
960
1004
 
961
1005
        if sys.platform == 'win32':
962
1006
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
963
1007
        else:
964
1008
            bzr_remote_path = self.get_bzr_path()
965
 
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
 
1009
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
966
1010
 
967
1011
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
968
1012
        # variable is used to tell bzr what command to run on the remote end.
989
1033
        # And the rest are threads
990
1034
        for t in started[1:]:
991
1035
            t.join()
 
1036
 
 
1037
 
 
1038
class TestUnhtml(tests.TestCase):
 
1039
 
 
1040
    """Tests for unhtml_roughly"""
 
1041
 
 
1042
    def test_truncation(self):
 
1043
        fake_html = "<p>something!\n" * 1000
 
1044
        result = http.unhtml_roughly(fake_html)
 
1045
        self.assertEquals(len(result), 1000)
 
1046
        self.assertStartsWith(result, " something!")
 
1047
 
 
1048
 
 
1049
class SomeDirectory(object):
 
1050
 
 
1051
    def look_up(self, name, url):
 
1052
        return "http://bar"
 
1053
 
 
1054
 
 
1055
class TestLocationToUrl(tests.TestCase):
 
1056
 
 
1057
    def test_regular_url(self):
 
1058
        self.assertEquals("file://foo", location_to_url("file://foo"))
 
1059
 
 
1060
    def test_directory(self):
 
1061
        directories.register("bar:", SomeDirectory, "Dummy directory")
 
1062
        self.addCleanup(directories.remove, "bar:")
 
1063
        self.assertEquals("http://bar", location_to_url("bar:"))
 
1064
 
 
1065
    def test_unicode_url(self):
 
1066
        self.assertRaises(errors.InvalidURL, location_to_url,
 
1067
            "http://fo/\xc3\xaf".decode("utf-8"))
 
1068
 
 
1069
    def test_unicode_path(self):
 
1070
        self.assertEquals("file:///foo/bar%C3%AF",
 
1071
            location_to_url("/foo/bar\xc3\xaf".decode("utf-8")))
 
1072
 
 
1073
    def test_path(self):
 
1074
        self.assertEquals("file:///foo/bar", location_to_url("/foo/bar"))
 
1075
 
 
1076
    def test_relative_file_url(self):
 
1077
        self.assertEquals(urlutils.local_path_to_url(".") + "/bar",
 
1078
            location_to_url("file:bar"))
 
1079
 
 
1080
    def test_absolute_file_url(self):
 
1081
        self.assertEquals("file:///bar", location_to_url("file:/bar"))