/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/http_utils.py

  • Committer: Martin
  • Date: 2018-11-16 16:38:22 UTC
  • mto: This revision was merged to the branch mainline in revision 7172.
  • Revision ID: gzlist@googlemail.com-20181116163822-yg1h1cdng6w7w9kn
Make --profile-imports work on Python 3

Also tweak heading to line up correctly.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
from cStringIO import StringIO
18
 
import errno
 
17
import base64
19
18
import re
20
 
import socket
21
 
import threading
22
 
import time
23
 
import urllib2
24
 
import urlparse
25
 
 
26
 
 
27
 
from bzrlib import (
 
19
try:
 
20
    from urllib.request import (
 
21
        parse_http_list,
 
22
        parse_keqv_list,
 
23
        )
 
24
except ImportError:  # python < 3
 
25
    from urllib2 import (
 
26
        parse_http_list,
 
27
        parse_keqv_list,
 
28
        )
 
29
 
 
30
 
 
31
from .. import (
28
32
    errors,
29
33
    osutils,
30
34
    tests,
31
 
    )
32
 
from bzrlib.smart import medium, protocol
33
 
from bzrlib.tests import http_server
34
 
from bzrlib.transport import (
35
 
    chroot,
36
 
    get_transport,
37
 
    )
 
35
    transport,
 
36
    )
 
37
from ..sixish import (
 
38
    BytesIO,
 
39
    )
 
40
from ..bzr.smart import (
 
41
    medium,
 
42
    )
 
43
from . import http_server
 
44
from ..transport import chroot
38
45
 
39
46
 
40
47
class HTTPServerWithSmarts(http_server.HttpServer):
51
58
class SmartRequestHandler(http_server.TestingHTTPRequestHandler):
52
59
    """Extend TestingHTTPRequestHandler to support smart client POSTs.
53
60
 
54
 
    XXX: This duplicates a fair bit of the logic in bzrlib.transport.http.wsgi.
 
61
    XXX: This duplicates a fair bit of the logic in breezy.transport.http.wsgi.
55
62
    """
56
63
 
57
64
    def do_POST(self):
58
65
        """Hand the request off to a smart server instance."""
59
 
        backing = get_transport(self.server.test_case_server._home_dir)
 
66
        backing = transport.get_transport_from_path(
 
67
            self.server.test_case_server._home_dir)
60
68
        chroot_server = chroot.ChrootServer(backing)
61
69
        chroot_server.start_server()
62
70
        try:
63
 
            t = get_transport(chroot_server.get_url())
 
71
            t = transport.get_transport_from_url(chroot_server.get_url())
64
72
            self.do_POST_inner(t)
65
73
        finally:
66
74
            chroot_server.stop_server()
85
93
        request_bytes = self.rfile.read(data_length)
86
94
        protocol_factory, unused_bytes = medium._get_protocol_factory_for_bytes(
87
95
            request_bytes)
88
 
        out_buffer = StringIO()
 
96
        out_buffer = BytesIO()
89
97
        smart_protocol_request = protocol_factory(t, out_buffer.write, '/')
90
98
        # Perhaps there should be a SmartServerHTTPMedium that takes care of
91
99
        # feeding the bytes in the http request to the smart_protocol_request,
106
114
    one. This will currently fail if the primary transport is not
107
115
    backed by regular disk files.
108
116
    """
 
117
 
 
118
    # These attributes can be overriden or parametrized by daughter clasess if
 
119
    # needed, but must exist so that the create_transport_readonly_server()
 
120
    # method (or any method creating an http(s) server) can propagate it.
 
121
    _protocol_version = None
 
122
    _url_protocol = 'http'
 
123
 
109
124
    def setUp(self):
110
125
        super(TestCaseWithWebserver, self).setUp()
111
126
        self.transport_readonly_server = http_server.HttpServer
112
127
 
 
128
    def create_transport_readonly_server(self):
 
129
        server = self.transport_readonly_server(
 
130
            protocol_version=self._protocol_version)
 
131
        server._url_protocol = self._url_protocol
 
132
        return server
 
133
 
113
134
 
114
135
class TestCaseWithTwoWebservers(TestCaseWithWebserver):
115
136
    """A support class providing readonly urls on two servers that are http://.
127
148
 
128
149
        This is mostly a hook for daughter classes.
129
150
        """
130
 
        return self.transport_secondary_server()
 
151
        server = self.transport_secondary_server(
 
152
            protocol_version=self._protocol_version)
 
153
        server._url_protocol = self._url_protocol
 
154
        return server
131
155
 
132
156
    def get_secondary_server(self):
133
157
        """Get the server instance for the secondary transport."""
136
160
            self.start_server(self.__secondary_server)
137
161
        return self.__secondary_server
138
162
 
 
163
    def get_secondary_url(self, relpath=None):
 
164
        base = self.get_secondary_server().get_url()
 
165
        return self._adjust_url(base, relpath)
 
166
 
 
167
    def get_secondary_transport(self, relpath=None):
 
168
        t = transport.get_transport_from_url(self.get_secondary_url(relpath))
 
169
        self.assertTrue(t.is_readonly())
 
170
        return t
 
171
 
139
172
 
140
173
class ProxyServer(http_server.HttpServer):
141
174
    """A proxy test server for http transports."""
184
217
    def redirect_to(self, host, port):
185
218
        """Redirect all requests to a specific host:port"""
186
219
        self.redirections = [('(.*)',
187
 
                              r'http://%s:%s\1' % (host, port) ,
 
220
                              r'http://%s:%s\1' % (host, port),
188
221
                              301)]
189
222
 
190
223
    def is_redirected(self, path):
215
248
   The 'old' server is redirected to the 'new' server.
216
249
   """
217
250
 
 
251
   def setUp(self):
 
252
       super(TestCaseWithRedirectedWebserver, self).setUp()
 
253
       # The redirections will point to the new server
 
254
       self.new_server = self.get_readonly_server()
 
255
       # The requests to the old server will be redirected to the new server
 
256
       self.old_server = self.get_secondary_server()
 
257
 
218
258
   def create_transport_secondary_server(self):
219
259
       """Create the secondary server redirecting to the primary server"""
220
260
       new = self.get_readonly_server()
221
 
       redirecting = HTTPServerRedirecting()
 
261
       redirecting = HTTPServerRedirecting(
 
262
           protocol_version=self._protocol_version)
222
263
       redirecting.redirect_to(new.host, new.port)
 
264
       redirecting._url_protocol = self._url_protocol
223
265
       return redirecting
224
266
 
225
 
   def setUp(self):
226
 
       super(TestCaseWithRedirectedWebserver, self).setUp()
227
 
       # The redirections will point to the new server
228
 
       self.new_server = self.get_readonly_server()
229
 
       # The requests to the old server will be redirected
230
 
       self.old_server = self.get_secondary_server()
 
267
   def get_old_url(self, relpath=None):
 
268
        base = self.old_server.get_url()
 
269
        return self._adjust_url(base, relpath)
 
270
 
 
271
   def get_old_transport(self, relpath=None):
 
272
        t = transport.get_transport_from_url(self.get_old_url(relpath))
 
273
        self.assertTrue(t.is_readonly())
 
274
        return t
 
275
 
 
276
   def get_new_url(self, relpath=None):
 
277
        base = self.new_server.get_url()
 
278
        return self._adjust_url(base, relpath)
 
279
 
 
280
   def get_new_transport(self, relpath=None):
 
281
        t = transport.get_transport_from_url(self.get_new_url(relpath))
 
282
        self.assertTrue(t.is_readonly())
 
283
        return t
231
284
 
232
285
 
233
286
class AuthRequestHandler(http_server.TestingHTTPRequestHandler):
243
296
    # - auth_header_recv: the header received containing auth
244
297
    # - auth_error_code: the error code to indicate auth required
245
298
 
 
299
    def _require_authentication(self):
 
300
        # Note that we must update test_case_server *before*
 
301
        # sending the error or the client may try to read it
 
302
        # before we have sent the whole error back.
 
303
        tcs = self.server.test_case_server
 
304
        tcs.auth_required_errors += 1
 
305
        self.send_response(tcs.auth_error_code)
 
306
        self.send_header_auth_reqed()
 
307
        # We do not send a body
 
308
        self.send_header('Content-Length', '0')
 
309
        self.end_headers()
 
310
        return
 
311
 
246
312
    def do_GET(self):
247
313
        if self.authorized():
248
314
            return http_server.TestingHTTPRequestHandler.do_GET(self)
249
315
        else:
250
 
            # Note that we must update test_case_server *before*
251
 
            # sending the error or the client may try to read it
252
 
            # before we have sent the whole error back.
253
 
            tcs = self.server.test_case_server
254
 
            tcs.auth_required_errors += 1
255
 
            self.send_response(tcs.auth_error_code)
256
 
            self.send_header_auth_reqed()
257
 
            # We do not send a body
258
 
            self.send_header('Content-Length', '0')
259
 
            self.end_headers()
260
 
            return
 
316
            return self._require_authentication()
 
317
 
 
318
    def do_HEAD(self):
 
319
        if self.authorized():
 
320
            return http_server.TestingHTTPRequestHandler.do_HEAD(self)
 
321
        else:
 
322
            return self._require_authentication()
261
323
 
262
324
 
263
325
class BasicAuthRequestHandler(AuthRequestHandler):
272
334
        if auth_header:
273
335
            scheme, raw_auth = auth_header.split(' ', 1)
274
336
            if scheme.lower() == tcs.auth_scheme:
275
 
                user, password = raw_auth.decode('base64').split(':')
276
 
                return tcs.authorized(user, password)
 
337
                user, password = base64.b64decode(raw_auth).split(b':')
 
338
                return tcs.authorized(user.decode('ascii'),
 
339
                                      password.decode('ascii'))
277
340
 
278
341
        return False
279
342
 
302
365
            return False
303
366
        scheme, auth = auth_header.split(None, 1)
304
367
        if scheme.lower() == tcs.auth_scheme:
305
 
            auth_dict = urllib2.parse_keqv_list(urllib2.parse_http_list(auth))
 
368
            auth_dict = parse_keqv_list(parse_http_list(auth))
306
369
 
307
370
            return tcs.digest_authorized(auth_dict, self.command)
308
371
 
313
376
        header = 'Digest realm="%s", ' % tcs.auth_realm
314
377
        header += 'nonce="%s", algorithm="%s", qop="auth"' % (tcs.auth_nonce,
315
378
                                                              'MD5')
316
 
        self.send_header(tcs.auth_header_sent,header)
 
379
        self.send_header(tcs.auth_header_sent, header)
317
380
 
318
381
 
319
382
class DigestAndBasicAuthRequestHandler(DigestAuthRequestHandler):
331
394
        header = 'Digest realm="%s", ' % tcs.auth_realm
332
395
        header += 'nonce="%s", algorithm="%s", qop="auth"' % (tcs.auth_nonce,
333
396
                                                              'MD5')
334
 
        self.send_header(tcs.auth_header_sent,header)
 
397
        self.send_header(tcs.auth_header_sent, header)
335
398
 
336
399
 
337
400
class AuthServer(http_server.HttpServer):
349
412
    auth_header_sent = None
350
413
    auth_header_recv = None
351
414
    auth_error_code = None
352
 
    auth_realm = "Thou should not pass"
 
415
    auth_realm = u"Thou should not pass"
353
416
 
354
417
    def __init__(self, request_handler, auth_scheme,
355
418
                 protocol_version=None):
396
459
        if realm != self.auth_realm:
397
460
            return False
398
461
        user = auth['username']
399
 
        if not self.password_of.has_key(user):
 
462
        if user not in self.password_of:
400
463
            return False
401
464
        algorithm= auth['algorithm']
402
465
        if algorithm != 'MD5':
409
472
 
410
473
        # Recalculate the response_digest to compare with the one
411
474
        # sent by the client
412
 
        A1 = '%s:%s:%s' % (user, realm, password)
413
 
        A2 = '%s:%s' % (command, auth['uri'])
 
475
        A1 = ('%s:%s:%s' % (user, realm, password)).encode('utf-8')
 
476
        A2 = ('%s:%s' % (command, auth['uri'])).encode('utf-8')
414
477
 
415
478
        H = lambda x: osutils.md5(x).hexdigest()
416
 
        KD = lambda secret, data: H("%s:%s" % (secret, data))
 
479
        KD = lambda secret, data: H(("%s:%s" % (secret, data)).encode('utf-8'))
417
480
 
418
481
        nonce_count = int(auth['nc'], 16)
419
482