/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/ftp_server/pyftpdlib_based.py

  • Committer: Jelmer Vernooij
  • Date: 2018-11-06 01:18:08 UTC
  • mfrom: (7143 work)
  • mto: This revision was merged to the branch mainline in revision 7151.
  • Revision ID: jelmer@jelmer.uk-20181106011808-y870f4vq0ork3ahu
Merge trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2009, 2010 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
"""
17
 
FTP test server.
18
 
 
19
 
Based on pyftpdlib: http://code.google.com/p/pyftpdlib/
20
 
"""
21
 
 
22
 
import errno
23
 
import logging
24
 
import os
25
 
import pyftpdlib
26
 
import sys
27
 
from pyftpdlib.authorizers import (
28
 
    AuthorizerError,
29
 
    DummyAuthorizer,
30
 
    )
31
 
from pyftpdlib.filesystems import AbstractedFS
32
 
from pyftpdlib.handlers import (
33
 
    FTPHandler,
34
 
    proto_cmds,
35
 
    )
36
 
from pyftpdlib.servers import FTPServer
37
 
import select
38
 
import threading
39
 
 
40
 
 
41
 
from breezy import (
42
 
    osutils,
43
 
    tests,
44
 
    trace,
45
 
    )
46
 
from breezy.tests import test_server
47
 
 
48
 
 
49
 
class NullHandler(logging.Handler):
50
 
 
51
 
    def emit(self, record):
52
 
        pass
53
 
 
54
 
# Shut up very verbose pyftpdlib
55
 
logging.getLogger('pyftpdlib').addHandler(NullHandler())
56
 
 
57
 
 
58
 
# Convert the pyftplib string version into a tuple to avoid traps in string
59
 
# comparison.
60
 
pyftplib_version = tuple(map(int, pyftpdlib.__ver__.split('.')))
61
 
 
62
 
 
63
 
class AnonymousWithWriteAccessAuthorizer(DummyAuthorizer):
64
 
 
65
 
    def _check_permissions(self, username, perm):
66
 
        # Like base implementation but don't warn about write permissions
67
 
        # assigned to anonymous, since that's exactly our purpose.
68
 
        for p in perm:
69
 
            if p not in self.read_perms + self.write_perms:
70
 
                raise AuthorizerError('No such permission "%s"' %p)
71
 
 
72
 
 
73
 
class BzrConformingFS(AbstractedFS):
74
 
 
75
 
    def chmod(self, path, mode):
76
 
        return os.chmod(path, mode)
77
 
 
78
 
    def listdir(self, path):
79
 
        """List the content of a directory."""
80
 
        return [osutils.safe_utf8(s) for s in os.listdir(path)]
81
 
 
82
 
    def fs2ftp(self, fspath):
83
 
        p = AbstractedFS.fs2ftp(self, osutils.safe_unicode(fspath))
84
 
        return osutils.safe_utf8(p)
85
 
 
86
 
    def ftp2fs(self, ftppath):
87
 
        p = osutils.safe_unicode(ftppath)
88
 
        return AbstractedFS.ftp2fs(self, p)
89
 
 
90
 
 
91
 
class BzrConformingFTPHandler(FTPHandler):
92
 
 
93
 
    abstracted_fs = BzrConformingFS
94
 
 
95
 
    def __init__(self, conn, server, ioloop=None):
96
 
        FTPHandler.__init__(self, conn, server)
97
 
        self.authorizer = server.authorizer
98
 
 
99
 
    def ftp_SIZE(self, path):
100
 
        # bzr is overly picky here, but we want to make the test suite pass
101
 
        # first. This may need to be revisited -- vila 20090226
102
 
        line = self.fs.fs2ftp(path)
103
 
        if self.fs.isdir(self.fs.realpath(path)):
104
 
            why = "%s is a directory" % line
105
 
            self.log('FAIL SIZE "%s". %s.' % (line, why))
106
 
            self.respond("550 %s."  %why)
107
 
        else:
108
 
            FTPHandler.ftp_SIZE(self, path)
109
 
 
110
 
    def ftp_NLST(self, path):
111
 
        # bzr is overly picky here, but we want to make the test suite pass
112
 
        # first. This may need to be revisited -- vila 20090226
113
 
        line = self.fs.fs2ftp(path)
114
 
        if self.fs.isfile(self.fs.realpath(path)):
115
 
            why = "Not a directory: %s" % line
116
 
            self.log('FAIL NLST "%s". %s.' % (line, why))
117
 
            self.respond("550 %s."  %why)
118
 
        else:
119
 
            FTPHandler.ftp_NLST(self, path)
120
 
 
121
 
    def log_cmd(self, cmd, arg, respcode, respstr):
122
 
        # base class version choke on unicode, the alternative is to just
123
 
        # provide an empty implementation and relies on the client to do
124
 
        # the logging for debugging purposes. Not worth the trouble so far
125
 
        # -- vila 20110607
126
 
        if cmd in ("DELE", "RMD", "RNFR", "RNTO", "MKD"):
127
 
            line = '"%s" %s' % (' '.join([cmd, unicode(arg)]).strip(), respcode)
128
 
            self.log(line)
129
 
 
130
 
 
131
 
# An empty password is valid, hence the arg is neither mandatory nor forbidden
132
 
proto_cmds['PASS']['arg'] = None
133
 
 
134
 
class ftp_server(FTPServer):
135
 
 
136
 
    def __init__(self, address, handler, authorizer):
137
 
        FTPServer.__init__(self, address, handler)
138
 
        self.authorizer = authorizer
139
 
        # Worth backporting upstream ?
140
 
        self.addr = self.socket.getsockname()
141
 
 
142
 
 
143
 
class FTPTestServer(test_server.TestServer):
144
 
    """Common code for FTP server facilities."""
145
 
 
146
 
    def __init__(self):
147
 
        self._root = None
148
 
        self._ftp_server = None
149
 
        self._port = None
150
 
        self._async_thread = None
151
 
        # ftp server logs
152
 
        self.logs = []
153
 
        self._ftpd_running = False
154
 
 
155
 
    def get_url(self):
156
 
        """Calculate an ftp url to this server."""
157
 
        return 'ftp://anonymous@localhost:%d/' % (self._port)
158
 
 
159
 
    def get_bogus_url(self):
160
 
        """Return a URL which cannot be connected to."""
161
 
        return 'ftp://127.0.0.1:1/'
162
 
 
163
 
    def log(self, message):
164
 
        """This is used by ftp_server to log connections, etc."""
165
 
        self.logs.append(message)
166
 
 
167
 
    def start_server(self, vfs_server=None):
168
 
        if not (vfs_server is None or isinstance(vfs_server,
169
 
                                                 test_server.LocalURLServer)):
170
 
            raise AssertionError(
171
 
                "FTPServer currently assumes local transport, got %s"
172
 
                % vfs_server)
173
 
        self._root = osutils.getcwd()
174
 
 
175
 
        address = ('localhost', 0) # bind to a random port
176
 
        authorizer = AnonymousWithWriteAccessAuthorizer()
177
 
        authorizer.add_anonymous(self._root, perm='elradfmwM')
178
 
        self._ftp_server = ftp_server(address, BzrConformingFTPHandler,
179
 
                                      authorizer)
180
 
 
181
 
        self._port = self._ftp_server.socket.getsockname()[1]
182
 
        self._ftpd_starting = threading.Lock()
183
 
        self._ftpd_starting.acquire() # So it can be released by the server
184
 
        self._ftpd_thread = threading.Thread(target=self._run_server,)
185
 
        self._ftpd_thread.start()
186
 
        if 'threads' in tests.selftest_debug_flags:
187
 
            sys.stderr.write('Thread started: %s\n'
188
 
                             % (self._ftpd_thread.ident,))
189
 
        # Wait for the server thread to start (i.e release the lock)
190
 
        self._ftpd_starting.acquire()
191
 
        self._ftpd_starting.release()
192
 
 
193
 
    def stop_server(self):
194
 
        """See breezy.transport.Server.stop_server."""
195
 
        # Tell the server to stop, but also close the server socket for tests
196
 
        # that start the server but never initiate a connection. Closing the
197
 
        # socket should be done first though, to avoid further connections.
198
 
        self._ftp_server.close()
199
 
        self._ftpd_running = False
200
 
        self._ftpd_thread.join()
201
 
        if 'threads' in tests.selftest_debug_flags:
202
 
            sys.stderr.write('Thread  joined: %s\n'
203
 
                             % (self._ftpd_thread.ident,))
204
 
 
205
 
    def _run_server(self):
206
 
        """Run the server until stop_server is called.
207
 
 
208
 
        Shut it down properly then.
209
 
        """
210
 
        self._ftpd_running = True
211
 
        self._ftpd_starting.release()
212
 
        while self._ftpd_running:
213
 
            try:
214
 
                self._ftp_server.serve_forever(timeout=0.1)
215
 
            except select.error as e:
216
 
                if e.args[0] != errno.EBADF:
217
 
                    raise
218
 
        self._ftp_server.close_all()
219
 
 
220
 
    def add_user(self, user, password):
221
 
        """Add a user with write access."""
222
 
        self._ftp_server.authorizer.add_user(user, password, self._root,
223
 
                                             perm='elradfmwM')