/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

  • Committer: Jan Balster
  • Date: 2006-08-15 12:39:42 UTC
  • mfrom: (1923 +trunk)
  • mto: This revision was merged to the branch mainline in revision 1928.
  • Revision ID: jan@merlinux.de-20060815123942-22c388c6e9a8ac91
merge bzr.dev 1923

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
2
2
# Copyright (C) 2005, 2006 Canonical Ltd
3
 
 
 
3
#
4
4
# This program is free software; you can redistribute it and/or modify
5
5
# it under the terms of the GNU General Public License as published by
6
6
# the Free Software Foundation; either version 2 of the License, or
7
7
# (at your option) any later version.
8
 
 
 
8
#
9
9
# This program is distributed in the hope that it will be useful,
10
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
12
# GNU General Public License for more details.
13
 
 
 
13
#
14
14
# You should have received a copy of the GNU General Public License
15
15
# along with this program; if not, write to the Free Software
16
16
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19
19
 
20
20
import errno
21
21
import getpass
 
22
import itertools
22
23
import os
23
24
import random
24
25
import re
25
26
import select
 
27
import socket
26
28
import stat
27
29
import subprocess
28
30
import sys
40
42
                           PathError,
41
43
                           ParamikoNotPresent,
42
44
                           )
43
 
from bzrlib.osutils import pathjoin, fancy_rename
 
45
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
44
46
from bzrlib.trace import mutter, warning, error
45
47
from bzrlib.transport import (
46
48
    register_urlparse_netloc_protocol,
101
103
                }
102
104
 
103
105
 
104
 
# don't use prefetch unless paramiko version >= 1.5.2 (there were bugs earlier)
105
 
_default_do_prefetch = False
106
 
if getattr(paramiko, '__version_info__', (0, 0, 0)) >= (1, 5, 5):
107
 
    _default_do_prefetch = True
 
106
_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
 
107
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
 
108
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))
 
109
 
 
110
# Paramiko 1.5 tries to open a socket.AF_UNIX in order to connect
 
111
# to ssh-agent. That attribute doesn't exist on win32 (it does in cygwin)
 
112
# so we get an AttributeError exception. So we will not try to
 
113
# connect to an agent if we are on win32 and using Paramiko older than 1.6
 
114
_use_ssh_agent = (sys.platform != 'win32' or _paramiko_version >= (1, 6, 0))
108
115
 
109
116
 
110
117
_ssh_vendor = None
305
312
 
306
313
 
307
314
class SFTPTransport (Transport):
308
 
    """
309
 
    Transport implementation for SFTP access.
310
 
    """
 
315
    """Transport implementation for SFTP access"""
 
316
 
311
317
    _do_prefetch = _default_do_prefetch
 
318
    # TODO: jam 20060717 Conceivably these could be configurable, either
 
319
    #       by auto-tuning at run-time, or by a configuration (per host??)
 
320
    #       but the performance curve is pretty flat, so just going with
 
321
    #       reasonable defaults.
 
322
    _max_readv_combine = 200
 
323
    # Having to round trip to the server means waiting for a response,
 
324
    # so it is better to download extra bytes.
 
325
    # 8KiB had good performance for both local and remote network operations
 
326
    _bytes_to_read_before_seek = 8192
 
327
 
 
328
    # The sftp spec says that implementations SHOULD allow reads
 
329
    # to be at least 32K. paramiko.readv() does an async request
 
330
    # for the chunks. So we need to keep it within a single request
 
331
    # size for paramiko <= 1.6.1. paramiko 1.6.2 will probably chop
 
332
    # up the request itself, rather than us having to worry about it
 
333
    _max_request_size = 32768
312
334
 
313
335
    def __init__(self, base, clone_from=None):
314
336
        assert base.startswith('sftp://')
375
397
                basepath.append(p)
376
398
 
377
399
        path = '/'.join(basepath)
 
400
        # mutter('relpath => remotepath %s => %s', relpath, path)
378
401
        return path
379
402
 
380
403
    def relpath(self, abspath):
419
442
        except (IOError, paramiko.SSHException), e:
420
443
            self._translate_io_exception(e, path, ': error retrieving')
421
444
 
422
 
    def get_partial(self, relpath, start, length=None):
423
 
        """
424
 
        Get just part of a file.
425
 
 
426
 
        :param relpath: Path to the file, relative to base
427
 
        :param start: The starting position to read from
428
 
        :param length: The length to read. A length of None indicates
429
 
                       read to the end of the file.
430
 
        :return: A file-like object containing at least the specified bytes.
431
 
                 Some implementations may return objects which can be read
432
 
                 past this length, but this is not guaranteed.
433
 
        """
434
 
        # TODO: implement get_partial_multi to help with knit support
435
 
        f = self.get(relpath)
436
 
        f.seek(start)
437
 
        if self._do_prefetch and hasattr(f, 'prefetch'):
438
 
            f.prefetch()
439
 
        return f
 
445
    def readv(self, relpath, offsets):
 
446
        """See Transport.readv()"""
 
447
        # We overload the default readv() because we want to use a file
 
448
        # that does not have prefetch enabled.
 
449
        # Also, if we have a new paramiko, it implements an async readv()
 
450
        if not offsets:
 
451
            return
 
452
 
 
453
        try:
 
454
            path = self._remote_path(relpath)
 
455
            fp = self._sftp.file(path, mode='rb')
 
456
            readv = getattr(fp, 'readv', None)
 
457
            if readv:
 
458
                return self._sftp_readv(fp, offsets)
 
459
            mutter('seek and read %s offsets', len(offsets))
 
460
            return self._seek_and_read(fp, offsets)
 
461
        except (IOError, paramiko.SSHException), e:
 
462
            self._translate_io_exception(e, path, ': error retrieving')
 
463
 
 
464
    def _sftp_readv(self, fp, offsets):
 
465
        """Use the readv() member of fp to do async readv.
 
466
 
 
467
        And then read them using paramiko.readv(). paramiko.readv()
 
468
        does not support ranges > 64K, so it caps the request size, and
 
469
        just reads until it gets all the stuff it wants
 
470
        """
 
471
        offsets = list(offsets)
 
472
        sorted_offsets = sorted(offsets)
 
473
 
 
474
        # The algorithm works as follows:
 
475
        # 1) Coalesce nearby reads into a single chunk
 
476
        #    This generates a list of combined regions, the total size
 
477
        #    and the size of the sub regions. This coalescing step is limited
 
478
        #    in the number of nearby chunks to combine, and is allowed to
 
479
        #    skip small breaks in the requests. Limiting it makes sure that
 
480
        #    we can start yielding some data earlier, and skipping means we
 
481
        #    make fewer requests. (Beneficial even when using async)
 
482
        # 2) Break up this combined regions into chunks that are smaller
 
483
        #    than 64KiB. Technically the limit is 65536, but we are a
 
484
        #    little bit conservative. This is because sftp has a maximum
 
485
        #    return chunk size of 64KiB (max size of an unsigned short)
 
486
        # 3) Issue a readv() to paramiko to create an async request for
 
487
        #    all of this data
 
488
        # 4) Read in the data as it comes back, until we've read one
 
489
        #    continuous section as determined in step 1
 
490
        # 5) Break up the full sections into hunks for the original requested
 
491
        #    offsets. And put them in a cache
 
492
        # 6) Check if the next request is in the cache, and if it is, remove
 
493
        #    it from the cache, and yield its data. Continue until no more
 
494
        #    entries are in the cache.
 
495
        # 7) loop back to step 4 until all data has been read
 
496
        #
 
497
        # TODO: jam 20060725 This could be optimized one step further, by
 
498
        #       attempting to yield whatever data we have read, even before
 
499
        #       the first coallesced section has been fully processed.
 
500
 
 
501
        # When coalescing for use with readv(), we don't really need to
 
502
        # use any fudge factor, because the requests are made asynchronously
 
503
        coalesced = list(self._coalesce_offsets(sorted_offsets,
 
504
                               limit=self._max_readv_combine,
 
505
                               fudge_factor=0,
 
506
                               ))
 
507
        requests = []
 
508
        for c_offset in coalesced:
 
509
            start = c_offset.start
 
510
            size = c_offset.length
 
511
 
 
512
            # We need to break this up into multiple requests
 
513
            while size > 0:
 
514
                next_size = min(size, self._max_request_size)
 
515
                requests.append((start, next_size))
 
516
                size -= next_size
 
517
                start += next_size
 
518
 
 
519
        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
 
520
                len(offsets), len(coalesced), len(requests))
 
521
 
 
522
        # Queue the current read until we have read the full coalesced section
 
523
        cur_data = []
 
524
        cur_data_len = 0
 
525
        cur_coalesced_stack = iter(coalesced)
 
526
        cur_coalesced = cur_coalesced_stack.next()
 
527
 
 
528
        # Cache the results, but only until they have been fulfilled
 
529
        data_map = {}
 
530
        # turn the list of offsets into a stack
 
531
        offset_stack = iter(offsets)
 
532
        cur_offset_and_size = offset_stack.next()
 
533
 
 
534
        for data in fp.readv(requests):
 
535
            cur_data += data
 
536
            cur_data_len += len(data)
 
537
 
 
538
            if cur_data_len < cur_coalesced.length:
 
539
                continue
 
540
            assert cur_data_len == cur_coalesced.length, \
 
541
                "Somehow we read too much: %s != %s" % (cur_data_len,
 
542
                                                        cur_coalesced.length)
 
543
            all_data = ''.join(cur_data)
 
544
            cur_data = []
 
545
            cur_data_len = 0
 
546
 
 
547
            for suboffset, subsize in cur_coalesced.ranges:
 
548
                key = (cur_coalesced.start+suboffset, subsize)
 
549
                data_map[key] = all_data[suboffset:suboffset+subsize]
 
550
 
 
551
            # Now that we've read some data, see if we can yield anything back
 
552
            while cur_offset_and_size in data_map:
 
553
                this_data = data_map.pop(cur_offset_and_size)
 
554
                yield cur_offset_and_size[0], this_data
 
555
                cur_offset_and_size = offset_stack.next()
 
556
 
 
557
            # Now that we've read all of the data for this coalesced section
 
558
            # on to the next
 
559
            cur_coalesced = cur_coalesced_stack.next()
440
560
 
441
561
    def put(self, relpath, f, mode=None):
442
562
        """
498
618
 
499
619
    def mkdir(self, relpath, mode=None):
500
620
        """Create a directory at the given path."""
 
621
        path = self._remote_path(relpath)
501
622
        try:
502
 
            path = self._remote_path(relpath)
503
623
            # In the paramiko documentation, it says that passing a mode flag 
504
624
            # will filtered against the server umask.
505
625
            # StubSFTPServer does not do this, which would be nice, because it is
711
831
                                      % (self._host, self._port, e))
712
832
            self._sftp = SFTPClient(LoopbackSFTP(sock))
713
833
        elif vendor != 'none':
714
 
            sock = SFTPSubprocess(self._host, vendor, self._port,
715
 
                                  self._username)
716
 
            self._sftp = SFTPClient(sock)
 
834
            try:
 
835
                sock = SFTPSubprocess(self._host, vendor, self._port,
 
836
                                      self._username)
 
837
                self._sftp = SFTPClient(sock)
 
838
            except (EOFError, paramiko.SSHException), e:
 
839
                raise ConnectionError('Unable to connect to SSH host %s:%s: %s'
 
840
                                      % (self._host, self._port, e))
 
841
            except (OSError, IOError), e:
 
842
                # If the machine is fast enough, ssh can actually exit
 
843
                # before we try and send it the sftp request, which
 
844
                # raises a Broken Pipe
 
845
                if e.errno not in (errno.EPIPE,):
 
846
                    raise
 
847
                raise ConnectionError('Unable to connect to SSH host %s:%s: %s'
 
848
                                      % (self._host, self._port, e))
717
849
        else:
718
850
            self._paramiko_connect()
719
851
 
728
860
            t = paramiko.Transport((self._host, self._port or 22))
729
861
            t.set_log_channel('bzr.paramiko')
730
862
            t.start_client()
731
 
        except paramiko.SSHException, e:
 
863
        except (paramiko.SSHException, socket.error), e:
732
864
            raise ConnectionError('Unable to reach SSH host %s:%s: %s' 
733
865
                                  % (self._host, self._port, e))
734
866
            
773
905
        # Also, it would mess up the self.relpath() functionality
774
906
        username = self._username or getpass.getuser()
775
907
 
776
 
        # Paramiko tries to open a socket.AF_UNIX in order to connect
777
 
        # to ssh-agent. That attribute doesn't exist on win32 (it does in cygwin)
778
 
        # so we get an AttributeError exception. For now, just don't try to
779
 
        # connect to an agent if we are on win32
780
 
        if sys.platform != 'win32':
 
908
        if _use_ssh_agent:
781
909
            agent = paramiko.Agent()
782
910
            for key in agent.get_keys():
783
911
                mutter('Trying SSH agent key %s' % paramiko.util.hexify(key.get_fingerprint()))
850
978
        :param mode: The mode permissions bits for the new file
851
979
        """
852
980
        path = self._sftp._adjust_cwd(abspath)
 
981
        # mutter('sftp abspath %s => %s', abspath, path)
853
982
        attr = SFTPAttributes()
854
983
        if mode is not None:
855
984
            attr.st_mode = mode
959
1088
 
960
1089
    def _run_server(self, s):
961
1090
        ssh_server = paramiko.Transport(s)
962
 
        key_file = os.path.join(self._homedir, 'test_rsa.key')
 
1091
        key_file = pathjoin(self._homedir, 'test_rsa.key')
963
1092
        f = open(key_file, 'w')
964
1093
        f.write(STUB_SERVER_KEY)
965
1094
        f.close()
977
1106
        global _ssh_vendor
978
1107
        self._original_vendor = _ssh_vendor
979
1108
        _ssh_vendor = self._vendor
980
 
        self._homedir = os.getcwd()
 
1109
        if sys.platform == 'win32':
 
1110
            # Win32 needs to use the UNICODE api
 
1111
            self._homedir = getcwd()
 
1112
        else:
 
1113
            # But Linux SFTP servers should just deal in bytestreams
 
1114
            self._homedir = os.getcwd()
981
1115
        if self._server_homedir is None:
982
1116
            self._server_homedir = self._homedir
983
1117
        self._root = '/'
984
 
        # FIXME WINDOWS: _root should be _server_homedir[0]:/
 
1118
        if sys.platform == 'win32':
 
1119
            self._root = ''
985
1120
        self._listener = SocketListener(self._run_server)
986
1121
        self._listener.setDaemon(True)
987
1122
        self._listener.start()
994
1129
 
995
1130
    def get_bogus_url(self):
996
1131
        """See bzrlib.transport.Server.get_bogus_url."""
997
 
        # this is chosen to try to prevent trouble with proxies, wierd dns,
998
 
        # etc
999
 
        return 'sftp://127.0.0.1:1/'
1000
 
 
 
1132
        # this is chosen to try to prevent trouble with proxies, wierd dns, etc
 
1133
        # we bind a random socket, so that we get a guaranteed unused port
 
1134
        # we just never listen on that port
 
1135
        s = socket.socket()
 
1136
        s.bind(('localhost', 0))
 
1137
        return 'sftp://%s:%s/' % s.getsockname()
1001
1138
 
1002
1139
 
1003
1140
class SFTPFullAbsoluteServer(SFTPServer):
1030
1167
 
1031
1168
        server = paramiko.SFTPServer(FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
1032
1169
                                     root=self._root, home=self._server_homedir)
1033
 
        server.start_subsystem('sftp', None, sock)
 
1170
        try:
 
1171
            server.start_subsystem('sftp', None, sock)
 
1172
        except socket.error, e:
 
1173
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
 
1174
                # it's okay for the client to disconnect abruptly
 
1175
                # (bug in paramiko 1.6: it should absorb this exception)
 
1176
                pass
 
1177
            else:
 
1178
                raise
 
1179
        except Exception, e:
 
1180
            import sys; sys.stderr.write('\nEXCEPTION %r\n\n' % e.__class__)
1034
1181
        server.finish_subsystem()
1035
1182
 
1036
1183
 
1039
1186
 
1040
1187
    def get_url(self):
1041
1188
        """See bzrlib.transport.Server.get_url."""
1042
 
        return self._get_sftp_url(urlutils.escape(self._homedir[1:]))
 
1189
        if sys.platform == 'win32':
 
1190
            return self._get_sftp_url(urlutils.escape(self._homedir))
 
1191
        else:
 
1192
            return self._get_sftp_url(urlutils.escape(self._homedir[1:]))
1043
1193
 
1044
1194
 
1045
1195
class SFTPHomeDirServer(SFTPServerWithoutSSH):