/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/http_utils.py

  • Committer: Jelmer Vernooij
  • Date: 2017-07-23 22:06:41 UTC
  • mfrom: (6738 trunk)
  • mto: This revision was merged to the branch mainline in revision 6739.
  • Revision ID: jelmer@jelmer.uk-20170723220641-69eczax9bmv8d6kk
Merge trunk, address review comments.

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
import base64
18
 
from io import BytesIO
19
17
import re
20
 
from urllib.request import (
21
 
    parse_http_list,
22
 
    parse_keqv_list,
23
 
    )
 
18
import urllib2
24
19
 
25
20
 
26
21
from .. import (
29
24
    tests,
30
25
    transport,
31
26
    )
 
27
from ..sixish import (
 
28
    BytesIO,
 
29
    )
32
30
from ..bzr.smart import (
33
31
    medium,
34
32
    )
83
81
        # we have to stop early due to error, but we would also have to use the
84
82
        # HTTP trailer facility which may not be widely available.
85
83
        request_bytes = self.rfile.read(data_length)
86
 
        protocol_factory, unused_bytes = (
87
 
            medium._get_protocol_factory_for_bytes(request_bytes))
 
84
        protocol_factory, unused_bytes = medium._get_protocol_factory_for_bytes(
 
85
            request_bytes)
88
86
        out_buffer = BytesIO()
89
87
        smart_protocol_request = protocol_factory(t, out_buffer.write, '/')
90
88
        # Perhaps there should be a SmartServerHTTPMedium that takes care of
130
128
    We set up two webservers to allow various tests involving
131
129
    proxies or redirections from one server to the other.
132
130
    """
133
 
 
134
131
    def setUp(self):
135
132
        super(TestCaseWithTwoWebservers, self).setUp()
136
133
        self.transport_secondary_server = http_server.HttpServer
185
182
                # We do not send a body
186
183
                self.send_header('Content-Length', '0')
187
184
                self.end_headers()
188
 
                return False  # The job is done
 
185
                return False # The job is done
189
186
            else:
190
187
                # We let the parent class serve the request
191
188
                pass
210
207
    def redirect_to(self, host, port):
211
208
        """Redirect all requests to a specific host:port"""
212
209
        self.redirections = [('(.*)',
213
 
                              r'http://%s:%s\1' % (host, port),
 
210
                              r'http://%s:%s\1' % (host, port) ,
214
211
                              301)]
215
212
 
216
213
    def is_redirected(self, path):
224
221
        code = None
225
222
        target = None
226
223
        for (rsource, rtarget, rcode) in self.redirections:
227
 
            target, match = re.subn(rsource, rtarget, path, count=1)
 
224
            target, match = re.subn(rsource, rtarget, path)
228
225
            if match:
229
226
                code = rcode
230
 
                break  # The first match wins
 
227
                break # The first match wins
231
228
            else:
232
229
                target = None
233
230
        return code, target
234
231
 
235
232
 
236
233
class TestCaseWithRedirectedWebserver(TestCaseWithTwoWebservers):
237
 
    """A support class providing redirections from one server to another.
238
 
 
239
 
    We set up two webservers to allows various tests involving
240
 
    redirections.
241
 
    The 'old' server is redirected to the 'new' server.
242
 
    """
243
 
 
244
 
    def setUp(self):
245
 
        super(TestCaseWithRedirectedWebserver, self).setUp()
246
 
        # The redirections will point to the new server
247
 
        self.new_server = self.get_readonly_server()
248
 
        # The requests to the old server will be redirected to the new server
249
 
        self.old_server = self.get_secondary_server()
250
 
 
251
 
    def create_transport_secondary_server(self):
252
 
        """Create the secondary server redirecting to the primary server"""
253
 
        new = self.get_readonly_server()
254
 
        redirecting = HTTPServerRedirecting(
255
 
            protocol_version=self._protocol_version)
256
 
        redirecting.redirect_to(new.host, new.port)
257
 
        redirecting._url_protocol = self._url_protocol
258
 
        return redirecting
259
 
 
260
 
    def get_old_url(self, relpath=None):
 
234
   """A support class providing redirections from one server to another.
 
235
 
 
236
   We set up two webservers to allows various tests involving
 
237
   redirections.
 
238
   The 'old' server is redirected to the 'new' server.
 
239
   """
 
240
 
 
241
   def setUp(self):
 
242
       super(TestCaseWithRedirectedWebserver, self).setUp()
 
243
       # The redirections will point to the new server
 
244
       self.new_server = self.get_readonly_server()
 
245
       # The requests to the old server will be redirected to the new server
 
246
       self.old_server = self.get_secondary_server()
 
247
 
 
248
   def create_transport_secondary_server(self):
 
249
       """Create the secondary server redirecting to the primary server"""
 
250
       new = self.get_readonly_server()
 
251
       redirecting = HTTPServerRedirecting(
 
252
           protocol_version=self._protocol_version)
 
253
       redirecting.redirect_to(new.host, new.port)
 
254
       redirecting._url_protocol = self._url_protocol
 
255
       return redirecting
 
256
 
 
257
   def get_old_url(self, relpath=None):
261
258
        base = self.old_server.get_url()
262
259
        return self._adjust_url(base, relpath)
263
260
 
264
 
    def get_old_transport(self, relpath=None):
 
261
   def get_old_transport(self, relpath=None):
265
262
        t = transport.get_transport_from_url(self.get_old_url(relpath))
266
263
        self.assertTrue(t.is_readonly())
267
264
        return t
268
265
 
269
 
    def get_new_url(self, relpath=None):
 
266
   def get_new_url(self, relpath=None):
270
267
        base = self.new_server.get_url()
271
268
        return self._adjust_url(base, relpath)
272
269
 
273
 
    def get_new_transport(self, relpath=None):
 
270
   def get_new_transport(self, relpath=None):
274
271
        t = transport.get_transport_from_url(self.get_new_url(relpath))
275
272
        self.assertTrue(t.is_readonly())
276
273
        return t
327
324
        if auth_header:
328
325
            scheme, raw_auth = auth_header.split(' ', 1)
329
326
            if scheme.lower() == tcs.auth_scheme:
330
 
                user, password = base64.b64decode(raw_auth).split(b':')
331
 
                return tcs.authorized(user.decode('ascii'),
332
 
                                      password.decode('ascii'))
 
327
                user, password = raw_auth.decode('base64').split(':')
 
328
                return tcs.authorized(user, password)
333
329
 
334
330
        return False
335
331
 
358
354
            return False
359
355
        scheme, auth = auth_header.split(None, 1)
360
356
        if scheme.lower() == tcs.auth_scheme:
361
 
            auth_dict = parse_keqv_list(parse_http_list(auth))
 
357
            auth_dict = urllib2.parse_keqv_list(urllib2.parse_http_list(auth))
362
358
 
363
359
            return tcs.digest_authorized(auth_dict, self.command)
364
360
 
369
365
        header = 'Digest realm="%s", ' % tcs.auth_realm
370
366
        header += 'nonce="%s", algorithm="%s", qop="auth"' % (tcs.auth_nonce,
371
367
                                                              'MD5')
372
 
        self.send_header(tcs.auth_header_sent, header)
 
368
        self.send_header(tcs.auth_header_sent,header)
373
369
 
374
370
 
375
371
class DigestAndBasicAuthRequestHandler(DigestAuthRequestHandler):
387
383
        header = 'Digest realm="%s", ' % tcs.auth_realm
388
384
        header += 'nonce="%s", algorithm="%s", qop="auth"' % (tcs.auth_nonce,
389
385
                                                              'MD5')
390
 
        self.send_header(tcs.auth_header_sent, header)
 
386
        self.send_header(tcs.auth_header_sent,header)
391
387
 
392
388
 
393
389
class AuthServer(http_server.HttpServer):
405
401
    auth_header_sent = None
406
402
    auth_header_recv = None
407
403
    auth_error_code = None
408
 
    auth_realm = u"Thou should not pass"
 
404
    auth_realm = "Thou should not pass"
409
405
 
410
406
    def __init__(self, request_handler, auth_scheme,
411
407
                 protocol_version=None):
454
450
        user = auth['username']
455
451
        if user not in self.password_of:
456
452
            return False
457
 
        algorithm = auth['algorithm']
 
453
        algorithm= auth['algorithm']
458
454
        if algorithm != 'MD5':
459
455
            return False
460
456
        qop = auth['qop']
465
461
 
466
462
        # Recalculate the response_digest to compare with the one
467
463
        # sent by the client
468
 
        A1 = ('%s:%s:%s' % (user, realm, password)).encode('utf-8')
469
 
        A2 = ('%s:%s' % (command, auth['uri'])).encode('utf-8')
470
 
 
471
 
        def H(x):
472
 
            return osutils.md5(x).hexdigest()
473
 
 
474
 
        def KD(secret, data):
475
 
            return H(("%s:%s" % (secret, data)).encode('utf-8'))
 
464
        A1 = '%s:%s:%s' % (user, realm, password)
 
465
        A2 = '%s:%s' % (command, auth['uri'])
 
466
 
 
467
        H = lambda x: osutils.md5(x).hexdigest()
 
468
        KD = lambda secret, data: H("%s:%s" % (secret, data))
476
469
 
477
470
        nonce_count = int(auth['nc'], 16)
478
471
 
562
555
        self.init_proxy_auth()
563
556
        # We really accept Digest only
564
557
        self.auth_scheme = 'digest'
 
558
 
 
559