# Copyright (C) 2005 Robey Pointer <robey@lag.net>
# Copyright (C) 2005, 2006 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Implementation of Transport over SFTP, using paramiko."""

# TODO: Remove the transport-based lock_read and lock_write methods. They'll
# then raise TransportNotPossible, which will break remote access to any
# formats which rely on OS-level locks. That should be fine as those formats
# are pretty old, but these combinations may have to be removed from the test
# suite. Those formats all date back to 0.7; so we should be able to remove
# these methods when we officially drop support for those formats.

import errno
import os
import random
import select
import socket
import stat
import sys
import threading
import time
import urllib
import urlparse
import weakref

from bzrlib import errors, urlutils
from bzrlib.errors import (FileExists,
                           NoSuchFile, PathNotChild,
                           TransportError,
                           LockError,
                           PathError,
                           ParamikoNotPresent,
                           )
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
from bzrlib.trace import mutter, warning
from bzrlib.transport import (
    register_urlparse_netloc_protocol,
    Server,
    split_url,
    ssh,
    Transport,
    )

try:
    import paramiko
except ImportError, e:
    raise ParamikoNotPresent(e)

from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
                           SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
                           CMD_HANDLE, CMD_OPEN)
from paramiko.sftp_attr import SFTPAttributes
from paramiko.sftp_file import SFTPFile


register_urlparse_netloc_protocol('sftp')


# This is a weakref dictionary, so that we can reuse connections
# that are still active. Long term, it might be nice to have some
# sort of expiration policy, such as disconnect if inactive for
# X seconds. But that requires a lot more fanciness.
_connected_hosts = weakref.WeakValueDictionary()

_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))
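
# Note (added commentary, not in the original module): connections in
# _connected_hosts are keyed by (host, port, username) -- see _sftp_connect()
# near the bottom of this file. Because the values are weak references, an
# entry disappears on its own once the last transport using that SFTPClient
# has been garbage collected, e.g.:
#
#   idx = ('example.com', 22, 'joe')      # hypothetical key
#   sftp = _connected_hosts.get(idx)      # None once the client was collected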


def clear_connection_cache():
    """Remove all hosts from the SFTP connection cache.

    Primarily useful for test cases wanting to force garbage collection.
    """
    _connected_hosts.clear()


class SFTPLock(object):
    """This fakes a lock in a remote location.

    A present lock is indicated just by the existence of a file. This
    doesn't work well on all transports and they are only used in
    deprecated storage formats.
    """
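
    # Rough sketch of the protocol this class implements (commentary added for
    # clarity, not part of the original source): taking the lock creates
    # '<path>.write-lock' with an exclusive open, so a second SFTPLock on the
    # same path fails and raises LockError; unlock() closes and deletes that
    # file, releasing the lock.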

    __slots__ = ['path', 'lock_path', 'lock_file', 'transport']

    def __init__(self, path, transport):
        assert isinstance(transport, SFTPTransport)

        self.lock_file = None
        self.path = path
        self.lock_path = path + '.write-lock'
        self.transport = transport
        try:
            # RBC 20060103 FIXME should we be using private methods here ?
            abspath = transport._remote_path(self.lock_path)
            self.lock_file = transport._sftp_open_exclusive(abspath)
        except FileExists:
            raise LockError('File %r already locked' % (self.path,))

    def __del__(self):
        """Should this warn, or actually try to cleanup?"""
        if self.lock_file:
            warning("SFTPLock %r not explicitly unlocked" % (self.path,))
            self.unlock()

    def unlock(self):
        if not self.lock_file:
            return
        self.lock_file.close()
        self.lock_file = None
        try:
            self.transport.delete(self.lock_path)
        except (NoSuchFile,):
            # What specific errors should we catch here?
            pass


class SFTPUrlHandling(Transport):
    """Mix-in that does common handling of SSH/SFTP URLs."""

    def __init__(self, base):
        self._parse_url(base)
        base = self._unparse_url(self._path)
        if base[-1] != '/':
            base += '/'
        super(SFTPUrlHandling, self).__init__(base)

    def _parse_url(self, url):
        (self._scheme,
         self._username, self._password,
         self._host, self._port, self._path) = self._split_url(url)

    def _unparse_url(self, path):
        """Return a URL for a path relative to this transport.
        """
        path = urllib.quote(path)
        # handle homedir paths
        if not path.startswith('/'):
            path = "/~/" + path
        netloc = urllib.quote(self._host)
        if self._username is not None:
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
        if self._port is not None:
            netloc = '%s:%d' % (netloc, self._port)
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))

    def _split_url(self, url):
        (scheme, username, password, host, port, path) = split_url(url)
        ## assert scheme == 'sftp'

        # the initial slash should be removed from the path, and treated
        # as a homedir relative path (the path begins with a double slash
        # if it is absolute).
        # see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
        # RBC 20060118 we are not using this as its too user hostile. instead
        # we are following lftp and using /~/foo to mean '~/foo'.
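        # For example (illustrative note, not in the original source): the URL
        # 'sftp://user@example.com/~/repo' splits to the path '/~/repo', which
        # the branch below turns into '~/repo', i.e. a path relative to the
        # remote user's home directory.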
        # handle homedir paths
        if path.startswith('/~/'):
            path = path[1:]
        elif path == '/~':
            path = ''
        return (scheme, username, password, host, port, path)

    def abspath(self, relpath):
        """Return the full url to the given relative path.

        @param relpath: the relative path or path components
        @type relpath: str or list
        """
        return self._unparse_url(self._remote_path(relpath))

    def _remote_path(self, relpath):
        """Return the path to be passed along the sftp protocol for relpath.

        :param relpath: a urlencoded string.
        """
        return self._combine_paths(self._path, relpath)


class SFTPTransport(SFTPUrlHandling):
    """Transport implementation for SFTP access."""

    _do_prefetch = _default_do_prefetch
    # TODO: jam 20060717 Conceivably these could be configurable, either
    #       by auto-tuning at run-time, or by a configuration (per host??)
    #       but the performance curve is pretty flat, so just going with
    #       reasonable defaults.
    _max_readv_combine = 200
    # Having to round trip to the server means waiting for a response,
    # so it is better to download extra bytes.
    # 8KiB had good performance for both local and remote network operations
    _bytes_to_read_before_seek = 8192

    # The sftp spec says that implementations SHOULD allow reads
    # to be at least 32K. paramiko.readv() does an async request
    # for the chunks. So we need to keep it within a single request
    # size for paramiko <= 1.6.1. paramiko 1.6.2 will probably chop
    # up the request itself, rather than us having to worry about it
    _max_request_size = 32768
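    # Worked example (added commentary): with _max_request_size = 32768, a
    # single coalesced region of 100000 bytes is issued as four requests of
    # 32768, 32768, 32768 and 1696 bytes -- see the splitting loop in
    # _sftp_readv() below.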

    def __init__(self, base, clone_from=None):
        super(SFTPTransport, self).__init__(base)
        if clone_from is None:
            self._sftp_connect()
        else:
            # use the same ssh connection, etc
            self._sftp = clone_from._sftp
        # super saves 'self.base'

    def should_cache(self):
        """
        Return True if the data pulled across should be cached locally.
        """
        return True

    def clone(self, offset=None):
        """
        Return a new SFTPTransport with root at self.base + offset.
        We share the same SFTP session between such transports, because it's
        fairly expensive to set them up.
        """
        if offset is None:
            return SFTPTransport(self.base, self)
        else:
            return SFTPTransport(self.abspath(offset), self)

    def _remote_path(self, relpath):
        """Return the path to be passed along the sftp protocol for relpath.

        relpath is a urlencoded string.

        :return: a path prefixed with / for regular abspath-based urls, or a
            path that does not begin with / for urls which begin with /~/.
        """
        # FIXME: share the common code across transports
        assert isinstance(relpath, basestring)
        basepath = self._path.split('/')
        if relpath.startswith('/'):
            basepath = ['', '']
        relpath = urlutils.unescape(relpath).split('/')
        if len(basepath) > 0 and basepath[-1] == '':
            basepath = basepath[:-1]

        for p in relpath:
            if p == '..':
                if len(basepath) == 0:
                    # In most filesystems, a request for the parent
                    # of root, just returns root.
                    continue
                basepath.pop()
            elif p == '.':
                continue
            elif p != '':
                basepath.append(p)

        path = '/'.join(basepath)
        # mutter('relpath => remotepath %s => %s', relpath, path)
        return path

    def relpath(self, abspath):
        scheme, username, password, host, port, path = self._split_url(abspath)
        error = []
        if (username != self._username):
            error.append('username mismatch')
        if (host != self._host):
            error.append('host mismatch')
        if (port != self._port):
            error.append('port mismatch')
        if (not path.startswith(self._path)):
            error.append('path mismatch')
        if error:
            extra = ': ' + ', '.join(error)
            raise PathNotChild(abspath, self.base, extra=extra)
        pl = len(self._path)
        return path[pl:].strip('/')
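
    # Example for relpath() above (illustrative, not in the original source):
    # with self.base 'sftp://host/srv/repo/', calling
    # relpath('sftp://host/srv/repo/dir/file') strips the matching self._path
    # prefix and returns 'dir/file'.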

    def has(self, relpath):
        """
        Does the target location exist?
        """
        try:
            self._sftp.stat(self._remote_path(relpath))
            return True
        except IOError:
            return False

    def get(self, relpath):
        """
        Get the file at the given relative path.

        :param relpath: The relative path to the file
        """
        try:
            path = self._remote_path(relpath)
            f = self._sftp.file(path, mode='rb')
            if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
                f.prefetch()
            return f
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, path, ': error retrieving')

    def readv(self, relpath, offsets):
        """See Transport.readv()"""
        # We overload the default readv() because we want to use a file
        # that does not have prefetch enabled.
        # Also, if we have a new paramiko, it implements an async readv()
        if not offsets:
            return

        try:
            path = self._remote_path(relpath)
            fp = self._sftp.file(path, mode='rb')
            readv = getattr(fp, 'readv', None)
            if readv:
                return self._sftp_readv(fp, offsets, relpath)
            mutter('seek and read %s offsets', len(offsets))
            return self._seek_and_read(fp, offsets, relpath)
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, path, ': error retrieving')

    def _sftp_readv(self, fp, offsets, relpath='<unknown>'):
        """Use the readv() member of fp to do async readv.

        And then read them using paramiko.readv(). paramiko.readv()
        does not support ranges > 64K, so it caps the request size, and
        just reads until it gets all the stuff it wants.
        """
        offsets = list(offsets)
        sorted_offsets = sorted(offsets)

        # The algorithm works as follows:
        # 1) Coalesce nearby reads into a single chunk
        #    This generates a list of combined regions, the total size
        #    and the size of the sub regions. This coalescing step is limited
        #    in the number of nearby chunks to combine, and is allowed to
        #    skip small breaks in the requests. Limiting it makes sure that
        #    we can start yielding some data earlier, and skipping means we
        #    make fewer requests. (Beneficial even when using async)
        # 2) Break up these combined regions into chunks that are smaller
        #    than 64KiB. Technically the limit is 65536, but we are a
        #    little bit conservative. This is because sftp has a maximum
        #    return chunk size of 64KiB (max size of an unsigned short)
        # 3) Issue a readv() to paramiko to create an async request for
        #    all of this data
        # 4) Read in the data as it comes back, until we've read one
        #    continuous section as determined in step 1
        # 5) Break up the full sections into hunks for the original requested
        #    offsets. And put them in a cache
        # 6) Check if the next request is in the cache, and if it is, remove
        #    it from the cache, and yield its data. Continue until no more
        #    entries are in the cache.
        # 7) loop back to step 4 until all data has been read
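
        # Illustrative example (added commentary, not in the original source):
        # offsets [(0, 100), (100, 200), (500, 50)] coalesce, with no fudge
        # factor, into two regions: one with start=0, length=300 and
        # ranges=[(0, 100), (100, 200)], and one with start=500, length=50.
        # Step 5 then slices the 300 bytes read for the first region back into
        # the two originally requested chunks, keyed as (0, 100) and (100, 200).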

        # TODO: jam 20060725 This could be optimized one step further, by
        #       attempting to yield whatever data we have read, even before
        #       the first coalesced section has been fully processed.

        # When coalescing for use with readv(), we don't really need to
        # use any fudge factor, because the requests are made asynchronously
        coalesced = list(self._coalesce_offsets(sorted_offsets,
                               limit=self._max_readv_combine,
                               fudge_factor=0))

        requests = []
        for c_offset in coalesced:
            start = c_offset.start
            size = c_offset.length

            # We need to break this up into multiple requests
            while size > 0:
                next_size = min(size, self._max_request_size)
                requests.append((start, next_size))
                size -= next_size
                start += next_size

        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
                len(offsets), len(coalesced), len(requests))

        # Queue the current read until we have read the full coalesced section
        cur_data = []
        cur_data_len = 0
        cur_coalesced_stack = iter(coalesced)
        cur_coalesced = cur_coalesced_stack.next()

        # Cache the results, but only until they have been fulfilled
        data_map = {}
        # turn the list of offsets into a stack
        offset_stack = iter(offsets)
        cur_offset_and_size = offset_stack.next()

        for data in fp.readv(requests):
            cur_data.append(data)
            cur_data_len += len(data)

            if cur_data_len < cur_coalesced.length:
                continue
            assert cur_data_len == cur_coalesced.length, \
                "Somehow we read too much: %s != %s" % (cur_data_len,
                                                        cur_coalesced.length)
            all_data = ''.join(cur_data)
            cur_data = []
            cur_data_len = 0

            for suboffset, subsize in cur_coalesced.ranges:
                key = (cur_coalesced.start+suboffset, subsize)
                data_map[key] = all_data[suboffset:suboffset+subsize]

            # Now that we've read some data, see if we can yield anything back
            while cur_offset_and_size in data_map:
                this_data = data_map.pop(cur_offset_and_size)
                yield cur_offset_and_size[0], this_data
                cur_offset_and_size = offset_stack.next()

            # We read a coalesced entry, so mark it as done
            cur_coalesced = None
            # Now that we've read all of the data for this coalesced section
            # on to the next
            cur_coalesced = cur_coalesced_stack.next()

        if cur_coalesced is not None:
            raise errors.ShortReadvError(relpath, cur_coalesced.start,
                                         cur_coalesced.length, len(data))

    def put_file(self, relpath, f, mode=None):
        """
        Copy the file-like object into the location.

        :param relpath: Location to put the contents, relative to base.
        :param f:       File-like object.
        :param mode: The final mode for the file
        """
        final_path = self._remote_path(relpath)
        self._put(final_path, f, mode=mode)

    def _put(self, abspath, f, mode=None):
        """Helper function so both put() and copy_abspaths can reuse the code"""
        tmp_abspath = '%s.tmp.%.9f.%d.%d' % (abspath, time.time(),
                        os.getpid(), random.randint(0,0x7FFFFFFF))
        fout = self._sftp_open_exclusive(tmp_abspath, mode=mode)
        closed = False
        try:
            try:
                fout.set_pipelined(True)
                self._pump(f, fout)
            except (IOError, paramiko.SSHException), e:
                self._translate_io_exception(e, tmp_abspath)
            # XXX: This doesn't truly help like we would like it to.
            #      The problem is that openssh strips sticky bits. So while we
            #      can properly set group write permission, we lose the group
            #      sticky bit. So it is probably best to stop chmodding, and
            #      just tell users that they need to set the umask correctly.
            #      The attr.st_mode = mode, in _sftp_open_exclusive
            #      will handle when the user wants the final mode to be more
            #      restrictive. And then we avoid a round trip. Unless
            #      paramiko decides to expose an async chmod()

            # This is designed to chmod() right before we close.
            # Because we set_pipelined() earlier, theoretically we might
            # avoid the round trip for fout.close()
            if mode is not None:
                self._sftp.chmod(tmp_abspath, mode)
            fout.close()
            closed = True
            self._rename_and_overwrite(tmp_abspath, abspath)
        except Exception, e:
            # If we fail, try to clean up the temporary file
            # before we throw the exception
            # but don't let another exception mess things up
            # Write out the traceback, because otherwise
            # the catch and throw destroys it
            import traceback
            mutter(traceback.format_exc())
            try:
                if not closed:
                    fout.close()
                self._sftp.remove(tmp_abspath)
            except:
                # raise the saved except
                raise e
            # raise the original with its traceback if we can.
            raise
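
    # Summary of _put() above (commentary added for clarity): data is written
    # to a uniquely named '<abspath>.tmp.<time>.<pid>.<random>' file opened
    # exclusively, optionally chmod-ed, and only then renamed over the real
    # target, so readers never observe a half-written file; on failure the
    # temporary file is cleaned up and the original error re-raised.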

    def _put_non_atomic_helper(self, relpath, writer, mode=None,
                               create_parent_dir=False,
                               dir_mode=None):
        abspath = self._remote_path(relpath)

        # TODO: jam 20060816 paramiko doesn't publicly expose a way to
        #       set the file mode at create time. If it does, use it.
        #       But for now, we just chmod later anyway.

        def _open_and_write_file():
            """Try to open the target file, raise error on failure"""
            fout = None
            try:
                try:
                    fout = self._sftp.file(abspath, mode='wb')
                    fout.set_pipelined(True)
                    writer(fout)
                except (paramiko.SSHException, IOError), e:
                    self._translate_io_exception(e, abspath,
                                                 ': unable to open')

                # This is designed to chmod() right before we close.
                # Because we set_pipelined() earlier, theoretically we might
                # avoid the round trip for fout.close()
                if mode is not None:
                    self._sftp.chmod(abspath, mode)
            finally:
                if fout is not None:
                    fout.close()

        if not create_parent_dir:
            _open_and_write_file()
            return

        # Try error handling to create the parent directory if we need to
        try:
            _open_and_write_file()
        except NoSuchFile:
            # Try to create the parent directory, and then go back to
            # writing the file
            parent_dir = os.path.dirname(abspath)
            self._mkdir(parent_dir, dir_mode)
            _open_and_write_file()

    def put_file_non_atomic(self, relpath, f, mode=None,
                            create_parent_dir=False,
                            dir_mode=None):
        """Copy the file-like object into the target location.

        This function is not strictly safe to use. It is only meant to
        be used when you already know that the target does not exist.
        It is not safe, because it will open and truncate the remote
        file. So there may be a time when the file has invalid contents.

        :param relpath: The remote location to put the contents.
        :param f:       File-like object.
        :param mode:    Possible access permissions for new file.
                        None means do not set remote permissions.
        :param create_parent_dir: If we cannot create the target file because
                        the parent directory does not exist, go ahead and
                        create it, and then try again.
        """
        def writer(fout):
            self._pump(f, fout)
        self._put_non_atomic_helper(relpath, writer, mode=mode,
                                    create_parent_dir=create_parent_dir,
                                    dir_mode=dir_mode)

    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
                             create_parent_dir=False,
                             dir_mode=None):
        def writer(fout):
            fout.write(bytes)
        self._put_non_atomic_helper(relpath, writer, mode=mode,
                                    create_parent_dir=create_parent_dir,
                                    dir_mode=dir_mode)

    def iter_files_recursive(self):
        """Walk the relative paths of all files in this transport."""
        queue = list(self.list_dir('.'))
        while queue:
            relpath = queue.pop(0)
            st = self.stat(relpath)
            if stat.S_ISDIR(st.st_mode):
                for i, basename in enumerate(self.list_dir(relpath)):
                    queue.insert(i, relpath+'/'+basename)
            else:
                yield relpath
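
    # Traversal note for iter_files_recursive() above (added commentary):
    # directory contents are inserted at the front of the queue, giving a
    # depth-first walk that yields only file paths -- e.g. entries 'a' (a
    # directory holding 'x' and 'y') and 'b' come out as 'a/x', 'a/y', 'b',
    # assuming the server lists 'a' before 'b'.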

    def _mkdir(self, abspath, mode=None):
        if mode is None:
            local_mode = 0777
        else:
            local_mode = mode
        try:
            self._sftp.mkdir(abspath, local_mode)
            if mode is not None:
                self._sftp.chmod(abspath, mode=mode)
        except (paramiko.SSHException, IOError), e:
            self._translate_io_exception(e, abspath, ': unable to mkdir',
                failure_exc=FileExists)

    def mkdir(self, relpath, mode=None):
        """Create a directory at the given path."""
        self._mkdir(self._remote_path(relpath), mode=mode)

    def _translate_io_exception(self, e, path, more_info='',
                                failure_exc=PathError):
        """Translate a paramiko or IOError into a friendlier exception.

        :param e: The original exception
        :param path: The path in question when the error is raised
        :param more_info: Extra information that can be included,
                          such as what was going on
        :param failure_exc: Paramiko has the super fun ability to raise completely
                          opaque errors that just set "e.args = ('Failure',)" with
                          no more information.
                          If this parameter is set, it defines the exception
                          to raise in these cases.
        """
        # paramiko seems to generate detailless errors.
        self._translate_error(e, path, raise_generic=False)
        if getattr(e, 'args', None) is not None:
            if (e.args == ('No such file or directory',) or
                e.args == ('No such file',)):
                raise NoSuchFile(path, str(e) + more_info)
            if (e.args == ('mkdir failed',)):
                raise FileExists(path, str(e) + more_info)
            # strange but true, for the paramiko server.
            if (e.args == ('Failure',)):
                raise failure_exc(path, str(e) + more_info)
            mutter('Raising exception with args %s', e.args)
        if getattr(e, 'errno', None) is not None:
            mutter('Raising exception with errno %s', e.errno)
        raise e
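
    # Examples of the translation above (added commentary): an IOError whose
    # args are ('No such file',) becomes NoSuchFile; paramiko's opaque
    # ('Failure',) raised during _mkdir() is reported as FileExists because
    # _mkdir() passes failure_exc=FileExists; anything unrecognised is
    # re-raised unchanged.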

    def append_file(self, relpath, f, mode=None):
        """
        Append the text in the file-like object into the final
        location.
        """
        try:
            path = self._remote_path(relpath)
            fout = self._sftp.file(path, 'ab')
            if mode is not None:
                self._sftp.chmod(path, mode)
            result = fout.tell()
            self._pump(f, fout)
            return result
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, relpath, ': unable to append')

    def rename(self, rel_from, rel_to):
        """Rename without special overwriting"""
        try:
            self._sftp.rename(self._remote_path(rel_from),
                              self._remote_path(rel_to))
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, rel_from,
                    ': unable to rename to %r' % (rel_to))

    def _rename_and_overwrite(self, abs_from, abs_to):
        """Do a fancy rename on the remote server.

        Using the implementation provided by osutils.
        """
        try:
            fancy_rename(abs_from, abs_to,
                    rename_func=self._sftp.rename,
                    unlink_func=self._sftp.remove)
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, abs_from, ': unable to rename to %r' % (abs_to))

    def move(self, rel_from, rel_to):
        """Move the item at rel_from to the location at rel_to"""
        path_from = self._remote_path(rel_from)
        path_to = self._remote_path(rel_to)
        self._rename_and_overwrite(path_from, path_to)

    def delete(self, relpath):
        """Delete the item at relpath"""
        path = self._remote_path(relpath)
        try:
            self._sftp.remove(path)
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, path, ': unable to delete')

    def listable(self):
        """Return True if this store supports listing."""
        return True

    def list_dir(self, relpath):
        """
        Return a list of all files at the given location.
        """
        # does anything actually use this?
        # This is at least used by copy_tree for remote upgrades.
        # -- David Allouche 2006-08-11
        path = self._remote_path(relpath)
        try:
            entries = self._sftp.listdir(path)
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, path, ': failed to list_dir')
        return [urlutils.escape(entry) for entry in entries]

    def rmdir(self, relpath):
        """See Transport.rmdir."""
        path = self._remote_path(relpath)
        try:
            return self._sftp.rmdir(path)
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, path, ': failed to rmdir')

    def stat(self, relpath):
        """Return the stat information for a file."""
        path = self._remote_path(relpath)
        try:
            return self._sftp.stat(path)
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, path, ': unable to stat')

    def lock_read(self, relpath):
        """
        Lock the given file for shared (read) access.
        :return: A lock object, which has an unlock() member function
        """
        # FIXME: there should be something clever i can do here...
        class BogusLock(object):
            def __init__(self, path):
                self.path = path
            def unlock(self):
                pass
        return BogusLock(relpath)

    def lock_write(self, relpath):
        """
        Lock the given file for exclusive (write) access.
        WARNING: many transports do not support this, so try to avoid using it.

        :return: A lock object, which has an unlock() member function
        """
        # This is a little bit bogus, but basically, we create a file
        # which should not already exist, and if it does, we assume
        # that there is a lock, and if it doesn't, then we assume
        # that we have taken the lock.
        return SFTPLock(relpath, self)
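
    # Hedged usage sketch for the locking API above (not in the original
    # source):
    #
    #   lock = transport.lock_write('some-file')  # creates some-file.write-lock
    #   try:
    #       pass                                   # ... work while holding it
    #   finally:
    #       lock.unlock()                          # removes the lock file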

    def _sftp_connect(self):
        """Connect to the remote sftp server.
        After this, self._sftp should have a valid connection (or
        we raise a TransportError 'could not connect').

        TODO: Raise a more reasonable ConnectionFailed exception
        """
        self._sftp = _sftp_connect(self._host, self._port, self._username,
                                   self._password)

    def _sftp_open_exclusive(self, abspath, mode=None):
        """Open a remote path exclusively.

        SFTP supports O_EXCL (SFTP_FLAG_EXCL), which fails if
        the file already exists. However it does not expose this
        at the higher level of SFTPClient.open(), so we have to
        use the lower-level request interface ourselves.

        WARNING: This breaks the SFTPClient abstraction, so it
        could easily break against an updated version of paramiko.

        :param abspath: The remote absolute path where the file should be opened
        :param mode: The mode permissions bits for the new file
        """
        # TODO: jam 20060816 Paramiko >= 1.6.2 (probably earlier) supports
        #       using the 'x' flag to indicate SFTP_FLAG_EXCL.
        #       However, there is no way to set the permission mode at open
        #       time using the sftp_client.file() functionality.
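
        # What follows (commentary added for clarity) builds the request by
        # hand: an SFTPAttributes carrying the desired st_mode, plus open
        # flags roughly equivalent to O_WRONLY|O_CREAT|O_TRUNC|O_EXCL, are
        # sent as a CMD_OPEN request, and the returned handle is wrapped in a
        # paramiko SFTPFile opened for writing.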
        path = self._sftp._adjust_cwd(abspath)
        # mutter('sftp abspath %s => %s', abspath, path)
        attr = SFTPAttributes()
        if mode is not None:
            attr.st_mode = mode
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE
                | SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
        try:
            t, msg = self._sftp._request(CMD_OPEN, path, omode, attr)
            if t != CMD_HANDLE:
                raise TransportError('Expected an SFTP handle')
            handle = msg.get_string()
            return SFTPFile(self._sftp, handle, 'wb', -1)
        except (paramiko.SSHException, IOError), e:
            self._translate_io_exception(e, abspath, ': unable to open',
                failure_exc=FileExists)

    def _can_roundtrip_unix_modebits(self):
        if sys.platform == 'win32':
            return False
        else:
            return True


# ------------- server test implementation --------------

from bzrlib.tests.stub_sftp import StubServer, StubSFTPServer

STUB_SERVER_KEY = """
-----BEGIN RSA PRIVATE KEY-----
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
-----END RSA PRIVATE KEY-----
"""


class SocketListener(threading.Thread):

    def __init__(self, callback):
        threading.Thread.__init__(self)
        self._callback = callback
        self._socket = socket.socket()
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._socket.bind(('localhost', 0))
        self._socket.listen(1)
        self.port = self._socket.getsockname()[1]
        self._stop_event = threading.Event()

    def stop(self):
        # called from outside this thread
        self._stop_event.set()
        # use a timeout here, because if the test fails, the server thread may
        # never notice the stop_event.
        self.join(5.0)

    def run(self):
        while True:
            readable, writable_unused, exception_unused = \
                select.select([self._socket], [], [], 0.1)
            if self._stop_event.isSet():
                return
            if len(readable) == 0:
                continue
            try:
                s, addr_unused = self._socket.accept()
                # because the loopback socket is inline, and transports are
                # never explicitly closed, best to launch a new thread.
                threading.Thread(target=self._callback, args=(s,)).start()
            except socket.error, x:
                sys.excepthook(*sys.exc_info())
                warning('Socket error during accept() within unit test server'
                        ' thread: %r' % x)
            except Exception, x:
                # probably a failed test; unit test thread will log the
                # failure/error
                sys.excepthook(*sys.exc_info())
                warning('Exception from within unit test server thread: %r' %
                        x)


class SocketDelay(object):
    """A socket decorator to make TCP appear slower.

    This changes recv, send, and sendall to add a fixed latency to each python
    call if a new roundtrip is detected. That is, when a recv is called and the
    flag new_roundtrip is set, latency is charged. Every send and send_all
    sets this flag.

    In addition every send, sendall and recv sleeps a bit per character sent to
    simulate bandwidth.

    Not all methods are implemented, this is deliberate as this class is not a
    replacement for the builtin sockets layer. fileno is not implemented to
    prevent the proxy being bypassed.
    """
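
    # Hedged usage sketch (added commentary, not in the original source):
    #
    #   SocketDelay.simulated_time = 0
    #   slow = SocketDelay(real_socket, latency=0.1, really_sleep=False)
    #   slow.sendall('hello')   # charges latency once for the new round trip
    #   slow.recv(1024)         # charges latency again, plus a per-byte cost
    #   print SocketDelay.simulated_time   # accumulated delay, no real sleeping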

    simulated_time = 0
    _proxied_arguments = dict.fromkeys([
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
        "setblocking", "setsockopt", "settimeout", "shutdown"])

    def __init__(self, sock, latency, bandwidth=1.0,
                 really_sleep=True):
        """
        :param bandwidth: simulated bandwidth (MegaBit)
        :param really_sleep: If set to false, the SocketDelay will just
        increase a counter, instead of calling time.sleep. This is useful for
        unittesting the SocketDelay.
        """
        self.sock = sock
        self.latency = latency
        self.really_sleep = really_sleep
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024)
        self.new_roundtrip = False

    def sleep(self, s):
        if self.really_sleep:
            time.sleep(s)
        else:
            SocketDelay.simulated_time += s

    def __getattr__(self, attr):
        if attr in SocketDelay._proxied_arguments:
            return getattr(self.sock, attr)
        raise AttributeError("'SocketDelay' object has no attribute %r" %
                             attr)

    def dup(self):
        return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
                           self.really_sleep)

    def recv(self, *args):
        data = self.sock.recv(*args)
        if data and self.new_roundtrip:
            self.new_roundtrip = False
            self.sleep(self.latency)
        self.sleep(len(data) * self.time_per_byte)
        return data

    def sendall(self, data, flags=0):
        if not self.new_roundtrip:
            self.new_roundtrip = True
            self.sleep(self.latency)
        self.sleep(len(data) * self.time_per_byte)
        return self.sock.sendall(data, flags)

    def send(self, data, flags=0):
        if not self.new_roundtrip:
            self.new_roundtrip = True
            self.sleep(self.latency)
        bytes_sent = self.sock.send(data, flags)
        self.sleep(bytes_sent * self.time_per_byte)
        return bytes_sent


class SFTPServer(Server):
    """Common code for SFTP server facilities."""

    def __init__(self):
        self._original_vendor = None
        self._homedir = None
        self._server_homedir = None
        self._listener = None
        self._root = None
        self._vendor = ssh.ParamikoVendor()
        self.logs = []
        self.add_latency = 0

    def _get_sftp_url(self, path):
        """Calculate an sftp url to this server for path."""
        return 'sftp://foo:bar@localhost:%d/%s' % (self._listener.port, path)

    def log(self, message):
        """StubServer uses this to log when a new server is created."""
        self.logs.append(message)

    def _run_server_entry(self, sock):
        """Entry point for all implementations of _run_server.

        If self.add_latency is > 0.000001 then sock is given a latency adding
        decorator.
        """
        if self.add_latency > 0.000001:
            sock = SocketDelay(sock, self.add_latency)
        return self._run_server(sock)

    def _run_server(self, s):
        ssh_server = paramiko.Transport(s)
        key_file = pathjoin(self._homedir, 'test_rsa.key')
        f = open(key_file, 'w')
        f.write(STUB_SERVER_KEY)
        f.close()
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
        ssh_server.add_server_key(host_key)
        server = StubServer(self)
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
                                         StubSFTPServer, root=self._root,
                                         home=self._server_homedir)
        event = threading.Event()
        ssh_server.start_server(event, server)
        event.wait(5.0)

    def setUp(self):
        self._original_vendor = ssh._ssh_vendor
        ssh._ssh_vendor = self._vendor
        if sys.platform == 'win32':
            # Win32 needs to use the UNICODE api
            self._homedir = getcwd()
        else:
            # But Linux SFTP servers should just deal in bytestreams
            self._homedir = os.getcwd()
        if self._server_homedir is None:
            self._server_homedir = self._homedir
        self._root = '/'
        if sys.platform == 'win32':
            self._root = ''
        self._listener = SocketListener(self._run_server_entry)
        self._listener.setDaemon(True)
        self._listener.start()

    def tearDown(self):
        """See bzrlib.transport.Server.tearDown."""
        self._listener.stop()
        ssh._ssh_vendor = self._original_vendor

    def get_bogus_url(self):
        """See bzrlib.transport.Server.get_bogus_url."""
        # this is chosen to try to prevent trouble with proxies, weird dns, etc
        # we bind a random socket, so that we get a guaranteed unused port
        # we just never listen on that port
        s = socket.socket()
        s.bind(('localhost', 0))
        return 'sftp://%s:%s/' % s.getsockname()


class SFTPFullAbsoluteServer(SFTPServer):
    """A test server for sftp transports, using absolute urls and ssh."""

    def get_url(self):
        """See bzrlib.transport.Server.get_url."""
        return self._get_sftp_url(urlutils.escape(self._homedir[1:]))


class SFTPServerWithoutSSH(SFTPServer):
    """An SFTP server that uses a simple TCP socket pair rather than SSH."""

    def __init__(self):
        super(SFTPServerWithoutSSH, self).__init__()
        self._vendor = ssh.LoopbackVendor()

    def _run_server(self, sock):
        # Re-import these as locals, so that they're still accessible during
        # interpreter shutdown (when all module globals get set to None, leading
        # to confusing errors like "'NoneType' object has no attribute 'error'").
        class FakeChannel(object):
            def get_transport(self):
                return self
            def get_log_channel(self):
                return 'paramiko'
            def get_name(self):
                return '1'
            def get_hexdump(self):
                return False
            def close(self):
                pass

        server = paramiko.SFTPServer(FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
                                     root=self._root, home=self._server_homedir)
        try:
            server.start_subsystem('sftp', None, sock)
        except socket.error, e:
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
                # it's okay for the client to disconnect abruptly
                # (bug in paramiko 1.6: it should absorb this exception)
                pass
            else:
                raise
        except Exception, e:
            import sys; sys.stderr.write('\nEXCEPTION %r\n\n' % e.__class__)
        server.finish_subsystem()


class SFTPAbsoluteServer(SFTPServerWithoutSSH):
    """A test server for sftp transports, using absolute urls."""

    def get_url(self):
        """See bzrlib.transport.Server.get_url."""
        if sys.platform == 'win32':
            return self._get_sftp_url(urlutils.escape(self._homedir))
        else:
            return self._get_sftp_url(urlutils.escape(self._homedir[1:]))


class SFTPHomeDirServer(SFTPServerWithoutSSH):
    """A test server for sftp transports, using homedir relative urls."""

    def get_url(self):
        """See bzrlib.transport.Server.get_url."""
        return self._get_sftp_url("~/")


class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
    """A test server for sftp transports, using absolute urls to non-home."""

    def setUp(self):
        self._server_homedir = '/dev/noone/runs/tests/here'
        super(SFTPSiblingAbsoluteServer, self).setUp()


def _sftp_connect(host, port, username, password):
    """Connect to the remote sftp server.

    :raises: a TransportError 'could not connect'.

    :returns: a paramiko.sftp_client.SFTPClient

    TODO: Raise a more reasonable ConnectionFailed exception
    """
    idx = (host, port, username)
    try:
        return _connected_hosts[idx]
    except KeyError:
        pass

    sftp = _sftp_connect_uncached(host, port, username, password)
    _connected_hosts[idx] = sftp
    return sftp


def _sftp_connect_uncached(host, port, username, password):
    vendor = ssh._get_ssh_vendor()
    sftp = vendor.connect_sftp(username, password, host, port)
    return sftp


def get_test_permutations():
    """Return the permutations to be used in testing."""
    return [(SFTPTransport, SFTPAbsoluteServer),
            (SFTPTransport, SFTPHomeDirServer),
            (SFTPTransport, SFTPSiblingAbsoluteServer),
            ]