/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp_transport.py

Merge bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
import os
18
18
import socket
19
19
import threading
 
20
import time
20
21
 
21
22
import bzrlib.bzrdir as bzrdir
22
23
import bzrlib.errors as errors
23
24
from bzrlib.osutils import pathjoin, lexists
24
25
from bzrlib.tests import TestCaseWithTransport, TestCase, TestSkipped
25
26
import bzrlib.transport
 
27
import bzrlib.transport.http
26
28
from bzrlib.workingtree import WorkingTree
27
29
 
28
30
try:
32
34
    paramiko_loaded = False
33
35
 
34
36
 
 
37
def set_test_transport_to_sftp(testcase):
 
38
    """A helper to set transports on test case instances."""
 
39
    from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer
 
40
    if getattr(testcase, '_get_remote_is_absolute', None) is None:
 
41
        testcase._get_remote_is_absolute = True
 
42
    if testcase._get_remote_is_absolute:
 
43
        testcase.transport_server = SFTPAbsoluteServer
 
44
    else:
 
45
        testcase.transport_server = SFTPHomeDirServer
 
46
    testcase.transport_readonly_server = bzrlib.transport.http.HttpServer
 
47
 
 
48
 
35
49
class TestCaseWithSFTPServer(TestCaseWithTransport):
36
50
    """A test case base class that provides a sftp server on localhost."""
37
51
 
38
52
    def setUp(self):
39
 
        if not paramiko_loaded:
40
 
            raise TestSkipped('you must have paramiko to run this test')
41
53
        super(TestCaseWithSFTPServer, self).setUp()
42
 
        from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer
43
 
        if getattr(self, '_get_remote_is_absolute', None) is None:
44
 
            self._get_remote_is_absolute = True
45
 
        if self._get_remote_is_absolute:
46
 
            self.transport_server = SFTPAbsoluteServer
47
 
        else:
48
 
            self.transport_server = SFTPHomeDirServer
49
 
        self.transport_readonly_server = bzrlib.transport.http.HttpServer
 
54
        if not paramiko_loaded:
 
55
            raise TestSkipped('you must have paramiko to run this test')
 
56
        set_test_transport_to_sftp(self) 
50
57
 
51
58
    def get_transport(self, path=None):
52
59
        """Return a transport relative to self._test_root."""
264
271
        self._test_vendor = vendor
265
272
 
266
273
    def test_connection_paramiko(self):
267
 
        self.set_vendor('none')
 
274
        from bzrlib.transport import ssh
 
275
        self.set_vendor(ssh.ParamikoVendor())
268
276
        t = self.get_transport()
269
277
        self.assertEqual('foobar\n', t.get('a_file').read())
270
278
 
289
297
        if not paramiko_loaded:
290
298
            raise TestSkipped('you must have paramiko to run this test')
291
299
        super(SSHVendorBadConnection, self).setUp()
292
 
        import bzrlib.transport.sftp
293
 
 
294
 
        self._transport_sftp = bzrlib.transport.sftp
 
300
        import bzrlib.transport.ssh
295
301
 
296
302
        # open a random port, so we know nobody else is using it
297
303
        # but don't actually listen on the port.
299
305
        s.bind(('localhost', 0))
300
306
        self.bogus_url = 'sftp://%s:%s/' % s.getsockname()
301
307
 
302
 
        orig_vendor = bzrlib.transport.sftp._ssh_vendor
 
308
        orig_vendor = bzrlib.transport.ssh._ssh_vendor
303
309
        def reset():
304
 
            bzrlib.transport.sftp._ssh_vendor = orig_vendor
 
310
            bzrlib.transport.ssh._ssh_vendor = orig_vendor
305
311
            s.close()
306
312
        self.addCleanup(reset)
307
313
 
308
314
    def set_vendor(self, vendor):
309
 
        self._transport_sftp._ssh_vendor = vendor
 
315
        import bzrlib.transport.ssh
 
316
        bzrlib.transport.ssh._ssh_vendor = vendor
310
317
 
311
318
    def test_bad_connection_paramiko(self):
312
319
        """Test that a real connection attempt raises the right error"""
313
 
        self.set_vendor('none')
 
320
        from bzrlib.transport import ssh
 
321
        self.set_vendor(ssh.ParamikoVendor())
314
322
        self.assertRaises(errors.ConnectionError,
315
323
                          bzrlib.transport.get_transport, self.bogus_url)
316
324
 
342
350
            raise TestSkipped('Known NameError bug with paramiko-1.6.1')
343
351
        self.assertContainsRe(err, 'Connection error')
344
352
 
 
353
 
 
354
class SFTPLatencyKnob(TestCaseWithSFTPServer):
 
355
    """Test that the testing SFTPServer's latency knob works."""
 
356
 
 
357
    def test_latency_knob_slows_transport(self):
 
358
        # change the latency knob to 500ms. We take about 40ms for a 
 
359
        # loopback connection ordinarily.
 
360
        start_time = time.time()
 
361
        self.get_server().add_latency = 0.5
 
362
        transport = self.get_transport()
 
363
        with_latency_knob_time = time.time() - start_time
 
364
        self.assertTrue(with_latency_knob_time > 0.4)
 
365
 
 
366
    def test_default(self):
 
367
        # This test is potentially brittle: under extremely high machine load
 
368
        # it could fail, but that is quite unlikely
 
369
        start_time = time.time()
 
370
        transport = self.get_transport()
 
371
        regular_time = time.time() - start_time
 
372
        self.assertTrue(regular_time < 0.5)
 
373
 
 
374
 
 
375
class FakeSocket(object):
 
376
    """Fake socket object used to test the SocketDelay wrapper without
 
377
    using a real socket.
 
378
    """
 
379
 
 
380
    def __init__(self):
 
381
        self._data = ""
 
382
 
 
383
    def send(self, data, flags=0):
 
384
        self._data += data
 
385
        return len(data)
 
386
 
 
387
    def sendall(self, data, flags=0):
 
388
        self._data += data
 
389
        return len(data)
 
390
 
 
391
    def recv(self, size, flags=0):
 
392
        if size < len(self._data):
 
393
            result = self._data[:size]
 
394
            self._data = self._data[size:]
 
395
            return result
 
396
        else:
 
397
            result = self._data
 
398
            self._data = ""
 
399
            return result
 
400
 
 
401
 
 
402
class TestSocketDelay(TestCase):
 
403
 
 
404
    def setUp(self):
 
405
        TestCase.setUp(self)
 
406
 
 
407
    def test_delay(self):
 
408
        from bzrlib.transport.sftp import SocketDelay
 
409
        sending = FakeSocket()
 
410
        receiving = SocketDelay(sending, 0.1, bandwidth=1000000,
 
411
                                really_sleep=False)
 
412
        # check that simulated time is charged only per round-trip:
 
413
        t1 = SocketDelay.simulated_time
 
414
        receiving.send("connect1")
 
415
        self.assertEqual(sending.recv(1024), "connect1")
 
416
        t2 = SocketDelay.simulated_time
 
417
        self.assertAlmostEqual(t2 - t1, 0.1)
 
418
        receiving.send("connect2")
 
419
        self.assertEqual(sending.recv(1024), "connect2")
 
420
        sending.send("hello")
 
421
        self.assertEqual(receiving.recv(1024), "hello")
 
422
        t3 = SocketDelay.simulated_time
 
423
        self.assertAlmostEqual(t3 - t2, 0.1)
 
424
        sending.send("hello")
 
425
        self.assertEqual(receiving.recv(1024), "hello")
 
426
        sending.send("hello")
 
427
        self.assertEqual(receiving.recv(1024), "hello")
 
428
        sending.send("hello")
 
429
        self.assertEqual(receiving.recv(1024), "hello")
 
430
        t4 = SocketDelay.simulated_time
 
431
        self.assertAlmostEqual(t4, t3)
 
432
 
 
433
    def test_bandwidth(self):
 
434
        from bzrlib.transport.sftp import SocketDelay
 
435
        sending = FakeSocket()
 
436
        receiving = SocketDelay(sending, 0, bandwidth=8.0/(1024*1024),
 
437
                                really_sleep=False)
 
438
        # check that simulated time is charged only per round-trip:
 
439
        t1 = SocketDelay.simulated_time
 
440
        receiving.send("connect")
 
441
        self.assertEqual(sending.recv(1024), "connect")
 
442
        sending.send("a" * 100)
 
443
        self.assertEqual(receiving.recv(1024), "a" * 100)
 
444
        t2 = SocketDelay.simulated_time
 
445
        self.assertAlmostEqual(t2 - t1, 100 + 7)
 
446
 
 
447