90
83
# we have to stop early due to error, but we would also have to use the
91
84
# HTTP trailer facility which may not be widely available.
92
85
request_bytes = self.rfile.read(data_length)
93
protocol_factory, unused_bytes = medium._get_protocol_factory_for_bytes(
86
protocol_factory, unused_bytes = (
87
medium._get_protocol_factory_for_bytes(request_bytes))
95
88
out_buffer = BytesIO()
96
89
smart_protocol_request = protocol_factory(t, out_buffer.write, '/')
97
90
# Perhaps there should be a SmartServerHTTPMedium that takes care of
232
226
for (rsource, rtarget, rcode) in self.redirections:
233
target, match = re.subn(rsource, rtarget, path)
227
target, match = re.subn(rsource, rtarget, path, count=1)
236
break # The first match wins
230
break # The first match wins
239
233
return code, target
242
236
class TestCaseWithRedirectedWebserver(TestCaseWithTwoWebservers):
243
"""A support class providing redirections from one server to another.
245
We set up two webservers to allows various tests involving
247
The 'old' server is redirected to the 'new' server.
251
super(TestCaseWithRedirectedWebserver, self).setUp()
252
# The redirections will point to the new server
253
self.new_server = self.get_readonly_server()
254
# The requests to the old server will be redirected to the new server
255
self.old_server = self.get_secondary_server()
257
def create_transport_secondary_server(self):
258
"""Create the secondary server redirecting to the primary server"""
259
new = self.get_readonly_server()
260
redirecting = HTTPServerRedirecting(
261
protocol_version=self._protocol_version)
262
redirecting.redirect_to(new.host, new.port)
263
redirecting._url_protocol = self._url_protocol
266
def get_old_url(self, relpath=None):
237
"""A support class providing redirections from one server to another.
239
We set up two webservers to allows various tests involving
241
The 'old' server is redirected to the 'new' server.
245
super(TestCaseWithRedirectedWebserver, self).setUp()
246
# The redirections will point to the new server
247
self.new_server = self.get_readonly_server()
248
# The requests to the old server will be redirected to the new server
249
self.old_server = self.get_secondary_server()
251
def create_transport_secondary_server(self):
252
"""Create the secondary server redirecting to the primary server"""
253
new = self.get_readonly_server()
254
redirecting = HTTPServerRedirecting(
255
protocol_version=self._protocol_version)
256
redirecting.redirect_to(new.host, new.port)
257
redirecting._url_protocol = self._url_protocol
260
def get_old_url(self, relpath=None):
267
261
base = self.old_server.get_url()
268
262
return self._adjust_url(base, relpath)
270
def get_old_transport(self, relpath=None):
264
def get_old_transport(self, relpath=None):
271
265
t = transport.get_transport_from_url(self.get_old_url(relpath))
272
266
self.assertTrue(t.is_readonly())
275
def get_new_url(self, relpath=None):
269
def get_new_url(self, relpath=None):
276
270
base = self.new_server.get_url()
277
271
return self._adjust_url(base, relpath)
279
def get_new_transport(self, relpath=None):
273
def get_new_transport(self, relpath=None):
280
274
t = transport.get_transport_from_url(self.get_new_url(relpath))
281
275
self.assertTrue(t.is_readonly())
334
328
scheme, raw_auth = auth_header.split(' ', 1)
335
329
if scheme.lower() == tcs.auth_scheme:
336
user, password = raw_auth.decode('base64').split(':')
337
return tcs.authorized(user, password)
330
user, password = base64.b64decode(raw_auth).split(b':')
331
return tcs.authorized(user.decode('ascii'),
332
password.decode('ascii'))
471
466
# Recalculate the response_digest to compare with the one
472
467
# sent by the client
473
A1 = '%s:%s:%s' % (user, realm, password)
474
A2 = '%s:%s' % (command, auth['uri'])
476
H = lambda x: osutils.md5(x).hexdigest()
477
KD = lambda secret, data: H("%s:%s" % (secret, data))
468
A1 = ('%s:%s:%s' % (user, realm, password)).encode('utf-8')
469
A2 = ('%s:%s' % (command, auth['uri'])).encode('utf-8')
472
return osutils.md5(x).hexdigest()
474
def KD(secret, data):
475
return H(("%s:%s" % (secret, data)).encode('utf-8'))
479
477
nonce_count = int(auth['nc'], 16)