/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-12-21 06:03:07 UTC
  • mfrom: (4665.7.3 serve-init)
  • Revision ID: pqm@pqm.ubuntu.com-20091221060307-uvja3vdy1o6dzzy0
(mbp) example debian init script

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
 
18
18
from cStringIO import StringIO
 
19
import os
 
20
import subprocess
 
21
import sys
 
22
import threading
19
23
 
20
24
import bzrlib
21
25
from bzrlib import (
22
26
    errors,
23
27
    osutils,
 
28
    tests,
24
29
    urlutils,
25
30
    )
26
31
from bzrlib.errors import (DependencyNotPresent,
31
36
                           ReadError,
32
37
                           UnsupportedProtocol,
33
38
                           )
34
 
from bzrlib.tests import TestCase, TestCaseInTempDir
 
39
from bzrlib.tests import ParamikoFeature, TestCase, TestCaseInTempDir
35
40
from bzrlib.transport import (_clear_protocol_handlers,
36
41
                              _CoalescedOffset,
37
42
                              ConnectedTransport,
48
53
from bzrlib.transport.memory import MemoryTransport
49
54
from bzrlib.transport.local import (LocalTransport,
50
55
                                    EmulatedWin32LocalTransport)
 
56
from bzrlib.transport.pathfilter import PathFilteringServer
51
57
 
52
58
 
53
59
# TODO: Should possibly split transport-specific tests into their own files.
80
86
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
81
87
                                    'TestTransport.SampleHandler')
82
88
            self.assertEqual([SampleHandler.__module__,
83
 
                              'bzrlib.transport.chroot'],
 
89
                              'bzrlib.transport.chroot',
 
90
                              'bzrlib.transport.pathfilter'],
84
91
                             _get_transport_modules())
85
92
        finally:
86
93
            _set_protocol_handlers(handlers)
446
453
            server.tearDown()
447
454
 
448
455
 
 
456
class PathFilteringDecoratorTransportTest(TestCase):
 
457
    """Pathfilter decoration specific tests."""
 
458
 
 
459
    def test_abspath(self):
 
460
        # The abspath is always relative to the base of the backing transport.
 
461
        server = PathFilteringServer(get_transport('memory:///foo/bar/'),
 
462
            lambda x: x)
 
463
        server.setUp()
 
464
        transport = get_transport(server.get_url())
 
465
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
466
 
 
467
        subdir_transport = transport.clone('subdir')
 
468
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
469
        server.tearDown()
 
470
 
 
471
    def make_pf_transport(self, filter_func=None):
 
472
        """Make a PathFilteringTransport backed by a MemoryTransport.
 
473
        
 
474
        :param filter_func: by default this will be a no-op function.  Use this
 
475
            parameter to override it."""
 
476
        if filter_func is None:
 
477
            filter_func = lambda x: x
 
478
        server = PathFilteringServer(
 
479
            get_transport('memory:///foo/bar/'), filter_func)
 
480
        server.setUp()
 
481
        self.addCleanup(server.tearDown)
 
482
        return get_transport(server.get_url())
 
483
 
 
484
    def test__filter(self):
 
485
        # _filter (with an identity func as filter_func) always returns
 
486
        # paths relative to the base of the backing transport.
 
487
        transport = self.make_pf_transport()
 
488
        self.assertEqual('foo', transport._filter('foo'))
 
489
        self.assertEqual('foo/bar', transport._filter('foo/bar'))
 
490
        self.assertEqual('', transport._filter('..'))
 
491
        self.assertEqual('', transport._filter('/'))
 
492
        # The base of the pathfiltering transport is taken into account too.
 
493
        transport = transport.clone('subdir1/subdir2')
 
494
        self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
 
495
        self.assertEqual(
 
496
            'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
 
497
        self.assertEqual('subdir1', transport._filter('..'))
 
498
        self.assertEqual('', transport._filter('/'))
 
499
 
 
500
    def test_filter_invocation(self):
 
501
        filter_log = []
 
502
        def filter(path):
 
503
            filter_log.append(path)
 
504
            return path
 
505
        transport = self.make_pf_transport(filter)
 
506
        transport.has('abc')
 
507
        self.assertEqual(['abc'], filter_log)
 
508
        del filter_log[:]
 
509
        transport.clone('abc').has('xyz')
 
510
        self.assertEqual(['abc/xyz'], filter_log)
 
511
        del filter_log[:]
 
512
        transport.has('/abc')
 
513
        self.assertEqual(['abc'], filter_log)
 
514
 
 
515
    def test_clone(self):
 
516
        transport = self.make_pf_transport()
 
517
        # relpath from root and root path are the same
 
518
        relpath_cloned = transport.clone('foo')
 
519
        abspath_cloned = transport.clone('/foo')
 
520
        self.assertEqual(transport.server, relpath_cloned.server)
 
521
        self.assertEqual(transport.server, abspath_cloned.server)
 
522
 
 
523
    def test_url_preserves_pathfiltering(self):
 
524
        """Calling get_transport on a pathfiltered transport's base should
 
525
        produce a transport with exactly the same behaviour as the original
 
526
        pathfiltered transport.
 
527
 
 
528
        This is so that it is not possible to escape (accidentally or
 
529
        otherwise) the filtering by doing::
 
530
            url = filtered_transport.base
 
531
            parent_url = urlutils.join(url, '..')
 
532
            new_transport = get_transport(parent_url)
 
533
        """
 
534
        transport = self.make_pf_transport()
 
535
        new_transport = get_transport(transport.base)
 
536
        self.assertEqual(transport.server, new_transport.server)
 
537
        self.assertEqual(transport.base, new_transport.base)
 
538
 
 
539
 
449
540
class ReadonlyDecoratorTransportTest(TestCase):
450
541
    """Readonly decoration specific tests."""
451
542
 
790
881
        # readv records the supplied offset request
791
882
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
792
883
        self.assertEqual(expected_result, transport._activity)
 
884
 
 
885
 
 
886
class TestSSHConnections(tests.TestCaseWithTransport):
 
887
 
 
888
    def test_bzr_connect_to_bzr_ssh(self):
 
889
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
890
 
 
891
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
 
892
        """
 
893
        # This test actually causes a bzr instance to be invoked, which is very
 
894
        # expensive: it should be the only such test in the test suite.
 
895
        # A reasonable evolution for this would be to simply check inside
 
896
        # check_channel_exec_request that the command is appropriate, and then
 
897
        # satisfy requests in-process.
 
898
        self.requireFeature(ParamikoFeature)
 
899
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
 
900
        # override the interface (doesn't change self._vendor).
 
901
        # Note that this does encryption, so can be slow.
 
902
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
 
903
        from bzrlib.tests.stub_sftp import StubServer
 
904
 
 
905
        # Start an SSH server
 
906
        self.command_executed = []
 
907
        # XXX: This is horrible -- we define a really dumb SSH server that
 
908
        # executes commands, and manage the hooking up of stdin/out/err to the
 
909
        # SSH channel ourselves.  Surely this has already been implemented
 
910
        # elsewhere?
 
911
        started = []
 
912
        class StubSSHServer(StubServer):
 
913
 
 
914
            test = self
 
915
 
 
916
            def check_channel_exec_request(self, channel, command):
 
917
                self.test.command_executed.append(command)
 
918
                proc = subprocess.Popen(
 
919
                    command, shell=True, stdin=subprocess.PIPE,
 
920
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
921
 
 
922
                # XXX: horribly inefficient, not to mention ugly.
 
923
                # Start a thread for each of stdin/out/err, and relay bytes from
 
924
                # the subprocess to channel and vice versa.
 
925
                def ferry_bytes(read, write, close):
 
926
                    while True:
 
927
                        bytes = read(1)
 
928
                        if bytes == '':
 
929
                            close()
 
930
                            break
 
931
                        write(bytes)
 
932
 
 
933
                file_functions = [
 
934
                    (channel.recv, proc.stdin.write, proc.stdin.close),
 
935
                    (proc.stdout.read, channel.sendall, channel.close),
 
936
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
 
937
                started.append(proc)
 
938
                for read, write, close in file_functions:
 
939
                    t = threading.Thread(
 
940
                        target=ferry_bytes, args=(read, write, close))
 
941
                    t.start()
 
942
                    started.append(t)
 
943
 
 
944
                return True
 
945
 
 
946
        ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
 
947
        # We *don't* want to override the default SSH vendor: the detected one
 
948
        # is the one to use.
 
949
        self.start_server(ssh_server)
 
950
        port = ssh_server._listener.port
 
951
 
 
952
        if sys.platform == 'win32':
 
953
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
 
954
        else:
 
955
            bzr_remote_path = self.get_bzr_path()
 
956
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
 
957
 
 
958
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
 
959
        # variable is used to tell bzr what command to run on the remote end.
 
960
        path_to_branch = osutils.abspath('.')
 
961
        if sys.platform == 'win32':
 
962
            # On Windows, we export all drives as '/C:/, etc. So we need to
 
963
            # prefix a '/' to get the right path.
 
964
            path_to_branch = '/' + path_to_branch
 
965
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
 
966
        t = get_transport(url)
 
967
        self.permit_url(t.base)
 
968
        t.mkdir('foo')
 
969
 
 
970
        self.assertEqual(
 
971
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
 
972
            self.command_executed)
 
973
        # Make sure to disconnect, so that the remote process can stop, and we
 
974
        # can cleanup. Then pause the test until everything is shutdown
 
975
        t._client._medium.disconnect()
 
976
        if not started:
 
977
            return
 
978
        # First wait for the subprocess
 
979
        started[0].wait()
 
980
        # And the rest are threads
 
981
        for t in started[1:]:
 
982
            t.join()