/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp_transport.py

  • Committer: Martin von Gagern
  • Date: 2010-04-20 08:47:38 UTC
  • mfrom: (5167 +trunk)
  • mto: This revision was merged to the branch mainline in revision 5195.
  • Revision ID: martin.vgagern@gmx.net-20100420084738-ygymnqmdllzrhpfn
merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
 
1
# Copyright (C) 2005-2010 Robey Pointer <robey@lag.net>
2
2
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
3
#
4
4
# This program is free software; you can redistribute it and/or modify
13
13
#
14
14
# You should have received a copy of the GNU General Public License
15
15
# along with this program; if not, write to the Free Software
16
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
 
18
18
import os
19
19
import socket
21
21
import threading
22
22
import time
23
23
 
24
 
try:
25
 
    import paramiko
26
 
    paramiko_loaded = True
27
 
except ImportError:
28
 
    paramiko_loaded = False
29
 
 
30
24
from bzrlib import (
31
25
    bzrdir,
 
26
    config,
32
27
    errors,
33
28
    tests,
34
29
    transport as _mod_transport,
 
30
    ui,
35
31
    )
36
32
from bzrlib.osutils import (
37
33
    pathjoin,
39
35
    set_or_unset_env,
40
36
    )
41
37
from bzrlib.tests import (
 
38
    features,
42
39
    TestCaseWithTransport,
43
40
    TestCase,
44
41
    TestSkipped,
47
44
from bzrlib.transport import get_transport
48
45
import bzrlib.transport.http
49
46
 
50
 
if paramiko_loaded:
 
47
if features.paramiko.available():
51
48
    from bzrlib.transport import sftp as _mod_sftp
52
 
    from bzrlib.transport.sftp import (
53
 
        SFTPAbsoluteServer,
54
 
        SFTPHomeDirServer,
55
 
        SFTPTransport,
56
 
        )
 
49
    from bzrlib.tests import stub_sftp
57
50
 
58
51
from bzrlib.workingtree import WorkingTree
59
52
 
63
56
    if getattr(testcase, '_get_remote_is_absolute', None) is None:
64
57
        testcase._get_remote_is_absolute = True
65
58
    if testcase._get_remote_is_absolute:
66
 
        testcase.transport_server = SFTPAbsoluteServer
 
59
        testcase.transport_server = stub_sftp.SFTPAbsoluteServer
67
60
    else:
68
 
        testcase.transport_server = SFTPHomeDirServer
 
61
        testcase.transport_server = stub_sftp.SFTPHomeDirServer
69
62
    testcase.transport_readonly_server = HttpServer
70
63
 
71
64
 
74
67
 
75
68
    def setUp(self):
76
69
        super(TestCaseWithSFTPServer, self).setUp()
77
 
        if not paramiko_loaded:
78
 
            raise TestSkipped('you must have paramiko to run this test')
 
70
        self.requireFeature(features.paramiko)
79
71
        set_test_transport_to_sftp(self)
80
72
 
81
73
 
163
155
class SFTPNonServerTest(TestCase):
164
156
    def setUp(self):
165
157
        TestCase.setUp(self)
166
 
        if not paramiko_loaded:
167
 
            raise TestSkipped('you must have paramiko to run this test')
 
158
        self.requireFeature(features.paramiko)
168
159
 
169
160
    def test_parse_url_with_home_dir(self):
170
 
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/~/relative')
 
161
        s = _mod_sftp.SFTPTransport(
 
162
            'sftp://ro%62ey:h%40t@example.com:2222/~/relative')
171
163
        self.assertEquals(s._host, 'example.com')
172
164
        self.assertEquals(s._port, 2222)
173
165
        self.assertEquals(s._user, 'robey')
175
167
        self.assertEquals(s._path, '/~/relative/')
176
168
 
177
169
    def test_relpath(self):
178
 
        s = SFTPTransport('sftp://user@host.com/abs/path')
 
170
        s = _mod_sftp.SFTPTransport('sftp://user@host.com/abs/path')
179
171
        self.assertRaises(errors.PathNotChild, s.relpath,
180
172
                          'sftp://user@host.com/~/rel/path/sub')
181
173
 
195
187
            ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
196
188
 
197
189
    def test_abspath_root_sibling_server(self):
198
 
        from bzrlib.transport.sftp import SFTPSiblingAbsoluteServer
199
 
        server = SFTPSiblingAbsoluteServer()
200
 
        server.setUp()
 
190
        server = stub_sftp.SFTPSiblingAbsoluteServer()
 
191
        server.start_server()
201
192
        try:
202
193
            transport = get_transport(server.get_url())
203
194
            self.assertFalse(transport.abspath('/').endswith('/~/'))
204
195
            self.assertTrue(transport.abspath('/').endswith('/'))
205
196
            del transport
206
197
        finally:
207
 
            server.tearDown()
 
198
            server.stop_server()
208
199
 
209
200
 
210
201
class SFTPBranchTest(TestCaseWithSFTPServer):
257
248
      None:       If 'ssh' exists on the machine, then it will be spawned as a
258
249
                  child process.
259
250
    """
260
 
    
 
251
 
261
252
    def setUp(self):
262
253
        super(SSHVendorConnection, self).setUp()
263
 
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
264
254
 
265
255
        def create_server():
266
256
            """Just a wrapper so that when created, it will set _vendor"""
267
257
            # SFTPFullAbsoluteServer can handle any vendor,
268
258
            # it just needs to be set between the time it is instantiated
269
259
            # and the time .setUp() is called
270
 
            server = SFTPFullAbsoluteServer()
 
260
            server = stub_sftp.SFTPFullAbsoluteServer()
271
261
            server._vendor = self._test_vendor
272
262
            return server
273
263
        self._test_vendor = 'loopback'
305
295
    """
306
296
 
307
297
    def setUp(self):
308
 
        if not paramiko_loaded:
309
 
            raise TestSkipped('you must have paramiko to run this test')
 
298
        self.requireFeature(features.paramiko)
310
299
        super(SSHVendorBadConnection, self).setUp()
311
 
        import bzrlib.transport.ssh
312
300
 
313
301
        # open a random port, so we know nobody else is using it
314
302
        # but don't actually listen on the port.
315
303
        s = socket.socket()
316
304
        s.bind(('localhost', 0))
 
305
        self.addCleanup(s.close)
317
306
        self.bogus_url = 'sftp://%s:%s/' % s.getsockname()
318
307
 
319
 
        orig_vendor = bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor
320
 
        def reset():
321
 
            bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
322
 
            s.close()
323
 
        self.addCleanup(reset)
324
 
 
325
308
    def set_vendor(self, vendor):
326
 
        import bzrlib.transport.ssh
327
 
        bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor = vendor
 
309
        from bzrlib.transport import ssh
 
310
        self.overrideAttr(ssh._ssh_vendor_manager, '_cached_ssh_vendor', vendor)
328
311
 
329
312
    def test_bad_connection_paramiko(self):
330
313
        """Test that a real connection attempt raises the right error"""
367
350
    """Test that the testing SFTPServer's latency knob works."""
368
351
 
369
352
    def test_latency_knob_slows_transport(self):
370
 
        # change the latency knob to 500ms. We take about 40ms for a 
 
353
        # change the latency knob to 500ms. We take about 40ms for a
371
354
        # loopback connection ordinarily.
372
355
        start_time = time.time()
373
356
        self.get_server().add_latency = 0.5
418
401
 
419
402
    def setUp(self):
420
403
        TestCase.setUp(self)
421
 
        if not paramiko_loaded:
422
 
            raise TestSkipped('you must have paramiko to run this test')
 
404
        self.requireFeature(features.paramiko)
423
405
 
424
406
    def test_delay(self):
425
 
        from bzrlib.transport.sftp import SocketDelay
426
407
        sending = FakeSocket()
427
 
        receiving = SocketDelay(sending, 0.1, bandwidth=1000000,
428
 
                                really_sleep=False)
 
408
        receiving = stub_sftp.SocketDelay(sending, 0.1, bandwidth=1000000,
 
409
                                          really_sleep=False)
429
410
        # check that simulated time is charged only per round-trip:
430
 
        t1 = SocketDelay.simulated_time
 
411
        t1 = stub_sftp.SocketDelay.simulated_time
431
412
        receiving.send("connect1")
432
413
        self.assertEqual(sending.recv(1024), "connect1")
433
 
        t2 = SocketDelay.simulated_time
 
414
        t2 = stub_sftp.SocketDelay.simulated_time
434
415
        self.assertAlmostEqual(t2 - t1, 0.1)
435
416
        receiving.send("connect2")
436
417
        self.assertEqual(sending.recv(1024), "connect2")
437
418
        sending.send("hello")
438
419
        self.assertEqual(receiving.recv(1024), "hello")
439
 
        t3 = SocketDelay.simulated_time
 
420
        t3 = stub_sftp.SocketDelay.simulated_time
440
421
        self.assertAlmostEqual(t3 - t2, 0.1)
441
422
        sending.send("hello")
442
423
        self.assertEqual(receiving.recv(1024), "hello")
444
425
        self.assertEqual(receiving.recv(1024), "hello")
445
426
        sending.send("hello")
446
427
        self.assertEqual(receiving.recv(1024), "hello")
447
 
        t4 = SocketDelay.simulated_time
 
428
        t4 = stub_sftp.SocketDelay.simulated_time
448
429
        self.assertAlmostEqual(t4, t3)
449
430
 
450
431
    def test_bandwidth(self):
451
 
        from bzrlib.transport.sftp import SocketDelay
452
432
        sending = FakeSocket()
453
 
        receiving = SocketDelay(sending, 0, bandwidth=8.0/(1024*1024),
454
 
                                really_sleep=False)
 
433
        receiving = stub_sftp.SocketDelay(sending, 0, bandwidth=8.0/(1024*1024),
 
434
                                          really_sleep=False)
455
435
        # check that simulated time is charged only per round-trip:
456
 
        t1 = SocketDelay.simulated_time
 
436
        t1 = stub_sftp.SocketDelay.simulated_time
457
437
        receiving.send("connect")
458
438
        self.assertEqual(sending.recv(1024), "connect")
459
439
        sending.send("a" * 100)
460
440
        self.assertEqual(receiving.recv(1024), "a" * 100)
461
 
        t2 = SocketDelay.simulated_time
 
441
        t2 = stub_sftp.SocketDelay.simulated_time
462
442
        self.assertAlmostEqual(t2 - t1, 100 + 7)
463
443
 
464
444
 
 
445
class ReadvFile(object):
 
446
    """An object that acts like Paramiko's SFTPFile.readv()"""
 
447
 
 
448
    def __init__(self, data):
 
449
        self._data = data
 
450
 
 
451
    def readv(self, requests):
 
452
        for start, length in requests:
 
453
            yield self._data[start:start+length]
 
454
 
 
455
 
 
456
def _null_report_activity(*a, **k):
 
457
    pass
 
458
 
 
459
 
465
460
class Test_SFTPReadvHelper(tests.TestCase):
466
461
 
467
462
    def checkGetRequests(self, expected_requests, offsets):
468
 
        if not paramiko_loaded:
469
 
            raise TestSkipped('you must have paramiko to run this test')
470
 
        helper = _mod_sftp._SFTPReadvHelper(offsets, 'artificial_test')
 
463
        self.requireFeature(features.paramiko)
 
464
        helper = _mod_sftp._SFTPReadvHelper(offsets, 'artificial_test',
 
465
            _null_report_activity)
471
466
        self.assertEqual(expected_requests, helper._get_requests())
472
467
 
473
468
    def test__get_requests(self):
483
478
        self.checkGetRequests([(0, 32768), (32768, 32768), (65536, 464)],
484
479
                              [(0, 40000), (40000, 100), (40100, 1900),
485
480
                               (42000, 24000)])
 
481
 
 
482
    def checkRequestAndYield(self, expected, data, offsets):
 
483
        self.requireFeature(features.paramiko)
 
484
        helper = _mod_sftp._SFTPReadvHelper(offsets, 'artificial_test',
 
485
            _null_report_activity)
 
486
        data_f = ReadvFile(data)
 
487
        result = list(helper.request_and_yield_offsets(data_f))
 
488
        self.assertEqual(expected, result)
 
489
 
 
490
    def test_request_and_yield_offsets(self):
 
491
        data = 'abcdefghijklmnopqrstuvwxyz'
 
492
        self.checkRequestAndYield([(0, 'a'), (5, 'f'), (10, 'klm')], data,
 
493
                                  [(0, 1), (5, 1), (10, 3)])
 
494
        # Should combine requests, and split them again
 
495
        self.checkRequestAndYield([(0, 'a'), (1, 'b'), (10, 'klm')], data,
 
496
                                  [(0, 1), (1, 1), (10, 3)])
 
497
        # Out of order requests. The requests should get combined, but then be
 
498
        # yielded out-of-order. We also need one that is at the end of a
 
499
        # previous range. See bug #293746
 
500
        self.checkRequestAndYield([(0, 'a'), (10, 'k'), (4, 'efg'), (1, 'bcd')],
 
501
                                  data, [(0, 1), (10, 1), (4, 3), (1, 3)])
 
502
 
 
503
 
 
504
class TestUsesAuthConfig(TestCaseWithSFTPServer):
 
505
    """Test that AuthenticationConfig can supply default usernames."""
 
506
 
 
507
    def get_transport_for_connection(self, set_config):
 
508
        port = self.get_server()._listener.port
 
509
        if set_config:
 
510
            conf = config.AuthenticationConfig()
 
511
            conf._get_config().update(
 
512
                {'sftptest': {'scheme': 'ssh', 'port': port, 'user': 'bar'}})
 
513
            conf._save()
 
514
        t = get_transport('sftp://localhost:%d' % port)
 
515
        # force a connection to be performed.
 
516
        t.has('foo')
 
517
        return t
 
518
 
 
519
    def test_sftp_uses_config(self):
 
520
        t = self.get_transport_for_connection(set_config=True)
 
521
        self.assertEqual('bar', t._get_credentials()[0])
 
522
 
 
523
    def test_sftp_is_none_if_no_config(self):
 
524
        t = self.get_transport_for_connection(set_config=False)
 
525
        self.assertIs(None, t._get_credentials()[0])
 
526
 
 
527
    def test_sftp_doesnt_prompt_username(self):
 
528
        stdout = tests.StringIOWrapper()
 
529
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n', stdout=stdout)
 
530
        t = self.get_transport_for_connection(set_config=False)
 
531
        self.assertIs(None, t._get_credentials()[0])
 
532
        # No prompts should've been printed, stdin shouldn't have been read
 
533
        self.assertEquals("", stdout.getvalue())
 
534
        self.assertEquals(0, ui.ui_factory.stdin.tell())