/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_wsgi.py

  • Committer: Jelmer Vernooij
  • Date: 2020-02-19 23:18:42 UTC
  • mto: (7490.3.4 work)
  • mto: This revision was merged to the branch mainline in revision 7495.
  • Revision ID: jelmer@jelmer.uk-20200219231842-agwjh2db66cpajqg
Consistent return values.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006-2009, 2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for WSGI application"""
 
18
 
 
19
from .. import tests
 
20
from ..sixish import (
 
21
    BytesIO,
 
22
    )
 
23
from ..bzr.smart import medium, protocol
 
24
from ..transport.http import wsgi
 
25
from ..transport import chroot, memory
 
26
 
 
27
 
 
28
class WSGITestMixin(object):
 
29
 
 
30
    def build_environ(self, updates=None):
 
31
        """Builds an environ dict with all fields required by PEP 333.
 
32
 
 
33
        :param updates: a dict to that will be incorporated into the returned
 
34
            dict using dict.update(updates).
 
35
        """
 
36
        environ = {
 
37
            # Required CGI variables
 
38
            'REQUEST_METHOD': 'GET',
 
39
            'SCRIPT_NAME': '/script/name/',
 
40
            'PATH_INFO': 'path/info',
 
41
            'SERVER_NAME': 'test',
 
42
            'SERVER_PORT': '9999',
 
43
            'SERVER_PROTOCOL': 'HTTP/1.0',
 
44
 
 
45
            # Required WSGI variables
 
46
            'wsgi.version': (1, 0),
 
47
            'wsgi.url_scheme': 'http',
 
48
            'wsgi.input': BytesIO(b''),
 
49
            'wsgi.errors': BytesIO(),
 
50
            'wsgi.multithread': False,
 
51
            'wsgi.multiprocess': False,
 
52
            'wsgi.run_once': True,
 
53
        }
 
54
        if updates is not None:
 
55
            environ.update(updates)
 
56
        return environ
 
57
 
 
58
    def read_response(self, iterable):
 
59
        response = b''
 
60
        for string in iterable:
 
61
            response += string
 
62
        return response
 
63
 
 
64
    def start_response(self, status, headers):
 
65
        self.status = status
 
66
        self.headers = headers
 
67
 
 
68
 
 
69
class TestWSGI(tests.TestCaseInTempDir, WSGITestMixin):
 
70
 
 
71
    def setUp(self):
 
72
        super(TestWSGI, self).setUp()
 
73
        self.status = None
 
74
        self.headers = None
 
75
 
 
76
    def test_construct(self):
 
77
        app = wsgi.SmartWSGIApp(FakeTransport())
 
78
        self.assertIsInstance(
 
79
            app.backing_transport, chroot.ChrootTransport)
 
80
 
 
81
    def test_http_get_rejected(self):
 
82
        # GET requests are rejected.
 
83
        app = wsgi.SmartWSGIApp(FakeTransport())
 
84
        environ = self.build_environ({'REQUEST_METHOD': 'GET'})
 
85
        iterable = app(environ, self.start_response)
 
86
        self.read_response(iterable)
 
87
        self.assertEqual('405 Method not allowed', self.status)
 
88
        self.assertTrue(('Allow', 'POST') in self.headers)
 
89
 
 
90
    def _fake_make_request(self, transport, write_func, bytes, rcp):
 
91
        request = FakeRequest(transport, write_func)
 
92
        request.accept_bytes(bytes)
 
93
        self.request = request
 
94
        return request
 
95
 
 
96
    def test_smart_wsgi_app_uses_given_relpath(self):
 
97
        # The SmartWSGIApp should use the "breezy.relpath" field from the
 
98
        # WSGI environ to clone from its backing transport to get a specific
 
99
        # transport for this request.
 
100
        transport = FakeTransport()
 
101
        wsgi_app = wsgi.SmartWSGIApp(transport)
 
102
        wsgi_app.backing_transport = transport
 
103
        wsgi_app.make_request = self._fake_make_request
 
104
        fake_input = BytesIO(b'fake request')
 
105
        environ = self.build_environ({
 
106
            'REQUEST_METHOD': 'POST',
 
107
            'CONTENT_LENGTH': len(fake_input.getvalue()),
 
108
            'wsgi.input': fake_input,
 
109
            'breezy.relpath': 'foo/bar',
 
110
        })
 
111
        iterable = wsgi_app(environ, self.start_response)
 
112
        response = self.read_response(iterable)
 
113
        self.assertEqual([('clone', 'foo/bar/')], transport.calls)
 
114
 
 
115
    def test_smart_wsgi_app_request_and_response(self):
 
116
        # SmartWSGIApp reads the smart request from the 'wsgi.input' file-like
 
117
        # object in the environ dict, and returns the response via the iterable
 
118
        # returned to the WSGI handler.
 
119
        transport = memory.MemoryTransport()
 
120
        transport.put_bytes('foo', b'some bytes')
 
121
        wsgi_app = wsgi.SmartWSGIApp(transport)
 
122
        wsgi_app.make_request = self._fake_make_request
 
123
        fake_input = BytesIO(b'fake request')
 
124
        environ = self.build_environ({
 
125
            'REQUEST_METHOD': 'POST',
 
126
            'CONTENT_LENGTH': len(fake_input.getvalue()),
 
127
            'wsgi.input': fake_input,
 
128
            'breezy.relpath': 'foo',
 
129
        })
 
130
        iterable = wsgi_app(environ, self.start_response)
 
131
        response = self.read_response(iterable)
 
132
        self.assertEqual('200 OK', self.status)
 
133
        self.assertEqual(b'got bytes: fake request', response)
 
134
 
 
135
    def test_relpath_setter(self):
 
136
        # wsgi.RelpathSetter is WSGI "middleware" to set the 'breezy.relpath'
 
137
        # variable.
 
138
        calls = []
 
139
 
 
140
        def fake_app(environ, start_response):
 
141
            calls.append(environ['breezy.relpath'])
 
142
        wrapped_app = wsgi.RelpathSetter(
 
143
            fake_app, prefix='/abc/', path_var='FOO')
 
144
        wrapped_app({'FOO': '/abc/xyz/.bzr/smart'}, None)
 
145
        self.assertEqual(['xyz'], calls)
 
146
 
 
147
    def test_relpath_setter_bad_path_prefix(self):
 
148
        # wsgi.RelpathSetter will reject paths with that don't match the prefix
 
149
        # with a 404.  This is probably a sign of misconfiguration; a server
 
150
        # shouldn't ever be invoking our WSGI application with bad paths.
 
151
        def fake_app(environ, start_response):
 
152
            self.fail('The app should never be called when the path is wrong')
 
153
        wrapped_app = wsgi.RelpathSetter(
 
154
            fake_app, prefix='/abc/', path_var='FOO')
 
155
        iterable = wrapped_app(
 
156
            {'FOO': 'AAA/abc/xyz/.bzr/smart'}, self.start_response)
 
157
        self.read_response(iterable)
 
158
        self.assertTrue(self.status.startswith('404'))
 
159
 
 
160
    def test_relpath_setter_bad_path_suffix(self):
 
161
        # Similar to test_relpath_setter_bad_path_prefix: wsgi.RelpathSetter
 
162
        # will reject paths with that don't match the suffix '.bzr/smart' with a
 
163
        # 404 as well.  Again, this shouldn't be seen by our WSGI application if
 
164
        # the server is configured correctly.
 
165
        def fake_app(environ, start_response):
 
166
            self.fail('The app should never be called when the path is wrong')
 
167
        wrapped_app = wsgi.RelpathSetter(
 
168
            fake_app, prefix='/abc/', path_var='FOO')
 
169
        iterable = wrapped_app(
 
170
            {'FOO': '/abc/xyz/.bzr/AAA'}, self.start_response)
 
171
        self.read_response(iterable)
 
172
        self.assertTrue(self.status.startswith('404'))
 
173
 
 
174
    def test_make_app(self):
 
175
        # The make_app helper constructs a SmartWSGIApp wrapped in a
 
176
        # RelpathSetter.
 
177
        app = wsgi.make_app(
 
178
            root='a root',
 
179
            prefix='a prefix',
 
180
            path_var='a path_var')
 
181
        self.assertIsInstance(app, wsgi.RelpathSetter)
 
182
        self.assertIsInstance(app.app, wsgi.SmartWSGIApp)
 
183
        self.assertStartsWith(app.app.backing_transport.base, 'chroot-')
 
184
        backing_transport = app.app.backing_transport
 
185
        chroot_backing_transport = backing_transport.server.backing_transport
 
186
        self.assertEndsWith(chroot_backing_transport.base, 'a%20root/')
 
187
        self.assertEqual(app.app.root_client_path, 'a prefix')
 
188
        self.assertEqual(app.path_var, 'a path_var')
 
189
 
 
190
    def test_incomplete_request(self):
 
191
        transport = FakeTransport()
 
192
        wsgi_app = wsgi.SmartWSGIApp(transport)
 
193
 
 
194
        def make_request(transport, write_func, bytes, root_client_path):
 
195
            request = IncompleteRequest(transport, write_func)
 
196
            request.accept_bytes(bytes)
 
197
            self.request = request
 
198
            return request
 
199
        wsgi_app.make_request = make_request
 
200
 
 
201
        fake_input = BytesIO(b'incomplete request')
 
202
        environ = self.build_environ({
 
203
            'REQUEST_METHOD': 'POST',
 
204
            'CONTENT_LENGTH': len(fake_input.getvalue()),
 
205
            'wsgi.input': fake_input,
 
206
            'breezy.relpath': 'foo/bar',
 
207
        })
 
208
        iterable = wsgi_app(environ, self.start_response)
 
209
        response = self.read_response(iterable)
 
210
        self.assertEqual('200 OK', self.status)
 
211
        self.assertEqual(b'error\x01incomplete request\n', response)
 
212
 
 
213
    def test_protocol_version_detection_one(self):
 
214
        # SmartWSGIApp detects requests that don't start with
 
215
        # REQUEST_VERSION_TWO as version one.
 
216
        transport = memory.MemoryTransport()
 
217
        wsgi_app = wsgi.SmartWSGIApp(transport)
 
218
        fake_input = BytesIO(b'hello\n')
 
219
        environ = self.build_environ({
 
220
            'REQUEST_METHOD': 'POST',
 
221
            'CONTENT_LENGTH': len(fake_input.getvalue()),
 
222
            'wsgi.input': fake_input,
 
223
            'breezy.relpath': 'foo',
 
224
        })
 
225
        iterable = wsgi_app(environ, self.start_response)
 
226
        response = self.read_response(iterable)
 
227
        self.assertEqual('200 OK', self.status)
 
228
        # Expect a version 1-encoded response.
 
229
        self.assertEqual(b'ok\x012\n', response)
 
230
 
 
231
    def test_protocol_version_detection_two(self):
 
232
        # SmartWSGIApp detects requests that start with REQUEST_VERSION_TWO
 
233
        # as version two.
 
234
        transport = memory.MemoryTransport()
 
235
        wsgi_app = wsgi.SmartWSGIApp(transport)
 
236
        fake_input = BytesIO(protocol.REQUEST_VERSION_TWO + b'hello\n')
 
237
        environ = self.build_environ({
 
238
            'REQUEST_METHOD': 'POST',
 
239
            'CONTENT_LENGTH': len(fake_input.getvalue()),
 
240
            'wsgi.input': fake_input,
 
241
            'breezy.relpath': 'foo',
 
242
        })
 
243
        iterable = wsgi_app(environ, self.start_response)
 
244
        response = self.read_response(iterable)
 
245
        self.assertEqual('200 OK', self.status)
 
246
        # Expect a version 2-encoded response.
 
247
        self.assertEqual(
 
248
            protocol.RESPONSE_VERSION_TWO + b'success\nok\x012\n', response)
 
249
 
 
250
 
 
251
class TestWSGIJail(tests.TestCaseWithMemoryTransport, WSGITestMixin):
 
252
 
 
253
    def make_hpss_wsgi_request(self, wsgi_relpath, *args):
 
254
        write_buf = BytesIO()
 
255
        request_medium = medium.SmartSimplePipesClientMedium(
 
256
            None, write_buf, 'fake:' + wsgi_relpath)
 
257
        request_encoder = protocol.ProtocolThreeRequester(
 
258
            request_medium.get_request())
 
259
        request_encoder.call(*args)
 
260
        write_buf.seek(0)
 
261
        environ = self.build_environ({
 
262
            'REQUEST_METHOD': 'POST',
 
263
            'CONTENT_LENGTH': len(write_buf.getvalue()),
 
264
            'wsgi.input': write_buf,
 
265
            'breezy.relpath': wsgi_relpath,
 
266
        })
 
267
        return environ
 
268
 
 
269
    def test_jail_root(self):
 
270
        """The WSGI HPSS glue allows access to the whole WSGI backing
 
271
        transport, regardless of which HTTP path the request was delivered
 
272
        to.
 
273
        """
 
274
        # make a branch in a shared repo
 
275
        self.make_repository('repo', shared=True)
 
276
        branch = self.make_controldir('repo/branch').create_branch()
 
277
        # serve the repo via bzr+http WSGI
 
278
        wsgi_app = wsgi.SmartWSGIApp(self.get_transport())
 
279
        # send a request to /repo/branch that will have to access /repo.
 
280
        environ = self.make_hpss_wsgi_request(
 
281
            '/repo/branch', b'BzrDir.open_branchV2', b'.')
 
282
        iterable = wsgi_app(environ, self.start_response)
 
283
        response_bytes = self.read_response(iterable)
 
284
        self.assertEqual('200 OK', self.status)
 
285
        # expect a successful response, rather than a jail break error
 
286
        from breezy.tests.test_smart_transport import LoggingMessageHandler
 
287
        message_handler = LoggingMessageHandler()
 
288
        decoder = protocol.ProtocolThreeDecoder(
 
289
            message_handler, expect_version_marker=True)
 
290
        decoder.accept_bytes(response_bytes)
 
291
        self.assertTrue(
 
292
            ('structure', (b'branch', branch._format.network_name()))
 
293
            in message_handler.event_log)
 
294
 
 
295
 
 
296
class FakeRequest(object):
 
297
 
 
298
    def __init__(self, transport, write_func):
 
299
        self.transport = transport
 
300
        self.write_func = write_func
 
301
        self.accepted_bytes = b''
 
302
 
 
303
    def accept_bytes(self, bytes):
 
304
        self.accepted_bytes = bytes
 
305
        self.write_func(b'got bytes: ' + bytes)
 
306
 
 
307
    def next_read_size(self):
 
308
        return 0
 
309
 
 
310
 
 
311
class FakeTransport(object):
 
312
 
 
313
    def __init__(self):
 
314
        self.calls = []
 
315
        self.base = 'fake:///'
 
316
 
 
317
    def abspath(self, relpath):
 
318
        return 'fake:///' + relpath
 
319
 
 
320
    def clone(self, relpath):
 
321
        self.calls.append(('clone', relpath))
 
322
        return self
 
323
 
 
324
 
 
325
class IncompleteRequest(FakeRequest):
 
326
    """A request-like object that always expects to read more bytes."""
 
327
 
 
328
    def next_read_size(self):
 
329
        # this request always asks for more
 
330
        return 1