14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
from cStringIO import StringIO
18
from io import BytesIO
20
from urllib.request import (
32
from bzrlib.smart import medium, protocol
33
from bzrlib.tests import http_server
34
from bzrlib.transport import (
32
from ..bzr.smart import (
35
from . import http_server
36
from ..transport import chroot
40
39
class HTTPServerWithSmarts(http_server.HttpServer):
51
50
class SmartRequestHandler(http_server.TestingHTTPRequestHandler):
52
51
"""Extend TestingHTTPRequestHandler to support smart client POSTs.
54
XXX: This duplicates a fair bit of the logic in bzrlib.transport.http.wsgi.
53
XXX: This duplicates a fair bit of the logic in breezy.transport.http.wsgi.
58
57
"""Hand the request off to a smart server instance."""
59
backing = get_transport(self.server.test_case_server._home_dir)
58
backing = transport.get_transport_from_path(
59
self.server.test_case_server._home_dir)
60
60
chroot_server = chroot.ChrootServer(backing)
61
61
chroot_server.start_server()
63
t = get_transport(chroot_server.get_url())
63
t = transport.get_transport_from_url(chroot_server.get_url())
64
64
self.do_POST_inner(t)
66
66
chroot_server.stop_server()
83
83
# we have to stop early due to error, but we would also have to use the
84
84
# HTTP trailer facility which may not be widely available.
85
85
request_bytes = self.rfile.read(data_length)
86
protocol_factory, unused_bytes = medium._get_protocol_factory_for_bytes(
88
out_buffer = StringIO()
86
protocol_factory, unused_bytes = (
87
medium._get_protocol_factory_for_bytes(request_bytes))
88
out_buffer = BytesIO()
89
89
smart_protocol_request = protocol_factory(t, out_buffer.write, '/')
90
90
# Perhaps there should be a SmartServerHTTPMedium that takes care of
91
91
# feeding the bytes in the http request to the smart_protocol_request,
106
106
one. This will currently fail if the primary transport is not
107
107
backed by regular disk files.
110
# These attributes can be overridden or parametrized by daughter classes if
111
# needed, but must exist so that the create_transport_readonly_server()
112
# method (or any method creating an http(s) server) can propagate it.
113
_protocol_version = None
114
_url_protocol = 'http'
110
117
super(TestCaseWithWebserver, self).setUp()
111
118
self.transport_readonly_server = http_server.HttpServer
120
def create_transport_readonly_server(self):
121
server = self.transport_readonly_server(
122
protocol_version=self._protocol_version)
123
server._url_protocol = self._url_protocol
114
127
class TestCaseWithTwoWebservers(TestCaseWithWebserver):
115
128
"""A support class providing readonly urls on two servers that are http://.
136
153
self.start_server(self.__secondary_server)
137
154
return self.__secondary_server
156
def get_secondary_url(self, relpath=None):
157
base = self.get_secondary_server().get_url()
158
return self._adjust_url(base, relpath)
160
def get_secondary_transport(self, relpath=None):
161
t = transport.get_transport_from_url(self.get_secondary_url(relpath))
162
self.assertTrue(t.is_readonly())
140
166
class ProxyServer(http_server.HttpServer):
141
167
"""A proxy test server for http transports."""
184
210
def redirect_to(self, host, port):
185
211
"""Redirect all requests to a specific host:port"""
186
212
self.redirections = [('(.*)',
187
r'http://%s:%s\1' % (host, port) ,
213
r'http://%s:%s\1' % (host, port),
190
216
def is_redirected(self, path):
200
226
for (rsource, rtarget, rcode) in self.redirections:
201
target, match = re.subn(rsource, rtarget, path)
227
target, match = re.subn(rsource, rtarget, path, count=1)
204
break # The first match wins
230
break # The first match wins
207
233
return code, target
210
236
class TestCaseWithRedirectedWebserver(TestCaseWithTwoWebservers):
211
"""A support class providing redirections from one server to another.
213
We set up two webservers to allow various tests involving
215
The 'old' server is redirected to the 'new' server.
218
def create_transport_secondary_server(self):
219
"""Create the secondary server redirecting to the primary server"""
220
new = self.get_readonly_server()
221
redirecting = HTTPServerRedirecting()
222
redirecting.redirect_to(new.host, new.port)
226
super(TestCaseWithRedirectedWebserver, self).setUp()
227
# The redirections will point to the new server
228
self.new_server = self.get_readonly_server()
229
# The requests to the old server will be redirected
230
self.old_server = self.get_secondary_server()
237
"""A support class providing redirections from one server to another.
239
We set up two webservers to allow various tests involving
241
The 'old' server is redirected to the 'new' server.
245
super(TestCaseWithRedirectedWebserver, self).setUp()
246
# The redirections will point to the new server
247
self.new_server = self.get_readonly_server()
248
# The requests to the old server will be redirected to the new server
249
self.old_server = self.get_secondary_server()
251
def create_transport_secondary_server(self):
252
"""Create the secondary server redirecting to the primary server"""
253
new = self.get_readonly_server()
254
redirecting = HTTPServerRedirecting(
255
protocol_version=self._protocol_version)
256
redirecting.redirect_to(new.host, new.port)
257
redirecting._url_protocol = self._url_protocol
260
def get_old_url(self, relpath=None):
261
base = self.old_server.get_url()
262
return self._adjust_url(base, relpath)
264
def get_old_transport(self, relpath=None):
265
t = transport.get_transport_from_url(self.get_old_url(relpath))
266
self.assertTrue(t.is_readonly())
269
def get_new_url(self, relpath=None):
270
base = self.new_server.get_url()
271
return self._adjust_url(base, relpath)
273
def get_new_transport(self, relpath=None):
274
t = transport.get_transport_from_url(self.get_new_url(relpath))
275
self.assertTrue(t.is_readonly())
233
279
class AuthRequestHandler(http_server.TestingHTTPRequestHandler):
243
289
# - auth_header_recv: the header received containing auth
244
290
# - auth_error_code: the error code to indicate auth required
292
def _require_authentication(self):
293
# Note that we must update test_case_server *before*
294
# sending the error or the client may try to read it
295
# before we have sent the whole error back.
296
tcs = self.server.test_case_server
297
tcs.auth_required_errors += 1
298
self.send_response(tcs.auth_error_code)
299
self.send_header_auth_reqed()
300
# We do not send a body
301
self.send_header('Content-Length', '0')
246
305
def do_GET(self):
247
306
if self.authorized():
248
307
return http_server.TestingHTTPRequestHandler.do_GET(self)
250
# Note that we must update test_case_server *before*
251
# sending the error or the client may try to read it
252
# before we have sent the whole error back.
253
tcs = self.server.test_case_server
254
tcs.auth_required_errors += 1
255
self.send_response(tcs.auth_error_code)
256
self.send_header_auth_reqed()
257
# We do not send a body
258
self.send_header('Content-Length', '0')
309
return self._require_authentication()
312
if self.authorized():
313
return http_server.TestingHTTPRequestHandler.do_HEAD(self)
315
return self._require_authentication()
263
318
class BasicAuthRequestHandler(AuthRequestHandler):
273
328
scheme, raw_auth = auth_header.split(' ', 1)
274
329
if scheme.lower() == tcs.auth_scheme:
275
user, password = raw_auth.decode('base64').split(':')
276
return tcs.authorized(user, password)
330
user, password = base64.b64decode(raw_auth).split(b':')
331
return tcs.authorized(user.decode('ascii'),
332
password.decode('ascii'))
410
466
# Recalculate the response_digest to compare with the one
411
467
# sent by the client
412
A1 = '%s:%s:%s' % (user, realm, password)
413
A2 = '%s:%s' % (command, auth['uri'])
415
H = lambda x: osutils.md5(x).hexdigest()
416
KD = lambda secret, data: H("%s:%s" % (secret, data))
468
A1 = ('%s:%s:%s' % (user, realm, password)).encode('utf-8')
469
A2 = ('%s:%s' % (command, auth['uri'])).encode('utf-8')
472
return osutils.md5(x).hexdigest()
474
def KD(secret, data):
475
return H(("%s:%s" % (secret, data)).encode('utf-8'))
418
477
nonce_count = int(auth['nc'], 16)