13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""Tests for WSGI application"""
19
19
from cStringIO import StringIO
21
21
from bzrlib import tests
22
from bzrlib.smart import medium, message, protocol
23
22
from bzrlib.transport.http import wsgi
24
23
from bzrlib.transport import chroot, memory
27
class WSGITestMixin(object):
26
class TestWSGI(tests.TestCase):
29
tests.TestCase.setUp(self)
29
33
def build_environ(self, updates=None):
30
34
"""Builds an environ dict with all fields required by PEP 333.
32
36
:param updates: a dict that will be incorporated into the returned
33
37
dict using dict.update(updates).
85
81
self.read_response(iterable)
86
82
self.assertEqual('405 Method not allowed', self.status)
87
83
self.assertTrue(('Allow', 'POST') in self.headers)
89
def _fake_make_request(self, transport, write_func, bytes, rcp):
90
request = FakeRequest(transport, write_func)
91
request.accept_bytes(bytes)
92
self.request = request
95
85
def test_smart_wsgi_app_uses_given_relpath(self):
86
# XXX XXX XXX update comment
96
87
# The SmartWSGIApp should use the "bzrlib.relpath" field from the
97
88
# WSGI environ to clone from its backing transport to get a specific
98
89
# transport for this request.
99
90
transport = FakeTransport()
100
91
wsgi_app = wsgi.SmartWSGIApp(transport)
101
92
wsgi_app.backing_transport = transport
102
wsgi_app.make_request = self._fake_make_request
93
def make_request(transport, write_func):
94
request = FakeRequest(transport, write_func)
95
self.request = request
97
wsgi_app.make_request = make_request
103
98
fake_input = StringIO('fake request')
104
99
environ = self.build_environ({
105
100
'REQUEST_METHOD': 'POST',
182
181
backing_transport = app.app.backing_transport
183
182
chroot_backing_transport = backing_transport.server.backing_transport
184
183
self.assertEndsWith(chroot_backing_transport.base, 'a%20root/')
185
self.assertEqual(app.app.root_client_path, 'a prefix')
184
self.assertEqual(app.prefix, 'a prefix')
186
185
self.assertEqual(app.path_var, 'a path_var')
188
187
def test_incomplete_request(self):
189
188
transport = FakeTransport()
190
189
wsgi_app = wsgi.SmartWSGIApp(transport)
191
def make_request(transport, write_func, bytes, root_client_path):
190
def make_request(transport, write_func):
192
191
request = IncompleteRequest(transport, write_func)
193
request.accept_bytes(bytes)
194
192
self.request = request
196
194
wsgi_app.make_request = make_request
207
205
self.assertEqual('200 OK', self.status)
208
206
self.assertEqual('error\x01incomplete request\n', response)
210
def test_protocol_version_detection_one(self):
211
# SmartWSGIApp detects requests that don't start with
212
# REQUEST_VERSION_TWO as version one.
213
transport = memory.MemoryTransport()
214
wsgi_app = wsgi.SmartWSGIApp(transport)
215
fake_input = StringIO('hello\n')
216
environ = self.build_environ({
217
'REQUEST_METHOD': 'POST',
218
'CONTENT_LENGTH': len(fake_input.getvalue()),
219
'wsgi.input': fake_input,
220
'bzrlib.relpath': 'foo',
222
iterable = wsgi_app(environ, self.start_response)
223
response = self.read_response(iterable)
224
self.assertEqual('200 OK', self.status)
225
# Expect a version 1-encoded response.
226
self.assertEqual('ok\x012\n', response)
228
def test_protocol_version_detection_two(self):
229
# SmartWSGIApp detects requests that start with REQUEST_VERSION_TWO
231
transport = memory.MemoryTransport()
232
wsgi_app = wsgi.SmartWSGIApp(transport)
233
fake_input = StringIO(protocol.REQUEST_VERSION_TWO + 'hello\n')
234
environ = self.build_environ({
235
'REQUEST_METHOD': 'POST',
236
'CONTENT_LENGTH': len(fake_input.getvalue()),
237
'wsgi.input': fake_input,
238
'bzrlib.relpath': 'foo',
240
iterable = wsgi_app(environ, self.start_response)
241
response = self.read_response(iterable)
242
self.assertEqual('200 OK', self.status)
243
# Expect a version 2-encoded response.
245
protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n', response)
248
class TestWSGIJail(tests.TestCaseWithMemoryTransport, WSGITestMixin):
250
def make_hpss_wsgi_request(self, wsgi_relpath, *args):
251
write_buf = StringIO()
252
request_medium = medium.SmartSimplePipesClientMedium(
253
None, write_buf, 'fake:' + wsgi_relpath)
254
request_encoder = protocol.ProtocolThreeRequester(
255
request_medium.get_request())
256
request_encoder.call(*args)
258
environ = self.build_environ({
259
'REQUEST_METHOD': 'POST',
260
'CONTENT_LENGTH': len(write_buf.getvalue()),
261
'wsgi.input': write_buf,
262
'bzrlib.relpath': wsgi_relpath,
266
def test_jail_root(self):
267
"""The WSGI HPSS glue allows access to the whole WSGI backing
268
transport, regardless of which HTTP path the request was delivered
271
# make a branch in a shared repo
272
self.make_repository('repo', shared=True)
273
branch = self.make_bzrdir('repo/branch').create_branch()
274
# serve the repo via bzr+http WSGI
275
wsgi_app = wsgi.SmartWSGIApp(self.get_transport())
276
# send a request to /repo/branch that will have to access /repo.
277
environ = self.make_hpss_wsgi_request(
278
'/repo/branch', 'BzrDir.open_branchV2', '.')
279
iterable = wsgi_app(environ, self.start_response)
280
response_bytes = self.read_response(iterable)
281
self.assertEqual('200 OK', self.status)
282
# expect a successful response, rather than a jail break error
283
from bzrlib.tests.test_smart_transport import LoggingMessageHandler
284
message_handler = LoggingMessageHandler()
285
decoder = protocol.ProtocolThreeDecoder(
286
message_handler, expect_version_marker=True)
287
decoder.accept_bytes(response_bytes)
289
('structure', ('branch', branch._format.network_name()))
290
in message_handler.event_log)
293
209
class FakeRequest(object):
295
211
def __init__(self, transport, write_func):
296
212
self.transport = transport
297
213
self.write_func = write_func