1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
 
 
3
# This program is free software; you can redistribute it and/or modify
 
 
4
# it under the terms of the GNU General Public License as published by
 
 
5
# the Free Software Foundation; either version 2 of the License, or
 
 
6
# (at your option) any later version.
 
 
8
# This program is distributed in the hope that it will be useful,
 
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
 
11
# GNU General Public License for more details.
 
 
13
# You should have received a copy of the GNU General Public License
 
 
14
# along with this program; if not, write to the Free Software
 
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 
22
from bzrlib.selftest import TestCaseInTempDir
 
 
23
from bzrlib.selftest.testtransport import TestTransportMixIn
 
 
27
    from stub_sftp import StubServer, StubSFTPServer
 
 
28
    paramiko_loaded = True
 
 
30
    paramiko_loaded = False
 
 
34
-----BEGIN RSA PRIVATE KEY-----
 
 
35
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
 
 
36
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
 
 
37
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
 
 
38
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
 
 
39
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
 
 
40
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
 
 
41
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
 
 
42
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
 
 
43
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
 
 
44
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
 
 
45
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
 
 
46
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
 
 
47
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
 
 
48
-----END RSA PRIVATE KEY-----
 
 
52
class SingleListener (threading.Thread):
 
 
53
    def __init__(self, callback):
 
 
54
        threading.Thread.__init__(self)
 
 
55
        self._callback = callback
 
 
56
        self._socket = socket.socket()
 
 
57
        self._socket.listen(1)
 
 
58
        self.port = self._socket.getsockname()[1]
 
 
59
        self.stop_event = threading.Event()
 
 
62
        s, _ = self._socket.accept()
 
 
63
        # now close the listen socket
 
 
65
        self._callback(s, self.stop_event)
 
 
71
class TestCaseWithSFTPServer (TestCaseInTempDir):
 
 
73
    Execute a test case with a stub SFTP server, serving files from the local
 
 
74
    filesystem over the loopback network.
 
 
77
    def _run_server(self, s, stop_event):
 
 
78
        ssh_server = paramiko.Transport(s)
 
 
79
        key_file = os.path.join(self._root, 'test_rsa.key')
 
 
80
        file(key_file, 'w').write(STUB_SERVER_KEY)
 
 
81
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
 
 
82
        ssh_server.add_server_key(host_key)
 
 
84
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
 
 
85
        event = threading.Event()
 
 
86
        ssh_server.start_server(event, server)
 
 
91
        TestCaseInTempDir.setUp(self)
 
 
92
        self._root = self.test_dir
 
 
94
        self._listener = SingleListener(self._run_server)
 
 
95
        self._listener.setDaemon(True)
 
 
96
        self._listener.start()        
 
 
97
        self._sftp_url = 'sftp://foo:bar@localhost:%d/' % (self._listener.port,)
 
 
100
        self._listener.stop()
 
 
101
        TestCaseInTempDir.tearDown(self)
 
 
104
class SFTPTransportTest (TestCaseWithSFTPServer, TestTransportMixIn):
 
 
107
    def get_transport(self):
 
 
108
        from bzrlib.transport.sftp import SFTPTransport
 
 
110
        return SFTPTransport(url)
 
 
112
if not paramiko_loaded:
 
 
113
    del SFTPTransportTest