1
# Copyright (C) 2005-2010 Canonical Ltd
1
# Copyright (C) 2005-2011, 2016, 2017 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Implementation of Transport over SFTP, using paramiko."""
19
from __future__ import absolute_import
19
21
# TODO: Remove the transport-based lock_read and lock_write methods. They'll
20
22
# then raise TransportNotPossible, which will break remote access to any
21
23
# formats which rely on OS-level locks. That should be fine as those formats
44
from bzrlib.errors import (FileExists,
45
NoSuchFile, PathNotChild,
44
from ..errors import (FileExists,
49
49
ParamikoNotPresent,
51
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
52
from bzrlib.symbol_versioning import (
55
from bzrlib.trace import mutter, warning
56
from bzrlib.transport import (
51
from ..osutils import fancy_rename
52
from ..sixish import (
55
from ..trace import mutter, warning
56
from ..transport import (
62
60
ConnectedTransport,
87
85
from paramiko.sftp_file import SFTPFile
90
_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
91
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
92
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))
88
# GZ 2017-05-25: Some dark hackery to monkeypatch out issues with paramiko's
89
# Python 3 compatibility code. Replace broken b() and asbytes() code.
91
from paramiko.py3compat import b as _bad
92
from paramiko.common import asbytes as _bad_asbytes
96
def _b_for_broken_paramiko(s, encoding='utf8'):
97
"""Hacked b() that does not raise TypeError."""
98
# https://github.com/paramiko/paramiko/issues/967
99
if not isinstance(s, bytes):
100
encode = getattr(s, 'encode', None)
101
if encode is not None:
102
return encode(encoding)
103
# Would like to pass buffer objects along, but have to realise.
104
tobytes = getattr(s, 'tobytes', None)
105
if tobytes is not None:
109
def _asbytes_for_broken_paramiko(s):
110
"""Hacked asbytes() that does not raise Exception."""
111
# https://github.com/paramiko/paramiko/issues/968
112
if not isinstance(s, bytes):
113
encode = getattr(s, 'encode', None)
114
if encode is not None:
115
return encode('utf8')
116
asbytes = getattr(s, 'asbytes', None)
117
if asbytes is not None:
121
_bad.__code__ = _b_for_broken_paramiko.__code__
122
_bad_asbytes.__code__ = _asbytes_for_broken_paramiko.__code__
95
125
class SFTPLock(object):
114
144
except FileExists:
115
145
raise LockError('File %r already locked' % (self.path,))
118
"""Should this warn, or actually try to cleanup?"""
120
warning("SFTPLock %r not explicitly unlocked" % (self.path,))
123
147
def unlock(self):
124
148
if not self.lock_file:
197
221
requests = self._get_requests()
198
222
offset_iter = iter(self.original_offsets)
199
cur_offset, cur_size = offset_iter.next()
223
cur_offset, cur_size = next(offset_iter)
200
224
# paramiko .readv() yields strings that are in the order of the requests
201
225
# So we track the current request to know where the next data is
202
226
# being returned from.
214
238
data_stream = itertools.chain(fp.readv(requests),
215
239
itertools.repeat(None))
216
for (start, length), data in itertools.izip(requests, data_stream):
240
for (start, length), data in zip(requests, data_stream):
218
242
if cur_coalesced is not None:
219
243
raise errors.ShortReadvError(self.relpath,
238
262
if buffered_len > 0:
239
263
# We haven't consumed the buffer so far, so put it into
240
264
# data_chunks, and continue.
241
buffered = ''.join(buffered_data)
265
buffered = b''.join(buffered_data)
242
266
data_chunks.append((input_start, buffered))
243
267
input_start = start
244
268
buffered_data = [data]
249
273
# into a single string. We also have the nice property that
250
274
# when there is only one string ''.join([x]) == x, so there is
251
275
# no data copying.
252
buffered = ''.join(buffered_data)
276
buffered = b''.join(buffered_data)
253
277
# Clean out buffered data so that we keep memory
254
278
# consumption low
255
279
del buffered_data[:]
270
294
input_start += cur_size
271
295
# Yield the requested data
272
296
yield cur_offset, cur_data
273
cur_offset, cur_size = offset_iter.next()
297
cur_offset, cur_size = next(offset_iter)
274
298
# at this point, we've consumed as much of buffered as we can,
275
299
# so break off the portion that we consumed
276
300
if buffered_offset == len(buffered_data):
281
305
buffered = buffered[buffered_offset:]
282
306
buffered_data = [buffered]
283
307
buffered_len = len(buffered)
308
# now that the data stream is done, close the handle
285
buffered = ''.join(buffered_data)
311
buffered = b''.join(buffered_data)
286
312
del buffered_data[:]
287
313
data_chunks.append((input_start, buffered))
289
315
if 'sftp' in debug.debug_flags:
290
316
mutter('SFTP readv left with %d out-of-order bytes',
291
sum(map(lambda x: len(x[1]), data_chunks)))
317
sum(len(x[1]) for x in data_chunks))
292
318
# We've processed all the readv data, at this point, anything we
293
319
# couldn't process is in data_chunks. This doesn't happen often, so
294
320
# this code path isn't optimized
317
343
' We expected %d bytes, but only found %d'
318
344
% (cur_size, len(data)))
319
345
yield cur_offset, data
320
cur_offset, cur_size = offset_iter.next()
346
cur_offset, cur_size = next(offset_iter)
323
349
class SFTPTransport(ConnectedTransport):
324
350
"""Transport implementation for SFTP access."""
326
_do_prefetch = _default_do_prefetch
327
352
# TODO: jam 20060717 Conceivably these could be configurable, either
328
353
# by auto-tuning at run-time, or by a configuration (per host??)
329
354
# but the performance curve is pretty flat, so just going with
341
366
# up the request itself, rather than us having to worry about it
342
367
_max_request_size = 32768
344
def __init__(self, base, _from_transport=None):
345
super(SFTPTransport, self).__init__(base,
346
_from_transport=_from_transport)
348
369
def _remote_path(self, relpath):
349
370
"""Return the path to be passed along the sftp protocol for relpath.
351
372
:param relpath: is a urlencoded string.
353
relative = urlutils.unescape(relpath).encode('utf-8')
354
remote_path = self._combine_paths(self._path, relative)
374
remote_path = self._parsed_url.clone(relpath).path
355
375
# the initial slash should be removed from the path, and treated as a
356
376
# homedir relative path (the path begins with a double slash if it is
357
377
# absolute). see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
376
396
in base url at transport creation time.
378
398
if credentials is None:
379
password = self._password
399
password = self._parsed_url.password
381
401
password = credentials
383
403
vendor = ssh._get_ssh_vendor()
404
user = self._parsed_url.user
386
406
auth = config.AuthenticationConfig()
387
user = auth.get_user('ssh', self._host, self._port)
388
connection = vendor.connect_sftp(self._user, password,
389
self._host, self._port)
407
user = auth.get_user('ssh', self._parsed_url.host,
408
self._parsed_url.port)
409
connection = vendor.connect_sftp(self._parsed_url.user, password,
410
self._parsed_url.host, self._parsed_url.port)
390
411
return connection, (user, password)
413
def disconnect(self):
414
connection = self._get_connection()
415
if connection is not None:
392
418
def _get_sftp(self):
393
419
"""Ensures that a connection is established"""
394
420
connection = self._get_connection()
416
442
:param relpath: The relative path to the file
419
# FIXME: by returning the file directly, we don't pass this
420
# through to report_activity. We could try wrapping the object
421
# before it's returned. For readv and get_bytes it's handled in
422
# the higher-level function.
424
445
path = self._remote_path(relpath)
425
446
f = self._get_sftp().file(path, mode='rb')
426
if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
447
size = f.stat().st_size
448
if getattr(f, 'prefetch', None) is not None:
429
except (IOError, paramiko.SSHException), e:
451
except (IOError, paramiko.SSHException) as e:
430
452
self._translate_io_exception(e, path, ': error retrieving',
431
453
failure_exc=errors.ReadError)
433
455
def get_bytes(self, relpath):
434
456
# reimplement this here so that we can report how many bytes came back
435
f = self.get(relpath)
457
with self.get(relpath) as f:
438
459
self._report_activity(len(bytes), 'read')
443
462
def _readv(self, relpath, offsets):
444
463
"""See Transport.readv()"""
457
476
if 'sftp' in debug.debug_flags:
458
477
mutter('seek and read %s offsets', len(offsets))
459
478
return self._seek_and_read(fp, offsets, relpath)
460
except (IOError, paramiko.SSHException), e:
479
except (IOError, paramiko.SSHException) as e:
461
480
self._translate_io_exception(e, path, ': error retrieving')
463
482
def recommended_page_size(self):
492
511
def _put(self, abspath, f, mode=None):
493
512
"""Helper function so both put() and copy_abspaths can reuse the code"""
494
513
tmp_abspath = '%s.tmp.%.9f.%d.%d' % (abspath, time.time(),
495
os.getpid(), random.randint(0,0x7FFFFFFF))
514
os.getpid(), random.randint(0, 0x7FFFFFFF))
496
515
fout = self._sftp_open_exclusive(tmp_abspath, mode=mode)
500
519
fout.set_pipelined(True)
501
520
length = self._pump(f, fout)
502
except (IOError, paramiko.SSHException), e:
521
except (IOError, paramiko.SSHException) as e:
503
522
self._translate_io_exception(e, tmp_abspath)
504
523
# XXX: This doesn't truly help like we would like it to.
505
524
# The problem is that openssh strips sticky bits. So while we
521
540
self._rename_and_overwrite(tmp_abspath, abspath)
542
except Exception as e:
524
543
# If we fail, try to clean up the temporary file
525
544
# before we throw the exception
526
545
# but don't let another exception mess things up
555
574
fout = self._get_sftp().file(abspath, mode='wb')
556
575
fout.set_pipelined(True)
558
except (paramiko.SSHException, IOError), e:
577
except (paramiko.SSHException, IOError) as e:
559
578
self._translate_io_exception(e, abspath,
560
579
': unable to open')
606
625
create_parent_dir=create_parent_dir,
607
626
dir_mode=dir_mode)
609
def put_bytes_non_atomic(self, relpath, bytes, mode=None,
628
def put_bytes_non_atomic(self, relpath, raw_bytes, mode=None,
610
629
create_parent_dir=False,
631
if not isinstance(raw_bytes, bytes):
633
'raw_bytes must be a plain string, not %s' % type(raw_bytes))
612
635
def writer(fout):
636
fout.write(raw_bytes)
614
637
self._put_non_atomic_helper(relpath, writer, mode=mode,
615
638
create_parent_dir=create_parent_dir,
616
639
dir_mode=dir_mode)
644
667
# the sgid bit is set, report a warning to the user
645
668
# with the umask fix.
646
669
stat = self._get_sftp().lstat(abspath)
647
mode = mode & 0777 # can't set special bits anyway
648
if mode != stat.st_mode & 0777:
649
if stat.st_mode & 06000:
670
mode = mode & 0o777 # can't set special bits anyway
671
if mode != stat.st_mode & 0o777:
672
if stat.st_mode & 0o6000:
650
673
warning('About to chmod %s over sftp, which will result'
651
674
' in its suid or sgid bits being cleared. If'
652
675
' you want to preserve those bits, change your '
653
676
' environment on the server to use umask 0%03o.'
654
% (abspath, 0777 - mode))
677
% (abspath, 0o777 - mode))
655
678
self._get_sftp().chmod(abspath, mode=mode)
656
except (paramiko.SSHException, IOError), e:
679
except (paramiko.SSHException, IOError) as e:
657
680
self._translate_io_exception(e, abspath, ': unable to mkdir',
658
681
failure_exc=FileExists)
668
691
# api more than once per write_group at the moment so
669
692
# it is a tolerable overhead. Better would be to truncate
670
693
# the file after opening. RBC 20070805
671
self.put_bytes_non_atomic(relpath, "", mode)
694
self.put_bytes_non_atomic(relpath, b"", mode)
672
695
abspath = self._remote_path(relpath)
673
696
# TODO: jam 20060816 paramiko doesn't publicly expose a way to
674
697
# set the file mode at create time. If it does, use it.
678
701
handle = self._get_sftp().file(abspath, mode='wb')
679
702
handle.set_pipelined(True)
680
except (paramiko.SSHException, IOError), e:
703
except (paramiko.SSHException, IOError) as e:
681
704
self._translate_io_exception(e, abspath,
682
705
': unable to open')
683
706
_file_streams[self.abspath(relpath)] = handle
715
738
if (e.args[0].startswith('Directory not empty: ')
716
739
or getattr(e, 'errno', None) == errno.ENOTEMPTY):
717
740
raise errors.DirectoryNotEmpty(path, str(e))
741
if e.args == ('Operation unsupported',):
742
raise errors.TransportNotPossible()
718
743
mutter('Raising exception with args %s', e.args)
719
744
if getattr(e, 'errno', None) is not None:
720
745
mutter('Raising exception with errno %s', e.errno)
733
758
result = fout.tell()
734
759
self._pump(f, fout)
736
except (IOError, paramiko.SSHException), e:
761
except (IOError, paramiko.SSHException) as e:
737
762
self._translate_io_exception(e, relpath, ': unable to append')
739
764
def rename(self, rel_from, rel_to):
742
767
self._get_sftp().rename(self._remote_path(rel_from),
743
768
self._remote_path(rel_to))
744
except (IOError, paramiko.SSHException), e:
769
except (IOError, paramiko.SSHException) as e:
745
770
self._translate_io_exception(e, rel_from,
746
771
': unable to rename to %r' % (rel_to))
755
780
fancy_rename(abs_from, abs_to,
756
781
rename_func=sftp.rename,
757
782
unlink_func=sftp.remove)
758
except (IOError, paramiko.SSHException), e:
783
except (IOError, paramiko.SSHException) as e:
759
784
self._translate_io_exception(e, abs_from,
760
785
': unable to rename to %r' % (abs_to))
770
795
path = self._remote_path(relpath)
772
797
self._get_sftp().remove(path)
773
except (IOError, paramiko.SSHException), e:
798
except (IOError, paramiko.SSHException) as e:
774
799
self._translate_io_exception(e, path, ': unable to delete')
776
801
def external_url(self):
777
"""See bzrlib.transport.Transport.external_url."""
802
"""See breezy.transport.Transport.external_url."""
778
803
# the external path for SFTP is the base
795
820
entries = self._get_sftp().listdir(path)
796
821
self._report_activity(sum(map(len, entries)), 'read')
797
except (IOError, paramiko.SSHException), e:
822
except (IOError, paramiko.SSHException) as e:
798
823
self._translate_io_exception(e, path, ': failed to list_dir')
799
824
return [urlutils.escape(entry) for entry in entries]
803
828
path = self._remote_path(relpath)
805
830
return self._get_sftp().rmdir(path)
806
except (IOError, paramiko.SSHException), e:
831
except (IOError, paramiko.SSHException) as e:
807
832
self._translate_io_exception(e, path, ': failed to rmdir')
809
834
def stat(self, relpath):
811
836
path = self._remote_path(relpath)
813
838
return self._get_sftp().lstat(path)
814
except (IOError, paramiko.SSHException), e:
839
except (IOError, paramiko.SSHException) as e:
815
840
self._translate_io_exception(e, path, ': unable to stat')
817
842
def readlink(self, relpath):
819
844
path = self._remote_path(relpath)
821
846
return self._get_sftp().readlink(path)
822
except (IOError, paramiko.SSHException), e:
847
except (IOError, paramiko.SSHException) as e:
823
848
self._translate_io_exception(e, path, ': unable to readlink')
825
850
def symlink(self, source, link_name):
832
857
'%r: unable to create symlink to %r' % (link_name, source),
835
except (IOError, paramiko.SSHException), e:
860
except (IOError, paramiko.SSHException) as e:
836
861
self._translate_io_exception(e, link_name,
837
862
': unable to create symlink to %r' % (source))
848
873
def unlock(self):
875
def __exit__(self, exc_type, exc_val, exc_tb):
850
879
return BogusLock(relpath)
852
881
def lock_write(self, relpath):
893
922
raise TransportError('Expected an SFTP handle')
894
923
handle = msg.get_string()
895
924
return SFTPFile(self._get_sftp(), handle, 'wb', -1)
896
except (paramiko.SSHException, IOError), e:
925
except (paramiko.SSHException, IOError) as e:
897
926
self._translate_io_exception(e, abspath, ': unable to open',
898
927
failure_exc=FileExists)
908
937
def get_test_permutations():
909
938
"""Return the permutations to be used in testing."""
910
from bzrlib.tests import stub_sftp
939
from ..tests import stub_sftp
911
940
return [(SFTPTransport, stub_sftp.SFTPAbsoluteServer),
912
941
(SFTPTransport, stub_sftp.SFTPHomeDirServer),
913
942
(SFTPTransport, stub_sftp.SFTPSiblingAbsoluteServer),