# Copyright (C) 2005 Robey Pointer <robey@lag.net>
# Copyright (C) 2005, 2006 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Implementation of Transport over SFTP, using paramiko."""

# TODO: Remove the transport-based lock_read and lock_write methods.  They'll
# then raise TransportNotPossible, which will break remote access to any
# formats which rely on OS-level locks.  That should be fine as those formats
# are pretty old, but these combinations may have to be removed from the test
# suite.
39
from bzrlib.errors import (FileExists,
40
NoSuchFile, PathNotChild,
47
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
48
from bzrlib.trace import mutter, warning
49
from bzrlib.transport import (
50
register_urlparse_netloc_protocol,
56
import bzrlib.urlutils as urlutils
60
except ImportError, e:
61
raise ParamikoNotPresent(e)
63
from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
64
SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
66
from paramiko.sftp_attr import SFTPAttributes
67
from paramiko.sftp_file import SFTPFile
70
register_urlparse_netloc_protocol('sftp')
73
# This is a weakref dictionary, so that we can reuse connections
74
# that are still active. Long term, it might be nice to have some
75
# sort of expiration policy, such as disconnect if inactive for
76
# X seconds. But that requires a lot more fanciness.
77
_connected_hosts = weakref.WeakValueDictionary()
80
_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
81
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
82
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))
85
def clear_connection_cache():
    """Remove all hosts from the SFTP connection cache.

    Primarily useful for test cases wanting to force garbage collection.
    """
    # _connected_hosts is a module-level WeakValueDictionary of live
    # SFTP connections keyed by (host, port, username).
    _connected_hosts.clear()
93
class SFTPLock(object):
    """This fakes a lock in a remote location.

    A present lock is indicated just by the existence of a file.  This
    doesn't work well on all transports and they are only used in
    deprecated storage formats.
    """

    __slots__ = ['path', 'lock_path', 'lock_file', 'transport']

    def __init__(self, path, transport):
        """Take the lock by exclusively creating ``path + '.write-lock'``.

        :param path: transport-relative path being locked
        :param transport: the SFTPTransport to lock on
        :raises LockError: if the lock file already exists
        """
        assert isinstance(transport, SFTPTransport)

        self.lock_file = None
        self.path = path
        self.lock_path = path + '.write-lock'
        self.transport = transport
        try:
            # RBC 20060103 FIXME should we be using private methods here ?
            abspath = transport._remote_path(self.lock_path)
            self.lock_file = transport._sftp_open_exclusive(abspath)
        except FileExists:
            raise LockError('File %r already locked' % (self.path,))

    def __del__(self):
        """Should this warn, or actually try to cleanup?"""
        if self.lock_file:
            warning("SFTPLock %r not explicitly unlocked" % (self.path,))
            self.unlock()

    def unlock(self):
        """Release the lock by closing and deleting the lock file.

        Safe to call more than once; subsequent calls are no-ops.
        """
        if not self.lock_file:
            return
        self.lock_file.close()
        self.lock_file = None
        try:
            self.transport.delete(self.lock_path)
        except (NoSuchFile,):
            # What specific errors should we catch here?
            pass
135
class SFTPTransport(Transport):
136
"""Transport implementation for SFTP access."""
138
_do_prefetch = _default_do_prefetch
139
# TODO: jam 20060717 Conceivably these could be configurable, either
140
# by auto-tuning at run-time, or by a configuration (per host??)
141
# but the performance curve is pretty flat, so just going with
142
# reasonable defaults.
143
_max_readv_combine = 200
144
# Having to round trip to the server means waiting for a response,
145
# so it is better to download extra bytes.
146
# 8KiB had good performance for both local and remote network operations
147
_bytes_to_read_before_seek = 8192
149
# The sftp spec says that implementations SHOULD allow reads
150
# to be at least 32K. paramiko.readv() does an async request
151
# for the chunks. So we need to keep it within a single request
152
# size for paramiko <= 1.6.1. paramiko 1.6.2 will probably chop
153
# up the request itself, rather than us having to worry about it
154
_max_request_size = 32768
156
def __init__(self, base, clone_from=None):
157
assert base.startswith('sftp://')
158
self._parse_url(base)
159
base = self._unparse_url()
162
super(SFTPTransport, self).__init__(base)
163
if clone_from is None:
166
# use the same ssh connection, etc
167
self._sftp = clone_from._sftp
168
# super saves 'self.base'
170
def should_cache(self):
172
Return True if the data pulled across should be cached locally.
176
def clone(self, offset=None):
178
Return a new SFTPTransport with root at self.base + offset.
179
We share the same SFTP session between such transports, because it's
180
fairly expensive to set them up.
183
return SFTPTransport(self.base, self)
185
return SFTPTransport(self.abspath(offset), self)
187
def abspath(self, relpath):
189
Return the full url to the given relative path.
191
@param relpath: the relative path or path components
192
@type relpath: str or list
194
return self._unparse_url(self._remote_path(relpath))
196
def _remote_path(self, relpath):
197
"""Return the path to be passed along the sftp protocol for relpath.
199
relpath is a urlencoded string.
201
# FIXME: share the common code across transports
202
assert isinstance(relpath, basestring)
203
relpath = urlutils.unescape(relpath).split('/')
204
basepath = self._path.split('/')
205
if len(basepath) > 0 and basepath[-1] == '':
206
basepath = basepath[:-1]
210
if len(basepath) == 0:
211
# In most filesystems, a request for the parent
212
# of root, just returns root.
220
path = '/'.join(basepath)
221
# mutter('relpath => remotepath %s => %s', relpath, path)
224
def relpath(self, abspath):
    """Return the path relative to this transport's base for abspath.

    :param abspath: an sftp URL within this transport
    :raises PathNotChild: if abspath is not under self.base (the error
        message lists which of username/host/port/path mismatched)
    """
    username, password, host, port, path = self._split_url(abspath)
    error = []
    if (username != self._username):
        error.append('username mismatch')
    if (host != self._host):
        error.append('host mismatch')
    if (port != self._port):
        error.append('port mismatch')
    if (not path.startswith(self._path)):
        error.append('path mismatch')
    if error:
        extra = ': ' + ', '.join(error)
        raise PathNotChild(abspath, self.base, extra=extra)
    # Strip the base path prefix and any surrounding slashes.
    pl = len(self._path)
    return path[pl:].strip('/')
241
def has(self, relpath):
    """Does the target location exist?

    :return: True if a stat() of the remote path succeeds, else False.
    """
    try:
        self._sftp.stat(self._remote_path(relpath))
        return True
    except IOError:
        # stat raises IOError for a missing path.
        return False
251
def get(self, relpath):
253
Get the file at the given relative path.
255
:param relpath: The relative path to the file
258
path = self._remote_path(relpath)
259
f = self._sftp.file(path, mode='rb')
260
if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
263
except (IOError, paramiko.SSHException), e:
264
self._translate_io_exception(e, path, ': error retrieving')
266
def readv(self, relpath, offsets):
267
"""See Transport.readv()"""
268
# We overload the default readv() because we want to use a file
269
# that does not have prefetch enabled.
270
# Also, if we have a new paramiko, it implements an async readv()
275
path = self._remote_path(relpath)
276
fp = self._sftp.file(path, mode='rb')
277
readv = getattr(fp, 'readv', None)
279
return self._sftp_readv(fp, offsets)
280
mutter('seek and read %s offsets', len(offsets))
281
return self._seek_and_read(fp, offsets)
282
except (IOError, paramiko.SSHException), e:
283
self._translate_io_exception(e, path, ': error retrieving')
285
def _sftp_readv(self, fp, offsets):
286
"""Use the readv() member of fp to do async readv.
288
And then read them using paramiko.readv(). paramiko.readv()
289
does not support ranges > 64K, so it caps the request size, and
290
just reads until it gets all the stuff it wants
292
offsets = list(offsets)
293
sorted_offsets = sorted(offsets)
295
# The algorithm works as follows:
296
# 1) Coalesce nearby reads into a single chunk
297
# This generates a list of combined regions, the total size
298
# and the size of the sub regions. This coalescing step is limited
299
# in the number of nearby chunks to combine, and is allowed to
300
# skip small breaks in the requests. Limiting it makes sure that
301
# we can start yielding some data earlier, and skipping means we
302
# make fewer requests. (Beneficial even when using async)
303
# 2) Break up this combined regions into chunks that are smaller
304
# than 64KiB. Technically the limit is 65536, but we are a
305
# little bit conservative. This is because sftp has a maximum
306
# return chunk size of 64KiB (max size of an unsigned short)
307
# 3) Issue a readv() to paramiko to create an async request for
309
# 4) Read in the data as it comes back, until we've read one
310
# continuous section as determined in step 1
311
# 5) Break up the full sections into hunks for the original requested
312
# offsets. And put them in a cache
313
# 6) Check if the next request is in the cache, and if it is, remove
314
# it from the cache, and yield its data. Continue until no more
315
# entries are in the cache.
316
# 7) loop back to step 4 until all data has been read
318
# TODO: jam 20060725 This could be optimized one step further, by
319
# attempting to yield whatever data we have read, even before
320
# the first coallesced section has been fully processed.
322
# When coalescing for use with readv(), we don't really need to
323
# use any fudge factor, because the requests are made asynchronously
324
coalesced = list(self._coalesce_offsets(sorted_offsets,
325
limit=self._max_readv_combine,
329
for c_offset in coalesced:
330
start = c_offset.start
331
size = c_offset.length
333
# We need to break this up into multiple requests
335
next_size = min(size, self._max_request_size)
336
requests.append((start, next_size))
340
mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
341
len(offsets), len(coalesced), len(requests))
343
# Queue the current read until we have read the full coalesced section
346
cur_coalesced_stack = iter(coalesced)
347
cur_coalesced = cur_coalesced_stack.next()
349
# Cache the results, but only until they have been fulfilled
351
# turn the list of offsets into a stack
352
offset_stack = iter(offsets)
353
cur_offset_and_size = offset_stack.next()
355
for data in fp.readv(requests):
357
cur_data_len += len(data)
359
if cur_data_len < cur_coalesced.length:
361
assert cur_data_len == cur_coalesced.length, \
362
"Somehow we read too much: %s != %s" % (cur_data_len,
363
cur_coalesced.length)
364
all_data = ''.join(cur_data)
368
for suboffset, subsize in cur_coalesced.ranges:
369
key = (cur_coalesced.start+suboffset, subsize)
370
data_map[key] = all_data[suboffset:suboffset+subsize]
372
# Now that we've read some data, see if we can yield anything back
373
while cur_offset_and_size in data_map:
374
this_data = data_map.pop(cur_offset_and_size)
375
yield cur_offset_and_size[0], this_data
376
cur_offset_and_size = offset_stack.next()
378
# Now that we've read all of the data for this coalesced section
380
cur_coalesced = cur_coalesced_stack.next()
382
def put(self, relpath, f, mode=None):
384
Copy the file-like or string object into the location.
386
:param relpath: Location to put the contents, relative to base.
387
:param f: File-like or string object.
388
:param mode: The final mode for the file
390
final_path = self._remote_path(relpath)
391
self._put(final_path, f, mode=mode)
393
def _put(self, abspath, f, mode=None):
394
"""Helper function so both put() and copy_abspaths can reuse the code"""
395
tmp_abspath = '%s.tmp.%.9f.%d.%d' % (abspath, time.time(),
396
os.getpid(), random.randint(0,0x7FFFFFFF))
397
fout = self._sftp_open_exclusive(tmp_abspath, mode=mode)
401
fout.set_pipelined(True)
403
except (IOError, paramiko.SSHException), e:
404
self._translate_io_exception(e, tmp_abspath)
406
self._sftp.chmod(tmp_abspath, mode)
409
self._rename_and_overwrite(tmp_abspath, abspath)
411
# If we fail, try to clean up the temporary file
412
# before we throw the exception
413
# but don't let another exception mess things up
414
# Write out the traceback, because otherwise
415
# the catch and throw destroys it
417
mutter(traceback.format_exc())
421
self._sftp.remove(tmp_abspath)
423
# raise the saved except
425
# raise the original with its traceback if we can.
428
def iter_files_recursive(self):
    """Walk the relative paths of all files in this transport.

    Yields file paths (not directories), depth-first, by maintaining an
    explicit queue seeded from the root listing.
    """
    queue = list(self.list_dir('.'))
    while queue:
        relpath = queue.pop(0)
        st = self.stat(relpath)
        if stat.S_ISDIR(st.st_mode):
            # Insert children at the front so traversal stays depth-first.
            for i, basename in enumerate(self.list_dir(relpath)):
                queue.insert(i, relpath+'/'+basename)
        else:
            yield relpath
440
def mkdir(self, relpath, mode=None):
441
"""Create a directory at the given path."""
442
path = self._remote_path(relpath)
444
# In the paramiko documentation, it says that passing a mode flag
445
# will filtered against the server umask.
446
# StubSFTPServer does not do this, which would be nice, because it is
447
# what we really want :)
448
# However, real servers do use umask, so we really should do it that way
449
self._sftp.mkdir(path)
451
self._sftp.chmod(path, mode=mode)
452
except (paramiko.SSHException, IOError), e:
453
self._translate_io_exception(e, path, ': unable to mkdir',
454
failure_exc=FileExists)
456
def _translate_io_exception(self, e, path, more_info='',
457
failure_exc=PathError):
458
"""Translate a paramiko or IOError into a friendlier exception.
460
:param e: The original exception
461
:param path: The path in question when the error is raised
462
:param more_info: Extra information that can be included,
463
such as what was going on
464
:param failure_exc: Paramiko has the super fun ability to raise completely
465
opaque errors that just set "e.args = ('Failure',)" with
467
If this parameter is set, it defines the exception
468
to raise in these cases.
470
# paramiko seems to generate detailless errors.
471
self._translate_error(e, path, raise_generic=False)
472
if hasattr(e, 'args'):
473
if (e.args == ('No such file or directory',) or
474
e.args == ('No such file',)):
475
raise NoSuchFile(path, str(e) + more_info)
476
if (e.args == ('mkdir failed',)):
477
raise FileExists(path, str(e) + more_info)
478
# strange but true, for the paramiko server.
479
if (e.args == ('Failure',)):
480
raise failure_exc(path, str(e) + more_info)
481
mutter('Raising exception with args %s', e.args)
482
if hasattr(e, 'errno'):
483
mutter('Raising exception with errno %s', e.errno)
486
def append(self, relpath, f, mode=None):
488
Append the text in the file-like object into the final
492
path = self._remote_path(relpath)
493
fout = self._sftp.file(path, 'ab')
495
self._sftp.chmod(path, mode)
499
except (IOError, paramiko.SSHException), e:
500
self._translate_io_exception(e, relpath, ': unable to append')
502
def rename(self, rel_from, rel_to):
503
"""Rename without special overwriting"""
505
self._sftp.rename(self._remote_path(rel_from),
506
self._remote_path(rel_to))
507
except (IOError, paramiko.SSHException), e:
508
self._translate_io_exception(e, rel_from,
509
': unable to rename to %r' % (rel_to))
511
def _rename_and_overwrite(self, abs_from, abs_to):
512
"""Do a fancy rename on the remote server.
514
Using the implementation provided by osutils.
517
fancy_rename(abs_from, abs_to,
518
rename_func=self._sftp.rename,
519
unlink_func=self._sftp.remove)
520
except (IOError, paramiko.SSHException), e:
521
self._translate_io_exception(e, abs_from, ': unable to rename to %r' % (abs_to))
523
def move(self, rel_from, rel_to):
    """Move the item at rel_from to the location at rel_to.

    Overwrites rel_to if it already exists.
    """
    path_from = self._remote_path(rel_from)
    path_to = self._remote_path(rel_to)
    self._rename_and_overwrite(path_from, path_to)
529
def delete(self, relpath):
530
"""Delete the item at relpath"""
531
path = self._remote_path(relpath)
533
self._sftp.remove(path)
534
except (IOError, paramiko.SSHException), e:
535
self._translate_io_exception(e, path, ': unable to delete')
538
"""Return True if this store supports listing."""
541
def list_dir(self, relpath):
543
Return a list of all files at the given location.
545
# does anything actually use this?
547
# This is at least used by copy_tree for remote upgrades.
548
# -- David Allouche 2006-08-11
549
path = self._remote_path(relpath)
551
entries = self._sftp.listdir(path)
552
except (IOError, paramiko.SSHException), e:
553
self._translate_io_exception(e, path, ': failed to list_dir')
554
return [urlutils.escape(entry) for entry in entries]
556
def rmdir(self, relpath):
557
"""See Transport.rmdir."""
558
path = self._remote_path(relpath)
560
return self._sftp.rmdir(path)
561
except (IOError, paramiko.SSHException), e:
562
self._translate_io_exception(e, path, ': failed to rmdir')
564
def stat(self, relpath):
565
"""Return the stat information for a file."""
566
path = self._remote_path(relpath)
568
return self._sftp.stat(path)
569
except (IOError, paramiko.SSHException), e:
570
self._translate_io_exception(e, path, ': unable to stat')
572
def lock_read(self, relpath):
    """Lock the given file for shared (read) access.

    :return: A lock object, which has an unlock() member function
    """
    # FIXME: there should be something clever i can do here...
    # Read locks are not actually enforced over sftp; return a
    # do-nothing token so callers can unlock() unconditionally.
    class BogusLock(object):
        def __init__(self, path):
            self.path = path
        def unlock(self):
            pass
    return BogusLock(relpath)
585
def lock_write(self, relpath):
    """Lock the given file for exclusive (write) access.

    WARNING: many transports do not support this, so trying avoid using it.

    :return: A lock object, which has an unlock() member function
    """
    # This is a little bit bogus, but basically, we create a file
    # which should not already exist, and if it does, we assume
    # that there is a lock, and if it doesn't, the we assume
    # that we have taken the lock.
    return SFTPLock(relpath, self)
598
def _unparse_url(self, path=None):
601
path = urllib.quote(path)
602
# handle homedir paths
603
if not path.startswith('/'):
605
netloc = urllib.quote(self._host)
606
if self._username is not None:
607
netloc = '%s@%s' % (urllib.quote(self._username), netloc)
608
if self._port is not None:
609
netloc = '%s:%d' % (netloc, self._port)
610
return urlparse.urlunparse(('sftp', netloc, path, '', '', ''))
612
def _split_url(self, url):
613
(scheme, username, password, host, port, path) = split_url(url)
614
assert scheme == 'sftp'
616
# the initial slash should be removed from the path, and treated
617
# as a homedir relative path (the path begins with a double slash
618
# if it is absolute).
619
# see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
620
# RBC 20060118 we are not using this as its too user hostile. instead
621
# we are following lftp and using /~/foo to mean '~/foo'.
622
# handle homedir paths
623
if path.startswith('/~/'):
627
return (username, password, host, port, path)
629
def _parse_url(self, url):
630
(self._username, self._password,
631
self._host, self._port, self._path) = self._split_url(url)
633
def _sftp_connect(self):
634
"""Connect to the remote sftp server.
635
After this, self._sftp should have a valid connection (or
636
we raise an TransportError 'could not connect').
638
TODO: Raise a more reasonable ConnectionFailed exception
640
self._sftp = _sftp_connect(self._host, self._port, self._username,
643
def _sftp_open_exclusive(self, abspath, mode=None):
644
"""Open a remote path exclusively.
646
SFTP supports O_EXCL (SFTP_FLAG_EXCL), which fails if
647
the file already exists. However it does not expose this
648
at the higher level of SFTPClient.open(), so we have to
651
WARNING: This breaks the SFTPClient abstraction, so it
652
could easily break against an updated version of paramiko.
654
:param abspath: The remote absolute path where the file should be opened
655
:param mode: The mode permissions bits for the new file
657
path = self._sftp._adjust_cwd(abspath)
658
# mutter('sftp abspath %s => %s', abspath, path)
659
attr = SFTPAttributes()
662
omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE
663
| SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
665
t, msg = self._sftp._request(CMD_OPEN, path, omode, attr)
667
raise TransportError('Expected an SFTP handle')
668
handle = msg.get_string()
669
return SFTPFile(self._sftp, handle, 'wb', -1)
670
except (paramiko.SSHException, IOError), e:
671
self._translate_io_exception(e, abspath, ': unable to open',
672
failure_exc=FileExists)
675
# ------------- server test implementation --------------
678
from bzrlib.tests.stub_sftp import StubServer, StubSFTPServer
680
STUB_SERVER_KEY = """
681
-----BEGIN RSA PRIVATE KEY-----
682
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
683
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
684
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
685
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
686
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
687
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
688
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
689
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
690
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
691
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
692
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
693
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
694
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
695
-----END RSA PRIVATE KEY-----
699
class SocketListener(threading.Thread):
701
def __init__(self, callback):
702
threading.Thread.__init__(self)
703
self._callback = callback
704
self._socket = socket.socket()
705
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
706
self._socket.bind(('localhost', 0))
707
self._socket.listen(1)
708
self.port = self._socket.getsockname()[1]
709
self._stop_event = threading.Event()
712
# called from outside this thread
713
self._stop_event.set()
714
# use a timeout here, because if the test fails, the server thread may
715
# never notice the stop_event.
721
readable, writable_unused, exception_unused = \
722
select.select([self._socket], [], [], 0.1)
723
if self._stop_event.isSet():
725
if len(readable) == 0:
728
s, addr_unused = self._socket.accept()
729
# because the loopback socket is inline, and transports are
730
# never explicitly closed, best to launch a new thread.
731
threading.Thread(target=self._callback, args=(s,)).start()
732
except socket.error, x:
733
sys.excepthook(*sys.exc_info())
734
warning('Socket error during accept() within unit test server'
737
# probably a failed test; unit test thread will log the
739
sys.excepthook(*sys.exc_info())
740
warning('Exception from within unit test server thread: %r' %
744
class SocketDelay(object):
745
"""A socket decorator to make TCP appear slower.
747
This changes recv, send, and sendall to add a fixed latency to each python
748
call if a new roundtrip is detected. That is, when a recv is called and the
749
flag new_roundtrip is set, latency is charged. Every send and send_all
752
In addition every send, sendall and recv sleeps a bit per character send to
755
Not all methods are implemented, this is deliberate as this class is not a
756
replacement for the builtin sockets layer. fileno is not implemented to
757
prevent the proxy being bypassed.
761
_proxied_arguments = dict.fromkeys([
762
"close", "getpeername", "getsockname", "getsockopt", "gettimeout",
763
"setblocking", "setsockopt", "settimeout", "shutdown"])
765
def __init__(self, sock, latency, bandwidth=1.0,
768
:param bandwith: simulated bandwith (MegaBit)
769
:param really_sleep: If set to false, the SocketDelay will just
770
increase a counter, instead of calling time.sleep. This is useful for
771
unittesting the SocketDelay.
774
self.latency = latency
775
self.really_sleep = really_sleep
776
self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024)
777
self.new_roundtrip = False
780
if self.really_sleep:
783
SocketDelay.simulated_time += s
785
def __getattr__(self, attr):
786
if attr in SocketDelay._proxied_arguments:
787
return getattr(self.sock, attr)
788
raise AttributeError("'SocketDelay' object has no attribute %r" %
792
return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
795
def recv(self, *args):
796
data = self.sock.recv(*args)
797
if data and self.new_roundtrip:
798
self.new_roundtrip = False
799
self.sleep(self.latency)
800
self.sleep(len(data) * self.time_per_byte)
803
def sendall(self, data, flags=0):
804
if not self.new_roundtrip:
805
self.new_roundtrip = True
806
self.sleep(self.latency)
807
self.sleep(len(data) * self.time_per_byte)
808
return self.sock.sendall(data, flags)
810
def send(self, data, flags=0):
811
if not self.new_roundtrip:
812
self.new_roundtrip = True
813
self.sleep(self.latency)
814
bytes_sent = self.sock.send(data, flags)
815
self.sleep(bytes_sent * self.time_per_byte)
819
class SFTPServer(Server):
820
"""Common code for SFTP server facilities."""
823
self._original_vendor = None
825
self._server_homedir = None
826
self._listener = None
828
self._vendor = ssh.ParamikoVendor()
833
def _get_sftp_url(self, path):
834
"""Calculate an sftp url to this server for path."""
835
return 'sftp://foo:bar@localhost:%d/%s' % (self._listener.port, path)
837
def log(self, message):
838
"""StubServer uses this to log when a new server is created."""
839
self.logs.append(message)
841
def _run_server_entry(self, sock):
842
"""Entry point for all implementations of _run_server.
844
If self.add_latency is > 0.000001 then sock is given a latency adding
847
if self.add_latency > 0.000001:
848
sock = SocketDelay(sock, self.add_latency)
849
return self._run_server(sock)
851
def _run_server(self, s):
852
ssh_server = paramiko.Transport(s)
853
key_file = pathjoin(self._homedir, 'test_rsa.key')
854
f = open(key_file, 'w')
855
f.write(STUB_SERVER_KEY)
857
host_key = paramiko.RSAKey.from_private_key_file(key_file)
858
ssh_server.add_server_key(host_key)
859
server = StubServer(self)
860
ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
861
StubSFTPServer, root=self._root,
862
home=self._server_homedir)
863
event = threading.Event()
864
ssh_server.start_server(event, server)
868
self._original_vendor = ssh._ssh_vendor
869
ssh._ssh_vendor = self._vendor
870
if sys.platform == 'win32':
871
# Win32 needs to use the UNICODE api
872
self._homedir = getcwd()
874
# But Linux SFTP servers should just deal in bytestreams
875
self._homedir = os.getcwd()
876
if self._server_homedir is None:
877
self._server_homedir = self._homedir
879
if sys.platform == 'win32':
881
self._listener = SocketListener(self._run_server_entry)
882
self._listener.setDaemon(True)
883
self._listener.start()
886
"""See bzrlib.transport.Server.tearDown."""
887
self._listener.stop()
888
ssh._ssh_vendor = self._original_vendor
890
def get_bogus_url(self):
891
"""See bzrlib.transport.Server.get_bogus_url."""
892
# this is chosen to try to prevent trouble with proxies, wierd dns, etc
893
# we bind a random socket, so that we get a guaranteed unused port
894
# we just never listen on that port
896
s.bind(('localhost', 0))
897
return 'sftp://%s:%s/' % s.getsockname()
900
class SFTPFullAbsoluteServer(SFTPServer):
    """A test server for sftp transports, using absolute urls and ssh."""

    def get_url(self):
        """See bzrlib.transport.Server.get_url."""
        # Drop the leading '/' so the path is encoded into the URL's
        # path component rather than as a double slash.
        return self._get_sftp_url(urlutils.escape(self._homedir[1:]))
908
class SFTPServerWithoutSSH(SFTPServer):
909
"""An SFTP server that uses a simple TCP socket pair rather than SSH."""
912
super(SFTPServerWithoutSSH, self).__init__()
913
self._vendor = ssh.LoopbackVendor()
915
def _run_server(self, sock):
916
# Re-import these as locals, so that they're still accessible during
917
# interpreter shutdown (when all module globals get set to None, leading
918
# to confusing errors like "'NoneType' object has no attribute 'error'".
920
class FakeChannel(object):
921
def get_transport(self):
923
def get_log_channel(self):
927
def get_hexdump(self):
932
server = paramiko.SFTPServer(FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
933
root=self._root, home=self._server_homedir)
935
server.start_subsystem('sftp', None, sock)
936
except socket.error, e:
937
if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
938
# it's okay for the client to disconnect abruptly
939
# (bug in paramiko 1.6: it should absorb this exception)
944
import sys; sys.stderr.write('\nEXCEPTION %r\n\n' % e.__class__)
945
server.finish_subsystem()
948
class SFTPAbsoluteServer(SFTPServerWithoutSSH):
    """A test server for sftp transports, using absolute urls."""

    def get_url(self):
        """See bzrlib.transport.Server.get_url."""
        if sys.platform == 'win32':
            # Windows homedirs have no leading '/' to strip.
            return self._get_sftp_url(urlutils.escape(self._homedir))
        else:
            return self._get_sftp_url(urlutils.escape(self._homedir[1:]))
959
class SFTPHomeDirServer(SFTPServerWithoutSSH):
    """A test server for sftp transports, using homedir relative urls."""

    def get_url(self):
        """See bzrlib.transport.Server.get_url."""
        # '/~/' style paths are resolved relative to the server homedir.
        return self._get_sftp_url("~/")
967
class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
    """A test servere for sftp transports, using absolute urls to non-home."""

    def setUp(self):
        # Point the server's homedir somewhere that is NOT the test's
        # working directory, so absolute-url handling is exercised.
        self._server_homedir = '/dev/noone/runs/tests/here'
        super(SFTPSiblingAbsoluteServer, self).setUp()
975
def _sftp_connect(host, port, username, password):
    """Connect to the remote sftp server.

    Reuses a cached connection for the same (host, port, username) when
    one is still alive in the weak-value cache.

    :raises: a TransportError 'could not connect'.

    :returns: an paramiko.sftp_client.SFTPClient

    TODO: Raise a more reasonable ConnectionFailed exception
    """
    idx = (host, port, username)
    try:
        return _connected_hosts[idx]
    except KeyError:
        # Not cached (or the weakly-referenced connection was collected).
        pass

    sftp = _sftp_connect_uncached(host, port, username, password)
    _connected_hosts[idx] = sftp
    return sftp
994
def _sftp_connect_uncached(host, port, username, password):
    """Open a brand-new SFTP connection via the configured ssh vendor.

    :returns: a paramiko.sftp_client.SFTPClient (or vendor equivalent)
    """
    vendor = ssh._get_ssh_vendor()
    sftp = vendor.connect_sftp(username, password, host, port)
    return sftp
1000
def get_test_permutations():
1001
"""Return the permutations to be used in testing."""
1002
return [(SFTPTransport, SFTPAbsoluteServer),
1003
(SFTPTransport, SFTPHomeDirServer),
1004
(SFTPTransport, SFTPSiblingAbsoluteServer),