/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp_transport.py

Merge with serialize-transform

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
 
2
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
3
#
 
4
# This program is free software; you can redistribute it and/or modify
 
5
# it under the terms of the GNU General Public License as published by
 
6
# the Free Software Foundation; either version 2 of the License, or
 
7
# (at your option) any later version.
 
8
#
 
9
# This program is distributed in the hope that it will be useful,
 
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
# GNU General Public License for more details.
 
13
#
 
14
# You should have received a copy of the GNU General Public License
 
15
# along with this program; if not, write to the Free Software
 
16
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
17
 
 
18
import os
 
19
import socket
 
20
import sys
 
21
import threading
 
22
import time
 
23
 
 
24
try:
 
25
    import paramiko
 
26
    paramiko_loaded = True
 
27
except ImportError:
 
28
    paramiko_loaded = False
 
29
 
 
30
from bzrlib import (
 
31
    bzrdir,
 
32
    config,
 
33
    errors,
 
34
    tests,
 
35
    transport as _mod_transport,
 
36
    )
 
37
from bzrlib.osutils import (
 
38
    pathjoin,
 
39
    lexists,
 
40
    set_or_unset_env,
 
41
    )
 
42
from bzrlib.tests import (
 
43
    TestCaseWithTransport,
 
44
    TestCase,
 
45
    TestSkipped,
 
46
    )
 
47
from bzrlib.tests.http_server import HttpServer
 
48
from bzrlib.transport import get_transport
 
49
import bzrlib.transport.http
 
50
 
 
51
if paramiko_loaded:
 
52
    from bzrlib.transport import sftp as _mod_sftp
 
53
    from bzrlib.transport.sftp import (
 
54
        SFTPAbsoluteServer,
 
55
        SFTPHomeDirServer,
 
56
        SFTPTransport,
 
57
        )
 
58
 
 
59
from bzrlib.workingtree import WorkingTree
 
60
 
 
61
 
 
62
def set_test_transport_to_sftp(testcase):
 
63
    """A helper to set transports on test case instances."""
 
64
    if getattr(testcase, '_get_remote_is_absolute', None) is None:
 
65
        testcase._get_remote_is_absolute = True
 
66
    if testcase._get_remote_is_absolute:
 
67
        testcase.transport_server = SFTPAbsoluteServer
 
68
    else:
 
69
        testcase.transport_server = SFTPHomeDirServer
 
70
    testcase.transport_readonly_server = HttpServer
 
71
 
 
72
 
 
73
class TestCaseWithSFTPServer(TestCaseWithTransport):
 
74
    """A test case base class that provides a sftp server on localhost."""
 
75
 
 
76
    def setUp(self):
 
77
        super(TestCaseWithSFTPServer, self).setUp()
 
78
        if not paramiko_loaded:
 
79
            raise TestSkipped('you must have paramiko to run this test')
 
80
        set_test_transport_to_sftp(self)
 
81
 
 
82
 
 
83
class SFTPLockTests(TestCaseWithSFTPServer):
 
84
 
 
85
    def test_sftp_locks(self):
 
86
        from bzrlib.errors import LockError
 
87
        t = self.get_transport()
 
88
 
 
89
        l = t.lock_write('bogus')
 
90
        self.failUnlessExists('bogus.write-lock')
 
91
 
 
92
        # Don't wait for the lock, locking an already locked
 
93
        # file should raise an assert
 
94
        self.assertRaises(LockError, t.lock_write, 'bogus')
 
95
 
 
96
        l.unlock()
 
97
        self.failIf(lexists('bogus.write-lock'))
 
98
 
 
99
        open('something.write-lock', 'wb').write('fake lock\n')
 
100
        self.assertRaises(LockError, t.lock_write, 'something')
 
101
        os.remove('something.write-lock')
 
102
 
 
103
        l = t.lock_write('something')
 
104
 
 
105
        l2 = t.lock_write('bogus')
 
106
 
 
107
        l.unlock()
 
108
        l2.unlock()
 
109
 
 
110
 
 
111
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
 
112
    """Test the SFTP transport with homedir based relative paths."""
 
113
 
 
114
    def test__remote_path(self):
 
115
        if sys.platform == 'darwin':
 
116
            # This test is about sftp absolute path handling. There is already
 
117
            # (in this test) a TODO about windows needing an absolute path
 
118
            # without drive letter. To me, using self.test_dir is a trick to
 
119
            # get an absolute path for comparison purposes.  That fails for OSX
 
120
            # because the sftp server doesn't resolve the links (and it doesn't
 
121
            # have to). --vila 20070924
 
122
            self.knownFailure('Mac OSX symlinks /tmp to /private/tmp,'
 
123
                              ' testing against self.test_dir'
 
124
                              ' is not appropriate')
 
125
        t = self.get_transport()
 
126
        # This test require unix-like absolute path
 
127
        test_dir = self.test_dir
 
128
        if sys.platform == 'win32':
 
129
            # using hack suggested by John Meinel.
 
130
            # TODO: write another mock server for this test
 
131
            #       and use absolute path without drive letter
 
132
            test_dir = '/' + test_dir
 
133
        # try what is currently used:
 
134
        # remote path = self._abspath(relpath)
 
135
        self.assertIsSameRealPath(test_dir + '/relative',
 
136
                                  t._remote_path('relative'))
 
137
        # we dont os.path.join because windows gives us the wrong path
 
138
        root_segments = test_dir.split('/')
 
139
        root_parent = '/'.join(root_segments[:-1])
 
140
        # .. should be honoured
 
141
        self.assertIsSameRealPath(root_parent + '/sibling',
 
142
                                  t._remote_path('../sibling'))
 
143
        # /  should be illegal ?
 
144
        ### FIXME decide and then test for all transports. RBC20051208
 
145
 
 
146
 
 
147
class SFTPTransportTestRelativeRoot(TestCaseWithSFTPServer):
 
148
    """Test the SFTP transport with homedir based relative paths."""
 
149
 
 
150
    def setUp(self):
 
151
        # Only SFTPHomeDirServer is tested here
 
152
        self._get_remote_is_absolute = False
 
153
        super(SFTPTransportTestRelativeRoot, self).setUp()
 
154
 
 
155
    def test__remote_path_relative_root(self):
 
156
        # relative paths are preserved
 
157
        t = self.get_transport('')
 
158
        self.assertEqual('/~/', t._path)
 
159
        # the remote path should be relative to home dir
 
160
        # (i.e. not begining with a '/')
 
161
        self.assertEqual('a', t._remote_path('a'))
 
162
 
 
163
 
 
164
class SFTPNonServerTest(TestCase):
 
165
    def setUp(self):
 
166
        TestCase.setUp(self)
 
167
        if not paramiko_loaded:
 
168
            raise TestSkipped('you must have paramiko to run this test')
 
169
 
 
170
    def test_parse_url_with_home_dir(self):
 
171
        s = SFTPTransport('sftp://ro%62ey:h%40t@example.com:2222/~/relative')
 
172
        self.assertEquals(s._host, 'example.com')
 
173
        self.assertEquals(s._port, 2222)
 
174
        self.assertEquals(s._user, 'robey')
 
175
        self.assertEquals(s._password, 'h@t')
 
176
        self.assertEquals(s._path, '/~/relative/')
 
177
 
 
178
    def test_relpath(self):
 
179
        s = SFTPTransport('sftp://user@host.com/abs/path')
 
180
        self.assertRaises(errors.PathNotChild, s.relpath,
 
181
                          'sftp://user@host.com/~/rel/path/sub')
 
182
 
 
183
    def test_get_paramiko_vendor(self):
 
184
        """Test that if no 'ssh' is available we get builtin paramiko"""
 
185
        from bzrlib.transport import ssh
 
186
        # set '.' as the only location in the path, forcing no 'ssh' to exist
 
187
        orig_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
 
188
        orig_path = set_or_unset_env('PATH', '.')
 
189
        try:
 
190
            # No vendor defined yet, query for one
 
191
            ssh._ssh_vendor_manager.clear_cache()
 
192
            vendor = ssh._get_ssh_vendor()
 
193
            self.assertIsInstance(vendor, ssh.ParamikoVendor)
 
194
        finally:
 
195
            set_or_unset_env('PATH', orig_path)
 
196
            ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
 
197
 
 
198
    def test_abspath_root_sibling_server(self):
 
199
        from bzrlib.transport.sftp import SFTPSiblingAbsoluteServer
 
200
        server = SFTPSiblingAbsoluteServer()
 
201
        server.setUp()
 
202
        try:
 
203
            transport = get_transport(server.get_url())
 
204
            self.assertFalse(transport.abspath('/').endswith('/~/'))
 
205
            self.assertTrue(transport.abspath('/').endswith('/'))
 
206
            del transport
 
207
        finally:
 
208
            server.tearDown()
 
209
 
 
210
 
 
211
class SFTPBranchTest(TestCaseWithSFTPServer):
 
212
    """Test some stuff when accessing a bzr Branch over sftp"""
 
213
 
 
214
    def test_lock_file(self):
 
215
        # old format branches use a special lock file on sftp.
 
216
        b = self.make_branch('', format=bzrdir.BzrDirFormat6())
 
217
        b = bzrlib.branch.Branch.open(self.get_url())
 
218
        self.failUnlessExists('.bzr/')
 
219
        self.failUnlessExists('.bzr/branch-format')
 
220
        self.failUnlessExists('.bzr/branch-lock')
 
221
 
 
222
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
 
223
        b.lock_write()
 
224
        self.failUnlessExists('.bzr/branch-lock.write-lock')
 
225
        b.unlock()
 
226
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
 
227
 
 
228
    def test_push_support(self):
 
229
        self.build_tree(['a/', 'a/foo'])
 
230
        t = bzrdir.BzrDir.create_standalone_workingtree('a')
 
231
        b = t.branch
 
232
        t.add('foo')
 
233
        t.commit('foo', rev_id='a1')
 
234
 
 
235
        b2 = bzrdir.BzrDir.create_branch_and_repo(self.get_url('/b'))
 
236
        b2.pull(b)
 
237
 
 
238
        self.assertEquals(b2.revision_history(), ['a1'])
 
239
 
 
240
        open('a/foo', 'wt').write('something new in foo\n')
 
241
        t.commit('new', rev_id='a2')
 
242
        b2.pull(b)
 
243
 
 
244
        self.assertEquals(b2.revision_history(), ['a1', 'a2'])
 
245
 
 
246
 
 
247
class SSHVendorConnection(TestCaseWithSFTPServer):
 
248
    """Test that the ssh vendors can all connect.
 
249
 
 
250
    Verify that a full-handshake (SSH over loopback TCP) sftp connection works.
 
251
 
 
252
    We have 3 sftp implementations in the test suite:
 
253
      'loopback': Doesn't use ssh, just uses a local socket. Most tests are
 
254
                  done this way to save the handshaking time, so it is not
 
255
                  tested again here
 
256
      'none':     This uses paramiko's built-in ssh client and server, and layers
 
257
                  sftp on top of it.
 
258
      None:       If 'ssh' exists on the machine, then it will be spawned as a
 
259
                  child process.
 
260
    """
 
261
    
 
262
    def setUp(self):
 
263
        super(SSHVendorConnection, self).setUp()
 
264
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
 
265
 
 
266
        def create_server():
 
267
            """Just a wrapper so that when created, it will set _vendor"""
 
268
            # SFTPFullAbsoluteServer can handle any vendor,
 
269
            # it just needs to be set between the time it is instantiated
 
270
            # and the time .setUp() is called
 
271
            server = SFTPFullAbsoluteServer()
 
272
            server._vendor = self._test_vendor
 
273
            return server
 
274
        self._test_vendor = 'loopback'
 
275
        self.vfs_transport_server = create_server
 
276
        f = open('a_file', 'wb')
 
277
        try:
 
278
            f.write('foobar\n')
 
279
        finally:
 
280
            f.close()
 
281
 
 
282
    def set_vendor(self, vendor):
 
283
        self._test_vendor = vendor
 
284
 
 
285
    def test_connection_paramiko(self):
 
286
        from bzrlib.transport import ssh
 
287
        self.set_vendor(ssh.ParamikoVendor())
 
288
        t = self.get_transport()
 
289
        self.assertEqual('foobar\n', t.get('a_file').read())
 
290
 
 
291
    def test_connection_vendor(self):
 
292
        raise TestSkipped("We don't test spawning real ssh,"
 
293
                          " because it prompts for a password."
 
294
                          " Enable this test if we figure out"
 
295
                          " how to prevent this.")
 
296
        self.set_vendor(None)
 
297
        t = self.get_transport()
 
298
        self.assertEqual('foobar\n', t.get('a_file').read())
 
299
 
 
300
 
 
301
class SSHVendorBadConnection(TestCaseWithTransport):
 
302
    """Test that the ssh vendors handle bad connection properly
 
303
 
 
304
    We don't subclass TestCaseWithSFTPServer, because we don't actually
 
305
    need an SFTP connection.
 
306
    """
 
307
 
 
308
    def setUp(self):
 
309
        if not paramiko_loaded:
 
310
            raise TestSkipped('you must have paramiko to run this test')
 
311
        super(SSHVendorBadConnection, self).setUp()
 
312
        import bzrlib.transport.ssh
 
313
 
 
314
        # open a random port, so we know nobody else is using it
 
315
        # but don't actually listen on the port.
 
316
        s = socket.socket()
 
317
        s.bind(('localhost', 0))
 
318
        self.bogus_url = 'sftp://%s:%s/' % s.getsockname()
 
319
 
 
320
        orig_vendor = bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor
 
321
        def reset():
 
322
            bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
 
323
            s.close()
 
324
        self.addCleanup(reset)
 
325
 
 
326
    def set_vendor(self, vendor):
 
327
        import bzrlib.transport.ssh
 
328
        bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor = vendor
 
329
 
 
330
    def test_bad_connection_paramiko(self):
 
331
        """Test that a real connection attempt raises the right error"""
 
332
        from bzrlib.transport import ssh
 
333
        self.set_vendor(ssh.ParamikoVendor())
 
334
        t = bzrlib.transport.get_transport(self.bogus_url)
 
335
        self.assertRaises(errors.ConnectionError, t.get, 'foobar')
 
336
 
 
337
    def test_bad_connection_ssh(self):
 
338
        """None => auto-detect vendor"""
 
339
        self.set_vendor(None)
 
340
        # This is how I would normally test the connection code
 
341
        # it makes it very clear what we are testing.
 
342
        # However, 'ssh' will create stipple on the output, so instead
 
343
        # I'm using run_bzr_subprocess, and parsing the output
 
344
        # try:
 
345
        #     t = bzrlib.transport.get_transport(self.bogus_url)
 
346
        # except errors.ConnectionError:
 
347
        #     # Correct error
 
348
        #     pass
 
349
        # except errors.NameError, e:
 
350
        #     if 'SSHException' in str(e):
 
351
        #         raise TestSkipped('Known NameError bug in paramiko 1.6.1')
 
352
        #     raise
 
353
        # else:
 
354
        #     self.fail('Excepted ConnectionError to be raised')
 
355
 
 
356
        out, err = self.run_bzr_subprocess(['log', self.bogus_url], retcode=3)
 
357
        self.assertEqual('', out)
 
358
        if "NameError: global name 'SSHException'" in err:
 
359
            # We aren't fixing this bug, because it is a bug in
 
360
            # paramiko, but we know about it, so we don't have to
 
361
            # fail the test
 
362
            raise TestSkipped('Known NameError bug with paramiko-1.6.1')
 
363
        self.assertContainsRe(err, r'bzr: ERROR: Unable to connect to SSH host'
 
364
                                   r' 127\.0\.0\.1:\d+; ')
 
365
 
 
366
 
 
367
class SFTPLatencyKnob(TestCaseWithSFTPServer):
 
368
    """Test that the testing SFTPServer's latency knob works."""
 
369
 
 
370
    def test_latency_knob_slows_transport(self):
 
371
        # change the latency knob to 500ms. We take about 40ms for a 
 
372
        # loopback connection ordinarily.
 
373
        start_time = time.time()
 
374
        self.get_server().add_latency = 0.5
 
375
        transport = self.get_transport()
 
376
        transport.has('not me') # Force connection by issuing a request
 
377
        with_latency_knob_time = time.time() - start_time
 
378
        self.assertTrue(with_latency_knob_time > 0.4)
 
379
 
 
380
    def test_default(self):
 
381
        # This test is potentially brittle: under extremely high machine load
 
382
        # it could fail, but that is quite unlikely
 
383
        raise TestSkipped('Timing-sensitive test')
 
384
        start_time = time.time()
 
385
        transport = self.get_transport()
 
386
        transport.has('not me') # Force connection by issuing a request
 
387
        regular_time = time.time() - start_time
 
388
        self.assertTrue(regular_time < 0.5)
 
389
 
 
390
 
 
391
class FakeSocket(object):
 
392
    """Fake socket object used to test the SocketDelay wrapper without
 
393
    using a real socket.
 
394
    """
 
395
 
 
396
    def __init__(self):
 
397
        self._data = ""
 
398
 
 
399
    def send(self, data, flags=0):
 
400
        self._data += data
 
401
        return len(data)
 
402
 
 
403
    def sendall(self, data, flags=0):
 
404
        self._data += data
 
405
        return len(data)
 
406
 
 
407
    def recv(self, size, flags=0):
 
408
        if size < len(self._data):
 
409
            result = self._data[:size]
 
410
            self._data = self._data[size:]
 
411
            return result
 
412
        else:
 
413
            result = self._data
 
414
            self._data = ""
 
415
            return result
 
416
 
 
417
 
 
418
class TestSocketDelay(TestCase):
 
419
 
 
420
    def setUp(self):
 
421
        TestCase.setUp(self)
 
422
        if not paramiko_loaded:
 
423
            raise TestSkipped('you must have paramiko to run this test')
 
424
 
 
425
    def test_delay(self):
 
426
        from bzrlib.transport.sftp import SocketDelay
 
427
        sending = FakeSocket()
 
428
        receiving = SocketDelay(sending, 0.1, bandwidth=1000000,
 
429
                                really_sleep=False)
 
430
        # check that simulated time is charged only per round-trip:
 
431
        t1 = SocketDelay.simulated_time
 
432
        receiving.send("connect1")
 
433
        self.assertEqual(sending.recv(1024), "connect1")
 
434
        t2 = SocketDelay.simulated_time
 
435
        self.assertAlmostEqual(t2 - t1, 0.1)
 
436
        receiving.send("connect2")
 
437
        self.assertEqual(sending.recv(1024), "connect2")
 
438
        sending.send("hello")
 
439
        self.assertEqual(receiving.recv(1024), "hello")
 
440
        t3 = SocketDelay.simulated_time
 
441
        self.assertAlmostEqual(t3 - t2, 0.1)
 
442
        sending.send("hello")
 
443
        self.assertEqual(receiving.recv(1024), "hello")
 
444
        sending.send("hello")
 
445
        self.assertEqual(receiving.recv(1024), "hello")
 
446
        sending.send("hello")
 
447
        self.assertEqual(receiving.recv(1024), "hello")
 
448
        t4 = SocketDelay.simulated_time
 
449
        self.assertAlmostEqual(t4, t3)
 
450
 
 
451
    def test_bandwidth(self):
 
452
        from bzrlib.transport.sftp import SocketDelay
 
453
        sending = FakeSocket()
 
454
        receiving = SocketDelay(sending, 0, bandwidth=8.0/(1024*1024),
 
455
                                really_sleep=False)
 
456
        # check that simulated time is charged only per round-trip:
 
457
        t1 = SocketDelay.simulated_time
 
458
        receiving.send("connect")
 
459
        self.assertEqual(sending.recv(1024), "connect")
 
460
        sending.send("a" * 100)
 
461
        self.assertEqual(receiving.recv(1024), "a" * 100)
 
462
        t2 = SocketDelay.simulated_time
 
463
        self.assertAlmostEqual(t2 - t1, 100 + 7)
 
464
 
 
465
 
 
466
class Test_SFTPReadvHelper(tests.TestCase):
 
467
 
 
468
    def checkGetRequests(self, expected_requests, offsets):
 
469
        if not paramiko_loaded:
 
470
            raise TestSkipped('you must have paramiko to run this test')
 
471
        helper = _mod_sftp._SFTPReadvHelper(offsets, 'artificial_test')
 
472
        self.assertEqual(expected_requests, helper._get_requests())
 
473
 
 
474
    def test__get_requests(self):
 
475
        # Small single requests become a single readv request
 
476
        self.checkGetRequests([(0, 100)],
 
477
                              [(0, 20), (30, 50), (20, 10), (80, 20)])
 
478
        # Non-contiguous ranges are given as multiple requests
 
479
        self.checkGetRequests([(0, 20), (30, 50)],
 
480
                              [(10, 10), (30, 20), (0, 10), (50, 30)])
 
481
        # Ranges larger than _max_request_size (32kB) are broken up into
 
482
        # multiple requests, even if it actually spans multiple logical
 
483
        # requests
 
484
        self.checkGetRequests([(0, 32768), (32768, 32768), (65536, 464)],
 
485
                              [(0, 40000), (40000, 100), (40100, 1900),
 
486
                               (42000, 24000)])
 
487
 
 
488
 
 
489
class TestUsesAuthConfig(TestCaseWithSFTPServer):
 
490
    """Test that AuthenticationConfig can supply default usernames."""
 
491
 
 
492
    def get_transport_for_connection(self, set_config):
 
493
        port = self.get_server()._listener.port
 
494
        if set_config:
 
495
            conf = config.AuthenticationConfig()
 
496
            conf._get_config().update(
 
497
                {'sftptest': {'scheme': 'ssh', 'port': port, 'user': 'bar'}})
 
498
            conf._save()
 
499
        t = get_transport('sftp://localhost:%d' % port)
 
500
        # force a connection to be performed.
 
501
        t.has('foo')
 
502
        return t
 
503
 
 
504
    def test_sftp_uses_config(self):
 
505
        t = self.get_transport_for_connection(set_config=True)
 
506
        self.assertEqual('bar', t._get_credentials()[0])
 
507
 
 
508
    def test_sftp_is_none_if_no_config(self):
 
509
        t = self.get_transport_for_connection(set_config=False)
 
510
        self.assertIs(None, t._get_credentials()[0])