# Copyright (C) 2005 Robey Pointer <robey@lag.net>
# Copyright (C) 2005, 2006 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
"""Implementation of Transport over SFTP, using paramiko."""
33
from bzrlib.errors import (FileExists,
34
NoSuchFile, PathNotChild,
41
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
42
from bzrlib.trace import mutter, warning
43
from bzrlib.transport import (
44
register_urlparse_netloc_protocol,
50
import bzrlib.urlutils as urlutils
54
except ImportError, e:
55
raise ParamikoNotPresent(e)
57
from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
58
SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
60
from paramiko.sftp_attr import SFTPAttributes
61
from paramiko.sftp_file import SFTPFile
64
register_urlparse_netloc_protocol('sftp')
67
# This is a weakref dictionary, so that we can reuse connections
68
# that are still active. Long term, it might be nice to have some
69
# sort of expiration policy, such as disconnect if inactive for
70
# X seconds. But that requires a lot more fanciness.
71
_connected_hosts = weakref.WeakValueDictionary()
74
_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
75
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
76
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))
79
def clear_connection_cache():
80
"""Remove all hosts from the SFTP connection cache.
82
Primarily useful for test cases wanting to force garbage collection.
84
_connected_hosts.clear()
87
class SFTPLock(object):
88
"""This fakes a lock in a remote location."""
89
__slots__ = ['path', 'lock_path', 'lock_file', 'transport']
90
def __init__(self, path, transport):
91
assert isinstance(transport, SFTPTransport)
95
self.lock_path = path + '.write-lock'
96
self.transport = transport
98
# RBC 20060103 FIXME should we be using private methods here ?
99
abspath = transport._remote_path(self.lock_path)
100
self.lock_file = transport._sftp_open_exclusive(abspath)
102
raise LockError('File %r already locked' % (self.path,))
105
"""Should this warn, or actually try to cleanup?"""
107
warning("SFTPLock %r not explicitly unlocked" % (self.path,))
111
if not self.lock_file:
113
self.lock_file.close()
114
self.lock_file = None
116
self.transport.delete(self.lock_path)
117
except (NoSuchFile,):
118
# What specific errors should we catch here?
122
class SFTPTransport(Transport):
123
"""Transport implementation for SFTP access."""
125
_do_prefetch = _default_do_prefetch
126
# TODO: jam 20060717 Conceivably these could be configurable, either
127
# by auto-tuning at run-time, or by a configuration (per host??)
128
# but the performance curve is pretty flat, so just going with
129
# reasonable defaults.
130
_max_readv_combine = 200
131
# Having to round trip to the server means waiting for a response,
132
# so it is better to download extra bytes.
133
# 8KiB had good performance for both local and remote network operations
134
_bytes_to_read_before_seek = 8192
136
# The sftp spec says that implementations SHOULD allow reads
137
# to be at least 32K. paramiko.readv() does an async request
138
# for the chunks. So we need to keep it within a single request
139
# size for paramiko <= 1.6.1. paramiko 1.6.2 will probably chop
140
# up the request itself, rather than us having to worry about it
141
_max_request_size = 32768
143
def __init__(self, base, clone_from=None):
144
assert base.startswith('sftp://')
145
self._parse_url(base)
146
base = self._unparse_url()
149
super(SFTPTransport, self).__init__(base)
150
if clone_from is None:
153
# use the same ssh connection, etc
154
self._sftp = clone_from._sftp
155
# super saves 'self.base'
157
def should_cache(self):
159
Return True if the data pulled across should be cached locally.
163
def clone(self, offset=None):
165
Return a new SFTPTransport with root at self.base + offset.
166
We share the same SFTP session between such transports, because it's
167
fairly expensive to set them up.
170
return SFTPTransport(self.base, self)
172
return SFTPTransport(self.abspath(offset), self)
174
def abspath(self, relpath):
176
Return the full url to the given relative path.
178
@param relpath: the relative path or path components
179
@type relpath: str or list
181
return self._unparse_url(self._remote_path(relpath))
183
def _remote_path(self, relpath):
184
"""Return the path to be passed along the sftp protocol for relpath.
186
relpath is a urlencoded string.
188
# FIXME: share the common code across transports
189
assert isinstance(relpath, basestring)
190
relpath = urlutils.unescape(relpath).split('/')
191
basepath = self._path.split('/')
192
if len(basepath) > 0 and basepath[-1] == '':
193
basepath = basepath[:-1]
197
if len(basepath) == 0:
198
# In most filesystems, a request for the parent
199
# of root, just returns root.
207
path = '/'.join(basepath)
208
# mutter('relpath => remotepath %s => %s', relpath, path)
211
def relpath(self, abspath):
212
username, password, host, port, path = self._split_url(abspath)
214
if (username != self._username):
215
error.append('username mismatch')
216
if (host != self._host):
217
error.append('host mismatch')
218
if (port != self._port):
219
error.append('port mismatch')
220
if (not path.startswith(self._path)):
221
error.append('path mismatch')
223
extra = ': ' + ', '.join(error)
224
raise PathNotChild(abspath, self.base, extra=extra)
226
return path[pl:].strip('/')
228
def has(self, relpath):
230
Does the target location exist?
233
self._sftp.stat(self._remote_path(relpath))
238
def get(self, relpath):
240
Get the file at the given relative path.
242
:param relpath: The relative path to the file
245
path = self._remote_path(relpath)
246
f = self._sftp.file(path, mode='rb')
247
if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
250
except (IOError, paramiko.SSHException), e:
251
self._translate_io_exception(e, path, ': error retrieving')
253
def readv(self, relpath, offsets):
254
"""See Transport.readv()"""
255
# We overload the default readv() because we want to use a file
256
# that does not have prefetch enabled.
257
# Also, if we have a new paramiko, it implements an async readv()
262
path = self._remote_path(relpath)
263
fp = self._sftp.file(path, mode='rb')
264
readv = getattr(fp, 'readv', None)
266
return self._sftp_readv(fp, offsets)
267
mutter('seek and read %s offsets', len(offsets))
268
return self._seek_and_read(fp, offsets)
269
except (IOError, paramiko.SSHException), e:
270
self._translate_io_exception(e, path, ': error retrieving')
272
def _sftp_readv(self, fp, offsets):
273
"""Use the readv() member of fp to do async readv.
275
And then read them using paramiko.readv(). paramiko.readv()
276
does not support ranges > 64K, so it caps the request size, and
277
just reads until it gets all the stuff it wants
279
offsets = list(offsets)
280
sorted_offsets = sorted(offsets)
282
# The algorithm works as follows:
283
# 1) Coalesce nearby reads into a single chunk
284
# This generates a list of combined regions, the total size
285
# and the size of the sub regions. This coalescing step is limited
286
# in the number of nearby chunks to combine, and is allowed to
287
# skip small breaks in the requests. Limiting it makes sure that
288
# we can start yielding some data earlier, and skipping means we
289
# make fewer requests. (Beneficial even when using async)
290
# 2) Break up this combined regions into chunks that are smaller
291
# than 64KiB. Technically the limit is 65536, but we are a
292
# little bit conservative. This is because sftp has a maximum
293
# return chunk size of 64KiB (max size of an unsigned short)
294
# 3) Issue a readv() to paramiko to create an async request for
296
# 4) Read in the data as it comes back, until we've read one
297
# continuous section as determined in step 1
298
# 5) Break up the full sections into hunks for the original requested
299
# offsets. And put them in a cache
300
# 6) Check if the next request is in the cache, and if it is, remove
301
# it from the cache, and yield its data. Continue until no more
302
# entries are in the cache.
303
# 7) loop back to step 4 until all data has been read
305
# TODO: jam 20060725 This could be optimized one step further, by
306
# attempting to yield whatever data we have read, even before
307
# the first coallesced section has been fully processed.
309
# When coalescing for use with readv(), we don't really need to
310
# use any fudge factor, because the requests are made asynchronously
311
coalesced = list(self._coalesce_offsets(sorted_offsets,
312
limit=self._max_readv_combine,
316
for c_offset in coalesced:
317
start = c_offset.start
318
size = c_offset.length
320
# We need to break this up into multiple requests
322
next_size = min(size, self._max_request_size)
323
requests.append((start, next_size))
327
mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
328
len(offsets), len(coalesced), len(requests))
330
# Queue the current read until we have read the full coalesced section
333
cur_coalesced_stack = iter(coalesced)
334
cur_coalesced = cur_coalesced_stack.next()
336
# Cache the results, but only until they have been fulfilled
338
# turn the list of offsets into a stack
339
offset_stack = iter(offsets)
340
cur_offset_and_size = offset_stack.next()
342
for data in fp.readv(requests):
344
cur_data_len += len(data)
346
if cur_data_len < cur_coalesced.length:
348
assert cur_data_len == cur_coalesced.length, \
349
"Somehow we read too much: %s != %s" % (cur_data_len,
350
cur_coalesced.length)
351
all_data = ''.join(cur_data)
355
for suboffset, subsize in cur_coalesced.ranges:
356
key = (cur_coalesced.start+suboffset, subsize)
357
data_map[key] = all_data[suboffset:suboffset+subsize]
359
# Now that we've read some data, see if we can yield anything back
360
while cur_offset_and_size in data_map:
361
this_data = data_map.pop(cur_offset_and_size)
362
yield cur_offset_and_size[0], this_data
363
cur_offset_and_size = offset_stack.next()
365
# Now that we've read all of the data for this coalesced section
367
cur_coalesced = cur_coalesced_stack.next()
369
def put_file(self, relpath, f, mode=None):
371
Copy the file-like object into the location.
373
:param relpath: Location to put the contents, relative to base.
374
:param f: File-like object.
375
:param mode: The final mode for the file
377
final_path = self._remote_path(relpath)
378
self._put(final_path, f, mode=mode)
380
def _put(self, abspath, f, mode=None):
381
"""Helper function so both put() and copy_abspaths can reuse the code"""
382
tmp_abspath = '%s.tmp.%.9f.%d.%d' % (abspath, time.time(),
383
os.getpid(), random.randint(0,0x7FFFFFFF))
384
fout = self._sftp_open_exclusive(tmp_abspath, mode=mode)
388
fout.set_pipelined(True)
390
except (IOError, paramiko.SSHException), e:
391
self._translate_io_exception(e, tmp_abspath)
392
# XXX: This doesn't truly help like we would like it to.
393
# The problem is that openssh strips sticky bits. So while we
394
# can properly set group write permission, we lose the group
395
# sticky bit. So it is probably best to stop chmodding, and
396
# just tell users that they need to set the umask correctly.
397
# The attr.st_mode = mode, in _sftp_open_exclusive
398
# will handle when the user wants the final mode to be more
399
# restrictive. And then we avoid a round trip. Unless
400
# paramiko decides to expose an async chmod()
402
# This is designed to chmod() right before we close.
403
# Because we set_pipelined() earlier, theoretically we might
404
# avoid the round trip for fout.close()
406
self._sftp.chmod(tmp_abspath, mode)
409
self._rename_and_overwrite(tmp_abspath, abspath)
411
# If we fail, try to clean up the temporary file
412
# before we throw the exception
413
# but don't let another exception mess things up
414
# Write out the traceback, because otherwise
415
# the catch and throw destroys it
417
mutter(traceback.format_exc())
421
self._sftp.remove(tmp_abspath)
423
# raise the saved except
425
# raise the original with its traceback if we can.
428
def _put_non_atomic_helper(self, relpath, writer, mode=None,
429
create_parent_dir=False,
431
abspath = self._remote_path(relpath)
433
# TODO: jam 20060816 paramiko doesn't publicly expose a way to
434
# set the file mode at create time. If it does, use it.
435
# But for now, we just chmod later anyway.
437
def _open_and_write_file():
438
"""Try to open the target file, raise error on failure"""
442
fout = self._sftp.file(abspath, mode='wb')
443
fout.set_pipelined(True)
445
except (paramiko.SSHException, IOError), e:
446
self._translate_io_exception(e, abspath,
449
# This is designed to chmod() right before we close.
450
# Because we set_pipelined() earlier, theoretically we might
451
# avoid the round trip for fout.close()
453
self._sftp.chmod(abspath, mode)
458
if not create_parent_dir:
459
_open_and_write_file()
462
# Try error handling to create the parent directory if we need to
464
_open_and_write_file()
466
# Try to create the parent directory, and then go back to
468
parent_dir = os.path.dirname(abspath)
469
self._mkdir(parent_dir, dir_mode)
470
_open_and_write_file()
472
def put_file_non_atomic(self, relpath, f, mode=None,
473
create_parent_dir=False,
475
"""Copy the file-like object into the target location.
477
This function is not strictly safe to use. It is only meant to
478
be used when you already know that the target does not exist.
479
It is not safe, because it will open and truncate the remote
480
file. So there may be a time when the file has invalid contents.
482
:param relpath: The remote location to put the contents.
483
:param f: File-like object.
484
:param mode: Possible access permissions for new file.
485
None means do not set remote permissions.
486
:param create_parent_dir: If we cannot create the target file because
487
the parent directory does not exist, go ahead and
488
create it, and then try again.
492
self._put_non_atomic_helper(relpath, writer, mode=mode,
493
create_parent_dir=create_parent_dir,
496
def put_bytes_non_atomic(self, relpath, bytes, mode=None,
497
create_parent_dir=False,
501
self._put_non_atomic_helper(relpath, writer, mode=mode,
502
create_parent_dir=create_parent_dir,
505
def iter_files_recursive(self):
506
"""Walk the relative paths of all files in this transport."""
507
queue = list(self.list_dir('.'))
509
relpath = queue.pop(0)
510
st = self.stat(relpath)
511
if stat.S_ISDIR(st.st_mode):
512
for i, basename in enumerate(self.list_dir(relpath)):
513
queue.insert(i, relpath+'/'+basename)
517
def _mkdir(self, abspath, mode=None):
523
self._sftp.mkdir(abspath, local_mode)
525
self._sftp.chmod(abspath, mode=mode)
526
except (paramiko.SSHException, IOError), e:
527
self._translate_io_exception(e, abspath, ': unable to mkdir',
528
failure_exc=FileExists)
530
def mkdir(self, relpath, mode=None):
531
"""Create a directory at the given path."""
532
self._mkdir(self._remote_path(relpath), mode=mode)
534
def _translate_io_exception(self, e, path, more_info='',
535
failure_exc=PathError):
536
"""Translate a paramiko or IOError into a friendlier exception.
538
:param e: The original exception
539
:param path: The path in question when the error is raised
540
:param more_info: Extra information that can be included,
541
such as what was going on
542
:param failure_exc: Paramiko has the super fun ability to raise completely
543
opaque errors that just set "e.args = ('Failure',)" with
545
If this parameter is set, it defines the exception
546
to raise in these cases.
548
# paramiko seems to generate detailless errors.
549
self._translate_error(e, path, raise_generic=False)
550
if getattr(e, 'args', None) is not None:
551
if (e.args == ('No such file or directory',) or
552
e.args == ('No such file',)):
553
raise NoSuchFile(path, str(e) + more_info)
554
if (e.args == ('mkdir failed',)):
555
raise FileExists(path, str(e) + more_info)
556
# strange but true, for the paramiko server.
557
if (e.args == ('Failure',)):
558
raise failure_exc(path, str(e) + more_info)
559
mutter('Raising exception with args %s', e.args)
560
if getattr(e, 'errno', None) is not None:
561
mutter('Raising exception with errno %s', e.errno)
564
def append_file(self, relpath, f, mode=None):
566
Append the text in the file-like object into the final
570
path = self._remote_path(relpath)
571
fout = self._sftp.file(path, 'ab')
573
self._sftp.chmod(path, mode)
577
except (IOError, paramiko.SSHException), e:
578
self._translate_io_exception(e, relpath, ': unable to append')
580
def rename(self, rel_from, rel_to):
581
"""Rename without special overwriting"""
583
self._sftp.rename(self._remote_path(rel_from),
584
self._remote_path(rel_to))
585
except (IOError, paramiko.SSHException), e:
586
self._translate_io_exception(e, rel_from,
587
': unable to rename to %r' % (rel_to))
589
def _rename_and_overwrite(self, abs_from, abs_to):
590
"""Do a fancy rename on the remote server.
592
Using the implementation provided by osutils.
595
fancy_rename(abs_from, abs_to,
596
rename_func=self._sftp.rename,
597
unlink_func=self._sftp.remove)
598
except (IOError, paramiko.SSHException), e:
599
self._translate_io_exception(e, abs_from, ': unable to rename to %r' % (abs_to))
601
def move(self, rel_from, rel_to):
602
"""Move the item at rel_from to the location at rel_to"""
603
path_from = self._remote_path(rel_from)
604
path_to = self._remote_path(rel_to)
605
self._rename_and_overwrite(path_from, path_to)
607
def delete(self, relpath):
608
"""Delete the item at relpath"""
609
path = self._remote_path(relpath)
611
self._sftp.remove(path)
612
except (IOError, paramiko.SSHException), e:
613
self._translate_io_exception(e, path, ': unable to delete')
616
"""Return True if this store supports listing."""
619
def list_dir(self, relpath):
621
Return a list of all files at the given location.
623
# does anything actually use this?
625
# This is at least used by copy_tree for remote upgrades.
626
# -- David Allouche 2006-08-11
627
path = self._remote_path(relpath)
629
entries = self._sftp.listdir(path)
630
except (IOError, paramiko.SSHException), e:
631
self._translate_io_exception(e, path, ': failed to list_dir')
632
return [urlutils.escape(entry) for entry in entries]
634
def rmdir(self, relpath):
635
"""See Transport.rmdir."""
636
path = self._remote_path(relpath)
638
return self._sftp.rmdir(path)
639
except (IOError, paramiko.SSHException), e:
640
self._translate_io_exception(e, path, ': failed to rmdir')
642
def stat(self, relpath):
643
"""Return the stat information for a file."""
644
path = self._remote_path(relpath)
646
return self._sftp.stat(path)
647
except (IOError, paramiko.SSHException), e:
648
self._translate_io_exception(e, path, ': unable to stat')
650
def lock_read(self, relpath):
652
Lock the given file for shared (read) access.
653
:return: A lock object, which has an unlock() member function
655
# FIXME: there should be something clever i can do here...
656
class BogusLock(object):
657
def __init__(self, path):
661
return BogusLock(relpath)
663
def lock_write(self, relpath):
665
Lock the given file for exclusive (write) access.
666
WARNING: many transports do not support this, so trying avoid using it
668
:return: A lock object, which has an unlock() member function
670
# This is a little bit bogus, but basically, we create a file
671
# which should not already exist, and if it does, we assume
672
# that there is a lock, and if it doesn't, the we assume
673
# that we have taken the lock.
674
return SFTPLock(relpath, self)
676
def _unparse_url(self, path=None):
679
path = urllib.quote(path)
680
# handle homedir paths
681
if not path.startswith('/'):
683
netloc = urllib.quote(self._host)
684
if self._username is not None:
685
netloc = '%s@%s' % (urllib.quote(self._username), netloc)
686
if self._port is not None:
687
netloc = '%s:%d' % (netloc, self._port)
688
return urlparse.urlunparse(('sftp', netloc, path, '', '', ''))
690
def _split_url(self, url):
691
(scheme, username, password, host, port, path) = split_url(url)
692
assert scheme == 'sftp'
694
# the initial slash should be removed from the path, and treated
695
# as a homedir relative path (the path begins with a double slash
696
# if it is absolute).
697
# see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
698
# RBC 20060118 we are not using this as its too user hostile. instead
699
# we are following lftp and using /~/foo to mean '~/foo'.
700
# handle homedir paths
701
if path.startswith('/~/'):
705
return (username, password, host, port, path)
707
def _parse_url(self, url):
708
(self._username, self._password,
709
self._host, self._port, self._path) = self._split_url(url)
711
def _sftp_connect(self):
712
"""Connect to the remote sftp server.
713
After this, self._sftp should have a valid connection (or
714
we raise an TransportError 'could not connect').
716
TODO: Raise a more reasonable ConnectionFailed exception
718
self._sftp = _sftp_connect(self._host, self._port, self._username,
721
def _sftp_open_exclusive(self, abspath, mode=None):
722
"""Open a remote path exclusively.
724
SFTP supports O_EXCL (SFTP_FLAG_EXCL), which fails if
725
the file already exists. However it does not expose this
726
at the higher level of SFTPClient.open(), so we have to
729
WARNING: This breaks the SFTPClient abstraction, so it
730
could easily break against an updated version of paramiko.
732
:param abspath: The remote absolute path where the file should be opened
733
:param mode: The mode permissions bits for the new file
735
# TODO: jam 20060816 Paramiko >= 1.6.2 (probably earlier) supports
736
# using the 'x' flag to indicate SFTP_FLAG_EXCL.
737
# However, there is no way to set the permission mode at open
738
# time using the sftp_client.file() functionality.
739
path = self._sftp._adjust_cwd(abspath)
740
# mutter('sftp abspath %s => %s', abspath, path)
741
attr = SFTPAttributes()
744
omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE
745
| SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
747
t, msg = self._sftp._request(CMD_OPEN, path, omode, attr)
749
raise TransportError('Expected an SFTP handle')
750
handle = msg.get_string()
751
return SFTPFile(self._sftp, handle, 'wb', -1)
752
except (paramiko.SSHException, IOError), e:
753
self._translate_io_exception(e, abspath, ': unable to open',
754
failure_exc=FileExists)
756
def _can_roundtrip_unix_modebits(self):
757
if sys.platform == 'win32':
763
# ------------- server test implementation --------------
766
from bzrlib.tests.stub_sftp import StubServer, StubSFTPServer
768
STUB_SERVER_KEY = """
769
-----BEGIN RSA PRIVATE KEY-----
770
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
771
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
772
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
773
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
774
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
775
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
776
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
777
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
778
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
779
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
780
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
781
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
782
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
783
-----END RSA PRIVATE KEY-----
787
class SocketListener(threading.Thread):
789
def __init__(self, callback):
790
threading.Thread.__init__(self)
791
self._callback = callback
792
self._socket = socket.socket()
793
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
794
self._socket.bind(('localhost', 0))
795
self._socket.listen(1)
796
self.port = self._socket.getsockname()[1]
797
self._stop_event = threading.Event()
800
# called from outside this thread
801
self._stop_event.set()
802
# use a timeout here, because if the test fails, the server thread may
803
# never notice the stop_event.
809
readable, writable_unused, exception_unused = \
810
select.select([self._socket], [], [], 0.1)
811
if self._stop_event.isSet():
813
if len(readable) == 0:
816
s, addr_unused = self._socket.accept()
817
# because the loopback socket is inline, and transports are
818
# never explicitly closed, best to launch a new thread.
819
threading.Thread(target=self._callback, args=(s,)).start()
820
except socket.error, x:
821
sys.excepthook(*sys.exc_info())
822
warning('Socket error during accept() within unit test server'
825
# probably a failed test; unit test thread will log the
827
sys.excepthook(*sys.exc_info())
828
warning('Exception from within unit test server thread: %r' %
832
class SocketDelay(object):
833
"""A socket decorator to make TCP appear slower.
835
This changes recv, send, and sendall to add a fixed latency to each python
836
call if a new roundtrip is detected. That is, when a recv is called and the
837
flag new_roundtrip is set, latency is charged. Every send and send_all
840
In addition every send, sendall and recv sleeps a bit per character send to
843
Not all methods are implemented, this is deliberate as this class is not a
844
replacement for the builtin sockets layer. fileno is not implemented to
845
prevent the proxy being bypassed.
849
_proxied_arguments = dict.fromkeys([
850
"close", "getpeername", "getsockname", "getsockopt", "gettimeout",
851
"setblocking", "setsockopt", "settimeout", "shutdown"])
853
def __init__(self, sock, latency, bandwidth=1.0,
856
:param bandwith: simulated bandwith (MegaBit)
857
:param really_sleep: If set to false, the SocketDelay will just
858
increase a counter, instead of calling time.sleep. This is useful for
859
unittesting the SocketDelay.
862
self.latency = latency
863
self.really_sleep = really_sleep
864
self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024)
865
self.new_roundtrip = False
868
if self.really_sleep:
871
SocketDelay.simulated_time += s
873
def __getattr__(self, attr):
874
if attr in SocketDelay._proxied_arguments:
875
return getattr(self.sock, attr)
876
raise AttributeError("'SocketDelay' object has no attribute %r" %
880
return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
883
def recv(self, *args):
884
data = self.sock.recv(*args)
885
if data and self.new_roundtrip:
886
self.new_roundtrip = False
887
self.sleep(self.latency)
888
self.sleep(len(data) * self.time_per_byte)
891
def sendall(self, data, flags=0):
892
if not self.new_roundtrip:
893
self.new_roundtrip = True
894
self.sleep(self.latency)
895
self.sleep(len(data) * self.time_per_byte)
896
return self.sock.sendall(data, flags)
898
def send(self, data, flags=0):
899
if not self.new_roundtrip:
900
self.new_roundtrip = True
901
self.sleep(self.latency)
902
bytes_sent = self.sock.send(data, flags)
903
self.sleep(bytes_sent * self.time_per_byte)
907
class SFTPServer(Server):
908
"""Common code for SFTP server facilities."""
911
self._original_vendor = None
913
self._server_homedir = None
914
self._listener = None
916
self._vendor = ssh.ParamikoVendor()
921
def _get_sftp_url(self, path):
922
"""Calculate an sftp url to this server for path."""
923
return 'sftp://foo:bar@localhost:%d/%s' % (self._listener.port, path)
925
def log(self, message):
926
"""StubServer uses this to log when a new server is created."""
927
self.logs.append(message)
929
def _run_server_entry(self, sock):
930
"""Entry point for all implementations of _run_server.
932
If self.add_latency is > 0.000001 then sock is given a latency adding
935
if self.add_latency > 0.000001:
936
sock = SocketDelay(sock, self.add_latency)
937
return self._run_server(sock)
939
def _run_server(self, s):
940
ssh_server = paramiko.Transport(s)
941
key_file = pathjoin(self._homedir, 'test_rsa.key')
942
f = open(key_file, 'w')
943
f.write(STUB_SERVER_KEY)
945
host_key = paramiko.RSAKey.from_private_key_file(key_file)
946
ssh_server.add_server_key(host_key)
947
server = StubServer(self)
948
ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
949
StubSFTPServer, root=self._root,
950
home=self._server_homedir)
951
event = threading.Event()
952
ssh_server.start_server(event, server)
956
self._original_vendor = ssh._ssh_vendor
957
ssh._ssh_vendor = self._vendor
958
if sys.platform == 'win32':
959
# Win32 needs to use the UNICODE api
960
self._homedir = getcwd()
962
# But Linux SFTP servers should just deal in bytestreams
963
self._homedir = os.getcwd()
964
if self._server_homedir is None:
965
self._server_homedir = self._homedir
967
if sys.platform == 'win32':
969
self._listener = SocketListener(self._run_server_entry)
970
self._listener.setDaemon(True)
971
self._listener.start()
974
"""See bzrlib.transport.Server.tearDown."""
975
self._listener.stop()
976
ssh._ssh_vendor = self._original_vendor
978
def get_bogus_url(self):
979
"""See bzrlib.transport.Server.get_bogus_url."""
980
# this is chosen to try to prevent trouble with proxies, wierd dns, etc
981
# we bind a random socket, so that we get a guaranteed unused port
982
# we just never listen on that port
984
s.bind(('localhost', 0))
985
return 'sftp://%s:%s/' % s.getsockname()
988
class SFTPFullAbsoluteServer(SFTPServer):
989
"""A test server for sftp transports, using absolute urls and ssh."""
992
"""See bzrlib.transport.Server.get_url."""
993
return self._get_sftp_url(urlutils.escape(self._homedir[1:]))
996
class SFTPServerWithoutSSH(SFTPServer):
997
"""An SFTP server that uses a simple TCP socket pair rather than SSH."""
1000
super(SFTPServerWithoutSSH, self).__init__()
1001
self._vendor = ssh.LoopbackVendor()
1003
def _run_server(self, sock):
1004
# Re-import these as locals, so that they're still accessible during
1005
# interpreter shutdown (when all module globals get set to None, leading
1006
# to confusing errors like "'NoneType' object has no attribute 'error'".
1007
import socket, errno
1008
class FakeChannel(object):
1009
def get_transport(self):
1011
def get_log_channel(self):
1015
def get_hexdump(self):
1020
server = paramiko.SFTPServer(FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
1021
root=self._root, home=self._server_homedir)
1023
server.start_subsystem('sftp', None, sock)
1024
except socket.error, e:
1025
if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
1026
# it's okay for the client to disconnect abruptly
1027
# (bug in paramiko 1.6: it should absorb this exception)
1031
except Exception, e:
1032
import sys; sys.stderr.write('\nEXCEPTION %r\n\n' % e.__class__)
1033
server.finish_subsystem()
1036
class SFTPAbsoluteServer(SFTPServerWithoutSSH):
1037
"""A test server for sftp transports, using absolute urls."""
1040
"""See bzrlib.transport.Server.get_url."""
1041
if sys.platform == 'win32':
1042
return self._get_sftp_url(urlutils.escape(self._homedir))
1044
return self._get_sftp_url(urlutils.escape(self._homedir[1:]))
1047
class SFTPHomeDirServer(SFTPServerWithoutSSH):
1048
"""A test server for sftp transports, using homedir relative urls."""
1051
"""See bzrlib.transport.Server.get_url."""
1052
return self._get_sftp_url("~/")
1055
class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
1056
"""A test servere for sftp transports, using absolute urls to non-home."""
1059
self._server_homedir = '/dev/noone/runs/tests/here'
1060
super(SFTPSiblingAbsoluteServer, self).setUp()
1063
def _sftp_connect(host, port, username, password):
1064
"""Connect to the remote sftp server.
1066
:raises: a TransportError 'could not connect'.
1068
:returns: an paramiko.sftp_client.SFTPClient
1070
TODO: Raise a more reasonable ConnectionFailed exception
1072
idx = (host, port, username)
1074
return _connected_hosts[idx]
1078
sftp = _sftp_connect_uncached(host, port, username, password)
1079
_connected_hosts[idx] = sftp
1082
def _sftp_connect_uncached(host, port, username, password):
1083
vendor = ssh._get_ssh_vendor()
1084
sftp = vendor.connect_sftp(username, password, host, port)
1088
def get_test_permutations():
1089
"""Return the permutations to be used in testing."""
1090
return [(SFTPTransport, SFTPAbsoluteServer),
1091
(SFTPTransport, SFTPHomeDirServer),
1092
(SFTPTransport, SFTPSiblingAbsoluteServer),