1
# Copyright (C) 2006-2009, 2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for WSGI application"""
19
from io import BytesIO
22
from ..bzr.smart import medium, protocol
23
from ..transport.http import wsgi
24
from ..transport import chroot, memory
27
class WSGITestMixin(object):
29
def build_environ(self, updates=None):
30
"""Builds an environ dict with all fields required by PEP 333.
32
:param updates: a dict to that will be incorporated into the returned
33
dict using dict.update(updates).
36
# Required CGI variables
37
'REQUEST_METHOD': 'GET',
38
'SCRIPT_NAME': '/script/name/',
39
'PATH_INFO': 'path/info',
40
'SERVER_NAME': 'test',
41
'SERVER_PORT': '9999',
42
'SERVER_PROTOCOL': 'HTTP/1.0',
44
# Required WSGI variables
45
'wsgi.version': (1, 0),
46
'wsgi.url_scheme': 'http',
47
'wsgi.input': BytesIO(b''),
48
'wsgi.errors': BytesIO(),
49
'wsgi.multithread': False,
50
'wsgi.multiprocess': False,
51
'wsgi.run_once': True,
53
if updates is not None:
54
environ.update(updates)
57
def read_response(self, iterable):
59
for string in iterable:
63
def start_response(self, status, headers):
65
self.headers = headers
68
class TestWSGI(tests.TestCaseInTempDir, WSGITestMixin):
71
super(TestWSGI, self).setUp()
75
def test_construct(self):
76
app = wsgi.SmartWSGIApp(FakeTransport())
77
self.assertIsInstance(
78
app.backing_transport, chroot.ChrootTransport)
80
def test_http_get_rejected(self):
81
# GET requests are rejected.
82
app = wsgi.SmartWSGIApp(FakeTransport())
83
environ = self.build_environ({'REQUEST_METHOD': 'GET'})
84
iterable = app(environ, self.start_response)
85
self.read_response(iterable)
86
self.assertEqual('405 Method not allowed', self.status)
87
self.assertTrue(('Allow', 'POST') in self.headers)
89
def _fake_make_request(self, transport, write_func, bytes, rcp):
90
request = FakeRequest(transport, write_func)
91
request.accept_bytes(bytes)
92
self.request = request
95
def test_smart_wsgi_app_uses_given_relpath(self):
96
# The SmartWSGIApp should use the "breezy.relpath" field from the
97
# WSGI environ to clone from its backing transport to get a specific
98
# transport for this request.
99
transport = FakeTransport()
100
wsgi_app = wsgi.SmartWSGIApp(transport)
101
wsgi_app.backing_transport = transport
102
wsgi_app.make_request = self._fake_make_request
103
fake_input = BytesIO(b'fake request')
104
environ = self.build_environ({
105
'REQUEST_METHOD': 'POST',
106
'CONTENT_LENGTH': len(fake_input.getvalue()),
107
'wsgi.input': fake_input,
108
'breezy.relpath': 'foo/bar',
110
iterable = wsgi_app(environ, self.start_response)
111
response = self.read_response(iterable)
112
self.assertEqual([('clone', 'foo/bar/')], transport.calls)
114
def test_smart_wsgi_app_request_and_response(self):
115
# SmartWSGIApp reads the smart request from the 'wsgi.input' file-like
116
# object in the environ dict, and returns the response via the iterable
117
# returned to the WSGI handler.
118
transport = memory.MemoryTransport()
119
transport.put_bytes('foo', b'some bytes')
120
wsgi_app = wsgi.SmartWSGIApp(transport)
121
wsgi_app.make_request = self._fake_make_request
122
fake_input = BytesIO(b'fake request')
123
environ = self.build_environ({
124
'REQUEST_METHOD': 'POST',
125
'CONTENT_LENGTH': len(fake_input.getvalue()),
126
'wsgi.input': fake_input,
127
'breezy.relpath': 'foo',
129
iterable = wsgi_app(environ, self.start_response)
130
response = self.read_response(iterable)
131
self.assertEqual('200 OK', self.status)
132
self.assertEqual(b'got bytes: fake request', response)
134
def test_relpath_setter(self):
135
# wsgi.RelpathSetter is WSGI "middleware" to set the 'breezy.relpath'
139
def fake_app(environ, start_response):
140
calls.append(environ['breezy.relpath'])
141
wrapped_app = wsgi.RelpathSetter(
142
fake_app, prefix='/abc/', path_var='FOO')
143
wrapped_app({'FOO': '/abc/xyz/.bzr/smart'}, None)
144
self.assertEqual(['xyz'], calls)
146
def test_relpath_setter_bad_path_prefix(self):
147
# wsgi.RelpathSetter will reject paths with that don't match the prefix
148
# with a 404. This is probably a sign of misconfiguration; a server
149
# shouldn't ever be invoking our WSGI application with bad paths.
150
def fake_app(environ, start_response):
151
self.fail('The app should never be called when the path is wrong')
152
wrapped_app = wsgi.RelpathSetter(
153
fake_app, prefix='/abc/', path_var='FOO')
154
iterable = wrapped_app(
155
{'FOO': 'AAA/abc/xyz/.bzr/smart'}, self.start_response)
156
self.read_response(iterable)
157
self.assertTrue(self.status.startswith('404'))
159
def test_relpath_setter_bad_path_suffix(self):
160
# Similar to test_relpath_setter_bad_path_prefix: wsgi.RelpathSetter
161
# will reject paths with that don't match the suffix '.bzr/smart' with a
162
# 404 as well. Again, this shouldn't be seen by our WSGI application if
163
# the server is configured correctly.
164
def fake_app(environ, start_response):
165
self.fail('The app should never be called when the path is wrong')
166
wrapped_app = wsgi.RelpathSetter(
167
fake_app, prefix='/abc/', path_var='FOO')
168
iterable = wrapped_app(
169
{'FOO': '/abc/xyz/.bzr/AAA'}, self.start_response)
170
self.read_response(iterable)
171
self.assertTrue(self.status.startswith('404'))
173
def test_make_app(self):
174
# The make_app helper constructs a SmartWSGIApp wrapped in a
179
path_var='a path_var')
180
self.assertIsInstance(app, wsgi.RelpathSetter)
181
self.assertIsInstance(app.app, wsgi.SmartWSGIApp)
182
self.assertStartsWith(app.app.backing_transport.base, 'chroot-')
183
backing_transport = app.app.backing_transport
184
chroot_backing_transport = backing_transport.server.backing_transport
185
self.assertEndsWith(chroot_backing_transport.base, 'a%20root/')
186
self.assertEqual(app.app.root_client_path, 'a prefix')
187
self.assertEqual(app.path_var, 'a path_var')
189
def test_incomplete_request(self):
190
transport = FakeTransport()
191
wsgi_app = wsgi.SmartWSGIApp(transport)
193
def make_request(transport, write_func, bytes, root_client_path):
194
request = IncompleteRequest(transport, write_func)
195
request.accept_bytes(bytes)
196
self.request = request
198
wsgi_app.make_request = make_request
200
fake_input = BytesIO(b'incomplete request')
201
environ = self.build_environ({
202
'REQUEST_METHOD': 'POST',
203
'CONTENT_LENGTH': len(fake_input.getvalue()),
204
'wsgi.input': fake_input,
205
'breezy.relpath': 'foo/bar',
207
iterable = wsgi_app(environ, self.start_response)
208
response = self.read_response(iterable)
209
self.assertEqual('200 OK', self.status)
210
self.assertEqual(b'error\x01incomplete request\n', response)
212
def test_protocol_version_detection_one(self):
213
# SmartWSGIApp detects requests that don't start with
214
# REQUEST_VERSION_TWO as version one.
215
transport = memory.MemoryTransport()
216
wsgi_app = wsgi.SmartWSGIApp(transport)
217
fake_input = BytesIO(b'hello\n')
218
environ = self.build_environ({
219
'REQUEST_METHOD': 'POST',
220
'CONTENT_LENGTH': len(fake_input.getvalue()),
221
'wsgi.input': fake_input,
222
'breezy.relpath': 'foo',
224
iterable = wsgi_app(environ, self.start_response)
225
response = self.read_response(iterable)
226
self.assertEqual('200 OK', self.status)
227
# Expect a version 1-encoded response.
228
self.assertEqual(b'ok\x012\n', response)
230
def test_protocol_version_detection_two(self):
231
# SmartWSGIApp detects requests that start with REQUEST_VERSION_TWO
233
transport = memory.MemoryTransport()
234
wsgi_app = wsgi.SmartWSGIApp(transport)
235
fake_input = BytesIO(protocol.REQUEST_VERSION_TWO + b'hello\n')
236
environ = self.build_environ({
237
'REQUEST_METHOD': 'POST',
238
'CONTENT_LENGTH': len(fake_input.getvalue()),
239
'wsgi.input': fake_input,
240
'breezy.relpath': 'foo',
242
iterable = wsgi_app(environ, self.start_response)
243
response = self.read_response(iterable)
244
self.assertEqual('200 OK', self.status)
245
# Expect a version 2-encoded response.
247
protocol.RESPONSE_VERSION_TWO + b'success\nok\x012\n', response)
250
class TestWSGIJail(tests.TestCaseWithMemoryTransport, WSGITestMixin):
252
def make_hpss_wsgi_request(self, wsgi_relpath, *args):
253
write_buf = BytesIO()
254
request_medium = medium.SmartSimplePipesClientMedium(
255
None, write_buf, 'fake:' + wsgi_relpath)
256
request_encoder = protocol.ProtocolThreeRequester(
257
request_medium.get_request())
258
request_encoder.call(*args)
260
environ = self.build_environ({
261
'REQUEST_METHOD': 'POST',
262
'CONTENT_LENGTH': len(write_buf.getvalue()),
263
'wsgi.input': write_buf,
264
'breezy.relpath': wsgi_relpath,
268
def test_jail_root(self):
269
"""The WSGI HPSS glue allows access to the whole WSGI backing
270
transport, regardless of which HTTP path the request was delivered
273
# make a branch in a shared repo
274
self.make_repository('repo', shared=True)
275
branch = self.make_controldir('repo/branch').create_branch()
276
# serve the repo via bzr+http WSGI
277
wsgi_app = wsgi.SmartWSGIApp(self.get_transport())
278
# send a request to /repo/branch that will have to access /repo.
279
environ = self.make_hpss_wsgi_request(
280
'/repo/branch', b'BzrDir.open_branchV2', b'.')
281
iterable = wsgi_app(environ, self.start_response)
282
response_bytes = self.read_response(iterable)
283
self.assertEqual('200 OK', self.status)
284
# expect a successful response, rather than a jail break error
285
from breezy.tests.test_smart_transport import LoggingMessageHandler
286
message_handler = LoggingMessageHandler()
287
decoder = protocol.ProtocolThreeDecoder(
288
message_handler, expect_version_marker=True)
289
decoder.accept_bytes(response_bytes)
291
('structure', (b'branch', branch._format.network_name()))
292
in message_handler.event_log)
295
class FakeRequest(object):
297
def __init__(self, transport, write_func):
298
self.transport = transport
299
self.write_func = write_func
300
self.accepted_bytes = b''
302
def accept_bytes(self, bytes):
303
self.accepted_bytes = bytes
304
self.write_func(b'got bytes: ' + bytes)
306
def next_read_size(self):
310
class FakeTransport(object):
314
self.base = 'fake:///'
316
def abspath(self, relpath):
317
return 'fake:///' + relpath
319
def clone(self, relpath):
320
self.calls.append(('clone', relpath))
324
class IncompleteRequest(FakeRequest):
325
"""A request-like object that always expects to read more bytes."""
327
def next_read_size(self):
328
# this request always asks for more