1
# Copyright (C) 2006-2009, 2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for WSGI application"""
20
from ..sixish import (
23
from ..bzr.smart import medium, protocol
24
from ..transport.http import wsgi
25
from ..transport import chroot, memory
28
class WSGITestMixin(object):
30
def build_environ(self, updates=None):
31
"""Builds an environ dict with all fields required by PEP 333.
33
:param updates: a dict to that will be incorporated into the returned
34
dict using dict.update(updates).
37
# Required CGI variables
38
'REQUEST_METHOD': 'GET',
39
'SCRIPT_NAME': '/script/name/',
40
'PATH_INFO': 'path/info',
41
'SERVER_NAME': 'test',
42
'SERVER_PORT': '9999',
43
'SERVER_PROTOCOL': 'HTTP/1.0',
45
# Required WSGI variables
46
'wsgi.version': (1, 0),
47
'wsgi.url_scheme': 'http',
48
'wsgi.input': BytesIO(b''),
49
'wsgi.errors': BytesIO(),
50
'wsgi.multithread': False,
51
'wsgi.multiprocess': False,
52
'wsgi.run_once': True,
54
if updates is not None:
55
environ.update(updates)
58
def read_response(self, iterable):
60
for string in iterable:
64
def start_response(self, status, headers):
66
self.headers = headers
69
class TestWSGI(tests.TestCaseInTempDir, WSGITestMixin):
72
super(TestWSGI, self).setUp()
76
def test_construct(self):
77
app = wsgi.SmartWSGIApp(FakeTransport())
78
self.assertIsInstance(
79
app.backing_transport, chroot.ChrootTransport)
81
def test_http_get_rejected(self):
82
# GET requests are rejected.
83
app = wsgi.SmartWSGIApp(FakeTransport())
84
environ = self.build_environ({'REQUEST_METHOD': 'GET'})
85
iterable = app(environ, self.start_response)
86
self.read_response(iterable)
87
self.assertEqual('405 Method not allowed', self.status)
88
self.assertTrue(('Allow', 'POST') in self.headers)
90
def _fake_make_request(self, transport, write_func, bytes, rcp):
91
request = FakeRequest(transport, write_func)
92
request.accept_bytes(bytes)
93
self.request = request
96
def test_smart_wsgi_app_uses_given_relpath(self):
97
# The SmartWSGIApp should use the "breezy.relpath" field from the
98
# WSGI environ to clone from its backing transport to get a specific
99
# transport for this request.
100
transport = FakeTransport()
101
wsgi_app = wsgi.SmartWSGIApp(transport)
102
wsgi_app.backing_transport = transport
103
wsgi_app.make_request = self._fake_make_request
104
fake_input = BytesIO(b'fake request')
105
environ = self.build_environ({
106
'REQUEST_METHOD': 'POST',
107
'CONTENT_LENGTH': len(fake_input.getvalue()),
108
'wsgi.input': fake_input,
109
'breezy.relpath': 'foo/bar',
111
iterable = wsgi_app(environ, self.start_response)
112
response = self.read_response(iterable)
113
self.assertEqual([('clone', 'foo/bar/')], transport.calls)
115
def test_smart_wsgi_app_request_and_response(self):
116
# SmartWSGIApp reads the smart request from the 'wsgi.input' file-like
117
# object in the environ dict, and returns the response via the iterable
118
# returned to the WSGI handler.
119
transport = memory.MemoryTransport()
120
transport.put_bytes('foo', b'some bytes')
121
wsgi_app = wsgi.SmartWSGIApp(transport)
122
wsgi_app.make_request = self._fake_make_request
123
fake_input = BytesIO(b'fake request')
124
environ = self.build_environ({
125
'REQUEST_METHOD': 'POST',
126
'CONTENT_LENGTH': len(fake_input.getvalue()),
127
'wsgi.input': fake_input,
128
'breezy.relpath': 'foo',
130
iterable = wsgi_app(environ, self.start_response)
131
response = self.read_response(iterable)
132
self.assertEqual('200 OK', self.status)
133
self.assertEqual(b'got bytes: fake request', response)
135
def test_relpath_setter(self):
136
# wsgi.RelpathSetter is WSGI "middleware" to set the 'breezy.relpath'
140
def fake_app(environ, start_response):
141
calls.append(environ['breezy.relpath'])
142
wrapped_app = wsgi.RelpathSetter(
143
fake_app, prefix='/abc/', path_var='FOO')
144
wrapped_app({'FOO': '/abc/xyz/.bzr/smart'}, None)
145
self.assertEqual(['xyz'], calls)
147
def test_relpath_setter_bad_path_prefix(self):
148
# wsgi.RelpathSetter will reject paths with that don't match the prefix
149
# with a 404. This is probably a sign of misconfiguration; a server
150
# shouldn't ever be invoking our WSGI application with bad paths.
151
def fake_app(environ, start_response):
152
self.fail('The app should never be called when the path is wrong')
153
wrapped_app = wsgi.RelpathSetter(
154
fake_app, prefix='/abc/', path_var='FOO')
155
iterable = wrapped_app(
156
{'FOO': 'AAA/abc/xyz/.bzr/smart'}, self.start_response)
157
self.read_response(iterable)
158
self.assertTrue(self.status.startswith('404'))
160
def test_relpath_setter_bad_path_suffix(self):
161
# Similar to test_relpath_setter_bad_path_prefix: wsgi.RelpathSetter
162
# will reject paths with that don't match the suffix '.bzr/smart' with a
163
# 404 as well. Again, this shouldn't be seen by our WSGI application if
164
# the server is configured correctly.
165
def fake_app(environ, start_response):
166
self.fail('The app should never be called when the path is wrong')
167
wrapped_app = wsgi.RelpathSetter(
168
fake_app, prefix='/abc/', path_var='FOO')
169
iterable = wrapped_app(
170
{'FOO': '/abc/xyz/.bzr/AAA'}, self.start_response)
171
self.read_response(iterable)
172
self.assertTrue(self.status.startswith('404'))
174
def test_make_app(self):
175
# The make_app helper constructs a SmartWSGIApp wrapped in a
180
path_var='a path_var')
181
self.assertIsInstance(app, wsgi.RelpathSetter)
182
self.assertIsInstance(app.app, wsgi.SmartWSGIApp)
183
self.assertStartsWith(app.app.backing_transport.base, 'chroot-')
184
backing_transport = app.app.backing_transport
185
chroot_backing_transport = backing_transport.server.backing_transport
186
self.assertEndsWith(chroot_backing_transport.base, 'a%20root/')
187
self.assertEqual(app.app.root_client_path, 'a prefix')
188
self.assertEqual(app.path_var, 'a path_var')
190
def test_incomplete_request(self):
191
transport = FakeTransport()
192
wsgi_app = wsgi.SmartWSGIApp(transport)
194
def make_request(transport, write_func, bytes, root_client_path):
195
request = IncompleteRequest(transport, write_func)
196
request.accept_bytes(bytes)
197
self.request = request
199
wsgi_app.make_request = make_request
201
fake_input = BytesIO(b'incomplete request')
202
environ = self.build_environ({
203
'REQUEST_METHOD': 'POST',
204
'CONTENT_LENGTH': len(fake_input.getvalue()),
205
'wsgi.input': fake_input,
206
'breezy.relpath': 'foo/bar',
208
iterable = wsgi_app(environ, self.start_response)
209
response = self.read_response(iterable)
210
self.assertEqual('200 OK', self.status)
211
self.assertEqual(b'error\x01incomplete request\n', response)
213
def test_protocol_version_detection_one(self):
214
# SmartWSGIApp detects requests that don't start with
215
# REQUEST_VERSION_TWO as version one.
216
transport = memory.MemoryTransport()
217
wsgi_app = wsgi.SmartWSGIApp(transport)
218
fake_input = BytesIO(b'hello\n')
219
environ = self.build_environ({
220
'REQUEST_METHOD': 'POST',
221
'CONTENT_LENGTH': len(fake_input.getvalue()),
222
'wsgi.input': fake_input,
223
'breezy.relpath': 'foo',
225
iterable = wsgi_app(environ, self.start_response)
226
response = self.read_response(iterable)
227
self.assertEqual('200 OK', self.status)
228
# Expect a version 1-encoded response.
229
self.assertEqual(b'ok\x012\n', response)
231
def test_protocol_version_detection_two(self):
232
# SmartWSGIApp detects requests that start with REQUEST_VERSION_TWO
234
transport = memory.MemoryTransport()
235
wsgi_app = wsgi.SmartWSGIApp(transport)
236
fake_input = BytesIO(protocol.REQUEST_VERSION_TWO + b'hello\n')
237
environ = self.build_environ({
238
'REQUEST_METHOD': 'POST',
239
'CONTENT_LENGTH': len(fake_input.getvalue()),
240
'wsgi.input': fake_input,
241
'breezy.relpath': 'foo',
243
iterable = wsgi_app(environ, self.start_response)
244
response = self.read_response(iterable)
245
self.assertEqual('200 OK', self.status)
246
# Expect a version 2-encoded response.
248
protocol.RESPONSE_VERSION_TWO + b'success\nok\x012\n', response)
251
class TestWSGIJail(tests.TestCaseWithMemoryTransport, WSGITestMixin):
253
def make_hpss_wsgi_request(self, wsgi_relpath, *args):
254
write_buf = BytesIO()
255
request_medium = medium.SmartSimplePipesClientMedium(
256
None, write_buf, 'fake:' + wsgi_relpath)
257
request_encoder = protocol.ProtocolThreeRequester(
258
request_medium.get_request())
259
request_encoder.call(*args)
261
environ = self.build_environ({
262
'REQUEST_METHOD': 'POST',
263
'CONTENT_LENGTH': len(write_buf.getvalue()),
264
'wsgi.input': write_buf,
265
'breezy.relpath': wsgi_relpath,
269
def test_jail_root(self):
270
"""The WSGI HPSS glue allows access to the whole WSGI backing
271
transport, regardless of which HTTP path the request was delivered
274
# make a branch in a shared repo
275
self.make_repository('repo', shared=True)
276
branch = self.make_controldir('repo/branch').create_branch()
277
# serve the repo via bzr+http WSGI
278
wsgi_app = wsgi.SmartWSGIApp(self.get_transport())
279
# send a request to /repo/branch that will have to access /repo.
280
environ = self.make_hpss_wsgi_request(
281
'/repo/branch', b'BzrDir.open_branchV2', b'.')
282
iterable = wsgi_app(environ, self.start_response)
283
response_bytes = self.read_response(iterable)
284
self.assertEqual('200 OK', self.status)
285
# expect a successful response, rather than a jail break error
286
from breezy.tests.test_smart_transport import LoggingMessageHandler
287
message_handler = LoggingMessageHandler()
288
decoder = protocol.ProtocolThreeDecoder(
289
message_handler, expect_version_marker=True)
290
decoder.accept_bytes(response_bytes)
292
('structure', (b'branch', branch._format.network_name()))
293
in message_handler.event_log)
296
class FakeRequest(object):
298
def __init__(self, transport, write_func):
299
self.transport = transport
300
self.write_func = write_func
301
self.accepted_bytes = b''
303
def accept_bytes(self, bytes):
304
self.accepted_bytes = bytes
305
self.write_func(b'got bytes: ' + bytes)
307
def next_read_size(self):
311
class FakeTransport(object):
315
self.base = 'fake:///'
317
def abspath(self, relpath):
318
return 'fake:///' + relpath
320
def clone(self, relpath):
321
self.calls.append(('clone', relpath))
325
class IncompleteRequest(FakeRequest):
326
"""A request-like object that always expects to read more bytes."""
328
def next_read_size(self):
329
# this request always asks for more