1
# Copyright (C) 2009, 2010 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19
Based on pyftpdlib: http://code.google.com/p/pyftpdlib/
27
from pyftpdlib.authorizers import (
31
from pyftpdlib.filesystems import AbstractedFS
32
from pyftpdlib.handlers import (
36
from pyftpdlib.servers import FTPServer
46
from breezy.tests import test_server
49
class NullHandler(logging.Handler):
    """Logging handler that silently discards every record it receives."""

    def emit(self, record):
        """Do nothing with ``record``.

        :param record: the log record handed over by the logging machinery;
            it is neither formatted nor written anywhere.
        :return: None
        """
        # Intentionally empty: the handler only exists so that a logger has
        # somewhere to send its records.
        return None
54
# pyftpdlib is very verbose: give its logger a handler that drops everything.
logging.getLogger('pyftpdlib').addHandler(NullHandler())

# Keep the pyftpdlib version as a tuple of ints built from the dotted version
# string, which avoids the traps of handling the version as a string.
pyftplib_version = tuple(
    int(component) for component in pyftpdlib.__ver__.split('.'))
63
class AnonymousWithWriteAccessAuthorizer(DummyAuthorizer):
    """Authorizer that accepts write permissions for the anonymous user."""

    def _check_permissions(self, username, perm):
        """Check that every permission letter in ``perm`` is a known one.

        :param username: the user the permissions are meant for; not used by
            this check.
        :param perm: an iterable of permission letters (e.g. a string).
        :raises AuthorizerError: for the first letter that is neither in
            ``self.read_perms`` nor in ``self.write_perms``.
        """
        # Like base implementation but don't warn about write permissions
        # assigned to anonymous, since that's exactly our purpose.
        known_perms = self.read_perms + self.write_perms
        for p in perm:
            if p not in known_perms:
                raise AuthorizerError('No such permission "%s"' % p)
73
class BzrConformingFS(AbstractedFS):
    """AbstractedFS variant converting paths with the osutils helpers."""

    def chmod(self, path, mode):
        """Change the mode of ``path`` by delegating to ``os.chmod``."""
        result = os.chmod(path, mode)
        return result

    def listdir(self, path):
        """List the content of a directory."""
        entries = os.listdir(path)
        return [osutils.safe_utf8(entry) for entry in entries]

    def fs2ftp(self, fspath):
        """Translate a filesystem path into its FTP form.

        The path goes through ``osutils.safe_unicode`` before the base class
        translation, and the result through ``osutils.safe_utf8``.
        """
        unicode_fspath = osutils.safe_unicode(fspath)
        ftp_path = AbstractedFS.fs2ftp(self, unicode_fspath)
        return osutils.safe_utf8(ftp_path)

    def ftp2fs(self, ftppath):
        """Translate an FTP path into a filesystem path.

        The path goes through ``osutils.safe_unicode`` before being handed to
        the base class translation.
        """
        return AbstractedFS.ftp2fs(self, osutils.safe_unicode(ftppath))
91
class BzrConformingFTPHandler(FTPHandler):
93
abstracted_fs = BzrConformingFS
95
def __init__(self, conn, server, ioloop=None):
    """Initialise the handler and adopt the authorizer of ``server``.

    :param conn: passed through to ``FTPHandler.__init__``.
    :param server: the server creating this handler; its ``authorizer``
        attribute becomes this handler's authorizer.
    :param ioloop: passed through to ``FTPHandler.__init__``; None keeps
        the base class default.
    """
    # Forward ``ioloop`` instead of dropping it, so the base class works
    # with the loop the caller selected.
    FTPHandler.__init__(self, conn, server, ioloop=ioloop)
    self.authorizer = server.authorizer
99
def ftp_SIZE(self, path):
    """Handle SIZE, refusing directories with a 550 reply.

    :param path: the path the command applies to, as received by the
        handler.
    :return: None. For a directory a single 550 response is sent and
        logged; anything else is delegated to ``FTPHandler.ftp_SIZE``.
    """
    # bzr is overly picky here, but we want to make the test suite pass
    # first. This may need to be revisited -- vila 20090226
    line = self.fs.fs2ftp(path)
    if self.fs.isdir(self.fs.realpath(path)):
        why = "%s is a directory" % line
        self.log('FAIL SIZE "%s". %s.' % (line, why))
        self.respond("550 %s." % why)
        # The failure reply is complete; the base implementation must not
        # run as well.
        return
    FTPHandler.ftp_SIZE(self, path)
110
def ftp_NLST(self, path):
    """Handle NLST, refusing plain files with a 550 reply.

    :param path: the path the command applies to, as received by the
        handler.
    :return: None. For a file a single 550 response is sent and logged;
        anything else is delegated to ``FTPHandler.ftp_NLST``.
    """
    # bzr is overly picky here, but we want to make the test suite pass
    # first. This may need to be revisited -- vila 20090226
    line = self.fs.fs2ftp(path)
    if self.fs.isfile(self.fs.realpath(path)):
        why = "Not a directory: %s" % line
        self.log('FAIL NLST "%s". %s.' % (line, why))
        self.respond("550 %s." % why)
        # The failure reply is complete; the base implementation must not
        # run as well.
        return
    FTPHandler.ftp_NLST(self, path)
121
def log_cmd(self, cmd, arg, respcode, respstr):
# Replacement for the base class ``log_cmd``; only the commands listed in
# the tuple below get a log line built for them.
122
# The base class version chokes on unicode. The alternative is to just
123
# provide an empty implementation and rely on the client to do
124
# the logging for debugging purposes. Not worth the trouble so far.
126
if cmd in ("DELE", "RMD", "RNFR", "RNTO", "MKD"):
127
# The line has the form '"<cmd> <arg>" <respcode>'; ``respstr`` is not
# part of it.
# NOTE(review): the statement consuming ``line`` is not visible in this
# extract -- confirm it is still handed to the logger.
line = '"%s" %s' % (' '.join([cmd, unicode(arg)]).strip(), respcode)
131
# An empty password is valid, so the argument of the PASS command is neither
# mandatory nor forbidden; that is what the None value below expresses.
132
proto_cmds['PASS']['arg'] = None
134
class ftp_server(FTPServer):
    """FTPServer that remembers its authorizer and its socket address."""

    def __init__(self, address, handler, authorizer):
        """Create the server and record ``authorizer`` and the bound address.

        :param address: passed through to ``FTPServer.__init__``.
        :param handler: passed through to ``FTPServer.__init__``.
        :param authorizer: stored as ``self.authorizer``.
        """
        FTPServer.__init__(self, address, handler)
        # Worth backporting upstream ?
        bound_address = self.socket.getsockname()
        self.addr = bound_address
        self.authorizer = authorizer
143
class FTPTestServer(test_server.TestServer):
144
"""Common code for FTP server facilities."""
148
self._ftp_server = None
150
self._async_thread = None
153
self._ftpd_running = False
156
"""Calculate an ftp url to this server."""
157
return 'ftp://anonymous@localhost:%d/' % (self._port)
159
def get_bogus_url(self):
    """Return a URL which cannot be connected to."""
    unreachable_url = 'ftp://127.0.0.1:1/'
    return unreachable_url
163
def log(self, message):
    """Append ``message`` to ``self.logs``.

    This is used by ftp_server to log connections, etc.
    """
    collected = self.logs
    collected.append(message)
167
def start_server(self, vfs_server=None):
# Create an ftp_server bound to ('localhost', 0), give the anonymous user
# access to the current working directory, and run ``self._run_server`` in a
# new thread. Returns once that thread has released ``self._ftpd_starting``.
# NOTE(review): some continuation lines of this method are not visible in
# this extract (the end of the AssertionError call and of the ftp_server
# call) -- confirm against the full file.
168
# Only None or a test_server.LocalURLServer is accepted as ``vfs_server``;
# anything else raises AssertionError.
if not (vfs_server is None or isinstance(vfs_server,
169
test_server.LocalURLServer)):
170
raise AssertionError(
171
"FTPServer currently assumes local transport, got %s"
173
# The directory handed to the authorizer is the current working directory.
self._root = osutils.getcwd()
175
address = ('localhost', 0) # bind to a random port
176
authorizer = AnonymousWithWriteAccessAuthorizer()
177
authorizer.add_anonymous(self._root, perm='elradfmwM')
178
self._ftp_server = ftp_server(address, BzrConformingFTPHandler,
181
# Read back the port actually bound, since port 0 was requested above.
self._port = self._ftp_server.socket.getsockname()[1]
182
# The lock is taken here and released by ``_run_server``, so the second
# acquire() further down blocks until the server thread is running.
self._ftpd_starting = threading.Lock()
183
self._ftpd_starting.acquire() # So it can be released by the server
184
self._ftpd_thread = threading.Thread(target=self._run_server,)
185
self._ftpd_thread.start()
186
# Optional trace on stderr when the 'threads' debug flag is set.
if 'threads' in tests.selftest_debug_flags:
187
sys.stderr.write('Thread started: %s\n'
188
% (self._ftpd_thread.ident,))
189
# Wait for the server thread to start (i.e. release the lock)
190
self._ftpd_starting.acquire()
191
self._ftpd_starting.release()
193
def stop_server(self):
    """See breezy.transport.Server.stop_server."""
    # Close the server socket first so that no further connection can come
    # in; this also covers tests that start the server but never connect.
    # Only then tell the serving loop to stop and wait for its thread.
    self._ftp_server.close()
    self._ftpd_running = False
    server_thread = self._ftpd_thread
    server_thread.join()
    if 'threads' not in tests.selftest_debug_flags:
        return
    sys.stderr.write('Thread joined: %s\n' % (server_thread.ident,))
205
def _run_server(self):
206
"""Run the server until stop_server is called.
208
Shut it down properly then.
210
self._ftpd_running = True
211
self._ftpd_starting.release()
212
while self._ftpd_running:
214
self._ftp_server.serve_forever(timeout=0.1)
215
except select.error as e:
216
if e.args[0] != errno.EBADF:
218
self._ftp_server.close_all()
220
def add_user(self, user, password):
221
"""Add a user with write access."""
222
self._ftp_server.authorizer.add_user(user, password, self._root,