/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

Merge from bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
 
2
# Copyright (C) 2005, 2006 Canonical Ltd
 
3
#
 
4
# This program is free software; you can redistribute it and/or modify
 
5
# it under the terms of the GNU General Public License as published by
 
6
# the Free Software Foundation; either version 2 of the License, or
 
7
# (at your option) any later version.
 
8
#
 
9
# This program is distributed in the hope that it will be useful,
 
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
# GNU General Public License for more details.
 
13
#
 
14
# You should have received a copy of the GNU General Public License
 
15
# along with this program; if not, write to the Free Software
 
16
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
17
 
 
18
"""Implementation of Transport over SFTP, using paramiko."""
 
19
 
 
20
# TODO: Remove the transport-based lock_read and lock_write methods.  They'll
 
21
# then raise TransportNotPossible, which will break remote access to any
 
22
# formats which rely on OS-level locks.  That should be fine as those formats
 
23
# are pretty old, but these combinations may have to be removed from the test
 
24
# suite.
 
25
 
 
26
import errno
 
27
import os
 
28
import random
 
29
import select
 
30
import socket
 
31
import stat
 
32
import subprocess
 
33
import sys
 
34
import time
 
35
import urllib
 
36
import urlparse
 
37
import weakref
 
38
 
 
39
from bzrlib.errors import (FileExists, 
 
40
                           NoSuchFile, PathNotChild,
 
41
                           TransportError,
 
42
                           LockError, 
 
43
                           PathError,
 
44
                           ParamikoNotPresent,
 
45
                           UnknownSSH,
 
46
                           )
 
47
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
 
48
from bzrlib.trace import mutter, warning
 
49
from bzrlib.transport import (
 
50
    register_urlparse_netloc_protocol,
 
51
    Server,
 
52
    split_url,
 
53
    ssh,
 
54
    Transport,
 
55
    )
 
56
import bzrlib.urlutils as urlutils
 
57
 
 
58
try:
 
59
    import paramiko
 
60
except ImportError, e:
 
61
    raise ParamikoNotPresent(e)
 
62
else:
 
63
    from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
 
64
                               SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
 
65
                               CMD_HANDLE, CMD_OPEN)
 
66
    from paramiko.sftp_attr import SFTPAttributes
 
67
    from paramiko.sftp_file import SFTPFile
 
68
 
 
69
 
 
70
register_urlparse_netloc_protocol('sftp')
 
71
 
 
72
 
 
73
# This is a weakref dictionary, so that we can reuse connections
 
74
# that are still active. Long term, it might be nice to have some
 
75
# sort of expiration policy, such as disconnect if inactive for
 
76
# X seconds. But that requires a lot more fanciness.
 
77
_connected_hosts = weakref.WeakValueDictionary()
 
78
 
 
79
 
 
80
_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
 
81
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
 
82
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))
 
83
 
 
84
 
 
85
def clear_connection_cache():
 
86
    """Remove all hosts from the SFTP connection cache.
 
87
 
 
88
    Primarily useful for test cases wanting to force garbage collection.
 
89
    """
 
90
    _connected_hosts.clear()
 
91
 
 
92
 
 
93
class SFTPLock(object):
 
94
    """This fakes a lock in a remote location.
 
95
    
 
96
    A present lock is indicated just by the existence of a file.  This
 
97
    doesn't work well on all transports and they are only used in 
 
98
    deprecated storage formats.
 
99
    """
 
100
    
 
101
    __slots__ = ['path', 'lock_path', 'lock_file', 'transport']
 
102
 
 
103
    def __init__(self, path, transport):
 
104
        assert isinstance(transport, SFTPTransport)
 
105
 
 
106
        self.lock_file = None
 
107
        self.path = path
 
108
        self.lock_path = path + '.write-lock'
 
109
        self.transport = transport
 
110
        try:
 
111
            # RBC 20060103 FIXME should we be using private methods here ?
 
112
            abspath = transport._remote_path(self.lock_path)
 
113
            self.lock_file = transport._sftp_open_exclusive(abspath)
 
114
        except FileExists:
 
115
            raise LockError('File %r already locked' % (self.path,))
 
116
 
 
117
    def __del__(self):
 
118
        """Should this warn, or actually try to cleanup?"""
 
119
        if self.lock_file:
 
120
            warning("SFTPLock %r not explicitly unlocked" % (self.path,))
 
121
            self.unlock()
 
122
 
 
123
    def unlock(self):
 
124
        if not self.lock_file:
 
125
            return
 
126
        self.lock_file.close()
 
127
        self.lock_file = None
 
128
        try:
 
129
            self.transport.delete(self.lock_path)
 
130
        except (NoSuchFile,):
 
131
            # What specific errors should we catch here?
 
132
            pass
 
133
 
 
134
 
 
135
class SFTPTransport(Transport):
 
136
    """Transport implementation for SFTP access."""
 
137
 
 
138
    _do_prefetch = _default_do_prefetch
 
139
    # TODO: jam 20060717 Conceivably these could be configurable, either
 
140
    #       by auto-tuning at run-time, or by a configuration (per host??)
 
141
    #       but the performance curve is pretty flat, so just going with
 
142
    #       reasonable defaults.
 
143
    _max_readv_combine = 200
 
144
    # Having to round trip to the server means waiting for a response,
 
145
    # so it is better to download extra bytes.
 
146
    # 8KiB had good performance for both local and remote network operations
 
147
    _bytes_to_read_before_seek = 8192
 
148
 
 
149
    # The sftp spec says that implementations SHOULD allow reads
 
150
    # to be at least 32K. paramiko.readv() does an async request
 
151
    # for the chunks. So we need to keep it within a single request
 
152
    # size for paramiko <= 1.6.1. paramiko 1.6.2 will probably chop
 
153
    # up the request itself, rather than us having to worry about it
 
154
    _max_request_size = 32768
 
155
 
 
156
    def __init__(self, base, clone_from=None):
 
157
        assert base.startswith('sftp://')
 
158
        self._parse_url(base)
 
159
        base = self._unparse_url()
 
160
        if base[-1] != '/':
 
161
            base += '/'
 
162
        super(SFTPTransport, self).__init__(base)
 
163
        if clone_from is None:
 
164
            self._sftp_connect()
 
165
        else:
 
166
            # use the same ssh connection, etc
 
167
            self._sftp = clone_from._sftp
 
168
        # super saves 'self.base'
 
169
    
 
170
    def should_cache(self):
 
171
        """
 
172
        Return True if the data pulled across should be cached locally.
 
173
        """
 
174
        return True
 
175
 
 
176
    def clone(self, offset=None):
 
177
        """
 
178
        Return a new SFTPTransport with root at self.base + offset.
 
179
        We share the same SFTP session between such transports, because it's
 
180
        fairly expensive to set them up.
 
181
        """
 
182
        if offset is None:
 
183
            return SFTPTransport(self.base, self)
 
184
        else:
 
185
            return SFTPTransport(self.abspath(offset), self)
 
186
 
 
187
    def abspath(self, relpath):
 
188
        """
 
189
        Return the full url to the given relative path.
 
190
        
 
191
        @param relpath: the relative path or path components
 
192
        @type relpath: str or list
 
193
        """
 
194
        return self._unparse_url(self._remote_path(relpath))
 
195
    
 
196
    def _remote_path(self, relpath):
 
197
        """Return the path to be passed along the sftp protocol for relpath.
 
198
        
 
199
        relpath is a urlencoded string.
 
200
 
 
201
        :return: a path prefixed with / for regular abspath-based urls, or a
 
202
            path that does not begin with / for urls which begin with /~/.
 
203
        """
 
204
        # FIXME: share the common code across transports
 
205
        assert isinstance(relpath, basestring)
 
206
        basepath = self._path.split('/')
 
207
        if relpath.startswith('/'):
 
208
            basepath = ['', '']
 
209
        relpath = urlutils.unescape(relpath).split('/')
 
210
        if len(basepath) > 0 and basepath[-1] == '':
 
211
            basepath = basepath[:-1]
 
212
 
 
213
        for p in relpath:
 
214
            if p == '..':
 
215
                if len(basepath) == 0:
 
216
                    # In most filesystems, a request for the parent
 
217
                    # of root, just returns root.
 
218
                    continue
 
219
                basepath.pop()
 
220
            elif p == '.':
 
221
                continue # No-op
 
222
            elif p != '':
 
223
                basepath.append(p)
 
224
 
 
225
        path = '/'.join(basepath)
 
226
        # mutter('relpath => remotepath %s => %s', relpath, path)
 
227
        return path
 
228
 
 
229
    def relpath(self, abspath):
 
230
        username, password, host, port, path = self._split_url(abspath)
 
231
        error = []
 
232
        if (username != self._username):
 
233
            error.append('username mismatch')
 
234
        if (host != self._host):
 
235
            error.append('host mismatch')
 
236
        if (port != self._port):
 
237
            error.append('port mismatch')
 
238
        if (not path.startswith(self._path)):
 
239
            error.append('path mismatch')
 
240
        if error:
 
241
            extra = ': ' + ', '.join(error)
 
242
            raise PathNotChild(abspath, self.base, extra=extra)
 
243
        pl = len(self._path)
 
244
        return path[pl:].strip('/')
 
245
 
 
246
    def has(self, relpath):
 
247
        """
 
248
        Does the target location exist?
 
249
        """
 
250
        try:
 
251
            self._sftp.stat(self._remote_path(relpath))
 
252
            return True
 
253
        except IOError:
 
254
            return False
 
255
 
 
256
    def get(self, relpath):
 
257
        """
 
258
        Get the file at the given relative path.
 
259
 
 
260
        :param relpath: The relative path to the file
 
261
        """
 
262
        try:
 
263
            path = self._remote_path(relpath)
 
264
            f = self._sftp.file(path, mode='rb')
 
265
            if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
 
266
                f.prefetch()
 
267
            return f
 
268
        except (IOError, paramiko.SSHException), e:
 
269
            self._translate_io_exception(e, path, ': error retrieving')
 
270
 
 
271
    def readv(self, relpath, offsets):
 
272
        """See Transport.readv()"""
 
273
        # We overload the default readv() because we want to use a file
 
274
        # that does not have prefetch enabled.
 
275
        # Also, if we have a new paramiko, it implements an async readv()
 
276
        if not offsets:
 
277
            return
 
278
 
 
279
        try:
 
280
            path = self._remote_path(relpath)
 
281
            fp = self._sftp.file(path, mode='rb')
 
282
            readv = getattr(fp, 'readv', None)
 
283
            if readv:
 
284
                return self._sftp_readv(fp, offsets)
 
285
            mutter('seek and read %s offsets', len(offsets))
 
286
            return self._seek_and_read(fp, offsets)
 
287
        except (IOError, paramiko.SSHException), e:
 
288
            self._translate_io_exception(e, path, ': error retrieving')
 
289
 
 
290
    def _sftp_readv(self, fp, offsets):
 
291
        """Use the readv() member of fp to do async readv.
 
292
 
 
293
        And then read them using paramiko.readv(). paramiko.readv()
 
294
        does not support ranges > 64K, so it caps the request size, and
 
295
        just reads until it gets all the stuff it wants
 
296
        """
 
297
        offsets = list(offsets)
 
298
        sorted_offsets = sorted(offsets)
 
299
 
 
300
        # The algorithm works as follows:
 
301
        # 1) Coalesce nearby reads into a single chunk
 
302
        #    This generates a list of combined regions, the total size
 
303
        #    and the size of the sub regions. This coalescing step is limited
 
304
        #    in the number of nearby chunks to combine, and is allowed to
 
305
        #    skip small breaks in the requests. Limiting it makes sure that
 
306
        #    we can start yielding some data earlier, and skipping means we
 
307
        #    make fewer requests. (Beneficial even when using async)
 
308
        # 2) Break up this combined regions into chunks that are smaller
 
309
        #    than 64KiB. Technically the limit is 65536, but we are a
 
310
        #    little bit conservative. This is because sftp has a maximum
 
311
        #    return chunk size of 64KiB (max size of an unsigned short)
 
312
        # 3) Issue a readv() to paramiko to create an async request for
 
313
        #    all of this data
 
314
        # 4) Read in the data as it comes back, until we've read one
 
315
        #    continuous section as determined in step 1
 
316
        # 5) Break up the full sections into hunks for the original requested
 
317
        #    offsets. And put them in a cache
 
318
        # 6) Check if the next request is in the cache, and if it is, remove
 
319
        #    it from the cache, and yield its data. Continue until no more
 
320
        #    entries are in the cache.
 
321
        # 7) loop back to step 4 until all data has been read
 
322
        #
 
323
        # TODO: jam 20060725 This could be optimized one step further, by
 
324
        #       attempting to yield whatever data we have read, even before
 
325
        #       the first coallesced section has been fully processed.
 
326
 
 
327
        # When coalescing for use with readv(), we don't really need to
 
328
        # use any fudge factor, because the requests are made asynchronously
 
329
        coalesced = list(self._coalesce_offsets(sorted_offsets,
 
330
                               limit=self._max_readv_combine,
 
331
                               fudge_factor=0,
 
332
                               ))
 
333
        requests = []
 
334
        for c_offset in coalesced:
 
335
            start = c_offset.start
 
336
            size = c_offset.length
 
337
 
 
338
            # We need to break this up into multiple requests
 
339
            while size > 0:
 
340
                next_size = min(size, self._max_request_size)
 
341
                requests.append((start, next_size))
 
342
                size -= next_size
 
343
                start += next_size
 
344
 
 
345
        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
 
346
                len(offsets), len(coalesced), len(requests))
 
347
 
 
348
        # Queue the current read until we have read the full coalesced section
 
349
        cur_data = []
 
350
        cur_data_len = 0
 
351
        cur_coalesced_stack = iter(coalesced)
 
352
        cur_coalesced = cur_coalesced_stack.next()
 
353
 
 
354
        # Cache the results, but only until they have been fulfilled
 
355
        data_map = {}
 
356
        # turn the list of offsets into a stack
 
357
        offset_stack = iter(offsets)
 
358
        cur_offset_and_size = offset_stack.next()
 
359
 
 
360
        for data in fp.readv(requests):
 
361
            cur_data += data
 
362
            cur_data_len += len(data)
 
363
 
 
364
            if cur_data_len < cur_coalesced.length:
 
365
                continue
 
366
            assert cur_data_len == cur_coalesced.length, \
 
367
                "Somehow we read too much: %s != %s" % (cur_data_len,
 
368
                                                        cur_coalesced.length)
 
369
            all_data = ''.join(cur_data)
 
370
            cur_data = []
 
371
            cur_data_len = 0
 
372
 
 
373
            for suboffset, subsize in cur_coalesced.ranges:
 
374
                key = (cur_coalesced.start+suboffset, subsize)
 
375
                data_map[key] = all_data[suboffset:suboffset+subsize]
 
376
 
 
377
            # Now that we've read some data, see if we can yield anything back
 
378
            while cur_offset_and_size in data_map:
 
379
                this_data = data_map.pop(cur_offset_and_size)
 
380
                yield cur_offset_and_size[0], this_data
 
381
                cur_offset_and_size = offset_stack.next()
 
382
 
 
383
            # Now that we've read all of the data for this coalesced section
 
384
            # on to the next
 
385
            cur_coalesced = cur_coalesced_stack.next()
 
386
 
 
387
    def put_file(self, relpath, f, mode=None):
 
388
        """
 
389
        Copy the file-like object into the location.
 
390
 
 
391
        :param relpath: Location to put the contents, relative to base.
 
392
        :param f:       File-like object.
 
393
        :param mode: The final mode for the file
 
394
        """
 
395
        final_path = self._remote_path(relpath)
 
396
        self._put(final_path, f, mode=mode)
 
397
 
 
398
    def _put(self, abspath, f, mode=None):
 
399
        """Helper function so both put() and copy_abspaths can reuse the code"""
 
400
        tmp_abspath = '%s.tmp.%.9f.%d.%d' % (abspath, time.time(),
 
401
                        os.getpid(), random.randint(0,0x7FFFFFFF))
 
402
        fout = self._sftp_open_exclusive(tmp_abspath, mode=mode)
 
403
        closed = False
 
404
        try:
 
405
            try:
 
406
                fout.set_pipelined(True)
 
407
                self._pump(f, fout)
 
408
            except (IOError, paramiko.SSHException), e:
 
409
                self._translate_io_exception(e, tmp_abspath)
 
410
            # XXX: This doesn't truly help like we would like it to.
 
411
            #      The problem is that openssh strips sticky bits. So while we
 
412
            #      can properly set group write permission, we lose the group
 
413
            #      sticky bit. So it is probably best to stop chmodding, and
 
414
            #      just tell users that they need to set the umask correctly.
 
415
            #      The attr.st_mode = mode, in _sftp_open_exclusive
 
416
            #      will handle when the user wants the final mode to be more 
 
417
            #      restrictive. And then we avoid a round trip. Unless 
 
418
            #      paramiko decides to expose an async chmod()
 
419
 
 
420
            # This is designed to chmod() right before we close.
 
421
            # Because we set_pipelined() earlier, theoretically we might 
 
422
            # avoid the round trip for fout.close()
 
423
            if mode is not None:
 
424
                self._sftp.chmod(tmp_abspath, mode)
 
425
            fout.close()
 
426
            closed = True
 
427
            self._rename_and_overwrite(tmp_abspath, abspath)
 
428
        except Exception, e:
 
429
            # If we fail, try to clean up the temporary file
 
430
            # before we throw the exception
 
431
            # but don't let another exception mess things up
 
432
            # Write out the traceback, because otherwise
 
433
            # the catch and throw destroys it
 
434
            import traceback
 
435
            mutter(traceback.format_exc())
 
436
            try:
 
437
                if not closed:
 
438
                    fout.close()
 
439
                self._sftp.remove(tmp_abspath)
 
440
            except:
 
441
                # raise the saved except
 
442
                raise e
 
443
            # raise the original with its traceback if we can.
 
444
            raise
 
445
 
 
446
    def _put_non_atomic_helper(self, relpath, writer, mode=None,
 
447
                               create_parent_dir=False,
 
448
                               dir_mode=None):
 
449
        abspath = self._remote_path(relpath)
 
450
 
 
451
        # TODO: jam 20060816 paramiko doesn't publicly expose a way to
 
452
        #       set the file mode at create time. If it does, use it.
 
453
        #       But for now, we just chmod later anyway.
 
454
 
 
455
        def _open_and_write_file():
 
456
            """Try to open the target file, raise error on failure"""
 
457
            fout = None
 
458
            try:
 
459
                try:
 
460
                    fout = self._sftp.file(abspath, mode='wb')
 
461
                    fout.set_pipelined(True)
 
462
                    writer(fout)
 
463
                except (paramiko.SSHException, IOError), e:
 
464
                    self._translate_io_exception(e, abspath,
 
465
                                                 ': unable to open')
 
466
 
 
467
                # This is designed to chmod() right before we close.
 
468
                # Because we set_pipelined() earlier, theoretically we might 
 
469
                # avoid the round trip for fout.close()
 
470
                if mode is not None:
 
471
                    self._sftp.chmod(abspath, mode)
 
472
            finally:
 
473
                if fout is not None:
 
474
                    fout.close()
 
475
 
 
476
        if not create_parent_dir:
 
477
            _open_and_write_file()
 
478
            return
 
479
 
 
480
        # Try error handling to create the parent directory if we need to
 
481
        try:
 
482
            _open_and_write_file()
 
483
        except NoSuchFile:
 
484
            # Try to create the parent directory, and then go back to
 
485
            # writing the file
 
486
            parent_dir = os.path.dirname(abspath)
 
487
            self._mkdir(parent_dir, dir_mode)
 
488
            _open_and_write_file()
 
489
 
 
490
    def put_file_non_atomic(self, relpath, f, mode=None,
 
491
                            create_parent_dir=False,
 
492
                            dir_mode=None):
 
493
        """Copy the file-like object into the target location.
 
494
 
 
495
        This function is not strictly safe to use. It is only meant to
 
496
        be used when you already know that the target does not exist.
 
497
        It is not safe, because it will open and truncate the remote
 
498
        file. So there may be a time when the file has invalid contents.
 
499
 
 
500
        :param relpath: The remote location to put the contents.
 
501
        :param f:       File-like object.
 
502
        :param mode:    Possible access permissions for new file.
 
503
                        None means do not set remote permissions.
 
504
        :param create_parent_dir: If we cannot create the target file because
 
505
                        the parent directory does not exist, go ahead and
 
506
                        create it, and then try again.
 
507
        """
 
508
        def writer(fout):
 
509
            self._pump(f, fout)
 
510
        self._put_non_atomic_helper(relpath, writer, mode=mode,
 
511
                                    create_parent_dir=create_parent_dir,
 
512
                                    dir_mode=dir_mode)
 
513
 
 
514
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
 
515
                             create_parent_dir=False,
 
516
                             dir_mode=None):
 
517
        def writer(fout):
 
518
            fout.write(bytes)
 
519
        self._put_non_atomic_helper(relpath, writer, mode=mode,
 
520
                                    create_parent_dir=create_parent_dir,
 
521
                                    dir_mode=dir_mode)
 
522
 
 
523
    def iter_files_recursive(self):
 
524
        """Walk the relative paths of all files in this transport."""
 
525
        queue = list(self.list_dir('.'))
 
526
        while queue:
 
527
            relpath = queue.pop(0)
 
528
            st = self.stat(relpath)
 
529
            if stat.S_ISDIR(st.st_mode):
 
530
                for i, basename in enumerate(self.list_dir(relpath)):
 
531
                    queue.insert(i, relpath+'/'+basename)
 
532
            else:
 
533
                yield relpath
 
534
 
 
535
    def _mkdir(self, abspath, mode=None):
 
536
        if mode is None:
 
537
            local_mode = 0777
 
538
        else:
 
539
            local_mode = mode
 
540
        try:
 
541
            self._sftp.mkdir(abspath, local_mode)
 
542
            if mode is not None:
 
543
                self._sftp.chmod(abspath, mode=mode)
 
544
        except (paramiko.SSHException, IOError), e:
 
545
            self._translate_io_exception(e, abspath, ': unable to mkdir',
 
546
                failure_exc=FileExists)
 
547
 
 
548
    def mkdir(self, relpath, mode=None):
 
549
        """Create a directory at the given path."""
 
550
        self._mkdir(self._remote_path(relpath), mode=mode)
 
551
 
 
552
    def _translate_io_exception(self, e, path, more_info='', 
 
553
                                failure_exc=PathError):
 
554
        """Translate a paramiko or IOError into a friendlier exception.
 
555
 
 
556
        :param e: The original exception
 
557
        :param path: The path in question when the error is raised
 
558
        :param more_info: Extra information that can be included,
 
559
                          such as what was going on
 
560
        :param failure_exc: Paramiko has the super fun ability to raise completely
 
561
                           opaque errors that just set "e.args = ('Failure',)" with
 
562
                           no more information.
 
563
                           If this parameter is set, it defines the exception 
 
564
                           to raise in these cases.
 
565
        """
 
566
        # paramiko seems to generate detailless errors.
 
567
        self._translate_error(e, path, raise_generic=False)
 
568
        if getattr(e, 'args', None) is not None:
 
569
            if (e.args == ('No such file or directory',) or
 
570
                e.args == ('No such file',)):
 
571
                raise NoSuchFile(path, str(e) + more_info)
 
572
            if (e.args == ('mkdir failed',)):
 
573
                raise FileExists(path, str(e) + more_info)
 
574
            # strange but true, for the paramiko server.
 
575
            if (e.args == ('Failure',)):
 
576
                raise failure_exc(path, str(e) + more_info)
 
577
            mutter('Raising exception with args %s', e.args)
 
578
        if getattr(e, 'errno', None) is not None:
 
579
            mutter('Raising exception with errno %s', e.errno)
 
580
        raise e
 
581
 
 
582
    def append_file(self, relpath, f, mode=None):
 
583
        """
 
584
        Append the text in the file-like object into the final
 
585
        location.
 
586
        """
 
587
        try:
 
588
            path = self._remote_path(relpath)
 
589
            fout = self._sftp.file(path, 'ab')
 
590
            if mode is not None:
 
591
                self._sftp.chmod(path, mode)
 
592
            result = fout.tell()
 
593
            self._pump(f, fout)
 
594
            return result
 
595
        except (IOError, paramiko.SSHException), e:
 
596
            self._translate_io_exception(e, relpath, ': unable to append')
 
597
 
 
598
    def rename(self, rel_from, rel_to):
 
599
        """Rename without special overwriting"""
 
600
        try:
 
601
            self._sftp.rename(self._remote_path(rel_from),
 
602
                              self._remote_path(rel_to))
 
603
        except (IOError, paramiko.SSHException), e:
 
604
            self._translate_io_exception(e, rel_from,
 
605
                    ': unable to rename to %r' % (rel_to))
 
606
 
 
607
    def _rename_and_overwrite(self, abs_from, abs_to):
 
608
        """Do a fancy rename on the remote server.
 
609
        
 
610
        Using the implementation provided by osutils.
 
611
        """
 
612
        try:
 
613
            fancy_rename(abs_from, abs_to,
 
614
                    rename_func=self._sftp.rename,
 
615
                    unlink_func=self._sftp.remove)
 
616
        except (IOError, paramiko.SSHException), e:
 
617
            self._translate_io_exception(e, abs_from, ': unable to rename to %r' % (abs_to))
 
618
 
 
619
    def move(self, rel_from, rel_to):
 
620
        """Move the item at rel_from to the location at rel_to"""
 
621
        path_from = self._remote_path(rel_from)
 
622
        path_to = self._remote_path(rel_to)
 
623
        self._rename_and_overwrite(path_from, path_to)
 
624
 
 
625
    def delete(self, relpath):
 
626
        """Delete the item at relpath"""
 
627
        path = self._remote_path(relpath)
 
628
        try:
 
629
            self._sftp.remove(path)
 
630
        except (IOError, paramiko.SSHException), e:
 
631
            self._translate_io_exception(e, path, ': unable to delete')
 
632
            
 
633
    def listable(self):
 
634
        """Return True if this store supports listing."""
 
635
        return True
 
636
 
 
637
    def list_dir(self, relpath):
 
638
        """
 
639
        Return a list of all files at the given location.
 
640
        """
 
641
        # does anything actually use this?
 
642
        # -- Unknown
 
643
        # This is at least used by copy_tree for remote upgrades.
 
644
        # -- David Allouche 2006-08-11
 
645
        path = self._remote_path(relpath)
 
646
        try:
 
647
            entries = self._sftp.listdir(path)
 
648
        except (IOError, paramiko.SSHException), e:
 
649
            self._translate_io_exception(e, path, ': failed to list_dir')
 
650
        return [urlutils.escape(entry) for entry in entries]
 
651
 
 
652
    def rmdir(self, relpath):
 
653
        """See Transport.rmdir."""
 
654
        path = self._remote_path(relpath)
 
655
        try:
 
656
            return self._sftp.rmdir(path)
 
657
        except (IOError, paramiko.SSHException), e:
 
658
            self._translate_io_exception(e, path, ': failed to rmdir')
 
659
 
 
660
    def stat(self, relpath):
 
661
        """Return the stat information for a file."""
 
662
        path = self._remote_path(relpath)
 
663
        try:
 
664
            return self._sftp.stat(path)
 
665
        except (IOError, paramiko.SSHException), e:
 
666
            self._translate_io_exception(e, path, ': unable to stat')
 
667
 
 
668
    def lock_read(self, relpath):
 
669
        """
 
670
        Lock the given file for shared (read) access.
 
671
        :return: A lock object, which has an unlock() member function
 
672
        """
 
673
        # FIXME: there should be something clever i can do here...
 
674
        class BogusLock(object):
 
675
            def __init__(self, path):
 
676
                self.path = path
 
677
            def unlock(self):
 
678
                pass
 
679
        return BogusLock(relpath)
 
680
 
 
681
    def lock_write(self, relpath):
 
682
        """
 
683
        Lock the given file for exclusive (write) access.
 
684
        WARNING: many transports do not support this, so trying avoid using it
 
685
 
 
686
        :return: A lock object, which has an unlock() member function
 
687
        """
 
688
        # This is a little bit bogus, but basically, we create a file
 
689
        # which should not already exist, and if it does, we assume
 
690
        # that there is a lock, and if it doesn't, the we assume
 
691
        # that we have taken the lock.
 
692
        return SFTPLock(relpath, self)
 
693
 
 
694
    def _unparse_url(self, path=None):
 
695
        """Gives a url for a relative reference."""
 
696
        if path is None:
 
697
            path = self._path
 
698
        path = urllib.quote(path)
 
699
        # handle homedir paths
 
700
        if not path.startswith('/'):
 
701
            path = "/~/" + path
 
702
        netloc = urllib.quote(self._host)
 
703
        if self._username is not None:
 
704
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
 
705
        if self._port is not None:
 
706
            netloc = '%s:%d' % (netloc, self._port)
 
707
        return urlparse.urlunparse(('sftp', netloc, path, '', '', ''))
 
708
 
 
709
    def _split_url(self, url):
 
710
        (scheme, username, password, host, port, path) = split_url(url)
 
711
        assert scheme == 'sftp'
 
712
 
 
713
        # the initial slash should be removed from the path, and treated
 
714
        # as a homedir relative path (the path begins with a double slash
 
715
        # if it is absolute).
 
716
        # see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
 
717
        # RBC 20060118 we are not using this as its too user hostile. instead
 
718
        # we are following lftp and using /~/foo to mean '~/foo'.
 
719
        # handle homedir paths
 
720
        if path.startswith('/~/'):
 
721
            path = path[3:]
 
722
        elif path == '/~':
 
723
            path = ''
 
724
        return (username, password, host, port, path)
 
725
 
 
726
    def _parse_url(self, url):
 
727
        (self._username, self._password,
 
728
         self._host, self._port, self._path) = self._split_url(url)
 
729
 
 
730
    def _sftp_connect(self):
 
731
        """Connect to the remote sftp server.
 
732
        After this, self._sftp should have a valid connection (or
 
733
        we raise an TransportError 'could not connect').
 
734
 
 
735
        TODO: Raise a more reasonable ConnectionFailed exception
 
736
        """
 
737
        self._sftp = _sftp_connect(self._host, self._port, self._username,
 
738
                self._password)
 
739
 
 
740
    def _sftp_open_exclusive(self, abspath, mode=None):
 
741
        """Open a remote path exclusively.
 
742
 
 
743
        SFTP supports O_EXCL (SFTP_FLAG_EXCL), which fails if
 
744
        the file already exists. However it does not expose this
 
745
        at the higher level of SFTPClient.open(), so we have to
 
746
        sneak away with it.
 
747
 
 
748
        WARNING: This breaks the SFTPClient abstraction, so it
 
749
        could easily break against an updated version of paramiko.
 
750
 
 
751
        :param abspath: The remote absolute path where the file should be opened
 
752
        :param mode: The mode permissions bits for the new file
 
753
        """
 
754
        # TODO: jam 20060816 Paramiko >= 1.6.2 (probably earlier) supports
 
755
        #       using the 'x' flag to indicate SFTP_FLAG_EXCL.
 
756
        #       However, there is no way to set the permission mode at open 
 
757
        #       time using the sftp_client.file() functionality.
 
758
        path = self._sftp._adjust_cwd(abspath)
 
759
        # mutter('sftp abspath %s => %s', abspath, path)
 
760
        attr = SFTPAttributes()
 
761
        if mode is not None:
 
762
            attr.st_mode = mode
 
763
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE 
 
764
                | SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
 
765
        try:
 
766
            t, msg = self._sftp._request(CMD_OPEN, path, omode, attr)
 
767
            if t != CMD_HANDLE:
 
768
                raise TransportError('Expected an SFTP handle')
 
769
            handle = msg.get_string()
 
770
            return SFTPFile(self._sftp, handle, 'wb', -1)
 
771
        except (paramiko.SSHException, IOError), e:
 
772
            self._translate_io_exception(e, abspath, ': unable to open',
 
773
                failure_exc=FileExists)
 
774
 
 
775
    def _can_roundtrip_unix_modebits(self):
 
776
        if sys.platform == 'win32':
 
777
            # anyone else?
 
778
            return False
 
779
        else:
 
780
            return True
 
781
 
 
782
# ------------- server test implementation --------------
 
783
import threading
 
784
 
 
785
from bzrlib.tests.stub_sftp import StubServer, StubSFTPServer
 
786
 
 
787
STUB_SERVER_KEY = """
 
788
-----BEGIN RSA PRIVATE KEY-----
 
789
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
 
790
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
 
791
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
 
792
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
 
793
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
 
794
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
 
795
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
 
796
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
 
797
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
 
798
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
 
799
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
 
800
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
 
801
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
 
802
-----END RSA PRIVATE KEY-----
 
803
"""
 
804
 
 
805
 
 
806
class SocketListener(threading.Thread):
 
807
 
 
808
    def __init__(self, callback):
 
809
        threading.Thread.__init__(self)
 
810
        self._callback = callback
 
811
        self._socket = socket.socket()
 
812
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
813
        self._socket.bind(('localhost', 0))
 
814
        self._socket.listen(1)
 
815
        self.port = self._socket.getsockname()[1]
 
816
        self._stop_event = threading.Event()
 
817
 
 
818
    def stop(self):
 
819
        # called from outside this thread
 
820
        self._stop_event.set()
 
821
        # use a timeout here, because if the test fails, the server thread may
 
822
        # never notice the stop_event.
 
823
        self.join(5.0)
 
824
        self._socket.close()
 
825
 
 
826
    def run(self):
 
827
        while True:
 
828
            readable, writable_unused, exception_unused = \
 
829
                select.select([self._socket], [], [], 0.1)
 
830
            if self._stop_event.isSet():
 
831
                return
 
832
            if len(readable) == 0:
 
833
                continue
 
834
            try:
 
835
                s, addr_unused = self._socket.accept()
 
836
                # because the loopback socket is inline, and transports are
 
837
                # never explicitly closed, best to launch a new thread.
 
838
                threading.Thread(target=self._callback, args=(s,)).start()
 
839
            except socket.error, x:
 
840
                sys.excepthook(*sys.exc_info())
 
841
                warning('Socket error during accept() within unit test server'
 
842
                        ' thread: %r' % x)
 
843
            except Exception, x:
 
844
                # probably a failed test; unit test thread will log the
 
845
                # failure/error
 
846
                sys.excepthook(*sys.exc_info())
 
847
                warning('Exception from within unit test server thread: %r' % 
 
848
                        x)
 
849
 
 
850
 
 
851
class SocketDelay(object):
 
852
    """A socket decorator to make TCP appear slower.
 
853
 
 
854
    This changes recv, send, and sendall to add a fixed latency to each python
 
855
    call if a new roundtrip is detected. That is, when a recv is called and the
 
856
    flag new_roundtrip is set, latency is charged. Every send and send_all
 
857
    sets this flag.
 
858
 
 
859
    In addition every send, sendall and recv sleeps a bit per character send to
 
860
    simulate bandwidth.
 
861
 
 
862
    Not all methods are implemented, this is deliberate as this class is not a
 
863
    replacement for the builtin sockets layer. fileno is not implemented to
 
864
    prevent the proxy being bypassed. 
 
865
    """
 
866
 
 
867
    simulated_time = 0
 
868
    _proxied_arguments = dict.fromkeys([
 
869
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
 
870
        "setblocking", "setsockopt", "settimeout", "shutdown"])
 
871
 
 
872
    def __init__(self, sock, latency, bandwidth=1.0, 
 
873
                 really_sleep=True):
 
874
        """ 
 
875
        :param bandwith: simulated bandwith (MegaBit)
 
876
        :param really_sleep: If set to false, the SocketDelay will just
 
877
        increase a counter, instead of calling time.sleep. This is useful for
 
878
        unittesting the SocketDelay.
 
879
        """
 
880
        self.sock = sock
 
881
        self.latency = latency
 
882
        self.really_sleep = really_sleep
 
883
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024) 
 
884
        self.new_roundtrip = False
 
885
 
 
886
    def sleep(self, s):
 
887
        if self.really_sleep:
 
888
            time.sleep(s)
 
889
        else:
 
890
            SocketDelay.simulated_time += s
 
891
 
 
892
    def __getattr__(self, attr):
 
893
        if attr in SocketDelay._proxied_arguments:
 
894
            return getattr(self.sock, attr)
 
895
        raise AttributeError("'SocketDelay' object has no attribute %r" %
 
896
                             attr)
 
897
 
 
898
    def dup(self):
 
899
        return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
 
900
                           self._sleep)
 
901
 
 
902
    def recv(self, *args):
 
903
        data = self.sock.recv(*args)
 
904
        if data and self.new_roundtrip:
 
905
            self.new_roundtrip = False
 
906
            self.sleep(self.latency)
 
907
        self.sleep(len(data) * self.time_per_byte)
 
908
        return data
 
909
 
 
910
    def sendall(self, data, flags=0):
 
911
        if not self.new_roundtrip:
 
912
            self.new_roundtrip = True
 
913
            self.sleep(self.latency)
 
914
        self.sleep(len(data) * self.time_per_byte)
 
915
        return self.sock.sendall(data, flags)
 
916
 
 
917
    def send(self, data, flags=0):
 
918
        if not self.new_roundtrip:
 
919
            self.new_roundtrip = True
 
920
            self.sleep(self.latency)
 
921
        bytes_sent = self.sock.send(data, flags)
 
922
        self.sleep(bytes_sent * self.time_per_byte)
 
923
        return bytes_sent
 
924
 
 
925
 
 
926
class SFTPServer(Server):
 
927
    """Common code for SFTP server facilities."""
 
928
 
 
929
    def __init__(self):
 
930
        self._original_vendor = None
 
931
        self._homedir = None
 
932
        self._server_homedir = None
 
933
        self._listener = None
 
934
        self._root = None
 
935
        self._vendor = ssh.ParamikoVendor()
 
936
        # sftp server logs
 
937
        self.logs = []
 
938
        self.add_latency = 0
 
939
 
 
940
    def _get_sftp_url(self, path):
 
941
        """Calculate an sftp url to this server for path."""
 
942
        return 'sftp://foo:bar@localhost:%d/%s' % (self._listener.port, path)
 
943
 
 
944
    def log(self, message):
 
945
        """StubServer uses this to log when a new server is created."""
 
946
        self.logs.append(message)
 
947
 
 
948
    def _run_server_entry(self, sock):
 
949
        """Entry point for all implementations of _run_server.
 
950
        
 
951
        If self.add_latency is > 0.000001 then sock is given a latency adding
 
952
        decorator.
 
953
        """
 
954
        if self.add_latency > 0.000001:
 
955
            sock = SocketDelay(sock, self.add_latency)
 
956
        return self._run_server(sock)
 
957
 
 
958
    def _run_server(self, s):
 
959
        ssh_server = paramiko.Transport(s)
 
960
        key_file = pathjoin(self._homedir, 'test_rsa.key')
 
961
        f = open(key_file, 'w')
 
962
        f.write(STUB_SERVER_KEY)
 
963
        f.close()
 
964
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
 
965
        ssh_server.add_server_key(host_key)
 
966
        server = StubServer(self)
 
967
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
 
968
                                         StubSFTPServer, root=self._root,
 
969
                                         home=self._server_homedir)
 
970
        event = threading.Event()
 
971
        ssh_server.start_server(event, server)
 
972
        event.wait(5.0)
 
973
    
 
974
    def setUp(self):
 
975
        self._original_vendor = ssh._ssh_vendor
 
976
        ssh._ssh_vendor = self._vendor
 
977
        if sys.platform == 'win32':
 
978
            # Win32 needs to use the UNICODE api
 
979
            self._homedir = getcwd()
 
980
        else:
 
981
            # But Linux SFTP servers should just deal in bytestreams
 
982
            self._homedir = os.getcwd()
 
983
        if self._server_homedir is None:
 
984
            self._server_homedir = self._homedir
 
985
        self._root = '/'
 
986
        if sys.platform == 'win32':
 
987
            self._root = ''
 
988
        self._listener = SocketListener(self._run_server_entry)
 
989
        self._listener.setDaemon(True)
 
990
        self._listener.start()
 
991
 
 
992
    def tearDown(self):
 
993
        """See bzrlib.transport.Server.tearDown."""
 
994
        self._listener.stop()
 
995
        ssh._ssh_vendor = self._original_vendor
 
996
 
 
997
    def get_bogus_url(self):
 
998
        """See bzrlib.transport.Server.get_bogus_url."""
 
999
        # this is chosen to try to prevent trouble with proxies, wierd dns, etc
 
1000
        # we bind a random socket, so that we get a guaranteed unused port
 
1001
        # we just never listen on that port
 
1002
        s = socket.socket()
 
1003
        s.bind(('localhost', 0))
 
1004
        return 'sftp://%s:%s/' % s.getsockname()
 
1005
 
 
1006
 
 
1007
class SFTPFullAbsoluteServer(SFTPServer):
 
1008
    """A test server for sftp transports, using absolute urls and ssh."""
 
1009
 
 
1010
    def get_url(self):
 
1011
        """See bzrlib.transport.Server.get_url."""
 
1012
        return self._get_sftp_url(urlutils.escape(self._homedir[1:]))
 
1013
 
 
1014
 
 
1015
class SFTPServerWithoutSSH(SFTPServer):
 
1016
    """An SFTP server that uses a simple TCP socket pair rather than SSH."""
 
1017
 
 
1018
    def __init__(self):
 
1019
        super(SFTPServerWithoutSSH, self).__init__()
 
1020
        self._vendor = ssh.LoopbackVendor()
 
1021
 
 
1022
    def _run_server(self, sock):
 
1023
        # Re-import these as locals, so that they're still accessible during
 
1024
        # interpreter shutdown (when all module globals get set to None, leading
 
1025
        # to confusing errors like "'NoneType' object has no attribute 'error'".
 
1026
        import socket, errno
 
1027
        class FakeChannel(object):
 
1028
            def get_transport(self):
 
1029
                return self
 
1030
            def get_log_channel(self):
 
1031
                return 'paramiko'
 
1032
            def get_name(self):
 
1033
                return '1'
 
1034
            def get_hexdump(self):
 
1035
                return False
 
1036
            def close(self):
 
1037
                pass
 
1038
 
 
1039
        server = paramiko.SFTPServer(FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
 
1040
                                     root=self._root, home=self._server_homedir)
 
1041
        try:
 
1042
            server.start_subsystem('sftp', None, sock)
 
1043
        except socket.error, e:
 
1044
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
 
1045
                # it's okay for the client to disconnect abruptly
 
1046
                # (bug in paramiko 1.6: it should absorb this exception)
 
1047
                pass
 
1048
            else:
 
1049
                raise
 
1050
        except Exception, e:
 
1051
            import sys; sys.stderr.write('\nEXCEPTION %r\n\n' % e.__class__)
 
1052
        server.finish_subsystem()
 
1053
 
 
1054
 
 
1055
class SFTPAbsoluteServer(SFTPServerWithoutSSH):
 
1056
    """A test server for sftp transports, using absolute urls."""
 
1057
 
 
1058
    def get_url(self):
 
1059
        """See bzrlib.transport.Server.get_url."""
 
1060
        if sys.platform == 'win32':
 
1061
            return self._get_sftp_url(urlutils.escape(self._homedir))
 
1062
        else:
 
1063
            return self._get_sftp_url(urlutils.escape(self._homedir[1:]))
 
1064
 
 
1065
 
 
1066
class SFTPHomeDirServer(SFTPServerWithoutSSH):
 
1067
    """A test server for sftp transports, using homedir relative urls."""
 
1068
 
 
1069
    def get_url(self):
 
1070
        """See bzrlib.transport.Server.get_url."""
 
1071
        return self._get_sftp_url("~/")
 
1072
 
 
1073
 
 
1074
class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
 
1075
    """A test servere for sftp transports, using absolute urls to non-home."""
 
1076
 
 
1077
    def setUp(self):
 
1078
        self._server_homedir = '/dev/noone/runs/tests/here'
 
1079
        super(SFTPSiblingAbsoluteServer, self).setUp()
 
1080
 
 
1081
 
 
1082
def _sftp_connect(host, port, username, password):
 
1083
    """Connect to the remote sftp server.
 
1084
 
 
1085
    :raises: a TransportError 'could not connect'.
 
1086
 
 
1087
    :returns: an paramiko.sftp_client.SFTPClient
 
1088
 
 
1089
    TODO: Raise a more reasonable ConnectionFailed exception
 
1090
    """
 
1091
    idx = (host, port, username)
 
1092
    try:
 
1093
        return _connected_hosts[idx]
 
1094
    except KeyError:
 
1095
        pass
 
1096
    
 
1097
    sftp = _sftp_connect_uncached(host, port, username, password)
 
1098
    _connected_hosts[idx] = sftp
 
1099
    return sftp
 
1100
 
 
1101
def _sftp_connect_uncached(host, port, username, password):
 
1102
    vendor = ssh._get_ssh_vendor()
 
1103
    sftp = vendor.connect_sftp(username, password, host, port)
 
1104
    return sftp
 
1105
 
 
1106
 
 
1107
def get_test_permutations():
 
1108
    """Return the permutations to be used in testing."""
 
1109
    return [(SFTPTransport, SFTPAbsoluteServer),
 
1110
            (SFTPTransport, SFTPHomeDirServer),
 
1111
            (SFTPTransport, SFTPSiblingAbsoluteServer),
 
1112
            ]