83
81
# we have to stop early due to error, but we would also have to use the
84
82
# HTTP trailer facility which may not be widely available.
85
83
request_bytes = self.rfile.read(data_length)
86
protocol_factory, unused_bytes = (
87
medium._get_protocol_factory_for_bytes(request_bytes))
84
protocol_factory, unused_bytes = medium._get_protocol_factory_for_bytes(
88
86
out_buffer = BytesIO()
89
87
smart_protocol_request = protocol_factory(t, out_buffer.write, '/')
90
88
# Perhaps there should be a SmartServerHTTPMedium that takes care of
226
223
for (rsource, rtarget, rcode) in self.redirections:
227
target, match = re.subn(rsource, rtarget, path, count=1)
224
target, match = re.subn(rsource, rtarget, path)
230
break # The first match wins
227
break # The first match wins
233
230
return code, target
236
233
class TestCaseWithRedirectedWebserver(TestCaseWithTwoWebservers):
237
"""A support class providing redirections from one server to another.
239
We set up two webservers to allows various tests involving
241
The 'old' server is redirected to the 'new' server.
245
super(TestCaseWithRedirectedWebserver, self).setUp()
246
# The redirections will point to the new server
247
self.new_server = self.get_readonly_server()
248
# The requests to the old server will be redirected to the new server
249
self.old_server = self.get_secondary_server()
251
def create_transport_secondary_server(self):
252
"""Create the secondary server redirecting to the primary server"""
253
new = self.get_readonly_server()
254
redirecting = HTTPServerRedirecting(
255
protocol_version=self._protocol_version)
256
redirecting.redirect_to(new.host, new.port)
257
redirecting._url_protocol = self._url_protocol
260
def get_old_url(self, relpath=None):
234
"""A support class providing redirections from one server to another.
236
We set up two webservers to allows various tests involving
238
The 'old' server is redirected to the 'new' server.
242
super(TestCaseWithRedirectedWebserver, self).setUp()
243
# The redirections will point to the new server
244
self.new_server = self.get_readonly_server()
245
# The requests to the old server will be redirected to the new server
246
self.old_server = self.get_secondary_server()
248
def create_transport_secondary_server(self):
249
"""Create the secondary server redirecting to the primary server"""
250
new = self.get_readonly_server()
251
redirecting = HTTPServerRedirecting(
252
protocol_version=self._protocol_version)
253
redirecting.redirect_to(new.host, new.port)
254
redirecting._url_protocol = self._url_protocol
257
def get_old_url(self, relpath=None):
261
258
base = self.old_server.get_url()
262
259
return self._adjust_url(base, relpath)
264
def get_old_transport(self, relpath=None):
261
def get_old_transport(self, relpath=None):
265
262
t = transport.get_transport_from_url(self.get_old_url(relpath))
266
263
self.assertTrue(t.is_readonly())
269
def get_new_url(self, relpath=None):
266
def get_new_url(self, relpath=None):
270
267
base = self.new_server.get_url()
271
268
return self._adjust_url(base, relpath)
273
def get_new_transport(self, relpath=None):
270
def get_new_transport(self, relpath=None):
274
271
t = transport.get_transport_from_url(self.get_new_url(relpath))
275
272
self.assertTrue(t.is_readonly())
328
325
scheme, raw_auth = auth_header.split(' ', 1)
329
326
if scheme.lower() == tcs.auth_scheme:
330
user, password = base64.b64decode(raw_auth).split(b':')
331
return tcs.authorized(user.decode('ascii'),
332
password.decode('ascii'))
327
user, password = raw_auth.decode('base64').split(':')
328
return tcs.authorized(user, password)
466
462
# Recalculate the response_digest to compare with the one
467
463
# sent by the client
468
A1 = ('%s:%s:%s' % (user, realm, password)).encode('utf-8')
469
A2 = ('%s:%s' % (command, auth['uri'])).encode('utf-8')
472
return osutils.md5(x).hexdigest()
474
def KD(secret, data):
475
return H(("%s:%s" % (secret, data)).encode('utf-8'))
464
A1 = '%s:%s:%s' % (user, realm, password)
465
A2 = '%s:%s' % (command, auth['uri'])
467
H = lambda x: osutils.md5(x).hexdigest()
468
KD = lambda secret, data: H("%s:%s" % (secret, data))
477
470
nonce_count = int(auth['nc'], 16)