/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to brzlib/tests/http_utils.py

  • Committer: Jelmer Vernooij
  • Date: 2017-05-21 12:41:27 UTC
  • mto: This revision was merged to the branch mainline in revision 6623.
  • Revision ID: jelmer@jelmer.uk-20170521124127-iv8etg0vwymyai6y
s/bzr/brz/ in apport config.

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
import base64
18
 
from io import BytesIO
 
17
from cStringIO import StringIO
19
18
import re
20
 
from urllib.request import (
21
 
    parse_http_list,
22
 
    parse_keqv_list,
23
 
    )
24
 
 
25
 
 
26
 
from .. import (
 
19
import urllib2
 
20
 
 
21
 
 
22
from brzlib import (
27
23
    errors,
28
24
    osutils,
29
25
    tests,
30
26
    transport,
31
27
    )
32
 
from ..bzr.smart import (
 
28
from brzlib.smart import (
33
29
    medium,
34
30
    )
35
 
from . import http_server
36
 
from ..transport import chroot
 
31
from brzlib.tests import http_server
 
32
from brzlib.transport import chroot
37
33
 
38
34
 
39
35
class HTTPServerWithSmarts(http_server.HttpServer):
50
46
class SmartRequestHandler(http_server.TestingHTTPRequestHandler):
51
47
    """Extend TestingHTTPRequestHandler to support smart client POSTs.
52
48
 
53
 
    XXX: This duplicates a fair bit of the logic in breezy.transport.http.wsgi.
 
49
    XXX: This duplicates a fair bit of the logic in brzlib.transport.http.wsgi.
54
50
    """
55
51
 
56
52
    def do_POST(self):
83
79
        # we have to stop early due to error, but we would also have to use the
84
80
        # HTTP trailer facility which may not be widely available.
85
81
        request_bytes = self.rfile.read(data_length)
86
 
        protocol_factory, unused_bytes = (
87
 
            medium._get_protocol_factory_for_bytes(request_bytes))
88
 
        out_buffer = BytesIO()
 
82
        protocol_factory, unused_bytes = medium._get_protocol_factory_for_bytes(
 
83
            request_bytes)
 
84
        out_buffer = StringIO()
89
85
        smart_protocol_request = protocol_factory(t, out_buffer.write, '/')
90
86
        # Perhaps there should be a SmartServerHTTPMedium that takes care of
91
87
        # feeding the bytes in the http request to the smart_protocol_request,
130
126
    We set up two webservers to allows various tests involving
131
127
    proxies or redirections from one server to the other.
132
128
    """
133
 
 
134
129
    def setUp(self):
135
130
        super(TestCaseWithTwoWebservers, self).setUp()
136
131
        self.transport_secondary_server = http_server.HttpServer
185
180
                # We do not send a body
186
181
                self.send_header('Content-Length', '0')
187
182
                self.end_headers()
188
 
                return False  # The job is done
 
183
                return False # The job is done
189
184
            else:
190
185
                # We leave the parent class serve the request
191
186
                pass
210
205
    def redirect_to(self, host, port):
211
206
        """Redirect all requests to a specific host:port"""
212
207
        self.redirections = [('(.*)',
213
 
                              r'http://%s:%s\1' % (host, port),
 
208
                              r'http://%s:%s\1' % (host, port) ,
214
209
                              301)]
215
210
 
216
211
    def is_redirected(self, path):
224
219
        code = None
225
220
        target = None
226
221
        for (rsource, rtarget, rcode) in self.redirections:
227
 
            target, match = re.subn(rsource, rtarget, path, count=1)
 
222
            target, match = re.subn(rsource, rtarget, path)
228
223
            if match:
229
224
                code = rcode
230
 
                break  # The first match wins
 
225
                break # The first match wins
231
226
            else:
232
227
                target = None
233
228
        return code, target
234
229
 
235
230
 
236
231
class TestCaseWithRedirectedWebserver(TestCaseWithTwoWebservers):
237
 
    """A support class providing redirections from one server to another.
238
 
 
239
 
    We set up two webservers to allows various tests involving
240
 
    redirections.
241
 
    The 'old' server is redirected to the 'new' server.
242
 
    """
243
 
 
244
 
    def setUp(self):
245
 
        super(TestCaseWithRedirectedWebserver, self).setUp()
246
 
        # The redirections will point to the new server
247
 
        self.new_server = self.get_readonly_server()
248
 
        # The requests to the old server will be redirected to the new server
249
 
        self.old_server = self.get_secondary_server()
250
 
 
251
 
    def create_transport_secondary_server(self):
252
 
        """Create the secondary server redirecting to the primary server"""
253
 
        new = self.get_readonly_server()
254
 
        redirecting = HTTPServerRedirecting(
255
 
            protocol_version=self._protocol_version)
256
 
        redirecting.redirect_to(new.host, new.port)
257
 
        redirecting._url_protocol = self._url_protocol
258
 
        return redirecting
259
 
 
260
 
    def get_old_url(self, relpath=None):
 
232
   """A support class providing redirections from one server to another.
 
233
 
 
234
   We set up two webservers to allows various tests involving
 
235
   redirections.
 
236
   The 'old' server is redirected to the 'new' server.
 
237
   """
 
238
 
 
239
   def setUp(self):
 
240
       super(TestCaseWithRedirectedWebserver, self).setUp()
 
241
       # The redirections will point to the new server
 
242
       self.new_server = self.get_readonly_server()
 
243
       # The requests to the old server will be redirected to the new server
 
244
       self.old_server = self.get_secondary_server()
 
245
 
 
246
   def create_transport_secondary_server(self):
 
247
       """Create the secondary server redirecting to the primary server"""
 
248
       new = self.get_readonly_server()
 
249
       redirecting = HTTPServerRedirecting(
 
250
           protocol_version=self._protocol_version)
 
251
       redirecting.redirect_to(new.host, new.port)
 
252
       redirecting._url_protocol = self._url_protocol
 
253
       return redirecting
 
254
 
 
255
   def get_old_url(self, relpath=None):
261
256
        base = self.old_server.get_url()
262
257
        return self._adjust_url(base, relpath)
263
258
 
264
 
    def get_old_transport(self, relpath=None):
 
259
   def get_old_transport(self, relpath=None):
265
260
        t = transport.get_transport_from_url(self.get_old_url(relpath))
266
261
        self.assertTrue(t.is_readonly())
267
262
        return t
268
263
 
269
 
    def get_new_url(self, relpath=None):
 
264
   def get_new_url(self, relpath=None):
270
265
        base = self.new_server.get_url()
271
266
        return self._adjust_url(base, relpath)
272
267
 
273
 
    def get_new_transport(self, relpath=None):
 
268
   def get_new_transport(self, relpath=None):
274
269
        t = transport.get_transport_from_url(self.get_new_url(relpath))
275
270
        self.assertTrue(t.is_readonly())
276
271
        return t
327
322
        if auth_header:
328
323
            scheme, raw_auth = auth_header.split(' ', 1)
329
324
            if scheme.lower() == tcs.auth_scheme:
330
 
                user, password = base64.b64decode(raw_auth).split(b':')
331
 
                return tcs.authorized(user.decode('ascii'),
332
 
                                      password.decode('ascii'))
 
325
                user, password = raw_auth.decode('base64').split(':')
 
326
                return tcs.authorized(user, password)
333
327
 
334
328
        return False
335
329
 
358
352
            return False
359
353
        scheme, auth = auth_header.split(None, 1)
360
354
        if scheme.lower() == tcs.auth_scheme:
361
 
            auth_dict = parse_keqv_list(parse_http_list(auth))
 
355
            auth_dict = urllib2.parse_keqv_list(urllib2.parse_http_list(auth))
362
356
 
363
357
            return tcs.digest_authorized(auth_dict, self.command)
364
358
 
369
363
        header = 'Digest realm="%s", ' % tcs.auth_realm
370
364
        header += 'nonce="%s", algorithm="%s", qop="auth"' % (tcs.auth_nonce,
371
365
                                                              'MD5')
372
 
        self.send_header(tcs.auth_header_sent, header)
 
366
        self.send_header(tcs.auth_header_sent,header)
373
367
 
374
368
 
375
369
class DigestAndBasicAuthRequestHandler(DigestAuthRequestHandler):
387
381
        header = 'Digest realm="%s", ' % tcs.auth_realm
388
382
        header += 'nonce="%s", algorithm="%s", qop="auth"' % (tcs.auth_nonce,
389
383
                                                              'MD5')
390
 
        self.send_header(tcs.auth_header_sent, header)
 
384
        self.send_header(tcs.auth_header_sent,header)
391
385
 
392
386
 
393
387
class AuthServer(http_server.HttpServer):
405
399
    auth_header_sent = None
406
400
    auth_header_recv = None
407
401
    auth_error_code = None
408
 
    auth_realm = u"Thou should not pass"
 
402
    auth_realm = "Thou should not pass"
409
403
 
410
404
    def __init__(self, request_handler, auth_scheme,
411
405
                 protocol_version=None):
452
446
        if realm != self.auth_realm:
453
447
            return False
454
448
        user = auth['username']
455
 
        if user not in self.password_of:
 
449
        if not self.password_of.has_key(user):
456
450
            return False
457
 
        algorithm = auth['algorithm']
 
451
        algorithm= auth['algorithm']
458
452
        if algorithm != 'MD5':
459
453
            return False
460
454
        qop = auth['qop']
465
459
 
466
460
        # Recalculate the response_digest to compare with the one
467
461
        # sent by the client
468
 
        A1 = ('%s:%s:%s' % (user, realm, password)).encode('utf-8')
469
 
        A2 = ('%s:%s' % (command, auth['uri'])).encode('utf-8')
470
 
 
471
 
        def H(x):
472
 
            return osutils.md5(x).hexdigest()
473
 
 
474
 
        def KD(secret, data):
475
 
            return H(("%s:%s" % (secret, data)).encode('utf-8'))
 
462
        A1 = '%s:%s:%s' % (user, realm, password)
 
463
        A2 = '%s:%s' % (command, auth['uri'])
 
464
 
 
465
        H = lambda x: osutils.md5(x).hexdigest()
 
466
        KD = lambda secret, data: H("%s:%s" % (secret, data))
476
467
 
477
468
        nonce_count = int(auth['nc'], 16)
478
469
 
562
553
        self.init_proxy_auth()
563
554
        # We really accept Digest only
564
555
        self.auth_scheme = 'digest'
 
556
 
 
557