1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
2
# Copyright (C) 2005, 2006 Canonical Ltd
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; either version 2 of the License, or
7
# (at your option) any later version.
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
# GNU General Public License for more details.
14
# You should have received a copy of the GNU General Public License
15
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
"""Implementation of Transport over SFTP, using paramiko."""
20
# TODO: Remove the transport-based lock_read and lock_write methods. They'll
21
# then raise TransportNotPossible, which will break remote access to any
22
# formats which rely on OS-level locks. That should be fine as those formats
23
# are pretty old, but these combinations may have to be removed from the test
24
# suite. Those formats all date back to 0.7; so we should be able to remove
25
# these methods when we officially drop support for those formats.
39
from bzrlib.errors import (FileExists,
40
NoSuchFile, PathNotChild,
46
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
47
from bzrlib.trace import mutter, warning
48
from bzrlib.transport import (
49
register_urlparse_netloc_protocol,
55
import bzrlib.urlutils as urlutils
59
except ImportError, e:
60
raise ParamikoNotPresent(e)
62
from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
63
SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
65
from paramiko.sftp_attr import SFTPAttributes
66
from paramiko.sftp_file import SFTPFile
69
register_urlparse_netloc_protocol('sftp')
72
# This is a weakref dictionary, so that we can reuse connections
73
# that are still active. Long term, it might be nice to have some
74
# sort of expiration policy, such as disconnect if inactive for
75
# X seconds. But that requires a lot more fanciness.
76
_connected_hosts = weakref.WeakValueDictionary()
79
_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
80
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
81
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))
84
def clear_connection_cache():
    """Forget every cached SFTP connection.

    Primarily useful for test cases wanting to force garbage collection
    of the pooled connections.
    """
    _connected_hosts.clear()
92
class SFTPLock(object):
93
"""This fakes a lock in a remote location.
95
A present lock is indicated just by the existence of a file. This
96
doesn't work well on all transports and they are only used in
97
deprecated storage formats.
100
__slots__ = ['path', 'lock_path', 'lock_file', 'transport']
102
def __init__(self, path, transport):
103
assert isinstance(transport, SFTPTransport)
105
self.lock_file = None
107
self.lock_path = path + '.write-lock'
108
self.transport = transport
110
# RBC 20060103 FIXME should we be using private methods here ?
111
abspath = transport._remote_path(self.lock_path)
112
self.lock_file = transport._sftp_open_exclusive(abspath)
114
raise LockError('File %r already locked' % (self.path,))
117
"""Should this warn, or actually try to cleanup?"""
119
warning("SFTPLock %r not explicitly unlocked" % (self.path,))
123
if not self.lock_file:
125
self.lock_file.close()
126
self.lock_file = None
128
self.transport.delete(self.lock_path)
129
except (NoSuchFile,):
130
# What specific errors should we catch here?
134
class SFTPUrlHandling(Transport):
135
"""Mix-in that does common handling of SSH/SFTP URLs."""
137
def __init__(self, base):
138
self._parse_url(base)
139
base = self._unparse_url(self._path)
142
super(SFTPUrlHandling, self).__init__(base)
144
def _parse_url(self, url):
146
self._username, self._password,
147
self._host, self._port, self._path) = self._split_url(url)
149
def _unparse_url(self, path):
150
"""Return a URL for a path relative to this transport.
152
path = urllib.quote(path)
153
# handle homedir paths
154
if not path.startswith('/'):
156
netloc = urllib.quote(self._host)
157
if self._username is not None:
158
netloc = '%s@%s' % (urllib.quote(self._username), netloc)
159
if self._port is not None:
160
netloc = '%s:%d' % (netloc, self._port)
161
return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
163
def _split_url(self, url):
164
(scheme, username, password, host, port, path) = split_url(url)
165
## assert scheme == 'sftp'
167
# the initial slash should be removed from the path, and treated
168
# as a homedir relative path (the path begins with a double slash
169
# if it is absolute).
170
# see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
171
# RBC 20060118 we are not using this as its too user hostile. instead
172
# we are following lftp and using /~/foo to mean '~/foo'.
173
# handle homedir paths
174
if path.startswith('/~/'):
178
return (scheme, username, password, host, port, path)
180
def _remote_path(self, relpath):
    """Return the path to be passed along the sftp protocol for relpath.

    :param relpath: is a urlencoded string.
    """
    # Delegate to the generic path-joining helper with our own root.
    base = self._path
    return self._combine_paths(base, relpath)
188
class SFTPTransport(SFTPUrlHandling):
189
"""Transport implementation for SFTP access."""
191
_do_prefetch = _default_do_prefetch
192
# TODO: jam 20060717 Conceivably these could be configurable, either
193
# by auto-tuning at run-time, or by a configuration (per host??)
194
# but the performance curve is pretty flat, so just going with
195
# reasonable defaults.
196
_max_readv_combine = 200
197
# Having to round trip to the server means waiting for a response,
198
# so it is better to download extra bytes.
199
# 8KiB had good performance for both local and remote network operations
200
_bytes_to_read_before_seek = 8192
202
# The sftp spec says that implementations SHOULD allow reads
203
# to be at least 32K. paramiko.readv() does an async request
204
# for the chunks. So we need to keep it within a single request
205
# size for paramiko <= 1.6.1. paramiko 1.6.2 will probably chop
206
# up the request itself, rather than us having to worry about it
207
_max_request_size = 32768
209
def __init__(self, base, clone_from=None):
210
super(SFTPTransport, self).__init__(base)
211
if clone_from is None:
214
# use the same ssh connection, etc
215
self._sftp = clone_from._sftp
216
# super saves 'self.base'
218
def should_cache(self):
220
Return True if the data pulled across should be cached locally.
224
def clone(self, offset=None):
226
Return a new SFTPTransport with root at self.base + offset.
227
We share the same SFTP session between such transports, because it's
228
fairly expensive to set them up.
231
return SFTPTransport(self.base, self)
233
return SFTPTransport(self.abspath(offset), self)
235
def abspath(self, relpath):
    """Return the full url to the given relative path.

    :param relpath: the relative path or path components
    :type relpath: str or list
    """
    remote = self._remote_path(relpath)
    return self._unparse_url(remote)
244
def _remote_path(self, relpath):
245
"""Return the path to be passed along the sftp protocol for relpath.
247
relpath is a urlencoded string.
249
:return: a path prefixed with / for regular abspath-based urls, or a
250
path that does not begin with / for urls which begin with /~/.
252
# FIXME: share the common code across transports
253
assert isinstance(relpath, basestring)
254
basepath = self._path.split('/')
255
if relpath.startswith('/'):
257
relpath = urlutils.unescape(relpath).split('/')
258
if len(basepath) > 0 and basepath[-1] == '':
259
basepath = basepath[:-1]
263
if len(basepath) == 0:
264
# In most filesystems, a request for the parent
265
# of root, just returns root.
273
path = '/'.join(basepath)
274
# mutter('relpath => remotepath %s => %s', relpath, path)
277
def relpath(self, abspath):
278
scheme, username, password, host, port, path = self._split_url(abspath)
280
if (username != self._username):
281
error.append('username mismatch')
282
if (host != self._host):
283
error.append('host mismatch')
284
if (port != self._port):
285
error.append('port mismatch')
286
if (not path.startswith(self._path)):
287
error.append('path mismatch')
289
extra = ': ' + ', '.join(error)
290
raise PathNotChild(abspath, self.base, extra=extra)
292
return path[pl:].strip('/')
294
def has(self, relpath):
296
Does the target location exist?
299
self._sftp.stat(self._remote_path(relpath))
304
def get(self, relpath):
306
Get the file at the given relative path.
308
:param relpath: The relative path to the file
311
path = self._remote_path(relpath)
312
f = self._sftp.file(path, mode='rb')
313
if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
316
except (IOError, paramiko.SSHException), e:
317
self._translate_io_exception(e, path, ': error retrieving')
319
def readv(self, relpath, offsets):
320
"""See Transport.readv()"""
321
# We overload the default readv() because we want to use a file
322
# that does not have prefetch enabled.
323
# Also, if we have a new paramiko, it implements an async readv()
328
path = self._remote_path(relpath)
329
fp = self._sftp.file(path, mode='rb')
330
readv = getattr(fp, 'readv', None)
332
return self._sftp_readv(fp, offsets)
333
mutter('seek and read %s offsets', len(offsets))
334
return self._seek_and_read(fp, offsets)
335
except (IOError, paramiko.SSHException), e:
336
self._translate_io_exception(e, path, ': error retrieving')
338
def _sftp_readv(self, fp, offsets):
339
"""Use the readv() member of fp to do async readv.
341
And then read them using paramiko.readv(). paramiko.readv()
342
does not support ranges > 64K, so it caps the request size, and
343
just reads until it gets all the stuff it wants
345
offsets = list(offsets)
346
sorted_offsets = sorted(offsets)
348
# The algorithm works as follows:
349
# 1) Coalesce nearby reads into a single chunk
350
# This generates a list of combined regions, the total size
351
# and the size of the sub regions. This coalescing step is limited
352
# in the number of nearby chunks to combine, and is allowed to
353
# skip small breaks in the requests. Limiting it makes sure that
354
# we can start yielding some data earlier, and skipping means we
355
# make fewer requests. (Beneficial even when using async)
356
# 2) Break up this combined regions into chunks that are smaller
357
# than 64KiB. Technically the limit is 65536, but we are a
358
# little bit conservative. This is because sftp has a maximum
359
# return chunk size of 64KiB (max size of an unsigned short)
360
# 3) Issue a readv() to paramiko to create an async request for
362
# 4) Read in the data as it comes back, until we've read one
363
# continuous section as determined in step 1
364
# 5) Break up the full sections into hunks for the original requested
365
# offsets. And put them in a cache
366
# 6) Check if the next request is in the cache, and if it is, remove
367
# it from the cache, and yield its data. Continue until no more
368
# entries are in the cache.
369
# 7) loop back to step 4 until all data has been read
371
# TODO: jam 20060725 This could be optimized one step further, by
372
# attempting to yield whatever data we have read, even before
373
# the first coallesced section has been fully processed.
375
# When coalescing for use with readv(), we don't really need to
376
# use any fudge factor, because the requests are made asynchronously
377
coalesced = list(self._coalesce_offsets(sorted_offsets,
378
limit=self._max_readv_combine,
382
for c_offset in coalesced:
383
start = c_offset.start
384
size = c_offset.length
386
# We need to break this up into multiple requests
388
next_size = min(size, self._max_request_size)
389
requests.append((start, next_size))
393
mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
394
len(offsets), len(coalesced), len(requests))
396
# Queue the current read until we have read the full coalesced section
399
cur_coalesced_stack = iter(coalesced)
400
cur_coalesced = cur_coalesced_stack.next()
402
# Cache the results, but only until they have been fulfilled
404
# turn the list of offsets into a stack
405
offset_stack = iter(offsets)
406
cur_offset_and_size = offset_stack.next()
408
for data in fp.readv(requests):
410
cur_data_len += len(data)
412
if cur_data_len < cur_coalesced.length:
414
assert cur_data_len == cur_coalesced.length, \
415
"Somehow we read too much: %s != %s" % (cur_data_len,
416
cur_coalesced.length)
417
all_data = ''.join(cur_data)
421
for suboffset, subsize in cur_coalesced.ranges:
422
key = (cur_coalesced.start+suboffset, subsize)
423
data_map[key] = all_data[suboffset:suboffset+subsize]
425
# Now that we've read some data, see if we can yield anything back
426
while cur_offset_and_size in data_map:
427
this_data = data_map.pop(cur_offset_and_size)
428
yield cur_offset_and_size[0], this_data
429
cur_offset_and_size = offset_stack.next()
431
# Now that we've read all of the data for this coalesced section
433
cur_coalesced = cur_coalesced_stack.next()
435
def put_file(self, relpath, f, mode=None):
    """Copy the file-like object into the location.

    :param relpath: Location to put the contents, relative to base.
    :param f: File-like object.
    :param mode: The final mode for the file
    """
    target = self._remote_path(relpath)
    self._put(target, f, mode=mode)
446
def _put(self, abspath, f, mode=None):
447
"""Helper function so both put() and copy_abspaths can reuse the code"""
448
tmp_abspath = '%s.tmp.%.9f.%d.%d' % (abspath, time.time(),
449
os.getpid(), random.randint(0,0x7FFFFFFF))
450
fout = self._sftp_open_exclusive(tmp_abspath, mode=mode)
454
fout.set_pipelined(True)
456
except (IOError, paramiko.SSHException), e:
457
self._translate_io_exception(e, tmp_abspath)
458
# XXX: This doesn't truly help like we would like it to.
459
# The problem is that openssh strips sticky bits. So while we
460
# can properly set group write permission, we lose the group
461
# sticky bit. So it is probably best to stop chmodding, and
462
# just tell users that they need to set the umask correctly.
463
# The attr.st_mode = mode, in _sftp_open_exclusive
464
# will handle when the user wants the final mode to be more
465
# restrictive. And then we avoid a round trip. Unless
466
# paramiko decides to expose an async chmod()
468
# This is designed to chmod() right before we close.
469
# Because we set_pipelined() earlier, theoretically we might
470
# avoid the round trip for fout.close()
472
self._sftp.chmod(tmp_abspath, mode)
475
self._rename_and_overwrite(tmp_abspath, abspath)
477
# If we fail, try to clean up the temporary file
478
# before we throw the exception
479
# but don't let another exception mess things up
480
# Write out the traceback, because otherwise
481
# the catch and throw destroys it
483
mutter(traceback.format_exc())
487
self._sftp.remove(tmp_abspath)
489
# raise the saved except
491
# raise the original with its traceback if we can.
494
def _put_non_atomic_helper(self, relpath, writer, mode=None,
495
create_parent_dir=False,
497
abspath = self._remote_path(relpath)
499
# TODO: jam 20060816 paramiko doesn't publicly expose a way to
500
# set the file mode at create time. If it does, use it.
501
# But for now, we just chmod later anyway.
503
def _open_and_write_file():
504
"""Try to open the target file, raise error on failure"""
508
fout = self._sftp.file(abspath, mode='wb')
509
fout.set_pipelined(True)
511
except (paramiko.SSHException, IOError), e:
512
self._translate_io_exception(e, abspath,
515
# This is designed to chmod() right before we close.
516
# Because we set_pipelined() earlier, theoretically we might
517
# avoid the round trip for fout.close()
519
self._sftp.chmod(abspath, mode)
524
if not create_parent_dir:
525
_open_and_write_file()
528
# Try error handling to create the parent directory if we need to
530
_open_and_write_file()
532
# Try to create the parent directory, and then go back to
534
parent_dir = os.path.dirname(abspath)
535
self._mkdir(parent_dir, dir_mode)
536
_open_and_write_file()
538
def put_file_non_atomic(self, relpath, f, mode=None,
539
create_parent_dir=False,
541
"""Copy the file-like object into the target location.
543
This function is not strictly safe to use. It is only meant to
544
be used when you already know that the target does not exist.
545
It is not safe, because it will open and truncate the remote
546
file. So there may be a time when the file has invalid contents.
548
:param relpath: The remote location to put the contents.
549
:param f: File-like object.
550
:param mode: Possible access permissions for new file.
551
None means do not set remote permissions.
552
:param create_parent_dir: If we cannot create the target file because
553
the parent directory does not exist, go ahead and
554
create it, and then try again.
558
self._put_non_atomic_helper(relpath, writer, mode=mode,
559
create_parent_dir=create_parent_dir,
562
def put_bytes_non_atomic(self, relpath, bytes, mode=None,
563
create_parent_dir=False,
567
self._put_non_atomic_helper(relpath, writer, mode=mode,
568
create_parent_dir=create_parent_dir,
571
def iter_files_recursive(self):
572
"""Walk the relative paths of all files in this transport."""
573
queue = list(self.list_dir('.'))
575
relpath = queue.pop(0)
576
st = self.stat(relpath)
577
if stat.S_ISDIR(st.st_mode):
578
for i, basename in enumerate(self.list_dir(relpath)):
579
queue.insert(i, relpath+'/'+basename)
583
def _mkdir(self, abspath, mode=None):
589
self._sftp.mkdir(abspath, local_mode)
591
self._sftp.chmod(abspath, mode=mode)
592
except (paramiko.SSHException, IOError), e:
593
self._translate_io_exception(e, abspath, ': unable to mkdir',
594
failure_exc=FileExists)
596
def mkdir(self, relpath, mode=None):
    """Create a directory at the given path."""
    abspath = self._remote_path(relpath)
    self._mkdir(abspath, mode=mode)
600
def _translate_io_exception(self, e, path, more_info='',
601
failure_exc=PathError):
602
"""Translate a paramiko or IOError into a friendlier exception.
604
:param e: The original exception
605
:param path: The path in question when the error is raised
606
:param more_info: Extra information that can be included,
607
such as what was going on
608
:param failure_exc: Paramiko has the super fun ability to raise completely
609
opaque errors that just set "e.args = ('Failure',)" with
611
If this parameter is set, it defines the exception
612
to raise in these cases.
614
# paramiko seems to generate detailless errors.
615
self._translate_error(e, path, raise_generic=False)
616
if getattr(e, 'args', None) is not None:
617
if (e.args == ('No such file or directory',) or
618
e.args == ('No such file',)):
619
raise NoSuchFile(path, str(e) + more_info)
620
if (e.args == ('mkdir failed',)):
621
raise FileExists(path, str(e) + more_info)
622
# strange but true, for the paramiko server.
623
if (e.args == ('Failure',)):
624
raise failure_exc(path, str(e) + more_info)
625
mutter('Raising exception with args %s', e.args)
626
if getattr(e, 'errno', None) is not None:
627
mutter('Raising exception with errno %s', e.errno)
630
def append_file(self, relpath, f, mode=None):
632
Append the text in the file-like object into the final
636
path = self._remote_path(relpath)
637
fout = self._sftp.file(path, 'ab')
639
self._sftp.chmod(path, mode)
643
except (IOError, paramiko.SSHException), e:
644
self._translate_io_exception(e, relpath, ': unable to append')
646
def rename(self, rel_from, rel_to):
647
"""Rename without special overwriting"""
649
self._sftp.rename(self._remote_path(rel_from),
650
self._remote_path(rel_to))
651
except (IOError, paramiko.SSHException), e:
652
self._translate_io_exception(e, rel_from,
653
': unable to rename to %r' % (rel_to))
655
def _rename_and_overwrite(self, abs_from, abs_to):
656
"""Do a fancy rename on the remote server.
658
Using the implementation provided by osutils.
661
fancy_rename(abs_from, abs_to,
662
rename_func=self._sftp.rename,
663
unlink_func=self._sftp.remove)
664
except (IOError, paramiko.SSHException), e:
665
self._translate_io_exception(e, abs_from, ': unable to rename to %r' % (abs_to))
667
def move(self, rel_from, rel_to):
    """Move the item at rel_from to the location at rel_to"""
    abs_from = self._remote_path(rel_from)
    abs_to = self._remote_path(rel_to)
    self._rename_and_overwrite(abs_from, abs_to)
673
def delete(self, relpath):
674
"""Delete the item at relpath"""
675
path = self._remote_path(relpath)
677
self._sftp.remove(path)
678
except (IOError, paramiko.SSHException), e:
679
self._translate_io_exception(e, path, ': unable to delete')
682
"""Return True if this store supports listing."""
685
def list_dir(self, relpath):
687
Return a list of all files at the given location.
689
# does anything actually use this?
691
# This is at least used by copy_tree for remote upgrades.
692
# -- David Allouche 2006-08-11
693
path = self._remote_path(relpath)
695
entries = self._sftp.listdir(path)
696
except (IOError, paramiko.SSHException), e:
697
self._translate_io_exception(e, path, ': failed to list_dir')
698
return [urlutils.escape(entry) for entry in entries]
700
def rmdir(self, relpath):
701
"""See Transport.rmdir."""
702
path = self._remote_path(relpath)
704
return self._sftp.rmdir(path)
705
except (IOError, paramiko.SSHException), e:
706
self._translate_io_exception(e, path, ': failed to rmdir')
708
def stat(self, relpath):
709
"""Return the stat information for a file."""
710
path = self._remote_path(relpath)
712
return self._sftp.stat(path)
713
except (IOError, paramiko.SSHException), e:
714
self._translate_io_exception(e, path, ': unable to stat')
716
def lock_read(self, relpath):
718
Lock the given file for shared (read) access.
719
:return: A lock object, which has an unlock() member function
721
# FIXME: there should be something clever i can do here...
722
class BogusLock(object):
723
def __init__(self, path):
727
return BogusLock(relpath)
729
def lock_write(self, relpath):
    """Lock the given file for exclusive (write) access.

    WARNING: many transports do not support this, so trying avoid using it.

    :return: A lock object, which has an unlock() member function
    """
    # Somewhat bogus, but the lock-file creation is itself the probe:
    # if the file already exists someone else holds the lock, and if the
    # exclusive create succeeds, the lock is ours.
    return SFTPLock(relpath, self)
742
def _sftp_connect(self):
743
"""Connect to the remote sftp server.
744
After this, self._sftp should have a valid connection (or
745
we raise an TransportError 'could not connect').
747
TODO: Raise a more reasonable ConnectionFailed exception
749
self._sftp = _sftp_connect(self._host, self._port, self._username,
752
def _sftp_open_exclusive(self, abspath, mode=None):
753
"""Open a remote path exclusively.
755
SFTP supports O_EXCL (SFTP_FLAG_EXCL), which fails if
756
the file already exists. However it does not expose this
757
at the higher level of SFTPClient.open(), so we have to
760
WARNING: This breaks the SFTPClient abstraction, so it
761
could easily break against an updated version of paramiko.
763
:param abspath: The remote absolute path where the file should be opened
764
:param mode: The mode permissions bits for the new file
766
# TODO: jam 20060816 Paramiko >= 1.6.2 (probably earlier) supports
767
# using the 'x' flag to indicate SFTP_FLAG_EXCL.
768
# However, there is no way to set the permission mode at open
769
# time using the sftp_client.file() functionality.
770
path = self._sftp._adjust_cwd(abspath)
771
# mutter('sftp abspath %s => %s', abspath, path)
772
attr = SFTPAttributes()
775
omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE
776
| SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
778
t, msg = self._sftp._request(CMD_OPEN, path, omode, attr)
780
raise TransportError('Expected an SFTP handle')
781
handle = msg.get_string()
782
return SFTPFile(self._sftp, handle, 'wb', -1)
783
except (paramiko.SSHException, IOError), e:
784
self._translate_io_exception(e, abspath, ': unable to open',
785
failure_exc=FileExists)
787
def _can_roundtrip_unix_modebits(self):
788
if sys.platform == 'win32':
794
# ------------- server test implementation --------------
797
from bzrlib.tests.stub_sftp import StubServer, StubSFTPServer
799
STUB_SERVER_KEY = """
800
-----BEGIN RSA PRIVATE KEY-----
801
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
802
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
803
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
804
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
805
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
806
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
807
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
808
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
809
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
810
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
811
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
812
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
813
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
814
-----END RSA PRIVATE KEY-----
818
class SocketListener(threading.Thread):
820
def __init__(self, callback):
    """Set up a listening socket on an OS-chosen localhost port.

    :param callback: callable invoked with each accepted socket.
    """
    threading.Thread.__init__(self)
    self._callback = callback
    # Bind an ephemeral port on the loopback interface; SO_REUSEADDR
    # avoids bind failures from lingering TIME_WAIT test sockets.
    listen_sock = socket.socket()
    listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listen_sock.bind(('localhost', 0))
    listen_sock.listen(1)
    self._socket = listen_sock
    # Publish the port the OS picked so clients know where to connect.
    self.port = self._socket.getsockname()[1]
    self._stop_event = threading.Event()
831
# called from outside this thread
832
self._stop_event.set()
833
# use a timeout here, because if the test fails, the server thread may
834
# never notice the stop_event.
840
readable, writable_unused, exception_unused = \
841
select.select([self._socket], [], [], 0.1)
842
if self._stop_event.isSet():
844
if len(readable) == 0:
847
s, addr_unused = self._socket.accept()
848
# because the loopback socket is inline, and transports are
849
# never explicitly closed, best to launch a new thread.
850
threading.Thread(target=self._callback, args=(s,)).start()
851
except socket.error, x:
852
sys.excepthook(*sys.exc_info())
853
warning('Socket error during accept() within unit test server'
856
# probably a failed test; unit test thread will log the
858
sys.excepthook(*sys.exc_info())
859
warning('Exception from within unit test server thread: %r' %
863
class SocketDelay(object):
864
"""A socket decorator to make TCP appear slower.
866
This changes recv, send, and sendall to add a fixed latency to each python
867
call if a new roundtrip is detected. That is, when a recv is called and the
868
flag new_roundtrip is set, latency is charged. Every send and send_all
871
In addition every send, sendall and recv sleeps a bit per character send to
874
Not all methods are implemented, this is deliberate as this class is not a
875
replacement for the builtin sockets layer. fileno is not implemented to
876
prevent the proxy being bypassed.
880
_proxied_arguments = dict.fromkeys([
881
"close", "getpeername", "getsockname", "getsockopt", "gettimeout",
882
"setblocking", "setsockopt", "settimeout", "shutdown"])
884
def __init__(self, sock, latency, bandwidth=1.0,
887
:param bandwith: simulated bandwith (MegaBit)
888
:param really_sleep: If set to false, the SocketDelay will just
889
increase a counter, instead of calling time.sleep. This is useful for
890
unittesting the SocketDelay.
893
self.latency = latency
894
self.really_sleep = really_sleep
895
self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024)
896
self.new_roundtrip = False
899
if self.really_sleep:
902
SocketDelay.simulated_time += s
904
def __getattr__(self, attr):
905
if attr in SocketDelay._proxied_arguments:
906
return getattr(self.sock, attr)
907
raise AttributeError("'SocketDelay' object has no attribute %r" %
911
return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
914
def recv(self, *args):
915
data = self.sock.recv(*args)
916
if data and self.new_roundtrip:
917
self.new_roundtrip = False
918
self.sleep(self.latency)
919
self.sleep(len(data) * self.time_per_byte)
922
def sendall(self, data, flags=0):
    """Proxy sendall, charging simulated latency and bandwidth cost.

    The fixed latency is charged once at the start of each new
    roundtrip; every byte then costs time_per_byte.
    """
    if not self.new_roundtrip:
        self.new_roundtrip = True
        self.sleep(self.latency)
    transfer_cost = len(data) * self.time_per_byte
    self.sleep(transfer_cost)
    return self.sock.sendall(data, flags)
929
def send(self, data, flags=0):
930
if not self.new_roundtrip:
931
self.new_roundtrip = True
932
self.sleep(self.latency)
933
bytes_sent = self.sock.send(data, flags)
934
self.sleep(bytes_sent * self.time_per_byte)
938
class SFTPServer(Server):
939
"""Common code for SFTP server facilities."""
942
self._original_vendor = None
944
self._server_homedir = None
945
self._listener = None
947
self._vendor = ssh.ParamikoVendor()
952
def _get_sftp_url(self, path):
    """Calculate an sftp url to this server for path."""
    port = self._listener.port
    return 'sftp://foo:bar@localhost:%d/%s' % (port, path)
956
def log(self, message):
    """Record a log message; StubServer calls this when a new server
    is created."""
    self.logs.append(message)
960
def _run_server_entry(self, sock):
    """Entry point for all implementations of _run_server.

    If self.add_latency is > 0.000001 then sock is wrapped in a
    SocketDelay decorator that simulates that much latency.
    """
    if self.add_latency > 0.000001:
        sock = SocketDelay(sock, self.add_latency)
    return self._run_server(sock)
970
def _run_server(self, s):
971
ssh_server = paramiko.Transport(s)
972
key_file = pathjoin(self._homedir, 'test_rsa.key')
973
f = open(key_file, 'w')
974
f.write(STUB_SERVER_KEY)
976
host_key = paramiko.RSAKey.from_private_key_file(key_file)
977
ssh_server.add_server_key(host_key)
978
server = StubServer(self)
979
ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
980
StubSFTPServer, root=self._root,
981
home=self._server_homedir)
982
event = threading.Event()
983
ssh_server.start_server(event, server)
987
self._original_vendor = ssh._ssh_vendor
988
ssh._ssh_vendor = self._vendor
989
if sys.platform == 'win32':
990
# Win32 needs to use the UNICODE api
991
self._homedir = getcwd()
993
# But Linux SFTP servers should just deal in bytestreams
994
self._homedir = os.getcwd()
995
if self._server_homedir is None:
996
self._server_homedir = self._homedir
998
if sys.platform == 'win32':
1000
self._listener = SocketListener(self._run_server_entry)
1001
self._listener.setDaemon(True)
1002
self._listener.start()
1005
"""See bzrlib.transport.Server.tearDown."""
1006
self._listener.stop()
1007
ssh._ssh_vendor = self._original_vendor
1009
def get_bogus_url(self):
1010
"""See bzrlib.transport.Server.get_bogus_url."""
1011
# this is chosen to try to prevent trouble with proxies, wierd dns, etc
1012
# we bind a random socket, so that we get a guaranteed unused port
1013
# we just never listen on that port
1015
s.bind(('localhost', 0))
1016
return 'sftp://%s:%s/' % s.getsockname()
1019
class SFTPFullAbsoluteServer(SFTPServer):
1020
"""A test server for sftp transports, using absolute urls and ssh."""
1023
"""See bzrlib.transport.Server.get_url."""
1024
return self._get_sftp_url(urlutils.escape(self._homedir[1:]))
1027
class SFTPServerWithoutSSH(SFTPServer):
1028
"""An SFTP server that uses a simple TCP socket pair rather than SSH."""
1031
super(SFTPServerWithoutSSH, self).__init__()
1032
self._vendor = ssh.LoopbackVendor()
1034
def _run_server(self, sock):
1035
# Re-import these as locals, so that they're still accessible during
1036
# interpreter shutdown (when all module globals get set to None, leading
1037
# to confusing errors like "'NoneType' object has no attribute 'error'".
1038
class FakeChannel(object):
1039
def get_transport(self):
1041
def get_log_channel(self):
1045
def get_hexdump(self):
1050
server = paramiko.SFTPServer(FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
1051
root=self._root, home=self._server_homedir)
1053
server.start_subsystem('sftp', None, sock)
1054
except socket.error, e:
1055
if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
1056
# it's okay for the client to disconnect abruptly
1057
# (bug in paramiko 1.6: it should absorb this exception)
1061
except Exception, e:
1062
import sys; sys.stderr.write('\nEXCEPTION %r\n\n' % e.__class__)
1063
server.finish_subsystem()
1066
class SFTPAbsoluteServer(SFTPServerWithoutSSH):
1067
"""A test server for sftp transports, using absolute urls."""
1070
"""See bzrlib.transport.Server.get_url."""
1071
if sys.platform == 'win32':
1072
return self._get_sftp_url(urlutils.escape(self._homedir))
1074
return self._get_sftp_url(urlutils.escape(self._homedir[1:]))
1077
class SFTPHomeDirServer(SFTPServerWithoutSSH):
1078
"""A test server for sftp transports, using homedir relative urls."""
1081
"""See bzrlib.transport.Server.get_url."""
1082
return self._get_sftp_url("~/")
1085
class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
1086
"""A test servere for sftp transports, using absolute urls to non-home."""
1089
self._server_homedir = '/dev/noone/runs/tests/here'
1090
super(SFTPSiblingAbsoluteServer, self).setUp()
1093
def _sftp_connect(host, port, username, password):
1094
"""Connect to the remote sftp server.
1096
:raises: a TransportError 'could not connect'.
1098
:returns: an paramiko.sftp_client.SFTPClient
1100
TODO: Raise a more reasonable ConnectionFailed exception
1102
idx = (host, port, username)
1104
return _connected_hosts[idx]
1108
sftp = _sftp_connect_uncached(host, port, username, password)
1109
_connected_hosts[idx] = sftp
1112
def _sftp_connect_uncached(host, port, username, password):
1113
vendor = ssh._get_ssh_vendor()
1114
sftp = vendor.connect_sftp(username, password, host, port)
1118
def get_test_permutations():
1119
"""Return the permutations to be used in testing."""
1120
return [(SFTPTransport, SFTPAbsoluteServer),
1121
(SFTPTransport, SFTPHomeDirServer),
1122
(SFTPTransport, SFTPSiblingAbsoluteServer),