/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2008-05-07 10:04:52 UTC
  • mfrom: (3408.4.1 ianc-integration)
  • Revision ID: pqm@pqm.ubuntu.com-20080507100452-ya8ofjjd5f5pb9q7
Nicer error when smart server started on an address already in use
        (Andrea Corbellini)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
 
2
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
3
#
 
4
# This program is free software; you can redistribute it and/or modify
 
5
# it under the terms of the GNU General Public License as published by
 
6
# the Free Software Foundation; either version 2 of the License, or
 
7
# (at your option) any later version.
 
8
#
 
9
# This program is distributed in the hope that it will be useful,
 
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
# GNU General Public License for more details.
 
13
#
 
14
# You should have received a copy of the GNU General Public License
 
15
# along with this program; if not, write to the Free Software
 
16
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
17
 
 
18
"""Implementation of Transport over SFTP, using paramiko."""
 
19
 
 
20
# TODO: Remove the transport-based lock_read and lock_write methods.  They'll
 
21
# then raise TransportNotPossible, which will break remote access to any
 
22
# formats which rely on OS-level locks.  That should be fine as those formats
 
23
# are pretty old, but these combinations may have to be removed from the test
 
24
# suite.  Those formats all date back to 0.7; so we should be able to remove
 
25
# these methods when we officially drop support for those formats.
 
26
 
 
27
import errno
 
28
import os
 
29
import random
 
30
import select
 
31
import socket
 
32
import stat
 
33
import sys
 
34
import time
 
35
import urllib
 
36
import urlparse
 
37
import warnings
 
38
 
 
39
from bzrlib import (
 
40
    errors,
 
41
    urlutils,
 
42
    )
 
43
from bzrlib.errors import (FileExists,
 
44
                           NoSuchFile, PathNotChild,
 
45
                           TransportError,
 
46
                           LockError,
 
47
                           PathError,
 
48
                           ParamikoNotPresent,
 
49
                           )
 
50
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
 
51
from bzrlib.symbol_versioning import (
 
52
        deprecated_function,
 
53
        )
 
54
from bzrlib.trace import mutter, warning
 
55
from bzrlib.transport import (
 
56
    FileFileStream,
 
57
    _file_streams,
 
58
    local,
 
59
    Server,
 
60
    ssh,
 
61
    ConnectedTransport,
 
62
    )
 
63
 
 
64
# Disable one particular warning that comes from paramiko in Python2.5; if
 
65
# this is emitted at the wrong time it tends to cause spurious test failures
 
66
# or at least noise in the test case::
 
67
#
 
68
# [1770/7639 in 86s, 1 known failures, 50 skipped, 2 missing features]
 
69
# test_permissions.TestSftpPermissions.test_new_files
 
70
# /var/lib/python-support/python2.5/paramiko/message.py:226: DeprecationWarning: integer argument expected, got float
 
71
#  self.packet.write(struct.pack('>I', n))
 
72
warnings.filterwarnings('ignore',
 
73
        'integer argument expected, got float',
 
74
        category=DeprecationWarning,
 
75
        module='paramiko.message')
 
76
 
 
77
try:
 
78
    import paramiko
 
79
except ImportError, e:
 
80
    raise ParamikoNotPresent(e)
 
81
else:
 
82
    from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
 
83
                               SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
 
84
                               CMD_HANDLE, CMD_OPEN)
 
85
    from paramiko.sftp_attr import SFTPAttributes
 
86
    from paramiko.sftp_file import SFTPFile
 
87
 
 
88
 
 
89
_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
 
90
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
 
91
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))
 
92
 
 
93
 
 
94
class SFTPLock(object):
 
95
    """This fakes a lock in a remote location.
 
96
    
 
97
    A present lock is indicated just by the existence of a file.  This
 
98
    doesn't work well on all transports and they are only used in 
 
99
    deprecated storage formats.
 
100
    """
 
101
    
 
102
    __slots__ = ['path', 'lock_path', 'lock_file', 'transport']
 
103
 
 
104
    def __init__(self, path, transport):
 
105
        assert isinstance(transport, SFTPTransport)
 
106
 
 
107
        self.lock_file = None
 
108
        self.path = path
 
109
        self.lock_path = path + '.write-lock'
 
110
        self.transport = transport
 
111
        try:
 
112
            # RBC 20060103 FIXME should we be using private methods here ?
 
113
            abspath = transport._remote_path(self.lock_path)
 
114
            self.lock_file = transport._sftp_open_exclusive(abspath)
 
115
        except FileExists:
 
116
            raise LockError('File %r already locked' % (self.path,))
 
117
 
 
118
    def __del__(self):
 
119
        """Should this warn, or actually try to cleanup?"""
 
120
        if self.lock_file:
 
121
            warning("SFTPLock %r not explicitly unlocked" % (self.path,))
 
122
            self.unlock()
 
123
 
 
124
    def unlock(self):
 
125
        if not self.lock_file:
 
126
            return
 
127
        self.lock_file.close()
 
128
        self.lock_file = None
 
129
        try:
 
130
            self.transport.delete(self.lock_path)
 
131
        except (NoSuchFile,):
 
132
            # What specific errors should we catch here?
 
133
            pass
 
134
 
 
135
 
 
136
class SFTPTransport(ConnectedTransport):
 
137
    """Transport implementation for SFTP access."""
 
138
 
 
139
    _do_prefetch = _default_do_prefetch
 
140
    # TODO: jam 20060717 Conceivably these could be configurable, either
 
141
    #       by auto-tuning at run-time, or by a configuration (per host??)
 
142
    #       but the performance curve is pretty flat, so just going with
 
143
    #       reasonable defaults.
 
144
    _max_readv_combine = 200
 
145
    # Having to round trip to the server means waiting for a response,
 
146
    # so it is better to download extra bytes.
 
147
    # 8KiB had good performance for both local and remote network operations
 
148
    _bytes_to_read_before_seek = 8192
 
149
 
 
150
    # The sftp spec says that implementations SHOULD allow reads
 
151
    # to be at least 32K. paramiko.readv() does an async request
 
152
    # for the chunks. So we need to keep it within a single request
 
153
    # size for paramiko <= 1.6.1. paramiko 1.6.2 will probably chop
 
154
    # up the request itself, rather than us having to worry about it
 
155
    _max_request_size = 32768
 
156
 
 
157
    def __init__(self, base, _from_transport=None):
 
158
        assert base.startswith('sftp://')
 
159
        super(SFTPTransport, self).__init__(base,
 
160
                                            _from_transport=_from_transport)
 
161
 
 
162
    def _remote_path(self, relpath):
 
163
        """Return the path to be passed along the sftp protocol for relpath.
 
164
        
 
165
        :param relpath: is a urlencoded string.
 
166
        """
 
167
        relative = urlutils.unescape(relpath).encode('utf-8')
 
168
        remote_path = self._combine_paths(self._path, relative)
 
169
        # the initial slash should be removed from the path, and treated as a
 
170
        # homedir relative path (the path begins with a double slash if it is
 
171
        # absolute).  see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
 
172
        # RBC 20060118 we are not using this as its too user hostile. instead
 
173
        # we are following lftp and using /~/foo to mean '~/foo'
 
174
        # vila--20070602 and leave absolute paths begin with a single slash.
 
175
        if remote_path.startswith('/~/'):
 
176
            remote_path = remote_path[3:]
 
177
        elif remote_path == '/~':
 
178
            remote_path = ''
 
179
        return remote_path
 
180
 
 
181
    def _create_connection(self, credentials=None):
 
182
        """Create a new connection with the provided credentials.
 
183
 
 
184
        :param credentials: The credentials needed to establish the connection.
 
185
 
 
186
        :return: The created connection and its associated credentials.
 
187
 
 
188
        The credentials are only the password as it may have been entered
 
189
        interactively by the user and may be different from the one provided
 
190
        in base url at transport creation time.
 
191
        """
 
192
        if credentials is None:
 
193
            password = self._password
 
194
        else:
 
195
            password = credentials
 
196
 
 
197
        vendor = ssh._get_ssh_vendor()
 
198
        connection = vendor.connect_sftp(self._user, password,
 
199
                                         self._host, self._port)
 
200
        return connection, password
 
201
 
 
202
    def _get_sftp(self):
 
203
        """Ensures that a connection is established"""
 
204
        connection = self._get_connection()
 
205
        if connection is None:
 
206
            # First connection ever
 
207
            connection, credentials = self._create_connection()
 
208
            self._set_connection(connection, credentials)
 
209
        return connection
 
210
 
 
211
    def has(self, relpath):
 
212
        """
 
213
        Does the target location exist?
 
214
        """
 
215
        try:
 
216
            self._get_sftp().stat(self._remote_path(relpath))
 
217
            return True
 
218
        except IOError:
 
219
            return False
 
220
 
 
221
    def get(self, relpath):
 
222
        """
 
223
        Get the file at the given relative path.
 
224
 
 
225
        :param relpath: The relative path to the file
 
226
        """
 
227
        try:
 
228
            path = self._remote_path(relpath)
 
229
            f = self._get_sftp().file(path, mode='rb')
 
230
            if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
 
231
                f.prefetch()
 
232
            return f
 
233
        except (IOError, paramiko.SSHException), e:
 
234
            self._translate_io_exception(e, path, ': error retrieving',
 
235
                failure_exc=errors.ReadError)
 
236
 
 
237
    def _readv(self, relpath, offsets):
 
238
        """See Transport.readv()"""
 
239
        # We overload the default readv() because we want to use a file
 
240
        # that does not have prefetch enabled.
 
241
        # Also, if we have a new paramiko, it implements an async readv()
 
242
        if not offsets:
 
243
            return
 
244
 
 
245
        try:
 
246
            path = self._remote_path(relpath)
 
247
            fp = self._get_sftp().file(path, mode='rb')
 
248
            readv = getattr(fp, 'readv', None)
 
249
            if readv:
 
250
                return self._sftp_readv(fp, offsets, relpath)
 
251
            mutter('seek and read %s offsets', len(offsets))
 
252
            return self._seek_and_read(fp, offsets, relpath)
 
253
        except (IOError, paramiko.SSHException), e:
 
254
            self._translate_io_exception(e, path, ': error retrieving')
 
255
 
 
256
    def recommended_page_size(self):
 
257
        """See Transport.recommended_page_size().
 
258
 
 
259
        For SFTP we suggest a large page size to reduce the overhead
 
260
        introduced by latency.
 
261
        """
 
262
        return 64 * 1024
 
263
 
 
264
    def _sftp_readv(self, fp, offsets, relpath='<unknown>'):
 
265
        """Use the readv() member of fp to do async readv.
 
266
 
 
267
        And then read them using paramiko.readv(). paramiko.readv()
 
268
        does not support ranges > 64K, so it caps the request size, and
 
269
        just reads until it gets all the stuff it wants
 
270
        """
 
271
        offsets = list(offsets)
 
272
        sorted_offsets = sorted(offsets)
 
273
 
 
274
        # The algorithm works as follows:
 
275
        # 1) Coalesce nearby reads into a single chunk
 
276
        #    This generates a list of combined regions, the total size
 
277
        #    and the size of the sub regions. This coalescing step is limited
 
278
        #    in the number of nearby chunks to combine, and is allowed to
 
279
        #    skip small breaks in the requests. Limiting it makes sure that
 
280
        #    we can start yielding some data earlier, and skipping means we
 
281
        #    make fewer requests. (Beneficial even when using async)
 
282
        # 2) Break up this combined regions into chunks that are smaller
 
283
        #    than 64KiB. Technically the limit is 65536, but we are a
 
284
        #    little bit conservative. This is because sftp has a maximum
 
285
        #    return chunk size of 64KiB (max size of an unsigned short)
 
286
        # 3) Issue a readv() to paramiko to create an async request for
 
287
        #    all of this data
 
288
        # 4) Read in the data as it comes back, until we've read one
 
289
        #    continuous section as determined in step 1
 
290
        # 5) Break up the full sections into hunks for the original requested
 
291
        #    offsets. And put them in a cache
 
292
        # 6) Check if the next request is in the cache, and if it is, remove
 
293
        #    it from the cache, and yield its data. Continue until no more
 
294
        #    entries are in the cache.
 
295
        # 7) loop back to step 4 until all data has been read
 
296
        #
 
297
        # TODO: jam 20060725 This could be optimized one step further, by
 
298
        #       attempting to yield whatever data we have read, even before
 
299
        #       the first coallesced section has been fully processed.
 
300
 
 
301
        # When coalescing for use with readv(), we don't really need to
 
302
        # use any fudge factor, because the requests are made asynchronously
 
303
        coalesced = list(self._coalesce_offsets(sorted_offsets,
 
304
                               limit=self._max_readv_combine,
 
305
                               fudge_factor=0,
 
306
                               ))
 
307
        requests = []
 
308
        for c_offset in coalesced:
 
309
            start = c_offset.start
 
310
            size = c_offset.length
 
311
 
 
312
            # We need to break this up into multiple requests
 
313
            while size > 0:
 
314
                next_size = min(size, self._max_request_size)
 
315
                requests.append((start, next_size))
 
316
                size -= next_size
 
317
                start += next_size
 
318
 
 
319
        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
 
320
                len(offsets), len(coalesced), len(requests))
 
321
 
 
322
        # Queue the current read until we have read the full coalesced section
 
323
        cur_data = []
 
324
        cur_data_len = 0
 
325
        cur_coalesced_stack = iter(coalesced)
 
326
        cur_coalesced = cur_coalesced_stack.next()
 
327
 
 
328
        # Cache the results, but only until they have been fulfilled
 
329
        data_map = {}
 
330
        # turn the list of offsets into a stack
 
331
        offset_stack = iter(offsets)
 
332
        cur_offset_and_size = offset_stack.next()
 
333
 
 
334
        for data in fp.readv(requests):
 
335
            cur_data += data
 
336
            cur_data_len += len(data)
 
337
 
 
338
            if cur_data_len < cur_coalesced.length:
 
339
                continue
 
340
            assert cur_data_len == cur_coalesced.length, \
 
341
                "Somehow we read too much: %s != %s" % (cur_data_len,
 
342
                                                        cur_coalesced.length)
 
343
            all_data = ''.join(cur_data)
 
344
            cur_data = []
 
345
            cur_data_len = 0
 
346
 
 
347
            for suboffset, subsize in cur_coalesced.ranges:
 
348
                key = (cur_coalesced.start+suboffset, subsize)
 
349
                data_map[key] = all_data[suboffset:suboffset+subsize]
 
350
 
 
351
            # Now that we've read some data, see if we can yield anything back
 
352
            while cur_offset_and_size in data_map:
 
353
                this_data = data_map.pop(cur_offset_and_size)
 
354
                yield cur_offset_and_size[0], this_data
 
355
                cur_offset_and_size = offset_stack.next()
 
356
 
 
357
            # We read a coalesced entry, so mark it as done
 
358
            cur_coalesced = None
 
359
            # Now that we've read all of the data for this coalesced section
 
360
            # on to the next
 
361
            cur_coalesced = cur_coalesced_stack.next()
 
362
 
 
363
        if cur_coalesced is not None:
 
364
            raise errors.ShortReadvError(relpath, cur_coalesced.start,
 
365
                cur_coalesced.length, len(data))
 
366
 
 
367
    def put_file(self, relpath, f, mode=None):
 
368
        """
 
369
        Copy the file-like object into the location.
 
370
 
 
371
        :param relpath: Location to put the contents, relative to base.
 
372
        :param f:       File-like object.
 
373
        :param mode: The final mode for the file
 
374
        """
 
375
        final_path = self._remote_path(relpath)
 
376
        return self._put(final_path, f, mode=mode)
 
377
 
 
378
    def _put(self, abspath, f, mode=None):
 
379
        """Helper function so both put() and copy_abspaths can reuse the code"""
 
380
        tmp_abspath = '%s.tmp.%.9f.%d.%d' % (abspath, time.time(),
 
381
                        os.getpid(), random.randint(0,0x7FFFFFFF))
 
382
        fout = self._sftp_open_exclusive(tmp_abspath, mode=mode)
 
383
        closed = False
 
384
        try:
 
385
            try:
 
386
                fout.set_pipelined(True)
 
387
                length = self._pump(f, fout)
 
388
            except (IOError, paramiko.SSHException), e:
 
389
                self._translate_io_exception(e, tmp_abspath)
 
390
            # XXX: This doesn't truly help like we would like it to.
 
391
            #      The problem is that openssh strips sticky bits. So while we
 
392
            #      can properly set group write permission, we lose the group
 
393
            #      sticky bit. So it is probably best to stop chmodding, and
 
394
            #      just tell users that they need to set the umask correctly.
 
395
            #      The attr.st_mode = mode, in _sftp_open_exclusive
 
396
            #      will handle when the user wants the final mode to be more 
 
397
            #      restrictive. And then we avoid a round trip. Unless 
 
398
            #      paramiko decides to expose an async chmod()
 
399
 
 
400
            # This is designed to chmod() right before we close.
 
401
            # Because we set_pipelined() earlier, theoretically we might 
 
402
            # avoid the round trip for fout.close()
 
403
            if mode is not None:
 
404
                self._get_sftp().chmod(tmp_abspath, mode)
 
405
            fout.close()
 
406
            closed = True
 
407
            self._rename_and_overwrite(tmp_abspath, abspath)
 
408
            return length
 
409
        except Exception, e:
 
410
            # If we fail, try to clean up the temporary file
 
411
            # before we throw the exception
 
412
            # but don't let another exception mess things up
 
413
            # Write out the traceback, because otherwise
 
414
            # the catch and throw destroys it
 
415
            import traceback
 
416
            mutter(traceback.format_exc())
 
417
            try:
 
418
                if not closed:
 
419
                    fout.close()
 
420
                self._get_sftp().remove(tmp_abspath)
 
421
            except:
 
422
                # raise the saved except
 
423
                raise e
 
424
            # raise the original with its traceback if we can.
 
425
            raise
 
426
 
 
427
    def _put_non_atomic_helper(self, relpath, writer, mode=None,
 
428
                               create_parent_dir=False,
 
429
                               dir_mode=None):
 
430
        abspath = self._remote_path(relpath)
 
431
 
 
432
        # TODO: jam 20060816 paramiko doesn't publicly expose a way to
 
433
        #       set the file mode at create time. If it does, use it.
 
434
        #       But for now, we just chmod later anyway.
 
435
 
 
436
        def _open_and_write_file():
 
437
            """Try to open the target file, raise error on failure"""
 
438
            fout = None
 
439
            try:
 
440
                try:
 
441
                    fout = self._get_sftp().file(abspath, mode='wb')
 
442
                    fout.set_pipelined(True)
 
443
                    writer(fout)
 
444
                except (paramiko.SSHException, IOError), e:
 
445
                    self._translate_io_exception(e, abspath,
 
446
                                                 ': unable to open')
 
447
 
 
448
                # This is designed to chmod() right before we close.
 
449
                # Because we set_pipelined() earlier, theoretically we might 
 
450
                # avoid the round trip for fout.close()
 
451
                if mode is not None:
 
452
                    self._get_sftp().chmod(abspath, mode)
 
453
            finally:
 
454
                if fout is not None:
 
455
                    fout.close()
 
456
 
 
457
        if not create_parent_dir:
 
458
            _open_and_write_file()
 
459
            return
 
460
 
 
461
        # Try error handling to create the parent directory if we need to
 
462
        try:
 
463
            _open_and_write_file()
 
464
        except NoSuchFile:
 
465
            # Try to create the parent directory, and then go back to
 
466
            # writing the file
 
467
            parent_dir = os.path.dirname(abspath)
 
468
            self._mkdir(parent_dir, dir_mode)
 
469
            _open_and_write_file()
 
470
 
 
471
    def put_file_non_atomic(self, relpath, f, mode=None,
 
472
                            create_parent_dir=False,
 
473
                            dir_mode=None):
 
474
        """Copy the file-like object into the target location.
 
475
 
 
476
        This function is not strictly safe to use. It is only meant to
 
477
        be used when you already know that the target does not exist.
 
478
        It is not safe, because it will open and truncate the remote
 
479
        file. So there may be a time when the file has invalid contents.
 
480
 
 
481
        :param relpath: The remote location to put the contents.
 
482
        :param f:       File-like object.
 
483
        :param mode:    Possible access permissions for new file.
 
484
                        None means do not set remote permissions.
 
485
        :param create_parent_dir: If we cannot create the target file because
 
486
                        the parent directory does not exist, go ahead and
 
487
                        create it, and then try again.
 
488
        """
 
489
        def writer(fout):
 
490
            self._pump(f, fout)
 
491
        self._put_non_atomic_helper(relpath, writer, mode=mode,
 
492
                                    create_parent_dir=create_parent_dir,
 
493
                                    dir_mode=dir_mode)
 
494
 
 
495
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
 
496
                             create_parent_dir=False,
 
497
                             dir_mode=None):
 
498
        def writer(fout):
 
499
            fout.write(bytes)
 
500
        self._put_non_atomic_helper(relpath, writer, mode=mode,
 
501
                                    create_parent_dir=create_parent_dir,
 
502
                                    dir_mode=dir_mode)
 
503
 
 
504
    def iter_files_recursive(self):
 
505
        """Walk the relative paths of all files in this transport."""
 
506
        queue = list(self.list_dir('.'))
 
507
        while queue:
 
508
            relpath = queue.pop(0)
 
509
            st = self.stat(relpath)
 
510
            if stat.S_ISDIR(st.st_mode):
 
511
                for i, basename in enumerate(self.list_dir(relpath)):
 
512
                    queue.insert(i, relpath+'/'+basename)
 
513
            else:
 
514
                yield relpath
 
515
 
 
516
    def _mkdir(self, abspath, mode=None):
 
517
        if mode is None:
 
518
            local_mode = 0777
 
519
        else:
 
520
            local_mode = mode
 
521
        try:
 
522
            self._get_sftp().mkdir(abspath, local_mode)
 
523
            if mode is not None:
 
524
                self._get_sftp().chmod(abspath, mode=mode)
 
525
        except (paramiko.SSHException, IOError), e:
 
526
            self._translate_io_exception(e, abspath, ': unable to mkdir',
 
527
                failure_exc=FileExists)
 
528
 
 
529
    def mkdir(self, relpath, mode=None):
 
530
        """Create a directory at the given path."""
 
531
        self._mkdir(self._remote_path(relpath), mode=mode)
 
532
 
 
533
    def open_write_stream(self, relpath, mode=None):
 
534
        """See Transport.open_write_stream."""
 
535
        # initialise the file to zero-length
 
536
        # this is three round trips, but we don't use this 
 
537
        # api more than once per write_group at the moment so 
 
538
        # it is a tolerable overhead. Better would be to truncate
 
539
        # the file after opening. RBC 20070805
 
540
        self.put_bytes_non_atomic(relpath, "", mode)
 
541
        abspath = self._remote_path(relpath)
 
542
        # TODO: jam 20060816 paramiko doesn't publicly expose a way to
 
543
        #       set the file mode at create time. If it does, use it.
 
544
        #       But for now, we just chmod later anyway.
 
545
        handle = None
 
546
        try:
 
547
            handle = self._get_sftp().file(abspath, mode='wb')
 
548
            handle.set_pipelined(True)
 
549
        except (paramiko.SSHException, IOError), e:
 
550
            self._translate_io_exception(e, abspath,
 
551
                                         ': unable to open')
 
552
        _file_streams[self.abspath(relpath)] = handle
 
553
        return FileFileStream(self, relpath, handle)
 
554
 
 
555
    def _translate_io_exception(self, e, path, more_info='',
 
556
                                failure_exc=PathError):
 
557
        """Translate a paramiko or IOError into a friendlier exception.
 
558
 
 
559
        :param e: The original exception
 
560
        :param path: The path in question when the error is raised
 
561
        :param more_info: Extra information that can be included,
 
562
                          such as what was going on
 
563
        :param failure_exc: Paramiko has the super fun ability to raise completely
 
564
                           opaque errors that just set "e.args = ('Failure',)" with
 
565
                           no more information.
 
566
                           If this parameter is set, it defines the exception 
 
567
                           to raise in these cases.
 
568
        """
 
569
        # paramiko seems to generate detailless errors.
 
570
        self._translate_error(e, path, raise_generic=False)
 
571
        if getattr(e, 'args', None) is not None:
 
572
            if (e.args == ('No such file or directory',) or
 
573
                e.args == ('No such file',)):
 
574
                raise NoSuchFile(path, str(e) + more_info)
 
575
            if (e.args == ('mkdir failed',)):
 
576
                raise FileExists(path, str(e) + more_info)
 
577
            # strange but true, for the paramiko server.
 
578
            if (e.args == ('Failure',)):
 
579
                raise failure_exc(path, str(e) + more_info)
 
580
            mutter('Raising exception with args %s', e.args)
 
581
        if getattr(e, 'errno', None) is not None:
 
582
            mutter('Raising exception with errno %s', e.errno)
 
583
        raise e
 
584
 
 
585
    def append_file(self, relpath, f, mode=None):
 
586
        """
 
587
        Append the text in the file-like object into the final
 
588
        location.
 
589
        """
 
590
        try:
 
591
            path = self._remote_path(relpath)
 
592
            fout = self._get_sftp().file(path, 'ab')
 
593
            if mode is not None:
 
594
                self._get_sftp().chmod(path, mode)
 
595
            result = fout.tell()
 
596
            self._pump(f, fout)
 
597
            return result
 
598
        except (IOError, paramiko.SSHException), e:
 
599
            self._translate_io_exception(e, relpath, ': unable to append')
 
600
 
 
601
    def rename(self, rel_from, rel_to):
 
602
        """Rename without special overwriting"""
 
603
        try:
 
604
            self._get_sftp().rename(self._remote_path(rel_from),
 
605
                              self._remote_path(rel_to))
 
606
        except (IOError, paramiko.SSHException), e:
 
607
            self._translate_io_exception(e, rel_from,
 
608
                    ': unable to rename to %r' % (rel_to))
 
609
 
 
610
    def _rename_and_overwrite(self, abs_from, abs_to):
 
611
        """Do a fancy rename on the remote server.
 
612
        
 
613
        Using the implementation provided by osutils.
 
614
        """
 
615
        try:
 
616
            sftp = self._get_sftp()
 
617
            fancy_rename(abs_from, abs_to,
 
618
                         rename_func=sftp.rename,
 
619
                         unlink_func=sftp.remove)
 
620
        except (IOError, paramiko.SSHException), e:
 
621
            self._translate_io_exception(e, abs_from,
 
622
                                         ': unable to rename to %r' % (abs_to))
 
623
 
 
624
    def move(self, rel_from, rel_to):
 
625
        """Move the item at rel_from to the location at rel_to"""
 
626
        path_from = self._remote_path(rel_from)
 
627
        path_to = self._remote_path(rel_to)
 
628
        self._rename_and_overwrite(path_from, path_to)
 
629
 
 
630
    def delete(self, relpath):
 
631
        """Delete the item at relpath"""
 
632
        path = self._remote_path(relpath)
 
633
        try:
 
634
            self._get_sftp().remove(path)
 
635
        except (IOError, paramiko.SSHException), e:
 
636
            self._translate_io_exception(e, path, ': unable to delete')
 
637
            
 
638
    def external_url(self):
 
639
        """See bzrlib.transport.Transport.external_url."""
 
640
        # the external path for SFTP is the base
 
641
        return self.base
 
642
 
 
643
    def listable(self):
 
644
        """Return True if this store supports listing."""
 
645
        return True
 
646
 
 
647
    def list_dir(self, relpath):
 
648
        """
 
649
        Return a list of all files at the given location.
 
650
        """
 
651
        # does anything actually use this?
 
652
        # -- Unknown
 
653
        # This is at least used by copy_tree for remote upgrades.
 
654
        # -- David Allouche 2006-08-11
 
655
        path = self._remote_path(relpath)
 
656
        try:
 
657
            entries = self._get_sftp().listdir(path)
 
658
        except (IOError, paramiko.SSHException), e:
 
659
            self._translate_io_exception(e, path, ': failed to list_dir')
 
660
        return [urlutils.escape(entry) for entry in entries]
 
661
 
 
662
    def rmdir(self, relpath):
 
663
        """See Transport.rmdir."""
 
664
        path = self._remote_path(relpath)
 
665
        try:
 
666
            return self._get_sftp().rmdir(path)
 
667
        except (IOError, paramiko.SSHException), e:
 
668
            self._translate_io_exception(e, path, ': failed to rmdir')
 
669
 
 
670
    def stat(self, relpath):
 
671
        """Return the stat information for a file."""
 
672
        path = self._remote_path(relpath)
 
673
        try:
 
674
            return self._get_sftp().stat(path)
 
675
        except (IOError, paramiko.SSHException), e:
 
676
            self._translate_io_exception(e, path, ': unable to stat')
 
677
 
 
678
    def lock_read(self, relpath):
 
679
        """
 
680
        Lock the given file for shared (read) access.
 
681
        :return: A lock object, which has an unlock() member function
 
682
        """
 
683
        # FIXME: there should be something clever i can do here...
 
684
        class BogusLock(object):
 
685
            def __init__(self, path):
 
686
                self.path = path
 
687
            def unlock(self):
 
688
                pass
 
689
        return BogusLock(relpath)
 
690
 
 
691
    def lock_write(self, relpath):
 
692
        """
 
693
        Lock the given file for exclusive (write) access.
 
694
        WARNING: many transports do not support this, so trying avoid using it
 
695
 
 
696
        :return: A lock object, which has an unlock() member function
 
697
        """
 
698
        # This is a little bit bogus, but basically, we create a file
 
699
        # which should not already exist, and if it does, we assume
 
700
        # that there is a lock, and if it doesn't, the we assume
 
701
        # that we have taken the lock.
 
702
        return SFTPLock(relpath, self)
 
703
 
 
704
    def _sftp_open_exclusive(self, abspath, mode=None):
 
705
        """Open a remote path exclusively.
 
706
 
 
707
        SFTP supports O_EXCL (SFTP_FLAG_EXCL), which fails if
 
708
        the file already exists. However it does not expose this
 
709
        at the higher level of SFTPClient.open(), so we have to
 
710
        sneak away with it.
 
711
 
 
712
        WARNING: This breaks the SFTPClient abstraction, so it
 
713
        could easily break against an updated version of paramiko.
 
714
 
 
715
        :param abspath: The remote absolute path where the file should be opened
 
716
        :param mode: The mode permissions bits for the new file
 
717
        """
 
718
        # TODO: jam 20060816 Paramiko >= 1.6.2 (probably earlier) supports
 
719
        #       using the 'x' flag to indicate SFTP_FLAG_EXCL.
 
720
        #       However, there is no way to set the permission mode at open 
 
721
        #       time using the sftp_client.file() functionality.
 
722
        path = self._get_sftp()._adjust_cwd(abspath)
 
723
        # mutter('sftp abspath %s => %s', abspath, path)
 
724
        attr = SFTPAttributes()
 
725
        if mode is not None:
 
726
            attr.st_mode = mode
 
727
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE 
 
728
                | SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
 
729
        try:
 
730
            t, msg = self._get_sftp()._request(CMD_OPEN, path, omode, attr)
 
731
            if t != CMD_HANDLE:
 
732
                raise TransportError('Expected an SFTP handle')
 
733
            handle = msg.get_string()
 
734
            return SFTPFile(self._get_sftp(), handle, 'wb', -1)
 
735
        except (paramiko.SSHException, IOError), e:
 
736
            self._translate_io_exception(e, abspath, ': unable to open',
 
737
                failure_exc=FileExists)
 
738
 
 
739
    def _can_roundtrip_unix_modebits(self):
 
740
        if sys.platform == 'win32':
 
741
            # anyone else?
 
742
            return False
 
743
        else:
 
744
            return True
 
745
 
 
746
# ------------- server test implementation --------------
 
747
import threading
 
748
 
 
749
from bzrlib.tests.stub_sftp import StubServer, StubSFTPServer
 
750
 
 
751
STUB_SERVER_KEY = """
 
752
-----BEGIN RSA PRIVATE KEY-----
 
753
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
 
754
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
 
755
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
 
756
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
 
757
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
 
758
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
 
759
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
 
760
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
 
761
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
 
762
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
 
763
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
 
764
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
 
765
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
 
766
-----END RSA PRIVATE KEY-----
 
767
"""
 
768
 
 
769
 
 
770
class SocketListener(threading.Thread):
 
771
 
 
772
    def __init__(self, callback):
 
773
        threading.Thread.__init__(self)
 
774
        self._callback = callback
 
775
        self._socket = socket.socket()
 
776
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
777
        self._socket.bind(('localhost', 0))
 
778
        self._socket.listen(1)
 
779
        self.port = self._socket.getsockname()[1]
 
780
        self._stop_event = threading.Event()
 
781
 
 
782
    def stop(self):
 
783
        # called from outside this thread
 
784
        self._stop_event.set()
 
785
        # use a timeout here, because if the test fails, the server thread may
 
786
        # never notice the stop_event.
 
787
        self.join(5.0)
 
788
        self._socket.close()
 
789
 
 
790
    def run(self):
 
791
        while True:
 
792
            readable, writable_unused, exception_unused = \
 
793
                select.select([self._socket], [], [], 0.1)
 
794
            if self._stop_event.isSet():
 
795
                return
 
796
            if len(readable) == 0:
 
797
                continue
 
798
            try:
 
799
                s, addr_unused = self._socket.accept()
 
800
                # because the loopback socket is inline, and transports are
 
801
                # never explicitly closed, best to launch a new thread.
 
802
                threading.Thread(target=self._callback, args=(s,)).start()
 
803
            except socket.error, x:
 
804
                sys.excepthook(*sys.exc_info())
 
805
                warning('Socket error during accept() within unit test server'
 
806
                        ' thread: %r' % x)
 
807
            except Exception, x:
 
808
                # probably a failed test; unit test thread will log the
 
809
                # failure/error
 
810
                sys.excepthook(*sys.exc_info())
 
811
                warning('Exception from within unit test server thread: %r' % 
 
812
                        x)
 
813
 
 
814
 
 
815
class SocketDelay(object):
 
816
    """A socket decorator to make TCP appear slower.
 
817
 
 
818
    This changes recv, send, and sendall to add a fixed latency to each python
 
819
    call if a new roundtrip is detected. That is, when a recv is called and the
 
820
    flag new_roundtrip is set, latency is charged. Every send and send_all
 
821
    sets this flag.
 
822
 
 
823
    In addition every send, sendall and recv sleeps a bit per character send to
 
824
    simulate bandwidth.
 
825
 
 
826
    Not all methods are implemented, this is deliberate as this class is not a
 
827
    replacement for the builtin sockets layer. fileno is not implemented to
 
828
    prevent the proxy being bypassed. 
 
829
    """
 
830
 
 
831
    simulated_time = 0
 
832
    _proxied_arguments = dict.fromkeys([
 
833
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
 
834
        "setblocking", "setsockopt", "settimeout", "shutdown"])
 
835
 
 
836
    def __init__(self, sock, latency, bandwidth=1.0, 
 
837
                 really_sleep=True):
 
838
        """ 
 
839
        :param bandwith: simulated bandwith (MegaBit)
 
840
        :param really_sleep: If set to false, the SocketDelay will just
 
841
        increase a counter, instead of calling time.sleep. This is useful for
 
842
        unittesting the SocketDelay.
 
843
        """
 
844
        self.sock = sock
 
845
        self.latency = latency
 
846
        self.really_sleep = really_sleep
 
847
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024) 
 
848
        self.new_roundtrip = False
 
849
 
 
850
    def sleep(self, s):
 
851
        if self.really_sleep:
 
852
            time.sleep(s)
 
853
        else:
 
854
            SocketDelay.simulated_time += s
 
855
 
 
856
    def __getattr__(self, attr):
 
857
        if attr in SocketDelay._proxied_arguments:
 
858
            return getattr(self.sock, attr)
 
859
        raise AttributeError("'SocketDelay' object has no attribute %r" %
 
860
                             attr)
 
861
 
 
862
    def dup(self):
 
863
        return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
 
864
                           self._sleep)
 
865
 
 
866
    def recv(self, *args):
 
867
        data = self.sock.recv(*args)
 
868
        if data and self.new_roundtrip:
 
869
            self.new_roundtrip = False
 
870
            self.sleep(self.latency)
 
871
        self.sleep(len(data) * self.time_per_byte)
 
872
        return data
 
873
 
 
874
    def sendall(self, data, flags=0):
 
875
        if not self.new_roundtrip:
 
876
            self.new_roundtrip = True
 
877
            self.sleep(self.latency)
 
878
        self.sleep(len(data) * self.time_per_byte)
 
879
        return self.sock.sendall(data, flags)
 
880
 
 
881
    def send(self, data, flags=0):
 
882
        if not self.new_roundtrip:
 
883
            self.new_roundtrip = True
 
884
            self.sleep(self.latency)
 
885
        bytes_sent = self.sock.send(data, flags)
 
886
        self.sleep(bytes_sent * self.time_per_byte)
 
887
        return bytes_sent
 
888
 
 
889
 
 
890
class SFTPServer(Server):
 
891
    """Common code for SFTP server facilities."""
 
892
 
 
893
    def __init__(self, server_interface=StubServer):
 
894
        self._original_vendor = None
 
895
        self._homedir = None
 
896
        self._server_homedir = None
 
897
        self._listener = None
 
898
        self._root = None
 
899
        self._vendor = ssh.ParamikoVendor()
 
900
        self._server_interface = server_interface
 
901
        # sftp server logs
 
902
        self.logs = []
 
903
        self.add_latency = 0
 
904
 
 
905
    def _get_sftp_url(self, path):
 
906
        """Calculate an sftp url to this server for path."""
 
907
        return 'sftp://foo:bar@localhost:%d/%s' % (self._listener.port, path)
 
908
 
 
909
    def log(self, message):
 
910
        """StubServer uses this to log when a new server is created."""
 
911
        self.logs.append(message)
 
912
 
 
913
    def _run_server_entry(self, sock):
 
914
        """Entry point for all implementations of _run_server.
 
915
        
 
916
        If self.add_latency is > 0.000001 then sock is given a latency adding
 
917
        decorator.
 
918
        """
 
919
        if self.add_latency > 0.000001:
 
920
            sock = SocketDelay(sock, self.add_latency)
 
921
        return self._run_server(sock)
 
922
 
 
923
    def _run_server(self, s):
 
924
        ssh_server = paramiko.Transport(s)
 
925
        key_file = pathjoin(self._homedir, 'test_rsa.key')
 
926
        f = open(key_file, 'w')
 
927
        f.write(STUB_SERVER_KEY)
 
928
        f.close()
 
929
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
 
930
        ssh_server.add_server_key(host_key)
 
931
        server = self._server_interface(self)
 
932
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
 
933
                                         StubSFTPServer, root=self._root,
 
934
                                         home=self._server_homedir)
 
935
        event = threading.Event()
 
936
        ssh_server.start_server(event, server)
 
937
        event.wait(5.0)
 
938
    
 
939
    def setUp(self, backing_server=None):
 
940
        # XXX: TODO: make sftpserver back onto backing_server rather than local
 
941
        # disk.
 
942
        assert (backing_server is None or
 
943
                isinstance(backing_server, local.LocalURLServer)), (
 
944
            "backing_server should not be %r, because this can only serve the "
 
945
            "local current working directory." % (backing_server,))
 
946
        self._original_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
 
947
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._vendor
 
948
        if sys.platform == 'win32':
 
949
            # Win32 needs to use the UNICODE api
 
950
            self._homedir = getcwd()
 
951
        else:
 
952
            # But Linux SFTP servers should just deal in bytestreams
 
953
            self._homedir = os.getcwd()
 
954
        if self._server_homedir is None:
 
955
            self._server_homedir = self._homedir
 
956
        self._root = '/'
 
957
        if sys.platform == 'win32':
 
958
            self._root = ''
 
959
        self._listener = SocketListener(self._run_server_entry)
 
960
        self._listener.setDaemon(True)
 
961
        self._listener.start()
 
962
 
 
963
    def tearDown(self):
 
964
        """See bzrlib.transport.Server.tearDown."""
 
965
        self._listener.stop()
 
966
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._original_vendor
 
967
 
 
968
    def get_bogus_url(self):
 
969
        """See bzrlib.transport.Server.get_bogus_url."""
 
970
        # this is chosen to try to prevent trouble with proxies, wierd dns, etc
 
971
        # we bind a random socket, so that we get a guaranteed unused port
 
972
        # we just never listen on that port
 
973
        s = socket.socket()
 
974
        s.bind(('localhost', 0))
 
975
        return 'sftp://%s:%s/' % s.getsockname()
 
976
 
 
977
 
 
978
class SFTPFullAbsoluteServer(SFTPServer):
 
979
    """A test server for sftp transports, using absolute urls and ssh."""
 
980
 
 
981
    def get_url(self):
 
982
        """See bzrlib.transport.Server.get_url."""
 
983
        homedir = self._homedir
 
984
        if sys.platform != 'win32':
 
985
            # Remove the initial '/' on all platforms but win32
 
986
            homedir = homedir[1:]
 
987
        return self._get_sftp_url(urlutils.escape(homedir))
 
988
 
 
989
 
 
990
class SFTPServerWithoutSSH(SFTPServer):
 
991
    """An SFTP server that uses a simple TCP socket pair rather than SSH."""
 
992
 
 
993
    def __init__(self):
 
994
        super(SFTPServerWithoutSSH, self).__init__()
 
995
        self._vendor = ssh.LoopbackVendor()
 
996
 
 
997
    def _run_server(self, sock):
 
998
        # Re-import these as locals, so that they're still accessible during
 
999
        # interpreter shutdown (when all module globals get set to None, leading
 
1000
        # to confusing errors like "'NoneType' object has no attribute 'error'".
 
1001
        class FakeChannel(object):
 
1002
            def get_transport(self):
 
1003
                return self
 
1004
            def get_log_channel(self):
 
1005
                return 'paramiko'
 
1006
            def get_name(self):
 
1007
                return '1'
 
1008
            def get_hexdump(self):
 
1009
                return False
 
1010
            def close(self):
 
1011
                pass
 
1012
 
 
1013
        server = paramiko.SFTPServer(
 
1014
            FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
 
1015
            root=self._root, home=self._server_homedir)
 
1016
        try:
 
1017
            server.start_subsystem(
 
1018
                'sftp', None, ssh.SocketAsChannelAdapter(sock))
 
1019
        except socket.error, e:
 
1020
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
 
1021
                # it's okay for the client to disconnect abruptly
 
1022
                # (bug in paramiko 1.6: it should absorb this exception)
 
1023
                pass
 
1024
            else:
 
1025
                raise
 
1026
        except Exception, e:
 
1027
            # This typically seems to happen during interpreter shutdown, so
 
1028
            # most of the useful ways to report this error are won't work.
 
1029
            # Writing the exception type, and then the text of the exception,
 
1030
            # seems to be the best we can do.
 
1031
            import sys
 
1032
            sys.stderr.write('\nEXCEPTION %r: ' % (e.__class__,))
 
1033
            sys.stderr.write('%s\n\n' % (e,))
 
1034
        server.finish_subsystem()
 
1035
 
 
1036
 
 
1037
class SFTPAbsoluteServer(SFTPServerWithoutSSH):
 
1038
    """A test server for sftp transports, using absolute urls."""
 
1039
 
 
1040
    def get_url(self):
 
1041
        """See bzrlib.transport.Server.get_url."""
 
1042
        homedir = self._homedir
 
1043
        if sys.platform != 'win32':
 
1044
            # Remove the initial '/' on all platforms but win32
 
1045
            homedir = homedir[1:]
 
1046
        return self._get_sftp_url(urlutils.escape(homedir))
 
1047
 
 
1048
 
 
1049
class SFTPHomeDirServer(SFTPServerWithoutSSH):
 
1050
    """A test server for sftp transports, using homedir relative urls."""
 
1051
 
 
1052
    def get_url(self):
 
1053
        """See bzrlib.transport.Server.get_url."""
 
1054
        return self._get_sftp_url("~/")
 
1055
 
 
1056
 
 
1057
class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
 
1058
    """A test server for sftp transports where only absolute paths will work.
 
1059
 
 
1060
    It does this by serving from a deeply-nested directory that doesn't exist.
 
1061
    """
 
1062
 
 
1063
    def setUp(self, backing_server=None):
 
1064
        self._server_homedir = '/dev/noone/runs/tests/here'
 
1065
        super(SFTPSiblingAbsoluteServer, self).setUp(backing_server)
 
1066
 
 
1067
 
 
1068
def get_test_permutations():
 
1069
    """Return the permutations to be used in testing."""
 
1070
    return [(SFTPTransport, SFTPAbsoluteServer),
 
1071
            (SFTPTransport, SFTPHomeDirServer),
 
1072
            (SFTPTransport, SFTPSiblingAbsoluteServer),
 
1073
            ]