1
# Copyright (C) 2005-2010 Canonical Ltd
1
# Copyright (C) 2005-2011 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
from cStringIO import StringIO
19
from urllib.request import (
23
except ImportError: # python < 3
32
from bzrlib.smart import medium, protocol
33
from bzrlib.tests import http_server
34
from bzrlib.transport import (
36
from ..sixish import (
39
from ..bzr.smart import (
42
from . import http_server
43
from ..transport import chroot
40
46
class HTTPServerWithSmarts(http_server.HttpServer):
51
57
class SmartRequestHandler(http_server.TestingHTTPRequestHandler):
52
58
"""Extend TestingHTTPRequestHandler to support smart client POSTs.
54
XXX: This duplicates a fair bit of the logic in bzrlib.transport.http.wsgi.
60
XXX: This duplicates a fair bit of the logic in breezy.transport.http.wsgi.
58
64
"""Hand the request off to a smart server instance."""
59
backing = get_transport(self.server.test_case_server._home_dir)
65
backing = transport.get_transport_from_path(
66
self.server.test_case_server._home_dir)
60
67
chroot_server = chroot.ChrootServer(backing)
61
68
chroot_server.start_server()
63
t = get_transport(chroot_server.get_url())
70
t = transport.get_transport_from_url(chroot_server.get_url())
64
71
self.do_POST_inner(t)
66
73
chroot_server.stop_server()
85
92
request_bytes = self.rfile.read(data_length)
86
93
protocol_factory, unused_bytes = medium._get_protocol_factory_for_bytes(
88
out_buffer = StringIO()
95
out_buffer = BytesIO()
89
96
smart_protocol_request = protocol_factory(t, out_buffer.write, '/')
90
97
# Perhaps there should be a SmartServerHTTPMedium that takes care of
91
98
# feeding the bytes in the http request to the smart_protocol_request,
106
113
one. This will currently fail if the primary transport is not
107
114
backed by regular disk files.
117
# These attributes can be overridden or parametrized by daughter classes if
118
# needed, but must exist so that the create_transport_readonly_server()
119
# method (or any method creating an http(s) server) can propagate it.
120
_protocol_version = None
121
_url_protocol = 'http'
110
124
super(TestCaseWithWebserver, self).setUp()
111
125
self.transport_readonly_server = http_server.HttpServer
127
def create_transport_readonly_server(self):
128
server = self.transport_readonly_server(
129
protocol_version=self._protocol_version)
130
server._url_protocol = self._url_protocol
114
134
class TestCaseWithTwoWebservers(TestCaseWithWebserver):
115
135
"""A support class providing readonly urls on two servers that are http://.
128
148
This is mostly a hook for daughter classes.
130
return self.transport_secondary_server()
150
server = self.transport_secondary_server(
151
protocol_version=self._protocol_version)
152
server._url_protocol = self._url_protocol
132
155
def get_secondary_server(self):
133
156
"""Get the server instance for the secondary transport."""
136
159
self.start_server(self.__secondary_server)
137
160
return self.__secondary_server
162
# Return the URL of the secondary server, adjusted for ``relpath``.
#
# Fetches the base URL from ``self.get_secondary_server().get_url()`` and
# returns whatever ``self._adjust_url(base, relpath)`` produces.
# ``relpath`` defaults to None and is forwarded unchanged.
# NOTE(review): the bare integer lines below look like leftover diff
# line numbers rather than program logic -- confirm against the real file.
def get_secondary_url(self, relpath=None):
163
base = self.get_secondary_server().get_url()
164
return self._adjust_url(base, relpath)
166
def get_secondary_transport(self, relpath=None):
167
t = transport.get_transport_from_url(self.get_secondary_url(relpath))
168
self.assertTrue(t.is_readonly())
140
172
class ProxyServer(http_server.HttpServer):
141
173
"""A proxy test server for http transports."""
184
216
def redirect_to(self, host, port):
185
217
"""Redirect all requests to a specific host:port"""
186
218
self.redirections = [('(.*)',
187
r'http://%s:%s\1' % (host, port) ,
219
r'http://%s:%s\1' % (host, port),
190
222
def is_redirected(self, path):
215
247
The 'old' server is redirected to the 'new' server.
251
super(TestCaseWithRedirectedWebserver, self).setUp()
252
# The redirections will point to the new server
253
self.new_server = self.get_readonly_server()
254
# The requests to the old server will be redirected to the new server
255
self.old_server = self.get_secondary_server()
218
257
def create_transport_secondary_server(self):
219
258
"""Create the secondary server redirecting to the primary server"""
220
259
new = self.get_readonly_server()
221
redirecting = HTTPServerRedirecting()
260
redirecting = HTTPServerRedirecting(
261
protocol_version=self._protocol_version)
222
262
redirecting.redirect_to(new.host, new.port)
263
redirecting._url_protocol = self._url_protocol
223
264
return redirecting
226
super(TestCaseWithRedirectedWebserver, self).setUp()
227
# The redirections will point to the new server
228
self.new_server = self.get_readonly_server()
229
# The requests to the old server will be redirected
230
self.old_server = self.get_secondary_server()
266
# Return the URL of the 'old' server, adjusted for ``relpath``.
#
# Reads the base URL from ``self.old_server.get_url()`` and returns the
# result of ``self._adjust_url(base, relpath)``. ``relpath`` defaults to
# None and is forwarded unchanged.
# NOTE(review): the bare integer lines below look like leftover diff
# line numbers rather than program logic -- confirm against the real file.
def get_old_url(self, relpath=None):
267
base = self.old_server.get_url()
268
return self._adjust_url(base, relpath)
270
def get_old_transport(self, relpath=None):
271
t = transport.get_transport_from_url(self.get_old_url(relpath))
272
self.assertTrue(t.is_readonly())
275
# Return the URL of the 'new' server, adjusted for ``relpath``.
#
# Reads the base URL from ``self.new_server.get_url()`` and returns the
# result of ``self._adjust_url(base, relpath)``. ``relpath`` defaults to
# None and is forwarded unchanged.
# NOTE(review): the bare integer lines below look like leftover diff
# line numbers rather than program logic -- confirm against the real file.
def get_new_url(self, relpath=None):
276
base = self.new_server.get_url()
277
return self._adjust_url(base, relpath)
279
def get_new_transport(self, relpath=None):
280
t = transport.get_transport_from_url(self.get_new_url(relpath))
281
self.assertTrue(t.is_readonly())
233
285
class AuthRequestHandler(http_server.TestingHTTPRequestHandler):
243
295
# - auth_header_recv: the header received containing auth
244
296
# - auth_error_code: the error code to indicate auth required
298
def _require_authentication(self):
299
# Note that we must update test_case_server *before*
300
# sending the error or the client may try to read it
301
# before we have sent the whole error back.
302
tcs = self.server.test_case_server
303
tcs.auth_required_errors += 1
304
self.send_response(tcs.auth_error_code)
305
self.send_header_auth_reqed()
306
# We do not send a body
307
self.send_header('Content-Length', '0')
246
311
def do_GET(self):
247
312
if self.authorized():
248
313
return http_server.TestingHTTPRequestHandler.do_GET(self)
250
# Note that we must update test_case_server *before*
251
# sending the error or the client may try to read it
252
# before we have sent the whole error back.
253
tcs = self.server.test_case_server
254
tcs.auth_required_errors += 1
255
self.send_response(tcs.auth_error_code)
256
self.send_header_auth_reqed()
257
# We do not send a body
258
self.send_header('Content-Length', '0')
315
return self._require_authentication()
318
if self.authorized():
319
return http_server.TestingHTTPRequestHandler.do_HEAD(self)
321
return self._require_authentication()
263
324
class BasicAuthRequestHandler(AuthRequestHandler):
303
364
scheme, auth = auth_header.split(None, 1)
304
365
if scheme.lower() == tcs.auth_scheme:
305
auth_dict = urllib2.parse_keqv_list(urllib2.parse_http_list(auth))
366
auth_dict = parse_keqv_list(parse_http_list(auth))
307
368
return tcs.digest_authorized(auth_dict, self.command)
313
374
header = 'Digest realm="%s", ' % tcs.auth_realm
314
375
header += 'nonce="%s", algorithm="%s", qop="auth"' % (tcs.auth_nonce,
316
self.send_header(tcs.auth_header_sent,header)
377
self.send_header(tcs.auth_header_sent, header)
319
380
class DigestAndBasicAuthRequestHandler(DigestAuthRequestHandler):
331
392
header = 'Digest realm="%s", ' % tcs.auth_realm
332
393
header += 'nonce="%s", algorithm="%s", qop="auth"' % (tcs.auth_nonce,
334
self.send_header(tcs.auth_header_sent,header)
395
self.send_header(tcs.auth_header_sent, header)
337
398
class AuthServer(http_server.HttpServer):