1
# Copyright (C) 2005-2012, 2016 Robey Pointer <robey@lag.net>
2
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
4
3
# This program is free software; you can redistribute it and/or modify
5
4
# it under the terms of the GNU General Public License as published by
6
5
# the Free Software Foundation; either version 2 of the License, or
7
6
# (at your option) any later version.
9
8
# This program is distributed in the hope that it will be useful,
10
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
11
# GNU General Public License for more details.
14
13
# You should have received a copy of the GNU General Public License
15
14
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
28
transport as _mod_transport,
31
from breezy.osutils import (
34
from breezy.tests import (
36
TestCaseWithTransport,
40
from breezy.tests.http_server import HttpServer
42
if features.paramiko.available():
43
from breezy.transport import sftp as _mod_sftp
44
from breezy.tests import stub_sftp
47
def set_test_transport_to_sftp(testcase):
48
"""A helper to set transports on test case instances."""
49
# A test case that has not set _get_remote_is_absolute (missing or None)
# is treated as wanting absolute remote paths.
if getattr(testcase, '_get_remote_is_absolute', None) is None:
50
testcase._get_remote_is_absolute = True
51
if testcase._get_remote_is_absolute:
52
testcase.transport_server = stub_sftp.SFTPAbsoluteServer
# NOTE(review): original line 53 is absent from this view -- presumably an
# `else:` selecting the home-dir server for the non-absolute case; confirm.
54
testcase.transport_server = stub_sftp.SFTPHomeDirServer
55
# The read-only server is always the HTTP server.
testcase.transport_readonly_server = HttpServer
58
class TestCaseWithSFTPServer(TestCaseWithTransport):
59
"""A test case base class that provides a sftp server on localhost."""
62
super(TestCaseWithSFTPServer, self).setUp()
63
self.requireFeature(features.paramiko)
64
set_test_transport_to_sftp(self)
67
class SFTPLockTests(TestCaseWithSFTPServer):
69
def test_sftp_locks(self):
70
from breezy.errors import LockError
71
t = self.get_transport()
73
l = t.lock_write('bogus')
74
self.assertPathExists('bogus.write-lock')
76
# Don't wait for the lock, locking an already locked
77
# file should raise an assert
78
self.assertRaises(LockError, t.lock_write, 'bogus')
81
self.assertFalse(lexists('bogus.write-lock'))
83
with open('something.write-lock', 'wb') as f:
84
f.write(b'fake lock\n')
85
self.assertRaises(LockError, t.lock_write, 'something')
86
os.remove('something.write-lock')
88
l = t.lock_write('something')
90
l2 = t.lock_write('bogus')
96
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
    """Test the SFTP transport with homedir based relative paths."""

    def test__remote_path(self):
        """Check _remote_path() for a child path and a '..' sibling path."""
        if sys.platform == 'darwin':
            # This test is about sftp absolute path handling, and
            # self.test_dir is only used as a convenient absolute path to
            # compare against. That comparison fails on OSX because the
            # sftp server doesn't resolve the links (and it doesn't have
            # to). --vila 20070924
            self.knownFailure('Mac OSX symlinks /tmp to /private/tmp,'
                              ' testing against self.test_dir'
                              ' is not appropriate')
        transport = self.get_transport()
        # This test requires a unix-like absolute path.
        base_dir = self.test_dir
        if sys.platform == 'win32':
            # Using the hack suggested by John Meinel.
            # TODO: write another mock server for this test
            #       and use an absolute path without a drive letter.
            base_dir = '/' + base_dir
        # Try what is currently used:
        # remote path = self._abspath(relpath)
        expected_child = base_dir + '/relative'
        self.assertIsSameRealPath(expected_child,
                                  transport._remote_path('relative'))
        # We don't os.path.join because windows gives us the wrong path.
        parent_dir = '/'.join(base_dir.split('/')[:-1])
        # '..' should be honoured.
        expected_sibling = parent_dir + '/sibling'
        self.assertIsSameRealPath(expected_sibling,
                                  transport._remote_path('../sibling'))
        # / should be illegal ?
        # FIXME decide and then test for all transports. RBC20051208
132
class SFTPTransportTestRelativeRoot(TestCaseWithSFTPServer):
133
"""Test the SFTP transport with homedir based relative paths."""
136
# Only SFTPHomeDirServer is tested here
137
self._get_remote_is_absolute = False
138
super(SFTPTransportTestRelativeRoot, self).setUp()
140
def test__remote_path_relative_root(self):
    """A transport rooted at '' keeps paths relative to the home dir."""
    home_transport = self.get_transport('')
    # Relative paths are preserved in the parsed URL.
    self.assertEqual('/~/', home_transport._parsed_url.path)
    # The remote path should be relative to the home dir
    # (i.e. not beginning with a '/').
    remote = home_transport._remote_path('a')
    self.assertEqual('a', remote)
149
class SFTPNonServerTest(TestCase):
152
super(SFTPNonServerTest, self).setUp()
153
self.requireFeature(features.paramiko)
155
def test_parse_url_with_home_dir(self):
    """URL components are decoded and a '/~/' path is kept as given."""
    url = 'sftp://ro%62ey:h%40t@example.com:2222/~/relative'
    transport = _mod_sftp.SFTPTransport(url)
    # (attribute of _parsed_url, expected value), checked in this order.
    expectations = (
        ('host', 'example.com'),
        ('port', 2222),
        ('user', 'robey'),
        ('password', 'h@t'),
        ('path', '/~/relative/'),
    )
    for attribute, wanted in expectations:
        self.assertEqual(getattr(transport._parsed_url, attribute), wanted)
164
def test_relpath(self):
    """relpath() rejects a home-dir URL that is not under the base."""
    transport = _mod_sftp.SFTPTransport('sftp://user@host.com/abs/path')
    self.assertRaises(
        errors.PathNotChild,
        transport.relpath,
        'sftp://user@host.com/~/rel/path/sub',
    )
169
def test_get_paramiko_vendor(self):
    """Test that if no 'ssh' is available we get builtin paramiko"""
    from breezy.transport import ssh
    # Register the vendor manager attribute for restoration after the test.
    self.overrideAttr(ssh, '_ssh_vendor_manager')
    # Make '.' the only location in the path, forcing no 'ssh' to exist.
    self.overrideEnv('PATH', '.')
    ssh._ssh_vendor_manager.clear_cache()
    detected = ssh._get_ssh_vendor()
    self.assertIsInstance(detected, ssh.ParamikoVendor)
179
def test_abspath_root_sibling_server(self):
    """abspath('/') on the sibling server ends in '/', not '/~/'."""
    sibling_server = stub_sftp.SFTPSiblingAbsoluteServer()
    sibling_server.start_server()
    self.addCleanup(sibling_server.stop_server)

    t = _mod_transport.get_transport_from_url(sibling_server.get_url())
    not_home_rooted = not t.abspath('/').endswith('/~/')
    self.assertFalse(not not_home_rooted)
    ends_with_slash = t.abspath('/').endswith('/')
    self.assertTrue(ends_with_slash)
190
class SFTPBranchTest(TestCaseWithSFTPServer):
191
"""Test some stuff when accessing a bzr Branch over sftp"""
193
def test_push_support(self):
194
self.build_tree(['a/', 'a/foo'])
195
t = controldir.ControlDir.create_standalone_workingtree('a')
198
t.commit('foo', rev_id=b'a1')
200
b2 = controldir.ControlDir.create_branch_and_repo(self.get_url('/b'))
203
self.assertEqual(b2.last_revision(), b'a1')
205
with open('a/foo', 'wt') as f:
206
f.write('something new in foo\n')
207
t.commit('new', rev_id=b'a2')
210
self.assertEqual(b2.last_revision(), b'a2')
213
class SSHVendorConnection(TestCaseWithSFTPServer):
214
"""Test that the ssh vendors can all connect.
216
Verify that a full-handshake (SSH over loopback TCP) sftp connection works.
218
We have 3 sftp implementations in the test suite:
219
'loopback': Doesn't use ssh, just uses a local socket. Most tests are
220
done this way to save the handshaking time, so it is not
222
'none': This uses paramiko's built-in ssh client and server, and
223
layers sftp on top of it.
224
None: If 'ssh' exists on the machine, then it will be spawned as a
229
super(SSHVendorConnection, self).setUp()
232
"""Just a wrapper so that when created, it will set _vendor"""
233
# SFTPFullAbsoluteServer can handle any vendor,
234
# it just needs to be set between the time it is instantiated
235
# and the time .setUp() is called
236
server = stub_sftp.SFTPFullAbsoluteServer()
237
server._vendor = self._test_vendor
239
self._test_vendor = 'loopback'
240
self.vfs_transport_server = create_server
241
f = open('a_file', 'wb')
247
def set_vendor(self, vendor):
# Store the vendor on the test case as _test_vendor; the server wrapper
# above copies self._test_vendor onto the server it creates.
248
self._test_vendor = vendor
250
def test_connection_paramiko(self):
    """Reading 'a_file' works when the vendor is a ParamikoVendor."""
    from breezy.transport import ssh
    paramiko_vendor = ssh.ParamikoVendor()
    self.set_vendor(paramiko_vendor)
    transport = self.get_transport()
    contents = transport.get('a_file').read()
    self.assertEqual(b'foobar\n', contents)
256
def test_connection_vendor(self):
    """Currently always skipped; see the skip message for the reason."""
    raise TestSkipped("We don't test spawning real ssh,"
                      " because it prompts for a password."
                      " Enable this test if we figure out"
                      " how to prevent this.")
    # The statements below are kept for when the skip above is removed.
    self.set_vendor(None)
    transport = self.get_transport()
    contents = transport.get('a_file').read()
    self.assertEqual(b'foobar\n', contents)
266
class SSHVendorBadConnection(TestCaseWithTransport):
267
"""Test that the ssh vendors handle bad connection properly
269
We don't subclass TestCaseWithSFTPServer, because we don't actually
270
need an SFTP connection.
274
self.requireFeature(features.paramiko)
275
super(SSHVendorBadConnection, self).setUp()
277
# open a random port, so we know nobody else is using it
278
# but don't actually listen on the port.
280
s.bind(('localhost', 0))
281
self.addCleanup(s.close)
282
self.bogus_url = 'sftp://%s:%s/' % s.getsockname()
284
def set_vendor(self, vendor, subprocess_stderr=None):
285
from breezy.transport import ssh
286
self.overrideAttr(ssh._ssh_vendor_manager,
287
'_cached_ssh_vendor', vendor)
288
if subprocess_stderr is not None:
289
self.overrideAttr(ssh.SubprocessVendor, "_stderr_target",
292
def test_bad_connection_paramiko(self):
    """Test that a real connection attempt raises the right error"""
    from breezy.transport import ssh
    paramiko_vendor = ssh.ParamikoVendor()
    self.set_vendor(paramiko_vendor)
    # self.bogus_url is the URL prepared by this class's setUp.
    transport = _mod_transport.get_transport_from_url(self.bogus_url)
    self.assertRaises(errors.ConnectionError, transport.get, 'foobar')
299
def test_bad_connection_ssh(self):
300
"""None => auto-detect vendor"""
301
f = open(os.devnull, "wb")
302
self.addCleanup(f.close)
303
self.set_vendor(None, f)
304
t = _mod_transport.get_transport_from_url(self.bogus_url)
306
self.assertRaises(errors.ConnectionError, t.get, 'foobar')
307
except NameError as e:
308
if "global name 'SSHException'" in str(e):
309
self.knownFailure('Known NameError bug in paramiko 1.6.1')
313
class SFTPLatencyKnob(TestCaseWithSFTPServer):
    """Test that the testing SFTPServer's latency knob works."""

    def test_latency_knob_slows_transport(self):
        """With add_latency set to 0.5, the first request takes over 0.4s."""
        # change the latency knob to 500ms. We take about 40ms for a
        # loopback connection ordinarily.
        start_time = time.time()
        self.get_server().add_latency = 0.5
        transport = self.get_transport()
        transport.has('not me')  # Force connection by issuing a request
        with_latency_knob_time = time.time() - start_time
        # assertGreater (rather than assertTrue on a comparison) reports the
        # measured duration when the check fails.
        self.assertGreater(with_latency_knob_time, 0.4)

    def test_default(self):
        """Without the knob a first request takes under 0.5s (skipped)."""
        # This test is potentially brittle: under extremely high machine load
        # it could fail, but that is quite unlikely
        raise TestSkipped('Timing-sensitive test')
        # The statements below are kept for when the skip above is removed.
        start_time = time.time()
        transport = self.get_transport()
        transport.has('not me')  # Force connection by issuing a request
        regular_time = time.time() - start_time
        # assertLess reports the measured duration when the check fails.
        self.assertLess(regular_time, 0.5)
337
class FakeSocket(object):
338
"""Fake socket object used to test the SocketDelay wrapper without
345
def send(self, data, flags=0):
349
def sendall(self, data, flags=0):
353
def recv(self, size, flags=0):
354
if size < len(self._data):
355
result = self._data[:size]
356
self._data = self._data[size:]
364
class TestSocketDelay(TestCase):
367
super(TestSocketDelay, self).setUp()
368
self.requireFeature(features.paramiko)
370
def test_delay(self):
371
sending = FakeSocket()
372
receiving = stub_sftp.SocketDelay(sending, 0.1, bandwidth=1000000,
374
# check that simulated time is charged only per round-trip:
375
t1 = stub_sftp.SocketDelay.simulated_time
376
receiving.send("connect1")
377
self.assertEqual(sending.recv(1024), "connect1")
378
t2 = stub_sftp.SocketDelay.simulated_time
379
self.assertAlmostEqual(t2 - t1, 0.1)
380
receiving.send("connect2")
381
self.assertEqual(sending.recv(1024), "connect2")
382
sending.send("hello")
383
self.assertEqual(receiving.recv(1024), "hello")
384
t3 = stub_sftp.SocketDelay.simulated_time
385
self.assertAlmostEqual(t3 - t2, 0.1)
386
sending.send("hello")
387
self.assertEqual(receiving.recv(1024), "hello")
388
sending.send("hello")
389
self.assertEqual(receiving.recv(1024), "hello")
390
sending.send("hello")
391
self.assertEqual(receiving.recv(1024), "hello")
392
t4 = stub_sftp.SocketDelay.simulated_time
393
self.assertAlmostEqual(t4, t3)
395
def test_bandwidth(self):
    """Simulated time grows by 100 + 7 for a 7 and a 100 character send."""
    far_end = FakeSocket()
    near_end = stub_sftp.SocketDelay(
        far_end, 0, bandwidth=8.0 / (1024 * 1024), really_sleep=False)
    # Latency is 0 here, so any simulated time comes from the bandwidth
    # limit.
    # NOTE(review): 100 + 7 presumably means one simulated second per
    # character sent at this bandwidth -- confirm against SocketDelay.
    before = stub_sftp.SocketDelay.simulated_time
    near_end.send("connect")
    self.assertEqual(far_end.recv(1024), "connect")
    far_end.send("a" * 100)
    self.assertEqual(near_end.recv(1024), "a" * 100)
    after = stub_sftp.SocketDelay.simulated_time
    elapsed = after - before
    self.assertAlmostEqual(elapsed, 100 + 7)
409
class ReadvFile(object):
410
"""An object that acts like Paramiko's SFTPFile when readv() is used"""
412
def __init__(self, data):
415
def readv(self, requests):
    """Lazily yield self._data[start:start + length] for each request.

    :param requests: An iterable of (start, length) pairs.
    """
    for offset, size in requests:
        window = slice(offset, offset + size)
        yield self._data[window]
423
def _null_report_activity(*a, **k):
427
class Test_SFTPReadvHelper(tests.TestCase):
429
def checkGetRequests(self, expected_requests, offsets):
    """Assert that a readv helper built from offsets plans these requests.

    :param expected_requests: The value _get_requests() should return.
    :param offsets: The offsets handed to _SFTPReadvHelper.
    """
    self.requireFeature(features.paramiko)
    readv_helper = _mod_sftp._SFTPReadvHelper(
        offsets, 'artificial_test', _null_report_activity)
    actual_requests = readv_helper._get_requests()
    self.assertEqual(expected_requests, actual_requests)
435
def test__get_requests(self):
436
# Small single requests become a single readv request
437
self.checkGetRequests([(0, 100)],
438
[(0, 20), (30, 50), (20, 10), (80, 20)])
439
# Non-contiguous ranges are given as multiple requests
440
self.checkGetRequests([(0, 20), (30, 50)],
441
[(10, 10), (30, 20), (0, 10), (50, 30)])
442
# Ranges larger than _max_request_size (32kB) are broken up into
443
# multiple requests, even if it actually spans multiple logical
445
self.checkGetRequests([(0, 32768), (32768, 32768), (65536, 464)],
446
[(0, 40000), (40000, 100), (40100, 1900),
449
def checkRequestAndYield(self, expected, data, offsets):
    """Assert what request_and_yield_offsets() yields for data and offsets.

    :param expected: The list the helper should yield, in order.
    :param data: The content served through a ReadvFile.
    :param offsets: The offsets handed to _SFTPReadvHelper.
    """
    self.requireFeature(features.paramiko)
    readv_helper = _mod_sftp._SFTPReadvHelper(
        offsets, 'artificial_test', _null_report_activity)
    fake_file = ReadvFile(data)
    yielded = list(readv_helper.request_and_yield_offsets(fake_file))
    self.assertEqual(expected, yielded)
457
def test_request_and_yield_offsets(self):
    """Results come back per requested offset, in the order requested."""
    alphabet = b'abcdefghijklmnopqrstuvwxyz'
    # Separate ranges.
    separate_offsets = [(0, 1), (5, 1), (10, 3)]
    separate_expected = [(0, b'a'), (5, b'f'), (10, b'klm')]
    self.checkRequestAndYield(separate_expected, alphabet, separate_offsets)
    # Should combine requests, and split them again
    adjacent_offsets = [(0, 1), (1, 1), (10, 3)]
    adjacent_expected = [(0, b'a'), (1, b'b'), (10, b'klm')]
    self.checkRequestAndYield(adjacent_expected, alphabet, adjacent_offsets)
    # Out of order requests. The requests should get combined, but then be
    # yielded out-of-order. We also need one that is at the end of a
    # previous range. See bug #293746
    shuffled_offsets = [(0, 1), (10, 1), (4, 3), (1, 3)]
    shuffled_expected = [(0, b'a'), (10, b'k'), (4, b'efg'), (1, b'bcd')]
    self.checkRequestAndYield(shuffled_expected, alphabet, shuffled_offsets)
472
class TestUsesAuthConfig(TestCaseWithSFTPServer):
473
"""Test that AuthenticationConfig can supply default usernames."""
475
def get_transport_for_connection(self, set_config):
476
port = self.get_server().port
478
conf = config.AuthenticationConfig()
479
conf._get_config().update(
480
{'sftptest': {'scheme': 'ssh', 'port': port, 'user': 'bar'}})
482
t = _mod_transport.get_transport_from_url(
483
'sftp://localhost:%d' % port)
484
# force a connection to be performed.
488
def test_sftp_uses_config(self):
    """With set_config=True the first credential is 'bar'."""
    transport = self.get_transport_for_connection(set_config=True)
    first_credential = transport._get_credentials()[0]
    self.assertEqual('bar', first_credential)
492
def test_sftp_is_none_if_no_config(self):
    """With set_config=False the first credential is None."""
    transport = self.get_transport_for_connection(set_config=False)
    first_credential = transport._get_credentials()[0]
    self.assertIs(None, first_credential)
496
def test_sftp_doesnt_prompt_username(self):
    """No prompt is written and stdin stays unread when nothing is set."""
    # Install a UI whose stdin holds answers a prompt could consume.
    ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n')
    transport = self.get_transport_for_connection(set_config=False)
    first_credential = transport._get_credentials()[0]
    self.assertIs(None, first_credential)
    # No prompts should've been printed, stdin shouldn't have been read
    printed = ui.ui_factory.stdout.getvalue()
    self.assertEqual("", printed)
    stdin_position = ui.ui_factory.stdin.tell()
    self.assertEqual(0, stdin_position)
22
from bzrlib.selftest import TestCaseInTempDir
23
from bzrlib.selftest.testtransport import TestTransportMixIn
27
from stub_sftp import StubServer, StubSFTPServer
28
paramiko_loaded = True
30
paramiko_loaded = False
34
-----BEGIN RSA PRIVATE KEY-----
35
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
36
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
37
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
38
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
39
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
40
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
41
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
42
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
43
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
44
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
45
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
46
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
47
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
48
-----END RSA PRIVATE KEY-----
52
class SingleListener (threading.Thread):
53
def __init__(self, callback):
54
threading.Thread.__init__(self)
55
self._callback = callback
56
self._socket = socket.socket()
57
self._socket.listen(1)
58
self.port = self._socket.getsockname()[1]
59
self.stop_event = threading.Event()
62
s, _ = self._socket.accept()
63
# now close the listen socket
65
self._callback(s, self.stop_event)
71
class TestCaseWithSFTPServer (TestCaseInTempDir):
73
Execute a test case with a stub SFTP server, serving files from the local
74
filesystem over the loopback network.
77
def _run_server(self, s, stop_event):
78
ssh_server = paramiko.Transport(s)
79
key_file = os.path.join(self._root, 'test_rsa.key')
80
file(key_file, 'w').write(STUB_SERVER_KEY)
81
host_key = paramiko.RSAKey.from_private_key_file(key_file)
82
ssh_server.add_server_key(host_key)
84
ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
85
event = threading.Event()
86
ssh_server.start_server(event, server)
91
TestCaseInTempDir.setUp(self)
92
self._root = self.test_dir
94
self._listener = SingleListener(self._run_server)
95
self._listener.setDaemon(True)
96
self._listener.start()
97
self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
100
self._listener.stop()
101
TestCaseInTempDir.tearDown(self)
104
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
107
def get_transport(self):
108
from bzrlib.transport.sftp import SFTPTransport
110
return SFTPTransport(url)
112
# Remove the SFTP transport test class from the module namespace when the
# paramiko_loaded flag is false.
# NOTE(review): presumably so the tests are not picked up when paramiko
# could not be imported -- confirm against the import guard above.
if not paramiko_loaded:
113
del SFTPTransportTest