363
370
def test_abspath(self):
    # abspath('/') must always come back as the chroot URL, whichever
    # transport under the chroot is asked.
    chroot_server = ChrootServer(get_transport('memory:///foo/bar/'))
    self.start_server(chroot_server)
    root = get_transport(chroot_server.get_url())
    # Asked from the transport opened on the server URL itself.
    self.assertEqual(chroot_server.get_url(), root.abspath('/'))
    # Asked from a clone one level down: '/' still maps to the server URL,
    # not to the clone's own base.
    nested = root.clone('subdir')
    self.assertEqual(chroot_server.get_url(), nested.abspath('/'))
374
380
def test_clone(self):
    # Cloning with a relative path and cloning with the matching rooted
    # path must both yield transports attached to the same chroot server.
    chroot_server = ChrootServer(get_transport('memory:///foo/bar/'))
    self.start_server(chroot_server)
    root = get_transport(chroot_server.get_url())
    via_relpath = root.clone('foo')
    via_abspath = root.clone('/foo')
    self.assertEqual(chroot_server, via_relpath.server)
    self.assertEqual(chroot_server, via_abspath.server)
385
390
def test_chroot_url_preserves_chroot(self):
386
391
"""Calling get_transport on a chroot transport's base should produce a
441
447
backing_transport = MemoryTransport()
442
448
server = ChrootServer(backing_transport)
444
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
451
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
456
class PathFilteringDecoratorTransportTest(TestCase):
457
"""Pathfilter decoration specific tests."""
459
def test_abspath(self):
460
# The abspath is always relative to the base of the backing transport.
461
server = PathFilteringServer(get_transport('memory:///foo/bar/'),
464
transport = get_transport(server.get_url())
465
self.assertEqual(server.get_url(), transport.abspath('/'))
467
subdir_transport = transport.clone('subdir')
468
self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
445
469
server.tearDown()
471
def make_pf_transport(self, filter_func=None):
    """Build a path-filtering transport over a 'memory:///foo/bar/' transport.

    :param filter_func: callable passed to PathFilteringServer as its
        filter.  When left as None an identity function is used instead.
    :return: the transport obtained from the new server's URL.
    """
    path_filter = (lambda x: x) if filter_func is None else filter_func
    backing = get_transport('memory:///foo/bar/')
    pf_server = PathFilteringServer(backing, path_filter)
    # NOTE(review): no call that starts the server is visible in this block
    # before get_url() is used -- confirm against the full file.
    self.addCleanup(pf_server.tearDown)
    return get_transport(pf_server.get_url())
484
def test__filter(self):
    # _filter (with an identity func as filter_func) always returns
    # paths relative to the base of the backing transport.
    transport = self.make_pf_transport()
    self.assertEqual('foo', transport._filter('foo'))
    self.assertEqual('foo/bar', transport._filter('foo/bar'))
    # Neither '..' nor '/' may resolve above the root: both give ''.
    self.assertEqual('', transport._filter('..'))
    self.assertEqual('', transport._filter('/'))
    # The base of the pathfiltering transport is taken into account too.
    transport = transport.clone('subdir1/subdir2')
    self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
    # This call had lost its opening 'self.assertEqual(' line, leaving a
    # dangling argument list; it is restored here.
    self.assertEqual(
        'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
    # From the clone, '..' goes up one level and '/' still gives ''.
    self.assertEqual('subdir1', transport._filter('..'))
    self.assertEqual('', transport._filter('/'))
500
def test_filter_invocation(self):
503
filter_log.append(path)
505
transport = self.make_pf_transport(filter)
507
self.assertEqual(['abc'], filter_log)
509
transport.clone('abc').has('xyz')
510
self.assertEqual(['abc/xyz'], filter_log)
512
transport.has('/abc')
513
self.assertEqual(['abc'], filter_log)
515
def test_clone(self):
    # A clone made from a relative path and one made from the matching
    # rooted path must both share the server of the transport they were
    # cloned from.
    base = self.make_pf_transport()
    via_relpath = base.clone('foo')
    via_abspath = base.clone('/foo')
    self.assertEqual(base.server, via_relpath.server)
    self.assertEqual(base.server, via_abspath.server)
523
def test_url_preserves_pathfiltering(self):
    """Calling get_transport on a pathfiltered transport's base should
    produce a transport with exactly the same behaviour as the original
    pathfiltered transport.

    This is so that it is not possible to escape (accidentally or
    otherwise) the filtering by doing::

        url = filtered_transport.base
        parent_url = urlutils.join(url, '..')
        new_transport = get_transport(parent_url)
    """
    transport = self.make_pf_transport()
    # Re-open a transport purely from the base URL of the filtered one.
    new_transport = get_transport(transport.base)
    # It must report the same server and the same base as the original.
    self.assertEqual(transport.server, new_transport.server)
    self.assertEqual(transport.base, new_transport.base)
448
540
class ReadonlyDecoratorTransportTest(TestCase):
449
541
"""Readonly decoration specific tests."""
460
552
import bzrlib.transport.readonly as readonly
461
553
# connect to '.' via http which is not listable
462
554
server = HttpServer()
465
transport = get_transport('readonly+' + server.get_url())
466
self.failUnless(isinstance(transport,
467
readonly.ReadonlyTransportDecorator))
468
self.assertEqual(False, transport.listable())
469
self.assertEqual(True, transport.is_readonly())
555
self.start_server(server)
556
transport = get_transport('readonly+' + server.get_url())
557
self.failUnless(isinstance(transport,
558
readonly.ReadonlyTransportDecorator))
559
self.assertEqual(False, transport.listable())
560
self.assertEqual(True, transport.is_readonly())
474
563
class FakeNFSDecoratorTests(TestCaseInTempDir):
492
581
from bzrlib.tests.http_server import HttpServer
493
582
# connect to '.' via http which is not listable
494
583
server = HttpServer()
497
transport = self.get_nfs_transport(server.get_url())
498
self.assertIsInstance(
499
transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
500
self.assertEqual(False, transport.listable())
501
self.assertEqual(True, transport.is_readonly())
584
self.start_server(server)
585
transport = self.get_nfs_transport(server.get_url())
586
self.assertIsInstance(
587
transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
588
self.assertEqual(False, transport.listable())
589
self.assertEqual(True, transport.is_readonly())
505
591
def test_fakenfs_server_default(self):
    # a FakeNFSServer() should bring up a local relpath server for itself
    import bzrlib.transport.fakenfs as fakenfs
    server = fakenfs.FakeNFSServer()
    # Start the server before asking it for its URL.  The checks below run
    # exactly once, after the start; a second copy of them that ran before
    # the server was started has been removed.
    self.start_server(server)
    # the url should be decorated appropriately
    self.assertStartsWith(server.get_url(), 'fakenfs+')
    # and we should be able to get a transport for it
    transport = get_transport(server.get_url())
    # which must be a FakeNFSTransportDecorator instance.
    self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
521
603
def test_fakenfs_rename_semantics(self):
522
604
# a FakeNFS transport must mangle the way rename errors occur to
800
881
# readv records the supplied offset request
801
882
expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
802
883
self.assertEqual(expected_result, transport._activity)
886
class TestSSHConnections(tests.TestCaseWithTransport):
888
def test_bzr_connect_to_bzr_ssh(self):
889
"""User acceptance that get_transport of a bzr+ssh:// behaves correctly.
891
bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
893
# This test actually causes a bzr instance to be invoked, which is very
894
# expensive: it should be the only such test in the test suite.
895
# A reasonable evolution for this would be to simply check inside
896
# check_channel_exec_request that the command is appropriate, and then
897
# satisfy requests in-process.
898
self.requireFeature(ParamikoFeature)
899
# SFTPFullAbsoluteServer has a get_url method, and doesn't
900
# override the interface (doesn't change self._vendor).
901
# Note that this does encryption, so can be slow.
902
from bzrlib.transport.sftp import SFTPFullAbsoluteServer
903
from bzrlib.tests.stub_sftp import StubServer
905
# Start an SSH server
906
self.command_executed = []
907
# XXX: This is horrible -- we define a really dumb SSH server that
908
# executes commands, and manage the hooking up of stdin/out/err to the
909
# SSH channel ourselves. Surely this has already been implemented
912
class StubSSHServer(StubServer):
916
def check_channel_exec_request(self, channel, command):
917
self.test.command_executed.append(command)
918
proc = subprocess.Popen(
919
command, shell=True, stdin=subprocess.PIPE,
920
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
922
# XXX: horribly inefficient, not to mention ugly.
923
# Start a thread for each of stdin/out/err, and relay bytes from
924
# the subprocess to channel and vice versa.
925
def ferry_bytes(read, write, close):
934
(channel.recv, proc.stdin.write, proc.stdin.close),
935
(proc.stdout.read, channel.sendall, channel.close),
936
(proc.stderr.read, channel.sendall_stderr, channel.close)]
938
for read, write, close in file_functions:
939
t = threading.Thread(
940
target=ferry_bytes, args=(read, write, close))
946
ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
947
# We *don't* want to override the default SSH vendor: the detected one
949
self.start_server(ssh_server)
950
port = ssh_server._listener.port
952
if sys.platform == 'win32':
953
bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
955
bzr_remote_path = self.get_bzr_path()
956
os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
958
# Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment
959
# variable is used to tell bzr what command to run on the remote end.
960
path_to_branch = osutils.abspath('.')
961
if sys.platform == 'win32':
962
# On Windows, we export all drives as '/C:/, etc. So we need to
963
# prefix a '/' to get the right path.
964
path_to_branch = '/' + path_to_branch
965
url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
966
t = get_transport(url)
967
self.permit_url(t.base)
971
['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
972
self.command_executed)
973
# Make sure to disconnect, so that the remote process can stop, and we
974
# can cleanup. Then pause the test until everything is shutdown
975
t._client._medium.disconnect()
978
# First wait for the subprocess
980
# And the rest are threads
981
for t in started[1:]: