163
155
class SFTPNonServerTest(TestCase):
165
157
TestCase.setUp(self)
166
if not paramiko_loaded:
167
raise TestSkipped('you must have paramiko to run this test')
158
self.requireFeature(features.paramiko)
169
160
def test_parse_url_with_home_dir(self):
170
s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/~/relative')
161
s = _mod_sftp.SFTPTransport(
162
'sftp://ro%62ey:h%40t@example.com:2222/~/relative')
171
163
self.assertEquals(s._host, 'example.com')
172
164
self.assertEquals(s._port, 2222)
173
165
self.assertEquals(s._user, 'robey')
257
248
None: If 'ssh' exists on the machine, then it will be spawned as a
262
253
super(SSHVendorConnection, self).setUp()
263
from bzrlib.transport.sftp import SFTPFullAbsoluteServer
265
255
def create_server():
266
256
"""Just a wrapper so that when created, it will set _vendor"""
267
257
# SFTPFullAbsoluteServer can handle any vendor,
268
258
# it just needs to be set between the time it is instantiated
269
259
# and the time .setUp() is called
270
server = SFTPFullAbsoluteServer()
260
server = stub_sftp.SFTPFullAbsoluteServer()
271
261
server._vendor = self._test_vendor
273
263
self._test_vendor = 'loopback'
308
if not paramiko_loaded:
309
raise TestSkipped('you must have paramiko to run this test')
298
self.requireFeature(features.paramiko)
310
299
super(SSHVendorBadConnection, self).setUp()
311
import bzrlib.transport.ssh
313
301
# open a random port, so we know nobody else is using it
314
302
# but don't actually listen on the port.
315
303
s = socket.socket()
316
304
s.bind(('localhost', 0))
305
self.addCleanup(s.close)
317
306
self.bogus_url = 'sftp://%s:%s/' % s.getsockname()
319
orig_vendor = bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor
321
bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
323
self.addCleanup(reset)
325
308
def set_vendor(self, vendor):
326
import bzrlib.transport.ssh
327
bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor = vendor
309
from bzrlib.transport import ssh
310
self.overrideAttr(ssh._ssh_vendor_manager, '_cached_ssh_vendor', vendor)
329
312
def test_bad_connection_paramiko(self):
330
313
"""Test that a real connection attempt raises the right error"""
420
403
TestCase.setUp(self)
421
if not paramiko_loaded:
422
raise TestSkipped('you must have paramiko to run this test')
404
self.requireFeature(features.paramiko)
424
406
def test_delay(self):
425
from bzrlib.transport.sftp import SocketDelay
426
407
sending = FakeSocket()
427
receiving = SocketDelay(sending, 0.1, bandwidth=1000000,
408
receiving = stub_sftp.SocketDelay(sending, 0.1, bandwidth=1000000,
429
410
# check that simulated time is charged only per round-trip:
430
t1 = SocketDelay.simulated_time
411
t1 = stub_sftp.SocketDelay.simulated_time
431
412
receiving.send("connect1")
432
413
self.assertEqual(sending.recv(1024), "connect1")
433
t2 = SocketDelay.simulated_time
414
t2 = stub_sftp.SocketDelay.simulated_time
434
415
self.assertAlmostEqual(t2 - t1, 0.1)
435
416
receiving.send("connect2")
436
417
self.assertEqual(sending.recv(1024), "connect2")
437
418
sending.send("hello")
438
419
self.assertEqual(receiving.recv(1024), "hello")
439
t3 = SocketDelay.simulated_time
420
t3 = stub_sftp.SocketDelay.simulated_time
440
421
self.assertAlmostEqual(t3 - t2, 0.1)
441
422
sending.send("hello")
442
423
self.assertEqual(receiving.recv(1024), "hello")
444
425
self.assertEqual(receiving.recv(1024), "hello")
445
426
sending.send("hello")
446
427
self.assertEqual(receiving.recv(1024), "hello")
447
t4 = SocketDelay.simulated_time
428
t4 = stub_sftp.SocketDelay.simulated_time
448
429
self.assertAlmostEqual(t4, t3)
450
431
def test_bandwidth(self):
451
from bzrlib.transport.sftp import SocketDelay
452
432
sending = FakeSocket()
453
receiving = SocketDelay(sending, 0, bandwidth=8.0/(1024*1024),
433
receiving = stub_sftp.SocketDelay(sending, 0, bandwidth=8.0/(1024*1024),
455
435
# check that simulated time is charged only per round-trip:
456
t1 = SocketDelay.simulated_time
436
t1 = stub_sftp.SocketDelay.simulated_time
457
437
receiving.send("connect")
458
438
self.assertEqual(sending.recv(1024), "connect")
459
439
sending.send("a" * 100)
460
440
self.assertEqual(receiving.recv(1024), "a" * 100)
461
t2 = SocketDelay.simulated_time
441
t2 = stub_sftp.SocketDelay.simulated_time
462
442
self.assertAlmostEqual(t2 - t1, 100 + 7)
445
class ReadvFile(object):
446
"""An object that acts like Paramiko's SFTPFile.readv()"""
448
def __init__(self, data):
451
def readv(self, requests):
452
for start, length in requests:
453
yield self._data[start:start+length]
456
def _null_report_activity(*a, **k):
465
460
class Test_SFTPReadvHelper(tests.TestCase):
467
462
def checkGetRequests(self, expected_requests, offsets):
468
if not paramiko_loaded:
469
raise TestSkipped('you must have paramiko to run this test')
470
helper = _mod_sftp._SFTPReadvHelper(offsets, 'artificial_test')
463
self.requireFeature(features.paramiko)
464
helper = _mod_sftp._SFTPReadvHelper(offsets, 'artificial_test',
465
_null_report_activity)
471
466
self.assertEqual(expected_requests, helper._get_requests())
473
468
def test__get_requests(self):
483
478
self.checkGetRequests([(0, 32768), (32768, 32768), (65536, 464)],
484
479
[(0, 40000), (40000, 100), (40100, 1900),
482
def checkRequestAndYield(self, expected, data, offsets):
483
self.requireFeature(features.paramiko)
484
helper = _mod_sftp._SFTPReadvHelper(offsets, 'artificial_test',
485
_null_report_activity)
486
data_f = ReadvFile(data)
487
result = list(helper.request_and_yield_offsets(data_f))
488
self.assertEqual(expected, result)
490
def test_request_and_yield_offsets(self):
491
data = 'abcdefghijklmnopqrstuvwxyz'
492
self.checkRequestAndYield([(0, 'a'), (5, 'f'), (10, 'klm')], data,
493
[(0, 1), (5, 1), (10, 3)])
494
# Should combine requests, and split them again
495
self.checkRequestAndYield([(0, 'a'), (1, 'b'), (10, 'klm')], data,
496
[(0, 1), (1, 1), (10, 3)])
497
# Out of order requests. The requests should get combined, but then be
498
# yielded out-of-order. We also need one that is at the end of a
499
# previous range. See bug #293746
500
self.checkRequestAndYield([(0, 'a'), (10, 'k'), (4, 'efg'), (1, 'bcd')],
501
data, [(0, 1), (10, 1), (4, 3), (1, 3)])
504
class TestUsesAuthConfig(TestCaseWithSFTPServer):
505
"""Test that AuthenticationConfig can supply default usernames."""
507
def get_transport_for_connection(self, set_config):
508
port = self.get_server()._listener.port
510
conf = config.AuthenticationConfig()
511
conf._get_config().update(
512
{'sftptest': {'scheme': 'ssh', 'port': port, 'user': 'bar'}})
514
t = get_transport('sftp://localhost:%d' % port)
515
# force a connection to be performed.
519
def test_sftp_uses_config(self):
520
t = self.get_transport_for_connection(set_config=True)
521
self.assertEqual('bar', t._get_credentials()[0])
523
def test_sftp_is_none_if_no_config(self):
524
t = self.get_transport_for_connection(set_config=False)
525
self.assertIs(None, t._get_credentials()[0])
527
def test_sftp_doesnt_prompt_username(self):
528
stdout = tests.StringIOWrapper()
529
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n', stdout=stdout)
530
t = self.get_transport_for_connection(set_config=False)
531
self.assertIs(None, t._get_credentials()[0])
532
# No prompts should've been printed, stdin shouldn't have been read
533
self.assertEquals("", stdout.getvalue())
534
self.assertEquals(0, ui.ui_factory.stdin.tell())