# Copyright (C) 2005 Robey Pointer <robey@lag.net>
# Copyright (C) 2005, 2006 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Implementation of Transport over SFTP, using paramiko."""

# TODO: Remove the transport-based lock_read and lock_write methods. They'll
# then raise TransportNotPossible, which will break remote access to any
# formats which rely on OS-level locks. That should be fine as those formats
# are pretty old, but these combinations may have to be removed from the test

from bzrlib.errors import (FileExists,
                           NoSuchFile, PathNotChild,
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
from bzrlib.trace import mutter, warning
from bzrlib.transport import (
    register_urlparse_netloc_protocol,
import bzrlib.urlutils as urlutils
except ImportError, e:
    raise ParamikoNotPresent(e)
    from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
                               SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
    from paramiko.sftp_attr import SFTPAttributes
    from paramiko.sftp_file import SFTPFile

register_urlparse_netloc_protocol('sftp')

# This is a weakref dictionary, so that we can reuse connections
# that are still active. Long term, it might be nice to have some
# sort of expiration policy, such as disconnect if inactive for
# X seconds. But that requires a lot more fanciness.
_connected_hosts = weakref.WeakValueDictionary()
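# Illustrative sketch (not from the original module): the WeakValueDictionary
# behaviour that _connected_hosts relies on. An entry vanishes as soon as the
# last strong reference to the cached value is dropped, so the cache itself
# never keeps an idle connection alive.
#
#     import weakref
#     class _Connection(object):
#         pass
#     cache = weakref.WeakValueDictionary()
#     conn = _Connection()
#     cache[('example.com', 22, 'user')] = conn   # key mirrors (host, port, username)
#     assert ('example.com', 22, 'user') in cache
#     del conn                                    # last strong reference gone
#     assert ('example.com', 22, 'user') not in cache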
_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))


def clear_connection_cache():
    """Remove all hosts from the SFTP connection cache.

    Primarily useful for test cases wanting to force garbage collection.
    _connected_hosts.clear()


class SFTPLock(object):
    """This fakes a lock in a remote location.

    A present lock is indicated just by the existence of a file. This
    doesn't work well on all transports and they are only used in
    deprecated storage formats.

    __slots__ = ['path', 'lock_path', 'lock_file', 'transport']

    def __init__(self, path, transport):
        assert isinstance(transport, SFTPTransport)

        self.lock_file = None
        self.lock_path = path + '.write-lock'
        self.transport = transport
            # RBC 20060103 FIXME should we be using private methods here ?
            abspath = transport._remote_path(self.lock_path)
            self.lock_file = transport._sftp_open_exclusive(abspath)
            raise LockError('File %r already locked' % (self.path,))

        """Should this warn, or actually try to cleanup?"""
            warning("SFTPLock %r not explicitly unlocked" % (self.path,))

        if not self.lock_file:
        self.lock_file.close()
        self.lock_file = None
            self.transport.delete(self.lock_path)
        except (NoSuchFile,):
            # What specific errors should we catch here?
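# Illustrative usage sketch (not part of the original module): SFTPLock is
# handed out by SFTPTransport.lock_write() below. The "lock" is simply the
# exclusive creation of '<path>.write-lock'; a second locker hits FileExists
# inside __init__ and gets LockError instead.
#
#     t = SFTPTransport('sftp://user@host/srv/repo')   # hypothetical URL
#     lock = t.lock_write('inventory')                 # creates 'inventory.write-lock'
#     try:
#         pass                                         # ... modify 'inventory' ...
#     finally:
#         lock.unlock()                                # closes and deletes the lock file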
class SFTPTransport(Transport):
    """Transport implementation for SFTP access."""

    _do_prefetch = _default_do_prefetch
    # TODO: jam 20060717 Conceivably these could be configurable, either
    #       by auto-tuning at run-time, or by a configuration (per host??)
    #       but the performance curve is pretty flat, so just going with
    #       reasonable defaults.
    _max_readv_combine = 200
    # Having to round trip to the server means waiting for a response,
    # so it is better to download extra bytes.
    # 8KiB had good performance for both local and remote network operations
    _bytes_to_read_before_seek = 8192

    # The sftp spec says that implementations SHOULD allow reads
    # to be at least 32K. paramiko.readv() does an async request
    # for the chunks. So we need to keep it within a single request
    # size for paramiko <= 1.6.1. paramiko 1.6.2 will probably chop
    # up the request itself, rather than us having to worry about it
    _max_request_size = 32768

    def __init__(self, base, clone_from=None):
        assert base.startswith('sftp://')
        self._parse_url(base)
        base = self._unparse_url()
        super(SFTPTransport, self).__init__(base)
        if clone_from is None:
            # use the same ssh connection, etc
            self._sftp = clone_from._sftp
        # super saves 'self.base'

    def should_cache(self):
        Return True if the data pulled across should be cached locally.

    def clone(self, offset=None):
        Return a new SFTPTransport with root at self.base + offset.
        We share the same SFTP session between such transports, because it's
        fairly expensive to set them up.
            return SFTPTransport(self.base, self)
            return SFTPTransport(self.abspath(offset), self)

    def abspath(self, relpath):
        Return the full url to the given relative path.

        @param relpath: the relative path or path components
        @type relpath: str or list
        return self._unparse_url(self._remote_path(relpath))

    def _remote_path(self, relpath):
        """Return the path to be passed along the sftp protocol for relpath.

        relpath is a urlencoded string.

        :return: a path prefixed with / for regular abspath-based urls, or a
            path that does not begin with / for urls which begin with /~/.
        # FIXME: share the common code across transports
        assert isinstance(relpath, basestring)
        basepath = self._path.split('/')
        if relpath.startswith('/'):
        relpath = urlutils.unescape(relpath).split('/')
        if len(basepath) > 0 and basepath[-1] == '':
            basepath = basepath[:-1]
                if len(basepath) == 0:
                    # In most filesystems, a request for the parent
                    # of root, just returns root.
        path = '/'.join(basepath)
        # mutter('relpath => remotepath %s => %s', relpath, path)

    def relpath(self, abspath):
        username, password, host, port, path = self._split_url(abspath)
        if (username != self._username):
            error.append('username mismatch')
        if (host != self._host):
            error.append('host mismatch')
        if (port != self._port):
            error.append('port mismatch')
        if (not path.startswith(self._path)):
            error.append('path mismatch')
            extra = ': ' + ', '.join(error)
            raise PathNotChild(abspath, self.base, extra=extra)
        return path[pl:].strip('/')

    def has(self, relpath):
        Does the target location exist?
            self._sftp.stat(self._remote_path(relpath))

    def get(self, relpath):
        Get the file at the given relative path.

        :param relpath: The relative path to the file
            path = self._remote_path(relpath)
            f = self._sftp.file(path, mode='rb')
            if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, path, ': error retrieving')

    def readv(self, relpath, offsets):
        """See Transport.readv()"""
        # We overload the default readv() because we want to use a file
        # that does not have prefetch enabled.
        # Also, if we have a new paramiko, it implements an async readv()
            path = self._remote_path(relpath)
            fp = self._sftp.file(path, mode='rb')
            readv = getattr(fp, 'readv', None)
                return self._sftp_readv(fp, offsets)
            mutter('seek and read %s offsets', len(offsets))
            return self._seek_and_read(fp, offsets)
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, path, ': error retrieving')

    def _sftp_readv(self, fp, offsets):
        """Use the readv() member of fp to do async readv.

        And then read them using paramiko.readv(). paramiko.readv()
        does not support ranges > 64K, so it caps the request size, and
        just reads until it gets all the stuff it wants
        offsets = list(offsets)
        sorted_offsets = sorted(offsets)
        # The algorithm works as follows:
        # 1) Coalesce nearby reads into a single chunk
        #    This generates a list of combined regions, the total size
        #    and the size of the sub regions. This coalescing step is limited
        #    in the number of nearby chunks to combine, and is allowed to
        #    skip small breaks in the requests. Limiting it makes sure that
        #    we can start yielding some data earlier, and skipping means we
        #    make fewer requests. (Beneficial even when using async)
        # 2) Break up these combined regions into chunks that are smaller
        #    than 64KiB. Technically the limit is 65536, but we are a
        #    little bit conservative. This is because sftp has a maximum
        #    return chunk size of 64KiB (max size of an unsigned short)
        # 3) Issue a readv() to paramiko to create an async request for
        # 4) Read in the data as it comes back, until we've read one
        #    continuous section as determined in step 1
        # 5) Break up the full sections into hunks for the original requested
        #    offsets. And put them in a cache
        # 6) Check if the next request is in the cache, and if it is, remove
        #    it from the cache, and yield its data. Continue until no more
        #    entries are in the cache.
        # 7) Loop back to step 4 until all data has been read
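        # Worked example (illustrative numbers, not from the original source):
        #   offsets   = [(0, 100), (100, 150), (100000, 40000)]
        #   coalesced ~ [(start=0, length=250), (start=100000, length=40000)]
        #   requests  = [(0, 250), (100000, 32768), (132768, 7232)]
        # The two adjacent offsets merge in step 1; the 40000-byte region is
        # larger than _max_request_size (32768), so step 2 splits it in two.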
        # TODO: jam 20060725 This could be optimized one step further, by
        #       attempting to yield whatever data we have read, even before
        #       the first coalesced section has been fully processed.

        # When coalescing for use with readv(), we don't really need to
        # use any fudge factor, because the requests are made asynchronously
        coalesced = list(self._coalesce_offsets(sorted_offsets,
                               limit=self._max_readv_combine,
        for c_offset in coalesced:
            start = c_offset.start
            size = c_offset.length

            # We need to break this up into multiple requests
                next_size = min(size, self._max_request_size)
                requests.append((start, next_size))

        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
               len(offsets), len(coalesced), len(requests))

        # Queue the current read until we have read the full coalesced section
        cur_coalesced_stack = iter(coalesced)
        cur_coalesced = cur_coalesced_stack.next()

        # Cache the results, but only until they have been fulfilled
        # turn the list of offsets into a stack
        offset_stack = iter(offsets)
        cur_offset_and_size = offset_stack.next()

        for data in fp.readv(requests):
            cur_data_len += len(data)

            if cur_data_len < cur_coalesced.length:
            assert cur_data_len == cur_coalesced.length, \
                "Somehow we read too much: %s != %s" % (cur_data_len,
                                                        cur_coalesced.length)
            all_data = ''.join(cur_data)

            for suboffset, subsize in cur_coalesced.ranges:
                key = (cur_coalesced.start+suboffset, subsize)
                data_map[key] = all_data[suboffset:suboffset+subsize]

            # Now that we've read some data, see if we can yield anything back
            while cur_offset_and_size in data_map:
                this_data = data_map.pop(cur_offset_and_size)
                yield cur_offset_and_size[0], this_data
                cur_offset_and_size = offset_stack.next()

            # Now that we've read all of the data for this coalesced section
            cur_coalesced = cur_coalesced_stack.next()

    def put_file(self, relpath, f, mode=None):
        Copy the file-like object into the location.

        :param relpath: Location to put the contents, relative to base.
        :param f: File-like object.
        :param mode: The final mode for the file
        final_path = self._remote_path(relpath)
        self._put(final_path, f, mode=mode)

    def _put(self, abspath, f, mode=None):
        """Helper function so both put() and copy_abspaths can reuse the code"""
        tmp_abspath = '%s.tmp.%.9f.%d.%d' % (abspath, time.time(),
                        os.getpid(), random.randint(0,0x7FFFFFFF))
        fout = self._sftp_open_exclusive(tmp_abspath, mode=mode)
                fout.set_pipelined(True)
            except (IOError, paramiko.SSHException), e:
                self._translate_io_exception(e, tmp_abspath)
            # XXX: This doesn't truly help like we would like it to.
            #      The problem is that openssh strips sticky bits. So while we
            #      can properly set group write permission, we lose the group
            #      sticky bit. So it is probably best to stop chmodding, and
            #      just tell users that they need to set the umask correctly.
            #      The attr.st_mode = mode, in _sftp_open_exclusive
            #      will handle when the user wants the final mode to be more
            #      restrictive. And then we avoid a round trip. Unless
            #      paramiko decides to expose an async chmod()

            # This is designed to chmod() right before we close.
            # Because we set_pipelined() earlier, theoretically we might
            # avoid the round trip for fout.close()
                self._sftp.chmod(tmp_abspath, mode)
            self._rename_and_overwrite(tmp_abspath, abspath)
            # If we fail, try to clean up the temporary file
            # before we throw the exception
            # but don't let another exception mess things up
            # Write out the traceback, because otherwise
            # the catch and throw destroys it
            mutter(traceback.format_exc())
                self._sftp.remove(tmp_abspath)
                # raise the saved except
            # raise the original with its traceback if we can.
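    # Illustrative sketch (not part of the original module) of the
    # write-to-temp-then-rename pattern that _put() follows, expressed with
    # plain local file operations; readers see either the old file or the
    # complete new one, never a half-written target.
    #
    #     import os
    #     def atomic_put(path, data):
    #         tmp = '%s.tmp.%d' % (path, os.getpid())   # unique-ish temp name
    #         f = open(tmp, 'wb')
    #         try:
    #             f.write(data)
    #             f.close()
    #             os.rename(tmp, path)                  # atomic overwrite on POSIX
    #         except:
    #             f.close()
    #             os.unlink(tmp)                        # clean up, then re-raise
    #             raise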
    def _put_non_atomic_helper(self, relpath, writer, mode=None,
                               create_parent_dir=False,
        abspath = self._remote_path(relpath)

        # TODO: jam 20060816 paramiko doesn't publicly expose a way to
        #       set the file mode at create time. If it does, use it.
        #       But for now, we just chmod later anyway.

        def _open_and_write_file():
            """Try to open the target file, raise error on failure"""
                    fout = self._sftp.file(abspath, mode='wb')
                    fout.set_pipelined(True)
                except (paramiko.SSHException, IOError), e:
                    self._translate_io_exception(e, abspath,

                # This is designed to chmod() right before we close.
                # Because we set_pipelined() earlier, theoretically we might
                # avoid the round trip for fout.close()
                    self._sftp.chmod(abspath, mode)

        if not create_parent_dir:
            _open_and_write_file()

        # Try error handling to create the parent directory if we need to
            _open_and_write_file()
            # Try to create the parent directory, and then go back to
            parent_dir = os.path.dirname(abspath)
            self._mkdir(parent_dir, dir_mode)
            _open_and_write_file()

    def put_file_non_atomic(self, relpath, f, mode=None,
                            create_parent_dir=False,
        """Copy the file-like object into the target location.

        This function is not strictly safe to use. It is only meant to
        be used when you already know that the target does not exist.
        It is not safe, because it will open and truncate the remote
        file. So there may be a time when the file has invalid contents.

        :param relpath: The remote location to put the contents.
        :param f: File-like object.
        :param mode: Possible access permissions for new file.
                     None means do not set remote permissions.
        :param create_parent_dir: If we cannot create the target file because
                        the parent directory does not exist, go ahead and
                        create it, and then try again.
        self._put_non_atomic_helper(relpath, writer, mode=mode,
                                    create_parent_dir=create_parent_dir,

    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
                             create_parent_dir=False,
        self._put_non_atomic_helper(relpath, writer, mode=mode,
                                    create_parent_dir=create_parent_dir,

    def iter_files_recursive(self):
        """Walk the relative paths of all files in this transport."""
        queue = list(self.list_dir('.'))
            relpath = queue.pop(0)
            st = self.stat(relpath)
            if stat.S_ISDIR(st.st_mode):
                for i, basename in enumerate(self.list_dir(relpath)):
                    queue.insert(i, relpath+'/'+basename)

    def _mkdir(self, abspath, mode=None):
            self._sftp.mkdir(abspath, local_mode)
                self._sftp.chmod(abspath, mode=mode)
        except (paramiko.SSHException, IOError), e:
            self._translate_io_exception(e, abspath, ': unable to mkdir',
                failure_exc=FileExists)

    def mkdir(self, relpath, mode=None):
        """Create a directory at the given path."""
        self._mkdir(self._remote_path(relpath), mode=mode)

    def _translate_io_exception(self, e, path, more_info='',
                                failure_exc=PathError):
        """Translate a paramiko or IOError into a friendlier exception.

        :param e: The original exception
        :param path: The path in question when the error is raised
        :param more_info: Extra information that can be included,
                          such as what was going on
        :param failure_exc: Paramiko has the super fun ability to raise completely
                           opaque errors that just set "e.args = ('Failure',)" with
                           If this parameter is set, it defines the exception
                           to raise in these cases.
        # paramiko seems to generate detailless errors.
        self._translate_error(e, path, raise_generic=False)
        if getattr(e, 'args', None) is not None:
            if (e.args == ('No such file or directory',) or
                e.args == ('No such file',)):
                raise NoSuchFile(path, str(e) + more_info)
            if (e.args == ('mkdir failed',)):
                raise FileExists(path, str(e) + more_info)
            # strange but true, for the paramiko server.
            if (e.args == ('Failure',)):
                raise failure_exc(path, str(e) + more_info)
            mutter('Raising exception with args %s', e.args)
        if getattr(e, 'errno', None) is not None:
            mutter('Raising exception with errno %s', e.errno)
    def append_file(self, relpath, f, mode=None):
        Append the text in the file-like object into the final
            path = self._remote_path(relpath)
            fout = self._sftp.file(path, 'ab')
                self._sftp.chmod(path, mode)
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, relpath, ': unable to append')

    def rename(self, rel_from, rel_to):
        """Rename without special overwriting"""
            self._sftp.rename(self._remote_path(rel_from),
                              self._remote_path(rel_to))
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, rel_from,
                    ': unable to rename to %r' % (rel_to))

    def _rename_and_overwrite(self, abs_from, abs_to):
        """Do a fancy rename on the remote server.

        Using the implementation provided by osutils.
            fancy_rename(abs_from, abs_to,
                    rename_func=self._sftp.rename,
                    unlink_func=self._sftp.remove)
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, abs_from, ': unable to rename to %r' % (abs_to))

    def move(self, rel_from, rel_to):
        """Move the item at rel_from to the location at rel_to"""
        path_from = self._remote_path(rel_from)
        path_to = self._remote_path(rel_to)
        self._rename_and_overwrite(path_from, path_to)

    def delete(self, relpath):
        """Delete the item at relpath"""
        path = self._remote_path(relpath)
            self._sftp.remove(path)
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, path, ': unable to delete')

        """Return True if this store supports listing."""

    def list_dir(self, relpath):
        Return a list of all files at the given location.
        # does anything actually use this?
        # This is at least used by copy_tree for remote upgrades.
        #  -- David Allouche 2006-08-11
        path = self._remote_path(relpath)
            entries = self._sftp.listdir(path)
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, path, ': failed to list_dir')
        return [urlutils.escape(entry) for entry in entries]

    def rmdir(self, relpath):
        """See Transport.rmdir."""
        path = self._remote_path(relpath)
            return self._sftp.rmdir(path)
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, path, ': failed to rmdir')

    def stat(self, relpath):
        """Return the stat information for a file."""
        path = self._remote_path(relpath)
            return self._sftp.stat(path)
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, path, ': unable to stat')

    def lock_read(self, relpath):
        Lock the given file for shared (read) access.
        :return: A lock object, which has an unlock() member function
        # FIXME: there should be something clever i can do here...
        class BogusLock(object):
            def __init__(self, path):
        return BogusLock(relpath)

    def lock_write(self, relpath):
        Lock the given file for exclusive (write) access.
        WARNING: many transports do not support this, so try to avoid using it

        :return: A lock object, which has an unlock() member function
        # This is a little bit bogus, but basically, we create a file
        # which should not already exist, and if it does, we assume
        # that there is a lock, and if it doesn't, then we assume
        # that we have taken the lock.
        return SFTPLock(relpath, self)

    def _unparse_url(self, path=None):
        """Gives a url for a relative reference."""
        path = urllib.quote(path)
        # handle homedir paths
        if not path.startswith('/'):
        netloc = urllib.quote(self._host)
        if self._username is not None:
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
        if self._port is not None:
            netloc = '%s:%d' % (netloc, self._port)
        return urlparse.urlunparse(('sftp', netloc, path, '', '', ''))

    def _split_url(self, url):
        (scheme, username, password, host, port, path) = split_url(url)
        assert scheme == 'sftp'

        # the initial slash should be removed from the path, and treated
        # as a homedir relative path (the path begins with a double slash
        # if it is absolute).
        # see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
        # RBC 20060118 we are not using this as it's too user hostile. Instead
        # we are following lftp and using /~/foo to mean '~/foo'.
        # handle homedir paths
        if path.startswith('/~/'):
        return (username, password, host, port, path)
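    # Illustrative examples (not from the original source) of the lftp-style
    # homedir convention handled by _split_url and _unparse_url:
    #
    #     'sftp://user@host/~/repo'   -> path 'repo'       (relative to homedir)
    #     'sftp://user@host/srv/repo' -> path '/srv/repo'  (absolute)
    #
    # Going the other way, a path with no leading '/' is rendered back behind
    # '/~/' so that the relative/absolute distinction survives a round trip.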
    def _parse_url(self, url):
        (self._username, self._password,
         self._host, self._port, self._path) = self._split_url(url)

    def _sftp_connect(self):
        """Connect to the remote sftp server.

        After this, self._sftp should have a valid connection (or
        we raise a TransportError 'could not connect').

        TODO: Raise a more reasonable ConnectionFailed exception
        self._sftp = _sftp_connect(self._host, self._port, self._username,

    def _sftp_open_exclusive(self, abspath, mode=None):
        """Open a remote path exclusively.

        SFTP supports O_EXCL (SFTP_FLAG_EXCL), which fails if
        the file already exists. However it does not expose this
        at the higher level of SFTPClient.open(), so we have to

        WARNING: This breaks the SFTPClient abstraction, so it
        could easily break against an updated version of paramiko.

        :param abspath: The remote absolute path where the file should be opened
        :param mode: The mode permissions bits for the new file
        # TODO: jam 20060816 Paramiko >= 1.6.2 (probably earlier) supports
        #       using the 'x' flag to indicate SFTP_FLAG_EXCL.
        #       However, there is no way to set the permission mode at open
        #       time using the sftp_client.file() functionality.
        path = self._sftp._adjust_cwd(abspath)
        # mutter('sftp abspath %s => %s', abspath, path)
        attr = SFTPAttributes()
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE
                | SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
            t, msg = self._sftp._request(CMD_OPEN, path, omode, attr)
                raise TransportError('Expected an SFTP handle')
            handle = msg.get_string()
            return SFTPFile(self._sftp, handle, 'wb', -1)
        except (paramiko.SSHException, IOError), e:
            self._translate_io_exception(e, abspath, ': unable to open',
                failure_exc=FileExists)

    def _can_roundtrip_unix_modebits(self):
        if sys.platform == 'win32':


# ------------- server test implementation --------------
from bzrlib.tests.stub_sftp import StubServer, StubSFTPServer

STUB_SERVER_KEY = """
-----BEGIN RSA PRIVATE KEY-----
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
-----END RSA PRIVATE KEY-----
class SocketListener(threading.Thread):

    def __init__(self, callback):
        threading.Thread.__init__(self)
        self._callback = callback
        self._socket = socket.socket()
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._socket.bind(('localhost', 0))
        self._socket.listen(1)
        self.port = self._socket.getsockname()[1]
        self._stop_event = threading.Event()

        # called from outside this thread
        self._stop_event.set()
        # use a timeout here, because if the test fails, the server thread may
        # never notice the stop_event.

            readable, writable_unused, exception_unused = \
                select.select([self._socket], [], [], 0.1)
            if self._stop_event.isSet():
            if len(readable) == 0:
                s, addr_unused = self._socket.accept()
                # because the loopback socket is inline, and transports are
                # never explicitly closed, best to launch a new thread.
                threading.Thread(target=self._callback, args=(s,)).start()
            except socket.error, x:
                sys.excepthook(*sys.exc_info())
                warning('Socket error during accept() within unit test server'
                # probably a failed test; unit test thread will log the
                sys.excepthook(*sys.exc_info())
                warning('Exception from within unit test server thread: %r' %


class SocketDelay(object):
    """A socket decorator to make TCP appear slower.

    This changes recv, send, and sendall to add a fixed latency to each Python
    call if a new roundtrip is detected. That is, when a recv is called and the
    flag new_roundtrip is set, latency is charged. Every send and send_all

    In addition every send, sendall and recv sleeps a bit per character sent to

    Not all methods are implemented; this is deliberate, as this class is not a
    replacement for the builtin sockets layer. fileno is not implemented to
    prevent the proxy being bypassed.

    _proxied_arguments = dict.fromkeys([
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
        "setblocking", "setsockopt", "settimeout", "shutdown"])

    def __init__(self, sock, latency, bandwidth=1.0,
        :param bandwidth: simulated bandwidth (MegaBit)
        :param really_sleep: If set to false, the SocketDelay will just
        increase a counter, instead of calling time.sleep. This is useful for
        unittesting the SocketDelay.
        self.latency = latency
        self.really_sleep = really_sleep
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024)
        self.new_roundtrip = False

        if self.really_sleep:
            SocketDelay.simulated_time += s

    def __getattr__(self, attr):
        if attr in SocketDelay._proxied_arguments:
            return getattr(self.sock, attr)
        raise AttributeError("'SocketDelay' object has no attribute %r" %

        return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,

    def recv(self, *args):
        data = self.sock.recv(*args)
        if data and self.new_roundtrip:
            self.new_roundtrip = False
            self.sleep(self.latency)
        self.sleep(len(data) * self.time_per_byte)

    def sendall(self, data, flags=0):
        if not self.new_roundtrip:
            self.new_roundtrip = True
            self.sleep(self.latency)
        self.sleep(len(data) * self.time_per_byte)
        return self.sock.sendall(data, flags)

    def send(self, data, flags=0):
        if not self.new_roundtrip:
            self.new_roundtrip = True
            self.sleep(self.latency)
        bytes_sent = self.sock.send(data, flags)
        self.sleep(bytes_sent * self.time_per_byte)
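# Illustrative usage sketch (not part of the original module): wrapping a real
# socket in SocketDelay to simulate a slow link. The first send of a round
# trip and the first recv of the reply each pay `latency`; every byte also
# costs time_per_byte seconds, derived from the megabit `bandwidth` figure.
#
#     plain = socket.socket()
#     plain.connect(('localhost', 2222))                   # hypothetical endpoint
#     slow = SocketDelay(plain, latency=0.1, bandwidth=1.0)
#     slow.sendall('hello')                                # charges latency + 5 bytes
#     reply = slow.recv(1024)                              # reply charges latency again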
class SFTPServer(Server):
    """Common code for SFTP server facilities."""

        self._original_vendor = None
        self._server_homedir = None
        self._listener = None
        self._vendor = ssh.ParamikoVendor()

    def _get_sftp_url(self, path):
        """Calculate an sftp url to this server for path."""
        return 'sftp://foo:bar@localhost:%d/%s' % (self._listener.port, path)
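        # For example (illustrative values only): with the listener on port
        # 2222 and path 'home/user/branch' this produces
        #     'sftp://foo:bar@localhost:2222/home/user/branch'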
    def log(self, message):
        """StubServer uses this to log when a new server is created."""
        self.logs.append(message)

    def _run_server_entry(self, sock):
        """Entry point for all implementations of _run_server.

        If self.add_latency is > 0.000001 then sock is given a latency adding
        if self.add_latency > 0.000001:
            sock = SocketDelay(sock, self.add_latency)
        return self._run_server(sock)

    def _run_server(self, s):
        ssh_server = paramiko.Transport(s)
        key_file = pathjoin(self._homedir, 'test_rsa.key')
        f = open(key_file, 'w')
        f.write(STUB_SERVER_KEY)
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
        ssh_server.add_server_key(host_key)
        server = StubServer(self)
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
                                         StubSFTPServer, root=self._root,
                                         home=self._server_homedir)
        event = threading.Event()
        ssh_server.start_server(event, server)

        self._original_vendor = ssh._ssh_vendor
        ssh._ssh_vendor = self._vendor
        if sys.platform == 'win32':
            # Win32 needs to use the UNICODE api
            self._homedir = getcwd()
            # But Linux SFTP servers should just deal in bytestreams
            self._homedir = os.getcwd()
        if self._server_homedir is None:
            self._server_homedir = self._homedir
        if sys.platform == 'win32':
        self._listener = SocketListener(self._run_server_entry)
        self._listener.setDaemon(True)
        self._listener.start()

        """See bzrlib.transport.Server.tearDown."""
        self._listener.stop()
        ssh._ssh_vendor = self._original_vendor

    def get_bogus_url(self):
        """See bzrlib.transport.Server.get_bogus_url."""
        # this is chosen to try to prevent trouble with proxies, weird dns, etc
        # we bind a random socket, so that we get a guaranteed unused port
        # we just never listen on that port
        s.bind(('localhost', 0))
        return 'sftp://%s:%s/' % s.getsockname()
class SFTPFullAbsoluteServer(SFTPServer):
    """A test server for sftp transports, using absolute urls and ssh."""

        """See bzrlib.transport.Server.get_url."""
        return self._get_sftp_url(urlutils.escape(self._homedir[1:]))


class SFTPServerWithoutSSH(SFTPServer):
    """An SFTP server that uses a simple TCP socket pair rather than SSH."""

        super(SFTPServerWithoutSSH, self).__init__()
        self._vendor = ssh.LoopbackVendor()

    def _run_server(self, sock):
        # Re-import these as locals, so that they're still accessible during
        # interpreter shutdown (when all module globals get set to None, leading
        # to confusing errors like "'NoneType' object has no attribute 'error'").
        import socket, errno
        class FakeChannel(object):
            def get_transport(self):
            def get_log_channel(self):
            def get_hexdump(self):

        server = paramiko.SFTPServer(FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
                                     root=self._root, home=self._server_homedir)
            server.start_subsystem('sftp', None, sock)
        except socket.error, e:
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
                # it's okay for the client to disconnect abruptly
                # (bug in paramiko 1.6: it should absorb this exception)
        except Exception, e:
            import sys; sys.stderr.write('\nEXCEPTION %r\n\n' % e.__class__)
        server.finish_subsystem()


class SFTPAbsoluteServer(SFTPServerWithoutSSH):
    """A test server for sftp transports, using absolute urls."""

        """See bzrlib.transport.Server.get_url."""
        if sys.platform == 'win32':
            return self._get_sftp_url(urlutils.escape(self._homedir))
            return self._get_sftp_url(urlutils.escape(self._homedir[1:]))


class SFTPHomeDirServer(SFTPServerWithoutSSH):
    """A test server for sftp transports, using homedir relative urls."""

        """See bzrlib.transport.Server.get_url."""
        return self._get_sftp_url("~/")


class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
    """A test server for sftp transports, using absolute urls to non-home."""

        self._server_homedir = '/dev/noone/runs/tests/here'
        super(SFTPSiblingAbsoluteServer, self).setUp()


def _sftp_connect(host, port, username, password):
    """Connect to the remote sftp server.

    :raises: a TransportError 'could not connect'.

    :returns: a paramiko.sftp_client.SFTPClient

    TODO: Raise a more reasonable ConnectionFailed exception
    idx = (host, port, username)
        return _connected_hosts[idx]

    sftp = _sftp_connect_uncached(host, port, username, password)
    _connected_hosts[idx] = sftp


def _sftp_connect_uncached(host, port, username, password):
    vendor = ssh._get_ssh_vendor()
    sftp = vendor.connect_sftp(username, password, host, port)


def get_test_permutations():
    """Return the permutations to be used in testing."""
    return [(SFTPTransport, SFTPAbsoluteServer),
            (SFTPTransport, SFTPHomeDirServer),
            (SFTPTransport, SFTPSiblingAbsoluteServer),
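# Illustrative sketch (not part of the original module): one way a test
# harness might consume these permutations, starting each server and running
# transport tests against it.
#
#     for transport_class, server_class in get_test_permutations():
#         server = server_class()
#         server.setUp()
#         try:
#             t = transport_class(server.get_url())
#             # ... exercise the transport interface against t ...
#         finally:
#             server.tearDown()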