1
# Copyright (C) 2006-2009, 2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for WSGI application"""
20
from ..sixish import (
23
from ..smart import medium, protocol
24
from ..transport.http import wsgi
25
from ..transport import chroot, memory
28
class WSGITestMixin(object):
30
def build_environ(self, updates=None):
    """Build an environ dict with all fields required by PEP 333.

    :param updates: an optional dict that will be incorporated into the
        returned dict using dict.update(updates).
    :return: the environ dict, with any updates applied.
    """
    environ = {
        # Required CGI variables
        'REQUEST_METHOD': 'GET',
        'SCRIPT_NAME': '/script/name/',
        'PATH_INFO': 'path/info',
        'SERVER_NAME': 'test',
        'SERVER_PORT': '9999',
        'SERVER_PROTOCOL': 'HTTP/1.0',

        # Required WSGI variables
        'wsgi.version': (1, 0),
        'wsgi.url_scheme': 'http',
        'wsgi.input': BytesIO(b''),
        'wsgi.errors': BytesIO(),
        'wsgi.multithread': False,
        'wsgi.multiprocess': False,
        'wsgi.run_once': True,
    }
    if updates is not None:
        environ.update(updates)
    # Callers assign the result and pass it straight to the WSGI app.
    return environ
58
def read_response(self, iterable):
    """Read every chunk of a WSGI response and return the whole body.

    :param iterable: the iterable returned by calling a WSGI application.
        Presumably it yields byte strings -- TODO confirm against the app.
    :return: all chunks concatenated, as one byte string.
    """
    body = BytesIO()
    for string in iterable:
        body.write(string)
    # Callers compare the result with expected response data or feed it to
    # a protocol decoder, so the accumulated body must be returned.
    return body.getvalue()
64
def start_response(self, status, headers, exc_info=None):
    """Record what the WSGI application passes to start_response.

    :param status: the status string, e.g. '200 OK'.
    :param headers: the list of (name, value) response header tuples.
    :param exc_info: optional third argument allowed by the WSGI
        start_response signature; accepted and ignored here.
    """
    # The tests assert on both self.status and self.headers afterwards.
    self.status = status
    self.headers = headers
69
class TestWSGI(tests.TestCaseInTempDir, WSGITestMixin):
72
super(TestWSGI, self).setUp()
76
def test_construct(self):
    """A new SmartWSGIApp exposes a ChrootTransport as backing_transport."""
    backing = wsgi.SmartWSGIApp(FakeTransport()).backing_transport
    self.assertIsInstance(backing, chroot.ChrootTransport)
81
def test_http_get_rejected(self):
    """GET requests are rejected with a 405 and an 'Allow: POST' header."""
    app = wsgi.SmartWSGIApp(FakeTransport())
    environ = self.build_environ({'REQUEST_METHOD': 'GET'})
    iterable = app(environ, self.start_response)
    # The body is read but not inspected; only status and headers matter.
    self.read_response(iterable)
    self.assertEqual('405 Method not allowed', self.status)
    # assertIn reports the actual headers on failure, unlike assertTrue.
    self.assertIn(('Allow', 'POST'), self.headers)
90
def _fake_make_request(self, transport, write_func, bytes, rcp):
    """Replacement for SmartWSGIApp.make_request that uses a FakeRequest.

    :param transport: passed through to the FakeRequest.
    :param write_func: callable the FakeRequest writes its response to.
    :param bytes: the request bytes, fed to the FakeRequest immediately.
    :param rcp: accepted for signature compatibility and not used here.
    :return: the FakeRequest, which is also kept as self.request.
    """
    request = FakeRequest(transport, write_func)
    request.accept_bytes(bytes)
    self.request = request
    # NOTE(review): this is installed as a request factory, so the built
    # request is handed back; presumably SmartWSGIApp inspects it -- confirm.
    return request
96
def test_smart_wsgi_app_uses_given_relpath(self):
    # The SmartWSGIApp should use the "breezy.relpath" field from the
    # WSGI environ to clone from its backing transport to get a specific
    # transport for this request.
    transport = FakeTransport()
    wsgi_app = wsgi.SmartWSGIApp(transport)
    # Replace the app's own backing transport with the fake so that the
    # clone call is recorded in transport.calls.
    wsgi_app.backing_transport = transport
    wsgi_app.make_request = self._fake_make_request
    fake_input = BytesIO(b'fake request')
    environ = self.build_environ({
        'REQUEST_METHOD': 'POST',
        'CONTENT_LENGTH': len(fake_input.getvalue()),
        'wsgi.input': fake_input,
        'breezy.relpath': 'foo/bar',
    })
    iterable = wsgi_app(environ, self.start_response)
    # The body is irrelevant here; it is only read.
    self.read_response(iterable)
    self.assertEqual([('clone', 'foo/bar/')], transport.calls)
115
def test_smart_wsgi_app_request_and_response(self):
    # SmartWSGIApp reads the smart request from the 'wsgi.input' file-like
    # object in the environ dict, and returns the response via the iterable
    # returned to the WSGI handler.
    transport = memory.MemoryTransport()
    # File content is bytes; a bytes literal behaves the same on Python 2
    # and stays correct on Python 3.
    transport.put_bytes('foo', b'some bytes')
    wsgi_app = wsgi.SmartWSGIApp(transport)
    wsgi_app.make_request = self._fake_make_request
    fake_input = BytesIO(b'fake request')
    environ = self.build_environ({
        'REQUEST_METHOD': 'POST',
        'CONTENT_LENGTH': len(fake_input.getvalue()),
        'wsgi.input': fake_input,
        'breezy.relpath': 'foo',
    })
    iterable = wsgi_app(environ, self.start_response)
    response = self.read_response(iterable)
    self.assertEqual('200 OK', self.status)
    # The request came from a BytesIO, so the echoed body is bytes too.
    self.assertEqual(b'got bytes: fake request', response)
135
def test_relpath_setter(self):
    # wsgi.RelpathSetter is WSGI "middleware" to set the 'breezy.relpath'
    # key in the environ before the wrapped application is called.
    calls = []

    def fake_app(environ, start_response):
        # Record the relpath the middleware computed for this request.
        calls.append(environ['breezy.relpath'])
    wrapped_app = wsgi.RelpathSetter(
        fake_app, prefix='/abc/', path_var='FOO')
    wrapped_app({'FOO': '/abc/xyz/.bzr/smart'}, None)
    self.assertEqual(['xyz'], calls)
146
def test_relpath_setter_bad_path_prefix(self):
    # wsgi.RelpathSetter will reject paths that don't match the prefix
    # with a 404.  This is probably a sign of misconfiguration; a server
    # shouldn't ever be invoking our WSGI application with bad paths.
    def fake_app(environ, start_response):
        self.fail('The app should never be called when the path is wrong')
    wrapped_app = wsgi.RelpathSetter(
        fake_app, prefix='/abc/', path_var='FOO')
    iterable = wrapped_app(
        {'FOO': 'AAA/abc/xyz/.bzr/smart'}, self.start_response)
    self.read_response(iterable)
    # assertStartsWith (also used by test_make_app) shows the actual
    # status on failure instead of a bare "False is not true".
    self.assertStartsWith(self.status, '404')
159
def test_relpath_setter_bad_path_suffix(self):
    # Similar to test_relpath_setter_bad_path_prefix: wsgi.RelpathSetter
    # will reject paths that don't match the suffix '.bzr/smart' with a
    # 404 as well.  Again, this shouldn't be seen by our WSGI application if
    # the server is configured correctly.
    def fake_app(environ, start_response):
        self.fail('The app should never be called when the path is wrong')
    wrapped_app = wsgi.RelpathSetter(
        fake_app, prefix='/abc/', path_var='FOO')
    iterable = wrapped_app(
        {'FOO': '/abc/xyz/.bzr/AAA'}, self.start_response)
    self.read_response(iterable)
    # assertStartsWith (also used by test_make_app) shows the actual
    # status on failure instead of a bare "False is not true".
    self.assertStartsWith(self.status, '404')
173
def test_make_app(self):
174
# The make_app helper constructs a SmartWSGIApp wrapped in a
179
path_var='a path_var')
180
self.assertIsInstance(app, wsgi.RelpathSetter)
181
self.assertIsInstance(app.app, wsgi.SmartWSGIApp)
182
self.assertStartsWith(app.app.backing_transport.base, 'chroot-')
183
backing_transport = app.app.backing_transport
184
chroot_backing_transport = backing_transport.server.backing_transport
185
self.assertEndsWith(chroot_backing_transport.base, 'a%20root/')
186
self.assertEqual(app.app.root_client_path, 'a prefix')
187
self.assertEqual(app.path_var, 'a path_var')
189
def test_incomplete_request(self):
    """An incomplete request gets a 200 response with an error body."""
    transport = FakeTransport()
    wsgi_app = wsgi.SmartWSGIApp(transport)

    def make_request(transport, write_func, bytes, root_client_path):
        # Build a request that always asks for more bytes.
        request = IncompleteRequest(transport, write_func)
        request.accept_bytes(bytes)
        self.request = request
        # NOTE(review): this is installed as the app's request factory, so
        # the request is handed back; presumably the app checks it to
        # detect the incomplete request -- confirm.
        return request
    wsgi_app.make_request = make_request

    fake_input = BytesIO(b'incomplete request')
    environ = self.build_environ({
        'REQUEST_METHOD': 'POST',
        'CONTENT_LENGTH': len(fake_input.getvalue()),
        'wsgi.input': fake_input,
        'breezy.relpath': 'foo/bar',
    })
    iterable = wsgi_app(environ, self.start_response)
    response = self.read_response(iterable)
    self.assertEqual('200 OK', self.status)
    # The response body is bytes; compare against a bytes literal.
    self.assertEqual(b'error\x01incomplete request\n', response)
211
def test_protocol_version_detection_one(self):
    # SmartWSGIApp detects requests that don't start with
    # REQUEST_VERSION_TWO as version one.
    transport = memory.MemoryTransport()
    wsgi_app = wsgi.SmartWSGIApp(transport)
    fake_input = BytesIO(b'hello\n')
    environ = self.build_environ({
        'REQUEST_METHOD': 'POST',
        'CONTENT_LENGTH': len(fake_input.getvalue()),
        'wsgi.input': fake_input,
        'breezy.relpath': 'foo',
    })
    iterable = wsgi_app(environ, self.start_response)
    response = self.read_response(iterable)
    self.assertEqual('200 OK', self.status)
    # Expect a version 1-encoded response (the body is bytes).
    self.assertEqual(b'ok\x012\n', response)
229
def test_protocol_version_detection_two(self):
    # SmartWSGIApp detects requests that start with REQUEST_VERSION_TWO
    # as version two.
    transport = memory.MemoryTransport()
    wsgi_app = wsgi.SmartWSGIApp(transport)
    # BytesIO only accepts bytes, so the request body is built from bytes.
    fake_input = BytesIO(protocol.REQUEST_VERSION_TWO + b'hello\n')
    environ = self.build_environ({
        'REQUEST_METHOD': 'POST',
        'CONTENT_LENGTH': len(fake_input.getvalue()),
        'wsgi.input': fake_input,
        'breezy.relpath': 'foo',
    })
    iterable = wsgi_app(environ, self.start_response)
    response = self.read_response(iterable)
    self.assertEqual('200 OK', self.status)
    # Expect a version 2-encoded response.
    self.assertEqual(
        protocol.RESPONSE_VERSION_TWO + b'success\nok\x012\n', response)
249
class TestWSGIJail(tests.TestCaseWithMemoryTransport, WSGITestMixin):
251
def make_hpss_wsgi_request(self, wsgi_relpath, *args):
    """Build a WSGI environ carrying an encoded smart request.

    The call given by *args is encoded with ProtocolThreeRequester into an
    in-memory buffer, which becomes the 'wsgi.input' of a POST environ.

    :param wsgi_relpath: value for the 'breezy.relpath' environ key; also
        used to form the medium's 'fake:' base.
    :param args: the method name and arguments passed to
        ProtocolThreeRequester.call.
    :return: the environ dict for the request.
    """
    write_buf = BytesIO()
    request_medium = medium.SmartSimplePipesClientMedium(
        None, write_buf, 'fake:' + wsgi_relpath)
    request_encoder = protocol.ProtocolThreeRequester(
        request_medium.get_request())
    request_encoder.call(*args)
    # Rewind: the encoder left the position at the end of the buffer, and
    # reading 'wsgi.input' from there would yield no data.
    write_buf.seek(0)
    environ = self.build_environ({
        'REQUEST_METHOD': 'POST',
        'CONTENT_LENGTH': len(write_buf.getvalue()),
        'wsgi.input': write_buf,
        'breezy.relpath': wsgi_relpath,
    })
    return environ
267
def test_jail_root(self):
    """The WSGI HPSS glue allows access to the whole WSGI backing
    transport, regardless of which HTTP path the request was delivered
    to.
    """
    # make a branch in a shared repo
    self.make_repository('repo', shared=True)
    branch = self.make_bzrdir('repo/branch').create_branch()
    # serve the repo via bzr+http WSGI
    wsgi_app = wsgi.SmartWSGIApp(self.get_transport())
    # send a request to /repo/branch that will have to access /repo.
    environ = self.make_hpss_wsgi_request(
        '/repo/branch', 'BzrDir.open_branchV2', '.')
    iterable = wsgi_app(environ, self.start_response)
    response_bytes = self.read_response(iterable)
    self.assertEqual('200 OK', self.status)
    # expect a successful response, rather than a jail break error
    from breezy.tests.test_smart_transport import LoggingMessageHandler
    message_handler = LoggingMessageHandler()
    decoder = protocol.ProtocolThreeDecoder(
        message_handler, expect_version_marker=True)
    decoder.accept_bytes(response_bytes)
    # assertIn shows the whole event log if the expected event is absent.
    self.assertIn(
        ('structure', ('branch', branch._format.network_name())),
        message_handler.event_log)
294
class FakeRequest(object):
    """A minimal request object that echoes the bytes it is given."""

    def __init__(self, transport, write_func):
        """Remember the transport and the callable used to write output.

        :param transport: stored as self.transport; not otherwise used.
        :param write_func: callable invoked with the response bytes.
        """
        self.transport = transport
        self.write_func = write_func
        # Request data is bytes, so the initial value is empty bytes.
        self.accepted_bytes = b''

    def accept_bytes(self, bytes):
        """Store the request bytes and write back 'got bytes: ' + bytes.

        :param bytes: the request data, a byte string.
        """
        self.accepted_bytes = bytes
        # A bytes prefix avoids a str + bytes TypeError on Python 3 and is
        # identical to the previous behaviour on Python 2.
        self.write_func(b'got bytes: ' + bytes)

    def next_read_size(self):
        """Return how many more bytes this request wants to read."""
        # NOTE(review): the body of this method is not visible in the
        # file.  0 is assumed to mean "nothing more to read", in contrast
        # with IncompleteRequest, which always asks for more -- confirm.
        return 0
309
class FakeTransport(object):
    """A minimal transport stand-in that records clone() calls."""

    def __init__(self):
        # clone() appends to this list; the tests inspect it afterwards.
        self.calls = []
        self.base = 'fake:///'

    def abspath(self, relpath):
        """Return relpath appended to the 'fake:///' base URL."""
        return 'fake:///' + relpath

    def clone(self, relpath):
        """Record that a clone at relpath was requested."""
        self.calls.append(('clone', relpath))
        # NOTE(review): no return value is visible in the file; callers
        # presumably expect a transport back -- confirm what to return.
323
class IncompleteRequest(FakeRequest):
324
"""A request-like object that always expects to read more bytes."""
326
def next_read_size(self):
327
# this request always asks for more