446
453
server.tearDown()
456
class PathFilteringDecoratorTransportTest(TestCase):
457
"""Pathfilter decoration specific tests."""
459
def test_abspath(self):
    """abspath('/') gives the server URL for the root transport and a clone."""
    # The abspath is always relative to the base of the backing transport.
    # The filter is the identity function, the same default that
    # make_pf_transport uses.
    server = PathFilteringServer(
        get_transport('memory:///foo/bar/'), lambda x: x)
    # NOTE(review): assumes the Server API pairs setUp() with tearDown();
    # confirm against the transport Server implementation.
    server.setUp()
    # Registered as a cleanup so that the server is torn down even when one
    # of the assertions below fails.
    self.addCleanup(server.tearDown)
    transport = get_transport(server.get_url())
    self.assertEqual(server.get_url(), transport.abspath('/'))

    subdir_transport = transport.clone('subdir')
    self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
471
def make_pf_transport(self, filter_func=None):
    """Make a PathFilteringTransport backed by a MemoryTransport.

    :param filter_func: by default this will be a no-op function.  Use this
        parameter to override it.
    :return: the transport obtained via get_transport() from the URL of a
        PathFilteringServer wrapping 'memory:///foo/bar/'.
    """
    if filter_func is None:
        filter_func = lambda x: x
    server = PathFilteringServer(
        get_transport('memory:///foo/bar/'), filter_func)
    # Start the server before asking for its URL.
    # NOTE(review): assumes setUp() is the counterpart of the tearDown()
    # registered below; confirm against the transport Server API.
    server.setUp()
    self.addCleanup(server.tearDown)
    return get_transport(server.get_url())
484
def test__filter(self):
    """Check _filter results for the root transport and for a cloned subdir."""
    # _filter (with an identity func as filter_func) always returns
    # paths relative to the base of the backing transport.
    transport = self.make_pf_transport()
    self.assertEqual('foo', transport._filter('foo'))
    self.assertEqual('foo/bar', transport._filter('foo/bar'))
    self.assertEqual('', transport._filter('..'))
    self.assertEqual('', transport._filter('/'))
    # The base of the pathfiltering transport is taken into account too.
    transport = transport.clone('subdir1/subdir2')
    self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
    self.assertEqual(
        'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
    self.assertEqual('subdir1', transport._filter('..'))
    self.assertEqual('', transport._filter('/'))
500
def test_filter_invocation(self):
    """The filter function is handed the path each operation resolves to."""
    filter_log = []

    # Named so that it does not shadow the ``filter`` builtin.
    def recording_filter(path):
        # Record every path the transport asks to have filtered, and pass
        # it through unchanged (like the identity default).
        filter_log.append(path)
        return path

    transport = self.make_pf_transport(recording_filter)
    transport.has('abc')
    self.assertEqual(['abc'], filter_log)
    # Reset the log between checks so each assertion sees one call only.
    del filter_log[:]
    transport.clone('abc').has('xyz')
    self.assertEqual(['abc/xyz'], filter_log)
    del filter_log[:]
    transport.has('/abc')
    self.assertEqual(['abc'], filter_log)
515
def test_clone(self):
    """Clones made with a relative or an absolute path keep the same server."""
    root = self.make_pf_transport()
    # relpath from root and root path are the same
    clones = [root.clone(path) for path in ('foo', '/foo')]
    for cloned in clones:
        self.assertEqual(root.server, cloned.server)
523
def test_url_preserves_pathfiltering(self):
    """Calling get_transport on a pathfiltered transport's base should
    produce a transport with exactly the same behaviour as the original
    pathfiltered transport.

    This is so that it is not possible to escape (accidentally or
    otherwise) the filtering by doing::

        url = filtered_transport.base
        parent_url = urlutils.join(url, '..')
        new_transport = get_transport(parent_url)
    """
    transport = self.make_pf_transport()
    new_transport = get_transport(transport.base)
    # Both the owning server and the base URL must survive the round trip
    # through get_transport().
    self.assertEqual(transport.server, new_transport.server)
    self.assertEqual(transport.base, new_transport.base)
449
540
class ReadonlyDecoratorTransportTest(TestCase):
450
541
"""Readonly decoration specific tests."""
790
881
# readv records the supplied offset request
791
882
expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
792
883
self.assertEqual(expected_result, transport._activity)
886
class TestSSHConnections(tests.TestCaseWithTransport):
888
def test_bzr_connect_to_bzr_ssh(self):
889
"""User acceptance that get_transport of a bzr+ssh:// behaves correctly.
891
bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
893
# This test actually causes a bzr instance to be invoked, which is very
894
# expensive: it should be the only such test in the test suite.
895
# A reasonable evolution for this would be to simply check inside
896
# check_channel_exec_request that the command is appropriate, and then
897
# satisfy requests in-process.
898
self.requireFeature(ParamikoFeature)
899
# SFTPFullAbsoluteServer has a get_url method, and doesn't
900
# override the interface (doesn't change self._vendor).
901
# Note that this does encryption, so can be slow.
902
from bzrlib.transport.sftp import SFTPFullAbsoluteServer
903
from bzrlib.tests.stub_sftp import StubServer
905
# Start an SSH server
906
self.command_executed = []
907
# XXX: This is horrible -- we define a really dumb SSH server that
908
# executes commands, and manage the hooking up of stdin/out/err to the
909
# SSH channel ourselves. Surely this has already been implemented
912
class StubSSHServer(StubServer):
916
def check_channel_exec_request(self, channel, command):
917
self.test.command_executed.append(command)
918
proc = subprocess.Popen(
919
command, shell=True, stdin=subprocess.PIPE,
920
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
922
# XXX: horribly inefficient, not to mention ugly.
923
# Start a thread for each of stdin/out/err, and relay bytes from
924
# the subprocess to channel and vice versa.
925
def ferry_bytes(read, write, close):
934
(channel.recv, proc.stdin.write, proc.stdin.close),
935
(proc.stdout.read, channel.sendall, channel.close),
936
(proc.stderr.read, channel.sendall_stderr, channel.close)]
938
for read, write, close in file_functions:
939
t = threading.Thread(
940
target=ferry_bytes, args=(read, write, close))
946
ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
947
# We *don't* want to override the default SSH vendor: the detected one
949
self.start_server(ssh_server)
950
port = ssh_server._listener.port
952
if sys.platform == 'win32':
953
bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
955
bzr_remote_path = self.get_bzr_path()
956
os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
958
# Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment
959
# variable is used to tell bzr what command to run on the remote end.
960
path_to_branch = osutils.abspath('.')
961
if sys.platform == 'win32':
962
# On Windows, we export all drives as '/C:/', etc. So we need to
963
# prefix a '/' to get the right path.
964
path_to_branch = '/' + path_to_branch
965
url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
966
t = get_transport(url)
967
self.permit_url(t.base)
971
['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
972
self.command_executed)
973
# Make sure to disconnect, so that the remote process can stop, and we
974
# can cleanup. Then pause the test until everything is shutdown
975
t._client._medium.disconnect()
978
# First wait for the subprocess
980
# And the rest are threads
981
for t in started[1:]: