/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

More tests for abspath and clone behaviour

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
 
2
# Copyright (C) 2005, 2006 Canonical Ltd
 
3
#
 
4
# This program is free software; you can redistribute it and/or modify
 
5
# it under the terms of the GNU General Public License as published by
 
6
# the Free Software Foundation; either version 2 of the License, or
 
7
# (at your option) any later version.
 
8
#
 
9
# This program is distributed in the hope that it will be useful,
 
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
# GNU General Public License for more details.
 
13
#
 
14
# You should have received a copy of the GNU General Public License
 
15
# along with this program; if not, write to the Free Software
 
16
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
17
 
 
18
"""Implementation of Transport over SFTP, using paramiko."""
 
19
 
 
20
# TODO: Remove the transport-based lock_read and lock_write methods.  They'll
 
21
# then raise TransportNotPossible, which will break remote access to any
 
22
# formats which rely on OS-level locks.  That should be fine as those formats
 
23
# are pretty old, but these combinations may have to be removed from the test
 
24
# suite.
 
25
 
 
26
import errno
 
27
import os
 
28
import random
 
29
import select
 
30
import socket
 
31
import stat
 
32
import subprocess
 
33
import sys
 
34
import time
 
35
import urllib
 
36
import urlparse
 
37
import weakref
 
38
 
 
39
from bzrlib.errors import (FileExists, 
 
40
                           NoSuchFile, PathNotChild,
 
41
                           TransportError,
 
42
                           LockError, 
 
43
                           PathError,
 
44
                           ParamikoNotPresent,
 
45
                           UnknownSSH,
 
46
                           )
 
47
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
 
48
from bzrlib.trace import mutter, warning
 
49
from bzrlib.transport import (
 
50
    register_urlparse_netloc_protocol,
 
51
    Server,
 
52
    split_url,
 
53
    ssh,
 
54
    Transport,
 
55
    )
 
56
import bzrlib.urlutils as urlutils
 
57
 
 
58
try:
 
59
    import paramiko
 
60
except ImportError, e:
 
61
    raise ParamikoNotPresent(e)
 
62
else:
 
63
    from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
 
64
                               SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
 
65
                               CMD_HANDLE, CMD_OPEN)
 
66
    from paramiko.sftp_attr import SFTPAttributes
 
67
    from paramiko.sftp_file import SFTPFile
 
68
 
 
69
 
 
70
register_urlparse_netloc_protocol('sftp')
 
71
 
 
72
 
 
73
# This is a weakref dictionary, so that we can reuse connections
 
74
# that are still active. Long term, it might be nice to have some
 
75
# sort of expiration policy, such as disconnect if inactive for
 
76
# X seconds. But that requires a lot more fanciness.
 
77
_connected_hosts = weakref.WeakValueDictionary()
 
78
 
 
79
 
 
80
_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
 
81
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
 
82
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))
 
83
 
 
84
 
 
85
def clear_connection_cache():
 
86
    """Remove all hosts from the SFTP connection cache.
 
87
 
 
88
    Primarily useful for test cases wanting to force garbage collection.
 
89
    """
 
90
    _connected_hosts.clear()
 
91
 
 
92
 
 
93
class SFTPLock(object):
 
94
    """This fakes a lock in a remote location.
 
95
    
 
96
    A present lock is indicated just by the existence of a file.  This
 
97
    doesn't work well on all transports and they are only used in 
 
98
    deprecated storage formats.
 
99
    """
 
100
    
 
101
    __slots__ = ['path', 'lock_path', 'lock_file', 'transport']
 
102
 
 
103
    def __init__(self, path, transport):
 
104
        assert isinstance(transport, SFTPTransport)
 
105
 
 
106
        self.lock_file = None
 
107
        self.path = path
 
108
        self.lock_path = path + '.write-lock'
 
109
        self.transport = transport
 
110
        try:
 
111
            # RBC 20060103 FIXME should we be using private methods here ?
 
112
            abspath = transport._remote_path(self.lock_path)
 
113
            self.lock_file = transport._sftp_open_exclusive(abspath)
 
114
        except FileExists:
 
115
            raise LockError('File %r already locked' % (self.path,))
 
116
 
 
117
    def __del__(self):
 
118
        """Should this warn, or actually try to cleanup?"""
 
119
        if self.lock_file:
 
120
            warning("SFTPLock %r not explicitly unlocked" % (self.path,))
 
121
            self.unlock()
 
122
 
 
123
    def unlock(self):
 
124
        if not self.lock_file:
 
125
            return
 
126
        self.lock_file.close()
 
127
        self.lock_file = None
 
128
        try:
 
129
            self.transport.delete(self.lock_path)
 
130
        except (NoSuchFile,):
 
131
            # What specific errors should we catch here?
 
132
            pass
 
133
 
 
134
 
 
135
class SFTPTransport(Transport):
 
136
    """Transport implementation for SFTP access."""
 
137
 
 
138
    _do_prefetch = _default_do_prefetch
 
139
    # TODO: jam 20060717 Conceivably these could be configurable, either
 
140
    #       by auto-tuning at run-time, or by a configuration (per host??)
 
141
    #       but the performance curve is pretty flat, so just going with
 
142
    #       reasonable defaults.
 
143
    _max_readv_combine = 200
 
144
    # Having to round trip to the server means waiting for a response,
 
145
    # so it is better to download extra bytes.
 
146
    # 8KiB had good performance for both local and remote network operations
 
147
    _bytes_to_read_before_seek = 8192
 
148
 
 
149
    # The sftp spec says that implementations SHOULD allow reads
 
150
    # to be at least 32K. paramiko.readv() does an async request
 
151
    # for the chunks. So we need to keep it within a single request
 
152
    # size for paramiko <= 1.6.1. paramiko 1.6.2 will probably chop
 
153
    # up the request itself, rather than us having to worry about it
 
154
    _max_request_size = 32768
 
155
 
 
156
    def __init__(self, base, clone_from=None):
 
157
        assert base.startswith('sftp://')
 
158
        self._parse_url(base)
 
159
        base = self._unparse_url()
 
160
        if base[-1] != '/':
 
161
            base += '/'
 
162
        super(SFTPTransport, self).__init__(base)
 
163
        if clone_from is None:
 
164
            self._sftp_connect()
 
165
        else:
 
166
            # use the same ssh connection, etc
 
167
            self._sftp = clone_from._sftp
 
168
        # super saves 'self.base'
 
169
    
 
170
    def should_cache(self):
 
171
        """
 
172
        Return True if the data pulled across should be cached locally.
 
173
        """
 
174
        return True
 
175
 
 
176
    def clone(self, offset=None):
 
177
        """
 
178
        Return a new SFTPTransport with root at self.base + offset.
 
179
        We share the same SFTP session between such transports, because it's
 
180
        fairly expensive to set them up.
 
181
        """
 
182
        if offset is None:
 
183
            return SFTPTransport(self.base, self)
 
184
        else:
 
185
            return SFTPTransport(self.abspath(offset), self)
 
186
 
 
187
    def abspath(self, relpath):
 
188
        """
 
189
        Return the full url to the given relative path.
 
190
        
 
191
        @param relpath: the relative path or path components
 
192
        @type relpath: str or list
 
193
        """
 
194
        return self._unparse_url(self._remote_path(relpath))
 
195
    
 
196
    def _remote_path(self, relpath):
 
197
        """Return the path to be passed along the sftp protocol for relpath.
 
198
        
 
199
        relpath is a urlencoded string.
 
200
        """
 
201
        # FIXME: share the common code across transports
 
202
        assert isinstance(relpath, basestring)
 
203
        relpath = urlutils.unescape(relpath).split('/')
 
204
        basepath = self._path.split('/')
 
205
        if len(basepath) > 0 and basepath[-1] == '':
 
206
            basepath = basepath[:-1]
 
207
 
 
208
        for p in relpath:
 
209
            if p == '..':
 
210
                if len(basepath) == 0:
 
211
                    # In most filesystems, a request for the parent
 
212
                    # of root, just returns root.
 
213
                    continue
 
214
                basepath.pop()
 
215
            elif p == '.':
 
216
                continue # No-op
 
217
            else:
 
218
                basepath.append(p)
 
219
 
 
220
        path = '/'.join(basepath)
 
221
        # mutter('relpath => remotepath %s => %s', relpath, path)
 
222
        return path
 
223
 
 
224
    def relpath(self, abspath):
 
225
        username, password, host, port, path = self._split_url(abspath)
 
226
        error = []
 
227
        if (username != self._username):
 
228
            error.append('username mismatch')
 
229
        if (host != self._host):
 
230
            error.append('host mismatch')
 
231
        if (port != self._port):
 
232
            error.append('port mismatch')
 
233
        if (not path.startswith(self._path)):
 
234
            error.append('path mismatch')
 
235
        if error:
 
236
            extra = ': ' + ', '.join(error)
 
237
            raise PathNotChild(abspath, self.base, extra=extra)
 
238
        pl = len(self._path)
 
239
        return path[pl:].strip('/')
 
240
 
 
241
    def has(self, relpath):
 
242
        """
 
243
        Does the target location exist?
 
244
        """
 
245
        try:
 
246
            self._sftp.stat(self._remote_path(relpath))
 
247
            return True
 
248
        except IOError:
 
249
            return False
 
250
 
 
251
    def get(self, relpath):
 
252
        """
 
253
        Get the file at the given relative path.
 
254
 
 
255
        :param relpath: The relative path to the file
 
256
        """
 
257
        try:
 
258
            path = self._remote_path(relpath)
 
259
            f = self._sftp.file(path, mode='rb')
 
260
            if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
 
261
                f.prefetch()
 
262
            return f
 
263
        except (IOError, paramiko.SSHException), e:
 
264
            self._translate_io_exception(e, path, ': error retrieving')
 
265
 
 
266
    def readv(self, relpath, offsets):
 
267
        """See Transport.readv()"""
 
268
        # We overload the default readv() because we want to use a file
 
269
        # that does not have prefetch enabled.
 
270
        # Also, if we have a new paramiko, it implements an async readv()
 
271
        if not offsets:
 
272
            return
 
273
 
 
274
        try:
 
275
            path = self._remote_path(relpath)
 
276
            fp = self._sftp.file(path, mode='rb')
 
277
            readv = getattr(fp, 'readv', None)
 
278
            if readv:
 
279
                return self._sftp_readv(fp, offsets)
 
280
            mutter('seek and read %s offsets', len(offsets))
 
281
            return self._seek_and_read(fp, offsets)
 
282
        except (IOError, paramiko.SSHException), e:
 
283
            self._translate_io_exception(e, path, ': error retrieving')
 
284
 
 
285
    def _sftp_readv(self, fp, offsets):
 
286
        """Use the readv() member of fp to do async readv.
 
287
 
 
288
        And then read them using paramiko.readv(). paramiko.readv()
 
289
        does not support ranges > 64K, so it caps the request size, and
 
290
        just reads until it gets all the stuff it wants
 
291
        """
 
292
        offsets = list(offsets)
 
293
        sorted_offsets = sorted(offsets)
 
294
 
 
295
        # The algorithm works as follows:
 
296
        # 1) Coalesce nearby reads into a single chunk
 
297
        #    This generates a list of combined regions, the total size
 
298
        #    and the size of the sub regions. This coalescing step is limited
 
299
        #    in the number of nearby chunks to combine, and is allowed to
 
300
        #    skip small breaks in the requests. Limiting it makes sure that
 
301
        #    we can start yielding some data earlier, and skipping means we
 
302
        #    make fewer requests. (Beneficial even when using async)
 
303
        # 2) Break up this combined regions into chunks that are smaller
 
304
        #    than 64KiB. Technically the limit is 65536, but we are a
 
305
        #    little bit conservative. This is because sftp has a maximum
 
306
        #    return chunk size of 64KiB (max size of an unsigned short)
 
307
        # 3) Issue a readv() to paramiko to create an async request for
 
308
        #    all of this data
 
309
        # 4) Read in the data as it comes back, until we've read one
 
310
        #    continuous section as determined in step 1
 
311
        # 5) Break up the full sections into hunks for the original requested
 
312
        #    offsets. And put them in a cache
 
313
        # 6) Check if the next request is in the cache, and if it is, remove
 
314
        #    it from the cache, and yield its data. Continue until no more
 
315
        #    entries are in the cache.
 
316
        # 7) loop back to step 4 until all data has been read
 
317
        #
 
318
        # TODO: jam 20060725 This could be optimized one step further, by
 
319
        #       attempting to yield whatever data we have read, even before
 
320
        #       the first coallesced section has been fully processed.
 
321
 
 
322
        # When coalescing for use with readv(), we don't really need to
 
323
        # use any fudge factor, because the requests are made asynchronously
 
324
        coalesced = list(self._coalesce_offsets(sorted_offsets,
 
325
                               limit=self._max_readv_combine,
 
326
                               fudge_factor=0,
 
327
                               ))
 
328
        requests = []
 
329
        for c_offset in coalesced:
 
330
            start = c_offset.start
 
331
            size = c_offset.length
 
332
 
 
333
            # We need to break this up into multiple requests
 
334
            while size > 0:
 
335
                next_size = min(size, self._max_request_size)
 
336
                requests.append((start, next_size))
 
337
                size -= next_size
 
338
                start += next_size
 
339
 
 
340
        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
 
341
                len(offsets), len(coalesced), len(requests))
 
342
 
 
343
        # Queue the current read until we have read the full coalesced section
 
344
        cur_data = []
 
345
        cur_data_len = 0
 
346
        cur_coalesced_stack = iter(coalesced)
 
347
        cur_coalesced = cur_coalesced_stack.next()
 
348
 
 
349
        # Cache the results, but only until they have been fulfilled
 
350
        data_map = {}
 
351
        # turn the list of offsets into a stack
 
352
        offset_stack = iter(offsets)
 
353
        cur_offset_and_size = offset_stack.next()
 
354
 
 
355
        for data in fp.readv(requests):
 
356
            cur_data += data
 
357
            cur_data_len += len(data)
 
358
 
 
359
            if cur_data_len < cur_coalesced.length:
 
360
                continue
 
361
            assert cur_data_len == cur_coalesced.length, \
 
362
                "Somehow we read too much: %s != %s" % (cur_data_len,
 
363
                                                        cur_coalesced.length)
 
364
            all_data = ''.join(cur_data)
 
365
            cur_data = []
 
366
            cur_data_len = 0
 
367
 
 
368
            for suboffset, subsize in cur_coalesced.ranges:
 
369
                key = (cur_coalesced.start+suboffset, subsize)
 
370
                data_map[key] = all_data[suboffset:suboffset+subsize]
 
371
 
 
372
            # Now that we've read some data, see if we can yield anything back
 
373
            while cur_offset_and_size in data_map:
 
374
                this_data = data_map.pop(cur_offset_and_size)
 
375
                yield cur_offset_and_size[0], this_data
 
376
                cur_offset_and_size = offset_stack.next()
 
377
 
 
378
            # Now that we've read all of the data for this coalesced section
 
379
            # on to the next
 
380
            cur_coalesced = cur_coalesced_stack.next()
 
381
 
 
382
    def put(self, relpath, f, mode=None):
 
383
        """
 
384
        Copy the file-like or string object into the location.
 
385
 
 
386
        :param relpath: Location to put the contents, relative to base.
 
387
        :param f:       File-like or string object.
 
388
        :param mode: The final mode for the file
 
389
        """
 
390
        final_path = self._remote_path(relpath)
 
391
        self._put(final_path, f, mode=mode)
 
392
 
 
393
    def _put(self, abspath, f, mode=None):
 
394
        """Helper function so both put() and copy_abspaths can reuse the code"""
 
395
        tmp_abspath = '%s.tmp.%.9f.%d.%d' % (abspath, time.time(),
 
396
                        os.getpid(), random.randint(0,0x7FFFFFFF))
 
397
        fout = self._sftp_open_exclusive(tmp_abspath, mode=mode)
 
398
        closed = False
 
399
        try:
 
400
            try:
 
401
                fout.set_pipelined(True)
 
402
                self._pump(f, fout)
 
403
            except (IOError, paramiko.SSHException), e:
 
404
                self._translate_io_exception(e, tmp_abspath)
 
405
            if mode is not None:
 
406
                self._sftp.chmod(tmp_abspath, mode)
 
407
            fout.close()
 
408
            closed = True
 
409
            self._rename_and_overwrite(tmp_abspath, abspath)
 
410
        except Exception, e:
 
411
            # If we fail, try to clean up the temporary file
 
412
            # before we throw the exception
 
413
            # but don't let another exception mess things up
 
414
            # Write out the traceback, because otherwise
 
415
            # the catch and throw destroys it
 
416
            import traceback
 
417
            mutter(traceback.format_exc())
 
418
            try:
 
419
                if not closed:
 
420
                    fout.close()
 
421
                self._sftp.remove(tmp_abspath)
 
422
            except:
 
423
                # raise the saved except
 
424
                raise e
 
425
            # raise the original with its traceback if we can.
 
426
            raise
 
427
 
 
428
    def iter_files_recursive(self):
 
429
        """Walk the relative paths of all files in this transport."""
 
430
        queue = list(self.list_dir('.'))
 
431
        while queue:
 
432
            relpath = queue.pop(0)
 
433
            st = self.stat(relpath)
 
434
            if stat.S_ISDIR(st.st_mode):
 
435
                for i, basename in enumerate(self.list_dir(relpath)):
 
436
                    queue.insert(i, relpath+'/'+basename)
 
437
            else:
 
438
                yield relpath
 
439
 
 
440
    def mkdir(self, relpath, mode=None):
 
441
        """Create a directory at the given path."""
 
442
        path = self._remote_path(relpath)
 
443
        try:
 
444
            # In the paramiko documentation, it says that passing a mode flag 
 
445
            # will filtered against the server umask.
 
446
            # StubSFTPServer does not do this, which would be nice, because it is
 
447
            # what we really want :)
 
448
            # However, real servers do use umask, so we really should do it that way
 
449
            self._sftp.mkdir(path)
 
450
            if mode is not None:
 
451
                self._sftp.chmod(path, mode=mode)
 
452
        except (paramiko.SSHException, IOError), e:
 
453
            self._translate_io_exception(e, path, ': unable to mkdir',
 
454
                failure_exc=FileExists)
 
455
 
 
456
    def _translate_io_exception(self, e, path, more_info='', 
 
457
                                failure_exc=PathError):
 
458
        """Translate a paramiko or IOError into a friendlier exception.
 
459
 
 
460
        :param e: The original exception
 
461
        :param path: The path in question when the error is raised
 
462
        :param more_info: Extra information that can be included,
 
463
                          such as what was going on
 
464
        :param failure_exc: Paramiko has the super fun ability to raise completely
 
465
                           opaque errors that just set "e.args = ('Failure',)" with
 
466
                           no more information.
 
467
                           If this parameter is set, it defines the exception 
 
468
                           to raise in these cases.
 
469
        """
 
470
        # paramiko seems to generate detailless errors.
 
471
        self._translate_error(e, path, raise_generic=False)
 
472
        if hasattr(e, 'args'):
 
473
            if (e.args == ('No such file or directory',) or
 
474
                e.args == ('No such file',)):
 
475
                raise NoSuchFile(path, str(e) + more_info)
 
476
            if (e.args == ('mkdir failed',)):
 
477
                raise FileExists(path, str(e) + more_info)
 
478
            # strange but true, for the paramiko server.
 
479
            if (e.args == ('Failure',)):
 
480
                raise failure_exc(path, str(e) + more_info)
 
481
            mutter('Raising exception with args %s', e.args)
 
482
        if hasattr(e, 'errno'):
 
483
            mutter('Raising exception with errno %s', e.errno)
 
484
        raise e
 
485
 
 
486
    def append(self, relpath, f, mode=None):
 
487
        """
 
488
        Append the text in the file-like object into the final
 
489
        location.
 
490
        """
 
491
        try:
 
492
            path = self._remote_path(relpath)
 
493
            fout = self._sftp.file(path, 'ab')
 
494
            if mode is not None:
 
495
                self._sftp.chmod(path, mode)
 
496
            result = fout.tell()
 
497
            self._pump(f, fout)
 
498
            return result
 
499
        except (IOError, paramiko.SSHException), e:
 
500
            self._translate_io_exception(e, relpath, ': unable to append')
 
501
 
 
502
    def rename(self, rel_from, rel_to):
 
503
        """Rename without special overwriting"""
 
504
        try:
 
505
            self._sftp.rename(self._remote_path(rel_from),
 
506
                              self._remote_path(rel_to))
 
507
        except (IOError, paramiko.SSHException), e:
 
508
            self._translate_io_exception(e, rel_from,
 
509
                    ': unable to rename to %r' % (rel_to))
 
510
 
 
511
    def _rename_and_overwrite(self, abs_from, abs_to):
 
512
        """Do a fancy rename on the remote server.
 
513
        
 
514
        Using the implementation provided by osutils.
 
515
        """
 
516
        try:
 
517
            fancy_rename(abs_from, abs_to,
 
518
                    rename_func=self._sftp.rename,
 
519
                    unlink_func=self._sftp.remove)
 
520
        except (IOError, paramiko.SSHException), e:
 
521
            self._translate_io_exception(e, abs_from, ': unable to rename to %r' % (abs_to))
 
522
 
 
523
    def move(self, rel_from, rel_to):
 
524
        """Move the item at rel_from to the location at rel_to"""
 
525
        path_from = self._remote_path(rel_from)
 
526
        path_to = self._remote_path(rel_to)
 
527
        self._rename_and_overwrite(path_from, path_to)
 
528
 
 
529
    def delete(self, relpath):
 
530
        """Delete the item at relpath"""
 
531
        path = self._remote_path(relpath)
 
532
        try:
 
533
            self._sftp.remove(path)
 
534
        except (IOError, paramiko.SSHException), e:
 
535
            self._translate_io_exception(e, path, ': unable to delete')
 
536
            
 
537
    def listable(self):
 
538
        """Return True if this store supports listing."""
 
539
        return True
 
540
 
 
541
    def list_dir(self, relpath):
 
542
        """
 
543
        Return a list of all files at the given location.
 
544
        """
 
545
        # does anything actually use this?
 
546
        # -- Unknown
 
547
        # This is at least used by copy_tree for remote upgrades.
 
548
        # -- David Allouche 2006-08-11
 
549
        path = self._remote_path(relpath)
 
550
        try:
 
551
            entries = self._sftp.listdir(path)
 
552
        except (IOError, paramiko.SSHException), e:
 
553
            self._translate_io_exception(e, path, ': failed to list_dir')
 
554
        return [urlutils.escape(entry) for entry in entries]
 
555
 
 
556
    def rmdir(self, relpath):
 
557
        """See Transport.rmdir."""
 
558
        path = self._remote_path(relpath)
 
559
        try:
 
560
            return self._sftp.rmdir(path)
 
561
        except (IOError, paramiko.SSHException), e:
 
562
            self._translate_io_exception(e, path, ': failed to rmdir')
 
563
 
 
564
    def stat(self, relpath):
 
565
        """Return the stat information for a file."""
 
566
        path = self._remote_path(relpath)
 
567
        try:
 
568
            return self._sftp.stat(path)
 
569
        except (IOError, paramiko.SSHException), e:
 
570
            self._translate_io_exception(e, path, ': unable to stat')
 
571
 
 
572
    def lock_read(self, relpath):
 
573
        """
 
574
        Lock the given file for shared (read) access.
 
575
        :return: A lock object, which has an unlock() member function
 
576
        """
 
577
        # FIXME: there should be something clever i can do here...
 
578
        class BogusLock(object):
 
579
            def __init__(self, path):
 
580
                self.path = path
 
581
            def unlock(self):
 
582
                pass
 
583
        return BogusLock(relpath)
 
584
 
 
585
    def lock_write(self, relpath):
 
586
        """
 
587
        Lock the given file for exclusive (write) access.
 
588
        WARNING: many transports do not support this, so trying avoid using it
 
589
 
 
590
        :return: A lock object, which has an unlock() member function
 
591
        """
 
592
        # This is a little bit bogus, but basically, we create a file
 
593
        # which should not already exist, and if it does, we assume
 
594
        # that there is a lock, and if it doesn't, the we assume
 
595
        # that we have taken the lock.
 
596
        return SFTPLock(relpath, self)
 
597
 
 
598
    def _unparse_url(self, path=None):
 
599
        if path is None:
 
600
            path = self._path
 
601
        path = urllib.quote(path)
 
602
        # handle homedir paths
 
603
        if not path.startswith('/'):
 
604
            path = "/~/" + path
 
605
        netloc = urllib.quote(self._host)
 
606
        if self._username is not None:
 
607
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
 
608
        if self._port is not None:
 
609
            netloc = '%s:%d' % (netloc, self._port)
 
610
        return urlparse.urlunparse(('sftp', netloc, path, '', '', ''))
 
611
 
 
612
    def _split_url(self, url):
 
613
        (scheme, username, password, host, port, path) = split_url(url)
 
614
        assert scheme == 'sftp'
 
615
 
 
616
        # the initial slash should be removed from the path, and treated
 
617
        # as a homedir relative path (the path begins with a double slash
 
618
        # if it is absolute).
 
619
        # see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
 
620
        # RBC 20060118 we are not using this as its too user hostile. instead
 
621
        # we are following lftp and using /~/foo to mean '~/foo'.
 
622
        # handle homedir paths
 
623
        if path.startswith('/~/'):
 
624
            path = path[3:]
 
625
        elif path == '/~':
 
626
            path = ''
 
627
        return (username, password, host, port, path)
 
628
 
 
629
    def _parse_url(self, url):
 
630
        (self._username, self._password,
 
631
         self._host, self._port, self._path) = self._split_url(url)
 
632
 
 
633
    def _sftp_connect(self):
 
634
        """Connect to the remote sftp server.
 
635
        After this, self._sftp should have a valid connection (or
 
636
        we raise an TransportError 'could not connect').
 
637
 
 
638
        TODO: Raise a more reasonable ConnectionFailed exception
 
639
        """
 
640
        self._sftp = _sftp_connect(self._host, self._port, self._username,
 
641
                self._password)
 
642
 
 
643
    def _sftp_open_exclusive(self, abspath, mode=None):
 
644
        """Open a remote path exclusively.
 
645
 
 
646
        SFTP supports O_EXCL (SFTP_FLAG_EXCL), which fails if
 
647
        the file already exists. However it does not expose this
 
648
        at the higher level of SFTPClient.open(), so we have to
 
649
        sneak away with it.
 
650
 
 
651
        WARNING: This breaks the SFTPClient abstraction, so it
 
652
        could easily break against an updated version of paramiko.
 
653
 
 
654
        :param abspath: The remote absolute path where the file should be opened
 
655
        :param mode: The mode permissions bits for the new file
 
656
        """
 
657
        path = self._sftp._adjust_cwd(abspath)
 
658
        # mutter('sftp abspath %s => %s', abspath, path)
 
659
        attr = SFTPAttributes()
 
660
        if mode is not None:
 
661
            attr.st_mode = mode
 
662
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE 
 
663
                | SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
 
664
        try:
 
665
            t, msg = self._sftp._request(CMD_OPEN, path, omode, attr)
 
666
            if t != CMD_HANDLE:
 
667
                raise TransportError('Expected an SFTP handle')
 
668
            handle = msg.get_string()
 
669
            return SFTPFile(self._sftp, handle, 'wb', -1)
 
670
        except (paramiko.SSHException, IOError), e:
 
671
            self._translate_io_exception(e, abspath, ': unable to open',
 
672
                failure_exc=FileExists)
 
673
 
 
674
 
 
675
# ------------- server test implementation --------------
 
676
import threading
 
677
 
 
678
from bzrlib.tests.stub_sftp import StubServer, StubSFTPServer
 
679
 
 
680
STUB_SERVER_KEY = """
 
681
-----BEGIN RSA PRIVATE KEY-----
 
682
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
 
683
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
 
684
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
 
685
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
 
686
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
 
687
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
 
688
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
 
689
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
 
690
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
 
691
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
 
692
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
 
693
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
 
694
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
 
695
-----END RSA PRIVATE KEY-----
 
696
"""
 
697
 
 
698
 
 
699
class SocketListener(threading.Thread):
 
700
 
 
701
    def __init__(self, callback):
 
702
        threading.Thread.__init__(self)
 
703
        self._callback = callback
 
704
        self._socket = socket.socket()
 
705
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
706
        self._socket.bind(('localhost', 0))
 
707
        self._socket.listen(1)
 
708
        self.port = self._socket.getsockname()[1]
 
709
        self._stop_event = threading.Event()
 
710
 
 
711
    def stop(self):
 
712
        # called from outside this thread
 
713
        self._stop_event.set()
 
714
        # use a timeout here, because if the test fails, the server thread may
 
715
        # never notice the stop_event.
 
716
        self.join(5.0)
 
717
        self._socket.close()
 
718
 
 
719
    def run(self):
 
720
        while True:
 
721
            readable, writable_unused, exception_unused = \
 
722
                select.select([self._socket], [], [], 0.1)
 
723
            if self._stop_event.isSet():
 
724
                return
 
725
            if len(readable) == 0:
 
726
                continue
 
727
            try:
 
728
                s, addr_unused = self._socket.accept()
 
729
                # because the loopback socket is inline, and transports are
 
730
                # never explicitly closed, best to launch a new thread.
 
731
                threading.Thread(target=self._callback, args=(s,)).start()
 
732
            except socket.error, x:
 
733
                sys.excepthook(*sys.exc_info())
 
734
                warning('Socket error during accept() within unit test server'
 
735
                        ' thread: %r' % x)
 
736
            except Exception, x:
 
737
                # probably a failed test; unit test thread will log the
 
738
                # failure/error
 
739
                sys.excepthook(*sys.exc_info())
 
740
                warning('Exception from within unit test server thread: %r' % 
 
741
                        x)
 
742
 
 
743
 
 
744
class SocketDelay(object):
 
745
    """A socket decorator to make TCP appear slower.
 
746
 
 
747
    This changes recv, send, and sendall to add a fixed latency to each python
 
748
    call if a new roundtrip is detected. That is, when a recv is called and the
 
749
    flag new_roundtrip is set, latency is charged. Every send and send_all
 
750
    sets this flag.
 
751
 
 
752
    In addition every send, sendall and recv sleeps a bit per character send to
 
753
    simulate bandwidth.
 
754
 
 
755
    Not all methods are implemented, this is deliberate as this class is not a
 
756
    replacement for the builtin sockets layer. fileno is not implemented to
 
757
    prevent the proxy being bypassed. 
 
758
    """
 
759
 
 
760
    simulated_time = 0
 
761
    _proxied_arguments = dict.fromkeys([
 
762
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
 
763
        "setblocking", "setsockopt", "settimeout", "shutdown"])
 
764
 
 
765
    def __init__(self, sock, latency, bandwidth=1.0, 
 
766
                 really_sleep=True):
 
767
        """ 
 
768
        :param bandwith: simulated bandwith (MegaBit)
 
769
        :param really_sleep: If set to false, the SocketDelay will just
 
770
        increase a counter, instead of calling time.sleep. This is useful for
 
771
        unittesting the SocketDelay.
 
772
        """
 
773
        self.sock = sock
 
774
        self.latency = latency
 
775
        self.really_sleep = really_sleep
 
776
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024) 
 
777
        self.new_roundtrip = False
 
778
 
 
779
    def sleep(self, s):
 
780
        if self.really_sleep:
 
781
            time.sleep(s)
 
782
        else:
 
783
            SocketDelay.simulated_time += s
 
784
 
 
785
    def __getattr__(self, attr):
 
786
        if attr in SocketDelay._proxied_arguments:
 
787
            return getattr(self.sock, attr)
 
788
        raise AttributeError("'SocketDelay' object has no attribute %r" %
 
789
                             attr)
 
790
 
 
791
    def dup(self):
 
792
        return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
 
793
                           self._sleep)
 
794
 
 
795
    def recv(self, *args):
 
796
        data = self.sock.recv(*args)
 
797
        if data and self.new_roundtrip:
 
798
            self.new_roundtrip = False
 
799
            self.sleep(self.latency)
 
800
        self.sleep(len(data) * self.time_per_byte)
 
801
        return data
 
802
 
 
803
    def sendall(self, data, flags=0):
 
804
        if not self.new_roundtrip:
 
805
            self.new_roundtrip = True
 
806
            self.sleep(self.latency)
 
807
        self.sleep(len(data) * self.time_per_byte)
 
808
        return self.sock.sendall(data, flags)
 
809
 
 
810
    def send(self, data, flags=0):
 
811
        if not self.new_roundtrip:
 
812
            self.new_roundtrip = True
 
813
            self.sleep(self.latency)
 
814
        bytes_sent = self.sock.send(data, flags)
 
815
        self.sleep(bytes_sent * self.time_per_byte)
 
816
        return bytes_sent
 
817
 
 
818
 
 
819
class SFTPServer(Server):
 
820
    """Common code for SFTP server facilities."""
 
821
 
 
822
    def __init__(self):
 
823
        self._original_vendor = None
 
824
        self._homedir = None
 
825
        self._server_homedir = None
 
826
        self._listener = None
 
827
        self._root = None
 
828
        self._vendor = ssh.ParamikoVendor()
 
829
        # sftp server logs
 
830
        self.logs = []
 
831
        self.add_latency = 0
 
832
 
 
833
    def _get_sftp_url(self, path):
 
834
        """Calculate an sftp url to this server for path."""
 
835
        return 'sftp://foo:bar@localhost:%d/%s' % (self._listener.port, path)
 
836
 
 
837
    def log(self, message):
 
838
        """StubServer uses this to log when a new server is created."""
 
839
        self.logs.append(message)
 
840
 
 
841
    def _run_server_entry(self, sock):
 
842
        """Entry point for all implementations of _run_server.
 
843
        
 
844
        If self.add_latency is > 0.000001 then sock is given a latency adding
 
845
        decorator.
 
846
        """
 
847
        if self.add_latency > 0.000001:
 
848
            sock = SocketDelay(sock, self.add_latency)
 
849
        return self._run_server(sock)
 
850
 
 
851
    def _run_server(self, s):
 
852
        ssh_server = paramiko.Transport(s)
 
853
        key_file = pathjoin(self._homedir, 'test_rsa.key')
 
854
        f = open(key_file, 'w')
 
855
        f.write(STUB_SERVER_KEY)
 
856
        f.close()
 
857
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
 
858
        ssh_server.add_server_key(host_key)
 
859
        server = StubServer(self)
 
860
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
 
861
                                         StubSFTPServer, root=self._root,
 
862
                                         home=self._server_homedir)
 
863
        event = threading.Event()
 
864
        ssh_server.start_server(event, server)
 
865
        event.wait(5.0)
 
866
    
 
867
    def setUp(self):
 
868
        self._original_vendor = ssh._ssh_vendor
 
869
        ssh._ssh_vendor = self._vendor
 
870
        if sys.platform == 'win32':
 
871
            # Win32 needs to use the UNICODE api
 
872
            self._homedir = getcwd()
 
873
        else:
 
874
            # But Linux SFTP servers should just deal in bytestreams
 
875
            self._homedir = os.getcwd()
 
876
        if self._server_homedir is None:
 
877
            self._server_homedir = self._homedir
 
878
        self._root = '/'
 
879
        if sys.platform == 'win32':
 
880
            self._root = ''
 
881
        self._listener = SocketListener(self._run_server_entry)
 
882
        self._listener.setDaemon(True)
 
883
        self._listener.start()
 
884
 
 
885
    def tearDown(self):
 
886
        """See bzrlib.transport.Server.tearDown."""
 
887
        self._listener.stop()
 
888
        ssh._ssh_vendor = self._original_vendor
 
889
 
 
890
    def get_bogus_url(self):
 
891
        """See bzrlib.transport.Server.get_bogus_url."""
 
892
        # this is chosen to try to prevent trouble with proxies, wierd dns, etc
 
893
        # we bind a random socket, so that we get a guaranteed unused port
 
894
        # we just never listen on that port
 
895
        s = socket.socket()
 
896
        s.bind(('localhost', 0))
 
897
        return 'sftp://%s:%s/' % s.getsockname()
 
898
 
 
899
 
 
900
class SFTPFullAbsoluteServer(SFTPServer):
 
901
    """A test server for sftp transports, using absolute urls and ssh."""
 
902
 
 
903
    def get_url(self):
 
904
        """See bzrlib.transport.Server.get_url."""
 
905
        return self._get_sftp_url(urlutils.escape(self._homedir[1:]))
 
906
 
 
907
 
 
908
class SFTPServerWithoutSSH(SFTPServer):
 
909
    """An SFTP server that uses a simple TCP socket pair rather than SSH."""
 
910
 
 
911
    def __init__(self):
 
912
        super(SFTPServerWithoutSSH, self).__init__()
 
913
        self._vendor = ssh.LoopbackVendor()
 
914
 
 
915
    def _run_server(self, sock):
 
916
        # Re-import these as locals, so that they're still accessible during
 
917
        # interpreter shutdown (when all module globals get set to None, leading
 
918
        # to confusing errors like "'NoneType' object has no attribute 'error'".
 
919
        import socket, errno
 
920
        class FakeChannel(object):
 
921
            def get_transport(self):
 
922
                return self
 
923
            def get_log_channel(self):
 
924
                return 'paramiko'
 
925
            def get_name(self):
 
926
                return '1'
 
927
            def get_hexdump(self):
 
928
                return False
 
929
            def close(self):
 
930
                pass
 
931
 
 
932
        server = paramiko.SFTPServer(FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
 
933
                                     root=self._root, home=self._server_homedir)
 
934
        try:
 
935
            server.start_subsystem('sftp', None, sock)
 
936
        except socket.error, e:
 
937
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
 
938
                # it's okay for the client to disconnect abruptly
 
939
                # (bug in paramiko 1.6: it should absorb this exception)
 
940
                pass
 
941
            else:
 
942
                raise
 
943
        except Exception, e:
 
944
            import sys; sys.stderr.write('\nEXCEPTION %r\n\n' % e.__class__)
 
945
        server.finish_subsystem()
 
946
 
 
947
 
 
948
class SFTPAbsoluteServer(SFTPServerWithoutSSH):
 
949
    """A test server for sftp transports, using absolute urls."""
 
950
 
 
951
    def get_url(self):
 
952
        """See bzrlib.transport.Server.get_url."""
 
953
        if sys.platform == 'win32':
 
954
            return self._get_sftp_url(urlutils.escape(self._homedir))
 
955
        else:
 
956
            return self._get_sftp_url(urlutils.escape(self._homedir[1:]))
 
957
 
 
958
 
 
959
class SFTPHomeDirServer(SFTPServerWithoutSSH):
 
960
    """A test server for sftp transports, using homedir relative urls."""
 
961
 
 
962
    def get_url(self):
 
963
        """See bzrlib.transport.Server.get_url."""
 
964
        return self._get_sftp_url("~/")
 
965
 
 
966
 
 
967
class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
 
968
    """A test servere for sftp transports, using absolute urls to non-home."""
 
969
 
 
970
    def setUp(self):
 
971
        self._server_homedir = '/dev/noone/runs/tests/here'
 
972
        super(SFTPSiblingAbsoluteServer, self).setUp()
 
973
 
 
974
 
 
975
def _sftp_connect(host, port, username, password):
 
976
    """Connect to the remote sftp server.
 
977
 
 
978
    :raises: a TransportError 'could not connect'.
 
979
 
 
980
    :returns: an paramiko.sftp_client.SFTPClient
 
981
 
 
982
    TODO: Raise a more reasonable ConnectionFailed exception
 
983
    """
 
984
    idx = (host, port, username)
 
985
    try:
 
986
        return _connected_hosts[idx]
 
987
    except KeyError:
 
988
        pass
 
989
    
 
990
    sftp = _sftp_connect_uncached(host, port, username, password)
 
991
    _connected_hosts[idx] = sftp
 
992
    return sftp
 
993
 
 
994
def _sftp_connect_uncached(host, port, username, password):
 
995
    vendor = ssh._get_ssh_vendor()
 
996
    sftp = vendor.connect_sftp(username, password, host, port)
 
997
    return sftp
 
998
 
 
999
 
 
1000
def get_test_permutations():
 
1001
    """Return the permutations to be used in testing."""
 
1002
    return [(SFTPTransport, SFTPAbsoluteServer),
 
1003
            (SFTPTransport, SFTPHomeDirServer),
 
1004
            (SFTPTransport, SFTPSiblingAbsoluteServer),
 
1005
            ]