/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_wsgi.py

  • Committer: Jelmer Vernooij
  • Date: 2017-06-08 23:30:31 UTC
  • mto: This revision was merged to the branch mainline in revision 6690.
  • Revision ID: jelmer@jelmer.uk-20170608233031-3qavls2o7a1pqllj
Update imports.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006-2009, 2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for WSGI application"""
 
18
 
 
19
from .. import tests
 
20
from ..sixish import (
 
21
    BytesIO,
 
22
    )
 
23
from ..smart import medium, protocol
 
24
from ..transport.http import wsgi
 
25
from ..transport import chroot, memory
 
26
 
 
27
 
 
28
class WSGITestMixin(object):
 
29
 
 
30
    def build_environ(self, updates=None):
 
31
        """Builds an environ dict with all fields required by PEP 333.
 
32
 
 
33
        :param updates: a dict to that will be incorporated into the returned
 
34
            dict using dict.update(updates).
 
35
        """
 
36
        environ = {
 
37
            # Required CGI variables
 
38
            'REQUEST_METHOD': 'GET',
 
39
            'SCRIPT_NAME': '/script/name/',
 
40
            'PATH_INFO': 'path/info',
 
41
            'SERVER_NAME': 'test',
 
42
            'SERVER_PORT': '9999',
 
43
            'SERVER_PROTOCOL': 'HTTP/1.0',
 
44
 
 
45
            # Required WSGI variables
 
46
            'wsgi.version': (1,0),
 
47
            'wsgi.url_scheme': 'http',
 
48
            'wsgi.input': BytesIO(b''),
 
49
            'wsgi.errors': BytesIO(),
 
50
            'wsgi.multithread': False,
 
51
            'wsgi.multiprocess': False,
 
52
            'wsgi.run_once': True,
 
53
        }
 
54
        if updates is not None:
 
55
            environ.update(updates)
 
56
        return environ
 
57
 
 
58
    def read_response(self, iterable):
 
59
        response = ''
 
60
        for string in iterable:
 
61
            response += string
 
62
        return response
 
63
 
 
64
    def start_response(self, status, headers):
 
65
        self.status = status
 
66
        self.headers = headers
 
67
 
 
68
 
 
69
class TestWSGI(tests.TestCaseInTempDir, WSGITestMixin):
 
70
 
 
71
    def setUp(self):
 
72
        super(TestWSGI, self).setUp()
 
73
        self.status = None
 
74
        self.headers = None
 
75
 
 
76
    def test_construct(self):
 
77
        app = wsgi.SmartWSGIApp(FakeTransport())
 
78
        self.assertIsInstance(
 
79
            app.backing_transport, chroot.ChrootTransport)
 
80
 
 
81
    def test_http_get_rejected(self):
 
82
        # GET requests are rejected.
 
83
        app = wsgi.SmartWSGIApp(FakeTransport())
 
84
        environ = self.build_environ({'REQUEST_METHOD': 'GET'})
 
85
        iterable = app(environ, self.start_response)
 
86
        self.read_response(iterable)
 
87
        self.assertEqual('405 Method not allowed', self.status)
 
88
        self.assertTrue(('Allow', 'POST') in self.headers)
 
89
 
 
90
    def _fake_make_request(self, transport, write_func, bytes, rcp):
 
91
        request = FakeRequest(transport, write_func)
 
92
        request.accept_bytes(bytes)
 
93
        self.request = request
 
94
        return request
 
95
 
 
96
    def test_smart_wsgi_app_uses_given_relpath(self):
 
97
        # The SmartWSGIApp should use the "breezy.relpath" field from the
 
98
        # WSGI environ to clone from its backing transport to get a specific
 
99
        # transport for this request.
 
100
        transport = FakeTransport()
 
101
        wsgi_app = wsgi.SmartWSGIApp(transport)
 
102
        wsgi_app.backing_transport = transport
 
103
        wsgi_app.make_request = self._fake_make_request
 
104
        fake_input = BytesIO(b'fake request')
 
105
        environ = self.build_environ({
 
106
            'REQUEST_METHOD': 'POST',
 
107
            'CONTENT_LENGTH': len(fake_input.getvalue()),
 
108
            'wsgi.input': fake_input,
 
109
            'breezy.relpath': 'foo/bar',
 
110
        })
 
111
        iterable = wsgi_app(environ, self.start_response)
 
112
        response = self.read_response(iterable)
 
113
        self.assertEqual([('clone', 'foo/bar/')] , transport.calls)
 
114
 
 
115
    def test_smart_wsgi_app_request_and_response(self):
 
116
        # SmartWSGIApp reads the smart request from the 'wsgi.input' file-like
 
117
        # object in the environ dict, and returns the response via the iterable
 
118
        # returned to the WSGI handler.
 
119
        transport = memory.MemoryTransport()
 
120
        transport.put_bytes('foo', 'some bytes')
 
121
        wsgi_app = wsgi.SmartWSGIApp(transport)
 
122
        wsgi_app.make_request = self._fake_make_request
 
123
        fake_input = BytesIO(b'fake request')
 
124
        environ = self.build_environ({
 
125
            'REQUEST_METHOD': 'POST',
 
126
            'CONTENT_LENGTH': len(fake_input.getvalue()),
 
127
            'wsgi.input': fake_input,
 
128
            'breezy.relpath': 'foo',
 
129
        })
 
130
        iterable = wsgi_app(environ, self.start_response)
 
131
        response = self.read_response(iterable)
 
132
        self.assertEqual('200 OK', self.status)
 
133
        self.assertEqual('got bytes: fake request', response)
 
134
 
 
135
    def test_relpath_setter(self):
 
136
        # wsgi.RelpathSetter is WSGI "middleware" to set the 'breezy.relpath'
 
137
        # variable.
 
138
        calls = []
 
139
        def fake_app(environ, start_response):
 
140
            calls.append(environ['breezy.relpath'])
 
141
        wrapped_app = wsgi.RelpathSetter(
 
142
            fake_app, prefix='/abc/', path_var='FOO')
 
143
        wrapped_app({'FOO': '/abc/xyz/.bzr/smart'}, None)
 
144
        self.assertEqual(['xyz'], calls)
 
145
 
 
146
    def test_relpath_setter_bad_path_prefix(self):
 
147
        # wsgi.RelpathSetter will reject paths with that don't match the prefix
 
148
        # with a 404.  This is probably a sign of misconfiguration; a server
 
149
        # shouldn't ever be invoking our WSGI application with bad paths.
 
150
        def fake_app(environ, start_response):
 
151
            self.fail('The app should never be called when the path is wrong')
 
152
        wrapped_app = wsgi.RelpathSetter(
 
153
            fake_app, prefix='/abc/', path_var='FOO')
 
154
        iterable = wrapped_app(
 
155
            {'FOO': 'AAA/abc/xyz/.bzr/smart'}, self.start_response)
 
156
        self.read_response(iterable)
 
157
        self.assertTrue(self.status.startswith('404'))
 
158
 
 
159
    def test_relpath_setter_bad_path_suffix(self):
 
160
        # Similar to test_relpath_setter_bad_path_prefix: wsgi.RelpathSetter
 
161
        # will reject paths with that don't match the suffix '.bzr/smart' with a
 
162
        # 404 as well.  Again, this shouldn't be seen by our WSGI application if
 
163
        # the server is configured correctly.
 
164
        def fake_app(environ, start_response):
 
165
            self.fail('The app should never be called when the path is wrong')
 
166
        wrapped_app = wsgi.RelpathSetter(
 
167
            fake_app, prefix='/abc/', path_var='FOO')
 
168
        iterable = wrapped_app(
 
169
            {'FOO': '/abc/xyz/.bzr/AAA'}, self.start_response)
 
170
        self.read_response(iterable)
 
171
        self.assertTrue(self.status.startswith('404'))
 
172
 
 
173
    def test_make_app(self):
 
174
        # The make_app helper constructs a SmartWSGIApp wrapped in a
 
175
        # RelpathSetter.
 
176
        app = wsgi.make_app(
 
177
            root='a root',
 
178
            prefix='a prefix',
 
179
            path_var='a path_var')
 
180
        self.assertIsInstance(app, wsgi.RelpathSetter)
 
181
        self.assertIsInstance(app.app, wsgi.SmartWSGIApp)
 
182
        self.assertStartsWith(app.app.backing_transport.base, 'chroot-')
 
183
        backing_transport = app.app.backing_transport
 
184
        chroot_backing_transport = backing_transport.server.backing_transport
 
185
        self.assertEndsWith(chroot_backing_transport.base, 'a%20root/')
 
186
        self.assertEqual(app.app.root_client_path, 'a prefix')
 
187
        self.assertEqual(app.path_var, 'a path_var')
 
188
 
 
189
    def test_incomplete_request(self):
 
190
        transport = FakeTransport()
 
191
        wsgi_app = wsgi.SmartWSGIApp(transport)
 
192
        def make_request(transport, write_func, bytes, root_client_path):
 
193
            request = IncompleteRequest(transport, write_func)
 
194
            request.accept_bytes(bytes)
 
195
            self.request = request
 
196
            return request
 
197
        wsgi_app.make_request = make_request
 
198
 
 
199
        fake_input = BytesIO(b'incomplete request')
 
200
        environ = self.build_environ({
 
201
            'REQUEST_METHOD': 'POST',
 
202
            'CONTENT_LENGTH': len(fake_input.getvalue()),
 
203
            'wsgi.input': fake_input,
 
204
            'breezy.relpath': 'foo/bar',
 
205
        })
 
206
        iterable = wsgi_app(environ, self.start_response)
 
207
        response = self.read_response(iterable)
 
208
        self.assertEqual('200 OK', self.status)
 
209
        self.assertEqual('error\x01incomplete request\n', response)
 
210
 
 
211
    def test_protocol_version_detection_one(self):
 
212
        # SmartWSGIApp detects requests that don't start with
 
213
        # REQUEST_VERSION_TWO as version one.
 
214
        transport = memory.MemoryTransport()
 
215
        wsgi_app = wsgi.SmartWSGIApp(transport)
 
216
        fake_input = BytesIO(b'hello\n')
 
217
        environ = self.build_environ({
 
218
            'REQUEST_METHOD': 'POST',
 
219
            'CONTENT_LENGTH': len(fake_input.getvalue()),
 
220
            'wsgi.input': fake_input,
 
221
            'breezy.relpath': 'foo',
 
222
        })
 
223
        iterable = wsgi_app(environ, self.start_response)
 
224
        response = self.read_response(iterable)
 
225
        self.assertEqual('200 OK', self.status)
 
226
        # Expect a version 1-encoded response.
 
227
        self.assertEqual('ok\x012\n', response)
 
228
 
 
229
    def test_protocol_version_detection_two(self):
 
230
        # SmartWSGIApp detects requests that start with REQUEST_VERSION_TWO
 
231
        # as version two.
 
232
        transport = memory.MemoryTransport()
 
233
        wsgi_app = wsgi.SmartWSGIApp(transport)
 
234
        fake_input = BytesIO(protocol.REQUEST_VERSION_TWO + 'hello\n')
 
235
        environ = self.build_environ({
 
236
            'REQUEST_METHOD': 'POST',
 
237
            'CONTENT_LENGTH': len(fake_input.getvalue()),
 
238
            'wsgi.input': fake_input,
 
239
            'breezy.relpath': 'foo',
 
240
        })
 
241
        iterable = wsgi_app(environ, self.start_response)
 
242
        response = self.read_response(iterable)
 
243
        self.assertEqual('200 OK', self.status)
 
244
        # Expect a version 2-encoded response.
 
245
        self.assertEqual(
 
246
            protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n', response)
 
247
 
 
248
 
 
249
class TestWSGIJail(tests.TestCaseWithMemoryTransport, WSGITestMixin):
 
250
 
 
251
    def make_hpss_wsgi_request(self, wsgi_relpath, *args):
 
252
        write_buf = BytesIO()
 
253
        request_medium = medium.SmartSimplePipesClientMedium(
 
254
            None, write_buf, 'fake:' + wsgi_relpath)
 
255
        request_encoder = protocol.ProtocolThreeRequester(
 
256
            request_medium.get_request())
 
257
        request_encoder.call(*args)
 
258
        write_buf.seek(0)
 
259
        environ = self.build_environ({
 
260
            'REQUEST_METHOD': 'POST',
 
261
            'CONTENT_LENGTH': len(write_buf.getvalue()),
 
262
            'wsgi.input': write_buf,
 
263
            'breezy.relpath': wsgi_relpath,
 
264
        })
 
265
        return environ
 
266
 
 
267
    def test_jail_root(self):
 
268
        """The WSGI HPSS glue allows access to the whole WSGI backing
 
269
        transport, regardless of which HTTP path the request was delivered
 
270
        to.
 
271
        """
 
272
        # make a branch in a shared repo
 
273
        self.make_repository('repo', shared=True)
 
274
        branch = self.make_bzrdir('repo/branch').create_branch()
 
275
        # serve the repo via bzr+http WSGI
 
276
        wsgi_app = wsgi.SmartWSGIApp(self.get_transport())
 
277
        # send a request to /repo/branch that will have to access /repo.
 
278
        environ = self.make_hpss_wsgi_request(
 
279
            '/repo/branch', 'BzrDir.open_branchV2', '.')
 
280
        iterable = wsgi_app(environ, self.start_response)
 
281
        response_bytes = self.read_response(iterable)
 
282
        self.assertEqual('200 OK', self.status)
 
283
        # expect a successful response, rather than a jail break error
 
284
        from breezy.tests.test_smart_transport import LoggingMessageHandler
 
285
        message_handler = LoggingMessageHandler()
 
286
        decoder = protocol.ProtocolThreeDecoder(
 
287
            message_handler, expect_version_marker=True)
 
288
        decoder.accept_bytes(response_bytes)
 
289
        self.assertTrue(
 
290
            ('structure', ('branch', branch._format.network_name()))
 
291
            in message_handler.event_log)
 
292
 
 
293
 
 
294
class FakeRequest(object):
 
295
 
 
296
    def __init__(self, transport, write_func):
 
297
        self.transport = transport
 
298
        self.write_func = write_func
 
299
        self.accepted_bytes = ''
 
300
 
 
301
    def accept_bytes(self, bytes):
 
302
        self.accepted_bytes = bytes
 
303
        self.write_func('got bytes: ' + bytes)
 
304
 
 
305
    def next_read_size(self):
 
306
        return 0
 
307
 
 
308
 
 
309
class FakeTransport(object):
 
310
 
 
311
    def __init__(self):
 
312
        self.calls = []
 
313
        self.base = 'fake:///'
 
314
 
 
315
    def abspath(self, relpath):
 
316
        return 'fake:///' + relpath
 
317
 
 
318
    def clone(self, relpath):
 
319
        self.calls.append(('clone', relpath))
 
320
        return self
 
321
 
 
322
 
 
323
class IncompleteRequest(FakeRequest):
 
324
    """A request-like object that always expects to read more bytes."""
 
325
 
 
326
    def next_read_size(self):
 
327
        # this request always asks for more
 
328
        return 1
 
329