/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

Fix the FTP transport's handling of abspath('/')

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
 
2
# Copyright (C) 2005, 2006 Canonical Ltd
 
3
#
 
4
# This program is free software; you can redistribute it and/or modify
 
5
# it under the terms of the GNU General Public License as published by
 
6
# the Free Software Foundation; either version 2 of the License, or
 
7
# (at your option) any later version.
 
8
#
 
9
# This program is distributed in the hope that it will be useful,
 
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
# GNU General Public License for more details.
 
13
#
 
14
# You should have received a copy of the GNU General Public License
 
15
# along with this program; if not, write to the Free Software
 
16
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
17
 
 
18
"""Implementation of Transport over SFTP, using paramiko."""
 
19
 
 
20
# TODO: Remove the transport-based lock_read and lock_write methods.  They'll
 
21
# then raise TransportNotPossible, which will break remote access to any
 
22
# formats which rely on OS-level locks.  That should be fine as those formats
 
23
# are pretty old, but these combinations may have to be removed from the test
 
24
# suite.  Those formats all date back to 0.7; so we should be able to remove
 
25
# these methods when we officially drop support for those formats.
 
26
 
 
27
import errno
 
28
import os
 
29
import random
 
30
import select
 
31
import socket
 
32
import stat
 
33
import subprocess
 
34
import sys
 
35
import time
 
36
import urllib
 
37
import urlparse
 
38
import weakref
 
39
 
 
40
from bzrlib.errors import (FileExists, 
 
41
                           NoSuchFile, PathNotChild,
 
42
                           TransportError,
 
43
                           LockError, 
 
44
                           PathError,
 
45
                           ParamikoNotPresent,
 
46
                           UnknownSSH,
 
47
                           )
 
48
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
 
49
from bzrlib.trace import mutter, warning
 
50
from bzrlib.transport import (
 
51
    register_urlparse_netloc_protocol,
 
52
    Server,
 
53
    split_url,
 
54
    ssh,
 
55
    Transport,
 
56
    )
 
57
import bzrlib.urlutils as urlutils
 
58
 
 
59
try:
 
60
    import paramiko
 
61
except ImportError, e:
 
62
    raise ParamikoNotPresent(e)
 
63
else:
 
64
    from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
 
65
                               SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
 
66
                               CMD_HANDLE, CMD_OPEN)
 
67
    from paramiko.sftp_attr import SFTPAttributes
 
68
    from paramiko.sftp_file import SFTPFile
 
69
 
 
70
 
 
71
register_urlparse_netloc_protocol('sftp')
 
72
 
 
73
 
 
74
# This is a weakref dictionary, so that we can reuse connections
 
75
# that are still active. Long term, it might be nice to have some
 
76
# sort of expiration policy, such as disconnect if inactive for
 
77
# X seconds. But that requires a lot more fanciness.
 
78
_connected_hosts = weakref.WeakValueDictionary()
 
79
 
 
80
 
 
81
_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
 
82
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
 
83
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))
 
84
 
 
85
 
 
86
def clear_connection_cache():
 
87
    """Remove all hosts from the SFTP connection cache.
 
88
 
 
89
    Primarily useful for test cases wanting to force garbage collection.
 
90
    """
 
91
    _connected_hosts.clear()
 
92
 
 
93
 
 
94
class SFTPLock(object):
 
95
    """This fakes a lock in a remote location.
 
96
    
 
97
    A present lock is indicated just by the existence of a file.  This
 
98
    doesn't work well on all transports and they are only used in 
 
99
    deprecated storage formats.
 
100
    """
 
101
    
 
102
    __slots__ = ['path', 'lock_path', 'lock_file', 'transport']
 
103
 
 
104
    def __init__(self, path, transport):
 
105
        assert isinstance(transport, SFTPTransport)
 
106
 
 
107
        self.lock_file = None
 
108
        self.path = path
 
109
        self.lock_path = path + '.write-lock'
 
110
        self.transport = transport
 
111
        try:
 
112
            # RBC 20060103 FIXME should we be using private methods here ?
 
113
            abspath = transport._remote_path(self.lock_path)
 
114
            self.lock_file = transport._sftp_open_exclusive(abspath)
 
115
        except FileExists:
 
116
            raise LockError('File %r already locked' % (self.path,))
 
117
 
 
118
    def __del__(self):
 
119
        """Should this warn, or actually try to cleanup?"""
 
120
        if self.lock_file:
 
121
            warning("SFTPLock %r not explicitly unlocked" % (self.path,))
 
122
            self.unlock()
 
123
 
 
124
    def unlock(self):
 
125
        if not self.lock_file:
 
126
            return
 
127
        self.lock_file.close()
 
128
        self.lock_file = None
 
129
        try:
 
130
            self.transport.delete(self.lock_path)
 
131
        except (NoSuchFile,):
 
132
            # What specific errors should we catch here?
 
133
            pass
 
134
 
 
135
 
 
136
class SFTPTransport(Transport):
 
137
    """Transport implementation for SFTP access."""
 
138
 
 
139
    _do_prefetch = _default_do_prefetch
 
140
    # TODO: jam 20060717 Conceivably these could be configurable, either
 
141
    #       by auto-tuning at run-time, or by a configuration (per host??)
 
142
    #       but the performance curve is pretty flat, so just going with
 
143
    #       reasonable defaults.
 
144
    _max_readv_combine = 200
 
145
    # Having to round trip to the server means waiting for a response,
 
146
    # so it is better to download extra bytes.
 
147
    # 8KiB had good performance for both local and remote network operations
 
148
    _bytes_to_read_before_seek = 8192
 
149
 
 
150
    # The sftp spec says that implementations SHOULD allow reads
 
151
    # to be at least 32K. paramiko.readv() does an async request
 
152
    # for the chunks. So we need to keep it within a single request
 
153
    # size for paramiko <= 1.6.1. paramiko 1.6.2 will probably chop
 
154
    # up the request itself, rather than us having to worry about it
 
155
    _max_request_size = 32768
 
156
 
 
157
    def __init__(self, base, clone_from=None):
 
158
        assert base.startswith('sftp://')
 
159
        self._parse_url(base)
 
160
        base = self._unparse_url()
 
161
        if base[-1] != '/':
 
162
            base += '/'
 
163
        super(SFTPTransport, self).__init__(base)
 
164
        if clone_from is None:
 
165
            self._sftp_connect()
 
166
        else:
 
167
            # use the same ssh connection, etc
 
168
            self._sftp = clone_from._sftp
 
169
        # super saves 'self.base'
 
170
    
 
171
    def should_cache(self):
 
172
        """
 
173
        Return True if the data pulled across should be cached locally.
 
174
        """
 
175
        return True
 
176
 
 
177
    def clone(self, offset=None):
 
178
        """
 
179
        Return a new SFTPTransport with root at self.base + offset.
 
180
        We share the same SFTP session between such transports, because it's
 
181
        fairly expensive to set them up.
 
182
        """
 
183
        if offset is None:
 
184
            return SFTPTransport(self.base, self)
 
185
        else:
 
186
            return SFTPTransport(self.abspath(offset), self)
 
187
 
 
188
    def abspath(self, relpath):
 
189
        """
 
190
        Return the full url to the given relative path.
 
191
        
 
192
        @param relpath: the relative path or path components
 
193
        @type relpath: str or list
 
194
        """
 
195
        return self._unparse_url(self._remote_path(relpath))
 
196
    
 
197
    def _remote_path(self, relpath):
 
198
        """Return the path to be passed along the sftp protocol for relpath.
 
199
        
 
200
        relpath is a urlencoded string.
 
201
 
 
202
        :return: a path prefixed with / for regular abspath-based urls, or a
 
203
            path that does not begin with / for urls which begin with /~/.
 
204
        """
 
205
        # FIXME: share the common code across transports
 
206
        assert isinstance(relpath, basestring)
 
207
        basepath = self._path.split('/')
 
208
        if relpath.startswith('/'):
 
209
            basepath = ['', '']
 
210
        relpath = urlutils.unescape(relpath).split('/')
 
211
        if len(basepath) > 0 and basepath[-1] == '':
 
212
            basepath = basepath[:-1]
 
213
 
 
214
        for p in relpath:
 
215
            if p == '..':
 
216
                if len(basepath) == 0:
 
217
                    # In most filesystems, a request for the parent
 
218
                    # of root, just returns root.
 
219
                    continue
 
220
                basepath.pop()
 
221
            elif p == '.':
 
222
                continue # No-op
 
223
            elif p != '':
 
224
                basepath.append(p)
 
225
 
 
226
        path = '/'.join(basepath)
 
227
        # mutter('relpath => remotepath %s => %s', relpath, path)
 
228
        return path
 
229
 
 
230
    def relpath(self, abspath):
 
231
        username, password, host, port, path = self._split_url(abspath)
 
232
        error = []
 
233
        if (username != self._username):
 
234
            error.append('username mismatch')
 
235
        if (host != self._host):
 
236
            error.append('host mismatch')
 
237
        if (port != self._port):
 
238
            error.append('port mismatch')
 
239
        if (not path.startswith(self._path)):
 
240
            error.append('path mismatch')
 
241
        if error:
 
242
            extra = ': ' + ', '.join(error)
 
243
            raise PathNotChild(abspath, self.base, extra=extra)
 
244
        pl = len(self._path)
 
245
        return path[pl:].strip('/')
 
246
 
 
247
    def has(self, relpath):
 
248
        """
 
249
        Does the target location exist?
 
250
        """
 
251
        try:
 
252
            self._sftp.stat(self._remote_path(relpath))
 
253
            return True
 
254
        except IOError:
 
255
            return False
 
256
 
 
257
    def get(self, relpath):
 
258
        """
 
259
        Get the file at the given relative path.
 
260
 
 
261
        :param relpath: The relative path to the file
 
262
        """
 
263
        try:
 
264
            path = self._remote_path(relpath)
 
265
            f = self._sftp.file(path, mode='rb')
 
266
            if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
 
267
                f.prefetch()
 
268
            return f
 
269
        except (IOError, paramiko.SSHException), e:
 
270
            self._translate_io_exception(e, path, ': error retrieving')
 
271
 
 
272
    def readv(self, relpath, offsets):
 
273
        """See Transport.readv()"""
 
274
        # We overload the default readv() because we want to use a file
 
275
        # that does not have prefetch enabled.
 
276
        # Also, if we have a new paramiko, it implements an async readv()
 
277
        if not offsets:
 
278
            return
 
279
 
 
280
        try:
 
281
            path = self._remote_path(relpath)
 
282
            fp = self._sftp.file(path, mode='rb')
 
283
            readv = getattr(fp, 'readv', None)
 
284
            if readv:
 
285
                return self._sftp_readv(fp, offsets)
 
286
            mutter('seek and read %s offsets', len(offsets))
 
287
            return self._seek_and_read(fp, offsets)
 
288
        except (IOError, paramiko.SSHException), e:
 
289
            self._translate_io_exception(e, path, ': error retrieving')
 
290
 
 
291
    def _sftp_readv(self, fp, offsets):
 
292
        """Use the readv() member of fp to do async readv.
 
293
 
 
294
        And then read them using paramiko.readv(). paramiko.readv()
 
295
        does not support ranges > 64K, so it caps the request size, and
 
296
        just reads until it gets all the stuff it wants
 
297
        """
 
298
        offsets = list(offsets)
 
299
        sorted_offsets = sorted(offsets)
 
300
 
 
301
        # The algorithm works as follows:
 
302
        # 1) Coalesce nearby reads into a single chunk
 
303
        #    This generates a list of combined regions, the total size
 
304
        #    and the size of the sub regions. This coalescing step is limited
 
305
        #    in the number of nearby chunks to combine, and is allowed to
 
306
        #    skip small breaks in the requests. Limiting it makes sure that
 
307
        #    we can start yielding some data earlier, and skipping means we
 
308
        #    make fewer requests. (Beneficial even when using async)
 
309
        # 2) Break up this combined regions into chunks that are smaller
 
310
        #    than 64KiB. Technically the limit is 65536, but we are a
 
311
        #    little bit conservative. This is because sftp has a maximum
 
312
        #    return chunk size of 64KiB (max size of an unsigned short)
 
313
        # 3) Issue a readv() to paramiko to create an async request for
 
314
        #    all of this data
 
315
        # 4) Read in the data as it comes back, until we've read one
 
316
        #    continuous section as determined in step 1
 
317
        # 5) Break up the full sections into hunks for the original requested
 
318
        #    offsets. And put them in a cache
 
319
        # 6) Check if the next request is in the cache, and if it is, remove
 
320
        #    it from the cache, and yield its data. Continue until no more
 
321
        #    entries are in the cache.
 
322
        # 7) loop back to step 4 until all data has been read
 
323
        #
 
324
        # TODO: jam 20060725 This could be optimized one step further, by
 
325
        #       attempting to yield whatever data we have read, even before
 
326
        #       the first coallesced section has been fully processed.
 
327
 
 
328
        # When coalescing for use with readv(), we don't really need to
 
329
        # use any fudge factor, because the requests are made asynchronously
 
330
        coalesced = list(self._coalesce_offsets(sorted_offsets,
 
331
                               limit=self._max_readv_combine,
 
332
                               fudge_factor=0,
 
333
                               ))
 
334
        requests = []
 
335
        for c_offset in coalesced:
 
336
            start = c_offset.start
 
337
            size = c_offset.length
 
338
 
 
339
            # We need to break this up into multiple requests
 
340
            while size > 0:
 
341
                next_size = min(size, self._max_request_size)
 
342
                requests.append((start, next_size))
 
343
                size -= next_size
 
344
                start += next_size
 
345
 
 
346
        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
 
347
                len(offsets), len(coalesced), len(requests))
 
348
 
 
349
        # Queue the current read until we have read the full coalesced section
 
350
        cur_data = []
 
351
        cur_data_len = 0
 
352
        cur_coalesced_stack = iter(coalesced)
 
353
        cur_coalesced = cur_coalesced_stack.next()
 
354
 
 
355
        # Cache the results, but only until they have been fulfilled
 
356
        data_map = {}
 
357
        # turn the list of offsets into a stack
 
358
        offset_stack = iter(offsets)
 
359
        cur_offset_and_size = offset_stack.next()
 
360
 
 
361
        for data in fp.readv(requests):
 
362
            cur_data += data
 
363
            cur_data_len += len(data)
 
364
 
 
365
            if cur_data_len < cur_coalesced.length:
 
366
                continue
 
367
            assert cur_data_len == cur_coalesced.length, \
 
368
                "Somehow we read too much: %s != %s" % (cur_data_len,
 
369
                                                        cur_coalesced.length)
 
370
            all_data = ''.join(cur_data)
 
371
            cur_data = []
 
372
            cur_data_len = 0
 
373
 
 
374
            for suboffset, subsize in cur_coalesced.ranges:
 
375
                key = (cur_coalesced.start+suboffset, subsize)
 
376
                data_map[key] = all_data[suboffset:suboffset+subsize]
 
377
 
 
378
            # Now that we've read some data, see if we can yield anything back
 
379
            while cur_offset_and_size in data_map:
 
380
                this_data = data_map.pop(cur_offset_and_size)
 
381
                yield cur_offset_and_size[0], this_data
 
382
                cur_offset_and_size = offset_stack.next()
 
383
 
 
384
            # Now that we've read all of the data for this coalesced section
 
385
            # on to the next
 
386
            cur_coalesced = cur_coalesced_stack.next()
 
387
 
 
388
    def put_file(self, relpath, f, mode=None):
 
389
        """
 
390
        Copy the file-like object into the location.
 
391
 
 
392
        :param relpath: Location to put the contents, relative to base.
 
393
        :param f:       File-like object.
 
394
        :param mode: The final mode for the file
 
395
        """
 
396
        final_path = self._remote_path(relpath)
 
397
        self._put(final_path, f, mode=mode)
 
398
 
 
399
    def _put(self, abspath, f, mode=None):
 
400
        """Helper function so both put() and copy_abspaths can reuse the code"""
 
401
        tmp_abspath = '%s.tmp.%.9f.%d.%d' % (abspath, time.time(),
 
402
                        os.getpid(), random.randint(0,0x7FFFFFFF))
 
403
        fout = self._sftp_open_exclusive(tmp_abspath, mode=mode)
 
404
        closed = False
 
405
        try:
 
406
            try:
 
407
                fout.set_pipelined(True)
 
408
                self._pump(f, fout)
 
409
            except (IOError, paramiko.SSHException), e:
 
410
                self._translate_io_exception(e, tmp_abspath)
 
411
            # XXX: This doesn't truly help like we would like it to.
 
412
            #      The problem is that openssh strips sticky bits. So while we
 
413
            #      can properly set group write permission, we lose the group
 
414
            #      sticky bit. So it is probably best to stop chmodding, and
 
415
            #      just tell users that they need to set the umask correctly.
 
416
            #      The attr.st_mode = mode, in _sftp_open_exclusive
 
417
            #      will handle when the user wants the final mode to be more 
 
418
            #      restrictive. And then we avoid a round trip. Unless 
 
419
            #      paramiko decides to expose an async chmod()
 
420
 
 
421
            # This is designed to chmod() right before we close.
 
422
            # Because we set_pipelined() earlier, theoretically we might 
 
423
            # avoid the round trip for fout.close()
 
424
            if mode is not None:
 
425
                self._sftp.chmod(tmp_abspath, mode)
 
426
            fout.close()
 
427
            closed = True
 
428
            self._rename_and_overwrite(tmp_abspath, abspath)
 
429
        except Exception, e:
 
430
            # If we fail, try to clean up the temporary file
 
431
            # before we throw the exception
 
432
            # but don't let another exception mess things up
 
433
            # Write out the traceback, because otherwise
 
434
            # the catch and throw destroys it
 
435
            import traceback
 
436
            mutter(traceback.format_exc())
 
437
            try:
 
438
                if not closed:
 
439
                    fout.close()
 
440
                self._sftp.remove(tmp_abspath)
 
441
            except:
 
442
                # raise the saved except
 
443
                raise e
 
444
            # raise the original with its traceback if we can.
 
445
            raise
 
446
 
 
447
    def _put_non_atomic_helper(self, relpath, writer, mode=None,
 
448
                               create_parent_dir=False,
 
449
                               dir_mode=None):
 
450
        abspath = self._remote_path(relpath)
 
451
 
 
452
        # TODO: jam 20060816 paramiko doesn't publicly expose a way to
 
453
        #       set the file mode at create time. If it does, use it.
 
454
        #       But for now, we just chmod later anyway.
 
455
 
 
456
        def _open_and_write_file():
 
457
            """Try to open the target file, raise error on failure"""
 
458
            fout = None
 
459
            try:
 
460
                try:
 
461
                    fout = self._sftp.file(abspath, mode='wb')
 
462
                    fout.set_pipelined(True)
 
463
                    writer(fout)
 
464
                except (paramiko.SSHException, IOError), e:
 
465
                    self._translate_io_exception(e, abspath,
 
466
                                                 ': unable to open')
 
467
 
 
468
                # This is designed to chmod() right before we close.
 
469
                # Because we set_pipelined() earlier, theoretically we might 
 
470
                # avoid the round trip for fout.close()
 
471
                if mode is not None:
 
472
                    self._sftp.chmod(abspath, mode)
 
473
            finally:
 
474
                if fout is not None:
 
475
                    fout.close()
 
476
 
 
477
        if not create_parent_dir:
 
478
            _open_and_write_file()
 
479
            return
 
480
 
 
481
        # Try error handling to create the parent directory if we need to
 
482
        try:
 
483
            _open_and_write_file()
 
484
        except NoSuchFile:
 
485
            # Try to create the parent directory, and then go back to
 
486
            # writing the file
 
487
            parent_dir = os.path.dirname(abspath)
 
488
            self._mkdir(parent_dir, dir_mode)
 
489
            _open_and_write_file()
 
490
 
 
491
    def put_file_non_atomic(self, relpath, f, mode=None,
 
492
                            create_parent_dir=False,
 
493
                            dir_mode=None):
 
494
        """Copy the file-like object into the target location.
 
495
 
 
496
        This function is not strictly safe to use. It is only meant to
 
497
        be used when you already know that the target does not exist.
 
498
        It is not safe, because it will open and truncate the remote
 
499
        file. So there may be a time when the file has invalid contents.
 
500
 
 
501
        :param relpath: The remote location to put the contents.
 
502
        :param f:       File-like object.
 
503
        :param mode:    Possible access permissions for new file.
 
504
                        None means do not set remote permissions.
 
505
        :param create_parent_dir: If we cannot create the target file because
 
506
                        the parent directory does not exist, go ahead and
 
507
                        create it, and then try again.
 
508
        """
 
509
        def writer(fout):
 
510
            self._pump(f, fout)
 
511
        self._put_non_atomic_helper(relpath, writer, mode=mode,
 
512
                                    create_parent_dir=create_parent_dir,
 
513
                                    dir_mode=dir_mode)
 
514
 
 
515
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
 
516
                             create_parent_dir=False,
 
517
                             dir_mode=None):
 
518
        def writer(fout):
 
519
            fout.write(bytes)
 
520
        self._put_non_atomic_helper(relpath, writer, mode=mode,
 
521
                                    create_parent_dir=create_parent_dir,
 
522
                                    dir_mode=dir_mode)
 
523
 
 
524
    def iter_files_recursive(self):
 
525
        """Walk the relative paths of all files in this transport."""
 
526
        queue = list(self.list_dir('.'))
 
527
        while queue:
 
528
            relpath = queue.pop(0)
 
529
            st = self.stat(relpath)
 
530
            if stat.S_ISDIR(st.st_mode):
 
531
                for i, basename in enumerate(self.list_dir(relpath)):
 
532
                    queue.insert(i, relpath+'/'+basename)
 
533
            else:
 
534
                yield relpath
 
535
 
 
536
    def _mkdir(self, abspath, mode=None):
 
537
        if mode is None:
 
538
            local_mode = 0777
 
539
        else:
 
540
            local_mode = mode
 
541
        try:
 
542
            self._sftp.mkdir(abspath, local_mode)
 
543
            if mode is not None:
 
544
                self._sftp.chmod(abspath, mode=mode)
 
545
        except (paramiko.SSHException, IOError), e:
 
546
            self._translate_io_exception(e, abspath, ': unable to mkdir',
 
547
                failure_exc=FileExists)
 
548
 
 
549
    def mkdir(self, relpath, mode=None):
 
550
        """Create a directory at the given path."""
 
551
        self._mkdir(self._remote_path(relpath), mode=mode)
 
552
 
 
553
    def _translate_io_exception(self, e, path, more_info='', 
 
554
                                failure_exc=PathError):
 
555
        """Translate a paramiko or IOError into a friendlier exception.
 
556
 
 
557
        :param e: The original exception
 
558
        :param path: The path in question when the error is raised
 
559
        :param more_info: Extra information that can be included,
 
560
                          such as what was going on
 
561
        :param failure_exc: Paramiko has the super fun ability to raise completely
 
562
                           opaque errors that just set "e.args = ('Failure',)" with
 
563
                           no more information.
 
564
                           If this parameter is set, it defines the exception 
 
565
                           to raise in these cases.
 
566
        """
 
567
        # paramiko seems to generate detailless errors.
 
568
        self._translate_error(e, path, raise_generic=False)
 
569
        if getattr(e, 'args', None) is not None:
 
570
            if (e.args == ('No such file or directory',) or
 
571
                e.args == ('No such file',)):
 
572
                raise NoSuchFile(path, str(e) + more_info)
 
573
            if (e.args == ('mkdir failed',)):
 
574
                raise FileExists(path, str(e) + more_info)
 
575
            # strange but true, for the paramiko server.
 
576
            if (e.args == ('Failure',)):
 
577
                raise failure_exc(path, str(e) + more_info)
 
578
            mutter('Raising exception with args %s', e.args)
 
579
        if getattr(e, 'errno', None) is not None:
 
580
            mutter('Raising exception with errno %s', e.errno)
 
581
        raise e
 
582
 
 
583
    def append_file(self, relpath, f, mode=None):
 
584
        """
 
585
        Append the text in the file-like object into the final
 
586
        location.
 
587
        """
 
588
        try:
 
589
            path = self._remote_path(relpath)
 
590
            fout = self._sftp.file(path, 'ab')
 
591
            if mode is not None:
 
592
                self._sftp.chmod(path, mode)
 
593
            result = fout.tell()
 
594
            self._pump(f, fout)
 
595
            return result
 
596
        except (IOError, paramiko.SSHException), e:
 
597
            self._translate_io_exception(e, relpath, ': unable to append')
 
598
 
 
599
    def rename(self, rel_from, rel_to):
 
600
        """Rename without special overwriting"""
 
601
        try:
 
602
            self._sftp.rename(self._remote_path(rel_from),
 
603
                              self._remote_path(rel_to))
 
604
        except (IOError, paramiko.SSHException), e:
 
605
            self._translate_io_exception(e, rel_from,
 
606
                    ': unable to rename to %r' % (rel_to))
 
607
 
 
608
    def _rename_and_overwrite(self, abs_from, abs_to):
 
609
        """Do a fancy rename on the remote server.
 
610
        
 
611
        Using the implementation provided by osutils.
 
612
        """
 
613
        try:
 
614
            fancy_rename(abs_from, abs_to,
 
615
                    rename_func=self._sftp.rename,
 
616
                    unlink_func=self._sftp.remove)
 
617
        except (IOError, paramiko.SSHException), e:
 
618
            self._translate_io_exception(e, abs_from, ': unable to rename to %r' % (abs_to))
 
619
 
 
620
    def move(self, rel_from, rel_to):
 
621
        """Move the item at rel_from to the location at rel_to"""
 
622
        path_from = self._remote_path(rel_from)
 
623
        path_to = self._remote_path(rel_to)
 
624
        self._rename_and_overwrite(path_from, path_to)
 
625
 
 
626
    def delete(self, relpath):
 
627
        """Delete the item at relpath"""
 
628
        path = self._remote_path(relpath)
 
629
        try:
 
630
            self._sftp.remove(path)
 
631
        except (IOError, paramiko.SSHException), e:
 
632
            self._translate_io_exception(e, path, ': unable to delete')
 
633
            
 
634
    def listable(self):
 
635
        """Return True if this store supports listing."""
 
636
        return True
 
637
 
 
638
    def list_dir(self, relpath):
 
639
        """
 
640
        Return a list of all files at the given location.
 
641
        """
 
642
        # does anything actually use this?
 
643
        # -- Unknown
 
644
        # This is at least used by copy_tree for remote upgrades.
 
645
        # -- David Allouche 2006-08-11
 
646
        path = self._remote_path(relpath)
 
647
        try:
 
648
            entries = self._sftp.listdir(path)
 
649
        except (IOError, paramiko.SSHException), e:
 
650
            self._translate_io_exception(e, path, ': failed to list_dir')
 
651
        return [urlutils.escape(entry) for entry in entries]
 
652
 
 
653
    def rmdir(self, relpath):
 
654
        """See Transport.rmdir."""
 
655
        path = self._remote_path(relpath)
 
656
        try:
 
657
            return self._sftp.rmdir(path)
 
658
        except (IOError, paramiko.SSHException), e:
 
659
            self._translate_io_exception(e, path, ': failed to rmdir')
 
660
 
 
661
    def stat(self, relpath):
 
662
        """Return the stat information for a file."""
 
663
        path = self._remote_path(relpath)
 
664
        try:
 
665
            return self._sftp.stat(path)
 
666
        except (IOError, paramiko.SSHException), e:
 
667
            self._translate_io_exception(e, path, ': unable to stat')
 
668
 
 
669
    def lock_read(self, relpath):
 
670
        """
 
671
        Lock the given file for shared (read) access.
 
672
        :return: A lock object, which has an unlock() member function
 
673
        """
 
674
        # FIXME: there should be something clever i can do here...
 
675
        class BogusLock(object):
 
676
            def __init__(self, path):
 
677
                self.path = path
 
678
            def unlock(self):
 
679
                pass
 
680
        return BogusLock(relpath)
 
681
 
 
682
    def lock_write(self, relpath):
 
683
        """
 
684
        Lock the given file for exclusive (write) access.
 
685
        WARNING: many transports do not support this, so trying avoid using it
 
686
 
 
687
        :return: A lock object, which has an unlock() member function
 
688
        """
 
689
        # This is a little bit bogus, but basically, we create a file
 
690
        # which should not already exist, and if it does, we assume
 
691
        # that there is a lock, and if it doesn't, the we assume
 
692
        # that we have taken the lock.
 
693
        return SFTPLock(relpath, self)
 
694
 
 
695
    def _unparse_url(self, path=None):
 
696
        """Gives a url for a relative reference."""
 
697
        if path is None:
 
698
            path = self._path
 
699
        path = urllib.quote(path)
 
700
        # handle homedir paths
 
701
        if not path.startswith('/'):
 
702
            path = "/~/" + path
 
703
        netloc = urllib.quote(self._host)
 
704
        if self._username is not None:
 
705
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
 
706
        if self._port is not None:
 
707
            netloc = '%s:%d' % (netloc, self._port)
 
708
        return urlparse.urlunparse(('sftp', netloc, path, '', '', ''))
 
709
 
 
710
    def _split_url(self, url):
 
711
        (scheme, username, password, host, port, path) = split_url(url)
 
712
        assert scheme == 'sftp'
 
713
 
 
714
        # the initial slash should be removed from the path, and treated
 
715
        # as a homedir relative path (the path begins with a double slash
 
716
        # if it is absolute).
 
717
        # see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
 
718
        # RBC 20060118 we are not using this as its too user hostile. instead
 
719
        # we are following lftp and using /~/foo to mean '~/foo'.
 
720
        # handle homedir paths
 
721
        if path.startswith('/~/'):
 
722
            path = path[3:]
 
723
        elif path == '/~':
 
724
            path = ''
 
725
        return (username, password, host, port, path)
 
726
 
 
727
    def _parse_url(self, url):
 
728
        (self._username, self._password,
 
729
         self._host, self._port, self._path) = self._split_url(url)
 
730
 
 
731
    def _sftp_connect(self):
 
732
        """Connect to the remote sftp server.
 
733
        After this, self._sftp should have a valid connection (or
 
734
        we raise an TransportError 'could not connect').
 
735
 
 
736
        TODO: Raise a more reasonable ConnectionFailed exception
 
737
        """
 
738
        self._sftp = _sftp_connect(self._host, self._port, self._username,
 
739
                self._password)
 
740
 
 
741
    def _sftp_open_exclusive(self, abspath, mode=None):
 
742
        """Open a remote path exclusively.
 
743
 
 
744
        SFTP supports O_EXCL (SFTP_FLAG_EXCL), which fails if
 
745
        the file already exists. However it does not expose this
 
746
        at the higher level of SFTPClient.open(), so we have to
 
747
        sneak away with it.
 
748
 
 
749
        WARNING: This breaks the SFTPClient abstraction, so it
 
750
        could easily break against an updated version of paramiko.
 
751
 
 
752
        :param abspath: The remote absolute path where the file should be opened
 
753
        :param mode: The mode permissions bits for the new file
 
754
        """
 
755
        # TODO: jam 20060816 Paramiko >= 1.6.2 (probably earlier) supports
 
756
        #       using the 'x' flag to indicate SFTP_FLAG_EXCL.
 
757
        #       However, there is no way to set the permission mode at open 
 
758
        #       time using the sftp_client.file() functionality.
 
759
        path = self._sftp._adjust_cwd(abspath)
 
760
        # mutter('sftp abspath %s => %s', abspath, path)
 
761
        attr = SFTPAttributes()
 
762
        if mode is not None:
 
763
            attr.st_mode = mode
 
764
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE 
 
765
                | SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
 
766
        try:
 
767
            t, msg = self._sftp._request(CMD_OPEN, path, omode, attr)
 
768
            if t != CMD_HANDLE:
 
769
                raise TransportError('Expected an SFTP handle')
 
770
            handle = msg.get_string()
 
771
            return SFTPFile(self._sftp, handle, 'wb', -1)
 
772
        except (paramiko.SSHException, IOError), e:
 
773
            self._translate_io_exception(e, abspath, ': unable to open',
 
774
                failure_exc=FileExists)
 
775
 
 
776
    def _can_roundtrip_unix_modebits(self):
 
777
        if sys.platform == 'win32':
 
778
            # anyone else?
 
779
            return False
 
780
        else:
 
781
            return True
 
782
 
 
783
# ------------- server test implementation --------------
 
784
import threading
 
785
 
 
786
from bzrlib.tests.stub_sftp import StubServer, StubSFTPServer
 
787
 
 
788
STUB_SERVER_KEY = """
 
789
-----BEGIN RSA PRIVATE KEY-----
 
790
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
 
791
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
 
792
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
 
793
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
 
794
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
 
795
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
 
796
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
 
797
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
 
798
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
 
799
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
 
800
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
 
801
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
 
802
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
 
803
-----END RSA PRIVATE KEY-----
 
804
"""
 
805
 
 
806
 
 
807
class SocketListener(threading.Thread):
 
808
 
 
809
    def __init__(self, callback):
 
810
        threading.Thread.__init__(self)
 
811
        self._callback = callback
 
812
        self._socket = socket.socket()
 
813
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
814
        self._socket.bind(('localhost', 0))
 
815
        self._socket.listen(1)
 
816
        self.port = self._socket.getsockname()[1]
 
817
        self._stop_event = threading.Event()
 
818
 
 
819
    def stop(self):
 
820
        # called from outside this thread
 
821
        self._stop_event.set()
 
822
        # use a timeout here, because if the test fails, the server thread may
 
823
        # never notice the stop_event.
 
824
        self.join(5.0)
 
825
        self._socket.close()
 
826
 
 
827
    def run(self):
 
828
        while True:
 
829
            readable, writable_unused, exception_unused = \
 
830
                select.select([self._socket], [], [], 0.1)
 
831
            if self._stop_event.isSet():
 
832
                return
 
833
            if len(readable) == 0:
 
834
                continue
 
835
            try:
 
836
                s, addr_unused = self._socket.accept()
 
837
                # because the loopback socket is inline, and transports are
 
838
                # never explicitly closed, best to launch a new thread.
 
839
                threading.Thread(target=self._callback, args=(s,)).start()
 
840
            except socket.error, x:
 
841
                sys.excepthook(*sys.exc_info())
 
842
                warning('Socket error during accept() within unit test server'
 
843
                        ' thread: %r' % x)
 
844
            except Exception, x:
 
845
                # probably a failed test; unit test thread will log the
 
846
                # failure/error
 
847
                sys.excepthook(*sys.exc_info())
 
848
                warning('Exception from within unit test server thread: %r' % 
 
849
                        x)
 
850
 
 
851
 
 
852
class SocketDelay(object):
 
853
    """A socket decorator to make TCP appear slower.
 
854
 
 
855
    This changes recv, send, and sendall to add a fixed latency to each python
 
856
    call if a new roundtrip is detected. That is, when a recv is called and the
 
857
    flag new_roundtrip is set, latency is charged. Every send and send_all
 
858
    sets this flag.
 
859
 
 
860
    In addition every send, sendall and recv sleeps a bit per character send to
 
861
    simulate bandwidth.
 
862
 
 
863
    Not all methods are implemented, this is deliberate as this class is not a
 
864
    replacement for the builtin sockets layer. fileno is not implemented to
 
865
    prevent the proxy being bypassed. 
 
866
    """
 
867
 
 
868
    simulated_time = 0
 
869
    _proxied_arguments = dict.fromkeys([
 
870
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
 
871
        "setblocking", "setsockopt", "settimeout", "shutdown"])
 
872
 
 
873
    def __init__(self, sock, latency, bandwidth=1.0, 
 
874
                 really_sleep=True):
 
875
        """ 
 
876
        :param bandwith: simulated bandwith (MegaBit)
 
877
        :param really_sleep: If set to false, the SocketDelay will just
 
878
        increase a counter, instead of calling time.sleep. This is useful for
 
879
        unittesting the SocketDelay.
 
880
        """
 
881
        self.sock = sock
 
882
        self.latency = latency
 
883
        self.really_sleep = really_sleep
 
884
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024) 
 
885
        self.new_roundtrip = False
 
886
 
 
887
    def sleep(self, s):
 
888
        if self.really_sleep:
 
889
            time.sleep(s)
 
890
        else:
 
891
            SocketDelay.simulated_time += s
 
892
 
 
893
    def __getattr__(self, attr):
 
894
        if attr in SocketDelay._proxied_arguments:
 
895
            return getattr(self.sock, attr)
 
896
        raise AttributeError("'SocketDelay' object has no attribute %r" %
 
897
                             attr)
 
898
 
 
899
    def dup(self):
 
900
        return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
 
901
                           self._sleep)
 
902
 
 
903
    def recv(self, *args):
 
904
        data = self.sock.recv(*args)
 
905
        if data and self.new_roundtrip:
 
906
            self.new_roundtrip = False
 
907
            self.sleep(self.latency)
 
908
        self.sleep(len(data) * self.time_per_byte)
 
909
        return data
 
910
 
 
911
    def sendall(self, data, flags=0):
 
912
        if not self.new_roundtrip:
 
913
            self.new_roundtrip = True
 
914
            self.sleep(self.latency)
 
915
        self.sleep(len(data) * self.time_per_byte)
 
916
        return self.sock.sendall(data, flags)
 
917
 
 
918
    def send(self, data, flags=0):
 
919
        if not self.new_roundtrip:
 
920
            self.new_roundtrip = True
 
921
            self.sleep(self.latency)
 
922
        bytes_sent = self.sock.send(data, flags)
 
923
        self.sleep(bytes_sent * self.time_per_byte)
 
924
        return bytes_sent
 
925
 
 
926
 
 
927
class SFTPServer(Server):
 
928
    """Common code for SFTP server facilities."""
 
929
 
 
930
    def __init__(self):
 
931
        self._original_vendor = None
 
932
        self._homedir = None
 
933
        self._server_homedir = None
 
934
        self._listener = None
 
935
        self._root = None
 
936
        self._vendor = ssh.ParamikoVendor()
 
937
        # sftp server logs
 
938
        self.logs = []
 
939
        self.add_latency = 0
 
940
 
 
941
    def _get_sftp_url(self, path):
 
942
        """Calculate an sftp url to this server for path."""
 
943
        return 'sftp://foo:bar@localhost:%d/%s' % (self._listener.port, path)
 
944
 
 
945
    def log(self, message):
 
946
        """StubServer uses this to log when a new server is created."""
 
947
        self.logs.append(message)
 
948
 
 
949
    def _run_server_entry(self, sock):
 
950
        """Entry point for all implementations of _run_server.
 
951
        
 
952
        If self.add_latency is > 0.000001 then sock is given a latency adding
 
953
        decorator.
 
954
        """
 
955
        if self.add_latency > 0.000001:
 
956
            sock = SocketDelay(sock, self.add_latency)
 
957
        return self._run_server(sock)
 
958
 
 
959
    def _run_server(self, s):
 
960
        ssh_server = paramiko.Transport(s)
 
961
        key_file = pathjoin(self._homedir, 'test_rsa.key')
 
962
        f = open(key_file, 'w')
 
963
        f.write(STUB_SERVER_KEY)
 
964
        f.close()
 
965
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
 
966
        ssh_server.add_server_key(host_key)
 
967
        server = StubServer(self)
 
968
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
 
969
                                         StubSFTPServer, root=self._root,
 
970
                                         home=self._server_homedir)
 
971
        event = threading.Event()
 
972
        ssh_server.start_server(event, server)
 
973
        event.wait(5.0)
 
974
    
 
975
    def setUp(self):
 
976
        self._original_vendor = ssh._ssh_vendor
 
977
        ssh._ssh_vendor = self._vendor
 
978
        if sys.platform == 'win32':
 
979
            # Win32 needs to use the UNICODE api
 
980
            self._homedir = getcwd()
 
981
        else:
 
982
            # But Linux SFTP servers should just deal in bytestreams
 
983
            self._homedir = os.getcwd()
 
984
        if self._server_homedir is None:
 
985
            self._server_homedir = self._homedir
 
986
        self._root = '/'
 
987
        if sys.platform == 'win32':
 
988
            self._root = ''
 
989
        self._listener = SocketListener(self._run_server_entry)
 
990
        self._listener.setDaemon(True)
 
991
        self._listener.start()
 
992
 
 
993
    def tearDown(self):
 
994
        """See bzrlib.transport.Server.tearDown."""
 
995
        self._listener.stop()
 
996
        ssh._ssh_vendor = self._original_vendor
 
997
 
 
998
    def get_bogus_url(self):
 
999
        """See bzrlib.transport.Server.get_bogus_url."""
 
1000
        # this is chosen to try to prevent trouble with proxies, wierd dns, etc
 
1001
        # we bind a random socket, so that we get a guaranteed unused port
 
1002
        # we just never listen on that port
 
1003
        s = socket.socket()
 
1004
        s.bind(('localhost', 0))
 
1005
        return 'sftp://%s:%s/' % s.getsockname()
 
1006
 
 
1007
 
 
1008
class SFTPFullAbsoluteServer(SFTPServer):
 
1009
    """A test server for sftp transports, using absolute urls and ssh."""
 
1010
 
 
1011
    def get_url(self):
 
1012
        """See bzrlib.transport.Server.get_url."""
 
1013
        return self._get_sftp_url(urlutils.escape(self._homedir[1:]))
 
1014
 
 
1015
 
 
1016
class SFTPServerWithoutSSH(SFTPServer):
 
1017
    """An SFTP server that uses a simple TCP socket pair rather than SSH."""
 
1018
 
 
1019
    def __init__(self):
 
1020
        super(SFTPServerWithoutSSH, self).__init__()
 
1021
        self._vendor = ssh.LoopbackVendor()
 
1022
 
 
1023
    def _run_server(self, sock):
 
1024
        # Re-import these as locals, so that they're still accessible during
 
1025
        # interpreter shutdown (when all module globals get set to None, leading
 
1026
        # to confusing errors like "'NoneType' object has no attribute 'error'".
 
1027
        import socket, errno
 
1028
        class FakeChannel(object):
 
1029
            def get_transport(self):
 
1030
                return self
 
1031
            def get_log_channel(self):
 
1032
                return 'paramiko'
 
1033
            def get_name(self):
 
1034
                return '1'
 
1035
            def get_hexdump(self):
 
1036
                return False
 
1037
            def close(self):
 
1038
                pass
 
1039
 
 
1040
        server = paramiko.SFTPServer(FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
 
1041
                                     root=self._root, home=self._server_homedir)
 
1042
        try:
 
1043
            server.start_subsystem('sftp', None, sock)
 
1044
        except socket.error, e:
 
1045
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
 
1046
                # it's okay for the client to disconnect abruptly
 
1047
                # (bug in paramiko 1.6: it should absorb this exception)
 
1048
                pass
 
1049
            else:
 
1050
                raise
 
1051
        except Exception, e:
 
1052
            import sys; sys.stderr.write('\nEXCEPTION %r\n\n' % e.__class__)
 
1053
        server.finish_subsystem()
 
1054
 
 
1055
 
 
1056
class SFTPAbsoluteServer(SFTPServerWithoutSSH):
 
1057
    """A test server for sftp transports, using absolute urls."""
 
1058
 
 
1059
    def get_url(self):
 
1060
        """See bzrlib.transport.Server.get_url."""
 
1061
        if sys.platform == 'win32':
 
1062
            return self._get_sftp_url(urlutils.escape(self._homedir))
 
1063
        else:
 
1064
            return self._get_sftp_url(urlutils.escape(self._homedir[1:]))
 
1065
 
 
1066
 
 
1067
class SFTPHomeDirServer(SFTPServerWithoutSSH):
 
1068
    """A test server for sftp transports, using homedir relative urls."""
 
1069
 
 
1070
    def get_url(self):
 
1071
        """See bzrlib.transport.Server.get_url."""
 
1072
        return self._get_sftp_url("~/")
 
1073
 
 
1074
 
 
1075
class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
 
1076
    """A test servere for sftp transports, using absolute urls to non-home."""
 
1077
 
 
1078
    def setUp(self):
 
1079
        self._server_homedir = '/dev/noone/runs/tests/here'
 
1080
        super(SFTPSiblingAbsoluteServer, self).setUp()
 
1081
 
 
1082
 
 
1083
def _sftp_connect(host, port, username, password):
 
1084
    """Connect to the remote sftp server.
 
1085
 
 
1086
    :raises: a TransportError 'could not connect'.
 
1087
 
 
1088
    :returns: an paramiko.sftp_client.SFTPClient
 
1089
 
 
1090
    TODO: Raise a more reasonable ConnectionFailed exception
 
1091
    """
 
1092
    idx = (host, port, username)
 
1093
    try:
 
1094
        return _connected_hosts[idx]
 
1095
    except KeyError:
 
1096
        pass
 
1097
    
 
1098
    sftp = _sftp_connect_uncached(host, port, username, password)
 
1099
    _connected_hosts[idx] = sftp
 
1100
    return sftp
 
1101
 
 
1102
def _sftp_connect_uncached(host, port, username, password):
 
1103
    vendor = ssh._get_ssh_vendor()
 
1104
    sftp = vendor.connect_sftp(username, password, host, port)
 
1105
    return sftp
 
1106
 
 
1107
 
 
1108
def get_test_permutations():
 
1109
    """Return the permutations to be used in testing."""
 
1110
    return [(SFTPTransport, SFTPAbsoluteServer),
 
1111
            (SFTPTransport, SFTPHomeDirServer),
 
1112
            (SFTPTransport, SFTPSiblingAbsoluteServer),
 
1113
            ]