1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
2
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; either version 2 of the License, or
7
# (at your option) any later version.
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
# GNU General Public License for more details.
14
# You should have received a copy of the GNU General Public License
15
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
"""Implementation of Transport over SFTP, using paramiko."""
20
# TODO: Remove the transport-based lock_read and lock_write methods. They'll
21
# then raise TransportNotPossible, which will break remote access to any
22
# formats which rely on OS-level locks. That should be fine as those formats
23
# are pretty old, but these combinations may have to be removed from the test
24
# suite. Those formats all date back to 0.7; so we should be able to remove
25
# these methods when we officially drop support for those formats.
45
from bzrlib.errors import (FileExists,
46
NoSuchFile, PathNotChild,
52
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
53
from bzrlib.symbol_versioning import (
56
from bzrlib.trace import mutter, warning
57
from bzrlib.transport import (
66
# Disable one particular warning that comes from paramiko in Python2.5; if
67
# this is emitted at the wrong time it tends to cause spurious test failures
68
# or at least noise in the test case::
70
# [1770/7639 in 86s, 1 known failures, 50 skipped, 2 missing features]
71
# test_permissions.TestSftpPermissions.test_new_files
72
# /var/lib/python-support/python2.5/paramiko/message.py:226: DeprecationWarning: integer argument expected, got float
73
# self.packet.write(struct.pack('>I', n))
74
# Ignore the specific paramiko DeprecationWarning described in the comment
# above; matching on message text, category and module keeps the filter
# narrow so other DeprecationWarnings are still reported.
warnings.filterwarnings('ignore',
                        'integer argument expected, got float',
                        category=DeprecationWarning,
                        module='paramiko.message')
81
except ImportError, e:
82
raise ParamikoNotPresent(e)
84
from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
85
SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
87
from paramiko.sftp_attr import SFTPAttributes
88
from paramiko.sftp_file import SFTPFile
91
# Paramiko publishes __version_info__ on modern releases; fall back to
# (0, 0, 0) so the comparison below simply disables prefetch when the
# attribute is absent.
_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))
96
class SFTPLock(object):
97
"""This fakes a lock in a remote location.
99
A present lock is indicated just by the existence of a file. This
100
doesn't work well on all transports and they are only used in
101
deprecated storage formats.
104
__slots__ = ['path', 'lock_path', 'lock_file', 'transport']
106
def __init__(self, path, transport):
107
self.lock_file = None
109
self.lock_path = path + '.write-lock'
110
self.transport = transport
112
# RBC 20060103 FIXME should we be using private methods here ?
113
abspath = transport._remote_path(self.lock_path)
114
self.lock_file = transport._sftp_open_exclusive(abspath)
116
raise LockError('File %r already locked' % (self.path,))
119
"""Should this warn, or actually try to cleanup?"""
121
warning("SFTPLock %r not explicitly unlocked" % (self.path,))
125
if not self.lock_file:
127
self.lock_file.close()
128
self.lock_file = None
130
self.transport.delete(self.lock_path)
131
except (NoSuchFile,):
132
# What specific errors should we catch here?
136
class _SFTPReadvHelper(object):
137
"""A class to help with managing the state of a readv request."""
139
# See _get_requests for an explanation.
140
_max_request_size = 32768
142
def __init__(self, original_offsets, relpath):
143
"""Create a new readv helper.
145
:param original_offsets: The original requests given by the caller of
147
:param relpath: The name of the file (if known)
149
self.original_offsets = list(original_offsets)
150
self.relpath = relpath
152
def _get_requests(self):
153
"""Break up the offsets into individual requests over sftp.
155
The SFTP spec only requires implementers to support 32kB requests. We
156
could try something larger (openssh supports 64kB), but then we have to
157
handle requests that fail.
158
So instead, we just break up our maximum chunks into 32kB chunks, and
159
asyncronously requests them.
160
Newer versions of paramiko would do the chunking for us, but we want to
161
start processing results right away, so we do it ourselves.
163
# TODO: Because we issue async requests, we don't 'fudge' any extra
164
# data. I'm not 100% sure that is the best choice.
166
# The first thing we do, is to collapse the individual requests as much
167
# as possible, so we don't issues requests <32kB
168
sorted_offsets = sorted(self.original_offsets)
169
coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
170
limit=0, fudge_factor=0))
172
for c_offset in coalesced:
173
start = c_offset.start
174
size = c_offset.length
176
# Break this up into 32kB requests
178
next_size = min(size, self._max_request_size)
179
requests.append((start, next_size))
182
mutter('SFTP.readv(%s) %s offsets => %s coalesced => %s requests',
183
self.relpath, len(sorted_offsets), len(coalesced),
187
def request_and_yield_offsets(self, fp):
188
"""Request the data from the remote machine, yielding the results.
190
:param fp: A Paramiko SFTPFile object that supports readv.
191
:return: Yield the data requested by the original readv caller, one by
194
requests = self._get_requests()
195
offset_iter = iter(self.original_offsets)
196
cur_offset, cur_size = offset_iter.next()
197
# paramiko .readv() yields strings that are in the order of the requests
198
# So we track the current request to know where the next data is
199
# being returned from.
205
# This is used to buffer chunks which we couldn't process yet
206
# It is (start, end, data) tuples.
208
# Create an 'unlimited' data stream, so we stop based on requests,
209
# rather than just because the data stream ended. This lets us detect
211
data_stream = itertools.chain(fp.readv(requests),
212
itertools.repeat(None))
213
for (start, length), data in itertools.izip(requests, data_stream):
215
if cur_coalesced is not None:
216
raise errors.ShortReadvError(self.relpath,
217
start, length, len(data))
218
if len(data) != length:
219
raise errors.ShortReadvError(self.relpath,
220
start, length, len(data))
222
# This is the first request, just buffer it
223
buffered_data = [data]
224
buffered_len = length
226
elif start == last_end:
227
# The data we are reading fits neatly on the previous
228
# buffer, so this is all part of a larger coalesced range.
229
buffered_data.append(data)
230
buffered_len += length
232
# We have an 'interrupt' in the data stream. So we know we are
233
# at a request boundary.
235
# We haven't consumed the buffer so far, so put it into
236
# data_chunks, and continue.
237
buffered = ''.join(buffered_data)
238
data_chunks.append((input_start, buffered))
240
buffered_data = [data]
241
buffered_len = length
242
last_end = start + length
243
if input_start == cur_offset and cur_size <= buffered_len:
244
# Simplify the next steps a bit by transforming buffered_data
245
# into a single string. We also have the nice property that
246
# when there is only one string ''.join([x]) == x, so there is
248
buffered = ''.join(buffered_data)
249
# Clean out buffered data so that we keep memory
253
# TODO: We *could* also consider the case where cur_offset is in
254
# in the buffered range, even though it doesn't *start*
255
# the buffered range. But for packs we pretty much always
256
# read in order, so you won't get any extra data in the
258
while (input_start == cur_offset
259
and (buffered_offset + cur_size) <= buffered_len):
260
# We've buffered enough data to process this request, spit it
262
cur_data = buffered[buffered_offset:buffered_offset + cur_size]
263
# move the direct pointer into our buffered data
264
buffered_offset += cur_size
265
# Move the start-of-buffer pointer
266
input_start += cur_size
267
# Yield the requested data
268
yield cur_offset, cur_data
269
cur_offset, cur_size = offset_iter.next()
270
# at this point, we've consumed as much of buffered as we can,
271
# so break off the portion that we consumed
272
if buffered_offset == len(buffered_data):
273
# No tail to leave behind
277
buffered = buffered[buffered_offset:]
278
buffered_data = [buffered]
279
buffered_len = len(buffered)
281
buffered = ''.join(buffered_data)
283
data_chunks.append((input_start, buffered))
285
mutter('SFTP readv left with %d out-of-order bytes',
286
sum(map(lambda x: len(x[1]), data_chunks)))
287
# We've processed all the readv data, at this point, anything we
288
# couldn't process is in data_chunks. This doesn't happen often, so
289
# this code path isn't optimized
290
# We use an interesting process for data_chunks
291
# Specifically if we have "bisect_left([(start, len, entries)],
293
# If start == qstart, then we get the specific node. Otherwise we
294
# get the previous node
296
idx = bisect.bisect_left(data_chunks, (cur_offset,))
297
if data_chunks[idx][0] == cur_offset: # The data starts here
298
data = data_chunks[idx][1][:cur_size]
300
# The data is in a portion of a previous page
302
sub_offset = cur_offset - data_chunks[idx][0]
303
data = data_chunks[idx][1]
304
data = data[sub_offset:sub_offset + cur_size]
306
# We are missing the page where the data should be found,
309
if len(data) != cur_size:
310
raise AssertionError('We must have miscalulated.'
311
' We expected %d bytes, but only found %d'
312
% (cur_size, len(data)))
313
yield cur_offset, data
314
cur_offset, cur_size = offset_iter.next()
317
class SFTPTransport(ConnectedTransport):
318
"""Transport implementation for SFTP access."""
320
_do_prefetch = _default_do_prefetch
321
# TODO: jam 20060717 Conceivably these could be configurable, either
322
# by auto-tuning at run-time, or by a configuration (per host??)
323
# but the performance curve is pretty flat, so just going with
324
# reasonable defaults.
325
_max_readv_combine = 200
326
# Having to round trip to the server means waiting for a response,
327
# so it is better to download extra bytes.
328
# 8KiB had good performance for both local and remote network operations
329
_bytes_to_read_before_seek = 8192
331
# The sftp spec says that implementations SHOULD allow reads
332
# to be at least 32K. paramiko.readv() does an async request
333
# for the chunks. So we need to keep it within a single request
334
# size for paramiko <= 1.6.1. paramiko 1.6.2 will probably chop
335
# up the request itself, rather than us having to worry about it
336
_max_request_size = 32768
338
def __init__(self, base, _from_transport=None):
    """Create an SFTP transport for ``base``.

    :param base: The base URL for this transport.
    :param _from_transport: Optional existing transport to inherit
        connection state from; passed straight through to
        ConnectedTransport.__init__.
    """
    super(SFTPTransport, self).__init__(base,
                                        _from_transport=_from_transport)
342
def _remote_path(self, relpath):
343
"""Return the path to be passed along the sftp protocol for relpath.
345
:param relpath: is a urlencoded string.
347
relative = urlutils.unescape(relpath).encode('utf-8')
348
remote_path = self._combine_paths(self._path, relative)
349
# the initial slash should be removed from the path, and treated as a
350
# homedir relative path (the path begins with a double slash if it is
351
# absolute). see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
352
# RBC 20060118 we are not using this as its too user hostile. instead
353
# we are following lftp and using /~/foo to mean '~/foo'
354
# vila--20070602 and leave absolute paths begin with a single slash.
355
if remote_path.startswith('/~/'):
356
remote_path = remote_path[3:]
357
elif remote_path == '/~':
361
def _create_connection(self, credentials=None):
362
"""Create a new connection with the provided credentials.
364
:param credentials: The credentials needed to establish the connection.
366
:return: The created connection and its associated credentials.
368
The credentials are only the password as it may have been entered
369
interactively by the user and may be different from the one provided
370
in base url at transport creation time.
372
if credentials is None:
373
password = self._password
375
password = credentials
377
vendor = ssh._get_ssh_vendor()
378
connection = vendor.connect_sftp(self._user, password,
379
self._host, self._port)
380
return connection, password
383
"""Ensures that a connection is established"""
384
connection = self._get_connection()
385
if connection is None:
386
# First connection ever
387
connection, credentials = self._create_connection()
388
self._set_connection(connection, credentials)
391
def has(self, relpath):
393
Does the target location exist?
396
self._get_sftp().stat(self._remote_path(relpath))
401
def get(self, relpath):
403
Get the file at the given relative path.
405
:param relpath: The relative path to the file
408
path = self._remote_path(relpath)
409
f = self._get_sftp().file(path, mode='rb')
410
if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
413
except (IOError, paramiko.SSHException), e:
414
self._translate_io_exception(e, path, ': error retrieving',
415
failure_exc=errors.ReadError)
417
def _readv(self, relpath, offsets):
418
"""See Transport.readv()"""
419
# We overload the default readv() because we want to use a file
420
# that does not have prefetch enabled.
421
# Also, if we have a new paramiko, it implements an async readv()
426
path = self._remote_path(relpath)
427
fp = self._get_sftp().file(path, mode='rb')
428
readv = getattr(fp, 'readv', None)
430
return self._sftp_readv(fp, offsets, relpath)
431
mutter('seek and read %s offsets', len(offsets))
432
return self._seek_and_read(fp, offsets, relpath)
433
except (IOError, paramiko.SSHException), e:
434
self._translate_io_exception(e, path, ': error retrieving')
436
def recommended_page_size(self):
437
"""See Transport.recommended_page_size().
439
For SFTP we suggest a large page size to reduce the overhead
440
introduced by latency.
444
def _sftp_readv(self, fp, offsets, relpath='<unknown>'):
445
"""Use the readv() member of fp to do async readv.
447
And then read them using paramiko.readv(). paramiko.readv()
448
does not support ranges > 64K, so it caps the request size, and
449
just reads until it gets all the stuff it wants
451
helper = _SFTPReadvHelper(offsets, relpath)
452
return helper.request_and_yield_offsets(fp)
454
def put_file(self, relpath, f, mode=None):
456
Copy the file-like object into the location.
458
:param relpath: Location to put the contents, relative to base.
459
:param f: File-like object.
460
:param mode: The final mode for the file
462
final_path = self._remote_path(relpath)
463
return self._put(final_path, f, mode=mode)
465
def _put(self, abspath, f, mode=None):
466
"""Helper function so both put() and copy_abspaths can reuse the code"""
467
tmp_abspath = '%s.tmp.%.9f.%d.%d' % (abspath, time.time(),
468
os.getpid(), random.randint(0,0x7FFFFFFF))
469
fout = self._sftp_open_exclusive(tmp_abspath, mode=mode)
473
fout.set_pipelined(True)
474
length = self._pump(f, fout)
475
except (IOError, paramiko.SSHException), e:
476
self._translate_io_exception(e, tmp_abspath)
477
# XXX: This doesn't truly help like we would like it to.
478
# The problem is that openssh strips sticky bits. So while we
479
# can properly set group write permission, we lose the group
480
# sticky bit. So it is probably best to stop chmodding, and
481
# just tell users that they need to set the umask correctly.
482
# The attr.st_mode = mode, in _sftp_open_exclusive
483
# will handle when the user wants the final mode to be more
484
# restrictive. And then we avoid a round trip. Unless
485
# paramiko decides to expose an async chmod()
487
# This is designed to chmod() right before we close.
488
# Because we set_pipelined() earlier, theoretically we might
489
# avoid the round trip for fout.close()
491
self._get_sftp().chmod(tmp_abspath, mode)
494
self._rename_and_overwrite(tmp_abspath, abspath)
497
# If we fail, try to clean up the temporary file
498
# before we throw the exception
499
# but don't let another exception mess things up
500
# Write out the traceback, because otherwise
501
# the catch and throw destroys it
503
mutter(traceback.format_exc())
507
self._get_sftp().remove(tmp_abspath)
509
# raise the saved except
511
# raise the original with its traceback if we can.
514
def _put_non_atomic_helper(self, relpath, writer, mode=None,
515
create_parent_dir=False,
517
abspath = self._remote_path(relpath)
519
# TODO: jam 20060816 paramiko doesn't publicly expose a way to
520
# set the file mode at create time. If it does, use it.
521
# But for now, we just chmod later anyway.
523
def _open_and_write_file():
524
"""Try to open the target file, raise error on failure"""
528
fout = self._get_sftp().file(abspath, mode='wb')
529
fout.set_pipelined(True)
531
except (paramiko.SSHException, IOError), e:
532
self._translate_io_exception(e, abspath,
535
# This is designed to chmod() right before we close.
536
# Because we set_pipelined() earlier, theoretically we might
537
# avoid the round trip for fout.close()
539
self._get_sftp().chmod(abspath, mode)
544
if not create_parent_dir:
545
_open_and_write_file()
548
# Try error handling to create the parent directory if we need to
550
_open_and_write_file()
552
# Try to create the parent directory, and then go back to
554
parent_dir = os.path.dirname(abspath)
555
self._mkdir(parent_dir, dir_mode)
556
_open_and_write_file()
558
def put_file_non_atomic(self, relpath, f, mode=None,
559
create_parent_dir=False,
561
"""Copy the file-like object into the target location.
563
This function is not strictly safe to use. It is only meant to
564
be used when you already know that the target does not exist.
565
It is not safe, because it will open and truncate the remote
566
file. So there may be a time when the file has invalid contents.
568
:param relpath: The remote location to put the contents.
569
:param f: File-like object.
570
:param mode: Possible access permissions for new file.
571
None means do not set remote permissions.
572
:param create_parent_dir: If we cannot create the target file because
573
the parent directory does not exist, go ahead and
574
create it, and then try again.
578
self._put_non_atomic_helper(relpath, writer, mode=mode,
579
create_parent_dir=create_parent_dir,
582
def put_bytes_non_atomic(self, relpath, bytes, mode=None,
583
create_parent_dir=False,
587
self._put_non_atomic_helper(relpath, writer, mode=mode,
588
create_parent_dir=create_parent_dir,
591
def iter_files_recursive(self):
592
"""Walk the relative paths of all files in this transport."""
593
queue = list(self.list_dir('.'))
595
relpath = queue.pop(0)
596
st = self.stat(relpath)
597
if stat.S_ISDIR(st.st_mode):
598
for i, basename in enumerate(self.list_dir(relpath)):
599
queue.insert(i, relpath+'/'+basename)
603
def _mkdir(self, abspath, mode=None):
609
self._get_sftp().mkdir(abspath, local_mode)
611
# chmod a dir through sftp will erase any sgid bit set
612
# on the server side. So, if the bit mode are already
613
# set, avoid the chmod. If the mode is not fine but
614
# the sgid bit is set, report a warning to the user
615
# with the umask fix.
616
stat = self._get_sftp().lstat(abspath)
617
mode = mode & 0777 # can't set special bits anyway
618
if mode != stat.st_mode & 0777:
619
if stat.st_mode & 06000:
620
warning('About to chmod %s over sftp, which will result'
621
' in its suid or sgid bits being cleared. If'
622
' you want to preserve those bits, change your '
623
' environment on the server to use umask 0%03o.'
624
% (abspath, 0777 - mode))
625
self._get_sftp().chmod(abspath, mode=mode)
626
except (paramiko.SSHException, IOError), e:
627
self._translate_io_exception(e, abspath, ': unable to mkdir',
628
failure_exc=FileExists)
630
def mkdir(self, relpath, mode=None):
    """Create a directory at the given path.

    :param relpath: url-encoded path, relative to the transport base.
    :param mode: Optional permission bits for the new directory; passed
        through to _mkdir.
    """
    self._mkdir(self._remote_path(relpath), mode=mode)
634
def open_write_stream(self, relpath, mode=None):
635
"""See Transport.open_write_stream."""
636
# initialise the file to zero-length
637
# this is three round trips, but we don't use this
638
# api more than once per write_group at the moment so
639
# it is a tolerable overhead. Better would be to truncate
640
# the file after opening. RBC 20070805
641
self.put_bytes_non_atomic(relpath, "", mode)
642
abspath = self._remote_path(relpath)
643
# TODO: jam 20060816 paramiko doesn't publicly expose a way to
644
# set the file mode at create time. If it does, use it.
645
# But for now, we just chmod later anyway.
648
handle = self._get_sftp().file(abspath, mode='wb')
649
handle.set_pipelined(True)
650
except (paramiko.SSHException, IOError), e:
651
self._translate_io_exception(e, abspath,
653
_file_streams[self.abspath(relpath)] = handle
654
return FileFileStream(self, relpath, handle)
656
def _translate_io_exception(self, e, path, more_info='',
657
failure_exc=PathError):
658
"""Translate a paramiko or IOError into a friendlier exception.
660
:param e: The original exception
661
:param path: The path in question when the error is raised
662
:param more_info: Extra information that can be included,
663
such as what was going on
664
:param failure_exc: Paramiko has the super fun ability to raise completely
665
opaque errors that just set "e.args = ('Failure',)" with
667
If this parameter is set, it defines the exception
668
to raise in these cases.
670
# paramiko seems to generate detailless errors.
671
self._translate_error(e, path, raise_generic=False)
672
if getattr(e, 'args', None) is not None:
673
if (e.args == ('No such file or directory',) or
674
e.args == ('No such file',)):
675
raise NoSuchFile(path, str(e) + more_info)
676
if (e.args == ('mkdir failed',) or
677
e.args[0].startswith('syserr: File exists')):
678
raise FileExists(path, str(e) + more_info)
679
# strange but true, for the paramiko server.
680
if (e.args == ('Failure',)):
681
raise failure_exc(path, str(e) + more_info)
682
mutter('Raising exception with args %s', e.args)
683
if getattr(e, 'errno', None) is not None:
684
mutter('Raising exception with errno %s', e.errno)
687
def append_file(self, relpath, f, mode=None):
689
Append the text in the file-like object into the final
693
path = self._remote_path(relpath)
694
fout = self._get_sftp().file(path, 'ab')
696
self._get_sftp().chmod(path, mode)
700
except (IOError, paramiko.SSHException), e:
701
self._translate_io_exception(e, relpath, ': unable to append')
703
def rename(self, rel_from, rel_to):
704
"""Rename without special overwriting"""
706
self._get_sftp().rename(self._remote_path(rel_from),
707
self._remote_path(rel_to))
708
except (IOError, paramiko.SSHException), e:
709
self._translate_io_exception(e, rel_from,
710
': unable to rename to %r' % (rel_to))
712
def _rename_and_overwrite(self, abs_from, abs_to):
713
"""Do a fancy rename on the remote server.
715
Using the implementation provided by osutils.
718
sftp = self._get_sftp()
719
fancy_rename(abs_from, abs_to,
720
rename_func=sftp.rename,
721
unlink_func=sftp.remove)
722
except (IOError, paramiko.SSHException), e:
723
self._translate_io_exception(e, abs_from,
724
': unable to rename to %r' % (abs_to))
726
def move(self, rel_from, rel_to):
    """Move the item at rel_from to the location at rel_to.

    Both paths are resolved to remote absolute paths first, then the
    rename is performed with overwrite semantics.

    :param rel_from: Source path, relative to the transport base.
    :param rel_to: Destination path, relative to the transport base.
    """
    path_from = self._remote_path(rel_from)
    path_to = self._remote_path(rel_to)
    self._rename_and_overwrite(path_from, path_to)
732
def delete(self, relpath):
733
"""Delete the item at relpath"""
734
path = self._remote_path(relpath)
736
self._get_sftp().remove(path)
737
except (IOError, paramiko.SSHException), e:
738
self._translate_io_exception(e, path, ': unable to delete')
740
def external_url(self):
741
"""See bzrlib.transport.Transport.external_url."""
742
# the external path for SFTP is the base
746
"""Return True if this store supports listing."""
749
def list_dir(self, relpath):
751
Return a list of all files at the given location.
753
# does anything actually use this?
755
# This is at least used by copy_tree for remote upgrades.
756
# -- David Allouche 2006-08-11
757
path = self._remote_path(relpath)
759
entries = self._get_sftp().listdir(path)
760
except (IOError, paramiko.SSHException), e:
761
self._translate_io_exception(e, path, ': failed to list_dir')
762
return [urlutils.escape(entry) for entry in entries]
764
def rmdir(self, relpath):
765
"""See Transport.rmdir."""
766
path = self._remote_path(relpath)
768
return self._get_sftp().rmdir(path)
769
except (IOError, paramiko.SSHException), e:
770
self._translate_io_exception(e, path, ': failed to rmdir')
772
def stat(self, relpath):
773
"""Return the stat information for a file."""
774
path = self._remote_path(relpath)
776
return self._get_sftp().stat(path)
777
except (IOError, paramiko.SSHException), e:
778
self._translate_io_exception(e, path, ': unable to stat')
780
def lock_read(self, relpath):
782
Lock the given file for shared (read) access.
783
:return: A lock object, which has an unlock() member function
785
# FIXME: there should be something clever i can do here...
786
class BogusLock(object):
787
def __init__(self, path):
791
return BogusLock(relpath)
793
def lock_write(self, relpath):
795
Lock the given file for exclusive (write) access.
796
WARNING: many transports do not support this, so trying avoid using it
798
:return: A lock object, which has an unlock() member function
800
# This is a little bit bogus, but basically, we create a file
801
# which should not already exist, and if it does, we assume
802
# that there is a lock, and if it doesn't, the we assume
803
# that we have taken the lock.
804
return SFTPLock(relpath, self)
806
def _sftp_open_exclusive(self, abspath, mode=None):
807
"""Open a remote path exclusively.
809
SFTP supports O_EXCL (SFTP_FLAG_EXCL), which fails if
810
the file already exists. However it does not expose this
811
at the higher level of SFTPClient.open(), so we have to
814
WARNING: This breaks the SFTPClient abstraction, so it
815
could easily break against an updated version of paramiko.
817
:param abspath: The remote absolute path where the file should be opened
818
:param mode: The mode permissions bits for the new file
820
# TODO: jam 20060816 Paramiko >= 1.6.2 (probably earlier) supports
821
# using the 'x' flag to indicate SFTP_FLAG_EXCL.
822
# However, there is no way to set the permission mode at open
823
# time using the sftp_client.file() functionality.
824
path = self._get_sftp()._adjust_cwd(abspath)
825
# mutter('sftp abspath %s => %s', abspath, path)
826
attr = SFTPAttributes()
829
omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE
830
| SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
832
t, msg = self._get_sftp()._request(CMD_OPEN, path, omode, attr)
834
raise TransportError('Expected an SFTP handle')
835
handle = msg.get_string()
836
return SFTPFile(self._get_sftp(), handle, 'wb', -1)
837
except (paramiko.SSHException, IOError), e:
838
self._translate_io_exception(e, abspath, ': unable to open',
839
failure_exc=FileExists)
841
def _can_roundtrip_unix_modebits(self):
842
if sys.platform == 'win32':
848
# ------------- server test implementation --------------
851
from bzrlib.tests.stub_sftp import StubServer, StubSFTPServer
853
STUB_SERVER_KEY = """
854
-----BEGIN RSA PRIVATE KEY-----
855
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
856
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
857
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
858
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
859
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
860
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
861
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
862
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
863
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
864
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
865
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
866
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
867
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
868
-----END RSA PRIVATE KEY-----
872
class SocketListener(threading.Thread):
874
def __init__(self, callback):
    """Create a listener bound to an OS-assigned port on localhost.

    :param callback: Callable invoked with each accepted socket (see the
        run loop, which starts a thread per connection).
    """
    threading.Thread.__init__(self)
    self._callback = callback
    self._socket = socket.socket()
    # Allow immediate rebinding of the port between test runs.
    self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self._socket.bind(('localhost', 0))
    self._socket.listen(1)
    # Record the ephemeral port the OS picked, so clients can connect.
    self.port = self._socket.getsockname()[1]
    self._stop_event = threading.Event()
885
# called from outside this thread
886
self._stop_event.set()
887
# use a timeout here, because if the test fails, the server thread may
888
# never notice the stop_event.
894
readable, writable_unused, exception_unused = \
895
select.select([self._socket], [], [], 0.1)
896
if self._stop_event.isSet():
898
if len(readable) == 0:
901
s, addr_unused = self._socket.accept()
902
# because the loopback socket is inline, and transports are
903
# never explicitly closed, best to launch a new thread.
904
threading.Thread(target=self._callback, args=(s,)).start()
905
except socket.error, x:
906
sys.excepthook(*sys.exc_info())
907
warning('Socket error during accept() within unit test server'
910
# probably a failed test; unit test thread will log the
912
sys.excepthook(*sys.exc_info())
913
warning('Exception from within unit test server thread: %r' %
917
class SocketDelay(object):
918
"""A socket decorator to make TCP appear slower.
920
This changes recv, send, and sendall to add a fixed latency to each python
921
call if a new roundtrip is detected. That is, when a recv is called and the
922
flag new_roundtrip is set, latency is charged. Every send and send_all
925
In addition every send, sendall and recv sleeps a bit per character send to
928
Not all methods are implemented, this is deliberate as this class is not a
929
replacement for the builtin sockets layer. fileno is not implemented to
930
prevent the proxy being bypassed.
934
_proxied_arguments = dict.fromkeys([
935
"close", "getpeername", "getsockname", "getsockopt", "gettimeout",
936
"setblocking", "setsockopt", "settimeout", "shutdown"])
938
def __init__(self, sock, latency, bandwidth=1.0,
941
:param bandwith: simulated bandwith (MegaBit)
942
:param really_sleep: If set to false, the SocketDelay will just
943
increase a counter, instead of calling time.sleep. This is useful for
944
unittesting the SocketDelay.
947
self.latency = latency
948
self.really_sleep = really_sleep
949
self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024)
950
self.new_roundtrip = False
953
if self.really_sleep:
956
SocketDelay.simulated_time += s
958
def __getattr__(self, attr):
959
if attr in SocketDelay._proxied_arguments:
960
return getattr(self.sock, attr)
961
raise AttributeError("'SocketDelay' object has no attribute %r" %
965
return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
968
def recv(self, *args):
969
data = self.sock.recv(*args)
970
if data and self.new_roundtrip:
971
self.new_roundtrip = False
972
self.sleep(self.latency)
973
self.sleep(len(data) * self.time_per_byte)
976
def sendall(self, data, flags=0):
    """Send all of *data*, charging simulated latency and bandwidth.

    Latency is charged only on the first send of a roundtrip (recv
    clears ``new_roundtrip``); per-byte transfer time is always charged.
    """
    if not self.new_roundtrip:
        self.new_roundtrip = True
        self.sleep(self.latency)
    self.sleep(len(data) * self.time_per_byte)
    return self.sock.sendall(data, flags)
983
def send(self, data, flags=0):
984
if not self.new_roundtrip:
985
self.new_roundtrip = True
986
self.sleep(self.latency)
987
bytes_sent = self.sock.send(data, flags)
988
self.sleep(bytes_sent * self.time_per_byte)
992
class SFTPServer(Server):
    """Common code for SFTP server facilities."""

    def __init__(self, server_interface=StubServer):
        self._original_vendor = None
        self._homedir = None
        self._server_homedir = None
        self._listener = None
        self._root = None
        self._vendor = ssh.ParamikoVendor()
        self._server_interface = server_interface
        # messages forwarded from the stub server; see log()
        self.logs = []
        # extra per-connection latency in seconds; see _run_server_entry
        self.add_latency = 0

    def _get_sftp_url(self, path):
        """Calculate an sftp url to this server for path."""
        return 'sftp://foo:bar@localhost:%d/%s' % (self._listener.port, path)

    def log(self, message):
        """StubServer uses this to log when a new server is created."""
        self.logs.append(message)

    def _run_server_entry(self, sock):
        """Entry point for all implementations of _run_server.

        If self.add_latency is > 0.000001 then sock is given a latency adding
        decorator.
        """
        if self.add_latency > 0.000001:
            sock = SocketDelay(sock, self.add_latency)
        return self._run_server(sock)

    def _run_server(self, s):
        ssh_server = paramiko.Transport(s)
        key_file = pathjoin(self._homedir, 'test_rsa.key')
        f = open(key_file, 'w')
        try:
            f.write(STUB_SERVER_KEY)
        finally:
            f.close()
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
        ssh_server.add_server_key(host_key)
        server = self._server_interface(self)
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
                                         StubSFTPServer, root=self._root,
                                         home=self._server_homedir)
        event = threading.Event()
        ssh_server.start_server(event, server)
        # NOTE(review): bounded wait restored from upstream bzrlib -- confirm
        event.wait(5.0)

    def setUp(self, backing_server=None):
        # XXX: TODO: make sftpserver back onto backing_server rather than local
        # disk.
        if not (backing_server is None or
                isinstance(backing_server, local.LocalURLServer)):
            raise AssertionError(
                "backing_server should not be %r, because this can only serve the "
                "local current working directory." % (backing_server,))
        self._original_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._vendor
        if sys.platform == 'win32':
            # Win32 needs to use the UNICODE api
            self._homedir = getcwd()
        else:
            # But Linux SFTP servers should just deal in bytestreams
            self._homedir = os.getcwd()
        if self._server_homedir is None:
            self._server_homedir = self._homedir
        # NOTE(review): root handling restored from upstream bzrlib -- confirm
        self._root = '/'
        if sys.platform == 'win32':
            self._root = ''
        self._listener = SocketListener(self._run_server_entry)
        self._listener.setDaemon(True)
        self._listener.start()

    def tearDown(self):
        """See bzrlib.transport.Server.tearDown."""
        self._listener.stop()
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._original_vendor

    def get_bogus_url(self):
        """See bzrlib.transport.Server.get_bogus_url."""
        # this is chosen to try to prevent trouble with proxies, wierd dns, etc
        # we bind a random socket, so that we get a guaranteed unused port
        # we just never listen on that port
        s = socket.socket()
        s.bind(('localhost', 0))
        return 'sftp://%s:%s/' % s.getsockname()

class SFTPFullAbsoluteServer(SFTPServer):
    """A test server for sftp transports, using absolute urls and ssh."""

    def get_url(self):
        """See bzrlib.transport.Server.get_url."""
        homedir = self._homedir
        if sys.platform != 'win32':
            # Remove the initial '/' on all platforms but win32
            homedir = homedir[1:]
        return self._get_sftp_url(urlutils.escape(homedir))

class SFTPServerWithoutSSH(SFTPServer):
1094
"""An SFTP server that uses a simple TCP socket pair rather than SSH."""
1097
super(SFTPServerWithoutSSH, self).__init__()
1098
self._vendor = ssh.LoopbackVendor()
1100
def _run_server(self, sock):
1101
# Re-import these as locals, so that they're still accessible during
1102
# interpreter shutdown (when all module globals get set to None, leading
1103
# to confusing errors like "'NoneType' object has no attribute 'error'".
1104
class FakeChannel(object):
1105
def get_transport(self):
1107
def get_log_channel(self):
1111
def get_hexdump(self):
1116
server = paramiko.SFTPServer(
1117
FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
1118
root=self._root, home=self._server_homedir)
1120
server.start_subsystem(
1121
'sftp', None, ssh.SocketAsChannelAdapter(sock))
1122
except socket.error, e:
1123
if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
1124
# it's okay for the client to disconnect abruptly
1125
# (bug in paramiko 1.6: it should absorb this exception)
1129
except Exception, e:
1130
# This typically seems to happen during interpreter shutdown, so
1131
# most of the useful ways to report this error are won't work.
1132
# Writing the exception type, and then the text of the exception,
1133
# seems to be the best we can do.
1135
sys.stderr.write('\nEXCEPTION %r: ' % (e.__class__,))
1136
sys.stderr.write('%s\n\n' % (e,))
1137
server.finish_subsystem()
1140
class SFTPAbsoluteServer(SFTPServerWithoutSSH):
    """A test server for sftp transports, using absolute urls."""

    def get_url(self):
        """See bzrlib.transport.Server.get_url."""
        homedir = self._homedir
        if sys.platform != 'win32':
            # Remove the initial '/' on all platforms but win32
            homedir = homedir[1:]
        return self._get_sftp_url(urlutils.escape(homedir))

class SFTPHomeDirServer(SFTPServerWithoutSSH):
    """A test server for sftp transports, using homedir relative urls."""

    def get_url(self):
        """See bzrlib.transport.Server.get_url."""
        return self._get_sftp_url("~/")

class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
    """A test server for sftp transports where only absolute paths will work.

    It does this by serving from a deeply-nested directory that doesn't exist.
    """

    def setUp(self, backing_server=None):
        # the dummy homedir is never created, so any relative access fails
        self._server_homedir = '/dev/noone/runs/tests/here'
        super(SFTPSiblingAbsoluteServer, self).setUp(backing_server)

def get_test_permutations():
1172
"""Return the permutations to be used in testing."""
1173
return [(SFTPTransport, SFTPAbsoluteServer),
1174
(SFTPTransport, SFTPHomeDirServer),
1175
(SFTPTransport, SFTPSiblingAbsoluteServer),