/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/stub_sftp.py

  • Committer: John Arbash Meinel
  • Date: 2006-07-18 18:57:54 UTC
  • mto: This revision was merged to the branch mainline in revision 1868.
  • Revision ID: john@arbash-meinel.com-20060718185754-4007745748e28db9
Commit timestamp restricted to 1ms precision.

The old code would restrict to 1s resolution if the timestamp was
supplied, while it preserved full resolution if the timestamp was
auto generated. Now both paths preserve only 1ms resolution.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""
 
18
A stub SFTP server for loopback SFTP testing.
 
19
Adapted from the one in paramiko's unit tests.
 
20
"""
 
21
 
 
22
import os
 
23
from paramiko import ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes, \
 
24
    SFTPHandle, SFTP_OK, AUTH_SUCCESSFUL, OPEN_SUCCEEDED
 
25
import sys
 
26
 
 
27
from bzrlib.osutils import pathjoin
 
28
from bzrlib.trace import mutter
 
29
 
 
30
 
 
31
class StubServer (ServerInterface):
 
32
 
 
33
    def __init__(self, test_case):
 
34
        ServerInterface.__init__(self)
 
35
        self._test_case = test_case
 
36
 
 
37
    def check_auth_password(self, username, password):
 
38
        # all are allowed
 
39
        self._test_case.log('sftpserver - authorizing: %s' % (username,))
 
40
        return AUTH_SUCCESSFUL
 
41
 
 
42
    def check_channel_request(self, kind, chanid):
 
43
        self._test_case.log('sftpserver - channel request: %s, %s' % (kind, chanid))
 
44
        return OPEN_SUCCEEDED
 
45
 
 
46
 
 
47
class StubSFTPHandle (SFTPHandle):
 
48
    def stat(self):
 
49
        try:
 
50
            return SFTPAttributes.from_stat(os.fstat(self.readfile.fileno()))
 
51
        except OSError, e:
 
52
            return SFTPServer.convert_errno(e.errno)
 
53
 
 
54
    def chattr(self, attr):
 
55
        # python doesn't have equivalents to fchown or fchmod, so we have to
 
56
        # use the stored filename
 
57
        mutter('Changing permissions on %s to %s', self.filename, attr)
 
58
        try:
 
59
            SFTPServer.set_file_attr(self.filename, attr)
 
60
        except OSError, e:
 
61
            return SFTPServer.convert_errno(e.errno)
 
62
 
 
63
 
 
64
class StubSFTPServer (SFTPServerInterface):
 
65
 
 
66
    def __init__(self, server, root, home=None):
 
67
        SFTPServerInterface.__init__(self, server)
 
68
        # All paths are actually relative to 'root'.
 
69
        # this is like implementing chroot().
 
70
        self.root = root
 
71
        if home is None:
 
72
            self.home = ''
 
73
        else:
 
74
            assert home.startswith(self.root), \
 
75
                    "home must be a subdirectory of root (%s vs %s)" \
 
76
                    % (home, root)
 
77
            self.home = home[len(self.root):]
 
78
        if self.home.startswith('/'):
 
79
            self.home = self.home[1:]
 
80
        server._test_case.log('sftpserver - new connection')
 
81
 
 
82
    def _realpath(self, path):
 
83
        # paths returned from self.canonicalize() always start with
 
84
        # a path separator. So if 'root' is just '/', this would cause
 
85
        # a double slash at the beginning '//home/dir'. 
 
86
        if self.root == '/':
 
87
            return self.canonicalize(path)
 
88
        return self.root + self.canonicalize(path)
 
89
 
 
90
    if sys.platform == 'win32':
 
91
        def canonicalize(self, path):
 
92
            # Win32 sftp paths end up looking like
 
93
            #     sftp://host@foo/h:/foo/bar
 
94
            # which means absolute paths look like:
 
95
            #     /h:/foo/bar
 
96
            # and relative paths stay the same:
 
97
            #     foo/bar
 
98
            # win32 needs to use the Unicode APIs. so we require the 
 
99
            # paths to be utf8 (Linux just uses bytestreams)
 
100
            thispath = path.decode('utf8')
 
101
            if path.startswith('/'):
 
102
                # Abspath H:/foo/bar
 
103
                return os.path.normpath(thispath[1:])
 
104
            else:
 
105
                return os.path.normpath(os.path.join(self.home, thispath))
 
106
    else:
 
107
        def canonicalize(self, path):
 
108
            if os.path.isabs(path):
 
109
                return os.path.normpath(path)
 
110
            else:
 
111
                return os.path.normpath('/' + os.path.join(self.home, path))
 
112
 
 
113
    def chattr(self, path, attr):
 
114
        try:
 
115
            SFTPServer.set_file_attr(path, attr)
 
116
        except OSError, e:
 
117
            return SFTPServer.convert_errno(e.errno)
 
118
        return SFTP_OK
 
119
 
 
120
    def list_folder(self, path):
 
121
        path = self._realpath(path)
 
122
        try:
 
123
            out = [ ]
 
124
            # TODO: win32 incorrectly lists paths with non-ascii if path is not
 
125
            # unicode. However on Linux the server should only deal with
 
126
            # bytestreams and posix.listdir does the right thing 
 
127
            if sys.platform == 'win32':
 
128
                flist = [f.encode('utf8') for f in os.listdir(path)]
 
129
            else:
 
130
                flist = os.listdir(path)
 
131
            for fname in flist:
 
132
                attr = SFTPAttributes.from_stat(os.stat(pathjoin(path, fname)))
 
133
                attr.filename = fname
 
134
                out.append(attr)
 
135
            return out
 
136
        except OSError, e:
 
137
            return SFTPServer.convert_errno(e.errno)
 
138
 
 
139
    def stat(self, path):
 
140
        path = self._realpath(path)
 
141
        try:
 
142
            return SFTPAttributes.from_stat(os.stat(path))
 
143
        except OSError, e:
 
144
            return SFTPServer.convert_errno(e.errno)
 
145
 
 
146
    def lstat(self, path):
 
147
        path = self._realpath(path)
 
148
        try:
 
149
            return SFTPAttributes.from_stat(os.lstat(path))
 
150
        except OSError, e:
 
151
            return SFTPServer.convert_errno(e.errno)
 
152
 
 
153
    def open(self, path, flags, attr):
 
154
        path = self._realpath(path)
 
155
        try:
 
156
            if hasattr(os, 'O_BINARY'):
 
157
                flags |= os.O_BINARY
 
158
            if getattr(attr, 'st_mode', None):
 
159
                fd = os.open(path, flags, attr.st_mode)
 
160
            else:
 
161
                fd = os.open(path, flags)
 
162
        except OSError, e:
 
163
            return SFTPServer.convert_errno(e.errno)
 
164
 
 
165
        if (flags & os.O_CREAT) and (attr is not None):
 
166
            attr._flags &= ~attr.FLAG_PERMISSIONS
 
167
            SFTPServer.set_file_attr(path, attr)
 
168
        if flags & os.O_WRONLY:
 
169
            fstr = 'wb'
 
170
        elif flags & os.O_RDWR:
 
171
            fstr = 'rb+'
 
172
        else:
 
173
            # O_RDONLY (== 0)
 
174
            fstr = 'rb'
 
175
        try:
 
176
            f = os.fdopen(fd, fstr)
 
177
        except (IOError, OSError), e:
 
178
            return SFTPServer.convert_errno(e.errno)
 
179
        fobj = StubSFTPHandle()
 
180
        fobj.filename = path
 
181
        fobj.readfile = f
 
182
        fobj.writefile = f
 
183
        return fobj
 
184
 
 
185
    def remove(self, path):
 
186
        path = self._realpath(path)
 
187
        try:
 
188
            os.remove(path)
 
189
        except OSError, e:
 
190
            return SFTPServer.convert_errno(e.errno)
 
191
        return SFTP_OK
 
192
 
 
193
    def rename(self, oldpath, newpath):
 
194
        oldpath = self._realpath(oldpath)
 
195
        newpath = self._realpath(newpath)
 
196
        try:
 
197
            os.rename(oldpath, newpath)
 
198
        except OSError, e:
 
199
            return SFTPServer.convert_errno(e.errno)
 
200
        return SFTP_OK
 
201
 
 
202
    def mkdir(self, path, attr):
 
203
        path = self._realpath(path)
 
204
        try:
 
205
            # Using getattr() in case st_mode is None or 0
 
206
            # both evaluate to False
 
207
            if getattr(attr, 'st_mode', None):
 
208
                os.mkdir(path, attr.st_mode)
 
209
            else:
 
210
                os.mkdir(path)
 
211
            if attr is not None:
 
212
                attr._flags &= ~attr.FLAG_PERMISSIONS
 
213
                SFTPServer.set_file_attr(path, attr)
 
214
        except OSError, e:
 
215
            return SFTPServer.convert_errno(e.errno)
 
216
        return SFTP_OK
 
217
 
 
218
    def rmdir(self, path):
 
219
        path = self._realpath(path)
 
220
        try:
 
221
            os.rmdir(path)
 
222
        except OSError, e:
 
223
            return SFTPServer.convert_errno(e.errno)
 
224
        return SFTP_OK
 
225
 
 
226
    # removed: chattr, symlink, readlink
 
227
    # (nothing in bzr's sftp transport uses those)