14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
from cStringIO import StringIO
20
from urllib.request import (
24
except ImportError: # python < 3
37
from ..sixish import (
40
from ..bzr.smart import (
43
from . import http_server
44
from ..transport import chroot
32
from bzrlib.smart import medium, protocol
33
from bzrlib.tests import http_server
34
from bzrlib.transport import (
47
40
class HTTPServerWithSmarts(http_server.HttpServer):
58
51
class SmartRequestHandler(http_server.TestingHTTPRequestHandler):
59
52
"""Extend TestingHTTPRequestHandler to support smart client POSTs.
61
XXX: This duplicates a fair bit of the logic in breezy.transport.http.wsgi.
54
XXX: This duplicates a fair bit of the logic in bzrlib.transport.http.wsgi.
65
58
"""Hand the request off to a smart server instance."""
66
backing = transport.get_transport_from_path(
67
self.server.test_case_server._home_dir)
59
backing = get_transport(self.server.test_case_server._home_dir)
68
60
chroot_server = chroot.ChrootServer(backing)
69
61
chroot_server.start_server()
71
t = transport.get_transport_from_url(chroot_server.get_url())
63
t = get_transport(chroot_server.get_url())
72
64
self.do_POST_inner(t)
74
66
chroot_server.stop_server()
91
83
# we have to stop early due to error, but we would also have to use the
92
84
# HTTP trailer facility which may not be widely available.
93
85
request_bytes = self.rfile.read(data_length)
94
protocol_factory, unused_bytes = (
95
medium._get_protocol_factory_for_bytes(request_bytes))
96
out_buffer = BytesIO()
86
protocol_factory, unused_bytes = medium._get_protocol_factory_for_bytes(
88
out_buffer = StringIO()
97
89
smart_protocol_request = protocol_factory(t, out_buffer.write, '/')
98
90
# Perhaps there should be a SmartServerHTTPMedium that takes care of
99
91
# feeding the bytes in the http request to the smart_protocol_request,
115
107
backed by regular disk files.
118
# These attributes can be overridden or parametrized by daughter classes if
119
# needed, but must exist so that the create_transport_readonly_server()
120
# method (or any method creating an http(s) server) can propagate it.
110
# This can be overridden or parametrized by daughter classes if needed, but
111
# it must exist so that the create_transport_readonly_server() method can
121
113
_protocol_version = None
122
_url_protocol = 'http'
125
116
super(TestCaseWithWebserver, self).setUp()
126
117
self.transport_readonly_server = http_server.HttpServer
128
119
def create_transport_readonly_server(self):
129
server = self.transport_readonly_server(
120
return self.transport_readonly_server(
130
121
protocol_version=self._protocol_version)
131
server._url_protocol = self._url_protocol
135
124
class TestCaseWithTwoWebservers(TestCaseWithWebserver):
161
147
self.start_server(self.__secondary_server)
162
148
return self.__secondary_server
164
# Build the URL for `relpath` on the secondary server: take the base URL
# reported by self.get_secondary_server().get_url() and hand it, together
# with relpath (None by default), to self._adjust_url(), whose result is
# returned unchanged.
def get_secondary_url(self, relpath=None):
165
base = self.get_secondary_server().get_url()
166
return self._adjust_url(base, relpath)
168
def get_secondary_transport(self, relpath=None):
169
t = transport.get_transport_from_url(self.get_secondary_url(relpath))
170
self.assertTrue(t.is_readonly())
174
151
class ProxyServer(http_server.HttpServer):
175
152
"""A proxy test server for http transports."""
218
195
def redirect_to(self, host, port):
219
196
"""Redirect all requests to a specific host:port"""
220
197
self.redirections = [('(.*)',
221
r'http://%s:%s\1' % (host, port),
198
r'http://%s:%s\1' % (host, port) ,
224
201
def is_redirected(self, path):
234
211
for (rsource, rtarget, rcode) in self.redirections:
235
target, match = re.subn(rsource, rtarget, path, count=1)
212
target, match = re.subn(rsource, rtarget, path)
238
break # The first match wins
215
break # The first match wins
241
218
return code, target
244
221
class TestCaseWithRedirectedWebserver(TestCaseWithTwoWebservers):
245
"""A support class providing redirections from one server to another.
247
We set up two webservers to allow various tests involving
249
The 'old' server is redirected to the 'new' server.
253
super(TestCaseWithRedirectedWebserver, self).setUp()
254
# The redirections will point to the new server
255
self.new_server = self.get_readonly_server()
256
# The requests to the old server will be redirected to the new server
257
self.old_server = self.get_secondary_server()
259
def create_transport_secondary_server(self):
260
"""Create the secondary server redirecting to the primary server"""
261
new = self.get_readonly_server()
262
redirecting = HTTPServerRedirecting(
263
protocol_version=self._protocol_version)
264
redirecting.redirect_to(new.host, new.port)
265
redirecting._url_protocol = self._url_protocol
268
# Build the URL for `relpath` on the 'old' server: take the base URL
# reported by self.old_server.get_url() and hand it, together with relpath
# (None by default), to self._adjust_url(), whose result is returned
# unchanged.
def get_old_url(self, relpath=None):
269
base = self.old_server.get_url()
270
return self._adjust_url(base, relpath)
272
def get_old_transport(self, relpath=None):
273
t = transport.get_transport_from_url(self.get_old_url(relpath))
274
self.assertTrue(t.is_readonly())
277
# Build the URL for `relpath` on the 'new' server: take the base URL
# reported by self.new_server.get_url() and hand it, together with relpath
# (None by default), to self._adjust_url(), whose result is returned
# unchanged.
def get_new_url(self, relpath=None):
278
base = self.new_server.get_url()
279
return self._adjust_url(base, relpath)
281
def get_new_transport(self, relpath=None):
282
t = transport.get_transport_from_url(self.get_new_url(relpath))
283
self.assertTrue(t.is_readonly())
222
"""A support class providing redirections from one server to another.
224
We set up two webservers to allow various tests involving
226
The 'old' server is redirected to the 'new' server.
229
def create_transport_secondary_server(self):
230
"""Create the secondary server redirecting to the primary server"""
231
new = self.get_readonly_server()
232
redirecting = HTTPServerRedirecting(
233
protocol_version=self._protocol_version)
234
redirecting.redirect_to(new.host, new.port)
238
super(TestCaseWithRedirectedWebserver, self).setUp()
239
# The redirections will point to the new server
240
self.new_server = self.get_readonly_server()
241
# The requests to the old server will be redirected
242
self.old_server = self.get_secondary_server()
287
245
class AuthRequestHandler(http_server.TestingHTTPRequestHandler):
297
255
# - auth_header_recv: the header received containing auth
298
256
# - auth_error_code: the error code to indicate auth required
300
def _require_authentication(self):
301
# Note that we must update test_case_server *before*
302
# sending the error or the client may try to read it
303
# before we have sent the whole error back.
304
tcs = self.server.test_case_server
305
tcs.auth_required_errors += 1
306
self.send_response(tcs.auth_error_code)
307
self.send_header_auth_reqed()
308
# We do not send a body
309
self.send_header('Content-Length', '0')
313
258
def do_GET(self):
314
259
if self.authorized():
315
260
return http_server.TestingHTTPRequestHandler.do_GET(self)
317
return self._require_authentication()
320
if self.authorized():
321
return http_server.TestingHTTPRequestHandler.do_HEAD(self)
323
return self._require_authentication()
262
# Note that we must update test_case_server *before*
263
# sending the error or the client may try to read it
264
# before we have sent the whole error back.
265
tcs = self.server.test_case_server
266
tcs.auth_required_errors += 1
267
self.send_response(tcs.auth_error_code)
268
self.send_header_auth_reqed()
269
# We do not send a body
270
self.send_header('Content-Length', '0')
326
275
class BasicAuthRequestHandler(AuthRequestHandler):
336
285
scheme, raw_auth = auth_header.split(' ', 1)
337
286
if scheme.lower() == tcs.auth_scheme:
338
user, password = base64.b64decode(raw_auth).split(b':')
339
return tcs.authorized(user.decode('ascii'),
340
password.decode('ascii'))
287
user, password = raw_auth.decode('base64').split(':')
288
return tcs.authorized(user, password)
474
422
# Recalculate the response_digest to compare with the one
475
423
# sent by the client
476
A1 = ('%s:%s:%s' % (user, realm, password)).encode('utf-8')
477
A2 = ('%s:%s' % (command, auth['uri'])).encode('utf-8')
480
return osutils.md5(x).hexdigest()
482
def KD(secret, data):
483
return H(("%s:%s" % (secret, data)).encode('utf-8'))
424
A1 = '%s:%s:%s' % (user, realm, password)
425
A2 = '%s:%s' % (command, auth['uri'])
427
H = lambda x: osutils.md5(x).hexdigest()
428
KD = lambda secret, data: H("%s:%s" % (secret, data))
485
430
nonce_count = int(auth['nc'], 16)