14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
from io import BytesIO
17
from cStringIO import StringIO
20
from urllib.request import (
32
from ..bzr.smart import (
28
from brzlib.smart import (
35
from . import http_server
36
from ..transport import chroot
31
from brzlib.tests import http_server
32
from brzlib.transport import chroot
39
35
class HTTPServerWithSmarts(http_server.HttpServer):
83
79
# we have to stop early due to error, but we would also have to use the
84
80
# HTTP trailer facility which may not be widely available.
85
81
request_bytes = self.rfile.read(data_length)
86
protocol_factory, unused_bytes = (
87
medium._get_protocol_factory_for_bytes(request_bytes))
88
out_buffer = BytesIO()
82
protocol_factory, unused_bytes = medium._get_protocol_factory_for_bytes(
84
out_buffer = StringIO()
89
85
smart_protocol_request = protocol_factory(t, out_buffer.write, '/')
90
86
# Perhaps there should be a SmartServerHTTPMedium that takes care of
91
87
# feeding the bytes in the http request to the smart_protocol_request,
226
221
for (rsource, rtarget, rcode) in self.redirections:
227
target, match = re.subn(rsource, rtarget, path, count=1)
222
target, match = re.subn(rsource, rtarget, path)
230
break # The first match wins
225
break # The first match wins
233
228
return code, target
236
231
class TestCaseWithRedirectedWebserver(TestCaseWithTwoWebservers):
237
"""A support class providing redirections from one server to another.
239
We set up two webservers to allows various tests involving
241
The 'old' server is redirected to the 'new' server.
245
super(TestCaseWithRedirectedWebserver, self).setUp()
246
# The redirections will point to the new server
247
self.new_server = self.get_readonly_server()
248
# The requests to the old server will be redirected to the new server
249
self.old_server = self.get_secondary_server()
251
def create_transport_secondary_server(self):
252
"""Create the secondary server redirecting to the primary server"""
253
new = self.get_readonly_server()
254
redirecting = HTTPServerRedirecting(
255
protocol_version=self._protocol_version)
256
redirecting.redirect_to(new.host, new.port)
257
redirecting._url_protocol = self._url_protocol
260
def get_old_url(self, relpath=None):
232
"""A support class providing redirections from one server to another.
234
We set up two webservers to allows various tests involving
236
The 'old' server is redirected to the 'new' server.
240
super(TestCaseWithRedirectedWebserver, self).setUp()
241
# The redirections will point to the new server
242
self.new_server = self.get_readonly_server()
243
# The requests to the old server will be redirected to the new server
244
self.old_server = self.get_secondary_server()
246
def create_transport_secondary_server(self):
247
"""Create the secondary server redirecting to the primary server"""
248
new = self.get_readonly_server()
249
redirecting = HTTPServerRedirecting(
250
protocol_version=self._protocol_version)
251
redirecting.redirect_to(new.host, new.port)
252
redirecting._url_protocol = self._url_protocol
255
def get_old_url(self, relpath=None):
261
256
base = self.old_server.get_url()
262
257
return self._adjust_url(base, relpath)
264
def get_old_transport(self, relpath=None):
259
def get_old_transport(self, relpath=None):
265
260
t = transport.get_transport_from_url(self.get_old_url(relpath))
266
261
self.assertTrue(t.is_readonly())
269
def get_new_url(self, relpath=None):
264
def get_new_url(self, relpath=None):
270
265
base = self.new_server.get_url()
271
266
return self._adjust_url(base, relpath)
273
def get_new_transport(self, relpath=None):
268
def get_new_transport(self, relpath=None):
274
269
t = transport.get_transport_from_url(self.get_new_url(relpath))
275
270
self.assertTrue(t.is_readonly())
328
323
scheme, raw_auth = auth_header.split(' ', 1)
329
324
if scheme.lower() == tcs.auth_scheme:
330
user, password = base64.b64decode(raw_auth).split(b':')
331
return tcs.authorized(user.decode('ascii'),
332
password.decode('ascii'))
325
user, password = raw_auth.decode('base64').split(':')
326
return tcs.authorized(user, password)
466
460
# Recalculate the response_digest to compare with the one
467
461
# sent by the client
468
A1 = ('%s:%s:%s' % (user, realm, password)).encode('utf-8')
469
A2 = ('%s:%s' % (command, auth['uri'])).encode('utf-8')
472
return osutils.md5(x).hexdigest()
474
def KD(secret, data):
475
return H(("%s:%s" % (secret, data)).encode('utf-8'))
462
A1 = '%s:%s:%s' % (user, realm, password)
463
A2 = '%s:%s' % (command, auth['uri'])
465
H = lambda x: osutils.md5(x).hexdigest()
466
KD = lambda secret, data: H("%s:%s" % (secret, data))
477
468
nonce_count = int(auth['nc'], 16)