/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/stub_sftp.py

  • Committer: John Arbash Meinel
  • Date: 2007-02-08 16:28:05 UTC
  • mto: This revision was merged to the branch mainline in revision 2278.
  • Revision ID: john@arbash-meinel.com-20070208162805-dcqiqrwjh9a5lo7n
``GPGStrategy.sign()`` will now raise ``BzrBadParameterUnicode`` if
you pass a Unicode string rather than an 8-bit string. It doesn't 
make sense to sign a Unicode string, and it turns out that some 
versions of python will write out the raw Unicode bytes rather than
encoding automatically. So fail and make callers do the right thing.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""
 
18
A stub SFTP server for loopback SFTP testing.
 
19
Adapted from the one in paramiko's unit tests.
 
20
"""
 
21
 
 
22
import os
 
23
from paramiko import ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes, \
 
24
    SFTPHandle, SFTP_OK, AUTH_SUCCESSFUL, OPEN_SUCCEEDED
 
25
import sys
 
26
 
 
27
from bzrlib.osutils import pathjoin
 
28
from bzrlib.trace import mutter
 
29
 
 
30
 
 
31
class StubServer (ServerInterface):
 
32
 
 
33
    def __init__(self, test_case):
 
34
        ServerInterface.__init__(self)
 
35
        self._test_case = test_case
 
36
 
 
37
    def check_auth_password(self, username, password):
 
38
        # all are allowed
 
39
        self._test_case.log('sftpserver - authorizing: %s' % (username,))
 
40
        return AUTH_SUCCESSFUL
 
41
 
 
42
    def check_channel_request(self, kind, chanid):
 
43
        self._test_case.log('sftpserver - channel request: %s, %s' % (kind, chanid))
 
44
        return OPEN_SUCCEEDED
 
45
 
 
46
 
 
47
class StubSFTPHandle (SFTPHandle):
 
48
    def stat(self):
 
49
        try:
 
50
            return SFTPAttributes.from_stat(os.fstat(self.readfile.fileno()))
 
51
        except OSError, e:
 
52
            return SFTPServer.convert_errno(e.errno)
 
53
 
 
54
    def chattr(self, attr):
 
55
        # python doesn't have equivalents to fchown or fchmod, so we have to
 
56
        # use the stored filename
 
57
        mutter('Changing permissions on %s to %s', self.filename, attr)
 
58
        try:
 
59
            SFTPServer.set_file_attr(self.filename, attr)
 
60
        except OSError, e:
 
61
            return SFTPServer.convert_errno(e.errno)
 
62
 
 
63
 
 
64
class StubSFTPServer (SFTPServerInterface):
 
65
 
 
66
    def __init__(self, server, root, home=None):
 
67
        SFTPServerInterface.__init__(self, server)
 
68
        # All paths are actually relative to 'root'.
 
69
        # this is like implementing chroot().
 
70
        self.root = root
 
71
        if home is None:
 
72
            self.home = ''
 
73
        else:
 
74
            assert home.startswith(self.root), \
 
75
                    "home must be a subdirectory of root (%s vs %s)" \
 
76
                    % (home, root)
 
77
            self.home = home[len(self.root):]
 
78
        if self.home.startswith('/'):
 
79
            self.home = self.home[1:]
 
80
        server._test_case.log('sftpserver - new connection')
 
81
 
 
82
    def _realpath(self, path):
 
83
        # paths returned from self.canonicalize() always start with
 
84
        # a path separator. So if 'root' is just '/', this would cause
 
85
        # a double slash at the beginning '//home/dir'. 
 
86
        if self.root == '/':
 
87
            return self.canonicalize(path)
 
88
        return self.root + self.canonicalize(path)
 
89
 
 
90
    if sys.platform == 'win32':
 
91
        def canonicalize(self, path):
 
92
            # Win32 sftp paths end up looking like
 
93
            #     sftp://host@foo/h:/foo/bar
 
94
            # which means absolute paths look like:
 
95
            #     /h:/foo/bar
 
96
            # and relative paths stay the same:
 
97
            #     foo/bar
 
98
            # win32 needs to use the Unicode APIs. so we require the 
 
99
            # paths to be utf8 (Linux just uses bytestreams)
 
100
            thispath = path.decode('utf8')
 
101
            if path.startswith('/'):
 
102
                # Abspath H:/foo/bar
 
103
                return os.path.normpath(thispath[1:])
 
104
            else:
 
105
                return os.path.normpath(os.path.join(self.home, thispath))
 
106
    else:
 
107
        def canonicalize(self, path):
 
108
            if os.path.isabs(path):
 
109
                return os.path.normpath(path)
 
110
            else:
 
111
                return os.path.normpath('/' + os.path.join(self.home, path))
 
112
 
 
113
    def chattr(self, path, attr):
 
114
        try:
 
115
            SFTPServer.set_file_attr(path, attr)
 
116
        except OSError, e:
 
117
            return SFTPServer.convert_errno(e.errno)
 
118
        return SFTP_OK
 
119
 
 
120
    def list_folder(self, path):
 
121
        path = self._realpath(path)
 
122
        try:
 
123
            out = [ ]
 
124
            # TODO: win32 incorrectly lists paths with non-ascii if path is not
 
125
            # unicode. However on Linux the server should only deal with
 
126
            # bytestreams and posix.listdir does the right thing 
 
127
            if sys.platform == 'win32':
 
128
                flist = [f.encode('utf8') for f in os.listdir(path)]
 
129
            else:
 
130
                flist = os.listdir(path)
 
131
            for fname in flist:
 
132
                attr = SFTPAttributes.from_stat(os.stat(pathjoin(path, fname)))
 
133
                attr.filename = fname
 
134
                out.append(attr)
 
135
            return out
 
136
        except OSError, e:
 
137
            return SFTPServer.convert_errno(e.errno)
 
138
 
 
139
    def stat(self, path):
 
140
        path = self._realpath(path)
 
141
        try:
 
142
            return SFTPAttributes.from_stat(os.stat(path))
 
143
        except OSError, e:
 
144
            return SFTPServer.convert_errno(e.errno)
 
145
 
 
146
    def lstat(self, path):
 
147
        path = self._realpath(path)
 
148
        try:
 
149
            return SFTPAttributes.from_stat(os.lstat(path))
 
150
        except OSError, e:
 
151
            return SFTPServer.convert_errno(e.errno)
 
152
 
 
153
    def open(self, path, flags, attr):
 
154
        path = self._realpath(path)
 
155
        try:
 
156
            flags |= getattr(os, 'O_BINARY', 0)
 
157
            if getattr(attr, 'st_mode', None):
 
158
                fd = os.open(path, flags, attr.st_mode)
 
159
            else:
 
160
                # os.open() defaults to 0777 which is
 
161
                # an odd default mode for files
 
162
                fd = os.open(path, flags, 0666)
 
163
        except OSError, e:
 
164
            return SFTPServer.convert_errno(e.errno)
 
165
 
 
166
        if (flags & os.O_CREAT) and (attr is not None):
 
167
            attr._flags &= ~attr.FLAG_PERMISSIONS
 
168
            SFTPServer.set_file_attr(path, attr)
 
169
        if flags & os.O_WRONLY:
 
170
            fstr = 'wb'
 
171
        elif flags & os.O_RDWR:
 
172
            fstr = 'rb+'
 
173
        else:
 
174
            # O_RDONLY (== 0)
 
175
            fstr = 'rb'
 
176
        try:
 
177
            f = os.fdopen(fd, fstr)
 
178
        except (IOError, OSError), e:
 
179
            return SFTPServer.convert_errno(e.errno)
 
180
        fobj = StubSFTPHandle()
 
181
        fobj.filename = path
 
182
        fobj.readfile = f
 
183
        fobj.writefile = f
 
184
        return fobj
 
185
 
 
186
    def remove(self, path):
 
187
        path = self._realpath(path)
 
188
        try:
 
189
            os.remove(path)
 
190
        except OSError, e:
 
191
            return SFTPServer.convert_errno(e.errno)
 
192
        return SFTP_OK
 
193
 
 
194
    def rename(self, oldpath, newpath):
 
195
        oldpath = self._realpath(oldpath)
 
196
        newpath = self._realpath(newpath)
 
197
        try:
 
198
            os.rename(oldpath, newpath)
 
199
        except OSError, e:
 
200
            return SFTPServer.convert_errno(e.errno)
 
201
        return SFTP_OK
 
202
 
 
203
    def mkdir(self, path, attr):
 
204
        path = self._realpath(path)
 
205
        try:
 
206
            # Using getattr() in case st_mode is None or 0
 
207
            # both evaluate to False
 
208
            if getattr(attr, 'st_mode', None):
 
209
                os.mkdir(path, attr.st_mode)
 
210
            else:
 
211
                os.mkdir(path)
 
212
            if attr is not None:
 
213
                attr._flags &= ~attr.FLAG_PERMISSIONS
 
214
                SFTPServer.set_file_attr(path, attr)
 
215
        except OSError, e:
 
216
            return SFTPServer.convert_errno(e.errno)
 
217
        return SFTP_OK
 
218
 
 
219
    def rmdir(self, path):
 
220
        path = self._realpath(path)
 
221
        try:
 
222
            os.rmdir(path)
 
223
        except OSError, e:
 
224
            return SFTPServer.convert_errno(e.errno)
 
225
        return SFTP_OK
 
226
 
 
227
    # removed: chattr, symlink, readlink
 
228
    # (nothing in bzr's sftp transport uses those)